支持溯源方式的集群模式

This commit is contained in:
ziyue 2022-01-12 16:45:47 +08:00
parent d52fc4c31f
commit be77f84315
5 changed files with 104 additions and 59 deletions

View File

@ -118,6 +118,7 @@
- 支持FFmpeg拉流代理任意格式的流
- 支持http api生成并返回实时截图
- 支持按需解复用、转协议,当有人观看时才开启转协议
- 支持溯源模式的集群部署溯源方式支持rtsp/rtmp/hls/http-ts
## 编译以及测试

View File

@ -153,6 +153,16 @@ timeoutSec=10
#keepalive hook触发间隔,单位秒float类型
alive_interval=10.0
[cluster]
#设置源站拉流url模板, 格式跟printf类似第一个%s指定app,第二个%s指定stream_id,
#开启集群模式后on_stream_not_found和on_stream_none_reader hook将无效.
#溯源模式支持以下类型:
#rtmp方式: rtmp://127.0.0.1:1935/%s/%s
#rtsp方式: rtsp://127.0.0.1:554/%s/%s
#hls方式: http://127.0.0.1:80/%s/%s/hls.m3u8
#http-ts方式: http://127.0.0.1:80/%s/%s.live.ts
origin_url=
[http]
#http服务器字符编码windows上默认gb2312
charSet=utf-8

View File

