srt handle packet send repeate by timer

This commit is contained in:
xiongguangjie 2022-08-27 15:06:03 +08:00
parent 602c8e068b
commit f9f6fd136a
2 changed files with 37 additions and 41 deletions

View File

@ -80,6 +80,9 @@ void SrtTransport::inputSockData(uint8_t *buf, int len, struct sockaddr_storage
if (DataPacket::isDataPacket(buf, len)) { if (DataPacket::isDataPacket(buf, len)) {
uint32_t socketId = DataPacket::getSocketID(buf, len); uint32_t socketId = DataPacket::getSocketID(buf, len);
if (socketId == _socket_id) { if (socketId == _socket_id) {
if(_handleshake_timer){
_handleshake_timer.reset();
}
_pkt_recv_rate_context->inputPacket(_now); _pkt_recv_rate_context->inputPacket(_now);
_estimated_link_capacity_context->inputPacket(_now); _estimated_link_capacity_context->inputPacket(_now);
_recv_rate_context->inputPacket(_now, len); _recv_rate_context->inputPacket(_now, len);
@ -116,39 +119,27 @@ void SrtTransport::inputSockData(uint8_t *buf, int len, struct sockaddr_storage
} }
} }
} }
bool SrtTransport::isSameCon(HandshakePacket &pkt) {
if (_handleshake_res) {
if (_handleshake_res->handshake_type == HandshakePacket::HS_TYPE_INDUCTION) {
if (pkt.srt_socket_id == _handleshake_res->dst_socket_id
&& pkt.initial_packet_sequence_number == _init_seq_number) {
return true;
}
// TraceL << getIdentifier() << " new client from same udp connection";
return false;
} else if (_handleshake_res->handshake_type == HandshakePacket::HS_TYPE_CONCLUSION) {
if (pkt.srt_socket_id == _handleshake_res->dst_socket_id && _sync_cookie == pkt.syn_cookie) {
return true;
}
// TraceL << getIdentifier() << " new client from new same udp connection ";
return false;
} else {
WarnL << "not reach this";
}
return false;
}
return true;
}
void SrtTransport::handleHandshakeInduction(HandshakePacket &pkt, struct sockaddr_storage *addr) { void SrtTransport::handleHandshakeInduction(HandshakePacket &pkt, struct sockaddr_storage *addr) {
// Induction Phase // Induction Phase
if (_handleshake_res) { if (_handleshake_res) {
if(isSameCon(pkt)){ if(_handleshake_res->handshake_type == HandshakePacket::HS_TYPE_INDUCTION){
TraceL << getIdentifier() <<" Induction repeate "<<SockUtil::inet_ntoa((struct sockaddr *)addr) << ":" << SockUtil::inet_port((struct sockaddr *)addr); if(pkt.srt_socket_id == _handleshake_res->dst_socket_id){
for(int i=0;i<3;++i){ TraceL << getIdentifier() <<" Induction repeate "<<SockUtil::inet_ntoa((struct sockaddr *)addr) << ":" << SockUtil::inet_port((struct sockaddr *)addr);
sendControlPacket(_handleshake_res, true); sendControlPacket(_handleshake_res, true);
}else{
TraceL << getIdentifier() <<" new connection fron client "<<SockUtil::inet_ntoa((struct sockaddr *)addr) << ":" << SockUtil::inet_port((struct sockaddr *)addr);
onShutdown(SockException(Err_other, "client new connection"));
} }
return;
}else if(_handleshake_res->handshake_type == HandshakePacket::HS_TYPE_CONCLUSION){
if(_handleshake_res->dst_socket_id != pkt.srt_socket_id){
TraceL << getIdentifier() <<" new connection fron client "<<SockUtil::inet_ntoa((struct sockaddr *)addr) << ":" << SockUtil::inet_port((struct sockaddr *)addr);
onShutdown(SockException(Err_other, "client new connection"));
}
return;
}else{ }else{
TraceL << getIdentifier() <<" new connection fron client "<<SockUtil::inet_ntoa((struct sockaddr *)addr) << ":" << SockUtil::inet_port((struct sockaddr *)addr); WarnL<<"not reach this";
onShutdown(SockException(Err_other, "client new connection"));
} }
return; return;
}else{ }else{
@ -181,9 +172,12 @@ void SrtTransport::handleHandshakeInduction(HandshakePacket &pkt, struct sockadd
res->storeToData(); res->storeToData();
registerSelfHandshake(); registerSelfHandshake();
for(int i=0;i<3;++i){ sendControlPacket(res, true);
sendControlPacket(res, true);
} _handleshake_timer = std::make_shared<Timer>(0.02,[this]()->bool{
sendControlPacket(_handleshake_res, true);
return true;
},getPoller());
} }
void SrtTransport::handleHandshakeConclusion(HandshakePacket &pkt, struct sockaddr_storage *addr) { void SrtTransport::handleHandshakeConclusion(HandshakePacket &pkt, struct sockaddr_storage *addr) {
@ -220,7 +214,7 @@ void SrtTransport::handleHandshakeConclusion(HandshakePacket &pkt, struct sockad
srt_flag = req->srt_flag; srt_flag = req->srt_flag;
delay = delay <= req->recv_tsbpd_delay ? req->recv_tsbpd_delay : delay; delay = delay <= req->recv_tsbpd_delay ? req->recv_tsbpd_delay : delay;
} }
TraceL << getIdentifier() << " CONCLUSION Phase "; TraceL << getIdentifier() << " CONCLUSION Phase from"<<SockUtil::inet_ntoa((struct sockaddr *)addr) << ":" << SockUtil::inet_port((struct sockaddr *)addr);;
HandshakePacket::Ptr res = std::make_shared<HandshakePacket>(); HandshakePacket::Ptr res = std::make_shared<HandshakePacket>();
res->dst_socket_id = _peer_socket_id; res->dst_socket_id = _peer_socket_id;
res->timestamp = DurationCountMicroseconds(_now - _start_timestamp); res->timestamp = DurationCountMicroseconds(_now - _start_timestamp);
@ -244,9 +238,7 @@ void SrtTransport::handleHandshakeConclusion(HandshakePacket &pkt, struct sockad
_handleshake_res = res; _handleshake_res = res;
unregisterSelfHandshake(); unregisterSelfHandshake();
registerSelf(); registerSelf();
for(int i=0;i<3;++i){ sendControlPacket(res, true);
sendControlPacket(res, true);
}
TraceL << " buf size = " << res->max_flow_window_size << " init seq =" << _init_seq_number TraceL << " buf size = " << res->max_flow_window_size << " init seq =" << _init_seq_number
<< " latency=" << delay; << " latency=" << delay;
_recv_buf = std::make_shared<PacketRecvQueue>(getPktBufSize(), _init_seq_number, delay * 1e3,srt_flag); _recv_buf = std::make_shared<PacketRecvQueue>(getPktBufSize(), _init_seq_number, delay * 1e3,srt_flag);
@ -255,15 +247,19 @@ void SrtTransport::handleHandshakeConclusion(HandshakePacket &pkt, struct sockad
_buf_delay = delay; _buf_delay = delay;
onHandShakeFinished(_stream_id, addr); onHandShakeFinished(_stream_id, addr);
} else { } else {
if(isSameCon(pkt)){ if(_handleshake_res->handshake_type == HandshakePacket::HS_TYPE_CONCLUSION){
TraceL << getIdentifier() <<" CONCLUSION repeate "<<SockUtil::inet_ntoa((struct sockaddr *)addr) << ":" << SockUtil::inet_port((struct sockaddr *)addr); if(_handleshake_res->dst_socket_id != pkt.srt_socket_id){
for(int i=0;i<3;++i){ TraceL << getIdentifier() <<" new connection fron client "<<SockUtil::inet_ntoa((struct sockaddr *)addr) << ":" << SockUtil::inet_port((struct sockaddr *)addr);
onShutdown(SockException(Err_other, "client new connection"));
}else{
TraceL << getIdentifier() <<" CONCLUSION repeate "<<SockUtil::inet_ntoa((struct sockaddr *)addr) << ":" << SockUtil::inet_port((struct sockaddr *)addr);
sendControlPacket(_handleshake_res, true); sendControlPacket(_handleshake_res, true);
} }
}else{ }else{
TraceL << getIdentifier() <<" new connection fron client "<<SockUtil::inet_ntoa((struct sockaddr *)addr) << ":" << SockUtil::inet_port((struct sockaddr *)addr); WarnL<<"not reach this";
onShutdown(SockException(Err_other, "client new connection"));
} }
return;
} }
_last_ack_pkt_seq_num = _init_seq_number; _last_ack_pkt_seq_num = _init_seq_number;

View File

@ -88,8 +88,6 @@ private:
size_t getPayloadSize(); size_t getPayloadSize();
bool isSameCon(HandshakePacket &pkt);
protected: protected:
void sendDataPacket(DataPacket::Ptr pkt, char *buf, int len, bool flush = false); void sendDataPacket(DataPacket::Ptr pkt, char *buf, int len, bool flush = false);
void sendControlPacket(ControlPacket::Ptr pkt, bool flush = true); void sendControlPacket(ControlPacket::Ptr pkt, bool flush = true);
@ -143,6 +141,8 @@ private:
// 保持发送的握手消息,防止丢失重发 // 保持发送的握手消息,防止丢失重发
HandshakePacket::Ptr _handleshake_res; HandshakePacket::Ptr _handleshake_res;
Timer::Ptr _handleshake_timer;
ResourcePool<BufferRaw> _packet_pool; ResourcePool<BufferRaw> _packet_pool;
}; };