From 67644a7badf8ec7dcfd1fe26c280b227a8adebdb Mon Sep 17 00:00:00 2001 From: xiongziliang <771730766@qq.com> Date: Sun, 23 Sep 2018 00:55:00 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96http=E5=88=86=E5=8C=85?= =?UTF-8?q?=E5=99=A8=E6=80=A7=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Http/HttpRequestSplitter.cpp | 58 +++++++++++++++++++++----------- src/Http/HttpRequestSplitter.h | 14 +++++--- src/Http/HttpSession.cpp | 26 +++++++------- src/Http/HttpSession.h | 42 +++++++++-------------- src/Http/WebSocketSplitter.cpp | 15 +++++---- src/Rtsp/RtspSession.cpp | 19 +++++------ src/Rtsp/RtspSession.h | 21 ++---------- 7 files changed, 97 insertions(+), 98 deletions(-) diff --git a/src/Http/HttpRequestSplitter.cpp b/src/Http/HttpRequestSplitter.cpp index 461234d8..f705f499 100644 --- a/src/Http/HttpRequestSplitter.cpp +++ b/src/Http/HttpRequestSplitter.cpp @@ -7,48 +7,68 @@ #include "Util/util.h" using namespace ZL::Util; -void HttpRequestSplitter::input(const string &data) { - if(_remain_data.empty()){ - _remain_data = data; - }else{ - _remain_data.append(data); +void HttpRequestSplitter::input(const char *data,uint64_t len) { + const char *ptr = data; + if(!_remain_data.empty()){ + _remain_data.append(data,len); + data = ptr = _remain_data.data(); + len = _remain_data.size(); } splitPacket: //数据按照请求头处理 - size_t index; - while (_content_len == 0 && (index = _remain_data.find("\r\n\r\n")) != std::string::npos ) { + char *index = nullptr; + while (_content_len == 0 && (index = strstr(ptr,"\r\n\r\n")) != nullptr) { //_content_len == 0,这是请求头 - _content_len = onRecvHeader(_remain_data.substr(0, index + 4)); - _remain_data.erase(0, index + 4); + _content_len = onRecvHeader(ptr, index - ptr + 4); + ptr = index + 4; } - if(_remain_data.empty()){ + uint64_t remain = len - (ptr - data); + if(remain <= 0){ + //没有剩余数据,清空缓存 + _remain_data.clear(); return; } + if(_content_len == 0){ + //尚未找到http头,缓存定位到剩余数据部分 + _remain_data.assign(ptr,remain); + return; + } + + //已经找到http头了 if(_content_len > 0){ //数据按照固定长度content处理 - if(_remain_data.size() < _content_len){ - //数据不够 + if(remain < _content_len){ + //数据不够,缓存定位到剩余数据部分 + _remain_data.assign(ptr,remain); return; } //收到content数据,并且接受content完毕 - onRecvContent(_remain_data.substr(0,_content_len)); - _remain_data.erase(0,_content_len); + onRecvContent(ptr,_content_len); + + remain -= _content_len; + ptr += _content_len; //content处理完毕,后面数据当做请求头处理 _content_len = 0; - if(!_remain_data.empty()){ + if(remain > 0){ //还有数据没有处理完毕 + _remain_data.assign(ptr,remain); + + data = ptr = (char *)_remain_data.data(); + len = _remain_data.size(); goto splitPacket; } - }else{ - //数据按照不固定长度content处理 - onRecvContent(_remain_data); - _remain_data.clear(); + return; } + + + //_content_len < 0;数据按照不固定长度content处理 + onRecvContent(ptr,remain);//消费掉所有剩余数据 + _remain_data.clear(); } void HttpRequestSplitter::setContentLen(int64_t content_len) { diff --git a/src/Http/HttpRequestSplitter.h b/src/Http/HttpRequestSplitter.h index b4413e94..81a382eb 100644 --- a/src/Http/HttpRequestSplitter.h +++ b/src/Http/HttpRequestSplitter.h @@ -16,25 +16,29 @@ public: /** * 添加数据 * @param data 需要添加的数据 + * @param len 数据长度 */ - void input(const string &data); + void input(const char *data,uint64_t len); protected: /** * 收到请求头 - * @param header 请求头 + * @param data 请求头数据 + * @param len 请求头长度 + * * @return 请求头后的content长度, * <0 : 代表后面所有数据都是content * 0 : 代表为后面数据还是请求头, * >0 : 代表后面数据为固定长度content, */ - virtual int64_t onRecvHeader(const string &header) = 0; + virtual int64_t onRecvHeader(const char *data,uint64_t len) = 0; /** * 收到content分片或全部数据 * onRecvHeader函数返回>0,则为全部数据 - * @param content + * @param data content分片或全部数据 + * @param len 数据长度 */ - virtual void onRecvContent(const string &content) = 0; + virtual void onRecvContent(const char *data,uint64_t len) {}; /** * 设置content len diff --git a/src/Http/HttpSession.cpp b/src/Http/HttpSession.cpp index b3b9c66f..0e3d961d 100644 --- a/src/Http/HttpSession.cpp +++ b/src/Http/HttpSession.cpp @@ -117,7 +117,7 @@ HttpSession::~HttpSession() { //DebugL; } -int64_t HttpSession::onRecvHeader(const string &header) { +int64_t HttpSession::onRecvHeader(const char *header,uint64_t len) { typedef bool (HttpSession::*HttpCMDHandle)(int64_t &); static unordered_map g_mapCmdIndex; static onceToken token([]() { @@ -125,7 +125,7 @@ int64_t HttpSession::onRecvHeader(const string &header) { g_mapCmdIndex.emplace("POST",&HttpSession::Handle_Req_POST); }, nullptr); - m_parser.Parse(header.data()); + m_parser.Parse(header); urlDecode(m_parser); string cmd = m_parser.Method(); auto it = g_mapCmdIndex.find(cmd); @@ -148,9 +148,9 @@ int64_t HttpSession::onRecvHeader(const string &header) { return content_len; } -void HttpSession::onRecvContent(const string &content) { +void HttpSession::onRecvContent(const char *data,uint64_t len) { if(m_contentCallBack){ - if(!m_contentCallBack(content)){ + if(!m_contentCallBack(data,len)){ m_contentCallBack = nullptr; } } @@ -161,7 +161,7 @@ void HttpSession::onRecv(const Buffer::Ptr &pBuf) { } void HttpSession::onRecv(const char *data,int size){ m_ticker.resetTime(); - input(string(data,size)); + input(data,size); } void HttpSession::onError(const SockException& err) { @@ -271,8 +271,8 @@ inline bool HttpSession::Handle_Req_GET(int64_t &content_len) { if(checkWebSocket()){ content_len = -1; auto parserCopy = m_parser; - m_contentCallBack = [this,parserCopy](const string &data){ - onRecvWebSocketData(parserCopy,data); + m_contentCallBack = [this,parserCopy](const char *data,uint64_t len){ + onRecvWebSocketData(parserCopy,data,len); //m_contentCallBack是可持续的,后面还要处理后续数据 return true; }; @@ -638,11 +638,11 @@ inline bool HttpSession::Handle_Req_POST(int64_t &content_len) { //返回固定长度的content content_len = totalContentLen; auto parserCopy = m_parser; - m_contentCallBack = [this,parserCopy](const string &content){ + m_contentCallBack = [this,parserCopy](const char *data,uint64_t len){ //恢复http头 m_parser = parserCopy; //设置content - m_parser.setContent(content); + m_parser.setContent(string(data,len)); //触发http事件,emitHttpEvent内部会选择是否关闭连接 emitHttpEvent(true); //清空数据,节省内存 @@ -654,13 +654,13 @@ inline bool HttpSession::Handle_Req_POST(int64_t &content_len) { //返回不固定长度的content content_len = -1; auto parserCopy = m_parser; - std::shared_ptr recvedContentLen = std::make_shared(0); + std::shared_ptr recvedContentLen = std::make_shared(0); bool bClose = (strcasecmp(m_parser["Connection"].data(),"close") == 0) || ( ++m_iReqCnt > maxReqCnt); - m_contentCallBack = [this,parserCopy,totalContentLen,recvedContentLen,bClose](const string &content){ - *(recvedContentLen) += content.size(); + m_contentCallBack = [this,parserCopy,totalContentLen,recvedContentLen,bClose](const char *data,uint64_t len){ + *(recvedContentLen) += len; - onRecvUnlimitedContent(parserCopy,content,totalContentLen,*(recvedContentLen)); + onRecvUnlimitedContent(parserCopy,data,len,totalContentLen,*(recvedContentLen)); if(*(recvedContentLen) < totalContentLen){ //数据还没接收完毕 diff --git a/src/Http/HttpSession.h b/src/Http/HttpSession.h index 0ce29da9..806e06f3 100644 --- a/src/Http/HttpSession.h +++ b/src/Http/HttpSession.h @@ -70,44 +70,34 @@ protected: std::shared_ptr getSharedPtr() override; //HttpRequestSplitter override - /** - * 收到请求头 - * @param header 请求头 - * @return 请求头后的content长度, - * <0 : 代表后面所有数据都是content - * 0 : 代表为后面数据还是请求头, - * >0 : 代表后面数据为固定长度content, - */ - int64_t onRecvHeader(const string &header) override; - - /** - * 收到content分片或全部数据 - * onRecvHeader函数返回>0,则为全部数据 - * @param content - */ - void onRecvContent(const string &content) override; + int64_t onRecvHeader(const char *data,uint64_t len) override; + void onRecvContent(const char *data,uint64_t len) override; /** * 重载之用于处理不定长度的content * 这个函数可用于处理大文件上传、http-flv推流 * @param header http请求头 - * @param content content分片数据 + * @param data content分片数据 + * @param len content分片数据大小 * @param totalSize content总大小,如果为0则是不限长度content * @param recvedSize 已收数据大小 */ - virtual void onRecvUnlimitedContent(const Parser &header,const string &content,int64_t totalSize,int64_t recvedSize){ + virtual void onRecvUnlimitedContent(const Parser &header, + const char *data, + uint64_t len, + uint64_t totalSize, + uint64_t recvedSize){ WarnL << "content数据长度过大,无法处理,请重载HttpSession::onRecvUnlimitedContent"; shutdown(); } + void onWebSocketDecodeHeader(const WebSocketHeader &packet) override{ + DebugL << "默认关闭WebSocket"; + shutdown(); + }; - /** - * 重载之用于处理websocket数据 - * @param header http请求头 - * @param data websocket数据 - */ - virtual void onRecvWebSocketData(const Parser &header,const string &data){ - WebSocketSplitter::decode((uint8_t *)data.data(),data.size()); + void onRecvWebSocketData(const Parser &header,const char *data,uint64_t len){ + WebSocketSplitter::decode((uint8_t *)data,len); } private: Parser m_parser; @@ -119,7 +109,7 @@ private: //flv over http MediaInfo m_mediaInfo; //处理content数据的callback - function m_contentCallBack; + function m_contentCallBack; private: inline bool Handle_Req_GET(int64_t &content_len); inline bool Handle_Req_POST(int64_t &content_len); diff --git a/src/Http/WebSocketSplitter.cpp b/src/Http/WebSocketSplitter.cpp index 851fbaf4..a909456d 100644 --- a/src/Http/WebSocketSplitter.cpp +++ b/src/Http/WebSocketSplitter.cpp @@ -90,6 +90,8 @@ begin_decode: onWebSocketDecodeHeader(*this); } + //进入后面逻辑代表已经获取到了webSocket协议头, + uint64_t remain = len - (ptr - data); if(remain > 0){ uint64_t playload_slice_len = remain; @@ -101,16 +103,17 @@ begin_decode: if(_playload_offset == _playload_len){ //这是下一个包 - if(remain - playload_slice_len > 0){ - string nextPacket((char *)ptr + playload_slice_len,remain - playload_slice_len); - _got_header = false; - _remain_data = nextPacket; + remain -= playload_slice_len; + ptr += playload_slice_len; + _got_header = false; + + if(remain > 0){ + //剩余数据是下一个包,把它的数据放置在缓存中 + _remain_data.assign((char *)ptr,remain); data = ptr = (uint8_t *)_remain_data.data(); len = _remain_data.size(); goto begin_decode; - } else{ - _got_header = false; } } } diff --git a/src/Rtsp/RtspSession.cpp b/src/Rtsp/RtspSession.cpp index 5e1f2676..91baa0c9 100644 --- a/src/Rtsp/RtspSession.cpp +++ b/src/Rtsp/RtspSession.cpp @@ -140,15 +140,11 @@ void RtspSession::onManager() { } -void RtspSession::onRecvContent(const string &content){ - -} - -int64_t RtspSession::onRecvHeader(const string &header) { +int64_t RtspSession::onRecvHeader(const char *header,uint64_t len) { char tmp[2 * 1024]; m_pcBuf = tmp; - m_parser.Parse(header.data()); //rtsp请求解析 + m_parser.Parse(header); //rtsp请求解析 string strCmd = m_parser.Method(); //提取出请求命令字 m_iCseq = atoi(m_parser["CSeq"].data()); @@ -188,18 +184,19 @@ void RtspSession::onRecv(const Buffer::Ptr &pBuf) { m_ui64TotalBytes += pBuf->size(); if (m_bBase64need) { //quicktime 加密后的rtsp请求,需要解密 - inputRtspOrRtcp(decodeBase64(string(pBuf->data(),pBuf->size()))); + auto str = decodeBase64(string(pBuf->data(),pBuf->size())); + inputRtspOrRtcp(str.data(),str.size()); } else { - inputRtspOrRtcp(string(pBuf->data(),pBuf->size())); + inputRtspOrRtcp(pBuf->data(),pBuf->size()); } } -void RtspSession::inputRtspOrRtcp(const string &str) { - if(str[0] == '$' && m_rtpType == PlayerBase::RTP_TCP){ +void RtspSession::inputRtspOrRtcp(const char *data,uint64_t len) { + if(data[0] == '$' && m_rtpType == PlayerBase::RTP_TCP){ //这是rtcp return; } - input(str); + input(data,len); } bool RtspSession::handleReq_Options() { diff --git a/src/Rtsp/RtspSession.h b/src/Rtsp/RtspSession.h index 45666cf4..b463a56a 100644 --- a/src/Rtsp/RtspSession.h +++ b/src/Rtsp/RtspSession.h @@ -1,4 +1,4 @@ -/* +/* * MIT License * * Copyright (c) 2016 xiongziliang <771730766@qq.com> @@ -84,24 +84,9 @@ public: protected: //HttpRequestSplitter override - /** - * 收到请求头 - * @param header 请求头 - * @return 请求头后的content长度, - * <0 : 代表后面所有数据都是content - * 0 : 代表为后面数据还是请求头, - * >0 : 代表后面数据为固定长度content, - */ - int64_t onRecvHeader(const string &header) override ; - - /** - * 收到content分片或全部数据 - * onRecvHeader函数返回>0,则为全部数据 - * @param content - */ - void onRecvContent(const string &content) override; + int64_t onRecvHeader(const char *data,uint64_t len) override ; private: - void inputRtspOrRtcp(const string &str); + void inputRtspOrRtcp(const char *data,uint64_t len); int send(const string &strBuf) override { m_ui64TotalBytes += strBuf.size(); return m_pSender->send(strBuf);