RtpServer支持udp与tcp模式并存

This commit is contained in:
ziyue 2022-11-01 17:27:27 +08:00
parent 60d96f4f3b
commit 2cdeddeb2c
4 changed files with 67 additions and 51 deletions

View File

@ -191,8 +191,8 @@ void RtpProcess::onDetach() {
} }
} }
void RtpProcess::setOnDetach(const function<void()> &cb) { void RtpProcess::setOnDetach(function<void()> cb) {
_on_detach = cb; _on_detach = std::move(cb);
} }
string RtpProcess::get_peer_ip() { string RtpProcess::get_peer_ip() {

View File

@ -50,7 +50,7 @@ public:
/** /**
* onDetach事件回调 * onDetach事件回调
*/ */
void setOnDetach(const std::function<void()> &cb); void setOnDetach(std::function<void()> cb);
/** /**
* onDetach事件回调,false检查RTP超时true停止 * onDetach事件回调,false检查RTP超时true停止

View File

@ -31,27 +31,48 @@ class RtcpHelper: public std::enable_shared_from_this<RtcpHelper> {
public: public:
using Ptr = std::shared_ptr<RtcpHelper>; using Ptr = std::shared_ptr<RtcpHelper>;
RtcpHelper(Socket::Ptr rtcp_sock, RtpProcess::Ptr process) { RtcpHelper(Socket::Ptr rtcp_sock, std::string stream_id) {
_rtcp_sock = std::move(rtcp_sock); _rtcp_sock = std::move(rtcp_sock);
_process = std::move(process); _stream_id = std::move(stream_id);
} }
void onRecvRtp(const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len){ ~RtcpHelper() {
//统计rtp接受情况用于发送rr包 if (_process) {
auto header = (RtpHeader *) buf->data(); // 删除rtp处理器
sendRtcp(ntohl(header->ssrc), addr, addr_len); RtpSelector::Instance().delProcess(_stream_id, _process.get());
}
} }
void startRtcp(){ void setOnDetach(function<void()> cb) {
if (_process) {
_process->setOnDetach(std::move(cb));
} else {
_on_detach = std::move(cb);
}
}
void onRecvRtp(const Socket::Ptr &sock, const Buffer::Ptr &buf, struct sockaddr *addr) {
if (!_process) {
_process = RtpSelector::Instance().getProcess(_stream_id, true);
_process->setOnDetach(std::move(_on_detach));
}
_process->inputRtp(true, sock, buf->data(), buf->size(), addr);
// 统计rtp接受情况用于发送rr包
auto header = (RtpHeader *)buf->data();
sendRtcp(ntohl(header->ssrc), addr);
}
void startRtcp() {
weak_ptr<RtcpHelper> weak_self = shared_from_this(); weak_ptr<RtcpHelper> weak_self = shared_from_this();
_rtcp_sock->setOnRead([weak_self](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) { _rtcp_sock->setOnRead([weak_self](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) {
//用于接受rtcp打洞包 // 用于接受rtcp打洞包
auto strong_self = weak_self.lock(); auto strong_self = weak_self.lock();
if (!strong_self) { if (!strong_self || !strong_self->_process) {
return; return;
} }
if (!strong_self->_rtcp_addr) { if (!strong_self->_rtcp_addr) {
//只设置一次rtcp对端端口 // 只设置一次rtcp对端端口
strong_self->_rtcp_addr = std::make_shared<struct sockaddr_storage>(); strong_self->_rtcp_addr = std::make_shared<struct sockaddr_storage>();
memcpy(strong_self->_rtcp_addr.get(), addr, addr_len); memcpy(strong_self->_rtcp_addr.get(), addr, addr_len);
} }
@ -63,30 +84,32 @@ public:
} }
private: private:
void sendRtcp(uint32_t rtp_ssrc, struct sockaddr *addr, int addr_len){ void sendRtcp(uint32_t rtp_ssrc, struct sockaddr *addr) {
//每5秒发送一次rtcp // 每5秒发送一次rtcp
if (_ticker.elapsedTime() < 5000) { if (_ticker.elapsedTime() < 5000 || !_process) {
return; return;
} }
_ticker.resetTime(); _ticker.resetTime();
auto rtcp_addr = (struct sockaddr *)_rtcp_addr.get(); auto rtcp_addr = (struct sockaddr *)_rtcp_addr.get();
if (!rtcp_addr) { if (!rtcp_addr) {
//默认的rtcp端口为rtp端口+1 // 默认的rtcp端口为rtp端口+1
switch(addr->sa_family){ switch (addr->sa_family) {
case AF_INET: ((sockaddr_in *) addr)->sin_port = htons(ntohs(((sockaddr_in *) addr)->sin_port) + 1); break; case AF_INET: ((sockaddr_in *)addr)->sin_port = htons(ntohs(((sockaddr_in *)addr)->sin_port) + 1); break;
case AF_INET6: ((sockaddr_in6 *) addr)->sin6_port = htons(ntohs(((sockaddr_in6 *) addr)->sin6_port) + 1); break; case AF_INET6: ((sockaddr_in6 *)addr)->sin6_port = htons(ntohs(((sockaddr_in6 *)addr)->sin6_port) + 1); break;
} }
//未收到rtcp打洞包时采用默认的rtcp端口 // 未收到rtcp打洞包时采用默认的rtcp端口
rtcp_addr = addr; rtcp_addr = addr;
} }
_rtcp_sock->send(_process->createRtcpRR(rtp_ssrc + 1, rtp_ssrc), rtcp_addr, addr_len); _rtcp_sock->send(_process->createRtcpRR(rtp_ssrc + 1, rtp_ssrc), rtcp_addr);
} }
private: private:
Ticker _ticker; Ticker _ticker;
Socket::Ptr _rtcp_sock; Socket::Ptr _rtcp_sock;
RtpProcess::Ptr _process; RtpProcess::Ptr _process;
std::string _stream_id;
function<void()> _on_detach;
std::shared_ptr<struct sockaddr_storage> _rtcp_addr; std::shared_ptr<struct sockaddr_storage> _rtcp_addr;
}; };
@ -127,25 +150,20 @@ void RtpServer::start(uint16_t local_port, const string &stream_id, TcpMode tcp_
//创建udp服务器 //创建udp服务器
UdpServer::Ptr udp_server; UdpServer::Ptr udp_server;
RtpProcess::Ptr process; RtcpHelper::Ptr helper;
if (!stream_id.empty()) { if (!stream_id.empty()) {
//指定了流id那么一个端口一个流(不管是否包含多个ssrc的多个流绑定rtp源后会筛选掉ip端口不匹配的流) //指定了流id那么一个端口一个流(不管是否包含多个ssrc的多个流绑定rtp源后会筛选掉ip端口不匹配的流)
if (tcp_mode == NONE) { helper = std::make_shared<RtcpHelper>(std::move(rtcp_socket), stream_id);
process = RtpSelector::Instance().getProcess(stream_id, true); helper->startRtcp();
RtcpHelper::Ptr helper = std::make_shared<RtcpHelper>(std::move(rtcp_socket), process); rtp_socket->setOnRead([rtp_socket, helper, ssrc](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) {
helper->startRtcp(); RtpHeader *header = (RtpHeader *)buf->data();
rtp_socket->setOnRead( auto rtp_ssrc = ntohl(header->ssrc);
[rtp_socket, process, helper, ssrc](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) { if (ssrc && rtp_ssrc != ssrc) {
RtpHeader *header = (RtpHeader *)buf->data(); WarnL << "ssrc不匹配,rtp已丢弃:" << rtp_ssrc << " != " << ssrc;
auto rtp_ssrc = ntohl(header->ssrc); } else {
if (ssrc && rtp_ssrc != ssrc) { helper->onRecvRtp(rtp_socket, buf, addr);
WarnL << "ssrc不匹配,rtp已丢弃:" << rtp_ssrc << " != " << ssrc; }
} else { });
process->inputRtp(true, rtp_socket, buf->data(), buf->size(), addr);
helper->onRecvRtp(buf, addr, addr_len);
}
});
}
} else { } else {
#if 1 #if 1
//单端口多线程接收多个流根据ssrc区分流 //单端口多线程接收多个流根据ssrc区分流
@ -162,26 +180,22 @@ void RtpServer::start(uint16_t local_port, const string &stream_id, TcpMode tcp_
#endif #endif
} }
_on_cleanup = [rtp_socket, process, stream_id]() { _on_cleanup = [rtp_socket, stream_id]() {
if (rtp_socket) { if (rtp_socket) {
//去除循环引用 //去除循环引用
rtp_socket->setOnRead(nullptr); rtp_socket->setOnRead(nullptr);
} }
if (process) {
//删除rtp处理器
RtpSelector::Instance().delProcess(stream_id, process.get());
}
}; };
_tcp_server = tcp_server; _tcp_server = tcp_server;
_udp_server = udp_server; _udp_server = udp_server;
_rtp_socket = rtp_socket; _rtp_socket = rtp_socket;
_rtp_process = process; _rtcp_helper = helper;
} }
void RtpServer::setOnDetach(const function<void()> &cb) { void RtpServer::setOnDetach(function<void()> cb) {
if (_rtp_process) { if (_rtcp_helper) {
_rtp_process->setOnDetach(cb); _rtcp_helper->setOnDetach(std::move(cb));
} }
} }

View File

@ -18,7 +18,9 @@
#include "Network/UdpServer.h" #include "Network/UdpServer.h"
#include "RtpSession.h" #include "RtpSession.h"
namespace mediakit{ namespace mediakit {
class RtcpHelper;
/** /**
* RTP服务器UDP/TCP * RTP服务器UDP/TCP
@ -60,7 +62,7 @@ public:
/** /**
* RtpProcess onDetach事件回调 * RtpProcess onDetach事件回调
*/ */
void setOnDetach(const std::function<void()> &cb); void setOnDetach(std::function<void()> cb);
private: private:
// tcp主动模式连接服务器成功回调 // tcp主动模式连接服务器成功回调
@ -70,7 +72,7 @@ protected:
toolkit::Socket::Ptr _rtp_socket; toolkit::Socket::Ptr _rtp_socket;
toolkit::UdpServer::Ptr _udp_server; toolkit::UdpServer::Ptr _udp_server;
toolkit::TcpServer::Ptr _tcp_server; toolkit::TcpServer::Ptr _tcp_server;
RtpProcess::Ptr _rtp_process; std::shared_ptr<RtcpHelper> _rtcp_helper;
std::function<void()> _on_cleanup; std::function<void()> _on_cleanup;
//用于tcp主动模式 //用于tcp主动模式