From ef11c66fb8b67af9ed0a45f7117622f9ac0947e5 Mon Sep 17 00:00:00 2001 From: xia-chu <771730766@qq.com> Date: Sat, 9 Nov 2024 19:17:54 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E8=AF=AD=E9=9F=B3=E5=AF=B9?= =?UTF-8?q?=E8=AE=B2=E6=8E=A5=E5=8F=A3(startSendRtpTalk)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- postman/ZLMediaKit.postman_collection.json | 75 ++++++++++ server/WebApi.cpp | 35 +++++ src/Common/MediaSource.h | 3 +- src/Rtp/RtpProcess.cpp | 4 + src/Rtp/RtpProcess.h | 2 + src/Rtp/RtpSender.cpp | 159 +++++++++++++-------- 6 files changed, 220 insertions(+), 58 deletions(-) diff --git a/postman/ZLMediaKit.postman_collection.json b/postman/ZLMediaKit.postman_collection.json index 3cdf255d..b35d78e6 100644 --- a/postman/ZLMediaKit.postman_collection.json +++ b/postman/ZLMediaKit.postman_collection.json @@ -2161,6 +2161,81 @@ }, "response": [] }, + { + "name": "开始双向对讲(startSendRtpTalk)", + "request": { + "method": "GET", + "header": [], + "url": { + "raw": "{{ZLMediaKit_URL}}/index/api/startSendRtpTalk?secret={{ZLMediaKit_secret}}&vhost={{defaultVhost}}&app=live&stream=obs&ssrc=1&recv_stream_id=", + "host": [ + "{{ZLMediaKit_URL}}" + ], + "path": [ + "index", + "api", + "startSendRtpTalk" + ], + "query": [ + { + "key": "secret", + "value": "{{ZLMediaKit_secret}}", + "description": "api操作密钥(配置文件配置)" + }, + { + "key": "vhost", + "value": "{{defaultVhost}}", + "description": "虚拟主机,例如__defaultVhost__" + }, + { + "key": "app", + "value": "rtp", + "description": "应用名,例如 rtp" + }, + { + "key": "stream", + "value": "rtc", + "description": "流id,例如webrtc推流上来的流id" + }, + { + "key": "ssrc", + "value": "1", + "description": "rtp推流出去的ssrc" + }, + { + "key": "recv_stream_id", + "value": "", + "description": "对方rtp推流上来的流id,我们将通过这个链接回复他rtp流;请注意两个流的app和vhost需一致" + }, + { + "key": "from_mp4", + "value": "0", + "description": "是否推送本地MP4录像,该参数非必选参数", + "disabled": true + }, + { + "key": "type", + "value": "1", + "description": "0(ES流)、1(PS流)、2(TS流),默认1(PS流);该参数非必选参数", + "disabled": true + }, + { + "key": "pt", + "value": "96", + "description": "rtp payload type,默认96;该参数非必选参数", + "disabled": true + }, + { + "key": "only_audio", + "value": "1", + "description": "rtp es方式打包时,是否只打包音频;该参数非必选参数", + "disabled": true + } + ] + } + }, + "response": [] + }, { "name": "停止 发送rtp(stopSendRtp)", "request": { diff --git a/server/WebApi.cpp b/server/WebApi.cpp index 5c1aab1a..873f33cb 100755 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -1606,6 +1606,41 @@ void installWebApi() { start_send_rtp(true, API_ARGS_VALUE, invoker); }); + api_regist("/index/api/startSendRtpTalk",[](API_ARGS_MAP_ASYNC){ + CHECK_SECRET(); + CHECK_ARGS("vhost", "app", "stream", "ssrc", "recv_stream_id"); + auto src = MediaSource::find(allArgs["vhost"], allArgs["app"], allArgs["stream"], allArgs["from_mp4"].as()); + if (!src) { + throw ApiRetException("can not find the source stream", API::NotFound); + } + MediaSourceEvent::SendRtpArgs args; + args.con_type = mediakit::MediaSourceEvent::SendRtpArgs::kVoiceTalk; + args.ssrc = allArgs["ssrc"]; + args.pt = allArgs["pt"].empty() ? 96 : allArgs["pt"].as(); + args.data_type = allArgs["type"].empty() ? MediaSourceEvent::SendRtpArgs::kRtpPS : (MediaSourceEvent::SendRtpArgs::DataType)(allArgs["type"].as()); + args.only_audio = allArgs["only_audio"].as(); + args.recv_stream_id = allArgs["recv_stream_id"]; + args.recv_stream_app = allArgs["app"]; + args.recv_stream_vhost = allArgs["vhost"]; + + src->getOwnerPoller()->async([=]() mutable { + try { + src->startSendRtp(args, [val, headerOut, invoker](uint16_t local_port, const SockException &ex) mutable { + if (ex) { + val["code"] = API::OtherFailed; + val["msg"] = ex.what(); + } + val["local_port"] = local_port; + invoker(200, headerOut, val.toStyledString()); + }); + } catch (std::exception &ex) { + val["code"] = API::Exception; + val["msg"] = ex.what(); + invoker(200, headerOut, val.toStyledString()); + } + }); + }); + api_regist("/index/api/listRtpSender",[](API_ARGS_MAP_ASYNC){ CHECK_SECRET(); CHECK_ARGS("vhost", "app", "stream"); diff --git a/src/Common/MediaSource.h b/src/Common/MediaSource.h index abb5420f..8469bc2d 100644 --- a/src/Common/MediaSource.h +++ b/src/Common/MediaSource.h @@ -124,7 +124,8 @@ public: kTcpActive = 0, // tcp主动模式,tcp客户端主动连接对方并发送rtp kUdpActive = 1, // udp主动模式,主动发送数据给对方 kTcpPassive = 2, // tcp被动模式,tcp服务器,等待对方连接并回复rtp - kUdpPassive = 3 // udp被动方式,等待对方发送nat打洞包,然后回复rtp至打洞包源地址 + kUdpPassive = 3, // udp被动方式,等待对方发送nat打洞包,然后回复rtp至打洞包源地址 + kVoiceTalk = 4, // 语音对讲模式,对方必须想推流上来,通过他的推流链路再回复rtp数据 }; // rtp类型 [AUTO-TRANSLATED:acca40ab] diff --git a/src/Rtp/RtpProcess.cpp b/src/Rtp/RtpProcess.cpp index 62a82e90..61079161 100644 --- a/src/Rtp/RtpProcess.cpp +++ b/src/Rtp/RtpProcess.cpp @@ -346,5 +346,9 @@ float RtpProcess::getLossRate(MediaSource &sender, TrackType type) { return getLostInterval() * 100 / expected; } +const toolkit::Socket::Ptr& RtpProcess::getSock() const { + return _sock; +} + }//namespace mediakit #endif//defined(ENABLE_RTPPROXY) \ No newline at end of file diff --git a/src/Rtp/RtpProcess.h b/src/Rtp/RtpProcess.h index ef82f82d..eecdf694 100644 --- a/src/Rtp/RtpProcess.h +++ b/src/Rtp/RtpProcess.h @@ -102,6 +102,8 @@ public: uint16_t get_peer_port() override; std::string getIdentifier() const override; + const toolkit::Socket::Ptr& getSock() const; + protected: bool inputFrame(const Frame::Ptr &frame) override; bool addTrack(const Track::Ptr & track) override; diff --git a/src/Rtp/RtpSender.cpp b/src/Rtp/RtpSender.cpp index 21b82ab6..88f06961 100644 --- a/src/Rtp/RtpSender.cpp +++ b/src/Rtp/RtpSender.cpp @@ -190,6 +190,25 @@ void RtpSender::startSend(const MediaSourceEvent::SendRtpArgs &args, const funct } }, delay_ms / 1000.0, "::", args.src_port); InfoL << "start tcp active send rtp to: " << args.dst_url << ":" << args.dst_port; + } else if (args.con_type == MediaSourceEvent::SendRtpArgs::kVoiceTalk) { + auto src = MediaSource::find(args.recv_stream_vhost, args.recv_stream_app, args.recv_stream_id); + if (!src) { + cb(0, SockException(Err_other, "can not find the target stream")); + return; + } + auto processor = src->getRtpProcess(); + if (!processor) { + cb(0, SockException(Err_other, "get rtp processor from target stream failed")); + return; + } + auto sock = processor->getSock(); + if (!sock) { + cb(0, SockException(Err_other, "get sock from rtp processor failed")); + return; + } + _socket_rtp = std::move(sock); + onConnect(); + cb(_socket_rtp->get_local_port(), SockException()); } else { CHECK(0, "invalid con type"); } @@ -249,48 +268,51 @@ void RtpSender::onConnect() { // 加大发送缓存,防止udp丢包之类的问题 [AUTO-TRANSLATED:6e1cb40a] // Increase the send buffer to prevent problems such as UDP packet loss SockUtil::setSendBuf(_socket_rtp->rawFD(), 4 * 1024 * 1024); - if (_args.con_type == MediaSourceEvent::SendRtpArgs::kTcpActive || _args.con_type == MediaSourceEvent::SendRtpArgs::kTcpPassive) { - // 关闭tcp no_delay并开启MSG_MORE, 提高发送性能 [AUTO-TRANSLATED:c0f4e378] - // Close TCP no_delay and enable MSG_MORE to improve sending performance - SockUtil::setNoDelay(_socket_rtp->rawFD(), false); - _socket_rtp->setSendFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE); - } else if (_args.udp_rtcp_timeout) { - createRtcpSocket(); - } - // 连接建立成功事件 [AUTO-TRANSLATED:ac279c86] - // Connection established successfully event - weak_ptr weak_self = shared_from_this(); - if (!_args.recv_stream_id.empty()) { - mINI ini; - ini[RtpSession::kStreamID] = _args.recv_stream_id; - // 强制同步接收流和发送流的app和vhost [AUTO-TRANSLATED:134c9663] - // Force synchronization of the app and vhost of the receive stream and send stream - ini[RtpSession::kApp] = _args.recv_stream_app; - ini[RtpSession::kVhost] = _args.recv_stream_vhost; - _rtp_session = std::make_shared(_socket_rtp); - _rtp_session->setParams(ini); - _socket_rtp->setOnRead([weak_self](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) { + if (_args.con_type != MediaSourceEvent::SendRtpArgs::kVoiceTalk) { + if (_args.con_type == MediaSourceEvent::SendRtpArgs::kTcpActive || _args.con_type == MediaSourceEvent::SendRtpArgs::kTcpPassive) { + // 关闭tcp no_delay并开启MSG_MORE, 提高发送性能 [AUTO-TRANSLATED:c0f4e378] + // Close TCP no_delay and enable MSG_MORE to improve sending performance + SockUtil::setNoDelay(_socket_rtp->rawFD(), false); + _socket_rtp->setSendFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE); + } else if (_args.udp_rtcp_timeout) { + createRtcpSocket(); + } + // 连接建立成功事件 [AUTO-TRANSLATED:ac279c86] + // Connection established successfully event + weak_ptr weak_self = shared_from_this(); + if (!_args.recv_stream_id.empty()) { + mINI ini; + ini[RtpSession::kStreamID] = _args.recv_stream_id; + // 强制同步接收流和发送流的app和vhost [AUTO-TRANSLATED:134c9663] + // Force synchronization of the app and vhost of the receive stream and send stream + ini[RtpSession::kApp] = _args.recv_stream_app; + ini[RtpSession::kVhost] = _args.recv_stream_vhost; + _rtp_session = std::make_shared(_socket_rtp); + _rtp_session->setParams(ini); + + _socket_rtp->setOnRead([weak_self](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) { + auto strong_self = weak_self.lock(); + if (!strong_self) { + return; + } + try { + strong_self->_rtp_session->onRecv(buf); + } catch (std::exception &ex) { + SockException err(toolkit::Err_shutdown, ex.what()); + strong_self->_rtp_session->shutdown(err); + } + }); + } else { + _socket_rtp->setOnRead(nullptr); + } + _socket_rtp->setOnErr([weak_self](const SockException &err) { auto strong_self = weak_self.lock(); - if (!strong_self) { - return; - } - try { - strong_self->_rtp_session->onRecv(buf); - } catch (std::exception &ex) { - SockException err(toolkit::Err_shutdown, ex.what()); - strong_self->_rtp_session->shutdown(err); + if (strong_self) { + strong_self->onErr(err); } }); - } else { - _socket_rtp->setOnRead(nullptr); } - _socket_rtp->setOnErr([weak_self](const SockException &err) { - auto strong_self = weak_self.lock(); - if (strong_self) { - strong_self->onErr(err); - } - }); InfoL << "startSend rtp success: " << _socket_rtp->get_peer_ip() << ":" << _socket_rtp->get_peer_port() << ", data_type: " << _args.data_type << ", con_type: " << _args.con_type; } @@ -378,28 +400,51 @@ void RtpSender::onFlushRtpList(shared_ptr> rtp_list) { return; } - size_t i = 0; - auto size = rtp_list->size(); - rtp_list->for_each([&](Buffer::Ptr &packet) { - switch (_args.con_type) { - case MediaSourceEvent::SendRtpArgs::kUdpActive: - case MediaSourceEvent::SendRtpArgs::kUdpPassive: { - onSendRtpUdp(packet, i == 0); - // udp模式,rtp over tcp前4个字节可以忽略 [AUTO-TRANSLATED:5d648f4b] - // UDP mode, the first 4 bytes of rtp over tcp can be ignored - _socket_rtp->send(std::make_shared(std::move(packet), RtpPacket::kRtpTcpHeaderSize), nullptr, 0, ++i == size); - break; + auto send_func = [this](const shared_ptr> &rtp_list) { + size_t i = 0; + auto size = rtp_list->size(); + rtp_list->for_each([&](Buffer::Ptr &packet) { + switch (_args.con_type) { + case MediaSourceEvent::SendRtpArgs::kUdpActive: + case MediaSourceEvent::SendRtpArgs::kUdpPassive: { + onSendRtpUdp(packet, i == 0); + // udp模式,rtp over tcp前4个字节可以忽略 [AUTO-TRANSLATED:5d648f4b] + // UDP mode, the first 4 bytes of rtp over tcp can be ignored + _socket_rtp->send(std::make_shared(std::move(packet), RtpPacket::kRtpTcpHeaderSize), nullptr, 0, ++i == size); + break; + } + case MediaSourceEvent::SendRtpArgs::kTcpActive: + case MediaSourceEvent::SendRtpArgs::kTcpPassive: { + // tcp模式, rtp over tcp前2个字节可以忽略,只保留后续rtp长度的2个字节 [AUTO-TRANSLATED:a3bc338a] + // TCP mode, the first 2 bytes of rtp over tcp can be ignored, only the subsequent 2 bytes of rtp length are retained + _socket_rtp->send(std::make_shared(std::move(packet), 2), nullptr, 0, ++i == size); + break; + } + case MediaSourceEvent::SendRtpArgs::kVoiceTalk: { + auto type = _socket_rtp->alive() ? _socket_rtp->sockType() : SockNum::Sock_Invalid; + if (type == SockNum::Sock_UDP) { + _socket_rtp->send(std::make_shared(std::move(packet), RtpPacket::kRtpTcpHeaderSize), nullptr, 0, ++i == size); + } else if (type == SockNum::Sock_TCP) { + _socket_rtp->send(std::make_shared(std::move(packet), 2), nullptr, 0, ++i == size); + } else { + onErr(SockException(Err_other, "dst socket disconnected")); + } + break; + } + default: CHECK(0); } - case MediaSourceEvent::SendRtpArgs::kTcpActive: - case MediaSourceEvent::SendRtpArgs::kTcpPassive: { - // tcp模式, rtp over tcp前2个字节可以忽略,只保留后续rtp长度的2个字节 [AUTO-TRANSLATED:a3bc338a] - // TCP mode, the first 2 bytes of rtp over tcp can be ignored, only the subsequent 2 bytes of rtp length are retained - _socket_rtp->send(std::make_shared(std::move(packet), 2), nullptr, 0, ++i == size); - break; + }); + }; + if (_args.con_type != MediaSourceEvent::SendRtpArgs::kVoiceTalk) { + weak_ptr weak_self = shared_from_this(); + _socket_rtp->getPoller()->async([weak_self, rtp_list, send_func]() { + if (auto strong_self = weak_self.lock()) { + send_func(rtp_list); } - default: CHECK(0); - } - }); + }); + } else { + send_func(rtp_list); + } } void RtpSender::onErr(const SockException &ex) {