适配ZLToolKit(更新定时器)

This commit is contained in:
xiongziliang 2019-01-30 17:00:28 +08:00
parent cc7556b5a8
commit b5ea9fac08
12 changed files with 42 additions and 18 deletions

@ -1 +1 @@
Subproject commit 8c1a0f88a0b8e332c3eaf04dbb9a8f2402b267ba Subproject commit 01c9f8141b43fe270aa0d646d115df98aa6c5e14

View File

@ -46,12 +46,12 @@ void MediaPlayer::play(const char* strUrl) {
_parser->play(strUrl); _parser->play(strUrl);
} }
TaskExecutor::Ptr MediaPlayer::getExecutor(){ EventPoller::Ptr MediaPlayer::getPoller(){
auto parser = dynamic_pointer_cast<SocketHelper>(_parser); auto parser = dynamic_pointer_cast<SocketHelper>(_parser);
if(!parser){ if(!parser){
return nullptr; return nullptr;
} }
return parser->getExecutor(); return parser->getPoller();
} }
void MediaPlayer::pause(bool bPause) { void MediaPlayer::pause(bool bPause) {

View File

@ -46,7 +46,7 @@ public:
void play(const char* strUrl) override; void play(const char* strUrl) override;
void pause(bool bPause) override; void pause(bool bPause) override;
void teardown() override; void teardown() override;
TaskExecutor::Ptr getExecutor(); EventPoller::Ptr getPoller();
}; };

View File

@ -136,9 +136,9 @@ void PlayerProxy::rePlay(const string &strUrl,int iFailedCnt){
bool PlayerProxy::close() { bool PlayerProxy::close() {
//通知其停止推流 //通知其停止推流
weak_ptr<PlayerProxy> weakSlef = dynamic_pointer_cast<PlayerProxy>(shared_from_this()); weak_ptr<PlayerProxy> weakSlef = dynamic_pointer_cast<PlayerProxy>(shared_from_this());
auto executor = getExecutor(); auto poller = getPoller();
if(executor) { if(poller) {
executor->async_first([weakSlef]() { poller->async_first([weakSlef]() {
auto stronSelf = weakSlef.lock(); auto stronSelf = weakSlef.lock();
if (stronSelf) { if (stronSelf) {
stronSelf->_mediaMuxer.reset(); stronSelf->_mediaMuxer.reset();

View File

@ -116,7 +116,7 @@ void RtmpPlayer::onConnect(const SockException &err){
strongSelf->_onPlayResult(SockException(Err_timeout,"play rtmp timeout")); strongSelf->_onPlayResult(SockException(Err_timeout,"play rtmp timeout"));
strongSelf->teardown(); strongSelf->teardown();
return false; return false;
},getExecutor())); },getPoller()));
startClientSession([weakSelf](){ startClientSession([weakSelf](){
auto strongSelf=weakSelf.lock(); auto strongSelf=weakSelf.lock();
if(!strongSelf) { if(!strongSelf) {
@ -230,7 +230,7 @@ inline void RtmpPlayer::send_pause(bool bPause) {
uint32_t timeStamp = ::time(NULL); uint32_t timeStamp = ::time(NULL);
strongSelf->sendUserControl(CONTROL_PING_REQUEST, timeStamp); strongSelf->sendUserControl(CONTROL_PING_REQUEST, timeStamp);
return true; return true;
},getExecutor())); },getPoller()));
} }
} }

View File

@ -88,7 +88,7 @@ private:
return false; return false;
} }
return true; return true;
},getExecutor())); },getPoller()));
} }
onPlayResult(ex); onPlayResult(ex);
} }

View File

@ -121,7 +121,7 @@ void RtmpPusher::onConnect(const SockException &err){
strongSelf->onPublishResult(SockException(Err_timeout,"publish rtmp timeout")); strongSelf->onPublishResult(SockException(Err_timeout,"publish rtmp timeout"));
strongSelf->teardown(); strongSelf->teardown();
return false; return false;
},getExecutor())); },getPoller()));
startClientSession([weakSelf](){ startClientSession([weakSelf](){
auto strongSelf=weakSelf.lock(); auto strongSelf=weakSelf.lock();
if(!strongSelf) { if(!strongSelf) {

View File

@ -153,7 +153,7 @@ void RtspPlayer::onConnect(const SockException &err){
strongSelf->onPlayResult_l(SockException(Err_timeout,"play rtsp timeout")); strongSelf->onPlayResult_l(SockException(Err_timeout,"play rtsp timeout"));
strongSelf->teardown(); strongSelf->teardown();
return false; return false;
},getExecutor())); },getPoller()));
} }
void RtspPlayer::onRecv(const Buffer::Ptr& pBuf) { void RtspPlayer::onRecv(const Buffer::Ptr& pBuf) {
@ -352,7 +352,7 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex)
return false; return false;
} }
return strongSelf->sendOptions(); return strongSelf->sendOptions();
},getExecutor())); },getPoller()));
pause(false); pause(false);
} }
@ -603,7 +603,7 @@ void RtspPlayer::onPlayResult_l(const SockException &ex) {
return false; return false;
} }
return true; return true;
},getExecutor())); },getPoller()));
} }
onPlayResult(ex); onPlayResult(ex);
} }

