优化关闭媒体源相关逻辑: #1963

This commit is contained in:
ziyue 2022-09-18 20:36:47 +08:00
parent daafe62f35
commit 15affeff1d
21 changed files with 44 additions and 69 deletions

View File

@ -60,19 +60,15 @@ public:
protected: protected:
// 通知其停止推流 // 通知其停止推流
bool close(MediaSource &sender,bool force) override{ bool close(MediaSource &sender) override {
if(!force && _channel->totalReaderCount()){ if (!_on_close) {
//非强制关闭且正有人在观看该视频
return false;
}
if(!_on_close){
//未设置回调,没法关闭 //未设置回调,没法关闭
WarnL << "请使用mk_media_set_on_close函数设置回调函数!"; WarnL << "请使用mk_media_set_on_close函数设置回调函数!";
return false; return false;
} }
//请在回调中调用mk_media_release函数释放资源,否则MediaSource::close()操作不会生效 //请在回调中调用mk_media_release函数释放资源,否则MediaSource::close()操作不会生效
_on_close(_on_close_data); _on_close(_on_close_data);
WarnL << "close media:" << sender.getUrl() << " " << force; WarnL << "close media: " << sender.getUrl();
return true; return true;
} }

View File

@ -273,14 +273,14 @@ void FFmpegSource::setOnClose(const function<void()> &cb){
_onClose = cb; _onClose = cb;
} }
bool FFmpegSource::close(MediaSource &sender, bool force) { bool FFmpegSource::close(MediaSource &sender) {
auto listener = getDelegate(); auto listener = getDelegate();
if(listener && !listener->close(sender,force)){ if (listener && !listener->close(sender)) {
//关闭失败 //关闭失败
return false; return false;
} }
//该流无人观看,我们停止吧 //该流无人观看,我们停止吧
if(_onClose){ if (_onClose) {
_onClose(); _onClose();
} }
return true; return true;

View File

@ -75,7 +75,7 @@ private:
///////MediaSourceEvent override/////// ///////MediaSourceEvent override///////
// 关闭 // 关闭
bool close(mediakit::MediaSource &sender,bool force) override; bool close(mediakit::MediaSource &sender) override;
// 获取媒体源类型 // 获取媒体源类型
mediakit::MediaOriginType getOriginType(mediakit::MediaSource &sender) const override; mediakit::MediaOriginType getOriginType(mediakit::MediaSource &sender) const override;
//获取媒体源url或者文件路径 //获取媒体源url或者文件路径

View File

@ -211,10 +211,14 @@ bool MediaSource::speed(float speed) {
bool MediaSource::close(bool force) { bool MediaSource::close(bool force) {
auto listener = _listener.lock(); auto listener = _listener.lock();
if(!listener){ if (!listener) {
return false; return false;
} }
return listener->close(*this,force); if (!force && totalReaderCount()) {
//有人观看,不强制关闭
return false;
}
return listener->close(*this);
} }
float MediaSource::getLossRate(mediakit::TrackType type) { float MediaSource::getLossRate(mediakit::TrackType type) {
@ -680,12 +684,12 @@ bool MediaSourceEventInterceptor::speed(MediaSource &sender, float speed) {
return listener->speed(sender, speed); return listener->speed(sender, speed);
} }
bool MediaSourceEventInterceptor::close(MediaSource &sender, bool force) { bool MediaSourceEventInterceptor::close(MediaSource &sender) {
auto listener = _listener.lock(); auto listener = _listener.lock();
if (!listener) { if (!listener) {
return false; return false;
} }
return listener->close(sender, force); return listener->close(sender);
} }
int MediaSourceEventInterceptor::totalReaderCount(MediaSource &sender) { int MediaSourceEventInterceptor::totalReaderCount(MediaSource &sender) {

View File

@ -74,7 +74,7 @@ public:
// 通知倍数 // 通知倍数
virtual bool speed(MediaSource &sender, float speed) { return false; } virtual bool speed(MediaSource &sender, float speed) { return false; }
// 通知其停止产生流 // 通知其停止产生流
virtual bool close(MediaSource &sender, bool force) { return false; } virtual bool close(MediaSource &sender) { return false; }
// 获取观看总人数,此函数一般强制重载 // 获取观看总人数,此函数一般强制重载
virtual int totalReaderCount(MediaSource &sender) { throw NotImplemented(toolkit::demangle(typeid(*this).name()) + "::totalReaderCount not implemented"); } virtual int totalReaderCount(MediaSource &sender) { throw NotImplemented(toolkit::demangle(typeid(*this).name()) + "::totalReaderCount not implemented"); }
// 通知观看人数变化 // 通知观看人数变化
@ -150,7 +150,7 @@ public:
bool seekTo(MediaSource &sender, uint32_t stamp) override; bool seekTo(MediaSource &sender, uint32_t stamp) override;
bool pause(MediaSource &sender, bool pause) override; bool pause(MediaSource &sender, bool pause) override;
bool speed(MediaSource &sender, float speed) override; bool speed(MediaSource &sender, float speed) override;
bool close(MediaSource &sender, bool force) override; bool close(MediaSource &sender) override;
int totalReaderCount(MediaSource &sender) override; int totalReaderCount(MediaSource &sender) override;
void onReaderChanged(MediaSource &sender, int size) override; void onReaderChanged(MediaSource &sender, int size) override;
void onRegist(MediaSource &sender, bool regist) override; void onRegist(MediaSource &sender, bool regist) override;

View File

@ -143,11 +143,7 @@ void PlayerProxy::rePlay(const string &strUrl, int iFailedCnt) {
}, getPoller()); }, getPoller());
} }
bool PlayerProxy::close(MediaSource &sender, bool force) { bool PlayerProxy::close(MediaSource &sender) {
if (!force && totalReaderCount()) {
return false;
}
//通知其停止推流 //通知其停止推流
weak_ptr<PlayerProxy> weakSelf = dynamic_pointer_cast<PlayerProxy>(shared_from_this()); weak_ptr<PlayerProxy> weakSelf = dynamic_pointer_cast<PlayerProxy>(shared_from_this());
getPoller()->async_first([weakSelf]() { getPoller()->async_first([weakSelf]() {
@ -160,7 +156,7 @@ bool PlayerProxy::close(MediaSource &sender, bool force) {
strongSelf->teardown(); strongSelf->teardown();
}); });
_on_close(SockException(Err_shutdown, "closed by user")); _on_close(SockException(Err_shutdown, "closed by user"));
WarnL << sender.getUrl() << " " << force; WarnL << "close media: " << sender.getUrl();
return true; return true;
} }

View File

@ -54,7 +54,7 @@ public:
private: private:
//MediaSourceEvent override //MediaSourceEvent override
bool close(MediaSource &sender,bool force) override; bool close(MediaSource &sender) override;
int totalReaderCount(MediaSource &sender) override; int totalReaderCount(MediaSource &sender) override;
MediaOriginType getOriginType(MediaSource &sender) const override; MediaOriginType getOriginType(MediaSource &sender) const override;
std::string getOriginUrl(MediaSource &sender) const override; std::string getOriginUrl(MediaSource &sender) const override;

View File

@ -230,12 +230,9 @@ bool MP4Reader::seekTo(uint32_t stamp_seek) {
return false; return false;
} }
bool MP4Reader::close(MediaSource &sender, bool force) { bool MP4Reader::close(MediaSource &sender) {
if (!_muxer || (!force && _muxer->totalReaderCount())) { _timer = nullptr;
return false; WarnL << "close media: " << sender.getUrl();
}
_timer.reset();
WarnL << sender.getUrl() << " " << force;
return true; return true;
} }

View File

@ -55,7 +55,7 @@ private:
bool pause(MediaSource &sender, bool pause) override; bool pause(MediaSource &sender, bool pause) override;
bool speed(MediaSource &sender, float speed) override; bool speed(MediaSource &sender, float speed) override;
bool close(MediaSource &sender,bool force) override; bool close(MediaSource &sender) override;
MediaOriginType getOriginType(MediaSource &sender) const override; MediaOriginType getOriginType(MediaSource &sender) const override;
std::string getOriginUrl(MediaSource &sender) const override; std::string getOriginUrl(MediaSource &sender) const override;
toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) override; toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) override;

