From d05c9d5f511db0c2d6b93b3ec0374dd0e29b5fae Mon Sep 17 00:00:00 2001 From: ziyue <1213642868@qq.com> Date: Mon, 6 Mar 2023 20:43:07 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E9=87=8D=E5=86=99jitter=20buffer=E9=95=BF?= =?UTF-8?q?=E5=BA=A6=E6=8E=A7=E5=88=B6=E7=AE=97=E6=B3=95=EF=BC=8C=E6=8F=90?= =?UTF-8?q?=E9=AB=98webrtc/rtp=E6=8A=97=E4=B8=A2=E5=8C=85=E8=83=BD?= =?UTF-8?q?=E5=8A=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Rtsp/RtpReceiver.cpp | 2 +- src/Rtsp/RtpReceiver.h | 90 ++++++++++++++++++++++------------------ src/Rtsp/RtspPlayer.cpp | 2 +- 3 files changed, 51 insertions(+), 43 deletions(-) diff --git a/src/Rtsp/RtpReceiver.cpp b/src/Rtsp/RtpReceiver.cpp index 5143a176..363a3175 100644 --- a/src/Rtsp/RtpReceiver.cpp +++ b/src/Rtsp/RtpReceiver.cpp @@ -114,7 +114,7 @@ void RtpTrack::setNtpStamp(uint32_t rtp_stamp, uint64_t ntp_stamp_ms) { } } -void RtpTrack::setPT(uint8_t pt){ +void RtpTrack::setPayloadType(uint8_t pt) { _pt = pt; } diff --git a/src/Rtsp/RtpReceiver.h b/src/Rtsp/RtpReceiver.h index 41a9bacb..7dc40bc9 100644 --- a/src/Rtsp/RtpReceiver.h +++ b/src/Rtsp/RtpReceiver.h @@ -18,19 +18,18 @@ #include "Extension/Frame.h" // for NtpStamp #include "Common/Stamp.h" +#include "Util/TimeTicker.h" namespace mediakit { -template +template class PacketSortor { public: static constexpr SEQ SEQ_MAX = (std::numeric_limits::max)(); PacketSortor() = default; ~PacketSortor() = default; - void setOnSort(std::function cb) { - _cb = std::move(cb); - } + void setOnSort(std::function cb) { _cb = std::move(cb); } /** * 清空状态 @@ -38,23 +37,18 @@ public: void clear() { _started = false; _seq_cycle_count = 0; - _max_sort_size = kMin; _pkt_sort_cache_map.clear(); } /** * 获取排序缓存长度 */ - size_t getJitterSize() const { - return _pkt_sort_cache_map.size(); - } + size_t getJitterSize() const { return _pkt_sort_cache_map.size(); } /** * 获取seq回环次数 */ - size_t getCycleCount() const{ - return _seq_cycle_count; - } + size_t getCycleCount() const { return _seq_cycle_count; } /** * 输入并排序 @@ -73,51 +67,66 @@ public: return; } - if (seq < _last_seq_out && _last_seq_out != SEQ_MAX && seq < kMax && _last_seq_out > SEQ_MAX - kMax) { + if (seq < _last_seq_out && _last_seq_out != SEQ_MAX && seq < 1024 && _last_seq_out > SEQ_MAX - 1024) { // seq回环,清空回环前缓存 flush(); _last_seq_out = SEQ_MAX; - _pkt_sort_cache_map.emplace(seq, std::move(packet)); ++_seq_cycle_count; + sortPacket(seq, std::move(packet)); return; } if (seq <= _last_seq_out && _last_seq_out != SEQ_MAX) { // 这个回退包已经不再等待 - setBufferSize(seq); return; } _pkt_sort_cache_map.emplace(seq, std::move(packet)); - auto max_seq = _pkt_sort_cache_map.rbegin()->first; - auto min_seq = _pkt_sort_cache_map.begin()->first; - auto diff = max_seq - min_seq; - if (diff > (SEQ_MAX >> 1)) { + auto it_min = _pkt_sort_cache_map.begin(); + auto it_max = _pkt_sort_cache_map.rbegin(); + if (it_max->first - it_min->first > (SEQ_MAX >> 1)) { // 回环后,收到回环前的大值seq, 忽略掉 - _pkt_sort_cache_map.erase(max_seq); + _pkt_sort_cache_map.erase((++it_max).base()); return; } - if (min_seq == static_cast(_last_seq_out + 1) && _pkt_sort_cache_map.size() == (size_t)diff + 1) { - // 都是连续的seq, 未丢包 - flush(); - } else { - // seq不连续,有丢包 - if (_pkt_sort_cache_map.size() >= _max_sort_size) { - //buffer太长,强行减小 - popIterator(_pkt_sort_cache_map.begin()); - } + tryFlushFrontPacket(); + + if (_pkt_sort_cache_map.size() > _max_buffer_size || (_ticker.elapsedTime() > _max_buffer_ms && !_pkt_sort_cache_map.empty())) { + // buffer太长,强行减小 + WarnL << "packet dropped: " << static_cast(_last_seq_out + 1) << " -> " + << static_cast(_pkt_sort_cache_map.begin()->first - 1) + << ", jitter buffer size: " << _pkt_sort_cache_map.size() + << ", jitter buffer ms: " << _ticker.elapsedTime(); + popIterator(_pkt_sort_cache_map.begin()); } } - void flush(){ - //清空缓存 + void flush() { + // 清空缓存 while (!_pkt_sort_cache_map.empty()) { popIterator(_pkt_sort_cache_map.begin()); } } private: + void tryFlushFrontPacket() { + while (!_pkt_sort_cache_map.empty()) { + auto it = _pkt_sort_cache_map.begin(); + auto next_seq = static_cast(_last_seq_out + 1); + if (it->first < next_seq) { + _pkt_sort_cache_map.erase(it); + continue; + } + if (it->first == next_seq) { + // 连续的seq + popIterator(it); + continue; + } + break; + } + } + void popIterator(typename std::map::iterator it) { auto seq = it->first; auto data = std::move(it->second); @@ -128,22 +137,21 @@ private: void output(SEQ seq, T packet) { _last_seq_out = seq; _cb(seq, std::move(packet)); - } - - void setBufferSize(SEQ seq) { - auto next_seq = static_cast(_last_seq_out + 1); - auto min_seq = _pkt_sort_cache_map.empty() ? next_seq : _pkt_sort_cache_map.begin()->first; - _max_sort_size = MAX(std::min(_pkt_sort_cache_map.size() + min_seq - seq, kMax), kMin); + _ticker.resetTime(); } private: bool _started = false; + //排序缓存最大保存数据长度,单位毫秒 + size_t _max_buffer_ms = 1000; + //排序缓存最大保存数据个数 + size_t _max_buffer_size = 1024; + //记录上次output至今的时间 + toolkit::Ticker _ticker; //下次应该输出的SEQ SEQ _last_seq_out = 0; //seq回环次数计数 size_t _seq_cycle_count = 0; - //排序缓存长度 - size_t _max_sort_size = kMin; //pkt排序缓存,根据seq排序 std::map _pkt_sort_cache_map; //回调 @@ -166,7 +174,7 @@ public: uint32_t getSSRC() const; RtpPacket::Ptr inputRtp(TrackType type, int sample_rate, uint8_t *ptr, size_t len); void setNtpStamp(uint32_t rtp_stamp, uint64_t ntp_stamp_ms); - void setPT(uint8_t pt); + void setPayloadType(uint8_t pt); protected: virtual void onRtpSorted(RtpPacket::Ptr rtp) {} @@ -244,9 +252,9 @@ public: _track[index].setNtpStamp(rtp_stamp, ntp_stamp_ms); } - void setPT(int index, uint8_t pt){ + void setPayloadType(int index, uint8_t pt){ assert(index < kCount && index >= 0); - _track[index].setPT(pt); + _track[index].setPayloadType(pt); } void clear() { diff --git a/src/Rtsp/RtspPlayer.cpp b/src/Rtsp/RtspPlayer.cpp index e99e2591..2d7ab925 100644 --- a/src/Rtsp/RtspPlayer.cpp +++ b/src/Rtsp/RtspPlayer.cpp @@ -225,7 +225,7 @@ void RtspPlayer::handleResDESCRIBE(const Parser& parser) { _rtcp_context.clear(); for (auto &track : _sdp_track) { if(track->_pt != 0xff){ - setPT(_rtcp_context.size(),track->_pt); + setPayloadType(_rtcp_context.size(),track->_pt); } _rtcp_context.emplace_back(std::make_shared()); } From 23296ae5fade3a06e98df857d4c7cf83b1443d60 Mon Sep 17 00:00:00 2001 From: ziyue <1213642868@qq.com> Date: Tue, 7 Mar 2023 10:37:44 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E5=8A=A0=E5=A4=A7=E5=8F=82=E6=95=B0?= =?UTF-8?q?=E5=86=97=E4=BD=99=EF=BC=8C=E6=8F=90=E9=AB=98webrtc=E6=8A=97?= =?UTF-8?q?=E4=B8=A2=E5=8C=85=E8=83=BD=E5=8A=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Rtsp/RtpReceiver.h | 2 +- webrtc/Nack.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Rtsp/RtpReceiver.h b/src/Rtsp/RtpReceiver.h index 7dc40bc9..5a2e4da6 100644 --- a/src/Rtsp/RtpReceiver.h +++ b/src/Rtsp/RtpReceiver.h @@ -143,7 +143,7 @@ private: private: bool _started = false; //排序缓存最大保存数据长度,单位毫秒 - size_t _max_buffer_ms = 1000; + size_t _max_buffer_ms = 3000; //排序缓存最大保存数据个数 size_t _max_buffer_size = 1024; //记录上次output至今的时间 diff --git a/webrtc/Nack.h b/webrtc/Nack.h index f2631035..8780e27a 100644 --- a/webrtc/Nack.h +++ b/webrtc/Nack.h @@ -49,7 +49,7 @@ public: // rtp丢包状态最长保留时间 static constexpr auto kNackMaxMS = 3 * 1000; // nack最多请求重传10次 - static constexpr auto kNackMaxCount = 10; + static constexpr auto kNackMaxCount = 15; // nack重传频率,rtt的倍数 static constexpr auto kNackIntervalRatio = 1.0f; // nack包中rtp个数,减小此值可以让nack包响应更灵敏