From 458a92521fcff218bd02de718b89e459107c4d6b Mon Sep 17 00:00:00 2001 From: xiongziliang <771730766@qq.com> Date: Tue, 28 May 2019 18:46:52 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84=E6=9C=8D=E5=8A=A1=E5=99=A8?= =?UTF-8?q?=E6=97=A5=E5=BF=97=E6=89=93=E5=8D=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Http/HttpSession.cpp | 9 ++--- src/Rtmp/RtmpSession.cpp | 46 +++++++++++++---------- src/Rtmp/RtmpSession.h | 2 +- src/Rtsp/RtspSession.cpp | 80 +++++++++++++++++++++------------------- 4 files changed, 73 insertions(+), 64 deletions(-) diff --git a/src/Http/HttpSession.cpp b/src/Http/HttpSession.cpp index b0b481b0..29b85dc7 100644 --- a/src/Http/HttpSession.cpp +++ b/src/Http/HttpSession.cpp @@ -124,7 +124,7 @@ int64_t HttpSession::onRecvHeader(const char *header,uint64_t len) { string cmd = _parser.Method(); auto it = g_mapCmdIndex.find(cmd); if (it == g_mapCmdIndex.end()) { - WarnL << cmd; + WarnP(this) << cmd; sendResponse("403 Forbidden", makeHttpHeader(true), ""); shutdown(); return 0; @@ -156,7 +156,7 @@ void HttpSession::onRecv(const Buffer::Ptr &pBuf) { } void HttpSession::onError(const SockException& err) { - //WarnL << err.what(); +// WarnP(this) << err.what(); GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold); if(_ui64TotalBytes > iFlowThreshold * 1024){ @@ -174,7 +174,7 @@ void HttpSession::onManager() { if(_ticker.elapsedTime() > keepAliveSec * 1000){ //1分钟超时 - WarnL<<"HttpSession timeouted!"; +// WarnP(this) <<"HttpSession timeouted!"; shutdown(); } } @@ -441,7 +441,6 @@ inline bool HttpSession::Handle_Req_GET(int64_t &content_len) { if (iRead < iReq || !*piLeft) { //文件读完 - //InfoL << "send complete!" << iRead << " " << iReq << " " << *piLeft; if(iRead>0) { sendBuf->setSize(iRead); strongSelf->send(sendBuf); @@ -456,7 +455,6 @@ inline bool HttpSession::Handle_Req_GET(int64_t &content_len) { int iSent = strongSelf->send(sendBuf); if(iSent == -1) { //套机制销毁 - //InfoL << "send error"; return false; } if(strongSelf->isSocketBusy()){ @@ -583,7 +581,6 @@ inline void HttpSession::sendResponse(const char* pcStatus, const KeyValue& head } printer << "\r\n" << strContent; auto strSend = printer << endl; - //DebugL << strSend; send(strSend); _ticker.resetTime(); } diff --git a/src/Rtmp/RtmpSession.cpp b/src/Rtmp/RtmpSession.cpp index 128ffb58..8b7a7aa9 100644 --- a/src/Rtmp/RtmpSession.cpp +++ b/src/Rtmp/RtmpSession.cpp @@ -34,7 +34,7 @@ namespace mediakit { static int kSockFlags = SOCKET_DEFAULE_FLAGS | FLAG_MORE; RtmpSession::RtmpSession(const Socket::Ptr &pSock) : TcpSession(pSock) { - DebugL << get_peer_ip(); + DebugP(this); //设置15秒发送超时时间 pSock->setSendTimeOutSecond(15); //起始接收buffer缓存设置为4K,节省内存 @@ -42,11 +42,11 @@ RtmpSession::RtmpSession(const Socket::Ptr &pSock) : TcpSession(pSock) { } RtmpSession::~RtmpSession() { - DebugL << get_peer_ip(); + DebugP(this); } void RtmpSession::onError(const SockException& err) { - DebugL << err.what(); + DebugP(this) << err.what(); //流量统计事件广播 GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold); @@ -65,14 +65,14 @@ void RtmpSession::onError(const SockException& err) { void RtmpSession::onManager() { if (_ticker.createdTime() > 15 * 1000) { if (!_pRingReader && !_pPublisherSrc) { - WarnL << "非法链接:" << get_peer_ip(); + WarnP(this) << "非法链接"; shutdown(); } } if (_pPublisherSrc) { //publisher if (_ticker.elapsedTime() > 15 * 1000) { - WarnL << "数据接收超时:" << get_peer_ip(); + WarnP(this) << "数据接收超时"; shutdown(); } } @@ -84,7 +84,7 @@ void RtmpSession::onRecv(const Buffer::Ptr &pBuf) { _ui64TotalBytes += pBuf->size(); onParseRtmp(pBuf->data(), pBuf->size()); } catch (exception &e) { - WarnL << e.what(); + WarnP(this) << e.what(); shutdown(); } } @@ -134,8 +134,12 @@ void RtmpSession::onCmd_createStream(AMFDecoder &dec) { void RtmpSession::onCmd_publish(AMFDecoder &dec) { std::shared_ptr pTicker(new Ticker); - std::shared_ptr pToken(new onceToken(nullptr,[pTicker](){ - DebugL << "publish 回复时间:" << pTicker->elapsedTime() << "ms"; + weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); + std::shared_ptr pToken(new onceToken(nullptr,[pTicker,weakSelf](){ + auto strongSelf = weakSelf.lock(); + if(strongSelf){ + DebugP(strongSelf.get()) << "publish 回复时间:" << pTicker->elapsedTime() << "ms"; + } })); dec.load();/* NULL */ _mediaInfo.parse(_strTcUrl + "/" + dec.load()); @@ -155,7 +159,7 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) { status.set("clientid", "0"); sendReply("onStatus", nullptr, status); if (!ok) { - WarnL << "onPublish:" + WarnP(this) << "onPublish:" << (authSuccess ? "Already publishing:" : err.data()) << " " << _mediaInfo._vhost << " " << _mediaInfo._app << " " @@ -169,7 +173,6 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) { _sock->setReadBuffer(std::make_shared(256 * 1024)); }; - weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); Broadcast::AuthInvoker invoker = [weakSelf,onRes,pToken](const string &err){ auto strongSelf = weakSelf.lock(); if(!strongSelf){ @@ -219,7 +222,7 @@ void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr status.set("clientid", "0"); sendReply("onStatus", nullptr, status); if (!ok) { - WarnL << (authSuccess ? "No such stream:" : err.data()) << " " + WarnP(this) << (authSuccess ? "No such stream:" : err.data()) << " " << _mediaInfo._vhost << " " << _mediaInfo._app << " " << _mediaInfo._streamid @@ -264,7 +267,7 @@ void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr sendResponse(MSG_DATA, invoke.data()); src->getConfigFrame([&](const RtmpPacket::Ptr &pkt) { - //DebugL<<"send initial frame"; + //DebugP(this)<<"send initial frame"; onSendMedia(pkt); }); @@ -318,10 +321,13 @@ void RtmpSession::doPlayResponse(const string &err,const std::function pTicker(new Ticker); - std::shared_ptr pToken(new onceToken(nullptr,[pTicker](){ - DebugL << "play 回复时间:" << pTicker->elapsedTime() << "ms"; - })); weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); + std::shared_ptr pToken(new onceToken(nullptr,[pTicker,weakSelf](){ + auto strongSelf = weakSelf.lock(); + if(strongSelf) { + DebugP(strongSelf.get()) << "play 回复时间:" << pTicker->elapsedTime() << "ms"; + } + })); Broadcast::AuthInvoker invoker = [weakSelf,pToken](const string &err){ auto strongSelf = weakSelf.lock(); if(!strongSelf){ @@ -353,7 +359,7 @@ void RtmpSession::onCmd_play(AMFDecoder &dec) { void RtmpSession::onCmd_pause(AMFDecoder &dec) { dec.load();/* NULL */ bool paused = dec.load(); - TraceL << paused; + TraceP(this) << paused; AMFValue status(AMF_OBJECT); status.set("level", "status"); status.set("code", paused ? "NetStream.Pause.Notify" : "NetStream.Unpause.Notify"); @@ -406,7 +412,7 @@ void RtmpSession::onProcessCmd(AMFDecoder &dec) { std::string method = dec.load(); auto it = g_mapCmd.find(method); if (it == g_mapCmd.end()) { - TraceL << "can not support cmd:" << method; + TraceP(this) << "can not support cmd:" << method; return; } _dNowReqID = dec.load(); @@ -427,7 +433,7 @@ void RtmpSession::onRtmpChunk(RtmpPacket &chunkData) { case MSG_DATA3: { AMFDecoder dec(chunkData.strBuf, chunkData.typeId == MSG_CMD3 ? 1 : 0); std::string type = dec.load(); - TraceL << "notify:" << type; + TraceP(this) << "notify:" << type; if (type == "@setDataFrame") { setMetaData(dec); } @@ -446,7 +452,7 @@ void RtmpSession::onRtmpChunk(RtmpPacket &chunkData) { } break; default: - WarnL << "unhandled message:" << (int) chunkData.typeId << hexdump(chunkData.strBuf.data(), chunkData.strBuf.size()); + WarnP(this) << "unhandled message:" << (int) chunkData.typeId << hexdump(chunkData.strBuf.data(), chunkData.strBuf.size()); break; } } @@ -454,7 +460,7 @@ void RtmpSession::onRtmpChunk(RtmpPacket &chunkData) { void RtmpSession::onCmd_seek(AMFDecoder &dec) { dec.load();/* NULL */ auto milliSeconds = dec.load().as_number(); - InfoL << "rtmp seekTo(ms):" << milliSeconds; + InfoP(this) << "rtmp seekTo(ms):" << milliSeconds; auto stongSrc = _pPlayerSrc.lock(); if (stongSrc) { stongSrc->seekTo(milliSeconds); diff --git a/src/Rtmp/RtmpSession.h b/src/Rtmp/RtmpSession.h index 6f9495f2..e04ae727 100644 --- a/src/Rtmp/RtmpSession.h +++ b/src/Rtmp/RtmpSession.h @@ -85,7 +85,7 @@ private: if(!force && _pPublisherSrc->readerCount() != 0){ return false; } - InfoL << "kick out:" << _mediaInfo._vhost << " " << _mediaInfo._app << " " << _mediaInfo._streamid; + InfoP(this) << "kick out:" << _mediaInfo._vhost << " " << _mediaInfo._app << " " << _mediaInfo._streamid; safeShutdown(); return true; } diff --git a/src/Rtsp/RtspSession.cpp b/src/Rtsp/RtspSession.cpp index d0a1a43d..578976e6 100644 --- a/src/Rtsp/RtspSession.cpp +++ b/src/Rtsp/RtspSession.cpp @@ -76,15 +76,15 @@ RtspSession::RtspSession(const Socket::Ptr &pSock) : TcpSession(pSock) { pSock->setSendTimeOutSecond(15); //起始接收buffer缓存设置为4K,节省内存 pSock->setReadBuffer(std::make_shared(4 * 1024)); - DebugL << get_peer_ip(); + DebugP(this); } RtspSession::~RtspSession() { - DebugL << get_peer_ip(); + DebugP(this); } void RtspSession::onError(const SockException& err) { - TraceL << err.getErrCode() << " " << err.what(); + TraceP(this) << err.getErrCode() << " " << err.what(); if (_rtpType == Rtsp::RTP_MULTICAST) { //取消UDP端口监听 UDPServer::Instance().stopListenPeer(get_peer_ip().data(), this); @@ -113,7 +113,7 @@ void RtspSession::onError(const SockException& err) { void RtspSession::onManager() { if (_ticker.createdTime() > 15 * 1000) { if (_strSession.size() == 0) { - WarnL << "非法链接:" << get_peer_ip(); + WarnP(this) << "非法链接"; shutdown(); return; } @@ -122,7 +122,7 @@ void RtspSession::onManager() { if ((_rtpType == Rtsp::RTP_UDP || _pushSrc ) && _ticker.elapsedTime() > 15 * 1000) { //如果是推流端或者rtp over udp类型的播放端,那么就做超时检测 - WarnL << "RTSP会话超时:" << get_peer_ip(); + WarnP(this) << "RTSP会话超时"; shutdown(); return; } @@ -135,7 +135,7 @@ void RtspSession::onRecv(const Buffer::Ptr &pBuf) { //http poster的请求数据转发给http getter处理 _onRecv(pBuf); } else { -// TraceL << pBuf->size() << "\r\n" << pBuf->data(); +// TraceP(this) << pBuf->size() << "\r\n" << pBuf->data(); input(pBuf->data(),pBuf->size()); } } @@ -173,7 +173,7 @@ void RtspSession::onWholeRtspPacket(Parser &parser) { } } else{ shutdown(); - WarnL << "不支持的rtsp命令:" << strCmd; + WarnP(this) << "不支持的rtsp命令:" << strCmd; } } @@ -223,7 +223,7 @@ bool RtspSession::handleReq_ANNOUNCE(const Parser &parser) { false)); if(src){ sendRtspResponse("406 Not Acceptable", {"Content-Type", "text/plain"}, "Already publishing."); - WarnL << "ANNOUNCE:" + WarnP(this) << "ANNOUNCE:" << "Already publishing:" << _mediaInfo._vhost << " " << _mediaInfo._app << " " @@ -312,7 +312,7 @@ bool RtspSession::handleReq_Describe(const Parser &parser) { auto rtsp_src = dynamic_pointer_cast(src); if (!rtsp_src) { //未找到相应的MediaSource - WarnL << "No such stream:" << strongSelf->_mediaInfo._vhost << " " << strongSelf->_mediaInfo._app << " " << strongSelf->_mediaInfo._streamid; + WarnP(strongSelf.get()) << "No such stream:" << strongSelf->_mediaInfo._vhost << " " << strongSelf->_mediaInfo._app << " " << strongSelf->_mediaInfo._streamid; strongSelf->send_StreamNotFound(); strongSelf->shutdown(); return; @@ -436,7 +436,7 @@ void RtspSession::onAuthBasic(const weak_ptr &weakSelf,const string //此时必须提供明文密码 if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth,strongSelf->_mediaInfo,realm,user, true,invoker,*strongSelf)){ //表明该流需要认证却没监听请求密码事件,这一般是大意的程序所为,警告之 - WarnL << "请监听kBroadcastOnRtspAuth事件!"; + WarnP(strongSelf.get()) << "请监听kBroadcastOnRtspAuth事件!"; //但是我们还是忽略认证以便完成播放 //我们输入的密码是明文 invoker(false,pwd); @@ -449,7 +449,7 @@ void RtspSession::onAuthDigest(const weak_ptr &weakSelf,const strin return; } - DebugL << strMd5; + DebugP(strongSelf.get()) << strMd5; auto mapTmp = Parser::parseArgs(strMd5,",","="); decltype(mapTmp) map; for(auto &pr : mapTmp){ @@ -457,14 +457,14 @@ void RtspSession::onAuthDigest(const weak_ptr &weakSelf,const strin } //check realm if(realm != map["realm"]){ - TraceL << "realm not mached:" << realm << "," << map["realm"]; + TraceP(strongSelf.get()) << "realm not mached:" << realm << "," << map["realm"]; onAuthFailed(weakSelf,realm); return ; } //check nonce auto nonce = map["nonce"]; if(strongSelf->_strNonce != nonce){ - TraceL << "nonce not mached:" << nonce << "," << strongSelf->_strNonce; + TraceP(strongSelf.get()) << "nonce not mached:" << nonce << "," << strongSelf->_strNonce; onAuthFailed(weakSelf,realm); return ; } @@ -473,16 +473,20 @@ void RtspSession::onAuthDigest(const weak_ptr &weakSelf,const strin auto uri = map["uri"]; auto response = map["response"]; if(username.empty() || uri.empty() || response.empty()){ - TraceL << "username/uri/response empty:" << username << "," << uri << "," << response; + TraceP(strongSelf.get()) << "username/uri/response empty:" << username << "," << uri << "," << response; onAuthFailed(weakSelf,realm); return ; } auto realInvoker = [weakSelf,realm,nonce,uri,username,response](bool ignoreAuth,bool encrypted,const string &good_pwd){ + auto strongSelf = weakSelf.lock(); + if(!strongSelf){ + return; + } if(ignoreAuth){ //忽略认证 onAuthSuccess(weakSelf); - TraceL << "auth ignored"; + TraceP(strongSelf.get()) << "auth ignored"; return; } /* @@ -503,11 +507,11 @@ void RtspSession::onAuthDigest(const weak_ptr &weakSelf,const strin if(strcasecmp(good_response.data(),response.data()) == 0){ //认证成功!md5不区分大小写 onAuthSuccess(weakSelf); - TraceL << "onAuthSuccess"; + TraceP(strongSelf.get()) << "onAuthSuccess"; }else{ //认证失败! onAuthFailed(weakSelf,realm); - TraceL << "onAuthFailed"; + TraceP(strongSelf.get()) << "onAuthFailed"; } }; onAuth invoker = [realInvoker](bool encrypted,const string &good_pwd){ @@ -517,7 +521,7 @@ void RtspSession::onAuthDigest(const weak_ptr &weakSelf,const strin //此时可以提供明文或md5加密的密码 if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth,strongSelf->_mediaInfo,realm,username, false,invoker,*strongSelf)){ //表明该流需要认证却没监听请求密码事件,这一般是大意的程序所为,警告之 - WarnL << "请监听kBroadcastOnRtspAuth事件!"; + WarnP(strongSelf.get()) << "请监听kBroadcastOnRtspAuth事件!"; //但是我们还是忽略认证以便完成播放 realInvoker(true,true,""); } @@ -602,14 +606,14 @@ bool RtspSession::handleReq_Setup(const Parser &parser) { auto pSockRtp = std::make_shared(_sock->getPoller()); if (!pSockRtp->bindUdpSock(0,get_local_ip().data())) { //分配端口失败 - WarnL << "分配rtp端口失败"; + WarnP(this) << "分配rtp端口失败"; send_NotAcceptable(); return false; } auto pSockRtcp = std::make_shared(_sock->getPoller()); if (!pSockRtcp->bindUdpSock(pSockRtp->get_local_port() + 1,get_local_ip().data())) { //分配端口失败 - WarnL << "分配rtcp端口失败"; + WarnP(this) << "分配rtcp端口失败"; send_NotAcceptable(); return false; } @@ -637,7 +641,7 @@ bool RtspSession::handleReq_Setup(const Parser &parser) { //尝试获取客户端nat映射地址 startListenPeerUdpData(trackIdx); - //InfoL << "分配端口:" << srv_port; + //InfoP(this) << "分配端口:" << srv_port; sendRtspResponse("200 OK", {"Transport",StrPrinter << "RTP/AVP/UDP;unicast;" @@ -669,7 +673,7 @@ bool RtspSession::handleReq_Setup(const Parser &parser) { auto pSockRtcp = UDPServer::Instance().getSock(get_local_ip().data(),2*trackIdx + 1,iSrvPort + 1); if (!pSockRtcp) { //分配端口失败 - WarnL << "分配rtcp端口失败"; + WarnP(this) << "分配rtcp端口失败"; send_NotAcceptable(); return false; } @@ -724,7 +728,7 @@ bool RtspSession::handleReq_Play(const Parser &parser) { strStart = "0"; } auto iStartTime = 1000 * atof(strStart.data()); - InfoL << "rtsp seekTo(ms):" << iStartTime; + InfoP(this) << "rtsp seekTo(ms):" << iStartTime; useBuf = !pMediaSrc->seekTo(iStartTime); }else if(pMediaSrc->readerCount() == 0){ //第一个消费者 @@ -824,7 +828,7 @@ bool RtspSession::handleReq_Pause(const Parser &parser) { bool RtspSession::handleReq_Teardown(const Parser &parser) { sendRtspResponse("200 OK"); - TraceL << "播放器断开连接!"; + TraceP(this) << "播放器断开连接!"; return true; } @@ -850,7 +854,7 @@ bool RtspSession::handleReq_Post(const Parser &parser) { //Poster 找到 Getter auto it = g_mapGetter.find(sessioncookie); if (it == g_mapGetter.end()) { - WarnL << "Http Poster未找到Http Getter"; + WarnP(this) << "Http Poster未找到Http Getter"; return false; } @@ -863,7 +867,7 @@ bool RtspSession::handleReq_Post(const Parser &parser) { _onRecv = [this,httpGetterWeak](const Buffer::Ptr &pBuf){ auto httpGetterStrong = httpGetterWeak.lock(); if(!httpGetterStrong){ - WarnL << "Http Getter已经释放"; + WarnP(this) << "Http Getter已经释放"; shutdown(); return; } @@ -886,7 +890,7 @@ bool RtspSession::handleReq_Post(const Parser &parser) { } bool RtspSession::handleReq_SET_PARAMETER(const Parser &parser) { - //TraceL< weakSelf = dynamic_pointer_cast(shared_from_this()); auto srcIP = inet_addr(get_peer_ip().data()); auto onUdpData = [weakSelf,srcIP](const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr,int intervaled){ - if (((struct sockaddr_in *) pPeerAddr)->sin_addr.s_addr != srcIP) { - WarnL << ((intervaled % 2 == 0) ? "收到其他地址的rtp数据:" : "收到其他地址的rtcp数据:") - << inet_ntoa(((struct sockaddr_in *) pPeerAddr)->sin_addr); - return true; - } auto strongSelf=weakSelf.lock(); if(!strongSelf) { return false; } + + if (((struct sockaddr_in *) pPeerAddr)->sin_addr.s_addr != srcIP) { + WarnP(strongSelf.get()) << ((intervaled % 2 == 0) ? "收到其他地址的rtp数据:" : "收到其他地址的rtcp数据:") + << inet_ntoa(((struct sockaddr_in *) pPeerAddr)->sin_addr); + return true; + } + struct sockaddr addr=*pPeerAddr; strongSelf->async([weakSelf,pBuf,addr,intervaled]() { auto strongSelf=weakSelf.lock(); @@ -958,7 +964,7 @@ inline void RtspSession::startListenPeerUdpData(int trackIdx) { case Rtsp::RTP_UDP:{ auto setEvent = [&](Socket::Ptr &sock,int intervaled){ if(!sock){ - WarnL << "udp端口为空:" << intervaled; + WarnP(this) << "udp端口为空:" << intervaled; return; } sock->setOnRead([onUdpData,intervaled](const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr){ @@ -1012,13 +1018,13 @@ bool RtspSession::sendRtspResponse(const string &res_code, if(!sdp.empty()){ printer << sdp; } -// DebugL << printer; +// DebugP(this) << printer; return send(std::make_shared(printer)) > 0 ; } int RtspSession::send(const Buffer::Ptr &pkt){ // if(!_enableSendRtp){ -// DebugL << pkt->data(); +// DebugP(this) << pkt->data(); // } _ui64TotalBytes += pkt->size(); return TcpSession::send(pkt); @@ -1083,14 +1089,14 @@ bool RtspSession::close(bool force) { if(!force && _pushSrc->readerCount() != 0){ return false; } - InfoL << "kick out:" << _mediaInfo._vhost << " " << _mediaInfo._app << " " << _mediaInfo._streamid; + InfoP(this) << "kick out:" << _mediaInfo._vhost << " " << _mediaInfo._app << " " << _mediaInfo._streamid; safeShutdown(); return true; } inline void RtspSession::sendRtpPacket(const RtpPacket::Ptr & pkt) { - //InfoL<<(int)pkt.Interleaved; + //InfoP(this) <<(int)pkt.Interleaved; switch (_rtpType) { case Rtsp::RTP_TCP: { BufferRtp::Ptr buffer(new BufferRtp(pkt));