diff --git a/3rdpart/ZLToolKit b/3rdpart/ZLToolKit index 6e5f08d8..25062620 160000 --- a/3rdpart/ZLToolKit +++ b/3rdpart/ZLToolKit @@ -1 +1 @@ -Subproject commit 6e5f08d89cd6fd06a4185f5de937ef629c24a83e +Subproject commit 25062620233c62475aaffc0a9960e2689d8418ce diff --git a/src/Http/WebSocketClient.h b/src/Http/WebSocketClient.h index 5152bd57..7fd0b177 100644 --- a/src/Http/WebSocketClient.h +++ b/src/Http/WebSocketClient.h @@ -72,9 +72,10 @@ class HttpWsClient : public HttpClientImp , public WebSocketSplitter{ public: typedef shared_ptr Ptr; - HttpWsClient(ClientTypeImp &delegate) : _delegate(delegate){ + HttpWsClient(const std::shared_ptr > &delegate) : _weak_delegate(delegate), + _delegate(*delegate) { _Sec_WebSocket_Key = encodeBase64(makeRandStr(16, false)); - setPoller(delegate.getPoller()); + setPoller(_delegate.getPoller()); } ~HttpWsClient(){} @@ -153,14 +154,20 @@ protected: //TcpClient override + void onRecv(const Buffer::Ptr &buf) override { + auto strong_ref = _weak_delegate.lock();; + HttpClientImp::onRecv(buf); + } + /** * 定时触发 */ void onManager() override { - if(_onRecv){ + auto strong_ref = _weak_delegate.lock();; + if (_onRecv) { //websocket连接成功了 _delegate.onManager(); - } else{ + } else { //websocket连接中... HttpClientImp::onManager(); } @@ -169,11 +176,12 @@ protected: /** * 数据全部发送完毕后回调 */ - void onFlush() override{ - if(_onRecv){ + void onFlush() override { + auto strong_ref = _weak_delegate.lock();; + if (_onRecv) { //websocket连接成功了 _delegate.onFlush(); - } else{ + } else { //websocket连接中... HttpClientImp::onFlush(); } @@ -182,8 +190,9 @@ protected: /** * tcp连接结果 */ - void onConnect(const SockException &ex) override{ - if(ex){ + void onConnect(const SockException &ex) override { + auto strong_ref = _weak_delegate.lock();; + if (ex) { //tcp连接失败,直接返回失败 onWebSocketException(ex); return; @@ -195,7 +204,8 @@ protected: /** * tcp连接断开 */ - void onErr(const SockException &ex) override{ + void onErr(const SockException &ex) override { + auto strong_ref = _weak_delegate.lock();; //tcp断开或者shutdown导致的断开 onWebSocketException(ex); } @@ -335,6 +345,7 @@ private: private: string _Sec_WebSocket_Key; function _onRecv; + weak_ptr > _weak_delegate; ClientTypeImp &_delegate; string _payload_section; string _payload_cache; @@ -354,7 +365,6 @@ public: template WebSocketClient(ArgsType &&...args) : ClientTypeImp(std::forward(args)...){ - _wsClient.reset(new HttpWsClient(*this)); } ~WebSocketClient() override { _wsClient->closeWsClient(); @@ -377,10 +387,14 @@ public: //明文ws ws_url = StrPrinter << "ws://" + host << ":" << port << "/"; } - _wsClient->startWsClient(ws_url, timeout_sec); + startWebSocket(ws_url, timeout_sec); } void startWebSocket(const string &ws_url,float fTimeOutSec = 3){ + _wsClient = std::make_shared >(static_pointer_cast(this->shared_from_this())); + _wsClient->setOnCreateSocket([this](const EventPoller::Ptr &){ + return this->createSocket(); + }); _wsClient->startWsClient(ws_url,fTimeOutSec); }