diff --git a/3rdpart/ZLToolKit b/3rdpart/ZLToolKit index b8e06622..eb89de6e 160000 --- a/3rdpart/ZLToolKit +++ b/3rdpart/ZLToolKit @@ -1 +1 @@ -Subproject commit b8e066222b757a2c11b5e44c49ef6982acb95fe2 +Subproject commit eb89de6e349202c9b6c85d55544faec0cdc7d581 diff --git a/conf/config.ini b/conf/config.ini index bb37ed8c..1a7d7747 100644 --- a/conf/config.ini +++ b/conf/config.ini @@ -227,6 +227,10 @@ timeoutSec=15 timeoutSec=15 #本机对rtc客户端的可见ip,作为服务器时一般为公网ip,置空时,会自动获取网卡ip externIP= +#rtc udp服务器监听端口号,所有rtc客户端将通过该端口传输stun/dtls/srtp/srtcp数据, +#该端口是多线程的,同时支持客户端网络切换导致的连接迁移 +#需要注意的是,如果服务器在nat内,需要做端口映射时,必须确保外网映射端口跟该端口一致 +port=8000 #设置remb比特率,非0时关闭twcc并开启remb。该设置在rtc推流时有效,可以控制推流画质 rembBitRate=1000000 diff --git a/webrtc/DtlsTransport.cpp b/webrtc/DtlsTransport.cpp index 8abb2a21..708b89f6 100644 --- a/webrtc/DtlsTransport.cpp +++ b/webrtc/DtlsTransport.cpp @@ -165,7 +165,7 @@ namespace RTC EC_KEY* ecKey{ nullptr }; X509_NAME* certName{ nullptr }; std::string subject = - std::string("mediasoup") + std::to_string(rand() % 999999 + 100000); + std::string("mediasoup") + to_string(rand() % 999999 + 100000); // Create key with curve. ecKey = EC_KEY_new_by_curve_name(NID_X9_62_prime256v1); diff --git a/webrtc/WebRtcSession.cpp b/webrtc/WebRtcSession.cpp index fdb3a539..1e1bcf76 100644 --- a/webrtc/WebRtcSession.cpp +++ b/webrtc/WebRtcSession.cpp @@ -45,7 +45,7 @@ EventPoller::Ptr WebRtcSession::getPoller(const Buffer::Ptr &buffer) { if (user_name.empty()) { return nullptr; } - auto ret = WebRtcTransportImp::getTransport(user_name); + auto ret = WebRtcTransportImp::getRtcTransport(user_name, false); return ret ? ret->getPoller() : nullptr; } @@ -60,24 +60,34 @@ void WebRtcSession::onRecv(const Buffer::Ptr &buffer) { WarnL << user_name; return; } - _transport = WebRtcTransportImp::getTransport(user_name); + _transport = WebRtcTransportImp::getRtcTransport(user_name, true); if (!_transport) { //逻辑分支不太可能走到这里 WarnL << user_name; return; } - _transport->setSession(this); + _transport->setSession(shared_from_this()); } + _ticker.resetTime(); _transport->inputSockData(buf, len, &_peer_addr); } void WebRtcSession::onError(const SockException &err) { - if (_transport) { - _transport->unrefSelf(err); - _transport = nullptr; - } + //udp链接超时,但是rtc链接不一定超时,因为可能存在udp链接迁移的情况 + //在udp链接迁移时,新的WebRtcSession对象将接管WebRtcTransport对象的生命周期 + //本WebRtcSession对象将在超时后自动销毁 + WarnP(this) << err.what(); + _transport = nullptr; } void WebRtcSession::onManager() { - + GET_CONFIG(float, timeoutSec, RTC::kTimeOutSec); + if (!_transport && _ticker.createdTime() > timeoutSec * 1000) { + shutdown(SockException(Err_timeout, "illegal webrtc connection")); + return; + } + if (_ticker.elapsedTime() > timeoutSec * 1000) { + shutdown(SockException(Err_timeout, "webrtc connection timeout")); + return; + } } diff --git a/webrtc/WebRtcSession.h b/webrtc/WebRtcSession.h index b86f3f81..9fbe342c 100644 --- a/webrtc/WebRtcSession.h +++ b/webrtc/WebRtcSession.h @@ -30,6 +30,7 @@ public: void onManager() override; private: + Ticker _ticker; struct sockaddr _peer_addr; std::shared_ptr _transport; }; diff --git a/webrtc/WebRtcTransport.cpp b/webrtc/WebRtcTransport.cpp index a2698bf4..25575bda 100644 --- a/webrtc/WebRtcTransport.cpp +++ b/webrtc/WebRtcTransport.cpp @@ -50,45 +50,21 @@ void WebRtcTransport::onCreate(){ _key = to_string(reinterpret_cast(this)); _dtls_transport = std::make_shared(_poller, this); _ice_server = std::make_shared(this, _key, makeRandStr(24)); - refSelf(); } void WebRtcTransport::onDestory(){ _dtls_transport = nullptr; _ice_server = nullptr; - unrefSelf(SockException()); -} - -static mutex s_rtc_mtx; -static unordered_map > s_rtc_map; - -void WebRtcTransport::refSelf() { - _self = shared_from_this(); - - lock_guard lck(s_rtc_mtx); - s_rtc_map[_key] = static_pointer_cast(_self); -} - -void WebRtcTransport::unrefSelf(const SockException &ex) { - _self = nullptr; - - lock_guard lck(s_rtc_mtx); - s_rtc_map.erase(_key); -} - -WebRtcTransportImp::Ptr WebRtcTransportImp::getTransport(const string &key){ - lock_guard lck(s_rtc_mtx); - auto it = s_rtc_map.find(key); - if (it == s_rtc_map.end()) { - return nullptr; - } - return it->second.lock(); } const EventPoller::Ptr& WebRtcTransport::getPoller() const{ return _poller; } +const string &WebRtcTransport::getKey() const { + return _key; +} + ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// void WebRtcTransport::OnIceServerSendStunPacket(const RTC::IceServer *iceServer, const RTC::StunPacket *packet, RTC::TransportTuple *tuple) { @@ -330,6 +306,8 @@ WebRtcTransportImp::Ptr WebRtcTransportImp::create(const EventPoller::Ptr &polle void WebRtcTransportImp::onCreate(){ WebRtcTransport::onCreate(); + registerSelf(); + weak_ptr weak_self = static_pointer_cast(shared_from_this()); GET_CONFIG(float, timeoutSec, RTC::kTimeOutSec); _timer = std::make_shared(timeoutSec / 2, [weak_self]() { @@ -354,8 +332,14 @@ WebRtcTransportImp::~WebRtcTransportImp() { void WebRtcTransportImp::onDestory() { WebRtcTransport::onDestory(); - uint64_t duration = _alive_ticker.createdTime() / 1000; + unregisterSelf(); + auto session = _session.lock(); + if (!session) { + return; + } + + uint64_t duration = _alive_ticker.createdTime() / 1000; //流量统计事件广播 GET_CONFIG(uint32_t, iFlowThreshold, General::kFlowThreshold); @@ -366,7 +350,7 @@ void WebRtcTransportImp::onDestory() { << _media_info._streamid << ")结束播放,耗时(s):" << duration; if (_bytes_usage >= iFlowThreshold * 1024) { - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _bytes_usage, duration, true, *static_cast(_session)); + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _bytes_usage, duration, true, static_cast(*session)); } } @@ -377,7 +361,7 @@ void WebRtcTransportImp::onDestory() { << _media_info._streamid << ")结束推流,耗时(s):" << duration; if (_bytes_usage >= iFlowThreshold * 1024) { - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _bytes_usage, duration, false, *static_cast(_session)); + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _bytes_usage, duration, false, static_cast(*session)); } } } @@ -393,9 +377,14 @@ void WebRtcTransportImp::attach(const RtspMediaSource::Ptr &src, const MediaInfo } void WebRtcTransportImp::onSendSockData(const char *buf, size_t len, struct sockaddr_in *dst, bool flush) { + auto session = _session.lock(); + if (!session) { + WarnL << "send data failed:" << len; + return; + } auto ptr = BufferRaw::create(); ptr->assign(buf, len); - _session->send(std::move(ptr)); + session->send(std::move(ptr)); } /////////////////////////////////////////////////////////////////// @@ -966,8 +955,11 @@ void WebRtcTransportImp::onBeforeEncryptRtp(const char *buf, int &len, void *ctx void WebRtcTransportImp::onShutdown(const SockException &ex){ WarnL << ex.what(); - unrefSelf(ex); - _session->shutdown(ex); + unrefSelf(); + auto session = _session.lock(); + if (session) { + session->shutdown(ex); + } } ///////////////////////////////////////////////////////////////////////////////////////////// @@ -999,9 +991,43 @@ string WebRtcTransportImp::getOriginUrl(MediaSource &sender) const { } std::shared_ptr WebRtcTransportImp::getOriginSock(MediaSource &sender) const { - return static_pointer_cast(const_cast(_session)->shared_from_this()); + return static_pointer_cast(_session.lock()); } -void WebRtcTransportImp::setSession(Session *session) { - _session = session; +void WebRtcTransportImp::setSession(weak_ptr session) { + _session = std::move(session); +} + + +static mutex s_rtc_mtx; +static unordered_map > s_rtc_map; + +void WebRtcTransportImp::registerSelf() { + _self = static_pointer_cast(shared_from_this()); + lock_guard lck(s_rtc_mtx); + s_rtc_map[getKey()] = static_pointer_cast(_self); +} + +void WebRtcTransportImp::unrefSelf() { + _self = nullptr; +} + +void WebRtcTransportImp::unregisterSelf() { + unrefSelf(); + lock_guard lck(s_rtc_mtx); + s_rtc_map.erase(getKey()); +} + +WebRtcTransportImp::Ptr WebRtcTransportImp::getRtcTransport(const string &key, bool unref_self){ + lock_guard lck(s_rtc_mtx); + auto it = s_rtc_map.find(key); + if (it == s_rtc_map.end()) { + return nullptr; + } + auto ret = it->second.lock(); + if (unref_self) { + //此对象不再强引用自己,因为自己将被WebRtcSession对象持有 + ret->unrefSelf(); + } + return ret; } \ No newline at end of file diff --git a/webrtc/WebRtcTransport.h b/webrtc/WebRtcTransport.h index 2c1394a2..251024e4 100644 --- a/webrtc/WebRtcTransport.h +++ b/webrtc/WebRtcTransport.h @@ -31,6 +31,7 @@ using namespace mediakit; //RTC配置项目 namespace RTC { extern const string kPort; +extern const string kTimeOutSec; }//namespace RTC class WebRtcTransport : public RTC::DtlsTransport::Listener, public RTC::IceServer::Listener, public std::enable_shared_from_this { @@ -39,8 +40,6 @@ public: WebRtcTransport(const EventPoller::Ptr &poller); ~WebRtcTransport() override = default; - void unrefSelf(const SockException &ex); - /** * 创建对象 */ @@ -77,6 +76,7 @@ public: void sendRtcpPacket(const char *buf, int len, bool flush, void *ctx = nullptr); const EventPoller::Ptr& getPoller() const; + const string& getKey() const; protected: //// dtls相关的回调 //// @@ -123,7 +123,6 @@ protected: private: void onSendSockData(const char *buf, size_t len, bool flush = true); void setRemoteDtlsFingerprint(const RtcSession &remote); - void refSelf(); private: uint8_t _srtp_buf[2000]; @@ -135,8 +134,6 @@ private: std::shared_ptr _srtp_session_recv; RtcSession::Ptr _offer_sdp; RtcSession::Ptr _answer_sdp; - //保持自我强引用 - WebRtcTransport::Ptr _self; }; class RtpChannel; @@ -172,9 +169,9 @@ public: * @return 对象 */ static Ptr create(const EventPoller::Ptr &poller); - static Ptr getTransport(const string &key); + static Ptr getRtcTransport(const string &key, bool unref_self); - void setSession(Session *session); + void setSession(weak_ptr session); /** * 绑定rtsp媒体源 @@ -220,12 +217,17 @@ private: void onSortedRtp(MediaTrack &track, const string &rid, RtpPacket::Ptr rtp); void onSendNack(MediaTrack &track, const FCI_NACK &nack, uint32_t ssrc); void createRtpChannel(const string &rid, uint32_t ssrc, MediaTrack &track); + void registerSelf(); + void unregisterSelf(); + void unrefSelf(); private: bool _simulcast = false; uint16_t _rtx_seq[2] = {0, 0}; //用掉的总流量 uint64_t _bytes_usage = 0; + //保持自我强引用 + Ptr _self; //媒体相关元数据 MediaInfo _media_info; //检测超时的定时器 @@ -235,7 +237,7 @@ private: //pli rtcp计时器 Ticker _pli_ticker; //udp session - Session *_session; + weak_ptr _session; //推流的rtsp源 RtspMediaSource::Ptr _push_src; unordered_map _push_src_simulcast;