diff --git a/src/Http/HlsPlayer.cpp b/src/Http/HlsPlayer.cpp index 57dfe8a5..5363c478 100644 --- a/src/Http/HlsPlayer.cpp +++ b/src/Http/HlsPlayer.cpp @@ -9,9 +9,10 @@ */ #include "HlsPlayer.h" + namespace mediakit { -HlsPlayer::HlsPlayer(const EventPoller::Ptr &poller){ +HlsPlayer::HlsPlayer(const EventPoller::Ptr &poller) { _segment.setOnSegment([this](const char *data, size_t len) { onPacket(data, len); }); setPoller(poller ? poller : EventPollerPool::Instance().getPoller()); } @@ -23,24 +24,23 @@ void HlsPlayer::play(const string &strUrl) { play_l(); } -void HlsPlayer::play_l(){ +void HlsPlayer::play_l() { if (_m3u8_list.empty()) { teardown_l(SockException(Err_shutdown, "所有hls url都尝试播放失败!")); return; } - if (alive() && _waiting_response) { + if (waitResponse()) { return; } float playTimeOutSec = (*this)[Client::kTimeoutMS].as() / 1000.0f; setMethod("GET"); - if(!(*this)[kNetAdapter].empty()) { + if (!(*this)[kNetAdapter].empty()) { setNetAdapter((*this)[kNetAdapter]); } - _waiting_response = true; sendRequest(_m3u8_list.back(), playTimeOutSec); } -void HlsPlayer::teardown_l(const SockException &ex){ +void HlsPlayer::teardown_l(const SockException &ex) { _timer.reset(); _timer_ts.reset(); _http_ts_player.reset(); @@ -48,75 +48,79 @@ void HlsPlayer::teardown_l(const SockException &ex){ } void HlsPlayer::teardown() { - teardown_l(SockException(Err_shutdown,"teardown")); + teardown_l(SockException(Err_shutdown, "teardown")); } -void HlsPlayer::playNextTs(bool force){ +void HlsPlayer::playNextTs() { if (_ts_list.empty()) { //播放列表为空,那么立即重新下载m3u8文件 _timer.reset(); play_l(); return; } - if (!force && _http_ts_player && _http_ts_player->alive()) { + if (_http_ts_player && _http_ts_player->waitResponse()) { //播放器目前还存活,正在下载中 return; } - auto ts_duration = _ts_list.front().duration * 1000; - weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - std::shared_ptr ticker(new Ticker); + weak_ptr weak_self = dynamic_pointer_cast(shared_from_this()); + if (!_http_ts_player) { + _http_ts_player = std::make_shared(getPoller(), false); + _http_ts_player->setOnCreateSocket([weak_self](const EventPoller::Ptr &poller) { + auto strong_self = weak_self.lock(); + if (strong_self) { + return strong_self->createSocket(); + } + return Socket::createSocket(poller, true); + }); - _http_ts_player = std::make_shared(getPoller(), false); + _http_ts_player->setOnPacket([weak_self](const char *data, size_t len) { + auto strong_self = weak_self.lock(); + if (!strong_self) { + return; + } + //收到ts包 + strong_self->onPacket_l(data, len); + }); - _http_ts_player->setOnCreateSocket([weakSelf](const EventPoller::Ptr &poller) { - auto strongSelf = weakSelf.lock(); - if (strongSelf) { - return strongSelf->createSocket(); + if (!(*this)[kNetAdapter].empty()) { + _http_ts_player->setNetAdapter((*this)[Client::kNetAdapter]); } - return Socket::createSocket(poller, true); - }); + } - _http_ts_player->setOnDisconnect([weakSelf, ticker, ts_duration](const SockException &err) { - auto strongSelf = weakSelf.lock(); - if (!strongSelf) { + Ticker ticker; + auto url = _ts_list.front().url; + auto duration = _ts_list.front().duration; + _ts_list.pop_front(); + + _http_ts_player->setOnComplete([weak_self, ticker, duration, url](const SockException &err) { + auto strong_self = weak_self.lock(); + if (!strong_self) { return; } - auto delay = ts_duration - 500 - ticker->elapsedTime(); + if (err) { + WarnL << "download ts segment " << url << " failed:" << err.what(); + } + //提前半秒下载好 + auto delay = duration - ticker.elapsedTime() / 1000.0f - 0.5; if (delay <= 0) { - //播放这个ts切片花费了太长时间,我们立即下一个切片的播放 - strongSelf->playNextTs(true); - } else { - //下一个切片慢点播放 - strongSelf->_timer_ts.reset(new Timer(delay / 1000.0f, [weakSelf]() { - auto strongSelf = weakSelf.lock(); - if (!strongSelf) { - return false; - } - strongSelf->playNextTs(true); - return false; - }, strongSelf->getPoller())); + //延时最小10ms + delay = 10; } - }); - - _http_ts_player->setOnPacket([weakSelf](const char *data, size_t len) { - auto strongSelf = weakSelf.lock(); - if (!strongSelf) { - return; - } - //收到ts包 - strongSelf->onPacket_l(data, len); + //延时下载下一个切片 + strong_self->_timer_ts.reset(new Timer(delay / 1000.0f, [weak_self]() { + auto strong_self = weak_self.lock(); + if (strong_self) { + strong_self->playNextTs(); + } + return false; + }, strong_self->getPoller())); }); _http_ts_player->setMethod("GET"); - if (!(*this)[kNetAdapter].empty()) { - _http_ts_player->setNetAdapter((*this)[Client::kNetAdapter]); - } - - _http_ts_player->sendRequest(_ts_list.front().url, 2 * _ts_list.front().duration); - _ts_list.pop_front(); + _http_ts_player->sendRequest(url, 2 * duration); } -void HlsPlayer::onParsed(bool is_m3u8_inner,int64_t sequence,const map &ts_map){ +void HlsPlayer::onParsed(bool is_m3u8_inner, int64_t sequence, const map &ts_map) { if (!is_m3u8_inner) { //这是ts播放列表 if (_last_sequence == sequence) { @@ -146,12 +150,12 @@ void HlsPlayer::onParsed(bool is_m3u8_inner,int64_t sequence,const map weakSelf = dynamic_pointer_cast(shared_from_this()); + weak_ptr weak_self = dynamic_pointer_cast(shared_from_this()); auto url = ts_map.rbegin()->second.url; - getPoller()->async([weakSelf, url]() { - auto strongSelf = weakSelf.lock(); - if (strongSelf) { - strongSelf->play(url); + getPoller()->async([weak_self, url]() { + auto strong_self = weak_self.lock(); + if (strong_self) { + strong_self->play(url); } }, false); } @@ -163,8 +167,8 @@ ssize_t HlsPlayer::onResponseHeader(const string &status, const HttpClient::Http teardown_l(SockException(Err_shutdown, StrPrinter << "bad http status code:" + status)); return 0; } - auto contet_type = const_cast< HttpClient::HttpHeader &>(headers)["Content-Type"]; - _is_m3u8 = (contet_type.find("application/vnd.apple.mpegurl") == 0); + auto content_type = const_cast< HttpClient::HttpHeader &>(headers)["Content-Type"]; + _is_m3u8 = (content_type.find("application/vnd.apple.mpegurl") == 0); return -1; } @@ -177,13 +181,12 @@ void HlsPlayer::onResponseBody(const char *buf, size_t size, size_t recvedSize, } void HlsPlayer::onResponseCompleted() { - _waiting_response = false; if (HlsParser::parse(getUrl(), _m3u8)) { - playDelay(); if (_first) { _first = false; onPlayResult(SockException(Err_success, "play success")); } + playDelay(); } else { teardown_l(SockException(Err_shutdown, "解析m3u8文件失败")); } @@ -213,7 +216,6 @@ float HlsPlayer::delaySecond() { } void HlsPlayer::onDisconnect(const SockException &ex) { - _waiting_response = false; if (_first) { //第一次失败,则播放失败 _first = false; @@ -240,24 +242,24 @@ void HlsPlayer::onDisconnect(const SockException &ex) { playDelay(); } -bool HlsPlayer::onRedirectUrl(const string &url,bool temporary) { +bool HlsPlayer::onRedirectUrl(const string &url, bool temporary) { _m3u8_list.emplace_back(url); return true; } -void HlsPlayer::playDelay(){ - weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - _timer.reset(new Timer(delaySecond(), [weakSelf]() { - auto strongSelf = weakSelf.lock(); - if (strongSelf) { - strongSelf->play_l(); +void HlsPlayer::playDelay() { + weak_ptr weak_self = dynamic_pointer_cast(shared_from_this()); + _timer.reset(new Timer(delaySecond(), [weak_self]() { + auto strong_self = weak_self.lock(); + if (strong_self) { + strong_self->play_l(); } return false; }, getPoller())); } -void HlsPlayer::onPacket_l(const char *data, size_t len){ - _segment.input(data,len); +void HlsPlayer::onPacket_l(const char *data, size_t len) { + _segment.input(data, len); } ////////////////////////////////////////////////////////////////////////// @@ -280,7 +282,7 @@ public: } void resetTracks() override { - ((MediaSink &)_delegate).resetTracks(); + ((MediaSink &) _delegate).resetTracks(); } vector getTracks(bool ready = true) const override { @@ -329,7 +331,7 @@ bool HlsDemuxer::inputFrame(const Frame::Ptr &frame) { _delegate.inputFrame(frame); return true; } - + //计算相对时间戳 int64_t dts, pts; _stamp[frame->getTrackType()].revise(frame->dts(), frame->pts(), dts, pts); @@ -388,7 +390,7 @@ void HlsDemuxer::onTick() { HlsPlayerImp::HlsPlayerImp(const EventPoller::Ptr &poller) : PlayerImp(poller) {} -void HlsPlayerImp::onPacket(const char *data,size_t len) { +void HlsPlayerImp::onPacket(const char *data, size_t len) { if (!_decoder) { _decoder = DecoderImp::createDecoder(DecoderImp::decoder_ts, _demuxer.get()); } diff --git a/src/Http/HlsPlayer.h b/src/Http/HlsPlayer.h index 2ad94dae..4da2be44 100644 --- a/src/Http/HlsPlayer.h +++ b/src/Http/HlsPlayer.h @@ -96,7 +96,7 @@ private: private: void playDelay(); float delaySecond(); - void playNextTs(bool force = false); + void playNextTs(); void teardown_l(const SockException &ex); void play_l(); void onPacket_l(const char *data, size_t len); @@ -112,7 +112,6 @@ private: private: bool _is_m3u8 = false; bool _first = true; - bool _waiting_response = false; int64_t _last_sequence = -1; string _m3u8; Timer::Ptr _timer; diff --git a/src/Http/HttpTSPlayer.cpp b/src/Http/HttpTSPlayer.cpp index 70cf3b42..68e2db1a 100644 --- a/src/Http/HttpTSPlayer.cpp +++ b/src/Http/HttpTSPlayer.cpp @@ -9,24 +9,23 @@ */ #include "HttpTSPlayer.h" + namespace mediakit { -HttpTSPlayer::HttpTSPlayer(const EventPoller::Ptr &poller, bool split_ts){ +HttpTSPlayer::HttpTSPlayer(const EventPoller::Ptr &poller, bool split_ts) { _split_ts = split_ts; _segment.setOnSegment([this](const char *data, size_t len) { onPacket(data, len); }); setPoller(poller ? poller : EventPollerPool::Instance().getPoller()); } -HttpTSPlayer::~HttpTSPlayer() {} - -ssize_t HttpTSPlayer::onResponseHeader(const string &status, const HttpClient::HttpHeader &headers) { +ssize_t HttpTSPlayer::onResponseHeader(const string &status, const HttpClient::HttpHeader &header) { _status = status; if (status != "200" && status != "206") { //http状态码不符合预期 shutdown(SockException(Err_other, StrPrinter << "bad http status code:" + status)); return 0; } - auto content_type = const_cast< HttpClient::HttpHeader &>(headers)["Content-Type"]; + auto content_type = const_cast< HttpClient::HttpHeader &>(header)["Content-Type"]; if (content_type.find("video/mp2t") == 0 || content_type.find("video/mpeg") == 0) { _is_ts_content = true; } @@ -35,11 +34,11 @@ ssize_t HttpTSPlayer::onResponseHeader(const string &status, const HttpClient::H return -1; } -void HttpTSPlayer::onResponseBody(const char *buf, size_t size, size_t recvedSize, size_t totalSize) { +void HttpTSPlayer::onResponseBody(const char *buf, size_t size, size_t recved_size, size_t total_size) { if (_status != "200" && _status != "206") { return; } - if (recvedSize == size) { + if (recved_size == size) { //开始接收数据 if (buf[0] == TS_SYNC_BYTE) { //这是ts头 @@ -57,14 +56,16 @@ void HttpTSPlayer::onResponseBody(const char *buf, size_t size, size_t recvedSiz } void HttpTSPlayer::onResponseCompleted() { - //接收完毕 - shutdown(SockException(Err_success, "play completed")); + emitOnComplete(SockException(Err_success, "play completed")); } void HttpTSPlayer::onDisconnect(const SockException &ex) { - if (_on_disconnect) { - _on_disconnect(ex); - _on_disconnect = nullptr; + emitOnComplete(ex); +} + +void HttpTSPlayer::emitOnComplete(const SockException &ex) { + if (_on_complete) { + _on_complete(ex); } } @@ -74,12 +75,12 @@ void HttpTSPlayer::onPacket(const char *data, size_t len) { } } -void HttpTSPlayer::setOnDisconnect(const HttpTSPlayer::onShutdown &cb) { - _on_disconnect = cb; +void HttpTSPlayer::setOnComplete(onComplete cb) { + _on_complete = std::move(cb); } -void HttpTSPlayer::setOnPacket(const TSSegment::onSegment &cb) { - _on_segment = cb; +void HttpTSPlayer::setOnPacket(TSSegment::onSegment cb) { + _on_segment = std::move(cb); } }//namespace mediakit diff --git a/src/Http/HttpTSPlayer.h b/src/Http/HttpTSPlayer.h index 2dbe4aa9..2d42f5df 100644 --- a/src/Http/HttpTSPlayer.h +++ b/src/Http/HttpTSPlayer.h @@ -14,33 +14,46 @@ #include "Http/HttpDownloader.h" #include "Player/MediaPlayer.h" #include "Rtp/TSDecoder.h" + using namespace toolkit; + namespace mediakit { //http-ts播发器,未实现ts解复用 -class HttpTSPlayer : public HttpClientImp{ +class HttpTSPlayer : public HttpClientImp { public: - typedef function onShutdown; - typedef std::shared_ptr Ptr; + using Ptr = std::shared_ptr; + using onComplete = std::function; HttpTSPlayer(const EventPoller::Ptr &poller = nullptr, bool split_ts = true); - ~HttpTSPlayer() override ; + ~HttpTSPlayer() override = default; - //设置异常断开回调 - void setOnDisconnect(const onShutdown &cb); - //设置接收ts包回调 - void setOnPacket(const TSSegment::onSegment &cb); + /** + * 设置下载完毕或异常断开回调 + */ + void setOnComplete(onComplete cb); + + /** + * 设置接收ts包回调 + */ + void setOnPacket(TSSegment::onSegment cb); protected: ///HttpClient override/// - ssize_t onResponseHeader(const string &status,const HttpHeader &headers) override; - void onResponseBody(const char *buf,size_t size,size_t recvedSize,size_t totalSize) override; + ssize_t onResponseHeader(const string &status, const HttpHeader &header) override; + void onResponseBody(const char *buf, size_t size, size_t recved_size, size_t total_size) override; void onResponseCompleted() override; - void onDisconnect(const SockException &ex) override ; + void onDisconnect(const SockException &ex) override; - //收到ts包 +protected: + /** + * 收到ts数据 + */ virtual void onPacket(const char *data, size_t len); +private: + void emitOnComplete(const SockException &ex); + private: //是否为mpegts负载 bool _is_ts_content = false; @@ -50,7 +63,7 @@ private: bool _split_ts; string _status; TSSegment _segment; - onShutdown _on_disconnect; + onComplete _on_complete; TSSegment::onSegment _on_segment; };