下一步实现websocket或大文件上传做好准备

This commit is contained in:
xiongziliang 2018-09-20 17:50:58 +08:00
parent 8076466b58
commit ffb5a22845
4 changed files with 208 additions and 79 deletions

View File

@ -0,0 +1,45 @@
//
// Created by xzl on 2018/9/20.
//
#include "HttpRequestSplitter.h"
void HttpRequestSplitter::input(const string &data) {
if(_remain_data.empty()){
_remain_data = data;
}else{
_remain_data.append(data);
}
splitPacket:
//数据按照请求头处理
size_t index;
while (_content_len == 0 && (index = _remain_data.find("\r\n\r\n")) != std::string::npos ) {
//_content_len == 0这是请求头
_content_len = onRecvHeader(_remain_data.substr(0, index + 4));
_remain_data.erase(0, index + 4);
}
if(_content_len > 0){
//数据按照固定长度content处理
if(_remain_data.size() < _content_len){
//数据不够
return;
}
//收到content数据并且接受content完毕
onRecvContent(_remain_data.substr(0,_content_len));
_remain_data.erase(0,_content_len);
//content处理完毕,后面数据当做请求头处理
_content_len = 0;
if(!_remain_data.empty()){
//还有数据没有处理完毕
goto splitPacket;
}
}else{
//数据按照不固定长度content处理
onRecvContent(_remain_data);
_remain_data.clear();
}
}

View File

@ -0,0 +1,44 @@
//
// Created by xzl on 2018/9/20.
//
#ifndef ZLMEDIAKIT_HTTPREQUESTSPLITTER_H
#define ZLMEDIAKIT_HTTPREQUESTSPLITTER_H
#include <string>
using namespace std;
class HttpRequestSplitter {
public:
HttpRequestSplitter(){};
virtual ~HttpRequestSplitter(){};
/**
*
* @param data
*/
void input(const string &data);
protected:
/**
*
* @param header
* @return content长度,
* <0 : content
* 0 : ,
* >0 : content,
*/
virtual int64_t onRecvHeader(const string &header) = 0;
/**
* content分片或全部数据
* onRecvHeader函数返回>0,
* @param content
*/
virtual void onRecvContent(const string &content) = 0;
private:
string _remain_data;
int64_t _content_len = 0;
};
#endif //ZLMEDIAKIT_HTTPREQUESTSPLITTER_H

View File

