From 64f15202de4222b98bf3a5b103daecb848fc63d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=A4=8F=E6=A5=9A?= <771730766@qq.com> Date: Sat, 9 Dec 2023 22:34:22 +0800 Subject: [PATCH] Support multi audio/video track --- README.md | 6 ++ server/WebApi.cpp | 2 + src/Common/MediaSink.cpp | 140 ++++++++++++++------------- src/Common/MediaSink.h | 14 ++- src/Common/MediaSource.h | 4 + src/Common/MultiMediaSourceMuxer.cpp | 27 ++++-- src/Common/MultiMediaSourceMuxer.h | 3 +- src/Extension/AAC.cpp | 1 + src/Extension/Frame.cpp | 1 + src/Extension/Frame.h | 19 ++++ src/Extension/H264.cpp | 2 + src/Extension/H265.cpp | 3 + src/Extension/Track.h | 5 +- src/Player/PlayerProxy.cpp | 9 ++ src/Record/MP4Demuxer.cpp | 17 ++-- src/Record/MP4Demuxer.h | 2 +- src/Record/MP4Muxer.cpp | 104 +++++++------------- src/Record/MP4Muxer.h | 12 ++- src/Record/MP4Reader.cpp | 3 +- src/Record/MPEG.cpp | 28 +++--- src/Record/MPEG.h | 13 ++- src/Rtmp/RtmpMuxer.cpp | 32 +++--- src/Rtmp/RtmpMuxer.h | 7 +- src/Rtp/Decoder.cpp | 46 +++++---- src/Rtp/Decoder.h | 13 ++- src/Rtp/GB28181Process.cpp | 14 ++- src/Rtp/PSEncoder.cpp | 2 +- src/Rtp/RawEncoder.cpp | 2 +- src/Rtp/RtpProcess.cpp | 3 +- src/Rtsp/RtpCodec.cpp | 1 + src/Rtsp/RtpCodec.h | 8 +- src/Rtsp/Rtsp.h | 2 +- src/Rtsp/RtspMuxer.cpp | 58 ++++++----- src/Rtsp/RtspMuxer.h | 15 ++- srt/SrtTransportImp.cpp | 24 +---- srt/SrtTransportImp.hpp | 2 - 36 files changed, 366 insertions(+), 278 deletions(-) diff --git a/README.md b/README.md index a3d2faae..56f29476 100644 --- a/README.md +++ b/README.md @@ -36,6 +36,7 @@ - 极致体验,[独家特性](https://github.com/ZLMediaKit/ZLMediaKit/wiki/ZLMediakit%E7%8B%AC%E5%AE%B6%E7%89%B9%E6%80%A7%E4%BB%8B%E7%BB%8D) - [谁在使用zlmediakit?](https://github.com/ZLMediaKit/ZLMediaKit/issues/511) - 全面支持ipv6网络 +- 支持多轨道模式(一个流中多个视频/音频) ## 项目定位 @@ -76,16 +77,19 @@ - 通过cookie追踪技术,可以模拟HLS播放为长连接,可以实现HLS按需拉流、播放统计等业务 - 支持HLS播发器,支持拉流HLS转rtsp/rtmp/mp4 - 支持H264/H265/AAC/G711/OPUS编码 + - 支持多轨道模式 - TS - 支持http[s]-ts直播 - 支持ws[s]-ts直播 - 支持H264/H265/AAC/G711/OPUS编码 + - 支持多轨道模式 - fMP4 - 支持http[s]-fmp4直播 - 支持ws[s]-fmp4直播 - 支持H264/H265/AAC/G711/OPUS/MJPEG编码 + - 支持多轨道模式 - HTTP[S]与WebSocket - 服务器支持`目录索引生成`,`文件下载`,`表单提交请求` @@ -104,11 +108,13 @@ - 支持es/ps rtp转推 - 支持GB28181主动拉流模式 - 支持双向语音对讲 + - 支持多轨道模式 - MP4点播与录制 - 支持录制为FLV/HLS/MP4 - RTSP/RTMP/HTTP-FLV/WS-FLV支持MP4文件点播,支持seek - 支持H264/H265/AAC/G711/OPUS编码 + - 支持多轨道模式 - WebRTC - 支持WebRTC推流,支持转其他协议 diff --git a/server/WebApi.cpp b/server/WebApi.cpp index 0985604f..c80774cc 100755 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -1818,6 +1818,8 @@ void installWebApi() { CHECK_ARGS("vhost", "app", "stream", "file_path"); ProtocolOption option; + // mp4支持多track + option.max_track = 16; // 默认解复用mp4不生成mp4 option.enable_mp4 = false; // 但是如果参数明确指定开启mp4, 那么也允许之 diff --git a/src/Common/MediaSink.cpp b/src/Common/MediaSink.cpp index adae1738..3fc5a9ea 100644 --- a/src/Common/MediaSink.cpp +++ b/src/Common/MediaSink.cpp @@ -12,6 +12,8 @@ #include "Common/config.h" #include "Extension/Factory.h" +#define MUTE_AUDIO_INDEX 0xFFFF + using namespace std; namespace mediakit{ @@ -33,28 +35,30 @@ bool MediaSink::addTrack(const Track::Ptr &track_in) { WarnL << "All track is ready, add track too late: " << track_in->getCodecName(); return false; } - //克隆Track,只拷贝其数据,不拷贝其数据转发关系 + // 克隆Track,只拷贝其数据,不拷贝其数据转发关系 auto track = track_in->clone(); - auto track_type = track->getTrackType(); - _track_map[track_type] = std::make_pair(track, false); - _track_ready_callback[track_type] = [this, track]() { - onTrackReady(track); - }; + auto index = track->getIndex(); + if (!_track_map.emplace(index, std::make_pair(track, false)).second) { + WarnL << "Already add a same track: " << track->getIndex() << ", codec: " << track->getCodecName(); + return false; + } _ticker.resetTime(); + _audio_add = track->getTrackType() == TrackAudio ? true : _audio_add; + _track_ready_callback[index] = [this, track]() { onTrackReady(track); }; track->addDelegate([this](const Frame::Ptr &frame) { if (_all_track_ready) { return onTrackFrame(frame); } - auto &frame_unread = _frame_unread[frame->getTrackType()]; + auto &frame_unread = _frame_unread[frame->getIndex()]; GET_CONFIG(uint32_t, kMaxUnreadyFrame, General::kUnreadyFrameCache); if (frame_unread.size() > kMaxUnreadyFrame) { - //未就绪的的track,不能缓存太多的帧,否则可能内存溢出 + // 未就绪的的track,不能缓存太多的帧,否则可能内存溢出 frame_unread.clear(); WarnL << "Cached frame of unready track(" << frame->getCodecName() << ") is too much, now cleared"; } - //还有Track未就绪,先缓存之 + // 还有Track未就绪,先缓存之 frame_unread.emplace_back(Frame::getCacheAbleFrame(frame)); return true; }); @@ -62,36 +66,37 @@ bool MediaSink::addTrack(const Track::Ptr &track_in) { } void MediaSink::resetTracks() { - _all_track_ready = false; + _audio_add = false; _have_video = false; - _track_map.clear(); - _track_ready_callback.clear(); + _all_track_ready = false; + _mute_audio_maker = nullptr; _ticker.resetTime(); - _max_track_size = 2; + _track_map.clear(); _frame_unread.clear(); + _track_ready_callback.clear(); } bool MediaSink::inputFrame(const Frame::Ptr &frame) { - auto it = _track_map.find(frame->getTrackType()); + auto it = _track_map.find(frame->getIndex()); if (it == _track_map.end()) { return false; } - //got frame + // got frame it->second.second = true; auto ret = it->second.first->inputFrame(frame); if (_mute_audio_maker && frame->getTrackType() == TrackVideo) { - //视频驱动产生静音音频 + // 视频驱动产生静音音频 _mute_audio_maker->inputFrame(frame); } checkTrackIfReady(); return ret; } -void MediaSink::checkTrackIfReady(){ +void MediaSink::checkTrackIfReady() { if (!_all_track_ready && !_track_ready_callback.empty()) { for (auto &pr : _track_map) { if (pr.second.second && pr.second.first->ready()) { - //Track由未就绪状态转换成就绪状态,我们就触发onTrackReady回调 + // Track由未就绪状态转换成就绪状态,我们就触发onTrackReady回调 auto it = _track_ready_callback.find(pr.first); if (it != _track_ready_callback.end()) { it->second(); @@ -101,28 +106,34 @@ void MediaSink::checkTrackIfReady(){ } } - if(!_all_track_ready){ + if (!_all_track_ready) { GET_CONFIG(uint32_t, kMaxWaitReadyMS, General::kWaitTrackReadyMS); - if(_ticker.elapsedTime() > kMaxWaitReadyMS){ - //如果超过规定时间,那么不再等待并忽略未准备好的Track + if (_ticker.elapsedTime() > kMaxWaitReadyMS) { + // 如果超过规定时间,那么不再等待并忽略未准备好的Track emitAllTrackReady(); return; } - if(!_track_ready_callback.empty()){ - //在超时时间内,如果存在未准备好的Track,那么继续等待 + if (!_track_ready_callback.empty()) { + // 在超时时间内,如果存在未准备好的Track,那么继续等待 return; } - if(_track_map.size() == _max_track_size){ - //如果已经添加了音视频Track,并且不存在未准备好的Track,那么说明所有Track都准备好了 + if (_only_audio && _audio_add) { + // 只开启音频 + emitAllTrackReady(); + return; + } + + if (_track_map.size() == _max_track_size) { + // 如果已经添加了音视频Track,并且不存在未准备好的Track,那么说明所有Track都准备好了 emitAllTrackReady(); return; } GET_CONFIG(uint32_t, kMaxAddTrackMS, General::kWaitAddTrackMS); - if(_track_map.size() == 1 && _ticker.elapsedTime() > kMaxAddTrackMS){ - //如果只有一个Track,那么在该Track添加后,我们最多还等待若干时间(可能后面还会添加Track) + if (_track_map.size() == 1 && _ticker.elapsedTime() > kMaxAddTrackMS) { + // 如果只有一个Track,那么在该Track添加后,我们最多还等待若干时间(可能后面还会添加Track) emitAllTrackReady(); return; } @@ -138,7 +149,7 @@ void MediaSink::setMaxTrackCount(size_t i) { WarnL << "All track is ready, set max track count ignored"; return; } - _max_track_size = MAX(MIN(i, 2), 1); + _max_track_size = MAX(i, 1); checkTrackIfReady(); } @@ -149,9 +160,9 @@ void MediaSink::emitAllTrackReady() { DebugL << "All track ready use " << _ticker.elapsedTime() << "ms"; if (!_track_ready_callback.empty()) { - //这是超时强制忽略未准备好的Track + // 这是超时强制忽略未准备好的Track _track_ready_callback.clear(); - //移除未准备好的Track + // 移除未准备好的Track for (auto it = _track_map.begin(); it != _track_map.end();) { if (!it->second.second || !it->second.first->ready()) { WarnL << "Track not ready for a long time, ignored: " << it->second.first->getCodecName(); @@ -163,25 +174,23 @@ void MediaSink::emitAllTrackReady() { } if (!_track_map.empty()) { - //最少有一个有效的Track + // 最少有一个有效的Track onAllTrackReady_l(); - //全部Track就绪,我们一次性把之前的帧输出 - for(auto &pr : _frame_unread){ + // 全部Track就绪,我们一次性把之前的帧输出 + for (auto &pr : _frame_unread) { if (_track_map.find(pr.first) == _track_map.end()) { - //该Track已经被移除 + // 该Track已经被移除 continue; } - pr.second.for_each([&](const Frame::Ptr &frame) { - MediaSink::inputFrame(frame); - }); + pr.second.for_each([&](const Frame::Ptr &frame) { MediaSink::inputFrame(frame); }); } _frame_unread.clear(); } } void MediaSink::onAllTrackReady_l() { - //是否添加静音音频 + // 是否添加静音音频 if (_add_mute_audio) { addMuteAudioTrack(); } @@ -190,10 +199,10 @@ void MediaSink::onAllTrackReady_l() { _have_video = (bool)getTrack(TrackVideo); } -vector MediaSink::getTracks(bool ready) const{ +vector MediaSink::getTracks(bool ready) const { vector ret; - for (auto &pr : _track_map){ - if(ready && !pr.second.first->ready()){ + for (auto &pr : _track_map) { + if (ready && !pr.second.first->ready()) { continue; } ret.emplace_back(pr.second.first); @@ -230,14 +239,20 @@ static uint8_t s_mute_adts[] = {0xff, 0xf1, 0x6c, 0x40, 0x2d, 0x3f, 0xfc, 0x00, static uint8_t ADTS_CONFIG[2] = { 0x15, 0x88 }; bool MuteAudioMaker::inputFrame(const Frame::Ptr &frame) { - if (frame->getTrackType() == TrackVideo) { - auto audio_idx = frame->dts() / MUTE_ADTS_DATA_MS; - if (_audio_idx != audio_idx) { - _audio_idx = audio_idx; - auto aacFrame = std::make_shared>(CodecAAC, (char *) MUTE_ADTS_DATA, sizeof(s_mute_adts), - _audio_idx * MUTE_ADTS_DATA_MS, 0, 7); - return FrameDispatcher::inputFrame(aacFrame); - } + if (_track_index == -1) { + // 锁定track + _track_index = frame->getIndex(); + } + if (frame->getIndex() != _track_index) { + // 不是锁定的track + return false; + } + auto audio_idx = frame->dts() / MUTE_ADTS_DATA_MS; + if (_audio_idx != audio_idx) { + _audio_idx = audio_idx; + auto aacFrame = std::make_shared>(CodecAAC, (char *)MUTE_ADTS_DATA, sizeof(s_mute_adts), _audio_idx * MUTE_ADTS_DATA_MS, 0, 7); + aacFrame->setIndex(MUTE_AUDIO_INDEX); + return FrameDispatcher::inputFrame(aacFrame); } return false; } @@ -246,19 +261,18 @@ bool MediaSink::addMuteAudioTrack() { if (!_enable_audio) { return false; } - if (_track_map.find(TrackAudio) != _track_map.end()) { - return false; + for (auto &pr : _track_map) { + if (pr.second.first->getTrackType() == TrackAudio) { + return false; + } } auto audio = Factory::getTrackByCodecId(CodecAAC); + audio->setIndex(MUTE_AUDIO_INDEX); audio->setExtraData(ADTS_CONFIG, 2); - _track_map[audio->getTrackType()] = std::make_pair(audio, true); - audio->addDelegate([this](const Frame::Ptr &frame) { - return onTrackFrame(frame); - }); + _track_map[MUTE_AUDIO_INDEX] = std::make_pair(audio, true); + audio->addDelegate([this](const Frame::Ptr &frame) { return onTrackFrame(frame); }); _mute_audio_maker = std::make_shared(); - _mute_audio_maker->addDelegate([audio](const Frame::Ptr &frame) { - return audio->inputFrame(frame); - }); + _mute_audio_maker->addDelegate([audio](const Frame::Ptr &frame) { return audio->inputFrame(frame); }); onTrackReady(audio); TraceL << "Mute aac track added"; return true; @@ -270,14 +284,12 @@ bool MediaSink::isAllTrackReady() const { void MediaSink::enableAudio(bool flag) { _enable_audio = flag; - _max_track_size = flag ? 2 : 1; } -void MediaSink::setOnlyAudio(){ +void MediaSink::setOnlyAudio() { _only_audio = true; _enable_audio = true; _add_mute_audio = false; - _max_track_size = 1; } void MediaSink::enableMuteAudio(bool flag) { @@ -332,9 +344,7 @@ bool Demuxer::addTrack(const Track::Ptr &track) { } if (_sink->addTrack(track)) { - track->addDelegate([this](const Frame::Ptr &frame) { - return _sink->inputFrame(frame); - }); + track->addDelegate([this](const Frame::Ptr &frame) { return _sink->inputFrame(frame); }); return true; } return false; @@ -370,4 +380,4 @@ vector Demuxer::getTracks(bool ready) const { } return ret; } -}//namespace mediakit +} // namespace mediakit diff --git a/src/Common/MediaSink.h b/src/Common/MediaSink.h index c3cd94d6..683878e3 100644 --- a/src/Common/MediaSink.h +++ b/src/Common/MediaSink.h @@ -55,6 +55,7 @@ public: bool inputFrame(const Frame::Ptr &frame) override; private: + int _track_index = -1; uint64_t _audio_idx = 0; }; @@ -86,7 +87,7 @@ public: void addTrackCompleted() override; /** - * 设置最大track数,取值范围1~2;该方法与addTrackCompleted类型; + * 设置最大track数,取值范围>=1;该方法与addTrackCompleted类型; * 在设置单track时,可以加快媒体注册速度 */ void setMaxTrackCount(size_t i); @@ -163,17 +164,20 @@ private: bool addMuteAudioTrack(); private: + bool _audio_add = false; + bool _have_video = false; bool _enable_audio = true; bool _only_audio = false; bool _add_mute_audio = true; bool _all_track_ready = false; - bool _have_video = false; size_t _max_track_size = 2; - std::unordered_map > _track_map; - std::unordered_map > _frame_unread; - std::unordered_map > _track_ready_callback; + toolkit::Ticker _ticker; MuteAudioMaker::Ptr _mute_audio_maker; + + std::unordered_map > _frame_unread; + std::unordered_map > _track_ready_callback; + std::unordered_map > _track_map; }; diff --git a/src/Common/MediaSource.h b/src/Common/MediaSource.h index 7d423f0b..4c6b33e9 100644 --- a/src/Common/MediaSource.h +++ b/src/Common/MediaSource.h @@ -202,6 +202,9 @@ public: // 支持通过on_publish返回值替换stream_id std::string stream_replace; + // 最大track数 + size_t max_track = 2; + template ProtocolOption(const MAP &allArgs) : ProtocolOption() { load(allArgs); @@ -237,6 +240,7 @@ public: GET_OPT_VALUE(hls_save_path); GET_OPT_VALUE(stream_replace); + GET_OPT_VALUE(max_track); } private: diff --git a/src/Common/MultiMediaSourceMuxer.cpp b/src/Common/MultiMediaSourceMuxer.cpp index 0e91fe30..5b3b8d72 100644 --- a/src/Common/MultiMediaSourceMuxer.cpp +++ b/src/Common/MultiMediaSourceMuxer.cpp @@ -177,11 +177,8 @@ MultiMediaSourceMuxer::MultiMediaSourceMuxer(const MediaTuple& tuple, float dur_ _poller = EventPollerPool::Instance().getPoller(); _create_in_poller = _poller->isCurrentThread(); _option = option; - if (dur_sec > 0.01) { - // 点播 - _stamp[TrackVideo].setPlayBack(); - _stamp[TrackAudio].setPlayBack(); - } + _dur_sec = dur_sec; + setMaxTrackCount(option.max_track); if (option.enable_rtmp) { _rtmp = std::make_shared(_tuple, option, std::make_shared(dur_sec)); @@ -464,6 +461,12 @@ std::shared_ptr MultiMediaSourceMuxer::getMuxer(MediaSour } bool MultiMediaSourceMuxer::onTrackReady(const Track::Ptr &track) { + auto &stamp = _stamps[track->getIndex()]; + if (_dur_sec > 0.01) { + // 点播 + stamp.setPlayBack(); + } + bool ret = false; if (_rtmp) { ret = _rtmp->addTrack(track) ? true : ret; @@ -536,10 +539,14 @@ void MultiMediaSourceMuxer::onAllTrackReady() { createGopCacheIfNeed(); } #endif - auto tracks = getTracks(false); - if (tracks.size() >= 2) { - // 音频时间戳同步于视频,因为音频时间戳被修改后不影响播放 - _stamp[TrackAudio].syncTo(_stamp[TrackVideo]); + + Stamp *first = nullptr; + for (auto &pr : _stamps) { + if (!first) { + first = &pr.second; + } else { + pr.second.syncTo(*first); + } } InfoL << "stream: " << shortUrl() << " , codec info: " << getTrackInfoStr(this); } @@ -589,7 +596,7 @@ void MultiMediaSourceMuxer::resetTracks() { bool MultiMediaSourceMuxer::onTrackFrame(const Frame::Ptr &frame) { if (_option.modify_stamp != ProtocolOption::kModifyStampOff) { // 时间戳不采用原始的绝对时间戳 - const_cast(frame) = std::make_shared(frame, _stamp[frame->getTrackType()], _option.modify_stamp); + const_cast(frame) = std::make_shared(frame, _stamps[frame->getIndex()], _option.modify_stamp); } return _paced_sender ? _paced_sender->inputFrame(frame) : onTrackFrame_l(frame); } diff --git a/src/Common/MultiMediaSourceMuxer.h b/src/Common/MultiMediaSourceMuxer.h index 5a6adf12..9ca34370 100644 --- a/src/Common/MultiMediaSourceMuxer.h +++ b/src/Common/MultiMediaSourceMuxer.h @@ -162,11 +162,12 @@ private: bool _is_enable = false; bool _create_in_poller = false; bool _video_key_pos = false; + float _dur_sec; std::shared_ptr _paced_sender; MediaTuple _tuple; ProtocolOption _option; toolkit::Ticker _last_check; - Stamp _stamp[2]; + std::unordered_map _stamps; std::weak_ptr _track_listener; std::unordered_multimap _rtp_sender; FMP4MediaSourceMuxer::Ptr _fmp4; diff --git a/src/Extension/AAC.cpp b/src/Extension/AAC.cpp index ee4ad947..148504ac 100644 --- a/src/Extension/AAC.cpp +++ b/src/Extension/AAC.cpp @@ -275,6 +275,7 @@ static Frame::Ptr addADTSHeader(const Frame::Ptr &frame_in, const std::string &a frame->_dts = frame_in->dts(); frame->_buffer.assign(adts_header, size); frame->_buffer.append(frame_in->data(), frame_in->size()); + frame->setIndex(frame_in->getIndex()); return frame; } diff --git a/src/Extension/Frame.cpp b/src/Extension/Frame.cpp index 5b01d1ca..27973f9f 100644 --- a/src/Extension/Frame.cpp +++ b/src/Extension/Frame.cpp @@ -42,6 +42,7 @@ Frame::Ptr Frame::getCacheAbleFrame(const Frame::Ptr &frame){ FrameStamp::FrameStamp(Frame::Ptr frame, Stamp &stamp, int modify_stamp) { + setIndex(frame->getIndex()); _frame = std::move(frame); // kModifyStampSystem时采用系统时间戳,kModifyStampRelative采用相对时间戳 stamp.revise(_frame->dts(), _frame->pts(), _dts, _pts, modify_stamp == ProtocolOption::kModifyStampSystem); diff --git a/src/Extension/Frame.h b/src/Extension/Frame.h index 1bdf526c..da55e8fe 100644 --- a/src/Extension/Frame.h +++ b/src/Extension/Frame.h @@ -123,7 +123,24 @@ public: * 获取音视频类型 */ TrackType getTrackType() const; + + /** + * 获取音视频类型描述 + */ std::string getTrackTypeStr() const; + + /** + * 设置track index, 用于支持多track + */ + void setIndex(int index) { _index = index; } + + /** + * 获取track index, 用于支持多track + */ + int getIndex() const { return _index < 0 ? (int)getTrackType() : _index; } + +private: + int _index = -1; }; /** @@ -302,6 +319,7 @@ public: FrameInternalBase(Frame::Ptr parent_frame, char *ptr, size_t size, uint64_t dts, uint64_t pts = 0, size_t prefix_size = 0) : Parent(parent_frame->getCodecId(), ptr, size, dts, pts, prefix_size) { _parent_frame = std::move(parent_frame); + this->setIndex(_parent_frame->getIndex()); } bool cacheAble() const override { return _parent_frame->cacheAble(); } @@ -353,6 +371,7 @@ public: using Ptr = std::shared_ptr; FrameCacheAble(const Frame::Ptr &frame, bool force_key_frame = false, toolkit::Buffer::Ptr buf = nullptr) { + setIndex(frame->getIndex()); if (frame->cacheAble()) { _ptr = frame->data(); _buffer = frame; diff --git a/src/Extension/H264.cpp b/src/Extension/H264.cpp index 29931499..c147011e 100644 --- a/src/Extension/H264.cpp +++ b/src/Extension/H264.cpp @@ -285,6 +285,7 @@ void H264Track::insertConfigFrame(const Frame::Ptr &frame) { spsFrame->_buffer.assign("\x00\x00\x00\x01", 4); spsFrame->_buffer.append(_sps); spsFrame->_dts = frame->dts(); + spsFrame->setIndex(frame->getIndex()); VideoTrack::inputFrame(spsFrame); } @@ -294,6 +295,7 @@ void H264Track::insertConfigFrame(const Frame::Ptr &frame) { ppsFrame->_buffer.assign("\x00\x00\x00\x01", 4); ppsFrame->_buffer.append(_pps); ppsFrame->_dts = frame->dts(); + ppsFrame->setIndex(frame->getIndex()); VideoTrack::inputFrame(ppsFrame); } } diff --git a/src/Extension/H265.cpp b/src/Extension/H265.cpp index fb62e508..e8daf399 100644 --- a/src/Extension/H265.cpp +++ b/src/Extension/H265.cpp @@ -195,6 +195,7 @@ void H265Track::insertConfigFrame(const Frame::Ptr &frame) { vpsFrame->_buffer.assign("\x00\x00\x00\x01", 4); vpsFrame->_buffer.append(_vps); vpsFrame->_dts = frame->dts(); + vpsFrame->setIndex(frame->getIndex()); VideoTrack::inputFrame(vpsFrame); } if (!_sps.empty()) { @@ -203,6 +204,7 @@ void H265Track::insertConfigFrame(const Frame::Ptr &frame) { spsFrame->_buffer.assign("\x00\x00\x00\x01", 4); spsFrame->_buffer.append(_sps); spsFrame->_dts = frame->dts(); + spsFrame->setIndex(frame->getIndex()); VideoTrack::inputFrame(spsFrame); } @@ -212,6 +214,7 @@ void H265Track::insertConfigFrame(const Frame::Ptr &frame) { ppsFrame->_buffer.assign("\x00\x00\x00\x01", 4); ppsFrame->_buffer.append(_pps); ppsFrame->_dts = frame->dts(); + ppsFrame->setIndex(frame->getIndex()); VideoTrack::inputFrame(ppsFrame); } } diff --git a/src/Extension/Track.h b/src/Extension/Track.h index ec6d5cae..8264b3a8 100644 --- a/src/Extension/Track.h +++ b/src/Extension/Track.h @@ -34,7 +34,10 @@ public: * 复制拷贝,只能拷贝派生类的信息, * 环形缓存和代理关系不能拷贝,否则会关系紊乱 */ - Track(const Track &that) { _bit_rate = that._bit_rate; } + Track(const Track &that) { + _bit_rate = that._bit_rate; + setIndex(that.getIndex()); + } /** * 是否准备好,准备好才能获取譬如sps pps等信息 diff --git a/src/Player/PlayerProxy.cpp b/src/Player/PlayerProxy.cpp index a0a57ffe..0f31e33b 100644 --- a/src/Player/PlayerProxy.cpp +++ b/src/Player/PlayerProxy.cpp @@ -96,7 +96,16 @@ void PlayerProxy::setTranslationInfo() } } +static int getMaxTrackSize(const std::string &url) { + if (url.find(".m3u8") != std::string::npos || url.find(".ts") != std::string::npos) { + // hls和ts协议才开放多track支持 + return 16; + } + return 2; +} + void PlayerProxy::play(const string &strUrlTmp) { + _option.max_track = getMaxTrackSize(strUrlTmp); weak_ptr weakSelf = shared_from_this(); std::shared_ptr piFailedCnt(new int(0)); // 连续播放失败次数 setOnPlayResult([weakSelf, strUrlTmp, piFailedCnt](const SockException &err) { diff --git a/src/Record/MP4Demuxer.cpp b/src/Record/MP4Demuxer.cpp index 652ac8c4..4d6800d2 100644 --- a/src/Record/MP4Demuxer.cpp +++ b/src/Record/MP4Demuxer.cpp @@ -61,7 +61,8 @@ void MP4Demuxer::onVideoTrack(uint32_t track, uint8_t object, int width, int hei if (!video) { return; } - _track_to_codec.emplace(track, video); + video->setIndex(track); + _tracks.emplace(track, video); if (extra && bytes) { video->setExtraData((uint8_t *)extra, bytes); } @@ -72,7 +73,8 @@ void MP4Demuxer::onAudioTrack(uint32_t track, uint8_t object, int channel_count, if (!audio) { return; } - _track_to_codec.emplace(track, audio); + audio->setIndex(track); + _tracks.emplace(track, audio); if (extra && bytes) { audio->setExtraData((uint8_t *)extra, bytes); } @@ -134,8 +136,8 @@ Frame::Ptr MP4Demuxer::readFrame(bool &keyFrame, bool &eof) { } Frame::Ptr MP4Demuxer::makeFrame(uint32_t track_id, const Buffer::Ptr &buf, int64_t pts, int64_t dts) { - auto it = _track_to_codec.find(track_id); - if (it == _track_to_codec.end()) { + auto it = _tracks.find(track_id); + if (it == _tracks.end()) { return nullptr; } Frame::Ptr ret; @@ -166,15 +168,16 @@ Frame::Ptr MP4Demuxer::makeFrame(uint32_t track_id, const Buffer::Ptr &buf, int6 } } if (ret) { + ret->setIndex(track_id); it->second->inputFrame(ret); } return ret; } -vector MP4Demuxer::getTracks(bool trackReady) const { +vector MP4Demuxer::getTracks(bool ready) const { vector ret; - for (auto &pr : _track_to_codec) { - if (trackReady && !pr.second->ready()) { + for (auto &pr : _tracks) { + if (ready && !pr.second->ready()) { continue; } ret.push_back(pr.second); diff --git a/src/Record/MP4Demuxer.h b/src/Record/MP4Demuxer.h index 166a1770..889f91fc 100644 --- a/src/Record/MP4Demuxer.h +++ b/src/Record/MP4Demuxer.h @@ -71,7 +71,7 @@ private: MP4FileDisk::Ptr _mp4_file; MP4FileDisk::Reader _mov_reader; uint64_t _duration_ms = 0; - std::map _track_to_codec; + std::unordered_map _tracks; toolkit::ResourcePool _buffer_pool; }; diff --git a/src/Record/MP4Muxer.cpp b/src/Record/MP4Muxer.cpp index c7490d99..37c6cfd7 100644 --- a/src/Record/MP4Muxer.cpp +++ b/src/Record/MP4Muxer.cpp @@ -60,7 +60,7 @@ bool MP4MuxerInterface::haveVideo() const { uint64_t MP4MuxerInterface::getDuration() const { uint64_t ret = 0; - for (auto &pr : _codec_to_trackid) { + for (auto &pr : _tracks) { if (pr.second.stamp.getRelativeStamp() > (int64_t)ret) { ret = pr.second.stamp.getRelativeStamp(); } @@ -72,61 +72,50 @@ void MP4MuxerInterface::resetTracks() { _started = false; _have_video = false; _mov_writter = nullptr; - _frame_merger.clear(); - _codec_to_trackid.clear(); + _tracks.clear(); } void MP4MuxerInterface::flush() { - _frame_merger.flush(); + for (auto &pr : _tracks) { + pr.second.merger.flush(); + } } bool MP4MuxerInterface::inputFrame(const Frame::Ptr &frame) { - auto it = _codec_to_trackid.find(frame->getCodecId()); - if (it == _codec_to_trackid.end()) { - //该Track不存在或初始化失败 + auto it = _tracks.find(frame->getIndex()); + if (it == _tracks.end()) { + // 该Track不存在或初始化失败 return false; } if (!_started) { - //该逻辑确保含有视频时,第一帧为关键帧 + // 该逻辑确保含有视频时,第一帧为关键帧 if (_have_video && !frame->keyFrame()) { - //含有视频,但是不是关键帧,那么前面的帧丢弃 + // 含有视频,但是不是关键帧,那么前面的帧丢弃 return false; } - //开始写文件 + // 开始写文件 _started = true; } - //mp4文件时间戳需要从0开始 - auto &track_info = it->second; + // mp4文件时间戳需要从0开始 + auto &track = it->second; switch (frame->getCodecId()) { case CodecH264: case CodecH265: { - //这里的代码逻辑是让SPS、PPS、IDR这些时间戳相同的帧打包到一起当做一个帧处理, - _frame_merger.inputFrame(frame, [this, &track_info](uint64_t dts, uint64_t pts, const Buffer::Ptr &buffer, bool have_idr) { + // 这里的代码逻辑是让SPS、PPS、IDR这些时间戳相同的帧打包到一起当做一个帧处理, + track.merger.inputFrame(frame, [&](uint64_t dts, uint64_t pts, const Buffer::Ptr &buffer, bool have_idr) { int64_t dts_out, pts_out; - track_info.stamp.revise(dts, pts, dts_out, pts_out); - mp4_writer_write(_mov_writter.get(), - track_info.track_id, - buffer->data(), - buffer->size(), - pts_out, - dts_out, - have_idr ? MOV_AV_FLAG_KEYFREAME : 0); + track.stamp.revise(dts, pts, dts_out, pts_out); + mp4_writer_write(_mov_writter.get(), track.track_id, buffer->data(), buffer->size(), pts_out, dts_out, have_idr ? MOV_AV_FLAG_KEYFREAME : 0); }); break; } default: { int64_t dts_out, pts_out; - track_info.stamp.revise(frame->dts(), frame->pts(), dts_out, pts_out); - mp4_writer_write(_mov_writter.get(), - track_info.track_id, - frame->data() + frame->prefixSize(), - frame->size() - frame->prefixSize(), - pts_out, - dts_out, - frame->keyFrame() ? MOV_AV_FLAG_KEYFREAME : 0); + track.stamp.revise(frame->dts(), frame->pts(), dts_out, pts_out); + mp4_writer_write(_mov_writter.get(), track.track_id, frame->data() + frame->prefixSize(), frame->size() - frame->prefixSize(), pts_out, dts_out, frame->keyFrame() ? MOV_AV_FLAG_KEYFREAME : 0); break; } } @@ -134,23 +123,14 @@ bool MP4MuxerInterface::inputFrame(const Frame::Ptr &frame) { } void MP4MuxerInterface::stampSync() { - if (_codec_to_trackid.size() < 2) { - return; - } - - Stamp *audio = nullptr, *video = nullptr; - for(auto &pr : _codec_to_trackid){ - switch (getTrackType((CodecId) pr.first)){ - case TrackAudio : audio = &pr.second.stamp; break; - case TrackVideo : video = &pr.second.stamp; break; - default : break; + Stamp *first = nullptr; + for (auto &pr : _tracks) { + if (!first) { + first = &pr.second.stamp; + } else { + pr.second.stamp.syncTo(*first); } } - - if (audio && video) { - //音频时间戳同步于视频,因为音频时间戳被修改后不影响播放 - audio->syncTo(*video); - } } bool MP4MuxerInterface::addTrack(const Track::Ptr &track) { @@ -164,7 +144,7 @@ bool MP4MuxerInterface::addTrack(const Track::Ptr &track) { } if (!track->ready()) { - WarnL << "Track[" << track->getCodecName() << "] unready"; + WarnL << "Track " << track->getCodecName() << " unready"; return false; } @@ -175,36 +155,26 @@ bool MP4MuxerInterface::addTrack(const Track::Ptr &track) { auto extra_size = extra ? extra->size() : 0; if (track->getTrackType() == TrackVideo) { auto video_track = dynamic_pointer_cast(track); - if (!video_track) { - WarnL << "不是VideoTrack"; - return false; - } - + CHECK(video_track); auto track_id = mp4_writer_add_video(_mov_writter.get(), mp4_object, video_track->getVideoWidth(), video_track->getVideoHeight(), extra_data, extra_size); if (track_id < 0) { - WarnL << "添加Video Track失败:" << video_track->getCodecName(); + WarnL << "mp4_writer_add_video failed: " << video_track->getCodecName(); return false; } - _codec_to_trackid[track->getCodecId()].track_id = track_id; + _tracks[track->getIndex()].track_id = track_id; _have_video = true; } else if (track->getTrackType() == TrackAudio) { auto audio_track = dynamic_pointer_cast(track); - if (!audio_track) { - WarnL << "不是音频Track:" << track->getCodecName(); - return false; - } - - auto track_id = mp4_writer_add_audio(_mov_writter.get(), mp4_object, audio_track->getAudioChannel(), - audio_track->getAudioSampleBit() * audio_track->getAudioChannel(), - audio_track->getAudioSampleRate(), extra_data, extra_size); + CHECK(audio_track); + auto track_id = mp4_writer_add_audio(_mov_writter.get(), mp4_object, audio_track->getAudioChannel(), audio_track->getAudioSampleBit() * audio_track->getAudioChannel(), audio_track->getAudioSampleRate(), extra_data, extra_size); if (track_id < 0) { - WarnL << "添加Track[" << track->getCodecName() << "]失败:" << track_id; + WarnL << "mp4_writer_add_audio failed: " << audio_track->getCodecName(); return false; } - _codec_to_trackid[track->getCodecId()].track_id = track_id; + _tracks[track->getIndex()].track_id = track_id; } - //尝试音视频同步 + // 尝试音视频同步 stampSync(); return true; } @@ -236,7 +206,7 @@ void MP4MuxerMemory::resetTracks() { bool MP4MuxerMemory::inputFrame(const Frame::Ptr &frame) { if (_init_segment.empty()) { - //尚未生成init segment + // 尚未生成init segment return false; } @@ -259,5 +229,5 @@ bool MP4MuxerMemory::inputFrame(const Frame::Ptr &frame) { return MP4MuxerInterface::inputFrame(frame); } -}//namespace mediakit -#endif //defined(ENABLE_MP4) +} // namespace mediakit +#endif // defined(ENABLE_MP4) diff --git a/src/Record/MP4Muxer.h b/src/Record/MP4Muxer.h index 1591699d..4f764c3f 100644 --- a/src/Record/MP4Muxer.h +++ b/src/Record/MP4Muxer.h @@ -72,12 +72,18 @@ private: bool _started = false; bool _have_video = false; MP4FileIO::Writer _mov_writter; - struct track_info { + + class FrameMergerImp : public FrameMerger { + public: + FrameMergerImp() : FrameMerger(FrameMerger::mp4_nal_size) {} + }; + + struct MP4Track { int track_id = -1; Stamp stamp; + FrameMergerImp merger; }; - std::unordered_map _codec_to_trackid; - FrameMerger _frame_merger { FrameMerger::mp4_nal_size }; + std::unordered_map _tracks; }; class MP4Muxer : public MP4MuxerInterface{ diff --git a/src/Record/MP4Reader.cpp b/src/Record/MP4Reader.cpp index aa5fe1a6..e70f9fe8 100644 --- a/src/Record/MP4Reader.cpp +++ b/src/Record/MP4Reader.cpp @@ -27,7 +27,8 @@ MP4Reader::MP4Reader(const std::string &vhost, const std::string &app, const std option.enable_mp4 = false; option.enable_hls = false; option.enable_hls_fmp4 = false; - + // mp4支持多track + option.max_track = 16; setup(vhost, app, stream_id, file_path, option, std::move(poller)); } diff --git a/src/Record/MPEG.cpp b/src/Record/MPEG.cpp index 82ec8191..ef844277 100644 --- a/src/Record/MPEG.cpp +++ b/src/Record/MPEG.cpp @@ -18,7 +18,7 @@ using namespace toolkit; -namespace mediakit{ +namespace mediakit { MpegMuxer::MpegMuxer(bool is_ps) { _is_ps = is_ps; @@ -40,27 +40,27 @@ bool MpegMuxer::addTrack(const Track::Ptr &track) { if (track->getTrackType() == TrackVideo) { _have_video = true; } - _codec_to_trackid[track->getCodecId()] = mpeg_muxer_add_stream((::mpeg_muxer_t *)_context, mpeg_id, nullptr, 0); + _tracks[track->getIndex()].track_id = mpeg_muxer_add_stream((::mpeg_muxer_t *)_context, mpeg_id, nullptr, 0); return true; } bool MpegMuxer::inputFrame(const Frame::Ptr &frame) { - auto it = _codec_to_trackid.find(frame->getCodecId()); - if (it == _codec_to_trackid.end()) { + auto it = _tracks.find(frame->getIndex()); + if (it == _tracks.end()) { return false; } - auto track_id = it->second; + auto &track = it->second; _key_pos = !_have_video; switch (frame->getCodecId()) { case CodecH264: case CodecH265: { - //这里的代码逻辑是让SPS、PPS、IDR这些时间戳相同的帧打包到一起当做一个帧处理, - return _frame_merger.inputFrame(frame,[this, track_id](uint64_t dts, uint64_t pts, const Buffer::Ptr &buffer, bool have_idr) { + // 这里的代码逻辑是让SPS、PPS、IDR这些时间戳相同的帧打包到一起当做一个帧处理, + return track.merger.inputFrame(frame, [&](uint64_t dts, uint64_t pts, const Buffer::Ptr &buffer, bool have_idr) { _key_pos = have_idr; - //取视频时间戳为TS的时间戳 + // 取视频时间戳为TS的时间戳 _timestamp = dts; _max_cache_size = 512 + 1.2 * buffer->size(); - mpeg_muxer_input((::mpeg_muxer_t *)_context, track_id, have_idr ? 0x0001 : 0, pts * 90LL,dts * 90LL, buffer->data(), buffer->size()); + mpeg_muxer_input((::mpeg_muxer_t *)_context, track.track_id, have_idr ? 0x0001 : 0, pts * 90LL, dts * 90LL, buffer->data(), buffer->size()); flushCache(); }); } @@ -80,7 +80,7 @@ bool MpegMuxer::inputFrame(const Frame::Ptr &frame) { _timestamp = frame->dts(); } _max_cache_size = 512 + 1.2 * frame->size(); - mpeg_muxer_input((::mpeg_muxer_t *)_context, track_id, frame->keyFrame() ? 0x0001 : 0, frame->pts() * 90LL, frame->dts() * 90LL, frame->data(), frame->size()); + mpeg_muxer_input((::mpeg_muxer_t *)_context, track.track_id, frame->keyFrame() ? 0x0001 : 0, frame->pts() * 90LL, frame->dts() * 90LL, frame->data(), frame->size()); flushCache(); return true; } @@ -103,7 +103,6 @@ void MpegMuxer::createContext() { if (!thiz->_current_buffer || thiz->_current_buffer->size() + bytes > thiz->_current_buffer->getCapacity()) { if (thiz->_current_buffer) { - //WarnL << "need realloc mpeg buffer" << thiz->_current_buffer->size() + bytes << " > " << thiz->_current_buffer->getCapacity(); thiz->flushCache(); } thiz->_current_buffer = thiz->_buffer_pool.obtain2(); @@ -143,12 +142,13 @@ void MpegMuxer::releaseContext() { mpeg_muxer_destroy((::mpeg_muxer_t *)_context); _context = nullptr; } - _codec_to_trackid.clear(); - _frame_merger.clear(); + _tracks.clear(); } void MpegMuxer::flush() { - _frame_merger.flush(); + for (auto &pr : _tracks) { + pr.second.merger.flush(); + } } }//mediakit diff --git a/src/Record/MPEG.h b/src/Record/MPEG.h index b1507fc9..dc07af4a 100644 --- a/src/Record/MPEG.h +++ b/src/Record/MPEG.h @@ -70,8 +70,17 @@ private: uint32_t _max_cache_size = 0; uint64_t _timestamp = 0; struct mpeg_muxer_t *_context = nullptr; - std::unordered_map _codec_to_trackid; - FrameMerger _frame_merger{FrameMerger::h264_prefix}; + + class FrameMergerImp : public FrameMerger { + public: + FrameMergerImp() : FrameMerger(FrameMerger::h264_prefix) {} + }; + + struct MP4Track { + int track_id = -1; + FrameMergerImp merger; + }; + std::unordered_map _tracks; toolkit::BufferRaw::Ptr _current_buffer; toolkit::ResourcePool _buffer_pool; }; diff --git a/src/Rtmp/RtmpMuxer.cpp b/src/Rtmp/RtmpMuxer.cpp index 52f83811..5a5e8b2d 100644 --- a/src/Rtmp/RtmpMuxer.cpp +++ b/src/Rtmp/RtmpMuxer.cpp @@ -23,17 +23,22 @@ RtmpMuxer::RtmpMuxer(const TitleMeta::Ptr &title) { } bool RtmpMuxer::addTrack(const Track::Ptr &track) { - auto &encoder = _encoder[track->getTrackType()]; - if (encoder) { - WarnL << "Already add a track kind of: " << track->getTrackTypeStr() - << ", ignore track: " << track->getCodecName(); + if (_track_existed[track->getTrackType()]) { + // rtmp不支持多个同类型track + WarnL << "Already add a track kind of: " << track->getTrackTypeStr() << ", ignore track: " << track->getCodecName(); return false; } + + auto &encoder = _encoders[track->getIndex()]; + CHECK(!encoder); encoder = Factory::getRtmpEncoderByTrack(track); if (!encoder) { return false; } + // 标记已经存在该类型track + _track_existed[track->getTrackType()] = true; + // 设置rtmp输出环形缓存 encoder->setRtmpRing(_rtmp_ring); @@ -43,22 +48,22 @@ bool RtmpMuxer::addTrack(const Track::Ptr &track) { } bool RtmpMuxer::inputFrame(const Frame::Ptr &frame) { - auto &encoder = _encoder[frame->getTrackType()]; + auto &encoder = _encoders[frame->getIndex()]; return encoder ? encoder->inputFrame(frame) : false; } void RtmpMuxer::flush() { - for (auto &encoder : _encoder) { - if (encoder) { - encoder->flush(); + for (auto &pr : _encoders) { + if (pr.second) { + pr.second->flush(); } } } void RtmpMuxer::makeConfigPacket() { - for (auto &encoder : _encoder) { - if (encoder) { - encoder->makeConfigPacket(); + for (auto &pr : _encoders) { + if (pr.second) { + pr.second->makeConfigPacket(); } } } @@ -73,9 +78,8 @@ RtmpRing::RingType::Ptr RtmpMuxer::getRtmpRing() const { void RtmpMuxer::resetTracks() { _metadata.clear(); - for (auto &encoder : _encoder) { - encoder = nullptr; - } + _encoders.clear(); + CLEAR_ARR(_track_existed); } } /* namespace mediakit */ \ No newline at end of file diff --git a/src/Rtmp/RtmpMuxer.h b/src/Rtmp/RtmpMuxer.h index f64b875a..d029a632 100644 --- a/src/Rtmp/RtmpMuxer.h +++ b/src/Rtmp/RtmpMuxer.h @@ -64,10 +64,13 @@ public: * 生成config包 */ void makeConfigPacket(); + private: - RtmpRing::RingType::Ptr _rtmp_ring; + bool _track_existed[2] = { false, false }; + AMFValue _metadata; - RtmpCodec::Ptr _encoder[TrackMax]; + RtmpRing::RingType::Ptr _rtmp_ring; + std::unordered_map _encoders; }; diff --git a/src/Rtp/Decoder.cpp b/src/Rtp/Decoder.cpp index dc0ef749..65dc655c 100644 --- a/src/Rtp/Decoder.cpp +++ b/src/Rtp/Decoder.cpp @@ -62,7 +62,9 @@ DecoderImp::Ptr DecoderImp::createDecoder(Type type, MediaSinkInterface *sink){ } void DecoderImp::flush() { - _merger.flush(); + for (auto &pr : _tracks) { + pr.second.second.flush(); + } } ssize_t DecoderImp::input(const uint8_t *data, size_t bytes){ @@ -88,9 +90,9 @@ void DecoderImp::onStream(int stream, int codecid, const void *extra, size_t byt if (!track) { return; } - onTrack(std::move(track)); + onTrack(stream, std::move(track)); // 防止未获取视频track提前complete导致忽略后续视频的问题,用于兼容一些不太规范的ps流 - if (finish && _tracks[TrackVideo]) { + if (finish && _have_video) { _sink->addTrackCompleted(); InfoL << "Add track finished"; } @@ -104,37 +106,43 @@ void DecoderImp::onDecode(int stream, int codecid, int flags, int64_t pts, int64 if (codec == CodecInvalid) { return; } - if (!_tracks[getTrackType(codec)]) { - onTrack(Factory::getTrackByCodecId(codec, 8000, 1, 16)); + auto &ref = _tracks[stream]; + if (!ref.first) { + onTrack(stream, Factory::getTrackByCodecId(codec, 8000, 1, 16)); } - // TODO 支持多track - auto frame = Factory::getFrameFromPtr(codec, (char *) data, bytes, dts, pts); - if (getTrackType(codec) == TrackVideo) { - _merger.inputFrame(frame, [&](uint64_t dts, uint64_t pts, const Buffer::Ptr &buffer, bool) { - onFrame(Factory::getFrameFromBuffer(codec, buffer, dts, pts)); - }); - } else { - onFrame(frame); + auto frame = Factory::getFrameFromPtr(codec, (char *)data, bytes, dts, pts); + if (getTrackType(codec) != TrackVideo) { + onFrame(stream, frame); + return; } + ref.second.inputFrame(frame, [&](uint64_t dts, uint64_t pts, const Buffer::Ptr &buffer, bool) { + onFrame(stream, Factory::getFrameFromBuffer(codec, buffer, dts, pts)); + }); } #else void DecoderImp::onDecode(int stream,int codecid,int flags,int64_t pts,int64_t dts,const void *data,size_t bytes) {} void DecoderImp::onStream(int stream,int codecid,const void *extra,size_t bytes,int finish) {} #endif -void DecoderImp::onTrack(const Track::Ptr &track) { +void DecoderImp::onTrack(int index, const Track::Ptr &track) { if (!track) { return; } - if (!_tracks[track->getTrackType()]) { - _tracks[track->getTrackType()] = track; - _sink->addTrack(track); - InfoL << "got track: " << track->getCodecName(); + track->setIndex(index); + auto &ref = _tracks[index]; + if (ref.first) { + WarnL << "Already existed a same track: " << index << ", codec: " << track->getCodecName(); + return; } + ref.first = track; + _sink->addTrack(track); + InfoL << "Got track: " << track->getCodecName(); + _have_video = track->getTrackType() == TrackVideo ? true : _have_video; } -void DecoderImp::onFrame(const Frame::Ptr &frame) { +void DecoderImp::onFrame(int index, const Frame::Ptr &frame) { if (frame) { + frame->setIndex(index); _sink->inputFrame(frame); } } diff --git a/src/Rtp/Decoder.h b/src/Rtp/Decoder.h index 55012870..9da460c4 100644 --- a/src/Rtp/Decoder.h +++ b/src/Rtp/Decoder.h @@ -48,8 +48,8 @@ public: void flush(); protected: - void onTrack(const Track::Ptr &track); - void onFrame(const Frame::Ptr &frame); + void onTrack(int index, const Track::Ptr &track); + void onFrame(int index, const Frame::Ptr &frame); private: DecoderImp(const Decoder::Ptr &decoder, MediaSinkInterface *sink); @@ -57,10 +57,15 @@ private: void onStream(int stream, int codecid, const void *extra, size_t bytes, int finish); private: + bool _have_video = false; Decoder::Ptr _decoder; MediaSinkInterface *_sink; - FrameMerger _merger{FrameMerger::none}; - Track::Ptr _tracks[TrackMax]; + + class FrameMergerImp : public FrameMerger { + public: + FrameMergerImp() : FrameMerger(FrameMerger::none) {} + }; + std::unordered_map > _tracks; }; }//namespace mediakit diff --git a/src/Rtp/GB28181Process.cpp b/src/Rtp/GB28181Process.cpp index 5f1cce82..82f97c0f 100644 --- a/src/Rtp/GB28181Process.cpp +++ b/src/Rtp/GB28181Process.cpp @@ -78,7 +78,7 @@ bool GB28181Process::inputRtp(bool, const char *data, size_t data_len) { if (!ref) { if (_rtp_receiver.size() > 2) { // 防止pt类型太多导致内存溢出 - throw std::invalid_argument("rtp pt类型不得超过2种!"); + WarnL << "Rtp payload type more than 2 types: " << _rtp_receiver.size(); } switch (pt) { case Rtsp::PT_PCMA: @@ -87,6 +87,7 @@ bool GB28181Process::inputRtp(bool, const char *data, size_t data_len) { ref = std::make_shared(8000, [this](RtpPacket::Ptr rtp) { onRtpSorted(std::move(rtp)); }); auto track = Factory::getTrackByCodecId(pt == Rtsp::PT_PCMU ? CodecG711U : CodecG711A, 8000, 1, 16); CHECK(track); + track->setIndex(pt); _interface->addTrack(track); _rtp_decoder[pt] = Factory::getRtpDecoderByTrack(track); break; @@ -96,6 +97,7 @@ bool GB28181Process::inputRtp(bool, const char *data, size_t data_len) { ref = std::make_shared(90000, [this](RtpPacket::Ptr rtp) { onRtpSorted(std::move(rtp)); }); auto track = Factory::getTrackByCodecId(CodecJPEG); CHECK(track); + track->setIndex(pt); _interface->addTrack(track); _rtp_decoder[pt] = Factory::getRtpDecoderByTrack(track); break; @@ -106,23 +108,28 @@ bool GB28181Process::inputRtp(bool, const char *data, size_t data_len) { ref = std::make_shared(48000, [this](RtpPacket::Ptr rtp) { onRtpSorted(std::move(rtp)); }); auto track = Factory::getTrackByCodecId(CodecOpus); CHECK(track); + track->setIndex(pt); _interface->addTrack(track); _rtp_decoder[pt] = Factory::getRtpDecoderByTrack(track); } else if (pt == h265_pt) { // H265负载 ref = std::make_shared(90000, [this](RtpPacket::Ptr rtp) { onRtpSorted(std::move(rtp)); }); auto track = Factory::getTrackByCodecId(CodecH265); + CHECK(track); + track->setIndex(pt); _interface->addTrack(track); _rtp_decoder[pt] = Factory::getRtpDecoderByTrack(track); } else if (pt == h264_pt) { // H264负载 ref = std::make_shared(90000, [this](RtpPacket::Ptr rtp) { onRtpSorted(std::move(rtp)); }); auto track = Factory::getTrackByCodecId(CodecH264); + CHECK(track); + track->setIndex(pt); _interface->addTrack(track); _rtp_decoder[pt] = Factory::getRtpDecoderByTrack(track); } else { if (pt != Rtsp::PT_MP2T && pt != ps_pt) { - WarnL << "rtp payload type未识别(" << (int)pt << "),已按ts或ps负载处理"; + WarnL << "Unknown rtp payload type(" << (int)pt << "), decode it as mpeg-ps or mpeg-ts"; } ref = std::make_shared(90000, [this](RtpPacket::Ptr rtp) { onRtpSorted(std::move(rtp)); }); // ts或ps负载 @@ -142,7 +149,8 @@ bool GB28181Process::inputRtp(bool, const char *data, size_t data_len) { } } // 设置frame回调 - _rtp_decoder[pt]->addDelegate([this](const Frame::Ptr &frame) { + _rtp_decoder[pt]->addDelegate([this, pt](const Frame::Ptr &frame) { + frame->setIndex(pt); onRtpDecode(frame); return true; }); diff --git a/src/Rtp/PSEncoder.cpp b/src/Rtp/PSEncoder.cpp index 4e5c31d5..66e39bc7 100644 --- a/src/Rtp/PSEncoder.cpp +++ b/src/Rtp/PSEncoder.cpp @@ -22,7 +22,7 @@ namespace mediakit{ PSEncoderImp::PSEncoderImp(uint32_t ssrc, uint8_t payload_type) : MpegMuxer(true) { GET_CONFIG(uint32_t,video_mtu,Rtp::kVideoMtuSize); _rtp_encoder = std::make_shared(); - _rtp_encoder->setRtpInfo(ssrc, video_mtu, 90000, payload_type, 0); + _rtp_encoder->setRtpInfo(ssrc, video_mtu, 90000, payload_type); auto ring = std::make_shared(); ring->setDelegate(std::make_shared([this](RtpPacket::Ptr rtp, bool is_key) { onRTP(std::move(rtp), is_key); })); _rtp_encoder->setRtpRing(std::move(ring)); diff --git a/src/Rtp/RawEncoder.cpp b/src/Rtp/RawEncoder.cpp index b680eccb..4bc17af7 100644 --- a/src/Rtp/RawEncoder.cpp +++ b/src/Rtp/RawEncoder.cpp @@ -72,7 +72,7 @@ RtpCodec::Ptr RawEncoderImp::createRtpEncoder(const Track::Ptr &track) { sample_rate = std::static_pointer_cast(track)->getAudioSampleRate(); } auto ret = Factory::getRtpEncoderByCodecId(track->getCodecId(), _payload_type); - ret->setRtpInfo(_ssrc, mtu, sample_rate, _payload_type, 2 * track->getTrackType()); + ret->setRtpInfo(_ssrc, mtu, sample_rate, _payload_type); return ret; } diff --git a/src/Rtp/RtpProcess.cpp b/src/Rtp/RtpProcess.cpp index 8ac83f19..c0e16d64 100644 --- a/src/Rtp/RtpProcess.cpp +++ b/src/Rtp/RtpProcess.cpp @@ -254,8 +254,7 @@ void RtpProcess::emitOnPublish() { return; } if (err.empty()) { - strong_self->_muxer = std::make_shared(strong_self->_media_info, 0.0f, - option); + strong_self->_muxer = std::make_shared(strong_self->_media_info, 0.0f, option); if (strong_self->_only_audio) { strong_self->_muxer->setOnlyAudio(); } diff --git a/src/Rtsp/RtpCodec.cpp b/src/Rtsp/RtpCodec.cpp index 83db75fe..65ee311f 100644 --- a/src/Rtsp/RtpCodec.cpp +++ b/src/Rtsp/RtpCodec.cpp @@ -19,6 +19,7 @@ RtpPacket::Ptr RtpInfo::makeRtp(TrackType type, const void* data, size_t len, bo rtp->setSize(payload_len + RtpPacket::kRtpTcpHeaderSize); rtp->sample_rate = _sample_rate; rtp->type = type; + rtp->track_index = _track_index; //rtsp over tcp 头 auto ptr = (uint8_t *) rtp->data(); diff --git a/src/Rtsp/RtpCodec.h b/src/Rtsp/RtpCodec.h index 0d1c7a80..d63f1962 100644 --- a/src/Rtsp/RtpCodec.h +++ b/src/Rtsp/RtpCodec.h @@ -54,7 +54,7 @@ class RtpInfo { public: using Ptr = std::shared_ptr; - RtpInfo(uint32_t ssrc, size_t mtu_size, uint32_t sample_rate, uint8_t pt, uint8_t interleaved) { + RtpInfo(uint32_t ssrc, size_t mtu_size, uint32_t sample_rate, uint8_t pt, uint8_t interleaved, int track_index) { if (ssrc == 0) { ssrc = ((uint64_t) this) & 0xFFFFFFFF; } @@ -63,6 +63,7 @@ public: _mtu_size = mtu_size; _sample_rate = sample_rate; _interleaved = interleaved; + _track_index = track_index; } //返回rtp负载最大长度 @@ -78,6 +79,7 @@ private: uint16_t _seq = 0; uint32_t _ssrc; uint32_t _sample_rate; + int _track_index; size_t _mtu_size; }; @@ -85,8 +87,8 @@ class RtpCodec : public RtpRing, public FrameDispatcher { public: using Ptr = std::shared_ptr; - void setRtpInfo(uint32_t ssrc, size_t mtu_size, uint32_t sample_rate, uint8_t pt, uint8_t interleaved) { - _rtp_info.reset(new RtpInfo(ssrc, mtu_size, sample_rate, pt, interleaved)); + void setRtpInfo(uint32_t ssrc, size_t mtu_size, uint32_t sample_rate, uint8_t pt, uint8_t interleaved = 0, int track_index = 0) { + _rtp_info.reset(new RtpInfo(ssrc, mtu_size, sample_rate, pt, interleaved, track_index)); } RtpInfo &getRtpInfo() { return *_rtp_info; } diff --git a/src/Rtsp/Rtsp.h b/src/Rtsp/Rtsp.h index 8963c8ce..fde5caea 100644 --- a/src/Rtsp/Rtsp.h +++ b/src/Rtsp/Rtsp.h @@ -166,7 +166,7 @@ public: // ntp时间戳 uint64_t ntp_stamp; - bool disable_ntp = false; + int track_index; static Ptr create(); diff --git a/src/Rtsp/RtspMuxer.cpp b/src/Rtsp/RtspMuxer.cpp index f2967c04..dae5f111 100644 --- a/src/Rtsp/RtspMuxer.cpp +++ b/src/Rtsp/RtspMuxer.cpp @@ -19,18 +19,19 @@ namespace mediakit { void RtspMuxer::onRtp(RtpPacket::Ptr in, bool is_key) { if (_live) { - if (_rtp_stamp[in->type] != in->getHeader()->stamp) { + auto &ref = _tracks[in->track_index]; + if (ref.rtp_stamp != in->getHeader()->stamp) { // rtp时间戳变化才计算ntp,节省cpu资源 int64_t stamp_ms = in->getStamp() * uint64_t(1000) / in->sample_rate; int64_t stamp_ms_inc; // 求rtp时间戳增量 - _stamp[in->type].revise(stamp_ms, stamp_ms, stamp_ms_inc, stamp_ms_inc); - _rtp_stamp[in->type] = in->getHeader()->stamp; - _ntp_stamp[in->type] = stamp_ms_inc + _ntp_stamp_start; + ref.stamp.revise(stamp_ms, stamp_ms, stamp_ms_inc, stamp_ms_inc); + ref.rtp_stamp = in->getHeader()->stamp; + ref.ntp_stamp = stamp_ms_inc + _ntp_stamp_start; } // rtp拦截入口,此处统一赋值ntp - in->ntp_stamp = _ntp_stamp[in->type]; + in->ntp_stamp = ref.ntp_stamp; } else { // 点播情况下设置ntp时间戳为rtp时间戳加基准ntp时间戳 in->ntp_stamp = _ntp_stamp_start + (in->getStamp() * uint64_t(1000) / in->sample_rate); @@ -55,16 +56,20 @@ RtspMuxer::RtspMuxer(const TitleSdp::Ptr &title) { } bool RtspMuxer::addTrack(const Track::Ptr &track) { - auto &encoder = _encoder[track->getTrackType()]; - if (encoder) { - WarnL << "Already add a track kind of: " << track->getTrackTypeStr() - << ", ignore track: " << track->getCodecName(); + if (_track_existed[track->getTrackType()]) { + // rtsp不支持多个同类型track + WarnL << "Already add a track kind of: " << track->getTrackTypeStr() << ", ignore track: " << track->getCodecName(); return false; } + + auto &ref = _tracks[track->getIndex()]; + auto &encoder = ref.encoder; + CHECK(!encoder); + // payload type 96以后则为动态pt Sdp::Ptr sdp = track->getSdp(96 + _index); if (!sdp) { - WarnL << "rtsp复用器不支持该编码:" << track->getCodecName(); + WarnL << "Unsupported codec: " << track->getCodecName(); return false; } @@ -73,6 +78,9 @@ bool RtspMuxer::addTrack(const Track::Ptr &track) { return false; } + // 标记已经存在该类型track + _track_existed[track->getTrackType()] = true; + { static atomic s_ssrc(0); uint32_t ssrc = s_ssrc++; @@ -90,7 +98,7 @@ bool RtspMuxer::addTrack(const Track::Ptr &track) { GET_CONFIG(uint32_t, audio_mtu, Rtp::kAudioMtuSize); GET_CONFIG(uint32_t, video_mtu, Rtp::kVideoMtuSize); auto mtu = track->getTrackType() == TrackVideo ? video_mtu : audio_mtu; - encoder->setRtpInfo(ssrc, mtu, sdp->getSampleRate(), sdp->getPayloadType(), 2 * track->getTrackType()); + encoder->setRtpInfo(ssrc, mtu, sdp->getSampleRate(), sdp->getPayloadType(), 2 * track->getTrackType(), track->getIndex()); } // 设置rtp输出环形缓存 @@ -106,28 +114,33 @@ bool RtspMuxer::addTrack(const Track::Ptr &track) { trySyncTrack(); // rtp的时间戳是pts,允许回退 - _stamp[TrackVideo].enableRollback(true); - + if (track->getTrackType() == TrackVideo) { + ref.stamp.enableRollback(true); + } ++_index; return true; } void RtspMuxer::trySyncTrack() { - if (_encoder[TrackAudio] && _encoder[TrackVideo]) { - // 音频时间戳同步于视频,因为音频时间戳被修改后不影响播放 - _stamp[TrackAudio].syncTo(_stamp[TrackVideo]); + Stamp *first = nullptr; + for (auto &pr : _tracks) { + if (!first) { + first = &pr.second.stamp; + } else { + pr.second.stamp.syncTo(*first); + } } } bool RtspMuxer::inputFrame(const Frame::Ptr &frame) { - auto &encoder = _encoder[frame->getTrackType()]; + auto &encoder = _tracks[frame->getIndex()].encoder; return encoder ? encoder->inputFrame(frame) : false; } void RtspMuxer::flush() { - for (auto &encoder : _encoder) { - if (encoder) { - encoder->flush(); + for (auto &pr : _tracks) { + if (pr.second.encoder) { + pr.second.encoder->flush(); } } } @@ -142,9 +155,8 @@ RtpRing::RingType::Ptr RtspMuxer::getRtpRing() const { void RtspMuxer::resetTracks() { _sdp.clear(); - for (auto &encoder : _encoder) { - encoder = nullptr; - } + _tracks.clear(); + CLEAR_ARR(_track_existed); } } /* namespace mediakit */ \ No newline at end of file diff --git a/src/Rtsp/RtspMuxer.h b/src/Rtsp/RtspMuxer.h index ba528ecf..00f356e0 100644 --- a/src/Rtsp/RtspMuxer.h +++ b/src/Rtsp/RtspMuxer.h @@ -85,13 +85,20 @@ private: private: bool _live = true; + bool _track_existed[2] = { false, false }; + uint8_t _index {0}; - uint32_t _rtp_stamp[TrackMax]{0}; - uint64_t _ntp_stamp[TrackMax]{0}; uint64_t _ntp_stamp_start; std::string _sdp; - Stamp _stamp[TrackMax]; - RtpCodec::Ptr _encoder[TrackMax]; + + struct TrackInfo { + Stamp stamp; + uint32_t rtp_stamp { 0 }; + uint64_t ntp_stamp { 0 }; + RtpCodec::Ptr encoder; + }; + + std::unordered_map _tracks; RtpRing::RingType::Ptr _rtpRing; RtpRing::RingType::Ptr _rtpInterceptor; }; diff --git a/srt/SrtTransportImp.cpp b/srt/SrtTransportImp.cpp index ab67f52b..087bb4ae 100644 --- a/srt/SrtTransportImp.cpp +++ b/srt/SrtTransportImp.cpp @@ -287,18 +287,7 @@ std::string SrtTransportImp::getIdentifier() const { bool SrtTransportImp::inputFrame(const Frame::Ptr &frame) { if (_muxer) { - //TraceL<<"before type "<getCodecName()<<" dts "<dts()<<" pts "<pts(); - auto frame_tmp = std::make_shared(frame, _type_to_stamp[frame->getTrackType()],false); - if(_type_to_stamp.size()>1){ - // 有音视频,检查是否时间戳是否差距过大 - auto diff = _type_to_stamp[TrackType::TrackVideo].getRelativeStamp() - _type_to_stamp[TrackType::TrackAudio].getRelativeStamp(); - if(std::abs(diff) > 5000){ - // 超过5s,应该同步 TODO - WarnL << _media_info.full_url <<" video or audio not sync : "<getCodecName()<<" dts "<dts()<<" pts "<pts(); - return _muxer->inputFrame(frame_tmp); + return _muxer->inputFrame(frame); } if (_cached_func.size() > 200) { WarnL << "cached frame of track(" << frame->getCodecName() << ") is too much, now dropped"; @@ -306,17 +295,11 @@ bool SrtTransportImp::inputFrame(const Frame::Ptr &frame) { } auto frame_cached = Frame::getCacheAbleFrame(frame); lock_guard lck(_func_mtx); - _cached_func.emplace_back([this, frame_cached]() { - //TraceL<<"before type "<getCodecName()<<" dts "<dts()<<" pts "<pts(); - auto frame_tmp = std::make_shared(frame_cached, _type_to_stamp[frame_cached->getTrackType()],false); - //TraceL<<"after type "<getCodecName()<<" dts "<dts()<<" pts "<pts(); - _muxer->inputFrame(frame_tmp); - }); + _cached_func.emplace_back([this, frame_cached]() { _muxer->inputFrame(frame_cached); }); return true; } bool SrtTransportImp::addTrack(const Track::Ptr &track) { - _type_to_stamp.emplace(track->getTrackType(),Stamp()); if (_muxer) { return _muxer->addTrack(track); } @@ -333,9 +316,6 @@ void SrtTransportImp::addTrackCompleted() { lock_guard lck(_func_mtx); _cached_func.emplace_back([this]() { _muxer->addTrackCompleted(); }); } - if(_type_to_stamp.size() >1){ - _type_to_stamp[TrackType::TrackAudio].syncTo(_type_to_stamp[TrackType::TrackVideo]); - } } void SrtTransportImp::doCachedFunc() { diff --git a/srt/SrtTransportImp.hpp b/srt/SrtTransportImp.hpp index 71478605..9c1ac47e 100644 --- a/srt/SrtTransportImp.hpp +++ b/srt/SrtTransportImp.hpp @@ -86,8 +86,6 @@ private: DecoderImp::Ptr _decoder; std::recursive_mutex _func_mtx; std::deque> _cached_func; - - std::unordered_map _type_to_stamp; }; } // namespace SRT