diff --git a/ZLToolKit b/ZLToolKit index 8c1a0f88..01c9f814 160000 --- a/ZLToolKit +++ b/ZLToolKit @@ -1 +1 @@ -Subproject commit 8c1a0f88a0b8e332c3eaf04dbb9a8f2402b267ba +Subproject commit 01c9f8141b43fe270aa0d646d115df98aa6c5e14 diff --git a/src/Player/MediaPlayer.cpp b/src/Player/MediaPlayer.cpp index e208fc52..f54c51a5 100644 --- a/src/Player/MediaPlayer.cpp +++ b/src/Player/MediaPlayer.cpp @@ -46,12 +46,12 @@ void MediaPlayer::play(const char* strUrl) { _parser->play(strUrl); } -TaskExecutor::Ptr MediaPlayer::getExecutor(){ +EventPoller::Ptr MediaPlayer::getPoller(){ auto parser = dynamic_pointer_cast(_parser); if(!parser){ return nullptr; } - return parser->getExecutor(); + return parser->getPoller(); } void MediaPlayer::pause(bool bPause) { diff --git a/src/Player/MediaPlayer.h b/src/Player/MediaPlayer.h index 7e60e126..4f0b00a1 100644 --- a/src/Player/MediaPlayer.h +++ b/src/Player/MediaPlayer.h @@ -46,7 +46,7 @@ public: void play(const char* strUrl) override; void pause(bool bPause) override; void teardown() override; - TaskExecutor::Ptr getExecutor(); + EventPoller::Ptr getPoller(); }; diff --git a/src/Player/PlayerProxy.cpp b/src/Player/PlayerProxy.cpp index 6fbc85cb..72c86386 100644 --- a/src/Player/PlayerProxy.cpp +++ b/src/Player/PlayerProxy.cpp @@ -136,9 +136,9 @@ void PlayerProxy::rePlay(const string &strUrl,int iFailedCnt){ bool PlayerProxy::close() { //通知其停止推流 weak_ptr weakSlef = dynamic_pointer_cast(shared_from_this()); - auto executor = getExecutor(); - if(executor) { - executor->async_first([weakSlef]() { + auto poller = getPoller(); + if(poller) { + poller->async_first([weakSlef]() { auto stronSelf = weakSlef.lock(); if (stronSelf) { stronSelf->_mediaMuxer.reset(); diff --git a/src/Rtmp/RtmpPlayer.cpp b/src/Rtmp/RtmpPlayer.cpp index d0c851ad..9de1c1c2 100644 --- a/src/Rtmp/RtmpPlayer.cpp +++ b/src/Rtmp/RtmpPlayer.cpp @@ -116,7 +116,7 @@ void RtmpPlayer::onConnect(const SockException &err){ strongSelf->_onPlayResult(SockException(Err_timeout,"play rtmp timeout")); strongSelf->teardown(); return false; - },getExecutor())); + },getPoller())); startClientSession([weakSelf](){ auto strongSelf=weakSelf.lock(); if(!strongSelf) { @@ -230,7 +230,7 @@ inline void RtmpPlayer::send_pause(bool bPause) { uint32_t timeStamp = ::time(NULL); strongSelf->sendUserControl(CONTROL_PING_REQUEST, timeStamp); return true; - },getExecutor())); + },getPoller())); } } diff --git a/src/Rtmp/RtmpPlayer.h b/src/Rtmp/RtmpPlayer.h index 4631190b..2a9265bd 100644 --- a/src/Rtmp/RtmpPlayer.h +++ b/src/Rtmp/RtmpPlayer.h @@ -88,7 +88,7 @@ private: return false; } return true; - },getExecutor())); + },getPoller())); } onPlayResult(ex); } diff --git a/src/Rtmp/RtmpPusher.cpp b/src/Rtmp/RtmpPusher.cpp index f63bcdcf..74292dcf 100644 --- a/src/Rtmp/RtmpPusher.cpp +++ b/src/Rtmp/RtmpPusher.cpp @@ -121,7 +121,7 @@ void RtmpPusher::onConnect(const SockException &err){ strongSelf->onPublishResult(SockException(Err_timeout,"publish rtmp timeout")); strongSelf->teardown(); return false; - },getExecutor())); + },getPoller())); startClientSession([weakSelf](){ auto strongSelf=weakSelf.lock(); if(!strongSelf) { diff --git a/src/Rtsp/RtspPlayer.cpp b/src/Rtsp/RtspPlayer.cpp index d1b22946..08f92b97 100644 --- a/src/Rtsp/RtspPlayer.cpp +++ b/src/Rtsp/RtspPlayer.cpp @@ -153,7 +153,7 @@ void RtspPlayer::onConnect(const SockException &err){ strongSelf->onPlayResult_l(SockException(Err_timeout,"play rtsp timeout")); strongSelf->teardown(); return false; - },getExecutor())); + },getPoller())); } void RtspPlayer::onRecv(const Buffer::Ptr& pBuf) { @@ -352,7 +352,7 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex) return false; } return strongSelf->sendOptions(); - },getExecutor())); + },getPoller())); pause(false); } @@ -603,7 +603,7 @@ void RtspPlayer::onPlayResult_l(const SockException &ex) { return false; } return true; - },getExecutor())); + },getPoller())); } onPlayResult(ex); } diff --git a/src/Rtsp/RtspSession.cpp b/src/Rtsp/RtspSession.cpp index ffc7cc86..823ed03e 100644 --- a/src/Rtsp/RtspSession.cpp +++ b/src/Rtsp/RtspSession.cpp @@ -70,19 +70,42 @@ static unordered_map > g_mapGetter; //对g_mapGetter上锁保护 static recursive_mutex g_mtxGetter; +//rtsp会话个数统计 +static recursive_mutex g_mtxSessionCounter; +static unordered_map > g_mapSessionCounter; + static int kSockFlags = SOCKET_DEFAULE_FLAGS | FLAG_MORE; +uint32_t RtspSession::getSessionCountOnInterface(const string &ifr){ + lock_guard lck(g_mtxSessionCounter); + if(ifr.empty()){ + int i = 0; + for(auto &pr : g_mapSessionCounter){ + i += pr.second; + } + return i; + } + return g_mapSessionCounter[ifr]; +} + RtspSession::RtspSession(const Socket::Ptr &pSock) : TcpSession(pSock) { //设置10秒发送缓存 - pSock->setSendBufSecond(10); + pSock->setSendBufSecond(30); //设置15秒发送超时时间 - pSock->setSendTimeOutSecond(15); + pSock->setSendTimeOutSecond(45); DebugL << get_peer_ip(); + + lock_guard lck(g_mtxSessionCounter); + ++(g_mapSessionCounter[get_local_ip()]); } RtspSession::~RtspSession() { DebugL << get_peer_ip(); + + lock_guard lck(g_mtxSessionCounter); + --(g_mapSessionCounter[get_local_ip()]); + } void RtspSession::onError(const SockException& err) { diff --git a/src/Rtsp/RtspSession.h b/src/Rtsp/RtspSession.h index 3c7a1c8e..0c0e9689 100644 --- a/src/Rtsp/RtspSession.h +++ b/src/Rtsp/RtspSession.h @@ -28,6 +28,7 @@ #define SESSION_RTSPSESSION_H_ #include +#include #include #include #include "Util/util.h" @@ -79,6 +80,7 @@ public: void onRecv(const Buffer::Ptr &pBuf) override; void onError(const SockException &err) override; void onManager() override; + static uint32_t getSessionCountOnInterface(const string &ifr); protected: //RtspSplitter override /** diff --git a/tests/test_benchmark.cpp b/tests/test_benchmark.cpp index b205e4d0..bc946a21 100644 --- a/tests/test_benchmark.cpp +++ b/tests/test_benchmark.cpp @@ -33,7 +33,6 @@ #include "Network/sockutil.h" #include "Poller/EventPoller.h" #include "Player/PlayerProxy.h" -#include "Thread/WorkThreadPool.h" using namespace std; using namespace toolkit; diff --git a/tests/test_server.cpp b/tests/test_server.cpp index 53abdd47..8b8d8a83 100644 --- a/tests/test_server.cpp +++ b/tests/test_server.cpp @@ -266,7 +266,7 @@ int main(int argc,char *argv[]) { //指定RTP over TCP(播放rtsp时有效) (*player)[RtspPlayer::kRtpType] = PlayerBase::RTP_TCP; //开始播放,如果播放失败或者播放中止,将会自动重试若干次,重试次数在配置文件中配置,默认一直重试 - player->play(url); + //player->play(url); //需要保存PlayerProxy,否则作用域结束就会销毁该对象 proxyMap.emplace(to_string(i), player); ++i;