diff --git a/webrtc/WebRtcPlayer.cpp b/webrtc/WebRtcPlayer.cpp index c2882319..0ef9cbfe 100644 --- a/webrtc/WebRtcPlayer.cpp +++ b/webrtc/WebRtcPlayer.cpp @@ -16,8 +16,9 @@ namespace mediakit { WebRtcPlayer::Ptr WebRtcPlayer::create(const EventPoller::Ptr &poller, const RtspMediaSource::Ptr &src, - const MediaInfo &info) { - WebRtcPlayer::Ptr ret(new WebRtcPlayer(poller, src, info), [](WebRtcPlayer *ptr) { + const MediaInfo &info, + bool perferred_tcp) { + WebRtcPlayer::Ptr ret(new WebRtcPlayer(poller, src, info, perferred_tcp), [](WebRtcPlayer *ptr) { ptr->onDestory(); delete ptr; }); @@ -27,7 +28,8 @@ WebRtcPlayer::Ptr WebRtcPlayer::create(const EventPoller::Ptr &poller, WebRtcPlayer::WebRtcPlayer(const EventPoller::Ptr &poller, const RtspMediaSource::Ptr &src, - const MediaInfo &info) : WebRtcTransportImp(poller) { + const MediaInfo &info, + bool perferred_tcp) : WebRtcTransportImp(poller,perferred_tcp) { _media_info = info; _play_src = src; CHECK(_play_src); diff --git a/webrtc/WebRtcPlayer.h b/webrtc/WebRtcPlayer.h index c3a31b14..538c4151 100644 --- a/webrtc/WebRtcPlayer.h +++ b/webrtc/WebRtcPlayer.h @@ -19,7 +19,7 @@ class WebRtcPlayer : public WebRtcTransportImp { public: using Ptr = std::shared_ptr; ~WebRtcPlayer() override = default; - static Ptr create(const EventPoller::Ptr &poller, const RtspMediaSource::Ptr &src, const MediaInfo &info); + static Ptr create(const EventPoller::Ptr &poller, const RtspMediaSource::Ptr &src, const MediaInfo &info, bool perferred_tcp = false); protected: ///////WebRtcTransportImp override/////// @@ -29,7 +29,7 @@ protected: void onRecvRtp(MediaTrack &track, const std::string &rid, RtpPacket::Ptr rtp) override {}; private: - WebRtcPlayer(const EventPoller::Ptr &poller, const RtspMediaSource::Ptr &src, const MediaInfo &info); + WebRtcPlayer(const EventPoller::Ptr &poller, const RtspMediaSource::Ptr &src, const MediaInfo &info, bool perferred_tcp); private: //媒体相关元数据 diff --git a/webrtc/WebRtcPusher.cpp b/webrtc/WebRtcPusher.cpp index ca9115c8..6efe8698 100644 --- a/webrtc/WebRtcPusher.cpp +++ b/webrtc/WebRtcPusher.cpp @@ -18,8 +18,9 @@ WebRtcPusher::Ptr WebRtcPusher::create(const EventPoller::Ptr &poller, const RtspMediaSourceImp::Ptr &src, const std::shared_ptr &ownership, const MediaInfo &info, - const ProtocolOption &option) { - WebRtcPusher::Ptr ret(new WebRtcPusher(poller, src, ownership, info, option), [](WebRtcPusher *ptr) { + const ProtocolOption &option, + bool perferred_tcp) { + WebRtcPusher::Ptr ret(new WebRtcPusher(poller, src, ownership, info, option,perferred_tcp), [](WebRtcPusher *ptr) { ptr->onDestory(); delete ptr; }); @@ -31,7 +32,8 @@ WebRtcPusher::WebRtcPusher(const EventPoller::Ptr &poller, const RtspMediaSourceImp::Ptr &src, const std::shared_ptr &ownership, const MediaInfo &info, - const ProtocolOption &option) : WebRtcTransportImp(poller) { + const ProtocolOption &option, + bool perferred_tcp) : WebRtcTransportImp(poller,perferred_tcp) { _media_info = info; _push_src = src; _push_src_ownership = ownership; diff --git a/webrtc/WebRtcPusher.h b/webrtc/WebRtcPusher.h index fee55e11..7f64b1f5 100644 --- a/webrtc/WebRtcPusher.h +++ b/webrtc/WebRtcPusher.h @@ -20,7 +20,7 @@ public: using Ptr = std::shared_ptr; ~WebRtcPusher() override = default; static Ptr create(const EventPoller::Ptr &poller, const RtspMediaSourceImp::Ptr &src, - const std::shared_ptr &ownership, const MediaInfo &info, const ProtocolOption &option); + const std::shared_ptr &ownership, const MediaInfo &info, const ProtocolOption &option, bool perferred_tcp = false); protected: ///////WebRtcTransportImp override/////// @@ -51,7 +51,7 @@ protected: private: WebRtcPusher(const EventPoller::Ptr &poller, const RtspMediaSourceImp::Ptr &src, - const std::shared_ptr &ownership, const MediaInfo &info, const ProtocolOption &option); + const std::shared_ptr &ownership, const MediaInfo &info, const ProtocolOption &option, bool perferred_tcp); private: bool _simulcast = false; diff --git a/webrtc/WebRtcSession.cpp b/webrtc/WebRtcSession.cpp index f72f620c..ab95a8ad 100644 --- a/webrtc/WebRtcSession.cpp +++ b/webrtc/WebRtcSession.cpp @@ -69,6 +69,7 @@ void WebRtcSession::onRecv_l(const char *data, size_t len) { //WebRtcTransport在其他poller线程上,需要切换poller线程并重新创建WebRtcSession对象 if (!transport->getPoller()->isCurrentThread()) { auto sock = Socket::createSocket(transport->getPoller()); + //1、克隆socket(fd不变),切换poller线程到WebRtcTransport所在线程 sock->cloneFromPeerSocket(*(getSock())); auto server = _server; std::string str(data, len); @@ -76,9 +77,11 @@ void WebRtcSession::onRecv_l(const char *data, size_t len) { auto strong_server = server.lock(); if (strong_server) { auto session = static_pointer_cast(strong_server->createSession(sock)); + //2、创建新的WebRtcSession对象(绑定到WebRtcTransport所在线程),重新处理一遍ice binding request命令 session->onRecv_l(str.data(), str.size()); } }); + //3、销毁原先的socket和WebRtcSession(原先的对象跟WebRtcTransport不在同一条线程) throw std::runtime_error("webrtc over tcp change poller: " + getPoller()->getThreadName() + " -> " + sock->getPoller()->getThreadName()); } diff --git a/webrtc/WebRtcTransport.cpp b/webrtc/WebRtcTransport.cpp index 1d4642af..0ca504ed 100644 --- a/webrtc/WebRtcTransport.cpp +++ b/webrtc/WebRtcTransport.cpp @@ -402,8 +402,8 @@ void WebRtcTransportImp::OnDtlsTransportApplicationDataReceived(const RTC::DtlsT #endif } -WebRtcTransportImp::WebRtcTransportImp(const EventPoller::Ptr &poller) - : WebRtcTransport(poller) { +WebRtcTransportImp::WebRtcTransportImp(const EventPoller::Ptr &poller,bool perferred_tcp) + : WebRtcTransport(poller), _perferred_tcp(perferred_tcp) { InfoL << getIdentifier(); } @@ -629,13 +629,13 @@ void WebRtcTransportImp::onRtcConfigure(RtcConfigure &configure) const { if (extern_ips.empty()) { std::string local_ip = SockUtil::get_local_ip(); if (local_udp_port) { configure.addCandidate(*makeIceCandidate(local_ip, local_udp_port, 120, "udp")); } - if (local_tcp_port) { configure.addCandidate(*makeIceCandidate(local_ip, local_tcp_port, 110, "tcp")); } + if (local_tcp_port) { configure.addCandidate(*makeIceCandidate(local_ip, local_tcp_port, _perferred_tcp ? 125 : 115, "tcp")); } } else { const uint32_t delta = 10; uint32_t priority = 100 + delta * extern_ips.size(); for (auto ip : extern_ips) { - if (local_udp_port) { configure.addCandidate(*makeIceCandidate(ip, local_udp_port, priority + 5, "udp")); } - if (local_tcp_port) { configure.addCandidate(*makeIceCandidate(ip, local_tcp_port, priority, "tcp")); } + if (local_udp_port) { configure.addCandidate(*makeIceCandidate(ip, local_udp_port, priority, "udp")); } + if (local_tcp_port) { configure.addCandidate(*makeIceCandidate(ip, local_tcp_port, priority - (_perferred_tcp ? -5 : 5), "tcp")); } priority -= delta; } } @@ -1153,7 +1153,9 @@ void echo_plugin(Session &sender, const WebRtcArgs &args, const WebRtcPluginMana void push_plugin(Session &sender, const WebRtcArgs &args, const WebRtcPluginManager::onCreateRtc &cb) { MediaInfo info(args["url"]); - Broadcast::PublishAuthInvoker invoker = [cb, info](const string &err, const ProtocolOption &option) mutable { + bool perferred_tcp = args["perferred_tcp"]; + + Broadcast::PublishAuthInvoker invoker = [cb, info, perferred_tcp](const string &err, const ProtocolOption &option) mutable { if (!err.empty()) { cb(WebRtcException(SockException(Err_other, err))); return; @@ -1192,7 +1194,7 @@ void push_plugin(Session &sender, const WebRtcArgs &args, const WebRtcPluginMana push_src_ownership = push_src->getOwnership(); push_src->setProtocolOption(option); } - auto rtc = WebRtcPusher::create(EventPollerPool::Instance().getPoller(), push_src, push_src_ownership, info, option); + auto rtc = WebRtcPusher::create(EventPollerPool::Instance().getPoller(), push_src, push_src_ownership, info, option, perferred_tcp); push_src->setListener(rtc); cb(*rtc); }; @@ -1207,8 +1209,10 @@ void push_plugin(Session &sender, const WebRtcArgs &args, const WebRtcPluginMana void play_plugin(Session &sender, const WebRtcArgs &args, const WebRtcPluginManager::onCreateRtc &cb) { MediaInfo info(args["url"]); + bool perferred_tcp = args["perferred_tcp"]; + auto session_ptr = sender.shared_from_this(); - Broadcast::AuthInvoker invoker = [cb, info, session_ptr](const string &err) mutable { + Broadcast::AuthInvoker invoker = [cb, info, session_ptr, perferred_tcp](const string &err) mutable { if (!err.empty()) { cb(WebRtcException(SockException(Err_other, err))); return; @@ -1224,7 +1228,7 @@ void play_plugin(Session &sender, const WebRtcArgs &args, const WebRtcPluginMana } // 还原成rtc,目的是为了hook时识别哪种播放协议 info._schema = RTC_SCHEMA; - auto rtc = WebRtcPlayer::create(EventPollerPool::Instance().getPoller(), src, info); + auto rtc = WebRtcPlayer::create(EventPollerPool::Instance().getPoller(), src, info, perferred_tcp); cb(*rtc); }); }; diff --git a/webrtc/WebRtcTransport.h b/webrtc/WebRtcTransport.h index 14615146..c34ad017 100644 --- a/webrtc/WebRtcTransport.h +++ b/webrtc/WebRtcTransport.h @@ -251,7 +251,7 @@ public: void createRtpChannel(const std::string &rid, uint32_t ssrc, MediaTrack &track); protected: - WebRtcTransportImp(const EventPoller::Ptr &poller); + WebRtcTransportImp(const EventPoller::Ptr &poller,bool perferred_tcp = false); void OnDtlsTransportApplicationDataReceived(const RTC::DtlsTransport *dtlsTransport, const uint8_t *data, size_t len) override; void onStartWebRTC() override; void onSendSockData(Buffer::Ptr buf, bool flush = true, RTC::TransportTuple *tuple = nullptr) override; @@ -281,6 +281,7 @@ private: void onCheckAnswer(RtcSession &sdp); private: + bool _perferred_tcp; uint16_t _rtx_seq[2] = {0, 0}; //用掉的总流量 uint64_t _bytes_usage = 0;