diff --git a/3rdpart/ZLToolKit b/3rdpart/ZLToolKit index b6e5af3e..e4744a0a 160000 --- a/3rdpart/ZLToolKit +++ b/3rdpart/ZLToolKit @@ -1 +1 @@ -Subproject commit b6e5af3e7ad95d87f4e45a89e36d9a8ab45209a9 +Subproject commit e4744a0a523817356f2ec995ee5a732264c31629 diff --git a/src/Http/HttpSession.cpp b/src/Http/HttpSession.cpp index fb7aed7f..f1cac788 100644 --- a/src/Http/HttpSession.cpp +++ b/src/Http/HttpSession.cpp @@ -30,14 +30,14 @@ HttpSession::HttpSession(const Socket::Ptr &pSock) : Session(pSock) { HttpSession::~HttpSession() = default; -void HttpSession::Handle_Req_HEAD(ssize_t &content_len) { +void HttpSession::onHttpRequest_HEAD() { // 暂时全部返回200 OK,因为HTTP GET存在按需生成流的操作,所以不能按照HTTP GET的流程返回 // 如果直接返回404,那么又会导致按需生成流的逻辑失效,所以HTTP HEAD在静态文件或者已存在资源时才有效 // 对于按需生成流的直播场景并不适用 sendResponse(200, false); } -void HttpSession::Handle_Req_OPTIONS(ssize_t &content_len) { +void HttpSession::onHttpRequest_OPTIONS() { KeyValue header; header.emplace("Allow", "GET, POST, HEAD, OPTIONS"); GET_CONFIG(bool, allow_cross_domains, Http::kAllowCrossDomains); @@ -53,21 +53,22 @@ void HttpSession::Handle_Req_OPTIONS(ssize_t &content_len) { } ssize_t HttpSession::onRecvHeader(const char *header, size_t len) { - using func_type = void (HttpSession::*)(ssize_t &); + using func_type = void (HttpSession::*)(); static unordered_map s_func_map; static onceToken token([]() { - s_func_map.emplace("GET", &HttpSession::Handle_Req_GET); - s_func_map.emplace("DELETE", &HttpSession::Handle_Req_GET); - s_func_map.emplace("POST", &HttpSession::Handle_Req_POST); - s_func_map.emplace("HEAD", &HttpSession::Handle_Req_HEAD); - s_func_map.emplace("OPTIONS", &HttpSession::Handle_Req_OPTIONS); + s_func_map.emplace("GET", &HttpSession::onHttpRequest_GET); + s_func_map.emplace("POST", &HttpSession::onHttpRequest_POST); + // DELETE命令用于whip/whep用,只用于触发http api + s_func_map.emplace("DELETE", &HttpSession::onHttpRequest_POST); + s_func_map.emplace("HEAD", &HttpSession::onHttpRequest_HEAD); + s_func_map.emplace("OPTIONS", &HttpSession::onHttpRequest_OPTIONS); }); _parser.parse(header, len); CHECK(_parser.url()[0] == '/'); urlDecode(_parser); - string cmd = _parser.method(); + auto &cmd = _parser.method(); auto it = s_func_map.find(cmd); if (it == s_func_map.end()) { WarnP(this) << "Http method not supported: " << cmd; @@ -75,19 +76,84 @@ ssize_t HttpSession::onRecvHeader(const char *header, size_t len) { return 0; } - //默认后面数据不是content而是header - ssize_t content_len = 0; - (this->*(it->second))(content_len); + size_t content_len; + auto &content_len_str = _parser["Content-Length"]; + if (content_len_str.empty()) { + if (it->first == "POST") { + // Http post未指定长度,我们认为是不定长的body + WarnL << "Received http post request without content-length, consider it to be unlimited length"; + content_len = SIZE_MAX; + } else { + content_len = 0; + } + } else { + // 已经指定长度 + content_len = atoll(content_len_str.data()); + } - // 清空解析器节省内存 - _parser.clear(); - // 返回content长度 - return content_len; + if (content_len == 0) { + //// 没有body的情况,直接触发回调 //// + (this->*(it->second))(); + _parser.clear(); + // 如果设置了_on_recv_body, 那么说明后续要处理body + return _on_recv_body ? -1 : 0; + } + + GET_CONFIG(size_t, maxReqSize, Http::kMaxReqSize); + if (content_len > maxReqSize) { + //// 不定长body或超大body //// + if (content_len != SIZE_MAX) { + WarnL << "Http body size is too huge: " << content_len << " > " << maxReqSize + << ", please set " << Http::kMaxReqSize << " in config.ini file."; + } + + size_t received = 0; + auto parser = std::move(_parser); + _on_recv_body = [this, parser, received, content_len](const char *data, size_t len) mutable { + received += len; + onRecvUnlimitedContent(parser, data, len, content_len, received); + if (received != content_len) { + // 还没收满 + return true; + } + + // 收满了 + setContentLen(0); + return false; + }; + // 声明后续都是body;Http body在本对象缓冲,不通过HttpRequestSplitter保存 + return -1; + } + + //// body size明确指定且小于最大值的情况 //// + auto body = std::make_shared(); + // 预留一定的内存buffer,防止频繁的内存拷贝 + body->reserve(content_len); + + _on_recv_body = [this, body, content_len, it](const char *data, size_t len) mutable { + body->append(data, len); + if (body->size() < content_len) { + // 未收满数据 + return true; + } + + // 收集body完毕 + _parser.setContent(std::move(*body)); + (this->*(it->second))(); + _parser.clear(); + + // 后续是header + setContentLen(0); + return false; + }; + + // 声明后续都是body;Http body在本对象缓冲,不通过HttpRequestSplitter保存 + return -1; } void HttpSession::onRecvContent(const char *data, size_t len) { - if (_contentCallBack && !_contentCallBack(data, len)) { - _contentCallBack = nullptr; + if (_on_recv_body && !_on_recv_body(data, len)) { + _on_recv_body = nullptr; } } @@ -369,17 +435,13 @@ bool HttpSession::checkLiveStreamFlv(const function &cb) { }); } -void HttpSession::Handle_Req_GET(ssize_t &content_len) { - Handle_Req_GET_l(content_len, true); -} - -void HttpSession::Handle_Req_GET_l(ssize_t &content_len, bool sendBody) { +void HttpSession::onHttpRequest_GET() { // 先看看是否为WebSocket请求 if (checkWebSocket()) { - content_len = -1; - _contentCallBack = [this](const char *data, size_t len) { + // 后续都是websocket body数据 + _on_recv_body = [this](const char *data, size_t len) { WebSocketSplitter::decode((uint8_t *)data, len); - //_contentCallBack是可持续的,后面还要处理后续数据 + // _contentCallBack是可持续的,后面还要处理后续数据 return true; }; return; @@ -658,74 +720,8 @@ std::string HttpSession::get_peer_ip() { return Session::get_peer_ip(); } -void HttpSession::Handle_Req_POST(ssize_t &content_len) { - GET_CONFIG(size_t, maxReqSize, Http::kMaxReqSize); - - ssize_t totalContentLen = _parser["Content-Length"].empty() ? -1 : atoll(_parser["Content-Length"].data()); - - if (totalContentLen == 0) { - // content为空 - // emitHttpEvent内部会选择是否关闭连接 - emitHttpEvent(true); - return; - } - - if (totalContentLen > 0 && (size_t)totalContentLen < maxReqSize) { - // 返回固定长度的content - content_len = totalContentLen; - auto parserCopy = _parser; - _contentCallBack = [this, parserCopy](const char *data, size_t len) { - // 恢复http头 - _parser = parserCopy; - // 设置content - _parser.setContent(string(data, len)); - // 触发http事件,emitHttpEvent内部会选择是否关闭连接 - emitHttpEvent(true); - // 清空数据,节省内存 - _parser.clear(); - // content已经接收完毕 - return false; - }; - } else { - // 返回不固定长度的content或者超过长度限制的content - content_len = -1; - auto parserCopy = _parser; - std::shared_ptr recvedContentLen = std::make_shared(0); - bool bClose = !strcasecmp(_parser["Connection"].data(), "close"); - - _contentCallBack = [this, parserCopy, totalContentLen, recvedContentLen, bClose](const char *data, size_t len) { - *(recvedContentLen) += len; - if (totalContentLen < 0) { - // 不固定长度的content,源源不断接收数据 - onRecvUnlimitedContent(parserCopy, data, len, SIZE_MAX, *(recvedContentLen)); - return true; - } - - // 长度超过限制的content - onRecvUnlimitedContent(parserCopy, data, len, totalContentLen, *(recvedContentLen)); - - if (*(recvedContentLen) < (size_t)totalContentLen) { - // 数据还没接收完毕 - //_contentCallBack是可持续的,后面还要处理后续content数据 - return true; - } - - // 数据接收完毕 - if (!bClose) { - // keep-alive类型连接 - // content接收完毕,后续都是http header - setContentLen(0); - // content已经接收完毕 - return false; - } - - // 连接类型是close类型,收完content就关闭连接 - shutdown(SockException(Err_shutdown, "recv http content completed")); - // content已经接收完毕 - return false; - }; - } - // 有后续content数据要处理,暂时不关闭连接 +void HttpSession::onHttpRequest_POST() { + emitHttpEvent(true); } void HttpSession::sendNotFound(bool bClose) { diff --git a/src/Http/HttpSession.h b/src/Http/HttpSession.h index 59255fb8..513f8b63 100644 --- a/src/Http/HttpSession.h +++ b/src/Http/HttpSession.h @@ -101,11 +101,10 @@ protected: std::string get_peer_ip() override; private: - void Handle_Req_GET(ssize_t &content_len); - void Handle_Req_GET_l(ssize_t &content_len, bool sendBody); - void Handle_Req_POST(ssize_t &content_len); - void Handle_Req_HEAD(ssize_t &content_len); - void Handle_Req_OPTIONS(ssize_t &content_len); + void onHttpRequest_GET(); + void onHttpRequest_POST(); + void onHttpRequest_HEAD(); + void onHttpRequest_OPTIONS(); bool checkLiveStream(const std::string &schema, const std::string &url_suffix, const std::function &cb); @@ -137,7 +136,7 @@ private: TSMediaSource::RingType::RingReader::Ptr _ts_reader; FMP4MediaSource::RingType::RingReader::Ptr _fmp4_reader; //处理content数据的callback - std::function _contentCallBack; + std::function _on_recv_body; }; using HttpsSession = toolkit::SessionWithSSL;