nack/rtx支持多次重传,提高抗丢包率

This commit is contained in:
ziyue 2021-07-27 20:37:43 +08:00
parent 3840ff2a3f
commit c59a7a04c3
7 changed files with 206 additions and 40 deletions

View File

@ -74,7 +74,6 @@ public:
*/
uint32_t getRtt(uint32_t ssrc) const;
private:
/**
*
*/

View File

@ -194,9 +194,13 @@ vector<bool> FCI_NACK::getBitArray() const {
string FCI_NACK::dumpString() const {
_StrPrinter printer;
printer << "pid:" << getPid() << ",blp:" << getBlp() << ",bit array:";
auto pid = getPid();
printer << "pid:" << pid << ",blp:" << getBlp() << ",dropped rtp seq:";
for (auto flag : getBitArray()) {
printer << flag << " ";
if (flag) {
printer << pid << " ";
}
++pid;
}
return std::move(printer);
}

View File

@ -67,10 +67,17 @@ uint32_t NackList::get_cache_ms() {
////////////////////////////////////////////////////////////////////////////////////////////////
void NackContext::received(uint16_t seq) {
void NackContext::received(uint16_t seq, bool is_rtx) {
if (!_last_max_seq && _seq.empty()) {
_last_max_seq = seq - 1;
}
if (is_rtx || (seq < _last_max_seq && !(seq < 1024 && _last_max_seq > UINT16_MAX - 1024))) {
//重传包或
//seq回退且非回环那么这个应该是重传包
onRtx(seq);
return;
}
_seq.emplace(seq);
auto max_seq = *_seq.rbegin();
auto min_seq = *_seq.begin();
@ -83,6 +90,7 @@ void NackContext::received(uint16_t seq) {
//回环
_seq.clear();
_last_max_seq = min_seq;
_nack_send_ntp.clear();
return;
}
@ -98,18 +106,19 @@ void NackContext::received(uint16_t seq) {
}
//有丢包丢包从_last_max_seq开始
if (max_seq - _last_max_seq > FCI_NACK::kBitSize) {
auto nack_rtp_count = FCI_NACK::kBitSize;
if (max_seq - _last_max_seq > nack_rtp_count) {
vector<bool> vec;
vec.resize(FCI_NACK::kBitSize);
for (auto i = 0; i < FCI_NACK::kBitSize; ++i) {
vec.resize(FCI_NACK::kBitSize, false);
for (auto i = 0; i < nack_rtp_count; ++i) {
vec[i] = _seq.find(_last_max_seq + i + 2) == _seq.end();
}
doNack(FCI_NACK(_last_max_seq + 1, vec));
_last_max_seq += FCI_NACK::kBitSize + 1;
doNack(FCI_NACK(_last_max_seq + 1, vec), true);
_last_max_seq += nack_rtp_count + 1;
if (_last_max_seq >= max_seq) {
_seq.clear();
} else {
auto it = _seq.emplace_hint(_seq.begin(), _last_max_seq);
auto it = _seq.emplace_hint(_seq.begin(), _last_max_seq + 1);
_seq.erase(_seq.begin(), it);
}
}
@ -120,7 +129,10 @@ void NackContext::setOnNack(onNack cb) {
_cb = std::move(cb);
}
void NackContext::doNack(const FCI_NACK &nack) {
void NackContext::doNack(const FCI_NACK &nack, bool record_nack) {
if (record_nack) {
recordNack(nack);
}
if (_cb) {
_cb(nack);
}
@ -136,4 +148,96 @@ void NackContext::eraseFrontSeq() {
_last_max_seq = *it;
it = _seq.erase(it);
}
}
}
void NackContext::onRtx(uint16_t seq) {
auto it = _nack_send_ntp.find(seq);
if (it == _nack_send_ntp.end()) {
return;
}
auto rtt = getCurrentMillisecond() - it->second.update_stamp;
_nack_send_ntp.erase(it);
if (rtt >= 0) {
//rtt不肯小于0
_rtt = rtt;
//InfoL << "rtt:" << rtt;
}
}
void NackContext::recordNack(const FCI_NACK &nack) {
auto now = getCurrentMillisecond();
auto i = nack.getPid();
for (auto flag : nack.getBitArray()) {
if (flag) {
auto &ref = _nack_send_ntp[i];
ref.first_stamp = now;
ref.update_stamp = now;
ref.nack_count = 1;
}
++i;
}
//记录太多了,移除一部分早期的记录
while (_nack_send_ntp.size() > kNackMaxSize) {
_nack_send_ntp.erase(_nack_send_ntp.begin());
}
}
uint64_t NackContext::reSendNack() {
set<uint16_t> nack_rtp;
auto now = getCurrentMillisecond();
for (auto it = _nack_send_ntp.begin(); it != _nack_send_ntp.end();) {
if (now - it->second.first_stamp > kNackMaxMS) {
//该rtp丢失太久了不再要求重传
it = _nack_send_ntp.erase(it);
continue;
}
if (now - it->second.update_stamp < 2 * _rtt) {
//距离上次nack不足2倍的rtt不用再发送nack
++it;
continue;
}
//此rtp需要请求重传
nack_rtp.emplace(it->first);
//更新nack发送时间戳
it->second.update_stamp = now;
if (++(it->second.nack_count) == kNackMaxCount) {
//nack次数太多移除之
it = _nack_send_ntp.erase(it);
continue;
}
++it;
}
if (_nack_send_ntp.empty()) {
//不需要再发送nack
return 0;
}
int pid = -1;
vector<bool> vec;
for (auto it = nack_rtp.begin(); it != nack_rtp.end();) {
if (pid == -1) {
pid = *it;
vec.resize(16, false);
++it;
continue;
}
auto inc = *it - pid;
if (inc > FCI_NACK::kBitSize) {
//新的nack包
doNack(FCI_NACK(pid, vec), false);
pid = -1;
continue;
}
//这个包丢了
vec[inc - 1] = true;
++it;
}
if (pid != -1) {
doNack(FCI_NACK(pid, vec), false);
}
//重传间隔不得低于5ms
return std::max(_rtt, 5);
}

View File

@ -36,22 +36,40 @@ private:
class NackContext {
public:
using Ptr = std::shared_ptr<NackContext>;
using onNack = function<void(const FCI_NACK &nack)>;
//最大保留的rtp丢包状态个数
static constexpr auto kNackMaxSize = 100;
//rtp丢包状态最长保留时间
static constexpr auto kNackMaxMS = 3 * 1000;
//nack最多请求重传10次
static constexpr auto kNackMaxCount = 10;
NackContext() = default;
~NackContext() = default;
void received(uint16_t seq);
void received(uint16_t seq, bool is_rtx = false);
void setOnNack(onNack cb);
uint64_t reSendNack();
private:
void eraseFrontSeq();
void doNack(const FCI_NACK &nack);
void doNack(const FCI_NACK &nack, bool record_nack);
void recordNack(const FCI_NACK &nack);
void onRtx(uint16_t seq);
private:
int _rtt = 50;
onNack _cb;
set<uint16_t> _seq;
uint16_t _last_max_seq = 0;
struct NackStatus{
uint64_t first_stamp;
uint64_t update_stamp;
int nack_count = 0;
};
map<uint16_t/*seq*/, NackStatus > _nack_send_ntp;
};
#endif //ZLMEDIAKIT_NACK_H

View File

@ -239,9 +239,8 @@ namespace RTC
if (DepLibSRTP::IsError(err))
{
MS_WARN_TAG(srtp, "srtp_protect() failed: %s", DepLibSRTP::GetErrorString(err));
return false;
WarnL << "srtp_protect() failed:" << DepLibSRTP::GetErrorString(err);
return false;
}
return true;
@ -256,8 +255,7 @@ namespace RTC
if (DepLibSRTP::IsError(err))
{
MS_DEBUG_TAG(srtp, "srtp_unprotect() failed: %s", DepLibSRTP::GetErrorString(err));
WarnL << "srtp_unprotect() failed:" << DepLibSRTP::GetErrorString(err);
return false;
}
@ -272,8 +270,7 @@ namespace RTC
if (DepLibSRTP::IsError(err))
{
MS_WARN_TAG(srtp, "srtp_protect_rtcp() failed: %s", DepLibSRTP::GetErrorString(err));
WarnL << "srtp_protect_rtcp() failed:" << DepLibSRTP::GetErrorString(err);
return false;
}
@ -289,8 +286,7 @@ namespace RTC
if (DepLibSRTP::IsError(err))
{
MS_DEBUG_TAG(srtp, "srtp_unprotect_rtcp() failed: %s", DepLibSRTP::GetErrorString(err));
WarnL << "srtp_unprotect_rtcp() failed:" << DepLibSRTP::GetErrorString(err);
return false;
}

View File

@ -97,7 +97,12 @@ void WebRtcTransport::OnDtlsTransportConnected(
std::string &remoteCert) {
InfoL;
_srtp_session_send = std::make_shared<RTC::SrtpSession>(RTC::SrtpSession::Type::OUTBOUND, srtpCryptoSuite, srtpLocalKey, srtpLocalKeyLen);
_srtp_session_recv = std::make_shared<RTC::SrtpSession>(RTC::SrtpSession::Type::INBOUND, srtpCryptoSuite, srtpRemoteKey, srtpRemoteKeyLen);
string srtpRemoteKey_str((char *) srtpRemoteKey, srtpRemoteKeyLen);
_srtp_session_recv_alloc = [srtpCryptoSuite, srtpRemoteKey_str]() {
return std::make_shared<RTC::SrtpSession>(RTC::SrtpSession::Type::INBOUND, srtpCryptoSuite,
(uint8_t *) srtpRemoteKey_str.data(), srtpRemoteKey_str.size());
};
onStartWebRTC();
}
@ -250,20 +255,20 @@ void WebRtcTransport::inputSockData(char *buf, size_t len, RTC::TransportTuple *
_dtls_transport->ProcessDtlsData((uint8_t *) buf, len);
return;
}
RtpHeader *rtp = (RtpHeader *) buf;
auto it = _srtp_session_recv.find(rtp->pt);
if (it == _srtp_session_recv.end()) {
it = _srtp_session_recv.emplace((uint8_t) rtp->pt, _srtp_session_recv_alloc()).first;
}
if (is_rtp(buf)) {
if (_srtp_session_recv->DecryptSrtp((uint8_t *) buf, &len)) {
if (it->second->DecryptSrtp((uint8_t *) buf, &len)) {
onRtp(buf, len);
} else {
RtpHeader *rtp = (RtpHeader *) buf;
WarnL << "srtp_unprotect rtp failed, pt:" << (int)rtp->pt;
}
return;
}
if (is_rtcp(buf)) {
if (_srtp_session_recv->DecryptSrtcp((uint8_t *) buf, &len)) {
if (it->second->DecryptSrtcp((uint8_t *) buf, &len)) {
onRtcp(buf, len);
} else {
WarnL;
}
return;
}
@ -585,21 +590,29 @@ SdpAttrCandidate::Ptr WebRtcTransportImp::getIceCandidate() const{
///////////////////////////////////////////////////////////////////
class RtpChannel : public RtpTrackImp {
class RtpChannel : public RtpTrackImp, public std::enable_shared_from_this<RtpChannel> {
public:
RtpChannel(RtpTrackImp::OnSorted cb, function<void(const FCI_NACK &nack)> on_nack) {
RtpChannel(EventPoller::Ptr poller, RtpTrackImp::OnSorted cb, function<void(const FCI_NACK &nack)> on_nack) {
_poller = std::move(poller);
_on_nack = std::move(on_nack);
setOnSorted(std::move(cb));
_nack_ctx.setOnNack(std::move(on_nack));
_nack_ctx.setOnNack([this](const FCI_NACK &nack) {
onNack(nack);
});
}
~RtpChannel() override = default;
RtpPacket::Ptr inputRtp(TrackType type, int sample_rate, uint8_t *ptr, size_t len, bool is_rtx) {
auto rtp = RtpTrack::inputRtp(type, sample_rate, ptr, len);
if (!is_rtx && rtp) {
if (!rtp) {
return rtp;
}
auto seq = rtp->getSeq();
_nack_ctx.received(seq, is_rtx);
if (!is_rtx) {
//统计rtp接受情况便于生成nack rtcp包
auto seq = rtp->getSeq();
_nack_ctx.received(seq);
_rtcp_context.onRtp(seq, rtp->getStamp(), rtp->ntp_stamp, sample_rate, len);
}
return rtp;
@ -610,9 +623,40 @@ public:
return _rtcp_context.createRtcpRR(ssrc, getSSRC());
}
int getLossRate() {
return _rtcp_context.geLostInterval() * 100 / _rtcp_context.getExpectedPacketsInterval();
}
private:
void starNackTimer(){
if (_delay_task) {
return;
}
weak_ptr<RtpChannel> weak_self = shared_from_this();
_delay_task = _poller->doDelayTask(10, [weak_self]() -> uint64_t {
auto strong_self = weak_self.lock();
if (!strong_self) {
return 0;
}
auto ret = strong_self->_nack_ctx.reSendNack();
if (!ret) {
strong_self->_delay_task = nullptr;
}
return ret;
});
}
void onNack(const FCI_NACK &nack) {
_on_nack(nack);
starNackTimer();
}
private:
NackContext _nack_ctx;
RtcpContext _rtcp_context{true};
EventPoller::Ptr _poller;
DelayTask::Ptr _delay_task;
function<void(const FCI_NACK &nack)> _on_nack;
};
std::shared_ptr<RtpChannel> MediaTrack::getRtpChannel(uint32_t ssrc) const{
@ -638,6 +682,7 @@ void WebRtcTransportImp::onRtcp(const char *buf, size_t len) {
if(!rtp_chn){
WarnL << "未识别的sr rtcp包:" << rtcp->dumpString();
} else {
//InfoL << "接收丢包率,ssrc:" << sr->ssrc << ",loss rate(%):" << rtp_chn->getLossRate();
//设置rtp时间戳与ntp时间戳的对应关系
rtp_chn->setNtpStamp(sr->rtpts, track->plan_rtp->sample_rate, sr->getNtpUnixStampMS());
auto rr = rtp_chn->createRtcpRR(sr, track->answer_ssrc_rtp);
@ -715,7 +760,7 @@ void WebRtcTransportImp::onRtcp(const char *buf, size_t len) {
void WebRtcTransportImp::createRtpChannel(const string &rid, uint32_t ssrc, const MediaTrack::Ptr &track) {
//rid --> RtpReceiverImp
auto &ref = track->rtp_channel[rid];
ref = std::make_shared<RtpChannel>([track, this, rid](RtpPacket::Ptr rtp) mutable {
ref = std::make_shared<RtpChannel>(getPoller(),[track, this, rid](RtpPacket::Ptr rtp) mutable {
onSortedRtp(*track, rid, std::move(rtp));
}, [track, this, ssrc](const FCI_NACK &nack) mutable {
onSendNack(*track, nack, ssrc);
@ -791,7 +836,6 @@ void WebRtcTransportImp::onSendNack(MediaTrack &track, const FCI_NACK &nack, uin
auto rtcp = RtcpFB::create(RTPFBType::RTCP_RTPFB_NACK, &nack, FCI_NACK::kSize);
rtcp->ssrc = htons(track.answer_ssrc_rtp);
rtcp->ssrc_media = htonl(ssrc);
DebugL << htonl(ssrc) << " " << nack.getPid();
sendRtcpPacket((char *) rtcp.get(), rtcp->getSize(), true);
}

View File

@ -122,9 +122,10 @@ private:
std::shared_ptr<RTC::IceServer> _ice_server;
std::shared_ptr<RTC::DtlsTransport> _dtls_transport;
std::shared_ptr<RTC::SrtpSession> _srtp_session_send;
std::shared_ptr<RTC::SrtpSession> _srtp_session_recv;
RtcSession::Ptr _offer_sdp;
RtcSession::Ptr _answer_sdp;
function<std::shared_ptr<RTC::SrtpSession>() > _srtp_session_recv_alloc;
std::unordered_map<uint8_t /*pt*/, std::shared_ptr<RTC::SrtpSession> > _srtp_session_recv;
};
class RtpChannel;