From 8f8e4abc39f53230a615a463a86ecfbe43cbab16 Mon Sep 17 00:00:00 2001 From: xiongziliang <771730766@qq.com> Date: Fri, 15 Dec 2017 16:01:21 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=AF=E6=8C=81http-flv=E7=9B=B4=E6=92=AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Http/HttpClientImp.cpp | 7 ++ src/Http/HttpClientImp.h | 1 + src/Http/HttpSession.cpp | 165 ++++++++++++++++++++++++++++++++++--- src/Http/HttpSession.h | 9 ++ src/Http/HttpsSession.h | 5 ++ 5 files changed, 174 insertions(+), 13 deletions(-) diff --git a/src/Http/HttpClientImp.cpp b/src/Http/HttpClientImp.cpp index f695b2f1..28d65510 100644 --- a/src/Http/HttpClientImp.cpp +++ b/src/Http/HttpClientImp.cpp @@ -85,6 +85,13 @@ int HttpClientImp::send(const string& str) { } return HttpClient::send(str); } +int HttpClientImp::send(string &&str){ + if(_sslBox){ + _sslBox->onSend(str.data(),str.size()); + return str.size(); + } + return HttpClient::send(std::move(str)); +} int HttpClientImp::send(const char* str, int len) { if(_sslBox){ diff --git a/src/Http/HttpClientImp.h b/src/Http/HttpClientImp.h index c397daff..fa8cfe46 100644 --- a/src/Http/HttpClientImp.h +++ b/src/Http/HttpClientImp.h @@ -55,6 +55,7 @@ private: #ifdef ENABLE_OPENSSL virtual void onRecvBytes(const char *data,int size) override; virtual int send(const string &str) override; + virtual int send(string &&str) override; virtual int send(const char *str, int len) override; std::shared_ptr _sslBox; #endif //ENABLE_OPENSSL diff --git a/src/Http/HttpSession.cpp b/src/Http/HttpSession.cpp index 72684837..02f01fb5 100644 --- a/src/Http/HttpSession.cpp +++ b/src/Http/HttpSession.cpp @@ -40,7 +40,7 @@ #include "Util/onceToken.h" #include "Util/mini.h" #include "Util/NoticeCenter.h" - +#include "Rtmp/utils.h" using namespace ZL::Util; @@ -58,7 +58,7 @@ static const char* get_mime_type(const char* name) { const char* dot; dot = strrchr(name, '.'); - static unordered_map mapType; + static HttpSession::KeyValue mapType; static onceToken token([&]() { mapType.emplace(".html","text/html"); mapType.emplace(".htm","text/html"); @@ -85,13 +85,12 @@ get_mime_type(const char* name) { mapType.emplace(".mp3","audio/mpeg"); mapType.emplace(".ogg","application/ogg"); mapType.emplace(".pac","application/x-ns-proxy-autoconfig"); + mapType.emplace(".flv","video/x-flv"); }, nullptr); if(!dot){ return "text/plain"; } - string strDot(dot); - transform(strDot.begin(), strDot.end(), strDot.begin(), (int (*)(int))tolower); - auto it = mapType.find(strDot); + auto it = mapType.find(dot); if (it == mapType.end()) { return "text/plain"; } @@ -173,13 +172,108 @@ void HttpSession::onManager() { } } +inline bool HttpSession::checkLiveFlvStream(){ + auto pos = strrchr(m_parser.Url().data(),'.'); + if(!pos){ + //未找到".flv"后缀 + return false; + } + if(strcasecmp(pos,".flv") != 0){ + //未找到".flv"后缀 + return false; + } + auto app = FindField(m_parser.Url().data(),"/","/"); + auto stream = FindField(m_parser.Url().data(),(string("/") + app + "/").data(),"."); + if(app.empty() || stream.empty()){ + //不能拆分成2级url + return false; + } + //TO-DO + auto mediaSrc = RtmpMediaSource::find(app,stream); + if(!mediaSrc){ + //该rtmp源不存在 + return false; + } + if(!mediaSrc->ready()){ + //未准备好 + return false; + } + //找到rtmp源,发送http头,负载后续发送 + sendResponse("200 OK", makeHttpHeader(false,0,get_mime_type(m_parser.Url().data())), ""); + //发送flv文件头 + char flv_file_header[] = "FLV\x1\x5\x0\x0\x0\x9"; // have audio and have video + bool is_have_audio = false,is_have_video = false; + + mediaSrc->getConfigFrame([&](const RtmpPacket::Ptr &pkt){ + if(pkt->typeId == MSG_VIDEO){ + is_have_video = true; + } + if(pkt->typeId == MSG_AUDIO){ + is_have_audio = true; + } + }); + + if (is_have_audio && is_have_video) { + flv_file_header[4] = 0x05; + } else if (is_have_audio && !is_have_video) { + flv_file_header[4] = 0x04; + } else if (!is_have_audio && is_have_video) { + flv_file_header[4] = 0x01; + } else { + flv_file_header[4] = 0x00; + } + //send flv header + send(flv_file_header, sizeof(flv_file_header) - 1); + //send metadata + AMFEncoder invoke; + invoke << "onMetaData" << mediaSrc->getMetaData(); + sendRtmp(MSG_DATA, invoke.data(), 0); + //send config frame + mediaSrc->getConfigFrame([&](const RtmpPacket::Ptr &pkt){ + onSendMedia(pkt); + }); + + //开始发送rtmp负载 + m_pRingReader = mediaSrc->getRing()->attach(); + weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); + m_pRingReader->setReadCB([weakSelf](const RtmpPacket::Ptr &pkt){ + auto strongSelf = weakSelf.lock(); + if(!strongSelf) { + return; + } + strongSelf->async([pkt,weakSelf](){ + auto strongSelf = weakSelf.lock(); + if(!strongSelf) { + return; + } + strongSelf->onSendMedia(pkt); + }); + }); + m_pRingReader->setDetachCB([weakSelf](){ + auto strongSelf = weakSelf.lock(); + if(!strongSelf) { + return; + } + strongSelf->async_first([weakSelf](){ + auto strongSelf = weakSelf.lock(); + if(!strongSelf) { + return; + } + strongSelf->shutdown(); + }); + }); + return true; + +} inline HttpSession::HttpCode HttpSession::Handle_Req_GET() { //先看看该http事件是否被拦截 if(emitHttpEvent(false)){ return Http_success; } + if(checkLiveFlvStream()){ + return Http_success; + } //事件未被拦截,则认为是http下载请求 - string strFile = m_strPath + m_parser.Url(); /////////////HTTP连接是否需要被关闭//////////////// static uint32_t reqCnt = mINI::Instance()[Config::Http::kMaxReqCount].as(); @@ -235,7 +329,7 @@ inline HttpSession::HttpCode HttpSession::Handle_Req_GET() { //先回复HTTP头部分 sendResponse(pcHttpResult, httpHeader, ""); if (iRangeEnd - iRangeStart < 0) { - //文件时空的! + //文件是空的! return eHttpCode; } //回复Content部分 @@ -388,15 +482,17 @@ inline HttpSession::KeyValue HttpSession::makeHttpHeader(bool bClose, int64_t iC static uint32_t keepAliveSec = mINI::Instance()[Config::Http::kKeepAliveSecond].as(); static uint32_t reqCnt = mINI::Instance()[Config::Http::kMaxReqCount].as(); + headerOut.emplace("Date", dateStr()); headerOut.emplace("Server", serverName); headerOut.emplace("Connection", bClose ? "close" : "keep-alive"); - if(!bClose){ - headerOut.emplace("Keep-Alive",StrPrinter << "timeout=" << keepAliveSec << ", max=" << reqCnt << endl); - } - headerOut.emplace("Date", dateStr()); - if(iContentSize >=0 && pcContentType !=nullptr){ + if(!bClose){ + headerOut.emplace("Keep-Alive",StrPrinter << "timeout=" << keepAliveSec << ", max=" << reqCnt << endl); + } + if(pcContentType){ auto strContentType = StrPrinter << pcContentType << "; charset=" << charSet << endl; - headerOut.emplace("Content-Type",strContentType.data()); + headerOut.emplace("Content-Type",strContentType); + } + if(iContentSize > 0){ headerOut.emplace("Content-Length", StrPrinter<timeStamp; + auto &firstStamp = m_aui32FirstStamp[pkt->typeId % 2]; + if(!firstStamp){ + firstStamp = modifiedStamp; + } + if(modifiedStamp >= firstStamp){ + //计算时间戳增量 + modifiedStamp -= firstStamp; + }else{ + //发生回环,重新计算时间戳增量 + CLEAR_ARR(m_aui32FirstStamp); + modifiedStamp = 0; + } + sendRtmp(pkt->typeId, pkt->strBuf, modifiedStamp); +} + +class RtmpTagHeader { +public: + uint8_t type = 0; + uint8_t data_size[3] = {0}; + uint8_t timestamp[3] = {0}; + uint8_t timestamp_ex = 0; + uint8_t streamid[3] = {0}; /* Always 0. */ +}PACKED; + +void HttpSession::sendRtmp(uint8_t ui8Type, const std::string& strBuf, uint32_t ui32TimeStamp) { + auto size = htonl(m_previousTagSize); + send((char *)&size,4);//send PreviousTagSize + + RtmpTagHeader header; + header.type = ui8Type; + set_be24(header.data_size, strBuf.size()); + + header.timestamp_ex = (uint8_t) ((ui32TimeStamp >> 24) & 0xff); + set_be24(header.timestamp,ui32TimeStamp & 0xFFFFFF); + + send((char *)&header, sizeof(header));//send tag header + send(strBuf);//send tag data + m_previousTagSize += (strBuf.size() + sizeof(header) + 4); + m_ticker.resetTime(); +} + } /* namespace Http */ } /* namespace ZL */ diff --git a/src/Http/HttpSession.h b/src/Http/HttpSession.h index a46037a1..e169fcff 100644 --- a/src/Http/HttpSession.h +++ b/src/Http/HttpSession.h @@ -30,8 +30,10 @@ #include "Common/config.h" #include "Rtsp/Rtsp.h" #include "Network/TcpLimitedSession.h" +#include "Rtmp/RtmpMediaSource.h" using namespace std; +using namespace ZL::Rtmp; using namespace ZL::Network; namespace ZL { @@ -71,10 +73,17 @@ private: Ticker m_ticker; uint32_t m_iReqCnt = 0; + //flv over http + uint32_t m_aui32FirstStamp[2] = {0}; + uint32_t m_previousTagSize = 0; + RingBuffer::RingReader::Ptr m_pRingReader; + void onSendMedia(const RtmpPacket::Ptr &pkt); + void sendRtmp(uint8_t ui8Type, const std::string& strBuf, uint32_t ui32TimeStamp); inline HttpCode parserHttpReq(const string &); inline HttpCode Handle_Req_GET(); inline HttpCode Handle_Req_POST(); + inline bool checkLiveFlvStream(); inline bool emitHttpEvent(bool doInvoke); inline void urlDecode(Parser &parser); inline bool makeMeun(const string &strFullPath, string &strRet); diff --git a/src/Http/HttpsSession.h b/src/Http/HttpsSession.h index 02e0bb6f..88ffbb07 100644 --- a/src/Http/HttpsSession.h +++ b/src/Http/HttpsSession.h @@ -76,6 +76,11 @@ private: m_sslBox.onSend(buf.data(), buf.size()); return buf.size(); } + virtual int send(string &&buf) override{ + TimeTicker(); + m_sslBox.onSend(buf.data(), buf.size()); + return buf.size(); + } virtual int send(const char *buf, int size) override{ TimeTicker(); m_sslBox.onSend(buf, size);