From 0dc7cbb8798ad32be1706c5a344676b228c1a594 Mon Sep 17 00:00:00 2001 From: xiongziliang <771730766@qq.com> Date: Mon, 27 May 2019 22:32:07 +0800 Subject: [PATCH] =?UTF-8?q?=E7=B2=BE=E7=AE=80=E7=AD=89=E5=BE=85=E7=9B=91?= =?UTF-8?q?=E5=90=AC=E6=B5=81=E5=BC=82=E6=AD=A5=E6=B3=A8=E5=86=8C=E5=90=8E?= =?UTF-8?q?=E5=9B=9E=E5=A4=8D=E4=BA=8B=E4=BB=B6=E7=9B=B8=E5=85=B3=E4=BB=A3?= =?UTF-8?q?=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Common/MediaSource.cpp | 63 ++++++++++++- src/Common/MediaSource.h | 11 ++- src/Rtmp/RtmpSession.cpp | 96 ++----------------- src/Rtmp/RtmpSession.h | 6 -- src/Rtsp/RtspSession.cpp | 184 ++++++++++--------------------------- src/Rtsp/RtspSession.h | 9 -- 6 files changed, 129 insertions(+), 240 deletions(-) diff --git a/src/Common/MediaSource.cpp b/src/Common/MediaSource.cpp index 80279b9d..5fdd6b61 100644 --- a/src/Common/MediaSource.cpp +++ b/src/Common/MediaSource.cpp @@ -30,6 +30,7 @@ #include "Util/util.h" #include "Rtsp/Rtsp.h" #include "Network/sockutil.h" +#include "Network/TcpSession.h" using namespace toolkit; @@ -38,6 +39,67 @@ namespace mediakit { recursive_mutex MediaSource::g_mtxMediaSrc; MediaSource::SchemaVhostAppStreamMap MediaSource::g_mapMediaSrc; + +void MediaSource::findAsync(const MediaInfo &info, + const std::shared_ptr &session, + bool retry, + int maxWaitMs, + const function &cb){ + + auto src = MediaSource::find(info._schema, + info._vhost, + info._app, + info._streamid, + true); + if(src || !retry){ + cb(src); + return; + } + + void *listener_tag = session.get(); + weak_ptr weakSession = session; + //广播未找到流,此时可以立即去拉流,这样还来得及 + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastNotFoundStream,info,session); + + //若干秒后执行等待媒体注册超时回调 + auto onRegistTimeout = session->getPoller()->doDelayTask(maxWaitMs,[cb,listener_tag](){ + //取消监听该事件 + NoticeCenter::Instance().delListener(listener_tag,Broadcast::kBroadcastMediaChanged); + cb(nullptr); + return 0; + }); + + auto onRegist = [listener_tag,weakSession,info,cb,maxWaitMs,onRegistTimeout](BroadcastMediaChangedArgs) { + if(!bRegist || schema != info._schema || vhost != info._vhost || app != info._app ||stream != info._streamid){ + //不是自己感兴趣的事件,忽略之 + return; + } + + //取消延时任务,防止多次回调 + onRegistTimeout->cancel(); + + //播发器请求的流终于注册上了 + auto strongSession = weakSession.lock(); + if(!strongSession) { + return; + } + + //切换到自己的线程再回复 + strongSession->async([listener_tag,weakSession,info,cb,maxWaitMs](){ + auto strongSession = weakSession.lock(); + if(!strongSession) { + return; + } + DebugL << "收到媒体注册事件,回复播放器:" << info._schema << "/" << info._vhost << "/" << info._app << "/" << info._streamid; + //再找一遍媒体源,一般能找到 + findAsync(info,strongSession,false,maxWaitMs,cb); + //取消事件监听 + NoticeCenter::Instance().delListener(listener_tag,Broadcast::kBroadcastMediaChanged); + }, false); + }; + //监听媒体注册事件 + NoticeCenter::Instance().addListener(listener_tag, Broadcast::kBroadcastMediaChanged, onRegist); +} MediaSource::Ptr MediaSource::find( const string &schema, const string &vhost_tmp, @@ -69,7 +131,6 @@ MediaSource::Ptr MediaSource::find( ret = MediaReader::onMakeMediaSource(schema, vhost,app,id); } return ret; - } void MediaSource::regist() { //注册该源,注册后服务器才能找到该源 diff --git a/src/Common/MediaSource.h b/src/Common/MediaSource.h index bc49b9cf..887b1c0c 100644 --- a/src/Common/MediaSource.h +++ b/src/Common/MediaSource.h @@ -42,6 +42,10 @@ using namespace std; using namespace toolkit; +namespace toolkit{ + class TcpSession; +}//namespace toolkit + namespace mediakit { class MediaSource; @@ -90,7 +94,6 @@ public: string _param_strs; }; - class MediaSource: public enable_shared_from_this { public: typedef std::shared_ptr Ptr; @@ -122,6 +125,12 @@ public: const string &id, bool bMake = true) ; + static void findAsync(const MediaInfo &info, + const std::shared_ptr &session, + bool retry, + int maxWaitMs, + const function &cb); + const string& getSchema() const { return _strSchema; } diff --git a/src/Rtmp/RtmpSession.cpp b/src/Rtmp/RtmpSession.cpp index a5a2bdd5..835ee27a 100644 --- a/src/Rtmp/RtmpSession.cpp +++ b/src/Rtmp/RtmpSession.cpp @@ -43,10 +43,6 @@ RtmpSession::RtmpSession(const Socket::Ptr &pSock) : TcpSession(pSock) { RtmpSession::~RtmpSession() { DebugL << get_peer_ip(); - if(_delayTask){ - _delayTask(); - _delayTask = nullptr; - } } void RtmpSession::onError(const SockException& err) { @@ -80,12 +76,6 @@ void RtmpSession::onManager() { shutdown(); } } - if(_delayTask){ - if(time(NULL) > _iTaskTimeLine){ - _delayTask(); - _delayTask = nullptr; - } - } } void RtmpSession::onRecv(const Buffer::Ptr &pBuf) { @@ -212,70 +202,6 @@ void RtmpSession::onCmd_deleteStream(AMFDecoder &dec) { throw std::runtime_error(StrPrinter << "Stop publishing." << endl); } -void RtmpSession::findStream(const function &cb,bool retry) { - auto src = dynamic_pointer_cast(MediaSource::find(RTMP_SCHEMA, - _mediaInfo._vhost, - _mediaInfo._app, - _mediaInfo._streamid, - true)); - if(src || !retry){ - cb(src); - return; - } - - //广播未找到流 - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastNotFoundStream,_mediaInfo,*this); - - weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - auto task_id = this; - auto media_info = _mediaInfo; - - auto onRegist = [task_id, weakSelf, media_info, cb](BroadcastMediaChangedArgs) { - if(bRegist && - schema == media_info._schema && - vhost == media_info._vhost && - app == media_info._app && - stream == media_info._streamid){ - //播发器请求的rtmp流终于注册上了 - auto strongSelf = weakSelf.lock(); - if(!strongSelf) { - return; - } - //切换到自己的线程再回复 - //如果触发 kBroadcastMediaChanged 事件的线程与本RtmpSession绑定的线程相同, - //那么strongSelf->async操作可能是同步操作, - //通过指定参数may_sync为false确保 NoticeCenter::delListener操作延后执行, - //以便防止遍历事件监听对象map时做删除操作 - strongSelf->async([task_id,weakSelf,media_info,cb](){ - auto strongSelf = weakSelf.lock(); - if(!strongSelf) { - return; - } - DebugL << "收到rtmp注册事件,回复播放器:" << media_info._schema << "/" << media_info._vhost << "/" << media_info._app << "/" << media_info._streamid; - - //再找一遍媒体源,一般能找到 - strongSelf->findStream(cb,false); - - //取消延时任务,防止多次回复 - strongSelf->cancelDelyaTask(); - - //取消事件监听 - //在事件触发时不能在当前线程移除事件监听,否则会导致遍历map时做删除操作导致程序崩溃 - NoticeCenter::Instance().delListener(task_id,Broadcast::kBroadcastMediaChanged); - }, false); - } - }; - - NoticeCenter::Instance().addListener(task_id, Broadcast::kBroadcastMediaChanged, onRegist); - //5秒后执行失败回调 - doDelay(5, [cb,task_id]() { - //取消监听该事件,该延时任务可以在本对象析构时或到达指定延时后调用 - //所以该对象在销毁前一定会被取消事件监听 - NoticeCenter::Instance().delListener(task_id,Broadcast::kBroadcastMediaChanged); - cb(nullptr); - }); - -} void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr &src){ bool authSuccess = err.empty(); @@ -377,14 +303,16 @@ void RtmpSession::doPlayResponse(const string &err,const std::function weakSelf = dynamic_pointer_cast(shared_from_this()); //鉴权成功,查找媒体源并回复 - findStream([weakSelf,cb](const RtmpMediaSource::Ptr &src){ + _mediaInfo._schema = RTMP_SCHEMA; + weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); + MediaSource::findAsync(_mediaInfo,weakSelf.lock(), true,5000,[weakSelf,cb](const MediaSource::Ptr &src){ + auto rtmp_src = dynamic_pointer_cast(src); auto strongSelf = weakSelf.lock(); if(strongSelf){ - strongSelf->sendPlayResponse("", src); + strongSelf->sendPlayResponse("", rtmp_src); } - cb(src.operator bool()); + cb(rtmp_src.operator bool()); }); } @@ -556,17 +484,5 @@ void RtmpSession::onSendMedia(const RtmpPacket::Ptr &pkt) { sendRtmp(pkt->typeId, pkt->streamId, pkt, modifiedStamp, pkt->chunkId); } -void RtmpSession::doDelay(int delaySec, const std::function &fun) { - if(_delayTask){ - _delayTask(); - } - _delayTask = fun; - _iTaskTimeLine = time(NULL) + delaySec; -} - -void RtmpSession::cancelDelyaTask(){ - _delayTask = nullptr; -} - } /* namespace mediakit */ diff --git a/src/Rtmp/RtmpSession.h b/src/Rtmp/RtmpSession.h index 1a4b1133..6f9495f2 100644 --- a/src/Rtmp/RtmpSession.h +++ b/src/Rtmp/RtmpSession.h @@ -89,10 +89,6 @@ private: safeShutdown(); return true; } - - void doDelay(int delaySec,const std::function &fun); - void cancelDelyaTask(); - void findStream(const function &cb ,bool retry = true); private: std::string _strTcUrl; MediaInfo _mediaInfo; @@ -105,8 +101,6 @@ private: uint32_t _aui32FirstStamp[2] = {0}; //消耗的总流量 uint64_t _ui64TotalBytes = 0; - std::function _delayTask; - uint32_t _iTaskTimeLine = 0; }; diff --git a/src/Rtsp/RtspSession.cpp b/src/Rtsp/RtspSession.cpp index 3ba0d5e1..b80daaa7 100644 --- a/src/Rtsp/RtspSession.cpp +++ b/src/Rtsp/RtspSession.cpp @@ -126,13 +126,6 @@ void RtspSession::onManager() { shutdown(); return; } - - if(_delayTask){ - if(time(NULL) > _iTaskTimeLine){ - _delayTask(); - _delayTask = nullptr; - } - } } void RtspSession::onRecv(const Buffer::Ptr &pBuf) { @@ -307,43 +300,60 @@ bool RtspSession::handleReq_RECORD(const Parser &parser){ return true; } - bool RtspSession::handleReq_Describe(const Parser &parser) { - weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - auto authorization = parser["Authorization"]; + _mediaInfo._schema = RTSP_SCHEMA; + auto authorization = parser["Authorization"]; + weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); + MediaSource::findAsync(_mediaInfo,weakSelf.lock(), true,5000,[weakSelf,authorization](const MediaSource::Ptr &src){ + auto strongSelf = weakSelf.lock(); + if(!strongSelf){ + return; + } + auto rtsp_src = dynamic_pointer_cast(src); + if (!rtsp_src) { + //未找到相应的MediaSource + WarnL << "No such stream:" << strongSelf->_mediaInfo._vhost << " " << strongSelf->_mediaInfo._app << " " << strongSelf->_mediaInfo._streamid; + strongSelf->send_StreamNotFound(); + strongSelf->shutdown(); + return; + } + //找到了响应的rtsp流 + strongSelf->_strSdp = rtsp_src->getSdp(); + SdpAttr sdpAttr(strongSelf->_strSdp); + strongSelf->_aTrackInfo = sdpAttr.getAvailableTrack(); + if (strongSelf->_aTrackInfo.empty()) { + //该流无效 + strongSelf->send_StreamNotFound(); + strongSelf->shutdown(); + return; + } + strongSelf->_strSession = makeRandStr(12); + strongSelf->_pMediaSrc = rtsp_src; + for(auto &track : strongSelf->_aTrackInfo){ + track->_ssrc = rtsp_src->getSsrc(track->_type); + track->_seq = rtsp_src->getSeqence(track->_type); + track->_time_stamp = rtsp_src->getTimeStamp(track->_type); + } - findStream([weakSelf,authorization](bool success){ - auto strongSelf = weakSelf.lock(); - if(!strongSelf){ - return; - } + //该请求中的认证信息 + onGetRealm invoker = [weakSelf,authorization](const string &realm){ + if(realm.empty()){ + //无需认证,回复sdp + onAuthSuccess(weakSelf); + return; + } + //该流需要认证 + onAuthUser(weakSelf,realm,authorization); + }; - if(!success){ - //未找到相应的MediaSource - WarnL << "No such stream:" << strongSelf->_mediaInfo._vhost << " " << strongSelf->_mediaInfo._app << " " << strongSelf->_mediaInfo._streamid; - strongSelf->send_StreamNotFound(); - strongSelf->shutdown(); - return; - } - //该请求中的认证信息 - onGetRealm invoker = [weakSelf,authorization](const string &realm){ - if(realm.empty()){ - //无需认证,回复sdp - onAuthSuccess(weakSelf); - return; - } - //该流需要认证 - onAuthUser(weakSelf,realm,authorization); - }; - - //广播是否需要认证事件 - if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnGetRtspRealm, - strongSelf->_mediaInfo, - invoker, - *strongSelf)){ - //无人监听此事件,说明无需认证 - invoker(""); - } + //广播是否需要认证事件 + if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnGetRtspRealm, + strongSelf->_mediaInfo, + invoker, + *strongSelf)){ + //无人监听此事件,说明无需认证 + invoker(""); + } }); return true; } @@ -885,98 +895,6 @@ inline void RtspSession::send_NotAcceptable() { sendRtspResponse("406 Not Acceptable",{"Connection","Close"}); } -void RtspSession::doDelay(int delaySec, const std::function &fun) { - if(_delayTask){ - _delayTask(); - } - _delayTask = fun; - _iTaskTimeLine = time(NULL) + delaySec; -} - -void RtspSession::cancelDelyaTask(){ - _delayTask = nullptr; -} - -void RtspSession::findStream(const function &cb) { - bool success = findStream(); - if (success) { - cb(true); - return; - } - - //广播未找到流 - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastNotFoundStream,_mediaInfo,*this); - - weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - auto task_id = this; - auto media_info = _mediaInfo; - - auto onRegist = [task_id, weakSelf, media_info, cb](BroadcastMediaChangedArgs) { - if (bRegist && - schema == media_info._schema && - vhost == media_info._vhost && - app == media_info._app && - stream == media_info._streamid) { - //播发器请求的rtsp流终于注册上了 - auto strongSelf = weakSelf.lock(); - if (!strongSelf) { - return; - } - //切换到自己的线程再回复 - //如果触发 kBroadcastMediaChanged 事件的线程与本RtspSession绑定的线程相同, - //那么strongSelf->async操作可能是同步操作, - //通过指定参数may_sync为false确保 NoticeCenter::delListener操作延后执行, - //以便防止遍历事件监听对象map时做删除操作 - strongSelf->async([task_id, weakSelf, media_info, cb]() { - auto strongSelf = weakSelf.lock(); - if (!strongSelf) { - return; - } - DebugL << "收到rtsp注册事件,回复播放器:" << media_info._schema << "/" << media_info._vhost << "/" - << media_info._app << "/" << media_info._streamid; - cb(strongSelf->findStream()); - //取消延时任务,防止多次回复 - strongSelf->cancelDelyaTask(); - - //取消事件监听 - //在事件触发时不能在当前线程移除事件监听,否则会导致遍历map时做删除操作导致程序崩溃 - NoticeCenter::Instance().delListener(task_id, Broadcast::kBroadcastMediaChanged); - }, false); - } - }; - - NoticeCenter::Instance().addListener(task_id, Broadcast::kBroadcastMediaChanged, onRegist); - //5秒后执行失败回调 - doDelay(5, [cb,task_id]() { - NoticeCenter::Instance().delListener(task_id,Broadcast::kBroadcastMediaChanged); - cb(false); - }); -} - -inline bool RtspSession::findStream() { - RtspMediaSource::Ptr pMediaSrc = - dynamic_pointer_cast( MediaSource::find(RTSP_SCHEMA,_mediaInfo._vhost, _mediaInfo._app,_mediaInfo._streamid) ); - if (!pMediaSrc) { - return false; - } - _strSdp = pMediaSrc->getSdp(); - SdpAttr sdpAttr(_strSdp); - _aTrackInfo = sdpAttr.getAvailableTrack(); - - if (_aTrackInfo.empty()) { - return false; - } - _strSession = makeRandStr(12); - _pMediaSrc = pMediaSrc; - - for(auto &track : _aTrackInfo){ - track->_ssrc = pMediaSrc->getSsrc(track->_type); - track->_seq = pMediaSrc->getSeqence(track->_type); - track->_time_stamp = pMediaSrc->getTimeStamp(track->_type); - } - return true; -} - void RtspSession::onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx) { _pushSrc->onWrite(rtppt, false); diff --git a/src/Rtsp/RtspSession.h b/src/Rtsp/RtspSession.h index 0a0e72b5..afd66ab0 100644 --- a/src/Rtsp/RtspSession.h +++ b/src/Rtsp/RtspSession.h @@ -136,9 +136,6 @@ private: void inline send_UnsupportedTransport(); //不支持的传输模式 void inline send_SessionNotFound(); //会话id错误 void inline send_NotAcceptable(); //rtsp同时播放数限制 - inline bool findStream(); //根据rtsp url查找 MediaSource实例 - inline void findStream(const function &cb); //根据rtsp url查找 MediaSource实例 - inline string printSSRC(uint32_t ui32Ssrc); inline int getTrackIndexByTrackType(TrackType type); inline int getTrackIndexByControlSuffix(const string &controlSuffix); @@ -154,9 +151,6 @@ private: static void onAuthBasic(const weak_ptr &weakSelf,const string &realm,const string &strBase64); static void onAuthDigest(const weak_ptr &weakSelf,const string &realm,const string &strMd5); - void doDelay(int delaySec,const std::function &fun); - void cancelDelyaTask(); - inline void sendRtpPacket(const RtpPacket::Ptr &pkt); bool sendRtspResponse(const string &res_code,const std::initializer_list &header, const string &sdp = "" , const char *protocol = "RTSP/1.0"); bool sendRtspResponse(const string &res_code,const StrCaseMap &header = StrCaseMap(), const string &sdp = "",const char *protocol = "RTSP/1.0"); @@ -190,9 +184,6 @@ private: //一次发送 get 一次发送post,需要通过x-sessioncookie关联起来 string _http_x_sessioncookie; function _onRecv; - - std::function _delayTask; - uint32_t _iTaskTimeLine = 0; atomic _enableSendRtp; //rtsp推流相关