From 901c38130072d7f6d9e009dcc3edf4e929a8a90e Mon Sep 17 00:00:00 2001 From: mtdxc Date: Fri, 1 Nov 2024 10:47:18 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E8=8E=B7=E5=8F=96=E6=8E=A8?= =?UTF-8?q?=E6=B5=81=E6=8E=A8=E6=B5=81=E4=BB=A3=E7=90=86=E5=88=97=E8=A1=A8?= =?UTF-8?q?=E5=92=8Cffmpeg=E6=BA=90=E5=88=97=E8=A1=A8=E6=8E=A5=E5=8F=A3=20?= =?UTF-8?q?(#3992)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- postman/ZLMediaKit.postman_collection.json | 78 ++++++++++++++++++++++ server/FFmpegSource.cpp | 1 + server/FFmpegSource.h | 7 ++ server/WebApi.cpp | 76 ++++++++++++++++++--- src/Player/PlayerProxy.h | 4 ++ src/Pusher/MediaPusher.cpp | 1 + src/Pusher/MediaPusher.h | 4 +- src/Pusher/PusherProxy.cpp | 5 +- src/Pusher/PusherProxy.h | 1 - src/Rtp/RtpServer.cpp | 10 +++ src/Rtp/RtpServer.h | 3 + 11 files changed, 176 insertions(+), 14 deletions(-) diff --git a/postman/ZLMediaKit.postman_collection.json b/postman/ZLMediaKit.postman_collection.json index 8b80aede..3cdf255d 100644 --- a/postman/ZLMediaKit.postman_collection.json +++ b/postman/ZLMediaKit.postman_collection.json @@ -700,6 +700,58 @@ }, "response": [] }, + { + "name": "获取拉流代理列表(listStreamProxy)", + "request": { + "method": "GET", + "header": [], + "url": { + "raw": "{{ZLMediaKit_URL}}/index/api/listStreamProxy?secret={{ZLMediaKit_secret}}", + "host": [ + "{{ZLMediaKit_URL}}" + ], + "path": [ + "index", + "api", + "listStreamProxy" + ], + "query": [ + { + "key": "secret", + "value": "{{ZLMediaKit_secret}}", + "description": "api操作密钥(配置文件配置)" + } + ] + } + }, + "response": [] + }, + { + "name": "获取推流代理列表(listStreamPusherProxy)", + "request": { + "method": "GET", + "header": [], + "url": { + "raw": "{{ZLMediaKit_URL}}/index/api/listStreamPusherProxy?secret={{ZLMediaKit_secret}}", + "host": [ + "{{ZLMediaKit_URL}}" + ], + "path": [ + "index", + "api", + "listStreamPusherProxy" + ], + "query": [ + { + "key": "secret", + "value": "{{ZLMediaKit_secret}}", + "description": "api操作密钥(配置文件配置)" + } + ] + } + }, + "response": [] + }, { "name": "添加rtsp/rtmp推流(addStreamPusherProxy)", "request": { @@ -800,6 +852,32 @@ }, "response": [] }, + { + "name": "获取FFmpeg拉流代理列表(listFFmpegSource)", + "request": { + "method": "GET", + "header": [], + "url": { + "raw": "{{ZLMediaKit_URL}}/index/api/listFFmpegSource?secret={{ZLMediaKit_secret}}", + "host": [ + "{{ZLMediaKit_URL}}" + ], + "path": [ + "index", + "api", + "listFFmpegSource" + ], + "query": [ + { + "key": "secret", + "value": "{{ZLMediaKit_secret}}", + "description": "api操作密钥(配置文件配置)" + } + ] + } + }, + "response": [] + }, { "name": "添加FFmpeg拉流代理(addFFmpegSource)", "request": { diff --git a/server/FFmpegSource.cpp b/server/FFmpegSource.cpp index aa15830f..09b16e50 100644 --- a/server/FFmpegSource.cpp +++ b/server/FFmpegSource.cpp @@ -103,6 +103,7 @@ void FFmpegSource::play(const string &ffmpeg_cmd_key, const string &src_url, con snprintf(cmd, sizeof(cmd), ffmpeg_cmd.data(), File::absolutePath("", ffmpeg_bin).data(), src_url.data(), dst_url.data()); auto log_file = ffmpeg_log.empty() ? "" : File::absolutePath("", ffmpeg_log); _process.run(cmd, log_file); + _cmd = cmd; InfoL << cmd; if (is_local_ip(_media_info.host)) { diff --git a/server/FFmpegSource.h b/server/FFmpegSource.h index 8bb3130a..4bc44e33 100644 --- a/server/FFmpegSource.h +++ b/server/FFmpegSource.h @@ -77,6 +77,12 @@ public: */ void play(const std::string &ffmpeg_cmd_key, const std::string &src_url, const std::string &dst_url, int timeout_ms, const onPlay &cb); + const std::string& getSrcUrl() const { return _src_url; } + const std::string& getDstUrl() const { return _dst_url; } + const std::string& getCmd() const { return _cmd; } + const std::string& getCmdKey() const { return _ffmpeg_cmd_key; } + const mediakit::MediaInfo& getMediaInfo() const { return _media_info; } + /** * 设置录制 * @param enable_hls 是否开启hls直播或录制 @@ -115,6 +121,7 @@ private: std::string _src_url; std::string _dst_url; std::string _ffmpeg_cmd_key; + std::string _cmd; std::function _onClose; toolkit::Ticker _replay_ticker; }; diff --git a/server/WebApi.cpp b/server/WebApi.cpp index c46f2269..5c1aab1a 100755 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -346,6 +346,15 @@ public: return it->second; } + void for_each(const std::function& cb) { + std::lock_guard lck(_mtx); + auto it = _map.begin(); + while (it != _map.end()) { + cb(it->first, it->second); + it++; + } + } + template Pointer make(const std::string &key, _Args&& ...__args) { // assert(!find(key)); @@ -409,6 +418,29 @@ void dumpMediaTuple(const MediaTuple &tuple, Json::Value& item) { item["params"] = tuple.params; } +Value ToJson(const PusherProxy::Ptr& p) { + Value item; + item["url"] = p->getUrl(); + item["status"] = p->getStatus(); + item["liveSecs"] = p->getLiveSecs(); + item["rePublishCount"] = p->getRePublishCount(); + if (auto src = p->getSrc()) { + dumpMediaTuple(src->getMediaTuple(), item["src"]); + } + return item; +} + +Value ToJson(const PlayerProxy::Ptr& p) { + Value item; + item["url"] = p->getUrl(); + item["status"] = p->getStatus(); + item["liveSecs"] = p->getLiveSecs(); + item["rePullCount"] = p->getRePullCount(); + item["totalReaderCount"] = p->totalReaderCount(); + dumpMediaTuple(p->getMediaTuple(), item["src"]); + return item; +} + Value makeMediaSourceJson(MediaSource &media){ Value item; item["schema"] = media.getSchema(); @@ -1173,7 +1205,22 @@ void installWebApi() { CHECK_ARGS("key"); val["data"]["flag"] = s_pusher_proxy.erase(allArgs["key"]) == 1; }); - + api_regist("/index/api/listStreamPusherProxy", [](API_ARGS_MAP) { + CHECK_SECRET(); + s_pusher_proxy.for_each([&val](const std::string& key, const PusherProxy::Ptr& p) { + Json::Value item = ToJson(p); + item["key"] = key; + val["data"].append(item); + }); + }); + api_regist("/index/api/listStreamProxy", [](API_ARGS_MAP) { + CHECK_SECRET(); + s_player_proxy.for_each([&val](const std::string& key, const PlayerProxy::Ptr& p) { + Json::Value item = ToJson(p); + item["key"] = key; + val["data"].append(item); + }); + }); // 动态添加rtsp/rtmp拉流代理 [AUTO-TRANSLATED:2616537c] // Dynamically add rtsp/rtmp pull stream proxy // 测试url http://127.0.0.1/index/api/addStreamProxy?vhost=__defaultVhost__&app=proxy&enable_rtsp=1&enable_rtmp=1&stream=0&url=rtmp://127.0.0.1/live/obs [AUTO-TRANSLATED:71ddce15] @@ -1286,7 +1333,18 @@ void installWebApi() { CHECK_ARGS("key"); val["data"]["flag"] = s_ffmpeg_src.erase(allArgs["key"]) == 1; }); - + api_regist("/index/api/listFFmpegSource", [](API_ARGS_MAP) { + CHECK_SECRET(); + s_ffmpeg_src.for_each([&val](const std::string& key, const FFmpegSource::Ptr& src) { + Json::Value item; + item["src_url"] = src->getSrcUrl(); + item["dst_url"] = src->getDstUrl(); + item["cmd"] = src->getCmd(); + item["ffmpeg_cmd_key"] = src->getCmdKey(); + item["key"] = key; + val["data"].append(item); + }); + }); // 新增http api下载可执行程序文件接口 [AUTO-TRANSLATED:d6e44e84] // Add a new http api to download executable files // 测试url http://127.0.0.1/index/api/downloadBin [AUTO-TRANSLATED:9525e834] @@ -1477,7 +1535,11 @@ void installWebApi() { obj["vhost"] = vec[0]; obj["app"] = vec[1]; obj["stream_id"] = vec[2]; - obj["port"] = pr.second->getPort(); + auto& rtps = pr.second; + obj["port"] = rtps->getPort(); + obj["ssrc"] = rtps->getSSRC(); + obj["tcp_mode"] = rtps->getTcpMode(); + obj["only_track"] = rtps->getOnlyTrack(); val["data"].append(obj); } }); @@ -1741,9 +1803,7 @@ void installWebApi() { throw ApiRetException("can not find pusher", API::NotFound); } - val["data"]["status"] = pusher->getStatus(); - val["data"]["liveSecs"] = pusher->getLiveSecs(); - val["data"]["rePublishCount"] = pusher->getRePublishCount(); + val["data"] = ToJson(pusher); invoker(200, headerOut, val.toStyledString()); }); @@ -1755,9 +1815,7 @@ void installWebApi() { throw ApiRetException("can not find the proxy", API::NotFound); } - val["data"]["status"] = proxy->getStatus(); - val["data"]["liveSecs"] = proxy->getLiveSecs(); - val["data"]["rePullCount"] = proxy->getRePullCount(); + val["data"] = ToJson(proxy); invoker(200, headerOut, val.toStyledString()); }); diff --git a/src/Player/PlayerProxy.h b/src/Player/PlayerProxy.h index a90d2b45..304d22a6 100644 --- a/src/Player/PlayerProxy.h +++ b/src/Player/PlayerProxy.h @@ -139,6 +139,10 @@ public: // Using this only makes sense after a successful connection to the server TranslationInfo getTranslationInfo(); + const std::string& getUrl() const { return _pull_url; } + const MediaTuple& getMediaTuple() const { return _tuple; } + const ProtocolOption& getOption() const { return _option; } + private: // MediaSourceEvent override bool close(MediaSource &sender) override; diff --git a/src/Pusher/MediaPusher.cpp b/src/Pusher/MediaPusher.cpp index 494f0eee..a7510e10 100644 --- a/src/Pusher/MediaPusher.cpp +++ b/src/Pusher/MediaPusher.cpp @@ -46,6 +46,7 @@ void MediaPusher::publish(const string &url) { _delegate->setOnPublished(_on_publish); _delegate->mINI::operator=(*this); _delegate->publish(url); + _url = url; } EventPoller::Ptr MediaPusher::getPoller(){ diff --git a/src/Pusher/MediaPusher.h b/src/Pusher/MediaPusher.h index 899f7cf2..cd9c2c9f 100644 --- a/src/Pusher/MediaPusher.h +++ b/src/Pusher/MediaPusher.h @@ -33,11 +33,13 @@ public: void publish(const std::string &url) override; toolkit::EventPoller::Ptr getPoller(); void setOnCreateSocket(toolkit::Socket::onCreateSocket cb); - + std::shared_ptr getSrc() { return _src.lock(); } + const std::string& getUrl() const { return _url; } private: std::weak_ptr _src; toolkit::EventPoller::Ptr _poller; toolkit::Socket::onCreateSocket _on_create_socket; + std::string _url; }; } /* namespace mediakit */ diff --git a/src/Pusher/PusherProxy.cpp b/src/Pusher/PusherProxy.cpp index 397aadaa..742df665 100644 --- a/src/Pusher/PusherProxy.cpp +++ b/src/Pusher/PusherProxy.cpp @@ -19,7 +19,6 @@ PusherProxy::PusherProxy(const MediaSource::Ptr &src, int retry_count, const Eve : MediaPusher(src, poller) { _retry_count = retry_count; _on_close = [](const SockException &) {}; - _weak_src = src; _live_secs = 0; _live_status = 1; _republish_count = 0; @@ -52,7 +51,7 @@ void PusherProxy::publish(const string &dst_url) { strong_self->_on_publish = nullptr; } - auto src = strong_self->_weak_src.lock(); + auto src = strong_self->getSrc(); if (!err) { // 推流成功 [AUTO-TRANSLATED:28ce6e56] // Stream successfully pushed @@ -87,7 +86,7 @@ void PusherProxy::publish(const string &dst_url) { TraceL << " live secs " << strong_self->_live_secs; } - auto src = strong_self->_weak_src.lock(); + auto src = strong_self->getSrc(); // 推流异常中断,延时重试播放 [AUTO-TRANSLATED:e69e5a05] // Stream abnormally interrupted, retry playing with delay if (src && (*failed_cnt < strong_self->_retry_count || strong_self->_retry_count < 0)) { diff --git a/src/Pusher/PusherProxy.h b/src/Pusher/PusherProxy.h index 965096bb..df5d0e3a 100644 --- a/src/Pusher/PusherProxy.h +++ b/src/Pusher/PusherProxy.h @@ -77,7 +77,6 @@ private: std::atomic _live_status; std::atomic _live_secs; std::atomic _republish_count; - std::weak_ptr _weak_src; std::function _on_close; std::function _on_publish; }; diff --git a/src/Rtp/RtpServer.cpp b/src/Rtp/RtpServer.cpp index ebd3db9b..580f8159 100644 --- a/src/Rtp/RtpServer.cpp +++ b/src/Rtp/RtpServer.cpp @@ -295,5 +295,15 @@ void RtpServer::updateSSRC(uint32_t ssrc) { } } +uint32_t RtpServer::getSSRC() const { + if (_ssrc) { + return *_ssrc; + } + if (_tcp_server) { + return (*_tcp_server)[RtpSession::kSSRC]; + } + return 0; +} + }//namespace mediakit #endif//defined(ENABLE_RTPPROXY) diff --git a/src/Rtp/RtpServer.h b/src/Rtp/RtpServer.h index daf5b6e0..739ecf87 100644 --- a/src/Rtp/RtpServer.h +++ b/src/Rtp/RtpServer.h @@ -97,6 +97,9 @@ public: */ void updateSSRC(uint32_t ssrc); + uint32_t getSSRC() const; + int getOnlyTrack() const { return _only_track; } + TcpMode getTcpMode() const { return _tcp_mode; } private: // tcp主动模式连接服务器成功回调 [AUTO-TRANSLATED:0775844e] // tcp active mode connection server success callback