完善nack与丢包重传

This commit is contained in:
xia-chu 2021-05-11 11:18:55 +08:00
parent cbafdbabc6
commit 5c90a1e137
6 changed files with 155 additions and 26 deletions

View File

@ -183,7 +183,7 @@ vector<bool> FCI_NACK::getBitArray() const {
vector<bool> ret;
ret.resize(kBitSize + 1);
//nack第一个包丢包
ret[0] = false;
ret[0] = true;
auto blp_h = getBlp();
for (size_t i = 0; i < kBitSize; ++i) {

View File

@ -243,18 +243,17 @@ private:
class FCI_NACK {
public:
static constexpr size_t kSize = 4;
static constexpr size_t kBitSize = 16;
FCI_NACK(uint16_t pid_h, const vector<bool> &type);
void check(size_t size);
uint16_t getPid() const;
uint16_t getBlp() const;
//返回丢包列表总长度17第一个包必丢
vector<bool> getBitArray() const;
string dumpString() const;
private:
static constexpr size_t kBitSize = 16;
private:
// The PID field is used to specify a lost packet. The PID field
// refers to the RTP sequence number of the lost packet.

View File

@ -21,7 +21,7 @@ using namespace toolkit;
namespace mediakit {
template<typename T, typename SEQ = uint16_t, size_t kMax = 256, size_t kMin = 10>
template<typename T, typename SEQ = uint16_t, size_t kMax = 1024, size_t kMin = 32>
class PacketSortor {
public:
PacketSortor() = default;

37
tests/test_rtcp_nack.cpp Normal file
View File

@ -0,0 +1,37 @@
/*
* Copyright (c) 2016 The ZLToolKit project authors. All Rights Reserved.
*
* This file is part of ZLToolKit(https://github.com/xia-chu/ZLToolKit).
*
* Use of this source code is governed by MIT license that can be found in the
* LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree.
*/
#include <iostream>
#include "Util/logger.h"
#include "Rtcp/RtcpFCI.h"
#include "../webrtc/WebRtcTransport.h"
using namespace std;
using namespace toolkit;
using namespace mediakit;
extern void testFCI();
int main() {
Logger::Instance().add(std::make_shared<ConsoleChannel>());
srand((unsigned) time(NULL));
NackContext ctx;
for (int i = 1; i < 1000; ++i) {
if (i % (1 + (rand() % 30)) == 0) {
DebugL << "drop:" << i;
} else {
ctx.received(i);
}
}
sleep(1);
return 0;
}

View File

@ -424,9 +424,9 @@ void WebRtcTransportImp::onStartWebRTC() {
}
ref.rtcp_context_recv = std::make_shared<RtcpContext>(ref.plan->sample_rate, true);
ref.rtcp_context_send = std::make_shared<RtcpContext>(ref.plan->sample_rate, false);
ref.receiver = std::make_shared<RtpReceiverImp>([&ref, this](RtpPacket::Ptr rtp) {
ref.receiver = std::make_shared<RtpReceiverImp>([&ref, this](RtpPacket::Ptr rtp) mutable{
onSortedRtp(ref, std::move(rtp));
}, [&ref, this](const RtpPacket::Ptr &rtp) {
}, [&ref, this](const RtpPacket::Ptr &rtp) mutable {
onBeforeSortedRtp(ref, rtp);
});
}
@ -674,15 +674,28 @@ void WebRtcTransportImp::onRtp(const char *buf, size_t len) {
return;
}
auto &info = it->second;
#if 1
//此处模拟接受丢包
auto header = (RtpHeader *) buf;
auto seq = ntohs(header->seq);
if (seq % 10 == 0) {
//丢包
return;
} else {
info.nack_ctx.received(seq);
}
#endif
//解析并排序rtp
info.receiver->inputRtp(info.media->type, info.plan->sample_rate, (uint8_t *) buf, len);
}
///////////////////////////////////////////////////////////////////
void WebRtcTransportImp::onSortedRtp(const RtpPayloadInfo &info, RtpPacket::Ptr rtp) {
if(!info.is_common_rtp){
//todo rtx/red/ulpfec类型的rtp先未处理
void WebRtcTransportImp::onSortedRtp(RtpPayloadInfo &info, RtpPacket::Ptr rtp) {
if (!info.is_common_rtp) {
WarnL;
return;
}
if (info.media->type == TrackVideo && _pli_ticker.elapsedTime() > 2000) {
@ -724,7 +737,7 @@ static void changeRtpExtId(const RtpHeader *header, const Type &map) {
}
}
void WebRtcTransportImp::onBeforeSortedRtp(const RtpPayloadInfo &info, const RtpPacket::Ptr &rtp) {
void WebRtcTransportImp::onBeforeSortedRtp(RtpPayloadInfo &info, const RtpPacket::Ptr &rtp) {
changeRtpExtId(rtp->getHeader(), _rtp_ext_id_to_type);
//统计rtp收到的情况好做rr汇报
info.rtcp_context_recv->onRtp(rtp->getSeq(), rtp->getStampMS(), rtp->size() - RtpPacket::kRtpTcpHeaderSize);
@ -740,6 +753,12 @@ void WebRtcTransportImp::onSendRtp(const RtpPacket::Ptr &rtp, bool flush, bool r
//统计rtp发送情况好做sr汇报
info->rtcp_context_send->onRtp(rtp->getSeq(), rtp->getStampMS(), rtp->size() - RtpPacket::kRtpTcpHeaderSize);
info->nack_list.push_back(rtp);
#if 0
//此处模拟发送丢包
if(rtp->getSeq() % 10 == 0){
return;
}
#endif
} else {
WarnL << "重传rtp:" << rtp->getSeq();
}

View File

@ -131,8 +131,8 @@ class NackList {
public:
void push_back(RtpPacket::Ptr rtp) {
auto seq = rtp->getSeq();
nack_cache_seq.emplace_back(seq);
nack_cache_pkt.emplace(seq, std::move(rtp));
_nack_cache_seq.emplace_back(seq);
_nack_cache_pkt.emplace(seq, std::move(rtp));
while (get_cache_ms() > kMaxNackMS) {
//需要清除部分nack缓存
pop_front();
@ -143,7 +143,7 @@ public:
void for_each_nack(const FCI_NACK &nack, const FUNC &func) {
auto seq = nack.getPid();
for (auto bit : nack.getBitArray()) {
if (!bit) {
if (bit) {
//丢包
RtpPacket::Ptr *ptr = get_rtp(seq);
if (ptr) {
@ -156,27 +156,27 @@ public:
private:
void pop_front() {
if (nack_cache_seq.empty()) {
if (_nack_cache_seq.empty()) {
return;
}
nack_cache_pkt.erase(nack_cache_seq.front());
nack_cache_seq.pop_front();
_nack_cache_pkt.erase(_nack_cache_seq.front());
_nack_cache_seq.pop_front();
}
RtpPacket::Ptr *get_rtp(uint16_t seq) {
auto it = nack_cache_pkt.find(seq);
if (it == nack_cache_pkt.end()) {
auto it = _nack_cache_pkt.find(seq);
if (it == _nack_cache_pkt.end()) {
return nullptr;
}
return &it->second;
}
uint32_t get_cache_ms() {
if (nack_cache_seq.size() < 2) {
if (_nack_cache_seq.size() < 2) {
return 0;
}
uint32_t back = nack_cache_pkt[nack_cache_seq.back()]->getStampMS();
uint32_t front = nack_cache_pkt[nack_cache_seq.front()]->getStampMS();
uint32_t back = _nack_cache_pkt[_nack_cache_seq.back()]->getStampMS();
uint32_t front = _nack_cache_pkt[_nack_cache_seq.front()]->getStampMS();
if (back > front) {
return back - front;
}
@ -186,8 +186,81 @@ private:
private:
static constexpr uint32_t kMaxNackMS = 10 * 1000;
deque<uint16_t> nack_cache_seq;
unordered_map<uint16_t, RtpPacket::Ptr > nack_cache_pkt;
deque<uint16_t> _nack_cache_seq;
unordered_map<uint16_t, RtpPacket::Ptr > _nack_cache_pkt;
};
class NackContext {
public:
void received(uint16_t seq) {
if (!_last_max_seq && _seq.empty()) {
_last_max_seq = seq - 1;
}
_seq.emplace(seq);
auto max_seq = *_seq.rbegin();
auto min_seq = *_seq.begin();
auto diff = max_seq - min_seq;
if (!diff) {
return;
}
if (diff > UINT32_MAX / 2) {
//回环
_seq.clear();
_last_max_seq = min_seq;
return;
}
if (_seq.size() == diff + 1 && _last_max_seq + 1 == min_seq) {
//都是连续的seq未丢包
_seq.clear();
_last_max_seq = max_seq;
} else {
//seq不连续有丢包
if (min_seq == _last_max_seq + 1) {
//前面部分seq是连续的未丢包移除之
eraseFrontSeq();
}
//有丢包丢包从_last_max_seq开始
if (max_seq - _last_max_seq > FCI_NACK::kBitSize) {
vector<bool> vec;
vec.resize(FCI_NACK::kBitSize);
for (auto i = 0; i < FCI_NACK::kBitSize; ++i) {
vec[i] = _seq.find(_last_max_seq + i + 2) == _seq.end();
}
onNack(FCI_NACK(_last_max_seq + 1, vec));
_last_max_seq += FCI_NACK::kBitSize + 1;
if (_last_max_seq >= max_seq) {
_seq.clear();
} else {
auto it = _seq.emplace_hint(_seq.begin(), _last_max_seq);
_seq.erase(_seq.begin(), it);
}
}
}
}
void onNack(const FCI_NACK &nack) {
InfoL << nack.dumpString() << " " << _seq.size();
}
private:
void eraseFrontSeq(){
//前面部分seq是连续的未丢包移除之
for (auto it = _seq.begin(); it != _seq.end();) {
if (*it != _last_max_seq + 1) {
//seq不连续丢包了
break;
}
_last_max_seq = *it;
it = _seq.erase(it);
}
}
private:
set<uint16_t> _seq;
uint16_t _last_max_seq = 0;
};
class WebRtcTransportImp : public WebRtcTransport, public MediaSourceEvent, public SockInfo, public std::enable_shared_from_this<WebRtcTransportImp>{
@ -265,10 +338,11 @@ private:
RtcpContext::Ptr rtcp_context_recv;
RtcpContext::Ptr rtcp_context_send;
NackList nack_list;
NackContext nack_ctx;
};
void onSortedRtp(const RtpPayloadInfo &info, RtpPacket::Ptr rtp);
void onBeforeSortedRtp(const RtpPayloadInfo &info, const RtpPacket::Ptr &rtp);
void onSortedRtp(RtpPayloadInfo &info, RtpPacket::Ptr rtp);
void onBeforeSortedRtp(RtpPayloadInfo &info, const RtpPacket::Ptr &rtp);
private:
//用掉的总流量