diff --git a/ZLToolKit b/ZLToolKit index e3d2a2eb..9bfa63bd 160000 --- a/ZLToolKit +++ b/ZLToolKit @@ -1 +1 @@ -Subproject commit e3d2a2eb5d7b816b28f308eac8c53d147db80533 +Subproject commit 9bfa63bdca291123621e7805908195b999f15aec diff --git a/c_wrapper/src/common.cpp b/c_wrapper/src/common.cpp index 59b1096d..9508d011 100755 --- a/c_wrapper/src/common.cpp +++ b/c_wrapper/src/common.cpp @@ -53,8 +53,6 @@ static TcpServer::Ptr s_pHttpSrv; API_EXPORT void API_CALL onAppStart(){ static onceToken s_token([](){ Logger::Instance().add(std::make_shared("stdout", LTrace)); - EventPoller::Instance().runLoop(false); - cleaner::Instance().push_back([](){ s_pRtspSrv.reset(); s_pRtmpSrv.reset(); diff --git a/src/Device/PlayerProxy.cpp b/src/Device/PlayerProxy.cpp index a9eb23df..2cbb741d 100644 --- a/src/Device/PlayerProxy.cpp +++ b/src/Device/PlayerProxy.cpp @@ -192,13 +192,16 @@ void PlayerProxy::initMedia() { bool PlayerProxy::shutDown() { //通知其停止推流 weak_ptr weakSlef = dynamic_pointer_cast(shared_from_this()); - ASYNC_TRACE([weakSlef](){ - auto stronSelf = weakSlef.lock(); - if(stronSelf){ - stronSelf->m_pChn.reset(); - stronSelf->teardown(); - } - }); + auto executor = getExecutor(); + if(executor) { + executor->async_first([weakSlef]() { + auto stronSelf = weakSlef.lock(); + if (stronSelf) { + stronSelf->m_pChn.reset(); + stronSelf->teardown(); + } + }); + } return true; } diff --git a/src/Player/MediaPlayer.cpp b/src/Player/MediaPlayer.cpp index bcd72d24..53fe5aaa 100644 --- a/src/Player/MediaPlayer.cpp +++ b/src/Player/MediaPlayer.cpp @@ -56,6 +56,13 @@ void MediaPlayer::play(const char* strUrl) { m_parser->play(strUrl); } +TaskExecutor::Ptr MediaPlayer::getExecutor(){ + auto parser = dynamic_pointer_cast(m_parser); + if(!parser){ + return nullptr; + } + return parser->getExecutor(); +} void MediaPlayer::pause(bool bPause) { if (m_parser) { diff --git a/src/Player/MediaPlayer.h b/src/Player/MediaPlayer.h index 42ebfa17..ca23d992 100644 --- a/src/Player/MediaPlayer.h +++ b/src/Player/MediaPlayer.h @@ -33,10 +33,12 @@ #include "PlayerBase.h" #include "Rtsp/RtspPlayer.h" #include "Rtmp/RtmpPlayer.h" +#include "Thread/TaskExecutor.h" using namespace std; using namespace ZL::Rtsp; using namespace ZL::Rtmp; +using namespace ZL::Thread; namespace ZL { namespace Player { @@ -50,6 +52,7 @@ public: void play(const char* strUrl) override; void pause(bool bPause) override; void teardown() override; + TaskExecutor::Ptr getExecutor(); private: string m_strPrefix; diff --git a/src/Player/PlayerBase.cpp b/src/Player/PlayerBase.cpp index fe0d6832..e624cd17 100644 --- a/src/Player/PlayerBase.cpp +++ b/src/Player/PlayerBase.cpp @@ -46,18 +46,13 @@ const char PlayerBase::kRtspPwdIsMD5[] = "rtsp_pwd_md5"; PlayerBase::Ptr PlayerBase::createPlayer(const char* strUrl) { string prefix = FindField(strUrl, NULL, "://"); - auto onDestory = [](PlayerBase *ptr){ - ASYNC_TRACE([ptr](){ - delete ptr; - }); - }; if (strcasecmp("rtsp",prefix.data()) == 0) { - return PlayerBase::Ptr(new RtspPlayerImp(),onDestory); + return PlayerBase::Ptr(new RtspPlayerImp()); } if (strcasecmp("rtmp",prefix.data()) == 0) { - return PlayerBase::Ptr(new RtmpPlayerImp(),onDestory); + return PlayerBase::Ptr(new RtmpPlayerImp()); } - return PlayerBase::Ptr(new RtspPlayerImp(),onDestory); + return PlayerBase::Ptr(new RtspPlayerImp()); } } /* namespace Player */ diff --git a/src/Rtmp/RtmpPlayer.cpp b/src/Rtmp/RtmpPlayer.cpp index 9f87b7e5..574acec3 100644 --- a/src/Rtmp/RtmpPlayer.cpp +++ b/src/Rtmp/RtmpPlayer.cpp @@ -119,7 +119,7 @@ void RtmpPlayer::onConnect(const SockException &err){ strongSelf->_onPlayResult(SockException(Err_timeout,"play rtmp timeout")); strongSelf->teardown(); return false; - })); + },getExecutor())); startClientSession([weakSelf](){ auto strongSelf=weakSelf.lock(); if(!strongSelf) { @@ -233,7 +233,7 @@ inline void RtmpPlayer::send_pause(bool bPause) { uint32_t timeStamp = ::time(NULL); strongSelf->sendUserControl(CONTROL_PING_REQUEST, timeStamp); return true; - })); + },getExecutor())); } } diff --git a/src/Rtmp/RtmpPlayer.h b/src/Rtmp/RtmpPlayer.h index bd0dc0b3..5628b840 100644 --- a/src/Rtmp/RtmpPlayer.h +++ b/src/Rtmp/RtmpPlayer.h @@ -93,7 +93,7 @@ private: return false; } return true; - })); + },getExecutor())); } onPlayResult(ex); } diff --git a/src/Rtmp/RtmpPusher.cpp b/src/Rtmp/RtmpPusher.cpp index 6d3ca5b0..0c4753e4 100644 --- a/src/Rtmp/RtmpPusher.cpp +++ b/src/Rtmp/RtmpPusher.cpp @@ -121,7 +121,7 @@ void RtmpPusher::onConnect(const SockException &err){ strongSelf->onPublishResult(SockException(Err_timeout,"publish rtmp timeout")); strongSelf->teardown(); return false; - })); + },getExecutor())); startClientSession([weakSelf](){ auto strongSelf=weakSelf.lock(); if(!strongSelf) { diff --git a/src/Rtsp/RtspPlayer.cpp b/src/Rtsp/RtspPlayer.cpp index 99f59d57..b7a1db6d 100644 --- a/src/Rtsp/RtspPlayer.cpp +++ b/src/Rtsp/RtspPlayer.cpp @@ -174,7 +174,7 @@ void RtspPlayer::onConnect(const SockException &err){ strongSelf->onPlayResult_l(SockException(Err_timeout,"play rtsp timeout")); strongSelf->teardown(); return false; - })); + },getExecutor())); } void RtspPlayer::onRecv(const Buffer::Ptr& pBuf) { @@ -414,7 +414,7 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex) return false; } return strongSelf->sendOptions(); - })); + },getExecutor())); pause(false); } @@ -808,7 +808,7 @@ void RtspPlayer::onPlayResult_l(const SockException &ex) { return false; } return true; - })); + },getExecutor())); } onPlayResult(ex); } diff --git a/src/Rtsp/RtspSession.cpp b/src/Rtsp/RtspSession.cpp index 71f658d5..198280a5 100644 --- a/src/Rtsp/RtspSession.cpp +++ b/src/Rtsp/RtspSession.cpp @@ -72,8 +72,11 @@ RtspSession::~RtspSession() { } void RtspSession::shutdown(){ + shutdown_l(true); +} +void RtspSession::shutdown_l(bool close){ if (_sock) { - _sock->emitErr(SockException(Err_other, "self shutdown")); + _sock->emitErr(SockException(Err_other, "self shutdown"),close); } if (m_bBase64need && !_sock) { //quickTime http postter,and self is detached from tcpServer @@ -107,7 +110,10 @@ void RtspSession::onError(const SockException& err) { _sock = nullptr; lock_guard lock(g_mtxPostter); //为了保证脱离TCPServer后还能正常运作,需要保持本对象的强引用 - g_mapPostter.emplace(this, dynamic_pointer_cast(shared_from_this())); + try { + g_mapPostter.emplace(this, dynamic_pointer_cast(shared_from_this())); + }catch (std::exception &ex){ + } TraceL << "quickTime will not send request any more!"; } @@ -143,10 +149,18 @@ void RtspSession::onRecv(const Buffer::Ptr &pBuf) { //quicktime 加密后的rtsp请求,需要解密 av_base64_decode((uint8_t *) m_pcBuf, pBuf->data(), sizeof(tmp)); m_parser.Parse(m_pcBuf); //rtsp请求解析 - //TraceL << buf; } else { - //TraceL << request; m_parser.Parse(pBuf->data()); //rtsp请求解析 + if(!m_parser.Content().empty()){ + weak_ptr weakSelf = shared_from_this(); + BufferString::Ptr nextRecv(new BufferString(m_parser.Content())); + async([weakSelf,nextRecv](){ + auto strongSelf = weakSelf.lock(); + if(strongSelf){ + strongSelf->onRecv(nextRecv); + } + }, false); + } } string strCmd = m_parser.Method(); //提取出请求命令字 @@ -811,6 +825,7 @@ bool RtspSession::handleReq_Get() { //注册GET lock_guard lock(g_mtxGetter); g_mapGetter[m_strSessionCookie] = dynamic_pointer_cast(shared_from_this()); + //InfoL << m_strSessionCookie; send(m_pcBuf, n); return true; @@ -822,6 +837,7 @@ bool RtspSession::handleReq_Post() { //Poster 找到 Getter auto it = g_mapGetter.find(sessioncookie); if (it == g_mapGetter.end()) { + //WarnL << sessioncookie; return false; } m_bBase64need = true; @@ -830,6 +846,7 @@ bool RtspSession::handleReq_Post() { g_mapGetter.erase(sessioncookie); if (!strongSession) { send_SessionNotFound(); + //WarnL; return false; } initSender(strongSession); @@ -985,10 +1002,11 @@ inline void RtspSession::initSender(const std::shared_ptr& session) weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); session->m_onDestory = [weakSelf]() { auto strongSelf=weakSelf.lock(); - if(!strongSelf) { - return; - } - strongSelf->m_pSender->setOnErr([weakSelf](const SockException &err) { + if(!strongSelf) { + return; + } + //DebugL; + strongSelf->m_pSender->setOnErr([weakSelf](const SockException &err) { auto strongSelf=weakSelf.lock(); if(!strongSelf) { return; @@ -996,7 +1014,7 @@ inline void RtspSession::initSender(const std::shared_ptr& session) strongSelf->safeShutdown(); }); }; - session->shutdown(); + session->shutdown_l(false); } #ifdef RTSP_SEND_RTCP diff --git a/src/Rtsp/RtspSession.h b/src/Rtsp/RtspSession.h index 2ed20b75..1409e6b8 100644 --- a/src/Rtsp/RtspSession.h +++ b/src/Rtsp/RtspSession.h @@ -97,7 +97,8 @@ private: m_ui64TotalBytes += pkt->size(); return m_pSender->send(pkt,SOCKET_DEFAULE_FLAGS | FLAG_MORE); } - void shutdown() override; + void shutdown() override ; + void shutdown_l(bool close); bool handleReq_Options(); //处理options方法 bool handleReq_Describe(); //处理describe方法 bool handleReq_Setup(); //处理setup方法