优化性能

This commit is contained in:
771730766@qq.com 2018-01-30 11:23:57 +08:00
parent 28b8e8e09f
commit 8c50aa6c66
10 changed files with 46 additions and 26 deletions

View File

@ -87,6 +87,14 @@ if(ZLTOOLKIT_FOUND)
list(APPEND LINK_LIB_LIST ${ZLTOOLKIT_LIBRARIES}) list(APPEND LINK_LIB_LIST ${ZLTOOLKIT_LIBRARIES})
endif() endif()
#ZLToolKit
find_package(JEMALLOC QUIET)
if(JEMALLOC_FOUND)
message(STATUS "找到JEMALLOC库:\"${JEMALLOC_INCLUDE_DIR}\"")
include_directories(${JEMALLOC_INCLUDE_DIR})
list(APPEND LINK_LIB_LIST ${JEMALLOC_LIBRARIES})
endif()
# #
message(STATUS "将链接依赖库:${LINK_LIB_LIST}") message(STATUS "将链接依赖库:${LINK_LIB_LIST}")
#RTSP/RTMP,HLS #RTSP/RTMP,HLS

View File

@ -108,11 +108,10 @@ private:
void onSendRawData(const char *pcRawData, int iSize) override { void onSendRawData(const char *pcRawData, int iSize) override {
send(pcRawData, iSize); send(pcRawData, iSize);
} }
void onSendRawData(string &&strData) override { void onSendRawData(const Socket::Buffer::Ptr &buffer,int flags) override{
send(std::move(strData)); m_pSock->send(buffer,flags);
} }
template<typename FUN> template<typename FUN>
inline void addOnResultCB(const FUN &fun) { inline void addOnResultCB(const FUN &fun) {
lock_guard<recursive_mutex> lck(m_mtxOnResultCB); lock_guard<recursive_mutex> lck(m_mtxOnResultCB);

View File

@ -187,7 +187,7 @@ void RtmpProtocol::sendRequest(int iCmd, const string& str) {
} }
void RtmpProtocol::sendRtmp(uint8_t ui8Type, uint32_t ui32StreamId, void RtmpProtocol::sendRtmp(uint8_t ui8Type, uint32_t ui32StreamId,
const std::string& strBuf, uint32_t ui32TimeStamp, int iChunkId) { const std::string& strBuf, uint32_t ui32TimeStamp, int iChunkId , bool msg_more) {
if (iChunkId < 2 || iChunkId > 63) { if (iChunkId < 2 || iChunkId > 63) {
auto strErr = StrPrinter << "不支持发送该类型的块流 ID:" << iChunkId << endl; auto strErr = StrPrinter << "不支持发送该类型的块流 ID:" << iChunkId << endl;
throw std::runtime_error(strErr); throw std::runtime_error(strErr);
@ -200,8 +200,15 @@ void RtmpProtocol::sendRtmp(uint8_t ui8Type, uint32_t ui32StreamId,
set_be24(header.timeStamp, bExtStamp ? 0xFFFFFF : ui32TimeStamp); set_be24(header.timeStamp, bExtStamp ? 0xFFFFFF : ui32TimeStamp);
set_be24(header.bodySize, strBuf.size()); set_be24(header.bodySize, strBuf.size());
set_le32(header.streamId, ui32StreamId); set_le32(header.streamId, ui32StreamId);
std::string strSend;
strSend.append((char *) &header, sizeof(header)); //估算rtmp包数据大小
uint32_t capacity = ((bExtStamp ? 5 : 1) * ((strBuf.size() / m_iChunkLenOut))) + strBuf.size() + sizeof(header) + 32;
uint32_t totalSize = 0;
Socket::BufferRaw::Ptr buffer = m_bufferPool.obtain();
buffer->setCapacity(capacity);
memcpy(buffer->data() + totalSize,(char *) &header, sizeof(header));
totalSize += sizeof(header);
char acExtStamp[4]; char acExtStamp[4];
if (bExtStamp) { if (bExtStamp) {
//扩展时间戳 //扩展时间戳
@ -211,18 +218,22 @@ void RtmpProtocol::sendRtmp(uint8_t ui8Type, uint32_t ui32StreamId,
while (pos < strBuf.size()) { while (pos < strBuf.size()) {
if (pos) { if (pos) {
uint8_t flags = (iChunkId & 0x3f) | (3 << 6); uint8_t flags = (iChunkId & 0x3f) | (3 << 6);
strSend += char(flags); memcpy(buffer->data() + totalSize,&flags, 1);
totalSize += 1;
} }
if (bExtStamp) { if (bExtStamp) {
//扩展时间戳 //扩展时间戳
strSend.append(acExtStamp, 4); memcpy(buffer->data() + totalSize,acExtStamp, 4);
totalSize += 4;
} }
size_t chunk = min(m_iChunkLenOut, strBuf.size() - pos); size_t chunk = min(m_iChunkLenOut, strBuf.size() - pos);
strSend.append(strBuf, pos, chunk); memcpy(buffer->data() + totalSize,strBuf.data() + pos, chunk);
totalSize += chunk;
pos += chunk; pos += chunk;
} }
onSendRawData(std::move(strSend)); buffer->setSize(totalSize);
m_ui32ByteSent += strSend.size(); onSendRawData(buffer,msg_more ? SOCKET_DEFAULE_FLAGS : (SOCKET_DEFAULE_FLAGS | FLAG_MORE));
m_ui32ByteSent += totalSize;
if (m_ui32WinSize > 0 && m_ui32ByteSent - m_ui32LastSent >= m_ui32WinSize) { if (m_ui32WinSize > 0 && m_ui32ByteSent - m_ui32LastSent >= m_ui32WinSize) {
m_ui32LastSent = m_ui32ByteSent; m_ui32LastSent = m_ui32ByteSent;
sendAcknowledgement(m_ui32ByteSent); sendAcknowledgement(m_ui32ByteSent);

View File

@ -36,6 +36,7 @@
#include "Util/logger.h" #include "Util/logger.h"
#include "Util/TimeTicker.h" #include "Util/TimeTicker.h"
#include "Network/Socket.h" #include "Network/Socket.h"
#include "Util/ResourcePool.h"
using namespace std; using namespace std;
using namespace ZL::Util; using namespace ZL::Util;
@ -54,9 +55,7 @@ public:
void reset(); void reset();
protected: protected:
virtual void onSendRawData(const char *pcRawData,int iSize) = 0; virtual void onSendRawData(const char *pcRawData,int iSize) = 0;
virtual void onSendRawData(string &&strData) { virtual void onSendRawData(const Socket::Buffer::Ptr &buffer,int flags) = 0;
onSendRawData(strData.data(),strData.size());
};
virtual void onRtmpChunk(RtmpPacket &chunkData) = 0; virtual void onRtmpChunk(RtmpPacket &chunkData) = 0;
@ -79,13 +78,14 @@ protected:
void sendInvoke(const string &strCmd, const AMFValue &val); void sendInvoke(const string &strCmd, const AMFValue &val);
void sendRequest(int iCmd, const string &str); void sendRequest(int iCmd, const string &str);
void sendResponse(int iType, const string &str); void sendResponse(int iType, const string &str);
void sendRtmp(uint8_t ui8Type, uint32_t ui32StreamId, const std::string &strBuf, uint32_t ui32TimeStamp, int iChunkID); void sendRtmp(uint8_t ui8Type, uint32_t ui32StreamId, const std::string &strBuf, uint32_t ui32TimeStamp, int iChunkID,bool msg_more = false);
protected: protected:
int m_iReqID = 0; int m_iReqID = 0;
uint32_t m_ui32StreamId = STREAM_CONTROL; uint32_t m_ui32StreamId = STREAM_CONTROL;
int m_iNowStreamID = 0; int m_iNowStreamID = 0;
int m_iNowChunkID = 0; int m_iNowChunkID = 0;
bool m_bDataStarted = false; bool m_bDataStarted = false;
ResourcePool<Socket::BufferRaw,MAX_SEND_PKT> m_bufferPool;
private: private:
void handle_S0S1S2(const function<void()> &cb); void handle_S0S1S2(const function<void()> &cb);
void handle_C0C1(); void handle_C0C1();

View File

@ -203,7 +203,7 @@ inline void RtmpPusher::send_metaData(){
sendRequest(MSG_DATA, enc.data()); sendRequest(MSG_DATA, enc.data());
src->getConfigFrame([&](const RtmpPacket::Ptr &pkt){ src->getConfigFrame([&](const RtmpPacket::Ptr &pkt){
sendRtmp(pkt->typeId, m_ui32StreamId, pkt->strBuf, pkt->timeStamp, pkt->chunkId); sendRtmp(pkt->typeId, m_ui32StreamId, pkt->strBuf, pkt->timeStamp, pkt->chunkId , true);
}); });
m_pRtmpReader = src->getRing()->attach(); m_pRtmpReader = src->getRing()->attach();
@ -213,7 +213,7 @@ inline void RtmpPusher::send_metaData(){
if(!strongSelf) { if(!strongSelf) {
return; return;
} }
strongSelf->sendRtmp(pkt->typeId, strongSelf->m_ui32StreamId, pkt->strBuf, pkt->timeStamp, pkt->chunkId); strongSelf->sendRtmp(pkt->typeId, strongSelf->m_ui32StreamId, pkt->strBuf, pkt->timeStamp, pkt->chunkId , true);
}); });
m_pRtmpReader->setDetachCB([weakSelf](){ m_pRtmpReader->setDetachCB([weakSelf](){
auto strongSelf = weakSelf.lock(); auto strongSelf = weakSelf.lock();

View File

@ -65,8 +65,8 @@ protected:
void onSendRawData(const char *pcRawData, int iSize) override { void onSendRawData(const char *pcRawData, int iSize) override {
send(pcRawData, iSize); send(pcRawData, iSize);
} }
void onSendRawData(string &&strData) override { void onSendRawData(const Socket::Buffer::Ptr &buffer,int flags) override{
send(std::move(strData)); m_pSock->send(buffer,flags);
} }
private: private:
void init(const RtmpMediaSource::Ptr &src); void init(const RtmpMediaSource::Ptr &src);

View File

@ -91,7 +91,7 @@ void RtmpSession::onCmd_connect(AMFDecoder &dec) {
amfVer = objectEncoding.as_number(); amfVer = objectEncoding.as_number();
} }
///////////set chunk size//////////////// ///////////set chunk size////////////////
sendChunkSize(4096); sendChunkSize(60000);
////////////window Acknowledgement size///// ////////////window Acknowledgement size/////
sendAcknowledgementSize(5000000); sendAcknowledgementSize(5000000);
///////////set peerBandwidth//////////////// ///////////set peerBandwidth////////////////
@ -214,6 +214,7 @@ void RtmpSession::doPlay(){
m_pRingReader = src->getRing()->attach(); m_pRingReader = src->getRing()->attach();
weak_ptr<RtmpSession> weakSelf = dynamic_pointer_cast<RtmpSession>(shared_from_this()); weak_ptr<RtmpSession> weakSelf = dynamic_pointer_cast<RtmpSession>(shared_from_this());
SockUtil::setNoDelay(sock->rawFD(), false);
m_pRingReader->setReadCB([weakSelf](const RtmpPacket::Ptr &pkt){ m_pRingReader->setReadCB([weakSelf](const RtmpPacket::Ptr &pkt){
auto strongSelf = weakSelf.lock(); auto strongSelf = weakSelf.lock();
if(!strongSelf) { if(!strongSelf) {
@ -377,7 +378,7 @@ void RtmpSession::onSendMedia(const RtmpPacket::Ptr &pkt) {
CLEAR_ARR(m_aui32FirstStamp); CLEAR_ARR(m_aui32FirstStamp);
modifiedStamp = 0; modifiedStamp = 0;
} }
sendRtmp(pkt->typeId, pkt->streamId, pkt->strBuf, modifiedStamp, pkt->chunkId); sendRtmp(pkt->typeId, pkt->streamId, pkt->strBuf, modifiedStamp, pkt->chunkId , true);
} }
} /* namespace Rtmp */ } /* namespace Rtmp */

View File

@ -84,8 +84,8 @@ private:
void onSendRawData(const char *pcRawData,int iSize) override{ void onSendRawData(const char *pcRawData,int iSize) override{
send(pcRawData, iSize); send(pcRawData, iSize);
} }
void onSendRawData(string &&strData) override{ void onSendRawData(const Socket::Buffer::Ptr &buffer,int flags) override{
send(std::move(strData)); sock->send(buffer,flags);
} }
void onRtmpChunk(RtmpPacket &chunkData) override; void onRtmpChunk(RtmpPacket &chunkData) override;

View File

@ -654,6 +654,7 @@ bool RtspSession::handleReq_Play() {
if(m_pRtpReader){ if(m_pRtpReader){
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this()); weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
SockUtil::setNoDelay(m_pSender->rawFD(), false);
m_pRtpReader->setReadCB([weakSelf](const RtpPacket::Ptr &pack) { m_pRtpReader->setReadCB([weakSelf](const RtpPacket::Ptr &pack) {
auto strongSelf = weakSelf.lock(); auto strongSelf = weakSelf.lock();
if(!strongSelf) { if(!strongSelf) {

View File

@ -92,7 +92,7 @@ private:
return m_pSender->send(pcBuf, iSize); return m_pSender->send(pcBuf, iSize);
} }
int send(const Socket::Buffer::Ptr &pkt){ int send(const Socket::Buffer::Ptr &pkt){
return m_pSender->send(pkt); return m_pSender->send(pkt,SOCKET_DEFAULE_FLAGS | FLAG_MORE);
} }
void shutdown() override; void shutdown() override;
bool handleReq_Options(); //处理options方法 bool handleReq_Options(); //处理options方法