From fff53cf0e209bb93bf917ba2e8f0d5e692e08b1a Mon Sep 17 00:00:00 2001 From: xia-chu <771730766@qq.com> Date: Sat, 10 Jun 2023 09:45:46 +0800 Subject: [PATCH] =?UTF-8?q?=E6=A0=BC=E5=BC=8F=E5=8C=96HttpSession.cpp?= =?UTF-8?q?=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Http/HttpSession.cpp | 343 +++++++++++++++++++-------------------- 1 file changed, 166 insertions(+), 177 deletions(-) diff --git a/src/Http/HttpSession.cpp b/src/Http/HttpSession.cpp index b443dfec..5a5aa113 100644 --- a/src/Http/HttpSession.cpp +++ b/src/Http/HttpSession.cpp @@ -24,16 +24,16 @@ using namespace toolkit; namespace mediakit { HttpSession::HttpSession(const Socket::Ptr &pSock) : Session(pSock) { - GET_CONFIG(uint32_t,keep_alive_sec,Http::kKeepAliveSecond); + GET_CONFIG(uint32_t, keep_alive_sec, Http::kKeepAliveSecond); pSock->setSendTimeOutSecond(keep_alive_sec); } HttpSession::~HttpSession() = default; -void HttpSession::Handle_Req_HEAD(ssize_t &content_len){ - //暂时全部返回200 OK,因为HTTP GET存在按需生成流的操作,所以不能按照HTTP GET的流程返回 - //如果直接返回404,那么又会导致按需生成流的逻辑失效,所以HTTP HEAD在静态文件或者已存在资源时才有效 - //对于按需生成流的直播场景并不适用 +void HttpSession::Handle_Req_HEAD(ssize_t &content_len) { + // 暂时全部返回200 OK,因为HTTP GET存在按需生成流的操作,所以不能按照HTTP GET的流程返回 + // 如果直接返回404,那么又会导致按需生成流的逻辑失效,所以HTTP HEAD在静态文件或者已存在资源时才有效 + // 对于按需生成流的直播场景并不适用 sendResponse(200, false); } @@ -52,16 +52,16 @@ void HttpSession::Handle_Req_OPTIONS(ssize_t &content_len) { sendResponse(200, true, nullptr, header); } -ssize_t HttpSession::onRecvHeader(const char *header,size_t len) { - typedef void (HttpSession::*HttpCMDHandle)(ssize_t &); - static unordered_map s_func_map; +ssize_t HttpSession::onRecvHeader(const char *header, size_t len) { + using func_type = void (HttpSession::*)(ssize_t &); + static unordered_map s_func_map; static onceToken token([]() { - s_func_map.emplace("GET",&HttpSession::Handle_Req_GET); - s_func_map.emplace("DELETE",&HttpSession::Handle_Req_GET); - s_func_map.emplace("POST",&HttpSession::Handle_Req_POST); - s_func_map.emplace("HEAD",&HttpSession::Handle_Req_HEAD); - s_func_map.emplace("OPTIONS",&HttpSession::Handle_Req_OPTIONS); - }, nullptr); + s_func_map.emplace("GET", &HttpSession::Handle_Req_GET); + s_func_map.emplace("DELETE", &HttpSession::Handle_Req_GET); + s_func_map.emplace("POST", &HttpSession::Handle_Req_POST); + s_func_map.emplace("HEAD", &HttpSession::Handle_Req_HEAD); + s_func_map.emplace("OPTIONS", &HttpSession::Handle_Req_OPTIONS); + }); _parser.Parse(header); CHECK(_parser.Url()[0] == '/'); @@ -75,60 +75,54 @@ ssize_t HttpSession::onRecvHeader(const char *header,size_t len) { return 0; } - //跨域 + // 跨域 _origin = _parser["Origin"]; //默认后面数据不是content而是header ssize_t content_len = 0; (this->*(it->second))(content_len); - //清空解析器节省内存 + // 清空解析器节省内存 _parser.Clear(); - //返回content长度 + // 返回content长度 return content_len; } -void HttpSession::onRecvContent(const char *data,size_t len) { - if(_contentCallBack){ - if(!_contentCallBack(data,len)){ - _contentCallBack = nullptr; - } +void HttpSession::onRecvContent(const char *data, size_t len) { + if (_contentCallBack && !_contentCallBack(data, len)) { + _contentCallBack = nullptr; } } void HttpSession::onRecv(const Buffer::Ptr &pBuf) { _ticker.resetTime(); - input(pBuf->data(),pBuf->size()); + input(pBuf->data(), pBuf->size()); } -void HttpSession::onError(const SockException& err) { +void HttpSession::onError(const SockException &err) { if (_is_live_stream) { - //flv/ts播放器 + // flv/ts播放器 uint64_t duration = _ticker.createdTime() / 1000; - WarnP(this) << "FLV/TS/FMP4播放器(" - << _mediaInfo.shortUrl() - << ")断开:" << err - << ",耗时(s):" << duration; + WarnP(this) << "FLV/TS/FMP4播放器(" << _mediaInfo.shortUrl() << ")断开:" << err << ",耗时(s):" << duration; GET_CONFIG(uint32_t, iFlowThreshold, General::kFlowThreshold); if (_total_bytes_usage >= iFlowThreshold * 1024) { - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _mediaInfo, _total_bytes_usage, - duration, true, static_cast(*this)); + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _mediaInfo, _total_bytes_usage, duration, true, static_cast(*this)); } return; } } void HttpSession::onManager() { - GET_CONFIG(uint32_t,keepAliveSec,Http::kKeepAliveSecond); + GET_CONFIG(uint32_t, keepAliveSec, Http::kKeepAliveSecond); - if(_ticker.elapsedTime() > keepAliveSec * 1000){ - //1分钟超时 - shutdown(SockException(Err_timeout,"session timeout")); + if (_ticker.elapsedTime() > keepAliveSec * 1000) { + // 1分钟超时 + shutdown(SockException(Err_timeout, "session timeout")); } } -bool HttpSession::checkWebSocket(){ +bool HttpSession::checkWebSocket() { auto Sec_WebSocket_Key = _parser["Sec-WebSocket-Key"]; if (Sec_WebSocket_Key.empty()) { return false; @@ -147,32 +141,32 @@ bool HttpSession::checkWebSocket(){ _live_over_websocket = true; sendResponse(101, false, nullptr, headerOut, nullptr, true); }; - + auto res_cb_flv = [this, headerOut]() mutable { _live_over_websocket = true; headerOut.emplace("Cache-Control", "no-store"); sendResponse(101, false, nullptr, headerOut, nullptr, true); }; - //判断是否为websocket-flv + // 判断是否为websocket-flv if (checkLiveStreamFlv(res_cb_flv)) { - //这里是websocket-flv直播请求 + // 这里是websocket-flv直播请求 return true; } - //判断是否为websocket-ts + // 判断是否为websocket-ts if (checkLiveStreamTS(res_cb)) { - //这里是websocket-ts直播请求 + // 这里是websocket-ts直播请求 return true; } - //判断是否为websocket-fmp4 + // 判断是否为websocket-fmp4 if (checkLiveStreamFMP4(res_cb)) { - //这里是websocket-fmp4直播请求 + // 这里是websocket-fmp4直播请求 return true; } - //这是普通的websocket连接 + // 这是普通的websocket连接 if (!onWebSocketConnect(_parser)) { sendResponse(501, true, nullptr, headerOut); return true; @@ -181,7 +175,7 @@ bool HttpSession::checkWebSocket(){ return true; } -bool HttpSession::checkLiveStream(const string &schema, const string &url_suffix, const function &cb){ +bool HttpSession::checkLiveStream(const string &schema, const string &url_suffix, const function &cb) { std::string url = _parser.Url(); auto it = _parser.getUrlArgs().find("schema"); if (it != _parser.getUrlArgs().end()) { @@ -192,57 +186,57 @@ bool HttpSession::checkLiveStream(const string &schema, const string &url_suffi } else { auto prefix_size = url_suffix.size(); if (url.size() < prefix_size || strcasecmp(url.data() + (url.size() - prefix_size), url_suffix.data())) { - //未找到后缀 + // 未找到后缀 return false; } // url去除特殊后缀 url.resize(url.size() - prefix_size); } - //带参数的url + // 带参数的url if (!_parser.Params().empty()) { url += "?"; url += _parser.Params(); } - //解析带上协议+参数完整的url + // 解析带上协议+参数完整的url _mediaInfo.parse(schema + "://" + _parser["Host"] + url); if (_mediaInfo.app.empty() || _mediaInfo.stream.empty()) { - //url不合法 + // url不合法 return false; } bool close_flag = !strcasecmp(_parser["Connection"].data(), "close"); weak_ptr weak_self = static_pointer_cast(shared_from_this()); - //鉴权结果回调 + // 鉴权结果回调 auto onRes = [cb, weak_self, close_flag](const string &err) { auto strong_self = weak_self.lock(); if (!strong_self) { - //本对象已经销毁 + // 本对象已经销毁 return; } if (!err.empty()) { - //播放鉴权失败 + // 播放鉴权失败 strong_self->sendResponse(401, close_flag, nullptr, KeyValue(), std::make_shared(err)); return; } - //异步查找直播流 + // 异步查找直播流 MediaSource::findAsync(strong_self->_mediaInfo, strong_self, [weak_self, close_flag, cb](const MediaSource::Ptr &src) { auto strong_self = weak_self.lock(); if (!strong_self) { - //本对象已经销毁 + // 本对象已经销毁 return; } if (!src) { - //未找到该流 + // 未找到该流 strong_self->sendNotFound(close_flag); } else { strong_self->_is_live_stream = true; - //触发回调 + // 触发回调 cb(src); } }); @@ -256,26 +250,26 @@ bool HttpSession::checkLiveStream(const string &schema, const string &url_suffi auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed, _mediaInfo, invoker, static_cast(*this)); if (!flag) { - //该事件无人监听,默认不鉴权 + // 该事件无人监听,默认不鉴权 onRes(""); } return true; } -//http-fmp4 链接格式:http://vhost-url:port/app/streamid.live.mp4?key1=value1&key2=value2 -bool HttpSession::checkLiveStreamFMP4(const function &cb){ +// http-fmp4 链接格式:http://vhost-url:port/app/streamid.live.mp4?key1=value1&key2=value2 +bool HttpSession::checkLiveStreamFMP4(const function &cb) { return checkLiveStream(FMP4_SCHEMA, ".live.mp4", [this, cb](const MediaSource::Ptr &src) { auto fmp4_src = dynamic_pointer_cast(src); assert(fmp4_src); if (!cb) { - //找到源,发送http头,负载后续发送 + // 找到源,发送http头,负载后续发送 sendResponse(200, false, HttpFileManager::getContentType(".mp4").data(), KeyValue(), nullptr, true); } else { - //自定义发送http头 + // 自定义发送http头 cb(); } - //直播牺牲延时提升发送性能 + // 直播牺牲延时提升发送性能 setSocketFlags(); onWrite(std::make_shared(fmp4_src->getInitSegment()), true); weak_ptr weak_self = static_pointer_cast(shared_from_this()); @@ -285,7 +279,7 @@ bool HttpSession::checkLiveStreamFMP4(const function &cb){ _fmp4_reader->setDetachCB([weak_self]() { auto strong_self = weak_self.lock(); if (!strong_self) { - //本对象已经销毁 + // 本对象已经销毁 return; } strong_self->shutdown(SockException(Err_shutdown, "fmp4 ring buffer detached")); @@ -293,85 +287,80 @@ bool HttpSession::checkLiveStreamFMP4(const function &cb){ _fmp4_reader->setReadCB([weak_self](const FMP4MediaSource::RingDataType &fmp4_list) { auto strong_self = weak_self.lock(); if (!strong_self) { - //本对象已经销毁 + // 本对象已经销毁 return; } size_t i = 0; auto size = fmp4_list->size(); - fmp4_list->for_each([&](const FMP4Packet::Ptr &ts) { - strong_self->onWrite(ts, ++i == size); - }); + fmp4_list->for_each([&](const FMP4Packet::Ptr &ts) { strong_self->onWrite(ts, ++i == size); }); }); }); } -//http-ts 链接格式:http://vhost-url:port/app/streamid.live.ts?key1=value1&key2=value2 -bool HttpSession::checkLiveStreamTS(const function &cb){ +// http-ts 链接格式:http://vhost-url:port/app/streamid.live.ts?key1=value1&key2=value2 +bool HttpSession::checkLiveStreamTS(const function &cb) { return checkLiveStream(TS_SCHEMA, ".live.ts", [this, cb](const MediaSource::Ptr &src) { auto ts_src = dynamic_pointer_cast(src); assert(ts_src); if (!cb) { - //找到源,发送http头,负载后续发送 + // 找到源,发送http头,负载后续发送 sendResponse(200, false, HttpFileManager::getContentType(".ts").data(), KeyValue(), nullptr, true); } else { - //自定义发送http头 + // 自定义发送http头 cb(); } - //直播牺牲延时提升发送性能 + // 直播牺牲延时提升发送性能 setSocketFlags(); weak_ptr weak_self = static_pointer_cast(shared_from_this()); ts_src->pause(false); _ts_reader = ts_src->getRing()->attach(getPoller()); _ts_reader->setGetInfoCB([weak_self]() { return weak_self.lock(); }); - _ts_reader->setDetachCB([weak_self](){ + _ts_reader->setDetachCB([weak_self]() { auto strong_self = weak_self.lock(); if (!strong_self) { - //本对象已经销毁 + // 本对象已经销毁 return; } - strong_self->shutdown(SockException(Err_shutdown,"ts ring buffer detached")); + strong_self->shutdown(SockException(Err_shutdown, "ts ring buffer detached")); }); _ts_reader->setReadCB([weak_self](const TSMediaSource::RingDataType &ts_list) { auto strong_self = weak_self.lock(); if (!strong_self) { - //本对象已经销毁 + // 本对象已经销毁 return; } size_t i = 0; auto size = ts_list->size(); - ts_list->for_each([&](const TSPacket::Ptr &ts) { - strong_self->onWrite(ts, ++i == size); - }); + ts_list->for_each([&](const TSPacket::Ptr &ts) { strong_self->onWrite(ts, ++i == size); }); }); }); } -//http-flv 链接格式:http://vhost-url:port/app/streamid.live.flv?key1=value1&key2=value2 -bool HttpSession::checkLiveStreamFlv(const function &cb){ +// http-flv 链接格式:http://vhost-url:port/app/streamid.live.flv?key1=value1&key2=value2 +bool HttpSession::checkLiveStreamFlv(const function &cb) { auto start_pts = atoll(_parser.getUrlArgs()["starPts"].data()); return checkLiveStream(RTMP_SCHEMA, ".live.flv", [this, cb, start_pts](const MediaSource::Ptr &src) { auto rtmp_src = dynamic_pointer_cast(src); assert(rtmp_src); if (!cb) { - //找到源,发送http头,负载后续发送 + // 找到源,发送http头,负载后续发送 KeyValue headerOut; headerOut["Cache-Control"] = "no-store"; sendResponse(200, false, HttpFileManager::getContentType(".flv").data(), headerOut, nullptr, true); } else { - //自定义发送http头 + // 自定义发送http头 cb(); } - //直播牺牲延时提升发送性能 + // 直播牺牲延时提升发送性能 setSocketFlags(); - //非H264/AAC时打印警告日志,防止用户提无效问题 + // 非H264/AAC时打印警告日志,防止用户提无效问题 auto tracks = src->getTracks(false); for (auto &track : tracks) { switch (track->getCodecId()) { case CodecH264: - case CodecAAC: - break; + case CodecAAC: break; default: { WarnP(this) << "flv播放器一般只支持H264和AAC编码,该编码格式可能不被播放器支持:" << track->getCodecName(); break; @@ -388,11 +377,11 @@ void HttpSession::Handle_Req_GET(ssize_t &content_len) { } void HttpSession::Handle_Req_GET_l(ssize_t &content_len, bool sendBody) { - //先看看是否为WebSocket请求 + // 先看看是否为WebSocket请求 if (checkWebSocket()) { content_len = -1; _contentCallBack = [this](const char *data, size_t len) { - WebSocketSplitter::decode((uint8_t *) data, len); + WebSocketSplitter::decode((uint8_t *)data, len); //_contentCallBack是可持续的,后面还要处理后续数据 return true; }; @@ -400,29 +389,29 @@ void HttpSession::Handle_Req_GET_l(ssize_t &content_len, bool sendBody) { } if (emitHttpEvent(false)) { - //拦截http api事件 + // 拦截http api事件 return; } if (checkLiveStreamFlv()) { - //拦截http-flv播放器 + // 拦截http-flv播放器 return; } if (checkLiveStreamTS()) { - //拦截http-ts播放器 + // 拦截http-ts播放器 return; } if (checkLiveStreamFMP4()) { - //拦截http-fmp4播放器 + // 拦截http-fmp4播放器 return; } - bool bClose = !strcasecmp(_parser["Connection"].data(),"close"); + bool bClose = !strcasecmp(_parser["Connection"].data(), "close"); weak_ptr weak_self = static_pointer_cast(shared_from_this()); HttpFileManager::onAccessPath(*this, _parser, [weak_self, bClose](int code, const string &content_type, - const StrCaseMap &responseHeader, const HttpBody::Ptr &body) { + const StrCaseMap &responseHeader, const HttpBody::Ptr &body) { auto strong_self = weak_self.lock(); if (!strong_self) { return; @@ -468,7 +457,7 @@ public: static bool onSocketFlushed(const AsyncSenderData::Ptr &data) { if (data->_read_complete) { if (data->_close_when_complete) { - //发送完毕需要关闭socket + // 发送完毕需要关闭socket shutdown(data->_session.lock()); } return false; @@ -478,13 +467,13 @@ public: data->_body->readDataAsync(sendBufSize, [data](const Buffer::Ptr &sendBuf) { auto session = data->_session.lock(); if (!session) { - //本对象已经销毁 + // 本对象已经销毁 return; } session->async([data, sendBuf]() { auto session = data->_session.lock(); if (!session) { - //本对象已经销毁 + // 本对象已经销毁 return; } onRequestData(data, session, sendBuf); @@ -497,14 +486,14 @@ private: static void onRequestData(const AsyncSenderData::Ptr &data, const std::shared_ptr &session, const Buffer::Ptr &sendBuf) { session->_ticker.resetTime(); if (sendBuf && session->send(sendBuf) != -1) { - //文件还未读完,还需要继续发送 + // 文件还未读完,还需要继续发送 if (!session->isSocketBusy()) { - //socket还可写,继续请求数据 + // socket还可写,继续请求数据 onSocketFlushed(data); } return; } - //文件写完了 + // 文件写完了 data->_read_complete = true; if (!session->isSocketBusy() && data->_close_when_complete) { shutdown(session); @@ -512,7 +501,7 @@ private: } static void shutdown(const std::shared_ptr &session) { - if(session){ + if (session) { session->shutdown(SockException(Err_shutdown, StrPrinter << "close connection after send http body completed.")); } } @@ -532,14 +521,14 @@ void HttpSession::sendResponse(int code, const char *pcContentType, const HttpSession::KeyValue &header, const HttpBody::Ptr &body, - bool no_content_length ){ - GET_CONFIG(string,charSet,Http::kCharSet); - GET_CONFIG(uint32_t,keepAliveSec,Http::kKeepAliveSecond); + bool no_content_length) { + GET_CONFIG(string, charSet, Http::kCharSet); + GET_CONFIG(uint32_t, keepAliveSec, Http::kKeepAliveSecond); - //body默认为空 + // body默认为空 int64_t size = 0; if (body && body->remainSize()) { - //有body,获取body大小 + // 有body,获取body大小 size = body->remainSize(); } @@ -547,7 +536,7 @@ void HttpSession::sendResponse(int code, // http-flv直播是Keep-Alive类型 bClose = false; } else if ((size_t)size >= SIZE_MAX || size < 0) { - //不固定长度的body,那么发送完body后应该关闭socket,以便浏览器做下载完毕的判断 + // 不固定长度的body,那么发送完body后应该关闭socket,以便浏览器做下载完毕的判断 bClose = true; } @@ -563,30 +552,30 @@ void HttpSession::sendResponse(int code, } if (!_origin.empty()) { - //设置跨域 + // 设置跨域 headerOut.emplace(kAccessControlAllowOrigin, _origin); headerOut.emplace(kAccessControlAllowCredentials, "true"); } if (!no_content_length && size >= 0 && (size_t)size < SIZE_MAX) { - //文件长度为固定值,且不是http-flv强制设置Content-Length + // 文件长度为固定值,且不是http-flv强制设置Content-Length headerOut[kContentLength] = to_string(size); } if (size && !pcContentType) { - //有body时,设置缺省类型 + // 有body时,设置缺省类型 pcContentType = "text/plain"; } if ((size || no_content_length) && pcContentType) { - //有body时,设置文件类型 + // 有body时,设置文件类型 string strContentType = pcContentType; strContentType += "; charset="; strContentType += charSet; headerOut.emplace(kContentType, std::move(strContentType)); } - //发送http头 + // 发送http头 string str; str.reserve(256); str += "HTTP/1.1 "; @@ -605,9 +594,9 @@ void HttpSession::sendResponse(int code, _ticker.resetTime(); if (!size) { - //没有body + // 没有body if (bClose) { - shutdown(SockException(Err_shutdown,StrPrinter << "close connection after send http header completed with status code:" << code)); + shutdown(SockException(Err_shutdown, StrPrinter << "close connection after send http header completed with status code:" << code)); } return; } @@ -622,20 +611,20 @@ void HttpSession::sendResponse(int code, GET_CONFIG(uint32_t, sendBufSize, Http::kSendBufSize); if (body->remainSize() > sendBufSize) { - //文件下载提升发送性能 + // 文件下载提升发送性能 setSocketFlags(); } - //发送http body + // 发送http body AsyncSenderData::Ptr data = std::make_shared(static_pointer_cast(shared_from_this()), body, bClose); getSock()->setOnFlush([data]() { return AsyncSender::onSocketFlushed(data); }); AsyncSender::onSocketFlushed(data); } -string HttpSession::urlDecode(const string &str){ +string HttpSession::urlDecode(const string &str) { auto ret = strCoding::UrlDecode(str); #ifdef _WIN32 - GET_CONFIG(string,charSet,Http::kCharSet); + GET_CONFIG(string, charSet, Http::kCharSet); bool isGb2312 = !strcasecmp(charSet.data(), "gb2312"); if (isGb2312) { ret = strCoding::UTF8ToGB2312(ret); @@ -644,117 +633,117 @@ string HttpSession::urlDecode(const string &str){ return ret; } -void HttpSession::urlDecode(Parser &parser){ +void HttpSession::urlDecode(Parser &parser) { parser.setUrl(urlDecode(parser.Url())); - for(auto &pr : _parser.getUrlArgs()){ + for (auto &pr : _parser.getUrlArgs()) { const_cast(pr.second) = urlDecode(pr.second); } } -bool HttpSession::emitHttpEvent(bool doInvoke){ - bool bClose = !strcasecmp(_parser["Connection"].data(),"close"); +bool HttpSession::emitHttpEvent(bool doInvoke) { + bool bClose = !strcasecmp(_parser["Connection"].data(), "close"); /////////////////////异步回复Invoker/////////////////////////////// weak_ptr weak_self = static_pointer_cast(shared_from_this()); - HttpResponseInvoker invoker = [weak_self,bClose](int code, const KeyValue &headerOut, const HttpBody::Ptr &body){ + HttpResponseInvoker invoker = [weak_self, bClose](int code, const KeyValue &headerOut, const HttpBody::Ptr &body) { auto strong_self = weak_self.lock(); - if(!strong_self) { + if (!strong_self) { return; } strong_self->async([weak_self, bClose, code, headerOut, body]() { auto strong_self = weak_self.lock(); if (!strong_self) { - //本对象已经销毁 + // 本对象已经销毁 return; } strong_self->sendResponse(code, bClose, nullptr, headerOut, body); }); }; ///////////////////广播HTTP事件/////////////////////////// - bool consumed = false;//该事件是否被消费 - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastHttpRequest,_parser,invoker,consumed,static_cast(*this)); - if(!consumed && doInvoke){ - //该事件无人消费,所以返回404 - invoker(404,KeyValue(), HttpBody::Ptr()); + bool consumed = false; // 该事件是否被消费 + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastHttpRequest, _parser, invoker, consumed, static_cast(*this)); + if (!consumed && doInvoke) { + // 该事件无人消费,所以返回404 + invoker(404, KeyValue(), HttpBody::Ptr()); } return consumed; } std::string HttpSession::get_peer_ip() { GET_CONFIG(string, forwarded_ip_header, Http::kForwardedIpHeader); - if(!forwarded_ip_header.empty() && !_parser.getHeader()[forwarded_ip_header].empty()){ + if (!forwarded_ip_header.empty() && !_parser.getHeader()[forwarded_ip_header].empty()) { return _parser.getHeader()[forwarded_ip_header]; } return Session::get_peer_ip(); } void HttpSession::Handle_Req_POST(ssize_t &content_len) { - GET_CONFIG(size_t,maxReqSize,Http::kMaxReqSize); + GET_CONFIG(size_t, maxReqSize, Http::kMaxReqSize); ssize_t totalContentLen = _parser["Content-Length"].empty() ? -1 : atoll(_parser["Content-Length"].data()); - if(totalContentLen == 0){ - //content为空 - //emitHttpEvent内部会选择是否关闭连接 + if (totalContentLen == 0) { + // content为空 + // emitHttpEvent内部会选择是否关闭连接 emitHttpEvent(true); return; } - if(totalContentLen > 0 && (size_t)totalContentLen < maxReqSize ){ - //返回固定长度的content + if (totalContentLen > 0 && (size_t)totalContentLen < maxReqSize) { + // 返回固定长度的content content_len = totalContentLen; auto parserCopy = _parser; - _contentCallBack = [this,parserCopy](const char *data,size_t len){ - //恢复http头 + _contentCallBack = [this, parserCopy](const char *data, size_t len) { + // 恢复http头 _parser = parserCopy; - //设置content - _parser.setContent(string(data,len)); - //触发http事件,emitHttpEvent内部会选择是否关闭连接 + // 设置content + _parser.setContent(string(data, len)); + // 触发http事件,emitHttpEvent内部会选择是否关闭连接 emitHttpEvent(true); - //清空数据,节省内存 + // 清空数据,节省内存 _parser.Clear(); - //content已经接收完毕 + // content已经接收完毕 return false; }; - }else{ - //返回不固定长度的content或者超过长度限制的content + } else { + // 返回不固定长度的content或者超过长度限制的content content_len = -1; auto parserCopy = _parser; std::shared_ptr recvedContentLen = std::make_shared(0); - bool bClose = !strcasecmp(_parser["Connection"].data(),"close"); + bool bClose = !strcasecmp(_parser["Connection"].data(), "close"); - _contentCallBack = [this,parserCopy,totalContentLen,recvedContentLen,bClose](const char *data,size_t len){ + _contentCallBack = [this, parserCopy, totalContentLen, recvedContentLen, bClose](const char *data, size_t len) { *(recvedContentLen) += len; if (totalContentLen < 0) { - //不固定长度的content,源源不断接收数据 + // 不固定长度的content,源源不断接收数据 onRecvUnlimitedContent(parserCopy, data, len, SIZE_MAX, *(recvedContentLen)); return true; } - //长度超过限制的content - onRecvUnlimitedContent(parserCopy,data,len,totalContentLen,*(recvedContentLen)); + // 长度超过限制的content + onRecvUnlimitedContent(parserCopy, data, len, totalContentLen, *(recvedContentLen)); - if(*(recvedContentLen) < (size_t)totalContentLen){ - //数据还没接收完毕 + if (*(recvedContentLen) < (size_t)totalContentLen) { + // 数据还没接收完毕 //_contentCallBack是可持续的,后面还要处理后续content数据 return true; } - //数据接收完毕 - if(!bClose){ - //keep-alive类型连接 - //content接收完毕,后续都是http header + // 数据接收完毕 + if (!bClose) { + // keep-alive类型连接 + // content接收完毕,后续都是http header setContentLen(0); - //content已经接收完毕 + // content已经接收完毕 return false; } - //连接类型是close类型,收完content就关闭连接 - shutdown(SockException(Err_shutdown,"recv http content completed")); - //content已经接收完毕 - return false ; + // 连接类型是close类型,收完content就关闭连接 + shutdown(SockException(Err_shutdown, "recv http content completed")); + // content已经接收完毕 + return false; }; } - //有后续content数据要处理,暂时不关闭连接 + // 有后续content数据要处理,暂时不关闭连接 } void HttpSession::sendNotFound(bool bClose) { @@ -762,19 +751,19 @@ void HttpSession::sendNotFound(bool bClose) { sendResponse(404, bClose, "text/html", KeyValue(), std::make_shared(notFound)); } -void HttpSession::setSocketFlags(){ +void HttpSession::setSocketFlags() { GET_CONFIG(int, mergeWriteMS, General::kMergeWriteMS); - if(mergeWriteMS > 0) { - //推流模式下,关闭TCP_NODELAY会增加推流端的延时,但是服务器性能将提高 + if (mergeWriteMS > 0) { + // 推流模式下,关闭TCP_NODELAY会增加推流端的延时,但是服务器性能将提高 SockUtil::setNoDelay(getSock()->rawFD(), false); - //播放模式下,开启MSG_MORE会增加延时,但是能提高发送性能 + // 播放模式下,开启MSG_MORE会增加延时,但是能提高发送性能 setSendFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE); } } void HttpSession::onWrite(const Buffer::Ptr &buffer, bool flush) { - if(flush){ - //需要flush那么一次刷新缓存 + if (flush) { + // 需要flush那么一次刷新缓存 HttpSession::setSendFlushFlag(true); } @@ -792,18 +781,18 @@ void HttpSession::onWrite(const Buffer::Ptr &buffer, bool flush) { } if (flush) { - //本次刷新缓存后,下次不用刷新缓存 + // 本次刷新缓存后,下次不用刷新缓存 HttpSession::setSendFlushFlag(false); } } -void HttpSession::onWebSocketEncodeData(Buffer::Ptr buffer){ +void HttpSession::onWebSocketEncodeData(Buffer::Ptr buffer) { _total_bytes_usage += buffer->size(); send(std::move(buffer)); } -void HttpSession::onWebSocketDecodeComplete(const WebSocketHeader &header_in){ - WebSocketHeader& header = const_cast(header_in); +void HttpSession::onWebSocketDecodeComplete(const WebSocketHeader &header_in) { + WebSocketHeader &header = const_cast(header_in); header._mask_flag = false; switch (header._opcode) { @@ -813,15 +802,15 @@ void HttpSession::onWebSocketDecodeComplete(const WebSocketHeader &header_in){ break; } - default : break; + default: break; } } void HttpSession::onDetach() { - shutdown(SockException(Err_shutdown,"rtmp ring buffer detached")); + shutdown(SockException(Err_shutdown, "rtmp ring buffer detached")); } -std::shared_ptr HttpSession::getSharedPtr(){ +std::shared_ptr HttpSession::getSharedPtr() { return dynamic_pointer_cast(shared_from_this()); }