From c282775205c856f10d8b9b9091db3a7eefec1630 Mon Sep 17 00:00:00 2001 From: Xiaofeng Wang Date: Mon, 19 Sep 2022 21:00:48 +0800 Subject: [PATCH 1/4] cmake: fix linking asan --- CMakeLists.txt | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CMakeLists.txt b/CMakeLists.txt index b2fd2e04..94c9042c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -308,6 +308,10 @@ endif() if(ENABLE_ASAN) list(APPEND COMPILE_OPTIONS_DEFAULT "-fsanitize=address;-fno-omit-frame-pointer") + # https://github.com/google/sanitizers/wiki/AddressSanitizer#using-addresssanitizer + # > In order to use AddressSanitizer you will need to + # > compile and link your program using clang with the -fsanitize=address switch. + update_cached_list(MK_LINK_LIBRARIES "-fsanitize=address") message(STATUS "已启用 Address Sanitize") endif() From 11dfcf3bdbfd11e0f3691e0839eff880cc3cf7c7 Mon Sep 17 00:00:00 2001 From: Xiaofeng Wang Date: Mon, 19 Sep 2022 21:02:51 +0800 Subject: [PATCH 2/4] cmake: -Wl linking flags --- CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 94c9042c..416ee8b8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -299,7 +299,7 @@ if(ENABLE_FFMPEG) endif() if(ENABLE_MEM_DEBUG) - list(APPEND COMPILE_OPTIONS_DEFAULT + update_cached_list(MK_LINK_LIBRARIES "-Wl,-wrap,free;-Wl,-wrap,malloc;-Wl,-wrap,realloc;-Wl,-wrap,calloc") update_cached_list(MK_COMPILE_DEFINITIONS ENABLE_MEM_DEBUG) message(STATUS "已启用内存调试功能") From 0c882d4d79c32d4da660db43a0621fd807bdf4ce Mon Sep 17 00:00:00 2001 From: Xiaofeng Wang Date: Mon, 19 Sep 2022 21:15:20 +0800 Subject: [PATCH 3/4] cmake: fix include directory for assert.h --- CMakeLists.txt | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CMakeLists.txt b/CMakeLists.txt index 416ee8b8..cd326292 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -417,6 +417,9 @@ endif() # for version.h include_directories(${CMAKE_CURRENT_BINARY_DIR}) +# for assert.h +include_directories(${CMAKE_CURRENT_SOURCE_DIR}/3rdparty) + add_subdirectory(3rdpart) add_subdirectory(src) From ec1942fa81472ba4417322d5df9d68743eac4b4a Mon Sep 17 00:00:00 2001 From: xiongguangjie Date: Tue, 20 Sep 2022 00:39:42 +0800 Subject: [PATCH 4/4] srt connection transfer support --- srt/SrtSession.cpp | 4 ++-- srt/SrtTransport.cpp | 23 ++++++++++++++++++++++- srt/SrtTransport.hpp | 8 ++++++++ srt/SrtTransportImp.cpp | 9 +++++++++ srt/SrtTransportImp.hpp | 1 + 5 files changed, 42 insertions(+), 3 deletions(-) diff --git a/srt/SrtSession.cpp b/srt/SrtSession.cpp index d62cb7cd..b6a8a839 100644 --- a/srt/SrtSession.cpp +++ b/srt/SrtSession.cpp @@ -128,9 +128,9 @@ void SrtSession::onError(const SockException &err) { // 防止互相引用导致不释放 auto transport = std::move(_transport); getPoller()->async( - [transport, err] { + [transport] { //延时减引用,防止使用transport对象时,销毁对象 - transport->onShutdown(err); + //transport->onShutdown(err); }, false); } diff --git a/srt/SrtTransport.cpp b/srt/SrtTransport.cpp index ba688af1..eee69d75 100644 --- a/srt/SrtTransport.cpp +++ b/srt/SrtTransport.cpp @@ -60,7 +60,29 @@ void SrtTransport::switchToOtherTransport(uint8_t *buf, int len, uint32_t socket } } +void SrtTransport::createTimerForCheckAlive(){ + std::weak_ptr weak_self = std::static_pointer_cast(shared_from_this()); + auto timeoutSec = getTimeOutSec(); + _timer = std::make_shared( + timeoutSec/ 2, + [weak_self,timeoutSec]() { + auto strong_self = weak_self.lock(); + if (!strong_self) { + return false; + } + if (strong_self->_alive_ticker.elapsedTime() > timeoutSec * 1000) { + strong_self->onShutdown(SockException(Err_timeout, "接收srt数据超时")); + } + return true; + }, + getPoller()); +} + void SrtTransport::inputSockData(uint8_t *buf, int len, struct sockaddr_storage *addr) { + _alive_ticker.resetTime(); + if(!_timer){ + createTimerForCheckAlive(); + } using srt_control_handler = void (SrtTransport::*)(uint8_t * buf, int len, struct sockaddr_storage *addr); static std::unordered_map s_control_functions; static onceToken token([]() { @@ -173,7 +195,6 @@ void SrtTransport::handleHandshakeInduction(HandshakePacket &pkt, struct sockadd registerSelfHandshake(); sendControlPacket(res, true); - _handleshake_timer = std::make_shared(0.02,[this]()->bool{ sendControlPacket(_handleshake_res, true); return true; diff --git a/srt/SrtTransport.hpp b/srt/SrtTransport.hpp index 9d0c186c..5fc7989f 100644 --- a/srt/SrtTransport.hpp +++ b/srt/SrtTransport.hpp @@ -57,6 +57,7 @@ protected: virtual void sendPacket(Buffer::Ptr pkt, bool flush = true); virtual int getLatencyMul() { return 4; }; virtual int getPktBufSize() { return 8192; }; + virtual float getTimeOutSec(){return 5.0;}; private: void registerSelf(); @@ -88,6 +89,8 @@ private: size_t getPayloadSize(); + void createTimerForCheckAlive(); + protected: void sendDataPacket(DataPacket::Ptr pkt, char *buf, int len, bool flush = false); void sendControlPacket(ControlPacket::Ptr pkt, bool flush = true); @@ -144,6 +147,11 @@ private: Timer::Ptr _handleshake_timer; ResourcePool _packet_pool; + + //检测超时的定时器 + Timer::Ptr _timer; + //刷新计时器 + Ticker _alive_ticker; }; class SrtTransportManager { diff --git a/srt/SrtTransportImp.cpp b/srt/SrtTransportImp.cpp index 8464fb5e..93333413 100644 --- a/srt/SrtTransportImp.cpp +++ b/srt/SrtTransportImp.cpp @@ -340,6 +340,15 @@ int SrtTransportImp::getLatencyMul() { return latencyMul; } +float SrtTransportImp::getTimeOutSec() { + GET_CONFIG(float, timeOutSec, kTimeOutSec); + if (timeOutSec <= 0) { + WarnL << "config srt " << kTimeOutSec << " not vaild"; + return 5.0; + } + return timeOutSec; +} + int SrtTransportImp::getPktBufSize() { // kPktBufSize GET_CONFIG(int, pktBufSize, kPktBufSize); diff --git a/srt/SrtTransportImp.hpp b/srt/SrtTransportImp.hpp index e33156ca..71478605 100644 --- a/srt/SrtTransportImp.hpp +++ b/srt/SrtTransportImp.hpp @@ -37,6 +37,7 @@ protected: ///////SrtTransport override/////// int getLatencyMul() override; int getPktBufSize() override; + float getTimeOutSec() override; void onSRTData(DataPacket::Ptr pkt) override; void onShutdown(const SockException &ex) override; void onHandShakeFinished(std::string &streamid, struct sockaddr_storage *addr) override;