From eda7a59f3c6e505a35e5f25d972e43fea7b31626 Mon Sep 17 00:00:00 2001 From: xiongziliang <771730766@qq.com> Date: Thu, 23 Apr 2020 22:04:59 +0800 Subject: [PATCH] =?UTF-8?q?=E7=BB=9F=E4=B8=80=E4=BA=8B=E4=BB=B6=E5=8F=82?= =?UTF-8?q?=E6=95=B0=E7=B1=BB=E5=9E=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- api/include/mk_events.h | 5 +--- api/source/mk_events.cpp | 21 ++++++++-------- api/tests/server.c | 5 ++-- server/WebHook.cpp | 8 +++--- src/Common/config.h | 2 +- src/Http/HttpFileManager.cpp | 47 +++++++++++++++++++++++++++++++---- src/Http/HttpSession.cpp | 2 +- src/Record/HlsMediaSource.cpp | 10 +++----- src/Record/HlsMediaSource.h | 6 ++--- src/Rtmp/RtmpSession.cpp | 2 +- src/Rtsp/RtspSession.cpp | 2 +- 11 files changed, 69 insertions(+), 41 deletions(-) diff --git a/api/include/mk_events.h b/api/include/mk_events.h index 47bbb4a2..c3334621 100644 --- a/api/include/mk_events.h +++ b/api/include/mk_events.h @@ -146,15 +146,12 @@ typedef struct { * @param total_bytes 耗费上下行总流量,单位字节数 * @param total_seconds 本次tcp会话时长,单位秒 * @param is_player 客户端是否为播放器 - * @param peer_ip 客户端ip - * @param peer_port 客户端端口号 */ void (API_CALL *on_mk_flow_report)(const mk_media_info url_info, uint64_t total_bytes, uint64_t total_seconds, int is_player, - const char *peer_ip, - uint16_t peer_port); + const mk_sock_info sender); } mk_events; diff --git a/api/source/mk_events.cpp b/api/source/mk_events.cpp index e7f5d676..12ee4a46 100644 --- a/api/source/mk_events.cpp +++ b/api/source/mk_events.cpp @@ -46,7 +46,7 @@ API_EXPORT void API_CALL mk_events_listen(const mk_events *events){ s_events.on_mk_http_request((mk_parser)&parser, (mk_http_response_invoker)&invoker, &consumed_int, - (mk_tcp_session)&sender); + (mk_sock_info)&sender); consumed = consumed_int; } }); @@ -57,7 +57,7 @@ API_EXPORT void API_CALL mk_events_listen(const mk_events *events){ path.c_str(), is_dir, (mk_http_access_path_invoker)&invoker, - (mk_tcp_session)&sender); + (mk_sock_info)&sender); } else{ invoker("","",0); } @@ -69,7 +69,7 @@ API_EXPORT void API_CALL mk_events_listen(const mk_events *events){ strcpy(path_c,path.c_str()); s_events.on_mk_http_before_access((mk_parser) &parser, path_c, - (mk_tcp_session) &sender); + (mk_sock_info) &sender); path = path_c; } }); @@ -79,7 +79,7 @@ API_EXPORT void API_CALL mk_events_listen(const mk_events *events){ if (s_events.on_mk_rtsp_get_realm) { s_events.on_mk_rtsp_get_realm((mk_media_info) &args, (mk_rtsp_get_realm_invoker) &invoker, - (mk_tcp_session) &sender); + (mk_sock_info) &sender); }else{ invoker(""); } @@ -92,7 +92,7 @@ API_EXPORT void API_CALL mk_events_listen(const mk_events *events){ user_name.c_str(), must_no_encrypt, (mk_rtsp_auth_invoker) &invoker, - (mk_tcp_session) &sender); + (mk_sock_info) &sender); } }); @@ -100,7 +100,7 @@ API_EXPORT void API_CALL mk_events_listen(const mk_events *events){ if (s_events.on_mk_media_publish) { s_events.on_mk_media_publish((mk_media_info) &args, (mk_publish_auth_invoker) &invoker, - (mk_tcp_session) &sender); + (mk_sock_info) &sender); }else{ GET_CONFIG(bool,toRtxp,General::kPublishToRtxp); GET_CONFIG(bool,toHls,General::kPublishToHls); @@ -113,7 +113,7 @@ API_EXPORT void API_CALL mk_events_listen(const mk_events *events){ if (s_events.on_mk_media_play) { s_events.on_mk_media_play((mk_media_info) &args, (mk_auth_invoker) &invoker, - (mk_tcp_session) &sender); + (mk_sock_info) &sender); }else{ invoker(""); } @@ -124,7 +124,7 @@ API_EXPORT void API_CALL mk_events_listen(const mk_events *events){ s_events.on_mk_shell_login(user_name.c_str(), passwd.c_str(), (mk_auth_invoker) &invoker, - (mk_tcp_session) &sender); + (mk_sock_info) &sender); }else{ invoker(""); } @@ -136,15 +136,14 @@ API_EXPORT void API_CALL mk_events_listen(const mk_events *events){ totalBytes, totalDuration, isPlayer, - peerIP.c_str(), - peerPort); + (mk_sock_info)&sender); } }); NoticeCenter::Instance().addListener(&s_tag,Broadcast::kBroadcastNotFoundStream,[](BroadcastNotFoundStreamArgs){ if (s_events.on_mk_media_not_found) { s_events.on_mk_media_not_found((mk_media_info) &args, - (mk_tcp_session) &sender); + (mk_sock_info) &sender); } }); diff --git a/api/tests/server.c b/api/tests/server.c index e6da1ef3..52635192 100644 --- a/api/tests/server.c +++ b/api/tests/server.c @@ -363,8 +363,7 @@ void API_CALL on_mk_flow_report(const mk_media_info url_info, uint64_t total_bytes, uint64_t total_seconds, int is_player, - const char *peer_ip, - uint16_t peer_port) { + const mk_sock_info sender) { log_printf(LOG_LEV,"%s/%s/%s/%s, url params: %s," "total_bytes: %d, total_seconds: %d, is_player: %d, peer_ip:%s, peer_port:%d", mk_media_info_get_schema(url_info), @@ -372,7 +371,7 @@ void API_CALL on_mk_flow_report(const mk_media_info url_info, mk_media_info_get_app(url_info), mk_media_info_get_stream(url_info), mk_media_info_get_params(url_info), - (int)total_bytes, (int)total_seconds, (int)is_player,peer_ip, (int)peer_port); + (int)total_bytes, (int)total_seconds, (int)is_player,mk_sock_info_peer_ip(sender), (int)mk_sock_info_peer_port(sender)); } static int flag = 1; diff --git a/server/WebHook.cpp b/server/WebHook.cpp index 34e9127e..471db2ba 100644 --- a/server/WebHook.cpp +++ b/server/WebHook.cpp @@ -253,16 +253,16 @@ void installWebHook(){ }); NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastFlowReport,[](BroadcastFlowReportArgs){ - if(!hook_enable || args._param_strs == hook_adminparams || hook_flowreport.empty() || peerIP == "127.0.0.1"){ + if(!hook_enable || args._param_strs == hook_adminparams || hook_flowreport.empty() || sender.get_peer_ip() == "127.0.0.1"){ return; } auto body = make_json(args); body["totalBytes"] = (Json::UInt64)totalBytes; body["duration"] = (Json::UInt64)totalDuration; body["player"] = isPlayer; - body["ip"] = peerIP; - body["port"] = peerPort; - body["id"] = sessionIdentifier; + body["ip"] = sender.get_peer_ip(); + body["port"] = sender.get_peer_port(); + body["id"] = sender.getIdentifier(); //执行hook do_http_hook(hook_flowreport,body, nullptr); }); diff --git a/src/Common/config.h b/src/Common/config.h index 84b5024a..aa42acaa 100644 --- a/src/Common/config.h +++ b/src/Common/config.h @@ -107,7 +107,7 @@ extern const string kBroadcastShellLogin; //停止rtsp/rtmp/http-flv会话后流量汇报事件广播 extern const string kBroadcastFlowReport; -#define BroadcastFlowReportArgs const MediaInfo &args,const uint64_t &totalBytes,const uint64_t &totalDuration,const bool &isPlayer, const string &sessionIdentifier, const string &peerIP,const uint16_t &peerPort +#define BroadcastFlowReportArgs const MediaInfo &args,const uint64_t &totalBytes,const uint64_t &totalDuration,const bool &isPlayer, SockInfo &sender //未找到流后会广播该事件,请在监听该事件后去拉流或其他方式产生流,这样就能按需拉流了 extern const string kBroadcastNotFoundStream; diff --git a/src/Http/HttpFileManager.cpp b/src/Http/HttpFileManager.cpp index f505abbd..11551fe7 100644 --- a/src/Http/HttpFileManager.cpp +++ b/src/Http/HttpFileManager.cpp @@ -309,6 +309,39 @@ static bool emitHlsPlayed(const Parser &parser, const MediaInfo &mediaInfo, cons return NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed,mediaInfo,mediaAuthInvoker,static_cast(sender)); } +class SockInfoImp : public SockInfo{ +public: + typedef std::shared_ptr Ptr; + SockInfoImp() = default; + ~SockInfoImp() override = default; + + const string &get_local_ip() override{ + return _local_ip; + } + + uint16_t get_local_port() override{ + return _local_port; + } + + const string &get_peer_ip() override{ + return _peer_ip; + + } + + uint16_t get_peer_port() override{ + return _peer_port; + } + + string getIdentifier() const override{ + return _identifier; + } + + string _local_ip; + string _peer_ip; + string _identifier; + uint16_t _local_port; + uint16_t _peer_port; +}; /** * 判断http客户端是否有权限访问文件的逻辑步骤 @@ -362,12 +395,16 @@ static void canAccessPath(TcpSession &sender, const Parser &parser, const MediaI } bool is_hls = mediaInfo._schema == HLS_SCHEMA; - string identifier = sender.getIdentifier(); - string peer_ip = sender.get_peer_ip(); - uint16_t peer_port = sender.get_peer_port(); + + SockInfoImp::Ptr info = std::make_shared(); + info->_identifier = sender.getIdentifier(); + info->_peer_ip = sender.get_peer_ip(); + info->_peer_port = sender.get_peer_port(); + info->_local_ip = sender.get_local_ip(); + info->_local_port = sender.get_local_port(); //该用户从来未获取过cookie,这个时候我们广播是否允许该用户访问该http目录 - HttpSession::HttpAccessPathInvoker accessPathInvoker = [callback, uid, path, is_dir, is_hls, mediaInfo, identifier, peer_ip, peer_port] + HttpSession::HttpAccessPathInvoker accessPathInvoker = [callback, uid, path, is_dir, is_hls, mediaInfo, info] (const string &errMsg, const string &cookie_path_in, int cookieLifeSecond) { HttpServerCookie::Ptr cookie; if (cookieLifeSecond) { @@ -390,7 +427,7 @@ static void canAccessPath(TcpSession &sender, const Parser &parser, const MediaI attachment._is_hls = is_hls; if(is_hls){ //hls相关信息 - attachment._hls_data = std::make_shared(mediaInfo, identifier, peer_ip, peer_port); + attachment._hls_data = std::make_shared(mediaInfo, info); //hls未查找MediaSource attachment._have_find_media_source = false; } diff --git a/src/Http/HttpSession.cpp b/src/Http/HttpSession.cpp index 9e443e31..9a1bbc5d 100644 --- a/src/Http/HttpSession.cpp +++ b/src/Http/HttpSession.cpp @@ -108,7 +108,7 @@ void HttpSession::onError(const SockException& err) { GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold); if(_ui64TotalBytes > iFlowThreshold * 1024){ - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _mediaInfo, _ui64TotalBytes, duration , true, getIdentifier(), get_peer_ip(), get_peer_port()); + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _mediaInfo, _ui64TotalBytes, duration , true, static_cast(*this)); } return; } diff --git a/src/Record/HlsMediaSource.cpp b/src/Record/HlsMediaSource.cpp index 1f5afa4f..4133a206 100644 --- a/src/Record/HlsMediaSource.cpp +++ b/src/Record/HlsMediaSource.cpp @@ -12,11 +12,9 @@ namespace mediakit{ -HlsCookieData::HlsCookieData(const MediaInfo &info, const string &sessionIdentifier, const string &peer_ip, uint16_t peer_port) { +HlsCookieData::HlsCookieData(const MediaInfo &info, const std::shared_ptr &sock_info) { _info = info; - _sessionIdentifier = sessionIdentifier; - _peer_ip = peer_ip; - _peer_port = peer_port; + _sock_info = sock_info; _added = std::make_shared(false); addReaderCount(); } @@ -45,13 +43,13 @@ HlsCookieData::~HlsCookieData() { src->modifyReaderCount(false); } uint64_t duration = (_ticker.createdTime() - _ticker.elapsedTime()) / 1000; - WarnL << _sessionIdentifier << "(" << _peer_ip << ":" << _peer_port << ") " + WarnL << _sock_info->getIdentifier() << "(" << _sock_info->get_peer_ip() << ":" << _sock_info->get_peer_port() << ") " << "HLS播放器(" << _info._vhost << "/" << _info._app << "/" << _info._streamid << ")断开,耗时(s):" << duration; GET_CONFIG(uint32_t, iFlowThreshold, General::kFlowThreshold); if (_bytes > iFlowThreshold * 1024) { - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _info, _bytes, duration, true, _sessionIdentifier, _peer_ip, _peer_port); + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _info, _bytes, duration, true, static_cast(*_sock_info)); } } } diff --git a/src/Record/HlsMediaSource.h b/src/Record/HlsMediaSource.h index 513e90ac..d6bcab5e 100644 --- a/src/Record/HlsMediaSource.h +++ b/src/Record/HlsMediaSource.h @@ -77,7 +77,7 @@ private: class HlsCookieData{ public: typedef std::shared_ptr Ptr; - HlsCookieData(const MediaInfo &info, const string &sessionIdentifier, const string &peer_ip, uint16_t peer_port); + HlsCookieData(const MediaInfo &info, const std::shared_ptr &sock_info); ~HlsCookieData(); void addByteUsage(uint64_t bytes); private: @@ -85,12 +85,10 @@ private: private: uint64_t _bytes = 0; MediaInfo _info; - string _sessionIdentifier; - string _peer_ip; - uint16_t _peer_port; std::shared_ptr _added; weak_ptr _src; Ticker _ticker; + std::shared_ptr _sock_info; HlsMediaSource::RingType::RingReader::Ptr _ring_reader; }; diff --git a/src/Rtmp/RtmpSession.cpp b/src/Rtmp/RtmpSession.cpp index 632f016a..d606f118 100644 --- a/src/Rtmp/RtmpSession.cpp +++ b/src/Rtmp/RtmpSession.cpp @@ -39,7 +39,7 @@ void RtmpSession::onError(const SockException& err) { GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold); if(_ui64TotalBytes > iFlowThreshold * 1024){ - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _mediaInfo, _ui64TotalBytes, duration, isPlayer, getIdentifier(), get_peer_ip(), get_peer_port()); + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _mediaInfo, _ui64TotalBytes, duration, isPlayer, static_cast(*this)); } } diff --git a/src/Rtsp/RtspSession.cpp b/src/Rtsp/RtspSession.cpp index 247e5893..f8591e6c 100644 --- a/src/Rtsp/RtspSession.cpp +++ b/src/Rtsp/RtspSession.cpp @@ -92,7 +92,7 @@ void RtspSession::onError(const SockException& err) { //流量统计事件广播 GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold); if(_ui64TotalBytes > iFlowThreshold * 1024){ - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _mediaInfo, _ui64TotalBytes, duration, isPlayer, getIdentifier(), get_peer_ip(), get_peer_port()); + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _mediaInfo, _ui64TotalBytes, duration, isPlayer, static_cast(*this)); } }