Merge pull request #14 from xiongziliang/master

update
This commit is contained in:
baiyfcu 2020-04-26 15:30:28 +08:00 committed by GitHub
commit 12eabbe426
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
64 changed files with 945 additions and 527 deletions

@ -1 +1 @@
Subproject commit 987683f1045613098e2bcd534bc90a13d16df8a4
Subproject commit 4ede70fc435eb0a4d3a752b521170d86440b3935

@ -1 +1 @@
Subproject commit 24519a594c2c634b21fbe09fad28d54c4eba0885
Subproject commit abc08f61bb1250b94d252cfeaea249527912dd3b

View File

@ -116,7 +116,6 @@ bash build_docker_images.sh
- [支持linux、windows、mac的rtmp/rtsp播放器](https://github.com/xiongziliang/ZLMediaPlayer)
- [配套的管理WEB网站](https://github.com/chenxiaolei/ZLMediaKit_NVR_UI)
- [DotNetCore的RESTful客户端](https://github.com/MingZhuLiu/ZLMediaKit.DotNetCore.Sdk)
- [支持GB28181信令服务器、onvif的NVS系统](https://gitee.com/qinqi/JNVS)
## 授权协议
@ -126,7 +125,7 @@ bash build_docker_images.sh
## 联系方式
- 邮箱:<771730766@qq.com>(本项目相关或流媒体相关问题请走issue流程否则恕不邮件答复)
- 邮箱:<1213642868@qq.com>(本项目相关或流媒体相关问题请走issue流程否则恕不邮件答复)
- QQ群542509000
## 怎么提问?

View File

@ -343,7 +343,7 @@ SOFTWARE.
## Contact
- Email<771730766@qq.com>
- Email<1213642868@qq.com>
- QQ chat group542509000

View File

@ -40,8 +40,13 @@ extern "C" {
typedef struct {
// 线程数
int thread_num;
// 日志级别,支持0~4
int log_level;
//文件日志保存路径,路径可以不存在(内部可以创建文件夹)设置为NULL关闭日志输出至文件
const char *log_file_path;
//文件日志保存天数,设置为0关闭日志文件
int log_file_days;
// 配置文件是内容还是路径
int ini_is_path;
@ -71,14 +76,18 @@ API_EXPORT void API_CALL mk_stop_all_server();
* mk_env_init便
* @param thread_num 线
* @param log_level ,0~4
* @param log_file_path ,()NULL关闭日志输出至文件
* @param log_file_days ,0
* @param ini_is_path
* @param ini NULL,
* @param ssl_is_path ssl证书是内容还是路径
* @param ssl ssl证书内容或路径NULL
* @param ssl_pwd NULL
*/
API_EXPORT void API_CALL mk_env_init1( int thread_num,
API_EXPORT void API_CALL mk_env_init1(int thread_num,
int log_level,
const char *log_file_path,
int log_file_days,
int ini_is_path,
const char *ini,
int ssl_is_path,

View File

@ -36,7 +36,7 @@ typedef struct {
*/
void (API_CALL *on_mk_media_publish)(const mk_media_info url_info,
const mk_publish_auth_invoker invoker,
const mk_tcp_session sender);
const mk_sock_info sender);
/**
* rtsp/rtmp/http-flv/hls事件广播
@ -47,7 +47,7 @@ typedef struct {
*/
void (API_CALL *on_mk_media_play)(const mk_media_info url_info,
const mk_auth_invoker invoker,
const mk_tcp_session sender);
const mk_sock_info sender);
/**
* 广
@ -55,7 +55,7 @@ typedef struct {
* @param sender
*/
void (API_CALL *on_mk_media_not_found)(const mk_media_info url_info,
const mk_tcp_session sender);
const mk_sock_info sender);
/**
*
@ -73,7 +73,7 @@ typedef struct {
void (API_CALL *on_mk_http_request)(const mk_parser parser,
const mk_http_response_invoker invoker,
int *consumed,
const mk_tcp_session sender);
const mk_sock_info sender);
/**
* http文件服务器中,http访问文件或目录的广播,访http目录的权限
@ -87,7 +87,7 @@ typedef struct {
const char *path,
int is_dir,
const mk_http_access_path_invoker invoker,
mk_tcp_session sender);
const mk_sock_info sender);
/**
* http文件服务器中,http访问文件或目录前的广播,http url到文件路径的映射
@ -98,7 +98,7 @@ typedef struct {
*/
void (API_CALL *on_mk_http_before_access)(const mk_parser parser,
char *path,
const mk_tcp_session sender);
const mk_sock_info sender);
/**
* rtsp流是否需要认证invoker并传入realm,realm
@ -108,7 +108,7 @@ typedef struct {
*/
void (API_CALL *on_mk_rtsp_get_realm)(const mk_media_info url_info,
const mk_rtsp_get_realm_invoker invoker,
const mk_tcp_session sender);
const mk_sock_info sender);
/**
* user_name为用户名must_no_encrypt如果为true(base64认证方式),
@ -125,7 +125,7 @@ typedef struct {
const char *user_name,
int must_no_encrypt,
const mk_rtsp_auth_invoker invoker,
const mk_tcp_session sender);
const mk_sock_info sender);
/**
* mp4分片文件成功后广播
@ -138,7 +138,7 @@ typedef struct {
void (API_CALL *on_mk_shell_login)(const char *user_name,
const char *passwd,
const mk_auth_invoker invoker,
const mk_tcp_session sender);
const mk_sock_info sender);
/**
* rtsp/rtmp/http-flv会话后流量汇报事件广播
@ -146,15 +146,12 @@ typedef struct {
* @param total_bytes
* @param total_seconds tcp会话时长
* @param is_player
* @param peer_ip ip
* @param peer_port
*/
void (API_CALL *on_mk_flow_report)(const mk_media_info url_info,
uint64_t total_bytes,
uint64_t total_seconds,
int is_player,
const char *peer_ip,
uint16_t peer_port);
const mk_sock_info sender);
} mk_events;

View File

@ -73,6 +73,10 @@ API_EXPORT const char* API_CALL mk_media_info_get_vhost(const mk_media_info ctx)
API_EXPORT const char* API_CALL mk_media_info_get_app(const mk_media_info ctx);
//MediaInfo::_streamid
API_EXPORT const char* API_CALL mk_media_info_get_stream(const mk_media_info ctx);
//MediaInfo::_host
API_EXPORT const char* API_CALL mk_media_info_get_host(const mk_media_info ctx);
//MediaInfo::_port
API_EXPORT uint16_t API_CALL mk_media_info_get_port(const mk_media_info ctx);
///////////////////////////////////////////MediaSource/////////////////////////////////////////////

View File

@ -17,19 +17,41 @@
extern "C" {
#endif
///////////////////////////////////////////SockInfo/////////////////////////////////////////////
//SockInfo对象的C映射
typedef void* mk_sock_info;
//SockInfo::get_peer_ip()
API_EXPORT const char* API_CALL mk_sock_info_peer_ip(const mk_sock_info ctx, char *buf);
//SockInfo::get_local_ip()
API_EXPORT const char* API_CALL mk_sock_info_local_ip(const mk_sock_info ctx, char *buf);
//SockInfo::get_peer_port()
API_EXPORT uint16_t API_CALL mk_sock_info_peer_port(const mk_sock_info ctx);
//SockInfo::get_local_port()
API_EXPORT uint16_t API_CALL mk_sock_info_local_port(const mk_sock_info ctx);
#ifndef SOCK_INFO_API_RENAME
#define SOCK_INFO_API_RENAME
//mk_tcp_session对象转换成mk_sock_info对象后再获取网络相关信息
#define mk_tcp_session_peer_ip(x,buf) mk_sock_info_peer_ip(mk_tcp_session_get_sock_info(x),buf)
#define mk_tcp_session_local_ip(x,buf) mk_sock_info_local_ip(mk_tcp_session_get_sock_info(x),buf)
#define mk_tcp_session_peer_port(x) mk_sock_info_peer_port(mk_tcp_session_get_sock_info(x))
#define mk_tcp_session_local_port(x) mk_sock_info_local_port(mk_tcp_session_get_sock_info(x))
//mk_tcp_client对象转换成mk_sock_info对象后再获取网络相关信息
#define mk_tcp_client_peer_ip(x,buf) mk_sock_info_peer_ip(mk_tcp_client_get_sock_info(x),buf)
#define mk_tcp_client_local_ip(x,buf) mk_sock_info_local_ip(mk_tcp_client_get_sock_info(x),buf)
#define mk_tcp_client_peer_port(x) mk_sock_info_peer_port(mk_tcp_client_get_sock_info(x))
#define mk_tcp_client_local_port(x) mk_sock_info_local_port(mk_tcp_client_get_sock_info(x))
#endif
///////////////////////////////////////////TcpSession/////////////////////////////////////////////
//TcpSession对象的C映射
typedef void* mk_tcp_session;
//获取基类指针以便获取其网络相关信息
API_EXPORT mk_sock_info API_CALL mk_tcp_session_get_sock_info(const mk_tcp_session ctx);
//TcpSession::safeShutdown()
API_EXPORT void API_CALL mk_tcp_session_shutdown(const mk_tcp_session ctx,int err,const char *err_msg);
//TcpSession::get_peer_ip()
API_EXPORT const char* API_CALL mk_tcp_session_peer_ip(const mk_tcp_session ctx);
//TcpSession::get_local_ip()
API_EXPORT const char* API_CALL mk_tcp_session_local_ip(const mk_tcp_session ctx);
//TcpSession::get_peer_port()
API_EXPORT uint16_t API_CALL mk_tcp_session_peer_port(const mk_tcp_session ctx);
//TcpSession::get_local_port()
API_EXPORT uint16_t API_CALL mk_tcp_session_local_port(const mk_tcp_session ctx);
//TcpSession::send()
API_EXPORT void API_CALL mk_tcp_session_send(const mk_tcp_session ctx,const char *data,int len);
//切换到该对象所在线程后再TcpSession::send()
@ -115,6 +137,8 @@ API_EXPORT void API_CALL mk_tcp_server_events_listen(const mk_tcp_session_events
///////////////////////////////////////////自定义tcp客户端/////////////////////////////////////////////
typedef void* mk_tcp_client;
//获取基类指针以便获取其网络相关信息
API_EXPORT mk_sock_info API_CALL mk_tcp_client_get_sock_info(const mk_tcp_client ctx);
typedef struct {
/**

17
api/source/mk_common.cpp Executable file → Normal file
View File

@ -41,6 +41,8 @@ API_EXPORT void API_CALL mk_env_init(const mk_config *cfg) {
assert(cfg);
mk_env_init1(cfg->thread_num,
cfg->log_level,
cfg->log_file_path,
cfg->log_file_days,
cfg->ini_is_path,
cfg->ini,
cfg->ssl_is_path,
@ -61,18 +63,29 @@ API_EXPORT void API_CALL mk_stop_all_server(){
stopAllTcpServer();
}
API_EXPORT void API_CALL mk_env_init1( int thread_num,
API_EXPORT void API_CALL mk_env_init1(int thread_num,
int log_level,
const char *log_file_path,
int log_file_days,
int ini_is_path,
const char *ini,
int ssl_is_path,
const char *ssl,
const char *ssl_pwd) {
//确保只初始化一次
static onceToken token([&]() {
//控制台日志
Logger::Instance().add(std::make_shared<ConsoleChannel>("console", (LogLevel) log_level));
if(log_file_path && log_file_days){
//日志文件
auto channel = std::make_shared<FileChannel>("FileChannel", File::absolutePath(log_file_path, ""), (LogLevel) log_level);
Logger::Instance().add(channel);
}
//异步日志线程
Logger::Instance().setWriter(std::make_shared<AsyncLogWriter>());
//设置线程数
EventPollerPool::setPoolSize(thread_num);
WorkThreadPool::setPoolSize(thread_num);

View File

@ -46,7 +46,7 @@ API_EXPORT void API_CALL mk_events_listen(const mk_events *events){
s_events.on_mk_http_request((mk_parser)&parser,
(mk_http_response_invoker)&invoker,
&consumed_int,
(mk_tcp_session)&sender);
(mk_sock_info)&sender);
consumed = consumed_int;
}
});
@ -57,7 +57,7 @@ API_EXPORT void API_CALL mk_events_listen(const mk_events *events){
path.c_str(),
is_dir,
(mk_http_access_path_invoker)&invoker,
(mk_tcp_session)&sender);
(mk_sock_info)&sender);
} else{
invoker("","",0);
}
@ -69,7 +69,7 @@ API_EXPORT void API_CALL mk_events_listen(const mk_events *events){
strcpy(path_c,path.c_str());
s_events.on_mk_http_before_access((mk_parser) &parser,
path_c,
(mk_tcp_session) &sender);
(mk_sock_info) &sender);
path = path_c;
}
});
@ -79,7 +79,7 @@ API_EXPORT void API_CALL mk_events_listen(const mk_events *events){
if (s_events.on_mk_rtsp_get_realm) {
s_events.on_mk_rtsp_get_realm((mk_media_info) &args,
(mk_rtsp_get_realm_invoker) &invoker,
(mk_tcp_session) &sender);
(mk_sock_info) &sender);
}else{
invoker("");
}
@ -92,7 +92,7 @@ API_EXPORT void API_CALL mk_events_listen(const mk_events *events){
user_name.c_str(),
must_no_encrypt,
(mk_rtsp_auth_invoker) &invoker,
(mk_tcp_session) &sender);
(mk_sock_info) &sender);
}
});
@ -100,7 +100,7 @@ API_EXPORT void API_CALL mk_events_listen(const mk_events *events){
if (s_events.on_mk_media_publish) {
s_events.on_mk_media_publish((mk_media_info) &args,
(mk_publish_auth_invoker) &invoker,
(mk_tcp_session) &sender);
(mk_sock_info) &sender);
}else{
GET_CONFIG(bool,toRtxp,General::kPublishToRtxp);
GET_CONFIG(bool,toHls,General::kPublishToHls);
@ -113,7 +113,7 @@ API_EXPORT void API_CALL mk_events_listen(const mk_events *events){
if (s_events.on_mk_media_play) {
s_events.on_mk_media_play((mk_media_info) &args,
(mk_auth_invoker) &invoker,
(mk_tcp_session) &sender);
(mk_sock_info) &sender);
}else{
invoker("");
}
@ -124,7 +124,7 @@ API_EXPORT void API_CALL mk_events_listen(const mk_events *events){
s_events.on_mk_shell_login(user_name.c_str(),
passwd.c_str(),
(mk_auth_invoker) &invoker,
(mk_tcp_session) &sender);
(mk_sock_info) &sender);
}else{
invoker("");
}
@ -136,15 +136,14 @@ API_EXPORT void API_CALL mk_events_listen(const mk_events *events){
totalBytes,
totalDuration,
isPlayer,
peerIP.c_str(),
peerPort);
(mk_sock_info)&sender);
}
});
NoticeCenter::Instance().addListener(&s_tag,Broadcast::kBroadcastNotFoundStream,[](BroadcastNotFoundStreamArgs){
if (s_events.on_mk_media_not_found) {
s_events.on_mk_media_not_found((mk_media_info) &args,
(mk_tcp_session) &sender);
(mk_sock_info) &sender);
}
});

View File

@ -12,7 +12,6 @@
#include "mk_events_objects.h"
#include "Common/config.h"
#include "Record/MP4Recorder.h"
#include "Network/TcpSession.h"
#include "Http/HttpSession.h"
#include "Http/HttpBody.h"
#include "Http/HttpClient.h"
@ -114,7 +113,7 @@ API_EXPORT const char* API_CALL mk_parser_get_tail(const mk_parser ctx){
API_EXPORT const char* API_CALL mk_parser_get_header(const mk_parser ctx,const char *key){
assert(ctx && key);
Parser *parser = (Parser *)ctx;
return parser->getValues()[key].c_str();
return parser->getHeader()[key].c_str();
}
API_EXPORT const char* API_CALL mk_parser_get_content(const mk_parser ctx, int *length){
assert(ctx);
@ -137,16 +136,31 @@ API_EXPORT const char* API_CALL mk_media_info_get_schema(const mk_media_info ctx
MediaInfo *info = (MediaInfo *)ctx;
return info->_schema.c_str();
}
API_EXPORT const char* API_CALL mk_media_info_get_vhost(const mk_media_info ctx){
assert(ctx);
MediaInfo *info = (MediaInfo *)ctx;
return info->_vhost.c_str();
}
API_EXPORT const char* API_CALL mk_media_info_get_host(const mk_media_info ctx){
assert(ctx);
MediaInfo *info = (MediaInfo *)ctx;
return info->_host.c_str();
}
API_EXPORT uint16_t API_CALL mk_media_info_get_port(const mk_media_info ctx){
assert(ctx);
MediaInfo *info = (MediaInfo *)ctx;
return std::stoi(info->_port);
}
API_EXPORT const char* API_CALL mk_media_info_get_app(const mk_media_info ctx){
assert(ctx);
MediaInfo *info = (MediaInfo *)ctx;
return info->_app.c_str();
}
API_EXPORT const char* API_CALL mk_media_info_get_stream(const mk_media_info ctx){
assert(ctx);
MediaInfo *info = (MediaInfo *)ctx;
@ -274,7 +288,7 @@ API_EXPORT void API_CALL mk_http_response_invoker_do_file(const mk_http_response
assert(ctx && request_parser && response_header && response_file_path);
auto header = get_http_header(response_header);
HttpSession::HttpResponseInvoker *invoker = (HttpSession::HttpResponseInvoker *)ctx;
(*invoker).responseFile(((Parser*)(request_parser))->getValues(),header,response_file_path);
(*invoker).responseFile(((Parser *) (request_parser))->getHeader(), header, response_file_path);
}
API_EXPORT void API_CALL mk_http_response_invoker_do(const mk_http_response_invoker ctx,

View File

@ -8,45 +8,56 @@
* may be found in the AUTHORS file in the root of the source tree.
*/
#include "string.h"
#include "mk_tcp.h"
#include "mk_tcp_private.h"
#include "Http/WebSocketClient.h"
#include "Http/WebSocketSession.h"
using namespace mediakit;
API_EXPORT const char* API_CALL mk_sock_info_peer_ip(const mk_sock_info ctx, char *buf){
assert(ctx);
SockInfo *sock = (SockInfo *)ctx;
strcpy(buf,sock->get_peer_ip().c_str());
return buf;
}
API_EXPORT const char* API_CALL mk_sock_info_local_ip(const mk_sock_info ctx, char *buf){
assert(ctx);
SockInfo *sock = (SockInfo *)ctx;
strcpy(buf,sock->get_peer_ip().c_str());
return buf;
}
API_EXPORT uint16_t API_CALL mk_sock_info_peer_port(const mk_sock_info ctx){
assert(ctx);
SockInfo *sock = (SockInfo *)ctx;
return sock->get_peer_port();
}
API_EXPORT uint16_t API_CALL mk_sock_info_local_port(const mk_sock_info ctx){
assert(ctx);
SockInfo *sock = (SockInfo *)ctx;
return sock->get_local_port();
}
////////////////////////////////////////////////////////////////////////////////////////
API_EXPORT mk_sock_info API_CALL mk_tcp_session_get_sock_info(const mk_tcp_session ctx){
assert(ctx);
TcpSessionForC *session = (TcpSessionForC *)ctx;
return (SockInfo *)session;
}
API_EXPORT void API_CALL mk_tcp_session_shutdown(const mk_tcp_session ctx,int err,const char *err_msg){
assert(ctx);
TcpSession *session = (TcpSession *)ctx;
TcpSessionForC *session = (TcpSessionForC *)ctx;
session->safeShutdown(SockException((ErrCode)err,err_msg));
}
API_EXPORT const char* API_CALL mk_tcp_session_peer_ip(const mk_tcp_session ctx){
assert(ctx);
TcpSession *session = (TcpSession *)ctx;
return session->get_peer_ip().c_str();
}
API_EXPORT const char* API_CALL mk_tcp_session_local_ip(const mk_tcp_session ctx){
assert(ctx);
TcpSession *session = (TcpSession *)ctx;
return session->get_local_ip().c_str();
}
API_EXPORT uint16_t API_CALL mk_tcp_session_peer_port(const mk_tcp_session ctx){
assert(ctx);
TcpSession *session = (TcpSession *)ctx;
return session->get_peer_port();
}
API_EXPORT uint16_t API_CALL mk_tcp_session_local_port(const mk_tcp_session ctx){
assert(ctx);
TcpSession *session = (TcpSession *)ctx;
return session->get_local_port();
}
API_EXPORT void API_CALL mk_tcp_session_send(const mk_tcp_session ctx,const char *data,int len){
assert(ctx && data);
if(!len){
len = strlen(data);
}
TcpSession *session = (TcpSession *)ctx;
session->send(data,len);
TcpSessionForC *session = (TcpSessionForC *)ctx;
session->SockSender::send(data,len);
}
API_EXPORT void API_CALL mk_tcp_session_send_safe(const mk_tcp_session ctx,const char *data,int len){
@ -55,12 +66,12 @@ API_EXPORT void API_CALL mk_tcp_session_send_safe(const mk_tcp_session ctx,const
len = strlen(data);
}
try {
weak_ptr<TcpSession> weak_session = ((TcpSession *)ctx)->shared_from_this();
weak_ptr<TcpSession> weak_session = ((TcpSessionForC *)ctx)->shared_from_this();
string str = string(data,len);
((TcpSession *)ctx)->async([weak_session,str](){
((TcpSessionForC *)ctx)->async([weak_session,str](){
auto session_session = weak_session.lock();
if(session_session){
session_session->send(str);
session_session->SockSender::send(str);
}
});
}catch (std::exception &ex){
@ -205,6 +216,12 @@ TcpClientForC::Ptr *mk_tcp_client_create_l(mk_tcp_client_events *events, mk_tcp_
}
}
API_EXPORT mk_sock_info API_CALL mk_tcp_client_get_sock_info(const mk_tcp_client ctx){
assert(ctx);
TcpClientForC::Ptr *client = (TcpClientForC::Ptr *)ctx;
return (SockInfo *)client->get();
}
API_EXPORT mk_tcp_client API_CALL mk_tcp_client_create(mk_tcp_client_events *events, mk_tcp_type type){
auto ret = mk_tcp_client_create_l(events,type);
(*ret)->setClient(ret);
@ -213,25 +230,25 @@ API_EXPORT mk_tcp_client API_CALL mk_tcp_client_create(mk_tcp_client_events *eve
API_EXPORT void API_CALL mk_tcp_client_release(mk_tcp_client ctx){
assert(ctx);
TcpClient::Ptr *client = (TcpClient::Ptr *)ctx;
TcpClientForC::Ptr *client = (TcpClientForC::Ptr *)ctx;
delete client;
}
API_EXPORT void API_CALL mk_tcp_client_connect(mk_tcp_client ctx, const char *host, uint16_t port, float time_out_sec){
assert(ctx);
TcpClient::Ptr *client = (TcpClient::Ptr *)ctx;
TcpClientForC::Ptr *client = (TcpClientForC::Ptr *)ctx;
(*client)->startConnect(host,port);
}
API_EXPORT void API_CALL mk_tcp_client_send(mk_tcp_client ctx, const char *data, int len){
assert(ctx && data);
TcpClient::Ptr *client = (TcpClient::Ptr *)ctx;
(*client)->send(data,len);
TcpClientForC::Ptr *client = (TcpClientForC::Ptr *)ctx;
(*client)->SockSender::send(data,len);
}
API_EXPORT void API_CALL mk_tcp_client_send_safe(mk_tcp_client ctx, const char *data, int len){
assert(ctx && data);
TcpClient::Ptr *client = (TcpClient::Ptr *)ctx;
TcpClientForC::Ptr *client = (TcpClientForC::Ptr *)ctx;
weak_ptr<TcpClient> weakClient = *client;
Buffer::Ptr buf = (*client)->obtainBuffer(data,len);
(*client)->async([weakClient,buf](){

View File

@ -17,7 +17,7 @@
using namespace toolkit;
class TcpClientForC : public TcpClient {
public:
public:
typedef std::shared_ptr<TcpClientForC> Ptr;
TcpClientForC(mk_tcp_client_events *events) ;
~TcpClientForC() override ;
@ -27,13 +27,13 @@ class TcpClientForC : public TcpClient {
void onConnect(const SockException &ex) override;
void setClient(mk_tcp_client client);
void *_user_data;
private:
private:
mk_tcp_client_events _events;
mk_tcp_client _client;
};
class TcpSessionForC : public TcpSession {
public:
public:
TcpSessionForC(const Socket::Ptr &pSock) ;
~TcpSessionForC() override = default;
void onRecv(const Buffer::Ptr &buffer) override ;

View File

@ -46,14 +46,15 @@ void API_CALL on_mk_media_changed(int regist,
*/
void API_CALL on_mk_media_publish(const mk_media_info url_info,
const mk_publish_auth_invoker invoker,
const mk_tcp_session sender) {
const mk_sock_info sender) {
char ip[64];
log_printf(LOG_LEV,
"client info, local: %s:%d, peer: %s:%d\n"
"%s/%s/%s/%s, url params: %s",
mk_tcp_session_local_ip(sender),
mk_tcp_session_local_port(sender),
mk_tcp_session_peer_ip(sender),
mk_tcp_session_peer_port(sender),
mk_sock_info_local_ip(sender,ip),
mk_sock_info_local_port(sender),
mk_sock_info_peer_ip(sender,ip + 32),
mk_sock_info_peer_port(sender),
mk_media_info_get_schema(url_info),
mk_media_info_get_vhost(url_info),
mk_media_info_get_app(url_info),
@ -73,15 +74,16 @@ void API_CALL on_mk_media_publish(const mk_media_info url_info,
*/
void API_CALL on_mk_media_play(const mk_media_info url_info,
const mk_auth_invoker invoker,
const mk_tcp_session sender) {
const mk_sock_info sender) {
char ip[64];
log_printf(LOG_LEV,
"client info, local: %s:%d, peer: %s:%d\n"
"%s/%s/%s/%s, url params: %s",
mk_tcp_session_local_ip(sender),
mk_tcp_session_local_port(sender),
mk_tcp_session_peer_ip(sender),
mk_tcp_session_peer_port(sender),
mk_sock_info_local_ip(sender,ip),
mk_sock_info_local_port(sender),
mk_sock_info_peer_ip(sender,ip + 32),
mk_sock_info_peer_port(sender),
mk_media_info_get_schema(url_info),
mk_media_info_get_vhost(url_info),
mk_media_info_get_app(url_info),
@ -98,14 +100,15 @@ void API_CALL on_mk_media_play(const mk_media_info url_info,
* @param sender
*/
void API_CALL on_mk_media_not_found(const mk_media_info url_info,
const mk_tcp_session sender) {
const mk_sock_info sender) {
char ip[64];
log_printf(LOG_LEV,
"client info, local: %s:%d, peer: %s:%d\n"
"%s/%s/%s/%s, url params: %s",
mk_tcp_session_local_ip(sender),
mk_tcp_session_local_port(sender),
mk_tcp_session_peer_ip(sender),
mk_tcp_session_peer_port(sender),
mk_sock_info_local_ip(sender,ip),
mk_sock_info_local_port(sender),
mk_sock_info_peer_ip(sender,ip + 32),
mk_sock_info_peer_port(sender),
mk_media_info_get_schema(url_info),
mk_media_info_get_vhost(url_info),
mk_media_info_get_app(url_info),
@ -137,17 +140,18 @@ void API_CALL on_mk_media_no_reader(const mk_media_source sender) {
void API_CALL on_mk_http_request(const mk_parser parser,
const mk_http_response_invoker invoker,
int *consumed,
const mk_tcp_session sender) {
const mk_sock_info sender) {
char ip[64];
log_printf(LOG_LEV,
"client info, local: %s:%d, peer: %s:%d\n"
"%s %s?%s %s\n"
"User-Agent: %s\n"
"%s",
mk_tcp_session_local_ip(sender),
mk_tcp_session_local_port(sender),
mk_tcp_session_peer_ip(sender),
mk_tcp_session_peer_port(sender),
mk_sock_info_local_ip(sender,ip),
mk_sock_info_local_port(sender),
mk_sock_info_peer_ip(sender,ip + 32),
mk_sock_info_peer_port(sender),
mk_parser_get_method(parser),
mk_parser_get_url(parser),
mk_parser_get_url_params(parser),
@ -191,17 +195,18 @@ void API_CALL on_mk_http_access(const mk_parser parser,
const char *path,
int is_dir,
const mk_http_access_path_invoker invoker,
mk_tcp_session sender) {
const mk_sock_info sender) {
char ip[64];
log_printf(LOG_LEV,
"client info, local: %s:%d, peer: %s:%d, path: %s ,is_dir: %d\n"
"%s %s?%s %s\n"
"User-Agent: %s\n"
"%s",
mk_tcp_session_local_ip(sender),
mk_tcp_session_local_port(sender),
mk_tcp_session_peer_ip(sender),
mk_tcp_session_peer_port(sender),
mk_sock_info_local_ip(sender, ip),
mk_sock_info_local_port(sender),
mk_sock_info_peer_ip(sender, ip + 32),
mk_sock_info_peer_port(sender),
path,(int)is_dir,
mk_parser_get_method(parser),
mk_parser_get_url(parser),
@ -223,16 +228,18 @@ void API_CALL on_mk_http_access(const mk_parser parser,
*/
void API_CALL on_mk_http_before_access(const mk_parser parser,
char *path,
const mk_tcp_session sender) {
const mk_sock_info sender) {
char ip[64];
log_printf(LOG_LEV,
"client info, local: %s:%d, peer: %s:%d, path: %s\n"
"%s %s?%s %s\n"
"User-Agent: %s\n"
"%s",
mk_tcp_session_local_ip(sender),
mk_tcp_session_local_port(sender),
mk_tcp_session_peer_ip(sender),
mk_tcp_session_peer_port(sender),
mk_sock_info_local_ip(sender,ip),
mk_sock_info_local_port(sender),
mk_sock_info_peer_ip(sender,ip + 32),
mk_sock_info_peer_port(sender),
path,
mk_parser_get_method(parser),
mk_parser_get_url(parser),
@ -251,14 +258,15 @@ void API_CALL on_mk_http_before_access(const mk_parser parser,
*/
void API_CALL on_mk_rtsp_get_realm(const mk_media_info url_info,
const mk_rtsp_get_realm_invoker invoker,
const mk_tcp_session sender) {
const mk_sock_info sender) {
char ip[64];
log_printf(LOG_LEV,
"client info, local: %s:%d, peer: %s:%d\n"
"%s/%s/%s/%s, url params: %s",
mk_tcp_session_local_ip(sender),
mk_tcp_session_local_port(sender),
mk_tcp_session_peer_ip(sender),
mk_tcp_session_peer_port(sender),
mk_sock_info_local_ip(sender,ip),
mk_sock_info_local_port(sender),
mk_sock_info_peer_ip(sender,ip + 32),
mk_sock_info_peer_port(sender),
mk_media_info_get_schema(url_info),
mk_media_info_get_vhost(url_info),
mk_media_info_get_app(url_info),
@ -284,16 +292,17 @@ void API_CALL on_mk_rtsp_auth(const mk_media_info url_info,
const char *user_name,
int must_no_encrypt,
const mk_rtsp_auth_invoker invoker,
const mk_tcp_session sender) {
const mk_sock_info sender) {
char ip[64];
log_printf(LOG_LEV,
"client info, local: %s:%d, peer: %s:%d\n"
"%s/%s/%s/%s, url params: %s\n"
"realm: %s, user_name: %s, must_no_encrypt: %d",
mk_tcp_session_local_ip(sender),
mk_tcp_session_local_port(sender),
mk_tcp_session_peer_ip(sender),
mk_tcp_session_peer_port(sender),
mk_sock_info_local_ip(sender,ip),
mk_sock_info_local_port(sender),
mk_sock_info_peer_ip(sender,ip + 32),
mk_sock_info_peer_port(sender),
mk_media_info_get_schema(url_info),
mk_media_info_get_vhost(url_info),
mk_media_info_get_app(url_info),
@ -338,13 +347,15 @@ void API_CALL on_mk_record_mp4(const mk_mp4_info mp4) {
void API_CALL on_mk_shell_login(const char *user_name,
const char *passwd,
const mk_auth_invoker invoker,
const mk_tcp_session sender) {
const mk_sock_info sender) {
char ip[64];
log_printf(LOG_LEV,"client info, local: %s:%d, peer: %s:%d\n"
"user_name: %s, passwd: %s",
mk_tcp_session_local_ip(sender),
mk_tcp_session_local_port(sender),
mk_tcp_session_peer_ip(sender),
mk_tcp_session_peer_port(sender),
mk_sock_info_local_ip(sender,ip),
mk_sock_info_local_port(sender),
mk_sock_info_peer_ip(sender,ip + 32),
mk_sock_info_peer_port(sender),
user_name, passwd);
//允许登录shell
mk_auth_invoker_do(invoker, NULL);
@ -363,8 +374,8 @@ void API_CALL on_mk_flow_report(const mk_media_info url_info,
uint64_t total_bytes,
uint64_t total_seconds,
int is_player,
const char *peer_ip,
uint16_t peer_port) {
const mk_sock_info sender) {
char ip[64];
log_printf(LOG_LEV,"%s/%s/%s/%s, url params: %s,"
"total_bytes: %d, total_seconds: %d, is_player: %d, peer_ip:%s, peer_port:%d",
mk_media_info_get_schema(url_info),
@ -372,7 +383,11 @@ void API_CALL on_mk_flow_report(const mk_media_info url_info,
mk_media_info_get_app(url_info),
mk_media_info_get_stream(url_info),
mk_media_info_get_params(url_info),
(int)total_bytes, (int)total_seconds, (int)is_player,peer_ip, (int)peer_port);
(int)total_bytes,
(int)total_seconds,
(int)is_player,
mk_sock_info_peer_ip(sender,ip),
(int)mk_sock_info_peer_port(sender));
}
static int flag = 1;
@ -387,6 +402,8 @@ int main(int argc, char *argv[]) {
.ini = ini_path,
.ini_is_path = 1,
.log_level = 0,
.log_file_path = NULL,
.log_file_days = 0,
.ssl = ssl_path,
.ssl_is_path = 1,
.ssl_pwd = NULL,

View File

@ -39,7 +39,8 @@ typedef struct {
* @param session
*/
void API_CALL on_mk_tcp_session_create(uint16_t server_port,mk_tcp_session session){
log_printf(LOG_LEV,"%s %d",mk_tcp_session_peer_ip(session),(int)mk_tcp_session_peer_port(session));
char ip[64];
log_printf(LOG_LEV,"%s %d",mk_tcp_session_peer_ip(session,ip),(int)mk_tcp_session_peer_port(session));
tcp_session_user_data *user_data = malloc(sizeof(tcp_session_user_data));
user_data->_session = session;
mk_tcp_session_set_user_data(session,user_data);
@ -52,7 +53,8 @@ void API_CALL on_mk_tcp_session_create(uint16_t server_port,mk_tcp_session sessi
* @param len
*/
void API_CALL on_mk_tcp_session_data(uint16_t server_port,mk_tcp_session session,const char *data,int len){
log_printf(LOG_LEV,"from %s %d, data[%d]: %s",mk_tcp_session_peer_ip(session),(int)mk_tcp_session_peer_port(session), len,data);
char ip[64];
log_printf(LOG_LEV,"from %s %d, data[%d]: %s",mk_tcp_session_peer_ip(session,ip),(int)mk_tcp_session_peer_port(session), len,data);
mk_tcp_session_send(session,"echo:",0);
mk_tcp_session_send(session,data,len);
}
@ -62,7 +64,8 @@ void API_CALL on_mk_tcp_session_data(uint16_t server_port,mk_tcp_session session
* @param session
*/
void API_CALL on_mk_tcp_session_manager(uint16_t server_port,mk_tcp_session session){
log_printf(LOG_LEV,"%s %d",mk_tcp_session_peer_ip(session),(int)mk_tcp_session_peer_port(session));
char ip[64];
log_printf(LOG_LEV,"%s %d",mk_tcp_session_peer_ip(session,ip),(int)mk_tcp_session_peer_port(session));
}
/**
@ -73,7 +76,8 @@ void API_CALL on_mk_tcp_session_manager(uint16_t server_port,mk_tcp_session sess
* @param msg
*/
void API_CALL on_mk_tcp_session_disconnect(uint16_t server_port,mk_tcp_session session,int code,const char *msg){
log_printf(LOG_LEV,"%s %d: %d %s",mk_tcp_session_peer_ip(session),(int)mk_tcp_session_peer_port(session),code,msg);
char ip[64];
log_printf(LOG_LEV,"%s %d: %d %s",mk_tcp_session_peer_ip(session,ip),(int)mk_tcp_session_peer_port(session),code,msg);
tcp_session_user_data *user_data = (tcp_session_user_data *)mk_tcp_session_get_user_data(session);
free(user_data);
}

View File

@ -75,7 +75,7 @@ void Process::run(const string &cmd, const string &log_file_tmp) {
int log_fd = -1;
int flags = O_CREAT | O_WRONLY | O_APPEND;
mode_t mode = S_IRWXO | S_IRWXG | S_IRWXU;// S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH;
File::createfile_path(log_file.data(), mode);
File::create_path(log_file.data(), mode);
if ((log_fd = ::open(log_file.c_str(), flags, mode)) < 0) {
fprintf(stderr, "open log file %s failed:%d(%s)\r\n", log_file.data(), errno, strerror(errno));
}

View File

@ -12,6 +12,7 @@
#include <functional>
#include <sstream>
#include <unordered_map>
#include <math.h>
#include "jsoncpp/json.h"
#include "Util/util.h"
#include "Util/logger.h"
@ -86,7 +87,7 @@ public:
~SuccessException() = default;
};
#define API_ARGS1 TcpSession &sender,HttpSession::KeyValue &headerIn, HttpSession::KeyValue &headerOut, ApiArgsType &allArgs, Json::Value &val
#define API_ARGS1 SockInfo &sender,HttpSession::KeyValue &headerIn, HttpSession::KeyValue &headerOut, ApiArgsType &allArgs, Json::Value &val
#define API_ARGS2 API_ARGS1, const HttpSession::HttpResponseInvoker &invoker
#define API_ARGS_VALUE1 sender,headerIn,headerOut,allArgs,val
#define API_ARGS_VALUE2 API_ARGS_VALUE1, invoker
@ -154,7 +155,7 @@ static inline void addHttpListener(){
val["code"] = API::Success;
HttpSession::KeyValue headerOut;
auto allArgs = getAllArgs(parser);
HttpSession::KeyValue &headerIn = parser.getValues();
HttpSession::KeyValue &headerIn = parser.getHeader();
GET_CONFIG(string,charSet,Http::kCharSet);
headerOut["Content-Type"] = StrPrinter << "application/json; charset=" << charSet;
if(api_debug){
@ -372,6 +373,44 @@ void installWebApi() {
#endif//#if !defined(_WIN32)
static auto makeMediaSourceJson = [](const MediaSource::Ptr &media){
Value item;
item["schema"] = media->getSchema();
item["vhost"] = media->getVhost();
item["app"] = media->getApp();
item["stream"] = media->getId();
item["readerCount"] = media->readerCount();
item["totalReaderCount"] = media->totalReaderCount();
for(auto &track : media->getTracks()){
Value obj;
auto codec_type = track->getTrackType();
obj["codec_id"] = track->getCodecId();
obj["codec_id_name"] = track->getCodecName();
obj["ready"] = track->ready();
obj["codec_type"] = codec_type;
switch(codec_type){
case TrackAudio : {
auto audio_track = dynamic_pointer_cast<AudioTrack>(track);
obj["sample_rate"] = audio_track->getAudioSampleRate();
obj["channels"] = audio_track->getAudioChannel();
obj["sample_bit"] = audio_track->getAudioSampleBit();
break;
}
case TrackVideo : {
auto video_track = dynamic_pointer_cast<VideoTrack>(track);
obj["width"] = video_track->getVideoWidth();
obj["height"] = video_track->getVideoHeight();
obj["fps"] = round(video_track->getVideoFps());
break;
}
default:
break;
}
item["tracks"].append(obj);
}
return item;
};
//获取流列表,可选筛选参数
//测试url0(获取所有流) http://127.0.0.1/index/api/getMediaList
//测试url1(获取虚拟主机为"__defaultVost__"的流) http://127.0.0.1/index/api/getMediaList?vhost=__defaultVost__
@ -389,21 +428,7 @@ void installWebApi() {
if(!allArgs["app"].empty() && allArgs["app"] != media->getApp()){
return;
}
Value item;
item["schema"] = media->getSchema();
item["vhost"] = media->getVhost();
item["app"] = media->getApp();
item["stream"] = media->getId();
item["readerCount"] = media->readerCount();
item["totalReaderCount"] = media->totalReaderCount();
for(auto &track : media->getTracks()){
Value obj;
obj["codec_id"] = track->getCodecId();
obj["codec_type"] = track->getTrackType();
obj["ready"] = track->ready();
item["tracks"].append(obj);
}
val["data"].append(item);
val["data"].append(makeMediaSourceJson(media));
});
});
@ -423,16 +448,9 @@ void installWebApi() {
val["online"] = false;
return;
}
val = makeMediaSourceJson(src);
val["online"] = true;
val["readerCount"] = src->readerCount();
val["totalReaderCount"] = src->totalReaderCount();
for(auto &track : src->getTracks()){
Value obj;
obj["codec_id"] = track->getCodecId();
obj["codec_type"] = track->getTrackType();
obj["ready"] = track->ready();
val["tracks"].append(obj);
}
val["code"] = API::Success;
});
//主动关断流,包括关断拉流、推流

View File

@ -253,16 +253,16 @@ void installWebHook(){
});
NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastFlowReport,[](BroadcastFlowReportArgs){
if(!hook_enable || args._param_strs == hook_adminparams || hook_flowreport.empty() || peerIP == "127.0.0.1"){
if(!hook_enable || args._param_strs == hook_adminparams || hook_flowreport.empty() || sender.get_peer_ip() == "127.0.0.1"){
return;
}
auto body = make_json(args);
body["totalBytes"] = (Json::UInt64)totalBytes;
body["duration"] = (Json::UInt64)totalDuration;
body["player"] = isPlayer;
body["ip"] = peerIP;
body["port"] = peerPort;
body["id"] = sessionIdentifier;
body["ip"] = sender.get_peer_ip();
body["port"] = sender.get_peer_port();
body["id"] = sender.getIdentifier();
//执行hook
do_http_hook(hook_flowreport,body, nullptr);
});
@ -444,7 +444,7 @@ void installWebHook(){
body["path"] = path;
body["is_dir"] = is_dir;
body["params"] = parser.Params();
for(auto &pr : parser.getValues()){
for(auto &pr : parser.getHeader()){
body[string("header.") + pr.first] = pr.second;
}
//执行hook

View File

@ -25,36 +25,30 @@ void MediaSink::addTrack(const Track::Ptr &track_in) {
auto track = track_in->clone();
auto codec_id = track->getCodecId();
_track_map[codec_id] = track;
_allTrackReady = false;
_trackReadyCallback[codec_id] = [this, track]() {
_all_track_ready = false;
_track_ready_callback[codec_id] = [this, track]() {
onTrackReady(track);
};
_ticker.resetTime();
track->addDelegate(std::make_shared<FrameWriterInterfaceHelper>([this](const Frame::Ptr &frame) {
if (_allTrackReady) {
if (_all_track_ready) {
onTrackFrame(frame);
return;
}
//还有track未准备好如果是视频的话如果直接丢帧可能导致丢失I帧
checkTrackIfReady(nullptr);
if (_allTrackReady) {
//运行至这里说明Track状态由未就绪切换为已就绪状态,那么这帧就不应该丢弃
onTrackFrame(frame);
} else if(frame->keyFrame()){
WarnL << "some track is unreadydrop key frame of: " << frame->getCodecName();
} else {
//还有Track未就绪先缓存之
_frame_unread[frame->getCodecId()].emplace_back(Frame::getCacheAbleFrame(frame));
}
}));
}
void MediaSink::resetTracks() {
lock_guard<recursive_mutex> lck(_mtx);
_allTrackReady = false;
_all_track_ready = false;
_track_map.clear();
_trackReadyCallback.clear();
_track_ready_callback.clear();
_ticker.resetTime();
_max_track_size = 2;
_frame_unread.clear();
}
void MediaSink::inputFrame(const Frame::Ptr &frame) {
@ -63,21 +57,21 @@ void MediaSink::inputFrame(const Frame::Ptr &frame) {
if (it == _track_map.end()) {
return;
}
checkTrackIfReady(it->second);
it->second->inputFrame(frame);
checkTrackIfReady(nullptr);
}
void MediaSink::checkTrackIfReady_l(const Track::Ptr &track){
//Track由未就绪状态转换成就绪状态我们就触发onTrackReady回调
auto it_callback = _trackReadyCallback.find(track->getCodecId());
if (it_callback != _trackReadyCallback.end() && track->ready()) {
auto it_callback = _track_ready_callback.find(track->getCodecId());
if (it_callback != _track_ready_callback.end() && track->ready()) {
it_callback->second();
_trackReadyCallback.erase(it_callback);
_track_ready_callback.erase(it_callback);
}
}
void MediaSink::checkTrackIfReady(const Track::Ptr &track){
if (!_allTrackReady && !_trackReadyCallback.empty()) {
if (!_all_track_ready && !_track_ready_callback.empty()) {
if (track) {
checkTrackIfReady_l(track);
} else {
@ -87,14 +81,14 @@ void MediaSink::checkTrackIfReady(const Track::Ptr &track){
}
}
if(!_allTrackReady){
if(!_all_track_ready){
if(_ticker.elapsedTime() > MAX_WAIT_MS_READY){
//如果超过规定时间那么不再等待并忽略未准备好的Track
emitAllTrackReady();
return;
}
if(!_trackReadyCallback.empty()){
if(!_track_ready_callback.empty()){
//在超时时间内如果存在未准备好的Track那么继续等待
return;
}
@ -114,22 +108,20 @@ void MediaSink::checkTrackIfReady(const Track::Ptr &track){
}
void MediaSink::addTrackCompleted(){
{
lock_guard<recursive_mutex> lck(_mtx);
_max_track_size = _track_map.size();
}
checkTrackIfReady(nullptr);
}
void MediaSink::emitAllTrackReady() {
if (_allTrackReady) {
if (_all_track_ready) {
return;
}
DebugL << "all track ready use " << _ticker.elapsedTime() << "ms";
if (!_trackReadyCallback.empty()) {
if (!_track_ready_callback.empty()) {
//这是超时强制忽略未准备好的Track
_trackReadyCallback.clear();
_track_ready_callback.clear();
//移除未准备好的Track
for (auto it = _track_map.begin(); it != _track_map.end();) {
if (!it->second->ready()) {
@ -143,8 +135,20 @@ void MediaSink::emitAllTrackReady() {
if (!_track_map.empty()) {
//最少有一个有效的Track
_allTrackReady = true;
_all_track_ready = true;
onAllTrackReady();
//全部Track就绪我们一次性把之前的帧输出
for(auto &pr : _frame_unread){
if (_track_map.find(pr.first) == _track_map.end()) {
//该Track已经被移除
continue;
}
pr.second.for_each([&](const Frame::Ptr &frame) {
onTrackFrame(frame);
});
}
_frame_unread.clear();
}
}

View File

@ -114,8 +114,9 @@ private:
private:
mutable recursive_mutex _mtx;
unordered_map<int,Track::Ptr> _track_map;
unordered_map<int,function<void()> > _trackReadyCallback;
bool _allTrackReady = false;
unordered_map<int,List<Frame::Ptr> > _frame_unread;
unordered_map<int,function<void()> > _track_ready_callback;
bool _all_track_ready = false;
Ticker _ticker;
int _max_track_size = 2;
};

View File

@ -8,6 +8,7 @@
* may be found in the AUTHORS file in the root of the source tree.
*/
#include <math.h>
#include "MediaSource.h"
#include "Record/MP4Reader.h"
#include "Util/util.h"
@ -190,7 +191,7 @@ void findAsync_l(const MediaInfo &info, const std::shared_ptr<TcpSession> &sessi
void *listener_tag = session.get();
weak_ptr<TcpSession> weakSession = session;
//广播未找到流,此时可以立即去拉流,这样还来得及
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastNotFoundStream,info,*session);
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastNotFoundStream,info, static_cast<SockInfo &>(*session));
//最多等待一定时间,如果这个时间内,流未注册上,那么返回未找到流
GET_CONFIG(int,maxWaitMS,General::kMaxStreamWaitTimeMS);
@ -293,7 +294,34 @@ void MediaSource::regist() {
lock_guard<recursive_mutex> lock(g_mtxMediaSrc);
g_mapMediaSrc[_strSchema][_strVhost][_strApp][_strId] = shared_from_this();
}
InfoL << _strSchema << " " << _strVhost << " " << _strApp << " " << _strId;
_StrPrinter codec_info;
auto tracks = getTracks(true);
for(auto &track : tracks) {
auto codec_type = track->getTrackType();
codec_info << track->getCodecName();
switch (codec_type) {
case TrackAudio : {
auto audio_track = dynamic_pointer_cast<AudioTrack>(track);
codec_info << "["
<< audio_track->getAudioSampleRate() << "/"
<< audio_track->getAudioChannel() << "/"
<< audio_track->getAudioSampleBit() << "] ";
break;
}
case TrackVideo : {
auto video_track = dynamic_pointer_cast<VideoTrack>(track);
codec_info << "["
<< video_track->getVideoWidth() << "/"
<< video_track->getVideoHeight() << "/"
<< round(video_track->getVideoFps()) << "] ";
break;
}
default:
break;
}
}
InfoL << _strSchema << " " << _strVhost << " " << _strApp << " " << _strId << " " << codec_info;
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaChanged, true, *this);
}
@ -319,7 +347,7 @@ bool MediaSource::unregist() {
}
if(ret){
InfoL << "" << _strSchema << " " << _strVhost << " " << _strApp << " " << _strId;
InfoL << _strSchema << " " << _strVhost << " " << _strApp << " " << _strId;
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaChanged, false, *this);
}
return ret;
@ -486,14 +514,22 @@ static bool isFlushAble_merge(bool is_audio, uint32_t last_stamp, uint32_t new_s
return cache_size > 20;
}
bool FlushPolicy::isFlushAble(uint32_t last_stamp, uint32_t new_stamp, int cache_size) {
GET_CONFIG(bool,ultraLowDelay, General::kUltraLowDelay);
GET_CONFIG(int,mergeWriteMS, General::kMergeWriteMS);
if(ultraLowDelay || mergeWriteMS <= 0){
bool FlushPolicy::isFlushAble(uint32_t new_stamp, int cache_size) {
bool ret = false;
GET_CONFIG(bool, ultraLowDelay, General::kUltraLowDelay);
GET_CONFIG(int, mergeWriteMS, General::kMergeWriteMS);
if (ultraLowDelay || mergeWriteMS <= 0) {
//关闭了合并写或者合并写阈值小于等于0
return isFlushAble_default(_is_audio, last_stamp, new_stamp, cache_size);
ret = isFlushAble_default(_is_audio, _last_stamp, new_stamp, cache_size);
} else {
ret = isFlushAble_merge(_is_audio, _last_stamp, new_stamp, cache_size, mergeWriteMS);
}
return isFlushAble_merge(_is_audio, last_stamp, new_stamp, cache_size,mergeWriteMS);
if (ret) {
// DebugL << _is_audio << " " << _last_stamp << " " << new_stamp;
_last_stamp = new_stamp;
}
return ret;
}
} /* namespace mediakit */

View File

@ -173,9 +173,10 @@ public:
return packet->timeStamp;
}
bool isFlushAble(uint32_t last_stamp, uint32_t new_stamp, int cache_size);
bool isFlushAble(uint32_t new_stamp, int cache_size);
private:
bool _is_audio;
uint32_t _last_stamp= 0;
};
/// 视频合并写缓存模板
@ -185,21 +186,19 @@ private:
template<typename packet, typename policy = FlushPolicy, typename packet_list = List<std::shared_ptr<packet> > >
class VideoPacketCache {
public:
VideoPacketCache() : _policy(true) {
VideoPacketCache() : _policy(false) {
_cache = std::make_shared<packet_list>();
}
virtual ~VideoPacketCache() = default;
void inputVideo(const std::shared_ptr<packet> &rtp, bool key_pos) {
auto new_stamp = _policy.getStamp(rtp);
if (_policy.isFlushAble(_last_stamp, new_stamp, _cache->size())) {
if (_policy.isFlushAble(_policy.getStamp(rtp), _cache->size())) {
flushAll();
}
//追加数据到最后
_cache->emplace_back(rtp);
_last_stamp = new_stamp;
if (key_pos) {
_key_pos = key_pos;
}
@ -220,7 +219,6 @@ private:
private:
policy _policy;
std::shared_ptr<packet_list> _cache;
uint32_t _last_stamp = 0;
bool _key_pos = false;
};
@ -231,20 +229,18 @@ private:
template<typename packet, typename policy = FlushPolicy, typename packet_list = List<std::shared_ptr<packet> > >
class AudioPacketCache {
public:
AudioPacketCache() : _policy(false) {
AudioPacketCache() : _policy(true) {
_cache = std::make_shared<packet_list>();
}
virtual ~AudioPacketCache() = default;
void inputAudio(const std::shared_ptr<packet> &rtp) {
auto new_stamp = _policy.getStamp(rtp);
if (_policy.isFlushAble(_last_stamp, new_stamp, _cache->size())) {
if (_policy.isFlushAble(_policy.getStamp(rtp), _cache->size())) {
flushAll();
}
//追加数据到最后
_cache->emplace_back(rtp);
_last_stamp = new_stamp;
}
virtual void onFlushAudio(std::shared_ptr<packet_list> &) = 0;
@ -261,7 +257,6 @@ private:
private:
policy _policy;
std::shared_ptr<packet_list> _cache;
uint32_t _last_stamp = 0;
};
} /* namespace mediakit */

View File

@ -35,4 +35,113 @@ string FindField(const char* buf, const char* start, const char *end ,int bufSiz
return string(msg_start, msg_end);
}
Parser::Parser() {}
Parser::~Parser() {}
void Parser::Parse(const char *buf) {
//解析
const char *start = buf;
Clear();
while (true) {
auto line = FindField(start, NULL, "\r\n");
if (line.size() == 0) {
break;
}
if (start == buf) {
_strMethod = FindField(line.data(), NULL, " ");
_strFullUrl = FindField(line.data(), " ", " ");
auto args_pos = _strFullUrl.find('?');
if (args_pos != string::npos) {
_strUrl = _strFullUrl.substr(0, args_pos);
_params = _strFullUrl.substr(args_pos + 1);
_mapUrlArgs = parseArgs(_params);
} else {
_strUrl = _strFullUrl;
}
_strTail = FindField(line.data(), (_strFullUrl + " ").data(), NULL);
} else {
auto field = FindField(line.data(), NULL, ": ");
auto value = FindField(line.data(), ": ", NULL);
if (field.size() != 0) {
_mapHeaders.emplace_force(field, value);
}
}
start = start + line.size() + 2;
if (strncmp(start, "\r\n", 2) == 0) { //协议解析完毕
_strContent = FindField(start, "\r\n", NULL);
break;
}
}
}
const string &Parser::Method() const {
return _strMethod;
}
const string &Parser::Url() const {
return _strUrl;
}
const string &Parser::FullUrl() const {
return _strFullUrl;
}
const string &Parser::Tail() const {
return _strTail;
}
const string &Parser::operator[](const char *name) const {
auto it = _mapHeaders.find(name);
if (it == _mapHeaders.end()) {
return _strNull;
}
return it->second;
}
const string &Parser::Content() const {
return _strContent;
}
void Parser::Clear() {
_strMethod.clear();
_strUrl.clear();
_strFullUrl.clear();
_params.clear();
_strTail.clear();
_strContent.clear();
_mapHeaders.clear();
_mapUrlArgs.clear();
}
const string &Parser::Params() const {
return _params;
}
void Parser::setUrl(const string &url) {
this->_strUrl = url;
}
void Parser::setContent(const string &content) {
this->_strContent = content;
}
StrCaseMap &Parser::getHeader() const {
return _mapHeaders;
}
StrCaseMap &Parser::getUrlArgs() const {
return _mapUrlArgs;
}
StrCaseMap Parser::parseArgs(const string &str, const char *pair_delim, const char *key_delim) {
StrCaseMap ret;
auto arg_vec = split(str, pair_delim);
for (string &key_val : arg_vec) {
auto key = FindField(key_val.data(), NULL, key_delim);
auto val = FindField(key_val.data(), key_delim, NULL);
ret.emplace_force(trim(key), trim(val));
}
return ret;
}
}//namespace mediakit

View File

@ -57,122 +57,39 @@ class StrCaseMap : public multimap<string, string, StrCaseCompare>{
}
};
//rtsp/http/sip解析类
class Parser {
public:
Parser() {}
virtual ~Parser() {}
void Parse(const char *buf) {
//解析
const char *start = buf;
Clear();
while (true) {
auto line = FindField(start, NULL, "\r\n");
if (line.size() == 0) {
break;
}
if (start == buf) {
_strMethod = FindField(line.data(), NULL, " ");
_strFullUrl = FindField(line.data(), " ", " ");
auto args_pos = _strFullUrl.find('?');
if (args_pos != string::npos) {
_strUrl = _strFullUrl.substr(0, args_pos);
_params = _strFullUrl.substr(args_pos + 1);
_mapUrlArgs = parseArgs(_params);
} else {
_strUrl = _strFullUrl;
}
_strTail = FindField(line.data(), (_strFullUrl + " ").data(), NULL);
} else {
auto field = FindField(line.data(), NULL, ": ");
auto value = FindField(line.data(), ": ", NULL);
if (field.size() != 0) {
_mapHeaders.emplace_force(field,value);
}
}
start = start + line.size() + 2;
if (strncmp(start, "\r\n", 2) == 0) { //协议解析完毕
_strContent = FindField(start, "\r\n", NULL);
break;
}
}
}
const string &Method() const {
//rtsp方法
return _strMethod;
}
const string &Url() const {
//rtsp url
return _strUrl;
}
const string &FullUrl() const {
//rtsp url with args
return _strFullUrl;
}
const string &Tail() const {
//RTSP/1.0
return _strTail;
}
const string &operator[](const char *name) const {
//rtsp field
auto it = _mapHeaders.find(name);
if (it == _mapHeaders.end()) {
return _strNull;
}
return it->second;
}
const string &Content() const {
return _strContent;
}
void Clear() {
_strMethod.clear();
_strUrl.clear();
_strFullUrl.clear();
_params.clear();
_strTail.clear();
_strContent.clear();
_mapHeaders.clear();
_mapUrlArgs.clear();
}
const string &Params() const {
return _params;
}
void setUrl(const string &url) {
this->_strUrl = url;
}
void setContent(const string &content) {
this->_strContent = content;
}
StrCaseMap &getValues() const {
return _mapHeaders;
}
StrCaseMap &getUrlArgs() const {
return _mapUrlArgs;
}
static StrCaseMap parseArgs(const string &str, const char *pair_delim = "&", const char *key_delim = "=") {
StrCaseMap ret;
auto arg_vec = split(str, pair_delim);
for (string &key_val : arg_vec) {
auto key = FindField(key_val.data(), NULL, key_delim);
auto val = FindField(key_val.data(), key_delim, NULL);
ret.emplace_force(trim(key),trim(val));
}
return ret;
}
public:
Parser();
~Parser();
//解析信令
void Parse(const char *buf);
//获取命令字
const string &Method() const;
//获取中间url不包含?后面的参数
const string &Url() const;
//获取中间url包含?后面的参数
const string &FullUrl() const;
//获取命令协议名
const string &Tail() const;
//根据header key名获取请求header value值
const string &operator[](const char *name) const;
//获取http body或sdp
const string &Content() const;
//清空,为了重用
void Clear();
//获取?后面的参数
const string &Params() const;
//重新设置url
void setUrl(const string &url);
//重新设置content
void setContent(const string &content);
//获取header列表
StrCaseMap &getHeader() const;
//获取url参数列表
StrCaseMap &getUrlArgs() const;
//解析?后面的参数
static StrCaseMap parseArgs(const string &str, const char *pair_delim = "&", const char *key_delim = "=");
private:
string _strMethod;
string _strUrl;

View File

@ -62,25 +62,25 @@ extern const string kBroadcastRecordMP4;
//收到http api请求广播
extern const string kBroadcastHttpRequest;
#define BroadcastHttpRequestArgs const Parser &parser,const HttpSession::HttpResponseInvoker &invoker,bool &consumed,TcpSession &sender
#define BroadcastHttpRequestArgs const Parser &parser,const HttpSession::HttpResponseInvoker &invoker,bool &consumed,SockInfo &sender
//在http文件服务器中,收到http访问文件或目录的广播,通过该事件控制访问http目录的权限
extern const string kBroadcastHttpAccess;
#define BroadcastHttpAccessArgs const Parser &parser,const string &path,const bool &is_dir,const HttpSession::HttpAccessPathInvoker &invoker,TcpSession &sender
#define BroadcastHttpAccessArgs const Parser &parser,const string &path,const bool &is_dir,const HttpSession::HttpAccessPathInvoker &invoker,SockInfo &sender
//在http文件服务器中,收到http访问文件或目录前的广播,通过该事件可以控制http url到文件路径的映射
//在该事件中通过自行覆盖path参数可以做到譬如根据虚拟主机或者app选择不同http根目录的目的
extern const string kBroadcastHttpBeforeAccess;
#define BroadcastHttpBeforeAccessArgs const Parser &parser,string &path,TcpSession &sender
#define BroadcastHttpBeforeAccessArgs const Parser &parser,string &path,SockInfo &sender
//该流是否需要认证是的话调用invoker并传入realm,否则传入空的realm.如果该事件不监听则不认证
extern const string kBroadcastOnGetRtspRealm;
#define BroadcastOnGetRtspRealmArgs const MediaInfo &args,const RtspSession::onGetRealm &invoker,TcpSession &sender
#define BroadcastOnGetRtspRealmArgs const MediaInfo &args,const RtspSession::onGetRealm &invoker,SockInfo &sender
//请求认证用户密码事件user_name为用户名must_no_encrypt如果为true则必须提供明文密码(因为此时是base64认证方式),否则会导致认证失败
//获取到密码后请调用invoker并输入对应类型的密码和密码类型invoker执行时会匹配密码
extern const string kBroadcastOnRtspAuth;
#define BroadcastOnRtspAuthArgs const MediaInfo &args,const string &realm,const string &user_name,const bool &must_no_encrypt,const RtspSession::onAuth &invoker,TcpSession &sender
#define BroadcastOnRtspAuthArgs const MediaInfo &args,const string &realm,const string &user_name,const bool &must_no_encrypt,const RtspSession::onAuth &invoker,SockInfo &sender
//推流鉴权结果回调对象
//如果errMessage为空则代表鉴权成功
@ -91,7 +91,7 @@ typedef std::function<void(const string &errMessage,bool enableRtxp,bool enableH
//收到rtsp/rtmp推流事件广播通过该事件控制推流鉴权
extern const string kBroadcastMediaPublish;
#define BroadcastMediaPublishArgs const MediaInfo &args,const Broadcast::PublishAuthInvoker &invoker,TcpSession &sender
#define BroadcastMediaPublishArgs const MediaInfo &args,const Broadcast::PublishAuthInvoker &invoker,SockInfo &sender
//播放鉴权结果回调对象
//如果errMessage为空则代表鉴权成功
@ -99,19 +99,19 @@ typedef std::function<void(const string &errMessage)> AuthInvoker;
//播放rtsp/rtmp/http-flv事件广播通过该事件控制播放鉴权
extern const string kBroadcastMediaPlayed;
#define BroadcastMediaPlayedArgs const MediaInfo &args,const Broadcast::AuthInvoker &invoker,TcpSession &sender
#define BroadcastMediaPlayedArgs const MediaInfo &args,const Broadcast::AuthInvoker &invoker,SockInfo &sender
//shell登录鉴权
extern const string kBroadcastShellLogin;
#define BroadcastShellLoginArgs const string &user_name,const string &passwd,const Broadcast::AuthInvoker &invoker,TcpSession &sender
#define BroadcastShellLoginArgs const string &user_name,const string &passwd,const Broadcast::AuthInvoker &invoker,SockInfo &sender
//停止rtsp/rtmp/http-flv会话后流量汇报事件广播
extern const string kBroadcastFlowReport;
#define BroadcastFlowReportArgs const MediaInfo &args,const uint64_t &totalBytes,const uint64_t &totalDuration,const bool &isPlayer, const string &sessionIdentifier, const string &peerIP,const uint16_t &peerPort
#define BroadcastFlowReportArgs const MediaInfo &args,const uint64_t &totalBytes,const uint64_t &totalDuration,const bool &isPlayer, SockInfo &sender
//未找到流后会广播该事件,请在监听该事件后去拉流或其他方式产生流,这样就能按需拉流了
extern const string kBroadcastNotFoundStream;
#define BroadcastNotFoundStreamArgs const MediaInfo &args,TcpSession &sender
#define BroadcastNotFoundStreamArgs const MediaInfo &args,SockInfo &sender
//某个流无人消费时触发,目的为了实现无人观看时主动断开拉流等业务逻辑
extern const string kBroadcastStreamNoneReader;

View File

@ -223,13 +223,13 @@ public:
* @param frame
*/
void inputFrame(const Frame::Ptr &frame) override{
if(_cfg.empty()){
if (_cfg.empty()) {
//未获取到aac_cfg信息
if(frame->prefixSize() >= 7) {
if (frame->prefixSize() >= 7) {
//7个字节的adts头
_cfg = makeAdtsConfig(reinterpret_cast<const uint8_t *>(frame->data()));
_cfg = makeAdtsConfig((uint8_t *)(frame->data()));
onReady();
}else{
} else {
WarnL << "无法获取adts头!";
}
}

View File

@ -93,7 +93,7 @@ void AACRtmpEncoder::inputFrame(const Frame::Ptr &frame) {
if (_aac_cfg.empty()) {
if (frame->prefixSize() >= 7) {
//包含adts头,从adts头获取aac配置信息
_aac_cfg = makeAdtsConfig(reinterpret_cast<const uint8_t *>(frame->data()));
_aac_cfg = makeAdtsConfig((uint8_t *)(frame->data()));
}
makeConfigPacket();
}

View File

@ -64,6 +64,19 @@ class G711FrameNoCacheAble : public FrameNoCacheAble {
public:
typedef std::shared_ptr<G711FrameNoCacheAble> Ptr;
//兼容通用接口
G711FrameNoCacheAble(char *ptr,uint32_t size,uint32_t dts, uint32_t pts = 0,int prefixeSize = 0){
_ptr = ptr;
_size = size;
_dts = dts;
_prefixSize = prefixeSize;
}
//兼容通用接口
void setCodec(CodecId codecId){
_codecId = codecId;
}
G711FrameNoCacheAble(CodecId codecId, char *ptr,uint32_t size,uint32_t dts,int prefixeSize = 0){
_codecId = codecId;
_ptr = ptr;

View File

@ -113,7 +113,7 @@ void HttpClient::onConnect(const SockException &ex) {
printer << pr.first + ": ";
printer << pr.second + "\r\n";
}
send(printer << "\r\n");
SockSender::send(printer << "\r\n");
onFlush();
}
@ -147,8 +147,8 @@ int64_t HttpClient::onRecvHeader(const char *data, uint64_t len) {
}
}
checkCookie(_parser.getValues());
_totalBodySize = onResponseHeader(_parser.Url(), _parser.getValues());
checkCookie(_parser.getHeader());
_totalBodySize = onResponseHeader(_parser.Url(), _parser.getHeader());
if(!_parser["Content-Length"].empty()){
//有Content-Length字段时忽略onResponseHeader的返回值

View File

@ -94,7 +94,7 @@ public:
return _parser.Url();
}
const HttpHeader &responseHeader() const{
return _parser.getValues();
return _parser.getHeader();
}
const Parser& response() const{
return _parser;

View File

@ -28,7 +28,7 @@ void HttpDownloader::startDownload(const string& url, const string& filePath,boo
if(_filePath.empty()){
_filePath = exeDir() + "HttpDownloader/" + MD5(url).hexdigest();
}
_saveFile = File::createfile_file(_filePath.data(),bAppend ? "ab" : "wb");
_saveFile = File::create_file(_filePath.data(), bAppend ? "ab" : "wb");
if(!_saveFile){
auto strErr = StrPrinter << "打开文件失败:" << filePath << endl;
throw std::runtime_error(strErr);

View File

@ -306,9 +306,41 @@ static bool emitHlsPlayed(const Parser &parser, const MediaInfo &mediaInfo, cons
//cookie有效期为kHlsCookieSecond
invoker(err,"",kHlsCookieSecond);
};
return NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed,mediaInfo,mediaAuthInvoker,sender);
return NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed,mediaInfo,mediaAuthInvoker,static_cast<SockInfo &>(sender));
}
class SockInfoImp : public SockInfo{
public:
typedef std::shared_ptr<SockInfoImp> Ptr;
SockInfoImp() = default;
~SockInfoImp() override = default;
string get_local_ip() override{
return _local_ip;
}
uint16_t get_local_port() override{
return _local_port;
}
string get_peer_ip() override{
return _peer_ip;
}
uint16_t get_peer_port() override{
return _peer_port;
}
string getIdentifier() const override{
return _identifier;
}
string _local_ip;
string _peer_ip;
string _identifier;
uint16_t _local_port;
uint16_t _peer_port;
};
/**
* http客户端是否有权限访问文件的逻辑步骤
@ -325,7 +357,7 @@ static void canAccessPath(TcpSession &sender, const Parser &parser, const MediaI
auto path = parser.Url();
//先根据http头中的cookie字段获取cookie
HttpServerCookie::Ptr cookie = HttpCookieManager::Instance().getCookie(kCookieName, parser.getValues());
HttpServerCookie::Ptr cookie = HttpCookieManager::Instance().getCookie(kCookieName, parser.getHeader());
//如果不是从http头中找到的cookie,我们让http客户端设置下cookie
bool cookie_from_header = true;
if (!cookie && !uid.empty()) {
@ -362,12 +394,16 @@ static void canAccessPath(TcpSession &sender, const Parser &parser, const MediaI
}
bool is_hls = mediaInfo._schema == HLS_SCHEMA;
string identifier = sender.getIdentifier();
string peer_ip = sender.get_peer_ip();
uint16_t peer_port = sender.get_peer_port();
SockInfoImp::Ptr info = std::make_shared<SockInfoImp>();
info->_identifier = sender.getIdentifier();
info->_peer_ip = sender.get_peer_ip();
info->_peer_port = sender.get_peer_port();
info->_local_ip = sender.get_local_ip();
info->_local_port = sender.get_local_port();
//该用户从来未获取过cookie这个时候我们广播是否允许该用户访问该http目录
HttpSession::HttpAccessPathInvoker accessPathInvoker = [callback, uid, path, is_dir, is_hls, mediaInfo, identifier, peer_ip, peer_port]
HttpSession::HttpAccessPathInvoker accessPathInvoker = [callback, uid, path, is_dir, is_hls, mediaInfo, info]
(const string &errMsg, const string &cookie_path_in, int cookieLifeSecond) {
HttpServerCookie::Ptr cookie;
if (cookieLifeSecond) {
@ -390,7 +426,7 @@ static void canAccessPath(TcpSession &sender, const Parser &parser, const MediaI
attachment._is_hls = is_hls;
if(is_hls){
//hls相关信息
attachment._hls_data = std::make_shared<HlsCookieData>(mediaInfo, identifier, peer_ip, peer_port);
attachment._hls_data = std::make_shared<HlsCookieData>(mediaInfo, info);
//hls未查找MediaSource
attachment._have_find_media_source = false;
}
@ -407,7 +443,7 @@ static void canAccessPath(TcpSession &sender, const Parser &parser, const MediaI
}
//事件未被拦截则认为是http下载请求
bool flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastHttpAccess, parser, path, is_dir, accessPathInvoker, sender);
bool flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastHttpAccess, parser, path, is_dir, accessPathInvoker, static_cast<SockInfo &>(sender));
if (!flag) {
//此事件无人监听,我们默认都有权限访问
callback("", nullptr);
@ -488,7 +524,7 @@ static void accessFile(TcpSession &sender, const Parser &parser, const MediaInfo
}
cb(codeOut.data(), HttpFileManager::getContentType(strFile.data()), headerOut, body);
};
invoker.responseFile(parser.getValues(), httpHeader, strFile);
invoker.responseFile(parser.getHeader(), httpHeader, strFile);
};
if (!is_hls) {
@ -521,7 +557,7 @@ static string getFilePath(const Parser &parser,const MediaInfo &mediaInfo, TcpSe
GET_CONFIG(bool, enableVhost, General::kEnableVhost);
GET_CONFIG(string, rootPath, Http::kRootPath);
auto ret = File::absolutePath(enableVhost ? mediaInfo._vhost + parser.Url() : parser.Url(), rootPath);
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastHttpBeforeAccess, parser, ret, sender);
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastHttpBeforeAccess, parser, ret, static_cast<SockInfo &>(sender));
return std::move(ret);
}

View File

@ -59,7 +59,7 @@ int64_t HttpSession::onRecvHeader(const char *header,uint64_t len) {
string cmd = _parser.Method();
auto it = s_func_map.find(cmd);
if (it == s_func_map.end()) {
WarnL << "不支持该命令:" << cmd;
WarnP(this) << "不支持该命令:" << cmd;
sendResponse("405 Not Allowed", true);
return 0;
}
@ -108,7 +108,7 @@ void HttpSession::onError(const SockException& err) {
GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold);
if(_ui64TotalBytes > iFlowThreshold * 1024){
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _mediaInfo, _ui64TotalBytes, duration , true, getIdentifier(), get_peer_ip(), get_peer_port());
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _mediaInfo, _ui64TotalBytes, duration , true, static_cast<SockInfo &>(*this));
}
return;
}
@ -241,7 +241,7 @@ bool HttpSession::checkLiveFlvStream(const function<void()> &cb){
onRes(err);
});
};
auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed,_mediaInfo,invoker,*this);
auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed,_mediaInfo,invoker,static_cast<SockInfo &>(*this));
if(!flag){
//该事件无人监听,默认不鉴权
onRes("");
@ -456,7 +456,7 @@ void HttpSession::sendResponse(const char *pcStatus,
str += "\r\n";
}
str += "\r\n";
send(std::move(str));
SockSender::send(std::move(str));
_ticker.resetTime();
if(!size){
@ -520,7 +520,7 @@ bool HttpSession::emitHttpEvent(bool doInvoke){
};
///////////////////广播HTTP事件///////////////////////////
bool consumed = false;//该事件是否被消费
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastHttpRequest,_parser,invoker,consumed,*this);
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastHttpRequest,_parser,invoker,consumed,static_cast<SockInfo &>(*this));
if(!consumed && doInvoke){
//该事件无人消费所以返回404
invoker("404 Not Found",KeyValue(), HttpBody::Ptr());
@ -611,7 +611,7 @@ void HttpSession::setSocketFlags(){
//推流模式下关闭TCP_NODELAY会增加推流端的延时但是服务器性能将提高
SockUtil::setNoDelay(_sock->rawFD(), false);
//播放模式下开启MSG_MORE会增加延时但是能提高发送性能
(*this) << SocketFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE);
setSendFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE);
}
}

View File

@ -41,11 +41,11 @@ public:
typedef std::function<void(const string &errMsg,const string &accessPath, int cookieLifeSecond)> HttpAccessPathInvoker;
HttpSession(const Socket::Ptr &pSock);
virtual ~HttpSession();
~HttpSession() override;
virtual void onRecv(const Buffer::Ptr &) override;
virtual void onError(const SockException &err) override;
virtual void onManager() override;
void onRecv(const Buffer::Ptr &) override;
void onError(const SockException &err) override;
void onManager() override;
static string urlDecode(const string &str);
protected:
//FlvMuxer override
@ -80,7 +80,7 @@ protected:
* @return true代表允许websocket连接
*/
virtual bool onWebSocketConnect(const Parser &header){
WarnL << "http server do not support websocket default";
WarnP(this) << "http server do not support websocket default";
return false;
}

View File

@ -73,7 +73,7 @@ public:
HttpWsClient(ClientTypeImp<ClientType,DataType> &delegate) : _delegate(delegate){
_Sec_WebSocket_Key = encodeBase64(SHA1::encode_bin(makeRandStr(16, false)));
setPoller(delegate.getPoller());
_poller = delegate.getPoller();
}
~HttpWsClient(){}

View File

@ -92,7 +92,7 @@ void HlsMakerImp::onWriteHls(const char *data, int len) {
std::shared_ptr<FILE> HlsMakerImp::makeFile(const string &file,bool setbuf) {
auto file_buf = _file_buf;
auto ret= shared_ptr<FILE>(File::createfile_file(file.data(), "wb"), [file_buf](FILE *fp) {
auto ret= shared_ptr<FILE>(File::create_file(file.data(), "wb"), [file_buf](FILE *fp) {
if (fp) {
fclose(fp);
}

View File

@ -12,11 +12,9 @@
namespace mediakit{
HlsCookieData::HlsCookieData(const MediaInfo &info, const string &sessionIdentifier, const string &peer_ip, uint16_t peer_port) {
HlsCookieData::HlsCookieData(const MediaInfo &info, const std::shared_ptr<SockInfo> &sock_info) {
_info = info;
_sessionIdentifier = sessionIdentifier;
_peer_ip = peer_ip;
_peer_port = peer_port;
_sock_info = sock_info;
_added = std::make_shared<bool>(false);
addReaderCount();
}
@ -45,13 +43,13 @@ HlsCookieData::~HlsCookieData() {
src->modifyReaderCount(false);
}
uint64_t duration = (_ticker.createdTime() - _ticker.elapsedTime()) / 1000;
WarnL << _sessionIdentifier << "(" << _peer_ip << ":" << _peer_port << ") "
WarnL << _sock_info->getIdentifier() << "(" << _sock_info->get_peer_ip() << ":" << _sock_info->get_peer_port() << ") "
<< "HLS播放器(" << _info._vhost << "/" << _info._app << "/" << _info._streamid
<< ")断开,耗时(s):" << duration;
GET_CONFIG(uint32_t, iFlowThreshold, General::kFlowThreshold);
if (_bytes > iFlowThreshold * 1024) {
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _info, _bytes, duration, true, _sessionIdentifier, _peer_ip, _peer_port);
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _info, _bytes, duration, true, static_cast<SockInfo&>(*_sock_info));
}
}
}

View File

@ -77,7 +77,7 @@ private:
class HlsCookieData{
public:
typedef std::shared_ptr<HlsCookieData> Ptr;
HlsCookieData(const MediaInfo &info, const string &sessionIdentifier, const string &peer_ip, uint16_t peer_port);
HlsCookieData(const MediaInfo &info, const std::shared_ptr<SockInfo> &sock_info);
~HlsCookieData();
void addByteUsage(uint64_t bytes);
private:
@ -85,12 +85,10 @@ private:
private:
uint64_t _bytes = 0;
MediaInfo _info;
string _sessionIdentifier;
string _peer_ip;
uint16_t _peer_port;
std::shared_ptr<bool> _added;
weak_ptr<HlsMediaSource> _src;
Ticker _ticker;
std::shared_ptr<SockInfo> _sock_info;
HlsMediaSource::RingType::RingReader::Ptr _ring_reader;
};

View File

@ -72,7 +72,7 @@ MP4File::Reader MP4File::createReader(){
void MP4File::openFile(const char *file,const char *mode) {
//创建文件
auto fp = File::createfile_file(file,mode);
auto fp = File::create_file(file, mode);
if(!fp){
throw std::runtime_error(string("打开文件失败:") + file);
}

View File

@ -14,6 +14,7 @@
#include "Extension/H265.h"
#include "Extension/H264.h"
#include "Extension/AAC.h"
#include "Extension/G711.h"
using namespace toolkit;
namespace mediakit {
@ -120,6 +121,12 @@ void MP4Demuxer::onAudioTrack(uint32_t track_id, uint8_t object, int channel_cou
_track_to_codec.emplace(track_id, audio);
}
break;
case MOV_OBJECT_G711a:
case MOV_OBJECT_G711u:{
auto audio = std::make_shared<G711Track>(object == MOV_OBJECT_G711a ? CodecG711A : CodecG711U, sample_rate, channel_count, bit_per_sample / channel_count );
_track_to_codec.emplace(track_id, audio);
}
break;
default:
WarnL << "不支持该编码类型的MP4,已忽略:" << getObjectName(object);
break;
@ -223,8 +230,16 @@ Frame::Ptr MP4Demuxer::makeFrame(uint32_t track_id, const Buffer::Ptr &buf, int6
}
return std::make_shared<FrameWrapper<H265FrameNoCacheAble> >(buf, pts, dts, 4);
}
case CodecAAC :
return std::make_shared<FrameWrapper<AACFrameNoCacheAble> >(buf, pts, dts, 0);
case CodecG711A:
case CodecG711U: {
auto frame = std::make_shared<FrameWrapper<G711FrameNoCacheAble> >(buf, pts, dts, 0);
frame->setCodec(codec);
return frame;
}
default:
return nullptr;
}

View File

@ -124,22 +124,47 @@ void MP4Muxer::inputFrame(const Frame::Ptr &frame) {
void MP4Muxer::addTrack(const Track::Ptr &track) {
switch (track->getCodecId()) {
case CodecG711A:
case CodecG711U: {
auto audio_track = dynamic_pointer_cast<G711Track>(track);
if (!audio_track) {
WarnL << "不是G711 Track";
return;
}
if (!audio_track->ready()) {
WarnL << "G711 Track未就绪";
return;
}
auto track_id = mov_writer_add_audio(_mov_writter.get(),
track->getCodecId() == CodecG711A ? MOV_OBJECT_G711a : MOV_OBJECT_G711u,
audio_track->getAudioChannel(),
audio_track->getAudioSampleBit() * audio_track->getAudioChannel(),
audio_track->getAudioSampleRate(),
nullptr, 0);
if (track_id < 0) {
WarnL << "添加G711 Track失败:" << track_id;
return;
}
_codec_to_trackid[track->getCodecId()].track_id = track_id;
}
break;
case CodecAAC: {
auto aac_track = dynamic_pointer_cast<AACTrack>(track);
if (!aac_track) {
auto audio_track = dynamic_pointer_cast<AACTrack>(track);
if (!audio_track) {
WarnL << "不是AAC Track";
return;
}
if(!aac_track->ready()){
if(!audio_track->ready()){
WarnL << "AAC Track未就绪";
return;
}
auto track_id = mov_writer_add_audio(_mov_writter.get(),
MOV_OBJECT_AAC,
aac_track->getAudioChannel(),
aac_track->getAudioSampleBit() * aac_track->getAudioChannel(),
aac_track->getAudioSampleRate(),
aac_track->getAacCfg().data(), 2);
audio_track->getAudioChannel(),
audio_track->getAudioSampleBit() * audio_track->getAudioChannel(),
audio_track->getAudioSampleRate(),
audio_track->getAacCfg().data(), 2);
if(track_id < 0){
WarnL << "添加AAC Track失败:" << track_id;
return;

View File

@ -15,6 +15,7 @@
#include "Common/MediaSink.h"
#include "Extension/AAC.h"
#include "Extension/G711.h"
#include "Extension/H264.h"
#include "Extension/H265.h"
#include "Common/Stamp.h"

View File

@ -28,17 +28,30 @@ void TsMuxer::addTrack(const Track::Ptr &track) {
case CodecH264: {
_have_video = true;
_codec_to_trackid[track->getCodecId()].track_id = mpeg_ts_add_stream(_context, PSI_STREAM_H264, nullptr, 0);
}
break;
}
case CodecH265: {
_have_video = true;
_codec_to_trackid[track->getCodecId()].track_id = mpeg_ts_add_stream(_context, PSI_STREAM_H265, nullptr, 0);
}
break;
}
case CodecAAC: {
_codec_to_trackid[track->getCodecId()].track_id = mpeg_ts_add_stream(_context, PSI_STREAM_AAC, nullptr, 0);
}
break;
}
case CodecG711A: {
_codec_to_trackid[track->getCodecId()].track_id = mpeg_ts_add_stream(_context, PSI_STREAM_AUDIO_G711A, nullptr, 0);
break;
}
case CodecG711U: {
_codec_to_trackid[track->getCodecId()].track_id = mpeg_ts_add_stream(_context, PSI_STREAM_AUDIO_G711U, nullptr, 0);
break;
}
default:
break;
}

View File

@ -177,7 +177,7 @@ void FlvRecorder::startRecord(const EventPoller::Ptr &poller,const RtmpMediaSour
}
});
//新建文件
_file.reset(File::createfile_file(file_path.data(),"wb"),[fileBuf](FILE *fp){
_file.reset(File::create_file(file_path.data(), "wb"), [fileBuf](FILE *fp){
if(fp){
fflush(fp);
fclose(fp);

View File

@ -39,6 +39,28 @@ void RtmpMuxer::addTrack(const Track::Ptr &track) {
}
switch (track->getCodecId()){
case CodecG711A:
case CodecG711U:{
auto audio_track = dynamic_pointer_cast<AudioTrack>(track);
if(!audio_track){
return;
}
if (audio_track->getAudioSampleRate() != 8000 ||
audio_track->getAudioChannel() != 1 ||
audio_track->getAudioSampleBit() != 16) {
WarnL << "RTMP只支持8000/1/16规格的G711,目前规格是:"
<< audio_track->getAudioSampleRate() << "/"
<< audio_track->getAudioChannel() << "/"
<< audio_track->getAudioSampleBit()
<< ",该音频已被忽略";
return;
}
break;
}
default : break;
}
auto &encoder = _encoder[track->getTrackType()];
//生成rtmp编码器,克隆该Track防止循环引用
encoder = Factory::getRtmpCodecByTrack(track->clone());

View File

@ -231,7 +231,7 @@ void RtmpPusher::setSocketFlags(){
GET_CONFIG(bool,ultraLowDelay,General::kUltraLowDelay);
if(!ultraLowDelay) {
//提高发送性能
(*this) << SocketFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE);
setSendFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE);
SockUtil::setNoDelay(_sock->rawFD(), false);
}
}

View File

@ -39,7 +39,7 @@ void RtmpSession::onError(const SockException& err) {
GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold);
if(_ui64TotalBytes > iFlowThreshold * 1024){
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _mediaInfo, _ui64TotalBytes, duration, isPlayer, getIdentifier(), get_peer_ip(), get_peer_port());
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _mediaInfo, _ui64TotalBytes, duration, isPlayer, static_cast<SockInfo &>(*this));
}
}
@ -171,10 +171,7 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) {
onRes(err,enableRtxp,enableHls,enableMP4);
});
};
auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish,
_mediaInfo,
invoker,
*this);
auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish,_mediaInfo,invoker,static_cast<SockInfo &>(*this));
if(!flag){
//该事件无人监听,默认鉴权成功
GET_CONFIG(bool,toRtxp,General::kPublishToRtxp);
@ -346,7 +343,8 @@ void RtmpSession::doPlay(AMFDecoder &dec){
strongSelf->doPlayResponse(err,[pToken](bool){});
});
};
auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed,_mediaInfo,invoker,*this);
auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed,_mediaInfo,invoker,static_cast<SockInfo &>(*this));
if(!flag){
//该事件无人监听,默认不鉴权
doPlayResponse("",[pToken](bool){});
@ -536,7 +534,7 @@ void RtmpSession::setSocketFlags(){
//推流模式下关闭TCP_NODELAY会增加推流端的延时但是服务器性能将提高
SockUtil::setNoDelay(_sock->rawFD(), false);
//播放模式下开启MSG_MORE会增加延时但是能提高发送性能
(*this) << SocketFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE);
setSendFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE);
}
}

View File

@ -9,11 +9,13 @@
*/
#if defined(ENABLE_RTPPROXY)
#include "mpeg-ps.h"
#include "mpeg-ts-proto.h"
#include "RtpProcess.h"
#include "Util/File.h"
#include "Extension/H265.h"
#include "Extension/AAC.h"
#include "Extension/G711.h"
#define RTP_APP_NAME "rtp"
namespace mediakit{
@ -56,7 +58,7 @@ string printSSRC(uint32_t ui32Ssrc) {
}
static string printAddress(const struct sockaddr *addr){
return StrPrinter << inet_ntoa(((struct sockaddr_in *) addr)->sin_addr) << ":" << ntohs(((struct sockaddr_in *) addr)->sin_port);
return StrPrinter << SockUtil::inet_ntoa(((struct sockaddr_in *) addr)->sin_addr) << ":" << ntohs(((struct sockaddr_in *) addr)->sin_port);
}
RtpProcess::RtpProcess(uint32_t ssrc) {
@ -66,17 +68,15 @@ RtpProcess::RtpProcess(uint32_t ssrc) {
_track->_samplerate = 90000;
_track->_type = TrackVideo;
_track->_ssrc = _ssrc;
DebugL << printSSRC(_ssrc);
GET_CONFIG(bool,toRtxp,General::kPublishToRtxp);
GET_CONFIG(bool,toHls,General::kPublishToHls);
GET_CONFIG(bool,toMP4,General::kPublishToMP4);
_muxer = std::make_shared<MultiMediaSourceMuxer>(DEFAULT_VHOST,"rtp",printSSRC(_ssrc),0,toRtxp,toRtxp,toHls,toMP4);
_media_info._schema = RTP_APP_NAME;
_media_info._vhost = DEFAULT_VHOST;
_media_info._app = RTP_APP_NAME;
_media_info._streamid = printSSRC(_ssrc);
GET_CONFIG(string,dump_dir,RtpProxy::kDumpDir);
{
FILE *fp = !dump_dir.empty() ? File::createfile_file(File::absolutePath(printSSRC(_ssrc) + ".rtp",dump_dir).data(),"wb") : nullptr;
FILE *fp = !dump_dir.empty() ? File::create_file(File::absolutePath(_media_info._streamid + ".rtp", dump_dir).data(), "wb") : nullptr;
if(fp){
_save_file_rtp.reset(fp,[](FILE *fp){
fclose(fp);
@ -85,7 +85,7 @@ RtpProcess::RtpProcess(uint32_t ssrc) {
}
{
FILE *fp = !dump_dir.empty() ? File::createfile_file(File::absolutePath(printSSRC(_ssrc) + ".mp2",dump_dir).data(),"wb") : nullptr;
FILE *fp = !dump_dir.empty() ? File::create_file(File::absolutePath(_media_info._streamid + ".mp2", dump_dir).data(), "wb") : nullptr;
if(fp){
_save_file_ps.reset(fp,[](FILE *fp){
fclose(fp);
@ -94,7 +94,7 @@ RtpProcess::RtpProcess(uint32_t ssrc) {
}
{
FILE *fp = !dump_dir.empty() ? File::createfile_file(File::absolutePath(printSSRC(_ssrc) + ".video",dump_dir).data(),"wb") : nullptr;
FILE *fp = !dump_dir.empty() ? File::create_file(File::absolutePath(_media_info._streamid + ".video", dump_dir).data(), "wb") : nullptr;
if(fp){
_save_file_video.reset(fp,[](FILE *fp){
fclose(fp);
@ -105,28 +105,48 @@ RtpProcess::RtpProcess(uint32_t ssrc) {
}
RtpProcess::~RtpProcess() {
if(_addr){
DebugL << printSSRC(_ssrc) << " " << printAddress(_addr);
DebugP(this);
if (_addr) {
delete _addr;
}else{
DebugL << printSSRC(_ssrc);
}
uint64_t duration = (_last_rtp_time.createdTime() - _last_rtp_time.elapsedTime()) / 1000;
WarnP(this) << "RTP推流器("
<< _media_info._vhost << "/"
<< _media_info._app << "/"
<< _media_info._streamid
<< ")断开,耗时(s):" << duration;
//流量统计事件广播
GET_CONFIG(uint32_t, iFlowThreshold, General::kFlowThreshold);
if (_total_bytes > iFlowThreshold * 1024) {
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _total_bytes, duration, false, static_cast<SockInfo &>(*this));
}
}
bool RtpProcess::inputRtp(const char *data, int data_len,const struct sockaddr *addr,uint32_t *dts_out) {
bool RtpProcess::inputRtp(const Socket::Ptr &sock, const char *data, int data_len,const struct sockaddr *addr,uint32_t *dts_out) {
GET_CONFIG(bool,check_source,RtpProxy::kCheckSource);
//检查源是否合法
if(!_addr){
_addr = new struct sockaddr;
_sock = sock;
memcpy(_addr,addr, sizeof(struct sockaddr));
DebugL << "RtpProcess(" << printSSRC(_ssrc) << ") bind to address:" << printAddress(_addr);
DebugP(this) << "bind to address:" << printAddress(_addr);
//推流鉴权
emitOnPublish();
}
if(check_source && memcmp(_addr,addr,sizeof(struct sockaddr)) != 0){
DebugL << "RtpProcess(" << printSSRC(_ssrc) << ") address dismatch:" << printAddress(addr) << " != " << printAddress(_addr);
if(!_muxer){
//无权限推流
return false;
}
if(check_source && memcmp(_addr,addr,sizeof(struct sockaddr)) != 0){
DebugP(this) << "address dismatch:" << printAddress(addr) << " != " << printAddress(_addr);
return false;
}
_total_bytes += data_len;
_last_rtp_time.resetTime();
bool ret = handleOneRtp(0,_track,(unsigned char *)data,data_len);
if(dts_out){
@ -141,8 +161,8 @@ static inline bool checkTS(const uint8_t *packet, int bytes){
}
void RtpProcess::onRtpSorted(const RtpPacket::Ptr &rtp, int) {
if(rtp->sequence != _sequence + 1){
WarnL << rtp->sequence << " != " << _sequence << "+1";
if(rtp->sequence != _sequence + 1 && rtp->sequence != 0){
WarnP(this) << rtp->sequence << " != " << _sequence << "+1";
}
_sequence = rtp->sequence;
if(_save_file_rtp){
@ -163,11 +183,11 @@ void RtpProcess::onRtpDecode(const uint8_t *packet, int bytes, uint32_t timestam
//创建解码器
if(checkTS(packet, bytes)){
//猜测是ts负载
InfoL << "judged to be TS: " << printSSRC(_ssrc);
InfoP(this) << "judged to be TS";
_decoder = Decoder::createDecoder(Decoder::decoder_ts);
}else{
//猜测是ps负载
InfoL << "judged to be PS: " << printSSRC(_ssrc);
InfoP(this) << "judged to be PS";
_decoder = Decoder::createDecoder(Decoder::decoder_ps);
}
_decoder->setOnDecode([this](int stream,int codecid,int flags,int64_t pts,int64_t dts,const void *data,int bytes){
@ -177,26 +197,36 @@ void RtpProcess::onRtpDecode(const uint8_t *packet, int bytes, uint32_t timestam
auto ret = _decoder->input((uint8_t *)packet,bytes);
if(ret != bytes){
WarnL << ret << " != " << bytes << " " << flags;
WarnP(this) << ret << " != " << bytes << " " << flags;
}
}
#define SWITCH_CASE(codec_id) case codec_id : return #codec_id
static const char *getCodecName(int codec_id) {
switch (codec_id) {
SWITCH_CASE(STREAM_VIDEO_MPEG4);
SWITCH_CASE(STREAM_VIDEO_H264);
SWITCH_CASE(STREAM_VIDEO_H265);
SWITCH_CASE(STREAM_VIDEO_SVAC);
SWITCH_CASE(STREAM_AUDIO_MP3);
SWITCH_CASE(STREAM_AUDIO_AAC);
SWITCH_CASE(STREAM_AUDIO_G711);
SWITCH_CASE(STREAM_AUDIO_G722);
SWITCH_CASE(STREAM_AUDIO_G723);
SWITCH_CASE(STREAM_AUDIO_G729);
SWITCH_CASE(STREAM_AUDIO_SVAC);
default:
return "unknown codec";
SWITCH_CASE(PSI_STREAM_MPEG1);
SWITCH_CASE(PSI_STREAM_MPEG2);
SWITCH_CASE(PSI_STREAM_AUDIO_MPEG1);
SWITCH_CASE(PSI_STREAM_MP3);
SWITCH_CASE(PSI_STREAM_AAC);
SWITCH_CASE(PSI_STREAM_MPEG4);
SWITCH_CASE(PSI_STREAM_MPEG4_AAC_LATM);
SWITCH_CASE(PSI_STREAM_H264);
SWITCH_CASE(PSI_STREAM_MPEG4_AAC);
SWITCH_CASE(PSI_STREAM_H265);
SWITCH_CASE(PSI_STREAM_AUDIO_AC3);
SWITCH_CASE(PSI_STREAM_AUDIO_EAC3);
SWITCH_CASE(PSI_STREAM_AUDIO_DTS);
SWITCH_CASE(PSI_STREAM_VIDEO_DIRAC);
SWITCH_CASE(PSI_STREAM_VIDEO_VC1);
SWITCH_CASE(PSI_STREAM_VIDEO_SVAC);
SWITCH_CASE(PSI_STREAM_AUDIO_SVAC);
SWITCH_CASE(PSI_STREAM_AUDIO_G711A);
SWITCH_CASE(PSI_STREAM_AUDIO_G711U);
SWITCH_CASE(PSI_STREAM_AUDIO_G722);
SWITCH_CASE(PSI_STREAM_AUDIO_G723);
SWITCH_CASE(PSI_STREAM_AUDIO_G729);
default : return "unknown codec";
}
}
@ -206,18 +236,18 @@ void RtpProcess::onDecode(int stream,int codecid,int flags,int64_t pts,int64_t d
_stamps[codecid].revise(dts,pts,dts,pts,false);
switch (codecid) {
case STREAM_VIDEO_H264: {
case PSI_STREAM_H264: {
_dts = dts;
if (!_codecid_video) {
//获取到视频
_codecid_video = codecid;
InfoL << "got video track: H264";
InfoP(this) << "got video track: H264";
auto track = std::make_shared<H264Track>();
_muxer->addTrack(track);
}
if (codecid != _codecid_video) {
WarnL << "video track change to H264 from codecid:" << getCodecName(_codecid_video);
WarnP(this) << "video track change to H264 from codecid:" << getCodecName(_codecid_video);
return;
}
@ -231,17 +261,17 @@ void RtpProcess::onDecode(int stream,int codecid,int flags,int64_t pts,int64_t d
break;
}
case STREAM_VIDEO_H265: {
case PSI_STREAM_H265: {
_dts = dts;
if (!_codecid_video) {
//获取到视频
_codecid_video = codecid;
InfoL << "got video track: H265";
InfoP(this) << "got video track: H265";
auto track = std::make_shared<H265Track>();
_muxer->addTrack(track);
}
if (codecid != _codecid_video) {
WarnL << "video track change to H265 from codecid:" << getCodecName(_codecid_video);
WarnP(this) << "video track change to H265 from codecid:" << getCodecName(_codecid_video);
return;
}
if(_save_file_video){
@ -254,26 +284,47 @@ void RtpProcess::onDecode(int stream,int codecid,int flags,int64_t pts,int64_t d
break;
}
case STREAM_AUDIO_AAC: {
case PSI_STREAM_AAC: {
_dts = dts;
if (!_codecid_audio) {
//获取到音频
_codecid_audio = codecid;
InfoL << "got audio track: AAC";
InfoP(this) << "got audio track: AAC";
auto track = std::make_shared<AACTrack>();
_muxer->addTrack(track);
}
if (codecid != _codecid_audio) {
WarnL << "audio track change to AAC from codecid:" << getCodecName(_codecid_audio);
WarnP(this) << "audio track change to AAC from codecid:" << getCodecName(_codecid_audio);
return;
}
_muxer->inputFrame(std::make_shared<AACFrameNoCacheAble>((char *) data, bytes, dts, 0, 7));
break;
}
case PSI_STREAM_AUDIO_G711A:
case PSI_STREAM_AUDIO_G711U: {
_dts = dts;
auto codec = codecid == PSI_STREAM_AUDIO_G711A ? CodecG711A : CodecG711U;
if (!_codecid_audio) {
//获取到音频
_codecid_audio = codecid;
InfoP(this) << "got audio track: G711";
//G711传统只支持 8000/1/16的规格FFmpeg貌似做了扩展但是这里不管它了
auto track = std::make_shared<G711Track>(codec, 8000, 1, 16);
_muxer->addTrack(track);
}
if (codecid != _codecid_audio) {
WarnP(this) << "audio track change to G711 from codecid:" << getCodecName(_codecid_audio);
return;
}
_muxer->inputFrame(std::make_shared<G711FrameNoCacheAble>(codec, (char *) data, bytes, dts));
break;
}
default:
if(codecid != 0){
WarnL << "unsupported codec type:" << getCodecName(codecid);
WarnP(this) << "unsupported codec type:" << getCodecName(codecid) << " " << (int)codecid;
}
return;
}
@ -288,19 +339,77 @@ bool RtpProcess::alive() {
}
string RtpProcess::get_peer_ip() {
return inet_ntoa(((struct sockaddr_in *) _addr)->sin_addr);
if(_addr){
return SockUtil::inet_ntoa(((struct sockaddr_in *) _addr)->sin_addr);
}
return "0.0.0.0";
}
uint16_t RtpProcess::get_peer_port() {
if(!_addr){
return 0;
}
return ntohs(((struct sockaddr_in *) _addr)->sin_port);
}
string RtpProcess::get_local_ip() {
if(_sock){
return _sock->get_local_ip();
}
return "0.0.0.0";
}
uint16_t RtpProcess::get_local_port() {
if(_sock){
return _sock->get_local_port();
}
return 0;
}
string RtpProcess::getIdentifier() const{
return _media_info._streamid;
}
int RtpProcess::totalReaderCount(){
return _muxer->totalReaderCount();
return _muxer ? _muxer->totalReaderCount() : 0;
}
void RtpProcess::setListener(const std::weak_ptr<MediaSourceEvent> &listener){
if(_muxer){
_muxer->setMediaListener(listener);
}else{
_listener = listener;
}
}
void RtpProcess::emitOnPublish() {
weak_ptr<RtpProcess> weak_self = shared_from_this();
Broadcast::PublishAuthInvoker invoker = [weak_self](const string &err, bool enableRtxp, bool enableHls, bool enableMP4) {
auto strongSelf = weak_self.lock();
if (!strongSelf) {
return;
}
if (err.empty()) {
strongSelf->_muxer = std::make_shared<MultiMediaSourceMuxer>(strongSelf->_media_info._vhost,
strongSelf->_media_info._app,
strongSelf->_media_info._streamid, 0,
enableRtxp, enableRtxp, enableHls, enableMP4);
strongSelf->_muxer->setMediaListener(strongSelf->_listener);
InfoP(strongSelf) << "允许RTP推流";
} else {
WarnP(strongSelf) << "禁止RTP推流:" << err;
}
};
//触发推流鉴权事件
auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish, _media_info, invoker, static_cast<SockInfo &>(*this));
if(!flag){
//该事件无人监听,默认不鉴权
GET_CONFIG(bool, toRtxp, General::kPublishToRtxp);
GET_CONFIG(bool, toHls, General::kPublishToHls);
GET_CONFIG(bool, toMP4, General::kPublishToMP4);
invoker("", toRtxp, toHls, toMP4);
}
}

View File

@ -24,21 +24,31 @@ namespace mediakit{
string printSSRC(uint32_t ui32Ssrc);
class FrameMerger;
class RtpProcess : public RtpReceiver , public RtpDecoder{
class RtpProcess : public RtpReceiver , public RtpDecoder, public SockInfo, public std::enable_shared_from_this<RtpProcess>{
public:
typedef std::shared_ptr<RtpProcess> Ptr;
RtpProcess(uint32_t ssrc);
~RtpProcess();
bool inputRtp(const char *data,int data_len, const struct sockaddr *addr , uint32_t *dts_out = nullptr);
bool inputRtp(const Socket::Ptr &sock, const char *data,int data_len, const struct sockaddr *addr , uint32_t *dts_out = nullptr);
bool alive();
string get_peer_ip();
uint16_t get_peer_port();
string get_local_ip() override;
uint16_t get_local_port() override;
string get_peer_ip() override;
uint16_t get_peer_port() override;
string getIdentifier() const override;
int totalReaderCount();
void setListener(const std::weak_ptr<MediaSourceEvent> &listener);
protected:
void onRtpSorted(const RtpPacket::Ptr &rtp, int track_index) override ;
void onRtpDecode(const uint8_t *packet, int bytes, uint32_t timestamp, int flags) override;
void onDecode(int stream,int codecid,int flags,int64_t pts,int64_t dts, const void *data,int bytes);
private:
void emitOnPublish();
private:
std::shared_ptr<FILE> _save_file_rtp;
std::shared_ptr<FILE> _save_file_ps;
@ -55,6 +65,10 @@ private:
unordered_map<int,Stamp> _stamps;
uint32_t _dts = 0;
Decoder::Ptr _decoder;
std::weak_ptr<MediaSourceEvent> _listener;
MediaInfo _media_info;
uint64_t _total_bytes = 0;
Socket::Ptr _sock;
};
}//namespace mediakit

View File

@ -15,7 +15,7 @@ namespace mediakit{
INSTANCE_IMP(RtpSelector);
bool RtpSelector::inputRtp(const char *data, int data_len,const struct sockaddr *addr,uint32_t *dts_out) {
bool RtpSelector::inputRtp(const Socket::Ptr &sock, const char *data, int data_len,const struct sockaddr *addr,uint32_t *dts_out) {
uint32_t ssrc = 0;
if(!getSSRC(data,data_len,ssrc)){
WarnL << "get ssrc from rtp failed:" << data_len;
@ -23,7 +23,7 @@ bool RtpSelector::inputRtp(const char *data, int data_len,const struct sockaddr
}
auto process = getProcess(ssrc, true);
if(process){
return process->inputRtp(data,data_len, addr,dts_out);
return process->inputRtp(sock, data,data_len, addr,dts_out);
}
return false;
}

View File

@ -45,7 +45,7 @@ public:
~RtpSelector();
static RtpSelector &Instance();
bool inputRtp(const char *data,int data_len,const struct sockaddr *addr ,uint32_t *dts_out = nullptr );
bool inputRtp(const Socket::Ptr &sock, const char *data,int data_len,const struct sockaddr *addr ,uint32_t *dts_out = nullptr );
static bool getSSRC(const char *data,int data_len, uint32_t &ssrc);
RtpProcess::Ptr getProcess(uint32_t ssrc,bool makeNew);
void delProcess(uint32_t ssrc,const RtpProcess *ptr);

View File

@ -57,7 +57,7 @@ void RtpSession::onRtpPacket(const char *data, uint64_t len) {
_process = RtpSelector::Instance().getProcess(_ssrc, true);
_process->setListener(dynamic_pointer_cast<RtpSession>(shared_from_this()));
}
_process->inputRtp(data + 2, len - 2, &addr);
_process->inputRtp(_sock,data + 2, len - 2, &addr);
_ticker.resetTime();
}

View File

@ -17,6 +17,7 @@ UdpRecver::UdpRecver() {
}
UdpRecver::~UdpRecver() {
_sock->setOnRead(nullptr);
}
bool UdpRecver::initSock(uint16_t local_port,const char *local_ip) {
@ -26,8 +27,9 @@ bool UdpRecver::initSock(uint16_t local_port,const char *local_ip) {
});
auto &ref = RtpSelector::Instance();
_sock->setOnRead([&ref](const Buffer::Ptr &buf, struct sockaddr *addr, int ){
ref.inputRtp(buf->data(),buf->size(),addr);
auto sock = _sock;
_sock->setOnRead([&ref, sock](const Buffer::Ptr &buf, struct sockaddr *addr, int ){
ref.inputRtp(sock, buf->data(),buf->size(),addr);
});
return _sock->bindUdpSock(local_port,local_ip);
}

View File

@ -141,7 +141,7 @@ uint16_t RtpMultiCaster::getPort(TrackType trackType){
return _apUdpSock[trackType]->get_local_port();
}
string RtpMultiCaster::getIP(){
return inet_ntoa(_aPeerUdpAddr[0].sin_addr);
return SockUtil::inet_ntoa(_aPeerUdpAddr[0].sin_addr);
}
RtpMultiCaster::Ptr RtpMultiCaster::make(const EventPoller::Ptr &poller,const string &strLocalIp,const string &strVhost,const string &strApp,const string &strStream){
try{

View File

@ -38,7 +38,7 @@ public:
}
static string toString(uint32_t iAddr){
iAddr = htonl(iAddr);
return ::inet_ntoa((struct in_addr &)(iAddr));
return SockUtil::inet_ntoa((struct in_addr &)(iAddr));
}
virtual ~MultiCastAddressMaker(){}
std::shared_ptr<uint32_t> obtain(uint32_t iTry = 10);

View File

@ -119,10 +119,12 @@ void RtspPlayer::onRecv(const Buffer::Ptr& pBuf) {
}
input(pBuf->data(),pBuf->size());
}
void RtspPlayer::onErr(const SockException &ex) {
//定时器_pPlayTimer为空后表明握手结束了
onPlayResult_l(ex,!_pPlayTimer);
}
// from live555
bool RtspPlayer::handleAuthenticationFailure(const string &paramsStr) {
if(!_rtspRealm.empty()){
@ -155,6 +157,7 @@ bool RtspPlayer::handleAuthenticationFailure(const string &paramsStr) {
}
return false;
}
void RtspPlayer::handleResDESCRIBE(const Parser& parser) {
string authInfo = parser["WWW-Authenticate"];
//发送DESCRIBE命令后的回复
@ -237,7 +240,6 @@ void RtspPlayer::createUdpSockIfNecessary(int track_idx){
}
}
//发送SETUP命令
void RtspPlayer::sendSetup(unsigned int trackIndex) {
_onHandshake = std::bind(&RtspPlayer::handleResSETUP,this, placeholders::_1,trackIndex);
@ -337,7 +339,7 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex)
return;
}
if (((struct sockaddr_in *) addr)->sin_addr.s_addr != srcIP) {
WarnL << "收到其他地址的rtp数据:" << inet_ntoa(((struct sockaddr_in *) addr)->sin_addr);
WarnL << "收到其他地址的rtp数据:" << SockUtil::inet_ntoa(((struct sockaddr_in *) addr)->sin_addr);
return;
}
strongSelf->handleOneRtp(uiTrackIndex, strongSelf->_aTrackInfo[uiTrackIndex], (unsigned char *) buf->data(), buf->size());
@ -351,7 +353,7 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex)
return;
}
if (((struct sockaddr_in *) addr)->sin_addr.s_addr != srcIP) {
WarnL << "收到其他地址的rtcp数据:" << inet_ntoa(((struct sockaddr_in *) addr)->sin_addr);
WarnL << "收到其他地址的rtcp数据:" << SockUtil::inet_ntoa(((struct sockaddr_in *) addr)->sin_addr);
return;
}
strongSelf->onRtcpPacket(uiTrackIndex, strongSelf->_aTrackInfo[uiTrackIndex], (unsigned char *) buf->data(), buf->size());
@ -394,6 +396,7 @@ void RtspPlayer::sendPause(int type , uint32_t seekMS){
break;
}
}
void RtspPlayer::pause(bool bPause) {
sendPause(bPause ? type_pause : type_seek, getProgressMilliSecond());
}
@ -468,10 +471,8 @@ void RtspPlayer::onRtpPacket(const char *data, uint64_t len) {
}
}
void RtspPlayer::onRtcpPacket(int iTrackidx, SdpTrack::Ptr &track, unsigned char *pucData, unsigned int uiLen){
}
//此处预留rtcp处理函数
void RtspPlayer::onRtcpPacket(int iTrackidx, SdpTrack::Ptr &track, unsigned char *pucData, unsigned int uiLen){}
#if 0
//改代码提取自FFmpeg参考之
@ -597,7 +598,6 @@ void RtspPlayer::sendReceiverReport(bool overTcp,int iTrackIndex){
}
}
void RtspPlayer::onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx){
//统计丢包率
if (_aui16FirstSeq[trackidx] == 0 || rtppt->sequence < _aui16FirstSeq[trackidx]) {
@ -613,6 +613,7 @@ void RtspPlayer::onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx){
rtppt->timeStamp = dts_out;
onRecvRTP_l(rtppt,_aTrackInfo[trackidx]);
}
float RtspPlayer::getPacketLossRate(TrackType type) const{
int iTrackIdx = getTrackIndexByTrackType(type);
if(iTrackIdx == -1){
@ -637,6 +638,7 @@ float RtspPlayer::getPacketLossRate(TrackType type) const{
uint32_t RtspPlayer::getProgressMilliSecond() const{
return MAX(_stamp[0].getRelativeStamp(),_stamp[1].getRelativeStamp());
}
void RtspPlayer::seekToMilliSecond(uint32_t ms) {
sendPause(type_seek,ms);
}
@ -654,6 +656,7 @@ void RtspPlayer::sendRtspRequest(const string &cmd, const string &url, const std
}
sendRtspRequest(cmd,url,header_map);
}
void RtspPlayer::sendRtspRequest(const string &cmd, const string &url,const StrCaseMap &header_const) {
auto header = header_const;
header.emplace("CSeq",StrPrinter << _uiCseq++);
@ -701,7 +704,7 @@ void RtspPlayer::sendRtspRequest(const string &cmd, const string &url,const StrC
for (auto &pr : header){
printer << pr.first << ": " << pr.second << "\r\n";
}
send(printer << "\r\n");
SockSender::send(printer << "\r\n");
}
void RtspPlayer::onRecvRTP_l(const RtpPacket::Ptr &pkt, const SdpTrack::Ptr &track) {
@ -725,9 +728,8 @@ void RtspPlayer::onRecvRTP_l(const RtpPacket::Ptr &pkt, const SdpTrack::Ptr &tra
ticker.resetTime();
}
}
}
void RtspPlayer::onPlayResult_l(const SockException &ex , bool handshakeCompleted) {
WarnL << ex.getErrCode() << " " << ex.what();
@ -795,5 +797,3 @@ int RtspPlayer::getTrackIndexByTrackType(TrackType trackType) const {
}
} /* namespace mediakit */

View File

@ -395,7 +395,7 @@ void RtspPusher::setSocketFlags(){
GET_CONFIG(bool,ultraLowDelay,General::kUltraLowDelay);
if(!ultraLowDelay) {
//提高发送性能
(*this) << SocketFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE);
setSendFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE);
SockUtil::setNoDelay(_sock->rawFD(), false);
}
}
@ -471,7 +471,7 @@ void RtspPusher::sendRtspRequest(const string &cmd, const string &url,const StrC
if(!sdp.empty()){
printer << sdp;
}
send(printer);
SockSender::send(printer);
}

View File

@ -92,7 +92,7 @@ void RtspSession::onError(const SockException& err) {
//流量统计事件广播
GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold);
if(_ui64TotalBytes > iFlowThreshold * 1024){
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _mediaInfo, _ui64TotalBytes, duration, isPlayer, getIdentifier(), get_peer_ip(), get_peer_port());
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _mediaInfo, _ui64TotalBytes, duration, isPlayer, static_cast<SockInfo &>(*this));
}
}
@ -302,7 +302,7 @@ void RtspSession::handleReq_RECORD(const Parser &parser){
};
//rtsp推流需要鉴权
auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish,_mediaInfo,invoker,*this);
auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish,_mediaInfo,invoker,static_cast<SockInfo &>(*this));
if(!flag){
//该事件无人监听,默认不鉴权
GET_CONFIG(bool,toRtxp,General::kPublishToRtxp);
@ -341,10 +341,7 @@ void RtspSession::handleReq_Describe(const Parser &parser) {
};
//广播是否需要认证事件
if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnGetRtspRealm,
_mediaInfo,
invoker,
*this)){
if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnGetRtspRealm,_mediaInfo,invoker,static_cast<SockInfo &>(*this))){
//无人监听此事件,说明无需认证
invoker("");
}
@ -446,7 +443,7 @@ void RtspSession::onAuthBasic(const string &realm,const string &strBase64){
};
//此时必须提供明文密码
if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth,_mediaInfo,realm,user, true,invoker,*this)){
if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth,_mediaInfo,realm,user, true,invoker,static_cast<SockInfo &>(*this))){
//表明该流需要认证却没监听请求密码事件,这一般是大意的程序所为,警告之
WarnP(this) << "请监听kBroadcastOnRtspAuth事件";
//但是我们还是忽略认证以便完成播放
@ -530,7 +527,7 @@ void RtspSession::onAuthDigest(const string &realm,const string &strMd5){
};
//此时可以提供明文或md5加密的密码
if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth,_mediaInfo,realm,username, false,invoker,*this)){
if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth,_mediaInfo,realm,username, false,invoker,static_cast<SockInfo &>(*this))){
//表明该流需要认证却没监听请求密码事件,这一般是大意的程序所为,警告之
WarnP(this) << "请监听kBroadcastOnRtspAuth事件";
//但是我们还是忽略认证以便完成播放
@ -824,7 +821,7 @@ void RtspSession::handleReq_Play(const Parser &parser) {
};
if(_bFirstPlay){
//第一次收到play命令需要鉴权
auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed,_mediaInfo,invoker,*this);
auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed,_mediaInfo,invoker,static_cast<SockInfo &>(*this));
if(!flag){
//该事件无人监听,默认不鉴权
onRes("");
@ -955,7 +952,7 @@ inline void RtspSession::startListenPeerUdpData(int trackIdx) {
if (((struct sockaddr_in *) pPeerAddr)->sin_addr.s_addr != srcIP) {
WarnP(strongSelf.get()) << ((intervaled % 2 == 0) ? "收到其他地址的rtp数据:" : "收到其他地址的rtcp数据:")
<< inet_ntoa(((struct sockaddr_in *) pPeerAddr)->sin_addr);
<< SockUtil::inet_ntoa(((struct sockaddr_in *) pPeerAddr)->sin_addr);
return true;
}
@ -1244,7 +1241,7 @@ void RtspSession::setSocketFlags(){
//推流模式下关闭TCP_NODELAY会增加推流端的延时但是服务器性能将提高
SockUtil::setNoDelay(_sock->rawFD(), false);
//播放模式下开启MSG_MORE会增加延时但是能提高发送性能
(*this) << SocketFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE);
setSendFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE);
}
}

View File

@ -76,7 +76,7 @@ void UDPServer::onErr(const string& strKey, const SockException& err) {
void UDPServer::onRcvData(int intervaled, const Buffer::Ptr &pBuf, struct sockaddr* pPeerAddr) {
//TraceL << trackIndex;
struct sockaddr_in *in = (struct sockaddr_in *) pPeerAddr;
string peerIp = inet_ntoa(in->sin_addr);
string peerIp = SockUtil::inet_ntoa(in->sin_addr);
lock_guard<mutex> lck(_mtxDataHandler);
auto it0 = _mapDataHandler.find(peerIp);
if (it0 == _mapDataHandler.end()) {

View File

@ -41,7 +41,7 @@ void ShellSession::onRecv(const Buffer::Ptr&buf) {
_beatTicker.resetTime();
_strRecvBuf.append(buf->data(), buf->size());
if (_strRecvBuf.find("\xff\xf4\xff\0xfd\x06") != std::string::npos) {
send("\033[0m\r\n Bye bye!\r\n");
SockSender::send("\033[0m\r\n Bye bye!\r\n");
shutdown(SockException(Err_other,"received Ctrl+C"));
return;
}
@ -78,20 +78,20 @@ inline bool ShellSession::onCommandLine(const string& line) {
try {
std::shared_ptr<stringstream> ss(new stringstream);
CMDRegister::Instance()(line,ss);
send(ss->str());
SockSender::send(ss->str());
}catch(ExitException &ex){
return false;
}catch(std::exception &ex){
send(ex.what());
send("\r\n");
SockSender::send(ex.what());
SockSender::send("\r\n");
}
printShellPrefix();
return true;
}
inline void ShellSession::pleaseInputUser() {
send("\033[0m");
send(StrPrinter << SERVER_NAME << " login: " << endl);
SockSender::send("\033[0m");
SockSender::send(StrPrinter << SERVER_NAME << " login: " << endl);
_loginInterceptor = [this](const string &user_name) {
_strUserName=user_name;
pleaseInputPasswd();
@ -99,24 +99,24 @@ inline void ShellSession::pleaseInputUser() {
};
}
inline void ShellSession::pleaseInputPasswd() {
send("Password: \033[8m");
SockSender::send("Password: \033[8m");
_loginInterceptor = [this](const string &passwd) {
auto onAuth = [this](const string &errMessage){
if(!errMessage.empty()){
//鉴权失败
send(StrPrinter
<<"\033[0mAuth failed("
SockSender::send(StrPrinter
<< "\033[0mAuth failed("
<< errMessage
<<"), please try again.\r\n"
<<_strUserName<<"@"<<SERVER_NAME
<<"'s password: \033[8m"
<<endl);
<< "), please try again.\r\n"
<< _strUserName << "@" << SERVER_NAME
<< "'s password: \033[8m"
<< endl);
return;
}
send("\033[0m");
send("-----------------------------------------\r\n");
send(StrPrinter<<"欢迎来到"<<SERVER_NAME<<", 你可输入\"help\"查看帮助.\r\n"<<endl);
send("-----------------------------------------\r\n");
SockSender::send("\033[0m");
SockSender::send("-----------------------------------------\r\n");
SockSender::send(StrPrinter<<"欢迎来到"<<SERVER_NAME<<", 你可输入\"help\"查看帮助.\r\n"<<endl);
SockSender::send("-----------------------------------------\r\n");
printShellPrefix();
_loginInterceptor=nullptr;
};
@ -136,7 +136,7 @@ inline void ShellSession::pleaseInputPasswd() {
});
};
auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastShellLogin,_strUserName,passwd,invoker,*this);
auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastShellLogin,_strUserName,passwd,invoker,static_cast<SockInfo &>(*this));
if(!flag){
//如果无人监听shell登录事件那么默认shell无法登录
onAuth("please listen kBroadcastShellLogin event");
@ -146,7 +146,7 @@ inline void ShellSession::pleaseInputPasswd() {
}
inline void ShellSession::printShellPrefix() {
send(StrPrinter << _strUserName << "@" << SERVER_NAME << "# " << endl);
SockSender::send(StrPrinter << _strUserName << "@" << SERVER_NAME << "# " << endl);
}
}/* namespace mediakit */

View File

@ -64,7 +64,7 @@ void initEventListener(){
}
///////////////header//////////////////
printer << "\r\nheader:\r\n";
for(auto &pr : parser.getValues()){
for(auto &pr : parser.getHeader()){
printer << "\t" << pr.first << " : " << pr.second << "\r\n";
}
////////////////content/////////////////

View File

@ -55,7 +55,7 @@ static bool loadFile(const char *path){
}
uint32_t timeStamp;
RtpSelector::Instance().inputRtp(rtp,len, &addr,&timeStamp);
RtpSelector::Instance().inputRtp(nullptr,rtp,len, &addr,&timeStamp);
if(timeStamp_last){
auto diff = timeStamp - timeStamp_last;
if(diff > 0){

View File

@ -36,7 +36,7 @@ protected:
}
//tcp连接成功后每2秒触发一次该事件
void onManager() override {
send("echo test!");
SockSender::send("echo test!");
DebugL << "send echo test";
}
//连接服务器结果回调

View File

@ -35,7 +35,7 @@ public:
}
void onRecv(const Buffer::Ptr &buffer) override {
//回显数据
send("from EchoSession:");
SockSender::send("from EchoSession:");
send(buffer);
}
void onError(const SockException &err) override{
@ -62,7 +62,7 @@ public:
}
void onRecv(const Buffer::Ptr &buffer) override {
//回显数据
send("from EchoSessionWithUrl:");
SockSender::send("from EchoSessionWithUrl:");
send(buffer);
}
void onError(const SockException &err) override{