diff --git a/3rdpart/ZLToolKit b/3rdpart/ZLToolKit index 987683f1..4ede70fc 160000 --- a/3rdpart/ZLToolKit +++ b/3rdpart/ZLToolKit @@ -1 +1 @@ -Subproject commit 987683f1045613098e2bcd534bc90a13d16df8a4 +Subproject commit 4ede70fc435eb0a4d3a752b521170d86440b3935 diff --git a/3rdpart/media-server b/3rdpart/media-server index 24519a59..abc08f61 160000 --- a/3rdpart/media-server +++ b/3rdpart/media-server @@ -1 +1 @@ -Subproject commit 24519a594c2c634b21fbe09fad28d54c4eba0885 +Subproject commit abc08f61bb1250b94d252cfeaea249527912dd3b diff --git a/README.md b/README.md index d59474f4..53999cf8 100644 --- a/README.md +++ b/README.md @@ -116,7 +116,6 @@ bash build_docker_images.sh - [支持linux、windows、mac的rtmp/rtsp播放器](https://github.com/xiongziliang/ZLMediaPlayer) - [配套的管理WEB网站](https://github.com/chenxiaolei/ZLMediaKit_NVR_UI) - [DotNetCore的RESTful客户端](https://github.com/MingZhuLiu/ZLMediaKit.DotNetCore.Sdk) - - [支持GB28181信令服务器、onvif的NVS系统](https://gitee.com/qinqi/JNVS) ## 授权协议 @@ -126,7 +125,7 @@ bash build_docker_images.sh ## 联系方式 - - 邮箱:<771730766@qq.com>(本项目相关或流媒体相关问题请走issue流程,否则恕不邮件答复) + - 邮箱:<1213642868@qq.com>(本项目相关或流媒体相关问题请走issue流程,否则恕不邮件答复) - QQ群:542509000 ## 怎么提问? diff --git a/README_en.md b/README_en.md index 19b9e069..a85441ca 100644 --- a/README_en.md +++ b/README_en.md @@ -343,7 +343,7 @@ SOFTWARE. ## Contact - - Email:<771730766@qq.com> + - Email:<1213642868@qq.com> - QQ chat group:542509000 diff --git a/api/include/mk_common.h b/api/include/mk_common.h index 119f3da9..c9657a9a 100755 --- a/api/include/mk_common.h +++ b/api/include/mk_common.h @@ -40,8 +40,13 @@ extern "C" { typedef struct { // 线程数 int thread_num; + // 日志级别,支持0~4 int log_level; + //文件日志保存路径,路径可以不存在(内部可以创建文件夹),设置为NULL关闭日志输出至文件 + const char *log_file_path; + //文件日志保存天数,设置为0关闭日志文件 + int log_file_days; // 配置文件是内容还是路径 int ini_is_path; @@ -71,19 +76,23 @@ API_EXPORT void API_CALL mk_stop_all_server(); * 基础类型参数版本的mk_env_init,为了方便其他语言调用 * @param thread_num 线程数 * @param log_level 日志级别,支持0~4 + * @param log_file_path 文件日志保存路径,路径可以不存在(内部可以创建文件夹),设置为NULL关闭日志输出至文件 + * @param log_file_days 文件日志保存天数,设置为0关闭日志文件 * @param ini_is_path 配置文件是内容还是路径 * @param ini 配置文件内容或路径,可以为NULL,如果该文件不存在,那么将导出默认配置至该文件 * @param ssl_is_path ssl证书是内容还是路径 * @param ssl ssl证书内容或路径,可以为NULL * @param ssl_pwd 证书密码,可以为NULL */ -API_EXPORT void API_CALL mk_env_init1( int thread_num, - int log_level, - int ini_is_path, - const char *ini, - int ssl_is_path, - const char *ssl, - const char *ssl_pwd); +API_EXPORT void API_CALL mk_env_init1(int thread_num, + int log_level, + const char *log_file_path, + int log_file_days, + int ini_is_path, + const char *ini, + int ssl_is_path, + const char *ssl, + const char *ssl_pwd); /** * 设置配置项 diff --git a/api/include/mk_events.h b/api/include/mk_events.h index 79f435a8..c3334621 100644 --- a/api/include/mk_events.h +++ b/api/include/mk_events.h @@ -36,7 +36,7 @@ typedef struct { */ void (API_CALL *on_mk_media_publish)(const mk_media_info url_info, const mk_publish_auth_invoker invoker, - const mk_tcp_session sender); + const mk_sock_info sender); /** * 播放rtsp/rtmp/http-flv/hls事件广播,通过该事件控制播放鉴权 @@ -47,7 +47,7 @@ typedef struct { */ void (API_CALL *on_mk_media_play)(const mk_media_info url_info, const mk_auth_invoker invoker, - const mk_tcp_session sender); + const mk_sock_info sender); /** * 未找到流后会广播该事件,请在监听该事件后去拉流或其他方式产生流,这样就能按需拉流了 @@ -55,7 +55,7 @@ typedef struct { * @param sender 播放客户端相关信息 */ void (API_CALL *on_mk_media_not_found)(const mk_media_info url_info, - const mk_tcp_session sender); + const mk_sock_info sender); /** * 某个流无人消费时触发,目的为了实现无人观看时主动断开拉流等业务逻辑 @@ -73,7 +73,7 @@ typedef struct { void (API_CALL *on_mk_http_request)(const mk_parser parser, const mk_http_response_invoker invoker, int *consumed, - const mk_tcp_session sender); + const mk_sock_info sender); /** * 在http文件服务器中,收到http访问文件或目录的广播,通过该事件控制访问http目录的权限 @@ -87,7 +87,7 @@ typedef struct { const char *path, int is_dir, const mk_http_access_path_invoker invoker, - mk_tcp_session sender); + const mk_sock_info sender); /** * 在http文件服务器中,收到http访问文件或目录前的广播,通过该事件可以控制http url到文件路径的映射 @@ -98,7 +98,7 @@ typedef struct { */ void (API_CALL *on_mk_http_before_access)(const mk_parser parser, char *path, - const mk_tcp_session sender); + const mk_sock_info sender); /** * 该rtsp流是否需要认证?是的话调用invoker并传入realm,否则传入空的realm @@ -108,7 +108,7 @@ typedef struct { */ void (API_CALL *on_mk_rtsp_get_realm)(const mk_media_info url_info, const mk_rtsp_get_realm_invoker invoker, - const mk_tcp_session sender); + const mk_sock_info sender); /** * 请求认证用户密码事件,user_name为用户名,must_no_encrypt如果为true,则必须提供明文密码(因为此时是base64认证方式),否则会导致认证失败 @@ -125,7 +125,7 @@ typedef struct { const char *user_name, int must_no_encrypt, const mk_rtsp_auth_invoker invoker, - const mk_tcp_session sender); + const mk_sock_info sender); /** * 录制mp4分片文件成功后广播 @@ -138,7 +138,7 @@ typedef struct { void (API_CALL *on_mk_shell_login)(const char *user_name, const char *passwd, const mk_auth_invoker invoker, - const mk_tcp_session sender); + const mk_sock_info sender); /** * 停止rtsp/rtmp/http-flv会话后流量汇报事件广播 @@ -146,15 +146,12 @@ typedef struct { * @param total_bytes 耗费上下行总流量,单位字节数 * @param total_seconds 本次tcp会话时长,单位秒 * @param is_player 客户端是否为播放器 - * @param peer_ip 客户端ip - * @param peer_port 客户端端口号 */ void (API_CALL *on_mk_flow_report)(const mk_media_info url_info, uint64_t total_bytes, uint64_t total_seconds, int is_player, - const char *peer_ip, - uint16_t peer_port); + const mk_sock_info sender); } mk_events; diff --git a/api/include/mk_events_objects.h b/api/include/mk_events_objects.h index 7c74fab9..a6669fed 100644 --- a/api/include/mk_events_objects.h +++ b/api/include/mk_events_objects.h @@ -73,6 +73,10 @@ API_EXPORT const char* API_CALL mk_media_info_get_vhost(const mk_media_info ctx) API_EXPORT const char* API_CALL mk_media_info_get_app(const mk_media_info ctx); //MediaInfo::_streamid API_EXPORT const char* API_CALL mk_media_info_get_stream(const mk_media_info ctx); +//MediaInfo::_host +API_EXPORT const char* API_CALL mk_media_info_get_host(const mk_media_info ctx); +//MediaInfo::_port +API_EXPORT uint16_t API_CALL mk_media_info_get_port(const mk_media_info ctx); ///////////////////////////////////////////MediaSource///////////////////////////////////////////// diff --git a/api/include/mk_tcp.h b/api/include/mk_tcp.h index 1a2dac80..71048585 100644 --- a/api/include/mk_tcp.h +++ b/api/include/mk_tcp.h @@ -17,19 +17,41 @@ extern "C" { #endif +///////////////////////////////////////////SockInfo///////////////////////////////////////////// +//SockInfo对象的C映射 +typedef void* mk_sock_info; + +//SockInfo::get_peer_ip() +API_EXPORT const char* API_CALL mk_sock_info_peer_ip(const mk_sock_info ctx, char *buf); +//SockInfo::get_local_ip() +API_EXPORT const char* API_CALL mk_sock_info_local_ip(const mk_sock_info ctx, char *buf); +//SockInfo::get_peer_port() +API_EXPORT uint16_t API_CALL mk_sock_info_peer_port(const mk_sock_info ctx); +//SockInfo::get_local_port() +API_EXPORT uint16_t API_CALL mk_sock_info_local_port(const mk_sock_info ctx); + +#ifndef SOCK_INFO_API_RENAME +#define SOCK_INFO_API_RENAME +//mk_tcp_session对象转换成mk_sock_info对象后再获取网络相关信息 +#define mk_tcp_session_peer_ip(x,buf) mk_sock_info_peer_ip(mk_tcp_session_get_sock_info(x),buf) +#define mk_tcp_session_local_ip(x,buf) mk_sock_info_local_ip(mk_tcp_session_get_sock_info(x),buf) +#define mk_tcp_session_peer_port(x) mk_sock_info_peer_port(mk_tcp_session_get_sock_info(x)) +#define mk_tcp_session_local_port(x) mk_sock_info_local_port(mk_tcp_session_get_sock_info(x)) + +//mk_tcp_client对象转换成mk_sock_info对象后再获取网络相关信息 +#define mk_tcp_client_peer_ip(x,buf) mk_sock_info_peer_ip(mk_tcp_client_get_sock_info(x),buf) +#define mk_tcp_client_local_ip(x,buf) mk_sock_info_local_ip(mk_tcp_client_get_sock_info(x),buf) +#define mk_tcp_client_peer_port(x) mk_sock_info_peer_port(mk_tcp_client_get_sock_info(x)) +#define mk_tcp_client_local_port(x) mk_sock_info_local_port(mk_tcp_client_get_sock_info(x)) +#endif ///////////////////////////////////////////TcpSession///////////////////////////////////////////// //TcpSession对象的C映射 typedef void* mk_tcp_session; +//获取基类指针以便获取其网络相关信息 +API_EXPORT mk_sock_info API_CALL mk_tcp_session_get_sock_info(const mk_tcp_session ctx); + //TcpSession::safeShutdown() API_EXPORT void API_CALL mk_tcp_session_shutdown(const mk_tcp_session ctx,int err,const char *err_msg); -//TcpSession::get_peer_ip() -API_EXPORT const char* API_CALL mk_tcp_session_peer_ip(const mk_tcp_session ctx); -//TcpSession::get_local_ip() -API_EXPORT const char* API_CALL mk_tcp_session_local_ip(const mk_tcp_session ctx); -//TcpSession::get_peer_port() -API_EXPORT uint16_t API_CALL mk_tcp_session_peer_port(const mk_tcp_session ctx); -//TcpSession::get_local_port() -API_EXPORT uint16_t API_CALL mk_tcp_session_local_port(const mk_tcp_session ctx); //TcpSession::send() API_EXPORT void API_CALL mk_tcp_session_send(const mk_tcp_session ctx,const char *data,int len); //切换到该对象所在线程后再TcpSession::send() @@ -115,6 +137,8 @@ API_EXPORT void API_CALL mk_tcp_server_events_listen(const mk_tcp_session_events ///////////////////////////////////////////自定义tcp客户端///////////////////////////////////////////// typedef void* mk_tcp_client; +//获取基类指针以便获取其网络相关信息 +API_EXPORT mk_sock_info API_CALL mk_tcp_client_get_sock_info(const mk_tcp_client ctx); typedef struct { /** diff --git a/api/source/mk_common.cpp b/api/source/mk_common.cpp old mode 100755 new mode 100644 index ead4c361..0baa311c --- a/api/source/mk_common.cpp +++ b/api/source/mk_common.cpp @@ -41,6 +41,8 @@ API_EXPORT void API_CALL mk_env_init(const mk_config *cfg) { assert(cfg); mk_env_init1(cfg->thread_num, cfg->log_level, + cfg->log_file_path, + cfg->log_file_days, cfg->ini_is_path, cfg->ini, cfg->ssl_is_path, @@ -61,18 +63,29 @@ API_EXPORT void API_CALL mk_stop_all_server(){ stopAllTcpServer(); } -API_EXPORT void API_CALL mk_env_init1( int thread_num, - int log_level, - int ini_is_path, - const char *ini, - int ssl_is_path, - const char *ssl, - const char *ssl_pwd) { - +API_EXPORT void API_CALL mk_env_init1(int thread_num, + int log_level, + const char *log_file_path, + int log_file_days, + int ini_is_path, + const char *ini, + int ssl_is_path, + const char *ssl, + const char *ssl_pwd) { + //确保只初始化一次 static onceToken token([&]() { + //控制台日志 Logger::Instance().add(std::make_shared("console", (LogLevel) log_level)); + if(log_file_path && log_file_days){ + //日志文件 + auto channel = std::make_shared("FileChannel", File::absolutePath(log_file_path, ""), (LogLevel) log_level); + Logger::Instance().add(channel); + } + + //异步日志线程 Logger::Instance().setWriter(std::make_shared()); + //设置线程数 EventPollerPool::setPoolSize(thread_num); WorkThreadPool::setPoolSize(thread_num); diff --git a/api/source/mk_events.cpp b/api/source/mk_events.cpp index e7f5d676..12ee4a46 100644 --- a/api/source/mk_events.cpp +++ b/api/source/mk_events.cpp @@ -46,7 +46,7 @@ API_EXPORT void API_CALL mk_events_listen(const mk_events *events){ s_events.on_mk_http_request((mk_parser)&parser, (mk_http_response_invoker)&invoker, &consumed_int, - (mk_tcp_session)&sender); + (mk_sock_info)&sender); consumed = consumed_int; } }); @@ -57,7 +57,7 @@ API_EXPORT void API_CALL mk_events_listen(const mk_events *events){ path.c_str(), is_dir, (mk_http_access_path_invoker)&invoker, - (mk_tcp_session)&sender); + (mk_sock_info)&sender); } else{ invoker("","",0); } @@ -69,7 +69,7 @@ API_EXPORT void API_CALL mk_events_listen(const mk_events *events){ strcpy(path_c,path.c_str()); s_events.on_mk_http_before_access((mk_parser) &parser, path_c, - (mk_tcp_session) &sender); + (mk_sock_info) &sender); path = path_c; } }); @@ -79,7 +79,7 @@ API_EXPORT void API_CALL mk_events_listen(const mk_events *events){ if (s_events.on_mk_rtsp_get_realm) { s_events.on_mk_rtsp_get_realm((mk_media_info) &args, (mk_rtsp_get_realm_invoker) &invoker, - (mk_tcp_session) &sender); + (mk_sock_info) &sender); }else{ invoker(""); } @@ -92,7 +92,7 @@ API_EXPORT void API_CALL mk_events_listen(const mk_events *events){ user_name.c_str(), must_no_encrypt, (mk_rtsp_auth_invoker) &invoker, - (mk_tcp_session) &sender); + (mk_sock_info) &sender); } }); @@ -100,7 +100,7 @@ API_EXPORT void API_CALL mk_events_listen(const mk_events *events){ if (s_events.on_mk_media_publish) { s_events.on_mk_media_publish((mk_media_info) &args, (mk_publish_auth_invoker) &invoker, - (mk_tcp_session) &sender); + (mk_sock_info) &sender); }else{ GET_CONFIG(bool,toRtxp,General::kPublishToRtxp); GET_CONFIG(bool,toHls,General::kPublishToHls); @@ -113,7 +113,7 @@ API_EXPORT void API_CALL mk_events_listen(const mk_events *events){ if (s_events.on_mk_media_play) { s_events.on_mk_media_play((mk_media_info) &args, (mk_auth_invoker) &invoker, - (mk_tcp_session) &sender); + (mk_sock_info) &sender); }else{ invoker(""); } @@ -124,7 +124,7 @@ API_EXPORT void API_CALL mk_events_listen(const mk_events *events){ s_events.on_mk_shell_login(user_name.c_str(), passwd.c_str(), (mk_auth_invoker) &invoker, - (mk_tcp_session) &sender); + (mk_sock_info) &sender); }else{ invoker(""); } @@ -136,15 +136,14 @@ API_EXPORT void API_CALL mk_events_listen(const mk_events *events){ totalBytes, totalDuration, isPlayer, - peerIP.c_str(), - peerPort); + (mk_sock_info)&sender); } }); NoticeCenter::Instance().addListener(&s_tag,Broadcast::kBroadcastNotFoundStream,[](BroadcastNotFoundStreamArgs){ if (s_events.on_mk_media_not_found) { s_events.on_mk_media_not_found((mk_media_info) &args, - (mk_tcp_session) &sender); + (mk_sock_info) &sender); } }); diff --git a/api/source/mk_events_objects.cpp b/api/source/mk_events_objects.cpp index d4249563..98a233d8 100644 --- a/api/source/mk_events_objects.cpp +++ b/api/source/mk_events_objects.cpp @@ -12,7 +12,6 @@ #include "mk_events_objects.h" #include "Common/config.h" #include "Record/MP4Recorder.h" -#include "Network/TcpSession.h" #include "Http/HttpSession.h" #include "Http/HttpBody.h" #include "Http/HttpClient.h" @@ -114,7 +113,7 @@ API_EXPORT const char* API_CALL mk_parser_get_tail(const mk_parser ctx){ API_EXPORT const char* API_CALL mk_parser_get_header(const mk_parser ctx,const char *key){ assert(ctx && key); Parser *parser = (Parser *)ctx; - return parser->getValues()[key].c_str(); + return parser->getHeader()[key].c_str(); } API_EXPORT const char* API_CALL mk_parser_get_content(const mk_parser ctx, int *length){ assert(ctx); @@ -137,16 +136,31 @@ API_EXPORT const char* API_CALL mk_media_info_get_schema(const mk_media_info ctx MediaInfo *info = (MediaInfo *)ctx; return info->_schema.c_str(); } + API_EXPORT const char* API_CALL mk_media_info_get_vhost(const mk_media_info ctx){ assert(ctx); MediaInfo *info = (MediaInfo *)ctx; return info->_vhost.c_str(); } + +API_EXPORT const char* API_CALL mk_media_info_get_host(const mk_media_info ctx){ + assert(ctx); + MediaInfo *info = (MediaInfo *)ctx; + return info->_host.c_str(); +} + +API_EXPORT uint16_t API_CALL mk_media_info_get_port(const mk_media_info ctx){ + assert(ctx); + MediaInfo *info = (MediaInfo *)ctx; + return std::stoi(info->_port); +} + API_EXPORT const char* API_CALL mk_media_info_get_app(const mk_media_info ctx){ assert(ctx); MediaInfo *info = (MediaInfo *)ctx; return info->_app.c_str(); } + API_EXPORT const char* API_CALL mk_media_info_get_stream(const mk_media_info ctx){ assert(ctx); MediaInfo *info = (MediaInfo *)ctx; @@ -274,7 +288,7 @@ API_EXPORT void API_CALL mk_http_response_invoker_do_file(const mk_http_response assert(ctx && request_parser && response_header && response_file_path); auto header = get_http_header(response_header); HttpSession::HttpResponseInvoker *invoker = (HttpSession::HttpResponseInvoker *)ctx; - (*invoker).responseFile(((Parser*)(request_parser))->getValues(),header,response_file_path); + (*invoker).responseFile(((Parser *) (request_parser))->getHeader(), header, response_file_path); } API_EXPORT void API_CALL mk_http_response_invoker_do(const mk_http_response_invoker ctx, diff --git a/api/source/mk_tcp.cpp b/api/source/mk_tcp.cpp index ace6bd1d..4795141e 100644 --- a/api/source/mk_tcp.cpp +++ b/api/source/mk_tcp.cpp @@ -8,45 +8,56 @@ * may be found in the AUTHORS file in the root of the source tree. */ +#include "string.h" #include "mk_tcp.h" #include "mk_tcp_private.h" #include "Http/WebSocketClient.h" #include "Http/WebSocketSession.h" using namespace mediakit; +API_EXPORT const char* API_CALL mk_sock_info_peer_ip(const mk_sock_info ctx, char *buf){ + assert(ctx); + SockInfo *sock = (SockInfo *)ctx; + strcpy(buf,sock->get_peer_ip().c_str()); + return buf; +} +API_EXPORT const char* API_CALL mk_sock_info_local_ip(const mk_sock_info ctx, char *buf){ + assert(ctx); + SockInfo *sock = (SockInfo *)ctx; + strcpy(buf,sock->get_peer_ip().c_str()); + return buf; +} +API_EXPORT uint16_t API_CALL mk_sock_info_peer_port(const mk_sock_info ctx){ + assert(ctx); + SockInfo *sock = (SockInfo *)ctx; + return sock->get_peer_port(); +} +API_EXPORT uint16_t API_CALL mk_sock_info_local_port(const mk_sock_info ctx){ + assert(ctx); + SockInfo *sock = (SockInfo *)ctx; + return sock->get_local_port(); +} + //////////////////////////////////////////////////////////////////////////////////////// +API_EXPORT mk_sock_info API_CALL mk_tcp_session_get_sock_info(const mk_tcp_session ctx){ + assert(ctx); + TcpSessionForC *session = (TcpSessionForC *)ctx; + return (SockInfo *)session; +} + API_EXPORT void API_CALL mk_tcp_session_shutdown(const mk_tcp_session ctx,int err,const char *err_msg){ assert(ctx); - TcpSession *session = (TcpSession *)ctx; + TcpSessionForC *session = (TcpSessionForC *)ctx; session->safeShutdown(SockException((ErrCode)err,err_msg)); } -API_EXPORT const char* API_CALL mk_tcp_session_peer_ip(const mk_tcp_session ctx){ - assert(ctx); - TcpSession *session = (TcpSession *)ctx; - return session->get_peer_ip().c_str(); -} -API_EXPORT const char* API_CALL mk_tcp_session_local_ip(const mk_tcp_session ctx){ - assert(ctx); - TcpSession *session = (TcpSession *)ctx; - return session->get_local_ip().c_str(); -} -API_EXPORT uint16_t API_CALL mk_tcp_session_peer_port(const mk_tcp_session ctx){ - assert(ctx); - TcpSession *session = (TcpSession *)ctx; - return session->get_peer_port(); -} -API_EXPORT uint16_t API_CALL mk_tcp_session_local_port(const mk_tcp_session ctx){ - assert(ctx); - TcpSession *session = (TcpSession *)ctx; - return session->get_local_port(); -} + API_EXPORT void API_CALL mk_tcp_session_send(const mk_tcp_session ctx,const char *data,int len){ assert(ctx && data); if(!len){ len = strlen(data); } - TcpSession *session = (TcpSession *)ctx; - session->send(data,len); + TcpSessionForC *session = (TcpSessionForC *)ctx; + session->SockSender::send(data,len); } API_EXPORT void API_CALL mk_tcp_session_send_safe(const mk_tcp_session ctx,const char *data,int len){ @@ -55,12 +66,12 @@ API_EXPORT void API_CALL mk_tcp_session_send_safe(const mk_tcp_session ctx,const len = strlen(data); } try { - weak_ptr weak_session = ((TcpSession *)ctx)->shared_from_this(); + weak_ptr weak_session = ((TcpSessionForC *)ctx)->shared_from_this(); string str = string(data,len); - ((TcpSession *)ctx)->async([weak_session,str](){ + ((TcpSessionForC *)ctx)->async([weak_session,str](){ auto session_session = weak_session.lock(); if(session_session){ - session_session->send(str); + session_session->SockSender::send(str); } }); }catch (std::exception &ex){ @@ -205,6 +216,12 @@ TcpClientForC::Ptr *mk_tcp_client_create_l(mk_tcp_client_events *events, mk_tcp_ } } +API_EXPORT mk_sock_info API_CALL mk_tcp_client_get_sock_info(const mk_tcp_client ctx){ + assert(ctx); + TcpClientForC::Ptr *client = (TcpClientForC::Ptr *)ctx; + return (SockInfo *)client->get(); +} + API_EXPORT mk_tcp_client API_CALL mk_tcp_client_create(mk_tcp_client_events *events, mk_tcp_type type){ auto ret = mk_tcp_client_create_l(events,type); (*ret)->setClient(ret); @@ -213,25 +230,25 @@ API_EXPORT mk_tcp_client API_CALL mk_tcp_client_create(mk_tcp_client_events *eve API_EXPORT void API_CALL mk_tcp_client_release(mk_tcp_client ctx){ assert(ctx); - TcpClient::Ptr *client = (TcpClient::Ptr *)ctx; + TcpClientForC::Ptr *client = (TcpClientForC::Ptr *)ctx; delete client; } API_EXPORT void API_CALL mk_tcp_client_connect(mk_tcp_client ctx, const char *host, uint16_t port, float time_out_sec){ assert(ctx); - TcpClient::Ptr *client = (TcpClient::Ptr *)ctx; + TcpClientForC::Ptr *client = (TcpClientForC::Ptr *)ctx; (*client)->startConnect(host,port); } API_EXPORT void API_CALL mk_tcp_client_send(mk_tcp_client ctx, const char *data, int len){ assert(ctx && data); - TcpClient::Ptr *client = (TcpClient::Ptr *)ctx; - (*client)->send(data,len); + TcpClientForC::Ptr *client = (TcpClientForC::Ptr *)ctx; + (*client)->SockSender::send(data,len); } API_EXPORT void API_CALL mk_tcp_client_send_safe(mk_tcp_client ctx, const char *data, int len){ assert(ctx && data); - TcpClient::Ptr *client = (TcpClient::Ptr *)ctx; + TcpClientForC::Ptr *client = (TcpClientForC::Ptr *)ctx; weak_ptr weakClient = *client; Buffer::Ptr buf = (*client)->obtainBuffer(data,len); (*client)->async([weakClient,buf](){ diff --git a/api/source/mk_tcp_private.h b/api/source/mk_tcp_private.h index eba7401e..fd48fc3b 100644 --- a/api/source/mk_tcp_private.h +++ b/api/source/mk_tcp_private.h @@ -17,7 +17,7 @@ using namespace toolkit; class TcpClientForC : public TcpClient { - public: +public: typedef std::shared_ptr Ptr; TcpClientForC(mk_tcp_client_events *events) ; ~TcpClientForC() override ; @@ -27,13 +27,13 @@ class TcpClientForC : public TcpClient { void onConnect(const SockException &ex) override; void setClient(mk_tcp_client client); void *_user_data; - private: +private: mk_tcp_client_events _events; mk_tcp_client _client; }; class TcpSessionForC : public TcpSession { - public: +public: TcpSessionForC(const Socket::Ptr &pSock) ; ~TcpSessionForC() override = default; void onRecv(const Buffer::Ptr &buffer) override ; diff --git a/api/tests/server.c b/api/tests/server.c index b481f170..07eaf12a 100644 --- a/api/tests/server.c +++ b/api/tests/server.c @@ -46,14 +46,15 @@ void API_CALL on_mk_media_changed(int regist, */ void API_CALL on_mk_media_publish(const mk_media_info url_info, const mk_publish_auth_invoker invoker, - const mk_tcp_session sender) { + const mk_sock_info sender) { + char ip[64]; log_printf(LOG_LEV, "client info, local: %s:%d, peer: %s:%d\n" "%s/%s/%s/%s, url params: %s", - mk_tcp_session_local_ip(sender), - mk_tcp_session_local_port(sender), - mk_tcp_session_peer_ip(sender), - mk_tcp_session_peer_port(sender), + mk_sock_info_local_ip(sender,ip), + mk_sock_info_local_port(sender), + mk_sock_info_peer_ip(sender,ip + 32), + mk_sock_info_peer_port(sender), mk_media_info_get_schema(url_info), mk_media_info_get_vhost(url_info), mk_media_info_get_app(url_info), @@ -73,15 +74,16 @@ void API_CALL on_mk_media_publish(const mk_media_info url_info, */ void API_CALL on_mk_media_play(const mk_media_info url_info, const mk_auth_invoker invoker, - const mk_tcp_session sender) { + const mk_sock_info sender) { + char ip[64]; log_printf(LOG_LEV, "client info, local: %s:%d, peer: %s:%d\n" "%s/%s/%s/%s, url params: %s", - mk_tcp_session_local_ip(sender), - mk_tcp_session_local_port(sender), - mk_tcp_session_peer_ip(sender), - mk_tcp_session_peer_port(sender), + mk_sock_info_local_ip(sender,ip), + mk_sock_info_local_port(sender), + mk_sock_info_peer_ip(sender,ip + 32), + mk_sock_info_peer_port(sender), mk_media_info_get_schema(url_info), mk_media_info_get_vhost(url_info), mk_media_info_get_app(url_info), @@ -98,14 +100,15 @@ void API_CALL on_mk_media_play(const mk_media_info url_info, * @param sender 播放客户端相关信息 */ void API_CALL on_mk_media_not_found(const mk_media_info url_info, - const mk_tcp_session sender) { + const mk_sock_info sender) { + char ip[64]; log_printf(LOG_LEV, "client info, local: %s:%d, peer: %s:%d\n" "%s/%s/%s/%s, url params: %s", - mk_tcp_session_local_ip(sender), - mk_tcp_session_local_port(sender), - mk_tcp_session_peer_ip(sender), - mk_tcp_session_peer_port(sender), + mk_sock_info_local_ip(sender,ip), + mk_sock_info_local_port(sender), + mk_sock_info_peer_ip(sender,ip + 32), + mk_sock_info_peer_port(sender), mk_media_info_get_schema(url_info), mk_media_info_get_vhost(url_info), mk_media_info_get_app(url_info), @@ -137,17 +140,18 @@ void API_CALL on_mk_media_no_reader(const mk_media_source sender) { void API_CALL on_mk_http_request(const mk_parser parser, const mk_http_response_invoker invoker, int *consumed, - const mk_tcp_session sender) { + const mk_sock_info sender) { + char ip[64]; log_printf(LOG_LEV, "client info, local: %s:%d, peer: %s:%d\n" "%s %s?%s %s\n" "User-Agent: %s\n" "%s", - mk_tcp_session_local_ip(sender), - mk_tcp_session_local_port(sender), - mk_tcp_session_peer_ip(sender), - mk_tcp_session_peer_port(sender), + mk_sock_info_local_ip(sender,ip), + mk_sock_info_local_port(sender), + mk_sock_info_peer_ip(sender,ip + 32), + mk_sock_info_peer_port(sender), mk_parser_get_method(parser), mk_parser_get_url(parser), mk_parser_get_url_params(parser), @@ -191,17 +195,18 @@ void API_CALL on_mk_http_access(const mk_parser parser, const char *path, int is_dir, const mk_http_access_path_invoker invoker, - mk_tcp_session sender) { + const mk_sock_info sender) { + char ip[64]; log_printf(LOG_LEV, "client info, local: %s:%d, peer: %s:%d, path: %s ,is_dir: %d\n" "%s %s?%s %s\n" "User-Agent: %s\n" "%s", - mk_tcp_session_local_ip(sender), - mk_tcp_session_local_port(sender), - mk_tcp_session_peer_ip(sender), - mk_tcp_session_peer_port(sender), + mk_sock_info_local_ip(sender, ip), + mk_sock_info_local_port(sender), + mk_sock_info_peer_ip(sender, ip + 32), + mk_sock_info_peer_port(sender), path,(int)is_dir, mk_parser_get_method(parser), mk_parser_get_url(parser), @@ -223,16 +228,18 @@ void API_CALL on_mk_http_access(const mk_parser parser, */ void API_CALL on_mk_http_before_access(const mk_parser parser, char *path, - const mk_tcp_session sender) { + const mk_sock_info sender) { + + char ip[64]; log_printf(LOG_LEV, "client info, local: %s:%d, peer: %s:%d, path: %s\n" "%s %s?%s %s\n" "User-Agent: %s\n" "%s", - mk_tcp_session_local_ip(sender), - mk_tcp_session_local_port(sender), - mk_tcp_session_peer_ip(sender), - mk_tcp_session_peer_port(sender), + mk_sock_info_local_ip(sender,ip), + mk_sock_info_local_port(sender), + mk_sock_info_peer_ip(sender,ip + 32), + mk_sock_info_peer_port(sender), path, mk_parser_get_method(parser), mk_parser_get_url(parser), @@ -251,14 +258,15 @@ void API_CALL on_mk_http_before_access(const mk_parser parser, */ void API_CALL on_mk_rtsp_get_realm(const mk_media_info url_info, const mk_rtsp_get_realm_invoker invoker, - const mk_tcp_session sender) { + const mk_sock_info sender) { + char ip[64]; log_printf(LOG_LEV, "client info, local: %s:%d, peer: %s:%d\n" "%s/%s/%s/%s, url params: %s", - mk_tcp_session_local_ip(sender), - mk_tcp_session_local_port(sender), - mk_tcp_session_peer_ip(sender), - mk_tcp_session_peer_port(sender), + mk_sock_info_local_ip(sender,ip), + mk_sock_info_local_port(sender), + mk_sock_info_peer_ip(sender,ip + 32), + mk_sock_info_peer_port(sender), mk_media_info_get_schema(url_info), mk_media_info_get_vhost(url_info), mk_media_info_get_app(url_info), @@ -284,16 +292,17 @@ void API_CALL on_mk_rtsp_auth(const mk_media_info url_info, const char *user_name, int must_no_encrypt, const mk_rtsp_auth_invoker invoker, - const mk_tcp_session sender) { + const mk_sock_info sender) { + char ip[64]; log_printf(LOG_LEV, "client info, local: %s:%d, peer: %s:%d\n" "%s/%s/%s/%s, url params: %s\n" "realm: %s, user_name: %s, must_no_encrypt: %d", - mk_tcp_session_local_ip(sender), - mk_tcp_session_local_port(sender), - mk_tcp_session_peer_ip(sender), - mk_tcp_session_peer_port(sender), + mk_sock_info_local_ip(sender,ip), + mk_sock_info_local_port(sender), + mk_sock_info_peer_ip(sender,ip + 32), + mk_sock_info_peer_port(sender), mk_media_info_get_schema(url_info), mk_media_info_get_vhost(url_info), mk_media_info_get_app(url_info), @@ -338,13 +347,15 @@ void API_CALL on_mk_record_mp4(const mk_mp4_info mp4) { void API_CALL on_mk_shell_login(const char *user_name, const char *passwd, const mk_auth_invoker invoker, - const mk_tcp_session sender) { + const mk_sock_info sender) { + + char ip[64]; log_printf(LOG_LEV,"client info, local: %s:%d, peer: %s:%d\n" "user_name: %s, passwd: %s", - mk_tcp_session_local_ip(sender), - mk_tcp_session_local_port(sender), - mk_tcp_session_peer_ip(sender), - mk_tcp_session_peer_port(sender), + mk_sock_info_local_ip(sender,ip), + mk_sock_info_local_port(sender), + mk_sock_info_peer_ip(sender,ip + 32), + mk_sock_info_peer_port(sender), user_name, passwd); //允许登录shell mk_auth_invoker_do(invoker, NULL); @@ -363,8 +374,8 @@ void API_CALL on_mk_flow_report(const mk_media_info url_info, uint64_t total_bytes, uint64_t total_seconds, int is_player, - const char *peer_ip, - uint16_t peer_port) { + const mk_sock_info sender) { + char ip[64]; log_printf(LOG_LEV,"%s/%s/%s/%s, url params: %s," "total_bytes: %d, total_seconds: %d, is_player: %d, peer_ip:%s, peer_port:%d", mk_media_info_get_schema(url_info), @@ -372,7 +383,11 @@ void API_CALL on_mk_flow_report(const mk_media_info url_info, mk_media_info_get_app(url_info), mk_media_info_get_stream(url_info), mk_media_info_get_params(url_info), - (int)total_bytes, (int)total_seconds, (int)is_player,peer_ip, (int)peer_port); + (int)total_bytes, + (int)total_seconds, + (int)is_player, + mk_sock_info_peer_ip(sender,ip), + (int)mk_sock_info_peer_port(sender)); } static int flag = 1; @@ -387,6 +402,8 @@ int main(int argc, char *argv[]) { .ini = ini_path, .ini_is_path = 1, .log_level = 0, + .log_file_path = NULL, + .log_file_days = 0, .ssl = ssl_path, .ssl_is_path = 1, .ssl_pwd = NULL, diff --git a/api/tests/websocket.c b/api/tests/websocket.c index 86e5c286..a2427275 100644 --- a/api/tests/websocket.c +++ b/api/tests/websocket.c @@ -39,7 +39,8 @@ typedef struct { * @param session 会话处理对象 */ void API_CALL on_mk_tcp_session_create(uint16_t server_port,mk_tcp_session session){ - log_printf(LOG_LEV,"%s %d",mk_tcp_session_peer_ip(session),(int)mk_tcp_session_peer_port(session)); + char ip[64]; + log_printf(LOG_LEV,"%s %d",mk_tcp_session_peer_ip(session,ip),(int)mk_tcp_session_peer_port(session)); tcp_session_user_data *user_data = malloc(sizeof(tcp_session_user_data)); user_data->_session = session; mk_tcp_session_set_user_data(session,user_data); @@ -52,7 +53,8 @@ void API_CALL on_mk_tcp_session_create(uint16_t server_port,mk_tcp_session sessi * @param len 数据长度 */ void API_CALL on_mk_tcp_session_data(uint16_t server_port,mk_tcp_session session,const char *data,int len){ - log_printf(LOG_LEV,"from %s %d, data[%d]: %s",mk_tcp_session_peer_ip(session),(int)mk_tcp_session_peer_port(session), len,data); + char ip[64]; + log_printf(LOG_LEV,"from %s %d, data[%d]: %s",mk_tcp_session_peer_ip(session,ip),(int)mk_tcp_session_peer_port(session), len,data); mk_tcp_session_send(session,"echo:",0); mk_tcp_session_send(session,data,len); } @@ -62,7 +64,8 @@ void API_CALL on_mk_tcp_session_data(uint16_t server_port,mk_tcp_session session * @param session 会话处理对象 */ void API_CALL on_mk_tcp_session_manager(uint16_t server_port,mk_tcp_session session){ - log_printf(LOG_LEV,"%s %d",mk_tcp_session_peer_ip(session),(int)mk_tcp_session_peer_port(session)); + char ip[64]; + log_printf(LOG_LEV,"%s %d",mk_tcp_session_peer_ip(session,ip),(int)mk_tcp_session_peer_port(session)); } /** @@ -73,7 +76,8 @@ void API_CALL on_mk_tcp_session_manager(uint16_t server_port,mk_tcp_session sess * @param msg 错误提示 */ void API_CALL on_mk_tcp_session_disconnect(uint16_t server_port,mk_tcp_session session,int code,const char *msg){ - log_printf(LOG_LEV,"%s %d: %d %s",mk_tcp_session_peer_ip(session),(int)mk_tcp_session_peer_port(session),code,msg); + char ip[64]; + log_printf(LOG_LEV,"%s %d: %d %s",mk_tcp_session_peer_ip(session,ip),(int)mk_tcp_session_peer_port(session),code,msg); tcp_session_user_data *user_data = (tcp_session_user_data *)mk_tcp_session_get_user_data(session); free(user_data); } diff --git a/server/Process.cpp b/server/Process.cpp index 1eeb8081..79870f14 100644 --- a/server/Process.cpp +++ b/server/Process.cpp @@ -75,7 +75,7 @@ void Process::run(const string &cmd, const string &log_file_tmp) { int log_fd = -1; int flags = O_CREAT | O_WRONLY | O_APPEND; mode_t mode = S_IRWXO | S_IRWXG | S_IRWXU;// S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH; - File::createfile_path(log_file.data(), mode); + File::create_path(log_file.data(), mode); if ((log_fd = ::open(log_file.c_str(), flags, mode)) < 0) { fprintf(stderr, "open log file %s failed:%d(%s)\r\n", log_file.data(), errno, strerror(errno)); } diff --git a/server/WebApi.cpp b/server/WebApi.cpp index f932dd66..47bb2842 100644 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -12,6 +12,7 @@ #include #include #include +#include #include "jsoncpp/json.h" #include "Util/util.h" #include "Util/logger.h" @@ -86,7 +87,7 @@ public: ~SuccessException() = default; }; -#define API_ARGS1 TcpSession &sender,HttpSession::KeyValue &headerIn, HttpSession::KeyValue &headerOut, ApiArgsType &allArgs, Json::Value &val +#define API_ARGS1 SockInfo &sender,HttpSession::KeyValue &headerIn, HttpSession::KeyValue &headerOut, ApiArgsType &allArgs, Json::Value &val #define API_ARGS2 API_ARGS1, const HttpSession::HttpResponseInvoker &invoker #define API_ARGS_VALUE1 sender,headerIn,headerOut,allArgs,val #define API_ARGS_VALUE2 API_ARGS_VALUE1, invoker @@ -154,7 +155,7 @@ static inline void addHttpListener(){ val["code"] = API::Success; HttpSession::KeyValue headerOut; auto allArgs = getAllArgs(parser); - HttpSession::KeyValue &headerIn = parser.getValues(); + HttpSession::KeyValue &headerIn = parser.getHeader(); GET_CONFIG(string,charSet,Http::kCharSet); headerOut["Content-Type"] = StrPrinter << "application/json; charset=" << charSet; if(api_debug){ @@ -372,6 +373,44 @@ void installWebApi() { #endif//#if !defined(_WIN32) + static auto makeMediaSourceJson = [](const MediaSource::Ptr &media){ + Value item; + item["schema"] = media->getSchema(); + item["vhost"] = media->getVhost(); + item["app"] = media->getApp(); + item["stream"] = media->getId(); + item["readerCount"] = media->readerCount(); + item["totalReaderCount"] = media->totalReaderCount(); + for(auto &track : media->getTracks()){ + Value obj; + auto codec_type = track->getTrackType(); + obj["codec_id"] = track->getCodecId(); + obj["codec_id_name"] = track->getCodecName(); + obj["ready"] = track->ready(); + obj["codec_type"] = codec_type; + switch(codec_type){ + case TrackAudio : { + auto audio_track = dynamic_pointer_cast(track); + obj["sample_rate"] = audio_track->getAudioSampleRate(); + obj["channels"] = audio_track->getAudioChannel(); + obj["sample_bit"] = audio_track->getAudioSampleBit(); + break; + } + case TrackVideo : { + auto video_track = dynamic_pointer_cast(track); + obj["width"] = video_track->getVideoWidth(); + obj["height"] = video_track->getVideoHeight(); + obj["fps"] = round(video_track->getVideoFps()); + break; + } + default: + break; + } + item["tracks"].append(obj); + } + return item; + }; + //获取流列表,可选筛选参数 //测试url0(获取所有流) http://127.0.0.1/index/api/getMediaList //测试url1(获取虚拟主机为"__defaultVost__"的流) http://127.0.0.1/index/api/getMediaList?vhost=__defaultVost__ @@ -389,21 +428,7 @@ void installWebApi() { if(!allArgs["app"].empty() && allArgs["app"] != media->getApp()){ return; } - Value item; - item["schema"] = media->getSchema(); - item["vhost"] = media->getVhost(); - item["app"] = media->getApp(); - item["stream"] = media->getId(); - item["readerCount"] = media->readerCount(); - item["totalReaderCount"] = media->totalReaderCount(); - for(auto &track : media->getTracks()){ - Value obj; - obj["codec_id"] = track->getCodecId(); - obj["codec_type"] = track->getTrackType(); - obj["ready"] = track->ready(); - item["tracks"].append(obj); - } - val["data"].append(item); + val["data"].append(makeMediaSourceJson(media)); }); }); @@ -423,16 +448,9 @@ void installWebApi() { val["online"] = false; return; } + val = makeMediaSourceJson(src); val["online"] = true; - val["readerCount"] = src->readerCount(); - val["totalReaderCount"] = src->totalReaderCount(); - for(auto &track : src->getTracks()){ - Value obj; - obj["codec_id"] = track->getCodecId(); - obj["codec_type"] = track->getTrackType(); - obj["ready"] = track->ready(); - val["tracks"].append(obj); - } + val["code"] = API::Success; }); //主动关断流,包括关断拉流、推流 diff --git a/server/WebHook.cpp b/server/WebHook.cpp index 113f2b41..471db2ba 100644 --- a/server/WebHook.cpp +++ b/server/WebHook.cpp @@ -253,16 +253,16 @@ void installWebHook(){ }); NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastFlowReport,[](BroadcastFlowReportArgs){ - if(!hook_enable || args._param_strs == hook_adminparams || hook_flowreport.empty() || peerIP == "127.0.0.1"){ + if(!hook_enable || args._param_strs == hook_adminparams || hook_flowreport.empty() || sender.get_peer_ip() == "127.0.0.1"){ return; } auto body = make_json(args); body["totalBytes"] = (Json::UInt64)totalBytes; body["duration"] = (Json::UInt64)totalDuration; body["player"] = isPlayer; - body["ip"] = peerIP; - body["port"] = peerPort; - body["id"] = sessionIdentifier; + body["ip"] = sender.get_peer_ip(); + body["port"] = sender.get_peer_port(); + body["id"] = sender.getIdentifier(); //执行hook do_http_hook(hook_flowreport,body, nullptr); }); @@ -444,7 +444,7 @@ void installWebHook(){ body["path"] = path; body["is_dir"] = is_dir; body["params"] = parser.Params(); - for(auto &pr : parser.getValues()){ + for(auto &pr : parser.getHeader()){ body[string("header.") + pr.first] = pr.second; } //执行hook diff --git a/src/Common/MediaSink.cpp b/src/Common/MediaSink.cpp index 53697a56..acb5e7da 100644 --- a/src/Common/MediaSink.cpp +++ b/src/Common/MediaSink.cpp @@ -25,36 +25,30 @@ void MediaSink::addTrack(const Track::Ptr &track_in) { auto track = track_in->clone(); auto codec_id = track->getCodecId(); _track_map[codec_id] = track; - _allTrackReady = false; - _trackReadyCallback[codec_id] = [this, track]() { + _all_track_ready = false; + _track_ready_callback[codec_id] = [this, track]() { onTrackReady(track); }; _ticker.resetTime(); track->addDelegate(std::make_shared([this](const Frame::Ptr &frame) { - if (_allTrackReady) { + if (_all_track_ready) { onTrackFrame(frame); - return; - } - - //还有track未准备好,如果是视频的话,如果直接丢帧可能导致丢失I帧 - checkTrackIfReady(nullptr); - if (_allTrackReady) { - //运行至这里说明Track状态由未就绪切换为已就绪状态,那么这帧就不应该丢弃 - onTrackFrame(frame); - } else if(frame->keyFrame()){ - WarnL << "some track is unready,drop key frame of: " << frame->getCodecName(); + } else { + //还有Track未就绪,先缓存之 + _frame_unread[frame->getCodecId()].emplace_back(Frame::getCacheAbleFrame(frame)); } })); } void MediaSink::resetTracks() { lock_guard lck(_mtx); - _allTrackReady = false; + _all_track_ready = false; _track_map.clear(); - _trackReadyCallback.clear(); + _track_ready_callback.clear(); _ticker.resetTime(); _max_track_size = 2; + _frame_unread.clear(); } void MediaSink::inputFrame(const Frame::Ptr &frame) { @@ -63,21 +57,21 @@ void MediaSink::inputFrame(const Frame::Ptr &frame) { if (it == _track_map.end()) { return; } - checkTrackIfReady(it->second); it->second->inputFrame(frame); + checkTrackIfReady(nullptr); } void MediaSink::checkTrackIfReady_l(const Track::Ptr &track){ //Track由未就绪状态转换成就绪状态,我们就触发onTrackReady回调 - auto it_callback = _trackReadyCallback.find(track->getCodecId()); - if (it_callback != _trackReadyCallback.end() && track->ready()) { + auto it_callback = _track_ready_callback.find(track->getCodecId()); + if (it_callback != _track_ready_callback.end() && track->ready()) { it_callback->second(); - _trackReadyCallback.erase(it_callback); + _track_ready_callback.erase(it_callback); } } void MediaSink::checkTrackIfReady(const Track::Ptr &track){ - if (!_allTrackReady && !_trackReadyCallback.empty()) { + if (!_all_track_ready && !_track_ready_callback.empty()) { if (track) { checkTrackIfReady_l(track); } else { @@ -87,14 +81,14 @@ void MediaSink::checkTrackIfReady(const Track::Ptr &track){ } } - if(!_allTrackReady){ + if(!_all_track_ready){ if(_ticker.elapsedTime() > MAX_WAIT_MS_READY){ //如果超过规定时间,那么不再等待并忽略未准备好的Track emitAllTrackReady(); return; } - if(!_trackReadyCallback.empty()){ + if(!_track_ready_callback.empty()){ //在超时时间内,如果存在未准备好的Track,那么继续等待 return; } @@ -114,22 +108,20 @@ void MediaSink::checkTrackIfReady(const Track::Ptr &track){ } void MediaSink::addTrackCompleted(){ - { - lock_guard lck(_mtx); - _max_track_size = _track_map.size(); - } + lock_guard lck(_mtx); + _max_track_size = _track_map.size(); checkTrackIfReady(nullptr); } void MediaSink::emitAllTrackReady() { - if (_allTrackReady) { + if (_all_track_ready) { return; } DebugL << "all track ready use " << _ticker.elapsedTime() << "ms"; - if (!_trackReadyCallback.empty()) { + if (!_track_ready_callback.empty()) { //这是超时强制忽略未准备好的Track - _trackReadyCallback.clear(); + _track_ready_callback.clear(); //移除未准备好的Track for (auto it = _track_map.begin(); it != _track_map.end();) { if (!it->second->ready()) { @@ -143,8 +135,20 @@ void MediaSink::emitAllTrackReady() { if (!_track_map.empty()) { //最少有一个有效的Track - _allTrackReady = true; + _all_track_ready = true; onAllTrackReady(); + + //全部Track就绪,我们一次性把之前的帧输出 + for(auto &pr : _frame_unread){ + if (_track_map.find(pr.first) == _track_map.end()) { + //该Track已经被移除 + continue; + } + pr.second.for_each([&](const Frame::Ptr &frame) { + onTrackFrame(frame); + }); + } + _frame_unread.clear(); } } diff --git a/src/Common/MediaSink.h b/src/Common/MediaSink.h index 7874092e..ea7e3408 100644 --- a/src/Common/MediaSink.h +++ b/src/Common/MediaSink.h @@ -114,8 +114,9 @@ private: private: mutable recursive_mutex _mtx; unordered_map _track_map; - unordered_map > _trackReadyCallback; - bool _allTrackReady = false; + unordered_map > _frame_unread; + unordered_map > _track_ready_callback; + bool _all_track_ready = false; Ticker _ticker; int _max_track_size = 2; }; diff --git a/src/Common/MediaSource.cpp b/src/Common/MediaSource.cpp index 1947900a..3c41c0ec 100644 --- a/src/Common/MediaSource.cpp +++ b/src/Common/MediaSource.cpp @@ -8,6 +8,7 @@ * may be found in the AUTHORS file in the root of the source tree. */ +#include #include "MediaSource.h" #include "Record/MP4Reader.h" #include "Util/util.h" @@ -190,7 +191,7 @@ void findAsync_l(const MediaInfo &info, const std::shared_ptr &sessi void *listener_tag = session.get(); weak_ptr weakSession = session; //广播未找到流,此时可以立即去拉流,这样还来得及 - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastNotFoundStream,info,*session); + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastNotFoundStream,info, static_cast(*session)); //最多等待一定时间,如果这个时间内,流未注册上,那么返回未找到流 GET_CONFIG(int,maxWaitMS,General::kMaxStreamWaitTimeMS); @@ -293,7 +294,34 @@ void MediaSource::regist() { lock_guard lock(g_mtxMediaSrc); g_mapMediaSrc[_strSchema][_strVhost][_strApp][_strId] = shared_from_this(); } - InfoL << _strSchema << " " << _strVhost << " " << _strApp << " " << _strId; + _StrPrinter codec_info; + auto tracks = getTracks(true); + for(auto &track : tracks) { + auto codec_type = track->getTrackType(); + codec_info << track->getCodecName(); + switch (codec_type) { + case TrackAudio : { + auto audio_track = dynamic_pointer_cast(track); + codec_info << "[" + << audio_track->getAudioSampleRate() << "/" + << audio_track->getAudioChannel() << "/" + << audio_track->getAudioSampleBit() << "] "; + break; + } + case TrackVideo : { + auto video_track = dynamic_pointer_cast(track); + codec_info << "[" + << video_track->getVideoWidth() << "/" + << video_track->getVideoHeight() << "/" + << round(video_track->getVideoFps()) << "] "; + break; + } + default: + break; + } + } + + InfoL << _strSchema << " " << _strVhost << " " << _strApp << " " << _strId << " " << codec_info; NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaChanged, true, *this); } @@ -319,7 +347,7 @@ bool MediaSource::unregist() { } if(ret){ - InfoL << "" << _strSchema << " " << _strVhost << " " << _strApp << " " << _strId; + InfoL << _strSchema << " " << _strVhost << " " << _strApp << " " << _strId; NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaChanged, false, *this); } return ret; @@ -486,14 +514,22 @@ static bool isFlushAble_merge(bool is_audio, uint32_t last_stamp, uint32_t new_s return cache_size > 20; } -bool FlushPolicy::isFlushAble(uint32_t last_stamp, uint32_t new_stamp, int cache_size) { - GET_CONFIG(bool,ultraLowDelay, General::kUltraLowDelay); - GET_CONFIG(int,mergeWriteMS, General::kMergeWriteMS); - if(ultraLowDelay || mergeWriteMS <= 0){ +bool FlushPolicy::isFlushAble(uint32_t new_stamp, int cache_size) { + bool ret = false; + GET_CONFIG(bool, ultraLowDelay, General::kUltraLowDelay); + GET_CONFIG(int, mergeWriteMS, General::kMergeWriteMS); + if (ultraLowDelay || mergeWriteMS <= 0) { //关闭了合并写或者合并写阈值小于等于0 - return isFlushAble_default(_is_audio, last_stamp, new_stamp, cache_size); + ret = isFlushAble_default(_is_audio, _last_stamp, new_stamp, cache_size); + } else { + ret = isFlushAble_merge(_is_audio, _last_stamp, new_stamp, cache_size, mergeWriteMS); } - return isFlushAble_merge(_is_audio, last_stamp, new_stamp, cache_size,mergeWriteMS); + + if (ret) { +// DebugL << _is_audio << " " << _last_stamp << " " << new_stamp; + _last_stamp = new_stamp; + } + return ret; } } /* namespace mediakit */ \ No newline at end of file diff --git a/src/Common/MediaSource.h b/src/Common/MediaSource.h index c743d574..ab0a0cfa 100644 --- a/src/Common/MediaSource.h +++ b/src/Common/MediaSource.h @@ -173,9 +173,10 @@ public: return packet->timeStamp; } - bool isFlushAble(uint32_t last_stamp, uint32_t new_stamp, int cache_size); + bool isFlushAble(uint32_t new_stamp, int cache_size); private: bool _is_audio; + uint32_t _last_stamp= 0; }; /// 视频合并写缓存模板 @@ -185,21 +186,19 @@ private: template > > class VideoPacketCache { public: - VideoPacketCache() : _policy(true) { + VideoPacketCache() : _policy(false) { _cache = std::make_shared(); } virtual ~VideoPacketCache() = default; void inputVideo(const std::shared_ptr &rtp, bool key_pos) { - auto new_stamp = _policy.getStamp(rtp); - if (_policy.isFlushAble(_last_stamp, new_stamp, _cache->size())) { + if (_policy.isFlushAble(_policy.getStamp(rtp), _cache->size())) { flushAll(); } //追加数据到最后 _cache->emplace_back(rtp); - _last_stamp = new_stamp; if (key_pos) { _key_pos = key_pos; } @@ -220,7 +219,6 @@ private: private: policy _policy; std::shared_ptr _cache; - uint32_t _last_stamp = 0; bool _key_pos = false; }; @@ -231,20 +229,18 @@ private: template > > class AudioPacketCache { public: - AudioPacketCache() : _policy(false) { + AudioPacketCache() : _policy(true) { _cache = std::make_shared(); } virtual ~AudioPacketCache() = default; void inputAudio(const std::shared_ptr &rtp) { - auto new_stamp = _policy.getStamp(rtp); - if (_policy.isFlushAble(_last_stamp, new_stamp, _cache->size())) { + if (_policy.isFlushAble(_policy.getStamp(rtp), _cache->size())) { flushAll(); } //追加数据到最后 _cache->emplace_back(rtp); - _last_stamp = new_stamp; } virtual void onFlushAudio(std::shared_ptr &) = 0; @@ -261,7 +257,6 @@ private: private: policy _policy; std::shared_ptr _cache; - uint32_t _last_stamp = 0; }; } /* namespace mediakit */ diff --git a/src/Common/Parser.cpp b/src/Common/Parser.cpp index 158201cd..1c866a64 100644 --- a/src/Common/Parser.cpp +++ b/src/Common/Parser.cpp @@ -35,4 +35,113 @@ string FindField(const char* buf, const char* start, const char *end ,int bufSiz return string(msg_start, msg_end); } +Parser::Parser() {} +Parser::~Parser() {} + +void Parser::Parse(const char *buf) { + //解析 + const char *start = buf; + Clear(); + while (true) { + auto line = FindField(start, NULL, "\r\n"); + if (line.size() == 0) { + break; + } + if (start == buf) { + _strMethod = FindField(line.data(), NULL, " "); + _strFullUrl = FindField(line.data(), " ", " "); + auto args_pos = _strFullUrl.find('?'); + if (args_pos != string::npos) { + _strUrl = _strFullUrl.substr(0, args_pos); + _params = _strFullUrl.substr(args_pos + 1); + _mapUrlArgs = parseArgs(_params); + } else { + _strUrl = _strFullUrl; + } + _strTail = FindField(line.data(), (_strFullUrl + " ").data(), NULL); + } else { + auto field = FindField(line.data(), NULL, ": "); + auto value = FindField(line.data(), ": ", NULL); + if (field.size() != 0) { + _mapHeaders.emplace_force(field, value); + } + } + start = start + line.size() + 2; + if (strncmp(start, "\r\n", 2) == 0) { //协议解析完毕 + _strContent = FindField(start, "\r\n", NULL); + break; + } + } +} + +const string &Parser::Method() const { + return _strMethod; +} + +const string &Parser::Url() const { + return _strUrl; +} + +const string &Parser::FullUrl() const { + return _strFullUrl; +} + +const string &Parser::Tail() const { + return _strTail; +} + +const string &Parser::operator[](const char *name) const { + auto it = _mapHeaders.find(name); + if (it == _mapHeaders.end()) { + return _strNull; + } + return it->second; +} + +const string &Parser::Content() const { + return _strContent; +} + +void Parser::Clear() { + _strMethod.clear(); + _strUrl.clear(); + _strFullUrl.clear(); + _params.clear(); + _strTail.clear(); + _strContent.clear(); + _mapHeaders.clear(); + _mapUrlArgs.clear(); +} + +const string &Parser::Params() const { + return _params; +} + +void Parser::setUrl(const string &url) { + this->_strUrl = url; +} + +void Parser::setContent(const string &content) { + this->_strContent = content; +} + +StrCaseMap &Parser::getHeader() const { + return _mapHeaders; +} + +StrCaseMap &Parser::getUrlArgs() const { + return _mapUrlArgs; +} + +StrCaseMap Parser::parseArgs(const string &str, const char *pair_delim, const char *key_delim) { + StrCaseMap ret; + auto arg_vec = split(str, pair_delim); + for (string &key_val : arg_vec) { + auto key = FindField(key_val.data(), NULL, key_delim); + auto val = FindField(key_val.data(), key_delim, NULL); + ret.emplace_force(trim(key), trim(val)); + } + return ret; +} + }//namespace mediakit \ No newline at end of file diff --git a/src/Common/Parser.h b/src/Common/Parser.h index 3630f8cd..00d2a4cb 100644 --- a/src/Common/Parser.h +++ b/src/Common/Parser.h @@ -57,122 +57,39 @@ class StrCaseMap : public multimap{ } }; +//rtsp/http/sip解析类 class Parser { - public: - Parser() {} - - virtual ~Parser() {} - - void Parse(const char *buf) { - //解析 - const char *start = buf; - Clear(); - while (true) { - auto line = FindField(start, NULL, "\r\n"); - if (line.size() == 0) { - break; - } - if (start == buf) { - _strMethod = FindField(line.data(), NULL, " "); - _strFullUrl = FindField(line.data(), " ", " "); - auto args_pos = _strFullUrl.find('?'); - if (args_pos != string::npos) { - _strUrl = _strFullUrl.substr(0, args_pos); - _params = _strFullUrl.substr(args_pos + 1); - _mapUrlArgs = parseArgs(_params); - } else { - _strUrl = _strFullUrl; - } - _strTail = FindField(line.data(), (_strFullUrl + " ").data(), NULL); - } else { - auto field = FindField(line.data(), NULL, ": "); - auto value = FindField(line.data(), ": ", NULL); - if (field.size() != 0) { - _mapHeaders.emplace_force(field,value); - } - } - start = start + line.size() + 2; - if (strncmp(start, "\r\n", 2) == 0) { //协议解析完毕 - _strContent = FindField(start, "\r\n", NULL); - break; - } - } - } - - const string &Method() const { - //rtsp方法 - return _strMethod; - } - - const string &Url() const { - //rtsp url - return _strUrl; - } - - const string &FullUrl() const { - //rtsp url with args - return _strFullUrl; - } - - const string &Tail() const { - //RTSP/1.0 - return _strTail; - } - - const string &operator[](const char *name) const { - //rtsp field - auto it = _mapHeaders.find(name); - if (it == _mapHeaders.end()) { - return _strNull; - } - return it->second; - } - - const string &Content() const { - return _strContent; - } - - void Clear() { - _strMethod.clear(); - _strUrl.clear(); - _strFullUrl.clear(); - _params.clear(); - _strTail.clear(); - _strContent.clear(); - _mapHeaders.clear(); - _mapUrlArgs.clear(); - } - const string &Params() const { - return _params; - } - - void setUrl(const string &url) { - this->_strUrl = url; - } - - void setContent(const string &content) { - this->_strContent = content; - } - - StrCaseMap &getValues() const { - return _mapHeaders; - } - - StrCaseMap &getUrlArgs() const { - return _mapUrlArgs; - } - - static StrCaseMap parseArgs(const string &str, const char *pair_delim = "&", const char *key_delim = "=") { - StrCaseMap ret; - auto arg_vec = split(str, pair_delim); - for (string &key_val : arg_vec) { - auto key = FindField(key_val.data(), NULL, key_delim); - auto val = FindField(key_val.data(), key_delim, NULL); - ret.emplace_force(trim(key),trim(val)); - } - return ret; - } - +public: + Parser(); + ~Parser(); + //解析信令 + void Parse(const char *buf); + //获取命令字 + const string &Method() const; + //获取中间url,不包含?后面的参数 + const string &Url() const; + //获取中间url,包含?后面的参数 + const string &FullUrl() const; + //获取命令协议名 + const string &Tail() const; + //根据header key名,获取请求header value值 + const string &operator[](const char *name) const; + //获取http body或sdp + const string &Content() const; + //清空,为了重用 + void Clear(); + //获取?后面的参数 + const string &Params() const; + //重新设置url + void setUrl(const string &url); + //重新设置content + void setContent(const string &content); + //获取header列表 + StrCaseMap &getHeader() const; + //获取url参数列表 + StrCaseMap &getUrlArgs() const; + //解析?后面的参数 + static StrCaseMap parseArgs(const string &str, const char *pair_delim = "&", const char *key_delim = "="); private: string _strMethod; string _strUrl; diff --git a/src/Common/config.h b/src/Common/config.h index 2c681e05..aa42acaa 100644 --- a/src/Common/config.h +++ b/src/Common/config.h @@ -62,25 +62,25 @@ extern const string kBroadcastRecordMP4; //收到http api请求广播 extern const string kBroadcastHttpRequest; -#define BroadcastHttpRequestArgs const Parser &parser,const HttpSession::HttpResponseInvoker &invoker,bool &consumed,TcpSession &sender +#define BroadcastHttpRequestArgs const Parser &parser,const HttpSession::HttpResponseInvoker &invoker,bool &consumed,SockInfo &sender //在http文件服务器中,收到http访问文件或目录的广播,通过该事件控制访问http目录的权限 extern const string kBroadcastHttpAccess; -#define BroadcastHttpAccessArgs const Parser &parser,const string &path,const bool &is_dir,const HttpSession::HttpAccessPathInvoker &invoker,TcpSession &sender +#define BroadcastHttpAccessArgs const Parser &parser,const string &path,const bool &is_dir,const HttpSession::HttpAccessPathInvoker &invoker,SockInfo &sender //在http文件服务器中,收到http访问文件或目录前的广播,通过该事件可以控制http url到文件路径的映射 //在该事件中通过自行覆盖path参数,可以做到譬如根据虚拟主机或者app选择不同http根目录的目的 extern const string kBroadcastHttpBeforeAccess; -#define BroadcastHttpBeforeAccessArgs const Parser &parser,string &path,TcpSession &sender +#define BroadcastHttpBeforeAccessArgs const Parser &parser,string &path,SockInfo &sender //该流是否需要认证?是的话调用invoker并传入realm,否则传入空的realm.如果该事件不监听则不认证 extern const string kBroadcastOnGetRtspRealm; -#define BroadcastOnGetRtspRealmArgs const MediaInfo &args,const RtspSession::onGetRealm &invoker,TcpSession &sender +#define BroadcastOnGetRtspRealmArgs const MediaInfo &args,const RtspSession::onGetRealm &invoker,SockInfo &sender //请求认证用户密码事件,user_name为用户名,must_no_encrypt如果为true,则必须提供明文密码(因为此时是base64认证方式),否则会导致认证失败 //获取到密码后请调用invoker并输入对应类型的密码和密码类型,invoker执行时会匹配密码 extern const string kBroadcastOnRtspAuth; -#define BroadcastOnRtspAuthArgs const MediaInfo &args,const string &realm,const string &user_name,const bool &must_no_encrypt,const RtspSession::onAuth &invoker,TcpSession &sender +#define BroadcastOnRtspAuthArgs const MediaInfo &args,const string &realm,const string &user_name,const bool &must_no_encrypt,const RtspSession::onAuth &invoker,SockInfo &sender //推流鉴权结果回调对象 //如果errMessage为空则代表鉴权成功 @@ -91,7 +91,7 @@ typedef std::function AuthInvoker; //播放rtsp/rtmp/http-flv事件广播,通过该事件控制播放鉴权 extern const string kBroadcastMediaPlayed; -#define BroadcastMediaPlayedArgs const MediaInfo &args,const Broadcast::AuthInvoker &invoker,TcpSession &sender +#define BroadcastMediaPlayedArgs const MediaInfo &args,const Broadcast::AuthInvoker &invoker,SockInfo &sender //shell登录鉴权 extern const string kBroadcastShellLogin; -#define BroadcastShellLoginArgs const string &user_name,const string &passwd,const Broadcast::AuthInvoker &invoker,TcpSession &sender +#define BroadcastShellLoginArgs const string &user_name,const string &passwd,const Broadcast::AuthInvoker &invoker,SockInfo &sender //停止rtsp/rtmp/http-flv会话后流量汇报事件广播 extern const string kBroadcastFlowReport; -#define BroadcastFlowReportArgs const MediaInfo &args,const uint64_t &totalBytes,const uint64_t &totalDuration,const bool &isPlayer, const string &sessionIdentifier, const string &peerIP,const uint16_t &peerPort +#define BroadcastFlowReportArgs const MediaInfo &args,const uint64_t &totalBytes,const uint64_t &totalDuration,const bool &isPlayer, SockInfo &sender //未找到流后会广播该事件,请在监听该事件后去拉流或其他方式产生流,这样就能按需拉流了 extern const string kBroadcastNotFoundStream; -#define BroadcastNotFoundStreamArgs const MediaInfo &args,TcpSession &sender +#define BroadcastNotFoundStreamArgs const MediaInfo &args,SockInfo &sender //某个流无人消费时触发,目的为了实现无人观看时主动断开拉流等业务逻辑 extern const string kBroadcastStreamNoneReader; diff --git a/src/Extension/AAC.h b/src/Extension/AAC.h index 547da59c..faf69198 100644 --- a/src/Extension/AAC.h +++ b/src/Extension/AAC.h @@ -223,13 +223,13 @@ public: * @param frame 数据帧 */ void inputFrame(const Frame::Ptr &frame) override{ - if(_cfg.empty()){ + if (_cfg.empty()) { //未获取到aac_cfg信息 - if(frame->prefixSize() >= 7) { + if (frame->prefixSize() >= 7) { //7个字节的adts头 - _cfg = makeAdtsConfig(reinterpret_cast(frame->data())); + _cfg = makeAdtsConfig((uint8_t *)(frame->data())); onReady(); - }else{ + } else { WarnL << "无法获取adts头!"; } } diff --git a/src/Extension/AACRtmp.cpp b/src/Extension/AACRtmp.cpp index 6a22e45e..2b1d3bd2 100644 --- a/src/Extension/AACRtmp.cpp +++ b/src/Extension/AACRtmp.cpp @@ -93,7 +93,7 @@ void AACRtmpEncoder::inputFrame(const Frame::Ptr &frame) { if (_aac_cfg.empty()) { if (frame->prefixSize() >= 7) { //包含adts头,从adts头获取aac配置信息 - _aac_cfg = makeAdtsConfig(reinterpret_cast(frame->data())); + _aac_cfg = makeAdtsConfig((uint8_t *)(frame->data())); } makeConfigPacket(); } diff --git a/src/Extension/G711.h b/src/Extension/G711.h index d1644201..e4929e7b 100644 --- a/src/Extension/G711.h +++ b/src/Extension/G711.h @@ -64,6 +64,19 @@ class G711FrameNoCacheAble : public FrameNoCacheAble { public: typedef std::shared_ptr Ptr; + //兼容通用接口 + G711FrameNoCacheAble(char *ptr,uint32_t size,uint32_t dts, uint32_t pts = 0,int prefixeSize = 0){ + _ptr = ptr; + _size = size; + _dts = dts; + _prefixSize = prefixeSize; + } + + //兼容通用接口 + void setCodec(CodecId codecId){ + _codecId = codecId; + } + G711FrameNoCacheAble(CodecId codecId, char *ptr,uint32_t size,uint32_t dts,int prefixeSize = 0){ _codecId = codecId; _ptr = ptr; diff --git a/src/Http/HttpClient.cpp b/src/Http/HttpClient.cpp index 6238afde..c2c22745 100644 --- a/src/Http/HttpClient.cpp +++ b/src/Http/HttpClient.cpp @@ -113,7 +113,7 @@ void HttpClient::onConnect(const SockException &ex) { printer << pr.first + ": "; printer << pr.second + "\r\n"; } - send(printer << "\r\n"); + SockSender::send(printer << "\r\n"); onFlush(); } @@ -147,8 +147,8 @@ int64_t HttpClient::onRecvHeader(const char *data, uint64_t len) { } } - checkCookie(_parser.getValues()); - _totalBodySize = onResponseHeader(_parser.Url(), _parser.getValues()); + checkCookie(_parser.getHeader()); + _totalBodySize = onResponseHeader(_parser.Url(), _parser.getHeader()); if(!_parser["Content-Length"].empty()){ //有Content-Length字段时忽略onResponseHeader的返回值 diff --git a/src/Http/HttpClient.h b/src/Http/HttpClient.h index e1f80b47..f78934d2 100644 --- a/src/Http/HttpClient.h +++ b/src/Http/HttpClient.h @@ -94,7 +94,7 @@ public: return _parser.Url(); } const HttpHeader &responseHeader() const{ - return _parser.getValues(); + return _parser.getHeader(); } const Parser& response() const{ return _parser; diff --git a/src/Http/HttpDownloader.cpp b/src/Http/HttpDownloader.cpp index f5148db4..a634c114 100644 --- a/src/Http/HttpDownloader.cpp +++ b/src/Http/HttpDownloader.cpp @@ -28,7 +28,7 @@ void HttpDownloader::startDownload(const string& url, const string& filePath,boo if(_filePath.empty()){ _filePath = exeDir() + "HttpDownloader/" + MD5(url).hexdigest(); } - _saveFile = File::createfile_file(_filePath.data(),bAppend ? "ab" : "wb"); + _saveFile = File::create_file(_filePath.data(), bAppend ? "ab" : "wb"); if(!_saveFile){ auto strErr = StrPrinter << "打开文件失败:" << filePath << endl; throw std::runtime_error(strErr); diff --git a/src/Http/HttpFileManager.cpp b/src/Http/HttpFileManager.cpp index 90a37992..584b6f25 100644 --- a/src/Http/HttpFileManager.cpp +++ b/src/Http/HttpFileManager.cpp @@ -306,9 +306,41 @@ static bool emitHlsPlayed(const Parser &parser, const MediaInfo &mediaInfo, cons //cookie有效期为kHlsCookieSecond invoker(err,"",kHlsCookieSecond); }; - return NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed,mediaInfo,mediaAuthInvoker,sender); + return NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed,mediaInfo,mediaAuthInvoker,static_cast(sender)); } +class SockInfoImp : public SockInfo{ +public: + typedef std::shared_ptr Ptr; + SockInfoImp() = default; + ~SockInfoImp() override = default; + + string get_local_ip() override{ + return _local_ip; + } + + uint16_t get_local_port() override{ + return _local_port; + } + + string get_peer_ip() override{ + return _peer_ip; + } + + uint16_t get_peer_port() override{ + return _peer_port; + } + + string getIdentifier() const override{ + return _identifier; + } + + string _local_ip; + string _peer_ip; + string _identifier; + uint16_t _local_port; + uint16_t _peer_port; +}; /** * 判断http客户端是否有权限访问文件的逻辑步骤 @@ -325,7 +357,7 @@ static void canAccessPath(TcpSession &sender, const Parser &parser, const MediaI auto path = parser.Url(); //先根据http头中的cookie字段获取cookie - HttpServerCookie::Ptr cookie = HttpCookieManager::Instance().getCookie(kCookieName, parser.getValues()); + HttpServerCookie::Ptr cookie = HttpCookieManager::Instance().getCookie(kCookieName, parser.getHeader()); //如果不是从http头中找到的cookie,我们让http客户端设置下cookie bool cookie_from_header = true; if (!cookie && !uid.empty()) { @@ -362,12 +394,16 @@ static void canAccessPath(TcpSession &sender, const Parser &parser, const MediaI } bool is_hls = mediaInfo._schema == HLS_SCHEMA; - string identifier = sender.getIdentifier(); - string peer_ip = sender.get_peer_ip(); - uint16_t peer_port = sender.get_peer_port(); + + SockInfoImp::Ptr info = std::make_shared(); + info->_identifier = sender.getIdentifier(); + info->_peer_ip = sender.get_peer_ip(); + info->_peer_port = sender.get_peer_port(); + info->_local_ip = sender.get_local_ip(); + info->_local_port = sender.get_local_port(); //该用户从来未获取过cookie,这个时候我们广播是否允许该用户访问该http目录 - HttpSession::HttpAccessPathInvoker accessPathInvoker = [callback, uid, path, is_dir, is_hls, mediaInfo, identifier, peer_ip, peer_port] + HttpSession::HttpAccessPathInvoker accessPathInvoker = [callback, uid, path, is_dir, is_hls, mediaInfo, info] (const string &errMsg, const string &cookie_path_in, int cookieLifeSecond) { HttpServerCookie::Ptr cookie; if (cookieLifeSecond) { @@ -390,7 +426,7 @@ static void canAccessPath(TcpSession &sender, const Parser &parser, const MediaI attachment._is_hls = is_hls; if(is_hls){ //hls相关信息 - attachment._hls_data = std::make_shared(mediaInfo, identifier, peer_ip, peer_port); + attachment._hls_data = std::make_shared(mediaInfo, info); //hls未查找MediaSource attachment._have_find_media_source = false; } @@ -407,7 +443,7 @@ static void canAccessPath(TcpSession &sender, const Parser &parser, const MediaI } //事件未被拦截,则认为是http下载请求 - bool flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastHttpAccess, parser, path, is_dir, accessPathInvoker, sender); + bool flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastHttpAccess, parser, path, is_dir, accessPathInvoker, static_cast(sender)); if (!flag) { //此事件无人监听,我们默认都有权限访问 callback("", nullptr); @@ -488,7 +524,7 @@ static void accessFile(TcpSession &sender, const Parser &parser, const MediaInfo } cb(codeOut.data(), HttpFileManager::getContentType(strFile.data()), headerOut, body); }; - invoker.responseFile(parser.getValues(), httpHeader, strFile); + invoker.responseFile(parser.getHeader(), httpHeader, strFile); }; if (!is_hls) { @@ -521,7 +557,7 @@ static string getFilePath(const Parser &parser,const MediaInfo &mediaInfo, TcpSe GET_CONFIG(bool, enableVhost, General::kEnableVhost); GET_CONFIG(string, rootPath, Http::kRootPath); auto ret = File::absolutePath(enableVhost ? mediaInfo._vhost + parser.Url() : parser.Url(), rootPath); - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastHttpBeforeAccess, parser, ret, sender); + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastHttpBeforeAccess, parser, ret, static_cast(sender)); return std::move(ret); } diff --git a/src/Http/HttpSession.cpp b/src/Http/HttpSession.cpp index 92cb886d..26056ed0 100644 --- a/src/Http/HttpSession.cpp +++ b/src/Http/HttpSession.cpp @@ -59,7 +59,7 @@ int64_t HttpSession::onRecvHeader(const char *header,uint64_t len) { string cmd = _parser.Method(); auto it = s_func_map.find(cmd); if (it == s_func_map.end()) { - WarnL << "不支持该命令:" << cmd; + WarnP(this) << "不支持该命令:" << cmd; sendResponse("405 Not Allowed", true); return 0; } @@ -108,7 +108,7 @@ void HttpSession::onError(const SockException& err) { GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold); if(_ui64TotalBytes > iFlowThreshold * 1024){ - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _mediaInfo, _ui64TotalBytes, duration , true, getIdentifier(), get_peer_ip(), get_peer_port()); + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _mediaInfo, _ui64TotalBytes, duration , true, static_cast(*this)); } return; } @@ -241,7 +241,7 @@ bool HttpSession::checkLiveFlvStream(const function &cb){ onRes(err); }); }; - auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed,_mediaInfo,invoker,*this); + auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed,_mediaInfo,invoker,static_cast(*this)); if(!flag){ //该事件无人监听,默认不鉴权 onRes(""); @@ -456,7 +456,7 @@ void HttpSession::sendResponse(const char *pcStatus, str += "\r\n"; } str += "\r\n"; - send(std::move(str)); + SockSender::send(std::move(str)); _ticker.resetTime(); if(!size){ @@ -520,7 +520,7 @@ bool HttpSession::emitHttpEvent(bool doInvoke){ }; ///////////////////广播HTTP事件/////////////////////////// bool consumed = false;//该事件是否被消费 - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastHttpRequest,_parser,invoker,consumed,*this); + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastHttpRequest,_parser,invoker,consumed,static_cast(*this)); if(!consumed && doInvoke){ //该事件无人消费,所以返回404 invoker("404 Not Found",KeyValue(), HttpBody::Ptr()); @@ -611,7 +611,7 @@ void HttpSession::setSocketFlags(){ //推流模式下,关闭TCP_NODELAY会增加推流端的延时,但是服务器性能将提高 SockUtil::setNoDelay(_sock->rawFD(), false); //播放模式下,开启MSG_MORE会增加延时,但是能提高发送性能 - (*this) << SocketFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE); + setSendFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE); } } diff --git a/src/Http/HttpSession.h b/src/Http/HttpSession.h index 540a7e3c..09bcfca5 100644 --- a/src/Http/HttpSession.h +++ b/src/Http/HttpSession.h @@ -41,11 +41,11 @@ public: typedef std::function HttpAccessPathInvoker; HttpSession(const Socket::Ptr &pSock); - virtual ~HttpSession(); + ~HttpSession() override; - virtual void onRecv(const Buffer::Ptr &) override; - virtual void onError(const SockException &err) override; - virtual void onManager() override; + void onRecv(const Buffer::Ptr &) override; + void onError(const SockException &err) override; + void onManager() override; static string urlDecode(const string &str); protected: //FlvMuxer override @@ -80,7 +80,7 @@ protected: * @return true代表允许websocket连接,否则拒绝 */ virtual bool onWebSocketConnect(const Parser &header){ - WarnL << "http server do not support websocket default"; + WarnP(this) << "http server do not support websocket default"; return false; } diff --git a/src/Http/WebSocketClient.h b/src/Http/WebSocketClient.h index a26fdc38..36e67516 100644 --- a/src/Http/WebSocketClient.h +++ b/src/Http/WebSocketClient.h @@ -73,7 +73,7 @@ public: HttpWsClient(ClientTypeImp &delegate) : _delegate(delegate){ _Sec_WebSocket_Key = encodeBase64(SHA1::encode_bin(makeRandStr(16, false))); - setPoller(delegate.getPoller()); + _poller = delegate.getPoller(); } ~HttpWsClient(){} diff --git a/src/Record/HlsMakerImp.cpp b/src/Record/HlsMakerImp.cpp index c0b64c19..28f88714 100644 --- a/src/Record/HlsMakerImp.cpp +++ b/src/Record/HlsMakerImp.cpp @@ -92,7 +92,7 @@ void HlsMakerImp::onWriteHls(const char *data, int len) { std::shared_ptr HlsMakerImp::makeFile(const string &file,bool setbuf) { auto file_buf = _file_buf; - auto ret= shared_ptr(File::createfile_file(file.data(), "wb"), [file_buf](FILE *fp) { + auto ret= shared_ptr(File::create_file(file.data(), "wb"), [file_buf](FILE *fp) { if (fp) { fclose(fp); } diff --git a/src/Record/HlsMediaSource.cpp b/src/Record/HlsMediaSource.cpp index 1f5afa4f..4133a206 100644 --- a/src/Record/HlsMediaSource.cpp +++ b/src/Record/HlsMediaSource.cpp @@ -12,11 +12,9 @@ namespace mediakit{ -HlsCookieData::HlsCookieData(const MediaInfo &info, const string &sessionIdentifier, const string &peer_ip, uint16_t peer_port) { +HlsCookieData::HlsCookieData(const MediaInfo &info, const std::shared_ptr &sock_info) { _info = info; - _sessionIdentifier = sessionIdentifier; - _peer_ip = peer_ip; - _peer_port = peer_port; + _sock_info = sock_info; _added = std::make_shared(false); addReaderCount(); } @@ -45,13 +43,13 @@ HlsCookieData::~HlsCookieData() { src->modifyReaderCount(false); } uint64_t duration = (_ticker.createdTime() - _ticker.elapsedTime()) / 1000; - WarnL << _sessionIdentifier << "(" << _peer_ip << ":" << _peer_port << ") " + WarnL << _sock_info->getIdentifier() << "(" << _sock_info->get_peer_ip() << ":" << _sock_info->get_peer_port() << ") " << "HLS播放器(" << _info._vhost << "/" << _info._app << "/" << _info._streamid << ")断开,耗时(s):" << duration; GET_CONFIG(uint32_t, iFlowThreshold, General::kFlowThreshold); if (_bytes > iFlowThreshold * 1024) { - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _info, _bytes, duration, true, _sessionIdentifier, _peer_ip, _peer_port); + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _info, _bytes, duration, true, static_cast(*_sock_info)); } } } diff --git a/src/Record/HlsMediaSource.h b/src/Record/HlsMediaSource.h index 513e90ac..d6bcab5e 100644 --- a/src/Record/HlsMediaSource.h +++ b/src/Record/HlsMediaSource.h @@ -77,7 +77,7 @@ private: class HlsCookieData{ public: typedef std::shared_ptr Ptr; - HlsCookieData(const MediaInfo &info, const string &sessionIdentifier, const string &peer_ip, uint16_t peer_port); + HlsCookieData(const MediaInfo &info, const std::shared_ptr &sock_info); ~HlsCookieData(); void addByteUsage(uint64_t bytes); private: @@ -85,12 +85,10 @@ private: private: uint64_t _bytes = 0; MediaInfo _info; - string _sessionIdentifier; - string _peer_ip; - uint16_t _peer_port; std::shared_ptr _added; weak_ptr _src; Ticker _ticker; + std::shared_ptr _sock_info; HlsMediaSource::RingType::RingReader::Ptr _ring_reader; }; diff --git a/src/Record/MP4.cpp b/src/Record/MP4.cpp index 878c62c0..571efa16 100644 --- a/src/Record/MP4.cpp +++ b/src/Record/MP4.cpp @@ -72,7 +72,7 @@ MP4File::Reader MP4File::createReader(){ void MP4File::openFile(const char *file,const char *mode) { //创建文件 - auto fp = File::createfile_file(file,mode); + auto fp = File::create_file(file, mode); if(!fp){ throw std::runtime_error(string("打开文件失败:") + file); } diff --git a/src/Record/MP4Demuxer.cpp b/src/Record/MP4Demuxer.cpp index 94394e63..f157f9a5 100644 --- a/src/Record/MP4Demuxer.cpp +++ b/src/Record/MP4Demuxer.cpp @@ -14,6 +14,7 @@ #include "Extension/H265.h" #include "Extension/H264.h" #include "Extension/AAC.h" +#include "Extension/G711.h" using namespace toolkit; namespace mediakit { @@ -120,6 +121,12 @@ void MP4Demuxer::onAudioTrack(uint32_t track_id, uint8_t object, int channel_cou _track_to_codec.emplace(track_id, audio); } break; + case MOV_OBJECT_G711a: + case MOV_OBJECT_G711u:{ + auto audio = std::make_shared(object == MOV_OBJECT_G711a ? CodecG711A : CodecG711U, sample_rate, channel_count, bit_per_sample / channel_count ); + _track_to_codec.emplace(track_id, audio); + } + break; default: WarnL << "不支持该编码类型的MP4,已忽略:" << getObjectName(object); break; @@ -223,8 +230,16 @@ Frame::Ptr MP4Demuxer::makeFrame(uint32_t track_id, const Buffer::Ptr &buf, int6 } return std::make_shared >(buf, pts, dts, 4); } + case CodecAAC : return std::make_shared >(buf, pts, dts, 0); + + case CodecG711A: + case CodecG711U: { + auto frame = std::make_shared >(buf, pts, dts, 0); + frame->setCodec(codec); + return frame; + } default: return nullptr; } diff --git a/src/Record/MP4Muxer.cpp b/src/Record/MP4Muxer.cpp index 2eb5436a..64cfbc4e 100644 --- a/src/Record/MP4Muxer.cpp +++ b/src/Record/MP4Muxer.cpp @@ -124,22 +124,47 @@ void MP4Muxer::inputFrame(const Frame::Ptr &frame) { void MP4Muxer::addTrack(const Track::Ptr &track) { switch (track->getCodecId()) { + case CodecG711A: + case CodecG711U: { + auto audio_track = dynamic_pointer_cast(track); + if (!audio_track) { + WarnL << "不是G711 Track"; + return; + } + if (!audio_track->ready()) { + WarnL << "G711 Track未就绪"; + return; + } + auto track_id = mov_writer_add_audio(_mov_writter.get(), + track->getCodecId() == CodecG711A ? MOV_OBJECT_G711a : MOV_OBJECT_G711u, + audio_track->getAudioChannel(), + audio_track->getAudioSampleBit() * audio_track->getAudioChannel(), + audio_track->getAudioSampleRate(), + nullptr, 0); + if (track_id < 0) { + WarnL << "添加G711 Track失败:" << track_id; + return; + } + _codec_to_trackid[track->getCodecId()].track_id = track_id; + } + break; + case CodecAAC: { - auto aac_track = dynamic_pointer_cast(track); - if (!aac_track) { + auto audio_track = dynamic_pointer_cast(track); + if (!audio_track) { WarnL << "不是AAC Track"; return; } - if(!aac_track->ready()){ + if(!audio_track->ready()){ WarnL << "AAC Track未就绪"; return; } auto track_id = mov_writer_add_audio(_mov_writter.get(), MOV_OBJECT_AAC, - aac_track->getAudioChannel(), - aac_track->getAudioSampleBit() * aac_track->getAudioChannel(), - aac_track->getAudioSampleRate(), - aac_track->getAacCfg().data(), 2); + audio_track->getAudioChannel(), + audio_track->getAudioSampleBit() * audio_track->getAudioChannel(), + audio_track->getAudioSampleRate(), + audio_track->getAacCfg().data(), 2); if(track_id < 0){ WarnL << "添加AAC Track失败:" << track_id; return; diff --git a/src/Record/MP4Muxer.h b/src/Record/MP4Muxer.h index 7975bf2f..e5fd4807 100644 --- a/src/Record/MP4Muxer.h +++ b/src/Record/MP4Muxer.h @@ -15,6 +15,7 @@ #include "Common/MediaSink.h" #include "Extension/AAC.h" +#include "Extension/G711.h" #include "Extension/H264.h" #include "Extension/H265.h" #include "Common/Stamp.h" diff --git a/src/Record/TsMuxer.cpp b/src/Record/TsMuxer.cpp index 4da7eab9..79419c74 100644 --- a/src/Record/TsMuxer.cpp +++ b/src/Record/TsMuxer.cpp @@ -28,17 +28,30 @@ void TsMuxer::addTrack(const Track::Ptr &track) { case CodecH264: { _have_video = true; _codec_to_trackid[track->getCodecId()].track_id = mpeg_ts_add_stream(_context, PSI_STREAM_H264, nullptr, 0); - } break; + } + case CodecH265: { _have_video = true; _codec_to_trackid[track->getCodecId()].track_id = mpeg_ts_add_stream(_context, PSI_STREAM_H265, nullptr, 0); - } break; + } + case CodecAAC: { _codec_to_trackid[track->getCodecId()].track_id = mpeg_ts_add_stream(_context, PSI_STREAM_AAC, nullptr, 0); - } break; + } + + case CodecG711A: { + _codec_to_trackid[track->getCodecId()].track_id = mpeg_ts_add_stream(_context, PSI_STREAM_AUDIO_G711A, nullptr, 0); + break; + } + + case CodecG711U: { + _codec_to_trackid[track->getCodecId()].track_id = mpeg_ts_add_stream(_context, PSI_STREAM_AUDIO_G711U, nullptr, 0); + break; + } + default: break; } diff --git a/src/Rtmp/FlvMuxer.cpp b/src/Rtmp/FlvMuxer.cpp index b3df0459..94ad3996 100644 --- a/src/Rtmp/FlvMuxer.cpp +++ b/src/Rtmp/FlvMuxer.cpp @@ -177,7 +177,7 @@ void FlvRecorder::startRecord(const EventPoller::Ptr &poller,const RtmpMediaSour } }); //新建文件 - _file.reset(File::createfile_file(file_path.data(),"wb"),[fileBuf](FILE *fp){ + _file.reset(File::create_file(file_path.data(), "wb"), [fileBuf](FILE *fp){ if(fp){ fflush(fp); fclose(fp); diff --git a/src/Rtmp/RtmpMuxer.cpp b/src/Rtmp/RtmpMuxer.cpp index 5f935df1..2547642e 100644 --- a/src/Rtmp/RtmpMuxer.cpp +++ b/src/Rtmp/RtmpMuxer.cpp @@ -39,6 +39,28 @@ void RtmpMuxer::addTrack(const Track::Ptr &track) { } + switch (track->getCodecId()){ + case CodecG711A: + case CodecG711U:{ + auto audio_track = dynamic_pointer_cast(track); + if(!audio_track){ + return; + } + if (audio_track->getAudioSampleRate() != 8000 || + audio_track->getAudioChannel() != 1 || + audio_track->getAudioSampleBit() != 16) { + WarnL << "RTMP只支持8000/1/16规格的G711,目前规格是:" + << audio_track->getAudioSampleRate() << "/" + << audio_track->getAudioChannel() << "/" + << audio_track->getAudioSampleBit() + << ",该音频已被忽略"; + return; + } + break; + } + default : break; + } + auto &encoder = _encoder[track->getTrackType()]; //生成rtmp编码器,克隆该Track,防止循环引用 encoder = Factory::getRtmpCodecByTrack(track->clone()); diff --git a/src/Rtmp/RtmpPusher.cpp b/src/Rtmp/RtmpPusher.cpp index e5de8e60..3722e23d 100644 --- a/src/Rtmp/RtmpPusher.cpp +++ b/src/Rtmp/RtmpPusher.cpp @@ -231,7 +231,7 @@ void RtmpPusher::setSocketFlags(){ GET_CONFIG(bool,ultraLowDelay,General::kUltraLowDelay); if(!ultraLowDelay) { //提高发送性能 - (*this) << SocketFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE); + setSendFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE); SockUtil::setNoDelay(_sock->rawFD(), false); } } diff --git a/src/Rtmp/RtmpSession.cpp b/src/Rtmp/RtmpSession.cpp index 8c9028fc..d606f118 100644 --- a/src/Rtmp/RtmpSession.cpp +++ b/src/Rtmp/RtmpSession.cpp @@ -39,7 +39,7 @@ void RtmpSession::onError(const SockException& err) { GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold); if(_ui64TotalBytes > iFlowThreshold * 1024){ - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _mediaInfo, _ui64TotalBytes, duration, isPlayer, getIdentifier(), get_peer_ip(), get_peer_port()); + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _mediaInfo, _ui64TotalBytes, duration, isPlayer, static_cast(*this)); } } @@ -171,10 +171,7 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) { onRes(err,enableRtxp,enableHls,enableMP4); }); }; - auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish, - _mediaInfo, - invoker, - *this); + auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish,_mediaInfo,invoker,static_cast(*this)); if(!flag){ //该事件无人监听,默认鉴权成功 GET_CONFIG(bool,toRtxp,General::kPublishToRtxp); @@ -346,7 +343,8 @@ void RtmpSession::doPlay(AMFDecoder &dec){ strongSelf->doPlayResponse(err,[pToken](bool){}); }); }; - auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed,_mediaInfo,invoker,*this); + + auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed,_mediaInfo,invoker,static_cast(*this)); if(!flag){ //该事件无人监听,默认不鉴权 doPlayResponse("",[pToken](bool){}); @@ -536,7 +534,7 @@ void RtmpSession::setSocketFlags(){ //推流模式下,关闭TCP_NODELAY会增加推流端的延时,但是服务器性能将提高 SockUtil::setNoDelay(_sock->rawFD(), false); //播放模式下,开启MSG_MORE会增加延时,但是能提高发送性能 - (*this) << SocketFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE); + setSendFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE); } } diff --git a/src/Rtp/RtpProcess.cpp b/src/Rtp/RtpProcess.cpp index 2efcb722..7c9053e2 100644 --- a/src/Rtp/RtpProcess.cpp +++ b/src/Rtp/RtpProcess.cpp @@ -9,11 +9,13 @@ */ #if defined(ENABLE_RTPPROXY) -#include "mpeg-ps.h" +#include "mpeg-ts-proto.h" #include "RtpProcess.h" #include "Util/File.h" #include "Extension/H265.h" #include "Extension/AAC.h" +#include "Extension/G711.h" +#define RTP_APP_NAME "rtp" namespace mediakit{ @@ -56,7 +58,7 @@ string printSSRC(uint32_t ui32Ssrc) { } static string printAddress(const struct sockaddr *addr){ - return StrPrinter << inet_ntoa(((struct sockaddr_in *) addr)->sin_addr) << ":" << ntohs(((struct sockaddr_in *) addr)->sin_port); + return StrPrinter << SockUtil::inet_ntoa(((struct sockaddr_in *) addr)->sin_addr) << ":" << ntohs(((struct sockaddr_in *) addr)->sin_port); } RtpProcess::RtpProcess(uint32_t ssrc) { @@ -66,17 +68,15 @@ RtpProcess::RtpProcess(uint32_t ssrc) { _track->_samplerate = 90000; _track->_type = TrackVideo; _track->_ssrc = _ssrc; - DebugL << printSSRC(_ssrc); - GET_CONFIG(bool,toRtxp,General::kPublishToRtxp); - GET_CONFIG(bool,toHls,General::kPublishToHls); - GET_CONFIG(bool,toMP4,General::kPublishToMP4); - - _muxer = std::make_shared(DEFAULT_VHOST,"rtp",printSSRC(_ssrc),0,toRtxp,toRtxp,toHls,toMP4); + _media_info._schema = RTP_APP_NAME; + _media_info._vhost = DEFAULT_VHOST; + _media_info._app = RTP_APP_NAME; + _media_info._streamid = printSSRC(_ssrc); GET_CONFIG(string,dump_dir,RtpProxy::kDumpDir); { - FILE *fp = !dump_dir.empty() ? File::createfile_file(File::absolutePath(printSSRC(_ssrc) + ".rtp",dump_dir).data(),"wb") : nullptr; + FILE *fp = !dump_dir.empty() ? File::create_file(File::absolutePath(_media_info._streamid + ".rtp", dump_dir).data(), "wb") : nullptr; if(fp){ _save_file_rtp.reset(fp,[](FILE *fp){ fclose(fp); @@ -85,7 +85,7 @@ RtpProcess::RtpProcess(uint32_t ssrc) { } { - FILE *fp = !dump_dir.empty() ? File::createfile_file(File::absolutePath(printSSRC(_ssrc) + ".mp2",dump_dir).data(),"wb") : nullptr; + FILE *fp = !dump_dir.empty() ? File::create_file(File::absolutePath(_media_info._streamid + ".mp2", dump_dir).data(), "wb") : nullptr; if(fp){ _save_file_ps.reset(fp,[](FILE *fp){ fclose(fp); @@ -94,7 +94,7 @@ RtpProcess::RtpProcess(uint32_t ssrc) { } { - FILE *fp = !dump_dir.empty() ? File::createfile_file(File::absolutePath(printSSRC(_ssrc) + ".video",dump_dir).data(),"wb") : nullptr; + FILE *fp = !dump_dir.empty() ? File::create_file(File::absolutePath(_media_info._streamid + ".video", dump_dir).data(), "wb") : nullptr; if(fp){ _save_file_video.reset(fp,[](FILE *fp){ fclose(fp); @@ -105,28 +105,48 @@ RtpProcess::RtpProcess(uint32_t ssrc) { } RtpProcess::~RtpProcess() { - if(_addr){ - DebugL << printSSRC(_ssrc) << " " << printAddress(_addr); + DebugP(this); + if (_addr) { delete _addr; - }else{ - DebugL << printSSRC(_ssrc); + } + + uint64_t duration = (_last_rtp_time.createdTime() - _last_rtp_time.elapsedTime()) / 1000; + WarnP(this) << "RTP推流器(" + << _media_info._vhost << "/" + << _media_info._app << "/" + << _media_info._streamid + << ")断开,耗时(s):" << duration; + + //流量统计事件广播 + GET_CONFIG(uint32_t, iFlowThreshold, General::kFlowThreshold); + if (_total_bytes > iFlowThreshold * 1024) { + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _total_bytes, duration, false, static_cast(*this)); } } -bool RtpProcess::inputRtp(const char *data, int data_len,const struct sockaddr *addr,uint32_t *dts_out) { +bool RtpProcess::inputRtp(const Socket::Ptr &sock, const char *data, int data_len,const struct sockaddr *addr,uint32_t *dts_out) { GET_CONFIG(bool,check_source,RtpProxy::kCheckSource); //检查源是否合法 if(!_addr){ _addr = new struct sockaddr; + _sock = sock; memcpy(_addr,addr, sizeof(struct sockaddr)); - DebugL << "RtpProcess(" << printSSRC(_ssrc) << ") bind to address:" << printAddress(_addr); + DebugP(this) << "bind to address:" << printAddress(_addr); + //推流鉴权 + emitOnPublish(); } - if(check_source && memcmp(_addr,addr,sizeof(struct sockaddr)) != 0){ - DebugL << "RtpProcess(" << printSSRC(_ssrc) << ") address dismatch:" << printAddress(addr) << " != " << printAddress(_addr); + if(!_muxer){ + //无权限推流 return false; } + if(check_source && memcmp(_addr,addr,sizeof(struct sockaddr)) != 0){ + DebugP(this) << "address dismatch:" << printAddress(addr) << " != " << printAddress(_addr); + return false; + } + + _total_bytes += data_len; _last_rtp_time.resetTime(); bool ret = handleOneRtp(0,_track,(unsigned char *)data,data_len); if(dts_out){ @@ -141,8 +161,8 @@ static inline bool checkTS(const uint8_t *packet, int bytes){ } void RtpProcess::onRtpSorted(const RtpPacket::Ptr &rtp, int) { - if(rtp->sequence != _sequence + 1){ - WarnL << rtp->sequence << " != " << _sequence << "+1"; + if(rtp->sequence != _sequence + 1 && rtp->sequence != 0){ + WarnP(this) << rtp->sequence << " != " << _sequence << "+1"; } _sequence = rtp->sequence; if(_save_file_rtp){ @@ -163,11 +183,11 @@ void RtpProcess::onRtpDecode(const uint8_t *packet, int bytes, uint32_t timestam //创建解码器 if(checkTS(packet, bytes)){ //猜测是ts负载 - InfoL << "judged to be TS: " << printSSRC(_ssrc); + InfoP(this) << "judged to be TS"; _decoder = Decoder::createDecoder(Decoder::decoder_ts); }else{ //猜测是ps负载 - InfoL << "judged to be PS: " << printSSRC(_ssrc); + InfoP(this) << "judged to be PS"; _decoder = Decoder::createDecoder(Decoder::decoder_ps); } _decoder->setOnDecode([this](int stream,int codecid,int flags,int64_t pts,int64_t dts,const void *data,int bytes){ @@ -177,26 +197,36 @@ void RtpProcess::onRtpDecode(const uint8_t *packet, int bytes, uint32_t timestam auto ret = _decoder->input((uint8_t *)packet,bytes); if(ret != bytes){ - WarnL << ret << " != " << bytes << " " << flags; + WarnP(this) << ret << " != " << bytes << " " << flags; } } #define SWITCH_CASE(codec_id) case codec_id : return #codec_id static const char *getCodecName(int codec_id) { switch (codec_id) { - SWITCH_CASE(STREAM_VIDEO_MPEG4); - SWITCH_CASE(STREAM_VIDEO_H264); - SWITCH_CASE(STREAM_VIDEO_H265); - SWITCH_CASE(STREAM_VIDEO_SVAC); - SWITCH_CASE(STREAM_AUDIO_MP3); - SWITCH_CASE(STREAM_AUDIO_AAC); - SWITCH_CASE(STREAM_AUDIO_G711); - SWITCH_CASE(STREAM_AUDIO_G722); - SWITCH_CASE(STREAM_AUDIO_G723); - SWITCH_CASE(STREAM_AUDIO_G729); - SWITCH_CASE(STREAM_AUDIO_SVAC); - default: - return "unknown codec"; + SWITCH_CASE(PSI_STREAM_MPEG1); + SWITCH_CASE(PSI_STREAM_MPEG2); + SWITCH_CASE(PSI_STREAM_AUDIO_MPEG1); + SWITCH_CASE(PSI_STREAM_MP3); + SWITCH_CASE(PSI_STREAM_AAC); + SWITCH_CASE(PSI_STREAM_MPEG4); + SWITCH_CASE(PSI_STREAM_MPEG4_AAC_LATM); + SWITCH_CASE(PSI_STREAM_H264); + SWITCH_CASE(PSI_STREAM_MPEG4_AAC); + SWITCH_CASE(PSI_STREAM_H265); + SWITCH_CASE(PSI_STREAM_AUDIO_AC3); + SWITCH_CASE(PSI_STREAM_AUDIO_EAC3); + SWITCH_CASE(PSI_STREAM_AUDIO_DTS); + SWITCH_CASE(PSI_STREAM_VIDEO_DIRAC); + SWITCH_CASE(PSI_STREAM_VIDEO_VC1); + SWITCH_CASE(PSI_STREAM_VIDEO_SVAC); + SWITCH_CASE(PSI_STREAM_AUDIO_SVAC); + SWITCH_CASE(PSI_STREAM_AUDIO_G711A); + SWITCH_CASE(PSI_STREAM_AUDIO_G711U); + SWITCH_CASE(PSI_STREAM_AUDIO_G722); + SWITCH_CASE(PSI_STREAM_AUDIO_G723); + SWITCH_CASE(PSI_STREAM_AUDIO_G729); + default : return "unknown codec"; } } @@ -206,18 +236,18 @@ void RtpProcess::onDecode(int stream,int codecid,int flags,int64_t pts,int64_t d _stamps[codecid].revise(dts,pts,dts,pts,false); switch (codecid) { - case STREAM_VIDEO_H264: { + case PSI_STREAM_H264: { _dts = dts; if (!_codecid_video) { //获取到视频 _codecid_video = codecid; - InfoL << "got video track: H264"; + InfoP(this) << "got video track: H264"; auto track = std::make_shared(); _muxer->addTrack(track); } if (codecid != _codecid_video) { - WarnL << "video track change to H264 from codecid:" << getCodecName(_codecid_video); + WarnP(this) << "video track change to H264 from codecid:" << getCodecName(_codecid_video); return; } @@ -231,17 +261,17 @@ void RtpProcess::onDecode(int stream,int codecid,int flags,int64_t pts,int64_t d break; } - case STREAM_VIDEO_H265: { + case PSI_STREAM_H265: { _dts = dts; if (!_codecid_video) { //获取到视频 _codecid_video = codecid; - InfoL << "got video track: H265"; + InfoP(this) << "got video track: H265"; auto track = std::make_shared(); _muxer->addTrack(track); } if (codecid != _codecid_video) { - WarnL << "video track change to H265 from codecid:" << getCodecName(_codecid_video); + WarnP(this) << "video track change to H265 from codecid:" << getCodecName(_codecid_video); return; } if(_save_file_video){ @@ -254,26 +284,47 @@ void RtpProcess::onDecode(int stream,int codecid,int flags,int64_t pts,int64_t d break; } - case STREAM_AUDIO_AAC: { + case PSI_STREAM_AAC: { _dts = dts; if (!_codecid_audio) { //获取到音频 _codecid_audio = codecid; - InfoL << "got audio track: AAC"; + InfoP(this) << "got audio track: AAC"; auto track = std::make_shared(); _muxer->addTrack(track); } if (codecid != _codecid_audio) { - WarnL << "audio track change to AAC from codecid:" << getCodecName(_codecid_audio); + WarnP(this) << "audio track change to AAC from codecid:" << getCodecName(_codecid_audio); return; } _muxer->inputFrame(std::make_shared((char *) data, bytes, dts, 0, 7)); break; } + + case PSI_STREAM_AUDIO_G711A: + case PSI_STREAM_AUDIO_G711U: { + _dts = dts; + auto codec = codecid == PSI_STREAM_AUDIO_G711A ? CodecG711A : CodecG711U; + if (!_codecid_audio) { + //获取到音频 + _codecid_audio = codecid; + InfoP(this) << "got audio track: G711"; + //G711传统只支持 8000/1/16的规格,FFmpeg貌似做了扩展,但是这里不管它了 + auto track = std::make_shared(codec, 8000, 1, 16); + _muxer->addTrack(track); + } + + if (codecid != _codecid_audio) { + WarnP(this) << "audio track change to G711 from codecid:" << getCodecName(_codecid_audio); + return; + } + _muxer->inputFrame(std::make_shared(codec, (char *) data, bytes, dts)); + break; + } default: if(codecid != 0){ - WarnL << "unsupported codec type:" << getCodecName(codecid); + WarnP(this) << "unsupported codec type:" << getCodecName(codecid) << " " << (int)codecid; } return; } @@ -288,19 +339,77 @@ bool RtpProcess::alive() { } string RtpProcess::get_peer_ip() { - return inet_ntoa(((struct sockaddr_in *) _addr)->sin_addr); + if(_addr){ + return SockUtil::inet_ntoa(((struct sockaddr_in *) _addr)->sin_addr); + } + return "0.0.0.0"; } uint16_t RtpProcess::get_peer_port() { + if(!_addr){ + return 0; + } return ntohs(((struct sockaddr_in *) _addr)->sin_port); } +string RtpProcess::get_local_ip() { + if(_sock){ + return _sock->get_local_ip(); + } + return "0.0.0.0"; +} + +uint16_t RtpProcess::get_local_port() { + if(_sock){ + return _sock->get_local_port(); + } + return 0; +} + +string RtpProcess::getIdentifier() const{ + return _media_info._streamid; +} + int RtpProcess::totalReaderCount(){ - return _muxer->totalReaderCount(); + return _muxer ? _muxer->totalReaderCount() : 0; } void RtpProcess::setListener(const std::weak_ptr &listener){ - _muxer->setMediaListener(listener); + if(_muxer){ + _muxer->setMediaListener(listener); + }else{ + _listener = listener; + } +} + +void RtpProcess::emitOnPublish() { + weak_ptr weak_self = shared_from_this(); + Broadcast::PublishAuthInvoker invoker = [weak_self](const string &err, bool enableRtxp, bool enableHls, bool enableMP4) { + auto strongSelf = weak_self.lock(); + if (!strongSelf) { + return; + } + if (err.empty()) { + strongSelf->_muxer = std::make_shared(strongSelf->_media_info._vhost, + strongSelf->_media_info._app, + strongSelf->_media_info._streamid, 0, + enableRtxp, enableRtxp, enableHls, enableMP4); + strongSelf->_muxer->setMediaListener(strongSelf->_listener); + InfoP(strongSelf) << "允许RTP推流"; + } else { + WarnP(strongSelf) << "禁止RTP推流:" << err; + } + }; + + //触发推流鉴权事件 + auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish, _media_info, invoker, static_cast(*this)); + if(!flag){ + //该事件无人监听,默认不鉴权 + GET_CONFIG(bool, toRtxp, General::kPublishToRtxp); + GET_CONFIG(bool, toHls, General::kPublishToHls); + GET_CONFIG(bool, toMP4, General::kPublishToMP4); + invoker("", toRtxp, toHls, toMP4); + } } diff --git a/src/Rtp/RtpProcess.h b/src/Rtp/RtpProcess.h index 50f0a6bb..cf252301 100644 --- a/src/Rtp/RtpProcess.h +++ b/src/Rtp/RtpProcess.h @@ -24,21 +24,31 @@ namespace mediakit{ string printSSRC(uint32_t ui32Ssrc); class FrameMerger; -class RtpProcess : public RtpReceiver , public RtpDecoder{ +class RtpProcess : public RtpReceiver , public RtpDecoder, public SockInfo, public std::enable_shared_from_this{ public: typedef std::shared_ptr Ptr; RtpProcess(uint32_t ssrc); ~RtpProcess(); - bool inputRtp(const char *data,int data_len, const struct sockaddr *addr , uint32_t *dts_out = nullptr); + bool inputRtp(const Socket::Ptr &sock, const char *data,int data_len, const struct sockaddr *addr , uint32_t *dts_out = nullptr); bool alive(); - string get_peer_ip(); - uint16_t get_peer_port(); + + string get_local_ip() override; + uint16_t get_local_port() override; + string get_peer_ip() override; + uint16_t get_peer_port() override; + string getIdentifier() const override; + int totalReaderCount(); void setListener(const std::weak_ptr &listener); + protected: void onRtpSorted(const RtpPacket::Ptr &rtp, int track_index) override ; void onRtpDecode(const uint8_t *packet, int bytes, uint32_t timestamp, int flags) override; void onDecode(int stream,int codecid,int flags,int64_t pts,int64_t dts, const void *data,int bytes); + +private: + void emitOnPublish(); + private: std::shared_ptr _save_file_rtp; std::shared_ptr _save_file_ps; @@ -55,6 +65,10 @@ private: unordered_map _stamps; uint32_t _dts = 0; Decoder::Ptr _decoder; + std::weak_ptr _listener; + MediaInfo _media_info; + uint64_t _total_bytes = 0; + Socket::Ptr _sock; }; }//namespace mediakit diff --git a/src/Rtp/RtpSelector.cpp b/src/Rtp/RtpSelector.cpp index fbddccdc..d40e5ceb 100644 --- a/src/Rtp/RtpSelector.cpp +++ b/src/Rtp/RtpSelector.cpp @@ -15,7 +15,7 @@ namespace mediakit{ INSTANCE_IMP(RtpSelector); -bool RtpSelector::inputRtp(const char *data, int data_len,const struct sockaddr *addr,uint32_t *dts_out) { +bool RtpSelector::inputRtp(const Socket::Ptr &sock, const char *data, int data_len,const struct sockaddr *addr,uint32_t *dts_out) { uint32_t ssrc = 0; if(!getSSRC(data,data_len,ssrc)){ WarnL << "get ssrc from rtp failed:" << data_len; @@ -23,7 +23,7 @@ bool RtpSelector::inputRtp(const char *data, int data_len,const struct sockaddr } auto process = getProcess(ssrc, true); if(process){ - return process->inputRtp(data,data_len, addr,dts_out); + return process->inputRtp(sock, data,data_len, addr,dts_out); } return false; } diff --git a/src/Rtp/RtpSelector.h b/src/Rtp/RtpSelector.h index 8e4ca205..d6354f48 100644 --- a/src/Rtp/RtpSelector.h +++ b/src/Rtp/RtpSelector.h @@ -45,7 +45,7 @@ public: ~RtpSelector(); static RtpSelector &Instance(); - bool inputRtp(const char *data,int data_len,const struct sockaddr *addr ,uint32_t *dts_out = nullptr ); + bool inputRtp(const Socket::Ptr &sock, const char *data,int data_len,const struct sockaddr *addr ,uint32_t *dts_out = nullptr ); static bool getSSRC(const char *data,int data_len, uint32_t &ssrc); RtpProcess::Ptr getProcess(uint32_t ssrc,bool makeNew); void delProcess(uint32_t ssrc,const RtpProcess *ptr); diff --git a/src/Rtp/RtpSession.cpp b/src/Rtp/RtpSession.cpp index 57d71d41..b51bbda4 100644 --- a/src/Rtp/RtpSession.cpp +++ b/src/Rtp/RtpSession.cpp @@ -57,7 +57,7 @@ void RtpSession::onRtpPacket(const char *data, uint64_t len) { _process = RtpSelector::Instance().getProcess(_ssrc, true); _process->setListener(dynamic_pointer_cast(shared_from_this())); } - _process->inputRtp(data + 2, len - 2, &addr); + _process->inputRtp(_sock,data + 2, len - 2, &addr); _ticker.resetTime(); } diff --git a/src/Rtp/UdpRecver.cpp b/src/Rtp/UdpRecver.cpp index 484855cf..365b78a3 100644 --- a/src/Rtp/UdpRecver.cpp +++ b/src/Rtp/UdpRecver.cpp @@ -17,6 +17,7 @@ UdpRecver::UdpRecver() { } UdpRecver::~UdpRecver() { + _sock->setOnRead(nullptr); } bool UdpRecver::initSock(uint16_t local_port,const char *local_ip) { @@ -26,8 +27,9 @@ bool UdpRecver::initSock(uint16_t local_port,const char *local_ip) { }); auto &ref = RtpSelector::Instance(); - _sock->setOnRead([&ref](const Buffer::Ptr &buf, struct sockaddr *addr, int ){ - ref.inputRtp(buf->data(),buf->size(),addr); + auto sock = _sock; + _sock->setOnRead([&ref, sock](const Buffer::Ptr &buf, struct sockaddr *addr, int ){ + ref.inputRtp(sock, buf->data(),buf->size(),addr); }); return _sock->bindUdpSock(local_port,local_ip); } diff --git a/src/Rtsp/RtpMultiCaster.cpp b/src/Rtsp/RtpMultiCaster.cpp index 88a38244..b341a0fb 100644 --- a/src/Rtsp/RtpMultiCaster.cpp +++ b/src/Rtsp/RtpMultiCaster.cpp @@ -141,7 +141,7 @@ uint16_t RtpMultiCaster::getPort(TrackType trackType){ return _apUdpSock[trackType]->get_local_port(); } string RtpMultiCaster::getIP(){ - return inet_ntoa(_aPeerUdpAddr[0].sin_addr); + return SockUtil::inet_ntoa(_aPeerUdpAddr[0].sin_addr); } RtpMultiCaster::Ptr RtpMultiCaster::make(const EventPoller::Ptr &poller,const string &strLocalIp,const string &strVhost,const string &strApp,const string &strStream){ try{ diff --git a/src/Rtsp/RtpMultiCaster.h b/src/Rtsp/RtpMultiCaster.h index 3e960cdb..d3893ad7 100644 --- a/src/Rtsp/RtpMultiCaster.h +++ b/src/Rtsp/RtpMultiCaster.h @@ -38,7 +38,7 @@ public: } static string toString(uint32_t iAddr){ iAddr = htonl(iAddr); - return ::inet_ntoa((struct in_addr &)(iAddr)); + return SockUtil::inet_ntoa((struct in_addr &)(iAddr)); } virtual ~MultiCastAddressMaker(){} std::shared_ptr obtain(uint32_t iTry = 10); diff --git a/src/Rtsp/RtspPlayer.cpp b/src/Rtsp/RtspPlayer.cpp index 4b79d93d..dc44ee15 100644 --- a/src/Rtsp/RtspPlayer.cpp +++ b/src/Rtsp/RtspPlayer.cpp @@ -119,10 +119,12 @@ void RtspPlayer::onRecv(const Buffer::Ptr& pBuf) { } input(pBuf->data(),pBuf->size()); } + void RtspPlayer::onErr(const SockException &ex) { //定时器_pPlayTimer为空后表明握手结束了 onPlayResult_l(ex,!_pPlayTimer); } + // from live555 bool RtspPlayer::handleAuthenticationFailure(const string ¶msStr) { if(!_rtspRealm.empty()){ @@ -155,6 +157,7 @@ bool RtspPlayer::handleAuthenticationFailure(const string ¶msStr) { } return false; } + void RtspPlayer::handleResDESCRIBE(const Parser& parser) { string authInfo = parser["WWW-Authenticate"]; //发送DESCRIBE命令后的回复 @@ -237,7 +240,6 @@ void RtspPlayer::createUdpSockIfNecessary(int track_idx){ } } - //发送SETUP命令 void RtspPlayer::sendSetup(unsigned int trackIndex) { _onHandshake = std::bind(&RtspPlayer::handleResSETUP,this, placeholders::_1,trackIndex); @@ -337,7 +339,7 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex) return; } if (((struct sockaddr_in *) addr)->sin_addr.s_addr != srcIP) { - WarnL << "收到其他地址的rtp数据:" << inet_ntoa(((struct sockaddr_in *) addr)->sin_addr); + WarnL << "收到其他地址的rtp数据:" << SockUtil::inet_ntoa(((struct sockaddr_in *) addr)->sin_addr); return; } strongSelf->handleOneRtp(uiTrackIndex, strongSelf->_aTrackInfo[uiTrackIndex], (unsigned char *) buf->data(), buf->size()); @@ -351,7 +353,7 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex) return; } if (((struct sockaddr_in *) addr)->sin_addr.s_addr != srcIP) { - WarnL << "收到其他地址的rtcp数据:" << inet_ntoa(((struct sockaddr_in *) addr)->sin_addr); + WarnL << "收到其他地址的rtcp数据:" << SockUtil::inet_ntoa(((struct sockaddr_in *) addr)->sin_addr); return; } strongSelf->onRtcpPacket(uiTrackIndex, strongSelf->_aTrackInfo[uiTrackIndex], (unsigned char *) buf->data(), buf->size()); @@ -394,6 +396,7 @@ void RtspPlayer::sendPause(int type , uint32_t seekMS){ break; } } + void RtspPlayer::pause(bool bPause) { sendPause(bPause ? type_pause : type_seek, getProgressMilliSecond()); } @@ -468,10 +471,8 @@ void RtspPlayer::onRtpPacket(const char *data, uint64_t len) { } } -void RtspPlayer::onRtcpPacket(int iTrackidx, SdpTrack::Ptr &track, unsigned char *pucData, unsigned int uiLen){ - -} - +//此处预留rtcp处理函数 +void RtspPlayer::onRtcpPacket(int iTrackidx, SdpTrack::Ptr &track, unsigned char *pucData, unsigned int uiLen){} #if 0 //改代码提取自FFmpeg,参考之 @@ -597,7 +598,6 @@ void RtspPlayer::sendReceiverReport(bool overTcp,int iTrackIndex){ } } - void RtspPlayer::onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx){ //统计丢包率 if (_aui16FirstSeq[trackidx] == 0 || rtppt->sequence < _aui16FirstSeq[trackidx]) { @@ -613,6 +613,7 @@ void RtspPlayer::onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx){ rtppt->timeStamp = dts_out; onRecvRTP_l(rtppt,_aTrackInfo[trackidx]); } + float RtspPlayer::getPacketLossRate(TrackType type) const{ int iTrackIdx = getTrackIndexByTrackType(type); if(iTrackIdx == -1){ @@ -637,6 +638,7 @@ float RtspPlayer::getPacketLossRate(TrackType type) const{ uint32_t RtspPlayer::getProgressMilliSecond() const{ return MAX(_stamp[0].getRelativeStamp(),_stamp[1].getRelativeStamp()); } + void RtspPlayer::seekToMilliSecond(uint32_t ms) { sendPause(type_seek,ms); } @@ -654,6 +656,7 @@ void RtspPlayer::sendRtspRequest(const string &cmd, const string &url, const std } sendRtspRequest(cmd,url,header_map); } + void RtspPlayer::sendRtspRequest(const string &cmd, const string &url,const StrCaseMap &header_const) { auto header = header_const; header.emplace("CSeq",StrPrinter << _uiCseq++); @@ -701,7 +704,7 @@ void RtspPlayer::sendRtspRequest(const string &cmd, const string &url,const StrC for (auto &pr : header){ printer << pr.first << ": " << pr.second << "\r\n"; } - send(printer << "\r\n"); + SockSender::send(printer << "\r\n"); } void RtspPlayer::onRecvRTP_l(const RtpPacket::Ptr &pkt, const SdpTrack::Ptr &track) { @@ -725,9 +728,8 @@ void RtspPlayer::onRecvRTP_l(const RtpPacket::Ptr &pkt, const SdpTrack::Ptr &tra ticker.resetTime(); } } - - } + void RtspPlayer::onPlayResult_l(const SockException &ex , bool handshakeCompleted) { WarnL << ex.getErrCode() << " " << ex.what(); @@ -795,5 +797,3 @@ int RtspPlayer::getTrackIndexByTrackType(TrackType trackType) const { } } /* namespace mediakit */ - - diff --git a/src/Rtsp/RtspPusher.cpp b/src/Rtsp/RtspPusher.cpp index 922a553f..5243be4c 100644 --- a/src/Rtsp/RtspPusher.cpp +++ b/src/Rtsp/RtspPusher.cpp @@ -395,7 +395,7 @@ void RtspPusher::setSocketFlags(){ GET_CONFIG(bool,ultraLowDelay,General::kUltraLowDelay); if(!ultraLowDelay) { //提高发送性能 - (*this) << SocketFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE); + setSendFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE); SockUtil::setNoDelay(_sock->rawFD(), false); } } @@ -471,7 +471,7 @@ void RtspPusher::sendRtspRequest(const string &cmd, const string &url,const StrC if(!sdp.empty()){ printer << sdp; } - send(printer); + SockSender::send(printer); } diff --git a/src/Rtsp/RtspSession.cpp b/src/Rtsp/RtspSession.cpp index d1e07a82..f8591e6c 100644 --- a/src/Rtsp/RtspSession.cpp +++ b/src/Rtsp/RtspSession.cpp @@ -92,7 +92,7 @@ void RtspSession::onError(const SockException& err) { //流量统计事件广播 GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold); if(_ui64TotalBytes > iFlowThreshold * 1024){ - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _mediaInfo, _ui64TotalBytes, duration, isPlayer, getIdentifier(), get_peer_ip(), get_peer_port()); + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _mediaInfo, _ui64TotalBytes, duration, isPlayer, static_cast(*this)); } } @@ -302,7 +302,7 @@ void RtspSession::handleReq_RECORD(const Parser &parser){ }; //rtsp推流需要鉴权 - auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish,_mediaInfo,invoker,*this); + auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish,_mediaInfo,invoker,static_cast(*this)); if(!flag){ //该事件无人监听,默认不鉴权 GET_CONFIG(bool,toRtxp,General::kPublishToRtxp); @@ -341,10 +341,7 @@ void RtspSession::handleReq_Describe(const Parser &parser) { }; //广播是否需要认证事件 - if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnGetRtspRealm, - _mediaInfo, - invoker, - *this)){ + if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnGetRtspRealm,_mediaInfo,invoker,static_cast(*this))){ //无人监听此事件,说明无需认证 invoker(""); } @@ -446,7 +443,7 @@ void RtspSession::onAuthBasic(const string &realm,const string &strBase64){ }; //此时必须提供明文密码 - if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth,_mediaInfo,realm,user, true,invoker,*this)){ + if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth,_mediaInfo,realm,user, true,invoker,static_cast(*this))){ //表明该流需要认证却没监听请求密码事件,这一般是大意的程序所为,警告之 WarnP(this) << "请监听kBroadcastOnRtspAuth事件!"; //但是我们还是忽略认证以便完成播放 @@ -530,7 +527,7 @@ void RtspSession::onAuthDigest(const string &realm,const string &strMd5){ }; //此时可以提供明文或md5加密的密码 - if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth,_mediaInfo,realm,username, false,invoker,*this)){ + if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth,_mediaInfo,realm,username, false,invoker,static_cast(*this))){ //表明该流需要认证却没监听请求密码事件,这一般是大意的程序所为,警告之 WarnP(this) << "请监听kBroadcastOnRtspAuth事件!"; //但是我们还是忽略认证以便完成播放 @@ -824,7 +821,7 @@ void RtspSession::handleReq_Play(const Parser &parser) { }; if(_bFirstPlay){ //第一次收到play命令,需要鉴权 - auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed,_mediaInfo,invoker,*this); + auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed,_mediaInfo,invoker,static_cast(*this)); if(!flag){ //该事件无人监听,默认不鉴权 onRes(""); @@ -955,7 +952,7 @@ inline void RtspSession::startListenPeerUdpData(int trackIdx) { if (((struct sockaddr_in *) pPeerAddr)->sin_addr.s_addr != srcIP) { WarnP(strongSelf.get()) << ((intervaled % 2 == 0) ? "收到其他地址的rtp数据:" : "收到其他地址的rtcp数据:") - << inet_ntoa(((struct sockaddr_in *) pPeerAddr)->sin_addr); + << SockUtil::inet_ntoa(((struct sockaddr_in *) pPeerAddr)->sin_addr); return true; } @@ -1244,7 +1241,7 @@ void RtspSession::setSocketFlags(){ //推流模式下,关闭TCP_NODELAY会增加推流端的延时,但是服务器性能将提高 SockUtil::setNoDelay(_sock->rawFD(), false); //播放模式下,开启MSG_MORE会增加延时,但是能提高发送性能 - (*this) << SocketFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE); + setSendFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE); } } diff --git a/src/Rtsp/UDPServer.cpp b/src/Rtsp/UDPServer.cpp index 7dd47615..a31b3815 100644 --- a/src/Rtsp/UDPServer.cpp +++ b/src/Rtsp/UDPServer.cpp @@ -76,7 +76,7 @@ void UDPServer::onErr(const string& strKey, const SockException& err) { void UDPServer::onRcvData(int intervaled, const Buffer::Ptr &pBuf, struct sockaddr* pPeerAddr) { //TraceL << trackIndex; struct sockaddr_in *in = (struct sockaddr_in *) pPeerAddr; - string peerIp = inet_ntoa(in->sin_addr); + string peerIp = SockUtil::inet_ntoa(in->sin_addr); lock_guard lck(_mtxDataHandler); auto it0 = _mapDataHandler.find(peerIp); if (it0 == _mapDataHandler.end()) { diff --git a/src/Shell/ShellSession.cpp b/src/Shell/ShellSession.cpp index e7f29285..0345d879 100644 --- a/src/Shell/ShellSession.cpp +++ b/src/Shell/ShellSession.cpp @@ -41,7 +41,7 @@ void ShellSession::onRecv(const Buffer::Ptr&buf) { _beatTicker.resetTime(); _strRecvBuf.append(buf->data(), buf->size()); if (_strRecvBuf.find("\xff\xf4\xff\0xfd\x06") != std::string::npos) { - send("\033[0m\r\n Bye bye!\r\n"); + SockSender::send("\033[0m\r\n Bye bye!\r\n"); shutdown(SockException(Err_other,"received Ctrl+C")); return; } @@ -78,20 +78,20 @@ inline bool ShellSession::onCommandLine(const string& line) { try { std::shared_ptr ss(new stringstream); CMDRegister::Instance()(line,ss); - send(ss->str()); + SockSender::send(ss->str()); }catch(ExitException &ex){ return false; }catch(std::exception &ex){ - send(ex.what()); - send("\r\n"); + SockSender::send(ex.what()); + SockSender::send("\r\n"); } printShellPrefix(); return true; } inline void ShellSession::pleaseInputUser() { - send("\033[0m"); - send(StrPrinter << SERVER_NAME << " login: " << endl); + SockSender::send("\033[0m"); + SockSender::send(StrPrinter << SERVER_NAME << " login: " << endl); _loginInterceptor = [this](const string &user_name) { _strUserName=user_name; pleaseInputPasswd(); @@ -99,24 +99,24 @@ inline void ShellSession::pleaseInputUser() { }; } inline void ShellSession::pleaseInputPasswd() { - send("Password: \033[8m"); + SockSender::send("Password: \033[8m"); _loginInterceptor = [this](const string &passwd) { auto onAuth = [this](const string &errMessage){ if(!errMessage.empty()){ //鉴权失败 - send(StrPrinter - <<"\033[0mAuth failed(" - << errMessage - <<"), please try again.\r\n" - <<_strUserName<<"@"<(*this)); if(!flag){ //如果无人监听shell登录事件,那么默认shell无法登录 onAuth("please listen kBroadcastShellLogin event"); @@ -146,7 +146,7 @@ inline void ShellSession::pleaseInputPasswd() { } inline void ShellSession::printShellPrefix() { - send(StrPrinter << _strUserName << "@" << SERVER_NAME << "# " << endl); + SockSender::send(StrPrinter << _strUserName << "@" << SERVER_NAME << "# " << endl); } }/* namespace mediakit */ diff --git a/tests/test_httpApi.cpp b/tests/test_httpApi.cpp index 4cf3ab85..5d24c40f 100644 --- a/tests/test_httpApi.cpp +++ b/tests/test_httpApi.cpp @@ -64,7 +64,7 @@ void initEventListener(){ } ///////////////header////////////////// printer << "\r\nheader:\r\n"; - for(auto &pr : parser.getValues()){ + for(auto &pr : parser.getHeader()){ printer << "\t" << pr.first << " : " << pr.second << "\r\n"; } ////////////////content///////////////// diff --git a/tests/test_rtp.cpp b/tests/test_rtp.cpp index 4f16899c..58686f2c 100644 --- a/tests/test_rtp.cpp +++ b/tests/test_rtp.cpp @@ -55,7 +55,7 @@ static bool loadFile(const char *path){ } uint32_t timeStamp; - RtpSelector::Instance().inputRtp(rtp,len, &addr,&timeStamp); + RtpSelector::Instance().inputRtp(nullptr,rtp,len, &addr,&timeStamp); if(timeStamp_last){ auto diff = timeStamp - timeStamp_last; if(diff > 0){ diff --git a/tests/test_wsClient.cpp b/tests/test_wsClient.cpp index a2d7b07b..54100bdd 100644 --- a/tests/test_wsClient.cpp +++ b/tests/test_wsClient.cpp @@ -36,7 +36,7 @@ protected: } //tcp连接成功后每2秒触发一次该事件 void onManager() override { - send("echo test!"); + SockSender::send("echo test!"); DebugL << "send echo test"; } //连接服务器结果回调 diff --git a/tests/test_wsServer.cpp b/tests/test_wsServer.cpp index 48a8a60c..e93ea7e7 100644 --- a/tests/test_wsServer.cpp +++ b/tests/test_wsServer.cpp @@ -35,7 +35,7 @@ public: } void onRecv(const Buffer::Ptr &buffer) override { //回显数据 - send("from EchoSession:"); + SockSender::send("from EchoSession:"); send(buffer); } void onError(const SockException &err) override{ @@ -62,7 +62,7 @@ public: } void onRecv(const Buffer::Ptr &buffer) override { //回显数据 - send("from EchoSessionWithUrl:"); + SockSender::send("from EchoSessionWithUrl:"); send(buffer); } void onError(const SockException &err) override{