新增支持webrtc over tcp模式 (#2092)

* webrtc server/session/cadidate 改为tcp

* 先屏蔽检查isCurrentThread

* 接受和发送的数据处理tcp 2字节头

* 处理rtc tcp 分片

* 完善webrtc over tcp

* 精简rtp服务器相关代码

* 适配webrtc AV1编码: #2091

* webrtc tcp模式支持Firefox

* webrtc tcp模式支持线程安全

* c sdk支持webrtc tcp

Co-authored-by: ziyue <1213642868@qq.com>
This commit is contained in:
Dw9 2022-11-18 22:52:57 +08:00 committed by GitHub
parent fc433de9ac
commit 47530ce830
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 121 additions and 35 deletions

@ -1 +1 @@
Subproject commit 90ba564e9e39a120ed7b99260f2835a19811af30 Subproject commit 894be81929f227583081755288ab0927c077e411

View File

@ -37,7 +37,8 @@ static std::shared_ptr<RtpServer> rtpServer;
#ifdef ENABLE_WEBRTC #ifdef ENABLE_WEBRTC
#include "../webrtc/WebRtcSession.h" #include "../webrtc/WebRtcSession.h"
static std::shared_ptr<UdpServer> rtcServer; static std::shared_ptr<UdpServer> rtcServer_udp;
static std::shared_ptr<TcpServer> rtcServer_tcp;
#endif #endif
#if defined(ENABLE_SRT) #if defined(ENABLE_SRT)
@ -72,7 +73,8 @@ API_EXPORT void API_CALL mk_stop_all_server(){
rtpServer = nullptr; rtpServer = nullptr;
#endif #endif
#ifdef ENABLE_WEBRTC #ifdef ENABLE_WEBRTC
rtcServer = nullptr; rtcServer_udp = nullptr;
rtcServer_tcp = nullptr;
#endif #endif
#ifdef ENABLE_SRT #ifdef ENABLE_SRT
srtServer = nullptr; srtServer = nullptr;
@ -178,7 +180,7 @@ API_EXPORT uint16_t API_CALL mk_http_server_start(uint16_t port, int ssl) {
} }
return http_server[ssl]->getPort(); return http_server[ssl]->getPort();
} catch (std::exception &ex) { } catch (std::exception &ex) {
http_server[ssl].reset(); http_server[ssl] = nullptr;;
WarnL << ex.what(); WarnL << ex.what();
return 0; return 0;
} }
@ -195,7 +197,7 @@ API_EXPORT uint16_t API_CALL mk_rtsp_server_start(uint16_t port, int ssl) {
} }
return rtsp_server[ssl]->getPort(); return rtsp_server[ssl]->getPort();
} catch (std::exception &ex) { } catch (std::exception &ex) {
rtsp_server[ssl].reset(); rtsp_server[ssl] = nullptr;;
WarnL << ex.what(); WarnL << ex.what();
return 0; return 0;
} }
@ -212,7 +214,7 @@ API_EXPORT uint16_t API_CALL mk_rtmp_server_start(uint16_t port, int ssl) {
} }
return rtmp_server[ssl]->getPort(); return rtmp_server[ssl]->getPort();
} catch (std::exception &ex) { } catch (std::exception &ex) {
rtmp_server[ssl].reset(); rtmp_server[ssl] = nullptr;;
WarnL << ex.what(); WarnL << ex.what();
return 0; return 0;
} }
@ -226,7 +228,7 @@ API_EXPORT uint16_t API_CALL mk_rtp_server_start(uint16_t port){
rtpServer->start(port); rtpServer->start(port);
return rtpServer->getPort(); return rtpServer->getPort();
} catch (std::exception &ex) { } catch (std::exception &ex) {
rtpServer.reset(); rtpServer = nullptr;;
WarnL << ex.what(); WarnL << ex.what();
return 0; return 0;
} }
@ -239,9 +241,9 @@ API_EXPORT uint16_t API_CALL mk_rtp_server_start(uint16_t port){
API_EXPORT uint16_t API_CALL mk_rtc_server_start(uint16_t port) { API_EXPORT uint16_t API_CALL mk_rtc_server_start(uint16_t port) {
#ifdef ENABLE_WEBRTC #ifdef ENABLE_WEBRTC
try { try {
//创建rtc服务器 //创建rtc udp服务器
rtcServer = std::make_shared<UdpServer>(); rtcServer_udp = std::make_shared<UdpServer>();
rtcServer->setOnCreateSocket([](const EventPoller::Ptr &poller, const Buffer::Ptr &buf, struct sockaddr *, int) { rtcServer_udp->setOnCreateSocket([](const EventPoller::Ptr &poller, const Buffer::Ptr &buf, struct sockaddr *, int) {
if (!buf) { if (!buf) {
return Socket::createSocket(poller, false); return Socket::createSocket(poller, false);
} }
@ -252,11 +254,15 @@ API_EXPORT uint16_t API_CALL mk_rtc_server_start(uint16_t port) {
} }
return Socket::createSocket(new_poller, false); return Socket::createSocket(new_poller, false);
}); });
rtcServer->start<WebRtcSession>(port); rtcServer_udp->start<WebRtcSession>(port);
return rtcServer->getPort(); //创建rtc tcp服务器
rtcServer_tcp = std::make_shared<TcpServer>();
rtcServer_tcp->start<WebRtcSession>(rtcServer_udp->getPort());
return rtcServer_udp->getPort();
} catch (std::exception &ex) { } catch (std::exception &ex) {
rtcServer.reset(); rtcServer_udp = nullptr;
rtcServer_tcp = nullptr;
WarnL << ex.what(); WarnL << ex.what();
return 0; return 0;
} }
@ -323,7 +329,7 @@ API_EXPORT uint16_t API_CALL mk_srt_server_start(uint16_t port) {
return srtServer->getPort(); return srtServer->getPort();
} catch (std::exception &ex) { } catch (std::exception &ex) {
srtServer.reset(); srtServer = nullptr;;
WarnL << ex.what(); WarnL << ex.what();
return 0; return 0;
} }
@ -339,7 +345,7 @@ API_EXPORT uint16_t API_CALL mk_shell_server_start(uint16_t port){
shell_server->start<ShellSession>(port); shell_server->start<ShellSession>(port);
return shell_server->getPort(); return shell_server->getPort();
} catch (std::exception &ex) { } catch (std::exception &ex) {
shell_server.reset(); shell_server = nullptr;;
WarnL << ex.what(); WarnL << ex.what();
return 0; return 0;
} }

