添加配置项ultraLowDelay,可选择是否低延时模式

This commit is contained in:
xiongziliang 2019-09-04 18:57:54 +08:00
parent d864e7084b
commit d6fa0296cb
13 changed files with 76 additions and 34 deletions

View File

@ -28,6 +28,8 @@ maxStreamWaitMS=5000
#某个流无人观看时触发hook.on_stream_none_reader事件的最大等待时间单位毫秒 #某个流无人观看时触发hook.on_stream_none_reader事件的最大等待时间单位毫秒
#在配合hook.on_stream_none_reader事件时可以做到无人观看自动停止拉流或停止接收推流 #在配合hook.on_stream_none_reader事件时可以做到无人观看自动停止拉流或停止接收推流
streamNoneReaderDelayMS=5000 streamNoneReaderDelayMS=5000
#是否开启低延时模式该模式下禁用MSG_MORE,启用TCP_NODEALY延时将降低但数据发送性能将降低
ultraLowDelay=1
[hls] [hls]
#hls写文件的buf大小调整参数可以提高文件io性能 #hls写文件的buf大小调整参数可以提高文件io性能

View File

@ -76,11 +76,13 @@ const string kFlowThreshold = GENERAL_FIELD"flowThreshold";
const string kStreamNoneReaderDelayMS = GENERAL_FIELD"streamNoneReaderDelayMS"; const string kStreamNoneReaderDelayMS = GENERAL_FIELD"streamNoneReaderDelayMS";
const string kMaxStreamWaitTimeMS = GENERAL_FIELD"maxStreamWaitMS"; const string kMaxStreamWaitTimeMS = GENERAL_FIELD"maxStreamWaitMS";
const string kEnableVhost = GENERAL_FIELD"enableVhost"; const string kEnableVhost = GENERAL_FIELD"enableVhost";
const string kUltraLowDelay = GENERAL_FIELD"ultraLowDelay";
onceToken token([](){ onceToken token([](){
mINI::Instance()[kFlowThreshold] = 1024; mINI::Instance()[kFlowThreshold] = 1024;
mINI::Instance()[kStreamNoneReaderDelayMS] = 5 * 1000; mINI::Instance()[kStreamNoneReaderDelayMS] = 5 * 1000;
mINI::Instance()[kMaxStreamWaitTimeMS] = 5 * 1000; mINI::Instance()[kMaxStreamWaitTimeMS] = 5 * 1000;
mINI::Instance()[kEnableVhost] = 1; mINI::Instance()[kEnableVhost] = 1;
mINI::Instance()[kUltraLowDelay] = 1;
},nullptr); },nullptr);
}//namespace General }//namespace General

View File

@ -168,6 +168,8 @@ extern const string kStreamNoneReaderDelayMS;
extern const string kMaxStreamWaitTimeMS; extern const string kMaxStreamWaitTimeMS;
//是否启动虚拟主机 //是否启动虚拟主机
extern const string kEnableVhost; extern const string kEnableVhost;
//超低延时模式,默认打开,打开后会降低延时但是转发性能会稍差
extern const string kUltraLowDelay;
}//namespace General }//namespace General

View File

