From c92fc8a4a8a2cb83d5452672f0deb4599ed16ad6 Mon Sep 17 00:00:00 2001 From: xiongguangjie Date: Mon, 6 Jun 2022 20:40:04 +0800 Subject: [PATCH 1/2] format code and remove some useless code --- srt/Ack.cpp | 42 ++--- srt/Ack.hpp | 26 ++- srt/Common.hpp | 46 ++--- srt/HSExt.cpp | 124 +++++++------- srt/HSExt.hpp | 2 +- srt/Packet.cpp | 285 ++++++++++++++----------------- srt/Packet.hpp | 40 ++--- srt/PacketQueue.cpp | 157 ++++++++--------- srt/PacketQueue.hpp | 9 +- srt/PacketSendQueue.cpp | 2 +- srt/PacketSendQueue.hpp | 4 +- srt/SrtSession.cpp | 70 ++++---- srt/SrtSession.hpp | 3 +- srt/SrtTransport.cpp | 366 ++++++++++++++++++++-------------------- srt/SrtTransport.hpp | 51 +++--- srt/SrtTransportImp.cpp | 113 ++++++------- srt/SrtTransportImp.hpp | 39 ++--- srt/Statistic.cpp | 69 ++++---- srt/Statistic.hpp | 29 ++-- 19 files changed, 715 insertions(+), 762 deletions(-) diff --git a/srt/Ack.cpp b/srt/Ack.cpp index 7be5a08d..ba2e67c5 100644 --- a/srt/Ack.cpp +++ b/srt/Ack.cpp @@ -2,8 +2,8 @@ #include "Common.hpp" namespace SRT { -bool ACKPacket::loadFromData(uint8_t *buf, size_t len) { - if(len < ACK_CIF_SIZE + ControlPacket::HEADER_SIZE){ +bool ACKPacket::loadFromData(uint8_t *buf, size_t len) { + if (len < ACK_CIF_SIZE + ControlPacket::HEADER_SIZE) { return false; } @@ -11,7 +11,7 @@ bool ACKPacket::loadFromData(uint8_t *buf, size_t len) { _data->assign((char *)(buf), len); ControlPacket::loadHeader(); ack_number = loadUint32(type_specific_info); - uint8_t* ptr = (uint8_t*)_data->data()+ControlPacket::HEADER_SIZE; + uint8_t *ptr = (uint8_t *)_data->data() + ControlPacket::HEADER_SIZE; last_ack_pkt_seq_number = loadUint32(ptr); ptr += 4; @@ -32,52 +32,52 @@ bool ACKPacket::loadFromData(uint8_t *buf, size_t len) { ptr += 4; recv_rate = loadUint32(ptr); - ptr += 4; + ptr += 4; return true; } -bool ACKPacket::storeToData() { +bool ACKPacket::storeToData() { _data = BufferRaw::create(); _data->setCapacity(HEADER_SIZE + ACK_CIF_SIZE); _data->setSize(HEADER_SIZE + ACK_CIF_SIZE); control_type = ControlPacket::ACK; sub_type = 0; - storeUint32(type_specific_info,ack_number); + storeUint32(type_specific_info, ack_number); storeToHeader(); - uint8_t* ptr = (uint8_t*)_data->data()+ControlPacket::HEADER_SIZE; - - storeUint32(ptr,last_ack_pkt_seq_number); + uint8_t *ptr = (uint8_t *)_data->data() + ControlPacket::HEADER_SIZE; + + storeUint32(ptr, last_ack_pkt_seq_number); ptr += 4; - storeUint32(ptr,rtt); + storeUint32(ptr, rtt); ptr += 4; - storeUint32(ptr,rtt_variance); + storeUint32(ptr, rtt_variance); ptr += 4; - storeUint32(ptr,pkt_recv_rate); + storeUint32(ptr, pkt_recv_rate); ptr += 4; - storeUint32(ptr,available_buf_size); + storeUint32(ptr, available_buf_size); ptr += 4; - storeUint32(ptr,estimated_link_capacity); + storeUint32(ptr, estimated_link_capacity); ptr += 4; - storeUint32(ptr,recv_rate); + storeUint32(ptr, recv_rate); ptr += 4; return true; } -std::string ACKPacket::dump(){ +std::string ACKPacket::dump() { _StrPrinter printer; - printer << "last_ack_pkt_seq_number="<; ACKPacket() = default; ~ACKPacket() = default; - enum{ - ACK_CIF_SIZE = 7*4 - }; + enum { ACK_CIF_SIZE = 7 * 4 }; std::string dump(); ///////ControlPacket override/////// bool loadFromData(uint8_t *buf, size_t len) override; @@ -59,15 +55,14 @@ public: uint32_t recv_rate; }; - -class ACKACKPacket : public ControlPacket{ +class ACKACKPacket : public ControlPacket { public: using Ptr = std::shared_ptr; ACKACKPacket() = default; ~ACKACKPacket() = default; ///////ControlPacket override/////// - bool loadFromData(uint8_t *buf, size_t len) override{ - if(len < ControlPacket::HEADER_SIZE){ + bool loadFromData(uint8_t *buf, size_t len) override { + if (len < ControlPacket::HEADER_SIZE) { return false; } _data = BufferRaw::create(); @@ -76,21 +71,20 @@ public: ack_number = loadUint32(type_specific_info); return true; } - bool storeToData() override{ + bool storeToData() override { _data = BufferRaw::create(); _data->setCapacity(HEADER_SIZE); - _data->setSize(HEADER_SIZE ); + _data->setSize(HEADER_SIZE); control_type = ControlPacket::ACKACK; sub_type = 0; - storeUint32(type_specific_info,ack_number); + storeUint32(type_specific_info, ack_number); storeToHeader(); return true; } uint32_t ack_number; - }; -} //namespace SRT +} // namespace SRT #endif // ZLMEDIAKIT_SRT_ACK_H \ No newline at end of file diff --git a/srt/Common.hpp b/srt/Common.hpp index d0ba6207..90ca20b0 100644 --- a/srt/Common.hpp +++ b/srt/Common.hpp @@ -1,16 +1,26 @@ #ifndef ZLMEDIAKIT_SRT_COMMON_H #define ZLMEDIAKIT_SRT_COMMON_H +#if defined(_WIN32) +#include +#include +#include +#pragma comment(lib, "Ws2_32.lib") +#pragma comment(lib, "Iphlpapi.lib") +#else +#include +#include +#endif // defined(_WIN32) + #include -namespace SRT -{ +namespace SRT { using SteadyClock = std::chrono::steady_clock; using TimePoint = std::chrono::time_point; using Microseconds = std::chrono::microseconds; using Milliseconds = std::chrono::milliseconds; -inline int64_t DurationCountMicroseconds( SteadyClock::duration dur){ +inline int64_t DurationCountMicroseconds(SteadyClock::duration dur) { return std::chrono::duration_cast(dur).count(); } @@ -37,48 +47,38 @@ inline void storeUint32LE(uint8_t *buf, uint32_t val) { buf[0] = val & 0xff; buf[1] = (val >> 8) & 0xff; buf[2] = (val >> 16) & 0xff; - buf[3] = (val >>24) & 0xff; + buf[3] = (val >> 24) & 0xff; } inline void storeUint16LE(uint8_t *buf, uint16_t val) { buf[0] = val & 0xff; - buf[1] = (val>>8) & 0xff; + buf[1] = (val >> 8) & 0xff; } -inline uint32_t srtVersion(int major, int minor, int patch) -{ - return patch + minor*0x100 + major*0x10000; +inline uint32_t srtVersion(int major, int minor, int patch) { + return patch + minor * 0x100 + major * 0x10000; } class UTicker { public: - UTicker() { - _created = _begin = SteadyClock::now(); - } + UTicker() { _created = _begin = SteadyClock::now(); } - ~UTicker() { - } + ~UTicker() {} /** * 获取创建时间,单位微妙 */ - int64_t elapsedTime(TimePoint now) const { - return DurationCountMicroseconds(now - _begin); - } + int64_t elapsedTime(TimePoint now) const { return DurationCountMicroseconds(now - _begin); } /** * 获取上次resetTime后至今的时间,单位毫秒 */ - int64_t createdTime(TimePoint now) const { - return DurationCountMicroseconds(now - _created); - } + int64_t createdTime(TimePoint now) const { return DurationCountMicroseconds(now - _created); } /** * 重置计时器 */ - void resetTime(TimePoint now) { - _begin = now; - } + void resetTime(TimePoint now) { _begin = now; } private: TimePoint _begin; @@ -87,4 +87,4 @@ private: } // namespace SRT -#endif //ZLMEDIAKIT_SRT_COMMON_H \ No newline at end of file +#endif // ZLMEDIAKIT_SRT_COMMON_H \ No newline at end of file diff --git a/srt/HSExt.cpp b/srt/HSExt.cpp index ab4d4ff2..3a3a7809 100644 --- a/srt/HSExt.cpp +++ b/srt/HSExt.cpp @@ -2,19 +2,19 @@ namespace SRT { bool HSExtMessage::loadFromData(uint8_t *buf, size_t len) { - if(buf == NULL || len != HSEXT_MSG_SIZE){ + if (buf == NULL || len != HSEXT_MSG_SIZE) { return false; } _data = BufferRaw::create(); - _data->assign((char*)buf,len); + _data->assign((char *)buf, len); extension_length = 3; HSExt::loadHeader(); assert(extension_type == SRT_CMD_HSREQ || extension_type == SRT_CMD_HSRSP); - uint8_t* ptr = (uint8_t*)_data->data()+4; - srt_version = loadUint32(ptr); + uint8_t *ptr = (uint8_t *)_data->data() + 4; + srt_version = loadUint32(ptr); ptr += 4; srt_flag = loadUint32(ptr); @@ -27,105 +27,103 @@ bool HSExtMessage::loadFromData(uint8_t *buf, size_t len) { ptr += 2; return true; - - } - std::string HSExtMessage::dump(){ - _StrPrinter printer; - printer << "srt version : "<; + using Ptr = std::shared_ptr; HSExtStreamID() = default; ~HSExtStreamID() = default; bool loadFromData(uint8_t *buf, size_t len) override; diff --git a/srt/Packet.cpp b/srt/Packet.cpp index 18ad279c..7f2b6cc2 100644 --- a/srt/Packet.cpp +++ b/srt/Packet.cpp @@ -1,28 +1,11 @@ - -#if defined(_WIN32) -#include -#include -#include -#pragma comment (lib, "Ws2_32.lib") -#pragma comment(lib,"Iphlpapi.lib") -#else -#include -#include -#endif // defined(_WIN32) - - - -#include +#include "Util/MD5.h" #include "Util/logger.h" -#include "Util/MD5.h" +#include #include "Packet.hpp" - - namespace SRT { - const size_t DataPacket::HEADER_SIZE; const size_t ControlPacket::HEADER_SIZE; const size_t HandshakePacket::HS_CONTENT_MIN_SIZE; @@ -38,7 +21,7 @@ bool DataPacket::isDataPacket(uint8_t *buf, size_t len) { return false; } -uint32_t DataPacket::getSocketID(uint8_t *buf, size_t len){ +uint32_t DataPacket::getSocketID(uint8_t *buf, size_t len) { uint8_t *ptr = buf; ptr += 12; return loadUint32(ptr); @@ -51,7 +34,7 @@ bool DataPacket::loadFromData(uint8_t *buf, size_t len) { } uint8_t *ptr = buf; f = ptr[0] >> 7; - packet_seq_number = loadUint32(ptr)&0x7fffffff; + packet_seq_number = loadUint32(ptr) & 0x7fffffff; ptr += 4; PP = ptr[0] >> 6; @@ -71,7 +54,7 @@ bool DataPacket::loadFromData(uint8_t *buf, size_t len) { _data->assign((char *)(buf), len); return true; } -bool DataPacket::storeToHeader(){ +bool DataPacket::storeToHeader() { if (!_data || _data->size() < HEADER_SIZE) { WarnL << "data size less " << HEADER_SIZE; return false; @@ -158,8 +141,6 @@ size_t DataPacket::payloadSize() { return _data->size() - HEADER_SIZE; } - - bool ControlPacket::isControlPacket(uint8_t *buf, size_t len) { if (len < HEADER_SIZE) { WarnL << "data size" << len << " less " << HEADER_SIZE; @@ -234,11 +215,11 @@ size_t ControlPacket::size() const { } return _data->size(); } -uint32_t ControlPacket::getSocketID(uint8_t *buf, size_t len){ - return loadUint32(buf+12); +uint32_t ControlPacket::getSocketID(uint8_t *buf, size_t len) { + return loadUint32(buf + 12); } bool HandshakePacket::loadFromData(uint8_t *buf, size_t len) { - if(HEADER_SIZE+HS_CONTENT_MIN_SIZE > len){ + if (HEADER_SIZE + HS_CONTENT_MIN_SIZE > len) { ErrorL << "size too smalle " << encryption_field; return false; } @@ -282,79 +263,77 @@ bool HandshakePacket::loadFromData(uint8_t *buf, size_t len) { ErrorL << "not support encryption " << encryption_field; } - if(extension_field == 0){ + if (extension_field == 0) { return true; } - if(len == HEADER_SIZE+HS_CONTENT_MIN_SIZE){ - //ErrorL << "extension filed not exist " << extension_field; + if (len == HEADER_SIZE + HS_CONTENT_MIN_SIZE) { + // ErrorL << "extension filed not exist " << extension_field; return true; } - return loadExtMessage(ptr,len-HS_CONTENT_MIN_SIZE-HEADER_SIZE); + return loadExtMessage(ptr, len - HS_CONTENT_MIN_SIZE - HEADER_SIZE); } -bool HandshakePacket::loadExtMessage(uint8_t *buf,size_t len){ - uint8_t* ptr = buf; - ext_list.clear(); - uint16_t type; - uint16_t length; - HSExt::Ptr ext; - while(ptr(); break; case HSExt::SRT_CMD_SID: ext = std::make_shared(); break; default: - WarnL<<"not support ext "<loadFromData(ptr,length*4+4)){ - ext_list.push_back(std::move(ext)); - }else{ - WarnL<<"parse HS EXT failed type="<assign((char*)buf,len); - + _data->assign((char *)buf, len); + return loadHeader(); } -bool KeepLivePacket::storeToData(){ +bool KeepLivePacket::storeToData() { control_type = ControlPacket::KEEPALIVE; sub_type = 0; @@ -506,22 +480,21 @@ bool NAKPacket::loadFromData(uint8_t *buf, size_t len) { return false; } _data = BufferRaw::create(); - _data->assign((char*)buf,len); + _data->assign((char *)buf, len); loadHeader(); - uint8_t* ptr = (uint8_t*)_data->data()+HEADER_SIZE; - uint8_t* end = (uint8_t*)_data->data()+_data->size(); + uint8_t *ptr = (uint8_t *)_data->data() + HEADER_SIZE; + uint8_t *end = (uint8_t *)_data->data() + _data->size(); LostPair lost; - while (ptrsetCapacity(HEADER_SIZE+cif_size); - _data->setSize(HEADER_SIZE+cif_size); + _data->setCapacity(HEADER_SIZE + cif_size); + _data->setSize(HEADER_SIZE + cif_size); storeToHeader(); - uint8_t* ptr = (uint8_t*)_data->data()+HEADER_SIZE; + uint8_t *ptr = (uint8_t *)_data->data() + HEADER_SIZE; - for(auto it : lost_list){ - if(it.first+1 ==it.second){ - storeUint32(ptr,it.first); - ptr[0] = ptr[0]&0x7f; - ptr += 4; - }else{ - storeUint32(ptr,it.first); - ptr[0] |= 0x80; + for (auto it : lost_list) { + if (it.first + 1 == it.second) { + storeUint32(ptr, it.first); + ptr[0] = ptr[0] & 0x7f; + ptr += 4; + } else { + storeUint32(ptr, it.first); + ptr[0] |= 0x80; - storeUint32(ptr+4,it.second-1); - //ptr[4] = ptr[4]&0x7f; + storeUint32(ptr + 4, it.second - 1); + // ptr[4] = ptr[4]&0x7f; - ptr += 8; + ptr += 8; } } return true; } -size_t NAKPacket::getCIFSize(){ +size_t NAKPacket::getCIFSize() { size_t size = 0; - for(auto it : lost_list){ - if(it.first+1 ==it.second){ + for (auto it : lost_list) { + if (it.first + 1 == it.second) { size += 4; - }else{ + } else { size += 8; } } return size; } -std::string NAKPacket::dump(){ +std::string NAKPacket::dump() { _StrPrinter printer; for (auto it : lost_list) { - printer<<"[ "<assign((char*)buf,len); + _data->assign((char *)buf, len); loadHeader(); - uint8_t* ptr = (uint8_t*)_data->data()+HEADER_SIZE; + uint8_t *ptr = (uint8_t *)_data->data() + HEADER_SIZE; first_pkt_seq_num = loadUint32(ptr); ptr += 4; @@ -602,17 +575,17 @@ bool MsgDropReqPacket::storeToData() { control_type = DROPREQ; sub_type = 0; _data = BufferRaw::create(); - _data->setCapacity(HEADER_SIZE+8); - _data->setSize(HEADER_SIZE+8); + _data->setCapacity(HEADER_SIZE + 8); + _data->setSize(HEADER_SIZE + 8); storeToHeader(); - uint8_t* ptr = (uint8_t*)_data->data()+HEADER_SIZE; + uint8_t *ptr = (uint8_t *)_data->data() + HEADER_SIZE; - storeUint32(ptr,first_pkt_seq_num); + storeUint32(ptr, first_pkt_seq_num); ptr += 4; - storeUint32(ptr,last_pkt_seq_num); + storeUint32(ptr, last_pkt_seq_num); ptr += 4; return true; } diff --git a/srt/Packet.hpp b/srt/Packet.hpp index bab6df0c..7ccfa643 100644 --- a/srt/Packet.hpp +++ b/srt/Packet.hpp @@ -62,8 +62,6 @@ public: uint32_t timestamp; uint32_t dst_socket_id; - TimePoint get_ts; // recv or send time - private: BufferRaw::Ptr _data; }; @@ -171,7 +169,7 @@ class HandshakePacket : public ControlPacket { public: using Ptr = std::shared_ptr; enum { NO_ENCRYPTION = 0, AES_128 = 1, AES_196 = 2, AES_256 = 3 }; - static const size_t HS_CONTENT_MIN_SIZE = 48; + static const size_t HS_CONTENT_MIN_SIZE = 48; enum { HS_TYPE_DONE = 0xFFFFFFFD, HS_TYPE_AGREEMENT = 0xFFFFFFFE, @@ -181,18 +179,17 @@ public: }; enum { HS_EXT_FILED_HSREQ = 0x00000001, HS_EXT_FILED_KMREQ = 0x00000002, HS_EXT_FILED_CONFIG = 0x00000004 }; - - - + HandshakePacket() = default; ~HandshakePacket() = default; static bool isHandshakePacket(uint8_t *buf, size_t len); static uint32_t getHandshakeType(uint8_t *buf, size_t len); static uint32_t getSynCookie(uint8_t *buf, size_t len); - static uint32_t generateSynCookie(struct sockaddr_storage* addr,TimePoint ts,uint32_t current_cookie = 0, int correction = 0); + static uint32_t + generateSynCookie(struct sockaddr_storage *addr, TimePoint ts, uint32_t current_cookie = 0, int correction = 0); - void assignPeerIP(struct sockaddr_storage* addr); + void assignPeerIP(struct sockaddr_storage *addr); ///////ControlPacket override/////// bool loadFromData(uint8_t *buf, size_t len) override; bool storeToData() override; @@ -209,8 +206,9 @@ public: uint8_t peer_ip_addr[16]; std::vector ext_list; + private: - bool loadExtMessage(uint8_t *buf,size_t len); + bool loadExtMessage(uint8_t *buf, size_t len); bool storeExtMessage(); size_t getExtSize(); }; @@ -229,13 +227,12 @@ private: Figure 12: Keep-Alive control packet https://haivision.github.io/srt-rfc/draft-sharabayko-srt.html#name-keep-alive */ -class KeepLivePacket : public ControlPacket -{ +class KeepLivePacket : public ControlPacket { public: using Ptr = std::shared_ptr; KeepLivePacket() = default; ~KeepLivePacket() = default; - ///////ControlPacket override/////// + ///////ControlPacket override/////// bool loadFromData(uint8_t *buf, size_t len) override; bool storeToData() override; }; @@ -265,11 +262,10 @@ An SRT NAK packet is formatted as follows: Figure 14: NAK control packet https://haivision.github.io/srt-rfc/draft-sharabayko-srt.html#name-nak-control-packet */ -class NAKPacket : public ControlPacket -{ +class NAKPacket : public ControlPacket { public: using Ptr = std::shared_ptr; - using LostPair = std::pair; + using LostPair = std::pair; NAKPacket() = default; ~NAKPacket() = default; std::string dump(); @@ -278,11 +274,11 @@ public: bool storeToData() override; std::list lost_list; + private: size_t getCIFSize(); }; - /* 0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 @@ -302,9 +298,8 @@ private: Figure 18: Drop Request control packet https://haivision.github.io/srt-rfc/draft-sharabayko-srt.html#name-message-drop-request */ -class MsgDropReqPacket : public ControlPacket -{ - public: +class MsgDropReqPacket : public ControlPacket { +public: using Ptr = std::shared_ptr; MsgDropReqPacket() = default; ~MsgDropReqPacket() = default; @@ -332,13 +327,12 @@ class MsgDropReqPacket : public ControlPacket https://haivision.github.io/srt-rfc/draft-sharabayko-srt.html#name-shutdown */ -class ShutDownPacket : public ControlPacket -{ +class ShutDownPacket : public ControlPacket { public: using Ptr = std::shared_ptr; ShutDownPacket() = default; ~ShutDownPacket() = default; - ///////ControlPacket override/////// + ///////ControlPacket override/////// bool loadFromData(uint8_t *buf, size_t len) override { if (len < HEADER_SIZE) { WarnL << "data size" << len << " less " << HEADER_SIZE; @@ -360,4 +354,4 @@ public: }; } // namespace SRT -#endif //ZLMEDIAKIT_SRT_PACKET_H \ No newline at end of file +#endif // ZLMEDIAKIT_SRT_PACKET_H \ No newline at end of file diff --git a/srt/PacketQueue.cpp b/srt/PacketQueue.cpp index 7cfb4781..47edc73a 100644 --- a/srt/PacketQueue.cpp +++ b/srt/PacketQueue.cpp @@ -2,80 +2,82 @@ namespace SRT { -#define MAX_SEQ 0x7fffffff -#define MAX_TS 0xffffffff -inline uint32_t genExpectedSeq(uint32_t seq){ +#define MAX_SEQ 0x7fffffff +#define MAX_TS 0xffffffff +inline uint32_t genExpectedSeq(uint32_t seq) { return MAX_SEQ & seq; } -inline bool isSeqEdge(uint32_t seq,uint32_t cap){ - if(seq >(MAX_SEQ - cap)){ +inline bool isSeqEdge(uint32_t seq, uint32_t cap) { + if (seq > (MAX_SEQ - cap)) { return true; } return false; } -inline bool isTSCycle(uint32_t first,uint32_t second){ +inline bool isTSCycle(uint32_t first, uint32_t second) { uint32_t diff; - if(first>second){ + if (first > second) { diff = first - second; - }else{ + } else { diff = second - first; } - if(diff > (MAX_TS>>1)){ + if (diff > (MAX_TS >> 1)) { return true; - }else{ + } else { return false; } } PacketQueue::PacketQueue(uint32_t max_size, uint32_t init_seq, uint32_t lantency) : _pkt_expected_seq(init_seq) , _pkt_cap(max_size) - , _pkt_lantency(lantency) { -} -void PacketQueue::tryInsertPkt(DataPacket::Ptr pkt){ + , _pkt_lantency(lantency) {} +void PacketQueue::tryInsertPkt(DataPacket::Ptr pkt) { if (_pkt_expected_seq <= pkt->packet_seq_number) { auto diff = pkt->packet_seq_number - _pkt_expected_seq; - if(diff >= (MAX_SEQ>>1)){ - TraceL << "drop packet too later for cycle "<< "expected seq=" << _pkt_expected_seq << " pkt seq=" << pkt->packet_seq_number; + if (diff >= (MAX_SEQ >> 1)) { + TraceL << "drop packet too later for cycle " + << "expected seq=" << _pkt_expected_seq << " pkt seq=" << pkt->packet_seq_number; return; - }else{ + } else { _pkt_map.emplace(pkt->packet_seq_number, pkt); } } else { auto diff = _pkt_expected_seq - pkt->packet_seq_number; - if(diff >= (MAX_SEQ>>1)){ - _pkt_map.emplace(pkt->packet_seq_number, pkt); - TraceL<<" cycle packet "<<"expected seq=" << _pkt_expected_seq << " pkt seq=" << pkt->packet_seq_number; - }else{ - //TraceL << "drop packet too later "<< "expected seq=" << _pkt_expected_seq << " pkt seq=" << pkt->packet_seq_number; + if (diff >= (MAX_SEQ >> 1)) { + _pkt_map.emplace(pkt->packet_seq_number, pkt); + TraceL << " cycle packet " + << "expected seq=" << _pkt_expected_seq << " pkt seq=" << pkt->packet_seq_number; + } else { + // TraceL << "drop packet too later "<< "expected seq=" << _pkt_expected_seq << " pkt seq=" << + // pkt->packet_seq_number; } } } -bool PacketQueue::inputPacket(DataPacket::Ptr pkt,std::list& out) { +bool PacketQueue::inputPacket(DataPacket::Ptr pkt, std::list &out) { tryInsertPkt(pkt); auto it = _pkt_map.find(_pkt_expected_seq); - while ( it != _pkt_map.end()) { + while (it != _pkt_map.end()) { out.push_back(it->second); _pkt_map.erase(it); - _pkt_expected_seq = genExpectedSeq(_pkt_expected_seq+1); + _pkt_expected_seq = genExpectedSeq(_pkt_expected_seq + 1); it = _pkt_map.find(_pkt_expected_seq); } while (_pkt_map.size() > _pkt_cap) { // 防止回环 it = _pkt_map.find(_pkt_expected_seq); - if(it != _pkt_map.end()){ + if (it != _pkt_map.end()) { out.push_back(it->second); _pkt_map.erase(it); } _pkt_expected_seq = genExpectedSeq(_pkt_expected_seq + 1); } - while (timeLantency() > _pkt_lantency) { + while (timeLantency() > _pkt_lantency) { it = _pkt_map.find(_pkt_expected_seq); - if(it != _pkt_map.end()){ + if (it != _pkt_map.end()) { out.push_back(it->second); _pkt_map.erase(it); } @@ -85,16 +87,16 @@ bool PacketQueue::inputPacket(DataPacket::Ptr pkt,std::list& ou return true; } -bool PacketQueue::drop(uint32_t first, uint32_t last,std::list& out){ - uint32_t end = genExpectedSeq(last+1); +bool PacketQueue::drop(uint32_t first, uint32_t last, std::list &out) { + uint32_t end = genExpectedSeq(last + 1); decltype(_pkt_map.end()) it; - for(uint32_t i =_pkt_expected_seq;i< end;){ - it = _pkt_map.find(i); - if(it != _pkt_map.end()){ - out.push_back(it->second); - _pkt_map.erase(it); - } - i = genExpectedSeq(i+1); + for (uint32_t i = _pkt_expected_seq; i < end;) { + it = _pkt_map.find(i); + if (it != _pkt_map.end()) { + out.push_back(it->second); + _pkt_map.erase(it); + } + i = genExpectedSeq(i + 1); } _pkt_expected_seq = end; return true; @@ -108,15 +110,15 @@ uint32_t PacketQueue::timeLantency() { auto first = _pkt_map.begin()->second->timestamp; auto last = _pkt_map.rbegin()->second->timestamp; uint32_t dur; - if(last>first){ + if (last > first) { dur = last - first; - }else{ + } else { dur = first - last; } - if(dur > 0x80000000){ + if (dur > 0x80000000) { dur = MAX_TS - dur; - WarnL<<"cycle dur "< PacketQueue::getLostSeq() { std::list re; - if(_pkt_map.empty()){ + if (_pkt_map.empty()) { return re; } - - if(getExpectedSize() == getSize()){ + + if (getExpectedSize() == getSize()) { return re; } uint32_t end = 0; - uint32_t first,last; + uint32_t first, last; first = _pkt_map.begin()->second->packet_seq_number; last = _pkt_map.rbegin()->second->packet_seq_number; @@ -149,71 +151,74 @@ std::list PacketQueue::getLostSeq() { uint32_t i = _pkt_expected_seq; bool finish = true; - for(i = _pkt_expected_seq;i<=end;){ - if(_pkt_map.find(i) == _pkt_map.end()){ - if(finish){ + for (i = _pkt_expected_seq; i <= end;) { + if (_pkt_map.find(i) == _pkt_map.end()) { + if (finish) { finish = false; lost.first = i; - lost.second = i+1; - }else{ - lost.second = i+1; + lost.second = i + 1; + } else { + lost.second = i + 1; } - }else{ - if(!finish){ + } else { + if (!finish) { finish = true; re.push_back(lost); } } - i = genExpectedSeq(i+1); + i = genExpectedSeq(i + 1); } return re; } -size_t PacketQueue::getSize(){ +size_t PacketQueue::getSize() { return _pkt_map.size(); } size_t PacketQueue::getExpectedSize() { - if(_pkt_map.empty()){ + if (_pkt_map.empty()) { return 0; } uint32_t max = _pkt_map.rbegin()->first; uint32_t min = _pkt_map.begin()->first; - if((max-min)>=(MAX_SEQ>>1)){ - TraceL<<"cycle "<<"expected seq "<<_pkt_expected_seq<<" min "<= (MAX_SEQ >> 1)) { + TraceL << "cycle " + << "expected seq " << _pkt_expected_seq << " min " << min << " max " << max << " size " + << _pkt_map.size(); + return MAX_SEQ - _pkt_expected_seq + min + 1; + } else { + return max - _pkt_expected_seq + 1; } } -size_t PacketQueue::getAvailableBufferSize(){ - auto size = getExpectedSize(); - if(_pkt_cap > size){ +size_t PacketQueue::getAvailableBufferSize() { + auto size = getExpectedSize(); + if (_pkt_cap > size) { return _pkt_cap - size; } - if(_pkt_cap > _pkt_map.size()){ + if (_pkt_cap > _pkt_map.size()) { return _pkt_cap - _pkt_map.size(); } - WarnL<<" cap "<<_pkt_cap<<" expected size "<second->packet_seq_number; - printer<<" last:"<<_pkt_map.rbegin()->second->packet_seq_number; - printer<<" latency:"<second->packet_seq_number; + printer << " last:" << _pkt_map.rbegin()->second->packet_seq_number; + printer << " latency:" << timeLantency() / 1e3; + } + return std::move(printer); } } // namespace SRT \ No newline at end of file diff --git a/srt/PacketQueue.hpp b/srt/PacketQueue.hpp index 66702d65..02903601 100644 --- a/srt/PacketQueue.hpp +++ b/srt/PacketQueue.hpp @@ -3,8 +3,8 @@ #include "Packet.hpp" #include #include -#include #include +#include #include #include @@ -18,7 +18,7 @@ public: PacketQueue(uint32_t max_size, uint32_t init_seq, uint32_t lantency); ~PacketQueue() = default; - bool inputPacket(DataPacket::Ptr pkt,std::list& out); + bool inputPacket(DataPacket::Ptr pkt, std::list &out); uint32_t timeLantency(); std::list getLostSeq(); @@ -28,13 +28,14 @@ public: size_t getAvailableBufferSize(); uint32_t getExpectedSeq(); - bool drop(uint32_t first, uint32_t last,std::list& out); + bool drop(uint32_t first, uint32_t last, std::list &out); std::string dump(); + private: void tryInsertPkt(DataPacket::Ptr pkt); -private: +private: std::map _pkt_map; uint32_t _pkt_expected_seq = 0; diff --git a/srt/PacketSendQueue.cpp b/srt/PacketSendQueue.cpp index 9ea11aaf..61b46e7d 100644 --- a/srt/PacketSendQueue.cpp +++ b/srt/PacketSendQueue.cpp @@ -66,7 +66,7 @@ uint32_t PacketSendQueue::timeLantency() { } else { dur = first - last; } - if (dur > (0x01 << 31)) { + if (dur > ((uint32_t)0x01 << 31)) { TraceL << "cycle timeLantency " << dur; dur = 0xffffffff - dur; } diff --git a/srt/PacketSendQueue.hpp b/srt/PacketSendQueue.hpp index 86fa86f1..df9dc815 100644 --- a/srt/PacketSendQueue.hpp +++ b/srt/PacketSendQueue.hpp @@ -16,9 +16,11 @@ public: ~PacketSendQueue() = default; bool drop(uint32_t num); bool inputPacket(DataPacket::Ptr pkt); - std::list findPacketBySeq(uint32_t start,uint32_t end); + std::list findPacketBySeq(uint32_t start, uint32_t end); + private: uint32_t timeLantency(); + private: std::list _pkt_cache; uint32_t _pkt_cap; diff --git a/srt/SrtSession.cpp b/srt/SrtSession.cpp index fe8cfc13..c828fa13 100644 --- a/srt/SrtSession.cpp +++ b/srt/SrtSession.cpp @@ -10,10 +10,10 @@ using namespace mediakit; SrtSession::SrtSession(const Socket::Ptr &sock) : UdpSession(sock) { socklen_t addr_len = sizeof(_peer_addr); - memset(&_peer_addr,0,addr_len); - //TraceL<<"before addr len "<rawFD(), (struct sockaddr *)&_peer_addr, &addr_len); - //TraceL<<"after addr len "<data(); + uint8_t *data = (uint8_t *)buffer->data(); size_t size = buffer->size(); - if(DataPacket::isDataPacket(data,size)){ - uint32_t socket_id = DataPacket::getSocketID(data,size); + if (DataPacket::isDataPacket(data, size)) { + uint32_t socket_id = DataPacket::getSocketID(data, size); auto trans = SrtTransportManager::Instance().getItem(std::to_string(socket_id)); return trans ? trans->getPoller() : nullptr; } - if(HandshakePacket::isHandshakePacket(data,size)){ - auto type = HandshakePacket::getHandshakeType(data,size); - if(type == HandshakePacket::HS_TYPE_INDUCTION){ + if (HandshakePacket::isHandshakePacket(data, size)) { + auto type = HandshakePacket::getHandshakeType(data, size); + if (type == HandshakePacket::HS_TYPE_INDUCTION) { // 握手第一阶段 return nullptr; - }else if(type == HandshakePacket::HS_TYPE_CONCLUSION){ + } else if (type == HandshakePacket::HS_TYPE_CONCLUSION) { // 握手第二阶段 - uint32_t sync_cookie = HandshakePacket::getSynCookie(data,size); + uint32_t sync_cookie = HandshakePacket::getSynCookie(data, size); auto trans = SrtTransportManager::Instance().getHandshakeItem(std::to_string(sync_cookie)); return trans ? trans->getPoller() : nullptr; - }else{ - WarnL<<" not reach there"; + } else { + WarnL << " not reach there"; } - }else{ - uint32_t socket_id = ControlPacket::getSocketID(data,size); + } else { + uint32_t socket_id = ControlPacket::getSocketID(data, size); auto trans = SrtTransportManager::Instance().getItem(std::to_string(socket_id)); return trans ? trans->getPoller() : nullptr; } return nullptr; } -void SrtSession::attachServer(const toolkit::Server &server){ - SockUtil::setRecvBuf(getSock()->rawFD(),1024 * 1024); +void SrtSession::attachServer(const toolkit::Server &server) { + SockUtil::setRecvBuf(getSock()->rawFD(), 1024 * 1024); } void SrtSession::onRecv(const Buffer::Ptr &buffer) { - uint8_t* data = (uint8_t*)buffer->data(); + uint8_t *data = (uint8_t *)buffer->data(); size_t size = buffer->size(); if (_find_transport) { @@ -64,10 +64,10 @@ void SrtSession::onRecv(const Buffer::Ptr &buffer) { if (DataPacket::isDataPacket(data, size)) { uint32_t socket_id = DataPacket::getSocketID(data, size); auto trans = SrtTransportManager::Instance().getItem(std::to_string(socket_id)); - if(trans){ + if (trans) { _transport = std::move(trans); - }else{ - WarnL<<" data packet not find transport "; + } else { + WarnL << " data packet not find transport "; } } @@ -92,24 +92,24 @@ void SrtSession::onRecv(const Buffer::Ptr &buffer) { } else { uint32_t socket_id = ControlPacket::getSocketID(data, size); auto trans = SrtTransportManager::Instance().getItem(std::to_string(socket_id)); - if(trans){ + if (trans) { _transport = std::move(trans); - }else{ + } else { WarnL << " not find transport"; } } - if(_transport){ + if (_transport) { _transport->setSession(shared_from_this()); } InfoP(this); } _ticker.resetTime(); - if(_transport){ - _transport->inputSockData(data,size,&_peer_addr); - }else{ - //WarnL<< "ingore data"; + if (_transport) { + _transport->inputSockData(data, size, &_peer_addr); + } else { + // WarnL<< "ingore data"; } } @@ -122,18 +122,20 @@ void SrtSession::onError(const SockException &err) { if (!_transport) { return; } - + // 防止互相引用导致不释放 auto transport = std::move(_transport); - getPoller()->async([transport,err] { - //延时减引用,防止使用transport对象时,销毁对象 - transport->onShutdown(err); - }, false); + getPoller()->async( + [transport, err] { + //延时减引用,防止使用transport对象时,销毁对象 + transport->onShutdown(err); + }, + false); } void SrtSession::onManager() { GET_CONFIG(float, timeoutSec, kTimeOutSec); - if (_ticker.elapsedTime() > timeoutSec*1000) { + if (_ticker.elapsedTime() > timeoutSec * 1000) { shutdown(SockException(Err_timeout, "srt connection timeout")); return; } diff --git a/srt/SrtSession.hpp b/srt/SrtSession.hpp index 401aae3c..342a4a91 100644 --- a/srt/SrtSession.hpp +++ b/srt/SrtSession.hpp @@ -24,8 +24,7 @@ private: Ticker _ticker; struct sockaddr_storage _peer_addr; SrtTransport::Ptr _transport; - }; } // namespace SRT -#endif //ZLMEDIAKIT_SRT_SESSION_H \ No newline at end of file +#endif // ZLMEDIAKIT_SRT_SESSION_H \ No newline at end of file diff --git a/srt/SrtTransport.cpp b/srt/SrtTransport.cpp index e7a84bf3..0f9e1a7a 100644 --- a/srt/SrtTransport.cpp +++ b/srt/SrtTransport.cpp @@ -1,31 +1,31 @@ #include #include "Util/onceToken.h" -#include "SrtTransport.hpp" -#include "Packet.hpp" #include "Ack.hpp" +#include "Packet.hpp" +#include "SrtTransport.hpp" namespace SRT { #define SRT_FIELD "srt." -//srt 超时时间 -const std::string kTimeOutSec = SRT_FIELD"timeoutSec"; -//srt 单端口udp服务器 -const std::string kPort = SRT_FIELD"port"; +// srt 超时时间 +const std::string kTimeOutSec = SRT_FIELD "timeoutSec"; +// srt 单端口udp服务器 +const std::string kPort = SRT_FIELD "port"; -const std::string kLantencyMul = SRT_FIELD"lantencyMul"; +const std::string kLantencyMul = SRT_FIELD "lantencyMul"; -static std::atomic s_srt_socket_id_generate{125}; +static std::atomic s_srt_socket_id_generate { 125 }; //////////// SrtTransport ////////////////////////// SrtTransport::SrtTransport(const EventPoller::Ptr &poller) : _poller(poller) { - _start_timestamp = SteadyClock::now(); - _socket_id = s_srt_socket_id_generate.fetch_add(1);\ - _pkt_recv_rate_context = std::make_shared(_start_timestamp); - _recv_rate_context = std::make_shared(_start_timestamp); - _estimated_link_capacity_context = std::make_shared(_start_timestamp); - } + _start_timestamp = SteadyClock::now(); + _socket_id = s_srt_socket_id_generate.fetch_add(1); + _pkt_recv_rate_context = std::make_shared(_start_timestamp); + _recv_rate_context = std::make_shared(_start_timestamp); + _estimated_link_capacity_context = std::make_shared(_start_timestamp); +} -SrtTransport::~SrtTransport(){ - TraceL<<" "; +SrtTransport::~SrtTransport() { + TraceL << " "; } const EventPoller::Ptr &SrtTransport::getPoller() const { return _poller; @@ -44,20 +44,20 @@ const Session::Ptr &SrtTransport::getSession() const { return _selected_session; } -void SrtTransport::switchToOtherTransport(uint8_t *buf, int len,uint32_t socketid, struct sockaddr_storage *addr){ +void SrtTransport::switchToOtherTransport(uint8_t *buf, int len, uint32_t socketid, struct sockaddr_storage *addr) { BufferRaw::Ptr tmp = BufferRaw::create(); struct sockaddr_storage tmp_addr = *addr; - tmp->assign((char*)buf,len); + tmp->assign((char *)buf, len); auto trans = SrtTransportManager::Instance().getItem(std::to_string(socketid)); - if(trans){ - trans->getPoller()->async([tmp,tmp_addr,trans]{ - trans->inputSockData((uint8_t*)tmp->data(),tmp->size(),(struct sockaddr_storage*)&tmp_addr); + if (trans) { + trans->getPoller()->async([tmp, tmp_addr, trans] { + trans->inputSockData((uint8_t *)tmp->data(), tmp->size(), (struct sockaddr_storage *)&tmp_addr); }); } } void SrtTransport::inputSockData(uint8_t *buf, int len, struct sockaddr_storage *addr) { - using srt_control_handler = void (SrtTransport::*)(uint8_t* buf,int len,struct sockaddr_storage *addr); + using srt_control_handler = void (SrtTransport::*)(uint8_t * buf, int len, struct sockaddr_storage *addr); static std::unordered_map s_control_functions; static onceToken token([]() { s_control_functions.emplace(ControlPacket::HANDSHAKE, &SrtTransport::handleHandshake); @@ -74,23 +74,23 @@ void SrtTransport::inputSockData(uint8_t *buf, int len, struct sockaddr_storage _now = SteadyClock::now(); // 处理srt数据 if (DataPacket::isDataPacket(buf, len)) { - uint32_t socketId = DataPacket::getSocketID(buf,len); - if(socketId == _socket_id){ + uint32_t socketId = DataPacket::getSocketID(buf, len); + if (socketId == _socket_id) { _pkt_recv_rate_context->inputPacket(_now); _estimated_link_capacity_context->inputPacket(_now); _recv_rate_context->inputPacket(_now, len); handleDataPacket(buf, len, addr); - }else{ - switchToOtherTransport(buf,len,socketId,addr); + } else { + switchToOtherTransport(buf, len, socketId, addr); } } else { if (ControlPacket::isControlPacket(buf, len)) { - uint32_t socketId = ControlPacket::getSocketID(buf,len); - uint16_t type = ControlPacket::getControlType(buf,len); - if(type != ControlPacket::HANDSHAKE && socketId != _socket_id && _socket_id != 0){ + uint32_t socketId = ControlPacket::getSocketID(buf, len); + uint16_t type = ControlPacket::getControlType(buf, len); + if (type != ControlPacket::HANDSHAKE && socketId != _socket_id && _socket_id != 0) { // socket id not same - switchToOtherTransport(buf,len,socketId,addr); + switchToOtherTransport(buf, len, socketId, addr); return; } _pkt_recv_rate_context->inputPacket(_now); @@ -99,10 +99,10 @@ void SrtTransport::inputSockData(uint8_t *buf, int len, struct sockaddr_storage auto it = s_control_functions.find(type); if (it == s_control_functions.end()) { - WarnL<<" not support type ignore" << ControlPacket::getControlType(buf,len); + WarnL << " not support type ignore" << ControlPacket::getControlType(buf, len); return; - }else{ - (this->*(it->second))(buf,len,addr); + } else { + (this->*(it->second))(buf, len, addr); } } else { // not reach @@ -119,7 +119,7 @@ void SrtTransport::handleHandshakeInduction(HandshakePacket &pkt, struct sockadd sendControlPacket(_handleshake_res, true); return; } - _induction_ts = _now; + _induction_ts = _now; _start_timestamp = _now; _init_seq_number = pkt.initial_packet_sequence_number; _max_window_size = pkt.max_flow_window_size; @@ -147,8 +147,8 @@ void SrtTransport::handleHandshakeInduction(HandshakePacket &pkt, struct sockadd sendControlPacket(res, true); } void SrtTransport::handleHandshakeConclusion(HandshakePacket &pkt, struct sockaddr_storage *addr) { - if(!_handleshake_res){ - ErrorL<<"must Induction Phase for handleshake "; + if (!_handleshake_res) { + ErrorL << "must Induction Phase for handleshake "; return; } @@ -157,21 +157,21 @@ void SrtTransport::handleHandshakeConclusion(HandshakePacket &pkt, struct sockad HSExtMessage::Ptr req; HSExtStreamID::Ptr sid; uint32_t srt_flag = 0xbf; - uint16_t delay = DurationCountMicroseconds(_now - _induction_ts)*getLantencyMul()/1000; + uint16_t delay = DurationCountMicroseconds(_now - _induction_ts) * getLantencyMul() / 1000; for (auto ext : pkt.ext_list) { - //TraceL << getIdentifier() << " ext " << ext->dump(); + // TraceL << getIdentifier() << " ext " << ext->dump(); if (!req) { req = std::dynamic_pointer_cast(ext); } - if(!sid){ + if (!sid) { sid = std::dynamic_pointer_cast(ext); } } - if(sid){ + if (sid) { _stream_id = sid->streamid; } - if(req){ + if (req) { srt_flag = req->srt_flag; delay = delay <= req->recv_tsbpd_delay ? req->recv_tsbpd_delay : delay; } @@ -200,167 +200,166 @@ void SrtTransport::handleHandshakeConclusion(HandshakePacket &pkt, struct sockad unregisterSelfHandshake(); registerSelf(); sendControlPacket(res, true); - TraceL<<" buf size = "<max_flow_window_size<<" init seq ="<<_init_seq_number<<" lantency="<(res->max_flow_window_size,_init_seq_number, delay*1e3); - _send_buf = std::make_shared(res->max_flow_window_size, delay*1e3); + TraceL << " buf size = " << res->max_flow_window_size << " init seq =" << _init_seq_number + << " lantency=" << delay; + _recv_buf = std::make_shared(res->max_flow_window_size, _init_seq_number, delay * 1e3); + _send_buf = std::make_shared(res->max_flow_window_size, delay * 1e3); _send_packet_seq_number = _init_seq_number; _buf_delay = delay; - onHandShakeFinished(_stream_id,addr); + onHandShakeFinished(_stream_id, addr); } else { TraceL << getIdentifier() << " CONCLUSION handle repeate "; sendControlPacket(_handleshake_res, true); } _last_ack_pkt_seq_num = _init_seq_number; } -void SrtTransport::handleHandshake(uint8_t *buf, int len, struct sockaddr_storage *addr){ +void SrtTransport::handleHandshake(uint8_t *buf, int len, struct sockaddr_storage *addr) { HandshakePacket pkt; - assert(pkt.loadFromData(buf,len)); + assert(pkt.loadFromData(buf, len)); - if(pkt.handshake_type == HandshakePacket::HS_TYPE_INDUCTION){ - handleHandshakeInduction(pkt,addr); - }else if(pkt.handshake_type == HandshakePacket::HS_TYPE_CONCLUSION){ - handleHandshakeConclusion(pkt,addr); - }else{ - WarnL<<" not support handshake type = "<< pkt.handshake_type; + if (pkt.handshake_type == HandshakePacket::HS_TYPE_INDUCTION) { + handleHandshakeInduction(pkt, addr); + } else if (pkt.handshake_type == HandshakePacket::HS_TYPE_CONCLUSION) { + handleHandshakeConclusion(pkt, addr); + } else { + WarnL << " not support handshake type = " << pkt.handshake_type; } _ack_ticker.resetTime(_now); _nak_ticker.resetTime(_now); } -void SrtTransport::handleKeeplive(uint8_t *buf, int len, struct sockaddr_storage *addr){ - //TraceL; +void SrtTransport::handleKeeplive(uint8_t *buf, int len, struct sockaddr_storage *addr) { + // TraceL; sendKeepLivePacket(); } - void SrtTransport::sendKeepLivePacket(){ +void SrtTransport::sendKeepLivePacket() { KeepLivePacket::Ptr pkt = std::make_shared(); pkt->dst_socket_id = _peer_socket_id; - pkt->timestamp = DurationCountMicroseconds(_now -_start_timestamp); + pkt->timestamp = DurationCountMicroseconds(_now - _start_timestamp); pkt->storeToData(); - sendControlPacket(pkt,true); - } -void SrtTransport::handleACK(uint8_t *buf, int len, struct sockaddr_storage *addr){ - //TraceL; + sendControlPacket(pkt, true); +} +void SrtTransport::handleACK(uint8_t *buf, int len, struct sockaddr_storage *addr) { + // TraceL; ACKPacket ack; - if(!ack.loadFromData(buf,len)){ + if (!ack.loadFromData(buf, len)) { return; } ACKACKPacket::Ptr pkt = std::make_shared(); pkt->dst_socket_id = _peer_socket_id; - pkt->timestamp = DurationCountMicroseconds(_now -_start_timestamp); + pkt->timestamp = DurationCountMicroseconds(_now - _start_timestamp); pkt->ack_number = ack.ack_number; pkt->storeToData(); _send_buf->drop(ack.last_ack_pkt_seq_number); - sendControlPacket(pkt,true); - //TraceL<<"ack number "<(); pkt->dst_socket_id = _peer_socket_id; - pkt->timestamp = DurationCountMicroseconds(_now -_start_timestamp); + pkt->timestamp = DurationCountMicroseconds(_now - _start_timestamp); pkt->first_pkt_seq_num = first; pkt->last_pkt_seq_num = last; pkt->storeToData(); - sendControlPacket(pkt,true); + sendControlPacket(pkt, true); } -void SrtTransport::handleNAK(uint8_t *buf, int len, struct sockaddr_storage *addr){ - //TraceL; +void SrtTransport::handleNAK(uint8_t *buf, int len, struct sockaddr_storage *addr) { + // TraceL; NAKPacket pkt; - pkt.loadFromData(buf,len); + pkt.loadFromData(buf, len); bool empty = false; bool flush = false; - for(auto it : pkt.lost_list){ - if(pkt.lost_list.back() == it){ + for (auto it : pkt.lost_list) { + if (pkt.lost_list.back() == it) { flush = true; } empty = true; - auto re_list = _send_buf->findPacketBySeq(it.first,it.second-1); - for(auto pkt : re_list){ + auto re_list = _send_buf->findPacketBySeq(it.first, it.second - 1); + for (auto pkt : re_list) { pkt->R = 1; pkt->storeToHeader(); - sendPacket(pkt,flush); + sendPacket(pkt, flush); empty = false; } - if(empty){ - sendMsgDropReq(it.first,it.second-1); + if (empty) { + sendMsgDropReq(it.first, it.second - 1); } } } -void SrtTransport::handleCongestionWarning(uint8_t *buf, int len, struct sockaddr_storage *addr){ +void SrtTransport::handleCongestionWarning(uint8_t *buf, int len, struct sockaddr_storage *addr) { TraceL; } -void SrtTransport::handleShutDown(uint8_t *buf, int len, struct sockaddr_storage *addr){ +void SrtTransport::handleShutDown(uint8_t *buf, int len, struct sockaddr_storage *addr) { TraceL; onShutdown(SockException(Err_shutdown, "peer close connection")); } -void SrtTransport::handleDropReq(uint8_t *buf, int len, struct sockaddr_storage *addr){ +void SrtTransport::handleDropReq(uint8_t *buf, int len, struct sockaddr_storage *addr) { MsgDropReqPacket pkt; - pkt.loadFromData(buf,len); + pkt.loadFromData(buf, len); std::list list; - //TraceL<<"drop "<drop(pkt.first_pkt_seq_num,pkt.last_pkt_seq_num,list); - if(list.empty()){ + // TraceL<<"drop "<drop(pkt.first_pkt_seq_num, pkt.last_pkt_seq_num, list); + if (list.empty()) { return; } - for(auto data : list){ + for (auto data : list) { onSRTData(std::move(data)); } - auto nak_interval = (_rtt+_rtt_variance*4)/2; - if(nak_interval <= 20*1000){ - nak_interval = 20*1000; + auto nak_interval = (_rtt + _rtt_variance * 4) / 2; + if (nak_interval <= 20 * 1000) { + nak_interval = 20 * 1000; } - if(_nak_ticker.elapsedTime(_now)>nak_interval){ + if (_nak_ticker.elapsedTime(_now) > nak_interval) { auto lost = _recv_buf->getLostSeq(); - if(!lost.empty()){ - sendNAKPacket(lost); + if (!lost.empty()) { + sendNAKPacket(lost); } _nak_ticker.resetTime(_now); } - if(_ack_ticker.elapsedTime(_now)>10*1000){ + if (_ack_ticker.elapsedTime(_now) > 10 * 1000) { _light_ack_pkt_count = 0; _ack_ticker.resetTime(_now); - // send a ack per 10 ms for receiver + // send a ack per 10 ms for receiver sendACKPacket(); - }else{ - if(_light_ack_pkt_count >= 64){ + } else { + if (_light_ack_pkt_count >= 64) { // for high bitrate stream send light ack - // TODO + // TODO sendLightACKPacket(); - TraceL<<"send light ack"; + TraceL << "send light ack"; } _light_ack_pkt_count = 0; } _light_ack_pkt_count++; - } -void SrtTransport::handleUserDefinedType(uint8_t *buf, int len, struct sockaddr_storage *addr){ +void SrtTransport::handleUserDefinedType(uint8_t *buf, int len, struct sockaddr_storage *addr) { TraceL; } -void SrtTransport::handleACKACK(uint8_t *buf, int len, struct sockaddr_storage *addr){ - //TraceL; +void SrtTransport::handleACKACK(uint8_t *buf, int len, struct sockaddr_storage *addr) { + // TraceL; ACKACKPacket::Ptr pkt = std::make_shared(); - pkt->loadFromData(buf,len); + pkt->loadFromData(buf, len); uint32_t rtt = DurationCountMicroseconds(_now - _ack_send_timestamp[pkt->ack_number]); - _rtt_variance = (3*_rtt_variance+abs((long)_rtt - (long)rtt))/4; - _rtt = (7*rtt+_rtt)/8; + _rtt_variance = (3 * _rtt_variance + abs((long)_rtt - (long)rtt)) / 4; + _rtt = (7 * rtt + _rtt) / 8; - - //TraceL<<" rtt:"<<_rtt<<" rtt variance:"<<_rtt_variance; + // TraceL<<" rtt:"<<_rtt<<" rtt variance:"<<_rtt_variance; _ack_send_timestamp.erase(pkt->ack_number); } -void SrtTransport::handlePeerError(uint8_t *buf, int len, struct sockaddr_storage *addr){ +void SrtTransport::handlePeerError(uint8_t *buf, int len, struct sockaddr_storage *addr) { TraceL; } void SrtTransport::sendACKPacket() { - ACKPacket::Ptr pkt=std::make_shared(); + ACKPacket::Ptr pkt = std::make_shared(); pkt->dst_socket_id = _peer_socket_id; pkt->timestamp = DurationCountMicroseconds(_now - _start_timestamp); pkt->ack_number = ++_ack_number_count; @@ -373,13 +372,13 @@ void SrtTransport::sendACKPacket() { pkt->recv_rate = _recv_rate_context->getRecvRate(); pkt->storeToData(); _ack_send_timestamp[pkt->ack_number] = _now; - _last_ack_pkt_seq_num = pkt->last_ack_pkt_seq_number; - sendControlPacket(pkt,true); - //TraceL<<"send ack "<dump(); + _last_ack_pkt_seq_num = pkt->last_ack_pkt_seq_number; + sendControlPacket(pkt, true); + // TraceL<<"send ack "<dump(); } void SrtTransport::sendLightACKPacket() { - ACKPacket::Ptr pkt=std::make_shared(); - + ACKPacket::Ptr pkt = std::make_shared(); + pkt->dst_socket_id = _peer_socket_id; pkt->timestamp = DurationCountMicroseconds(_now - _start_timestamp); pkt->ack_number = 0; @@ -392,11 +391,11 @@ void SrtTransport::sendLightACKPacket() { pkt->recv_rate = 0; pkt->storeToData(); _last_ack_pkt_seq_num = pkt->last_ack_pkt_seq_number; - sendControlPacket(pkt,true); - TraceL<<"send ack "<dump(); + sendControlPacket(pkt, true); + TraceL << "send ack " << pkt->dump(); } -void SrtTransport::sendNAKPacket(std::list& lost_list){ +void SrtTransport::sendNAKPacket(std::list &lost_list) { NAKPacket::Ptr pkt = std::make_shared(); pkt->dst_socket_id = _peer_socket_id; @@ -405,112 +404,111 @@ void SrtTransport::sendNAKPacket(std::list& lost_list){ pkt->storeToData(); - //TraceL<<"send NAK "<dump(); - sendControlPacket(pkt,true); + // TraceL<<"send NAK "<dump(); + sendControlPacket(pkt, true); } -void SrtTransport::sendShutDown(){ +void SrtTransport::sendShutDown() { ShutDownPacket::Ptr pkt = std::make_shared(); pkt->dst_socket_id = _peer_socket_id; pkt->timestamp = DurationCountMicroseconds(_now - _start_timestamp); pkt->storeToData(); - sendControlPacket(pkt,true); + sendControlPacket(pkt, true); } -void SrtTransport::handleDataPacket(uint8_t *buf, int len, struct sockaddr_storage *addr){ +void SrtTransport::handleDataPacket(uint8_t *buf, int len, struct sockaddr_storage *addr) { DataPacket::Ptr pkt = std::make_shared(); - pkt->loadFromData(buf,len); + pkt->loadFromData(buf, len); - pkt->get_ts = _now; std::list list; //TraceL<<" seq="<< pkt->packet_seq_number<<" ts="<timestamp<<" size="<payloadSize()<<\ //" PP="<<(int)pkt->PP<<" O="<<(int)pkt->O<<" kK="<<(int)pkt->KK<<" R="<<(int)pkt->R; - _recv_buf->inputPacket(pkt,list); - for(auto data : list){ + _recv_buf->inputPacket(pkt, list); + for (auto data : list) { onSRTData(std::move(data)); } - auto nak_interval = (_rtt+_rtt_variance*4)/2; - if(nak_interval <= 20*1000){ - nak_interval = 20*1000; + auto nak_interval = (_rtt + _rtt_variance * 4) / 2; + if (nak_interval <= 20 * 1000) { + nak_interval = 20 * 1000; } - if(list.empty()){ - //TraceL<<_recv_buf->dump()<<" nake interval:"<dump()<<" nake interval:"<nak_interval){ + if (_nak_ticker.elapsedTime(_now) > nak_interval) { auto lost = _recv_buf->getLostSeq(); - if(!lost.empty()){ - sendNAKPacket(lost); - //TraceL<<"send NAK"; - }else{ - //TraceL<<"lost is empty"; + if (!lost.empty()) { + sendNAKPacket(lost); + // TraceL<<"send NAK"; + } else { + // TraceL<<"lost is empty"; } _nak_ticker.resetTime(_now); } - if(_ack_ticker.elapsedTime(_now)>10*1000){ + if (_ack_ticker.elapsedTime(_now) > 10 * 1000) { _light_ack_pkt_count = 0; _ack_ticker.resetTime(_now); - // send a ack per 10 ms for receiver + // send a ack per 10 ms for receiver sendACKPacket(); - }else{ - if(_light_ack_pkt_count >= 64){ + } else { + if (_light_ack_pkt_count >= 64) { // for high bitrate stream send light ack - // TODO + // TODO sendLightACKPacket(); - TraceL<<"send light ack"; + TraceL << "send light ack"; } _light_ack_pkt_count = 0; } _light_ack_pkt_count++; - //bufCheckInterval(); + // bufCheckInterval(); } -void SrtTransport::sendDataPacket(DataPacket::Ptr pkt,char* buf,int len, bool flush) { - pkt->storeToData((uint8_t*)buf,len); - sendPacket(pkt,flush); +void SrtTransport::sendDataPacket(DataPacket::Ptr pkt, char *buf, int len, bool flush) { + pkt->storeToData((uint8_t *)buf, len); + sendPacket(pkt, flush); _send_buf->inputPacket(pkt); } -void SrtTransport::sendControlPacket(ControlPacket::Ptr pkt, bool flush) { - sendPacket(pkt,flush); +void SrtTransport::sendControlPacket(ControlPacket::Ptr pkt, bool flush) { + sendPacket(pkt, flush); } -void SrtTransport::sendPacket(Buffer::Ptr pkt,bool flush){ - if(_selected_session){ - auto tmp = _packet_pool.obtain2(); - tmp->assign(pkt->data(),pkt->size()); - _selected_session->setSendFlushFlag(flush); - _selected_session->send(std::move(tmp)); - }else{ - WarnL<<"not reach this"; +void SrtTransport::sendPacket(Buffer::Ptr pkt, bool flush) { + if (_selected_session) { + auto tmp = _packet_pool.obtain2(); + tmp->assign(pkt->data(), pkt->size()); + _selected_session->setSendFlushFlag(flush); + _selected_session->send(std::move(tmp)); + } else { + WarnL << "not reach this"; } } -std::string SrtTransport::getIdentifier(){ +std::string SrtTransport::getIdentifier() { return _selected_session ? _selected_session->getIdentifier() : ""; } -void SrtTransport::registerSelfHandshake() { - SrtTransportManager::Instance().addHandshakeItem(std::to_string(_sync_cookie),shared_from_this()); +void SrtTransport::registerSelfHandshake() { + SrtTransportManager::Instance().addHandshakeItem(std::to_string(_sync_cookie), shared_from_this()); } -void SrtTransport::unregisterSelfHandshake() { - if(_sync_cookie == 0){ +void SrtTransport::unregisterSelfHandshake() { + if (_sync_cookie == 0) { return; } SrtTransportManager::Instance().removeHandshakeItem(std::to_string(_sync_cookie)); } void SrtTransport::registerSelf() { - if(_socket_id == 0){ + if (_socket_id == 0) { return; } - SrtTransportManager::Instance().addItem(std::to_string(_socket_id),shared_from_this()); - + SrtTransportManager::Instance().addItem(std::to_string(_socket_id), shared_from_this()); } -void SrtTransport::unregisterSelf() { +void SrtTransport::unregisterSelf() { SrtTransportManager::Instance().removeItem(std::to_string(_socket_id)); } -void SrtTransport::onShutdown(const SockException &ex){ +void SrtTransport::onShutdown(const SockException &ex) { sendShutDown(); WarnL << ex.what(); unregisterSelfHandshake(); @@ -522,23 +520,23 @@ void SrtTransport::onShutdown(const SockException &ex){ } } } -size_t SrtTransport::getPayloadSize(){ - size_t ret = (_mtu - 28 -16)/188*188; +size_t SrtTransport::getPayloadSize() { + size_t ret = (_mtu - 28 - 16) / 188 * 188; return ret; } -void SrtTransport::onSendTSData(const Buffer::Ptr &buffer, bool flush){ - //TraceL; +void SrtTransport::onSendTSData(const Buffer::Ptr &buffer, bool flush) { + // TraceL; DataPacket::Ptr pkt; size_t payloadSize = getPayloadSize(); - size_t size = buffer->size(); - char* ptr = buffer->data(); - char* end = buffer->data()+size; + size_t size = buffer->size(); + char *ptr = buffer->data(); + char *end = buffer->data() + size; - while(ptr < end && size >=payloadSize){ + while (ptr < end && size >= payloadSize) { pkt = std::make_shared(); pkt->f = 0; - pkt->packet_seq_number = _send_packet_seq_number&0x7fffffff; - _send_packet_seq_number = (_send_packet_seq_number+1)&0x7fffffff; + pkt->packet_seq_number = _send_packet_seq_number & 0x7fffffff; + _send_packet_seq_number = (_send_packet_seq_number + 1) & 0x7fffffff; pkt->PP = 3; pkt->O = 0; pkt->KK = 0; @@ -546,16 +544,16 @@ void SrtTransport::onSendTSData(const Buffer::Ptr &buffer, bool flush){ pkt->msg_number = _send_msg_number++; pkt->dst_socket_id = _peer_socket_id; pkt->timestamp = DurationCountMicroseconds(SteadyClock::now() - _start_timestamp); - sendDataPacket(pkt,ptr,(int)payloadSize,flush); + sendDataPacket(pkt, ptr, (int)payloadSize, flush); ptr += payloadSize; size -= payloadSize; } - if(size >0 && ptr 0 && ptr < end) { pkt = std::make_shared(); pkt->f = 0; - pkt->packet_seq_number = _send_packet_seq_number&0x7fffffff; - _send_packet_seq_number = (_send_packet_seq_number+1)&0x7fffffff; + pkt->packet_seq_number = _send_packet_seq_number & 0x7fffffff; + _send_packet_seq_number = (_send_packet_seq_number + 1) & 0x7fffffff; pkt->PP = 3; pkt->O = 0; pkt->KK = 0; @@ -563,9 +561,8 @@ void SrtTransport::onSendTSData(const Buffer::Ptr &buffer, bool flush){ pkt->msg_number = _send_msg_number++; pkt->dst_socket_id = _peer_socket_id; pkt->timestamp = DurationCountMicroseconds(SteadyClock::now() - _start_timestamp); - sendDataPacket(pkt,ptr,(int)size,flush); + sendDataPacket(pkt, ptr, (int)size, flush); } - } //////////// SrtTransportManager ////////////////////////// SrtTransportManager &SrtTransportManager::Instance() { @@ -600,7 +597,7 @@ void SrtTransportManager::addHandshakeItem(const std::string &key, const SrtTran _handshake_map[key] = ptr; } void SrtTransportManager::removeHandshakeItem(const std::string &key) { - std::lock_guard lck(_handshake_mtx); + std::lock_guard lck(_handshake_mtx); _handshake_map.erase(key); } SrtTransport::Ptr SrtTransportManager::getHandshakeItem(const std::string &key) { @@ -615,5 +612,4 @@ SrtTransport::Ptr SrtTransportManager::getHandshakeItem(const std::string &key) return it->second.lock(); } - } // namespace SRT \ No newline at end of file diff --git a/srt/SrtTransport.hpp b/srt/SrtTransport.hpp index c055dd62..64dc566d 100644 --- a/srt/SrtTransport.hpp +++ b/srt/SrtTransport.hpp @@ -1,10 +1,10 @@ #ifndef ZLMEDIAKIT_SRT_TRANSPORT_H #define ZLMEDIAKIT_SRT_TRANSPORT_H -#include +#include #include #include -#include +#include #include "Network/Session.h" #include "Poller/EventPoller.h" @@ -43,26 +43,25 @@ public: virtual void onSendTSData(const Buffer::Ptr &buffer, bool flush); std::string getIdentifier(); - + void unregisterSelfHandshake(); void unregisterSelf(); + protected: - virtual void onHandShakeFinished(std::string& streamid,struct sockaddr_storage *addr){}; - virtual void onSRTData(DataPacket::Ptr pkt){}; + virtual void onHandShakeFinished(std::string &streamid, struct sockaddr_storage *addr) {}; + virtual void onSRTData(DataPacket::Ptr pkt) {}; virtual void onShutdown(const SockException &ex); - virtual bool isPusher(){ - return true; - }; + virtual bool isPusher() { return true; }; private: void registerSelfHandshake(); void registerSelf(); - void switchToOtherTransport(uint8_t *buf, int len,uint32_t socketid, struct sockaddr_storage *addr); + void switchToOtherTransport(uint8_t *buf, int len, uint32_t socketid, struct sockaddr_storage *addr); void handleHandshake(uint8_t *buf, int len, struct sockaddr_storage *addr); - void handleHandshakeInduction(HandshakePacket& pkt,struct sockaddr_storage *addr); - void handleHandshakeConclusion(HandshakePacket& pkt,struct sockaddr_storage *addr); + void handleHandshakeInduction(HandshakePacket &pkt, struct sockaddr_storage *addr); + void handleHandshakeConclusion(HandshakePacket &pkt, struct sockaddr_storage *addr); void handleKeeplive(uint8_t *buf, int len, struct sockaddr_storage *addr); void handleACK(uint8_t *buf, int len, struct sockaddr_storage *addr); @@ -74,27 +73,27 @@ private: void handleUserDefinedType(uint8_t *buf, int len, struct sockaddr_storage *addr); void handlePeerError(uint8_t *buf, int len, struct sockaddr_storage *addr); void handleDataPacket(uint8_t *buf, int len, struct sockaddr_storage *addr); - - void sendNAKPacket(std::list& lost_list); + + void sendNAKPacket(std::list &lost_list); void sendACKPacket(); void sendLightACKPacket(); void sendKeepLivePacket(); void sendShutDown(); - void sendMsgDropReq(uint32_t first ,uint32_t last); + void sendMsgDropReq(uint32_t first, uint32_t last); size_t getPayloadSize(); + protected: - void sendDataPacket(DataPacket::Ptr pkt,char* buf,int len,bool flush = false); - void sendControlPacket(ControlPacket::Ptr pkt,bool flush = true); - virtual void sendPacket(Buffer::Ptr pkt,bool flush = true); - virtual int getLantencyMul(){ - return 4; - }; + void sendDataPacket(DataPacket::Ptr pkt, char *buf, int len, bool flush = false); + void sendControlPacket(ControlPacket::Ptr pkt, bool flush = true); + virtual void sendPacket(Buffer::Ptr pkt, bool flush = true); + virtual int getLantencyMul() { return 4; }; + private: //当前选中的udp链接 Session::Ptr _selected_session; //链接迁移前后使用过的udp链接 - std::unordered_map > _history_sessions; + std::unordered_map> _history_sessions; EventPoller::Ptr _poller; @@ -109,7 +108,7 @@ private: uint32_t _mtu = 1500; uint32_t _max_window_size = 8192; - uint32_t _init_seq_number = 0; + uint32_t _init_seq_number = 0; std::string _stream_id; uint32_t _sync_cookie = 0; @@ -119,13 +118,13 @@ private: PacketSendQueue::Ptr _send_buf; uint32_t _buf_delay = 120; PacketQueue::Ptr _recv_buf; - uint32_t _rtt = 100*1000; - uint32_t _rtt_variance =50*1000; + uint32_t _rtt = 100 * 1000; + uint32_t _rtt_variance = 50 * 1000; uint32_t _light_ack_pkt_count = 0; uint32_t _ack_number_count = 0; uint32_t _last_ack_pkt_seq_num = 0; UTicker _ack_ticker; - std::map _ack_send_timestamp; + std::map _ack_send_timestamp; std::shared_ptr _pkt_recv_rate_context; std::shared_ptr _estimated_link_capacity_context; @@ -137,7 +136,6 @@ private: HandshakePacket::Ptr _handleshake_res; ResourcePool _packet_pool; - }; class SrtTransportManager { @@ -150,6 +148,7 @@ public: void addHandshakeItem(const std::string &key, const SrtTransport::Ptr &ptr); void removeHandshakeItem(const std::string &key); SrtTransport::Ptr getHandshakeItem(const std::string &key); + private: SrtTransportManager() = default; diff --git a/srt/SrtTransportImp.cpp b/srt/SrtTransportImp.cpp index 9425e2e7..09c37ef2 100644 --- a/srt/SrtTransportImp.cpp +++ b/srt/SrtTransportImp.cpp @@ -10,65 +10,64 @@ SrtTransportImp::SrtTransportImp(const EventPoller::Ptr &poller) SrtTransportImp::~SrtTransportImp() { InfoP(this); uint64_t duration = _alive_ticker.createdTime() / 1000; - WarnP(this) << (_is_pusher ? "srt 推流器(" : "srt 播放器(") - << _media_info._vhost << "/" - << _media_info._app << "/" - << _media_info._streamid - << ")断开,耗时(s):" << duration; + WarnP(this) << (_is_pusher ? "srt 推流器(" : "srt 播放器(") << _media_info._vhost << "/" << _media_info._app << "/" + << _media_info._streamid << ")断开,耗时(s):" << duration; //流量统计事件广播 GET_CONFIG(uint32_t, iFlowThreshold, General::kFlowThreshold); if (_total_bytes >= iFlowThreshold * 1024) { - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _total_bytes, duration, false, static_cast(*this)); + NoticeCenter::Instance().emitEvent( + Broadcast::kBroadcastFlowReport, _media_info, _total_bytes, duration, false, + static_cast(*this)); } } -void SrtTransportImp::onHandShakeFinished(std::string &streamid,struct sockaddr_storage *addr) { - +void SrtTransportImp::onHandShakeFinished(std::string &streamid, struct sockaddr_storage *addr) { + // TODO parse streamid like this zlmediakit.com/live/test?token=1213444&type=push - if(!_addr){ + if (!_addr) { _addr.reset(new sockaddr_storage(*((sockaddr_storage *)addr))); } - _is_pusher = false; - TraceL<<" stream id "<input(reinterpret_cast(pkt->payloadData()), pkt->payloadSize()); - }else{ - WarnP(this)<<" not reach this"; + } else { + WarnP(this) << " not reach this"; } } void SrtTransportImp::onShutdown(const SockException &ex) { SrtTransport::onShutdown(ex); } -bool SrtTransportImp::close(mediakit::MediaSource &sender, bool force){ - if (!force && totalReaderCount(sender)) { +bool SrtTransportImp::close(mediakit::MediaSource &sender, bool force) { + if (!force && totalReaderCount(sender)) { return false; } std::string err = StrPrinter << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" - << sender.getApp() << "/" << sender.getId() << " " << force; + << sender.getApp() << "/" << sender.getId() << " " << force; weak_ptr weak_self = static_pointer_cast(shared_from_this()); getPoller()->async([weak_self, err]() { auto strong_self = weak_self.lock(); @@ -81,20 +80,20 @@ bool SrtTransportImp::close(mediakit::MediaSource &sender, bool force){ return true; } // 播放总人数 -int SrtTransportImp::totalReaderCount(mediakit::MediaSource &sender){ +int SrtTransportImp::totalReaderCount(mediakit::MediaSource &sender) { return _muxer ? _muxer->totalReaderCount() : sender.readerCount(); } // 获取媒体源类型 -mediakit::MediaOriginType SrtTransportImp::getOriginType(mediakit::MediaSource &sender) const{ +mediakit::MediaOriginType SrtTransportImp::getOriginType(mediakit::MediaSource &sender) const { return MediaOriginType::srt_push; } // 获取媒体源url或者文件路径 -std::string SrtTransportImp::getOriginUrl(mediakit::MediaSource &sender) const{ +std::string SrtTransportImp::getOriginUrl(mediakit::MediaSource &sender) const { return _media_info._full_url; } // 获取媒体源客户端相关信息 -std::shared_ptr SrtTransportImp::getOriginSock(mediakit::MediaSource &sender) const{ - return static_pointer_cast(getSession()); +std::shared_ptr SrtTransportImp::getOriginSock(mediakit::MediaSource &sender) const { + return static_pointer_cast(getSession()); } void SrtTransportImp::emitOnPublish() { @@ -105,50 +104,51 @@ void SrtTransportImp::emitOnPublish() { return; } if (err.empty()) { - strong_self->_muxer = std::make_shared(strong_self->_media_info._vhost, - strong_self->_media_info._app, - strong_self->_media_info._streamid, 0.0f, - option); + strong_self->_muxer = std::make_shared( + strong_self->_media_info._vhost, strong_self->_media_info._app, strong_self->_media_info._streamid, + 0.0f, option); strong_self->_muxer->setMediaListener(strong_self); strong_self->doCachedFunc(); InfoP(strong_self) << "允许 srt 推流"; } else { WarnP(strong_self) << "禁止 srt 推流:" << err; - strong_self->onShutdown(SockException(Err_refused,err)); + strong_self->onShutdown(SockException(Err_refused, err)); } }; //触发推流鉴权事件 - auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish, MediaOriginType::srt_push, _media_info, invoker, static_cast(*this)); + auto flag = NoticeCenter::Instance().emitEvent( + Broadcast::kBroadcastMediaPublish, MediaOriginType::srt_push, _media_info, invoker, + static_cast(*this)); if (!flag) { //该事件无人监听,默认不鉴权 invoker("", ProtocolOption()); } } - -void SrtTransportImp::emitOnPlay(){ +void SrtTransportImp::emitOnPlay() { std::weak_ptr weak_self = static_pointer_cast(shared_from_this()); - Broadcast::AuthInvoker invoker = [weak_self](const string &err){ + Broadcast::AuthInvoker invoker = [weak_self](const string &err) { auto strong_self = weak_self.lock(); if (!strong_self) { return; } - strong_self->getPoller()->async([strong_self,err]{ - if(err != ""){ - strong_self->onShutdown(SockException(Err_refused,err)); - }else{ + strong_self->getPoller()->async([strong_self, err] { + if (err != "") { + strong_self->onShutdown(SockException(Err_refused, err)); + } else { strong_self->doPlay(); } }); }; - auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed, _media_info, invoker, static_cast(*this)); - if(!flag){ + auto flag = NoticeCenter::Instance().emitEvent( + Broadcast::kBroadcastMediaPlayed, _media_info, invoker, static_cast(*this)); + if (!flag) { doPlay(); } } -void SrtTransportImp::doPlay(){ +void SrtTransportImp::doPlay() { //异步查找直播流 std::weak_ptr weak_self = static_pointer_cast(shared_from_this()); @@ -158,15 +158,15 @@ void SrtTransportImp::doPlay(){ auto strong_self = weak_self.lock(); if (!strong_self) { //本对象已经销毁 - TraceL<<"本对象已经销毁"; + TraceL << "本对象已经销毁"; return; } if (!src) { //未找到该流 - TraceL<<"未找到该流"; + TraceL << "未找到该流"; strong_self->onShutdown(SockException(Err_shutdown)); } else { - TraceL<<"找到该流"; + TraceL << "找到该流"; auto ts_src = dynamic_pointer_cast(src); assert(ts_src); ts_src->pause(false); @@ -215,7 +215,7 @@ std::string SrtTransportImp::get_local_ip() { } uint16_t SrtTransportImp::get_local_port() { - auto s = getSession(); + auto s = getSession(); if (s) { return s->get_local_port(); } @@ -236,9 +236,7 @@ bool SrtTransportImp::inputFrame(const Frame::Ptr &frame) { } auto frame_cached = Frame::getCacheAbleFrame(frame); lock_guard lck(_func_mtx); - _cached_func.emplace_back([this, frame_cached]() { - _muxer->inputFrame(frame_cached); - }); + _cached_func.emplace_back([this, frame_cached]() { _muxer->inputFrame(frame_cached); }); return true; } @@ -248,9 +246,7 @@ bool SrtTransportImp::addTrack(const Track::Ptr &track) { } lock_guard lck(_func_mtx); - _cached_func.emplace_back([this, track]() { - _muxer->addTrack(track); - }); + _cached_func.emplace_back([this, track]() { _muxer->addTrack(track); }); return true; } @@ -259,9 +255,7 @@ void SrtTransportImp::addTrackCompleted() { _muxer->addTrackCompleted(); } else { lock_guard lck(_func_mtx); - _cached_func.emplace_back([this]() { - _muxer->addTrackCompleted(); - }); + _cached_func.emplace_back([this]() { _muxer->addTrackCompleted(); }); } } @@ -273,10 +267,9 @@ void SrtTransportImp::doCachedFunc() { _cached_func.clear(); } -int SrtTransportImp::getLantencyMul(){ +int SrtTransportImp::getLantencyMul() { GET_CONFIG(int, lantencyMul, kLantencyMul); return lantencyMul; } - } // namespace SRT \ No newline at end of file diff --git a/srt/SrtTransportImp.hpp b/srt/SrtTransportImp.hpp index 5e259632..afa8ab7f 100644 --- a/srt/SrtTransportImp.hpp +++ b/srt/SrtTransportImp.hpp @@ -1,16 +1,15 @@ #ifndef ZLMEDIAKIT_SRT_TRANSPORT_IMP_H #define ZLMEDIAKIT_SRT_TRANSPORT_IMP_H -#include #include "Common/MultiMediaSourceMuxer.h" #include "Rtp/Decoder.h" -#include "TS/TSMediaSource.h" #include "SrtTransport.hpp" - +#include "TS/TSMediaSource.h" +#include namespace SRT { - using namespace toolkit; - using namespace mediakit; - using namespace std; +using namespace toolkit; +using namespace mediakit; +using namespace std; class SrtTransportImp : public SrtTransport , public toolkit::SockInfo @@ -19,13 +18,11 @@ class SrtTransportImp public: SrtTransportImp(const EventPoller::Ptr &poller); ~SrtTransportImp(); - void inputSockData(uint8_t *buf, int len, struct sockaddr_storage *addr){ - SrtTransport::inputSockData(buf,len,addr); + void inputSockData(uint8_t *buf, int len, struct sockaddr_storage *addr) { + SrtTransport::inputSockData(buf, len, addr); _total_bytes += len; } - void onSendTSData(const Buffer::Ptr &buffer, bool flush){ - SrtTransport::onSendTSData(buffer,flush); - } + void onSendTSData(const Buffer::Ptr &buffer, bool flush) { SrtTransport::onSendTSData(buffer, flush); } /// SockInfo override std::string get_local_ip() override; uint16_t get_local_port() override; @@ -35,19 +32,17 @@ public: protected: ///////SrtTransport override/////// - void onHandShakeFinished(std::string& streamid,struct sockaddr_storage *addr) override; + void onHandShakeFinished(std::string &streamid, struct sockaddr_storage *addr) override; void onSRTData(DataPacket::Ptr pkt) override; void onShutdown(const SockException &ex) override; int getLantencyMul() override; - void sendPacket(Buffer::Ptr pkt,bool flush = true) override{ + void sendPacket(Buffer::Ptr pkt, bool flush = true) override { _total_bytes += pkt->size(); - SrtTransport::sendPacket(pkt,flush); + SrtTransport::sendPacket(pkt, flush); }; - bool isPusher() override{ - return _is_pusher; - } + bool isPusher() override { return _is_pusher; } ///////MediaSourceEvent override/////// // 关闭 @@ -62,7 +57,7 @@ protected: std::shared_ptr getOriginSock(mediakit::MediaSource &sender) const override; bool inputFrame(const Frame::Ptr &frame) override; - bool addTrack(const Track::Ptr & track) override; + bool addTrack(const Track::Ptr &track) override; void addTrackCompleted() override; void resetTracks() override {}; @@ -76,16 +71,16 @@ private: private: bool _is_pusher = true; MediaInfo _media_info; - uint64_t _total_bytes = 0; + uint64_t _total_bytes = 0; Ticker _alive_ticker; std::unique_ptr _addr; - // for player + // for player TSMediaSource::RingType::RingReader::Ptr _ts_reader; - // for pusher + // for pusher MultiMediaSourceMuxer::Ptr _muxer; DecoderImp::Ptr _decoder; std::recursive_mutex _func_mtx; - std::deque > _cached_func; + std::deque> _cached_func; }; } // namespace SRT diff --git a/srt/Statistic.cpp b/srt/Statistic.cpp index 9fc13cd5..ad79cb8a 100644 --- a/srt/Statistic.cpp +++ b/srt/Statistic.cpp @@ -2,12 +2,12 @@ #include "Statistic.hpp" namespace SRT { -void PacketRecvRateContext::inputPacket(TimePoint& ts) { - if(_pkt_map.size()>100){ - _pkt_map.erase(_pkt_map.begin()); - } - auto tmp = DurationCountMicroseconds(ts - _start); - _pkt_map.emplace(tmp,tmp); +void PacketRecvRateContext::inputPacket(TimePoint &ts) { + if (_pkt_map.size() > 100) { + _pkt_map.erase(_pkt_map.begin()); + } + auto tmp = DurationCountMicroseconds(ts - _start); + _pkt_map.emplace(tmp, tmp); } uint32_t PacketRecvRateContext::getPacketRecvRate() { if (_pkt_map.size() < 2) { @@ -27,13 +27,13 @@ uint32_t PacketRecvRateContext::getPacketRecvRate() { } double rate = 1e6 / (double)dur; - if(rate <=1000){ + if (rate <= 1000) { return 50000; } return rate; } -void EstimatedLinkCapacityContext::inputPacket(TimePoint& ts) { +void EstimatedLinkCapacityContext::inputPacket(TimePoint &ts) { if (_pkt_map.size() > 16) { _pkt_map.erase(_pkt_map.begin()); } @@ -41,55 +41,54 @@ void EstimatedLinkCapacityContext::inputPacket(TimePoint& ts) { _pkt_map.emplace(tmp, tmp); } uint32_t EstimatedLinkCapacityContext::getEstimatedLinkCapacity() { - decltype(_pkt_map.begin()) next; - std::vector tmp; + decltype(_pkt_map.begin()) next; + std::vector tmp; - for(auto it = _pkt_map.begin();it != _pkt_map.end();++it){ - next = it; - ++next; - if(next != _pkt_map.end()){ - tmp.push_back(next->first -it->first); - }else{ - break; - } - } - std::sort(tmp.begin(),tmp.end()); - if(tmp.empty()){ - return 1000; - } + for (auto it = _pkt_map.begin(); it != _pkt_map.end(); ++it) { + next = it; + ++next; + if (next != _pkt_map.end()) { + tmp.push_back(next->first - it->first); + } else { + break; + } + } + std::sort(tmp.begin(), tmp.end()); + if (tmp.empty()) { + return 1000; + } - if(tmp.size()<16){ - return 1000; - } + if (tmp.size() < 16) { + return 1000; + } - double dur =tmp[0]/1e6; - - return (uint32_t)(1.0/dur); + double dur = tmp[0] / 1e6; + return (uint32_t)(1.0 / dur); } -void RecvRateContext::inputPacket(TimePoint& ts, size_t size ) { +void RecvRateContext::inputPacket(TimePoint &ts, size_t size) { if (_pkt_map.size() > 100) { _pkt_map.erase(_pkt_map.begin()); } - auto tmp = DurationCountMicroseconds(ts - _start); + auto tmp = DurationCountMicroseconds(ts - _start); _pkt_map.emplace(tmp, tmp); } uint32_t RecvRateContext::getRecvRate() { - if(_pkt_map.size()<2){ + if (_pkt_map.size() < 2) { return 0; } auto first = _pkt_map.begin(); auto last = _pkt_map.rbegin(); - double dur = (last->first - first->first)/1000000.0; + double dur = (last->first - first->first) / 1000000.0; size_t bytes = 0; - for(auto it : _pkt_map){ + for (auto it : _pkt_map) { bytes += it.second; } - double rate = (double)bytes/dur; + double rate = (double)bytes / dur; return (uint32_t)rate; } diff --git a/srt/Statistic.hpp b/srt/Statistic.hpp index 4524aebe..c47c9700 100644 --- a/srt/Statistic.hpp +++ b/srt/Statistic.hpp @@ -8,39 +8,42 @@ namespace SRT { class PacketRecvRateContext { public: - PacketRecvRateContext(TimePoint start):_start(start){}; + PacketRecvRateContext(TimePoint start) + : _start(start) {}; ~PacketRecvRateContext() = default; - void inputPacket(TimePoint& ts); + void inputPacket(TimePoint &ts); uint32_t getPacketRecvRate(); + private: - std::map _pkt_map; + std::map _pkt_map; TimePoint _start; - }; class EstimatedLinkCapacityContext { public: - EstimatedLinkCapacityContext(TimePoint start):_start(start){}; + EstimatedLinkCapacityContext(TimePoint start) + : _start(start) {}; ~EstimatedLinkCapacityContext() = default; - void inputPacket(TimePoint& ts); + void inputPacket(TimePoint &ts); uint32_t getEstimatedLinkCapacity(); + private: - std::map _pkt_map; + std::map _pkt_map; TimePoint _start; }; class RecvRateContext { public: - RecvRateContext(TimePoint start):_start(start){}; + RecvRateContext(TimePoint start) + : _start(start) {}; ~RecvRateContext() = default; - void inputPacket(TimePoint& ts,size_t size); + void inputPacket(TimePoint &ts, size_t size); uint32_t getRecvRate(); + private: - std::map _pkt_map; - TimePoint _start; + std::map _pkt_map; + TimePoint _start; }; - - } // namespace SRT #endif // ZLMEDIAKIT_SRT_STATISTIC_H \ No newline at end of file From 020b8b85ea44f030066a9d1c102db5807e29b957 Mon Sep 17 00:00:00 2001 From: xiongguangjie Date: Mon, 6 Jun 2022 22:13:53 +0800 Subject: [PATCH 2/2] try fix for windows build --- srt/Common.hpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/srt/Common.hpp b/srt/Common.hpp index 90ca20b0..cc49cff1 100644 --- a/srt/Common.hpp +++ b/srt/Common.hpp @@ -1,11 +1,11 @@ #ifndef ZLMEDIAKIT_SRT_COMMON_H #define ZLMEDIAKIT_SRT_COMMON_H #if defined(_WIN32) -#include #include #include -#pragma comment(lib, "Ws2_32.lib") -#pragma comment(lib, "Iphlpapi.lib") +#include +#pragma comment (lib, "Ws2_32.lib") +#pragma comment(lib,"Iphlpapi.lib") #else #include #include