diff --git a/src/Codec/Transcode.cpp b/src/Codec/Transcode.cpp index 18d0cfae..b89fc951 100644 --- a/src/Codec/Transcode.cpp +++ b/src/Codec/Transcode.cpp @@ -420,9 +420,7 @@ FFmpegDecoder::FFmpegDecoder(const Track::Ptr &track, int thread_num) { FFmpegDecoder::~FFmpegDecoder() { stopThread(true); if (_do_merger) { - _merger.inputFrame(nullptr, [&](uint64_t dts, uint64_t pts, const Buffer::Ptr &buffer, bool have_idr) { - decodeFrame(buffer->data(), buffer->size(), dts, pts, false); - }); + _merger.flush(); } flush(); } @@ -452,7 +450,7 @@ const AVCodecContext *FFmpegDecoder::getContext() const { bool FFmpegDecoder::inputFrame_l(const Frame::Ptr &frame, bool live, bool enable_merge) { if (_do_merger && enable_merge) { - return _merger.inputFrame(frame, [&](uint64_t dts, uint64_t pts, const Buffer::Ptr &buffer, bool have_idr) { + return _merger.inputFrame(frame, [this, live](uint64_t dts, uint64_t pts, const Buffer::Ptr &buffer, bool have_idr) { decodeFrame(buffer->data(), buffer->size(), dts, pts, live); }); } diff --git a/src/Common/MediaSource.h b/src/Common/MediaSource.h index 8c9eb220..8fbff060 100644 --- a/src/Common/MediaSource.h +++ b/src/Common/MediaSource.h @@ -359,9 +359,9 @@ public: virtual ~PacketCache() = default; void inputPacket(uint64_t stamp, bool is_video, std::shared_ptr pkt, bool key_pos) { - bool flush = flushImmediatelyWhenCloseMerge(); - if (!flush && _policy.isFlushAble(is_video, key_pos, stamp, _cache->size())) { - flushAll(); + bool flag = flushImmediatelyWhenCloseMerge(); + if (!flag && _policy.isFlushAble(is_video, key_pos, stamp, _cache->size())) { + flush(); } //追加数据到最后 @@ -370,11 +370,20 @@ public: _key_pos = key_pos; } - if (flush) { - flushAll(); + if (flag) { + flush(); } } + void flush() { + if (_cache->empty()) { + return; + } + onFlush(std::move(_cache), _key_pos); + _cache = std::make_shared(); + _key_pos = false; + } + virtual void clearCache() { _cache->clear(); } @@ -382,15 +391,6 @@ public: virtual void onFlush(std::shared_ptr, bool key_pos) = 0; private: - void flushAll() { - if (_cache->empty()) { - return; - } - onFlush(std::move(_cache), _key_pos); - _cache = std::make_shared(); - _key_pos = false; - } - bool flushImmediatelyWhenCloseMerge() { //一般的协议关闭合并写时,立即刷新缓存,这样可以减少一帧的延时,但是rtp例外 //因为rtp的包很小,一个RtpPacket包中也不是完整的一帧图像,所以在关闭合并写时, diff --git a/src/Extension/Frame.cpp b/src/Extension/Frame.cpp index e0f0eb14..b97a840e 100644 --- a/src/Extension/Frame.cpp +++ b/src/Extension/Frame.cpp @@ -155,7 +155,7 @@ void FrameMerger::doMerge(BufferLikeString &merged, const Frame::Ptr &frame) con } } -bool FrameMerger::inputFrame(const Frame::Ptr &frame, const onOutput &cb, BufferLikeString *buffer) { +bool FrameMerger::inputFrame(const Frame::Ptr &frame, onOutput cb, BufferLikeString *buffer) { if (willFlush(frame)) { Frame::Ptr back = _frame_cache.back(); Buffer::Ptr merged_frame = back; @@ -190,6 +190,7 @@ bool FrameMerger::inputFrame(const Frame::Ptr &frame, const onOutput &cb, Buffer if (frame->decodeAble()) { _have_decode_able_frame = true; } + _cb = std::move(cb); _frame_cache.emplace_back(Frame::getCacheAbleFrame(frame)); return true; } @@ -203,4 +204,11 @@ void FrameMerger::clear() { _have_decode_able_frame = false; } +void FrameMerger::flush() { + if (_cb) { + inputFrame(nullptr, std::move(_cb), nullptr); + } + clear(); +} + }//namespace mediakit diff --git a/src/Extension/Frame.h b/src/Extension/Frame.h index 72a8a7ed..0c118bbe 100644 --- a/src/Extension/Frame.h +++ b/src/Extension/Frame.h @@ -271,6 +271,11 @@ public: * 写入帧数据 */ virtual bool inputFrame(const Frame::Ptr &frame) = 0; + + /** + * 刷新输出所有frame缓存 + */ + virtual void flush() {}; }; /** @@ -542,8 +547,13 @@ public: FrameMerger(int type); ~FrameMerger() = default; + /** + * 刷新输出缓冲,注意此时会调用FrameMerger::inputFrame传入的onOutput回调 + * 请注意回调捕获参数此时是否有效 + */ + void flush(); void clear(); - bool inputFrame(const Frame::Ptr &frame, const onOutput &cb, toolkit::BufferLikeString *buffer = nullptr); + bool inputFrame(const Frame::Ptr &frame, onOutput cb, toolkit::BufferLikeString *buffer = nullptr); private: bool willFlush(const Frame::Ptr &frame) const; @@ -552,6 +562,7 @@ private: private: int _type; bool _have_decode_able_frame = false; + onOutput _cb; toolkit::List _frame_cache; }; diff --git a/src/Extension/H264Rtmp.cpp b/src/Extension/H264Rtmp.cpp index 2af6c0b2..f85ae425 100644 --- a/src/Extension/H264Rtmp.cpp +++ b/src/Extension/H264Rtmp.cpp @@ -124,26 +124,32 @@ void H264RtmpEncoder::makeConfigPacket(){ } } +void H264RtmpEncoder::flush() { + inputFrame(nullptr); +} + bool H264RtmpEncoder::inputFrame(const Frame::Ptr &frame) { - auto data = frame->data() + frame->prefixSize(); - auto len = frame->size() - frame->prefixSize(); - auto type = H264_TYPE(data[0]); - switch (type) { - case H264Frame::NAL_SPS: { - if (!_got_config_frame) { - _sps = string(data, len); - makeConfigPacket(); + if (frame) { + auto data = frame->data() + frame->prefixSize(); + auto len = frame->size() - frame->prefixSize(); + auto type = H264_TYPE(data[0]); + switch (type) { + case H264Frame::NAL_SPS: { + if (!_got_config_frame) { + _sps = string(data, len); + makeConfigPacket(); + } + break; } - break; - } - case H264Frame::NAL_PPS: { - if (!_got_config_frame) { - _pps = string(data, len); - makeConfigPacket(); + case H264Frame::NAL_PPS: { + if (!_got_config_frame) { + _pps = string(data, len); + makeConfigPacket(); + } + break; } - break; + default: break; } - default : break; } if (!_rtmp_packet) { diff --git a/src/Extension/H264Rtmp.h b/src/Extension/H264Rtmp.h index c38e667b..b27309dc 100644 --- a/src/Extension/H264Rtmp.h +++ b/src/Extension/H264Rtmp.h @@ -62,7 +62,7 @@ public: * @param track */ H264RtmpEncoder(const Track::Ptr &track); - ~H264RtmpEncoder() {} + ~H264RtmpEncoder() = default; /** * 输入264帧,可以不带sps pps @@ -70,6 +70,11 @@ public: */ bool inputFrame(const Frame::Ptr &frame) override; + /** + * 刷新输出所有frame缓存 + */ + void flush() override; + /** * 生成config包 */ diff --git a/src/Extension/H264Rtp.cpp b/src/Extension/H264Rtp.cpp index dc13a03a..a5331e26 100644 --- a/src/Extension/H264Rtp.cpp +++ b/src/Extension/H264Rtp.cpp @@ -291,6 +291,14 @@ bool H264RtpEncoder::inputFrame(const Frame::Ptr &frame) { return true; } +void H264RtpEncoder::flush() { + if (_last_frame) { + // 如果时间戳发生了变化,那么markbit才置true + inputFrame_l(_last_frame, true); + _last_frame = nullptr; + } +} + bool H264RtpEncoder::inputFrame_l(const Frame::Ptr &frame, bool is_mark){ if (frame->keyFrame()) { //保证每一个关键帧前都有SPS与PPS diff --git a/src/Extension/H264Rtp.h b/src/Extension/H264Rtp.h index be00f7af..4736f512 100644 --- a/src/Extension/H264Rtp.h +++ b/src/Extension/H264Rtp.h @@ -85,6 +85,11 @@ public: */ bool inputFrame(const Frame::Ptr &frame) override; + /** + * 刷新输出所有frame缓存 + */ + void flush() override; + private: void insertConfigFrame(uint64_t pts); bool inputFrame_l(const Frame::Ptr &frame, bool is_mark); diff --git a/src/Extension/H265Rtmp.cpp b/src/Extension/H265Rtmp.cpp index 19e450e8..493edac8 100644 --- a/src/Extension/H265Rtmp.cpp +++ b/src/Extension/H265Rtmp.cpp @@ -138,33 +138,39 @@ void H265RtmpEncoder::makeConfigPacket(){ } } +void H265RtmpEncoder::flush() { + inputFrame(nullptr); +} + bool H265RtmpEncoder::inputFrame(const Frame::Ptr &frame) { - auto data = frame->data() + frame->prefixSize(); - auto len = frame->size() - frame->prefixSize(); - auto type = H265_TYPE(data[0]); - switch (type) { - case H265Frame::NAL_SPS: { - if (!_got_config_frame) { - _sps = string(data, len); - makeConfigPacket(); + if (frame) { + auto data = frame->data() + frame->prefixSize(); + auto len = frame->size() - frame->prefixSize(); + auto type = H265_TYPE(data[0]); + switch (type) { + case H265Frame::NAL_SPS: { + if (!_got_config_frame) { + _sps = string(data, len); + makeConfigPacket(); + } + break; } - break; - } - case H265Frame::NAL_PPS: { - if (!_got_config_frame) { - _pps = string(data, len); - makeConfigPacket(); + case H265Frame::NAL_PPS: { + if (!_got_config_frame) { + _pps = string(data, len); + makeConfigPacket(); + } + break; } - break; - } - case H265Frame::NAL_VPS: { - if (!_got_config_frame) { - _vps = string(data, len); - makeConfigPacket(); + case H265Frame::NAL_VPS: { + if (!_got_config_frame) { + _vps = string(data, len); + makeConfigPacket(); + } + break; } - break; + default: break; } - default: break; } if (!_rtmp_packet) { diff --git a/src/Extension/H265Rtmp.h b/src/Extension/H265Rtmp.h index 8200ac98..3c062cfc 100644 --- a/src/Extension/H265Rtmp.h +++ b/src/Extension/H265Rtmp.h @@ -60,7 +60,7 @@ public: * @param track */ H265RtmpEncoder(const Track::Ptr &track); - ~H265RtmpEncoder() {} + ~H265RtmpEncoder() = default; /** * 输入265帧,可以不带sps pps @@ -68,6 +68,11 @@ public: */ bool inputFrame(const Frame::Ptr &frame) override; + /** + * 刷新输出所有frame缓存 + */ + void flush() override; + /** * 生成config包 */ diff --git a/src/FMP4/FMP4MediaSource.h b/src/FMP4/FMP4MediaSource.h index 2b4a1504..52174027 100644 --- a/src/FMP4/FMP4MediaSource.h +++ b/src/FMP4/FMP4MediaSource.h @@ -31,7 +31,7 @@ public: }; //FMP4直播源 -class FMP4MediaSource : public MediaSource, public toolkit::RingDelegate, private PacketCache{ +class FMP4MediaSource final : public MediaSource, public toolkit::RingDelegate, private PacketCache{ public: using Ptr = std::shared_ptr; using RingDataType = std::shared_ptr >; @@ -42,7 +42,7 @@ public: const std::string &stream_id, int ring_size = FMP4_GOP_SIZE) : MediaSource(FMP4_SCHEMA, vhost, app, stream_id), _ring_size(ring_size) {} - ~FMP4MediaSource() override = default; + ~FMP4MediaSource() override { flush(); } /** * 获取媒体源的环形缓冲 diff --git a/src/FMP4/FMP4MediaSourceMuxer.h b/src/FMP4/FMP4MediaSourceMuxer.h index cad7caad..438068b8 100644 --- a/src/FMP4/FMP4MediaSourceMuxer.h +++ b/src/FMP4/FMP4MediaSourceMuxer.h @@ -18,8 +18,8 @@ namespace mediakit { -class FMP4MediaSourceMuxer : public MP4MuxerMemory, public MediaSourceEventInterceptor, - public std::enable_shared_from_this { +class FMP4MediaSourceMuxer final : public MP4MuxerMemory, public MediaSourceEventInterceptor, + public std::enable_shared_from_this { public: using Ptr = std::shared_ptr; @@ -29,7 +29,7 @@ public: _media_src = std::make_shared(vhost, app, stream_id); } - ~FMP4MediaSourceMuxer() override = default; + ~FMP4MediaSourceMuxer() override { MP4MuxerMemory::flush(); }; void setListener(const std::weak_ptr &listener){ setDelegate(listener); diff --git a/src/Http/HlsPlayer.cpp b/src/Http/HlsPlayer.cpp index 25429824..60bda337 100644 --- a/src/Http/HlsPlayer.cpp +++ b/src/Http/HlsPlayer.cpp @@ -393,6 +393,9 @@ void HlsPlayerImp::onShutdown(const SockException &ex) { break; } } + if (_decoder) { + _decoder->flush(); + } PlayerImp::onShutdown(ex); } diff --git a/src/Http/TsplayerImp.cpp b/src/Http/TsplayerImp.cpp index 042c1b43..d41fcb97 100644 --- a/src/Http/TsplayerImp.cpp +++ b/src/Http/TsplayerImp.cpp @@ -60,6 +60,9 @@ void TsPlayerImp::onShutdown(const SockException &ex) { break; } } + if (_decoder) { + _decoder->flush(); + } PlayerImp::onShutdown(ex); } diff --git a/src/Record/HlsRecorder.h b/src/Record/HlsRecorder.h index c5b50f45..8d40d9dc 100644 --- a/src/Record/HlsRecorder.h +++ b/src/Record/HlsRecorder.h @@ -16,7 +16,7 @@ namespace mediakit { -class HlsRecorder : public MediaSourceEventInterceptor, public MpegMuxer, public std::enable_shared_from_this { +class HlsRecorder final : public MediaSourceEventInterceptor, public MpegMuxer, public std::enable_shared_from_this { public: using Ptr = std::shared_ptr; @@ -30,7 +30,7 @@ public: _hls->clearCache(); } - ~HlsRecorder() = default; + ~HlsRecorder() { MpegMuxer::flush(); }; void setMediaSource(const std::string &vhost, const std::string &app, const std::string &stream_id) { _hls->setMediaSource(vhost, app, stream_id); diff --git a/src/Record/MP4Muxer.cpp b/src/Record/MP4Muxer.cpp index 11747126..274d43fe 100644 --- a/src/Record/MP4Muxer.cpp +++ b/src/Record/MP4Muxer.cpp @@ -79,6 +79,10 @@ void MP4MuxerInterface::resetTracks() { _codec_to_trackid.clear(); } +void MP4MuxerInterface::flush() { + _frame_merger.flush(); +} + bool MP4MuxerInterface::inputFrame(const Frame::Ptr &frame) { auto it = _codec_to_trackid.find(frame->getCodecId()); if (it == _codec_to_trackid.end()) { @@ -98,12 +102,12 @@ bool MP4MuxerInterface::inputFrame(const Frame::Ptr &frame) { //mp4文件时间戳需要从0开始 auto &track_info = it->second; - int64_t dts_out, pts_out; switch (frame->getCodecId()) { case CodecH264: case CodecH265: { //这里的代码逻辑是让SPS、PPS、IDR这些时间戳相同的帧打包到一起当做一个帧处理, - _frame_merger.inputFrame(frame, [&](uint64_t dts, uint64_t pts, const Buffer::Ptr &buffer, bool have_idr) { + _frame_merger.inputFrame(frame, [this, &track_info](uint64_t dts, uint64_t pts, const Buffer::Ptr &buffer, bool have_idr) { + int64_t dts_out, pts_out; track_info.stamp.revise(dts, pts, dts_out, pts_out); mp4_writer_write(_mov_writter.get(), track_info.track_id, @@ -117,6 +121,7 @@ bool MP4MuxerInterface::inputFrame(const Frame::Ptr &frame) { } default: { + int64_t dts_out, pts_out; track_info.stamp.revise(frame->dts(), frame->pts(), dts_out, pts_out); mp4_writer_write(_mov_writter.get(), track_info.track_id, diff --git a/src/Record/MP4Muxer.h b/src/Record/MP4Muxer.h index 877b05ff..6485b308 100644 --- a/src/Record/MP4Muxer.h +++ b/src/Record/MP4Muxer.h @@ -43,6 +43,11 @@ public: */ void resetTracks() override; + /** + * 刷新输出所有frame缓存 + */ + void flush() override; + /** * 是否包含视频 */ diff --git a/src/Record/MP4Recorder.cpp b/src/Record/MP4Recorder.cpp index efb011fc..201e17ed 100644 --- a/src/Record/MP4Recorder.cpp +++ b/src/Record/MP4Recorder.cpp @@ -33,6 +33,7 @@ MP4Recorder::MP4Recorder(const string &path, const string &vhost, const string & } MP4Recorder::~MP4Recorder() { + flush(); closeFile(); } @@ -96,6 +97,12 @@ void MP4Recorder::closeFile() { } } +void MP4Recorder::flush() { + if (_muxer) { + _muxer->flush(); + } +} + bool MP4Recorder::inputFrame(const Frame::Ptr &frame) { if (!(_have_video && frame->getTrackType() == TrackAudio)) { //如果有视频且输入的是音频,那么应该忽略切片逻辑 diff --git a/src/Record/MP4Recorder.h b/src/Record/MP4Recorder.h index 9989d4e4..6ef35a8c 100644 --- a/src/Record/MP4Recorder.h +++ b/src/Record/MP4Recorder.h @@ -24,7 +24,7 @@ namespace mediakit { #ifdef ENABLE_MP4 -class MP4Recorder : public MediaSinkInterface { +class MP4Recorder final : public MediaSinkInterface { public: using Ptr = std::shared_ptr; @@ -41,6 +41,11 @@ public: */ bool inputFrame(const Frame::Ptr &frame) override; + /** + * 刷新输出所有frame缓存 + */ + void flush() override; + /** * 添加ready状态的track */ diff --git a/src/Record/MPEG.cpp b/src/Record/MPEG.cpp index 085e9f38..98961548 100644 --- a/src/Record/MPEG.cpp +++ b/src/Record/MPEG.cpp @@ -63,7 +63,7 @@ bool MpegMuxer::inputFrame(const Frame::Ptr &frame) { case CodecH264: case CodecH265: { //这里的代码逻辑是让SPS、PPS、IDR这些时间戳相同的帧打包到一起当做一个帧处理, - return _frame_merger.inputFrame(frame,[&](uint64_t dts, uint64_t pts, const Buffer::Ptr &buffer, bool have_idr) { + return _frame_merger.inputFrame(frame,[this, track_id](uint64_t dts, uint64_t pts, const Buffer::Ptr &buffer, bool have_idr) { _key_pos = have_idr; //取视频时间戳为TS的时间戳 _timestamp = dts; @@ -153,6 +153,10 @@ void MpegMuxer::releaseContext() { _frame_merger.clear(); } +void MpegMuxer::flush() { + _frame_merger.flush(); +} + }//mediakit #endif \ No newline at end of file diff --git a/src/Record/MPEG.h b/src/Record/MPEG.h index 5d4b05cd..9aabccdd 100644 --- a/src/Record/MPEG.h +++ b/src/Record/MPEG.h @@ -45,6 +45,11 @@ public: */ bool inputFrame(const Frame::Ptr &frame) override; + /** + * 刷新输出所有frame缓存 + */ + void flush() override; + protected: /** * 输出ts/ps数据回调 diff --git a/src/Rtmp/RtmpMediaSource.h b/src/Rtmp/RtmpMediaSource.h index 8ee954ca..5bd8ea10 100644 --- a/src/Rtmp/RtmpMediaSource.h +++ b/src/Rtmp/RtmpMediaSource.h @@ -60,7 +60,7 @@ public: MediaSource(RTMP_SCHEMA, vhost, app, stream_id), _ring_size(ring_size) { } - ~RtmpMediaSource() override{} + ~RtmpMediaSource() override { flush(); } /** * 获取媒体源的环形缓冲 diff --git a/src/Rtmp/RtmpMediaSourceImp.h b/src/Rtmp/RtmpMediaSourceImp.h index 915b8fd1..90258198 100644 --- a/src/Rtmp/RtmpMediaSourceImp.h +++ b/src/Rtmp/RtmpMediaSourceImp.h @@ -26,9 +26,9 @@ namespace mediakit { -class RtmpMediaSourceImp: public RtmpMediaSource, private TrackListener, public MultiMediaSourceMuxer::Listener { +class RtmpMediaSourceImp final : public RtmpMediaSource, private TrackListener, public MultiMediaSourceMuxer::Listener { public: - typedef std::shared_ptr Ptr; + using Ptr = std::shared_ptr; /** * 构造函数 @@ -42,7 +42,7 @@ public: _demuxer->setTrackListener(this); } - ~RtmpMediaSourceImp() = default; + ~RtmpMediaSourceImp() override = default; /** * 设置metadata diff --git a/src/Rtmp/RtmpMediaSourceMuxer.h b/src/Rtmp/RtmpMediaSourceMuxer.h index 7134c71b..f89543c6 100644 --- a/src/Rtmp/RtmpMediaSourceMuxer.h +++ b/src/Rtmp/RtmpMediaSourceMuxer.h @@ -16,8 +16,8 @@ namespace mediakit { -class RtmpMediaSourceMuxer : public RtmpMuxer, public MediaSourceEventInterceptor, - public std::enable_shared_from_this { +class RtmpMediaSourceMuxer final : public RtmpMuxer, public MediaSourceEventInterceptor, + public std::enable_shared_from_this { public: typedef std::shared_ptr Ptr; @@ -29,7 +29,7 @@ public: getRtmpRing()->setDelegate(_media_src); } - ~RtmpMediaSourceMuxer() override{} + ~RtmpMediaSourceMuxer() override { RtmpMuxer::flush(); } void setListener(const std::weak_ptr &listener){ setDelegate(listener); diff --git a/src/Rtmp/RtmpMuxer.cpp b/src/Rtmp/RtmpMuxer.cpp index 2dcec636..4be4ac5d 100644 --- a/src/Rtmp/RtmpMuxer.cpp +++ b/src/Rtmp/RtmpMuxer.cpp @@ -43,6 +43,14 @@ bool RtmpMuxer::inputFrame(const Frame::Ptr &frame) { return encoder ? encoder->inputFrame(frame) : false; } +void RtmpMuxer::flush() { + for (auto &encoder : _encoder) { + if (encoder) { + encoder->flush(); + } + } +} + void RtmpMuxer::makeConfigPacket(){ for(auto &encoder : _encoder){ if(encoder){ diff --git a/src/Rtmp/RtmpMuxer.h b/src/Rtmp/RtmpMuxer.h index 993d0aed..bd13fcf6 100644 --- a/src/Rtmp/RtmpMuxer.h +++ b/src/Rtmp/RtmpMuxer.h @@ -51,6 +51,11 @@ public: */ bool inputFrame(const Frame::Ptr &frame) override; + /** + * 刷新输出所有frame缓存 + */ + void flush() override; + /** * 重置所有track */ diff --git a/src/Rtp/Decoder.cpp b/src/Rtp/Decoder.cpp index b9827e98..22438fd1 100644 --- a/src/Rtp/Decoder.cpp +++ b/src/Rtp/Decoder.cpp @@ -65,6 +65,10 @@ DecoderImp::Ptr DecoderImp::createDecoder(Type type, MediaSinkInterface *sink){ return DecoderImp::Ptr(new DecoderImp(decoder, sink)); } +void DecoderImp::flush() { + _merger.flush(); +} + ssize_t DecoderImp::input(const uint8_t *data, size_t bytes){ return _decoder->input(data, bytes); } @@ -219,10 +223,7 @@ void DecoderImp::onDecode(int stream,int codecid,int flags,int64_t pts,int64_t d default: // 海康的 PS 流中会有 codecid 为 0xBD 的包 if (codecid != 0 && codecid != 0xBD) { - if (_last_unsported_print.elapsedTime() / 1000 > 5) { - _last_unsported_print.resetTime(); - WarnL << "unsupported codec type:" << getCodecName(codecid) << " " << (int) codecid; - } + WarnL << "unsupported codec type:" << getCodecName(codecid) << " " << (int) codecid; } break; } diff --git a/src/Rtp/Decoder.h b/src/Rtp/Decoder.h index 44fecfee..a51e4494 100644 --- a/src/Rtp/Decoder.h +++ b/src/Rtp/Decoder.h @@ -39,16 +39,14 @@ protected: class DecoderImp{ public: - typedef enum { - decoder_ts = 0, - decoder_ps - }Type; + typedef enum { decoder_ts = 0, decoder_ps } Type; typedef std::shared_ptr Ptr; ~DecoderImp() = default; static Ptr createDecoder(Type type, MediaSinkInterface *sink); ssize_t input(const uint8_t *data, size_t bytes); + void flush(); protected: void onTrack(const Track::Ptr &track); @@ -63,7 +61,6 @@ private: Decoder::Ptr _decoder; MediaSinkInterface *_sink; FrameMerger _merger{FrameMerger::none}; - toolkit::Ticker _last_unsported_print; Track::Ptr _tracks[TrackMax]; }; diff --git a/src/Rtp/GB28181Process.cpp b/src/Rtp/GB28181Process.cpp index e5afcdbb..6e8e1544 100644 --- a/src/Rtp/GB28181Process.cpp +++ b/src/Rtp/GB28181Process.cpp @@ -59,12 +59,16 @@ GB28181Process::GB28181Process(const MediaInfo &media_info, MediaSinkInterface * _interface = sink; } -GB28181Process::~GB28181Process() {} - void GB28181Process::onRtpSorted(RtpPacket::Ptr rtp) { _rtp_decoder[rtp->getHeader()->pt]->inputRtp(rtp, false); } +void GB28181Process::flush() { + if (_decoder) { + _decoder->flush(); + } +} + bool GB28181Process::inputRtp(bool, const char *data, size_t data_len) { GET_CONFIG(uint32_t, h264_pt, RtpProxy::kH264PT); GET_CONFIG(uint32_t, h265_pt, RtpProxy::kH265PT); diff --git a/src/Rtp/GB28181Process.h b/src/Rtp/GB28181Process.h index 3ad078e6..4afba479 100644 --- a/src/Rtp/GB28181Process.h +++ b/src/Rtp/GB28181Process.h @@ -26,7 +26,7 @@ class GB28181Process : public ProcessInterface { public: typedef std::shared_ptr Ptr; GB28181Process(const MediaInfo &media_info, MediaSinkInterface *sink); - ~GB28181Process() override; + ~GB28181Process() override = default; /** * 输入rtp @@ -36,6 +36,11 @@ public: */ bool inputRtp(bool, const char *data, size_t data_len) override; + /** + * 刷新输出所有缓存 + */ + void flush() override; + protected: void onRtpSorted(RtpPacket::Ptr rtp); diff --git a/src/Rtp/ProcessInterface.h b/src/Rtp/ProcessInterface.h index ee5c0f53..6cd7fa9c 100644 --- a/src/Rtp/ProcessInterface.h +++ b/src/Rtp/ProcessInterface.h @@ -31,6 +31,11 @@ public: * @return 是否解析成功 */ virtual bool inputRtp(bool is_udp, const char *data, size_t data_len) = 0; + + /** + * 刷新输出所有缓存 + */ + virtual void flush() {} }; }//namespace mediakit diff --git a/src/Rtp/RtpCache.cpp b/src/Rtp/RtpCache.cpp index 89b05d4a..7a897847 100644 --- a/src/Rtp/RtpCache.cpp +++ b/src/Rtp/RtpCache.cpp @@ -19,37 +19,49 @@ namespace mediakit{ RtpCache::RtpCache(onFlushed cb) { _cb = std::move(cb); } + bool RtpCache::firstKeyReady(bool in) { - if(_first_key){ + if (_first_key) { return _first_key; } _first_key = in; return _first_key; } -void RtpCache::onFlush(std::shared_ptr > rtp_list, bool) { + +void RtpCache::onFlush(std::shared_ptr> rtp_list, bool) { _cb(std::move(rtp_list)); } -void RtpCache::input(uint64_t stamp, Buffer::Ptr buffer,bool is_key ) { +void RtpCache::input(uint64_t stamp, Buffer::Ptr buffer, bool is_key) { inputPacket(stamp, true, std::move(buffer), is_key); } -void RtpCachePS::onRTP(Buffer::Ptr buffer,bool is_key) { - if(!firstKeyReady(is_key)){ - return; - } - auto rtp = std::static_pointer_cast(buffer); - auto stamp = rtp->getStampMS(); - input(stamp, std::move(buffer),is_key); +void RtpCachePS::flush() { + PSEncoderImp::flush(); + RtpCache::flush(); } -void RtpCacheRaw::onRTP(Buffer::Ptr buffer,bool is_key) { - if(!firstKeyReady(is_key)){ +void RtpCachePS::onRTP(Buffer::Ptr buffer, bool is_key) { + if (!firstKeyReady(is_key)) { return; } auto rtp = std::static_pointer_cast(buffer); auto stamp = rtp->getStampMS(); - input(stamp, std::move(buffer),is_key); + input(stamp, std::move(buffer), is_key); +} + +void RtpCacheRaw::flush() { + RawEncoderImp::flush(); + RtpCache::flush(); +} + +void RtpCacheRaw::onRTP(Buffer::Ptr buffer, bool is_key) { + if (!firstKeyReady(is_key)) { + return; + } + auto rtp = std::static_pointer_cast(buffer); + auto stamp = rtp->getStampMS(); + input(stamp, std::move(buffer), is_key); } }//namespace mediakit diff --git a/src/Rtp/RtpCache.h b/src/Rtp/RtpCache.h index b83c8282..1121f384 100644 --- a/src/Rtp/RtpCache.h +++ b/src/Rtp/RtpCache.h @@ -19,7 +19,7 @@ namespace mediakit{ -class RtpCache : private PacketCache { +class RtpCache : protected PacketCache { public: using onFlushed = std::function >)>; RtpCache(onFlushed cb); @@ -33,30 +33,33 @@ protected: void input(uint64_t stamp, toolkit::Buffer::Ptr buffer,bool is_key = false); bool firstKeyReady(bool in); + protected: void onFlush(std::shared_ptr > rtp_list, bool) override; private: - onFlushed _cb; bool _first_key = false; + onFlushed _cb; }; -class RtpCachePS : public RtpCache, public PSEncoderImp{ +class RtpCachePS : public RtpCache, public PSEncoderImp { public: RtpCachePS(onFlushed cb, uint32_t ssrc, uint8_t payload_type = 96) : RtpCache(std::move(cb)), PSEncoderImp(ssrc, payload_type) {}; ~RtpCachePS() override = default; + void flush() override; protected: - void onRTP(toolkit::Buffer::Ptr rtp,bool is_key = false) override; + void onRTP(toolkit::Buffer::Ptr rtp, bool is_key = false) override; }; -class RtpCacheRaw : public RtpCache, public RawEncoderImp{ +class RtpCacheRaw : public RtpCache, public RawEncoderImp { public: - RtpCacheRaw(onFlushed cb, uint32_t ssrc, uint8_t payload_type = 96, bool sendAudio = true) : RtpCache(std::move(cb)), RawEncoderImp(ssrc, payload_type,sendAudio) {}; + RtpCacheRaw(onFlushed cb, uint32_t ssrc, uint8_t payload_type = 96, bool send_audio = true) : RtpCache(std::move(cb)), RawEncoderImp(ssrc, payload_type, send_audio) {}; ~RtpCacheRaw() override = default; + void flush() override; protected: - void onRTP(toolkit::Buffer::Ptr rtp,bool is_key = false) override; + void onRTP(toolkit::Buffer::Ptr rtp, bool is_key = false) override; }; }//namespace mediakit diff --git a/src/Rtp/RtpProcess.cpp b/src/Rtp/RtpProcess.cpp index df4e0900..f79004e6 100644 --- a/src/Rtp/RtpProcess.cpp +++ b/src/Rtp/RtpProcess.cpp @@ -50,6 +50,9 @@ RtpProcess::RtpProcess(const string &stream_id) { } RtpProcess::~RtpProcess() { + if (_process) { + _process->flush(); + } uint64_t duration = (_last_frame_time.createdTime() - _last_frame_time.elapsedTime()) / 1000; WarnP(this) << "RTP推流器(" << _media_info.shortUrl() diff --git a/src/Rtp/RtpSender.cpp b/src/Rtp/RtpSender.cpp index 57b28cca..ea7910ba 100644 --- a/src/Rtp/RtpSender.cpp +++ b/src/Rtp/RtpSender.cpp @@ -25,6 +25,10 @@ RtpSender::RtpSender(EventPoller::Ptr poller) { _socket_rtp = Socket::createSocket(_poller, false); } +RtpSender::~RtpSender() { + flush(); +} + void RtpSender::startSend(const MediaSourceEvent::SendRtpArgs &args, const function &cb){ _args = args; if (!_interface) { @@ -231,6 +235,12 @@ void RtpSender::resetTracks(){ _interface->resetTracks(); } +void RtpSender::flush() { + if (_interface) { + _interface->flush(); + } +} + //此函数在其他线程执行 bool RtpSender::inputFrame(const Frame::Ptr &frame) { //连接成功后才做实质操作(节省cpu资源) diff --git a/src/Rtp/RtpSender.h b/src/Rtp/RtpSender.h index 9bafd537..61bdfc3c 100644 --- a/src/Rtp/RtpSender.h +++ b/src/Rtp/RtpSender.h @@ -18,12 +18,12 @@ namespace mediakit{ //rtp发送客户端,支持发送GB28181协议 -class RtpSender : public MediaSinkInterface, public std::enable_shared_from_this{ +class RtpSender final : public MediaSinkInterface, public std::enable_shared_from_this{ public: typedef std::shared_ptr Ptr; RtpSender(toolkit::EventPoller::Ptr poller = nullptr); - ~RtpSender() override = default; + ~RtpSender() override; /** * 开始发送ps-rtp包 @@ -37,6 +37,11 @@ public: */ bool inputFrame(const Frame::Ptr &frame) override; + /** + * 刷新输出frame缓存 + */ + void flush() override; + /** * 添加track,内部会调用Track的clone方法 * 只会克隆sps pps这些信息 ,而不会克隆Delegate相关关系 diff --git a/src/Rtsp/RtspMediaSource.h b/src/Rtsp/RtspMediaSource.h index adfceca0..c9bdc0df 100644 --- a/src/Rtsp/RtspMediaSource.h +++ b/src/Rtsp/RtspMediaSource.h @@ -56,7 +56,7 @@ public: int ring_size = RTP_GOP_SIZE) : MediaSource(RTSP_SCHEMA, vhost, app, stream_id), _ring_size(ring_size) {} - ~RtspMediaSource() override{} + ~RtspMediaSource() override { flush(); } /** * 获取媒体源的环形缓冲 diff --git a/src/Rtsp/RtspMediaSourceImp.h b/src/Rtsp/RtspMediaSourceImp.h index 36878fa8..975491bd 100644 --- a/src/Rtsp/RtspMediaSourceImp.h +++ b/src/Rtsp/RtspMediaSourceImp.h @@ -17,9 +17,9 @@ #include "Common/MultiMediaSourceMuxer.h" namespace mediakit { -class RtspMediaSourceImp : public RtspMediaSource, private TrackListener, public MultiMediaSourceMuxer::Listener { +class RtspMediaSourceImp final : public RtspMediaSource, private TrackListener, public MultiMediaSourceMuxer::Listener { public: - typedef std::shared_ptr Ptr; + using Ptr = std::shared_ptr; /** * 构造函数 @@ -33,7 +33,7 @@ public: _demuxer->setTrackListener(this); } - ~RtspMediaSourceImp() = default; + ~RtspMediaSourceImp() override = default; /** * 设置sdp diff --git a/src/Rtsp/RtspMediaSourceMuxer.h b/src/Rtsp/RtspMediaSourceMuxer.h index 539a2011..be2188d4 100644 --- a/src/Rtsp/RtspMediaSourceMuxer.h +++ b/src/Rtsp/RtspMediaSourceMuxer.h @@ -16,8 +16,8 @@ namespace mediakit { -class RtspMediaSourceMuxer : public RtspMuxer, public MediaSourceEventInterceptor, - public std::enable_shared_from_this { +class RtspMediaSourceMuxer final : public RtspMuxer, public MediaSourceEventInterceptor, + public std::enable_shared_from_this { public: typedef std::shared_ptr Ptr; @@ -29,7 +29,7 @@ public: getRtpRing()->setDelegate(_media_src); } - ~RtspMediaSourceMuxer() override{} + ~RtspMediaSourceMuxer() override { RtspMuxer::flush(); } void setListener(const std::weak_ptr &listener){ setDelegate(listener); diff --git a/src/Rtsp/RtspMuxer.cpp b/src/Rtsp/RtspMuxer.cpp index 890232f4..755b7f5a 100644 --- a/src/Rtsp/RtspMuxer.cpp +++ b/src/Rtsp/RtspMuxer.cpp @@ -86,6 +86,14 @@ bool RtspMuxer::inputFrame(const Frame::Ptr &frame) { return encoder ? encoder->inputFrame(frame) : false; } +void RtspMuxer::flush() { + for (auto &encoder : _encoder) { + if (encoder) { + encoder->flush(); + } + } +} + string RtspMuxer::getSdp() { return _sdp; } diff --git a/src/Rtsp/RtspMuxer.h b/src/Rtsp/RtspMuxer.h index 677eaabd..a7199b97 100644 --- a/src/Rtsp/RtspMuxer.h +++ b/src/Rtsp/RtspMuxer.h @@ -73,6 +73,11 @@ public: */ bool inputFrame(const Frame::Ptr &frame) override; + /** + * 刷新输出所有frame缓存 + */ + void flush() override; + /** * 重置所有track */ diff --git a/src/TS/TSMediaSource.h b/src/TS/TSMediaSource.h index d1e18e41..253e1652 100644 --- a/src/TS/TSMediaSource.h +++ b/src/TS/TSMediaSource.h @@ -31,7 +31,7 @@ public: }; //TS直播源 -class TSMediaSource : public MediaSource, public toolkit::RingDelegate, private PacketCache{ +class TSMediaSource final : public MediaSource, public toolkit::RingDelegate, private PacketCache{ public: using Ptr = std::shared_ptr; using RingDataType = std::shared_ptr >; @@ -42,7 +42,7 @@ public: const std::string &stream_id, int ring_size = TS_GOP_SIZE) : MediaSource(TS_SCHEMA, vhost, app, stream_id), _ring_size(ring_size) {} - ~TSMediaSource() override = default; + ~TSMediaSource() override { flush(); } /** * 获取媒体源的环形缓冲 diff --git a/src/TS/TSMediaSourceMuxer.h b/src/TS/TSMediaSourceMuxer.h index 2b3e5d80..d1cff544 100644 --- a/src/TS/TSMediaSourceMuxer.h +++ b/src/TS/TSMediaSourceMuxer.h @@ -16,8 +16,8 @@ namespace mediakit { -class TSMediaSourceMuxer : public MpegMuxer, public MediaSourceEventInterceptor, - public std::enable_shared_from_this { +class TSMediaSourceMuxer final : public MpegMuxer, public MediaSourceEventInterceptor, + public std::enable_shared_from_this { public: using Ptr = std::shared_ptr; @@ -27,7 +27,7 @@ public: _media_src = std::make_shared(vhost, app, stream_id); } - ~TSMediaSourceMuxer() override = default; + ~TSMediaSourceMuxer() override { MpegMuxer::flush(); }; void setListener(const std::weak_ptr &listener){ setDelegate(listener); diff --git a/srt/SrtTransportImp.cpp b/srt/SrtTransportImp.cpp index c2298c65..8f0694b2 100644 --- a/srt/SrtTransportImp.cpp +++ b/srt/SrtTransportImp.cpp @@ -108,6 +108,9 @@ void SrtTransportImp::onSRTData(DataPacket::Ptr pkt) { } void SrtTransportImp::onShutdown(const SockException &ex) { + if (_decoder) { + _decoder->flush(); + } SrtTransport::onShutdown(ex); }