From 21c03f772fd77fd7fe91bd0f32952c05784473c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=A4=8F=E6=A5=9A?= <771730766@qq.com> Date: Sat, 2 Dec 2023 10:20:06 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E5=B9=B3=E6=BB=91=E5=8F=91?= =?UTF-8?q?=E9=80=81=E9=80=BB=E8=BE=91=20(#3072)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- conf/config.ini | 4 ++ src/Common/MediaSource.cpp | 2 + src/Common/MediaSource.h | 5 ++ src/Common/MultiMediaSourceMuxer.cpp | 100 ++++++++++++++++++++++++++- src/Common/MultiMediaSourceMuxer.h | 2 + src/Common/config.cpp | 2 + src/Common/config.h | 3 + 7 files changed, 117 insertions(+), 1 deletion(-) diff --git a/conf/config.ini b/conf/config.ini index f15ec366..1f4a354c 100644 --- a/conf/config.ini +++ b/conf/config.ini @@ -50,6 +50,10 @@ auto_close=0 #此参数不应大于播放器超时时间;单位毫秒 continue_push_ms=15000 +#平滑发送定时器间隔,单位毫秒,置0则关闭;开启后影响cpu性能同时增加内存 +#该配置开启后可以解决一些流发送不平滑导致zlmediakit转发也不平滑的问题 +paced_sender_ms=0 + #是否开启转换为hls(mpegts) enable_hls=1 #是否开启转换为hls(fmp4) diff --git a/src/Common/MediaSource.cpp b/src/Common/MediaSource.cpp index 1977b616..0859b7ce 100644 --- a/src/Common/MediaSource.cpp +++ b/src/Common/MediaSource.cpp @@ -60,6 +60,7 @@ ProtocolOption::ProtocolOption() { GET_CONFIG(bool, s_add_mute_audio, Protocol::kAddMuteAudio); GET_CONFIG(bool, s_auto_close, Protocol::kAutoClose); GET_CONFIG(uint32_t, s_continue_push_ms, Protocol::kContinuePushMS); + GET_CONFIG(uint32_t, s_paced_sender_ms, Protocol::kPacedSenderMS); GET_CONFIG(bool, s_enable_hls, Protocol::kEnableHls); GET_CONFIG(bool, s_enable_hls_fmp4, Protocol::kEnableHlsFmp4); @@ -86,6 +87,7 @@ ProtocolOption::ProtocolOption() { add_mute_audio = s_add_mute_audio; auto_close = s_auto_close; continue_push_ms = s_continue_push_ms; + paced_sender_ms = s_paced_sender_ms; enable_hls = s_enable_hls; enable_hls_fmp4 = s_enable_hls_fmp4; diff --git a/src/Common/MediaSource.h b/src/Common/MediaSource.h index 9721210a..9aba7df4 100644 --- a/src/Common/MediaSource.h +++ b/src/Common/MediaSource.h @@ -161,6 +161,10 @@ public: //断连续推延时,单位毫秒,默认采用配置文件 uint32_t continue_push_ms; + // 平滑发送定时器间隔,单位毫秒,置0则关闭;开启后影响cpu性能同时增加内存 + // 该配置开启后可以解决一些流发送不平滑导致zlmediakit转发也不平滑的问题 + uint32_t paced_sender_ms; + //是否开启转换为hls(mpegts) bool enable_hls; //是否开启转换为hls(fmp4) @@ -213,6 +217,7 @@ public: GET_OPT_VALUE(add_mute_audio); GET_OPT_VALUE(auto_close); GET_OPT_VALUE(continue_push_ms); + GET_OPT_VALUE(paced_sender_ms); GET_OPT_VALUE(enable_hls); GET_OPT_VALUE(enable_hls_fmp4); diff --git a/src/Common/MultiMediaSourceMuxer.cpp b/src/Common/MultiMediaSourceMuxer.cpp index e38c6134..32506dda 100644 --- a/src/Common/MultiMediaSourceMuxer.cpp +++ b/src/Common/MultiMediaSourceMuxer.cpp @@ -32,6 +32,87 @@ public: }; } // namespace +class FramePacedSender : public FrameWriterInterface, public std::enable_shared_from_this { +public: + using OnFrame = std::function; + // 最小缓存100ms数据 + static constexpr auto kMinCacheMS = 100; + + FramePacedSender(uint32_t paced_sender_ms, OnFrame cb) { + _paced_sender_ms = paced_sender_ms; + _cb = std::move(cb); + } + + void resetTimer(const EventPoller::Ptr &poller) { + std::weak_ptr weak_self = shared_from_this(); + _timer = std::make_shared(_paced_sender_ms / 1000.0f, [weak_self]() { + if (auto strong_self = weak_self.lock()) { + strong_self->onTick(); + return true; + } + return false; + }, poller); + } + + bool inputFrame(const Frame::Ptr &frame) override { + if (!_timer) { + setCurrentStamp(frame->dts()); + resetTimer(EventPoller::getCurrentPoller()); + } + + _cache.emplace_back(frame->dts() + _cache_ms, Frame::getCacheAbleFrame(frame)); + return true; + } + +private: + void onTick() { + auto dst = _cache.empty() ? 0 : _cache.back().first; + while (!_cache.empty()) { + auto &front = _cache.front(); + if (getCurrentStamp() < front.first) { + // 还没到消费时间 + break; + } + // 时间到了,该消费frame了 + _cb(front.second); + _cache.pop_front(); + } + + if (_cache.empty() && dst) { + // 消费太快,需要增加缓存大小 + setCurrentStamp(dst); + _cache_ms += kMinCacheMS; + } + + // 消费太慢,需要强制flush数据 + if (_cache.size() > 25 * 5) { + WarnL << "Flush frame paced sender cache: " << _cache.size(); + while (!_cache.empty()) { + auto &front = _cache.front(); + _cb(front.second); + _cache.pop_front(); + } + setCurrentStamp(dst); + } + } + + uint64_t getCurrentStamp() { return _ticker.elapsedTime() + _stamp_offset; } + + void setCurrentStamp(uint64_t stamp) { + _stamp_offset = stamp; + _ticker.resetTime(); + } + +private: + uint32_t _paced_sender_ms; + uint32_t _cache_ms = kMinCacheMS; + uint64_t _stamp_offset = 0; + OnFrame _cb; + Ticker _ticker; + Timer::Ptr _timer; + std::list> _cache; +}; + static std::shared_ptr makeRecorder(MediaSource &sender, const vector &tracks, Recorder::type type, const ProtocolOption &option){ auto recorder = Recorder::createRecorder(type, sender.getMediaTuple(), option); for (auto &track : tracks) { @@ -367,6 +448,9 @@ EventPoller::Ptr MultiMediaSourceMuxer::getOwnerPoller(MediaSource &sender) { if (ret != _poller) { WarnL << "OwnerPoller changed " << _poller->getThreadName() << " -> " << ret->getThreadName() << " : " << shortUrl(); _poller = ret; + if (_paced_sender) { + _paced_sender->resetTimer(_poller); + } } return ret; } catch (MediaSourceEvent::NotImplemented &) { @@ -407,6 +491,16 @@ bool MultiMediaSourceMuxer::onTrackReady(const Track::Ptr &track) { void MultiMediaSourceMuxer::onAllTrackReady() { CHECK(!_create_in_poller || getOwnerPoller(MediaSource::NullMediaSource())->isCurrentThread()); + + if (_option.paced_sender_ms) { + std::weak_ptr weak_self = shared_from_this(); + _paced_sender = std::make_shared(_option.paced_sender_ms, [weak_self](const Frame::Ptr &frame) { + if (auto strong_self = weak_self.lock()) { + strong_self->onTrackFrame_l(frame); + } + }); + } + setMediaListener(getDelegate()); if (_rtmp) { @@ -492,7 +586,11 @@ void MultiMediaSourceMuxer::resetTracks() { } } -bool MultiMediaSourceMuxer::onTrackFrame(const Frame::Ptr &frame_in) { +bool MultiMediaSourceMuxer::onTrackFrame(const Frame::Ptr &frame) { + return _paced_sender ? _paced_sender->inputFrame(frame) : onTrackFrame_l(frame); +} + +bool MultiMediaSourceMuxer::onTrackFrame_l(const Frame::Ptr &frame_in) { auto frame = frame_in; if (_option.modify_stamp != ProtocolOption::kModifyStampOff) { // 时间戳不采用原始的绝对时间戳 diff --git a/src/Common/MultiMediaSourceMuxer.h b/src/Common/MultiMediaSourceMuxer.h index 45ab7623..7e0708fd 100644 --- a/src/Common/MultiMediaSourceMuxer.h +++ b/src/Common/MultiMediaSourceMuxer.h @@ -155,6 +155,7 @@ protected: * @param frame */ bool onTrackFrame(const Frame::Ptr &frame) override; + bool onTrackFrame_l(const Frame::Ptr &frame); private: void createGopCacheIfNeed(); @@ -163,6 +164,7 @@ private: bool _is_enable = false; bool _create_in_poller = false; bool _video_key_pos = false; + std::shared_ptr _paced_sender; MediaTuple _tuple; ProtocolOption _option; toolkit::Ticker _last_check; diff --git a/src/Common/config.cpp b/src/Common/config.cpp index df9f7c5d..2a7b1432 100644 --- a/src/Common/config.cpp +++ b/src/Common/config.cpp @@ -101,6 +101,7 @@ const string kEnableAudio = PROTOCOL_FIELD "enable_audio"; const string kAddMuteAudio = PROTOCOL_FIELD "add_mute_audio"; const string kAutoClose = PROTOCOL_FIELD "auto_close"; const string kContinuePushMS = PROTOCOL_FIELD "continue_push_ms"; +const string kPacedSenderMS = PROTOCOL_FIELD "paced_sender_ms"; const string kEnableHls = PROTOCOL_FIELD "enable_hls"; const string kEnableHlsFmp4 = PROTOCOL_FIELD "enable_hls_fmp4"; @@ -127,6 +128,7 @@ static onceToken token([]() { mINI::Instance()[kEnableAudio] = 1; mINI::Instance()[kAddMuteAudio] = 1; mINI::Instance()[kContinuePushMS] = 15000; + mINI::Instance()[kPacedSenderMS] = 0; mINI::Instance()[kAutoClose] = 0; mINI::Instance()[kEnableHls] = 1; diff --git a/src/Common/config.h b/src/Common/config.h index 68726225..65bd3715 100644 --- a/src/Common/config.h +++ b/src/Common/config.h @@ -196,6 +196,9 @@ extern const std::string kAddMuteAudio; extern const std::string kAutoClose; //断连续推延时,单位毫秒,默认采用配置文件 extern const std::string kContinuePushMS; +// 平滑发送定时器间隔,单位毫秒,置0则关闭;开启后影响cpu性能同时增加内存 +// 该配置开启后可以解决一些流发送不平滑导致zlmediakit转发也不平滑的问题 +extern const std::string kPacedSenderMS; //是否开启转换为hls(mpegts) extern const std::string kEnableHls;