重写rtp排序算法:#510

This commit is contained in:
xiongziliang 2020-10-01 21:33:07 +08:00
parent 6a8268f323
commit 4d39176877
3 changed files with 118 additions and 85 deletions

View File

@ -20,12 +20,9 @@
namespace mediakit { namespace mediakit {
RtpReceiver::RtpReceiver() { RtpReceiver::RtpReceiver() {
GET_CONFIG(uint32_t, clearCount, Rtp::kClearCount);
GET_CONFIG(uint32_t, maxRtpCount, Rtp::kMaxRtpCount);
int index = 0; int index = 0;
for (auto &sortor : _rtp_sortor) { for (auto &sortor : _rtp_sortor) {
sortor.setup(maxRtpCount, clearCount); sortor.setOnSort([this, index](uint16_t seq, RtpPacket::Ptr &packet) {
sortor.setOnSort([this, index](uint16_t seq, const RtpPacket::Ptr &packet) {
onRtpSorted(packet, index); onRtpSorted(packet, index);
}); });
++index; ++index;
@ -34,7 +31,7 @@ RtpReceiver::RtpReceiver() {
RtpReceiver::~RtpReceiver() {} RtpReceiver::~RtpReceiver() {}
bool RtpReceiver::handleOneRtp(int track_index, TrackType type, int samplerate, unsigned char *rtp_raw_ptr, unsigned int rtp_raw_len) { bool RtpReceiver::handleOneRtp(int track_index, TrackType type, int samplerate, unsigned char *rtp_raw_ptr, unsigned int rtp_raw_len) {
if(rtp_raw_len < 12){ if (rtp_raw_len < 12) {
WarnL << "rtp包太小:" << rtp_raw_len; WarnL << "rtp包太小:" << rtp_raw_len;
return false; return false;
} }
@ -65,7 +62,7 @@ bool RtpReceiver::handleOneRtp(int track_index, TrackType type, int samplerate,
memcpy(&rtp.timeStamp, rtp_raw_ptr + 4, 4); memcpy(&rtp.timeStamp, rtp_raw_ptr + 4, 4);
rtp.timeStamp = ntohl(rtp.timeStamp); rtp.timeStamp = ntohl(rtp.timeStamp);
if(!samplerate){ if (!samplerate) {
//无法把时间戳转换成毫秒 //无法把时间戳转换成毫秒
return false; return false;
} }
@ -80,7 +77,7 @@ bool RtpReceiver::handleOneRtp(int track_index, TrackType type, int samplerate,
if (_ssrc[track_index] == 0) { if (_ssrc[track_index] == 0) {
//保存SSRC至track对象 //保存SSRC至track对象
_ssrc[track_index] = rtp.ssrc; _ssrc[track_index] = rtp.ssrc;
}else{ } else {
//ssrc错误 //ssrc错误
WarnL << "ssrc错误:" << rtp.ssrc << " != " << _ssrc[track_index]; WarnL << "ssrc错误:" << rtp.ssrc << " != " << _ssrc[track_index];
if (_ssrc_err_count[track_index]++ > 10) { if (_ssrc_err_count[track_index]++ > 10) {
@ -97,22 +94,22 @@ bool RtpReceiver::handleOneRtp(int track_index, TrackType type, int samplerate,
_ssrc_err_count[track_index] = 0; _ssrc_err_count[track_index] = 0;
//获取rtp中媒体数据偏移量 //获取rtp中媒体数据偏移量
rtp.offset = 12 + 4; rtp.offset = 12 + 4;
int csrc = rtp_raw_ptr[0] & 0x0f; int csrc = rtp_raw_ptr[0] & 0x0f;
int ext = rtp_raw_ptr[0] & 0x10; int ext = rtp_raw_ptr[0] & 0x10;
rtp.offset += 4 * csrc; rtp.offset += 4 * csrc;
if (ext && rtp_raw_len >= rtp.offset) { if (ext && rtp_raw_len >= rtp.offset) {
/* calculate the header extension length (stored as number of 32-bit words) */ /* calculate the header extension length (stored as number of 32-bit words) */
ext = (AV_RB16(rtp_raw_ptr + rtp.offset - 2) + 1) << 2; ext = (AV_RB16(rtp_raw_ptr + rtp.offset - 2) + 1) << 2;
rtp.offset += ext; rtp.offset += ext;
} }
if(rtp_raw_len + 4 <= rtp.offset){ if (rtp_raw_len + 4 <= rtp.offset) {
WarnL << "无有效负载的rtp包:" << rtp_raw_len << " <= " << (int)rtp.offset; WarnL << "无有效负载的rtp包:" << rtp_raw_len << " <= " << (int) rtp.offset;
return false; return false;
} }
if(rtp_raw_len > RTP_MAX_SIZE){ if (rtp_raw_len > RTP_MAX_SIZE) {
WarnL << "超大的rtp包:" << rtp_raw_len << " > " << RTP_MAX_SIZE; WarnL << "超大的rtp包:" << rtp_raw_len << " > " << RTP_MAX_SIZE;
return false; return false;
} }
@ -120,7 +117,7 @@ bool RtpReceiver::handleOneRtp(int track_index, TrackType type, int samplerate,
//设置rtp负载长度 //设置rtp负载长度
rtp.setCapacity(rtp_raw_len + 4); rtp.setCapacity(rtp_raw_len + 4);
rtp.setSize(rtp_raw_len + 4); rtp.setSize(rtp_raw_len + 4);
uint8_t *payload_ptr = (uint8_t *)rtp.data(); uint8_t *payload_ptr = (uint8_t *) rtp.data();
payload_ptr[0] = '$'; payload_ptr[0] = '$';
payload_ptr[1] = rtp.interleaved; payload_ptr[1] = rtp.interleaved;
payload_ptr[2] = rtp_raw_len >> 8; payload_ptr[2] = rtp_raw_len >> 8;
@ -128,7 +125,7 @@ bool RtpReceiver::handleOneRtp(int track_index, TrackType type, int samplerate,
//拷贝rtp负载 //拷贝rtp负载
memcpy(payload_ptr + 4, rtp_raw_ptr, rtp_raw_len); memcpy(payload_ptr + 4, rtp_raw_ptr, rtp_raw_len);
//排序rtp //排序rtp
sortRtp(rtp_ptr,track_index); sortRtp(std::move(rtp_ptr), track_index);
return true; return true;
} }

View File

@ -21,23 +21,13 @@ using namespace toolkit;
namespace mediakit { namespace mediakit {
template<typename T, typename SEQ = uint16_t> template<typename T, typename SEQ = uint16_t, uint32_t kMax = 256, uint32_t kMin = 10>
class PacketSortor { class PacketSortor {
public: public:
PacketSortor() = default; PacketSortor() = default;
~PacketSortor() = default; ~PacketSortor() = default;
/** void setOnSort(function<void(SEQ seq, T &packet)> cb) {
*
* @param max_sort_size
* @param clear_sort_size seq连续次数超过该值后
*/
void setup(uint32_t max_sort_size, uint32_t clear_sort_size) {
_max_sort_size = max_sort_size;
_clear_sort_size = clear_sort_size;
}
void setOnSort(function<void(SEQ seq, const T &packet)> cb){
_cb = std::move(cb); _cb = std::move(cb);
} }
@ -45,24 +35,23 @@ public:
* *
*/ */
void clear() { void clear() {
_last_seq = 0;
_seq_ok_count = 0;
_sort_started = 0;
_seq_cycle_count = 0; _seq_cycle_count = 0;
_rtp_sort_cache_map.clear(); _rtp_sort_cache_map.clear();
_next_seq_out = 0;
_max_sort_size = kMin;
} }
/** /**
* *
*/ */
int getJitterSize(){ int getJitterSize() {
return _rtp_sort_cache_map.size(); return _rtp_sort_cache_map.size();
} }
/** /**
* seq回环次数 * seq回环次数
*/ */
int getCycleCount(){ int getCycleCount() {
return _seq_cycle_count; return _seq_cycle_count;
} }
@ -71,73 +60,62 @@ public:
* @param seq * @param seq
* @param packet * @param packet
*/ */
void sortPacket(SEQ seq, const T &packet){ void sortPacket(SEQ seq, T packet) {
if (seq != _last_seq + 1 && _last_seq != 0) { if (seq < _next_seq_out && _next_seq_out - seq > kMax) {
//包乱序或丢包 //回环
_seq_ok_count = 0; ++_seq_cycle_count;
_sort_started = true;
if (_last_seq > seq && _last_seq - seq > 0xFF) {
//sequence回环清空所有排序缓存
while (_rtp_sort_cache_map.size()) {
popPacket();
}
++_seq_cycle_count;
}
} else {
//正确序列的包
_seq_ok_count++;
}
_last_seq = seq;
//开始排序缓存
if (_sort_started) {
_rtp_sort_cache_map.emplace(seq, packet);
if (_seq_ok_count >= _clear_sort_size) {
//网络环境改善,需要清空排序缓存
_seq_ok_count = 0;
_sort_started = false;
while (_rtp_sort_cache_map.size()) {
popPacket();
}
} else if (_rtp_sort_cache_map.size() >= _max_sort_size) {
//排序缓存溢出
popPacket();
}
} else {
//正确序列
onPacketSorted(seq, packet);
} }
//放入排序缓存
_rtp_sort_cache_map.emplace(seq, std::move(packet));
//尝试输出排序后的包
tryPopPacket();
} }
private: private:
void popPacket() { void popPacket() {
auto it = _rtp_sort_cache_map.begin(); auto it = _rtp_sort_cache_map.begin();
onPacketSorted(it->first, it->second); _cb(it->first, it->second);
_next_seq_out = it->first + 1;
_rtp_sort_cache_map.erase(it); _rtp_sort_cache_map.erase(it);
} }
void onPacketSorted(SEQ seq, const T &packet) { void tryPopPacket() {
_cb(seq, packet); bool flag = false;
while ((!_rtp_sort_cache_map.empty() && _rtp_sort_cache_map.begin()->first == _next_seq_out)) {
//找到下个包,直接输出
popPacket();
flag = true;
}
if (flag) {
setSortSize();
} else if (_rtp_sort_cache_map.size() > _max_sort_size) {
//排序缓存溢出,不再继续排序
popPacket();
setSortSize();
}
}
void setSortSize() {
_max_sort_size = 2 * _rtp_sort_cache_map.size();
if (_max_sort_size > kMax) {
_max_sort_size = kMax;
} else if (_max_sort_size < kMin) {
_max_sort_size = kMin;
}
} }
private: private:
//是否开始seq排序 //下次应该输出的SEQ
bool _sort_started = false; SEQ _next_seq_out = 0;
//上次seq
SEQ _last_seq = 0;
//seq连续次数计数
uint32_t _seq_ok_count = 0;
//seq回环次数计数 //seq回环次数计数
uint32_t _seq_cycle_count = 0; uint32_t _seq_cycle_count = 0;
//排序缓存长度 //排序缓存长度
uint32_t _max_sort_size; uint32_t _max_sort_size = kMin;
//seq连续次数超过该值后清空并关闭排序缓存
uint32_t _clear_sort_size;
//rtp排序缓存根据seq排序 //rtp排序缓存根据seq排序
map<SEQ, T> _rtp_sort_cache_map; map<SEQ, T> _rtp_sort_cache_map;
//回调 //回调
function<void(SEQ seq, const T &packet)> _cb; function<void(SEQ seq, T &packet)> _cb;
}; };
class RtpReceiver { class RtpReceiver {
@ -162,7 +140,7 @@ protected:
* @param rtp rtp数据包 * @param rtp rtp数据包
* @param track_index track索引 * @param track_index track索引
*/ */
virtual void onRtpSorted(const RtpPacket::Ptr &rtp, int track_index){} virtual void onRtpSorted(const RtpPacket::Ptr &rtp, int track_index) {}
void clear(); void clear();
void setPoolSize(int size); void setPoolSize(int size);
@ -173,9 +151,9 @@ private:
void sortRtp(const RtpPacket::Ptr &rtp , int track_index); void sortRtp(const RtpPacket::Ptr &rtp , int track_index);
private: private:
uint32_t _ssrc[2] = { 0, 0 }; uint32_t _ssrc[2] = {0, 0};
//ssrc不匹配计数 //ssrc不匹配计数
uint32_t _ssrc_err_count[2] = { 0, 0 }; uint32_t _ssrc_err_count[2] = {0, 0};
//rtp排序缓存根据seq排序 //rtp排序缓存根据seq排序
PacketSortor<RtpPacket::Ptr> _rtp_sortor[2]; PacketSortor<RtpPacket::Ptr> _rtp_sortor[2];
//rtp循环池 //rtp循环池

58
tests/test_sortor.cpp Normal file
View File

@ -0,0 +1,58 @@
/*
* Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
*
* This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
*
* Use of this source code is governed by MIT license that can be found in the
* LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree.
*/
#include <iostream>
#include <list>
#include "Rtsp/RtpReceiver.h"
using namespace mediakit;
//该测试程序由于检验rtp排序算法的正确性
int main(int argc, char *argv[]) {
srand((unsigned) time(NULL));
PacketSortor<uint16_t, uint16_t> sortor;
list<uint16_t> input_list, sorted_list;
sortor.setOnSort([&](uint16_t seq, const uint16_t &packet) {
sorted_list.push_back(seq);
});
for (int i = 0; i < 1000;) {
int count = 1 + rand() % 8;
for (int j = i + count; j >= i; --j) {
auto seq = j;
sortor.sortPacket(seq, seq);
input_list.push_back(seq);
}
i += (count + 1);
}
{
cout << "排序前:" << endl;
int i = 0;
for (auto &item : input_list) {
cout << item << " ";
if (++i % 10 == 0) {
cout << endl;
}
}
cout << endl;
}
{
cout << "排序后:" << endl;
int i = 0;
for (auto &item : sorted_list) {
cout << item << " ";
if (++i % 10 == 0) {
cout << endl;
}
}
cout << endl;
}
return 0;
}