diff --git a/src/Rtcp/RtcpFCI.cpp b/src/Rtcp/RtcpFCI.cpp index 67d82ca8..dd7f2457 100644 --- a/src/Rtcp/RtcpFCI.cpp +++ b/src/Rtcp/RtcpFCI.cpp @@ -183,7 +183,7 @@ vector FCI_NACK::getBitArray() const { vector ret; ret.resize(kBitSize + 1); //nack第一个包丢包 - ret[0] = false; + ret[0] = true; auto blp_h = getBlp(); for (size_t i = 0; i < kBitSize; ++i) { diff --git a/src/Rtcp/RtcpFCI.h b/src/Rtcp/RtcpFCI.h index a3d5e96e..1b68e457 100644 --- a/src/Rtcp/RtcpFCI.h +++ b/src/Rtcp/RtcpFCI.h @@ -243,18 +243,17 @@ private: class FCI_NACK { public: static constexpr size_t kSize = 4; + static constexpr size_t kBitSize = 16; FCI_NACK(uint16_t pid_h, const vector &type); void check(size_t size); uint16_t getPid() const; uint16_t getBlp() const; + //返回丢包列表,总长度17,第一个包必丢 vector getBitArray() const; string dumpString() const; -private: - static constexpr size_t kBitSize = 16; - private: // The PID field is used to specify a lost packet. The PID field // refers to the RTP sequence number of the lost packet. diff --git a/src/Rtsp/RtpReceiver.h b/src/Rtsp/RtpReceiver.h index a5f1eaa0..24cf8fb8 100644 --- a/src/Rtsp/RtpReceiver.h +++ b/src/Rtsp/RtpReceiver.h @@ -21,7 +21,7 @@ using namespace toolkit; namespace mediakit { -template +template class PacketSortor { public: PacketSortor() = default; diff --git a/tests/test_rtcp_nack.cpp b/tests/test_rtcp_nack.cpp new file mode 100644 index 00000000..7463ce02 --- /dev/null +++ b/tests/test_rtcp_nack.cpp @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2016 The ZLToolKit project authors. All Rights Reserved. + * + * This file is part of ZLToolKit(https://github.com/xia-chu/ZLToolKit). + * + * Use of this source code is governed by MIT license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#include +#include "Util/logger.h" +#include "Rtcp/RtcpFCI.h" +#include "../webrtc/WebRtcTransport.h" +using namespace std; +using namespace toolkit; +using namespace mediakit; + +extern void testFCI(); + +int main() { + Logger::Instance().add(std::make_shared()); + + srand((unsigned) time(NULL)); + + NackContext ctx; + for (int i = 1; i < 1000; ++i) { + if (i % (1 + (rand() % 30)) == 0) { + DebugL << "drop:" << i; + } else { + ctx.received(i); + + } + } + sleep(1); + return 0; +} diff --git a/webrtc/WebRtcTransport.cpp b/webrtc/WebRtcTransport.cpp index f06a20be..1cd574c8 100644 --- a/webrtc/WebRtcTransport.cpp +++ b/webrtc/WebRtcTransport.cpp @@ -424,9 +424,9 @@ void WebRtcTransportImp::onStartWebRTC() { } ref.rtcp_context_recv = std::make_shared(ref.plan->sample_rate, true); ref.rtcp_context_send = std::make_shared(ref.plan->sample_rate, false); - ref.receiver = std::make_shared([&ref, this](RtpPacket::Ptr rtp) { + ref.receiver = std::make_shared([&ref, this](RtpPacket::Ptr rtp) mutable{ onSortedRtp(ref, std::move(rtp)); - }, [&ref, this](const RtpPacket::Ptr &rtp) { + }, [&ref, this](const RtpPacket::Ptr &rtp) mutable { onBeforeSortedRtp(ref, rtp); }); } @@ -674,15 +674,28 @@ void WebRtcTransportImp::onRtp(const char *buf, size_t len) { return; } auto &info = it->second; + +#if 1 + //此处模拟接受丢包 + auto header = (RtpHeader *) buf; + auto seq = ntohs(header->seq); + if (seq % 10 == 0) { + //丢包 + return; + } else { + info.nack_ctx.received(seq); + } +#endif + //解析并排序rtp info.receiver->inputRtp(info.media->type, info.plan->sample_rate, (uint8_t *) buf, len); } /////////////////////////////////////////////////////////////////// -void WebRtcTransportImp::onSortedRtp(const RtpPayloadInfo &info, RtpPacket::Ptr rtp) { - if(!info.is_common_rtp){ - //todo rtx/red/ulpfec类型的rtp先未处理 +void WebRtcTransportImp::onSortedRtp(RtpPayloadInfo &info, RtpPacket::Ptr rtp) { + if (!info.is_common_rtp) { + WarnL; return; } if (info.media->type == TrackVideo && _pli_ticker.elapsedTime() > 2000) { @@ -724,7 +737,7 @@ static void changeRtpExtId(const RtpHeader *header, const Type &map) { } } -void WebRtcTransportImp::onBeforeSortedRtp(const RtpPayloadInfo &info, const RtpPacket::Ptr &rtp) { +void WebRtcTransportImp::onBeforeSortedRtp(RtpPayloadInfo &info, const RtpPacket::Ptr &rtp) { changeRtpExtId(rtp->getHeader(), _rtp_ext_id_to_type); //统计rtp收到的情况,好做rr汇报 info.rtcp_context_recv->onRtp(rtp->getSeq(), rtp->getStampMS(), rtp->size() - RtpPacket::kRtpTcpHeaderSize); @@ -740,6 +753,12 @@ void WebRtcTransportImp::onSendRtp(const RtpPacket::Ptr &rtp, bool flush, bool r //统计rtp发送情况,好做sr汇报 info->rtcp_context_send->onRtp(rtp->getSeq(), rtp->getStampMS(), rtp->size() - RtpPacket::kRtpTcpHeaderSize); info->nack_list.push_back(rtp); +#if 0 + //此处模拟发送丢包 + if(rtp->getSeq() % 10 == 0){ + return; + } +#endif } else { WarnL << "重传rtp:" << rtp->getSeq(); } diff --git a/webrtc/WebRtcTransport.h b/webrtc/WebRtcTransport.h index 7b911020..caeb80a6 100644 --- a/webrtc/WebRtcTransport.h +++ b/webrtc/WebRtcTransport.h @@ -131,8 +131,8 @@ class NackList { public: void push_back(RtpPacket::Ptr rtp) { auto seq = rtp->getSeq(); - nack_cache_seq.emplace_back(seq); - nack_cache_pkt.emplace(seq, std::move(rtp)); + _nack_cache_seq.emplace_back(seq); + _nack_cache_pkt.emplace(seq, std::move(rtp)); while (get_cache_ms() > kMaxNackMS) { //需要清除部分nack缓存 pop_front(); @@ -143,7 +143,7 @@ public: void for_each_nack(const FCI_NACK &nack, const FUNC &func) { auto seq = nack.getPid(); for (auto bit : nack.getBitArray()) { - if (!bit) { + if (bit) { //丢包 RtpPacket::Ptr *ptr = get_rtp(seq); if (ptr) { @@ -156,27 +156,27 @@ public: private: void pop_front() { - if (nack_cache_seq.empty()) { + if (_nack_cache_seq.empty()) { return; } - nack_cache_pkt.erase(nack_cache_seq.front()); - nack_cache_seq.pop_front(); + _nack_cache_pkt.erase(_nack_cache_seq.front()); + _nack_cache_seq.pop_front(); } RtpPacket::Ptr *get_rtp(uint16_t seq) { - auto it = nack_cache_pkt.find(seq); - if (it == nack_cache_pkt.end()) { + auto it = _nack_cache_pkt.find(seq); + if (it == _nack_cache_pkt.end()) { return nullptr; } return &it->second; } uint32_t get_cache_ms() { - if (nack_cache_seq.size() < 2) { + if (_nack_cache_seq.size() < 2) { return 0; } - uint32_t back = nack_cache_pkt[nack_cache_seq.back()]->getStampMS(); - uint32_t front = nack_cache_pkt[nack_cache_seq.front()]->getStampMS(); + uint32_t back = _nack_cache_pkt[_nack_cache_seq.back()]->getStampMS(); + uint32_t front = _nack_cache_pkt[_nack_cache_seq.front()]->getStampMS(); if (back > front) { return back - front; } @@ -186,8 +186,81 @@ private: private: static constexpr uint32_t kMaxNackMS = 10 * 1000; - deque nack_cache_seq; - unordered_map nack_cache_pkt; + deque _nack_cache_seq; + unordered_map _nack_cache_pkt; +}; + +class NackContext { +public: + void received(uint16_t seq) { + if (!_last_max_seq && _seq.empty()) { + _last_max_seq = seq - 1; + } + _seq.emplace(seq); + auto max_seq = *_seq.rbegin(); + auto min_seq = *_seq.begin(); + auto diff = max_seq - min_seq; + if (!diff) { + return; + } + + if (diff > UINT32_MAX / 2) { + //回环 + _seq.clear(); + _last_max_seq = min_seq; + return; + } + + if (_seq.size() == diff + 1 && _last_max_seq + 1 == min_seq) { + //都是连续的seq,未丢包 + _seq.clear(); + _last_max_seq = max_seq; + } else { + //seq不连续,有丢包 + if (min_seq == _last_max_seq + 1) { + //前面部分seq是连续的,未丢包,移除之 + eraseFrontSeq(); + } + + //有丢包,丢包从_last_max_seq开始 + if (max_seq - _last_max_seq > FCI_NACK::kBitSize) { + vector vec; + vec.resize(FCI_NACK::kBitSize); + for (auto i = 0; i < FCI_NACK::kBitSize; ++i) { + vec[i] = _seq.find(_last_max_seq + i + 2) == _seq.end(); + } + onNack(FCI_NACK(_last_max_seq + 1, vec)); + _last_max_seq += FCI_NACK::kBitSize + 1; + if (_last_max_seq >= max_seq) { + _seq.clear(); + } else { + auto it = _seq.emplace_hint(_seq.begin(), _last_max_seq); + _seq.erase(_seq.begin(), it); + } + } + } + } + + void onNack(const FCI_NACK &nack) { + InfoL << nack.dumpString() << " " << _seq.size(); + } + +private: + void eraseFrontSeq(){ + //前面部分seq是连续的,未丢包,移除之 + for (auto it = _seq.begin(); it != _seq.end();) { + if (*it != _last_max_seq + 1) { + //seq不连续,丢包了 + break; + } + _last_max_seq = *it; + it = _seq.erase(it); + } + } + +private: + set _seq; + uint16_t _last_max_seq = 0; }; class WebRtcTransportImp : public WebRtcTransport, public MediaSourceEvent, public SockInfo, public std::enable_shared_from_this{ @@ -265,10 +338,11 @@ private: RtcpContext::Ptr rtcp_context_recv; RtcpContext::Ptr rtcp_context_send; NackList nack_list; + NackContext nack_ctx; }; - void onSortedRtp(const RtpPayloadInfo &info, RtpPacket::Ptr rtp); - void onBeforeSortedRtp(const RtpPayloadInfo &info, const RtpPacket::Ptr &rtp); + void onSortedRtp(RtpPayloadInfo &info, RtpPacket::Ptr rtp); + void onBeforeSortedRtp(RtpPayloadInfo &info, const RtpPacket::Ptr &rtp); private: //用掉的总流量