From 9fadc22c80f6113635f9fe15493f5e32d86f5409 Mon Sep 17 00:00:00 2001 From: xiongguangjie Date: Fri, 3 Jun 2022 16:47:53 +0800 Subject: [PATCH] srt can push but bandwith estimate has error --- srt/SrtSession.cpp | 4 +- srt/SrtTransport.cpp | 4 +- srt/SrtTransport.hpp | 4 +- srt/SrtTransportImp.cpp | 220 ++++++++++++++++++++++++++++++++++++++++ srt/SrtTransportImp.hpp | 73 +++++++++++++ 5 files changed, 299 insertions(+), 6 deletions(-) create mode 100644 srt/SrtTransportImp.cpp create mode 100644 srt/SrtTransportImp.hpp diff --git a/srt/SrtSession.cpp b/srt/SrtSession.cpp index a20246d4..ed7f4547 100644 --- a/srt/SrtSession.cpp +++ b/srt/SrtSession.cpp @@ -1,6 +1,6 @@ #include "SrtSession.hpp" #include "Packet.hpp" -#include "SrtTransport.hpp" +#include "SrtTransportImp.hpp" #include "Common/config.h" @@ -70,7 +70,7 @@ void SrtSession::onRecv(const Buffer::Ptr &buffer) { auto type = HandshakePacket::getHandshakeType(data, size); if (type == HandshakePacket::HS_TYPE_INDUCTION) { // 握手第一阶段 - _transport = std::make_shared(getPoller()); + _transport = std::make_shared(getPoller()); } else if (type == HandshakePacket::HS_TYPE_CONCLUSION) { // 握手第二阶段 diff --git a/srt/SrtTransport.cpp b/srt/SrtTransport.cpp index 4086ffe2..b86869b9 100644 --- a/srt/SrtTransport.cpp +++ b/srt/SrtTransport.cpp @@ -189,7 +189,7 @@ void SrtTransport::handleHandshakeConclusion(HandshakePacket &pkt, struct sockad sendControlPacket(res, true); TraceL<<" buf size = "<max_flow_window_size<<" init seq ="<<_init_seq_number<<" lantency="<recv_tsbpd_delay; _recv_buf = std::make_shared(res->max_flow_window_size,_init_seq_number, req->recv_tsbpd_delay*1e6); - onHandShakeFinished(_stream_id); + onHandShakeFinished(_stream_id,addr); } else { TraceL << getIdentifier() << " CONCLUSION handle repeate "; sendControlPacket(_handleshake_res, true); @@ -356,7 +356,7 @@ void SrtTransport::handleDataPacket(uint8_t *buf, int len, struct sockaddr_stora auto list = _recv_buf->tryGetPacket(); for(auto data : list){ - onSRTData(std::move(data)); + onSRTData(std::move(data),addr); } } diff --git a/srt/SrtTransport.hpp b/srt/SrtTransport.hpp index 5ecd24ca..c67212a2 100644 --- a/srt/SrtTransport.hpp +++ b/srt/SrtTransport.hpp @@ -44,8 +44,8 @@ public: void unregisterSelfHandshake(); void unregisterSelf(); protected: - virtual void onHandShakeFinished(std::string& streamid){}; - virtual void onSRTData(DataPacket::Ptr pkt){}; + virtual void onHandShakeFinished(std::string& streamid,struct sockaddr_storage *addr){}; + virtual void onSRTData(DataPacket::Ptr pkt,struct sockaddr_storage *addr){}; virtual void onShutdown(const SockException &ex); private: diff --git a/srt/SrtTransportImp.cpp b/srt/SrtTransportImp.cpp new file mode 100644 index 00000000..2367c881 --- /dev/null +++ b/srt/SrtTransportImp.cpp @@ -0,0 +1,220 @@ +#include +#include "Util/util.h" + +#include "SrtTransportImp.hpp" + +namespace SRT { +SrtTransportImp::SrtTransportImp(const EventPoller::Ptr &poller) + : SrtTransport(poller) {} + +SrtTransportImp::~SrtTransportImp() { + InfoP(this); +} + +void SrtTransportImp::onHandShakeFinished(std::string &streamid,struct sockaddr_storage *addr) { + + // TODO parse streamid like this zlmediakit.com/live/test?token=1213444&type=pusher + if(!_addr){ + _addr.reset(new sockaddr_storage(*((sockaddr_storage *)addr))); + } + + _media_info.parse("srt://"+streamid); + + auto params = Parser::parseArgs(_media_info._param_strs); + if(params["type"] == "push"){ + _is_pusher = true; + _decoder = DecoderImp::createDecoder(DecoderImp::decoder_ts, this); + emitOnPublish(); + }else{ + _is_pusher = false; + } +} +void SrtTransportImp::onSRTData(DataPacket::Ptr pkt,struct sockaddr_storage *addr) { + if(!_is_pusher){ + WarnP(this)<<"this is a player data ignore"; + return; + } + if(!_addr){ + _addr.reset(new sockaddr_storage(*((sockaddr_storage *)addr))); + } + if (_decoder) { + _decoder->input(reinterpret_cast(pkt->payloadData()), pkt->payloadSize()); + }else{ + WarnP(this)<<" not reach this"; + } +} +void SrtTransportImp::onShutdown(const SockException &ex) { + SrtTransport::onShutdown(ex); +} + +bool SrtTransportImp::close(mediakit::MediaSource &sender, bool force){ + if (!force && totalReaderCount(sender)) { + return false; + } + std::string err = StrPrinter << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" + << sender.getApp() << "/" << sender.getId() << " " << force; + weak_ptr weak_self = static_pointer_cast(shared_from_this()); + getPoller()->async([weak_self, err]() { + auto strong_self = weak_self.lock(); + if (strong_self) { + strong_self->onShutdown(SockException(Err_shutdown, err)); + //主动关闭推流,那么不延时注销 + strong_self->_muxer = nullptr; + } + }); + return true; +} +// 播放总人数 +int SrtTransportImp::totalReaderCount(mediakit::MediaSource &sender){ + return _muxer ? _muxer->totalReaderCount() : sender.readerCount(); +} +// 获取媒体源类型 +mediakit::MediaOriginType SrtTransportImp::getOriginType(mediakit::MediaSource &sender) const{ + return MediaOriginType::srt_push; +} +// 获取媒体源url或者文件路径 +std::string SrtTransportImp::getOriginUrl(mediakit::MediaSource &sender) const{ + return _media_info._full_url; +} +// 获取媒体源客户端相关信息 +std::shared_ptr SrtTransportImp::getOriginSock(mediakit::MediaSource &sender) const{ + return static_pointer_cast(getSession()); +} + +void SrtTransportImp::emitOnPublish() { + std::weak_ptr weak_self = static_pointer_cast(shared_from_this()); + Broadcast::PublishAuthInvoker invoker = [weak_self](const std::string &err, const ProtocolOption &option) { + auto strong_self = weak_self.lock(); + if (!strong_self) { + return; + } + if (err.empty()) { + strong_self->_muxer = std::make_shared(strong_self->_media_info._vhost, + strong_self->_media_info._app, + strong_self->_media_info._streamid, 0.0f, + option); + strong_self->_muxer->setMediaListener(strong_self); + strong_self->doCachedFunc(); + InfoP(strong_self) << "允许 srt 推流"; + } else { + WarnP(strong_self) << "禁止 srt 推流:" << err; + strong_self->onShutdown(SockException(Err_refused,err)); + } + }; + + //触发推流鉴权事件 + auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish, MediaOriginType::rtp_push, _media_info, invoker, static_cast(*this)); + if (!flag) { + //该事件无人监听,默认不鉴权 + invoker("", ProtocolOption()); + } +} + + +void SrtTransportImp::emitOnPlay(){ + std::weak_ptr weak_self = static_pointer_cast(shared_from_this()); + Broadcast::AuthInvoker invoker = [weak_self](const string &err){ + auto strong_self = weak_self.lock(); + if (!strong_self) { + return; + } + strong_self->getPoller()->async([strong_self,err]{ + if(err != ""){ + strong_self->onShutdown(SockException(Err_refused,err)); + }else{ + strong_self->doPlay(); + } + }); + }; + + auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed, _media_info, invoker, static_cast(*this)); + if(!flag){ + doPlay(); + } +} +void SrtTransportImp::doPlay(){ + +} +std::string SrtTransportImp::get_peer_ip() { + if (!_addr) { + return "::"; + } + return SockUtil::inet_ntoa((sockaddr *)_addr.get()); +} + +uint16_t SrtTransportImp::get_peer_port() { + if (!_addr) { + return 0; + } + return SockUtil::inet_port((sockaddr *)_addr.get()); +} + +std::string SrtTransportImp::get_local_ip() { + auto s = getSession(); + if (s) { + return s->get_local_ip(); + } + return "::"; +} + +uint16_t SrtTransportImp::get_local_port() { + auto s = getSession(); + if (s) { + return s->get_local_port(); + } + return 0; +} + +std::string SrtTransportImp::getIdentifier() const { + return _media_info._streamid; +} + +bool SrtTransportImp::inputFrame(const Frame::Ptr &frame) { + if (_muxer) { + return _muxer->inputFrame(frame); + } + if (_cached_func.size() > 200) { + WarnL << "cached frame of track(" << frame->getCodecName() << ") is too much, now dropped"; + return false; + } + auto frame_cached = Frame::getCacheAbleFrame(frame); + lock_guard lck(_func_mtx); + _cached_func.emplace_back([this, frame_cached]() { + _muxer->inputFrame(frame_cached); + }); + return true; +} + +bool SrtTransportImp::addTrack(const Track::Ptr &track) { + if (_muxer) { + return _muxer->addTrack(track); + } + + lock_guard lck(_func_mtx); + _cached_func.emplace_back([this, track]() { + _muxer->addTrack(track); + }); + return true; +} + +void SrtTransportImp::addTrackCompleted() { + if (_muxer) { + _muxer->addTrackCompleted(); + } else { + lock_guard lck(_func_mtx); + _cached_func.emplace_back([this]() { + _muxer->addTrackCompleted(); + }); + } +} + +void SrtTransportImp::doCachedFunc() { + lock_guard lck(_func_mtx); + for (auto &func : _cached_func) { + func(); + } + _cached_func.clear(); +} + + +} // namespace SRT \ No newline at end of file diff --git a/srt/SrtTransportImp.hpp b/srt/SrtTransportImp.hpp new file mode 100644 index 00000000..31f932b5 --- /dev/null +++ b/srt/SrtTransportImp.hpp @@ -0,0 +1,73 @@ +#ifndef ZLMEDIAKIT_SRT_TRANSPORT_IMP_H +#define ZLMEDIAKIT_SRT_TRANSPORT_IMP_H + +#include "Common/MultiMediaSourceMuxer.h" +#include "Rtp/Decoder.h" +#include "SrtTransport.hpp" + +namespace SRT { + using namespace toolkit; + using namespace mediakit; + using namespace std; +class SrtTransportImp + : public SrtTransport + , public toolkit::SockInfo + , public MediaSinkInterface + , public mediakit::MediaSourceEvent { +public: + SrtTransportImp(const EventPoller::Ptr &poller); + ~SrtTransportImp(); + + /// SockInfo override + std::string get_local_ip() override; + uint16_t get_local_port() override; + std::string get_peer_ip() override; + uint16_t get_peer_port() override; + std::string getIdentifier() const override; + +protected: + ///////SrtTransport override/////// + void onHandShakeFinished(std::string& streamid,struct sockaddr_storage *addr) override; + void onSRTData(DataPacket::Ptr pkt,struct sockaddr_storage *addr) override; + void onShutdown(const SockException &ex) override; + + ///////MediaSourceEvent override/////// + // 关闭 + bool close(mediakit::MediaSource &sender, bool force) override; + // 播放总人数 + int totalReaderCount(mediakit::MediaSource &sender) override; + // 获取媒体源类型 + mediakit::MediaOriginType getOriginType(mediakit::MediaSource &sender) const override; + // 获取媒体源url或者文件路径 + std::string getOriginUrl(mediakit::MediaSource &sender) const override; + // 获取媒体源客户端相关信息 + std::shared_ptr getOriginSock(mediakit::MediaSource &sender) const override; + + bool inputFrame(const Frame::Ptr &frame) override; + bool addTrack(const Track::Ptr & track) override; + void addTrackCompleted() override; + void resetTracks() override {}; + +private: + void emitOnPublish(); + void emitOnPlay(); + + void doPlay(); + void doCachedFunc(); + +private: + bool _is_pusher = true; + MediaInfo _media_info; + + std::unique_ptr _addr; + + // for pusher + MultiMediaSourceMuxer::Ptr _muxer; + DecoderImp::Ptr _decoder; + std::recursive_mutex _func_mtx; + std::deque > _cached_func; +}; + +} // namespace SRT + +#endif // ZLMEDIAKIT_SRT_TRANSPORT_IMP_H