@ -446,6 +446,45 @@ void getStatisticJson(const function<void(Value &val)> &cb) {
#endif
}
void addStreamProxy(const string &vhost, const string &app, const string &stream, const string &url, int retry_count,
bool enable_hls, bool enable_mp4, int rtp_type, float timeout_sec,
const function<void(const SockException &ex, const string &key)> &cb) {
auto key = getProxyKey(vhost, app, stream);
lock_guard<recursive_mutex> lck(s_proxyMapMtx);
if (s_proxyMap.find(key) != s_proxyMap.end()) {
//已经在拉流了
cb(SockException(Err_success), key);
return;
}
//添加拉流代理
auto player = std::make_shared<PlayerProxy>(vhost, app, stream, enable_hls, enable_mp4, retry_count ? retry_count : -1);
s_proxyMap[key] = player;
//指定RTP over TCP(播放rtsp时有效)
(*player)[kRtpType] = rtp_type;
if (timeout_sec > 0.1) {
//播放握手超时时间
(*player)[kTimeoutMS] = timeout_sec * 1000;
}
//开始播放,如果播放失败或者播放中止,将会自动重试若干次,默认一直重试
player->setPlayCallbackOnce([cb, key](const SockException &ex) {
if (ex) {
lock_guard<recursive_mutex> lck(s_proxyMapMtx);
s_proxyMap.erase(key);
}
cb(ex, key);
});
//被主动关闭拉流
player->setOnClose([key](const SockException &ex) {
lock_guard<recursive_mutex> lck(s_proxyMapMtx);
s_proxyMap.erase(key);
});
player->play(url);
};
/**
* api接口
* api都支持GET和POST两种方式
@ -861,52 +900,6 @@ void installWebApi() {
val["data"]["flag"] = s_proxyPusherMap.erase(allArgs["key"]) == 1;
});
static auto addStreamProxy = [](const string &vhost,
const string &app,
const string &stream,
const string &url,
int retry_count,
bool enable_hls,
bool enable_mp4,
int rtp_type,
float timeout_sec,
const function<void(const SockException &ex,const string &key)> &cb){
auto key = getProxyKey(vhost,app,stream);
lock_guard<recursive_mutex> lck(s_proxyMapMtx);
if(s_proxyMap.find(key) != s_proxyMap.end()){
//已经在拉流了
cb(SockException(Err_success),key);
return;
}
//添加拉流代理
PlayerProxy::Ptr player(new PlayerProxy(vhost, app, stream, enable_hls, enable_mp4, retry_count ? retry_count : -1));
s_proxyMap[key] = player;
//指定RTP over TCP(播放rtsp时有效)
(*player)[kRtpType] = rtp_type;
if (timeout_sec > 0.1) {
//播放握手超时时间
(*player)[kTimeoutMS] = timeout_sec * 1000;
}
//开始播放,如果播放失败或者播放中止,将会自动重试若干次,默认一直重试
player->setPlayCallbackOnce([cb,key](const SockException &ex){
if(ex){
lock_guard<recursive_mutex> lck(s_proxyMapMtx);
s_proxyMap.erase(key);
}
cb(ex,key);
});
//被主动关闭拉流
player->setOnClose([key](const SockException &ex){
lock_guard<recursive_mutex> lck(s_proxyMapMtx);
s_proxyMap.erase(key);
});
player->play(url);
};
//动态添加rtsp/rtmp拉流代理
//测试url http://127.0.0.1/index/api/addStreamProxy?vhost=__defaultVhost__&app=proxy&enable_rtsp=1&enable_rtmp=1&stream=0&url=rtmp://127.0.0.1/live/obs
api_regist("/index/api/addStreamProxy",[](API_ARGS_MAP_ASYNC){

View File

@ -236,4 +236,7 @@ void installWebApi();
void unInstallWebApi();
Value makeMediaSourceJson(MediaSource &media);
void getStatisticJson(const function<void(Value &val)> &cb);
void addStreamProxy(const string &vhost, const string &app, const string &stream, const string &url, int retry_count,
bool enable_hls, bool enable_mp4, int rtp_type, float timeout_sec,
const function<void(const SockException &ex, const string &key)> &cb);
#endif //ZLMEDIAKIT_WEBAPI_H

View File

@ -69,9 +69,16 @@ onceToken token([](){
},nullptr);
}//namespace Hook
namespace Cluster {
#define CLUSTER_FIELD "cluster."
const string kOriginUrl = CLUSTER_FIELD "origin_url";
static onceToken token([]() {
mINI::Instance()[kOriginUrl] = "";
});
static void parse_http_response(const SockException &ex,
const Parser &res,
}//namespace Cluster
static void parse_http_response(const SockException &ex, const Parser &res,
const function<void(const Value &,const string &)> &fun){
if (ex) {
auto errStr = StrPrinter << "[network err]:" << ex.what() << endl;
@ -358,8 +365,37 @@ void installWebHook(){
do_http_hook(hook_stream_chaned,body, nullptr);
});
static auto getPullUrl = [](const string &origin_fmt, const MediaInfo &info) -> string {
char url[1024] = { 0 };
if (origin_fmt.size() > snprintf(url, sizeof(url), origin_fmt.data(), info._app.data(), info._streamid.data())) {
WarnL << "get origin url failed, origin_fmt:" << origin_fmt;
return "";
}
return string(url) + '?' + VHOST_KEY + '=' + info._vhost + '&' + info._param_strs;
};
//监听播放失败(未找到特定的流)事件
NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastNotFoundStream, [](BroadcastNotFoundStreamArgs) {
GET_CONFIG(string, origin_url, Cluster::kOriginUrl);
if (!origin_url.empty()) {
//设置了源站
auto url = getPullUrl(origin_url, args);
if (url.empty()) {
closePlayer();
return;
}
InfoL << "start pull stream from origin:" << url;
GET_CONFIG(float, hook_timeout_sec, Hook::kTimeoutSec);
addStreamProxy(args._vhost, args._app, args._streamid, url, -1, args._schema == HLS_SCHEMA, false,
Rtsp::RTP_TCP, hook_timeout_sec, [closePlayer](const SockException &ex, const string &key) {
if (ex) {
closePlayer();
}
});
return;
}
GET_CONFIG(string, hook_stream_not_found, Hook::kOnStreamNotFound);
if (!hook_enable || hook_stream_not_found.empty()) {
//如果确定这个流不存在可以closePlayer()返回播放器流不存在
@ -430,6 +466,12 @@ void installWebHook(){
});
NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastStreamNoneReader,[](BroadcastStreamNoneReaderArgs) {
GET_CONFIG(string, origin_url, Cluster::kOriginUrl);
if (!origin_url.empty()) {
sender.close(false);
WarnL << "无人观看主动关闭流:" << sender.getOriginUrl();
return;
}
GET_CONFIG(string,hook_stream_none_reader,Hook::kOnStreamNoneReader);
if(!hook_enable || hook_stream_none_reader.empty()){
return;
@ -449,11 +491,7 @@ void installWebHook(){
return;
}
strongSrc->close(false);
WarnL << "无人观看主动关闭流:"
<< strongSrc->getSchema() << "/"
<< strongSrc->getVhost() << "/"
<< strongSrc->getApp() << "/"
<< strongSrc->getId();
WarnL << "无人观看主动关闭流:" << strongSrc->getOriginUrl();
});
});