diff --git a/3rdpart/ZLToolKit b/3rdpart/ZLToolKit index 97f9b9a2..5d74e09b 160000 --- a/3rdpart/ZLToolKit +++ b/3rdpart/ZLToolKit @@ -1 +1 @@ -Subproject commit 97f9b9a2ac58353f72f085830690d27833b8ad88 +Subproject commit 5d74e09b8c84cccc46036ed2ef1a62f670c423d4 diff --git a/server/WebApi.cpp b/server/WebApi.cpp index 8ee36526..e8081756 100755 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -816,6 +816,19 @@ void installWebApi() { }); }); + api_regist("/index/api/broadcastMessage", [](API_ARGS_MAP) { + CHECK_SECRET(); + CHECK_ARGS("schema", "vhost", "app", "stream", "msg"); + auto src = MediaSource::find(allArgs["schema"], allArgs["vhost"], allArgs["app"], allArgs["stream"]); + if (!src) { + throw ApiRetException("can not find the stream", API::NotFound); + } + Any any; + Buffer::Ptr buffer = std::make_shared(allArgs["msg"]); + any.set(std::move(buffer)); + src->broadcastMessage(any); + }); + //测试url http://127.0.0.1/index/api/getMediaInfo?schema=rtsp&vhost=__defaultVhost__&app=live&stream=obs api_regist("/index/api/getMediaInfo",[](API_ARGS_MAP_ASYNC){ CHECK_SECRET(); diff --git a/src/Common/MediaSource.h b/src/Common/MediaSource.h index a8403680..bcf75a09 100644 --- a/src/Common/MediaSource.h +++ b/src/Common/MediaSource.h @@ -353,6 +353,8 @@ public: cb(std::list()); } + virtual bool broadcastMessage(const toolkit::Any &data) { return false; } + // 获取媒体源类型 MediaOriginType getOriginType() const; // 获取媒体源url或者文件路径 diff --git a/src/Rtsp/RtspMediaSource.h b/src/Rtsp/RtspMediaSource.h index d1faa1d4..bac7fcf9 100644 --- a/src/Rtsp/RtspMediaSource.h +++ b/src/Rtsp/RtspMediaSource.h @@ -55,9 +55,16 @@ public: void getPlayerList(const std::function &info_list)> &cb, const std::function &on_change) override { + assert(_ring); _ring->getInfoList(cb, on_change); } + bool broadcastMessage(const toolkit::Any &data) override { + assert(_ring); + _ring->sendMessage(data); + return true; + } + /** * 获取播放器个数 */ diff --git a/webrtc/WebRtcPlayer.cpp b/webrtc/WebRtcPlayer.cpp index 4d8d9b9f..63f174eb 100644 --- a/webrtc/WebRtcPlayer.cpp +++ b/webrtc/WebRtcPlayer.cpp @@ -71,6 +71,21 @@ void WebRtcPlayer::onStartWebRTC() { } strong_self->onShutdown(SockException(Err_shutdown, "rtsp ring buffer detached")); }); + + _reader->setMessageCB([weak_self] (const toolkit::Any &data) { + auto strong_self = weak_self.lock(); + if (!strong_self) { + return; + } + if (data.is()) { + auto &buffer = data.get(); + // PPID 51: 文本string + // PPID 53: 二进制 + strong_self->sendDatachannel(0, 51, buffer.data(), buffer.size()); + } else { + WarnL << "Send unknown message type to webrtc player: " << data.type_name(); + } + }); } } void WebRtcPlayer::onDestory() { diff --git a/webrtc/WebRtcTransport.cpp b/webrtc/WebRtcTransport.cpp index 82139a7f..4ec13e6f 100644 --- a/webrtc/WebRtcTransport.cpp +++ b/webrtc/WebRtcTransport.cpp @@ -229,6 +229,19 @@ void WebRtcTransport::OnSctpAssociationMessageReceived( _sctp->SendSctpMessage(params, ppid, msg, len); } #endif + +void WebRtcTransport::sendDatachannel(uint16_t streamId, uint32_t ppid, const char *msg, size_t len) { +#ifdef ENABLE_SCTP + if (_sctp) { + RTC::SctpStreamParameters params; + params.streamId = streamId; + _sctp->SendSctpMessage(params, ppid, (uint8_t *)msg, len); + } +#else + WarnL << "WebRTC datachannel disabled!"; +#endif +} + ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// void WebRtcTransport::sendSockData(const char *buf, size_t len, RTC::TransportTuple *tuple) { diff --git a/webrtc/WebRtcTransport.h b/webrtc/WebRtcTransport.h index 2534ca3b..2d1e6bbf 100644 --- a/webrtc/WebRtcTransport.h +++ b/webrtc/WebRtcTransport.h @@ -112,6 +112,7 @@ public: */ void sendRtpPacket(const char *buf, int len, bool flush, void *ctx = nullptr); void sendRtcpPacket(const char *buf, int len, bool flush, void *ctx = nullptr); + void sendDatachannel(uint16_t streamId, uint32_t ppid, const char *msg, size_t len); const EventPoller::Ptr& getPoller() const; Session::Ptr getSession() const;