srt can play stream

This commit is contained in:
xiongguangjie 2022-06-04 11:23:18 +08:00
parent 6237dd48dc
commit 91aa9c494c
5 changed files with 64 additions and 67 deletions

View File

@ -80,6 +80,8 @@ bool HSExtStreamID::loadFromData(uint8_t *buf, size_t len) {
streamid.push_back(*(ptr)); streamid.push_back(*(ptr));
ptr+=4; ptr+=4;
} }
char zero = 0x00;
streamid.erase(streamid.find_first_of(zero),streamid.size());
return true; return true;
} }

View File

@ -228,9 +228,11 @@ void SrtTransport::handleKeeplive(uint8_t *buf, int len, struct sockaddr_storage
sendControlPacket(pkt,true); sendControlPacket(pkt,true);
} }
void SrtTransport::handleACK(uint8_t *buf, int len, struct sockaddr_storage *addr){ void SrtTransport::handleACK(uint8_t *buf, int len, struct sockaddr_storage *addr){
TraceL; //TraceL;
ACKPacket ack; ACKPacket ack;
ack.loadFromData(buf,len); if(!ack.loadFromData(buf,len)){
return;
}
ACKACKPacket::Ptr pkt = std::make_shared<ACKACKPacket>(); ACKACKPacket::Ptr pkt = std::make_shared<ACKACKPacket>();
pkt->dst_socket_id = _peer_socket_id; pkt->dst_socket_id = _peer_socket_id;
@ -239,9 +241,16 @@ void SrtTransport::handleACK(uint8_t *buf, int len, struct sockaddr_storage *add
pkt->storeToData(); pkt->storeToData();
_send_buf->dropForSend(ack.last_ack_pkt_seq_number); _send_buf->dropForSend(ack.last_ack_pkt_seq_number);
sendControlPacket(pkt,true); sendControlPacket(pkt,true);
//TraceL<<"ack number "<<ack.ack_number;
} }
void SrtTransport::sendMsgDropReq(uint32_t first ,uint32_t last){ void SrtTransport::sendMsgDropReq(uint32_t first ,uint32_t last){
MsgDropReqPacket::Ptr pkt = std::make_shared<MsgDropReqPacket>();
pkt->dst_socket_id = _peer_socket_id;
pkt->timestamp = DurationCountMicroseconds(_now -_start_timestamp);
pkt->first_pkt_seq_num = first;
pkt->last_pkt_seq_num = last;
pkt->storeToData();
sendControlPacket(pkt,true);
} }
void SrtTransport::handleNAK(uint8_t *buf, int len, struct sockaddr_storage *addr){ void SrtTransport::handleNAK(uint8_t *buf, int len, struct sockaddr_storage *addr){
TraceL; TraceL;
@ -472,7 +481,8 @@ size_t SrtTransport::getPayloadSize(){
size_t ret = (_mtu - 28 -16)/188*188; size_t ret = (_mtu - 28 -16)/188*188;
return ret; return ret;
} }
void SrtTransport::onSendData(const Buffer::Ptr &buffer, bool flush){ void SrtTransport::onSendTSData(const Buffer::Ptr &buffer, bool flush){
//TraceL;
DataPacket::Ptr pkt; DataPacket::Ptr pkt;
size_t payloadSize = getPayloadSize(); size_t payloadSize = getPayloadSize();
size_t size = buffer->size(); size_t size = buffer->size();

View File

@ -37,7 +37,7 @@ public:
* @param addr * @param addr
*/ */
virtual void inputSockData(uint8_t *buf, int len, struct sockaddr_storage *addr); virtual void inputSockData(uint8_t *buf, int len, struct sockaddr_storage *addr);
virtual void onSendData(const Buffer::Ptr &buffer, bool flush); virtual void onSendTSData(const Buffer::Ptr &buffer, bool flush);
std::string getIdentifier(); std::string getIdentifier();
@ -80,7 +80,7 @@ private:
protected: protected:
void sendDataPacket(DataPacket::Ptr pkt,char* buf,int len,bool flush = false); void sendDataPacket(DataPacket::Ptr pkt,char* buf,int len,bool flush = false);
void sendControlPacket(ControlPacket::Ptr pkt,bool flush = true); void sendControlPacket(ControlPacket::Ptr pkt,bool flush = true);
void sendPacket(Buffer::Ptr pkt,bool flush = true); virtual void sendPacket(Buffer::Ptr pkt,bool flush = true);
private: private:
//当前选中的udp链接 //当前选中的udp链接
Session::Ptr _selected_session; Session::Ptr _selected_session;

View File

@ -10,7 +10,7 @@ SrtTransportImp::SrtTransportImp(const EventPoller::Ptr &poller)
SrtTransportImp::~SrtTransportImp() { SrtTransportImp::~SrtTransportImp() {
InfoP(this); InfoP(this);
uint64_t duration = _alive_ticker.createdTime() / 1000; uint64_t duration = _alive_ticker.createdTime() / 1000;
WarnP(this) << "srt 推流器(" WarnP(this) << (_is_pusher ? "srt 推流器(" : "srt 播放器(")
<< _media_info._vhost << "/" << _media_info._vhost << "/"
<< _media_info._app << "/" << _media_info._app << "/"
<< _media_info._streamid << _media_info._streamid
@ -39,6 +39,7 @@ void SrtTransportImp::onHandShakeFinished(std::string &streamid,struct sockaddr_
emitOnPublish(); emitOnPublish();
}else{ }else{
_is_pusher = false; _is_pusher = false;
emitOnPlay();
} }
} }
void SrtTransportImp::onSRTData(DataPacket::Ptr pkt,struct sockaddr_storage *addr) { void SrtTransportImp::onSRTData(DataPacket::Ptr pkt,struct sockaddr_storage *addr) {
@ -145,72 +146,48 @@ void SrtTransportImp::emitOnPlay(){
} }
} }
void SrtTransportImp::doPlay(){ void SrtTransportImp::doPlay(){
//鉴权结果回调 //异步查找直播流
weak_ptr<SrtTransportImp> weak_self = dynamic_pointer_cast<SrtTransportImp>(shared_from_this()); std::weak_ptr<SrtTransportImp> weak_self = static_pointer_cast<SrtTransportImp>(shared_from_this());
auto onRes = [weak_self](const string &err) {
MediaInfo info = _media_info;
info._schema = TS_SCHEMA;
MediaSource::findAsync(info, getSession(), [weak_self](const MediaSource::Ptr &src) {
auto strong_self = weak_self.lock(); auto strong_self = weak_self.lock();
if (!strong_self) { if (!strong_self) {
//本对象已经销毁 //本对象已经销毁
TraceL<<"本对象已经销毁";
return; return;
} }
if (!src) {
if (!err.empty()) { //未找到该流
//播放鉴权失败 TraceL<<"未找到该流";
strong_self->onShutdown(SockException(Err_refused, err)); strong_self->onShutdown(SockException(Err_shutdown));
return; } else {
} TraceL<<"找到该流";
auto ts_src = dynamic_pointer_cast<TSMediaSource>(src);
//异步查找直播流 assert(ts_src);
MediaInfo info = strong_self->_media_info; ts_src->pause(false);
info._schema = TS_SCHEMA; strong_self->_ts_reader = ts_src->getRing()->attach(strong_self->getPoller());
MediaSource::findAsync(info, strong_self->getSession(), [weak_self](const MediaSource::Ptr &src) { strong_self->_ts_reader->setDetachCB([weak_self]() {
auto strong_self = weak_self.lock(); auto strong_self = weak_self.lock();
if (!strong_self) { if (!strong_self) {
//本对象已经销毁 //本对象已经销毁
return; return;
} }
if (!src) {
//未找到该流
strong_self->onShutdown(SockException(Err_shutdown)); strong_self->onShutdown(SockException(Err_shutdown));
} else { });
auto ts_src = dynamic_pointer_cast<TSMediaSource>(src); strong_self->_ts_reader->setReadCB([weak_self](const TSMediaSource::RingDataType &ts_list) {
assert(ts_src); auto strong_self = weak_self.lock();
ts_src->pause(false); if (!strong_self) {
strong_self->_ts_reader = ts_src->getRing()->attach(strong_self->getPoller()); //本对象已经销毁
strong_self->_ts_reader->setDetachCB([weak_self]() { return;
auto strong_self = weak_self.lock(); }
if (!strong_self) { size_t i = 0;
//本对象已经销毁 auto size = ts_list->size();
return; ts_list->for_each([&](const TSPacket::Ptr &ts) { strong_self->onSendTSData(ts, ++i == size); });
} });
strong_self->onShutdown(SockException(Err_shutdown)); };
}); });
strong_self->_ts_reader->setReadCB([weak_self](const TSMediaSource::RingDataType &ts_list) {
auto strong_self = weak_self.lock();
if (!strong_self) {
//本对象已经销毁
return;
}
size_t i = 0;
auto size = ts_list->size();
ts_list->for_each([&](const TSPacket::Ptr &ts) { strong_self->onSendData(ts, ++i == size); });
});
};
});
};
Broadcast::AuthInvoker invoker = [weak_self, onRes](const string &err) {
if (auto strongSelf = weak_self.lock()) {
strongSelf->getPoller()->async([onRes, err]() { onRes(err); });
}
};
auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed, _media_info, invoker, static_cast<SockInfo &>(*this));
if (!flag) {
//该事件无人监听,默认不鉴权
onRes("");
}
} }
std::string SrtTransportImp::get_peer_ip() { std::string SrtTransportImp::get_peer_ip() {
if (!_addr) { if (!_addr) {

View File

@ -23,6 +23,9 @@ public:
SrtTransport::inputSockData(buf,len,addr); SrtTransport::inputSockData(buf,len,addr);
_total_bytes += len; _total_bytes += len;
} }
void onSendTSData(const Buffer::Ptr &buffer, bool flush){
SrtTransport::onSendTSData(buffer,flush);
}
/// SockInfo override /// SockInfo override
std::string get_local_ip() override; std::string get_local_ip() override;
uint16_t get_local_port() override; uint16_t get_local_port() override;
@ -36,6 +39,11 @@ protected:
void onSRTData(DataPacket::Ptr pkt,struct sockaddr_storage *addr) override; void onSRTData(DataPacket::Ptr pkt,struct sockaddr_storage *addr) override;
void onShutdown(const SockException &ex) override; void onShutdown(const SockException &ex) override;
void sendPacket(Buffer::Ptr pkt,bool flush = true) override{
_total_bytes += pkt->size();
SrtTransport::sendPacket(pkt,flush);
};
///////MediaSourceEvent override/////// ///////MediaSourceEvent override///////
// 关闭 // 关闭
bool close(mediakit::MediaSource &sender, bool force) override; bool close(mediakit::MediaSource &sender, bool force) override;