diff --git a/3rdpart/ZLToolKit b/3rdpart/ZLToolKit index 13916d41..d5de1d4f 160000 --- a/3rdpart/ZLToolKit +++ b/3rdpart/ZLToolKit @@ -1 +1 @@ -Subproject commit 13916d41e81e1b302f02eb1682b815df5e241c3d +Subproject commit d5de1d4f0c86139fe8f0b6d956e45dee3b151783 diff --git a/src/Common/MediaSink.cpp b/src/Common/MediaSink.cpp index 96e5427b..be887305 100644 --- a/src/Common/MediaSink.cpp +++ b/src/Common/MediaSink.cpp @@ -11,10 +11,13 @@ #include "MediaSink.h" //最多等待未初始化的Track 10秒,超时之后会忽略未初始化的Track -#define MAX_WAIT_MS_READY 10000 +static size_t constexpr kMaxWaitReadyMS= 10000; //如果直播流只有单Track,最多等待3秒,超时后未收到其他Track的数据,则认为是单Track -#define MAX_WAIT_MS_ADD_TRACK 3000 +static size_t constexpr kMaxAddTrackMS = 3000; + +//如果track未就绪,我们先缓存帧数据,但是有最大个数限制(100帧时大约4秒),防止内存溢出 +static size_t constexpr kMaxUnreadyFrame = 100; namespace mediakit{ @@ -37,8 +40,14 @@ void MediaSink::addTrack(const Track::Ptr &track_in) { if (_all_track_ready) { onTrackFrame(frame); } else { + auto &frame_unread = _frame_unread[frame->getCodecId()]; + if (frame_unread.size() > kMaxUnreadyFrame) { + //未就绪的的track,不能缓存太多的帧,否则可能内存溢出 + frame_unread.clear(); + WarnL << "cached frame of unready track(" << frame->getCodecName() << ") is too much, now cleared"; + } //还有Track未就绪,先缓存之 - _frame_unread[frame->getCodecId()].emplace_back(Frame::getCacheAbleFrame(frame)); + frame_unread.emplace_back(Frame::getCacheAbleFrame(frame)); } })); } @@ -84,7 +93,7 @@ void MediaSink::checkTrackIfReady(const Track::Ptr &track){ } if(!_all_track_ready){ - if(_ticker.elapsedTime() > MAX_WAIT_MS_READY){ + if(_ticker.elapsedTime() > kMaxWaitReadyMS){ //如果超过规定时间,那么不再等待并忽略未准备好的Track emitAllTrackReady(); return; @@ -101,7 +110,7 @@ void MediaSink::checkTrackIfReady(const Track::Ptr &track){ return; } - if(_track_map.size() == 1 && _ticker.elapsedTime() > MAX_WAIT_MS_ADD_TRACK){ + if(_track_map.size() == 1 && _ticker.elapsedTime() > kMaxAddTrackMS){ //如果只有一个Track,那么在该Track添加后,我们最多还等待若干时间(可能后面还会添加Track) emitAllTrackReady(); return; diff --git a/src/Extension/Frame.cpp b/src/Extension/Frame.cpp index aa005491..d4fb9959 100644 --- a/src/Extension/Frame.cpp +++ b/src/Extension/Frame.cpp @@ -180,4 +180,102 @@ const char *CodecInfo::getCodecName() { TrackType CodecInfo::getTrackType() { return mediakit::getTrackType(getCodecId()); } + +static size_t constexpr kMaxFrameCacheSize = 100; + +bool FrameMerger::willFlush(const Frame::Ptr &frame) const{ + if (_frameCached.empty()) { + return false; + } + switch (_type) { + case none : { + //frame不是完整的帧,我们合并为一帧 + bool new_frame = false; + switch (frame->getCodecId()) { + case CodecH264: + case CodecH265: { + //如果是新的一帧,前面的缓存需要输出 + new_frame = frame->prefixSize(); + break; + } + default: break; + } + return new_frame || _frameCached.back()->dts() != frame->dts() || _frameCached.size() > kMaxFrameCacheSize; + } + + case mp4_nal_size: + case h264_prefix: { + if (_frameCached.back()->dts() != frame->dts()) { + //时间戳变化了 + return true; + } + if (frame->getCodecId() == CodecH264 && + H264_TYPE(frame->data()[frame->prefixSize()]) == H264Frame::NAL_B_P) { + //如果是264的b/p帧,那么也刷新输出 + return true; + } + return _frameCached.size() > kMaxFrameCacheSize; + } + default: /*不可达*/ assert(0); return true; + } +} + +void FrameMerger::doMerge(BufferLikeString &merged, const Frame::Ptr &frame) const{ + switch (_type) { + case none : { + merged.append(frame->data(), frame->size()); + break; + } + case h264_prefix: { + if (frame->prefixSize()) { + merged.append(frame->data(), frame->size()); + } else { + merged.append("\x00\x00\x00\x01", 4); + merged.append(frame->data(), frame->size()); + } + break; + } + case mp4_nal_size: { + uint32_t nalu_size = (uint32_t) (frame->size() - frame->prefixSize()); + nalu_size = htonl(nalu_size); + merged.append((char *) &nalu_size, 4); + merged.append(frame->data() + frame->prefixSize(), frame->size() - frame->prefixSize()); + break; + } + default: /*不可达*/ assert(0); break; + } +} + +void FrameMerger::inputFrame(const Frame::Ptr &frame, const onOutput &cb) { + if (willFlush(frame)) { + Frame::Ptr back = _frameCached.back(); + Buffer::Ptr merged_frame = back; + bool have_idr = back->keyFrame(); + + if (_frameCached.size() != 1 || _type == mp4_nal_size) { + //在MP4模式下,一帧数据也需要在前添加nalu_size + BufferLikeString merged; + merged.reserve(back->size() + 1024); + _frameCached.for_each([&](const Frame::Ptr &frame) { + doMerge(merged, frame); + if (frame->keyFrame()) { + have_idr = true; + } + }); + merged_frame = std::make_shared >(std::move(merged)); + } + cb(back->dts(), back->pts(), merged_frame, have_idr); + _frameCached.clear(); + } + _frameCached.emplace_back(Frame::getCacheAbleFrame(frame)); +} + +FrameMerger::FrameMerger(int type) { + _type = type; +} + +void FrameMerger::clear() { + _frameCached.clear(); +} + }//namespace mediakit diff --git a/src/Extension/Frame.h b/src/Extension/Frame.h index d7c4d8f6..ce242f71 100644 --- a/src/Extension/Frame.h +++ b/src/Extension/Frame.h @@ -448,5 +448,32 @@ private: Buffer::Ptr _buf; }; +/** + * 合并一些时间戳相同的frame + */ +class FrameMerger { +public: + using onOutput = function; + enum { + none = 0, + h264_prefix, + mp4_nal_size, + }; + + FrameMerger(int type); + ~FrameMerger() = default; + + void clear(); + void inputFrame(const Frame::Ptr &frame, const onOutput &cb); + +private: + bool willFlush(const Frame::Ptr &frame) const; + void doMerge(BufferLikeString &buffer, const Frame::Ptr &frame) const; + +private: + int _type; + List _frameCached; +}; + }//namespace mediakit #endif //ZLMEDIAKIT_FRAME_H \ No newline at end of file diff --git a/src/Record/MP4.cpp b/src/Record/MP4.cpp index e344aafb..292bc793 100644 --- a/src/Record/MP4.cpp +++ b/src/Record/MP4.cpp @@ -80,14 +80,6 @@ int mp4_writer_write(mp4_writer_t* mp4, int track, const void* data, size_t byte } } -int mp4_writer_write_l(mp4_writer_t* mp4, int track, const void* data, size_t bytes, int64_t pts, int64_t dts, int flags, int add_nalu_size){ - if (mp4->is_fmp4) { - return fmp4_writer_write_l(mp4->u.fmp4, track, data, bytes, pts, dts, flags, add_nalu_size); - } else { - return mov_writer_write_l(mp4->u.mov, track, data, bytes, pts, dts, flags, add_nalu_size); - } -} - int mp4_writer_save_segment(mp4_writer_t* mp4){ if (mp4->is_fmp4) { return fmp4_writer_save_segment(mp4->u.fmp4); diff --git a/src/Record/MP4Muxer.cpp b/src/Record/MP4Muxer.cpp index e84fa6e4..3e9045d2 100644 --- a/src/Record/MP4Muxer.cpp +++ b/src/Record/MP4Muxer.cpp @@ -60,7 +60,7 @@ void MP4MuxerInterface::resetTracks() { _started = false; _have_video = false; _mov_writter = nullptr; - _frameCached.clear(); + _frame_merger.clear(); _codec_to_trackid.clear(); } @@ -92,47 +92,22 @@ void MP4MuxerInterface::inputFrame(const Frame::Ptr &frame) { break; } } + case CodecH265: { //这里的代码逻辑是让SPS、PPS、IDR这些时间戳相同的帧打包到一起当做一个帧处理, - if (!_frameCached.empty() && _frameCached.back()->dts() != frame->dts()) { - Frame::Ptr back = _frameCached.back(); - //求相对时间戳 - track_info.stamp.revise(back->dts(), back->pts(), dts_out, pts_out); - - if (_frameCached.size() != 1) { - //缓存中有多帧,需要按照mp4格式合并一起 - BufferLikeString merged; - merged.reserve(back->size() + 1024); - _frameCached.for_each([&](const Frame::Ptr &frame) { - uint32_t nalu_size = (uint32_t)(frame->size() - frame->prefixSize()); - nalu_size = htonl(nalu_size); - merged.append((char *) &nalu_size, 4); - merged.append(frame->data() + frame->prefixSize(), frame->size() - frame->prefixSize()); - }); - mp4_writer_write(_mov_writter.get(), - track_info.track_id, - merged.data(), - merged.size(), - pts_out, - dts_out, - back->keyFrame() ? MOV_AV_FLAG_KEYFREAME : 0); - } else { - //缓存中只有一帧视频 - mp4_writer_write_l(_mov_writter.get(), - track_info.track_id, - back->data() + back->prefixSize(), - back->size() - back->prefixSize(), - pts_out, - dts_out, - back->keyFrame() ? MOV_AV_FLAG_KEYFREAME : 0, - 1/*需要生成头4个字节的MP4格式start code*/); - } - _frameCached.clear(); - } - //缓存帧,时间戳相同的帧合并一起写入mp4 - _frameCached.emplace_back(Frame::getCacheAbleFrame(frame)); - } + _frame_merger.inputFrame(frame, [&](uint32_t dts, uint32_t pts, const Buffer::Ptr &buffer, bool have_idr) { + track_info.stamp.revise(dts, pts, dts_out, pts_out); + mp4_writer_write(_mov_writter.get(), + track_info.track_id, + buffer->data(), + buffer->size(), + pts_out, + dts_out, + have_idr ? MOV_AV_FLAG_KEYFREAME : 0); + }); break; + } + default: { track_info.stamp.revise(frame->dts(), frame->pts(), dts_out, pts_out); mp4_writer_write(_mov_writter.get(), @@ -142,8 +117,9 @@ void MP4MuxerInterface::inputFrame(const Frame::Ptr &frame) { pts_out, dts_out, frame->keyFrame() ? MOV_AV_FLAG_KEYFREAME : 0); - } break; + } + } } diff --git a/src/Record/MP4Muxer.h b/src/Record/MP4Muxer.h index 98a6ca4d..b7e91668 100644 --- a/src/Record/MP4Muxer.h +++ b/src/Record/MP4Muxer.h @@ -72,8 +72,8 @@ private: int track_id = -1; Stamp stamp; }; - List _frameCached; unordered_map _codec_to_trackid; + FrameMerger _frame_merger{FrameMerger::mp4_nal_size}; }; class MP4Muxer : public MP4MuxerInterface{ diff --git a/src/Record/TsMuxer.cpp b/src/Record/TsMuxer.cpp index 8ff5a195..38ea3459 100644 --- a/src/Record/TsMuxer.cpp +++ b/src/Record/TsMuxer.cpp @@ -103,32 +103,14 @@ void TsMuxer::inputFrame(const Frame::Ptr &frame) { case CodecH265: { //这里的代码逻辑是让SPS、PPS、IDR这些时间戳相同的帧打包到一起当做一个帧处理, - if (!_frameCached.empty() && _frameCached.back()->dts() != frame->dts()) { - Frame::Ptr back = _frameCached.back(); - Buffer::Ptr merged_frame = back; - if (_frameCached.size() != 1) { - BufferLikeString merged; - merged.reserve(back->size() + 1024); - _frameCached.for_each([&](const Frame::Ptr &frame) { - if (frame->prefixSize()) { - merged.append(frame->data(), frame->size()); - } else { - merged.append("\x00\x00\x00\x01", 4); - merged.append(frame->data(), frame->size()); - } - if (frame->keyFrame()) { - _is_idr_fast_packet = true; - } - }); - merged_frame = std::make_shared >(std::move(merged)); - } - track_info.stamp.revise(back->dts(), back->pts(), dts_out, pts_out); + _frame_merger.inputFrame(frame, [&](uint32_t dts, uint32_t pts, const Buffer::Ptr &buffer, bool have_idr){ + track_info.stamp.revise(dts, pts, dts_out, pts_out); //取视频时间戳为TS的时间戳 - _timestamp = (uint32_t)dts_out; - mpeg_ts_write(_context, track_info.track_id, back->keyFrame() ? 0x0001 : 0, pts_out * 90LL,dts_out * 90LL, merged_frame->data(), merged_frame->size()); - _frameCached.clear(); - } - _frameCached.emplace_back(Frame::getCacheAbleFrame(frame)); + _timestamp = (uint32_t) dts_out; + _is_idr_fast_packet = have_idr; + mpeg_ts_write(_context, track_info.track_id, have_idr ? 0x0001 : 0, + pts_out * 90LL, dts_out * 90LL, buffer->data(), buffer->size()); + }); break; } @@ -141,11 +123,12 @@ void TsMuxer::inputFrame(const Frame::Ptr &frame) { default: { track_info.stamp.revise(frame->dts(), frame->pts(), dts_out, pts_out); - if(!_have_video){ + if (!_have_video) { //没有视频时,才以音频时间戳为TS的时间戳 - _timestamp = (uint32_t)dts_out; + _timestamp = (uint32_t) dts_out; } - mpeg_ts_write(_context, track_info.track_id, frame->keyFrame() ? 0x0001 : 0, pts_out * 90LL, dts_out * 90LL, frame->data(), frame->size()); + mpeg_ts_write(_context, track_info.track_id, frame->keyFrame() ? 0x0001 : 0, + pts_out * 90LL, dts_out * 90LL, frame->data(), frame->size()); break; } } @@ -187,6 +170,7 @@ void TsMuxer::uninit() { _context = nullptr; } _codec_to_trackid.clear(); + _frame_merger.clear(); } }//namespace mediakit diff --git a/src/Record/TsMuxer.h b/src/Record/TsMuxer.h index 2b512f07..1a7ad101 100644 --- a/src/Record/TsMuxer.h +++ b/src/Record/TsMuxer.h @@ -59,6 +59,8 @@ private: void stampSync(); private: + bool _have_video = false; + bool _is_idr_fast_packet = false; void *_context = nullptr; char _tsbuf[188]; uint32_t _timestamp = 0; @@ -67,9 +69,7 @@ private: Stamp stamp; }; unordered_map _codec_to_trackid; - List _frameCached; - bool _is_idr_fast_packet = false; - bool _have_video = false; + FrameMerger _frame_merger{FrameMerger::h264_prefix}; }; }//namespace mediakit diff --git a/src/Rtp/Decoder.cpp b/src/Rtp/Decoder.cpp index 1d97430e..cb979bc8 100644 --- a/src/Rtp/Decoder.cpp +++ b/src/Rtp/Decoder.cpp @@ -100,34 +100,6 @@ static const char *getCodecName(int codec_id) { } } -void FrameMerger::inputFrame(const Frame::Ptr &frame,const function &cb){ - bool flush = false; - switch (frame->getCodecId()) { - case CodecH264: - case CodecH265:{ - //如果是新的一帧,前面的缓存需要输出 - flush = frame->prefixSize(); - break; - } - default: break; - } - if (!_frameCached.empty() && (flush || _frameCached.back()->dts() != frame->dts())) { - Frame::Ptr back = _frameCached.back(); - Buffer::Ptr merged_frame = back; - if(_frameCached.size() != 1){ - BufferLikeString merged; - merged.reserve(back->size() + 1024); - _frameCached.for_each([&](const Frame::Ptr &frame){ - merged.append(frame->data(),frame->size()); - }); - merged_frame = std::make_shared >(std::move(merged)); - } - cb(back->dts(),back->pts(),merged_frame); - _frameCached.clear(); - } - _frameCached.emplace_back(Frame::getCacheAbleFrame(frame)); -} - void DecoderImp::onStream(int stream, int codecid, const void *extra, size_t bytes, int finish){ switch (codecid) { case PSI_STREAM_H264: { @@ -188,7 +160,7 @@ void DecoderImp::onDecode(int stream,int codecid,int flags,int64_t pts,int64_t d switch (codecid) { case PSI_STREAM_H264: { auto frame = std::make_shared((char *) data, bytes, (uint32_t)dts, (uint32_t)pts, prefixSize((char *) data, bytes)); - _merger.inputFrame(frame,[this](uint32_t dts, uint32_t pts, const Buffer::Ptr &buffer) { + _merger.inputFrame(frame,[this](uint32_t dts, uint32_t pts, const Buffer::Ptr &buffer, bool) { onFrame(std::make_shared >(buffer, dts, pts, prefixSize(buffer->data(), buffer->size()), 0)); }); break; @@ -196,7 +168,7 @@ void DecoderImp::onDecode(int stream,int codecid,int flags,int64_t pts,int64_t d case PSI_STREAM_H265: { auto frame = std::make_shared((char *) data, bytes, (uint32_t)dts, (uint32_t)pts, prefixSize((char *) data, bytes)); - _merger.inputFrame(frame,[this](uint32_t dts, uint32_t pts, const Buffer::Ptr &buffer) { + _merger.inputFrame(frame,[this](uint32_t dts, uint32_t pts, const Buffer::Ptr &buffer, bool) { onFrame(std::make_shared >(buffer, dts, pts, prefixSize(buffer->data(), buffer->size()), 0)); }); break; diff --git a/src/Rtp/Decoder.h b/src/Rtp/Decoder.h index 5d57ceda..a75a79f8 100644 --- a/src/Rtp/Decoder.h +++ b/src/Rtp/Decoder.h @@ -34,18 +34,6 @@ protected: virtual ~Decoder() = default; }; -/** - * 合并一些时间戳相同的frame - */ -class FrameMerger { -public: - FrameMerger() = default; - ~FrameMerger() = default; - void inputFrame(const Frame::Ptr &frame,const function &cb); -private: - List _frameCached; -}; - class DecoderImp{ public: typedef enum { @@ -71,7 +59,7 @@ private: private: Decoder::Ptr _decoder; MediaSinkInterface *_sink; - FrameMerger _merger; + FrameMerger _merger{FrameMerger::none}; Ticker _last_unsported_print; }; diff --git a/src/Rtp/PSEncoder.cpp b/src/Rtp/PSEncoder.cpp index a2951a74..6cd39e0a 100644 --- a/src/Rtp/PSEncoder.cpp +++ b/src/Rtp/PSEncoder.cpp @@ -123,33 +123,18 @@ void PSEncoder::inputFrame(const Frame::Ptr &frame) { break; } } + case CodecH265: { //这里的代码逻辑是让SPS、PPS、IDR这些时间戳相同的帧打包到一起当做一个帧处理, - if (!_frameCached.empty() && _frameCached.back()->dts() != frame->dts()) { - Frame::Ptr back = _frameCached.back(); - Buffer::Ptr merged_frame = back; - if (_frameCached.size() != 1) { - BufferLikeString merged; - merged.reserve(back->size() + 1024); - _frameCached.for_each([&](const Frame::Ptr &frame) { - if (frame->prefixSize()) { - merged.append(frame->data(), frame->size()); - } else { - merged.append("\x00\x00\x00\x01", 4); - merged.append(frame->data(), frame->size()); - } - }); - merged_frame = std::make_shared >(std::move(merged)); - } - track_info.stamp.revise(back->dts(), back->pts(), dts_out, pts_out); - _timestamp = (uint32_t)dts_out; - ps_muxer_input(_muxer.get(), track_info.track_id, back->keyFrame() ? 0x0001 : 0, pts_out * 90LL, - dts_out * 90LL, merged_frame->data(), merged_frame->size()); - _frameCached.clear(); - } - _frameCached.emplace_back(Frame::getCacheAbleFrame(frame)); - } + _frame_merger.inputFrame(frame, [&](uint32_t dts, uint32_t pts, const Buffer::Ptr &buffer, bool have_idr) { + track_info.stamp.revise(dts, pts, dts_out, pts_out); + //取视频时间戳为TS的时间戳 + _timestamp = (uint32_t) dts_out; + ps_muxer_input(_muxer.get(), track_info.track_id, have_idr ? 0x0001 : 0, + pts_out * 90LL, dts_out * 90LL, buffer->data(), buffer->size()); + }); break; + } case CodecAAC: { if (frame->prefixSize() == 0) { @@ -160,11 +145,11 @@ void PSEncoder::inputFrame(const Frame::Ptr &frame) { default: { track_info.stamp.revise(frame->dts(), frame->pts(), dts_out, pts_out); - _timestamp = (uint32_t)dts_out; + _timestamp = (uint32_t) dts_out; ps_muxer_input(_muxer.get(), track_info.track_id, frame->keyFrame() ? 0x0001 : 0, pts_out * 90LL, dts_out * 90LL, frame->data(), frame->size()); - } break; + } } } diff --git a/src/Rtp/PSEncoder.h b/src/Rtp/PSEncoder.h index 8ca8ce17..06513bd3 100644 --- a/src/Rtp/PSEncoder.h +++ b/src/Rtp/PSEncoder.h @@ -61,9 +61,9 @@ private: private: uint32_t _timestamp = 0; BufferRaw::Ptr _buffer; - List _frameCached; std::shared_ptr _muxer; unordered_map _codec_to_trackid; + FrameMerger _frame_merger{FrameMerger::h264_prefix}; }; class PSEncoderImp : public PSEncoder{ diff --git a/tests/test_player.cpp b/tests/test_player.cpp index 95421da5..17b9554f 100644 --- a/tests/test_player.cpp +++ b/tests/test_player.cpp @@ -23,41 +23,6 @@ using namespace std; using namespace toolkit; using namespace mediakit; - -/** - * 合并一些时间戳相同的frame - */ -class FrameMerger { -public: - FrameMerger() = default; - virtual ~FrameMerger() = default; - - void inputFrame(const Frame::Ptr &frame,const function &cb){ - if (!_frameCached.empty() && _frameCached.back()->dts() != frame->dts()) { - Frame::Ptr back = _frameCached.back(); - Buffer::Ptr merged_frame = back; - if(_frameCached.size() != 1){ - string merged; - _frameCached.for_each([&](const Frame::Ptr &frame){ - if(frame->prefixSize()){ - merged.append(frame->data(),frame->size()); - } else{ - merged.append("\x00\x00\x00\x01",4); - merged.append(frame->data(),frame->size()); - } - }); - merged_frame = std::make_shared(std::move(merged)); - } - cb(back->dts(),back->pts(),merged_frame); - _frameCached.clear(); - } - _frameCached.emplace_back(Frame::getCacheAbleFrame(frame)); - } -private: - List _frameCached; -}; - - #ifdef WIN32 #include @@ -129,7 +94,7 @@ int main(int argc, char *argv[]) { displayer.set(nullptr,url); } if(!merger){ - merger.set(); + merger.set(FrameMerger::h264_prefix); } merger.get().inputFrame(frame,[&](uint32_t dts,uint32_t pts,const Buffer::Ptr &buffer){ AVFrame *pFrame = nullptr;