diff --git a/postman/ZLMediaKit.postman_collection.json b/postman/ZLMediaKit.postman_collection.json index 93fd341e..5f2598a8 100644 --- a/postman/ZLMediaKit.postman_collection.json +++ b/postman/ZLMediaKit.postman_collection.json @@ -1,6 +1,6 @@ { "info": { - "_postman_id": "ff20487b-d269-40c3-b811-44bc643a3b74", + "_postman_id": "fe6cdfbd-531d-45e6-87e5-d460ce9e6328", "name": "ZLMediaKit", "description": "媒体服务器", "schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json" @@ -518,6 +518,12 @@ "value": "10", "description": "拉流超时时间,单位秒,float类型", "disabled": true + }, + { + "key": "retry_count", + "value": null, + "description": "拉流重试次数,不传此参数或传值<=0时,则无限重试", + "disabled": true } ] } @@ -555,6 +561,106 @@ }, "response": [] }, + { + "name": "添加rtsp/rtmp推流(addStreamPusherProxy)", + "request": { + "method": "GET", + "header": [], + "url": { + "raw": "{{ZLMediaKit_URL}}/index/api/addStreamPusherProxy?secret={{ZLMediaKit_secret}}&schema=rtmp&vhost={{defaultVhost}}&app=live&stream=test&dst_url=rtmp://127.0.0.1/live/push", + "host": [ + "{{ZLMediaKit_URL}}" + ], + "path": [ + "index", + "api", + "addStreamPusherProxy" + ], + "query": [ + { + "key": "secret", + "value": "{{ZLMediaKit_secret}}", + "description": "api操作密钥(配置文件配置),如果操作ip是127.0.0.1,则不需要此参数" + }, + { + "key": "schema", + "value": "rtmp", + "description": "推流协议,支持rtsp、rtmp,大小写敏感" + }, + { + "key": "vhost", + "value": "{{defaultVhost}}", + "description": "已注册流的虚拟主机,一般为__defaultVhost__" + }, + { + "key": "app", + "value": "live", + "description": "已注册流的应用名,例如live" + }, + { + "key": "stream", + "value": "test", + "description": "已注册流的id名,例如test" + }, + { + "key": "dst_url", + "value": "rtmp://127.0.0.1/live/push", + "description": "推流地址,需要与schema字段协议一致" + }, + { + "key": "rtp_type", + "value": "0", + "description": "rtsp推流时,推流方式,0:tcp,1:udp", + "disabled": true + }, + { + "key": "timeout_sec", + "value": "10", + "description": "推流超时时间,单位秒,float类型", + "disabled": true + }, + { + "key": "retry_count", + "value": null, + "description": "推流重试次数,不传此参数或传值<=0时,则无限重试", + "disabled": true + } + ] + } + }, + "response": [] + }, + { + "name": "关闭推流(delStreamPusherProxy)", + "request": { + "method": "GET", + "header": [], + "url": { + "raw": "{{ZLMediaKit_URL}}/index/api/delStreamPusherProxy?secret={{ZLMediaKit_secret}}&key=__defaultVhost__/live/test", + "host": [ + "{{ZLMediaKit_URL}}" + ], + "path": [ + "index", + "api", + "delStreamPusherProxy" + ], + "query": [ + { + "key": "secret", + "value": "{{ZLMediaKit_secret}}", + "description": "api操作密钥(配置文件配置),如果操作ip是127.0.0.1,则不需要此参数" + }, + { + "key": "key", + "value": "__defaultVhost__/live/test", + "description": "addStreamPusherProxy接口返回的key" + } + ] + } + }, + "response": [] + }, { "name": "添加FFmpeg拉流代理(addFFmpegSource)", "request": { @@ -786,7 +892,7 @@ "method": "GET", "header": [], "url": { - "raw": "{{ZLMediaKit_URL}}/index/api/startRecord?secret={{ZLMediaKit_secret}}&type=1&vhost={{defaultVhost}}&app=live&stream=obs&customized_path", + "raw": "{{ZLMediaKit_URL}}/index/api/startRecord?secret={{ZLMediaKit_secret}}&type=1&vhost={{defaultVhost}}&app=live&stream=obs", "host": [ "{{ZLMediaKit_URL}}" ], @@ -824,14 +930,14 @@ { "key": "customized_path", "value": null, - "disabled": true, - "description": "录像文件保存自定义根目录,为空则采用配置文件设置" + "description": "录像文件保存自定义根目录,为空则采用配置文件设置", + "disabled": true }, { "key": "max_second", "value": "1000", - "disabled": true, - "description": "MP4录制的切片时间大小,单位秒" + "description": "MP4录制的切片时间大小,单位秒", + "disabled": true } ] } @@ -1281,7 +1387,6 @@ { "listen": "prerequest", "script": { - "id": "90757ea3-58c0-4f84-8000-513ed7088bbc", "type": "text/javascript", "exec": [ "" @@ -1291,7 +1396,6 @@ { "listen": "test", "script": { - "id": "0ddf2b8e-9932-409d-a055-1ab3b7765600", "type": "text/javascript", "exec": [ "" @@ -1301,20 +1405,16 @@ ], "variable": [ { - "id": "ce426571-eb1e-4067-8901-01978c982fed", "key": "ZLMediaKit_URL", "value": "zlmediakit.com:8880" }, { - "id": "2d3dfd4a-a39c-47d8-a3e9-37d80352ea5f", "key": "ZLMediaKit_secret", "value": "035c73f7-bb6b-4889-a715-d9eb2d1925cc" }, { - "id": "0aacc473-3a2e-4ef9-b415-e86ce71e0c42", "key": "defaultVhost", "value": "__defaultVhost__" } - ], - "protocolProfileBehavior": {} + ] } \ No newline at end of file diff --git a/server/WebApi.cpp b/server/WebApi.cpp index 3a5dde5d..3666fed1 100755 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -27,6 +27,7 @@ #include "Network/TcpServer.h" #include "Network/UdpServer.h" #include "Player/PlayerProxy.h" +#include "Pusher/PusherProxy.h" #include "Util/MD5.h" #include "WebApi.h" #include "WebHook.h" @@ -262,11 +263,15 @@ static inline void addHttpListener(){ } //拉流代理器列表 -static unordered_map s_proxyMap; +static unordered_map s_proxyMap; static recursive_mutex s_proxyMapMtx; +//推流代理器列表 +static unordered_map s_proxyPusherMap; +static recursive_mutex s_proxyPusherMapMtx; + //FFmpeg拉流代理器列表 -static unordered_map s_ffmpegMap; +static unordered_map s_ffmpegMap; static recursive_mutex s_ffmpegMapMtx; #if defined(ENABLE_RTPPROXY) @@ -275,10 +280,15 @@ static unordered_map s_rtpServerMap; static recursive_mutex s_rtpServerMapMtx; #endif -static inline string getProxyKey(const string &vhost,const string &app,const string &stream){ +static inline string getProxyKey(const string &vhost, const string &app, const string &stream) { return vhost + "/" + app + "/" + stream; } +static inline string getPusherKey(const string &schema, const string &vhost, const string &app, const string &stream, + const string &dst_url) { + return schema + "/" + vhost + "/" + app + "/" + stream + "/" + MD5(dst_url).hexdigest(); +} + Value makeMediaSourceJson(MediaSource &media){ Value item; item["schema"] = media.getSchema(); @@ -634,14 +644,103 @@ void installWebApi() { val["count_hit"] = (Json::UInt64)count_hit; }); + static auto addStreamPusherProxy = [](const string &schema, + const string &vhost, + const string &app, + const string &stream, + const string &url, + int retry_count, + int rtp_type, + float timeout_sec, + const function &cb) { + auto key = getPusherKey(schema, vhost, app, stream, url); + auto src = MediaSource::find(schema, vhost, app, stream); + if (!src) { + cb(SockException(Err_other, "can not find the source stream"), key); + return; + } + lock_guard lck(s_proxyPusherMapMtx); + if (s_proxyPusherMap.find(key) != s_proxyPusherMap.end()) { + //已经在推流了 + cb(SockException(Err_success), key); + return; + } + + //添加推流代理 + PusherProxy::Ptr pusher(new PusherProxy(src, retry_count ? retry_count : -1)); + s_proxyPusherMap[key] = pusher; + + //指定RTP over TCP(播放rtsp时有效) + (*pusher)[kRtpType] = rtp_type; + + if (timeout_sec > 0.1) { + //推流握手超时时间 + (*pusher)[kTimeoutMS] = timeout_sec * 1000; + } + + //开始推流,如果推流失败或者推流中止,将会自动重试若干次,默认一直重试 + pusher->setPushCallbackOnce([cb, key, url](const SockException &ex) { + if (ex) { + WarnL << "Push " << url << " failed, key: " << key << ", err: " << ex.what(); + lock_guard lck(s_proxyPusherMapMtx); + s_proxyPusherMap.erase(key); + } + cb(ex, key); + }); + + //被主动关闭推流 + pusher->setOnClose([key, url](const SockException &ex) { + WarnL << "Push " << url << " failed, key: " << key << ", err: " << ex.what(); + lock_guard lck(s_proxyPusherMapMtx); + s_proxyPusherMap.erase(key); + }); + pusher->publish(url); + }; + + //动态添加rtsp/rtmp推流代理 + //测试url http://127.0.0.1/index/api/addStreamPusherProxy?schema=rtmp&vhost=__defaultVhost__&app=proxy&stream=0&dst_url=rtmp://127.0.0.1/live/obs + api_regist("/index/api/addStreamPusherProxy", [](API_ARGS_MAP_ASYNC) { + CHECK_SECRET(); + CHECK_ARGS("schema", "vhost", "app", "stream", "dst_url"); + auto dst_url = allArgs["dst_url"]; + addStreamPusherProxy(allArgs["schema"], + allArgs["vhost"], + allArgs["app"], + allArgs["stream"], + allArgs["dst_url"], + allArgs["retry_count"], + allArgs["rtp_type"], + allArgs["timeout_sec"], + [invoker, val, headerOut, dst_url](const SockException &ex, const string &key) mutable { + if (ex) { + val["code"] = API::OtherFailed; + val["msg"] = ex.what(); + } else { + val["data"]["key"] = key; + InfoL << "Publish success, please play with player:" << dst_url; + } + invoker(200, headerOut, val.toStyledString()); + }); + }); + + //关闭推流代理 + //测试url http://127.0.0.1/index/api/delStreamPusherProxy?key=__defaultVhost__/proxy/0 + api_regist("/index/api/delStreamPusherProxy", [](API_ARGS_MAP) { + CHECK_SECRET(); + CHECK_ARGS("key"); + lock_guard lck(s_proxyPusherMapMtx); + val["data"]["flag"] = s_proxyPusherMap.erase(allArgs["key"]) == 1; + }); + static auto addStreamProxy = [](const string &vhost, const string &app, const string &stream, const string &url, + int retry_count, bool enable_hls, bool enable_mp4, int rtp_type, - float timeoutSec, + float timeout_sec, const function &cb){ auto key = getProxyKey(vhost,app,stream); lock_guard lck(s_proxyMapMtx); @@ -651,15 +750,15 @@ void installWebApi() { return; } //添加拉流代理 - PlayerProxy::Ptr player(new PlayerProxy(vhost, app, stream, enable_hls, enable_mp4)); + PlayerProxy::Ptr player(new PlayerProxy(vhost, app, stream, enable_hls, enable_mp4, retry_count ? retry_count : -1)); s_proxyMap[key] = player; - + //指定RTP over TCP(播放rtsp时有效) (*player)[kRtpType] = rtp_type; - if (timeoutSec > 0.1) { + if (timeout_sec > 0.1) { //播放握手超时时间 - (*player)[kTimeoutMS] = timeoutSec * 1000; + (*player)[kTimeoutMS] = timeout_sec * 1000; } //开始播放,如果播放失败或者播放中止,将会自动重试若干次,默认一直重试 @@ -688,6 +787,7 @@ void installWebApi() { allArgs["app"], allArgs["stream"], allArgs["url"], + allArgs["retry_count"], allArgs["enable_hls"],/* 是否hls转发 */ allArgs["enable_mp4"],/* 是否MP4录制 */ allArgs["rtp_type"], @@ -1265,6 +1365,7 @@ void installWebApi() { allArgs["stream"], /** 支持rtsp和rtmp方式拉流 ,rtsp支持h265/h264/aac,rtmp仅支持h264/aac **/ "rtsp://184.72.239.149/vod/mp4:BigBuckBunny_115k.mov", + -1,/*无限重试*/ true,/* 开启hls转发 */ false,/* 禁用MP4录制 */ 0,//rtp over tcp方式拉流 diff --git a/src/Pusher/PusherProxy.cpp b/src/Pusher/PusherProxy.cpp new file mode 100644 index 00000000..447a5539 --- /dev/null +++ b/src/Pusher/PusherProxy.cpp @@ -0,0 +1,99 @@ +/* + * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/xia-chu/ZLMediaKit). + * + * Use of this source code is governed by MIT license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#include "PusherProxy.h" + +using namespace toolkit; + +namespace mediakit { + +PusherProxy::PusherProxy(const MediaSource::Ptr &src, int retry_count, const EventPoller::Ptr &poller) + : MediaPusher(src, poller){ + _retry_count = retry_count; + _on_close = [](const SockException &) {}; + _weak_src = src; +} + +PusherProxy::~PusherProxy() { + _timer.reset(); +} + +void PusherProxy::setPushCallbackOnce(const function &cb) { + _on_publish = cb; +} + +void PusherProxy::setOnClose(const function &cb) { + _on_close = cb; +} + +void PusherProxy::publish(const string &dst_url) { + std::weak_ptr weak_self = shared_from_this(); + std::shared_ptr failed_cnt(new int(0)); + + setOnPublished([weak_self, dst_url, failed_cnt](const SockException &err) { + auto strong_self = weak_self.lock(); + if (!strong_self) { + return; + } + + if (strong_self->_on_publish) { + strong_self->_on_publish(err); + strong_self->_on_publish = nullptr; + } + + auto src = strong_self->_weak_src.lock(); + if (!err) { + // 推流成功 + *failed_cnt = 0; + InfoL << "Publish " << dst_url << " success"; + } else if (src && (*failed_cnt < strong_self->_retry_count || strong_self->_retry_count < 0)) { + // 推流失败,延时重试推送 + strong_self->rePublish(dst_url, (*failed_cnt)++); + } else { + //如果媒体源已经注销, 或达到了最大重试次数,回调关闭 + strong_self->_on_close(err); + } + }); + + setOnShutdown([weak_self, dst_url, failed_cnt](const SockException &err) { + auto strong_self = weak_self.lock(); + if (!strong_self) { + return; + } + + auto src = strong_self->_weak_src.lock(); + //推流异常中断,延时重试播放 + if (src && (*failed_cnt < strong_self->_retry_count || strong_self->_retry_count < 0)) { + strong_self->rePublish(dst_url, (*failed_cnt)++); + } else { + //如果媒体源已经注销, 或达到了最大重试次数,回调关闭 + strong_self->_on_close(err); + } + }); + + MediaPusher::publish(dst_url); +} + +void PusherProxy::rePublish(const string &dst_url, int failed_cnt) { + auto delay = MAX(2 * 1000, MIN(failed_cnt * 3000, 60 * 1000)); + weak_ptr weak_self = shared_from_this(); + _timer = std::make_shared(delay / 1000.0f, [weak_self, dst_url, failed_cnt]() { + //推流失败次数越多,则延时越长 + auto strong_self = weak_self.lock(); + if (!strong_self) { + return false; + } + WarnL << "推流重试[" << failed_cnt << "]:" << dst_url; + strong_self->MediaPusher::publish(dst_url); + return false; + }, getPoller()); +} + +} /* namespace mediakit */ diff --git a/src/Pusher/PusherProxy.h b/src/Pusher/PusherProxy.h new file mode 100644 index 00000000..3e04a8d3 --- /dev/null +++ b/src/Pusher/PusherProxy.h @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/xia-chu/ZLMediaKit). + * + * Use of this source code is governed by MIT license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef SRC_DEVICE_PUSHERPROXY_H +#define SRC_DEVICE_PUSHERPROXY_H + +#include "Pusher/MediaPusher.h" +#include "Util/TimeTicker.h" + +using namespace std; +using namespace toolkit; + +namespace mediakit { + +class PusherProxy : public MediaPusher, public std::enable_shared_from_this { +public: + typedef std::shared_ptr Ptr; + + // 如果retry_count<0,则一直重试播放;否则重试retry_count次数 + // 默认一直重试,创建此对象时候,需要外部保证MediaSource存在 + PusherProxy(const MediaSource::Ptr &src, int retry_count = -1, const EventPoller::Ptr &poller = nullptr); + ~PusherProxy() override; + + /** + * 设置push结果回调,只触发一次;在publish执行之前有效 + * @param cb 回调对象 + */ + void setPushCallbackOnce(const function &cb); + + /** + * 设置主动关闭回调 + * @param cb 回调对象 + */ + void setOnClose(const function &cb); + + /** + * 开始拉流播放 + * @param dstUrl 目标推流地址 + */ + void publish(const string& dstUrl) override; + +private: + // 重推逻辑函数 + void rePublish(const string &dstUrl, int iFailedCnt); + +private: + int _retry_count; + Timer::Ptr _timer; + std::weak_ptr _weak_src; + function _on_close; + function _on_publish; +}; + +} /* namespace mediakit */ + +#endif //SRC_DEVICE_PUSHERPROXY_H diff --git a/src/Rtmp/RtmpPusher.cpp b/src/Rtmp/RtmpPusher.cpp index c8813367..3b0b6c74 100644 --- a/src/Rtmp/RtmpPusher.cpp +++ b/src/Rtmp/RtmpPusher.cpp @@ -47,6 +47,7 @@ void RtmpPusher::teardown() { } void RtmpPusher::onPublishResult(const SockException &ex, bool handshake_done) { + DebugL << ex.what(); if (ex.getErrCode() == Err_shutdown) { //主动shutdown的,不触发回调 return; diff --git a/src/Rtp/Decoder.cpp b/src/Rtp/Decoder.cpp index 60b04baa..5c011ae8 100644 --- a/src/Rtp/Decoder.cpp +++ b/src/Rtp/Decoder.cpp @@ -103,19 +103,16 @@ static const char *getCodecName(int codec_id) { void DecoderImp::onStream(int stream, int codecid, const void *extra, size_t bytes, int finish){ switch (codecid) { case PSI_STREAM_H264: { - InfoL << "got video track: H264"; onTrack(std::make_shared()); break; } case PSI_STREAM_H265: { - InfoL << "got video track: H265"; onTrack(std::make_shared()); break; } case PSI_STREAM_AAC: { - InfoL<< "got audio track: AAC"; onTrack(std::make_shared()); break; } @@ -123,14 +120,12 @@ void DecoderImp::onStream(int stream, int codecid, const void *extra, size_t byt case PSI_STREAM_AUDIO_G711A: case PSI_STREAM_AUDIO_G711U: { auto codec = codecid == PSI_STREAM_AUDIO_G711A ? CodecG711A : CodecG711U; - InfoL << "got audio track: G711"; //G711传统只支持 8000/1/16的规格,FFmpeg貌似做了扩展,但是这里不管它了 onTrack(std::make_shared(codec, 8000, 1, 16)); break; } case PSI_STREAM_AUDIO_OPUS: { - InfoL << "got audio track: opus"; onTrack(std::make_shared()); break; } @@ -223,8 +218,11 @@ void DecoderImp::onStream(int stream,int codecid,const void *extra,size_t bytes, #endif void DecoderImp::onTrack(const Track::Ptr &track) { - _tracks[track->getTrackType()] = track; - _sink->addTrack(track); + if (!_tracks[track->getTrackType()]) { + _tracks[track->getTrackType()] = track; + _sink->addTrack(track); + InfoL << "got track: " << track->getCodecName(); + } } void DecoderImp::onFrame(const Frame::Ptr &frame) { diff --git a/src/Rtsp/RtspPusher.cpp b/src/Rtsp/RtspPusher.cpp index 9ce2d56b..c19b0ce7 100644 --- a/src/Rtsp/RtspPusher.cpp +++ b/src/Rtsp/RtspPusher.cpp @@ -95,6 +95,7 @@ void RtspPusher::publish(const string &url_str) { } void RtspPusher::onPublishResult(const SockException &ex, bool handshake_done) { + DebugL << ex.what(); if (ex.getErrCode() == Err_shutdown) { //主动shutdown的,不触发回调 return;