mirror of
https://github.com/ZLMediaKit/ZLMediaKit.git
synced 2024-11-22 19:00:01 +08:00
优化rtp排序抖动缓存算法,提高webrtc/rtsp抗丢包性能
This commit is contained in:
parent
4942a0f574
commit
9fdb3de8b2
@ -14,7 +14,7 @@
|
||||
namespace mediakit {
|
||||
|
||||
RtpTrack::RtpTrack() {
|
||||
setOnSort([this](uint16_t seq, RtpPacket::Ptr &packet) {
|
||||
setOnSort([this](uint16_t seq, RtpPacket::Ptr packet) {
|
||||
onRtpSorted(std::move(packet));
|
||||
});
|
||||
}
|
||||
|
@ -24,10 +24,11 @@ namespace mediakit {
|
||||
template<typename T, typename SEQ = uint16_t, size_t kMax = 1024, size_t kMin = 32>
|
||||
class PacketSortor {
|
||||
public:
|
||||
static constexpr SEQ SEQ_MAX = (std::numeric_limits<SEQ>::max)();
|
||||
PacketSortor() = default;
|
||||
~PacketSortor() = default;
|
||||
|
||||
void setOnSort(std::function<void(SEQ seq, T &packet)> cb) {
|
||||
void setOnSort(std::function<void(SEQ seq, T packet)> cb) {
|
||||
_cb = std::move(cb);
|
||||
}
|
||||
|
||||
@ -35,16 +36,16 @@ public:
|
||||
* 清空状态
|
||||
*/
|
||||
void clear() {
|
||||
_started = false;
|
||||
_seq_cycle_count = 0;
|
||||
_pkt_sort_cache_map.clear();
|
||||
_next_seq_out = 0;
|
||||
_max_sort_size = kMin;
|
||||
_pkt_sort_cache_map.clear();
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取排序缓存长度
|
||||
*/
|
||||
size_t getJitterSize() const{
|
||||
size_t getJitterSize() const {
|
||||
return _pkt_sort_cache_map.size();
|
||||
}
|
||||
|
||||
@ -61,24 +62,52 @@ public:
|
||||
* @param packet 包负载
|
||||
*/
|
||||
void sortPacket(SEQ seq, T packet) {
|
||||
if(!_is_inited && _next_seq_out == 0){
|
||||
_next_seq_out = seq;
|
||||
_is_inited = true;
|
||||
if (!_started) {
|
||||
// 记录第一个seq
|
||||
_started = true;
|
||||
_last_seq_out = seq - 1;
|
||||
}
|
||||
if (seq < _next_seq_out) {
|
||||
if (_next_seq_out < seq + kMax) {
|
||||
//过滤seq回退包(回环包除外)
|
||||
return;
|
||||
}
|
||||
} else if (_next_seq_out && seq - _next_seq_out > ((std::numeric_limits<SEQ>::max)() >> 1)) {
|
||||
//过滤seq跳变非常大的包(防止回环时乱序时收到非常大的seq)
|
||||
if (seq == static_cast<SEQ>(_last_seq_out + 1)) {
|
||||
// 收到下一个seq
|
||||
output(seq, std::move(packet));
|
||||
return;
|
||||
}
|
||||
|
||||
//放入排序缓存
|
||||
if (seq < _last_seq_out && _last_seq_out != SEQ_MAX && seq < kMax && _last_seq_out > SEQ_MAX - kMax) {
|
||||
// seq回环,清空回环前缓存
|
||||
flush();
|
||||
_last_seq_out = SEQ_MAX;
|
||||
_pkt_sort_cache_map.emplace(seq, std::move(packet));
|
||||
//尝试输出排序后的包
|
||||
tryPopPacket();
|
||||
++_seq_cycle_count;
|
||||
return;
|
||||
}
|
||||
|
||||
if (seq <= _last_seq_out && _last_seq_out != SEQ_MAX) {
|
||||
// 这个回退包已经不再等待
|
||||
setBufferSize(seq);
|
||||
return;
|
||||
}
|
||||
|
||||
_pkt_sort_cache_map.emplace(seq, std::move(packet));
|
||||
auto max_seq = _pkt_sort_cache_map.rbegin()->first;
|
||||
auto min_seq = _pkt_sort_cache_map.begin()->first;
|
||||
auto diff = max_seq - min_seq;
|
||||
if (diff > (SEQ_MAX >> 1)) {
|
||||
// 回环后,收到回环前的大值seq, 忽略掉
|
||||
_pkt_sort_cache_map.erase(max_seq);
|
||||
return;
|
||||
}
|
||||
|
||||
if (min_seq == static_cast<SEQ>(_last_seq_out + 1) && _pkt_sort_cache_map.size() == (size_t)diff + 1) {
|
||||
// 都是连续的seq, 未丢包
|
||||
flush();
|
||||
} else {
|
||||
// seq不连续,有丢包
|
||||
if (_pkt_sort_cache_map.size() >= _max_sort_size) {
|
||||
//buffer太长,强行减小
|
||||
popIterator(_pkt_sort_cache_map.begin());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void flush(){
|
||||
@ -89,74 +118,28 @@ public:
|
||||
}
|
||||
|
||||
private:
|
||||
void popPacket() {
|
||||
auto it = _pkt_sort_cache_map.begin();
|
||||
if (it->first >= _next_seq_out) {
|
||||
//过滤回跳包
|
||||
popIterator(it);
|
||||
return;
|
||||
}
|
||||
|
||||
if (_next_seq_out - it->first > (0xFFFF >> 1)) {
|
||||
//产生回环了
|
||||
if (_pkt_sort_cache_map.size() < 2 * kMin) {
|
||||
//等足够多的数据后才处理回环, 因为后面还可能出现大的SEQ
|
||||
return;
|
||||
}
|
||||
++_seq_cycle_count;
|
||||
//找到大的SEQ并清空掉,然后从小的SEQ重新开始排序
|
||||
auto hit = _pkt_sort_cache_map.upper_bound((SEQ) (_next_seq_out - _pkt_sort_cache_map.size()));
|
||||
while (hit != _pkt_sort_cache_map.end()) {
|
||||
//回环前,清空剩余的大的SEQ的数据
|
||||
_cb(hit->first, hit->second);
|
||||
hit = _pkt_sort_cache_map.erase(hit);
|
||||
}
|
||||
//下一个回环的数据
|
||||
popIterator(_pkt_sort_cache_map.begin());
|
||||
return;
|
||||
}
|
||||
//删除回跳的数据包
|
||||
_pkt_sort_cache_map.erase(it);
|
||||
}
|
||||
|
||||
void popIterator(typename std::map<SEQ, T>::iterator it) {
|
||||
auto seq = it->first;
|
||||
auto data = std::move(it->second);
|
||||
_pkt_sort_cache_map.erase(it);
|
||||
_next_seq_out = seq + 1;
|
||||
_cb(seq, data);
|
||||
output(seq, std::move(data));
|
||||
}
|
||||
|
||||
void tryPopPacket() {
|
||||
int count = 0;
|
||||
while ((!_pkt_sort_cache_map.empty() && _pkt_sort_cache_map.begin()->first == _next_seq_out)) {
|
||||
//找到下个包,直接输出
|
||||
popPacket();
|
||||
++count;
|
||||
void output(SEQ seq, T packet) {
|
||||
_last_seq_out = seq;
|
||||
_cb(seq, std::move(packet));
|
||||
}
|
||||
|
||||
if (count) {
|
||||
setSortSize();
|
||||
} else if (_pkt_sort_cache_map.size() > _max_sort_size) {
|
||||
//排序缓存溢出,不再继续排序
|
||||
popPacket();
|
||||
setSortSize();
|
||||
}
|
||||
}
|
||||
|
||||
void setSortSize() {
|
||||
_max_sort_size = kMin + _pkt_sort_cache_map.size();
|
||||
if (_max_sort_size > kMax) {
|
||||
_max_sort_size = kMax;
|
||||
}
|
||||
void setBufferSize(SEQ seq) {
|
||||
auto next_seq = static_cast<SEQ>(_last_seq_out + 1);
|
||||
auto min_seq = _pkt_sort_cache_map.empty() ? next_seq : _pkt_sort_cache_map.begin()->first;
|
||||
_max_sort_size = MAX(std::min<SEQ>(_pkt_sort_cache_map.size() + min_seq - seq, kMax), kMin);
|
||||
}
|
||||
|
||||
private:
|
||||
//第一个包是已经进入
|
||||
bool _is_inited = false;
|
||||
|
||||
bool _started = false;
|
||||
//下次应该输出的SEQ
|
||||
SEQ _next_seq_out = 0;
|
||||
SEQ _last_seq_out = 0;
|
||||
//seq回环次数计数
|
||||
size_t _seq_cycle_count = 0;
|
||||
//排序缓存长度
|
||||
@ -164,7 +147,7 @@ private:
|
||||
//pkt排序缓存,根据seq排序
|
||||
std::map<SEQ, T> _pkt_sort_cache_map;
|
||||
//回调
|
||||
std::function<void(SEQ seq, T &packet)> _cb;
|
||||
std::function<void(SEQ seq, T packet)> _cb;
|
||||
};
|
||||
|
||||
class RtpTrack : private PacketSortor<RtpPacket::Ptr> {
|
||||
|
@ -102,7 +102,7 @@ void test_real() {
|
||||
|
||||
PacketSortor<uint16_t, uint16_t> sortor;
|
||||
list<uint16_t> sorted_list;
|
||||
sortor.setOnSort([&](uint16_t seq, const uint16_t &packet) {
|
||||
sortor.setOnSort([&](uint16_t seq, uint16_t packet) {
|
||||
sorted_list.push_back(seq);
|
||||
});
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user