修复addFFmpegSource添加的流事件拦截可能失效问题 (#2642 #2629)

此pr主要为了修复 #2629,通过新增getMuxer接口,
可以直接获取到所有协议共享的MultiMediaSourceMuxer对象,
在此对象完成事件拦截,防止某种协议事件丢失。
同时调整了下FFmpegSource.cpp代码格式。
This commit is contained in:
夏楚 2023-07-08 21:35:09 +08:00 committed by GitHub
parent fad8dd74e7
commit e52c1cc510
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 115 additions and 106 deletions

View File

@ -11,6 +11,7 @@
#include "FFmpegSource.h"
#include "Common/config.h"
#include "Common/MediaSource.h"
#include "Common/MultiMediaSourceMuxer.h"
#include "Util/File.h"
#include "System.h"
#include "Thread/WorkThreadPool.h"
@ -70,10 +71,10 @@ void FFmpegSource::setupRecordFlag(bool enable_hls, bool enable_mp4){
_enable_mp4 = enable_mp4;
}
void FFmpegSource::play(const string &ffmpeg_cmd_key, const string &src_url,const string &dst_url,int timeout_ms,const onPlay &cb) {
GET_CONFIG(string,ffmpeg_bin,FFmpeg::kBin);
GET_CONFIG(string,ffmpeg_cmd_default,FFmpeg::kCmd);
GET_CONFIG(string,ffmpeg_log,FFmpeg::kLog);
void FFmpegSource::play(const string &ffmpeg_cmd_key, const string &src_url, const string &dst_url, int timeout_ms, const onPlay &cb) {
GET_CONFIG(string, ffmpeg_bin, FFmpeg::kBin);
GET_CONFIG(string, ffmpeg_cmd_default, FFmpeg::kCmd);
GET_CONFIG(string, ffmpeg_log, FFmpeg::kLog);
_src_url = src_url;
_dst_url = dst_url;
@ -91,120 +92,114 @@ void FFmpegSource::play(const string &ffmpeg_cmd_key, const string &src_url,cons
auto cmd_it = mINI::Instance().find(ffmpeg_cmd_key);
if (cmd_it != mINI::Instance().end()) {
ffmpeg_cmd = cmd_it->second;
} else{
} else {
WarnL << "配置文件中,ffmpeg命令模板(" << ffmpeg_cmd_key << ")不存在,已采用默认模板(" << ffmpeg_cmd_default << ")";
}
}
char cmd[2048] = {0};
char cmd[2048] = { 0 };
snprintf(cmd, sizeof(cmd), ffmpeg_cmd.data(), File::absolutePath("", ffmpeg_bin).data(), src_url.data(), dst_url.data());
auto log_file = ffmpeg_log.empty() ? "" : File::absolutePath("", ffmpeg_log);
_process.run(cmd, log_file);
InfoL << cmd;
if (is_local_ip(_media_info.host)) {
//推流给自己的,通过判断流是否注册上来判断是否正常
// 推流给自己的,通过判断流是否注册上来判断是否正常
if (_media_info.schema != RTSP_SCHEMA && _media_info.schema != RTMP_SCHEMA) {
cb(SockException(Err_other,"本服务只支持rtmp/rtsp推流"));
cb(SockException(Err_other, "本服务只支持rtmp/rtsp推流"));
return;
}
weak_ptr<FFmpegSource> weakSelf = shared_from_this();
findAsync(timeout_ms,[cb,weakSelf,timeout_ms](const MediaSource::Ptr &src){
findAsync(timeout_ms, [cb, weakSelf, timeout_ms](const MediaSource::Ptr &src) {
auto strongSelf = weakSelf.lock();
if(!strongSelf){
//自己已经销毁
if (!strongSelf) {
// 自己已经销毁
return;
}
if(src){
//推流给自己成功
if (src) {
// 推流给自己成功
cb(SockException());
strongSelf->onGetMediaSource(src);
strongSelf->startTimer(timeout_ms);
return;
}
//推流失败
if(!strongSelf->_process.wait(false)){
//ffmpeg进程已经退出
cb(SockException(Err_other,StrPrinter << "ffmpeg已经退出,exit code = " << strongSelf->_process.exit_code()));
if (!strongSelf->_process.wait(false)) {
// ffmpeg进程已经退出
cb(SockException(Err_other, StrPrinter << "ffmpeg已经退出,exit code = " << strongSelf->_process.exit_code()));
return;
}
//ffmpeg进程还在线但是等待推流超时
cb(SockException(Err_other,"等待超时"));
// ffmpeg进程还在线但是等待推流超时
cb(SockException(Err_other, "等待超时"));
});
} else{
//推流给其他服务器的通过判断FFmpeg进程是否在线判断是否成功
weak_ptr<FFmpegSource> weakSelf = shared_from_this();
_timer = std::make_shared<Timer>(timeout_ms / 1000.0f,[weakSelf,cb,timeout_ms](){
_timer = std::make_shared<Timer>(timeout_ms / 1000.0f, [weakSelf, cb, timeout_ms]() {
auto strongSelf = weakSelf.lock();
if(!strongSelf){
//自身已经销毁
if (!strongSelf) {
// 自身已经销毁
return false;
}
//FFmpeg还在线那么我们认为推流成功
if(strongSelf->_process.wait(false)){
// FFmpeg还在线那么我们认为推流成功
if (strongSelf->_process.wait(false)) {
cb(SockException());
strongSelf->startTimer(timeout_ms);
return false;
}
//ffmpeg进程已经退出
cb(SockException(Err_other,StrPrinter << "ffmpeg已经退出,exit code = " << strongSelf->_process.exit_code()));
// ffmpeg进程已经退出
cb(SockException(Err_other, StrPrinter << "ffmpeg已经退出,exit code = " << strongSelf->_process.exit_code()));
return false;
},_poller);
}, _poller);
}
}
void FFmpegSource::findAsync(int maxWaitMS, const function<void(const MediaSource::Ptr &src)> &cb) {
auto src = MediaSource::find(_media_info.schema,
_media_info.vhost,
_media_info.app,
_media_info.stream);
if(src || !maxWaitMS){
auto src = MediaSource::find(_media_info.schema, _media_info.vhost, _media_info.app, _media_info.stream);
if (src || !maxWaitMS) {
cb(src);
return;
}
void *listener_tag = this;
//若干秒后执行等待媒体注册超时回调
auto onRegistTimeout = _poller->doDelayTask(maxWaitMS,[cb,listener_tag](){
//取消监听该事件
NoticeCenter::Instance().delListener(listener_tag,Broadcast::kBroadcastMediaChanged);
// 若干秒后执行等待媒体注册超时回调
auto onRegistTimeout = _poller->doDelayTask(maxWaitMS, [cb, listener_tag]() {
// 取消监听该事件
NoticeCenter::Instance().delListener(listener_tag, Broadcast::kBroadcastMediaChanged);
cb(nullptr);
return 0;
});
weak_ptr<FFmpegSource> weakSelf = shared_from_this();
auto onRegist = [listener_tag,weakSelf,cb,onRegistTimeout](BroadcastMediaChangedArgs) {
auto onRegist = [listener_tag, weakSelf, cb, onRegistTimeout](BroadcastMediaChangedArgs) {
auto strongSelf = weakSelf.lock();
if(!strongSelf) {
//本身已经销毁,取消延时任务
if (!strongSelf) {
// 本身已经销毁,取消延时任务
onRegistTimeout->cancel();
NoticeCenter::Instance().delListener(listener_tag,Broadcast::kBroadcastMediaChanged);
NoticeCenter::Instance().delListener(listener_tag, Broadcast::kBroadcastMediaChanged);
return;
}
if (!bRegist ||
sender.getSchema() != strongSelf->_media_info.schema ||
if (!bRegist || sender.getSchema() != strongSelf->_media_info.schema ||
!equalMediaTuple(sender.getMediaTuple(), strongSelf->_media_info)) {
//不是自己感兴趣的事件,忽略之
// 不是自己感兴趣的事件,忽略之
return;
}
//查找的流终于注册上了;取消延时任务,防止多次回调
// 查找的流终于注册上了;取消延时任务,防止多次回调
onRegistTimeout->cancel();
//取消事件监听
NoticeCenter::Instance().delListener(listener_tag,Broadcast::kBroadcastMediaChanged);
// 取消事件监听
NoticeCenter::Instance().delListener(listener_tag, Broadcast::kBroadcastMediaChanged);
//切换到自己的线程再回复
strongSelf->_poller->async([weakSelf,cb](){
auto strongSelf = weakSelf.lock();
if(!strongSelf) {
return;
// 切换到自己的线程再回复
strongSelf->_poller->async([weakSelf, cb]() {
if (auto strongSelf = weakSelf.lock()) {
// 再找一遍媒体源,一般能找到
strongSelf->findAsync(0, cb);
}
//再找一遍媒体源,一般能找到
strongSelf->findAsync(0,cb);
}, false);
};
//监听媒体注册事件
// 监听媒体注册事件
NoticeCenter::Instance().addListener(listener_tag, Broadcast::kBroadcastMediaChanged, onRegist);
}
@ -222,49 +217,49 @@ void FFmpegSource::startTimer(int timeout_ms) {
}
bool needRestart = ffmpeg_restart_sec > 0 && strongSelf->_replay_ticker.elapsedTime() > ffmpeg_restart_sec * 1000;
if (is_local_ip(strongSelf->_media_info.host)) {
//推流给自己的我们通过检查是否已经注册来判断FFmpeg是否工作正常
// 推流给自己的我们通过检查是否已经注册来判断FFmpeg是否工作正常
strongSelf->findAsync(0, [&](const MediaSource::Ptr &src) {
//同步查找流
// 同步查找流
if (!src || needRestart) {
if(needRestart){
if (needRestart) {
strongSelf->_replay_ticker.resetTime();
if(strongSelf->_process.wait(false)){
//FFmpeg进程还在运行超时就关闭它
if (strongSelf->_process.wait(false)) {
// FFmpeg进程还在运行超时就关闭它
strongSelf->_process.kill(2000);
}
InfoL << "FFmpeg即将重启, 将会继续拉流 " << strongSelf->_src_url;
}
//流不在线,重新拉流, 这里原先是10秒超时实际发现10秒不够改成20秒了
if(strongSelf->_replay_ticker.elapsedTime() > 20 * 1000){
//上次重试时间超过10秒那么再重试FFmpeg拉流
// 流不在线,重新拉流, 这里原先是10秒超时实际发现10秒不够改成20秒了
if (strongSelf->_replay_ticker.elapsedTime() > 20 * 1000) {
// 上次重试时间超过10秒那么再重试FFmpeg拉流
strongSelf->_replay_ticker.resetTime();
strongSelf->play(strongSelf->_ffmpeg_cmd_key, strongSelf->_src_url, strongSelf->_dst_url, timeout_ms, [](const SockException &) {});
}
}
});
} else {
//推流给其他服务器的我们通过判断FFmpeg进程是否在线如果FFmpeg推流中断那么它应该会自动退出
// 推流给其他服务器的我们通过判断FFmpeg进程是否在线如果FFmpeg推流中断那么它应该会自动退出
if (!strongSelf->_process.wait(false) || needRestart) {
if(needRestart){
if (needRestart) {
strongSelf->_replay_ticker.resetTime();
if(strongSelf->_process.wait(false)){
//FFmpeg进程还在运行超时就关闭它
if (strongSelf->_process.wait(false)) {
// FFmpeg进程还在运行超时就关闭它
strongSelf->_process.kill(2000);
}
InfoL << "FFmpeg即将重启, 将会继续拉流 " << strongSelf->_src_url;
}
//ffmpeg不在线重新拉流
// ffmpeg不在线重新拉流
strongSelf->play(strongSelf->_ffmpeg_cmd_key, strongSelf->_src_url, strongSelf->_dst_url, timeout_ms, [weakSelf](const SockException &ex) {
if(!ex){
//没有错误
if (!ex) {
// 没有错误
return;
}
auto strongSelf = weakSelf.lock();
if (!strongSelf) {
//自身已经销毁
// 自身已经销毁
return;
}
//上次重试时间超过10秒那么再重试FFmpeg拉流
// 上次重试时间超过10秒那么再重试FFmpeg拉流
strongSelf->startTimer(10 * 1000);
});
}
@ -294,20 +289,17 @@ MediaOriginType FFmpegSource::getOriginType(MediaSource &sender) const{
return MediaOriginType::ffmpeg_pull;
}
string FFmpegSource::getOriginUrl(MediaSource &sender) const{
string FFmpegSource::getOriginUrl(MediaSource &sender) const {
return _src_url;
}
std::shared_ptr<SockInfo> FFmpegSource::getOriginSock(MediaSource &sender) const {
return nullptr;
}
void FFmpegSource::onGetMediaSource(const MediaSource::Ptr &src) {
auto listener = src->getListener(true);
if (listener.lock().get() != this) {
auto muxer = src->getMuxer();
auto listener = muxer ? muxer->getDelegate() : nullptr;
if (listener && listener.get() != this) {
//防止多次进入onGetMediaSource函数导致无限递归调用的bug
setDelegate(listener);
src->setListener(shared_from_this());
muxer->setDelegate(shared_from_this());
if (_enable_hls) {
src->setupRecord(Recorder::type_hls, true, "", 0);
}
@ -318,14 +310,14 @@ void FFmpegSource::onGetMediaSource(const MediaSource::Ptr &src) {
}
void FFmpegSnap::makeSnap(const string &play_url, const string &save_path, float timeout_sec, const onSnap &cb) {
GET_CONFIG(string,ffmpeg_bin,FFmpeg::kBin);
GET_CONFIG(string,ffmpeg_snap,FFmpeg::kSnap);
GET_CONFIG(string,ffmpeg_log,FFmpeg::kLog);
GET_CONFIG(string, ffmpeg_bin, FFmpeg::kBin);
GET_CONFIG(string, ffmpeg_snap, FFmpeg::kSnap);
GET_CONFIG(string, ffmpeg_log, FFmpeg::kLog);
Ticker ticker;
WorkThreadPool::Instance().getPoller()->async([timeout_sec, play_url,save_path,cb, ticker](){
WorkThreadPool::Instance().getPoller()->async([timeout_sec, play_url, save_path, cb, ticker]() {
auto elapsed_ms = ticker.elapsedTime();
if (elapsed_ms > timeout_sec * 1000) {
//超时,后台线程负载太高,当代太久才启动该任务
// 超时,后台线程负载太高,当代太久才启动该任务
cb(false, "wait work poller schedule snap task timeout");
return;
}
@ -346,13 +338,12 @@ void FFmpegSnap::makeSnap(const string &play_url, const string &save_path, float
return 0;
});
//等待FFmpeg进程退出
// 等待FFmpeg进程退出
process->wait(true);
// FFmpeg进程退出了可以取消定时器了
delayTask->cancel();
//执行回调函数
// 执行回调函数
bool success = process->exit_code() == 0 && File::fileSize(save_path.data());
cb(success, (!success && !log_file.empty()) ? File::loadFile(log_file.data()) : "");
});
}

View File

@ -79,8 +79,6 @@ private:
mediakit::MediaOriginType getOriginType(mediakit::MediaSource &sender) const override;
//获取媒体源url或者文件路径
std::string getOriginUrl(mediakit::MediaSource &sender) const override;
// 获取媒体源客户端相关信息
std::shared_ptr<toolkit::SockInfo> getOriginSock(mediakit::MediaSource &sender) const override;
private:
bool _enable_hls = false;

View File

@ -172,20 +172,8 @@ void MediaSource::setListener(const std::weak_ptr<MediaSourceEvent> &listener){
_listener = listener;
}
std::weak_ptr<MediaSourceEvent> MediaSource::getListener(bool next) const{
if (!next) {
std::weak_ptr<MediaSourceEvent> MediaSource::getListener() const {
return _listener;
}
auto listener = dynamic_pointer_cast<MediaSourceEventInterceptor>(_listener.lock());
if (!listener) {
//不是MediaSourceEventInterceptor对象或者对象已经销毁
return _listener;
}
//获取被拦截的对象
auto next_obj = listener->getDelegate();
//有则返回之
return next_obj ? next_obj : _listener;
}
int MediaSource::totalReaderCount(){
@ -277,6 +265,11 @@ toolkit::EventPoller::Ptr MediaSource::getOwnerPoller() {
throw std::runtime_error(toolkit::demangle(typeid(*this).name()) + "::getOwnerPoller failed: " + getUrl());
}
std::shared_ptr<MultiMediaSourceMuxer> MediaSource::getMuxer() {
auto listener = _listener.lock();
return listener ? listener->getMuxer(*this) : nullptr;
}
void MediaSource::onReaderChanged(int size) {
try {
weak_ptr<MediaSource> weak_self = shared_from_this();
@ -780,6 +773,11 @@ toolkit::EventPoller::Ptr MediaSourceEventInterceptor::getOwnerPoller(MediaSourc
throw std::runtime_error(toolkit::demangle(typeid(*this).name()) + "::getOwnerPoller failed");
}
std::shared_ptr<MultiMediaSourceMuxer> MediaSourceEventInterceptor::getMuxer(MediaSource &sender) {
auto listener = _listener.lock();
return listener ? listener->getMuxer(sender) : nullptr;
}
bool MediaSourceEventInterceptor::setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path, size_t max_second) {
auto listener = _listener.lock();
if (!listener) {

View File

@ -41,6 +41,7 @@ enum class MediaOriginType : uint8_t {
std::string getOriginTypeString(MediaOriginType type);
class MediaSource;
class MultiMediaSourceMuxer;
class MediaSourceEvent {
public:
friend class MediaSource;
@ -88,6 +89,8 @@ public:
virtual bool isRecording(MediaSource &sender, Recorder::type type) { return false; }
// 获取所有track相关信息
virtual std::vector<Track::Ptr> getMediaTracks(MediaSource &sender, bool trackReady = true) const { return std::vector<Track::Ptr>(); };
// 获取MultiMediaSourceMuxer对象
virtual std::shared_ptr<MultiMediaSourceMuxer> getMuxer(MediaSource &sender) { return nullptr; }
class SendRtpArgs {
public:
@ -257,6 +260,7 @@ public:
bool stopSendRtp(MediaSource &sender, const std::string &ssrc) override;
float getLossRate(MediaSource &sender, TrackType type) override;
toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) override;
std::shared_ptr<MultiMediaSourceMuxer> getMuxer(MediaSource &sender) override;
private:
std::weak_ptr<MediaSourceEvent> _listener;
@ -330,7 +334,7 @@ public:
// 设置监听者
virtual void setListener(const std::weak_ptr<MediaSourceEvent> &listener);
// 获取监听者
std::weak_ptr<MediaSourceEvent> getListener(bool next = false) const;
std::weak_ptr<MediaSourceEvent> getListener() const;
// 本协议获取观看者个数,可能返回本协议的观看人数,也可能返回总人数
virtual int readerCount() = 0;
@ -372,6 +376,8 @@ public:
float getLossRate(mediakit::TrackType type);
// 获取所在线程
toolkit::EventPoller::Ptr getOwnerPoller();
// 获取MultiMediaSourceMuxer对象
std::shared_ptr<MultiMediaSourceMuxer> getMuxer();
////////////////static方法查找或生成MediaSource////////////////

View File

@ -71,6 +71,14 @@ static string getTrackInfoStr(const TrackSource *track_src){
return std::move(codec_info);
}
const ProtocolOption &MultiMediaSourceMuxer::getOption() const {
return _option;
}
const MediaTuple &MultiMediaSourceMuxer::getMediaTuple() const {
return _tuple;
}
std::string MultiMediaSourceMuxer::shortUrl() const {
auto ret = getOriginUrl(MediaSource::NullMediaSource());
if (!ret.empty()) {
@ -361,6 +369,10 @@ EventPoller::Ptr MultiMediaSourceMuxer::getOwnerPoller(MediaSource &sender) {
}
}
std::shared_ptr<MultiMediaSourceMuxer> MultiMediaSourceMuxer::getMuxer(MediaSource &sender) {
return shared_from_this();
}
bool MultiMediaSourceMuxer::onTrackReady(const Track::Ptr &track) {
bool ret = false;
if (_rtmp) {

View File

@ -126,9 +126,13 @@ public:
*/
toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) override;
const MediaTuple& getMediaTuple() const {
return _tuple;
}
/**
*
*/
std::shared_ptr<MultiMediaSourceMuxer> getMuxer(MediaSource &sender) override;
const ProtocolOption &getOption() const;
const MediaTuple &getMediaTuple() const;
std::string shortUrl() const;
protected: