From 4d3917687711e639c7ef5013f72b635f3c3862a3 Mon Sep 17 00:00:00 2001 From: xiongziliang <771730766@qq.com> Date: Thu, 1 Oct 2020 21:33:07 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E5=86=99rtp=E6=8E=92=E5=BA=8F?= =?UTF-8?q?=E7=AE=97=E6=B3=95:#510?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Rtsp/RtpReceiver.cpp | 29 +++++----- src/Rtsp/RtpReceiver.h | 116 ++++++++++++++++----------------------- tests/test_sortor.cpp | 58 ++++++++++++++++++++ 3 files changed, 118 insertions(+), 85 deletions(-) create mode 100644 tests/test_sortor.cpp diff --git a/src/Rtsp/RtpReceiver.cpp b/src/Rtsp/RtpReceiver.cpp index 6268c358..8aee75d0 100644 --- a/src/Rtsp/RtpReceiver.cpp +++ b/src/Rtsp/RtpReceiver.cpp @@ -20,12 +20,9 @@ namespace mediakit { RtpReceiver::RtpReceiver() { - GET_CONFIG(uint32_t, clearCount, Rtp::kClearCount); - GET_CONFIG(uint32_t, maxRtpCount, Rtp::kMaxRtpCount); int index = 0; for (auto &sortor : _rtp_sortor) { - sortor.setup(maxRtpCount, clearCount); - sortor.setOnSort([this, index](uint16_t seq, const RtpPacket::Ptr &packet) { + sortor.setOnSort([this, index](uint16_t seq, RtpPacket::Ptr &packet) { onRtpSorted(packet, index); }); ++index; @@ -34,7 +31,7 @@ RtpReceiver::RtpReceiver() { RtpReceiver::~RtpReceiver() {} bool RtpReceiver::handleOneRtp(int track_index, TrackType type, int samplerate, unsigned char *rtp_raw_ptr, unsigned int rtp_raw_len) { - if(rtp_raw_len < 12){ + if (rtp_raw_len < 12) { WarnL << "rtp包太小:" << rtp_raw_len; return false; } @@ -65,7 +62,7 @@ bool RtpReceiver::handleOneRtp(int track_index, TrackType type, int samplerate, memcpy(&rtp.timeStamp, rtp_raw_ptr + 4, 4); rtp.timeStamp = ntohl(rtp.timeStamp); - if(!samplerate){ + if (!samplerate) { //无法把时间戳转换成毫秒 return false; } @@ -80,7 +77,7 @@ bool RtpReceiver::handleOneRtp(int track_index, TrackType type, int samplerate, if (_ssrc[track_index] == 0) { //保存SSRC至track对象 _ssrc[track_index] = rtp.ssrc; - }else{ + } else { //ssrc错误 WarnL << "ssrc错误:" << rtp.ssrc << " != " << _ssrc[track_index]; if (_ssrc_err_count[track_index]++ > 10) { @@ -97,22 +94,22 @@ bool RtpReceiver::handleOneRtp(int track_index, TrackType type, int samplerate, _ssrc_err_count[track_index] = 0; //获取rtp中媒体数据偏移量 - rtp.offset = 12 + 4; - int csrc = rtp_raw_ptr[0] & 0x0f; - int ext = rtp_raw_ptr[0] & 0x10; - rtp.offset += 4 * csrc; + rtp.offset = 12 + 4; + int csrc = rtp_raw_ptr[0] & 0x0f; + int ext = rtp_raw_ptr[0] & 0x10; + rtp.offset += 4 * csrc; if (ext && rtp_raw_len >= rtp.offset) { /* calculate the header extension length (stored as number of 32-bit words) */ ext = (AV_RB16(rtp_raw_ptr + rtp.offset - 2) + 1) << 2; rtp.offset += ext; } - if(rtp_raw_len + 4 <= rtp.offset){ - WarnL << "无有效负载的rtp包:" << rtp_raw_len << " <= " << (int)rtp.offset; + if (rtp_raw_len + 4 <= rtp.offset) { + WarnL << "无有效负载的rtp包:" << rtp_raw_len << " <= " << (int) rtp.offset; return false; } - if(rtp_raw_len > RTP_MAX_SIZE){ + if (rtp_raw_len > RTP_MAX_SIZE) { WarnL << "超大的rtp包:" << rtp_raw_len << " > " << RTP_MAX_SIZE; return false; } @@ -120,7 +117,7 @@ bool RtpReceiver::handleOneRtp(int track_index, TrackType type, int samplerate, //设置rtp负载长度 rtp.setCapacity(rtp_raw_len + 4); rtp.setSize(rtp_raw_len + 4); - uint8_t *payload_ptr = (uint8_t *)rtp.data(); + uint8_t *payload_ptr = (uint8_t *) rtp.data(); payload_ptr[0] = '$'; payload_ptr[1] = rtp.interleaved; payload_ptr[2] = rtp_raw_len >> 8; @@ -128,7 +125,7 @@ bool RtpReceiver::handleOneRtp(int track_index, TrackType type, int samplerate, //拷贝rtp负载 memcpy(payload_ptr + 4, rtp_raw_ptr, rtp_raw_len); //排序rtp - sortRtp(rtp_ptr,track_index); + sortRtp(std::move(rtp_ptr), track_index); return true; } diff --git a/src/Rtsp/RtpReceiver.h b/src/Rtsp/RtpReceiver.h index f4c7e94b..d06473fb 100644 --- a/src/Rtsp/RtpReceiver.h +++ b/src/Rtsp/RtpReceiver.h @@ -21,23 +21,13 @@ using namespace toolkit; namespace mediakit { -template +template class PacketSortor { public: PacketSortor() = default; ~PacketSortor() = default; - /** - * 设置参数 - * @param max_sort_size 最大排序缓存长度 - * @param clear_sort_size seq连续次数超过该值后,清空并关闭排序缓存 - */ - void setup(uint32_t max_sort_size, uint32_t clear_sort_size) { - _max_sort_size = max_sort_size; - _clear_sort_size = clear_sort_size; - } - - void setOnSort(function cb){ + void setOnSort(function cb) { _cb = std::move(cb); } @@ -45,24 +35,23 @@ public: * 清空状态 */ void clear() { - _last_seq = 0; - _seq_ok_count = 0; - _sort_started = 0; _seq_cycle_count = 0; _rtp_sort_cache_map.clear(); + _next_seq_out = 0; + _max_sort_size = kMin; } /** * 获取排序缓存长度 */ - int getJitterSize(){ + int getJitterSize() { return _rtp_sort_cache_map.size(); } /** * 获取seq回环次数 */ - int getCycleCount(){ + int getCycleCount() { return _seq_cycle_count; } @@ -71,73 +60,62 @@ public: * @param seq 序列号 * @param packet 包负载 */ - void sortPacket(SEQ seq, const T &packet){ - if (seq != _last_seq + 1 && _last_seq != 0) { - //包乱序或丢包 - _seq_ok_count = 0; - _sort_started = true; - if (_last_seq > seq && _last_seq - seq > 0xFF) { - //sequence回环,清空所有排序缓存 - while (_rtp_sort_cache_map.size()) { - popPacket(); - } - ++_seq_cycle_count; - } - } else { - //正确序列的包 - _seq_ok_count++; - } - - _last_seq = seq; - - //开始排序缓存 - if (_sort_started) { - _rtp_sort_cache_map.emplace(seq, packet); - if (_seq_ok_count >= _clear_sort_size) { - //网络环境改善,需要清空排序缓存 - _seq_ok_count = 0; - _sort_started = false; - while (_rtp_sort_cache_map.size()) { - popPacket(); - } - } else if (_rtp_sort_cache_map.size() >= _max_sort_size) { - //排序缓存溢出 - popPacket(); - } - } else { - //正确序列 - onPacketSorted(seq, packet); + void sortPacket(SEQ seq, T packet) { + if (seq < _next_seq_out && _next_seq_out - seq > kMax) { + //回环 + ++_seq_cycle_count; } + //放入排序缓存 + _rtp_sort_cache_map.emplace(seq, std::move(packet)); + //尝试输出排序后的包 + tryPopPacket(); } private: void popPacket() { auto it = _rtp_sort_cache_map.begin(); - onPacketSorted(it->first, it->second); + _cb(it->first, it->second); + _next_seq_out = it->first + 1; _rtp_sort_cache_map.erase(it); } - void onPacketSorted(SEQ seq, const T &packet) { - _cb(seq, packet); + void tryPopPacket() { + bool flag = false; + while ((!_rtp_sort_cache_map.empty() && _rtp_sort_cache_map.begin()->first == _next_seq_out)) { + //找到下个包,直接输出 + popPacket(); + flag = true; + } + + if (flag) { + setSortSize(); + } else if (_rtp_sort_cache_map.size() > _max_sort_size) { + //排序缓存溢出,不再继续排序 + popPacket(); + setSortSize(); + } + } + + void setSortSize() { + _max_sort_size = 2 * _rtp_sort_cache_map.size(); + if (_max_sort_size > kMax) { + _max_sort_size = kMax; + } else if (_max_sort_size < kMin) { + _max_sort_size = kMin; + } } private: - //是否开始seq排序 - bool _sort_started = false; - //上次seq - SEQ _last_seq = 0; - //seq连续次数计数 - uint32_t _seq_ok_count = 0; + //下次应该输出的SEQ + SEQ _next_seq_out = 0; //seq回环次数计数 uint32_t _seq_cycle_count = 0; //排序缓存长度 - uint32_t _max_sort_size; - //seq连续次数超过该值后,清空并关闭排序缓存 - uint32_t _clear_sort_size; + uint32_t _max_sort_size = kMin; //rtp排序缓存,根据seq排序 map _rtp_sort_cache_map; //回调 - function _cb; + function _cb; }; class RtpReceiver { @@ -162,7 +140,7 @@ protected: * @param rtp rtp数据包 * @param track_index track索引 */ - virtual void onRtpSorted(const RtpPacket::Ptr &rtp, int track_index){} + virtual void onRtpSorted(const RtpPacket::Ptr &rtp, int track_index) {} void clear(); void setPoolSize(int size); @@ -173,9 +151,9 @@ private: void sortRtp(const RtpPacket::Ptr &rtp , int track_index); private: - uint32_t _ssrc[2] = { 0, 0 }; + uint32_t _ssrc[2] = {0, 0}; //ssrc不匹配计数 - uint32_t _ssrc_err_count[2] = { 0, 0 }; + uint32_t _ssrc_err_count[2] = {0, 0}; //rtp排序缓存,根据seq排序 PacketSortor _rtp_sortor[2]; //rtp循环池 diff --git a/tests/test_sortor.cpp b/tests/test_sortor.cpp new file mode 100644 index 00000000..21bb9980 --- /dev/null +++ b/tests/test_sortor.cpp @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). + * + * Use of this source code is governed by MIT license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#include +#include +#include "Rtsp/RtpReceiver.h" +using namespace mediakit; + +//该测试程序由于检验rtp排序算法的正确性 +int main(int argc, char *argv[]) { + srand((unsigned) time(NULL)); + PacketSortor sortor; + list input_list, sorted_list; + sortor.setOnSort([&](uint16_t seq, const uint16_t &packet) { + sorted_list.push_back(seq); + }); + + for (int i = 0; i < 1000;) { + int count = 1 + rand() % 8; + for (int j = i + count; j >= i; --j) { + auto seq = j; + sortor.sortPacket(seq, seq); + input_list.push_back(seq); + } + i += (count + 1); + } + + { + cout << "排序前:" << endl; + int i = 0; + for (auto &item : input_list) { + cout << item << " "; + if (++i % 10 == 0) { + cout << endl; + } + } + cout << endl; + } + { + cout << "排序后:" << endl; + int i = 0; + for (auto &item : sorted_list) { + cout << item << " "; + if (++i % 10 == 0) { + cout << endl; + } + } + cout << endl; + } + return 0; +} \ No newline at end of file