diff --git a/srt/Packet.hpp b/srt/Packet.hpp index 2118113d..bab6df0c 100644 --- a/srt/Packet.hpp +++ b/srt/Packet.hpp @@ -62,6 +62,8 @@ public: uint32_t timestamp; uint32_t dst_socket_id; + TimePoint get_ts; // recv or send time + private: BufferRaw::Ptr _data; }; diff --git a/srt/PacketQueue.cpp b/srt/PacketQueue.cpp index 20181262..7cf5f73d 100644 --- a/srt/PacketQueue.cpp +++ b/srt/PacketQueue.cpp @@ -20,16 +20,20 @@ bool PacketQueue::inputPacket(DataPacket::Ptr pkt) { std::list PacketQueue::tryGetPacket() { std::list re; - while (_pkt_map.find(_pkt_expected_seq) != _pkt_map.end()) { - re.push_back(_pkt_map[_pkt_expected_seq]); - _pkt_map.erase(_pkt_expected_seq); + auto it = _pkt_map.find(_pkt_expected_seq); + while ( it != _pkt_map.end()) { + re.push_back(it->second); + _last_pop_ts = it->second->get_ts; + _pkt_map.erase(it); _pkt_expected_seq++; + it = _pkt_map.find(_pkt_expected_seq); } while (_pkt_map.size() > _pkt_cap) { // force pop some packet - auto it = _pkt_map.begin(); + it = _pkt_map.begin(); re.push_back(it->second); + _last_pop_ts = it->second->get_ts; _pkt_expected_seq = it->second->packet_seq_number + 1; _pkt_map.erase(it); } @@ -37,11 +41,12 @@ std::list PacketQueue::tryGetPacket() { while (timeLantency() > _pkt_lantency) { auto it = _pkt_map.begin(); re.push_back(it->second); + _last_pop_ts = it->second->get_ts; _pkt_expected_seq = it->second->packet_seq_number + 1; _pkt_map.erase(it); } - return std::move(re); + return re; } @@ -67,10 +72,12 @@ bool PacketQueue::dropForSend(uint32_t num){ if(num <= _pkt_expected_seq){ return false; } - + decltype(_pkt_map.end()) it; for(uint32_t i =_pkt_expected_seq;i< num;++i){ - if(_pkt_map.find(i) != _pkt_map.end()){ - _pkt_map.erase(i); + it = _pkt_map.find(i); + if(it != _pkt_map.end()){ + _last_pop_ts = it->second->get_ts; + _pkt_map.erase(it); } } _pkt_expected_seq = num; @@ -84,6 +91,24 @@ DataPacket::Ptr PacketQueue::findPacketBySeq(uint32_t seq){ } return nullptr; } +uint32_t PacketQueue::timeLantencyFrom(TimePoint now){ + return DurationCountMicroseconds(now - _last_pop_ts); +} + +std::list PacketQueue::tryGetPacketByNow(TimePoint now){ + std::list re; + auto it = _pkt_map.begin(); + while(it !=_pkt_map.end()){ + if(DurationCountMicroseconds(now-it->second->get_ts)>=_pkt_lantency){ + re.push_back(it->second); + _pkt_expected_seq = it->second->packet_seq_number+1; + _last_pop_ts = it->second->get_ts; + _pkt_map.erase(it); + } + it++; + } + return re; +} uint32_t PacketQueue::timeLantency() { if (_pkt_map.empty()) { return 0; diff --git a/srt/PacketQueue.hpp b/srt/PacketQueue.hpp index 2ba8467b..06036d75 100644 --- a/srt/PacketQueue.hpp +++ b/srt/PacketQueue.hpp @@ -21,6 +21,8 @@ public: bool inputPacket(DataPacket::Ptr pkt); std::list tryGetPacket(); uint32_t timeLantency(); + uint32_t timeLantencyFrom(TimePoint now); + std::list tryGetPacketByNow(TimePoint now); std::list getLostSeq(); size_t getSize(); @@ -41,6 +43,8 @@ private: uint32_t _pkt_expected_seq = 0; uint32_t _pkt_cap; uint32_t _pkt_lantency; + + TimePoint _last_pop_ts; }; } diff --git a/srt/SrtSession.cpp b/srt/SrtSession.cpp index 978e28da..fe8cfc13 100644 --- a/srt/SrtSession.cpp +++ b/srt/SrtSession.cpp @@ -109,7 +109,7 @@ void SrtSession::onRecv(const Buffer::Ptr &buffer) { if(_transport){ _transport->inputSockData(data,size,&_peer_addr); }else{ - WarnL<< "ingore data"; + //WarnL<< "ingore data"; } } diff --git a/srt/SrtTransport.cpp b/srt/SrtTransport.cpp index 619c6377..4d5ac60e 100644 --- a/srt/SrtTransport.cpp +++ b/srt/SrtTransport.cpp @@ -201,6 +201,7 @@ void SrtTransport::handleHandshakeConclusion(HandshakePacket &pkt, struct sockad _recv_buf = std::make_shared(res->max_flow_window_size,_init_seq_number, delay*1e6); _send_buf = std::make_shared(res->max_flow_window_size,_init_seq_number, delay*1e6); _send_packet_seq_number = _init_seq_number; + _buf_delay = delay; onHandShakeFinished(_stream_id,addr); } else { TraceL << getIdentifier() << " CONCLUSION handle repeate "; @@ -208,6 +209,33 @@ void SrtTransport::handleHandshakeConclusion(HandshakePacket &pkt, struct sockad } _last_ack_pkt_seq_num = _init_seq_number; } +void SrtTransport::bufCheckInterval(){ + if(isPusher()){ + if(_recv_buf->timeLantencyFrom(_now) > (_buf_delay*1e6)){ + auto list = _recv_buf->tryGetPacketByNow(_now); + for(auto data : list){ + onSRTData(std::move(data)); + } + if(!list.empty()){ + sendACKPacket(); + _light_ack_pkt_count = 0; + _ack_ticker.resetTime(_now); + } + + auto nak_interval = (_rtt+_rtt_variance*4)/2; + if(nak_interval >= 20*1000){ + nak_interval = 20*1000; + } + if(_nak_ticker.elapsedTime(_now)>nak_interval || list.empty()){ + auto lost = _recv_buf->getLostSeq(); + if(!lost.empty()){ + sendNAKPacket(lost); + } + _nak_ticker.resetTime(_now); + } + } + } +} void SrtTransport::handleHandshake(uint8_t *buf, int len, struct sockaddr_storage *addr){ HandshakePacket pkt; assert(pkt.loadFromData(buf,len)); @@ -223,7 +251,7 @@ void SrtTransport::handleHandshake(uint8_t *buf, int len, struct sockaddr_storag _nak_ticker.resetTime(_now); } void SrtTransport::handleKeeplive(uint8_t *buf, int len, struct sockaddr_storage *addr){ - TraceL; + //TraceL; sendKeepLivePacket(); } @@ -260,7 +288,7 @@ void SrtTransport::sendMsgDropReq(uint32_t first ,uint32_t last){ sendControlPacket(pkt,true); } void SrtTransport::handleNAK(uint8_t *buf, int len, struct sockaddr_storage *addr){ - TraceL; + //TraceL; NAKPacket pkt; pkt.loadFromData(buf,len); bool empty = false; @@ -291,7 +319,7 @@ void SrtTransport::handleShutDown(uint8_t *buf, int len, struct sockaddr_storage void SrtTransport::handleDropReq(uint8_t *buf, int len, struct sockaddr_storage *addr){ MsgDropReqPacket pkt; pkt.loadFromData(buf,len); - TraceL<<"drop "<dropForRecv(pkt.first_pkt_seq_num,pkt.last_pkt_seq_num); } void SrtTransport::handleUserDefinedType(uint8_t *buf, int len, struct sockaddr_storage *addr){ @@ -315,10 +343,6 @@ void SrtTransport::handlePeerError(uint8_t *buf, int len, struct sockaddr_storag } void SrtTransport::sendACKPacket() { - if(_last_ack_pkt_seq_num == _recv_buf->getExpectedSeq()){ - return; - } - ACKPacket::Ptr pkt=std::make_shared(); pkt->dst_socket_id = _peer_socket_id; pkt->timestamp = DurationCountMicroseconds(_now - _start_timestamp); @@ -337,9 +361,6 @@ void SrtTransport::sendACKPacket() { //TraceL<<"send ack "<dump(); } void SrtTransport::sendLightACKPacket() { - if(_last_ack_pkt_seq_num == _recv_buf->getExpectedSeq()){ - return; - } ACKPacket::Ptr pkt=std::make_shared(); pkt->dst_socket_id = _peer_socket_id; @@ -367,7 +388,7 @@ void SrtTransport::sendNAKPacket(std::list& lost_list){ pkt->storeToData(); - TraceL<<"send NAK "<dump(); + //TraceL<<"send NAK "<dump(); sendControlPacket(pkt,true); } @@ -381,6 +402,8 @@ void SrtTransport::sendShutDown(){ void SrtTransport::handleDataPacket(uint8_t *buf, int len, struct sockaddr_storage *addr){ DataPacket::Ptr pkt = std::make_shared(); pkt->loadFromData(buf,len); + + pkt->get_ts = _now; //TraceL<<" seq="<< pkt->packet_seq_number<<" ts="<timestamp<<" size="<payloadSize()<<\ //" PP="<<(int)pkt->PP<<" O="<<(int)pkt->O<<" kK="<<(int)pkt->KK<<" R="<<(int)pkt->R; @@ -399,11 +422,14 @@ void SrtTransport::handleDataPacket(uint8_t *buf, int len, struct sockaddr_stora auto list = _recv_buf->tryGetPacket(); for(auto data : list){ - onSRTData(std::move(data),addr); + onSRTData(std::move(data)); } auto nak_interval = (_rtt+_rtt_variance*4)/2; - if(_nak_ticker.elapsedTime(_now)>20*1000 && _nak_ticker.elapsedTime(_now)>nak_interval){ + if(nak_interval >= 20*1000){ + nak_interval = 20*1000; + } + if(_nak_ticker.elapsedTime(_now)>nak_interval || list.empty()){ auto lost = _recv_buf->getLostSeq(); if(!lost.empty()){ sendNAKPacket(lost); @@ -427,6 +453,8 @@ void SrtTransport::handleDataPacket(uint8_t *buf, int len, struct sockaddr_stora _light_ack_pkt_count = 0; } _light_ack_pkt_count++; + + bufCheckInterval(); } void SrtTransport::sendDataPacket(DataPacket::Ptr pkt,char* buf,int len, bool flush) { diff --git a/srt/SrtTransport.hpp b/srt/SrtTransport.hpp index 0cf93b35..af19060e 100644 --- a/srt/SrtTransport.hpp +++ b/srt/SrtTransport.hpp @@ -8,6 +8,7 @@ #include "Network/Session.h" #include "Poller/EventPoller.h" +#include "Poller/Timer.h" #include "Common.hpp" #include "Packet.hpp" @@ -45,8 +46,11 @@ public: void unregisterSelf(); protected: virtual void onHandShakeFinished(std::string& streamid,struct sockaddr_storage *addr){}; - virtual void onSRTData(DataPacket::Ptr pkt,struct sockaddr_storage *addr){}; + virtual void onSRTData(DataPacket::Ptr pkt){}; virtual void onShutdown(const SockException &ex); + virtual bool isPusher(){ + return true; + }; private: void registerSelfHandshake(); @@ -76,6 +80,8 @@ private: void sendShutDown(); void sendMsgDropReq(uint32_t first ,uint32_t last); + void bufCheckInterval(); + size_t getPayloadSize(); protected: void sendDataPacket(DataPacket::Ptr pkt,char* buf,int len,bool flush = false); @@ -105,6 +111,7 @@ private: uint32_t _send_msg_number = 1; PacketQueue::Ptr _send_buf; + uint32_t _buf_delay = 120; PacketQueue::Ptr _recv_buf; uint32_t _rtt = 100*1000; uint32_t _rtt_variance =50*1000; diff --git a/srt/SrtTransportImp.cpp b/srt/SrtTransportImp.cpp index 5bae174a..b12e6e7c 100644 --- a/srt/SrtTransportImp.cpp +++ b/srt/SrtTransportImp.cpp @@ -48,13 +48,10 @@ void SrtTransportImp::onHandShakeFinished(std::string &streamid,struct sockaddr_ emitOnPlay(); } } -void SrtTransportImp::onSRTData(DataPacket::Ptr pkt,struct sockaddr_storage *addr) { +void SrtTransportImp::onSRTData(DataPacket::Ptr pkt) { if(!_is_pusher){ WarnP(this)<<"this is a player data ignore"; return; - } - if(!_addr){ - _addr.reset(new sockaddr_storage(*((sockaddr_storage *)addr))); } if (_decoder) { _decoder->input(reinterpret_cast(pkt->payloadData()), pkt->payloadSize()); diff --git a/srt/SrtTransportImp.hpp b/srt/SrtTransportImp.hpp index 8abe2b40..36a07e01 100644 --- a/srt/SrtTransportImp.hpp +++ b/srt/SrtTransportImp.hpp @@ -36,7 +36,7 @@ public: protected: ///////SrtTransport override/////// void onHandShakeFinished(std::string& streamid,struct sockaddr_storage *addr) override; - void onSRTData(DataPacket::Ptr pkt,struct sockaddr_storage *addr) override; + void onSRTData(DataPacket::Ptr pkt) override; void onShutdown(const SockException &ex) override; void sendPacket(Buffer::Ptr pkt,bool flush = true) override{ @@ -44,6 +44,10 @@ protected: SrtTransport::sendPacket(pkt,flush); }; + bool isPusher() override{ + return _is_pusher; + } + ///////MediaSourceEvent override/////// // 关闭 bool close(mediakit::MediaSource &sender, bool force) override;