diff --git a/api/source/mk_media.cpp b/api/source/mk_media.cpp index 55a7c0e7..93a0929b 100755 --- a/api/source/mk_media.cpp +++ b/api/source/mk_media.cpp @@ -22,7 +22,6 @@ public: template MediaHelper(ArgsType &&...args){ _channel = std::make_shared(std::forward(args)...); - _poller = EventPollerPool::Instance().getPoller(); } ~MediaHelper(){} @@ -59,11 +58,6 @@ public: _on_regist_data = user_data; } - // 获取所属线程 - toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) override { - return _poller; - } - protected: // 通知其停止推流 bool close(MediaSource &sender,bool force) override{ @@ -105,11 +99,6 @@ protected: return _on_speed(_on_speed_data, speed); } - // 观看总人数 - int totalReaderCount(MediaSource &sender) override{ - return _channel->totalReaderCount(); - } - void onRegist(MediaSource &sender, bool regist) override{ if (_on_regist) { _on_regist(_on_regist_data, &sender, regist); @@ -117,7 +106,6 @@ protected: } private: - EventPoller::Ptr _poller; DevChannel::Ptr _channel; on_mk_media_close _on_close = nullptr; on_mk_media_seek _on_seek = nullptr; diff --git a/src/Common/MediaSource.cpp b/src/Common/MediaSource.cpp index 25c8dc8f..a53dcd43 100644 --- a/src/Common/MediaSource.cpp +++ b/src/Common/MediaSource.cpp @@ -592,7 +592,7 @@ MediaSource::Ptr MediaSource::createFromMP4(const string &schema, const string & /////////////////////////////////////MediaSourceEvent////////////////////////////////////// void MediaSourceEvent::onReaderChanged(MediaSource &sender, int size){ - if (size || totalReaderCount(sender)) { + if (size || sender.totalReaderCount()) { //还有人观看该视频,不触发关闭事件 _async_close_timer = nullptr; return; @@ -736,7 +736,6 @@ toolkit::EventPoller::Ptr MediaSourceEventInterceptor::getOwnerPoller(MediaSourc return EventPollerPool::Instance().getPoller(); } - bool MediaSourceEventInterceptor::setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path, size_t max_second) { auto listener = _listener.lock(); if (!listener) { diff --git a/src/Common/MediaSource.h b/src/Common/MediaSource.h index a6f9118f..2ea5204d 100644 --- a/src/Common/MediaSource.h +++ b/src/Common/MediaSource.h @@ -54,6 +54,14 @@ class MediaSource; class MediaSourceEvent { public: friend class MediaSource; + + class NotImplemented : public std::runtime_error { + public: + template + NotImplemented(T && ...args) : std::runtime_error(std::forward(args)...) {} + ~NotImplemented() override = default; + }; + MediaSourceEvent(){}; virtual ~MediaSourceEvent(){}; @@ -72,16 +80,16 @@ public: virtual bool speed(MediaSource &sender, float speed) { return false; } // 通知其停止产生流 virtual bool close(MediaSource &sender, bool force) { return false; } - // 获取观看总人数 - virtual int totalReaderCount(MediaSource &sender) = 0; + // 获取观看总人数,此函数一般强制重载 + virtual int totalReaderCount(MediaSource &sender) { throw NotImplemented(toolkit::demangle(typeid(*this).name()) + "::totalReaderCount not implemented"); } // 通知观看人数变化 virtual void onReaderChanged(MediaSource &sender, int size); //流注册或注销事件 virtual void onRegist(MediaSource &sender, bool regist) {}; // 获取丢包率 virtual int getLossRate(MediaSource &sender, TrackType type) { return -1; } - // 获取所在线程 - virtual toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) = 0; + // 获取所在线程, 此函数一般强制重载 + virtual toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) { throw NotImplemented(toolkit::demangle(typeid(*this).name()) + "::getOwnerPoller not implemented"); } ////////////////////////仅供MultiMediaSourceMuxer对象继承//////////////////////// // 开启或关闭录制 diff --git a/src/Common/MultiMediaSourceMuxer.cpp b/src/Common/MultiMediaSourceMuxer.cpp index a7d52106..291f0ef1 100644 --- a/src/Common/MultiMediaSourceMuxer.cpp +++ b/src/Common/MultiMediaSourceMuxer.cpp @@ -88,6 +88,7 @@ const std::string &MultiMediaSourceMuxer::getStreamId() const { } MultiMediaSourceMuxer::MultiMediaSourceMuxer(const string &vhost, const string &app, const string &stream, float dur_sec, const ProtocolOption &option) { + _poller = EventPollerPool::Instance().getPoller(); _vhost = vhost; _app = app; _stream_id = stream; @@ -187,7 +188,12 @@ int MultiMediaSourceMuxer::totalReaderCount(MediaSource &sender) { if (!listener) { return totalReaderCount(); } - return listener->totalReaderCount(sender); + try { + return listener->totalReaderCount(sender); + } catch (MediaSourceEvent::NotImplemented &) { + //listener未重载totalReaderCount + return totalReaderCount(); + } } //此函数可能跨线程调用 @@ -290,6 +296,19 @@ vector MultiMediaSourceMuxer::getMediaTracks(MediaSource &sender, bo return getTracks(trackReady); } +EventPoller::Ptr MultiMediaSourceMuxer::getOwnerPoller(MediaSource &sender) { + auto listener = getDelegate(); + if (!listener) { + return _poller; + } + try { + return listener->getOwnerPoller(sender); + } catch (MediaSourceEvent::NotImplemented &) { + // listener未重载getOwnerPoller + return _poller; + } +} + bool MultiMediaSourceMuxer::onTrackReady(const Track::Ptr &track) { if (CodecL16 == track->getCodecId()) { WarnL << "L16音频格式目前只支持RTSP协议推流拉流!!!"; diff --git a/src/Common/MultiMediaSourceMuxer.h b/src/Common/MultiMediaSourceMuxer.h index b513a989..d9ccb368 100644 --- a/src/Common/MultiMediaSourceMuxer.h +++ b/src/Common/MultiMediaSourceMuxer.h @@ -154,6 +154,11 @@ public: */ std::vector getMediaTracks(MediaSource &sender, bool trackReady = true) const override; + /** + * 获取所属线程 + */ + toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) override; + const std::string& getVhost() const; const std::string& getApp() const; const std::string& getStreamId() const; @@ -201,6 +206,7 @@ private: TSMediaSourceMuxer::Ptr _ts; MediaSinkInterface::Ptr _mp4; HlsRecorder::Ptr _hls; + toolkit::EventPoller::Ptr _poller; //对象个数统计 toolkit::ObjectStatistic _statistic; diff --git a/src/Player/PlayerProxy.cpp b/src/Player/PlayerProxy.cpp index bd50e4bf..688befc2 100644 --- a/src/Player/PlayerProxy.cpp +++ b/src/Player/PlayerProxy.cpp @@ -185,10 +185,6 @@ std::shared_ptr PlayerProxy::getOriginSock(MediaSource &sender) const return getSockInfo(); } -toolkit::EventPoller::Ptr PlayerProxy::getOwnerPoller(MediaSource &sender) { - return getPoller(); -} - void PlayerProxy::onPlaySuccess() { GET_CONFIG(bool, reset_when_replay, General::kResetWhenRePlay); if (dynamic_pointer_cast(_media_src)) { diff --git a/src/Player/PlayerProxy.h b/src/Player/PlayerProxy.h index 2b192d0d..718f76a4 100644 --- a/src/Player/PlayerProxy.h +++ b/src/Player/PlayerProxy.h @@ -59,7 +59,6 @@ private: MediaOriginType getOriginType(MediaSource &sender) const override; std::string getOriginUrl(MediaSource &sender) const override; std::shared_ptr getOriginSock(MediaSource &sender) const override; - toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) override; void rePlay(const std::string &strUrl,int iFailedCnt); void onPlaySuccess(); diff --git a/src/Record/MP4Reader.cpp b/src/Record/MP4Reader.cpp index 29d92d79..f58299fa 100644 --- a/src/Record/MP4Reader.cpp +++ b/src/Record/MP4Reader.cpp @@ -239,10 +239,6 @@ bool MP4Reader::close(MediaSource &sender, bool force) { return true; } -int MP4Reader::totalReaderCount(MediaSource &sender) { - return _muxer ? _muxer->totalReaderCount() : sender.readerCount(); -} - MediaOriginType MP4Reader::getOriginType(MediaSource &sender) const { return MediaOriginType::mp4_vod; } diff --git a/src/Record/MP4Reader.h b/src/Record/MP4Reader.h index 889e9590..10a7ff35 100644 --- a/src/Record/MP4Reader.h +++ b/src/Record/MP4Reader.h @@ -56,7 +56,6 @@ private: bool speed(MediaSource &sender, float speed) override; bool close(MediaSource &sender,bool force) override; - int totalReaderCount(MediaSource &sender) override; MediaOriginType getOriginType(MediaSource &sender) const override; std::string getOriginUrl(MediaSource &sender) const override; toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) override; diff --git a/src/Rtmp/RtmpSession.cpp b/src/Rtmp/RtmpSession.cpp index fea6f27d..2135fe9c 100644 --- a/src/Rtmp/RtmpSession.cpp +++ b/src/Rtmp/RtmpSession.cpp @@ -599,10 +599,6 @@ std::shared_ptr RtmpSession::getOriginSock(MediaSource &sender) const return const_cast(this)->shared_from_this(); } -toolkit::EventPoller::Ptr RtmpSession::getOwnerPoller(MediaSource &sender) { - return getPoller(); -} - void RtmpSession::setSocketFlags(){ GET_CONFIG(int, merge_write_ms, General::kMergeWriteMS); if (merge_write_ms > 0) { diff --git a/src/Rtmp/RtmpSession.h b/src/Rtmp/RtmpSession.h index c6367d35..7bce876c 100644 --- a/src/Rtmp/RtmpSession.h +++ b/src/Rtmp/RtmpSession.h @@ -80,7 +80,6 @@ private: std::string getOriginUrl(MediaSource &sender) const override; // 获取媒体源客户端相关信息 std::shared_ptr getOriginSock(MediaSource &sender) const override; - toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) override; void setSocketFlags(); std::string getStreamId(const std::string &str); diff --git a/src/Rtp/RtpProcess.cpp b/src/Rtp/RtpProcess.cpp index dad0bb98..4100cf8c 100644 --- a/src/Rtp/RtpProcess.cpp +++ b/src/Rtp/RtpProcess.cpp @@ -229,14 +229,6 @@ string RtpProcess::getIdentifier() const { return _media_info._streamid; } -int RtpProcess::getTotalReaderCount() { - return _muxer ? _muxer->totalReaderCount() : 0; -} - -void RtpProcess::setListener(const std::weak_ptr &listener) { - setDelegate(listener); -} - void RtpProcess::emitOnPublish() { weak_ptr weak_self = shared_from_this(); Broadcast::PublishAuthInvoker invoker = [weak_self](const string &err, const ProtocolOption &option) { diff --git a/src/Rtp/RtpProcess.h b/src/Rtp/RtpProcess.h index 0d729b06..35bc9e6e 100644 --- a/src/Rtp/RtpProcess.h +++ b/src/Rtp/RtpProcess.h @@ -64,11 +64,8 @@ public: uint16_t get_peer_port() override; std::string getIdentifier() const override; - int getTotalReaderCount(); - void setListener(const std::weak_ptr &listener); - void setHelper(const std::weak_ptr help); - int getLossRate(MediaSource &sender, TrackType type) override; + protected: bool inputFrame(const Frame::Ptr &frame) override; bool addTrack(const Track::Ptr & track) override; @@ -80,6 +77,7 @@ protected: std::string getOriginUrl(MediaSource &sender) const override; std::shared_ptr getOriginSock(MediaSource &sender) const override; toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) override; + int getLossRate(MediaSource &sender, TrackType type) override; private: void emitOnPublish(); diff --git a/src/Rtp/RtpSelector.cpp b/src/Rtp/RtpSelector.cpp index 92ef4030..507ec2fc 100644 --- a/src/Rtp/RtpSelector.cpp +++ b/src/Rtp/RtpSelector.cpp @@ -131,12 +131,12 @@ RtpProcessHelper::~RtpProcessHelper() { void RtpProcessHelper::attachEvent() { //主要目的是close回调触发时能把对象从RtpSelector中删除 - _process->setListener(shared_from_this()); + _process->setDelegate(shared_from_this()); } bool RtpProcessHelper::close(MediaSource &sender, bool force) { //此回调在其他线程触发 - if (!_process || (!force && _process->getTotalReaderCount())) { + if (!_process || (!force && _process->totalReaderCount(sender))) { return false; } auto parent = _parent.lock(); @@ -148,14 +148,6 @@ bool RtpProcessHelper::close(MediaSource &sender, bool force) { return true; } -int RtpProcessHelper::totalReaderCount(MediaSource &sender) { - return _process ? _process->getTotalReaderCount() : sender.totalReaderCount(); -} - -toolkit::EventPoller::Ptr RtpProcessHelper::getOwnerPoller(MediaSource &sender) { - return toolkit::EventPollerPool::Instance().getPoller(); -} - RtpProcess::Ptr &RtpProcessHelper::getProcess() { return _process; } diff --git a/src/Rtp/RtpSelector.h b/src/Rtp/RtpSelector.h index 7c7fd7c6..0befef6b 100644 --- a/src/Rtp/RtpSelector.h +++ b/src/Rtp/RtpSelector.h @@ -32,15 +32,11 @@ public: protected: // 通知其停止推流 bool close(MediaSource &sender,bool force) override; - // 观看总人数 - int totalReaderCount(MediaSource &sender) override; - // 获取所属线程 - toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) override; private: - std::weak_ptr _parent; - RtpProcess::Ptr _process; std::string _stream_id; + RtpProcess::Ptr _process; + std::weak_ptr _parent; }; class RtpSelector : public std::enable_shared_from_this{ diff --git a/src/Rtp/RtpSession.cpp b/src/Rtp/RtpSession.cpp index a41a56b0..4b3213e6 100644 --- a/src/Rtp/RtpSession.cpp +++ b/src/Rtp/RtpSession.cpp @@ -101,7 +101,7 @@ void RtpSession::onRtpPacket(const char *data, size_t len) { } //tcp情况下,一个tcp链接只可能是一路流,不需要通过多个ssrc来区分,所以不需要频繁getProcess _process = RtpSelector::Instance().getProcess(_stream_id, true); - _process->setListener(dynamic_pointer_cast(shared_from_this())); + _process->setDelegate(dynamic_pointer_cast(shared_from_this())); } try { uint32_t rtp_ssrc = 0; @@ -126,7 +126,7 @@ void RtpSession::onRtpPacket(const char *data, size_t len) { bool RtpSession::close(MediaSource &sender, bool force) { //此回调在其他线程触发 - if(!_process || (!force && _process->getTotalReaderCount())){ + if(!_process || (!force && static_pointer_cast(_process)->totalReaderCount(sender))){ return false; } string err = StrPrinter << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force; @@ -134,15 +134,6 @@ bool RtpSession::close(MediaSource &sender, bool force) { return true; } -int RtpSession::totalReaderCount(MediaSource &sender) { - //此回调在其他线程触发 - return _process ? _process->getTotalReaderCount() : sender.totalReaderCount(); -} - -toolkit::EventPoller::Ptr RtpSession::getOwnerPoller(MediaSource &sender) { - return getPoller(); -} - static const char *findSSRC(const char *data, ssize_t len, uint32_t ssrc) { //rtp前面必须预留两个字节的长度字段 for (ssize_t i = 2; i <= len - 4; ++i) { diff --git a/src/Rtp/RtpSession.h b/src/Rtp/RtpSession.h index 7ef5343e..842ea53c 100644 --- a/src/Rtp/RtpSession.h +++ b/src/Rtp/RtpSession.h @@ -36,13 +36,9 @@ public: protected: // 通知其停止推流 bool close(MediaSource &sender,bool force) override; - // 观看总人数 - int totalReaderCount(MediaSource &sender) override; - // 获取所属线程 - toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) override; // 收到rtp回调 void onRtpPacket(const char *data, size_t len) override; - + // RtpSplitter override const char *onSearchPacketTail(const char *data, size_t len) override; private: diff --git a/src/Rtsp/RtspSession.cpp b/src/Rtsp/RtspSession.cpp index 125bcc2f..5d7d675b 100644 --- a/src/Rtsp/RtspSession.cpp +++ b/src/Rtsp/RtspSession.cpp @@ -1154,10 +1154,6 @@ std::shared_ptr RtspSession::getOriginSock(MediaSource &sender) const return const_cast(this)->shared_from_this(); } -toolkit::EventPoller::Ptr RtspSession::getOwnerPoller(MediaSource &sender) { - return getPoller(); -} - void RtspSession::onBeforeRtpSorted(const RtpPacket::Ptr &rtp, int track_index){ updateRtcpContext(rtp); } diff --git a/src/Rtsp/RtspSession.h b/src/Rtsp/RtspSession.h index 3f8ba826..b07428aa 100644 --- a/src/Rtsp/RtspSession.h +++ b/src/Rtsp/RtspSession.h @@ -91,7 +91,6 @@ protected: std::string getOriginUrl(MediaSource &sender) const override; // 获取媒体源客户端相关信息 std::shared_ptr getOriginSock(MediaSource &sender) const override; - toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) override; /////TcpSession override//// ssize_t send(toolkit::Buffer::Ptr pkt) override; diff --git a/srt/SrtTransportImp.cpp b/srt/SrtTransportImp.cpp index be8daec9..aebb6cbf 100644 --- a/srt/SrtTransportImp.cpp +++ b/srt/SrtTransportImp.cpp @@ -129,11 +129,6 @@ bool SrtTransportImp::close(mediakit::MediaSource &sender, bool force) { return true; } -// 播放总人数 -int SrtTransportImp::totalReaderCount(mediakit::MediaSource &sender) { - return _muxer ? _muxer->totalReaderCount() : sender.readerCount(); -} - // 获取媒体源类型 mediakit::MediaOriginType SrtTransportImp::getOriginType(mediakit::MediaSource &sender) const { return MediaOriginType::srt_push; @@ -149,11 +144,6 @@ std::shared_ptr SrtTransportImp::getOriginSock(mediakit::MediaSource & return static_pointer_cast(getSession()); } -toolkit::EventPoller::Ptr SrtTransportImp::getOwnerPoller(MediaSource &sender){ - auto session = getSession(); - return session ? session->getPoller() : EventPollerPool::Instance().getPoller(); -} - void SrtTransportImp::emitOnPublish() { std::weak_ptr weak_self = static_pointer_cast(shared_from_this()); Broadcast::PublishAuthInvoker invoker = [weak_self](const std::string &err, const ProtocolOption &option) { diff --git a/srt/SrtTransportImp.hpp b/srt/SrtTransportImp.hpp index 3ff24901..987dee8e 100644 --- a/srt/SrtTransportImp.hpp +++ b/srt/SrtTransportImp.hpp @@ -51,16 +51,12 @@ protected: ///////MediaSourceEvent override/////// // 关闭 bool close(mediakit::MediaSource &sender, bool force) override; - // 播放总人数 - int totalReaderCount(mediakit::MediaSource &sender) override; // 获取媒体源类型 mediakit::MediaOriginType getOriginType(mediakit::MediaSource &sender) const override; // 获取媒体源url或者文件路径 std::string getOriginUrl(mediakit::MediaSource &sender) const override; // 获取媒体源客户端相关信息 std::shared_ptr getOriginSock(mediakit::MediaSource &sender) const override; - // get poller - toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) override; ///////MediaSinkInterface override/////// void resetTracks() override {}; diff --git a/webrtc/WebRtcPusher.cpp b/webrtc/WebRtcPusher.cpp index 2de44e9f..89dd456b 100644 --- a/webrtc/WebRtcPusher.cpp +++ b/webrtc/WebRtcPusher.cpp @@ -150,8 +150,4 @@ void WebRtcPusher::onRtcConfigure(RtcConfigure &configure) const { int WebRtcPusher::getLossRate(MediaSource &sender,mediakit::TrackType type){ return WebRtcTransportImp::getLossRate(type); -} - -toolkit::EventPoller::Ptr WebRtcPusher::getOwnerPoller(mediakit::MediaSource &sender) { - return getPoller(); } \ No newline at end of file diff --git a/webrtc/WebRtcPusher.h b/webrtc/WebRtcPusher.h index 4580684b..d1f23347 100644 --- a/webrtc/WebRtcPusher.h +++ b/webrtc/WebRtcPusher.h @@ -41,8 +41,6 @@ protected: std::shared_ptr getOriginSock(mediakit::MediaSource &sender) const override; // 获取丢包率 int getLossRate(mediakit::MediaSource &sender,mediakit::TrackType type) override; - // 获取MediaSource归属线程 - toolkit::EventPoller::Ptr getOwnerPoller(mediakit::MediaSource &sender) override; private: WebRtcPusher(const EventPoller::Ptr &poller, const mediakit::RtspMediaSourceImp::Ptr &src,