View File

@ -569,13 +569,10 @@ void RtmpSession::onSendMedia(const RtmpPacket::Ptr &pkt) {
sendRtmp(pkt->type_id, pkt->stream_index, pkt, pkt->time_stamp, pkt->chunk_id); sendRtmp(pkt->type_id, pkt->stream_index, pkt, pkt->time_stamp, pkt->chunk_id);
} }
bool RtmpSession::close(MediaSource &sender,bool force) { bool RtmpSession::close(MediaSource &sender) {
//此回调在其他线程触发 //此回调在其他线程触发
if(!_push_src || (!force && _push_src->totalReaderCount())){ string err = StrPrinter << "close media: " << sender.getUrl();
return false; safeShutdown(SockException(Err_shutdown, err));
}
string err = StrPrinter << "close media:" << sender.getUrl() << " " << force;
safeShutdown(SockException(Err_shutdown,err));
return true; return true;
} }

View File

@ -71,7 +71,7 @@ private:
///////MediaSourceEvent override/////// ///////MediaSourceEvent override///////
// 关闭 // 关闭
bool close(MediaSource &sender, bool force) override; bool close(MediaSource &sender) override;
// 播放总人数 // 播放总人数
int totalReaderCount(MediaSource &sender) override; int totalReaderCount(MediaSource &sender) override;
// 获取媒体源类型 // 获取媒体源类型

