完善PlayerProxy关闭机制,重试次数超限后自动关闭

This commit is contained in:
ziyue 2021-06-09 15:01:45 +08:00
parent ad2cd11eec
commit 92f879d703
5 changed files with 53 additions and 48 deletions

View File

@ -59,7 +59,7 @@ API_EXPORT void API_CALL mk_proxy_player_play(mk_proxy_player ctx, const char *u
* mk_proxy_player_release函数MediaSource.close() * mk_proxy_player_release函数MediaSource.close()
* @param user_data mk_proxy_player_set_on_close函数设置 * @param user_data mk_proxy_player_set_on_close函数设置
*/ */
typedef void(API_CALL *on_mk_proxy_player_close)(void *user_data); typedef void(API_CALL *on_mk_proxy_player_close)(void *user_data, int err, const char *what, int sys_err);
/** /**
* MediaSource.close() * MediaSource.close()

View File

@ -51,9 +51,9 @@ API_EXPORT void API_CALL mk_proxy_player_set_on_close(mk_proxy_player ctx, on_mk
PlayerProxy::Ptr &obj = *((PlayerProxy::Ptr *) ctx); PlayerProxy::Ptr &obj = *((PlayerProxy::Ptr *) ctx);
obj->getPoller()->async([obj,cb,user_data](){ obj->getPoller()->async([obj,cb,user_data](){
//切换线程再操作 //切换线程再操作
obj->setOnClose([cb,user_data](){ obj->setOnClose([cb,user_data](const SockException &ex){
if(cb){ if(cb){
cb(user_data); cb(user_data, ex.getErrCode(), ex.what(), ex.getCustomCode());
} }
}); });
}); });

View File

@ -634,7 +634,7 @@ void installWebApi() {
}); });
//被主动关闭拉流 //被主动关闭拉流
player->setOnClose([key](){ player->setOnClose([key](const SockException &ex){
lock_guard<recursive_mutex> lck(s_proxyMapMtx); lock_guard<recursive_mutex> lck(s_proxyMapMtx);
s_proxyMap.erase(key); s_proxyMap.erase(key);
}); });

View File

@ -48,62 +48,66 @@ static uint8_t s_mute_adts[] = {0xff, 0xf1, 0x6c, 0x40, 0x2d, 0x3f, 0xfc, 0x00,
PlayerProxy::PlayerProxy(const string &vhost, const string &app, const string &stream_id, PlayerProxy::PlayerProxy(const string &vhost, const string &app, const string &stream_id,
bool enable_hls, bool enable_mp4, int retry_count, const EventPoller::Ptr &poller) bool enable_hls, bool enable_mp4, int retry_count, const EventPoller::Ptr &poller)
: MediaPlayer(poller) { : MediaPlayer(poller) {
_vhost = vhost; _vhost = vhost;
_app = app; _app = app;
_stream_id = stream_id; _stream_id = stream_id;
_enable_hls = enable_hls; _enable_hls = enable_hls;
_enable_mp4 = enable_mp4; _enable_mp4 = enable_mp4;
_retry_count = retry_count; _retry_count = retry_count;
_on_close = [](const SockException &) {};
} }
void PlayerProxy::setPlayCallbackOnce(const function<void(const SockException &ex)> &cb){ void PlayerProxy::setPlayCallbackOnce(const function<void(const SockException &ex)> &cb) {
_on_play = cb; _on_play = cb;
} }
void PlayerProxy::setOnClose(const function<void()> &cb){ void PlayerProxy::setOnClose(const function<void(const SockException &ex)> &cb) {
_on_close = cb; _on_close = cb ? cb : [](const SockException &) {};
} }
void PlayerProxy::play(const string &strUrlTmp) { void PlayerProxy::play(const string &strUrlTmp) {
weak_ptr<PlayerProxy> weakSelf = shared_from_this(); weak_ptr<PlayerProxy> weakSelf = shared_from_this();
std::shared_ptr<int> piFailedCnt(new int(0)); //连续播放失败次数 std::shared_ptr<int> piFailedCnt(new int(0)); //连续播放失败次数
setOnPlayResult([weakSelf,strUrlTmp,piFailedCnt](const SockException &err) { setOnPlayResult([weakSelf, strUrlTmp, piFailedCnt](const SockException &err) {
auto strongSelf = weakSelf.lock(); auto strongSelf = weakSelf.lock();
if(!strongSelf) { if (!strongSelf) {
return; return;
} }
if(strongSelf->_on_play) { if (strongSelf->_on_play) {
strongSelf->_on_play(err); strongSelf->_on_play(err);
strongSelf->_on_play = nullptr; strongSelf->_on_play = nullptr;
} }
if(!err) { if (!err) {
// 播放成功 // 播放成功
*piFailedCnt = 0;//连续播放失败次数清0 *piFailedCnt = 0;//连续播放失败次数清0
strongSelf->onPlaySuccess(); strongSelf->onPlaySuccess();
}else if(*piFailedCnt < strongSelf->_retry_count || strongSelf->_retry_count < 0) { } else if (*piFailedCnt < strongSelf->_retry_count || strongSelf->_retry_count < 0) {
// 播放失败,延时重试播放 // 播放失败,延时重试播放
strongSelf->rePlay(strUrlTmp,(*piFailedCnt)++); strongSelf->rePlay(strUrlTmp, (*piFailedCnt)++);
} else {
//达到了最大重试次数,回调关闭
strongSelf->_on_close(err);
} }
}); });
setOnShutdown([weakSelf,strUrlTmp,piFailedCnt](const SockException &err) { setOnShutdown([weakSelf, strUrlTmp, piFailedCnt](const SockException &err) {
auto strongSelf = weakSelf.lock(); auto strongSelf = weakSelf.lock();
if(!strongSelf) { if (!strongSelf) {
return; return;
} }
//注销直接拉流代理产生的流:#532 //注销直接拉流代理产生的流:#532
strongSelf->setMediaSource(nullptr); strongSelf->setMediaSource(nullptr);
if(strongSelf->_muxer) { if (strongSelf->_muxer) {
auto tracks = strongSelf->MediaPlayer::getTracks(false); auto tracks = strongSelf->MediaPlayer::getTracks(false);
for (auto & track : tracks){ for (auto &track : tracks) {
track->delDelegate(strongSelf->_muxer.get()); track->delDelegate(strongSelf->_muxer.get());
} }
GET_CONFIG(bool,resetWhenRePlay,General::kResetWhenRePlay); GET_CONFIG(bool, resetWhenRePlay, General::kResetWhenRePlay);
if (resetWhenRePlay) { if (resetWhenRePlay) {
strongSelf->_muxer.reset(); strongSelf->_muxer.reset();
} else { } else {
@ -111,8 +115,11 @@ void PlayerProxy::play(const string &strUrlTmp) {
} }
} }
//播放异常中断,延时重试播放 //播放异常中断,延时重试播放
if(*piFailedCnt < strongSelf->_retry_count || strongSelf->_retry_count < 0) { if (*piFailedCnt < strongSelf->_retry_count || strongSelf->_retry_count < 0) {
strongSelf->rePlay(strUrlTmp,(*piFailedCnt)++); strongSelf->rePlay(strUrlTmp, (*piFailedCnt)++);
} else {
//达到了最大重试次数,回调关闭
strongSelf->_on_close(err);
} }
}); });
MediaPlayer::play(strUrlTmp); MediaPlayer::play(strUrlTmp);
@ -120,7 +127,7 @@ void PlayerProxy::play(const string &strUrlTmp) {
setDirectProxy(); setDirectProxy();
} }
void PlayerProxy::setDirectProxy(){ void PlayerProxy::setDirectProxy() {
MediaSource::Ptr mediaSource; MediaSource::Ptr mediaSource;
if (dynamic_pointer_cast<RtspPlayer>(_delegate)) { if (dynamic_pointer_cast<RtspPlayer>(_delegate)) {
//rtsp拉流 //rtsp拉流
@ -142,24 +149,24 @@ PlayerProxy::~PlayerProxy() {
_timer.reset(); _timer.reset();
} }
void PlayerProxy::rePlay(const string &strUrl,int iFailedCnt){ void PlayerProxy::rePlay(const string &strUrl, int iFailedCnt) {
auto iDelay = MAX(2 * 1000, MIN(iFailedCnt * 3000, 60*1000)); auto iDelay = MAX(2 * 1000, MIN(iFailedCnt * 3000, 60 * 1000));
weak_ptr<PlayerProxy> weakSelf = shared_from_this(); weak_ptr<PlayerProxy> weakSelf = shared_from_this();
_timer = std::make_shared<Timer>(iDelay / 1000.0f,[weakSelf,strUrl,iFailedCnt]() { _timer = std::make_shared<Timer>(iDelay / 1000.0f, [weakSelf, strUrl, iFailedCnt]() {
//播放失败次数越多,则延时越长 //播放失败次数越多,则延时越长
auto strongPlayer = weakSelf.lock(); auto strongPlayer = weakSelf.lock();
if(!strongPlayer) { if (!strongPlayer) {
return false; return false;
} }
WarnL << "重试播放[" << iFailedCnt << "]:" << strUrl; WarnL << "重试播放[" << iFailedCnt << "]:" << strUrl;
strongPlayer->MediaPlayer::play(strUrl); strongPlayer->MediaPlayer::play(strUrl);
strongPlayer->setDirectProxy(); strongPlayer->setDirectProxy();
return false; return false;
}, getPoller()); }, getPoller());
} }
bool PlayerProxy::close(MediaSource &sender,bool force) { bool PlayerProxy::close(MediaSource &sender, bool force) {
if(!force && totalReaderCount()){ if (!force && totalReaderCount()) {
return false; return false;
} }
@ -174,14 +181,12 @@ bool PlayerProxy::close(MediaSource &sender,bool force) {
strongSelf->setMediaSource(nullptr); strongSelf->setMediaSource(nullptr);
strongSelf->teardown(); strongSelf->teardown();
}); });
if (_on_close) { _on_close(SockException(Err_shutdown, "closed by user"));
_on_close();
}
WarnL << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force; WarnL << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force;
return true; return true;
} }
int PlayerProxy::totalReaderCount(){ int PlayerProxy::totalReaderCount() {
return (_muxer ? _muxer->totalReaderCount() : 0) + (_pMediaSrc ? _pMediaSrc->readerCount() : 0); return (_muxer ? _muxer->totalReaderCount() : 0) + (_pMediaSrc ? _pMediaSrc->readerCount() : 0);
} }
@ -189,29 +194,29 @@ int PlayerProxy::totalReaderCount(MediaSource &sender) {
return totalReaderCount(); return totalReaderCount();
} }
MediaOriginType PlayerProxy::getOriginType(MediaSource &sender) const{ MediaOriginType PlayerProxy::getOriginType(MediaSource &sender) const {
return MediaOriginType::pull; return MediaOriginType::pull;
} }
string PlayerProxy::getOriginUrl(MediaSource &sender) const{ string PlayerProxy::getOriginUrl(MediaSource &sender) const {
return _pull_url; return _pull_url;
} }
std::shared_ptr<SockInfo> PlayerProxy::getOriginSock(MediaSource &sender) const{ std::shared_ptr<SockInfo> PlayerProxy::getOriginSock(MediaSource &sender) const {
return getSockInfo(); return getSockInfo();
} }
class MuteAudioMaker : public FrameDispatcher{ class MuteAudioMaker : public FrameDispatcher {
public: public:
typedef std::shared_ptr<MuteAudioMaker> Ptr; typedef std::shared_ptr<MuteAudioMaker> Ptr;
MuteAudioMaker(){}; MuteAudioMaker() {};
~MuteAudioMaker() override {} ~MuteAudioMaker() override {}
void inputFrame(const Frame::Ptr &frame) override { void inputFrame(const Frame::Ptr &frame) override {
if(frame->getTrackType() == TrackVideo){ if (frame->getTrackType() == TrackVideo) {
auto audio_idx = frame->dts() / MUTE_ADTS_DATA_MS; auto audio_idx = frame->dts() / MUTE_ADTS_DATA_MS;
if(_audio_idx != audio_idx){ if (_audio_idx != audio_idx) {
_audio_idx = audio_idx; _audio_idx = audio_idx;
auto aacFrame = std::make_shared<FrameFromStaticPtr>(CodecAAC, (char *)MUTE_ADTS_DATA, MUTE_ADTS_DATA_LEN, _audio_idx * MUTE_ADTS_DATA_MS, 0 ,ADTS_HEADER_LEN); auto aacFrame = std::make_shared<FrameFromStaticPtr>(CodecAAC, (char *)MUTE_ADTS_DATA, MUTE_ADTS_DATA_LEN, _audio_idx * MUTE_ADTS_DATA_MS, 0 ,ADTS_HEADER_LEN);
FrameDispatcher::inputFrame(aacFrame); FrameDispatcher::inputFrame(aacFrame);
@ -220,10 +225,10 @@ public:
} }
private: private:
class FrameFromStaticPtr : public FrameFromPtr{ class FrameFromStaticPtr : public FrameFromPtr {
public: public:
template <typename ... ARGS> template<typename ... ARGS>
FrameFromStaticPtr(ARGS && ...args) : FrameFromPtr(std::forward<ARGS>(args)...) {}; FrameFromStaticPtr(ARGS &&...args) : FrameFromPtr(std::forward<ARGS>(args)...) {};
~FrameFromStaticPtr() override = default; ~FrameFromStaticPtr() override = default;
bool cacheAble() const override { bool cacheAble() const override {
@ -236,7 +241,7 @@ private:
}; };
void PlayerProxy::onPlaySuccess() { void PlayerProxy::onPlaySuccess() {
GET_CONFIG(bool,resetWhenRePlay,General::kResetWhenRePlay); GET_CONFIG(bool, resetWhenRePlay, General::kResetWhenRePlay);
if (dynamic_pointer_cast<RtspMediaSource>(_pMediaSrc)) { if (dynamic_pointer_cast<RtspMediaSource>(_pMediaSrc)) {
//rtsp拉流代理 //rtsp拉流代理
if (resetWhenRePlay || !_muxer) { if (resetWhenRePlay || !_muxer) {

View File

@ -34,15 +34,15 @@ public:
/** /**
* play结果回调play执行之前有效 * play结果回调play执行之前有效
* @param cb * @param cb
*/ */
void setPlayCallbackOnce(const function<void(const SockException &ex)> &cb); void setPlayCallbackOnce(const function<void(const SockException &ex)> &cb);
/** /**
* *
* @param cb * @param cb
*/ */
void setOnClose(const function<void()> &cb); void setOnClose(const function<void(const SockException &ex)> &cb);
/** /**
* *
@ -76,7 +76,7 @@ private:
string _stream_id; string _stream_id;
string _pull_url; string _pull_url;
Timer::Ptr _timer; Timer::Ptr _timer;
function<void()> _on_close; function<void(const SockException &ex)> _on_close;
function<void(const SockException &ex)> _on_play; function<void(const SockException &ex)> _on_play;
MultiMediaSourceMuxer::Ptr _muxer; MultiMediaSourceMuxer::Ptr _muxer;
}; };