hook重试新增延时功能

This commit is contained in:
xiongziliang 2022-06-18 14:09:38 +08:00
parent 06f5bda8b3
commit 272eca1249
2 changed files with 37 additions and 33 deletions

View File

@ -157,8 +157,10 @@ on_server_keepalive=https://127.0.0.1/index/hook/on_server_keepalive
timeoutSec=10 timeoutSec=10
#keepalive hook触发间隔,单位秒float类型 #keepalive hook触发间隔,单位秒float类型
alive_interval=10.0 alive_interval=10.0
#hook通知重试次数 #hook通知失败重试次数,正整数。为0不重试1时重试一次以此类推
retry=2 retry=1
#hook通知失败重试延时单位秒float型
retry_delay=3.0
[cluster] [cluster]
#设置源站拉流url模板, 格式跟printf类似第一个%s指定app,第二个%s指定stream_id, #设置源站拉流url模板, 格式跟printf类似第一个%s指定app,第二个%s指定stream_id,

View File

@ -1,4 +1,4 @@
/* /*
* Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved. * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
* *
* This file is part of ZLMediaKit(https://github.com/xia-chu/ZLMediaKit). * This file is part of ZLMediaKit(https://github.com/xia-chu/ZLMediaKit).
@ -48,6 +48,7 @@ const string kOnServerKeepalive = HOOK_FIELD"on_server_keepalive";
const string kAdminParams = HOOK_FIELD"admin_params"; const string kAdminParams = HOOK_FIELD"admin_params";
const string kAliveInterval = HOOK_FIELD"alive_interval"; const string kAliveInterval = HOOK_FIELD"alive_interval";
const string kRetry = HOOK_FIELD"retry"; const string kRetry = HOOK_FIELD"retry";
const string kRetryDelay = HOOK_FIELD"retry_delay";
onceToken token([](){ onceToken token([](){
mINI::Instance()[kEnable] = false; mINI::Instance()[kEnable] = false;
@ -69,8 +70,8 @@ onceToken token([](){
mINI::Instance()[kOnServerKeepalive] = ""; mINI::Instance()[kOnServerKeepalive] = "";
mINI::Instance()[kAdminParams] = "secret=035c73f7-bb6b-4889-a715-d9eb2d1925cc"; mINI::Instance()[kAdminParams] = "secret=035c73f7-bb6b-4889-a715-d9eb2d1925cc";
mINI::Instance()[kAliveInterval] = 30.0; mINI::Instance()[kAliveInterval] = 30.0;
mINI::Instance()[kRetry] = 0; mINI::Instance()[kRetry] = 1;
mINI::Instance()[kRetryDelay] = 3.0;
},nullptr); },nullptr);
}//namespace Hook }//namespace Hook
@ -88,8 +89,6 @@ static onceToken token([]() {
}//namespace Cluster }//namespace Cluster
void do_http_process(HttpRequester::Ptr &requester,const string &url, const string& bodyStr, const function<void(const Value &,const string &)> &func,int retry,float timeout_sec);
static void parse_http_response(const SockException &ex, const Parser &res, static void parse_http_response(const SockException &ex, const Parser &res,
const function<void(const Value &,const string &)> &fun){ const function<void(const Value &,const string &)> &fun){
if (ex) { if (ex) {
@ -152,10 +151,10 @@ string getVhost(const HttpArgs &value) {
return val != value.end() ? val->second : ""; return val != value.end() ? val->second : "";
} }
void do_http_hook(const string &url,const ArgsType &body,const function<void(const Value &,const string &)> &func){ void do_http_hook(const string &url, const ArgsType &body, const function<void(const Value &, const string &)> &func, uint32_t retry) {
GET_CONFIG(string, mediaServerId, General::kMediaServerId); GET_CONFIG(string, mediaServerId, General::kMediaServerId);
GET_CONFIG(float, hook_timeoutSec, Hook::kTimeoutSec); GET_CONFIG(float, hook_timeoutSec, Hook::kTimeoutSec);
GET_CONFIG(int, hook_retry, Hook::kRetry); GET_CONFIG(float, retry_delay, Hook::kRetryDelay);
const_cast<ArgsType &>(body)["mediaServerId"] = mediaServerId; const_cast<ArgsType &>(body)["mediaServerId"] = mediaServerId;
HttpRequester::Ptr requester(new HttpRequester); HttpRequester::Ptr requester(new HttpRequester);
@ -167,35 +166,38 @@ void do_http_hook(const string &url,const ArgsType &body,const function<void(con
if (!vhost.empty()) { if (!vhost.empty()) {
requester->addHeader("X-VHOST", vhost); requester->addHeader("X-VHOST", vhost);
} }
Ticker ticker;
do_http_process(requester,url,bodyStr,func,hook_retry,hook_timeoutSec); requester->startRequester(url, [url, func, bodyStr, body, requester, ticker, retry](const SockException &ex, const Parser &res) mutable {
} onceToken token(nullptr, [&]() mutable { requester.reset(); });
parse_http_response(ex, res, [&](const Value &obj, const string &err) {
if (!err.empty()) {
// hook失败
WarnL << "hook " << url << " " << ticker.elapsedTime() << "ms,failed" << err << ":" << bodyStr;
if (retry-- > 0) {
requester->getPoller()->doDelayTask(MAX(retry_delay, 0.0) * 1000, [url, body, func, retry] {
do_http_hook(url, body, func, retry);
return 0;
});
//重试不需要触发回调
return;
}
} else if (ticker.elapsedTime() > 500) {
//hook成功但是hook响应超过500ms打印警告日志
DebugL << "hook " << url << " " << ticker.elapsedTime() << "ms,success:" << bodyStr;
}
void do_http_process(HttpRequester::Ptr &requester,const string &url, const string& bodyStr, const function<void(const Value &,const string &)> &func,int retry,float timeout_sec){
std::shared_ptr<Ticker> pTicker(new Ticker);
requester->startRequester(url, [url, func, bodyStr, requester, pTicker,retry,timeout_sec](const SockException &ex,
const Parser &res) mutable{
onceToken token(nullptr, [&]() mutable{
requester.reset();
});
parse_http_response(ex, res, [&](const Value &obj, const string &err) {
if (func) { if (func) {
func(obj, err); func(obj, err);
} }
if (!err.empty()) {
WarnL << "hook " << url << " " << pTicker->elapsedTime() << "ms,failed" << err << ":" << bodyStr;
} else if (pTicker->elapsedTime() > 500) {
DebugL << "hook " << url << " " << pTicker->elapsedTime() << "ms,success:" << bodyStr;
}
//尾部递归重试
if (!err.empty() && retry-- > 0) {
//WarnL << "----------------hook retry------------------ " << retry ;
HttpRequester::Ptr requester(new HttpRequester);
do_http_process(requester,url,bodyStr,func,retry,timeout_sec);
}
}); });
}, timeout_sec); }, hook_timeoutSec);
}
void do_http_hook(const string &url, const ArgsType &body, const function<void(const Value &, const string &)> &func) {
GET_CONFIG(uint32_t, hook_retry, Hook::kRetry);
do_http_hook(url, body, func, hook_retry);
} }
static ArgsType make_json(const MediaInfo &args){ static ArgsType make_json(const MediaInfo &args){