http api服务器支持发送大文件

This commit is contained in:
xiongziliang 2019-10-28 16:50:15 +08:00
parent 7c16f37d64
commit 7ed5b7c2ba
3 changed files with 174 additions and 99 deletions

View File

@ -54,14 +54,14 @@ public:
/**
*
*/
virtual uint64_t remainSize() = 0;
virtual uint64_t remainSize() { return 0;};
/**
* size
* @param size
* @return
*/
virtual Buffer::Ptr readData(uint32_t size) = 0;
virtual Buffer::Ptr readData(uint32_t size) { return nullptr;};
};
/**

View File

@ -44,7 +44,6 @@
#include "Util/base64.h"
#include "Util/SHA1.h"
#include "Rtmp/utils.h"
#include "HttpBody.h"
using namespace toolkit;
namespace mediakit {
@ -103,6 +102,41 @@ get_mime_type(const char* name) {
return it->second.data();
}
////////////////////////////////////////////////////////////////////////////////////////////////////
void HttpResponseInvokerImp::operator()(const string &codeOut, const StrCaseMap &headerOut, const HttpBody::Ptr &body) const{
if(_lambad){
_lambad(codeOut,headerOut,body);
}
}
void HttpResponseInvokerImp::operator()(const string &codeOut, const StrCaseMap &headerOut, const string &body) const{
this->operator()(codeOut,headerOut,std::make_shared<HttpStringBody>(body));
}
HttpResponseInvokerImp::HttpResponseInvokerImp(const HttpResponseInvokerImp::HttpResponseInvokerLambda0 &lambda){
_lambad = lambda;
}
HttpResponseInvokerImp::HttpResponseInvokerImp(const HttpResponseInvokerImp::HttpResponseInvokerLambda1 &lambda){
if(!lambda){
_lambad = nullptr;
return;
}
_lambad = [lambda](const string &codeOut, const StrCaseMap &headerOut, const HttpBody::Ptr &body){
string str;
if(body && body->remainSize()){
str = body->readData(body->remainSize())->toString();
}
lambda(codeOut,headerOut,str);
};
}
HttpResponseInvokerImp::operator bool(){
return _lambad.operator bool();
}
////////////////////////////////////////////////////////////////////////////////////////////////////
HttpSession::HttpSession(const Socket::Ptr &pSock) : TcpSession(pSock) {
TraceP(this);
@ -293,7 +327,7 @@ bool HttpSession::checkLiveFlvStream(const function<void()> &cb){
if(!cb) {
//找到rtmp源发送http头负载后续发送
sendResponse("200 OK", makeHttpHeader(false, 0, get_mime_type(".flv")), "");
sendResponse("200 OK", makeHttpHeader(false, -1, get_mime_type(".flv")), "");
}else{
cb();
}
@ -503,7 +537,6 @@ void HttpSession::Handle_Req_GET(int64_t &content_len) {
return;
}
//事件未被拦截则认为是http下载请求
auto fullUrl = string(HTTP_SCHEMA) + "://" + _parser["Host"] + _parser.FullUrl();
_mediaInfo.parse(fullUrl);
@ -609,66 +642,16 @@ void HttpSession::Handle_Req_GET(int64_t &content_len) {
if(cookie){
httpHeader["Set-Cookie"] = cookie->getCookie((*cookie)[kCookiePathKey].get<string>());
}
//先回复HTTP头部分
sendResponse(pcHttpResult,httpHeader,"");
if (iRangeEnd - iRangeStart < 0) {
sendResponse(pcHttpResult,httpHeader,"");
//文件是空的!
throw SockException(bClose ? Err_shutdown : Err_success,"close connection after access file");
}
//回复Content部分
GET_CONFIG(uint32_t,sendBufSize,Http::kSendBufSize);
//先回复HTTP头部分
HttpBody::Ptr fileBody = std::make_shared<HttpFileBody>(pFilePtr,iRangeStart,iRangeEnd - iRangeStart + 1);
weak_ptr<HttpSession> weakSelf = dynamic_pointer_cast<HttpSession>(shared_from_this());
auto onFlush = [fileBody,bClose,weakSelf]() {
auto strongSelf = weakSelf.lock();
if(!strongSelf){
//本对象已经销毁
return false;
}
while(true){
//更新超时计时器
strongSelf->_ticker.resetTime();
//读取文件
auto sendBuf = fileBody->readData(sendBufSize);
if (!sendBuf) {
//文件读完
if(strongSelf->isSocketBusy()){
//套接字忙,我们等待触发下一次onFlush事件
//待所有数据flush到socket fd再移除onFlush事件监听
//标记文件读写完毕
return true;
}
//文件全部flush到socket fd可以直接关闭socket了
break;
}
//文件还未读完
if(strongSelf->send(sendBuf) == -1) {
//socket已经销毁不再监听onFlush事件
return false;
}
if(strongSelf->isSocketBusy()){
//socket忙那么停止继续写,等待下一次onFlush事件然后再读文件写socket
return true;
}
//socket还可写继续写socket
}
if(bClose) {
//最后一次flush事件文件也发送完毕了可以关闭socket了
strongSelf->shutdown(SockException(Err_shutdown,"read file eof"));
}
//不再监听onFlush事件
return false;
};
//文件下载提升发送性能
setSocketFlags();
onFlush();
_sock->setOnFlush(onFlush);
sendResponse(pcHttpResult,httpHeader,fileBody,bClose);
});
}
@ -772,16 +755,83 @@ bool makeMeun(const string &httpPath,const string &strFullPath, string &strRet)
}
void HttpSession::sendResponse(const char* pcStatus, const KeyValue& header, const string& strContent) {
sendResponse(pcStatus, header,strContent.empty() ? nullptr : std::make_shared<HttpStringBody>(strContent), false);
}
void HttpSession::sendResponse(const char *pcStatus,const KeyValue &header,const HttpBody::Ptr &body,bool bClose){
//发送http头
_StrPrinter printer;
printer << "HTTP/1.1 " << pcStatus << "\r\n";
for (auto &pr : header) {
printer << pr.first << ": " << pr.second << "\r\n";
}
printer << "\r\n" << strContent;
auto strSend = printer << endl;
send(strSend);
printer << "\r\n";
send(printer << endl);
_ticker.resetTime();
if(!body || !body->remainSize()){
//没有body
if(bClose){
shutdown(SockException(Err_shutdown,"close connection after send http header completed"));
}
return;
}
//发送http body
GET_CONFIG(uint32_t,sendBufSize,Http::kSendBufSize);
weak_ptr<HttpSession> weakSelf = dynamic_pointer_cast<HttpSession>(shared_from_this());
auto onFlush = [body,bClose,weakSelf]() {
auto strongSelf = weakSelf.lock();
if(!strongSelf){
//本对象已经销毁
return false;
}
while(true){
//更新超时计时器
strongSelf->_ticker.resetTime();
//读取文件
auto sendBuf = body->readData(sendBufSize);
if (!sendBuf) {
//文件读完
if(strongSelf->isSocketBusy() && bClose){
//套接字忙,我们等待触发下一次onFlush事件
//待所有数据flush到socket fd再移除onFlush事件监听
//标记文件读写完毕
return true;
}
//文件全部flush到socket fd可以直接关闭socket了
break;
}
//文件还未读完
if(strongSelf->send(sendBuf) == -1) {
//socket已经销毁不再监听onFlush事件
return false;
}
if(strongSelf->isSocketBusy()){
//socket忙那么停止继续写,等待下一次onFlush事件然后再读文件写socket
return true;
}
//socket还可写继续写socket
}
if(bClose) {
//最后一次flush事件文件也发送完毕了可以关闭socket了
strongSelf->shutdown(SockException(Err_shutdown,"close connection after send http body completed"));
}
//不再监听onFlush事件
return false;
};
if(body->remainSize() > sendBufSize){
//文件下载提升发送性能
setSocketFlags();
}
onFlush();
_sock->setOnFlush(onFlush);
}
HttpSession::KeyValue HttpSession::makeHttpHeader(bool bClose, int64_t iContentSize,const char* pcContentType) {
KeyValue headerOut;
@ -799,7 +849,7 @@ HttpSession::KeyValue HttpSession::makeHttpHeader(bool bClose, int64_t iContentS
auto strContentType = StrPrinter << pcContentType << "; charset=" << charSet << endl;
headerOut.emplace("Content-Type",strContentType);
}
if(iContentSize > 0){
if(iContentSize >= 0){
headerOut.emplace("Content-Length", StrPrinter<<iContentSize<<endl);
}
@ -836,20 +886,41 @@ bool HttpSession::emitHttpEvent(bool doInvoke){
bool bClose = (strcasecmp(_parser["Connection"].data(),"close") == 0) || ( ++_iReqCnt > reqCnt);
/////////////////////异步回复Invoker///////////////////////////////
weak_ptr<HttpSession> weakSelf = dynamic_pointer_cast<HttpSession>(shared_from_this());
HttpResponseInvoker invoker = [weakSelf,bClose](const string &codeOut, const KeyValue &headerOut, const string &contentOut){
HttpResponseInvoker invoker = [weakSelf,bClose](const string &codeOut, const KeyValue &headerOut, const HttpBody::Ptr &body){
auto strongSelf = weakSelf.lock();
if(!strongSelf) {
return;
}
strongSelf->async([weakSelf,bClose,codeOut,headerOut,contentOut]() {
strongSelf->async([weakSelf,bClose,codeOut,headerOut,body]() {
auto strongSelf = weakSelf.lock();
if(!strongSelf) {
//本对象已经销毁
return;
}
strongSelf->responseDelay(bClose,codeOut,headerOut,contentOut);
if(bClose){
strongSelf->shutdown(SockException(Err_shutdown,"Connection: close"));
if(codeOut.empty()){
//回调提供的参数异常
strongSelf->sendNotFound(bClose);
return;
}
//body默认为空
int64_t size = 0;
if (body && body->remainSize()) {
//有body获取body大小
size = body->remainSize();
if (size >= INT64_MAX) {
//不固定长度的body那么不设置content-length字段
size = -1;
}
}
auto headerOther = strongSelf->makeHttpHeader(bClose, size, "text/plain");
for (auto &pr : headerOther) {
//添加默认http头默认http头不能覆盖用户自定义的头
const_cast<KeyValue &>(headerOut).emplace(pr.first, pr.second);
}
strongSelf->sendResponse(codeOut.data(), headerOut, body, bClose);
});
};
///////////////////广播HTTP事件///////////////////////////
@ -857,7 +928,7 @@ bool HttpSession::emitHttpEvent(bool doInvoke){
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastHttpRequest,_parser,invoker,consumed,*this);
if(!consumed && doInvoke){
//该事件无人消费所以返回404
invoker("404 Not Found",KeyValue(),"");
invoker("404 Not Found",KeyValue(), HttpBody::Ptr());
if(bClose){
//close类型回复完毕关闭连接
shutdown(SockException(Err_shutdown,"404 Not Found"));
@ -938,21 +1009,6 @@ void HttpSession::Handle_Req_POST(int64_t &content_len) {
}
//有后续content数据要处理,暂时不关闭连接
}
void HttpSession::responseDelay(bool bClose,
const string &codeOut,
const KeyValue &headerOut,
const string &contentOut){
if(codeOut.empty()){
sendNotFound(bClose);
return;
}
auto headerOther = makeHttpHeader(bClose,contentOut.size(),"text/plain");
for (auto &pr : headerOther){
//添加默认http头默认http头不能覆盖用户自定义的头
const_cast<KeyValue &>(headerOut).emplace(pr.first,pr.second);
}
sendResponse(codeOut.data(), headerOut, contentOut);
}
void HttpSession::sendNotFound(bool bClose) {
GET_CONFIG(string,notFound,Http::kNotFound);

View File

@ -36,21 +36,43 @@
#include "HttpRequestSplitter.h"
#include "WebSocketSplitter.h"
#include "HttpCookieManager.h"
#include "HttpBody.h"
#include "Util/function_traits.h"
using namespace std;
using namespace toolkit;
namespace mediakit {
/**
*
*/
class HttpResponseInvokerImp{
public:
typedef std::function<void(const string &codeOut, const StrCaseMap &headerOut, const HttpBody::Ptr &body)> HttpResponseInvokerLambda0;
typedef std::function<void(const string &codeOut, const StrCaseMap &headerOut, const string &body)> HttpResponseInvokerLambda1;
HttpResponseInvokerImp(){}
~HttpResponseInvokerImp(){}
template<typename C>
HttpResponseInvokerImp(const C &c):HttpResponseInvokerImp(typename function_traits<C>::stl_function_type(c)) {}
HttpResponseInvokerImp(const HttpResponseInvokerLambda0 &lambda);
HttpResponseInvokerImp(const HttpResponseInvokerLambda1 &lambda);
void operator()(const string &codeOut, const StrCaseMap &headerOut, const HttpBody::Ptr &body) const;
void operator()(const string &codeOut, const StrCaseMap &headerOut, const string &body) const;
operator bool();
private:
HttpResponseInvokerLambda0 _lambad;
};
class HttpSession: public TcpSession,
public FlvMuxer,
public HttpRequestSplitter,
public WebSocketSplitter {
public:
typedef StrCaseMap KeyValue;
typedef std::function<void(const string &codeOut,
const KeyValue &headerOut,
const string &contentOut)> HttpResponseInvoker;
typedef HttpResponseInvokerImp HttpResponseInvoker;
/**
* @param errMsg
@ -119,11 +141,8 @@ private:
void urlDecode(Parser &parser);
void sendNotFound(bool bClose);
void sendResponse(const char *pcStatus,const KeyValue &header,const string &strContent);
void sendResponse(const char *pcStatus,const KeyValue &header,const HttpBody::Ptr &body,bool bClose);
KeyValue makeHttpHeader(bool bClose=false,int64_t iContentSize=-1,const char *pcContentType="text/html");
void responseDelay(bool bClose,
const string &codeOut,
const KeyValue &headerOut,
const string &contentOut);
/**
* http客户端是否有权限访问文件的逻辑步骤