diff --git a/3rdpart/ZLToolKit b/3rdpart/ZLToolKit index 0d9ed0a7..7cfb157c 160000 --- a/3rdpart/ZLToolKit +++ b/3rdpart/ZLToolKit @@ -1 +1 @@ -Subproject commit 0d9ed0a73e11193c880b6a7f3d923e4e1e4e65a2 +Subproject commit 7cfb157c63ce33c2e14461c0b9fe444318bfd217 diff --git a/src/Rtsp/RtspSession.cpp b/src/Rtsp/RtspSession.cpp index 9a56edbf..a7556eb6 100644 --- a/src/Rtsp/RtspSession.cpp +++ b/src/Rtsp/RtspSession.cpp @@ -191,12 +191,20 @@ void RtspSession::onRtpPacket(const char *data, uint64_t len) { uint8_t interleaved = data[1]; if(interleaved %2 == 0){ trackIdx = getTrackIndexByInterleaved(interleaved); - } - if (trackIdx != -1) { - handleOneRtp(trackIdx,_aTrackInfo[trackIdx],(unsigned char *)data + 4, len - 4); + if (trackIdx != -1) { + handleOneRtp(trackIdx,_aTrackInfo[trackIdx],(unsigned char *)data + 4, len - 4); + } + }else{ + trackIdx = getTrackIndexByInterleaved(interleaved - 1); + if (trackIdx != -1) { + onRecvRtcp(trackIdx,_aTrackInfo[trackIdx],(unsigned char *)data + 4, len - 4); + } } } +void RtspSession::onRecvRtcp(int iTrackidx,SdpTrack::Ptr &track, unsigned char *pucData, unsigned int uiLen){ + +} int64_t RtspSession::getContentLength(Parser &parser) { if(parser.Method() == "POST"){ //http post请求的content数据部分是base64编码后的rtsp请求信令包 @@ -998,14 +1006,13 @@ inline void RtspSession::sendRtpPacket(const RtpPacket::Ptr & pkt) { void RtspSession::onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx) { _pushSrc->onWrite(rtppt, false); } -inline void RtspSession::onRcvPeerUdpData(int iTrackIdx, const Buffer::Ptr &pBuf, const struct sockaddr& addr) { +inline void RtspSession::onRcvPeerUdpData(int intervaled, const Buffer::Ptr &pBuf, const struct sockaddr& addr) { //这是rtcp心跳包,说明播放器还存活 _ticker.resetTime(); - if(iTrackIdx % 2 == 0){ - + if(intervaled % 2 == 0){ if(_pushSrc){ - handleOneRtp(iTrackIdx / 2,_aTrackInfo[iTrackIdx / 2],( unsigned char *)pBuf->data(),pBuf->size()); + handleOneRtp(intervaled / 2,_aTrackInfo[intervaled / 2],( unsigned char *)pBuf->data(),pBuf->size()); } //这是rtp探测包 @@ -1017,8 +1024,8 @@ inline void RtspSession::onRcvPeerUdpData(int iTrackIdx, const Buffer::Ptr &pBuf return; } //设置真实的客户端nat映射端口号 - _apRtpSock[iTrackIdx / 2]->setSendPeerAddr(&addr); - _abGotPeerUdp[iTrackIdx / 2] = true; + _apRtpSock[intervaled / 2]->setSendPeerAddr(&addr); + _abGotPeerUdp[intervaled / 2] = true; _bGotAllPeerUdp = true;//先假设获取到完整的rtp探测包 for (unsigned int i = 0; i < _aTrackInfo.size(); i++) { if (!_abGotPeerUdp[i]) { @@ -1028,25 +1035,28 @@ inline void RtspSession::onRcvPeerUdpData(int iTrackIdx, const Buffer::Ptr &pBuf } } } - } + }else{ + //rtcp包 + onRecvRtcp((intervaled - 1) / 2,_aTrackInfo[(intervaled - 1) / 2],( unsigned char *)pBuf->data(),pBuf->size()); + } } inline void RtspSession::startListenPeerUdpData(int trackIdx) { weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - auto onUdpData = [weakSelf](const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr,int iTrackIdx){ + auto onUdpData = [weakSelf](const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr,int intervaled){ auto strongSelf=weakSelf.lock(); if(!strongSelf) { return false; } struct sockaddr addr=*pPeerAddr; - strongSelf->async([weakSelf,pBuf,addr,iTrackIdx]() { + strongSelf->async([weakSelf,pBuf,addr,intervaled]() { auto strongSelf=weakSelf.lock(); if(!strongSelf) { return; } - strongSelf->onRcvPeerUdpData(iTrackIdx,pBuf,addr); + strongSelf->onRcvPeerUdpData(intervaled,pBuf,addr); }); return true; }; @@ -1055,19 +1065,19 @@ inline void RtspSession::startListenPeerUdpData(int trackIdx) { case Rtsp::RTP_MULTICAST:{ //组播使用的共享rtcp端口 UDPServer::Instance().listenPeer(get_peer_ip().data(), this, [onUdpData]( - int iTrackIdx, const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr) { - return onUdpData(pBuf,pPeerAddr,iTrackIdx); + int intervaled, const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr) { + return onUdpData(pBuf,pPeerAddr,intervaled); }); } break; case Rtsp::RTP_UDP:{ - auto setEvent = [&](Socket::Ptr &sock,int iTrackIdx){ + auto setEvent = [&](Socket::Ptr &sock,int intervaled){ if(!sock){ - WarnL << "udp端口为空:" << iTrackIdx; + WarnL << "udp端口为空:" << intervaled; return; } - sock->setOnRead([onUdpData,iTrackIdx](const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr){ - onUdpData(pBuf,pPeerAddr,iTrackIdx); + sock->setOnRead([onUdpData,intervaled](const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr){ + onUdpData(pBuf,pPeerAddr,intervaled); }); }; setEvent(_apRtpSock[trackIdx], 2*trackIdx ); diff --git a/src/Rtsp/RtspSession.h b/src/Rtsp/RtspSession.h index 5b2a7fa6..84b0ee4a 100644 --- a/src/Rtsp/RtspSession.h +++ b/src/Rtsp/RtspSession.h @@ -108,6 +108,16 @@ protected: //TcpSession override int send(const Buffer::Ptr &pkt) override; + + + /** + * 收到RTCP包回调 + * @param iTrackidx + * @param track + * @param pucData + * @param uiLen + */ + void onRecvRtcp(int iTrackidx,SdpTrack::Ptr &track, unsigned char *pucData, unsigned int uiLen); private: bool handleReq_Options(const Parser &parser); //处理options方法 bool handleReq_Describe(const Parser &parser); //处理describe方法 @@ -133,7 +143,7 @@ private: inline int getTrackIndexByControlSuffix(const string &controlSuffix); inline int getTrackIndexByInterleaved(int interleaved); - inline void onRcvPeerUdpData(int iTrackIdx, const Buffer::Ptr &pBuf, const struct sockaddr &addr); + inline void onRcvPeerUdpData(int intervaled, const Buffer::Ptr &pBuf, const struct sockaddr &addr); inline void startListenPeerUdpData(int iTrackIdx); //认证相关 diff --git a/src/Rtsp/UDPServer.cpp b/src/Rtsp/UDPServer.cpp index ec7454d3..8db3bf4c 100644 --- a/src/Rtsp/UDPServer.cpp +++ b/src/Rtsp/UDPServer.cpp @@ -41,9 +41,9 @@ UDPServer::~UDPServer() { InfoL; } -Socket::Ptr UDPServer::getSock(const char* strLocalIp, int iTrackIndex,uint16_t iLocalPort) { +Socket::Ptr UDPServer::getSock(const char* strLocalIp, int intervaled,uint16_t iLocalPort) { lock_guard lck(_mtxUpdSock); - string strKey = StrPrinter << strLocalIp << ":" << iTrackIndex << endl; + string strKey = StrPrinter << strLocalIp << ":" << intervaled << endl; auto it = _mapUpdSock.find(strKey); if (it == _mapUpdSock.end()) { Socket::Ptr pSock(new Socket()); @@ -53,10 +53,10 @@ Socket::Ptr UDPServer::getSock(const char* strLocalIp, int iTrackIndex,uint16_t return nullptr; } - pSock->setOnRead(bind(&UDPServer::onRcvData, this, iTrackIndex, placeholders::_1,placeholders::_2)); + pSock->setOnRead(bind(&UDPServer::onRcvData, this, intervaled, placeholders::_1,placeholders::_2)); pSock->setOnErr(bind(&UDPServer::onErr, this, strKey, placeholders::_1)); _mapUpdSock[strKey] = pSock; - DebugL << strLocalIp << " " << pSock->get_local_port() << " " << iTrackIndex; + DebugL << strLocalIp << " " << pSock->get_local_port() << " " << intervaled; return pSock; } return it->second; @@ -89,7 +89,7 @@ void UDPServer::onErr(const string& strKey, const SockException& err) { lock_guard lck(_mtxUpdSock); _mapUpdSock.erase(strKey); } -void UDPServer::onRcvData(int iTrackIndex, const Buffer::Ptr &pBuf, struct sockaddr* pPeerAddr) { +void UDPServer::onRcvData(int intervaled, const Buffer::Ptr &pBuf, struct sockaddr* pPeerAddr) { //TraceL << trackIndex; struct sockaddr_in *in = (struct sockaddr_in *) pPeerAddr; string peerIp = inet_ntoa(in->sin_addr); @@ -101,7 +101,7 @@ void UDPServer::onRcvData(int iTrackIndex, const Buffer::Ptr &pBuf, struct socka auto &mapRef = it0->second; for (auto it1 = mapRef.begin(); it1 != mapRef.end(); ++it1) { onRecvData &funRef = it1->second; - if (!funRef(iTrackIndex, pBuf, pPeerAddr)) { + if (!funRef(intervaled, pBuf, pPeerAddr)) { it1 = mapRef.erase(it1); } } diff --git a/src/Rtsp/UDPServer.h b/src/Rtsp/UDPServer.h index 1b55b730..e40d119e 100644 --- a/src/Rtsp/UDPServer.h +++ b/src/Rtsp/UDPServer.h @@ -43,15 +43,15 @@ namespace mediakit { class UDPServer : public std::enable_shared_from_this { public: - typedef function< bool(int, const Buffer::Ptr &, struct sockaddr *)> onRecvData; + typedef function< bool(int intervaled, const Buffer::Ptr &buffer, struct sockaddr *peer_addr)> onRecvData; ~UDPServer(); static UDPServer &Instance(); - Socket::Ptr getSock(const char *strLocalIp, int iTrackIndex,uint16_t iLocalPort = 0); + Socket::Ptr getSock(const char *strLocalIp, int intervaled,uint16_t iLocalPort = 0); void listenPeer(const char *strPeerIp, void *pSelf, const onRecvData &cb); void stopListenPeer(const char *strPeerIp, void *pSelf); private: UDPServer(); - void onRcvData(int iTrackId, const Buffer::Ptr &pBuf,struct sockaddr *pPeerAddr); + void onRcvData(int intervaled, const Buffer::Ptr &pBuf,struct sockaddr *pPeerAddr); void onErr(const string &strKey,const SockException &err); unordered_map _mapUpdSock; mutex _mtxUpdSock;