/* * Copyright (c) 2016 The ZLToolKit project authors. All Rights Reserved. * * This file is part of ZLToolKit(https://github.com/ZLMediaKit/ZLToolKit). * * Use of this source code is governed by MIT license that can be found in the * LICENSE file in the root of the source tree. All contributing project authors * may be found in the AUTHORS file in the root of the source tree. */ #ifndef NETWORK_SOCKET_H #define NETWORK_SOCKET_H #include #include #include #include #include #include #include "Util/SpeedStatistic.h" #include "sockutil.h" #include "Poller/Timer.h" #include "Poller/EventPoller.h" #include "BufferSock.h" namespace toolkit { #if defined(MSG_NOSIGNAL) #define FLAG_NOSIGNAL MSG_NOSIGNAL #else #define FLAG_NOSIGNAL 0 #endif //MSG_NOSIGNAL #if defined(MSG_MORE) #define FLAG_MORE MSG_MORE #else #define FLAG_MORE 0 #endif //MSG_MORE #if defined(MSG_DONTWAIT) #define FLAG_DONTWAIT MSG_DONTWAIT #else #define FLAG_DONTWAIT 0 #endif //MSG_DONTWAIT //默认的socket flags:不触发SIGPIPE,非阻塞发送 #define SOCKET_DEFAULE_FLAGS (FLAG_NOSIGNAL | FLAG_DONTWAIT ) //发送超时时间,如果在规定时间内一直没有发送数据成功,那么将触发onErr事件 #define SEND_TIME_OUT_SEC 10 //错误类型枚举 typedef enum { Err_success = 0, //成功 success Err_eof, //eof Err_timeout, //超时 socket timeout Err_refused,//连接被拒绝 socket refused Err_reset,//连接被重置 socket reset Err_dns,//dns解析失败 dns resolve failed Err_shutdown,//主动关闭 socket shutdown Err_other = 0xFF,//其他错误 other error } ErrCode; //错误信息类 class SockException : public std::exception { public: SockException(ErrCode code = Err_success, const std::string &msg = "", int custom_code = 0) { _msg = msg; _code = code; _custom_code = custom_code; } //重置错误 void reset(ErrCode code, const std::string &msg, int custom_code = 0) { _msg = msg; _code = code; _custom_code = custom_code; } //错误提示 const char *what() const noexcept override { return _msg.c_str(); } //错误代码 ErrCode getErrCode() const { return _code; } //用户自定义错误代码 int getCustomCode() const { return _custom_code; } //判断是否真的有错 operator bool() const { return _code != Err_success; } private: ErrCode _code; int _custom_code = 0; std::string _msg; }; //std::cout等输出流可以直接输出SockException对象 std::ostream &operator<<(std::ostream &ost, const SockException &err); class SockNum { public: using Ptr = std::shared_ptr; typedef enum { Sock_Invalid = -1, Sock_TCP = 0, Sock_UDP = 1, Sock_TCP_Server = 2 } SockType; SockNum(int fd, SockType type) { _fd = fd; _type = type; } ~SockNum() { #if defined (OS_IPHONE) unsetSocketOfIOS(_fd); #endif //OS_IPHONE // 停止socket收发能力 #if defined(_WIN32) ::shutdown(_fd, SD_BOTH); #else ::shutdown(_fd, SHUT_RDWR); #endif close(_fd); } int rawFd() const { return _fd; } SockType type() { return _type; } void setConnected() { #if defined (OS_IPHONE) setSocketOfIOS(_fd); #endif //OS_IPHONE } #if defined (OS_IPHONE) private: void *readStream=nullptr; void *writeStream=nullptr; bool setSocketOfIOS(int socket); void unsetSocketOfIOS(int socket); #endif //OS_IPHONE private: int _fd; SockType _type; }; //socket 文件描述符的包装 //在析构时自动溢出监听并close套接字 //防止描述符溢出 class SockFD : public noncopyable { public: using Ptr = std::shared_ptr; /** * 创建一个fd对象 * @param num 文件描述符,int数字 * @param poller 事件监听器 */ SockFD(SockNum::Ptr num, const EventPoller::Ptr &poller) { _num = std::move(num); _poller = poller; } /** * 复制一个fd对象 * @param that 源对象 * @param poller 事件监听器 */ SockFD(const SockFD &that, const EventPoller::Ptr &poller) { _num = that._num; _poller = poller; if (_poller == that._poller) { throw std::invalid_argument("Copy a SockFD with same poller"); } } ~SockFD() { delEvent(); } void delEvent() { if (_poller) { auto num = _num; // 移除io事件成功后再close fd _poller->delEvent(num->rawFd(), [num](bool) {}); _poller = nullptr; } } void setConnected() { _num->setConnected(); } int rawFd() const { return _num->rawFd(); } const SockNum::Ptr& sockNum() const { return _num; } SockNum::SockType type() { return _num->type(); } const EventPoller::Ptr& getPoller() const { return _poller; } private: SockNum::Ptr _num; EventPoller::Ptr _poller; }; template class MutexWrapper { public: MutexWrapper(bool enable) { _enable = enable; } ~MutexWrapper() = default; inline void lock() { if (_enable) { _mtx.lock(); } } inline void unlock() { if (_enable) { _mtx.unlock(); } } private: bool _enable; Mtx _mtx; }; class SockInfo { public: SockInfo() = default; virtual ~SockInfo() = default; //获取本机ip virtual std::string get_local_ip() = 0; //获取本机端口号 virtual uint16_t get_local_port() = 0; //获取对方ip virtual std::string get_peer_ip() = 0; //获取对方端口号 virtual uint16_t get_peer_port() = 0; //获取标识符 virtual std::string getIdentifier() const { return ""; } }; #define TraceP(ptr) TraceL << ptr->getIdentifier() << "(" << ptr->get_peer_ip() << ":" << ptr->get_peer_port() << ") " #define DebugP(ptr) DebugL << ptr->getIdentifier() << "(" << ptr->get_peer_ip() << ":" << ptr->get_peer_port() << ") " #define InfoP(ptr) InfoL << ptr->getIdentifier() << "(" << ptr->get_peer_ip() << ":" << ptr->get_peer_port() << ") " #define WarnP(ptr) WarnL << ptr->getIdentifier() << "(" << ptr->get_peer_ip() << ":" << ptr->get_peer_port() << ") " #define ErrorP(ptr) ErrorL << ptr->getIdentifier() << "(" << ptr->get_peer_ip() << ":" << ptr->get_peer_port() << ") " //异步IO Socket对象,包括tcp客户端、服务器和udp套接字 class Socket : public std::enable_shared_from_this, public noncopyable, public SockInfo { public: using Ptr = std::shared_ptr; //接收数据回调 using onReadCB = std::function; using onMultiReadCB = std::function; //发生错误回调 using onErrCB = std::function; //tcp监听接收到连接请求 using onAcceptCB = std::function &complete)>; //socket发送缓存清空事件,返回true代表下次继续监听该事件,否则停止 using onFlush = std::function; //在接收到连接请求前,拦截Socket默认生成方式 using onCreateSocket = std::function; //发送buffer成功与否回调 using onSendResult = BufferList::SendResult; /** * 构造socket对象,尚未有实质操作 * @param poller 绑定的poller线程 * @param enable_mutex 是否启用互斥锁(接口是否线程安全) */ static Ptr createSocket(const EventPoller::Ptr &poller = nullptr, bool enable_mutex = true); ~Socket() override; /** * 创建tcp客户端并异步连接服务器 * @param url 目标服务器ip或域名 * @param port 目标服务器端口 * @param con_cb 结果回调 * @param timeout_sec 超时时间 * @param local_ip 绑定本地网卡ip * @param local_port 绑定本地网卡端口号 */ void connect(const std::string &url, uint16_t port, const onErrCB &con_cb, float timeout_sec = 5, const std::string &local_ip = "::", uint16_t local_port = 0); /** * 创建tcp监听服务器 * @param port 监听端口,0则随机 * @param local_ip 监听的网卡ip * @param backlog tcp最大积压数 * @return 是否成功 */ bool listen(uint16_t port, const std::string &local_ip = "::", int backlog = 1024); /** * 创建udp套接字,udp是无连接的,所以可以作为服务器和客户端 * @param port 绑定的端口为0则随机 * @param local_ip 绑定的网卡ip * @return 是否成功 */ bool bindUdpSock(uint16_t port, const std::string &local_ip = "::", bool enable_reuse = true); /** * 包装外部fd,本对象负责close fd * 内部会设置fd为NoBlocked,NoSigpipe,CloExec * 其他设置需要自行使用SockUtil进行设置 */ bool fromSock(int fd, SockNum::SockType type); /** * 从另外一个Socket克隆 * 目的是一个socket可以被多个poller对象监听,提高性能或实现Socket归属线程的迁移 * @param other 原始的socket对象 * @return 是否成功 */ bool cloneSocket(const Socket &other); ////////////设置事件回调//////////// /** * 设置数据接收回调,tcp或udp客户端有效 * @param cb 回调对象 */ void setOnRead(onReadCB cb); void setOnMultiRead(onMultiReadCB cb); /** * 设置异常事件(包括eof等)回调 * @param cb 回调对象 */ void setOnErr(onErrCB cb); /** * 设置tcp监听接收到连接回调 * @param cb 回调对象 */ void setOnAccept(onAcceptCB cb); /** * 设置socket写缓存清空事件回调 * 通过该回调可以实现发送流控 * @param cb 回调对象 */ void setOnFlush(onFlush cb); /** * 设置accept时,socket构造事件回调 * @param cb 回调 */ void setOnBeforeAccept(onCreateSocket cb); /** * 设置发送buffer结果回调 * @param cb 回调 */ void setOnSendResult(onSendResult cb); ////////////发送数据相关接口//////////// /** * 发送数据指针 * @param buf 数据指针 * @param size 数据长度 * @param addr 目标地址 * @param addr_len 目标地址长度 * @param try_flush 是否尝试写socket * @return -1代表失败(socket无效),0代表数据长度为0,否则返回数据长度 */ ssize_t send(const char *buf, size_t size = 0, struct sockaddr *addr = nullptr, socklen_t addr_len = 0, bool try_flush = true); /** * 发送string */ ssize_t send(std::string buf, struct sockaddr *addr = nullptr, socklen_t addr_len = 0, bool try_flush = true); /** * 发送Buffer对象,Socket对象发送数据的统一出口 * socket对象发送数据的统一出口 */ ssize_t send(Buffer::Ptr buf, struct sockaddr *addr = nullptr, socklen_t addr_len = 0, bool try_flush = true); /** * 尝试将所有数据写socket * @return -1代表失败(socket无效或者发送超时),0代表成功? */ int flushAll(); /** * 关闭socket且触发onErr回调,onErr回调将在poller线程中进行 * @param err 错误原因 * @return 是否成功触发onErr回调 */ bool emitErr(const SockException &err) noexcept; /** * 关闭或开启数据接收 * @param enabled 是否开启 */ void enableRecv(bool enabled); /** * 获取裸文件描述符,请勿进行close操作(因为Socket对象会管理其生命周期) * @return 文件描述符 */ int rawFD() const; /** * tcp客户端是否处于连接状态 * 支持Sock_TCP类型socket */ bool alive() const; /** * 返回socket类型 */ SockNum::SockType sockType() const; /** * 设置发送超时主动断开时间;默认10秒 * @param second 发送超时数据,单位秒 */ void setSendTimeOutSecond(uint32_t second); /** * 套接字是否忙,如果套接字写缓存已满则返回true * @return 套接字是否忙 */ bool isSocketBusy() const; /** * 获取poller线程对象 * @return poller线程对象 */ const EventPoller::Ptr &getPoller() const; /** * 绑定udp 目标地址,后续发送时就不用再单独指定了 * @param dst_addr 目标地址 * @param addr_len 目标地址长度 * @param soft_bind 是否软绑定,软绑定时不调用udp connect接口,只保存目标地址信息,发送时再传递到sendto函数 * @return 是否成功 */ bool bindPeerAddr(const struct sockaddr *dst_addr, socklen_t addr_len = 0, bool soft_bind = false); /** * 设置发送flags * @param flags 发送的flag */ void setSendFlags(int flags = SOCKET_DEFAULE_FLAGS); /** * 关闭套接字 * @param close_fd 是否关闭fd还是只移除io事件监听 */ void closeSock(bool close_fd = true); /** * 获取发送缓存包个数(不是字节数) */ size_t getSendBufferCount(); /** * 获取上次socket发送缓存清空至今的毫秒数,单位毫秒 */ uint64_t elapsedTimeAfterFlushed(); /** * 获取接收速率,单位bytes/s */ int getRecvSpeed(); /** * 获取发送速率,单位bytes/s */ int getSendSpeed(); ////////////SockInfo override//////////// std::string get_local_ip() override; uint16_t get_local_port() override; std::string get_peer_ip() override; uint16_t get_peer_port() override; std::string getIdentifier() const override; private: Socket(EventPoller::Ptr poller, bool enable_mutex = true); void setSock(SockNum::Ptr sock); int onAccept(const SockNum::Ptr &sock, int event) noexcept; ssize_t onRead(const SockNum::Ptr &sock, const SocketRecvBuffer::Ptr &buffer) noexcept; void onWriteAble(const SockNum::Ptr &sock); void onConnected(const SockNum::Ptr &sock, const onErrCB &cb); void onFlushed(); void startWriteAbleEvent(const SockNum::Ptr &sock); void stopWriteAbleEvent(const SockNum::Ptr &sock); bool flushData(const SockNum::Ptr &sock, bool poller_thread); bool attachEvent(const SockNum::Ptr &sock); ssize_t send_l(Buffer::Ptr buf, bool is_buf_sock, bool try_flush = true); void connect_l(const std::string &url, uint16_t port, const onErrCB &con_cb_in, float timeout_sec, const std::string &local_ip, uint16_t local_port); bool fromSock_l(SockNum::Ptr sock); private: // send socket时的flag int _sock_flags = SOCKET_DEFAULE_FLAGS; // 最大发送缓存,单位毫秒,距上次发送缓存清空时间不能超过该参数 uint32_t _max_send_buffer_ms = SEND_TIME_OUT_SEC * 1000; // 控制是否接收监听socket可读事件,关闭后可用于流量控制 std::atomic _enable_recv { true }; // 标记该socket是否可写,socket写缓存满了就不可写 std::atomic _sendable { true }; // 是否已经触发err回调了 bool _err_emit = false; // 是否启用网速统计 bool _enable_speed = false; // udp发送目标地址 std::shared_ptr _udp_send_dst; // 接收速率统计 BytesSpeed _recv_speed; // 发送速率统计 BytesSpeed _send_speed; // tcp连接超时定时器 Timer::Ptr _con_timer; // tcp连接结果回调对象 std::shared_ptr _async_con_cb; // 记录上次发送缓存(包括socket写缓存、应用层缓存)清空的计时器 Ticker _send_flush_ticker; // socket fd的抽象类 SockFD::Ptr _sock_fd; // 本socket绑定的poller线程,事件触发于此线程 EventPoller::Ptr _poller; // 跨线程访问_sock_fd时需要上锁 mutable MutexWrapper _mtx_sock_fd; // socket异常事件(比如说断开) onErrCB _on_err; // 收到数据事件 onMultiReadCB _on_multi_read; // socket缓存清空事件(可用于发送流速控制) onFlush _on_flush; // tcp监听收到accept请求事件 onAcceptCB _on_accept; // tcp监听收到accept请求,自定义创建peer Socket事件(可以控制子Socket绑定到其他poller线程) onCreateSocket _on_before_accept; // 设置上述回调函数的锁 MutexWrapper _mtx_event; // 一级发送缓存, socket可写时,会把一级缓存批量送入到二级缓存 List> _send_buf_waiting; // 一级发送缓存锁 MutexWrapper _mtx_send_buf_waiting; // 二级发送缓存, socket可写时,会把二级缓存批量写入到socket List _send_buf_sending; // 二级发送缓存锁 MutexWrapper _mtx_send_buf_sending; // 发送buffer结果回调 BufferList::SendResult _send_result; // 对象个数统计 ObjectStatistic _statistic; // 链接缓存地址,防止tcp reset 导致无法获取对端的地址 struct sockaddr_storage _local_addr; struct sockaddr_storage _peer_addr; }; class SockSender { public: SockSender() = default; virtual ~SockSender() = default; virtual ssize_t send(Buffer::Ptr buf) = 0; virtual void shutdown(const SockException &ex = SockException(Err_shutdown, "self shutdown")) = 0; //发送char * SockSender &operator << (const char *buf); //发送字符串 SockSender &operator << (std::string buf); //发送Buffer对象 SockSender &operator << (Buffer::Ptr buf); //发送其他类型是数据 template SockSender &operator << (T &&buf) { std::ostringstream ss; ss << std::forward(buf); send(ss.str()); return *this; } ssize_t send(std::string buf); ssize_t send(const char *buf, size_t size = 0); }; //Socket对象的包装类 class SocketHelper : public SockSender, public SockInfo, public TaskExecutorInterface, public std::enable_shared_from_this { public: using Ptr = std::shared_ptr; SocketHelper(const Socket::Ptr &sock); ~SocketHelper() override = default; ///////////////////// Socket util std::functions ///////////////////// /** * 获取poller线程 */ const EventPoller::Ptr& getPoller() const; /** * 设置批量发送标记,用于提升性能 * @param try_flush 批量发送标记 */ void setSendFlushFlag(bool try_flush); /** * 设置socket发送flags * @param flags socket发送flags */ void setSendFlags(int flags); /** * 套接字是否忙,如果套接字写缓存已满则返回true */ bool isSocketBusy() const; /** * 设置Socket创建器,自定义Socket创建方式 * @param cb 创建器 */ void setOnCreateSocket(Socket::onCreateSocket cb); /** * 创建socket对象 */ Socket::Ptr createSocket(); /** * 获取socket对象 */ const Socket::Ptr &getSock() const; /** * 尝试将所有数据写socket * @return -1代表失败(socket无效或者发送超时),0代表成功? */ int flushAll(); /** * 是否ssl加密 */ virtual bool overSsl() const { return false; } ///////////////////// SockInfo override ///////////////////// std::string get_local_ip() override; uint16_t get_local_port() override; std::string get_peer_ip() override; uint16_t get_peer_port() override; ///////////////////// TaskExecutorInterface override ///////////////////// /** * 任务切换到所属poller线程执行 * @param task 任务 * @param may_sync 是否运行同步执行任务 */ Task::Ptr async(TaskIn task, bool may_sync = true) override; Task::Ptr async_first(TaskIn task, bool may_sync = true) override; ///////////////////// SockSender override ///////////////////// /** * 使能 SockSender 其他未被重写的send重载函数 */ using SockSender::send; /** * 统一发送数据的出口 */ ssize_t send(Buffer::Ptr buf) override; /** * 触发onErr事件 */ void shutdown(const SockException &ex = SockException(Err_shutdown, "self shutdown")) override; /** * 线程安全的脱离 Server 并触发 onError 事件 * @param ex 触发 onError 事件的原因 */ void safeShutdown(const SockException &ex = SockException(Err_shutdown, "self shutdown")); ///////////////////// event functions ///////////////////// /** * 接收数据入口 * @param buf 数据,可以重复使用内存区,不可被缓存使用 */ virtual void onRecv(const Buffer::Ptr &buf) = 0; /** * 收到 eof 或其他导致脱离 Server 事件的回调 * 收到该事件时, 该对象一般将立即被销毁 * @param err 原因 */ virtual void onError(const SockException &err) = 0; /** * 数据全部发送完毕后回调 */ virtual void onFlush() {} /** * 每隔一段时间触发, 用来做超时管理 */ virtual void onManager() = 0; protected: void setPoller(const EventPoller::Ptr &poller); void setSock(const Socket::Ptr &sock); private: bool _try_flush = true; Socket::Ptr _sock; EventPoller::Ptr _poller; Socket::onCreateSocket _on_create_socket; }; } // namespace toolkit #endif /* NETWORK_SOCKET_H */