mirror of
https://github.com/ZLMediaKit/ZLMediaKit.git
synced 2024-11-25 20:27:34 +08:00
copy srt estimated link capacity algorithm
This commit is contained in:
parent
ea35002be8
commit
533f35dac4
@ -14,6 +14,7 @@
|
|||||||
|
|
||||||
#include <chrono>
|
#include <chrono>
|
||||||
#define MAX_SEQ 0x7fffffff
|
#define MAX_SEQ 0x7fffffff
|
||||||
|
#define SEQ_NONE 0xffffffff
|
||||||
#define MAX_TS 0xffffffff
|
#define MAX_TS 0xffffffff
|
||||||
|
|
||||||
namespace SRT {
|
namespace SRT {
|
||||||
@ -35,6 +36,25 @@ static inline uint16_t loadUint16(uint8_t *ptr) {
|
|||||||
return ptr[0] << 8 | ptr[1];
|
return ptr[0] << 8 | ptr[1];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inline static int64_t seqCmp(uint32_t seq1, uint32_t seq2) {
|
||||||
|
if(seq1 > seq2){
|
||||||
|
if((seq1 - seq2) >(MAX_SEQ>>1)){
|
||||||
|
return (int64_t)seq1 - (int64_t)(seq2+MAX_SEQ);
|
||||||
|
}else{
|
||||||
|
return (int64_t)seq1 - (int64_t)seq2;
|
||||||
|
}
|
||||||
|
}else{
|
||||||
|
if((seq2-seq1) >(MAX_SEQ>>1)){
|
||||||
|
return (int64_t)(seq1+MAX_SEQ) - (int64_t)seq2;
|
||||||
|
}else{
|
||||||
|
return (int64_t)seq1 - (int64_t)seq2;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
inline static uint32_t incSeq(int32_t seq) {
|
||||||
|
return (seq == MAX_SEQ) ? 0 : seq + 1;
|
||||||
|
}
|
||||||
static inline void storeUint32(uint8_t *buf, uint32_t val) {
|
static inline void storeUint32(uint8_t *buf, uint32_t val) {
|
||||||
buf[0] = val >> 24;
|
buf[0] = val >> 24;
|
||||||
buf[1] = (val >> 16) & 0xff;
|
buf[1] = (val >> 16) & 0xff;
|
||||||
|
@ -106,7 +106,6 @@ void SrtTransport::inputSockData(uint8_t *buf, int len, struct sockaddr_storage
|
|||||||
_handleshake_timer.reset();
|
_handleshake_timer.reset();
|
||||||
}
|
}
|
||||||
_pkt_recv_rate_context->inputPacket(_now,len-HDR_SIZE);
|
_pkt_recv_rate_context->inputPacket(_now,len-HDR_SIZE);
|
||||||
_estimated_link_capacity_context->inputPacket(_now);
|
|
||||||
//_recv_rate_context->inputPacket(_now, len);
|
//_recv_rate_context->inputPacket(_now, len);
|
||||||
|
|
||||||
handleDataPacket(buf, len, addr);
|
handleDataPacket(buf, len, addr);
|
||||||
@ -126,8 +125,8 @@ void SrtTransport::inputSockData(uint8_t *buf, int len, struct sockaddr_storage
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
_pkt_recv_rate_context->inputPacket(_now,len);
|
//_pkt_recv_rate_context->inputPacket(_now,len);
|
||||||
_estimated_link_capacity_context->inputPacket(_now);
|
//_estimated_link_capacity_context->inputPacket(_now);
|
||||||
//_recv_rate_context->inputPacket(_now, len);
|
//_recv_rate_context->inputPacket(_now, len);
|
||||||
|
|
||||||
auto it = s_control_functions.find(type);
|
auto it = s_control_functions.find(type);
|
||||||
@ -179,6 +178,7 @@ void SrtTransport::handleHandshakeInduction(HandshakePacket &pkt, struct sockadd
|
|||||||
_mtu = pkt.mtu;
|
_mtu = pkt.mtu;
|
||||||
|
|
||||||
_last_pkt_seq = _init_seq_number - 1;
|
_last_pkt_seq = _init_seq_number - 1;
|
||||||
|
_estimated_link_capacity_context->setLastSeq(_last_pkt_seq);
|
||||||
|
|
||||||
_peer_socket_id = pkt.srt_socket_id;
|
_peer_socket_id = pkt.srt_socket_id;
|
||||||
HandshakePacket::Ptr res = std::make_shared<HandshakePacket>();
|
HandshakePacket::Ptr res = std::make_shared<HandshakePacket>();
|
||||||
@ -484,6 +484,7 @@ void SrtTransport::sendACKPacket() {
|
|||||||
pkt->pkt_recv_rate = _pkt_recv_rate_context->getPacketRecvRate(recv_rate);
|
pkt->pkt_recv_rate = _pkt_recv_rate_context->getPacketRecvRate(recv_rate);
|
||||||
pkt->estimated_link_capacity = _estimated_link_capacity_context->getEstimatedLinkCapacity();
|
pkt->estimated_link_capacity = _estimated_link_capacity_context->getEstimatedLinkCapacity();
|
||||||
pkt->recv_rate = recv_rate;
|
pkt->recv_rate = recv_rate;
|
||||||
|
TraceL<<pkt->pkt_recv_rate<<" pkt/s "<<recv_rate<<" byte/s "<<pkt->estimated_link_capacity<<" pkt/s (cap)";
|
||||||
pkt->storeToData();
|
pkt->storeToData();
|
||||||
_ack_send_timestamp[pkt->ack_number] = _now;
|
_ack_send_timestamp[pkt->ack_number] = _now;
|
||||||
_last_ack_pkt_seq_num = pkt->last_ack_pkt_seq_number;
|
_last_ack_pkt_seq_num = pkt->last_ack_pkt_seq_number;
|
||||||
@ -563,6 +564,8 @@ void SrtTransport::handleDataPacket(uint8_t *buf, int len, struct sockaddr_stora
|
|||||||
DataPacket::Ptr pkt = std::make_shared<DataPacket>();
|
DataPacket::Ptr pkt = std::make_shared<DataPacket>();
|
||||||
pkt->loadFromData(buf, len);
|
pkt->loadFromData(buf, len);
|
||||||
|
|
||||||
|
_estimated_link_capacity_context->inputPacket(_now,pkt);
|
||||||
|
|
||||||
std::list<DataPacket::Ptr> list;
|
std::list<DataPacket::Ptr> list;
|
||||||
//TraceL<<" seq="<< pkt->packet_seq_number<<" ts="<<pkt->timestamp<<" size="<<pkt->payloadSize()<<\
|
//TraceL<<" seq="<< pkt->packet_seq_number<<" ts="<<pkt->timestamp<<" size="<<pkt->payloadSize()<<\
|
||||||
//" PP="<<(int)pkt->PP<<" O="<<(int)pkt->O<<" kK="<<(int)pkt->KK<<" R="<<(int)pkt->R;
|
//" PP="<<(int)pkt->PP<<" O="<<(int)pkt->O<<" kK="<<(int)pkt->KK<<" R="<<(int)pkt->R;
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
#include <algorithm>
|
#include <algorithm>
|
||||||
|
#include <math.h>
|
||||||
#include "Statistic.hpp"
|
#include "Statistic.hpp"
|
||||||
|
|
||||||
namespace SRT {
|
namespace SRT {
|
||||||
@ -47,47 +47,121 @@ uint32_t PacketRecvRateContext::getPacketRecvRate(uint32_t &bytesps) {
|
|||||||
++bp; // advance bytes pointer
|
++bp; // advance bytes pointer
|
||||||
}
|
}
|
||||||
|
|
||||||
// claculate speed, or return 0 if not enough valid value
|
if(count>(SIZE>>1)){
|
||||||
|
|
||||||
bytesps = (unsigned long)ceil(1000000.0 / (double(sum) / double(bytes)));
|
bytesps = (unsigned long)ceil(1000000.0 / (double(sum) / double(bytes)));
|
||||||
auto ret = (uint32_t)ceil(1000000.0 / (sum / count));
|
auto ret = (uint32_t)ceil(1000000.0 / (sum / count));
|
||||||
if(_cur_idx == 0)
|
|
||||||
TraceL << bytesps << " byte/sec " << ret << " pkt/sec";
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
bytesps = 0;
|
||||||
|
return 0;
|
||||||
|
// claculate speed, or return 0 if not enough valid value
|
||||||
|
|
||||||
|
|
||||||
void EstimatedLinkCapacityContext::inputPacket(TimePoint &ts) {
|
|
||||||
if (_pkt_map.size() > 16) {
|
|
||||||
_pkt_map.erase(_pkt_map.begin());
|
|
||||||
}
|
}
|
||||||
auto tmp = DurationCountMicroseconds(ts - _start);
|
EstimatedLinkCapacityContext::EstimatedLinkCapacityContext(TimePoint start) : _start(start) {
|
||||||
_pkt_map.emplace(tmp, tmp);
|
for (size_t i = 0; i < SIZE; i++) {
|
||||||
|
_dur_probe_arr[i] = 1000;
|
||||||
|
}
|
||||||
|
_cur_idx = 0;
|
||||||
|
};
|
||||||
|
void EstimatedLinkCapacityContext::inputPacket(TimePoint &ts,DataPacket::Ptr& pkt) {
|
||||||
|
uint32_t seq = pkt->packet_seq_number;
|
||||||
|
auto diff = seqCmp(seq,_last_seq);
|
||||||
|
const bool retransmitted = pkt->R == 1;
|
||||||
|
const bool unordered = diff<=0;
|
||||||
|
uint32_t one = seq&0xf;
|
||||||
|
if(one == 0){
|
||||||
|
probe1Arrival(ts,pkt,unordered || retransmitted);
|
||||||
|
}
|
||||||
|
if(diff>0){
|
||||||
|
_last_seq = seq;
|
||||||
|
}
|
||||||
|
if(unordered || retransmitted){
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if(one == 1){
|
||||||
|
probe2Arrival(ts,pkt);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Record the arrival time of the first probing packet.
|
||||||
|
void EstimatedLinkCapacityContext::probe1Arrival(TimePoint &ts, const DataPacket::Ptr &pkt, bool unordered) {
|
||||||
|
if (unordered && pkt->packet_seq_number == _probe1_seq) {
|
||||||
|
// Reset the starting probe into "undefined", when
|
||||||
|
// a packet has come as retransmitted before the
|
||||||
|
// measurement at arrival of 17th could be taken.
|
||||||
|
_probe1_seq = SEQ_NONE;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
_ts_probe_time = ts;
|
||||||
|
_probe1_seq = pkt->packet_seq_number; // Record the sequence where 16th packet probe was taken
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Record the arrival time of the second probing packet and the interval between packet pairs.
|
||||||
|
|
||||||
|
void EstimatedLinkCapacityContext::probe2Arrival(TimePoint &ts, const DataPacket::Ptr &pkt) {
|
||||||
|
// Reject probes that don't refer to the very next packet
|
||||||
|
// towards the one that was lately notified by probe1Arrival.
|
||||||
|
// Otherwise the result can be stupid.
|
||||||
|
|
||||||
|
// Simply, in case when this wasn't called exactly for the
|
||||||
|
// expected packet pair, behave as if the 17th packet was lost.
|
||||||
|
|
||||||
|
// no start point yet (or was reset) OR not very next packet
|
||||||
|
if (_probe1_seq == SEQ_NONE || incSeq(_probe1_seq) != pkt->packet_seq_number)
|
||||||
|
return;
|
||||||
|
|
||||||
|
// Reset the starting probe to prevent checking if the
|
||||||
|
// measurement was already taken.
|
||||||
|
_probe1_seq = SEQ_NONE;
|
||||||
|
|
||||||
|
// record the probing packets interval
|
||||||
|
// Adjust the time for what a complete packet would have take
|
||||||
|
const int64_t timediff = DurationCountMicroseconds(ts - _ts_probe_time);
|
||||||
|
const int64_t timediff_times_pl_size = timediff * SRT_MAX_PAYLOAD_SIZE;
|
||||||
|
|
||||||
|
// Let's take it simpler than it is coded here:
|
||||||
|
// (stating that a packet has never zero size)
|
||||||
|
//
|
||||||
|
// probe_case = (now - previous_packet_time) * SRT_MAX_PAYLOAD_SIZE / pktsz;
|
||||||
|
//
|
||||||
|
// Meaning: if the packet is fully packed, probe_case = timediff.
|
||||||
|
// Otherwise the timediff will be "converted" to a time that a fully packed packet "would take",
|
||||||
|
// provided the arrival time is proportional to the payload size and skipping
|
||||||
|
// the ETH+IP+UDP+SRT header part elliminates the constant packet delivery time influence.
|
||||||
|
//
|
||||||
|
const size_t pktsz = pkt->payloadSize();
|
||||||
|
_dur_probe_arr[_cur_idx] = pktsz ? int64_t(timediff_times_pl_size / pktsz) : int64_t(timediff);
|
||||||
|
|
||||||
|
// the window is logically circular
|
||||||
|
_cur_idx = (_cur_idx + 1) % SIZE;
|
||||||
}
|
}
|
||||||
|
|
||||||
uint32_t EstimatedLinkCapacityContext::getEstimatedLinkCapacity() {
|
uint32_t EstimatedLinkCapacityContext::getEstimatedLinkCapacity() {
|
||||||
decltype(_pkt_map.begin()) next;
|
int64_t tmp[SIZE];
|
||||||
std::vector<int64_t> tmp;
|
std::copy(_dur_probe_arr, _dur_probe_arr + SIZE , tmp);
|
||||||
|
std::nth_element(tmp, tmp + (SIZE / 2), tmp + SIZE);
|
||||||
|
int64_t median = tmp[SIZE / 2];
|
||||||
|
|
||||||
for (auto it = _pkt_map.begin(); it != _pkt_map.end(); ++it) {
|
int64_t count = 1;
|
||||||
next = it;
|
int64_t sum = median;
|
||||||
++next;
|
int64_t upper = median << 3; // median*8
|
||||||
if (next != _pkt_map.end()) {
|
int64_t lower = median >> 3; // median/8
|
||||||
tmp.push_back(next->first - it->first);
|
|
||||||
} else {
|
// median filtering
|
||||||
break;
|
const int64_t* p = _dur_probe_arr;
|
||||||
|
for (int i = 0, n = SIZE; i < n; ++ i)
|
||||||
|
{
|
||||||
|
if ((*p < upper) && (*p > lower))
|
||||||
|
{
|
||||||
|
++ count;
|
||||||
|
sum += *p;
|
||||||
}
|
}
|
||||||
}
|
++ p;
|
||||||
std::sort(tmp.begin(), tmp.end());
|
|
||||||
if (tmp.empty()) {
|
|
||||||
return 1000;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tmp.size() < 16) {
|
return (uint32_t)ceil(1000000.0 / (double(sum) / double(count)));
|
||||||
return 1000;
|
|
||||||
}
|
|
||||||
|
|
||||||
double dur = tmp[0] / 1e6;
|
|
||||||
return (uint32_t)(1.0 / dur);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -23,14 +23,25 @@ private:
|
|||||||
|
|
||||||
class EstimatedLinkCapacityContext {
|
class EstimatedLinkCapacityContext {
|
||||||
public:
|
public:
|
||||||
EstimatedLinkCapacityContext(TimePoint start) : _start(start) {};
|
EstimatedLinkCapacityContext(TimePoint start);
|
||||||
~EstimatedLinkCapacityContext() = default;
|
~EstimatedLinkCapacityContext() = default;
|
||||||
void inputPacket(TimePoint &ts);
|
void setLastSeq(uint32_t seq){
|
||||||
|
_last_seq = seq;
|
||||||
|
}
|
||||||
|
void inputPacket(TimePoint &ts,DataPacket::Ptr& pkt);
|
||||||
uint32_t getEstimatedLinkCapacity();
|
uint32_t getEstimatedLinkCapacity();
|
||||||
|
static const int SIZE = 16;
|
||||||
|
private:
|
||||||
|
void probe1Arrival(TimePoint &ts,const DataPacket::Ptr& pkt, bool unordered);
|
||||||
|
void probe2Arrival(TimePoint &ts,const DataPacket::Ptr& pkt);
|
||||||
private:
|
private:
|
||||||
TimePoint _start;
|
TimePoint _start;
|
||||||
std::map<int64_t, int64_t> _pkt_map;
|
TimePoint _ts_probe_time;
|
||||||
|
int64_t _dur_probe_arr[SIZE];
|
||||||
|
size_t _cur_idx;
|
||||||
|
uint32_t _last_seq = 0;
|
||||||
|
uint32_t _probe1_seq = SEQ_NONE;
|
||||||
|
//std::map<int64_t, int64_t> _pkt_map;
|
||||||
};
|
};
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
Loading…
Reference in New Issue
Block a user