解决rtmp过早注册的问题

This commit is contained in:
xiongziliang 2020-01-13 15:48:55 +08:00
parent 66ec67bfb9
commit b55db11de3
7 changed files with 75 additions and 39 deletions

View File

@ -26,7 +26,11 @@
#include "MediaSink.h" #include "MediaSink.h"
//最多等待未初始化的Track 10秒超时之后会忽略未初始化的Track //最多等待未初始化的Track 10秒超时之后会忽略未初始化的Track
#define MAX_WAIT_MS 10000 #define MAX_WAIT_MS_READY 10000
//如果添加Track最多等待3秒
#define MAX_WAIT_MS_ADD_TRACK 3000
namespace mediakit{ namespace mediakit{
@ -34,23 +38,16 @@ void MediaSink::addTrack(const Track::Ptr &track_in) {
lock_guard<recursive_mutex> lck(_mtx); lock_guard<recursive_mutex> lck(_mtx);
//克隆Track只拷贝其数据不拷贝其数据转发关系 //克隆Track只拷贝其数据不拷贝其数据转发关系
auto track = track_in->clone(); auto track = track_in->clone();
auto codec_id = track->getCodecId(); auto codec_id = track->getCodecId();
_track_map[codec_id] = track; _track_map[codec_id] = track;
auto lam = [this,track](){ _allTrackReady = false;
_trackReadyCallback[codec_id] = [this, track]() {
onTrackReady(track); onTrackReady(track);
}; };
if(track->ready()){
lam();
}else{
_anyTrackUnReady = true;
_allTrackReady = false;
_trackReadyCallback[codec_id] = lam;
_ticker.resetTime(); _ticker.resetTime();
}
track->addDelegate(std::make_shared<FrameWriterInterfaceHelper>([this](const Frame::Ptr &frame){ track->addDelegate(std::make_shared<FrameWriterInterfaceHelper>([this](const Frame::Ptr &frame) {
if(!_anyTrackUnReady){ if (_allTrackReady) {
onTrackFrame(frame); onTrackFrame(frame);
} }
})); }));
@ -58,7 +55,6 @@ void MediaSink::addTrack(const Track::Ptr &track_in) {
void MediaSink::resetTracks() { void MediaSink::resetTracks() {
lock_guard<recursive_mutex> lck(_mtx); lock_guard<recursive_mutex> lck(_mtx);
_anyTrackUnReady = false;
_allTrackReady = false; _allTrackReady = false;
_track_map.clear(); _track_map.clear();
_trackReadyCallback.clear(); _trackReadyCallback.clear();
@ -83,9 +79,34 @@ void MediaSink::inputFrame(const Frame::Ptr &frame) {
} }
} }
if(!_allTrackReady && (_trackReadyCallback.empty() || _ticker.elapsedTime() > MAX_WAIT_MS)){ if(!_allTrackReady){
if(_ticker.elapsedTime() > MAX_WAIT_MS_READY){
//如果超过规定时间那么不再等待并忽略未准备好的Track
emitAllTrackReady();
return;
}
if(!_trackReadyCallback.empty()){
//在超时时间内如果存在未准备好的Track那么继续等待
return;
}
if(_track_map.size() == 2){
//如果已经添加了音视频Track并且不存在未准备好的Track那么说明所有Track都准备好了
emitAllTrackReady();
return;
}
if(_track_map.size() == 1 && _ticker.elapsedTime() > MAX_WAIT_MS_ADD_TRACK){
//如果只有一个Track那么在该Track添加后我们最多还等待若干时间(可能后面还会添加Track)
emitAllTrackReady();
return;
}
}
}
void MediaSink::emitAllTrackReady() {
_allTrackReady = true; _allTrackReady = true;
_anyTrackUnReady = false;
if(!_trackReadyCallback.empty()){ if(!_trackReadyCallback.empty()){
//这是超时强制忽略未准备好的Track //这是超时强制忽略未准备好的Track
_trackReadyCallback.clear(); _trackReadyCallback.clear();
@ -103,7 +124,6 @@ void MediaSink::inputFrame(const Frame::Ptr &frame) {
//最少有一个有效的Track //最少有一个有效的Track
onAllTrackReady(); onAllTrackReady();
} }
}
} }
vector<Track::Ptr> MediaSink::getTracks(bool trackReady) const{ vector<Track::Ptr> MediaSink::getTracks(bool trackReady) const{

View File

@ -109,12 +109,13 @@ protected:
* @param frame * @param frame
*/ */
virtual void onTrackFrame(const Frame::Ptr &frame) {}; virtual void onTrackFrame(const Frame::Ptr &frame) {};
private:
void emitAllTrackReady();
private: private:
mutable recursive_mutex _mtx; mutable recursive_mutex _mtx;
map<int,Track::Ptr> _track_map; map<int,Track::Ptr> _track_map;
map<int,function<void()> > _trackReadyCallback; map<int,function<void()> > _trackReadyCallback;
bool _allTrackReady = false; bool _allTrackReady = false;
bool _anyTrackUnReady = false;
Ticker _ticker; Ticker _ticker;
}; };

View File

@ -26,7 +26,8 @@
#include "Stamp.h" #include "Stamp.h"
#define MAX_DELTA_STAMP 300 #define MAX_DELTA_STAMP 1000
#define MAX_CTS 500
#define ABS(x) ((x) > 0 ? (x) : (-x)) #define ABS(x) ((x) > 0 ? (x) : (-x))
namespace mediakit { namespace mediakit {
@ -77,7 +78,7 @@ void Stamp::revise(int64_t dts, int64_t pts, int64_t &dts_out, int64_t &pts_out,
dts_out = _relativeStamp; dts_out = _relativeStamp;
//////////////以下是播放时间戳的计算////////////////// //////////////以下是播放时间戳的计算//////////////////
if(pts_dts_diff > MAX_DELTA_STAMP || pts_dts_diff < -MAX_DELTA_STAMP){ if(ABS(pts_dts_diff) > MAX_CTS){
//如果差值太大,则认为由于回环导致时间戳错乱了 //如果差值太大,则认为由于回环导致时间戳错乱了
pts_dts_diff = 0; pts_dts_diff = 0;
} }

View File

@ -72,7 +72,6 @@ public:
const string &stream_id, const string &stream_id,
int ring_size = 0) : int ring_size = 0) :
MediaSource(RTMP_SCHEMA, vhost, app, stream_id), _ring_size(ring_size) { MediaSource(RTMP_SCHEMA, vhost, app, stream_id), _ring_size(ring_size) {
_metadata = TitleMeta().getMetadata();
} }
virtual ~RtmpMediaSource() {} virtual ~RtmpMediaSource() {}
@ -117,6 +116,9 @@ public:
virtual void setMetaData(const AMFValue &metadata) { virtual void setMetaData(const AMFValue &metadata) {
lock_guard<recursive_mutex> lock(_mtx); lock_guard<recursive_mutex> lock(_mtx);
_metadata = metadata; _metadata = metadata;
if(_ring){
regist();
}
} }
/** /**
@ -143,11 +145,10 @@ public:
_ring = std::make_shared<RingType>(_ring_size, std::move(lam)); _ring = std::make_shared<RingType>(_ring_size, std::move(lam));
onReaderChanged(0); onReaderChanged(0);
//如果输入了非config帧 if(_metadata){
//那么说明不再可能获取config帧以及metadata,
//所以我们强制其为已注册
regist(); regist();
} }
}
_track_stamps_map[pkt->typeId] = pkt->timeStamp; _track_stamps_map[pkt->typeId] = pkt->timeStamp;
_ring->write(pkt, pkt->isVideoKeyFrame()); _ring->write(pkt, pkt->isVideoKeyFrame());
checkNoneReader(); checkNoneReader();

View File

@ -66,6 +66,7 @@ private:
_pRtmpMediaSrc = dynamic_pointer_cast<RtmpMediaSource>(_pMediaSrc); _pRtmpMediaSrc = dynamic_pointer_cast<RtmpMediaSource>(_pMediaSrc);
if(_pRtmpMediaSrc){ if(_pRtmpMediaSrc){
_pRtmpMediaSrc->setMetaData(val); _pRtmpMediaSrc->setMetaData(val);
_set_meta_data = true;
} }
_delegate.reset(new RtmpDemuxer); _delegate.reset(new RtmpDemuxer);
_delegate->loadMetaData(val); _delegate->loadMetaData(val);
@ -73,6 +74,10 @@ private:
} }
void onMediaData(const RtmpPacket::Ptr &chunkData) override { void onMediaData(const RtmpPacket::Ptr &chunkData) override {
if(_pRtmpMediaSrc){ if(_pRtmpMediaSrc){
if(!_set_meta_data && !chunkData->isCfgFrame()){
_set_meta_data = true;
_pRtmpMediaSrc->setMetaData(TitleMeta().getMetadata());
}
_pRtmpMediaSrc->onWrite(chunkData); _pRtmpMediaSrc->onWrite(chunkData);
} }
if(!_delegate){ if(!_delegate){
@ -83,6 +88,7 @@ private:
} }
private: private:
RtmpMediaSource::Ptr _pRtmpMediaSrc; RtmpMediaSource::Ptr _pRtmpMediaSrc;
bool _set_meta_data = false;
}; };

View File

@ -434,6 +434,7 @@ void RtmpSession::setMetaData(AMFDecoder &dec) {
auto metadata = dec.load<AMFValue>(); auto metadata = dec.load<AMFValue>();
// dumpMetadata(metadata); // dumpMetadata(metadata);
_pPublisherSrc->setMetaData(metadata); _pPublisherSrc->setMetaData(metadata);
_set_meta_data = true;
} }
void RtmpSession::onProcessCmd(AMFDecoder &dec) { void RtmpSession::onProcessCmd(AMFDecoder &dec) {
@ -491,6 +492,11 @@ void RtmpSession::onRtmpChunk(RtmpPacket &chunkData) {
_stamp[chunkData.typeId % 2].revise(chunkData.timeStamp, chunkData.timeStamp, dts_out, dts_out, true); _stamp[chunkData.typeId % 2].revise(chunkData.timeStamp, chunkData.timeStamp, dts_out, dts_out, true);
chunkData.timeStamp = dts_out; chunkData.timeStamp = dts_out;
} }
if(!_set_meta_data && !chunkData.isCfgFrame()){
_set_meta_data = true;
_pPublisherSrc->setMetaData(TitleMeta().getMetadata());
}
_pPublisherSrc->onWrite(std::make_shared<RtmpPacket>(std::move(chunkData))); _pPublisherSrc->onWrite(std::make_shared<RtmpPacket>(std::move(chunkData)));
} }
break; break;

View File

@ -95,6 +95,7 @@ private:
std::string _strTcUrl; std::string _strTcUrl;
MediaInfo _mediaInfo; MediaInfo _mediaInfo;
double _dNowReqID = 0; double _dNowReqID = 0;
bool _set_meta_data = false;
Ticker _ticker;//数据接收时间 Ticker _ticker;//数据接收时间
RingBuffer<RtmpPacket::Ptr>::RingReader::Ptr _pRingReader; RingBuffer<RtmpPacket::Ptr>::RingReader::Ptr _pRingReader;
std::shared_ptr<RtmpMediaSourceImp> _pPublisherSrc; std::shared_ptr<RtmpMediaSourceImp> _pPublisherSrc;