From cd7ae27276eb0cf9cb9473feace8fe52064acd7d Mon Sep 17 00:00:00 2001 From: monktan Date: Wed, 16 Jun 2021 19:40:08 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E6=8E=A8=E6=B5=81=E4=BB=A3?= =?UTF-8?q?=E7=90=86=E5=99=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/WebApi.cpp | 101 +++++++++++++++++++++++++++++++++++++ src/Pusher/PusherProxy.cpp | 98 +++++++++++++++++++++++++++++++++++ src/Pusher/PusherProxy.h | 70 +++++++++++++++++++++++++ 3 files changed, 269 insertions(+) create mode 100644 src/Pusher/PusherProxy.cpp create mode 100644 src/Pusher/PusherProxy.h diff --git a/server/WebApi.cpp b/server/WebApi.cpp index 969c3636..349272a0 100755 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -27,6 +27,7 @@ #include "Network/TcpServer.h" #include "Network/UdpServer.h" #include "Player/PlayerProxy.h" +#include "Pusher/PusherProxy.h" #include "Util/MD5.h" #include "WebApi.h" #include "WebHook.h" @@ -234,6 +235,10 @@ static inline void addHttpListener(){ static unordered_map s_proxyMap; static recursive_mutex s_proxyMapMtx; +//推流代理器列表 +static unordered_map s_proxyPusherMap; +static recursive_mutex s_proxyPusherMapMtx; + //FFmpeg拉流代理器列表 static unordered_map s_ffmpegMap; static recursive_mutex s_ffmpegMapMtx; @@ -596,6 +601,102 @@ void installWebApi() { val["count_hit"] = (Json::UInt64)count_hit; }); + static auto addStreamPusherProxy = [](const string &schema, + const string &vhost, + const string &app, + const string &stream, + const string &url, + int retryCount, + const function &cb){ + auto key = getProxyKey(vhost, app, stream); + lock_guard lck(s_proxyPusherMapMtx); + if (s_proxyPusherMap.find(key) != s_proxyPusherMap.end()){ + //已经在推流了 + cb(SockException(Err_success),key); + return; + } + + auto poller = EventPollerPool::Instance().getPoller(); + int retry_count = 3; + if (retryCount != 0) retry_count = retryCount; + + //添加推流代理 + PusherProxy::Ptr pusher(new PusherProxy(schema,vhost, app, stream, retry_count, poller)); + s_proxyPusherMap[key] = pusher; + + //开始推流,如果推流失败或者推流中止,将会自动重试若干次,默认一直重试 + pusher->setPushCallbackOnce([cb, key, url](const SockException &ex){ + if (ex){ + InfoL << "key: " << key << ", " << "addStreamPusherProxy pusher callback error: " << ex.what(); + lock_guard lck(s_proxyMapMtx); + s_proxyMap.erase(key); + } + cb(ex,key); + }); + + //被主动关闭推流 + pusher->setOnClose([key, url](const SockException &ex){ + InfoL << "key: " << key << ", " << "addStreamPusherProxy close callback error: " << ex.what(); + lock_guard lck(s_proxyMapMtx); + s_proxyMap.erase(key); + }); + pusher->publish(url); + }; + + //动态添加rtsp/rtmp推流代理 + //测试url http://127.0.0.1/index/api/addStreamPusherProxy?schema=rtmp&vhost=__defaultVhost__&app=proxy&stream=0&dst_url=rtmp://127.0.0.1/live/obs + api_regist("/index/api/addStreamPusherProxy", [](API_ARGS_MAP_ASYNC) { + CHECK_SECRET(); + CHECK_ARGS("schema","vhost","app","stream"); + + InfoL << allArgs["schema"] << ", " << allArgs["vhost"] << ", " << allArgs["app"] << ", " << allArgs["stream"]; + + //查找源 + auto src = MediaSource::find(allArgs["schema"], + allArgs["vhost"], + allArgs["app"], + allArgs["stream"]); + if (!src){ + InfoL << "addStreamPusherProxy, canont find source stream!"; + const_cast(val)["code"] = API::OtherFailed; + const_cast(val)["msg"] = "can not find the source stream"; + invoker(200, headerOut, val.toStyledString()); + return; + } + + std::string srcUrl = allArgs["schema"] + "://" + "127.0.0.1" + "/" + allArgs["app"] + "/" + allArgs["stream"]; + std::string pushUrl = decodeBase64(allArgs["dst_url"]); + InfoL << "addStreamPusherProxy, find stream: " << srcUrl << ", push dst url: " << pushUrl; + + addStreamPusherProxy(allArgs["schema"], + allArgs["vhost"], + allArgs["app"], + allArgs["stream"], + pushUrl, + allArgs["retry_count"], + [invoker,val,headerOut, pushUrl](const SockException &ex, const string &key){ + if(ex){ + const_cast(val)["code"] = API::OtherFailed; + const_cast(val)["msg"] = ex.what(); + InfoL << "Publish error url: " << pushUrl; + }else{ + const_cast(val)["data"]["key"] = key; + InfoL << "Publish success, Please play with player:" << pushUrl; + } + invoker(200, headerOut, val.toStyledString()); + }); + + }); + + //关闭推流代理 + //测试url http://127.0.0.1/index/api/delStreamPusherProxy?key=__defaultVhost__/proxy/0 + api_regist("/index/api/delStreamPusherProxy",[](API_ARGS_MAP){ + CHECK_SECRET(); + CHECK_ARGS("key"); + lock_guard lck(s_proxyPusherMapMtx); + val["data"]["flag"] = s_proxyPusherMap.erase(allArgs["key"]) == 1; + }); + static auto addStreamProxy = [](const string &vhost, const string &app, const string &stream, diff --git a/src/Pusher/PusherProxy.cpp b/src/Pusher/PusherProxy.cpp new file mode 100644 index 00000000..4e5f4940 --- /dev/null +++ b/src/Pusher/PusherProxy.cpp @@ -0,0 +1,98 @@ +/* + * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/xia-chu/ZLMediaKit). + * + * Use of this source code is governed by MIT license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#include "PusherProxy.h" + +using namespace toolkit; + +namespace mediakit { + +PusherProxy::PusherProxy(const string& schema, const string &vhost, const string &app, const string &stream, + int retry_count, const EventPoller::Ptr &poller) + : MediaPusher(schema,vhost, app, stream, poller){ + _schema = schema; + _vhost = vhost; + _app = app; + _stream_id = stream; + _retry_count = retry_count; + _on_close = [](const SockException &) {}; +} + +PusherProxy::~PusherProxy() { + _timer.reset(); +} + +void PusherProxy::setPushCallbackOnce(const function &cb) { + _on_publish = cb; +} + +void PusherProxy::setOnClose(const function &cb) { + _on_close = cb; +} + +void PusherProxy::publish(const string& dstUrl) { + std::weak_ptr weakSelf = shared_from_this(); + std::shared_ptr piFailedCnt(new int(0)); + + setOnPublished([weakSelf, dstUrl, piFailedCnt](const SockException &err) { + auto strongSelf = weakSelf.lock(); + if (!strongSelf) return; + + if (strongSelf->_on_publish) { + strongSelf->_on_publish(err); + strongSelf->_on_publish = nullptr; + } + + if (!err) { + // 推流成功 + *piFailedCnt = 0; + InfoL << "pusher publish " << dstUrl << " success"; + } else if (*piFailedCnt < strongSelf->_retry_count || strongSelf->_retry_count < 0) { + // 推流失败,延时重试推送 + strongSelf->rePublish(dstUrl, (*piFailedCnt)++); + } else { + //达到了最大重试次数,回调关闭 + strongSelf->_on_close(err); + } + }); + + setOnShutdown([weakSelf, dstUrl, piFailedCnt](const SockException &err) { + auto strongSelf = weakSelf.lock(); + if (!strongSelf) return; + + //推流异常中断,延时重试播放 + if (*piFailedCnt < strongSelf->_retry_count || strongSelf->_retry_count < 0) { + strongSelf->rePublish(dstUrl, (*piFailedCnt)++); + } else { + //达到了最大重试次数,回调关闭 + strongSelf->_on_close(err); + } + }); + + MediaPusher::publish(dstUrl); + _publish_url = dstUrl; +} + +void PusherProxy::rePublish(const string &dstUrl, int iFailedCnt) { + auto iDelay = MAX(2 * 1000, MIN(iFailedCnt * 3000, 60 * 1000)); + weak_ptr weakSelf = shared_from_this(); + _timer = std::make_shared(iDelay / 1000.0f, [weakSelf, dstUrl, iFailedCnt]() { + //推流失败次数越多,则延时越长 + auto strongPusher = weakSelf.lock(); + if (!strongPusher) { + return false; + } + WarnL << "推流重试[" << iFailedCnt << "]:" << dstUrl; + strongPusher->MediaPusher::publish(dstUrl); + return false; + }, getPoller()); +} + +} /* namespace mediakit */ diff --git a/src/Pusher/PusherProxy.h b/src/Pusher/PusherProxy.h new file mode 100644 index 00000000..04b8c624 --- /dev/null +++ b/src/Pusher/PusherProxy.h @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/xia-chu/ZLMediaKit). + * + * Use of this source code is governed by MIT license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef SRC_DEVICE_PUSHERPROXY_H +#define SRC_DEVICE_PUSHERPROXY_H + +#include "Pusher/MediaPusher.h" +#include "Util/TimeTicker.h" + +using namespace std; +using namespace toolkit; + +namespace mediakit { + +class PusherProxy : public MediaPusher, public std::enable_shared_from_this { +public: + typedef std::shared_ptr Ptr; + + // 如果retry_count<0,则一直重试播放;否则重试retry_count次数 + // 默认一直重试,创建此对象时候,需要外部保证mediaSource存在 + PusherProxy(const string& schema, const string &vhost, const string &app, const string &stream, + int retry_count = -1, const EventPoller::Ptr &poller = nullptr); + + ~PusherProxy() override; + + /** + * 设置push结果回调,只触发一次;在publish执行之前有效 + * @param cb 回调对象 + */ + void setPushCallbackOnce(const function &cb); + + /** + * 设置主动关闭回调 + * @param cb 回调对象 + */ + void setOnClose(const function &cb); + + /** + * 开始拉流播放 + * @param dstUrl 目标推流地址 + */ + void publish(const string& dstUrl) override; + +private: + // 重推逻辑函数 + void rePublish(const string &dstUrl, int iFailedCnt); + +private: + int _retry_count; + std::string _schema; + string _vhost; + string _app; + string _stream_id; + std::string _publish_url; + Timer::Ptr _timer; + + function _on_close; + function _on_publish; +}; + +} /* namespace mediakit */ + +#endif //SRC_DEVICE_PUSHERPROXY_H