Merge pull request #9 from xiongziliang/master

update
This commit is contained in:
baiyfcu 2019-10-30 13:20:09 +08:00 committed by GitHub
commit 5d1df021fe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
59 changed files with 1282 additions and 662 deletions

4
.gitattributes vendored
View File

@ -1,2 +1,2 @@
release/ filter=lfs diff=lfs merge=lfs -text *.h linguist-language=cpp
*.a filter=lfs diff=lfs merge=lfs -text *.c linguist-language=cpp

5
.github/FUNDING.yml vendored Normal file
View File

@ -0,0 +1,5 @@
# These are supported funding model platforms
custom: ['https://www.paypal.me/xiachu']
ko_fi: xiachu
issuehunt: xiongziliang
liberapay: xiachu

@ -1 +1 @@
Subproject commit e256dabd370220a5b19e3c5b54cf29dd5cccd48e Subproject commit 8d1681b5bb247e7f47ae0f8c414f6eeb376b742b

View File

@ -4,7 +4,7 @@ android {
compileSdkVersion 28 compileSdkVersion 28
defaultConfig { defaultConfig {
applicationId "com.zlmediakit.demo" applicationId "com.zlmediakit.demo"
minSdkVersion 19 minSdkVersion 15
targetSdkVersion 28 targetSdkVersion 28
versionCode 1 versionCode 1
versionName "1.0" versionName "1.0"

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -174,6 +174,7 @@ JNI_API(jboolean,startDemo,jstring ini_dir){
mINI::Instance()["http.sslport"] = 8443; mINI::Instance()["http.sslport"] = 8443;
mINI::Instance()["rtsp.port"] = 8554; mINI::Instance()["rtsp.port"] = 8554;
mINI::Instance()["rtsp.sslport"] = 8332; mINI::Instance()["rtsp.sslport"] = 8332;
mINI::Instance()["general.enableVhost"] = 0;
for(auto &pr : mINI::Instance()){ for(auto &pr : mINI::Instance()){
//替换hook默认地址 //替换hook默认地址
replace(pr.second,"https://127.0.0.1/","http://127.0.0.1:8080/"); replace(pr.second,"https://127.0.0.1/","http://127.0.0.1:8080/");

View File

@ -38,10 +38,10 @@ public class MainActivity extends AppCompatActivity {
if(permissionSuccess){ if(permissionSuccess){
Toast.makeText(this,"你可以修改配置文件再启动:" + sd_dir + "/zlmediakit.ini" ,Toast.LENGTH_LONG).show(); Toast.makeText(this,"你可以修改配置文件再启动:" + sd_dir + "/zlmediakit.ini" ,Toast.LENGTH_LONG).show();
Toast.makeText(this,"SSL证书请放置在" + sd_dir + "/zlmediakit.pem" ,Toast.LENGTH_LONG).show(); Toast.makeText(this,"SSL证书请放置在" + sd_dir + "/zlmediakit.pem" ,Toast.LENGTH_LONG).show();
ZLMediaKit.startDemo(sd_dir);
}else{ }else{
Toast.makeText(this,"请给予我权限,否则无法启动测试!" ,Toast.LENGTH_LONG).show(); Toast.makeText(this,"请给予我权限,否则无法启动测试!" ,Toast.LENGTH_LONG).show();
} }
ZLMediaKit.startDemo(sd_dir);
} }
private ZLMediaKit.MediaPlayer _player; private ZLMediaKit.MediaPlayer _player;

View File

@ -153,6 +153,15 @@ if(ENABLE_MP4RECORD)
endif(WIN32) endif(WIN32)
endif() endif()
if(${CMAKE_BUILD_TYPE} MATCHES "Release")
#jemalloc
find_package(JEMALLOC QUIET)
if(JEMALLOC_FOUND)
message(STATUS "found library:\"${JEMALLOC_LIBRARIES}\"")
include_directories(${JEMALLOC_INCLUDE_DIR})
list(APPEND LINK_LIB_LIST ${JEMALLOC_LIBRARIES})
endif()
endif()
if (WIN32) if (WIN32)
list(APPEND LINK_LIB_LIST WS2_32 Iphlpapi shlwapi) list(APPEND LINK_LIB_LIST WS2_32 Iphlpapi shlwapi)

View File

@ -8,12 +8,11 @@
## Why ZLMediaKit? ## Why ZLMediaKit?
- Developed based on C++ 11, the code is stable and reliable, avoiding the use of raw pointers, cross-platform porting is simple and convenient, and the code is clear and concise. - Developed based on C++ 11, the code is stable and reliable, avoiding the use of raw pointers, cross-platform porting is simple and convenient, and the code is clear and concise.
- Support rich streaming media protocols(`RTSP/RTMP/HLS/HTTP-FLV`),and support Inter-protocol conversion. - Support rich streaming media protocols(`RTSP/RTMP/HLS/HTTP-FLV/Websocket-flv`),and support Inter-protocol conversion.
- Multiplexing asynchronous network IO based on epoll and multi threadextreme performance. - Multiplexing asynchronous network IO based on epoll and multi threadextreme performance.
- Well performance and stable test,can be used commercially. - Well performance and stable test,can be used commercially.
- Support linux, macos, ios, android, Windows Platforms. - Support linux, macos, ios, android, Windows Platforms.
- Very low latency(lower then one second), video opened immediately. - Very low latency(lower then one second), video opened immediately.
- **Now Support websocket-flv!**
## Features ## Features

View File

@ -4,14 +4,13 @@
## 项目特点 ## 项目特点
- 基于C++11开发避免使用裸指针代码稳定可靠同时跨平台移植简单方便代码清晰简洁。 - 基于C++11开发避免使用裸指针代码稳定可靠同时跨平台移植简单方便代码清晰简洁。
- 打包多种流媒体协议(RTSP/RTMP/HLS支持协议间的互相转换提供一站式的服务。 - 打包多种流媒体协议(RTSP/RTMP/HLS/HTTP-FLV/Websocket-FLV),支持协议间的互相转换,提供一站式的服务。
- 使用epoll+线程池+异步网络IO模式开发并发性能优越。 - 使用epoll+线程池+异步网络IO模式开发并发性能优越。
- 已实现主流的的H264/H265+AAC流媒体方案代码精简,脉络清晰,适合学习。 - 已实现主流的的H264/H265+AAC流媒体方案代码精简,脉络清晰,适合学习。
- 编码格式与框架代码解耦,方便自由简洁的添加支持其他编码格式 - 编码格式与框架代码解耦,方便自由简洁的添加支持其他编码格式
- 代码经过大量的稳定性、性能测试,可满足商用服务器项目。 - 代码经过大量的稳定性、性能测试,可满足商用服务器项目。
- 支持linux、macos、ios、android、windows平台 - 支持linux、macos、ios、android、windows平台
- 支持画面秒开(GOP缓存)、极低延时([500毫秒内最低可达100毫秒](https://github.com/zlmediakit/ZLMediaKit/wiki/%E5%BB%B6%E6%97%B6%E6%B5%8B%E8%AF%95)) - 支持画面秒开(GOP缓存)、极低延时([500毫秒内最低可达100毫秒](https://github.com/zlmediakit/ZLMediaKit/wiki/%E5%BB%B6%E6%97%B6%E6%B5%8B%E8%AF%95))
- **支持websocket-flv直播**
- [ZLMediaKit高并发实现原理](https://github.com/xiongziliang/ZLMediaKit/wiki/ZLMediaKit%E9%AB%98%E5%B9%B6%E5%8F%91%E5%AE%9E%E7%8E%B0%E5%8E%9F%E7%90%86) - [ZLMediaKit高并发实现原理](https://github.com/xiongziliang/ZLMediaKit/wiki/ZLMediaKit%E9%AB%98%E5%B9%B6%E5%8F%91%E5%AE%9E%E7%8E%B0%E5%8E%9F%E7%90%86)
## 项目定位 ## 项目定位

View File

@ -6,12 +6,13 @@ apiDebug=1
secret=035c73f7-bb6b-4889-a715-d9eb2d1925cc secret=035c73f7-bb6b-4889-a715-d9eb2d1925cc
[ffmpeg] [ffmpeg]
#FFmpeg可执行程序路径 #FFmpeg可执行程序绝对路径
bin=/usr/local/bin/ffmpeg bin=/usr/local/bin/ffmpeg
#FFmpeg拉流再推流的命令模板通过该模板可以设置再编码的一些参数 #FFmpeg拉流再推流的命令模板通过该模板可以设置再编码的一些参数
cmd=%s -i %s -c:a aac -strict -2 -ar 44100 -ab 48k -c:v libx264 -f flv %s cmd=%s -i %s -c:a aac -strict -2 -ar 44100 -ab 48k -c:v libx264 -f flv %s
#FFmpeg日志的路径如果置空则不生成FFmpeg日志 #FFmpeg日志的路径如果置空则不生成FFmpeg日志
log=/Users/xzl/git/ZLMediaKit/release/mac/Release/ffmpeg/ffmpeg.log #可以为相对(相对于本可执行程序目录)或绝对路径
log=./ffmpeg/ffmpeg.log
[general] [general]
#是否启用虚拟主机 #是否启用虚拟主机
@ -30,12 +31,18 @@ maxStreamWaitMS=5000
streamNoneReaderDelayMS=5000 streamNoneReaderDelayMS=5000
#是否开启低延时模式该模式下禁用MSG_MORE,启用TCP_NODEALY延时将降低但数据发送性能将降低 #是否开启低延时模式该模式下禁用MSG_MORE,启用TCP_NODEALY延时将降低但数据发送性能将降低
ultraLowDelay=1 ultraLowDelay=1
#拉流代理是否添加静音音频(直接拉流模式本协议无效)
addMuteAudio=1
#拉流代理时如果断流再重连成功是否删除前一次的媒体流数据,如果删除将重新开始,
#如果不删除将会接着上一次的数据继续写(录制hls/mp4时会继续在前一个文件后面写)
resetWhenRePlay=1
[hls] [hls]
#hls写文件的buf大小调整参数可以提高文件io性能 #hls写文件的buf大小调整参数可以提高文件io性能
fileBufSize=65536 fileBufSize=65536
#hls保存文件路径 #hls保存文件路径
filePath=/Users/xzl/git/ZLMediaKit/release/mac/Release/httpRoot #可以为相对(相对于本可执行程序目录)或绝对路径
filePath=./httpRoot
#hls最大切片时间 #hls最大切片时间
segDur=3 segDur=3
#m3u8索引中,hls保留切片个数(实际保留切片个数大2~3个) #m3u8索引中,hls保留切片个数(实际保留切片个数大2~3个)
@ -88,7 +95,8 @@ notFound=<html><head><title>404 Not Found</title></head><body bgcolor="white"><c
#http服务器监听端口 #http服务器监听端口
port=80 port=80
#http文件服务器根目录 #http文件服务器根目录
rootPath=/Users/xzl/git/ZLMediaKit/release/mac/Release/httpRoot #可以为相对(相对于本可执行程序目录)或绝对路径
rootPath=./httpRoot
#http文件服务器读文件缓存大小单位BYTE调整该参数可以优化文件io性能 #http文件服务器读文件缓存大小单位BYTE调整该参数可以优化文件io性能
sendBufSize=65536 sendBufSize=65536
#https服务器监听端口 #https服务器监听端口
@ -109,12 +117,17 @@ appName=record
#mp4录制写文件缓存单位BYTE,调整参数可以提高文件io性能 #mp4录制写文件缓存单位BYTE,调整参数可以提高文件io性能
fileBufSize=65536 fileBufSize=65536
#mp4录制保存、mp4点播根路径 #mp4录制保存、mp4点播根路径
filePath=/Users/xzl/git/ZLMediaKit/release/mac/Release/httpRoot #可以为相对(相对于本可执行程序目录)或绝对路径
filePath=./httpRoot
#mp4录制切片时间单位秒 #mp4录制切片时间单位秒
fileSecond=3600 fileSecond=3600
#mp4点播每次流化数据量单位毫秒 #mp4点播每次流化数据量单位毫秒
#减少该值可以让点播数据发送量更平滑增大该值则更节省cpu资源 #减少该值可以让点播数据发送量更平滑增大该值则更节省cpu资源
sampleMS=100 sampleMS=500
#mp4录制完成后是否进行二次关键帧索引写入头部
fastStart=0
#MP4点播(rtsp/rtmp/http-flv/ws-flv)是否循环播放文件
fileRepeat=0
[rtmp] [rtmp]
#rtmp必须在此时间内完成握手否则服务器会断开链接单位秒 #rtmp必须在此时间内完成握手否则服务器会断开链接单位秒
@ -158,6 +171,8 @@ keepAliveSecond=15
port=554 port=554
#rtsps服务器监听地址 #rtsps服务器监听地址
sslport=322 sslport=322
#在接收rtsp推流时是否重新生成时间戳(很多推流器的时间戳着实很烂)
modifyStamp=1
[shell] [shell]
#调试telnet服务器接受最大bufffer大小 #调试telnet服务器接受最大bufffer大小

View File

@ -39,7 +39,7 @@ const char kLog[] = FFmpeg_FIELD"log";
onceToken token([]() { onceToken token([]() {
mINI::Instance()[kBin] = trim(System::execute("which ffmpeg")); mINI::Instance()[kBin] = trim(System::execute("which ffmpeg"));
mINI::Instance()[kCmd] = "%s -i %s -c:a aac -strict -2 -ar 44100 -ab 48k -c:v libx264 -f flv %s"; mINI::Instance()[kCmd] = "%s -i %s -c:a aac -strict -2 -ar 44100 -ab 48k -c:v libx264 -f flv %s";
mINI::Instance()[kLog] = exeDir() + "ffmpeg/ffmpeg.log"; mINI::Instance()[kLog] = "./ffmpeg/ffmpeg.log";
}); });
} }
@ -64,7 +64,7 @@ void FFmpegSource::play(const string &src_url,const string &dst_url,int timeout_
char cmd[1024] = {0}; char cmd[1024] = {0};
snprintf(cmd, sizeof(cmd),ffmpeg_cmd.data(),ffmpeg_bin.data(),src_url.data(),dst_url.data()); snprintf(cmd, sizeof(cmd),ffmpeg_cmd.data(),ffmpeg_bin.data(),src_url.data(),dst_url.data());
_process.run(cmd,ffmpeg_log); _process.run(cmd,File::absolutePath("",ffmpeg_log));
InfoL << cmd; InfoL << cmd;
if(_media_info._host == "127.0.0.1"){ if(_media_info._host == "127.0.0.1"){

View File

@ -34,9 +34,8 @@
#include "Util/File.h" #include "Util/File.h"
#include "Util/logger.h" #include "Util/logger.h"
#include "Util/uv_errno.h" #include "Util/uv_errno.h"
#include "Util/TimeTicker.h" #include "Thread/WorkThreadPool.h"
#include "Process.h" #include "Process.h"
#include "Poller/Timer.h"
using namespace toolkit; using namespace toolkit;
void Process::run(const string &cmd, const string &log_file_tmp) { void Process::run(const string &cmd, const string &log_file_tmp) {
@ -46,12 +45,11 @@ void Process::run(const string &cmd, const string &log_file_tmp) {
throw std::runtime_error(StrPrinter << "fork child process falied,err:" << get_uv_errmsg()); throw std::runtime_error(StrPrinter << "fork child process falied,err:" << get_uv_errmsg());
} }
if (_pid == 0) { if (_pid == 0) {
//子进程
//子进程关闭core文件生成 //子进程关闭core文件生成
struct rlimit rlim = {0,0}; struct rlimit rlim = {0,0};
setrlimit(RLIMIT_CORE, &rlim); setrlimit(RLIMIT_CORE, &rlim);
//在启动子进程时暂时禁用SIGINT、SIGTERM信号
// ignore the SIGINT and SIGTERM // ignore the SIGINT and SIGTERM
signal(SIGINT, SIG_IGN); signal(SIGINT, SIG_IGN);
signal(SIGTERM, SIG_IGN); signal(SIGTERM, SIG_IGN);
@ -109,24 +107,73 @@ void Process::run(const string &cmd, const string &log_file_tmp) {
InfoL << "start child proces " << _pid; InfoL << "start child proces " << _pid;
} }
void Process::kill(int max_delay) {
/**
*
* @param pid
* @param exit_code_ptr
* @param block
* @return
*/
static bool s_wait(pid_t pid,int *exit_code_ptr,bool block) {
if (pid <= 0) {
return false;
}
int status = 0;
pid_t p = waitpid(pid, &status, block ? 0 : WNOHANG);
int exit_code = (status & 0xFF00) >> 8;
if(exit_code_ptr){
*exit_code_ptr = (status & 0xFF00) >> 8;
}
if (p < 0) {
WarnL << "waitpid failed, pid=" << pid << ", err=" << get_uv_errmsg();
return false;
}
if (p > 0) {
InfoL << "process terminated, pid=" << pid << ", exit code=" << exit_code;
return false;
}
//WarnL << "process is running, pid=" << _pid;
return true;
}
static void s_kill(pid_t pid,int max_delay,bool force){
if (pid <= 0) {
//pid无效
return;
}
if (::kill(pid, force ? SIGKILL : SIGTERM) == -1) {
//进程可能已经退出了
WarnL << "kill process " << pid << " failed:" << get_uv_errmsg();
return;
}
if(force){
//发送SIGKILL信号后阻塞等待退出
s_wait(pid, NULL, true);
DebugL << "force kill " << pid << " success!";
return;
}
//发送SIGTERM信号后2秒后检查子进程是否已经退出
WorkThreadPool::Instance().getPoller()->doDelayTask(max_delay,[pid](){
if (!s_wait(pid, nullptr, false)) {
//进程已经退出了
return 0;
}
//进程还在运行
WarnL << "process still working,force kill it:" << pid;
s_kill(pid,0, true);
return 0;
});
}
void Process::kill(int max_delay,bool force) {
if (_pid <= 0) { if (_pid <= 0) {
return; return;
} }
if (::kill(_pid, SIGTERM) == -1) { s_kill(_pid,max_delay,force);
WarnL << "kill process " << _pid << " falied,err:" << get_uv_errmsg();
} else {
//等待子进程退出
auto pid = _pid;
EventPollerPool::Instance().getPoller()->doDelayTask(max_delay,[pid](){
//最多等待2秒,2秒后强制杀掉程序
if (waitpid(pid, NULL, WNOHANG) == 0) {
::kill(pid, SIGKILL);
WarnL << "force kill process " << pid;
}
return 0;
});
}
_pid = -1; _pid = -1;
} }
@ -134,28 +181,10 @@ Process::~Process() {
kill(2000); kill(2000);
} }
Process::Process() { Process::Process() {}
}
bool Process::wait(bool block) { bool Process::wait(bool block) {
if (_pid <= 0) { return s_wait(_pid,&_exit_code,block);
return false;
}
int status = 0;
pid_t p = waitpid(_pid, &status, block ? 0 : WNOHANG);
_exit_code = (status & 0xFF00) >> 8;
if (p < 0) {
WarnL << "waitpid failed, pid=" << _pid << ", err=" << get_uv_errmsg();
return false;
}
if (p > 0) {
InfoL << "process terminated, pid=" << _pid << ", exit code=" << _exit_code;
return false;
}
//WarnL << "process is running, pid=" << _pid;
return true;
} }
int Process::exit_code() { int Process::exit_code() {

View File

@ -36,7 +36,7 @@ public:
Process(); Process();
~Process(); ~Process();
void run(const string &cmd,const string &log_file); void run(const string &cmd,const string &log_file);
void kill(int max_delay); void kill(int max_delay,bool force = false);
bool wait(bool block = true); bool wait(bool block = true);
int exit_code(); int exit_code();
private: private:

View File

@ -45,6 +45,7 @@
#include "Util/MD5.h" #include "Util/MD5.h"
#include "WebApi.h" #include "WebApi.h"
#include "WebHook.h" #include "WebHook.h"
#include "Thread/WorkThreadPool.h"
#if !defined(_WIN32) #if !defined(_WIN32)
#include "FFmpegSource.h" #include "FFmpegSource.h"
@ -180,19 +181,35 @@ static inline void addHttpListener(){
if(api_debug){ if(api_debug){
auto newInvoker = [invoker,parser,allArgs](const string &codeOut, auto newInvoker = [invoker,parser,allArgs](const string &codeOut,
const HttpSession::KeyValue &headerOut, const HttpSession::KeyValue &headerOut,
const string &contentOut){ const HttpBody::Ptr &body){
stringstream ss; stringstream ss;
for(auto &pr : allArgs ){ for(auto &pr : allArgs ){
ss << pr.first << " : " << pr.second << "\r\n"; ss << pr.first << " : " << pr.second << "\r\n";
} }
DebugL << "\r\n# request:\r\n" << parser.Method() << " " << parser.FullUrl() << "\r\n" //body默认为空
<< "# content:\r\n" << parser.Content() << "\r\n" int64_t size = 0;
<< "# args:\r\n" << ss.str() if (body && body->remainSize()) {
<< "# response:\r\n" //有body获取body大小
<< contentOut << "\r\n"; size = body->remainSize();
}
invoker(codeOut,headerOut,contentOut); if(size < 4 * 1024){
string contentOut = body->readData(size)->toString();
DebugL << "\r\n# request:\r\n" << parser.Method() << " " << parser.FullUrl() << "\r\n"
<< "# content:\r\n" << parser.Content() << "\r\n"
<< "# args:\r\n" << ss.str()
<< "# response:\r\n"
<< contentOut << "\r\n";
invoker(codeOut,headerOut,contentOut);
} else{
DebugL << "\r\n# request:\r\n" << parser.Method() << " " << parser.FullUrl() << "\r\n"
<< "# content:\r\n" << parser.Content() << "\r\n"
<< "# args:\r\n" << ss.str()
<< "# response size:"
<< size <<"\r\n";
invoker(codeOut,headerOut,body);
}
}; };
((HttpSession::HttpResponseInvoker &)invoker) = newInvoker; ((HttpSession::HttpResponseInvoker &)invoker) = newInvoker;
} }
@ -281,6 +298,23 @@ void installWebApi() {
}); });
}); });
//获取后台工作线程负载
//测试url http://127.0.0.1/index/api/getWorkThreadsLoad
API_REGIST_INVOKER(api, getWorkThreadsLoad, {
WorkThreadPool::Instance().getExecutorDelay([invoker, headerOut](const vector<int> &vecDelay) {
Value val;
auto vec = WorkThreadPool::Instance().getExecutorLoad();
int i = 0;
for (auto load : vec) {
Value obj(objectValue);
obj["load"] = load;
obj["delay"] = vecDelay[i++];
val["data"].append(obj);
}
invoker("200 OK", headerOut, val.toStyledString());
});
});
//获取服务器配置 //获取服务器配置
//测试url http://127.0.0.1/index/api/getServerConfig //测试url http://127.0.0.1/index/api/getServerConfig
API_REGIST(api, getServerConfig, { API_REGIST(api, getServerConfig, {
@ -578,6 +612,13 @@ void installWebApi() {
}); });
#endif #endif
//新增http api下载可执行程序文件接口
//测试url http://127.0.0.1/index/api/downloadBin
API_REGIST_INVOKER(api,downloadBin,{
CHECK_SECRET();
invoker.responseFile(headerIn,StrCaseMap(),exePath());
});
////////////以下是注册的Hook API//////////// ////////////以下是注册的Hook API////////////
API_REGIST(hook,on_publish,{ API_REGIST(hook,on_publish,{
//开始推流事件 //开始推流事件

View File

@ -32,19 +32,9 @@ namespace mediakit{
void MediaSink::addTrack(const Track::Ptr &track_in) { void MediaSink::addTrack(const Track::Ptr &track_in) {
lock_guard<recursive_mutex> lck(_mtx); lock_guard<recursive_mutex> lck(_mtx);
//克隆Track只拷贝其数据不拷贝其数据转发关系 //克隆Track只拷贝其数据不拷贝其数据转发关系
auto track = track_in->clone(); auto track = track_in->clone();
weak_ptr<MediaSink> weakSelf = shared_from_this();
track->addDelegate(std::make_shared<FrameWriterInterfaceHelper>([weakSelf](const Frame::Ptr &frame){
auto strongSelf = weakSelf.lock();
if(!strongSelf){
return;
}
if(!strongSelf->_anyTrackUnReady){
strongSelf->onTrackFrame(frame);
}
}));
auto codec_id = track->getCodecId(); auto codec_id = track->getCodecId();
_track_map[codec_id] = track; _track_map[codec_id] = track;
auto lam = [this,track](){ auto lam = [this,track](){
@ -58,6 +48,26 @@ void MediaSink::addTrack(const Track::Ptr &track_in) {
_trackReadyCallback[codec_id] = lam; _trackReadyCallback[codec_id] = lam;
_ticker.resetTime(); _ticker.resetTime();
} }
weak_ptr<MediaSink> weakSelf = shared_from_this();
track->addDelegate(std::make_shared<FrameWriterInterfaceHelper>([weakSelf](const Frame::Ptr &frame){
auto strongSelf = weakSelf.lock();
if(!strongSelf){
return;
}
if(!strongSelf->_anyTrackUnReady){
strongSelf->onTrackFrame(frame);
}
}));
}
void MediaSink::resetTracks() {
lock_guard<recursive_mutex> lck(_mtx);
_anyTrackUnReady = false;
_allTrackReady = false;
_track_map.clear();
_trackReadyCallback.clear();
_ticker.resetTime();
} }
void MediaSink::inputFrame(const Frame::Ptr &frame) { void MediaSink::inputFrame(const Frame::Ptr &frame) {

View File

@ -52,7 +52,7 @@ public:
* frame * frame
* @param frame * @param frame
*/ */
void inputFrame(const Frame::Ptr &frame) override ; void inputFrame(const Frame::Ptr &frame) override;
/** /**
* trackTrack的clone方法 * trackTrack的clone方法
@ -61,21 +61,24 @@ public:
*/ */
virtual void addTrack(const Track::Ptr & track); virtual void addTrack(const Track::Ptr & track);
/**
* track
*/
virtual void resetTracks();
/** /**
* Track是否都准备好了 * Track是否都准备好了
* @return * @return
*/ */
bool isAllTrackReady() const ; bool isAllTrackReady() const;
/** /**
* Track * Track
* @param type track类型 * @param type track类型
* @param trackReady Track * @param trackReady Track
* @return * @return
*/ */
Track::Ptr getTrack(TrackType type,bool trackReady = true) const ; Track::Ptr getTrack(TrackType type,bool trackReady = true) const;
protected: protected:
/** /**
* track已经准备好ready()true * track已经准备好ready()true

View File

@ -69,6 +69,19 @@ public:
_record->addTrack(track); _record->addTrack(track);
} }
/**
*
*/
void resetTracks() {
if(_rtmp){
_rtmp->resetTracks();
}
if(_rtsp){
_rtsp->resetTracks();
}
_record->resetTracks();
}
/** /**
* rtmp * rtmp
* @param frame * @param frame

View File

@ -163,7 +163,7 @@ class Parser {
for (string &key_val : arg_vec) { for (string &key_val : arg_vec) {
auto key = FindField(key_val.data(), NULL, key_delim); auto key = FindField(key_val.data(), NULL, key_delim);
auto val = FindField(key_val.data(), key_delim, NULL); auto val = FindField(key_val.data(), key_delim, NULL);
ret.emplace_force(key,val); ret.emplace_force(trim(key),trim(val));
} }
return ret; return ret;
} }

View File

@ -77,12 +77,17 @@ const string kStreamNoneReaderDelayMS = GENERAL_FIELD"streamNoneReaderDelayMS";
const string kMaxStreamWaitTimeMS = GENERAL_FIELD"maxStreamWaitMS"; const string kMaxStreamWaitTimeMS = GENERAL_FIELD"maxStreamWaitMS";
const string kEnableVhost = GENERAL_FIELD"enableVhost"; const string kEnableVhost = GENERAL_FIELD"enableVhost";
const string kUltraLowDelay = GENERAL_FIELD"ultraLowDelay"; const string kUltraLowDelay = GENERAL_FIELD"ultraLowDelay";
const string kAddMuteAudio = GENERAL_FIELD"addMuteAudio";
const string kResetWhenRePlay = GENERAL_FIELD"resetWhenRePlay";
onceToken token([](){ onceToken token([](){
mINI::Instance()[kFlowThreshold] = 1024; mINI::Instance()[kFlowThreshold] = 1024;
mINI::Instance()[kStreamNoneReaderDelayMS] = 5 * 1000; mINI::Instance()[kStreamNoneReaderDelayMS] = 5 * 1000;
mINI::Instance()[kMaxStreamWaitTimeMS] = 5 * 1000; mINI::Instance()[kMaxStreamWaitTimeMS] = 5 * 1000;
mINI::Instance()[kEnableVhost] = 1; mINI::Instance()[kEnableVhost] = 1;
mINI::Instance()[kUltraLowDelay] = 1; mINI::Instance()[kUltraLowDelay] = 1;
mINI::Instance()[kAddMuteAudio] = 1;
mINI::Instance()[kResetWhenRePlay] = 1;
},nullptr); },nullptr);
}//namespace General }//namespace General
@ -117,7 +122,7 @@ const string kMaxReqCount = HTTP_FIELD"maxReqCount";
const string kCharSet = HTTP_FIELD"charSet"; const string kCharSet = HTTP_FIELD"charSet";
//http 服务器根目录 //http 服务器根目录
#define HTTP_ROOT_PATH (exeDir() + "httpRoot") #define HTTP_ROOT_PATH "./httpRoot"
const string kRootPath = HTTP_FIELD"rootPath"; const string kRootPath = HTTP_FIELD"rootPath";
//http 404错误提示内容 //http 404错误提示内容
@ -172,7 +177,7 @@ onceToken token([](){
mINI::Instance()[kHandshakeSecond] = 15; mINI::Instance()[kHandshakeSecond] = 15;
mINI::Instance()[kKeepAliveSecond] = 15; mINI::Instance()[kKeepAliveSecond] = 15;
mINI::Instance()[kDirectProxy] = 1; mINI::Instance()[kDirectProxy] = 1;
mINI::Instance()[kModifyStamp] = true; mINI::Instance()[kModifyStamp] = false;
},nullptr); },nullptr);
} //namespace Rtsp } //namespace Rtsp
@ -254,7 +259,7 @@ namespace Record {
const string kAppName = RECORD_FIELD"appName"; const string kAppName = RECORD_FIELD"appName";
//每次流化MP4文件的时长,单位毫秒 //每次流化MP4文件的时长,单位毫秒
#define RECORD_SAMPLE_MS 100 #define RECORD_SAMPLE_MS 500
const string kSampleMS = RECORD_FIELD"sampleMS"; const string kSampleMS = RECORD_FIELD"sampleMS";
//MP4文件录制大小,默认一个小时 //MP4文件录制大小,默认一个小时
@ -268,6 +273,9 @@ const string kFilePath = RECORD_FIELD"filePath";
//mp4文件写缓存大小 //mp4文件写缓存大小
const string kFileBufSize = RECORD_FIELD"fileBufSize"; const string kFileBufSize = RECORD_FIELD"fileBufSize";
//mp4录制完成后是否进行二次关键帧索引写入头部
const string kFastStart = RECORD_FIELD"fastStart";
//mp4文件是否重头循环读取 //mp4文件是否重头循环读取
const string kFileRepeat = RECORD_FIELD"fileRepeat"; const string kFileRepeat = RECORD_FIELD"fileRepeat";
@ -277,6 +285,7 @@ onceToken token([](){
mINI::Instance()[kFileSecond] = RECORD_FILE_SECOND; mINI::Instance()[kFileSecond] = RECORD_FILE_SECOND;
mINI::Instance()[kFilePath] = RECORD_FILE_PATH; mINI::Instance()[kFilePath] = RECORD_FILE_PATH;
mINI::Instance()[kFileBufSize] = 64 * 1024; mINI::Instance()[kFileBufSize] = 64 * 1024;
mINI::Instance()[kFastStart] = false;
mINI::Instance()[kFileRepeat] = false; mINI::Instance()[kFileRepeat] = false;
},nullptr); },nullptr);

View File

@ -177,6 +177,11 @@ extern const string kMaxStreamWaitTimeMS;
extern const string kEnableVhost; extern const string kEnableVhost;
//超低延时模式,默认打开,打开后会降低延时但是转发性能会稍差 //超低延时模式,默认打开,打开后会降低延时但是转发性能会稍差
extern const string kUltraLowDelay; extern const string kUltraLowDelay;
//拉流代理时是否添加静音音频
extern const string kAddMuteAudio;
//拉流代理时如果断流再重连成功是否删除前一次的媒体流数据,如果删除将重新开始,
//如果不删除将会接着上一次的数据继续写(录制hls/mp4时会继续在前一个文件后面写)
extern const string kResetWhenRePlay;
}//namespace General }//namespace General
@ -268,6 +273,8 @@ extern const string kFileSecond;
extern const string kFilePath; extern const string kFilePath;
//mp4文件写缓存大小 //mp4文件写缓存大小
extern const string kFileBufSize; extern const string kFileBufSize;
//mp4录制完成后是否进行二次关键帧索引写入头部
extern const string kFastStart;
//mp4文件是否重头循环读取 //mp4文件是否重头循环读取
extern const string kFileRepeat; extern const string kFileRepeat;
} //namespace Record } //namespace Record

View File

@ -41,8 +41,8 @@ Track::Ptr Factory::getTrackBySdp(const SdpTrack::Ptr &track) {
aac_cfg_str = FindField(track->_fmtp.data(), "config=", ";"); aac_cfg_str = FindField(track->_fmtp.data(), "config=", ";");
} }
if (aac_cfg_str.empty()) { if (aac_cfg_str.empty()) {
//延后获取adts头 //如果sdp中获取不到aac config信息那么在rtp也无法获取那么忽略该Track
return std::make_shared<AACTrack>(); return nullptr;
} }
string aac_cfg; string aac_cfg;
@ -60,10 +60,8 @@ Track::Ptr Factory::getTrackBySdp(const SdpTrack::Ptr &track) {
} }
if (strcasecmp(track->_codec.data(), "h264") == 0) { if (strcasecmp(track->_codec.data(), "h264") == 0) {
auto map = Parser::parseArgs(track->_fmtp," ","="); //a=fmtp:96 packetization-mode=1;profile-level-id=42C01F;sprop-parameter-sets=Z0LAH9oBQBboQAAAAwBAAAAPI8YMqA==,aM48gA==
for(auto &pr : map){ auto map = Parser::parseArgs(FindField(track->_fmtp.data()," ", nullptr),";","=");
trim(pr.second," ;");
}
auto sps_pps = map["sprop-parameter-sets"]; auto sps_pps = map["sprop-parameter-sets"];
if(sps_pps.empty()){ if(sps_pps.empty()){
return std::make_shared<H264Track>(); return std::make_shared<H264Track>();
@ -77,10 +75,7 @@ Track::Ptr Factory::getTrackBySdp(const SdpTrack::Ptr &track) {
if (strcasecmp(track->_codec.data(), "h265") == 0) { if (strcasecmp(track->_codec.data(), "h265") == 0) {
//a=fmtp:96 sprop-sps=QgEBAWAAAAMAsAAAAwAAAwBdoAKAgC0WNrkky/AIAAADAAgAAAMBlQg=; sprop-pps=RAHA8vA8kAA= //a=fmtp:96 sprop-sps=QgEBAWAAAAMAsAAAAwAAAwBdoAKAgC0WNrkky/AIAAADAAgAAAMBlQg=; sprop-pps=RAHA8vA8kAA=
auto map = Parser::parseArgs(track->_fmtp," ","="); auto map = Parser::parseArgs(FindField(track->_fmtp.data()," ", nullptr),";","=");
for(auto &pr : map){
trim(pr.second," ;");
}
auto vps = decodeBase64(map["sprop-vps"]); auto vps = decodeBase64(map["sprop-vps"]);
auto sps = decodeBase64(map["sprop-sps"]); auto sps = decodeBase64(map["sprop-sps"]);
auto pps = decodeBase64(map["sprop-pps"]); auto pps = decodeBase64(map["sprop-pps"]);

View File

@ -390,7 +390,7 @@ public:
_printer << "m=video 0 RTP/AVP " << playload_type << "\r\n"; _printer << "m=video 0 RTP/AVP " << playload_type << "\r\n";
_printer << "b=AS:" << bitrate << "\r\n"; _printer << "b=AS:" << bitrate << "\r\n";
_printer << "a=rtpmap:" << playload_type << " H264/" << 90000 << "\r\n"; _printer << "a=rtpmap:" << playload_type << " H264/" << 90000 << "\r\n";
_printer << "a=fmtp:" << playload_type << " packetization-mode=1;profile-level-id="; _printer << "a=fmtp:" << playload_type << " packetization-mode=1; profile-level-id=";
char strTemp[100]; char strTemp[100];
uint32_t profile_level_id = 0; uint32_t profile_level_id = 0;
@ -402,7 +402,7 @@ public:
memset(strTemp, 0, 100); memset(strTemp, 0, 100);
sprintf(strTemp, "%06X", profile_level_id); sprintf(strTemp, "%06X", profile_level_id);
_printer << strTemp; _printer << strTemp;
_printer << ";sprop-parameter-sets="; _printer << "; sprop-parameter-sets=";
memset(strTemp, 0, 100); memset(strTemp, 0, 100);
av_base64_encode(strTemp, 100, (uint8_t *) strSPS.data(), strSPS.size()); av_base64_encode(strTemp, 100, (uint8_t *) strSPS.data(), strSPS.size());
_printer << strTemp << ","; _printer << strTemp << ",";

257
src/Http/HttpBody.cpp Normal file
View File

@ -0,0 +1,257 @@
/*
* MIT License
*
* Copyright (c) 2016-2019 xiongziliang <771730766@qq.com>
*
* This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
#include "HttpBody.h"
#include "Util/util.h"
#include "Util/uv_errno.h"
#include "Util/logger.h"
#include "HttpClient.h"
#ifndef _WIN32
#include <sys/mman.h>
#endif
#ifndef _WIN32
#define ENABLE_MMAP
#endif
namespace mediakit {
HttpStringBody::HttpStringBody(const string &str){
_str = str;
}
uint64_t HttpStringBody::remainSize() {
return _str.size() - _offset;
}
Buffer::Ptr HttpStringBody::readData(uint32_t size) {
size = MIN(remainSize(),size);
if(!size){
//没有剩余字节了
return nullptr;
}
auto ret = std::make_shared<BufferString>(_str,_offset,size);
_offset += size;
return ret;
}
//////////////////////////////////////////////////////////////////
HttpFileBody::HttpFileBody(const string &filePath){
std::shared_ptr<FILE> fp(fopen(filePath.data(), "rb"), [](FILE *fp) {
if(fp){
fclose(fp);
}
});
if(!fp){
init(fp,0,0);
}else{
init(fp,0,HttpMultiFormBody::fileSize(fp.get()));
}
}
HttpFileBody::HttpFileBody(const std::shared_ptr<FILE> &fp, uint64_t offset, uint64_t max_size) {
init(fp,offset,max_size);
}
void HttpFileBody::init(const std::shared_ptr<FILE> &fp,uint64_t offset,uint64_t max_size){
_fp = fp;
_max_size = max_size;
#ifdef ENABLE_MMAP
do {
if(!_fp){
//文件不存在
break;
}
int fd = fileno(fp.get());
if (fd < 0) {
WarnL << "fileno failed:" << get_uv_errmsg(false);
break;
}
auto ptr = (char *) mmap(NULL, max_size, PROT_READ, MAP_SHARED, fd, offset);
if (ptr == MAP_FAILED) {
WarnL << "mmap failed:" << get_uv_errmsg(false);
break;
}
_map_addr.reset(ptr,[max_size,fp](char *ptr){
munmap(ptr,max_size);
});
} while (false);
#endif
if(!_map_addr && offset && fp.get()){
//未映射,那么fseek设置偏移量
fseek(fp.get(), offset, SEEK_SET);
}
}
class BufferMmap : public Buffer{
public:
typedef std::shared_ptr<BufferMmap> Ptr;
BufferMmap(const std::shared_ptr<char> &map_addr,uint64_t offset,int size){
_map_addr = map_addr;
_data = map_addr.get() + offset;
_size = size;
};
virtual ~BufferMmap(){};
//返回数据长度
char *data() const override {
return _data;
}
uint32_t size() const override{
return _size;
}
private:
std::shared_ptr<char> _map_addr;
char *_data;
uint32_t _size;
};
uint64_t HttpFileBody::remainSize() {
return _max_size - _offset;
}
Buffer::Ptr HttpFileBody::readData(uint32_t size) {
size = MIN(remainSize(),size);
if(!size){
//没有剩余字节了
return nullptr;
}
if(!_map_addr){
//fread模式
int iRead;
auto ret = _pool.obtain();
ret->setCapacity(size + 1);
do{
iRead = fread(ret->data(), 1, size, _fp.get());
}while(-1 == iRead && UV_EINTR == get_uv_error(false));
if(iRead > 0){
//读到数据了
ret->setSize(iRead);
_offset += iRead;
return std::move(ret);
}
//读取文件异常,文件真实长度小于声明长度
_offset = _max_size;
WarnL << "read file err:" << get_uv_errmsg();
return nullptr;
}
//mmap模式
auto ret = std::make_shared<BufferMmap>(_map_addr,_offset,size);
_offset += size;
return std::move(ret);
}
//////////////////////////////////////////////////////////////////
HttpMultiFormBody::HttpMultiFormBody(const HttpArgs &args,const string &filePath,const string &boundary){
std::shared_ptr<FILE> fp(fopen(filePath.data(), "rb"), [](FILE *fp) {
if(fp){
fclose(fp);
}
});
if(!fp){
throw std::invalid_argument(StrPrinter << "open file failed" << filePath << " " << get_uv_errmsg());
}
_fileBody = std::make_shared<HttpFileBody>(fp, 0, fileSize(fp.get()));
auto fileName = filePath;
auto pos = filePath.rfind('/');
if(pos != string::npos){
fileName = filePath.substr(pos + 1);
}
_bodyPrefix = multiFormBodyPrefix(args,boundary,fileName);
_bodySuffix = multiFormBodySuffix(boundary);
_totalSize = _bodyPrefix.size() + _bodySuffix.size() + _fileBody->remainSize();
}
uint64_t HttpMultiFormBody::remainSize() {
return _totalSize - _offset;
}
Buffer::Ptr HttpMultiFormBody::readData(uint32_t size){
if(_bodyPrefix.size()){
auto ret = std::make_shared<BufferString>(_bodyPrefix);
_offset += _bodyPrefix.size();
_bodyPrefix.clear();
return ret;
}
if(_fileBody->remainSize()){
auto ret = _fileBody->readData(size);
if(!ret){
//读取文件出现异常,提前中断
_offset = _totalSize;
}else{
_offset += ret->size();
}
return ret;
}
if(_bodySuffix.size()){
auto ret = std::make_shared<BufferString>(_bodySuffix);
_offset = _totalSize;
_bodySuffix.clear();
return ret;
}
return nullptr;
}
string HttpMultiFormBody::multiFormBodySuffix(const string &boundary){
string MPboundary = string("--") + boundary;
string endMPboundary = MPboundary + "--";
_StrPrinter body;
body << "\r\n" << endMPboundary;
return body;
}
uint64_t HttpMultiFormBody::fileSize(FILE *fp) {
auto current = ftell(fp);
fseek(fp,0L,SEEK_END); /* 定位到文件末尾 */
auto end = ftell(fp); /* 得到文件大小 */
fseek(fp,current,SEEK_SET);
return end - current;
}
string HttpMultiFormBody::multiFormContentType(const string &boundary){
return StrPrinter << "multipart/form-data; boundary=" << boundary;
}
string HttpMultiFormBody::multiFormBodyPrefix(const HttpArgs &args,const string &boundary,const string &fileName){
string MPboundary = string("--") + boundary;
_StrPrinter body;
for(auto &pr : args){
body << MPboundary << "\r\n";
body << "Content-Disposition: form-data; name=\"" << pr.first << "\"\r\n\r\n";
body << pr.second << "\r\n";
}
body << MPboundary << "\r\n";
body << "Content-Disposition: form-data; name=\"" << "file" << "\";filename=\"" << fileName << "\"\r\n";
body << "Content-Type: application/octet-stream\r\n\r\n" ;
return body;
}
}//namespace mediakit

145
src/Http/HttpBody.h Normal file
View File

@ -0,0 +1,145 @@
/*
* MIT License
*
* Copyright (c) 2016-2019 xiongziliang <771730766@qq.com>
*
* This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
#ifndef ZLMEDIAKIT_FILEREADER_H
#define ZLMEDIAKIT_FILEREADER_H
#include <stdlib.h>
#include <memory>
#include "Network/Buffer.h"
#include "Util/ResourcePool.h"
#include "Util/logger.h"
using namespace std;
using namespace toolkit;
#ifndef MIN
#define MIN(a,b) ((a) < (b) ? (a) : (b) )
#endif //MIN
namespace mediakit {
/**
* http content部分基类定义
*/
class HttpBody{
public:
typedef std::shared_ptr<HttpBody> Ptr;
HttpBody(){}
virtual ~HttpBody(){}
/**
*
*/
virtual uint64_t remainSize() { return 0;};
/**
* size
* @param size
* @return
*/
virtual Buffer::Ptr readData(uint32_t size) { return nullptr;};
};
/**
* string类型的content
*/
class HttpStringBody : public HttpBody{
public:
typedef std::shared_ptr<HttpStringBody> Ptr;
HttpStringBody(const string &str);
virtual ~HttpStringBody(){}
uint64_t remainSize() override ;
Buffer::Ptr readData(uint32_t size) override ;
private:
mutable string _str;
uint64_t _offset = 0;
};
/**
* content
*/
class HttpFileBody : public HttpBody{
public:
typedef std::shared_ptr<HttpFileBody> Ptr;
/**
*
* @param fp 0
* @param offset
* @param max_size
*/
HttpFileBody(const std::shared_ptr<FILE> &fp,uint64_t offset,uint64_t max_size);
HttpFileBody(const string &file_path);
~HttpFileBody(){};
uint64_t remainSize() override ;
Buffer::Ptr readData(uint32_t size) override;
private:
void init(const std::shared_ptr<FILE> &fp,uint64_t offset,uint64_t max_size);
private:
std::shared_ptr<FILE> _fp;
uint64_t _max_size;
uint64_t _offset = 0;
std::shared_ptr<char> _map_addr;
ResourcePool<BufferRaw> _pool;
};
class HttpArgs;
/**
* http MultiForm http content
*/
class HttpMultiFormBody : public HttpBody {
public:
typedef std::shared_ptr<HttpMultiFormBody> Ptr;
/**
*
* @param args http提交参数列表
* @param filePath
* @param boundary boundary字符串
*/
HttpMultiFormBody(const HttpArgs &args,const string &filePath,const string &boundary = "0xKhTmLbOuNdArY");
virtual ~HttpMultiFormBody(){}
uint64_t remainSize() override ;
Buffer::Ptr readData(uint32_t size) override;
public:
static string multiFormBodyPrefix(const HttpArgs &args,const string &boundary,const string &fileName);
static string multiFormBodySuffix(const string &boundary);
static uint64_t fileSize(FILE *fp);
static string multiFormContentType(const string &boundary);
private:
string _bodyPrefix;
string _bodySuffix;
uint64_t _offset = 0;
uint64_t _totalSize;
HttpFileBody::Ptr _fileBody;
};
}//namespace mediakit
#endif //ZLMEDIAKIT_FILEREADER_H

View File

@ -242,8 +242,9 @@ void HttpClient::onRecvContent(const char *data, uint64_t len) {
void HttpClient::onFlush() { void HttpClient::onFlush() {
_aliveTicker.resetTime(); _aliveTicker.resetTime();
GET_CONFIG(uint32_t,sendBufSize,Http::kSendBufSize);
while (_body && _body->remainSize() && !isSocketBusy()) { while (_body && _body->remainSize() && !isSocketBusy()) {
auto buffer = _body->readData(); auto buffer = _body->readData(sendBufSize);
if (!buffer) { if (!buffer) {
//数据发送结束或读取数据异常 //数据发送结束或读取数据异常
break; break;

View File

@ -39,7 +39,7 @@
#include "HttpCookie.h" #include "HttpCookie.h"
#include "HttpChunkedSplitter.h" #include "HttpChunkedSplitter.h"
#include "strCoding.h" #include "strCoding.h"
#include "HttpBody.h"
using namespace std; using namespace std;
using namespace toolkit; using namespace toolkit;
@ -64,145 +64,6 @@ public:
} }
}; };
class HttpBody{
public:
typedef std::shared_ptr<HttpBody> Ptr;
HttpBody(){}
virtual ~HttpBody(){}
//剩余数据大小
virtual uint64_t remainSize() = 0;
virtual Buffer::Ptr readData() = 0;
};
class HttpStringBody : public HttpBody{
public:
typedef std::shared_ptr<HttpStringBody> Ptr;
HttpStringBody(const string &str){
_str = str;
}
virtual ~HttpStringBody(){}
uint64_t remainSize() override {
return _str.size();
}
Buffer::Ptr readData() override {
auto ret = std::make_shared<BufferString>(_str);
_str.clear();
return ret;
}
private:
mutable string _str;
};
class HttpMultiFormBody : public HttpBody {
public:
typedef std::shared_ptr<HttpMultiFormBody> Ptr;
template<typename MapType>
HttpMultiFormBody(const MapType &args,const string &filePath,const string &boundary,uint32_t sliceSize = 4 * 1024){
_fp = fopen(filePath.data(),"rb");
if(!_fp){
throw std::invalid_argument(StrPrinter << "打开文件失败:" << filePath << " " << get_uv_errmsg());
}
auto fileName = filePath;
auto pos = filePath.rfind('/');
if(pos != string::npos){
fileName = filePath.substr(pos + 1);
}
_bodyPrefix = multiFormBodyPrefix(args,boundary,fileName);
_bodySuffix = multiFormBodySuffix(boundary);
_totalSize = _bodyPrefix.size() + _bodySuffix.size() + fileSize(_fp);
_sliceSize = sliceSize;
}
virtual ~HttpMultiFormBody(){
fclose(_fp);
}
uint64_t remainSize() override {
return _totalSize - _offset;
}
Buffer::Ptr readData() override{
if(_bodyPrefix.size()){
auto ret = std::make_shared<BufferString>(_bodyPrefix);
_offset += _bodyPrefix.size();
_bodyPrefix.clear();
return ret;
}
if(0 == feof(_fp)){
auto ret = std::make_shared<BufferRaw>(_sliceSize);
//读文件
int size;
do{
size = fread(ret->data(),1,_sliceSize,_fp);
}while(-1 == size && UV_EINTR == get_uv_error(false));
if(size == -1){
_offset = _totalSize;
WarnL << "fread failed:" << get_uv_errmsg();
return nullptr;
}
_offset += size;
ret->setSize(size);
return ret;
}
if(_bodySuffix.size()){
auto ret = std::make_shared<BufferString>(_bodySuffix);
_offset = _totalSize;
_bodySuffix.clear();
return ret;
}
return nullptr;
}
public:
template<typename MapType>
static string multiFormBodyPrefix(const MapType &args,const string &boundary,const string &fileName){
string MPboundary = string("--") + boundary;
_StrPrinter body;
for(auto &pr : args){
body << MPboundary << "\r\n";
body << "Content-Disposition: form-data; name=\"" << pr.first << "\"\r\n\r\n";
body << pr.second << "\r\n";
}
body << MPboundary << "\r\n";
body << "Content-Disposition: form-data; name=\"" << "file" << "\";filename=\"" << fileName << "\"\r\n";
body << "Content-Type: application/octet-stream\r\n\r\n" ;
return body;
}
static string multiFormBodySuffix(const string &boundary){
string MPboundary = string("--") + boundary;
string endMPboundary = MPboundary + "--";
_StrPrinter body;
body << "\r\n" << endMPboundary;
return body;
}
static uint64_t fileSize(FILE *fp) {
auto current = ftell(fp);
fseek(fp,0L,SEEK_END); /* 定位到文件末尾 */
auto end = ftell(fp); /* 得到文件大小 */
fseek(fp,current,SEEK_SET);
return end - current;
}
static string multiFormContentType(const string &boundary){
return StrPrinter << "multipart/form-data; boundary=" << boundary;
}
private:
FILE *_fp;
string _bodyPrefix;
string _bodySuffix;
uint64_t _offset = 0;
uint64_t _totalSize;
uint32_t _sliceSize;
};
class HttpClient : public TcpClient , public HttpRequestSplitter class HttpClient : public TcpClient , public HttpRequestSplitter
{ {
public: public:

View File

@ -59,49 +59,138 @@ string dateStr() {
strftime(buf, sizeof buf, "%a, %b %d %Y %H:%M:%S GMT", gmtime(&tt)); strftime(buf, sizeof buf, "%a, %b %d %Y %H:%M:%S GMT", gmtime(&tt));
return buf; return buf;
} }
static const char*
get_mime_type(const char* name) { const char *HttpSession::get_mime_type(const char *name) {
const char* dot; const char *dot;
dot = strrchr(name, '.'); dot = strrchr(name, '.');
static HttpSession::KeyValue mapType; static HttpSession::KeyValue mapType;
static onceToken token([&]() { static onceToken token([&]() {
mapType.emplace(".html","text/html"); mapType.emplace(".html", "text/html");
mapType.emplace(".htm","text/html"); mapType.emplace(".htm", "text/html");
mapType.emplace(".mp4","video/mp4"); mapType.emplace(".mp4", "video/mp4");
mapType.emplace(".m3u8","application/vnd.apple.mpegurl"); mapType.emplace(".mkv", "video/x-matroska");
mapType.emplace(".jpg","image/jpeg"); mapType.emplace(".rmvb", "application/vnd.rn-realmedia");
mapType.emplace(".jpeg","image/jpeg"); mapType.emplace(".rm", "application/vnd.rn-realmedia");
mapType.emplace(".gif","image/gif"); mapType.emplace(".m3u8", "application/vnd.apple.mpegurl");
mapType.emplace(".png","image/png"); mapType.emplace(".jpg", "image/jpeg");
mapType.emplace(".ico","image/x-icon"); mapType.emplace(".jpeg", "image/jpeg");
mapType.emplace(".css","text/css"); mapType.emplace(".gif", "image/gif");
mapType.emplace(".js","application/javascript"); mapType.emplace(".png", "image/png");
mapType.emplace(".au","audio/basic"); mapType.emplace(".ico", "image/x-icon");
mapType.emplace(".wav","audio/wav"); mapType.emplace(".css", "text/css");
mapType.emplace(".avi","video/x-msvideo"); mapType.emplace(".js", "application/javascript");
mapType.emplace(".mov","video/quicktime"); mapType.emplace(".au", "audio/basic");
mapType.emplace(".qt","video/quicktime"); mapType.emplace(".wav", "audio/wav");
mapType.emplace(".mpeg","video/mpeg"); mapType.emplace(".avi", "video/x-msvideo");
mapType.emplace(".mpe","video/mpeg"); mapType.emplace(".mov", "video/quicktime");
mapType.emplace(".vrml","model/vrml"); mapType.emplace(".qt", "video/quicktime");
mapType.emplace(".wrl","model/vrml"); mapType.emplace(".mpeg", "video/mpeg");
mapType.emplace(".midi","audio/midi"); mapType.emplace(".mpe", "video/mpeg");
mapType.emplace(".mid","audio/midi"); mapType.emplace(".vrml", "model/vrml");
mapType.emplace(".mp3","audio/mpeg"); mapType.emplace(".wrl", "model/vrml");
mapType.emplace(".ogg","application/ogg"); mapType.emplace(".midi", "audio/midi");
mapType.emplace(".pac","application/x-ns-proxy-autoconfig"); mapType.emplace(".mid", "audio/midi");
mapType.emplace(".flv","video/x-flv"); mapType.emplace(".mp3", "audio/mpeg");
}, nullptr); mapType.emplace(".ogg", "application/ogg");
if(!dot){ mapType.emplace(".pac", "application/x-ns-proxy-autoconfig");
return "text/plain"; mapType.emplace(".flv", "video/x-flv");
} }, nullptr);
auto it = mapType.find(dot); if (!dot) {
if (it == mapType.end()) { return "text/plain";
return "text/plain"; }
} auto it = mapType.find(dot);
return it->second.data(); if (it == mapType.end()) {
return "text/plain";
}
return it->second.data();
} }
////////////////////////////////////////////////////////////////////////////////////////////////////
void HttpResponseInvokerImp::operator()(const string &codeOut, const StrCaseMap &headerOut, const HttpBody::Ptr &body) const{
if(_lambad){
_lambad(codeOut,headerOut,body);
}
}
void HttpResponseInvokerImp::operator()(const string &codeOut, const StrCaseMap &headerOut, const string &body) const{
this->operator()(codeOut,headerOut,std::make_shared<HttpStringBody>(body));
}
HttpResponseInvokerImp::HttpResponseInvokerImp(const HttpResponseInvokerImp::HttpResponseInvokerLambda0 &lambda){
_lambad = lambda;
}
HttpResponseInvokerImp::HttpResponseInvokerImp(const HttpResponseInvokerImp::HttpResponseInvokerLambda1 &lambda){
if(!lambda){
_lambad = nullptr;
return;
}
_lambad = [lambda](const string &codeOut, const StrCaseMap &headerOut, const HttpBody::Ptr &body){
string str;
if(body && body->remainSize()){
str = body->readData(body->remainSize())->toString();
}
lambda(codeOut,headerOut,str);
};
}
void HttpResponseInvokerImp::responseFile(const StrCaseMap &requestHeader,
const StrCaseMap &responseHeader,
const string &filePath) const {
StrCaseMap &httpHeader = const_cast<StrCaseMap&>(responseHeader);
do {
std::shared_ptr<FILE> fp(fopen(filePath.data(), "rb"), [](FILE *fp) {
if (fp) {
fclose(fp);
}
});
if (!fp) {
//打开文件失败
break;
}
auto &strRange = const_cast<StrCaseMap &>(requestHeader)["Range"];
int64_t iRangeStart = 0;
int64_t iRangeEnd = 0 ;
int64_t fileSize = HttpMultiFormBody::fileSize(fp.get());
const char *pcHttpResult = NULL;
if (strRange.size() == 0) {
//全部下载
pcHttpResult = "200 OK";
iRangeEnd = fileSize - 1;
} else {
//分节下载
pcHttpResult = "206 Partial Content";
iRangeStart = atoll(FindField(strRange.data(), "bytes=", "-").data());
iRangeEnd = atoll(FindField(strRange.data(), "-", "\r\n").data());
if (iRangeEnd == 0) {
iRangeEnd = fileSize - 1;
}
//分节下载返回Content-Range头
httpHeader.emplace("Content-Range", StrPrinter << "bytes " << iRangeStart << "-" << iRangeEnd << "/" << fileSize << endl);
}
//回复文件
HttpBody::Ptr fileBody = std::make_shared<HttpFileBody>(fp, iRangeStart, iRangeEnd - iRangeStart + 1);
(*this)(pcHttpResult, httpHeader, fileBody);
return;
}while(false);
GET_CONFIG(string,notFound,Http::kNotFound);
GET_CONFIG(string,charSet,Http::kCharSet);
auto strContentType = StrPrinter << "text/html; charset=" << charSet << endl;
httpHeader["Content-Type"] = strContentType;
(*this)("404 Not Found", httpHeader, notFound);
}
HttpResponseInvokerImp::operator bool(){
return _lambad.operator bool();
}
////////////////////////////////////////////////////////////////////////////////////////////////////
HttpSession::HttpSession(const Socket::Ptr &pSock) : TcpSession(pSock) { HttpSession::HttpSession(const Socket::Ptr &pSock) : TcpSession(pSock) {
TraceP(this); TraceP(this);
@ -128,7 +217,7 @@ int64_t HttpSession::onRecvHeader(const char *header,uint64_t len) {
string cmd = _parser.Method(); string cmd = _parser.Method();
auto it = g_mapCmdIndex.find(cmd); auto it = g_mapCmdIndex.find(cmd);
if (it == g_mapCmdIndex.end()) { if (it == g_mapCmdIndex.end()) {
sendResponse("403 Forbidden", makeHttpHeader(true), ""); sendResponse("403 Forbidden", true);
shutdown(SockException(Err_shutdown,StrPrinter << "403 Forbidden:" << cmd)); shutdown(SockException(Err_shutdown,StrPrinter << "403 Forbidden:" << cmd));
return 0; return 0;
} }
@ -169,21 +258,32 @@ void HttpSession::onRecv(const Buffer::Ptr &pBuf) {
} }
void HttpSession::onError(const SockException& err) { void HttpSession::onError(const SockException& err) {
if(_is_flv_stream){
//flv播放器
WarnP(this) << "播放器("
<< _mediaInfo._vhost << "/"
<< _mediaInfo._app << "/"
<< _mediaInfo._streamid
<< ")断开:" << err.what();
GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold);
if(_ui64TotalBytes > iFlowThreshold * 1024){
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport,
_mediaInfo,
_ui64TotalBytes,
_ticker.createdTime()/1000,
true,
*this);
}
return;
}
//http客户端
if(_ticker.createdTime() < 10 * 1000){ if(_ticker.createdTime() < 10 * 1000){
TraceP(this) << err.what(); TraceP(this) << err.what();
}else{ }else{
WarnP(this) << err.what(); WarnP(this) << err.what();
} }
GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold);
if(_ui64TotalBytes > iFlowThreshold * 1024){
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport,
_mediaInfo,
_ui64TotalBytes,
_ticker.createdTime()/1000,
true,
*this);
}
} }
void HttpSession::onManager() { void HttpSession::onManager() {
@ -202,7 +302,7 @@ bool HttpSession::checkWebSocket(){
} }
auto Sec_WebSocket_Accept = encodeBase64(SHA1::encode_bin(Sec_WebSocket_Key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11")); auto Sec_WebSocket_Accept = encodeBase64(SHA1::encode_bin(Sec_WebSocket_Key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"));
KeyValue headerOut = makeHttpHeader(); KeyValue headerOut;
headerOut["Upgrade"] = "websocket"; headerOut["Upgrade"] = "websocket";
headerOut["Connection"] = "Upgrade"; headerOut["Connection"] = "Upgrade";
headerOut["Sec-WebSocket-Accept"] = Sec_WebSocket_Accept; headerOut["Sec-WebSocket-Accept"] = Sec_WebSocket_Accept;
@ -212,7 +312,7 @@ bool HttpSession::checkWebSocket(){
auto res_cb = [this,headerOut](){ auto res_cb = [this,headerOut](){
_flv_over_websocket = true; _flv_over_websocket = true;
sendResponse("101 Switching Protocols",headerOut,""); sendResponse("101 Switching Protocols",false,nullptr,headerOut,nullptr,false);
}; };
//判断是否为websocket-flv //判断是否为websocket-flv
@ -223,11 +323,11 @@ bool HttpSession::checkWebSocket(){
//如果checkLiveFlvStream返回false,则代表不是websocket-flv而是普通的websocket连接 //如果checkLiveFlvStream返回false,则代表不是websocket-flv而是普通的websocket连接
if(!onWebSocketConnect(_parser)){ if(!onWebSocketConnect(_parser)){
sendResponse("501 Not Implemented",headerOut,""); sendResponse("501 Not Implemented",true, nullptr, headerOut);
shutdown(SockException(Err_shutdown,"WebSocket server not implemented")); shutdown(SockException(Err_shutdown,"WebSocket server not implemented"));
return true; return true;
} }
sendResponse("101 Switching Protocols",headerOut,""); sendResponse("101 Switching Protocols",false, nullptr,headerOut);
return true; return true;
} }
//http-flv 链接格式:http://vhost-url:port/app/streamid.flv?key1=value1&key2=value2 //http-flv 链接格式:http://vhost-url:port/app/streamid.flv?key1=value1&key2=value2
@ -274,14 +374,14 @@ bool HttpSession::checkLiveFlvStream(const function<void()> &cb){
auto onRes = [this,rtmp_src,cb](const string &err){ auto onRes = [this,rtmp_src,cb](const string &err){
bool authSuccess = err.empty(); bool authSuccess = err.empty();
if(!authSuccess){ if(!authSuccess){
sendResponse("401 Unauthorized", makeHttpHeader(true,err.size()),err); sendResponse("401 Unauthorized", true, nullptr, KeyValue(), std::make_shared<HttpStringBody>(err));
shutdown(SockException(Err_shutdown,StrPrinter << "401 Unauthorized:" << err)); shutdown(SockException(Err_shutdown,StrPrinter << "401 Unauthorized:" << err));
return ; return ;
} }
if(!cb) { if(!cb) {
//找到rtmp源发送http头负载后续发送 //找到rtmp源发送http头负载后续发送
sendResponse("200 OK", makeHttpHeader(false, 0, get_mime_type(".flv")), ""); sendResponse("200 OK", false, "video/x-flv",KeyValue(),nullptr,false);
}else{ }else{
cb(); cb();
} }
@ -291,6 +391,7 @@ bool HttpSession::checkLiveFlvStream(const function<void()> &cb){
try{ try{
start(getPoller(),rtmp_src); start(getPoller(),rtmp_src);
_is_flv_stream = true;
}catch (std::exception &ex){ }catch (std::exception &ex){
//该rtmp源不存在 //该rtmp源不存在
shutdown(SockException(Err_shutdown,"rtmp mediasource released")); shutdown(SockException(Err_shutdown,"rtmp mediasource released"));
@ -375,10 +476,7 @@ static bool checkHls(BroadcastHttpAccessArgs){
return NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed,args_copy,mediaAuthInvoker,sender); return NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed,args_copy,mediaAuthInvoker,sender);
} }
void HttpSession::canAccessPath(const string &path_in,bool is_dir,const function<void(const string &errMsg,const HttpServerCookie::Ptr &cookie)> &callback_in){ void HttpSession::canAccessPath(const string &path,bool is_dir,const function<void(const string &errMsg,const HttpServerCookie::Ptr &cookie)> &callback_in){
auto path = path_in;
replace(const_cast<string &>(path),"//","/");
auto callback = [callback_in,this](const string &errMsg,const HttpServerCookie::Ptr &cookie){ auto callback = [callback_in,this](const string &errMsg,const HttpServerCookie::Ptr &cookie){
try { try {
callback_in(errMsg,cookie); callback_in(errMsg,cookie);
@ -488,16 +586,15 @@ void HttpSession::Handle_Req_GET(int64_t &content_len) {
return; return;
} }
//先看看该http事件是否被拦截
if(emitHttpEvent(false)){ if(emitHttpEvent(false)){
//拦截http api事件
return; return;
} }
//再看看是否为http-flv直播请求 if(checkLiveFlvStream()){
if(checkLiveFlvStream()){ //拦截http-flv播放器
//若是return return;
return; }
}
//事件未被拦截则认为是http下载请求 //事件未被拦截则认为是http下载请求
auto fullUrl = string(HTTP_SCHEMA) + "://" + _parser["Host"] + _parser.FullUrl(); auto fullUrl = string(HTTP_SCHEMA) + "://" + _parser["Host"] + _parser.FullUrl();
@ -507,7 +604,7 @@ void HttpSession::Handle_Req_GET(int64_t &content_len) {
GET_CONFIG(uint32_t,reqCnt,Http::kMaxReqCount); GET_CONFIG(uint32_t,reqCnt,Http::kMaxReqCount);
GET_CONFIG(bool,enableVhost,General::kEnableVhost); GET_CONFIG(bool,enableVhost,General::kEnableVhost);
GET_CONFIG(string,rootPath,Http::kRootPath); GET_CONFIG(string,rootPath,Http::kRootPath);
string strFile = enableVhost ? rootPath + "/" + _mediaInfo._vhost + _parser.Url() :rootPath + _parser.Url(); auto strFile = File::absolutePath(enableVhost ? _mediaInfo._vhost + _parser.Url() : _parser.Url(),rootPath);
bool bClose = (strcasecmp(_parser["Connection"].data(),"close") == 0) || ( ++_iReqCnt > reqCnt); bool bClose = (strcasecmp(_parser["Connection"].data(),"close") == 0) || ( ++_iReqCnt > reqCnt);
do{ do{
@ -533,139 +630,38 @@ void HttpSession::Handle_Req_GET(int64_t &content_len) {
if(!errMsg.empty()){ if(!errMsg.empty()){
const_cast<string &>(strMeun) = errMsg; const_cast<string &>(strMeun) = errMsg;
} }
auto headerOut = makeHttpHeader(bClose,strMeun.size()); KeyValue headerOut;
if(cookie){ if(cookie){
headerOut["Set-Cookie"] = cookie->getCookie((*cookie)[kCookiePathKey].get<string>()); headerOut["Set-Cookie"] = cookie->getCookie((*cookie)[kCookiePathKey].get<string>());
} }
sendResponse(errMsg.empty() ? "200 OK" : "401 Unauthorized" , headerOut, strMeun); sendResponse(errMsg.empty() ? "200 OK" : "401 Unauthorized" ,bClose, "text/html", headerOut, std::make_shared<HttpStringBody>(strMeun));
throw SockException(bClose ? Err_shutdown : Err_success,"close connection after access folder"); throw SockException(bClose ? Err_shutdown : Err_success,"close connection after access folder");
}); });
return; return;
} }
}while(0); }while(0);
//访问的是文件
struct stat tFileStat;
if (0 != stat(strFile.data(), &tFileStat)) {
//文件不存在
sendNotFound(bClose);
throw SockException(bClose ? Err_shutdown : Err_success,"close connection after send 404 not found on file");
}
//文件智能指针,防止退出时未关闭
std::shared_ptr<FILE> pFilePtr(fopen(strFile.data(), "rb"), [](FILE *pFile) {
if(pFile){
fclose(pFile);
}
});
if (!pFilePtr) {
//打开文件失败
sendNotFound(bClose);
throw SockException(bClose ? Err_shutdown : Err_success,"close connection after send 404 not found on open file failed");
}
auto parser = _parser; auto parser = _parser;
//判断是否有权限访问该文件 //判断是否有权限访问该文件
canAccessPath(_parser.Url(),false,[this,parser,tFileStat,pFilePtr,bClose,strFile](const string &errMsg,const HttpServerCookie::Ptr &cookie){ canAccessPath(_parser.Url(),false,[this,parser,bClose,strFile](const string &errMsg,const HttpServerCookie::Ptr &cookie){
if(!errMsg.empty()){ if(!errMsg.empty()){
auto headerOut = makeHttpHeader(bClose,errMsg.size()); KeyValue headerOut;
if(cookie){ if(cookie){
headerOut["Set-Cookie"] = cookie->getCookie((*cookie)[kCookiePathKey].get<string>()); headerOut["Set-Cookie"] = cookie->getCookie((*cookie)[kCookiePathKey].get<string>());
} }
sendResponse("401 Unauthorized" , headerOut, errMsg); sendResponse("401 Unauthorized" ,bClose, nullptr, headerOut, std::make_shared<HttpStringBody>(errMsg));
throw SockException(bClose ? Err_shutdown : Err_success,"close connection after access file failed"); throw SockException(bClose ? Err_shutdown : Err_success,"close connection after access file failed");
} }
//判断是不是分节下载 KeyValue httpHeader;
auto &strRange = parser["Range"];
int64_t iRangeStart = 0, iRangeEnd = 0;
iRangeStart = atoll(FindField(strRange.data(), "bytes=", "-").data());
iRangeEnd = atoll(FindField(strRange.data(), "-", "\r\n").data());
if (iRangeEnd == 0) {
iRangeEnd = tFileStat.st_size - 1;
}
const char *pcHttpResult = NULL;
if (strRange.size() == 0) {
//全部下载
pcHttpResult = "200 OK";
} else {
//分节下载
pcHttpResult = "206 Partial Content";
fseek(pFilePtr.get(), iRangeStart, SEEK_SET);
}
auto httpHeader = makeHttpHeader(bClose, iRangeEnd - iRangeStart + 1, get_mime_type(strFile.data()));
if (strRange.size() != 0) {
//分节下载返回Content-Range头
httpHeader.emplace("Content-Range",StrPrinter<<"bytes " << iRangeStart << "-" << iRangeEnd << "/" << tFileStat.st_size<< endl);
}
if(cookie){ if(cookie){
httpHeader["Set-Cookie"] = cookie->getCookie((*cookie)[kCookiePathKey].get<string>()); httpHeader["Set-Cookie"] = cookie->getCookie((*cookie)[kCookiePathKey].get<string>());
} }
//先回复HTTP头部分
sendResponse(pcHttpResult,httpHeader,"");
if (iRangeEnd - iRangeStart < 0) {
//文件是空的!
throw SockException(bClose ? Err_shutdown : Err_success,"close connection after access file");
}
//回复Content部分
std::shared_ptr<int64_t> piLeft(new int64_t(iRangeEnd - iRangeStart + 1));
GET_CONFIG(uint32_t,sendBufSize,Http::kSendBufSize); HttpResponseInvoker invoker = [this,bClose,&strFile](const string &codeOut, const KeyValue &headerOut, const HttpBody::Ptr &body){
sendResponse(codeOut.data(), bClose, get_mime_type(strFile.data()), headerOut, body);
weak_ptr<HttpSession> weakSelf = dynamic_pointer_cast<HttpSession>(shared_from_this());
auto onFlush = [pFilePtr,bClose,weakSelf,piLeft]() {
TimeTicker();
auto strongSelf = weakSelf.lock();
while(*piLeft && strongSelf){
//更新超时定时器
strongSelf->_ticker.resetTime();
//从循环池获取一个内存片
auto sendBuf = strongSelf->obtainBuffer();
sendBuf->setCapacity(sendBufSize);
//本次需要读取文件字节数
int64_t iReq = MIN(sendBufSize,*piLeft);
//读文件
int iRead;
do{
iRead = fread(sendBuf->data(), 1, iReq, pFilePtr.get());
}while(-1 == iRead && UV_EINTR == get_uv_error(false));
//文件剩余字节数
*piLeft -= iRead;
if (iRead < iReq || !*piLeft) {
//文件读完
if(iRead>0) {
sendBuf->setSize(iRead);
strongSelf->send(sendBuf);
}
if(bClose) {
strongSelf->shutdown(SockException(Err_shutdown,"read file eof"));
}
return false;
}
//文件还未读完
sendBuf->setSize(iRead);
int iSent = strongSelf->send(sendBuf);
if(iSent == -1) {
//套机制销毁
return false;
}
if(strongSelf->isSocketBusy()){
//套接字忙,那么停止继续写
return true;
}
//继续写套接字
}
return false;
}; };
invoker.responseFile(parser.getValues(),httpHeader,strFile);
//文件下载提升发送性能
setSocketFlags();
onFlush();
_sock->setOnFlush(onFlush);
}); });
} }
@ -768,43 +764,137 @@ bool makeMeun(const string &httpPath,const string &strFullPath, string &strRet)
return true; return true;
} }
void HttpSession::sendResponse(const char* pcStatus, const KeyValue& header, const string& strContent) {
_StrPrinter printer;
printer << "HTTP/1.1 " << pcStatus << "\r\n";
for (auto &pr : header) {
printer << pr.first << ": " << pr.second << "\r\n";
}
printer << "\r\n" << strContent;
auto strSend = printer << endl;
send(strSend);
_ticker.resetTime();
}
HttpSession::KeyValue HttpSession::makeHttpHeader(bool bClose, int64_t iContentSize,const char* pcContentType) { void HttpSession::sendResponse(const char *pcStatus,
KeyValue headerOut; bool bClose,
const char *pcContentType,
const HttpSession::KeyValue &header,
const HttpBody::Ptr &body,
bool set_content_len ){
GET_CONFIG(string,charSet,Http::kCharSet); GET_CONFIG(string,charSet,Http::kCharSet);
GET_CONFIG(uint32_t,keepAliveSec,Http::kKeepAliveSecond); GET_CONFIG(uint32_t,keepAliveSec,Http::kKeepAliveSecond);
GET_CONFIG(uint32_t,reqCnt,Http::kMaxReqCount); GET_CONFIG(uint32_t,reqCnt,Http::kMaxReqCount);
headerOut.emplace("Date", dateStr()); //body默认为空
headerOut.emplace("Server", SERVER_NAME); int64_t size = 0;
headerOut.emplace("Connection", bClose ? "close" : "keep-alive"); if (body && body->remainSize()) {
if(!bClose){ //有body获取body大小
headerOut.emplace("Keep-Alive",StrPrinter << "timeout=" << keepAliveSec << ", max=" << reqCnt << endl); size = body->remainSize();
} if (size >= INT64_MAX) {
if(pcContentType){ //不固定长度的body那么不设置content-length字段
auto strContentType = StrPrinter << pcContentType << "; charset=" << charSet << endl; size = -1;
headerOut.emplace("Content-Type",strContentType); }
} }
if(iContentSize > 0){
headerOut.emplace("Content-Length", StrPrinter<<iContentSize<<endl); if(!set_content_len || size == -1){
} //如果是不定长度body或者不设置conten-length,
//那么一定是Keep-Alive类型
bClose = false;
}
HttpSession::KeyValue &headerOut = const_cast<HttpSession::KeyValue &>(header);
headerOut.emplace("Date", dateStr());
headerOut.emplace("Server", SERVER_NAME);
headerOut.emplace("Connection", bClose ? "close" : "keep-alive");
if(!bClose){
headerOut.emplace("Keep-Alive",StrPrinter << "timeout=" << keepAliveSec << ", max=" << reqCnt << endl);
}
if(!_origin.empty()){ if(!_origin.empty()){
//设置跨域
headerOut.emplace("Access-Control-Allow-Origin",_origin); headerOut.emplace("Access-Control-Allow-Origin",_origin);
headerOut.emplace("Access-Control-Allow-Credentials", "true"); headerOut.emplace("Access-Control-Allow-Credentials", "true");
} }
return headerOut;
if(set_content_len && size >= 0){
//文件长度为定值或者,且不是http-flv强制设置Content-Length
headerOut["Content-Length"] = StrPrinter << size << endl;
}
if(size && !pcContentType){
//有body时设置缺省类型
pcContentType = "text/plain";
}
if(size && pcContentType){
//有body时设置文件类型
auto strContentType = StrPrinter << pcContentType << "; charset=" << charSet << endl;
headerOut.emplace("Content-Type",strContentType);
}
//发送http头
_StrPrinter printer;
printer << "HTTP/1.1 " << pcStatus << "\r\n";
for (auto &pr : header) {
printer << pr.first << ": " << pr.second << "\r\n";
}
printer << "\r\n";
send(printer << endl);
_ticker.resetTime();
if(!size){
//没有body
if(bClose){
shutdown(SockException(Err_shutdown,"close connection after send http header completed"));
}
return;
}
//发送http body
GET_CONFIG(uint32_t,sendBufSize,Http::kSendBufSize);
weak_ptr<HttpSession> weakSelf = dynamic_pointer_cast<HttpSession>(shared_from_this());
auto onFlush = [body,bClose,weakSelf]() {
auto strongSelf = weakSelf.lock();
if(!strongSelf){
//本对象已经销毁
return false;
}
while(true){
//更新超时计时器
strongSelf->_ticker.resetTime();
//读取文件
auto sendBuf = body->readData(sendBufSize);
if (!sendBuf) {
//文件读完
if(strongSelf->isSocketBusy() && bClose){
//套接字忙,我们等待触发下一次onFlush事件
//待所有数据flush到socket fd再移除onFlush事件监听
//标记文件读写完毕
return true;
}
//文件全部flush到socket fd可以直接关闭socket了
break;
}
//文件还未读完
if(strongSelf->send(sendBuf) == -1) {
//socket已经销毁不再监听onFlush事件
return false;
}
if(strongSelf->isSocketBusy()){
//socket忙那么停止继续写,等待下一次onFlush事件然后再读文件写socket
return true;
}
//socket还可写继续写socket
}
if(bClose) {
//最后一次flush事件文件也发送完毕了可以关闭socket了
strongSelf->shutdown(SockException(Err_shutdown,"close connection after send http body completed"));
}
//不再监听onFlush事件
return false;
};
if(body->remainSize() > sendBufSize){
//文件下载提升发送性能
setSocketFlags();
}
onFlush();
_sock->setOnFlush(onFlush);
} }
string HttpSession::urlDecode(const string &str){ string HttpSession::urlDecode(const string &str){
@ -833,20 +923,25 @@ bool HttpSession::emitHttpEvent(bool doInvoke){
bool bClose = (strcasecmp(_parser["Connection"].data(),"close") == 0) || ( ++_iReqCnt > reqCnt); bool bClose = (strcasecmp(_parser["Connection"].data(),"close") == 0) || ( ++_iReqCnt > reqCnt);
/////////////////////异步回复Invoker/////////////////////////////// /////////////////////异步回复Invoker///////////////////////////////
weak_ptr<HttpSession> weakSelf = dynamic_pointer_cast<HttpSession>(shared_from_this()); weak_ptr<HttpSession> weakSelf = dynamic_pointer_cast<HttpSession>(shared_from_this());
HttpResponseInvoker invoker = [weakSelf,bClose](const string &codeOut, const KeyValue &headerOut, const string &contentOut){ HttpResponseInvoker invoker = [weakSelf,bClose](const string &codeOut, const KeyValue &headerOut, const HttpBody::Ptr &body){
auto strongSelf = weakSelf.lock(); auto strongSelf = weakSelf.lock();
if(!strongSelf) { if(!strongSelf) {
return; return;
} }
strongSelf->async([weakSelf,bClose,codeOut,headerOut,contentOut]() { strongSelf->async([weakSelf,bClose,codeOut,headerOut,body]() {
auto strongSelf = weakSelf.lock(); auto strongSelf = weakSelf.lock();
if(!strongSelf) { if(!strongSelf) {
//本对象已经销毁
return; return;
} }
strongSelf->responseDelay(bClose,codeOut,headerOut,contentOut);
if(bClose){ if(codeOut.empty()){
strongSelf->shutdown(SockException(Err_shutdown,"Connection: close")); //回调提供的参数异常
} strongSelf->sendNotFound(bClose);
return;
}
strongSelf->sendResponse(codeOut.data(), bClose, nullptr, headerOut, body);
}); });
}; };
///////////////////广播HTTP事件/////////////////////////// ///////////////////广播HTTP事件///////////////////////////
@ -854,7 +949,7 @@ bool HttpSession::emitHttpEvent(bool doInvoke){
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastHttpRequest,_parser,invoker,consumed,*this); NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastHttpRequest,_parser,invoker,consumed,*this);
if(!consumed && doInvoke){ if(!consumed && doInvoke){
//该事件无人消费所以返回404 //该事件无人消费所以返回404
invoker("404 Not Found",KeyValue(),""); invoker("404 Not Found",KeyValue(), HttpBody::Ptr());
if(bClose){ if(bClose){
//close类型回复完毕关闭连接 //close类型回复完毕关闭连接
shutdown(SockException(Err_shutdown,"404 Not Found")); shutdown(SockException(Err_shutdown,"404 Not Found"));
@ -935,25 +1030,10 @@ void HttpSession::Handle_Req_POST(int64_t &content_len) {
} }
//有后续content数据要处理,暂时不关闭连接 //有后续content数据要处理,暂时不关闭连接
} }
void HttpSession::responseDelay(bool bClose,
const string &codeOut,
const KeyValue &headerOut,
const string &contentOut){
if(codeOut.empty()){
sendNotFound(bClose);
return;
}
auto headerOther = makeHttpHeader(bClose,contentOut.size(),"text/plain");
for (auto &pr : headerOther){
//添加默认http头默认http头不能覆盖用户自定义的头
const_cast<KeyValue &>(headerOut).emplace(pr.first,pr.second);
}
sendResponse(codeOut.data(), headerOut, contentOut);
}
void HttpSession::sendNotFound(bool bClose) { void HttpSession::sendNotFound(bool bClose) {
GET_CONFIG(string,notFound,Http::kNotFound); GET_CONFIG(string,notFound,Http::kNotFound);
sendResponse("404 Not Found", makeHttpHeader(bClose, notFound.size()), notFound); sendResponse("404 Not Found", bClose,"text/html",KeyValue(),std::make_shared<HttpStringBody>(notFound));
} }
void HttpSession::setSocketFlags(){ void HttpSession::setSocketFlags(){

View File

@ -36,21 +36,44 @@
#include "HttpRequestSplitter.h" #include "HttpRequestSplitter.h"
#include "WebSocketSplitter.h" #include "WebSocketSplitter.h"
#include "HttpCookieManager.h" #include "HttpCookieManager.h"
#include "HttpBody.h"
#include "Util/function_traits.h"
using namespace std; using namespace std;
using namespace toolkit; using namespace toolkit;
namespace mediakit { namespace mediakit {
/**
*
*/
class HttpResponseInvokerImp{
public:
typedef std::function<void(const string &codeOut, const StrCaseMap &headerOut, const HttpBody::Ptr &body)> HttpResponseInvokerLambda0;
typedef std::function<void(const string &codeOut, const StrCaseMap &headerOut, const string &body)> HttpResponseInvokerLambda1;
HttpResponseInvokerImp(){}
~HttpResponseInvokerImp(){}
template<typename C>
HttpResponseInvokerImp(const C &c):HttpResponseInvokerImp(typename function_traits<C>::stl_function_type(c)) {}
HttpResponseInvokerImp(const HttpResponseInvokerLambda0 &lambda);
HttpResponseInvokerImp(const HttpResponseInvokerLambda1 &lambda);
void operator()(const string &codeOut, const StrCaseMap &headerOut, const HttpBody::Ptr &body) const;
void operator()(const string &codeOut, const StrCaseMap &headerOut, const string &body) const;
void responseFile(const StrCaseMap &requestHeader,const StrCaseMap &responseHeader,const string &filePath) const;
operator bool();
private:
HttpResponseInvokerLambda0 _lambad;
};
class HttpSession: public TcpSession, class HttpSession: public TcpSession,
public FlvMuxer, public FlvMuxer,
public HttpRequestSplitter, public HttpRequestSplitter,
public WebSocketSplitter { public WebSocketSplitter {
public: public:
typedef StrCaseMap KeyValue; typedef StrCaseMap KeyValue;
typedef std::function<void(const string &codeOut, typedef HttpResponseInvokerImp HttpResponseInvoker;
const KeyValue &headerOut,
const string &contentOut)> HttpResponseInvoker;
/** /**
* @param errMsg * @param errMsg
@ -67,6 +90,7 @@ public:
virtual void onManager() override; virtual void onManager() override;
static string urlDecode(const string &str); static string urlDecode(const string &str);
static const char* get_mime_type(const char* name);
protected: protected:
//FlvMuxer override //FlvMuxer override
void onWrite(const Buffer::Ptr &data) override ; void onWrite(const Buffer::Ptr &data) override ;
@ -118,13 +142,9 @@ private:
bool emitHttpEvent(bool doInvoke); bool emitHttpEvent(bool doInvoke);
void urlDecode(Parser &parser); void urlDecode(Parser &parser);
void sendNotFound(bool bClose); void sendNotFound(bool bClose);
void sendResponse(const char *pcStatus,const KeyValue &header,const string &strContent); void sendResponse(const char *pcStatus, bool bClose, const char *pcContentType = nullptr,
KeyValue makeHttpHeader(bool bClose=false,int64_t iContentSize=-1,const char *pcContentType="text/html"); const HttpSession::KeyValue &header = HttpSession::KeyValue(),
void responseDelay(bool bClose, const HttpBody::Ptr &body = nullptr,bool set_content_len = true);
const string &codeOut,
const KeyValue &headerOut,
const string &contentOut);
/** /**
* http客户端是否有权限访问文件的逻辑步骤 * http客户端是否有权限访问文件的逻辑步骤
* *
@ -160,6 +180,7 @@ private:
//处理content数据的callback //处理content数据的callback
function<bool (const char *data,uint64_t len) > _contentCallBack; function<bool (const char *data,uint64_t len) > _contentCallBack;
bool _flv_over_websocket = false; bool _flv_over_websocket = false;
bool _is_flv_stream = false;
}; };

View File

@ -42,6 +42,7 @@ HlsMaker::~HlsMaker() {
void HlsMaker::makeIndexFile(bool eof) { void HlsMaker::makeIndexFile(bool eof) {
char file_content[1024]; char file_content[1024];
int maxSegmentDuration = 0; int maxSegmentDuration = 0;
for (auto &tp : _seg_dur_list) { for (auto &tp : _seg_dur_list) {
int dur = std::get<0>(tp); int dur = std::get<0>(tp);
if (dur > maxSegmentDuration) { if (dur > maxSegmentDuration) {
@ -57,7 +58,7 @@ void HlsMaker::makeIndexFile(bool eof) {
"#EXT-X-TARGETDURATION:%u\n" "#EXT-X-TARGETDURATION:%u\n"
"#EXT-X-MEDIA-SEQUENCE:%llu\n", "#EXT-X-MEDIA-SEQUENCE:%llu\n",
(maxSegmentDuration + 999) / 1000, (maxSegmentDuration + 999) / 1000,
_file_index); _seg_number ? _file_index : 0);
m3u8.assign(file_content); m3u8.assign(file_content);
@ -75,11 +76,18 @@ void HlsMaker::makeIndexFile(bool eof) {
void HlsMaker::inputData(void *data, uint32_t len, uint32_t timestamp) { void HlsMaker::inputData(void *data, uint32_t len, uint32_t timestamp) {
addNewFile(timestamp); //分片数据中断结束
onWriteFile((char *) data, len); if (data && len) {
addNewSegment(timestamp);
onWriteSegment((char *) data, len);
//记录上次写入数据时间
_ticker_last_data.resetTime();
} else {
flushLastSegment(true);
}
} }
void HlsMaker::delOldFile() { void HlsMaker::delOldSegment() {
if(_seg_number == 0){ if(_seg_number == 0){
//如果设置为保留0个切片则认为是保存为点播 //如果设置为保留0个切片则认为是保存为点播
return; return;
@ -91,22 +99,38 @@ void HlsMaker::delOldFile() {
//但是实际保存的切片个数比m3u8所述多两个,这样做的目的是防止播放器在切片删除前能下载完毕 //但是实际保存的切片个数比m3u8所述多两个,这样做的目的是防止播放器在切片删除前能下载完毕
if (_file_index >= _seg_number + 4) { if (_file_index >= _seg_number + 4) {
onDelFile(_file_index - _seg_number - 4); onDelSegment(_file_index - _seg_number - 4);
} }
} }
void HlsMaker::addNewFile(uint32_t) { void HlsMaker::addNewSegment(uint32_t) {
int stampInc = _ticker.elapsedTime(); if(!_last_file_name.empty() && _ticker.elapsedTime() < _seg_duration * 1000){
if (stampInc >= _seg_duration * 1000) { //存在上个切片,并且未到分片时间
_ticker.resetTime(); return;
auto file_name = onOpenFile(_file_index);
if (_file_index++ > 0) {
_seg_dur_list.push_back(std::make_tuple(stampInc, _last_file_name));
delOldFile();
makeIndexFile();
}
_last_file_name = file_name;
} }
//关闭并保存上一个切片
flushLastSegment();
//新增切片
_last_file_name = onOpenSegment(_file_index++);
//重置切片计时器
_ticker.resetTime();
}
void HlsMaker::flushLastSegment(bool eof){
if(_last_file_name.empty()){
//不存在上个切片
return;
}
//文件创建到最后一次数据写入的时间即为切片长度
auto seg_dur = _ticker.elapsedTime() - _ticker_last_data.elapsedTime();
if(seg_dur <= 0){
seg_dur = 100;
}
_seg_dur_list.push_back(std::make_tuple(seg_dur, _last_file_name));
delOldSegment();
makeIndexFile(eof);
_last_file_name.clear();
} }
}//namespace mediakit }//namespace mediakit

View File

@ -60,20 +60,20 @@ protected:
* @param index * @param index
* @return * @return
*/ */
virtual string onOpenFile(int index) = 0; virtual string onOpenSegment(int index) = 0;
/** /**
* ts切片文件回调 * ts切片文件回调
* @param index * @param index
*/ */
virtual void onDelFile(int index) = 0; virtual void onDelSegment(int index) = 0;
/** /**
* ts切片文件回调 * ts切片文件回调
* @param data * @param data
* @param len * @param len
*/ */
virtual void onWriteFile(const char *data, int len) = 0; virtual void onWriteSegment(const char *data, int len) = 0;
/** /**
* m3u8文件回调 * m3u8文件回调
@ -82,19 +82,34 @@ protected:
*/ */
virtual void onWriteHls(const char *data, int len) = 0; virtual void onWriteHls(const char *data, int len) = 0;
/**
* ts切片并且写入m3u8索引
* @param eof
*/
void flushLastSegment(bool eof = false);
private:
/** /**
* m3u8文件 * m3u8文件
* @param eof true代表点播 * @param eof true代表点播
*/ */
void makeIndexFile(bool eof = false); void makeIndexFile(bool eof = false);
void delOldFile();
void addNewFile(uint32_t timestamp); /**
protected: * ts切片
uint32_t _seg_number = 0; */
void delOldSegment();
/**
* ts切片
* @param timestamp
*/
void addNewSegment(uint32_t timestamp);
private: private:
uint32_t _seg_number = 0;
float _seg_duration = 0; float _seg_duration = 0;
uint64_t _file_index = 0; uint64_t _file_index = 0;
Ticker _ticker; Ticker _ticker;
Ticker _ticker_last_data;
string _last_file_name; string _last_file_name;
std::deque<tuple<int,string> > _seg_dur_list; std::deque<tuple<int,string> > _seg_dur_list;
}; };

View File

@ -40,6 +40,7 @@ HlsMakerImp::HlsMakerImp(const string &m3u8_file,
_path_hls = m3u8_file; _path_hls = m3u8_file;
_params = params; _params = params;
_buf_size = bufSize; _buf_size = bufSize;
_is_vod = seg_number == 0;
_file_buf.reset(new char[bufSize],[](char *ptr){ _file_buf.reset(new char[bufSize],[](char *ptr){
delete[] ptr; delete[] ptr;
}); });
@ -47,14 +48,14 @@ HlsMakerImp::HlsMakerImp(const string &m3u8_file,
HlsMakerImp::~HlsMakerImp() { HlsMakerImp::~HlsMakerImp() {
//录制完了 //录制完了
makeIndexFile(true); flushLastSegment(true);
if(_seg_number){ if(!_is_vod){
//hls直播才删除文件 //hls直播才删除文件
File::delete_file(_path_prefix.data()); File::delete_file(_path_prefix.data());
} }
} }
string HlsMakerImp::onOpenFile(int index) { string HlsMakerImp::onOpenSegment(int index) {
auto full_path = fullPath(index); auto full_path = fullPath(index);
_file = makeFile(full_path, true); _file = makeFile(full_path, true);
if(!_file){ if(!_file){
@ -67,12 +68,12 @@ string HlsMakerImp::onOpenFile(int index) {
return StrPrinter << index << ".ts" << "?" << _params; return StrPrinter << index << ".ts" << "?" << _params;
} }
void HlsMakerImp::onDelFile(int index) { void HlsMakerImp::onDelSegment(int index) {
//WarnL << index; //WarnL << index;
File::delete_file(fullPath(index).data()); File::delete_file(fullPath(index).data());
} }
void HlsMakerImp::onWriteFile(const char *data, int len) { void HlsMakerImp::onWriteSegment(const char *data, int len) {
if (_file) { if (_file) {
fwrite(data, len, 1, _file.get()); fwrite(data, len, 1, _file.get());
} }

View File

@ -44,9 +44,9 @@ public:
uint32_t seg_number = 3); uint32_t seg_number = 3);
virtual ~HlsMakerImp(); virtual ~HlsMakerImp();
protected: protected:
string onOpenFile(int index) override ; string onOpenSegment(int index) override ;
void onDelFile(int index) override; void onDelSegment(int index) override;
void onWriteFile(const char *data, int len) override; void onWriteSegment(const char *data, int len) override;
void onWriteHls(const char *data, int len) override; void onWriteHls(const char *data, int len) override;
private: private:
string fullPath(int index); string fullPath(int index);
@ -58,6 +58,8 @@ private:
string _path_hls; string _path_hls;
string _params; string _params;
int _buf_size; int _buf_size;
//是否为点播
bool _is_vod;
}; };
}//namespace mediakit }//namespace mediakit

View File

@ -32,7 +32,7 @@
namespace mediakit { namespace mediakit {
class HlsRecorder : public HlsMakerImp , public TsMuxer { class HlsRecorder : public HlsMakerImp, public TsMuxer {
public: public:
template<typename ...ArgsType> template<typename ...ArgsType>
HlsRecorder(ArgsType &&...args):HlsMakerImp(std::forward<ArgsType>(args)...){} HlsRecorder(ArgsType &&...args):HlsMakerImp(std::forward<ArgsType>(args)...){}

View File

@ -33,11 +33,11 @@
namespace mediakit{ namespace mediakit{
#if defined(_WIN32) || defined(_WIN64) #if defined(_WIN32) || defined(_WIN64)
#define fseek64 _fseeki64 #define fseek64 _fseeki64
#define ftell64 _ftelli64 #define ftell64 _ftelli64
#else #else
#define fseek64 fseek #define fseek64 fseek
#define ftell64 ftell #define ftell64 ftell
#endif #endif
void MP4MuxerBase::init(int flags) { void MP4MuxerBase::init(int flags) {
@ -236,7 +236,9 @@ MP4MuxerFile::MP4MuxerFile(const char *file) {
fclose(fp); fclose(fp);
}); });
init(MOV_FLAG_FASTSTART); GET_CONFIG(bool, mp4FastStart, Record::kFastStart);
init(mp4FastStart ? MOV_FLAG_FASTSTART : 0);
} }
MP4MuxerFile::~MP4MuxerFile() { MP4MuxerFile::~MP4MuxerFile() {
@ -254,15 +256,6 @@ int MP4MuxerFile::onWrite(const void *data, uint64_t bytes) {
return bytes == fwrite(data, 1, bytes, _file.get()) ? 0 : ferror(_file.get()); return bytes == fwrite(data, 1, bytes, _file.get()) ? 0 : ferror(_file.get());
} }
#if defined(_WIN32) || defined(_WIN64)
#define fseek64 _fseeki64
#define ftell64 _ftelli64
#else
#define fseek64 fseek
#define ftell64 ftell
#endif
int MP4MuxerFile::onSeek(uint64_t offset) { int MP4MuxerFile::onSeek(uint64_t offset) {
return fseek64(_file.get(), offset, SEEK_SET); return fseek64(_file.get(), offset, SEEK_SET);
} }

View File

@ -153,6 +153,14 @@ void MP4Recorder::onTrackReady(const Track::Ptr & track){
} }
} }
void MP4Recorder::resetTracks() {
closeFile();
_tracks.clear();
_haveVideo = false;
_createFileTicker.resetTime();
MediaSink::resetTracks();
}
} /* namespace mediakit */ } /* namespace mediakit */

View File

@ -63,6 +63,11 @@ public:
const string &strApp, const string &strApp,
const string &strStreamId); const string &strStreamId);
virtual ~MP4Recorder(); virtual ~MP4Recorder();
/**
* Track
*/
void resetTracks() override;
private: private:
/** /**
* Track输出frameonAllTrackReady触发后才会调用此方法 * Track输出frameonAllTrackReady触发后才会调用此方法

View File

@ -44,10 +44,11 @@ MediaReader::MediaReader(const string &strVhost,const string &strApp, const stri
GET_CONFIG(string,recordPath,Record::kFilePath); GET_CONFIG(string,recordPath,Record::kFilePath);
GET_CONFIG(bool,enableVhost,General::kEnableVhost); GET_CONFIG(bool,enableVhost,General::kEnableVhost);
if(enableVhost){ if(enableVhost){
strFileName = recordPath + "/" + strVhost + "/" + strApp + "/" + strId; strFileName = strVhost + "/" + strApp + "/" + strId;
}else{ }else{
strFileName = recordPath + "/" + strApp + "/" + strId; strFileName = strApp + "/" + strId;
} }
strFileName = File::absolutePath(strFileName,recordPath);
} }
_hMP4File = MP4Read(strFileName.data()); _hMP4File = MP4Read(strFileName.data());

View File

@ -56,13 +56,15 @@ MediaRecorder::MediaRecorder(const string &strVhost_tmp,
#if defined(ENABLE_HLS) #if defined(ENABLE_HLS)
if(enableHls) { if(enableHls) {
string m3u8FilePath; string m3u8FilePath;
string params;
if(enableVhost){ if(enableVhost){
m3u8FilePath = hlsPath + "/" + strVhost + "/" + strApp + "/" + strId + "/hls.m3u8"; m3u8FilePath = strVhost + "/" + strApp + "/" + strId + "/hls.m3u8";
_hlsRecorder.reset(new HlsRecorder(m3u8FilePath,string(VHOST_KEY) + "=" + strVhost ,hlsBufSize, hlsDuration, hlsNum)); params = string(VHOST_KEY) + "=" + strVhost;
}else{ }else{
m3u8FilePath = hlsPath + "/" + strApp + "/" + strId + "/hls.m3u8"; m3u8FilePath = strApp + "/" + strId + "/hls.m3u8";
_hlsRecorder.reset(new HlsRecorder(m3u8FilePath,"",hlsBufSize, hlsDuration, hlsNum));
} }
m3u8FilePath = File::absolutePath(m3u8FilePath,hlsPath);
_hlsRecorder.reset(new HlsRecorder(m3u8FilePath,params,hlsBufSize, hlsDuration, hlsNum));
} }
#endif //defined(ENABLE_HLS) #endif //defined(ENABLE_HLS)
@ -73,10 +75,11 @@ MediaRecorder::MediaRecorder(const string &strVhost_tmp,
if(enableMp4){ if(enableMp4){
string mp4FilePath; string mp4FilePath;
if(enableVhost){ if(enableVhost){
mp4FilePath = recordPath + "/" + strVhost + "/" + recordAppName + "/" + strApp + "/" + strId + "/"; mp4FilePath = strVhost + "/" + recordAppName + "/" + strApp + "/" + strId + "/";
} else { } else {
mp4FilePath = recordPath + "/" + recordAppName + "/" + strApp + "/" + strId + "/"; mp4FilePath = recordAppName + "/" + strApp + "/" + strId + "/";
} }
mp4FilePath = File::absolutePath(mp4FilePath,recordPath);
_mp4Recorder.reset(new MP4Recorder(mp4FilePath,strVhost,strApp,strId)); _mp4Recorder.reset(new MP4Recorder(mp4FilePath,strVhost,strApp,strId));
} }
#endif //defined(ENABLE_MP4RECORD) #endif //defined(ENABLE_MP4RECORD)
@ -113,4 +116,18 @@ void MediaRecorder::addTrack(const Track::Ptr &track) {
#endif //defined(ENABLE_MP4RECORD) #endif //defined(ENABLE_MP4RECORD)
} }
void MediaRecorder::resetTracks() {
#if defined(ENABLE_HLS)
if (_hlsRecorder) {
_hlsRecorder->resetTracks();
}
#endif //defined(ENABLE_HLS)
#if defined(ENABLE_MP4RECORD)
if (_mp4Recorder) {
_mp4Recorder->resetTracks();
}
#endif //defined(ENABLE_MP4RECORD)
}
} /* namespace mediakit */ } /* namespace mediakit */

View File

@ -51,14 +51,19 @@ public:
* frame * frame
* @param frame * @param frame
*/ */
void inputFrame(const Frame::Ptr &frame) override ; void inputFrame(const Frame::Ptr &frame) override;
/** /**
* trackTrack的clone方法 * trackTrack的clone方法
* sps pps这些信息 Delegate相关关系 * sps pps这些信息 Delegate相关关系
* @param track * @param track
*/ */
void addTrack(const Track::Ptr & track) override; void addTrack(const Track::Ptr &track) override;
/**
* track
*/
void resetTracks() override;
private: private:
#if defined(ENABLE_HLS) #if defined(ENABLE_HLS)
std::shared_ptr<HlsRecorder> _hlsRecorder; std::shared_ptr<HlsRecorder> _hlsRecorder;

View File

@ -28,48 +28,48 @@
namespace mediakit { namespace mediakit {
void Stamp::revise(uint32_t dts, uint32_t pts, int64_t &dts_out, int64_t &pts_out) { int64_t DeltaStamp::deltaStamp(int64_t stamp) {
if(!_last_stamp){
//第一次计算时间戳增量,时间戳增量为0
_last_stamp = stamp;
return 0;
}
int64_t ret = stamp - _last_stamp;
if(ret >= 0){
//时间戳增量为正,返回之
_last_stamp = stamp;
return ret;
}
//时间戳增量为负,说明时间戳回环了或回退了
_last_stamp = stamp;
return _playback ? ret : 0;
}
void DeltaStamp::setPlayBack(bool playback) {
_playback = playback;
}
void Stamp::revise(int64_t dts, int64_t pts, int64_t &dts_out, int64_t &pts_out,bool modifyStamp) {
if(!pts){ if(!pts){
//没有播放时间戳,使其赋值为解码时间戳 //没有播放时间戳,使其赋值为解码时间戳
pts = dts; pts = dts;
} }
//pts和dts的差值 //pts和dts的差值
int pts_dts_diff = pts - dts; int pts_dts_diff = pts - dts;
if(_first){
//记录第一次时间戳,后面好计算时间戳增量
_start_dts = dts;
_first = false;
_ticker.resetTime();
}
if (!dts) {
//没有解码时间戳,我们生成解码时间戳
dts = _ticker.elapsedTime();
}
//相对时间戳 //相对时间戳
dts_out = dts - _start_dts; _relativeStamp += deltaStamp(modifyStamp ? _ticker.elapsedTime() : dts);
if(dts_out < _dts_inc && !_playback){ dts_out = _relativeStamp;
//本次相对时间戳竟然小于上次?
if(dts_out < 0 || _dts_inc - dts_out > 0xFFFF){
//时间戳回环,保证下次相对时间戳与本次相对合理增长
_start_dts = dts - _dts_inc;
//本次时间戳强制等于上次时间戳
dts_out = _dts_inc;
}else{
//时间戳变小了?,那么取上次时间戳
dts_out = _dts_inc;
}
}
//保留这次相对时间戳,以便下次对比是否回环或乱序
_dts_inc = dts_out;
//////////////以下是播放时间戳的计算////////////////// //////////////以下是播放时间戳的计算//////////////////
if(pts_dts_diff > 200 || pts_dts_diff < -200){ if(pts_dts_diff > 200 || pts_dts_diff < -200){
//如果差值大于200毫秒则认为由于回环导致时间戳错乱了 //如果差值大于200毫秒则认为由于回环导致时间戳错乱了
pts_dts_diff = 0; pts_dts_diff = 0;
} }
pts_out = dts_out + pts_dts_diff; pts_out = dts_out + pts_dts_diff;
if(pts_out < 0){ if(pts_out < 0){
//时间戳不能小于0 //时间戳不能小于0
@ -77,8 +77,13 @@ void Stamp::revise(uint32_t dts, uint32_t pts, int64_t &dts_out, int64_t &pts_ou
} }
} }
void Stamp::setPlayBack(bool playback) { void Stamp::setRelativeStamp(int64_t relativeStamp) {
_playback = playback; _relativeStamp = relativeStamp;
} }
int64_t Stamp::getRelativeStamp() const {
return _relativeStamp;
}
}//namespace mediakit }//namespace mediakit

View File

@ -33,18 +33,33 @@ using namespace toolkit;
namespace mediakit { namespace mediakit {
//该类解决时间戳回环、回退问题 class DeltaStamp{
//计算相对时间戳或者产生平滑时间戳
class Stamp {
public: public:
Stamp() = default; DeltaStamp() = default;
~Stamp() = default; ~DeltaStamp() = default;
/** /**
* 退 *
* @param stamp
* @return
*/
int64_t deltaStamp(int64_t stamp);
/**
* 退
* @param playback * @param playback
*/ */
void setPlayBack(bool playback = true); void setPlayBack(bool playback = true);
private:
int64_t _last_stamp = 0;
bool _playback = false;
};
//该类解决时间戳回环、回退问题
//计算相对时间戳或者产生平滑时间戳
class Stamp : public DeltaStamp{
public:
Stamp() = default;
~Stamp() = default;
/** /**
* *
@ -52,13 +67,23 @@ public:
* @param pts pts0dts * @param pts pts0dts
* @param dts_out dts * @param dts_out dts
* @param pts_out pts * @param pts_out pts
* @param modifyStamp
*/ */
void revise(uint32_t dts, uint32_t pts, int64_t &dts_out, int64_t &pts_out); void revise(int64_t dts, int64_t pts, int64_t &dts_out, int64_t &pts_out,bool modifyStamp = false);
/**
* seek用
* @param relativeStamp
*/
void setRelativeStamp(int64_t relativeStamp);
/**
*
* @return
*/
int64_t getRelativeStamp() const ;
private: private:
bool _playback = false; int64_t _relativeStamp = 0;
int64_t _start_dts = 0;
int64_t _dts_inc = 0;
bool _first = true;
SmoothTicker _ticker; SmoothTicker _ticker;
}; };

View File

@ -101,6 +101,8 @@ void TsMuxer::inputFrame(const Frame::Ptr &frame) {
} }
void TsMuxer::resetTracks() { void TsMuxer::resetTracks() {
//通知片段中断
onTs(nullptr, 0, 0, 0);
uninit(); uninit();
init(); init();
} }

View File

@ -43,10 +43,10 @@ public:
TsMuxer(); TsMuxer();
virtual ~TsMuxer(); virtual ~TsMuxer();
void addTrack(const Track::Ptr &track) override; void addTrack(const Track::Ptr &track) override;
void inputFrame(const Frame::Ptr &frame) override; void resetTracks() override;
void inputFrame(const Frame::Ptr &frame) override;
protected: protected:
virtual void onTs(const void *packet, int bytes,uint32_t timestamp,int flags) = 0; virtual void onTs(const void *packet, int bytes,uint32_t timestamp,int flags) = 0;
void resetTracks();
private: private:
void init(); void init();
void uninit(); void uninit();

View File

@ -122,7 +122,13 @@ void PlayerProxy::play(const string &strUrlTmp) {
for (auto & track : tracks){ for (auto & track : tracks){
track->delDelegate(strongSelf->_mediaMuxer.get()); track->delDelegate(strongSelf->_mediaMuxer.get());
} }
strongSelf->_mediaMuxer.reset();
GET_CONFIG(bool,resetWhenRePlay,General::kResetWhenRePlay);
if (resetWhenRePlay) {
strongSelf->_mediaMuxer.reset();
} else {
strongSelf->_mediaMuxer->resetTracks();
}
} }
//播放异常中断,延时重试播放 //播放异常中断,延时重试播放
if(*piFailedCnt < strongSelf->_iRetryCount || strongSelf->_iRetryCount < 0) { if(*piFailedCnt < strongSelf->_iRetryCount || strongSelf->_iRetryCount < 0) {
@ -138,7 +144,7 @@ void PlayerProxy::play(const string &strUrlTmp) {
if(directProxy && _bEnableRtsp){ if(directProxy && _bEnableRtsp){
mediaSource = std::make_shared<RtspMediaSource>(_strVhost,_strApp,_strSrc); mediaSource = std::make_shared<RtspMediaSource>(_strVhost,_strApp,_strSrc);
} }
}else if(dynamic_pointer_cast<RtmpPlayer>(_parser)){ } else if(dynamic_pointer_cast<RtmpPlayer>(_parser)){
//rtmp拉流 //rtmp拉流
if(_bEnableRtmp){ if(_bEnableRtmp){
mediaSource = std::make_shared<RtmpMediaSource>(_strVhost,_strApp,_strSrc); mediaSource = std::make_shared<RtmpMediaSource>(_strVhost,_strApp,_strSrc);
@ -154,7 +160,7 @@ PlayerProxy::~PlayerProxy() {
_timer.reset(); _timer.reset();
} }
void PlayerProxy::rePlay(const string &strUrl,int iFailedCnt){ void PlayerProxy::rePlay(const string &strUrl,int iFailedCnt){
auto iDelay = MAX(2 * 1000, MIN(iFailedCnt * 3000,60*1000)); auto iDelay = MAX(2 * 1000, MIN(iFailedCnt * 3000, 60*1000));
weak_ptr<PlayerProxy> weakSelf = shared_from_this(); weak_ptr<PlayerProxy> weakSelf = shared_from_this();
_timer = std::make_shared<Timer>(iDelay / 1000.0f,[weakSelf,strUrl,iFailedCnt]() { _timer = std::make_shared<Timer>(iDelay / 1000.0f,[weakSelf,strUrl,iFailedCnt]() {
//播放失败次数越多,则延时越长 //播放失败次数越多,则延时越长
@ -213,7 +219,7 @@ public:
if(_iAudioIndex != iAudioIndex){ if(_iAudioIndex != iAudioIndex){
_iAudioIndex = iAudioIndex; _iAudioIndex = iAudioIndex;
auto aacFrame = std::make_shared<AACFrameNoCacheAble>((char *)MUTE_ADTS_DATA, auto aacFrame = std::make_shared<AACFrameNoCacheAble>((char *)MUTE_ADTS_DATA,
MUTE_ADTS_DATA_LEN, MUTE_ADTS_DATA_LEN,
_iAudioIndex * MUTE_ADTS_DATA_MS); _iAudioIndex * MUTE_ADTS_DATA_MS);
FrameRingInterfaceDelegate::inputFrame(aacFrame); FrameRingInterfaceDelegate::inputFrame(aacFrame);
} }
@ -224,15 +230,22 @@ private:
}; };
void PlayerProxy::onPlaySuccess() { void PlayerProxy::onPlaySuccess() {
GET_CONFIG(bool,resetWhenRePlay,General::kResetWhenRePlay);
if (dynamic_pointer_cast<RtspMediaSource>(_pMediaSrc)) { if (dynamic_pointer_cast<RtspMediaSource>(_pMediaSrc)) {
//rtsp拉流代理 //rtsp拉流代理
_mediaMuxer.reset(new MultiMediaSourceMuxer(_strVhost, _strApp, _strSrc, getDuration(), false, _bEnableRtmp, _bEnableHls, _bEnableMp4)); if (resetWhenRePlay || !_mediaMuxer) {
_mediaMuxer.reset(new MultiMediaSourceMuxer(_strVhost, _strApp, _strSrc, getDuration(), false, _bEnableRtmp, _bEnableHls, _bEnableMp4));
}
} else if (dynamic_pointer_cast<RtmpMediaSource>(_pMediaSrc)) { } else if (dynamic_pointer_cast<RtmpMediaSource>(_pMediaSrc)) {
//rtmp拉流代理 //rtmp拉流代理
_mediaMuxer.reset(new MultiMediaSourceMuxer(_strVhost, _strApp, _strSrc, getDuration(), _bEnableRtsp, false, _bEnableHls, _bEnableMp4)); if (resetWhenRePlay || !_mediaMuxer) {
_mediaMuxer.reset(new MultiMediaSourceMuxer(_strVhost, _strApp, _strSrc, getDuration(), _bEnableRtsp, false, _bEnableHls, _bEnableMp4));
}
} else { } else {
//其他拉流代理 //其他拉流代理
_mediaMuxer.reset(new MultiMediaSourceMuxer(_strVhost, _strApp, _strSrc, getDuration(), _bEnableRtsp, _bEnableRtmp, _bEnableHls, _bEnableMp4)); if (resetWhenRePlay || !_mediaMuxer) {
_mediaMuxer.reset(new MultiMediaSourceMuxer(_strVhost, _strApp, _strSrc, getDuration(), _bEnableRtsp, _bEnableRtmp, _bEnableHls, _bEnableMp4));
}
} }
_mediaMuxer->setListener(shared_from_this()); _mediaMuxer->setListener(shared_from_this());
@ -244,13 +257,16 @@ void PlayerProxy::onPlaySuccess() {
videoTrack->addDelegate(_mediaMuxer); videoTrack->addDelegate(_mediaMuxer);
} }
//是否添加静音音频
GET_CONFIG(bool,addMuteAudio,General::kAddMuteAudio);
auto audioTrack = getTrack(TrackAudio, false); auto audioTrack = getTrack(TrackAudio, false);
if(audioTrack){ if(audioTrack){
//添加音频 //添加音频
_mediaMuxer->addTrack(audioTrack); _mediaMuxer->addTrack(audioTrack);
//音频数据写入_mediaMuxer //音频数据写入_mediaMuxer
audioTrack->addDelegate(_mediaMuxer); audioTrack->addDelegate(_mediaMuxer);
}else if(videoTrack){ }else if(addMuteAudio && videoTrack){
//没有音频信息,产生一个静音音频 //没有音频信息,产生一个静音音频
MuteAudioMaker::Ptr audioMaker = std::make_shared<MuteAudioMaker>(); MuteAudioMaker::Ptr audioMaker = std::make_shared<MuteAudioMaker>();
//videoTrack把数据写入MuteAudioMaker //videoTrack把数据写入MuteAudioMaker

View File

@ -44,13 +44,17 @@ RtmpSession::~RtmpSession() {
} }
void RtmpSession::onError(const SockException& err) { void RtmpSession::onError(const SockException& err) {
WarnP(this) << err.what(); bool isPlayer = !_pPublisherSrc;
WarnP(this) << (isPlayer ? "播放器(" : "推流器(")
<< _mediaInfo._vhost << "/"
<< _mediaInfo._app << "/"
<< _mediaInfo._streamid
<< ")断开:" << err.what();
//流量统计事件广播 //流量统计事件广播
GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold); GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold);
if(_ui64TotalBytes > iFlowThreshold * 1024){ if(_ui64TotalBytes > iFlowThreshold * 1024){
bool isPlayer = !_pPublisherSrc;
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport,
_mediaInfo, _mediaInfo,
_ui64TotalBytes, _ui64TotalBytes,
@ -486,7 +490,7 @@ void RtmpSession::onRtmpChunk(RtmpPacket &chunkData) {
GET_CONFIG(bool,rtmp_modify_stamp,Rtmp::kModifyStamp); GET_CONFIG(bool,rtmp_modify_stamp,Rtmp::kModifyStamp);
if(rtmp_modify_stamp){ if(rtmp_modify_stamp){
int64_t dts_out; int64_t dts_out;
_stamp[chunkData.typeId % 2].revise(0, 0, dts_out, dts_out); _stamp[chunkData.typeId % 2].revise(0, 0, dts_out, dts_out, true);
chunkData.timeStamp = dts_out; chunkData.timeStamp = dts_out;
} }
if(!_metadata_got && !chunkData.isCfgFrame()){ if(!_metadata_got && !chunkData.isCfgFrame()){

View File

@ -68,35 +68,43 @@ void RtspPlayer::teardown(){
CLEAR_ARR(_aui64RtpRecv) CLEAR_ARR(_aui64RtpRecv)
CLEAR_ARR(_aui64RtpRecv) CLEAR_ARR(_aui64RtpRecv)
CLEAR_ARR(_aui16NowSeq) CLEAR_ARR(_aui16NowSeq)
CLEAR_ARR(_aiFistStamp);
CLEAR_ARR(_aiNowStamp);
_pPlayTimer.reset(); _pPlayTimer.reset();
_pRtpTimer.reset(); _pRtpTimer.reset();
_iSeekTo = 0;
_uiCseq = 1; _uiCseq = 1;
_onHandshake = nullptr; _onHandshake = nullptr;
} }
void RtspPlayer::play(const string &strUrl){ void RtspPlayer::play(const string &strUrl){
auto userAndPwd = FindField(strUrl.data(),"://","@");
Rtsp::eRtpType eType = (Rtsp::eRtpType)(int)(*this)[kRtpType]; Rtsp::eRtpType eType = (Rtsp::eRtpType)(int)(*this)[kRtpType];
if(userAndPwd.empty()){ auto schema = FindField(strUrl.data(), nullptr,"://");
play(strUrl,"","",eType); bool isSSL = strcasecmp(schema.data(),"rtsps") == 0;
return; //查找"://"与"/"之间的字符串,用于提取用户名密码
auto middle_url = FindField(strUrl.data(),"://","/");
if(middle_url.empty()){
middle_url = FindField(strUrl.data(),"://", nullptr);
} }
auto suffix = FindField(strUrl.data(),"@",nullptr); auto pos = middle_url.rfind('@');
if(pos == string::npos){
//并没有用户名密码
play(isSSL,strUrl,"","",eType);
return;
}
//包含用户名密码
auto user_pwd = middle_url.substr(0,pos);
auto suffix = strUrl.substr(schema.size() + 3 + pos + 1);
auto url = StrPrinter << "rtsp://" << suffix << endl; auto url = StrPrinter << "rtsp://" << suffix << endl;
if(userAndPwd.find(":") == string::npos){ if(user_pwd.find(":") == string::npos){
play(url,userAndPwd,"",eType); play(isSSL,url,user_pwd,"",eType);
return; return;
} }
auto user = FindField(userAndPwd.data(),nullptr,":"); auto user = FindField(user_pwd.data(),nullptr,":");
auto pwd = FindField(userAndPwd.data(),":",nullptr); auto pwd = FindField(user_pwd.data(),":",nullptr);
play(url,user,pwd,eType); play(isSSL,url,user,pwd,eType);
} }
//播放指定是否走rtp over tcp
void RtspPlayer::play(const string &strUrl, const string &strUser, const string &strPwd, Rtsp::eRtpType eType ) { void RtspPlayer::play(bool isSSL,const string &strUrl, const string &strUser, const string &strPwd, Rtsp::eRtpType eType ) {
DebugL << strUrl << " " DebugL << strUrl << " "
<< (strUser.size() ? strUser : "null") << " " << (strUser.size() ? strUser : "null") << " "
<< (strPwd.size() ? strPwd:"null") << " " << (strPwd.size() ? strPwd:"null") << " "
@ -115,12 +123,12 @@ void RtspPlayer::play(const string &strUrl, const string &strUser, const string
auto ip = FindField(strUrl.data(), "://", "/"); auto ip = FindField(strUrl.data(), "://", "/");
if (!ip.size()) { if (!ip.size()) {
ip = FindField(strUrl.data(), "://", NULL); ip = split(FindField(strUrl.data(), "://", NULL),"?")[0];
} }
auto port = atoi(FindField(ip.data(), ":", NULL).data()); auto port = atoi(FindField(ip.data(), ":", NULL).data());
if (port <= 0) { if (port <= 0) {
//rtsp 默认端口554 //rtsp 默认端口554
port = 554; port = isSSL ? 322 : 554;
} else { } else {
//服务器域名 //服务器域名
ip = FindField(ip.data(), NULL, ":"); ip = FindField(ip.data(), NULL, ":");
@ -222,6 +230,16 @@ void RtspPlayer::handleResDESCRIBE(const Parser& parser) {
SdpParser sdpParser(parser.Content()); SdpParser sdpParser(parser.Content());
//解析sdp //解析sdp
_aTrackInfo = sdpParser.getAvailableTrack(); _aTrackInfo = sdpParser.getAvailableTrack();
auto title = sdpParser.getTrack(TrackTitle);
bool isPlayback = false;
if(title && title->_duration ){
isPlayback = true;
}
for(auto &stamp : _stamp){
stamp.setPlayBack(isPlayback);
stamp.setRelativeStamp(0);
}
if (_aTrackInfo.empty()) { if (_aTrackInfo.empty()) {
throw std::runtime_error("无有效的Sdp Track"); throw std::runtime_error("无有效的Sdp Track");
@ -386,7 +404,7 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex)
} }
//所有setup命令发送完毕 //所有setup命令发送完毕
//发送play命令 //发送play命令
pause(false); sendPause(false, 0,false);
} }
void RtspPlayer::sendOptions() { void RtspPlayer::sendOptions() {
@ -403,25 +421,19 @@ void RtspPlayer::sendDescribe() {
} }
void RtspPlayer::sendPause(bool bPause,uint32_t seekMS){ void RtspPlayer::sendPause(bool bPause,uint32_t seekMS,bool range){
if(!bPause){
//修改时间轴
int iTimeInc = seekMS - getProgressMilliSecond();
for(unsigned int i = 0 ;i < _aTrackInfo.size() ;i++){
_aiFistStamp[i] = _aiNowStamp[i] + iTimeInc;
_aiNowStamp[i] = _aiFistStamp[i];
}
_iSeekTo = seekMS;
}
//开启或暂停rtsp //开启或暂停rtsp
_onHandshake = std::bind(&RtspPlayer::handleResPAUSE,this, placeholders::_1,bPause); _onHandshake = std::bind(&RtspPlayer::handleResPAUSE,this, placeholders::_1,bPause);
sendRtspRequest(bPause ? "PAUSE" : "PLAY", if(!bPause && range){
_strContentBase, sendRtspRequest(bPause ? "PAUSE" : "PLAY", _strContentBase,
{"Range",StrPrinter << "npt=" << setiosflags(ios::fixed) << setprecision(2) << seekMS / 1000.0 << "-"}); {"Range",StrPrinter << "npt=" << setiosflags(ios::fixed) << setprecision(2) << seekMS / 1000.0 << "-"});
} else{
sendRtspRequest(bPause ? "PAUSE" : "PLAY", _strContentBase);
}
} }
void RtspPlayer::pause(bool bPause) { void RtspPlayer::pause(bool bPause) {
sendPause(bPause, getProgressMilliSecond()); sendPause(bPause, getProgressMilliSecond(),false);
} }
void RtspPlayer::handleResPAUSE(const Parser& parser, bool bPause) { void RtspPlayer::handleResPAUSE(const Parser& parser, bool bPause) {
@ -430,6 +442,7 @@ void RtspPlayer::handleResPAUSE(const Parser& parser, bool bPause) {
return; return;
} }
if (!bPause) { if (!bPause) {
uint32_t iSeekTo = 0;
//修正时间轴 //修正时间轴
auto strRange = parser["Range"]; auto strRange = parser["Range"];
if (strRange.size()) { if (strRange.size()) {
@ -437,25 +450,12 @@ void RtspPlayer::handleResPAUSE(const Parser& parser, bool bPause) {
if (strStart == "now") { if (strStart == "now") {
strStart = "0"; strStart = "0";
} }
_iSeekTo = 1000 * atof(strStart.data()); iSeekTo = 1000 * atof(strStart.data());
DebugL << "seekTo(ms):" << _iSeekTo ; DebugL << "seekTo(ms):" << iSeekTo ;
}
auto strRtpInfo = parser["RTP-Info"];
if (strRtpInfo.size()) {
strRtpInfo.append(",");
vector<string> vec = split(strRtpInfo, ",");
for(auto &strTrack : vec){
strTrack.append(";");
auto strControlSuffix = strTrack.substr(1 + strTrack.rfind('/'),strTrack.find(';') - strTrack.rfind('/') - 1);
auto strRtpTime = FindField(strTrack.data(), "rtptime=", ";");
auto idx = getTrackIndexByControlSuffix(strControlSuffix);
if(idx != -1){
_aiFistStamp[idx] = _aTrackInfo[idx]->_samplerate>0?atoll(strRtpTime.data()) * 1000 / _aTrackInfo[idx]->_samplerate :1;
_aiNowStamp[idx] = _aiFistStamp[idx];
DebugL << "rtptime(ms):" << strControlSuffix <<" " << strRtpTime;
}
}
} }
//设置相对时间戳
_stamp[0].setRelativeStamp(iSeekTo);
_stamp[1].setRelativeStamp(iSeekTo);
onPlayResult_l(SockException(Err_success, "rtsp play success")); onPlayResult_l(SockException(Err_success, "rtsp play success"));
} else { } else {
_pRtpTimer.reset(); _pRtpTimer.reset();
@ -630,12 +630,11 @@ void RtspPlayer::onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx){
} }
_aui64RtpRecv[trackidx] ++; _aui64RtpRecv[trackidx] ++;
_aui16NowSeq[trackidx] = rtppt->sequence; _aui16NowSeq[trackidx] = rtppt->sequence;
_aiNowStamp[trackidx] = rtppt->timeStamp;
if( _aiFistStamp[trackidx] == 0){
_aiFistStamp[trackidx] = _aiNowStamp[trackidx];
}
rtppt->timeStamp -= _aiFistStamp[trackidx]; //计算相对时间戳
int64_t dts_out;
_stamp[trackidx].revise(rtppt->timeStamp,rtppt->timeStamp,dts_out,dts_out);
rtppt->timeStamp = dts_out;
onRecvRTP_l(rtppt,_aTrackInfo[trackidx]); onRecvRTP_l(rtppt,_aTrackInfo[trackidx]);
} }
float RtspPlayer::getPacketLossRate(TrackType type) const{ float RtspPlayer::getPacketLossRate(TrackType type) const{
@ -653,7 +652,6 @@ float RtspPlayer::getPacketLossRate(TrackType type) const{
return 1.0 - (double)totalRecv / totalSend; return 1.0 - (double)totalRecv / totalSend;
} }
if(_aui16NowSeq[iTrackIdx] - _aui16FirstSeq[iTrackIdx] + 1 == 0){ if(_aui16NowSeq[iTrackIdx] - _aui16FirstSeq[iTrackIdx] + 1 == 0){
return 0; return 0;
} }
@ -661,14 +659,10 @@ float RtspPlayer::getPacketLossRate(TrackType type) const{
} }
uint32_t RtspPlayer::getProgressMilliSecond() const{ uint32_t RtspPlayer::getProgressMilliSecond() const{
uint32_t iTime[2] = {0,0}; return MAX(_stamp[0].getRelativeStamp(),_stamp[1].getRelativeStamp());
for(unsigned int i = 0 ;i < _aTrackInfo.size() ;i++){
iTime[i] = _aiNowStamp[i] - _aiFistStamp[i];
}
return _iSeekTo + MAX(iTime[0],iTime[1]);
} }
void RtspPlayer::seekToMilliSecond(uint32_t ms) { void RtspPlayer::seekToMilliSecond(uint32_t ms) {
sendPause(false,ms); sendPause(false,ms, true);
} }
void RtspPlayer::sendRtspRequest(const string &cmd, const string &url, const std::initializer_list<string> &header) { void RtspPlayer::sendRtspRequest(const string &cmd, const string &url, const std::initializer_list<string> &header) {

View File

@ -40,6 +40,7 @@
#include "Network/TcpClient.h" #include "Network/TcpClient.h"
#include "RtspSplitter.h" #include "RtspSplitter.h"
#include "RtpReceiver.h" #include "RtpReceiver.h"
#include "MediaFile/Stamp.h"
using namespace std; using namespace std;
using namespace toolkit; using namespace toolkit;
@ -106,7 +107,7 @@ private:
int getTrackIndexByInterleaved(int interleaved) const; int getTrackIndexByInterleaved(int interleaved) const;
int getTrackIndexByTrackType(TrackType trackType) const; int getTrackIndexByTrackType(TrackType trackType) const;
void play(const string &strUrl, const string &strUser, const string &strPwd, Rtsp::eRtpType eType); void play(bool isSSL,const string &strUrl, const string &strUser, const string &strPwd, Rtsp::eRtpType eType);
void handleResSETUP(const Parser &parser, unsigned int uiTrackIndex); void handleResSETUP(const Parser &parser, unsigned int uiTrackIndex);
void handleResDESCRIBE(const Parser &parser); void handleResDESCRIBE(const Parser &parser);
bool handleAuthenticationFailure(const string &wwwAuthenticateParamsStr); bool handleAuthenticationFailure(const string &wwwAuthenticateParamsStr);
@ -114,7 +115,7 @@ private:
//发送SETUP命令 //发送SETUP命令
void sendSetup(unsigned int uiTrackIndex); void sendSetup(unsigned int uiTrackIndex);
void sendPause(bool bPause,uint32_t ms); void sendPause(bool bPause,uint32_t ms, bool range);
void sendOptions(); void sendOptions();
void sendDescribe(); void sendDescribe();
@ -148,12 +149,8 @@ private:
std::shared_ptr<Timer> _pPlayTimer; std::shared_ptr<Timer> _pPlayTimer;
std::shared_ptr<Timer> _pRtpTimer; std::shared_ptr<Timer> _pRtpTimer;
//播放进度控制,单位毫秒 //时间戳
uint32_t _iSeekTo = 0; Stamp _stamp[2];
//单位毫秒
uint32_t _aiFistStamp[2] = {0,0};
uint32_t _aiNowStamp[2] = {0,0};
//rtcp相关 //rtcp相关
RtcpCounter _aRtcpCnt[2]; //rtcp统计,trackid idx 为数组下标 RtcpCounter _aRtcpCnt[2]; //rtcp统计,trackid idx 为数组下标

View File

@ -85,7 +85,13 @@ RtspSession::~RtspSession() {
} }
void RtspSession::onError(const SockException& err) { void RtspSession::onError(const SockException& err) {
WarnP(this) << err.what(); bool isPlayer = !_pushSrc;
WarnP(this) << (isPlayer ? "播放器(" : "推流器(")
<< _mediaInfo._vhost << "/"
<< _mediaInfo._app << "/"
<< _mediaInfo._streamid
<< ")断开:" << err.what();
if (_rtpType == Rtsp::RTP_MULTICAST) { if (_rtpType == Rtsp::RTP_MULTICAST) {
//取消UDP端口监听 //取消UDP端口监听
UDPServer::Instance().stopListenPeer(get_peer_ip().data(), this); UDPServer::Instance().stopListenPeer(get_peer_ip().data(), this);
@ -100,7 +106,6 @@ void RtspSession::onError(const SockException& err) {
//流量统计事件广播 //流量统计事件广播
GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold); GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold);
if(_ui64TotalBytes > iFlowThreshold * 1024){ if(_ui64TotalBytes > iFlowThreshold * 1024){
bool isPlayer = !_pushSrc;
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport,
_mediaInfo, _mediaInfo,
_ui64TotalBytes, _ui64TotalBytes,
@ -932,7 +937,7 @@ void RtspSession::onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx) {
GET_CONFIG(bool,modify_stamp,Rtsp::kModifyStamp); GET_CONFIG(bool,modify_stamp,Rtsp::kModifyStamp);
if(modify_stamp){ if(modify_stamp){
int64_t dts_out; int64_t dts_out;
_stamp[trackidx].revise(0, 0, dts_out, dts_out); _stamp[trackidx].revise(0, 0, dts_out, dts_out, true);
rtppt->timeStamp = dts_out; rtppt->timeStamp = dts_out;
} }
_pushSrc->onWrite(rtppt, false); _pushSrc->onWrite(rtppt, false);

View File

@ -101,6 +101,7 @@ int main(int argc, char *argv[]) {
#endif #endif
static char *url = argv[1];
//设置退出信号处理函数 //设置退出信号处理函数
signal(SIGINT, [](int) { SDLDisplayerHelper::Instance().shutdown(); }); signal(SIGINT, [](int) { SDLDisplayerHelper::Instance().shutdown(); });
//设置日志 //设置日志
@ -140,7 +141,7 @@ int main(int argc, char *argv[]) {
decoder.set<H264Decoder>(); decoder.set<H264Decoder>();
} }
if(!displayer){ if(!displayer){
displayer.set<YuvDisplayer>(); displayer.set<YuvDisplayer>(nullptr,url);
} }
if(!merger){ if(!merger){
merger.set<FrameMerger>(); merger.set<FrameMerger>();