From 520945c2e9f3ee4631f9c1663f6dc987af7d24e7 Mon Sep 17 00:00:00 2001 From: ziyue <1213642868@qq.com> Date: Mon, 31 Oct 2022 17:53:20 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8F=90=E9=AB=98=E7=BA=BF=E7=A8=8B=E5=AE=89?= =?UTF-8?q?=E5=85=A8=E6=80=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Common/MediaSource.cpp | 18 +++---- src/Common/MediaSource.h | 77 ++++++++++++---------------- src/Common/MultiMediaSourceMuxer.cpp | 9 +++- src/Common/MultiMediaSourceMuxer.h | 1 + src/Rtmp/RtmpSession.cpp | 4 ++ src/Rtmp/RtmpSession.h | 2 + src/Rtp/RtpProcess.cpp | 16 ++++-- src/Rtsp/RtspSession.cpp | 4 ++ src/Rtsp/RtspSession.h | 2 + webrtc/WebRtcPusher.cpp | 4 ++ webrtc/WebRtcPusher.h | 2 + 11 files changed, 81 insertions(+), 58 deletions(-) diff --git a/src/Common/MediaSource.cpp b/src/Common/MediaSource.cpp index d1b18f52..b1556efd 100644 --- a/src/Common/MediaSource.cpp +++ b/src/Common/MediaSource.cpp @@ -70,7 +70,6 @@ MediaSource::MediaSource(const string &schema, const string &vhost, const string _app = app; _stream_id = stream_id; _create_stamp = time(NULL); - _default_poller = EventPollerPool::Instance().getPoller(); } MediaSource::~MediaSource() { @@ -233,22 +232,23 @@ toolkit::EventPoller::Ptr MediaSource::getOwnerPoller() { toolkit::EventPoller::Ptr ret; auto listener = _listener.lock(); if (listener) { - ret = listener->getOwnerPoller(*this); + return listener->getOwnerPoller(*this); } - return ret ? ret : _default_poller; + throw std::runtime_error(toolkit::demangle(typeid(*this).name()) + "::getOwnerPoller failed:" + getUrl()); } void MediaSource::onReaderChanged(int size) { weak_ptr weak_self = shared_from_this(); - getOwnerPoller()->async([weak_self, size]() { + auto listener = _listener.lock(); + if (!listener) { + return; + } + getOwnerPoller()->async([weak_self, size, listener]() { auto strong_self = weak_self.lock(); if (!strong_self) { return; } - auto listener = strong_self->_listener.lock(); - if (listener) { - listener->onReaderChanged(*strong_self, size); - } + listener->onReaderChanged(*strong_self, size); }); } @@ -729,7 +729,7 @@ toolkit::EventPoller::Ptr MediaSourceEventInterceptor::getOwnerPoller(MediaSourc if (listener) { return listener->getOwnerPoller(sender); } - return EventPollerPool::Instance().getPoller(); + throw std::runtime_error(toolkit::demangle(typeid(*this).name()) + "::getOwnerPoller failed"); } bool MediaSourceEventInterceptor::setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path, size_t max_second) { diff --git a/src/Common/MediaSource.h b/src/Common/MediaSource.h index fe61d685..3e3d45e3 100644 --- a/src/Common/MediaSource.h +++ b/src/Common/MediaSource.h @@ -24,9 +24,9 @@ #include "Extension/Track.h" #include "Record/Recorder.h" -namespace toolkit{ - class Session; -}// namespace toolkit +namespace toolkit { +class Session; +} // namespace toolkit namespace mediakit { @@ -57,8 +57,8 @@ public: ~NotImplemented() override = default; }; - MediaSourceEvent(){}; - virtual ~MediaSourceEvent(){}; + MediaSourceEvent() {}; + virtual ~MediaSourceEvent() {}; // 获取媒体源类型 virtual MediaOriginType getOriginType(MediaSource &sender) const { return MediaOriginType::unknown; } @@ -135,10 +135,10 @@ private: }; //该对象用于拦截感兴趣的MediaSourceEvent事件 -class MediaSourceEventInterceptor : public MediaSourceEvent{ +class MediaSourceEventInterceptor : public MediaSourceEvent { public: - MediaSourceEventInterceptor(){} - ~MediaSourceEventInterceptor() override {} + MediaSourceEventInterceptor() = default; + ~MediaSourceEventInterceptor() override = default; void setDelegate(const std::weak_ptr &listener); std::shared_ptr getDelegate() const; @@ -169,23 +169,20 @@ private: /** * 解析url获取媒体相关信息 */ -class MediaInfo{ +class MediaInfo { public: - ~MediaInfo() {} - MediaInfo() {} + ~MediaInfo() = default; + MediaInfo() = default; MediaInfo(const std::string &url) { parse(url); } void parse(const std::string &url); - std::string shortUrl() const { - return _vhost + "/" + _app + "/" + _streamid; - } - std::string getUrl() const { - return _schema + "://" + shortUrl(); - } + std::string shortUrl() const { return _vhost + "/" + _app + "/" + _streamid; } + std::string getUrl() const { return _schema + "://" + shortUrl(); } + public: + uint16_t _port = 0; std::string _full_url; std::string _schema; std::string _host; - uint16_t _port = 0; std::string _vhost; std::string _app; std::string _streamid; @@ -200,7 +197,7 @@ public: static MediaSource& NullMediaSource(); using Ptr = std::shared_ptr; - MediaSource(const std::string &schema, const std::string &vhost, const std::string &app, const std::string &stream_id) ; + MediaSource(const std::string &schema, const std::string &vhost, const std::string &app, const std::string &stream_id); virtual ~MediaSource(); ////////////////获取MediaSource相关信息//////////////// @@ -214,13 +211,10 @@ public: // 流id const std::string& getId() const; - std::string shortUrl() const { - return _vhost + "/" + _app + "/" + _stream_id; - } - std::string getUrl() const { - return _schema + "://" + shortUrl(); - } - + std::string shortUrl() const { return _vhost + "/" + _app + "/" + _stream_id; } + + std::string getUrl() const { return _schema + "://" + shortUrl(); } + //获取对象所有权 std::shared_ptr getOwnership(); @@ -235,7 +229,7 @@ public: // 获取数据速率,单位bytes/s int getBytesSpeed(TrackType type = TrackInvalid); // 获取流创建GMT unix时间戳,单位秒 - uint64_t getCreateStamp() const {return _create_stamp;} + uint64_t getCreateStamp() const { return _create_stamp; } // 获取流上线时间,单位秒 uint64_t getAliveSecond() const; @@ -266,9 +260,9 @@ public: // 拖动进度条 bool seekTo(uint32_t stamp); - //暂停 + // 暂停 bool pause(bool pause); - //倍数播放 + // 倍数播放 bool speed(float speed); // 关闭该流 bool close(bool force); @@ -310,9 +304,9 @@ protected: void regist(); private: - //媒体注销 + // 媒体注销 bool unregist(); - //触发媒体事件 + // 触发媒体事件 void emitEvent(bool regist); protected: @@ -327,12 +321,11 @@ private: std::string _app; std::string _stream_id; std::weak_ptr _listener; - toolkit::EventPoller::Ptr _default_poller; - //对象个数统计 + // 对象个数统计 toolkit::ObjectStatistic _statistic; }; -///缓存刷新策略类 +/// 缓存刷新策略类 class FlushPolicy { public: FlushPolicy() = default; @@ -342,7 +335,7 @@ public: private: // 音视频的最后时间戳 - uint64_t _last_stamp[2] = {0, 0}; + uint64_t _last_stamp[2] = { 0, 0 }; }; /// 合并写缓存模板 @@ -352,9 +345,7 @@ private: template > > class PacketCache { public: - PacketCache(){ - _cache = std::make_shared(); - } + PacketCache() { _cache = std::make_shared(); } virtual ~PacketCache() = default; @@ -392,15 +383,15 @@ public: private: bool flushImmediatelyWhenCloseMerge() { - //一般的协议关闭合并写时,立即刷新缓存,这样可以减少一帧的延时,但是rtp例外 - //因为rtp的包很小,一个RtpPacket包中也不是完整的一帧图像,所以在关闭合并写时, - //还是有必要缓冲一帧的rtp(也就是时间戳相同的rtp)再输出,这样虽然会增加一帧的延时 - //但是却对性能提升很大,这样做还是比较划算的 + // 一般的协议关闭合并写时,立即刷新缓存,这样可以减少一帧的延时,但是rtp例外 + // 因为rtp的包很小,一个RtpPacket包中也不是完整的一帧图像,所以在关闭合并写时, + // 还是有必要缓冲一帧的rtp(也就是时间戳相同的rtp)再输出,这样虽然会增加一帧的延时 + // 但是却对性能提升很大,这样做还是比较划算的 GET_CONFIG(int, mergeWriteMS, General::kMergeWriteMS); GET_CONFIG(int, rtspLowLatency, Rtsp::kLowLatency); - if(std::is_same::value && rtspLowLatency){ + if (std::is_same::value && rtspLowLatency) { return true; } diff --git a/src/Common/MultiMediaSourceMuxer.cpp b/src/Common/MultiMediaSourceMuxer.cpp index 565bd266..0b540390 100644 --- a/src/Common/MultiMediaSourceMuxer.cpp +++ b/src/Common/MultiMediaSourceMuxer.cpp @@ -91,6 +91,7 @@ const std::string &MultiMediaSourceMuxer::getStreamId() const { MultiMediaSourceMuxer::MultiMediaSourceMuxer(const string &vhost, const string &app, const string &stream, float dur_sec, const ProtocolOption &option) { _poller = EventPollerPool::Instance().getPoller(); + _create_in_poller = _poller->isCurrentThread(); _vhost = vhost; _app = app; _stream_id = stream; @@ -310,7 +311,12 @@ EventPoller::Ptr MultiMediaSourceMuxer::getOwnerPoller(MediaSource &sender) { return _poller; } try { - return listener->getOwnerPoller(sender); + auto ret = listener->getOwnerPoller(sender); + if (ret != _poller) { + WarnL << "OwnerPoller changed:" << _get_origin_url(); + _poller = ret; + } + return ret; } catch (MediaSourceEvent::NotImplemented &) { // listener未重载getOwnerPoller return _poller; @@ -348,6 +354,7 @@ bool MultiMediaSourceMuxer::onTrackReady(const Track::Ptr &track) { } void MultiMediaSourceMuxer::onAllTrackReady() { + CHECK(!_create_in_poller || getOwnerPoller(MediaSource::NullMediaSource())->isCurrentThread()); setMediaListener(getDelegate()); if (_rtmp) { diff --git a/src/Common/MultiMediaSourceMuxer.h b/src/Common/MultiMediaSourceMuxer.h index 6d34feaa..245a55f5 100644 --- a/src/Common/MultiMediaSourceMuxer.h +++ b/src/Common/MultiMediaSourceMuxer.h @@ -217,6 +217,7 @@ protected: private: bool _is_enable = false; + bool _create_in_poller = false; std::string _vhost; std::string _app; std::string _stream_id; diff --git a/src/Rtmp/RtmpSession.cpp b/src/Rtmp/RtmpSession.cpp index 1e1e86ae..fba56f3d 100644 --- a/src/Rtmp/RtmpSession.cpp +++ b/src/Rtmp/RtmpSession.cpp @@ -592,6 +592,10 @@ std::shared_ptr RtmpSession::getOriginSock(MediaSource &sender) const return const_cast(this)->shared_from_this(); } +toolkit::EventPoller::Ptr RtmpSession::getOwnerPoller(MediaSource &sender) { + return getPoller(); +} + void RtmpSession::setSocketFlags(){ GET_CONFIG(int, merge_write_ms, General::kMergeWriteMS); if (merge_write_ms > 0) { diff --git a/src/Rtmp/RtmpSession.h b/src/Rtmp/RtmpSession.h index 0fbd4eaa..16ea4d84 100644 --- a/src/Rtmp/RtmpSession.h +++ b/src/Rtmp/RtmpSession.h @@ -80,6 +80,8 @@ private: std::string getOriginUrl(MediaSource &sender) const override; // 获取媒体源客户端相关信息 std::shared_ptr getOriginSock(MediaSource &sender) const override; + // 由于支持断连续推,存在OwnerPoller变更的可能 + toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) override; void setSocketFlags(); std::string getStreamId(const std::string &str); diff --git a/src/Rtp/RtpProcess.cpp b/src/Rtp/RtpProcess.cpp index 099bd876..e4ca2dac 100644 --- a/src/Rtp/RtpProcess.cpp +++ b/src/Rtp/RtpProcess.cpp @@ -66,11 +66,14 @@ RtpProcess::~RtpProcess() { } bool RtpProcess::inputRtp(bool is_udp, const Socket::Ptr &sock, const char *data, size_t len, const struct sockaddr *addr, uint64_t *dts_out) { - if (!_sock) { - //第一次运行本函数 + if (_sock != sock) { + // 第一次运行本函数 + bool first = !_sock; _sock = sock; _addr.reset(new sockaddr_storage(*((sockaddr_storage *)addr))); - emitOnPublish(); + if (first) { + emitOnPublish(); + } } _total_bytes += len; @@ -228,7 +231,7 @@ void RtpProcess::emitOnPublish() { if (!strong_self) { return; } - auto poller = strong_self->_sock ? strong_self->_sock->getPoller() : EventPollerPool::Instance().getPoller(); + auto poller = strong_self->getOwnerPoller(MediaSource::NullMediaSource()); poller->async([weak_self, err, option]() { auto strong_self = weak_self.lock(); if (!strong_self) { @@ -269,7 +272,10 @@ std::shared_ptr RtpProcess::getOriginSock(MediaSource &sender) const { } toolkit::EventPoller::Ptr RtpProcess::getOwnerPoller(MediaSource &sender) { - return _sock ? _sock->getPoller() : EventPollerPool::Instance().getPoller(); + if (_sock) { + return _sock->getPoller(); + } + throw std::runtime_error("RtpProcess::getOwnerPoller failed:" + _media_info._streamid); } float RtpProcess::getLossRate(MediaSource &sender, TrackType type) { diff --git a/src/Rtsp/RtspSession.cpp b/src/Rtsp/RtspSession.cpp index 9321215c..b204c03d 100644 --- a/src/Rtsp/RtspSession.cpp +++ b/src/Rtsp/RtspSession.cpp @@ -1159,6 +1159,10 @@ std::shared_ptr RtspSession::getOriginSock(MediaSource &sender) const return const_cast(this)->shared_from_this(); } +toolkit::EventPoller::Ptr RtspSession::getOwnerPoller(MediaSource &sender) { + return getPoller(); +} + void RtspSession::onBeforeRtpSorted(const RtpPacket::Ptr &rtp, int track_index){ updateRtcpContext(rtp); } diff --git a/src/Rtsp/RtspSession.h b/src/Rtsp/RtspSession.h index b5e50a2a..da3701fe 100644 --- a/src/Rtsp/RtspSession.h +++ b/src/Rtsp/RtspSession.h @@ -91,6 +91,8 @@ protected: std::string getOriginUrl(MediaSource &sender) const override; // 获取媒体源客户端相关信息 std::shared_ptr getOriginSock(MediaSource &sender) const override; + // 由于支持断连续推,存在OwnerPoller变更的可能 + toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) override; /////TcpSession override//// ssize_t send(toolkit::Buffer::Ptr pkt) override; diff --git a/webrtc/WebRtcPusher.cpp b/webrtc/WebRtcPusher.cpp index 33622ca8..ca9115c8 100644 --- a/webrtc/WebRtcPusher.cpp +++ b/webrtc/WebRtcPusher.cpp @@ -74,6 +74,10 @@ std::shared_ptr WebRtcPusher::getOriginSock(MediaSource &sender) const return static_pointer_cast(getSession()); } +toolkit::EventPoller::Ptr WebRtcPusher::getOwnerPoller(MediaSource &sender) { + return getPoller(); +} + void WebRtcPusher::onRecvRtp(MediaTrack &track, const string &rid, RtpPacket::Ptr rtp) { if (!_simulcast) { assert(_push_src); diff --git a/webrtc/WebRtcPusher.h b/webrtc/WebRtcPusher.h index 9c71420f..fee55e11 100644 --- a/webrtc/WebRtcPusher.h +++ b/webrtc/WebRtcPusher.h @@ -44,6 +44,8 @@ protected: std::string getOriginUrl(MediaSource &sender) const override; // 获取媒体源客户端相关信息 std::shared_ptr getOriginSock(MediaSource &sender) const override; + // 由于支持断连续推,存在OwnerPoller变更的可能 + toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) override; // 获取丢包率 float getLossRate(MediaSource &sender,TrackType type) override;