View File

@ -277,9 +277,10 @@ int start_main(int argc,char *argv[]) {
#endif//defined(ENABLE_RTPPROXY) #endif//defined(ENABLE_RTPPROXY)
#if defined(ENABLE_WEBRTC) #if defined(ENABLE_WEBRTC)
auto rtcSrv_tcp = std::make_shared<TcpServer>();
//webrtc udp服务器 //webrtc udp服务器
auto rtcSrv = std::make_shared<UdpServer>(); auto rtcSrv_udp = std::make_shared<UdpServer>();
rtcSrv->setOnCreateSocket([](const EventPoller::Ptr &poller, const Buffer::Ptr &buf, struct sockaddr *, int) { rtcSrv_udp->setOnCreateSocket([](const EventPoller::Ptr &poller, const Buffer::Ptr &buf, struct sockaddr *, int) {
if (!buf) { if (!buf) {
return Socket::createSocket(poller, false); return Socket::createSocket(poller, false);
} }
@ -337,7 +338,7 @@ int start_main(int argc,char *argv[]) {
#if defined(ENABLE_WEBRTC) #if defined(ENABLE_WEBRTC)
//webrtc udp服务器 //webrtc udp服务器
if (rtcPort) { rtcSrv->start<WebRtcSession>(rtcPort); } if (rtcPort) { rtcSrv_udp->start<WebRtcSession>(rtcPort); rtcSrv_tcp->start<WebRtcSession>(rtcPort); }
#endif//defined(ENABLE_WEBRTC) #endif//defined(ENABLE_WEBRTC)
#if defined(ENABLE_SRT) #if defined(ENABLE_SRT)

View File

@ -10,14 +10,13 @@
#include "WebRtcSession.h" #include "WebRtcSession.h"
#include "Util/util.h" #include "Util/util.h"
#include "Network/TcpServer.h"
using namespace std; using namespace std;
namespace mediakit { namespace mediakit {
static string getUserName(const Buffer::Ptr &buffer) { static string getUserName(const char *buf, size_t len) {
auto buf = buffer->data();
auto len = buffer->size();
if (!RTC::StunPacket::IsStun((const uint8_t *) buf, len)) { if (!RTC::StunPacket::IsStun((const uint8_t *) buf, len)) {
return ""; return "";
} }
@ -35,7 +34,7 @@ static string getUserName(const Buffer::Ptr &buffer) {
} }
EventPoller::Ptr WebRtcSession::queryPoller(const Buffer::Ptr &buffer) { EventPoller::Ptr WebRtcSession::queryPoller(const Buffer::Ptr &buffer) {
auto user_name = getUserName(buffer); auto user_name = getUserName(buffer->data(), buffer->size());
if (user_name.empty()) { if (user_name.empty()) {
return nullptr; return nullptr;
} }
@ -45,33 +44,63 @@ EventPoller::Ptr WebRtcSession::queryPoller(const Buffer::Ptr &buffer) {
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
WebRtcSession::WebRtcSession(const Socket::Ptr &sock) : UdpSession(sock) { WebRtcSession::WebRtcSession(const Socket::Ptr &sock) : Session(sock) {
socklen_t addr_len = sizeof(_peer_addr); socklen_t addr_len = sizeof(_peer_addr);
getpeername(sock->rawFD(), (struct sockaddr *)&_peer_addr, &addr_len); getpeername(sock->rawFD(), (struct sockaddr *)&_peer_addr, &addr_len);
_over_tcp = sock->sockType() == SockNum::Sock_TCP;
} }
WebRtcSession::~WebRtcSession() { WebRtcSession::~WebRtcSession() {
InfoP(this); InfoP(this);
} }
void WebRtcSession::onRecv(const Buffer::Ptr &buffer) { void WebRtcSession::attachServer(const Server &server) {
_server = std::dynamic_pointer_cast<toolkit::TcpServer>(const_cast<Server &>(server).shared_from_this());
}
void WebRtcSession::onRecv_l(const char *data, size_t len) {
if (_find_transport) { if (_find_transport) {
// 只允许寻找一次transport // 只允许寻找一次transport
_find_transport = false; _find_transport = false;
auto user_name = getUserName(buffer); auto user_name = getUserName(data, len);
auto transport = WebRtcTransportManager::Instance().getItem(user_name); auto transport = WebRtcTransportManager::Instance().getItem(user_name);
CHECK(transport && transport->getPoller()->isCurrentThread()); CHECK(transport);
//WebRtcTransport在其他poller线程上需要切换poller线程并重新创建WebRtcSession对象
if (!transport->getPoller()->isCurrentThread()) {
auto sock = Socket::createSocket(transport->getPoller());
sock->cloneFromPeerSocket(*(getSock()));
auto server = _server;
std::string str(data, len);
sock->getPoller()->async([sock, server, str](){
auto strong_server = server.lock();
if (strong_server) {
auto session = static_pointer_cast<WebRtcSession>(strong_server->createSession(sock));
session->onRecv_l(str.data(), str.size());
}
});
throw std::runtime_error("webrtc over tcp change poller: " + getPoller()->getThreadName() + " -> " + sock->getPoller()->getThreadName());
}
transport->setSession(shared_from_this()); transport->setSession(shared_from_this());
_transport = std::move(transport); _transport = std::move(transport);
InfoP(this); InfoP(this);
} }
_ticker.resetTime(); _ticker.resetTime();
CHECK(_transport); CHECK(_transport);
_transport->inputSockData(buffer->data(), buffer->size(), (struct sockaddr *)&_peer_addr); _transport->inputSockData((char *)data, len, (struct sockaddr *)&_peer_addr);
}
void WebRtcSession::onRecv(const Buffer::Ptr &buffer) {
if (_over_tcp) {
input(buffer->data(), buffer->size());
} else {
onRecv_l(buffer->data(), buffer->size());
}
} }
void WebRtcSession::onError(const SockException &err) { void WebRtcSession::onError(const SockException &err) {
//udp链接超时但是rtc链接不一定超时因为可能存在udp链接迁移的情况 //udp链接超时但是rtc链接不一定超时因为可能存在链接迁移的情况
//在udp链接迁移时新的WebRtcSession对象将接管WebRtcTransport对象的生命周期 //在udp链接迁移时新的WebRtcSession对象将接管WebRtcTransport对象的生命周期
//本WebRtcSession对象将在超时后自动销毁 //本WebRtcSession对象将在超时后自动销毁
WarnP(this) << err.what(); WarnP(this) << err.what();
@ -97,6 +126,25 @@ void WebRtcSession::onManager() {
} }
} }
ssize_t WebRtcSession::onRecvHeader(const char *data, size_t len) {
onRecv_l(data + 2, len - 2);
return 0;
}
const char *WebRtcSession::onSearchPacketTail(const char *data, size_t len) {
if (len < 2) {
// 数据不够
return nullptr;
}
uint16_t length = (((uint8_t *)data)[0] << 8) | ((uint8_t *)data)[1];
if (len < (size_t)(length + 2)) {
// 数据不够
return nullptr;
}
// 返回rtp包末尾
return data + 2 + length;
}
}// namespace mediakit }// namespace mediakit

View File

@ -15,25 +15,38 @@
#include "Network/Session.h" #include "Network/Session.h"
#include "IceServer.hpp" #include "IceServer.hpp"
#include "WebRtcTransport.h" #include "WebRtcTransport.h"
#include "Http/HttpRequestSplitter.h"
namespace toolkit {
class TcpServer;
}
namespace mediakit { namespace mediakit {
class WebRtcSession : public UdpSession { class WebRtcSession : public Session, public HttpRequestSplitter {
public: public:
WebRtcSession(const Socket::Ptr &sock); WebRtcSession(const Socket::Ptr &sock);
~WebRtcSession() override; ~WebRtcSession() override;
void attachServer(const Server &server) override;
void onRecv(const Buffer::Ptr &) override; void onRecv(const Buffer::Ptr &) override;
void onError(const SockException &err) override; void onError(const SockException &err) override;
void onManager() override; void onManager() override;
//std::string getIdentifier() const override;
static EventPoller::Ptr queryPoller(const Buffer::Ptr &buffer); static EventPoller::Ptr queryPoller(const Buffer::Ptr &buffer);
private: private:
//// HttpRequestSplitter override ////
ssize_t onRecvHeader(const char *data, size_t len) override;
const char *onSearchPacketTail(const char *data, size_t len) override;
void onRecv_l(const char *data, size_t len);
private:
bool _over_tcp = false;
bool _find_transport = true; bool _find_transport = true;
Ticker _ticker; Ticker _ticker;
struct sockaddr_storage _peer_addr; struct sockaddr_storage _peer_addr;
std::weak_ptr<toolkit::TcpServer> _server;
std::shared_ptr<WebRtcTransportImp> _transport; std::shared_ptr<WebRtcTransportImp> _transport;
}; };

View File

@ -418,9 +418,21 @@ void WebRtcTransportImp::onSendSockData(Buffer::Ptr buf, bool flush, RTC::Transp
WarnL << "send data failed:" << buf->size(); WarnL << "send data failed:" << buf->size();
return; return;
} }
// 一次性发送一帧的rtp数据提高网络io性能 // 一次性发送一帧的rtp数据提高网络io性能
_selected_session->setSendFlushFlag(flush); if (_selected_session->getSock()->sockType() == SockNum::Sock_TCP) {
// 增加tcp两字节头
auto len = buf->size();
char tcp_len[2] = { 0 };
tcp_len[0] = (len >> 8) & 0xff;
tcp_len[1] = len & 0xff;
_selected_session->SockSender::send(tcp_len, 2);
}
_selected_session->send(std::move(buf)); _selected_session->send(std::move(buf));
if (flush) {
_selected_session->flushAll();
}
} }
/////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////
@ -590,6 +602,9 @@ makeIceCandidate(std::string ip, uint16_t port, uint32_t priority = 100, std::st
candidate->address = ip; candidate->address = ip;
candidate->port = port; candidate->port = port;
candidate->type = "host"; candidate->type = "host";
if (proto == "tcp") {
candidate->type += " tcptype passive";
}
return candidate; return candidate;
} }
@ -609,11 +624,13 @@ void WebRtcTransportImp::onRtcConfigure(RtcConfigure &configure) const {
if (extern_ips.empty()) { if (extern_ips.empty()) {
std::string localIp = SockUtil::get_local_ip(); std::string localIp = SockUtil::get_local_ip();
configure.addCandidate(*makeIceCandidate(localIp, local_port, 120, "udp")); configure.addCandidate(*makeIceCandidate(localIp, local_port, 120, "udp"));
configure.addCandidate(*makeIceCandidate(localIp, local_port, 110, "tcp"));
} else { } else {
const uint32_t delta = 10; const uint32_t delta = 10;
uint32_t priority = 100 + delta * extern_ips.size(); uint32_t priority = 100 + delta * extern_ips.size();
for (auto ip : extern_ips) { for (auto ip : extern_ips) {
configure.addCandidate(*makeIceCandidate(ip, local_port, priority, "udp")); configure.addCandidate(*makeIceCandidate(ip, local_port, priority + 5, "udp"));
configure.addCandidate(*makeIceCandidate(ip, local_port, priority, "tcp"));
priority -= delta; priority -= delta;
} }
} }
@ -1042,6 +1059,7 @@ void WebRtcTransportImp::setSession(Session::Ptr session) {
<< session->get_peer_port() << ", id:" << getIdentifier(); << session->get_peer_port() << ", id:" << getIdentifier();
} }
_selected_session = std::move(session); _selected_session = std::move(session);
_selected_session->setSendFlushFlag(false);
unrefSelf(); unrefSelf();
} }