diff --git a/src/Rtp/RtpSelector.cpp b/src/Rtp/RtpSelector.cpp index 8ac620ac..f2ab0d41 100644 --- a/src/Rtp/RtpSelector.cpp +++ b/src/Rtp/RtpSelector.cpp @@ -42,7 +42,7 @@ RtpProcess::Ptr RtpSelector::getProcess(const string &stream_id,bool makeNew) { } if (it != _map_rtp_process.end() && makeNew) { //已经被其他线程持有了,不得再被持有,否则会存在线程安全的问题 - throw std::runtime_error(StrPrinter << "RtpProcess(" << stream_id << ") already existed"); + throw ProcessExisted(StrPrinter << "RtpProcess(" << stream_id << ") already existed"); } RtpProcessHelper::Ptr &ref = _map_rtp_process[stream_id]; if (!ref) { diff --git a/src/Rtp/RtpSelector.h b/src/Rtp/RtpSelector.h index d5d6dda1..db0683e8 100644 --- a/src/Rtp/RtpSelector.h +++ b/src/Rtp/RtpSelector.h @@ -44,6 +44,13 @@ public: RtpSelector() = default; ~RtpSelector() = default; + class ProcessExisted : public std::runtime_error { + public: + template + ProcessExisted(T && ...args) : std::runtime_error(std::forward(args)...) {} + ~ProcessExisted() override = default; + }; + static bool getSSRC(const char *data,size_t data_len, uint32_t &ssrc); static RtpSelector &Instance(); diff --git a/src/Rtp/RtpSession.cpp b/src/Rtp/RtpSession.cpp index 75d1f9a5..e66e6f42 100644 --- a/src/Rtp/RtpSession.cpp +++ b/src/Rtp/RtpSession.cpp @@ -76,6 +76,15 @@ void RtpSession::onManager() { } void RtpSession::onRtpPacket(const char *data, size_t len) { + if (_delay_close) { + // 正在延时关闭中,忽略所有数据 + return; + } + if (!isRtp(data, len)) { + // 忽略非rtp数据 + WarnP(this) << "Not rtp packet"; + return; + } if (!_is_udp) { if (_search_rtp) { //搜索上下文期间,数据丢弃 @@ -94,10 +103,6 @@ void RtpSession::onRtpPacket(const char *data, size_t len) { } } if (!_process) { - if (!isRtp(data, len)) { - WarnP(this) << "Not rtp packet"; - return; - } //未设置ssrc时,尝试获取ssrc if (!_ssrc && !RtpSelector::getSSRC(data, len, _ssrc)) { return; @@ -106,8 +111,18 @@ void RtpSession::onRtpPacket(const char *data, size_t len) { //未指定流id就使用ssrc为流id _stream_id = printSSRC(_ssrc); } - //tcp情况下,一个tcp链接只可能是一路流,不需要通过多个ssrc来区分,所以不需要频繁getProcess - _process = RtpSelector::Instance().getProcess(_stream_id, true); + try { + _process = RtpSelector::Instance().getProcess(_stream_id, true); + } catch (RtpSelector::ProcessExisted &ex) { + if (!_is_udp) { + // tcp情况下立即断开连接 + throw; + } + // udp情况下延时断开连接(等待超时自动关闭),防止频繁创建销毁RtpSession对象 + WarnP(this) << ex.what(); + _delay_close = true; + return; + } _process->setOnlyAudio(_only_audio); _process->setDelegate(dynamic_pointer_cast(shared_from_this())); } diff --git a/src/Rtp/RtpSession.h b/src/Rtp/RtpSession.h index 586e2df2..81a8c2e1 100644 --- a/src/Rtp/RtpSession.h +++ b/src/Rtp/RtpSession.h @@ -43,6 +43,7 @@ protected: const char *onSearchPacketTail(const char *data, size_t len) override; private: + bool _delay_close = false; bool _is_udp = false; bool _search_rtp = false; bool _search_rtp_finished = false;