diff --git a/api/source/mk_httpclient.cpp b/api/source/mk_httpclient.cpp index 84027c16..e246335b 100755 --- a/api/source/mk_httpclient.cpp +++ b/api/source/mk_httpclient.cpp @@ -32,9 +32,9 @@ API_EXPORT void API_CALL mk_http_downloader_release(mk_http_downloader ctx) { API_EXPORT void API_CALL mk_http_downloader_start(mk_http_downloader ctx, const char *url, const char *file, on_mk_download_complete cb, void *user_data) { assert(ctx && url && file); HttpDownloader::Ptr *obj = (HttpDownloader::Ptr *) ctx; - (*obj)->setOnResult([cb, user_data](ErrCode code, const string &errMsg, const string &filePath) { + (*obj)->setOnResult([cb, user_data](const SockException &ex, const string &filePath) { if (cb) { - cb(user_data, code, errMsg.data(), filePath.data()); + cb(user_data, ex.getErrCode(), ex.what(), filePath.data()); } }); (*obj)->startDownload(url, file, false); diff --git a/player/test_player.cpp b/player/test_player.cpp index 704e7237..bd0c492b 100644 --- a/player/test_player.cpp +++ b/player/test_player.cpp @@ -112,6 +112,10 @@ int main(int argc, char *argv[]) { } }); + player->setOnShutdown([](const SockException &ex){ + WarnL << "play shutdown: " << ex.what(); + }); + (*player)[kRtpType] = atoi(argv[2]); //不等待track ready再回调播放成功事件,这样可以加快秒开速度 (*player)[Client::kWaitTrackReady] = false; diff --git a/src/Http/HlsPlayer.cpp b/src/Http/HlsPlayer.cpp index d3667a91..f17b6f16 100644 --- a/src/Http/HlsPlayer.cpp +++ b/src/Http/HlsPlayer.cpp @@ -17,30 +17,31 @@ HlsPlayer::HlsPlayer(const EventPoller::Ptr &poller) { setPoller(poller ? poller : EventPollerPool::Instance().getPoller()); } -HlsPlayer::~HlsPlayer() {} - -void HlsPlayer::play(const string &strUrl) { - _m3u8_list.emplace_back(strUrl); - play_l(); +void HlsPlayer::play(const string &url) { + _play_result = false; + _play_url = url; + fetchIndexFile(); } -void HlsPlayer::play_l() { - if (_m3u8_list.empty()) { - teardown_l(SockException(Err_shutdown, "所有hls url都尝试播放失败!")); - return; - } +void HlsPlayer::fetchIndexFile() { if (waitResponse()) { return; } - setMethod("GET"); if (!(*this)[kNetAdapter].empty()) { setNetAdapter((*this)[kNetAdapter]); } setCompleteTimeout((*this)[Client::kTimeoutMS].as()); - sendRequest(_m3u8_list.back()); + setMethod("GET"); + sendRequest(_play_url); } void HlsPlayer::teardown_l(const SockException &ex) { + if (!_play_result) { + _play_result = true; + onPlayResult(ex); + } else { + onShutdown(ex); + } _timer.reset(); _timer_ts.reset(); _http_ts_player.reset(); @@ -51,11 +52,11 @@ void HlsPlayer::teardown() { teardown_l(SockException(Err_shutdown, "teardown")); } -void HlsPlayer::playNextTs() { +void HlsPlayer::fetchSegment() { if (_ts_list.empty()) { //播放列表为空,那么立即重新下载m3u8文件 _timer.reset(); - play_l(); + fetchIndexFile(); return; } if (_http_ts_player && _http_ts_player->waitResponse()) { @@ -110,7 +111,7 @@ void HlsPlayer::playNextTs() { strong_self->_timer_ts.reset(new Timer(delay / 1000.0f, [weak_self]() { auto strong_self = weak_self.lock(); if (strong_self) { - strong_self->playNextTs(); + strong_self->fetchSegment(); } return false; }, strong_self->getPoller())); @@ -143,15 +144,13 @@ void HlsPlayer::onParsed(bool is_m3u8_inner, int64_t sequence, const map weak_self = dynamic_pointer_cast(shared_from_this()); auto url = ts_map.rbegin()->second.url; getPoller()->async([weak_self, url]() { @@ -163,42 +162,36 @@ void HlsPlayer::onParsed(bool is_m3u8_inner, int64_t sequence, const map(headers)["Content-Type"]; - _is_m3u8 = (content_type.find("application/vnd.apple.mpegurl") == 0); - if(!_is_m3u8) { - auto it = headers.find("Content-Length"); - //如果没有长度或者长度小于等于0, 那么肯定不是m3u8 - if (it == headers.end() || atoll(it->second.data()) <=0) { - teardown_l(SockException(Err_shutdown, "可能不是m3u8文件")); - } + auto content_type = const_cast(headers)["Content-Type"]; + if (content_type.find("application/vnd.apple.mpegurl") != 0) { + throw invalid_argument("content type not m3u8: " + content_type); } - return -1; + _m3u8.clear(); } -void HlsPlayer::onResponseBody(const char *buf, size_t size, size_t recvedSize, size_t totalSize) { - if (recvedSize == size) { - //刚开始 - _m3u8.clear(); - } +void HlsPlayer::onResponseBody(const char *buf, size_t size) { _m3u8.append(buf, size); } -void HlsPlayer::onResponseCompleted() { - if (HlsParser::parse(getUrl(), _m3u8)) { - if (_first) { - _first = false; - onPlayResult(SockException(Err_success, "play success")); - } - playDelay(); - } else { - teardown_l(SockException(Err_shutdown, "解析m3u8文件失败")); +void HlsPlayer::onResponseCompleted(const SockException &ex) { + if (ex) { + teardown_l(ex); + return; } + if (!HlsParser::parse(getUrl(), _m3u8)) { + teardown_l(SockException(Err_other, "parse m3u8 failed:" + _m3u8)); + return; + } + if (!_play_result) { + _play_result = true; + onPlayResult(SockException()); + } + playDelay(); } float HlsPlayer::delaySecond() { @@ -224,35 +217,8 @@ float HlsPlayer::delaySecond() { return 1.0f; } -void HlsPlayer::onDisconnect(const SockException &ex) { - if (_first) { - //第一次失败,则播放失败 - _first = false; - onPlayResult(ex); - return; - } - - //主动shutdown - if (ex.getErrCode() == Err_shutdown) { - if (_m3u8_list.size() <= 1) { - //全部url都播放失败 - _timer = nullptr; - _timer_ts = nullptr; - onShutdown(ex); - } else { - _m3u8_list.pop_back(); - //还有上一级url可以重试播放 - play_l(); - } - return; - } - - //eof等,之后播放失败,那么重试播放m3u8 - playDelay(); -} - bool HlsPlayer::onRedirectUrl(const string &url, bool temporary) { - _m3u8_list.emplace_back(url); + _play_url = url; return true; } @@ -261,7 +227,7 @@ void HlsPlayer::playDelay() { _timer.reset(new Timer(delaySecond(), [weak_self]() { auto strong_self = weak_self.lock(); if (strong_self) { - strong_self->play_l(); + strong_self->fetchIndexFile(); } return false; }, getPoller())); @@ -279,46 +245,6 @@ void HlsPlayer::onPacket_l(const char *data, size_t len) { ////////////////////////////////////////////////////////////////////////// -class HlsDemuxer : public MediaSinkInterface, public TrackSource, public std::enable_shared_from_this { -public: - HlsDemuxer() = default; - ~HlsDemuxer() override { _timer = nullptr; } - - void start(const EventPoller::Ptr &poller, TrackListener *listener); - - bool inputFrame(const Frame::Ptr &frame) override; - - bool addTrack(const Track::Ptr &track) override { - return _delegate.addTrack(track); - } - - void addTrackCompleted() override { - _delegate.addTrackCompleted(); - } - - void resetTracks() override { - ((MediaSink &) _delegate).resetTracks(); - } - - vector getTracks(bool ready = true) const override { - return _delegate.getTracks(ready); - } - -private: - void onTick(); - int64_t getBufferMS(); - int64_t getPlayPosition(); - void setPlayPosition(int64_t pos); - -private: - int64_t _ticker_offset = 0; - Ticker _ticker; - Stamp _stamp[2]; - Timer::Ptr _timer; - MediaSinkDelegate _delegate; - multimap _frame_cache; -}; - void HlsDemuxer::start(const EventPoller::Ptr &poller, TrackListener *listener) { _frame_cache.clear(); _stamp[TrackAudio].setRelativeStamp(0); @@ -406,7 +332,7 @@ void HlsDemuxer::onTick() { HlsPlayerImp::HlsPlayerImp(const EventPoller::Ptr &poller) : PlayerImp(poller) {} void HlsPlayerImp::onPacket(const char *data, size_t len) { - if (!_decoder) { + if (!_decoder && _demuxer) { _decoder = DecoderImp::createDecoder(DecoderImp::decoder_ts, _demuxer.get()); } @@ -430,7 +356,6 @@ void HlsPlayerImp::onPlayResult(const SockException &ex) { } void HlsPlayerImp::onShutdown(const SockException &ex) { - WarnL << ex.what(); PlayerImp::onShutdown(ex); _demuxer = nullptr; } diff --git a/src/Http/HlsPlayer.h b/src/Http/HlsPlayer.h index 4da2be44..fb45cd9b 100644 --- a/src/Http/HlsPlayer.h +++ b/src/Http/HlsPlayer.h @@ -25,16 +25,45 @@ using namespace toolkit; namespace mediakit { +class HlsDemuxer + : public MediaSinkInterface + , public TrackSource + , public std::enable_shared_from_this { +public: + HlsDemuxer() = default; + ~HlsDemuxer() override { _timer = nullptr; } + + void start(const EventPoller::Ptr &poller, TrackListener *listener); + bool inputFrame(const Frame::Ptr &frame) override; + bool addTrack(const Track::Ptr &track) override { return _delegate.addTrack(track); } + void addTrackCompleted() override { _delegate.addTrackCompleted(); } + void resetTracks() override { ((MediaSink &)_delegate).resetTracks(); } + vector getTracks(bool ready = true) const override { return _delegate.getTracks(ready); } + +private: + void onTick(); + int64_t getBufferMS(); + int64_t getPlayPosition(); + void setPlayPosition(int64_t pos); + +private: + int64_t _ticker_offset = 0; + Ticker _ticker; + Stamp _stamp[2]; + Timer::Ptr _timer; + MediaSinkDelegate _delegate; + multimap _frame_cache; +}; + class HlsPlayer : public HttpClientImp , public PlayerBase , public HlsParser{ public: HlsPlayer(const EventPoller::Ptr &poller); - ~HlsPlayer() override; + ~HlsPlayer() override = default; /** * 开始播放 - * @param strUrl */ - void play(const string &strUrl) override; + void play(const string &url) override; /** * 停止播放 @@ -50,55 +79,18 @@ protected: virtual void onPacket(const char *data, size_t len) = 0; private: - /** - * 解析m3u8成功 - * @param is_m3u8_inner 是否为m3u8列表 - * @param sequence ts列表seq - * @param ts_map ts列表或m3u8列表 - */ void onParsed(bool is_m3u8_inner,int64_t sequence,const map &ts_map) override; - /** - * 收到http回复头 - * @param status 状态码,譬如:200 OK - * @param headers http头 - * @return 返回后续content的长度;-1:后续数据全是content;>=0:固定长度content - * 需要指出的是,在http头中带有Content-Length字段时,该返回值无效 - */ - ssize_t onResponseHeader(const string &status,const HttpHeader &headers) override; - /** - * 收到http conten数据 - * @param buf 数据指针 - * @param size 数据大小 - * @param recvedSize 已收数据大小(包含本次数据大小),当其等于totalSize时将触发onResponseCompleted回调 - * @param totalSize 总数据大小 - */ - void onResponseBody(const char *buf,size_t size,size_t recvedSize, size_t totalSize) override; - - /** - * 接收http回复完毕, - */ - void onResponseCompleted() override; - - /** - * http链接断开回调 - * @param ex 断开原因 - */ - void onDisconnect(const SockException &ex) override; - - /** - * 重定向事件 - * @param url 重定向url - * @param temporary 是否为临时重定向 - * @return 是否继续 - */ + void onResponseHeader(const string &status,const HttpHeader &headers) override; + void onResponseBody(const char *buf,size_t size) override; + void onResponseCompleted(const SockException &e) override; bool onRedirectUrl(const string &url,bool temporary) override; private: void playDelay(); float delaySecond(); - void playNextTs(); + void fetchSegment(); void teardown_l(const SockException &ex); - void play_l(); + void fetchIndexFile(); void onPacket_l(const char *data, size_t len); private: @@ -110,15 +102,14 @@ private: }; private: - bool _is_m3u8 = false; - bool _first = true; + bool _play_result = false; int64_t _last_sequence = -1; string _m3u8; + string _play_url; Timer::Ptr _timer; Timer::Ptr _timer_ts; list _ts_list; list _ts_url_sort; - list _m3u8_list; set _ts_url_cache; HttpTSPlayer::Ptr _http_ts_player; TSSegment _segment; diff --git a/src/Http/HttpClient.cpp b/src/Http/HttpClient.cpp index 12e51357..775cef8a 100644 --- a/src/Http/HttpClient.cpp +++ b/src/Http/HttpClient.cpp @@ -141,6 +141,14 @@ const Parser &HttpClient::response() const { return _parser; } +ssize_t HttpClient::responseBodyTotalSize() const { + return _total_body_size; +} + +size_t HttpClient::responseBodySize() const { + return _recved_body_size; +} + const string &HttpClient::getUrl() const { return _url; } @@ -151,7 +159,7 @@ void HttpClient::onConnect(const SockException &ex) { void HttpClient::onConnect_l(const SockException &ex) { if (ex) { - onDisconnect(ex); + onResponseCompleted_l(ex); return; } @@ -173,12 +181,7 @@ void HttpClient::onRecv(const Buffer::Ptr &pBuf) { } void HttpClient::onErr(const SockException &ex) { - if (ex.getErrCode() == Err_eof && _total_body_size < 0) { - //如果Content-Length未指定 但服务器断开链接 - //则认为本次http请求完成 - onResponseCompleted_l(); - } - onDisconnect(ex); + onResponseCompleted_l(ex); } ssize_t HttpClient::onRecvHeader(const char *data, size_t len) { @@ -186,42 +189,44 @@ ssize_t HttpClient::onRecvHeader(const char *data, size_t len) { if (_parser.Url() == "302" || _parser.Url() == "301") { auto new_url = _parser["Location"]; if (new_url.empty()) { - shutdown(SockException(Err_shutdown, "未找到Location字段(跳转url)")); - return 0; + throw invalid_argument("未找到Location字段(跳转url)"); } if (onRedirectUrl(new_url, _parser.Url() == "302")) { - setMethod("GET"); HttpClient::sendRequest(new_url); return 0; } } checkCookie(_parser.getHeader()); + onResponseHeader(_parser.Url(), _parser.getHeader()); _header_recved = true; - _total_body_size = onResponseHeader(_parser.Url(), _parser.getHeader()); - - if (!_parser["Content-Length"].empty()) { - //有Content-Length字段时忽略onResponseHeader的返回值 - _total_body_size = atoll(_parser["Content-Length"].data()); - } if (_parser["Transfer-Encoding"] == "chunked") { //如果Transfer-Encoding字段等于chunked,则认为后续的content是不限制长度的 _total_body_size = -1; _chunked_splitter = std::make_shared([this](const char *data, size_t len) { if (len > 0) { - auto recved_body_size = _recved_body_size + len; - onResponseBody(data, len, recved_body_size, SIZE_MAX); - _recved_body_size = recved_body_size; + _recved_body_size += len; + onResponseBody(data, len); } else { - onResponseCompleted_l(); + _total_body_size = _recved_body_size; + onResponseCompleted_l(SockException(Err_success, "success")); } }); + //后续为源源不断的body + return -1; + } + + if (!_parser["Content-Length"].empty()) { + //有Content-Length字段时忽略onResponseHeader的返回值 + _total_body_size = atoll(_parser["Content-Length"].data()); + } else { + _total_body_size = -1; } if (_total_body_size == 0) { //后续没content,本次http请求结束 - onResponseCompleted_l(); + onResponseCompleted_l(SockException(Err_success, "success")); return 0; } @@ -238,30 +243,30 @@ void HttpClient::onRecvContent(const char *data, size_t len) { _chunked_splitter->input(data, len); return; } - auto recved_body_size = _recved_body_size + len; + _recved_body_size += len; if (_total_body_size < 0) { - //不限长度的content,最大支持SIZE_MAX个字节 - onResponseBody(data, len, recved_body_size, SIZE_MAX); - _recved_body_size = recved_body_size; + //不限长度的content + onResponseBody(data, len); return; } //固定长度的content - if (recved_body_size < (size_t) _total_body_size) { + if (_recved_body_size < (size_t) _total_body_size) { //content还未接收完毕 - onResponseBody(data, len, recved_body_size, _total_body_size); - _recved_body_size = recved_body_size; + onResponseBody(data, len); return; } - //content接收完毕 - onResponseBody(data, _total_body_size - _recved_body_size, _total_body_size, _total_body_size); - bool bigger_than_expected = recved_body_size > (size_t) _total_body_size; - onResponseCompleted_l(); - if (bigger_than_expected) { - //声明的content数据比真实的小,那么我们只截取前面部分的并断开链接 - shutdown(SockException(Err_shutdown, "http response content size bigger than expected")); + if (_recved_body_size == (size_t)_total_body_size) { + //content接收完毕 + onResponseBody(data, len); + onResponseCompleted_l(SockException(Err_success, "success")); + return; } + + //声明的content数据比真实的小,断开链接 + onResponseBody(data, len); + throw invalid_argument("http response content size bigger than expected"); } void HttpClient::onFlush() { @@ -308,10 +313,34 @@ void HttpClient::onManager() { } } -void HttpClient::onResponseCompleted_l() { +void HttpClient::onResponseCompleted_l(const SockException &ex) { + if (_complete) { + return; + } _complete = true; _wait_complete.resetTime(); - onResponseCompleted(); + + if (!ex) { + //确认无疑的成功 + onResponseCompleted(ex); + return; + } + //可疑的失败 + + if (_total_body_size > 0 && _recved_body_size >= _total_body_size) { + //回复header中有content-length信息,那么收到的body大于等于声明值则认为成功 + onResponseCompleted(SockException(Err_success, "success")); + return; + } + + if (_total_body_size == -1 && _recved_body_size > 0) { + //回复header中无content-length信息,那么收到一点body也认为成功 + onResponseCompleted(SockException(Err_success, ex.what())); + return; + } + + //确认无疑的失败 + onResponseCompleted(ex); } bool HttpClient::waitResponse() const { diff --git a/src/Http/HttpClient.h b/src/Http/HttpClient.h index 6b215e8d..848b12a0 100644 --- a/src/Http/HttpClient.h +++ b/src/Http/HttpClient.h @@ -100,6 +100,16 @@ public: */ const Parser &response() const; + /** + * 获取回复header声明的body大小 + */ + ssize_t responseBodyTotalSize() const; + + /** + * 获取已经下载body的大小 + */ + size_t responseBodySize() const; + /** * 获取请求url */ @@ -139,37 +149,20 @@ protected: * 收到http回复头 * @param status 状态码,譬如:200 OK * @param headers http头 - * @return 返回后续content的长度;-1:后续数据全是content;>=0:固定长度content - * 需要指出的是,在http头中带有Content-Length字段时,该返回值无效 */ - virtual ssize_t onResponseHeader(const string &status, const HttpHeader &headers) { - //无Content-Length字段时默认后面全是content - return -1; - } + virtual void onResponseHeader(const string &status, const HttpHeader &headers) = 0; /** * 收到http conten数据 * @param buf 数据指针 * @param size 数据大小 - * @param recvedSize 已收数据大小(包含本次数据大小),当其等于totalSize时将触发onResponseCompleted回调 - * @param totalSize 总数据大小 */ - virtual void onResponseBody(const char *buf, size_t size, size_t recvedSize, size_t totalSize) { - DebugL << size << " " << recvedSize << " " << totalSize; - } + virtual void onResponseBody(const char *buf, size_t size) = 0; /** * 接收http回复完毕, */ - virtual void onResponseCompleted() { - DebugL; - } - - /** - * http链接断开回调 - * @param ex 断开原因 - */ - virtual void onDisconnect(const SockException &ex) {} + virtual void onResponseCompleted(const SockException &ex) = 0; /** * 重定向事件 @@ -179,11 +172,11 @@ protected: */ virtual bool onRedirectUrl(const string &url, bool temporary) { return true; }; +protected: //// HttpRequestSplitter override //// ssize_t onRecvHeader(const char *data, size_t len) override; void onRecvContent(const char *data, size_t len) override; -protected: //// TcpClient override //// void onConnect(const SockException &ex) override; void onRecv(const Buffer::Ptr &pBuf) override; @@ -192,7 +185,7 @@ protected: void onManager() override; private: - void onResponseCompleted_l(); + void onResponseCompleted_l(const SockException &ex); void onConnect_l(const SockException &ex); void checkCookie(HttpHeader &headers); void clearResponse(); diff --git a/src/Http/HttpClientImp.cpp b/src/Http/HttpClientImp.cpp index 5cb548b0..da806d38 100644 --- a/src/Http/HttpClientImp.cpp +++ b/src/Http/HttpClientImp.cpp @@ -13,12 +13,11 @@ namespace mediakit { void HttpClientImp::onConnect(const SockException &ex) { - if(!isHttps()){ + if (!isHttps()) { HttpClient::onConnect(ex); } else { TcpClientWithSSL::onConnect(ex); } - } } /* namespace mediakit */ diff --git a/src/Http/HttpClientImp.h b/src/Http/HttpClientImp.h index 4d59ce8f..1123cbf9 100644 --- a/src/Http/HttpClientImp.h +++ b/src/Http/HttpClientImp.h @@ -16,13 +16,14 @@ using namespace toolkit; namespace mediakit { -class HttpClientImp: public TcpClientWithSSL { +class HttpClientImp : public TcpClientWithSSL { public: - typedef std::shared_ptr Ptr; - HttpClientImp() {} - virtual ~HttpClientImp() {} + using Ptr = std::shared_ptr; + HttpClientImp() = default; + ~HttpClientImp() override = default; + protected: - void onConnect(const SockException &ex) override ; + void onConnect(const SockException &ex) override; }; } /* namespace mediakit */ diff --git a/src/Http/HttpDownloader.cpp b/src/Http/HttpDownloader.cpp index a8714dd7..c87e9a8a 100644 --- a/src/Http/HttpDownloader.cpp +++ b/src/Http/HttpDownloader.cpp @@ -15,78 +15,59 @@ using namespace toolkit; namespace mediakit { -HttpDownloader::HttpDownloader() {} - HttpDownloader::~HttpDownloader() { closeFile(); } -void HttpDownloader::startDownload(const string &url, const string &filePath, bool bAppend) { - _filePath = filePath; - if (_filePath.empty()) { - _filePath = exeDir() + "HttpDownloader/" + MD5(url).hexdigest(); +void HttpDownloader::startDownload(const string &url, const string &file_path, bool append) { + _file_path = file_path; + if (_file_path.empty()) { + _file_path = exeDir() + "HttpDownloader/" + MD5(url).hexdigest(); } - _saveFile = File::create_file(_filePath.data(), bAppend ? "ab" : "wb"); - if (!_saveFile) { - auto strErr = StrPrinter << "打开文件失败:" << filePath << endl; + _save_file = File::create_file(_file_path.data(), append ? "ab" : "wb"); + if (!_save_file) { + auto strErr = StrPrinter << "打开文件失败:" << file_path << endl; throw std::runtime_error(strErr); } - _bDownloadSuccess = false; - if (bAppend) { - auto currentLen = ftell(_saveFile); + if (append) { + auto currentLen = ftell(_save_file); if (currentLen) { //最少续传一个字节,怕遇到http 416的错误 currentLen -= 1; - fseek(_saveFile, -1, SEEK_CUR); + fseek(_save_file, -1, SEEK_CUR); } addHeader("Range", StrPrinter << "bytes=" << currentLen << "-" << endl); } setMethod("GET"); - sendRequest(url); } -ssize_t HttpDownloader::onResponseHeader(const string &status, const HttpHeader &headers) { +void HttpDownloader::onResponseHeader(const string &status, const HttpHeader &headers) { if (status != "200" && status != "206") { //失败 - shutdown(SockException(Err_shutdown, StrPrinter << "Http Status:" << status)); - } - //后续全部是content - return -1; -} - -void HttpDownloader::onResponseBody(const char *buf, size_t size, size_t recvedSize, size_t totalSize) { - if (_saveFile) { - fwrite(buf, size, 1, _saveFile); + throw std::invalid_argument("bad http status: " + status); } } -void HttpDownloader::onResponseCompleted() { +void HttpDownloader::onResponseBody(const char *buf, size_t size) { + if (_save_file) { + fwrite(buf, size, 1, _save_file); + } +} + +void HttpDownloader::onResponseCompleted(const SockException &ex) { closeFile(); - // InfoL << "md5Sum:" << getMd5Sum(_filePath); - _bDownloadSuccess = true; - if (_onResult) { - _onResult(Err_success, "success", _filePath); - _onResult = nullptr; - } -} - -void HttpDownloader::onDisconnect(const SockException &ex) { - closeFile(); - if (!_bDownloadSuccess) { - File::delete_file(_filePath.data()); - } - if (_onResult) { - _onResult(ex.getErrCode(), ex.what(), _filePath); - _onResult = nullptr; + if (_on_result) { + _on_result(ex, _file_path); + _on_result = nullptr; } } void HttpDownloader::closeFile() { - if (_saveFile) { - fflush(_saveFile); - fclose(_saveFile); - _saveFile = nullptr; + if (_save_file) { + fflush(_save_file); + fclose(_save_file); + _save_file = nullptr; } } diff --git a/src/Http/HttpDownloader.h b/src/Http/HttpDownloader.h index e0af8fca..a1e26dc6 100644 --- a/src/Http/HttpDownloader.h +++ b/src/Http/HttpDownloader.h @@ -15,34 +15,41 @@ namespace mediakit { -class HttpDownloader: public HttpClientImp { +class HttpDownloader : public HttpClientImp { public: - typedef std::shared_ptr Ptr; - typedef std::function onDownloadResult; - HttpDownloader(); - virtual ~HttpDownloader(); - //开始下载文件,默认断点续传方式下载 - void startDownload(const string &url, const string &filePath = "", bool bAppend = false); + using Ptr = std::shared_ptr; + using onDownloadResult = std::function; + + HttpDownloader() = default; + ~HttpDownloader() override; + + /** + * 开始下载文件,默认断点续传方式下载 + * @param url 下载http url + * @param file_path 文件保存地址,置空则选择默认文件路径 + * @param append 如果文件已经存在,是否断点续传方式下载 + */ + void startDownload(const string &url, const string &file_path = "", bool append = false); + void startDownload(const string &url, const onDownloadResult &cb) { setOnResult(cb); startDownload(url, "", false); } - void setOnResult(const onDownloadResult &cb) { _onResult = cb; } + + void setOnResult(const onDownloadResult &cb) { _on_result = cb; } protected: - void onResponseBody(const char *buf, size_t size, size_t recvedSize, size_t totalSize) override; - ssize_t onResponseHeader(const string &status, const HttpHeader &headers) override; - void onResponseCompleted() override; - void onDisconnect(const SockException &ex) override; + void onResponseBody(const char *buf, size_t size) override; + void onResponseHeader(const string &status, const HttpHeader &headers) override; + void onResponseCompleted(const SockException &ex) override; private: void closeFile(); private: - FILE *_saveFile = nullptr; - string _filePath; - onDownloadResult _onResult; - bool _bDownloadSuccess = false; + FILE *_save_file = nullptr; + string _file_path; + onDownloadResult _on_result; }; } /* namespace mediakit */ diff --git a/src/Http/HttpRequester.cpp b/src/Http/HttpRequester.cpp index aa346da3..284252e4 100644 --- a/src/Http/HttpRequester.cpp +++ b/src/Http/HttpRequester.cpp @@ -12,46 +12,36 @@ namespace mediakit { -ssize_t HttpRequester::onResponseHeader(const string &status, const HttpHeader &headers) { - _strRecvBody.clear(); - return HttpClientImp::onResponseHeader(status, headers); +void HttpRequester::onResponseHeader(const string &status, const HttpHeader &headers) { + _res_body.clear(); } -void HttpRequester::onResponseBody(const char *buf, size_t size, size_t recvedSize, size_t totalSize) { - _strRecvBody.append(buf, size); +void HttpRequester::onResponseBody(const char *buf, size_t size) { + _res_body.append(buf, size); } -void HttpRequester::onResponseCompleted() { - const_cast (response()).setContent(std::move(_strRecvBody)); - if (_onResult) { - _onResult(SockException(), response()); - _onResult = nullptr; +void HttpRequester::onResponseCompleted(const SockException &ex) { + const_cast(response()).setContent(std::move(_res_body)); + if (_on_result) { + _on_result(ex, response()); + _on_result = nullptr; } } -void HttpRequester::onDisconnect(const SockException &ex) { - const_cast (response()).setContent(std::move(_strRecvBody)); - if (_onResult) { - _onResult(ex, response()); - _onResult = nullptr; - } -} - -void HttpRequester::startRequester(const string &url, const HttpRequesterResult &onResult, float timeOutSecond) { - _onResult = onResult; - setCompleteTimeout(timeOutSecond * 1000); +void HttpRequester::startRequester(const string &url, const HttpRequesterResult &on_result, float timeout_sec) { + _on_result = on_result; + setCompleteTimeout(timeout_sec * 1000); sendRequest(url); } void HttpRequester::clear() { HttpClientImp::clear(); - _strRecvBody.clear(); - _onResult = nullptr; + _res_body.clear(); + _on_result = nullptr; } void HttpRequester::setOnResult(const HttpRequesterResult &onResult) { - _onResult = onResult; + _on_result = onResult; } - -}//namespace mediakit +} // namespace mediakit diff --git a/src/Http/HttpRequester.h b/src/Http/HttpRequester.h index dc602f5d..c314b3f3 100644 --- a/src/Http/HttpRequester.h +++ b/src/Http/HttpRequester.h @@ -24,18 +24,17 @@ public: ~HttpRequester() override = default; void setOnResult(const HttpRequesterResult &onResult); - void startRequester(const string &url, const HttpRequesterResult &onResult, float timeOutSecond = 10); + void startRequester(const string &url, const HttpRequesterResult &on_result, float timeout_sec = 10); void clear() override; private: - ssize_t onResponseHeader(const string &status, const HttpHeader &headers) override; - void onResponseBody(const char *buf, size_t size, size_t recvedSize, size_t totalSize) override; - void onResponseCompleted() override; - void onDisconnect(const SockException &ex) override; + void onResponseHeader(const string &status, const HttpHeader &headers) override; + void onResponseBody(const char *buf, size_t size) override; + void onResponseCompleted(const SockException &ex) override; private: - string _strRecvBody; - HttpRequesterResult _onResult; + string _res_body; + HttpRequesterResult _on_result; }; }//namespace mediakit diff --git a/src/Http/HttpTSPlayer.cpp b/src/Http/HttpTSPlayer.cpp index 58c66c30..e3a4b8ac 100644 --- a/src/Http/HttpTSPlayer.cpp +++ b/src/Http/HttpTSPlayer.cpp @@ -18,42 +18,25 @@ HttpTSPlayer::HttpTSPlayer(const EventPoller::Ptr &poller, bool split_ts) { setPoller(poller ? poller : EventPollerPool::Instance().getPoller()); } -ssize_t HttpTSPlayer::onResponseHeader(const string &status, const HttpClient::HttpHeader &header) { - _status = status; +void HttpTSPlayer::onResponseHeader(const string &status, const HttpClient::HttpHeader &header) { if (status != "200" && status != "206") { - //http状态码不符合预期 - shutdown(SockException(Err_other, StrPrinter << "bad http status code:" + status)); - return 0; - } - auto content_type = const_cast< HttpClient::HttpHeader &>(header)["Content-Type"]; - if (content_type.find("video/mp2t") == 0 || content_type.find("video/mpeg") == 0) { - _is_ts_content = true; + // http状态码不符合预期 + throw invalid_argument("bad http status code:" + status); } - //后续是不定长content - return -1; + auto content_type = const_cast(header)["Content-Type"]; + if (content_type.find("video/mp2t") != 0 && content_type.find("video/mpeg") != 0) { + throw invalid_argument("content type not mpeg-ts: " + content_type); + } } -void HttpTSPlayer::onResponseBody(const char *buf, size_t size, size_t recved_size, size_t total_size) { - if (_status != "200" && _status != "206") { - return; - } - if (recved_size == size) { - //开始接收数据 - if (buf[0] == TS_SYNC_BYTE) { - //这是ts头 - _is_first_packet_ts = true; - } else { - WarnL << "可能不是http-ts流"; - } - } - +void HttpTSPlayer::onResponseBody(const char *buf, size_t size) { if (_split_ts) { try { _segment.input(buf, size); } catch (std::exception &ex) { WarnL << ex.what(); - //ts解析失败,清空缓存数据 + // ts解析失败,清空缓存数据 _segment.reset(); throw; } @@ -62,11 +45,7 @@ void HttpTSPlayer::onResponseBody(const char *buf, size_t size, size_t recved_si } } -void HttpTSPlayer::onResponseCompleted() { - emitOnComplete(SockException(Err_success, "play completed")); -} - -void HttpTSPlayer::onDisconnect(const SockException &ex) { +void HttpTSPlayer::onResponseCompleted(const SockException &ex) { emitOnComplete(ex); } @@ -91,4 +70,4 @@ void HttpTSPlayer::setOnPacket(TSSegment::onSegment cb) { _on_segment = std::move(cb); } -}//namespace mediakit +} // namespace mediakit diff --git a/src/Http/HttpTSPlayer.h b/src/Http/HttpTSPlayer.h index 2d42f5df..ad7fd9ff 100644 --- a/src/Http/HttpTSPlayer.h +++ b/src/Http/HttpTSPlayer.h @@ -40,10 +40,9 @@ public: protected: ///HttpClient override/// - ssize_t onResponseHeader(const string &status, const HttpHeader &header) override; - void onResponseBody(const char *buf, size_t size, size_t recved_size, size_t total_size) override; - void onResponseCompleted() override; - void onDisconnect(const SockException &ex) override; + void onResponseHeader(const string &status, const HttpHeader &header) override; + void onResponseBody(const char *buf, size_t size) override; + void onResponseCompleted(const SockException &ex) override; protected: /** @@ -55,13 +54,7 @@ private: void emitOnComplete(const SockException &ex); private: - //是否为mpegts负载 - bool _is_ts_content = false; - //第一个包是否为ts包 - bool _is_first_packet_ts = false; - //是否判断是否是ts并split bool _split_ts; - string _status; TSSegment _segment; onComplete _on_complete; TSSegment::onSegment _on_segment; diff --git a/src/Http/TsPlayer.cpp b/src/Http/TsPlayer.cpp index fb6037f6..b8a7a84a 100644 --- a/src/Http/TsPlayer.cpp +++ b/src/Http/TsPlayer.cpp @@ -14,56 +14,35 @@ namespace mediakit { TsPlayer::TsPlayer(const EventPoller::Ptr &poller) : HttpTSPlayer(poller, true) {} -void TsPlayer::play(const string &strUrl) { - _ts_url.append(strUrl); - playTs(); +void TsPlayer::play(const string &url) { + TraceL << "play http-ts: " << url; + _play_result = false; + setHeaderTimeout((*this)[Client::kTimeoutMS].as()); + setBodyTimeout((*this)[Client::kMediaTimeoutMS].as()); + setMethod("GET"); + sendRequest(url); } void TsPlayer::teardown() { shutdown(SockException(Err_shutdown, "teardown")); } -void TsPlayer::playTs() { - if (waitResponse()) { - //播放器目前还存活,正在下载中 - return; - } - TraceL << "play http-ts: " << _ts_url; - weak_ptr weak_self = dynamic_pointer_cast(shared_from_this()); - setMethod("GET"); - setHeaderTimeout((*this)[Client::kTimeoutMS].as()); - setBodyTimeout((*this)[Client::kMediaTimeoutMS].as()); - sendRequest(_ts_url); -} - -void TsPlayer::onResponseCompleted() { - //接收完毕 - shutdown(SockException(Err_success, StrPrinter << "play " << _ts_url << " completed")); -} - -void TsPlayer::onDisconnect(const SockException &ex) { - WarnL << "play " << _ts_url << " failed: " << ex.getErrCode() << " " << ex.what(); - if (_first) { - //第一次失败,则播放失败 - _first = false; +void TsPlayer::onResponseCompleted(const SockException &ex) { + if (!_play_result) { + _play_result = true; onPlayResult(ex); - return; - } - if (ex.getErrCode() == Err_shutdown) { - onShutdown(ex); } else { - onResponseCompleted(); onShutdown(ex); } + HttpTSPlayer::onResponseCompleted(ex); } -ssize_t TsPlayer::onResponseHeader(const string &status, const HttpClient::HttpHeader &header) { - ssize_t ret = HttpTSPlayer::onResponseHeader(status, header); - if (_first) { - _first = false; - onPlayResult(SockException(Err_success, "play success")); +void TsPlayer::onResponseBody(const char *buf, size_t size) { + if (!_play_result) { + _play_result = true; + onPlayResult(SockException(Err_success, "play http-ts success")); } - return ret; + HttpTSPlayer::onResponseBody(buf, size); } -}//namespace mediakit \ No newline at end of file +} // namespace mediakit \ No newline at end of file diff --git a/src/Http/TsPlayer.h b/src/Http/TsPlayer.h index 685b969b..a34a2d62 100644 --- a/src/Http/TsPlayer.h +++ b/src/Http/TsPlayer.h @@ -11,19 +11,19 @@ #ifndef HTTP_TSPLAYER_H #define HTTP_TSPLAYER_H -#include -#include "Util/util.h" -#include "Poller/Timer.h" #include "Http/HttpDownloader.h" +#include "HttpTSPlayer.h" #include "Player/MediaPlayer.h" +#include "Poller/Timer.h" #include "Rtp/Decoder.h" #include "Rtp/TSDecoder.h" -#include "HttpTSPlayer.h" +#include "Util/util.h" +#include using namespace toolkit; namespace mediakit { -class TsPlayer : public HttpTSPlayer, public PlayerBase { +class TsPlayer : public HttpTSPlayer , public PlayerBase { public: TsPlayer(const EventPoller::Ptr &poller); ~TsPlayer() override = default; @@ -38,18 +38,13 @@ public: */ void teardown() override; -private: - void playTs(); - protected: - virtual void onResponseCompleted() override; - virtual void onDisconnect(const SockException &ex) override; - virtual ssize_t onResponseHeader(const string &status, const HttpHeader &header) override; + void onResponseBody(const char *buf, size_t size) override; + void onResponseCompleted(const SockException &ex) override; private: - bool _first = true; - string _ts_url; + bool _play_result = true; }; -}//namespace mediakit -#endif //HTTP_TSPLAYER_H +} // namespace mediakit +#endif // HTTP_TSPLAYER_H diff --git a/src/Http/TsPlayerImp.h b/src/Http/TsPlayerImp.h index 402f32ee..fe0dd2a8 100644 --- a/src/Http/TsPlayerImp.h +++ b/src/Http/TsPlayerImp.h @@ -24,46 +24,6 @@ using namespace toolkit; namespace mediakit { -class TsDemuxer : public MediaSinkInterface, public TrackSource, public std::enable_shared_from_this { -public: - TsDemuxer() = default; - ~TsDemuxer() override { _timer = nullptr; } - - void start(const EventPoller::Ptr &poller, TrackListener *listener); - - bool inputFrame(const Frame::Ptr &frame) override; - - bool addTrack(const Track::Ptr &track) override { - return _delegate.addTrack(track); - } - - void addTrackCompleted() override { - _delegate.addTrackCompleted(); - } - - void resetTracks() override { - ((MediaSink &) _delegate).resetTracks(); - } - - vector getTracks(bool ready = true) const override { - return _delegate.getTracks(ready); - } - -private: - void onTick(); - int64_t getBufferMS(); - int64_t getPlayPosition(); - void setPlayPosition(int64_t pos); - -private: - int64_t _ticker_offset = 0; - Ticker _ticker; - Stamp _stamp[2]; - Timer::Ptr _timer; - MediaSinkDelegate _delegate; - multimap _frame_cache; -}; - class TsPlayerImp : public PlayerImp, private TrackListener { public: using Ptr = std::shared_ptr; diff --git a/src/Http/TsplayerImp.cpp b/src/Http/TsplayerImp.cpp index bcfd2f51..9bb263bb 100644 --- a/src/Http/TsplayerImp.cpp +++ b/src/Http/TsplayerImp.cpp @@ -9,94 +9,14 @@ */ #include "TsPlayerImp.h" +#include "HlsPlayer.h" namespace mediakit { -void TsDemuxer::start(const EventPoller::Ptr &poller, TrackListener *listener) { - _frame_cache.clear(); - _stamp[TrackAudio].setRelativeStamp(0); - _stamp[TrackVideo].setRelativeStamp(0); - _stamp[TrackAudio].syncTo(_stamp[TrackVideo]); - setPlayPosition(0); - - _delegate.setTrackListener(listener); - - //每50毫秒执行一次 - weak_ptr weak_self = shared_from_this(); - _timer = std::make_shared(0.05f, [weak_self]() { - auto strong_self = weak_self.lock(); - if (!strong_self) { - return false; - } - strong_self->onTick(); - return true; - }, poller); -} - -bool TsDemuxer::inputFrame(const Frame::Ptr &frame) { - //为了避免track准备时间过长, 因此在没准备好之前, 直接消费掉所有的帧 - if (!_delegate.isAllTrackReady()) { - _delegate.inputFrame(frame); - return true; - } - //计算相对时间戳 - int64_t dts, pts; - //根据时间戳缓存frame - _stamp[frame->getTrackType()].revise(frame->dts(), frame->pts(), dts, pts); - _frame_cache.emplace(dts, Frame::getCacheAbleFrame(frame)); - - if (getBufferMS() > 30 * 1000) { - //缓存超过30秒,强制消费至15秒(减少延时或内存占用) - while (getBufferMS() > 15 * 1000) { - _delegate.inputFrame(_frame_cache.begin()->second); - _frame_cache.erase(_frame_cache.begin()); - } - //接着播放缓存中最早的帧 - setPlayPosition(_frame_cache.begin()->first); - } - return true; -} - -int64_t TsDemuxer::getPlayPosition() { - return _ticker.elapsedTime() + _ticker_offset; -} - -int64_t TsDemuxer::getBufferMS() { - if (_frame_cache.empty()) { - return 0; - } - return _frame_cache.rbegin()->first - _frame_cache.begin()->first; -} - -void TsDemuxer::setPlayPosition(int64_t pos) { - _ticker.resetTime(); - _ticker_offset = pos; -} - -void TsDemuxer::onTick() { - auto it = _frame_cache.begin(); - while (it != _frame_cache.end()) { - if (it->first > getPlayPosition()) { - //这些帧还未到时间播放 - break; - } - if (getBufferMS() < 3 * 1000) { - //缓存小于3秒,那么降低定时器消费速度(让剩余的数据在3秒后消费完毕) - //目的是为了防止定时器长时间干等后,数据瞬间消费完毕 - setPlayPosition(_frame_cache.begin()->first); - } - //消费掉已经到期的帧 - _delegate.inputFrame(it->second); - it = _frame_cache.erase(it); - } -} - -////////////////////////////////////////////////////////////////////////// - TsPlayerImp::TsPlayerImp(const EventPoller::Ptr &poller) : PlayerImp(poller) {} void TsPlayerImp::onPacket(const char *data, size_t len) { - if (!_decoder) { + if (!_decoder && _demuxer) { _decoder = DecoderImp::createDecoder(DecoderImp::decoder_ts, _demuxer.get()); } @@ -106,15 +26,14 @@ void TsPlayerImp::onPacket(const char *data, size_t len) { } void TsPlayerImp::addTrackCompleted() { - PlayerImp::onPlayResult(SockException(Err_success, "play hls success")); + PlayerImp::onPlayResult(SockException(Err_success, "play http-ts success")); } void TsPlayerImp::onPlayResult(const SockException &ex) { - WarnL << ex.getErrCode() << " " << ex.what(); if (ex) { PlayerImp::onPlayResult(ex); } else { - auto demuxer = std::make_shared(); + auto demuxer = std::make_shared(); demuxer->start(getPoller(), this); _demuxer = std::move(demuxer); } @@ -126,7 +45,7 @@ void TsPlayerImp::onShutdown(const SockException &ex) { } vector TsPlayerImp::getTracks(bool ready) const { - return static_pointer_cast(_demuxer)->getTracks(ready); + return static_pointer_cast(_demuxer)->getTracks(ready); } }//namespace mediakit \ No newline at end of file diff --git a/src/Http/WebSocketClient.h b/src/Http/WebSocketClient.h index 08a23dbb..5152bd57 100644 --- a/src/Http/WebSocketClient.h +++ b/src/Http/WebSocketClient.h @@ -118,10 +118,8 @@ protected: * 收到http回复头 * @param status 状态码,譬如:200 OK * @param headers http头 - * @return 返回后续content的长度;-1:后续数据全是content;>=0:固定长度content - * 需要指出的是,在http头中带有Content-Length字段时,该返回值无效 */ - ssize_t onResponseHeader(const string &status,const HttpHeader &headers) override { + void onResponseHeader(const string &status,const HttpHeader &headers) override { if(status == "101"){ auto Sec_WebSocket_Accept = encodeBase64(SHA1::encode_bin(_Sec_WebSocket_Key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11")); if(Sec_WebSocket_Accept == const_cast(headers)["Sec-WebSocket-Accept"]){ @@ -129,26 +127,24 @@ protected: onWebSocketException(SockException()); //防止ws服务器返回Content-Length const_cast(headers).erase("Content-Length"); - //后续全是websocket负载数据 - return -1; + return; } - shutdown(SockException(Err_shutdown,StrPrinter << "Sec-WebSocket-Accept mismatch")); - return 0; + shutdown(SockException(Err_shutdown, StrPrinter << "Sec-WebSocket-Accept mismatch")); + return; } shutdown(SockException(Err_shutdown,StrPrinter << "bad http status code:" << status)); - return 0; }; /** * 接收http回复完毕, */ - void onResponseCompleted() override {} + void onResponseCompleted(const SockException &ex) override {} /** * 接收websocket负载数据 */ - void onResponseBody(const char *buf,size_t size,size_t recvedSize,size_t totalSize) override{ + void onResponseBody(const char *buf,size_t size) override{ if(_onRecv){ //完成websocket握手后,拦截websocket数据并解析 _onRecv(buf, size); diff --git a/tests/test_httpClient.cpp b/tests/test_httpClient.cpp index 869c5121..567b17a1 100644 --- a/tests/test_httpClient.cpp +++ b/tests/test_httpClient.cpp @@ -49,15 +49,15 @@ int main(int argc, char *argv[]) { for (auto &url : urlList) { //创建下载器 HttpDownloader::Ptr downloader(new HttpDownloader()); - downloader->setOnResult([](ErrCode code, const string &errMsg, const string &filePath) { + downloader->setOnResult([](const SockException &ex, const string &filePath) { DebugL << "=====================HttpDownloader result======================="; //下载结果回调 - if (code == Err_success) { + if (!ex) { //文件下载成功 InfoL << "download file success:" << filePath; } else { //下载失败 - WarnL << "code:" << code << " msg:" << errMsg; + WarnL << "code:" << ex.getErrCode() << " msg:" << ex.what(); } }); //断点续传功能,开启后可能会遇到416的错误(因为上次文件已经下载完全)