for some optimize and anti pkt lost and force pop packet when too late

This commit is contained in:
xiongguangjie 2022-06-04 20:51:49 +08:00
parent 89b135400c
commit 71ce31d5c0
8 changed files with 95 additions and 28 deletions

View File

@ -62,6 +62,8 @@ public:
uint32_t timestamp; uint32_t timestamp;
uint32_t dst_socket_id; uint32_t dst_socket_id;
TimePoint get_ts; // recv or send time
private: private:
BufferRaw::Ptr _data; BufferRaw::Ptr _data;
}; };

View File

@ -20,16 +20,20 @@ bool PacketQueue::inputPacket(DataPacket::Ptr pkt) {
std::list<DataPacket::Ptr> PacketQueue::tryGetPacket() { std::list<DataPacket::Ptr> PacketQueue::tryGetPacket() {
std::list<DataPacket::Ptr> re; std::list<DataPacket::Ptr> re;
while (_pkt_map.find(_pkt_expected_seq) != _pkt_map.end()) { auto it = _pkt_map.find(_pkt_expected_seq);
re.push_back(_pkt_map[_pkt_expected_seq]); while ( it != _pkt_map.end()) {
_pkt_map.erase(_pkt_expected_seq); re.push_back(it->second);
_last_pop_ts = it->second->get_ts;
_pkt_map.erase(it);
_pkt_expected_seq++; _pkt_expected_seq++;
it = _pkt_map.find(_pkt_expected_seq);
} }
while (_pkt_map.size() > _pkt_cap) { while (_pkt_map.size() > _pkt_cap) {
// force pop some packet // force pop some packet
auto it = _pkt_map.begin(); it = _pkt_map.begin();
re.push_back(it->second); re.push_back(it->second);
_last_pop_ts = it->second->get_ts;
_pkt_expected_seq = it->second->packet_seq_number + 1; _pkt_expected_seq = it->second->packet_seq_number + 1;
_pkt_map.erase(it); _pkt_map.erase(it);
} }
@ -37,11 +41,12 @@ std::list<DataPacket::Ptr> PacketQueue::tryGetPacket() {
while (timeLantency() > _pkt_lantency) { while (timeLantency() > _pkt_lantency) {
auto it = _pkt_map.begin(); auto it = _pkt_map.begin();
re.push_back(it->second); re.push_back(it->second);
_last_pop_ts = it->second->get_ts;
_pkt_expected_seq = it->second->packet_seq_number + 1; _pkt_expected_seq = it->second->packet_seq_number + 1;
_pkt_map.erase(it); _pkt_map.erase(it);
} }
return std::move(re); return re;
} }
@ -67,10 +72,12 @@ bool PacketQueue::dropForSend(uint32_t num){
if(num <= _pkt_expected_seq){ if(num <= _pkt_expected_seq){
return false; return false;
} }
decltype(_pkt_map.end()) it;
for(uint32_t i =_pkt_expected_seq;i< num;++i){ for(uint32_t i =_pkt_expected_seq;i< num;++i){
if(_pkt_map.find(i) != _pkt_map.end()){ it = _pkt_map.find(i);
_pkt_map.erase(i); if(it != _pkt_map.end()){
_last_pop_ts = it->second->get_ts;
_pkt_map.erase(it);
} }
} }
_pkt_expected_seq = num; _pkt_expected_seq = num;
@ -84,6 +91,24 @@ DataPacket::Ptr PacketQueue::findPacketBySeq(uint32_t seq){
} }
return nullptr; return nullptr;
} }
uint32_t PacketQueue::timeLantencyFrom(TimePoint now){
return DurationCountMicroseconds(now - _last_pop_ts);
}
std::list<DataPacket::Ptr> PacketQueue::tryGetPacketByNow(TimePoint now){
std::list<DataPacket::Ptr> re;
auto it = _pkt_map.begin();
while(it !=_pkt_map.end()){
if(DurationCountMicroseconds(now-it->second->get_ts)>=_pkt_lantency){
re.push_back(it->second);
_pkt_expected_seq = it->second->packet_seq_number+1;
_last_pop_ts = it->second->get_ts;
_pkt_map.erase(it);
}
it++;
}
return re;
}
uint32_t PacketQueue::timeLantency() { uint32_t PacketQueue::timeLantency() {
if (_pkt_map.empty()) { if (_pkt_map.empty()) {
return 0; return 0;

View File

@ -21,6 +21,8 @@ public:
bool inputPacket(DataPacket::Ptr pkt); bool inputPacket(DataPacket::Ptr pkt);
std::list<DataPacket::Ptr> tryGetPacket(); std::list<DataPacket::Ptr> tryGetPacket();
uint32_t timeLantency(); uint32_t timeLantency();
uint32_t timeLantencyFrom(TimePoint now);
std::list<DataPacket::Ptr> tryGetPacketByNow(TimePoint now);
std::list<LostPair> getLostSeq(); std::list<LostPair> getLostSeq();
size_t getSize(); size_t getSize();
@ -41,6 +43,8 @@ private:
uint32_t _pkt_expected_seq = 0; uint32_t _pkt_expected_seq = 0;
uint32_t _pkt_cap; uint32_t _pkt_cap;
uint32_t _pkt_lantency; uint32_t _pkt_lantency;
TimePoint _last_pop_ts;
}; };
} }

