From c59a7a04c3ce28a39cdaceca04aee0c6119cdca7 Mon Sep 17 00:00:00 2001 From: ziyue <1213642868@qq.com> Date: Tue, 27 Jul 2021 20:37:43 +0800 Subject: [PATCH] =?UTF-8?q?nack/rtx=E6=94=AF=E6=8C=81=E5=A4=9A=E6=AC=A1?= =?UTF-8?q?=E9=87=8D=E4=BC=A0=EF=BC=8C=E6=8F=90=E9=AB=98=E6=8A=97=E4=B8=A2?= =?UTF-8?q?=E5=8C=85=E7=8E=87?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Rtcp/RtcpContext.h | 1 - src/Rtcp/RtcpFCI.cpp | 8 ++- webrtc/Nack.cpp | 122 ++++++++++++++++++++++++++++++++++--- webrtc/Nack.h | 22 ++++++- webrtc/SrtpSession.cpp | 14 ++--- webrtc/WebRtcTransport.cpp | 76 ++++++++++++++++++----- webrtc/WebRtcTransport.h | 3 +- 7 files changed, 206 insertions(+), 40 deletions(-) diff --git a/src/Rtcp/RtcpContext.h b/src/Rtcp/RtcpContext.h index 48453a09..b51c04fa 100644 --- a/src/Rtcp/RtcpContext.h +++ b/src/Rtcp/RtcpContext.h @@ -74,7 +74,6 @@ public: */ uint32_t getRtt(uint32_t ssrc) const; -private: /** * 上次结果与本次结果间应收包数 */ diff --git a/src/Rtcp/RtcpFCI.cpp b/src/Rtcp/RtcpFCI.cpp index dd7f2457..16630b6a 100644 --- a/src/Rtcp/RtcpFCI.cpp +++ b/src/Rtcp/RtcpFCI.cpp @@ -194,9 +194,13 @@ vector FCI_NACK::getBitArray() const { string FCI_NACK::dumpString() const { _StrPrinter printer; - printer << "pid:" << getPid() << ",blp:" << getBlp() << ",bit array:"; + auto pid = getPid(); + printer << "pid:" << pid << ",blp:" << getBlp() << ",dropped rtp seq:"; for (auto flag : getBitArray()) { - printer << flag << " "; + if (flag) { + printer << pid << " "; + } + ++pid; } return std::move(printer); } diff --git a/webrtc/Nack.cpp b/webrtc/Nack.cpp index 18dde583..07a0a504 100644 --- a/webrtc/Nack.cpp +++ b/webrtc/Nack.cpp @@ -67,10 +67,17 @@ uint32_t NackList::get_cache_ms() { //////////////////////////////////////////////////////////////////////////////////////////////// -void NackContext::received(uint16_t seq) { +void NackContext::received(uint16_t seq, bool is_rtx) { if (!_last_max_seq && _seq.empty()) { _last_max_seq = seq - 1; } + if (is_rtx || (seq < _last_max_seq && !(seq < 1024 && _last_max_seq > UINT16_MAX - 1024))) { + //重传包或 + //seq回退,且非回环,那么这个应该是重传包 + onRtx(seq); + return; + } + _seq.emplace(seq); auto max_seq = *_seq.rbegin(); auto min_seq = *_seq.begin(); @@ -83,6 +90,7 @@ void NackContext::received(uint16_t seq) { //回环 _seq.clear(); _last_max_seq = min_seq; + _nack_send_ntp.clear(); return; } @@ -98,18 +106,19 @@ void NackContext::received(uint16_t seq) { } //有丢包,丢包从_last_max_seq开始 - if (max_seq - _last_max_seq > FCI_NACK::kBitSize) { + auto nack_rtp_count = FCI_NACK::kBitSize; + if (max_seq - _last_max_seq > nack_rtp_count) { vector vec; - vec.resize(FCI_NACK::kBitSize); - for (auto i = 0; i < FCI_NACK::kBitSize; ++i) { + vec.resize(FCI_NACK::kBitSize, false); + for (auto i = 0; i < nack_rtp_count; ++i) { vec[i] = _seq.find(_last_max_seq + i + 2) == _seq.end(); } - doNack(FCI_NACK(_last_max_seq + 1, vec)); - _last_max_seq += FCI_NACK::kBitSize + 1; + doNack(FCI_NACK(_last_max_seq + 1, vec), true); + _last_max_seq += nack_rtp_count + 1; if (_last_max_seq >= max_seq) { _seq.clear(); } else { - auto it = _seq.emplace_hint(_seq.begin(), _last_max_seq); + auto it = _seq.emplace_hint(_seq.begin(), _last_max_seq + 1); _seq.erase(_seq.begin(), it); } } @@ -120,7 +129,10 @@ void NackContext::setOnNack(onNack cb) { _cb = std::move(cb); } -void NackContext::doNack(const FCI_NACK &nack) { +void NackContext::doNack(const FCI_NACK &nack, bool record_nack) { + if (record_nack) { + recordNack(nack); + } if (_cb) { _cb(nack); } @@ -136,4 +148,96 @@ void NackContext::eraseFrontSeq() { _last_max_seq = *it; it = _seq.erase(it); } -} \ No newline at end of file +} + +void NackContext::onRtx(uint16_t seq) { + auto it = _nack_send_ntp.find(seq); + if (it == _nack_send_ntp.end()) { + return; + } + auto rtt = getCurrentMillisecond() - it->second.update_stamp; + _nack_send_ntp.erase(it); + + if (rtt >= 0) { + //rtt不肯小于0 + _rtt = rtt; + //InfoL << "rtt:" << rtt; + } +} + +void NackContext::recordNack(const FCI_NACK &nack) { + auto now = getCurrentMillisecond(); + auto i = nack.getPid(); + for (auto flag : nack.getBitArray()) { + if (flag) { + auto &ref = _nack_send_ntp[i]; + ref.first_stamp = now; + ref.update_stamp = now; + ref.nack_count = 1; + } + ++i; + } + //记录太多了,移除一部分早期的记录 + while (_nack_send_ntp.size() > kNackMaxSize) { + _nack_send_ntp.erase(_nack_send_ntp.begin()); + } +} + +uint64_t NackContext::reSendNack() { + set nack_rtp; + auto now = getCurrentMillisecond(); + for (auto it = _nack_send_ntp.begin(); it != _nack_send_ntp.end();) { + if (now - it->second.first_stamp > kNackMaxMS) { + //该rtp丢失太久了,不再要求重传 + it = _nack_send_ntp.erase(it); + continue; + } + if (now - it->second.update_stamp < 2 * _rtt) { + //距离上次nack不足2倍的rtt,不用再发送nack + ++it; + continue; + } + //此rtp需要请求重传 + nack_rtp.emplace(it->first); + //更新nack发送时间戳 + it->second.update_stamp = now; + if (++(it->second.nack_count) == kNackMaxCount) { + //nack次数太多,移除之 + it = _nack_send_ntp.erase(it); + continue; + } + ++it; + } + + if (_nack_send_ntp.empty()) { + //不需要再发送nack + return 0; + } + + int pid = -1; + vector vec; + for (auto it = nack_rtp.begin(); it != nack_rtp.end();) { + if (pid == -1) { + pid = *it; + vec.resize(16, false); + ++it; + continue; + } + auto inc = *it - pid; + if (inc > FCI_NACK::kBitSize) { + //新的nack包 + doNack(FCI_NACK(pid, vec), false); + pid = -1; + continue; + } + //这个包丢了 + vec[inc - 1] = true; + ++it; + } + if (pid != -1) { + doNack(FCI_NACK(pid, vec), false); + } + + //重传间隔不得低于5ms + return std::max(_rtt, 5); +} diff --git a/webrtc/Nack.h b/webrtc/Nack.h index e872f3f9..1b95a975 100644 --- a/webrtc/Nack.h +++ b/webrtc/Nack.h @@ -36,22 +36,40 @@ private: class NackContext { public: + using Ptr = std::shared_ptr; using onNack = function; + //最大保留的rtp丢包状态个数 + static constexpr auto kNackMaxSize = 100; + //rtp丢包状态最长保留时间 + static constexpr auto kNackMaxMS = 3 * 1000; + //nack最多请求重传10次 + static constexpr auto kNackMaxCount = 10; NackContext() = default; ~NackContext() = default; - void received(uint16_t seq); + void received(uint16_t seq, bool is_rtx = false); void setOnNack(onNack cb); + uint64_t reSendNack(); private: void eraseFrontSeq(); - void doNack(const FCI_NACK &nack); + void doNack(const FCI_NACK &nack, bool record_nack); + void recordNack(const FCI_NACK &nack); + void onRtx(uint16_t seq); private: + int _rtt = 50; onNack _cb; set _seq; uint16_t _last_max_seq = 0; + + struct NackStatus{ + uint64_t first_stamp; + uint64_t update_stamp; + int nack_count = 0; + }; + map _nack_send_ntp; }; #endif //ZLMEDIAKIT_NACK_H diff --git a/webrtc/SrtpSession.cpp b/webrtc/SrtpSession.cpp index 7aa109c1..c4e30240 100644 --- a/webrtc/SrtpSession.cpp +++ b/webrtc/SrtpSession.cpp @@ -239,9 +239,8 @@ namespace RTC if (DepLibSRTP::IsError(err)) { - MS_WARN_TAG(srtp, "srtp_protect() failed: %s", DepLibSRTP::GetErrorString(err)); - - return false; + WarnL << "srtp_protect() failed:" << DepLibSRTP::GetErrorString(err); + return false; } return true; @@ -256,8 +255,7 @@ namespace RTC if (DepLibSRTP::IsError(err)) { - MS_DEBUG_TAG(srtp, "srtp_unprotect() failed: %s", DepLibSRTP::GetErrorString(err)); - + WarnL << "srtp_unprotect() failed:" << DepLibSRTP::GetErrorString(err); return false; } @@ -272,8 +270,7 @@ namespace RTC if (DepLibSRTP::IsError(err)) { - MS_WARN_TAG(srtp, "srtp_protect_rtcp() failed: %s", DepLibSRTP::GetErrorString(err)); - + WarnL << "srtp_protect_rtcp() failed:" << DepLibSRTP::GetErrorString(err); return false; } @@ -289,8 +286,7 @@ namespace RTC if (DepLibSRTP::IsError(err)) { - MS_DEBUG_TAG(srtp, "srtp_unprotect_rtcp() failed: %s", DepLibSRTP::GetErrorString(err)); - + WarnL << "srtp_unprotect_rtcp() failed:" << DepLibSRTP::GetErrorString(err); return false; } diff --git a/webrtc/WebRtcTransport.cpp b/webrtc/WebRtcTransport.cpp index ac30ab8a..54314e4e 100644 --- a/webrtc/WebRtcTransport.cpp +++ b/webrtc/WebRtcTransport.cpp @@ -97,7 +97,12 @@ void WebRtcTransport::OnDtlsTransportConnected( std::string &remoteCert) { InfoL; _srtp_session_send = std::make_shared(RTC::SrtpSession::Type::OUTBOUND, srtpCryptoSuite, srtpLocalKey, srtpLocalKeyLen); - _srtp_session_recv = std::make_shared(RTC::SrtpSession::Type::INBOUND, srtpCryptoSuite, srtpRemoteKey, srtpRemoteKeyLen); + + string srtpRemoteKey_str((char *) srtpRemoteKey, srtpRemoteKeyLen); + _srtp_session_recv_alloc = [srtpCryptoSuite, srtpRemoteKey_str]() { + return std::make_shared(RTC::SrtpSession::Type::INBOUND, srtpCryptoSuite, + (uint8_t *) srtpRemoteKey_str.data(), srtpRemoteKey_str.size()); + }; onStartWebRTC(); } @@ -250,20 +255,20 @@ void WebRtcTransport::inputSockData(char *buf, size_t len, RTC::TransportTuple * _dtls_transport->ProcessDtlsData((uint8_t *) buf, len); return; } + RtpHeader *rtp = (RtpHeader *) buf; + auto it = _srtp_session_recv.find(rtp->pt); + if (it == _srtp_session_recv.end()) { + it = _srtp_session_recv.emplace((uint8_t) rtp->pt, _srtp_session_recv_alloc()).first; + } if (is_rtp(buf)) { - if (_srtp_session_recv->DecryptSrtp((uint8_t *) buf, &len)) { + if (it->second->DecryptSrtp((uint8_t *) buf, &len)) { onRtp(buf, len); - } else { - RtpHeader *rtp = (RtpHeader *) buf; - WarnL << "srtp_unprotect rtp failed, pt:" << (int)rtp->pt; } return; } if (is_rtcp(buf)) { - if (_srtp_session_recv->DecryptSrtcp((uint8_t *) buf, &len)) { + if (it->second->DecryptSrtcp((uint8_t *) buf, &len)) { onRtcp(buf, len); - } else { - WarnL; } return; } @@ -585,21 +590,29 @@ SdpAttrCandidate::Ptr WebRtcTransportImp::getIceCandidate() const{ /////////////////////////////////////////////////////////////////// -class RtpChannel : public RtpTrackImp { +class RtpChannel : public RtpTrackImp, public std::enable_shared_from_this { public: - RtpChannel(RtpTrackImp::OnSorted cb, function on_nack) { + RtpChannel(EventPoller::Ptr poller, RtpTrackImp::OnSorted cb, function on_nack) { + _poller = std::move(poller); + _on_nack = std::move(on_nack); setOnSorted(std::move(cb)); - _nack_ctx.setOnNack(std::move(on_nack)); + + _nack_ctx.setOnNack([this](const FCI_NACK &nack) { + onNack(nack); + }); } ~RtpChannel() override = default; RtpPacket::Ptr inputRtp(TrackType type, int sample_rate, uint8_t *ptr, size_t len, bool is_rtx) { auto rtp = RtpTrack::inputRtp(type, sample_rate, ptr, len); - if (!is_rtx && rtp) { + if (!rtp) { + return rtp; + } + auto seq = rtp->getSeq(); + _nack_ctx.received(seq, is_rtx); + if (!is_rtx) { //统计rtp接受情况,便于生成nack rtcp包 - auto seq = rtp->getSeq(); - _nack_ctx.received(seq); _rtcp_context.onRtp(seq, rtp->getStamp(), rtp->ntp_stamp, sample_rate, len); } return rtp; @@ -610,9 +623,40 @@ public: return _rtcp_context.createRtcpRR(ssrc, getSSRC()); } + int getLossRate() { + return _rtcp_context.geLostInterval() * 100 / _rtcp_context.getExpectedPacketsInterval(); + } + +private: + void starNackTimer(){ + if (_delay_task) { + return; + } + weak_ptr weak_self = shared_from_this(); + _delay_task = _poller->doDelayTask(10, [weak_self]() -> uint64_t { + auto strong_self = weak_self.lock(); + if (!strong_self) { + return 0; + } + auto ret = strong_self->_nack_ctx.reSendNack(); + if (!ret) { + strong_self->_delay_task = nullptr; + } + return ret; + }); + } + + void onNack(const FCI_NACK &nack) { + _on_nack(nack); + starNackTimer(); + } + private: NackContext _nack_ctx; RtcpContext _rtcp_context{true}; + EventPoller::Ptr _poller; + DelayTask::Ptr _delay_task; + function _on_nack; }; std::shared_ptr MediaTrack::getRtpChannel(uint32_t ssrc) const{ @@ -638,6 +682,7 @@ void WebRtcTransportImp::onRtcp(const char *buf, size_t len) { if(!rtp_chn){ WarnL << "未识别的sr rtcp包:" << rtcp->dumpString(); } else { + //InfoL << "接收丢包率,ssrc:" << sr->ssrc << ",loss rate(%):" << rtp_chn->getLossRate(); //设置rtp时间戳与ntp时间戳的对应关系 rtp_chn->setNtpStamp(sr->rtpts, track->plan_rtp->sample_rate, sr->getNtpUnixStampMS()); auto rr = rtp_chn->createRtcpRR(sr, track->answer_ssrc_rtp); @@ -715,7 +760,7 @@ void WebRtcTransportImp::onRtcp(const char *buf, size_t len) { void WebRtcTransportImp::createRtpChannel(const string &rid, uint32_t ssrc, const MediaTrack::Ptr &track) { //rid --> RtpReceiverImp auto &ref = track->rtp_channel[rid]; - ref = std::make_shared([track, this, rid](RtpPacket::Ptr rtp) mutable { + ref = std::make_shared(getPoller(),[track, this, rid](RtpPacket::Ptr rtp) mutable { onSortedRtp(*track, rid, std::move(rtp)); }, [track, this, ssrc](const FCI_NACK &nack) mutable { onSendNack(*track, nack, ssrc); @@ -791,7 +836,6 @@ void WebRtcTransportImp::onSendNack(MediaTrack &track, const FCI_NACK &nack, uin auto rtcp = RtcpFB::create(RTPFBType::RTCP_RTPFB_NACK, &nack, FCI_NACK::kSize); rtcp->ssrc = htons(track.answer_ssrc_rtp); rtcp->ssrc_media = htonl(ssrc); - DebugL << htonl(ssrc) << " " << nack.getPid(); sendRtcpPacket((char *) rtcp.get(), rtcp->getSize(), true); } diff --git a/webrtc/WebRtcTransport.h b/webrtc/WebRtcTransport.h index 30ba2f95..ba87729b 100644 --- a/webrtc/WebRtcTransport.h +++ b/webrtc/WebRtcTransport.h @@ -122,9 +122,10 @@ private: std::shared_ptr _ice_server; std::shared_ptr _dtls_transport; std::shared_ptr _srtp_session_send; - std::shared_ptr _srtp_session_recv; RtcSession::Ptr _offer_sdp; RtcSession::Ptr _answer_sdp; + function() > _srtp_session_recv_alloc; + std::unordered_map > _srtp_session_recv; }; class RtpChannel;