统一帧合并逻辑

This commit is contained in:
xia-chu 2021-04-26 18:26:07 +08:00
parent d005ccf396
commit af2b1246fd
12 changed files with 173 additions and 186 deletions

View File

@ -140,4 +140,102 @@ const char *CodecInfo::getCodecName() {
TrackType CodecInfo::getTrackType() { TrackType CodecInfo::getTrackType() {
return mediakit::getTrackType(getCodecId()); return mediakit::getTrackType(getCodecId());
} }
static size_t constexpr kMaxFrameCacheSize = 100;
bool FrameMerger::willFlush(const Frame::Ptr &frame) const{
if (_frameCached.empty()) {
return false;
}
switch (_type) {
case none : {
//frame不是完整的帧我们合并为一帧
bool new_frame = false;
switch (frame->getCodecId()) {
case CodecH264:
case CodecH265: {
//如果是新的一帧,前面的缓存需要输出
new_frame = frame->prefixSize();
break;
}
default: break;
}
return new_frame || _frameCached.back()->dts() != frame->dts() || _frameCached.size() > kMaxFrameCacheSize;
}
case mp4_nal_size:
case h264_prefix: {
if (_frameCached.back()->dts() != frame->dts()) {
//时间戳变化了
return true;
}
if (frame->getCodecId() == CodecH264 &&
H264_TYPE(frame->data()[frame->prefixSize()]) == H264Frame::NAL_B_P) {
//如果是264的b/p帧那么也刷新输出
return true;
}
return _frameCached.size() > kMaxFrameCacheSize;
}
default: /*不可达*/ assert(0); return true;
}
}
void FrameMerger::doMerge(BufferLikeString &merged, const Frame::Ptr &frame) const{
switch (_type) {
case none : {
merged.append(frame->data(), frame->size());
break;
}
case h264_prefix: {
if (frame->prefixSize()) {
merged.append(frame->data(), frame->size());
} else {
merged.append("\x00\x00\x00\x01", 4);
merged.append(frame->data(), frame->size());
}
break;
}
case mp4_nal_size: {
uint32_t nalu_size = (uint32_t) (frame->size() - frame->prefixSize());
nalu_size = htonl(nalu_size);
merged.append((char *) &nalu_size, 4);
merged.append(frame->data() + frame->prefixSize(), frame->size() - frame->prefixSize());
break;
}
default: /*不可达*/ assert(0); break;
}
}
void FrameMerger::inputFrame(const Frame::Ptr &frame, const onOutput &cb) {
if (willFlush(frame)) {
Frame::Ptr back = _frameCached.back();
Buffer::Ptr merged_frame = back;
bool have_idr = back->keyFrame();
if (_frameCached.size() != 1 || _type == mp4_nal_size) {
//在MP4模式下一帧数据也需要在前添加nalu_size
BufferLikeString merged;
merged.reserve(back->size() + 1024);
_frameCached.for_each([&](const Frame::Ptr &frame) {
doMerge(merged, frame);
if (frame->keyFrame()) {
have_idr = true;
}
});
merged_frame = std::make_shared<BufferOffset<BufferLikeString> >(std::move(merged));
}
cb(back->dts(), back->pts(), merged_frame, have_idr);
_frameCached.clear();
}
_frameCached.emplace_back(Frame::getCacheAbleFrame(frame));
}
FrameMerger::FrameMerger(int type) {
_type = type;
}
void FrameMerger::clear() {
_frameCached.clear();
}
}//namespace mediakit }//namespace mediakit

View File

@ -428,5 +428,32 @@ private:
Buffer::Ptr _buf; Buffer::Ptr _buf;
}; };
/**
* frame
*/
class FrameMerger {
public:
using onOutput = function<void(uint32_t dts, uint32_t pts, const Buffer::Ptr &buffer, bool have_idr)>;
enum {
none = 0,
h264_prefix,
mp4_nal_size,
};
FrameMerger(int type);
~FrameMerger() = default;
void clear();
void inputFrame(const Frame::Ptr &frame, const onOutput &cb);
private:
bool willFlush(const Frame::Ptr &frame) const;
void doMerge(BufferLikeString &buffer, const Frame::Ptr &frame) const;
private:
int _type;
List<Frame::Ptr> _frameCached;
};
}//namespace mediakit }//namespace mediakit
#endif //ZLMEDIAKIT_FRAME_H #endif //ZLMEDIAKIT_FRAME_H

