From 1bfe4937cd947727246852bc00af0e881f939ffb Mon Sep 17 00:00:00 2001 From: xiongziliang <771730766@qq.com> Date: Wed, 25 Dec 2019 11:04:12 +0800 Subject: [PATCH] =?UTF-8?q?=E6=95=B4=E7=90=86MediaSource=E6=B4=BE=E7=94=9F?= =?UTF-8?q?=E7=B1=BB=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Rtmp/RtmpMediaSource.h | 177 +++++++++++++++++---------- src/Rtmp/RtmpMediaSourceMuxer.h | 2 +- src/Rtmp/RtmpPlayerImp.h | 2 +- src/Rtmp/RtmpSession.cpp | 2 +- src/Rtmp/RtmpToRtspMediaSource.h | 4 +- src/Rtsp/RtspMediaSource.h | 202 +++++++++++++++++++------------ src/Rtsp/RtspMediaSourceMuxer.h | 2 +- src/Rtsp/RtspPlayerImp.h | 2 +- src/Rtsp/RtspSession.cpp | 2 +- src/Rtsp/RtspToRtmpMediaSource.h | 4 +- 10 files changed, 246 insertions(+), 153 deletions(-) diff --git a/src/Rtmp/RtmpMediaSource.h b/src/Rtmp/RtmpMediaSource.h index e9f31915..77c014eb 100644 --- a/src/Rtmp/RtmpMediaSource.h +++ b/src/Rtmp/RtmpMediaSource.h @@ -48,118 +48,161 @@ using namespace toolkit; namespace mediakit { -class RtmpMediaSource: public MediaSource ,public RingDelegate { +/** + * rtmp媒体源的数据抽象 + * rtmp有关键的三要素,分别是metadata、config帧,普通帧 + * 其中metadata是非必须的,有些编码格式也没有config帧(比如MP3) + * 只要生成了这三要素,那么要实现rtmp推流、rtmp服务器就很简单了 + * rtmp推拉流协议中,先传递metadata,然后传递config帧,然后一直传递普通帧 + */ +class RtmpMediaSource : public MediaSource, public RingDelegate { public: typedef std::shared_ptr Ptr; typedef RingBuffer RingType; + /** + * 构造函数 + * @param vhost 虚拟主机名 + * @param app 应用名 + * @param stream_id 流id + * @param ring_size 可以设置固定的环形缓冲大小,0则自适应 + */ RtmpMediaSource(const string &vhost, - const string &strApp, - const string &strId, - int ringSize = 0) : - MediaSource(RTMP_SCHEMA,vhost,strApp,strId), _ringSize(ringSize) { + const string &app, + const string &stream_id, + int ring_size = 0) : + MediaSource(RTMP_SCHEMA, vhost, app, stream_id), _ring_size(ring_size) { _metadata = TitleMeta().getMetadata(); } virtual ~RtmpMediaSource() {} + /** + * 获取媒体源的环形缓冲 + */ const RingType::Ptr &getRing() const { - //获取媒体源的rtp环形缓冲 - return _pRing; + return _ring; } + /** + * 获取播放器个数 + * @return + */ int readerCount() override { - return _pRing ? _pRing->readerCount() : 0; + return _ring ? _ring->readerCount() : 0; } + /** + * 获取metadata + */ const AMFValue &getMetaData() const { - lock_guard lock(_mtxMap); + lock_guard lock(_mtx); return _metadata; } - template - void getConfigFrame(const FUN &f) { - lock_guard lock(_mtxMap); - for (auto &pr : _mapCfgFrame) { + + /** + * 获取所有的config帧 + */ + template + void getConfigFrame(const FUNC &f) { + lock_guard lock(_mtx); + for (auto &pr : _config_frame_map) { f(pr.second); } } - virtual void onGetMetaData(const AMFValue &metadata) { - lock_guard lock(_mtxMap); + /** + * 设置metadata + */ + virtual void setMetaData(const AMFValue &metadata) { + lock_guard lock(_mtx); _metadata = metadata; } - void onWrite(const RtmpPacket::Ptr &pkt,bool isKey = true) override { - lock_guard lock(_mtxMap); + /** + * 输入rtmp包 + * @param pkt rtmp包 + * @param isKey 是否为关键帧 + */ + void onWrite(const RtmpPacket::Ptr &pkt, bool isKey = true) override { + lock_guard lock(_mtx); if (pkt->isCfgFrame()) { - _mapCfgFrame[pkt->typeId] = pkt; - return; + _config_frame_map[pkt->typeId] = pkt; + return; } - if(!_pRing){ - weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - _pRing = std::make_shared(_ringSize,[weakSelf](const EventPoller::Ptr &,int size,bool){ - auto strongSelf = weakSelf.lock(); - if(!strongSelf){ - return; - } - strongSelf->onReaderChanged(size); - }); - onReaderChanged(0); - } + if (!_ring) { + weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); + auto lam = [weakSelf](const EventPoller::Ptr &, int size, bool) { + auto strongSelf = weakSelf.lock(); + if (!strongSelf) { + return; + } + strongSelf->onReaderChanged(size); + }; + _ring = std::make_shared(_ring_size, std::move(lam)); + onReaderChanged(0); - //如果输入了非config帧,那么说明不再可能获取config帧以及metadata,所以我们强制其为已注册 - if(!_registed){ - _registed = true; + //如果输入了非config帧, + //那么说明不再可能获取config帧以及metadata, + //所以我们强制其为已注册 regist(); } - - _mapStamp[pkt->typeId] = pkt->timeStamp; - _pRing->write(pkt,pkt->isVideoKeyFrame()); + _track_stamps_map[pkt->typeId] = pkt->timeStamp; + _ring->write(pkt, pkt->isVideoKeyFrame()); checkNoneReader(); - } + } + /** + * 获取当前时间戳 + */ uint32_t getTimeStamp(TrackType trackType) override { - lock_guard lock(_mtxMap); - switch (trackType){ + lock_guard lock(_mtx); + switch (trackType) { case TrackVideo: - return _mapStamp[MSG_VIDEO]; + return _track_stamps_map[MSG_VIDEO]; case TrackAudio: - return _mapStamp[MSG_AUDIO]; + return _track_stamps_map[MSG_AUDIO]; default: - return MAX(_mapStamp[MSG_VIDEO],_mapStamp[MSG_AUDIO]); + return MAX(_track_stamps_map[MSG_VIDEO], _track_stamps_map[MSG_AUDIO]); } } private: - void onReaderChanged(int size){ - //我们记录最后一次活动时间 - _readerTicker.resetTime(); - if(size != 0 || readerCount() != 0){ - //还有消费者正在观看该流 - _asyncEmitNoneReader = false; - return; - } - _asyncEmitNoneReader = true; - } + /** + * 每次增减消费者都会触发该函数 + */ + void onReaderChanged(int size) { + //我们记录最后一次活动时间 + _reader_changed_ticker.resetTime(); + if (size != 0 || readerCount() != 0) { + //还有消费者正在观看该流 + _async_emit_none_reader = false; + return; + } + _async_emit_none_reader = true; + } - void checkNoneReader(){ - GET_CONFIG(int,stream_none_reader_delay,General::kStreamNoneReaderDelayMS); - if(_asyncEmitNoneReader && _readerTicker.elapsedTime() > stream_none_reader_delay){ - _asyncEmitNoneReader = false; - onNoneReader(); - } - } + /** + * 检查是否无人消费该流, + * 如果无人消费且超过一定时间会触发onNoneReader事件 + */ + void checkNoneReader() { + GET_CONFIG(int, stream_none_reader_delay, General::kStreamNoneReaderDelayMS); + if (_async_emit_none_reader && _reader_changed_ticker.elapsedTime() > stream_none_reader_delay) { + _async_emit_none_reader = false; + onNoneReader(); + } + } protected: + int _ring_size; + bool _async_emit_none_reader = false; + mutable recursive_mutex _mtx; + Ticker _reader_changed_ticker; AMFValue _metadata; - unordered_map _mapCfgFrame; - unordered_map _mapStamp; - mutable recursive_mutex _mtxMap; - RingBuffer::Ptr _pRing; //rtp环形缓冲 - int _ringSize; - Ticker _readerTicker; - bool _asyncEmitNoneReader = false; - bool _registed = false; + RingBuffer::Ptr _ring; + unordered_map _track_stamps_map; + unordered_map _config_frame_map; }; } /* namespace mediakit */ diff --git a/src/Rtmp/RtmpMediaSourceMuxer.h b/src/Rtmp/RtmpMediaSourceMuxer.h index 55112dea..1ddb082a 100644 --- a/src/Rtmp/RtmpMediaSourceMuxer.h +++ b/src/Rtmp/RtmpMediaSourceMuxer.h @@ -54,7 +54,7 @@ public: } void onAllTrackReady(){ - _mediaSouce->onGetMetaData(getMetadata()); + _mediaSouce->setMetaData(getMetadata()); } // 设置TrackSource diff --git a/src/Rtmp/RtmpPlayerImp.h b/src/Rtmp/RtmpPlayerImp.h index b170a19f..3bcb5067 100644 --- a/src/Rtmp/RtmpPlayerImp.h +++ b/src/Rtmp/RtmpPlayerImp.h @@ -65,7 +65,7 @@ private: bool onCheckMeta(const AMFValue &val) override { _pRtmpMediaSrc = dynamic_pointer_cast(_pMediaSrc); if(_pRtmpMediaSrc){ - _pRtmpMediaSrc->onGetMetaData(val); + _pRtmpMediaSrc->setMetaData(val); } _delegate.reset(new RtmpDemuxer(val)); return true; diff --git a/src/Rtmp/RtmpSession.cpp b/src/Rtmp/RtmpSession.cpp index 93ac1929..f7f3632a 100644 --- a/src/Rtmp/RtmpSession.cpp +++ b/src/Rtmp/RtmpSession.cpp @@ -438,7 +438,7 @@ void RtmpSession::setMetaData(AMFDecoder &dec) { } auto metadata = dec.load(); // dumpMetadata(metadata); - _pPublisherSrc->onGetMetaData(metadata); + _pPublisherSrc->setMetaData(metadata); } void RtmpSession::onProcessCmd(AMFDecoder &dec) { diff --git a/src/Rtmp/RtmpToRtspMediaSource.h b/src/Rtmp/RtmpToRtspMediaSource.h index 31df804f..5acd206d 100644 --- a/src/Rtmp/RtmpToRtspMediaSource.h +++ b/src/Rtmp/RtmpToRtspMediaSource.h @@ -57,12 +57,12 @@ public: } virtual ~RtmpToRtspMediaSource(){} - void onGetMetaData(const AMFValue &metadata) override { + void setMetaData(const AMFValue &metadata) override { if(!_demuxer){ //在未调用onWrite前,设置Metadata能触发生成RtmpDemuxer _demuxer = std::make_shared(metadata); } - RtmpMediaSource::onGetMetaData(metadata); + RtmpMediaSource::setMetaData(metadata); } void onWrite(const RtmpPacket::Ptr &pkt,bool key_pos = true) override { diff --git a/src/Rtsp/RtspMediaSource.h b/src/Rtsp/RtspMediaSource.h index ef888ff7..b63b7be8 100644 --- a/src/Rtsp/RtspMediaSource.h +++ b/src/Rtsp/RtspMediaSource.h @@ -48,129 +48,179 @@ using namespace toolkit; namespace mediakit { -class RtspMediaSource: public MediaSource , public RingDelegate { +/** + * rtsp媒体源的数据抽象 + * rtsp有关键的两要素,分别是sdp、rtp包 + * 只要生成了这两要素,那么要实现rtsp推流、rtsp服务器就很简单了 + * rtsp推拉流协议中,先传递sdp,然后再协商传输方式(tcp/udp/组播),最后一直传递rtp + */ +class RtspMediaSource : public MediaSource, public RingDelegate { public: typedef ResourcePool PoolType; typedef std::shared_ptr Ptr; typedef RingBuffer RingType; - RtspMediaSource(const string &strVhost, - const string &strApp, - const string &strId, - int ringSize = 0) : - MediaSource(RTSP_SCHEMA,strVhost,strApp,strId), - _ringSize(ringSize){} + /** + * 构造函数 + * @param vhost 虚拟主机名 + * @param app 应用名 + * @param stream_id 流id + * @param ring_size 可以设置固定的环形缓冲大小,0则自适应 + */ + RtspMediaSource(const string &vhost, + const string &app, + const string &stream_id, + int ring_size = 0) : + MediaSource(RTSP_SCHEMA, vhost, app, stream_id), _ring_size(ring_size) {} virtual ~RtspMediaSource() {} + /** + * 获取媒体源的环形缓冲 + */ const RingType::Ptr &getRing() const { - //获取媒体源的rtp环形缓冲 - return _pRing; + return _ring; } - int readerCount() override { - return _pRing ? _pRing->readerCount() : 0; + /** + * 获取播放器个数 + */ + int readerCount() override { + return _ring ? _ring->readerCount() : 0; } - const string& getSdp() const { - //获取该源的媒体描述信息 - return _strSdp; + /** + * 获取该源的sdp + */ + const string &getSdp() const { + return _sdp; } + /** + * 获取相应轨道的ssrc + */ virtual uint32_t getSsrc(TrackType trackType) { - auto track = _sdpParser.getTrack(trackType); - if(!track){ + auto track = _sdp_parser.getTrack(trackType); + if (!track) { return 0; } return track->_ssrc; } + + /** + * 获取相应轨道的seqence + */ virtual uint16_t getSeqence(TrackType trackType) { - auto track = _sdpParser.getTrack(trackType); - if(!track){ + auto track = _sdp_parser.getTrack(trackType); + if (!track) { return 0; } return track->_seq; } + /** + * 获取相应轨道的时间戳,单位毫秒 + */ uint32_t getTimeStamp(TrackType trackType) override { - auto track = _sdpParser.getTrack(trackType); - if(track) { + auto track = _sdp_parser.getTrack(trackType); + if (track) { return track->_time_stamp; } - auto tracks = _sdpParser.getAvailableTrack(); - switch (tracks.size()){ - case 0: return 0; - case 1: return tracks[0]->_time_stamp; - default:return MAX(tracks[0]->_time_stamp,tracks[1]->_time_stamp); + auto tracks = _sdp_parser.getAvailableTrack(); + switch (tracks.size()) { + case 0: + return 0; + case 1: + return tracks[0]->_time_stamp; + default: + return MAX(tracks[0]->_time_stamp, tracks[1]->_time_stamp); } } + /** + * 更新时间戳 + */ virtual void setTimeStamp(uint32_t uiStamp) { - auto tracks = _sdpParser.getAvailableTrack(); + auto tracks = _sdp_parser.getAvailableTrack(); for (auto &track : tracks) { - track->_time_stamp = uiStamp; + track->_time_stamp = uiStamp; } } - virtual void onGetSDP(const string& sdp) { - //派生类设置该媒体源媒体描述信息 - _strSdp = sdp; - _sdpParser.load(sdp); - if(_pRing){ - regist(); + /** + * 设置sdp + */ + virtual void setSdp(const string &sdp) { + _sdp = sdp; + _sdp_parser.load(sdp); + if (_ring) { + regist(); } } - void onWrite(const RtpPacket::Ptr &rtppt, bool keyPos) override { - auto track = _sdpParser.getTrack(rtppt->type); - if(track){ - track->_seq = rtppt->sequence; - track->_time_stamp = rtppt->timeStamp; - track->_ssrc = rtppt->ssrc; + /** + * 输入rtp + * @param rtp rtp包 + * @param keyPos 该包是否为关键帧的第一个包 + */ + void onWrite(const RtpPacket::Ptr &rtp, bool keyPos) override { + auto track = _sdp_parser.getTrack(rtp->type); + if (track) { + track->_seq = rtp->sequence; + track->_time_stamp = rtp->timeStamp; + track->_ssrc = rtp->ssrc; } - if(!_pRing){ - weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - _pRing = std::make_shared(_ringSize,[weakSelf](const EventPoller::Ptr &,int size,bool){ - auto strongSelf = weakSelf.lock(); - if(!strongSelf){ - return; - } - strongSelf->onReaderChanged(size); - }); - onReaderChanged(0); - if(!_strSdp.empty()){ - regist(); - } + if (!_ring) { + weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); + auto lam = [weakSelf](const EventPoller::Ptr &, int size, bool) { + auto strongSelf = weakSelf.lock(); + if (!strongSelf) { + return; + } + strongSelf->onReaderChanged(size); + }; + _ring = std::make_shared(_ring_size, std::move(lam)); + onReaderChanged(0); + if (!_sdp.empty()) { + regist(); + } } - _pRing->write(rtppt,keyPos); - checkNoneReader(); + _ring->write(rtp, keyPos); + checkNoneReader(); } private: - void onReaderChanged(int size){ - //我们记录最后一次活动时间 - _readerTicker.resetTime(); - if(size != 0 || readerCount() != 0){ - //还有消费者正在观看该流 - _asyncEmitNoneReader = false; - return; - } - _asyncEmitNoneReader = true; - } + /** + * 每次增减消费者都会触发该函数 + */ + void onReaderChanged(int size) { + //我们记录最后一次活动时间 + _reader_changed_ticker.resetTime(); + if (size != 0 || readerCount() != 0) { + //还有消费者正在观看该流 + _async_emit_none_reader = false; + return; + } + _async_emit_none_reader = true; + } - void checkNoneReader(){ - GET_CONFIG(int,stream_none_reader_delay,General::kStreamNoneReaderDelayMS); - if(_asyncEmitNoneReader && _readerTicker.elapsedTime() > stream_none_reader_delay){ - _asyncEmitNoneReader = false; - onNoneReader(); - } + /** + * 检查是否无人消费该流, + * 如果无人消费且超过一定时间会触发onNoneReader事件 + */ + void checkNoneReader() { + GET_CONFIG(int, stream_none_reader_delay, General::kStreamNoneReaderDelayMS); + if (_async_emit_none_reader && _reader_changed_ticker.elapsedTime() > stream_none_reader_delay) { + _async_emit_none_reader = false; + onNoneReader(); + } } protected: - SdpParser _sdpParser; - string _strSdp; //媒体描述信息 - RingType::Ptr _pRing; //rtp环形缓冲 - int _ringSize; - Ticker _readerTicker; - bool _asyncEmitNoneReader = false; + int _ring_size; + bool _async_emit_none_reader = false; + Ticker _reader_changed_ticker; + SdpParser _sdp_parser; + string _sdp; + RingType::Ptr _ring; }; } /* namespace mediakit */ diff --git a/src/Rtsp/RtspMediaSourceMuxer.h b/src/Rtsp/RtspMediaSourceMuxer.h index 050e83c2..834374e6 100644 --- a/src/Rtsp/RtspMediaSourceMuxer.h +++ b/src/Rtsp/RtspMediaSourceMuxer.h @@ -58,7 +58,7 @@ public: } void onAllTrackReady(){ - _mediaSouce->onGetSDP(getSdp()); + _mediaSouce->setSdp(getSdp()); } // 设置TrackSource diff --git a/src/Rtsp/RtspPlayerImp.h b/src/Rtsp/RtspPlayerImp.h index 8992bded..fe6eff40 100644 --- a/src/Rtsp/RtspPlayerImp.h +++ b/src/Rtsp/RtspPlayerImp.h @@ -64,7 +64,7 @@ private: bool onCheckSDP(const string &sdp) override { _pRtspMediaSrc = dynamic_pointer_cast(_pMediaSrc); if(_pRtspMediaSrc){ - _pRtspMediaSrc->onGetSDP(sdp); + _pRtspMediaSrc->setSdp(sdp); } _delegate.reset(new RtspDemuxer(sdp)); return true; diff --git a/src/Rtsp/RtspSession.cpp b/src/Rtsp/RtspSession.cpp index 1d679e23..993ae7b3 100644 --- a/src/Rtsp/RtspSession.cpp +++ b/src/Rtsp/RtspSession.cpp @@ -265,7 +265,7 @@ void RtspSession::handleReq_ANNOUNCE(const Parser &parser) { _pushSrc = std::make_shared(_mediaInfo._vhost,_mediaInfo._app,_mediaInfo._streamid); _pushSrc->setListener(dynamic_pointer_cast(shared_from_this())); - _pushSrc->onGetSDP(sdpParser.toString()); + _pushSrc->setSdp(sdpParser.toString()); sendRtspResponse("200 OK",{"Content-Base",_strContentBase + "/"}); } diff --git a/src/Rtsp/RtspToRtmpMediaSource.h b/src/Rtsp/RtspToRtmpMediaSource.h index c4cdaf18..f27997f8 100644 --- a/src/Rtsp/RtspToRtmpMediaSource.h +++ b/src/Rtsp/RtspToRtmpMediaSource.h @@ -49,9 +49,9 @@ public: virtual ~RtspToRtmpMediaSource() {} - virtual void onGetSDP(const string &strSdp) override { + virtual void setSdp(const string &strSdp) override { _demuxer = std::make_shared(strSdp); - RtspMediaSource::onGetSDP(strSdp); + RtspMediaSource::setSdp(strSdp); } virtual void onWrite(const RtpPacket::Ptr &rtp, bool bKeyPos) override {