格式化srt相关代码

This commit is contained in:
ziyue 2022-06-07 09:52:20 +08:00
parent 83d75c9a72
commit e415230e47
20 changed files with 821 additions and 789 deletions

View File

@ -291,8 +291,8 @@ timeoutSec=5
#该端口是多线程的,同时支持客户端网络切换导致的连接迁移 #该端口是多线程的,同时支持客户端网络切换导致的连接迁移
port=9000 port=9000
#srt 协议中延迟缓存的估算参数在握手阶段估算rtt ,然后lantencyMul*rtt 为最大缓存时长,此参数越大,表示等待重传的时长就越大 #srt 协议中延迟缓存的估算参数在握手阶段估算rtt ,然后latencyMul*rtt 为最大缓存时长,此参数越大,表示等待重传的时长就越大
lantencyMul=4 latencyMul=4
[rtsp] [rtsp]

View File

@ -2,6 +2,7 @@
#include "Common.hpp" #include "Common.hpp"
namespace SRT { namespace SRT {
bool ACKPacket::loadFromData(uint8_t *buf, size_t len) { bool ACKPacket::loadFromData(uint8_t *buf, size_t len) {
if (len < ACK_CIF_SIZE + ControlPacket::HEADER_SIZE) { if (len < ACK_CIF_SIZE + ControlPacket::HEADER_SIZE) {
return false; return false;
@ -36,6 +37,7 @@ bool ACKPacket::loadFromData(uint8_t *buf, size_t len) {
return true; return true;
} }
bool ACKPacket::storeToData() { bool ACKPacket::storeToData() {
_data = BufferRaw::create(); _data = BufferRaw::create();
_data->setCapacity(HEADER_SIZE + ACK_CIF_SIZE); _data->setCapacity(HEADER_SIZE + ACK_CIF_SIZE);
@ -74,10 +76,10 @@ bool ACKPacket::storeToData() {
std::string ACKPacket::dump() { std::string ACKPacket::dump() {
_StrPrinter printer; _StrPrinter printer;
printer << "last_ack_pkt_seq_number="<<last_ack_pkt_seq_number<<\ printer << "last_ack_pkt_seq_number=" << last_ack_pkt_seq_number << " rtt=" << rtt
" rtt="<<rtt<<" rtt_variance="<<rtt_variance<<\ << " rtt_variance=" << rtt_variance << " pkt_recv_rate=" << pkt_recv_rate
" pkt_recv_rate="<<pkt_recv_rate<<" available_buf_size="<<available_buf_size<<\ << " available_buf_size=" << available_buf_size << " estimated_link_capacity=" << estimated_link_capacity
" estimated_link_capacity="<<estimated_link_capacity<<" recv_rate="<<recv_rate; << " recv_rate=" << recv_rate;
return std::move(printer); return std::move(printer);
} }
} // namespace } // namespace SRT

View File

@ -2,7 +2,6 @@
#define ZLMEDIAKIT_SRT_ACK_H #define ZLMEDIAKIT_SRT_ACK_H
#include "Packet.hpp" #include "Packet.hpp"
namespace SRT { namespace SRT {
/* /*
0 1 2 3 0 1 2 3
@ -33,16 +32,13 @@ namespace SRT{
Figure 13: ACK control packet Figure 13: ACK control packet
https://haivision.github.io/srt-rfc/draft-sharabayko-srt.html#name-ack-acknowledgment https://haivision.github.io/srt-rfc/draft-sharabayko-srt.html#name-ack-acknowledgment
*/ */
class ACKPacket : public ControlPacket class ACKPacket : public ControlPacket {
{
public: public:
using Ptr = std::shared_ptr<ACKPacket>; using Ptr = std::shared_ptr<ACKPacket>;
ACKPacket() = default; ACKPacket() = default;
~ACKPacket() = default; ~ACKPacket() = default;
enum{ enum { ACK_CIF_SIZE = 7 * 4 };
ACK_CIF_SIZE = 7*4
};
std::string dump(); std::string dump();
///////ControlPacket override/////// ///////ControlPacket override///////
bool loadFromData(uint8_t *buf, size_t len) override; bool loadFromData(uint8_t *buf, size_t len) override;
@ -59,7 +55,6 @@ public:
uint32_t recv_rate; uint32_t recv_rate;
}; };
class ACKACKPacket : public ControlPacket { class ACKACKPacket : public ControlPacket {
public: public:
using Ptr = std::shared_ptr<ACKACKPacket>; using Ptr = std::shared_ptr<ACKACKPacket>;
@ -89,7 +84,6 @@ public:
} }
uint32_t ack_number; uint32_t ack_number;
}; };
} // namespace SRT } // namespace SRT

View File

@ -2,83 +2,73 @@
#define ZLMEDIAKIT_SRT_COMMON_H #define ZLMEDIAKIT_SRT_COMMON_H
#include <chrono> #include <chrono>
namespace SRT namespace SRT {
{
using SteadyClock = std::chrono::steady_clock; using SteadyClock = std::chrono::steady_clock;
using TimePoint = std::chrono::time_point<SteadyClock>; using TimePoint = std::chrono::time_point<SteadyClock>;
using Microseconds = std::chrono::microseconds; using Microseconds = std::chrono::microseconds;
using Milliseconds = std::chrono::milliseconds; using Milliseconds = std::chrono::milliseconds;
inline int64_t DurationCountMicroseconds( SteadyClock::duration dur){ static inline int64_t DurationCountMicroseconds(SteadyClock::duration dur) {
return std::chrono::duration_cast<std::chrono::microseconds>(dur).count(); return std::chrono::duration_cast<std::chrono::microseconds>(dur).count();
} }
inline uint32_t loadUint32(uint8_t *ptr) { static inline uint32_t loadUint32(uint8_t *ptr) {
return ptr[0] << 24 | ptr[1] << 16 | ptr[2] << 8 | ptr[3]; return ptr[0] << 24 | ptr[1] << 16 | ptr[2] << 8 | ptr[3];
} }
inline uint16_t loadUint16(uint8_t *ptr) {
static inline uint16_t loadUint16(uint8_t *ptr) {
return ptr[0] << 8 | ptr[1]; return ptr[0] << 8 | ptr[1];
} }
inline void storeUint32(uint8_t *buf, uint32_t val) { static inline void storeUint32(uint8_t *buf, uint32_t val) {
buf[0] = val >> 24; buf[0] = val >> 24;
buf[1] = (val >> 16) & 0xff; buf[1] = (val >> 16) & 0xff;
buf[2] = (val >> 8) & 0xff; buf[2] = (val >> 8) & 0xff;
buf[3] = val & 0xff; buf[3] = val & 0xff;
} }
inline void storeUint16(uint8_t *buf, uint16_t val) { static inline void storeUint16(uint8_t *buf, uint16_t val) {
buf[0] = (val >> 8) & 0xff; buf[0] = (val >> 8) & 0xff;
buf[1] = val & 0xff; buf[1] = val & 0xff;
} }
inline void storeUint32LE(uint8_t *buf, uint32_t val) { static inline void storeUint32LE(uint8_t *buf, uint32_t val) {
buf[0] = val & 0xff; buf[0] = val & 0xff;
buf[1] = (val >> 8) & 0xff; buf[1] = (val >> 8) & 0xff;
buf[2] = (val >> 16) & 0xff; buf[2] = (val >> 16) & 0xff;
buf[3] = (val >> 24) & 0xff; buf[3] = (val >> 24) & 0xff;
} }
inline void storeUint16LE(uint8_t *buf, uint16_t val) { static inline void storeUint16LE(uint8_t *buf, uint16_t val) {
buf[0] = val & 0xff; buf[0] = val & 0xff;
buf[1] = (val >> 8) & 0xff; buf[1] = (val >> 8) & 0xff;
} }
inline uint32_t srtVersion(int major, int minor, int patch) static inline uint32_t srtVersion(int major, int minor, int patch) {
{
return patch + minor * 0x100 + major * 0x10000; return patch + minor * 0x100 + major * 0x10000;
} }
class UTicker { class UTicker {
public: public:
UTicker() { UTicker() { _created = _begin = SteadyClock::now(); }
_created = _begin = SteadyClock::now(); ~UTicker() = default;
}
~UTicker() {
}
/** /**
* *
*/ */
int64_t elapsedTime(TimePoint now) const { int64_t elapsedTime(TimePoint now) const { return DurationCountMicroseconds(now - _begin); }
return DurationCountMicroseconds(now - _begin);
}
/** /**
* resetTime后至今的时间 * resetTime后至今的时间
*/ */
int64_t createdTime(TimePoint now) const { int64_t createdTime(TimePoint now) const { return DurationCountMicroseconds(now - _created); }
return DurationCountMicroseconds(now - _created);
}
/** /**
* *
*/ */
void resetTime(TimePoint now) { void resetTime(TimePoint now) { _begin = now; }
_begin = now;
}
private: private:
TimePoint _begin; TimePoint _begin;

View File

@ -1,6 +1,7 @@
#include "HSExt.hpp" #include "HSExt.hpp"
namespace SRT { namespace SRT {
bool HSExtMessage::loadFromData(uint8_t *buf, size_t len) { bool HSExtMessage::loadFromData(uint8_t *buf, size_t len) {
if (buf == NULL || len != HSEXT_MSG_SIZE) { if (buf == NULL || len != HSEXT_MSG_SIZE) {
return false; return false;
@ -27,14 +28,15 @@ bool HSExtMessage::loadFromData(uint8_t *buf, size_t len) {
ptr += 2; ptr += 2;
return true; return true;
} }
std::string HSExtMessage::dump() { std::string HSExtMessage::dump() {
_StrPrinter printer; _StrPrinter printer;
printer << "srt version : "<<std::hex<<srt_version<<" srt flag : "<<std::hex<<srt_flag<<\ printer << "srt version : " << std::hex << srt_version << " srt flag : " << std::hex << srt_flag
" recv_tsbpd_delay="<<recv_tsbpd_delay<<" send_tsbpd_delay = "<<send_tsbpd_delay; << " recv_tsbpd_delay=" << recv_tsbpd_delay << " send_tsbpd_delay = " << send_tsbpd_delay;
return std::move(printer); return std::move(printer);
} }
bool HSExtMessage::storeToData() { bool HSExtMessage::storeToData() {
_data = BufferRaw::create(); _data = BufferRaw::create();
_data->setCapacity(HSEXT_MSG_SIZE); _data->setCapacity(HSEXT_MSG_SIZE);
@ -85,8 +87,8 @@ bool HSExtStreamID::loadFromData(uint8_t *buf, size_t len) {
streamid.erase(streamid.find_first_of(zero), streamid.size()); streamid.erase(streamid.find_first_of(zero), streamid.size());
} }
return true; return true;
} }
bool HSExtStreamID::storeToData() { bool HSExtStreamID::storeToData() {
size_t content_size = ((streamid.length() + 4) + 3) / 4 * 4; size_t content_size = ((streamid.length() + 4) + 3) / 4 * 4;
@ -122,6 +124,7 @@ bool HSExtStreamID::storeToData() {
return true; return true;
} }
std::string HSExtStreamID::dump() { std::string HSExtStreamID::dump() {
_StrPrinter printer; _StrPrinter printer;
printer << " streamid : " << streamid; printer << " streamid : " << streamid;

View File

@ -11,6 +11,7 @@ class HSExt : public Buffer {
public: public:
HSExt() = default; HSExt() = default;
virtual ~HSExt() = default; virtual ~HSExt() = default;
enum { enum {
SRT_CMD_REJECT = 0, SRT_CMD_REJECT = 0,
SRT_CMD_HSREQ = 1, SRT_CMD_HSREQ = 1,
@ -36,13 +37,13 @@ public:
return _data->data(); return _data->data();
} }
return nullptr; return nullptr;
}; }
size_t size() const override { size_t size() const override {
if (_data) { if (_data) {
return _data->size(); return _data->size();
} }
return 0; return 0;
}; }
protected: protected:
void loadHeader() { void loadHeader() {

View File

@ -10,19 +10,13 @@
#include <netdb.h> #include <netdb.h>
#endif // defined(_WIN32) #endif // defined(_WIN32)
#include <atomic> #include <atomic>
#include "Util/logger.h" #include "Util/logger.h"
#include "Util/MD5.h" #include "Util/MD5.h"
#include "Packet.hpp" #include "Packet.hpp"
namespace SRT { namespace SRT {
const size_t DataPacket::HEADER_SIZE; const size_t DataPacket::HEADER_SIZE;
const size_t ControlPacket::HEADER_SIZE; const size_t ControlPacket::HEADER_SIZE;
const size_t HandshakePacket::HS_CONTENT_MIN_SIZE; const size_t HandshakePacket::HS_CONTENT_MIN_SIZE;
@ -71,6 +65,7 @@ bool DataPacket::loadFromData(uint8_t *buf, size_t len) {
_data->assign((char *)(buf), len); _data->assign((char *)(buf), len);
return true; return true;
} }
bool DataPacket::storeToHeader() { bool DataPacket::storeToHeader() {
if (!_data || _data->size() < HEADER_SIZE) { if (!_data || _data->size() < HEADER_SIZE) {
WarnL << "data size less " << HEADER_SIZE; WarnL << "data size less " << HEADER_SIZE;
@ -101,6 +96,7 @@ bool DataPacket::storeToHeader(){
ptr += 4; ptr += 4;
return true; return true;
} }
bool DataPacket::storeToData(uint8_t *buf, size_t len) { bool DataPacket::storeToData(uint8_t *buf, size_t len) {
_data = BufferRaw::create(); _data = BufferRaw::create();
_data->setCapacity(len + HEADER_SIZE); _data->setCapacity(len + HEADER_SIZE);
@ -139,6 +135,7 @@ char *DataPacket::data() const {
return nullptr; return nullptr;
return _data->data(); return _data->data();
} }
size_t DataPacket::size() const { size_t DataPacket::size() const {
if (!_data) { if (!_data) {
return 0; return 0;
@ -151,6 +148,7 @@ char *DataPacket::payloadData() {
return nullptr; return nullptr;
return _data->data() + HEADER_SIZE; return _data->data() + HEADER_SIZE;
} }
size_t DataPacket::payloadSize() { size_t DataPacket::payloadSize() {
if (!_data) { if (!_data) {
return 0; return 0;
@ -158,8 +156,6 @@ size_t DataPacket::payloadSize() {
return _data->size() - HEADER_SIZE; return _data->size() - HEADER_SIZE;
} }
bool ControlPacket::isControlPacket(uint8_t *buf, size_t len) { bool ControlPacket::isControlPacket(uint8_t *buf, size_t len) {
if (len < HEADER_SIZE) { if (len < HEADER_SIZE) {
WarnL << "data size" << len << " less " << HEADER_SIZE; WarnL << "data size" << len << " less " << HEADER_SIZE;
@ -199,6 +195,7 @@ bool ControlPacket::loadHeader() {
ptr += 4; ptr += 4;
return true; return true;
} }
bool ControlPacket::storeToHeader() { bool ControlPacket::storeToHeader() {
uint8_t *ptr = (uint8_t *)_data->data(); uint8_t *ptr = (uint8_t *)_data->data();
ptr[0] = 0x80; ptr[0] = 0x80;
@ -228,15 +225,18 @@ char *ControlPacket::data() const {
return nullptr; return nullptr;
return _data->data(); return _data->data();
} }
size_t ControlPacket::size() const { size_t ControlPacket::size() const {
if (!_data) { if (!_data) {
return 0; return 0;
} }
return _data->size(); return _data->size();
} }
uint32_t ControlPacket::getSocketID(uint8_t *buf, size_t len) { uint32_t ControlPacket::getSocketID(uint8_t *buf, size_t len) {
return loadUint32(buf + 12); return loadUint32(buf + 12);
} }
bool HandshakePacket::loadFromData(uint8_t *buf, size_t len) { bool HandshakePacket::loadFromData(uint8_t *buf, size_t len) {
if (HEADER_SIZE + HS_CONTENT_MIN_SIZE > len) { if (HEADER_SIZE + HS_CONTENT_MIN_SIZE > len) {
ErrorL << "size too smalle " << encryption_field; ErrorL << "size too smalle " << encryption_field;
@ -293,6 +293,7 @@ bool HandshakePacket::loadFromData(uint8_t *buf, size_t len) {
return loadExtMessage(ptr, len - HS_CONTENT_MIN_SIZE - HEADER_SIZE); return loadExtMessage(ptr, len - HS_CONTENT_MIN_SIZE - HEADER_SIZE);
} }
bool HandshakePacket::loadExtMessage(uint8_t *buf, size_t len) { bool HandshakePacket::loadExtMessage(uint8_t *buf, size_t len) {
uint8_t *ptr = buf; uint8_t *ptr = buf;
ext_list.clear(); ext_list.clear();
@ -302,15 +303,10 @@ bool HandshakePacket::loadExtMessage(uint8_t *buf,size_t len){
while (ptr < buf + len) { while (ptr < buf + len) {
type = loadUint16(ptr); type = loadUint16(ptr);
length = loadUint16(ptr + 2); length = loadUint16(ptr + 2);
switch (type) switch (type) {
{
case HSExt::SRT_CMD_HSREQ: case HSExt::SRT_CMD_HSREQ:
case HSExt::SRT_CMD_HSRSP: case HSExt::SRT_CMD_HSRSP: ext = std::make_shared<HSExtMessage>(); break;
ext = std::make_shared<HSExtMessage>(); case HSExt::SRT_CMD_SID: ext = std::make_shared<HSExtStreamID>(); break;
break;
case HSExt::SRT_CMD_SID:
ext = std::make_shared<HSExtStreamID>();
break;
default: default:
WarnL << "not support ext " << type; WarnL << "not support ext " << type;
break; break;
@ -329,8 +325,7 @@ bool HandshakePacket::loadExtMessage(uint8_t *buf,size_t len){
return true; return true;
} }
bool HandshakePacket::storeExtMessage() bool HandshakePacket::storeExtMessage() {
{
uint8_t *buf = (uint8_t *)_data->data() + HEADER_SIZE + 48; uint8_t *buf = (uint8_t *)_data->data() + HEADER_SIZE + 48;
size_t len = _data->size() - HEADER_SIZE - 48; size_t len = _data->size() - HEADER_SIZE - 48;
for (auto ex : ext_list) { for (auto ex : ext_list) {
@ -347,6 +342,7 @@ bool HandshakePacket::storeExtMessage()
} }
return size; return size;
} }
bool HandshakePacket::storeToData() { bool HandshakePacket::storeToData() {
_data = BufferRaw::create(); _data = BufferRaw::create();
for (auto ex : ext_list) { for (auto ex : ext_list) {
@ -399,7 +395,6 @@ bool HandshakePacket::storeToData() {
assert(encryption_field == NO_ENCRYPTION); assert(encryption_field == NO_ENCRYPTION);
return storeExtMessage(); return storeExtMessage();
} }
@ -415,7 +410,6 @@ bool HandshakePacket::isHandshakePacket(uint8_t *buf, size_t len){
uint32_t HandshakePacket::getHandshakeType(uint8_t *buf, size_t len) { uint32_t HandshakePacket::getHandshakeType(uint8_t *buf, size_t len) {
uint8_t *ptr = buf + HEADER_SIZE + 5 * 4; uint8_t *ptr = buf + HEADER_SIZE + 5 * 4;
return loadUint32(ptr); return loadUint32(ptr);
} }
@ -423,6 +417,7 @@ uint32_t HandshakePacket::getSynCookie(uint8_t *buf, size_t len){
uint8_t *ptr = buf + HEADER_SIZE + 7 * 4; uint8_t *ptr = buf + HEADER_SIZE + 7 * 4;
return loadUint32(ptr); return loadUint32(ptr);
} }
void HandshakePacket::assignPeerIP(struct sockaddr_storage *addr) { void HandshakePacket::assignPeerIP(struct sockaddr_storage *addr) {
memset(peer_ip_addr, 0, sizeof(peer_ip_addr) * sizeof(peer_ip_addr[0])); memset(peer_ip_addr, 0, sizeof(peer_ip_addr) * sizeof(peer_ip_addr[0]));
if (addr->ss_family == AF_INET) { if (addr->ss_family == AF_INET) {
@ -440,13 +435,13 @@ void HandshakePacket::assignPeerIP(struct sockaddr_storage* addr){
} }
} }
} }
uint32_t HandshakePacket::generateSynCookie(struct sockaddr_storage* addr,TimePoint ts,uint32_t current_cookie, int correction ){
uint32_t HandshakePacket::generateSynCookie(
struct sockaddr_storage *addr, TimePoint ts, uint32_t current_cookie, int correction) {
static std::atomic<uint32_t> distractor { 0 }; static std::atomic<uint32_t> distractor { 0 };
uint32_t rollover = distractor.load() + 10; uint32_t rollover = distractor.load() + 10;
for (;;) while (true) {
{
// SYN cookie // SYN cookie
char clienthost[NI_MAXHOST]; char clienthost[NI_MAXHOST];
char clientport[NI_MAXSERV]; char clientport[NI_MAXSERV];
@ -468,17 +463,19 @@ uint32_t HandshakePacket::generateSynCookie(struct sockaddr_storage* addr,TimePo
MD5 md5(cookiestr.str()); MD5 md5(cookiestr.str());
memcpy(cookie, md5.rawdigest().c_str(), 16); memcpy(cookie, md5.rawdigest().c_str(), 16);
if (cookie_val != current_cookie) if (cookie_val != current_cookie) {
return cookie_val; return cookie_val;
}
++distractor; ++distractor;
// This is just to make the loop formally breakable, // This is just to make the loop formally breakable,
// but this is virtually impossible to happen. // but this is virtually impossible to happen.
if (distractor == rollover) if (distractor == rollover) {
return cookie_val; return cookie_val;
} }
} }
}
bool KeepLivePacket::loadFromData(uint8_t *buf, size_t len) { bool KeepLivePacket::loadFromData(uint8_t *buf, size_t len) {
if (len < HEADER_SIZE) { if (len < HEADER_SIZE) {
@ -512,8 +509,7 @@ bool NAKPacket::loadFromData(uint8_t *buf, size_t len) {
uint8_t *ptr = (uint8_t *)_data->data() + HEADER_SIZE; uint8_t *ptr = (uint8_t *)_data->data() + HEADER_SIZE;
uint8_t *end = (uint8_t *)_data->data() + _data->size(); uint8_t *end = (uint8_t *)_data->data() + _data->size();
LostPair lost; LostPair lost;
while (ptr<end) while (ptr < end) {
{
if ((*ptr) & 0x80) { if ((*ptr) & 0x80) {
lost.first = loadUint32(ptr) & 0x7fffffff; lost.first = loadUint32(ptr) & 0x7fffffff;
lost.second = loadUint32(ptr + 4) & 0x7fffffff; lost.second = loadUint32(ptr + 4) & 0x7fffffff;

View File

@ -5,6 +5,7 @@
#include <vector> #include <vector>
#include "Network/Buffer.h" #include "Network/Buffer.h"
#include "Network/sockutil.h"
#include "Util/logger.h" #include "Util/logger.h"
#include "Common.hpp" #include "Common.hpp"
@ -182,8 +183,6 @@ public:
enum { HS_EXT_FILED_HSREQ = 0x00000001, HS_EXT_FILED_KMREQ = 0x00000002, HS_EXT_FILED_CONFIG = 0x00000004 }; enum { HS_EXT_FILED_HSREQ = 0x00000001, HS_EXT_FILED_KMREQ = 0x00000002, HS_EXT_FILED_CONFIG = 0x00000004 };
HandshakePacket() = default; HandshakePacket() = default;
~HandshakePacket() = default; ~HandshakePacket() = default;
@ -209,6 +208,7 @@ public:
uint8_t peer_ip_addr[16]; uint8_t peer_ip_addr[16];
std::vector<HSExt::Ptr> ext_list; std::vector<HSExt::Ptr> ext_list;
private: private:
bool loadExtMessage(uint8_t *buf, size_t len); bool loadExtMessage(uint8_t *buf, size_t len);
bool storeExtMessage(); bool storeExtMessage();
@ -229,8 +229,7 @@ private:
Figure 12: Keep-Alive control packet Figure 12: Keep-Alive control packet
https://haivision.github.io/srt-rfc/draft-sharabayko-srt.html#name-keep-alive https://haivision.github.io/srt-rfc/draft-sharabayko-srt.html#name-keep-alive
*/ */
class KeepLivePacket : public ControlPacket class KeepLivePacket : public ControlPacket {
{
public: public:
using Ptr = std::shared_ptr<KeepLivePacket>; using Ptr = std::shared_ptr<KeepLivePacket>;
KeepLivePacket() = default; KeepLivePacket() = default;
@ -265,8 +264,7 @@ An SRT NAK packet is formatted as follows:
Figure 14: NAK control packet Figure 14: NAK control packet
https://haivision.github.io/srt-rfc/draft-sharabayko-srt.html#name-nak-control-packet https://haivision.github.io/srt-rfc/draft-sharabayko-srt.html#name-nak-control-packet
*/ */
class NAKPacket : public ControlPacket class NAKPacket : public ControlPacket {
{
public: public:
using Ptr = std::shared_ptr<NAKPacket>; using Ptr = std::shared_ptr<NAKPacket>;
using LostPair = std::pair<uint32_t, uint32_t>; using LostPair = std::pair<uint32_t, uint32_t>;
@ -278,11 +276,11 @@ public:
bool storeToData() override; bool storeToData() override;
std::list<LostPair> lost_list; std::list<LostPair> lost_list;
private: private:
size_t getCIFSize(); size_t getCIFSize();
}; };
/* /*
0 1 2 3 0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
@ -302,8 +300,7 @@ private:
Figure 18: Drop Request control packet Figure 18: Drop Request control packet
https://haivision.github.io/srt-rfc/draft-sharabayko-srt.html#name-message-drop-request https://haivision.github.io/srt-rfc/draft-sharabayko-srt.html#name-message-drop-request
*/ */
class MsgDropReqPacket : public ControlPacket class MsgDropReqPacket : public ControlPacket {
{
public: public:
using Ptr = std::shared_ptr<MsgDropReqPacket>; using Ptr = std::shared_ptr<MsgDropReqPacket>;
MsgDropReqPacket() = default; MsgDropReqPacket() = default;
@ -332,12 +329,12 @@ class MsgDropReqPacket : public ControlPacket
https://haivision.github.io/srt-rfc/draft-sharabayko-srt.html#name-shutdown https://haivision.github.io/srt-rfc/draft-sharabayko-srt.html#name-shutdown
*/ */
class ShutDownPacket : public ControlPacket class ShutDownPacket : public ControlPacket {
{
public: public:
using Ptr = std::shared_ptr<ShutDownPacket>; using Ptr = std::shared_ptr<ShutDownPacket>;
ShutDownPacket() = default; ShutDownPacket() = default;
~ShutDownPacket() = default; ~ShutDownPacket() = default;
///////ControlPacket override/////// ///////ControlPacket override///////
bool loadFromData(uint8_t *buf, size_t len) override { bool loadFromData(uint8_t *buf, size_t len) override {
if (len < HEADER_SIZE) { if (len < HEADER_SIZE) {
@ -348,7 +345,7 @@ public:
_data->assign((char *)buf, len); _data->assign((char *)buf, len);
return loadHeader(); return loadHeader();
}; }
bool storeToData() override { bool storeToData() override {
control_type = ControlPacket::SHUTDOWN; control_type = ControlPacket::SHUTDOWN;
sub_type = 0; sub_type = 0;
@ -356,8 +353,9 @@ public:
_data->setCapacity(HEADER_SIZE); _data->setCapacity(HEADER_SIZE);
_data->setSize(HEADER_SIZE); _data->setSize(HEADER_SIZE);
return storeToHeader(); return storeToHeader();
}
}; };
};
} // namespace SRT } // namespace SRT
#endif //ZLMEDIAKIT_SRT_PACKET_H #endif //ZLMEDIAKIT_SRT_PACKET_H

View File

@ -4,17 +4,19 @@ namespace SRT {
#define MAX_SEQ 0x7fffffff #define MAX_SEQ 0x7fffffff
#define MAX_TS 0xffffffff #define MAX_TS 0xffffffff
inline uint32_t genExpectedSeq(uint32_t seq){
static inline uint32_t genExpectedSeq(uint32_t seq) {
return MAX_SEQ & seq; return MAX_SEQ & seq;
} }
inline bool isSeqEdge(uint32_t seq,uint32_t cap){
static inline bool isSeqEdge(uint32_t seq, uint32_t cap) {
if (seq > (MAX_SEQ - cap)) { if (seq > (MAX_SEQ - cap)) {
return true; return true;
} }
return false; return false;
} }
inline bool isTSCycle(uint32_t first,uint32_t second){ static inline bool isTSCycle(uint32_t first, uint32_t second) {
uint32_t diff; uint32_t diff;
if (first > second) { if (first > second) {
diff = first - second; diff = first - second;
@ -28,17 +30,18 @@ inline bool isTSCycle(uint32_t first,uint32_t second){
return false; return false;
} }
} }
PacketQueue::PacketQueue(uint32_t max_size, uint32_t init_seq, uint32_t lantency)
: _pkt_expected_seq(init_seq)
, _pkt_cap(max_size)
, _pkt_lantency(lantency) {
}
void PacketQueue::tryInsertPkt(DataPacket::Ptr pkt){
PacketQueue::PacketQueue(uint32_t max_size, uint32_t init_seq, uint32_t latency)
: _pkt_cap(max_size)
, _pkt_latency(latency)
, _pkt_expected_seq(init_seq) {}
void PacketQueue::tryInsertPkt(DataPacket::Ptr pkt) {
if (_pkt_expected_seq <= pkt->packet_seq_number) { if (_pkt_expected_seq <= pkt->packet_seq_number) {
auto diff = pkt->packet_seq_number - _pkt_expected_seq; auto diff = pkt->packet_seq_number - _pkt_expected_seq;
if (diff >= (MAX_SEQ >> 1)) { if (diff >= (MAX_SEQ >> 1)) {
TraceL << "drop packet too later for cycle "<< "expected seq=" << _pkt_expected_seq << " pkt seq=" << pkt->packet_seq_number; TraceL << "drop packet too later for cycle "
<< "expected seq=" << _pkt_expected_seq << " pkt seq=" << pkt->packet_seq_number;
return; return;
} else { } else {
_pkt_map.emplace(pkt->packet_seq_number, pkt); _pkt_map.emplace(pkt->packet_seq_number, pkt);
@ -47,12 +50,15 @@ void PacketQueue::tryInsertPkt(DataPacket::Ptr pkt){
auto diff = _pkt_expected_seq - pkt->packet_seq_number; auto diff = _pkt_expected_seq - pkt->packet_seq_number;
if (diff >= (MAX_SEQ >> 1)) { if (diff >= (MAX_SEQ >> 1)) {
_pkt_map.emplace(pkt->packet_seq_number, pkt); _pkt_map.emplace(pkt->packet_seq_number, pkt);
TraceL<<" cycle packet "<<"expected seq=" << _pkt_expected_seq << " pkt seq=" << pkt->packet_seq_number; TraceL << " cycle packet "
<< "expected seq=" << _pkt_expected_seq << " pkt seq=" << pkt->packet_seq_number;
} else { } else {
//TraceL << "drop packet too later "<< "expected seq=" << _pkt_expected_seq << " pkt seq=" << pkt->packet_seq_number; // TraceL << "drop packet too later "<< "expected seq=" << _pkt_expected_seq << " pkt seq=" <<
// pkt->packet_seq_number;
} }
} }
} }
bool PacketQueue::inputPacket(DataPacket::Ptr pkt, std::list<DataPacket::Ptr> &out) { bool PacketQueue::inputPacket(DataPacket::Ptr pkt, std::list<DataPacket::Ptr> &out) {
tryInsertPkt(pkt); tryInsertPkt(pkt);
auto it = _pkt_map.find(_pkt_expected_seq); auto it = _pkt_map.find(_pkt_expected_seq);
@ -73,7 +79,7 @@ bool PacketQueue::inputPacket(DataPacket::Ptr pkt,std::list<DataPacket::Ptr>& ou
_pkt_expected_seq = genExpectedSeq(_pkt_expected_seq + 1); _pkt_expected_seq = genExpectedSeq(_pkt_expected_seq + 1);
} }
while (timeLantency() > _pkt_lantency) { while (timeLatency() > _pkt_latency) {
it = _pkt_map.find(_pkt_expected_seq); it = _pkt_map.find(_pkt_expected_seq);
if (it != _pkt_map.end()) { if (it != _pkt_map.end()) {
out.push_back(it->second); out.push_back(it->second);
@ -100,7 +106,7 @@ bool PacketQueue::drop(uint32_t first, uint32_t last,std::list<DataPacket::Ptr>&
return true; return true;
} }
uint32_t PacketQueue::timeLantency() { uint32_t PacketQueue::timeLatency() {
if (_pkt_map.empty()) { if (_pkt_map.empty()) {
return 0; return 0;
} }
@ -182,7 +188,9 @@ size_t PacketQueue::getExpectedSize() {
uint32_t max = _pkt_map.rbegin()->first; uint32_t max = _pkt_map.rbegin()->first;
uint32_t min = _pkt_map.begin()->first; uint32_t min = _pkt_map.begin()->first;
if ((max - min) >= (MAX_SEQ >> 1)) { if ((max - min) >= (MAX_SEQ >> 1)) {
TraceL<<"cycle "<<"expected seq "<<_pkt_expected_seq<<" min "<<min<<" max "<<max<<" size "<<_pkt_map.size(); TraceL << "cycle "
<< "expected seq " << _pkt_expected_seq << " min " << min << " max " << max << " size "
<< _pkt_map.size();
return MAX_SEQ - _pkt_expected_seq + min + 1; return MAX_SEQ - _pkt_expected_seq + min + 1;
} else { } else {
return max - _pkt_expected_seq + 1; return max - _pkt_expected_seq + 1;
@ -205,15 +213,18 @@ size_t PacketQueue::getAvailableBufferSize(){
uint32_t PacketQueue::getExpectedSeq() { uint32_t PacketQueue::getExpectedSeq() {
return _pkt_expected_seq; return _pkt_expected_seq;
} }
std::string PacketQueue::dump() { std::string PacketQueue::dump() {
_StrPrinter printer; _StrPrinter printer;
if (_pkt_map.empty()) { if (_pkt_map.empty()) {
printer << " expected seq :" << _pkt_expected_seq; printer << " expected seq :" << _pkt_expected_seq;
} else { } else {
printer<<" expected seq :"<<_pkt_expected_seq<<" size:"<<_pkt_map.size()<<" first:"<<_pkt_map.begin()->second->packet_seq_number; printer << " expected seq :" << _pkt_expected_seq << " size:" << _pkt_map.size()
<< " first:" << _pkt_map.begin()->second->packet_seq_number;
printer << " last:" << _pkt_map.rbegin()->second->packet_seq_number; printer << " last:" << _pkt_map.rbegin()->second->packet_seq_number;
printer<<" latency:"<<timeLantency()/1e3; printer << " latency:" << timeLatency() / 1e3;
} }
return std::move(printer); return std::move(printer);
} }
} // namespace SRT } // namespace SRT

View File

@ -3,8 +3,8 @@
#include "Packet.hpp" #include "Packet.hpp"
#include <algorithm> #include <algorithm>
#include <list> #include <list>
#include <memory>
#include <map> #include <map>
#include <memory>
#include <tuple> #include <tuple>
#include <utility> #include <utility>
@ -16,11 +16,11 @@ public:
using Ptr = std::shared_ptr<PacketQueue>; using Ptr = std::shared_ptr<PacketQueue>;
using LostPair = std::pair<uint32_t, uint32_t>; using LostPair = std::pair<uint32_t, uint32_t>;
PacketQueue(uint32_t max_size, uint32_t init_seq, uint32_t lantency); PacketQueue(uint32_t max_size, uint32_t init_seq, uint32_t latency);
~PacketQueue() = default; ~PacketQueue() = default;
bool inputPacket(DataPacket::Ptr pkt, std::list<DataPacket::Ptr> &out); bool inputPacket(DataPacket::Ptr pkt, std::list<DataPacket::Ptr> &out);
uint32_t timeLantency(); uint32_t timeLatency();
std::list<LostPair> getLostSeq(); std::list<LostPair> getLostSeq();
size_t getSize(); size_t getSize();
@ -28,18 +28,17 @@ public:
size_t getAvailableBufferSize(); size_t getAvailableBufferSize();
uint32_t getExpectedSeq(); uint32_t getExpectedSeq();
std::string dump();
bool drop(uint32_t first, uint32_t last, std::list<DataPacket::Ptr> &out); bool drop(uint32_t first, uint32_t last, std::list<DataPacket::Ptr> &out);
std::string dump();
private: private:
void tryInsertPkt(DataPacket::Ptr pkt); void tryInsertPkt(DataPacket::Ptr pkt);
private: private:
std::map<uint32_t, DataPacket::Ptr> _pkt_map;
uint32_t _pkt_expected_seq = 0;
uint32_t _pkt_cap; uint32_t _pkt_cap;
uint32_t _pkt_lantency; uint32_t _pkt_latency;
uint32_t _pkt_expected_seq = 0;
std::map<uint32_t, DataPacket::Ptr> _pkt_map;
}; };
} // namespace SRT } // namespace SRT

View File

@ -2,9 +2,10 @@
namespace SRT { namespace SRT {
PacketSendQueue::PacketSendQueue(uint32_t max_size, uint32_t lantency) PacketSendQueue::PacketSendQueue(uint32_t max_size, uint32_t latency)
: _pkt_cap(max_size) : _pkt_cap(max_size)
, _pkt_lantency(lantency) {} , _pkt_latency(latency) {}
bool PacketSendQueue::drop(uint32_t num) { bool PacketSendQueue::drop(uint32_t num) {
decltype(_pkt_cache.begin()) it; decltype(_pkt_cache.begin()) it;
for (it = _pkt_cache.begin(); it != _pkt_cache.end(); ++it) { for (it = _pkt_cache.begin(); it != _pkt_cache.end(); ++it) {
@ -17,12 +18,13 @@ bool PacketSendQueue::drop(uint32_t num) {
} }
return true; return true;
} }
bool PacketSendQueue::inputPacket(DataPacket::Ptr pkt) { bool PacketSendQueue::inputPacket(DataPacket::Ptr pkt) {
_pkt_cache.push_back(pkt); _pkt_cache.push_back(pkt);
while (_pkt_cache.size() > _pkt_cap) { while (_pkt_cache.size() > _pkt_cap) {
_pkt_cache.pop_front(); _pkt_cache.pop_front();
} }
while (timeLantency() > _pkt_lantency) { while (timeLatency() > _pkt_latency) {
_pkt_cache.pop_front(); _pkt_cache.pop_front();
} }
return true; return true;
@ -53,7 +55,7 @@ std::list<DataPacket::Ptr> PacketSendQueue::findPacketBySeq(uint32_t start, uint
return re; return re;
} }
uint32_t PacketSendQueue::timeLantency() { uint32_t PacketSendQueue::timeLatency() {
if (_pkt_cache.empty()) { if (_pkt_cache.empty()) {
return 0; return 0;
} }
@ -67,7 +69,7 @@ uint32_t PacketSendQueue::timeLantency() {
dur = first - last; dur = first - last;
} }
if (dur > (0x01 << 31)) { if (dur > (0x01 << 31)) {
TraceL << "cycle timeLantency " << dur; TraceL << "cycle timeLatency " << dur;
dur = 0xffffffff - dur; dur = 0xffffffff - dur;
} }

View File

@ -1,5 +1,6 @@
#ifndef ZLMEDIAKIT_SRT_PACKET_SEND_QUEUE_H #ifndef ZLMEDIAKIT_SRT_PACKET_SEND_QUEUE_H
#define ZLMEDIAKIT_SRT_PACKET_SEND_QUEUE_H #define ZLMEDIAKIT_SRT_PACKET_SEND_QUEUE_H
#include "Packet.hpp" #include "Packet.hpp"
#include <algorithm> #include <algorithm>
#include <list> #include <list>
@ -7,23 +8,30 @@
#include <set> #include <set>
#include <tuple> #include <tuple>
#include <utility> #include <utility>
namespace SRT { namespace SRT {
class PacketSendQueue { class PacketSendQueue {
public: public:
using Ptr = std::shared_ptr<PacketSendQueue>; using Ptr = std::shared_ptr<PacketSendQueue>;
using LostPair = std::pair<uint32_t, uint32_t>; using LostPair = std::pair<uint32_t, uint32_t>;
PacketSendQueue(uint32_t max_size, uint32_t lantency);
PacketSendQueue(uint32_t max_size, uint32_t latency);
~PacketSendQueue() = default; ~PacketSendQueue() = default;
bool drop(uint32_t num); bool drop(uint32_t num);
bool inputPacket(DataPacket::Ptr pkt); bool inputPacket(DataPacket::Ptr pkt);
std::list<DataPacket::Ptr> findPacketBySeq(uint32_t start, uint32_t end); std::list<DataPacket::Ptr> findPacketBySeq(uint32_t start, uint32_t end);
private: private:
uint32_t timeLantency(); uint32_t timeLatency();
private: private:
std::list<DataPacket::Ptr> _pkt_cache;
uint32_t _pkt_cap; uint32_t _pkt_cap;
uint32_t _pkt_lantency; uint32_t _pkt_latency;
std::list<DataPacket::Ptr> _pkt_cache;
}; };
} // namespace SRT } // namespace SRT
#endif // ZLMEDIAKIT_SRT_PACKET_SEND_QUEUE_H #endif // ZLMEDIAKIT_SRT_PACKET_SEND_QUEUE_H

View File

@ -50,9 +50,11 @@ EventPoller::Ptr SrtSession::queryPoller(const Buffer::Ptr &buffer) {
} }
return nullptr; return nullptr;
} }
void SrtSession::attachServer(const toolkit::Server &server) { void SrtSession::attachServer(const toolkit::Server &server) {
SockUtil::setRecvBuf(getSock()->rawFD(), 1024 * 1024); SockUtil::setRecvBuf(getSock()->rawFD(), 1024 * 1024);
} }
void SrtSession::onRecv(const Buffer::Ptr &buffer) { void SrtSession::onRecv(const Buffer::Ptr &buffer) {
uint8_t *data = (uint8_t *)buffer->data(); uint8_t *data = (uint8_t *)buffer->data();
size_t size = buffer->size(); size_t size = buffer->size();
@ -125,10 +127,12 @@ void SrtSession::onError(const SockException &err) {
// 防止互相引用导致不释放 // 防止互相引用导致不释放
auto transport = std::move(_transport); auto transport = std::move(_transport);
getPoller()->async([transport,err] { getPoller()->async(
[transport, err] {
//延时减引用防止使用transport对象时销毁对象 //延时减引用防止使用transport对象时销毁对象
transport->onShutdown(err); transport->onShutdown(err);
}, false); },
false);
} }
void SrtSession::onManager() { void SrtSession::onManager() {

View File

@ -24,7 +24,6 @@ private:
Ticker _ticker; Ticker _ticker;
struct sockaddr_storage _peer_addr; struct sockaddr_storage _peer_addr;
SrtTransport::Ptr _transport; SrtTransport::Ptr _transport;
}; };
} // namespace SRT } // namespace SRT

View File

@ -1,24 +1,24 @@
#include <stdlib.h> #include <stdlib.h>
#include "Ack.hpp"
#include "Packet.hpp"
#include "SrtTransport.hpp"
#include "Util/onceToken.h" #include "Util/onceToken.h"
#include "SrtTransport.hpp"
#include "Packet.hpp"
#include "Ack.hpp"
namespace SRT { namespace SRT {
#define SRT_FIELD "srt." #define SRT_FIELD "srt."
// srt 超时时间 // srt 超时时间
const std::string kTimeOutSec = SRT_FIELD "timeoutSec"; const std::string kTimeOutSec = SRT_FIELD "timeoutSec";
// srt 单端口udp服务器 // srt 单端口udp服务器
const std::string kPort = SRT_FIELD "port"; const std::string kPort = SRT_FIELD "port";
const std::string kLatencyMul = SRT_FIELD "latencyMul";
const std::string kLantencyMul = SRT_FIELD"lantencyMul";
static std::atomic<uint32_t> s_srt_socket_id_generate { 125 }; static std::atomic<uint32_t> s_srt_socket_id_generate { 125 };
//////////// SrtTransport ////////////////////////// //////////// SrtTransport //////////////////////////
SrtTransport::SrtTransport(const EventPoller::Ptr &poller) SrtTransport::SrtTransport(const EventPoller::Ptr &poller)
: _poller(poller) { : _poller(poller) {
_start_timestamp = SteadyClock::now(); _start_timestamp = SteadyClock::now();
_socket_id = s_srt_socket_id_generate.fetch_add(1);\ _socket_id = s_srt_socket_id_generate.fetch_add(1);
_pkt_recv_rate_context = std::make_shared<PacketRecvRateContext>(_start_timestamp); _pkt_recv_rate_context = std::make_shared<PacketRecvRateContext>(_start_timestamp);
_recv_rate_context = std::make_shared<RecvRateContext>(_start_timestamp); _recv_rate_context = std::make_shared<RecvRateContext>(_start_timestamp);
_estimated_link_capacity_context = std::make_shared<EstimatedLinkCapacityContext>(_start_timestamp); _estimated_link_capacity_context = std::make_shared<EstimatedLinkCapacityContext>(_start_timestamp);
@ -27,6 +27,7 @@ SrtTransport::SrtTransport(const EventPoller::Ptr &poller)
SrtTransport::~SrtTransport() { SrtTransport::~SrtTransport() {
TraceL << " "; TraceL << " ";
} }
const EventPoller::Ptr &SrtTransport::getPoller() const { const EventPoller::Ptr &SrtTransport::getPoller() const {
return _poller; return _poller;
} }
@ -40,6 +41,7 @@ void SrtTransport::setSession(Session::Ptr session) {
} }
_selected_session = session; _selected_session = session;
} }
const Session::Ptr &SrtTransport::getSession() const { const Session::Ptr &SrtTransport::getSession() const {
return _selected_session; return _selected_session;
} }
@ -146,6 +148,7 @@ void SrtTransport::handleHandshakeInduction(HandshakePacket &pkt, struct sockadd
registerSelfHandshake(); registerSelfHandshake();
sendControlPacket(res, true); sendControlPacket(res, true);
} }
void SrtTransport::handleHandshakeConclusion(HandshakePacket &pkt, struct sockaddr_storage *addr) { void SrtTransport::handleHandshakeConclusion(HandshakePacket &pkt, struct sockaddr_storage *addr) {
if (!_handleshake_res) { if (!_handleshake_res) {
ErrorL << "must Induction Phase for handleshake "; ErrorL << "must Induction Phase for handleshake ";
@ -157,7 +160,7 @@ void SrtTransport::handleHandshakeConclusion(HandshakePacket &pkt, struct sockad
HSExtMessage::Ptr req; HSExtMessage::Ptr req;
HSExtStreamID::Ptr sid; HSExtStreamID::Ptr sid;
uint32_t srt_flag = 0xbf; uint32_t srt_flag = 0xbf;
uint16_t delay = DurationCountMicroseconds(_now - _induction_ts)*getLantencyMul()/1000; uint16_t delay = DurationCountMicroseconds(_now - _induction_ts) * getLatencyMul() / 1000;
for (auto ext : pkt.ext_list) { for (auto ext : pkt.ext_list) {
// TraceL << getIdentifier() << " ext " << ext->dump(); // TraceL << getIdentifier() << " ext " << ext->dump();
@ -200,7 +203,7 @@ void SrtTransport::handleHandshakeConclusion(HandshakePacket &pkt, struct sockad
unregisterSelfHandshake(); unregisterSelfHandshake();
registerSelf(); registerSelf();
sendControlPacket(res, true); sendControlPacket(res, true);
TraceL<<" buf size = "<<res->max_flow_window_size<<" init seq ="<<_init_seq_number<<" lantency="<<delay; TraceL << " buf size = " << res->max_flow_window_size << " init seq =" << _init_seq_number << " latency=" << delay;
_recv_buf = std::make_shared<PacketQueue>(res->max_flow_window_size, _init_seq_number, delay * 1e3); _recv_buf = std::make_shared<PacketQueue>(res->max_flow_window_size, _init_seq_number, delay * 1e3);
_send_buf = std::make_shared<PacketSendQueue>(res->max_flow_window_size, delay * 1e3); _send_buf = std::make_shared<PacketSendQueue>(res->max_flow_window_size, delay * 1e3);
_send_packet_seq_number = _init_seq_number; _send_packet_seq_number = _init_seq_number;
@ -212,6 +215,7 @@ void SrtTransport::handleHandshakeConclusion(HandshakePacket &pkt, struct sockad
} }
_last_ack_pkt_seq_num = _init_seq_number; _last_ack_pkt_seq_num = _init_seq_number;
} }
void SrtTransport::handleHandshake(uint8_t *buf, int len, struct sockaddr_storage *addr) { void SrtTransport::handleHandshake(uint8_t *buf, int len, struct sockaddr_storage *addr) {
HandshakePacket pkt; HandshakePacket pkt;
assert(pkt.loadFromData(buf, len)); assert(pkt.loadFromData(buf, len));
@ -226,6 +230,7 @@ void SrtTransport::handleHandshake(uint8_t *buf, int len, struct sockaddr_storag
_ack_ticker.resetTime(_now); _ack_ticker.resetTime(_now);
_nak_ticker.resetTime(_now); _nak_ticker.resetTime(_now);
} }
void SrtTransport::handleKeeplive(uint8_t *buf, int len, struct sockaddr_storage *addr) { void SrtTransport::handleKeeplive(uint8_t *buf, int len, struct sockaddr_storage *addr) {
// TraceL; // TraceL;
sendKeepLivePacket(); sendKeepLivePacket();
@ -238,6 +243,7 @@ void SrtTransport::handleKeeplive(uint8_t *buf, int len, struct sockaddr_storage
pkt->storeToData(); pkt->storeToData();
sendControlPacket(pkt, true); sendControlPacket(pkt, true);
} }
void SrtTransport::handleACK(uint8_t *buf, int len, struct sockaddr_storage *addr) { void SrtTransport::handleACK(uint8_t *buf, int len, struct sockaddr_storage *addr) {
// TraceL; // TraceL;
ACKPacket ack; ACKPacket ack;
@ -254,6 +260,7 @@ void SrtTransport::handleACK(uint8_t *buf, int len, struct sockaddr_storage *add
sendControlPacket(pkt, true); sendControlPacket(pkt, true);
// TraceL<<"ack number "<<ack.ack_number; // TraceL<<"ack number "<<ack.ack_number;
} }
void SrtTransport::sendMsgDropReq(uint32_t first, uint32_t last) { void SrtTransport::sendMsgDropReq(uint32_t first, uint32_t last) {
MsgDropReqPacket::Ptr pkt = std::make_shared<MsgDropReqPacket>(); MsgDropReqPacket::Ptr pkt = std::make_shared<MsgDropReqPacket>();
pkt->dst_socket_id = _peer_socket_id; pkt->dst_socket_id = _peer_socket_id;
@ -263,6 +270,7 @@ void SrtTransport::sendMsgDropReq(uint32_t first ,uint32_t last){
pkt->storeToData(); pkt->storeToData();
sendControlPacket(pkt, true); sendControlPacket(pkt, true);
} }
void SrtTransport::handleNAK(uint8_t *buf, int len, struct sockaddr_storage *addr) { void SrtTransport::handleNAK(uint8_t *buf, int len, struct sockaddr_storage *addr) {
// TraceL; // TraceL;
NAKPacket pkt; NAKPacket pkt;
@ -287,13 +295,16 @@ void SrtTransport::handleNAK(uint8_t *buf, int len, struct sockaddr_storage *add
} }
} }
} }
void SrtTransport::handleCongestionWarning(uint8_t *buf, int len, struct sockaddr_storage *addr) { void SrtTransport::handleCongestionWarning(uint8_t *buf, int len, struct sockaddr_storage *addr) {
TraceL; TraceL;
} }
void SrtTransport::handleShutDown(uint8_t *buf, int len, struct sockaddr_storage *addr) { void SrtTransport::handleShutDown(uint8_t *buf, int len, struct sockaddr_storage *addr) {
TraceL; TraceL;
onShutdown(SockException(Err_shutdown, "peer close connection")); onShutdown(SockException(Err_shutdown, "peer close connection"));
} }
void SrtTransport::handleDropReq(uint8_t *buf, int len, struct sockaddr_storage *addr) { void SrtTransport::handleDropReq(uint8_t *buf, int len, struct sockaddr_storage *addr) {
MsgDropReqPacket pkt; MsgDropReqPacket pkt;
pkt.loadFromData(buf, len); pkt.loadFromData(buf, len);
@ -335,8 +346,8 @@ void SrtTransport::handleDropReq(uint8_t *buf, int len, struct sockaddr_storage
_light_ack_pkt_count = 0; _light_ack_pkt_count = 0;
} }
_light_ack_pkt_count++; _light_ack_pkt_count++;
} }
void SrtTransport::handleUserDefinedType(uint8_t *buf, int len, struct sockaddr_storage *addr) { void SrtTransport::handleUserDefinedType(uint8_t *buf, int len, struct sockaddr_storage *addr) {
TraceL; TraceL;
} }
@ -350,7 +361,6 @@ void SrtTransport::handleACKACK(uint8_t *buf, int len, struct sockaddr_storage *
_rtt_variance = (3 * _rtt_variance + abs((long)_rtt - (long)rtt)) / 4; _rtt_variance = (3 * _rtt_variance + abs((long)_rtt - (long)rtt)) / 4;
_rtt = (7 * rtt + _rtt) / 8; _rtt = (7 * rtt + _rtt) / 8;
// TraceL<<" rtt:"<<_rtt<<" rtt variance:"<<_rtt_variance; // TraceL<<" rtt:"<<_rtt<<" rtt variance:"<<_rtt_variance;
_ack_send_timestamp.erase(pkt->ack_number); _ack_send_timestamp.erase(pkt->ack_number);
} }
@ -377,6 +387,7 @@ void SrtTransport::sendACKPacket() {
sendControlPacket(pkt, true); sendControlPacket(pkt, true);
// TraceL<<"send ack "<<pkt->dump(); // TraceL<<"send ack "<<pkt->dump();
} }
void SrtTransport::sendLightACKPacket() { void SrtTransport::sendLightACKPacket() {
ACKPacket::Ptr pkt = std::make_shared<ACKPacket>(); ACKPacket::Ptr pkt = std::make_shared<ACKPacket>();
@ -416,6 +427,7 @@ void SrtTransport::sendShutDown(){
pkt->storeToData(); pkt->storeToData();
sendControlPacket(pkt, true); sendControlPacket(pkt, true);
} }
void SrtTransport::handleDataPacket(uint8_t *buf, int len, struct sockaddr_storage *addr) { void SrtTransport::handleDataPacket(uint8_t *buf, int len, struct sockaddr_storage *addr) {
DataPacket::Ptr pkt = std::make_shared<DataPacket>(); DataPacket::Ptr pkt = std::make_shared<DataPacket>();
pkt->loadFromData(buf, len); pkt->loadFromData(buf, len);
@ -435,7 +447,8 @@ void SrtTransport::handleDataPacket(uint8_t *buf, int len, struct sockaddr_stora
} }
if (list.empty()) { if (list.empty()) {
//TraceL<<_recv_buf->dump()<<" nake interval:"<<nak_interval/1000<<" ticker:"<<_nak_ticker.elapsedTime(_now)/1000; // TraceL<<_recv_buf->dump()<<" nake interval:"<<nak_interval/1000<<"
// ticker:"<<_nak_ticker.elapsedTime(_now)/1000;
} }
if (_nak_ticker.elapsedTime(_now) > nak_interval) { if (_nak_ticker.elapsedTime(_now) > nak_interval) {
@ -472,9 +485,11 @@ void SrtTransport::sendDataPacket(DataPacket::Ptr pkt,char* buf,int len, bool fl
sendPacket(pkt, flush); sendPacket(pkt, flush);
_send_buf->inputPacket(pkt); _send_buf->inputPacket(pkt);
} }
void SrtTransport::sendControlPacket(ControlPacket::Ptr pkt, bool flush) { void SrtTransport::sendControlPacket(ControlPacket::Ptr pkt, bool flush) {
sendPacket(pkt, flush); sendPacket(pkt, flush);
} }
void SrtTransport::sendPacket(Buffer::Ptr pkt, bool flush) { void SrtTransport::sendPacket(Buffer::Ptr pkt, bool flush) {
if (_selected_session) { if (_selected_session) {
auto tmp = _packet_pool.obtain2(); auto tmp = _packet_pool.obtain2();
@ -485,6 +500,7 @@ void SrtTransport::sendPacket(Buffer::Ptr pkt,bool flush){
WarnL << "not reach this"; WarnL << "not reach this";
} }
} }
std::string SrtTransport::getIdentifier() { std::string SrtTransport::getIdentifier() {
return _selected_session ? _selected_session->getIdentifier() : ""; return _selected_session ? _selected_session->getIdentifier() : "";
} }
@ -492,6 +508,7 @@ std::string SrtTransport::getIdentifier(){
void SrtTransport::registerSelfHandshake() { void SrtTransport::registerSelfHandshake() {
SrtTransportManager::Instance().addHandshakeItem(std::to_string(_sync_cookie), shared_from_this()); SrtTransportManager::Instance().addHandshakeItem(std::to_string(_sync_cookie), shared_from_this());
} }
void SrtTransport::unregisterSelfHandshake() { void SrtTransport::unregisterSelfHandshake() {
if (_sync_cookie == 0) { if (_sync_cookie == 0) {
return; return;
@ -504,8 +521,8 @@ void SrtTransport::registerSelf() {
return; return;
} }
SrtTransportManager::Instance().addItem(std::to_string(_socket_id), shared_from_this()); SrtTransportManager::Instance().addItem(std::to_string(_socket_id), shared_from_this());
} }
void SrtTransport::unregisterSelf() { void SrtTransport::unregisterSelf() {
SrtTransportManager::Instance().removeItem(std::to_string(_socket_id)); SrtTransportManager::Instance().removeItem(std::to_string(_socket_id));
} }
@ -522,10 +539,12 @@ void SrtTransport::onShutdown(const SockException &ex){
} }
} }
} }
size_t SrtTransport::getPayloadSize() { size_t SrtTransport::getPayloadSize() {
size_t ret = (_mtu - 28 - 16) / 188 * 188; size_t ret = (_mtu - 28 - 16) / 188 * 188;
return ret; return ret;
} }
void SrtTransport::onSendTSData(const Buffer::Ptr &buffer, bool flush) { void SrtTransport::onSendTSData(const Buffer::Ptr &buffer, bool flush) {
// TraceL; // TraceL;
DataPacket::Ptr pkt; DataPacket::Ptr pkt;
@ -565,9 +584,10 @@ void SrtTransport::onSendTSData(const Buffer::Ptr &buffer, bool flush){
pkt->timestamp = DurationCountMicroseconds(SteadyClock::now() - _start_timestamp); pkt->timestamp = DurationCountMicroseconds(SteadyClock::now() - _start_timestamp);
sendDataPacket(pkt, ptr, (int)size, flush); sendDataPacket(pkt, ptr, (int)size, flush);
} }
} }
//////////// SrtTransportManager ////////////////////////// //////////// SrtTransportManager //////////////////////////
SrtTransportManager &SrtTransportManager::Instance() { SrtTransportManager &SrtTransportManager::Instance() {
static SrtTransportManager s_instance; static SrtTransportManager s_instance;
return s_instance; return s_instance;
@ -599,10 +619,12 @@ void SrtTransportManager::addHandshakeItem(const std::string &key, const SrtTran
std::lock_guard<std::mutex> lck(_handshake_mtx); std::lock_guard<std::mutex> lck(_handshake_mtx);
_handshake_map[key] = ptr; _handshake_map[key] = ptr;
} }
void SrtTransportManager::removeHandshakeItem(const std::string &key) { void SrtTransportManager::removeHandshakeItem(const std::string &key) {
std::lock_guard<std::mutex> lck(_handshake_mtx); std::lock_guard<std::mutex> lck(_handshake_mtx);
_handshake_map.erase(key); _handshake_map.erase(key);
} }
SrtTransport::Ptr SrtTransportManager::getHandshakeItem(const std::string &key) { SrtTransport::Ptr SrtTransportManager::getHandshakeItem(const std::string &key) {
if (key.empty()) { if (key.empty()) {
return nullptr; return nullptr;
@ -615,5 +637,4 @@ SrtTransport::Ptr SrtTransportManager::getHandshakeItem(const std::string &key)
return it->second.lock(); return it->second.lock();
} }
} // namespace SRT } // namespace SRT

View File

@ -1,10 +1,10 @@
#ifndef ZLMEDIAKIT_SRT_TRANSPORT_H #ifndef ZLMEDIAKIT_SRT_TRANSPORT_H
#define ZLMEDIAKIT_SRT_TRANSPORT_H #define ZLMEDIAKIT_SRT_TRANSPORT_H
#include <mutex> #include <atomic>
#include <chrono> #include <chrono>
#include <memory> #include <memory>
#include <atomic> #include <mutex>
#include "Network/Session.h" #include "Network/Session.h"
#include "Poller/EventPoller.h" #include "Poller/EventPoller.h"
@ -17,11 +17,12 @@
#include "Statistic.hpp" #include "Statistic.hpp"
namespace SRT { namespace SRT {
using namespace toolkit; using namespace toolkit;
extern const std::string kPort; extern const std::string kPort;
extern const std::string kTimeOutSec; extern const std::string kTimeOutSec;
extern const std::string kLantencyMul; extern const std::string kLatencyMul;
class SrtTransport : public std::enable_shared_from_this<SrtTransport> { class SrtTransport : public std::enable_shared_from_this<SrtTransport> {
public: public:
@ -33,6 +34,7 @@ public:
const EventPoller::Ptr &getPoller() const; const EventPoller::Ptr &getPoller() const;
void setSession(Session::Ptr session); void setSession(Session::Ptr session);
const Session::Ptr &getSession() const; const Session::Ptr &getSession() const;
/** /**
* socket收到udp数据 * socket收到udp数据
* @param buf * @param buf
@ -43,20 +45,20 @@ public:
virtual void onSendTSData(const Buffer::Ptr &buffer, bool flush); virtual void onSendTSData(const Buffer::Ptr &buffer, bool flush);
std::string getIdentifier(); std::string getIdentifier();
void unregisterSelfHandshake();
void unregisterSelf(); void unregisterSelf();
void unregisterSelfHandshake();
protected: protected:
virtual void onHandShakeFinished(std::string& streamid,struct sockaddr_storage *addr){}; virtual bool isPusher() { return true; };
virtual void onSRTData(DataPacket::Ptr pkt) {}; virtual void onSRTData(DataPacket::Ptr pkt) {};
virtual void onShutdown(const SockException &ex); virtual void onShutdown(const SockException &ex);
virtual bool isPusher(){ virtual void onHandShakeFinished(std::string &streamid, struct sockaddr_storage *addr) {};
return true; virtual void sendPacket(Buffer::Ptr pkt, bool flush = true);
}; virtual int getLatencyMul() { return 4; };
private: private:
void registerSelfHandshake();
void registerSelf(); void registerSelf();
void registerSelfHandshake();
void switchToOtherTransport(uint8_t *buf, int len, uint32_t socketid, struct sockaddr_storage *addr); void switchToOtherTransport(uint8_t *buf, int len, uint32_t socketid, struct sockaddr_storage *addr);
@ -83,13 +85,11 @@ private:
void sendMsgDropReq(uint32_t first, uint32_t last); void sendMsgDropReq(uint32_t first, uint32_t last);
size_t getPayloadSize(); size_t getPayloadSize();
protected: protected:
void sendDataPacket(DataPacket::Ptr pkt, char *buf, int len, bool flush = false); void sendDataPacket(DataPacket::Ptr pkt, char *buf, int len, bool flush = false);
void sendControlPacket(ControlPacket::Ptr pkt, bool flush = true); void sendControlPacket(ControlPacket::Ptr pkt, bool flush = true);
virtual void sendPacket(Buffer::Ptr pkt,bool flush = true);
virtual int getLantencyMul(){
return 4;
};
private: private:
//当前选中的udp链接 //当前选中的udp链接
Session::Ptr _selected_session; Session::Ptr _selected_session;
@ -137,7 +137,6 @@ private:
HandshakePacket::Ptr _handleshake_res; HandshakePacket::Ptr _handleshake_res;
ResourcePool<BufferRaw> _packet_pool; ResourcePool<BufferRaw> _packet_pool;
}; };
class SrtTransportManager { class SrtTransportManager {
@ -150,6 +149,7 @@ public:
void addHandshakeItem(const std::string &key, const SrtTransport::Ptr &ptr); void addHandshakeItem(const std::string &key, const SrtTransport::Ptr &ptr);
void removeHandshakeItem(const std::string &key); void removeHandshakeItem(const std::string &key);
SrtTransport::Ptr getHandshakeItem(const std::string &key); SrtTransport::Ptr getHandshakeItem(const std::string &key);
private: private:
SrtTransportManager() = default; SrtTransportManager() = default;

View File

@ -4,8 +4,7 @@
#include "SrtTransportImp.hpp" #include "SrtTransportImp.hpp"
namespace SRT { namespace SRT {
SrtTransportImp::SrtTransportImp(const EventPoller::Ptr &poller) SrtTransportImp::SrtTransportImp(const EventPoller::Ptr &poller) : SrtTransport(poller) {}
: SrtTransport(poller) {}
SrtTransportImp::~SrtTransportImp() { SrtTransportImp::~SrtTransportImp() {
InfoP(this); InfoP(this);
@ -24,7 +23,6 @@ SrtTransportImp::~SrtTransportImp() {
} }
void SrtTransportImp::onHandShakeFinished(std::string &streamid, struct sockaddr_storage *addr) { void SrtTransportImp::onHandShakeFinished(std::string &streamid, struct sockaddr_storage *addr) {
// TODO parse stream id like this zlmediakit.com/live/test?token=1213444&type=push // TODO parse stream id like this zlmediakit.com/live/test?token=1213444&type=push
if (!_addr) { if (!_addr) {
_addr.reset(new sockaddr_storage(*((sockaddr_storage *)addr))); _addr.reset(new sockaddr_storage(*((sockaddr_storage *)addr)));
@ -48,6 +46,7 @@ void SrtTransportImp::onHandShakeFinished(std::string &streamid,struct sockaddr_
emitOnPlay(); emitOnPlay();
} }
} }
void SrtTransportImp::onSRTData(DataPacket::Ptr pkt) { void SrtTransportImp::onSRTData(DataPacket::Ptr pkt) {
if (!_is_pusher) { if (!_is_pusher) {
WarnP(this) << "this is a player data ignore"; WarnP(this) << "this is a player data ignore";
@ -59,6 +58,7 @@ void SrtTransportImp::onSRTData(DataPacket::Ptr pkt) {
WarnP(this) << " not reach this"; WarnP(this) << " not reach this";
} }
} }
void SrtTransportImp::onShutdown(const SockException &ex) { void SrtTransportImp::onShutdown(const SockException &ex) {
SrtTransport::onShutdown(ex); SrtTransport::onShutdown(ex);
} }
@ -67,8 +67,11 @@ bool SrtTransportImp::close(mediakit::MediaSource &sender, bool force){
if (!force && totalReaderCount(sender)) { if (!force && totalReaderCount(sender)) {
return false; return false;
} }
std::string err = StrPrinter << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" std::string err = StrPrinter << "close media:" << sender.getSchema() << "/"
<< sender.getApp() << "/" << sender.getId() << " " << force; << sender.getVhost() << "/"
<< sender.getApp() << "/"
<< sender.getId() << " " << force;
weak_ptr<SrtTransportImp> weak_self = static_pointer_cast<SrtTransportImp>(shared_from_this()); weak_ptr<SrtTransportImp> weak_self = static_pointer_cast<SrtTransportImp>(shared_from_this());
getPoller()->async([weak_self, err]() { getPoller()->async([weak_self, err]() {
auto strong_self = weak_self.lock(); auto strong_self = weak_self.lock();
@ -80,18 +83,22 @@ bool SrtTransportImp::close(mediakit::MediaSource &sender, bool force){
}); });
return true; return true;
} }
// 播放总人数 // 播放总人数
int SrtTransportImp::totalReaderCount(mediakit::MediaSource &sender) { int SrtTransportImp::totalReaderCount(mediakit::MediaSource &sender) {
return _muxer ? _muxer->totalReaderCount() : sender.readerCount(); return _muxer ? _muxer->totalReaderCount() : sender.readerCount();
} }
// 获取媒体源类型 // 获取媒体源类型
mediakit::MediaOriginType SrtTransportImp::getOriginType(mediakit::MediaSource &sender) const { mediakit::MediaOriginType SrtTransportImp::getOriginType(mediakit::MediaSource &sender) const {
return MediaOriginType::srt_push; return MediaOriginType::srt_push;
} }
// 获取媒体源url或者文件路径 // 获取媒体源url或者文件路径
std::string SrtTransportImp::getOriginUrl(mediakit::MediaSource &sender) const { std::string SrtTransportImp::getOriginUrl(mediakit::MediaSource &sender) const {
return _media_info._full_url; return _media_info._full_url;
} }
// 获取媒体源客户端相关信息 // 获取媒体源客户端相关信息
std::shared_ptr<SockInfo> SrtTransportImp::getOriginSock(mediakit::MediaSource &sender) const { std::shared_ptr<SockInfo> SrtTransportImp::getOriginSock(mediakit::MediaSource &sender) const {
return static_pointer_cast<SockInfo>(getSession()); return static_pointer_cast<SockInfo>(getSession());
@ -126,7 +133,6 @@ void SrtTransportImp::emitOnPublish() {
} }
} }
void SrtTransportImp::emitOnPlay() { void SrtTransportImp::emitOnPlay() {
std::weak_ptr<SrtTransportImp> weak_self = static_pointer_cast<SrtTransportImp>(shared_from_this()); std::weak_ptr<SrtTransportImp> weak_self = static_pointer_cast<SrtTransportImp>(shared_from_this());
Broadcast::AuthInvoker invoker = [weak_self](const string &err) { Broadcast::AuthInvoker invoker = [weak_self](const string &err) {
@ -148,12 +154,12 @@ void SrtTransportImp::emitOnPlay(){
doPlay(); doPlay();
} }
} }
void SrtTransportImp::doPlay() { void SrtTransportImp::doPlay() {
//异步查找直播流 //异步查找直播流
std::weak_ptr<SrtTransportImp> weak_self = static_pointer_cast<SrtTransportImp>(shared_from_this());
MediaInfo info = _media_info; MediaInfo info = _media_info;
info._schema = TS_SCHEMA; info._schema = TS_SCHEMA;
std::weak_ptr<SrtTransportImp> weak_self = static_pointer_cast<SrtTransportImp>(shared_from_this());
MediaSource::findAsync(info, getSession(), [weak_self](const MediaSource::Ptr &src) { MediaSource::findAsync(info, getSession(), [weak_self](const MediaSource::Ptr &src) {
auto strong_self = weak_self.lock(); auto strong_self = weak_self.lock();
if (!strong_self) { if (!strong_self) {
@ -189,9 +195,10 @@ void SrtTransportImp::doPlay(){
auto size = ts_list->size(); auto size = ts_list->size();
ts_list->for_each([&](const TSPacket::Ptr &ts) { strong_self->onSendTSData(ts, ++i == size); }); ts_list->for_each([&](const TSPacket::Ptr &ts) { strong_self->onSendTSData(ts, ++i == size); });
}); });
}; }
}); });
} }
std::string SrtTransportImp::get_peer_ip() { std::string SrtTransportImp::get_peer_ip() {
if (!_addr) { if (!_addr) {
return "::"; return "::";
@ -236,9 +243,7 @@ bool SrtTransportImp::inputFrame(const Frame::Ptr &frame) {
} }
auto frame_cached = Frame::getCacheAbleFrame(frame); auto frame_cached = Frame::getCacheAbleFrame(frame);
lock_guard<recursive_mutex> lck(_func_mtx); lock_guard<recursive_mutex> lck(_func_mtx);
_cached_func.emplace_back([this, frame_cached]() { _cached_func.emplace_back([this, frame_cached]() { _muxer->inputFrame(frame_cached); });
_muxer->inputFrame(frame_cached);
});
return true; return true;
} }
@ -248,9 +253,7 @@ bool SrtTransportImp::addTrack(const Track::Ptr &track) {
} }
lock_guard<recursive_mutex> lck(_func_mtx); lock_guard<recursive_mutex> lck(_func_mtx);
_cached_func.emplace_back([this, track]() { _cached_func.emplace_back([this, track]() { _muxer->addTrack(track); });
_muxer->addTrack(track);
});
return true; return true;
} }
@ -259,9 +262,7 @@ void SrtTransportImp::addTrackCompleted() {
_muxer->addTrackCompleted(); _muxer->addTrackCompleted();
} else { } else {
lock_guard<recursive_mutex> lck(_func_mtx); lock_guard<recursive_mutex> lck(_func_mtx);
_cached_func.emplace_back([this]() { _cached_func.emplace_back([this]() { _muxer->addTrackCompleted(); });
_muxer->addTrackCompleted();
});
} }
} }
@ -273,10 +274,9 @@ void SrtTransportImp::doCachedFunc() {
_cached_func.clear(); _cached_func.clear();
} }
int SrtTransportImp::getLantencyMul(){ int SrtTransportImp::getLatencyMul() {
GET_CONFIG(int, lantencyMul, kLantencyMul); GET_CONFIG(int, latencyMul, kLatencyMul);
return lantencyMul; return latencyMul;
} }
} // namespace SRT } // namespace SRT

View File

@ -1,16 +1,18 @@
#ifndef ZLMEDIAKIT_SRT_TRANSPORT_IMP_H #ifndef ZLMEDIAKIT_SRT_TRANSPORT_IMP_H
#define ZLMEDIAKIT_SRT_TRANSPORT_IMP_H #define ZLMEDIAKIT_SRT_TRANSPORT_IMP_H
#include <mutex>
#include "Common/MultiMediaSourceMuxer.h"
#include "Rtp/Decoder.h"
#include "TS/TSMediaSource.h"
#include "SrtTransport.hpp"
#include <mutex>
#include "Rtp/Decoder.h"
#include "SrtTransport.hpp"
#include "TS/TSMediaSource.h"
#include "Common/MultiMediaSourceMuxer.h"
namespace SRT { namespace SRT {
using namespace std;
using namespace toolkit; using namespace toolkit;
using namespace mediakit; using namespace mediakit;
using namespace std;
class SrtTransportImp class SrtTransportImp
: public SrtTransport : public SrtTransport
, public toolkit::SockInfo , public toolkit::SockInfo
@ -19,13 +21,13 @@ class SrtTransportImp
public: public:
SrtTransportImp(const EventPoller::Ptr &poller); SrtTransportImp(const EventPoller::Ptr &poller);
~SrtTransportImp(); ~SrtTransportImp();
void inputSockData(uint8_t *buf, int len, struct sockaddr_storage *addr){
void inputSockData(uint8_t *buf, int len, struct sockaddr_storage *addr) override {
SrtTransport::inputSockData(buf, len, addr); SrtTransport::inputSockData(buf, len, addr);
_total_bytes += len; _total_bytes += len;
} }
void onSendTSData(const Buffer::Ptr &buffer, bool flush){ void onSendTSData(const Buffer::Ptr &buffer, bool flush) override { SrtTransport::onSendTSData(buffer, flush); }
SrtTransport::onSendTSData(buffer,flush);
}
/// SockInfo override /// SockInfo override
std::string get_local_ip() override; std::string get_local_ip() override;
uint16_t get_local_port() override; uint16_t get_local_port() override;
@ -35,20 +37,18 @@ public:
protected: protected:
///////SrtTransport override/////// ///////SrtTransport override///////
void onHandShakeFinished(std::string& streamid,struct sockaddr_storage *addr) override; int getLatencyMul() override;
void onSRTData(DataPacket::Ptr pkt) override; void onSRTData(DataPacket::Ptr pkt) override;
void onShutdown(const SockException &ex) override; void onShutdown(const SockException &ex) override;
int getLantencyMul() override; void onHandShakeFinished(std::string &streamid, struct sockaddr_storage *addr) override;
void sendPacket(Buffer::Ptr pkt, bool flush = true) override { void sendPacket(Buffer::Ptr pkt, bool flush = true) override {
_total_bytes += pkt->size(); _total_bytes += pkt->size();
SrtTransport::sendPacket(pkt, flush); SrtTransport::sendPacket(pkt, flush);
};
bool isPusher() override{
return _is_pusher;
} }
bool isPusher() override { return _is_pusher; }
///////MediaSourceEvent override/////// ///////MediaSourceEvent override///////
// 关闭 // 关闭
bool close(mediakit::MediaSource &sender, bool force) override; bool close(mediakit::MediaSource &sender, bool force) override;
@ -61,10 +61,11 @@ protected:
// 获取媒体源客户端相关信息 // 获取媒体源客户端相关信息
std::shared_ptr<SockInfo> getOriginSock(mediakit::MediaSource &sender) const override; std::shared_ptr<SockInfo> getOriginSock(mediakit::MediaSource &sender) const override;
bool inputFrame(const Frame::Ptr &frame) override; ///////MediaSinkInterface override///////
bool addTrack(const Track::Ptr & track) override;
void addTrackCompleted() override;
void resetTracks() override {}; void resetTracks() override {};
void addTrackCompleted() override;
bool addTrack(const Track::Ptr &track) override;
bool inputFrame(const Frame::Ptr &frame) override;
private: private:
void emitOnPublish(); void emitOnPublish();

View File

@ -1,7 +1,9 @@
#include <algorithm> #include <algorithm>
#include "Statistic.hpp" #include "Statistic.hpp"
namespace SRT { namespace SRT {
void PacketRecvRateContext::inputPacket(TimePoint &ts) { void PacketRecvRateContext::inputPacket(TimePoint &ts) {
if (_pkt_map.size() > 100) { if (_pkt_map.size() > 100) {
_pkt_map.erase(_pkt_map.begin()); _pkt_map.erase(_pkt_map.begin());
@ -9,6 +11,7 @@ void PacketRecvRateContext::inputPacket(TimePoint& ts) {
auto tmp = DurationCountMicroseconds(ts - _start); auto tmp = DurationCountMicroseconds(ts - _start);
_pkt_map.emplace(tmp, tmp); _pkt_map.emplace(tmp, tmp);
} }
uint32_t PacketRecvRateContext::getPacketRecvRate() { uint32_t PacketRecvRateContext::getPacketRecvRate() {
if (_pkt_map.size() < 2) { if (_pkt_map.size() < 2) {
return 50000; return 50000;
@ -17,13 +20,13 @@ uint32_t PacketRecvRateContext::getPacketRecvRate() {
for (auto it = _pkt_map.begin(); it != _pkt_map.end(); ++it) { for (auto it = _pkt_map.begin(); it != _pkt_map.end(); ++it) {
auto next = it; auto next = it;
++next; ++next;
if (next != _pkt_map.end()) { if (next == _pkt_map.end()) {
break;
}
if ((next->first - it->first) < dur) { if ((next->first - it->first) < dur) {
dur = next->first - it->first; dur = next->first - it->first;
} }
} else {
break;
}
} }
double rate = 1e6 / (double)dur; double rate = 1e6 / (double)dur;
@ -40,6 +43,7 @@ void EstimatedLinkCapacityContext::inputPacket(TimePoint& ts) {
auto tmp = DurationCountMicroseconds(ts - _start); auto tmp = DurationCountMicroseconds(ts - _start);
_pkt_map.emplace(tmp, tmp); _pkt_map.emplace(tmp, tmp);
} }
uint32_t EstimatedLinkCapacityContext::getEstimatedLinkCapacity() { uint32_t EstimatedLinkCapacityContext::getEstimatedLinkCapacity() {
decltype(_pkt_map.begin()) next; decltype(_pkt_map.begin()) next;
std::vector<int64_t> tmp; std::vector<int64_t> tmp;
@ -63,9 +67,7 @@ uint32_t EstimatedLinkCapacityContext::getEstimatedLinkCapacity() {
} }
double dur = tmp[0] / 1e6; double dur = tmp[0] / 1e6;
return (uint32_t)(1.0 / dur); return (uint32_t)(1.0 / dur);
} }
void RecvRateContext::inputPacket(TimePoint &ts, size_t size) { void RecvRateContext::inputPacket(TimePoint &ts, size_t size) {
@ -73,9 +75,9 @@ void RecvRateContext::inputPacket(TimePoint& ts, size_t size ) {
_pkt_map.erase(_pkt_map.begin()); _pkt_map.erase(_pkt_map.begin());
} }
auto tmp = DurationCountMicroseconds(ts - _start); auto tmp = DurationCountMicroseconds(ts - _start);
_pkt_map.emplace(tmp, tmp); _pkt_map.emplace(tmp, tmp);
} }
uint32_t RecvRateContext::getRecvRate() { uint32_t RecvRateContext::getRecvRate() {
if (_pkt_map.size() < 2) { if (_pkt_map.size() < 2) {
return 0; return 0;

View File

@ -6,16 +6,17 @@
#include "Packet.hpp" #include "Packet.hpp"
namespace SRT { namespace SRT {
class PacketRecvRateContext { class PacketRecvRateContext {
public: public:
PacketRecvRateContext(TimePoint start): _start(start) {}; PacketRecvRateContext(TimePoint start): _start(start) {};
~PacketRecvRateContext() = default; ~PacketRecvRateContext() = default;
void inputPacket(TimePoint &ts); void inputPacket(TimePoint &ts);
uint32_t getPacketRecvRate(); uint32_t getPacketRecvRate();
private:
std::map<int64_t,int64_t> _pkt_map;
TimePoint _start;
private:
TimePoint _start;
std::map<int64_t, int64_t> _pkt_map;
}; };
class EstimatedLinkCapacityContext { class EstimatedLinkCapacityContext {
@ -24,9 +25,10 @@ public:
~EstimatedLinkCapacityContext() = default; ~EstimatedLinkCapacityContext() = default;
void inputPacket(TimePoint &ts); void inputPacket(TimePoint &ts);
uint32_t getEstimatedLinkCapacity(); uint32_t getEstimatedLinkCapacity();
private: private:
std::map<int64_t,int64_t> _pkt_map;
TimePoint _start; TimePoint _start;
std::map<int64_t, int64_t> _pkt_map;
}; };
class RecvRateContext { class RecvRateContext {
@ -35,12 +37,11 @@ public:
~RecvRateContext() = default; ~RecvRateContext() = default;
void inputPacket(TimePoint &ts, size_t size); void inputPacket(TimePoint &ts, size_t size);
uint32_t getRecvRate(); uint32_t getRecvRate();
private: private:
std::map<int64_t,size_t> _pkt_map;
TimePoint _start; TimePoint _start;
std::map<int64_t, size_t> _pkt_map;
}; };
} // namespace SRT } // namespace SRT
#endif // ZLMEDIAKIT_SRT_STATISTIC_H #endif // ZLMEDIAKIT_SRT_STATISTIC_H