修复FFmpeg拉流代理无法通过close_stream接口关闭的bug

This commit is contained in:
xiongziliang 2019-11-18 12:07:11 +08:00
parent ddf351d350
commit 0d42aab9e7
3 changed files with 41 additions and 26 deletions

View File

@ -48,7 +48,6 @@ FFmpegSource::FFmpegSource() {
} }
FFmpegSource::~FFmpegSource() { FFmpegSource::~FFmpegSource() {
NoticeCenter::Instance().delListener(this, Broadcast::kBroadcastStreamNoneReader);
DebugL; DebugL;
} }
@ -83,6 +82,7 @@ void FFmpegSource::play(const string &src_url,const string &dst_url,int timeout_
if(src){ if(src){
//推流给自己成功 //推流给自己成功
cb(SockException()); cb(SockException());
strongSelf->onGetMediaSource(src);
strongSelf->startTimer(timeout_ms); strongSelf->startTimer(timeout_ms);
return; return;
} }
@ -192,8 +192,7 @@ void FFmpegSource::startTimer(int timeout_ms) {
//同步查找流 //同步查找流
if (!src) { if (!src) {
//流不在线,重新拉流 //流不在线,重新拉流
strongSelf->play(strongSelf->_src_url, strongSelf->_dst_url, timeout_ms, strongSelf->play(strongSelf->_src_url, strongSelf->_dst_url, timeout_ms, [](const SockException &) {});
[](const SockException &) {});
} }
}); });
} else { } else {
@ -205,29 +204,35 @@ void FFmpegSource::startTimer(int timeout_ms) {
} }
return true; return true;
}, _poller); }, _poller);
NoticeCenter::Instance().delListener(this, Broadcast::kBroadcastStreamNoneReader);
NoticeCenter::Instance().addListener(this, Broadcast::kBroadcastStreamNoneReader,[weakSelf](BroadcastStreamNoneReaderArgs) {
auto strongSelf = weakSelf.lock();
if (!strongSelf) {
//自身已经销毁
return;
}
if(sender.getVhost() != strongSelf->_media_info._vhost ||
sender.getApp() != strongSelf->_media_info._app ||
sender.getId() != strongSelf->_media_info._streamid){
//不是自己感兴趣的事件,忽略之
return;
}
//该流无人观看,我们停止吧
if(strongSelf->_onClose){
strongSelf->_onClose();
}
});
} }
void FFmpegSource::setOnClose(const function<void()> &cb){ void FFmpegSource::setOnClose(const function<void()> &cb){
_onClose = cb; _onClose = cb;
} }
bool FFmpegSource::close(MediaSource &sender, bool force) {
auto listener = _listener.lock();
if(listener && !listener->close(sender,force)){
//关闭失败
return false;
}
//该流无人观看,我们停止吧
if(_onClose){
_onClose();
}
return true;
}
void FFmpegSource::onNoneReader(MediaSource &sender) {
auto listener = _listener.lock();
if(listener){
listener->onNoneReader(sender);
}else{
MediaSourceEvent::onNoneReader(sender);
}
}
void FFmpegSource::onGetMediaSource(const MediaSource::Ptr &src) {
_listener = src->getListener();
src->setListener(shared_from_this());
}

View File

@ -39,7 +39,7 @@ using namespace std;
using namespace toolkit; using namespace toolkit;
using namespace mediakit; using namespace mediakit;
class FFmpegSource : public std::enable_shared_from_this<FFmpegSource>{ class FFmpegSource : public std::enable_shared_from_this<FFmpegSource> , public MediaSourceEvent{
public: public:
typedef shared_ptr<FFmpegSource> Ptr; typedef shared_ptr<FFmpegSource> Ptr;
typedef function<void(const SockException &ex)> onPlay; typedef function<void(const SockException &ex)> onPlay;
@ -55,6 +55,10 @@ public:
private: private:
void findAsync(int maxWaitMS ,const function<void(const MediaSource::Ptr &src)> &cb); void findAsync(int maxWaitMS ,const function<void(const MediaSource::Ptr &src)> &cb);
void startTimer(int timeout_ms); void startTimer(int timeout_ms);
void onGetMediaSource(const MediaSource::Ptr &src);
bool close(MediaSource &sender,bool force) override;
void onNoneReader(MediaSource &sender) override ;
private: private:
Process _process; Process _process;
Timer::Ptr _timer; Timer::Ptr _timer;
@ -63,6 +67,7 @@ private:
string _src_url; string _src_url;
string _dst_url; string _dst_url;
function<void()> _onClose; function<void()> _onClose;
std::weak_ptr<MediaSourceEvent> _listener;
}; };

View File

@ -167,10 +167,15 @@ public:
} }
listener->onNoneReader(*this); listener->onNoneReader(*this);
} }
virtual void setListener(const std::weak_ptr<MediaSourceEvent> &listener){ virtual void setListener(const std::weak_ptr<MediaSourceEvent> &listener){
_listener = listener; _listener = listener;
} }
std::weak_ptr<MediaSourceEvent> getListener(){
return _listener;
}
template <typename FUN> template <typename FUN>
static void for_each_media(FUN && fun){ static void for_each_media(FUN && fun){
lock_guard<recursive_mutex> lock(g_mtxMediaSrc); lock_guard<recursive_mutex> lock(g_mtxMediaSrc);