diff --git a/src/Pusher/PusherProxy.cpp b/src/Pusher/PusherProxy.cpp index 98c2724c..447a5539 100644 --- a/src/Pusher/PusherProxy.cpp +++ b/src/Pusher/PusherProxy.cpp @@ -18,6 +18,7 @@ PusherProxy::PusherProxy(const MediaSource::Ptr &src, int retry_count, const Eve : MediaPusher(src, poller){ _retry_count = retry_count; _on_close = [](const SockException &) {}; + _weak_src = src; } PusherProxy::~PusherProxy() { @@ -32,59 +33,65 @@ void PusherProxy::setOnClose(const function &cb) _on_close = cb; } -void PusherProxy::publish(const string& dstUrl) { - std::weak_ptr weakSelf = shared_from_this(); - std::shared_ptr piFailedCnt(new int(0)); +void PusherProxy::publish(const string &dst_url) { + std::weak_ptr weak_self = shared_from_this(); + std::shared_ptr failed_cnt(new int(0)); - setOnPublished([weakSelf, dstUrl, piFailedCnt](const SockException &err) { - auto strongSelf = weakSelf.lock(); - if (!strongSelf) return; + setOnPublished([weak_self, dst_url, failed_cnt](const SockException &err) { + auto strong_self = weak_self.lock(); + if (!strong_self) { + return; + } - if (strongSelf->_on_publish) { - strongSelf->_on_publish(err); - strongSelf->_on_publish = nullptr; - } + if (strong_self->_on_publish) { + strong_self->_on_publish(err); + strong_self->_on_publish = nullptr; + } + auto src = strong_self->_weak_src.lock(); if (!err) { // 推流成功 - *piFailedCnt = 0; - InfoL << "Publish " << dstUrl << " success"; - } else if (*piFailedCnt < strongSelf->_retry_count || strongSelf->_retry_count < 0) { + *failed_cnt = 0; + InfoL << "Publish " << dst_url << " success"; + } else if (src && (*failed_cnt < strong_self->_retry_count || strong_self->_retry_count < 0)) { // 推流失败,延时重试推送 - strongSelf->rePublish(dstUrl, (*piFailedCnt)++); + strong_self->rePublish(dst_url, (*failed_cnt)++); } else { - //达到了最大重试次数,回调关闭 - strongSelf->_on_close(err); + //如果媒体源已经注销, 或达到了最大重试次数,回调关闭 + strong_self->_on_close(err); } }); - setOnShutdown([weakSelf, dstUrl, piFailedCnt](const SockException &err) { - auto strongSelf = weakSelf.lock(); - if (!strongSelf) return; + setOnShutdown([weak_self, dst_url, failed_cnt](const SockException &err) { + auto strong_self = weak_self.lock(); + if (!strong_self) { + return; + } + auto src = strong_self->_weak_src.lock(); //推流异常中断,延时重试播放 - if (*piFailedCnt < strongSelf->_retry_count || strongSelf->_retry_count < 0) { - strongSelf->rePublish(dstUrl, (*piFailedCnt)++); + if (src && (*failed_cnt < strong_self->_retry_count || strong_self->_retry_count < 0)) { + strong_self->rePublish(dst_url, (*failed_cnt)++); } else { - //达到了最大重试次数,回调关闭 - strongSelf->_on_close(err); + //如果媒体源已经注销, 或达到了最大重试次数,回调关闭 + strong_self->_on_close(err); } }); - MediaPusher::publish(dstUrl); + MediaPusher::publish(dst_url); } -void PusherProxy::rePublish(const string &dstUrl, int iFailedCnt) { - auto iDelay = MAX(2 * 1000, MIN(iFailedCnt * 3000, 60 * 1000)); - weak_ptr weakSelf = shared_from_this(); - _timer = std::make_shared(iDelay / 1000.0f, [weakSelf, dstUrl, iFailedCnt]() { +void PusherProxy::rePublish(const string &dst_url, int failed_cnt) { + auto delay = MAX(2 * 1000, MIN(failed_cnt * 3000, 60 * 1000)); + weak_ptr weak_self = shared_from_this(); + _timer = std::make_shared(delay / 1000.0f, [weak_self, dst_url, failed_cnt]() { //推流失败次数越多,则延时越长 - auto strongPusher = weakSelf.lock(); - if (!strongPusher) { + auto strong_self = weak_self.lock(); + if (!strong_self) { return false; } - WarnL << "推流重试[" << iFailedCnt << "]:" << dstUrl; - strongPusher->MediaPusher::publish(dstUrl); + WarnL << "推流重试[" << failed_cnt << "]:" << dst_url; + strong_self->MediaPusher::publish(dst_url); return false; }, getPoller()); } diff --git a/src/Pusher/PusherProxy.h b/src/Pusher/PusherProxy.h index a21ea916..3e04a8d3 100644 --- a/src/Pusher/PusherProxy.h +++ b/src/Pusher/PusherProxy.h @@ -53,7 +53,7 @@ private: private: int _retry_count; Timer::Ptr _timer; - + std::weak_ptr _weak_src; function _on_close; function _on_publish; };