View File

@ -70,19 +70,42 @@ static unordered_map<string, weak_ptr<RtspSession> > g_mapGetter;
//对g_mapGetter上锁保护 //对g_mapGetter上锁保护
static recursive_mutex g_mtxGetter; static recursive_mutex g_mtxGetter;
//rtsp会话个数统计
static recursive_mutex g_mtxSessionCounter;
static unordered_map<string,atomic<uint32_t > > g_mapSessionCounter;
static int kSockFlags = SOCKET_DEFAULE_FLAGS | FLAG_MORE; static int kSockFlags = SOCKET_DEFAULE_FLAGS | FLAG_MORE;
uint32_t RtspSession::getSessionCountOnInterface(const string &ifr){
lock_guard<recursive_mutex> lck(g_mtxSessionCounter);
if(ifr.empty()){
int i = 0;
for(auto &pr : g_mapSessionCounter){
i += pr.second;
}
return i;
}
return g_mapSessionCounter[ifr];
}
RtspSession::RtspSession(const Socket::Ptr &pSock) : TcpSession(pSock) { RtspSession::RtspSession(const Socket::Ptr &pSock) : TcpSession(pSock) {
//设置10秒发送缓存 //设置10秒发送缓存
pSock->setSendBufSecond(10); pSock->setSendBufSecond(30);
//设置15秒发送超时时间 //设置15秒发送超时时间
pSock->setSendTimeOutSecond(15); pSock->setSendTimeOutSecond(45);
DebugL << get_peer_ip(); DebugL << get_peer_ip();
lock_guard<recursive_mutex> lck(g_mtxSessionCounter);
++(g_mapSessionCounter[get_local_ip()]);
} }
RtspSession::~RtspSession() { RtspSession::~RtspSession() {
DebugL << get_peer_ip(); DebugL << get_peer_ip();
lock_guard<recursive_mutex> lck(g_mtxSessionCounter);
--(g_mapSessionCounter[get_local_ip()]);
} }
void RtspSession::onError(const SockException& err) { void RtspSession::onError(const SockException& err) {

View File

@ -28,6 +28,7 @@
#define SESSION_RTSPSESSION_H_ #define SESSION_RTSPSESSION_H_
#include <set> #include <set>
#include <atomic>
#include <vector> #include <vector>
#include <unordered_map> #include <unordered_map>
#include "Util/util.h" #include "Util/util.h"
@ -79,6 +80,7 @@ public:
void onRecv(const Buffer::Ptr &pBuf) override; void onRecv(const Buffer::Ptr &pBuf) override;
void onError(const SockException &err) override; void onError(const SockException &err) override;
void onManager() override; void onManager() override;
static uint32_t getSessionCountOnInterface(const string &ifr);
protected: protected:
//RtspSplitter override //RtspSplitter override
/** /**

View File

@ -33,7 +33,6 @@
#include "Network/sockutil.h" #include "Network/sockutil.h"
#include "Poller/EventPoller.h" #include "Poller/EventPoller.h"
#include "Player/PlayerProxy.h" #include "Player/PlayerProxy.h"
#include "Thread/WorkThreadPool.h"
using namespace std; using namespace std;
using namespace toolkit; using namespace toolkit;

View File

@ -266,7 +266,7 @@ int main(int argc,char *argv[]) {
//指定RTP over TCP(播放rtsp时有效) //指定RTP over TCP(播放rtsp时有效)
(*player)[RtspPlayer::kRtpType] = PlayerBase::RTP_TCP; (*player)[RtspPlayer::kRtpType] = PlayerBase::RTP_TCP;
//开始播放,如果播放失败或者播放中止,将会自动重试若干次,重试次数在配置文件中配置,默认一直重试 //开始播放,如果播放失败或者播放中止,将会自动重试若干次,重试次数在配置文件中配置,默认一直重试
player->play(url); //player->play(url);
//需要保存PlayerProxy否则作用域结束就会销毁该对象 //需要保存PlayerProxy否则作用域结束就会销毁该对象
proxyMap.emplace(to_string(i), player); proxyMap.emplace(to_string(i), player);
++i; ++i;