diff --git a/README.md b/README.md index 8582fa0e..18dd35fc 100644 --- a/README.md +++ b/README.md @@ -1,10 +1,10 @@ # 一个基于C++11简单易用的轻量级流媒体库 平台|编译状态 ----|------- -Linux | [![Build Status](https://travis-ci.org/xiongziliang/ZLMediaKit.svg?branch=master)](https://travis-ci.org/xiongziliang/ZLMediaKit) -macOS | [![Build Status](https://travis-ci.org/xiongziliang/ZLMediaKit_build_for_mac.svg?branch=master)](https://travis-ci.org/xiongziliang/ZLMediaKit_build_for_mac) -iOS | [![Build Status](https://travis-ci.org/xiongziliang/ZLMediaKit-build_for_ios.svg?branch=master)](https://travis-ci.org/xiongziliang/ZLMediaKit-build_for_ios) -Android | [![Build Status](https://travis-ci.org/xiongziliang/ZLMediaKit_build_for_android.svg?branch=master)](https://travis-ci.org/xiongziliang/ZLMediaKit_build_for_android) +Linux | [![Build Status](https://travis-ci.org/xiongziliang/ZLMediaKit.svg?branch=2.0-alpha)](https://travis-ci.org/xiongziliang/ZLMediaKit) +macOS | [![Build Status](https://travis-ci.org/xiongziliang/ZLMediaKit_build_for_mac.svg?branch=2.0-alpha)](https://travis-ci.org/xiongziliang/ZLMediaKit_build_for_mac) +iOS | [![Build Status](https://travis-ci.org/xiongziliang/ZLMediaKit-build_for_ios.svg?branch=2.0-alpha)](https://travis-ci.org/xiongziliang/ZLMediaKit-build_for_ios) +Android | [![Build Status](https://travis-ci.org/xiongziliang/ZLMediaKit_build_for_android.svg?branch=2.0-alpha)](https://travis-ci.org/xiongziliang/ZLMediaKit_build_for_android) Windows | 已经完成移植 ## 项目特点 @@ -124,15 +124,15 @@ Windows | 已经完成移植 ## 使用方法 - 作为服务器: ``` - TcpServer::Ptr rtspSrv(new TcpServer()); - TcpServer::Ptr rtmpSrv(new TcpServer()); - TcpServer::Ptr httpSrv(new TcpServer()); - TcpServer::Ptr httpsSrv(new TcpServer()); + TcpServer::Ptr rtspSrv(new TcpServer()); + TcpServer::Ptr rtmpSrv(new TcpServer()); + TcpServer::Ptr httpSrv(new TcpServer()); + TcpServer::Ptr httpsSrv(new TcpServer()); - rtspSrv->start(mINI::Instance()[Config::Rtsp::kPort]); - rtmpSrv->start(mINI::Instance()[Config::Rtmp::kPort]); - httpSrv->start(mINI::Instance()[Config::Http::kPort]); - httpsSrv->start(mINI::Instance()[Config::Http::kSSLPort]); + rtspSrv->start(mINI::Instance()[Config::Rtsp::kPort]); + rtmpSrv->start(mINI::Instance()[Config::Rtmp::kPort]); + httpSrv->start(mINI::Instance()[Config::Http::kPort]); + httpsSrv->start(mINI::Instance()[Config::Http::kSSLPort]); EventPoller::Instance().runLoop(); ``` diff --git a/build_for_android.sh b/build_for_android.sh index d720baa3..2e28ed34 100755 --- a/build_for_android.sh +++ b/build_for_android.sh @@ -1,12 +1,13 @@ #!/bin/bash path=`pwd` -wget https://raw.githubusercontent.com/xiongziliang/ZLToolKit/master/build_for_android.sh -O toolkit_build.sh +wget https://raw.githubusercontent.com/xiongziliang/ZLToolKit/develop/build_for_android.sh -O toolkit_build.sh sudo chmod +x ./toolkit_build.sh ./toolkit_build.sh cd $path cd .. git clone --depth=50 https://github.com/xiongziliang/ZLMediaKit.git cd ZLMediaKit +git checkout 2.0-alpha mkdir -p android_build rm -rf ./build ln -s ./android_build build diff --git a/build_for_linux.sh b/build_for_linux.sh index a84090cb..a065a553 100755 --- a/build_for_linux.sh +++ b/build_for_linux.sh @@ -1,6 +1,6 @@ #!/bin/bash path=`pwd` -wget https://raw.githubusercontent.com/xiongziliang/ZLToolKit/master/build_for_linux.sh -O ./toolkit_build.sh +wget https://raw.githubusercontent.com/xiongziliang/ZLToolKit/develop/build_for_linux.sh -O ./toolkit_build.sh sudo chmod +x ./toolkit_build.sh ./toolkit_build.sh sudo apt-get install libx264-dev @@ -13,6 +13,7 @@ cd $path cd .. git clone --depth=50 https://github.com/xiongziliang/ZLMediaKit.git cd ZLMediaKit +git checkout 2.0-alpha mkdir -p linux_build rm -rf ./build ln -s ./linux_build build diff --git a/build_for_mac.sh b/build_for_mac.sh index 92910f4a..357c9187 100755 --- a/build_for_mac.sh +++ b/build_for_mac.sh @@ -1,6 +1,6 @@ #!/bin/bash path=`pwd` -wget https://raw.githubusercontent.com/xiongziliang/ZLToolKit/master/build_for_mac.sh -O toolkit_build.sh +wget https://raw.githubusercontent.com/xiongziliang/ZLToolKit/develop/build_for_mac.sh -O toolkit_build.sh sudo chmod +x ./toolkit_build.sh ./toolkit_build.sh brew install x264 @@ -12,6 +12,7 @@ cd $path cd .. git clone --depth=50 https://github.com/xiongziliang/ZLMediaKit.git cd ZLMediaKit +git checkout 2.0-alpha mkdir -p mac_build rm -rf ./build ln -s ./mac_build build diff --git a/src/Common/config.h b/src/Common/config.h index 037d2ae8..337b06c9 100644 --- a/src/Common/config.h +++ b/src/Common/config.h @@ -44,8 +44,6 @@ namespace Config { //默认配置文件名为 /path/to/your/exe.ini //加载配置文件成功后返回true,否则返回false bool loadIniConfig(const char *ini_path = nullptr); -////////////TCP最大连接数/////////// -#define MAX_TCP_SESSION 100000 ////////////其他宏定义/////////// #ifndef MAX #define MAX(a,b) ((a) > (b) ? (a) : (b) ) diff --git a/src/Http/HttpClient.cpp b/src/Http/HttpClient.cpp index 115a830a..8c9e657e 100644 --- a/src/Http/HttpClient.cpp +++ b/src/Http/HttpClient.cpp @@ -110,7 +110,7 @@ void HttpClient::onConnect(const SockException &ex) { send(_body); } } -void HttpClient::onRecv(const Socket::Buffer::Ptr &pBuf) { +void HttpClient::onRecv(const Buffer::Ptr &pBuf) { onRecvBytes(pBuf->data(),pBuf->size()); } diff --git a/src/Http/HttpClient.h b/src/Http/HttpClient.h index 1bdf80d2..15ca40c5 100644 --- a/src/Http/HttpClient.h +++ b/src/Http/HttpClient.h @@ -111,7 +111,7 @@ protected: virtual void onDisconnect(const SockException &ex){} private: virtual void onConnect(const SockException &ex) override; - virtual void onRecv(const Socket::Buffer::Ptr &pBuf) override; + virtual void onRecv(const Buffer::Ptr &pBuf) override; virtual void onErr(const SockException &ex) override; //send diff --git a/src/Http/HttpDownloader.cpp b/src/Http/HttpDownloader.cpp index 295d57ee..5b464312 100644 --- a/src/Http/HttpDownloader.cpp +++ b/src/Http/HttpDownloader.cpp @@ -44,6 +44,7 @@ HttpDownloader::~HttpDownloader() { void HttpDownloader::startDownload(const string& url, const string& filePath,bool bAppend,uint32_t timeOutSecond) { _filePath = filePath; _timeOutSecond = timeOutSecond; + _downloadTicker.resetTime(); if(_filePath.empty()){ _filePath = exeDir() + "HttpDownloader/" + MD5(url).hexdigest(); } @@ -67,7 +68,8 @@ void HttpDownloader::startDownload(const string& url, const string& filePath,boo } void HttpDownloader::onResponseHeader(const string& status,const HttpHeader& headers) { - if(status != "200" && status != "206"){ + _downloadTicker.resetTime(); + if(status != "200" && status != "206"){ //失败 shutdown(); closeFile(); @@ -81,7 +83,8 @@ void HttpDownloader::onResponseHeader(const string& status,const HttpHeader& hea } void HttpDownloader::onResponseBody(const char* buf, size_t size, size_t recvedSize, size_t totalSize) { - if(_saveFile){ + _downloadTicker.resetTime(); + if(_saveFile){ fwrite(buf,size,1,_saveFile); } } @@ -126,7 +129,7 @@ void HttpDownloader::closeFile() { } void HttpDownloader::onManager(){ - if(elapsedTime() > _timeOutSecond * 1000){ + if(_downloadTicker.elapsedTime() > _timeOutSecond * 1000){ //超时 onDisconnect(SockException(Err_timeout,"download timeout")); shutdown(); diff --git a/src/Http/HttpDownloader.h b/src/Http/HttpDownloader.h index 78c471ff..cc2b7c80 100644 --- a/src/Http/HttpDownloader.h +++ b/src/Http/HttpDownloader.h @@ -62,6 +62,7 @@ private: onDownloadResult _onResult; uint32_t _timeOutSecond; bool _bDownloadSuccess = false; + Ticker _downloadTicker; }; } /* namespace Http */ diff --git a/src/Http/HttpSession.cpp b/src/Http/HttpSession.cpp index cc1e1020..21b0d226 100644 --- a/src/Http/HttpSession.cpp +++ b/src/Http/HttpSession.cpp @@ -101,7 +101,7 @@ get_mime_type(const char* name) { HttpSession::HttpSession(const std::shared_ptr &pTh, const Socket::Ptr &pSock) : - TcpLimitedSession(pTh, pSock) { + TcpSession(pTh, pSock) { GET_CONFIG_AND_REGISTER(string,rootPath,Config::Http::kRootPath); m_strPath = rootPath; @@ -115,7 +115,7 @@ HttpSession::~HttpSession() { //DebugL; } -void HttpSession::onRecv(const Socket::Buffer::Ptr &pBuf) { +void HttpSession::onRecv(const Buffer::Ptr &pBuf) { onRecv(pBuf->data(),pBuf->size()); } void HttpSession::onRecv(const char *data,int size){ @@ -256,6 +256,11 @@ inline bool HttpSession::checkLiveFlvStream(){ }); //开始发送rtmp负载 + + //关闭tcp_nodelay ,优化性能 + SockUtil::setNoDelay(_sock->rawFD(),false); + (*this) << SocketFlags(sock_flags); + m_pRingReader = mediaSrc->getRing()->attach(); weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); m_pRingReader->setReadCB([weakSelf](const RtmpPacket::Ptr &pkt){ @@ -392,9 +397,9 @@ inline HttpSession::HttpCode HttpSession::Handle_Req_GET() { GET_CONFIG_AND_REGISTER(uint32_t,sendBufSize,Config::Http::kSendBufSize); //不允许主动丢包 - sock->setShouldDropPacket(false); + _sock->setShouldDropPacket(false); //缓存大小为两个包,太大可能导致发送时间太长从而超时 - sock->setSendPktSize(2); + _sock->setSendPktSize(2); weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); auto onFlush = [pFilePtr,bClose,weakSelf,piLeft]() { TimeTicker(); @@ -403,7 +408,7 @@ inline HttpSession::HttpCode HttpSession::Handle_Req_GET() { //更新超时定时器 strongSelf->m_ticker.resetTime(); //从循环池获取一个内存片 - auto sendBuf = strongSelf->sock->obtainBuffer(); + auto sendBuf = strongSelf->obtainBuffer(); sendBuf->setCapacity(sendBufSize); //本次需要读取文件字节数 int64_t iReq = MIN(sendBufSize,*piLeft); @@ -421,8 +426,8 @@ inline HttpSession::HttpCode HttpSession::Handle_Req_GET() { //InfoL << "send complete!" << iRead << " " << iReq << " " << *piLeft; if(iRead>0) { sendBuf->setSize(iRead); - strongSelf->sock->setSendPktSize(3);//强制写入socket缓存 - strongSelf->sock->send(sendBuf,sock_flags); + strongSelf->_sock->setSendPktSize(3);//强制写入socket缓存 + strongSelf->send(sendBuf); } if(bClose) { strongSelf->shutdown(); @@ -431,7 +436,7 @@ inline HttpSession::HttpCode HttpSession::Handle_Req_GET() { } //文件还未读完 sendBuf->setSize(iRead); - int iSent = strongSelf->sock->send(sendBuf,sock_flags); + int iSent = strongSelf->send(sendBuf); if(iSent == -1) { //send error //InfoL << "send error"; @@ -451,9 +456,10 @@ inline HttpSession::HttpCode HttpSession::Handle_Req_GET() { return false; }; //关闭tcp_nodelay ,优化性能 - SockUtil::setNoDelay(sock->rawFD(),false); - onFlush(); - sock->setOnFlush(onFlush); + SockUtil::setNoDelay(_sock->rawFD(),false); + (*this) << SocketFlags(sock_flags); + onFlush(); + _sock->setOnFlush(onFlush); return Http_success; } @@ -690,7 +696,7 @@ public: #pragma pack(pop) #endif // defined(_WIN32) -class BufferRtmp : public Socket::Buffer{ +class BufferRtmp : public Buffer{ public: typedef std::shared_ptr Ptr; BufferRtmp(const RtmpPacket::Ptr & pkt):_rtmp(pkt){} @@ -708,28 +714,28 @@ private: void HttpSession::sendRtmp(const RtmpPacket::Ptr &pkt, uint32_t ui32TimeStamp) { auto size = htonl(m_previousTagSize); - sock->send((char *)&size,4,sock_flags);//send PreviousTagSize + send((char *)&size,4);//send PreviousTagSize RtmpTagHeader header; header.type = pkt->typeId; set_be24(header.data_size, pkt->strBuf.size()); header.timestamp_ex = (uint8_t) ((ui32TimeStamp >> 24) & 0xff); set_be24(header.timestamp,ui32TimeStamp & 0xFFFFFF); - sock->send((char *)&header, sizeof(header),sock_flags);//send tag header - sock->send(std::make_shared(pkt),sock_flags);//send tag data + send((char *)&header, sizeof(header));//send tag header + send(std::make_shared(pkt));//send tag data m_previousTagSize += (pkt->strBuf.size() + sizeof(header) + 4); m_ticker.resetTime(); } void HttpSession::sendRtmp(uint8_t ui8Type, const std::string& strBuf, uint32_t ui32TimeStamp) { auto size = htonl(m_previousTagSize); - sock->send((char *)&size,4,sock_flags);//send PreviousTagSize + send((char *)&size,4);//send PreviousTagSize RtmpTagHeader header; header.type = ui8Type; set_be24(header.data_size, strBuf.size()); header.timestamp_ex = (uint8_t) ((ui32TimeStamp >> 24) & 0xff); set_be24(header.timestamp,ui32TimeStamp & 0xFFFFFF); - sock->send((char *)&header, sizeof(header),sock_flags);//send tag header - sock->send(strBuf,sock_flags);//send tag data + send((char *)&header, sizeof(header));//send tag header + send(strBuf);//send tag data m_previousTagSize += (strBuf.size() + sizeof(header) + 4); m_ticker.resetTime(); } diff --git a/src/Http/HttpSession.h b/src/Http/HttpSession.h index 42e57efe..758c08e8 100644 --- a/src/Http/HttpSession.h +++ b/src/Http/HttpSession.h @@ -29,7 +29,7 @@ #include #include "Common/config.h" #include "Rtsp/Rtsp.h" -#include "Network/TcpLimitedSession.h" +#include "Network/TcpSession.h" #include "Rtmp/RtmpMediaSource.h" using namespace std; @@ -40,7 +40,7 @@ namespace ZL { namespace Http { -class HttpSession: public TcpLimitedSession { +class HttpSession: public TcpSession { public: typedef StrCaseMap KeyValue; typedef std::function &pTh, const Socket::Ptr &pSock); virtual ~HttpSession(); - virtual void onRecv(const Socket::Buffer::Ptr &) override; + virtual void onRecv(const Buffer::Ptr &) override; virtual void onError(const SockException &err) override; virtual void onManager() override; diff --git a/src/Http/HttpsSession.h b/src/Http/HttpsSession.h index 88ffbb07..122affd3 100644 --- a/src/Http/HttpsSession.h +++ b/src/Http/HttpsSession.h @@ -58,7 +58,7 @@ public: virtual ~HttpsSession(){ //m_sslBox.shutdown(); } - void onRecv(const Socket::Buffer::Ptr &pBuf) override{ + void onRecv(const Buffer::Ptr &pBuf) override{ TimeTicker(); m_sslBox.onRecv(pBuf->data(), pBuf->size()); } diff --git a/src/Rtmp/RtmpPlayer.cpp b/src/Rtmp/RtmpPlayer.cpp index 9cb0ba0d..a6615b85 100644 --- a/src/Rtmp/RtmpPlayer.cpp +++ b/src/Rtmp/RtmpPlayer.cpp @@ -126,7 +126,7 @@ void RtmpPlayer::onConnect(const SockException &err){ strongSelf->send_connect(); }); } -void RtmpPlayer::onRecv(const Socket::Buffer::Ptr &pBuf){ +void RtmpPlayer::onRecv(const Buffer::Ptr &pBuf){ try { onParseRtmp(pBuf->data(), pBuf->size()); } catch (exception &e) { diff --git a/src/Rtmp/RtmpPlayer.h b/src/Rtmp/RtmpPlayer.h index ff52fa2e..bd0dc0b3 100644 --- a/src/Rtmp/RtmpPlayer.h +++ b/src/Rtmp/RtmpPlayer.h @@ -99,7 +99,7 @@ private: } //for Tcpclient - void onRecv(const Socket::Buffer::Ptr &pBuf) override; + void onRecv(const Buffer::Ptr &pBuf) override; void onConnect(const SockException &err) override; void onErr(const SockException &ex) override; //fro RtmpProtocol @@ -108,8 +108,8 @@ private: void onSendRawData(const char *pcRawData, int iSize) override { send(pcRawData, iSize); } - void onSendRawData(const Socket::Buffer::Ptr &buffer,int flags) override{ - m_pSock->send(buffer,flags); + void onSendRawData(const Buffer::Ptr &buffer,int flags) override{ + _sock->send(buffer,flags); } template diff --git a/src/Rtmp/RtmpProtocol.cpp b/src/Rtmp/RtmpProtocol.cpp index 9d26142e..d8c6234e 100644 --- a/src/Rtmp/RtmpProtocol.cpp +++ b/src/Rtmp/RtmpProtocol.cpp @@ -204,7 +204,7 @@ void RtmpProtocol::sendRtmp(uint8_t ui8Type, uint32_t ui32StreamId, //估算rtmp包数据大小 uint32_t capacity = ((bExtStamp ? 5 : 1) * (1 + (strBuf.size() / m_iChunkLenOut))) + strBuf.size() + sizeof(header); uint32_t totalSize = 0; - Socket::BufferRaw::Ptr buffer = m_bufferPool.obtain(); + BufferRaw::Ptr buffer = m_bufferPool.obtain(); buffer->setCapacity(capacity); memcpy(buffer->data() + totalSize,(char *) &header, sizeof(header)); totalSize += sizeof(header); diff --git a/src/Rtmp/RtmpProtocol.h b/src/Rtmp/RtmpProtocol.h index 56d6a503..3f09a123 100644 --- a/src/Rtmp/RtmpProtocol.h +++ b/src/Rtmp/RtmpProtocol.h @@ -55,7 +55,7 @@ public: void reset(); protected: virtual void onSendRawData(const char *pcRawData,int iSize) = 0; - virtual void onSendRawData(const Socket::Buffer::Ptr &buffer,int flags) = 0; + virtual void onSendRawData(const Buffer::Ptr &buffer,int flags) = 0; virtual void onRtmpChunk(RtmpPacket &chunkData) = 0; @@ -85,7 +85,7 @@ protected: int m_iNowStreamID = 0; int m_iNowChunkID = 0; bool m_bDataStarted = false; - ResourcePool m_bufferPool; + ResourcePool m_bufferPool; private: void handle_S0S1S2(const function &cb); void handle_C0C1(); diff --git a/src/Rtmp/RtmpPusher.cpp b/src/Rtmp/RtmpPusher.cpp index 4f8546d1..66116907 100644 --- a/src/Rtmp/RtmpPusher.cpp +++ b/src/Rtmp/RtmpPusher.cpp @@ -131,7 +131,7 @@ void RtmpPusher::onConnect(const SockException &err){ strongSelf->send_connect(); }); } -void RtmpPusher::onRecv(const Socket::Buffer::Ptr &pBuf){ +void RtmpPusher::onRecv(const Buffer::Ptr &pBuf){ try { onParseRtmp(pBuf->data(), pBuf->size()); } catch (exception &e) { diff --git a/src/Rtmp/RtmpPusher.h b/src/Rtmp/RtmpPusher.h index 138bf35c..9f1955a9 100644 --- a/src/Rtmp/RtmpPusher.h +++ b/src/Rtmp/RtmpPusher.h @@ -56,7 +56,7 @@ public: protected: //for Tcpclient - void onRecv(const Socket::Buffer::Ptr &pBuf) override; + void onRecv(const Buffer::Ptr &pBuf) override; void onConnect(const SockException &err) override; void onErr(const SockException &ex) override; @@ -65,8 +65,8 @@ protected: void onSendRawData(const char *pcRawData, int iSize) override { send(pcRawData, iSize); } - void onSendRawData(const Socket::Buffer::Ptr &buffer,int flags) override{ - m_pSock->send(buffer,flags); + void onSendRawData(const Buffer::Ptr &buffer,int flags) override{ + _sock->send(buffer,flags); } private: void init(const RtmpMediaSource::Ptr &src); diff --git a/src/Rtmp/RtmpSession.cpp b/src/Rtmp/RtmpSession.cpp index c9319d8f..f6d42371 100644 --- a/src/Rtmp/RtmpSession.cpp +++ b/src/Rtmp/RtmpSession.cpp @@ -33,7 +33,7 @@ namespace Rtmp { unordered_map RtmpSession::g_mapCmd; RtmpSession::RtmpSession(const std::shared_ptr &pTh, const Socket::Ptr &pSock) : - TcpLimitedSession(pTh, pSock) { + TcpSession(pTh, pSock) { static onceToken token([]() { g_mapCmd.emplace("connect",&RtmpSession::onCmd_connect); g_mapCmd.emplace("createStream",&RtmpSession::onCmd_createStream); @@ -43,11 +43,11 @@ RtmpSession::RtmpSession(const std::shared_ptr &pTh, const Socket::P g_mapCmd.emplace("play2",&RtmpSession::onCmd_play2); g_mapCmd.emplace("seek",&RtmpSession::onCmd_seek); g_mapCmd.emplace("pause",&RtmpSession::onCmd_pause);}, []() {}); - DebugL << getPeerIp(); + DebugL << get_peer_ip(); } RtmpSession::~RtmpSession() { - DebugL << getPeerIp(); + DebugL << get_peer_ip(); } void RtmpSession::onError(const SockException& err) { @@ -64,20 +64,20 @@ void RtmpSession::onError(const SockException& err) { void RtmpSession::onManager() { if (m_ticker.createdTime() > 10 * 1000) { if (!m_pRingReader && !m_pPublisherSrc) { - WarnL << "非法链接:" << getPeerIp(); + WarnL << "非法链接:" << get_peer_ip(); shutdown(); } } if (m_pPublisherSrc) { //publisher if (m_ticker.elapsedTime() > 10 * 1000) { - WarnL << "数据接收超时:" << getPeerIp(); + WarnL << "数据接收超时:" << get_peer_ip(); shutdown(); } } } -void RtmpSession::onRecv(const Socket::Buffer::Ptr &pBuf) { +void RtmpSession::onRecv(const Buffer::Ptr &pBuf) { m_ticker.resetTime(); try { m_ui64TotalBytes += pBuf->size(); @@ -273,7 +273,7 @@ void RtmpSession::doPlay(AMFDecoder &dec){ m_pRingReader = src->getRing()->attach(); weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - SockUtil::setNoDelay(sock->rawFD(), false); + SockUtil::setNoDelay(_sock->rawFD(), false); m_pRingReader->setReadCB([weakSelf](const RtmpPacket::Ptr &pkt) { auto strongSelf = weakSelf.lock(); if (!strongSelf) { diff --git a/src/Rtmp/RtmpSession.h b/src/Rtmp/RtmpSession.h index 4122e3cb..2e7deaa2 100644 --- a/src/Rtmp/RtmpSession.h +++ b/src/Rtmp/RtmpSession.h @@ -36,7 +36,7 @@ #include "RtmpToRtspMediaSource.h" #include "Util/util.h" #include "Util/TimeTicker.h" -#include "Network/TcpLimitedSession.h" +#include "Network/TcpSession.h" using namespace ZL::Util; using namespace ZL::Network; @@ -44,12 +44,12 @@ using namespace ZL::Network; namespace ZL { namespace Rtmp { -class RtmpSession: public TcpLimitedSession ,public RtmpProtocol , public MediaSourceEvent{ +class RtmpSession: public TcpSession ,public RtmpProtocol , public MediaSourceEvent{ public: typedef std::shared_ptr Ptr; RtmpSession(const std::shared_ptr &_th, const Socket::Ptr &_sock); virtual ~RtmpSession(); - void onRecv(const Socket::Buffer::Ptr &pBuf) override; + void onRecv(const Buffer::Ptr &pBuf) override; void onError(const SockException &err) override; void onManager() override; private: @@ -86,9 +86,9 @@ private: m_ui64TotalBytes += iSize; send(pcRawData, iSize); } - void onSendRawData(const Socket::Buffer::Ptr &buffer,int flags) override{ + void onSendRawData(const Buffer::Ptr &buffer,int flags) override{ m_ui64TotalBytes += buffer->size(); - sock->send(buffer,flags); + _sock->send(buffer,flags); } void onRtmpChunk(RtmpPacket &chunkData) override; diff --git a/src/Rtsp/RtspPlayer.cpp b/src/Rtsp/RtspPlayer.cpp index 00fe821c..e9f07f7e 100644 --- a/src/Rtsp/RtspPlayer.cpp +++ b/src/Rtsp/RtspPlayer.cpp @@ -173,7 +173,7 @@ void RtspPlayer::onConnect(const SockException &err){ })); } -void RtspPlayer::onRecv(const Socket::Buffer::Ptr& pBuf) { +void RtspPlayer::onRecv(const Buffer::Ptr& pBuf) { const char *buf = pBuf->data(); int size = pBuf->size(); if (m_onHandshake) { @@ -364,7 +364,7 @@ void RtspPlayer::HandleResSETUP(const Parser& parser, unsigned int uiTrackIndex) } auto srcIP = inet_addr(get_peer_ip().data()); weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - pUdpSockRef->setOnRead([srcIP,i,weakSelf](const Socket::Buffer::Ptr &buf, struct sockaddr *addr) { + pUdpSockRef->setOnRead([srcIP,i,weakSelf](const Buffer::Ptr &buf, struct sockaddr *addr) { auto strongSelf=weakSelf.lock(); if(!strongSelf) { return; diff --git a/src/Rtsp/RtspPlayer.h b/src/Rtsp/RtspPlayer.h index 436b8227..937b4bd3 100644 --- a/src/Rtsp/RtspPlayer.h +++ b/src/Rtsp/RtspPlayer.h @@ -108,7 +108,7 @@ private: void play(const char* strUrl, const char *strUser, const char *strPwd, eRtpType eType); void onConnect(const SockException &err) override; - void onRecv(const Socket::Buffer::Ptr &pBuf) override; + void onRecv(const Buffer::Ptr &pBuf) override; void onErr(const SockException &ex) override; void HandleResSETUP(const Parser &parser, unsigned int uiTrackIndex); diff --git a/src/Rtsp/RtspSession.cpp b/src/Rtsp/RtspSession.cpp index c317ea8d..cc38d515 100644 --- a/src/Rtsp/RtspSession.cpp +++ b/src/Rtsp/RtspSession.cpp @@ -56,7 +56,7 @@ recursive_mutex RtspSession::g_mtxGetter; //对quicktime上锁保护 recursive_mutex RtspSession::g_mtxPostter; //对quicktime上锁保护 unordered_map RtspSession::g_mapCmd; RtspSession::RtspSession(const std::shared_ptr &pTh, const Socket::Ptr &pSock) : - TcpLimitedSession(pTh, pSock), m_pSender(pSock) { + TcpSession(pTh, pSock), m_pSender(pSock) { static onceToken token( []() { g_mapCmd.emplace("OPTIONS",&RtspSession::handleReq_Options); g_mapCmd.emplace("DESCRIBE",&RtspSession::handleReq_Describe); @@ -76,21 +76,21 @@ RtspSession::RtspSession(const std::shared_ptr &pTh, const Socket::P pSock->setSendPktSize(32); #endif//__x86_64__ - DebugL << getPeerIp(); + DebugL << get_peer_ip(); } RtspSession::~RtspSession() { if (m_onDestory) { m_onDestory(); } - DebugL << getPeerIp(); + DebugL << get_peer_ip(); } void RtspSession::shutdown(){ - if (sock) { - sock->emitErr(SockException(Err_other, "self shutdown")); + if (_sock) { + _sock->emitErr(SockException(Err_other, "self shutdown")); } - if (m_bBase64need && !sock) { + if (m_bBase64need && !_sock) { //quickTime http postter,and self is detached from tcpServer lock_guard lock(g_mtxPostter); g_mapPostter.erase(this); @@ -108,7 +108,7 @@ void RtspSession::onError(const SockException& err) { TraceL << err.getErrCode() << " " << err.what(); if (m_bListenPeerUdpData) { //取消UDP端口监听 - UDPServer::Instance().stopListenPeer(getPeerIp().data(), this); + UDPServer::Instance().stopListenPeer(get_peer_ip().data(), this); m_bListenPeerUdpData = false; } if (!m_bBase64need && m_strSessionCookie.size() != 0) { @@ -119,7 +119,7 @@ void RtspSession::onError(const SockException& err) { if (m_bBase64need && err.getErrCode() == Err_eof) { //quickTime http postter,正在发送rtp; QuickTime只是断开了请求连接,请继续发送rtp - sock = nullptr; + _sock = nullptr; lock_guard lock(g_mtxPostter); //为了保证脱离TCPServer后还能正常运作,需要保持本对象的强引用 g_mapPostter.emplace(this, dynamic_pointer_cast(shared_from_this())); @@ -136,19 +136,19 @@ void RtspSession::onError(const SockException& err) { void RtspSession::onManager() { if (m_ticker.createdTime() > 10 * 1000) { if (m_strSession.size() == 0) { - WarnL << "非法链接:" << getPeerIp(); + WarnL << "非法链接:" << get_peer_ip(); shutdown(); return; } } if (m_rtpType != PlayerBase::RTP_TCP && m_ticker.elapsedTime() > 15 * 1000) { - WarnL << "RTSP会话超时:" << getPeerIp(); + WarnL << "RTSP会话超时:" << get_peer_ip(); shutdown(); return; } } -void RtspSession::onRecv(const Socket::Buffer::Ptr &pBuf) { +void RtspSession::onRecv(const Buffer::Ptr &pBuf) { m_ticker.resetTime(); char tmp[2 * 1024]; m_pcBuf = tmp; @@ -556,14 +556,14 @@ bool RtspSession::handleReq_Setup() { break; case PlayerBase::RTP_UDP: { //我们用trackIdx区分rtp和rtcp包 - auto pSockRtp = UDPServer::Instance().getSock(getLocalIp().data(),2*trackIdx); + auto pSockRtp = UDPServer::Instance().getSock(get_local_ip().data(),2*trackIdx); if (!pSockRtp) { //分配端口失败 WarnL << "分配rtp端口失败"; send_NotAcceptable(); return false; } - auto pSockRtcp = UDPServer::Instance().getSock(getLocalIp().data(),2*trackIdx + 1 ,pSockRtp->get_local_port() + 1); + auto pSockRtcp = UDPServer::Instance().getSock(get_local_ip().data(),2*trackIdx + 1 ,pSockRtp->get_local_port() + 1); if (!pSockRtcp) { //分配端口失败 WarnL << "分配rtcp端口失败"; @@ -577,7 +577,7 @@ bool RtspSession::handleReq_Setup() { struct sockaddr_in peerAddr; peerAddr.sin_family = AF_INET; peerAddr.sin_port = htons(ui16PeerPort); - peerAddr.sin_addr.s_addr = inet_addr(getPeerIp().data()); + peerAddr.sin_addr.s_addr = inet_addr(get_peer_ip().data()); bzero(&(peerAddr.sin_zero), sizeof peerAddr.sin_zero); m_apPeerUdpAddr[trackIdx].reset((struct sockaddr *) (new struct sockaddr_in(peerAddr))); //尝试获取客户端nat映射地址 @@ -601,7 +601,7 @@ bool RtspSession::handleReq_Setup() { break; case PlayerBase::RTP_MULTICAST: { if(!m_pBrdcaster){ - m_pBrdcaster = RtpBroadCaster::get(getLocalIp(),m_mediaInfo.m_vhost, m_mediaInfo.m_app, m_mediaInfo.m_streamid); + m_pBrdcaster = RtpBroadCaster::get(get_local_ip(),m_mediaInfo.m_vhost, m_mediaInfo.m_app, m_mediaInfo.m_streamid); if (!m_pBrdcaster) { send_NotAcceptable(); return false; @@ -617,7 +617,7 @@ bool RtspSession::handleReq_Setup() { } int iSrvPort = m_pBrdcaster->getPort(trackid); //我们用trackIdx区分rtp和rtcp包 - auto pSockRtcp = UDPServer::Instance().getSock(getLocalIp().data(),2*trackIdx + 1,iSrvPort + 1); + auto pSockRtcp = UDPServer::Instance().getSock(get_local_ip().data(),2*trackIdx + 1,iSrvPort + 1); if (!pSockRtcp) { //分配端口失败 WarnL << "分配rtcp端口失败"; @@ -636,7 +636,7 @@ bool RtspSession::handleReq_Setup() { m_iCseq, SERVER_NAME, RTSP_VERSION, RTSP_BUILDTIME, dateHeader().data(), m_pBrdcaster->getIP().data(), - getLocalIp().data(), iSrvPort, pSockRtcp->get_local_port(), + get_local_ip().data(), iSrvPort, pSockRtcp->get_local_port(), udpTTL,printSSRC(trackRef.ssrc).data(), m_strSession.data()); send(m_pcBuf, n); @@ -934,12 +934,12 @@ inline void RtspSession::sendRtpPacket(const RtpPacket::Ptr & pkt) { } } -inline void RtspSession::onRcvPeerUdpData(int iTrackIdx, const Socket::Buffer::Ptr &pBuf, const struct sockaddr& addr) { +inline void RtspSession::onRcvPeerUdpData(int iTrackIdx, const Buffer::Ptr &pBuf, const struct sockaddr& addr) { if(iTrackIdx % 2 == 0){ //这是rtp探测包 if(!m_bGotAllPeerUdp){ //还没有获取完整的rtp探测包 - if(SockUtil::in_same_lan(getLocalIp().data(),getPeerIp().data())){ + if(SockUtil::in_same_lan(get_local_ip().data(),get_peer_ip().data())){ //在内网中,客户端上报的端口号是真实的,所以我们忽略udp打洞包 m_bGotAllPeerUdp = true; return; @@ -967,8 +967,8 @@ inline void RtspSession::onRcvPeerUdpData(int iTrackIdx, const Socket::Buffer::P inline void RtspSession::startListenPeerUdpData() { m_bListenPeerUdpData = true; weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - UDPServer::Instance().listenPeer(getPeerIp().data(), this, - [weakSelf](int iTrackIdx,const Socket::Buffer::Ptr &pBuf,struct sockaddr *pPeerAddr)->bool { + UDPServer::Instance().listenPeer(get_peer_ip().data(), this, + [weakSelf](int iTrackIdx,const Buffer::Ptr &pBuf,struct sockaddr *pPeerAddr)->bool { auto strongSelf=weakSelf.lock(); if(!strongSelf) { return false; @@ -986,7 +986,7 @@ inline void RtspSession::startListenPeerUdpData() { } inline void RtspSession::initSender(const std::shared_ptr& session) { - m_pSender = session->sock; + m_pSender = session->_sock; weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); session->m_onDestory = [weakSelf]() { auto strongSelf=weakSelf.lock(); diff --git a/src/Rtsp/RtspSession.h b/src/Rtsp/RtspSession.h index 4f545c4f..7e835462 100644 --- a/src/Rtsp/RtspSession.h +++ b/src/Rtsp/RtspSession.h @@ -37,7 +37,7 @@ #include "Player/PlayerBase.h" #include "Util/util.h" #include "Util/logger.h" -#include "Network/TcpLimitedSession.h" +#include "Network/TcpSession.h" using namespace std; using namespace ZL::Util; @@ -50,7 +50,7 @@ namespace Rtsp { class RtspSession; -class BufferRtp : public Socket::Buffer{ +class BufferRtp : public Buffer{ public: typedef std::shared_ptr Ptr; BufferRtp(const RtpPacket::Ptr & pkt,uint32_t offset = 0 ):_rtp(pkt),_offset(offset){} @@ -67,7 +67,7 @@ private: uint32_t _offset; }; -class RtspSession: public TcpLimitedSession { +class RtspSession: public TcpSession { public: typedef std::shared_ptr Ptr; typedef std::function onGetRealm; @@ -77,7 +77,7 @@ public: RtspSession(const std::shared_ptr &pTh, const Socket::Ptr &pSock); virtual ~RtspSession(); - void onRecv(const Socket::Buffer::Ptr &pBuf) override; + void onRecv(const Buffer::Ptr &pBuf) override; void onError(const SockException &err) override; void onManager() override; private: @@ -94,7 +94,7 @@ private: m_ui64TotalBytes += iSize; return m_pSender->send(pcBuf, iSize); } - int send(const Socket::Buffer::Ptr &pkt) override{ + int send(const Buffer::Ptr &pkt) override{ m_ui64TotalBytes += pkt->size(); return m_pSender->send(pkt,SOCKET_DEFAULE_FLAGS | FLAG_MORE); } @@ -134,7 +134,7 @@ private: } return -1; } - inline void onRcvPeerUdpData(int iTrackIdx, const Socket::Buffer::Ptr &pBuf, const struct sockaddr &addr); + inline void onRcvPeerUdpData(int iTrackIdx, const Buffer::Ptr &pBuf, const struct sockaddr &addr); inline void startListenPeerUdpData(); //认证相关 diff --git a/src/Rtsp/UDPServer.cpp b/src/Rtsp/UDPServer.cpp index dbe1617f..e37d2a33 100644 --- a/src/Rtsp/UDPServer.cpp +++ b/src/Rtsp/UDPServer.cpp @@ -87,7 +87,7 @@ void UDPServer::onErr(const string& strKey, const SockException& err) { lock_guard lck(m_mtxUpdSock); m_mapUpdSock.erase(strKey); } -void UDPServer::onRcvData(int iTrackIndex, const Socket::Buffer::Ptr &pBuf, struct sockaddr* pPeerAddr) { +void UDPServer::onRcvData(int iTrackIndex, const Buffer::Ptr &pBuf, struct sockaddr* pPeerAddr) { //TraceL << trackIndex; struct sockaddr_in *in = (struct sockaddr_in *) pPeerAddr; string peerIp = inet_ntoa(in->sin_addr); diff --git a/src/Rtsp/UDPServer.h b/src/Rtsp/UDPServer.h index 2e0f7196..0ce34208 100644 --- a/src/Rtsp/UDPServer.h +++ b/src/Rtsp/UDPServer.h @@ -44,7 +44,7 @@ namespace Rtsp { class UDPServer { public: - typedef function< bool(int, const Socket::Buffer::Ptr &, struct sockaddr *)> onRecvData; + typedef function< bool(int, const Buffer::Ptr &, struct sockaddr *)> onRecvData; UDPServer(); virtual ~UDPServer(); static UDPServer &Instance() { @@ -58,7 +58,7 @@ public: void listenPeer(const char *strPeerIp, void *pSelf, const onRecvData &cb); void stopListenPeer(const char *strPeerIp, void *pSelf); private: - void onRcvData(int iTrackId, const Socket::Buffer::Ptr &pBuf,struct sockaddr *pPeerAddr); + void onRcvData(int iTrackId, const Buffer::Ptr &pBuf,struct sockaddr *pPeerAddr); void onErr(const string &strKey,const SockException &err); unordered_map m_mapUpdSock; mutex m_mtxUpdSock; diff --git a/src/Shell/ShellSession.cpp b/src/Shell/ShellSession.cpp index 1147d730..80f7300e 100644 --- a/src/Shell/ShellSession.cpp +++ b/src/Shell/ShellSession.cpp @@ -38,14 +38,14 @@ namespace Shell { ShellSession::ShellSession(const std::shared_ptr &_th, const Socket::Ptr &_sock) : - TcpLimitedSession(_th, _sock) { + TcpSession(_th, _sock) { pleaseInputUser(); } ShellSession::~ShellSession() { } -void ShellSession::onRecv(const Socket::Buffer::Ptr&buf) { +void ShellSession::onRecv(const Buffer::Ptr&buf) { //DebugL << hexdump(buf->data(), buf->size()); GET_CONFIG_AND_REGISTER(uint32_t,maxReqSize,Config::Shell::kMaxReqSize); if (m_strRecvBuf.size() + buf->size() >= maxReqSize) { diff --git a/src/Shell/ShellSession.h b/src/Shell/ShellSession.h index 75247a1c..dc3edb77 100644 --- a/src/Shell/ShellSession.h +++ b/src/Shell/ShellSession.h @@ -30,7 +30,7 @@ #include #include "Common/config.h" #include "Util/TimeTicker.h" -#include "Network/TcpLimitedSession.h" +#include "Network/TcpSession.h" using namespace std; using namespace ZL::Util; @@ -39,12 +39,12 @@ using namespace ZL::Network; namespace ZL { namespace Shell { -class ShellSession: public TcpLimitedSession { +class ShellSession: public TcpSession { public: ShellSession(const std::shared_ptr &_th, const Socket::Ptr &_sock); virtual ~ShellSession(); - void onRecv(const Socket::Buffer::Ptr &) override; + void onRecv(const Buffer::Ptr &) override; void onError(const SockException &err) override {}; void onManager() override; diff --git a/tests/test_httpApi.cpp b/tests/test_httpApi.cpp index c3e7e87c..f9418dfc 100644 --- a/tests/test_httpApi.cpp +++ b/tests/test_httpApi.cpp @@ -112,13 +112,13 @@ int main(int argc,char *argv[]){ #endif //ENABLE_OPENSSL //开启http服务器 - TcpServer::Ptr httpSrv(new TcpServer()); - httpSrv->start(mINI::Instance()[Config::Http::kPort]);//默认80 + TcpServer::Ptr httpSrv(new TcpServer()); + httpSrv->start(mINI::Instance()[Config::Http::kPort]);//默认80 #ifdef ENABLE_OPENSSL //如果支持ssl,还可以开启https服务器 - TcpServer::Ptr httpsSrv(new TcpServer()); - httpsSrv->start(mINI::Instance()[Config::Http::kSSLPort]);//默认443 + TcpServer::Ptr httpsSrv(new TcpServer()); + httpsSrv->start(mINI::Instance()[Config::Http::kSSLPort]);//默认443 #endif //ENABLE_OPENSSL InfoL << "你可以在浏览器输入:http://127.0.0.1/api/my_api?key0=val0&key1=参数1" << endl; diff --git a/tests/test_server.cpp b/tests/test_server.cpp index f36aabff..d0a57d23 100644 --- a/tests/test_server.cpp +++ b/tests/test_server.cpp @@ -195,47 +195,48 @@ int main(int argc,char *argv[]) { //简单的telnet服务器,可用于服务器调试,但是不能使用23端口,否则telnet上了莫名其妙的现象 //测试方法:telnet 127.0.0.1 9000 - TcpServer::Ptr shellSrv(new TcpServer()); - TcpServer::Ptr rtspSrv(new TcpServer()); - TcpServer::Ptr rtmpSrv(new TcpServer()); - TcpServer::Ptr httpSrv(new TcpServer()); - shellSrv->start(shellPort); - rtspSrv->start(rtspPort);//默认554 - rtmpSrv->start(rtmpPort);//默认1935 - httpSrv->start(httpPort);//默认80 + TcpServer::Ptr shellSrv(new TcpServer()); + TcpServer::Ptr rtspSrv(new TcpServer()); + TcpServer::Ptr rtmpSrv(new TcpServer()); + TcpServer::Ptr httpSrv(new TcpServer()); + + shellSrv->start(shellPort); + rtspSrv->start(rtspPort);//默认554 + rtmpSrv->start(rtmpPort);//默认1935 + httpSrv->start(httpPort);//默认80 #ifdef ENABLE_OPENSSL //如果支持ssl,还可以开启https服务器 - TcpServer::Ptr httpsSrv(new TcpServer()); - httpsSrv->start(httpsPort);//默认443 + TcpServer::Ptr httpsSrv(new TcpServer()); + httpsSrv->start(httpsPort);//默认443 #endif //ENABLE_OPENSSL NoticeCenter::Instance().addListener(ReloadConfigTag,Config::Broadcast::kBroadcastReloadConfig,[&](BroadcastReloadConfigArgs){ //重新创建服务器 if(shellPort != mINI::Instance()[Config::Shell::kPort].as()){ shellPort = mINI::Instance()[Config::Shell::kPort]; - shellSrv->start(shellPort); + shellSrv->start(shellPort); InfoL << "重启shell服务器:" << shellPort; } if(rtspPort != mINI::Instance()[Config::Rtsp::kPort].as()){ rtspPort = mINI::Instance()[Config::Rtsp::kPort]; - rtspSrv->start(rtspPort); + rtspSrv->start(rtspPort); InfoL << "重启rtsp服务器" << rtspPort; } if(rtmpPort != mINI::Instance()[Config::Rtmp::kPort].as()){ rtmpPort = mINI::Instance()[Config::Rtmp::kPort]; - rtmpSrv->start(rtmpPort); + rtmpSrv->start(rtmpPort); InfoL << "重启rtmp服务器" << rtmpPort; } if(httpPort != mINI::Instance()[Config::Http::kPort].as()){ httpPort = mINI::Instance()[Config::Http::kPort]; - httpSrv->start(httpPort); + httpSrv->start(httpPort); InfoL << "重启http服务器" << httpPort; } #ifdef ENABLE_OPENSSL if(httpsPort != mINI::Instance()[Config::Http::kSSLPort].as()){ httpsPort = mINI::Instance()[Config::Http::kSSLPort]; - httpsSrv->start(httpsPort); + httpsSrv->start(httpsPort); InfoL << "重启https服务器" << httpsPort; } #endif //ENABLE_OPENSSL