ZLMediaKit/srt/SrtSession.cpp

144 lines
4.9 KiB
C++
Raw Normal View History

2022-06-03 13:25:32 +08:00
#include "SrtSession.hpp"
#include "Packet.hpp"
#include "SrtTransportImp.hpp"
2022-06-03 13:25:32 +08:00
#include "Common/config.h"
namespace SRT {
using namespace mediakit;
SrtSession::SrtSession(const Socket::Ptr &sock)
: Session(sock) {
2022-06-03 13:25:32 +08:00
socklen_t addr_len = sizeof(_peer_addr);
2022-06-07 09:52:20 +08:00
memset(&_peer_addr, 0, addr_len);
// TraceL<<"before addr len "<<addr_len;
2022-06-03 13:25:32 +08:00
getpeername(sock->rawFD(), (struct sockaddr *)&_peer_addr, &addr_len);
2022-06-07 09:52:20 +08:00
// TraceL<<"after addr len "<<addr_len<<" family "<<_peer_addr.ss_family;
2022-06-03 13:25:32 +08:00
}
SrtSession::~SrtSession() = default;
2022-06-03 13:25:32 +08:00
EventPoller::Ptr SrtSession::queryPoller(const Buffer::Ptr &buffer) {
2022-06-07 09:52:20 +08:00
uint8_t *data = (uint8_t *)buffer->data();
2022-06-03 13:25:32 +08:00
size_t size = buffer->size();
2022-06-07 09:52:20 +08:00
if (DataPacket::isDataPacket(data, size)) {
uint32_t socket_id = DataPacket::getSocketID(data, size);
2022-06-03 13:25:32 +08:00
auto trans = SrtTransportManager::Instance().getItem(std::to_string(socket_id));
return trans ? trans->getPoller() : nullptr;
}
2022-06-07 09:52:20 +08:00
if (HandshakePacket::isHandshakePacket(data, size)) {
auto type = HandshakePacket::getHandshakeType(data, size);
if (type == HandshakePacket::HS_TYPE_INDUCTION) {
2022-06-03 13:25:32 +08:00
// 握手第一阶段
return nullptr;
2022-06-07 09:52:20 +08:00
} else if (type == HandshakePacket::HS_TYPE_CONCLUSION) {
2022-06-03 13:25:32 +08:00
// 握手第二阶段
2022-06-07 09:52:20 +08:00
uint32_t sync_cookie = HandshakePacket::getSynCookie(data, size);
2022-06-03 13:25:32 +08:00
auto trans = SrtTransportManager::Instance().getHandshakeItem(std::to_string(sync_cookie));
return trans ? trans->getPoller() : nullptr;
2022-06-07 09:52:20 +08:00
} else {
WarnL << " not reach there";
2022-06-03 13:25:32 +08:00
}
2022-06-07 09:52:20 +08:00
} else {
uint32_t socket_id = ControlPacket::getSocketID(data, size);
2022-06-03 13:25:32 +08:00
auto trans = SrtTransportManager::Instance().getItem(std::to_string(socket_id));
return trans ? trans->getPoller() : nullptr;
}
return nullptr;
}
2022-06-07 09:52:20 +08:00
void SrtSession::attachServer(const toolkit::Server &server) {
SockUtil::setRecvBuf(getSock()->rawFD(), 1024 * 1024);
2022-06-03 18:05:24 +08:00
}
2022-06-07 09:52:20 +08:00
2022-06-03 13:25:32 +08:00
void SrtSession::onRecv(const Buffer::Ptr &buffer) {
2022-06-07 09:52:20 +08:00
uint8_t *data = (uint8_t *)buffer->data();
2022-06-03 13:25:32 +08:00
size_t size = buffer->size();
if (_find_transport) {
//只允许寻找一次transport
_find_transport = false;
if (DataPacket::isDataPacket(data, size)) {
uint32_t socket_id = DataPacket::getSocketID(data, size);
auto trans = SrtTransportManager::Instance().getItem(std::to_string(socket_id));
2022-06-07 09:52:20 +08:00
if (trans) {
2022-06-03 13:25:32 +08:00
_transport = std::move(trans);
2022-06-07 09:52:20 +08:00
} else {
WarnL << " data packet not find transport ";
2022-06-03 13:25:32 +08:00
}
}
if (HandshakePacket::isHandshakePacket(data, size)) {
auto type = HandshakePacket::getHandshakeType(data, size);
if (type == HandshakePacket::HS_TYPE_INDUCTION) {
// 握手第一阶段
_transport = std::make_shared<SrtTransportImp>(getPoller());
2022-06-03 13:25:32 +08:00
} else if (type == HandshakePacket::HS_TYPE_CONCLUSION) {
// 握手第二阶段
uint32_t sync_cookie = HandshakePacket::getSynCookie(data, size);
auto trans = SrtTransportManager::Instance().getHandshakeItem(std::to_string(sync_cookie));
if (trans) {
_transport = std::move(trans);
} else {
WarnL << " hanshake packet not find transport ";
}
} else {
WarnL << " not reach there";
}
} else {
uint32_t socket_id = ControlPacket::getSocketID(data, size);
auto trans = SrtTransportManager::Instance().getItem(std::to_string(socket_id));
2022-06-07 09:52:20 +08:00
if (trans) {
2022-06-03 13:25:32 +08:00
_transport = std::move(trans);
2022-06-07 09:52:20 +08:00
} else {
2022-06-03 13:25:32 +08:00
WarnL << " not find transport";
}
}
2022-06-07 09:52:20 +08:00
if (_transport) {
2022-06-03 13:25:32 +08:00
_transport->setSession(shared_from_this());
}
InfoP(this);
}
_ticker.resetTime();
2022-06-07 09:52:20 +08:00
if (_transport) {
_transport->inputSockData(data, size, &_peer_addr);
} else {
// WarnL<< "ingore data";
2022-06-03 13:25:32 +08:00
}
}
void SrtSession::onError(const SockException &err) {
// udp链接超时但是srt链接不一定超时因为可能存在udp链接迁移的情况
//在udp链接迁移时新的SrtSession对象将接管SrtSession对象的生命周期
//本SrtSession对象将在超时后自动销毁
WarnP(this) << err.what();
if (!_transport) {
return;
}
2022-06-07 09:52:20 +08:00
2022-06-03 13:25:32 +08:00
// 防止互相引用导致不释放
auto transport = std::move(_transport);
2022-06-07 09:52:20 +08:00
getPoller()->async(
2022-09-20 00:39:42 +08:00
[transport] {
2022-06-07 09:52:20 +08:00
//延时减引用防止使用transport对象时销毁对象
2022-09-20 00:39:42 +08:00
//transport->onShutdown(err);
2022-06-07 09:52:20 +08:00
},
false);
2022-06-03 13:25:32 +08:00
}
void SrtSession::onManager() {
GET_CONFIG(float, timeoutSec, kTimeOutSec);
2022-06-07 09:52:20 +08:00
if (_ticker.elapsedTime() > timeoutSec * 1000) {
2022-06-03 13:25:32 +08:00
shutdown(SockException(Err_timeout, "srt connection timeout"));
return;
}
}
} // namespace SRT