From 752590f8046fe583f77e88d85c93396896691c2c Mon Sep 17 00:00:00 2001 From: ziyue <1213642868@qq.com> Date: Thu, 17 Jun 2021 10:59:58 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84addStreamPusherProxy=E7=9B=B8?= =?UTF-8?q?=E5=85=B3=E5=8A=9F=E8=83=BD=E5=B9=B6=E4=BF=AE=E5=A4=8D=E8=87=AA?= =?UTF-8?q?=E5=8A=A8=E5=88=A0=E9=99=A4=E7=9B=B8=E5=85=B3=E7=9A=84bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/WebApi.cpp | 36 ++++++++++++------------------------ src/Pusher/PusherProxy.cpp | 12 +++--------- src/Pusher/PusherProxy.h | 11 ++--------- 3 files changed, 17 insertions(+), 42 deletions(-) diff --git a/server/WebApi.cpp b/server/WebApi.cpp index 3f30332e..be694d2a 100755 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -616,6 +616,11 @@ void installWebApi() { float timeout_sec, const function &cb) { auto key = getPusherKey(schema, vhost, app, stream, url); + auto src = MediaSource::find(schema, vhost, app, stream); + if (!src) { + cb(SockException(Err_other, "can not find the source stream"), key); + return; + } lock_guard lck(s_proxyPusherMapMtx); if (s_proxyPusherMap.find(key) != s_proxyPusherMap.end()) { //已经在推流了 @@ -624,7 +629,7 @@ void installWebApi() { } //添加推流代理 - PusherProxy::Ptr pusher(new PusherProxy(schema, vhost, app, stream, retry_count ? retry_count : -1)); + PusherProxy::Ptr pusher(new PusherProxy(src, retry_count ? retry_count : -1)); s_proxyPusherMap[key] = pusher; //指定RTP over TCP(播放rtsp时有效) @@ -638,18 +643,18 @@ void installWebApi() { //开始推流,如果推流失败或者推流中止,将会自动重试若干次,默认一直重试 pusher->setPushCallbackOnce([cb, key, url](const SockException &ex) { if (ex) { - InfoL << "key: " << key << ", " << "addStreamPusherProxy pusher callback error: " << ex.what(); - lock_guard lck(s_proxyMapMtx); - s_proxyMap.erase(key); + WarnL << "Push " << url << " failed, key: " << key << ", err: " << ex.what(); + lock_guard lck(s_proxyPusherMapMtx); + s_proxyPusherMap.erase(key); } cb(ex, key); }); //被主动关闭推流 pusher->setOnClose([key, url](const SockException &ex) { - InfoL << "key: " << key << ", " << "addStreamPusherProxy close callback error: " << ex.what(); - lock_guard lck(s_proxyMapMtx); - s_proxyMap.erase(key); + WarnL << "Push " << url << " failed, key: " << key << ", err: " << ex.what(); + lock_guard lck(s_proxyPusherMapMtx); + s_proxyPusherMap.erase(key); }); pusher->publish(url); }; @@ -659,23 +664,7 @@ void installWebApi() { api_regist("/index/api/addStreamPusherProxy", [](API_ARGS_MAP_ASYNC) { CHECK_SECRET(); CHECK_ARGS("schema", "vhost", "app", "stream", "dst_url"); - auto dst_url = allArgs["dst_url"]; - auto src_url = allArgs["schema"] + "/" + allArgs["vhost"] + "/" + allArgs["app"] + "/" + allArgs["stream"]; - auto src = MediaSource::find(allArgs["schema"], - allArgs["vhost"], - allArgs["app"], - allArgs["stream"]); - if (!src) { - WarnL << "addStreamPusherProxy, can not find source stream:" << src_url; - val["code"] = API::NotFound; - val["msg"] = "can not find the source stream"; - invoker(200, headerOut, val.toStyledString()); - return; - } - - InfoL << "addStreamPusherProxy, find stream: " << src_url << ", push dst url: " << dst_url; - addStreamPusherProxy(allArgs["schema"], allArgs["vhost"], allArgs["app"], @@ -688,7 +677,6 @@ void installWebApi() { if (ex) { val["code"] = API::OtherFailed; val["msg"] = ex.what(); - WarnL << "Publish stream failed, dst url is: " << dst_url; } else { val["data"]["key"] = key; InfoL << "Publish success, please play with player:" << dst_url; diff --git a/src/Pusher/PusherProxy.cpp b/src/Pusher/PusherProxy.cpp index 4e5f4940..98c2724c 100644 --- a/src/Pusher/PusherProxy.cpp +++ b/src/Pusher/PusherProxy.cpp @@ -14,13 +14,8 @@ using namespace toolkit; namespace mediakit { -PusherProxy::PusherProxy(const string& schema, const string &vhost, const string &app, const string &stream, - int retry_count, const EventPoller::Ptr &poller) - : MediaPusher(schema,vhost, app, stream, poller){ - _schema = schema; - _vhost = vhost; - _app = app; - _stream_id = stream; +PusherProxy::PusherProxy(const MediaSource::Ptr &src, int retry_count, const EventPoller::Ptr &poller) + : MediaPusher(src, poller){ _retry_count = retry_count; _on_close = [](const SockException &) {}; } @@ -53,7 +48,7 @@ void PusherProxy::publish(const string& dstUrl) { if (!err) { // 推流成功 *piFailedCnt = 0; - InfoL << "pusher publish " << dstUrl << " success"; + InfoL << "Publish " << dstUrl << " success"; } else if (*piFailedCnt < strongSelf->_retry_count || strongSelf->_retry_count < 0) { // 推流失败,延时重试推送 strongSelf->rePublish(dstUrl, (*piFailedCnt)++); @@ -77,7 +72,6 @@ void PusherProxy::publish(const string& dstUrl) { }); MediaPusher::publish(dstUrl); - _publish_url = dstUrl; } void PusherProxy::rePublish(const string &dstUrl, int iFailedCnt) { diff --git a/src/Pusher/PusherProxy.h b/src/Pusher/PusherProxy.h index 04b8c624..a21ea916 100644 --- a/src/Pusher/PusherProxy.h +++ b/src/Pusher/PusherProxy.h @@ -24,10 +24,8 @@ public: typedef std::shared_ptr Ptr; // 如果retry_count<0,则一直重试播放;否则重试retry_count次数 - // 默认一直重试,创建此对象时候,需要外部保证mediaSource存在 - PusherProxy(const string& schema, const string &vhost, const string &app, const string &stream, - int retry_count = -1, const EventPoller::Ptr &poller = nullptr); - + // 默认一直重试,创建此对象时候,需要外部保证MediaSource存在 + PusherProxy(const MediaSource::Ptr &src, int retry_count = -1, const EventPoller::Ptr &poller = nullptr); ~PusherProxy() override; /** @@ -54,11 +52,6 @@ private: private: int _retry_count; - std::string _schema; - string _vhost; - string _app; - string _stream_id; - std::string _publish_url; Timer::Ptr _timer; function _on_close;