From 533f35dac44eef0ffb41acddf6b8316a76bcf94c Mon Sep 17 00:00:00 2001 From: xiongguangjie Date: Thu, 22 Sep 2022 00:34:42 +0800 Subject: [PATCH] copy srt estimated link capacity algorithm --- srt/Common.hpp | 24 +++++++- srt/SrtTransport.cpp | 9 ++- srt/Statistic.cpp | 136 +++++++++++++++++++++++++++++++++---------- srt/Statistic.hpp | 19 ++++-- 4 files changed, 148 insertions(+), 40 deletions(-) diff --git a/srt/Common.hpp b/srt/Common.hpp index de5d3aaa..ba7dc3ff 100644 --- a/srt/Common.hpp +++ b/srt/Common.hpp @@ -13,8 +13,9 @@ #endif // defined(_WIN32) #include -#define MAX_SEQ 0x7fffffff -#define MAX_TS 0xffffffff +#define MAX_SEQ 0x7fffffff +#define SEQ_NONE 0xffffffff +#define MAX_TS 0xffffffff namespace SRT { using SteadyClock = std::chrono::steady_clock; @@ -35,6 +36,25 @@ static inline uint16_t loadUint16(uint8_t *ptr) { return ptr[0] << 8 | ptr[1]; } +inline static int64_t seqCmp(uint32_t seq1, uint32_t seq2) { + if(seq1 > seq2){ + if((seq1 - seq2) >(MAX_SEQ>>1)){ + return (int64_t)seq1 - (int64_t)(seq2+MAX_SEQ); + }else{ + return (int64_t)seq1 - (int64_t)seq2; + } + }else{ + if((seq2-seq1) >(MAX_SEQ>>1)){ + return (int64_t)(seq1+MAX_SEQ) - (int64_t)seq2; + }else{ + return (int64_t)seq1 - (int64_t)seq2; + } + } +} + +inline static uint32_t incSeq(int32_t seq) { + return (seq == MAX_SEQ) ? 0 : seq + 1; +} static inline void storeUint32(uint8_t *buf, uint32_t val) { buf[0] = val >> 24; buf[1] = (val >> 16) & 0xff; diff --git a/srt/SrtTransport.cpp b/srt/SrtTransport.cpp index 56d7f72f..f607a00d 100644 --- a/srt/SrtTransport.cpp +++ b/srt/SrtTransport.cpp @@ -106,7 +106,6 @@ void SrtTransport::inputSockData(uint8_t *buf, int len, struct sockaddr_storage _handleshake_timer.reset(); } _pkt_recv_rate_context->inputPacket(_now,len-HDR_SIZE); - _estimated_link_capacity_context->inputPacket(_now); //_recv_rate_context->inputPacket(_now, len); handleDataPacket(buf, len, addr); @@ -126,8 +125,8 @@ void SrtTransport::inputSockData(uint8_t *buf, int len, struct sockaddr_storage return; } - _pkt_recv_rate_context->inputPacket(_now,len); - _estimated_link_capacity_context->inputPacket(_now); + //_pkt_recv_rate_context->inputPacket(_now,len); + //_estimated_link_capacity_context->inputPacket(_now); //_recv_rate_context->inputPacket(_now, len); auto it = s_control_functions.find(type); @@ -179,6 +178,7 @@ void SrtTransport::handleHandshakeInduction(HandshakePacket &pkt, struct sockadd _mtu = pkt.mtu; _last_pkt_seq = _init_seq_number - 1; + _estimated_link_capacity_context->setLastSeq(_last_pkt_seq); _peer_socket_id = pkt.srt_socket_id; HandshakePacket::Ptr res = std::make_shared(); @@ -484,6 +484,7 @@ void SrtTransport::sendACKPacket() { pkt->pkt_recv_rate = _pkt_recv_rate_context->getPacketRecvRate(recv_rate); pkt->estimated_link_capacity = _estimated_link_capacity_context->getEstimatedLinkCapacity(); pkt->recv_rate = recv_rate; + TraceL<pkt_recv_rate<<" pkt/s "<estimated_link_capacity<<" pkt/s (cap)"; pkt->storeToData(); _ack_send_timestamp[pkt->ack_number] = _now; _last_ack_pkt_seq_num = pkt->last_ack_pkt_seq_number; @@ -563,6 +564,8 @@ void SrtTransport::handleDataPacket(uint8_t *buf, int len, struct sockaddr_stora DataPacket::Ptr pkt = std::make_shared(); pkt->loadFromData(buf, len); + _estimated_link_capacity_context->inputPacket(_now,pkt); + std::list list; //TraceL<<" seq="<< pkt->packet_seq_number<<" ts="<timestamp<<" size="<payloadSize()<<\ //" PP="<<(int)pkt->PP<<" O="<<(int)pkt->O<<" kK="<<(int)pkt->KK<<" R="<<(int)pkt->R; diff --git a/srt/Statistic.cpp b/srt/Statistic.cpp index 7ca8c445..0456bcc8 100644 --- a/srt/Statistic.cpp +++ b/srt/Statistic.cpp @@ -1,5 +1,5 @@ #include - +#include #include "Statistic.hpp" namespace SRT { @@ -47,47 +47,121 @@ uint32_t PacketRecvRateContext::getPacketRecvRate(uint32_t &bytesps) { ++bp; // advance bytes pointer } + if(count>(SIZE>>1)){ + bytesps = (unsigned long)ceil(1000000.0 / (double(sum) / double(bytes))); + auto ret = (uint32_t)ceil(1000000.0 / (sum / count)); + return ret; + } + bytesps = 0; + return 0; // claculate speed, or return 0 if not enough valid value - bytesps = (unsigned long)ceil(1000000.0 / (double(sum) / double(bytes))); - auto ret = (uint32_t)ceil(1000000.0 / (sum / count)); - if(_cur_idx == 0) - TraceL << bytesps << " byte/sec " << ret << " pkt/sec"; - return ret; + +} +EstimatedLinkCapacityContext::EstimatedLinkCapacityContext(TimePoint start) : _start(start) { + for (size_t i = 0; i < SIZE; i++) { + _dur_probe_arr[i] = 1000; + } + _cur_idx = 0; +}; +void EstimatedLinkCapacityContext::inputPacket(TimePoint &ts,DataPacket::Ptr& pkt) { + uint32_t seq = pkt->packet_seq_number; + auto diff = seqCmp(seq,_last_seq); + const bool retransmitted = pkt->R == 1; + const bool unordered = diff<=0; + uint32_t one = seq&0xf; + if(one == 0){ + probe1Arrival(ts,pkt,unordered || retransmitted); + } + if(diff>0){ + _last_seq = seq; + } + if(unordered || retransmitted){ + return; + } + if(one == 1){ + probe2Arrival(ts,pkt); + } } -void EstimatedLinkCapacityContext::inputPacket(TimePoint &ts) { - if (_pkt_map.size() > 16) { - _pkt_map.erase(_pkt_map.begin()); +/// Record the arrival time of the first probing packet. +void EstimatedLinkCapacityContext::probe1Arrival(TimePoint &ts, const DataPacket::Ptr &pkt, bool unordered) { + if (unordered && pkt->packet_seq_number == _probe1_seq) { + // Reset the starting probe into "undefined", when + // a packet has come as retransmitted before the + // measurement at arrival of 17th could be taken. + _probe1_seq = SEQ_NONE; + return; } - auto tmp = DurationCountMicroseconds(ts - _start); - _pkt_map.emplace(tmp, tmp); + + _ts_probe_time = ts; + _probe1_seq = pkt->packet_seq_number; // Record the sequence where 16th packet probe was taken +} + +/// Record the arrival time of the second probing packet and the interval between packet pairs. + +void EstimatedLinkCapacityContext::probe2Arrival(TimePoint &ts, const DataPacket::Ptr &pkt) { + // Reject probes that don't refer to the very next packet + // towards the one that was lately notified by probe1Arrival. + // Otherwise the result can be stupid. + + // Simply, in case when this wasn't called exactly for the + // expected packet pair, behave as if the 17th packet was lost. + + // no start point yet (or was reset) OR not very next packet + if (_probe1_seq == SEQ_NONE || incSeq(_probe1_seq) != pkt->packet_seq_number) + return; + + // Reset the starting probe to prevent checking if the + // measurement was already taken. + _probe1_seq = SEQ_NONE; + + // record the probing packets interval + // Adjust the time for what a complete packet would have take + const int64_t timediff = DurationCountMicroseconds(ts - _ts_probe_time); + const int64_t timediff_times_pl_size = timediff * SRT_MAX_PAYLOAD_SIZE; + + // Let's take it simpler than it is coded here: + // (stating that a packet has never zero size) + // + // probe_case = (now - previous_packet_time) * SRT_MAX_PAYLOAD_SIZE / pktsz; + // + // Meaning: if the packet is fully packed, probe_case = timediff. + // Otherwise the timediff will be "converted" to a time that a fully packed packet "would take", + // provided the arrival time is proportional to the payload size and skipping + // the ETH+IP+UDP+SRT header part elliminates the constant packet delivery time influence. + // + const size_t pktsz = pkt->payloadSize(); + _dur_probe_arr[_cur_idx] = pktsz ? int64_t(timediff_times_pl_size / pktsz) : int64_t(timediff); + + // the window is logically circular + _cur_idx = (_cur_idx + 1) % SIZE; } uint32_t EstimatedLinkCapacityContext::getEstimatedLinkCapacity() { - decltype(_pkt_map.begin()) next; - std::vector tmp; + int64_t tmp[SIZE]; + std::copy(_dur_probe_arr, _dur_probe_arr + SIZE , tmp); + std::nth_element(tmp, tmp + (SIZE / 2), tmp + SIZE); + int64_t median = tmp[SIZE / 2]; - for (auto it = _pkt_map.begin(); it != _pkt_map.end(); ++it) { - next = it; - ++next; - if (next != _pkt_map.end()) { - tmp.push_back(next->first - it->first); - } else { - break; - } - } - std::sort(tmp.begin(), tmp.end()); - if (tmp.empty()) { - return 1000; - } + int64_t count = 1; + int64_t sum = median; + int64_t upper = median << 3; // median*8 + int64_t lower = median >> 3; // median/8 - if (tmp.size() < 16) { - return 1000; - } + // median filtering + const int64_t* p = _dur_probe_arr; + for (int i = 0, n = SIZE; i < n; ++ i) + { + if ((*p < upper) && (*p > lower)) + { + ++ count; + sum += *p; + } + ++ p; + } - double dur = tmp[0] / 1e6; - return (uint32_t)(1.0 / dur); + return (uint32_t)ceil(1000000.0 / (double(sum) / double(count))); } /* diff --git a/srt/Statistic.hpp b/srt/Statistic.hpp index f1d341df..18d562c1 100644 --- a/srt/Statistic.hpp +++ b/srt/Statistic.hpp @@ -23,14 +23,25 @@ private: class EstimatedLinkCapacityContext { public: - EstimatedLinkCapacityContext(TimePoint start) : _start(start) {}; + EstimatedLinkCapacityContext(TimePoint start); ~EstimatedLinkCapacityContext() = default; - void inputPacket(TimePoint &ts); + void setLastSeq(uint32_t seq){ + _last_seq = seq; + } + void inputPacket(TimePoint &ts,DataPacket::Ptr& pkt); uint32_t getEstimatedLinkCapacity(); - + static const int SIZE = 16; +private: + void probe1Arrival(TimePoint &ts,const DataPacket::Ptr& pkt, bool unordered); + void probe2Arrival(TimePoint &ts,const DataPacket::Ptr& pkt); private: TimePoint _start; - std::map _pkt_map; + TimePoint _ts_probe_time; + int64_t _dur_probe_arr[SIZE]; + size_t _cur_idx; + uint32_t _last_seq = 0; + uint32_t _probe1_seq = SEQ_NONE; + //std::map _pkt_map; }; /*