Add datachannel c apis and callbacks(#3328)

增加datachannel数据收发的回调通知 #3326,和控制datachannel回显的开关

---------

Co-authored-by: xiongziliang <771730766@qq.com>
This commit is contained in:
gongluck 2024-03-02 16:52:51 +08:00 committed by GitHub
parent 06abbd0eb7
commit 5a6364bae2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 161 additions and 3 deletions

View File

@ -177,6 +177,33 @@ typedef struct {
*/ */
void(API_CALL *on_mk_media_send_rtp_stop)(const char *vhost, const char *app, const char *stream, const char *ssrc, int err, const char *msg); void(API_CALL *on_mk_media_send_rtp_stop)(const char *vhost, const char *app, const char *stream, const char *ssrc, int err, const char *msg);
/**
* rtc sctp连接中///
* @param rtc_transport
*/
void(API_CALL *on_mk_rtc_sctp_connecting)(mk_rtc_transport rtc_transport);
void(API_CALL *on_mk_rtc_sctp_connected)(mk_rtc_transport rtc_transport);
void(API_CALL *on_mk_rtc_sctp_failed)(mk_rtc_transport rtc_transport);
void(API_CALL *on_mk_rtc_sctp_closed)(mk_rtc_transport rtc_transport);
/**
* rtc数据通道发送数据回调
* @param rtc_transport
* @param msg
* @param len
*/
void(API_CALL *on_mk_rtc_sctp_send)(mk_rtc_transport rtc_transport, const uint8_t *msg, size_t len);
/**
* rtc数据通道接收数据回调
* @param rtc_transport
* @param streamId id
* @param ppid id
* @param msg
* @param len
*/
void(API_CALL *on_mk_rtc_sctp_received)(mk_rtc_transport rtc_transport, uint16_t streamId, uint32_t ppid, const uint8_t *msg, size_t len);
} mk_events; } mk_events;

View File

@ -352,6 +352,20 @@ API_EXPORT mk_auth_invoker API_CALL mk_auth_invoker_clone(const mk_auth_invoker
*/ */
API_EXPORT void API_CALL mk_auth_invoker_clone_release(const mk_auth_invoker ctx); API_EXPORT void API_CALL mk_auth_invoker_clone_release(const mk_auth_invoker ctx);
///////////////////////////////////////////WebRtcTransport/////////////////////////////////////////////
//WebRtcTransport对象的C映射
typedef struct mk_rtc_transport_t *mk_rtc_transport;
/**
* rtc数据通道
* @param ctx
* @param streamId id
* @param ppid id
* @param msg
* @param len
*/
API_EXPORT void API_CALL mk_rtc_send_datachannel(const mk_rtc_transport ctx, uint16_t streamId, uint32_t ppid, const char* msg, size_t len);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -14,6 +14,7 @@
#include "Http/HttpSession.h" #include "Http/HttpSession.h"
#include "Rtsp/RtspSession.h" #include "Rtsp/RtspSession.h"
#include "Record/MP4Recorder.h" #include "Record/MP4Recorder.h"
#include "webrtc/WebRtcTransport.h"
using namespace toolkit; using namespace toolkit;
using namespace mediakit; using namespace mediakit;
@ -167,6 +168,42 @@ API_EXPORT void API_CALL mk_events_listen(const mk_events *events){
sender.getMediaTuple().stream.c_str(), ssrc.c_str(), ex.getErrCode(), ex.what()); sender.getMediaTuple().stream.c_str(), ssrc.c_str(), ex.getErrCode(), ex.what());
} }
}); });
NoticeCenter::Instance().addListener(&s_tag, Broadcast::kBroadcastRtcSctpConnecting,[](BroadcastRtcSctpConnectArgs){
if (s_events.on_mk_rtc_sctp_connecting) {
s_events.on_mk_rtc_sctp_connecting((mk_rtc_transport)&sender);
}
});
NoticeCenter::Instance().addListener(&s_tag, Broadcast::kBroadcastRtcSctpConnected,[](BroadcastRtcSctpConnectArgs){
if (s_events.on_mk_rtc_sctp_connected) {
s_events.on_mk_rtc_sctp_connected((mk_rtc_transport)&sender);
}
});
NoticeCenter::Instance().addListener(&s_tag, Broadcast::kBroadcastRtcSctpFailed,[](BroadcastRtcSctpConnectArgs){
if (s_events.on_mk_rtc_sctp_failed) {
s_events.on_mk_rtc_sctp_failed((mk_rtc_transport)&sender);
}
});
NoticeCenter::Instance().addListener(&s_tag, Broadcast::kBroadcastRtcSctpClosed,[](BroadcastRtcSctpConnectArgs){
if (s_events.on_mk_rtc_sctp_closed) {
s_events.on_mk_rtc_sctp_closed((mk_rtc_transport)&sender);
}
});
NoticeCenter::Instance().addListener(&s_tag, Broadcast::kBroadcastRtcSctpSend,[](BroadcastRtcSctpSendArgs){
if (s_events.on_mk_rtc_sctp_send) {
s_events.on_mk_rtc_sctp_send((mk_rtc_transport)&sender, data, len);
}
});
NoticeCenter::Instance().addListener(&s_tag, Broadcast::kBroadcastRtcSctpReceived,[](BroadcastRtcSctpReceivedArgs){
if (s_events.on_mk_rtc_sctp_received) {
s_events.on_mk_rtc_sctp_received((mk_rtc_transport)&sender, streamId, ppid, msg, len);
}
});
}); });
} }

