diff --git a/srt/SrtSession.cpp b/srt/SrtSession.cpp index d62cb7cd..b6a8a839 100644 --- a/srt/SrtSession.cpp +++ b/srt/SrtSession.cpp @@ -128,9 +128,9 @@ void SrtSession::onError(const SockException &err) { // 防止互相引用导致不释放 auto transport = std::move(_transport); getPoller()->async( - [transport, err] { + [transport] { //延时减引用,防止使用transport对象时,销毁对象 - transport->onShutdown(err); + //transport->onShutdown(err); }, false); } diff --git a/srt/SrtTransport.cpp b/srt/SrtTransport.cpp index ba688af1..eee69d75 100644 --- a/srt/SrtTransport.cpp +++ b/srt/SrtTransport.cpp @@ -60,7 +60,29 @@ void SrtTransport::switchToOtherTransport(uint8_t *buf, int len, uint32_t socket } } +void SrtTransport::createTimerForCheckAlive(){ + std::weak_ptr weak_self = std::static_pointer_cast(shared_from_this()); + auto timeoutSec = getTimeOutSec(); + _timer = std::make_shared( + timeoutSec/ 2, + [weak_self,timeoutSec]() { + auto strong_self = weak_self.lock(); + if (!strong_self) { + return false; + } + if (strong_self->_alive_ticker.elapsedTime() > timeoutSec * 1000) { + strong_self->onShutdown(SockException(Err_timeout, "接收srt数据超时")); + } + return true; + }, + getPoller()); +} + void SrtTransport::inputSockData(uint8_t *buf, int len, struct sockaddr_storage *addr) { + _alive_ticker.resetTime(); + if(!_timer){ + createTimerForCheckAlive(); + } using srt_control_handler = void (SrtTransport::*)(uint8_t * buf, int len, struct sockaddr_storage *addr); static std::unordered_map s_control_functions; static onceToken token([]() { @@ -173,7 +195,6 @@ void SrtTransport::handleHandshakeInduction(HandshakePacket &pkt, struct sockadd registerSelfHandshake(); sendControlPacket(res, true); - _handleshake_timer = std::make_shared(0.02,[this]()->bool{ sendControlPacket(_handleshake_res, true); return true; diff --git a/srt/SrtTransport.hpp b/srt/SrtTransport.hpp index 9d0c186c..5fc7989f 100644 --- a/srt/SrtTransport.hpp +++ b/srt/SrtTransport.hpp @@ -57,6 +57,7 @@ protected: virtual void sendPacket(Buffer::Ptr pkt, bool flush = true); virtual int getLatencyMul() { return 4; }; virtual int getPktBufSize() { return 8192; }; + virtual float getTimeOutSec(){return 5.0;}; private: void registerSelf(); @@ -88,6 +89,8 @@ private: size_t getPayloadSize(); + void createTimerForCheckAlive(); + protected: void sendDataPacket(DataPacket::Ptr pkt, char *buf, int len, bool flush = false); void sendControlPacket(ControlPacket::Ptr pkt, bool flush = true); @@ -144,6 +147,11 @@ private: Timer::Ptr _handleshake_timer; ResourcePool _packet_pool; + + //检测超时的定时器 + Timer::Ptr _timer; + //刷新计时器 + Ticker _alive_ticker; }; class SrtTransportManager { diff --git a/srt/SrtTransportImp.cpp b/srt/SrtTransportImp.cpp index 8464fb5e..93333413 100644 --- a/srt/SrtTransportImp.cpp +++ b/srt/SrtTransportImp.cpp @@ -340,6 +340,15 @@ int SrtTransportImp::getLatencyMul() { return latencyMul; } +float SrtTransportImp::getTimeOutSec() { + GET_CONFIG(float, timeOutSec, kTimeOutSec); + if (timeOutSec <= 0) { + WarnL << "config srt " << kTimeOutSec << " not vaild"; + return 5.0; + } + return timeOutSec; +} + int SrtTransportImp::getPktBufSize() { // kPktBufSize GET_CONFIG(int, pktBufSize, kPktBufSize); diff --git a/srt/SrtTransportImp.hpp b/srt/SrtTransportImp.hpp index e33156ca..71478605 100644 --- a/srt/SrtTransportImp.hpp +++ b/srt/SrtTransportImp.hpp @@ -37,6 +37,7 @@ protected: ///////SrtTransport override/////// int getLatencyMul() override; int getPktBufSize() override; + float getTimeOutSec() override; void onSRTData(DataPacket::Ptr pkt) override; void onShutdown(const SockException &ex) override; void onHandShakeFinished(std::string &streamid, struct sockaddr_storage *addr) override;