From e52c1cc510eab18d307cc362c8820547bb2781bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=A4=8F=E6=A5=9A?= <771730766@qq.com> Date: Sat, 8 Jul 2023 21:35:09 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8DaddFFmpegSource=E6=B7=BB?= =?UTF-8?q?=E5=8A=A0=E7=9A=84=E6=B5=81=E4=BA=8B=E4=BB=B6=E6=8B=A6=E6=88=AA?= =?UTF-8?q?=E5=8F=AF=E8=83=BD=E5=A4=B1=E6=95=88=E9=97=AE=E9=A2=98=20(#2642?= =?UTF-8?q?=20#2629)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 此pr主要为了修复 #2629,通过新增getMuxer接口, 可以直接获取到所有协议共享的MultiMediaSourceMuxer对象, 在此对象完成事件拦截,防止某种协议事件丢失。 同时调整了下FFmpegSource.cpp代码格式。 --- server/FFmpegSource.cpp | 163 +++++++++++++-------------- server/FFmpegSource.h | 2 - src/Common/MediaSource.cpp | 26 ++--- src/Common/MediaSource.h | 8 +- src/Common/MultiMediaSourceMuxer.cpp | 12 ++ src/Common/MultiMediaSourceMuxer.h | 10 +- 6 files changed, 115 insertions(+), 106 deletions(-) diff --git a/server/FFmpegSource.cpp b/server/FFmpegSource.cpp index fa9044ed..8363950c 100644 --- a/server/FFmpegSource.cpp +++ b/server/FFmpegSource.cpp @@ -11,6 +11,7 @@ #include "FFmpegSource.h" #include "Common/config.h" #include "Common/MediaSource.h" +#include "Common/MultiMediaSourceMuxer.h" #include "Util/File.h" #include "System.h" #include "Thread/WorkThreadPool.h" @@ -70,10 +71,10 @@ void FFmpegSource::setupRecordFlag(bool enable_hls, bool enable_mp4){ _enable_mp4 = enable_mp4; } -void FFmpegSource::play(const string &ffmpeg_cmd_key, const string &src_url,const string &dst_url,int timeout_ms,const onPlay &cb) { - GET_CONFIG(string,ffmpeg_bin,FFmpeg::kBin); - GET_CONFIG(string,ffmpeg_cmd_default,FFmpeg::kCmd); - GET_CONFIG(string,ffmpeg_log,FFmpeg::kLog); +void FFmpegSource::play(const string &ffmpeg_cmd_key, const string &src_url, const string &dst_url, int timeout_ms, const onPlay &cb) { + GET_CONFIG(string, ffmpeg_bin, FFmpeg::kBin); + GET_CONFIG(string, ffmpeg_cmd_default, FFmpeg::kCmd); + GET_CONFIG(string, ffmpeg_log, FFmpeg::kLog); _src_url = src_url; _dst_url = dst_url; @@ -91,120 +92,114 @@ void FFmpegSource::play(const string &ffmpeg_cmd_key, const string &src_url,cons auto cmd_it = mINI::Instance().find(ffmpeg_cmd_key); if (cmd_it != mINI::Instance().end()) { ffmpeg_cmd = cmd_it->second; - } else{ + } else { WarnL << "配置文件中,ffmpeg命令模板(" << ffmpeg_cmd_key << ")不存在,已采用默认模板(" << ffmpeg_cmd_default << ")"; } } - char cmd[2048] = {0}; + char cmd[2048] = { 0 }; snprintf(cmd, sizeof(cmd), ffmpeg_cmd.data(), File::absolutePath("", ffmpeg_bin).data(), src_url.data(), dst_url.data()); auto log_file = ffmpeg_log.empty() ? "" : File::absolutePath("", ffmpeg_log); _process.run(cmd, log_file); InfoL << cmd; if (is_local_ip(_media_info.host)) { - //推流给自己的,通过判断流是否注册上来判断是否正常 + // 推流给自己的,通过判断流是否注册上来判断是否正常 if (_media_info.schema != RTSP_SCHEMA && _media_info.schema != RTMP_SCHEMA) { - cb(SockException(Err_other,"本服务只支持rtmp/rtsp推流")); + cb(SockException(Err_other, "本服务只支持rtmp/rtsp推流")); return; } weak_ptr weakSelf = shared_from_this(); - findAsync(timeout_ms,[cb,weakSelf,timeout_ms](const MediaSource::Ptr &src){ + findAsync(timeout_ms, [cb, weakSelf, timeout_ms](const MediaSource::Ptr &src) { auto strongSelf = weakSelf.lock(); - if(!strongSelf){ - //自己已经销毁 + if (!strongSelf) { + // 自己已经销毁 return; } - if(src){ - //推流给自己成功 + if (src) { + // 推流给自己成功 cb(SockException()); strongSelf->onGetMediaSource(src); strongSelf->startTimer(timeout_ms); return; } //推流失败 - if(!strongSelf->_process.wait(false)){ - //ffmpeg进程已经退出 - cb(SockException(Err_other,StrPrinter << "ffmpeg已经退出,exit code = " << strongSelf->_process.exit_code())); + if (!strongSelf->_process.wait(false)) { + // ffmpeg进程已经退出 + cb(SockException(Err_other, StrPrinter << "ffmpeg已经退出,exit code = " << strongSelf->_process.exit_code())); return; } - //ffmpeg进程还在线,但是等待推流超时 - cb(SockException(Err_other,"等待超时")); + // ffmpeg进程还在线,但是等待推流超时 + cb(SockException(Err_other, "等待超时")); }); } else{ //推流给其他服务器的,通过判断FFmpeg进程是否在线判断是否成功 weak_ptr weakSelf = shared_from_this(); - _timer = std::make_shared(timeout_ms / 1000.0f,[weakSelf,cb,timeout_ms](){ + _timer = std::make_shared(timeout_ms / 1000.0f, [weakSelf, cb, timeout_ms]() { auto strongSelf = weakSelf.lock(); - if(!strongSelf){ - //自身已经销毁 + if (!strongSelf) { + // 自身已经销毁 return false; } - //FFmpeg还在线,那么我们认为推流成功 - if(strongSelf->_process.wait(false)){ + // FFmpeg还在线,那么我们认为推流成功 + if (strongSelf->_process.wait(false)) { cb(SockException()); strongSelf->startTimer(timeout_ms); return false; } - //ffmpeg进程已经退出 - cb(SockException(Err_other,StrPrinter << "ffmpeg已经退出,exit code = " << strongSelf->_process.exit_code())); + // ffmpeg进程已经退出 + cb(SockException(Err_other, StrPrinter << "ffmpeg已经退出,exit code = " << strongSelf->_process.exit_code())); return false; - },_poller); + }, _poller); } } void FFmpegSource::findAsync(int maxWaitMS, const function &cb) { - auto src = MediaSource::find(_media_info.schema, - _media_info.vhost, - _media_info.app, - _media_info.stream); - if(src || !maxWaitMS){ + auto src = MediaSource::find(_media_info.schema, _media_info.vhost, _media_info.app, _media_info.stream); + if (src || !maxWaitMS) { cb(src); return; } void *listener_tag = this; - //若干秒后执行等待媒体注册超时回调 - auto onRegistTimeout = _poller->doDelayTask(maxWaitMS,[cb,listener_tag](){ - //取消监听该事件 - NoticeCenter::Instance().delListener(listener_tag,Broadcast::kBroadcastMediaChanged); + // 若干秒后执行等待媒体注册超时回调 + auto onRegistTimeout = _poller->doDelayTask(maxWaitMS, [cb, listener_tag]() { + // 取消监听该事件 + NoticeCenter::Instance().delListener(listener_tag, Broadcast::kBroadcastMediaChanged); cb(nullptr); return 0; }); weak_ptr weakSelf = shared_from_this(); - auto onRegist = [listener_tag,weakSelf,cb,onRegistTimeout](BroadcastMediaChangedArgs) { + auto onRegist = [listener_tag, weakSelf, cb, onRegistTimeout](BroadcastMediaChangedArgs) { auto strongSelf = weakSelf.lock(); - if(!strongSelf) { - //本身已经销毁,取消延时任务 + if (!strongSelf) { + // 本身已经销毁,取消延时任务 onRegistTimeout->cancel(); - NoticeCenter::Instance().delListener(listener_tag,Broadcast::kBroadcastMediaChanged); + NoticeCenter::Instance().delListener(listener_tag, Broadcast::kBroadcastMediaChanged); return; } - if (!bRegist || - sender.getSchema() != strongSelf->_media_info.schema || + if (!bRegist || sender.getSchema() != strongSelf->_media_info.schema || !equalMediaTuple(sender.getMediaTuple(), strongSelf->_media_info)) { - //不是自己感兴趣的事件,忽略之 + // 不是自己感兴趣的事件,忽略之 return; } - //查找的流终于注册上了;取消延时任务,防止多次回调 + // 查找的流终于注册上了;取消延时任务,防止多次回调 onRegistTimeout->cancel(); - //取消事件监听 - NoticeCenter::Instance().delListener(listener_tag,Broadcast::kBroadcastMediaChanged); + // 取消事件监听 + NoticeCenter::Instance().delListener(listener_tag, Broadcast::kBroadcastMediaChanged); - //切换到自己的线程再回复 - strongSelf->_poller->async([weakSelf,cb](){ - auto strongSelf = weakSelf.lock(); - if(!strongSelf) { - return; + // 切换到自己的线程再回复 + strongSelf->_poller->async([weakSelf, cb]() { + if (auto strongSelf = weakSelf.lock()) { + // 再找一遍媒体源,一般能找到 + strongSelf->findAsync(0, cb); } - //再找一遍媒体源,一般能找到 - strongSelf->findAsync(0,cb); }, false); }; - //监听媒体注册事件 + // 监听媒体注册事件 NoticeCenter::Instance().addListener(listener_tag, Broadcast::kBroadcastMediaChanged, onRegist); } @@ -222,49 +217,49 @@ void FFmpegSource::startTimer(int timeout_ms) { } bool needRestart = ffmpeg_restart_sec > 0 && strongSelf->_replay_ticker.elapsedTime() > ffmpeg_restart_sec * 1000; if (is_local_ip(strongSelf->_media_info.host)) { - //推流给自己的,我们通过检查是否已经注册来判断FFmpeg是否工作正常 + // 推流给自己的,我们通过检查是否已经注册来判断FFmpeg是否工作正常 strongSelf->findAsync(0, [&](const MediaSource::Ptr &src) { - //同步查找流 + // 同步查找流 if (!src || needRestart) { - if(needRestart){ + if (needRestart) { strongSelf->_replay_ticker.resetTime(); - if(strongSelf->_process.wait(false)){ - //FFmpeg进程还在运行,超时就关闭它 + if (strongSelf->_process.wait(false)) { + // FFmpeg进程还在运行,超时就关闭它 strongSelf->_process.kill(2000); } InfoL << "FFmpeg即将重启, 将会继续拉流 " << strongSelf->_src_url; } - //流不在线,重新拉流, 这里原先是10秒超时,实际发现10秒不够,改成20秒了 - if(strongSelf->_replay_ticker.elapsedTime() > 20 * 1000){ - //上次重试时间超过10秒,那么再重试FFmpeg拉流 + // 流不在线,重新拉流, 这里原先是10秒超时,实际发现10秒不够,改成20秒了 + if (strongSelf->_replay_ticker.elapsedTime() > 20 * 1000) { + // 上次重试时间超过10秒,那么再重试FFmpeg拉流 strongSelf->_replay_ticker.resetTime(); strongSelf->play(strongSelf->_ffmpeg_cmd_key, strongSelf->_src_url, strongSelf->_dst_url, timeout_ms, [](const SockException &) {}); } } }); } else { - //推流给其他服务器的,我们通过判断FFmpeg进程是否在线,如果FFmpeg推流中断,那么它应该会自动退出 + // 推流给其他服务器的,我们通过判断FFmpeg进程是否在线,如果FFmpeg推流中断,那么它应该会自动退出 if (!strongSelf->_process.wait(false) || needRestart) { - if(needRestart){ + if (needRestart) { strongSelf->_replay_ticker.resetTime(); - if(strongSelf->_process.wait(false)){ - //FFmpeg进程还在运行,超时就关闭它 + if (strongSelf->_process.wait(false)) { + // FFmpeg进程还在运行,超时就关闭它 strongSelf->_process.kill(2000); } InfoL << "FFmpeg即将重启, 将会继续拉流 " << strongSelf->_src_url; } - //ffmpeg不在线,重新拉流 + // ffmpeg不在线,重新拉流 strongSelf->play(strongSelf->_ffmpeg_cmd_key, strongSelf->_src_url, strongSelf->_dst_url, timeout_ms, [weakSelf](const SockException &ex) { - if(!ex){ - //没有错误 + if (!ex) { + // 没有错误 return; } auto strongSelf = weakSelf.lock(); if (!strongSelf) { - //自身已经销毁 + // 自身已经销毁 return; } - //上次重试时间超过10秒,那么再重试FFmpeg拉流 + // 上次重试时间超过10秒,那么再重试FFmpeg拉流 strongSelf->startTimer(10 * 1000); }); } @@ -294,20 +289,17 @@ MediaOriginType FFmpegSource::getOriginType(MediaSource &sender) const{ return MediaOriginType::ffmpeg_pull; } -string FFmpegSource::getOriginUrl(MediaSource &sender) const{ +string FFmpegSource::getOriginUrl(MediaSource &sender) const { return _src_url; } -std::shared_ptr FFmpegSource::getOriginSock(MediaSource &sender) const { - return nullptr; -} - void FFmpegSource::onGetMediaSource(const MediaSource::Ptr &src) { - auto listener = src->getListener(true); - if (listener.lock().get() != this) { + auto muxer = src->getMuxer(); + auto listener = muxer ? muxer->getDelegate() : nullptr; + if (listener && listener.get() != this) { //防止多次进入onGetMediaSource函数导致无限递归调用的bug setDelegate(listener); - src->setListener(shared_from_this()); + muxer->setDelegate(shared_from_this()); if (_enable_hls) { src->setupRecord(Recorder::type_hls, true, "", 0); } @@ -318,14 +310,14 @@ void FFmpegSource::onGetMediaSource(const MediaSource::Ptr &src) { } void FFmpegSnap::makeSnap(const string &play_url, const string &save_path, float timeout_sec, const onSnap &cb) { - GET_CONFIG(string,ffmpeg_bin,FFmpeg::kBin); - GET_CONFIG(string,ffmpeg_snap,FFmpeg::kSnap); - GET_CONFIG(string,ffmpeg_log,FFmpeg::kLog); + GET_CONFIG(string, ffmpeg_bin, FFmpeg::kBin); + GET_CONFIG(string, ffmpeg_snap, FFmpeg::kSnap); + GET_CONFIG(string, ffmpeg_log, FFmpeg::kLog); Ticker ticker; - WorkThreadPool::Instance().getPoller()->async([timeout_sec, play_url,save_path,cb, ticker](){ + WorkThreadPool::Instance().getPoller()->async([timeout_sec, play_url, save_path, cb, ticker]() { auto elapsed_ms = ticker.elapsedTime(); if (elapsed_ms > timeout_sec * 1000) { - //超时,后台线程负载太高,当代太久才启动该任务 + // 超时,后台线程负载太高,当代太久才启动该任务 cb(false, "wait work poller schedule snap task timeout"); return; } @@ -346,13 +338,12 @@ void FFmpegSnap::makeSnap(const string &play_url, const string &save_path, float return 0; }); - //等待FFmpeg进程退出 + // 等待FFmpeg进程退出 process->wait(true); // FFmpeg进程退出了可以取消定时器了 delayTask->cancel(); - //执行回调函数 + // 执行回调函数 bool success = process->exit_code() == 0 && File::fileSize(save_path.data()); cb(success, (!success && !log_file.empty()) ? File::loadFile(log_file.data()) : ""); }); } - diff --git a/server/FFmpegSource.h b/server/FFmpegSource.h index e85df3ec..68e3c440 100644 --- a/server/FFmpegSource.h +++ b/server/FFmpegSource.h @@ -79,8 +79,6 @@ private: mediakit::MediaOriginType getOriginType(mediakit::MediaSource &sender) const override; //获取媒体源url或者文件路径 std::string getOriginUrl(mediakit::MediaSource &sender) const override; - // 获取媒体源客户端相关信息 - std::shared_ptr getOriginSock(mediakit::MediaSource &sender) const override; private: bool _enable_hls = false; diff --git a/src/Common/MediaSource.cpp b/src/Common/MediaSource.cpp index 16adfd8d..9b494f16 100644 --- a/src/Common/MediaSource.cpp +++ b/src/Common/MediaSource.cpp @@ -172,20 +172,8 @@ void MediaSource::setListener(const std::weak_ptr &listener){ _listener = listener; } -std::weak_ptr MediaSource::getListener(bool next) const{ - if (!next) { - return _listener; - } - - auto listener = dynamic_pointer_cast(_listener.lock()); - if (!listener) { - //不是MediaSourceEventInterceptor对象或者对象已经销毁 - return _listener; - } - //获取被拦截的对象 - auto next_obj = listener->getDelegate(); - //有则返回之 - return next_obj ? next_obj : _listener; +std::weak_ptr MediaSource::getListener() const { + return _listener; } int MediaSource::totalReaderCount(){ @@ -277,6 +265,11 @@ toolkit::EventPoller::Ptr MediaSource::getOwnerPoller() { throw std::runtime_error(toolkit::demangle(typeid(*this).name()) + "::getOwnerPoller failed: " + getUrl()); } +std::shared_ptr MediaSource::getMuxer() { + auto listener = _listener.lock(); + return listener ? listener->getMuxer(*this) : nullptr; +} + void MediaSource::onReaderChanged(int size) { try { weak_ptr weak_self = shared_from_this(); @@ -780,6 +773,11 @@ toolkit::EventPoller::Ptr MediaSourceEventInterceptor::getOwnerPoller(MediaSourc throw std::runtime_error(toolkit::demangle(typeid(*this).name()) + "::getOwnerPoller failed"); } +std::shared_ptr MediaSourceEventInterceptor::getMuxer(MediaSource &sender) { + auto listener = _listener.lock(); + return listener ? listener->getMuxer(sender) : nullptr; +} + bool MediaSourceEventInterceptor::setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path, size_t max_second) { auto listener = _listener.lock(); if (!listener) { diff --git a/src/Common/MediaSource.h b/src/Common/MediaSource.h index 0f58456d..8d2c7658 100644 --- a/src/Common/MediaSource.h +++ b/src/Common/MediaSource.h @@ -41,6 +41,7 @@ enum class MediaOriginType : uint8_t { std::string getOriginTypeString(MediaOriginType type); class MediaSource; +class MultiMediaSourceMuxer; class MediaSourceEvent { public: friend class MediaSource; @@ -88,6 +89,8 @@ public: virtual bool isRecording(MediaSource &sender, Recorder::type type) { return false; } // 获取所有track相关信息 virtual std::vector getMediaTracks(MediaSource &sender, bool trackReady = true) const { return std::vector(); }; + // 获取MultiMediaSourceMuxer对象 + virtual std::shared_ptr getMuxer(MediaSource &sender) { return nullptr; } class SendRtpArgs { public: @@ -257,6 +260,7 @@ public: bool stopSendRtp(MediaSource &sender, const std::string &ssrc) override; float getLossRate(MediaSource &sender, TrackType type) override; toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) override; + std::shared_ptr getMuxer(MediaSource &sender) override; private: std::weak_ptr _listener; @@ -330,7 +334,7 @@ public: // 设置监听者 virtual void setListener(const std::weak_ptr &listener); // 获取监听者 - std::weak_ptr getListener(bool next = false) const; + std::weak_ptr getListener() const; // 本协议获取观看者个数,可能返回本协议的观看人数,也可能返回总人数 virtual int readerCount() = 0; @@ -372,6 +376,8 @@ public: float getLossRate(mediakit::TrackType type); // 获取所在线程 toolkit::EventPoller::Ptr getOwnerPoller(); + // 获取MultiMediaSourceMuxer对象 + std::shared_ptr getMuxer(); ////////////////static方法,查找或生成MediaSource//////////////// diff --git a/src/Common/MultiMediaSourceMuxer.cpp b/src/Common/MultiMediaSourceMuxer.cpp index 40fb1eb6..0446abf6 100644 --- a/src/Common/MultiMediaSourceMuxer.cpp +++ b/src/Common/MultiMediaSourceMuxer.cpp @@ -71,6 +71,14 @@ static string getTrackInfoStr(const TrackSource *track_src){ return std::move(codec_info); } +const ProtocolOption &MultiMediaSourceMuxer::getOption() const { + return _option; +} + +const MediaTuple &MultiMediaSourceMuxer::getMediaTuple() const { + return _tuple; +} + std::string MultiMediaSourceMuxer::shortUrl() const { auto ret = getOriginUrl(MediaSource::NullMediaSource()); if (!ret.empty()) { @@ -361,6 +369,10 @@ EventPoller::Ptr MultiMediaSourceMuxer::getOwnerPoller(MediaSource &sender) { } } +std::shared_ptr MultiMediaSourceMuxer::getMuxer(MediaSource &sender) { + return shared_from_this(); +} + bool MultiMediaSourceMuxer::onTrackReady(const Track::Ptr &track) { bool ret = false; if (_rtmp) { diff --git a/src/Common/MultiMediaSourceMuxer.h b/src/Common/MultiMediaSourceMuxer.h index 22b53c75..4db34d42 100644 --- a/src/Common/MultiMediaSourceMuxer.h +++ b/src/Common/MultiMediaSourceMuxer.h @@ -126,9 +126,13 @@ public: */ toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) override; - const MediaTuple& getMediaTuple() const { - return _tuple; - } + /** + * 获取本对象 + */ + std::shared_ptr getMuxer(MediaSource &sender) override; + + const ProtocolOption &getOption() const; + const MediaTuple &getMediaTuple() const; std::string shortUrl() const; protected: