diff --git a/srt/PacketQueue.cpp b/srt/PacketQueue.cpp index 7cf5f73d..e9b51505 100644 --- a/srt/PacketQueue.cpp +++ b/srt/PacketQueue.cpp @@ -1,6 +1,10 @@ #include "PacketQueue.hpp" namespace SRT { + +inline uint32_t genExpectedSeq(uint32_t seq){ + return 0x7fffffff&seq; +} PacketQueue::PacketQueue(uint32_t max_size, uint32_t init_seq, uint32_t lantency) : _pkt_expected_seq(init_seq) , _pkt_cap(max_size) @@ -23,27 +27,28 @@ std::list PacketQueue::tryGetPacket() { auto it = _pkt_map.find(_pkt_expected_seq); while ( it != _pkt_map.end()) { re.push_back(it->second); - _last_pop_ts = it->second->get_ts; _pkt_map.erase(it); - _pkt_expected_seq++; + _pkt_expected_seq = genExpectedSeq(_pkt_expected_seq+1); it = _pkt_map.find(_pkt_expected_seq); } while (_pkt_map.size() > _pkt_cap) { - // force pop some packet - it = _pkt_map.begin(); - re.push_back(it->second); - _last_pop_ts = it->second->get_ts; - _pkt_expected_seq = it->second->packet_seq_number + 1; - _pkt_map.erase(it); + // 防止回环 + it = _pkt_map.find(_pkt_expected_seq); + if(it != _pkt_map.end()){ + re.push_back(it->second); + _pkt_map.erase(it); + } + _pkt_expected_seq = genExpectedSeq(_pkt_expected_seq + 1); } while (timeLantency() > _pkt_lantency) { - auto it = _pkt_map.begin(); - re.push_back(it->second); - _last_pop_ts = it->second->get_ts; - _pkt_expected_seq = it->second->packet_seq_number + 1; - _pkt_map.erase(it); + it = _pkt_map.find(_pkt_expected_seq); + if(it != _pkt_map.end()){ + re.push_back(it->second); + _pkt_map.erase(it); + } + _pkt_expected_seq = genExpectedSeq(_pkt_expected_seq + 1); } return re; @@ -61,7 +66,7 @@ bool PacketQueue::dropForRecv(uint32_t first,uint32_t last){ _pkt_map.erase(i); } } - _pkt_expected_seq = last+1; + _pkt_expected_seq =genExpectedSeq(last+1); return true; } @@ -76,11 +81,10 @@ bool PacketQueue::dropForSend(uint32_t num){ for(uint32_t i =_pkt_expected_seq;i< num;++i){ it = _pkt_map.find(i); if(it != _pkt_map.end()){ - _last_pop_ts = it->second->get_ts; _pkt_map.erase(it); } } - _pkt_expected_seq = num; + _pkt_expected_seq =genExpectedSeq(num); return true; } @@ -91,33 +95,26 @@ DataPacket::Ptr PacketQueue::findPacketBySeq(uint32_t seq){ } return nullptr; } -uint32_t PacketQueue::timeLantencyFrom(TimePoint now){ - return DurationCountMicroseconds(now - _last_pop_ts); -} -std::list PacketQueue::tryGetPacketByNow(TimePoint now){ - std::list re; - auto it = _pkt_map.begin(); - while(it !=_pkt_map.end()){ - if(DurationCountMicroseconds(now-it->second->get_ts)>=_pkt_lantency){ - re.push_back(it->second); - _pkt_expected_seq = it->second->packet_seq_number+1; - _last_pop_ts = it->second->get_ts; - _pkt_map.erase(it); - } - it++; - } - return re; -} uint32_t PacketQueue::timeLantency() { if (_pkt_map.empty()) { return 0; } - auto first = _pkt_map.begin()->second; - auto last = _pkt_map.rbegin()->second; + auto first = _pkt_map.begin()->second->timestamp; + auto last = _pkt_map.rbegin()->second->timestamp; + uint32_t dur; + if(last>first){ + dur = last - first; + }else{ + dur = first - last; + } - return last->timestamp - first->timestamp; + if(dur > 0x80000000){ + dur = 0xffffffff - dur; + } + + return dur; } std::list PacketQueue::getLostSeq() { @@ -136,7 +133,7 @@ std::list PacketQueue::getLostSeq() { uint32_t i = _pkt_expected_seq; bool finish = true; - for(i = _pkt_expected_seq;i<=_pkt_map.rbegin()->first;++i){ + for(i = _pkt_expected_seq;i<=_pkt_map.rbegin()->first;){ if(_pkt_map.find(i) == _pkt_map.end()){ if(finish){ finish = false; @@ -152,6 +149,7 @@ std::list PacketQueue::getLostSeq() { re.push_back(lost); } } + i = genExpectedSeq(i+1); } return re; @@ -175,4 +173,5 @@ size_t PacketQueue::getAvailableBufferSize(){ uint32_t PacketQueue::getExpectedSeq(){ return _pkt_expected_seq; } + } // namespace SRT \ No newline at end of file diff --git a/srt/PacketQueue.hpp b/srt/PacketQueue.hpp index 06036d75..d213f2d5 100644 --- a/srt/PacketQueue.hpp +++ b/srt/PacketQueue.hpp @@ -21,8 +21,6 @@ public: bool inputPacket(DataPacket::Ptr pkt); std::list tryGetPacket(); uint32_t timeLantency(); - uint32_t timeLantencyFrom(TimePoint now); - std::list tryGetPacketByNow(TimePoint now); std::list getLostSeq(); size_t getSize(); @@ -35,8 +33,6 @@ public: bool dropForSend(uint32_t num); DataPacket::Ptr findPacketBySeq(uint32_t seq); - - private: std::map _pkt_map; @@ -44,7 +40,7 @@ private: uint32_t _pkt_cap; uint32_t _pkt_lantency; - TimePoint _last_pop_ts; + }; } diff --git a/srt/SrtTransport.cpp b/srt/SrtTransport.cpp index 4d5ac60e..4b58b07e 100644 --- a/srt/SrtTransport.cpp +++ b/srt/SrtTransport.cpp @@ -210,31 +210,6 @@ void SrtTransport::handleHandshakeConclusion(HandshakePacket &pkt, struct sockad _last_ack_pkt_seq_num = _init_seq_number; } void SrtTransport::bufCheckInterval(){ - if(isPusher()){ - if(_recv_buf->timeLantencyFrom(_now) > (_buf_delay*1e6)){ - auto list = _recv_buf->tryGetPacketByNow(_now); - for(auto data : list){ - onSRTData(std::move(data)); - } - if(!list.empty()){ - sendACKPacket(); - _light_ack_pkt_count = 0; - _ack_ticker.resetTime(_now); - } - - auto nak_interval = (_rtt+_rtt_variance*4)/2; - if(nak_interval >= 20*1000){ - nak_interval = 20*1000; - } - if(_nak_ticker.elapsedTime(_now)>nak_interval || list.empty()){ - auto lost = _recv_buf->getLostSeq(); - if(!lost.empty()){ - sendNAKPacket(lost); - } - _nak_ticker.resetTime(_now); - } - } - } } void SrtTransport::handleHandshake(uint8_t *buf, int len, struct sockaddr_storage *addr){ HandshakePacket pkt; @@ -454,7 +429,7 @@ void SrtTransport::handleDataPacket(uint8_t *buf, int len, struct sockaddr_stora } _light_ack_pkt_count++; - bufCheckInterval(); + //bufCheckInterval(); } void SrtTransport::sendDataPacket(DataPacket::Ptr pkt,char* buf,int len, bool flush) {