From f70bfb5f8b1253981606336dda5e8776f71aaa13 Mon Sep 17 00:00:00 2001 From: xiongziliang <771730766@qq.com> Date: Thu, 22 Apr 2021 22:02:21 +0800 Subject: [PATCH] =?UTF-8?q?=E5=85=BC=E5=AE=B9GB28181=20tcp=E6=8E=A8?= =?UTF-8?q?=E6=B5=81=E7=BC=93=E5=AD=98=E8=A6=86=E7=9B=96=E7=9A=84=E5=9E=83?= =?UTF-8?q?=E5=9C=BE=E8=AE=BE=E5=A4=87?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Rtp/RtpSession.cpp | 80 +++++++++++++++++++++++++++++++++++++--- src/Rtp/RtpSession.h | 5 +++ src/Rtsp/RtpReceiver.cpp | 2 +- src/Rtsp/RtpReceiver.h | 7 ++++ 4 files changed, 88 insertions(+), 6 deletions(-) diff --git a/src/Rtp/RtpSession.cpp b/src/Rtp/RtpSession.cpp index bf89073d..4eb79120 100644 --- a/src/Rtp/RtpSession.cpp +++ b/src/Rtp/RtpSession.cpp @@ -12,6 +12,7 @@ #include "RtpSession.h" #include "RtpSelector.h" #include "Network/TcpServer.h" +#include "Rtsp/RtpReceiver.h" namespace mediakit{ const string RtpSession::kStreamID = "stream_id"; @@ -57,23 +58,40 @@ void RtpSession::onManager() { } void RtpSession::onRtpPacket(const char *data, size_t len) { + if (_search_rtp) { + //搜索上下文期间,数据丢弃 + if (_search_rtp_finished) { + //下个包开始就是正确的rtp包了 + _search_rtp_finished = false; + _search_rtp = false; + } + return; + } if (len > 1024 * 10) { - throw SockException(Err_shutdown, StrPrinter << "rtp包长度异常(" << len << "),发送端可能缓存溢出并覆盖"); + _search_rtp = true; + WarnL << "rtp包长度异常(" << len << "),发送端可能缓存溢出并覆盖,开始搜索ssrc以便恢复上下文"; + return; } if (!_process) { - uint32_t ssrc; - if (!RtpSelector::getSSRC(data, len, ssrc)) { + if (!RtpSelector::getSSRC(data, len, _ssrc)) { return; } if (_stream_id.empty()) { //未指定流id就使用ssrc为流id - _stream_id = printSSRC(ssrc); + _stream_id = printSSRC(_ssrc); } //tcp情况下,一个tcp链接只可能是一路流,不需要通过多个ssrc来区分,所以不需要频繁getProcess _process = RtpSelector::Instance().getProcess(_stream_id, true); _process->setListener(dynamic_pointer_cast(shared_from_this())); } - _process->inputRtp(false, getSock(), data, len, &addr); + try { + _process->inputRtp(false, getSock(), data, len, &addr); + } catch (RtpReceiver::BadRtpException &ex) { + WarnL << ex.what() << ",开始搜索ssrc以便恢复上下文"; + _search_rtp = true; + } catch (...) { + throw; + } _ticker.resetTime(); } @@ -92,5 +110,57 @@ int RtpSession::totalReaderCount(MediaSource &sender) { return _process ? _process->getTotalReaderCount() : sender.totalReaderCount(); } +static const char *findSSRC(const char *data, ssize_t len, uint32_t ssrc) { + //rtp前面必须预留两个字节的长度字段 + for (ssize_t i = 2; i <= len - 4; ++i) { + auto ptr = (const uint8_t *) data + i; + if (ptr[0] == (ssrc >> 24) && ptr[1] == ((ssrc >> 16) & 0xFF) && + ptr[2] == ((ssrc >> 8) & 0xFF) && ptr[3] == (ssrc & 0xFF)) { + return (const char *) ptr; + } + } + return nullptr; +} + +//rtp长度到ssrc间的长度固定为10 +static size_t constexpr kSSRCOffset = 2 + 4 + 4; + +const char *RtpSession::onSearchPacketTail(const char *data, size_t len) { + if (!_search_rtp) { + //tcp上下文正常,不用搜索ssrc + return RtpSplitter::onSearchPacketTail(data, len); + } + if (!_process) { + throw SockException(Err_shutdown, "ssrc未获取到,无法通过ssrc恢复tcp上下文"); + } + //搜索第一个rtp的ssrc + auto ssrc_ptr0 = findSSRC(data, len, _ssrc); + if (!ssrc_ptr0) { + //未搜索到任意rtp,返回数据不够 + return nullptr; + } + //这两个字节是第一个rtp的长度字段 + auto rtp_len_ptr = (ssrc_ptr0 - kSSRCOffset); + auto rtp_len = ((uint8_t *)rtp_len_ptr)[0] << 8 | ((uint8_t *)rtp_len_ptr)[1]; + + //搜索第二个rtp的ssrc + auto ssrc_ptr1 = findSSRC(ssrc_ptr0 + rtp_len, data + (ssize_t) len - ssrc_ptr0 - rtp_len, _ssrc); + if (!ssrc_ptr1) { + //未搜索到第二个rtp,返回数据不够 + return nullptr; + } + + //两个ssrc的间隔正好等于rtp的长度(外加rtp长度字段),那么说明找到rtp + auto ssrc_offset = ssrc_ptr1 - ssrc_ptr0; + if (ssrc_offset == rtp_len + 2 || ssrc_offset == rtp_len + 4) { + InfoL << "rtp搜索成功,tcp上下文恢复成功,丢弃的rtp残余数据为:" << rtp_len_ptr - data; + _search_rtp_finished = true; + //前面的数据都需要丢弃,这个是rtp的起始 + return rtp_len_ptr; + } + //第一个rtp长度不匹配,说明第一个找到的ssrc不是rtp,丢弃之,我们从第二个ssrc所在rtp开始搜索 + return ssrc_ptr1 - kSSRCOffset; +} + }//namespace mediakit #endif//defined(ENABLE_RTPPROXY) \ No newline at end of file diff --git a/src/Rtp/RtpSession.h b/src/Rtp/RtpSession.h index 60fa0654..2999b2b9 100644 --- a/src/Rtp/RtpSession.h +++ b/src/Rtp/RtpSession.h @@ -38,7 +38,12 @@ protected: // 收到rtp回调 void onRtpPacket(const char *data, size_t len) override; + const char *onSearchPacketTail(const char *data, size_t len) override; + private: + bool _search_rtp = false; + bool _search_rtp_finished = false; + uint32_t _ssrc = 0; Ticker _ticker; string _stream_id; struct sockaddr addr; diff --git a/src/Rtsp/RtpReceiver.cpp b/src/Rtsp/RtpReceiver.cpp index 70c22b60..7601bfdf 100644 --- a/src/Rtsp/RtpReceiver.cpp +++ b/src/Rtsp/RtpReceiver.cpp @@ -42,7 +42,7 @@ bool RtpReceiver::handleOneRtp(int index, TrackType type, int sample_rate, uint8 } RtpHeader *header = (RtpHeader *) ptr; if (header->version != RtpPacket::kRtpVersion) { - throw std::invalid_argument("非法的rtp,version字段非法"); + throw BadRtpException("非法的rtp,version字段非法"); } if (!header->getPayloadSize(len)) { //无有效负载的rtp包 diff --git a/src/Rtsp/RtpReceiver.h b/src/Rtsp/RtpReceiver.h index f8d2fde9..a5f1eaa0 100644 --- a/src/Rtsp/RtpReceiver.h +++ b/src/Rtsp/RtpReceiver.h @@ -165,6 +165,13 @@ public: RtpReceiver(); virtual ~RtpReceiver(); + class BadRtpException : public invalid_argument { + public: + template + BadRtpException(Type &&type) : invalid_argument(std::forward(type)) {} + ~BadRtpException() = default; + }; + protected: /** * 输入数据指针生成并排序rtp包