From 99a55ddaaa4d77924668490e488cbd6296ad9d77 Mon Sep 17 00:00:00 2001 From: xiongziliang <771730766@qq.com> Date: Thu, 9 Apr 2020 16:19:03 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E5=90=88=E5=B9=B6=E5=86=99?= =?UTF-8?q?=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- conf/config.ini | 3 + src/Common/MediaSource.cpp | 46 +++++++++++++++ src/Common/MediaSource.h | 111 +++++++++++++++++++++++++++++++++++++ src/Common/config.cpp | 2 + src/Common/config.h | 3 + src/Http/HttpSession.cpp | 25 ++++++--- src/Http/HttpSession.h | 2 +- src/Rtmp/FlvMuxer.cpp | 35 +++++++----- src/Rtmp/FlvMuxer.h | 10 ++-- src/Rtmp/RtmpMediaSource.h | 41 ++++++++++++-- src/Rtmp/RtmpPusher.cpp | 13 ++++- src/Rtmp/RtmpSession.cpp | 36 ++++++------ src/Rtmp/RtmpSession.h | 3 +- src/Rtsp/RtspMediaSource.h | 98 +++----------------------------- 14 files changed, 279 insertions(+), 149 deletions(-) diff --git a/conf/config.ini b/conf/config.ini index ed4727af..accde647 100644 --- a/conf/config.ini +++ b/conf/config.ini @@ -42,6 +42,9 @@ publishToRtxp=1 publishToHls=1 #是否默认推流时mp4录像,hook接口(on_publish)中可以覆盖该设置 publishToMP4=0 +#合并写缓存大小(单位毫秒),合并写指服务器缓存一定的数据后才会一次性写入socket,这样能提高性能,但是会提高延时 +#在开启低延时模式后,该参数不起作用 +mergeWriteMS=300 [hls] #hls写文件的buf大小,调整参数可以提高文件io性能 diff --git a/src/Common/MediaSource.cpp b/src/Common/MediaSource.cpp index aa30ffbc..1947900a 100644 --- a/src/Common/MediaSource.cpp +++ b/src/Common/MediaSource.cpp @@ -450,4 +450,50 @@ MediaSource::Ptr MediaSource::createFromMP4(const string &schema, const string & #endif //ENABLE_MP4 } +static bool isFlushAble_default(bool is_audio, uint32_t last_stamp, uint32_t new_stamp, int cache_size) { + if (new_stamp < last_stamp) { + //时间戳回退(可能seek中) + return true; + } + + if (!is_audio) { + //这是视频,时间戳发送变化或者缓存超过1024个 + return last_stamp != new_stamp || cache_size >= 1024; + } + + //这是音频,缓存超过100ms或者缓存个数超过10个 + return new_stamp > last_stamp + 100 || cache_size > 10; +} + +static bool isFlushAble_merge(bool is_audio, uint32_t last_stamp, uint32_t new_stamp, int cache_size, int merge_ms) { + if (new_stamp < last_stamp) { + //时间戳回退(可能seek中) + return true; + } + + if(new_stamp > last_stamp + merge_ms){ + //时间戳增量超过合并写阈值 + return true; + } + + if (!is_audio) { + //这是视频,缓存数超过1024个,这个逻辑用于避免时间戳异常的流导致的内存暴增问题 + //而且sendmsg接口一般最多只能发送1024个数据包 + return cache_size >= 1024; + } + + //这是音频,音频缓存超过20个 + return cache_size > 20; +} + +bool FlushPolicy::isFlushAble(uint32_t last_stamp, uint32_t new_stamp, int cache_size) { + GET_CONFIG(bool,ultraLowDelay, General::kUltraLowDelay); + GET_CONFIG(int,mergeWriteMS, General::kMergeWriteMS); + if(ultraLowDelay || mergeWriteMS <= 0){ + //关闭了合并写或者合并写阈值小于等于0 + return isFlushAble_default(_is_audio, last_stamp, new_stamp, cache_size); + } + return isFlushAble_merge(_is_audio, last_stamp, new_stamp, cache_size,mergeWriteMS); +} + } /* namespace mediakit */ \ No newline at end of file diff --git a/src/Common/MediaSource.h b/src/Common/MediaSource.h index f3d6e140..c743d574 100644 --- a/src/Common/MediaSource.h +++ b/src/Common/MediaSource.h @@ -21,6 +21,9 @@ #include "Util/logger.h" #include "Util/TimeTicker.h" #include "Util/NoticeCenter.h" +#include "Util/List.h" +#include "Rtsp/Rtsp.h" +#include "Rtmp/Rtmp.h" #include "Extension/Track.h" #include "Record/Recorder.h" @@ -153,6 +156,114 @@ private: static recursive_mutex g_mtxMediaSrc; }; +///缓存刷新策略类 +class FlushPolicy { +public: + FlushPolicy(bool is_audio) { + _is_audio = is_audio; + }; + + ~FlushPolicy() = default; + + uint32_t getStamp(const RtpPacket::Ptr &packet) { + return packet->timeStamp; + } + + uint32_t getStamp(const RtmpPacket::Ptr &packet) { + return packet->timeStamp; + } + + bool isFlushAble(uint32_t last_stamp, uint32_t new_stamp, int cache_size); +private: + bool _is_audio; +}; + +/// 视频合并写缓存模板 +/// \tparam packet 包类型 +/// \tparam policy 刷新缓存策略 +/// \tparam packet_list 包缓存类型 +template > > +class VideoPacketCache { +public: + VideoPacketCache() : _policy(true) { + _cache = std::make_shared(); + } + + virtual ~VideoPacketCache() = default; + + void inputVideo(const std::shared_ptr &rtp, bool key_pos) { + auto new_stamp = _policy.getStamp(rtp); + if (_policy.isFlushAble(_last_stamp, new_stamp, _cache->size())) { + flushAll(); + } + + //追加数据到最后 + _cache->emplace_back(rtp); + _last_stamp = new_stamp; + if (key_pos) { + _key_pos = key_pos; + } + } + + virtual void onFlushVideo(std::shared_ptr &, bool key_pos) = 0; + +private: + void flushAll() { + if (_cache->empty()) { + return; + } + onFlushVideo(_cache, _key_pos); + _cache = std::make_shared(); + _key_pos = false; + } + +private: + policy _policy; + std::shared_ptr _cache; + uint32_t _last_stamp = 0; + bool _key_pos = false; +}; + +/// 音频频合并写缓存模板 +/// \tparam packet 包类型 +/// \tparam policy 刷新缓存策略 +/// \tparam packet_list 包缓存类型 +template > > +class AudioPacketCache { +public: + AudioPacketCache() : _policy(false) { + _cache = std::make_shared(); + } + + virtual ~AudioPacketCache() = default; + + void inputAudio(const std::shared_ptr &rtp) { + auto new_stamp = _policy.getStamp(rtp); + if (_policy.isFlushAble(_last_stamp, new_stamp, _cache->size())) { + flushAll(); + } + //追加数据到最后 + _cache->emplace_back(rtp); + _last_stamp = new_stamp; + } + + virtual void onFlushAudio(std::shared_ptr &) = 0; + +private: + void flushAll() { + if (_cache->empty()) { + return; + } + onFlushAudio(_cache); + _cache = std::make_shared(); + } + +private: + policy _policy; + std::shared_ptr _cache; + uint32_t _last_stamp = 0; +}; + } /* namespace mediakit */ diff --git a/src/Common/config.cpp b/src/Common/config.cpp index 332554a0..819901fb 100644 --- a/src/Common/config.cpp +++ b/src/Common/config.cpp @@ -67,6 +67,7 @@ const string kResetWhenRePlay = GENERAL_FIELD"resetWhenRePlay"; const string kPublishToRtxp = GENERAL_FIELD"publishToRtxp"; const string kPublishToHls = GENERAL_FIELD"publishToHls"; const string kPublishToMP4 = GENERAL_FIELD"publishToMP4"; +const string kMergeWriteMS = GENERAL_FIELD"mergeWriteMS"; onceToken token([](){ mINI::Instance()[kFlowThreshold] = 1024; @@ -79,6 +80,7 @@ onceToken token([](){ mINI::Instance()[kPublishToRtxp] = 1; mINI::Instance()[kPublishToHls] = 1; mINI::Instance()[kPublishToMP4] = 0; + mINI::Instance()[kMergeWriteMS] = 300; },nullptr); }//namespace General diff --git a/src/Common/config.h b/src/Common/config.h index b58e3a99..2c681e05 100644 --- a/src/Common/config.h +++ b/src/Common/config.h @@ -173,6 +173,9 @@ extern const string kPublishToRtxp ; extern const string kPublishToHls ; //是否默认推流时mp4录像,hook接口(on_publish)中可以覆盖该设置 extern const string kPublishToMP4 ; +//合并写缓存大小(单位毫秒),合并写指服务器缓存一定的数据后才会一次性写入socket,这样能提高性能,但是会提高延时 +//在开启低延时模式后,该参数不起作用 +extern const string kMergeWriteMS ; }//namespace General diff --git a/src/Http/HttpSession.cpp b/src/Http/HttpSession.cpp index 0ce9591c..92cb886d 100644 --- a/src/Http/HttpSession.cpp +++ b/src/Http/HttpSession.cpp @@ -615,20 +615,29 @@ void HttpSession::setSocketFlags(){ } } -void HttpSession::onWrite(const Buffer::Ptr &buffer) { +void HttpSession::onWrite(const Buffer::Ptr &buffer, bool flush) { + if(flush){ + //需要flush那么一次刷新缓存 + HttpSession::setSendFlushFlag(true); + } + _ticker.resetTime(); if(!_flv_over_websocket){ _ui64TotalBytes += buffer->size(); send(buffer); - return; + }else{ + WebSocketHeader header; + header._fin = true; + header._reserved = 0; + header._opcode = WebSocketHeader::BINARY; + header._mask_flag = false; + WebSocketSplitter::encode(header,buffer); } - WebSocketHeader header; - header._fin = true; - header._reserved = 0; - header._opcode = WebSocketHeader::BINARY; - header._mask_flag = false; - WebSocketSplitter::encode(header,buffer); + if(flush){ + //本次刷新缓存后,下次不用刷新缓存 + HttpSession::setSendFlushFlag(false); + } } void HttpSession::onWebSocketEncodeData(const Buffer::Ptr &buffer){ diff --git a/src/Http/HttpSession.h b/src/Http/HttpSession.h index 6b9b21fb..540a7e3c 100644 --- a/src/Http/HttpSession.h +++ b/src/Http/HttpSession.h @@ -49,7 +49,7 @@ public: static string urlDecode(const string &str); protected: //FlvMuxer override - void onWrite(const Buffer::Ptr &data) override ; + void onWrite(const Buffer::Ptr &data, bool flush) override ; void onDetach() override; std::shared_ptr getSharedPtr() override; diff --git a/src/Rtmp/FlvMuxer.cpp b/src/Rtmp/FlvMuxer.cpp index cc4866cc..b3df0459 100644 --- a/src/Rtmp/FlvMuxer.cpp +++ b/src/Rtmp/FlvMuxer.cpp @@ -50,12 +50,17 @@ void FlvMuxer::start(const EventPoller::Ptr &poller,const RtmpMediaSource::Ptr & } strongSelf->onDetach(); }); - _ring_reader->setReadCB([weakSelf](const RtmpPacket::Ptr &pkt){ + _ring_reader->setReadCB([weakSelf](const RtmpMediaSource::RingDataType &pkt){ auto strongSelf = weakSelf.lock(); if(!strongSelf){ return; } - strongSelf->onWriteRtmp(pkt); + + int i = 0; + int size = pkt->size(); + pkt->for_each([&](const RtmpPacket::Ptr &rtmp){ + strongSelf->onWriteRtmp(rtmp, ++i == size); + }); }); } @@ -84,11 +89,11 @@ void FlvMuxer::onWriteFlvHeader(const RtmpMediaSource::Ptr &mediaSrc) { } //flv header - onWrite(std::make_shared(flv_file_header, sizeof(flv_file_header) - 1)); + onWrite(std::make_shared(flv_file_header, sizeof(flv_file_header) - 1), false); auto size = htonl(0); //PreviousTagSize0 Always 0 - onWrite(std::make_shared((char *)&size,4)); + onWrite(std::make_shared((char *)&size,4), false); auto &metadata = mediaSrc->getMetaData(); @@ -97,12 +102,12 @@ void FlvMuxer::onWriteFlvHeader(const RtmpMediaSource::Ptr &mediaSrc) { //其实metadata没什么用,有些推流器不产生metadata AMFEncoder invoke; invoke << "onMetaData" << metadata; - onWriteFlvTag(MSG_DATA, std::make_shared(invoke.data()), 0); + onWriteFlvTag(MSG_DATA, std::make_shared(invoke.data()), 0, false); } //config frame mediaSrc->getConfigFrame([&](const RtmpPacket::Ptr &pkt){ - onWriteRtmp(pkt); + onWriteRtmp(pkt, true); }); } @@ -125,29 +130,29 @@ public: #pragma pack(pop) #endif // defined(_WIN32) -void FlvMuxer::onWriteFlvTag(const RtmpPacket::Ptr &pkt, uint32_t ui32TimeStamp) { - onWriteFlvTag(pkt->typeId,pkt,ui32TimeStamp); +void FlvMuxer::onWriteFlvTag(const RtmpPacket::Ptr &pkt, uint32_t ui32TimeStamp , bool flush) { + onWriteFlvTag(pkt->typeId,pkt,ui32TimeStamp, flush); } -void FlvMuxer::onWriteFlvTag(uint8_t ui8Type, const Buffer::Ptr &buffer, uint32_t ui32TimeStamp) { +void FlvMuxer::onWriteFlvTag(uint8_t ui8Type, const Buffer::Ptr &buffer, uint32_t ui32TimeStamp, bool flush) { RtmpTagHeader header; header.type = ui8Type; set_be24(header.data_size, buffer->size()); header.timestamp_ex = (uint8_t) ((ui32TimeStamp >> 24) & 0xff); set_be24(header.timestamp,ui32TimeStamp & 0xFFFFFF); //tag header - onWrite(std::make_shared((char *)&header, sizeof(header))); + onWrite(std::make_shared((char *)&header, sizeof(header)), false); //tag data - onWrite(buffer); + onWrite(buffer, false); auto size = htonl((buffer->size() + sizeof(header))); //PreviousTagSize - onWrite(std::make_shared((char *)&size,4)); + onWrite(std::make_shared((char *)&size,4), flush); } -void FlvMuxer::onWriteRtmp(const RtmpPacket::Ptr &pkt) { +void FlvMuxer::onWriteRtmp(const RtmpPacket::Ptr &pkt,bool flush) { int64_t dts_out; _stamp[pkt->typeId % 2].revise(pkt->timeStamp, 0, dts_out, dts_out); - onWriteFlvTag(pkt, dts_out); + onWriteFlvTag(pkt, dts_out,flush); } void FlvMuxer::stop() { @@ -187,7 +192,7 @@ void FlvRecorder::startRecord(const EventPoller::Ptr &poller,const RtmpMediaSour start(poller,media); } -void FlvRecorder::onWrite(const Buffer::Ptr &data) { +void FlvRecorder::onWrite(const Buffer::Ptr &data, bool flush) { lock_guard lck(_file_mtx); if(_file){ fwrite(data->data(),data->size(),1,_file.get()); diff --git a/src/Rtmp/FlvMuxer.h b/src/Rtmp/FlvMuxer.h index d51c0b4d..86be5ee3 100644 --- a/src/Rtmp/FlvMuxer.h +++ b/src/Rtmp/FlvMuxer.h @@ -27,14 +27,14 @@ public: void stop(); protected: void start(const EventPoller::Ptr &poller,const RtmpMediaSource::Ptr &media); - virtual void onWrite(const Buffer::Ptr &data) = 0; + virtual void onWrite(const Buffer::Ptr &data, bool flush) = 0; virtual void onDetach() = 0; virtual std::shared_ptr getSharedPtr() = 0; private: void onWriteFlvHeader(const RtmpMediaSource::Ptr &media); - void onWriteRtmp(const RtmpPacket::Ptr &pkt); - void onWriteFlvTag(const RtmpPacket::Ptr &pkt, uint32_t ui32TimeStamp); - void onWriteFlvTag(uint8_t ui8Type, const Buffer::Ptr &buffer, uint32_t ui32TimeStamp); + void onWriteRtmp(const RtmpPacket::Ptr &pkt,bool flush); + void onWriteFlvTag(const RtmpPacket::Ptr &pkt, uint32_t ui32TimeStamp, bool flush); + void onWriteFlvTag(uint8_t ui8Type, const Buffer::Ptr &buffer, uint32_t ui32TimeStamp, bool flush); private: RtmpMediaSource::RingType::RingReader::Ptr _ring_reader; //时间戳修整器 @@ -50,7 +50,7 @@ public: void startRecord(const EventPoller::Ptr &poller,const string &vhost,const string &app,const string &stream,const string &file_path); void startRecord(const EventPoller::Ptr &poller,const RtmpMediaSource::Ptr &media,const string &file_path); private: - virtual void onWrite(const Buffer::Ptr &data) override ; + virtual void onWrite(const Buffer::Ptr &data, bool flush) override ; virtual void onDetach() override; virtual std::shared_ptr getSharedPtr() override; private: diff --git a/src/Rtmp/RtmpMediaSource.h b/src/Rtmp/RtmpMediaSource.h index f002f533..23a0c6ad 100644 --- a/src/Rtmp/RtmpMediaSource.h +++ b/src/Rtmp/RtmpMediaSource.h @@ -33,6 +33,9 @@ using namespace toolkit; #define RTMP_GOP_SIZE 512 namespace mediakit { +typedef VideoPacketCache RtmpVideoCache; +typedef AudioPacketCache RtmpAudioCache; + /** * rtmp媒体源的数据抽象 * rtmp有关键的三要素,分别是metadata、config帧,普通帧 @@ -40,10 +43,11 @@ namespace mediakit { * 只要生成了这三要素,那么要实现rtmp推流、rtmp服务器就很简单了 * rtmp推拉流协议中,先传递metadata,然后传递config帧,然后一直传递普通帧 */ -class RtmpMediaSource : public MediaSource, public RingDelegate { +class RtmpMediaSource : public MediaSource, public RingDelegate, public RtmpVideoCache, public RtmpAudioCache{ public: typedef std::shared_ptr Ptr; - typedef RingBuffer RingType; + typedef std::shared_ptr > RingDataType; + typedef RingBuffer RingType; /** * 构造函数 @@ -122,6 +126,9 @@ public: return; } + //保存当前时间戳 + _track_stamps_map[pkt->typeId] = pkt->timeStamp; + if (!_ring) { weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); auto lam = [weakSelf](const EventPoller::Ptr &, int size, bool) { @@ -142,9 +149,12 @@ public: regist(); } } - _track_stamps_map[pkt->typeId] = pkt->timeStamp; - //不存在视频,为了减少缓存延时,那么关闭GOP缓存 - _ring->write(pkt, _have_video ? pkt->isVideoKeyFrame() : true); + + if(pkt->typeId == MSG_VIDEO){ + RtmpVideoCache::inputVideo(pkt, key); + }else{ + RtmpAudioCache::inputAudio(pkt); + } } /** @@ -163,6 +173,25 @@ public: } private: + + /** + * 批量flush时间戳相同的视频rtmp包时触发该函数 + * @param rtmp_list 时间戳相同的rtmp包列表 + * @param key_pos 是否包含关键帧 + */ + void onFlushVideo(std::shared_ptr > &rtmp_list, bool key_pos) override { + _ring->write(rtmp_list, key_pos); + } + + /** + * 批量flush一定数量的音频rtmp包时触发该函数 + * @param rtmp_list rtmp包列表 + */ + void onFlushAudio(std::shared_ptr > &rtmp_list) override{ + //只有音频的话,就不存在gop缓存的意义 + _ring->write(rtmp_list, !_have_video); + } + /** * 每次增减消费者都会触发该函数 */ @@ -177,7 +206,7 @@ private: bool _have_video = false; mutable recursive_mutex _mtx; AMFValue _metadata; - RingBuffer::Ptr _ring; + RingType::Ptr _ring; unordered_map _track_stamps_map; unordered_map _config_frame_map; }; diff --git a/src/Rtmp/RtmpPusher.cpp b/src/Rtmp/RtmpPusher.cpp index c4f25df1..e5de8e60 100644 --- a/src/Rtmp/RtmpPusher.cpp +++ b/src/Rtmp/RtmpPusher.cpp @@ -200,12 +200,21 @@ inline void RtmpPusher::send_metaData(){ _pRtmpReader = src->getRing()->attach(getPoller()); weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - _pRtmpReader->setReadCB([weakSelf](const RtmpPacket::Ptr &pkt){ + _pRtmpReader->setReadCB([weakSelf](const RtmpMediaSource::RingDataType &pkt){ auto strongSelf = weakSelf.lock(); if(!strongSelf) { return; } - strongSelf->sendRtmp(pkt->typeId, strongSelf->_ui32StreamId, pkt, pkt->timeStamp, pkt->chunkId); + + int i = 0; + int size = pkt->size(); + strongSelf->setSendFlushFlag(false); + pkt->for_each([&](const RtmpPacket::Ptr &rtmp){ + if(++i == size){ + strongSelf->setSendFlushFlag(true); + } + strongSelf->sendRtmp(rtmp->typeId, strongSelf->_ui32StreamId, rtmp, rtmp->timeStamp, rtmp->chunkId); + }); }); _pRtmpReader->setDetachCB([weakSelf](){ auto strongSelf = weakSelf.lock(); diff --git a/src/Rtmp/RtmpSession.cpp b/src/Rtmp/RtmpSession.cpp index b8551421..8d0d479e 100644 --- a/src/Rtmp/RtmpSession.cpp +++ b/src/Rtmp/RtmpSession.cpp @@ -272,12 +272,23 @@ void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr _pRingReader = src->getRing()->attach(getPoller()); weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - _pRingReader->setReadCB([weakSelf](const RtmpPacket::Ptr &pkt) { + _pRingReader->setReadCB([weakSelf](const RtmpMediaSource::RingDataType &pkt) { auto strongSelf = weakSelf.lock(); if (!strongSelf) { return; } - strongSelf->onSendMedia(pkt); + if(strongSelf->_paused){ + return; + } + int i = 0; + int size = pkt->size(); + strongSelf->setSendFlushFlag(false); + pkt->for_each([&](const RtmpPacket::Ptr &rtmp){ + if(++i == size){ + strongSelf->setSendFlushFlag(true); + } + strongSelf->onSendMedia(rtmp); + }); }); _pRingReader->setDetachCB([weakSelf]() { auto strongSelf = weakSelf.lock(); @@ -393,24 +404,9 @@ void RtmpSession::onCmd_pause(AMFDecoder &dec) { status.set("code", paused ? "NetStream.Pause.Notify" : "NetStream.Unpause.Notify"); status.set("description", paused ? "Paused stream." : "Unpaused stream."); sendReply("onStatus", nullptr, status); -//streamBegin - sendUserControl(paused ? CONTROL_STREAM_EOF : CONTROL_STREAM_BEGIN, - STREAM_MEDIA); - if (!_pRingReader) { - throw std::runtime_error("Rtmp not started yet!"); - } - if (paused) { - _pRingReader->setReadCB(nullptr); - } else { - weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - _pRingReader->setReadCB([weakSelf](const RtmpPacket::Ptr &pkt) { - auto strongSelf = weakSelf.lock(); - if(!strongSelf) { - return; - } - strongSelf->onSendMedia(pkt); - }); - } + //streamBegin + sendUserControl(paused ? CONTROL_STREAM_EOF : CONTROL_STREAM_BEGIN, STREAM_MEDIA); + _paused = paused; } void RtmpSession::setMetaData(AMFDecoder &dec) { diff --git a/src/Rtmp/RtmpSession.h b/src/Rtmp/RtmpSession.h index a61ad864..da76c627 100644 --- a/src/Rtmp/RtmpSession.h +++ b/src/Rtmp/RtmpSession.h @@ -80,13 +80,14 @@ private: double _dNowReqID = 0; bool _set_meta_data = false; Ticker _ticker;//数据接收时间 - RingBuffer::RingReader::Ptr _pRingReader; + RtmpMediaSource::RingType::RingReader::Ptr _pRingReader; std::shared_ptr _pPublisherSrc; std::weak_ptr _pPlayerSrc; //时间戳修整器 Stamp _stamp[2]; //消耗的总流量 uint64_t _ui64TotalBytes = 0; + bool _paused = false; }; diff --git a/src/Rtsp/RtspMediaSource.h b/src/Rtsp/RtspMediaSource.h index 11ff43e7..caededbf 100644 --- a/src/Rtsp/RtspMediaSource.h +++ b/src/Rtsp/RtspMediaSource.h @@ -30,94 +30,10 @@ using namespace toolkit; #define RTP_GOP_SIZE 512 namespace mediakit { -class RtpVideoCache { -public: +typedef VideoPacketCache RtpVideoCache; +typedef AudioPacketCache RtpAudioCache; - RtpVideoCache() { - _cache = std::make_shared >(); - } - - virtual ~RtpVideoCache() = default; - - void inputVideoRtp(const RtpPacket::Ptr &rtp, bool key_pos) { - if (_last_rtp_stamp != rtp->timeStamp) { - //时间戳发生变化了 - flushAll(); - } else if (_cache->size() > RTP_GOP_SIZE) { - //这个逻辑用于避免时间戳异常的流导致的内存暴增问题 - flushAll(); - } - - //追加数据到最后 - _cache->emplace_back(rtp); - _last_rtp_stamp = rtp->timeStamp; - if (key_pos) { - _key_pos = key_pos; - } - } - - virtual void onFlushVideoRtp(std::shared_ptr > &, bool key_pos) = 0; - -private: - - void flushAll() { - if (_cache->empty()) { - return; - } - onFlushVideoRtp(_cache, _key_pos); - _cache = std::make_shared >(); - _key_pos = false; - } - -private: - - std::shared_ptr > _cache; - uint32_t _last_rtp_stamp = 0; - bool _key_pos = false; -}; - -class RtpAudioCache { -public: - - RtpAudioCache() { - _cache = std::make_shared >(); - } - - virtual ~RtpAudioCache() = default; - - void inputAudioRtp(const RtpPacket::Ptr &rtp) { - if (rtp->timeStamp > _last_rtp_stamp + 100) { - //累积了100ms的音频数据 - flushAll(); - } else if (_cache->size() > 10) { - //或者audio rtp缓存超过10个 - flushAll(); - } - - //追加数据到最后 - _cache->emplace_back(rtp); - _last_rtp_stamp = rtp->timeStamp; - } - - virtual void onFlushAudioRtp(std::shared_ptr > &) = 0; - -private: - - void flushAll() { - if (_cache->empty()) { - return; - } - onFlushAudioRtp(_cache); - _cache = std::make_shared >(); - } - -private: - - std::shared_ptr > _cache; - uint32_t _last_rtp_stamp = 0; -}; - -/** + /** * rtsp媒体源的数据抽象 * rtsp有关键的两要素,分别是sdp、rtp包 * 只要生成了这两要素,那么要实现rtsp推流、rtsp服务器就很简单了 @@ -261,9 +177,9 @@ public: } if(rtp->type == TrackVideo){ - RtpVideoCache::inputVideoRtp(rtp, keyPos); + RtpVideoCache::inputVideo(rtp, keyPos); }else{ - RtpAudioCache::inputAudioRtp(rtp); + RtpAudioCache::inputAudio(rtp); } } @@ -274,7 +190,7 @@ private: * @param rtp_list 时间戳相同的rtp包列表 * @param key_pos 是否包含关键帧 */ - void onFlushVideoRtp(std::shared_ptr > &rtp_list, bool key_pos) override { + void onFlushVideo(std::shared_ptr > &rtp_list, bool key_pos) override { _ring->write(rtp_list, key_pos); } @@ -282,7 +198,7 @@ private: * 批量flush一定数量的音频rtp包时触发该函数 * @param rtp_list rtp包列表 */ - void onFlushAudioRtp(std::shared_ptr > &rtp_list) override{ + void onFlushAudio(std::shared_ptr > &rtp_list) override{ //只有音频的话,就不存在gop缓存的意义 _ring->write(rtp_list, !_have_video); }