diff --git a/src/Rtsp/RtspPlayer.cpp b/src/Rtsp/RtspPlayer.cpp index 38b38711..cb281b42 100644 --- a/src/Rtsp/RtspPlayer.cpp +++ b/src/Rtsp/RtspPlayer.cpp @@ -63,7 +63,8 @@ void RtspPlayer::teardown(){ _strContentBase.clear(); RtpReceiver::clear(); - CLEAR_ARR(_apUdpSock); + CLEAR_ARR(_apRtpSock); + CLEAR_ARR(_apRtcpSock); CLEAR_ARR(_aui16FirstSeq) CLEAR_ARR(_aui64RtpRecv) CLEAR_ARR(_aui64RtpRecv) @@ -248,13 +249,20 @@ void RtspPlayer::sendSetup(unsigned int trackIndex) { } break; case Rtsp::RTP_UDP: { - _apUdpSock[trackIndex].reset(new Socket()); - if (!_apUdpSock[trackIndex]->bindUdpSock(0, get_local_ip().data())) { - _apUdpSock[trackIndex].reset(); - throw std::runtime_error("open udp sock err"); + _apRtpSock[trackIndex].reset(new Socket()); + if (!_apRtpSock[trackIndex]->bindUdpSock(0, get_local_ip().data())) { + _apRtpSock[trackIndex].reset(); + throw std::runtime_error("open rtp sock err"); } - int port = _apUdpSock[trackIndex]->get_local_port(); - sendRtspRequest("SETUP",baseUrl,{"Transport",StrPrinter << "RTP/AVP;unicast;client_port=" << port << "-" << port + 1}); + _apRtcpSock[trackIndex].reset(new Socket()); + if (!_apRtcpSock[trackIndex]->bindUdpSock(0, get_local_ip().data())) { + _apRtcpSock[trackIndex].reset(); + throw std::runtime_error("open rtcp sock err"); + } + sendRtspRequest("SETUP",baseUrl,{"Transport", + StrPrinter << "RTP/AVP;unicast;client_port=" + << _apRtpSock[trackIndex]->get_local_port() << "-" + << _apRtcpSock[trackIndex]->get_local_port()}); } break; default: @@ -290,30 +298,69 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex) }else{ const char *strPos = (_eType == Rtsp::RTP_MULTICAST ? "port=" : "server_port=") ; auto port_str = FindField((strTransport + ";").data(), strPos, ";"); - uint16_t port = atoi(FindField(port_str.data(), NULL, "-").data()); - auto &pUdpSockRef = _apUdpSock[uiTrackIndex]; - if(!pUdpSockRef){ - pUdpSockRef.reset(new Socket()); - } + uint16_t rtp_port = atoi(FindField(port_str.data(), NULL, "-").data()); + uint16_t rtcp_port = atoi(FindField(port_str.data(), "-",NULL).data()); + auto &pRtpSockRef = _apRtpSock[uiTrackIndex]; + auto &pRtcpSockRef = _apRtcpSock[uiTrackIndex]; if (_eType == Rtsp::RTP_MULTICAST) { + //udp组播 auto multiAddr = FindField((strTransport + ";").data(), "destination=", ";"); - if (!pUdpSockRef->bindUdpSock(port, "0.0.0.0")) { - pUdpSockRef.reset(); + pRtpSockRef.reset(new Socket()); + if (!pRtpSockRef->bindUdpSock(rtp_port, "0.0.0.0")) { + pRtpSockRef.reset(); throw std::runtime_error("open udp sock err"); } - auto fd = pUdpSockRef->rawFD(); + auto fd = pRtpSockRef->rawFD(); if (-1 == SockUtil::joinMultiAddrFilter(fd, multiAddr.data(), get_peer_ip().data(),get_local_ip().data())) { SockUtil::joinMultiAddr(fd, multiAddr.data(),get_local_ip().data()); } } else { + //udp单播 struct sockaddr_in rtpto; - rtpto.sin_port = ntohs(port); + rtpto.sin_port = ntohs(rtp_port); rtpto.sin_family = AF_INET; rtpto.sin_addr.s_addr = inet_addr(get_peer_ip().data()); - pUdpSockRef->setSendPeerAddr((struct sockaddr *)&(rtpto)); - pUdpSockRef->send("\xce\xfa\xed\xfe", 4); + pRtpSockRef->setSendPeerAddr((struct sockaddr *)&(rtpto)); + //发送rtp打洞包 + pRtpSockRef->send("\xce\xfa\xed\xfe", 4); + + //设置rtcp发送目标,为后续发送rtcp做准备 + rtpto.sin_port = ntohs(rtcp_port); + rtpto.sin_family = AF_INET; + rtpto.sin_addr.s_addr = inet_addr(get_peer_ip().data()); + pRtcpSockRef->setSendPeerAddr((struct sockaddr *)&(rtpto)); } + + auto srcIP = inet_addr(get_peer_ip().data()); + weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); + //设置rtp over udp接收回调处理函数 + pRtpSockRef->setOnRead([srcIP, uiTrackIndex, weakSelf](const Buffer::Ptr &buf, struct sockaddr *addr) { + auto strongSelf = weakSelf.lock(); + if (!strongSelf) { + return; + } + if (((struct sockaddr_in *) addr)->sin_addr.s_addr != srcIP) { + WarnL << "收到其他地址的rtp数据:" << inet_ntoa(((struct sockaddr_in *) addr)->sin_addr); + return; + } + strongSelf->handleOneRtp(uiTrackIndex, strongSelf->_aTrackInfo[uiTrackIndex], (unsigned char *) buf->data(), buf->size()); + }); + + if(pRtcpSockRef) { + //设置rtcp over udp接收回调处理函数 + pRtcpSockRef->setOnRead([srcIP, uiTrackIndex, weakSelf](const Buffer::Ptr &buf, struct sockaddr *addr) { + auto strongSelf = weakSelf.lock(); + if (!strongSelf) { + return; + } + if (((struct sockaddr_in *) addr)->sin_addr.s_addr != srcIP) { + WarnL << "收到其他地址的rtcp数据:" << inet_ntoa(((struct sockaddr_in *) addr)->sin_addr); + return; + } + strongSelf->onRecvRtcp(uiTrackIndex, strongSelf->_aTrackInfo[uiTrackIndex], (unsigned char *) buf->data(), buf->size()); + }); + } } if (uiTrackIndex < _aTrackInfo.size() - 1) { @@ -321,41 +368,19 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex) sendSetup(uiTrackIndex + 1); return; } + //所有setup命令发送完毕 + //设置心跳包发送定时器 + weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); + _pBeatTimer.reset(new Timer((*this)[kBeatIntervalMS].as() / 1000.0, [weakSelf](){ + auto strongSelf = weakSelf.lock(); + if (!strongSelf){ + return false; + } + strongSelf->sendRtcpPacket(); + return true; + },getPoller())); - for (unsigned int i = 0; i < _aTrackInfo.size() && _eType != Rtsp::RTP_TCP; i++) { - auto &pUdpSockRef = _apUdpSock[i]; - if(!pUdpSockRef){ - continue; - } - auto srcIP = inet_addr(get_peer_ip().data()); - weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - pUdpSockRef->setOnRead([srcIP,i,weakSelf](const Buffer::Ptr &buf, struct sockaddr *addr) { - auto strongSelf=weakSelf.lock(); - if(!strongSelf) { - return; - } - if(((struct sockaddr_in *)addr)->sin_addr.s_addr != srcIP) { - WarnL << "收到其他地址的UDP数据:" << inet_ntoa(((struct sockaddr_in *) addr)->sin_addr); - return; - } - strongSelf->handleOneRtp(i,strongSelf->_aTrackInfo[i],(unsigned char *)buf->data(),buf->size()); - }); - } - /////////////////////////心跳///////////////////////////////// - //有些设备在rtp over tcp的情况下也需要定时发送心跳包(比较坑爹) - //if(_eType != Rtsp::RTP_TCP) - { - weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - _pBeatTimer.reset(new Timer((*this)[kBeatIntervalMS].as() / 1000.0, [weakSelf](){ - auto strongSelf = weakSelf.lock(); - if (!strongSelf){ - return false; - } - strongSelf->sendOptions(); - return true; - },getPoller())); - } - + //发送play命令 pause(false); } @@ -449,14 +474,27 @@ void RtspPlayer::onWholeRtspPacket(Parser &parser) { void RtspPlayer::onRtpPacket(const char *data, uint64_t len) { int trackIdx = -1; uint8_t interleaved = data[1]; - if(interleaved %2 ==0){ + if(interleaved %2 == 0){ trackIdx = getTrackIndexByInterleaved(interleaved); - } - if (trackIdx != -1) { - handleOneRtp(trackIdx,_aTrackInfo[trackIdx],(unsigned char *)data + 4, len - 4); + if (trackIdx != -1) { + handleOneRtp(trackIdx,_aTrackInfo[trackIdx],(unsigned char *)data + 4, len - 4); + } + }else{ + trackIdx = getTrackIndexByInterleaved(interleaved - 1); + if (trackIdx != -1) { + onRecvRtcp(trackIdx,_aTrackInfo[trackIdx],(unsigned char *)data + 4, len - 4); + } } } +void RtspPlayer::onRecvRtcp(int iTrackidx,SdpTrack::Ptr &track, unsigned char *pucData, unsigned int uiLen){ + +} + +void RtspPlayer::sendRtcpPacket(){ + //目前只实现了通过options命令实现心跳包 + sendOptions(); +} void RtspPlayer::onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx){ //统计丢包率 diff --git a/src/Rtsp/RtspPlayer.h b/src/Rtsp/RtspPlayer.h index a9a23ed5..ac149bc7 100644 --- a/src/Rtsp/RtspPlayer.h +++ b/src/Rtsp/RtspPlayer.h @@ -84,13 +84,28 @@ protected: * @param trackidx track索引 */ void onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx) override; + + + /** + * 收到RTCP包回调 + * @param iTrackidx + * @param track + * @param pucData + * @param uiLen + */ + virtual void onRecvRtcp(int iTrackidx,SdpTrack::Ptr &track, unsigned char *pucData, unsigned int uiLen); + + /** + * 发送rtcp包维持心跳 + */ + virtual void sendRtcpPacket(); private: void onRecvRTP_l(const RtpPacket::Ptr &pRtppt, const SdpTrack::Ptr &track); void onPlayResult_l(const SockException &ex); int getTrackIndexByControlSuffix(const string &controlSuffix) const; int getTrackIndexByInterleaved(int interleaved) const; - int getTrackIndexByTrackType(TrackType trackId) const; + int getTrackIndexByTrackType(TrackType trackType) const; void play(const string &strUrl, const string &strUser, const string &strPwd, Rtsp::eRtpType eType); void onConnect(const SockException &err) override; @@ -115,8 +130,10 @@ private: SdpAttr _sdpAttr; vector _aTrackInfo; function _onHandshake; - Socket::Ptr _apUdpSock[2]; - //rtsp鉴权相关 + Socket::Ptr _apRtpSock[2]; //RTP端口,trackid idx 为数组下标 + Socket::Ptr _apRtcpSock[2];//RTCP端口,trackid idx 为数组下标 + + //rtsp鉴权相关 string _rtspMd5Nonce; string _rtspRealm; //rtsp info