From f58211fb08e409263c1e1af53e16805864b395f3 Mon Sep 17 00:00:00 2001 From: xiongguangjie Date: Thu, 9 Jun 2022 19:30:03 +0800 Subject: [PATCH] optimize send nack when check packet lost send nack immediately --- srt/Common.hpp | 11 +++-- srt/NackContext.cpp | 108 +++++++++++++++++++++++++++++++++++++++++++ srt/NackContext.hpp | 30 ++++++++++++ srt/PacketQueue.cpp | 11 +---- srt/SrtTransport.cpp | 46 +++++++++++++----- srt/SrtTransport.hpp | 11 +++-- 6 files changed, 189 insertions(+), 28 deletions(-) create mode 100644 srt/NackContext.cpp create mode 100644 srt/NackContext.hpp diff --git a/srt/Common.hpp b/srt/Common.hpp index 8b5d57c3..81d98814 100644 --- a/srt/Common.hpp +++ b/srt/Common.hpp @@ -1,17 +1,19 @@ #ifndef ZLMEDIAKIT_SRT_COMMON_H #define ZLMEDIAKIT_SRT_COMMON_H #if defined(_WIN32) +#include #include #include -#include -#pragma comment (lib, "Ws2_32.lib") -#pragma comment(lib,"Iphlpapi.lib") +#pragma comment(lib, "Ws2_32.lib") +#pragma comment(lib, "Iphlpapi.lib") #else #include #include #endif // defined(_WIN32) #include +#define MAX_SEQ 0x7fffffff +#define MAX_TS 0xffffffff namespace SRT { using SteadyClock = std::chrono::steady_clock; @@ -59,6 +61,9 @@ static inline void storeUint16LE(uint8_t *buf, uint16_t val) { static inline uint32_t srtVersion(int major, int minor, int patch) { return patch + minor * 0x100 + major * 0x10000; } +static inline uint32_t genExpectedSeq(uint32_t seq) { + return MAX_SEQ & seq; +} class UTicker { public: diff --git a/srt/NackContext.cpp b/srt/NackContext.cpp new file mode 100644 index 00000000..9711531c --- /dev/null +++ b/srt/NackContext.cpp @@ -0,0 +1,108 @@ +#include "NackContext.hpp" + +namespace SRT { +void NackContext::update(TimePoint now, std::list &lostlist) { + for (auto item : lostlist) { + mergeItem(now, item); + } +} +void NackContext::getLostList( + TimePoint now, uint32_t rtt, uint32_t rtt_variance, std::list &lostlist) { + lostlist.clear(); + std::list tmp_list; + + for (auto it = _nack_map.begin(); it != _nack_map.end(); ++it) { + if (!it->second._is_nack) { + tmp_list.push_back(it->first); + it->second._ts = now; + it->second._is_nack = true; + } else { + if (DurationCountMicroseconds(now - it->second._ts) > rtt) { + tmp_list.push_back(it->first); + it->second._ts = now; + } + } + } + tmp_list.sort(); + + if (tmp_list.empty()) { + return; + } + + uint32_t min = *tmp_list.begin(); + uint32_t max = *tmp_list.rbegin(); + + if ((max - min) >= (MAX_SEQ >> 1)) { + while ((max - tmp_list.front()) > (MAX_SEQ >> 1)) { + tmp_list.push_back(tmp_list.front()); + tmp_list.pop_front(); + } + } + + PacketQueue::LostPair lost; + bool finish = true; + for (auto cur = tmp_list.begin(); cur != tmp_list.end(); ++cur) { + if (finish) { + lost.first = *cur; + lost.second = genExpectedSeq(*cur + 1); + finish = false; + } else { + if (lost.second == *cur) { + lost.second = genExpectedSeq(*cur + 1); + } else { + finish = true; + lostlist.push_back(lost); + } + } + } +} +void NackContext::drop(uint32_t seq) { + if (_nack_map.empty()) + return; + uint32_t min = _nack_map.begin()->first; + uint32_t max = _nack_map.rbegin()->first; + bool is_cycle = false; + if ((max - min) >= (MAX_SEQ >> 1)) { + is_cycle = true; + } + + for (auto it = _nack_map.begin(); it != _nack_map.end();) { + if (!is_cycle) { + // 不回环 + if (it->first <= seq) { + it = _nack_map.erase(it); + } else { + it++; + } + } else { + if (it->first <= seq) { + if ((seq - it->first) >= (MAX_SEQ >> 1)) { + WarnL << "cycle seq " << seq << " " << it->first; + it++; + } else { + it = _nack_map.erase(it); + } + } else { + if ((it->first - seq) >= (MAX_SEQ >> 1)) { + it = _nack_map.erase(it); + WarnL << "cycle seq " << seq << " " << it->first; + } else { + it++; + } + } + } + } +} + +void NackContext::mergeItem(TimePoint now, PacketQueue::LostPair &item) { + for (uint32_t i = item.first; i < item.second; ++i) { + auto it = _nack_map.find(i); + if (it != _nack_map.end()) { + } else { + NackItem tmp; + tmp._is_nack = false; + _nack_map.emplace(i, tmp); + } + } +} +} // namespace SRT \ No newline at end of file diff --git a/srt/NackContext.hpp b/srt/NackContext.hpp new file mode 100644 index 00000000..fe6a0ba6 --- /dev/null +++ b/srt/NackContext.hpp @@ -0,0 +1,30 @@ +#ifndef ZLMEDIAKIT_SRT_NACK_CONTEXT_H +#define ZLMEDIAKIT_SRT_NACK_CONTEXT_H +#include "Common.hpp" +#include "PacketQueue.hpp" +#include + +namespace SRT { +class NackContext { +public: + NackContext() = default; + ~NackContext() = default; + void update(TimePoint now, std::list &lostlist); + void getLostList(TimePoint now, uint32_t rtt, uint32_t rtt_variance, std::list &lostlist); + void drop(uint32_t seq); + +private: + void mergeItem(TimePoint now, PacketQueue::LostPair &item); + +private: + class NackItem { + public: + bool _is_nack = false; + TimePoint _ts; // send nak time + }; + + std::map _nack_map; +}; + +} // namespace SRT +#endif // ZLMEDIAKIT_SRT_NACK_CONTEXT_H \ No newline at end of file diff --git a/srt/PacketQueue.cpp b/srt/PacketQueue.cpp index 001585d1..cb43bd13 100644 --- a/srt/PacketQueue.cpp +++ b/srt/PacketQueue.cpp @@ -2,13 +2,6 @@ namespace SRT { -#define MAX_SEQ 0x7fffffff -#define MAX_TS 0xffffffff - -static inline uint32_t genExpectedSeq(uint32_t seq) { - return MAX_SEQ & seq; -} - static inline bool isSeqEdge(uint32_t seq, uint32_t cap) { if (seq > (MAX_SEQ - cap)) { return true; @@ -160,9 +153,9 @@ std::list PacketQueue::getLostSeq() { if (finish) { finish = false; lost.first = i; - lost.second = i + 1; + lost.second = genExpectedSeq(i + 1); } else { - lost.second = i + 1; + lost.second = genExpectedSeq(i + 1); } } else { if (!finish) { diff --git a/srt/SrtTransport.cpp b/srt/SrtTransport.cpp index 11b637d7..570e09f8 100644 --- a/srt/SrtTransport.cpp +++ b/srt/SrtTransport.cpp @@ -1,5 +1,5 @@ -#include -#include "Util/onceToken.h" +#include "Util/onceToken.h" +#include #include "Ack.hpp" #include "Packet.hpp" @@ -203,7 +203,8 @@ void SrtTransport::handleHandshakeConclusion(HandshakePacket &pkt, struct sockad unregisterSelfHandshake(); registerSelf(); sendControlPacket(res, true); - TraceL << " buf size = " << res->max_flow_window_size << " init seq =" << _init_seq_number << " latency=" << delay; + TraceL << " buf size = " << res->max_flow_window_size << " init seq =" << _init_seq_number + << " latency=" << delay; _recv_buf = std::make_shared(res->max_flow_window_size, _init_seq_number, delay * 1e3); _send_buf = std::make_shared(res->max_flow_window_size, delay * 1e3); _send_packet_seq_number = _init_seq_number; @@ -314,10 +315,21 @@ void SrtTransport::handleDropReq(uint8_t *buf, int len, struct sockaddr_storage if (list.empty()) { return; } - + uint32_t max_seq = 0; for (auto data : list) { + max_seq = data->packet_seq_number; onSRTData(std::move(data)); } + _recv_nack.drop(max_seq); + + auto lost = _recv_buf->getLostSeq(); + _recv_nack.update(_now, lost); + lost.clear(); + _recv_nack.getLostList(_now, _rtt, _rtt_variance, lost); + if (!lost.empty()) { + sendNAKPacket(lost); + // TraceL << "check lost send nack"; + } auto nak_interval = (_rtt + _rtt_variance * 4) / 2; if (nak_interval <= 20 * 1000) { @@ -436,8 +448,24 @@ void SrtTransport::handleDataPacket(uint8_t *buf, int len, struct sockaddr_stora //TraceL<<" seq="<< pkt->packet_seq_number<<" ts="<timestamp<<" size="<payloadSize()<<\ //" PP="<<(int)pkt->PP<<" O="<<(int)pkt->O<<" kK="<<(int)pkt->KK<<" R="<<(int)pkt->R; _recv_buf->inputPacket(pkt, list); - for (auto data : list) { - onSRTData(std::move(data)); + if (list.empty()) { + // when no data ok send nack to sender immediately + } else { + uint32_t last_seq; + for (auto data : list) { + last_seq = data->packet_seq_number; + onSRTData(std::move(data)); + } + _recv_nack.drop(last_seq); + } + + auto lost = _recv_buf->getLostSeq(); + _recv_nack.update(_now, lost); + lost.clear(); + _recv_nack.getLostList(_now, _rtt, _rtt_variance, lost); + if (!lost.empty()) { + // TraceL << "check lost send nack immediately"; + sendNAKPacket(lost); } auto nak_interval = (_rtt + _rtt_variance * 4) / 2; @@ -445,12 +473,8 @@ void SrtTransport::handleDataPacket(uint8_t *buf, int len, struct sockaddr_stora nak_interval = 20 * 1000; } - if (list.empty()) { - // TraceL<<_recv_buf->dump()<<" nake interval:"< nak_interval) { + // Periodic NAK reports auto lost = _recv_buf->getLostSeq(); if (!lost.empty()) { sendNAKPacket(lost); diff --git a/srt/SrtTransport.hpp b/srt/SrtTransport.hpp index 3545dd95..43ae841f 100644 --- a/srt/SrtTransport.hpp +++ b/srt/SrtTransport.hpp @@ -11,11 +11,11 @@ #include "Poller/Timer.h" #include "Common.hpp" +#include "NackContext.hpp" #include "Packet.hpp" #include "PacketQueue.hpp" #include "PacketSendQueue.hpp" #include "Statistic.hpp" - namespace SRT { using namespace toolkit; @@ -45,7 +45,7 @@ public: virtual void onSendTSData(const Buffer::Ptr &buffer, bool flush); std::string getIdentifier(); -void unregisterSelf(); + void unregisterSelf(); void unregisterSelfHandshake(); protected: @@ -91,9 +91,9 @@ protected: void sendControlPacket(ControlPacket::Ptr pkt, bool flush = true); private: - //当前选中的udp链接 + // 当前选中的udp链接 Session::Ptr _selected_session; - //链接迁移前后使用过的udp链接 + // 链接迁移前后使用过的udp链接 std::unordered_map> _history_sessions; EventPoller::Ptr _poller; @@ -119,6 +119,7 @@ private: PacketSendQueue::Ptr _send_buf; uint32_t _buf_delay = 120; PacketQueue::Ptr _recv_buf; + NackContext _recv_nack; uint32_t _rtt = 100 * 1000; uint32_t _rtt_variance = 50 * 1000; uint32_t _light_ack_pkt_count = 0; @@ -133,7 +134,7 @@ private: UTicker _nak_ticker; - //保持发送的握手消息,防止丢失重发 + // 保持发送的握手消息,防止丢失重发 HandshakePacket::Ptr _handleshake_res; ResourcePool _packet_pool;