From fadef1cac3c44bf43d9eb4574b1fcb60de08d3b5 Mon Sep 17 00:00:00 2001 From: ziyue <1213642868@qq.com> Date: Wed, 16 Jun 2021 11:39:46 +0800 Subject: [PATCH 1/9] =?UTF-8?q?=E5=AE=8C=E5=96=84=E6=B7=BB=E5=8A=A0track?= =?UTF-8?q?=E6=97=A5=E5=BF=97=E6=89=93=E5=8D=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Rtp/Decoder.cpp | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/src/Rtp/Decoder.cpp b/src/Rtp/Decoder.cpp index 60b04baa..5c011ae8 100644 --- a/src/Rtp/Decoder.cpp +++ b/src/Rtp/Decoder.cpp @@ -103,19 +103,16 @@ static const char *getCodecName(int codec_id) { void DecoderImp::onStream(int stream, int codecid, const void *extra, size_t bytes, int finish){ switch (codecid) { case PSI_STREAM_H264: { - InfoL << "got video track: H264"; onTrack(std::make_shared()); break; } case PSI_STREAM_H265: { - InfoL << "got video track: H265"; onTrack(std::make_shared()); break; } case PSI_STREAM_AAC: { - InfoL<< "got audio track: AAC"; onTrack(std::make_shared()); break; } @@ -123,14 +120,12 @@ void DecoderImp::onStream(int stream, int codecid, const void *extra, size_t byt case PSI_STREAM_AUDIO_G711A: case PSI_STREAM_AUDIO_G711U: { auto codec = codecid == PSI_STREAM_AUDIO_G711A ? CodecG711A : CodecG711U; - InfoL << "got audio track: G711"; //G711传统只支持 8000/1/16的规格,FFmpeg貌似做了扩展,但是这里不管它了 onTrack(std::make_shared(codec, 8000, 1, 16)); break; } case PSI_STREAM_AUDIO_OPUS: { - InfoL << "got audio track: opus"; onTrack(std::make_shared()); break; } @@ -223,8 +218,11 @@ void DecoderImp::onStream(int stream,int codecid,const void *extra,size_t bytes, #endif void DecoderImp::onTrack(const Track::Ptr &track) { - _tracks[track->getTrackType()] = track; - _sink->addTrack(track); + if (!_tracks[track->getTrackType()]) { + _tracks[track->getTrackType()] = track; + _sink->addTrack(track); + InfoL << "got track: " << track->getCodecName(); + } } void DecoderImp::onFrame(const Frame::Ptr &frame) { From cd7ae27276eb0cf9cb9473feace8fe52064acd7d Mon Sep 17 00:00:00 2001 From: monktan Date: Wed, 16 Jun 2021 19:40:08 +0800 Subject: [PATCH 2/9] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E6=8E=A8=E6=B5=81?= =?UTF-8?q?=E4=BB=A3=E7=90=86=E5=99=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/WebApi.cpp | 101 +++++++++++++++++++++++++++++++++++++ src/Pusher/PusherProxy.cpp | 98 +++++++++++++++++++++++++++++++++++ src/Pusher/PusherProxy.h | 70 +++++++++++++++++++++++++ 3 files changed, 269 insertions(+) create mode 100644 src/Pusher/PusherProxy.cpp create mode 100644 src/Pusher/PusherProxy.h diff --git a/server/WebApi.cpp b/server/WebApi.cpp index 969c3636..349272a0 100755 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -27,6 +27,7 @@ #include "Network/TcpServer.h" #include "Network/UdpServer.h" #include "Player/PlayerProxy.h" +#include "Pusher/PusherProxy.h" #include "Util/MD5.h" #include "WebApi.h" #include "WebHook.h" @@ -234,6 +235,10 @@ static inline void addHttpListener(){ static unordered_map s_proxyMap; static recursive_mutex s_proxyMapMtx; +//推流代理器列表 +static unordered_map s_proxyPusherMap; +static recursive_mutex s_proxyPusherMapMtx; + //FFmpeg拉流代理器列表 static unordered_map s_ffmpegMap; static recursive_mutex s_ffmpegMapMtx; @@ -596,6 +601,102 @@ void installWebApi() { val["count_hit"] = (Json::UInt64)count_hit; }); + static auto addStreamPusherProxy = [](const string &schema, + const string &vhost, + const string &app, + const string &stream, + const string &url, + int retryCount, + const function &cb){ + auto key = getProxyKey(vhost, app, stream); + lock_guard lck(s_proxyPusherMapMtx); + if (s_proxyPusherMap.find(key) != s_proxyPusherMap.end()){ + //已经在推流了 + cb(SockException(Err_success),key); + return; + } + + auto poller = EventPollerPool::Instance().getPoller(); + int retry_count = 3; + if (retryCount != 0) retry_count = retryCount; + + //添加推流代理 + PusherProxy::Ptr pusher(new PusherProxy(schema,vhost, app, stream, retry_count, poller)); + s_proxyPusherMap[key] = pusher; + + //开始推流,如果推流失败或者推流中止,将会自动重试若干次,默认一直重试 + pusher->setPushCallbackOnce([cb, key, url](const SockException &ex){ + if (ex){ + InfoL << "key: " << key << ", " << "addStreamPusherProxy pusher callback error: " << ex.what(); + lock_guard lck(s_proxyMapMtx); + s_proxyMap.erase(key); + } + cb(ex,key); + }); + + //被主动关闭推流 + pusher->setOnClose([key, url](const SockException &ex){ + InfoL << "key: " << key << ", " << "addStreamPusherProxy close callback error: " << ex.what(); + lock_guard lck(s_proxyMapMtx); + s_proxyMap.erase(key); + }); + pusher->publish(url); + }; + + //动态添加rtsp/rtmp推流代理 + //测试url http://127.0.0.1/index/api/addStreamPusherProxy?schema=rtmp&vhost=__defaultVhost__&app=proxy&stream=0&dst_url=rtmp://127.0.0.1/live/obs + api_regist("/index/api/addStreamPusherProxy", [](API_ARGS_MAP_ASYNC) { + CHECK_SECRET(); + CHECK_ARGS("schema","vhost","app","stream"); + + InfoL << allArgs["schema"] << ", " << allArgs["vhost"] << ", " << allArgs["app"] << ", " << allArgs["stream"]; + + //查找源 + auto src = MediaSource::find(allArgs["schema"], + allArgs["vhost"], + allArgs["app"], + allArgs["stream"]); + if (!src){ + InfoL << "addStreamPusherProxy, canont find source stream!"; + const_cast(val)["code"] = API::OtherFailed; + const_cast(val)["msg"] = "can not find the source stream"; + invoker(200, headerOut, val.toStyledString()); + return; + } + + std::string srcUrl = allArgs["schema"] + "://" + "127.0.0.1" + "/" + allArgs["app"] + "/" + allArgs["stream"]; + std::string pushUrl = decodeBase64(allArgs["dst_url"]); + InfoL << "addStreamPusherProxy, find stream: " << srcUrl << ", push dst url: " << pushUrl; + + addStreamPusherProxy(allArgs["schema"], + allArgs["vhost"], + allArgs["app"], + allArgs["stream"], + pushUrl, + allArgs["retry_count"], + [invoker,val,headerOut, pushUrl](const SockException &ex, const string &key){ + if(ex){ + const_cast(val)["code"] = API::OtherFailed; + const_cast(val)["msg"] = ex.what(); + InfoL << "Publish error url: " << pushUrl; + }else{ + const_cast(val)["data"]["key"] = key; + InfoL << "Publish success, Please play with player:" << pushUrl; + } + invoker(200, headerOut, val.toStyledString()); + }); + + }); + + //关闭推流代理 + //测试url http://127.0.0.1/index/api/delStreamPusherProxy?key=__defaultVhost__/proxy/0 + api_regist("/index/api/delStreamPusherProxy",[](API_ARGS_MAP){ + CHECK_SECRET(); + CHECK_ARGS("key"); + lock_guard lck(s_proxyPusherMapMtx); + val["data"]["flag"] = s_proxyPusherMap.erase(allArgs["key"]) == 1; + }); + static auto addStreamProxy = [](const string &vhost, const string &app, const string &stream, diff --git a/src/Pusher/PusherProxy.cpp b/src/Pusher/PusherProxy.cpp new file mode 100644 index 00000000..4e5f4940 --- /dev/null +++ b/src/Pusher/PusherProxy.cpp @@ -0,0 +1,98 @@ +/* + * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/xia-chu/ZLMediaKit). + * + * Use of this source code is governed by MIT license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#include "PusherProxy.h" + +using namespace toolkit; + +namespace mediakit { + +PusherProxy::PusherProxy(const string& schema, const string &vhost, const string &app, const string &stream, + int retry_count, const EventPoller::Ptr &poller) + : MediaPusher(schema,vhost, app, stream, poller){ + _schema = schema; + _vhost = vhost; + _app = app; + _stream_id = stream; + _retry_count = retry_count; + _on_close = [](const SockException &) {}; +} + +PusherProxy::~PusherProxy() { + _timer.reset(); +} + +void PusherProxy::setPushCallbackOnce(const function &cb) { + _on_publish = cb; +} + +void PusherProxy::setOnClose(const function &cb) { + _on_close = cb; +} + +void PusherProxy::publish(const string& dstUrl) { + std::weak_ptr weakSelf = shared_from_this(); + std::shared_ptr piFailedCnt(new int(0)); + + setOnPublished([weakSelf, dstUrl, piFailedCnt](const SockException &err) { + auto strongSelf = weakSelf.lock(); + if (!strongSelf) return; + + if (strongSelf->_on_publish) { + strongSelf->_on_publish(err); + strongSelf->_on_publish = nullptr; + } + + if (!err) { + // 推流成功 + *piFailedCnt = 0; + InfoL << "pusher publish " << dstUrl << " success"; + } else if (*piFailedCnt < strongSelf->_retry_count || strongSelf->_retry_count < 0) { + // 推流失败,延时重试推送 + strongSelf->rePublish(dstUrl, (*piFailedCnt)++); + } else { + //达到了最大重试次数,回调关闭 + strongSelf->_on_close(err); + } + }); + + setOnShutdown([weakSelf, dstUrl, piFailedCnt](const SockException &err) { + auto strongSelf = weakSelf.lock(); + if (!strongSelf) return; + + //推流异常中断,延时重试播放 + if (*piFailedCnt < strongSelf->_retry_count || strongSelf->_retry_count < 0) { + strongSelf->rePublish(dstUrl, (*piFailedCnt)++); + } else { + //达到了最大重试次数,回调关闭 + strongSelf->_on_close(err); + } + }); + + MediaPusher::publish(dstUrl); + _publish_url = dstUrl; +} + +void PusherProxy::rePublish(const string &dstUrl, int iFailedCnt) { + auto iDelay = MAX(2 * 1000, MIN(iFailedCnt * 3000, 60 * 1000)); + weak_ptr weakSelf = shared_from_this(); + _timer = std::make_shared(iDelay / 1000.0f, [weakSelf, dstUrl, iFailedCnt]() { + //推流失败次数越多,则延时越长 + auto strongPusher = weakSelf.lock(); + if (!strongPusher) { + return false; + } + WarnL << "推流重试[" << iFailedCnt << "]:" << dstUrl; + strongPusher->MediaPusher::publish(dstUrl); + return false; + }, getPoller()); +} + +} /* namespace mediakit */ diff --git a/src/Pusher/PusherProxy.h b/src/Pusher/PusherProxy.h new file mode 100644 index 00000000..04b8c624 --- /dev/null +++ b/src/Pusher/PusherProxy.h @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/xia-chu/ZLMediaKit). + * + * Use of this source code is governed by MIT license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef SRC_DEVICE_PUSHERPROXY_H +#define SRC_DEVICE_PUSHERPROXY_H + +#include "Pusher/MediaPusher.h" +#include "Util/TimeTicker.h" + +using namespace std; +using namespace toolkit; + +namespace mediakit { + +class PusherProxy : public MediaPusher, public std::enable_shared_from_this { +public: + typedef std::shared_ptr Ptr; + + // 如果retry_count<0,则一直重试播放;否则重试retry_count次数 + // 默认一直重试,创建此对象时候,需要外部保证mediaSource存在 + PusherProxy(const string& schema, const string &vhost, const string &app, const string &stream, + int retry_count = -1, const EventPoller::Ptr &poller = nullptr); + + ~PusherProxy() override; + + /** + * 设置push结果回调,只触发一次;在publish执行之前有效 + * @param cb 回调对象 + */ + void setPushCallbackOnce(const function &cb); + + /** + * 设置主动关闭回调 + * @param cb 回调对象 + */ + void setOnClose(const function &cb); + + /** + * 开始拉流播放 + * @param dstUrl 目标推流地址 + */ + void publish(const string& dstUrl) override; + +private: + // 重推逻辑函数 + void rePublish(const string &dstUrl, int iFailedCnt); + +private: + int _retry_count; + std::string _schema; + string _vhost; + string _app; + string _stream_id; + std::string _publish_url; + Timer::Ptr _timer; + + function _on_close; + function _on_publish; +}; + +} /* namespace mediakit */ + +#endif //SRC_DEVICE_PUSHERPROXY_H From 936c6f7965fe8f52ef7be005195a20831188d550 Mon Sep 17 00:00:00 2001 From: ziyue <1213642868@qq.com> Date: Thu, 17 Jun 2021 10:12:34 +0800 Subject: [PATCH 3/9] =?UTF-8?q?=E8=B0=83=E6=95=B4=E4=BB=A3=E7=A0=81?= =?UTF-8?q?=E7=BB=86=E8=8A=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/WebApi.cpp | 62 ++++++++++++++++++++++------------------------- 1 file changed, 29 insertions(+), 33 deletions(-) diff --git a/server/WebApi.cpp b/server/WebApi.cpp index 349272a0..0eada8cc 100755 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -232,15 +232,15 @@ static inline void addHttpListener(){ } //拉流代理器列表 -static unordered_map s_proxyMap; +static unordered_map s_proxyMap; static recursive_mutex s_proxyMapMtx; //推流代理器列表 -static unordered_map s_proxyPusherMap; +static unordered_map s_proxyPusherMap; static recursive_mutex s_proxyPusherMapMtx; //FFmpeg拉流代理器列表 -static unordered_map s_ffmpegMap; +static unordered_map s_ffmpegMap; static recursive_mutex s_ffmpegMapMtx; #if defined(ENABLE_RTPPROXY) @@ -607,12 +607,12 @@ void installWebApi() { const string &stream, const string &url, int retryCount, - const function &cb){ + const function &cb) { auto key = getProxyKey(vhost, app, stream); lock_guard lck(s_proxyPusherMapMtx); - if (s_proxyPusherMap.find(key) != s_proxyPusherMap.end()){ + if (s_proxyPusherMap.find(key) != s_proxyPusherMap.end()) { //已经在推流了 - cb(SockException(Err_success),key); + cb(SockException(Err_success), key); return; } @@ -621,21 +621,21 @@ void installWebApi() { if (retryCount != 0) retry_count = retryCount; //添加推流代理 - PusherProxy::Ptr pusher(new PusherProxy(schema,vhost, app, stream, retry_count, poller)); + PusherProxy::Ptr pusher(new PusherProxy(schema, vhost, app, stream, retry_count, poller)); s_proxyPusherMap[key] = pusher; //开始推流,如果推流失败或者推流中止,将会自动重试若干次,默认一直重试 - pusher->setPushCallbackOnce([cb, key, url](const SockException &ex){ - if (ex){ + pusher->setPushCallbackOnce([cb, key, url](const SockException &ex) { + if (ex) { InfoL << "key: " << key << ", " << "addStreamPusherProxy pusher callback error: " << ex.what(); lock_guard lck(s_proxyMapMtx); s_proxyMap.erase(key); } - cb(ex,key); + cb(ex, key); }); //被主动关闭推流 - pusher->setOnClose([key, url](const SockException &ex){ + pusher->setOnClose([key, url](const SockException &ex) { InfoL << "key: " << key << ", " << "addStreamPusherProxy close callback error: " << ex.what(); lock_guard lck(s_proxyMapMtx); s_proxyMap.erase(key); @@ -647,50 +647,46 @@ void installWebApi() { //测试url http://127.0.0.1/index/api/addStreamPusherProxy?schema=rtmp&vhost=__defaultVhost__&app=proxy&stream=0&dst_url=rtmp://127.0.0.1/live/obs api_regist("/index/api/addStreamPusherProxy", [](API_ARGS_MAP_ASYNC) { CHECK_SECRET(); - CHECK_ARGS("schema","vhost","app","stream"); + CHECK_ARGS("schema", "vhost", "app", "stream", "dst_url"); - InfoL << allArgs["schema"] << ", " << allArgs["vhost"] << ", " << allArgs["app"] << ", " << allArgs["stream"]; - - //查找源 + auto dst_url = allArgs["dst_url"]; + auto src_url = allArgs["schema"] + "/" + allArgs["vhost"] + "/" + allArgs["app"] + "/" + allArgs["stream"]; auto src = MediaSource::find(allArgs["schema"], allArgs["vhost"], allArgs["app"], allArgs["stream"]); - if (!src){ - InfoL << "addStreamPusherProxy, canont find source stream!"; - const_cast(val)["code"] = API::OtherFailed; - const_cast(val)["msg"] = "can not find the source stream"; + if (!src) { + WarnL << "addStreamPusherProxy, can not find source stream:" << src_url; + val["code"] = API::NotFound; + val["msg"] = "can not find the source stream"; invoker(200, headerOut, val.toStyledString()); return; } - std::string srcUrl = allArgs["schema"] + "://" + "127.0.0.1" + "/" + allArgs["app"] + "/" + allArgs["stream"]; - std::string pushUrl = decodeBase64(allArgs["dst_url"]); - InfoL << "addStreamPusherProxy, find stream: " << srcUrl << ", push dst url: " << pushUrl; + InfoL << "addStreamPusherProxy, find stream: " << src_url << ", push dst url: " << dst_url; addStreamPusherProxy(allArgs["schema"], allArgs["vhost"], allArgs["app"], allArgs["stream"], - pushUrl, + allArgs["dst_url"], allArgs["retry_count"], - [invoker,val,headerOut, pushUrl](const SockException &ex, const string &key){ - if(ex){ - const_cast(val)["code"] = API::OtherFailed; - const_cast(val)["msg"] = ex.what(); - InfoL << "Publish error url: " << pushUrl; - }else{ - const_cast(val)["data"]["key"] = key; - InfoL << "Publish success, Please play with player:" << pushUrl; + [invoker, val, headerOut, dst_url](const SockException &ex, const string &key) mutable { + if (ex) { + val["code"] = API::OtherFailed; + val["msg"] = ex.what(); + WarnL << "Publish stream failed, dst url is: " << dst_url; + } else { + val["data"]["key"] = key; + InfoL << "Publish success, please play with player:" << dst_url; } invoker(200, headerOut, val.toStyledString()); }); - }); //关闭推流代理 //测试url http://127.0.0.1/index/api/delStreamPusherProxy?key=__defaultVhost__/proxy/0 - api_regist("/index/api/delStreamPusherProxy",[](API_ARGS_MAP){ + api_regist("/index/api/delStreamPusherProxy", [](API_ARGS_MAP) { CHECK_SECRET(); CHECK_ARGS("key"); lock_guard lck(s_proxyPusherMapMtx); From eef0c31d7b934e88799e318d1e4198f4eb962610 Mon Sep 17 00:00:00 2001 From: ziyue <1213642868@qq.com> Date: Thu, 17 Jun 2021 10:39:22 +0800 Subject: [PATCH 4/9] =?UTF-8?q?addStreamProxy=E6=96=B0=E5=A2=9E=E9=87=8D?= =?UTF-8?q?=E8=AF=95=E6=AC=A1=E6=95=B0=E5=8F=82=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/WebApi.cpp | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/server/WebApi.cpp b/server/WebApi.cpp index 0eada8cc..328f1e05 100755 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -697,10 +697,11 @@ void installWebApi() { const string &app, const string &stream, const string &url, + int retry_count, bool enable_hls, bool enable_mp4, int rtp_type, - float timeoutSec, + float timeout_sec, const function &cb){ auto key = getProxyKey(vhost,app,stream); lock_guard lck(s_proxyMapMtx); @@ -710,15 +711,15 @@ void installWebApi() { return; } //添加拉流代理 - PlayerProxy::Ptr player(new PlayerProxy(vhost, app, stream, enable_hls, enable_mp4)); + PlayerProxy::Ptr player(new PlayerProxy(vhost, app, stream, enable_hls, enable_mp4, retry_count ? retry_count : -1)); s_proxyMap[key] = player; - + //指定RTP over TCP(播放rtsp时有效) (*player)[kRtpType] = rtp_type; - if (timeoutSec > 0.1) { + if (timeout_sec > 0.1) { //播放握手超时时间 - (*player)[kTimeoutMS] = timeoutSec * 1000; + (*player)[kTimeoutMS] = timeout_sec * 1000; } //开始播放,如果播放失败或者播放中止,将会自动重试若干次,默认一直重试 @@ -747,6 +748,7 @@ void installWebApi() { allArgs["app"], allArgs["stream"], allArgs["url"], + allArgs["retry_count"], allArgs["enable_hls"],/* 是否hls转发 */ allArgs["enable_mp4"],/* 是否MP4录制 */ allArgs["rtp_type"], @@ -1240,6 +1242,7 @@ void installWebApi() { allArgs["stream"], /** 支持rtsp和rtmp方式拉流 ,rtsp支持h265/h264/aac,rtmp仅支持h264/aac **/ "rtsp://184.72.239.149/vod/mp4:BigBuckBunny_115k.mov", + -1,/*无限重试*/ true,/* 开启hls转发 */ false,/* 禁用MP4录制 */ 0,//rtp over tcp方式拉流 From aa39680c695827b52e4bfbd524531d43c71912d2 Mon Sep 17 00:00:00 2001 From: ziyue <1213642868@qq.com> Date: Thu, 17 Jun 2021 10:41:26 +0800 Subject: [PATCH 5/9] =?UTF-8?q?addStreamPusherProxy=E6=8E=A5=E5=8F=A3?= =?UTF-8?q?=E4=BF=AE=E6=94=B9key=E5=94=AF=E4=B8=80=E6=80=A7=E8=A7=84?= =?UTF-8?q?=E5=88=99,=E6=96=B0=E5=A2=9Ertsp=E6=8E=A8=E6=B5=81=E7=B1=BB?= =?UTF-8?q?=E5=9E=8B=E3=80=81=E8=B6=85=E6=97=B6=E5=8F=82=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/WebApi.cpp | 29 +++++++++++++++++++++-------- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/server/WebApi.cpp b/server/WebApi.cpp index 328f1e05..3f30332e 100755 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -249,10 +249,15 @@ static unordered_map s_rtpServerMap; static recursive_mutex s_rtpServerMapMtx; #endif -static inline string getProxyKey(const string &vhost,const string &app,const string &stream){ +static inline string getProxyKey(const string &vhost, const string &app, const string &stream) { return vhost + "/" + app + "/" + stream; } +static inline string getPusherKey(const string &schema, const string &vhost, const string &app, const string &stream, + const string &dst_url) { + return schema + "/" + vhost + "/" + app + "/" + stream + "/" + MD5(dst_url).hexdigest(); +} + Value makeMediaSourceJson(MediaSource &media){ Value item; item["schema"] = media.getSchema(); @@ -606,9 +611,11 @@ void installWebApi() { const string &app, const string &stream, const string &url, - int retryCount, + int retry_count, + int rtp_type, + float timeout_sec, const function &cb) { - auto key = getProxyKey(vhost, app, stream); + auto key = getPusherKey(schema, vhost, app, stream, url); lock_guard lck(s_proxyPusherMapMtx); if (s_proxyPusherMap.find(key) != s_proxyPusherMap.end()) { //已经在推流了 @@ -616,14 +623,18 @@ void installWebApi() { return; } - auto poller = EventPollerPool::Instance().getPoller(); - int retry_count = 3; - if (retryCount != 0) retry_count = retryCount; - //添加推流代理 - PusherProxy::Ptr pusher(new PusherProxy(schema, vhost, app, stream, retry_count, poller)); + PusherProxy::Ptr pusher(new PusherProxy(schema, vhost, app, stream, retry_count ? retry_count : -1)); s_proxyPusherMap[key] = pusher; + //指定RTP over TCP(播放rtsp时有效) + (*pusher)[kRtpType] = rtp_type; + + if (timeout_sec > 0.1) { + //推流握手超时时间 + (*pusher)[kTimeoutMS] = timeout_sec * 1000; + } + //开始推流,如果推流失败或者推流中止,将会自动重试若干次,默认一直重试 pusher->setPushCallbackOnce([cb, key, url](const SockException &ex) { if (ex) { @@ -671,6 +682,8 @@ void installWebApi() { allArgs["stream"], allArgs["dst_url"], allArgs["retry_count"], + allArgs["rtp_type"], + allArgs["timeout_sec"], [invoker, val, headerOut, dst_url](const SockException &ex, const string &key) mutable { if (ex) { val["code"] = API::OtherFailed; From 752590f8046fe583f77e88d85c93396896691c2c Mon Sep 17 00:00:00 2001 From: ziyue <1213642868@qq.com> Date: Thu, 17 Jun 2021 10:59:58 +0800 Subject: [PATCH 6/9] =?UTF-8?q?=E5=AE=8C=E5=96=84addStreamPusherProxy?= =?UTF-8?q?=E7=9B=B8=E5=85=B3=E5=8A=9F=E8=83=BD=E5=B9=B6=E4=BF=AE=E5=A4=8D?= =?UTF-8?q?=E8=87=AA=E5=8A=A8=E5=88=A0=E9=99=A4=E7=9B=B8=E5=85=B3=E7=9A=84?= =?UTF-8?q?bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/WebApi.cpp | 36 ++++++++++++------------------------ src/Pusher/PusherProxy.cpp | 12 +++--------- src/Pusher/PusherProxy.h | 11 ++--------- 3 files changed, 17 insertions(+), 42 deletions(-) diff --git a/server/WebApi.cpp b/server/WebApi.cpp index 3f30332e..be694d2a 100755 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -616,6 +616,11 @@ void installWebApi() { float timeout_sec, const function &cb) { auto key = getPusherKey(schema, vhost, app, stream, url); + auto src = MediaSource::find(schema, vhost, app, stream); + if (!src) { + cb(SockException(Err_other, "can not find the source stream"), key); + return; + } lock_guard lck(s_proxyPusherMapMtx); if (s_proxyPusherMap.find(key) != s_proxyPusherMap.end()) { //已经在推流了 @@ -624,7 +629,7 @@ void installWebApi() { } //添加推流代理 - PusherProxy::Ptr pusher(new PusherProxy(schema, vhost, app, stream, retry_count ? retry_count : -1)); + PusherProxy::Ptr pusher(new PusherProxy(src, retry_count ? retry_count : -1)); s_proxyPusherMap[key] = pusher; //指定RTP over TCP(播放rtsp时有效) @@ -638,18 +643,18 @@ void installWebApi() { //开始推流,如果推流失败或者推流中止,将会自动重试若干次,默认一直重试 pusher->setPushCallbackOnce([cb, key, url](const SockException &ex) { if (ex) { - InfoL << "key: " << key << ", " << "addStreamPusherProxy pusher callback error: " << ex.what(); - lock_guard lck(s_proxyMapMtx); - s_proxyMap.erase(key); + WarnL << "Push " << url << " failed, key: " << key << ", err: " << ex.what(); + lock_guard lck(s_proxyPusherMapMtx); + s_proxyPusherMap.erase(key); } cb(ex, key); }); //被主动关闭推流 pusher->setOnClose([key, url](const SockException &ex) { - InfoL << "key: " << key << ", " << "addStreamPusherProxy close callback error: " << ex.what(); - lock_guard lck(s_proxyMapMtx); - s_proxyMap.erase(key); + WarnL << "Push " << url << " failed, key: " << key << ", err: " << ex.what(); + lock_guard lck(s_proxyPusherMapMtx); + s_proxyPusherMap.erase(key); }); pusher->publish(url); }; @@ -659,23 +664,7 @@ void installWebApi() { api_regist("/index/api/addStreamPusherProxy", [](API_ARGS_MAP_ASYNC) { CHECK_SECRET(); CHECK_ARGS("schema", "vhost", "app", "stream", "dst_url"); - auto dst_url = allArgs["dst_url"]; - auto src_url = allArgs["schema"] + "/" + allArgs["vhost"] + "/" + allArgs["app"] + "/" + allArgs["stream"]; - auto src = MediaSource::find(allArgs["schema"], - allArgs["vhost"], - allArgs["app"], - allArgs["stream"]); - if (!src) { - WarnL << "addStreamPusherProxy, can not find source stream:" << src_url; - val["code"] = API::NotFound; - val["msg"] = "can not find the source stream"; - invoker(200, headerOut, val.toStyledString()); - return; - } - - InfoL << "addStreamPusherProxy, find stream: " << src_url << ", push dst url: " << dst_url; - addStreamPusherProxy(allArgs["schema"], allArgs["vhost"], allArgs["app"], @@ -688,7 +677,6 @@ void installWebApi() { if (ex) { val["code"] = API::OtherFailed; val["msg"] = ex.what(); - WarnL << "Publish stream failed, dst url is: " << dst_url; } else { val["data"]["key"] = key; InfoL << "Publish success, please play with player:" << dst_url; diff --git a/src/Pusher/PusherProxy.cpp b/src/Pusher/PusherProxy.cpp index 4e5f4940..98c2724c 100644 --- a/src/Pusher/PusherProxy.cpp +++ b/src/Pusher/PusherProxy.cpp @@ -14,13 +14,8 @@ using namespace toolkit; namespace mediakit { -PusherProxy::PusherProxy(const string& schema, const string &vhost, const string &app, const string &stream, - int retry_count, const EventPoller::Ptr &poller) - : MediaPusher(schema,vhost, app, stream, poller){ - _schema = schema; - _vhost = vhost; - _app = app; - _stream_id = stream; +PusherProxy::PusherProxy(const MediaSource::Ptr &src, int retry_count, const EventPoller::Ptr &poller) + : MediaPusher(src, poller){ _retry_count = retry_count; _on_close = [](const SockException &) {}; } @@ -53,7 +48,7 @@ void PusherProxy::publish(const string& dstUrl) { if (!err) { // 推流成功 *piFailedCnt = 0; - InfoL << "pusher publish " << dstUrl << " success"; + InfoL << "Publish " << dstUrl << " success"; } else if (*piFailedCnt < strongSelf->_retry_count || strongSelf->_retry_count < 0) { // 推流失败,延时重试推送 strongSelf->rePublish(dstUrl, (*piFailedCnt)++); @@ -77,7 +72,6 @@ void PusherProxy::publish(const string& dstUrl) { }); MediaPusher::publish(dstUrl); - _publish_url = dstUrl; } void PusherProxy::rePublish(const string &dstUrl, int iFailedCnt) { diff --git a/src/Pusher/PusherProxy.h b/src/Pusher/PusherProxy.h index 04b8c624..a21ea916 100644 --- a/src/Pusher/PusherProxy.h +++ b/src/Pusher/PusherProxy.h @@ -24,10 +24,8 @@ public: typedef std::shared_ptr Ptr; // 如果retry_count<0,则一直重试播放;否则重试retry_count次数 - // 默认一直重试,创建此对象时候,需要外部保证mediaSource存在 - PusherProxy(const string& schema, const string &vhost, const string &app, const string &stream, - int retry_count = -1, const EventPoller::Ptr &poller = nullptr); - + // 默认一直重试,创建此对象时候,需要外部保证MediaSource存在 + PusherProxy(const MediaSource::Ptr &src, int retry_count = -1, const EventPoller::Ptr &poller = nullptr); ~PusherProxy() override; /** @@ -54,11 +52,6 @@ private: private: int _retry_count; - std::string _schema; - string _vhost; - string _app; - string _stream_id; - std::string _publish_url; Timer::Ptr _timer; function _on_close; From e4172b4ab1644c6b2a4db787da23e4ef5ce3763d Mon Sep 17 00:00:00 2001 From: ziyue <1213642868@qq.com> Date: Thu, 17 Jun 2021 11:03:11 +0800 Subject: [PATCH 7/9] =?UTF-8?q?=E6=9B=B4=E6=96=B0postman=20api=E9=85=8D?= =?UTF-8?q?=E7=BD=AE=E6=96=87=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- postman/ZLMediaKit.postman_collection.json | 126 ++++++++++++++++++--- 1 file changed, 113 insertions(+), 13 deletions(-) diff --git a/postman/ZLMediaKit.postman_collection.json b/postman/ZLMediaKit.postman_collection.json index 93fd341e..5f2598a8 100644 --- a/postman/ZLMediaKit.postman_collection.json +++ b/postman/ZLMediaKit.postman_collection.json @@ -1,6 +1,6 @@ { "info": { - "_postman_id": "ff20487b-d269-40c3-b811-44bc643a3b74", + "_postman_id": "fe6cdfbd-531d-45e6-87e5-d460ce9e6328", "name": "ZLMediaKit", "description": "媒体服务器", "schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json" @@ -518,6 +518,12 @@ "value": "10", "description": "拉流超时时间,单位秒,float类型", "disabled": true + }, + { + "key": "retry_count", + "value": null, + "description": "拉流重试次数,不传此参数或传值<=0时,则无限重试", + "disabled": true } ] } @@ -555,6 +561,106 @@ }, "response": [] }, + { + "name": "添加rtsp/rtmp推流(addStreamPusherProxy)", + "request": { + "method": "GET", + "header": [], + "url": { + "raw": "{{ZLMediaKit_URL}}/index/api/addStreamPusherProxy?secret={{ZLMediaKit_secret}}&schema=rtmp&vhost={{defaultVhost}}&app=live&stream=test&dst_url=rtmp://127.0.0.1/live/push", + "host": [ + "{{ZLMediaKit_URL}}" + ], + "path": [ + "index", + "api", + "addStreamPusherProxy" + ], + "query": [ + { + "key": "secret", + "value": "{{ZLMediaKit_secret}}", + "description": "api操作密钥(配置文件配置),如果操作ip是127.0.0.1,则不需要此参数" + }, + { + "key": "schema", + "value": "rtmp", + "description": "推流协议,支持rtsp、rtmp,大小写敏感" + }, + { + "key": "vhost", + "value": "{{defaultVhost}}", + "description": "已注册流的虚拟主机,一般为__defaultVhost__" + }, + { + "key": "app", + "value": "live", + "description": "已注册流的应用名,例如live" + }, + { + "key": "stream", + "value": "test", + "description": "已注册流的id名,例如test" + }, + { + "key": "dst_url", + "value": "rtmp://127.0.0.1/live/push", + "description": "推流地址,需要与schema字段协议一致" + }, + { + "key": "rtp_type", + "value": "0", + "description": "rtsp推流时,推流方式,0:tcp,1:udp", + "disabled": true + }, + { + "key": "timeout_sec", + "value": "10", + "description": "推流超时时间,单位秒,float类型", + "disabled": true + }, + { + "key": "retry_count", + "value": null, + "description": "推流重试次数,不传此参数或传值<=0时,则无限重试", + "disabled": true + } + ] + } + }, + "response": [] + }, + { + "name": "关闭推流(delStreamPusherProxy)", + "request": { + "method": "GET", + "header": [], + "url": { + "raw": "{{ZLMediaKit_URL}}/index/api/delStreamPusherProxy?secret={{ZLMediaKit_secret}}&key=__defaultVhost__/live/test", + "host": [ + "{{ZLMediaKit_URL}}" + ], + "path": [ + "index", + "api", + "delStreamPusherProxy" + ], + "query": [ + { + "key": "secret", + "value": "{{ZLMediaKit_secret}}", + "description": "api操作密钥(配置文件配置),如果操作ip是127.0.0.1,则不需要此参数" + }, + { + "key": "key", + "value": "__defaultVhost__/live/test", + "description": "addStreamPusherProxy接口返回的key" + } + ] + } + }, + "response": [] + }, { "name": "添加FFmpeg拉流代理(addFFmpegSource)", "request": { @@ -786,7 +892,7 @@ "method": "GET", "header": [], "url": { - "raw": "{{ZLMediaKit_URL}}/index/api/startRecord?secret={{ZLMediaKit_secret}}&type=1&vhost={{defaultVhost}}&app=live&stream=obs&customized_path", + "raw": "{{ZLMediaKit_URL}}/index/api/startRecord?secret={{ZLMediaKit_secret}}&type=1&vhost={{defaultVhost}}&app=live&stream=obs", "host": [ "{{ZLMediaKit_URL}}" ], @@ -824,14 +930,14 @@ { "key": "customized_path", "value": null, - "disabled": true, - "description": "录像文件保存自定义根目录,为空则采用配置文件设置" + "description": "录像文件保存自定义根目录,为空则采用配置文件设置", + "disabled": true }, { "key": "max_second", "value": "1000", - "disabled": true, - "description": "MP4录制的切片时间大小,单位秒" + "description": "MP4录制的切片时间大小,单位秒", + "disabled": true } ] } @@ -1281,7 +1387,6 @@ { "listen": "prerequest", "script": { - "id": "90757ea3-58c0-4f84-8000-513ed7088bbc", "type": "text/javascript", "exec": [ "" @@ -1291,7 +1396,6 @@ { "listen": "test", "script": { - "id": "0ddf2b8e-9932-409d-a055-1ab3b7765600", "type": "text/javascript", "exec": [ "" @@ -1301,20 +1405,16 @@ ], "variable": [ { - "id": "ce426571-eb1e-4067-8901-01978c982fed", "key": "ZLMediaKit_URL", "value": "zlmediakit.com:8880" }, { - "id": "2d3dfd4a-a39c-47d8-a3e9-37d80352ea5f", "key": "ZLMediaKit_secret", "value": "035c73f7-bb6b-4889-a715-d9eb2d1925cc" }, { - "id": "0aacc473-3a2e-4ef9-b415-e86ce71e0c42", "key": "defaultVhost", "value": "__defaultVhost__" } - ], - "protocolProfileBehavior": {} + ] } \ No newline at end of file From 60a23468197c0870da19645c3584ef4d7ef18a3b Mon Sep 17 00:00:00 2001 From: ziyue <1213642868@qq.com> Date: Thu, 17 Jun 2021 11:32:57 +0800 Subject: [PATCH 8/9] =?UTF-8?q?=E5=AA=92=E4=BD=93=E6=BA=90=E6=B3=A8?= =?UTF-8?q?=E9=94=80=E5=90=8E=E4=B8=8D=E5=81=9A=E6=97=A0=E8=B0=93=E6=8E=A8?= =?UTF-8?q?=E6=B5=81=E9=87=8D=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Pusher/PusherProxy.cpp | 71 +++++++++++++++++++++----------------- src/Pusher/PusherProxy.h | 2 +- 2 files changed, 40 insertions(+), 33 deletions(-) diff --git a/src/Pusher/PusherProxy.cpp b/src/Pusher/PusherProxy.cpp index 98c2724c..447a5539 100644 --- a/src/Pusher/PusherProxy.cpp +++ b/src/Pusher/PusherProxy.cpp @@ -18,6 +18,7 @@ PusherProxy::PusherProxy(const MediaSource::Ptr &src, int retry_count, const Eve : MediaPusher(src, poller){ _retry_count = retry_count; _on_close = [](const SockException &) {}; + _weak_src = src; } PusherProxy::~PusherProxy() { @@ -32,59 +33,65 @@ void PusherProxy::setOnClose(const function &cb) _on_close = cb; } -void PusherProxy::publish(const string& dstUrl) { - std::weak_ptr weakSelf = shared_from_this(); - std::shared_ptr piFailedCnt(new int(0)); +void PusherProxy::publish(const string &dst_url) { + std::weak_ptr weak_self = shared_from_this(); + std::shared_ptr failed_cnt(new int(0)); - setOnPublished([weakSelf, dstUrl, piFailedCnt](const SockException &err) { - auto strongSelf = weakSelf.lock(); - if (!strongSelf) return; + setOnPublished([weak_self, dst_url, failed_cnt](const SockException &err) { + auto strong_self = weak_self.lock(); + if (!strong_self) { + return; + } - if (strongSelf->_on_publish) { - strongSelf->_on_publish(err); - strongSelf->_on_publish = nullptr; - } + if (strong_self->_on_publish) { + strong_self->_on_publish(err); + strong_self->_on_publish = nullptr; + } + auto src = strong_self->_weak_src.lock(); if (!err) { // 推流成功 - *piFailedCnt = 0; - InfoL << "Publish " << dstUrl << " success"; - } else if (*piFailedCnt < strongSelf->_retry_count || strongSelf->_retry_count < 0) { + *failed_cnt = 0; + InfoL << "Publish " << dst_url << " success"; + } else if (src && (*failed_cnt < strong_self->_retry_count || strong_self->_retry_count < 0)) { // 推流失败,延时重试推送 - strongSelf->rePublish(dstUrl, (*piFailedCnt)++); + strong_self->rePublish(dst_url, (*failed_cnt)++); } else { - //达到了最大重试次数,回调关闭 - strongSelf->_on_close(err); + //如果媒体源已经注销, 或达到了最大重试次数,回调关闭 + strong_self->_on_close(err); } }); - setOnShutdown([weakSelf, dstUrl, piFailedCnt](const SockException &err) { - auto strongSelf = weakSelf.lock(); - if (!strongSelf) return; + setOnShutdown([weak_self, dst_url, failed_cnt](const SockException &err) { + auto strong_self = weak_self.lock(); + if (!strong_self) { + return; + } + auto src = strong_self->_weak_src.lock(); //推流异常中断,延时重试播放 - if (*piFailedCnt < strongSelf->_retry_count || strongSelf->_retry_count < 0) { - strongSelf->rePublish(dstUrl, (*piFailedCnt)++); + if (src && (*failed_cnt < strong_self->_retry_count || strong_self->_retry_count < 0)) { + strong_self->rePublish(dst_url, (*failed_cnt)++); } else { - //达到了最大重试次数,回调关闭 - strongSelf->_on_close(err); + //如果媒体源已经注销, 或达到了最大重试次数,回调关闭 + strong_self->_on_close(err); } }); - MediaPusher::publish(dstUrl); + MediaPusher::publish(dst_url); } -void PusherProxy::rePublish(const string &dstUrl, int iFailedCnt) { - auto iDelay = MAX(2 * 1000, MIN(iFailedCnt * 3000, 60 * 1000)); - weak_ptr weakSelf = shared_from_this(); - _timer = std::make_shared(iDelay / 1000.0f, [weakSelf, dstUrl, iFailedCnt]() { +void PusherProxy::rePublish(const string &dst_url, int failed_cnt) { + auto delay = MAX(2 * 1000, MIN(failed_cnt * 3000, 60 * 1000)); + weak_ptr weak_self = shared_from_this(); + _timer = std::make_shared(delay / 1000.0f, [weak_self, dst_url, failed_cnt]() { //推流失败次数越多,则延时越长 - auto strongPusher = weakSelf.lock(); - if (!strongPusher) { + auto strong_self = weak_self.lock(); + if (!strong_self) { return false; } - WarnL << "推流重试[" << iFailedCnt << "]:" << dstUrl; - strongPusher->MediaPusher::publish(dstUrl); + WarnL << "推流重试[" << failed_cnt << "]:" << dst_url; + strong_self->MediaPusher::publish(dst_url); return false; }, getPoller()); } diff --git a/src/Pusher/PusherProxy.h b/src/Pusher/PusherProxy.h index a21ea916..3e04a8d3 100644 --- a/src/Pusher/PusherProxy.h +++ b/src/Pusher/PusherProxy.h @@ -53,7 +53,7 @@ private: private: int _retry_count; Timer::Ptr _timer; - + std::weak_ptr _weak_src; function _on_close; function _on_publish; }; From f8467f21fbf461861d981b6e37e26cec27ce5331 Mon Sep 17 00:00:00 2001 From: ziyue <1213642868@qq.com> Date: Thu, 17 Jun 2021 11:34:24 +0800 Subject: [PATCH 9/9] =?UTF-8?q?=E6=8E=A8=E6=B5=81=E5=A4=B1=E8=B4=A5?= =?UTF-8?q?=E6=97=B6=E6=89=93=E5=8D=B0=E6=97=A5=E5=BF=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Rtmp/RtmpPusher.cpp | 1 + src/Rtsp/RtspPusher.cpp | 1 + 2 files changed, 2 insertions(+) diff --git a/src/Rtmp/RtmpPusher.cpp b/src/Rtmp/RtmpPusher.cpp index c8813367..3b0b6c74 100644 --- a/src/Rtmp/RtmpPusher.cpp +++ b/src/Rtmp/RtmpPusher.cpp @@ -47,6 +47,7 @@ void RtmpPusher::teardown() { } void RtmpPusher::onPublishResult(const SockException &ex, bool handshake_done) { + DebugL << ex.what(); if (ex.getErrCode() == Err_shutdown) { //主动shutdown的,不触发回调 return; diff --git a/src/Rtsp/RtspPusher.cpp b/src/Rtsp/RtspPusher.cpp index 9ce2d56b..c19b0ce7 100644 --- a/src/Rtsp/RtspPusher.cpp +++ b/src/Rtsp/RtspPusher.cpp @@ -95,6 +95,7 @@ void RtspPusher::publish(const string &url_str) { } void RtspPusher::onPublishResult(const SockException &ex, bool handshake_done) { + DebugL << ex.what(); if (ex.getErrCode() == Err_shutdown) { //主动shutdown的,不触发回调 return;