http服务器实现后台线程读取数据

This commit is contained in:
xiongziliang 2019-12-28 13:11:41 +08:00
parent 582f769893
commit 71631a33c4
3 changed files with 110 additions and 53 deletions

View File

@ -32,6 +32,7 @@
#include "Network/Buffer.h" #include "Network/Buffer.h"
#include "Util/ResourcePool.h" #include "Util/ResourcePool.h"
#include "Util/logger.h" #include "Util/logger.h"
#include "Thread/WorkThreadPool.h"
using namespace std; using namespace std;
using namespace toolkit; using namespace toolkit;
@ -45,10 +46,12 @@ namespace mediakit {
/** /**
* http content部分基类定义 * http content部分基类定义
*/ */
class HttpBody{ class HttpBody : public std::enable_shared_from_this<HttpBody>{
public: public:
typedef std::shared_ptr<HttpBody> Ptr; typedef std::shared_ptr<HttpBody> Ptr;
HttpBody(){} HttpBody(){
_async_read_thread = WorkThreadPool::Instance().getPoller();
}
virtual ~HttpBody(){} virtual ~HttpBody(){}
/** /**
@ -62,6 +65,29 @@ public:
* @return ,nullptr * @return ,nullptr
*/ */
virtual Buffer::Ptr readData(uint32_t size) { return nullptr;}; virtual Buffer::Ptr readData(uint32_t size) { return nullptr;};
/**
* size
* @param size
* @param cb
*/
virtual void readDataAsync(uint32_t size,const function<void(const Buffer::Ptr &buf)> &cb){
if(size >= remainSize()){
//假如剩余数据很小,那么同步获取(为了优化性能)
cb(readData(size));
return;
}
//如果是大文件,那么后台读取
weak_ptr<HttpBody> weakSelf = shared_from_this();
_async_read_thread->async([cb,size,weakSelf](){
auto strongSelf = weakSelf.lock();
if(strongSelf){
cb(strongSelf->readData(size));
}
});
}
private:
EventPoller::Ptr _async_read_thread;
}; };
/** /**

View File

@ -308,6 +308,79 @@ static string dateStr() {
return buf; return buf;
} }
class AsyncSenderData {
public:
friend class AsyncSender;
typedef std::shared_ptr<AsyncSenderData> Ptr;
AsyncSenderData(const TcpSession::Ptr &session, const HttpBody::Ptr &body, bool close_when_complete) {
_session = dynamic_pointer_cast<HttpSession>(session);
_body = body;
_close_when_complete = close_when_complete;
}
~AsyncSenderData() = default;
private:
std::weak_ptr<HttpSession> _session;
HttpBody::Ptr _body;
bool _close_when_complete;
bool _read_complete = false;
};
class AsyncSender {
public:
typedef std::shared_ptr<AsyncSender> Ptr;
static bool onSocketFlushed(const AsyncSenderData::Ptr &data) {
if (data->_read_complete) {
if (data->_close_when_complete) {
//发送完毕需要关闭socket
shutdown(data->_session.lock());
}
return false;
}
GET_CONFIG(uint32_t, sendBufSize, Http::kSendBufSize);
data->_body->readDataAsync(sendBufSize, [data](const Buffer::Ptr &sendBuf) {
auto session = data->_session.lock();
if (!session) {
//本对象已经销毁
return;
}
session->async([data, sendBuf]() {
auto session = data->_session.lock();
if (!session) {
//本对象已经销毁
return;
}
onRequestData(data, session, sendBuf);
}, false);
});
return true;
}
private:
static void onRequestData(const AsyncSenderData::Ptr &data, const std::shared_ptr<HttpSession> &session, const Buffer::Ptr &sendBuf) {
session->_ticker.resetTime();
if (sendBuf && session->send(sendBuf) != -1) {
//文件还未读完,还需要继续发送
if (!session->isSocketBusy()) {
//socket还可写继续请求数据
onSocketFlushed(data);
}
return;
}
//文件写完了
data->_read_complete = true;
if (!session->isSocketBusy() && data->_close_when_complete) {
shutdown(session);
}
}
static void shutdown(const std::shared_ptr<HttpSession> &session) {
if(session){
session->shutdown(SockException(Err_shutdown, StrPrinter << "close connection after send http body completed."));
}
}
};
static const string kDate = "Date"; static const string kDate = "Date";
static const string kServer = "Server"; static const string kServer = "Server";
static const string kConnection = "Connection"; static const string kConnection = "Connection";
@ -401,60 +474,18 @@ void HttpSession::sendResponse(const char *pcStatus,
return; return;
} }
//发送http body
GET_CONFIG(uint32_t, sendBufSize, Http::kSendBufSize); GET_CONFIG(uint32_t, sendBufSize, Http::kSendBufSize);
weak_ptr<HttpSession> weakSelf = dynamic_pointer_cast<HttpSession>(shared_from_this());
string status_code = pcStatus;
auto onFlush = [body,bClose,weakSelf,status_code]() {
auto strongSelf = weakSelf.lock();
if(!strongSelf){
//本对象已经销毁
return false;
}
while(true){
//更新超时计时器
strongSelf->_ticker.resetTime();
//读取文件
auto sendBuf = body->readData(sendBufSize);
if (!sendBuf) {
//文件读完
if(strongSelf->isSocketBusy() && bClose){
//套接字忙,我们等待触发下一次onFlush事件
//待所有数据flush到socket fd再移除onFlush事件监听
//标记文件读写完毕
return true;
}
//文件全部flush到socket fd可以直接关闭socket了
break;
}
//文件还未读完
if(strongSelf->send(sendBuf) == -1) {
//socket已经销毁不再监听onFlush事件
return false;
}
if(strongSelf->isSocketBusy()){
//socket忙那么停止继续写,等待下一次onFlush事件然后再读文件写socket
return true;
}
//socket还可写继续写socket
}
if(bClose) {
//最后一次flush事件文件也发送完毕了可以关闭socket了
strongSelf->shutdown(SockException(Err_shutdown, StrPrinter << "close connection after send http body completed with status code:" << status_code));
}
//不再监听onFlush事件
return false;
};
if(body->remainSize() > sendBufSize){ if(body->remainSize() > sendBufSize){
//文件下载提升发送性能 //文件下载提升发送性能
setSocketFlags(); setSocketFlags();
} }
onFlush();
_sock->setOnFlush(onFlush); //发送http body
AsyncSenderData::Ptr data = std::make_shared<AsyncSenderData>(shared_from_this(),body,bClose);
_sock->setOnFlush([data](){
return AsyncSender::onSocketFlushed(data);
});
AsyncSender::onSocketFlushed(data);
} }
string HttpSession::urlDecode(const string &str){ string HttpSession::urlDecode(const string &str){

View File

@ -47,7 +47,7 @@ class HttpSession: public TcpSession,
public: public:
typedef StrCaseMap KeyValue; typedef StrCaseMap KeyValue;
typedef HttpResponseInvokerImp HttpResponseInvoker; typedef HttpResponseInvokerImp HttpResponseInvoker;
friend class AsyncSender;
/** /**
* @param errMsg * @param errMsg
* @param accessPath 访 * @param accessPath 访