From ea35002be8c85d7ce40d50b70b4226fba1220754 Mon Sep 17 00:00:00 2001 From: xiongguangjie Date: Wed, 21 Sep 2022 19:21:49 +0800 Subject: [PATCH] copy srt official packet recve rate algorithm --- srt/Packet.hpp | 14 ++++++++ srt/SrtTransport.cpp | 31 +++++++++++------- srt/SrtTransport.hpp | 10 ++++-- srt/SrtTransportImp.cpp | 2 ++ srt/Statistic.cpp | 71 ++++++++++++++++++++++++++--------------- srt/Statistic.hpp | 20 ++++++------ 6 files changed, 101 insertions(+), 47 deletions(-) diff --git a/srt/Packet.hpp b/srt/Packet.hpp index 546ece0c..c2d86a1f 100644 --- a/srt/Packet.hpp +++ b/srt/Packet.hpp @@ -14,6 +14,20 @@ namespace SRT { using namespace toolkit; + +static const size_t HDR_SIZE = 4; // packet header size = SRT_PH_E_SIZE * sizeof(uint32_t) + +// Can also be calculated as: sizeof(struct ether_header) + sizeof(struct ip) + sizeof(struct udphdr). +static const size_t UDP_HDR_SIZE = 28; // 20 bytes IPv4 + 8 bytes of UDP { u16 sport, dport, len, csum }. + +static const size_t SRT_DATA_HDR_SIZE = UDP_HDR_SIZE + HDR_SIZE; + +// Maximum transmission unit size. 1500 in case of Ethernet II (RFC 1191). +static const size_t ETH_MAX_MTU_SIZE = 1500; + +// Maximum payload size of an SRT packet. +static const size_t SRT_MAX_PAYLOAD_SIZE = ETH_MAX_MTU_SIZE - SRT_DATA_HDR_SIZE; + /* 0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 diff --git a/srt/SrtTransport.cpp b/srt/SrtTransport.cpp index eee69d75..56d7f72f 100644 --- a/srt/SrtTransport.cpp +++ b/srt/SrtTransport.cpp @@ -22,7 +22,7 @@ SrtTransport::SrtTransport(const EventPoller::Ptr &poller) _start_timestamp = SteadyClock::now(); _socket_id = s_srt_socket_id_generate.fetch_add(1); _pkt_recv_rate_context = std::make_shared(_start_timestamp); - _recv_rate_context = std::make_shared(_start_timestamp); + //_recv_rate_context = std::make_shared(_start_timestamp); _estimated_link_capacity_context = std::make_shared(_start_timestamp); } @@ -105,11 +105,12 @@ void SrtTransport::inputSockData(uint8_t *buf, int len, struct sockaddr_storage if(_handleshake_timer){ _handleshake_timer.reset(); } - _pkt_recv_rate_context->inputPacket(_now); + _pkt_recv_rate_context->inputPacket(_now,len-HDR_SIZE); _estimated_link_capacity_context->inputPacket(_now); - _recv_rate_context->inputPacket(_now, len); + //_recv_rate_context->inputPacket(_now, len); handleDataPacket(buf, len, addr); + checkAndSendAckNak(); } else { WarnL<<"DataPacket switch to other transport: "<inputPacket(_now); + + _pkt_recv_rate_context->inputPacket(_now,len); _estimated_link_capacity_context->inputPacket(_now); - _recv_rate_context->inputPacket(_now, len); + //_recv_rate_context->inputPacket(_now, len); auto it = s_control_functions.find(type); if (it == s_control_functions.end()) { @@ -135,6 +137,9 @@ void SrtTransport::inputSockData(uint8_t *buf, int len, struct sockaddr_storage } else { (this->*(it->second))(buf, len, addr); } + if(_is_handleshake_finished && isPusher()){ + checkAndSendAckNak(); + } } else { // not reach WarnL << "not reach this"; @@ -195,7 +200,7 @@ void SrtTransport::handleHandshakeInduction(HandshakePacket &pkt, struct sockadd registerSelfHandshake(); sendControlPacket(res, true); - _handleshake_timer = std::make_shared(0.02,[this]()->bool{ + _handleshake_timer = std::make_shared(0.2,[this]()->bool{ sendControlPacket(_handleshake_res, true); return true; },getPoller()); @@ -389,6 +394,7 @@ void SrtTransport::handleDropReq(uint8_t *buf, int len, struct sockaddr_storage std::list list; // TraceL<<"drop "<drop(pkt.first_pkt_seq_num, pkt.last_pkt_seq_num, list); + //checkAndSendAckNak(); if (list.empty()) { return; } @@ -413,7 +419,8 @@ void SrtTransport::handleDropReq(uint8_t *buf, int len, struct sockaddr_storage // TraceL << "check lost send nack"; } */ - +} +void SrtTransport::checkAndSendAckNak(){ auto nak_interval = (_rtt + _rtt_variance * 4) / 2; if (nak_interval <= 20 * 1000) { nak_interval = 20 * 1000; @@ -442,7 +449,6 @@ void SrtTransport::handleDropReq(uint8_t *buf, int len, struct sockaddr_storage } _light_ack_pkt_count++; } - void SrtTransport::handleUserDefinedType(uint8_t *buf, int len, struct sockaddr_storage *addr) { TraceL; } @@ -465,6 +471,8 @@ void SrtTransport::handlePeerError(uint8_t *buf, int len, struct sockaddr_storag } void SrtTransport::sendACKPacket() { + uint32_t recv_rate = 0; + ACKPacket::Ptr pkt = std::make_shared(); pkt->dst_socket_id = _peer_socket_id; pkt->timestamp = DurationCountMicroseconds(_now - _start_timestamp); @@ -473,9 +481,9 @@ void SrtTransport::sendACKPacket() { pkt->rtt = _rtt; pkt->rtt_variance = _rtt_variance; pkt->available_buf_size = _recv_buf->getAvailableBufferSize(); - pkt->pkt_recv_rate = _pkt_recv_rate_context->getPacketRecvRate(); + pkt->pkt_recv_rate = _pkt_recv_rate_context->getPacketRecvRate(recv_rate); pkt->estimated_link_capacity = _estimated_link_capacity_context->getEstimatedLinkCapacity(); - pkt->recv_rate = _recv_rate_context->getRecvRate(); + pkt->recv_rate = recv_rate; pkt->storeToData(); _ack_send_timestamp[pkt->ack_number] = _now; _last_ack_pkt_seq_num = pkt->last_ack_pkt_seq_number; @@ -584,7 +592,7 @@ void SrtTransport::handleDataPacket(uint8_t *buf, int len, struct sockaddr_stora sendNAKPacket(lost); } */ - + /* auto nak_interval = (_rtt + _rtt_variance * 4) / 2; if (nak_interval <= 20 * 1000) { nak_interval = 20 * 1000; @@ -617,6 +625,7 @@ void SrtTransport::handleDataPacket(uint8_t *buf, int len, struct sockaddr_stora _light_ack_pkt_count = 0; } _light_ack_pkt_count++; + */ // bufCheckInterval(); } diff --git a/srt/SrtTransport.hpp b/srt/SrtTransport.hpp index 5fc7989f..f9817904 100644 --- a/srt/SrtTransport.hpp +++ b/srt/SrtTransport.hpp @@ -53,7 +53,9 @@ protected: virtual bool isPusher() { return true; }; virtual void onSRTData(DataPacket::Ptr pkt) {}; virtual void onShutdown(const SockException &ex); - virtual void onHandShakeFinished(std::string &streamid, struct sockaddr_storage *addr) {}; + virtual void onHandShakeFinished(std::string &streamid, struct sockaddr_storage *addr) { + _is_handleshake_finished = true; + }; virtual void sendPacket(Buffer::Ptr pkt, bool flush = true); virtual int getLatencyMul() { return 4; }; virtual int getPktBufSize() { return 8192; }; @@ -91,6 +93,8 @@ private: void createTimerForCheckAlive(); + void checkAndSendAckNak(); + protected: void sendDataPacket(DataPacket::Ptr pkt, char *buf, int len, bool flush = false); void sendControlPacket(ControlPacket::Ptr pkt, bool flush = true); @@ -137,7 +141,7 @@ private: std::shared_ptr _pkt_recv_rate_context; std::shared_ptr _estimated_link_capacity_context; - std::shared_ptr _recv_rate_context; + //std::shared_ptr _recv_rate_context; UTicker _nak_ticker; @@ -152,6 +156,8 @@ private: Timer::Ptr _timer; //刷新计时器 Ticker _alive_ticker; + + bool _is_handleshake_finished = false; }; class SrtTransportManager { diff --git a/srt/SrtTransportImp.cpp b/srt/SrtTransportImp.cpp index 93333413..c2298c65 100644 --- a/srt/SrtTransportImp.cpp +++ b/srt/SrtTransportImp.cpp @@ -22,6 +22,7 @@ SrtTransportImp::~SrtTransportImp() { } void SrtTransportImp::onHandShakeFinished(std::string &streamid, struct sockaddr_storage *addr) { + SrtTransport::onHandShakeFinished(streamid,addr); // TODO parse stream id like this zlmediakit.com/live/test?token=1213444&type=push if (!_addr) { _addr.reset(new sockaddr_storage(*((sockaddr_storage *)addr))); @@ -100,6 +101,7 @@ void SrtTransportImp::onSRTData(DataPacket::Ptr pkt) { } if (_decoder) { _decoder->input(reinterpret_cast(pkt->payloadData()), pkt->payloadSize()); + //TraceL<<" size "<payloadSize(); } else { WarnP(this) << " not reach this"; } diff --git a/srt/Statistic.cpp b/srt/Statistic.cpp index 446fc6fe..7ca8c445 100644 --- a/srt/Statistic.cpp +++ b/srt/Statistic.cpp @@ -4,36 +4,56 @@ namespace SRT { -void PacketRecvRateContext::inputPacket(TimePoint &ts) { - if (_pkt_map.size() > 100) { - _pkt_map.erase(_pkt_map.begin()); +PacketRecvRateContext::PacketRecvRateContext(TimePoint start) + : _last_arrive_time(start) { + for (size_t i = 0; i < SIZE; i++) { + _ts_arr[i] = 1000000; + _size_arr[i] = SRT_MAX_PAYLOAD_SIZE; } - auto tmp = DurationCountMicroseconds(ts - _start); - _pkt_map.emplace(tmp, tmp); + _cur_idx = 0; +}; + +void PacketRecvRateContext::inputPacket(TimePoint &ts,size_t len) { + auto tmp = DurationCountMicroseconds(ts - _last_arrive_time); + _ts_arr[_cur_idx] = tmp; + _size_arr[_cur_idx] = len; + _cur_idx = (1+_cur_idx)%SIZE; + _last_arrive_time = ts; } -uint32_t PacketRecvRateContext::getPacketRecvRate() { - if (_pkt_map.size() < 2) { - return 50000; - } - int64_t dur = 1000; - for (auto it = _pkt_map.begin(); it != _pkt_map.end(); ++it) { - auto next = it; - ++next; - if (next == _pkt_map.end()) { - break; - } +uint32_t PacketRecvRateContext::getPacketRecvRate(uint32_t &bytesps) { + int64_t tmp_arry[SIZE]; + std::copy(_ts_arr, _ts_arr + SIZE, tmp_arry); + std::nth_element(tmp_arry, tmp_arry + (SIZE / 2), tmp_arry + SIZE); + int64_t median = tmp_arry[SIZE / 2]; - if ((next->first - it->first) < dur) { - dur = next->first - it->first; + unsigned count = 0; + int sum = 0; + int64_t upper = median << 3; + int64_t lower = median >> 3; + + bytesps = 0; + size_t bytes = 0; + const size_t *bp = _size_arr; + // median filtering + const int64_t *p = _ts_arr; + for (int i = 0, n = SIZE; i < n; ++i) { + if ((*p < upper) && (*p > lower)) { + ++count; // packet counter + sum += *p; // usec counter + bytes += *bp; // byte counter } + ++p; // advance packet pointer + ++bp; // advance bytes pointer } - double rate = 1e6 / (double)dur; - if (rate <= 1000) { - return 50000; - } - return rate; + // claculate speed, or return 0 if not enough valid value + + bytesps = (unsigned long)ceil(1000000.0 / (double(sum) / double(bytes))); + auto ret = (uint32_t)ceil(1000000.0 / (sum / count)); + if(_cur_idx == 0) + TraceL << bytesps << " byte/sec " << ret << " pkt/sec"; + return ret; } void EstimatedLinkCapacityContext::inputPacket(TimePoint &ts) { @@ -70,12 +90,13 @@ uint32_t EstimatedLinkCapacityContext::getEstimatedLinkCapacity() { return (uint32_t)(1.0 / dur); } +/* void RecvRateContext::inputPacket(TimePoint &ts, size_t size) { if (_pkt_map.size() > 100) { _pkt_map.erase(_pkt_map.begin()); } auto tmp = DurationCountMicroseconds(ts - _start); - _pkt_map.emplace(tmp, tmp); + _pkt_map.emplace(tmp, size); } uint32_t RecvRateContext::getRecvRate() { @@ -94,5 +115,5 @@ uint32_t RecvRateContext::getRecvRate() { double rate = (double)bytes / dur; return (uint32_t)rate; } - +*/ } // namespace SRT \ No newline at end of file diff --git a/srt/Statistic.hpp b/srt/Statistic.hpp index d2a5036e..f1d341df 100644 --- a/srt/Statistic.hpp +++ b/srt/Statistic.hpp @@ -6,18 +6,19 @@ #include "Packet.hpp" namespace SRT { - class PacketRecvRateContext { public: - PacketRecvRateContext(TimePoint start) - : _start(start) {}; + PacketRecvRateContext(TimePoint start); ~PacketRecvRateContext() = default; - void inputPacket(TimePoint &ts); - uint32_t getPacketRecvRate(); - + void inputPacket(TimePoint &ts,size_t len = 0); + uint32_t getPacketRecvRate(uint32_t& bytesps); + static const int SIZE = 16; private: - TimePoint _start; - std::map _pkt_map; + TimePoint _last_arrive_time; + int64_t _ts_arr[SIZE]; + size_t _size_arr[SIZE]; + size_t _cur_idx; + //std::map _pkt_map; }; class EstimatedLinkCapacityContext { @@ -32,6 +33,7 @@ private: std::map _pkt_map; }; +/* class RecvRateContext { public: RecvRateContext(TimePoint start) @@ -44,6 +46,6 @@ private: TimePoint _start; std::map _pkt_map; }; - +*/ } // namespace SRT #endif // ZLMEDIAKIT_SRT_STATISTIC_H \ No newline at end of file