From 81b08af0ce9737e7001f5f2df8e2003b3efb0d6f Mon Sep 17 00:00:00 2001 From: xiongziliang <771730766@qq.com> Date: Wed, 7 Feb 2024 23:06:42 +0800 Subject: [PATCH] Ensure thread safety of player and pusher --- src/Player/PlayerBase.cpp | 29 +++++++++++++++++------------ src/Pusher/PusherBase.cpp | 26 ++++++++++++++++---------- 2 files changed, 33 insertions(+), 22 deletions(-) diff --git a/src/Player/PlayerBase.cpp b/src/Player/PlayerBase.cpp index cda87184..abaa1311 100644 --- a/src/Player/PlayerBase.cpp +++ b/src/Player/PlayerBase.cpp @@ -23,11 +23,16 @@ namespace mediakit { PlayerBase::Ptr PlayerBase::createPlayer(const EventPoller::Ptr &in_poller, const string &url_in) { auto poller = in_poller ? in_poller : EventPollerPool::Instance().getPoller(); - static auto releasePlayer = [poller](PlayerBase *ptr) { - poller->async([ptr]() { - onceToken token(nullptr, [&]() { delete ptr; }); - ptr->teardown(); - }); + std::weak_ptr weak_poller = poller; + static auto release_func = [weak_poller](PlayerBase *ptr) { + if (auto poller = weak_poller.lock()) { + poller->async([ptr]() { + onceToken token(nullptr, [&]() { delete ptr; }); + ptr->teardown(); + }); + } else { + delete ptr; + } }; string url = url_in; string prefix = findSubString(url.data(), NULL, "://"); @@ -38,29 +43,29 @@ PlayerBase::Ptr PlayerBase::createPlayer(const EventPoller::Ptr &in_poller, cons } if (strcasecmp("rtsps", prefix.data()) == 0) { - return PlayerBase::Ptr(new TcpClientWithSSL(poller), releasePlayer); + return PlayerBase::Ptr(new TcpClientWithSSL(poller), release_func); } if (strcasecmp("rtsp", prefix.data()) == 0) { - return PlayerBase::Ptr(new RtspPlayerImp(poller), releasePlayer); + return PlayerBase::Ptr(new RtspPlayerImp(poller), release_func); } if (strcasecmp("rtmps", prefix.data()) == 0) { - return PlayerBase::Ptr(new TcpClientWithSSL(poller), releasePlayer); + return PlayerBase::Ptr(new TcpClientWithSSL(poller), release_func); } if (strcasecmp("rtmp", prefix.data()) == 0) { - return PlayerBase::Ptr(new RtmpPlayerImp(poller), releasePlayer); + return PlayerBase::Ptr(new RtmpPlayerImp(poller), release_func); } if ((strcasecmp("http", prefix.data()) == 0 || strcasecmp("https", prefix.data()) == 0)) { if (end_with(url, ".m3u8") || end_with(url_in, ".m3u8")) { - return PlayerBase::Ptr(new HlsPlayerImp(poller), releasePlayer); + return PlayerBase::Ptr(new HlsPlayerImp(poller), release_func); } if (end_with(url, ".ts") || end_with(url_in, ".ts")) { - return PlayerBase::Ptr(new TsPlayerImp(poller), releasePlayer); + return PlayerBase::Ptr(new TsPlayerImp(poller), release_func); } if (end_with(url, ".flv") || end_with(url_in, ".flv")) { - return PlayerBase::Ptr(new FlvPlayerImp(poller), releasePlayer); + return PlayerBase::Ptr(new FlvPlayerImp(poller), release_func); } } diff --git a/src/Pusher/PusherBase.cpp b/src/Pusher/PusherBase.cpp index ac90d841..32ae58e3 100644 --- a/src/Pusher/PusherBase.cpp +++ b/src/Pusher/PusherBase.cpp @@ -17,31 +17,37 @@ using namespace toolkit; namespace mediakit { -PusherBase::Ptr PusherBase::createPusher(const EventPoller::Ptr &poller, +PusherBase::Ptr PusherBase::createPusher(const EventPoller::Ptr &in_poller, const MediaSource::Ptr &src, const std::string & url) { - static auto releasePusher = [](PusherBase *ptr){ - onceToken token(nullptr,[&](){ - delete ptr; - }); - ptr->teardown(); + auto poller = in_poller ? in_poller : EventPollerPool::Instance().getPoller(); + std::weak_ptr weak_poller = poller; + static auto release_func = [weak_poller](PusherBase *ptr) { + if (auto poller = weak_poller.lock()) { + poller->async([ptr]() { + onceToken token(nullptr, [&]() { delete ptr; }); + ptr->teardown(); + }); + } else { + delete ptr; + } }; std::string prefix = findSubString(url.data(), NULL, "://"); if (strcasecmp("rtsps",prefix.data()) == 0) { - return PusherBase::Ptr(new TcpClientWithSSL(poller, std::dynamic_pointer_cast(src)), releasePusher); + return PusherBase::Ptr(new TcpClientWithSSL(poller, std::dynamic_pointer_cast(src)), release_func); } if (strcasecmp("rtsp",prefix.data()) == 0) { - return PusherBase::Ptr(new RtspPusherImp(poller, std::dynamic_pointer_cast(src)), releasePusher); + return PusherBase::Ptr(new RtspPusherImp(poller, std::dynamic_pointer_cast(src)), release_func); } if (strcasecmp("rtmps",prefix.data()) == 0) { - return PusherBase::Ptr(new TcpClientWithSSL(poller, std::dynamic_pointer_cast(src)), releasePusher); + return PusherBase::Ptr(new TcpClientWithSSL(poller, std::dynamic_pointer_cast(src)), release_func); } if (strcasecmp("rtmp",prefix.data()) == 0) { - return PusherBase::Ptr(new RtmpPusherImp(poller, std::dynamic_pointer_cast(src)), releasePusher); + return PusherBase::Ptr(new RtmpPusherImp(poller, std::dynamic_pointer_cast(src)), release_func); } throw std::invalid_argument("not supported push schema:" + url);