From b44ca8fd6f6a16e61dd5513d01e607bc1b8e6411 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=A4=8F=E6=A5=9A?= <771730766@qq.com> Date: Sat, 22 Jul 2023 17:31:23 +0800 Subject: [PATCH] =?UTF-8?q?rtmp=E5=8D=8F=E8=AE=AE=E6=94=AF=E6=8C=81?= =?UTF-8?q?=E6=9B=B4=E6=96=B0metadata=20(#2669=20#2692)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Rtmp/FlvMuxer.cpp | 8 +-- src/Rtmp/RtmpMediaSource.h | 31 +++------ src/Rtmp/RtmpMediaSourceImp.cpp | 114 +++++++++++++++++--------------- src/Rtmp/RtmpPusher.cpp | 21 ++++-- src/Rtmp/RtmpSession.cpp | 13 ++-- 5 files changed, 97 insertions(+), 90 deletions(-) diff --git a/src/Rtmp/FlvMuxer.cpp b/src/Rtmp/FlvMuxer.cpp index a68df5bc..bee86ddb 100644 --- a/src/Rtmp/FlvMuxer.cpp +++ b/src/Rtmp/FlvMuxer.cpp @@ -107,14 +107,12 @@ void FlvMuxer::onWriteFlvHeader(const RtmpMediaSource::Ptr &src) { //flv header onWrite(buffer, false); - auto &metadata = src->getMetaData(); - if (metadata) { - //在有metadata的情况下才发送metadata - //其实metadata没什么用,有些推流器不产生metadata + // metadata + src->getMetaData([&](const AMFValue &metadata) { AMFEncoder invoke; invoke << "onMetaData" << metadata; onWriteFlvTag(MSG_DATA, std::make_shared(invoke.data()), 0, false); - } + }); //config frame src->getConfigFrame([&](const RtmpPacket::Ptr &pkt) { diff --git a/src/Rtmp/RtmpMediaSource.h b/src/Rtmp/RtmpMediaSource.h index 5c862a36..718f6055 100644 --- a/src/Rtmp/RtmpMediaSource.h +++ b/src/Rtmp/RtmpMediaSource.h @@ -73,42 +73,29 @@ public: /** * 获取metadata */ - const AMFValue &getMetaData() const { + template + void getMetaData(const FUNC &func) const { std::lock_guard lock(_mtx); - return _metadata; + if (_metadata) { + func(_metadata); + } } /** * 获取所有的config帧 */ - template - void getConfigFrame(const FUNC &f) { + template + void getConfigFrame(const FUNC &func) { std::lock_guard lock(_mtx); for (auto &pr : _config_frame_map) { - f(pr.second); + func(pr.second); } } /** * 设置metadata */ - virtual void setMetaData(const AMFValue &metadata) { - _metadata = metadata; - _metadata.set("title", std::string("Streamed by ") + kServerName); - _have_video = _metadata["videocodecid"]; - _have_audio = _metadata["audiocodecid"]; - if (_ring) { - regist(); - } - } - - /** - * 更新metadata - */ - void updateMetaData(const AMFValue &metadata) { - std::lock_guard lock(_mtx); - _metadata = metadata; - } + virtual void setMetaData(const AMFValue &metadata); /** * 输入rtmp包 diff --git a/src/Rtmp/RtmpMediaSourceImp.cpp b/src/Rtmp/RtmpMediaSourceImp.cpp index ce64226a..ee072b8e 100644 --- a/src/Rtmp/RtmpMediaSourceImp.cpp +++ b/src/Rtmp/RtmpMediaSourceImp.cpp @@ -2,15 +2,15 @@ #include "RtmpMediaSourceImp.h" namespace mediakit { -uint32_t RtmpMediaSource::getTimeStamp(TrackType trackType) -{ + +uint32_t RtmpMediaSource::getTimeStamp(TrackType trackType) { assert(trackType >= TrackInvalid && trackType < TrackMax); if (trackType != TrackInvalid) { - //获取某track的时间戳 + // 获取某track的时间戳 return _track_stamps[trackType]; } - //获取所有track的最小时间戳 + // 获取所有track的最小时间戳 uint32_t ret = UINT32_MAX; for (auto &stamp : _track_stamps) { if (stamp > 0 && stamp < ret) { @@ -20,38 +20,61 @@ uint32_t RtmpMediaSource::getTimeStamp(TrackType trackType) return ret; } -void RtmpMediaSource::onWrite(RtmpPacket::Ptr pkt, bool /*= true*/) -{ +void RtmpMediaSource::setMetaData(const AMFValue &metadata) { + { + std::lock_guard lock(_mtx); + _metadata = metadata; + _metadata.set("title", std::string("Streamed by ") + kServerName); + } + + _have_video = _metadata["videocodecid"]; + _have_audio = _metadata["audiocodecid"]; + if (_ring) { + regist(); + + AMFEncoder enc; + enc << "onMetaData" << _metadata; + RtmpPacket::Ptr packet = RtmpPacket::create(); + packet->buffer = enc.data(); + packet->type_id = MSG_DATA; + packet->time_stamp = 0; + packet->chunk_id = CHUNK_CLIENT_REQUEST_AFTER; + packet->stream_index = STREAM_MEDIA; + onWrite(std::move(packet)); + } +} + +void RtmpMediaSource::onWrite(RtmpPacket::Ptr pkt, bool /*= true*/) { bool is_video = pkt->type_id == MSG_VIDEO; _speed[is_video ? TrackVideo : TrackAudio] += pkt->size(); - //保存当前时间戳 + // 保存当前时间戳 switch (pkt->type_id) { - case MSG_VIDEO: _track_stamps[TrackVideo] = pkt->time_stamp, _have_video = true; break; - case MSG_AUDIO: _track_stamps[TrackAudio] = pkt->time_stamp, _have_audio = true; break; - default: break; + case MSG_VIDEO: _track_stamps[TrackVideo] = pkt->time_stamp, _have_video = true; break; + case MSG_AUDIO: _track_stamps[TrackAudio] = pkt->time_stamp, _have_audio = true; break; + default: break; } if (pkt->isCfgFrame()) { std::lock_guard lock(_mtx); _config_frame_map[pkt->type_id] = pkt; if (!_ring) { - //注册后收到config帧更新到各播放器 + // 注册后收到config帧更新到各播放器 return; } } if (!_ring) { - std::weak_ptr weakSelf = std::static_pointer_cast(shared_from_this()); - auto lam = [weakSelf](int size) { - auto strongSelf = weakSelf.lock(); - if (!strongSelf) { + std::weak_ptr weak_self = std::static_pointer_cast(shared_from_this()); + auto lam = [weak_self](int size) { + auto strong_self = weak_self.lock(); + if (!strong_self) { return; } - strongSelf->onReaderChanged(size); + strong_self->onReaderChanged(size); }; - //GOP默认缓冲512组RTMP包,每组RTMP包时间戳相同(如果开启合并写了,那么每组为合并写时间内的RTMP包), - //每次遇到关键帧第一个RTMP包,则会清空GOP缓存(因为有新的关键帧了,同样可以实现秒开) + // GOP默认缓冲512组RTMP包,每组RTMP包时间戳相同(如果开启合并写了,那么每组为合并写时间内的RTMP包), + // 每次遇到关键帧第一个RTMP包,则会清空GOP缓存(因为有新的关键帧了,同样可以实现秒开) _ring = std::make_shared(_ring_size, std::move(lam)); if (_metadata) { regist(); @@ -62,47 +85,42 @@ void RtmpMediaSource::onWrite(RtmpPacket::Ptr pkt, bool /*= true*/) PacketCache::inputPacket(stamp, is_video, std::move(pkt), key); } - -RtmpMediaSourceImp::RtmpMediaSourceImp(const MediaTuple& tuple, int ringSize) : RtmpMediaSource(tuple, ringSize) -{ +RtmpMediaSourceImp::RtmpMediaSourceImp(const MediaTuple &tuple, int ringSize) + : RtmpMediaSource(tuple, ringSize) { _demuxer = std::make_shared(); _demuxer->setTrackListener(this); } -void RtmpMediaSourceImp::setMetaData(const AMFValue &metadata) -{ +void RtmpMediaSourceImp::setMetaData(const AMFValue &metadata) { if (!_demuxer->loadMetaData(metadata)) { - //该metadata无效,需要重新生成 + // 该metadata无效,需要重新生成 _metadata = metadata; _recreate_metadata = true; } RtmpMediaSource::setMetaData(metadata); } -void RtmpMediaSourceImp::onWrite(RtmpPacket::Ptr pkt, bool /*= true*/) -{ +void RtmpMediaSourceImp::onWrite(RtmpPacket::Ptr pkt, bool /*= true*/) { if (!_all_track_ready || _muxer->isEnabled()) { - //未获取到所有Track后,或者开启转协议,那么需要解复用rtmp + // 未获取到所有Track后,或者开启转协议,那么需要解复用rtmp _demuxer->inputRtmp(pkt); } RtmpMediaSource::onWrite(std::move(pkt)); } -int RtmpMediaSourceImp::totalReaderCount() -{ +int RtmpMediaSourceImp::totalReaderCount() { return readerCount() + (_muxer ? _muxer->totalReaderCount() : 0); } -void RtmpMediaSourceImp::setProtocolOption(const ProtocolOption &option) -{ - //不重复生成rtmp +void RtmpMediaSourceImp::setProtocolOption(const ProtocolOption &option) { + // 不重复生成rtmp _option = option; - //不重复生成rtmp协议 + // 不重复生成rtmp协议 _option.enable_rtmp = false; _muxer = std::make_shared(_tuple, _demuxer->getDuration(), _option); _muxer->setMediaListener(getListener()); _muxer->setTrackListener(std::static_pointer_cast(shared_from_this())); - //让_muxer对象拦截一部分事件(比如说录像相关事件) + // 让_muxer对象拦截一部分事件(比如说录像相关事件) MediaSource::setListener(_muxer); for (auto &track : _demuxer->getTracks(false)) { @@ -111,8 +129,7 @@ void RtmpMediaSourceImp::setProtocolOption(const ProtocolOption &option) } } -bool RtmpMediaSourceImp::addTrack(const Track::Ptr &track) -{ +bool RtmpMediaSourceImp::addTrack(const Track::Ptr &track) { if (_muxer) { if (_muxer->addTrack(track)) { track->addDelegate(_muxer); @@ -122,45 +139,38 @@ bool RtmpMediaSourceImp::addTrack(const Track::Ptr &track) return false; } -void RtmpMediaSourceImp::addTrackCompleted() -{ +void RtmpMediaSourceImp::addTrackCompleted() { if (_muxer) { _muxer->addTrackCompleted(); } } -void RtmpMediaSourceImp::resetTracks() -{ +void RtmpMediaSourceImp::resetTracks() { if (_muxer) { _muxer->resetTracks(); } } -void RtmpMediaSourceImp::onAllTrackReady() -{ +void RtmpMediaSourceImp::onAllTrackReady() { _all_track_ready = true; if (_recreate_metadata) { - //更新metadata + // 更新metadata for (auto &track : _muxer->getTracks()) { Metadata::addTrack(_metadata, track); } - RtmpMediaSource::updateMetaData(_metadata); + RtmpMediaSource::setMetaData(_metadata); } } -void RtmpMediaSourceImp::setListener(const std::weak_ptr &listener) -{ +void RtmpMediaSourceImp::setListener(const std::weak_ptr &listener) { if (_muxer) { //_muxer对象不能处理的事件再给listener处理 _muxer->setMediaListener(listener); - } - else { - //未创建_muxer对象,事件全部给listener处理 + } else { + // 未创建_muxer对象,事件全部给listener处理 MediaSource::setListener(listener); } } -} - - +} // namespace mediakit diff --git a/src/Rtmp/RtmpPusher.cpp b/src/Rtmp/RtmpPusher.cpp index e8c55f70..1f1cfdaa 100644 --- a/src/Rtmp/RtmpPusher.cpp +++ b/src/Rtmp/RtmpPusher.cpp @@ -183,10 +183,14 @@ void RtmpPusher::send_metaData(){ throw std::runtime_error("the media source was released"); } - AMFEncoder enc; - enc << "@setDataFrame" << "onMetaData" << src->getMetaData(); - sendRequest(MSG_DATA, enc.data()); + // metadata + src->getMetaData([&](const AMFValue &metadata) { + AMFEncoder enc; + enc << "@setDataFrame" << "onMetaData" << metadata; + sendRequest(MSG_DATA, enc.data()); + }); + // config frame src->getConfigFrame([&](const RtmpPacket::Ptr &pkt) { sendRtmp(pkt->type_id, _stream_index, pkt, pkt->time_stamp, pkt->chunk_id); }); @@ -207,7 +211,16 @@ void RtmpPusher::send_metaData(){ if (++i == size) { strong_self->setSendFlushFlag(true); } - strong_self->sendRtmp(rtmp->type_id, strong_self->_stream_index, rtmp, rtmp->time_stamp, rtmp->chunk_id); + if (rtmp->type_id == MSG_DATA) { + // update metadata + AMFEncoder enc; + enc << "@setDataFrame"; + auto pkt = enc.data(); + pkt.append(rtmp->data(), rtmp->size()); + strong_self->sendRequest(MSG_DATA, pkt); + } else { + strong_self->sendRtmp(rtmp->type_id, strong_self->_stream_index, rtmp, rtmp->time_stamp, rtmp->chunk_id); + } }); }); _rtmp_reader->setDetachCB([weak_self]() { diff --git a/src/Rtmp/RtmpSession.cpp b/src/Rtmp/RtmpSession.cpp index 56a799cb..ebdfb74e 100644 --- a/src/Rtmp/RtmpSession.cpp +++ b/src/Rtmp/RtmpSession.cpp @@ -291,17 +291,14 @@ void RtmpSession::sendPlayResponse(const string &err, const RtmpMediaSource::Ptr "description", "Now published." , "details", _media_info.stream, "clientid", "0"}); - - auto &metadata = src->getMetaData(); - if(metadata){ - //在有metadata的情况下才发送metadata - //其实metadata没什么用,有些推流器不产生metadata - // onMetaData + // metadata + src->getMetaData([&](const AMFValue &metadata) { invoke.clear(); invoke << "onMetaData" << metadata; sendResponse(MSG_DATA, invoke.data()); - } + }); + // config frame src->getConfigFrame([&](const RtmpPacket::Ptr &pkt) { onSendMedia(pkt); }); @@ -481,6 +478,7 @@ void RtmpSession::setMetaData(AMFDecoder &dec) { throw std::runtime_error("can only set metadata"); } _push_metadata = dec.load(); + _set_meta_data = false; } void RtmpSession::onProcessCmd(AMFDecoder &dec) { @@ -528,6 +526,7 @@ void RtmpSession::onRtmpChunk(RtmpPacket::Ptr packet) { } else if (type == "onMetaData") { //兼容某些不规范的推流器 _push_metadata = dec.load(); + _set_meta_data = false; } else { TraceP(this) << "unknown notify:" << type; }