diff --git a/3rdpart/ZLToolKit b/3rdpart/ZLToolKit index 60aab3b9..17e82574 160000 --- a/3rdpart/ZLToolKit +++ b/3rdpart/ZLToolKit @@ -1 +1 @@ -Subproject commit 60aab3b9f6230312c882b9d3d360ed27c94ebd9f +Subproject commit 17e82574991134f798ae32f82d48e2d6c6b97b06 diff --git a/src/Common/MediaSource.cpp b/src/Common/MediaSource.cpp index e7bc6b14..b6a454a3 100644 --- a/src/Common/MediaSource.cpp +++ b/src/Common/MediaSource.cpp @@ -178,39 +178,44 @@ static void eraseIfEmpty(MAP &map, IT0 it0, IT1 it1, IT2 it2) { } } } -}; +} void MediaSource::findAsync_l(const MediaInfo &info, const std::shared_ptr &session, bool retry, const function &cb){ auto src = MediaSource::find_l(info._schema, info._vhost, info._app, info._streamid, true); - if(src || !retry){ + if (src || !retry) { cb(src); return; } void *listener_tag = session.get(); weak_ptr weakSession = session; - //广播未找到流,此时可以立即去拉流,这样还来得及 - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastNotFoundStream,info, static_cast(*session)); - //最多等待一定时间,如果这个时间内,流未注册上,那么返回未找到流 - GET_CONFIG(int,maxWaitMS,General::kMaxStreamWaitTimeMS); - - //若干秒后执行等待媒体注册超时回调 - auto onRegistTimeout = session->getPoller()->doDelayTask(maxWaitMS,[cb,listener_tag](){ - //取消监听该事件 - NoticeCenter::Instance().delListener(listener_tag,Broadcast::kBroadcastMediaChanged); + GET_CONFIG(int, maxWaitMS, General::kMaxStreamWaitTimeMS); + auto onTimeout = session->getPoller()->doDelayTask(maxWaitMS, [cb, listener_tag]() { + //最多等待一定时间,如果这个时间内,流未注册上,那么返回未找到流 + NoticeCenter::Instance().delListener(listener_tag, Broadcast::kBroadcastMediaChanged); cb(nullptr); return 0; }); - auto onRegist = [listener_tag,weakSession,info,cb,onRegistTimeout](BroadcastMediaChangedArgs) { + auto cancelAll = [onTimeout, listener_tag]() { + //取消延时任务,防止多次回调 + onTimeout->cancel(); + //取消媒体注册事件监听 + NoticeCenter::Instance().delListener(listener_tag, Broadcast::kBroadcastMediaChanged); + }; + + function closePlayer = [cb, cancelAll]() { + cancelAll(); + //告诉播放器,流不存在,这样会立即断开播放器 + cb(nullptr); + }; + + auto onRegist = [weakSession, info, cb, cancelAll](BroadcastMediaChangedArgs) { auto strongSession = weakSession.lock(); - if(!strongSession) { + if (!strongSession) { //自己已经销毁 - //取消延时任务,防止多次回调 - onRegistTimeout->cancel(); - //取消事件监听 - NoticeCenter::Instance().delListener(listener_tag,Broadcast::kBroadcastMediaChanged); + cancelAll(); return; } @@ -223,24 +228,24 @@ void MediaSource::findAsync_l(const MediaInfo &info, const std::shared_ptrcancel(); - //取消事件监听 - NoticeCenter::Instance().delListener(listener_tag,Broadcast::kBroadcastMediaChanged); + cancelAll(); //播发器请求的流终于注册上了,切换到自己的线程再回复 - strongSession->async([weakSession,info,cb](){ + strongSession->async([weakSession, info, cb]() { auto strongSession = weakSession.lock(); - if(!strongSession) { + if (!strongSession) { return; } DebugL << "收到媒体注册事件,回复播放器:" << info._schema << "/" << info._vhost << "/" << info._app << "/" << info._streamid; //再找一遍媒体源,一般能找到 - findAsync_l(info,strongSession,false,cb); + findAsync_l(info, strongSession, false, cb); }, false); }; + //监听媒体注册事件 NoticeCenter::Instance().addListener(listener_tag, Broadcast::kBroadcastMediaChanged, onRegist); + //广播未找到流,此时可以立即去拉流,这样还来得及 + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastNotFoundStream, info, static_cast(*session), closePlayer); } void MediaSource::findAsync(const MediaInfo &info, const std::shared_ptr &session,const function &cb){ diff --git a/src/Common/config.h b/src/Common/config.h index 5d210ce5..6832aec1 100644 --- a/src/Common/config.h +++ b/src/Common/config.h @@ -111,7 +111,7 @@ extern const string kBroadcastFlowReport; //未找到流后会广播该事件,请在监听该事件后去拉流或其他方式产生流,这样就能按需拉流了 extern const string kBroadcastNotFoundStream; -#define BroadcastNotFoundStreamArgs const MediaInfo &args,SockInfo &sender +#define BroadcastNotFoundStreamArgs const MediaInfo &args,SockInfo &sender, const function &closePlayer //某个流无人消费时触发,目的为了实现无人观看时主动断开拉流等业务逻辑 extern const string kBroadcastStreamNoneReader;