diff --git a/src/Rtp/RtpProcess.cpp b/src/Rtp/RtpProcess.cpp index aacd1352..85e68656 100644 --- a/src/Rtp/RtpProcess.cpp +++ b/src/Rtp/RtpProcess.cpp @@ -299,5 +299,13 @@ uint16_t RtpProcess::get_peer_port() { return ntohs(((struct sockaddr_in *) _addr)->sin_port); } +int RtpProcess::totalReaderCount(){ + return _muxer->totalReaderCount(); +} + +void RtpProcess::setListener(const std::weak_ptr &listener){ + _muxer->setListener(listener); +} + }//namespace mediakit #endif//defined(ENABLE_RTPPROXY) \ No newline at end of file diff --git a/src/Rtp/RtpProcess.h b/src/Rtp/RtpProcess.h index 7053956e..c9fb7397 100644 --- a/src/Rtp/RtpProcess.h +++ b/src/Rtp/RtpProcess.h @@ -49,6 +49,9 @@ public: bool alive(); string get_peer_ip(); uint16_t get_peer_port(); + + int totalReaderCount(); + void setListener(const std::weak_ptr &listener); protected: void onRtpSorted(const RtpPacket::Ptr &rtp, int track_index) override ; void onRtpDecode(const void *packet, int bytes, uint32_t timestamp, int flags) override; diff --git a/src/Rtp/RtpSelector.cpp b/src/Rtp/RtpSelector.cpp index bc8e7924..e30d4af3 100644 --- a/src/Rtp/RtpSelector.cpp +++ b/src/Rtp/RtpSelector.cpp @@ -63,11 +63,12 @@ RtpProcess::Ptr RtpSelector::getProcess(uint32_t ssrc,bool makeNew) { if(it == _map_rtp_process.end() && !makeNew){ return nullptr; } - RtpProcess::Ptr &ref = _map_rtp_process[ssrc]; + RtpProcessHelper::Ptr &ref = _map_rtp_process[ssrc]; if(!ref){ - ref = std::make_shared(ssrc); + ref = std::make_shared(ssrc,shared_from_this()); + ref->attachEvent(); } - return ref; + return ref->getProcess(); } void RtpSelector::delProcess(uint32_t ssrc,const RtpProcess *ptr) { @@ -77,7 +78,7 @@ void RtpSelector::delProcess(uint32_t ssrc,const RtpProcess *ptr) { return; } - if(it->second.get() != ptr){ + if(it->second->getProcess().get() != ptr){ return; } @@ -87,7 +88,7 @@ void RtpSelector::delProcess(uint32_t ssrc,const RtpProcess *ptr) { void RtpSelector::onManager() { lock_guard lck(_mtx_map); for (auto it = _map_rtp_process.begin(); it != _map_rtp_process.end();) { - if (it->second->alive()) { + if (it->second->getProcess()->alive()) { ++it; continue; } @@ -102,5 +103,47 @@ RtpSelector::RtpSelector() { RtpSelector::~RtpSelector() { } +RtpProcessHelper::RtpProcessHelper(uint32_t ssrc, const weak_ptr &parent) { + _ssrc = ssrc; + _parent = parent; + _process = std::make_shared(_ssrc); +} + +RtpProcessHelper::~RtpProcessHelper() { +} + +void RtpProcessHelper::attachEvent() { + _process->setListener(shared_from_this()); +} + +bool RtpProcessHelper::close(MediaSource &sender, bool force) { + //此回调在其他线程触发 + if(!_process || (!force && _process->totalReaderCount())){ + return false; + } + auto parent = _parent.lock(); + if(!parent){ + return false; + } + parent->delProcess(_ssrc,_process.get()); + string err = StrPrinter << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force; + return true; +} + +void RtpProcessHelper::onNoneReader(MediaSource &sender) { + if(!_process || _process->totalReaderCount()){ + return; + } + MediaSourceEvent::onNoneReader(sender); +} + +int RtpProcessHelper::totalReaderCount(MediaSource &sender) { + return _process ? _process->totalReaderCount() : sender.totalReaderCount(); +} + +RtpProcess::Ptr &RtpProcessHelper::getProcess() { + return _process; +} + }//namespace mediakit #endif//defined(ENABLE_RTPPROXY) \ No newline at end of file diff --git a/src/Rtp/RtpSelector.h b/src/Rtp/RtpSelector.h index a701a83d..f1b4b97f 100644 --- a/src/Rtp/RtpSelector.h +++ b/src/Rtp/RtpSelector.h @@ -32,9 +32,31 @@ #include #include #include "RtpProcess.h" +#include "Common/MediaSource.h" namespace mediakit{ +class RtpSelector; +class RtpProcessHelper : public MediaSourceEvent , public std::enable_shared_from_this { +public: + typedef std::shared_ptr Ptr; + RtpProcessHelper(uint32_t ssrc,const weak_ptr &parent); + ~RtpProcessHelper(); + void attachEvent(); + RtpProcess::Ptr & getProcess(); +protected: + // 通知其停止推流 + bool close(MediaSource &sender,bool force) override; + // 通知无人观看 + void onNoneReader(MediaSource &sender) override; + // 观看总人数 + int totalReaderCount(MediaSource &sender) override; +private: + weak_ptr _parent; + RtpProcess::Ptr _process; + uint32_t _ssrc = 0; +}; + class RtpSelector : public std::enable_shared_from_this{ public: RtpSelector(); @@ -48,7 +70,7 @@ public: private: void onManager(); private: - unordered_map _map_rtp_process; + unordered_map _map_rtp_process; recursive_mutex _mtx_map; Ticker _last_rtp_time; }; diff --git a/src/Rtp/RtpSession.cpp b/src/Rtp/RtpSession.cpp index f807a85d..e638b88e 100644 --- a/src/Rtp/RtpSession.cpp +++ b/src/Rtp/RtpSession.cpp @@ -71,10 +71,34 @@ void RtpSession::onRtpPacket(const char *data, uint64_t len) { return; } _process = RtpSelector::Instance().getProcess(_ssrc, true); + _process->setListener(dynamic_pointer_cast(shared_from_this())); } _process->inputRtp(data + 2, len - 2, &addr); _ticker.resetTime(); } +bool RtpSession::close(MediaSource &sender, bool force) { + //此回调在其他线程触发 + if(!_process || (!force && _process->totalReaderCount())){ + return false; + } + string err = StrPrinter << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force; + safeShutdown(SockException(Err_shutdown,err)); + return true; +} + +void RtpSession::onNoneReader(MediaSource &sender) { + //此回调在其他线程触发 + if(!_process || _process->totalReaderCount()){ + return; + } + MediaSourceEvent::onNoneReader(sender); +} + +int RtpSession::totalReaderCount(MediaSource &sender) { + //此回调在其他线程触发 + return _process ? _process->totalReaderCount() : sender.totalReaderCount(); +} + }//namespace mediakit #endif//defined(ENABLE_RTPPROXY) \ No newline at end of file diff --git a/src/Rtp/RtpSession.h b/src/Rtp/RtpSession.h index ff8771a1..e8d2afed 100644 --- a/src/Rtp/RtpSession.h +++ b/src/Rtp/RtpSession.h @@ -36,14 +36,20 @@ using namespace toolkit; namespace mediakit{ -class RtpSession : public TcpSession , public RtpSplitter{ +class RtpSession : public TcpSession , public RtpSplitter , public MediaSourceEvent{ public: RtpSession(const Socket::Ptr &sock); ~RtpSession() override; void onRecv(const Buffer::Ptr &) override; void onError(const SockException &err) override; void onManager() override; -private: +protected: + // 通知其停止推流 + bool close(MediaSource &sender,bool force) override; + // 通知无人观看 + void onNoneReader(MediaSource &sender) override; + // 观看总人数 + int totalReaderCount(MediaSource &sender) override; void onRtpPacket(const char *data,uint64_t len) override; private: uint32_t _ssrc = 0;