View File

@ -134,17 +134,14 @@ void RtpProcessHelper::attachEvent() {
_process->setDelegate(shared_from_this()); _process->setDelegate(shared_from_this());
} }
bool RtpProcessHelper::close(MediaSource &sender, bool force) { bool RtpProcessHelper::close(MediaSource &sender) {
//此回调在其他线程触发 //此回调在其他线程触发
if (!_process || (!force && _process->totalReaderCount(sender))) {
return false;
}
auto parent = _parent.lock(); auto parent = _parent.lock();
if (!parent) { if (!parent) {
return false; return false;
} }
parent->delProcess(_stream_id, _process.get()); parent->delProcess(_stream_id, _process.get());
WarnL << "close media:" << sender.getUrl() << " " << force; WarnL << "close media: " << sender.getUrl();
return true; return true;
} }

View File

@ -31,7 +31,7 @@ public:
protected: protected:
// 通知其停止推流 // 通知其停止推流
bool close(MediaSource &sender,bool force) override; bool close(MediaSource &sender) override;
private: private:
std::string _stream_id; std::string _stream_id;

View File

@ -124,13 +124,10 @@ void RtpSession::onRtpPacket(const char *data, size_t len) {
_ticker.resetTime(); _ticker.resetTime();
} }
bool RtpSession::close(MediaSource &sender, bool force) { bool RtpSession::close(MediaSource &sender) {
//此回调在其他线程触发 //此回调在其他线程触发
if(!_process || (!force && static_pointer_cast<MediaSourceEvent>(_process)->totalReaderCount(sender))){ string err = StrPrinter << "close media: " << sender.getUrl();
return false; safeShutdown(SockException(Err_shutdown, err));
}
string err = StrPrinter << "close media:" << sender.getUrl() << " " << force;
safeShutdown(SockException(Err_shutdown,err));
return true; return true;
} }

View File

@ -35,7 +35,7 @@ public:
protected: protected:
// 通知其停止推流 // 通知其停止推流
bool close(MediaSource &sender,bool force) override; bool close(MediaSource &sender) override;
// 收到rtp回调 // 收到rtp回调
void onRtpPacket(const char *data, size_t len) override; void onRtpPacket(const char *data, size_t len) override;
// RtpSplitter override // RtpSplitter override

View File

