重写http客户端异常处理机制

This commit is contained in:
ziyue 2022-01-20 14:48:45 +08:00
parent 0f905b7333
commit 88dc8d0a73
20 changed files with 300 additions and 560 deletions

View File

@ -32,9 +32,9 @@ API_EXPORT void API_CALL mk_http_downloader_release(mk_http_downloader ctx) {
API_EXPORT void API_CALL mk_http_downloader_start(mk_http_downloader ctx, const char *url, const char *file, on_mk_download_complete cb, void *user_data) {
assert(ctx && url && file);
HttpDownloader::Ptr *obj = (HttpDownloader::Ptr *) ctx;
(*obj)->setOnResult([cb, user_data](ErrCode code, const string &errMsg, const string &filePath) {
(*obj)->setOnResult([cb, user_data](const SockException &ex, const string &filePath) {
if (cb) {
cb(user_data, code, errMsg.data(), filePath.data());
cb(user_data, ex.getErrCode(), ex.what(), filePath.data());
}
});
(*obj)->startDownload(url, file, false);

View File

@ -112,6 +112,10 @@ int main(int argc, char *argv[]) {
}
});
player->setOnShutdown([](const SockException &ex){
WarnL << "play shutdown: " << ex.what();
});
(*player)[kRtpType] = atoi(argv[2]);
//不等待track ready再回调播放成功事件这样可以加快秒开速度
(*player)[Client::kWaitTrackReady] = false;

View File

@ -17,30 +17,31 @@ HlsPlayer::HlsPlayer(const EventPoller::Ptr &poller) {
setPoller(poller ? poller : EventPollerPool::Instance().getPoller());
}
HlsPlayer::~HlsPlayer() {}
void HlsPlayer::play(const string &strUrl) {
_m3u8_list.emplace_back(strUrl);
play_l();
void HlsPlayer::play(const string &url) {
_play_result = false;
_play_url = url;
fetchIndexFile();
}
void HlsPlayer::play_l() {
if (_m3u8_list.empty()) {
teardown_l(SockException(Err_shutdown, "所有hls url都尝试播放失败!"));
return;
}
void HlsPlayer::fetchIndexFile() {
if (waitResponse()) {
return;
}
setMethod("GET");
if (!(*this)[kNetAdapter].empty()) {
setNetAdapter((*this)[kNetAdapter]);
}
setCompleteTimeout((*this)[Client::kTimeoutMS].as<int>());
sendRequest(_m3u8_list.back());
setMethod("GET");
sendRequest(_play_url);
}
void HlsPlayer::teardown_l(const SockException &ex) {
if (!_play_result) {
_play_result = true;
onPlayResult(ex);
} else {
onShutdown(ex);
}
_timer.reset();
_timer_ts.reset();
_http_ts_player.reset();
@ -51,11 +52,11 @@ void HlsPlayer::teardown() {
teardown_l(SockException(Err_shutdown, "teardown"));
}
void HlsPlayer::playNextTs() {
void HlsPlayer::fetchSegment() {
if (_ts_list.empty()) {
//播放列表为空那么立即重新下载m3u8文件
_timer.reset();
play_l();
fetchIndexFile();
return;
}
if (_http_ts_player && _http_ts_player->waitResponse()) {
@ -110,7 +111,7 @@ void HlsPlayer::playNextTs() {
strong_self->_timer_ts.reset(new Timer(delay / 1000.0f, [weak_self]() {
auto strong_self = weak_self.lock();
if (strong_self) {
strong_self->playNextTs();
strong_self->fetchSegment();
}
return false;
}, strong_self->getPoller()));
@ -143,15 +144,13 @@ void HlsPlayer::onParsed(bool is_m3u8_inner, int64_t sequence, const map<int, ts
_ts_url_cache.erase(_ts_url_sort.front());
_ts_url_sort.pop_front();
}
playNextTs();
fetchSegment();
} else {
//这是m3u8列表,我们播放最高清的子hls
if (ts_map.empty()) {
teardown_l(SockException(Err_shutdown, StrPrinter << "empty sub hls list:" + getUrl()));
return;
throw invalid_argument("empty sub hls list:" + getUrl());
}
_timer.reset();
weak_ptr<HlsPlayer> weak_self = dynamic_pointer_cast<HlsPlayer>(shared_from_this());
auto url = ts_map.rbegin()->second.url;
getPoller()->async([weak_self, url]() {
@ -163,42 +162,36 @@ void HlsPlayer::onParsed(bool is_m3u8_inner, int64_t sequence, const map<int, ts
}
}
ssize_t HlsPlayer::onResponseHeader(const string &status, const HttpClient::HttpHeader &headers) {
void HlsPlayer::onResponseHeader(const string &status, const HttpClient::HttpHeader &headers) {
if (status != "200" && status != "206") {
//失败
teardown_l(SockException(Err_shutdown, StrPrinter << "bad http status code:" + status));
return 0;
throw invalid_argument("bad http status code:" + status);
}
auto content_type = const_cast<HttpClient::HttpHeader &>(headers)["Content-Type"];
_is_m3u8 = (content_type.find("application/vnd.apple.mpegurl") == 0);
if(!_is_m3u8) {
auto it = headers.find("Content-Length");
//如果没有长度或者长度小于等于0, 那么肯定不是m3u8
if (it == headers.end() || atoll(it->second.data()) <=0) {
teardown_l(SockException(Err_shutdown, "可能不是m3u8文件"));
if (content_type.find("application/vnd.apple.mpegurl") != 0) {
throw invalid_argument("content type not m3u8: " + content_type);
}
}
return -1;
}
void HlsPlayer::onResponseBody(const char *buf, size_t size, size_t recvedSize, size_t totalSize) {
if (recvedSize == size) {
//刚开始
_m3u8.clear();
}
void HlsPlayer::onResponseBody(const char *buf, size_t size) {
_m3u8.append(buf, size);
}
void HlsPlayer::onResponseCompleted() {
if (HlsParser::parse(getUrl(), _m3u8)) {
if (_first) {
_first = false;
onPlayResult(SockException(Err_success, "play success"));
void HlsPlayer::onResponseCompleted(const SockException &ex) {
if (ex) {
teardown_l(ex);
return;
}
if (!HlsParser::parse(getUrl(), _m3u8)) {
teardown_l(SockException(Err_other, "parse m3u8 failed:" + _m3u8));
return;
}
if (!_play_result) {
_play_result = true;
onPlayResult(SockException());
}
playDelay();
} else {
teardown_l(SockException(Err_shutdown, "解析m3u8文件失败"));
}
}
float HlsPlayer::delaySecond() {
@ -224,35 +217,8 @@ float HlsPlayer::delaySecond() {
return 1.0f;
}
void HlsPlayer::onDisconnect(const SockException &ex) {
if (_first) {
//第一次失败,则播放失败
_first = false;
onPlayResult(ex);
return;
}
//主动shutdown
if (ex.getErrCode() == Err_shutdown) {
if (_m3u8_list.size() <= 1) {
//全部url都播放失败
_timer = nullptr;
_timer_ts = nullptr;
onShutdown(ex);
} else {
_m3u8_list.pop_back();
//还有上一级url可以重试播放
play_l();
}
return;
}
//eof等之后播放失败那么重试播放m3u8
playDelay();
}
bool HlsPlayer::onRedirectUrl(const string &url, bool temporary) {
_m3u8_list.emplace_back(url);
_play_url = url;
return true;
}
@ -261,7 +227,7 @@ void HlsPlayer::playDelay() {
_timer.reset(new Timer(delaySecond(), [weak_self]() {
auto strong_self = weak_self.lock();
if (strong_self) {
strong_self->play_l();
strong_self->fetchIndexFile();
}
return false;
}, getPoller()));
@ -279,46 +245,6 @@ void HlsPlayer::onPacket_l(const char *data, size_t len) {
//////////////////////////////////////////////////////////////////////////
class HlsDemuxer : public MediaSinkInterface, public TrackSource, public std::enable_shared_from_this<HlsDemuxer> {
public:
HlsDemuxer() = default;
~HlsDemuxer() override { _timer = nullptr; }
void start(const EventPoller::Ptr &poller, TrackListener *listener);
bool inputFrame(const Frame::Ptr &frame) override;
bool addTrack(const Track::Ptr &track) override {
return _delegate.addTrack(track);
}
void addTrackCompleted() override {
_delegate.addTrackCompleted();
}
void resetTracks() override {
((MediaSink &) _delegate).resetTracks();
}
vector<Track::Ptr> getTracks(bool ready = true) const override {
return _delegate.getTracks(ready);
}
private:
void onTick();
int64_t getBufferMS();
int64_t getPlayPosition();
void setPlayPosition(int64_t pos);
private:
int64_t _ticker_offset = 0;
Ticker _ticker;
Stamp _stamp[2];
Timer::Ptr _timer;
MediaSinkDelegate _delegate;
multimap<int64_t, Frame::Ptr> _frame_cache;
};
void HlsDemuxer::start(const EventPoller::Ptr &poller, TrackListener *listener) {
_frame_cache.clear();
_stamp[TrackAudio].setRelativeStamp(0);
@ -406,7 +332,7 @@ void HlsDemuxer::onTick() {
HlsPlayerImp::HlsPlayerImp(const EventPoller::Ptr &poller) : PlayerImp<HlsPlayer, PlayerBase>(poller) {}
void HlsPlayerImp::onPacket(const char *data, size_t len) {
if (!_decoder) {
if (!_decoder && _demuxer) {
_decoder = DecoderImp::createDecoder(DecoderImp::decoder_ts, _demuxer.get());
}
@ -430,7 +356,6 @@ void HlsPlayerImp::onPlayResult(const SockException &ex) {
}
void HlsPlayerImp::onShutdown(const SockException &ex) {
WarnL << ex.what();
PlayerImp<HlsPlayer, PlayerBase>::onShutdown(ex);
_demuxer = nullptr;
}

View File

@ -25,16 +25,45 @@ using namespace toolkit;
namespace mediakit {
class HlsDemuxer
: public MediaSinkInterface
, public TrackSource
, public std::enable_shared_from_this<HlsDemuxer> {
public:
HlsDemuxer() = default;
~HlsDemuxer() override { _timer = nullptr; }
void start(const EventPoller::Ptr &poller, TrackListener *listener);
bool inputFrame(const Frame::Ptr &frame) override;
bool addTrack(const Track::Ptr &track) override { return _delegate.addTrack(track); }
void addTrackCompleted() override { _delegate.addTrackCompleted(); }
void resetTracks() override { ((MediaSink &)_delegate).resetTracks(); }
vector<Track::Ptr> getTracks(bool ready = true) const override { return _delegate.getTracks(ready); }
private:
void onTick();
int64_t getBufferMS();
int64_t getPlayPosition();
void setPlayPosition(int64_t pos);
private:
int64_t _ticker_offset = 0;
Ticker _ticker;
Stamp _stamp[2];
Timer::Ptr _timer;
MediaSinkDelegate _delegate;
multimap<int64_t, Frame::Ptr> _frame_cache;
};
class HlsPlayer : public HttpClientImp , public PlayerBase , public HlsParser{
public:
HlsPlayer(const EventPoller::Ptr &poller);
~HlsPlayer() override;
~HlsPlayer() override = default;
/**
*
* @param strUrl
*/
void play(const string &strUrl) override;
void play(const string &url) override;
/**
*
@ -50,55 +79,18 @@ protected:
virtual void onPacket(const char *data, size_t len) = 0;
private:
/**
* m3u8成功
* @param is_m3u8_inner m3u8列表
* @param sequence ts列表seq
* @param ts_map ts列表或m3u8列表
*/
void onParsed(bool is_m3u8_inner,int64_t sequence,const map<int,ts_segment> &ts_map) override;
/**
* http回复头
* @param status :200 OK
* @param headers http头
* @return content的长度-1:content>=0:content
* http头中带有Content-Length字段时
*/
ssize_t onResponseHeader(const string &status,const HttpHeader &headers) override;
/**
* http conten数据
* @param buf
* @param size
* @param recvedSize (),totalSize时将触发onResponseCompleted回调
* @param totalSize
*/
void onResponseBody(const char *buf,size_t size,size_t recvedSize, size_t totalSize) override;
/**
* http回复完毕,
*/
void onResponseCompleted() override;
/**
* http链接断开回调
* @param ex
*/
void onDisconnect(const SockException &ex) override;
/**
*
* @param url url
* @param temporary
* @return
*/
void onResponseHeader(const string &status,const HttpHeader &headers) override;
void onResponseBody(const char *buf,size_t size) override;
void onResponseCompleted(const SockException &e) override;
bool onRedirectUrl(const string &url,bool temporary) override;
private:
void playDelay();
float delaySecond();
void playNextTs();
void fetchSegment();
void teardown_l(const SockException &ex);
void play_l();
void fetchIndexFile();
void onPacket_l(const char *data, size_t len);
private:
@ -110,15 +102,14 @@ private:
};
private:
bool _is_m3u8 = false;
bool _first = true;
bool _play_result = false;
int64_t _last_sequence = -1;
string _m3u8;
string _play_url;
Timer::Ptr _timer;
Timer::Ptr _timer_ts;
list<ts_segment> _ts_list;
list<string> _ts_url_sort;
list<string> _m3u8_list;
set<string, UrlComp> _ts_url_cache;
HttpTSPlayer::Ptr _http_ts_player;
TSSegment _segment;

View File

@ -141,6 +141,14 @@ const Parser &HttpClient::response() const {
return _parser;
}
ssize_t HttpClient::responseBodyTotalSize() const {
return _total_body_size;
}
size_t HttpClient::responseBodySize() const {
return _recved_body_size;
}
const string &HttpClient::getUrl() const {
return _url;
}
@ -151,7 +159,7 @@ void HttpClient::onConnect(const SockException &ex) {
void HttpClient::onConnect_l(const SockException &ex) {
if (ex) {
onDisconnect(ex);
onResponseCompleted_l(ex);
return;
}
@ -173,12 +181,7 @@ void HttpClient::onRecv(const Buffer::Ptr &pBuf) {
}
void HttpClient::onErr(const SockException &ex) {
if (ex.getErrCode() == Err_eof && _total_body_size < 0) {
//如果Content-Length未指定 但服务器断开链接
//则认为本次http请求完成
onResponseCompleted_l();
}
onDisconnect(ex);
onResponseCompleted_l(ex);
}
ssize_t HttpClient::onRecvHeader(const char *data, size_t len) {
@ -186,42 +189,44 @@ ssize_t HttpClient::onRecvHeader(const char *data, size_t len) {
if (_parser.Url() == "302" || _parser.Url() == "301") {
auto new_url = _parser["Location"];
if (new_url.empty()) {
shutdown(SockException(Err_shutdown, "未找到Location字段(跳转url)"));
return 0;
throw invalid_argument("未找到Location字段(跳转url)");
}
if (onRedirectUrl(new_url, _parser.Url() == "302")) {
setMethod("GET");
HttpClient::sendRequest(new_url);
return 0;
}
}
checkCookie(_parser.getHeader());
onResponseHeader(_parser.Url(), _parser.getHeader());
_header_recved = true;
_total_body_size = onResponseHeader(_parser.Url(), _parser.getHeader());
if (!_parser["Content-Length"].empty()) {
//有Content-Length字段时忽略onResponseHeader的返回值
_total_body_size = atoll(_parser["Content-Length"].data());
}
if (_parser["Transfer-Encoding"] == "chunked") {
//如果Transfer-Encoding字段等于chunked则认为后续的content是不限制长度的
_total_body_size = -1;
_chunked_splitter = std::make_shared<HttpChunkedSplitter>([this](const char *data, size_t len) {
if (len > 0) {
auto recved_body_size = _recved_body_size + len;
onResponseBody(data, len, recved_body_size, SIZE_MAX);
_recved_body_size = recved_body_size;
_recved_body_size += len;
onResponseBody(data, len);
} else {
onResponseCompleted_l();
_total_body_size = _recved_body_size;
onResponseCompleted_l(SockException(Err_success, "success"));
}
});
//后续为源源不断的body
return -1;
}
if (!_parser["Content-Length"].empty()) {
//有Content-Length字段时忽略onResponseHeader的返回值
_total_body_size = atoll(_parser["Content-Length"].data());
} else {
_total_body_size = -1;
}
if (_total_body_size == 0) {
//后续没content本次http请求结束
onResponseCompleted_l();
onResponseCompleted_l(SockException(Err_success, "success"));
return 0;
}
@ -238,30 +243,30 @@ void HttpClient::onRecvContent(const char *data, size_t len) {
_chunked_splitter->input(data, len);
return;
}
auto recved_body_size = _recved_body_size + len;
_recved_body_size += len;
if (_total_body_size < 0) {
//不限长度的content,最大支持SIZE_MAX个字节
onResponseBody(data, len, recved_body_size, SIZE_MAX);
_recved_body_size = recved_body_size;
//不限长度的content
onResponseBody(data, len);
return;
}
//固定长度的content
if (recved_body_size < (size_t) _total_body_size) {
if (_recved_body_size < (size_t) _total_body_size) {
//content还未接收完毕
onResponseBody(data, len, recved_body_size, _total_body_size);
_recved_body_size = recved_body_size;
onResponseBody(data, len);
return;
}
if (_recved_body_size == (size_t)_total_body_size) {
//content接收完毕
onResponseBody(data, _total_body_size - _recved_body_size, _total_body_size, _total_body_size);
bool bigger_than_expected = recved_body_size > (size_t) _total_body_size;
onResponseCompleted_l();
if (bigger_than_expected) {
//声明的content数据比真实的小那么我们只截取前面部分的并断开链接
shutdown(SockException(Err_shutdown, "http response content size bigger than expected"));
onResponseBody(data, len);
onResponseCompleted_l(SockException(Err_success, "success"));
return;
}
//声明的content数据比真实的小断开链接
onResponseBody(data, len);
throw invalid_argument("http response content size bigger than expected");
}
void HttpClient::onFlush() {
@ -308,10 +313,34 @@ void HttpClient::onManager() {
}
}
void HttpClient::onResponseCompleted_l() {
void HttpClient::onResponseCompleted_l(const SockException &ex) {
if (_complete) {
return;
}
_complete = true;
_wait_complete.resetTime();
onResponseCompleted();
if (!ex) {
//确认无疑的成功
onResponseCompleted(ex);
return;
}
//可疑的失败
if (_total_body_size > 0 && _recved_body_size >= _total_body_size) {
//回复header中有content-length信息那么收到的body大于等于声明值则认为成功
onResponseCompleted(SockException(Err_success, "success"));
return;
}
if (_total_body_size == -1 && _recved_body_size > 0) {
//回复header中无content-length信息那么收到一点body也认为成功
onResponseCompleted(SockException(Err_success, ex.what()));
return;
}
//确认无疑的失败
onResponseCompleted(ex);
}
bool HttpClient::waitResponse() const {

View File

@ -100,6 +100,16 @@ public:
*/
const Parser &response() const;
/**
* header声明的body大小
*/
ssize_t responseBodyTotalSize() const;
/**
* body的大小
*/
size_t responseBodySize() const;
/**
* url
*/
@ -139,37 +149,20 @@ protected:
* http回复头
* @param status :200 OK
* @param headers http头
* @return content的长度-1:content>=0:content
* http头中带有Content-Length字段时
*/
virtual ssize_t onResponseHeader(const string &status, const HttpHeader &headers) {
//无Content-Length字段时默认后面全是content
return -1;
}
virtual void onResponseHeader(const string &status, const HttpHeader &headers) = 0;
/**
* http conten数据
* @param buf
* @param size
* @param recvedSize (),totalSize时将触发onResponseCompleted回调
* @param totalSize
*/
virtual void onResponseBody(const char *buf, size_t size, size_t recvedSize, size_t totalSize) {
DebugL << size << " " << recvedSize << " " << totalSize;
}
virtual void onResponseBody(const char *buf, size_t size) = 0;
/**
* http回复完毕,
*/
virtual void onResponseCompleted() {
DebugL;
}
/**
* http链接断开回调
* @param ex
*/
virtual void onDisconnect(const SockException &ex) {}
virtual void onResponseCompleted(const SockException &ex) = 0;
/**
*
@ -179,11 +172,11 @@ protected:
*/
virtual bool onRedirectUrl(const string &url, bool temporary) { return true; };
protected:
//// HttpRequestSplitter override ////
ssize_t onRecvHeader(const char *data, size_t len) override;
void onRecvContent(const char *data, size_t len) override;
protected:
//// TcpClient override ////
void onConnect(const SockException &ex) override;
void onRecv(const Buffer::Ptr &pBuf) override;
@ -192,7 +185,7 @@ protected:
void onManager() override;
private:
void onResponseCompleted_l();
void onResponseCompleted_l(const SockException &ex);
void onConnect_l(const SockException &ex);
void checkCookie(HttpHeader &headers);
void clearResponse();

View File

@ -18,7 +18,6 @@ void HttpClientImp::onConnect(const SockException &ex) {
} else {
TcpClientWithSSL<HttpClient>::onConnect(ex);
}
}
} /* namespace mediakit */

View File

@ -18,9 +18,10 @@ namespace mediakit {
class HttpClientImp : public TcpClientWithSSL<HttpClient> {
public:
typedef std::shared_ptr<HttpClientImp> Ptr;
HttpClientImp() {}
virtual ~HttpClientImp() {}
using Ptr = std::shared_ptr<HttpClientImp>;
HttpClientImp() = default;
~HttpClientImp() override = default;
protected:
void onConnect(const SockException &ex) override;
};

View File

@ -15,78 +15,59 @@ using namespace toolkit;
namespace mediakit {
HttpDownloader::HttpDownloader() {}
HttpDownloader::~HttpDownloader() {
closeFile();
}
void HttpDownloader::startDownload(const string &url, const string &filePath, bool bAppend) {
_filePath = filePath;
if (_filePath.empty()) {
_filePath = exeDir() + "HttpDownloader/" + MD5(url).hexdigest();
void HttpDownloader::startDownload(const string &url, const string &file_path, bool append) {
_file_path = file_path;
if (_file_path.empty()) {
_file_path = exeDir() + "HttpDownloader/" + MD5(url).hexdigest();
}
_saveFile = File::create_file(_filePath.data(), bAppend ? "ab" : "wb");
if (!_saveFile) {
auto strErr = StrPrinter << "打开文件失败:" << filePath << endl;
_save_file = File::create_file(_file_path.data(), append ? "ab" : "wb");
if (!_save_file) {
auto strErr = StrPrinter << "打开文件失败:" << file_path << endl;
throw std::runtime_error(strErr);
}
_bDownloadSuccess = false;
if (bAppend) {
auto currentLen = ftell(_saveFile);
if (append) {
auto currentLen = ftell(_save_file);
if (currentLen) {
//最少续传一个字节怕遇到http 416的错误
currentLen -= 1;
fseek(_saveFile, -1, SEEK_CUR);
fseek(_save_file, -1, SEEK_CUR);
}
addHeader("Range", StrPrinter << "bytes=" << currentLen << "-" << endl);
}
setMethod("GET");
sendRequest(url);
}
ssize_t HttpDownloader::onResponseHeader(const string &status, const HttpHeader &headers) {
void HttpDownloader::onResponseHeader(const string &status, const HttpHeader &headers) {
if (status != "200" && status != "206") {
//失败
shutdown(SockException(Err_shutdown, StrPrinter << "Http Status:" << status));
}
//后续全部是content
return -1;
}
void HttpDownloader::onResponseBody(const char *buf, size_t size, size_t recvedSize, size_t totalSize) {
if (_saveFile) {
fwrite(buf, size, 1, _saveFile);
throw std::invalid_argument("bad http status: " + status);
}
}
void HttpDownloader::onResponseCompleted() {
void HttpDownloader::onResponseBody(const char *buf, size_t size) {
if (_save_file) {
fwrite(buf, size, 1, _save_file);
}
}
void HttpDownloader::onResponseCompleted(const SockException &ex) {
closeFile();
// InfoL << "md5Sum:" << getMd5Sum(_filePath);
_bDownloadSuccess = true;
if (_onResult) {
_onResult(Err_success, "success", _filePath);
_onResult = nullptr;
}
}
void HttpDownloader::onDisconnect(const SockException &ex) {
closeFile();
if (!_bDownloadSuccess) {
File::delete_file(_filePath.data());
}
if (_onResult) {
_onResult(ex.getErrCode(), ex.what(), _filePath);
_onResult = nullptr;
if (_on_result) {
_on_result(ex, _file_path);
_on_result = nullptr;
}
}
void HttpDownloader::closeFile() {
if (_saveFile) {
fflush(_saveFile);
fclose(_saveFile);
_saveFile = nullptr;
if (_save_file) {
fflush(_save_file);
fclose(_save_file);
_save_file = nullptr;
}
}

View File

@ -17,32 +17,39 @@ namespace mediakit {
class HttpDownloader : public HttpClientImp {
public:
typedef std::shared_ptr<HttpDownloader> Ptr;
typedef std::function<void(ErrCode code, const string &errMsg, const string &filePath)> onDownloadResult;
HttpDownloader();
virtual ~HttpDownloader();
//开始下载文件,默认断点续传方式下载
void startDownload(const string &url, const string &filePath = "", bool bAppend = false);
using Ptr = std::shared_ptr<HttpDownloader>;
using onDownloadResult = std::function<void(const SockException &ex, const string &filePath)>;
HttpDownloader() = default;
~HttpDownloader() override;
/**
* ,
* @param url http url
* @param file_path
* @param append
*/
void startDownload(const string &url, const string &file_path = "", bool append = false);
void startDownload(const string &url, const onDownloadResult &cb) {
setOnResult(cb);
startDownload(url, "", false);
}
void setOnResult(const onDownloadResult &cb) { _onResult = cb; }
void setOnResult(const onDownloadResult &cb) { _on_result = cb; }
protected:
void onResponseBody(const char *buf, size_t size, size_t recvedSize, size_t totalSize) override;
ssize_t onResponseHeader(const string &status, const HttpHeader &headers) override;
void onResponseCompleted() override;
void onDisconnect(const SockException &ex) override;
void onResponseBody(const char *buf, size_t size) override;
void onResponseHeader(const string &status, const HttpHeader &headers) override;
void onResponseCompleted(const SockException &ex) override;
private:
void closeFile();
private:
FILE *_saveFile = nullptr;
string _filePath;
onDownloadResult _onResult;
bool _bDownloadSuccess = false;
FILE *_save_file = nullptr;
string _file_path;
onDownloadResult _on_result;
};
} /* namespace mediakit */

View File

@ -12,46 +12,36 @@
namespace mediakit {
ssize_t HttpRequester::onResponseHeader(const string &status, const HttpHeader &headers) {
_strRecvBody.clear();
return HttpClientImp::onResponseHeader(status, headers);
void HttpRequester::onResponseHeader(const string &status, const HttpHeader &headers) {
_res_body.clear();
}
void HttpRequester::onResponseBody(const char *buf, size_t size, size_t recvedSize, size_t totalSize) {
_strRecvBody.append(buf, size);
void HttpRequester::onResponseBody(const char *buf, size_t size) {
_res_body.append(buf, size);
}
void HttpRequester::onResponseCompleted() {
const_cast<Parser &> (response()).setContent(std::move(_strRecvBody));
if (_onResult) {
_onResult(SockException(), response());
_onResult = nullptr;
void HttpRequester::onResponseCompleted(const SockException &ex) {
const_cast<Parser &>(response()).setContent(std::move(_res_body));
if (_on_result) {
_on_result(ex, response());
_on_result = nullptr;
}
}
void HttpRequester::onDisconnect(const SockException &ex) {
const_cast<Parser &> (response()).setContent(std::move(_strRecvBody));
if (_onResult) {
_onResult(ex, response());
_onResult = nullptr;
}
}
void HttpRequester::startRequester(const string &url, const HttpRequesterResult &onResult, float timeOutSecond) {
_onResult = onResult;
setCompleteTimeout(timeOutSecond * 1000);
void HttpRequester::startRequester(const string &url, const HttpRequesterResult &on_result, float timeout_sec) {
_on_result = on_result;
setCompleteTimeout(timeout_sec * 1000);
sendRequest(url);
}
void HttpRequester::clear() {
HttpClientImp::clear();
_strRecvBody.clear();
_onResult = nullptr;
_res_body.clear();
_on_result = nullptr;
}
void HttpRequester::setOnResult(const HttpRequesterResult &onResult) {
_onResult = onResult;
_on_result = onResult;
}
} // namespace mediakit

View File

@ -24,18 +24,17 @@ public:
~HttpRequester() override = default;
void setOnResult(const HttpRequesterResult &onResult);
void startRequester(const string &url, const HttpRequesterResult &onResult, float timeOutSecond = 10);
void startRequester(const string &url, const HttpRequesterResult &on_result, float timeout_sec = 10);
void clear() override;
private:
ssize_t onResponseHeader(const string &status, const HttpHeader &headers) override;
void onResponseBody(const char *buf, size_t size, size_t recvedSize, size_t totalSize) override;
void onResponseCompleted() override;
void onDisconnect(const SockException &ex) override;
void onResponseHeader(const string &status, const HttpHeader &headers) override;
void onResponseBody(const char *buf, size_t size) override;
void onResponseCompleted(const SockException &ex) override;
private:
string _strRecvBody;
HttpRequesterResult _onResult;
string _res_body;
HttpRequesterResult _on_result;
};
}//namespace mediakit

View File

@ -18,36 +18,19 @@ HttpTSPlayer::HttpTSPlayer(const EventPoller::Ptr &poller, bool split_ts) {
setPoller(poller ? poller : EventPollerPool::Instance().getPoller());
}
ssize_t HttpTSPlayer::onResponseHeader(const string &status, const HttpClient::HttpHeader &header) {
_status = status;
void HttpTSPlayer::onResponseHeader(const string &status, const HttpClient::HttpHeader &header) {
if (status != "200" && status != "206") {
// http状态码不符合预期
shutdown(SockException(Err_other, StrPrinter << "bad http status code:" + status));
return 0;
throw invalid_argument("bad http status code:" + status);
}
auto content_type = const_cast<HttpClient::HttpHeader &>(header)["Content-Type"];
if (content_type.find("video/mp2t") == 0 || content_type.find("video/mpeg") == 0) {
_is_ts_content = true;
}
//后续是不定长content
return -1;
}
void HttpTSPlayer::onResponseBody(const char *buf, size_t size, size_t recved_size, size_t total_size) {
if (_status != "200" && _status != "206") {
return;
}
if (recved_size == size) {
//开始接收数据
if (buf[0] == TS_SYNC_BYTE) {
//这是ts头
_is_first_packet_ts = true;
} else {
WarnL << "可能不是http-ts流";
if (content_type.find("video/mp2t") != 0 && content_type.find("video/mpeg") != 0) {
throw invalid_argument("content type not mpeg-ts: " + content_type);
}
}
void HttpTSPlayer::onResponseBody(const char *buf, size_t size) {
if (_split_ts) {
try {
_segment.input(buf, size);
@ -62,11 +45,7 @@ void HttpTSPlayer::onResponseBody(const char *buf, size_t size, size_t recved_si
}
}
void HttpTSPlayer::onResponseCompleted() {
emitOnComplete(SockException(Err_success, "play completed"));
}
void HttpTSPlayer::onDisconnect(const SockException &ex) {
void HttpTSPlayer::onResponseCompleted(const SockException &ex) {
emitOnComplete(ex);
}

View File

@ -40,10 +40,9 @@ public:
protected:
///HttpClient override///
ssize_t onResponseHeader(const string &status, const HttpHeader &header) override;
void onResponseBody(const char *buf, size_t size, size_t recved_size, size_t total_size) override;
void onResponseCompleted() override;
void onDisconnect(const SockException &ex) override;
void onResponseHeader(const string &status, const HttpHeader &header) override;
void onResponseBody(const char *buf, size_t size) override;
void onResponseCompleted(const SockException &ex) override;
protected:
/**
@ -55,13 +54,7 @@ private:
void emitOnComplete(const SockException &ex);
private:
//是否为mpegts负载
bool _is_ts_content = false;
//第一个包是否为ts包
bool _is_first_packet_ts = false;
//是否判断是否是ts并split
bool _split_ts;
string _status;
TSSegment _segment;
onComplete _on_complete;
TSSegment::onSegment _on_segment;

View File

@ -14,56 +14,35 @@ namespace mediakit {
TsPlayer::TsPlayer(const EventPoller::Ptr &poller) : HttpTSPlayer(poller, true) {}
void TsPlayer::play(const string &strUrl) {
_ts_url.append(strUrl);
playTs();
void TsPlayer::play(const string &url) {
TraceL << "play http-ts: " << url;
_play_result = false;
setHeaderTimeout((*this)[Client::kTimeoutMS].as<int>());
setBodyTimeout((*this)[Client::kMediaTimeoutMS].as<int>());
setMethod("GET");
sendRequest(url);
}
void TsPlayer::teardown() {
shutdown(SockException(Err_shutdown, "teardown"));
}
void TsPlayer::playTs() {
if (waitResponse()) {
//播放器目前还存活,正在下载中
return;
}
TraceL << "play http-ts: " << _ts_url;
weak_ptr <TsPlayer> weak_self = dynamic_pointer_cast<TsPlayer>(shared_from_this());
setMethod("GET");
setHeaderTimeout((*this)[Client::kTimeoutMS].as<int>());
setBodyTimeout((*this)[Client::kMediaTimeoutMS].as<int>());
sendRequest(_ts_url);
}
void TsPlayer::onResponseCompleted() {
//接收完毕
shutdown(SockException(Err_success, StrPrinter << "play " << _ts_url << " completed"));
}
void TsPlayer::onDisconnect(const SockException &ex) {
WarnL << "play " << _ts_url << " failed: " << ex.getErrCode() << " " << ex.what();
if (_first) {
//第一次失败,则播放失败
_first = false;
void TsPlayer::onResponseCompleted(const SockException &ex) {
if (!_play_result) {
_play_result = true;
onPlayResult(ex);
return;
}
if (ex.getErrCode() == Err_shutdown) {
onShutdown(ex);
} else {
onResponseCompleted();
onShutdown(ex);
}
HttpTSPlayer::onResponseCompleted(ex);
}
ssize_t TsPlayer::onResponseHeader(const string &status, const HttpClient::HttpHeader &header) {
ssize_t ret = HttpTSPlayer::onResponseHeader(status, header);
if (_first) {
_first = false;
onPlayResult(SockException(Err_success, "play success"));
void TsPlayer::onResponseBody(const char *buf, size_t size) {
if (!_play_result) {
_play_result = true;
onPlayResult(SockException(Err_success, "play http-ts success"));
}
return ret;
HttpTSPlayer::onResponseBody(buf, size);
}
} // namespace mediakit

View File

@ -11,14 +11,14 @@
#ifndef HTTP_TSPLAYER_H
#define HTTP_TSPLAYER_H
#include <unordered_set>
#include "Util/util.h"
#include "Poller/Timer.h"
#include "Http/HttpDownloader.h"
#include "HttpTSPlayer.h"
#include "Player/MediaPlayer.h"
#include "Poller/Timer.h"
#include "Rtp/Decoder.h"
#include "Rtp/TSDecoder.h"
#include "HttpTSPlayer.h"
#include "Util/util.h"
#include <unordered_set>
using namespace toolkit;
namespace mediakit {
@ -38,17 +38,12 @@ public:
*/
void teardown() override;
private:
void playTs();
protected:
virtual void onResponseCompleted() override;
virtual void onDisconnect(const SockException &ex) override;
virtual ssize_t onResponseHeader(const string &status, const HttpHeader &header) override;
void onResponseBody(const char *buf, size_t size) override;
void onResponseCompleted(const SockException &ex) override;
private:
bool _first = true;
string _ts_url;
bool _play_result = true;
};
} // namespace mediakit

View File

@ -24,46 +24,6 @@ using namespace toolkit;
namespace mediakit {
class TsDemuxer : public MediaSinkInterface, public TrackSource, public std::enable_shared_from_this<TsDemuxer> {
public:
TsDemuxer() = default;
~TsDemuxer() override { _timer = nullptr; }
void start(const EventPoller::Ptr &poller, TrackListener *listener);
bool inputFrame(const Frame::Ptr &frame) override;
bool addTrack(const Track::Ptr &track) override {
return _delegate.addTrack(track);
}
void addTrackCompleted() override {
_delegate.addTrackCompleted();
}
void resetTracks() override {
((MediaSink &) _delegate).resetTracks();
}
vector<Track::Ptr> getTracks(bool ready = true) const override {
return _delegate.getTracks(ready);
}
private:
void onTick();
int64_t getBufferMS();
int64_t getPlayPosition();
void setPlayPosition(int64_t pos);
private:
int64_t _ticker_offset = 0;
Ticker _ticker;
Stamp _stamp[2];
Timer::Ptr _timer;
MediaSinkDelegate _delegate;
multimap<int64_t, Frame::Ptr> _frame_cache;
};
class TsPlayerImp : public PlayerImp<TsPlayer, PlayerBase>, private TrackListener {
public:
using Ptr = std::shared_ptr<TsPlayerImp>;

View File

@ -9,94 +9,14 @@
*/
#include "TsPlayerImp.h"
#include "HlsPlayer.h"
namespace mediakit {
void TsDemuxer::start(const EventPoller::Ptr &poller, TrackListener *listener) {
_frame_cache.clear();
_stamp[TrackAudio].setRelativeStamp(0);
_stamp[TrackVideo].setRelativeStamp(0);
_stamp[TrackAudio].syncTo(_stamp[TrackVideo]);
setPlayPosition(0);
_delegate.setTrackListener(listener);
//每50毫秒执行一次
weak_ptr <TsDemuxer> weak_self = shared_from_this();
_timer = std::make_shared<Timer>(0.05f, [weak_self]() {
auto strong_self = weak_self.lock();
if (!strong_self) {
return false;
}
strong_self->onTick();
return true;
}, poller);
}
bool TsDemuxer::inputFrame(const Frame::Ptr &frame) {
//为了避免track准备时间过长, 因此在没准备好之前, 直接消费掉所有的帧
if (!_delegate.isAllTrackReady()) {
_delegate.inputFrame(frame);
return true;
}
//计算相对时间戳
int64_t dts, pts;
//根据时间戳缓存frame
_stamp[frame->getTrackType()].revise(frame->dts(), frame->pts(), dts, pts);
_frame_cache.emplace(dts, Frame::getCacheAbleFrame(frame));
if (getBufferMS() > 30 * 1000) {
//缓存超过30秒强制消费至15秒(减少延时或内存占用)
while (getBufferMS() > 15 * 1000) {
_delegate.inputFrame(_frame_cache.begin()->second);
_frame_cache.erase(_frame_cache.begin());
}
//接着播放缓存中最早的帧
setPlayPosition(_frame_cache.begin()->first);
}
return true;
}
int64_t TsDemuxer::getPlayPosition() {
return _ticker.elapsedTime() + _ticker_offset;
}
int64_t TsDemuxer::getBufferMS() {
if (_frame_cache.empty()) {
return 0;
}
return _frame_cache.rbegin()->first - _frame_cache.begin()->first;
}
void TsDemuxer::setPlayPosition(int64_t pos) {
_ticker.resetTime();
_ticker_offset = pos;
}
void TsDemuxer::onTick() {
auto it = _frame_cache.begin();
while (it != _frame_cache.end()) {
if (it->first > getPlayPosition()) {
//这些帧还未到时间播放
break;
}
if (getBufferMS() < 3 * 1000) {
//缓存小于3秒,那么降低定时器消费速度(让剩余的数据在3秒后消费完毕)
//目的是为了防止定时器长时间干等后,数据瞬间消费完毕
setPlayPosition(_frame_cache.begin()->first);
}
//消费掉已经到期的帧
_delegate.inputFrame(it->second);
it = _frame_cache.erase(it);
}
}
//////////////////////////////////////////////////////////////////////////
TsPlayerImp::TsPlayerImp(const EventPoller::Ptr &poller) : PlayerImp<TsPlayer, PlayerBase>(poller) {}
void TsPlayerImp::onPacket(const char *data, size_t len) {
if (!_decoder) {
if (!_decoder && _demuxer) {
_decoder = DecoderImp::createDecoder(DecoderImp::decoder_ts, _demuxer.get());
}
@ -106,15 +26,14 @@ void TsPlayerImp::onPacket(const char *data, size_t len) {
}
void TsPlayerImp::addTrackCompleted() {
PlayerImp<TsPlayer, PlayerBase>::onPlayResult(SockException(Err_success, "play hls success"));
PlayerImp<TsPlayer, PlayerBase>::onPlayResult(SockException(Err_success, "play http-ts success"));
}
void TsPlayerImp::onPlayResult(const SockException &ex) {
WarnL << ex.getErrCode() << " " << ex.what();
if (ex) {
PlayerImp<TsPlayer, PlayerBase>::onPlayResult(ex);
} else {
auto demuxer = std::make_shared<TsDemuxer>();
auto demuxer = std::make_shared<HlsDemuxer>();
demuxer->start(getPoller(), this);
_demuxer = std::move(demuxer);
}
@ -126,7 +45,7 @@ void TsPlayerImp::onShutdown(const SockException &ex) {
}
vector <Track::Ptr> TsPlayerImp::getTracks(bool ready) const {
return static_pointer_cast<TsDemuxer>(_demuxer)->getTracks(ready);
return static_pointer_cast<HlsDemuxer>(_demuxer)->getTracks(ready);
}
}//namespace mediakit

View File

@ -118,10 +118,8 @@ protected:
* http回复头
* @param status :200 OK
* @param headers http头
* @return content的长度-1:content>=0:content
* http头中带有Content-Length字段时
*/
ssize_t onResponseHeader(const string &status,const HttpHeader &headers) override {
void onResponseHeader(const string &status,const HttpHeader &headers) override {
if(status == "101"){
auto Sec_WebSocket_Accept = encodeBase64(SHA1::encode_bin(_Sec_WebSocket_Key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"));
if(Sec_WebSocket_Accept == const_cast<HttpHeader &>(headers)["Sec-WebSocket-Accept"]){
@ -129,26 +127,24 @@ protected:
onWebSocketException(SockException());
//防止ws服务器返回Content-Length
const_cast<HttpHeader &>(headers).erase("Content-Length");
//后续全是websocket负载数据
return -1;
return;
}
shutdown(SockException(Err_shutdown, StrPrinter << "Sec-WebSocket-Accept mismatch"));
return 0;
return;
}
shutdown(SockException(Err_shutdown,StrPrinter << "bad http status code:" << status));
return 0;
};
/**
* http回复完毕,
*/
void onResponseCompleted() override {}
void onResponseCompleted(const SockException &ex) override {}
/**
* websocket负载数据
*/
void onResponseBody(const char *buf,size_t size,size_t recvedSize,size_t totalSize) override{
void onResponseBody(const char *buf,size_t size) override{
if(_onRecv){
//完成websocket握手后拦截websocket数据并解析
_onRecv(buf, size);

View File

@ -49,15 +49,15 @@ int main(int argc, char *argv[]) {
for (auto &url : urlList) {
//创建下载器
HttpDownloader::Ptr downloader(new HttpDownloader());
downloader->setOnResult([](ErrCode code, const string &errMsg, const string &filePath) {
downloader->setOnResult([](const SockException &ex, const string &filePath) {
DebugL << "=====================HttpDownloader result=======================";
//下载结果回调
if (code == Err_success) {
if (!ex) {
//文件下载成功
InfoL << "download file success:" << filePath;
} else {
//下载失败
WarnL << "code:" << code << " msg:" << errMsg;
WarnL << "code:" << ex.getErrCode() << " msg:" << ex.what();
}
});
//断点续传功能,开启后可能会遇到416的错误因为上次文件已经下载完全