@ -48,7 +48,6 @@ using namespace toolkit;
namespace mediakit { namespace mediakit {
static int kSockFlags = SOCKET_DEFAULE_FLAGS | FLAG_MORE;
static int kHlsCookieSecond = 10 * 60; static int kHlsCookieSecond = 10 * 60;
static const string kCookieName = "ZL_COOKIE"; static const string kCookieName = "ZL_COOKIE";
static const string kCookiePathKey = "kCookiePathKey"; static const string kCookiePathKey = "kCookiePathKey";
@ -283,10 +282,9 @@ inline bool HttpSession::checkLiveFlvStream(const function<void()> &cb){
cb(); cb();
} }
//开始发送rtmp负载 //http-flv直播牺牲延时提升发送性能
//关闭tcp_nodelay ,优化性能 setSocketFlags();
SockUtil::setNoDelay(_sock->rawFD(),false);
(*this) << SocketFlags(kSockFlags);
try{ try{
start(getPoller(),rtmp_src); start(getPoller(),rtmp_src);
}catch (std::exception &ex){ }catch (std::exception &ex){
@ -657,10 +655,9 @@ inline void HttpSession::Handle_Req_GET(int64_t &content_len) {
} }
return false; return false;
}; };
//关闭tcp_nodelay ,优化性能
SockUtil::setNoDelay(_sock->rawFD(),false); //文件下载提升发送性能
//设置MSG_MORE优化性能 setSocketFlags();
(*this) << SocketFlags(kSockFlags);
onFlush(); onFlush();
_sock->setOnFlush(onFlush); _sock->setOnFlush(onFlush);
@ -950,6 +947,15 @@ inline void HttpSession::sendNotFound(bool bClose) {
sendResponse("404 Not Found", makeHttpHeader(bClose, notFound.size()), notFound); sendResponse("404 Not Found", makeHttpHeader(bClose, notFound.size()), notFound);
} }
void HttpSession::setSocketFlags(){
GET_CONFIG(bool,ultraLowDelay,General::kUltraLowDelay);
if(!ultraLowDelay) {
//推流模式下关闭TCP_NODELAY会增加推流端的延时但是服务器性能将提高
SockUtil::setNoDelay(_sock->rawFD(), false);
//播放模式下开启MSG_MORE会增加延时但是能提高发送性能
(*this) << SocketFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE);
}
}
void HttpSession::onWrite(const Buffer::Ptr &buffer) { void HttpSession::onWrite(const Buffer::Ptr &buffer) {
_ticker.resetTime(); _ticker.resetTime();

View File

@ -142,6 +142,9 @@ private:
* @return * @return
*/ */
inline string getClientUid(); inline string getClientUid();
//设置socket标志
void setSocketFlags();
private: private:
string _origin; string _origin;
Parser _parser; Parser _parser;

View File

@ -33,8 +33,6 @@ using namespace mediakit::Client;
namespace mediakit { namespace mediakit {
static int kSockFlags = SOCKET_DEFAULE_FLAGS | FLAG_MORE;
RtmpPusher::RtmpPusher(const EventPoller::Ptr &poller,const RtmpMediaSource::Ptr &src) : TcpClient(poller){ RtmpPusher::RtmpPusher(const EventPoller::Ptr &poller,const RtmpMediaSource::Ptr &src) : TcpClient(poller){
_pMediaSrc=src; _pMediaSrc=src;
} }
@ -229,10 +227,19 @@ inline void RtmpPusher::send_metaData(){
} }
}); });
onPublishResult(SockException(Err_success,"success")); onPublishResult(SockException(Err_success,"success"));
//提高发送性能 //提升发送性能
(*this) << SocketFlags(kSockFlags); setSocketFlags();
SockUtil::setNoDelay(_sock->rawFD(),false);
} }
void RtmpPusher::setSocketFlags(){
GET_CONFIG(bool,ultraLowDelay,General::kUltraLowDelay);
if(!ultraLowDelay) {
//提高发送性能
(*this) << SocketFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE);
SockUtil::setNoDelay(_sock->rawFD(), false);
}
}
void RtmpPusher::onCmd_result(AMFDecoder &dec){ void RtmpPusher::onCmd_result(AMFDecoder &dec){
auto iReqId = dec.load<int>(); auto iReqId = dec.load<int>();
lock_guard<recursive_mutex> lck(_mtxOnResultCB); lock_guard<recursive_mutex> lck(_mtxOnResultCB);

View File

@ -84,7 +84,7 @@ private:
inline void send_createStream(); inline void send_createStream();
inline void send_publish(); inline void send_publish();
inline void send_metaData(); inline void send_metaData();
void setSocketFlags();
private: private:
string _strApp; string _strApp;
string _strStream; string _strStream;

View File

@ -31,8 +31,6 @@
namespace mediakit { namespace mediakit {
static int kSockFlags = SOCKET_DEFAULE_FLAGS | FLAG_MORE;
RtmpSession::RtmpSession(const Socket::Ptr &pSock) : TcpSession(pSock) { RtmpSession::RtmpSession(const Socket::Ptr &pSock) : TcpSession(pSock) {
DebugP(this); DebugP(this);
GET_CONFIG(uint32_t,keep_alive_sec,Rtmp::kKeepAliveSecond); GET_CONFIG(uint32_t,keep_alive_sec,Rtmp::kKeepAliveSecond);
@ -171,6 +169,7 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) {
_pPublisherSrc->setListener(dynamic_pointer_cast<MediaSourceEvent>(shared_from_this())); _pPublisherSrc->setListener(dynamic_pointer_cast<MediaSourceEvent>(shared_from_this()));
//如果是rtmp推流客户端那么加大TCP接收缓存这样能提升接收性能 //如果是rtmp推流客户端那么加大TCP接收缓存这样能提升接收性能
_sock->setReadBuffer(std::make_shared<BufferRaw>(256 * 1024)); _sock->setReadBuffer(std::make_shared<BufferRaw>(256 * 1024));
setSocketFlags();
}; };
Broadcast::AuthInvoker invoker = [weakSelf,onRes,pToken](const string &err){ Broadcast::AuthInvoker invoker = [weakSelf,onRes,pToken](const string &err){
@ -272,7 +271,6 @@ void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr
_pRingReader = src->getRing()->attach(getPoller()); _pRingReader = src->getRing()->attach(getPoller());
weak_ptr<RtmpSession> weakSelf = dynamic_pointer_cast<RtmpSession>(shared_from_this()); weak_ptr<RtmpSession> weakSelf = dynamic_pointer_cast<RtmpSession>(shared_from_this());
SockUtil::setNoDelay(_sock->rawFD(), false);
_pRingReader->setReadCB([weakSelf](const RtmpPacket::Ptr &pkt) { _pRingReader->setReadCB([weakSelf](const RtmpPacket::Ptr &pkt) {
auto strongSelf = weakSelf.lock(); auto strongSelf = weakSelf.lock();
if (!strongSelf) { if (!strongSelf) {
@ -291,10 +289,8 @@ void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr
if (src->readerCount() == 1) { if (src->readerCount() == 1) {
src->seekTo(0); src->seekTo(0);
} }
//提高服务器发送性能
//提高发送性能 setSocketFlags();
(*this) << SocketFlags(kSockFlags);
SockUtil::setNoDelay(_sock->rawFD(),false);
} }
void RtmpSession::doPlayResponse(const string &err,const std::function<void(bool)> &cb){ void RtmpSession::doPlayResponse(const string &err,const std::function<void(bool)> &cb){
@ -502,4 +498,13 @@ void RtmpSession::onNoneReader(MediaSource &sender) {
MediaSourceEvent::onNoneReader(sender); MediaSourceEvent::onNoneReader(sender);
} }
void RtmpSession::setSocketFlags(){
GET_CONFIG(bool,ultraLowDelay,General::kUltraLowDelay);
if(!ultraLowDelay) {
//推流模式下关闭TCP_NODELAY会增加推流端的延时但是服务器性能将提高
SockUtil::setNoDelay(_sock->rawFD(), false);
//播放模式下开启MSG_MORE会增加延时但是能提高发送性能
(*this) << SocketFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE);
}
}
} /* namespace mediakit */ } /* namespace mediakit */

View File

@ -85,6 +85,7 @@ private:
bool close(MediaSource &sender,bool force) override ; bool close(MediaSource &sender,bool force) override ;
void onNoneReader(MediaSource &sender) override; void onNoneReader(MediaSource &sender) override;
void setSocketFlags();
private: private:
std::string _strTcUrl; std::string _strTcUrl;
MediaInfo _mediaInfo; MediaInfo _mediaInfo;

View File

@ -11,8 +11,6 @@ using namespace mediakit::Client;
namespace mediakit { namespace mediakit {
static int kSockFlags = SOCKET_DEFAULE_FLAGS | FLAG_MORE;
RtspPusher::RtspPusher(const EventPoller::Ptr &poller,const RtspMediaSource::Ptr &src) : TcpClient(poller){ RtspPusher::RtspPusher(const EventPoller::Ptr &poller,const RtspMediaSource::Ptr &src) : TcpClient(poller){
_pMediaSrc = src; _pMediaSrc = src;
} }
@ -392,13 +390,21 @@ void RtspPusher::sendRecord() {
},getPoller())); },getPoller()));
} }
onPublishResult(SockException(Err_success,"success")); onPublishResult(SockException(Err_success,"success"));
//提高发送性能 //提升发送性能
(*this) << SocketFlags(kSockFlags); setSocketFlags();
SockUtil::setNoDelay(_sock->rawFD(),false);
}; };
sendRtspRequest("RECORD",_strContentBase,{"Range","npt=0.000-"}); sendRtspRequest("RECORD",_strContentBase,{"Range","npt=0.000-"});
} }
void RtspPusher::setSocketFlags(){
GET_CONFIG(bool,ultraLowDelay,General::kUltraLowDelay);
if(!ultraLowDelay) {
//提高发送性能
(*this) << SocketFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE);
SockUtil::setNoDelay(_sock->rawFD(), false);
}
}
void RtspPusher::sendRtspRequest(const string &cmd, const string &url, const std::initializer_list<string> &header,const string &sdp ) { void RtspPusher::sendRtspRequest(const string &cmd, const string &url, const std::initializer_list<string> &header,const string &sdp ) {
string key; string key;
StrCaseMap header_map; StrCaseMap header_map;

View File

@ -67,6 +67,7 @@ private:
void sendRtspRequest(const string &cmd, const string &url ,const std::initializer_list<string> &header,const string &sdp = ""); void sendRtspRequest(const string &cmd, const string &url ,const std::initializer_list<string> &header,const string &sdp = "");
void createUdpSockIfNecessary(int track_idx); void createUdpSockIfNecessary(int track_idx);
void setSocketFlags();
private: private:
//rtsp鉴权相关 //rtsp鉴权相关
string _rtspMd5Nonce; string _rtspMd5Nonce;

View File

@ -71,7 +71,6 @@ namespace mediakit {
static unordered_map<string, weak_ptr<RtspSession> > g_mapGetter; static unordered_map<string, weak_ptr<RtspSession> > g_mapGetter;
//对g_mapGetter上锁保护 //对g_mapGetter上锁保护
static recursive_mutex g_mtxGetter; static recursive_mutex g_mtxGetter;
static int kSockFlags = SOCKET_DEFAULE_FLAGS | FLAG_MORE;
RtspSession::RtspSession(const Socket::Ptr &pSock) : TcpSession(pSock) { RtspSession::RtspSession(const Socket::Ptr &pSock) : TcpSession(pSock) {
DebugP(this); DebugP(this);
@ -277,12 +276,11 @@ void RtspSession::handleReq_RECORD(const Parser &parser){
rtp_info.pop_back(); rtp_info.pop_back();
sendRtspResponse("200 OK", {"RTP-Info",rtp_info}); sendRtspResponse("200 OK", {"RTP-Info",rtp_info});
SockUtil::setNoDelay(_sock->rawFD(),false);
if(_rtpType == Rtsp::RTP_TCP){ if(_rtpType == Rtsp::RTP_TCP){
//如果是rtsp推流服务器并且是TCP推流那么加大TCP接收缓存这样能提升接收性能 //如果是rtsp推流服务器并且是TCP推流那么加大TCP接收缓存这样能提升接收性能
_sock->setReadBuffer(std::make_shared<BufferRaw>(256 * 1024)); _sock->setReadBuffer(std::make_shared<BufferRaw>(256 * 1024));
setSocketFlags();
} }
(*this) << SocketFlags(kSockFlags);
}; };
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this()); weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
@ -780,10 +778,7 @@ void RtspSession::handleReq_Play(const Parser &parser) {
}); });
_enableSendRtp = true; _enableSendRtp = true;
setSocketFlags();
//提高发送性能
SockUtil::setNoDelay(_sock->rawFD(),false);
(*this) << SocketFlags(kSockFlags);
if (!_pRtpReader && _rtpType != Rtsp::RTP_MULTICAST) { if (!_pRtpReader && _rtpType != Rtsp::RTP_MULTICAST) {
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this()); weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
@ -1230,6 +1225,16 @@ void RtspSession::sendSenderReport(bool overTcp,int iTrackIndex) {
} }
} }
void RtspSession::setSocketFlags(){
GET_CONFIG(bool,ultraLowDelay,General::kUltraLowDelay);
if(!ultraLowDelay) {
//推流模式下关闭TCP_NODELAY会增加推流端的延时但是服务器性能将提高
SockUtil::setNoDelay(_sock->rawFD(), false);
//播放模式下开启MSG_MORE会增加延时但是能提高发送性能
(*this) << SocketFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE);
}
}
} }
/* namespace mediakit */ /* namespace mediakit */

View File

@ -183,6 +183,8 @@ private:
bool sendRtspResponse(const string &res_code,const StrCaseMap &header = StrCaseMap(), const string &sdp = "",const char *protocol = "RTSP/1.0"); bool sendRtspResponse(const string &res_code,const StrCaseMap &header = StrCaseMap(), const string &sdp = "",const char *protocol = "RTSP/1.0");
//服务器发送rtcp //服务器发送rtcp
void sendSenderReport(bool overTcp,int iTrackIndex); void sendSenderReport(bool overTcp,int iTrackIndex);
//设置socket标志
void setSocketFlags();
private: private:
//用于判断客户端是否超时 //用于判断客户端是否超时
Ticker _ticker; Ticker _ticker;