diff --git a/src/Rtp/Decoder.cpp b/src/Rtp/Decoder.cpp index 9df23287..c0aadd7d 100644 --- a/src/Rtp/Decoder.cpp +++ b/src/Rtp/Decoder.cpp @@ -61,8 +61,11 @@ int DecoderImp::input(const uint8_t *data, int bytes){ DecoderImp::DecoderImp(const Decoder::Ptr &decoder, MediaSinkInterface *sink){ _decoder = decoder; _sink = sink; - _decoder->setOnDecode([this](int stream,int codecid,int flags,int64_t pts,int64_t dts,const void *data,int bytes){ - onDecode(stream,codecid,flags,pts,dts,data,bytes); + _decoder->setOnDecode([this](int stream, int codecid, int flags, int64_t pts, int64_t dts, const void *data, int bytes) { + onDecode(stream, codecid, flags, pts, dts, data, bytes); + }); + _decoder->setOnStream([this](int stream, int codecid, const void *extra, int bytes, int finish) { + onStream(stream, codecid, extra, bytes, finish); }); } @@ -115,25 +118,65 @@ void FrameMerger::inputFrame(const Frame::Ptr &frame,const function(); + onTrack(track); + break; + } + + case PSI_STREAM_H265: { + InfoL << "got video track: H265"; + auto track = std::make_shared(); + onTrack(track); + break; + } + + case PSI_STREAM_AAC: { + InfoL<< "got audio track: AAC"; + auto track = std::make_shared(); + onTrack(track); + break; + } + + case PSI_STREAM_AUDIO_G711A: + case PSI_STREAM_AUDIO_G711U: { + auto codec = codecid == PSI_STREAM_AUDIO_G711A ? CodecG711A : CodecG711U; + InfoL << "got audio track: G711"; + //G711传统只支持 8000/1/16的规格,FFmpeg貌似做了扩展,但是这里不管它了 + auto track = std::make_shared(codec, 8000, 1, 16); + onTrack(track); + break; + } + + case PSI_STREAM_AUDIO_OPUS: { + InfoL << "got audio track: opus"; + auto track = std::make_shared(); + onTrack(track); + break; + } + + default: + if(codecid != 0){ + WarnL<< "unsupported codec type:" << getCodecName(codecid) << " " << (int)codecid; + } + break; + } + + if (finish) { + _sink->addTrackCompleted(); + InfoL << "add track finished"; + } +} + void DecoderImp::onDecode(int stream,int codecid,int flags,int64_t pts,int64_t dts,const void *data,int bytes) { pts /= 90; dts /= 90; switch (codecid) { case PSI_STREAM_H264: { - if (!_codecid_video) { - //获取到视频 - _codecid_video = codecid; - InfoL<< "got video track: H264"; - auto track = std::make_shared(); - onTrack(track); - } - - if (codecid != _codecid_video) { - WarnL<< "video track change to H264 from codecid:" << getCodecName(_codecid_video); - return; - } - auto frame = std::make_shared((char *) data, bytes, dts, pts,0); _merger.inputFrame(frame,[this](uint32_t dts, uint32_t pts, const Buffer::Ptr &buffer) { onFrame(std::make_shared >(buffer, dts, pts, prefixSize(buffer->data(), buffer->size()), 0)); @@ -142,17 +185,6 @@ void DecoderImp::onDecode(int stream,int codecid,int flags,int64_t pts,int64_t d } case PSI_STREAM_H265: { - if (!_codecid_video) { - //获取到视频 - _codecid_video = codecid; - InfoL<< "got video track: H265"; - auto track = std::make_shared(); - onTrack(track); - } - if (codecid != _codecid_video) { - WarnL<< "video track change to H265 from codecid:" << getCodecName(_codecid_video); - return; - } auto frame = std::make_shared((char *) data, bytes, dts, pts, 0); _merger.inputFrame(frame,[this](uint32_t dts, uint32_t pts, const Buffer::Ptr &buffer) { onFrame(std::make_shared >(buffer, dts, pts, prefixSize(buffer->data(), buffer->size()), 0)); @@ -166,18 +198,6 @@ void DecoderImp::onDecode(int stream,int codecid,int flags,int64_t pts,int64_t d //这不是aac break; } - if (!_codecid_audio) { - //获取到音频 - _codecid_audio = codecid; - InfoL<< "got audio track: AAC"; - auto track = std::make_shared(); - onTrack(track); - } - - if (codecid != _codecid_audio) { - WarnL<< "audio track change to AAC from codecid:" << getCodecName(_codecid_audio); - return; - } onFrame(std::make_shared(CodecAAC, (char *) data, bytes, dts, 0, ADTS_HEADER_LEN)); break; } @@ -185,36 +205,11 @@ void DecoderImp::onDecode(int stream,int codecid,int flags,int64_t pts,int64_t d case PSI_STREAM_AUDIO_G711A: case PSI_STREAM_AUDIO_G711U: { auto codec = codecid == PSI_STREAM_AUDIO_G711A ? CodecG711A : CodecG711U; - if (!_codecid_audio) { - //获取到音频 - _codecid_audio = codecid; - InfoL<< "got audio track: G711"; - //G711传统只支持 8000/1/16的规格,FFmpeg貌似做了扩展,但是这里不管它了 - auto track = std::make_shared(codec, 8000, 1, 16); - onTrack(track); - } - - if (codecid != _codecid_audio) { - WarnL<< "audio track change to G711 from codecid:" << getCodecName(_codecid_audio); - return; - } onFrame(std::make_shared(codec, (char *) data, bytes, dts)); break; } case PSI_STREAM_AUDIO_OPUS: { - if (!_codecid_audio) { - //获取到音频 - _codecid_audio = codecid; - InfoL << "got audio track: opus"; - auto track = std::make_shared(); - onTrack(track); - } - - if (codecid != _codecid_audio) { - WarnL << "audio track change to opus from codecid:" << getCodecName(_codecid_audio); - return; - } onFrame(std::make_shared(CodecOpus, (char *) data, bytes, dts)); break; } diff --git a/src/Rtp/Decoder.h b/src/Rtp/Decoder.h index 5552500f..6234a876 100644 --- a/src/Rtp/Decoder.h +++ b/src/Rtp/Decoder.h @@ -22,9 +22,13 @@ namespace mediakit { class Decoder { public: typedef std::shared_ptr Ptr; - typedef std::function onDecode; + typedef std::function onDecode; + typedef std::function onStream; + virtual int input(const uint8_t *data, int bytes) = 0; - virtual void setOnDecode(const onDecode &decode) = 0; + virtual void setOnDecode(onDecode cb) = 0; + virtual void setOnStream(onStream cb) = 0; + protected: Decoder() = default; virtual ~Decoder() = default; @@ -61,14 +65,13 @@ protected: private: DecoderImp(const Decoder::Ptr &decoder, MediaSinkInterface *sink); - void onDecode(int stream,int codecid,int flags,int64_t pts,int64_t dts,const void *data,int bytes); + void onDecode(int stream, int codecid, int flags, int64_t pts, int64_t dts, const void *data, int bytes); + void onStream(int stream, int codecid, const void *extra, int bytes, int finish); private: Decoder::Ptr _decoder; MediaSinkInterface *_sink; FrameMerger _merger; - int _codecid_video = 0; - int _codecid_audio = 0; }; }//namespace mediakit diff --git a/src/Rtp/PSDecoder.cpp b/src/Rtp/PSDecoder.cpp index 9e76dfc9..1c7a3b4e 100644 --- a/src/Rtp/PSDecoder.cpp +++ b/src/Rtp/PSDecoder.cpp @@ -28,6 +28,16 @@ PSDecoder::PSDecoder() { } return 0; },this); + + ps_demuxer_notify_t notify = { + [](void *param, int stream, int codecid, const void *extra, int bytes, int finish) { + PSDecoder *thiz = (PSDecoder *) param; + if (thiz->_on_stream) { + thiz->_on_stream(stream, codecid, extra, bytes, finish); + } + } + }; + ps_demuxer_set_notify((struct ps_demuxer_t *) _ps_demuxer, ¬ify, this); } PSDecoder::~PSDecoder() { @@ -38,8 +48,12 @@ int PSDecoder::input(const uint8_t *data, int bytes) { return ps_demuxer_input((struct ps_demuxer_t*)_ps_demuxer,data,bytes); } -void PSDecoder::setOnDecode(const Decoder::onDecode &decode) { - _on_decode = decode; +void PSDecoder::setOnDecode(Decoder::onDecode cb) { + _on_decode = std::move(cb); +} + +void PSDecoder::setOnStream(Decoder::onStream cb) { + _on_stream = std::move(cb); } }//namespace mediakit diff --git a/src/Rtp/PSDecoder.h b/src/Rtp/PSDecoder.h index b7f3a3ae..2be0dca3 100644 --- a/src/Rtp/PSDecoder.h +++ b/src/Rtp/PSDecoder.h @@ -22,10 +22,13 @@ public: PSDecoder(); ~PSDecoder(); int input(const uint8_t* data, int bytes) override; - void setOnDecode(const onDecode &decode) override; + void setOnDecode(onDecode cb) override; + void setOnStream(onStream cb) override; + private: void *_ps_demuxer = nullptr; onDecode _on_decode; + onStream _on_stream; }; }//namespace mediakit diff --git a/src/Rtp/RtpProcess.cpp b/src/Rtp/RtpProcess.cpp index 9769b71d..80e71738 100644 --- a/src/Rtp/RtpProcess.cpp +++ b/src/Rtp/RtpProcess.cpp @@ -129,6 +129,10 @@ void RtpProcess::addTrack(const Track::Ptr &track) { _muxer->addTrack(track); } +void RtpProcess::addTrackCompleted() { + _muxer->addTrackCompleted(); +} + bool RtpProcess::alive() { GET_CONFIG(int, timeoutSec, RtpProxy::kTimeoutSec) if (_last_frame_time.elapsedTime() / 1000 < timeoutSec) { diff --git a/src/Rtp/RtpProcess.h b/src/Rtp/RtpProcess.h index 7dfc9c7e..2551ab8b 100644 --- a/src/Rtp/RtpProcess.h +++ b/src/Rtp/RtpProcess.h @@ -66,6 +66,7 @@ public: protected: void inputFrame(const Frame::Ptr &frame) override; void addTrack(const Track::Ptr & track) override; + void addTrackCompleted() override; void resetTracks() override {}; //// MediaSourceEvent override //// diff --git a/src/Rtp/TSDecoder.cpp b/src/Rtp/TSDecoder.cpp index 92fb26dd..8bd2b995 100644 --- a/src/Rtp/TSDecoder.cpp +++ b/src/Rtp/TSDecoder.cpp @@ -61,6 +61,16 @@ TSDecoder::TSDecoder() : _ts_segment() { } return 0; },this); + + ts_demuxer_notify_t notify = { + [](void *param, int stream, int codecid, const void *extra, int bytes, int finish) { + TSDecoder *thiz = (TSDecoder *) param; + if (thiz->_on_stream) { + thiz->_on_stream(stream, codecid, extra, bytes, finish); + } + } + }; + ts_demuxer_set_notify((struct ts_demuxer_t *) _demuxer_ctx, ¬ify, this); } TSDecoder::~TSDecoder() { @@ -75,9 +85,14 @@ int TSDecoder::input(const uint8_t *data, int bytes) { return bytes; } -void TSDecoder::setOnDecode(const Decoder::onDecode &decode) { - _on_decode = decode; +void TSDecoder::setOnDecode(Decoder::onDecode cb) { + _on_decode = std::move(cb); } + +void TSDecoder::setOnStream(Decoder::onStream cb) { + _on_stream = std::move(cb); +} + #endif//defined(ENABLE_HLS) }//namespace mediakit diff --git a/src/Rtp/TSDecoder.h b/src/Rtp/TSDecoder.h index db6e8b4b..6f6b5d9d 100644 --- a/src/Rtp/TSDecoder.h +++ b/src/Rtp/TSDecoder.h @@ -44,11 +44,14 @@ public: TSDecoder(); ~TSDecoder(); int input(const uint8_t* data, int bytes) override ; - void setOnDecode(const onDecode &decode) override; + void setOnDecode(onDecode cb) override; + void setOnStream(onStream cb) override; + private: TSSegment _ts_segment; struct ts_demuxer_t* _demuxer_ctx = nullptr; onDecode _on_decode; + onStream _on_stream; }; #endif//defined(ENABLE_HLS)