From 309507574930f2678af4ff5898558d74e1fe60b8 Mon Sep 17 00:00:00 2001 From: xiongziliang <771730766@qq.com> Date: Mon, 16 Sep 2019 17:42:52 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84websocket=E6=9C=8D=E5=8A=A1?= =?UTF-8?q?=E5=99=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Http/HttpSession.cpp | 47 +++++++++++++++------------ src/Http/HttpSession.h | 45 +++++++++++++------------ src/Http/WebSocketSession.h | 65 +++++++++++++++++++------------------ 3 files changed, 85 insertions(+), 72 deletions(-) diff --git a/src/Http/HttpSession.cpp b/src/Http/HttpSession.cpp index b986e7ac..b4953ed1 100644 --- a/src/Http/HttpSession.cpp +++ b/src/Http/HttpSession.cpp @@ -195,8 +195,7 @@ void HttpSession::onManager() { } } - -inline bool HttpSession::checkWebSocket(){ +bool HttpSession::checkWebSocket(){ auto Sec_WebSocket_Key = _parser["Sec-WebSocket-Key"]; if(Sec_WebSocket_Key.empty()){ return false; @@ -223,12 +222,17 @@ inline bool HttpSession::checkWebSocket(){ } //如果checkLiveFlvStream返回false,则代表不是websocket-flv,而是普通的websocket连接 + if(!onWebSocketConnect(_parser)){ + sendResponse("501 Not Implemented",headerOut,""); + shutdown(SockException(Err_shutdown,"WebSocket server not implemented")); + return true; + } sendResponse("101 Switching Protocols",headerOut,""); return true; } //http-flv 链接格式:http://vhost-url:port/app/streamid.flv?key1=value1&key2=value2 //如果url(除去?以及后面的参数)后缀是.flv,那么表明该url是一个http-flv直播。 -inline bool HttpSession::checkLiveFlvStream(const function &cb){ +bool HttpSession::checkLiveFlvStream(const function &cb){ auto pos = strrchr(_parser.Url().data(),'.'); if(!pos){ //未找到".flv"后缀 @@ -316,9 +320,9 @@ inline bool HttpSession::checkLiveFlvStream(const function &cb){ return true; } -inline bool makeMeun(const string &httpPath,const string &strFullPath, string &strRet) ; +bool makeMeun(const string &httpPath,const string &strFullPath, string &strRet) ; -inline static string findIndexFile(const string &dir){ +static string findIndexFile(const string &dir){ DIR *pDir; dirent *pDirent; if ((pDir = opendir(dir.data())) == NULL) { @@ -336,7 +340,7 @@ inline static string findIndexFile(const string &dir){ return ""; } -inline string HttpSession::getClientUid(){ +string HttpSession::getClientUid(){ //如果http客户端不支持cookie,那么我们可以通过url参数来追踪用户 //如果url参数也没有,那么只能通过ip+端口号来追踪用户 //追踪用户的目的是为了减少http短链接情况的重复鉴权验证,通过缓存记录鉴权结果,提高性能 @@ -349,13 +353,13 @@ inline string HttpSession::getClientUid(){ //字符串是否以xx结尾 -static inline bool end_of(const string &str, const string &substr){ +static bool end_of(const string &str, const string &substr){ auto pos = str.rfind(substr); return pos != string::npos && pos == str.size() - substr.size(); }; //拦截hls的播放请求 -static inline bool checkHls(BroadcastHttpAccessArgs){ +static bool checkHls(BroadcastHttpAccessArgs){ if(!end_of(args._streamid,("/hls.m3u8"))) { //不是hls return false; @@ -371,7 +375,7 @@ static inline bool checkHls(BroadcastHttpAccessArgs){ return NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed,args_copy,mediaAuthInvoker,sender); } -inline void HttpSession::canAccessPath(const string &path_in,bool is_dir,const function &callback_in){ +void HttpSession::canAccessPath(const string &path_in,bool is_dir,const function &callback_in){ auto path = path_in; replace(const_cast(path),"//","/"); @@ -472,13 +476,12 @@ inline void HttpSession::canAccessPath(const string &path_in,bool is_dir,const f } -inline void HttpSession::Handle_Req_GET(int64_t &content_len) { +void HttpSession::Handle_Req_GET(int64_t &content_len) { //先看看是否为WebSocket请求 if(checkWebSocket()){ content_len = -1; - auto parserCopy = _parser; - _contentCallBack = [this,parserCopy](const char *data,uint64_t len){ - onRecvWebSocketData(parserCopy,data,len); + _contentCallBack = [this](const char *data,uint64_t len){ + WebSocketSplitter::decode((uint8_t *)data,len); //_contentCallBack是可持续的,后面还要处理后续数据 return true; }; @@ -666,7 +669,7 @@ inline void HttpSession::Handle_Req_GET(int64_t &content_len) { }); } -inline bool makeMeun(const string &httpPath,const string &strFullPath, string &strRet) { +bool makeMeun(const string &httpPath,const string &strFullPath, string &strRet) { string strPathPrefix(strFullPath); string last_dir_name; if(strPathPrefix.back() == '/'){ @@ -764,7 +767,8 @@ inline bool makeMeun(const string &httpPath,const string &strFullPath, string &s ss.str().swap(strRet); return true; } -inline void HttpSession::sendResponse(const char* pcStatus, const KeyValue& header, const string& strContent) { + +void HttpSession::sendResponse(const char* pcStatus, const KeyValue& header, const string& strContent) { _StrPrinter printer; printer << "HTTP/1.1 " << pcStatus << "\r\n"; for (auto &pr : header) { @@ -775,7 +779,8 @@ inline void HttpSession::sendResponse(const char* pcStatus, const KeyValue& head send(strSend); _ticker.resetTime(); } -inline HttpSession::KeyValue HttpSession::makeHttpHeader(bool bClose, int64_t iContentSize,const char* pcContentType) { + +HttpSession::KeyValue HttpSession::makeHttpHeader(bool bClose, int64_t iContentSize,const char* pcContentType) { KeyValue headerOut; GET_CONFIG(string,charSet,Http::kCharSet); GET_CONFIG(uint32_t,keepAliveSec,Http::kKeepAliveSecond); @@ -814,14 +819,14 @@ string HttpSession::urlDecode(const string &str){ return ret; } -inline void HttpSession::urlDecode(Parser &parser){ +void HttpSession::urlDecode(Parser &parser){ parser.setUrl(urlDecode(parser.Url())); for(auto &pr : _parser.getUrlArgs()){ const_cast(pr.second) = urlDecode(pr.second); } } -inline bool HttpSession::emitHttpEvent(bool doInvoke){ +bool HttpSession::emitHttpEvent(bool doInvoke){ ///////////////////是否断开本链接/////////////////////// GET_CONFIG(uint32_t,reqCnt,Http::kMaxReqCount); @@ -857,7 +862,8 @@ inline bool HttpSession::emitHttpEvent(bool doInvoke){ } return consumed; } -inline void HttpSession::Handle_Req_POST(int64_t &content_len) { + +void HttpSession::Handle_Req_POST(int64_t &content_len) { GET_CONFIG(uint64_t,maxReqSize,Http::kMaxReqSize); GET_CONFIG(int,maxReqCnt,Http::kMaxReqCount); @@ -944,7 +950,8 @@ void HttpSession::responseDelay(bool bClose, } sendResponse(codeOut.data(), headerOut, contentOut); } -inline void HttpSession::sendNotFound(bool bClose) { + +void HttpSession::sendNotFound(bool bClose) { GET_CONFIG(string,notFound,Http::kNotFound); sendResponse("404 Not Found", makeHttpHeader(bClose, notFound.size()), notFound); } diff --git a/src/Http/HttpSession.h b/src/Http/HttpSession.h index 4ab9dcad..e6199668 100644 --- a/src/Http/HttpSession.h +++ b/src/Http/HttpSession.h @@ -72,8 +72,8 @@ protected: void onWrite(const Buffer::Ptr &data) override ; void onDetach() override; std::shared_ptr getSharedPtr() override; - //HttpRequestSplitter override + //HttpRequestSplitter override int64_t onRecvHeader(const char *data,uint64_t len) override; void onRecvContent(const char *data,uint64_t len) override; @@ -94,29 +94,32 @@ protected: shutdown(SockException(Err_shutdown,"http post content is too huge,default closed")); } - void onWebSocketDecodeHeader(const WebSocketHeader &packet) override{ - shutdown(SockException(Err_shutdown,"websocket connection default closed")); - }; - - void onRecvWebSocketData(const Parser &header,const char *data,uint64_t len){ - WebSocketSplitter::decode((uint8_t *)data,len); + /** + * websocket客户端连接上事件 + * @param header http头 + * @return true代表允许websocket连接,否则拒绝 + */ + virtual bool onWebSocketConnect(const Parser &header){ + WarnL << "http server do not support websocket default"; + return false; } + //WebSocketSplitter override /** - * 发送数据进行websocket协议打包后回调 - * @param buffer - */ + * 发送数据进行websocket协议打包后回调 + * @param buffer websocket协议数据 + */ void onWebSocketEncodeData(const Buffer::Ptr &buffer) override; private: - inline void Handle_Req_GET(int64_t &content_len); - inline void Handle_Req_POST(int64_t &content_len); - inline bool checkLiveFlvStream(const function &cb = nullptr); - inline bool checkWebSocket(); - inline bool emitHttpEvent(bool doInvoke); - inline void urlDecode(Parser &parser); - inline void sendNotFound(bool bClose); - inline void sendResponse(const char *pcStatus,const KeyValue &header,const string &strContent); - inline KeyValue makeHttpHeader(bool bClose=false,int64_t iContentSize=-1,const char *pcContentType="text/html"); + void Handle_Req_GET(int64_t &content_len); + void Handle_Req_POST(int64_t &content_len); + bool checkLiveFlvStream(const function &cb = nullptr); + bool checkWebSocket(); + bool emitHttpEvent(bool doInvoke); + void urlDecode(Parser &parser); + void sendNotFound(bool bClose); + void sendResponse(const char *pcStatus,const KeyValue &header,const string &strContent); + KeyValue makeHttpHeader(bool bClose=false,int64_t iContentSize=-1,const char *pcContentType="text/html"); void responseDelay(bool bClose, const string &codeOut, const KeyValue &headerOut, @@ -134,14 +137,14 @@ private: * @param is_dir path是否为目录 * @param callback 有权限或无权限的回调 */ - inline void canAccessPath(const string &path,bool is_dir,const function &callback); + void canAccessPath(const string &path,bool is_dir,const function &callback); /** * 获取用户唯一识别id * 有url参数返回参数,无参数返回ip+端口号 * @return */ - inline string getClientUid(); + string getClientUid(); //设置socket标志 void setSocketFlags(); diff --git a/src/Http/WebSocketSession.h b/src/Http/WebSocketSession.h index d8f2504e..6e55d224 100644 --- a/src/Http/WebSocketSession.h +++ b/src/Http/WebSocketSession.h @@ -34,7 +34,7 @@ * 用户只要实现WebSock协议下的具体业务协议,譬如基于WebSocket协议的Rtmp协议等 * @tparam SessionType 业务协议的TcpSession类 */ -template +template class WebSocketSession : public HttpSessionType { public: WebSocketSession(const Socket::Ptr &pSock) : HttpSessionType(pSock){} @@ -61,6 +61,37 @@ public: _weakServer = const_cast(server).shared_from_this(); } protected: + /** + * websocket客户端连接上事件 + * @param header http头 + * @return true代表允许websocket连接,否则拒绝 + */ + bool onWebSocketConnect(const Parser &header) override{ + //创建websocket session类 + _session = std::make_shared(HttpSessionType::getIdentifier(),HttpSessionType::_sock); + auto strongServer = _weakServer.lock(); + if(strongServer){ + _session->attachServer(*strongServer); + } + + //此处截取数据并进行websocket协议打包 + weak_ptr weakSelf = dynamic_pointer_cast(HttpSessionType::shared_from_this()); + _session->setOnBeforeSendCB([weakSelf](const Buffer::Ptr &buf){ + auto strongSelf = weakSelf.lock(); + if(strongSelf){ + WebSocketHeader header; + header._fin = true; + header._reserved = 0; + header._opcode = DataType; + header._mask_flag = false; + strongSelf->WebSocketSplitter::encode(header,buf); + } + return buf->size(); + }); + + //允许websocket客户端 + return true; + } /** * 开始收到一个webSocket数据包 * @param packet @@ -68,34 +99,6 @@ protected: void onWebSocketDecodeHeader(const WebSocketHeader &packet) override{ //新包,原来的包残余数据清空掉 _remian_data.clear(); - - if(_firstPacket){ - //这是个WebSocket会话而不是普通的Http会话 - _firstPacket = false; - _session = std::make_shared(HttpSessionType::getIdentifier(),HttpSessionType::_sock); - - auto strongServer = _weakServer.lock(); - if(strongServer){ - - //此处截取数据并进行websocket协议打包 - weak_ptr weakSelf = dynamic_pointer_cast(HttpSessionType::shared_from_this()); - _session->setOnBeforeSendCB([weakSelf](const Buffer::Ptr &buf) { - auto strongSelf = weakSelf.lock(); - if (strongSelf) { - WebSocketHeader header; - header._fin = true; - header._reserved = 0; - header._opcode = WebSocketHeader::TEXT; - header._mask_flag = false; - strongSelf->WebSocketSplitter::encode(header, (uint8_t *)buf->data(), buf->size()); - } - return buf->size(); - }); - - _session->attachServer(*strongServer); - } - - } } /** @@ -124,7 +127,7 @@ protected: } break; case WebSocketHeader::PING:{ - const_cast(header)._opcode = WebSocketHeader::PONG; + header._opcode = WebSocketHeader::PONG; HttpSessionType::encode(header,std::make_shared(_remian_data)); } break; @@ -191,7 +194,6 @@ private: string _identifier; }; private: - bool _firstPacket = true; string _remian_data; weak_ptr _weakServer; std::shared_ptr _session; @@ -213,6 +215,7 @@ public: DebugL << getIdentifier() << " " << TcpSession::getIdentifier(); } void onRecv(const Buffer::Ptr &buffer) override { + //回显数据 send(buffer); } void onError(const SockException &err) override{