From 91aa9c494cedbd16a95853171baa4054086261f6 Mon Sep 17 00:00:00 2001 From: xiongguangjie Date: Sat, 4 Jun 2022 11:23:18 +0800 Subject: [PATCH] srt can play stream --- srt/HSExt.cpp | 2 + srt/SrtTransport.cpp | 18 ++++++-- srt/SrtTransport.hpp | 4 +- srt/SrtTransportImp.cpp | 99 ++++++++++++++++------------------------- srt/SrtTransportImp.hpp | 8 ++++ 5 files changed, 64 insertions(+), 67 deletions(-) diff --git a/srt/HSExt.cpp b/srt/HSExt.cpp index 5398e48e..20a5b89c 100644 --- a/srt/HSExt.cpp +++ b/srt/HSExt.cpp @@ -80,6 +80,8 @@ bool HSExtStreamID::loadFromData(uint8_t *buf, size_t len) { streamid.push_back(*(ptr)); ptr+=4; } + char zero = 0x00; + streamid.erase(streamid.find_first_of(zero),streamid.size()); return true; } diff --git a/srt/SrtTransport.cpp b/srt/SrtTransport.cpp index bc185717..f06361c5 100644 --- a/srt/SrtTransport.cpp +++ b/srt/SrtTransport.cpp @@ -228,9 +228,11 @@ void SrtTransport::handleKeeplive(uint8_t *buf, int len, struct sockaddr_storage sendControlPacket(pkt,true); } void SrtTransport::handleACK(uint8_t *buf, int len, struct sockaddr_storage *addr){ - TraceL; + //TraceL; ACKPacket ack; - ack.loadFromData(buf,len); + if(!ack.loadFromData(buf,len)){ + return; + } ACKACKPacket::Ptr pkt = std::make_shared(); pkt->dst_socket_id = _peer_socket_id; @@ -239,9 +241,16 @@ void SrtTransport::handleACK(uint8_t *buf, int len, struct sockaddr_storage *add pkt->storeToData(); _send_buf->dropForSend(ack.last_ack_pkt_seq_number); sendControlPacket(pkt,true); + //TraceL<<"ack number "<(); + pkt->dst_socket_id = _peer_socket_id; + pkt->timestamp = DurationCountMicroseconds(_now -_start_timestamp); + pkt->first_pkt_seq_num = first; + pkt->last_pkt_seq_num = last; + pkt->storeToData(); + sendControlPacket(pkt,true); } void SrtTransport::handleNAK(uint8_t *buf, int len, struct sockaddr_storage *addr){ TraceL; @@ -472,7 +481,8 @@ size_t SrtTransport::getPayloadSize(){ size_t ret = (_mtu - 28 -16)/188*188; return ret; } -void SrtTransport::onSendData(const Buffer::Ptr &buffer, bool flush){ +void SrtTransport::onSendTSData(const Buffer::Ptr &buffer, bool flush){ + //TraceL; DataPacket::Ptr pkt; size_t payloadSize = getPayloadSize(); size_t size = buffer->size(); diff --git a/srt/SrtTransport.hpp b/srt/SrtTransport.hpp index 892a85c4..0cf93b35 100644 --- a/srt/SrtTransport.hpp +++ b/srt/SrtTransport.hpp @@ -37,7 +37,7 @@ public: * @param addr 数据来源地址 */ virtual void inputSockData(uint8_t *buf, int len, struct sockaddr_storage *addr); - virtual void onSendData(const Buffer::Ptr &buffer, bool flush); + virtual void onSendTSData(const Buffer::Ptr &buffer, bool flush); std::string getIdentifier(); @@ -80,7 +80,7 @@ private: protected: void sendDataPacket(DataPacket::Ptr pkt,char* buf,int len,bool flush = false); void sendControlPacket(ControlPacket::Ptr pkt,bool flush = true); - void sendPacket(Buffer::Ptr pkt,bool flush = true); + virtual void sendPacket(Buffer::Ptr pkt,bool flush = true); private: //当前选中的udp链接 Session::Ptr _selected_session; diff --git a/srt/SrtTransportImp.cpp b/srt/SrtTransportImp.cpp index e2b1c903..b5d72344 100644 --- a/srt/SrtTransportImp.cpp +++ b/srt/SrtTransportImp.cpp @@ -10,7 +10,7 @@ SrtTransportImp::SrtTransportImp(const EventPoller::Ptr &poller) SrtTransportImp::~SrtTransportImp() { InfoP(this); uint64_t duration = _alive_ticker.createdTime() / 1000; - WarnP(this) << "srt 推流器(" + WarnP(this) << (_is_pusher ? "srt 推流器(" : "srt 播放器(") << _media_info._vhost << "/" << _media_info._app << "/" << _media_info._streamid @@ -39,6 +39,7 @@ void SrtTransportImp::onHandShakeFinished(std::string &streamid,struct sockaddr_ emitOnPublish(); }else{ _is_pusher = false; + emitOnPlay(); } } void SrtTransportImp::onSRTData(DataPacket::Ptr pkt,struct sockaddr_storage *addr) { @@ -145,72 +146,48 @@ void SrtTransportImp::emitOnPlay(){ } } void SrtTransportImp::doPlay(){ - //鉴权结果回调 - weak_ptr weak_self = dynamic_pointer_cast(shared_from_this()); - auto onRes = [weak_self](const string &err) { + //异步查找直播流 + std::weak_ptr weak_self = static_pointer_cast(shared_from_this()); + + MediaInfo info = _media_info; + info._schema = TS_SCHEMA; + MediaSource::findAsync(info, getSession(), [weak_self](const MediaSource::Ptr &src) { auto strong_self = weak_self.lock(); if (!strong_self) { //本对象已经销毁 + TraceL<<"本对象已经销毁"; return; } - - if (!err.empty()) { - //播放鉴权失败 - strong_self->onShutdown(SockException(Err_refused, err)); - return; - } - - //异步查找直播流 - MediaInfo info = strong_self->_media_info; - info._schema = TS_SCHEMA; - MediaSource::findAsync(info, strong_self->getSession(), [weak_self](const MediaSource::Ptr &src) { - auto strong_self = weak_self.lock(); - if (!strong_self) { - //本对象已经销毁 - return; - } - if (!src) { - //未找到该流 + if (!src) { + //未找到该流 + TraceL<<"未找到该流"; + strong_self->onShutdown(SockException(Err_shutdown)); + } else { + TraceL<<"找到该流"; + auto ts_src = dynamic_pointer_cast(src); + assert(ts_src); + ts_src->pause(false); + strong_self->_ts_reader = ts_src->getRing()->attach(strong_self->getPoller()); + strong_self->_ts_reader->setDetachCB([weak_self]() { + auto strong_self = weak_self.lock(); + if (!strong_self) { + //本对象已经销毁 + return; + } strong_self->onShutdown(SockException(Err_shutdown)); - } else { - auto ts_src = dynamic_pointer_cast(src); - assert(ts_src); - ts_src->pause(false); - strong_self->_ts_reader = ts_src->getRing()->attach(strong_self->getPoller()); - strong_self->_ts_reader->setDetachCB([weak_self]() { - auto strong_self = weak_self.lock(); - if (!strong_self) { - //本对象已经销毁 - return; - } - strong_self->onShutdown(SockException(Err_shutdown)); - }); - strong_self->_ts_reader->setReadCB([weak_self](const TSMediaSource::RingDataType &ts_list) { - auto strong_self = weak_self.lock(); - if (!strong_self) { - //本对象已经销毁 - return; - } - size_t i = 0; - auto size = ts_list->size(); - ts_list->for_each([&](const TSPacket::Ptr &ts) { strong_self->onSendData(ts, ++i == size); }); - }); - }; - - }); - }; - - Broadcast::AuthInvoker invoker = [weak_self, onRes](const string &err) { - if (auto strongSelf = weak_self.lock()) { - strongSelf->getPoller()->async([onRes, err]() { onRes(err); }); - } - }; - - auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed, _media_info, invoker, static_cast(*this)); - if (!flag) { - //该事件无人监听,默认不鉴权 - onRes(""); - } + }); + strong_self->_ts_reader->setReadCB([weak_self](const TSMediaSource::RingDataType &ts_list) { + auto strong_self = weak_self.lock(); + if (!strong_self) { + //本对象已经销毁 + return; + } + size_t i = 0; + auto size = ts_list->size(); + ts_list->for_each([&](const TSPacket::Ptr &ts) { strong_self->onSendTSData(ts, ++i == size); }); + }); + }; + }); } std::string SrtTransportImp::get_peer_ip() { if (!_addr) { diff --git a/srt/SrtTransportImp.hpp b/srt/SrtTransportImp.hpp index 80106d2d..8abe2b40 100644 --- a/srt/SrtTransportImp.hpp +++ b/srt/SrtTransportImp.hpp @@ -23,6 +23,9 @@ public: SrtTransport::inputSockData(buf,len,addr); _total_bytes += len; } + void onSendTSData(const Buffer::Ptr &buffer, bool flush){ + SrtTransport::onSendTSData(buffer,flush); + } /// SockInfo override std::string get_local_ip() override; uint16_t get_local_port() override; @@ -36,6 +39,11 @@ protected: void onSRTData(DataPacket::Ptr pkt,struct sockaddr_storage *addr) override; void onShutdown(const SockException &ex) override; + void sendPacket(Buffer::Ptr pkt,bool flush = true) override{ + _total_bytes += pkt->size(); + SrtTransport::sendPacket(pkt,flush); + }; + ///////MediaSourceEvent override/////// // 关闭 bool close(mediakit::MediaSource &sender, bool force) override;