ZLMediaKit/webrtc/WebRtcSession.cpp

167 lines
7.1 KiB
C++
Raw Normal View History

2021-09-08 18:00:55 +08:00
/*
2023-12-09 16:23:51 +08:00
* Copyright (c) 2016-present The ZLMediaKit project authors. All Rights Reserved.
2021-09-08 18:00:55 +08:00
*
2023-12-09 16:23:51 +08:00
* This file is part of ZLMediaKit(https://github.com/ZLMediaKit/ZLMediaKit).
2021-09-08 18:00:55 +08:00
*
2023-12-09 16:23:51 +08:00
* Use of this source code is governed by MIT-like license that can be found in the
2021-09-08 18:00:55 +08:00
* LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree.
*/
#include "WebRtcSession.h"
#include "Util/util.h"
#include "Network/TcpServer.h"
#include "Common/config.h"
#include "IceServer.hpp"
#include "WebRtcTransport.h"
2021-09-08 18:00:55 +08:00
using namespace std;
2022-09-18 21:03:05 +08:00
namespace mediakit {
static string getUserName(const char *buf, size_t len) {
if (!RTC::StunPacket::IsStun((const uint8_t *) buf, len)) {
return "";
}
std::unique_ptr<RTC::StunPacket> packet(RTC::StunPacket::Parse((const uint8_t *) buf, len));
if (!packet) {
return "";
}
if (packet->GetClass() != RTC::StunPacket::Class::REQUEST ||
packet->GetMethod() != RTC::StunPacket::Method::BINDING) {
return "";
}
// 收到binding request请求 [AUTO-TRANSLATED:eff4d773]
// Received binding request
auto vec = split(packet->GetUsername(), ":");
return vec[0];
}
2021-10-16 10:29:00 +08:00
EventPoller::Ptr WebRtcSession::queryPoller(const Buffer::Ptr &buffer) {
auto user_name = getUserName(buffer->data(), buffer->size());
if (user_name.empty()) {
return nullptr;
}
2021-10-16 10:25:23 +08:00
auto ret = WebRtcTransportManager::Instance().getItem(user_name);
return ret ? ret->getPoller() : nullptr;
}
2021-10-16 10:29:00 +08:00
////////////////////////////////////////////////////////////////////////////////
WebRtcSession::WebRtcSession(const Socket::Ptr &sock) : Session(sock) {
_over_tcp = sock->sockType() == SockNum::Sock_TCP;
2021-10-16 10:29:00 +08:00
}
void WebRtcSession::attachServer(const Server &server) {
2023-04-28 22:04:38 +08:00
_server = std::static_pointer_cast<toolkit::TcpServer>(const_cast<Server &>(server).shared_from_this());
}
void WebRtcSession::onRecv_l(const char *data, size_t len) {
2021-09-15 11:50:15 +08:00
if (_find_transport) {
// 只允许寻找一次transport [AUTO-TRANSLATED:446fae53]
// Only allow searching for transport once
2021-09-15 11:50:15 +08:00
_find_transport = false;
auto user_name = getUserName(data, len);
2021-10-16 10:25:23 +08:00
auto transport = WebRtcTransportManager::Instance().getItem(user_name);
CHECK(transport);
// WebRtcTransport在其他poller线程上需要切换poller线程并重新创建WebRtcSession对象 [AUTO-TRANSLATED:7e5534cf]
// WebRtcTransport is on another poller thread, need to switch poller thread and recreate WebRtcSession object
if (!transport->getPoller()->isCurrentThread()) {
auto sock = Socket::createSocket(transport->getPoller(), false);
// 1、克隆socket(fd不变)切换poller线程到WebRtcTransport所在线程 [AUTO-TRANSLATED:f930bfab]
// 1. Clone socket (fd remains unchanged), switch poller thread to the thread where WebRtcTransport is located
sock->cloneSocket(*(getSock()));
auto server = _server;
std::string str(data, len);
sock->getPoller()->async([sock, server, str](){
auto strong_server = server.lock();
if (strong_server) {
auto session = static_pointer_cast<WebRtcSession>(strong_server->createSession(sock));
// 2、创建新的WebRtcSession对象(绑定到WebRtcTransport所在线程)重新处理一遍ice binding request命令 [AUTO-TRANSLATED:c75203bb]
// 2. Create a new WebRtcSession object (bound to the thread where WebRtcTransport is located), reprocess the ice binding request command
session->onRecv_l(str.data(), str.size());
}
});
// 3、销毁原先的socket和WebRtcSession(原先的对象跟WebRtcTransport不在同一条线程) [AUTO-TRANSLATED:a6d6d63f]
// 3. Destroy the original socket and WebRtcSession (the original object is not on the same thread as WebRtcTransport)
throw std::runtime_error("webrtc over tcp change poller: " + getPoller()->getThreadName() + " -> " + sock->getPoller()->getThreadName());
}
_transport = std::move(transport);
2021-10-15 17:12:39 +08:00
InfoP(this);
2021-09-08 18:00:55 +08:00
}
2021-09-10 22:31:44 +08:00
_ticker.resetTime();
2021-09-15 11:50:15 +08:00
CHECK(_transport);
_transport->inputSockData((char *)data, len, this);
}
void WebRtcSession::onRecv(const Buffer::Ptr &buffer) {
if (_over_tcp) {
input(buffer->data(), buffer->size());
} else {
onRecv_l(buffer->data(), buffer->size());
}
2021-09-08 18:00:55 +08:00
}
void WebRtcSession::onError(const SockException &err) {
// udp链接超时但是rtc链接不一定超时因为可能存在链接迁移的情况 [AUTO-TRANSLATED:aaa9672f]
// UDP connection timeout, but RTC connection may not timeout, because there may be connection migration
// 在udp链接迁移时新的WebRtcSession对象将接管WebRtcTransport对象的生命周期 [AUTO-TRANSLATED:7e7d19df]
// When UDP connection migrates, the new WebRtcSession object will take over the life cycle of the WebRtcTransport object
// 本WebRtcSession对象将在超时后自动销毁 [AUTO-TRANSLATED:bc903a06]
// This WebRtcSession object will be automatically destroyed after timeout
2023-04-23 00:10:18 +08:00
WarnP(this) << err;
2021-09-16 10:03:28 +08:00
2021-09-16 10:05:04 +08:00
if (!_transport) {
return;
}
2023-04-28 22:03:16 +08:00
auto self = static_pointer_cast<WebRtcSession>(shared_from_this());
2021-09-16 10:03:28 +08:00
auto transport = std::move(_transport);
getPoller()->async([transport, self]() mutable {
// 延时减引用防止使用transport对象时销毁对象 [AUTO-TRANSLATED:09dd6609]
// Delay decrementing the reference count to prevent the object from being destroyed when using the transport object
transport->removeTuple(self.get());
// 确保transport在Session对象前销毁防止WebRtcTransport::onDestory()时获取不到Session对象 [AUTO-TRANSLATED:acd8bd77]
// Ensure that the transport is destroyed before the Session object to prevent WebRtcTransport::onDestory() from not being able to get the Session object
transport = nullptr;
2021-09-16 10:03:28 +08:00
}, false);
2021-09-08 18:00:55 +08:00
}
void WebRtcSession::onManager() {
2022-09-18 21:03:05 +08:00
GET_CONFIG(float, timeoutSec, Rtc::kTimeOutSec);
2021-09-10 22:31:44 +08:00
if (!_transport && _ticker.createdTime() > timeoutSec * 1000) {
shutdown(SockException(Err_timeout, "illegal webrtc connection"));
return;
}
if (_ticker.elapsedTime() > timeoutSec * 1000) {
shutdown(SockException(Err_timeout, "webrtc connection timeout"));
return;
}
2021-09-08 18:00:55 +08:00
}
2021-10-15 17:12:39 +08:00
ssize_t WebRtcSession::onRecvHeader(const char *data, size_t len) {
onRecv_l(data + 2, len - 2);
return 0;
}
const char *WebRtcSession::onSearchPacketTail(const char *data, size_t len) {
if (len < 2) {
// 数据不够 [AUTO-TRANSLATED:830a2785]
// Not enough data
return nullptr;
}
uint16_t length = (((uint8_t *)data)[0] << 8) | ((uint8_t *)data)[1];
if (len < (size_t)(length + 2)) {
// 数据不够 [AUTO-TRANSLATED:830a2785]
// Not enough data
return nullptr;
}
// 返回rtp包末尾 [AUTO-TRANSLATED:5134cf6f]
// Return the end of the RTP packet
return data + 2 + length;
}
2022-09-18 21:03:05 +08:00
}// namespace mediakit
2021-10-15 17:12:39 +08:00