diff --git a/README.md b/README.md index 33b99c00..4461ed8a 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ ## 项目特点 - 基于C++11开发,避免使用裸指针,代码稳定可靠,性能优越。 -- 支持多种协议(RTSP/RTMP/HLS/HTTP-FLV/Websocket-FLV/GB28181/MP4),支持协议互转。 +- 支持多种协议(RTSP/RTMP/HLS/HTTP-FLV/Websocket-FLV/GB28181/HTTP-TS/Websocket-TS/MP4),支持协议互转。 - 使用多路复用/多线程/异步网络IO模式开发,并发性能优越,支持海量客户端连接。 - 代码经过长期大量的稳定性、性能测试,已经在线上商用验证已久。 - 支持linux、macos、ios、android、windows全平台。 @@ -59,6 +59,10 @@ - 通过cookie追踪技术,可以模拟HLS播放为长连接,可以实现HLS按需拉流、播放统计等业务 - 支持HLS播发器,支持拉流HLS转rtsp/rtmp/mp4 - 支持H264/H265/AAC/G711/OPUS编码 + +- TS + - 支持http[s]-ts直播 + - 支持ws[s]-ts直播 - HTTP[S]与WebSocket - 服务器支持`目录索引生成`,`文件下载`,`表单提交请求` diff --git a/README_en.md b/README_en.md index 71ef5d6d..dc521043 100644 --- a/README_en.md +++ b/README_en.md @@ -11,7 +11,7 @@ ## Why ZLMediaKit? - Developed based on C++ 11, the code is stable and reliable, avoiding the use of raw pointers, cross-platform porting is simple and convenient, and the code is clear and concise. -- Support rich streaming media protocols(`RTSP/RTMP/HLS/HTTP-FLV/Websocket-flv`),and support Inter-protocol conversion. +- Support rich streaming media protocols(`RTSP/RTMP/HLS/HTTP-FLV/WebSocket-flv/HTTP-TS/WebSocket-TS`),and support Inter-protocol conversion. - Multiplexing asynchronous network IO based on epoll and multi thread,extreme performance. - Well performance and stable test,can be used commercially. - Support linux, macos, ios, android, Windows Platforms. @@ -31,7 +31,7 @@ - RTMP[S] - RTMP[S] server,support player and pusher. - RTMP[S] player and pusher. - - Support HTTP-FLV player. + - Support HTTP-FLV/WebSocket-FLV sever. - H265/H264/AAC/G711/OPUS codec. - Recorded as flv or mp4. - Vod of mp4. @@ -41,6 +41,9 @@ - RTSP RTMP can be converted into HLS,built-in HTTP server. - Play authentication based on cookie. - Support HLS player, support streaming HLS proxy to RTSP / RTMP / MP4. + +- TS + - Support HTTP-TS/WebSocket-TS sever. - HTTP[S] - HTTP server,suppor directory meun、RESTful http api. diff --git a/src/Common/MultiMediaSourceMuxer.cpp b/src/Common/MultiMediaSourceMuxer.cpp index 0bbe8959..caeac717 100644 --- a/src/Common/MultiMediaSourceMuxer.cpp +++ b/src/Common/MultiMediaSourceMuxer.cpp @@ -32,6 +32,8 @@ MultiMuxerPrivate::MultiMuxerPrivate(const string &vhost, const string &app, con if (enable_mp4) { _mp4 = Recorder::createRecorder(Recorder::type_mp4, vhost, app, stream); } + + _ts = std::make_shared(vhost, app, stream); } void MultiMuxerPrivate::resetTracks() { @@ -41,6 +43,9 @@ void MultiMuxerPrivate::resetTracks() { if (_rtsp) { _rtsp->resetTracks(); } + if (_ts) { + _ts->resetTracks(); + } //拷贝智能指针,目的是为了防止跨线程调用设置录像相关api导致的线程竞争问题 auto hls = _hls; @@ -62,6 +67,9 @@ void MultiMuxerPrivate::setMediaListener(const std::weak_ptr & if (_rtsp) { _rtsp->setListener(listener); } + if (_ts) { + _ts->setListener(listener); + } auto hls = _hls; if (hls) { hls->setListener(listener); @@ -70,7 +78,10 @@ void MultiMuxerPrivate::setMediaListener(const std::weak_ptr & int MultiMuxerPrivate::totalReaderCount() const { auto hls = _hls; - return (_rtsp ? _rtsp->readerCount() : 0) + (_rtmp ? _rtmp->readerCount() : 0) + (hls ? hls->readerCount() : 0); + return (_rtsp ? _rtsp->readerCount() : 0) + + (_rtmp ? _rtmp->readerCount() : 0) + + (_ts ? _ts->readerCount() : 0) + + (hls ? hls->readerCount() : 0) ; } static std::shared_ptr makeRecorder(const vector &tracks, Recorder::type type, const string &custom_path, MediaSource &sender){ @@ -145,6 +156,9 @@ void MultiMuxerPrivate::onTrackReady(const Track::Ptr &track) { if (_rtsp) { _rtsp->addTrack(track); } + if (_ts) { + _ts->addTrack(track); + } //拷贝智能指针,目的是为了防止跨线程调用设置录像相关api导致的线程竞争问题 auto hls = _hls; @@ -161,6 +175,7 @@ bool MultiMuxerPrivate::isEnabled(){ auto hls = _hls; return (_rtmp ? _rtmp->isEnabled() : false) || (_rtsp ? _rtsp->isEnabled() : false) || + (_ts ? _ts->isEnabled() : false) || (hls ? hls->isEnabled() : false) || _mp4; } @@ -171,6 +186,10 @@ void MultiMuxerPrivate::onTrackFrame(const Frame::Ptr &frame) { if (_rtsp) { _rtsp->inputFrame(frame); } + if (_ts) { + _ts->inputFrame(frame); + } + //拷贝智能指针,目的是为了防止跨线程调用设置录像相关api导致的线程竞争问题 //此处使用智能指针拷贝来确保线程安全,比互斥锁性能更优 auto hls = _hls; diff --git a/src/Common/MultiMediaSourceMuxer.h b/src/Common/MultiMediaSourceMuxer.h index 7e38a4f8..b1c64751 100644 --- a/src/Common/MultiMediaSourceMuxer.h +++ b/src/Common/MultiMediaSourceMuxer.h @@ -18,6 +18,7 @@ #include "Record/HlsMediaSource.h" #include "Rtsp/RtspMediaSourceMuxer.h" #include "Rtmp/RtmpMediaSourceMuxer.h" +#include "TS/TSMediaSourceMuxer.h" namespace mediakit{ @@ -56,6 +57,7 @@ private: RtspMediaSourceMuxer::Ptr _rtsp; HlsRecorder::Ptr _hls; MediaSinkInterface::Ptr _mp4; + TSMediaSourceMuxer::Ptr _ts; std::weak_ptr _listener; }; diff --git a/src/Common/config.h b/src/Common/config.h index 7038ad40..49e13615 100644 --- a/src/Common/config.h +++ b/src/Common/config.h @@ -47,6 +47,7 @@ bool loadIniConfig(const char *ini_path = nullptr); #define RTSP_SCHEMA "rtsp" #define RTMP_SCHEMA "rtmp" #define HLS_SCHEMA "hls" +#define TS_SCHEMA "ts" #define DEFAULT_VHOST "__defaultVhost__" ////////////广播名称/////////// diff --git a/src/Http/HttpSession.cpp b/src/Http/HttpSession.cpp index 3f173b05..30bd0049 100644 --- a/src/Http/HttpSession.cpp +++ b/src/Http/HttpSession.cpp @@ -8,15 +8,9 @@ * may be found in the AUTHORS file in the root of the source tree. */ -#if !defined(_WIN32) -#include -#endif //!defined(_WIN32) - #include #include #include -#include - #include "Common/config.h" #include "strCoding.h" #include "HttpSession.h" @@ -96,10 +90,10 @@ void HttpSession::onRecv(const Buffer::Ptr &pBuf) { } void HttpSession::onError(const SockException& err) { - if(_is_flv_stream){ + if(_is_live_stream){ uint64_t duration = _ticker.createdTime()/1000; - //flv播放器 - WarnP(this) << "FLV播放器(" + //flv/ts播放器 + WarnP(this) << "FLV/TS播放器(" << _mediaInfo._vhost << "/" << _mediaInfo._app << "/" << _mediaInfo._streamid @@ -107,8 +101,8 @@ void HttpSession::onError(const SockException& err) { << ",耗时(s):" << duration; GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold); - if(_ui64TotalBytes > iFlowThreshold * 1024){ - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _mediaInfo, _ui64TotalBytes, duration , true, static_cast(*this)); + if(_total_bytes_usage > iFlowThreshold * 1024){ + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _mediaInfo, _total_bytes_usage, duration , true, static_cast(*this)); } return; } @@ -135,8 +129,7 @@ bool HttpSession::checkWebSocket(){ if (Sec_WebSocket_Key.empty()) { return false; } - auto Sec_WebSocket_Accept = encodeBase64( - SHA1::encode_bin(Sec_WebSocket_Key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11")); + auto Sec_WebSocket_Accept = encodeBase64(SHA1::encode_bin(Sec_WebSocket_Key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11")); KeyValue headerOut; headerOut["Upgrade"] = "websocket"; @@ -147,17 +140,23 @@ bool HttpSession::checkWebSocket(){ } auto res_cb = [this, headerOut]() { - _flv_over_websocket = true; + _live_over_websocket = true; sendResponse("101 Switching Protocols", false, nullptr, headerOut, nullptr, true); }; //判断是否为websocket-flv - if (checkLiveFlvStream(res_cb)) { + if (checkLiveStreamFlv(res_cb)) { //这里是websocket-flv直播请求 return true; } - //如果checkLiveFlvStream返回false,则代表不是websocket-flv,而是普通的websocket连接 + //判断是否为websocket-ts + if (checkLiveStreamTS(res_cb)) { + //这里是websocket-ts直播请求 + return true; + } + + //这是普通的websocket连接 if (!onWebSocketConnect(_parser)) { sendResponse("501 Not Implemented", true, nullptr, headerOut); return true; @@ -166,75 +165,63 @@ bool HttpSession::checkWebSocket(){ return true; } -//http-flv 链接格式:http://vhost-url:port/app/streamid.flv?key1=value1&key2=value2 -//如果url(除去?以及后面的参数)后缀是.flv,那么表明该url是一个http-flv直播。 -bool HttpSession::checkLiveFlvStream(const function &cb){ - auto pos = strrchr(_parser.Url().data(),'.'); - if(!pos){ - //未找到".flv"后缀 +bool HttpSession::checkLiveStream(const string &schema, const string &url_suffix, const function &cb){ + auto pos = strrchr(_parser.Url().data(), '.'); + if (!pos) { + //未找到后缀 return false; } - if(strcasecmp(pos,".flv") != 0){ - //未找到".flv"后缀 + if (strcasecmp(pos, url_suffix.data()) != 0) { + //未找到直播流后缀 return false; } - //这是个.flv的流 - _mediaInfo.parse(string(RTMP_SCHEMA) + "://" + _parser["Host"] + _parser.FullUrl()); - if(_mediaInfo._app.empty() || _mediaInfo._streamid.size() < 5){ + //这是个符合后缀的直播的流 + _mediaInfo.parse(schema + "://" + _parser["Host"] + _parser.FullUrl()); + if (_mediaInfo._app.empty() || _mediaInfo._streamid.size() < url_suffix.size() + 1) { //url不合法 return false; } - _mediaInfo._streamid.erase(_mediaInfo._streamid.size() - 4);//去除.flv后缀 - bool bClose = !strcasecmp(_parser["Connection"].data(),"close"); - - weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); + //去除后缀 + bool close_flag = !strcasecmp(_parser["Connection"].data(), "close"); + //流id去除后缀 + _mediaInfo._streamid.erase(_mediaInfo._streamid.size() - url_suffix.size()); + weak_ptr weak_self = dynamic_pointer_cast(shared_from_this()); //鉴权结果回调 - auto onRes = [cb, weakSelf, bClose](const string &err){ - auto strongSelf = weakSelf.lock(); - if (!strongSelf) { + auto onRes = [cb, weak_self, close_flag](const string &err) { + auto strong_self = weak_self.lock(); + if (!strong_self) { //本对象已经销毁 return; } - if(!err.empty()){ + if (!err.empty()) { //播放鉴权失败 - strongSelf->sendResponse("401 Unauthorized", bClose, nullptr, KeyValue(), std::make_shared(err)); + strong_self->sendResponse("401 Unauthorized", close_flag, nullptr, KeyValue(), std::make_shared(err)); return; } - //异步查找rtmp流 - MediaSource::findAsync(strongSelf->_mediaInfo, strongSelf, [weakSelf, bClose, cb](const MediaSource::Ptr &src) { - auto strongSelf = weakSelf.lock(); - if (!strongSelf) { + //异步查找直播流 + MediaSource::findAsync(strong_self->_mediaInfo, strong_self, [weak_self, close_flag, cb](const MediaSource::Ptr &src) { + auto strong_self = weak_self.lock(); + if (!strong_self) { //本对象已经销毁 return; } - auto rtmp_src = dynamic_pointer_cast(src); - if (!rtmp_src) { + if (!src) { //未找到该流 - strongSelf->sendNotFound(bClose); + strong_self->sendNotFound(close_flag); return; } - - if (!cb) { - //找到rtmp源,发送http头,负载后续发送 - strongSelf->sendResponse("200 OK", false, "video/x-flv", KeyValue(), nullptr, true); - } else { - //自定义发送http头 - cb(); - } - - //http-flv直播牺牲延时提升发送性能 - strongSelf->setSocketFlags(); - strongSelf->start(strongSelf->getPoller(), rtmp_src); - strongSelf->_is_flv_stream = true; + strong_self->_is_live_stream = true; + //触发回调 + cb(src); }); }; - Broadcast::AuthInvoker invoker = [weakSelf, onRes](const string &err) { - auto strongSelf = weakSelf.lock(); + Broadcast::AuthInvoker invoker = [weak_self, onRes](const string &err) { + auto strongSelf = weak_self.lock(); if (!strongSelf) { return; } @@ -251,34 +238,91 @@ bool HttpSession::checkLiveFlvStream(const function &cb){ return true; } +//http-ts 链接格式:http://vhost-url:port/app/streamid.ts?key1=value1&key2=value2 +//如果url(除去?以及后面的参数)后缀是.ts,那么表明该url是一个http-ts直播。 +bool HttpSession::checkLiveStreamTS(const function &cb){ + return checkLiveStream(TS_SCHEMA, ".ts", [this, cb](const MediaSource::Ptr &src) { + auto ts_src = dynamic_pointer_cast(src); + assert(ts_src); + if (!cb) { + //找到源,发送http头,负载后续发送 + sendResponse("200 OK", false, "video/mp2t", KeyValue(), nullptr, true); + } else { + //自定义发送http头 + cb(); + } + + //直播牺牲延时提升发送性能 + setSocketFlags(); + weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); + _ts_reader = ts_src->getRing()->attach(getPoller()); + _ts_reader->setReadCB([weakSelf](const TSMediaSource::RingDataType &ts_list) { + auto strongSelf = weakSelf.lock(); + if (!strongSelf) { + //本对象已经销毁 + return; + } + int i = 0; + int size = ts_list->size(); + strongSelf->setSendFlushFlag(false); + ts_list->for_each([&](const TSPacket::Ptr &ts) { + strongSelf->onWrite(ts, ++i == size); + }); + }); + }); +} + +//http-flv 链接格式:http://vhost-url:port/app/streamid.flv?key1=value1&key2=value2 +//如果url(除去?以及后面的参数)后缀是.flv,那么表明该url是一个http-flv直播。 +bool HttpSession::checkLiveStreamFlv(const function &cb){ + return checkLiveStream(RTMP_SCHEMA, ".flv", [this, cb](const MediaSource::Ptr &src) { + auto rtmp_src = dynamic_pointer_cast(src); + assert(rtmp_src); + if (!cb) { + //找到源,发送http头,负载后续发送 + sendResponse("200 OK", false, "video/x-flv", KeyValue(), nullptr, true); + } else { + //自定义发送http头 + cb(); + } + //直播牺牲延时提升发送性能 + setSocketFlags(); + start(getPoller(), rtmp_src); + }); +} + void HttpSession::Handle_Req_GET(int64_t &content_len) { Handle_Req_GET_l(content_len, true); } void HttpSession::Handle_Req_GET_l(int64_t &content_len, bool sendBody) { //先看看是否为WebSocket请求 - if(checkWebSocket()){ + if (checkWebSocket()) { content_len = -1; - _contentCallBack = [this](const char *data,uint64_t len){ - WebSocketSplitter::decode((uint8_t *)data,len); + _contentCallBack = [this](const char *data, uint64_t len) { + WebSocketSplitter::decode((uint8_t *) data, len); //_contentCallBack是可持续的,后面还要处理后续数据 return true; }; return; } - if(emitHttpEvent(false)){ + if (emitHttpEvent(false)) { //拦截http api事件 return; } - if(checkLiveFlvStream()){ + if (checkLiveStreamFlv()) { //拦截http-flv播放器 return; } - bool bClose = !strcasecmp(_parser["Connection"].data(),"close"); + if (checkLiveStreamTS()) { + //拦截http-ts播放器 + return; + } + bool bClose = !strcasecmp(_parser["Connection"].data(),"close"); weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); HttpFileManager::onAccessPath(*this, _parser, [weakSelf, bClose](const string &status_code, const string &content_type, const StrCaseMap &responseHeader, const HttpBody::Ptr &body) { @@ -623,26 +667,26 @@ void HttpSession::onWrite(const Buffer::Ptr &buffer, bool flush) { } _ticker.resetTime(); - if(!_flv_over_websocket){ - _ui64TotalBytes += buffer->size(); + if (!_live_over_websocket) { + _total_bytes_usage += buffer->size(); send(buffer); - }else{ + } else { WebSocketHeader header; header._fin = true; header._reserved = 0; header._opcode = WebSocketHeader::BINARY; header._mask_flag = false; - WebSocketSplitter::encode(header,buffer); + WebSocketSplitter::encode(header, buffer); } - if(flush){ + if (flush) { //本次刷新缓存后,下次不用刷新缓存 HttpSession::setSendFlushFlag(false); } } void HttpSession::onWebSocketEncodeData(const Buffer::Ptr &buffer){ - _ui64TotalBytes += buffer->size(); + _total_bytes_usage += buffer->size(); send(buffer); } diff --git a/src/Http/HttpSession.h b/src/Http/HttpSession.h index b842457b..7430a06e 100644 --- a/src/Http/HttpSession.h +++ b/src/Http/HttpSession.h @@ -19,6 +19,7 @@ #include "WebSocketSplitter.h" #include "HttpCookieManager.h" #include "HttpFileManager.h" +#include "TS/TSMediaSource.h" using namespace std; using namespace toolkit; @@ -104,7 +105,11 @@ private: void Handle_Req_POST(int64_t &content_len); void Handle_Req_HEAD(int64_t &content_len); - bool checkLiveFlvStream(const function &cb = nullptr); + bool checkLiveStream(const string &schema, const string &url_suffix, const function &cb); + + bool checkLiveStreamFlv(const function &cb = nullptr); + bool checkLiveStreamTS(const function &cb = nullptr); + bool checkWebSocket(); bool emitHttpEvent(bool doInvoke); void urlDecode(Parser &parser); @@ -117,17 +122,17 @@ private: void setSocketFlags(); private: + bool _is_live_stream = false; + bool _live_over_websocket = false; + //消耗的总流量 + uint64_t _total_bytes_usage = 0; string _origin; Parser _parser; Ticker _ticker; - //消耗的总流量 - uint64_t _ui64TotalBytes = 0; - //flv over http MediaInfo _mediaInfo; + TSMediaSource::RingType::RingReader::Ptr _ts_reader; //处理content数据的callback function _contentCallBack; - bool _flv_over_websocket = false; - bool _is_flv_stream = false; }; diff --git a/src/TS/TSMediaSource.h b/src/TS/TSMediaSource.h new file mode 100644 index 00000000..f030828b --- /dev/null +++ b/src/TS/TSMediaSource.h @@ -0,0 +1,129 @@ +/* + * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). + * + * Use of this source code is governed by MIT license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef ZLMEDIAKIT_TSMEDIASOURCE_H +#define ZLMEDIAKIT_TSMEDIASOURCE_H + +#include "Common/MediaSource.h" +using namespace toolkit; +#define TS_GOP_SIZE 512 + +namespace mediakit { + +//TS直播数据包 +class TSPacket : public BufferRaw{ +public: + using Ptr = std::shared_ptr; + + template + TSPacket(ARGS && ...args) : BufferRaw(std::forward(args)...) {}; + ~TSPacket() override = default; + +public: + uint32_t time_stamp = 0; +}; + +//TS直播合并写策略类 +class TSFlushPolicy : public FlushPolicy{ +public: + TSFlushPolicy() = default; + ~TSFlushPolicy() = default; + + uint32_t getStamp(const TSPacket::Ptr &packet) { + return packet->time_stamp; + } +}; + +//TS直播源 +class TSMediaSource : public MediaSource, public RingDelegate, public PacketCache{ +public: + using PoolType = ResourcePool; + using Ptr = std::shared_ptr; + using RingDataType = std::shared_ptr >; + using RingType = RingBuffer; + + TSMediaSource(const string &vhost, + const string &app, + const string &stream_id, + int ring_size = TS_GOP_SIZE) : MediaSource(TS_SCHEMA, vhost, app, stream_id), _ring_size(ring_size) {} + + ~TSMediaSource() override = default; + + /** + * 获取媒体源的环形缓冲 + */ + const RingType::Ptr &getRing() const { + return _ring; + } + + /** + * 获取播放器个数 + */ + int readerCount() override { + return _ring ? _ring->readerCount() : 0; + } + + /** + * 输入TS包 + * @param packet TS包 + * @param key 是否为关键帧第一个包 + */ + void onWrite(const TSPacket::Ptr &packet, bool key) override { + if (!_ring) { + createRing(); + } + if (key) { + _have_video = true; + } + PacketCache::inputPacket(true, packet, key); + } + + /** + * 情况GOP缓存 + */ + void clearCache() override { + PacketCache::clearCache(); + _ring->clearCache(); + } + +private: + void createRing(){ + weak_ptr weak_self = dynamic_pointer_cast(shared_from_this()); + _ring = std::make_shared(_ring_size, [weak_self](int size) { + auto strong_self = weak_self.lock(); + if (!strong_self) { + return; + } + strong_self->onReaderChanged(size); + }); + onReaderChanged(0); + //注册媒体源 + regist(); + } + + /** + * 合并写回调 + * @param packet_list 合并写缓存列队 + * @param key_pos 是否包含关键帧 + */ + void onFlush(std::shared_ptr > &packet_list, bool key_pos) override { + //如果不存在视频,那么就没有存在GOP缓存的意义,所以确保一直清空GOP缓存 + _ring->write(packet_list, _have_video ? key_pos : true); + } + +private: + bool _have_video = false; + int _ring_size; + RingType::Ptr _ring; +}; + + +}//namespace mediakit +#endif //ZLMEDIAKIT_TSMEDIASOURCE_H diff --git a/src/TS/TSMediaSourceMuxer.h b/src/TS/TSMediaSourceMuxer.h new file mode 100644 index 00000000..373475bb --- /dev/null +++ b/src/TS/TSMediaSourceMuxer.h @@ -0,0 +1,84 @@ +/* + * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). + * + * Use of this source code is governed by MIT license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef ZLMEDIAKIT_TSMEDIASOURCEMUXER_H +#define ZLMEDIAKIT_TSMEDIASOURCEMUXER_H + +#include "TSMediaSource.h" +#include "Record/TsMuxer.h" + +namespace mediakit { + +class TSMediaSourceMuxer : public TsMuxer, public MediaSourceEventInterceptor, + public std::enable_shared_from_this { +public: + using Ptr = std::shared_ptr; + + TSMediaSourceMuxer(const string &vhost, + const string &app, + const string &stream_id) { + _media_src = std::make_shared(vhost, app, stream_id); + _pool.setSize(256); + } + + ~TSMediaSourceMuxer() override = default; + + void setListener(const std::weak_ptr &listener){ + _listener = listener; + _media_src->setListener(shared_from_this()); + } + + int readerCount() const{ + return _media_src->readerCount(); + } + + void onReaderChanged(MediaSource &sender, int size) override { + _enabled = size; + if (!size) { + _clear_cache = true; + } + MediaSourceEventInterceptor::onReaderChanged(sender, size); + } + + void inputFrame(const Frame::Ptr &frame) override { + if (_clear_cache) { + _clear_cache = false; + _media_src->clearCache(); + } + if (_enabled) { + TsMuxer::inputFrame(frame); + } + } + + bool isEnabled() { + //缓存尚未清空时,还允许触发inputFrame函数,以便及时清空缓存 + return _clear_cache ? true : _enabled; + } + +protected: + void onTs(const void *data, int len,uint32_t timestamp,bool is_idr_fast_packet) override{ + if(!data || !len){ + return; + } + TSPacket::Ptr packet = _pool.obtain(); + packet->assign((char *) data, len); + packet->time_stamp = timestamp; + _media_src->onWrite(packet, is_idr_fast_packet); + } + +private: + bool _enabled = true; + bool _clear_cache = false; + TSMediaSource::PoolType _pool; + TSMediaSource::Ptr _media_src; +}; + +}//namespace mediakit +#endif //ZLMEDIAKIT_TSMEDIASOURCEMUXER_H