diff --git a/src/Rtsp/RtpReceiver.cpp b/src/Rtsp/RtpReceiver.cpp index 96b077d7..5143a176 100644 --- a/src/Rtsp/RtpReceiver.cpp +++ b/src/Rtsp/RtpReceiver.cpp @@ -14,7 +14,7 @@ namespace mediakit { RtpTrack::RtpTrack() { - setOnSort([this](uint16_t seq, RtpPacket::Ptr &packet) { + setOnSort([this](uint16_t seq, RtpPacket::Ptr packet) { onRtpSorted(std::move(packet)); }); } diff --git a/src/Rtsp/RtpReceiver.h b/src/Rtsp/RtpReceiver.h index 392df104..41a9bacb 100644 --- a/src/Rtsp/RtpReceiver.h +++ b/src/Rtsp/RtpReceiver.h @@ -24,10 +24,11 @@ namespace mediakit { template class PacketSortor { public: + static constexpr SEQ SEQ_MAX = (std::numeric_limits::max)(); PacketSortor() = default; ~PacketSortor() = default; - void setOnSort(std::function cb) { + void setOnSort(std::function cb) { _cb = std::move(cb); } @@ -35,16 +36,16 @@ public: * 清空状态 */ void clear() { + _started = false; _seq_cycle_count = 0; - _pkt_sort_cache_map.clear(); - _next_seq_out = 0; _max_sort_size = kMin; + _pkt_sort_cache_map.clear(); } /** * 获取排序缓存长度 */ - size_t getJitterSize() const{ + size_t getJitterSize() const { return _pkt_sort_cache_map.size(); } @@ -61,24 +62,52 @@ public: * @param packet 包负载 */ void sortPacket(SEQ seq, T packet) { - if(!_is_inited && _next_seq_out == 0){ - _next_seq_out = seq; - _is_inited = true; + if (!_started) { + // 记录第一个seq + _started = true; + _last_seq_out = seq - 1; } - if (seq < _next_seq_out) { - if (_next_seq_out < seq + kMax) { - //过滤seq回退包(回环包除外) - return; - } - } else if (_next_seq_out && seq - _next_seq_out > ((std::numeric_limits::max)() >> 1)) { - //过滤seq跳变非常大的包(防止回环时乱序时收到非常大的seq) + if (seq == static_cast(_last_seq_out + 1)) { + // 收到下一个seq + output(seq, std::move(packet)); + return; + } + + if (seq < _last_seq_out && _last_seq_out != SEQ_MAX && seq < kMax && _last_seq_out > SEQ_MAX - kMax) { + // seq回环,清空回环前缓存 + flush(); + _last_seq_out = SEQ_MAX; + _pkt_sort_cache_map.emplace(seq, std::move(packet)); + ++_seq_cycle_count; + return; + } + + if (seq <= _last_seq_out && _last_seq_out != SEQ_MAX) { + // 这个回退包已经不再等待 + setBufferSize(seq); return; } - //放入排序缓存 _pkt_sort_cache_map.emplace(seq, std::move(packet)); - //尝试输出排序后的包 - tryPopPacket(); + auto max_seq = _pkt_sort_cache_map.rbegin()->first; + auto min_seq = _pkt_sort_cache_map.begin()->first; + auto diff = max_seq - min_seq; + if (diff > (SEQ_MAX >> 1)) { + // 回环后,收到回环前的大值seq, 忽略掉 + _pkt_sort_cache_map.erase(max_seq); + return; + } + + if (min_seq == static_cast(_last_seq_out + 1) && _pkt_sort_cache_map.size() == (size_t)diff + 1) { + // 都是连续的seq, 未丢包 + flush(); + } else { + // seq不连续,有丢包 + if (_pkt_sort_cache_map.size() >= _max_sort_size) { + //buffer太长,强行减小 + popIterator(_pkt_sort_cache_map.begin()); + } + } } void flush(){ @@ -89,74 +118,28 @@ public: } private: - void popPacket() { - auto it = _pkt_sort_cache_map.begin(); - if (it->first >= _next_seq_out) { - //过滤回跳包 - popIterator(it); - return; - } - - if (_next_seq_out - it->first > (0xFFFF >> 1)) { - //产生回环了 - if (_pkt_sort_cache_map.size() < 2 * kMin) { - //等足够多的数据后才处理回环, 因为后面还可能出现大的SEQ - return; - } - ++_seq_cycle_count; - //找到大的SEQ并清空掉,然后从小的SEQ重新开始排序 - auto hit = _pkt_sort_cache_map.upper_bound((SEQ) (_next_seq_out - _pkt_sort_cache_map.size())); - while (hit != _pkt_sort_cache_map.end()) { - //回环前,清空剩余的大的SEQ的数据 - _cb(hit->first, hit->second); - hit = _pkt_sort_cache_map.erase(hit); - } - //下一个回环的数据 - popIterator(_pkt_sort_cache_map.begin()); - return; - } - //删除回跳的数据包 - _pkt_sort_cache_map.erase(it); - } - void popIterator(typename std::map::iterator it) { auto seq = it->first; auto data = std::move(it->second); _pkt_sort_cache_map.erase(it); - _next_seq_out = seq + 1; - _cb(seq, data); + output(seq, std::move(data)); } - void tryPopPacket() { - int count = 0; - while ((!_pkt_sort_cache_map.empty() && _pkt_sort_cache_map.begin()->first == _next_seq_out)) { - //找到下个包,直接输出 - popPacket(); - ++count; - } - - if (count) { - setSortSize(); - } else if (_pkt_sort_cache_map.size() > _max_sort_size) { - //排序缓存溢出,不再继续排序 - popPacket(); - setSortSize(); - } + void output(SEQ seq, T packet) { + _last_seq_out = seq; + _cb(seq, std::move(packet)); } - void setSortSize() { - _max_sort_size = kMin + _pkt_sort_cache_map.size(); - if (_max_sort_size > kMax) { - _max_sort_size = kMax; - } + void setBufferSize(SEQ seq) { + auto next_seq = static_cast(_last_seq_out + 1); + auto min_seq = _pkt_sort_cache_map.empty() ? next_seq : _pkt_sort_cache_map.begin()->first; + _max_sort_size = MAX(std::min(_pkt_sort_cache_map.size() + min_seq - seq, kMax), kMin); } private: - //第一个包是已经进入 - bool _is_inited = false; - + bool _started = false; //下次应该输出的SEQ - SEQ _next_seq_out = 0; + SEQ _last_seq_out = 0; //seq回环次数计数 size_t _seq_cycle_count = 0; //排序缓存长度 @@ -164,7 +147,7 @@ private: //pkt排序缓存,根据seq排序 std::map _pkt_sort_cache_map; //回调 - std::function _cb; + std::function _cb; }; class RtpTrack : private PacketSortor { diff --git a/tests/test_sortor.cpp b/tests/test_sortor.cpp index be153b9c..921f248c 100644 --- a/tests/test_sortor.cpp +++ b/tests/test_sortor.cpp @@ -102,7 +102,7 @@ void test_real() { PacketSortor sortor; list sorted_list; - sortor.setOnSort([&](uint16_t seq, const uint16_t &packet) { + sortor.setOnSort([&](uint16_t seq, uint16_t packet) { sorted_list.push_back(seq); });