改成异步解码,并修复内存泄露问题

This commit is contained in:
ziyue 2021-09-02 16:35:54 +08:00
parent 895dc04fcc
commit 72c2df057a
3 changed files with 246 additions and 105 deletions

View File

@ -9,7 +9,7 @@
*/ */
#include "FFMpegDecoder.h" #include "FFMpegDecoder.h"
#define MAX_DELAY_SECOND 60 #define MAX_DELAY_SECOND 3
using namespace std; using namespace std;
using namespace mediakit; using namespace mediakit;
@ -20,13 +20,22 @@ static string ffmpeg_err(int errnum){
return errbuf; return errbuf;
} }
/////////////////////////////////////////////////////////////////////////// std::shared_ptr<AVPacket> alloc_av_packet(){
auto pkt = std::shared_ptr<AVPacket>(av_packet_alloc(), [](AVPacket *pkt) {
av_packet_free(&pkt);
});
pkt->data = NULL; // packet data will be allocated by the encoder
pkt->size = 0;
return pkt;
}
//////////////////////////////////////////////////////////////////////////////////////////
template<bool decoder = true, typename ...ARGS> template<bool decoder = true, typename ...ARGS>
AVCodec *getCodecByName(ARGS ...names); AVCodec *getCodec(ARGS ...names);
template<bool decoder = true, typename ...ARGS> template<bool decoder = true>
AVCodec *getCodecByName(const char *name) { AVCodec *getCodec(const char *name) {
auto codec = decoder ? avcodec_find_decoder_by_name(name) : avcodec_find_encoder_by_name(name); auto codec = decoder ? avcodec_find_decoder_by_name(name) : avcodec_find_encoder_by_name(name);
if (codec) { if (codec) {
InfoL << (decoder ? "got decoder:" : "got encoder:") << name; InfoL << (decoder ? "got decoder:" : "got encoder:") << name;
@ -34,15 +43,6 @@ AVCodec *getCodecByName(const char *name) {
return codec; return codec;
} }
template<bool decoder = true, typename ...ARGS>
AVCodec *getCodecByName(const char *name, ARGS ...names) {
auto codec = getCodecByName<decoder>(names...);
if (codec) {
return codec;
}
return getCodecByName<decoder>(name);
}
template<bool decoder = true> template<bool decoder = true>
AVCodec *getCodec(enum AVCodecID id) { AVCodec *getCodec(enum AVCodecID id) {
auto codec = decoder ? avcodec_find_decoder(id) : avcodec_find_encoder(id); auto codec = decoder ? avcodec_find_decoder(id) : avcodec_find_encoder(id);
@ -52,23 +52,22 @@ AVCodec *getCodec(enum AVCodecID id) {
return codec; return codec;
} }
template<bool decoder = true, typename ...ARGS> template<bool decoder = true, typename First, typename ...ARGS>
AVCodec *getCodec(enum AVCodecID id, ARGS ...names) { AVCodec *getCodec(First first, ARGS ...names) {
auto codec = getCodecByName<decoder>(names...); auto codec = getCodec<decoder>(names...);
if (codec) { if (codec) {
return codec; return codec;
} }
return getCodec<decoder>(id); return getCodec<decoder>(first);
} }
/////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////////////
FFmpegFrame::FFmpegFrame(std::shared_ptr<AVFrame> frame) { FFmpegFrame::FFmpegFrame(std::shared_ptr<AVFrame> frame) {
if (frame) { if (frame) {
_frame = std::move(frame); _frame = std::move(frame);
} else { } else {
_frame.reset(av_frame_alloc(), [](AVFrame *ptr) { _frame.reset(av_frame_alloc(), [](AVFrame *ptr) {
av_frame_unref(ptr);
av_frame_free(&ptr); av_frame_free(&ptr);
}); });
} }
@ -85,14 +84,13 @@ AVFrame *FFmpegFrame::get() const{
return _frame.get(); return _frame.get();
} }
/////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////////////
FFmpegSwr::FFmpegSwr(AVSampleFormat output, int channel, int channel_layout, int samplerate) { FFmpegSwr::FFmpegSwr(AVSampleFormat output, int channel, int channel_layout, int samplerate) {
_target_format = output; _target_format = output;
_target_channels = channel; _target_channels = channel;
_target_channel_layout = channel_layout; _target_channel_layout = channel_layout;
_target_samplerate = samplerate; _target_samplerate = samplerate;
_frame_pool.setSize(8);
} }
FFmpegSwr::~FFmpegSwr() { FFmpegSwr::~FFmpegSwr() {
@ -117,48 +115,40 @@ FFmpegFrame::Ptr FFmpegSwr::inputFrame(const FFmpegFrame::Ptr &frame){
<< av_get_sample_fmt_name(_target_format); << av_get_sample_fmt_name(_target_format);
} }
if (_ctx) { if (_ctx) {
FFmpegFrame::Ptr out = _frame_pool.obtain(); auto out = std::make_shared<FFmpegFrame>();
out->get()->format = _target_format; out->get()->format = _target_format;
out->get()->channel_layout = _target_channel_layout; out->get()->channel_layout = _target_channel_layout;
out->get()->channels = _target_channels; out->get()->channels = _target_channels;
out->get()->sample_rate = _target_samplerate; out->get()->sample_rate = _target_samplerate;
out->get()->pkt_dts = frame->get()->pkt_dts; out->get()->pkt_dts = frame->get()->pkt_dts;
out->get()->pkt_pts = frame->get()->pkt_pts;
out->get()->pts = frame->get()->pts; out->get()->pts = frame->get()->pts;
int ret; int ret = 0;
if(0 != (ret = swr_convert_frame(_ctx, out->get(), frame->get()))){ if(0 != (ret = swr_convert_frame(_ctx, out->get(), frame->get()))){
WarnL << "swr_convert_frame failed:" << ffmpeg_err(ret); WarnL << "swr_convert_frame failed:" << ffmpeg_err(ret);
return nullptr; return nullptr;
} }
//修正大小
out->get()->linesize[0] = out->get()->nb_samples * out->get()->channels * av_get_bytes_per_sample((enum AVSampleFormat)out->get()->format);
return out; return out;
} }
return nullptr; return nullptr;
} }
void FFmpegFrame::fillPicture(AVPixelFormat target_format, int target_width, int target_height){
assert(_data == nullptr);
_data = new char[avpicture_get_size(target_format, target_width, target_height)];
avpicture_fill((AVPicture *) _frame.get(), (uint8_t *) _data, target_format, target_width, target_height);
}
/////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////
FFmpegDecoder::FFmpegDecoder(const Track::Ptr &track) { FFmpegDecoder::FFmpegDecoder(const Track::Ptr &track) {
_frame_pool.setSize(8);
avcodec_register_all(); avcodec_register_all();
AVCodec *codec = nullptr; AVCodec *codec = nullptr;
AVCodec *codec_default = nullptr; AVCodec *codec_default = nullptr;
switch (track->getCodecId()) { switch (track->getCodecId()) {
case CodecH264: case CodecH264:
codec_default = getCodec(AV_CODEC_ID_H264); codec_default = getCodec(AV_CODEC_ID_H264);
codec = getCodec(AV_CODEC_ID_H264, "h264_cuvid","h264_videotoolbox"); codec = getCodec("libopenh264", AV_CODEC_ID_H264, "h264_videotoolbox", "h264_cuvid");
break; break;
case CodecH265: case CodecH265:
codec_default = getCodec(AV_CODEC_ID_HEVC); codec_default = getCodec(AV_CODEC_ID_HEVC);
codec = getCodec(AV_CODEC_ID_HEVC, "hevc_cuvid","hevc_videotoolbox"); codec = getCodec(AV_CODEC_ID_HEVC, "hevc_videotoolbox", "hevc_cuvid");
break; break;
case CodecAAC: case CodecAAC:
codec = getCodec(AV_CODEC_ID_AAC); codec = getCodec(AV_CODEC_ID_AAC);
@ -200,23 +190,27 @@ FFmpegDecoder::FFmpegDecoder(const Track::Ptr &track) {
AudioTrack::Ptr audio = static_pointer_cast<AudioTrack>(track); AudioTrack::Ptr audio = static_pointer_cast<AudioTrack>(track);
_context->channels = audio->getAudioChannel(); _context->channels = audio->getAudioChannel();
_context->sample_rate = audio->getAudioSampleRate(); _context->sample_rate = audio->getAudioSampleRate();
_context->channel_layout = _context->channels == 1 ? AV_CH_LAYOUT_MONO : AV_CH_LAYOUT_STEREO; _context->channel_layout = av_get_default_channel_layout(_context->channels);
break; break;
} }
default: default:
break; break;
} }
AVDictionary *dict = nullptr; AVDictionary *dict = nullptr;
av_dict_set(&dict, "threads", to_string(thread::hardware_concurrency()).data(), 0); av_dict_set(&dict, "threads", "auto", 0);
av_dict_set(&dict, "zerolatency", "1", 0); av_dict_set(&dict, "zerolatency", "1", 0);
av_dict_set(&dict, "strict", "-2", 0); av_dict_set(&dict, "strict", "-2", 0);
if (codec->capabilities & AV_CODEC_CAP_TRUNCATED) { if (codec->capabilities & AV_CODEC_CAP_TRUNCATED) {
/* we do not send complete frames */ /* we do not send complete frames */
_context->flags |= AV_CODEC_FLAG_TRUNCATED; _context->flags |= AV_CODEC_FLAG_TRUNCATED;
} else {
// 此时业务层应该需要合帧
_do_merger = true;
} }
int ret = avcodec_open2(_context.get(), codec, &dict); int ret = avcodec_open2(_context.get(), codec, &dict);
av_dict_free(&dict);
if (ret >= 0) { if (ret >= 0) {
//成功 //成功
InfoL << "打开解码器成功:" << codec->name; InfoL << "打开解码器成功:" << codec->name;
@ -231,11 +225,19 @@ FFmpegDecoder::FFmpegDecoder(const Track::Ptr &track) {
} }
throw std::runtime_error(StrPrinter << "打开解码器" << codec->name << "失败:" << ffmpeg_err(ret)); throw std::runtime_error(StrPrinter << "打开解码器" << codec->name << "失败:" << ffmpeg_err(ret));
} }
if (track->getTrackType() == TrackVideo) {
startThread("decoder thread");
}
}
FFmpegDecoder::~FFmpegDecoder() {
stopThread();
} }
void FFmpegDecoder::flush() { void FFmpegDecoder::flush() {
while (true) { while (true) {
FFmpegFrame::Ptr out_frame = _frame_pool.obtain(); auto out_frame = std::make_shared<FFmpegFrame>();
auto ret = avcodec_receive_frame(_context.get(), out_frame->get()); auto ret = avcodec_receive_frame(_context.get(), out_frame->get());
if (ret == AVERROR(EAGAIN) || ret == AVERROR_EOF) { if (ret == AVERROR(EAGAIN) || ret == AVERROR_EOF) {
break; break;
@ -252,20 +254,39 @@ const AVCodecContext *FFmpegDecoder::getContext() const{
return _context.get(); return _context.get();
} }
void FFmpegDecoder::inputFrame(const Frame::Ptr &frame) { void FFmpegDecoder::inputFrame_l(const Frame::Ptr &frame) {
inputFrame(frame->data(), frame->size(), frame->dts(), frame->pts()); if (_do_merger) {
_merger.inputFrame(frame, [&](uint32_t dts, uint32_t pts, const Buffer::Ptr &buffer, bool have_idr) {
decodeFrame(buffer->data(), buffer->size(), dts, pts);
});
} else {
decodeFrame(frame->data(), frame->size(), frame->dts(), frame->pts());
}
} }
void FFmpegDecoder::inputFrame(const char *data, size_t size, uint32_t dts, uint32_t pts) { void FFmpegDecoder::inputFrame(const Frame::Ptr &frame) {
AVPacket pkt; if (!TaskManager::isEnabled()) {
av_init_packet(&pkt); inputFrame_l(frame);
} else {
auto frame_cache = Frame::getCacheAbleFrame(frame);
addDecodeTask(frame->keyFrame(), [this, frame_cache]() {
inputFrame_l(frame_cache);
//此处模拟解码太慢导致的主动丢帧
//usleep(100 * 1000);
});
}
}
pkt.data = (uint8_t *) data; void FFmpegDecoder::decodeFrame(const char *data, size_t size, uint32_t dts, uint32_t pts) {
pkt.size = size; TimeTicker2(30, TraceL);
pkt.dts = dts;
pkt.pts = pts;
auto ret = avcodec_send_packet(_context.get(), &pkt); auto pkt = alloc_av_packet();
pkt->data = (uint8_t *) data;
pkt->size = size;
pkt->dts = dts;
pkt->pts = pts;
auto ret = avcodec_send_packet(_context.get(), pkt.get());
if (ret < 0) { if (ret < 0) {
if (ret != AVERROR_INVALIDDATA) { if (ret != AVERROR_INVALIDDATA) {
WarnL << "avcodec_send_packet failed:" << ffmpeg_err(ret); WarnL << "avcodec_send_packet failed:" << ffmpeg_err(ret);
@ -274,7 +295,7 @@ void FFmpegDecoder::inputFrame(const char *data, size_t size, uint32_t dts, uint
} }
while (true) { while (true) {
FFmpegFrame::Ptr out_frame = _frame_pool.obtain(); auto out_frame = std::make_shared<FFmpegFrame>();
ret = avcodec_receive_frame(_context.get(), out_frame->get()); ret = avcodec_receive_frame(_context.get(), out_frame->get());
if (ret == AVERROR(EAGAIN) || ret == AVERROR_EOF) { if (ret == AVERROR(EAGAIN) || ret == AVERROR_EOF) {
break; break;
@ -283,9 +304,9 @@ void FFmpegDecoder::inputFrame(const char *data, size_t size, uint32_t dts, uint
WarnL << "avcodec_receive_frame failed:" << ffmpeg_err(ret); WarnL << "avcodec_receive_frame failed:" << ffmpeg_err(ret);
break; break;
} }
if (pts - out_frame->get()->pkt_pts > MAX_DELAY_SECOND * 1000 && _ticker.createdTime() > 10 * 1000) { if (pts - out_frame->get()->pts > MAX_DELAY_SECOND * 1000 && _ticker.createdTime() > 10 * 1000) {
//后面的帧才忽略,防止Track无法ready //后面的帧才忽略,防止Track无法ready
WarnL << "解码时,忽略" << MAX_DELAY_SECOND << "秒前的数据:" << pts << " " << out_frame->get()->pkt_pts; WarnL << "解码时,忽略" << MAX_DELAY_SECOND << "秒前的数据:" << pts << " " << out_frame->get()->pts;
continue; continue;
} }
onDecode(out_frame); onDecode(out_frame);
@ -297,16 +318,105 @@ void FFmpegDecoder::setOnDecode(FFmpegDecoder::onDec cb) {
} }
void FFmpegDecoder::onDecode(const FFmpegFrame::Ptr &frame) { void FFmpegDecoder::onDecode(const FFmpegFrame::Ptr &frame) {
if (_context->codec_type == AVMEDIA_TYPE_AUDIO) { if (_cb) {
if (!_swr) {
//固定输出16位整型的pcm
_swr = std::make_shared<FFmpegSwr>(AV_SAMPLE_FMT_S16, frame->get()->channels, frame->get()->channel_layout, frame->get()->sample_rate);
}
//音频情况下转换音频format类型比如说浮点型转换为int型
const_cast<FFmpegFrame::Ptr &>(frame) = _swr->inputFrame(frame);
}
if (_cb && frame) {
_cb(frame); _cb(frame);
} }
} }
////////////////////////////////////////////////////////////////////////
void TaskManager::pushExit(){
{
lock_guard<mutex> lck(_task_mtx);
_exit = true;
_task.clear();
_task.emplace_back([](){
throw ThreadExitException();
});
}
_sem.post(10);
}
void TaskManager::addEncodeTask(function<void()> task) {
{
lock_guard<mutex> lck(_task_mtx);
_task.emplace_back(std::move(task));
if (_task.size() > 30) {
WarnL << "encoder thread task is too more, now drop frame!";
_task.pop_front();
}
}
_sem.post();
}
void TaskManager::addDecodeTask(bool key_frame, function<void()> task) {
{
lock_guard<mutex> lck(_task_mtx);
if (_decode_drop_start) {
if (!key_frame) {
TraceL << "decode thread drop frame";
return;
}
_decode_drop_start = false;
InfoL << "decode thread stop drop frame";
}
_task.emplace_back(std::move(task));
if (_task.size() > 30) {
_decode_drop_start = true;
WarnL << "decode thread start drop frame";
}
}
_sem.post();
}
void TaskManager::startThread(const string &name) {
_thread.reset(new thread([this, name]() {
onThreadRun(name);
}), [this](thread *ptr) {
pushExit();
ptr->join();
delete ptr;
});
}
void TaskManager::stopThread() {
_thread = nullptr;
}
TaskManager::~TaskManager() {
stopThread();
}
bool TaskManager::isEnabled() const {
return _thread.operator bool();
}
void TaskManager::onThreadRun(const string &name) {
setThreadName(name.data());
function<void()> task;
_exit = false;
while (!_exit) {
_sem.wait();
{
unique_lock<mutex> lck(_task_mtx);
if (_task.empty()) {
continue;
}
task = _task.front();
_task.pop_front();
}
try {
TimeTicker2(50, TraceL);
task();
task = nullptr;
} catch (ThreadExitException &ex) {
break;
} catch (std::exception &ex) {
WarnL << ex.what();
continue;
}
}
InfoL << name << " exited!";
}

View File

@ -10,11 +10,9 @@
#ifndef FFMpegDecoder_H_ #ifndef FFMpegDecoder_H_
#define FFMpegDecoder_H_ #define FFMpegDecoder_H_
#include <string>
#include <memory> #include "Util/TimeTicker.h"
#include <stdexcept> #include "Common/MediaSink.h"
#include "Extension/Frame.h"
#include "Extension/Track.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
@ -25,9 +23,6 @@ extern "C" {
} }
#endif #endif
using namespace std;
using namespace mediakit;
class FFmpegFrame { class FFmpegFrame {
public: public:
using Ptr = std::shared_ptr<FFmpegFrame>; using Ptr = std::shared_ptr<FFmpegFrame>;
@ -36,7 +31,6 @@ public:
~FFmpegFrame(); ~FFmpegFrame();
AVFrame *get() const; AVFrame *get() const;
void fillPicture(AVPixelFormat target_format, int target_width, int target_height);
private: private:
char *_data = nullptr; char *_data = nullptr;
@ -58,33 +52,65 @@ private:
int _target_samplerate; int _target_samplerate;
AVSampleFormat _target_format; AVSampleFormat _target_format;
SwrContext *_ctx = nullptr; SwrContext *_ctx = nullptr;
ResourcePool<FFmpegFrame> _frame_pool;
}; };
class FFmpegDecoder : public FrameWriterInterface { class TaskManager {
public:
TaskManager() = default;
~TaskManager();
protected:
void startThread(const string &name);
void stopThread();
void addEncodeTask(function<void()> task);
void addDecodeTask(bool key_frame, function<void()> task);
bool isEnabled() const;
private:
void onThreadRun(const string &name);
void pushExit();
private:
class ThreadExitException : public std::runtime_error {
public:
ThreadExitException() : std::runtime_error("exit") {}
~ThreadExitException() = default;
};
private:
bool _decode_drop_start = false;
bool _exit = false;
mutex _task_mtx;
semaphore _sem;
List<function<void()> > _task;
std::shared_ptr<thread> _thread;
};
class FFmpegDecoder : public FrameWriterInterface, private TaskManager {
public: public:
using Ptr = std::shared_ptr<FFmpegDecoder>; using Ptr = std::shared_ptr<FFmpegDecoder>;
using onDec = function<void(const FFmpegFrame::Ptr &)>; using onDec = function<void(const FFmpegFrame::Ptr &)>;
FFmpegDecoder(const Track::Ptr &track); FFmpegDecoder(const Track::Ptr &track);
~FFmpegDecoder() {} ~FFmpegDecoder();
void inputFrame(const Frame::Ptr &frame) override; void inputFrame(const Frame::Ptr &frame) override;
void inputFrame(const char *data, size_t size, uint32_t dts, uint32_t pts);
void setOnDecode(onDec cb); void setOnDecode(onDec cb);
void flush(); void flush();
const AVCodecContext *getContext() const; const AVCodecContext *getContext() const;
private: private:
void onDecode(const FFmpegFrame::Ptr &frame); void onDecode(const FFmpegFrame::Ptr &frame);
void inputFrame_l(const Frame::Ptr &frame);
void decodeFrame(const char *data, size_t size, uint32_t dts, uint32_t pts);
private: private:
bool _do_merger = false;
Ticker _ticker; Ticker _ticker;
onDec _cb; onDec _cb;
FFmpegSwr::Ptr _swr;
ResourcePool<FFmpegFrame> _frame_pool;
std::shared_ptr<AVCodecContext> _context; std::shared_ptr<AVCodecContext> _context;
FrameMerger _merger{FrameMerger::h264_prefix};
}; };
#endif /* FFMpegDecoder_H_ */ #endif /* FFMpegDecoder_H_ */

View File

@ -83,11 +83,8 @@ int main(int argc, char *argv[]) {
return true; return true;
}); });
}); });
auto merger = std::make_shared<FrameMerger>(FrameMerger::h264_prefix); auto delegate = std::make_shared<FrameWriterInterfaceHelper>([decoder](const Frame::Ptr &frame) {
auto delegate = std::make_shared<FrameWriterInterfaceHelper>([decoder, merger](const Frame::Ptr &frame) { decoder->inputFrame(frame);
merger->inputFrame(frame, [&](uint32_t dts, uint32_t pts, const Buffer::Ptr &buffer, bool have_idr) {
decoder->inputFrame(buffer->data(), buffer->size(), dts, pts);
});
}); });
videoTrack->addDelegate(delegate); videoTrack->addDelegate(delegate);
} }
@ -97,8 +94,16 @@ int main(int argc, char *argv[]) {
auto audio_player = std::make_shared<AudioPlayer>(); auto audio_player = std::make_shared<AudioPlayer>();
//FFmpeg解码时已经统一转换为16位整型pcm //FFmpeg解码时已经统一转换为16位整型pcm
audio_player->setup(audioTrack->getAudioSampleRate(), audioTrack->getAudioChannel(), AUDIO_S16); audio_player->setup(audioTrack->getAudioSampleRate(), audioTrack->getAudioChannel(), AUDIO_S16);
decoder->setOnDecode([audio_player](const FFmpegFrame::Ptr &pcm) { FFmpegSwr::Ptr swr;
audio_player->playPCM((const char *) (pcm->get()->data[0]), pcm->get()->linesize[0]);
decoder->setOnDecode([audio_player, swr](const FFmpegFrame::Ptr &frame) mutable{
if (!swr) {
swr = std::make_shared<FFmpegSwr>(AV_SAMPLE_FMT_S16, frame->get()->channels,
frame->get()->channel_layout, frame->get()->sample_rate);
}
auto pcm = swr->inputFrame(frame);
auto len = pcm->get()->nb_samples * pcm->get()->channels * av_get_bytes_per_sample((enum AVSampleFormat)pcm->get()->format);
audio_player->playPCM((const char *) (pcm->get()->data[0]), len);
}); });
auto audio_delegate = std::make_shared<FrameWriterInterfaceHelper>( [decoder](const Frame::Ptr &frame) { auto audio_delegate = std::make_shared<FrameWriterInterfaceHelper>( [decoder](const Frame::Ptr &frame) {
decoder->inputFrame(frame); decoder->inputFrame(frame);