抽象FLV复用器

This commit is contained in:
xiongziliang 2018-08-30 19:29:54 +08:00
parent 3ee2a14cf6
commit ef9ebc89e0
4 changed files with 332 additions and 144 deletions

View File

@ -175,9 +175,8 @@ void HttpSession::onError(const SockException& err) {
//WarnL << err.what(); //WarnL << err.what();
GET_CONFIG_AND_REGISTER(uint32_t,iFlowThreshold,Broadcast::kFlowThreshold); GET_CONFIG_AND_REGISTER(uint32_t,iFlowThreshold,Broadcast::kFlowThreshold);
if(m_previousTagSize > iFlowThreshold * 1024){ if(m_ui64TotalBytes > iFlowThreshold * 1024){
uint64_t totalBytes = m_previousTagSize; NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport,m_mediaInfo,m_ui64TotalBytes,*this);
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport,m_mediaInfo,totalBytes,*this);
} }
} }
@ -224,73 +223,18 @@ inline bool HttpSession::checkLiveFlvStream(){
} }
//找到rtmp源发送http头负载后续发送 //找到rtmp源发送http头负载后续发送
sendResponse("200 OK", makeHttpHeader(false,0,get_mime_type(".flv")), ""); sendResponse("200 OK", makeHttpHeader(false,0,get_mime_type(".flv")), "");
//发送flv文件头
char flv_file_header[] = "FLV\x1\x5\x0\x0\x0\x9"; // have audio and have video
bool is_have_audio = false,is_have_video = false;
mediaSrc->getConfigFrame([&](const RtmpPacket::Ptr &pkt){
if(pkt->typeId == MSG_VIDEO){
is_have_video = true;
}
if(pkt->typeId == MSG_AUDIO){
is_have_audio = true;
}
});
if (is_have_audio && is_have_video) {
flv_file_header[4] = 0x05;
} else if (is_have_audio && !is_have_video) {
flv_file_header[4] = 0x04;
} else if (!is_have_audio && is_have_video) {
flv_file_header[4] = 0x01;
} else {
flv_file_header[4] = 0x00;
}
//send flv header
send(flv_file_header, sizeof(flv_file_header) - 1);
//send metadata
AMFEncoder invoke;
invoke << "onMetaData" << mediaSrc->getMetaData();
sendRtmp(MSG_DATA, invoke.data(), 0);
//send config frame
mediaSrc->getConfigFrame([&](const RtmpPacket::Ptr &pkt){
onSendMedia(pkt);
});
//开始发送rtmp负载 //开始发送rtmp负载
//关闭tcp_nodelay ,优化性能 //关闭tcp_nodelay ,优化性能
SockUtil::setNoDelay(_sock->rawFD(),false); SockUtil::setNoDelay(_sock->rawFD(),false);
(*this) << SocketFlags(sock_flags); (*this) << SocketFlags(sock_flags);
m_pRingReader = mediaSrc->getRing()->attach(); try{
weak_ptr<HttpSession> weakSelf = dynamic_pointer_cast<HttpSession>(shared_from_this()); start(mediaSrc);
m_pRingReader->setReadCB([weakSelf](const RtmpPacket::Ptr &pkt){ }catch (std::exception &ex){
auto strongSelf = weakSelf.lock(); //该rtmp源不存在
if(!strongSelf) { shutdown();
return; }
}
strongSelf->async([pkt,weakSelf](){
auto strongSelf = weakSelf.lock();
if(!strongSelf) {
return;
}
strongSelf->onSendMedia(pkt);
});
});
m_pRingReader->setDetachCB([weakSelf](){
auto strongSelf = weakSelf.lock();
if(!strongSelf) {
return;
}
strongSelf->async_first([weakSelf](){
auto strongSelf = weakSelf.lock();
if(!strongSelf) {
return;
}
strongSelf->shutdown();
});
});
}; };
weak_ptr<HttpSession> weakSelf = dynamic_pointer_cast<HttpSession>(shared_from_this()); weak_ptr<HttpSession> weakSelf = dynamic_pointer_cast<HttpSession>(shared_from_this());
@ -682,86 +626,43 @@ void HttpSession::responseDelay(const string &Origin,bool bClose,
} }
inline void HttpSession::sendNotFound(bool bClose) { inline void HttpSession::sendNotFound(bool bClose) {
GET_CONFIG_AND_REGISTER(string,notFound,Config::Http::kNotFound); GET_CONFIG_AND_REGISTER(string,notFound,Config::Http::kNotFound);
sendResponse("404 Not Found", makeHttpHeader(bClose, notFound.size()), notFound); sendResponse("404 Not Found", makeHttpHeader(bClose, notFound.size()), notFound);
} }
void HttpSession::onSendMedia(const RtmpPacket::Ptr &pkt) {
auto modifiedStamp = pkt->timeStamp; void HttpSession::onWrite(const Buffer::Ptr &buffer) {
auto &firstStamp = m_aui32FirstStamp[pkt->typeId % 2]; weak_ptr<HttpSession> weakSelf = dynamic_pointer_cast<HttpSession>(shared_from_this());
if(!firstStamp){ async([weakSelf,buffer](){
firstStamp = modifiedStamp; auto strongSelf = weakSelf.lock();
} if(!strongSelf) {
if(modifiedStamp >= firstStamp){ return;
//计算时间戳增量 }
modifiedStamp -= firstStamp; strongSelf->m_ui64TotalBytes += buffer->size();
}else{ strongSelf->send(buffer);
//发生回环,重新计算时间戳增量 });
CLEAR_ARR(m_aui32FirstStamp);
modifiedStamp = 0;
}
sendRtmp(pkt, modifiedStamp);
} }
#if defined(_WIN32) void HttpSession::onWrite(const char *data, int len) {
#pragma pack(push, 1) BufferRaw::Ptr buffer(new BufferRaw);
#endif // defined(_WIN32) buffer->assign(data,len);
class RtmpTagHeader { weak_ptr<HttpSession> weakSelf = dynamic_pointer_cast<HttpSession>(shared_from_this());
public: async([weakSelf,buffer](){
uint8_t type = 0; auto strongSelf = weakSelf.lock();
uint8_t data_size[3] = {0}; if(!strongSelf) {
uint8_t timestamp[3] = {0}; return;
uint8_t timestamp_ex = 0; }
uint8_t streamid[3] = {0}; /* Always 0. */ strongSelf->m_ui64TotalBytes += buffer->size();
}PACKED; strongSelf->send(buffer);
});
#if defined(_WIN32)
#pragma pack(pop)
#endif // defined(_WIN32)
class BufferRtmp : public Buffer{
public:
typedef std::shared_ptr<BufferRtmp> Ptr;
BufferRtmp(const RtmpPacket::Ptr & pkt):_rtmp(pkt){}
virtual ~BufferRtmp(){}
char *data() override {
return (char *)_rtmp->strBuf.data();
}
uint32_t size() const override {
return _rtmp->strBuf.size();
}
private:
RtmpPacket::Ptr _rtmp;
};
void HttpSession::sendRtmp(const RtmpPacket::Ptr &pkt, uint32_t ui32TimeStamp) {
auto size = htonl(m_previousTagSize);
send((char *)&size,4);//send PreviousTagSize
RtmpTagHeader header;
header.type = pkt->typeId;
set_be24(header.data_size, pkt->strBuf.size());
header.timestamp_ex = (uint8_t) ((ui32TimeStamp >> 24) & 0xff);
set_be24(header.timestamp,ui32TimeStamp & 0xFFFFFF);
send((char *)&header, sizeof(header));//send tag header
send(std::make_shared<BufferRtmp>(pkt));//send tag data
m_previousTagSize += (pkt->strBuf.size() + sizeof(header));
m_ticker.resetTime();
} }
void HttpSession::sendRtmp(uint8_t ui8Type, const std::string& strBuf, uint32_t ui32TimeStamp) { void HttpSession::onDetach() {
auto size = htonl(m_previousTagSize); safeShutdown();
send((char *)&size,4);//send PreviousTagSize }
RtmpTagHeader header;
header.type = ui8Type; std::shared_ptr<FlvMuxer> HttpSession::getSharedPtr(){
set_be24(header.data_size, strBuf.size()); return dynamic_pointer_cast<FlvMuxer>(shared_from_this());
header.timestamp_ex = (uint8_t) ((ui32TimeStamp >> 24) & 0xff);
set_be24(header.timestamp,ui32TimeStamp & 0xFFFFFF);
send((char *)&header, sizeof(header));//send tag header
send(strBuf);//send tag data
m_previousTagSize += (strBuf.size() + sizeof(header));
m_ticker.resetTime();
} }
} /* namespace Http */ } /* namespace Http */

View File

@ -31,6 +31,7 @@
#include "Rtsp/Rtsp.h" #include "Rtsp/Rtsp.h"
#include "Network/TcpSession.h" #include "Network/TcpSession.h"
#include "Rtmp/RtmpMediaSource.h" #include "Rtmp/RtmpMediaSource.h"
#include "Rtmp/FlvMuxer.h"
using namespace std; using namespace std;
using namespace ZL::Rtmp; using namespace ZL::Rtmp;
@ -40,7 +41,7 @@ namespace ZL {
namespace Http { namespace Http {
class HttpSession: public TcpSession { class HttpSession: public TcpSession,FlvMuxer {
public: public:
typedef StrCaseMap KeyValue; typedef StrCaseMap KeyValue;
typedef std::function<void(const string &codeOut, typedef std::function<void(const string &codeOut,
@ -57,6 +58,12 @@ public:
static string urlDecode(const string &str); static string urlDecode(const string &str);
protected: protected:
void onRecv(const char *data,int size); void onRecv(const char *data,int size);
//FlvMuxer override
void onWrite(const Buffer::Ptr &data) override ;
void onWrite(const char *data,int len) override;
void onDetach() override;
std::shared_ptr<FlvMuxer> getSharedPtr() override;
private: private:
typedef enum typedef enum
{ {
@ -70,16 +77,11 @@ private:
string m_strRcvBuf; string m_strRcvBuf;
Ticker m_ticker; Ticker m_ticker;
uint32_t m_iReqCnt = 0; uint32_t m_iReqCnt = 0;
//消耗的总流量
uint64_t m_ui64TotalBytes = 0;
//flv over http //flv over http
uint32_t m_aui32FirstStamp[2] = {0};
uint32_t m_previousTagSize = 0;
MediaInfo m_mediaInfo; MediaInfo m_mediaInfo;
RingBuffer<RtmpPacket::Ptr>::RingReader::Ptr m_pRingReader;
void onSendMedia(const RtmpPacket::Ptr &pkt);
void sendRtmp(const RtmpPacket::Ptr &pkt, uint32_t ui32TimeStamp);
void sendRtmp(uint8_t ui8Type, const std::string& strBuf, uint32_t ui32TimeStamp);
inline HttpCode parserHttpReq(const string &); inline HttpCode parserHttpReq(const string &);
inline HttpCode Handle_Req_GET(); inline HttpCode Handle_Req_GET();

226
src/Rtmp/FlvMuxer.cpp Normal file
View File

@ -0,0 +1,226 @@
//
// Created by xzl on 2018/8/30.
//
#include "Util/File.h"
#include "FlvMuxer.h"
#include "utils.h"
#define FILE_BUF_SIZE (64 * 1024)
namespace ZL {
namespace Rtmp {
FlvMuxer::FlvMuxer() {
}
FlvMuxer::~FlvMuxer() {
}
void FlvMuxer::start(const RtmpMediaSource::Ptr &media) {
if(!media){
throw std::runtime_error("RtmpMediaSource 无效");
}
if(!media->ready()){
throw std::runtime_error("RtmpMediaSource 未准备好");
}
onWriteFlvHeader(media);
std::weak_ptr<FlvMuxer> weakSelf = getSharedPtr();
_ring_reader = media->getRing()->attach();
_ring_reader->setDetachCB([weakSelf](){
auto strongSelf = weakSelf.lock();
if(!strongSelf){
return;
}
strongSelf->onDetach();
});
_ring_reader->setReadCB([weakSelf](const RtmpPacket::Ptr &pkt){
auto strongSelf = weakSelf.lock();
if(!strongSelf){
return;
}
strongSelf->onWriteRtmp(pkt);
});
}
void FlvMuxer::onWriteFlvHeader(const RtmpMediaSource::Ptr &mediaSrc) {
m_previousTagSize = 0;
CLEAR_ARR(m_aui32FirstStamp);
//发送flv文件头
char flv_file_header[] = "FLV\x1\x5\x0\x0\x0\x9"; // have audio and have video
bool is_have_audio = false,is_have_video = false;
mediaSrc->getConfigFrame([&](const RtmpPacket::Ptr &pkt){
if(pkt->typeId == MSG_VIDEO){
is_have_video = true;
}
if(pkt->typeId == MSG_AUDIO){
is_have_audio = true;
}
});
if (is_have_audio && is_have_video) {
flv_file_header[4] = 0x05;
} else if (is_have_audio && !is_have_video) {
flv_file_header[4] = 0x04;
} else if (!is_have_audio && is_have_video) {
flv_file_header[4] = 0x01;
} else {
flv_file_header[4] = 0x00;
}
//flv header
onWrite(flv_file_header, sizeof(flv_file_header) - 1);
//metadata
AMFEncoder invoke;
invoke << "onMetaData" << mediaSrc->getMetaData();
onWriteFlvTag(MSG_DATA, invoke.data(), 0);
//config frame
mediaSrc->getConfigFrame([&](const RtmpPacket::Ptr &pkt){
onWriteRtmp(pkt);
});
}
#if defined(_WIN32)
#pragma pack(push, 1)
#endif // defined(_WIN32)
class RtmpTagHeader {
public:
uint8_t type = 0;
uint8_t data_size[3] = {0};
uint8_t timestamp[3] = {0};
uint8_t timestamp_ex = 0;
uint8_t streamid[3] = {0}; /* Always 0. */
}PACKED;
#if defined(_WIN32)
#pragma pack(pop)
#endif // defined(_WIN32)
class BufferRtmp : public Buffer{
public:
typedef std::shared_ptr<BufferRtmp> Ptr;
BufferRtmp(const RtmpPacket::Ptr & pkt):_rtmp(pkt){}
virtual ~BufferRtmp(){}
char *data() override {
return (char *)_rtmp->strBuf.data();
}
uint32_t size() const override {
return _rtmp->strBuf.size();
}
private:
RtmpPacket::Ptr _rtmp;
};
void FlvMuxer::onWriteFlvTag(const RtmpPacket::Ptr &pkt, uint32_t ui32TimeStamp) {
auto size = htonl(m_previousTagSize);
onWrite((char *)&size,4);//onWrite PreviousTagSize
RtmpTagHeader header;
header.type = pkt->typeId;
set_be24(header.data_size, pkt->strBuf.size());
header.timestamp_ex = (uint8_t) ((ui32TimeStamp >> 24) & 0xff);
set_be24(header.timestamp,ui32TimeStamp & 0xFFFFFF);
onWrite((char *)&header, sizeof(header));//onWrite tag header
onWrite(std::make_shared<BufferRtmp>(pkt));//onWrite tag data
m_previousTagSize += (pkt->strBuf.size() + sizeof(header));
}
void FlvMuxer::onWriteFlvTag(uint8_t ui8Type, const std::string &strBuf, uint32_t ui32TimeStamp) {
auto size = htonl(m_previousTagSize);
onWrite((char *)&size,4);//onWrite PreviousTagSize
RtmpTagHeader header;
header.type = ui8Type;
set_be24(header.data_size, strBuf.size());
header.timestamp_ex = (uint8_t) ((ui32TimeStamp >> 24) & 0xff);
set_be24(header.timestamp,ui32TimeStamp & 0xFFFFFF);
onWrite((char *)&header, sizeof(header));//onWrite tag header
onWrite(std::make_shared<BufferString>(strBuf));//onWrite tag data
m_previousTagSize += (strBuf.size() + sizeof(header));
}
void FlvMuxer::onWriteRtmp(const RtmpPacket::Ptr &pkt) {
auto modifiedStamp = pkt->timeStamp;
auto &firstStamp = m_aui32FirstStamp[pkt->typeId % 2];
if(!firstStamp){
firstStamp = modifiedStamp;
}
if(modifiedStamp >= firstStamp){
//计算时间戳增量
modifiedStamp -= firstStamp;
}else{
//发生回环,重新计算时间戳增量
CLEAR_ARR(m_aui32FirstStamp);
modifiedStamp = 0;
}
onWriteFlvTag(pkt, modifiedStamp);
}
void FlvMuxer::stop() {
if(_ring_reader){
_ring_reader.reset();
onDetach();
}
}
///////////////////////////////////////////////////////FlvRecorder/////////////////////////////////////////////////////
void FlvRecorder::startRecord(const string &vhost, const string &app, const string &stream,const string &file_path) {
startRecord(dynamic_pointer_cast<RtmpMediaSource>(MediaSource::find(RTMP_SCHEMA,vhost,app,stream,false)),file_path);
}
void FlvRecorder::startRecord(const RtmpMediaSource::Ptr &media, const string &file_path) {
//开辟文件写缓存
std::shared_ptr<char> fileBuf(new char[FILE_BUF_SIZE],[](char *ptr){
if(ptr){
delete [] ptr;
}
});
//新建文件
std::shared_ptr<FILE> _file(File::createfile_file(file_path.data(),"wb"),[fileBuf](FILE *fp){
if(fp){
fflush(fp);
fclose(fp);
}
});
if (!_file){
throw std::runtime_error( StrPrinter << "打开文件失败:" << file_path);
}
//设置文件写缓存
setvbuf( _file.get(), fileBuf.get(),_IOFBF, FILE_BUF_SIZE);
start(media);
}
void FlvRecorder::onWrite(const Buffer::Ptr &data) {
lock_guard<recursive_mutex> lck(_file_mtx);
if(_file){
fwrite(data->data(),data->size(),1,_file.get());
}
}
void FlvRecorder::onWrite(const char *data, int len) {
lock_guard<recursive_mutex> lck(_file_mtx);
if(_file){
fwrite(data,len,1,_file.get());
}
}
void FlvRecorder::onDetach() {
lock_guard<recursive_mutex> lck(_file_mtx);
_file.reset();
}
std::shared_ptr<FlvMuxer> FlvRecorder::getSharedPtr() {
return shared_from_this();
}
}//namespace Rtmp
}//namespace ZL

59
src/Rtmp/FlvMuxer.h Normal file
View File

@ -0,0 +1,59 @@
//
// Created by xzl on 2018/8/30.
//
#ifndef ZLMEDIAKIT_FLVRECORDER_H
#define ZLMEDIAKIT_FLVRECORDER_H
#include "Rtmp.h"
#include "RtmpMediaSource.h"
#include "Network/Socket.h"
using namespace ZL::Network;
namespace ZL {
namespace Rtmp {
class FlvMuxer{
public:
FlvMuxer();
virtual ~FlvMuxer();
void stop();
protected:
void start(const RtmpMediaSource::Ptr &media);
virtual void onWrite(const Buffer::Ptr &data) = 0;
virtual void onWrite(const char *data,int len) = 0;
virtual void onDetach() = 0;
virtual std::shared_ptr<FlvMuxer> getSharedPtr() = 0;
private:
void onWriteFlvHeader(const RtmpMediaSource::Ptr &media);
void onWriteRtmp(const RtmpPacket::Ptr &pkt);
void onWriteFlvTag(const RtmpPacket::Ptr &pkt, uint32_t ui32TimeStamp);
void onWriteFlvTag(uint8_t ui8Type, const std::string &strBuf, uint32_t ui32TimeStamp);
private:
RtmpMediaSource::RingType::RingReader::Ptr _ring_reader;
uint32_t m_aui32FirstStamp[2] = {0};
uint32_t m_previousTagSize = 0;
};
class FlvRecorder : public FlvMuxer , public std::enable_shared_from_this<FlvRecorder>{
public:
FlvRecorder();
virtual ~FlvRecorder();
void startRecord(const string &vhost,const string &app,const string &stream,const string &file_path);
void startRecord(const RtmpMediaSource::Ptr &media,const string &file_path);
private:
virtual void onWrite(const Buffer::Ptr &data) override ;
virtual void onWrite(const char *data,int len) override;
virtual void onDetach() override;
virtual std::shared_ptr<FlvMuxer> getSharedPtr() override;
private:
std::shared_ptr<FILE> _file;
recursive_mutex _file_mtx;
};
}//namespace Rtmp
}//namespace ZL
#endif //ZLMEDIAKIT_FLVRECORDER_H