支持Http multipart/form-data 文件上传

优化http客户端超时逻辑
去除rtmp/rtsp代理时间戳覆盖逻辑
This commit is contained in:
xiongziliang 2018-06-21 14:03:43 +08:00
parent 4c82903ee8
commit ca885013af
8 changed files with 367 additions and 175 deletions

View File

@ -24,6 +24,7 @@
* SOFTWARE.
*/
#include "Player/Player.h"
#include "Common/config.h"
#include "PlayerProxy.h"
#include "Util/mini.h"
@ -58,7 +59,7 @@ void PlayerProxy::play(const char* strUrl) {
return;
}
if(strongSelf->m_pChn){
strongSelf->m_pChn->inputH264((char *)data.data.data(), data.data.size(), 0);
strongSelf->m_pChn->inputH264((char *)data.data.data(), data.data.size(), data.timeStamp);
}else{
strongSelf->initMedia();
}
@ -69,7 +70,7 @@ void PlayerProxy::play(const char* strUrl) {
return;
}
if(strongSelf->m_pChn){
strongSelf->m_pChn->inputAAC((char *)data.data, data.aac_frame_length, 0);
strongSelf->m_pChn->inputAAC((char *)data.data, data.aac_frame_length, data.timeStamp);
}else{
strongSelf->initMedia();
}

View File