View File

@ -80,14 +80,6 @@ int mp4_writer_write(mp4_writer_t* mp4, int track, const void* data, size_t byte
} }
} }
int mp4_writer_write_l(mp4_writer_t* mp4, int track, const void* data, size_t bytes, int64_t pts, int64_t dts, int flags, int add_nalu_size){
if (mp4->is_fmp4) {
return fmp4_writer_write_l(mp4->u.fmp4, track, data, bytes, pts, dts, flags, add_nalu_size);
} else {
return mov_writer_write_l(mp4->u.mov, track, data, bytes, pts, dts, flags, add_nalu_size);
}
}
int mp4_writer_save_segment(mp4_writer_t* mp4){ int mp4_writer_save_segment(mp4_writer_t* mp4){
if (mp4->is_fmp4) { if (mp4->is_fmp4) {
return fmp4_writer_save_segment(mp4->u.fmp4); return fmp4_writer_save_segment(mp4->u.fmp4);

View File

@ -60,7 +60,7 @@ void MP4MuxerInterface::resetTracks() {
_started = false; _started = false;
_have_video = false; _have_video = false;
_mov_writter = nullptr; _mov_writter = nullptr;
_frameCached.clear(); _frame_merger.clear();
_codec_to_trackid.clear(); _codec_to_trackid.clear();
} }
@ -92,47 +92,22 @@ void MP4MuxerInterface::inputFrame(const Frame::Ptr &frame) {
break; break;
} }
} }
case CodecH265: { case CodecH265: {
//这里的代码逻辑是让SPS、PPS、IDR这些时间戳相同的帧打包到一起当做一个帧处理 //这里的代码逻辑是让SPS、PPS、IDR这些时间戳相同的帧打包到一起当做一个帧处理
if (!_frameCached.empty() && _frameCached.back()->dts() != frame->dts()) { _frame_merger.inputFrame(frame, [&](uint32_t dts, uint32_t pts, const Buffer::Ptr &buffer, bool have_idr) {
Frame::Ptr back = _frameCached.back(); track_info.stamp.revise(dts, pts, dts_out, pts_out);
//求相对时间戳 mp4_writer_write(_mov_writter.get(),
track_info.stamp.revise(back->dts(), back->pts(), dts_out, pts_out); track_info.track_id,
buffer->data(),
if (_frameCached.size() != 1) { buffer->size(),
//缓存中有多帧需要按照mp4格式合并一起 pts_out,
BufferLikeString merged; dts_out,
merged.reserve(back->size() + 1024); have_idr ? MOV_AV_FLAG_KEYFREAME : 0);
_frameCached.for_each([&](const Frame::Ptr &frame) { });
uint32_t nalu_size = (uint32_t)(frame->size() - frame->prefixSize());
nalu_size = htonl(nalu_size);
merged.append((char *) &nalu_size, 4);
merged.append(frame->data() + frame->prefixSize(), frame->size() - frame->prefixSize());
});
mp4_writer_write(_mov_writter.get(),
track_info.track_id,
merged.data(),
merged.size(),
pts_out,
dts_out,
back->keyFrame() ? MOV_AV_FLAG_KEYFREAME : 0);
} else {
//缓存中只有一帧视频
mp4_writer_write_l(_mov_writter.get(),
track_info.track_id,
back->data() + back->prefixSize(),
back->size() - back->prefixSize(),
pts_out,
dts_out,
back->keyFrame() ? MOV_AV_FLAG_KEYFREAME : 0,
1/*需要生成头4个字节的MP4格式start code*/);
}
_frameCached.clear();
}
//缓存帧时间戳相同的帧合并一起写入mp4
_frameCached.emplace_back(Frame::getCacheAbleFrame(frame));
}
break; break;
}
default: { default: {
track_info.stamp.revise(frame->dts(), frame->pts(), dts_out, pts_out); track_info.stamp.revise(frame->dts(), frame->pts(), dts_out, pts_out);
mp4_writer_write(_mov_writter.get(), mp4_writer_write(_mov_writter.get(),
@ -142,8 +117,9 @@ void MP4MuxerInterface::inputFrame(const Frame::Ptr &frame) {
pts_out, pts_out,
dts_out, dts_out,
frame->keyFrame() ? MOV_AV_FLAG_KEYFREAME : 0); frame->keyFrame() ? MOV_AV_FLAG_KEYFREAME : 0);
}
break; break;
}
} }
} }