@ -1136,12 +1136,9 @@ int RtspSession::getTrackIndexByInterleaved(int interleaved) {
throw SockException(Err_shutdown, StrPrinter << "no such track with interleaved:" << interleaved); throw SockException(Err_shutdown, StrPrinter << "no such track with interleaved:" << interleaved);
} }
bool RtspSession::close(MediaSource &sender, bool force) { bool RtspSession::close(MediaSource &sender) {
//此回调在其他线程触发 //此回调在其他线程触发
if(!_push_src || (!force && _push_src->totalReaderCount())){ string err = StrPrinter << "close media: " << sender.getUrl();
return false;
}
string err = StrPrinter << "close media:" << sender.getUrl() << " " << force;
safeShutdown(SockException(Err_shutdown,err)); safeShutdown(SockException(Err_shutdown,err));
return true; return true;
} }

View File

@ -82,7 +82,7 @@ protected:
///////MediaSourceEvent override/////// ///////MediaSourceEvent override///////
// 关闭 // 关闭
bool close(MediaSource &sender, bool force) override; bool close(MediaSource &sender) override;
// 播放总人数 // 播放总人数
int totalReaderCount(MediaSource &sender) override; int totalReaderCount(MediaSource &sender) override;
// 获取媒体源类型 // 获取媒体源类型

View File

@ -109,11 +109,8 @@ void SrtTransportImp::onShutdown(const SockException &ex) {
SrtTransport::onShutdown(ex); SrtTransport::onShutdown(ex);
} }
bool SrtTransportImp::close(mediakit::MediaSource &sender, bool force) { bool SrtTransportImp::close(mediakit::MediaSource &sender) {
if (!force && totalReaderCount(sender)) { std::string err = StrPrinter << "close media: " << sender.getUrl();
return false;
}
std::string err = StrPrinter << "close media:" << sender.getUrl() << " " << force;
weak_ptr<SrtTransportImp> weak_self = static_pointer_cast<SrtTransportImp>(shared_from_this()); weak_ptr<SrtTransportImp> weak_self = static_pointer_cast<SrtTransportImp>(shared_from_this());
getPoller()->async([weak_self, err]() { getPoller()->async([weak_self, err]() {
auto strong_self = weak_self.lock(); auto strong_self = weak_self.lock();

View File

@ -50,7 +50,7 @@ protected:
///////MediaSourceEvent override/////// ///////MediaSourceEvent override///////
// 关闭 // 关闭
bool close(mediakit::MediaSource &sender, bool force) override; bool close(mediakit::MediaSource &sender) override;
// 获取媒体源类型 // 获取媒体源类型
mediakit::MediaOriginType getOriginType(mediakit::MediaSource &sender) const override; mediakit::MediaOriginType getOriginType(mediakit::MediaSource &sender) const override;
// 获取媒体源url或者文件路径 // 获取媒体源url或者文件路径

View File

@ -38,12 +38,9 @@ WebRtcPusher::WebRtcPusher(const EventPoller::Ptr &poller,
CHECK(_push_src); CHECK(_push_src);
} }
bool WebRtcPusher::close(MediaSource &sender, bool force) { bool WebRtcPusher::close(MediaSource &sender) {
//此回调在其他线程触发 //此回调在其他线程触发
if (!force && totalReaderCount(sender)) { string err = StrPrinter << "close media: " << sender.getUrl();
return false;
}
string err = StrPrinter << "close media:" << sender.getUrl() << " " << force;
weak_ptr<WebRtcPusher> weak_self = static_pointer_cast<WebRtcPusher>(shared_from_this()); weak_ptr<WebRtcPusher> weak_self = static_pointer_cast<WebRtcPusher>(shared_from_this());
getPoller()->async([weak_self, err]() { getPoller()->async([weak_self, err]() {
auto strong_self = weak_self.lock(); auto strong_self = weak_self.lock();

View File

@ -33,7 +33,7 @@ protected:
protected: protected:
///////MediaSourceEvent override/////// ///////MediaSourceEvent override///////
// 关闭 // 关闭
bool close(mediakit::MediaSource &sender, bool force) override; bool close(mediakit::MediaSource &sender) override;
// 播放总人数 // 播放总人数
int totalReaderCount(mediakit::MediaSource &sender) override; int totalReaderCount(mediakit::MediaSource &sender) override;
// 获取媒体源类型 // 获取媒体源类型