diff --git a/src/Common/MediaSource.h b/src/Common/MediaSource.h index a19d49b2..fe8696c5 100644 --- a/src/Common/MediaSource.h +++ b/src/Common/MediaSource.h @@ -117,9 +117,6 @@ public: unregist(); } - virtual bool regist() ; - virtual bool unregist() ; - static Ptr find(const string &schema, const string &vhost, const string &app, @@ -179,6 +176,10 @@ public: } } } + +protected: + bool regist() ; + bool unregist() ; private: template static bool searchMedia(const string &schema, diff --git a/src/Http/HttpSession.cpp b/src/Http/HttpSession.cpp index cc0021db..61e8e9a0 100644 --- a/src/Http/HttpSession.cpp +++ b/src/Http/HttpSession.cpp @@ -207,11 +207,9 @@ inline bool HttpSession::checkLiveFlvStream(){ m_mediaInfo.m_streamid.erase(m_mediaInfo.m_streamid.size() - 4);//去除.flv后缀 auto mediaSrc = dynamic_pointer_cast(MediaSource::find(RTMP_SCHEMA,m_mediaInfo.m_vhost,m_mediaInfo.m_app,m_mediaInfo.m_streamid)); - if(!mediaSrc || !mediaSrc->ready()){ + if(!mediaSrc){ //该rtmp源不存在 - sendNotFound(true); - shutdown(); - return true; + return false; } auto onRes = [this,mediaSrc](const string &err){ diff --git a/src/Http/HttpSession.h b/src/Http/HttpSession.h index 763e6054..fcb6a566 100644 --- a/src/Http/HttpSession.h +++ b/src/Http/HttpSession.h @@ -41,7 +41,7 @@ namespace ZL { namespace Http { -class HttpSession: public TcpSession,FlvMuxer { +class HttpSession: public TcpSession,public FlvMuxer { public: typedef StrCaseMap KeyValue; typedef std::functionready()){ - throw std::runtime_error("RtmpMediaSource 未准备好"); - } onWriteFlvHeader(media); @@ -176,6 +173,7 @@ void FlvRecorder::startRecord(const string &vhost, const string &app, const stri } void FlvRecorder::startRecord(const RtmpMediaSource::Ptr &media, const string &file_path) { + lock_guard lck(_file_mtx); //开辟文件写缓存 std::shared_ptr fileBuf(new char[FILE_BUF_SIZE],[](char *ptr){ if(ptr){ @@ -183,7 +181,7 @@ void FlvRecorder::startRecord(const RtmpMediaSource::Ptr &media, const string &f } }); //新建文件 - std::shared_ptr _file(File::createfile_file(file_path.data(),"wb"),[fileBuf](FILE *fp){ + _file.reset(File::createfile_file(file_path.data(),"wb"),[fileBuf](FILE *fp){ if(fp){ fflush(fp); fclose(fp); @@ -221,6 +219,12 @@ std::shared_ptr FlvRecorder::getSharedPtr() { return shared_from_this(); } +FlvRecorder::FlvRecorder() { +} + +FlvRecorder::~FlvRecorder() { +} + }//namespace Rtmp }//namespace ZL diff --git a/src/Rtmp/FlvMuxer.h b/src/Rtmp/FlvMuxer.h index a118a242..64df3c1b 100644 --- a/src/Rtmp/FlvMuxer.h +++ b/src/Rtmp/FlvMuxer.h @@ -16,6 +16,7 @@ namespace Rtmp { class FlvMuxer{ public: + typedef std::shared_ptr Ptr; FlvMuxer(); virtual ~FlvMuxer(); void stop(); @@ -38,6 +39,7 @@ private: class FlvRecorder : public FlvMuxer , public std::enable_shared_from_this{ public: + typedef std::shared_ptr Ptr; FlvRecorder(); virtual ~FlvRecorder(); void startRecord(const string &vhost,const string &app,const string &stream,const string &file_path); diff --git a/src/Rtmp/RtmpMediaSource.h b/src/Rtmp/RtmpMediaSource.h index 73e5fc8f..d9d9db82 100644 --- a/src/Rtmp/RtmpMediaSource.h +++ b/src/Rtmp/RtmpMediaSource.h @@ -34,6 +34,7 @@ #include #include "amf.h" #include "Rtmp.h" +#include "RtmpParser.h" #include "Common/config.h" #include "Common/MediaSender.h" #include "Common/MediaSource.h" @@ -71,6 +72,7 @@ public: } const AMFValue &getMetaData() const { + lock_guard lock(m_mtxMap); return m_metadata; } template @@ -80,29 +82,52 @@ public: f(pr.second); } } - bool ready() const { - lock_guard lock(m_mtxMap); - return (m_mapCfgFrame.size() != 0); - } + virtual void onGetMetaData(const AMFValue &_metadata) { + lock_guard lock(m_mtxMap); m_metadata = _metadata; + RtmpParser parser(_metadata); + m_iCfgFrameSize = parser.containAudio() + parser.containVideo(); + if(ready()){ + MediaSource::regist(); + m_bRegisted = true; + } else{ + m_bAsyncRegist = true; + } } virtual void onGetMedia(const RtmpPacket::Ptr &pkt) { - if (pkt->isCfgFrame()) { + if(!m_bRegisted){ lock_guard lock(m_mtxMap); - m_mapCfgFrame.emplace(pkt->typeId, pkt); + if (m_mapCfgFrame.size() != m_iCfgFrameSize && pkt->isCfgFrame()) { + m_mapCfgFrame.emplace(pkt->typeId, pkt); + + if( m_mapCfgFrame.size() == m_iCfgFrameSize && m_bAsyncRegist){ + m_bAsyncRegist = false; + MediaSource::regist(); + m_bRegisted = true; + } + } } + auto _ring = m_pRing; m_thPool.async([_ring,pkt]() { _ring->write(pkt,pkt->isVideoKeyFrame()); }); } +private: + bool ready(){ + lock_guard lock(m_mtxMap); + return m_iCfgFrameSize != -1 && m_iCfgFrameSize == m_mapCfgFrame.size(); + } protected: AMFValue m_metadata; unordered_map m_mapCfgFrame; mutable recursive_mutex m_mtxMap; RingBuffer::Ptr m_pRing; //rtp环形缓冲 ThreadPool &m_thPool; + int m_iCfgFrameSize = -1; + bool m_bAsyncRegist = false; + bool m_bRegisted = false; }; } /* namespace Rtmp */ diff --git a/src/Rtmp/RtmpPusher.cpp b/src/Rtmp/RtmpPusher.cpp index 66116907..6d3ca5b0 100644 --- a/src/Rtmp/RtmpPusher.cpp +++ b/src/Rtmp/RtmpPusher.cpp @@ -194,10 +194,7 @@ inline void RtmpPusher::send_metaData(){ if (!src) { throw std::runtime_error("the media source was released"); } - if (!src->ready()) { - throw std::runtime_error("the media source is not ready"); - } - + AMFEncoder enc; enc << "@setDataFrame" << "onMetaData" << src->getMetaData(); sendRequest(MSG_DATA, enc.data()); diff --git a/src/Rtmp/RtmpSession.cpp b/src/Rtmp/RtmpSession.cpp index 3fd09c9a..9e43ed64 100644 --- a/src/Rtmp/RtmpSession.cpp +++ b/src/Rtmp/RtmpSession.cpp @@ -167,7 +167,6 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) { shutdown(); return; } - m_bPublisherSrcRegisted = false; m_pPublisherSrc.reset(new RtmpToRtspMediaSource(m_mediaInfo.m_vhost,m_mediaInfo.m_app,m_mediaInfo.m_streamid)); m_pPublisherSrc->setListener(dynamic_pointer_cast(shared_from_this())); }; @@ -212,13 +211,6 @@ void RtmpSession::doPlayResponse(const string &err,bool tryDelay,const std::shar m_mediaInfo.m_app, m_mediaInfo.m_streamid, true)); - if(src ){ - if(!src->ready()){ - //流未准备好那么相当于没有 - src = nullptr; - } - } - //是否鉴权成功 bool authSuccess = err.empty(); if(authSuccess && !src && tryDelay ){ @@ -509,10 +501,6 @@ void RtmpSession::onRtmpChunk(RtmpPacket &chunkData) { chunkData.timeStamp = m_stampTicker[chunkData.typeId % 2].elapsedTime(); } m_pPublisherSrc->onGetMedia(std::make_shared(chunkData)); - if(!m_bPublisherSrcRegisted && m_pPublisherSrc->ready()){ - m_bPublisherSrcRegisted = true; - m_pPublisherSrc->regist(); - } } break; default: diff --git a/src/Rtmp/RtmpSession.h b/src/Rtmp/RtmpSession.h index eaeed8f0..d05a6422 100644 --- a/src/Rtmp/RtmpSession.h +++ b/src/Rtmp/RtmpSession.h @@ -102,7 +102,6 @@ private: SmoothTicker m_stampTicker[2];//时间戳生产器 RingBuffer::RingReader::Ptr m_pRingReader; std::shared_ptr m_pPublisherSrc; - bool m_bPublisherSrcRegisted = false; std::weak_ptr m_pPlayerSrc; uint32_t m_aui32FirstStamp[2] = {0}; //消耗的总流量 diff --git a/src/Rtmp/RtmpToRtspMediaSource.cpp b/src/Rtmp/RtmpToRtspMediaSource.cpp index 09b9775f..930d4ff4 100644 --- a/src/Rtmp/RtmpToRtspMediaSource.cpp +++ b/src/Rtmp/RtmpToRtspMediaSource.cpp @@ -45,19 +45,6 @@ RtmpToRtspMediaSource::RtmpToRtspMediaSource(const string &vhost, } RtmpToRtspMediaSource::~RtmpToRtspMediaSource() {} -bool RtmpToRtspMediaSource::regist() { - if (m_pRtspSrc) { - m_pRtspSrc->regist(); - } - return MediaSource::regist(); -} - -bool RtmpToRtspMediaSource::unregist() { - if(m_pRtspSrc){ - m_pRtspSrc->unregist(); - } - return MediaSource::unregist(); -} void RtmpToRtspMediaSource::onGetH264(const H264Frame &frame) { if(m_pRecorder){ @@ -166,7 +153,6 @@ void RtmpToRtspMediaSource::makeSDP() { m_pRtspSrc.reset(new RtspMediaSource(getVhost(),getApp(),getId())); m_pRtspSrc->setListener(m_listener); m_pRtspSrc->onGetSDP(strSDP); - m_pRtspSrc->regist(); } diff --git a/src/Rtmp/RtmpToRtspMediaSource.h b/src/Rtmp/RtmpToRtspMediaSource.h index 380a567a..f01cc594 100644 --- a/src/Rtmp/RtmpToRtspMediaSource.h +++ b/src/Rtmp/RtmpToRtspMediaSource.h @@ -63,9 +63,6 @@ public: bool bEnableMp4 = false); virtual ~RtmpToRtspMediaSource(); - bool regist() override; - bool unregist() override; - void onGetMetaData(const AMFValue &_metadata) override { try { m_pParser.reset(new RtmpParser(_metadata)); diff --git a/src/Rtsp/RtspMediaSource.h b/src/Rtsp/RtspMediaSource.h index 0fbedb1f..fc928181 100644 --- a/src/Rtsp/RtspMediaSource.h +++ b/src/Rtsp/RtspMediaSource.h @@ -87,6 +87,7 @@ public: virtual void onGetSDP(const string& sdp) { //派生类设置该媒体源媒体描述信息 m_strSdp = sdp; + regist(); } virtual void onGetRTP(const RtpPacket::Ptr &rtppt, bool keyPos) { auto &trackRef = m_mapTracks[rtppt->type]; diff --git a/src/Rtsp/RtspToRtmpMediaSource.cpp b/src/Rtsp/RtspToRtmpMediaSource.cpp index bea5ef82..3c255da7 100644 --- a/src/Rtsp/RtspToRtmpMediaSource.cpp +++ b/src/Rtsp/RtspToRtmpMediaSource.cpp @@ -50,20 +50,6 @@ RtspToRtmpMediaSource::~RtspToRtmpMediaSource() { } -bool RtspToRtmpMediaSource::regist() { - if (m_pRtmpSrc) { - m_pRtmpSrc->regist(); - } - return MediaSource::regist(); -} - -bool RtspToRtmpMediaSource::unregist() { - if (m_pRtmpSrc) { - m_pRtmpSrc->unregist(); - } - return MediaSource::unregist(); -} - void RtspToRtmpMediaSource::makeVideoConfigPkt() { int8_t flags = 7; //h.264 flags |= (FLV_KEY_FRAME << 4); diff --git a/src/Rtsp/RtspToRtmpMediaSource.h b/src/Rtsp/RtspToRtmpMediaSource.h index 92ba4d82..b8a05407 100644 --- a/src/Rtsp/RtspToRtmpMediaSource.h +++ b/src/Rtsp/RtspToRtmpMediaSource.h @@ -68,8 +68,6 @@ public: } RtspMediaSource::onGetRTP(pRtppkt, bKeyPos); } - virtual bool regist() override ; - virtual bool unregist() override; int readerCount(){ return getRing()->readerCount() + (m_pRtmpSrc ? m_pRtmpSrc->getRing()->readerCount() : 0); diff --git a/tests/test_server.cpp b/tests/test_server.cpp index cac09e5d..122f9677 100644 --- a/tests/test_server.cpp +++ b/tests/test_server.cpp @@ -34,6 +34,7 @@ #include "Http/HttpSession.h" #include "Shell/ShellSession.h" #include "Util/MD5.h" +#include "Rtmp/FlvMuxer.h" #ifdef ENABLE_OPENSSL #include "Util/SSLBox.h" @@ -133,6 +134,29 @@ static onceToken s_token([](){ }); }); + //此处用于测试rtmp保存为flv录像,保存在http根目录下 + NoticeCenter::Instance().addListener(nullptr,Config::Broadcast::kBroadcastMediaChanged,[](BroadcastMediaChangedArgs){ + if(schema == RTMP_SCHEMA){ + static map s_mapFlvRecorder; + static mutex s_mtxFlvRecorder; + lock_guard lck(s_mtxFlvRecorder); + if(bRegist){ + GET_CONFIG_AND_REGISTER(string,http_root,Config::Http::kRootPath); + auto path = http_root + "/" + vhost + "/" + app + "/" + stream + "_" + to_string(time(NULL)) + ".flv"; + FlvRecorder::Ptr recorder(new FlvRecorder); + try{ + recorder->startRecord(dynamic_pointer_cast(sender.shared_from_this()),path); + s_mapFlvRecorder[vhost + "/" + app + "/" + stream] = recorder; + }catch(std::exception &ex){ + WarnL << ex.what(); + } + }else{ + s_mapFlvRecorder.erase(vhost + "/" + app + "/" + stream); + } + } + }); + + }, nullptr); #if !defined(SIGHUP)