diff --git a/postman/ZLMediaKit.postman_collection.json b/postman/ZLMediaKit.postman_collection.json index 6d5d42ee..aee6ba18 100644 --- a/postman/ZLMediaKit.postman_collection.json +++ b/postman/ZLMediaKit.postman_collection.json @@ -1293,7 +1293,14 @@ { "key": "re_use_port", "value": "0", - "description": "是否重用端口,默认为0,非必选参数" + "description": "是否重用端口,默认为0,非必选参数", + "disabled": true + }, + { + "key": "ssrc", + "value": "0", + "description": "是否指定收流的rtp ssrc, 十进制数字,不指定或指定0时则不过滤rtp,非必选参数", + "disabled": true } ] } diff --git a/server/WebApi.cpp b/server/WebApi.cpp index c725a1e0..e1d2bc82 100755 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -1056,7 +1056,8 @@ void installWebApi() { } RtpServer::Ptr server = std::make_shared(); - server->start(allArgs["port"], stream_id, allArgs["enable_tcp"].as(), "0.0.0.0", allArgs["re_use_port"].as()); + server->start(allArgs["port"], stream_id, allArgs["ssrc"].as(), allArgs["enable_tcp"].as(), + "0.0.0.0", allArgs["re_use_port"].as()); server->setOnDetach([stream_id]() { //设置rtp超时移除事件 lock_guard lck(s_rtpServerMapMtx); diff --git a/src/Rtp/RtpServer.cpp b/src/Rtp/RtpServer.cpp index 2963cd25..12685587 100644 --- a/src/Rtp/RtpServer.cpp +++ b/src/Rtp/RtpServer.cpp @@ -88,7 +88,7 @@ private: std::shared_ptr _rtcp_addr; }; -void RtpServer::start(uint16_t local_port, const string &stream_id, bool enable_tcp, const char *local_ip, bool re_use_port) { +void RtpServer::start(uint16_t local_port, const string &stream_id, uint32_t ssrc, bool enable_tcp, const char *local_ip, bool re_use_port) { //创建udp服务器 Socket::Ptr rtp_socket = Socket::createSocket(nullptr, true); Socket::Ptr rtcp_socket = Socket::createSocket(nullptr, true); @@ -116,6 +116,7 @@ void RtpServer::start(uint16_t local_port, const string &stream_id, bool enable tcp_server = std::make_shared(rtp_socket->getPoller()); (*tcp_server)[RtpSession::kStreamID] = stream_id; (*tcp_server)[RtpSession::kIsUDP] = 0; + (*tcp_server)[RtpSession::kSSRC] = ssrc; tcp_server->start(rtp_socket->get_local_port(), local_ip); } @@ -124,13 +125,18 @@ void RtpServer::start(uint16_t local_port, const string &stream_id, bool enable RtpProcess::Ptr process; if (!stream_id.empty()) { //指定了流id,那么一个端口一个流(不管是否包含多个ssrc的多个流,绑定rtp源后,会筛选掉ip端口不匹配的流) - //由于是一个端口一个流,单线程处理即可 process = RtpSelector::Instance().getProcess(stream_id, true); RtcpHelper::Ptr helper = std::make_shared(std::move(rtcp_socket), 90000); helper->startRtcp(); - rtp_socket->setOnRead([rtp_socket, process, helper](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) { - process->inputRtp(true, rtp_socket, buf->data(), buf->size(), addr); - helper->onRecvRtp(buf, addr, addr_len); + rtp_socket->setOnRead([rtp_socket, process, helper, ssrc](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) { + RtpHeader *header = (RtpHeader *)buf->data(); + auto rtp_ssrc = ntohl(header->ssrc); + if (ssrc && rtp_ssrc != ssrc) { + WarnL << "ssrc不匹配,rtp已丢弃:" << rtp_ssrc << " != " << ssrc; + } else { + process->inputRtp(true, rtp_socket, buf->data(), buf->size(), addr); + helper->onRecvRtp(buf, addr, addr_len); + } }); } else { #if 1 @@ -176,4 +182,4 @@ uint16_t RtpServer::getPort() { } }//namespace mediakit -#endif//defined(ENABLE_RTPPROXY) \ No newline at end of file +#endif//defined(ENABLE_RTPPROXY) diff --git a/src/Rtp/RtpServer.h b/src/Rtp/RtpServer.h index 77c0c4e7..72f1d30c 100644 --- a/src/Rtp/RtpServer.h +++ b/src/Rtp/RtpServer.h @@ -39,7 +39,7 @@ public: * @param local_ip 绑定的本地网卡ip * @param re_use_port 是否设置socket为re_use属性 */ - void start(uint16_t local_port, const std::string &stream_id = "", bool enable_tcp = true, const char *local_ip = "0.0.0.0", bool re_use_port = true); + void start(uint16_t local_port, const std::string &stream_id = "", uint32_t ssrc = 0, bool enable_tcp = true, const char *local_ip = "0.0.0.0", bool re_use_port = true); /** * 获取绑定的本地端口 diff --git a/src/Rtp/RtpSession.cpp b/src/Rtp/RtpSession.cpp index fe88d8b5..c16da709 100644 --- a/src/Rtp/RtpSession.cpp +++ b/src/Rtp/RtpSession.cpp @@ -21,10 +21,12 @@ namespace mediakit{ const string RtpSession::kStreamID = "stream_id"; const string RtpSession::kIsUDP = "is_udp"; +const string RtpSession::kSSRC = "ssrc"; void RtpSession::attachServer(const Server &server) { _stream_id = const_cast(server)[kStreamID]; _is_udp = const_cast(server)[kIsUDP]; + _ssrc = const_cast(server)[kSSRC]; if (_is_udp) { //设置udp socket读缓存 @@ -89,7 +91,8 @@ void RtpSession::onRtpPacket(const char *data, size_t len) { } } if (!_process) { - if (!RtpSelector::getSSRC(data, len, _ssrc)) { + //未设置ssrc时,尝试获取ssrc + if (!_ssrc && !RtpSelector::getSSRC(data, len, _ssrc)) { return; } if (_stream_id.empty()) { @@ -101,6 +104,12 @@ void RtpSession::onRtpPacket(const char *data, size_t len) { _process->setListener(dynamic_pointer_cast(shared_from_this())); } try { + uint32_t rtp_ssrc = 0; + RtpSelector::getSSRC(data, len, rtp_ssrc); + if (rtp_ssrc != _ssrc) { + WarnP(this) << "ssrc不匹配,rtp已丢弃:" << rtp_ssrc << " != " << _ssrc; + return; + } _process->inputRtp(false, getSock(), data, len, &_addr); } catch (RtpTrack::BadRtpException &ex) { if (!_is_udp) { diff --git a/src/Rtp/RtpSession.h b/src/Rtp/RtpSession.h index 488c4da1..f788858a 100644 --- a/src/Rtp/RtpSession.h +++ b/src/Rtp/RtpSession.h @@ -24,6 +24,7 @@ class RtpSession : public toolkit::Session, public RtpSplitter, public MediaSour public: static const std::string kStreamID; static const std::string kIsUDP; + static const std::string kSSRC; RtpSession(const toolkit::Socket::Ptr &sock); ~RtpSession() override;