diff --git a/src/Rtsp/RtpReceiver.cpp b/src/Rtsp/RtpReceiver.cpp index 5143a176..363a3175 100644 --- a/src/Rtsp/RtpReceiver.cpp +++ b/src/Rtsp/RtpReceiver.cpp @@ -114,7 +114,7 @@ void RtpTrack::setNtpStamp(uint32_t rtp_stamp, uint64_t ntp_stamp_ms) { } } -void RtpTrack::setPT(uint8_t pt){ +void RtpTrack::setPayloadType(uint8_t pt) { _pt = pt; } diff --git a/src/Rtsp/RtpReceiver.h b/src/Rtsp/RtpReceiver.h index 41a9bacb..5a2e4da6 100644 --- a/src/Rtsp/RtpReceiver.h +++ b/src/Rtsp/RtpReceiver.h @@ -18,19 +18,18 @@ #include "Extension/Frame.h" // for NtpStamp #include "Common/Stamp.h" +#include "Util/TimeTicker.h" namespace mediakit { -template +template class PacketSortor { public: static constexpr SEQ SEQ_MAX = (std::numeric_limits::max)(); PacketSortor() = default; ~PacketSortor() = default; - void setOnSort(std::function cb) { - _cb = std::move(cb); - } + void setOnSort(std::function cb) { _cb = std::move(cb); } /** * 清空状态 @@ -38,23 +37,18 @@ public: void clear() { _started = false; _seq_cycle_count = 0; - _max_sort_size = kMin; _pkt_sort_cache_map.clear(); } /** * 获取排序缓存长度 */ - size_t getJitterSize() const { - return _pkt_sort_cache_map.size(); - } + size_t getJitterSize() const { return _pkt_sort_cache_map.size(); } /** * 获取seq回环次数 */ - size_t getCycleCount() const{ - return _seq_cycle_count; - } + size_t getCycleCount() const { return _seq_cycle_count; } /** * 输入并排序 @@ -73,51 +67,66 @@ public: return; } - if (seq < _last_seq_out && _last_seq_out != SEQ_MAX && seq < kMax && _last_seq_out > SEQ_MAX - kMax) { + if (seq < _last_seq_out && _last_seq_out != SEQ_MAX && seq < 1024 && _last_seq_out > SEQ_MAX - 1024) { // seq回环,清空回环前缓存 flush(); _last_seq_out = SEQ_MAX; - _pkt_sort_cache_map.emplace(seq, std::move(packet)); ++_seq_cycle_count; + sortPacket(seq, std::move(packet)); return; } if (seq <= _last_seq_out && _last_seq_out != SEQ_MAX) { // 这个回退包已经不再等待 - setBufferSize(seq); return; } _pkt_sort_cache_map.emplace(seq, std::move(packet)); - auto max_seq = _pkt_sort_cache_map.rbegin()->first; - auto min_seq = _pkt_sort_cache_map.begin()->first; - auto diff = max_seq - min_seq; - if (diff > (SEQ_MAX >> 1)) { + auto it_min = _pkt_sort_cache_map.begin(); + auto it_max = _pkt_sort_cache_map.rbegin(); + if (it_max->first - it_min->first > (SEQ_MAX >> 1)) { // 回环后,收到回环前的大值seq, 忽略掉 - _pkt_sort_cache_map.erase(max_seq); + _pkt_sort_cache_map.erase((++it_max).base()); return; } - if (min_seq == static_cast(_last_seq_out + 1) && _pkt_sort_cache_map.size() == (size_t)diff + 1) { - // 都是连续的seq, 未丢包 - flush(); - } else { - // seq不连续,有丢包 - if (_pkt_sort_cache_map.size() >= _max_sort_size) { - //buffer太长,强行减小 - popIterator(_pkt_sort_cache_map.begin()); - } + tryFlushFrontPacket(); + + if (_pkt_sort_cache_map.size() > _max_buffer_size || (_ticker.elapsedTime() > _max_buffer_ms && !_pkt_sort_cache_map.empty())) { + // buffer太长,强行减小 + WarnL << "packet dropped: " << static_cast(_last_seq_out + 1) << " -> " + << static_cast(_pkt_sort_cache_map.begin()->first - 1) + << ", jitter buffer size: " << _pkt_sort_cache_map.size() + << ", jitter buffer ms: " << _ticker.elapsedTime(); + popIterator(_pkt_sort_cache_map.begin()); } } - void flush(){ - //清空缓存 + void flush() { + // 清空缓存 while (!_pkt_sort_cache_map.empty()) { popIterator(_pkt_sort_cache_map.begin()); } } private: + void tryFlushFrontPacket() { + while (!_pkt_sort_cache_map.empty()) { + auto it = _pkt_sort_cache_map.begin(); + auto next_seq = static_cast(_last_seq_out + 1); + if (it->first < next_seq) { + _pkt_sort_cache_map.erase(it); + continue; + } + if (it->first == next_seq) { + // 连续的seq + popIterator(it); + continue; + } + break; + } + } + void popIterator(typename std::map::iterator it) { auto seq = it->first; auto data = std::move(it->second); @@ -128,22 +137,21 @@ private: void output(SEQ seq, T packet) { _last_seq_out = seq; _cb(seq, std::move(packet)); - } - - void setBufferSize(SEQ seq) { - auto next_seq = static_cast(_last_seq_out + 1); - auto min_seq = _pkt_sort_cache_map.empty() ? next_seq : _pkt_sort_cache_map.begin()->first; - _max_sort_size = MAX(std::min(_pkt_sort_cache_map.size() + min_seq - seq, kMax), kMin); + _ticker.resetTime(); } private: bool _started = false; + //排序缓存最大保存数据长度,单位毫秒 + size_t _max_buffer_ms = 3000; + //排序缓存最大保存数据个数 + size_t _max_buffer_size = 1024; + //记录上次output至今的时间 + toolkit::Ticker _ticker; //下次应该输出的SEQ SEQ _last_seq_out = 0; //seq回环次数计数 size_t _seq_cycle_count = 0; - //排序缓存长度 - size_t _max_sort_size = kMin; //pkt排序缓存,根据seq排序 std::map _pkt_sort_cache_map; //回调 @@ -166,7 +174,7 @@ public: uint32_t getSSRC() const; RtpPacket::Ptr inputRtp(TrackType type, int sample_rate, uint8_t *ptr, size_t len); void setNtpStamp(uint32_t rtp_stamp, uint64_t ntp_stamp_ms); - void setPT(uint8_t pt); + void setPayloadType(uint8_t pt); protected: virtual void onRtpSorted(RtpPacket::Ptr rtp) {} @@ -244,9 +252,9 @@ public: _track[index].setNtpStamp(rtp_stamp, ntp_stamp_ms); } - void setPT(int index, uint8_t pt){ + void setPayloadType(int index, uint8_t pt){ assert(index < kCount && index >= 0); - _track[index].setPT(pt); + _track[index].setPayloadType(pt); } void clear() { diff --git a/src/Rtsp/RtspPlayer.cpp b/src/Rtsp/RtspPlayer.cpp index e99e2591..2d7ab925 100644 --- a/src/Rtsp/RtspPlayer.cpp +++ b/src/Rtsp/RtspPlayer.cpp @@ -225,7 +225,7 @@ void RtspPlayer::handleResDESCRIBE(const Parser& parser) { _rtcp_context.clear(); for (auto &track : _sdp_track) { if(track->_pt != 0xff){ - setPT(_rtcp_context.size(),track->_pt); + setPayloadType(_rtcp_context.size(),track->_pt); } _rtcp_context.emplace_back(std::make_shared()); } diff --git a/webrtc/Nack.h b/webrtc/Nack.h index f2631035..8780e27a 100644 --- a/webrtc/Nack.h +++ b/webrtc/Nack.h @@ -49,7 +49,7 @@ public: // rtp丢包状态最长保留时间 static constexpr auto kNackMaxMS = 3 * 1000; // nack最多请求重传10次 - static constexpr auto kNackMaxCount = 10; + static constexpr auto kNackMaxCount = 15; // nack重传频率,rtt的倍数 static constexpr auto kNackIntervalRatio = 1.0f; // nack包中rtp个数,减小此值可以让nack包响应更灵敏