diff --git a/postman/ZLMediaKit.postman_collection.json b/postman/ZLMediaKit.postman_collection.json index a67aa423..38829c06 100644 --- a/postman/ZLMediaKit.postman_collection.json +++ b/postman/ZLMediaKit.postman_collection.json @@ -1661,6 +1661,12 @@ "value": "0", "description": "udp方式推流时,是否开启rtcp发送和rtcp接收超时判断,开启后(默认关闭),如果接收rr rtcp超时,将导致主动停止rtp发送", "disabled": true + }, + { + "key": "recv_stream_id", + "value": "", + "description": "发送rtp同时接收,一般用于双向语言对讲, 如果不为空,说明开启接收,值为接收流的id", + "disabled": true } ] } diff --git a/server/WebApi.cpp b/server/WebApi.cpp index 319a222d..11926de4 100755 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -1203,6 +1203,7 @@ void installWebApi() { args.use_ps = allArgs["use_ps"].empty() ? true : allArgs["use_ps"].as(); args.only_audio = allArgs["only_audio"].as(); args.udp_rtcp_timeout = allArgs["udp_rtcp_timeout"]; + args.recv_stream_id = allArgs["recv_stream_id"]; TraceL << "startSendRtp, pt " << int(args.pt) << " ps " << args.use_ps << " audio " << args.only_audio; src->getOwnerPoller()->async([=]() mutable { diff --git a/src/Common/MediaSource.h b/src/Common/MediaSource.h index 84961d2c..90ca24f1 100644 --- a/src/Common/MediaSource.h +++ b/src/Common/MediaSource.h @@ -118,6 +118,9 @@ public: uint32_t rtcp_timeout_ms = 30 * 1000; //udp 发送时,发送sr rtcp包间隔,单位毫秒 uint32_t rtcp_send_interval_ms = 5 * 1000; + + //发送rtp同时接收,一般用于双向语言对讲, 如果不为空,说明开启接收 + std::string recv_stream_id; }; // 开始发送ps-rtp diff --git a/src/Rtp/RtpSender.cpp b/src/Rtp/RtpSender.cpp index 930b2835..b7d32b76 100644 --- a/src/Rtp/RtpSender.cpp +++ b/src/Rtp/RtpSender.cpp @@ -10,6 +10,7 @@ #if defined(ENABLE_RTPPROXY) #include "RtpSender.h" +#include "RtpSession.h" #include "Rtsp/RtspSession.h" #include "Thread/WorkThreadPool.h" #include "Util/uv_errno.h" @@ -213,6 +214,25 @@ void RtpSender::onConnect(){ } //连接建立成功事件 weak_ptr weak_self = shared_from_this(); + if (!_args.recv_stream_id.empty()) { + mINI ini; + ini[RtpSession::kStreamID] = _args.recv_stream_id; + _rtp_session = std::make_shared(_socket_rtp); + _rtp_session->setParams(ini); + + _socket_rtp->setOnRead([weak_self](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) { + auto strong_self = weak_self.lock(); + if (!strong_self) { + return; + } + try { + strong_self->_rtp_session->onRecv(buf); + } catch (std::exception &ex){ + SockException err(toolkit::Err_shutdown, ex.what()); + strong_self->_rtp_session->shutdown(err); + } + }); + } _socket_rtp->setOnErr([weak_self](const SockException &err) { auto strong_self = weak_self.lock(); if (strong_self) { diff --git a/src/Rtp/RtpSender.h b/src/Rtp/RtpSender.h index 99c8fd0a..304a13f6 100644 --- a/src/Rtp/RtpSender.h +++ b/src/Rtp/RtpSender.h @@ -19,6 +19,8 @@ namespace mediakit{ +class RtpSession; + //rtp发送客户端,支持发送GB28181协议 class RtpSender final : public MediaSinkInterface, public std::enable_shared_from_this{ public: @@ -85,6 +87,7 @@ private: std::shared_ptr _rtcp_context; toolkit::Ticker _rtcp_send_ticker; toolkit::Ticker _rtcp_recv_ticker; + std::shared_ptr _rtp_session; std::function _on_close; }; diff --git a/src/Rtp/RtpSession.cpp b/src/Rtp/RtpSession.cpp index 52eb67b3..190dff5f 100644 --- a/src/Rtp/RtpSession.cpp +++ b/src/Rtp/RtpSession.cpp @@ -24,8 +24,12 @@ const string RtpSession::kStreamID = "stream_id"; const string RtpSession::kSSRC = "ssrc"; void RtpSession::attachServer(const Server &server) { - _stream_id = const_cast(server)[kStreamID]; - _ssrc = const_cast(server)[kSSRC]; + setParams(const_cast(server)); +} + +void RtpSession::setParams(mINI &ini) { + _stream_id = ini[kStreamID]; + _ssrc = ini[kSSRC]; } RtpSession::RtpSession(const Socket::Ptr &sock) : Session(sock) { diff --git a/src/Rtp/RtpSession.h b/src/Rtp/RtpSession.h index fa346802..14cd33dd 100644 --- a/src/Rtp/RtpSession.h +++ b/src/Rtp/RtpSession.h @@ -30,6 +30,7 @@ public: void onRecv(const toolkit::Buffer::Ptr &) override; void onError(const toolkit::SockException &err) override; void onManager() override; + void setParams(toolkit::mINI &ini); void attachServer(const toolkit::Server &server) override; protected: