diff --git a/src/Rtsp/RtspPlayer.cpp b/src/Rtsp/RtspPlayer.cpp index a2ce2b24..f1fa3a9e 100644 --- a/src/Rtsp/RtspPlayer.cpp +++ b/src/Rtsp/RtspPlayer.cpp @@ -107,13 +107,13 @@ void RtspPlayer::onConnect(const SockException &err){ sendOptions(); } -void RtspPlayer::onRecv(const Buffer::Ptr& pBuf) { +void RtspPlayer::onRecv(const Buffer::Ptr& buf) { if(_benchmark_mode && !_play_check_timer){ //在性能测试模式下,如果rtsp握手完毕后,不再解析rtp包 _rtp_recv_ticker.resetTime(); return; } - input(pBuf->data(),pBuf->size()); + input(buf->data(), buf->size()); } void RtspPlayer::onErr(const SockException &ex) { @@ -218,9 +218,9 @@ void RtspPlayer::createUdpSockIfNecessary(int track_idx){ } //发送SETUP命令 -void RtspPlayer::sendSetup(unsigned int trackIndex) { - _on_response = std::bind(&RtspPlayer::handleResSETUP, this, placeholders::_1, trackIndex); - auto &track = _sdp_track[trackIndex]; +void RtspPlayer::sendSetup(unsigned int track_idx) { + _on_response = std::bind(&RtspPlayer::handleResSETUP, this, placeholders::_1, track_idx); + auto &track = _sdp_track[track_idx]; auto baseUrl = _content_base + "/" + track->_control_surffix; switch (_rtp_type) { case Rtsp::RTP_TCP: { @@ -232,11 +232,11 @@ void RtspPlayer::sendSetup(unsigned int trackIndex) { } break; case Rtsp::RTP_UDP: { - createUdpSockIfNecessary(trackIndex); + createUdpSockIfNecessary(track_idx); sendRtspRequest("SETUP", baseUrl, {"Transport", StrPrinter << "RTP/AVP;unicast;client_port=" - << _rtp_sock[trackIndex]->get_local_port() << "-" - << _rtcp_sock[trackIndex]->get_local_port()}); + << _rtp_sock[track_idx]->get_local_port() << "-" + << _rtcp_sock[track_idx]->get_local_port()}); } break; default: @@ -244,12 +244,12 @@ void RtspPlayer::sendSetup(unsigned int trackIndex) { } } -void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex) { +void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int track_idx) { if (parser.Url() != "200") { throw std::runtime_error( StrPrinter << "SETUP:" << parser.Url() << " " << parser.Tail() << endl); } - if (uiTrackIndex == 0) { + if (track_idx == 0) { _session_id = parser["Session"]; _session_id.append(";"); _session_id = FindField(_session_id.data(), nullptr, ";"); @@ -268,14 +268,14 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex) if(_rtp_type == Rtsp::RTP_TCP) { string interleaved = FindField( FindField((strTransport + ";").data(), "interleaved=", ";").data(), NULL, "-"); - _sdp_track[uiTrackIndex]->_interleaved = atoi(interleaved.data()); + _sdp_track[track_idx]->_interleaved = atoi(interleaved.data()); }else{ const char *strPos = (_rtp_type == Rtsp::RTP_MULTICAST ? "port=" : "server_port=") ; auto port_str = FindField((strTransport + ";").data(), strPos, ";"); uint16_t rtp_port = atoi(FindField(port_str.data(), NULL, "-").data()); uint16_t rtcp_port = atoi(FindField(port_str.data(), "-",NULL).data()); - auto &pRtpSockRef = _rtp_sock[uiTrackIndex]; - auto &pRtcpSockRef = _rtcp_sock[uiTrackIndex]; + auto &pRtpSockRef = _rtp_sock[track_idx]; + auto &pRtcpSockRef = _rtcp_sock[track_idx]; if (_rtp_type == Rtsp::RTP_MULTICAST) { //udp组播 @@ -290,7 +290,7 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex) SockUtil::joinMultiAddr(fd, multiAddr.data(),get_local_ip().data()); } } else { - createUdpSockIfNecessary(uiTrackIndex); + createUdpSockIfNecessary(track_idx); //udp单播 struct sockaddr_in rtpto; rtpto.sin_port = ntohs(rtp_port); @@ -310,7 +310,7 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex) auto srcIP = inet_addr(get_peer_ip().data()); weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); //设置rtp over udp接收回调处理函数 - pRtpSockRef->setOnRead([srcIP, uiTrackIndex, weakSelf](const Buffer::Ptr &buf, struct sockaddr *addr , int addr_len) { + pRtpSockRef->setOnRead([srcIP, track_idx, weakSelf](const Buffer::Ptr &buf, struct sockaddr *addr , int addr_len) { auto strongSelf = weakSelf.lock(); if (!strongSelf) { return; @@ -319,13 +319,13 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex) WarnL << "收到其他地址的rtp数据:" << SockUtil::inet_ntoa(((struct sockaddr_in *) addr)->sin_addr); return; } - strongSelf->handleOneRtp(uiTrackIndex, strongSelf->_sdp_track[uiTrackIndex]->_type, - strongSelf->_sdp_track[uiTrackIndex]->_samplerate, (unsigned char *) buf->data(), buf->size()); + strongSelf->handleOneRtp(track_idx, strongSelf->_sdp_track[track_idx]->_type, + strongSelf->_sdp_track[track_idx]->_samplerate, (unsigned char *) buf->data(), buf->size()); }); if(pRtcpSockRef) { //设置rtcp over udp接收回调处理函数 - pRtcpSockRef->setOnRead([srcIP, uiTrackIndex, weakSelf](const Buffer::Ptr &buf, struct sockaddr *addr , int addr_len) { + pRtcpSockRef->setOnRead([srcIP, track_idx, weakSelf](const Buffer::Ptr &buf, struct sockaddr *addr , int addr_len) { auto strongSelf = weakSelf.lock(); if (!strongSelf) { return; @@ -334,14 +334,14 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex) WarnL << "收到其他地址的rtcp数据:" << SockUtil::inet_ntoa(((struct sockaddr_in *) addr)->sin_addr); return; } - strongSelf->onRtcpPacket(uiTrackIndex, strongSelf->_sdp_track[uiTrackIndex], (unsigned char *) buf->data(), buf->size()); + strongSelf->onRtcpPacket(track_idx, strongSelf->_sdp_track[track_idx], (unsigned char *) buf->data(), buf->size()); }); } } - if (uiTrackIndex < _sdp_track.size() - 1) { + if (track_idx < _sdp_track.size() - 1) { //需要继续发送SETUP命令 - sendSetup(uiTrackIndex + 1); + sendSetup(track_idx + 1); return; } //所有setup命令发送完毕 @@ -404,8 +404,8 @@ void RtspPlayer::sendPause(int type , uint32_t seekMS){ } } -void RtspPlayer::pause(bool bPause) { - sendPause(bPause ? type_pause : type_seek, getProgressMilliSecond()); +void RtspPlayer::pause(bool pause_flag) { + sendPause(pause_flag ? type_pause : type_seek, getProgressMilliSecond()); } void RtspPlayer::handleResPAUSE(const Parser& parser,int type) { @@ -416,6 +416,7 @@ void RtspPlayer::handleResPAUSE(const Parser& parser,int type) { break; case type_play: WarnL << "Play failed:" << parser.Url() << " " << parser.Tail() << endl; + onPlayResult_l(SockException(Err_shutdown, StrPrinter << "rtsp play failed:" << parser.Url() << " " << parser.Tail() ), !_play_check_timer); break; case type_seek: WarnL << "Seek failed:" << parser.Url() << " " << parser.Tail() << endl; @@ -442,8 +443,8 @@ void RtspPlayer::handleResPAUSE(const Parser& parser,int type) { iSeekTo = 1000 * atof(strStart.data()); DebugL << "seekTo(ms):" << iSeekTo; } - //设置相对时间戳 - onPlayResult_l(SockException(Err_success, type == type_seek ? "resum rtsp success" : "rtsp play success"), type == type_seek); + + onPlayResult_l(SockException(Err_success, type == type_seek ? "resum rtsp success" : "rtsp play success"), !_play_check_timer); } void RtspPlayer::onWholeRtspPacket(Parser &parser) { @@ -473,7 +474,7 @@ void RtspPlayer::onRtpPacket(const char *data, uint64_t len) { } //此处预留rtcp处理函数 -void RtspPlayer::onRtcpPacket(int iTrackidx, SdpTrack::Ptr &track, unsigned char *pucData, unsigned int uiLen){} +void RtspPlayer::onRtcpPacket(int track_idx, SdpTrack::Ptr &track, unsigned char *data, unsigned int len){} #if 0 //改代码提取自FFmpeg,参考之 @@ -533,12 +534,12 @@ void RtspPlayer::onRtcpPacket(int iTrackidx, SdpTrack::Ptr &track, unsigned char avio_w8(pb, 0); #endif -void RtspPlayer::sendReceiverReport(bool overTcp,int iTrackIndex){ +void RtspPlayer::sendReceiverReport(bool over_tcp, int track_idx){ static const char s_cname[] = "ZLMediaKitRtsp"; uint8_t aui8Rtcp[4 + 32 + 10 + sizeof(s_cname) + 1] = {0}; uint8_t *pui8Rtcp_RR = aui8Rtcp + 4, *pui8Rtcp_SDES = pui8Rtcp_RR + 32; - auto &track = _sdp_track[iTrackIndex]; - auto &counter = _rtcp_counter[iTrackIndex]; + auto &track = _sdp_track[track_idx]; + auto &counter = _rtcp_counter[track_idx]; aui8Rtcp[0] = '$'; aui8Rtcp[1] = track->_interleaved + 1; @@ -564,13 +565,13 @@ void RtspPlayer::sendReceiverReport(bool overTcp,int iTrackIndex){ pui8Rtcp_RR[15] = 0x00; //FIXME: max sequence received - int cycleCount = getCycleCount(iTrackIndex); + int cycleCount = getCycleCount(track_idx); pui8Rtcp_RR[16] = cycleCount >> 8; pui8Rtcp_RR[17] = cycleCount & 0xFF; pui8Rtcp_RR[18] = counter.pktCnt >> 8; pui8Rtcp_RR[19] = counter.pktCnt & 0xFF; - uint32_t jitter = htonl(getJitterSize(iTrackIndex)); + uint32_t jitter = htonl(getJitterSize(track_idx)); //FIXME: jitter memcpy(pui8Rtcp_RR + 20, &jitter , 4); /* last SR timestamp */ @@ -592,10 +593,10 @@ void RtspPlayer::sendReceiverReport(bool overTcp,int iTrackIndex){ memcpy(&pui8Rtcp_SDES[10], s_cname, sizeof(s_cname)); pui8Rtcp_SDES[10 + sizeof(s_cname)] = 0x00; - if(overTcp){ + if (over_tcp) { send(obtainBuffer((char *) aui8Rtcp, sizeof(aui8Rtcp))); - }else if(_rtcp_sock[iTrackIndex]) { - _rtcp_sock[iTrackIndex]->send((char *) aui8Rtcp + 4, sizeof(aui8Rtcp) - 4); + } else if (_rtcp_sock[track_idx]) { + _rtcp_sock[track_idx]->send((char *) aui8Rtcp + 4, sizeof(aui8Rtcp) - 4); } } @@ -613,24 +614,24 @@ void RtspPlayer::onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx){ } float RtspPlayer::getPacketLossRate(TrackType type) const{ - int iTrackIdx = getTrackIndexByTrackType(type); - if(iTrackIdx == -1){ + try { + auto track_idx = getTrackIndexByTrackType(type); + if (_rtp_seq_now[track_idx] - _rtp_seq_start[track_idx] + 1 == 0) { + return 0; + } + return 1.0 - (double) _rtp_recv_count[track_idx] / (_rtp_seq_now[track_idx] - _rtp_seq_start[track_idx] + 1); + } catch (...) { uint64_t totalRecv = 0; uint64_t totalSend = 0; for (unsigned int i = 0; i < _sdp_track.size(); i++) { totalRecv += _rtp_recv_count[i]; totalSend += (_rtp_seq_now[i] - _rtp_seq_start[i] + 1); } - if(totalSend == 0){ + if (totalSend == 0) { return 0; } - return 1.0 - (double)totalRecv / totalSend; + return 1.0 - (double) totalRecv / totalSend; } - - if(_rtp_seq_now[iTrackIdx] - _rtp_seq_start[iTrackIdx] + 1 == 0){ - return 0; - } - return 1.0 - (double)_rtp_recv_count[iTrackIdx] / (_rtp_seq_now[iTrackIdx] - _rtp_seq_start[iTrackIdx] + 1); } uint32_t RtspPlayer::getProgressMilliSecond() const{ @@ -705,26 +706,26 @@ void RtspPlayer::sendRtspRequest(const string &cmd, const string &url,const StrC SockSender::send(printer << "\r\n"); } -void RtspPlayer::onRecvRTP_l(const RtpPacket::Ptr &pkt, const SdpTrack::Ptr &track) { +void RtspPlayer::onRecvRTP_l(const RtpPacket::Ptr &rtp, const SdpTrack::Ptr &track) { _rtp_recv_ticker.resetTime(); - onRecvRTP(pkt, track); + onRecvRTP(rtp, track); - int iTrackIndex = getTrackIndexByTrackType(pkt->type); - RtcpCounter &counter = _rtcp_counter[iTrackIndex]; - counter.pktCnt = pkt->sequence; - auto &ticker = _rtcp_send_ticker[iTrackIndex]; + int track_idx = getTrackIndexByTrackType(rtp->type); + RtcpCounter &counter = _rtcp_counter[track_idx]; + counter.pktCnt = rtp->sequence; + auto &ticker = _rtcp_send_ticker[track_idx]; if (ticker.elapsedTime() > 5 * 1000) { //send rtcp every 5 second counter.lastTimeStamp = counter.timeStamp; //直接保存网络字节序 - memcpy(&counter.timeStamp, pkt->data() + 8, 4); + memcpy(&counter.timeStamp, rtp->data() + 8, 4); if (counter.lastTimeStamp != 0) { - sendReceiverReport(_rtp_type == Rtsp::RTP_TCP, iTrackIndex); + sendReceiverReport(_rtp_type == Rtsp::RTP_TCP, track_idx); ticker.resetTime(); } //有些rtsp服务器需要rtcp保活,有些需要发送信令保活 - if (iTrackIndex == 0) { + if (track_idx == 0) { //只需要发送一次心跳信令包 sendKeepAlive(); } @@ -785,16 +786,16 @@ int RtspPlayer::getTrackIndexByInterleaved(int interleaved) const { throw SockException(Err_shutdown, StrPrinter << "no such track with interleaved:" << interleaved); } -int RtspPlayer::getTrackIndexByTrackType(TrackType trackType) const { +int RtspPlayer::getTrackIndexByTrackType(TrackType track_type) const { for (unsigned int i = 0; i < _sdp_track.size(); i++) { - if (_sdp_track[i]->_type == trackType) { + if (_sdp_track[i]->_type == track_type) { return i; } } if (_sdp_track.size() == 1) { return 0; } - throw SockException(Err_shutdown, StrPrinter << "no such track with type:" << (int) trackType); + throw SockException(Err_shutdown, StrPrinter << "no such track with type:" << (int) track_type); } } /* namespace mediakit */ diff --git a/src/Rtsp/RtspPlayer.h b/src/Rtsp/RtspPlayer.h index 5a7b8efc..e1c485fa 100644 --- a/src/Rtsp/RtspPlayer.h +++ b/src/Rtsp/RtspPlayer.h @@ -36,16 +36,18 @@ class RtspPlayer: public PlayerBase,public TcpClient, public RtspSplitter, publi public: typedef std::shared_ptr Ptr; - RtspPlayer(const EventPoller::Ptr &poller) ; - virtual ~RtspPlayer(void); + RtspPlayer(const EventPoller::Ptr &poller); + ~RtspPlayer() override; + void play(const string &strUrl) override; - void pause(bool bPause) override; + void pause(bool pause_flag) override; void teardown() override; float getPacketLossRate(TrackType type) const override; + protected: //派生类回调函数 - virtual bool onCheckSDP(const string &strSdp) = 0; - virtual void onRecvRTP(const RtpPacket::Ptr &pRtppt, const SdpTrack::Ptr &track) = 0; + virtual bool onCheckSDP(const string &sdp) = 0; + virtual void onRecvRTP(const RtpPacket::Ptr &rtp, const SdpTrack::Ptr &track) = 0; uint32_t getProgressMilliSecond() const; void seekToMilliSecond(uint32_t ms); @@ -64,47 +66,48 @@ protected: /** * rtp数据包排序后输出 - * @param rtppt rtp数据包 - * @param trackidx track索引 + * @param rtp rtp数据包 + * @param track_idx track索引 */ - void onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx) override; - + void onRtpSorted(const RtpPacket::Ptr &rtp, int track_idx) override; /** * 收到RTCP包回调 - * @param iTrackidx - * @param track - * @param pucData - * @param uiLen + * @param track_idx track索引 + * @param track sdp相关信息 + * @param data rtcp内容 + * @param len rtcp内容长度 */ - virtual void onRtcpPacket(int iTrackidx, SdpTrack::Ptr &track, unsigned char *pucData, unsigned int uiLen); + virtual void onRtcpPacket(int track_idx, SdpTrack::Ptr &track, unsigned char *data, unsigned int len); /////////////TcpClient override///////////// void onConnect(const SockException &err) override; - void onRecv(const Buffer::Ptr &pBuf) override; + void onRecv(const Buffer::Ptr &buf) override; void onErr(const SockException &ex) override; + private: - void onRecvRTP_l(const RtpPacket::Ptr &pRtppt, const SdpTrack::Ptr &track); + void onRecvRTP_l(const RtpPacket::Ptr &rtp, const SdpTrack::Ptr &track); void onPlayResult_l(const SockException &ex , bool handshakeCompleted); int getTrackIndexByInterleaved(int interleaved) const; - int getTrackIndexByTrackType(TrackType trackType) const; + int getTrackIndexByTrackType(TrackType track_type) const; - void handleResSETUP(const Parser &parser, unsigned int uiTrackIndex); + void handleResSETUP(const Parser &parser, unsigned int track_idx); void handleResDESCRIBE(const Parser &parser); bool handleAuthenticationFailure(const string &wwwAuthenticateParamsStr); void handleResPAUSE(const Parser &parser, int type); bool handleResponse(const string &cmd, const Parser &parser); void sendOptions(); - void sendSetup(unsigned int uiTrackIndex); + void sendSetup(unsigned int track_idx); void sendPause(int type , uint32_t ms); void sendDescribe(); void sendKeepAlive(); void sendRtspRequest(const string &cmd, const string &url ,const StrCaseMap &header = StrCaseMap()); void sendRtspRequest(const string &cmd, const string &url ,const std::initializer_list &header); - void sendReceiverReport(bool overTcp,int iTrackIndex); + void sendReceiverReport(bool over_tcp, int track_idx); void createUdpSockIfNecessary(int track_idx); + private: string _play_url; vector _sdp_track; @@ -148,5 +151,4 @@ private: }; } /* namespace mediakit */ - #endif /* SRC_RTSPPLAYER_RTSPPLAYER_H_TXT_ */