@ -115,62 +115,53 @@ HttpSession::~HttpSession() {
//DebugL;
}
void HttpSession::onRecv(const Buffer::Ptr &pBuf) {
onRecv(pBuf->data(),pBuf->size());
}
void HttpSession::onRecv(const char *data,int size){
GET_CONFIG_AND_REGISTER(uint32_t,reqSize,Config::Http::kMaxReqSize);
m_ticker.resetTime();
if (m_strRcvBuf.size() + size >= reqSize) {
WarnL << "接收缓冲区溢出:" << m_strRcvBuf.size() + size << "," << reqSize;
shutdown();
return;
}
m_strRcvBuf.append(data, size);
size_t index;
string onePkt;
while ((index = m_strRcvBuf.find("\r\n\r\n")) != std::string::npos) {
onePkt = m_strRcvBuf.substr(0, index + 4);
m_strRcvBuf.erase(0, index + 4);
switch (parserHttpReq(onePkt)) {
case Http_failed:
//失败
shutdown();
return;
case Http_success:
//成功
break;
case Http_moreData:
//需要更多数据,恢复数据并退出
m_strRcvBuf = onePkt + m_strRcvBuf;
m_parser.Clear();
return;
}
}
m_parser.Clear();
}
inline HttpSession::HttpCode HttpSession::parserHttpReq(const string &str) {
m_parser.Parse(str.data());
urlDecode(m_parser);
string cmd = m_parser.Method();
typedef HttpSession::HttpCode (HttpSession::*HttpCMDHandle)();
int64_t HttpSession::onRecvHeader(const string &header) {
typedef bool (HttpSession::*HttpCMDHandle)(int64_t &);
static unordered_map<string, HttpCMDHandle> g_mapCmdIndex;
static onceToken token([]() {
g_mapCmdIndex.emplace("GET",&HttpSession::Handle_Req_GET);
g_mapCmdIndex.emplace("POST",&HttpSession::Handle_Req_POST);
}, nullptr);
m_parser.Parse(header.data());
urlDecode(m_parser);
string cmd = m_parser.Method();
auto it = g_mapCmdIndex.find(cmd);
if (it == g_mapCmdIndex.end()) {
WarnL << cmd;
sendResponse("403 Forbidden", makeHttpHeader(true), "");
return Http_failed;
shutdown();
return 0;
}
auto fun = it->second;
return (this->*fun)();
//默认后面数据不是content而是header
int64_t content_len = 0;
auto &fun = it->second;
if(!(this->*fun)(content_len)){
shutdown();
}
//清空解析器节省内存
m_parser.Clear();
//返回content长度
return content_len;
}
void HttpSession::onRecvContent(const string &content) {
if(m_contentCallBack){
if(!m_contentCallBack(content)){
m_contentCallBack = nullptr;
}
}
}
void HttpSession::onRecv(const Buffer::Ptr &pBuf) {
onRecv(pBuf->data(),pBuf->size());
}
void HttpSession::onRecv(const char *data,int size){
m_ticker.resetTime();
input(string(data,size));
}
void HttpSession::onError(const SockException& err) {
//WarnL << err.what();
GET_CONFIG_AND_REGISTER(uint32_t,iFlowThreshold,Broadcast::kFlowThreshold);
@ -256,14 +247,14 @@ inline bool HttpSession::checkLiveFlvStream(){
}
return true;
}
inline HttpSession::HttpCode HttpSession::Handle_Req_GET() {
inline bool HttpSession::Handle_Req_GET(int64_t &content_len) {
//先看看该http事件是否被拦截
if(emitHttpEvent(false)){
return Http_success;
return true;
}
//再看看是否为http-flv直播请求
if(checkLiveFlvStream()){
return Http_success;
return true;
}
//事件未被拦截则认为是http下载请求
@ -275,7 +266,6 @@ inline HttpSession::HttpCode HttpSession::Handle_Req_GET() {
GET_CONFIG_AND_REGISTER(uint32_t,reqCnt,Config::Http::kMaxReqCount);
bool bClose = (strcasecmp(m_parser["Connection"].data(),"close") == 0) || ( ++m_iReqCnt > reqCnt);
HttpCode eHttpCode = bClose ? Http_failed : Http_success;
//访问的是文件夹
if (strFile.back() == '/') {
//生成文件夹菜单索引
@ -283,17 +273,17 @@ inline HttpSession::HttpCode HttpSession::Handle_Req_GET() {
if (!makeMeun(strFile,m_mediaInfo.m_vhost, strMeun)) {
//文件夹不存在
sendNotFound(bClose);
return eHttpCode;
return !bClose;
}
sendResponse("200 OK", makeHttpHeader(bClose,strMeun.size() ), strMeun);
return eHttpCode;
return !bClose;
}
//访问的是文件
struct stat tFileStat;
if (0 != stat(strFile.data(), &tFileStat)) {
//文件不存在
sendNotFound(bClose);
return eHttpCode;
return !bClose;
}
//文件智能指针,防止退出时未关闭
std::shared_ptr<FILE> pFilePtr(fopen(strFile.data(), "rb"), [](FILE *pFile) {
@ -305,7 +295,7 @@ inline HttpSession::HttpCode HttpSession::Handle_Req_GET() {
if (!pFilePtr) {
//打开文件失败
sendNotFound(bClose);
return eHttpCode;
return !bClose;
}
//判断是不是分节下载
@ -339,7 +329,7 @@ inline HttpSession::HttpCode HttpSession::Handle_Req_GET() {
sendResponse(pcHttpResult, httpHeader, "");
if (iRangeEnd - iRangeStart < 0) {
//文件是空的!
return eHttpCode;
return !bClose;
}
//回复Content部分
std::shared_ptr<int64_t> piLeft(new int64_t(iRangeEnd - iRangeStart + 1));
@ -426,7 +416,7 @@ inline HttpSession::HttpCode HttpSession::Handle_Req_GET() {
onFlush();
_sock->setOnFlush(onFlushWrapper);
return Http_success;
return true;
}
inline bool HttpSession::makeMeun(const string &strFullPath,const string &vhost, string &strRet) {
@ -595,17 +585,44 @@ inline bool HttpSession::emitHttpEvent(bool doInvoke){
}
return consumed;
}
inline HttpSession::HttpCode HttpSession::Handle_Req_POST() {
inline bool HttpSession::Handle_Req_POST(int64_t &content_len) {
//////////////获取HTTP POST Content/////////////
int iContentLen = atoi(m_parser["Content-Length"].data());
if ((int) m_strRcvBuf.size() < iContentLen) {
return Http_moreData; //需要更多数据
GET_CONFIG_AND_REGISTER(uint32_t,reqSize,Config::Http::kMaxReqSize);
int realContentLen = atoi(m_parser["Content-Length"].data());
int iContentLen = realContentLen;
if(iContentLen > reqSize){
//Content大小超过限制,那么我们把这个http post请求当做不限制content长度来处理
//这种情况下用于文件post很有必要否则内存可能溢出
iContentLen = 0;
}
m_parser.setContent(m_strRcvBuf.substr(0, iContentLen));
m_strRcvBuf.erase(0, iContentLen);
//广播事件
if(iContentLen > 0){
//返回固定长度的content
content_len = iContentLen;
auto parserCopy = m_parser;
m_contentCallBack = [this,parserCopy](const string &content){
//恢复http头
m_parser = parserCopy;
//设置content
m_parser.setContent(content);
//触发http事件
emitHttpEvent(true);
return Http_success;
//清空数据,节省内存
m_parser.Clear();
//m_contentCallBack是不可持续的收到一次content后就销毁
return false;
};
}else{
//返回不固定长度的content
content_len = -1;
auto parserCopy = m_parser;
m_contentCallBack = [this,parserCopy,realContentLen](const string &content){
onRecvUnlimitedContent(parserCopy,content,realContentLen);
//m_contentCallBack是可持续的后面还要处理后续content数据
return true;
};
}
return true;
}
void HttpSession::responseDelay(const string &Origin,bool bClose,
const string &codeOut,const KeyValue &headerOut,

View File

@ -32,6 +32,7 @@
#include "Network/TcpSession.h"
#include "Rtmp/RtmpMediaSource.h"
#include "Rtmp/FlvMuxer.h"
#include "HttpRequestSplitter.h"
using namespace std;
using namespace ZL::Rtmp;
@ -40,8 +41,7 @@ using namespace ZL::Network;
namespace ZL {
namespace Http {
class HttpSession: public TcpSession,public FlvMuxer {
class HttpSession: public TcpSession,public FlvMuxer, public HttpRequestSplitter {
public:
typedef StrCaseMap KeyValue;
typedef std::function<void(const string &codeOut,
@ -57,35 +57,58 @@ public:
static string urlDecode(const string &str);
protected:
//用于HttpsSession调用
void onRecv(const char *data,int size);
//FlvMuxer override
void onWrite(const Buffer::Ptr &data) override ;
void onWrite(const char *data,int len) override;
void onDetach() override;
std::shared_ptr<FlvMuxer> getSharedPtr() override;
private:
typedef enum
{
Http_success = 0,
Http_failed = 1,
Http_moreData = 2,
} HttpCode;
//HttpRequestSplitter override
/**
*
* @param header
* @return content长度,
* <0 : content
* 0 : ,
* >0 : content,
*/
int64_t onRecvHeader(const string &header) override;
/**
* content分片或全部数据
* onRecvHeader函数返回>0,
* @param content
*/
void onRecvContent(const string &content) override;
/**
* content
* http-flv推流,WebSocket数据
* @param header http请求头
* @param content content分片数据
* @param content_size content大小,0content
*/
virtual void onRecvUnlimitedContent(const Parser &header,const string &content,int64_t content_size){
WarnL << "content数据长度过大无法处理,请重载HttpSession::onRecvUnlimitedContent";
shutdown();
}
private:
Parser m_parser;
string m_strPath;
string m_strRcvBuf;
Ticker m_ticker;
uint32_t m_iReqCnt = 0;
//消耗的总流量
uint64_t m_ui64TotalBytes = 0;
//flv over http
MediaInfo m_mediaInfo;
inline HttpCode parserHttpReq(const string &);
inline HttpCode Handle_Req_GET();
inline HttpCode Handle_Req_POST();
//处理content数据的callback
function<bool (const string &content) > m_contentCallBack;
private:
inline bool Handle_Req_GET(int64_t &content_len);
inline bool Handle_Req_POST(int64_t &content_len);
inline bool checkLiveFlvStream();
inline bool emitHttpEvent(bool doInvoke);
inline void urlDecode(Parser &parser);