From 77d2df16955f55592bef6e3bcece21babe2827b6 Mon Sep 17 00:00:00 2001 From: xiongziliang <771730766@qq.com> Date: Sun, 9 Jun 2024 10:52:10 +0800 Subject: [PATCH] =?UTF-8?q?=E7=A7=BB=E9=99=A4RtpSelector=E7=9B=B8=E5=85=B3?= =?UTF-8?q?=E9=80=BB=E8=BE=91=EF=BC=8C=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= =?UTF-8?q?=E7=BB=93=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- 3rdpart/ZLToolKit | 2 +- api/source/mk_rtp_server.cpp | 2 +- server/WebApi.cpp | 22 ++-- src/Common/MediaSource.cpp | 16 ++- src/Common/MediaSource.h | 12 +- src/Common/MultiMediaSourceMuxer.cpp | 4 +- src/Common/MultiMediaSourceMuxer.h | 2 +- src/Rtp/RtpProcess.cpp | 51 ++++++-- src/Rtp/RtpProcess.h | 25 ++-- src/Rtp/RtpSelector.cpp | 168 --------------------------- src/Rtp/RtpSelector.h | 89 -------------- src/Rtp/RtpServer.cpp | 94 ++++++--------- src/Rtp/RtpServer.h | 2 +- src/Rtp/RtpSession.cpp | 82 +++++-------- src/Rtp/RtpSession.h | 7 +- src/Rtsp/Rtsp.cpp | 9 ++ src/Rtsp/Rtsp.h | 1 + tests/test_rtp.cpp | 5 +- 18 files changed, 174 insertions(+), 419 deletions(-) delete mode 100644 src/Rtp/RtpSelector.cpp delete mode 100644 src/Rtp/RtpSelector.h diff --git a/3rdpart/ZLToolKit b/3rdpart/ZLToolKit index 1e1a9907..5144e2aa 160000 --- a/3rdpart/ZLToolKit +++ b/3rdpart/ZLToolKit @@ -1 +1 @@ -Subproject commit 1e1a990783c6c09452419c0aaa6d72ce02d0202b +Subproject commit 5144e2aa521df6d473308bfb31172054772a634f diff --git a/api/source/mk_rtp_server.cpp b/api/source/mk_rtp_server.cpp index 6d2228d2..487e09a8 100644 --- a/api/source/mk_rtp_server.cpp +++ b/api/source/mk_rtp_server.cpp @@ -56,7 +56,7 @@ API_EXPORT void API_CALL mk_rtp_server_set_on_detach2(mk_rtp_server ctx, on_mk_r RtpServer::Ptr *server = (RtpServer::Ptr *) ctx; if (cb) { std::shared_ptr ptr(user_data, user_data_free ? user_data_free : [](void *) {}); - (*server)->setOnDetach([cb, ptr]() { + (*server)->setOnDetach([cb, ptr](const SockException &ex) { cb(ptr.get()); }); } else { diff --git a/server/WebApi.cpp b/server/WebApi.cpp index bfd626cc..ba027ac4 100755 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -45,7 +45,7 @@ #include "Http/HttpRequester.h" #include "Player/PlayerProxy.h" #include "Pusher/PusherProxy.h" -#include "Rtp/RtpSelector.h" +#include "Rtp/RtpProcess.h" #include "Record/MP4Reader.h" #if defined(ENABLE_RTPPROXY) @@ -485,7 +485,7 @@ uint16_t openRtpServer(uint16_t local_port, const string &stream_id, int tcp_mod auto server = s_rtp_server.makeWithAction(stream_id, [&](RtpServer::Ptr server) { server->start(local_port, stream_id, (RtpServer::TcpMode)tcp_mode, local_ip.c_str(), re_use_port, ssrc, only_track, multiplex); }); - server->setOnDetach([stream_id]() { + server->setOnDetach([stream_id](const SockException &ex) { //设置rtp超时移除事件 s_rtp_server.erase(stream_id); }); @@ -1198,8 +1198,8 @@ void installWebApi() { api_regist("/index/api/getRtpInfo",[](API_ARGS_MAP){ CHECK_SECRET(); CHECK_ARGS("stream_id"); - - auto process = RtpSelector::Instance().getProcess(allArgs["stream_id"], false); + auto src = MediaSource::find(DEFAULT_VHOST, kRtpAppName, allArgs["stream_id"]); + auto process = src ? src->getRtpProcess() : nullptr; if (!process) { val["exist"] = false; return; @@ -1438,9 +1438,10 @@ void installWebApi() { CHECK_SECRET(); CHECK_ARGS("stream_id"); //只是暂停流的检查,流媒体服务器做为流负载服务,收流就转发,RTSP/RTMP有自己暂停协议 - auto rtp_process = RtpSelector::Instance().getProcess(allArgs["stream_id"], false); - if (rtp_process) { - rtp_process->setStopCheckRtp(true); + auto src = MediaSource::find(DEFAULT_VHOST, kRtpAppName, allArgs["stream_id"]); + auto process = src ? src->getRtpProcess() : nullptr; + if (process) { + process->setStopCheckRtp(true); } else { val["code"] = API::NotFound; } @@ -1449,9 +1450,10 @@ void installWebApi() { api_regist("/index/api/resumeRtpCheck", [](API_ARGS_MAP) { CHECK_SECRET(); CHECK_ARGS("stream_id"); - auto rtp_process = RtpSelector::Instance().getProcess(allArgs["stream_id"], false); - if (rtp_process) { - rtp_process->setStopCheckRtp(false); + auto src = MediaSource::find(DEFAULT_VHOST, kRtpAppName, allArgs["stream_id"]); + auto process = src ? src->getRtpProcess() : nullptr; + if (process) { + process->setStopCheckRtp(false); } else { val["code"] = API::NotFound; } diff --git a/src/Common/MediaSource.cpp b/src/Common/MediaSource.cpp index 039b156a..1ed99e46 100644 --- a/src/Common/MediaSource.cpp +++ b/src/Common/MediaSource.cpp @@ -271,9 +271,14 @@ toolkit::EventPoller::Ptr MediaSource::getOwnerPoller() { throw std::runtime_error(toolkit::demangle(typeid(*this).name()) + "::getOwnerPoller failed: " + getUrl()); } -std::shared_ptr MediaSource::getMuxer() { +std::shared_ptr MediaSource::getMuxer() const { auto listener = _listener.lock(); - return listener ? listener->getMuxer(*this) : nullptr; + return listener ? listener->getMuxer(const_cast(*this)) : nullptr; +} + +std::shared_ptr MediaSource::getRtpProcess() const { + auto listener = _listener.lock(); + return listener ? listener->getRtpProcess(const_cast(*this)) : nullptr; } void MediaSource::onReaderChanged(int size) { @@ -803,11 +808,16 @@ toolkit::EventPoller::Ptr MediaSourceEventInterceptor::getOwnerPoller(MediaSourc throw std::runtime_error(toolkit::demangle(typeid(*this).name()) + "::getOwnerPoller failed"); } -std::shared_ptr MediaSourceEventInterceptor::getMuxer(MediaSource &sender) { +std::shared_ptr MediaSourceEventInterceptor::getMuxer(MediaSource &sender) const { auto listener = _listener.lock(); return listener ? listener->getMuxer(sender) : nullptr; } +std::shared_ptr MediaSourceEventInterceptor::getRtpProcess(MediaSource &sender) const { + auto listener = _listener.lock(); + return listener ? listener->getRtpProcess(sender) : nullptr; +} + bool MediaSourceEventInterceptor::setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path, size_t max_second) { auto listener = _listener.lock(); if (!listener) { diff --git a/src/Common/MediaSource.h b/src/Common/MediaSource.h index 49f16dd4..0cc3f88c 100644 --- a/src/Common/MediaSource.h +++ b/src/Common/MediaSource.h @@ -41,6 +41,7 @@ enum class MediaOriginType : uint8_t { std::string getOriginTypeString(MediaOriginType type); class MediaSource; +class RtpProcess; class MultiMediaSourceMuxer; class MediaSourceEvent { public: @@ -88,7 +89,9 @@ public: // 获取所有track相关信息 virtual std::vector getMediaTracks(MediaSource &sender, bool trackReady = true) const { return std::vector(); }; // 获取MultiMediaSourceMuxer对象 - virtual std::shared_ptr getMuxer(MediaSource &sender) { return nullptr; } + virtual std::shared_ptr getMuxer(MediaSource &sender) const { return nullptr; } + // 获取RtpProcess对象 + virtual std::shared_ptr getRtpProcess(MediaSource &sender) const { return nullptr; } class SendRtpArgs { public: @@ -278,7 +281,8 @@ public: bool stopSendRtp(MediaSource &sender, const std::string &ssrc) override; float getLossRate(MediaSource &sender, TrackType type) override; toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) override; - std::shared_ptr getMuxer(MediaSource &sender) override; + std::shared_ptr getMuxer(MediaSource &sender) const override; + std::shared_ptr getRtpProcess(MediaSource &sender) const override; private: std::weak_ptr _listener; @@ -395,7 +399,9 @@ public: // 获取所在线程 toolkit::EventPoller::Ptr getOwnerPoller(); // 获取MultiMediaSourceMuxer对象 - std::shared_ptr getMuxer(); + std::shared_ptr getMuxer() const; + // 获取RtpProcess对象 + std::shared_ptr getRtpProcess() const; ////////////////static方法,查找或生成MediaSource//////////////// diff --git a/src/Common/MultiMediaSourceMuxer.cpp b/src/Common/MultiMediaSourceMuxer.cpp index fb00a668..cf4c34b3 100644 --- a/src/Common/MultiMediaSourceMuxer.cpp +++ b/src/Common/MultiMediaSourceMuxer.cpp @@ -466,8 +466,8 @@ EventPoller::Ptr MultiMediaSourceMuxer::getOwnerPoller(MediaSource &sender) { } } -std::shared_ptr MultiMediaSourceMuxer::getMuxer(MediaSource &sender) { - return shared_from_this(); +std::shared_ptr MultiMediaSourceMuxer::getMuxer(MediaSource &sender) const { + return const_cast(this)->shared_from_this(); } bool MultiMediaSourceMuxer::onTrackReady(const Track::Ptr &track) { diff --git a/src/Common/MultiMediaSourceMuxer.h b/src/Common/MultiMediaSourceMuxer.h index a9775c8e..5e482727 100644 --- a/src/Common/MultiMediaSourceMuxer.h +++ b/src/Common/MultiMediaSourceMuxer.h @@ -127,7 +127,7 @@ public: /** * 获取本对象 */ - std::shared_ptr getMuxer(MediaSource &sender) override; + std::shared_ptr getMuxer(MediaSource &sender) const override; const ProtocolOption &getOption() const; const MediaTuple &getMediaTuple() const; diff --git a/src/Rtp/RtpProcess.cpp b/src/Rtp/RtpProcess.cpp index b0161d91..d5632c51 100644 --- a/src/Rtp/RtpProcess.cpp +++ b/src/Rtp/RtpProcess.cpp @@ -11,26 +11,29 @@ #if defined(ENABLE_RTPPROXY) #include "GB28181Process.h" #include "RtpProcess.h" -#include "RtpSelector.h" -#include "Http/HttpTSPlayer.h" #include "Util/File.h" #include "Common/config.h" using namespace std; using namespace toolkit; -static constexpr char kRtpAppName[] = "rtp"; //在创建_muxer对象前(也就是推流鉴权成功前),需要先缓存frame,这样可以防止丢包,提高体验 //但是同时需要控制缓冲长度,防止内存溢出。200帧数据,大概有10秒数据,应该足矣等待鉴权hook返回 static constexpr size_t kMaxCachedFrame = 200; namespace mediakit { -RtpProcess::RtpProcess(const string &stream_id) { +RtpProcess::Ptr RtpProcess::createProcess(std::string stream_id) { + RtpProcess::Ptr ret(new RtpProcess(std::move(stream_id))); + ret->createTimer(); + return ret; +} + +RtpProcess::RtpProcess(string stream_id) { _media_info.schema = kRtpAppName; _media_info.vhost = DEFAULT_VHOST; _media_info.app = kRtpAppName; - _media_info.stream = stream_id; + _media_info.stream = std::move(stream_id); GET_CONFIG(string, dump_dir, RtpProxy::kDumpDir); { @@ -75,6 +78,25 @@ RtpProcess::~RtpProcess() { } } +void RtpProcess::onManager() { + if (!alive()) { + onDetach(SockException(Err_timeout, "RtpProcess timeout")); + } +} + +void RtpProcess::createTimer() { + //创建超时管理定时器 + weak_ptr weakSelf = shared_from_this(); + _timer = std::make_shared(3.0f, [weakSelf] { + auto strongSelf = weakSelf.lock(); + if (!strongSelf) { + return false; + } + strongSelf->onManager(); + return true; + }, EventPollerPool::Instance().getPoller()); +} + bool RtpProcess::inputRtp(bool is_udp, const Socket::Ptr &sock, const char *data, size_t len, const struct sockaddr *addr, uint64_t *dts_out) { if (!isRtp(data, len)) { WarnP(this) << "Not rtp packet"; @@ -203,13 +225,14 @@ void RtpProcess::setOnlyTrack(OnlyTrack only_track) { _only_track = only_track; } -void RtpProcess::onDetach() { +void RtpProcess::onDetach(const SockException &ex) { if (_on_detach) { - _on_detach(); + WarnL << ex << ", stream_id: " << getIdentifier(); + _on_detach(ex); } } -void RtpProcess::setOnDetach(function cb) { +void RtpProcess::setOnDetach(onDetachCB cb) { _on_detach = std::move(cb); } @@ -256,9 +279,6 @@ void RtpProcess::emitOnPublish() { } if (err.empty()) { strong_self->_muxer = std::make_shared(strong_self->_media_info, 0.0f, option); - if (!option.stream_replace.empty()) { - RtpSelector::Instance().addStreamReplace(strong_self->_media_info.stream, option.stream_replace); - } switch (strong_self->_only_track) { case kOnlyAudio: strong_self->_muxer->setOnlyAudio(); break; case kOnlyVideo: strong_self->_muxer->enableAudio(false); break; @@ -294,6 +314,15 @@ std::shared_ptr RtpProcess::getOriginSock(MediaSource &sender) const { return const_cast(this)->shared_from_this(); } +RtpProcess::Ptr RtpProcess::getRtpProcess(mediakit::MediaSource &sender) const { + return const_cast(this)->shared_from_this(); +} + +bool RtpProcess::close(mediakit::MediaSource &sender) { + onDetach(SockException(Err_shutdown, "close media")); + return true; +} + toolkit::EventPoller::Ptr RtpProcess::getOwnerPoller(MediaSource &sender) { if (_sock) { return _sock->getPoller(); diff --git a/src/Rtp/RtpProcess.h b/src/Rtp/RtpProcess.h index b680936c..e2e44826 100644 --- a/src/Rtp/RtpProcess.h +++ b/src/Rtp/RtpProcess.h @@ -18,11 +18,14 @@ namespace mediakit { +static constexpr char kRtpAppName[] = "rtp"; + class RtpProcess final : public RtcpContextForRecv, public toolkit::SockInfo, public MediaSinkInterface, public MediaSourceEventInterceptor, public std::enable_shared_from_this{ public: using Ptr = std::shared_ptr; - friend class RtpProcessHelper; - RtpProcess(const std::string &stream_id); + using onDetachCB = std::function; + + static Ptr createProcess(std::string stream_id); ~RtpProcess(); enum OnlyTrack { kAll = 0, kOnlyAudio = 1, kOnlyVideo = 2 }; @@ -38,20 +41,16 @@ public: */ bool inputRtp(bool is_udp, const toolkit::Socket::Ptr &sock, const char *data, size_t len, const struct sockaddr *addr , uint64_t *dts_out = nullptr); - /** - * 是否超时,用于超时移除对象 - */ - bool alive(); /** * 超时时被RtpSelector移除时触发 */ - void onDetach(); + void onDetach(const toolkit::SockException &ex); /** * 设置onDetach事件回调 */ - void setOnDetach(std::function cb); + void setOnDetach(onDetachCB cb); /** * 设置onDetach事件回调,false检查RTP超时,true停止 @@ -88,10 +87,17 @@ protected: std::shared_ptr getOriginSock(MediaSource &sender) const override; toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) override; float getLossRate(MediaSource &sender, TrackType type) override; + Ptr getRtpProcess(mediakit::MediaSource &sender) const override; + bool close(mediakit::MediaSource &sender) override; private: + RtpProcess(std::string stream_id); + void emitOnPublish(); void doCachedFunc(); + bool alive(); + void onManager(); + void createTimer(); private: OnlyTrack _only_track = kAll; @@ -102,12 +108,13 @@ private: toolkit::Socket::Ptr _sock; MediaInfo _media_info; toolkit::Ticker _last_frame_time; - std::function _on_detach; + onDetachCB _on_detach; std::shared_ptr _save_file_rtp; std::shared_ptr _save_file_video; ProcessInterface::Ptr _process; MultiMediaSourceMuxer::Ptr _muxer; std::atomic_bool _stop_rtp_check{false}; + toolkit::Timer::Ptr _timer; toolkit::Ticker _last_check_alive; std::recursive_mutex _func_mtx; std::deque > _cached_func; diff --git a/src/Rtp/RtpSelector.cpp b/src/Rtp/RtpSelector.cpp deleted file mode 100644 index 1eb3058a..00000000 --- a/src/Rtp/RtpSelector.cpp +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Copyright (c) 2016-present The ZLMediaKit project authors. All Rights Reserved. - * - * This file is part of ZLMediaKit(https://github.com/ZLMediaKit/ZLMediaKit). - * - * Use of this source code is governed by MIT-like license that can be found in the - * LICENSE file in the root of the source tree. All contributing project authors - * may be found in the AUTHORS file in the root of the source tree. - */ - -#if defined(ENABLE_RTPPROXY) -#include -#include "RtpSelector.h" -#include "RtpSplitter.h" - -using namespace std; -using namespace toolkit; - -namespace mediakit{ - -INSTANCE_IMP(RtpSelector); - -void RtpSelector::clear(){ - lock_guard lck(_mtx_map); - _map_rtp_process.clear(); - _map_stream_replace.clear(); -} - -bool RtpSelector::getSSRC(const char *data, size_t data_len, uint32_t &ssrc){ - if (data_len < 12) { - return false; - } - uint32_t *ssrc_ptr = (uint32_t *) (data + 8); - ssrc = ntohl(*ssrc_ptr); - return true; -} - -RtpProcess::Ptr RtpSelector::getProcess(const string &stream_id,bool makeNew) { - lock_guard lck(_mtx_map); - string stream_id_origin = stream_id; - auto it_replace = _map_stream_replace.find(stream_id); - if (it_replace != _map_stream_replace.end()) { - stream_id_origin = it_replace->second; - } - - auto it = _map_rtp_process.find(stream_id_origin); - if (it == _map_rtp_process.end() && !makeNew) { - return nullptr; - } - if (it != _map_rtp_process.end() && makeNew) { - //已经被其他线程持有了,不得再被持有,否则会存在线程安全的问题 - throw ProcessExisted(StrPrinter << "RtpProcess(" << stream_id_origin << ") already existed"); - } - RtpProcessHelper::Ptr &ref = _map_rtp_process[stream_id_origin]; - if (!ref) { - ref = std::make_shared(stream_id_origin, shared_from_this()); - ref->attachEvent(); - createTimer(); - } - return ref->getProcess(); -} - -void RtpSelector::createTimer() { - if (!_timer) { - //创建超时管理定时器 - weak_ptr weakSelf = shared_from_this(); - _timer = std::make_shared(3.0f, [weakSelf] { - auto strongSelf = weakSelf.lock(); - if (!strongSelf) { - return false; - } - strongSelf->onManager(); - return true; - }, EventPollerPool::Instance().getPoller()); - } -} - -void RtpSelector::delProcess(const string &stream_id,const RtpProcess *ptr) { - RtpProcess::Ptr process; - { - lock_guard lck(_mtx_map); - auto it = _map_rtp_process.find(stream_id); - if (it == _map_rtp_process.end()) { - return; - } - if (it->second->getProcess().get() != ptr) { - return; - } - process = it->second->getProcess(); - _map_rtp_process.erase(it); - delStreamReplace(stream_id); - } - process->onDetach(); -} - -void RtpSelector::addStreamReplace(const string &stream_id, const std::string &stream_replace) { - lock_guard lck(_mtx_map); - _map_stream_replace[stream_replace] = stream_id; -} - -void RtpSelector::delStreamReplace(const string &stream_id) { - for (auto it = _map_stream_replace.begin(); it != _map_stream_replace.end(); ++it) { - if (it->second == stream_id) { - _map_stream_replace.erase(it); - break; - } - } -} - -void RtpSelector::onManager() { - List clear_list; - { - lock_guard lck(_mtx_map); - for (auto it = _map_rtp_process.begin(); it != _map_rtp_process.end();) { - if (it->second->getProcess()->alive()) { - ++it; - continue; - } - WarnL << "RtpProcess timeout:" << it->first; - clear_list.emplace_back(it->second->getProcess()); - delStreamReplace(it->first); - it = _map_rtp_process.erase(it); - } - } - - clear_list.for_each([](const RtpProcess::Ptr &process) { - process->onDetach(); - }); -} - -RtpProcessHelper::RtpProcessHelper(const string &stream_id, const weak_ptr &parent) { - _stream_id = stream_id; - _parent = parent; - _process = std::make_shared(stream_id); -} - -RtpProcessHelper::~RtpProcessHelper() { - auto process = std::move(_process); - try { - // flush时,确保线程安全 - process->getOwnerPoller(MediaSource::NullMediaSource())->async([process]() { process->flush(); }); - } catch (...) { - // 忽略getOwnerPoller可能抛出的异常 - } -} - -void RtpProcessHelper::attachEvent() { - //主要目的是close回调触发时能把对象从RtpSelector中删除 - _process->setDelegate(shared_from_this()); -} - -bool RtpProcessHelper::close(MediaSource &sender) { - //此回调在其他线程触发 - auto parent = _parent.lock(); - if (!parent) { - return false; - } - parent->delProcess(_stream_id, _process.get()); - WarnL << "close media: " << sender.getUrl(); - return true; -} - -RtpProcess::Ptr &RtpProcessHelper::getProcess() { - return _process; -} - -}//namespace mediakit -#endif//defined(ENABLE_RTPPROXY) \ No newline at end of file diff --git a/src/Rtp/RtpSelector.h b/src/Rtp/RtpSelector.h deleted file mode 100644 index 4f46e8dc..00000000 --- a/src/Rtp/RtpSelector.h +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Copyright (c) 2016-present The ZLMediaKit project authors. All Rights Reserved. - * - * This file is part of ZLMediaKit(https://github.com/ZLMediaKit/ZLMediaKit). - * - * Use of this source code is governed by MIT-like license that can be found in the - * LICENSE file in the root of the source tree. All contributing project authors - * may be found in the AUTHORS file in the root of the source tree. - */ - -#ifndef ZLMEDIAKIT_RTPSELECTOR_H -#define ZLMEDIAKIT_RTPSELECTOR_H - -#if defined(ENABLE_RTPPROXY) -#include -#include -#include -#include "RtpProcess.h" -#include "Common/MediaSource.h" - -namespace mediakit{ - -class RtpSelector; -class RtpProcessHelper : public MediaSourceEvent , public std::enable_shared_from_this { -public: - using Ptr = std::shared_ptr; - RtpProcessHelper(const std::string &stream_id, const std::weak_ptr &parent); - ~RtpProcessHelper(); - void attachEvent(); - RtpProcess::Ptr & getProcess(); - -protected: - // 通知其停止推流 - bool close(MediaSource &sender) override; - -private: - std::string _stream_id; - RtpProcess::Ptr _process; - std::weak_ptr _parent; -}; - -class RtpSelector : public std::enable_shared_from_this{ -public: - class ProcessExisted : public std::runtime_error { - public: - template - ProcessExisted(T && ...args) : std::runtime_error(std::forward(args)...) {} - }; - - static bool getSSRC(const char *data,size_t data_len, uint32_t &ssrc); - static RtpSelector &Instance(); - - /** - * 清空所有对象 - */ - void clear(); - - /** - * 获取一个rtp处理器 - * @param stream_id 流id - * @param makeNew 不存在时是否新建, 该参数为true时,必须确保之前未创建同名对象 - * @return rtp处理器 - */ - RtpProcess::Ptr getProcess(const std::string &stream_id, bool makeNew); - - /** - * 删除rtp处理器 - * @param stream_id 流id - * @param ptr rtp处理器指针 - */ - void delProcess(const std::string &stream_id, const RtpProcess *ptr); - - void addStreamReplace(const std::string &stream_id, const std::string &stream_replace); - -private: - void onManager(); - void createTimer(); - void delStreamReplace(const std::string &stream_id); - -private: - toolkit::Timer::Ptr _timer; - std::recursive_mutex _mtx_map; - std::unordered_map _map_rtp_process; - std::unordered_map _map_stream_replace; -}; - -}//namespace mediakit -#endif//defined(ENABLE_RTPPROXY) -#endif //ZLMEDIAKIT_RTPSELECTOR_H diff --git a/src/Rtp/RtpServer.cpp b/src/Rtp/RtpServer.cpp index 84809488..3c3a0686 100644 --- a/src/Rtp/RtpServer.cpp +++ b/src/Rtp/RtpServer.cpp @@ -11,7 +11,7 @@ #if defined(ENABLE_RTPPROXY) #include "Util/uv_errno.h" #include "RtpServer.h" -#include "RtpSelector.h" +#include "RtpProcess.h" #include "Rtcp/RtcpContext.h" #include "Common/config.h" @@ -35,38 +35,34 @@ public: _stream_id = std::move(stream_id); } - ~RtcpHelper() { - if (_process) { - // 删除rtp处理器 - RtpSelector::Instance().delProcess(_stream_id, _process.get()); - } - } - void setRtpServerInfo(uint16_t local_port, RtpServer::TcpMode mode, bool re_use_port, uint32_t ssrc, int only_track) { - _local_port = local_port; - _tcp_mode = mode; - _re_use_port = re_use_port; _ssrc = ssrc; - _only_track = only_track; + _process = RtpProcess::createProcess(_stream_id); + _process->setOnlyTrack((RtpProcess::OnlyTrack)only_track); + + _timeout_cb = [=]() mutable { + NOTICE_EMIT(BroadcastRtpServerTimeoutArgs, Broadcast::kBroadcastRtpServerTimeout, local_port, _stream_id, (int)mode, re_use_port, ssrc); + }; + + weak_ptr weak_self = shared_from_this(); + _process->setOnDetach([weak_self](const SockException &ex) { + if (auto strong_self = weak_self.lock()) { + if (strong_self->_on_detach) { + strong_self->_on_detach(ex); + } + if (ex.getErrCode() == Err_timeout) { + strong_self->_timeout_cb(); + } + } + }); } - void setOnDetach(function cb) { - if (_process) { - _process->setOnDetach(std::move(cb)); - } else { - _on_detach = std::move(cb); - } - } + void setOnDetach(RtpProcess::onDetachCB cb) { _on_detach = std::move(cb); } + + RtpProcess::Ptr getProcess() const { return _process; } void onRecvRtp(const Socket::Ptr &sock, const Buffer::Ptr &buf, struct sockaddr *addr) { - if (!_process) { - _process = RtpSelector::Instance().getProcess(_stream_id, true); - _process->setOnlyTrack((RtpProcess::OnlyTrack)_only_track); - _process->setOnDetach(std::move(_on_detach)); - cancelDelayTask(); - } _process->inputRtp(true, sock, buf->data(), buf->size(), addr); - // 统计rtp接受情况,用于发送rr包 auto header = (RtpHeader *)buf->data(); sendRtcp(ntohl(header->ssrc), addr); @@ -92,37 +88,12 @@ public: // 收到sr rtcp后驱动返回rr rtcp strong_self->sendRtcp(strong_self->_ssrc, (struct sockaddr *)(strong_self->_rtcp_addr.get())); }); - - GET_CONFIG(uint64_t, timeoutSec, RtpProxy::kTimeoutSec); - _delay_task = _rtcp_sock->getPoller()->doDelayTask(timeoutSec * 1000, [weak_self]() { - if (auto strong_self = weak_self.lock()) { - auto process = RtpSelector::Instance().getProcess(strong_self->_stream_id, false); - if (!process && strong_self->_on_detach) { - strong_self->_on_detach(); - } - if(process && strong_self->_on_detach){// tcp 链接防止断开不删除rtpServer - process->setOnDetach(std::move(strong_self->_on_detach)); - } - if (!process) { // process 未创建,触发rtp server 超时事件 - NOTICE_EMIT(BroadcastRtpServerTimeoutArgs, Broadcast::kBroadcastRtpServerTimeout, strong_self->_local_port, strong_self->_stream_id, - (int)strong_self->_tcp_mode, strong_self->_re_use_port, strong_self->_ssrc); - } - } - return 0; - }); - } - - void cancelDelayTask() { - if (_delay_task) { - _delay_task->cancel(); - _delay_task = nullptr; - } } private: void sendRtcp(uint32_t rtp_ssrc, struct sockaddr *addr) { // 每5秒发送一次rtcp - if (_ticker.elapsedTime() < 5000 || !_process) { + if (_ticker.elapsedTime() < 5000) { return; } _ticker.resetTime(); @@ -141,19 +112,14 @@ private: } private: - bool _re_use_port = false; - int _only_track = 0; - uint16_t _local_port = 0; uint32_t _ssrc = 0; - RtpServer::TcpMode _tcp_mode = RtpServer::NONE; - + std::function _timeout_cb; Ticker _ticker; Socket::Ptr _rtcp_sock; RtpProcess::Ptr _process; std::string _stream_id; - function _on_detach; + RtpProcess::onDetachCB _on_detach; std::shared_ptr _rtcp_addr; - EventPoller::DelayTask::Ptr _delay_task; }; void RtpServer::start(uint16_t local_port, const string &stream_id, TcpMode tcp_mode, const char *local_ip, bool re_use_port, uint32_t ssrc, int only_track, bool multiplex) { @@ -186,7 +152,12 @@ void RtpServer::start(uint16_t local_port, const string &stream_id, TcpMode tcp_ (*tcp_server)[RtpSession::kSSRC] = ssrc; (*tcp_server)[RtpSession::kOnlyTrack] = only_track; if (tcp_mode == PASSIVE) { - tcp_server->start(local_port, local_ip); + weak_ptr weak_self = shared_from_this(); + tcp_server->start(local_port, local_ip, 1024, [weak_self](std::shared_ptr &session) { + if (auto strong_self = weak_self.lock()) { + session->setRtpProcess(strong_self->_rtcp_helper->getProcess()); + } + }); } else if (stream_id.empty()) { // tcp主动模式时只能一个端口一个流,必须指定流id; 创建TcpServer对象也仅用于传参 throw std::runtime_error(StrPrinter << "tcp主动模式时必需指定流id"); @@ -242,7 +213,7 @@ void RtpServer::start(uint16_t local_port, const string &stream_id, TcpMode tcp_ _rtcp_helper = helper; } -void RtpServer::setOnDetach(function cb) { +void RtpServer::setOnDetach(RtpProcess::onDetachCB cb) { if (_rtcp_helper) { _rtcp_helper->setOnDetach(std::move(cb)); } @@ -277,6 +248,7 @@ void RtpServer::connectToServer(const std::string &url, uint16_t port, const fun void RtpServer::onConnect() { auto rtp_session = std::make_shared(_rtp_socket); + rtp_session->setRtpProcess(_rtcp_helper->getProcess()); rtp_session->attachServer(*_tcp_server); _rtp_socket->setOnRead([rtp_session](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) { rtp_session->onRecv(buf); diff --git a/src/Rtp/RtpServer.h b/src/Rtp/RtpServer.h index 3654828e..06a1d550 100644 --- a/src/Rtp/RtpServer.h +++ b/src/Rtp/RtpServer.h @@ -62,7 +62,7 @@ public: /** * 设置RtpProcess onDetach事件回调 */ - void setOnDetach(std::function cb); + void setOnDetach(RtpProcess::onDetachCB cb); /** * 更新ssrc diff --git a/src/Rtp/RtpSession.cpp b/src/Rtp/RtpSession.cpp index 95807637..3c22c875 100644 --- a/src/Rtp/RtpSession.cpp +++ b/src/Rtp/RtpSession.cpp @@ -10,7 +10,7 @@ #if defined(ENABLE_RTPPROXY) #include "RtpSession.h" -#include "RtpSelector.h" +#include "RtpProcess.h" #include "Network/TcpServer.h" #include "Rtsp/Rtsp.h" #include "Rtsp/RtpReceiver.h" @@ -60,28 +60,24 @@ void RtpSession::onRecv(const Buffer::Ptr &data) { } void RtpSession::onError(const SockException &err) { - WarnP(this) << _stream_id << " " << err; - if (_process) { - RtpSelector::Instance().delProcess(_stream_id, _process.get()); - _process = nullptr; + if (_emit_detach) { + _process->onDetach(err); } + WarnP(this) << _stream_id << " " << err; } void RtpSession::onManager() { - if (_process && !_process->alive()) { - shutdown(SockException(Err_timeout, "receive rtp timeout")); - } - if (!_process && _ticker.createdTime() > 10 * 1000) { shutdown(SockException(Err_timeout, "illegal connection")); } } +void RtpSession::setRtpProcess(RtpProcess::Ptr process) { + _emit_detach = true; + _process = std::move(process); +} + void RtpSession::onRtpPacket(const char *data, size_t len) { - if (_delay_close) { - // 正在延时关闭中,忽略所有数据 - return; - } if (!isRtp(data, len)) { // 忽略非rtp数据 WarnP(this) << "Not rtp packet"; @@ -104,33 +100,31 @@ void RtpSession::onRtpPacket(const char *data, size_t len) { return; } } + + // 未设置ssrc时,尝试获取ssrc + if (!_ssrc && !getSSRC(data, len, _ssrc)) { + return; + } + + // 未指定流id就使用ssrc为流id + if (_stream_id.empty()) { + _stream_id = printSSRC(_ssrc); + } + if (!_process) { - //未设置ssrc时,尝试获取ssrc - if (!_ssrc && !RtpSelector::getSSRC(data, len, _ssrc)) { - return; - } - if (_stream_id.empty()) { - //未指定流id就使用ssrc为流id - _stream_id = printSSRC(_ssrc); - } - try { - _process = RtpSelector::Instance().getProcess(_stream_id, true); - } catch (RtpSelector::ProcessExisted &ex) { - if (!_is_udp) { - // tcp情况下立即断开连接 - throw; - } - // udp情况下延时断开连接(等待超时自动关闭),防止频繁创建销毁RtpSession对象 - WarnP(this) << ex.what(); - _delay_close = true; - return; - } + _process = RtpProcess::createProcess(_stream_id); _process->setOnlyTrack((RtpProcess::OnlyTrack)_only_track); - _process->setDelegate(static_pointer_cast(shared_from_this())); + weak_ptr weak_self = static_pointer_cast(shared_from_this()); + _process->setOnDetach([weak_self](const SockException &ex) { + if (auto strong_self = weak_self.lock()) { + strong_self->_process = nullptr; + strong_self->shutdown(ex); + } + }); } try { uint32_t rtp_ssrc = 0; - RtpSelector::getSSRC(data, len, rtp_ssrc); + getSSRC(data, len, rtp_ssrc); if (rtp_ssrc != _ssrc) { WarnP(this) << "ssrc mismatched, rtp dropped: " << rtp_ssrc << " != " << _ssrc; return; @@ -143,26 +137,10 @@ void RtpSession::onRtpPacket(const char *data, size_t len) { } else { throw; } - } catch (std::exception &ex) { - if (!_is_udp) { - // tcp情况下立即断开连接 - throw; - } - // udp情况下延时断开连接(等待超时自动关闭),防止频繁创建销毁RtpSession对象 - WarnP(this) << ex.what(); - _delay_close = true; - return; } _ticker.resetTime(); } -bool RtpSession::close(MediaSource &sender) { - //此回调在其他线程触发 - string err = StrPrinter << "close media: " << sender.getUrl(); - safeShutdown(SockException(Err_shutdown, err)); - return true; -} - static const char *findSSRC(const char *data, ssize_t len, uint32_t ssrc) { // rtp前面必须预留两个字节的长度字段 for (ssize_t i = 2; i <= len - 4; ++i) { @@ -268,7 +246,7 @@ const char *RtpSession::searchByPsHeaderFlag(const char *data, size_t len) { // TODO or Not ? 更新设置ssrc uint32_t rtp_ssrc = 0; - RtpSelector::getSSRC(rtp_ptr + 2, len, rtp_ssrc); + getSSRC(rtp_ptr + 2, len, rtp_ssrc); _ssrc = rtp_ssrc; InfoL << "设置_ssrc为:" << _ssrc; // RtpServer::updateSSRC(uint32_t ssrc) diff --git a/src/Rtp/RtpSession.h b/src/Rtp/RtpSession.h index 2bff4f5f..1d7fbcb3 100644 --- a/src/Rtp/RtpSession.h +++ b/src/Rtp/RtpSession.h @@ -20,7 +20,7 @@ namespace mediakit{ -class RtpSession : public toolkit::Session, public RtpSplitter, public MediaSourceEvent { +class RtpSession : public toolkit::Session, public RtpSplitter { public: static const std::string kStreamID; static const std::string kSSRC; @@ -34,10 +34,9 @@ public: void onManager() override; void setParams(toolkit::mINI &ini); void attachServer(const toolkit::Server &server) override; + void setRtpProcess(RtpProcess::Ptr process); protected: - // 通知其停止推流 - bool close(MediaSource &sender) override; // 收到rtp回调 void onRtpPacket(const char *data, size_t len) override; // RtpSplitter override @@ -48,10 +47,10 @@ protected: const char *searchByPsHeaderFlag(const char *data, size_t len); private: - bool _delay_close = false; bool _is_udp = false; bool _search_rtp = false; bool _search_rtp_finished = false; + bool _emit_detach = false; int _only_track = 0; uint32_t _ssrc = 0; toolkit::Ticker _ticker; diff --git a/src/Rtsp/Rtsp.cpp b/src/Rtsp/Rtsp.cpp index dc2d0944..7a7b0b2d 100644 --- a/src/Rtsp/Rtsp.cpp +++ b/src/Rtsp/Rtsp.cpp @@ -470,6 +470,15 @@ string printSSRC(uint32_t ui32Ssrc) { return tmp; } +bool getSSRC(const char *data, size_t data_len, uint32_t &ssrc) { + if (data_len < 12) { + return false; + } + uint32_t *ssrc_ptr = (uint32_t *)(data + 8); + ssrc = ntohl(*ssrc_ptr); + return true; +} + bool isRtp(const char *buf, size_t size) { if (size < 2) { return false; diff --git a/src/Rtsp/Rtsp.h b/src/Rtsp/Rtsp.h index fde5caea..87520d27 100644 --- a/src/Rtsp/Rtsp.h +++ b/src/Rtsp/Rtsp.h @@ -317,6 +317,7 @@ toolkit::Buffer::Ptr makeRtpOverTcpPrefix(uint16_t size, uint8_t interleaved); void makeSockPair(std::pair &pair, const std::string &local_ip, bool re_use_port = false, bool is_udp = true); // 十六进制方式打印ssrc std::string printSSRC(uint32_t ui32Ssrc); +bool getSSRC(const char *data, size_t data_len, uint32_t &ssrc); bool isRtp(const char *buf, size_t size); bool isRtcp(const char *buf, size_t size); diff --git a/tests/test_rtp.cpp b/tests/test_rtp.cpp index 9bb7e4bc..ac2b7a56 100644 --- a/tests/test_rtp.cpp +++ b/tests/test_rtp.cpp @@ -17,7 +17,7 @@ #include "Rtsp/RtspSession.h" #include "Rtmp/RtmpSession.h" #include "Http/HttpSession.h" -#include "Rtp/RtpSelector.h" +#include "Rtp/RtpProcess.h" using namespace std; using namespace toolkit; @@ -42,7 +42,7 @@ static bool loadFile(const char *path, const EventPoller::Ptr &poller) { memset(&addr, 0, sizeof(addr)); addr.ss_family = AF_INET; auto sock = Socket::createSocket(poller); - auto process = RtpSelector::Instance().getProcess("test", true); + auto process = RtpProcess::createProcess("test"); uint64_t stamp_last = 0; auto total_size = std::make_shared(0); @@ -89,7 +89,6 @@ static bool loadFile(const char *path, const EventPoller::Ptr &poller) { auto ret = do_read(); if (!ret) { WarnL << *total_size / 1024 << "KB"; - RtpSelector::Instance().delProcess("test", process.get()); } return ret; });