View File

@ -72,8 +72,8 @@ private:
int track_id = -1; int track_id = -1;
Stamp stamp; Stamp stamp;
}; };
List<Frame::Ptr> _frameCached;
unordered_map<int, track_info> _codec_to_trackid; unordered_map<int, track_info> _codec_to_trackid;
FrameMerger _frame_merger{FrameMerger::mp4_nal_size};
}; };
class MP4Muxer : public MP4MuxerInterface{ class MP4Muxer : public MP4MuxerInterface{

View File

@ -103,32 +103,14 @@ void TsMuxer::inputFrame(const Frame::Ptr &frame) {
case CodecH265: { case CodecH265: {
//这里的代码逻辑是让SPS、PPS、IDR这些时间戳相同的帧打包到一起当做一个帧处理 //这里的代码逻辑是让SPS、PPS、IDR这些时间戳相同的帧打包到一起当做一个帧处理
if (!_frameCached.empty() && _frameCached.back()->dts() != frame->dts()) { _frame_merger.inputFrame(frame, [&](uint32_t dts, uint32_t pts, const Buffer::Ptr &buffer, bool have_idr){
Frame::Ptr back = _frameCached.back(); track_info.stamp.revise(dts, pts, dts_out, pts_out);
Buffer::Ptr merged_frame = back;
if (_frameCached.size() != 1) {
BufferLikeString merged;
merged.reserve(back->size() + 1024);
_frameCached.for_each([&](const Frame::Ptr &frame) {
if (frame->prefixSize()) {
merged.append(frame->data(), frame->size());
} else {
merged.append("\x00\x00\x00\x01", 4);
merged.append(frame->data(), frame->size());
}
if (frame->keyFrame()) {
_is_idr_fast_packet = true;
}
});
merged_frame = std::make_shared<BufferOffset<BufferLikeString> >(std::move(merged));
}
track_info.stamp.revise(back->dts(), back->pts(), dts_out, pts_out);
//取视频时间戳为TS的时间戳 //取视频时间戳为TS的时间戳
_timestamp = (uint32_t)dts_out; _timestamp = (uint32_t) dts_out;
mpeg_ts_write(_context, track_info.track_id, back->keyFrame() ? 0x0001 : 0, pts_out * 90LL,dts_out * 90LL, merged_frame->data(), merged_frame->size()); _is_idr_fast_packet = have_idr;
_frameCached.clear(); mpeg_ts_write(_context, track_info.track_id, have_idr ? 0x0001 : 0,
} pts_out * 90LL, dts_out * 90LL, buffer->data(), buffer->size());
_frameCached.emplace_back(Frame::getCacheAbleFrame(frame)); });
break; break;
} }
@ -141,11 +123,12 @@ void TsMuxer::inputFrame(const Frame::Ptr &frame) {
default: { default: {
track_info.stamp.revise(frame->dts(), frame->pts(), dts_out, pts_out); track_info.stamp.revise(frame->dts(), frame->pts(), dts_out, pts_out);
if(!_have_video){ if (!_have_video) {
//没有视频时才以音频时间戳为TS的时间戳 //没有视频时才以音频时间戳为TS的时间戳
_timestamp = (uint32_t)dts_out; _timestamp = (uint32_t) dts_out;
} }
mpeg_ts_write(_context, track_info.track_id, frame->keyFrame() ? 0x0001 : 0, pts_out * 90LL, dts_out * 90LL, frame->data(), frame->size()); mpeg_ts_write(_context, track_info.track_id, frame->keyFrame() ? 0x0001 : 0,
pts_out * 90LL, dts_out * 90LL, frame->data(), frame->size());
break; break;
} }
} }
@ -187,6 +170,7 @@ void TsMuxer::uninit() {
_context = nullptr; _context = nullptr;
} }
_codec_to_trackid.clear(); _codec_to_trackid.clear();
_frame_merger.clear();
} }
}//namespace mediakit }//namespace mediakit

