diff --git a/3rdpart/ZLToolKit b/3rdpart/ZLToolKit index a827f0bd..b8e06622 160000 --- a/3rdpart/ZLToolKit +++ b/3rdpart/ZLToolKit @@ -1 +1 @@ -Subproject commit a827f0bdcb4221c19ab1a243ebd1f01fa7dc50f2 +Subproject commit b8e066222b757a2c11b5e44c49ef6982acb95fe2 diff --git a/server/main.cpp b/server/main.cpp index 5f6d4d45..e3c4d672 100644 --- a/server/main.cpp +++ b/server/main.cpp @@ -309,6 +309,17 @@ int start_main(int argc,char *argv[]) { #if defined(ENABLE_WEBRTC) //webrtc udp服务器 UdpServer::Ptr rtcSrv = std::make_shared(); + rtcSrv->setOnCreateSocket([](const EventPoller::Ptr &poller, const Buffer::Ptr &buf, struct sockaddr *, int) { + if (!buf) { + return Socket::createSocket(poller, false); + } + auto new_poller = WebRtcSession::getPoller(buf); + if (!new_poller) { + //该数据对应的webrtc对象未找到,丢弃之 + return Socket::Ptr(); + } + return Socket::createSocket(new_poller, false); + }); uint16_t rtcPort = mINI::Instance()[RTC::kPort]; #endif//defined(ENABLE_WEBRTC) diff --git a/webrtc/WebRtcSession.cpp b/webrtc/WebRtcSession.cpp index 9559f583..fdb3a539 100644 --- a/webrtc/WebRtcSession.cpp +++ b/webrtc/WebRtcSession.cpp @@ -21,26 +21,54 @@ WebRtcSession::~WebRtcSession() { InfoP(this); } +static string getUserName(const Buffer::Ptr &buffer) { + auto buf = buffer->data(); + auto len = buffer->size(); + if (!RTC::StunPacket::IsStun((const uint8_t *) buf, len)) { + return ""; + } + std::unique_ptr packet(RTC::StunPacket::Parse((const uint8_t *) buf, len)); + if (!packet) { + return ""; + } + if (packet->GetClass() != RTC::StunPacket::Class::REQUEST || + packet->GetMethod() != RTC::StunPacket::Method::BINDING) { + return ""; + } + //收到binding request请求 + auto vec = split(packet->GetUsername(), ":"); + return vec[0]; +} + +EventPoller::Ptr WebRtcSession::getPoller(const Buffer::Ptr &buffer) { + auto user_name = getUserName(buffer); + if (user_name.empty()) { + return nullptr; + } + auto ret = WebRtcTransportImp::getTransport(user_name); + return ret ? ret->getPoller() : nullptr; +} + void WebRtcSession::onRecv(const Buffer::Ptr &buffer) { auto buf = buffer->data(); auto len = buffer->size(); - if (!_transport && RTC::StunPacket::IsStun((const uint8_t *) buf, len)) { - std::unique_ptr packet(RTC::StunPacket::Parse((const uint8_t *) buf, len)); - if (!packet) { - WarnL << "parse stun error"; + if (!_transport) { + auto user_name = getUserName(buffer); + if (user_name.empty()) { + //逻辑分支不太可能走到这里 + WarnL << user_name; return; } - if (packet->GetClass() == RTC::StunPacket::Class::REQUEST && - packet->GetMethod() == RTC::StunPacket::Method::BINDING) { - //收到binding request请求 - _transport = createTransport(packet->GetUsername()); + _transport = WebRtcTransportImp::getTransport(user_name); + if (!_transport) { + //逻辑分支不太可能走到这里 + WarnL << user_name; + return; } + _transport->setSession(this); } - - if (_transport) { - _transport->inputSockData(buf, len, &_peer_addr); - } + _transport->inputSockData(buf, len, &_peer_addr); } void WebRtcSession::onError(const SockException &err) { @@ -53,13 +81,3 @@ void WebRtcSession::onError(const SockException &err) { void WebRtcSession::onManager() { } - -std::shared_ptr WebRtcSession::createTransport(const string &user_name) { - if (user_name.empty()) { - return nullptr; - } - auto vec = split(user_name, ":"); - auto ret = WebRtcTransportImp::getTransport(vec[0]); - ret->setSession(this); - return ret; -} diff --git a/webrtc/WebRtcSession.h b/webrtc/WebRtcSession.h index 24b1eb7a..b86f3f81 100644 --- a/webrtc/WebRtcSession.h +++ b/webrtc/WebRtcSession.h @@ -20,6 +20,8 @@ using namespace toolkit; class WebRtcSession : public UdpSession { public: + static EventPoller::Ptr getPoller(const Buffer::Ptr &); + WebRtcSession(const Socket::Ptr &sock); ~WebRtcSession() override; @@ -27,12 +29,9 @@ public: void onError(const SockException &err) override; void onManager() override; -private: - std::shared_ptr createTransport(const string &user_name); - private: struct sockaddr _peer_addr; - std::shared_ptr _transport; + std::shared_ptr _transport; };