View File

@ -109,7 +109,7 @@ void SrtSession::onRecv(const Buffer::Ptr &buffer) {
if(_transport){ if(_transport){
_transport->inputSockData(data,size,&_peer_addr); _transport->inputSockData(data,size,&_peer_addr);
}else{ }else{
WarnL<< "ingore data"; //WarnL<< "ingore data";
} }
} }

View File

@ -201,6 +201,7 @@ void SrtTransport::handleHandshakeConclusion(HandshakePacket &pkt, struct sockad
_recv_buf = std::make_shared<PacketQueue>(res->max_flow_window_size,_init_seq_number, delay*1e6); _recv_buf = std::make_shared<PacketQueue>(res->max_flow_window_size,_init_seq_number, delay*1e6);
_send_buf = std::make_shared<PacketQueue>(res->max_flow_window_size,_init_seq_number, delay*1e6); _send_buf = std::make_shared<PacketQueue>(res->max_flow_window_size,_init_seq_number, delay*1e6);
_send_packet_seq_number = _init_seq_number; _send_packet_seq_number = _init_seq_number;
_buf_delay = delay;
onHandShakeFinished(_stream_id,addr); onHandShakeFinished(_stream_id,addr);
} else { } else {
TraceL << getIdentifier() << " CONCLUSION handle repeate "; TraceL << getIdentifier() << " CONCLUSION handle repeate ";
@ -208,6 +209,33 @@ void SrtTransport::handleHandshakeConclusion(HandshakePacket &pkt, struct sockad
} }
_last_ack_pkt_seq_num = _init_seq_number; _last_ack_pkt_seq_num = _init_seq_number;
} }
void SrtTransport::bufCheckInterval(){
if(isPusher()){
if(_recv_buf->timeLantencyFrom(_now) > (_buf_delay*1e6)){
auto list = _recv_buf->tryGetPacketByNow(_now);
for(auto data : list){
onSRTData(std::move(data));
}
if(!list.empty()){
sendACKPacket();
_light_ack_pkt_count = 0;
_ack_ticker.resetTime(_now);
}
auto nak_interval = (_rtt+_rtt_variance*4)/2;
if(nak_interval >= 20*1000){
nak_interval = 20*1000;
}
if(_nak_ticker.elapsedTime(_now)>nak_interval || list.empty()){
auto lost = _recv_buf->getLostSeq();
if(!lost.empty()){
sendNAKPacket(lost);
}
_nak_ticker.resetTime(_now);
}
}
}
}
void SrtTransport::handleHandshake(uint8_t *buf, int len, struct sockaddr_storage *addr){ void SrtTransport::handleHandshake(uint8_t *buf, int len, struct sockaddr_storage *addr){
HandshakePacket pkt; HandshakePacket pkt;
assert(pkt.loadFromData(buf,len)); assert(pkt.loadFromData(buf,len));
@ -223,7 +251,7 @@ void SrtTransport::handleHandshake(uint8_t *buf, int len, struct sockaddr_storag
_nak_ticker.resetTime(_now); _nak_ticker.resetTime(_now);
} }
void SrtTransport::handleKeeplive(uint8_t *buf, int len, struct sockaddr_storage *addr){ void SrtTransport::handleKeeplive(uint8_t *buf, int len, struct sockaddr_storage *addr){
TraceL; //TraceL;
sendKeepLivePacket(); sendKeepLivePacket();
} }
@ -260,7 +288,7 @@ void SrtTransport::sendMsgDropReq(uint32_t first ,uint32_t last){
sendControlPacket(pkt,true); sendControlPacket(pkt,true);
} }
void SrtTransport::handleNAK(uint8_t *buf, int len, struct sockaddr_storage *addr){ void SrtTransport::handleNAK(uint8_t *buf, int len, struct sockaddr_storage *addr){
TraceL; //TraceL;
NAKPacket pkt; NAKPacket pkt;
pkt.loadFromData(buf,len); pkt.loadFromData(buf,len);
bool empty = false; bool empty = false;
@ -291,7 +319,7 @@ void SrtTransport::handleShutDown(uint8_t *buf, int len, struct sockaddr_storage
void SrtTransport::handleDropReq(uint8_t *buf, int len, struct sockaddr_storage *addr){ void SrtTransport::handleDropReq(uint8_t *buf, int len, struct sockaddr_storage *addr){
MsgDropReqPacket pkt; MsgDropReqPacket pkt;
pkt.loadFromData(buf,len); pkt.loadFromData(buf,len);
TraceL<<"drop "<<pkt.first_pkt_seq_num<<" last "<<pkt.last_pkt_seq_num; //TraceL<<"drop "<<pkt.first_pkt_seq_num<<" last "<<pkt.last_pkt_seq_num;
_recv_buf->dropForRecv(pkt.first_pkt_seq_num,pkt.last_pkt_seq_num); _recv_buf->dropForRecv(pkt.first_pkt_seq_num,pkt.last_pkt_seq_num);
} }
void SrtTransport::handleUserDefinedType(uint8_t *buf, int len, struct sockaddr_storage *addr){ void SrtTransport::handleUserDefinedType(uint8_t *buf, int len, struct sockaddr_storage *addr){
@ -315,10 +343,6 @@ void SrtTransport::handlePeerError(uint8_t *buf, int len, struct sockaddr_storag
} }
void SrtTransport::sendACKPacket() { void SrtTransport::sendACKPacket() {
if(_last_ack_pkt_seq_num == _recv_buf->getExpectedSeq()){
return;
}
ACKPacket::Ptr pkt=std::make_shared<ACKPacket>(); ACKPacket::Ptr pkt=std::make_shared<ACKPacket>();
pkt->dst_socket_id = _peer_socket_id; pkt->dst_socket_id = _peer_socket_id;
pkt->timestamp = DurationCountMicroseconds(_now - _start_timestamp); pkt->timestamp = DurationCountMicroseconds(_now - _start_timestamp);
@ -337,9 +361,6 @@ void SrtTransport::sendACKPacket() {
//TraceL<<"send ack "<<pkt->dump(); //TraceL<<"send ack "<<pkt->dump();
} }
void SrtTransport::sendLightACKPacket() { void SrtTransport::sendLightACKPacket() {
if(_last_ack_pkt_seq_num == _recv_buf->getExpectedSeq()){
return;
}
ACKPacket::Ptr pkt=std::make_shared<ACKPacket>(); ACKPacket::Ptr pkt=std::make_shared<ACKPacket>();
pkt->dst_socket_id = _peer_socket_id; pkt->dst_socket_id = _peer_socket_id;
@ -367,7 +388,7 @@ void SrtTransport::sendNAKPacket(std::list<PacketQueue::LostPair>& lost_list){
pkt->storeToData(); pkt->storeToData();
TraceL<<"send NAK "<<pkt->dump(); //TraceL<<"send NAK "<<pkt->dump();
sendControlPacket(pkt,true); sendControlPacket(pkt,true);
} }
@ -382,6 +403,8 @@ void SrtTransport::handleDataPacket(uint8_t *buf, int len, struct sockaddr_stora
DataPacket::Ptr pkt = std::make_shared<DataPacket>(); DataPacket::Ptr pkt = std::make_shared<DataPacket>();
pkt->loadFromData(buf,len); pkt->loadFromData(buf,len);
pkt->get_ts = _now;
//TraceL<<" seq="<< pkt->packet_seq_number<<" ts="<<pkt->timestamp<<" size="<<pkt->payloadSize()<<\ //TraceL<<" seq="<< pkt->packet_seq_number<<" ts="<<pkt->timestamp<<" size="<<pkt->payloadSize()<<\
//" PP="<<(int)pkt->PP<<" O="<<(int)pkt->O<<" kK="<<(int)pkt->KK<<" R="<<(int)pkt->R; //" PP="<<(int)pkt->PP<<" O="<<(int)pkt->O<<" kK="<<(int)pkt->KK<<" R="<<(int)pkt->R;
#if 1 #if 1
@ -399,11 +422,14 @@ void SrtTransport::handleDataPacket(uint8_t *buf, int len, struct sockaddr_stora
auto list = _recv_buf->tryGetPacket(); auto list = _recv_buf->tryGetPacket();
for(auto data : list){ for(auto data : list){
onSRTData(std::move(data),addr); onSRTData(std::move(data));
} }
auto nak_interval = (_rtt+_rtt_variance*4)/2; auto nak_interval = (_rtt+_rtt_variance*4)/2;
if(_nak_ticker.elapsedTime(_now)>20*1000 && _nak_ticker.elapsedTime(_now)>nak_interval){ if(nak_interval >= 20*1000){
nak_interval = 20*1000;
}
if(_nak_ticker.elapsedTime(_now)>nak_interval || list.empty()){
auto lost = _recv_buf->getLostSeq(); auto lost = _recv_buf->getLostSeq();
if(!lost.empty()){ if(!lost.empty()){
sendNAKPacket(lost); sendNAKPacket(lost);
@ -427,6 +453,8 @@ void SrtTransport::handleDataPacket(uint8_t *buf, int len, struct sockaddr_stora
_light_ack_pkt_count = 0; _light_ack_pkt_count = 0;
} }
_light_ack_pkt_count++; _light_ack_pkt_count++;
bufCheckInterval();
} }
void SrtTransport::sendDataPacket(DataPacket::Ptr pkt,char* buf,int len, bool flush) { void SrtTransport::sendDataPacket(DataPacket::Ptr pkt,char* buf,int len, bool flush) {

View File

@ -8,6 +8,7 @@
#include "Network/Session.h" #include "Network/Session.h"
#include "Poller/EventPoller.h" #include "Poller/EventPoller.h"
#include "Poller/Timer.h"
#include "Common.hpp" #include "Common.hpp"
#include "Packet.hpp" #include "Packet.hpp"
@ -45,8 +46,11 @@ public:
void unregisterSelf(); void unregisterSelf();
protected: protected:
virtual void onHandShakeFinished(std::string& streamid,struct sockaddr_storage *addr){}; virtual void onHandShakeFinished(std::string& streamid,struct sockaddr_storage *addr){};
virtual void onSRTData(DataPacket::Ptr pkt,struct sockaddr_storage *addr){}; virtual void onSRTData(DataPacket::Ptr pkt){};
virtual void onShutdown(const SockException &ex); virtual void onShutdown(const SockException &ex);
virtual bool isPusher(){
return true;
};
private: private:
void registerSelfHandshake(); void registerSelfHandshake();
@ -76,6 +80,8 @@ private:
void sendShutDown(); void sendShutDown();
void sendMsgDropReq(uint32_t first ,uint32_t last); void sendMsgDropReq(uint32_t first ,uint32_t last);
void bufCheckInterval();
size_t getPayloadSize(); size_t getPayloadSize();
protected: protected:
void sendDataPacket(DataPacket::Ptr pkt,char* buf,int len,bool flush = false); void sendDataPacket(DataPacket::Ptr pkt,char* buf,int len,bool flush = false);
@ -105,6 +111,7 @@ private:
uint32_t _send_msg_number = 1; uint32_t _send_msg_number = 1;
PacketQueue::Ptr _send_buf; PacketQueue::Ptr _send_buf;
uint32_t _buf_delay = 120;
PacketQueue::Ptr _recv_buf; PacketQueue::Ptr _recv_buf;
uint32_t _rtt = 100*1000; uint32_t _rtt = 100*1000;
uint32_t _rtt_variance =50*1000; uint32_t _rtt_variance =50*1000;

View File

@ -48,13 +48,10 @@ void SrtTransportImp::onHandShakeFinished(std::string &streamid,struct sockaddr_
emitOnPlay(); emitOnPlay();
} }
} }
void SrtTransportImp::onSRTData(DataPacket::Ptr pkt,struct sockaddr_storage *addr) { void SrtTransportImp::onSRTData(DataPacket::Ptr pkt) {
if(!_is_pusher){ if(!_is_pusher){
WarnP(this)<<"this is a player data ignore"; WarnP(this)<<"this is a player data ignore";
return; return;
}
if(!_addr){
_addr.reset(new sockaddr_storage(*((sockaddr_storage *)addr)));
} }
if (_decoder) { if (_decoder) {
_decoder->input(reinterpret_cast<const uint8_t *>(pkt->payloadData()), pkt->payloadSize()); _decoder->input(reinterpret_cast<const uint8_t *>(pkt->payloadData()), pkt->payloadSize());

View File

@ -36,7 +36,7 @@ public:
protected: protected:
///////SrtTransport override/////// ///////SrtTransport override///////
void onHandShakeFinished(std::string& streamid,struct sockaddr_storage *addr) override; void onHandShakeFinished(std::string& streamid,struct sockaddr_storage *addr) override;
void onSRTData(DataPacket::Ptr pkt,struct sockaddr_storage *addr) override; void onSRTData(DataPacket::Ptr pkt) override;
void onShutdown(const SockException &ex) override; void onShutdown(const SockException &ex) override;
void sendPacket(Buffer::Ptr pkt,bool flush = true) override{ void sendPacket(Buffer::Ptr pkt,bool flush = true) override{
@ -44,6 +44,10 @@ protected:
SrtTransport::sendPacket(pkt,flush); SrtTransport::sendPacket(pkt,flush);
}; };
bool isPusher() override{
return _is_pusher;
}
///////MediaSourceEvent override/////// ///////MediaSourceEvent override///////
// 关闭 // 关闭
bool close(mediakit::MediaSource &sender, bool force) override; bool close(mediakit::MediaSource &sender, bool force) override;