mirror of
https://github.com/ZLMediaKit/ZLMediaKit.git
synced 2024-11-29 14:45:55 +08:00
can build prepare for play ts stream by srt
This commit is contained in:
parent
82da99eef3
commit
7f65e082f5
@ -61,7 +61,35 @@ bool DataPacket::loadFromData(uint8_t *buf, size_t len) {
|
|||||||
_data->assign((char *)(buf), len);
|
_data->assign((char *)(buf), len);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
bool DataPacket::storeToHeader(){
|
||||||
|
if (!_data || _data->size() < HEADER_SIZE) {
|
||||||
|
WarnL << "data size less " << HEADER_SIZE;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
uint8_t *ptr = (uint8_t *)_data->data();
|
||||||
|
|
||||||
|
ptr[0] = packet_seq_number >> 24;
|
||||||
|
ptr[1] = (packet_seq_number >> 16) & 0xff;
|
||||||
|
ptr[2] = (packet_seq_number >> 8) & 0xff;
|
||||||
|
ptr[3] = packet_seq_number & 0xff;
|
||||||
|
ptr += 4;
|
||||||
|
|
||||||
|
ptr[0] = PP << 6;
|
||||||
|
ptr[0] |= O << 5;
|
||||||
|
ptr[0] |= KK << 3;
|
||||||
|
ptr[0] |= R << 2;
|
||||||
|
ptr[0] |= (msg_number & 0xff000000) >> 24;
|
||||||
|
ptr[1] = (msg_number & 0xff0000) >> 16;
|
||||||
|
ptr[2] = (msg_number & 0xff00) >> 8;
|
||||||
|
ptr[3] = msg_number & 0xff;
|
||||||
|
ptr += 4;
|
||||||
|
|
||||||
|
storeUint32(ptr, timestamp);
|
||||||
|
ptr += 4;
|
||||||
|
|
||||||
|
storeUint32(ptr, dst_socket_id);
|
||||||
|
ptr += 4;
|
||||||
|
}
|
||||||
bool DataPacket::storeToData(uint8_t *buf, size_t len) {
|
bool DataPacket::storeToData(uint8_t *buf, size_t len) {
|
||||||
_data = BufferRaw::create();
|
_data = BufferRaw::create();
|
||||||
_data->setCapacity(len + HEADER_SIZE);
|
_data->setCapacity(len + HEADER_SIZE);
|
||||||
|
@ -43,6 +43,7 @@ public:
|
|||||||
static uint32_t getSocketID(uint8_t *buf, size_t len);
|
static uint32_t getSocketID(uint8_t *buf, size_t len);
|
||||||
bool loadFromData(uint8_t *buf, size_t len);
|
bool loadFromData(uint8_t *buf, size_t len);
|
||||||
bool storeToData(uint8_t *buf, size_t len);
|
bool storeToData(uint8_t *buf, size_t len);
|
||||||
|
bool storeToHeader();
|
||||||
|
|
||||||
///////Buffer override///////
|
///////Buffer override///////
|
||||||
char *data() const override;
|
char *data() const override;
|
||||||
|
@ -51,12 +51,39 @@ bool PacketQueue::dropForRecv(uint32_t first,uint32_t last){
|
|||||||
}
|
}
|
||||||
|
|
||||||
if(_pkt_expected_seq <= last){
|
if(_pkt_expected_seq <= last){
|
||||||
|
for(uint32_t i =first;i<=last;++i){
|
||||||
|
if(_pkt_map.find(i) != _pkt_map.end()){
|
||||||
|
_pkt_map.erase(i);
|
||||||
|
}
|
||||||
|
}
|
||||||
_pkt_expected_seq = last+1;
|
_pkt_expected_seq = last+1;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool PacketQueue::dropForSend(uint32_t num){
|
||||||
|
if(num <= _pkt_expected_seq){
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
for(uint32_t i =_pkt_expected_seq;i< num;++i){
|
||||||
|
if(_pkt_map.find(i) != _pkt_map.end()){
|
||||||
|
_pkt_map.erase(i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_pkt_expected_seq = num;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
DataPacket::Ptr PacketQueue::findPacketBySeq(uint32_t seq){
|
||||||
|
auto it = _pkt_map.find(seq);
|
||||||
|
if(it != _pkt_map.end()){
|
||||||
|
return it->second;
|
||||||
|
}
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
uint32_t PacketQueue::timeLantency() {
|
uint32_t PacketQueue::timeLantency() {
|
||||||
if (_pkt_map.empty()) {
|
if (_pkt_map.empty()) {
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -30,6 +30,10 @@ public:
|
|||||||
|
|
||||||
bool dropForRecv(uint32_t first,uint32_t last);
|
bool dropForRecv(uint32_t first,uint32_t last);
|
||||||
|
|
||||||
|
bool dropForSend(uint32_t num);
|
||||||
|
|
||||||
|
DataPacket::Ptr findPacketBySeq(uint32_t seq);
|
||||||
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
std::map<uint32_t,DataPacket::Ptr> _pkt_map;
|
std::map<uint32_t,DataPacket::Ptr> _pkt_map;
|
||||||
|
@ -192,6 +192,8 @@ void SrtTransport::handleHandshakeConclusion(HandshakePacket &pkt, struct sockad
|
|||||||
sendControlPacket(res, true);
|
sendControlPacket(res, true);
|
||||||
TraceL<<" buf size = "<<res->max_flow_window_size<<" init seq ="<<_init_seq_number<<" lantency="<<req->recv_tsbpd_delay;
|
TraceL<<" buf size = "<<res->max_flow_window_size<<" init seq ="<<_init_seq_number<<" lantency="<<req->recv_tsbpd_delay;
|
||||||
_recv_buf = std::make_shared<PacketQueue>(res->max_flow_window_size,_init_seq_number, req->recv_tsbpd_delay*1e6);
|
_recv_buf = std::make_shared<PacketQueue>(res->max_flow_window_size,_init_seq_number, req->recv_tsbpd_delay*1e6);
|
||||||
|
_send_buf = std::make_shared<PacketQueue>(res->max_flow_window_size,_init_seq_number, req->recv_tsbpd_delay*1e6);
|
||||||
|
_send_packet_seq_number = _init_seq_number;
|
||||||
onHandShakeFinished(_stream_id,addr);
|
onHandShakeFinished(_stream_id,addr);
|
||||||
} else {
|
} else {
|
||||||
TraceL << getIdentifier() << " CONCLUSION handle repeate ";
|
TraceL << getIdentifier() << " CONCLUSION handle repeate ";
|
||||||
@ -235,10 +237,33 @@ void SrtTransport::handleACK(uint8_t *buf, int len, struct sockaddr_storage *add
|
|||||||
pkt->timestamp = DurationCountMicroseconds(_now -_start_timestamp);
|
pkt->timestamp = DurationCountMicroseconds(_now -_start_timestamp);
|
||||||
pkt->ack_number = ack.ack_number;
|
pkt->ack_number = ack.ack_number;
|
||||||
pkt->storeToData();
|
pkt->storeToData();
|
||||||
|
_send_buf->dropForSend(ack.last_ack_pkt_seq_number);
|
||||||
sendControlPacket(pkt,true);
|
sendControlPacket(pkt,true);
|
||||||
|
}
|
||||||
|
void SrtTransport::sendMsgDropReq(uint32_t first ,uint32_t last){
|
||||||
|
|
||||||
}
|
}
|
||||||
void SrtTransport::handleNAK(uint8_t *buf, int len, struct sockaddr_storage *addr){
|
void SrtTransport::handleNAK(uint8_t *buf, int len, struct sockaddr_storage *addr){
|
||||||
TraceL;
|
TraceL;
|
||||||
|
NAKPacket pkt;
|
||||||
|
pkt.loadFromData(buf,len);
|
||||||
|
bool empty = false;
|
||||||
|
|
||||||
|
for(auto it : pkt.lost_list){
|
||||||
|
empty = true;
|
||||||
|
for(uint32_t i=it.first;i<it.second;++i){
|
||||||
|
auto data = _send_buf->findPacketBySeq(i);
|
||||||
|
if(data){
|
||||||
|
data->R = 1;
|
||||||
|
data->storeToHeader();
|
||||||
|
sendPacket(data,true);
|
||||||
|
empty = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if(empty){
|
||||||
|
sendMsgDropReq(it.first,it.second-1);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
void SrtTransport::handleCongestionWarning(uint8_t *buf, int len, struct sockaddr_storage *addr){
|
void SrtTransport::handleCongestionWarning(uint8_t *buf, int len, struct sockaddr_storage *addr){
|
||||||
TraceL;
|
TraceL;
|
||||||
@ -391,6 +416,7 @@ void SrtTransport::handleDataPacket(uint8_t *buf, int len, struct sockaddr_stora
|
|||||||
void SrtTransport::sendDataPacket(DataPacket::Ptr pkt,char* buf,int len, bool flush) {
|
void SrtTransport::sendDataPacket(DataPacket::Ptr pkt,char* buf,int len, bool flush) {
|
||||||
pkt->storeToData((uint8_t*)buf,len);
|
pkt->storeToData((uint8_t*)buf,len);
|
||||||
sendPacket(pkt,flush);
|
sendPacket(pkt,flush);
|
||||||
|
_send_buf->inputPacket(pkt);
|
||||||
}
|
}
|
||||||
void SrtTransport::sendControlPacket(ControlPacket::Ptr pkt, bool flush) {
|
void SrtTransport::sendControlPacket(ControlPacket::Ptr pkt, bool flush) {
|
||||||
sendPacket(pkt,flush);
|
sendPacket(pkt,flush);
|
||||||
@ -442,6 +468,48 @@ void SrtTransport::onShutdown(const SockException &ex){
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
size_t SrtTransport::getPayloadSize(){
|
||||||
|
size_t ret = (_mtu - 28 -16)/188*188;
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
void SrtTransport::onSendData(const Buffer::Ptr &buffer, bool flush){
|
||||||
|
DataPacket::Ptr pkt;
|
||||||
|
size_t payloadSize = getPayloadSize();
|
||||||
|
size_t size = buffer->size();
|
||||||
|
char* ptr = buffer->data();
|
||||||
|
char* end = buffer->data()+size;
|
||||||
|
|
||||||
|
while(ptr < end && size >=payloadSize){
|
||||||
|
pkt = std::make_shared<DataPacket>();
|
||||||
|
pkt->f = 0;
|
||||||
|
pkt->packet_seq_number = _send_packet_seq_number++;
|
||||||
|
pkt->PP = 3;
|
||||||
|
pkt->O = 0;
|
||||||
|
pkt->KK = 0;
|
||||||
|
pkt->R = 0;
|
||||||
|
pkt->msg_number = _send_msg_number++;
|
||||||
|
pkt->dst_socket_id = _peer_socket_id;
|
||||||
|
pkt->timestamp = DurationCountMicroseconds(SteadyClock::now() - _start_timestamp);
|
||||||
|
sendDataPacket(pkt,ptr,(int)payloadSize,flush);
|
||||||
|
ptr += payloadSize;
|
||||||
|
size -= payloadSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
if(size >0 && ptr <end){
|
||||||
|
pkt = std::make_shared<DataPacket>();
|
||||||
|
pkt->f = 0;
|
||||||
|
pkt->packet_seq_number = _send_packet_seq_number++;
|
||||||
|
pkt->PP = 3;
|
||||||
|
pkt->O = 0;
|
||||||
|
pkt->KK = 0;
|
||||||
|
pkt->R = 0;
|
||||||
|
pkt->msg_number = _send_msg_number++;
|
||||||
|
pkt->dst_socket_id = _peer_socket_id;
|
||||||
|
pkt->timestamp = DurationCountMicroseconds(SteadyClock::now() - _start_timestamp);
|
||||||
|
sendDataPacket(pkt,ptr,(int)size,flush);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
//////////// SrtTransportManager //////////////////////////
|
//////////// SrtTransportManager //////////////////////////
|
||||||
SrtTransportManager &SrtTransportManager::Instance() {
|
SrtTransportManager &SrtTransportManager::Instance() {
|
||||||
static SrtTransportManager s_instance;
|
static SrtTransportManager s_instance;
|
||||||
|
@ -37,6 +37,7 @@ public:
|
|||||||
* @param addr 数据来源地址
|
* @param addr 数据来源地址
|
||||||
*/
|
*/
|
||||||
virtual void inputSockData(uint8_t *buf, int len, struct sockaddr_storage *addr);
|
virtual void inputSockData(uint8_t *buf, int len, struct sockaddr_storage *addr);
|
||||||
|
virtual void onSendData(const Buffer::Ptr &buffer, bool flush);
|
||||||
|
|
||||||
std::string getIdentifier();
|
std::string getIdentifier();
|
||||||
|
|
||||||
@ -73,6 +74,9 @@ private:
|
|||||||
void sendLightACKPacket();
|
void sendLightACKPacket();
|
||||||
void sendKeepLivePacket();
|
void sendKeepLivePacket();
|
||||||
void sendShutDown();
|
void sendShutDown();
|
||||||
|
void sendMsgDropReq(uint32_t first ,uint32_t last);
|
||||||
|
|
||||||
|
size_t getPayloadSize();
|
||||||
protected:
|
protected:
|
||||||
void sendDataPacket(DataPacket::Ptr pkt,char* buf,int len,bool flush = false);
|
void sendDataPacket(DataPacket::Ptr pkt,char* buf,int len,bool flush = false);
|
||||||
void sendControlPacket(ControlPacket::Ptr pkt,bool flush = true);
|
void sendControlPacket(ControlPacket::Ptr pkt,bool flush = true);
|
||||||
@ -97,7 +101,10 @@ private:
|
|||||||
|
|
||||||
std::string _stream_id;
|
std::string _stream_id;
|
||||||
uint32_t _sync_cookie = 0;
|
uint32_t _sync_cookie = 0;
|
||||||
|
uint32_t _send_packet_seq_number = 0;
|
||||||
|
uint32_t _send_msg_number = 1;
|
||||||
|
|
||||||
|
PacketQueue::Ptr _send_buf;
|
||||||
PacketQueue::Ptr _recv_buf;
|
PacketQueue::Ptr _recv_buf;
|
||||||
uint32_t _rtt = 100*1000;
|
uint32_t _rtt = 100*1000;
|
||||||
uint32_t _rtt_variance =50*1000;
|
uint32_t _rtt_variance =50*1000;
|
||||||
|
@ -145,7 +145,72 @@ void SrtTransportImp::emitOnPlay(){
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
void SrtTransportImp::doPlay(){
|
void SrtTransportImp::doPlay(){
|
||||||
|
//鉴权结果回调
|
||||||
|
weak_ptr<SrtTransportImp> weak_self = dynamic_pointer_cast<SrtTransportImp>(shared_from_this());
|
||||||
|
auto onRes = [weak_self](const string &err) {
|
||||||
|
auto strong_self = weak_self.lock();
|
||||||
|
if (!strong_self) {
|
||||||
|
//本对象已经销毁
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!err.empty()) {
|
||||||
|
//播放鉴权失败
|
||||||
|
strong_self->onShutdown(SockException(Err_refused, err));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
//异步查找直播流
|
||||||
|
MediaInfo info = strong_self->_media_info;
|
||||||
|
info._schema = TS_SCHEMA;
|
||||||
|
MediaSource::findAsync(info, strong_self->getSession(), [weak_self](const MediaSource::Ptr &src) {
|
||||||
|
auto strong_self = weak_self.lock();
|
||||||
|
if (!strong_self) {
|
||||||
|
//本对象已经销毁
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (!src) {
|
||||||
|
//未找到该流
|
||||||
|
strong_self->onShutdown(SockException(Err_shutdown));
|
||||||
|
} else {
|
||||||
|
auto ts_src = dynamic_pointer_cast<TSMediaSource>(src);
|
||||||
|
assert(ts_src);
|
||||||
|
ts_src->pause(false);
|
||||||
|
strong_self->_ts_reader = ts_src->getRing()->attach(strong_self->getPoller());
|
||||||
|
strong_self->_ts_reader->setDetachCB([weak_self]() {
|
||||||
|
auto strong_self = weak_self.lock();
|
||||||
|
if (!strong_self) {
|
||||||
|
//本对象已经销毁
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
strong_self->onShutdown(SockException(Err_shutdown));
|
||||||
|
});
|
||||||
|
strong_self->_ts_reader->setReadCB([weak_self](const TSMediaSource::RingDataType &ts_list) {
|
||||||
|
auto strong_self = weak_self.lock();
|
||||||
|
if (!strong_self) {
|
||||||
|
//本对象已经销毁
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
size_t i = 0;
|
||||||
|
auto size = ts_list->size();
|
||||||
|
ts_list->for_each([&](const TSPacket::Ptr &ts) { strong_self->onSendData(ts, ++i == size); });
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
|
Broadcast::AuthInvoker invoker = [weak_self, onRes](const string &err) {
|
||||||
|
if (auto strongSelf = weak_self.lock()) {
|
||||||
|
strongSelf->getPoller()->async([onRes, err]() { onRes(err); });
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed, _media_info, invoker, static_cast<SockInfo &>(*this));
|
||||||
|
if (!flag) {
|
||||||
|
//该事件无人监听,默认不鉴权
|
||||||
|
onRes("");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
std::string SrtTransportImp::get_peer_ip() {
|
std::string SrtTransportImp::get_peer_ip() {
|
||||||
if (!_addr) {
|
if (!_addr) {
|
||||||
|
@ -1,10 +1,12 @@
|
|||||||
#ifndef ZLMEDIAKIT_SRT_TRANSPORT_IMP_H
|
#ifndef ZLMEDIAKIT_SRT_TRANSPORT_IMP_H
|
||||||
#define ZLMEDIAKIT_SRT_TRANSPORT_IMP_H
|
#define ZLMEDIAKIT_SRT_TRANSPORT_IMP_H
|
||||||
|
#include <mutex>
|
||||||
#include "Common/MultiMediaSourceMuxer.h"
|
#include "Common/MultiMediaSourceMuxer.h"
|
||||||
#include "Rtp/Decoder.h"
|
#include "Rtp/Decoder.h"
|
||||||
|
#include "TS/TSMediaSource.h"
|
||||||
#include "SrtTransport.hpp"
|
#include "SrtTransport.hpp"
|
||||||
|
|
||||||
|
|
||||||
namespace SRT {
|
namespace SRT {
|
||||||
using namespace toolkit;
|
using namespace toolkit;
|
||||||
using namespace mediakit;
|
using namespace mediakit;
|
||||||
@ -64,7 +66,8 @@ private:
|
|||||||
uint64_t _total_bytes = 0;
|
uint64_t _total_bytes = 0;
|
||||||
Ticker _alive_ticker;
|
Ticker _alive_ticker;
|
||||||
std::unique_ptr<sockaddr_storage> _addr;
|
std::unique_ptr<sockaddr_storage> _addr;
|
||||||
|
// for player
|
||||||
|
TSMediaSource::RingType::RingReader::Ptr _ts_reader;
|
||||||
// for pusher
|
// for pusher
|
||||||
MultiMediaSourceMuxer::Ptr _muxer;
|
MultiMediaSourceMuxer::Ptr _muxer;
|
||||||
DecoderImp::Ptr _decoder;
|
DecoderImp::Ptr _decoder;
|
||||||
|
Loading…
Reference in New Issue
Block a user