View File

@ -17,6 +17,7 @@
#include "Http/HttpClient.h" #include "Http/HttpClient.h"
#include "Rtsp/RtspSession.h" #include "Rtsp/RtspSession.h"
#include "webrtc/WebRtcTransport.h"
using namespace toolkit; using namespace toolkit;
using namespace mediakit; using namespace mediakit;
@ -498,3 +499,21 @@ API_EXPORT void API_CALL mk_auth_invoker_clone_release(const mk_auth_invoker ctx
Broadcast::AuthInvoker *invoker = (Broadcast::AuthInvoker *)ctx; Broadcast::AuthInvoker *invoker = (Broadcast::AuthInvoker *)ctx;
delete invoker; delete invoker;
} }
///////////////////////////////////////////WebRtcTransport/////////////////////////////////////////////
API_EXPORT void API_CALL mk_rtc_sendDatachannel(const mk_rtc_transport ctx, uint16_t streamId, uint32_t ppid, const char *msg, size_t len) {
#ifdef ENABLE_WEBRTC
assert(ctx && msg);
WebRtcTransport *transport = (WebRtcTransport *)ctx;
std::string msg_str(msg, len);
std::weak_ptr<WebRtcTransport> weak_trans = transport->shared_from_this();
transport->getPoller()->async([streamId, ppid, msg_str, weak_trans]() {
// 切换线程后再操作
if (auto trans = weak_trans.lock()) {
trans->sendDatachannel(streamId, ppid, msg_str.c_str(), msg_str.size());
}
});
#else
WarnL << "未启用webrtc功能, 编译时请开启ENABLE_WEBRTC";
#endif
}

View File

@ -58,6 +58,12 @@ const string kBroadcastStreamNoneReader = "kBroadcastStreamNoneReader";
const string kBroadcastHttpBeforeAccess = "kBroadcastHttpBeforeAccess"; const string kBroadcastHttpBeforeAccess = "kBroadcastHttpBeforeAccess";
const string kBroadcastSendRtpStopped = "kBroadcastSendRtpStopped"; const string kBroadcastSendRtpStopped = "kBroadcastSendRtpStopped";
const string kBroadcastRtpServerTimeout = "kBroadcastRtpServerTimeout"; const string kBroadcastRtpServerTimeout = "kBroadcastRtpServerTimeout";
const string kBroadcastRtcSctpConnecting = "kBroadcastRtcSctpConnecting";
const string kBroadcastRtcSctpConnected = "kBroadcastRtcSctpConnected";
const string kBroadcastRtcSctpFailed = "kBroadcastRtcSctpFailed";
const string kBroadcastRtcSctpClosed = "kBroadcastRtcSctpClosed";
const string kBroadcastRtcSctpSend = "kBroadcastRtcSctpSend";
const string kBroadcastRtcSctpReceived = "kBroadcastRtcSctpReceived";
} // namespace Broadcast } // namespace Broadcast

View File

