Merge branch 'master' of github.com:ZLMediaKit/ZLMediaKit

This commit is contained in:
ziyue 2022-09-20 16:13:14 +08:00
commit a226794cd2
6 changed files with 50 additions and 4 deletions

View File

@ -299,7 +299,7 @@ if(ENABLE_FFMPEG)
endif() endif()
if(ENABLE_MEM_DEBUG) if(ENABLE_MEM_DEBUG)
list(APPEND COMPILE_OPTIONS_DEFAULT update_cached_list(MK_LINK_LIBRARIES
"-Wl,-wrap,free;-Wl,-wrap,malloc;-Wl,-wrap,realloc;-Wl,-wrap,calloc") "-Wl,-wrap,free;-Wl,-wrap,malloc;-Wl,-wrap,realloc;-Wl,-wrap,calloc")
update_cached_list(MK_COMPILE_DEFINITIONS ENABLE_MEM_DEBUG) update_cached_list(MK_COMPILE_DEFINITIONS ENABLE_MEM_DEBUG)
message(STATUS "已启用内存调试功能") message(STATUS "已启用内存调试功能")
@ -308,6 +308,10 @@ endif()
if(ENABLE_ASAN) if(ENABLE_ASAN)
list(APPEND COMPILE_OPTIONS_DEFAULT list(APPEND COMPILE_OPTIONS_DEFAULT
"-fsanitize=address;-fno-omit-frame-pointer") "-fsanitize=address;-fno-omit-frame-pointer")
# https://github.com/google/sanitizers/wiki/AddressSanitizer#using-addresssanitizer
# > In order to use AddressSanitizer you will need to
# > compile and link your program using clang with the -fsanitize=address switch.
update_cached_list(MK_LINK_LIBRARIES "-fsanitize=address")
message(STATUS "已启用 Address Sanitize") message(STATUS "已启用 Address Sanitize")
endif() endif()
@ -413,6 +417,9 @@ endif()
# for version.h # for version.h
include_directories(${CMAKE_CURRENT_BINARY_DIR}) include_directories(${CMAKE_CURRENT_BINARY_DIR})
# for assert.h
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/3rdparty)
add_subdirectory(3rdpart) add_subdirectory(3rdpart)
add_subdirectory(src) add_subdirectory(src)

View File

@ -128,9 +128,9 @@ void SrtSession::onError(const SockException &err) {
// 防止互相引用导致不释放 // 防止互相引用导致不释放
auto transport = std::move(_transport); auto transport = std::move(_transport);
getPoller()->async( getPoller()->async(
[transport, err] { [transport] {
//延时减引用防止使用transport对象时销毁对象 //延时减引用防止使用transport对象时销毁对象
transport->onShutdown(err); //transport->onShutdown(err);
}, },
false); false);
} }

View File

@ -60,7 +60,29 @@ void SrtTransport::switchToOtherTransport(uint8_t *buf, int len, uint32_t socket
} }
} }
void SrtTransport::createTimerForCheckAlive(){
std::weak_ptr<SrtTransport> weak_self = std::static_pointer_cast<SrtTransport>(shared_from_this());
auto timeoutSec = getTimeOutSec();
_timer = std::make_shared<Timer>(
timeoutSec/ 2,
[weak_self,timeoutSec]() {
auto strong_self = weak_self.lock();
if (!strong_self) {
return false;
}
if (strong_self->_alive_ticker.elapsedTime() > timeoutSec * 1000) {
strong_self->onShutdown(SockException(Err_timeout, "接收srt数据超时"));
}
return true;
},
getPoller());
}
void SrtTransport::inputSockData(uint8_t *buf, int len, struct sockaddr_storage *addr) { void SrtTransport::inputSockData(uint8_t *buf, int len, struct sockaddr_storage *addr) {
_alive_ticker.resetTime();
if(!_timer){
createTimerForCheckAlive();
}
using srt_control_handler = void (SrtTransport::*)(uint8_t * buf, int len, struct sockaddr_storage *addr); using srt_control_handler = void (SrtTransport::*)(uint8_t * buf, int len, struct sockaddr_storage *addr);
static std::unordered_map<uint16_t, srt_control_handler> s_control_functions; static std::unordered_map<uint16_t, srt_control_handler> s_control_functions;
static onceToken token([]() { static onceToken token([]() {
@ -173,7 +195,6 @@ void SrtTransport::handleHandshakeInduction(HandshakePacket &pkt, struct sockadd
registerSelfHandshake(); registerSelfHandshake();
sendControlPacket(res, true); sendControlPacket(res, true);
_handleshake_timer = std::make_shared<Timer>(0.02,[this]()->bool{ _handleshake_timer = std::make_shared<Timer>(0.02,[this]()->bool{
sendControlPacket(_handleshake_res, true); sendControlPacket(_handleshake_res, true);
return true; return true;

View File

@ -57,6 +57,7 @@ protected:
virtual void sendPacket(Buffer::Ptr pkt, bool flush = true); virtual void sendPacket(Buffer::Ptr pkt, bool flush = true);
virtual int getLatencyMul() { return 4; }; virtual int getLatencyMul() { return 4; };
virtual int getPktBufSize() { return 8192; }; virtual int getPktBufSize() { return 8192; };
virtual float getTimeOutSec(){return 5.0;};
private: private:
void registerSelf(); void registerSelf();
@ -88,6 +89,8 @@ private:
size_t getPayloadSize(); size_t getPayloadSize();
void createTimerForCheckAlive();
protected: protected:
void sendDataPacket(DataPacket::Ptr pkt, char *buf, int len, bool flush = false); void sendDataPacket(DataPacket::Ptr pkt, char *buf, int len, bool flush = false);
void sendControlPacket(ControlPacket::Ptr pkt, bool flush = true); void sendControlPacket(ControlPacket::Ptr pkt, bool flush = true);
@ -144,6 +147,11 @@ private:
Timer::Ptr _handleshake_timer; Timer::Ptr _handleshake_timer;
ResourcePool<BufferRaw> _packet_pool; ResourcePool<BufferRaw> _packet_pool;
//检测超时的定时器
Timer::Ptr _timer;
//刷新计时器
Ticker _alive_ticker;
}; };
class SrtTransportManager { class SrtTransportManager {

View File

@ -340,6 +340,15 @@ int SrtTransportImp::getLatencyMul() {
return latencyMul; return latencyMul;
} }
float SrtTransportImp::getTimeOutSec() {
GET_CONFIG(float, timeOutSec, kTimeOutSec);
if (timeOutSec <= 0) {
WarnL << "config srt " << kTimeOutSec << " not vaild";
return 5.0;
}
return timeOutSec;
}
int SrtTransportImp::getPktBufSize() { int SrtTransportImp::getPktBufSize() {
// kPktBufSize // kPktBufSize
GET_CONFIG(int, pktBufSize, kPktBufSize); GET_CONFIG(int, pktBufSize, kPktBufSize);

View File

@ -37,6 +37,7 @@ protected:
///////SrtTransport override/////// ///////SrtTransport override///////
int getLatencyMul() override; int getLatencyMul() override;
int getPktBufSize() override; int getPktBufSize() override;
float getTimeOutSec() override;
void onSRTData(DataPacket::Ptr pkt) override; void onSRTData(DataPacket::Ptr pkt) override;
void onShutdown(const SockException &ex) override; void onShutdown(const SockException &ex) override;
void onHandShakeFinished(std::string &streamid, struct sockaddr_storage *addr) override; void onHandShakeFinished(std::string &streamid, struct sockaddr_storage *addr) override;