View File

@ -59,6 +59,8 @@ private:
void stampSync(); void stampSync();
private: private:
bool _have_video = false;
bool _is_idr_fast_packet = false;
void *_context = nullptr; void *_context = nullptr;
char _tsbuf[188]; char _tsbuf[188];
uint32_t _timestamp = 0; uint32_t _timestamp = 0;
@ -67,9 +69,7 @@ private:
Stamp stamp; Stamp stamp;
}; };
unordered_map<int, track_info> _codec_to_trackid; unordered_map<int, track_info> _codec_to_trackid;
List<Frame::Ptr> _frameCached; FrameMerger _frame_merger{FrameMerger::h264_prefix};
bool _is_idr_fast_packet = false;
bool _have_video = false;
}; };
}//namespace mediakit }//namespace mediakit

View File

@ -100,34 +100,6 @@ static const char *getCodecName(int codec_id) {
} }
} }
void FrameMerger::inputFrame(const Frame::Ptr &frame,const function<void(uint32_t dts,uint32_t pts,const Buffer::Ptr &buffer)> &cb){
bool flush = false;
switch (frame->getCodecId()) {
case CodecH264:
case CodecH265:{
//如果是新的一帧,前面的缓存需要输出
flush = frame->prefixSize();
break;
}
default: break;
}
if (!_frameCached.empty() && (flush || _frameCached.back()->dts() != frame->dts())) {
Frame::Ptr back = _frameCached.back();
Buffer::Ptr merged_frame = back;
if(_frameCached.size() != 1){
BufferLikeString merged;
merged.reserve(back->size() + 1024);
_frameCached.for_each([&](const Frame::Ptr &frame){
merged.append(frame->data(),frame->size());
});
merged_frame = std::make_shared<BufferOffset<BufferLikeString> >(std::move(merged));
}
cb(back->dts(),back->pts(),merged_frame);
_frameCached.clear();
}
_frameCached.emplace_back(Frame::getCacheAbleFrame(frame));
}
void DecoderImp::onStream(int stream, int codecid, const void *extra, size_t bytes, int finish){ void DecoderImp::onStream(int stream, int codecid, const void *extra, size_t bytes, int finish){
switch (codecid) { switch (codecid) {
case PSI_STREAM_H264: { case PSI_STREAM_H264: {
@ -188,7 +160,7 @@ void DecoderImp::onDecode(int stream,int codecid,int flags,int64_t pts,int64_t d
switch (codecid) { switch (codecid) {
case PSI_STREAM_H264: { case PSI_STREAM_H264: {
auto frame = std::make_shared<H264FrameNoCacheAble>((char *) data, bytes, (uint32_t)dts, (uint32_t)pts, prefixSize((char *) data, bytes)); auto frame = std::make_shared<H264FrameNoCacheAble>((char *) data, bytes, (uint32_t)dts, (uint32_t)pts, prefixSize((char *) data, bytes));
_merger.inputFrame(frame,[this](uint32_t dts, uint32_t pts, const Buffer::Ptr &buffer) { _merger.inputFrame(frame,[this](uint32_t dts, uint32_t pts, const Buffer::Ptr &buffer, bool) {
onFrame(std::make_shared<FrameWrapper<H264FrameNoCacheAble> >(buffer, dts, pts, prefixSize(buffer->data(), buffer->size()), 0)); onFrame(std::make_shared<FrameWrapper<H264FrameNoCacheAble> >(buffer, dts, pts, prefixSize(buffer->data(), buffer->size()), 0));
}); });
break; break;
@ -196,7 +168,7 @@ void DecoderImp::onDecode(int stream,int codecid,int flags,int64_t pts,int64_t d
case PSI_STREAM_H265: { case PSI_STREAM_H265: {
auto frame = std::make_shared<H265FrameNoCacheAble>((char *) data, bytes, (uint32_t)dts, (uint32_t)pts, prefixSize((char *) data, bytes)); auto frame = std::make_shared<H265FrameNoCacheAble>((char *) data, bytes, (uint32_t)dts, (uint32_t)pts, prefixSize((char *) data, bytes));
_merger.inputFrame(frame,[this](uint32_t dts, uint32_t pts, const Buffer::Ptr &buffer) { _merger.inputFrame(frame,[this](uint32_t dts, uint32_t pts, const Buffer::Ptr &buffer, bool) {
onFrame(std::make_shared<FrameWrapper<H265FrameNoCacheAble> >(buffer, dts, pts, prefixSize(buffer->data(), buffer->size()), 0)); onFrame(std::make_shared<FrameWrapper<H265FrameNoCacheAble> >(buffer, dts, pts, prefixSize(buffer->data(), buffer->size()), 0));
}); });
break; break;

View File

@ -34,18 +34,6 @@ protected:
virtual ~Decoder() = default; virtual ~Decoder() = default;
}; };
/**
* frame
*/
class FrameMerger {
public:
FrameMerger() = default;
~FrameMerger() = default;
void inputFrame(const Frame::Ptr &frame,const function<void(uint32_t dts,uint32_t pts,const Buffer::Ptr &buffer)> &cb);
private:
List<Frame::Ptr> _frameCached;
};
class DecoderImp{ class DecoderImp{
public: public:
typedef enum { typedef enum {
@ -71,7 +59,7 @@ private:
private: private:
Decoder::Ptr _decoder; Decoder::Ptr _decoder;
MediaSinkInterface *_sink; MediaSinkInterface *_sink;
FrameMerger _merger; FrameMerger _merger{FrameMerger::none};
Ticker _last_unsported_print; Ticker _last_unsported_print;
}; };

View File

@ -123,33 +123,18 @@ void PSEncoder::inputFrame(const Frame::Ptr &frame) {
break; break;
} }
} }
case CodecH265: { case CodecH265: {
//这里的代码逻辑是让SPS、PPS、IDR这些时间戳相同的帧打包到一起当做一个帧处理 //这里的代码逻辑是让SPS、PPS、IDR这些时间戳相同的帧打包到一起当做一个帧处理
if (!_frameCached.empty() && _frameCached.back()->dts() != frame->dts()) { _frame_merger.inputFrame(frame, [&](uint32_t dts, uint32_t pts, const Buffer::Ptr &buffer, bool have_idr) {
Frame::Ptr back = _frameCached.back(); track_info.stamp.revise(dts, pts, dts_out, pts_out);
Buffer::Ptr merged_frame = back; //取视频时间戳为TS的时间戳
if (_frameCached.size() != 1) { _timestamp = (uint32_t) dts_out;
BufferLikeString merged; ps_muxer_input(_muxer.get(), track_info.track_id, have_idr ? 0x0001 : 0,
merged.reserve(back->size() + 1024); pts_out * 90LL, dts_out * 90LL, buffer->data(), buffer->size());
_frameCached.for_each([&](const Frame::Ptr &frame) { });
if (frame->prefixSize()) {
merged.append(frame->data(), frame->size());
} else {
merged.append("\x00\x00\x00\x01", 4);
merged.append(frame->data(), frame->size());
}
});
merged_frame = std::make_shared<BufferOffset<BufferLikeString> >(std::move(merged));
}
track_info.stamp.revise(back->dts(), back->pts(), dts_out, pts_out);
_timestamp = (uint32_t)dts_out;
ps_muxer_input(_muxer.get(), track_info.track_id, back->keyFrame() ? 0x0001 : 0, pts_out * 90LL,
dts_out * 90LL, merged_frame->data(), merged_frame->size());
_frameCached.clear();
}
_frameCached.emplace_back(Frame::getCacheAbleFrame(frame));
}
break; break;
}
case CodecAAC: { case CodecAAC: {
if (frame->prefixSize() == 0) { if (frame->prefixSize() == 0) {
@ -160,11 +145,11 @@ void PSEncoder::inputFrame(const Frame::Ptr &frame) {
default: { default: {
track_info.stamp.revise(frame->dts(), frame->pts(), dts_out, pts_out); track_info.stamp.revise(frame->dts(), frame->pts(), dts_out, pts_out);
_timestamp = (uint32_t)dts_out; _timestamp = (uint32_t) dts_out;
ps_muxer_input(_muxer.get(), track_info.track_id, frame->keyFrame() ? 0x0001 : 0, pts_out * 90LL, ps_muxer_input(_muxer.get(), track_info.track_id, frame->keyFrame() ? 0x0001 : 0, pts_out * 90LL,
dts_out * 90LL, frame->data(), frame->size()); dts_out * 90LL, frame->data(), frame->size());
}
break; break;
}
} }
} }

View File

@ -61,9 +61,9 @@ private:
private: private:
uint32_t _timestamp = 0; uint32_t _timestamp = 0;
BufferRaw::Ptr _buffer; BufferRaw::Ptr _buffer;
List<Frame::Ptr> _frameCached;
std::shared_ptr<struct ps_muxer_t> _muxer; std::shared_ptr<struct ps_muxer_t> _muxer;
unordered_map<int, track_info> _codec_to_trackid; unordered_map<int, track_info> _codec_to_trackid;
FrameMerger _frame_merger{FrameMerger::h264_prefix};
}; };
class PSEncoderImp : public PSEncoder{ class PSEncoderImp : public PSEncoder{

View File

@ -23,41 +23,6 @@ using namespace std;
using namespace toolkit; using namespace toolkit;
using namespace mediakit; using namespace mediakit;
/**
* frame
*/
class FrameMerger {
public:
FrameMerger() = default;
virtual ~FrameMerger() = default;
void inputFrame(const Frame::Ptr &frame,const function<void(uint32_t dts,uint32_t pts,const Buffer::Ptr &buffer)> &cb){
if (!_frameCached.empty() && _frameCached.back()->dts() != frame->dts()) {
Frame::Ptr back = _frameCached.back();
Buffer::Ptr merged_frame = back;
if(_frameCached.size() != 1){
string merged;
_frameCached.for_each([&](const Frame::Ptr &frame){
if(frame->prefixSize()){
merged.append(frame->data(),frame->size());
} else{
merged.append("\x00\x00\x00\x01",4);
merged.append(frame->data(),frame->size());
}
});
merged_frame = std::make_shared<BufferString>(std::move(merged));
}
cb(back->dts(),back->pts(),merged_frame);
_frameCached.clear();
}
_frameCached.emplace_back(Frame::getCacheAbleFrame(frame));
}
private:
List<Frame::Ptr> _frameCached;
};
#ifdef WIN32 #ifdef WIN32
#include <TCHAR.h> #include <TCHAR.h>
@ -129,7 +94,7 @@ int main(int argc, char *argv[]) {
displayer.set<YuvDisplayer>(nullptr,url); displayer.set<YuvDisplayer>(nullptr,url);
} }
if(!merger){ if(!merger){
merger.set<FrameMerger>(); merger.set<FrameMerger>(FrameMerger::h264_prefix);
} }
merger.get<FrameMerger>().inputFrame(frame,[&](uint32_t dts,uint32_t pts,const Buffer::Ptr &buffer){ merger.get<FrameMerger>().inputFrame(frame,[&](uint32_t dts,uint32_t pts,const Buffer::Ptr &buffer){
AVFrame *pFrame = nullptr; AVFrame *pFrame = nullptr;