@ -36,6 +36,7 @@ HttpClient::HttpClient(){
HttpClient::~HttpClient(){
}
void HttpClient::sendRequest(const string &strUrl,float fTimeOutSec){
_aliveTicker.resetTime();
auto protocol = FindField(strUrl.data(), NULL , "://");
uint16_t defaultPort;
bool isHttps;
@ -73,15 +74,15 @@ void HttpClient::sendRequest(const string &strUrl,float fTimeOutSec){
_header.emplace(string("Accept-Language"),"zh-CN,zh;q=0.8");
_header.emplace(string("User-Agent"),"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/57.0.2987.133 Safari/537.36");
if(!_body.empty()){
_header.emplace(string("Content-Length"),to_string(_body.size()));
if(_body && _body->remainSize()){
_header.emplace(string("Content-Length"),to_string(_body->remainSize()));
_header.emplace(string("Content-Type"),"application/x-www-form-urlencoded; charset=UTF-8");
}
bool bChanged = (_lastHost != host + ":" + to_string(port)) || (_isHttps != isHttps);
_lastHost = host + ":" + to_string(port);
_isHttps = isHttps;
_fTimeOutSec = fTimeOutSec;
if(!alive() || bChanged){
//InfoL << "reconnet:" << _lastHost;
startConnect(host, port,fTimeOutSec);
@ -93,32 +94,38 @@ void HttpClient::sendRequest(const string &strUrl,float fTimeOutSec){
void HttpClient::onConnect(const SockException &ex) {
_aliveTicker.resetTime();
if(ex){
onDisconnect(ex);
return;
}
_recvedBodySize = -1;
_recvedResponse.clear();
send(_method + " ");
send(_path + " HTTP/1.1\r\n");
_StrPrinter printer;
printer << _method + " " << _path + " HTTP/1.1\r\n";
for (auto &pr : _header) {
send(pr.first + ": ");
send(pr.second + "\r\n");
}
send("\r\n");
if (!_body.empty()) {
send(_body);
printer << pr.first + ": ";
printer << pr.second + "\r\n";
}
send(printer << "\r\n");
onSend();
}
void HttpClient::onRecv(const Buffer::Ptr &pBuf) {
onRecvBytes(pBuf->data(),pBuf->size());
}
void HttpClient::onErr(const SockException &ex) {
if(ex.getErrCode() == Err_eof && _totalBodySize == INT64_MAX){
//如果Content-Length未指定 但服务器断开链接
//则认为本次http请求完成
_totalBodySize = 0;
onResponseCompleted();
}
onDisconnect(ex);
}
void HttpClient::onRecvBytes(const char* data, int size) {
_aliveTicker.resetTime();
if(_recvedBodySize == -1){
//还没有收到http body这只是http头
auto lastLen = _recvedResponse.size();
@ -132,6 +139,9 @@ void HttpClient::onRecvBytes(const char* data, int size) {
onResponseHeader(_parser.Url(),_parser.getValues());
_totalBodySize = atoll(((HttpHeader &)_parser.getValues())["Content-Length"].data());
if(_totalBodySize == 0){
_totalBodySize = INT64_MAX;
}
_recvedBodySize = _recvedResponse.size() - pos - 4;
if(_totalBodySize < _recvedBodySize){
//http body 比声明的大 这个不可能的
@ -149,6 +159,7 @@ void HttpClient::onRecvBytes(const char* data, int size) {
}
if(_recvedBodySize >= _totalBodySize){
_totalBodySize = 0;
onResponseCompleted();
}
_recvedResponse.clear();
@ -159,6 +170,9 @@ void HttpClient::onRecvBytes(const char* data, int size) {
_recvedBodySize += size;
onResponseBody(data,size,_recvedBodySize,_totalBodySize);
if(_recvedBodySize >= _totalBodySize){
//如果接收的数据大于Content-Length
//则认为本次http请求完成
_totalBodySize = 0;
onResponseCompleted();
}
return;
@ -172,6 +186,37 @@ void HttpClient::onRecvBytes(const char* data, int size) {
shutdown();
}
void HttpClient::onSend() {
_aliveTicker.resetTime();
while (_body && _body->remainSize() && !isSocketBusy()){
auto buffer = _body->readData();
if (!buffer){
//数据发送结束或读取数据异常
break;
}
if(send(buffer) <= 0){
//发送数据失败不需要回滚数据因为发送前已经通过isSocketBusy()判断socket可写
//所以发送缓存区肯定未满,该buffer肯定已经写入socket
break;
}
}
}
void HttpClient::onManager() {
if(_aliveTicker.elapsedTime() > 3 * 1000 && _totalBodySize == INT64_MAX){
//如果Content-Length未指定 但接收数据超时
//则认为本次http请求完成
_totalBodySize = 0;
onResponseCompleted();
}
if(_fTimeOutSec > 0 && _aliveTicker.elapsedTime() > _fTimeOutSec * 1000){
//超时
onDisconnect(SockException(Err_timeout,"http request timeout"));
shutdown();
}
}
} /* namespace Http */
} /* namespace ZL */

View File

@ -27,6 +27,7 @@
#ifndef Http_HttpClient_h
#define Http_HttpClient_h
#include <stdio.h>
#include <string.h>
#include <functional>
#include <memory>
@ -41,8 +42,7 @@ using namespace ZL::Network;
namespace ZL {
namespace Http {
class HttpArgs : public StrCaseMap
{
class HttpArgs : public StrCaseMap {
public:
HttpArgs(){}
virtual ~HttpArgs(){}
@ -61,6 +61,143 @@ public:
}
};
class HttpBody{
public:
typedef std::shared_ptr<HttpBody> Ptr;
HttpBody(){}
virtual ~HttpBody(){}
//剩余数据大小
virtual uint64_t remainSize() = 0;
virtual Buffer::Ptr readData() = 0;
};
class HttpBodyString : public HttpBody{
public:
typedef std::shared_ptr<HttpBodyString> Ptr;
HttpBodyString(const string &str){
_str = str;
}
virtual ~HttpBodyString(){}
uint64_t remainSize() override {
return _str.size();
}
Buffer::Ptr readData() override {
auto ret = std::make_shared<BufferString>(_str);
_str.clear();
return ret;
}
private:
mutable string _str;
};
class HttpMultiFormBody : public HttpBody {
public:
typedef std::shared_ptr<HttpMultiFormBody> Ptr;
HttpMultiFormBody(const StrCaseMap &args,const string &filePath,const string &boundary,uint32_t sliceSize = 4 * 1024){
_fp = fopen(filePath.data(),"rb");
if(!_fp){
throw std::invalid_argument(StrPrinter << "打开文件失败:" << filePath << " " << get_uv_errmsg());
}
auto fileName = filePath;
auto pos = filePath.rfind('/');
if(pos != string::npos){
fileName = filePath.substr(pos + 1);
}
_bodyPrefix = multiFormBodyPrefix(args,boundary,fileName);
_bodySuffix = multiFormBodySuffix(boundary);
_totalSize = _bodyPrefix.size() + _bodySuffix.size() + fileSize(_fp);
_sliceSize = sliceSize;
}
virtual ~HttpMultiFormBody(){
fclose(_fp);
}
uint64_t remainSize() override {
return _totalSize - _offset;
}
Buffer::Ptr readData() override{
if(_bodyPrefix.size()){
auto ret = std::make_shared<BufferString>(_bodyPrefix);
_offset += _bodyPrefix.size();
_bodyPrefix.clear();
return ret;
}
if(0 == feof(_fp)){
auto ret = std::make_shared<BufferRaw>(_sliceSize);
//读文件
int size;
do{
size = fread(ret->data(),1,_sliceSize,_fp);
}while(-1 == size && UV_EINTR == get_uv_error(false));
if(size == -1){
_offset = _totalSize;
WarnL << "fread failed:" << get_uv_errmsg();
return nullptr;
}
_offset += size;
ret->setSize(size);
return ret;
}
if(_bodySuffix.size()){
auto ret = std::make_shared<BufferString>(_bodySuffix);
_offset = _totalSize;
_bodySuffix.clear();
return ret;
}
return nullptr;
}
public:
static string multiFormBodyPrefix(const StrCaseMap &args,const string &boundary,const string &fileName){
string MPboundary = string("--") + boundary;
_StrPrinter body;
for(auto &pr : args){
body << MPboundary << "\r\n";
body << "Content-Disposition: form-data; name=\"" << pr.first << "\"\r\n\r\n";
body << pr.second << "\r\n";
}
body << MPboundary << "\r\n";
body << "Content-Disposition: form-data; name=\"" << "file" << "\";filename=\"" << fileName << "\"\r\n";
body << "Content-Type: application/octet-stream\r\n\r\n" ;
return body;
}
static string multiFormBodySuffix(const string &boundary){
string MPboundary = string("--") + boundary;
string endMPboundary = MPboundary + "--";
_StrPrinter body;
body << "\r\n" << endMPboundary;
return body;
}
static uint64_t fileSize(FILE *fp) {
auto current = ftell(fp);
fseek(fp,0L,SEEK_END); /* 定位到文件末尾 */
auto end = ftell(fp); /* 得到文件大小 */
fseek(fp,current,SEEK_SET);
return end - current;
}
static string multiFormContentType(const string &boundary){
return StrPrinter << "multipart/form-data; boundary=" << boundary;
}
private:
FILE *_fp;
string _bodyPrefix;
string _bodySuffix;
uint64_t _offset = 0;
uint64_t _totalSize;
uint32_t _sliceSize;
};
class HttpClient : public TcpClient
{
public:
@ -71,7 +208,7 @@ public:
virtual void sendRequest(const string &url,float fTimeOutSec);
void clear(){
_header.clear();
_body.clear();
_body.reset();
_method.clear();
_path.clear();
_recvedResponse.clear();
@ -83,10 +220,18 @@ public:
void setHeader(const HttpHeader &header){
_header = header;
}
void addHeader(const string &key,const string &val){
HttpClient & addHeader(const string &key,const string &val,bool force = false){
if(!force){
_header.emplace(key,val);
}else{
_header[key] = val;
}
return *this;
}
void setBody(const string &body){
_body.reset(new HttpBodyString(body));
}
void setBody(const HttpBody::Ptr &body){
_body = body;
}
const string &responseStatus(){
@ -96,8 +241,6 @@ public:
return _parser.getValues();
}
protected:
bool _isHttps;
virtual void onResponseHeader(const string &status,const HttpHeader &headers){
DebugL << status;
};
@ -109,24 +252,28 @@ protected:
}
virtual void onRecvBytes(const char *data,int size);
virtual void onDisconnect(const SockException &ex){}
private:
protected:
virtual void onConnect(const SockException &ex) override;
virtual void onRecv(const Buffer::Ptr &pBuf) override;
virtual void onErr(const SockException &ex) override;
virtual void onSend() override;
virtual void onManager() override;
protected:
bool _isHttps;
private:
//send
HttpHeader _header;
string _body;
HttpBody::Ptr _body;
string _method;
string _path;
//recv
string _recvedResponse;
size_t _recvedBodySize;
size_t _totalBodySize;
int64_t _recvedBodySize;
int64_t _totalBodySize;
Parser _parser;
string _lastHost;
Ticker _aliveTicker;
float _fTimeOutSec = 0;
};

View File

@ -43,8 +43,6 @@ HttpDownloader::~HttpDownloader() {
void HttpDownloader::startDownload(const string& url, const string& filePath,bool bAppend,float timeOutSecond) {
_filePath = filePath;
_timeOutSecond = timeOutSecond;
_downloadTicker.resetTime();
if(_filePath.empty()){
_filePath = exeDir() + "HttpDownloader/" + MD5(url).hexdigest();
}
@ -68,7 +66,6 @@ void HttpDownloader::startDownload(const string& url, const string& filePath,boo
}
void HttpDownloader::onResponseHeader(const string& status,const HttpHeader& headers) {
_downloadTicker.resetTime();
if(status != "200" && status != "206"){
//失败
shutdown();
@ -83,22 +80,11 @@ void HttpDownloader::onResponseHeader(const string& status,const HttpHeader& hea
}
void HttpDownloader::onResponseBody(const char* buf, size_t size, size_t recvedSize, size_t totalSize) {
_downloadTicker.resetTime();
if(_saveFile){
fwrite(buf,size,1,_saveFile);
}
}
//string getMd5Sum(const string &filePath){
// auto fp = File::createfile_file(filePath.data(),"rb");
// fseek(fp,0,SEEK_END);
// auto sz = ftell(fp);
// char tmp[sz];
// fseek(fp,0,SEEK_SET);
// auto rd = fread(tmp,1,sz,fp);
// InfoL << sz << " " << rd;
// fclose(fp);
// return MD5(string(tmp,sz)).hexdigest();
//}
void HttpDownloader::onResponseCompleted() {
closeFile();
//InfoL << "md5Sum:" << getMd5Sum(_filePath);
@ -128,14 +114,6 @@ void HttpDownloader::closeFile() {
}
}
void HttpDownloader::onManager(){
if(_downloadTicker.elapsedTime() > _timeOutSecond * 1000){
//超时
onDisconnect(SockException(Err_timeout,"download timeout"));
shutdown();
}
}
} /* namespace Http */
} /* namespace ZL */

View File

@ -52,16 +52,12 @@ private:
void onResponseBody(const char *buf,size_t size,size_t recvedSize,size_t totalSize) override;
void onResponseCompleted() override;
void onDisconnect(const SockException &ex) override;
void onManager() override;
void closeFile();
private:
FILE *_saveFile = nullptr;
string _filePath;
onDownloadResult _onResult;
uint32_t _timeOutSecond;
bool _bDownloadSuccess = false;
Ticker _downloadTicker;
};
} /* namespace Http */

View File

@ -59,18 +59,9 @@ void HttpRequester::onDisconnect(const SockException &ex){
void HttpRequester::startRequester(const string &url,const HttpRequesterResult &onResult , float timeOutSecond){
_onResult = onResult;
_resTicker.resetTime();
_timeOutSecond = timeOutSecond;
sendRequest(url,timeOutSecond);
}
void HttpRequester::onManager(){
if(_onResult && _resTicker.elapsedTime() > _timeOutSecond * 1000){
//超时
onDisconnect(SockException(Err_timeout,"wait http response timeout"));
shutdown();
}
}
}//namespace Http

View File

@ -39,18 +39,15 @@ public:
typedef std::function<void(const SockException &ex,const string &status,const HttpHeader &header,const string &strRecvBody)> HttpRequesterResult;
HttpRequester();
virtual ~HttpRequester();
void startRequester(const string &url,const HttpRequesterResult &onResult,float timeOutSecond = 10);
private:
void onResponseHeader(const string &status,const HttpHeader &headers) override;
void onResponseBody(const char *buf,size_t size,size_t recvedSize,size_t totalSize) override;
void onResponseCompleted() override;
void onDisconnect(const SockException &ex) override;
void onManager() override;
private:
string _strRecvBody;
HttpRequesterResult _onResult;
Ticker _resTicker;
float _timeOutSecond;
};
}//namespace Http

View File

@ -48,6 +48,7 @@ int main(int argc,char *argv[]){
//设置日志
Logger::Instance().add(std::make_shared<ConsoleChannel>("stdout", LTrace));
Logger::Instance().setWriter(std::make_shared<AsyncLogWriter>());
{
///////////////////////////////http downloader///////////////////////
//下载器map
map<string, HttpDownloader::Ptr> downloaderMap;
@ -142,12 +143,48 @@ int main(int argc,char *argv[]){
}
});
///////////////////////////////http upload///////////////////////
//创建一个Http请求器
HttpRequester::Ptr requesterUploader(new HttpRequester());
//使用POST方式请求
requesterUploader->setMethod("POST");
//设置http请求头
HttpArgs argsUploader;
argsUploader["query"] = "test";
argsUploader["from"] = "en";
argsUploader["to"] = "zh";
argsUploader["transtype"] = "translang";
argsUploader["simple_means_flag"] = "3";
static string boundary = "0xKhTmLbOuNdArY";
HttpMultiFormBody::Ptr body(new HttpMultiFormBody(argsUploader, exePath(), boundary));
requesterUploader->setBody(body);
requesterUploader->addHeader("Content-Type", HttpMultiFormBody::multiFormContentType(boundary));
//开启请求
requesterUploader->startRequester("http://fanyi.baidu.com/langdetect",//url地址
[](const SockException &ex, //网络相关的失败信息,如果为空就代表成功
const string &status, //http回复的状态码比如说200/404
const HttpClient::HttpHeader &header, //http回复头
const string &strRecvBody) { //http回复body
DebugL << "=====================HttpRequester Uploader==========================";
if (ex) {
//网络相关的错误
WarnL << "network err:" << ex.getErrCode() << " " << ex.what();
} else {
//打印http回复信息
_StrPrinter printer;
for (auto &pr: header) {
printer << pr.first << ":" << pr.second << "\r\n";
}
InfoL << "status:" << status << "\r\n"
<< "header:\r\n" << (printer << endl)
<< "\r\nbody:" << strRecvBody;
}
});
//事件轮询
EventPoller::Instance().runLoop();
//清空下载器
downloaderMap.clear();
requesterGet.reset();
requesterPost.reset();
}
//程序开始退出
EventPoller::Destory();
AsyncTaskThread::Destory();