整理MediaSource派生类代码

This commit is contained in:
xiongziliang 2019-12-25 11:04:12 +08:00
parent bb4e8b73b5
commit 1bfe4937cd
10 changed files with 246 additions and 153 deletions

View File

@ -48,118 +48,161 @@ using namespace toolkit;
namespace mediakit {
/**
* rtmp媒体源的数据抽象
* rtmp有关键的三要素metadataconfig帧
* metadata是非必须的config帧(MP3)
* rtmp推流rtmp服务器就很简单了
* rtmp推拉流协议中metadataconfig帧
*/
class RtmpMediaSource : public MediaSource, public RingDelegate<RtmpPacket::Ptr> {
public:
typedef std::shared_ptr<RtmpMediaSource> Ptr;
typedef RingBuffer<RtmpPacket::Ptr> RingType;
/**
*
* @param vhost
* @param app
* @param stream_id id
* @param ring_size 0
*/
RtmpMediaSource(const string &vhost,
const string &strApp,
const string &strId,
int ringSize = 0) :
MediaSource(RTMP_SCHEMA,vhost,strApp,strId), _ringSize(ringSize) {
const string &app,
const string &stream_id,
int ring_size = 0) :
MediaSource(RTMP_SCHEMA, vhost, app, stream_id), _ring_size(ring_size) {
_metadata = TitleMeta().getMetadata();
}
virtual ~RtmpMediaSource() {}
/**
*
*/
const RingType::Ptr &getRing() const {
//获取媒体源的rtp环形缓冲
return _pRing;
return _ring;
}
/**
*
* @return
*/
int readerCount() override {
return _pRing ? _pRing->readerCount() : 0;
return _ring ? _ring->readerCount() : 0;
}
/**
* metadata
*/
const AMFValue &getMetaData() const {
lock_guard<recursive_mutex> lock(_mtxMap);
lock_guard<recursive_mutex> lock(_mtx);
return _metadata;
}
template<typename FUN>
void getConfigFrame(const FUN &f) {
lock_guard<recursive_mutex> lock(_mtxMap);
for (auto &pr : _mapCfgFrame) {
/**
* config帧
*/
template<typename FUNC>
void getConfigFrame(const FUNC &f) {
lock_guard<recursive_mutex> lock(_mtx);
for (auto &pr : _config_frame_map) {
f(pr.second);
}
}
virtual void onGetMetaData(const AMFValue &metadata) {
lock_guard<recursive_mutex> lock(_mtxMap);
/**
* metadata
*/
virtual void setMetaData(const AMFValue &metadata) {
lock_guard<recursive_mutex> lock(_mtx);
_metadata = metadata;
}
/**
* rtmp包
* @param pkt rtmp包
* @param isKey
*/
void onWrite(const RtmpPacket::Ptr &pkt, bool isKey = true) override {
lock_guard<recursive_mutex> lock(_mtxMap);
lock_guard<recursive_mutex> lock(_mtx);
if (pkt->isCfgFrame()) {
_mapCfgFrame[pkt->typeId] = pkt;
_config_frame_map[pkt->typeId] = pkt;
return;
}
if(!_pRing){
if (!_ring) {
weak_ptr<RtmpMediaSource> weakSelf = dynamic_pointer_cast<RtmpMediaSource>(shared_from_this());
_pRing = std::make_shared<RingType>(_ringSize,[weakSelf](const EventPoller::Ptr &,int size,bool){
auto lam = [weakSelf](const EventPoller::Ptr &, int size, bool) {
auto strongSelf = weakSelf.lock();
if (!strongSelf) {
return;
}
strongSelf->onReaderChanged(size);
});
};
_ring = std::make_shared<RingType>(_ring_size, std::move(lam));
onReaderChanged(0);
}
//如果输入了非config帧那么说明不再可能获取config帧以及metadata,所以我们强制其为已注册
if(!_registed){
_registed = true;
//如果输入了非config帧
//那么说明不再可能获取config帧以及metadata,
//所以我们强制其为已注册
regist();
}
_mapStamp[pkt->typeId] = pkt->timeStamp;
_pRing->write(pkt,pkt->isVideoKeyFrame());
_track_stamps_map[pkt->typeId] = pkt->timeStamp;
_ring->write(pkt, pkt->isVideoKeyFrame());
checkNoneReader();
}
/**
*
*/
uint32_t getTimeStamp(TrackType trackType) override {
lock_guard<recursive_mutex> lock(_mtxMap);
lock_guard<recursive_mutex> lock(_mtx);
switch (trackType) {
case TrackVideo:
return _mapStamp[MSG_VIDEO];
return _track_stamps_map[MSG_VIDEO];
case TrackAudio:
return _mapStamp[MSG_AUDIO];
return _track_stamps_map[MSG_AUDIO];
default:
return MAX(_mapStamp[MSG_VIDEO],_mapStamp[MSG_AUDIO]);
return MAX(_track_stamps_map[MSG_VIDEO], _track_stamps_map[MSG_AUDIO]);
}
}
private:
/**
*
*/
void onReaderChanged(int size) {
//我们记录最后一次活动时间
_readerTicker.resetTime();
_reader_changed_ticker.resetTime();
if (size != 0 || readerCount() != 0) {
//还有消费者正在观看该流
_asyncEmitNoneReader = false;
_async_emit_none_reader = false;
return;
}
_asyncEmitNoneReader = true;
_async_emit_none_reader = true;
}
/**
*
* onNoneReader事件
*/
void checkNoneReader() {
GET_CONFIG(int, stream_none_reader_delay, General::kStreamNoneReaderDelayMS);
if(_asyncEmitNoneReader && _readerTicker.elapsedTime() > stream_none_reader_delay){
_asyncEmitNoneReader = false;
if (_async_emit_none_reader && _reader_changed_ticker.elapsedTime() > stream_none_reader_delay) {
_async_emit_none_reader = false;
onNoneReader();
}
}
protected:
int _ring_size;
bool _async_emit_none_reader = false;
mutable recursive_mutex _mtx;
Ticker _reader_changed_ticker;
AMFValue _metadata;
unordered_map<int, RtmpPacket::Ptr> _mapCfgFrame;
unordered_map<int,uint32_t> _mapStamp;
mutable recursive_mutex _mtxMap;
RingBuffer<RtmpPacket::Ptr>::Ptr _pRing; //rtp环形缓冲
int _ringSize;
Ticker _readerTicker;
bool _asyncEmitNoneReader = false;
bool _registed = false;
RingBuffer<RtmpPacket::Ptr>::Ptr _ring;
unordered_map<int, uint32_t> _track_stamps_map;
unordered_map<int, RtmpPacket::Ptr> _config_frame_map;
};
} /* namespace mediakit */

View File

@ -54,7 +54,7 @@ public:
}
void onAllTrackReady(){
_mediaSouce->onGetMetaData(getMetadata());
_mediaSouce->setMetaData(getMetadata());
}
// 设置TrackSource

View File

@ -65,7 +65,7 @@ private:
bool onCheckMeta(const AMFValue &val) override {
_pRtmpMediaSrc = dynamic_pointer_cast<RtmpMediaSource>(_pMediaSrc);
if(_pRtmpMediaSrc){
_pRtmpMediaSrc->onGetMetaData(val);
_pRtmpMediaSrc->setMetaData(val);
}
_delegate.reset(new RtmpDemuxer(val));
return true;

View File

@ -438,7 +438,7 @@ void RtmpSession::setMetaData(AMFDecoder &dec) {
}
auto metadata = dec.load<AMFValue>();
// dumpMetadata(metadata);
_pPublisherSrc->onGetMetaData(metadata);
_pPublisherSrc->setMetaData(metadata);
}
void RtmpSession::onProcessCmd(AMFDecoder &dec) {

View File

@ -57,12 +57,12 @@ public:
}
virtual ~RtmpToRtspMediaSource(){}
void onGetMetaData(const AMFValue &metadata) override {
void setMetaData(const AMFValue &metadata) override {
if(!_demuxer){
//在未调用onWrite前设置Metadata能触发生成RtmpDemuxer
_demuxer = std::make_shared<RtmpDemuxer>(metadata);
}
RtmpMediaSource::onGetMetaData(metadata);
RtmpMediaSource::setMetaData(metadata);
}
void onWrite(const RtmpPacket::Ptr &pkt,bool key_pos = true) override {

View File

@ -48,129 +48,179 @@ using namespace toolkit;
namespace mediakit {
/**
* rtsp媒体源的数据抽象
* rtsp有关键的两要素sdprtp包
* rtsp推流rtsp服务器就很简单了
* rtsp推拉流协议中sdp(tcp/udp/)rtp
*/
class RtspMediaSource : public MediaSource, public RingDelegate<RtpPacket::Ptr> {
public:
typedef ResourcePool<RtpPacket> PoolType;
typedef std::shared_ptr<RtspMediaSource> Ptr;
typedef RingBuffer<RtpPacket::Ptr> RingType;
RtspMediaSource(const string &strVhost,
const string &strApp,
const string &strId,
int ringSize = 0) :
MediaSource(RTSP_SCHEMA,strVhost,strApp,strId),
_ringSize(ringSize){}
/**
*
* @param vhost
* @param app
* @param stream_id id
* @param ring_size 0
*/
RtspMediaSource(const string &vhost,
const string &app,
const string &stream_id,
int ring_size = 0) :
MediaSource(RTSP_SCHEMA, vhost, app, stream_id), _ring_size(ring_size) {}
virtual ~RtspMediaSource() {}
/**
*
*/
const RingType::Ptr &getRing() const {
//获取媒体源的rtp环形缓冲
return _pRing;
return _ring;
}
/**
*
*/
int readerCount() override {
return _pRing ? _pRing->readerCount() : 0;
return _ring ? _ring->readerCount() : 0;
}
/**
* sdp
*/
const string &getSdp() const {
//获取该源的媒体描述信息
return _strSdp;
return _sdp;
}
/**
* ssrc
*/
virtual uint32_t getSsrc(TrackType trackType) {
auto track = _sdpParser.getTrack(trackType);
auto track = _sdp_parser.getTrack(trackType);
if (!track) {
return 0;
}
return track->_ssrc;
}
/**
* seqence
*/
virtual uint16_t getSeqence(TrackType trackType) {
auto track = _sdpParser.getTrack(trackType);
auto track = _sdp_parser.getTrack(trackType);
if (!track) {
return 0;
}
return track->_seq;
}
/**
*
*/
uint32_t getTimeStamp(TrackType trackType) override {
auto track = _sdpParser.getTrack(trackType);
auto track = _sdp_parser.getTrack(trackType);
if (track) {
return track->_time_stamp;
}
auto tracks = _sdpParser.getAvailableTrack();
auto tracks = _sdp_parser.getAvailableTrack();
switch (tracks.size()) {
case 0: return 0;
case 1: return tracks[0]->_time_stamp;
default:return MAX(tracks[0]->_time_stamp,tracks[1]->_time_stamp);
case 0:
return 0;
case 1:
return tracks[0]->_time_stamp;
default:
return MAX(tracks[0]->_time_stamp, tracks[1]->_time_stamp);
}
}
/**
*
*/
virtual void setTimeStamp(uint32_t uiStamp) {
auto tracks = _sdpParser.getAvailableTrack();
auto tracks = _sdp_parser.getAvailableTrack();
for (auto &track : tracks) {
track->_time_stamp = uiStamp;
}
}
virtual void onGetSDP(const string& sdp) {
//派生类设置该媒体源媒体描述信息
_strSdp = sdp;
_sdpParser.load(sdp);
if(_pRing){
/**
* sdp
*/
virtual void setSdp(const string &sdp) {
_sdp = sdp;
_sdp_parser.load(sdp);
if (_ring) {
regist();
}
}
void onWrite(const RtpPacket::Ptr &rtppt, bool keyPos) override {
auto track = _sdpParser.getTrack(rtppt->type);
/**
* rtp
* @param rtp rtp包
* @param keyPos
*/
void onWrite(const RtpPacket::Ptr &rtp, bool keyPos) override {
auto track = _sdp_parser.getTrack(rtp->type);
if (track) {
track->_seq = rtppt->sequence;
track->_time_stamp = rtppt->timeStamp;
track->_ssrc = rtppt->ssrc;
track->_seq = rtp->sequence;
track->_time_stamp = rtp->timeStamp;
track->_ssrc = rtp->ssrc;
}
if(!_pRing){
if (!_ring) {
weak_ptr<RtspMediaSource> weakSelf = dynamic_pointer_cast<RtspMediaSource>(shared_from_this());
_pRing = std::make_shared<RingType>(_ringSize,[weakSelf](const EventPoller::Ptr &,int size,bool){
auto lam = [weakSelf](const EventPoller::Ptr &, int size, bool) {
auto strongSelf = weakSelf.lock();
if (!strongSelf) {
return;
}
strongSelf->onReaderChanged(size);
});
};
_ring = std::make_shared<RingType>(_ring_size, std::move(lam));
onReaderChanged(0);
if(!_strSdp.empty()){
if (!_sdp.empty()) {
regist();
}
}
_pRing->write(rtppt,keyPos);
_ring->write(rtp, keyPos);
checkNoneReader();
}
private:
/**
*
*/
void onReaderChanged(int size) {
//我们记录最后一次活动时间
_readerTicker.resetTime();
_reader_changed_ticker.resetTime();
if (size != 0 || readerCount() != 0) {
//还有消费者正在观看该流
_asyncEmitNoneReader = false;
_async_emit_none_reader = false;
return;
}
_asyncEmitNoneReader = true;
_async_emit_none_reader = true;
}
/**
*
* onNoneReader事件
*/
void checkNoneReader() {
GET_CONFIG(int, stream_none_reader_delay, General::kStreamNoneReaderDelayMS);
if(_asyncEmitNoneReader && _readerTicker.elapsedTime() > stream_none_reader_delay){
_asyncEmitNoneReader = false;
if (_async_emit_none_reader && _reader_changed_ticker.elapsedTime() > stream_none_reader_delay) {
_async_emit_none_reader = false;
onNoneReader();
}
}
protected:
SdpParser _sdpParser;
string _strSdp; //媒体描述信息
RingType::Ptr _pRing; //rtp环形缓冲
int _ringSize;
Ticker _readerTicker;
bool _asyncEmitNoneReader = false;
int _ring_size;
bool _async_emit_none_reader = false;
Ticker _reader_changed_ticker;
SdpParser _sdp_parser;
string _sdp;
RingType::Ptr _ring;
};
} /* namespace mediakit */

View File

@ -58,7 +58,7 @@ public:
}
void onAllTrackReady(){
_mediaSouce->onGetSDP(getSdp());
_mediaSouce->setSdp(getSdp());
}
// 设置TrackSource

View File

@ -64,7 +64,7 @@ private:
bool onCheckSDP(const string &sdp) override {
_pRtspMediaSrc = dynamic_pointer_cast<RtspMediaSource>(_pMediaSrc);
if(_pRtspMediaSrc){
_pRtspMediaSrc->onGetSDP(sdp);
_pRtspMediaSrc->setSdp(sdp);
}
_delegate.reset(new RtspDemuxer(sdp));
return true;

View File

@ -265,7 +265,7 @@ void RtspSession::handleReq_ANNOUNCE(const Parser &parser) {
_pushSrc = std::make_shared<RtspToRtmpMediaSource>(_mediaInfo._vhost,_mediaInfo._app,_mediaInfo._streamid);
_pushSrc->setListener(dynamic_pointer_cast<MediaSourceEvent>(shared_from_this()));
_pushSrc->onGetSDP(sdpParser.toString());
_pushSrc->setSdp(sdpParser.toString());
sendRtspResponse("200 OK",{"Content-Base",_strContentBase + "/"});
}

View File

@ -49,9 +49,9 @@ public:
virtual ~RtspToRtmpMediaSource() {}
virtual void onGetSDP(const string &strSdp) override {
virtual void setSdp(const string &strSdp) override {
_demuxer = std::make_shared<RtspDemuxer>(strSdp);
RtspMediaSource::onGetSDP(strSdp);
RtspMediaSource::setSdp(strSdp);
}
virtual void onWrite(const RtpPacket::Ptr &rtp, bool bKeyPos) override {