确保on_publish事件回调时线程安全

This commit is contained in:
ziyue 2022-08-11 18:36:17 +08:00
parent 4e8c56e2be
commit 6382fcb3be
2 changed files with 36 additions and 22 deletions

View File

@ -240,6 +240,12 @@ void RtpProcess::setListener(const std::weak_ptr<MediaSourceEvent> &listener) {
void RtpProcess::emitOnPublish() {
weak_ptr<RtpProcess> weak_self = shared_from_this();
Broadcast::PublishAuthInvoker invoker = [weak_self](const string &err, const ProtocolOption &option) {
auto strong_self = weak_self.lock();
if (!strong_self) {
return;
}
auto poller = strong_self->_sock ? strong_self->_sock->getPoller() : EventPollerPool::Instance().getPoller();
poller->async([weak_self, err, option]() {
auto strong_self = weak_self.lock();
if (!strong_self) {
return;
@ -255,6 +261,7 @@ void RtpProcess::emitOnPublish() {
} else {
WarnP(strong_self) << "禁止RTP推流:" << err;
}
});
};
//触发推流鉴权事件

View File

@ -157,14 +157,20 @@ toolkit::EventPoller::Ptr SrtTransportImp::getOwnerPoller(MediaSource &sender){
void SrtTransportImp::emitOnPublish() {
std::weak_ptr<SrtTransportImp> weak_self = static_pointer_cast<SrtTransportImp>(shared_from_this());
Broadcast::PublishAuthInvoker invoker = [weak_self](const std::string &err, const ProtocolOption &option) {
auto strong_self = weak_self.lock();
if (!strong_self) {
return;
}
strong_self->getPoller()->async([weak_self, err, option](){
auto strong_self = weak_self.lock();
if (!strong_self) {
return;
}
if (err.empty()) {
strong_self->_muxer = std::make_shared<MultiMediaSourceMuxer>(
strong_self->_media_info._vhost, strong_self->_media_info._app, strong_self->_media_info._streamid,
0.0f, option);
strong_self->_muxer = std::make_shared<MultiMediaSourceMuxer>(strong_self->_media_info._vhost,
strong_self->_media_info._app,
strong_self->_media_info._streamid,0.0f,
option);
strong_self->_muxer->setMediaListener(strong_self);
strong_self->doCachedFunc();
InfoP(strong_self) << "允许 srt 推流";
@ -172,6 +178,7 @@ void SrtTransportImp::emitOnPublish() {
WarnP(strong_self) << "禁止 srt 推流:" << err;
strong_self->onShutdown(SockException(Err_refused, err));
}
});
};
// 触发推流鉴权事件