diff --git a/3rdpart/ZLToolKit b/3rdpart/ZLToolKit index 0b406073..91246bb0 160000 --- a/3rdpart/ZLToolKit +++ b/3rdpart/ZLToolKit @@ -1 +1 @@ -Subproject commit 0b406073125080ab8edd13ee7c14e573e54baa35 +Subproject commit 91246bb01475c7336040a4b7ec35d0584887f365 diff --git a/src/Http/WebSocketClient.cpp b/src/Http/WebSocketClient.cpp deleted file mode 100644 index 1ec0f7a5..00000000 --- a/src/Http/WebSocketClient.cpp +++ /dev/null @@ -1,288 +0,0 @@ -#include "WebSocketClient.h" - - - -int mediakit::WebSocketClient::send(const string& buf) -{ - if (_sock) - { - if (_WSClientStatus == WORKING) - { - _session->send(buf); - return 0; - } - else - { - return -1; - } - } -} - -void mediakit::WebSocketClient::clear() -{ - _method.clear(); - _path.clear(); - _parser.Clear(); - _recvedBodySize = 0; - _totalBodySize = 0; - _aliveTicker.resetTime(); - _chunkedSplitter.reset(); - HttpRequestSplitter::reset(); -} - -const std::string & mediakit::WebSocketClient::responseStatus() const -{ - return _parser.Url(); -} - -const mediakit::WebSocketClient::HttpHeader & mediakit::WebSocketClient::responseHeader() const -{ - return _parser.getValues(); -} - -const mediakit::Parser& mediakit::WebSocketClient::response() const -{ - return _parser; -} - -const std::string & mediakit::WebSocketClient::getUrl() const -{ - return _url; -} - -int64_t mediakit::WebSocketClient::onResponseHeader(const string &status, const HttpHeader &headers) -{ - DebugL << status; - //无Content-Length字段时默认后面全是content - return -1; -} - -void mediakit::WebSocketClient::onResponseBody(const char *buf, int64_t size, int64_t recvedSize, int64_t totalSize) -{ - DebugL << size << " " << recvedSize << " " << totalSize; -} - -void mediakit::WebSocketClient::onResponseCompleted() -{ - DebugL; -} - -int64_t mediakit::WebSocketClient::onRecvHeader(const char *data, uint64_t len) -{ - _parser.Parse(data); - if (_parser.Url() == "101") - { - switch (_WSClientStatus) - { - case HANDSHAKING: - { - StrCaseMap& valueMap = _parser.getValues(); - auto key = valueMap.find("Sec-WebSocket-Accept"); - if (key != valueMap.end() && key->second.length() > 0) { - onConnect(SockException()); - } - break; - } - } - return -1; - } - else - { - shutdown(SockException(Err_shutdown, _parser.Url().c_str())); - return 0; - } - return -1; -} - -void mediakit::WebSocketClient::onRecvContent(const char *data, uint64_t len) -{ - if (_chunkedSplitter) { - _chunkedSplitter->input(data, len); - return; - } - auto recvedBodySize = _recvedBodySize + len; - if (_totalBodySize < 0) { - //不限长度的content,最大支持INT64_MAX个字节 - onResponseBody(data, len, recvedBodySize, INT64_MAX); - _recvedBodySize = recvedBodySize; - return; - } - - //固定长度的content - if (recvedBodySize < _totalBodySize) { - //content还未接收完毕 - onResponseBody(data, len, recvedBodySize, _totalBodySize); - _recvedBodySize = recvedBodySize; - return; - } - - //content接收完毕 - onResponseBody(data, _totalBodySize - _recvedBodySize, _totalBodySize, _totalBodySize); - bool biggerThanExpected = recvedBodySize > _totalBodySize; - onResponseCompleted_l(); - if (biggerThanExpected) { - //声明的content数据比真实的小,那么我们只截取前面部分的并断开链接 - shutdown(SockException(Err_shutdown, "http response content size bigger than expected")); - } -} - -void mediakit::WebSocketClient::onConnect(const SockException &ex) -{ - _aliveTicker.resetTime(); - if (ex) { - onDisconnect(ex); - return; - } - - //先假设http客户端只会接收一点点数据(只接受http头,节省内存) - _sock->setReadBuffer(std::make_shared(1 * 1024)); - - _totalBodySize = 0; - _recvedBodySize = 0; - HttpRequestSplitter::reset(); - _chunkedSplitter.reset(); - - if (_WSClientStatus == WSCONNECT) - { - //Websocket握手 - string random = get_random(16); - auto Sec_WebSocket_Key = encodeBase64(SHA1::encode_bin(random)); - _key = Sec_WebSocket_Key; - string p = generate_websocket_client_handshake(_ip.c_str(), _port, _url.c_str(), _key.c_str()); - TcpClient::send(p); - _WSClientStatus = HANDSHAKING; - } - else if (_WSClientStatus == HANDSHAKING) - { - _WSClientStatus = WORKING; - } - - onFlush(); -} - -void mediakit::WebSocketClient::onRecv(const Buffer::Ptr &pBuf) -{ - _aliveTicker.resetTime(); - if (_WSClientStatus == HANDSHAKING || _WSClientStatus == WSCONNECT) - HttpRequestSplitter::input(pBuf->data(), pBuf->size()); - else if (_WSClientStatus == WORKING) - { - WebSocketSplitter::decode((uint8_t *)pBuf->data(), pBuf->size()); - } -} - -void mediakit::WebSocketClient::onErr(const SockException &ex) -{ - _session->onError(ex); - onDisconnect(ex); -} - -void mediakit::WebSocketClient::onManager() -{ - if (_WSClientStatus != WORKING) - { - if (_fTimeOutSec > 0 && _aliveTicker.elapsedTime() > _fTimeOutSec * 1000) { - //超时 - shutdown(SockException(Err_timeout, "ws server respone timeout")); - } - } - else - _session->onManager(); -} - -std::string mediakit::WebSocketClient::generate_websocket_client_handshake(const char* ip, uint16_t port, const char * path, const char * key) -{ - /** -* @brief 业务数据被分片的单片最大大小, 等于 65535 - 14 - 1 -*/ -#define DATA_FRAME_MAX_LEN 65520 -#define HANDSHAKE_SIZE 1024 - char buf[HANDSHAKE_SIZE] = { 0 }; - - snprintf(buf, HANDSHAKE_SIZE, - "GET %s HTTP/1.1\r\n" - "Host: %s:%d\r\n" - "Upgrade: websocket\r\n" - "Connection: Upgrade\r\n" - "Sec-WebSocket-Key: %s\r\n" - "Sec-WebSocket-Version: 13\r\n" - "\r\n", - path, ip, port, key); - string temBuf(buf); - return temBuf; -} - -std::string mediakit::WebSocketClient::get_random(size_t n) -{ - random_device rd; - _StrPrinter printer; - for (int i = 0; i < n; i++) - { - unsigned int rnd = rd(); - printer << rnd % 9; - } - - return string(printer); -} - -void mediakit::WebSocketClient::onWebSocketDecodeHeader(const WebSocketHeader &packet) -{ - //新包,原来的包残余数据清空掉 - _remian_data.clear(); - - if (_firstPacket) { - //这是个WebSocket会话而不是普通的Http会话 - _firstPacket = false; - //此处截取数据并进行websocket协议打包 - } -} - -void mediakit::WebSocketClient::onWebSocketDecodePlayload(const WebSocketHeader &packet, const uint8_t *ptr, uint64_t len, uint64_t recved) -{ - _remian_data.append((char *)ptr, len); -} - -void mediakit::WebSocketClient::onWebSocketDecodeComplete(const WebSocketHeader &header_in) -{ - WebSocketHeader& header = const_cast(header_in); - auto flag = header._mask_flag; - header._mask_flag = false; - - switch (header._opcode) { - case WebSocketHeader::CLOSE: { - shutdown(SockException(Err_timeout, "session timeouted")); - } - break; - case WebSocketHeader::PING: { - const_cast(header)._opcode = WebSocketHeader::PONG; - WebSocketSplitter::encode(header, (uint8_t *)_remian_data.data(), _remian_data.size()); - } - break; - case WebSocketHeader::CONTINUATION: { - - } - break; - case WebSocketHeader::TEXT: - case WebSocketHeader::BINARY: { - BufferString::Ptr buffer = std::make_shared(_remian_data); - _session->onRecv(buffer); - } - break; - default: - break; - } - _remian_data.clear(); - header._mask_flag = flag; -} - -void mediakit::WebSocketClient::onWebSocketEncodeData(const uint8_t *ptr, uint64_t len) -{ - TcpClient::send(string((char*)ptr, len)); -} - -void mediakit::WebSocketClient::onResponseCompleted_l() -{ - _totalBodySize = 0; - _recvedBodySize = 0; - onResponseCompleted(); -} diff --git a/src/Http/WebSocketClient.h b/src/Http/WebSocketClient.h index 51a00874..5b7555e5 100644 --- a/src/Http/WebSocketClient.h +++ b/src/Http/WebSocketClient.h @@ -1,205 +1,326 @@ -锘#ifndef Http_WebSocketClient_h -#define Http_WebSocketClient_h +/* + * MIT License + * + * Copyright (c) 2016-2019 xiongziliang <771730766@qq.com> + * + * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +#ifndef ZLMEDIAKIT_WebSocketClient_H +#define ZLMEDIAKIT_WebSocketClient_H -#include -#include -#include -#include #include "Util/util.h" -#include "Util/mini.h" -#include "Network/TcpClient.h" -#include "Common/Parser.h" -#include "HttpRequestSplitter.h" -#include "HttpCookie.h" -#include "HttpChunkedSplitter.h" -#include "strCoding.h" -#include "Http/HttpClient.h" -#include "Http/WebSocketSplitter.h" -#include "Http/WebSocketSession.h" -#include -#include -#include "Common/config.h" -#include "Util/SHA1.h" #include "Util/base64.h" - - - -using namespace std; +#include "Util/SHA1.h" +#include "Network/TcpClient.h" +#include "HttpClientImp.h" +#include "WebSocketSplitter.h" using namespace toolkit; -namespace mediakit { +namespace mediakit{ -/** -* @brief 瀹㈡埛绔殑鐘舵 -*/ -typedef enum WSClientStatus { - WSCONNECT, - HANDSHAKING, ///鎻℃墜涓 - WORKING, ///宸ヤ綔涓 -} WSClientStatus; +template +class HttpWsClient; -class WebSocketClient : public TcpClient , public HttpRequestSplitter, public WebSocketSplitter -{ +template +class ClientTypeImp : public ClientType { public: - typedef StrCaseMap HttpHeader; - typedef std::shared_ptr Ptr; - WebSocketClient() :_WSClientStatus(WSCONNECT) {} - virtual ~WebSocketClient() {} - - template - void startConnect(const string &strUrl, uint16_t iPort, float fTimeOutSec = 3) - { - _ip = strUrl; - _port = iPort; - TcpClient::startConnect(strUrl, iPort, fTimeOutSec); - - typedef function onBeforeSendCB; - /** - * 璇ョ被瀹炵幇浜員cpSession娲剧敓绫诲彂閫佹暟鎹殑鎴彇 - * 鐩殑鏄彂閫佷笟鍔℃暟鎹墠杩涜websocket鍗忚鐨勬墦鍖 - */ - class SessionImp : public SessionType { - public: - SessionImp(const Socket::Ptr &pSock) :SessionType(pSock) {} - - ~SessionImp() {} - - /** - * 璁剧疆鍙戦佹暟鎹埅鍙栧洖璋冨嚱鏁 - * @param cb 鎴彇鍥炶皟鍑芥暟 - */ - void setOnBeforeSendCB(const onBeforeSendCB &cb) { - _beforeSendCB = cb; - } - protected: - /** - * 閲嶈浇send鍑芥暟鎴彇鏁版嵁 - * @param buf 闇瑕佹埅鍙栫殑鏁版嵁 - * @return 鏁版嵁瀛楄妭鏁 - */ - int send(const Buffer::Ptr &buf) override { - if (_beforeSendCB) { - return _beforeSendCB(buf); - } - return SessionType::send(buf); - } - private: - onBeforeSendCB _beforeSendCB; - }; - std::shared_ptr temSession = std::make_shared(_sock); - //姝ゅ鎴彇鏁版嵁骞惰繘琛寃ebsocket鍗忚鎵撳寘 - weak_ptr weakSelf = dynamic_pointer_cast(WebSocketClient::shared_from_this()); - - _sock->setOnErr([weakSelf](const SockException &ex) { - auto strongSelf = weakSelf.lock(); - if (!strongSelf) { - return; - } - strongSelf->onErr(ex); - }); - - temSession->setOnBeforeSendCB([weakSelf](const Buffer::Ptr &buf) { - auto strongSelf = weakSelf.lock(); - if (strongSelf) { - WebSocketHeader header; - header._fin = true; - header._reserved = 0; - header._opcode = WebSocketHeader::BINARY; - header._mask_flag = false; - strongSelf->WebSocketSplitter::encode(header, (uint8_t *)buf->data(), buf->size()); - } - return buf->size(); - }); - _session = temSession; - _session->onManager(); - } - - virtual int send(const string& buf); - - virtual void clear(); - - const string &responseStatus() const; - - const HttpHeader &responseHeader() const; - - const Parser& response() const; - - const string &getUrl() const; + typedef function onBeforeSendCB; + friend class HttpWsClient; + template + ClientTypeImp(ArgsType &&...args): ClientType(std::forward(args)...){} + ~ClientTypeImp() override {}; protected: - virtual int64_t onResponseHeader(const string &status,const HttpHeader &headers);; + /** + * 鍙戦佸墠鎷︽埅骞舵墦鍖呬负websocket鍗忚 + * @param buf + * @return + */ + int send(const Buffer::Ptr &buf) override{ + if(_beforeSendCB){ + return _beforeSendCB(buf); + } + return ClientType::send(buf); + } + /** + * 璁剧疆鍙戦佹暟鎹埅鍙栧洖璋冨嚱鏁 + * @param cb 鎴彇鍥炶皟鍑芥暟 + */ + void setOnBeforeSendCB(const onBeforeSendCB &cb){ + _beforeSendCB = cb; + } +private: + onBeforeSendCB _beforeSendCB; +}; - virtual void onResponseBody(const char *buf,int64_t size,int64_t recvedSize,int64_t totalSize);; +template +class HttpWsClient : public HttpClientImp , public WebSocketSplitter{ +public: + typedef shared_ptr Ptr; + + HttpWsClient(ClientTypeImp &delegate) : _delegate(delegate){ + _Sec_WebSocket_Key = encodeBase64(SHA1::encode_bin(makeRandStr(16, false))); + } + ~HttpWsClient(){} + + void startWsClient(const string &ws_url,float fTimeOutSec){ + string http_url = ws_url; + replace(http_url,"ws://","http://"); + replace(http_url,"wss://","https://"); + setMethod("GET"); + addHeader("Upgrade","websocket"); + addHeader("Connection","Upgrade"); + addHeader("Sec-WebSocket-Version","13"); + addHeader("Sec-WebSocket-Key",_Sec_WebSocket_Key); + _onRecv = nullptr; + sendRequest(http_url,fTimeOutSec); + } +protected: + //HttpClientImp override + + /** + * 鏀跺埌http鍥炲澶 + * @param status 鐘舵佺爜锛岃濡:200 OK + * @param headers http澶 + * @return 杩斿洖鍚庣画content鐨勯暱搴︼紱-1:鍚庣画鏁版嵁鍏ㄦ槸content锛>=0:鍥哄畾闀垮害content + * 闇瑕佹寚鍑虹殑鏄紝鍦╤ttp澶翠腑甯︽湁Content-Length瀛楁鏃讹紝璇ヨ繑鍥炲兼棤鏁 + */ + int64_t onResponseHeader(const string &status,const HttpHeader &headers) override { + if(status == "101"){ + auto Sec_WebSocket_Accept = encodeBase64(SHA1::encode_bin(_Sec_WebSocket_Key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11")); + if(Sec_WebSocket_Accept == const_cast(headers)["Sec-WebSocket-Accept"]){ + //success + onWebSocketException(SockException()); + return 0; + } + shutdown(SockException(Err_shutdown,StrPrinter << "Sec-WebSocket-Accept mismatch")); + return 0; + } + + shutdown(SockException(Err_shutdown,StrPrinter << "bad http status code:" << status)); + return 0; + }; /** * 鎺ユ敹http鍥炲瀹屾瘯, */ - virtual void onResponseCompleted(); + void onResponseCompleted() override {} + + //TcpClient override + + void onManager() override { + if(_onRecv){ + //websocket杩炴帴鎴愬姛浜 + _delegate.onManager(); + } else{ + //websocket杩炴帴涓... + HttpClientImp::onManager(); + } + } + //鏁版嵁鍏ㄩ儴鍙戦佸畬姣曞悗鍥炶皟 + void onFlush() override{ + if(_onRecv){ + //websocket杩炴帴鎴愬姛浜 + _delegate.onFlush(); + } else{ + //websocket杩炴帴涓... + HttpClientImp::onFlush(); + } + } /** - * http閾炬帴鏂紑鍥炶皟 - * @param ex 鏂紑鍘熷洜 + * tcp杩炴帴缁撴灉 + * @param ex */ - virtual void onDisconnect(const SockException &ex){} + void onConnect(const SockException &ex) override{ + if(ex){ + //tcp杩炴帴澶辫触锛岀洿鎺ヨ繑鍥炲け璐 + onWebSocketException(ex); + return; + } + //寮濮媤ebsocket鎻℃墜 + HttpClientImp::onConnect(ex); + } - //HttpRequestSplitter override - int64_t onRecvHeader(const char *data, uint64_t len) override; + /** + * tcp鏀跺埌鏁版嵁 + * @param pBuf + */ + void onRecv(const Buffer::Ptr &pBuf) override{ + if(_onRecv){ + //瀹屾垚websocket鎻℃墜鍚庯紝鎷︽埅websocket鏁版嵁 + _onRecv(pBuf); + }else{ + //websocket鎻℃墜鏁版嵁 + HttpClientImp::onRecv(pBuf); + } + } - void onRecvContent(const char *data, uint64_t len) override; + //tcp杩炴帴鏂紑 + void onErr(const SockException &ex) override{ + //tcp鏂紑鎴栬卻hutdown瀵艰嚧鐨勬柇寮 + onWebSocketException(ex); + } -protected: - virtual void onConnect(const SockException &ex) override; + //WebSocketSplitter override + /** + * 鏀跺埌涓涓獁ebSocket鏁版嵁鍖呭寘澶达紝鍚庣画灏嗙户缁Е鍙憃nWebSocketDecodePlayload鍥炶皟 + * @param header 鏁版嵁鍖呭ご + */ + void onWebSocketDecodeHeader(const WebSocketHeader &header) override{ + _payload.clear(); + } - virtual void onRecv(const Buffer::Ptr &pBuf) override; + /** + * 鏀跺埌webSocket鏁版嵁鍖呰礋杞 + * @param header 鏁版嵁鍖呭寘澶 + * @param ptr 璐熻浇鏁版嵁鎸囬拡 + * @param len 璐熻浇鏁版嵁闀垮害 + * @param recved 宸叉帴鏀舵暟鎹暱搴(鍖呭惈鏈鏁版嵁闀垮害)锛岀瓑浜巋eader._playload_len鏃跺垯鎺ュ彈瀹屾瘯 + */ + void onWebSocketDecodePlayload(const WebSocketHeader &header, const uint8_t *ptr, uint64_t len, uint64_t recved) override{ + _payload.append((char *)ptr,len); + } - virtual void onErr(const SockException &ex) override; - virtual void onFlush() override {}; + /** + * 鎺ユ敹鍒板畬鏁寸殑涓涓獁ebSocket鏁版嵁鍖呭悗鍥炶皟 + * @param header 鏁版嵁鍖呭寘澶 + */ + void onWebSocketDecodeComplete(const WebSocketHeader &header_in) override{ + WebSocketHeader& header = const_cast(header_in); + auto flag = header._mask_flag; + //websocket瀹㈡埛绔彂閫佹暟鎹渶瑕佸姞瀵 + header._mask_flag = true; - virtual void onManager() override; + switch (header._opcode){ + case WebSocketHeader::CLOSE:{ + //鏈嶅姟鍣ㄤ富鍔ㄥ叧闂 + WebSocketSplitter::encode(header,nullptr); + shutdown(SockException(Err_eof,"websocket server close the connection")); + } + break; + case WebSocketHeader::PING:{ + //蹇冭烦鍖 + header._opcode = WebSocketHeader::PONG; + WebSocketSplitter::encode(header,std::make_shared(std::move(_payload))); + } + break; + case WebSocketHeader::CONTINUATION:{ -protected: - string generate_websocket_client_handshake(const char* ip, uint16_t port, const char * path, const char * key); + } + break; + case WebSocketHeader::TEXT: + case WebSocketHeader::BINARY:{ + //鎺ユ敹瀹屾瘯websocket鏁版嵁鍖咃紝瑙﹀彂onRecv浜嬩欢 + _delegate.onRecv(std::make_shared(std::move(_payload))); + } + break; + default: + break; + } + _payload.clear(); + header._mask_flag = flag; + } - string get_random(size_t n); + /** + * websocket鏁版嵁缂栫爜鍥炶皟 + * @param ptr 鏁版嵁鎸囬拡 + * @param len 鏁版嵁鎸囬拡闀垮害 + */ + void onWebSocketEncodeData(const Buffer::Ptr &buffer) override{ + HttpClientImp::send(buffer); + } +private: + void onWebSocketException(const SockException &ex){ + if(!ex){ + //websocket鎻℃墜鎴愬姛 + //姝ゅ鎴彇TcpClient娲剧敓绫诲彂閫佺殑鏁版嵁骞惰繘琛寃ebsocket鍗忚鎵撳寘 + weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); + _delegate.setOnBeforeSendCB([weakSelf](const Buffer::Ptr &buf){ + auto strongSelf = weakSelf.lock(); + if(strongSelf){ + WebSocketHeader header; + header._fin = true; + header._reserved = 0; + header._opcode = DataType; + //瀹㈡埛绔渶瑕佸姞瀵 + header._mask_flag = true; + strongSelf->WebSocketSplitter::encode(header,buf); + } + return buf->size(); + }); + //瑙﹀彂杩炴帴鎴愬姛浜嬩欢 + _delegate._sock = HttpClientImp::_sock; + _delegate.onConnect(ex); + //鎷︽埅websocket鏁版嵁鎺ユ敹 + _onRecv = [this](const Buffer::Ptr &pBuf){ + WebSocketSplitter::decode((uint8_t*)pBuf->data(),pBuf->size()); + }; + return; + } - void onWebSocketDecodeHeader(const WebSocketHeader &packet) override; + //websocket鎻℃墜澶辫触鎴栬卼cp杩炴帴澶辫触鎴栬呬腑閫旀柇寮 + if(_onRecv){ + //鎻℃墜鎴愬姛涔嬪悗鐨勪腑閫旀柇寮 + _onRecv = nullptr; + _delegate.onErr(ex); + return; + } - void onWebSocketDecodePlayload(const WebSocketHeader &packet, const uint8_t *ptr, uint64_t len, uint64_t recved) override; - - void onWebSocketDecodeComplete(const WebSocketHeader &header_in) override; - - virtual void onWebSocketEncodeData(const uint8_t *ptr, uint64_t len); + //websocket鎻℃墜澶辫触鎴栬卼cp杩炴帴澶辫触 + _delegate.onConnect(ex); + } private: - void onResponseCompleted_l(); - -protected: - bool _isHttps; -private: - string _ip; - int _port; - string _url; - string _method; - string _path; - //recv - int64_t _recvedBodySize; - int64_t _totalBodySize; - Parser _parser; - string _lastHost; - Ticker _aliveTicker; - float _fTimeOutSec = 0; - std::shared_ptr _chunkedSplitter; - - std::string _key; ///瀹㈡埛绔殑key - WSClientStatus _WSClientStatus; ///瀹㈡埛绔姸鎬 - bool _firstPacket = true; - string _remian_data; - - std::shared_ptr _session; - + string _Sec_WebSocket_Key; + function _onRecv; + ClientTypeImp &_delegate; + string _payload; }; -} /* namespace mediakit */ +template +class WebSocketClient : public ClientTypeImp{ +public: + typedef std::shared_ptr Ptr; -#endif /* Http_HttpClient_h */ + template + WebSocketClient(ArgsType &&...args) : ClientTypeImp(std::forward(args)...){ + _wsClient.reset(new HttpWsClient(*this)); + } + ~WebSocketClient() override {} + + void startConnect(const string &strUrl, uint16_t iPort, float fTimeOutSec = 3) override { + string ws_url; + if(userWSS){ + ws_url = StrPrinter << "wss://" + strUrl << ":" << iPort << "/" ; + }else{ + ws_url = StrPrinter << "ws://" + strUrl << ":" << iPort << "/" ; + } + _wsClient->startWsClient(ws_url,fTimeOutSec); + } +private: + typename HttpWsClient::Ptr _wsClient; +}; + +}//namespace mediakit +#endif //ZLMEDIAKIT_WebSocketClient_H diff --git a/tests/test_wsClient.cpp b/tests/test_wsClient.cpp new file mode 100644 index 00000000..6106515a --- /dev/null +++ b/tests/test_wsClient.cpp @@ -0,0 +1,84 @@ +锘/* + * MIT License + * + * Copyright (c) 2016-2019 xiongziliang <771730766@qq.com> + * + * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +#include +#include +#include +#include "Util/MD5.h" +#include "Util/logger.h" +#include "Http/WebSocketClient.h" +using namespace std; +using namespace toolkit; +using namespace mediakit; + +class EchoTcpClient : public TcpClient { +public: + EchoTcpClient(const EventPoller::Ptr &poller = nullptr){ + InfoL; + } + ~EchoTcpClient() override { + InfoL; + } +protected: + void onRecv(const Buffer::Ptr &pBuf) override { + DebugL << pBuf->toString(); + } + //琚姩鏂紑杩炴帴鍥炶皟 + void onErr(const SockException &ex) override { + WarnL << ex.what(); + } + //tcp杩炴帴鎴愬姛鍚庢瘡2绉掕Е鍙戜竴娆¤浜嬩欢 + void onManager() override { + send("echo test!"); + DebugL << "send echo test"; + } + //杩炴帴鏈嶅姟鍣ㄧ粨鏋滃洖璋 + void onConnect(const SockException &ex) override{ + DebugL << ex.what(); + } + + //鏁版嵁鍏ㄩ儴鍙戦佸畬姣曞悗鍥炶皟 + void onFlush() override{ + DebugL; + } +}; + +int main(int argc, char *argv[]) { + //璁剧疆閫鍑轰俊鍙峰鐞嗗嚱鏁 + static semaphore sem; + signal(SIGINT, [](int) { sem.post(); });// 璁剧疆閫鍑轰俊鍙 + + //璁剧疆鏃ュ織 + Logger::Instance().add(std::make_shared()); + Logger::Instance().setWriter(std::make_shared()); + + WebSocketClient::Ptr client = std::make_shared >(); + client->startConnect("127.0.0.1",80); + + sem.wait(); + return 0; +} +