解决开启rtp推流鉴权导致udp丢包的问题,提升GB28181推流秒开体验

This commit is contained in:
ziyue 2021-06-23 11:02:39 +08:00
parent e8c9666af0
commit d9de40526d
2 changed files with 52 additions and 18 deletions

View File

@ -78,11 +78,6 @@ bool RtpProcess::inputRtp(bool is_udp, const Socket::Ptr &sock, const char *data
emitOnPublish(); emitOnPublish();
} }
if (!_muxer) {
//无权限推流
return false;
}
_total_bytes += len; _total_bytes += len;
if (_save_file_rtp) { if (_save_file_rtp) {
uint16_t size = (uint16_t)len; uint16_t size = (uint16_t)len;
@ -95,7 +90,7 @@ bool RtpProcess::inputRtp(bool is_udp, const Socket::Ptr &sock, const char *data
} }
GET_CONFIG(string, dump_dir, RtpProxy::kDumpDir); GET_CONFIG(string, dump_dir, RtpProxy::kDumpDir);
if (!_muxer->isEnabled() && !dts_out && dump_dir.empty()) { if (_muxer && !_muxer->isEnabled() && !dts_out && dump_dir.empty()) {
//无人访问、且不取时间戳、不导出调试文件时,我们可以直接丢弃数据 //无人访问、且不取时间戳、不导出调试文件时,我们可以直接丢弃数据
_last_frame_time.resetTime(); _last_frame_time.resetTime();
return false; return false;
@ -109,20 +104,55 @@ bool RtpProcess::inputRtp(bool is_udp, const Socket::Ptr &sock, const char *data
} }
void RtpProcess::inputFrame(const Frame::Ptr &frame) { void RtpProcess::inputFrame(const Frame::Ptr &frame) {
_last_frame_time.resetTime();
_dts = frame->dts(); _dts = frame->dts();
if (_save_file_video && frame->getTrackType() == TrackVideo) { if (_save_file_video && frame->getTrackType() == TrackVideo) {
fwrite((uint8_t *) frame->data(), frame->size(), 1, _save_file_video.get()); fwrite((uint8_t *) frame->data(), frame->size(), 1, _save_file_video.get());
} }
if (_muxer) {
_last_frame_time.resetTime();
_muxer->inputFrame(frame); _muxer->inputFrame(frame);
} else {
if (_cached_func.size() > 100) {
WarnL << "cached frame of track(" << frame->getCodecName() << ") is too much, now droped";
return;
}
auto frame_cached = Frame::getCacheAbleFrame(frame);
lock_guard<recursive_mutex> lck(_func_mtx);
_cached_func.emplace_back([this, frame_cached]() {
_last_frame_time.resetTime();
_muxer->inputFrame(frame_cached);
});
}
} }
void RtpProcess::addTrack(const Track::Ptr &track) { void RtpProcess::addTrack(const Track::Ptr &track) {
if (_muxer) {
_muxer->addTrack(track); _muxer->addTrack(track);
} else {
lock_guard<recursive_mutex> lck(_func_mtx);
_cached_func.emplace_back([this, track]() {
_muxer->addTrack(track);
});
}
} }
void RtpProcess::addTrackCompleted() { void RtpProcess::addTrackCompleted() {
if (_muxer) {
_muxer->addTrackCompleted(); _muxer->addTrackCompleted();
} else {
lock_guard<recursive_mutex> lck(_func_mtx);
_cached_func.emplace_back([this]() {
_muxer->addTrackCompleted();
});
}
}
void RtpProcess::doCachedFunc() {
lock_guard<recursive_mutex> lck(_func_mtx);
for (auto &func : _cached_func) {
func();
}
_cached_func.clear();
} }
bool RtpProcess::alive() { bool RtpProcess::alive() {
@ -197,19 +227,20 @@ void RtpProcess::setListener(const std::weak_ptr<MediaSourceEvent> &listener) {
void RtpProcess::emitOnPublish() { void RtpProcess::emitOnPublish() {
weak_ptr<RtpProcess> weak_self = shared_from_this(); weak_ptr<RtpProcess> weak_self = shared_from_this();
Broadcast::PublishAuthInvoker invoker = [weak_self](const string &err, bool enableHls, bool enableMP4) { Broadcast::PublishAuthInvoker invoker = [weak_self](const string &err, bool enableHls, bool enableMP4) {
auto strongSelf = weak_self.lock(); auto strong_self = weak_self.lock();
if (!strongSelf) { if (!strong_self) {
return; return;
} }
if (err.empty()) { if (err.empty()) {
strongSelf->_muxer = std::make_shared<MultiMediaSourceMuxer>(strongSelf->_media_info._vhost, strong_self->_muxer = std::make_shared<MultiMediaSourceMuxer>(strong_self->_media_info._vhost,
strongSelf->_media_info._app, strong_self->_media_info._app,
strongSelf->_media_info._streamid, 0.0f, strong_self->_media_info._streamid, 0.0f,
true, true, enableHls, enableMP4); true, true, enableHls, enableMP4);
strongSelf->_muxer->setMediaListener(strongSelf); strong_self->_muxer->setMediaListener(strong_self);
InfoP(strongSelf) << "允许RTP推流"; strong_self->doCachedFunc();
InfoP(strong_self) << "允许RTP推流";
} else { } else {
WarnP(strongSelf) << "禁止RTP推流:" << err; WarnP(strong_self) << "禁止RTP推流:" << err;
} }
}; };

View File

@ -79,6 +79,7 @@ protected:
private: private:
void emitOnPublish(); void emitOnPublish();
void doCachedFunc();
private: private:
uint32_t _dts = 0; uint32_t _dts = 0;
@ -95,6 +96,8 @@ private:
atomic_bool _stop_rtp_check{false}; atomic_bool _stop_rtp_check{false};
atomic_flag _busy_flag{false}; atomic_flag _busy_flag{false};
Ticker _last_check_alive; Ticker _last_check_alive;
recursive_mutex _func_mtx;
deque<function<void()> > _cached_func;
}; };
}//namespace mediakit }//namespace mediakit