diff --git a/src/Http/HttpBody.h b/src/Http/HttpBody.h index a340c1b0..9428ab07 100644 --- a/src/Http/HttpBody.h +++ b/src/Http/HttpBody.h @@ -32,6 +32,7 @@ #include "Network/Buffer.h" #include "Util/ResourcePool.h" #include "Util/logger.h" +#include "Thread/WorkThreadPool.h" using namespace std; using namespace toolkit; @@ -45,10 +46,12 @@ namespace mediakit { /** * http content部分基类定义 */ -class HttpBody{ +class HttpBody : public std::enable_shared_from_this{ public: typedef std::shared_ptr Ptr; - HttpBody(){} + HttpBody(){ + _async_read_thread = WorkThreadPool::Instance().getPoller(); + } virtual ~HttpBody(){} /** @@ -62,6 +65,29 @@ public: * @return 字节对象,如果读完了,那么请返回nullptr */ virtual Buffer::Ptr readData(uint32_t size) { return nullptr;}; + + /** + * 异步请求读取一定字节数,返回大小可能小于size + * @param size 请求大小 + * @param cb 回调函数 + */ + virtual void readDataAsync(uint32_t size,const function &cb){ + if(size >= remainSize()){ + //假如剩余数据很小,那么同步获取(为了优化性能) + cb(readData(size)); + return; + } + //如果是大文件,那么后台读取 + weak_ptr weakSelf = shared_from_this(); + _async_read_thread->async([cb,size,weakSelf](){ + auto strongSelf = weakSelf.lock(); + if(strongSelf){ + cb(strongSelf->readData(size)); + } + }); + } +private: + EventPoller::Ptr _async_read_thread; }; /** diff --git a/src/Http/HttpSession.cpp b/src/Http/HttpSession.cpp index 405a67ac..4b8daf0d 100644 --- a/src/Http/HttpSession.cpp +++ b/src/Http/HttpSession.cpp @@ -308,6 +308,79 @@ static string dateStr() { return buf; } +class AsyncSenderData { +public: + friend class AsyncSender; + typedef std::shared_ptr Ptr; + AsyncSenderData(const TcpSession::Ptr &session, const HttpBody::Ptr &body, bool close_when_complete) { + _session = dynamic_pointer_cast(session); + _body = body; + _close_when_complete = close_when_complete; + } + ~AsyncSenderData() = default; +private: + std::weak_ptr _session; + HttpBody::Ptr _body; + bool _close_when_complete; + bool _read_complete = false; +}; + +class AsyncSender { +public: + typedef std::shared_ptr Ptr; + static bool onSocketFlushed(const AsyncSenderData::Ptr &data) { + if (data->_read_complete) { + if (data->_close_when_complete) { + //发送完毕需要关闭socket + shutdown(data->_session.lock()); + } + return false; + } + + GET_CONFIG(uint32_t, sendBufSize, Http::kSendBufSize); + data->_body->readDataAsync(sendBufSize, [data](const Buffer::Ptr &sendBuf) { + auto session = data->_session.lock(); + if (!session) { + //本对象已经销毁 + return; + } + session->async([data, sendBuf]() { + auto session = data->_session.lock(); + if (!session) { + //本对象已经销毁 + return; + } + onRequestData(data, session, sendBuf); + }, false); + }); + return true; + } + +private: + static void onRequestData(const AsyncSenderData::Ptr &data, const std::shared_ptr &session, const Buffer::Ptr &sendBuf) { + session->_ticker.resetTime(); + if (sendBuf && session->send(sendBuf) != -1) { + //文件还未读完,还需要继续发送 + if (!session->isSocketBusy()) { + //socket还可写,继续请求数据 + onSocketFlushed(data); + } + return; + } + //文件写完了 + data->_read_complete = true; + if (!session->isSocketBusy() && data->_close_when_complete) { + shutdown(session); + } + } + + static void shutdown(const std::shared_ptr &session) { + if(session){ + session->shutdown(SockException(Err_shutdown, StrPrinter << "close connection after send http body completed.")); + } + } +}; + static const string kDate = "Date"; static const string kServer = "Server"; static const string kConnection = "Connection"; @@ -401,60 +474,18 @@ void HttpSession::sendResponse(const char *pcStatus, return; } - //发送http body - GET_CONFIG(uint32_t,sendBufSize,Http::kSendBufSize); - weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - - string status_code = pcStatus; - auto onFlush = [body,bClose,weakSelf,status_code]() { - auto strongSelf = weakSelf.lock(); - if(!strongSelf){ - //本对象已经销毁 - return false; - } - while(true){ - //更新超时计时器 - strongSelf->_ticker.resetTime(); - //读取文件 - auto sendBuf = body->readData(sendBufSize); - if (!sendBuf) { - //文件读完 - if(strongSelf->isSocketBusy() && bClose){ - //套接字忙,我们等待触发下一次onFlush事件 - //待所有数据flush到socket fd再移除onFlush事件监听 - //标记文件读写完毕 - return true; - } - //文件全部flush到socket fd,可以直接关闭socket了 - break; - } - - //文件还未读完 - if(strongSelf->send(sendBuf) == -1) { - //socket已经销毁,不再监听onFlush事件 - return false; - } - if(strongSelf->isSocketBusy()){ - //socket忙,那么停止继续写,等待下一次onFlush事件,然后再读文件写socket - return true; - } - //socket还可写,继续写socket - } - - if(bClose) { - //最后一次flush事件,文件也发送完毕了,可以关闭socket了 - strongSelf->shutdown(SockException(Err_shutdown, StrPrinter << "close connection after send http body completed with status code:" << status_code)); - } - //不再监听onFlush事件 - return false; - }; - + GET_CONFIG(uint32_t, sendBufSize, Http::kSendBufSize); if(body->remainSize() > sendBufSize){ //文件下载提升发送性能 setSocketFlags(); } - onFlush(); - _sock->setOnFlush(onFlush); + + //发送http body + AsyncSenderData::Ptr data = std::make_shared(shared_from_this(),body,bClose); + _sock->setOnFlush([data](){ + return AsyncSender::onSocketFlushed(data); + }); + AsyncSender::onSocketFlushed(data); } string HttpSession::urlDecode(const string &str){ diff --git a/src/Http/HttpSession.h b/src/Http/HttpSession.h index ec128c52..0d9ce18e 100644 --- a/src/Http/HttpSession.h +++ b/src/Http/HttpSession.h @@ -47,7 +47,7 @@ class HttpSession: public TcpSession, public: typedef StrCaseMap KeyValue; typedef HttpResponseInvokerImp HttpResponseInvoker; - + friend class AsyncSender; /** * @param errMsg 如果为空,则代表鉴权通过,否则为错误提示 * @param accessPath 运行或禁止访问的根目录