From 20f1275c58e289edb08a13314ad13dc2ca2a41ad Mon Sep 17 00:00:00 2001 From: xiongziliang <771730766@qq.com> Date: Sat, 3 Sep 2022 15:53:01 +0800 Subject: [PATCH] =?UTF-8?q?rtsp=E6=8B=89=E6=B5=81=E3=80=81rtp=E5=8D=95?= =?UTF-8?q?=E7=AB=AF=E5=8F=A3=E6=8E=A8=E6=B5=81=E6=96=B0=E5=A2=9E=E6=94=AF?= =?UTF-8?q?=E6=8C=81=E8=8E=B7=E5=8F=96=E4=B8=A2=E5=8C=85=E7=8E=87:=20#1877?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Common/MediaSource.cpp | 4 ++-- src/Common/MediaSource.h | 6 +++--- src/Player/PlayerBase.h | 2 +- src/Player/PlayerProxy.cpp | 4 ++++ src/Player/PlayerProxy.h | 1 + src/Rtp/RtpProcess.cpp | 23 +++++++++-------------- src/Rtp/RtpProcess.h | 7 ++----- src/Rtp/RtpServer.cpp | 16 +++++++--------- webrtc/WebRtcPusher.cpp | 2 +- webrtc/WebRtcPusher.h | 2 +- webrtc/WebRtcTransport.cpp | 6 +++--- webrtc/WebRtcTransport.h | 2 +- 12 files changed, 35 insertions(+), 40 deletions(-) diff --git a/src/Common/MediaSource.cpp b/src/Common/MediaSource.cpp index a53dcd43..810aead7 100644 --- a/src/Common/MediaSource.cpp +++ b/src/Common/MediaSource.cpp @@ -219,7 +219,7 @@ bool MediaSource::close(bool force) { return listener->close(*this,force); } -int MediaSource::getLossRate(mediakit::TrackType type) { +float MediaSource::getLossRate(mediakit::TrackType type) { auto listener = _listener.lock(); if (!listener) { return -1; @@ -720,7 +720,7 @@ void MediaSourceEventInterceptor::onRegist(MediaSource &sender, bool regist) { } } -int MediaSourceEventInterceptor::getLossRate(MediaSource &sender, TrackType type){ +float MediaSourceEventInterceptor::getLossRate(MediaSource &sender, TrackType type){ auto listener = _listener.lock(); if (listener) { return listener->getLossRate(sender, type); diff --git a/src/Common/MediaSource.h b/src/Common/MediaSource.h index fbeaa115..7c7a96b1 100644 --- a/src/Common/MediaSource.h +++ b/src/Common/MediaSource.h @@ -87,7 +87,7 @@ public: //流注册或注销事件 virtual void onRegist(MediaSource &sender, bool regist) {}; // 获取丢包率 - virtual int getLossRate(MediaSource &sender, TrackType type) { return -1; } + virtual float getLossRate(MediaSource &sender, TrackType type) { return -1; } // 获取所在线程, 此函数一般强制重载 virtual toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) { throw NotImplemented(toolkit::demangle(typeid(*this).name()) + "::getOwnerPoller not implemented"); } @@ -164,7 +164,7 @@ public: std::vector getMediaTracks(MediaSource &sender, bool trackReady = true) const override; void startSendRtp(MediaSource &sender, const SendRtpArgs &args, const std::function cb) override; bool stopSendRtp(MediaSource &sender, const std::string &ssrc) override; - int getLossRate(MediaSource &sender, TrackType type) override; + float getLossRate(MediaSource &sender, TrackType type) override; toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) override; private: @@ -280,7 +280,7 @@ public: // 停止发送ps-rtp bool stopSendRtp(const std::string &ssrc); // 获取丢包率 - int getLossRate(mediakit::TrackType type); + float getLossRate(mediakit::TrackType type); // 获取所在线程 toolkit::EventPoller::Ptr getOwnerPoller(); diff --git a/src/Player/PlayerBase.h b/src/Player/PlayerBase.h index b6c0d080..d0eca682 100644 --- a/src/Player/PlayerBase.h +++ b/src/Player/PlayerBase.h @@ -89,7 +89,7 @@ public: * 获取丢包率,只支持rtsp * @param type 音频或视频,TrackInvalid时为总丢包率 */ - virtual float getPacketLossRate(TrackType type) const { return 0; }; + virtual float getPacketLossRate(TrackType type) const { return -1; }; /** * 获取所有track diff --git a/src/Player/PlayerProxy.cpp b/src/Player/PlayerProxy.cpp index 688befc2..c79ef642 100644 --- a/src/Player/PlayerProxy.cpp +++ b/src/Player/PlayerProxy.cpp @@ -185,6 +185,10 @@ std::shared_ptr PlayerProxy::getOriginSock(MediaSource &sender) const return getSockInfo(); } +float PlayerProxy::getLossRate(MediaSource &sender, TrackType type) { + return getPacketLossRate(type); +} + void PlayerProxy::onPlaySuccess() { GET_CONFIG(bool, reset_when_replay, General::kResetWhenRePlay); if (dynamic_pointer_cast(_media_src)) { diff --git a/src/Player/PlayerProxy.h b/src/Player/PlayerProxy.h index 718f76a4..6e3a2401 100644 --- a/src/Player/PlayerProxy.h +++ b/src/Player/PlayerProxy.h @@ -59,6 +59,7 @@ private: MediaOriginType getOriginType(MediaSource &sender) const override; std::string getOriginUrl(MediaSource &sender) const override; std::shared_ptr getOriginSock(MediaSource &sender) const override; + float getLossRate(MediaSource &sender, TrackType type) override; void rePlay(const std::string &strUrl,int iFailedCnt); void onPlaySuccess(); diff --git a/src/Rtp/RtpProcess.cpp b/src/Rtp/RtpProcess.cpp index 4100cf8c..0e1822f8 100644 --- a/src/Rtp/RtpProcess.cpp +++ b/src/Rtp/RtpProcess.cpp @@ -95,6 +95,9 @@ bool RtpProcess::inputRtp(bool is_udp, const Socket::Ptr &sock, const char *data _process = std::make_shared(_media_info, this); } + auto header = (RtpHeader *) data; + onRtp(ntohs(header->seq), ntohl(header->stamp), 0/*不发送sr,所以可以设置为0*/ , 90000/*ps/ts流时间戳按照90K采样率*/, len); + GET_CONFIG(string, dump_dir, RtpProxy::kDumpDir); if (_muxer && !_muxer->isEnabled() && !dts_out && dump_dir.empty()) { //无人访问、且不取时间戳、不导出调试文件时,我们可以直接丢弃数据 @@ -280,20 +283,12 @@ toolkit::EventPoller::Ptr RtpProcess::getOwnerPoller(MediaSource &sender) { return _sock ? _sock->getPoller() : EventPollerPool::Instance().getPoller(); } -void RtpProcess::setHelper(std::weak_ptr help) { - _help = std::move(help); -} - -int RtpProcess::getLossRate(MediaSource &sender, TrackType type) { - auto help = _help.lock(); - if (!help) { - return -1; - } - auto expected = help->getExpectedPacketsInterval(); - if (!expected) { - return 0; - } - return help->geLostInterval() * 100 / expected; +float RtpProcess::getLossRate(MediaSource &sender, TrackType type) { + auto expected = getExpectedPacketsInterval(); + if (!expected) { + return -1; + } + return geLostInterval() * 100 / expected; } }//namespace mediakit diff --git a/src/Rtp/RtpProcess.h b/src/Rtp/RtpProcess.h index 35bc9e6e..2c27c143 100644 --- a/src/Rtp/RtpProcess.h +++ b/src/Rtp/RtpProcess.h @@ -18,7 +18,7 @@ namespace mediakit { -class RtpProcess : public toolkit::SockInfo, public MediaSinkInterface, public MediaSourceEventInterceptor, public std::enable_shared_from_this{ +class RtpProcess : public RtcpContextForRecv, public toolkit::SockInfo, public MediaSinkInterface, public MediaSourceEventInterceptor, public std::enable_shared_from_this{ public: typedef std::shared_ptr Ptr; friend class RtpProcessHelper; @@ -64,8 +64,6 @@ public: uint16_t get_peer_port() override; std::string getIdentifier() const override; - void setHelper(const std::weak_ptr help); - protected: bool inputFrame(const Frame::Ptr &frame) override; bool addTrack(const Track::Ptr & track) override; @@ -77,7 +75,7 @@ protected: std::string getOriginUrl(MediaSource &sender) const override; std::shared_ptr getOriginSock(MediaSource &sender) const override; toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) override; - int getLossRate(MediaSource &sender, TrackType type) override; + float getLossRate(MediaSource &sender, TrackType type) override; private: void emitOnPublish(); @@ -100,7 +98,6 @@ private: toolkit::Ticker _last_check_alive; std::recursive_mutex _func_mtx; std::deque > _cached_func; - std::weak_ptr _help; }; }//namespace mediakit diff --git a/src/Rtp/RtpServer.cpp b/src/Rtp/RtpServer.cpp index 0d47ff88..8c596357 100644 --- a/src/Rtp/RtpServer.cpp +++ b/src/Rtp/RtpServer.cpp @@ -27,19 +27,18 @@ RtpServer::~RtpServer() { } } -class RtcpHelper : public RtcpContextForRecv, public std::enable_shared_from_this { +class RtcpHelper: public std::enable_shared_from_this { public: using Ptr = std::shared_ptr; - RtcpHelper(Socket::Ptr rtcp_sock, uint32_t sample_rate) { + RtcpHelper(Socket::Ptr rtcp_sock, RtpProcess::Ptr process) { _rtcp_sock = std::move(rtcp_sock); - _sample_rate = sample_rate; + _process = std::move(process); } void onRecvRtp(const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len){ //统计rtp接受情况,用于发送rr包 auto header = (RtpHeader *) buf->data(); - onRtp(ntohs(header->seq), ntohl(header->stamp), 0/*不发送sr,所以可以设置为0*/ , _sample_rate, buf->size()); sendRtcp(ntohl(header->ssrc), addr, addr_len); } @@ -58,7 +57,7 @@ public: } auto rtcps = RtcpHeader::loadFromBytes(buf->data(), buf->size()); for (auto &rtcp : rtcps) { - strong_self->onRtcp(rtcp); + strong_self->_process->onRtcp(rtcp); } }); } @@ -81,13 +80,13 @@ private: //未收到rtcp打洞包时,采用默认的rtcp端口 rtcp_addr = addr; } - _rtcp_sock->send(createRtcpRR(rtp_ssrc + 1, rtp_ssrc), rtcp_addr, addr_len); + _rtcp_sock->send(_process->createRtcpRR(rtp_ssrc + 1, rtp_ssrc), rtcp_addr, addr_len); } private: Ticker _ticker; Socket::Ptr _rtcp_sock; - uint32_t _sample_rate; + RtpProcess::Ptr _process; std::shared_ptr _rtcp_addr; }; @@ -126,8 +125,7 @@ void RtpServer::start(uint16_t local_port, const string &stream_id, bool enable_ if (!stream_id.empty()) { //指定了流id,那么一个端口一个流(不管是否包含多个ssrc的多个流,绑定rtp源后,会筛选掉ip端口不匹配的流) process = RtpSelector::Instance().getProcess(stream_id, true); - RtcpHelper::Ptr helper = std::make_shared(std::move(rtcp_socket), 90000); - process->setHelper(helper); + RtcpHelper::Ptr helper = std::make_shared(std::move(rtcp_socket), process); helper->startRtcp(); rtp_socket->setOnRead([rtp_socket, process, helper, ssrc](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) { RtpHeader *header = (RtpHeader *)buf->data(); diff --git a/webrtc/WebRtcPusher.cpp b/webrtc/WebRtcPusher.cpp index 89dd456b..f73864b3 100644 --- a/webrtc/WebRtcPusher.cpp +++ b/webrtc/WebRtcPusher.cpp @@ -148,6 +148,6 @@ void WebRtcPusher::onRtcConfigure(RtcConfigure &configure) const { configure.audio.direction = configure.video.direction = RtpDirection::recvonly; } -int WebRtcPusher::getLossRate(MediaSource &sender,mediakit::TrackType type){ +float WebRtcPusher::getLossRate(MediaSource &sender,mediakit::TrackType type){ return WebRtcTransportImp::getLossRate(type); } \ No newline at end of file diff --git a/webrtc/WebRtcPusher.h b/webrtc/WebRtcPusher.h index d1f23347..a3e78e00 100644 --- a/webrtc/WebRtcPusher.h +++ b/webrtc/WebRtcPusher.h @@ -40,7 +40,7 @@ protected: // 获取媒体源客户端相关信息 std::shared_ptr getOriginSock(mediakit::MediaSource &sender) const override; // 获取丢包率 - int getLossRate(mediakit::MediaSource &sender,mediakit::TrackType type) override; + float getLossRate(mediakit::MediaSource &sender,mediakit::TrackType type) override; private: WebRtcPusher(const EventPoller::Ptr &poller, const mediakit::RtspMediaSourceImp::Ptr &src, diff --git a/webrtc/WebRtcTransport.cpp b/webrtc/WebRtcTransport.cpp index 9f1dc284..31bce08c 100644 --- a/webrtc/WebRtcTransport.cpp +++ b/webrtc/WebRtcTransport.cpp @@ -650,10 +650,10 @@ public: return _rtcp_context.createRtcpRR(ssrc, getSSRC()); } - int getLossRate() { + float getLossRate() { auto expected = _rtcp_context.getExpectedPacketsInterval(); if (!expected) { - return 0; + return -1; } return _rtcp_context.geLostInterval() * 100 / expected; } @@ -698,7 +698,7 @@ std::shared_ptr MediaTrack::getRtpChannel(uint32_t ssrc) const { return it_chn->second; } -int WebRtcTransportImp::getLossRate(mediakit::TrackType type) { +float WebRtcTransportImp::getLossRate(mediakit::TrackType type) { for (auto &pr : _ssrc_to_track) { auto ssrc = pr.first; auto &track = pr.second; diff --git a/webrtc/WebRtcTransport.h b/webrtc/WebRtcTransport.h index 23f5880b..01d842f6 100644 --- a/webrtc/WebRtcTransport.h +++ b/webrtc/WebRtcTransport.h @@ -263,7 +263,7 @@ protected: void onShutdown(const SockException &ex) override; virtual void onRecvRtp(MediaTrack &track, const std::string &rid, mediakit::RtpPacket::Ptr rtp) = 0; void updateTicker(); - int getLossRate(mediakit::TrackType type); + float getLossRate(mediakit::TrackType type); private: void onSortedRtp(MediaTrack &track, const std::string &rid, mediakit::RtpPacket::Ptr rtp);