diff --git a/3rdpart/ZLToolKit b/3rdpart/ZLToolKit index 90ba564e..894be819 160000 --- a/3rdpart/ZLToolKit +++ b/3rdpart/ZLToolKit @@ -1 +1 @@ -Subproject commit 90ba564e9e39a120ed7b99260f2835a19811af30 +Subproject commit 894be81929f227583081755288ab0927c077e411 diff --git a/api/source/mk_common.cpp b/api/source/mk_common.cpp index 4c5b5b9e..a69fdc8c 100644 --- a/api/source/mk_common.cpp +++ b/api/source/mk_common.cpp @@ -37,7 +37,8 @@ static std::shared_ptr rtpServer; #ifdef ENABLE_WEBRTC #include "../webrtc/WebRtcSession.h" -static std::shared_ptr rtcServer; +static std::shared_ptr rtcServer_udp; +static std::shared_ptr rtcServer_tcp; #endif #if defined(ENABLE_SRT) @@ -72,7 +73,8 @@ API_EXPORT void API_CALL mk_stop_all_server(){ rtpServer = nullptr; #endif #ifdef ENABLE_WEBRTC - rtcServer = nullptr; + rtcServer_udp = nullptr; + rtcServer_tcp = nullptr; #endif #ifdef ENABLE_SRT srtServer = nullptr; @@ -178,7 +180,7 @@ API_EXPORT uint16_t API_CALL mk_http_server_start(uint16_t port, int ssl) { } return http_server[ssl]->getPort(); } catch (std::exception &ex) { - http_server[ssl].reset(); + http_server[ssl] = nullptr;; WarnL << ex.what(); return 0; } @@ -195,7 +197,7 @@ API_EXPORT uint16_t API_CALL mk_rtsp_server_start(uint16_t port, int ssl) { } return rtsp_server[ssl]->getPort(); } catch (std::exception &ex) { - rtsp_server[ssl].reset(); + rtsp_server[ssl] = nullptr;; WarnL << ex.what(); return 0; } @@ -212,7 +214,7 @@ API_EXPORT uint16_t API_CALL mk_rtmp_server_start(uint16_t port, int ssl) { } return rtmp_server[ssl]->getPort(); } catch (std::exception &ex) { - rtmp_server[ssl].reset(); + rtmp_server[ssl] = nullptr;; WarnL << ex.what(); return 0; } @@ -226,7 +228,7 @@ API_EXPORT uint16_t API_CALL mk_rtp_server_start(uint16_t port){ rtpServer->start(port); return rtpServer->getPort(); } catch (std::exception &ex) { - rtpServer.reset(); + rtpServer = nullptr;; WarnL << ex.what(); return 0; } @@ -239,9 +241,9 @@ API_EXPORT uint16_t API_CALL mk_rtp_server_start(uint16_t port){ API_EXPORT uint16_t API_CALL mk_rtc_server_start(uint16_t port) { #ifdef ENABLE_WEBRTC try { - //创建rtc服务器 - rtcServer = std::make_shared(); - rtcServer->setOnCreateSocket([](const EventPoller::Ptr &poller, const Buffer::Ptr &buf, struct sockaddr *, int) { + //创建rtc udp服务器 + rtcServer_udp = std::make_shared(); + rtcServer_udp->setOnCreateSocket([](const EventPoller::Ptr &poller, const Buffer::Ptr &buf, struct sockaddr *, int) { if (!buf) { return Socket::createSocket(poller, false); } @@ -252,11 +254,15 @@ API_EXPORT uint16_t API_CALL mk_rtc_server_start(uint16_t port) { } return Socket::createSocket(new_poller, false); }); - rtcServer->start(port); - return rtcServer->getPort(); + rtcServer_udp->start(port); + //创建rtc tcp服务器 + rtcServer_tcp = std::make_shared(); + rtcServer_tcp->start(rtcServer_udp->getPort()); + return rtcServer_udp->getPort(); } catch (std::exception &ex) { - rtcServer.reset(); + rtcServer_udp = nullptr; + rtcServer_tcp = nullptr; WarnL << ex.what(); return 0; } @@ -323,7 +329,7 @@ API_EXPORT uint16_t API_CALL mk_srt_server_start(uint16_t port) { return srtServer->getPort(); } catch (std::exception &ex) { - srtServer.reset(); + srtServer = nullptr;; WarnL << ex.what(); return 0; } @@ -339,7 +345,7 @@ API_EXPORT uint16_t API_CALL mk_shell_server_start(uint16_t port){ shell_server->start(port); return shell_server->getPort(); } catch (std::exception &ex) { - shell_server.reset(); + shell_server = nullptr;; WarnL << ex.what(); return 0; } diff --git a/server/main.cpp b/server/main.cpp index a9c829c2..f41d1d10 100644 --- a/server/main.cpp +++ b/server/main.cpp @@ -277,9 +277,10 @@ int start_main(int argc,char *argv[]) { #endif//defined(ENABLE_RTPPROXY) #if defined(ENABLE_WEBRTC) + auto rtcSrv_tcp = std::make_shared(); //webrtc udp服务器 - auto rtcSrv = std::make_shared(); - rtcSrv->setOnCreateSocket([](const EventPoller::Ptr &poller, const Buffer::Ptr &buf, struct sockaddr *, int) { + auto rtcSrv_udp = std::make_shared(); + rtcSrv_udp->setOnCreateSocket([](const EventPoller::Ptr &poller, const Buffer::Ptr &buf, struct sockaddr *, int) { if (!buf) { return Socket::createSocket(poller, false); } @@ -337,7 +338,7 @@ int start_main(int argc,char *argv[]) { #if defined(ENABLE_WEBRTC) //webrtc udp服务器 - if (rtcPort) { rtcSrv->start(rtcPort); } + if (rtcPort) { rtcSrv_udp->start(rtcPort); rtcSrv_tcp->start(rtcPort); } #endif//defined(ENABLE_WEBRTC) #if defined(ENABLE_SRT) diff --git a/webrtc/WebRtcSession.cpp b/webrtc/WebRtcSession.cpp index f8a37300..f72f620c 100644 --- a/webrtc/WebRtcSession.cpp +++ b/webrtc/WebRtcSession.cpp @@ -10,14 +10,13 @@ #include "WebRtcSession.h" #include "Util/util.h" +#include "Network/TcpServer.h" using namespace std; namespace mediakit { -static string getUserName(const Buffer::Ptr &buffer) { - auto buf = buffer->data(); - auto len = buffer->size(); +static string getUserName(const char *buf, size_t len) { if (!RTC::StunPacket::IsStun((const uint8_t *) buf, len)) { return ""; } @@ -35,7 +34,7 @@ static string getUserName(const Buffer::Ptr &buffer) { } EventPoller::Ptr WebRtcSession::queryPoller(const Buffer::Ptr &buffer) { - auto user_name = getUserName(buffer); + auto user_name = getUserName(buffer->data(), buffer->size()); if (user_name.empty()) { return nullptr; } @@ -45,33 +44,63 @@ EventPoller::Ptr WebRtcSession::queryPoller(const Buffer::Ptr &buffer) { //////////////////////////////////////////////////////////////////////////////// -WebRtcSession::WebRtcSession(const Socket::Ptr &sock) : UdpSession(sock) { +WebRtcSession::WebRtcSession(const Socket::Ptr &sock) : Session(sock) { socklen_t addr_len = sizeof(_peer_addr); getpeername(sock->rawFD(), (struct sockaddr *)&_peer_addr, &addr_len); + _over_tcp = sock->sockType() == SockNum::Sock_TCP; } WebRtcSession::~WebRtcSession() { InfoP(this); } -void WebRtcSession::onRecv(const Buffer::Ptr &buffer) { +void WebRtcSession::attachServer(const Server &server) { + _server = std::dynamic_pointer_cast(const_cast(server).shared_from_this()); +} + +void WebRtcSession::onRecv_l(const char *data, size_t len) { if (_find_transport) { - //只允许寻找一次transport + // 只允许寻找一次transport _find_transport = false; - auto user_name = getUserName(buffer); + auto user_name = getUserName(data, len); auto transport = WebRtcTransportManager::Instance().getItem(user_name); - CHECK(transport && transport->getPoller()->isCurrentThread()); + CHECK(transport); + + //WebRtcTransport在其他poller线程上,需要切换poller线程并重新创建WebRtcSession对象 + if (!transport->getPoller()->isCurrentThread()) { + auto sock = Socket::createSocket(transport->getPoller()); + sock->cloneFromPeerSocket(*(getSock())); + auto server = _server; + std::string str(data, len); + sock->getPoller()->async([sock, server, str](){ + auto strong_server = server.lock(); + if (strong_server) { + auto session = static_pointer_cast(strong_server->createSession(sock)); + session->onRecv_l(str.data(), str.size()); + } + }); + throw std::runtime_error("webrtc over tcp change poller: " + getPoller()->getThreadName() + " -> " + sock->getPoller()->getThreadName()); + } + transport->setSession(shared_from_this()); _transport = std::move(transport); InfoP(this); } _ticker.resetTime(); CHECK(_transport); - _transport->inputSockData(buffer->data(), buffer->size(), (struct sockaddr *)&_peer_addr); + _transport->inputSockData((char *)data, len, (struct sockaddr *)&_peer_addr); +} + +void WebRtcSession::onRecv(const Buffer::Ptr &buffer) { + if (_over_tcp) { + input(buffer->data(), buffer->size()); + } else { + onRecv_l(buffer->data(), buffer->size()); + } } void WebRtcSession::onError(const SockException &err) { - //udp链接超时,但是rtc链接不一定超时,因为可能存在udp链接迁移的情况 + //udp链接超时,但是rtc链接不一定超时,因为可能存在链接迁移的情况 //在udp链接迁移时,新的WebRtcSession对象将接管WebRtcTransport对象的生命周期 //本WebRtcSession对象将在超时后自动销毁 WarnP(this) << err.what(); @@ -97,6 +126,25 @@ void WebRtcSession::onManager() { } } +ssize_t WebRtcSession::onRecvHeader(const char *data, size_t len) { + onRecv_l(data + 2, len - 2); + return 0; +} + +const char *WebRtcSession::onSearchPacketTail(const char *data, size_t len) { + if (len < 2) { + // 数据不够 + return nullptr; + } + uint16_t length = (((uint8_t *)data)[0] << 8) | ((uint8_t *)data)[1]; + if (len < (size_t)(length + 2)) { + // 数据不够 + return nullptr; + } + // 返回rtp包末尾 + return data + 2 + length; +} + }// namespace mediakit diff --git a/webrtc/WebRtcSession.h b/webrtc/WebRtcSession.h index 655ca7fa..9d48e814 100644 --- a/webrtc/WebRtcSession.h +++ b/webrtc/WebRtcSession.h @@ -15,25 +15,38 @@ #include "Network/Session.h" #include "IceServer.hpp" #include "WebRtcTransport.h" +#include "Http/HttpRequestSplitter.h" + +namespace toolkit { + class TcpServer; +} namespace mediakit { -class WebRtcSession : public UdpSession { +class WebRtcSession : public Session, public HttpRequestSplitter { public: WebRtcSession(const Socket::Ptr &sock); ~WebRtcSession() override; + void attachServer(const Server &server) override; void onRecv(const Buffer::Ptr &) override; void onError(const SockException &err) override; void onManager() override; - //std::string getIdentifier() const override; - static EventPoller::Ptr queryPoller(const Buffer::Ptr &buffer); private: + //// HttpRequestSplitter override //// + ssize_t onRecvHeader(const char *data, size_t len) override; + const char *onSearchPacketTail(const char *data, size_t len) override; + + void onRecv_l(const char *data, size_t len); + +private: + bool _over_tcp = false; bool _find_transport = true; Ticker _ticker; struct sockaddr_storage _peer_addr; + std::weak_ptr _server; std::shared_ptr _transport; }; diff --git a/webrtc/WebRtcTransport.cpp b/webrtc/WebRtcTransport.cpp index bf9c309a..d345fddc 100644 --- a/webrtc/WebRtcTransport.cpp +++ b/webrtc/WebRtcTransport.cpp @@ -418,9 +418,21 @@ void WebRtcTransportImp::onSendSockData(Buffer::Ptr buf, bool flush, RTC::Transp WarnL << "send data failed:" << buf->size(); return; } + // 一次性发送一帧的rtp数据,提高网络io性能 - _selected_session->setSendFlushFlag(flush); + if (_selected_session->getSock()->sockType() == SockNum::Sock_TCP) { + // 增加tcp两字节头 + auto len = buf->size(); + char tcp_len[2] = { 0 }; + tcp_len[0] = (len >> 8) & 0xff; + tcp_len[1] = len & 0xff; + _selected_session->SockSender::send(tcp_len, 2); + } _selected_session->send(std::move(buf)); + + if (flush) { + _selected_session->flushAll(); + } } /////////////////////////////////////////////////////////////////// @@ -590,6 +602,9 @@ makeIceCandidate(std::string ip, uint16_t port, uint32_t priority = 100, std::st candidate->address = ip; candidate->port = port; candidate->type = "host"; + if (proto == "tcp") { + candidate->type += " tcptype passive"; + } return candidate; } @@ -609,11 +624,13 @@ void WebRtcTransportImp::onRtcConfigure(RtcConfigure &configure) const { if (extern_ips.empty()) { std::string localIp = SockUtil::get_local_ip(); configure.addCandidate(*makeIceCandidate(localIp, local_port, 120, "udp")); + configure.addCandidate(*makeIceCandidate(localIp, local_port, 110, "tcp")); } else { const uint32_t delta = 10; uint32_t priority = 100 + delta * extern_ips.size(); for (auto ip : extern_ips) { - configure.addCandidate(*makeIceCandidate(ip, local_port, priority, "udp")); + configure.addCandidate(*makeIceCandidate(ip, local_port, priority + 5, "udp")); + configure.addCandidate(*makeIceCandidate(ip, local_port, priority, "tcp")); priority -= delta; } } @@ -1042,6 +1059,7 @@ void WebRtcTransportImp::setSession(Session::Ptr session) { << session->get_peer_port() << ", id:" << getIdentifier(); } _selected_session = std::move(session); + _selected_session->setSendFlushFlag(false); unrefSelf(); } diff --git a/webrtc/WebRtcTransport.h b/webrtc/WebRtcTransport.h index 70f63b30..d3a492cf 100644 --- a/webrtc/WebRtcTransport.h +++ b/webrtc/WebRtcTransport.h @@ -179,7 +179,7 @@ private: std::shared_ptr _srtp_session_send; std::shared_ptr _srtp_session_recv; Ticker _ticker; - //循环池 + // 循环池 ResourcePool _packet_pool; #ifdef ENABLE_SCTP