ZLMediaKit/srt/SrtSession.cpp
2023-04-28 22:04:09 +08:00

144 lines
4.9 KiB
C++
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include "SrtSession.hpp"
#include "Packet.hpp"
#include "SrtTransportImp.hpp"
#include "Common/config.h"
namespace SRT {
using namespace mediakit;
SrtSession::SrtSession(const Socket::Ptr &sock)
: Session(sock) {
socklen_t addr_len = sizeof(_peer_addr);
memset(&_peer_addr, 0, addr_len);
// TraceL<<"before addr len "<<addr_len;
getpeername(sock->rawFD(), (struct sockaddr *)&_peer_addr, &addr_len);
// TraceL<<"after addr len "<<addr_len<<" family "<<_peer_addr.ss_family;
}
SrtSession::~SrtSession() = default;
EventPoller::Ptr SrtSession::queryPoller(const Buffer::Ptr &buffer) {
uint8_t *data = (uint8_t *)buffer->data();
size_t size = buffer->size();
if (DataPacket::isDataPacket(data, size)) {
uint32_t socket_id = DataPacket::getSocketID(data, size);
auto trans = SrtTransportManager::Instance().getItem(std::to_string(socket_id));
return trans ? trans->getPoller() : nullptr;
}
if (HandshakePacket::isHandshakePacket(data, size)) {
auto type = HandshakePacket::getHandshakeType(data, size);
if (type == HandshakePacket::HS_TYPE_INDUCTION) {
// 握手第一阶段
return nullptr;
} else if (type == HandshakePacket::HS_TYPE_CONCLUSION) {
// 握手第二阶段
uint32_t sync_cookie = HandshakePacket::getSynCookie(data, size);
auto trans = SrtTransportManager::Instance().getHandshakeItem(std::to_string(sync_cookie));
return trans ? trans->getPoller() : nullptr;
} else {
WarnL << " not reach there";
}
} else {
uint32_t socket_id = ControlPacket::getSocketID(data, size);
auto trans = SrtTransportManager::Instance().getItem(std::to_string(socket_id));
return trans ? trans->getPoller() : nullptr;
}
return nullptr;
}
void SrtSession::attachServer(const toolkit::Server &server) {
SockUtil::setRecvBuf(getSock()->rawFD(), 1024 * 1024);
}
void SrtSession::onRecv(const Buffer::Ptr &buffer) {
uint8_t *data = (uint8_t *)buffer->data();
size_t size = buffer->size();
if (_find_transport) {
//只允许寻找一次transport
_find_transport = false;
if (DataPacket::isDataPacket(data, size)) {
uint32_t socket_id = DataPacket::getSocketID(data, size);
auto trans = SrtTransportManager::Instance().getItem(std::to_string(socket_id));
if (trans) {
_transport = std::move(trans);
} else {
WarnL << " data packet not find transport ";
}
}
if (HandshakePacket::isHandshakePacket(data, size)) {
auto type = HandshakePacket::getHandshakeType(data, size);
if (type == HandshakePacket::HS_TYPE_INDUCTION) {
// 握手第一阶段
_transport = std::make_shared<SrtTransportImp>(getPoller());
} else if (type == HandshakePacket::HS_TYPE_CONCLUSION) {
// 握手第二阶段
uint32_t sync_cookie = HandshakePacket::getSynCookie(data, size);
auto trans = SrtTransportManager::Instance().getHandshakeItem(std::to_string(sync_cookie));
if (trans) {
_transport = std::move(trans);
} else {
WarnL << " hanshake packet not find transport ";
}
} else {
WarnL << " not reach there";
}
} else {
uint32_t socket_id = ControlPacket::getSocketID(data, size);
auto trans = SrtTransportManager::Instance().getItem(std::to_string(socket_id));
if (trans) {
_transport = std::move(trans);
} else {
WarnL << " not find transport";
}
}
if (_transport) {
_transport->setSession(static_pointer_cast<Session>(shared_from_this()));
}
InfoP(this);
}
_ticker.resetTime();
if (_transport) {
_transport->inputSockData(data, size, &_peer_addr);
} else {
// WarnL<< "ingore data";
}
}
void SrtSession::onError(const SockException &err) {
// udp链接超时但是srt链接不一定超时因为可能存在udp链接迁移的情况
//在udp链接迁移时新的SrtSession对象将接管SrtSession对象的生命周期
//本SrtSession对象将在超时后自动销毁
WarnP(this) << err;
if (!_transport) {
return;
}
// 防止互相引用导致不释放
auto transport = std::move(_transport);
getPoller()->async(
[transport] {
//延时减引用防止使用transport对象时销毁对象
//transport->onShutdown(err);
},
false);
}
void SrtSession::onManager() {
GET_CONFIG(float, timeoutSec, kTimeOutSec);
if (_ticker.elapsedTime() > timeoutSec * 1000) {
shutdown(SockException(Err_timeout, "srt connection timeout"));
return;
}
}
} // namespace SRT