rtmp协议支持更新metadata (#2669 #2692)

This commit is contained in:
夏楚 2023-07-22 17:31:23 +08:00 committed by GitHub
parent a97f1e503d
commit b44ca8fd6f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 97 additions and 90 deletions

View File

@ -107,14 +107,12 @@ void FlvMuxer::onWriteFlvHeader(const RtmpMediaSource::Ptr &src) {
//flv header //flv header
onWrite(buffer, false); onWrite(buffer, false);
auto &metadata = src->getMetaData(); // metadata
if (metadata) { src->getMetaData([&](const AMFValue &metadata) {
//在有metadata的情况下才发送metadata
//其实metadata没什么用有些推流器不产生metadata
AMFEncoder invoke; AMFEncoder invoke;
invoke << "onMetaData" << metadata; invoke << "onMetaData" << metadata;
onWriteFlvTag(MSG_DATA, std::make_shared<BufferString>(invoke.data()), 0, false); onWriteFlvTag(MSG_DATA, std::make_shared<BufferString>(invoke.data()), 0, false);
} });
//config frame //config frame
src->getConfigFrame([&](const RtmpPacket::Ptr &pkt) { src->getConfigFrame([&](const RtmpPacket::Ptr &pkt) {

View File

@ -73,42 +73,29 @@ public:
/** /**
* metadata * metadata
*/ */
const AMFValue &getMetaData() const { template <typename FUNC>
void getMetaData(const FUNC &func) const {
std::lock_guard<std::recursive_mutex> lock(_mtx); std::lock_guard<std::recursive_mutex> lock(_mtx);
return _metadata; if (_metadata) {
func(_metadata);
}
} }
/** /**
* config帧 * config帧
*/ */
template<typename FUNC> template <typename FUNC>
void getConfigFrame(const FUNC &f) { void getConfigFrame(const FUNC &func) {
std::lock_guard<std::recursive_mutex> lock(_mtx); std::lock_guard<std::recursive_mutex> lock(_mtx);
for (auto &pr : _config_frame_map) { for (auto &pr : _config_frame_map) {
f(pr.second); func(pr.second);
} }
} }
/** /**
* metadata * metadata
*/ */
virtual void setMetaData(const AMFValue &metadata) { virtual void setMetaData(const AMFValue &metadata);
_metadata = metadata;
_metadata.set("title", std::string("Streamed by ") + kServerName);
_have_video = _metadata["videocodecid"];
_have_audio = _metadata["audiocodecid"];
if (_ring) {
regist();
}
}
/**
* metadata
*/
void updateMetaData(const AMFValue &metadata) {
std::lock_guard<std::recursive_mutex> lock(_mtx);
_metadata = metadata;
}
/** /**
* rtmp包 * rtmp包

View File

@ -2,15 +2,15 @@
#include "RtmpMediaSourceImp.h" #include "RtmpMediaSourceImp.h"
namespace mediakit { namespace mediakit {
uint32_t RtmpMediaSource::getTimeStamp(TrackType trackType)
{ uint32_t RtmpMediaSource::getTimeStamp(TrackType trackType) {
assert(trackType >= TrackInvalid && trackType < TrackMax); assert(trackType >= TrackInvalid && trackType < TrackMax);
if (trackType != TrackInvalid) { if (trackType != TrackInvalid) {
//获取某track的时间戳 // 获取某track的时间戳
return _track_stamps[trackType]; return _track_stamps[trackType];
} }
//获取所有track的最小时间戳 // 获取所有track的最小时间戳
uint32_t ret = UINT32_MAX; uint32_t ret = UINT32_MAX;
for (auto &stamp : _track_stamps) { for (auto &stamp : _track_stamps) {
if (stamp > 0 && stamp < ret) { if (stamp > 0 && stamp < ret) {
@ -20,11 +20,34 @@ uint32_t RtmpMediaSource::getTimeStamp(TrackType trackType)
return ret; return ret;
} }
void RtmpMediaSource::onWrite(RtmpPacket::Ptr pkt, bool /*= true*/) void RtmpMediaSource::setMetaData(const AMFValue &metadata) {
{ {
std::lock_guard<std::recursive_mutex> lock(_mtx);
_metadata = metadata;
_metadata.set("title", std::string("Streamed by ") + kServerName);
}
_have_video = _metadata["videocodecid"];
_have_audio = _metadata["audiocodecid"];
if (_ring) {
regist();
AMFEncoder enc;
enc << "onMetaData" << _metadata;
RtmpPacket::Ptr packet = RtmpPacket::create();
packet->buffer = enc.data();
packet->type_id = MSG_DATA;
packet->time_stamp = 0;
packet->chunk_id = CHUNK_CLIENT_REQUEST_AFTER;
packet->stream_index = STREAM_MEDIA;
onWrite(std::move(packet));
}
}
void RtmpMediaSource::onWrite(RtmpPacket::Ptr pkt, bool /*= true*/) {
bool is_video = pkt->type_id == MSG_VIDEO; bool is_video = pkt->type_id == MSG_VIDEO;
_speed[is_video ? TrackVideo : TrackAudio] += pkt->size(); _speed[is_video ? TrackVideo : TrackAudio] += pkt->size();
//保存当前时间戳 // 保存当前时间戳
switch (pkt->type_id) { switch (pkt->type_id) {
case MSG_VIDEO: _track_stamps[TrackVideo] = pkt->time_stamp, _have_video = true; break; case MSG_VIDEO: _track_stamps[TrackVideo] = pkt->time_stamp, _have_video = true; break;
case MSG_AUDIO: _track_stamps[TrackAudio] = pkt->time_stamp, _have_audio = true; break; case MSG_AUDIO: _track_stamps[TrackAudio] = pkt->time_stamp, _have_audio = true; break;
@ -35,23 +58,23 @@ void RtmpMediaSource::onWrite(RtmpPacket::Ptr pkt, bool /*= true*/)
std::lock_guard<std::recursive_mutex> lock(_mtx); std::lock_guard<std::recursive_mutex> lock(_mtx);
_config_frame_map[pkt->type_id] = pkt; _config_frame_map[pkt->type_id] = pkt;
if (!_ring) { if (!_ring) {
//注册后收到config帧更新到各播放器 // 注册后收到config帧更新到各播放器
return; return;
} }
} }
if (!_ring) { if (!_ring) {
std::weak_ptr<RtmpMediaSource> weakSelf = std::static_pointer_cast<RtmpMediaSource>(shared_from_this()); std::weak_ptr<RtmpMediaSource> weak_self = std::static_pointer_cast<RtmpMediaSource>(shared_from_this());
auto lam = [weakSelf](int size) { auto lam = [weak_self](int size) {
auto strongSelf = weakSelf.lock(); auto strong_self = weak_self.lock();
if (!strongSelf) { if (!strong_self) {
return; return;
} }
strongSelf->onReaderChanged(size); strong_self->onReaderChanged(size);
}; };
//GOP默认缓冲512组RTMP包每组RTMP包时间戳相同(如果开启合并写了那么每组为合并写时间内的RTMP包), // GOP默认缓冲512组RTMP包每组RTMP包时间戳相同(如果开启合并写了那么每组为合并写时间内的RTMP包),
//每次遇到关键帧第一个RTMP包则会清空GOP缓存(因为有新的关键帧了,同样可以实现秒开) // 每次遇到关键帧第一个RTMP包则会清空GOP缓存(因为有新的关键帧了,同样可以实现秒开)
_ring = std::make_shared<RingType>(_ring_size, std::move(lam)); _ring = std::make_shared<RingType>(_ring_size, std::move(lam));
if (_metadata) { if (_metadata) {
regist(); regist();
@ -62,47 +85,42 @@ void RtmpMediaSource::onWrite(RtmpPacket::Ptr pkt, bool /*= true*/)
PacketCache<RtmpPacket>::inputPacket(stamp, is_video, std::move(pkt), key); PacketCache<RtmpPacket>::inputPacket(stamp, is_video, std::move(pkt), key);
} }
RtmpMediaSourceImp::RtmpMediaSourceImp(const MediaTuple &tuple, int ringSize)
RtmpMediaSourceImp::RtmpMediaSourceImp(const MediaTuple& tuple, int ringSize) : RtmpMediaSource(tuple, ringSize) : RtmpMediaSource(tuple, ringSize) {
{
_demuxer = std::make_shared<RtmpDemuxer>(); _demuxer = std::make_shared<RtmpDemuxer>();
_demuxer->setTrackListener(this); _demuxer->setTrackListener(this);
} }
void RtmpMediaSourceImp::setMetaData(const AMFValue &metadata) void RtmpMediaSourceImp::setMetaData(const AMFValue &metadata) {
{
if (!_demuxer->loadMetaData(metadata)) { if (!_demuxer->loadMetaData(metadata)) {
//该metadata无效需要重新生成 // 该metadata无效需要重新生成
_metadata = metadata; _metadata = metadata;
_recreate_metadata = true; _recreate_metadata = true;
} }
RtmpMediaSource::setMetaData(metadata); RtmpMediaSource::setMetaData(metadata);
} }
void RtmpMediaSourceImp::onWrite(RtmpPacket::Ptr pkt, bool /*= true*/) void RtmpMediaSourceImp::onWrite(RtmpPacket::Ptr pkt, bool /*= true*/) {
{
if (!_all_track_ready || _muxer->isEnabled()) { if (!_all_track_ready || _muxer->isEnabled()) {
//未获取到所有Track后或者开启转协议那么需要解复用rtmp // 未获取到所有Track后或者开启转协议那么需要解复用rtmp
_demuxer->inputRtmp(pkt); _demuxer->inputRtmp(pkt);
} }
RtmpMediaSource::onWrite(std::move(pkt)); RtmpMediaSource::onWrite(std::move(pkt));
} }
int RtmpMediaSourceImp::totalReaderCount() int RtmpMediaSourceImp::totalReaderCount() {
{
return readerCount() + (_muxer ? _muxer->totalReaderCount() : 0); return readerCount() + (_muxer ? _muxer->totalReaderCount() : 0);
} }
void RtmpMediaSourceImp::setProtocolOption(const ProtocolOption &option) void RtmpMediaSourceImp::setProtocolOption(const ProtocolOption &option) {
{ // 不重复生成rtmp
//不重复生成rtmp
_option = option; _option = option;
//不重复生成rtmp协议 // 不重复生成rtmp协议
_option.enable_rtmp = false; _option.enable_rtmp = false;
_muxer = std::make_shared<MultiMediaSourceMuxer>(_tuple, _demuxer->getDuration(), _option); _muxer = std::make_shared<MultiMediaSourceMuxer>(_tuple, _demuxer->getDuration(), _option);
_muxer->setMediaListener(getListener()); _muxer->setMediaListener(getListener());
_muxer->setTrackListener(std::static_pointer_cast<RtmpMediaSourceImp>(shared_from_this())); _muxer->setTrackListener(std::static_pointer_cast<RtmpMediaSourceImp>(shared_from_this()));
//让_muxer对象拦截一部分事件(比如说录像相关事件) // 让_muxer对象拦截一部分事件(比如说录像相关事件)
MediaSource::setListener(_muxer); MediaSource::setListener(_muxer);
for (auto &track : _demuxer->getTracks(false)) { for (auto &track : _demuxer->getTracks(false)) {
@ -111,8 +129,7 @@ void RtmpMediaSourceImp::setProtocolOption(const ProtocolOption &option)
} }
} }
bool RtmpMediaSourceImp::addTrack(const Track::Ptr &track) bool RtmpMediaSourceImp::addTrack(const Track::Ptr &track) {
{
if (_muxer) { if (_muxer) {
if (_muxer->addTrack(track)) { if (_muxer->addTrack(track)) {
track->addDelegate(_muxer); track->addDelegate(_muxer);
@ -122,45 +139,38 @@ bool RtmpMediaSourceImp::addTrack(const Track::Ptr &track)
return false; return false;
} }
void RtmpMediaSourceImp::addTrackCompleted() void RtmpMediaSourceImp::addTrackCompleted() {
{
if (_muxer) { if (_muxer) {
_muxer->addTrackCompleted(); _muxer->addTrackCompleted();
} }
} }
void RtmpMediaSourceImp::resetTracks() void RtmpMediaSourceImp::resetTracks() {
{
if (_muxer) { if (_muxer) {
_muxer->resetTracks(); _muxer->resetTracks();
} }
} }
void RtmpMediaSourceImp::onAllTrackReady() void RtmpMediaSourceImp::onAllTrackReady() {
{
_all_track_ready = true; _all_track_ready = true;
if (_recreate_metadata) { if (_recreate_metadata) {
//更新metadata // 更新metadata
for (auto &track : _muxer->getTracks()) { for (auto &track : _muxer->getTracks()) {
Metadata::addTrack(_metadata, track); Metadata::addTrack(_metadata, track);
} }
RtmpMediaSource::updateMetaData(_metadata); RtmpMediaSource::setMetaData(_metadata);
} }
} }
void RtmpMediaSourceImp::setListener(const std::weak_ptr<MediaSourceEvent> &listener) void RtmpMediaSourceImp::setListener(const std::weak_ptr<MediaSourceEvent> &listener) {
{
if (_muxer) { if (_muxer) {
//_muxer对象不能处理的事件再给listener处理 //_muxer对象不能处理的事件再给listener处理
_muxer->setMediaListener(listener); _muxer->setMediaListener(listener);
} } else {
else { // 未创建_muxer对象事件全部给listener处理
//未创建_muxer对象事件全部给listener处理
MediaSource::setListener(listener); MediaSource::setListener(listener);
} }
} }
} } // namespace mediakit

View File

@ -183,10 +183,14 @@ void RtmpPusher::send_metaData(){
throw std::runtime_error("the media source was released"); throw std::runtime_error("the media source was released");
} }
// metadata
src->getMetaData([&](const AMFValue &metadata) {
AMFEncoder enc; AMFEncoder enc;
enc << "@setDataFrame" << "onMetaData" << src->getMetaData(); enc << "@setDataFrame" << "onMetaData" << metadata;
sendRequest(MSG_DATA, enc.data()); sendRequest(MSG_DATA, enc.data());
});
// config frame
src->getConfigFrame([&](const RtmpPacket::Ptr &pkt) { src->getConfigFrame([&](const RtmpPacket::Ptr &pkt) {
sendRtmp(pkt->type_id, _stream_index, pkt, pkt->time_stamp, pkt->chunk_id); sendRtmp(pkt->type_id, _stream_index, pkt, pkt->time_stamp, pkt->chunk_id);
}); });
@ -207,7 +211,16 @@ void RtmpPusher::send_metaData(){
if (++i == size) { if (++i == size) {
strong_self->setSendFlushFlag(true); strong_self->setSendFlushFlag(true);
} }
if (rtmp->type_id == MSG_DATA) {
// update metadata
AMFEncoder enc;
enc << "@setDataFrame";
auto pkt = enc.data();
pkt.append(rtmp->data(), rtmp->size());
strong_self->sendRequest(MSG_DATA, pkt);
} else {
strong_self->sendRtmp(rtmp->type_id, strong_self->_stream_index, rtmp, rtmp->time_stamp, rtmp->chunk_id); strong_self->sendRtmp(rtmp->type_id, strong_self->_stream_index, rtmp, rtmp->time_stamp, rtmp->chunk_id);
}
}); });
}); });
_rtmp_reader->setDetachCB([weak_self]() { _rtmp_reader->setDetachCB([weak_self]() {

View File

@ -291,17 +291,14 @@ void RtmpSession::sendPlayResponse(const string &err, const RtmpMediaSource::Ptr
"description", "Now published." , "description", "Now published." ,
"details", _media_info.stream, "details", _media_info.stream,
"clientid", "0"}); "clientid", "0"});
// metadata
auto &metadata = src->getMetaData(); src->getMetaData([&](const AMFValue &metadata) {
if(metadata){
//在有metadata的情况下才发送metadata
//其实metadata没什么用有些推流器不产生metadata
// onMetaData
invoke.clear(); invoke.clear();
invoke << "onMetaData" << metadata; invoke << "onMetaData" << metadata;
sendResponse(MSG_DATA, invoke.data()); sendResponse(MSG_DATA, invoke.data());
} });
// config frame
src->getConfigFrame([&](const RtmpPacket::Ptr &pkt) { src->getConfigFrame([&](const RtmpPacket::Ptr &pkt) {
onSendMedia(pkt); onSendMedia(pkt);
}); });
@ -481,6 +478,7 @@ void RtmpSession::setMetaData(AMFDecoder &dec) {
throw std::runtime_error("can only set metadata"); throw std::runtime_error("can only set metadata");
} }
_push_metadata = dec.load<AMFValue>(); _push_metadata = dec.load<AMFValue>();
_set_meta_data = false;
} }
void RtmpSession::onProcessCmd(AMFDecoder &dec) { void RtmpSession::onProcessCmd(AMFDecoder &dec) {
@ -528,6 +526,7 @@ void RtmpSession::onRtmpChunk(RtmpPacket::Ptr packet) {
} else if (type == "onMetaData") { } else if (type == "onMetaData") {
//兼容某些不规范的推流器 //兼容某些不规范的推流器
_push_metadata = dec.load<AMFValue>(); _push_metadata = dec.load<AMFValue>();
_set_meta_data = false;
} else { } else {
TraceP(this) << "unknown notify:" << type; TraceP(this) << "unknown notify:" << type;
} }