HLS: hls播放解决播放下一个切片导致数据丢失的bug: #1294

This commit is contained in:
ziyue 2021-12-24 13:29:16 +08:00
parent 0bf75529a2
commit c25ff6400f
4 changed files with 119 additions and 104 deletions

View File

@ -9,6 +9,7 @@
*/ */
#include "HlsPlayer.h" #include "HlsPlayer.h"
namespace mediakit { namespace mediakit {
HlsPlayer::HlsPlayer(const EventPoller::Ptr &poller) { HlsPlayer::HlsPlayer(const EventPoller::Ptr &poller) {
@ -28,7 +29,7 @@ void HlsPlayer::play_l(){
teardown_l(SockException(Err_shutdown, "所有hls url都尝试播放失败!")); teardown_l(SockException(Err_shutdown, "所有hls url都尝试播放失败!"));
return; return;
} }
if (alive() && _waiting_response) { if (waitResponse()) {
return; return;
} }
float playTimeOutSec = (*this)[Client::kTimeoutMS].as<int>() / 1000.0f; float playTimeOutSec = (*this)[Client::kTimeoutMS].as<int>() / 1000.0f;
@ -36,7 +37,6 @@ void HlsPlayer::play_l(){
if (!(*this)[kNetAdapter].empty()) { if (!(*this)[kNetAdapter].empty()) {
setNetAdapter((*this)[kNetAdapter]); setNetAdapter((*this)[kNetAdapter]);
} }
_waiting_response = true;
sendRequest(_m3u8_list.back(), playTimeOutSec); sendRequest(_m3u8_list.back(), playTimeOutSec);
} }
@ -51,69 +51,73 @@ void HlsPlayer::teardown() {
teardown_l(SockException(Err_shutdown, "teardown")); teardown_l(SockException(Err_shutdown, "teardown"));
} }
void HlsPlayer::playNextTs(bool force){ void HlsPlayer::playNextTs() {
if (_ts_list.empty()) { if (_ts_list.empty()) {
//播放列表为空那么立即重新下载m3u8文件 //播放列表为空那么立即重新下载m3u8文件
_timer.reset(); _timer.reset();
play_l(); play_l();
return; return;
} }
if (!force && _http_ts_player && _http_ts_player->alive()) { if (_http_ts_player && _http_ts_player->waitResponse()) {
//播放器目前还存活,正在下载中 //播放器目前还存活,正在下载中
return; return;
} }
auto ts_duration = _ts_list.front().duration * 1000; weak_ptr<HlsPlayer> weak_self = dynamic_pointer_cast<HlsPlayer>(shared_from_this());
weak_ptr<HlsPlayer> weakSelf = dynamic_pointer_cast<HlsPlayer>(shared_from_this()); if (!_http_ts_player) {
std::shared_ptr<Ticker> ticker(new Ticker);
_http_ts_player = std::make_shared<HttpTSPlayer>(getPoller(), false); _http_ts_player = std::make_shared<HttpTSPlayer>(getPoller(), false);
_http_ts_player->setOnCreateSocket([weak_self](const EventPoller::Ptr &poller) {
_http_ts_player->setOnCreateSocket([weakSelf](const EventPoller::Ptr &poller) { auto strong_self = weak_self.lock();
auto strongSelf = weakSelf.lock(); if (strong_self) {
if (strongSelf) { return strong_self->createSocket();
return strongSelf->createSocket();
} }
return Socket::createSocket(poller, true); return Socket::createSocket(poller, true);
}); });
_http_ts_player->setOnDisconnect([weakSelf, ticker, ts_duration](const SockException &err) { _http_ts_player->setOnPacket([weak_self](const char *data, size_t len) {
auto strongSelf = weakSelf.lock(); auto strong_self = weak_self.lock();
if (!strongSelf) { if (!strong_self) {
return;
}
auto delay = ts_duration - 500 - ticker->elapsedTime();
if (delay <= 0) {
//播放这个ts切片花费了太长时间我们立即下一个切片的播放
strongSelf->playNextTs(true);
} else {
//下一个切片慢点播放
strongSelf->_timer_ts.reset(new Timer(delay / 1000.0f, [weakSelf]() {
auto strongSelf = weakSelf.lock();
if (!strongSelf) {
return false;
}
strongSelf->playNextTs(true);
return false;
}, strongSelf->getPoller()));
}
});
_http_ts_player->setOnPacket([weakSelf](const char *data, size_t len) {
auto strongSelf = weakSelf.lock();
if (!strongSelf) {
return; return;
} }
//收到ts包 //收到ts包
strongSelf->onPacket_l(data, len); strong_self->onPacket_l(data, len);
}); });
_http_ts_player->setMethod("GET");
if (!(*this)[kNetAdapter].empty()) { if (!(*this)[kNetAdapter].empty()) {
_http_ts_player->setNetAdapter((*this)[Client::kNetAdapter]); _http_ts_player->setNetAdapter((*this)[Client::kNetAdapter]);
} }
}
_http_ts_player->sendRequest(_ts_list.front().url, 2 * _ts_list.front().duration); Ticker ticker;
auto url = _ts_list.front().url;
auto duration = _ts_list.front().duration;
_ts_list.pop_front(); _ts_list.pop_front();
_http_ts_player->setOnComplete([weak_self, ticker, duration, url](const SockException &err) {
auto strong_self = weak_self.lock();
if (!strong_self) {
return;
}
if (err) {
WarnL << "download ts segment " << url << " failed:" << err.what();
}
//提前半秒下载好
auto delay = duration - ticker.elapsedTime() / 1000.0f - 0.5;
if (delay <= 0) {
//延时最小10ms
delay = 10;
}
//延时下载下一个切片
strong_self->_timer_ts.reset(new Timer(delay / 1000.0f, [weak_self]() {
auto strong_self = weak_self.lock();
if (strong_self) {
strong_self->playNextTs();
}
return false;
}, strong_self->getPoller()));
});
_http_ts_player->setMethod("GET");
_http_ts_player->sendRequest(url, 2 * duration);
} }
void HlsPlayer::onParsed(bool is_m3u8_inner, int64_t sequence, const map<int, ts_segment> &ts_map) { void HlsPlayer::onParsed(bool is_m3u8_inner, int64_t sequence, const map<int, ts_segment> &ts_map) {
@ -146,12 +150,12 @@ void HlsPlayer::onParsed(bool is_m3u8_inner,int64_t sequence,const map<int,ts_se
} }
_timer.reset(); _timer.reset();
weak_ptr<HlsPlayer> weakSelf = dynamic_pointer_cast<HlsPlayer>(shared_from_this()); weak_ptr<HlsPlayer> weak_self = dynamic_pointer_cast<HlsPlayer>(shared_from_this());
auto url = ts_map.rbegin()->second.url; auto url = ts_map.rbegin()->second.url;
getPoller()->async([weakSelf, url]() { getPoller()->async([weak_self, url]() {
auto strongSelf = weakSelf.lock(); auto strong_self = weak_self.lock();
if (strongSelf) { if (strong_self) {
strongSelf->play(url); strong_self->play(url);
} }
}, false); }, false);
} }
@ -163,8 +167,8 @@ ssize_t HlsPlayer::onResponseHeader(const string &status, const HttpClient::Http
teardown_l(SockException(Err_shutdown, StrPrinter << "bad http status code:" + status)); teardown_l(SockException(Err_shutdown, StrPrinter << "bad http status code:" + status));
return 0; return 0;
} }
auto contet_type = const_cast< HttpClient::HttpHeader &>(headers)["Content-Type"]; auto content_type = const_cast< HttpClient::HttpHeader &>(headers)["Content-Type"];
_is_m3u8 = (contet_type.find("application/vnd.apple.mpegurl") == 0); _is_m3u8 = (content_type.find("application/vnd.apple.mpegurl") == 0);
return -1; return -1;
} }
@ -177,13 +181,12 @@ void HlsPlayer::onResponseBody(const char *buf, size_t size, size_t recvedSize,
} }
void HlsPlayer::onResponseCompleted() { void HlsPlayer::onResponseCompleted() {
_waiting_response = false;
if (HlsParser::parse(getUrl(), _m3u8)) { if (HlsParser::parse(getUrl(), _m3u8)) {
playDelay();
if (_first) { if (_first) {
_first = false; _first = false;
onPlayResult(SockException(Err_success, "play success")); onPlayResult(SockException(Err_success, "play success"));
} }
playDelay();
} else { } else {
teardown_l(SockException(Err_shutdown, "解析m3u8文件失败")); teardown_l(SockException(Err_shutdown, "解析m3u8文件失败"));
} }
@ -213,7 +216,6 @@ float HlsPlayer::delaySecond() {
} }
void HlsPlayer::onDisconnect(const SockException &ex) { void HlsPlayer::onDisconnect(const SockException &ex) {
_waiting_response = false;
if (_first) { if (_first) {
//第一次失败,则播放失败 //第一次失败,则播放失败
_first = false; _first = false;
@ -246,11 +248,11 @@ bool HlsPlayer::onRedirectUrl(const string &url,bool temporary) {
} }
void HlsPlayer::playDelay() { void HlsPlayer::playDelay() {
weak_ptr<HlsPlayer> weakSelf = dynamic_pointer_cast<HlsPlayer>(shared_from_this()); weak_ptr<HlsPlayer> weak_self = dynamic_pointer_cast<HlsPlayer>(shared_from_this());
_timer.reset(new Timer(delaySecond(), [weakSelf]() { _timer.reset(new Timer(delaySecond(), [weak_self]() {
auto strongSelf = weakSelf.lock(); auto strong_self = weak_self.lock();
if (strongSelf) { if (strong_self) {
strongSelf->play_l(); strong_self->play_l();
} }
return false; return false;
}, getPoller())); }, getPoller()));

View File

@ -96,7 +96,7 @@ private:
private: private:
void playDelay(); void playDelay();
float delaySecond(); float delaySecond();
void playNextTs(bool force = false); void playNextTs();
void teardown_l(const SockException &ex); void teardown_l(const SockException &ex);
void play_l(); void play_l();
void onPacket_l(const char *data, size_t len); void onPacket_l(const char *data, size_t len);
@ -112,7 +112,6 @@ private:
private: private:
bool _is_m3u8 = false; bool _is_m3u8 = false;
bool _first = true; bool _first = true;
bool _waiting_response = false;
int64_t _last_sequence = -1; int64_t _last_sequence = -1;
string _m3u8; string _m3u8;
Timer::Ptr _timer; Timer::Ptr _timer;

View File

@ -9,6 +9,7 @@
*/ */
#include "HttpTSPlayer.h" #include "HttpTSPlayer.h"
namespace mediakit { namespace mediakit {
HttpTSPlayer::HttpTSPlayer(const EventPoller::Ptr &poller, bool split_ts) { HttpTSPlayer::HttpTSPlayer(const EventPoller::Ptr &poller, bool split_ts) {
@ -17,16 +18,14 @@ HttpTSPlayer::HttpTSPlayer(const EventPoller::Ptr &poller, bool split_ts){
setPoller(poller ? poller : EventPollerPool::Instance().getPoller()); setPoller(poller ? poller : EventPollerPool::Instance().getPoller());
} }
HttpTSPlayer::~HttpTSPlayer() {} ssize_t HttpTSPlayer::onResponseHeader(const string &status, const HttpClient::HttpHeader &header) {
ssize_t HttpTSPlayer::onResponseHeader(const string &status, const HttpClient::HttpHeader &headers) {
_status = status; _status = status;
if (status != "200" && status != "206") { if (status != "200" && status != "206") {
//http状态码不符合预期 //http状态码不符合预期
shutdown(SockException(Err_other, StrPrinter << "bad http status code:" + status)); shutdown(SockException(Err_other, StrPrinter << "bad http status code:" + status));
return 0; return 0;
} }
auto content_type = const_cast< HttpClient::HttpHeader &>(headers)["Content-Type"]; auto content_type = const_cast< HttpClient::HttpHeader &>(header)["Content-Type"];
if (content_type.find("video/mp2t") == 0 || content_type.find("video/mpeg") == 0) { if (content_type.find("video/mp2t") == 0 || content_type.find("video/mpeg") == 0) {
_is_ts_content = true; _is_ts_content = true;
} }
@ -35,11 +34,11 @@ ssize_t HttpTSPlayer::onResponseHeader(const string &status, const HttpClient::H
return -1; return -1;
} }
void HttpTSPlayer::onResponseBody(const char *buf, size_t size, size_t recvedSize, size_t totalSize) { void HttpTSPlayer::onResponseBody(const char *buf, size_t size, size_t recved_size, size_t total_size) {
if (_status != "200" && _status != "206") { if (_status != "200" && _status != "206") {
return; return;
} }
if (recvedSize == size) { if (recved_size == size) {
//开始接收数据 //开始接收数据
if (buf[0] == TS_SYNC_BYTE) { if (buf[0] == TS_SYNC_BYTE) {
//这是ts头 //这是ts头
@ -57,14 +56,16 @@ void HttpTSPlayer::onResponseBody(const char *buf, size_t size, size_t recvedSiz
} }
void HttpTSPlayer::onResponseCompleted() { void HttpTSPlayer::onResponseCompleted() {
//接收完毕 emitOnComplete(SockException(Err_success, "play completed"));
shutdown(SockException(Err_success, "play completed"));
} }
void HttpTSPlayer::onDisconnect(const SockException &ex) { void HttpTSPlayer::onDisconnect(const SockException &ex) {
if (_on_disconnect) { emitOnComplete(ex);
_on_disconnect(ex); }
_on_disconnect = nullptr;
void HttpTSPlayer::emitOnComplete(const SockException &ex) {
if (_on_complete) {
_on_complete(ex);
} }
} }
@ -74,12 +75,12 @@ void HttpTSPlayer::onPacket(const char *data, size_t len) {
} }
} }
void HttpTSPlayer::setOnDisconnect(const HttpTSPlayer::onShutdown &cb) { void HttpTSPlayer::setOnComplete(onComplete cb) {
_on_disconnect = cb; _on_complete = std::move(cb);
} }
void HttpTSPlayer::setOnPacket(const TSSegment::onSegment &cb) { void HttpTSPlayer::setOnPacket(TSSegment::onSegment cb) {
_on_segment = cb; _on_segment = std::move(cb);
} }
}//namespace mediakit }//namespace mediakit

View File

@ -14,33 +14,46 @@
#include "Http/HttpDownloader.h" #include "Http/HttpDownloader.h"
#include "Player/MediaPlayer.h" #include "Player/MediaPlayer.h"
#include "Rtp/TSDecoder.h" #include "Rtp/TSDecoder.h"
using namespace toolkit; using namespace toolkit;
namespace mediakit { namespace mediakit {
//http-ts播发器未实现ts解复用 //http-ts播发器未实现ts解复用
class HttpTSPlayer : public HttpClientImp { class HttpTSPlayer : public HttpClientImp {
public: public:
typedef function<void(const SockException &)> onShutdown; using Ptr = std::shared_ptr<HttpTSPlayer>;
typedef std::shared_ptr<HttpTSPlayer> Ptr; using onComplete = std::function<void(const SockException &)>;
HttpTSPlayer(const EventPoller::Ptr &poller = nullptr, bool split_ts = true); HttpTSPlayer(const EventPoller::Ptr &poller = nullptr, bool split_ts = true);
~HttpTSPlayer() override ; ~HttpTSPlayer() override = default;
//设置异常断开回调 /**
void setOnDisconnect(const onShutdown &cb); *
//设置接收ts包回调 */
void setOnPacket(const TSSegment::onSegment &cb); void setOnComplete(onComplete cb);
/**
* ts包回调
*/
void setOnPacket(TSSegment::onSegment cb);
protected: protected:
///HttpClient override/// ///HttpClient override///
ssize_t onResponseHeader(const string &status,const HttpHeader &headers) override; ssize_t onResponseHeader(const string &status, const HttpHeader &header) override;
void onResponseBody(const char *buf,size_t size,size_t recvedSize,size_t totalSize) override; void onResponseBody(const char *buf, size_t size, size_t recved_size, size_t total_size) override;
void onResponseCompleted() override; void onResponseCompleted() override;
void onDisconnect(const SockException &ex) override; void onDisconnect(const SockException &ex) override;
//收到ts包 protected:
/**
* ts数据
*/
virtual void onPacket(const char *data, size_t len); virtual void onPacket(const char *data, size_t len);
private:
void emitOnComplete(const SockException &ex);
private: private:
//是否为mpegts负载 //是否为mpegts负载
bool _is_ts_content = false; bool _is_ts_content = false;
@ -50,7 +63,7 @@ private:
bool _split_ts; bool _split_ts;
string _status; string _status;
TSSegment _segment; TSSegment _segment;
onShutdown _on_disconnect; onComplete _on_complete;
TSSegment::onSegment _on_segment; TSSegment::onSegment _on_segment;
}; };