Merge branch 'master' of https://github.com/xia-chu/ZLMediaKit into dev

This commit is contained in:
ziyue 2021-06-21 17:51:15 +08:00
commit caecfc3fda
7 changed files with 391 additions and 28 deletions

View File

@ -1,6 +1,6 @@
{
"info": {
"_postman_id": "ff20487b-d269-40c3-b811-44bc643a3b74",
"_postman_id": "fe6cdfbd-531d-45e6-87e5-d460ce9e6328",
"name": "ZLMediaKit",
"description": "媒体服务器",
"schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json"
@ -518,6 +518,12 @@
"value": "10",
"description": "拉流超时时间单位秒float类型",
"disabled": true
},
{
"key": "retry_count",
"value": null,
"description": "拉流重试次数,不传此参数或传值<=0时则无限重试",
"disabled": true
}
]
}
@ -555,6 +561,106 @@
},
"response": []
},
{
"name": "添加rtsp/rtmp推流(addStreamPusherProxy)",
"request": {
"method": "GET",
"header": [],
"url": {
"raw": "{{ZLMediaKit_URL}}/index/api/addStreamPusherProxy?secret={{ZLMediaKit_secret}}&schema=rtmp&vhost={{defaultVhost}}&app=live&stream=test&dst_url=rtmp://127.0.0.1/live/push",
"host": [
"{{ZLMediaKit_URL}}"
],
"path": [
"index",
"api",
"addStreamPusherProxy"
],
"query": [
{
"key": "secret",
"value": "{{ZLMediaKit_secret}}",
"description": "api操作密钥(配置文件配置)如果操作ip是127.0.0.1,则不需要此参数"
},
{
"key": "schema",
"value": "rtmp",
"description": "推流协议支持rtsp、rtmp大小写敏感"
},
{
"key": "vhost",
"value": "{{defaultVhost}}",
"description": "已注册流的虚拟主机一般为__defaultVhost__"
},
{
"key": "app",
"value": "live",
"description": "已注册流的应用名例如live"
},
{
"key": "stream",
"value": "test",
"description": "已注册流的id名例如test"
},
{
"key": "dst_url",
"value": "rtmp://127.0.0.1/live/push",
"description": "推流地址需要与schema字段协议一致"
},
{
"key": "rtp_type",
"value": "0",
"description": "rtsp推流时推流方式0tcp1udp",
"disabled": true
},
{
"key": "timeout_sec",
"value": "10",
"description": "推流超时时间单位秒float类型",
"disabled": true
},
{
"key": "retry_count",
"value": null,
"description": "推流重试次数,不传此参数或传值<=0时则无限重试",
"disabled": true
}
]
}
},
"response": []
},
{
"name": "关闭推流(delStreamPusherProxy)",
"request": {
"method": "GET",
"header": [],
"url": {
"raw": "{{ZLMediaKit_URL}}/index/api/delStreamPusherProxy?secret={{ZLMediaKit_secret}}&key=__defaultVhost__/live/test",
"host": [
"{{ZLMediaKit_URL}}"
],
"path": [
"index",
"api",
"delStreamPusherProxy"
],
"query": [
{
"key": "secret",
"value": "{{ZLMediaKit_secret}}",
"description": "api操作密钥(配置文件配置)如果操作ip是127.0.0.1,则不需要此参数"
},
{
"key": "key",
"value": "__defaultVhost__/live/test",
"description": "addStreamPusherProxy接口返回的key"
}
]
}
},
"response": []
},
{
"name": "添加FFmpeg拉流代理(addFFmpegSource)",
"request": {
@ -786,7 +892,7 @@
"method": "GET",
"header": [],
"url": {
"raw": "{{ZLMediaKit_URL}}/index/api/startRecord?secret={{ZLMediaKit_secret}}&type=1&vhost={{defaultVhost}}&app=live&stream=obs&customized_path",
"raw": "{{ZLMediaKit_URL}}/index/api/startRecord?secret={{ZLMediaKit_secret}}&type=1&vhost={{defaultVhost}}&app=live&stream=obs",
"host": [
"{{ZLMediaKit_URL}}"
],
@ -824,14 +930,14 @@
{
"key": "customized_path",
"value": null,
"disabled": true,
"description": "录像文件保存自定义根目录,为空则采用配置文件设置"
"description": "录像文件保存自定义根目录,为空则采用配置文件设置",
"disabled": true
},
{
"key": "max_second",
"value": "1000",
"disabled": true,
"description": "MP4录制的切片时间大小单位秒"
"description": "MP4录制的切片时间大小单位秒",
"disabled": true
}
]
}
@ -1281,7 +1387,6 @@
{
"listen": "prerequest",
"script": {
"id": "90757ea3-58c0-4f84-8000-513ed7088bbc",
"type": "text/javascript",
"exec": [
""
@ -1291,7 +1396,6 @@
{
"listen": "test",
"script": {
"id": "0ddf2b8e-9932-409d-a055-1ab3b7765600",
"type": "text/javascript",
"exec": [
""
@ -1301,20 +1405,16 @@
],
"variable": [
{
"id": "ce426571-eb1e-4067-8901-01978c982fed",
"key": "ZLMediaKit_URL",
"value": "zlmediakit.com:8880"
},
{
"id": "2d3dfd4a-a39c-47d8-a3e9-37d80352ea5f",
"key": "ZLMediaKit_secret",
"value": "035c73f7-bb6b-4889-a715-d9eb2d1925cc"
},
{
"id": "0aacc473-3a2e-4ef9-b415-e86ce71e0c42",
"key": "defaultVhost",
"value": "__defaultVhost__"
}
],
"protocolProfileBehavior": {}
]
}

View File

@ -27,6 +27,7 @@
#include "Network/TcpServer.h"
#include "Network/UdpServer.h"
#include "Player/PlayerProxy.h"
#include "Pusher/PusherProxy.h"
#include "Util/MD5.h"
#include "WebApi.h"
#include "WebHook.h"
@ -262,11 +263,15 @@ static inline void addHttpListener(){
}
//拉流代理器列表
static unordered_map<string ,PlayerProxy::Ptr> s_proxyMap;
static unordered_map<string, PlayerProxy::Ptr> s_proxyMap;
static recursive_mutex s_proxyMapMtx;
//推流代理器列表
static unordered_map<string, PusherProxy::Ptr> s_proxyPusherMap;
static recursive_mutex s_proxyPusherMapMtx;
//FFmpeg拉流代理器列表
static unordered_map<string ,FFmpegSource::Ptr> s_ffmpegMap;
static unordered_map<string, FFmpegSource::Ptr> s_ffmpegMap;
static recursive_mutex s_ffmpegMapMtx;
#if defined(ENABLE_RTPPROXY)
@ -275,10 +280,15 @@ static unordered_map<string, RtpServer::Ptr> s_rtpServerMap;
static recursive_mutex s_rtpServerMapMtx;
#endif
static inline string getProxyKey(const string &vhost,const string &app,const string &stream){
static inline string getProxyKey(const string &vhost, const string &app, const string &stream) {
return vhost + "/" + app + "/" + stream;
}
static inline string getPusherKey(const string &schema, const string &vhost, const string &app, const string &stream,
const string &dst_url) {
return schema + "/" + vhost + "/" + app + "/" + stream + "/" + MD5(dst_url).hexdigest();
}
Value makeMediaSourceJson(MediaSource &media){
Value item;
item["schema"] = media.getSchema();
@ -634,14 +644,103 @@ void installWebApi() {
val["count_hit"] = (Json::UInt64)count_hit;
});
static auto addStreamPusherProxy = [](const string &schema,
const string &vhost,
const string &app,
const string &stream,
const string &url,
int retry_count,
int rtp_type,
float timeout_sec,
const function<void(const SockException &ex, const string &key)> &cb) {
auto key = getPusherKey(schema, vhost, app, stream, url);
auto src = MediaSource::find(schema, vhost, app, stream);
if (!src) {
cb(SockException(Err_other, "can not find the source stream"), key);
return;
}
lock_guard<recursive_mutex> lck(s_proxyPusherMapMtx);
if (s_proxyPusherMap.find(key) != s_proxyPusherMap.end()) {
//已经在推流了
cb(SockException(Err_success), key);
return;
}
//添加推流代理
PusherProxy::Ptr pusher(new PusherProxy(src, retry_count ? retry_count : -1));
s_proxyPusherMap[key] = pusher;
//指定RTP over TCP(播放rtsp时有效)
(*pusher)[kRtpType] = rtp_type;
if (timeout_sec > 0.1) {
//推流握手超时时间
(*pusher)[kTimeoutMS] = timeout_sec * 1000;
}
//开始推流,如果推流失败或者推流中止,将会自动重试若干次,默认一直重试
pusher->setPushCallbackOnce([cb, key, url](const SockException &ex) {
if (ex) {
WarnL << "Push " << url << " failed, key: " << key << ", err: " << ex.what();
lock_guard<recursive_mutex> lck(s_proxyPusherMapMtx);
s_proxyPusherMap.erase(key);
}
cb(ex, key);
});
//被主动关闭推流
pusher->setOnClose([key, url](const SockException &ex) {
WarnL << "Push " << url << " failed, key: " << key << ", err: " << ex.what();
lock_guard<recursive_mutex> lck(s_proxyPusherMapMtx);
s_proxyPusherMap.erase(key);
});
pusher->publish(url);
};
//动态添加rtsp/rtmp推流代理
//测试url http://127.0.0.1/index/api/addStreamPusherProxy?schema=rtmp&vhost=__defaultVhost__&app=proxy&stream=0&dst_url=rtmp://127.0.0.1/live/obs
api_regist("/index/api/addStreamPusherProxy", [](API_ARGS_MAP_ASYNC) {
CHECK_SECRET();
CHECK_ARGS("schema", "vhost", "app", "stream", "dst_url");
auto dst_url = allArgs["dst_url"];
addStreamPusherProxy(allArgs["schema"],
allArgs["vhost"],
allArgs["app"],
allArgs["stream"],
allArgs["dst_url"],
allArgs["retry_count"],
allArgs["rtp_type"],
allArgs["timeout_sec"],
[invoker, val, headerOut, dst_url](const SockException &ex, const string &key) mutable {
if (ex) {
val["code"] = API::OtherFailed;
val["msg"] = ex.what();
} else {
val["data"]["key"] = key;
InfoL << "Publish success, please play with player:" << dst_url;
}
invoker(200, headerOut, val.toStyledString());
});
});
//关闭推流代理
//测试url http://127.0.0.1/index/api/delStreamPusherProxy?key=__defaultVhost__/proxy/0
api_regist("/index/api/delStreamPusherProxy", [](API_ARGS_MAP) {
CHECK_SECRET();
CHECK_ARGS("key");
lock_guard<recursive_mutex> lck(s_proxyPusherMapMtx);
val["data"]["flag"] = s_proxyPusherMap.erase(allArgs["key"]) == 1;
});
static auto addStreamProxy = [](const string &vhost,
const string &app,
const string &stream,
const string &url,
int retry_count,
bool enable_hls,
bool enable_mp4,
int rtp_type,
float timeoutSec,
float timeout_sec,
const function<void(const SockException &ex,const string &key)> &cb){
auto key = getProxyKey(vhost,app,stream);
lock_guard<recursive_mutex> lck(s_proxyMapMtx);
@ -651,15 +750,15 @@ void installWebApi() {
return;
}
//添加拉流代理
PlayerProxy::Ptr player(new PlayerProxy(vhost, app, stream, enable_hls, enable_mp4));
PlayerProxy::Ptr player(new PlayerProxy(vhost, app, stream, enable_hls, enable_mp4, retry_count ? retry_count : -1));
s_proxyMap[key] = player;
//指定RTP over TCP(播放rtsp时有效)
(*player)[kRtpType] = rtp_type;
if (timeoutSec > 0.1) {
if (timeout_sec > 0.1) {
//播放握手超时时间
(*player)[kTimeoutMS] = timeoutSec * 1000;
(*player)[kTimeoutMS] = timeout_sec * 1000;
}
//开始播放,如果播放失败或者播放中止,将会自动重试若干次,默认一直重试
@ -688,6 +787,7 @@ void installWebApi() {
allArgs["app"],
allArgs["stream"],
allArgs["url"],
allArgs["retry_count"],
allArgs["enable_hls"],/* 是否hls转发 */
allArgs["enable_mp4"],/* 是否MP4录制 */
allArgs["rtp_type"],
@ -1265,6 +1365,7 @@ void installWebApi() {
allArgs["stream"],
/** 支持rtsp和rtmp方式拉流 rtsp支持h265/h264/aac,rtmp仅支持h264/aac **/
"rtsp://184.72.239.149/vod/mp4:BigBuckBunny_115k.mov",
-1,/*无限重试*/
true,/* 开启hls转发 */
false,/* 禁用MP4录制 */
0,//rtp over tcp方式拉流

View File

@ -0,0 +1,99 @@
/*
* Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
*
* This file is part of ZLMediaKit(https://github.com/xia-chu/ZLMediaKit).
*
* Use of this source code is governed by MIT license that can be found in the
* LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree.
*/
#include "PusherProxy.h"
using namespace toolkit;
namespace mediakit {
PusherProxy::PusherProxy(const MediaSource::Ptr &src, int retry_count, const EventPoller::Ptr &poller)
: MediaPusher(src, poller){
_retry_count = retry_count;
_on_close = [](const SockException &) {};
_weak_src = src;
}
PusherProxy::~PusherProxy() {
_timer.reset();
}
void PusherProxy::setPushCallbackOnce(const function<void(const SockException &ex)> &cb) {
_on_publish = cb;
}
void PusherProxy::setOnClose(const function<void(const SockException &ex)> &cb) {
_on_close = cb;
}
void PusherProxy::publish(const string &dst_url) {
std::weak_ptr<PusherProxy> weak_self = shared_from_this();
std::shared_ptr<int> failed_cnt(new int(0));
setOnPublished([weak_self, dst_url, failed_cnt](const SockException &err) {
auto strong_self = weak_self.lock();
if (!strong_self) {
return;
}
if (strong_self->_on_publish) {
strong_self->_on_publish(err);
strong_self->_on_publish = nullptr;
}
auto src = strong_self->_weak_src.lock();
if (!err) {
// 推流成功
*failed_cnt = 0;
InfoL << "Publish " << dst_url << " success";
} else if (src && (*failed_cnt < strong_self->_retry_count || strong_self->_retry_count < 0)) {
// 推流失败,延时重试推送
strong_self->rePublish(dst_url, (*failed_cnt)++);
} else {
//如果媒体源已经注销, 或达到了最大重试次数,回调关闭
strong_self->_on_close(err);
}
});
setOnShutdown([weak_self, dst_url, failed_cnt](const SockException &err) {
auto strong_self = weak_self.lock();
if (!strong_self) {
return;
}
auto src = strong_self->_weak_src.lock();
//推流异常中断,延时重试播放
if (src && (*failed_cnt < strong_self->_retry_count || strong_self->_retry_count < 0)) {
strong_self->rePublish(dst_url, (*failed_cnt)++);
} else {
//如果媒体源已经注销, 或达到了最大重试次数,回调关闭
strong_self->_on_close(err);
}
});
MediaPusher::publish(dst_url);
}
void PusherProxy::rePublish(const string &dst_url, int failed_cnt) {
auto delay = MAX(2 * 1000, MIN(failed_cnt * 3000, 60 * 1000));
weak_ptr<PusherProxy> weak_self = shared_from_this();
_timer = std::make_shared<Timer>(delay / 1000.0f, [weak_self, dst_url, failed_cnt]() {
//推流失败次数越多,则延时越长
auto strong_self = weak_self.lock();
if (!strong_self) {
return false;
}
WarnL << "推流重试[" << failed_cnt << "]:" << dst_url;
strong_self->MediaPusher::publish(dst_url);
return false;
}, getPoller());
}
} /* namespace mediakit */

63
src/Pusher/PusherProxy.h Normal file
View File

@ -0,0 +1,63 @@
/*
* Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
*
* This file is part of ZLMediaKit(https://github.com/xia-chu/ZLMediaKit).
*
* Use of this source code is governed by MIT license that can be found in the
* LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree.
*/
#ifndef SRC_DEVICE_PUSHERPROXY_H
#define SRC_DEVICE_PUSHERPROXY_H
#include "Pusher/MediaPusher.h"
#include "Util/TimeTicker.h"
using namespace std;
using namespace toolkit;
namespace mediakit {
class PusherProxy : public MediaPusher, public std::enable_shared_from_this<PusherProxy> {
public:
typedef std::shared_ptr<PusherProxy> Ptr;
// 如果retry_count<0,则一直重试播放否则重试retry_count次数
// 默认一直重试创建此对象时候需要外部保证MediaSource存在
PusherProxy(const MediaSource::Ptr &src, int retry_count = -1, const EventPoller::Ptr &poller = nullptr);
~PusherProxy() override;
/**
* push结果回调publish执行之前有效
* @param cb
*/
void setPushCallbackOnce(const function<void(const SockException &ex)> &cb);
/**
*
* @param cb
*/
void setOnClose(const function<void(const SockException &ex)> &cb);
/**
*
* @param dstUrl
*/
void publish(const string& dstUrl) override;
private:
// 重推逻辑函数
void rePublish(const string &dstUrl, int iFailedCnt);
private:
int _retry_count;
Timer::Ptr _timer;
std::weak_ptr<MediaSource> _weak_src;
function<void(const SockException &ex)> _on_close;
function<void(const SockException &ex)> _on_publish;
};
} /* namespace mediakit */
#endif //SRC_DEVICE_PUSHERPROXY_H

View File

@ -47,6 +47,7 @@ void RtmpPusher::teardown() {
}
void RtmpPusher::onPublishResult(const SockException &ex, bool handshake_done) {
DebugL << ex.what();
if (ex.getErrCode() == Err_shutdown) {
//主动shutdown的不触发回调
return;

View File

@ -103,19 +103,16 @@ static const char *getCodecName(int codec_id) {
void DecoderImp::onStream(int stream, int codecid, const void *extra, size_t bytes, int finish){
switch (codecid) {
case PSI_STREAM_H264: {
InfoL << "got video track: H264";
onTrack(std::make_shared<H264Track>());
break;
}
case PSI_STREAM_H265: {
InfoL << "got video track: H265";
onTrack(std::make_shared<H265Track>());
break;
}
case PSI_STREAM_AAC: {
InfoL<< "got audio track: AAC";
onTrack(std::make_shared<AACTrack>());
break;
}
@ -123,14 +120,12 @@ void DecoderImp::onStream(int stream, int codecid, const void *extra, size_t byt
case PSI_STREAM_AUDIO_G711A:
case PSI_STREAM_AUDIO_G711U: {
auto codec = codecid == PSI_STREAM_AUDIO_G711A ? CodecG711A : CodecG711U;
InfoL << "got audio track: G711";
//G711传统只支持 8000/1/16的规格FFmpeg貌似做了扩展但是这里不管它了
onTrack(std::make_shared<G711Track>(codec, 8000, 1, 16));
break;
}
case PSI_STREAM_AUDIO_OPUS: {
InfoL << "got audio track: opus";
onTrack(std::make_shared<OpusTrack>());
break;
}
@ -223,8 +218,11 @@ void DecoderImp::onStream(int stream,int codecid,const void *extra,size_t bytes,
#endif
void DecoderImp::onTrack(const Track::Ptr &track) {
_tracks[track->getTrackType()] = track;
_sink->addTrack(track);
if (!_tracks[track->getTrackType()]) {
_tracks[track->getTrackType()] = track;
_sink->addTrack(track);
InfoL << "got track: " << track->getCodecName();
}
}
void DecoderImp::onFrame(const Frame::Ptr &frame) {

View File

@ -95,6 +95,7 @@ void RtspPusher::publish(const string &url_str) {
}
void RtspPusher::onPublishResult(const SockException &ex, bool handshake_done) {
DebugL << ex.what();
if (ex.getErrCode() == Err_shutdown) {
//主动shutdown的不触发回调
return;