@ -109,6 +109,21 @@ extern const std::string kBroadcastReloadConfig;
extern const std::string kBroadcastRtpServerTimeout; extern const std::string kBroadcastRtpServerTimeout;
#define BroadcastRtpServerTimeoutArgs uint16_t &local_port, const string &stream_id,int &tcp_mode, bool &re_use_port, uint32_t &ssrc #define BroadcastRtpServerTimeoutArgs uint16_t &local_port, const string &stream_id,int &tcp_mode, bool &re_use_port, uint32_t &ssrc
// rtc transport sctp 连接状态
extern const std::string kBroadcastRtcSctpConnecting;
extern const std::string kBroadcastRtcSctpConnected;
extern const std::string kBroadcastRtcSctpFailed;
extern const std::string kBroadcastRtcSctpClosed;
#define BroadcastRtcSctpConnectArgs WebRtcTransport& sender
// rtc transport sctp 发送数据
extern const std::string kBroadcastRtcSctpSend;
#define BroadcastRtcSctpSendArgs WebRtcTransport& sender, const uint8_t *&data, size_t& len
// rtc transport sctp 接收数据
extern const std::string kBroadcastRtcSctpReceived;
#define BroadcastRtcSctpReceivedArgs WebRtcTransport& sender, uint16_t &streamId, uint32_t &ppid, const uint8_t *&msg, size_t &len
#define ReloadConfigTag ((void *)(0xFF)) #define ReloadConfigTag ((void *)(0xFF))
#define RELOAD_KEY(arg, key) \ #define RELOAD_KEY(arg, key) \
do { \ do { \

View File

@ -55,6 +55,9 @@ const string kStartBitrate = RTC_FIELD "start_bitrate";
const string kMaxBitrate = RTC_FIELD "max_bitrate"; const string kMaxBitrate = RTC_FIELD "max_bitrate";
const string kMinBitrate = RTC_FIELD "min_bitrate"; const string kMinBitrate = RTC_FIELD "min_bitrate";
// 数据通道设置
const string kDataChannelEcho = RTC_FIELD "datachannel_echo";
static onceToken token([]() { static onceToken token([]() {
mINI::Instance()[kTimeOutSec] = 15; mINI::Instance()[kTimeOutSec] = 15;
mINI::Instance()[kExternIP] = ""; mINI::Instance()[kExternIP] = "";
@ -65,6 +68,8 @@ static onceToken token([]() {
mINI::Instance()[kStartBitrate] = 0; mINI::Instance()[kStartBitrate] = 0;
mINI::Instance()[kMaxBitrate] = 0; mINI::Instance()[kMaxBitrate] = 0;
mINI::Instance()[kMinBitrate] = 0; mINI::Instance()[kMinBitrate] = 0;
mINI::Instance()[kDataChannelEcho] = true;
}); });
} // namespace RTC } // namespace RTC
@ -250,22 +255,47 @@ void WebRtcTransport::OnDtlsTransportApplicationDataReceived(
#ifdef ENABLE_SCTP #ifdef ENABLE_SCTP
void WebRtcTransport::OnSctpAssociationConnecting(RTC::SctpAssociation *sctpAssociation) { void WebRtcTransport::OnSctpAssociationConnecting(RTC::SctpAssociation *sctpAssociation) {
TraceL << getIdentifier(); TraceL << getIdentifier();
try {
NOTICE_EMIT(BroadcastRtcSctpConnectArgs, Broadcast::kBroadcastRtcSctpConnecting, *this);
} catch (std::exception &ex) {
WarnL << "Exception occurred: " << ex.what();
}
} }
void WebRtcTransport::OnSctpAssociationConnected(RTC::SctpAssociation *sctpAssociation) { void WebRtcTransport::OnSctpAssociationConnected(RTC::SctpAssociation *sctpAssociation) {
InfoL << getIdentifier(); InfoL << getIdentifier();
try {
NOTICE_EMIT(BroadcastRtcSctpConnectArgs, Broadcast::kBroadcastRtcSctpConnected, *this);
} catch (std::exception &ex) {
WarnL << "Exception occurred: " << ex.what();
}
} }
void WebRtcTransport::OnSctpAssociationFailed(RTC::SctpAssociation *sctpAssociation) { void WebRtcTransport::OnSctpAssociationFailed(RTC::SctpAssociation *sctpAssociation) {
WarnL << getIdentifier(); WarnL << getIdentifier();
try {
NOTICE_EMIT(BroadcastRtcSctpConnectArgs, Broadcast::kBroadcastRtcSctpFailed, *this);
} catch (std::exception &ex) {
WarnL << "Exception occurred: " << ex.what();
}
} }
void WebRtcTransport::OnSctpAssociationClosed(RTC::SctpAssociation *sctpAssociation) { void WebRtcTransport::OnSctpAssociationClosed(RTC::SctpAssociation *sctpAssociation) {
InfoL << getIdentifier(); InfoL << getIdentifier();
try {
NOTICE_EMIT(BroadcastRtcSctpConnectArgs, Broadcast::kBroadcastRtcSctpClosed, *this);
} catch (std::exception &ex) {
WarnL << "Exception occurred: " << ex.what();
}
} }
void WebRtcTransport::OnSctpAssociationSendData( void WebRtcTransport::OnSctpAssociationSendData(
RTC::SctpAssociation *sctpAssociation, const uint8_t *data, size_t len) { RTC::SctpAssociation *sctpAssociation, const uint8_t *data, size_t len) {
try {
NOTICE_EMIT(BroadcastRtcSctpSendArgs, Broadcast::kBroadcastRtcSctpSend, *this, data, len);
} catch (std::exception &ex) {
WarnL << "Exception occurred: " << ex.what();
}
_dtls_transport->SendApplicationData(data, len); _dtls_transport->SendApplicationData(data, len);
} }
@ -274,8 +304,18 @@ void WebRtcTransport::OnSctpAssociationMessageReceived(
InfoL << getIdentifier() << " " << streamId << " " << ppid << " " << len << " " << string((char *)msg, len); InfoL << getIdentifier() << " " << streamId << " " << ppid << " " << len << " " << string((char *)msg, len);
RTC::SctpStreamParameters params; RTC::SctpStreamParameters params;
params.streamId = streamId; params.streamId = streamId;
// 回显数据
_sctp->SendSctpMessage(params, ppid, msg, len); GET_CONFIG(bool, datachannel_echo, Rtc::kDataChannelEcho);
if (datachannel_echo) {
// 回显数据
_sctp->SendSctpMessage(params, ppid, msg, len);
}
try {
NOTICE_EMIT(BroadcastRtcSctpReceivedArgs, Broadcast::kBroadcastRtcSctpReceived, *this, streamId, ppid, msg, len);
} catch (std::exception &ex) {
WarnL << "Exception occurred: " << ex.what();
}
} }
#endif #endif