From 4783ac080820f878d517b34f681b0ebc15335571 Mon Sep 17 00:00:00 2001 From: mtdxc Date: Fri, 3 Mar 2023 11:18:21 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dwebrtc=E5=A4=9A=E5=80=99?= =?UTF-8?q?=E9=80=89=E5=9C=B0=E5=9D=80=E6=97=A0=E6=B3=95=E6=9D=A5=E5=9B=9E?= =?UTF-8?q?=E5=88=87=E6=8D=A2=E7=9A=84bug=20(#2266)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 最后一个连通的候选地址会被赋值并锁定为_selected_session,如果之前的候选地址再发送数据,将通过_selected_session回复,导致无法切换为旧的候选地址。 --- webrtc/IceServer.cpp | 28 +++++++------ webrtc/IceServer.hpp | 20 +++++----- webrtc/WebRtcPlayer.cpp | 10 ++--- webrtc/WebRtcPusher.cpp | 10 ++--- webrtc/WebRtcSession.cpp | 12 +++--- webrtc/WebRtcSession.h | 1 - webrtc/WebRtcTransport.cpp | 81 ++++++++++++++++++++------------------ webrtc/WebRtcTransport.h | 13 ++---- 8 files changed, 84 insertions(+), 91 deletions(-) diff --git a/webrtc/IceServer.cpp b/webrtc/IceServer.cpp index 48709ab2..7dfac0b7 100644 --- a/webrtc/IceServer.cpp +++ b/webrtc/IceServer.cpp @@ -198,8 +198,12 @@ namespace RTC // Create a success response. RTC::StunPacket* response = packet->CreateSuccessResponse(); + sockaddr_storage peerAddr; + socklen_t addr_len = sizeof(peerAddr); + getpeername(tuple->getSock()->rawFD(), (struct sockaddr *)&peerAddr, &addr_len); + // Add XOR-MAPPED-ADDRESS. - response->SetXorMappedAddress(tuple); + response->SetXorMappedAddress((struct sockaddr *)&peerAddr); // Authenticate the response. if (this->oldPassword.empty()) @@ -260,9 +264,9 @@ namespace RTC for (; it != this->tuples.end(); ++it) { - RTC::TransportTuple* storedTuple = std::addressof(*it); + RTC::TransportTuple* storedTuple = *it; - if (memcmp(storedTuple, tuple, sizeof (RTC::TransportTuple)) == 0) + if (storedTuple == tuple) { removedTuple = storedTuple; @@ -285,9 +289,9 @@ namespace RTC this->selectedTuple = nullptr; // Mark the first tuple as selected tuple (if any). - if (this->tuples.begin() != this->tuples.end()) + if (!this->tuples.empty()) { - SetSelectedTuple(std::addressof(*this->tuples.begin())); + SetSelectedTuple(this->tuples.front()); } // Or just emit 'disconnected'. else @@ -477,12 +481,10 @@ namespace RTC MS_TRACE(); // Add the new tuple at the beginning of the list. - this->tuples.push_front(*tuple); - - auto* storedTuple = std::addressof(*this->tuples.begin()); + this->tuples.push_front(tuple); // Return the address of the inserted tuple. - return storedTuple; + return tuple; } inline RTC::TransportTuple* IceServer::HasTuple(const RTC::TransportTuple* tuple) const @@ -495,15 +497,14 @@ namespace RTC return nullptr; // Check the current selected tuple. - if (memcmp(selectedTuple, tuple, sizeof (RTC::TransportTuple)) == 0) + if (selectedTuple == tuple) return this->selectedTuple; // Otherwise check other stored tuples. for (const auto& it : this->tuples) { - auto* storedTuple = const_cast(std::addressof(it)); - - if (memcmp(storedTuple, tuple, sizeof (RTC::TransportTuple)) == 0) + auto& storedTuple = it; + if (storedTuple == tuple) return storedTuple; } @@ -519,6 +520,7 @@ namespace RTC return; this->selectedTuple = storedTuple; + this->lastSelectedTuple = storedTuple->shared_from_this(); // Notify the listener. this->listener->OnIceServerSelectedTuple(this, this->selectedTuple); diff --git a/webrtc/IceServer.hpp b/webrtc/IceServer.hpp index 8b9742ad..587fb1e9 100644 --- a/webrtc/IceServer.hpp +++ b/webrtc/IceServer.hpp @@ -20,6 +20,7 @@ OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. #define MS_RTC_ICE_SERVER_HPP #include "StunPacket.hpp" +#include "Network/Session.h" #include "logger.h" #include "Utils.hpp" #include @@ -27,11 +28,9 @@ OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. #include #include -using _TransportTuple = struct sockaddr; - namespace RTC { - using TransportTuple = _TransportTuple; + using TransportTuple = toolkit::Session; class IceServer { public: @@ -80,10 +79,10 @@ namespace RTC { return this->state; } - RTC::TransportTuple* GetSelectedTuple() const + RTC::TransportTuple* GetSelectedTuple(bool try_last_tuple = false) const { - return this->selectedTuple; - } + return try_last_tuple ? this->lastSelectedTuple.lock().get() : this->selectedTuple; + } void SetUsernameFragment(const std::string& usernameFragment) { this->oldUsernameFragment = this->usernameFragment; @@ -100,7 +99,9 @@ namespace RTC // and the given tuple must be an already valid tuple. void ForceSelectedTuple(const RTC::TransportTuple* tuple); - private: + const std::list& GetTuples() const { return tuples; } + + private: void HandleTuple(RTC::TransportTuple* tuple, bool hasUseCandidate); /** * Store the given tuple and return its stored address. @@ -125,8 +126,9 @@ namespace RTC std::string oldUsernameFragment; std::string oldPassword; IceState state{ IceState::NEW }; - std::list tuples; - RTC::TransportTuple* selectedTuple{ nullptr }; + std::list tuples; + RTC::TransportTuple *selectedTuple; + std::weak_ptr lastSelectedTuple; //最大不超过mtu static constexpr size_t StunSerializeBufferSize{ 1600 }; uint8_t StunSerializeBuffer[StunSerializeBufferSize]; diff --git a/webrtc/WebRtcPlayer.cpp b/webrtc/WebRtcPlayer.cpp index 3b533594..bbdb6274 100644 --- a/webrtc/WebRtcPlayer.cpp +++ b/webrtc/WebRtcPlayer.cpp @@ -70,21 +70,17 @@ void WebRtcPlayer::onStartWebRTC() { } } void WebRtcPlayer::onDestory() { - WebRtcTransportImp::onDestory(); - auto duration = getDuration(); auto bytes_usage = getBytesUsage(); //流量统计事件广播 GET_CONFIG(uint32_t, iFlowThreshold, General::kFlowThreshold); if (_reader && getSession()) { - WarnL << "RTC播放器(" - << _media_info.shortUrl() - << ")结束播放,耗时(s):" << duration; + WarnL << "RTC播放器(" << _media_info.shortUrl() << ")结束播放,耗时(s):" << duration; if (bytes_usage >= iFlowThreshold * 1024) { - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, bytes_usage, duration, - true, static_cast(*getSession())); + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, bytes_usage, duration, true, static_cast(*getSession())); } } + WebRtcTransportImp::onDestory(); } void WebRtcPlayer::onRtcConfigure(RtcConfigure &configure) const { diff --git a/webrtc/WebRtcPusher.cpp b/webrtc/WebRtcPusher.cpp index 2fdd5e9a..8fe6549c 100644 --- a/webrtc/WebRtcPusher.cpp +++ b/webrtc/WebRtcPusher.cpp @@ -118,20 +118,15 @@ void WebRtcPusher::onStartWebRTC() { } void WebRtcPusher::onDestory() { - WebRtcTransportImp::onDestory(); - auto duration = getDuration(); auto bytes_usage = getBytesUsage(); //流量统计事件广播 GET_CONFIG(uint32_t, iFlowThreshold, General::kFlowThreshold); if (getSession()) { - WarnL << "RTC推流器(" - << _media_info.shortUrl() - << ")结束推流,耗时(s):" << duration; + WarnL << "RTC推流器(" << _media_info.shortUrl() << ")结束推流,耗时(s):" << duration; if (bytes_usage >= iFlowThreshold * 1024) { - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, bytes_usage, duration, - false, static_cast(*getSession())); + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, bytes_usage, duration, false, static_cast(*getSession())); } } @@ -142,6 +137,7 @@ void WebRtcPusher::onDestory() { auto push_src = std::move(_push_src); getPoller()->doDelayTask(_continue_push_ms, [push_src]() { return 0; }); } + WebRtcTransportImp::onDestory(); } void WebRtcPusher::onRtcConfigure(RtcConfigure &configure) const { diff --git a/webrtc/WebRtcSession.cpp b/webrtc/WebRtcSession.cpp index c797ddb0..ade1ce20 100644 --- a/webrtc/WebRtcSession.cpp +++ b/webrtc/WebRtcSession.cpp @@ -48,8 +48,6 @@ EventPoller::Ptr WebRtcSession::queryPoller(const Buffer::Ptr &buffer) { //////////////////////////////////////////////////////////////////////////////// WebRtcSession::WebRtcSession(const Socket::Ptr &sock) : Session(sock) { - socklen_t addr_len = sizeof(_peer_addr); - getpeername(sock->rawFD(), (struct sockaddr *)&_peer_addr, &addr_len); _over_tcp = sock->sockType() == SockNum::Sock_TCP; } @@ -87,14 +85,12 @@ void WebRtcSession::onRecv_l(const char *data, size_t len) { //3、销毁原先的socket和WebRtcSession(原先的对象跟WebRtcTransport不在同一条线程) throw std::runtime_error("webrtc over tcp change poller: " + getPoller()->getThreadName() + " -> " + sock->getPoller()->getThreadName()); } - - transport->setSession(shared_from_this()); _transport = std::move(transport); InfoP(this); } _ticker.resetTime(); CHECK(_transport); - _transport->inputSockData((char *)data, len, (struct sockaddr *)&_peer_addr); + _transport->inputSockData((char *)data, len, this); } void WebRtcSession::onRecv(const Buffer::Ptr &buffer) { @@ -114,9 +110,13 @@ void WebRtcSession::onError(const SockException &err) { if (!_transport) { return; } + auto self = shared_from_this(); auto transport = std::move(_transport); - getPoller()->async([transport] { + getPoller()->async([transport, self]() mutable { //延时减引用,防止使用transport对象时,销毁对象 + transport->removeTuple(self.get()); + //确保transport在Session对象前销毁,防止WebRtcTransport::onDestory()时获取不到Session对象 + transport = nullptr; }, false); } diff --git a/webrtc/WebRtcSession.h b/webrtc/WebRtcSession.h index 6ce881ba..f70d5e74 100644 --- a/webrtc/WebRtcSession.h +++ b/webrtc/WebRtcSession.h @@ -46,7 +46,6 @@ private: bool _over_tcp = false; bool _find_transport = true; Ticker _ticker; - struct sockaddr_storage _peer_addr; std::weak_ptr _server; WebRtcTransportImp::Ptr _transport; }; diff --git a/webrtc/WebRtcTransport.cpp b/webrtc/WebRtcTransport.cpp index b4d1cb73..99f9f55d 100644 --- a/webrtc/WebRtcTransport.cpp +++ b/webrtc/WebRtcTransport.cpp @@ -75,6 +75,17 @@ static void translateIPFromEnv(std::vector &v) { } } +const char* sockTypeStr(Session* session) { + if (session) { + switch (session->getSock()->sockType()) { + case SockNum::Sock_TCP: return "tcp"; + case SockNum::Sock_UDP: return "udp"; + default: break; + } + } + return "unknown"; +} + WebRtcTransport::WebRtcTransport(const EventPoller::Ptr &poller) { _poller = poller; _identifier = "zlm_" + to_string(++s_key); @@ -109,16 +120,18 @@ void WebRtcTransport::OnIceServerSendStunPacket( sendSockData((char *)packet->GetData(), packet->GetSize(), tuple); } -void WebRtcTransport::OnIceServerSelectedTuple(const RTC::IceServer *iceServer, RTC::TransportTuple *tuple) { - InfoL; +void WebRtcTransportImp::OnIceServerSelectedTuple(const RTC::IceServer *iceServer, RTC::TransportTuple *tuple) { + InfoL << getIdentifier() << " select tuple " << sockTypeStr(tuple) << " " << tuple->get_peer_ip() << ":" << tuple->get_peer_port(); + tuple->setSendFlushFlag(false); + unrefSelf(); } void WebRtcTransport::OnIceServerConnected(const RTC::IceServer *iceServer) { - InfoL; + InfoL << getIdentifier(); } void WebRtcTransport::OnIceServerCompleted(const RTC::IceServer *iceServer) { - InfoL; + InfoL << getIdentifier(); if (_answer_sdp->media[0].role == DtlsRole::passive) { _dtls_transport->Run(RTC::DtlsTransport::Role::SERVER); } else { @@ -127,7 +140,7 @@ void WebRtcTransport::OnIceServerCompleted(const RTC::IceServer *iceServer) { } void WebRtcTransport::OnIceServerDisconnected(const RTC::IceServer *iceServer) { - InfoL; + InfoL << getIdentifier(); } ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -135,7 +148,7 @@ void WebRtcTransport::OnIceServerDisconnected(const RTC::IceServer *iceServer) { void WebRtcTransport::OnDtlsTransportConnected( const RTC::DtlsTransport *dtlsTransport, RTC::SrtpSession::CryptoSuite srtpCryptoSuite, uint8_t *srtpLocalKey, size_t srtpLocalKeyLen, uint8_t *srtpRemoteKey, size_t srtpRemoteKeyLen, std::string &remoteCert) { - InfoL; + InfoL << getIdentifier(); _srtp_session_send = std::make_shared( RTC::SrtpSession::Type::OUTBOUND, srtpCryptoSuite, srtpLocalKey, srtpLocalKeyLen); _srtp_session_recv = std::make_shared( @@ -153,16 +166,16 @@ void WebRtcTransport::OnDtlsTransportSendData( } void WebRtcTransport::OnDtlsTransportConnecting(const RTC::DtlsTransport *dtlsTransport) { - InfoL; + InfoL << getIdentifier(); } void WebRtcTransport::OnDtlsTransportFailed(const RTC::DtlsTransport *dtlsTransport) { - InfoL; + InfoL << getIdentifier(); onShutdown(SockException(Err_shutdown, "dtls transport failed")); } void WebRtcTransport::OnDtlsTransportClosed(const RTC::DtlsTransport *dtlsTransport) { - InfoL; + InfoL << getIdentifier(); onShutdown(SockException(Err_shutdown, "dtls close notify received")); } @@ -178,7 +191,7 @@ void WebRtcTransport::OnDtlsTransportApplicationDataReceived( ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// #ifdef ENABLE_SCTP void WebRtcTransport::OnSctpAssociationConnecting(RTC::SctpAssociation *sctpAssociation) { - TraceL; + TraceL << getIdentifier(); } void WebRtcTransport::OnSctpAssociationConnected(RTC::SctpAssociation *sctpAssociation) { @@ -215,8 +228,9 @@ void WebRtcTransport::sendSockData(const char *buf, size_t len, RTC::TransportTu onSendSockData(std::move(pkt), true, tuple ? tuple : _ice_server->GetSelectedTuple()); } -RTC::TransportTuple *WebRtcTransport::getSelectedTuple() const { - return _ice_server->GetSelectedTuple(); +Session::Ptr WebRtcTransport::getSession() const { + auto tuple = _ice_server->GetSelectedTuple(true); + return tuple ? tuple->shared_from_this() : nullptr; } void WebRtcTransport::sendRtcpRemb(uint32_t ssrc, size_t bit_rate) { @@ -293,7 +307,7 @@ static bool isDtls(char *buf) { } static string getPeerAddress(RTC::TransportTuple *tuple) { - return SockUtil::inet_ntoa(tuple); + return tuple->get_peer_ip(); } void WebRtcTransport::inputSockData(char *buf, int len, RTC::TransportTuple *tuple) { @@ -409,24 +423,27 @@ void WebRtcTransportImp::onDestory() { } void WebRtcTransportImp::onSendSockData(Buffer::Ptr buf, bool flush, RTC::TransportTuple *tuple) { - if (!_selected_session) { - WarnL << "send data failed:" << buf->size(); - return; + if (tuple == nullptr) { + tuple = _ice_server->GetSelectedTuple(); + if (!tuple) { + WarnL << "send data failed:" << buf->size(); + return; + } } // 一次性发送一帧的rtp数据,提高网络io性能 - if (_selected_session->getSock()->sockType() == SockNum::Sock_TCP) { + if (tuple->getSock()->sockType() == SockNum::Sock_TCP) { // 增加tcp两字节头 auto len = buf->size(); char tcp_len[2] = { 0 }; tcp_len[0] = (len >> 8) & 0xff; tcp_len[1] = len & 0xff; - _selected_session->SockSender::send(tcp_len, 2); + tuple->SockSender::send(tcp_len, 2); } - _selected_session->send(std::move(buf)); + tuple->send(std::move(buf)); if (flush) { - _selected_session->flushAll(); + tuple->flushAll(); } } @@ -1040,28 +1057,14 @@ void WebRtcTransportImp::onBeforeEncryptRtp(const char *buf, int &len, void *ctx void WebRtcTransportImp::onShutdown(const SockException &ex) { WarnL << ex.what(); unrefSelf(); - for (auto &pr : _history_sessions) { - auto session = pr.second.lock(); - if (session) { - session->shutdown(ex); - } + for (auto &tuple : _ice_server->GetTuples()) { + tuple->shutdown(ex); } } -void WebRtcTransportImp::setSession(Session::Ptr session) { - _history_sessions.emplace(session.get(), session); - if (_selected_session) { - InfoL << "rtc network changed: " << _selected_session->get_peer_ip() << ":" - << _selected_session->get_peer_port() << " -> " << session->get_peer_ip() << ":" - << session->get_peer_port() << ", id:" << getIdentifier(); - } - _selected_session = std::move(session); - _selected_session->setSendFlushFlag(false); - unrefSelf(); -} - -const Session::Ptr &WebRtcTransportImp::getSession() const { - return _selected_session; +void WebRtcTransportImp::removeTuple(RTC::TransportTuple *tuple) { + InfoL << getIdentifier() << " remove tuple " << tuple->get_peer_ip() << ":" << tuple->get_peer_port(); + this->_ice_server->RemoveTuple(tuple); } uint64_t WebRtcTransportImp::getBytesUsage() const { diff --git a/webrtc/WebRtcTransport.h b/webrtc/WebRtcTransport.h index 3978864b..dfae8012 100644 --- a/webrtc/WebRtcTransport.h +++ b/webrtc/WebRtcTransport.h @@ -110,6 +110,7 @@ public: void sendRtcpPacket(const char *buf, int len, bool flush, void *ctx = nullptr); const EventPoller::Ptr& getPoller() const; + Session::Ptr getSession() const; protected: //// dtls相关的回调 //// @@ -130,7 +131,6 @@ protected: protected: //// ice相关的回调 /// void OnIceServerSendStunPacket(const RTC::IceServer *iceServer, const RTC::StunPacket *packet, RTC::TransportTuple *tuple) override; - void OnIceServerSelectedTuple(const RTC::IceServer *iceServer, RTC::TransportTuple *tuple) override; void OnIceServerConnected(const RTC::IceServer *iceServer) override; void OnIceServerCompleted(const RTC::IceServer *iceServer) override; void OnIceServerDisconnected(const RTC::IceServer *iceServer) override; @@ -159,7 +159,6 @@ protected: virtual void onRtcpBye() = 0; protected: - RTC::TransportTuple* getSelectedTuple() const; void sendRtcpRemb(uint32_t ssrc, size_t bit_rate); void sendRtcpPli(uint32_t ssrc); @@ -170,11 +169,11 @@ private: protected: RtcSession::Ptr _offer_sdp; RtcSession::Ptr _answer_sdp; + std::shared_ptr _ice_server; private: std::string _identifier; EventPoller::Ptr _poller; - std::shared_ptr _ice_server; std::shared_ptr _dtls_transport; std::shared_ptr _srtp_session_send; std::shared_ptr _srtp_session_recv; @@ -239,8 +238,6 @@ public: using Ptr = std::shared_ptr; ~WebRtcTransportImp() override; - void setSession(Session::Ptr session); - const Session::Ptr& getSession() const; uint64_t getBytesUsage() const; uint64_t getDuration() const; bool canSendRtp() const; @@ -248,8 +245,10 @@ public: void onSendRtp(const RtpPacket::Ptr &rtp, bool flush, bool rtx = false); void createRtpChannel(const std::string &rid, uint32_t ssrc, MediaTrack &track); + void removeTuple(RTC::TransportTuple* tuple); protected: + void OnIceServerSelectedTuple(const RTC::IceServer *iceServer, RTC::TransportTuple *tuple) override; WebRtcTransportImp(const EventPoller::Ptr &poller,bool preferred_tcp = false); void OnDtlsTransportApplicationDataReceived(const RTC::DtlsTransport *dtlsTransport, const uint8_t *data, size_t len) override; void onStartWebRTC() override; @@ -292,10 +291,6 @@ private: Ticker _alive_ticker; //pli rtcp计时器 Ticker _pli_ticker; - //当前选中的udp链接 - Session::Ptr _selected_session; - //链接迁移前后使用过的udp链接 - std::unordered_map > _history_sessions; //twcc rtcp发送上下文对象 TwccContext _twcc_ctx; //根据发送rtp的track类型获取相关信息