HTTP: 重写http相关超时管理机制

This commit is contained in:
ziyue 2022-01-19 22:50:44 +08:00
parent 03a8d3a6ac
commit ee6ad66a6d
12 changed files with 155 additions and 97 deletions

@ -1 +1 @@
Subproject commit a71747c7d0811263b12ef9112ec13fdffe2ab171 Subproject commit c41a8e61e43292080462517027554d0015e26fbe

View File

@ -139,6 +139,7 @@ API_EXPORT void API_CALL mk_http_requester_set_cb(mk_http_requester ctx,on_mk_ht
API_EXPORT void API_CALL mk_http_requester_start(mk_http_requester ctx,const char *url, float timeout_second){ API_EXPORT void API_CALL mk_http_requester_start(mk_http_requester ctx,const char *url, float timeout_second){
assert(ctx && url && url[0] && timeout_second > 0); assert(ctx && url && url[0] && timeout_second > 0);
HttpRequester::Ptr *obj = (HttpRequester::Ptr *)ctx; HttpRequester::Ptr *obj = (HttpRequester::Ptr *)ctx;
(*obj)->sendRequest(url,timeout_second); (*obj)->setCompleteTimeout(timeout_second * 1000);
(*obj)->sendRequest(url);
} }

View File

@ -32,12 +32,12 @@ void HlsPlayer::play_l() {
if (waitResponse()) { if (waitResponse()) {
return; return;
} }
float playTimeOutSec = (*this)[Client::kTimeoutMS].as<int>() / 1000.0f;
setMethod("GET"); setMethod("GET");
if (!(*this)[kNetAdapter].empty()) { if (!(*this)[kNetAdapter].empty()) {
setNetAdapter((*this)[kNetAdapter]); setNetAdapter((*this)[kNetAdapter]);
} }
sendRequest(_m3u8_list.back(), playTimeOutSec); setCompleteTimeout((*this)[Client::kTimeoutMS].as<int>());
sendRequest(_m3u8_list.back());
} }
void HlsPlayer::teardown_l(const SockException &ex) { void HlsPlayer::teardown_l(const SockException &ex) {
@ -117,7 +117,9 @@ void HlsPlayer::playNextTs() {
}); });
_http_ts_player->setMethod("GET"); _http_ts_player->setMethod("GET");
_http_ts_player->sendRequest(url, 2 * duration); //ts切片必须在其时长的3倍内下载完毕
_http_ts_player->setCompleteTimeout(3 * duration * 1000);
_http_ts_player->sendRequest(url);
} }
void HlsPlayer::onParsed(bool is_m3u8_inner, int64_t sequence, const map<int, ts_segment> &ts_map) { void HlsPlayer::onParsed(bool is_m3u8_inner, int64_t sequence, const map<int, ts_segment> &ts_map) {

View File

@ -15,8 +15,7 @@
namespace mediakit { namespace mediakit {
void HttpClient::sendRequest(const string &url, float timeout_sec, float recv_timeout_sec) { void HttpClient::sendRequest(const string &url) {
_recv_timeout_second = recv_timeout_sec;
clearResponse(); clearResponse();
_url = url; _url = url;
auto protocol = FindField(url.data(), NULL, "://"); auto protocol = FindField(url.data(), NULL, "://");
@ -73,7 +72,6 @@ void HttpClient::sendRequest(const string &url, float timeout_sec, float recv_ti
bool host_changed = (_last_host != host + ":" + to_string(port)) || (_is_https != is_https); bool host_changed = (_last_host != host + ":" + to_string(port)) || (_is_https != is_https);
_last_host = host + ":" + to_string(port); _last_host = host + ":" + to_string(port);
_is_https = is_https; _is_https = is_https;
_timeout_second = timeout_sec;
auto cookies = HttpCookieStorage::Instance().get(_last_host, _path); auto cookies = HttpCookieStorage::Instance().get(_last_host, _path);
_StrPrinter printer; _StrPrinter printer;
@ -86,7 +84,7 @@ void HttpClient::sendRequest(const string &url, float timeout_sec, float recv_ti
} }
if (!alive() || host_changed) { if (!alive() || host_changed) {
startConnect(host, port, timeout_sec); startConnect(host, port, _wait_header_ms);
} else { } else {
SockException ex; SockException ex;
onConnect_l(ex); onConnect_l(ex);
@ -95,22 +93,22 @@ void HttpClient::sendRequest(const string &url, float timeout_sec, float recv_ti
void HttpClient::clear() { void HttpClient::clear() {
_url.clear(); _url.clear();
_header.clear();
_user_set_header.clear(); _user_set_header.clear();
_body.reset(); _body.reset();
_method.clear(); _method.clear();
_path.clear();
clearResponse(); clearResponse();
} }
void HttpClient::clearResponse() { void HttpClient::clearResponse() {
_complete = false; _complete = false;
_header_recved = false;
_recved_body_size = 0; _recved_body_size = 0;
_total_body_size = 0; _total_body_size = 0;
_parser.Clear(); _parser.Clear();
_chunked_splitter = nullptr; _chunked_splitter = nullptr;
_recv_timeout_ticker.resetTime(); _wait_header.resetTime();
_total_timeout_ticker.resetTime(); _wait_body.resetTime();
_wait_complete.resetTime();
HttpRequestSplitter::reset(); HttpRequestSplitter::reset();
} }
@ -152,7 +150,6 @@ void HttpClient::onConnect(const SockException &ex) {
} }
void HttpClient::onConnect_l(const SockException &ex) { void HttpClient::onConnect_l(const SockException &ex) {
_recv_timeout_ticker.resetTime();
if (ex) { if (ex) {
onDisconnect(ex); onDisconnect(ex);
return; return;
@ -164,12 +161,14 @@ void HttpClient::onConnect_l(const SockException &ex) {
printer << pr.first + ": "; printer << pr.first + ": ";
printer << pr.second + "\r\n"; printer << pr.second + "\r\n";
} }
_header.clear();
_path.clear();
SockSender::send(printer << "\r\n"); SockSender::send(printer << "\r\n");
onFlush(); onFlush();
} }
void HttpClient::onRecv(const Buffer::Ptr &pBuf) { void HttpClient::onRecv(const Buffer::Ptr &pBuf) {
_recv_timeout_ticker.resetTime(); _wait_body.resetTime();
HttpRequestSplitter::input(pBuf->data(), pBuf->size()); HttpRequestSplitter::input(pBuf->data(), pBuf->size());
} }
@ -192,12 +191,13 @@ ssize_t HttpClient::onRecvHeader(const char *data, size_t len) {
} }
if (onRedirectUrl(new_url, _parser.Url() == "302")) { if (onRedirectUrl(new_url, _parser.Url() == "302")) {
setMethod("GET"); setMethod("GET");
HttpClient::sendRequest(new_url, _timeout_second, _recv_timeout_second); HttpClient::sendRequest(new_url);
return 0; return 0;
} }
} }
checkCookie(_parser.getHeader()); checkCookie(_parser.getHeader());
_header_recved = true;
_total_body_size = onResponseHeader(_parser.Url(), _parser.getHeader()); _total_body_size = onResponseHeader(_parser.Url(), _parser.getHeader());
if (!_parser["Content-Length"].empty()) { if (!_parser["Content-Length"].empty()) {
@ -225,8 +225,8 @@ ssize_t HttpClient::onRecvHeader(const char *data, size_t len) {
return 0; return 0;
} }
//当_totalBodySize != 0时到达这里代表后续有content //当_total_body_size != 0时到达这里代表后续有content
//虽然我们在_totalBodySize >0 时知道content的确切大小 //虽然我们在_total_body_size >0 时知道content的确切大小
//但是由于我们没必要等content接收完毕才回调onRecvContent(因为这样浪费内存并且要多次拷贝数据) //但是由于我们没必要等content接收完毕才回调onRecvContent(因为这样浪费内存并且要多次拷贝数据)
//所以返回-1代表我们接下来分段接收content //所以返回-1代表我们接下来分段接收content
_recved_body_size = 0; _recved_body_size = 0;
@ -265,7 +265,6 @@ void HttpClient::onRecvContent(const char *data, size_t len) {
} }
void HttpClient::onFlush() { void HttpClient::onFlush() {
_recv_timeout_ticker.resetTime();
GET_CONFIG(uint32_t, send_buf_size, Http::kSendBufSize); GET_CONFIG(uint32_t, send_buf_size, Http::kSendBufSize);
while (_body && _body->remainSize() && !isSocketBusy()) { while (_body && _body->remainSize() && !isSocketBusy()) {
auto buffer = _body->readData(send_buf_size); auto buffer = _body->readData(send_buf_size);
@ -282,20 +281,36 @@ void HttpClient::onFlush() {
} }
void HttpClient::onManager() { void HttpClient::onManager() {
if (_recv_timeout_ticker.elapsedTime() > _recv_timeout_second * 1000 && _total_body_size < 0 && !_chunked_splitter) { //onManager回调在连接中或已连接状态才会调用
//如果Content-Length未指定 但接收数据超时
//则认为本次http请求完成 if (_wait_complete_ms > 0) {
onResponseCompleted_l(); //设置了总超时时间
if (!_complete && _wait_complete.elapsedTime() > _wait_complete_ms) {
//等待http回复完毕超时
shutdown(SockException(Err_timeout, "wait http response complete timeout"));
return;
}
return;
} }
if (waitResponse() && _timeout_second > 0 && _total_timeout_ticker.elapsedTime() > _timeout_second * 1000) { //未设置总超时时间
//超时 if (!_header_recved) {
shutdown(SockException(Err_timeout, "http request timeout")); //等待header中
if (_wait_header.elapsedTime() > _wait_header_ms) {
//等待header中超时
shutdown(SockException(Err_timeout, "wait http response header timeout"));
return;
}
} else if (_wait_body_ms > 0 && _wait_body.elapsedTime() > _wait_body_ms) {
//等待body中等待超时
shutdown(SockException(Err_timeout, "wait http response body timeout"));
return;
} }
} }
void HttpClient::onResponseCompleted_l() { void HttpClient::onResponseCompleted_l() {
_complete = true; _complete = true;
_wait_complete.resetTime();
onResponseCompleted(); onResponseCompleted();
} }
@ -303,6 +318,10 @@ bool HttpClient::waitResponse() const {
return !_complete && alive(); return !_complete && alive();
} }
bool HttpClient::isHttps() const {
return _is_https;
}
void HttpClient::checkCookie(HttpClient::HttpHeader &headers) { void HttpClient::checkCookie(HttpClient::HttpHeader &headers) {
//Set-Cookie: IPTV_SERVER=8E03927B-CC8C-4389-BC00-31DBA7EC7B49;expires=Sun, Sep 23 2018 15:07:31 GMT;path=/index/api/ //Set-Cookie: IPTV_SERVER=8E03927B-CC8C-4389-BC00-31DBA7EC7B49;expires=Sun, Sep 23 2018 15:07:31 GMT;path=/index/api/
for (auto it_set_cookie = headers.find("Set-Cookie"); it_set_cookie != headers.end(); ++it_set_cookie) { for (auto it_set_cookie = headers.find("Set-Cookie"); it_set_cookie != headers.end(); ++it_set_cookie) {
@ -340,4 +359,17 @@ void HttpClient::checkCookie(HttpClient::HttpHeader &headers) {
} }
} }
void HttpClient::setHeaderTimeout(size_t timeout_ms) {
CHECK(timeout_ms > 0);
_wait_header_ms = timeout_ms;
}
void HttpClient::setBodyTimeout(size_t timeout_ms) {
_wait_body_ms = timeout_ms;
}
void HttpClient::setCompleteTimeout(size_t timeout_ms) {
_wait_complete_ms = timeout_ms;
}
} /* namespace mediakit */ } /* namespace mediakit */

View File

@ -61,9 +61,8 @@ public:
/** /**
* http[s] * http[s]
* @param url url * @param url url
* @param timeout_sec
*/ */
virtual void sendRequest(const string &url, float timeout_sec, float recv_timeout_sec = 3); virtual void sendRequest(const string &url);
/** /**
* *
@ -111,6 +110,30 @@ public:
*/ */
bool waitResponse() const; bool waitResponse() const;
/**
* https
*/
bool isHttps() const;
/**
* header完毕的延时10
* 0
*/
void setHeaderTimeout(size_t timeout_ms);
/**
* body数据超时时间, 5
* body回复的超时问题
* 0
*/
void setBodyTimeout(size_t timeout_ms);
/**
* , 0
* 0HeaderTimeout和BodyTimeout无效
*/
void setCompleteTimeout(size_t timeout_ms);
protected: protected:
/** /**
* http回复头 * http回复头
@ -174,28 +197,34 @@ private:
void checkCookie(HttpHeader &headers); void checkCookie(HttpHeader &headers);
void clearResponse(); void clearResponse();
protected:
bool _is_https;
private: private:
//for http response
bool _complete = false; bool _complete = false;
string _url; bool _header_recved = false;
HttpHeader _header;
HttpHeader _user_set_header;
HttpBody::Ptr _body;
string _method;
string _path;
string _last_host;
Ticker _recv_timeout_ticker;
Ticker _total_timeout_ticker;
float _timeout_second = 0;
float _recv_timeout_second = 0;
//recv
size_t _recved_body_size; size_t _recved_body_size;
ssize_t _total_body_size; ssize_t _total_body_size;
Parser _parser; Parser _parser;
std::shared_ptr<HttpChunkedSplitter> _chunked_splitter; std::shared_ptr<HttpChunkedSplitter> _chunked_splitter;
//for request args
bool _is_https;
string _url;
HttpHeader _user_set_header;
HttpBody::Ptr _body;
string _method;
string _last_host;
//for this request
string _path;
HttpHeader _header;
//for timeout
size_t _wait_header_ms = 10 * 1000;
size_t _wait_body_ms = 10 * 1000;
size_t _wait_complete_ms = 0;
Ticker _wait_header;
Ticker _wait_body;
Ticker _wait_complete;
}; };
} /* namespace mediakit */ } /* namespace mediakit */

View File

@ -13,7 +13,7 @@
namespace mediakit { namespace mediakit {
void HttpClientImp::onConnect(const SockException &ex) { void HttpClientImp::onConnect(const SockException &ex) {
if(!_is_https){ if(!isHttps()){
HttpClient::onConnect(ex); HttpClient::onConnect(ex);
} else { } else {
TcpClientWithSSL<HttpClient>::onConnect(ex); TcpClientWithSSL<HttpClient>::onConnect(ex);

View File

@ -9,87 +9,85 @@
*/ */
#include "HttpDownloader.h" #include "HttpDownloader.h"
#include "Util/MD5.h"
#include "Util/File.h" #include "Util/File.h"
#include "Util/MD5.h"
using namespace toolkit; using namespace toolkit;
namespace mediakit { namespace mediakit {
HttpDownloader::HttpDownloader() { HttpDownloader::HttpDownloader() {}
}
HttpDownloader::~HttpDownloader() { HttpDownloader::~HttpDownloader() {
closeFile(); closeFile();
} }
void HttpDownloader::startDownload(const string& url, const string& filePath,bool bAppend,float timeOutSecond) { void HttpDownloader::startDownload(const string &url, const string &filePath, bool bAppend) {
_filePath = filePath; _filePath = filePath;
if(_filePath.empty()){ if (_filePath.empty()) {
_filePath = exeDir() + "HttpDownloader/" + MD5(url).hexdigest(); _filePath = exeDir() + "HttpDownloader/" + MD5(url).hexdigest();
} }
_saveFile = File::create_file(_filePath.data(), bAppend ? "ab" : "wb"); _saveFile = File::create_file(_filePath.data(), bAppend ? "ab" : "wb");
if(!_saveFile){ if (!_saveFile) {
auto strErr = StrPrinter << "打开文件失败:" << filePath << endl; auto strErr = StrPrinter << "打开文件失败:" << filePath << endl;
throw std::runtime_error(strErr); throw std::runtime_error(strErr);
} }
_bDownloadSuccess = false; _bDownloadSuccess = false;
if(bAppend){ if (bAppend) {
auto currentLen = ftell(_saveFile); auto currentLen = ftell(_saveFile);
if(currentLen){ if (currentLen) {
//最少续传一个字节怕遇到http 416的错误 //最少续传一个字节怕遇到http 416的错误
currentLen -= 1; currentLen -= 1;
fseek(_saveFile,-1,SEEK_CUR); fseek(_saveFile, -1, SEEK_CUR);
} }
addHeader("Range", StrPrinter << "bytes=" << currentLen << "-" << endl); addHeader("Range", StrPrinter << "bytes=" << currentLen << "-" << endl);
} }
setMethod("GET"); setMethod("GET");
sendRequest(url,timeOutSecond);
sendRequest(url);
} }
ssize_t HttpDownloader::onResponseHeader(const string& status,const HttpHeader& headers) { ssize_t HttpDownloader::onResponseHeader(const string &status, const HttpHeader &headers) {
if(status != "200" && status != "206"){ if (status != "200" && status != "206") {
//失败 //失败
shutdown(SockException(Err_shutdown,StrPrinter << "Http Status:" << status)); shutdown(SockException(Err_shutdown, StrPrinter << "Http Status:" << status));
} }
//后续全部是content //后续全部是content
return -1; return -1;
} }
void HttpDownloader::onResponseBody(const char* buf, size_t size, size_t recvedSize, size_t totalSize) { void HttpDownloader::onResponseBody(const char *buf, size_t size, size_t recvedSize, size_t totalSize) {
if(_saveFile){ if (_saveFile) {
fwrite(buf,size,1,_saveFile); fwrite(buf, size, 1, _saveFile);
} }
} }
void HttpDownloader::onResponseCompleted() { void HttpDownloader::onResponseCompleted() {
closeFile(); closeFile();
//InfoL << "md5Sum:" << getMd5Sum(_filePath); // InfoL << "md5Sum:" << getMd5Sum(_filePath);
_bDownloadSuccess = true; _bDownloadSuccess = true;
if(_onResult){ if (_onResult) {
_onResult(Err_success,"success",_filePath); _onResult(Err_success, "success", _filePath);
_onResult = nullptr; _onResult = nullptr;
} }
} }
void HttpDownloader::onDisconnect(const SockException &ex) { void HttpDownloader::onDisconnect(const SockException &ex) {
closeFile(); closeFile();
if(!_bDownloadSuccess){ if (!_bDownloadSuccess) {
File::delete_file(_filePath.data()); File::delete_file(_filePath.data());
} }
if(_onResult){ if (_onResult) {
_onResult(ex.getErrCode(),ex.what(),_filePath); _onResult(ex.getErrCode(), ex.what(), _filePath);
_onResult = nullptr; _onResult = nullptr;
} }
} }
void HttpDownloader::closeFile() { void HttpDownloader::closeFile() {
if(_saveFile){ if (_saveFile) {
fflush(_saveFile); fflush(_saveFile);
fclose(_saveFile); fclose(_saveFile);
_saveFile = nullptr; _saveFile = nullptr;
} }
} }
} /* namespace mediakit */ } /* namespace mediakit */

View File

@ -18,18 +18,16 @@ namespace mediakit {
class HttpDownloader: public HttpClientImp { class HttpDownloader: public HttpClientImp {
public: public:
typedef std::shared_ptr<HttpDownloader> Ptr; typedef std::shared_ptr<HttpDownloader> Ptr;
typedef std::function<void(ErrCode code,const string &errMsg,const string &filePath)> onDownloadResult; typedef std::function<void(ErrCode code, const string &errMsg, const string &filePath)> onDownloadResult;
HttpDownloader(); HttpDownloader();
virtual ~HttpDownloader(); virtual ~HttpDownloader();
//开始下载文件,默认断点续传方式下载 //开始下载文件,默认断点续传方式下载
void startDownload(const string &url,const string &filePath = "",bool bAppend = false, float timeOutSecond = 10 ); void startDownload(const string &url, const string &filePath = "", bool bAppend = false);
void startDownload(const string &url,const onDownloadResult &cb,float timeOutSecond = 10){ void startDownload(const string &url, const onDownloadResult &cb) {
setOnResult(cb); setOnResult(cb);
startDownload(url,"",false,timeOutSecond); startDownload(url, "", false);
}
void setOnResult(const onDownloadResult &cb){
_onResult = cb;
} }
void setOnResult(const onDownloadResult &cb) { _onResult = cb; }
protected: protected:
void onResponseBody(const char *buf, size_t size, size_t recvedSize, size_t totalSize) override; void onResponseBody(const char *buf, size_t size, size_t recvedSize, size_t totalSize) override;

View File

@ -39,7 +39,8 @@ void HttpRequester::onDisconnect(const SockException &ex) {
void HttpRequester::startRequester(const string &url, const HttpRequesterResult &onResult, float timeOutSecond) { void HttpRequester::startRequester(const string &url, const HttpRequesterResult &onResult, float timeOutSecond) {
_onResult = onResult; _onResult = onResult;
sendRequest(url, timeOutSecond); setCompleteTimeout(timeOutSecond * 1000);
sendRequest(url);
} }
void HttpRequester::clear() { void HttpRequester::clear() {

View File

@ -19,13 +19,8 @@ void TsPlayer::play(const string &strUrl) {
playTs(); playTs();
} }
void TsPlayer::teardown_l(const SockException &ex) {
HttpClient::clear();
shutdown(ex);
}
void TsPlayer::teardown() { void TsPlayer::teardown() {
teardown_l(SockException(Err_shutdown, "teardown")); shutdown(SockException(Err_shutdown, "teardown"));
} }
void TsPlayer::playTs() { void TsPlayer::playTs() {
@ -33,15 +28,17 @@ void TsPlayer::playTs() {
//播放器目前还存活,正在下载中 //播放器目前还存活,正在下载中
return; return;
} }
WarnL << "fetch:" << _ts_url; TraceL << "play http-ts: " << _ts_url;
weak_ptr <TsPlayer> weak_self = dynamic_pointer_cast<TsPlayer>(shared_from_this()); weak_ptr <TsPlayer> weak_self = dynamic_pointer_cast<TsPlayer>(shared_from_this());
setMethod("GET"); setMethod("GET");
sendRequest(_ts_url, 3600 * 2, 60); setHeaderTimeout((*this)[Client::kTimeoutMS].as<int>());
setBodyTimeout((*this)[Client::kMediaTimeoutMS].as<int>());
sendRequest(_ts_url);
} }
void TsPlayer::onResponseCompleted() { void TsPlayer::onResponseCompleted() {
//接收完毕 //接收完毕
teardown_l(SockException(Err_success, StrPrinter << _ts_url << ": play completed")); shutdown(SockException(Err_success, StrPrinter << "play " << _ts_url << " completed"));
} }
void TsPlayer::onDisconnect(const SockException &ex) { void TsPlayer::onDisconnect(const SockException &ex) {

View File

@ -40,7 +40,6 @@ public:
private: private:
void playTs(); void playTs();
void teardown_l(const SockException &ex);
protected: protected:
virtual void onResponseCompleted() override; virtual void onResponseCompleted() override;

View File

@ -83,17 +83,18 @@ public:
* @param ws_url ws连接url * @param ws_url ws连接url
* @param fTimeOutSec * @param fTimeOutSec
*/ */
void startWsClient(const string &ws_url,float fTimeOutSec){ void startWsClient(const string &ws_url, float fTimeOutSec) {
string http_url = ws_url; string http_url = ws_url;
replace(http_url,"ws://","http://"); replace(http_url, "ws://", "http://");
replace(http_url,"wss://","https://"); replace(http_url, "wss://", "https://");
setMethod("GET"); setMethod("GET");
addHeader("Upgrade","websocket"); addHeader("Upgrade", "websocket");
addHeader("Connection","Upgrade"); addHeader("Connection", "Upgrade");
addHeader("Sec-WebSocket-Version","13"); addHeader("Sec-WebSocket-Version", "13");
addHeader("Sec-WebSocket-Key",_Sec_WebSocket_Key); addHeader("Sec-WebSocket-Key", _Sec_WebSocket_Key);
_onRecv = nullptr; _onRecv = nullptr;
sendRequest(http_url,fTimeOutSec); setHeaderTimeout(fTimeOutSec * 1000);
sendRequest(http_url);
} }
void closeWsClient(){ void closeWsClient(){