From 2f50344e7bf56a8ef359a2eefaf221a0c89aa52a Mon Sep 17 00:00:00 2001 From: johzzy Date: Sun, 10 Mar 2024 05:31:20 -0300 Subject: [PATCH] Add ServiceController to manage PlayerProxy/PusherProxy/FFmpegSource/RtpServer services (#3337) --- api/source/mk_common.cpp | 3 +- ext-codec/G711Rtp.cpp | 12 +- server/WebApi.cpp | 323 +++++++++++++++++++-------------------- server/main.cpp | 4 +- 4 files changed, 160 insertions(+), 182 deletions(-) diff --git a/api/source/mk_common.cpp b/api/source/mk_common.cpp index 1f94cf28..c4a36ea1 100644 --- a/api/source/mk_common.cpp +++ b/api/source/mk_common.cpp @@ -33,12 +33,11 @@ static TcpServer::Ptr shell_server; #ifdef ENABLE_RTPPROXY #include "Rtp/RtpServer.h" -static std::shared_ptr rtpServer; +static RtpServer::Ptr rtpServer; #endif #ifdef ENABLE_WEBRTC #include "../webrtc/WebRtcSession.h" -#include "../webrtc/WebRtcTransport.h" static UdpServer::Ptr rtcServer_udp; static TcpServer::Ptr rtcServer_tcp; #endif diff --git a/ext-codec/G711Rtp.cpp b/ext-codec/G711Rtp.cpp index ea07b767..16a9c5c2 100644 --- a/ext-codec/G711Rtp.cpp +++ b/ext-codec/G711Rtp.cpp @@ -38,16 +38,12 @@ bool G711RtpEncoder::inputFrame(const Frame::Ptr &frame) { auto ptr = _cache_frame->data() + _cache_frame->prefixSize(); auto len = _cache_frame->size() - _cache_frame->prefixSize(); auto remain_size = len; - auto max_size = 160 * _channels * _pkt_dur_ms / 20; // 20 ms per 160 byte - uint32_t n = 0; + size_t max_size = 160 * _channels * _pkt_dur_ms / 20; // 20 ms per 160 byte + size_t n = 0; bool mark = true; while (remain_size >= max_size) { - size_t rtp_size; - if (remain_size >= max_size) { - rtp_size = max_size; - } else { - break; - } + assert(remain_size >= max_size); + const size_t rtp_size = max_size; n++; stamp += _pkt_dur_ms; RtpCodec::inputRtp(getRtpInfo().makeRtp(TrackAudio, ptr, rtp_size, mark, stamp), true); diff --git a/server/WebApi.cpp b/server/WebApi.cpp index 6956c5a2..669caa06 100755 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -297,22 +297,71 @@ static inline void addHttpListener(){ }); } +template +class ServiceController { +public: + using Pointer = std::shared_ptr; + std::unordered_map _map; + mutable std::recursive_mutex _mtx; + + void clear() { + decltype(_map) copy; + { + std::lock_guard lck(_mtx); + copy.swap(_map); + } + } + + size_t erase(const std::string &key) { + std::lock_guard lck(_mtx); + return _map.erase(key); + } + + Pointer find(const std::string &key) const { + std::lock_guard lck(_mtx); + auto it = _map.find(key); + if (it == _map.end()) { + return nullptr; + } + return it->second; + } + + template + Pointer make(const std::string &key, _Args&& ...__args) { + // assert(!find(key)); + + auto server = std::make_shared(std::forward<_Args>(__args)...); + std::lock_guard lck(_mtx); + auto it = _map.emplace(key, server); + assert(it.second); + return server; + } + + template + Pointer makeWithAction(const std::string &key, function action, _Args&& ...__args) { + // assert(!find(key)); + + auto server = std::make_shared(std::forward<_Args>(__args)...); + action(server); + std::lock_guard lck(_mtx); + auto it = _map.emplace(key, server); + assert(it.second); + return server; + } +}; + //拉流代理器列表 -static unordered_map s_proxyMap; -static recursive_mutex s_proxyMapMtx; +static ServiceController s_player_proxy; //推流代理器列表 -static unordered_map s_proxyPusherMap; -static recursive_mutex s_proxyPusherMapMtx; +static ServiceController s_pusher_proxy; //FFmpeg拉流代理器列表 -static unordered_map s_ffmpegMap; -static recursive_mutex s_ffmpegMapMtx; +static ServiceController s_ffmpeg_src; #if defined(ENABLE_RTPPROXY) //rtp服务器列表 -static unordered_map s_rtpServerMap; -static recursive_mutex s_rtpServerMapMtx; +static ServiceController s_rtp_server; #endif static inline string getProxyKey(const string &vhost, const string &app, const string &stream) { @@ -416,46 +465,23 @@ Value makeMediaSourceJson(MediaSource &media){ #if defined(ENABLE_RTPPROXY) uint16_t openRtpServer(uint16_t local_port, const string &stream_id, int tcp_mode, const string &local_ip, bool re_use_port, uint32_t ssrc, int only_track, bool multiplex) { - lock_guard lck(s_rtpServerMapMtx); - if (s_rtpServerMap.find(stream_id) != s_rtpServerMap.end()) { + if (s_rtp_server.find(stream_id)) { //为了防止RtpProcess所有权限混乱的问题,不允许重复添加相同的stream_id return 0; } - RtpServer::Ptr server = std::make_shared(); - server->start(local_port, stream_id, (RtpServer::TcpMode)tcp_mode, local_ip.c_str(), re_use_port, ssrc, only_track, multiplex); + auto server = s_rtp_server.makeWithAction(stream_id, [&](RtpServer::Ptr server) { + server->start(local_port, stream_id, (RtpServer::TcpMode)tcp_mode, local_ip.c_str(), re_use_port, ssrc, only_track, multiplex); + }); server->setOnDetach([stream_id]() { //设置rtp超时移除事件 - lock_guard lck(s_rtpServerMapMtx); - s_rtpServerMap.erase(stream_id); + s_rtp_server.erase(stream_id); }); - //保存对象 - s_rtpServerMap.emplace(stream_id, server); //回复json return server->getPort(); } -void connectRtpServer(const string &stream_id, const string &dst_url, uint16_t dst_port, const function &cb) { - lock_guard lck(s_rtpServerMapMtx); - auto it = s_rtpServerMap.find(stream_id); - if (it == s_rtpServerMap.end()) { - cb(SockException(Err_other, "未找到rtp服务")); - return; - } - it->second->connectToServer(dst_url, dst_port, cb); -} - -bool closeRtpServer(const string &stream_id) { - lock_guard lck(s_rtpServerMapMtx); - auto it = s_rtpServerMap.find(stream_id); - if (it == s_rtpServerMap.end()) { - return false; - } - auto server = it->second; - s_rtpServerMap.erase(it); - return true; -} #endif void getStatisticJson(const function &cb) { @@ -546,15 +572,13 @@ void addStreamProxy(const string &vhost, const string &app, const string &stream const ProtocolOption &option, int rtp_type, float timeout_sec, const mINI &args, const function &cb) { auto key = getProxyKey(vhost, app, stream); - lock_guard lck(s_proxyMapMtx); - if (s_proxyMap.find(key) != s_proxyMap.end()) { + if (s_player_proxy.find(key)) { //已经在拉流了 cb(SockException(Err_other, "This stream already exists"), key); return; } //添加拉流代理 - auto player = std::make_shared(vhost, app, stream, option, retry_count); - s_proxyMap[key] = player; + auto player = s_player_proxy.make(key, vhost, app, stream, option, retry_count); // 先透传参数 player->mINI::operator=(args); @@ -562,7 +586,7 @@ void addStreamProxy(const string &vhost, const string &app, const string &stream //指定RTP over TCP(播放rtsp时有效) (*player)[Client::kRtpType] = rtp_type; - if (timeout_sec > 0.1) { + if (timeout_sec > 0.1f) { //播放握手超时时间 (*player)[Client::kTimeoutMS] = timeout_sec * 1000; } @@ -570,20 +594,68 @@ void addStreamProxy(const string &vhost, const string &app, const string &stream //开始播放,如果播放失败或者播放中止,将会自动重试若干次,默认一直重试 player->setPlayCallbackOnce([cb, key](const SockException &ex) { if (ex) { - lock_guard lck(s_proxyMapMtx); - s_proxyMap.erase(key); + s_player_proxy.erase(key); } cb(ex, key); }); //被主动关闭拉流 player->setOnClose([key](const SockException &ex) { - lock_guard lck(s_proxyMapMtx); - s_proxyMap.erase(key); + s_player_proxy.erase(key); }); player->play(url); }; + +void addStreamPusherProxy(const string &schema, + const string &vhost, + const string &app, + const string &stream, + const string &url, + int retry_count, + int rtp_type, + float timeout_sec, + const function &cb) { + auto key = getPusherKey(schema, vhost, app, stream, url); + auto src = MediaSource::find(schema, vhost, app, stream); + if (!src) { + cb(SockException(Err_other, "can not find the source stream"), key); + return; + } + if (s_pusher_proxy.find(key)) { + //已经在推流了 + cb(SockException(Err_success), key); + return; + } + + //添加推流代理 + auto pusher = s_pusher_proxy.make(key, src, retry_count); + + //指定RTP over TCP(播放rtsp时有效) + pusher->emplace(Client::kRtpType, rtp_type); + + if (timeout_sec > 0.1f) { + //推流握手超时时间 + pusher->emplace(Client::kTimeoutMS, timeout_sec * 1000); + } + + //开始推流,如果推流失败或者推流中止,将会自动重试若干次,默认一直重试 + pusher->setPushCallbackOnce([cb, key, url](const SockException &ex) { + if (ex) { + WarnL << "Push " << url << " failed, key: " << key << ", err: " << ex; + s_pusher_proxy.erase(key); + } + cb(ex, key); + }); + + //被主动关闭推流 + pusher->setOnClose([key, url](const SockException &ex) { + WarnL << "Push " << url << " failed, key: " << key << ", err: " << ex; + s_pusher_proxy.erase(key); + }); + pusher->publish(url); +} + template static void getArgsValue(const HttpAllArgs &allArgs, const string &key, Type &value) { auto val = allArgs[key]; @@ -973,59 +1045,6 @@ void installWebApi() { val["count_hit"] = (Json::UInt64)count_hit; }); - static auto addStreamPusherProxy = [](const string &schema, - const string &vhost, - const string &app, - const string &stream, - const string &url, - int retry_count, - int rtp_type, - float timeout_sec, - const function &cb) { - auto key = getPusherKey(schema, vhost, app, stream, url); - auto src = MediaSource::find(schema, vhost, app, stream); - if (!src) { - cb(SockException(Err_other, "can not find the source stream"), key); - return; - } - lock_guard lck(s_proxyPusherMapMtx); - if (s_proxyPusherMap.find(key) != s_proxyPusherMap.end()) { - //已经在推流了 - cb(SockException(Err_success), key); - return; - } - - //添加推流代理 - auto pusher = std::make_shared(src, retry_count); - s_proxyPusherMap[key] = pusher; - - //指定RTP over TCP(播放rtsp时有效) - (*pusher)[Client::kRtpType] = rtp_type; - - if (timeout_sec > 0.1) { - //推流握手超时时间 - (*pusher)[Client::kTimeoutMS] = timeout_sec * 1000; - } - - //开始推流,如果推流失败或者推流中止,将会自动重试若干次,默认一直重试 - pusher->setPushCallbackOnce([cb, key, url](const SockException &ex) { - if (ex) { - WarnL << "Push " << url << " failed, key: " << key << ", err: " << ex; - lock_guard lck(s_proxyPusherMapMtx); - s_proxyPusherMap.erase(key); - } - cb(ex, key); - }); - - //被主动关闭推流 - pusher->setOnClose([key, url](const SockException &ex) { - WarnL << "Push " << url << " failed, key: " << key << ", err: " << ex; - lock_guard lck(s_proxyPusherMapMtx); - s_proxyPusherMap.erase(key); - }); - pusher->publish(url); - }; - //动态添加rtsp/rtmp推流代理 //测试url http://127.0.0.1/index/api/addStreamPusherProxy?schema=rtmp&vhost=__defaultVhost__&app=proxy&stream=0&dst_url=rtmp://127.0.0.1/live/obs api_regist("/index/api/addStreamPusherProxy", [](API_ARGS_MAP_ASYNC) { @@ -1058,8 +1077,7 @@ void installWebApi() { api_regist("/index/api/delStreamPusherProxy", [](API_ARGS_MAP) { CHECK_SECRET(); CHECK_ARGS("key"); - lock_guard lck(s_proxyPusherMapMtx); - val["data"]["flag"] = s_proxyPusherMap.erase(allArgs["key"]) == 1; + val["data"]["flag"] = s_pusher_proxy.erase(allArgs["key"]) == 1; }); //动态添加rtsp/rtmp拉流代理 @@ -1100,8 +1118,7 @@ void installWebApi() { api_regist("/index/api/delStreamProxy",[](API_ARGS_MAP){ CHECK_SECRET(); CHECK_ARGS("key"); - lock_guard lck(s_proxyMapMtx); - val["data"]["flag"] = s_proxyMap.erase(allArgs["key"]) == 1; + val["data"]["flag"] = s_player_proxy.erase(allArgs["key"]) == 1; }); static auto addFFmpegSource = [](const string &ffmpeg_cmd_key, @@ -1112,25 +1129,21 @@ void installWebApi() { bool enable_mp4, const function &cb) { auto key = MD5(dst_url).hexdigest(); - lock_guard lck(s_ffmpegMapMtx); - if (s_ffmpegMap.find(key) != s_ffmpegMap.end()) { + if (s_ffmpeg_src.find(key)) { //已经在拉流了 cb(SockException(Err_success), key); return; } - FFmpegSource::Ptr ffmpeg = std::make_shared(); - s_ffmpegMap[key] = ffmpeg; + auto ffmpeg = s_ffmpeg_src.make(key); ffmpeg->setOnClose([key]() { - lock_guard lck(s_ffmpegMapMtx); - s_ffmpegMap.erase(key); + s_ffmpeg_src.erase(key); }); ffmpeg->setupRecordFlag(enable_hls, enable_mp4); ffmpeg->play(ffmpeg_cmd_key, src_url, dst_url, timeout_ms, [cb, key](const SockException &ex) { if (ex) { - lock_guard lck(s_ffmpegMapMtx); - s_ffmpegMap.erase(key); + s_ffmpeg_src.erase(key); } cb(ex, key); }); @@ -1164,8 +1177,7 @@ void installWebApi() { api_regist("/index/api/delFFmpegSource",[](API_ARGS_MAP){ CHECK_SECRET(); CHECK_ARGS("key"); - lock_guard lck(s_ffmpegMapMtx); - val["data"]["flag"] = s_ffmpegMap.erase(allArgs["key"]) == 1; + val["data"]["flag"] = s_ffmpeg_src.erase(allArgs["key"]) == 1; }); //新增http api下载可执行程序文件接口 @@ -1245,22 +1257,27 @@ void installWebApi() { api_regist("/index/api/connectRtpServer", [](API_ARGS_MAP_ASYNC) { CHECK_SECRET(); CHECK_ARGS("stream_id", "dst_url", "dst_port"); - connectRtpServer( - allArgs["stream_id"], allArgs["dst_url"], allArgs["dst_port"], - [val, headerOut, invoker](const SockException &ex) mutable { - if (ex) { - val["code"] = API::OtherFailed; - val["msg"] = ex.what(); - } - invoker(200, headerOut, val.toStyledString()); - }); + auto cb = [val, headerOut, invoker](const SockException &ex) mutable { + if (ex) { + val["code"] = API::OtherFailed; + val["msg"] = ex.what(); + } + invoker(200, headerOut, val.toStyledString()); + }; + + auto server = s_rtp_server.find(allArgs["stream_id"]); + if (!server) { + cb(SockException(Err_other, "未找到rtp服务")); + return; + } + server->connectToServer(allArgs["dst_url"], allArgs["dst_port"], cb); }); api_regist("/index/api/closeRtpServer",[](API_ARGS_MAP){ CHECK_SECRET(); CHECK_ARGS("stream_id"); - if(!closeRtpServer(allArgs["stream_id"])){ + if(s_rtp_server.erase(allArgs["stream_id"]) == 0){ val["hit"] = 0; return; } @@ -1271,19 +1288,18 @@ void installWebApi() { CHECK_SECRET(); CHECK_ARGS("stream_id", "ssrc"); - lock_guard lck(s_rtpServerMapMtx); - auto it = s_rtpServerMap.find(allArgs["stream_id"]); - if (it == s_rtpServerMap.end()) { + auto server = s_rtp_server.find(allArgs["stream_id"]); + if (!server) { throw ApiRetException("RtpServer not found by stream_id", API::NotFound); } - it->second->updateSSRC(allArgs["ssrc"]); + server->updateSSRC(allArgs["ssrc"]); }); api_regist("/index/api/listRtpServer",[](API_ARGS_MAP){ CHECK_SECRET(); - lock_guard lck(s_rtpServerMapMtx); - for (auto &pr : s_rtpServerMap) { + std::lock_guard lck(s_rtp_server._mtx); + for (auto &pr : s_rtp_server._map) { Value obj; obj["stream_id"] = pr.first; obj["port"] = pr.second->getPort(); @@ -1518,18 +1534,11 @@ void installWebApi() { api_regist("/index/api/getProxyPusherInfo", [](API_ARGS_MAP_ASYNC) { CHECK_SECRET(); CHECK_ARGS("key"); - decltype(s_proxyPusherMap.end()) it; - { - lock_guard lck(s_proxyPusherMapMtx); - it = s_proxyPusherMap.find(allArgs["key"]); - } - - if (it == s_proxyPusherMap.end()) { + auto pusher = s_pusher_proxy.find(allArgs["key"]); + if (!pusher) { throw ApiRetException("can not find pusher", API::NotFound); } - auto pusher = it->second; - val["data"]["status"] = pusher->getStatus(); val["data"]["liveSecs"] = pusher->getLiveSecs(); val["data"]["rePublishCount"] = pusher->getRePublishCount(); @@ -1539,18 +1548,11 @@ void installWebApi() { api_regist("/index/api/getProxyInfo", [](API_ARGS_MAP_ASYNC) { CHECK_SECRET(); CHECK_ARGS("key"); - decltype(s_proxyMap.end()) it; - { - lock_guard lck(s_proxyMapMtx); - it = s_proxyMap.find(allArgs["key"]); - } - - if (it == s_proxyMap.end()) { + auto proxy = s_player_proxy.find(allArgs["key"]); + if (!proxy) { throw ApiRetException("can not find the proxy", API::NotFound); } - auto proxy = it->second; - val["data"]["status"] = proxy->getStatus(); val["data"]["liveSecs"] = proxy->getLiveSecs(); val["data"]["rePullCount"] = proxy->getRePullCount(); @@ -1926,31 +1928,12 @@ void installWebApi() { } void unInstallWebApi(){ - { - lock_guard lck(s_proxyMapMtx); - auto proxyMap(std::move(s_proxyMap)); - proxyMap.clear(); - } - - { - lock_guard lck(s_ffmpegMapMtx); - auto ffmpegMap(std::move(s_ffmpegMap)); - ffmpegMap.clear(); - } - - { - lock_guard lck(s_proxyPusherMapMtx); - auto proxyPusherMap(std::move(s_proxyPusherMap)); - proxyPusherMap.clear(); - } - - { + s_player_proxy.clear(); + s_ffmpeg_src.clear(); + s_pusher_proxy.clear(); #if defined(ENABLE_RTPPROXY) - RtpSelector::Instance().clear(); - lock_guard lck(s_rtpServerMapMtx); - auto rtpServerMap(std::move(s_rtpServerMap)); - rtpServerMap.clear(); + s_rtp_server.clear(); #endif - } + NoticeCenter::Instance().delListener(&web_api_tag); } diff --git a/server/main.cpp b/server/main.cpp index c946beab..c25a89fb 100644 --- a/server/main.cpp +++ b/server/main.cpp @@ -392,8 +392,8 @@ int start_main(int argc,char *argv[]) { #endif//defined(ENABLE_WEBRTC) #if defined(ENABLE_SRT) - // srt udp服务器 - if(srtPort) { srtSrv->start(srtPort); } + // srt udp服务器 + if (srtPort) { srtSrv->start(srtPort); } #endif//defined(ENABLE_SRT) } catch (std::exception &ex) {