diff --git a/conf/config.ini b/conf/config.ini index 77441f9f..708b5ed5 100644 --- a/conf/config.ini +++ b/conf/config.ini @@ -161,6 +161,7 @@ alive_interval=10.0 #rtsp方式: rtsp://127.0.0.1:554/%s/%s #hls方式: http://127.0.0.1:80/%s/%s/hls.m3u8 #http-ts方式: http://127.0.0.1:80/%s/%s.live.ts +#支持多个源站,不同源站通过分号(;)分隔 origin_url= [http] diff --git a/server/WebHook.cpp b/server/WebHook.cpp index 451c46c1..058b649c 100755 --- a/server/WebHook.cpp +++ b/server/WebHook.cpp @@ -219,6 +219,43 @@ static void reportServerKeepalive() { }, nullptr); } +static const string kEdgeServerParam = "edge=1"; + +static string getPullUrl(const string &origin_fmt, const MediaInfo &info) { + char url[1024] = { 0 }; + if (origin_fmt.size() > snprintf(url, sizeof(url), origin_fmt.data(), info._app.data(), info._streamid.data())) { + WarnL << "get origin url failed, origin_fmt:" << origin_fmt; + return ""; + } + //告知源站这是来自边沿站的,如果未找到流就理解返回 + return string(url) + '?' + kEdgeServerParam + '&' + VHOST_KEY + '=' + info._vhost + '&' + info._param_strs; +} + +static void pullStreamFromOrigin(const vector& urls, int index, int failed_cnt, const MediaInfo &args, + const function &closePlayer) { + + GET_CONFIG(float, hook_timeout_sec, Hook::kTimeoutSec); + + auto url = getPullUrl(urls[index % urls.size()], args); + auto timeout_sec = hook_timeout_sec / urls.size(); + InfoL << "pull stream from origin, failed_cnt: " << failed_cnt << ", timeout_sec: " << timeout_sec << ", url: " << url; + + addStreamProxy(args._vhost, args._app, args._streamid, url, -1, args._schema == HLS_SCHEMA, false, + Rtsp::RTP_TCP, timeout_sec, [=](const SockException &ex, const string &key) mutable { + if (!ex) { + return; + } + //拉流失败 + if (++failed_cnt == urls.size()) { + //已经重试所有源站了 + WarnL << "pull stream from origin final failed: " << url; + closePlayer(); + return; + } + pullStreamFromOrigin(urls, index + 1, failed_cnt, args, closePlayer); + }); +} + void installWebHook(){ GET_CONFIG(bool,hook_enable,Hook::kEnable); GET_CONFIG(string,hook_adminparams,Hook::kAdminParams); @@ -365,41 +402,35 @@ void installWebHook(){ do_http_hook(hook_stream_chaned,body, nullptr); }); - static auto getPullUrl = [](const string &origin_fmt, const MediaInfo &info) -> string { - char url[1024] = { 0 }; - if (origin_fmt.size() > snprintf(url, sizeof(url), origin_fmt.data(), info._app.data(), info._streamid.data())) { - WarnL << "get origin url failed, origin_fmt:" << origin_fmt; - return ""; + GET_CONFIG_FUNC(vector, origin_urls, Cluster::kOriginUrl, [](const string &str) { + vector ret; + for (auto &url : split(str, ";")) { + trim(url); + if (!url.empty()) { + ret.emplace_back(url); + } } - - return string(url) + '?' + VHOST_KEY + '=' + info._vhost + '&' + info._param_strs; - }; + return ret; + }); //监听播放失败(未找到特定的流)事件 NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastNotFoundStream, [](BroadcastNotFoundStreamArgs) { - GET_CONFIG(string, origin_url, Cluster::kOriginUrl); - if (!origin_url.empty()) { - //设置了源站 - auto url = getPullUrl(origin_url, args); - if (url.empty()) { - closePlayer(); - return; - } - InfoL << "start pull stream from origin:" << url; - GET_CONFIG(float, hook_timeout_sec, Hook::kTimeoutSec); - addStreamProxy(args._vhost, args._app, args._streamid, url, -1, args._schema == HLS_SCHEMA, false, - Rtsp::RTP_TCP, hook_timeout_sec, [closePlayer](const SockException &ex, const string &key) { - if (ex) { - closePlayer(); - } - }); + if (start_with(args._param_strs, kEdgeServerParam)) { + //来自源站的溯源请求,流不存在时立即返回拉流失败 + closePlayer(); + return; + } + + if (!origin_urls.empty()) { + //设置了源站,那么尝试溯源 + static atomic s_index { 0 }; + pullStreamFromOrigin(origin_urls, s_index.load(), 0, args, closePlayer); + ++s_index; return; } GET_CONFIG(string, hook_stream_not_found, Hook::kOnStreamNotFound); if (!hook_enable || hook_stream_not_found.empty()) { - //如果确定这个流不存在,可以closePlayer()返回播放器流不存在 - // closePlayer(); return; } auto body = make_json(args); @@ -466,8 +497,8 @@ void installWebHook(){ }); NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastStreamNoneReader,[](BroadcastStreamNoneReaderArgs) { - GET_CONFIG(string, origin_url, Cluster::kOriginUrl); - if (!origin_url.empty()) { + if (!origin_urls.empty()) { + //边沿站无人观看时立即停止溯源 sender.close(false); WarnL << "无人观看主动关闭流:" << sender.getOriginUrl(); return;