diff --git a/src/Rtp/RtpProcess.cpp b/src/Rtp/RtpProcess.cpp index a50a8ecb..a3e1aaf5 100644 --- a/src/Rtp/RtpProcess.cpp +++ b/src/Rtp/RtpProcess.cpp @@ -78,11 +78,6 @@ bool RtpProcess::inputRtp(bool is_udp, const Socket::Ptr &sock, const char *data emitOnPublish(); } - if (!_muxer) { - //无权限推流 - return false; - } - _total_bytes += len; if (_save_file_rtp) { uint16_t size = (uint16_t)len; @@ -95,7 +90,7 @@ bool RtpProcess::inputRtp(bool is_udp, const Socket::Ptr &sock, const char *data } GET_CONFIG(string, dump_dir, RtpProxy::kDumpDir); - if (!_muxer->isEnabled() && !dts_out && dump_dir.empty()) { + if (_muxer && !_muxer->isEnabled() && !dts_out && dump_dir.empty()) { //无人访问、且不取时间戳、不导出调试文件时,我们可以直接丢弃数据 _last_frame_time.resetTime(); return false; @@ -109,20 +104,55 @@ bool RtpProcess::inputRtp(bool is_udp, const Socket::Ptr &sock, const char *data } void RtpProcess::inputFrame(const Frame::Ptr &frame) { - _last_frame_time.resetTime(); _dts = frame->dts(); if (_save_file_video && frame->getTrackType() == TrackVideo) { fwrite((uint8_t *) frame->data(), frame->size(), 1, _save_file_video.get()); } - _muxer->inputFrame(frame); + if (_muxer) { + _last_frame_time.resetTime(); + _muxer->inputFrame(frame); + } else { + if (_cached_func.size() > 100) { + WarnL << "cached frame of track(" << frame->getCodecName() << ") is too much, now droped"; + return; + } + auto frame_cached = Frame::getCacheAbleFrame(frame); + lock_guard lck(_func_mtx); + _cached_func.emplace_back([this, frame_cached]() { + _last_frame_time.resetTime(); + _muxer->inputFrame(frame_cached); + }); + } } void RtpProcess::addTrack(const Track::Ptr &track) { - _muxer->addTrack(track); + if (_muxer) { + _muxer->addTrack(track); + } else { + lock_guard lck(_func_mtx); + _cached_func.emplace_back([this, track]() { + _muxer->addTrack(track); + }); + } } void RtpProcess::addTrackCompleted() { - _muxer->addTrackCompleted(); + if (_muxer) { + _muxer->addTrackCompleted(); + } else { + lock_guard lck(_func_mtx); + _cached_func.emplace_back([this]() { + _muxer->addTrackCompleted(); + }); + } +} + +void RtpProcess::doCachedFunc() { + lock_guard lck(_func_mtx); + for (auto &func : _cached_func) { + func(); + } + _cached_func.clear(); } bool RtpProcess::alive() { @@ -197,19 +227,20 @@ void RtpProcess::setListener(const std::weak_ptr &listener) { void RtpProcess::emitOnPublish() { weak_ptr weak_self = shared_from_this(); Broadcast::PublishAuthInvoker invoker = [weak_self](const string &err, bool enableHls, bool enableMP4) { - auto strongSelf = weak_self.lock(); - if (!strongSelf) { + auto strong_self = weak_self.lock(); + if (!strong_self) { return; } if (err.empty()) { - strongSelf->_muxer = std::make_shared(strongSelf->_media_info._vhost, - strongSelf->_media_info._app, - strongSelf->_media_info._streamid, 0.0f, + strong_self->_muxer = std::make_shared(strong_self->_media_info._vhost, + strong_self->_media_info._app, + strong_self->_media_info._streamid, 0.0f, true, true, enableHls, enableMP4); - strongSelf->_muxer->setMediaListener(strongSelf); - InfoP(strongSelf) << "允许RTP推流"; + strong_self->_muxer->setMediaListener(strong_self); + strong_self->doCachedFunc(); + InfoP(strong_self) << "允许RTP推流"; } else { - WarnP(strongSelf) << "禁止RTP推流:" << err; + WarnP(strong_self) << "禁止RTP推流:" << err; } }; diff --git a/src/Rtp/RtpProcess.h b/src/Rtp/RtpProcess.h index 14e87a3c..977f1eec 100644 --- a/src/Rtp/RtpProcess.h +++ b/src/Rtp/RtpProcess.h @@ -79,6 +79,7 @@ protected: private: void emitOnPublish(); + void doCachedFunc(); private: uint32_t _dts = 0; @@ -95,6 +96,8 @@ private: atomic_bool _stop_rtp_check{false}; atomic_flag _busy_flag{false}; Ticker _last_check_alive; + recursive_mutex _func_mtx; + deque > _cached_func; }; }//namespace mediakit