From 8c50aa6c66b1475d82c0d32f7fca8b25dfc7c888 Mon Sep 17 00:00:00 2001 From: "771730766@qq.com" Date: Tue, 30 Jan 2018 11:23:57 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=80=A7=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CMakeLists.txt | 8 ++++++++ src/Rtmp/RtmpPlayer.h | 7 +++---- src/Rtmp/RtmpProtocol.cpp | 27 +++++++++++++++++++-------- src/Rtmp/RtmpProtocol.h | 8 ++++---- src/Rtmp/RtmpPusher.cpp | 4 ++-- src/Rtmp/RtmpPusher.h | 4 ++-- src/Rtmp/RtmpSession.cpp | 7 ++++--- src/Rtmp/RtmpSession.h | 4 ++-- src/Rtsp/RtspSession.cpp | 1 + src/Rtsp/RtspSession.h | 2 +- 10 files changed, 46 insertions(+), 26 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 5abfdc24..c5358938 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -87,6 +87,14 @@ if(ZLTOOLKIT_FOUND) list(APPEND LINK_LIB_LIST ${ZLTOOLKIT_LIBRARIES}) endif() +#查找ZLToolKit是否安装 +find_package(JEMALLOC QUIET) +if(JEMALLOC_FOUND) + message(STATUS "找到JEMALLOC库:\"${JEMALLOC_INCLUDE_DIR}\"") + include_directories(${JEMALLOC_INCLUDE_DIR}) + list(APPEND LINK_LIB_LIST ${JEMALLOC_LIBRARIES}) +endif() + #打印库文件 message(STATUS "将链接依赖库:${LINK_LIB_LIST}") #开启RTSP/RTMP之间的互相转换,开启HLS diff --git a/src/Rtmp/RtmpPlayer.h b/src/Rtmp/RtmpPlayer.h index 8a84223c..ff52fa2e 100644 --- a/src/Rtmp/RtmpPlayer.h +++ b/src/Rtmp/RtmpPlayer.h @@ -108,10 +108,9 @@ private: void onSendRawData(const char *pcRawData, int iSize) override { send(pcRawData, iSize); } - void onSendRawData(string &&strData) override { - send(std::move(strData)); - } - + void onSendRawData(const Socket::Buffer::Ptr &buffer,int flags) override{ + m_pSock->send(buffer,flags); + } template inline void addOnResultCB(const FUN &fun) { diff --git a/src/Rtmp/RtmpProtocol.cpp b/src/Rtmp/RtmpProtocol.cpp index 8273aa4c..23876fc8 100644 --- a/src/Rtmp/RtmpProtocol.cpp +++ b/src/Rtmp/RtmpProtocol.cpp @@ -187,7 +187,7 @@ void RtmpProtocol::sendRequest(int iCmd, const string& str) { } void RtmpProtocol::sendRtmp(uint8_t ui8Type, uint32_t ui32StreamId, - const std::string& strBuf, uint32_t ui32TimeStamp, int iChunkId) { + const std::string& strBuf, uint32_t ui32TimeStamp, int iChunkId , bool msg_more) { if (iChunkId < 2 || iChunkId > 63) { auto strErr = StrPrinter << "不支持发送该类型的块流 ID:" << iChunkId << endl; throw std::runtime_error(strErr); @@ -200,8 +200,15 @@ void RtmpProtocol::sendRtmp(uint8_t ui8Type, uint32_t ui32StreamId, set_be24(header.timeStamp, bExtStamp ? 0xFFFFFF : ui32TimeStamp); set_be24(header.bodySize, strBuf.size()); set_le32(header.streamId, ui32StreamId); - std::string strSend; - strSend.append((char *) &header, sizeof(header)); + + //估算rtmp包数据大小 + uint32_t capacity = ((bExtStamp ? 5 : 1) * ((strBuf.size() / m_iChunkLenOut))) + strBuf.size() + sizeof(header) + 32; + uint32_t totalSize = 0; + Socket::BufferRaw::Ptr buffer = m_bufferPool.obtain(); + buffer->setCapacity(capacity); + memcpy(buffer->data() + totalSize,(char *) &header, sizeof(header)); + totalSize += sizeof(header); + char acExtStamp[4]; if (bExtStamp) { //扩展时间戳 @@ -211,18 +218,22 @@ void RtmpProtocol::sendRtmp(uint8_t ui8Type, uint32_t ui32StreamId, while (pos < strBuf.size()) { if (pos) { uint8_t flags = (iChunkId & 0x3f) | (3 << 6); - strSend += char(flags); + memcpy(buffer->data() + totalSize,&flags, 1); + totalSize += 1; } if (bExtStamp) { //扩展时间戳 - strSend.append(acExtStamp, 4); + memcpy(buffer->data() + totalSize,acExtStamp, 4); + totalSize += 4; } size_t chunk = min(m_iChunkLenOut, strBuf.size() - pos); - strSend.append(strBuf, pos, chunk); + memcpy(buffer->data() + totalSize,strBuf.data() + pos, chunk); + totalSize += chunk; pos += chunk; } - onSendRawData(std::move(strSend)); - m_ui32ByteSent += strSend.size(); + buffer->setSize(totalSize); + onSendRawData(buffer,msg_more ? SOCKET_DEFAULE_FLAGS : (SOCKET_DEFAULE_FLAGS | FLAG_MORE)); + m_ui32ByteSent += totalSize; if (m_ui32WinSize > 0 && m_ui32ByteSent - m_ui32LastSent >= m_ui32WinSize) { m_ui32LastSent = m_ui32ByteSent; sendAcknowledgement(m_ui32ByteSent); diff --git a/src/Rtmp/RtmpProtocol.h b/src/Rtmp/RtmpProtocol.h index 678bc45c..56d6a503 100644 --- a/src/Rtmp/RtmpProtocol.h +++ b/src/Rtmp/RtmpProtocol.h @@ -36,6 +36,7 @@ #include "Util/logger.h" #include "Util/TimeTicker.h" #include "Network/Socket.h" +#include "Util/ResourcePool.h" using namespace std; using namespace ZL::Util; @@ -54,9 +55,7 @@ public: void reset(); protected: virtual void onSendRawData(const char *pcRawData,int iSize) = 0; - virtual void onSendRawData(string &&strData) { - onSendRawData(strData.data(),strData.size()); - }; + virtual void onSendRawData(const Socket::Buffer::Ptr &buffer,int flags) = 0; virtual void onRtmpChunk(RtmpPacket &chunkData) = 0; @@ -79,13 +78,14 @@ protected: void sendInvoke(const string &strCmd, const AMFValue &val); void sendRequest(int iCmd, const string &str); void sendResponse(int iType, const string &str); - void sendRtmp(uint8_t ui8Type, uint32_t ui32StreamId, const std::string &strBuf, uint32_t ui32TimeStamp, int iChunkID); + void sendRtmp(uint8_t ui8Type, uint32_t ui32StreamId, const std::string &strBuf, uint32_t ui32TimeStamp, int iChunkID,bool msg_more = false); protected: int m_iReqID = 0; uint32_t m_ui32StreamId = STREAM_CONTROL; int m_iNowStreamID = 0; int m_iNowChunkID = 0; bool m_bDataStarted = false; + ResourcePool m_bufferPool; private: void handle_S0S1S2(const function &cb); void handle_C0C1(); diff --git a/src/Rtmp/RtmpPusher.cpp b/src/Rtmp/RtmpPusher.cpp index eef7dcca..b589356d 100644 --- a/src/Rtmp/RtmpPusher.cpp +++ b/src/Rtmp/RtmpPusher.cpp @@ -203,7 +203,7 @@ inline void RtmpPusher::send_metaData(){ sendRequest(MSG_DATA, enc.data()); src->getConfigFrame([&](const RtmpPacket::Ptr &pkt){ - sendRtmp(pkt->typeId, m_ui32StreamId, pkt->strBuf, pkt->timeStamp, pkt->chunkId); + sendRtmp(pkt->typeId, m_ui32StreamId, pkt->strBuf, pkt->timeStamp, pkt->chunkId , true); }); m_pRtmpReader = src->getRing()->attach(); @@ -213,7 +213,7 @@ inline void RtmpPusher::send_metaData(){ if(!strongSelf) { return; } - strongSelf->sendRtmp(pkt->typeId, strongSelf->m_ui32StreamId, pkt->strBuf, pkt->timeStamp, pkt->chunkId); + strongSelf->sendRtmp(pkt->typeId, strongSelf->m_ui32StreamId, pkt->strBuf, pkt->timeStamp, pkt->chunkId , true); }); m_pRtmpReader->setDetachCB([weakSelf](){ auto strongSelf = weakSelf.lock(); diff --git a/src/Rtmp/RtmpPusher.h b/src/Rtmp/RtmpPusher.h index 4793d2a2..c779c0f1 100644 --- a/src/Rtmp/RtmpPusher.h +++ b/src/Rtmp/RtmpPusher.h @@ -65,8 +65,8 @@ protected: void onSendRawData(const char *pcRawData, int iSize) override { send(pcRawData, iSize); } - void onSendRawData(string &&strData) override { - send(std::move(strData)); + void onSendRawData(const Socket::Buffer::Ptr &buffer,int flags) override{ + m_pSock->send(buffer,flags); } private: void init(const RtmpMediaSource::Ptr &src); diff --git a/src/Rtmp/RtmpSession.cpp b/src/Rtmp/RtmpSession.cpp index d62cb32e..99852640 100644 --- a/src/Rtmp/RtmpSession.cpp +++ b/src/Rtmp/RtmpSession.cpp @@ -91,7 +91,7 @@ void RtmpSession::onCmd_connect(AMFDecoder &dec) { amfVer = objectEncoding.as_number(); } ///////////set chunk size//////////////// - sendChunkSize(4096); + sendChunkSize(60000); ////////////window Acknowledgement size///// sendAcknowledgementSize(5000000); ///////////set peerBandwidth//////////////// @@ -214,7 +214,8 @@ void RtmpSession::doPlay(){ m_pRingReader = src->getRing()->attach(); weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - m_pRingReader->setReadCB([weakSelf](const RtmpPacket::Ptr &pkt){ + SockUtil::setNoDelay(sock->rawFD(), false); + m_pRingReader->setReadCB([weakSelf](const RtmpPacket::Ptr &pkt){ auto strongSelf = weakSelf.lock(); if(!strongSelf) { return; @@ -377,7 +378,7 @@ void RtmpSession::onSendMedia(const RtmpPacket::Ptr &pkt) { CLEAR_ARR(m_aui32FirstStamp); modifiedStamp = 0; } - sendRtmp(pkt->typeId, pkt->streamId, pkt->strBuf, modifiedStamp, pkt->chunkId); + sendRtmp(pkt->typeId, pkt->streamId, pkt->strBuf, modifiedStamp, pkt->chunkId , true); } } /* namespace Rtmp */ diff --git a/src/Rtmp/RtmpSession.h b/src/Rtmp/RtmpSession.h index 3b679d4e..044bee82 100644 --- a/src/Rtmp/RtmpSession.h +++ b/src/Rtmp/RtmpSession.h @@ -84,8 +84,8 @@ private: void onSendRawData(const char *pcRawData,int iSize) override{ send(pcRawData, iSize); } - void onSendRawData(string &&strData) override{ - send(std::move(strData)); + void onSendRawData(const Socket::Buffer::Ptr &buffer,int flags) override{ + sock->send(buffer,flags); } void onRtmpChunk(RtmpPacket &chunkData) override; diff --git a/src/Rtsp/RtspSession.cpp b/src/Rtsp/RtspSession.cpp index e470eb2d..528e8c6e 100644 --- a/src/Rtsp/RtspSession.cpp +++ b/src/Rtsp/RtspSession.cpp @@ -654,6 +654,7 @@ bool RtspSession::handleReq_Play() { if(m_pRtpReader){ weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); + SockUtil::setNoDelay(m_pSender->rawFD(), false); m_pRtpReader->setReadCB([weakSelf](const RtpPacket::Ptr &pack) { auto strongSelf = weakSelf.lock(); if(!strongSelf) { diff --git a/src/Rtsp/RtspSession.h b/src/Rtsp/RtspSession.h index 9be33962..424af0ca 100644 --- a/src/Rtsp/RtspSession.h +++ b/src/Rtsp/RtspSession.h @@ -92,7 +92,7 @@ private: return m_pSender->send(pcBuf, iSize); } int send(const Socket::Buffer::Ptr &pkt){ - return m_pSender->send(pkt); + return m_pSender->send(pkt,SOCKET_DEFAULE_FLAGS | FLAG_MORE); } void shutdown() override; bool handleReq_Options(); //处理options方法