openrtpserver接口新增ssrc参数,强制过滤不属于本端口的视频流,以解决视频串流问题 (#1572)

This commit is contained in:
wangcker 2022-04-16 15:12:49 +08:00 committed by GitHub
parent 2b460c97ed
commit e712639e33
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 34 additions and 10 deletions

View File

@ -1293,7 +1293,14 @@
{ {
"key": "re_use_port", "key": "re_use_port",
"value": "0", "value": "0",
"description": "是否重用端口默认为0非必选参数" "description": "是否重用端口默认为0非必选参数",
"disabled": true
},
{
"key": "ssrc",
"value": "0",
"description": "是否指定收流的rtp ssrc, 十进制数字不指定或指定0时则不过滤rtp非必选参数",
"disabled": true
} }
] ]
} }

View File

@ -1056,7 +1056,8 @@ void installWebApi() {
} }
RtpServer::Ptr server = std::make_shared<RtpServer>(); RtpServer::Ptr server = std::make_shared<RtpServer>();
server->start(allArgs["port"], stream_id, allArgs["enable_tcp"].as<bool>(), "0.0.0.0", allArgs["re_use_port"].as<bool>()); server->start(allArgs["port"], stream_id, allArgs["ssrc"].as<uint32_t>(), allArgs["enable_tcp"].as<bool>(),
"0.0.0.0", allArgs["re_use_port"].as<bool>());
server->setOnDetach([stream_id]() { server->setOnDetach([stream_id]() {
//设置rtp超时移除事件 //设置rtp超时移除事件
lock_guard<recursive_mutex> lck(s_rtpServerMapMtx); lock_guard<recursive_mutex> lck(s_rtpServerMapMtx);

View File

@ -88,7 +88,7 @@ private:
std::shared_ptr<struct sockaddr> _rtcp_addr; std::shared_ptr<struct sockaddr> _rtcp_addr;
}; };
void RtpServer::start(uint16_t local_port, const string &stream_id, bool enable_tcp, const char *local_ip, bool re_use_port) { void RtpServer::start(uint16_t local_port, const string &stream_id, uint32_t ssrc, bool enable_tcp, const char *local_ip, bool re_use_port) {
//创建udp服务器 //创建udp服务器
Socket::Ptr rtp_socket = Socket::createSocket(nullptr, true); Socket::Ptr rtp_socket = Socket::createSocket(nullptr, true);
Socket::Ptr rtcp_socket = Socket::createSocket(nullptr, true); Socket::Ptr rtcp_socket = Socket::createSocket(nullptr, true);
@ -116,6 +116,7 @@ void RtpServer::start(uint16_t local_port, const string &stream_id, bool enable
tcp_server = std::make_shared<TcpServer>(rtp_socket->getPoller()); tcp_server = std::make_shared<TcpServer>(rtp_socket->getPoller());
(*tcp_server)[RtpSession::kStreamID] = stream_id; (*tcp_server)[RtpSession::kStreamID] = stream_id;
(*tcp_server)[RtpSession::kIsUDP] = 0; (*tcp_server)[RtpSession::kIsUDP] = 0;
(*tcp_server)[RtpSession::kSSRC] = ssrc;
tcp_server->start<RtpSession>(rtp_socket->get_local_port(), local_ip); tcp_server->start<RtpSession>(rtp_socket->get_local_port(), local_ip);
} }
@ -124,13 +125,18 @@ void RtpServer::start(uint16_t local_port, const string &stream_id, bool enable
RtpProcess::Ptr process; RtpProcess::Ptr process;
if (!stream_id.empty()) { if (!stream_id.empty()) {
//指定了流id那么一个端口一个流(不管是否包含多个ssrc的多个流绑定rtp源后会筛选掉ip端口不匹配的流) //指定了流id那么一个端口一个流(不管是否包含多个ssrc的多个流绑定rtp源后会筛选掉ip端口不匹配的流)
//由于是一个端口一个流,单线程处理即可
process = RtpSelector::Instance().getProcess(stream_id, true); process = RtpSelector::Instance().getProcess(stream_id, true);
RtcpHelper::Ptr helper = std::make_shared<RtcpHelper>(std::move(rtcp_socket), 90000); RtcpHelper::Ptr helper = std::make_shared<RtcpHelper>(std::move(rtcp_socket), 90000);
helper->startRtcp(); helper->startRtcp();
rtp_socket->setOnRead([rtp_socket, process, helper](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) { rtp_socket->setOnRead([rtp_socket, process, helper, ssrc](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) {
process->inputRtp(true, rtp_socket, buf->data(), buf->size(), addr); RtpHeader *header = (RtpHeader *)buf->data();
helper->onRecvRtp(buf, addr, addr_len); auto rtp_ssrc = ntohl(header->ssrc);
if (ssrc && rtp_ssrc != ssrc) {
WarnL << "ssrc不匹配,rtp已丢弃:" << rtp_ssrc << " != " << ssrc;
} else {
process->inputRtp(true, rtp_socket, buf->data(), buf->size(), addr);
helper->onRecvRtp(buf, addr, addr_len);
}
}); });
} else { } else {
#if 1 #if 1

View File

@ -39,7 +39,7 @@ public:
* @param local_ip ip * @param local_ip ip
* @param re_use_port socket为re_use属性 * @param re_use_port socket为re_use属性
*/ */
void start(uint16_t local_port, const std::string &stream_id = "", bool enable_tcp = true, const char *local_ip = "0.0.0.0", bool re_use_port = true); void start(uint16_t local_port, const std::string &stream_id = "", uint32_t ssrc = 0, bool enable_tcp = true, const char *local_ip = "0.0.0.0", bool re_use_port = true);
/** /**
* *

View File

@ -21,10 +21,12 @@ namespace mediakit{
const string RtpSession::kStreamID = "stream_id"; const string RtpSession::kStreamID = "stream_id";
const string RtpSession::kIsUDP = "is_udp"; const string RtpSession::kIsUDP = "is_udp";
const string RtpSession::kSSRC = "ssrc";
void RtpSession::attachServer(const Server &server) { void RtpSession::attachServer(const Server &server) {
_stream_id = const_cast<Server &>(server)[kStreamID]; _stream_id = const_cast<Server &>(server)[kStreamID];
_is_udp = const_cast<Server &>(server)[kIsUDP]; _is_udp = const_cast<Server &>(server)[kIsUDP];
_ssrc = const_cast<Server &>(server)[kSSRC];
if (_is_udp) { if (_is_udp) {
//设置udp socket读缓存 //设置udp socket读缓存
@ -89,7 +91,8 @@ void RtpSession::onRtpPacket(const char *data, size_t len) {
} }
} }
if (!_process) { if (!_process) {
if (!RtpSelector::getSSRC(data, len, _ssrc)) { //未设置ssrc时尝试获取ssrc
if (!_ssrc && !RtpSelector::getSSRC(data, len, _ssrc)) {
return; return;
} }
if (_stream_id.empty()) { if (_stream_id.empty()) {
@ -101,6 +104,12 @@ void RtpSession::onRtpPacket(const char *data, size_t len) {
_process->setListener(dynamic_pointer_cast<RtpSession>(shared_from_this())); _process->setListener(dynamic_pointer_cast<RtpSession>(shared_from_this()));
} }
try { try {
uint32_t rtp_ssrc = 0;
RtpSelector::getSSRC(data, len, rtp_ssrc);
if (rtp_ssrc != _ssrc) {
WarnP(this) << "ssrc不匹配,rtp已丢弃:" << rtp_ssrc << " != " << _ssrc;
return;
}
_process->inputRtp(false, getSock(), data, len, &_addr); _process->inputRtp(false, getSock(), data, len, &_addr);
} catch (RtpTrack::BadRtpException &ex) { } catch (RtpTrack::BadRtpException &ex) {
if (!_is_udp) { if (!_is_udp) {

View File

@ -24,6 +24,7 @@ class RtpSession : public toolkit::Session, public RtpSplitter, public MediaSour
public: public:
static const std::string kStreamID; static const std::string kStreamID;
static const std::string kIsUDP; static const std::string kIsUDP;
static const std::string kSSRC;
RtpSession(const toolkit::Socket::Ptr &sock); RtpSession(const toolkit::Socket::Ptr &sock);
~RtpSession() override; ~RtpSession() override;