From 11a7d1e6c4a7532ec5fecfa13c9c702708d83dea Mon Sep 17 00:00:00 2001 From: xiongziliang <771730766@qq.com> Date: Thu, 26 Dec 2019 21:22:19 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E6=88=90tcp=E6=9C=8D=E5=8A=A1?= =?UTF-8?q?=E5=99=A8/=E5=AE=A2=E6=88=B7=E7=AB=AF=20c=20api?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- api/include/events_objects.h | 19 +-- api/include/mediakit.h | 2 +- api/include/tcp.h | 226 +++++++++++++++++++++++++ api/include/websocket.h | 105 ------------ api/source/common.cpp | 4 +- api/source/events_objects.cpp | 54 ------ api/source/tcp.cpp | 299 ++++++++++++++++++++++++++++++++++ api/source/websocket.cpp | 110 ------------- api/tests/websocket.c | 156 ++++++++++++++---- src/Http/WebSocketClient.h | 12 +- 10 files changed, 658 insertions(+), 329 deletions(-) create mode 100644 api/include/tcp.h delete mode 100644 api/include/websocket.h create mode 100644 api/source/tcp.cpp delete mode 100644 api/source/websocket.cpp diff --git a/api/include/events_objects.h b/api/include/events_objects.h index e24a1603..5881f0f7 100644 --- a/api/include/events_objects.h +++ b/api/include/events_objects.h @@ -27,6 +27,7 @@ #ifndef MK_EVENT_OBJECTS_H #define MK_EVENT_OBJECTS_H #include "common.h" +#include "tcp.h" #ifdef __cplusplus extern "C" { #endif @@ -123,24 +124,6 @@ API_EXPORT void API_CALL mk_media_source_find(const char *schema, //MediaSource::for_each_media() API_EXPORT void API_CALL mk_media_source_for_each(void *user_data, on_mk_media_source_find_cb cb); -///////////////////////////////////////////TcpSession///////////////////////////////////////////// -//TcpSession对象的C映射 -typedef void* mk_tcp_session; -//TcpSession::safeShutdown() -API_EXPORT void API_CALL mk_tcp_session_shutdown(const mk_tcp_session ctx,int err,const char *err_msg); -//TcpSession::get_peer_ip() -API_EXPORT const char* API_CALL mk_tcp_session_peer_ip(const mk_tcp_session ctx); -//TcpSession::get_local_ip() -API_EXPORT const char* API_CALL mk_tcp_session_local_ip(const mk_tcp_session ctx); -//TcpSession::get_peer_port() -API_EXPORT uint16_t API_CALL mk_tcp_session_peer_port(const mk_tcp_session ctx); -//TcpSession::get_local_port() -API_EXPORT uint16_t API_CALL mk_tcp_session_local_port(const mk_tcp_session ctx); -//TcpSession::send() -API_EXPORT void API_CALL mk_tcp_session_send(const mk_tcp_session ctx,const char *data,int len); -//切换到该对象所在线程后再TcpSession::send() -API_EXPORT void API_CALL mk_tcp_session_send_safe(const mk_tcp_session ctx,const char *data,int len); - ///////////////////////////////////////////HttpBody///////////////////////////////////////////// //HttpBody对象的C映射 typedef void* mk_http_body; diff --git a/api/include/mediakit.h b/api/include/mediakit.h index de08c278..6feb9eaf 100755 --- a/api/include/mediakit.h +++ b/api/include/mediakit.h @@ -35,6 +35,6 @@ #include "player.h" #include "pusher.h" #include "events.h" -#include "websocket.h" +#include "tcp.h" #endif /* MK_API_H_ */ diff --git a/api/include/tcp.h b/api/include/tcp.h new file mode 100644 index 00000000..b0511e86 --- /dev/null +++ b/api/include/tcp.h @@ -0,0 +1,226 @@ +/* + * MIT License + * + * Copyright (c) 2019 xiongziliang <771730766@qq.com> + * + * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +#ifndef MK_TCP_H +#define MK_TCP_H + +#include "common.h" + +#ifdef __cplusplus +extern "C" { +#endif + +///////////////////////////////////////////TcpSession///////////////////////////////////////////// +//TcpSession对象的C映射 +typedef void* mk_tcp_session; +//TcpSession::safeShutdown() +API_EXPORT void API_CALL mk_tcp_session_shutdown(const mk_tcp_session ctx,int err,const char *err_msg); +//TcpSession::get_peer_ip() +API_EXPORT const char* API_CALL mk_tcp_session_peer_ip(const mk_tcp_session ctx); +//TcpSession::get_local_ip() +API_EXPORT const char* API_CALL mk_tcp_session_local_ip(const mk_tcp_session ctx); +//TcpSession::get_peer_port() +API_EXPORT uint16_t API_CALL mk_tcp_session_peer_port(const mk_tcp_session ctx); +//TcpSession::get_local_port() +API_EXPORT uint16_t API_CALL mk_tcp_session_local_port(const mk_tcp_session ctx); +//TcpSession::send() +API_EXPORT void API_CALL mk_tcp_session_send(const mk_tcp_session ctx,const char *data,int len); +//切换到该对象所在线程后再TcpSession::send() +API_EXPORT void API_CALL mk_tcp_session_send_safe(const mk_tcp_session ctx,const char *data,int len); + +///////////////////////////////////////////自定义tcp服务///////////////////////////////////////////// + +typedef struct { + /** + * 收到mk_tcp_session创建对象 + * @param server_port 服务器端口号 + * @param session 会话处理对象 + */ + void (API_CALL *on_mk_tcp_session_create)(uint16_t server_port,mk_tcp_session session); + + /** + * 收到客户端发过来的数据 + * @param server_port 服务器端口号 + * @param session 会话处理对象 + * @param data 数据指针 + * @param len 数据长度 + */ + void (API_CALL *on_mk_tcp_session_data)(uint16_t server_port,mk_tcp_session session,const char *data,int len); + + /** + * 每隔2秒的定时器,用于管理超时等任务 + * @param server_port 服务器端口号 + * @param session 会话处理对象 + */ + void (API_CALL *on_mk_tcp_session_manager)(uint16_t server_port,mk_tcp_session session); + + /** + * 一般由于客户端断开tcp触发 + * @param server_port 服务器端口号 + * @param session 会话处理对象 + * @param code 错误代码 + * @param msg 错误提示 + */ + void (API_CALL *on_mk_tcp_session_disconnect)(uint16_t server_port,mk_tcp_session session,int code,const char *msg); +} mk_tcp_session_events; + + +typedef enum { + //普通的tcp + mk_type_tcp = 0, + //ssl类型的tcp + mk_type_ssl = 1, + //基于websocket的连接 + mk_type_ws = 2, + //基于ssl websocket的连接 + mk_type_wss = 3 +}mk_tcp_type; + +/** + * tcp会话对象附着用户数据 + * 该函数只对mk_tcp_server_server_start启动的服务类型有效 + * @param session 会话对象 + * @param user_data 用户数据指针 + */ +API_EXPORT void API_CALL mk_tcp_session_set_user_data(mk_tcp_session session,void *user_data); + +/** + * 获取tcp会话对象上附着的用户数据 + * 该函数只对mk_tcp_server_server_start启动的服务类型有效 + * @param session tcp会话对象 + * @return 用户数据指针 + */ +API_EXPORT void* API_CALL mk_tcp_session_get_user_data(mk_tcp_session session); + +/** + * 开启tcp服务器 + * @param port 监听端口号,0则为随机 + * @param type 服务器类型 + */ +API_EXPORT uint16_t API_CALL mk_tcp_server_server_start(uint16_t port, mk_tcp_type type); + +/** + * 监听tcp服务器事件 + */ +API_EXPORT void API_CALL mk_tcp_server_events_listen(const mk_tcp_session_events *events); + + +///////////////////////////////////////////自定义tcp客户端///////////////////////////////////////////// + +typedef void* mk_tcp_client; + +typedef struct { + /** + * tcp客户端连接服务器成功或失败回调 + * @param client tcp客户端 + * @param code 0为连接成功,否则为失败原因 + * @param msg 连接失败错误提示 + */ + void (API_CALL *on_mk_tcp_client_connect)(mk_tcp_client client,int code,const char *msg); + + /** + * tcp客户端与tcp服务器之间断开回调 + * 一般是eof事件导致 + * @param client tcp客户端 + * @param code 错误代码 + * @param msg 错误提示 + */ + void (API_CALL *on_mk_tcp_client_disconnect)(mk_tcp_client client,int code,const char *msg); + + /** + * 收到tcp服务器发来的数据 + * @param client tcp客户端 + * @param data 数据指针 + * @param len 数据长度 + */ + void (API_CALL *on_mk_tcp_client_data)(mk_tcp_client client,const char *data,int len); + + /** + * 每隔2秒的定时器,用于管理超时等任务 + * @param client tcp客户端 + */ + void (API_CALL *on_mk_tcp_client_manager)(mk_tcp_client client); +} mk_tcp_client_events; + +/** + * 创建tcp客户端 + * @param events 回调函数结构体 + * @param user_data 用户数据指针 + * @param type 客户端类型 + * @return 客户端对象 + */ +API_EXPORT mk_tcp_client API_CALL mk_tcp_client_create(mk_tcp_client_events *events, mk_tcp_type type); + +/** + * 释放tcp客户端 + * @param ctx 客户端对象 + */ +API_EXPORT void API_CALL mk_tcp_client_release(mk_tcp_client ctx); + +/** + * 发起连接 + * @param ctx 客户端对象 + * @param host 服务器ip或域名 + * @param port 服务器端口号 + * @param time_out_sec 超时时间 + */ +API_EXPORT void API_CALL mk_tcp_client_connect(mk_tcp_client ctx, const char *host, uint16_t port, float time_out_sec); + +/** + * 非线程安全的发送数据 + * 开发者如果能确保在本对象网络线程内,可以调用此此函数 + * @param ctx 客户端对象 + * @param data 数据指针 + * @param len 数据长度,等于0时,内部通过strlen获取 + */ +API_EXPORT void API_CALL mk_tcp_client_send(mk_tcp_client ctx, const char *data, int len); + +/** + * 切换到本对象的网络线程后再发送数据 + * @param ctx 客户端对象 + * @param data 数据指针 + * @param len 数据长度,等于0时,内部通过strlen获取 + */ +API_EXPORT void API_CALL mk_tcp_client_send_safe(mk_tcp_client ctx, const char *data, int len); + +/** + * 客户端附着用户数据 + * @param ctx 客户端对象 + * @param user_data 用户数据指针 + */ +API_EXPORT void API_CALL mk_tcp_client_set_user_data(mk_tcp_client ctx,void *user_data); + +/** + * 获取客户端对象上附着的用户数据 + * @param ctx 客户端对象 + * @return 用户数据指针 + */ +API_EXPORT void* API_CALL mk_tcp_client_get_user_data(mk_tcp_client ctx); + +#ifdef __cplusplus +} +#endif +#endif //MK_TCP_H diff --git a/api/include/websocket.h b/api/include/websocket.h deleted file mode 100644 index c075fd5c..00000000 --- a/api/include/websocket.h +++ /dev/null @@ -1,105 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 xiongziliang <771730766@qq.com> - * - * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -#ifndef MK_WEBSOCKET_H -#define MK_WEBSOCKET_H - -#include "common.h" -#include "events_objects.h" - -#ifdef __cplusplus -extern "C" { -#endif - -typedef struct { - /** - * 当websocket客户端连接服务器时触发 - * @param session 会话处理对象 - */ - void (API_CALL *on_mk_websocket_session_create)(mk_tcp_session session); - - /** - * session会话对象销毁时触发 - * 请在本回调中清理释放你的用户数据 - * 本事件中不能调用mk_tcp_session_send/mk_tcp_session_send_safe函数 - * @param session 会话处理对象 - */ - void (API_CALL *on_mk_websocket_session_destory)(mk_tcp_session session); - - /** - * 收到websocket客户端发过来的数据 - * @param session 会话处理对象 - * @param data 数据指针 - * @param len 数据长度 - */ - void (API_CALL *on_mk_websocket_session_data)(mk_tcp_session session,const char *data,int len); - - /** - * 每隔2秒的定时器,用于管理超时等任务 - * @param session 会话处理对象 - */ - void (API_CALL *on_mk_websocket_session_manager)(mk_tcp_session session); - - /** - * on_mk_websocket_session_destory之前触发on_mk_websocket_session_err - * 一般由于客户端断开tcp触发 - * 本事件中可以调用mk_tcp_session_send_safe函数 - * @param session 会话处理对象 - * @param code 错误代码 - * @param msg 错误提示 - */ - void (API_CALL *on_mk_websocket_session_err)(mk_tcp_session session,int code,const char *msg); -} mk_websocket_events; - -API_EXPORT void API_CALL mk_websocket_events_listen(const mk_websocket_events *events); - -/** - * 往websocket会话对象附着用户数据 - * @param session websocket会话对象 - * @param user_data 用户数据指针 - */ -API_EXPORT void API_CALL mk_websocket_session_set_user_data(mk_tcp_session session,void *user_data); - -/** - * 获取websocket会话对象上附着的用户数据 - * @param session websocket会话对象 - * @return 用户数据指针 - */ -API_EXPORT void* API_CALL mk_websocket_session_get_user_data(mk_tcp_session session); - -/** - * 开启websocket服务器,需要指出的是,websocket服务器包含了Http服务器的所有功能 - * 调用mk_websocket_server_start后不用再调用mk_http_server_start - * @param port 端口号,0则随机 - * @param ssl 是否为wss/ws - * @return 端口号,0代表失败 - */ -API_EXPORT uint16_t API_CALL mk_websocket_server_start(uint16_t port, int ssl); - -#ifdef __cplusplus -} -#endif -#endif //MK_WEBSOCKET_H diff --git a/api/source/common.cpp b/api/source/common.cpp index ca8074c8..b96414e4 100755 --- a/api/source/common.cpp +++ b/api/source/common.cpp @@ -64,7 +64,7 @@ API_EXPORT void API_CALL mk_env_init(const mk_config *cfg) { cfg->ssl_pwd); } -extern void mk_websocket_server_stop(); +extern void stopAllTcpServer(); API_EXPORT void API_CALL mk_stop_all_server(){ CLEAR_ARR(rtsp_server); @@ -72,7 +72,7 @@ API_EXPORT void API_CALL mk_stop_all_server(){ CLEAR_ARR(http_server); udpRtpServer = nullptr; tcpRtpServer = nullptr; - mk_websocket_server_stop(); + stopAllTcpServer(); } API_EXPORT void API_CALL mk_env_init1( int thread_num, diff --git a/api/source/events_objects.cpp b/api/source/events_objects.cpp index 41a8a982..435031f0 100644 --- a/api/source/events_objects.cpp +++ b/api/source/events_objects.cpp @@ -228,60 +228,6 @@ API_EXPORT void API_CALL mk_media_source_for_each(void *user_data, on_mk_media_s }); } -///////////////////////////////////////////TcpSession///////////////////////////////////////////// -API_EXPORT void API_CALL mk_tcp_session_shutdown(const mk_tcp_session ctx,int err,const char *err_msg){ - assert(ctx); - TcpSession *session = (TcpSession *)ctx; - session->safeShutdown(SockException((ErrCode)err,err_msg)); -} -API_EXPORT const char* API_CALL mk_tcp_session_peer_ip(const mk_tcp_session ctx){ - assert(ctx); - TcpSession *session = (TcpSession *)ctx; - return session->get_peer_ip().c_str(); -} -API_EXPORT const char* API_CALL mk_tcp_session_local_ip(const mk_tcp_session ctx){ - assert(ctx); - TcpSession *session = (TcpSession *)ctx; - return session->get_local_ip().c_str(); -} -API_EXPORT uint16_t API_CALL mk_tcp_session_peer_port(const mk_tcp_session ctx){ - assert(ctx); - TcpSession *session = (TcpSession *)ctx; - return session->get_peer_port(); -} -API_EXPORT uint16_t API_CALL mk_tcp_session_local_port(const mk_tcp_session ctx){ - assert(ctx); - TcpSession *session = (TcpSession *)ctx; - return session->get_local_port(); -} -API_EXPORT void API_CALL mk_tcp_session_send(const mk_tcp_session ctx,const char *data,int len){ - assert(ctx && data); - if(!len){ - len = strlen(data); - } - TcpSession *session = (TcpSession *)ctx; - session->send(data,len); -} - -API_EXPORT void API_CALL mk_tcp_session_send_safe(const mk_tcp_session ctx,const char *data,int len){ - assert(ctx && data); - if(!len){ - len = strlen(data); - } - try { - weak_ptr weak_session = ((TcpSession *)ctx)->shared_from_this(); - string str = string(data,len); - ((TcpSession *)ctx)->async([weak_session,str](){ - auto session_session = weak_session.lock(); - if(session_session){ - session_session->send(str); - } - }); - }catch (std::exception &ex){ - WarnL << "can not got the strong pionter of this mk_tcp_session:" << ex.what(); - } -} - ///////////////////////////////////////////HttpBody///////////////////////////////////////////// API_EXPORT mk_http_body API_CALL mk_http_body_from_string(const char *str,int len){ assert(str); diff --git a/api/source/tcp.cpp b/api/source/tcp.cpp new file mode 100644 index 00000000..61dd22e4 --- /dev/null +++ b/api/source/tcp.cpp @@ -0,0 +1,299 @@ +/* + * MIT License + * + * Copyright (c) 2019 xiongziliang <771730766@qq.com> + * + * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +#include "tcp.h" +#include "Network/TcpSession.h" +#include "Network/TcpClient.h" +#include "Http/WebSocketClient.h" +#include "Http/WebSocketSession.h" +using namespace mediakit; + +//////////////////////////////////////////////////////////////////////////////////////// +API_EXPORT void API_CALL mk_tcp_session_shutdown(const mk_tcp_session ctx,int err,const char *err_msg){ + assert(ctx); + TcpSession *session = (TcpSession *)ctx; + session->safeShutdown(SockException((ErrCode)err,err_msg)); +} +API_EXPORT const char* API_CALL mk_tcp_session_peer_ip(const mk_tcp_session ctx){ + assert(ctx); + TcpSession *session = (TcpSession *)ctx; + return session->get_peer_ip().c_str(); +} +API_EXPORT const char* API_CALL mk_tcp_session_local_ip(const mk_tcp_session ctx){ + assert(ctx); + TcpSession *session = (TcpSession *)ctx; + return session->get_local_ip().c_str(); +} +API_EXPORT uint16_t API_CALL mk_tcp_session_peer_port(const mk_tcp_session ctx){ + assert(ctx); + TcpSession *session = (TcpSession *)ctx; + return session->get_peer_port(); +} +API_EXPORT uint16_t API_CALL mk_tcp_session_local_port(const mk_tcp_session ctx){ + assert(ctx); + TcpSession *session = (TcpSession *)ctx; + return session->get_local_port(); +} +API_EXPORT void API_CALL mk_tcp_session_send(const mk_tcp_session ctx,const char *data,int len){ + assert(ctx && data); + if(!len){ + len = strlen(data); + } + TcpSession *session = (TcpSession *)ctx; + session->send(data,len); +} + +API_EXPORT void API_CALL mk_tcp_session_send_safe(const mk_tcp_session ctx,const char *data,int len){ + assert(ctx && data); + if(!len){ + len = strlen(data); + } + try { + weak_ptr weak_session = ((TcpSession *)ctx)->shared_from_this(); + string str = string(data,len); + ((TcpSession *)ctx)->async([weak_session,str](){ + auto session_session = weak_session.lock(); + if(session_session){ + session_session->send(str); + } + }); + }catch (std::exception &ex){ + WarnL << "can not got the strong pionter of this mk_tcp_session:" << ex.what(); + } +} + +////////////////////////////////////////TcpSessionForC//////////////////////////////////////////////// +static TcpServer::Ptr s_tcp_server[4]; +static mk_tcp_session_events s_events_server = {0}; + +class TcpSessionForC : public TcpSession { +public: + TcpSessionForC(const Socket::Ptr &pSock) ; + ~TcpSessionForC() override = default; + void onRecv(const Buffer::Ptr &buffer) override ; + void onError(const SockException &err) override; + void onManager() override; + void *_user_data; + uint16_t _local_port; +}; + +TcpSessionForC::TcpSessionForC(const Socket::Ptr &pSock) : TcpSession(pSock) { + _local_port = get_local_port(); + if (s_events_server.on_mk_tcp_session_create) { + s_events_server.on_mk_tcp_session_create(_local_port,this); + } +} + +void TcpSessionForC::onRecv(const Buffer::Ptr &buffer) { + if (s_events_server.on_mk_tcp_session_data) { + s_events_server.on_mk_tcp_session_data(_local_port,this, buffer->data(), buffer->size()); + } +} + +void TcpSessionForC::onError(const SockException &err) { + if (s_events_server.on_mk_tcp_session_disconnect) { + s_events_server.on_mk_tcp_session_disconnect(_local_port,this, err.getErrCode(), err.what()); + } +} + +void TcpSessionForC::onManager() { + if (s_events_server.on_mk_tcp_session_manager) { + s_events_server.on_mk_tcp_session_manager(_local_port,this); + } +} + +void stopAllTcpServer(){ + CLEAR_ARR(s_tcp_server); +} + +API_EXPORT void API_CALL mk_tcp_session_set_user_data(mk_tcp_session session,void *user_data){ + assert(session); + TcpSessionForC *obj = (TcpSessionForC *)session; + obj->_user_data = user_data; +} + +API_EXPORT void* API_CALL mk_tcp_session_get_user_data(mk_tcp_session session){ + assert(session); + TcpSessionForC *obj = (TcpSessionForC *)session; + return obj->_user_data; +} + +API_EXPORT void API_CALL mk_tcp_server_events_listen(const mk_tcp_session_events *events){ + if (events) { + memcpy(&s_events_server, events, sizeof(s_events_server)); + } else { + memset(&s_events_server, 0, sizeof(s_events_server)); + } +} + +API_EXPORT uint16_t API_CALL mk_tcp_server_server_start(uint16_t port, mk_tcp_type type){ + type = MAX(mk_type_tcp, MIN(type, mk_type_wss)); + try { + s_tcp_server[type] = std::make_shared(); + switch (type) { + case mk_type_tcp: + s_tcp_server[type]->start(port); + break; + case mk_type_ssl: + s_tcp_server[type]->start >(port); + break; + case mk_type_ws: + s_tcp_server[type]->start>(port); + break; + case mk_type_wss: + s_tcp_server[type]->start>(port); + break; + default: + return 0; + } + return s_tcp_server[type]->getPort(); + } catch (std::exception &ex) { + s_tcp_server[type].reset(); + WarnL << ex.what(); + return 0; + } +} + +///////////////////////////////////////////////////TcpClientForC///////////////////////////////////////////////////////// +class TcpClientForC : public TcpClient { +public: + typedef std::shared_ptr Ptr; + TcpClientForC(mk_tcp_client_events *events) ; + ~TcpClientForC() override ; + void onRecv(const Buffer::Ptr &pBuf) override; + void onErr(const SockException &ex) override; + void onManager() override; + void onConnect(const SockException &ex) override; + void setClient(mk_tcp_client client); + void *_user_data; +private: + mk_tcp_client_events _events; + mk_tcp_client _client; +}; + +TcpClientForC::TcpClientForC(mk_tcp_client_events *events){ + _events = *events; +} + + +void TcpClientForC::onRecv(const Buffer::Ptr &pBuf) { + if(_events.on_mk_tcp_client_data){ + _events.on_mk_tcp_client_data(_client,pBuf->data(),pBuf->size()); + } +} + +void TcpClientForC::onErr(const SockException &ex) { + if(_events.on_mk_tcp_client_disconnect){ + _events.on_mk_tcp_client_disconnect(_client,ex.getErrCode(),ex.what()); + } +} + +void TcpClientForC::onManager() { + if(_events.on_mk_tcp_client_manager){ + _events.on_mk_tcp_client_manager(_client); + } +} + +void TcpClientForC::onConnect(const SockException &ex) { + if(_events.on_mk_tcp_client_connect){ + _events.on_mk_tcp_client_connect(_client,ex.getErrCode(),ex.what()); + } +} + +TcpClientForC::~TcpClientForC() { + TraceL << "mk_tcp_client_release:" << _client; +} + +void TcpClientForC::setClient(mk_tcp_client client) { + _client = client; + TraceL << "mk_tcp_client_create:" << _client; +} + +TcpClientForC::Ptr *mk_tcp_client_create_l(mk_tcp_client_events *events, mk_tcp_type type){ + assert(events); + type = MAX(mk_type_tcp, MIN(type, mk_type_wss)); + switch (type) { + case mk_type_tcp: + return new TcpClientForC::Ptr(new TcpClientForC(events)); + case mk_type_ssl: + return (TcpClientForC::Ptr *)new shared_ptr >(new TcpSessionWithSSL(events)); + case mk_type_ws: + return (TcpClientForC::Ptr *)new shared_ptr >(new WebSocketClient(events)); + case mk_type_wss: + return (TcpClientForC::Ptr *)new shared_ptr >(new WebSocketClient(events)); + default: + return nullptr; + } +} + +API_EXPORT mk_tcp_client API_CALL mk_tcp_client_create(mk_tcp_client_events *events, mk_tcp_type type){ + auto ret = mk_tcp_client_create_l(events,type); + (*ret)->setClient(ret); + return ret; +} + +API_EXPORT void API_CALL mk_tcp_client_release(mk_tcp_client ctx){ + assert(ctx); + TcpClient::Ptr *client = (TcpClient::Ptr *)ctx; + delete client; +} + +API_EXPORT void API_CALL mk_tcp_client_connect(mk_tcp_client ctx, const char *host, uint16_t port, float time_out_sec){ + assert(ctx); + TcpClient::Ptr *client = (TcpClient::Ptr *)ctx; + (*client)->startConnect(host,port); +} + +API_EXPORT void API_CALL mk_tcp_client_send(mk_tcp_client ctx, const char *data, int len){ + assert(ctx && data); + TcpClient::Ptr *client = (TcpClient::Ptr *)ctx; + (*client)->send(data,len); +} + +API_EXPORT void API_CALL mk_tcp_client_send_safe(mk_tcp_client ctx, const char *data, int len){ + assert(ctx && data); + TcpClient::Ptr *client = (TcpClient::Ptr *)ctx; + weak_ptr weakClient = *client; + Buffer::Ptr buf = (*client)->obtainBuffer(data,len); + (*client)->async([weakClient,buf](){ + auto strongClient = weakClient.lock(); + if(strongClient){ + strongClient->send(buf); + } + }); +} + +API_EXPORT void API_CALL mk_tcp_client_set_user_data(mk_tcp_client ctx,void *user_data){ + assert(ctx); + TcpClientForC::Ptr *client = (TcpClientForC::Ptr *)ctx; + (*client)->_user_data = user_data; +} + +API_EXPORT void* API_CALL mk_tcp_client_get_user_data(mk_tcp_client ctx){ + assert(ctx); + TcpClientForC::Ptr *client = (TcpClientForC::Ptr *)ctx; + return (*client)->_user_data; +} diff --git a/api/source/websocket.cpp b/api/source/websocket.cpp deleted file mode 100644 index 034a5a24..00000000 --- a/api/source/websocket.cpp +++ /dev/null @@ -1,110 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 xiongziliang <771730766@qq.com> - * - * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -#include "websocket.h" -#include "Http/HttpSession.h" -#include "Http/WebSocketSession.h" -using namespace mediakit; - -static TcpServer::Ptr websocket_server[2]; -static mk_websocket_events s_events = {0}; - -class WebSocketSessionImp : public TcpSession { -public: - WebSocketSessionImp(const Socket::Ptr &pSock) : TcpSession(pSock){ - if(s_events.on_mk_websocket_session_create){ - s_events.on_mk_websocket_session_create(this); - } - } - - virtual ~WebSocketSessionImp(){ - if(s_events.on_mk_websocket_session_destory){ - s_events.on_mk_websocket_session_destory(this); - } - } - - void onRecv(const Buffer::Ptr &buffer) override { - if(s_events.on_mk_websocket_session_data){ - s_events.on_mk_websocket_session_data(this,buffer->data(),buffer->size()); - } - } - - void onError(const SockException &err) override{ - if(s_events.on_mk_websocket_session_err){ - s_events.on_mk_websocket_session_err(this,err.getErrCode(),err.what()); - } - } - - void onManager() override{ - if(s_events.on_mk_websocket_session_manager){ - s_events.on_mk_websocket_session_manager(this); - } - } - - void *_user_data; -}; - -API_EXPORT void API_CALL mk_websocket_events_listen(const mk_websocket_events *events){ - if(events){ - memcpy(&s_events,events, sizeof(s_events)); - }else{ - memset(&s_events,0,sizeof(s_events)); - } -} - -API_EXPORT void API_CALL mk_websocket_session_set_user_data(mk_tcp_session session,void *user_data){ - assert(session); - WebSocketSessionImp *obj = (WebSocketSessionImp *)session; - obj->_user_data = user_data; -} - -API_EXPORT void* API_CALL mk_websocket_session_get_user_data(mk_tcp_session session){ - assert(session); - WebSocketSessionImp *obj = (WebSocketSessionImp *)session; - return obj->_user_data; -} - -API_EXPORT uint16_t API_CALL mk_websocket_server_start(uint16_t port, int ssl){ - ssl = MAX(0,MIN(ssl,1)); - try { - websocket_server[ssl] = std::make_shared(); - if(ssl){ - websocket_server[ssl]->start>(port); - }else{ - websocket_server[ssl]->start>(port); - } - return websocket_server[ssl]->getPort(); - } catch (std::exception &ex) { - websocket_server[ssl].reset(); - WarnL << ex.what(); - return 0; - } -} - -void mk_websocket_server_stop(){ - CLEAR_ARR(websocket_server); -} - diff --git a/api/tests/websocket.c b/api/tests/websocket.c index f9eaf7b1..77411d6a 100644 --- a/api/tests/websocket.c +++ b/api/tests/websocket.c @@ -27,6 +27,7 @@ #include #include #include +#include #include "mediakit.h" #ifdef _WIN32 #include "windows.h" @@ -35,42 +36,37 @@ #endif #define LOG_LEV 4 +#define TCP_TYPE mk_type_wss +static int flag = 1; +static void s_on_exit(int sig){ + flag = 0; +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////// typedef struct { mk_tcp_session _session; //下面你可以夹杂你的私货数据 char your_some_useful_data[1024]; -} websocket_user_data; +} tcp_session_user_data; /** - * 当websocket客户端连接服务器时触发 + * 当tcp客户端连接服务器时触发 * @param session 会话处理对象 */ -void API_CALL on_mk_websocket_session_create(mk_tcp_session session){ +void API_CALL on_mk_tcp_session_create(uint16_t server_port,mk_tcp_session session){ log_printf(LOG_LEV,"%s %d",mk_tcp_session_peer_ip(session),(int)mk_tcp_session_peer_port(session)); - websocket_user_data *user_data = malloc(sizeof(websocket_user_data)); + tcp_session_user_data *user_data = malloc(sizeof(tcp_session_user_data)); user_data->_session = session; - mk_websocket_session_set_user_data(session,user_data); + mk_tcp_session_set_user_data(session,user_data); } /** - * session会话对象销毁时触发 - * 请在本回调中清理释放你的用户数据 - * 本事件中不能调用mk_tcp_session_send/mk_tcp_session_send_safe函数 - * @param session 会话处理对象 - */ -void API_CALL on_mk_websocket_session_destory(mk_tcp_session session){ - log_printf(LOG_LEV,"%s %d",mk_tcp_session_peer_ip(session),(int)mk_tcp_session_peer_port(session)); - websocket_user_data *user_data = (websocket_user_data *)mk_websocket_session_get_user_data(session); - free(user_data); -} - -/** - * 收到websocket客户端发过来的数据 + * 收到tcp客户端发过来的数据 * @param session 会话处理对象 * @param data 数据指针 * @param len 数据长度 */ -void API_CALL on_mk_websocket_session_data(mk_tcp_session session,const char *data,int len){ +void API_CALL on_mk_tcp_session_data(uint16_t server_port,mk_tcp_session session,const char *data,int len){ log_printf(LOG_LEV,"from %s %d, data[%d]: %s",mk_tcp_session_peer_ip(session),(int)mk_tcp_session_peer_port(session), len,data); mk_tcp_session_send(session,"echo:",0); mk_tcp_session_send(session,data,len); @@ -80,42 +76,132 @@ void API_CALL on_mk_websocket_session_data(mk_tcp_session session,const char *da * 每隔2秒的定时器,用于管理超时等任务 * @param session 会话处理对象 */ -void API_CALL on_mk_websocket_session_manager(mk_tcp_session session){ +void API_CALL on_mk_tcp_session_manager(uint16_t server_port,mk_tcp_session session){ log_printf(LOG_LEV,"%s %d",mk_tcp_session_peer_ip(session),(int)mk_tcp_session_peer_port(session)); } /** - * on_mk_websocket_session_destory之前触发on_mk_websocket_session_err * 一般由于客户端断开tcp触发 * 本事件中可以调用mk_tcp_session_send_safe函数 * @param session 会话处理对象 * @param code 错误代码 * @param msg 错误提示 */ -void API_CALL on_mk_websocket_session_err(mk_tcp_session session,int code,const char *msg){ - log_printf(LOG_LEV,"%s %d will destory: %d %s",mk_tcp_session_peer_ip(session),(int)mk_tcp_session_peer_port(session),code,msg); +void API_CALL on_mk_tcp_session_disconnect(uint16_t server_port,mk_tcp_session session,int code,const char *msg){ + log_printf(LOG_LEV,"%s %d: %d %s",mk_tcp_session_peer_ip(session),(int)mk_tcp_session_peer_port(session),code,msg); + tcp_session_user_data *user_data = (tcp_session_user_data *)mk_tcp_session_get_user_data(session); + free(user_data); } -static int flag = 1; -static void s_on_exit(int sig){ - flag = 0; +//////////////////////////////////////////////////////////////////////////////////////////////////////////// +typedef struct { + mk_tcp_client client; + //下面你可以夹杂你的私货数据 + char your_some_useful_data[1024]; + int count; +} tcp_client_user_data; + +/** + * tcp客户端连接服务器成功或失败回调 + * @param client tcp客户端 + * @param code 0为连接成功,否则为失败原因 + * @param msg 连接失败错误提示 + */ +void API_CALL on_mk_tcp_client_connect(mk_tcp_client client,int code,const char *msg){ + log_printf(LOG_LEV,"connect result:%d %s",code,msg); + if(code == 0){ + //连接上后我们发送一个hello world测试数据 + mk_tcp_client_send(client,"hello world",0); + }else{ + tcp_client_user_data *user_data = mk_tcp_client_get_user_data(client); + mk_tcp_client_release(client); + free(user_data); + } } + +/** + * tcp客户端与tcp服务器之间断开回调 + * 一般是eof事件导致 + * @param client tcp客户端 + * @param code 错误代码 + * @param msg 错误提示 + */ +void API_CALL on_mk_tcp_client_disconnect(mk_tcp_client client,int code,const char *msg){ + log_printf(LOG_LEV,"disconnect:%d %s",code,msg); + //服务器主动断开了,我们销毁对象 + tcp_client_user_data *user_data = mk_tcp_client_get_user_data(client); + mk_tcp_client_release(client); + free(user_data); +} + +/** + * 收到tcp服务器发来的数据 + * @param client tcp客户端 + * @param data 数据指针 + * @param len 数据长度 + */ +void API_CALL on_mk_tcp_client_data(mk_tcp_client client,const char *data,int len){ + log_printf(LOG_LEV,"data[%d]:%s",len,data); +} + +/** + * 每隔2秒的定时器,用于管理超时等任务 + * @param client tcp客户端 + */ +void API_CALL on_mk_tcp_client_manager(mk_tcp_client client){ + tcp_client_user_data *user_data = mk_tcp_client_get_user_data(client); + char buf[1024]; + sprintf(buf,"on_mk_tcp_client_manager:%d",user_data->count); + mk_tcp_client_send(client,buf,0); + + if(++user_data->count == 5){ + //发送5遍后主动释放对象 + mk_tcp_client_release(client); + free(user_data); + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////// +void test_server(){ + mk_tcp_session_events events_server = { + .on_mk_tcp_session_create = on_mk_tcp_session_create, + .on_mk_tcp_session_data = on_mk_tcp_session_data, + .on_mk_tcp_session_manager = on_mk_tcp_session_manager, + .on_mk_tcp_session_disconnect = on_mk_tcp_session_disconnect + }; + + mk_tcp_server_events_listen(&events_server); + mk_tcp_server_server_start(80,TCP_TYPE); +} + +void test_client(){ + mk_tcp_client_events events_clent = { + .on_mk_tcp_client_connect = on_mk_tcp_client_connect, + .on_mk_tcp_client_data = on_mk_tcp_client_data, + .on_mk_tcp_client_disconnect = on_mk_tcp_client_disconnect, + .on_mk_tcp_client_manager = on_mk_tcp_client_manager, + }; + mk_tcp_client client = mk_tcp_client_create(&events_clent, TCP_TYPE); + + tcp_client_user_data *user_data = (tcp_client_user_data *)malloc(sizeof(tcp_client_user_data)); + user_data->client = client; + user_data->count = 0; + mk_tcp_client_set_user_data(client,user_data); + + mk_tcp_client_connect(client, "121.40.165.18", 8800, 3); + //你可以连接127.0.0.1 80测试 + //mk_tcp_client_connect(client, "127.0.0.1", 80, 3); +} + int main(int argc, char *argv[]) { char ini_path[2048] = {0}; strcpy(ini_path,argv[0]); strcat(ini_path,".ini"); mk_env_init1(0, 0, 1, ini_path, 0, NULL, NULL); - mk_websocket_events events = { - .on_mk_websocket_session_create = on_mk_websocket_session_create, - .on_mk_websocket_session_destory = on_mk_websocket_session_destory, - .on_mk_websocket_session_data = on_mk_websocket_session_data, - .on_mk_websocket_session_manager = on_mk_websocket_session_manager, - .on_mk_websocket_session_err = on_mk_websocket_session_err - }; - mk_websocket_events_listen(&events); - mk_websocket_server_start(80,0); + test_server(); + test_client(); signal(SIGINT, s_on_exit );// 设置退出信号 while (flag) { diff --git a/src/Http/WebSocketClient.h b/src/Http/WebSocketClient.h index 6eeae0ac..cb140757 100644 --- a/src/Http/WebSocketClient.h +++ b/src/Http/WebSocketClient.h @@ -348,21 +348,25 @@ public: /** * 重载startConnect方法, * 目的是替换TcpClient的连接服务器行为,使之先完成WebSocket握手 - * @param strUrl websocket服务器ip或域名 + * @param host websocket服务器ip或域名 * @param iPort websocket服务器端口 * @param fTimeOutSec 超时时间 */ - void startConnect(const string &strUrl, uint16_t iPort, float fTimeOutSec = 3) override { + void startConnect(const string &host, uint16_t iPort, float fTimeOutSec = 3) override { string ws_url; if(useWSS){ //加密的ws - ws_url = StrPrinter << "wss://" + strUrl << ":" << iPort << "/" ; + ws_url = StrPrinter << "wss://" + host << ":" << iPort << "/" ; }else{ //明文ws - ws_url = StrPrinter << "ws://" + strUrl << ":" << iPort << "/" ; + ws_url = StrPrinter << "ws://" + host << ":" << iPort << "/" ; } _wsClient->startWsClient(ws_url,fTimeOutSec); } + + void startWebSocket(const string &ws_url,float fTimeOutSec = 3){ + _wsClient->startWsClient(ws_url,fTimeOutSec); + } private: typename HttpWsClient::Ptr _wsClient; };