diff --git a/src/Common/MultiMediaSourceMuxer.cpp b/src/Common/MultiMediaSourceMuxer.cpp index 87543e22..d609cefe 100644 --- a/src/Common/MultiMediaSourceMuxer.cpp +++ b/src/Common/MultiMediaSourceMuxer.cpp @@ -18,95 +18,6 @@ namespace toolkit { namespace mediakit { -///////////////////////////////MultiMuxerPrivate////////////////////////////////// - -MultiMuxerPrivate::~MultiMuxerPrivate() {} -MultiMuxerPrivate::MultiMuxerPrivate(const string &vhost, const string &app, const string &stream, float dur_sec, - bool enable_rtsp, bool enable_rtmp, bool enable_hls, bool enable_mp4) { - _stream_url = vhost + " " + app + " " + stream; - if (enable_rtmp) { - _rtmp = std::make_shared(vhost, app, stream, std::make_shared(dur_sec)); - } - if (enable_rtsp) { - _rtsp = std::make_shared(vhost, app, stream, std::make_shared(dur_sec)); - } - - if (enable_hls) { - _hls = dynamic_pointer_cast(Recorder::createRecorder(Recorder::type_hls, vhost, app, stream)); - } - - if (enable_mp4) { - _mp4 = Recorder::createRecorder(Recorder::type_mp4, vhost, app, stream); - } - - _ts = std::make_shared(vhost, app, stream); - -#if defined(ENABLE_MP4) - _fmp4 = std::make_shared(vhost, app, stream); -#endif -} - -void MultiMuxerPrivate::resetTracks() { - if (_rtmp) { - _rtmp->resetTracks(); - } - if (_rtsp) { - _rtsp->resetTracks(); - } - if (_ts) { - _ts->resetTracks(); - } -#if defined(ENABLE_MP4) - if (_fmp4) { - _fmp4->resetTracks(); - } -#endif - - //拷贝智能指针,目的是为了防止跨线程调用设置录像相关api导致的线程竞争问题 - auto hls = _hls; - if (hls) { - hls->resetTracks(); - } - - auto mp4 = _mp4; - if (mp4) { - mp4->resetTracks(); - } -} - -void MultiMuxerPrivate::setMediaListener(const std::weak_ptr &listener) { - _listener = listener; - if (_rtmp) { - _rtmp->setListener(listener); - } - if (_rtsp) { - _rtsp->setListener(listener); - } - if (_ts) { - _ts->setListener(listener); - } -#if defined(ENABLE_MP4) - if (_fmp4) { - _fmp4->setListener(listener); - } -#endif - auto hls = _hls; - if (hls) { - hls->setListener(listener); - } -} - -int MultiMuxerPrivate::totalReaderCount() const { - auto hls = _hls; - return (_rtsp ? _rtsp->readerCount() : 0) + - (_rtmp ? _rtmp->readerCount() : 0) + - (_ts ? _ts->readerCount() : 0) + -#if defined(ENABLE_MP4) - (_fmp4 ? _fmp4->readerCount() : 0) + -#endif - (hls ? hls->readerCount() : 0); -} - static std::shared_ptr makeRecorder(MediaSource &sender, const vector &tracks, Recorder::type type, const string &custom_path, size_t max_second){ auto recorder = Recorder::createRecorder(type, sender.getVhost(), sender.getApp(), sender.getId(), custom_path, max_second); for (auto &track : tracks) { @@ -115,129 +26,6 @@ static std::shared_ptr makeRecorder(MediaSource &sender, con return recorder; } -//此函数可能跨线程调用 -bool MultiMuxerPrivate::setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path, size_t max_second){ - switch (type) { - case Recorder::type_hls : { - if (start && !_hls) { - //开始录制 - auto hls = dynamic_pointer_cast(makeRecorder(sender, getTracks(true), type, custom_path, max_second)); - if (hls) { - //设置HlsMediaSource的事件监听器 - hls->setListener(_listener); - } - _hls = hls; - } else if (!start && _hls) { - //停止录制 - _hls = nullptr; - } - return true; - } - case Recorder::type_mp4 : { - if (start && !_mp4) { - //开始录制 - _mp4 = makeRecorder(sender, getTracks(true), type, custom_path, max_second); - } else if (!start && _mp4) { - //停止录制 - _mp4 = nullptr; - } - return true; - } - default : return false; - } -} - -//此函数可能跨线程调用 -bool MultiMuxerPrivate::isRecording(MediaSource &sender, Recorder::type type){ - switch (type){ - case Recorder::type_hls : - return _hls ? true : false; - case Recorder::type_mp4 : - return _mp4 ? true : false; - default: - return false; - } -} - -void MultiMuxerPrivate::setTimeStamp(uint32_t stamp) { - if (_rtmp) { - _rtmp->setTimeStamp(stamp); - } - if (_rtsp) { - _rtsp->setTimeStamp(stamp); - } -} - -void MultiMuxerPrivate::setTrackListener(Listener *listener) { - _track_listener = listener; -} - -void MultiMuxerPrivate::onTrackReady(const Track::Ptr &track) { - if (_rtmp) { - _rtmp->addTrack(track); - } - if (_rtsp) { - _rtsp->addTrack(track); - } - if (_ts) { - _ts->addTrack(track); - } -#if defined(ENABLE_MP4) - if (_fmp4) { - _fmp4->addTrack(track); - } -#endif - - //拷贝智能指针,目的是为了防止跨线程调用设置录像相关api导致的线程竞争问题 - auto hls = _hls; - if (hls) { - hls->addTrack(track); - } - auto mp4 = _mp4; - if (mp4) { - mp4->addTrack(track); - } -} - -bool MultiMuxerPrivate::isEnabled(){ - auto hls = _hls; - return (_rtmp ? _rtmp->isEnabled() : false) || - (_rtsp ? _rtsp->isEnabled() : false) || - (_ts ? _ts->isEnabled() : false) || -#if defined(ENABLE_MP4) - (_fmp4 ? _fmp4->isEnabled() : false) || -#endif - (hls ? hls->isEnabled() : false) || _mp4; -} - -void MultiMuxerPrivate::onTrackFrame(const Frame::Ptr &frame) { - if (_rtmp) { - _rtmp->inputFrame(frame); - } - if (_rtsp) { - _rtsp->inputFrame(frame); - } - if (_ts) { - _ts->inputFrame(frame); - } -#if defined(ENABLE_MP4) - if (_fmp4) { - _fmp4->inputFrame(frame); - } -#endif - - //拷贝智能指针,目的是为了防止跨线程调用设置录像相关api导致的线程竞争问题 - //此处使用智能指针拷贝来确保线程安全,比互斥锁性能更优 - auto hls = _hls; - if (hls) { - hls->inputFrame(frame); - } - auto mp4 = _mp4; - if (mp4) { - mp4->inputFrame(frame); - } -} - static string getTrackInfoStr(const TrackSource *track_src){ _StrPrinter codec_info; auto tracks = track_src->getTracks(true); @@ -268,58 +56,83 @@ static string getTrackInfoStr(const TrackSource *track_src){ return std::move(codec_info); } -void MultiMuxerPrivate::onAllTrackReady() { - if (_rtmp) { - _rtmp->onAllTrackReady(); - } - if (_rtsp) { - _rtsp->onAllTrackReady(); - } -#if defined(ENABLE_MP4) - if (_fmp4) { - _fmp4->onAllTrackReady(); - } -#endif - if (_track_listener) { - _track_listener->onAllTrackReady(); - } - InfoL << "stream: " << _stream_url << " , codec info: " << getTrackInfoStr(this); -} - -///////////////////////////////MultiMediaSourceMuxer////////////////////////////////// - -MultiMediaSourceMuxer::~MultiMediaSourceMuxer() {} - MultiMediaSourceMuxer::MultiMediaSourceMuxer(const string &vhost, const string &app, const string &stream, float dur_sec, bool enable_rtsp, bool enable_rtmp, bool enable_hls, bool enable_mp4) { - _muxer.reset(new MultiMuxerPrivate(vhost, app, stream, dur_sec, enable_rtsp, enable_rtmp, enable_hls, enable_mp4)); - _muxer->setTrackListener(this); + if (enable_rtmp) { + _rtmp = std::make_shared(vhost, app, stream, std::make_shared(dur_sec)); + } + if (enable_rtsp) { + _rtsp = std::make_shared(vhost, app, stream, std::make_shared(dur_sec)); + } + + if (enable_hls) { + _hls = dynamic_pointer_cast(Recorder::createRecorder(Recorder::type_hls, vhost, app, stream)); + } + + if (enable_mp4) { + _mp4 = Recorder::createRecorder(Recorder::type_mp4, vhost, app, stream); + } + + _ts = std::make_shared(vhost, app, stream); + +#if defined(ENABLE_MP4) + _fmp4 = std::make_shared(vhost, app, stream); +#endif } void MultiMediaSourceMuxer::setMediaListener(const std::weak_ptr &listener) { setDelegate(listener); + + auto self = shared_from_this(); //拦截事件 - _muxer->setMediaListener(shared_from_this()); + if (_rtmp) { + _rtmp->setListener(self); + } + if (_rtsp) { + _rtsp->setListener(self); + } + if (_ts) { + _ts->setListener(self); + } +#if defined(ENABLE_MP4) + if (_fmp4) { + _fmp4->setListener(self); + } +#endif + auto hls = _hls; + if (hls) { + hls->setListener(self); + } } -void MultiMediaSourceMuxer::setTrackListener(const std::weak_ptr &listener) { +void MultiMediaSourceMuxer::setTrackListener(const std::weak_ptr &listener) { _track_listener = listener; } int MultiMediaSourceMuxer::totalReaderCount() const { + auto hls = _hls; + auto ret = (_rtsp ? _rtsp->readerCount() : 0) + + (_rtmp ? _rtmp->readerCount() : 0) + + (_ts ? _ts->readerCount() : 0) + + #if defined(ENABLE_MP4) + (_fmp4 ? _fmp4->readerCount() : 0) + + #endif + (hls ? hls->readerCount() : 0); + #if defined(ENABLE_RTPPROXY) - return _muxer->totalReaderCount() + (int)_rtp_sender.size(); + return ret + (int)_rtp_sender.size(); #else - return _muxer->totalReaderCount(); + return ret; #endif } void MultiMediaSourceMuxer::setTimeStamp(uint32_t stamp) { - _muxer->setTimeStamp(stamp); -} - -vector MultiMediaSourceMuxer::getTracks(MediaSource &sender, bool trackReady) const { - return _muxer->getTracks(trackReady); + if (_rtmp) { + _rtmp->setTimeStamp(stamp); + } + if (_rtsp) { + _rtsp->setTimeStamp(stamp); + } } int MultiMediaSourceMuxer::totalReaderCount(MediaSource &sender) { @@ -330,12 +143,48 @@ int MultiMediaSourceMuxer::totalReaderCount(MediaSource &sender) { return listener->totalReaderCount(sender); } +//此函数可能跨线程调用 bool MultiMediaSourceMuxer::setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path, size_t max_second) { - return _muxer->setupRecord(sender, type, start, custom_path, max_second); + switch (type) { + case Recorder::type_hls : { + if (start && !_hls) { + //开始录制 + auto hls = dynamic_pointer_cast(makeRecorder(sender, MediaSink::getTracks(), type, custom_path, max_second)); + if (hls) { + //设置HlsMediaSource的事件监听器 + hls->setListener(shared_from_this()); + } + _hls = hls; + } else if (!start && _hls) { + //停止录制 + _hls = nullptr; + } + return true; + } + case Recorder::type_mp4 : { + if (start && !_mp4) { + //开始录制 + _mp4 = makeRecorder(sender, MediaSink::getTracks(), type, custom_path, max_second); + } else if (!start && _mp4) { + //停止录制 + _mp4 = nullptr; + } + return true; + } + default : return false; + } } +//此函数可能跨线程调用 bool MultiMediaSourceMuxer::isRecording(MediaSource &sender, Recorder::type type) { - return _muxer->isRecording(sender,type); + switch (type){ + case Recorder::type_hls : + return _hls ? true : false; + case Recorder::type_mp4 : + return _mp4 ? true : false; + default: + return false; + } } void MultiMediaSourceMuxer::startSendRtp(MediaSource &, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, uint16_t src_port, const function &cb){ @@ -348,7 +197,7 @@ void MultiMediaSourceMuxer::startSendRtp(MediaSource &, const string &dst_url, u if (!strong_self || ex) { return; } - for (auto &track : strong_self->_muxer->getTracks(false)) { + for (auto &track : strong_self->MediaSink::getTracks(false)) { rtp_sender->addTrack(track); } rtp_sender->addTrackCompleted(); @@ -383,28 +232,94 @@ bool MultiMediaSourceMuxer::stopSendRtp(MediaSource &sender, const string &ssrc) #endif//ENABLE_RTPPROXY } -void MultiMediaSourceMuxer::addTrack(const Track::Ptr &track) { +void MultiMediaSourceMuxer::onTrackReady(const Track::Ptr &track) { if (CodecL16 == track->getCodecId()) { WarnL << "L16音频格式目前只支持RTSP协议推流拉流!!!"; return; } - _muxer->addTrack(track); -} -void MultiMediaSourceMuxer::addTrackCompleted() { - _muxer->addTrackCompleted(); -} + if (_rtmp) { + _rtmp->addTrack(track); + } + if (_rtsp) { + _rtsp->addTrack(track); + } + if (_ts) { + _ts->addTrack(track); + } +#if defined(ENABLE_MP4) + if (_fmp4) { + _fmp4->addTrack(track); + } +#endif -void MultiMediaSourceMuxer::onAllTrackReady(){ - _muxer->setMediaListener(shared_from_this()); - auto listener = _track_listener.lock(); - if(listener){ - listener->onAllTrackReady(); + //拷贝智能指针,目的是为了防止跨线程调用设置录像相关api导致的线程竞争问题 + auto hls = _hls; + if (hls) { + hls->addTrack(track); + } + auto mp4 = _mp4; + if (mp4) { + mp4->addTrack(track); } } +void MultiMediaSourceMuxer::onAllTrackReady() { + setMediaListener(getDelegate()); + + if (_rtmp) { + _rtmp->onAllTrackReady(); + } + if (_rtsp) { + _rtsp->onAllTrackReady(); + } +#if defined(ENABLE_MP4) + if (_fmp4) { + _fmp4->onAllTrackReady(); + } +#endif + auto listener = _track_listener.lock(); + if (listener) { + listener->onAllTrackReady(); + } + InfoL << "stream: " << _stream_url << " , codec info: " << getTrackInfoStr(this); +} + void MultiMediaSourceMuxer::resetTracks() { - _muxer->resetTracks(); + MediaSink::resetTracks(); + + if (_rtmp) { + _rtmp->resetTracks(); + } + if (_rtsp) { + _rtsp->resetTracks(); + } + if (_ts) { + _ts->resetTracks(); + } +#if defined(ENABLE_MP4) + if (_fmp4) { + _fmp4->resetTracks(); + } +#endif + +#if defined(ENABLE_RTPPROXY) + lock_guard lck(_rtp_sender_mtx); + for (auto &pr : _rtp_sender) { + pr.second->resetTracks(); + } +#endif + + //拷贝智能指针,目的是为了防止跨线程调用设置录像相关api导致的线程竞争问题 + auto hls = _hls; + if (hls) { + hls->resetTracks(); + } + + auto mp4 = _mp4; + if (mp4) { + mp4->resetTracks(); + } } //该类实现frame级别的时间戳覆盖 @@ -459,14 +374,40 @@ private: Frame::Ptr _frame; }; -void MultiMediaSourceMuxer::inputFrame(const Frame::Ptr &frame_in) { +void MultiMediaSourceMuxer::onTrackFrame(const Frame::Ptr &frame_in) { GET_CONFIG(bool, modify_stamp, General::kModifyStamp); auto frame = frame_in; if (modify_stamp) { //开启了时间戳覆盖 frame = std::make_shared(frame, _stamp[frame->getTrackType()]); } - _muxer->inputFrame(frame); + + if (_rtmp) { + _rtmp->inputFrame(frame); + } + if (_rtsp) { + _rtsp->inputFrame(frame); + } + if (_ts) { + _ts->inputFrame(frame); + } + + //拷贝智能指针,目的是为了防止跨线程调用设置录像相关api导致的线程竞争问题 + //此处使用智能指针拷贝来确保线程安全,比互斥锁性能更优 + auto hls = _hls; + if (hls) { + hls->inputFrame(frame); + } + auto mp4 = _mp4; + if (mp4) { + mp4->inputFrame(frame); + } + +#if defined(ENABLE_MP4) + if (_fmp4) { + _fmp4->inputFrame(frame); + } +#endif #if defined(ENABLE_RTPPROXY) lock_guard lck(_rtp_sender_mtx); @@ -474,7 +415,6 @@ void MultiMediaSourceMuxer::inputFrame(const Frame::Ptr &frame_in) { pr.second->inputFrame(frame); } #endif //ENABLE_RTPPROXY - } bool MultiMediaSourceMuxer::isEnabled(){ @@ -482,10 +422,19 @@ bool MultiMediaSourceMuxer::isEnabled(){ if (!_is_enable || _last_check.elapsedTime() > stream_none_reader_delay_ms) { //无人观看时,每次检查是否真的无人观看 //有人观看时,则延迟一定时间检查一遍是否无人观看了(节省性能) + auto hls = _hls; + auto flag = (_rtmp ? _rtmp->isEnabled() : false) || + (_rtsp ? _rtsp->isEnabled() : false) || + (_ts ? _ts->isEnabled() : false) || + #if defined(ENABLE_MP4) + (_fmp4 ? _fmp4->isEnabled() : false) || + #endif + (hls ? hls->isEnabled() : false) || _mp4; + #if defined(ENABLE_RTPPROXY) - _is_enable = (_muxer->isEnabled() || _rtp_sender.size()); + _is_enable = flag || _rtp_sender.size(); #else - _is_enable = _muxer->isEnabled(); + _is_enable = flag; #endif //ENABLE_RTPPROXY if (_is_enable) { //无人观看时,不刷新计时器,因为无人观看时每次都会检查一遍,所以刷新计数器无意义且浪费cpu @@ -495,5 +444,4 @@ bool MultiMediaSourceMuxer::isEnabled(){ return _is_enable; } - -}//namespace mediakit +}//namespace mediakit \ No newline at end of file diff --git a/src/Common/MultiMediaSourceMuxer.h b/src/Common/MultiMediaSourceMuxer.h index e37cde9f..95e9dde8 100644 --- a/src/Common/MultiMediaSourceMuxer.h +++ b/src/Common/MultiMediaSourceMuxer.h @@ -23,10 +23,10 @@ namespace mediakit{ -class MultiMuxerPrivate : public MediaSink, public std::enable_shared_from_this{ +class MultiMediaSourceMuxer : public MediaSourceEventInterceptor, public MediaSink, public std::enable_shared_from_this{ public: - friend class MultiMediaSourceMuxer; - typedef std::shared_ptr Ptr; + typedef std::shared_ptr Ptr; + class Listener{ public: Listener() = default; @@ -34,43 +34,7 @@ public: virtual void onAllTrackReady() = 0; }; - ~MultiMuxerPrivate() override; - -private: - MultiMuxerPrivate(const string &vhost,const string &app, const string &stream,float dur_sec, - bool enable_rtsp, bool enable_rtmp, bool enable_hls, bool enable_mp4); - void resetTracks() override; - void setMediaListener(const std::weak_ptr &listener); - int totalReaderCount() const; - void setTimeStamp(uint32_t stamp); - void setTrackListener(Listener *listener); - bool setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path, size_t max_second); - bool isRecording(MediaSource &sender, Recorder::type type); - bool isEnabled(); - void onTrackReady(const Track::Ptr & track) override; - void onTrackFrame(const Frame::Ptr &frame) override; - void onAllTrackReady() override; - -private: - string _stream_url; - Listener *_track_listener = nullptr; - RtmpMediaSourceMuxer::Ptr _rtmp; - RtspMediaSourceMuxer::Ptr _rtsp; - HlsRecorder::Ptr _hls; - MediaSinkInterface::Ptr _mp4; - TSMediaSourceMuxer::Ptr _ts; -#if defined(ENABLE_MP4) - FMP4MediaSourceMuxer::Ptr _fmp4; -#endif - std::weak_ptr _listener; -}; - -class MultiMediaSourceMuxer : public MediaSourceEventInterceptor, public MediaSinkInterface, public MultiMuxerPrivate::Listener, public std::enable_shared_from_this{ -public: - typedef MultiMuxerPrivate::Listener Listener; - typedef std::shared_ptr Ptr; - - ~MultiMediaSourceMuxer() override; + ~MultiMediaSourceMuxer() override = default; MultiMediaSourceMuxer(const string &vhost, const string &app, const string &stream, float dur_sec = 0.0, bool enable_rtsp = true, bool enable_rtmp = true, bool enable_hls = true, bool enable_mp4 = false); @@ -84,7 +48,7 @@ public: * 随着Track就绪事件监听器 * @param listener 事件监听器 */ - void setTrackListener(const std::weak_ptr &listener); + void setTrackListener(const std::weak_ptr &listener); /** * 返回总的消费者个数 @@ -104,13 +68,6 @@ public: /////////////////////////////////MediaSourceEvent override///////////////////////////////// - /** - * 获取所有Track - * @param trackReady 是否筛选过滤未就绪的track - * @return 所有Track - */ - vector getTracks(MediaSource &sender, bool trackReady = true) const override; - /** * 观看总人数 * @param sender 事件发送者 @@ -150,48 +107,52 @@ public: */ bool stopSendRtp(MediaSource &sender, const string &ssrc) override; - /////////////////////////////////MediaSinkInterface override///////////////////////////////// - - /** - * 添加track,内部会调用Track的clone方法 - * 只会克隆sps pps这些信息 ,而不会克隆Delegate相关关系 - * @param track 添加音频或视频轨道 - */ - void addTrack(const Track::Ptr &track) override; - - /** - * 添加track完毕 - */ - void addTrackCompleted() override; - /** * 重置track */ void resetTracks() override; - /** - * 写入帧数据 - * @param frame 帧 - */ - void inputFrame(const Frame::Ptr &frame) override; - - /////////////////////////////////MultiMuxerPrivate::Listener override///////////////////////////////// +protected: + /////////////////////////////////MediaSink override///////////////////////////////// /** - * 所有track全部就绪 + * 某track已经准备好,其ready()状态返回true, + * 此时代表可以获取其例如sps pps等相关信息了 + * @param track + */ + void onTrackReady(const Track::Ptr & track) override; + + /** + * 所有Track已经准备好, */ void onAllTrackReady() override; + /** + * 某Track输出frame,在onAllTrackReady触发后才会调用此方法 + * @param frame + */ + void onTrackFrame(const Frame::Ptr &frame) override; + private: bool _is_enable = false; + string _stream_url; Ticker _last_check; Stamp _stamp[2]; - MultiMuxerPrivate::Ptr _muxer; - std::weak_ptr _track_listener; + std::weak_ptr _track_listener; #if defined(ENABLE_RTPPROXY) mutex _rtp_sender_mtx; unordered_map _rtp_sender; #endif //ENABLE_RTPPROXY + +#if defined(ENABLE_MP4) + FMP4MediaSourceMuxer::Ptr _fmp4; +#endif + RtmpMediaSourceMuxer::Ptr _rtmp; + RtspMediaSourceMuxer::Ptr _rtsp; + TSMediaSourceMuxer::Ptr _ts; + MediaSinkInterface::Ptr _mp4; + HlsRecorder::Ptr _hls; + //对象个数统计 ObjectStatistic _statistic; }; diff --git a/src/Rtmp/RtmpMediaSourceImp.h b/src/Rtmp/RtmpMediaSourceImp.h index 88c95065..14672f97 100644 --- a/src/Rtmp/RtmpMediaSourceImp.h +++ b/src/Rtmp/RtmpMediaSourceImp.h @@ -127,7 +127,7 @@ public: if (_recreate_metadata) { //更新metadata - for (auto &track : _muxer->getTracks(*this)) { + for (auto &track : _muxer->MediaSink::getTracks()) { Metadata::addTrack(_metadata, track); } RtmpMediaSource::updateMetaData(_metadata);