提炼rtp排序算法

This commit is contained in:
xiongziliang 2020-10-01 19:02:14 +08:00
parent eee65a51ce
commit bc36e000b3
2 changed files with 141 additions and 66 deletions

View File

@ -11,11 +11,6 @@
#include "Common/config.h" #include "Common/config.h"
#include "RtpReceiver.h" #include "RtpReceiver.h"
#define POP_HEAD(trackidx) \
auto it = _rtp_sort_cache_map[trackidx].begin(); \
onRtpSorted(it->second, trackidx); \
_rtp_sort_cache_map[trackidx].erase(it);
#define AV_RB16(x) \ #define AV_RB16(x) \
((((const uint8_t*)(x))[0] << 8) | \ ((((const uint8_t*)(x))[0] << 8) | \
((const uint8_t*)(x))[1]) ((const uint8_t*)(x))[1])
@ -24,7 +19,18 @@
namespace mediakit { namespace mediakit {
RtpReceiver::RtpReceiver() {} RtpReceiver::RtpReceiver() {
GET_CONFIG(uint32_t, clearCount, Rtp::kClearCount);
GET_CONFIG(uint32_t, maxRtpCount, Rtp::kMaxRtpCount);
int index = 0;
for (auto &sortor : _rtp_sortor) {
sortor.setup(maxRtpCount, clearCount);
sortor.setOnSort([this, index](uint16_t seq, const RtpPacket::Ptr &packet) {
onRtpSorted(packet, index);
});
++index;
}
}
RtpReceiver::~RtpReceiver() {} RtpReceiver::~RtpReceiver() {}
bool RtpReceiver::handleOneRtp(int track_index, TrackType type, int samplerate, unsigned char *rtp_raw_ptr, unsigned int rtp_raw_len) { bool RtpReceiver::handleOneRtp(int track_index, TrackType type, int samplerate, unsigned char *rtp_raw_ptr, unsigned int rtp_raw_len) {
@ -80,7 +86,7 @@ bool RtpReceiver::handleOneRtp(int track_index, TrackType type, int samplerate,
if (_ssrc_err_count[track_index]++ > 10) { if (_ssrc_err_count[track_index]++ > 10) {
//ssrc切换后清除老数据 //ssrc切换后清除老数据
WarnL << "ssrc更换:" << _ssrc[track_index] << " -> " << rtp.ssrc; WarnL << "ssrc更换:" << _ssrc[track_index] << " -> " << rtp.ssrc;
_rtp_sort_cache_map[track_index].clear(); _rtp_sortor[track_index].clear();
_ssrc[track_index] = rtp.ssrc; _ssrc[track_index] = rtp.ssrc;
} }
return false; return false;
@ -127,56 +133,15 @@ bool RtpReceiver::handleOneRtp(int track_index, TrackType type, int samplerate,
} }
void RtpReceiver::sortRtp(const RtpPacket::Ptr &rtp,int track_index){ void RtpReceiver::sortRtp(const RtpPacket::Ptr &rtp,int track_index){
if(rtp->sequence != _last_seq[track_index] + 1 && _last_seq[track_index] != 0){ _rtp_sortor[track_index].sortPacket(rtp->sequence, rtp);
//包乱序或丢包
_seq_ok_count[track_index] = 0;
_sort_started[track_index] = true;
if(_last_seq[track_index] > rtp->sequence && _last_seq[track_index] - rtp->sequence > 0xFF){
//sequence回环清空所有排序缓存
while (_rtp_sort_cache_map[track_index].size()) {
POP_HEAD(track_index)
}
++_seq_cycle_count[track_index];
}
}else{
//正确序列的包
_seq_ok_count[track_index]++;
}
_last_seq[track_index] = rtp->sequence;
//开始排序缓存
if (_sort_started[track_index]) {
_rtp_sort_cache_map[track_index].emplace(rtp->sequence, rtp);
GET_CONFIG(uint32_t,clearCount,Rtp::kClearCount);
GET_CONFIG(uint32_t,maxRtpCount,Rtp::kMaxRtpCount);
if (_seq_ok_count[track_index] >= clearCount) {
//网络环境改善,需要清空排序缓存
_seq_ok_count[track_index] = 0;
_sort_started[track_index] = false;
while (_rtp_sort_cache_map[track_index].size()) {
POP_HEAD(track_index)
}
} else if (_rtp_sort_cache_map[track_index].size() >= maxRtpCount) {
//排序缓存溢出
POP_HEAD(track_index)
}
}else{
//正确序列
onRtpSorted(rtp, track_index);
}
} }
void RtpReceiver::clear() { void RtpReceiver::clear() {
CLEAR_ARR(_last_seq);
CLEAR_ARR(_ssrc); CLEAR_ARR(_ssrc);
CLEAR_ARR(_ssrc_err_count); CLEAR_ARR(_ssrc_err_count);
CLEAR_ARR(_seq_ok_count); for (auto &sortor : _rtp_sortor) {
CLEAR_ARR(_sort_started); sortor.clear();
CLEAR_ARR(_seq_cycle_count); }
_rtp_sort_cache_map[0].clear();
_rtp_sort_cache_map[1].clear();
} }
void RtpReceiver::setPoolSize(int size) { void RtpReceiver::setPoolSize(int size) {
@ -184,11 +149,11 @@ void RtpReceiver::setPoolSize(int size) {
} }
int RtpReceiver::getJitterSize(int track_index){ int RtpReceiver::getJitterSize(int track_index){
return _rtp_sort_cache_map[track_index].size(); return _rtp_sortor[track_index].getJitterSize();
} }
int RtpReceiver::getCycleCount(int track_index){ int RtpReceiver::getCycleCount(int track_index){
return _seq_cycle_count[track_index]; return _rtp_sortor[track_index].getCycleCount();
} }

View File

@ -11,24 +11,141 @@
#ifndef ZLMEDIAKIT_RTPRECEIVER_H #ifndef ZLMEDIAKIT_RTPRECEIVER_H
#define ZLMEDIAKIT_RTPRECEIVER_H #define ZLMEDIAKIT_RTPRECEIVER_H
#include <map> #include <map>
#include <string> #include <string>
#include <memory> #include <memory>
#include "RtpCodec.h" #include "RtpCodec.h"
#include "RtspMediaSource.h" #include "RtspMediaSource.h"
using namespace std; using namespace std;
using namespace toolkit; using namespace toolkit;
namespace mediakit { namespace mediakit {
template<typename T, typename SEQ = uint16_t>
class PacketSortor {
public:
PacketSortor() = default;
~PacketSortor() = default;
/**
*
* @param max_sort_size
* @param clear_sort_size seq连续次数超过该值后
*/
void setup(uint32_t max_sort_size, uint32_t clear_sort_size) {
_max_sort_size = max_sort_size;
_clear_sort_size = clear_sort_size;
}
void setOnSort(function<void(SEQ seq, const T &packet)> cb){
_cb = std::move(cb);
}
/**
*
*/
void clear() {
_last_seq = 0;
_seq_ok_count = 0;
_sort_started = 0;
_seq_cycle_count = 0;
_rtp_sort_cache_map.clear();
}
/**
*
*/
int getJitterSize(){
return _rtp_sort_cache_map.size();
}
/**
* seq回环次数
*/
int getCycleCount(){
return _seq_cycle_count;
}
/**
*
* @param seq
* @param packet
*/
void sortPacket(SEQ seq, const T &packet){
if (seq != _last_seq + 1 && _last_seq != 0) {
//包乱序或丢包
_seq_ok_count = 0;
_sort_started = true;
if (_last_seq > seq && _last_seq - seq > 0xFF) {
//sequence回环清空所有排序缓存
while (_rtp_sort_cache_map.size()) {
popPacket();
}
++_seq_cycle_count;
}
} else {
//正确序列的包
_seq_ok_count++;
}
_last_seq = seq;
//开始排序缓存
if (_sort_started) {
_rtp_sort_cache_map.emplace(seq, packet);
if (_seq_ok_count >= _clear_sort_size) {
//网络环境改善,需要清空排序缓存
_seq_ok_count = 0;
_sort_started = false;
while (_rtp_sort_cache_map.size()) {
popPacket();
}
} else if (_rtp_sort_cache_map.size() >= _max_sort_size) {
//排序缓存溢出
popPacket();
}
} else {
//正确序列
onPacketSorted(seq, packet);
}
}
private:
void popPacket() {
auto it = _rtp_sort_cache_map.begin();
onPacketSorted(it->first, it->second);
_rtp_sort_cache_map.erase(it);
}
void onPacketSorted(SEQ seq, const T &packet) {
_cb(seq, packet);
}
private:
//是否开始seq排序
bool _sort_started = false;
//上次seq
SEQ _last_seq = 0;
//seq连续次数计数
uint32_t _seq_ok_count = 0;
//seq回环次数计数
uint32_t _seq_cycle_count = 0;
//排序缓存长度
uint32_t _max_sort_size;
//seq连续次数超过该值后清空并关闭排序缓存
uint32_t _clear_sort_size;
//rtp排序缓存根据seq排序
map<SEQ, T> _rtp_sort_cache_map;
//回调
function<void(SEQ seq, const T &packet)> _cb;
};
class RtpReceiver { class RtpReceiver {
public: public:
RtpReceiver(); RtpReceiver();
virtual ~RtpReceiver(); virtual ~RtpReceiver();
protected:
protected:
/** /**
* rtp包 * rtp包
* @param track_index track下标索引 * @param track_index track下标索引
@ -46,6 +163,7 @@ protected:
* @param track_index track索引 * @param track_index track索引
*/ */
virtual void onRtpSorted(const RtpPacket::Ptr &rtp, int track_index){} virtual void onRtpSorted(const RtpPacket::Ptr &rtp, int track_index){}
void clear(); void clear();
void setPoolSize(int size); void setPoolSize(int size);
int getJitterSize(int track_index); int getJitterSize(int track_index);
@ -58,16 +176,8 @@ private:
uint32_t _ssrc[2] = { 0, 0 }; uint32_t _ssrc[2] = { 0, 0 };
//ssrc不匹配计数 //ssrc不匹配计数
uint32_t _ssrc_err_count[2] = { 0, 0 }; uint32_t _ssrc_err_count[2] = { 0, 0 };
//上次seq
uint16_t _last_seq[2] = { 0 , 0 };
//seq连续次数计数
uint32_t _seq_ok_count[2] = { 0 , 0};
//seq回环次数计数
uint32_t _seq_cycle_count[2] = { 0 , 0};
//是否开始seq排序
bool _sort_started[2] = { 0 , 0};
//rtp排序缓存根据seq排序 //rtp排序缓存根据seq排序
map<uint16_t , RtpPacket::Ptr> _rtp_sort_cache_map[2]; PacketSortor<RtpPacket::Ptr> _rtp_sortor[2];
//rtp循环池 //rtp循环池
RtspMediaSource::PoolType _rtp_pool; RtspMediaSource::PoolType _rtp_pool;
}; };