From 05e6d325760506a60633ce4a5d7be285f05269ac Mon Sep 17 00:00:00 2001 From: xiongziliang <771730766@qq.com> Date: Wed, 26 Sep 2018 23:12:03 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84webSocket=E5=8D=8F=E8=AE=AE?= =?UTF-8?q?=E5=AE=9E=E7=8E=B0=20=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ZLToolKit | 2 +- src/Http/HttpClientImp.cpp | 26 +---- src/Http/HttpClientImp.h | 6 +- src/Http/HttpRequestSplitter.cpp | 15 +++ src/Http/HttpSession.cpp | 27 +---- src/Http/HttpSession.h | 185 ++++++++++++++++++++++++++++--- src/Http/HttpsSession.h | 20 +--- src/Rtmp/RtmpPlayer.h | 7 +- src/Rtmp/RtmpProtocol.cpp | 28 +++-- src/Rtmp/RtmpProtocol.h | 14 +-- src/Rtmp/RtmpPusher.cpp | 9 +- src/Rtmp/RtmpPusher.h | 12 +- src/Rtmp/RtmpSession.cpp | 8 +- src/Rtmp/RtmpSession.h | 8 +- src/Rtsp/RtpBroadCaster.cpp | 2 +- src/Rtsp/RtspSession.cpp | 40 ++++--- src/Rtsp/RtspSession.h | 14 +-- tests/test_httpClient.cpp | 7 +- 18 files changed, 275 insertions(+), 155 deletions(-) diff --git a/ZLToolKit b/ZLToolKit index 4916fec3..bb2ba900 160000 --- a/ZLToolKit +++ b/ZLToolKit @@ -1 +1 @@ -Subproject commit 4916fec31fcbe383ae4d38570b84858f6ec7747d +Subproject commit bb2ba9005b0191f709897bf51e5bb719eac856bb diff --git a/src/Http/HttpClientImp.cpp b/src/Http/HttpClientImp.cpp index f2cda890..32d74656 100644 --- a/src/Http/HttpClientImp.cpp +++ b/src/Http/HttpClientImp.cpp @@ -57,7 +57,7 @@ void HttpClientImp::sendRequest(const string& url,float fTimeOutSec) { #if defined(__GNUC__) && (__GNUC__ < 5) public_send(data,len); #else//defined(__GNUC__) && (__GNUC__ < 5) - HttpClient::send(data,len); + HttpClient::send(obtainBuffer(data,len)); #endif//defined(__GNUC__) && (__GNUC__ < 5) }); #endif //ENABLE_OPENSSL @@ -78,28 +78,12 @@ void HttpClientImp::onRecvBytes(const char* data, int size) { } } -int HttpClientImp::send(const string& str) { +int HttpClientImp::send(const Buffer::Ptr &buf) { if(_sslBox){ - _sslBox->onSend(str.data(),str.size()); - return str.size(); + _sslBox->onSend(buf->data(),buf->size()); + return buf->size(); } - return HttpClient::send(str); -} -int HttpClientImp::send(string &&str){ - if(_sslBox){ - _sslBox->onSend(str.data(),str.size()); - return str.size(); - } - return HttpClient::send(std::move(str)); -} - -int HttpClientImp::send(const char* str, int len) { - if(_sslBox){ - _sslBox->onSend(str,len); - return len; - } - return HttpClient::send(str,len); - + return HttpClient::send(buf); } #endif //ENABLE_OPENSSL diff --git a/src/Http/HttpClientImp.h b/src/Http/HttpClientImp.h index 654a6496..05a14894 100644 --- a/src/Http/HttpClientImp.h +++ b/src/Http/HttpClientImp.h @@ -48,15 +48,13 @@ public: HttpClient::onRecvBytes(data,len); } void public_send(const char *data, uint32_t len){ - HttpClient::send(data,len); + HttpClient::send(obtainBuffer(data,len)); } #endif //defined(__GNUC__) && (__GNUC__ < 5) private: #ifdef ENABLE_OPENSSL virtual void onRecvBytes(const char *data,int size) override; - virtual int send(const string &str) override; - virtual int send(string &&str) override; - virtual int send(const char *str, int len) override; + virtual int send(const Buffer::Ptr &buf) override; std::shared_ptr _sslBox; #endif //ENABLE_OPENSSL }; diff --git a/src/Http/HttpRequestSplitter.cpp b/src/Http/HttpRequestSplitter.cpp index 10375789..4ae9e50b 100644 --- a/src/Http/HttpRequestSplitter.cpp +++ b/src/Http/HttpRequestSplitter.cpp @@ -43,6 +43,16 @@ void HttpRequestSplitter::input(const char *data,uint64_t len) { splitPacket: + /*确保ptr最后一个字节是0,防止strstr越界 + *由于ZLToolKit确保内存最后一个字节是保留未使用字节并置0, + *所以此处可以不用再次置0 + *但是上层数据可能来自其他渠道,保险起见还是置0 + */ + + char &tail_ref = ((char *) ptr)[len]; + char tail_tmp = tail_ref; + tail_ref = 0; + //数据按照请求头处理 const char *index = nullptr; while (_content_len == 0 && (index = strstr(ptr,"\r\n\r\n")) != nullptr) { @@ -51,6 +61,11 @@ splitPacket: ptr = index + 4; } + /* + * 恢复末尾字节 + */ + tail_ref = tail_tmp; + uint64_t remain = len - (ptr - data); if(remain <= 0){ //没有剩余数据,清空缓存 diff --git a/src/Http/HttpSession.cpp b/src/Http/HttpSession.cpp index 0e3d961d..180bbb59 100644 --- a/src/Http/HttpSession.cpp +++ b/src/Http/HttpSession.cpp @@ -49,7 +49,7 @@ using namespace ZL::Util; namespace ZL { namespace Http { -static int sock_flags = SOCKET_DEFAULE_FLAGS | FLAG_MORE; +static int kSockFlags = SOCKET_DEFAULE_FLAGS | FLAG_MORE; string dateStr() { char buf[64]; @@ -235,7 +235,7 @@ inline bool HttpSession::checkLiveFlvStream(){ //开始发送rtmp负载 //关闭tcp_nodelay ,优化性能 SockUtil::setNoDelay(_sock->rawFD(),false); - (*this) << SocketFlags(sock_flags); + (*this) << SocketFlags(kSockFlags); try{ start(mediaSrc); @@ -425,29 +425,10 @@ inline bool HttpSession::Handle_Req_GET(int64_t &content_len) { //关闭tcp_nodelay ,优化性能 SockUtil::setNoDelay(_sock->rawFD(),false); //设置MSG_MORE,优化性能 - (*this) << SocketFlags(sock_flags); - - //后台线程执行onFlush - auto onFlushWrapper = [onFlush,weakSelf](){ - auto strongSelf = weakSelf.lock(); - if(!strongSelf){ - return false; - } - strongSelf->async([onFlush,weakSelf](){ - //在后台线程完成文件读取,释放主线程性能 - if(!onFlush()){ - //如果onFlush返回false,则说明不再监听flush事件 - auto strongSelf = weakSelf.lock(); - if(strongSelf){ - strongSelf->_sock->setOnFlush(nullptr); - } - } - }); - return true; - }; + (*this) << SocketFlags(kSockFlags); onFlush(); - _sock->setOnFlush(onFlushWrapper); + _sock->setOnFlush(onFlush); return true; } diff --git a/src/Http/HttpSession.h b/src/Http/HttpSession.h index 1ff32946..efc7df13 100644 --- a/src/Http/HttpSession.h +++ b/src/Http/HttpSession.h @@ -126,34 +126,185 @@ private: const string &contentOut); }; + /** - * 回显WebSocket会话 + * 通过该模板类可以透明化WebSocket协议, + * 用户只要实现WebSock协议下的具体业务协议,譬如基于WebSocket协议的Rtmp协议等 + * @tparam SessionType 业务协议的TcpSession类 */ -class EchoWebSocketSession : public HttpSession { +template +class WebSocketSession : public HttpSession { public: - EchoWebSocketSession(const std::shared_ptr &pTh, const Socket::Ptr &pSock) : HttpSession(pTh,pSock){}; - virtual ~EchoWebSocketSession(){}; + WebSocketSession(const std::shared_ptr &pTh, const Socket::Ptr &pSock) : HttpSession(pTh,pSock){ + _session = std::make_shared(pTh,pSock); + } + virtual ~WebSocketSession(){}; + + //收到eof或其他导致脱离TcpServer事件的回调 + void onError(const SockException &err) override{ + HttpSession::onError(err); + _session->onError(err); + } + //每隔一段时间触发,用来做超时管理 + void onManager() override{ + HttpSession::onManager(); + _session->onManager(); + } + //在创建TcpSession后,TcpServer会把自身的配置参数通过该函数传递给TcpSession + void attachServer(const TcpServer &server) override{ + HttpSession::attachServer(server); + _session->attachServer(server); + + //此处截取数据并进行websocket协议打包 + weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); + _session->setOnBeforeSendCB([weakSelf](const Buffer::Ptr &buf){ + auto strongSelf = weakSelf.lock(); + if(strongSelf){ + bool mask_flag = strongSelf->_mask_flag; + strongSelf->_mask_flag = false; + strongSelf->WebSocketSplitter::encode((uint8_t *)buf->data(),buf->size()); + strongSelf->_mask_flag = mask_flag; + } + return buf->size(); + }); + + } + //作为该TcpSession的唯一标识符 + string getIdentifier() const override{ + return _session->getIdentifier(); + } protected: + /** + * 开始收到一个webSocket数据包 + * @param packet + */ void onWebSocketDecodeHeader(const WebSocketHeader &packet) override{ - DebugL << packet._playload_len; - }; + //新包,原来的包残余数据清空掉 + _remian_data.clear(); + } + + /** + * 收到websocket数据包负载 + * @param packet + * @param ptr + * @param len + * @param recved + */ void onWebSocketDecodePlayload(const WebSocketHeader &packet,const uint8_t *ptr,uint64_t len,uint64_t recved) override { - DebugL << string((char *)ptr,len) << " " << recved; - - //webSocket服务器不允许对数据进行掩码加密 - bool mask_flag = _mask_flag; - _mask_flag = false; - WebSocketSplitter::encode((uint8_t *)ptr,len); - _mask_flag = mask_flag; - - }; + if(packet._playload_len == recved){ + //收到完整的包 + if(_remian_data.empty()){ + onRecvWholePacket((char *)ptr,len); + }else{ + _remian_data.append((char *)ptr,len); + onRecvWholePacket(_remian_data); + _remian_data.clear(); + } + } else { + //部分数据 + _remian_data.append((char *)ptr,len); + } + } + /** + * 发送数据进行websocket协议打包后回调 + * @param ptr + * @param len + */ void onWebSocketEncodeData(const uint8_t *ptr,uint64_t len) override{ - send((char *)ptr,len); - }; + _session->realSend(_session->obtainBuffer((char *)ptr,len)); + } + /** + * 收到一个完整的websock数据包 + * @param data + * @param len + */ + void onRecvWholePacket(const char *data,uint64_t len){ + BufferRaw::Ptr buffer = _session->obtainBuffer(data,len); + _session->onRecv(buffer); + } + + /** + * 收到一个完整的websock数据包 + * @param str + */ + void onRecvWholePacket(const string &str){ + BufferString::Ptr buffer = std::make_shared(str); + _session->onRecv(buffer); + } + +private: + typedef function onBeforeSendCB; + /** + * 该类实现了TcpSession派生类发送数据的截取 + * 目的是发送业务数据前进行websocket协议的打包 + */ + class SessionImp : public SessionType{ + public: + SessionImp(const std::shared_ptr &pTh, const Socket::Ptr &pSock) : SessionType(pTh,pSock){}; + ~SessionImp(){} + + /** + * 截取到数据后,再进行webSocket协议打包 + * 然后真正的发送数据到socket + * @param buf 数据 + * @return 数据字节数 + */ + int realSend(const Buffer::Ptr &buf){ + return SessionType::send(buf); + } + + /** + * 设置发送数据截取回调函数 + * @param cb 截取回调函数 + */ + void setOnBeforeSendCB(const onBeforeSendCB &cb){ + _beforeSendCB = cb; + } + protected: + /** + * 重载send函数截取数据 + * @param buf 需要截取的数据 + * @return 数据字节数 + */ + int send(const Buffer::Ptr &buf) override { + if(_beforeSendCB){ + return _beforeSendCB(buf); + } + return SessionType::send(buf); + } + private: + onBeforeSendCB _beforeSendCB; + }; +private: + std::shared_ptr _session; + string _remian_data; }; +/** + * 回显会话 + */ +class EchoSession : public TcpSession { +public: + EchoSession(const std::shared_ptr &pTh, const Socket::Ptr &pSock) : TcpSession(pTh,pSock){}; + virtual ~EchoSession(){}; + + void onRecv(const Buffer::Ptr &buffer) override { + send(buffer); + } + void onError(const SockException &err) override{ + WarnL << err.what(); + } + //每隔一段时间触发,用来做超时管理 + void onManager() override{ + DebugL; + } +}; + + +typedef WebSocketSession EchoWebSocketSession; + } /* namespace Http */ } /* namespace ZL */ diff --git a/src/Http/HttpsSession.h b/src/Http/HttpsSession.h index 122affd3..0a131643 100644 --- a/src/Http/HttpsSession.h +++ b/src/Http/HttpsSession.h @@ -44,7 +44,7 @@ public: #if defined(__GNUC__) && (__GNUC__ < 5) public_send(data,len); #else//defined(__GNUC__) && (__GNUC__ < 5) - HttpSession::send(data,len); + HttpSession::send(obtainBuffer(data,len)); #endif//defined(__GNUC__) && (__GNUC__ < 5) }); m_sslBox.setOnDecData([&](const char *data, uint32_t len){ @@ -64,27 +64,17 @@ public: } #if defined(__GNUC__) && (__GNUC__ < 5) int public_send(const char *data, uint32_t len){ - return HttpSession::send(data,len); + return HttpSession::send(obtainBuffer(data,len)); } void public_onRecv(const char *data, uint32_t len){ HttpSession::onRecv(data,len); } #endif//defined(__GNUC__) && (__GNUC__ < 5) private: - virtual int send(const string &buf) override{ + virtual int send(const Buffer::Ptr &buf) override{ TimeTicker(); - m_sslBox.onSend(buf.data(), buf.size()); - return buf.size(); - } - virtual int send(string &&buf) override{ - TimeTicker(); - m_sslBox.onSend(buf.data(), buf.size()); - return buf.size(); - } - virtual int send(const char *buf, int size) override{ - TimeTicker(); - m_sslBox.onSend(buf, size); - return size; + m_sslBox.onSend(buf->data(), buf->size()); + return buf->size(); } SSL_Box m_sslBox; }; diff --git a/src/Rtmp/RtmpPlayer.h b/src/Rtmp/RtmpPlayer.h index 5628b840..c58b9113 100644 --- a/src/Rtmp/RtmpPlayer.h +++ b/src/Rtmp/RtmpPlayer.h @@ -105,11 +105,8 @@ private: //fro RtmpProtocol void onRtmpChunk(RtmpPacket &chunkData) override; void onStreamDry(uint32_t ui32StreamId) override; - void onSendRawData(const char *pcRawData, int iSize) override { - send(pcRawData, iSize); - } - void onSendRawData(const Buffer::Ptr &buffer,int flags) override{ - _sock->send(buffer,flags); + void onSendRawData(const Buffer::Ptr &buffer) override{ + send(buffer); } template diff --git a/src/Rtmp/RtmpProtocol.cpp b/src/Rtmp/RtmpProtocol.cpp index edc07ae2..6daae921 100644 --- a/src/Rtmp/RtmpProtocol.cpp +++ b/src/Rtmp/RtmpProtocol.cpp @@ -190,7 +190,7 @@ void RtmpProtocol::sendRequest(int iCmd, const string& str) { } void RtmpProtocol::sendRtmp(uint8_t ui8Type, uint32_t ui32StreamId, - const std::string& strBuf, uint32_t ui32TimeStamp, int iChunkId , bool msg_more) { + const std::string& strBuf, uint32_t ui32TimeStamp, int iChunkId) { if (iChunkId < 2 || iChunkId > 63) { auto strErr = StrPrinter << "不支持发送该类型的块流 ID:" << iChunkId << endl; throw std::runtime_error(strErr); @@ -235,7 +235,7 @@ void RtmpProtocol::sendRtmp(uint8_t ui8Type, uint32_t ui32StreamId, pos += chunk; } buffer->setSize(totalSize); - onSendRawData(buffer,msg_more ? SOCKET_DEFAULE_FLAGS : (SOCKET_DEFAULE_FLAGS | FLAG_MORE)); + onSendRawData(buffer); m_ui32ByteSent += totalSize; if (m_ui32WinSize > 0 && m_ui32ByteSent - m_ui32LastSent >= m_ui32WinSize) { m_ui32LastSent = m_ui32ByteSent; @@ -253,9 +253,9 @@ void RtmpProtocol::onParseRtmp(const char *pcRawData, int iSize) { void RtmpProtocol::startClientSession(const function &callBack) { //发送 C0C1 char handshake_head = HANDSHAKE_PLAINTEXT; - onSendRawData(&handshake_head, 1); + onSendRawData(obtainBuffer(&handshake_head, 1)); RtmpHandshake c1(0); - onSendRawData((char *) (&c1), sizeof(c1)); + onSendRawData(obtainBuffer((char *) (&c1), sizeof(c1))); m_nextHandle = [this,callBack]() { //等待 S0+S1+S2 handle_S0S1S2(callBack); @@ -271,7 +271,7 @@ void RtmpProtocol::handle_S0S1S2(const function &callBack) { } //发送 C2 const char *pcC2 = m_strRcvBuf.data() + 1; - onSendRawData(pcC2, C1_HANDSHARK_SIZE); + onSendRawData(obtainBuffer(pcC2, C1_HANDSHARK_SIZE)); m_strRcvBuf.erase(0, 1 + 2 * C1_HANDSHARK_SIZE); //握手结束 m_nextHandle = [this]() { @@ -306,12 +306,12 @@ void RtmpProtocol::handle_C0C1() { void RtmpProtocol::handle_C1_simple(){ //发送S0 char handshake_head = HANDSHAKE_PLAINTEXT; - onSendRawData(&handshake_head, 1); + onSendRawData(obtainBuffer(&handshake_head, 1)); //发送S1 RtmpHandshake s1(0); - onSendRawData((char *) &s1, C1_HANDSHARK_SIZE); + onSendRawData(obtainBuffer((char *) &s1, C1_HANDSHARK_SIZE)); //发送S2 - onSendRawData(m_strRcvBuf.c_str() + 1, C1_HANDSHARK_SIZE); + onSendRawData(obtainBuffer(m_strRcvBuf.c_str() + 1, C1_HANDSHARK_SIZE)); //等待C2 m_nextHandle = [this]() { handle_C2(); @@ -433,7 +433,7 @@ void RtmpProtocol::send_complex_S0S1S2(int schemeType,const string &digest){ //S1S2计算参考自:https://github.com/hitYangfei/golang/blob/master/rtmpserver.go //发送S0 char handshake_head = HANDSHAKE_PLAINTEXT; - onSendRawData(&handshake_head, 1); + onSendRawData(obtainBuffer(&handshake_head, 1)); //S1 RtmpHandshake s1(0); memcpy(s1.zero,"\x04\x05\x00\x01",4); @@ -460,7 +460,7 @@ void RtmpProtocol::send_complex_S0S1S2(int schemeType,const string &digest){ s1_joined.erase(digestPos - s1_start,C1_DIGEST_SIZE); string s1_digest = openssl_HMACsha256(FMSKey,S1_FMS_KEY_SIZE,s1_joined.data(),s1_joined.size()); memcpy(digestPos,s1_digest.data(),s1_digest.size()); - onSendRawData((char *) &s1, sizeof(s1)); + onSendRawData(obtainBuffer((char *) &s1, sizeof(s1))); //S2 string s2_key = openssl_HMACsha256(FMSKey,S2_FMS_KEY_SIZE,digest.data(),digest.size()); @@ -468,7 +468,7 @@ void RtmpProtocol::send_complex_S0S1S2(int schemeType,const string &digest){ s2.random_generate((char *)&s2,8); string s2_digest = openssl_HMACsha256(s2_key.data(),s2_key.size(),&s2,sizeof(s2) - C1_DIGEST_SIZE); memcpy((char *)&s2 + C1_HANDSHARK_SIZE - C1_DIGEST_SIZE,s2_digest.data(),C1_DIGEST_SIZE); - onSendRawData((char *)&s2, sizeof(s2)); + onSendRawData(obtainBuffer((char *)&s2, sizeof(s2))); //等待C2 m_nextHandle = [this]() { handle_C2(); @@ -691,5 +691,11 @@ BufferRaw::Ptr RtmpProtocol::obtainBuffer() { return std::make_shared() ;//_bufferPool.obtain(); } +BufferRaw::Ptr RtmpProtocol::obtainBuffer(const void *data, int len) { + auto buffer = obtainBuffer(); + buffer->assign((const char *)data,len); + return buffer; +} + } /* namespace Rtmp */ } /* namespace ZL */ diff --git a/src/Rtmp/RtmpProtocol.h b/src/Rtmp/RtmpProtocol.h index 88fb05ae..67d0edd3 100644 --- a/src/Rtmp/RtmpProtocol.h +++ b/src/Rtmp/RtmpProtocol.h @@ -54,11 +54,8 @@ public: void onParseRtmp(const char *pcRawData,int iSize); void reset(); protected: - virtual void onSendRawData(const char *pcRawData,int iSize) = 0; - virtual void onSendRawData(const Buffer::Ptr &buffer,int flags) = 0; - + virtual void onSendRawData(const Buffer::Ptr &buffer) = 0; virtual void onRtmpChunk(RtmpPacket &chunkData) = 0; - virtual void onStreamBegin(uint32_t ui32StreamId){ m_ui32StreamId = ui32StreamId; } @@ -78,19 +75,19 @@ protected: void sendInvoke(const string &strCmd, const AMFValue &val); void sendRequest(int iCmd, const string &str); void sendResponse(int iType, const string &str); - void sendRtmp(uint8_t ui8Type, uint32_t ui32StreamId, const std::string &strBuf, uint32_t ui32TimeStamp, int iChunkID,bool msg_more = false); + void sendRtmp(uint8_t ui8Type, uint32_t ui32StreamId, const std::string &strBuf, uint32_t ui32TimeStamp, int iChunkID); protected: int m_iReqID = 0; uint32_t m_ui32StreamId = STREAM_CONTROL; int m_iNowStreamID = 0; int m_iNowChunkID = 0; bool m_bDataStarted = false; - BufferRaw::Ptr obtainBuffer(); - //ResourcePool m_bufferPool; + inline BufferRaw::Ptr obtainBuffer(); + inline BufferRaw::Ptr obtainBuffer(const void *data, int len); + //ResourcePool m_bufferPool; private: void handle_S0S1S2(const function &cb); void handle_C0C1(); - void handle_C1_simple(); #ifdef ENABLE_OPENSSL void handle_C1_complex(); @@ -104,6 +101,7 @@ private: void handle_rtmp(); void handle_rtmpChunk(RtmpPacket &chunkData); +private: ////////////ChunkSize//////////// size_t m_iChunkLenIn = DEFAULT_CHUNK_LEN; size_t m_iChunkLenOut = DEFAULT_CHUNK_LEN; diff --git a/src/Rtmp/RtmpPusher.cpp b/src/Rtmp/RtmpPusher.cpp index 0c4753e4..c75716c2 100644 --- a/src/Rtmp/RtmpPusher.cpp +++ b/src/Rtmp/RtmpPusher.cpp @@ -35,6 +35,8 @@ using namespace ZL::Util; namespace ZL { namespace Rtmp { +static int kSockFlags = SOCKET_DEFAULE_FLAGS | FLAG_MORE; + unordered_map RtmpPusher::g_mapCmd; RtmpPusher::RtmpPusher(const char *strVhost,const char *strApp,const char *strStream) { auto src = dynamic_pointer_cast(MediaSource::find(RTMP_SCHEMA,strVhost,strApp,strStream)); @@ -200,7 +202,7 @@ inline void RtmpPusher::send_metaData(){ sendRequest(MSG_DATA, enc.data()); src->getConfigFrame([&](const RtmpPacket::Ptr &pkt){ - sendRtmp(pkt->typeId, m_ui32StreamId, pkt->strBuf, pkt->timeStamp, pkt->chunkId , true); + sendRtmp(pkt->typeId, m_ui32StreamId, pkt->strBuf, pkt->timeStamp, pkt->chunkId ); }); m_pRtmpReader = src->getRing()->attach(); @@ -210,7 +212,7 @@ inline void RtmpPusher::send_metaData(){ if(!strongSelf) { return; } - strongSelf->sendRtmp(pkt->typeId, strongSelf->m_ui32StreamId, pkt->strBuf, pkt->timeStamp, pkt->chunkId , true); + strongSelf->sendRtmp(pkt->typeId, strongSelf->m_ui32StreamId, pkt->strBuf, pkt->timeStamp, pkt->chunkId); }); m_pRtmpReader->setDetachCB([weakSelf](){ auto strongSelf = weakSelf.lock(); @@ -220,6 +222,9 @@ inline void RtmpPusher::send_metaData(){ } }); onPublishResult(SockException(Err_success,"success")); + //提高发送性能 + (*this) << SocketFlags(kSockFlags); + SockUtil::setNoDelay(_sock->rawFD(),false); } void RtmpPusher::onCmd_result(AMFDecoder &dec){ auto iReqId = dec.load(); diff --git a/src/Rtmp/RtmpPusher.h b/src/Rtmp/RtmpPusher.h index b0ce3dcb..2ed1cce1 100644 --- a/src/Rtmp/RtmpPusher.h +++ b/src/Rtmp/RtmpPusher.h @@ -54,19 +54,15 @@ public: } protected: - - //for Tcpclient + //for Tcpclient override void onRecv(const Buffer::Ptr &pBuf) override; void onConnect(const SockException &err) override; void onErr(const SockException &ex) override; - //fro RtmpProtocol + //for RtmpProtocol override void onRtmpChunk(RtmpPacket &chunkData) override; - void onSendRawData(const char *pcRawData, int iSize) override { - send(pcRawData, iSize); - } - void onSendRawData(const Buffer::Ptr &buffer,int flags) override{ - _sock->send(buffer,flags); + void onSendRawData(const Buffer::Ptr &buffer) override{ + send(buffer); } private: void init(const RtmpMediaSource::Ptr &src); diff --git a/src/Rtmp/RtmpSession.cpp b/src/Rtmp/RtmpSession.cpp index 9e43ed64..7eb80dec 100644 --- a/src/Rtmp/RtmpSession.cpp +++ b/src/Rtmp/RtmpSession.cpp @@ -32,6 +32,8 @@ namespace ZL { namespace Rtmp { +static int kSockFlags = SOCKET_DEFAULE_FLAGS | FLAG_MORE; + RtmpSession::RtmpSession(const std::shared_ptr &pTh, const Socket::Ptr &pSock) : TcpSession(pTh, pSock) { DebugL << get_peer_ip(); @@ -366,6 +368,10 @@ void RtmpSession::doPlayResponse(const string &err,bool tryDelay,const std::shar if (src->getRing()->readerCount() == 1) { src->seekTo(0); } + + //提高发送性能 + (*this) << SocketFlags(kSockFlags); + SockUtil::setNoDelay(_sock->rawFD(),false); } void RtmpSession::doPlay(AMFDecoder &dec){ @@ -539,7 +545,7 @@ void RtmpSession::onSendMedia(const RtmpPacket::Ptr &pkt) { CLEAR_ARR(m_aui32FirstStamp); modifiedStamp = 0; } - sendRtmp(pkt->typeId, pkt->streamId, pkt->strBuf, modifiedStamp, pkt->chunkId , true); + sendRtmp(pkt->typeId, pkt->streamId, pkt->strBuf, modifiedStamp, pkt->chunkId); } void RtmpSession::doDelay(int delaySec, const std::function &fun) { diff --git a/src/Rtmp/RtmpSession.h b/src/Rtmp/RtmpSession.h index d05a6422..8cafe61d 100644 --- a/src/Rtmp/RtmpSession.h +++ b/src/Rtmp/RtmpSession.h @@ -69,13 +69,9 @@ private: void setMetaData(AMFDecoder &dec); void onSendMedia(const RtmpPacket::Ptr &pkt); - void onSendRawData(const char *pcRawData,int iSize) override{ - m_ui64TotalBytes += iSize; - send(pcRawData, iSize); - } - void onSendRawData(const Buffer::Ptr &buffer,int flags) override{ + void onSendRawData(const Buffer::Ptr &buffer) override{ m_ui64TotalBytes += buffer->size(); - _sock->send(buffer,flags); + send(buffer); } void onRtmpChunk(RtmpPacket &chunkData) override; diff --git a/src/Rtsp/RtpBroadCaster.cpp b/src/Rtsp/RtpBroadCaster.cpp index 56e90188..11d4e7c3 100644 --- a/src/Rtsp/RtpBroadCaster.cpp +++ b/src/Rtsp/RtpBroadCaster.cpp @@ -130,7 +130,7 @@ RtpBroadCaster::RtpBroadCaster(const string &strLocalIp,const string &strVhost,c auto &pSock = m_apUdpSock[i]; auto &peerAddr = m_aPeerUdpAddr[i]; BufferRtp::Ptr buffer(new BufferRtp(pkt,4)); - pSock->send(buffer,SOCKET_DEFAULE_FLAGS,(struct sockaddr *)(&peerAddr)); + pSock->send(buffer,SOCKET_DEFAULE_FLAGS | FLAG_MORE,(struct sockaddr *)(&peerAddr)); }); m_pReader->setDetachCB([this](){ unordered_map m_mapDetach_copy; diff --git a/src/Rtsp/RtspSession.cpp b/src/Rtsp/RtspSession.cpp index 91baa0c9..9d2ff603 100644 --- a/src/Rtsp/RtspSession.cpp +++ b/src/Rtsp/RtspSession.cpp @@ -43,6 +43,8 @@ using namespace ZL::Network; namespace ZL { namespace Rtsp { +static int kSockFlags = SOCKET_DEFAULE_FLAGS | FLAG_MORE; + string dateHeader() { char buf[200]; time_t tt = time(NULL); @@ -210,7 +212,7 @@ bool RtspSession::handleReq_Options() { m_iCseq, SERVER_NAME, RTSP_VERSION, RTSP_BUILDTIME, dateHeader().data()); - send(m_pcBuf, n); + SocketHelper::send(m_pcBuf, n); return true; } @@ -277,7 +279,7 @@ void RtspSession::onAuthSuccess(const weak_ptr &weakSelf) { RTSP_VERSION, RTSP_BUILDTIME, dateHeader().data(), strongSelf->m_strUrl.data(), (int) strongSelf->m_strSdp.length(), strongSelf->m_strSdp.data()); - strongSelf->send(response, n); + strongSelf->SocketHelper::send(response, n); }); } void RtspSession::onAuthFailed(const weak_ptr &weakSelf,const string &realm) { @@ -320,7 +322,7 @@ void RtspSession::onAuthFailed(const weak_ptr &weakSelf,const strin RTSP_VERSION, RTSP_BUILDTIME, dateHeader().data(), realm.data()); } - strongSelf->send(response, n); + strongSelf->SocketHelper::send(response, n); }); } @@ -471,7 +473,7 @@ inline void RtspSession::send_StreamNotFound() { m_iCseq, SERVER_NAME, RTSP_VERSION, RTSP_BUILDTIME, dateHeader().data()); - send(m_pcBuf, n); + SocketHelper::send(m_pcBuf, n); } inline void RtspSession::send_UnsupportedTransport() { int n = sprintf(m_pcBuf, "RTSP/1.0 461 Unsupported Transport\r\n" @@ -482,7 +484,7 @@ inline void RtspSession::send_UnsupportedTransport() { m_iCseq, SERVER_NAME, RTSP_VERSION, RTSP_BUILDTIME, dateHeader().data()); - send(m_pcBuf, n); + SocketHelper::send(m_pcBuf, n); } inline void RtspSession::send_SessionNotFound() { @@ -494,7 +496,7 @@ inline void RtspSession::send_SessionNotFound() { m_iCseq, SERVER_NAME, RTSP_VERSION, RTSP_BUILDTIME, dateHeader().data()); - send(m_pcBuf, n); + SocketHelper::send(m_pcBuf, n); /*40 Method Not Allowed*/ @@ -562,7 +564,7 @@ bool RtspSession::handleReq_Setup() { trackRef.type * 2 + 1, printSSRC(trackRef.ssrc).data(), m_strSession.data()); - send(m_pcBuf, iLen); + SocketHelper::send(m_pcBuf, iLen); } break; case PlayerBase::RTP_UDP: { @@ -607,7 +609,7 @@ bool RtspSession::handleReq_Setup() { pSockRtp->get_local_port(), pSockRtcp->get_local_port(), printSSRC(trackRef.ssrc).data(), m_strSession.data()); - send(m_pcBuf, n); + SocketHelper::send(m_pcBuf, n); } break; case PlayerBase::RTP_MULTICAST: { @@ -650,7 +652,7 @@ bool RtspSession::handleReq_Setup() { get_local_ip().data(), iSrvPort, pSockRtcp->get_local_port(), udpTTL,printSSRC(trackRef.ssrc).data(), m_strSession.data()); - send(m_pcBuf, n); + SocketHelper::send(m_pcBuf, n); } break; default: @@ -685,7 +687,7 @@ bool RtspSession::handleReq_Play() { m_iCseq, SERVER_NAME, RTSP_VERSION, RTSP_BUILDTIME, dateHeader().data(),(int)err.size(),err.data()); - send(m_pcBuf,n); + SocketHelper::send(m_pcBuf,n); shutdown(); return; } @@ -757,7 +759,11 @@ bool RtspSession::handleReq_Play() { iLen -= 1; (m_pcBuf)[iLen] = '\0'; iLen += sprintf(m_pcBuf + iLen, "\r\n\r\n"); - send(m_pcBuf, iLen); + SocketHelper::send(m_pcBuf, iLen); + + //提高发送性能 + (*this) << SocketFlags(kSockFlags); + SockUtil::setNoDelay(m_pSender->rawFD(),false); }; weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); @@ -793,7 +799,7 @@ bool RtspSession::handleReq_Pause() { "%s" "Session: %s\r\n\r\n", m_iCseq, SERVER_NAME, RTSP_VERSION, RTSP_BUILDTIME, dateHeader().data(), m_strSession.data()); - send(m_pcBuf, n); + SocketHelper::send(m_pcBuf, n); if(m_pRtpReader){ m_pRtpReader->setReadCB(nullptr); } @@ -809,7 +815,7 @@ bool RtspSession::handleReq_Teardown() { "Session: %s\r\n\r\n", m_iCseq, SERVER_NAME, RTSP_VERSION, RTSP_BUILDTIME, dateHeader().data(), m_strSession.data()); - send(m_pcBuf, n); + SocketHelper::send(m_pcBuf, n); TraceL << "播放器断开连接!"; return false; } @@ -827,7 +833,7 @@ bool RtspSession::handleReq_Get() { lock_guard lock(g_mtxGetter); g_mapGetter[m_strSessionCookie] = dynamic_pointer_cast(shared_from_this()); //InfoL << m_strSessionCookie; - send(m_pcBuf, n); + SocketHelper::send(m_pcBuf, n); return true; } @@ -862,7 +868,7 @@ bool RtspSession::handleReq_SET_PARAMETER() { "%s" "Session: %s\r\n\r\n", m_iCseq, SERVER_NAME, RTSP_VERSION, RTSP_BUILDTIME, dateHeader().data(), m_strSession.data()); - send(m_pcBuf, n); + SocketHelper::send(m_pcBuf, n); return true; } @@ -873,7 +879,7 @@ inline void RtspSession::send_NotAcceptable() { "%s" "Connection: Close\r\n\r\n", m_iCseq, SERVER_NAME, RTSP_VERSION, RTSP_BUILDTIME, dateHeader().data()); - send(m_pcBuf, n); + SocketHelper::send(m_pcBuf, n); } @@ -939,7 +945,7 @@ inline void RtspSession::sendRtpPacket(const RtpPacket::Ptr & pkt) { } BufferRtp::Ptr buffer(new BufferRtp(pkt,4)); m_ui64TotalBytes += buffer->size(); - pSock->send(buffer,SOCKET_DEFAULE_FLAGS, peerAddr.get()); + pSock->send(buffer,kSockFlags, peerAddr.get()); } break; default: diff --git a/src/Rtsp/RtspSession.h b/src/Rtsp/RtspSession.h index b5d3bfe3..8b08060a 100644 --- a/src/Rtsp/RtspSession.h +++ b/src/Rtsp/RtspSession.h @@ -89,21 +89,9 @@ protected: int64_t onRecvHeader(const char *data,uint64_t len) override ; private: void inputRtspOrRtcp(const char *data,uint64_t len); - int send(const string &strBuf) override { - m_ui64TotalBytes += strBuf.size(); - return m_pSender->send(strBuf); - } - int send(string &&strBuf) override { - m_ui64TotalBytes += strBuf.size(); - return m_pSender->send(std::move(strBuf)); - } - int send(const char *pcBuf, int iSize) override { - m_ui64TotalBytes += iSize; - return m_pSender->send(pcBuf, iSize); - } int send(const Buffer::Ptr &pkt) override{ m_ui64TotalBytes += pkt->size(); - return m_pSender->send(pkt,SOCKET_DEFAULE_FLAGS | FLAG_MORE); + return m_pSender->send(pkt,_flags); } void shutdown() override ; void shutdown_l(bool close); diff --git a/tests/test_httpClient.cpp b/tests/test_httpClient.cpp index 1cf03dc8..b94159db 100644 --- a/tests/test_httpClient.cpp +++ b/tests/test_httpClient.cpp @@ -53,8 +53,11 @@ int main(int argc, char *argv[]) { //下载器map map downloaderMap; //下载两个文件,一个是http下载,一个https下载 - auto urlList = {"http://img3.imgtn.bdimg.com/it/u=158031390,1321729164&fm=214&gp=0.jpg", - "https://ss0.bdstatic.com/70cFvHSh_Q1YnxGkpoWK1HF6hhy/it/u=931786003,1029770543&fm=27&gp=0.jpg"}; + auto urlList = {"https://timgsa.baidu.com/timg?image&quality=80&" + "size=b9999_10000&sec=1537717640404&" + "di=f602efbebbc1e7f6b9ccb0bf0def89d0&" + "imgtype=0&" + "src=http%3A%2F%2Fimgsrc.baidu.com%2Fimgad%2Fpic%2Fitem%2F241f95cad1c8a786ff65052a6d09c93d70cf5042.jpg",}; for (auto &url : urlList) { //创建下载器