From d6fa0296cba19bbd9a26c6fb6b4ad860a08d18fd Mon Sep 17 00:00:00 2001 From: xiongziliang <771730766@qq.com> Date: Wed, 4 Sep 2019 18:57:54 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E9=85=8D=E7=BD=AE=E9=A1=B9ul?= =?UTF-8?q?traLowDelay,=E5=8F=AF=E9=80=89=E6=8B=A9=E6=98=AF=E5=90=A6?= =?UTF-8?q?=E4=BD=8E=E5=BB=B6=E6=97=B6=E6=A8=A1=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- conf/config.ini | 2 ++ src/Common/config.cpp | 2 ++ src/Common/config.h | 2 ++ src/Http/HttpSession.cpp | 24 +++++++++++++++--------- src/Http/HttpSession.h | 3 +++ src/Rtmp/RtmpPusher.cpp | 17 ++++++++++++----- src/Rtmp/RtmpPusher.h | 2 +- src/Rtmp/RtmpSession.cpp | 19 ++++++++++++------- src/Rtmp/RtmpSession.h | 1 + src/Rtsp/RtspPusher.cpp | 16 +++++++++++----- src/Rtsp/RtspPusher.h | 1 + src/Rtsp/RtspSession.cpp | 19 ++++++++++++------- src/Rtsp/RtspSession.h | 2 ++ 13 files changed, 76 insertions(+), 34 deletions(-) diff --git a/conf/config.ini b/conf/config.ini index 8a36fdf5..bdd06dd3 100644 --- a/conf/config.ini +++ b/conf/config.ini @@ -28,6 +28,8 @@ maxStreamWaitMS=5000 #某个流无人观看时,触发hook.on_stream_none_reader事件的最大等待时间,单位毫秒 #在配合hook.on_stream_none_reader事件时,可以做到无人观看自动停止拉流或停止接收推流 streamNoneReaderDelayMS=5000 +#是否开启低延时模式,该模式下禁用MSG_MORE,启用TCP_NODEALY,延时将降低,但数据发送性能将降低 +ultraLowDelay=1 [hls] #hls写文件的buf大小,调整参数可以提高文件io性能 diff --git a/src/Common/config.cpp b/src/Common/config.cpp index 63a22495..b8cbc458 100644 --- a/src/Common/config.cpp +++ b/src/Common/config.cpp @@ -76,11 +76,13 @@ const string kFlowThreshold = GENERAL_FIELD"flowThreshold"; const string kStreamNoneReaderDelayMS = GENERAL_FIELD"streamNoneReaderDelayMS"; const string kMaxStreamWaitTimeMS = GENERAL_FIELD"maxStreamWaitMS"; const string kEnableVhost = GENERAL_FIELD"enableVhost"; +const string kUltraLowDelay = GENERAL_FIELD"ultraLowDelay"; onceToken token([](){ mINI::Instance()[kFlowThreshold] = 1024; mINI::Instance()[kStreamNoneReaderDelayMS] = 5 * 1000; mINI::Instance()[kMaxStreamWaitTimeMS] = 5 * 1000; mINI::Instance()[kEnableVhost] = 1; + mINI::Instance()[kUltraLowDelay] = 1; },nullptr); }//namespace General diff --git a/src/Common/config.h b/src/Common/config.h index 47c06bfd..1f647069 100644 --- a/src/Common/config.h +++ b/src/Common/config.h @@ -168,6 +168,8 @@ extern const string kStreamNoneReaderDelayMS; extern const string kMaxStreamWaitTimeMS; //是否启动虚拟主机 extern const string kEnableVhost; +//超低延时模式,默认打开,打开后会降低延时但是转发性能会稍差 +extern const string kUltraLowDelay; }//namespace General diff --git a/src/Http/HttpSession.cpp b/src/Http/HttpSession.cpp index 4f9118c5..da9850e6 100644 --- a/src/Http/HttpSession.cpp +++ b/src/Http/HttpSession.cpp @@ -48,7 +48,6 @@ using namespace toolkit; namespace mediakit { -static int kSockFlags = SOCKET_DEFAULE_FLAGS | FLAG_MORE; static int kHlsCookieSecond = 10 * 60; static const string kCookieName = "ZL_COOKIE"; static const string kCookiePathKey = "kCookiePathKey"; @@ -283,10 +282,9 @@ inline bool HttpSession::checkLiveFlvStream(const function &cb){ cb(); } - //开始发送rtmp负载 - //关闭tcp_nodelay ,优化性能 - SockUtil::setNoDelay(_sock->rawFD(),false); - (*this) << SocketFlags(kSockFlags); + //http-flv直播牺牲延时提升发送性能 + setSocketFlags(); + try{ start(getPoller(),rtmp_src); }catch (std::exception &ex){ @@ -657,10 +655,9 @@ inline void HttpSession::Handle_Req_GET(int64_t &content_len) { } return false; }; - //关闭tcp_nodelay ,优化性能 - SockUtil::setNoDelay(_sock->rawFD(),false); - //设置MSG_MORE,优化性能 - (*this) << SocketFlags(kSockFlags); + + //文件下载提升发送性能 + setSocketFlags(); onFlush(); _sock->setOnFlush(onFlush); @@ -950,6 +947,15 @@ inline void HttpSession::sendNotFound(bool bClose) { sendResponse("404 Not Found", makeHttpHeader(bClose, notFound.size()), notFound); } +void HttpSession::setSocketFlags(){ + GET_CONFIG(bool,ultraLowDelay,General::kUltraLowDelay); + if(!ultraLowDelay) { + //推流模式下,关闭TCP_NODELAY会增加推流端的延时,但是服务器性能将提高 + SockUtil::setNoDelay(_sock->rawFD(), false); + //播放模式下,开启MSG_MORE会增加延时,但是能提高发送性能 + (*this) << SocketFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE); + } +} void HttpSession::onWrite(const Buffer::Ptr &buffer) { _ticker.resetTime(); diff --git a/src/Http/HttpSession.h b/src/Http/HttpSession.h index d99bff95..4ab9dcad 100644 --- a/src/Http/HttpSession.h +++ b/src/Http/HttpSession.h @@ -142,6 +142,9 @@ private: * @return */ inline string getClientUid(); + + //设置socket标志 + void setSocketFlags(); private: string _origin; Parser _parser; diff --git a/src/Rtmp/RtmpPusher.cpp b/src/Rtmp/RtmpPusher.cpp index 65e17047..834276a7 100644 --- a/src/Rtmp/RtmpPusher.cpp +++ b/src/Rtmp/RtmpPusher.cpp @@ -33,8 +33,6 @@ using namespace mediakit::Client; namespace mediakit { -static int kSockFlags = SOCKET_DEFAULE_FLAGS | FLAG_MORE; - RtmpPusher::RtmpPusher(const EventPoller::Ptr &poller,const RtmpMediaSource::Ptr &src) : TcpClient(poller){ _pMediaSrc=src; } @@ -229,10 +227,19 @@ inline void RtmpPusher::send_metaData(){ } }); onPublishResult(SockException(Err_success,"success")); - //提高发送性能 - (*this) << SocketFlags(kSockFlags); - SockUtil::setNoDelay(_sock->rawFD(),false); + //提升发送性能 + setSocketFlags(); } + +void RtmpPusher::setSocketFlags(){ + GET_CONFIG(bool,ultraLowDelay,General::kUltraLowDelay); + if(!ultraLowDelay) { + //提高发送性能 + (*this) << SocketFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE); + SockUtil::setNoDelay(_sock->rawFD(), false); + } +} + void RtmpPusher::onCmd_result(AMFDecoder &dec){ auto iReqId = dec.load(); lock_guard lck(_mtxOnResultCB); diff --git a/src/Rtmp/RtmpPusher.h b/src/Rtmp/RtmpPusher.h index 7a589cf9..bc696c2e 100644 --- a/src/Rtmp/RtmpPusher.h +++ b/src/Rtmp/RtmpPusher.h @@ -84,7 +84,7 @@ private: inline void send_createStream(); inline void send_publish(); inline void send_metaData(); - + void setSocketFlags(); private: string _strApp; string _strStream; diff --git a/src/Rtmp/RtmpSession.cpp b/src/Rtmp/RtmpSession.cpp index 024b1c65..786c9011 100644 --- a/src/Rtmp/RtmpSession.cpp +++ b/src/Rtmp/RtmpSession.cpp @@ -31,8 +31,6 @@ namespace mediakit { -static int kSockFlags = SOCKET_DEFAULE_FLAGS | FLAG_MORE; - RtmpSession::RtmpSession(const Socket::Ptr &pSock) : TcpSession(pSock) { DebugP(this); GET_CONFIG(uint32_t,keep_alive_sec,Rtmp::kKeepAliveSecond); @@ -171,6 +169,7 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) { _pPublisherSrc->setListener(dynamic_pointer_cast(shared_from_this())); //如果是rtmp推流客户端,那么加大TCP接收缓存,这样能提升接收性能 _sock->setReadBuffer(std::make_shared(256 * 1024)); + setSocketFlags(); }; Broadcast::AuthInvoker invoker = [weakSelf,onRes,pToken](const string &err){ @@ -272,7 +271,6 @@ void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr _pRingReader = src->getRing()->attach(getPoller()); weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - SockUtil::setNoDelay(_sock->rawFD(), false); _pRingReader->setReadCB([weakSelf](const RtmpPacket::Ptr &pkt) { auto strongSelf = weakSelf.lock(); if (!strongSelf) { @@ -291,10 +289,8 @@ void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr if (src->readerCount() == 1) { src->seekTo(0); } - - //提高发送性能 - (*this) << SocketFlags(kSockFlags); - SockUtil::setNoDelay(_sock->rawFD(),false); + //提高服务器发送性能 + setSocketFlags(); } void RtmpSession::doPlayResponse(const string &err,const std::function &cb){ @@ -502,4 +498,13 @@ void RtmpSession::onNoneReader(MediaSource &sender) { MediaSourceEvent::onNoneReader(sender); } +void RtmpSession::setSocketFlags(){ + GET_CONFIG(bool,ultraLowDelay,General::kUltraLowDelay); + if(!ultraLowDelay) { + //推流模式下,关闭TCP_NODELAY会增加推流端的延时,但是服务器性能将提高 + SockUtil::setNoDelay(_sock->rawFD(), false); + //播放模式下,开启MSG_MORE会增加延时,但是能提高发送性能 + (*this) << SocketFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE); + } +} } /* namespace mediakit */ diff --git a/src/Rtmp/RtmpSession.h b/src/Rtmp/RtmpSession.h index fbb8cf38..452f2c95 100644 --- a/src/Rtmp/RtmpSession.h +++ b/src/Rtmp/RtmpSession.h @@ -85,6 +85,7 @@ private: bool close(MediaSource &sender,bool force) override ; void onNoneReader(MediaSource &sender) override; + void setSocketFlags(); private: std::string _strTcUrl; MediaInfo _mediaInfo; diff --git a/src/Rtsp/RtspPusher.cpp b/src/Rtsp/RtspPusher.cpp index d468ddc8..7e2ca8c2 100644 --- a/src/Rtsp/RtspPusher.cpp +++ b/src/Rtsp/RtspPusher.cpp @@ -11,8 +11,6 @@ using namespace mediakit::Client; namespace mediakit { -static int kSockFlags = SOCKET_DEFAULE_FLAGS | FLAG_MORE; - RtspPusher::RtspPusher(const EventPoller::Ptr &poller,const RtspMediaSource::Ptr &src) : TcpClient(poller){ _pMediaSrc = src; } @@ -392,13 +390,21 @@ void RtspPusher::sendRecord() { },getPoller())); } onPublishResult(SockException(Err_success,"success")); - //提高发送性能 - (*this) << SocketFlags(kSockFlags); - SockUtil::setNoDelay(_sock->rawFD(),false); + //提升发送性能 + setSocketFlags(); }; sendRtspRequest("RECORD",_strContentBase,{"Range","npt=0.000-"}); } +void RtspPusher::setSocketFlags(){ + GET_CONFIG(bool,ultraLowDelay,General::kUltraLowDelay); + if(!ultraLowDelay) { + //提高发送性能 + (*this) << SocketFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE); + SockUtil::setNoDelay(_sock->rawFD(), false); + } +} + void RtspPusher::sendRtspRequest(const string &cmd, const string &url, const std::initializer_list &header,const string &sdp ) { string key; StrCaseMap header_map; diff --git a/src/Rtsp/RtspPusher.h b/src/Rtsp/RtspPusher.h index aa8232bd..958c1c87 100644 --- a/src/Rtsp/RtspPusher.h +++ b/src/Rtsp/RtspPusher.h @@ -67,6 +67,7 @@ private: void sendRtspRequest(const string &cmd, const string &url ,const std::initializer_list &header,const string &sdp = ""); void createUdpSockIfNecessary(int track_idx); + void setSocketFlags(); private: //rtsp鉴权相关 string _rtspMd5Nonce; diff --git a/src/Rtsp/RtspSession.cpp b/src/Rtsp/RtspSession.cpp index eb0fa6e8..6c45888b 100644 --- a/src/Rtsp/RtspSession.cpp +++ b/src/Rtsp/RtspSession.cpp @@ -71,7 +71,6 @@ namespace mediakit { static unordered_map > g_mapGetter; //对g_mapGetter上锁保护 static recursive_mutex g_mtxGetter; -static int kSockFlags = SOCKET_DEFAULE_FLAGS | FLAG_MORE; RtspSession::RtspSession(const Socket::Ptr &pSock) : TcpSession(pSock) { DebugP(this); @@ -277,12 +276,11 @@ void RtspSession::handleReq_RECORD(const Parser &parser){ rtp_info.pop_back(); sendRtspResponse("200 OK", {"RTP-Info",rtp_info}); - SockUtil::setNoDelay(_sock->rawFD(),false); if(_rtpType == Rtsp::RTP_TCP){ //如果是rtsp推流服务器,并且是TCP推流,那么加大TCP接收缓存,这样能提升接收性能 _sock->setReadBuffer(std::make_shared(256 * 1024)); + setSocketFlags(); } - (*this) << SocketFlags(kSockFlags); }; weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); @@ -780,10 +778,7 @@ void RtspSession::handleReq_Play(const Parser &parser) { }); _enableSendRtp = true; - - //提高发送性能 - SockUtil::setNoDelay(_sock->rawFD(),false); - (*this) << SocketFlags(kSockFlags); + setSocketFlags(); if (!_pRtpReader && _rtpType != Rtsp::RTP_MULTICAST) { weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); @@ -1230,6 +1225,16 @@ void RtspSession::sendSenderReport(bool overTcp,int iTrackIndex) { } } +void RtspSession::setSocketFlags(){ + GET_CONFIG(bool,ultraLowDelay,General::kUltraLowDelay); + if(!ultraLowDelay) { + //推流模式下,关闭TCP_NODELAY会增加推流端的延时,但是服务器性能将提高 + SockUtil::setNoDelay(_sock->rawFD(), false); + //播放模式下,开启MSG_MORE会增加延时,但是能提高发送性能 + (*this) << SocketFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE); + } +} + } /* namespace mediakit */ diff --git a/src/Rtsp/RtspSession.h b/src/Rtsp/RtspSession.h index 90ebc94f..33637666 100644 --- a/src/Rtsp/RtspSession.h +++ b/src/Rtsp/RtspSession.h @@ -183,6 +183,8 @@ private: bool sendRtspResponse(const string &res_code,const StrCaseMap &header = StrCaseMap(), const string &sdp = "",const char *protocol = "RTSP/1.0"); //服务器发送rtcp void sendSenderReport(bool overTcp,int iTrackIndex); + //设置socket标志 + void setSocketFlags(); private: //用于判断客户端是否超时 Ticker _ticker;