溯源集群模式支持多个源站

This commit is contained in:
ziyue 2022-01-12 17:43:07 +08:00
parent be77f84315
commit 838e4f2788
2 changed files with 60 additions and 28 deletions

View File

@ -161,6 +161,7 @@ alive_interval=10.0
#rtsp方式: rtsp://127.0.0.1:554/%s/%s
#hls方式: http://127.0.0.1:80/%s/%s/hls.m3u8
#http-ts方式: http://127.0.0.1:80/%s/%s.live.ts
#支持多个源站,不同源站通过分号(;)分隔
origin_url=
[http]

View File

@ -219,6 +219,43 @@ static void reportServerKeepalive() {
}, nullptr);
}
static const string kEdgeServerParam = "edge=1";
static string getPullUrl(const string &origin_fmt, const MediaInfo &info) {
char url[1024] = { 0 };
if (origin_fmt.size() > snprintf(url, sizeof(url), origin_fmt.data(), info._app.data(), info._streamid.data())) {
WarnL << "get origin url failed, origin_fmt:" << origin_fmt;
return "";
}
//告知源站这是来自边沿站的,如果未找到流就理解返回
return string(url) + '?' + kEdgeServerParam + '&' + VHOST_KEY + '=' + info._vhost + '&' + info._param_strs;
}
static void pullStreamFromOrigin(const vector<string>& urls, int index, int failed_cnt, const MediaInfo &args,
const function<void()> &closePlayer) {
GET_CONFIG(float, hook_timeout_sec, Hook::kTimeoutSec);
auto url = getPullUrl(urls[index % urls.size()], args);
auto timeout_sec = hook_timeout_sec / urls.size();
InfoL << "pull stream from origin, failed_cnt: " << failed_cnt << ", timeout_sec: " << timeout_sec << ", url: " << url;
addStreamProxy(args._vhost, args._app, args._streamid, url, -1, args._schema == HLS_SCHEMA, false,
Rtsp::RTP_TCP, timeout_sec, [=](const SockException &ex, const string &key) mutable {
if (!ex) {
return;
}
//拉流失败
if (++failed_cnt == urls.size()) {
//已经重试所有源站了
WarnL << "pull stream from origin final failed: " << url;
closePlayer();
return;
}
pullStreamFromOrigin(urls, index + 1, failed_cnt, args, closePlayer);
});
}
void installWebHook(){
GET_CONFIG(bool,hook_enable,Hook::kEnable);
GET_CONFIG(string,hook_adminparams,Hook::kAdminParams);
@ -365,41 +402,35 @@ void installWebHook(){
do_http_hook(hook_stream_chaned,body, nullptr);
});
static auto getPullUrl = [](const string &origin_fmt, const MediaInfo &info) -> string {
char url[1024] = { 0 };
if (origin_fmt.size() > snprintf(url, sizeof(url), origin_fmt.data(), info._app.data(), info._streamid.data())) {
WarnL << "get origin url failed, origin_fmt:" << origin_fmt;
return "";
GET_CONFIG_FUNC(vector<string>, origin_urls, Cluster::kOriginUrl, [](const string &str) {
vector<string> ret;
for (auto &url : split(str, ";")) {
trim(url);
if (!url.empty()) {
ret.emplace_back(url);
}
return string(url) + '?' + VHOST_KEY + '=' + info._vhost + '&' + info._param_strs;
};
}
return ret;
});
//监听播放失败(未找到特定的流)事件
NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastNotFoundStream, [](BroadcastNotFoundStreamArgs) {
GET_CONFIG(string, origin_url, Cluster::kOriginUrl);
if (!origin_url.empty()) {
//设置了源站
auto url = getPullUrl(origin_url, args);
if (url.empty()) {
if (start_with(args._param_strs, kEdgeServerParam)) {
//来自源站的溯源请求,流不存在时立即返回拉流失败
closePlayer();
return;
}
InfoL << "start pull stream from origin:" << url;
GET_CONFIG(float, hook_timeout_sec, Hook::kTimeoutSec);
addStreamProxy(args._vhost, args._app, args._streamid, url, -1, args._schema == HLS_SCHEMA, false,
Rtsp::RTP_TCP, hook_timeout_sec, [closePlayer](const SockException &ex, const string &key) {
if (ex) {
closePlayer();
}
});
if (!origin_urls.empty()) {
//设置了源站,那么尝试溯源
static atomic<uint8_t> s_index { 0 };
pullStreamFromOrigin(origin_urls, s_index.load(), 0, args, closePlayer);
++s_index;
return;
}
GET_CONFIG(string, hook_stream_not_found, Hook::kOnStreamNotFound);
if (!hook_enable || hook_stream_not_found.empty()) {
//如果确定这个流不存在可以closePlayer()返回播放器流不存在
// closePlayer();
return;
}
auto body = make_json(args);
@ -466,8 +497,8 @@ void installWebHook(){
});
NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastStreamNoneReader,[](BroadcastStreamNoneReaderArgs) {
GET_CONFIG(string, origin_url, Cluster::kOriginUrl);
if (!origin_url.empty()) {
if (!origin_urls.empty()) {
//边沿站无人观看时立即停止溯源
sender.close(false);
WarnL << "无人观看主动关闭流:" << sender.getOriginUrl();
return;