From 82da99eef3bd494bd46577591a6c9d48b76e1ef3 Mon Sep 17 00:00:00 2001 From: xiongguangjie Date: Fri, 3 Jun 2022 21:17:01 +0800 Subject: [PATCH] add shutdown packet and flow report --- srt/Packet.hpp | 27 +++++++++++++++++++++++++++ srt/SrtTransport.cpp | 9 +++++++++ srt/SrtTransport.hpp | 3 ++- srt/SrtTransportImp.cpp | 12 ++++++++++++ srt/SrtTransportImp.hpp | 8 ++++++-- 5 files changed, 56 insertions(+), 3 deletions(-) diff --git a/srt/Packet.hpp b/srt/Packet.hpp index f2fbc98a..ddc784fd 100644 --- a/srt/Packet.hpp +++ b/srt/Packet.hpp @@ -5,6 +5,7 @@ #include #include "Network/Buffer.h" +#include "Util/logger.h" #include "Common.hpp" #include "HSExt.hpp" @@ -312,6 +313,32 @@ class MsgDropReqPacket : public ControlPacket uint32_t last_pkt_seq_num; }; +class ShutDownPacket : public ControlPacket +{ +public: + using Ptr = std::shared_ptr; + ShutDownPacket() = default; + ~ShutDownPacket() = default; + ///////ControlPacket override/////// + bool loadFromData(uint8_t *buf, size_t len) override { + if (len < HEADER_SIZE) { + WarnL << "data size" << len << " less " << HEADER_SIZE; + return false; + } + _data = BufferRaw::create(); + _data->assign((char *)buf, len); + + return loadHeader(); + }; + bool storeToData() override { + control_type = ControlPacket::SHUTDOWN; + sub_type = 0; + _data = BufferRaw::create(); + _data->setCapacity(HEADER_SIZE); + _data->setSize(HEADER_SIZE); + return storeToHeader(); + }; +}; } // namespace SRT #endif //ZLMEDIAKIT_SRT_PACKET_H \ No newline at end of file diff --git a/srt/SrtTransport.cpp b/srt/SrtTransport.cpp index 354d36f4..b9fef358 100644 --- a/srt/SrtTransport.cpp +++ b/srt/SrtTransport.cpp @@ -329,6 +329,14 @@ void SrtTransport::sendNAKPacket(std::list& lost_list){ TraceL<<"send NAK "<dump(); sendControlPacket(pkt,true); } + +void SrtTransport::sendShutDown(){ + ShutDownPacket::Ptr pkt = std::make_shared(); + pkt->dst_socket_id = _peer_socket_id; + pkt->timestamp = DurationCountMicroseconds(_now - _start_timestamp); + pkt->storeToData(); + sendControlPacket(pkt,true); +} void SrtTransport::handleDataPacket(uint8_t *buf, int len, struct sockaddr_storage *addr){ DataPacket::Ptr pkt = std::make_shared(); pkt->loadFromData(buf,len); @@ -423,6 +431,7 @@ void SrtTransport::unregisterSelf() { } void SrtTransport::onShutdown(const SockException &ex){ + sendShutDown(); WarnL << ex.what(); unregisterSelfHandshake(); unregisterSelf(); diff --git a/srt/SrtTransport.hpp b/srt/SrtTransport.hpp index 6d2b9e48..02ff395e 100644 --- a/srt/SrtTransport.hpp +++ b/srt/SrtTransport.hpp @@ -36,7 +36,7 @@ public: * @param len 数据长度 * @param addr 数据来源地址 */ - void inputSockData(uint8_t *buf, int len, struct sockaddr_storage *addr); + virtual void inputSockData(uint8_t *buf, int len, struct sockaddr_storage *addr); std::string getIdentifier(); @@ -72,6 +72,7 @@ private: void sendACKPacket(); void sendLightACKPacket(); void sendKeepLivePacket(); + void sendShutDown(); protected: void sendDataPacket(DataPacket::Ptr pkt,char* buf,int len,bool flush = false); void sendControlPacket(ControlPacket::Ptr pkt,bool flush = true); diff --git a/srt/SrtTransportImp.cpp b/srt/SrtTransportImp.cpp index 0d8f90cc..9d06659c 100644 --- a/srt/SrtTransportImp.cpp +++ b/srt/SrtTransportImp.cpp @@ -9,6 +9,18 @@ SrtTransportImp::SrtTransportImp(const EventPoller::Ptr &poller) SrtTransportImp::~SrtTransportImp() { InfoP(this); + uint64_t duration = _alive_ticker.createdTime() / 1000; + WarnP(this) << "srt 推流器(" + << _media_info._vhost << "/" + << _media_info._app << "/" + << _media_info._streamid + << ")断开,耗时(s):" << duration; + + //流量统计事件广播 + GET_CONFIG(uint32_t, iFlowThreshold, General::kFlowThreshold); + if (_total_bytes >= iFlowThreshold * 1024) { + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _total_bytes, duration, false, static_cast(*this)); + } } void SrtTransportImp::onHandShakeFinished(std::string &streamid,struct sockaddr_storage *addr) { diff --git a/srt/SrtTransportImp.hpp b/srt/SrtTransportImp.hpp index 31f932b5..a1f3b65b 100644 --- a/srt/SrtTransportImp.hpp +++ b/srt/SrtTransportImp.hpp @@ -17,7 +17,10 @@ class SrtTransportImp public: SrtTransportImp(const EventPoller::Ptr &poller); ~SrtTransportImp(); - + void inputSockData(uint8_t *buf, int len, struct sockaddr_storage *addr){ + SrtTransport::inputSockData(buf,len,addr); + _total_bytes += len; + } /// SockInfo override std::string get_local_ip() override; uint16_t get_local_port() override; @@ -58,7 +61,8 @@ private: private: bool _is_pusher = true; MediaInfo _media_info; - + uint64_t _total_bytes = 0; + Ticker _alive_ticker; std::unique_ptr _addr; // for pusher