diff --git a/.gitignore b/.gitignore index f5ec7af1..61eb6410 100644 --- a/.gitignore +++ b/.gitignore @@ -44,3 +44,4 @@ /3rdpart/media-server/.idea/ /build/ /3rdpart/media-server/.idea/ +/ios/ diff --git a/api/source/mk_media.cpp b/api/source/mk_media.cpp index 210034d3..a63e8d15 100755 --- a/api/source/mk_media.cpp +++ b/api/source/mk_media.cpp @@ -71,15 +71,6 @@ protected: return true; } - // 通知无人观看 - void onNoneReader(MediaSource &sender) override{ - if(_channel->totalReaderCount()){ - //统计有误,还有人在看 - return; - } - MediaSourceEvent::onNoneReader(sender); - } - // 观看总人数 int totalReaderCount(MediaSource &sender) override{ return _channel->totalReaderCount(); diff --git a/server/FFmpegSource.cpp b/server/FFmpegSource.cpp index e0e210d2..0c7a33d0 100644 --- a/server/FFmpegSource.cpp +++ b/server/FFmpegSource.cpp @@ -236,21 +236,12 @@ bool FFmpegSource::close(MediaSource &sender, bool force) { return true; } -void FFmpegSource::onNoneReader(MediaSource &sender) { - auto listener = _listener.lock(); - if(listener){ - listener->onNoneReader(sender); - }else{ - MediaSourceEvent::onNoneReader(sender); - } -} - int FFmpegSource::totalReaderCount(MediaSource &sender) { auto listener = _listener.lock(); if(listener){ return listener->totalReaderCount(sender); } - return 0; + return sender.readerCount(); } void FFmpegSource::onGetMediaSource(const MediaSource::Ptr &src) { diff --git a/server/FFmpegSource.h b/server/FFmpegSource.h index a11144d7..2b4fe27a 100644 --- a/server/FFmpegSource.h +++ b/server/FFmpegSource.h @@ -59,7 +59,6 @@ private: //MediaSourceEvent override bool close(MediaSource &sender,bool force) override; - void onNoneReader(MediaSource &sender) override ; int totalReaderCount(MediaSource &sender) override; private: Process _process; diff --git a/src/Common/MediaSource.cpp b/src/Common/MediaSource.cpp index 9d15ab09..e06fc7fe 100644 --- a/src/Common/MediaSource.cpp +++ b/src/Common/MediaSource.cpp @@ -117,7 +117,9 @@ void MediaSource::onNoneReader(){ if(!listener){ return; } - listener->onNoneReader(*this); + if (listener->totalReaderCount(*this) == 0) { + listener->onNoneReader(*this); + } } void MediaSource::for_each_media(const function &cb) { @@ -382,16 +384,31 @@ void MediaInfo::parse(const string &url){ void MediaSourceEvent::onNoneReader(MediaSource &sender){ //没有任何读取器消费该源,表明该源可以关闭了 - WarnL << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId(); - weak_ptr weakPtr = sender.shared_from_this(); + GET_CONFIG(int, stream_none_reader_delay, General::kStreamNoneReaderDelayMS); - //异步广播该事件,防止同步调用sender.close()导致在接收rtp或rtmp包时清空包缓存等操作 - EventPollerPool::Instance().getPoller()->async([weakPtr](){ - auto strongPtr = weakPtr.lock(); - if(strongPtr){ - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastStreamNoneReader,*strongPtr); + weak_ptr weakSender = sender.shared_from_this(); + _async_close_timer = std::make_shared(stream_none_reader_delay / 1000.0, [weakSender]() { + auto strongSender = weakSender.lock(); + if (!strongSender) { + //对象已经销毁 + return false; } - },false); + + if (strongSender->totalReaderCount() != 0) { + //还有人消费 + return false; + } + + WarnL << "onNoneReader:" + << strongSender->getSchema() << "/" + << strongSender->getVhost() << "/" + << strongSender->getApp() << "/" + << strongSender->getId(); + + //触发消息广播 + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastStreamNoneReader, *strongSender); + return false; + }, nullptr); } diff --git a/src/Common/MediaSource.h b/src/Common/MediaSource.h index e6920050..b431d314 100644 --- a/src/Common/MediaSource.h +++ b/src/Common/MediaSource.h @@ -52,24 +52,21 @@ namespace mediakit { class MediaSource; class MediaSourceEvent{ public: + friend class MediaSource; MediaSourceEvent(){}; virtual ~MediaSourceEvent(){}; // 通知拖动进度条 - virtual bool seekTo(MediaSource &sender,uint32_t ui32Stamp){ - return false; - } - + virtual bool seekTo(MediaSource &sender,uint32_t ui32Stamp){ return false; } // 通知其停止推流 - virtual bool close(MediaSource &sender,bool force) { - return false; - } - - // 通知无人观看 - virtual void onNoneReader(MediaSource &sender); - + virtual bool close(MediaSource &sender,bool force) { return false;} // 观看总人数 virtual int totalReaderCount(MediaSource &sender) = 0; +private: + // 通知无人观看 + void onNoneReader(MediaSource &sender); +private: + Timer::Ptr _async_close_timer; }; /** diff --git a/src/Player/PlayerProxy.cpp b/src/Player/PlayerProxy.cpp index 6547be7c..6b0b8be3 100644 --- a/src/Player/PlayerProxy.cpp +++ b/src/Player/PlayerProxy.cpp @@ -196,13 +196,6 @@ bool PlayerProxy::close(MediaSource &sender,bool force) { return true; } -void PlayerProxy::onNoneReader(MediaSource &sender) { - if(!_mediaMuxer || totalReaderCount()){ - return; - } - MediaSourceEvent::onNoneReader(sender); -} - int PlayerProxy::totalReaderCount(){ return (_mediaMuxer ? _mediaMuxer->totalReaderCount() : 0) + (_pMediaSrc ? _pMediaSrc->readerCount() : 0); } diff --git a/src/Player/PlayerProxy.h b/src/Player/PlayerProxy.h index 83bfe8e0..c1568c26 100644 --- a/src/Player/PlayerProxy.h +++ b/src/Player/PlayerProxy.h @@ -83,7 +83,6 @@ public: private: //MediaSourceEvent override bool close(MediaSource &sender,bool force) override; - void onNoneReader(MediaSource &sender) override; int totalReaderCount(MediaSource &sender) override; void rePlay(const string &strUrl,int iFailedCnt); void onPlaySuccess(); diff --git a/src/Record/HlsMediaSource.h b/src/Record/HlsMediaSource.h index 664781b1..823b20b9 100644 --- a/src/Record/HlsMediaSource.h +++ b/src/Record/HlsMediaSource.h @@ -79,7 +79,7 @@ private: return; } - if (--_readerCount == 0 && totalReaderCount() == 0) { + if (--_readerCount == 0) { onNoneReader(); } } diff --git a/src/Record/MP4Reader.cpp b/src/Record/MP4Reader.cpp index 795cbb09..0011d91b 100644 --- a/src/Record/MP4Reader.cpp +++ b/src/Record/MP4Reader.cpp @@ -190,13 +190,6 @@ bool MP4Reader::close(MediaSource &sender,bool force){ return true; } -void MP4Reader::onNoneReader(MediaSource &sender) { - if(!_mediaMuxer || _mediaMuxer->totalReaderCount()){ - return; - } - MediaSourceEvent::onNoneReader(sender); -} - int MP4Reader::totalReaderCount(MediaSource &sender) { return _mediaMuxer ? _mediaMuxer->totalReaderCount() : sender.readerCount(); } diff --git a/src/Record/MP4Reader.h b/src/Record/MP4Reader.h index 09d7ef9b..02ded721 100644 --- a/src/Record/MP4Reader.h +++ b/src/Record/MP4Reader.h @@ -77,7 +77,6 @@ private: //MediaSourceEvent override bool seekTo(MediaSource &sender,uint32_t ui32Stamp) override; bool close(MediaSource &sender,bool force) override; - void onNoneReader(MediaSource &sender) override; int totalReaderCount(MediaSource &sender) override; #ifdef ENABLE_MP4V2 void seek(uint32_t iSeekTime,bool bReStart = true); diff --git a/src/Rtmp/RtmpMediaSource.h b/src/Rtmp/RtmpMediaSource.h index a4f221e1..24a8302d 100644 --- a/src/Rtmp/RtmpMediaSource.h +++ b/src/Rtmp/RtmpMediaSource.h @@ -161,7 +161,6 @@ public: _track_stamps_map[pkt->typeId] = pkt->timeStamp; //不存在视频,为了减少缓存延时,那么关闭GOP缓存 _ring->write(pkt, _have_video ? pkt->isVideoKeyFrame() : true); - checkNoneReader(); } /** @@ -184,33 +183,15 @@ private: * 每次增减消费者都会触发该函数 */ void onReaderChanged(int size) { - //我们记录最后一次活动时间 - _reader_changed_ticker.resetTime(); - if (size != 0 || totalReaderCount() != 0) { - //还有消费者正在观看该流 - _async_emit_none_reader = false; - return; - } - _async_emit_none_reader = true; - } - - /** - * 检查是否无人消费该流, - * 如果无人消费且超过一定时间会触发onNoneReader事件 - */ - void checkNoneReader() { - GET_CONFIG(int, stream_none_reader_delay, General::kStreamNoneReaderDelayMS); - if (_async_emit_none_reader && _reader_changed_ticker.elapsedTime() > stream_none_reader_delay) { - _async_emit_none_reader = false; + if (size == 0) { onNoneReader(); } } -protected: + +private: int _ring_size; - bool _async_emit_none_reader = false; bool _have_video = false; mutable recursive_mutex _mtx; - Ticker _reader_changed_ticker; AMFValue _metadata; RingBuffer::Ptr _ring; unordered_map _track_stamps_map; diff --git a/src/Rtmp/RtmpSession.cpp b/src/Rtmp/RtmpSession.cpp index 27cfcbd3..70fafcda 100644 --- a/src/Rtmp/RtmpSession.cpp +++ b/src/Rtmp/RtmpSession.cpp @@ -544,14 +544,6 @@ bool RtmpSession::close(MediaSource &sender,bool force) { return true; } -void RtmpSession::onNoneReader(MediaSource &sender) { - //此回调在其他线程触发 - if(!_pPublisherSrc || _pPublisherSrc->totalReaderCount()){ - return; - } - MediaSourceEvent::onNoneReader(sender); -} - int RtmpSession::totalReaderCount(MediaSource &sender) { return _pPublisherSrc ? _pPublisherSrc->totalReaderCount() : sender.readerCount(); } diff --git a/src/Rtmp/RtmpSession.h b/src/Rtmp/RtmpSession.h index 63f9d877..42dc3980 100644 --- a/src/Rtmp/RtmpSession.h +++ b/src/Rtmp/RtmpSession.h @@ -85,7 +85,6 @@ private: //MediaSourceEvent override bool close(MediaSource &sender,bool force) override ; - void onNoneReader(MediaSource &sender) override; int totalReaderCount(MediaSource &sender) override; void setSocketFlags(); diff --git a/src/Rtp/RtpSelector.cpp b/src/Rtp/RtpSelector.cpp index 95c6e473..98d5f67a 100644 --- a/src/Rtp/RtpSelector.cpp +++ b/src/Rtp/RtpSelector.cpp @@ -142,13 +142,6 @@ bool RtpProcessHelper::close(MediaSource &sender, bool force) { return true; } -void RtpProcessHelper::onNoneReader(MediaSource &sender) { - if(!_process || _process->totalReaderCount()){ - return; - } - MediaSourceEvent::onNoneReader(sender); -} - int RtpProcessHelper::totalReaderCount(MediaSource &sender) { return _process ? _process->totalReaderCount() : sender.totalReaderCount(); } diff --git a/src/Rtp/RtpSelector.h b/src/Rtp/RtpSelector.h index 9edaca54..266c0c63 100644 --- a/src/Rtp/RtpSelector.h +++ b/src/Rtp/RtpSelector.h @@ -47,8 +47,6 @@ public: protected: // 通知其停止推流 bool close(MediaSource &sender,bool force) override; - // 通知无人观看 - void onNoneReader(MediaSource &sender) override; // 观看总人数 int totalReaderCount(MediaSource &sender) override; private: diff --git a/src/Rtp/RtpSession.cpp b/src/Rtp/RtpSession.cpp index e638b88e..ad7f875a 100644 --- a/src/Rtp/RtpSession.cpp +++ b/src/Rtp/RtpSession.cpp @@ -87,14 +87,6 @@ bool RtpSession::close(MediaSource &sender, bool force) { return true; } -void RtpSession::onNoneReader(MediaSource &sender) { - //此回调在其他线程触发 - if(!_process || _process->totalReaderCount()){ - return; - } - MediaSourceEvent::onNoneReader(sender); -} - int RtpSession::totalReaderCount(MediaSource &sender) { //此回调在其他线程触发 return _process ? _process->totalReaderCount() : sender.totalReaderCount(); diff --git a/src/Rtp/RtpSession.h b/src/Rtp/RtpSession.h index e8d2afed..8de14ad1 100644 --- a/src/Rtp/RtpSession.h +++ b/src/Rtp/RtpSession.h @@ -46,8 +46,6 @@ public: protected: // 通知其停止推流 bool close(MediaSource &sender,bool force) override; - // 通知无人观看 - void onNoneReader(MediaSource &sender) override; // 观看总人数 int totalReaderCount(MediaSource &sender) override; void onRtpPacket(const char *data,uint64_t len) override; diff --git a/src/Rtsp/RtspMediaSource.h b/src/Rtsp/RtspMediaSource.h index 51133560..d191cad5 100644 --- a/src/Rtsp/RtspMediaSource.h +++ b/src/Rtsp/RtspMediaSource.h @@ -191,39 +191,19 @@ public: } //不存在视频,为了减少缓存延时,那么关闭GOP缓存 _ring->write(rtp, _have_video ? keyPos : true); - checkNoneReader(); } private: /** * 每次增减消费者都会触发该函数 */ void onReaderChanged(int size) { - //我们记录最后一次活动时间 - _reader_changed_ticker.resetTime(); - if (size != 0 || totalReaderCount() != 0) { - //还有消费者正在观看该流 - _async_emit_none_reader = false; - return; - } - _async_emit_none_reader = true; - } - - /** - * 检查是否无人消费该流, - * 如果无人消费且超过一定时间会触发onNoneReader事件 - */ - void checkNoneReader() { - GET_CONFIG(int, stream_none_reader_delay, General::kStreamNoneReaderDelayMS); - if (_async_emit_none_reader && _reader_changed_ticker.elapsedTime() > stream_none_reader_delay) { - _async_emit_none_reader = false; + if (size == 0) { onNoneReader(); } } -protected: +private: int _ring_size; - bool _async_emit_none_reader = false; bool _have_video = false; - Ticker _reader_changed_ticker; SdpParser _sdp_parser; string _sdp; RingType::Ptr _ring; diff --git a/src/Rtsp/RtspSession.cpp b/src/Rtsp/RtspSession.cpp index a5e2db68..0c2e3f1a 100644 --- a/src/Rtsp/RtspSession.cpp +++ b/src/Rtsp/RtspSession.cpp @@ -1135,15 +1135,6 @@ bool RtspSession::close(MediaSource &sender,bool force) { return true; } - -void RtspSession::onNoneReader(MediaSource &sender){ - //此回调在其他线程触发 - if(!_pushSrc || _pushSrc->totalReaderCount()){ - return; - } - MediaSourceEvent::onNoneReader(sender); -} - int RtspSession::totalReaderCount(MediaSource &sender) { return _pushSrc ? _pushSrc->totalReaderCount() : sender.readerCount(); } diff --git a/src/Rtsp/RtspSession.h b/src/Rtsp/RtspSession.h index ae234493..3747fda8 100644 --- a/src/Rtsp/RtspSession.h +++ b/src/Rtsp/RtspSession.h @@ -107,7 +107,6 @@ protected: void onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx) override; //MediaSourceEvent override bool close(MediaSource &sender,bool force) override ; - void onNoneReader(MediaSource &sender) override; int totalReaderCount(MediaSource &sender) override; //TcpSession override