diff --git a/.gitattributes b/.gitattributes index 292d29d3..b7f1a8b2 100644 --- a/.gitattributes +++ b/.gitattributes @@ -1,2 +1,2 @@ -release/ filter=lfs diff=lfs merge=lfs -text -*.a filter=lfs diff=lfs merge=lfs -text +*.h linguist-language=cpp +*.c linguist-language=cpp diff --git a/.github/FUNDING.yml b/.github/FUNDING.yml new file mode 100644 index 00000000..4bf33d7a --- /dev/null +++ b/.github/FUNDING.yml @@ -0,0 +1,5 @@ +# These are supported funding model platforms +custom: ['https://www.paypal.me/xiachu'] +ko_fi: xiachu +issuehunt: xiongziliang +liberapay: xiachu diff --git a/3rdpart/ZLToolKit b/3rdpart/ZLToolKit index e256dabd..8d1681b5 160000 --- a/3rdpart/ZLToolKit +++ b/3rdpart/ZLToolKit @@ -1 +1 @@ -Subproject commit e256dabd370220a5b19e3c5b54cf29dd5cccd48e +Subproject commit 8d1681b5bb247e7f47ae0f8c414f6eeb376b742b diff --git a/Android/app/build.gradle b/Android/app/build.gradle index da3ae89b..56c5646d 100644 --- a/Android/app/build.gradle +++ b/Android/app/build.gradle @@ -4,7 +4,7 @@ android { compileSdkVersion 28 defaultConfig { applicationId "com.zlmediakit.demo" - minSdkVersion 19 + minSdkVersion 15 targetSdkVersion 28 versionCode 1 versionName "1.0" diff --git a/Android/app/libs/arm64-v8a/libcrypto.a b/Android/app/libs/arm64-v8a/libcrypto.a index cb6cb229..6f890f91 100644 Binary files a/Android/app/libs/arm64-v8a/libcrypto.a and b/Android/app/libs/arm64-v8a/libcrypto.a differ diff --git a/Android/app/libs/arm64-v8a/libssl.a b/Android/app/libs/arm64-v8a/libssl.a index a4c5d563..a9fc3f6c 100644 Binary files a/Android/app/libs/arm64-v8a/libssl.a and b/Android/app/libs/arm64-v8a/libssl.a differ diff --git a/Android/app/libs/armeabi-v7a/libcrypto.a b/Android/app/libs/armeabi-v7a/libcrypto.a index ca9dad21..29eeb767 100644 Binary files a/Android/app/libs/armeabi-v7a/libcrypto.a and b/Android/app/libs/armeabi-v7a/libcrypto.a differ diff --git a/Android/app/libs/armeabi-v7a/libssl.a b/Android/app/libs/armeabi-v7a/libssl.a index 766cbd1a..a3678fae 100644 Binary files a/Android/app/libs/armeabi-v7a/libssl.a and b/Android/app/libs/armeabi-v7a/libssl.a differ diff --git a/Android/app/libs/armeabi/libcrypto.a b/Android/app/libs/armeabi/libcrypto.a index cf6e027e..d481a036 100644 Binary files a/Android/app/libs/armeabi/libcrypto.a and b/Android/app/libs/armeabi/libcrypto.a differ diff --git a/Android/app/libs/armeabi/libssl.a b/Android/app/libs/armeabi/libssl.a index 8740b193..143a176b 100644 Binary files a/Android/app/libs/armeabi/libssl.a and b/Android/app/libs/armeabi/libssl.a differ diff --git a/Android/app/libs/x86/libcrypto.a b/Android/app/libs/x86/libcrypto.a index 28069371..4935e4f2 100644 Binary files a/Android/app/libs/x86/libcrypto.a and b/Android/app/libs/x86/libcrypto.a differ diff --git a/Android/app/libs/x86/libssl.a b/Android/app/libs/x86/libssl.a index 92de513f..5f68368d 100644 Binary files a/Android/app/libs/x86/libssl.a and b/Android/app/libs/x86/libssl.a differ diff --git a/Android/app/libs/x86_64/libcrypto.a b/Android/app/libs/x86_64/libcrypto.a index 8339bd3c..d3e4e826 100644 Binary files a/Android/app/libs/x86_64/libcrypto.a and b/Android/app/libs/x86_64/libcrypto.a differ diff --git a/Android/app/libs/x86_64/libssl.a b/Android/app/libs/x86_64/libssl.a index b6106ad9..1b96f397 100644 Binary files a/Android/app/libs/x86_64/libssl.a and b/Android/app/libs/x86_64/libssl.a differ diff --git a/Android/app/src/main/cpp/native-lib.cpp b/Android/app/src/main/cpp/native-lib.cpp index 193ebff7..f6a2946c 100644 --- a/Android/app/src/main/cpp/native-lib.cpp +++ b/Android/app/src/main/cpp/native-lib.cpp @@ -174,6 +174,7 @@ JNI_API(jboolean,startDemo,jstring ini_dir){ mINI::Instance()["http.sslport"] = 8443; mINI::Instance()["rtsp.port"] = 8554; mINI::Instance()["rtsp.sslport"] = 8332; + mINI::Instance()["general.enableVhost"] = 0; for(auto &pr : mINI::Instance()){ //替换hook默认地址 replace(pr.second,"https://127.0.0.1/","http://127.0.0.1:8080/"); diff --git a/Android/app/src/main/java/com/zlmediakit/demo/MainActivity.java b/Android/app/src/main/java/com/zlmediakit/demo/MainActivity.java index 97ee8887..05eba135 100644 --- a/Android/app/src/main/java/com/zlmediakit/demo/MainActivity.java +++ b/Android/app/src/main/java/com/zlmediakit/demo/MainActivity.java @@ -38,10 +38,10 @@ public class MainActivity extends AppCompatActivity { if(permissionSuccess){ Toast.makeText(this,"你可以修改配置文件再启动:" + sd_dir + "/zlmediakit.ini" ,Toast.LENGTH_LONG).show(); Toast.makeText(this,"SSL证书请放置在:" + sd_dir + "/zlmediakit.pem" ,Toast.LENGTH_LONG).show(); - ZLMediaKit.startDemo(sd_dir); }else{ Toast.makeText(this,"请给予我权限,否则无法启动测试!" ,Toast.LENGTH_LONG).show(); } + ZLMediaKit.startDemo(sd_dir); } private ZLMediaKit.MediaPlayer _player; diff --git a/CMakeLists.txt b/CMakeLists.txt index 61baf74a..f90e5cd3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -153,6 +153,15 @@ if(ENABLE_MP4RECORD) endif(WIN32) endif() +if(${CMAKE_BUILD_TYPE} MATCHES "Release") + #查找jemalloc是否安装 + find_package(JEMALLOC QUIET) + if(JEMALLOC_FOUND) + message(STATUS "found library:\"${JEMALLOC_LIBRARIES}\"") + include_directories(${JEMALLOC_INCLUDE_DIR}) + list(APPEND LINK_LIB_LIST ${JEMALLOC_LIBRARIES}) + endif() +endif() if (WIN32) list(APPEND LINK_LIB_LIST WS2_32 Iphlpapi shlwapi) diff --git a/README.md b/README.md index 0eef5ac4..5dcbcb68 100644 --- a/README.md +++ b/README.md @@ -8,12 +8,11 @@ ## Why ZLMediaKit? - Developed based on C++ 11, the code is stable and reliable, avoiding the use of raw pointers, cross-platform porting is simple and convenient, and the code is clear and concise. -- Support rich streaming media protocols(`RTSP/RTMP/HLS/HTTP-FLV`),and support Inter-protocol conversion. +- Support rich streaming media protocols(`RTSP/RTMP/HLS/HTTP-FLV/Websocket-flv`),and support Inter-protocol conversion. - Multiplexing asynchronous network IO based on epoll and multi thread,extreme performance. - Well performance and stable test,can be used commercially. - Support linux, macos, ios, android, Windows Platforms. - Very low latency(lower then one second), video opened immediately. -- **Now Support websocket-flv!** ## Features diff --git a/README_CN.md b/README_CN.md index 918d068a..54199461 100644 --- a/README_CN.md +++ b/README_CN.md @@ -4,14 +4,13 @@ ## 项目特点 - 基于C++11开发,避免使用裸指针,代码稳定可靠;同时跨平台移植简单方便,代码清晰简洁。 -- 打包多种流媒体协议(RTSP/RTMP/HLS),支持协议间的互相转换,提供一站式的服务。 +- 打包多种流媒体协议(RTSP/RTMP/HLS/HTTP-FLV/Websocket-FLV),支持协议间的互相转换,提供一站式的服务。 - 使用epoll+线程池+异步网络IO模式开发,并发性能优越。 - 已实现主流的的H264/H265+AAC流媒体方案,代码精简,脉络清晰,适合学习。 - 编码格式与框架代码解耦,方便自由简洁的添加支持其他编码格式 - 代码经过大量的稳定性、性能测试,可满足商用服务器项目。 - 支持linux、macos、ios、android、windows平台 - 支持画面秒开(GOP缓存)、极低延时([500毫秒内,最低可达100毫秒](https://github.com/zlmediakit/ZLMediaKit/wiki/%E5%BB%B6%E6%97%B6%E6%B5%8B%E8%AF%95)) -- **支持websocket-flv直播** - [ZLMediaKit高并发实现原理](https://github.com/xiongziliang/ZLMediaKit/wiki/ZLMediaKit%E9%AB%98%E5%B9%B6%E5%8F%91%E5%AE%9E%E7%8E%B0%E5%8E%9F%E7%90%86) ## 项目定位 diff --git a/conf/config.ini b/conf/config.ini index 7ce085e8..cbbcb96d 100644 --- a/conf/config.ini +++ b/conf/config.ini @@ -6,12 +6,13 @@ apiDebug=1 secret=035c73f7-bb6b-4889-a715-d9eb2d1925cc [ffmpeg] -#FFmpeg可执行程序路径 +#FFmpeg可执行程序绝对路径 bin=/usr/local/bin/ffmpeg #FFmpeg拉流再推流的命令模板,通过该模板可以设置再编码的一些参数 cmd=%s -i %s -c:a aac -strict -2 -ar 44100 -ab 48k -c:v libx264 -f flv %s #FFmpeg日志的路径,如果置空则不生成FFmpeg日志 -log=/Users/xzl/git/ZLMediaKit/release/mac/Release/ffmpeg/ffmpeg.log +#可以为相对(相对于本可执行程序目录)或绝对路径 +log=./ffmpeg/ffmpeg.log [general] #是否启用虚拟主机 @@ -30,12 +31,18 @@ maxStreamWaitMS=5000 streamNoneReaderDelayMS=5000 #是否开启低延时模式,该模式下禁用MSG_MORE,启用TCP_NODEALY,延时将降低,但数据发送性能将降低 ultraLowDelay=1 +#拉流代理是否添加静音音频(直接拉流模式本协议无效) +addMuteAudio=1 +#拉流代理时如果断流再重连成功是否删除前一次的媒体流数据,如果删除将重新开始, +#如果不删除将会接着上一次的数据继续写(录制hls/mp4时会继续在前一个文件后面写) +resetWhenRePlay=1 [hls] #hls写文件的buf大小,调整参数可以提高文件io性能 fileBufSize=65536 #hls保存文件路径 -filePath=/Users/xzl/git/ZLMediaKit/release/mac/Release/httpRoot +#可以为相对(相对于本可执行程序目录)或绝对路径 +filePath=./httpRoot #hls最大切片时间 segDur=3 #m3u8索引中,hls保留切片个数(实际保留切片个数大2~3个) @@ -88,7 +95,8 @@ notFound=404 Not Found> 8; + if(exit_code_ptr){ + *exit_code_ptr = (status & 0xFF00) >> 8; + } + if (p < 0) { + WarnL << "waitpid failed, pid=" << pid << ", err=" << get_uv_errmsg(); + return false; + } + if (p > 0) { + InfoL << "process terminated, pid=" << pid << ", exit code=" << exit_code; + return false; + } + //WarnL << "process is running, pid=" << _pid; + return true; +} + +static void s_kill(pid_t pid,int max_delay,bool force){ + if (pid <= 0) { + //pid无效 + return; + } + + if (::kill(pid, force ? SIGKILL : SIGTERM) == -1) { + //进程可能已经退出了 + WarnL << "kill process " << pid << " failed:" << get_uv_errmsg(); + return; + } + + if(force){ + //发送SIGKILL信号后,阻塞等待退出 + s_wait(pid, NULL, true); + DebugL << "force kill " << pid << " success!"; + return; + } + + //发送SIGTERM信号后,2秒后检查子进程是否已经退出 + WorkThreadPool::Instance().getPoller()->doDelayTask(max_delay,[pid](){ + if (!s_wait(pid, nullptr, false)) { + //进程已经退出了 + return 0; + } + //进程还在运行 + WarnL << "process still working,force kill it:" << pid; + s_kill(pid,0, true); + return 0; + }); +} + +void Process::kill(int max_delay,bool force) { if (_pid <= 0) { return; } - if (::kill(_pid, SIGTERM) == -1) { - WarnL << "kill process " << _pid << " falied,err:" << get_uv_errmsg(); - } else { - //等待子进程退出 - auto pid = _pid; - EventPollerPool::Instance().getPoller()->doDelayTask(max_delay,[pid](){ - //最多等待2秒,2秒后强制杀掉程序 - if (waitpid(pid, NULL, WNOHANG) == 0) { - ::kill(pid, SIGKILL); - WarnL << "force kill process " << pid; - } - return 0; - }); - } + s_kill(_pid,max_delay,force); _pid = -1; } @@ -134,28 +181,10 @@ Process::~Process() { kill(2000); } -Process::Process() { -} +Process::Process() {} bool Process::wait(bool block) { - if (_pid <= 0) { - return false; - } - int status = 0; - pid_t p = waitpid(_pid, &status, block ? 0 : WNOHANG); - - _exit_code = (status & 0xFF00) >> 8; - if (p < 0) { - WarnL << "waitpid failed, pid=" << _pid << ", err=" << get_uv_errmsg(); - return false; - } - if (p > 0) { - InfoL << "process terminated, pid=" << _pid << ", exit code=" << _exit_code; - return false; - } - - //WarnL << "process is running, pid=" << _pid; - return true; + return s_wait(_pid,&_exit_code,block); } int Process::exit_code() { diff --git a/server/Process.h b/server/Process.h index b0b994d9..cce8470a 100644 --- a/server/Process.h +++ b/server/Process.h @@ -36,7 +36,7 @@ public: Process(); ~Process(); void run(const string &cmd,const string &log_file); - void kill(int max_delay); + void kill(int max_delay,bool force = false); bool wait(bool block = true); int exit_code(); private: diff --git a/server/WebApi.cpp b/server/WebApi.cpp index 3209b439..9a4fcdb7 100644 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -45,6 +45,7 @@ #include "Util/MD5.h" #include "WebApi.h" #include "WebHook.h" +#include "Thread/WorkThreadPool.h" #if !defined(_WIN32) #include "FFmpegSource.h" @@ -180,19 +181,35 @@ static inline void addHttpListener(){ if(api_debug){ auto newInvoker = [invoker,parser,allArgs](const string &codeOut, const HttpSession::KeyValue &headerOut, - const string &contentOut){ + const HttpBody::Ptr &body){ stringstream ss; for(auto &pr : allArgs ){ ss << pr.first << " : " << pr.second << "\r\n"; } - DebugL << "\r\n# request:\r\n" << parser.Method() << " " << parser.FullUrl() << "\r\n" - << "# content:\r\n" << parser.Content() << "\r\n" - << "# args:\r\n" << ss.str() - << "# response:\r\n" - << contentOut << "\r\n"; + //body默认为空 + int64_t size = 0; + if (body && body->remainSize()) { + //有body,获取body大小 + size = body->remainSize(); + } - invoker(codeOut,headerOut,contentOut); + if(size < 4 * 1024){ + string contentOut = body->readData(size)->toString(); + DebugL << "\r\n# request:\r\n" << parser.Method() << " " << parser.FullUrl() << "\r\n" + << "# content:\r\n" << parser.Content() << "\r\n" + << "# args:\r\n" << ss.str() + << "# response:\r\n" + << contentOut << "\r\n"; + invoker(codeOut,headerOut,contentOut); + } else{ + DebugL << "\r\n# request:\r\n" << parser.Method() << " " << parser.FullUrl() << "\r\n" + << "# content:\r\n" << parser.Content() << "\r\n" + << "# args:\r\n" << ss.str() + << "# response size:" + << size <<"\r\n"; + invoker(codeOut,headerOut,body); + } }; ((HttpSession::HttpResponseInvoker &)invoker) = newInvoker; } @@ -281,6 +298,23 @@ void installWebApi() { }); }); + //获取后台工作线程负载 + //测试url http://127.0.0.1/index/api/getWorkThreadsLoad + API_REGIST_INVOKER(api, getWorkThreadsLoad, { + WorkThreadPool::Instance().getExecutorDelay([invoker, headerOut](const vector &vecDelay) { + Value val; + auto vec = WorkThreadPool::Instance().getExecutorLoad(); + int i = 0; + for (auto load : vec) { + Value obj(objectValue); + obj["load"] = load; + obj["delay"] = vecDelay[i++]; + val["data"].append(obj); + } + invoker("200 OK", headerOut, val.toStyledString()); + }); + }); + //获取服务器配置 //测试url http://127.0.0.1/index/api/getServerConfig API_REGIST(api, getServerConfig, { @@ -578,6 +612,13 @@ void installWebApi() { }); #endif + //新增http api下载可执行程序文件接口 + //测试url http://127.0.0.1/index/api/downloadBin + API_REGIST_INVOKER(api,downloadBin,{ + CHECK_SECRET(); + invoker.responseFile(headerIn,StrCaseMap(),exePath()); + }); + ////////////以下是注册的Hook API//////////// API_REGIST(hook,on_publish,{ //开始推流事件 diff --git a/src/Common/MediaSink.cpp b/src/Common/MediaSink.cpp index b2263e54..4d74e2c9 100644 --- a/src/Common/MediaSink.cpp +++ b/src/Common/MediaSink.cpp @@ -32,19 +32,9 @@ namespace mediakit{ void MediaSink::addTrack(const Track::Ptr &track_in) { lock_guard lck(_mtx); -//克隆Track,只拷贝其数据,不拷贝其数据转发关系 + //克隆Track,只拷贝其数据,不拷贝其数据转发关系 auto track = track_in->clone(); - weak_ptr weakSelf = shared_from_this(); - track->addDelegate(std::make_shared([weakSelf](const Frame::Ptr &frame){ - auto strongSelf = weakSelf.lock(); - if(!strongSelf){ - return; - } - if(!strongSelf->_anyTrackUnReady){ - strongSelf->onTrackFrame(frame); - } - })); auto codec_id = track->getCodecId(); _track_map[codec_id] = track; auto lam = [this,track](){ @@ -58,6 +48,26 @@ void MediaSink::addTrack(const Track::Ptr &track_in) { _trackReadyCallback[codec_id] = lam; _ticker.resetTime(); } + + weak_ptr weakSelf = shared_from_this(); + track->addDelegate(std::make_shared([weakSelf](const Frame::Ptr &frame){ + auto strongSelf = weakSelf.lock(); + if(!strongSelf){ + return; + } + if(!strongSelf->_anyTrackUnReady){ + strongSelf->onTrackFrame(frame); + } + })); +} + +void MediaSink::resetTracks() { + lock_guard lck(_mtx); + _anyTrackUnReady = false; + _allTrackReady = false; + _track_map.clear(); + _trackReadyCallback.clear(); + _ticker.resetTime(); } void MediaSink::inputFrame(const Frame::Ptr &frame) { diff --git a/src/Common/MediaSink.h b/src/Common/MediaSink.h index a45b9638..8798590d 100644 --- a/src/Common/MediaSink.h +++ b/src/Common/MediaSink.h @@ -52,7 +52,7 @@ public: * 输入frame * @param frame */ - void inputFrame(const Frame::Ptr &frame) override ; + void inputFrame(const Frame::Ptr &frame) override; /** * 添加track,内部会调用Track的clone方法 @@ -61,21 +61,24 @@ public: */ virtual void addTrack(const Track::Ptr & track); + /** + * 重置track + */ + virtual void resetTracks(); /** * 全部Track是否都准备好了 * @return */ - bool isAllTrackReady() const ; - + bool isAllTrackReady() const; /** * 获取特定类型的Track * @param type track类型 - * @param trackReady 是否获取已经准备好的Track + * @param trackReady 是否获取已经准备好的Track * @return */ - Track::Ptr getTrack(TrackType type,bool trackReady = true) const ; + Track::Ptr getTrack(TrackType type,bool trackReady = true) const; protected: /** * 某track已经准备好,其ready()状态返回true, diff --git a/src/Common/MultiMediaSourceMuxer.h b/src/Common/MultiMediaSourceMuxer.h index 895ba506..b466ffec 100644 --- a/src/Common/MultiMediaSourceMuxer.h +++ b/src/Common/MultiMediaSourceMuxer.h @@ -69,6 +69,19 @@ public: _record->addTrack(track); } + /** + * 重置音视频媒体 + */ + void resetTracks() { + if(_rtmp){ + _rtmp->resetTracks(); + } + if(_rtsp){ + _rtsp->resetTracks(); + } + _record->resetTracks(); + } + /** * 写入帧数据然后打包rtmp * @param frame 帧数据 diff --git a/src/Common/Parser.h b/src/Common/Parser.h index bf9854a7..eaa1874d 100644 --- a/src/Common/Parser.h +++ b/src/Common/Parser.h @@ -163,7 +163,7 @@ class Parser { for (string &key_val : arg_vec) { auto key = FindField(key_val.data(), NULL, key_delim); auto val = FindField(key_val.data(), key_delim, NULL); - ret.emplace_force(key,val); + ret.emplace_force(trim(key),trim(val)); } return ret; } diff --git a/src/Common/config.cpp b/src/Common/config.cpp index fa4a1c44..15540bcd 100644 --- a/src/Common/config.cpp +++ b/src/Common/config.cpp @@ -77,12 +77,17 @@ const string kStreamNoneReaderDelayMS = GENERAL_FIELD"streamNoneReaderDelayMS"; const string kMaxStreamWaitTimeMS = GENERAL_FIELD"maxStreamWaitMS"; const string kEnableVhost = GENERAL_FIELD"enableVhost"; const string kUltraLowDelay = GENERAL_FIELD"ultraLowDelay"; +const string kAddMuteAudio = GENERAL_FIELD"addMuteAudio"; +const string kResetWhenRePlay = GENERAL_FIELD"resetWhenRePlay"; + onceToken token([](){ mINI::Instance()[kFlowThreshold] = 1024; mINI::Instance()[kStreamNoneReaderDelayMS] = 5 * 1000; mINI::Instance()[kMaxStreamWaitTimeMS] = 5 * 1000; mINI::Instance()[kEnableVhost] = 1; mINI::Instance()[kUltraLowDelay] = 1; + mINI::Instance()[kAddMuteAudio] = 1; + mINI::Instance()[kResetWhenRePlay] = 1; },nullptr); }//namespace General @@ -117,7 +122,7 @@ const string kMaxReqCount = HTTP_FIELD"maxReqCount"; const string kCharSet = HTTP_FIELD"charSet"; //http 服务器根目录 -#define HTTP_ROOT_PATH (exeDir() + "httpRoot") +#define HTTP_ROOT_PATH "./httpRoot" const string kRootPath = HTTP_FIELD"rootPath"; //http 404错误提示内容 @@ -172,7 +177,7 @@ onceToken token([](){ mINI::Instance()[kHandshakeSecond] = 15; mINI::Instance()[kKeepAliveSecond] = 15; mINI::Instance()[kDirectProxy] = 1; - mINI::Instance()[kModifyStamp] = true; + mINI::Instance()[kModifyStamp] = false; },nullptr); } //namespace Rtsp @@ -254,7 +259,7 @@ namespace Record { const string kAppName = RECORD_FIELD"appName"; //每次流化MP4文件的时长,单位毫秒 -#define RECORD_SAMPLE_MS 100 +#define RECORD_SAMPLE_MS 500 const string kSampleMS = RECORD_FIELD"sampleMS"; //MP4文件录制大小,默认一个小时 @@ -268,6 +273,9 @@ const string kFilePath = RECORD_FIELD"filePath"; //mp4文件写缓存大小 const string kFileBufSize = RECORD_FIELD"fileBufSize"; +//mp4录制完成后是否进行二次关键帧索引写入头部 +const string kFastStart = RECORD_FIELD"fastStart"; + //mp4文件是否重头循环读取 const string kFileRepeat = RECORD_FIELD"fileRepeat"; @@ -277,6 +285,7 @@ onceToken token([](){ mINI::Instance()[kFileSecond] = RECORD_FILE_SECOND; mINI::Instance()[kFilePath] = RECORD_FILE_PATH; mINI::Instance()[kFileBufSize] = 64 * 1024; + mINI::Instance()[kFastStart] = false; mINI::Instance()[kFileRepeat] = false; },nullptr); diff --git a/src/Common/config.h b/src/Common/config.h index d6ab955c..a4557ea1 100644 --- a/src/Common/config.h +++ b/src/Common/config.h @@ -177,6 +177,11 @@ extern const string kMaxStreamWaitTimeMS; extern const string kEnableVhost; //超低延时模式,默认打开,打开后会降低延时但是转发性能会稍差 extern const string kUltraLowDelay; +//拉流代理时是否添加静音音频 +extern const string kAddMuteAudio; +//拉流代理时如果断流再重连成功是否删除前一次的媒体流数据,如果删除将重新开始, +//如果不删除将会接着上一次的数据继续写(录制hls/mp4时会继续在前一个文件后面写) +extern const string kResetWhenRePlay; }//namespace General @@ -268,6 +273,8 @@ extern const string kFileSecond; extern const string kFilePath; //mp4文件写缓存大小 extern const string kFileBufSize; +//mp4录制完成后是否进行二次关键帧索引写入头部 +extern const string kFastStart; //mp4文件是否重头循环读取 extern const string kFileRepeat; } //namespace Record diff --git a/src/Extension/Factory.cpp b/src/Extension/Factory.cpp index aee7de21..d65dbb56 100644 --- a/src/Extension/Factory.cpp +++ b/src/Extension/Factory.cpp @@ -41,8 +41,8 @@ Track::Ptr Factory::getTrackBySdp(const SdpTrack::Ptr &track) { aac_cfg_str = FindField(track->_fmtp.data(), "config=", ";"); } if (aac_cfg_str.empty()) { - //延后获取adts头 - return std::make_shared(); + //如果sdp中获取不到aac config信息,那么在rtp也无法获取,那么忽略该Track + return nullptr; } string aac_cfg; @@ -60,10 +60,8 @@ Track::Ptr Factory::getTrackBySdp(const SdpTrack::Ptr &track) { } if (strcasecmp(track->_codec.data(), "h264") == 0) { - auto map = Parser::parseArgs(track->_fmtp," ","="); - for(auto &pr : map){ - trim(pr.second," ;"); - } + //a=fmtp:96 packetization-mode=1;profile-level-id=42C01F;sprop-parameter-sets=Z0LAH9oBQBboQAAAAwBAAAAPI8YMqA==,aM48gA== + auto map = Parser::parseArgs(FindField(track->_fmtp.data()," ", nullptr),";","="); auto sps_pps = map["sprop-parameter-sets"]; if(sps_pps.empty()){ return std::make_shared(); @@ -77,10 +75,7 @@ Track::Ptr Factory::getTrackBySdp(const SdpTrack::Ptr &track) { if (strcasecmp(track->_codec.data(), "h265") == 0) { //a=fmtp:96 sprop-sps=QgEBAWAAAAMAsAAAAwAAAwBdoAKAgC0WNrkky/AIAAADAAgAAAMBlQg=; sprop-pps=RAHA8vA8kAA= - auto map = Parser::parseArgs(track->_fmtp," ","="); - for(auto &pr : map){ - trim(pr.second," ;"); - } + auto map = Parser::parseArgs(FindField(track->_fmtp.data()," ", nullptr),";","="); auto vps = decodeBase64(map["sprop-vps"]); auto sps = decodeBase64(map["sprop-sps"]); auto pps = decodeBase64(map["sprop-pps"]); diff --git a/src/Extension/H264.h b/src/Extension/H264.h index df14031d..394752ec 100644 --- a/src/Extension/H264.h +++ b/src/Extension/H264.h @@ -390,7 +390,7 @@ public: _printer << "m=video 0 RTP/AVP " << playload_type << "\r\n"; _printer << "b=AS:" << bitrate << "\r\n"; _printer << "a=rtpmap:" << playload_type << " H264/" << 90000 << "\r\n"; - _printer << "a=fmtp:" << playload_type << " packetization-mode=1;profile-level-id="; + _printer << "a=fmtp:" << playload_type << " packetization-mode=1; profile-level-id="; char strTemp[100]; uint32_t profile_level_id = 0; @@ -402,7 +402,7 @@ public: memset(strTemp, 0, 100); sprintf(strTemp, "%06X", profile_level_id); _printer << strTemp; - _printer << ";sprop-parameter-sets="; + _printer << "; sprop-parameter-sets="; memset(strTemp, 0, 100); av_base64_encode(strTemp, 100, (uint8_t *) strSPS.data(), strSPS.size()); _printer << strTemp << ","; diff --git a/src/Http/HttpBody.cpp b/src/Http/HttpBody.cpp new file mode 100644 index 00000000..9e6a0592 --- /dev/null +++ b/src/Http/HttpBody.cpp @@ -0,0 +1,257 @@ +/* + * MIT License + * + * Copyright (c) 2016-2019 xiongziliang <771730766@qq.com> + * + * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +#include "HttpBody.h" +#include "Util/util.h" +#include "Util/uv_errno.h" +#include "Util/logger.h" +#include "HttpClient.h" +#ifndef _WIN32 +#include +#endif + +#ifndef _WIN32 +#define ENABLE_MMAP +#endif + +namespace mediakit { + +HttpStringBody::HttpStringBody(const string &str){ + _str = str; +} +uint64_t HttpStringBody::remainSize() { + return _str.size() - _offset; +} + +Buffer::Ptr HttpStringBody::readData(uint32_t size) { + size = MIN(remainSize(),size); + if(!size){ + //没有剩余字节了 + return nullptr; + } + auto ret = std::make_shared(_str,_offset,size); + _offset += size; + return ret; +} + +////////////////////////////////////////////////////////////////// +HttpFileBody::HttpFileBody(const string &filePath){ + std::shared_ptr fp(fopen(filePath.data(), "rb"), [](FILE *fp) { + if(fp){ + fclose(fp); + } + }); + if(!fp){ + init(fp,0,0); + }else{ + init(fp,0,HttpMultiFormBody::fileSize(fp.get())); + } +} + +HttpFileBody::HttpFileBody(const std::shared_ptr &fp, uint64_t offset, uint64_t max_size) { + init(fp,offset,max_size); +} + +void HttpFileBody::init(const std::shared_ptr &fp,uint64_t offset,uint64_t max_size){ + _fp = fp; + _max_size = max_size; +#ifdef ENABLE_MMAP + do { + if(!_fp){ + //文件不存在 + break; + } + int fd = fileno(fp.get()); + if (fd < 0) { + WarnL << "fileno failed:" << get_uv_errmsg(false); + break; + } + auto ptr = (char *) mmap(NULL, max_size, PROT_READ, MAP_SHARED, fd, offset); + if (ptr == MAP_FAILED) { + WarnL << "mmap failed:" << get_uv_errmsg(false); + break; + } + _map_addr.reset(ptr,[max_size,fp](char *ptr){ + munmap(ptr,max_size); + }); + } while (false); +#endif + if(!_map_addr && offset && fp.get()){ + //未映射,那么fseek设置偏移量 + fseek(fp.get(), offset, SEEK_SET); + } +} + + +class BufferMmap : public Buffer{ +public: + typedef std::shared_ptr Ptr; + BufferMmap(const std::shared_ptr &map_addr,uint64_t offset,int size){ + _map_addr = map_addr; + _data = map_addr.get() + offset; + _size = size; + }; + virtual ~BufferMmap(){}; + //返回数据长度 + char *data() const override { + return _data; + } + uint32_t size() const override{ + return _size; + } +private: + std::shared_ptr _map_addr; + char *_data; + uint32_t _size; +}; + +uint64_t HttpFileBody::remainSize() { + return _max_size - _offset; +} + +Buffer::Ptr HttpFileBody::readData(uint32_t size) { + size = MIN(remainSize(),size); + if(!size){ + //没有剩余字节了 + return nullptr; + } + if(!_map_addr){ + //fread模式 + int iRead; + auto ret = _pool.obtain(); + ret->setCapacity(size + 1); + do{ + iRead = fread(ret->data(), 1, size, _fp.get()); + }while(-1 == iRead && UV_EINTR == get_uv_error(false)); + + if(iRead > 0){ + //读到数据了 + ret->setSize(iRead); + _offset += iRead; + return std::move(ret); + } + //读取文件异常,文件真实长度小于声明长度 + _offset = _max_size; + WarnL << "read file err:" << get_uv_errmsg(); + return nullptr; + } + + //mmap模式 + auto ret = std::make_shared(_map_addr,_offset,size); + _offset += size; + return std::move(ret); +} + +////////////////////////////////////////////////////////////////// +HttpMultiFormBody::HttpMultiFormBody(const HttpArgs &args,const string &filePath,const string &boundary){ + std::shared_ptr fp(fopen(filePath.data(), "rb"), [](FILE *fp) { + if(fp){ + fclose(fp); + } + }); + if(!fp){ + throw std::invalid_argument(StrPrinter << "open file failed:" << filePath << " " << get_uv_errmsg()); + } + _fileBody = std::make_shared(fp, 0, fileSize(fp.get())); + + auto fileName = filePath; + auto pos = filePath.rfind('/'); + if(pos != string::npos){ + fileName = filePath.substr(pos + 1); + } + _bodyPrefix = multiFormBodyPrefix(args,boundary,fileName); + _bodySuffix = multiFormBodySuffix(boundary); + _totalSize = _bodyPrefix.size() + _bodySuffix.size() + _fileBody->remainSize(); +} + +uint64_t HttpMultiFormBody::remainSize() { + return _totalSize - _offset; +} + +Buffer::Ptr HttpMultiFormBody::readData(uint32_t size){ + if(_bodyPrefix.size()){ + auto ret = std::make_shared(_bodyPrefix); + _offset += _bodyPrefix.size(); + _bodyPrefix.clear(); + return ret; + } + + if(_fileBody->remainSize()){ + auto ret = _fileBody->readData(size); + if(!ret){ + //读取文件出现异常,提前中断 + _offset = _totalSize; + }else{ + _offset += ret->size(); + } + return ret; + } + + if(_bodySuffix.size()){ + auto ret = std::make_shared(_bodySuffix); + _offset = _totalSize; + _bodySuffix.clear(); + return ret; + } + + return nullptr; +} + +string HttpMultiFormBody::multiFormBodySuffix(const string &boundary){ + string MPboundary = string("--") + boundary; + string endMPboundary = MPboundary + "--"; + _StrPrinter body; + body << "\r\n" << endMPboundary; + return body; +} + +uint64_t HttpMultiFormBody::fileSize(FILE *fp) { + auto current = ftell(fp); + fseek(fp,0L,SEEK_END); /* 定位到文件末尾 */ + auto end = ftell(fp); /* 得到文件大小 */ + fseek(fp,current,SEEK_SET); + return end - current; +} + +string HttpMultiFormBody::multiFormContentType(const string &boundary){ + return StrPrinter << "multipart/form-data; boundary=" << boundary; +} + +string HttpMultiFormBody::multiFormBodyPrefix(const HttpArgs &args,const string &boundary,const string &fileName){ + string MPboundary = string("--") + boundary; + _StrPrinter body; + for(auto &pr : args){ + body << MPboundary << "\r\n"; + body << "Content-Disposition: form-data; name=\"" << pr.first << "\"\r\n\r\n"; + body << pr.second << "\r\n"; + } + body << MPboundary << "\r\n"; + body << "Content-Disposition: form-data; name=\"" << "file" << "\";filename=\"" << fileName << "\"\r\n"; + body << "Content-Type: application/octet-stream\r\n\r\n" ; + return body; +} + +}//namespace mediakit diff --git a/src/Http/HttpBody.h b/src/Http/HttpBody.h new file mode 100644 index 00000000..d119cbc5 --- /dev/null +++ b/src/Http/HttpBody.h @@ -0,0 +1,145 @@ +/* + * MIT License + * + * Copyright (c) 2016-2019 xiongziliang <771730766@qq.com> + * + * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +#ifndef ZLMEDIAKIT_FILEREADER_H +#define ZLMEDIAKIT_FILEREADER_H + +#include +#include +#include "Network/Buffer.h" +#include "Util/ResourcePool.h" +#include "Util/logger.h" + +using namespace std; +using namespace toolkit; + +#ifndef MIN +#define MIN(a,b) ((a) < (b) ? (a) : (b) ) +#endif //MIN + +namespace mediakit { + +/** + * http content部分基类定义 + */ +class HttpBody{ +public: + typedef std::shared_ptr Ptr; + HttpBody(){} + virtual ~HttpBody(){} + + /** + * 剩余数据大小 + */ + virtual uint64_t remainSize() { return 0;}; + + /** + * 读取一定字节数,返回大小可能小于size + * @param size 请求大小 + * @return 字节对象 + */ + virtual Buffer::Ptr readData(uint32_t size) { return nullptr;}; +}; + +/** + * string类型的content + */ +class HttpStringBody : public HttpBody{ +public: + typedef std::shared_ptr Ptr; + HttpStringBody(const string &str); + virtual ~HttpStringBody(){} + uint64_t remainSize() override ; + Buffer::Ptr readData(uint32_t size) override ; +private: + mutable string _str; + uint64_t _offset = 0; +}; + +/** + * 文件类型的content + */ +class HttpFileBody : public HttpBody{ +public: + typedef std::shared_ptr Ptr; + + /** + * 构造函数 + * @param fp 文件句柄,文件的偏移量必须为0 + * @param offset 相对文件头的偏移量 + * @param max_size 最大读取字节数,未判断是否大于文件真实大小 + */ + HttpFileBody(const std::shared_ptr &fp,uint64_t offset,uint64_t max_size); + HttpFileBody(const string &file_path); + ~HttpFileBody(){}; + + uint64_t remainSize() override ; + Buffer::Ptr readData(uint32_t size) override; +private: + void init(const std::shared_ptr &fp,uint64_t offset,uint64_t max_size); +private: + std::shared_ptr _fp; + uint64_t _max_size; + uint64_t _offset = 0; + std::shared_ptr _map_addr; + ResourcePool _pool; +}; + +class HttpArgs; + +/** + * http MultiForm 方式提交的http content + */ +class HttpMultiFormBody : public HttpBody { +public: + typedef std::shared_ptr Ptr; + + /** + * 构造函数 + * @param args http提交参数列表 + * @param filePath 文件路径 + * @param boundary boundary字符串 + */ + HttpMultiFormBody(const HttpArgs &args,const string &filePath,const string &boundary = "0xKhTmLbOuNdArY"); + virtual ~HttpMultiFormBody(){} + uint64_t remainSize() override ; + Buffer::Ptr readData(uint32_t size) override; +public: + static string multiFormBodyPrefix(const HttpArgs &args,const string &boundary,const string &fileName); + static string multiFormBodySuffix(const string &boundary); + static uint64_t fileSize(FILE *fp); + static string multiFormContentType(const string &boundary); +private: + string _bodyPrefix; + string _bodySuffix; + uint64_t _offset = 0; + uint64_t _totalSize; + HttpFileBody::Ptr _fileBody; +}; + +}//namespace mediakit + +#endif //ZLMEDIAKIT_FILEREADER_H diff --git a/src/Http/HttpClient.cpp b/src/Http/HttpClient.cpp index d9eafee0..c7483b33 100644 --- a/src/Http/HttpClient.cpp +++ b/src/Http/HttpClient.cpp @@ -242,8 +242,9 @@ void HttpClient::onRecvContent(const char *data, uint64_t len) { void HttpClient::onFlush() { _aliveTicker.resetTime(); + GET_CONFIG(uint32_t,sendBufSize,Http::kSendBufSize); while (_body && _body->remainSize() && !isSocketBusy()) { - auto buffer = _body->readData(); + auto buffer = _body->readData(sendBufSize); if (!buffer) { //数据发送结束或读取数据异常 break; diff --git a/src/Http/HttpClient.h b/src/Http/HttpClient.h index 1f8287e5..59ab4629 100644 --- a/src/Http/HttpClient.h +++ b/src/Http/HttpClient.h @@ -39,7 +39,7 @@ #include "HttpCookie.h" #include "HttpChunkedSplitter.h" #include "strCoding.h" - +#include "HttpBody.h" using namespace std; using namespace toolkit; @@ -64,145 +64,6 @@ public: } }; -class HttpBody{ -public: - typedef std::shared_ptr Ptr; - HttpBody(){} - virtual ~HttpBody(){} - //剩余数据大小 - virtual uint64_t remainSize() = 0; - virtual Buffer::Ptr readData() = 0; -}; - -class HttpStringBody : public HttpBody{ -public: - typedef std::shared_ptr Ptr; - HttpStringBody(const string &str){ - _str = str; - } - virtual ~HttpStringBody(){} - - uint64_t remainSize() override { - return _str.size(); - } - Buffer::Ptr readData() override { - auto ret = std::make_shared(_str); - _str.clear(); - return ret; - } -private: - mutable string _str; -}; - - -class HttpMultiFormBody : public HttpBody { -public: - typedef std::shared_ptr Ptr; - template - HttpMultiFormBody(const MapType &args,const string &filePath,const string &boundary,uint32_t sliceSize = 4 * 1024){ - _fp = fopen(filePath.data(),"rb"); - if(!_fp){ - throw std::invalid_argument(StrPrinter << "打开文件失败:" << filePath << " " << get_uv_errmsg()); - } - auto fileName = filePath; - auto pos = filePath.rfind('/'); - if(pos != string::npos){ - fileName = filePath.substr(pos + 1); - } - _bodyPrefix = multiFormBodyPrefix(args,boundary,fileName); - _bodySuffix = multiFormBodySuffix(boundary); - _totalSize = _bodyPrefix.size() + _bodySuffix.size() + fileSize(_fp); - _sliceSize = sliceSize; - } - virtual ~HttpMultiFormBody(){ - fclose(_fp); - } - - uint64_t remainSize() override { - return _totalSize - _offset; - } - - Buffer::Ptr readData() override{ - if(_bodyPrefix.size()){ - auto ret = std::make_shared(_bodyPrefix); - _offset += _bodyPrefix.size(); - _bodyPrefix.clear(); - return ret; - } - - if(0 == feof(_fp)){ - auto ret = std::make_shared(_sliceSize); - //读文件 - int size; - do{ - size = fread(ret->data(),1,_sliceSize,_fp); - }while(-1 == size && UV_EINTR == get_uv_error(false)); - - if(size == -1){ - _offset = _totalSize; - WarnL << "fread failed:" << get_uv_errmsg(); - return nullptr; - } - _offset += size; - ret->setSize(size); - return ret; - } - - if(_bodySuffix.size()){ - auto ret = std::make_shared(_bodySuffix); - _offset = _totalSize; - _bodySuffix.clear(); - return ret; - } - - return nullptr; - } - -public: - template - static string multiFormBodyPrefix(const MapType &args,const string &boundary,const string &fileName){ - string MPboundary = string("--") + boundary; - _StrPrinter body; - for(auto &pr : args){ - body << MPboundary << "\r\n"; - body << "Content-Disposition: form-data; name=\"" << pr.first << "\"\r\n\r\n"; - body << pr.second << "\r\n"; - } - body << MPboundary << "\r\n"; - body << "Content-Disposition: form-data; name=\"" << "file" << "\";filename=\"" << fileName << "\"\r\n"; - body << "Content-Type: application/octet-stream\r\n\r\n" ; - return body; - } - static string multiFormBodySuffix(const string &boundary){ - string MPboundary = string("--") + boundary; - string endMPboundary = MPboundary + "--"; - _StrPrinter body; - body << "\r\n" << endMPboundary; - return body; - } - - static uint64_t fileSize(FILE *fp) { - auto current = ftell(fp); - fseek(fp,0L,SEEK_END); /* 定位到文件末尾 */ - auto end = ftell(fp); /* 得到文件大小 */ - fseek(fp,current,SEEK_SET); - return end - current; - } - - static string multiFormContentType(const string &boundary){ - return StrPrinter << "multipart/form-data; boundary=" << boundary; - } -private: - FILE *_fp; - string _bodyPrefix; - string _bodySuffix; - uint64_t _offset = 0; - uint64_t _totalSize; - uint32_t _sliceSize; -}; - - - class HttpClient : public TcpClient , public HttpRequestSplitter { public: diff --git a/src/Http/HttpSession.cpp b/src/Http/HttpSession.cpp index bd8686db..10d6b08d 100644 --- a/src/Http/HttpSession.cpp +++ b/src/Http/HttpSession.cpp @@ -59,49 +59,138 @@ string dateStr() { strftime(buf, sizeof buf, "%a, %b %d %Y %H:%M:%S GMT", gmtime(&tt)); return buf; } -static const char* -get_mime_type(const char* name) { - const char* dot; - dot = strrchr(name, '.'); - static HttpSession::KeyValue mapType; - static onceToken token([&]() { - mapType.emplace(".html","text/html"); - mapType.emplace(".htm","text/html"); - mapType.emplace(".mp4","video/mp4"); - mapType.emplace(".m3u8","application/vnd.apple.mpegurl"); - mapType.emplace(".jpg","image/jpeg"); - mapType.emplace(".jpeg","image/jpeg"); - mapType.emplace(".gif","image/gif"); - mapType.emplace(".png","image/png"); - mapType.emplace(".ico","image/x-icon"); - mapType.emplace(".css","text/css"); - mapType.emplace(".js","application/javascript"); - mapType.emplace(".au","audio/basic"); - mapType.emplace(".wav","audio/wav"); - mapType.emplace(".avi","video/x-msvideo"); - mapType.emplace(".mov","video/quicktime"); - mapType.emplace(".qt","video/quicktime"); - mapType.emplace(".mpeg","video/mpeg"); - mapType.emplace(".mpe","video/mpeg"); - mapType.emplace(".vrml","model/vrml"); - mapType.emplace(".wrl","model/vrml"); - mapType.emplace(".midi","audio/midi"); - mapType.emplace(".mid","audio/midi"); - mapType.emplace(".mp3","audio/mpeg"); - mapType.emplace(".ogg","application/ogg"); - mapType.emplace(".pac","application/x-ns-proxy-autoconfig"); - mapType.emplace(".flv","video/x-flv"); - }, nullptr); - if(!dot){ - return "text/plain"; - } - auto it = mapType.find(dot); - if (it == mapType.end()) { - return "text/plain"; - } - return it->second.data(); + +const char *HttpSession::get_mime_type(const char *name) { + const char *dot; + dot = strrchr(name, '.'); + static HttpSession::KeyValue mapType; + static onceToken token([&]() { + mapType.emplace(".html", "text/html"); + mapType.emplace(".htm", "text/html"); + mapType.emplace(".mp4", "video/mp4"); + mapType.emplace(".mkv", "video/x-matroska"); + mapType.emplace(".rmvb", "application/vnd.rn-realmedia"); + mapType.emplace(".rm", "application/vnd.rn-realmedia"); + mapType.emplace(".m3u8", "application/vnd.apple.mpegurl"); + mapType.emplace(".jpg", "image/jpeg"); + mapType.emplace(".jpeg", "image/jpeg"); + mapType.emplace(".gif", "image/gif"); + mapType.emplace(".png", "image/png"); + mapType.emplace(".ico", "image/x-icon"); + mapType.emplace(".css", "text/css"); + mapType.emplace(".js", "application/javascript"); + mapType.emplace(".au", "audio/basic"); + mapType.emplace(".wav", "audio/wav"); + mapType.emplace(".avi", "video/x-msvideo"); + mapType.emplace(".mov", "video/quicktime"); + mapType.emplace(".qt", "video/quicktime"); + mapType.emplace(".mpeg", "video/mpeg"); + mapType.emplace(".mpe", "video/mpeg"); + mapType.emplace(".vrml", "model/vrml"); + mapType.emplace(".wrl", "model/vrml"); + mapType.emplace(".midi", "audio/midi"); + mapType.emplace(".mid", "audio/midi"); + mapType.emplace(".mp3", "audio/mpeg"); + mapType.emplace(".ogg", "application/ogg"); + mapType.emplace(".pac", "application/x-ns-proxy-autoconfig"); + mapType.emplace(".flv", "video/x-flv"); + }, nullptr); + if (!dot) { + return "text/plain"; + } + auto it = mapType.find(dot); + if (it == mapType.end()) { + return "text/plain"; + } + return it->second.data(); } +//////////////////////////////////////////////////////////////////////////////////////////////////// + +void HttpResponseInvokerImp::operator()(const string &codeOut, const StrCaseMap &headerOut, const HttpBody::Ptr &body) const{ + if(_lambad){ + _lambad(codeOut,headerOut,body); + } +} + +void HttpResponseInvokerImp::operator()(const string &codeOut, const StrCaseMap &headerOut, const string &body) const{ + this->operator()(codeOut,headerOut,std::make_shared(body)); +} + +HttpResponseInvokerImp::HttpResponseInvokerImp(const HttpResponseInvokerImp::HttpResponseInvokerLambda0 &lambda){ + _lambad = lambda; +} + +HttpResponseInvokerImp::HttpResponseInvokerImp(const HttpResponseInvokerImp::HttpResponseInvokerLambda1 &lambda){ + if(!lambda){ + _lambad = nullptr; + return; + } + _lambad = [lambda](const string &codeOut, const StrCaseMap &headerOut, const HttpBody::Ptr &body){ + string str; + if(body && body->remainSize()){ + str = body->readData(body->remainSize())->toString(); + } + lambda(codeOut,headerOut,str); + }; +} + +void HttpResponseInvokerImp::responseFile(const StrCaseMap &requestHeader, + const StrCaseMap &responseHeader, + const string &filePath) const { + StrCaseMap &httpHeader = const_cast(responseHeader); + do { + std::shared_ptr fp(fopen(filePath.data(), "rb"), [](FILE *fp) { + if (fp) { + fclose(fp); + } + }); + if (!fp) { + //打开文件失败 + break; + } + + auto &strRange = const_cast(requestHeader)["Range"]; + int64_t iRangeStart = 0; + int64_t iRangeEnd = 0 ; + int64_t fileSize = HttpMultiFormBody::fileSize(fp.get()); + + const char *pcHttpResult = NULL; + if (strRange.size() == 0) { + //全部下载 + pcHttpResult = "200 OK"; + iRangeEnd = fileSize - 1; + } else { + //分节下载 + pcHttpResult = "206 Partial Content"; + iRangeStart = atoll(FindField(strRange.data(), "bytes=", "-").data()); + iRangeEnd = atoll(FindField(strRange.data(), "-", "\r\n").data()); + if (iRangeEnd == 0) { + iRangeEnd = fileSize - 1; + } + //分节下载返回Content-Range头 + httpHeader.emplace("Content-Range", StrPrinter << "bytes " << iRangeStart << "-" << iRangeEnd << "/" << fileSize << endl); + } + + //回复文件 + HttpBody::Ptr fileBody = std::make_shared(fp, iRangeStart, iRangeEnd - iRangeStart + 1); + (*this)(pcHttpResult, httpHeader, fileBody); + return; + }while(false); + + GET_CONFIG(string,notFound,Http::kNotFound); + GET_CONFIG(string,charSet,Http::kCharSet); + + auto strContentType = StrPrinter << "text/html; charset=" << charSet << endl; + httpHeader["Content-Type"] = strContentType; + (*this)("404 Not Found", httpHeader, notFound); +} + +HttpResponseInvokerImp::operator bool(){ + return _lambad.operator bool(); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////// HttpSession::HttpSession(const Socket::Ptr &pSock) : TcpSession(pSock) { TraceP(this); @@ -128,7 +217,7 @@ int64_t HttpSession::onRecvHeader(const char *header,uint64_t len) { string cmd = _parser.Method(); auto it = g_mapCmdIndex.find(cmd); if (it == g_mapCmdIndex.end()) { - sendResponse("403 Forbidden", makeHttpHeader(true), ""); + sendResponse("403 Forbidden", true); shutdown(SockException(Err_shutdown,StrPrinter << "403 Forbidden:" << cmd)); return 0; } @@ -169,21 +258,32 @@ void HttpSession::onRecv(const Buffer::Ptr &pBuf) { } void HttpSession::onError(const SockException& err) { + if(_is_flv_stream){ + //flv播放器 + WarnP(this) << "播放器(" + << _mediaInfo._vhost << "/" + << _mediaInfo._app << "/" + << _mediaInfo._streamid + << ")断开:" << err.what(); + + GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold); + if(_ui64TotalBytes > iFlowThreshold * 1024){ + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, + _mediaInfo, + _ui64TotalBytes, + _ticker.createdTime()/1000, + true, + *this); + } + return; + } + + //http客户端 if(_ticker.createdTime() < 10 * 1000){ TraceP(this) << err.what(); }else{ WarnP(this) << err.what(); } - - GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold); - if(_ui64TotalBytes > iFlowThreshold * 1024){ - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, - _mediaInfo, - _ui64TotalBytes, - _ticker.createdTime()/1000, - true, - *this); - } } void HttpSession::onManager() { @@ -202,7 +302,7 @@ bool HttpSession::checkWebSocket(){ } auto Sec_WebSocket_Accept = encodeBase64(SHA1::encode_bin(Sec_WebSocket_Key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11")); - KeyValue headerOut = makeHttpHeader(); + KeyValue headerOut; headerOut["Upgrade"] = "websocket"; headerOut["Connection"] = "Upgrade"; headerOut["Sec-WebSocket-Accept"] = Sec_WebSocket_Accept; @@ -212,7 +312,7 @@ bool HttpSession::checkWebSocket(){ auto res_cb = [this,headerOut](){ _flv_over_websocket = true; - sendResponse("101 Switching Protocols",headerOut,""); + sendResponse("101 Switching Protocols",false,nullptr,headerOut,nullptr,false); }; //判断是否为websocket-flv @@ -223,11 +323,11 @@ bool HttpSession::checkWebSocket(){ //如果checkLiveFlvStream返回false,则代表不是websocket-flv,而是普通的websocket连接 if(!onWebSocketConnect(_parser)){ - sendResponse("501 Not Implemented",headerOut,""); + sendResponse("501 Not Implemented",true, nullptr, headerOut); shutdown(SockException(Err_shutdown,"WebSocket server not implemented")); return true; } - sendResponse("101 Switching Protocols",headerOut,""); + sendResponse("101 Switching Protocols",false, nullptr,headerOut); return true; } //http-flv 链接格式:http://vhost-url:port/app/streamid.flv?key1=value1&key2=value2 @@ -274,14 +374,14 @@ bool HttpSession::checkLiveFlvStream(const function &cb){ auto onRes = [this,rtmp_src,cb](const string &err){ bool authSuccess = err.empty(); if(!authSuccess){ - sendResponse("401 Unauthorized", makeHttpHeader(true,err.size()),err); + sendResponse("401 Unauthorized", true, nullptr, KeyValue(), std::make_shared(err)); shutdown(SockException(Err_shutdown,StrPrinter << "401 Unauthorized:" << err)); return ; } if(!cb) { //找到rtmp源,发送http头,负载后续发送 - sendResponse("200 OK", makeHttpHeader(false, 0, get_mime_type(".flv")), ""); + sendResponse("200 OK", false, "video/x-flv",KeyValue(),nullptr,false); }else{ cb(); } @@ -291,6 +391,7 @@ bool HttpSession::checkLiveFlvStream(const function &cb){ try{ start(getPoller(),rtmp_src); + _is_flv_stream = true; }catch (std::exception &ex){ //该rtmp源不存在 shutdown(SockException(Err_shutdown,"rtmp mediasource released")); @@ -375,10 +476,7 @@ static bool checkHls(BroadcastHttpAccessArgs){ return NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed,args_copy,mediaAuthInvoker,sender); } -void HttpSession::canAccessPath(const string &path_in,bool is_dir,const function &callback_in){ - auto path = path_in; - replace(const_cast(path),"//","/"); - +void HttpSession::canAccessPath(const string &path,bool is_dir,const function &callback_in){ auto callback = [callback_in,this](const string &errMsg,const HttpServerCookie::Ptr &cookie){ try { callback_in(errMsg,cookie); @@ -488,16 +586,15 @@ void HttpSession::Handle_Req_GET(int64_t &content_len) { return; } - //先看看该http事件是否被拦截 if(emitHttpEvent(false)){ + //拦截http api事件 return; } - //再看看是否为http-flv直播请求 - if(checkLiveFlvStream()){ - //若是,return! - return; - } + if(checkLiveFlvStream()){ + //拦截http-flv播放器 + return; + } //事件未被拦截,则认为是http下载请求 auto fullUrl = string(HTTP_SCHEMA) + "://" + _parser["Host"] + _parser.FullUrl(); @@ -507,7 +604,7 @@ void HttpSession::Handle_Req_GET(int64_t &content_len) { GET_CONFIG(uint32_t,reqCnt,Http::kMaxReqCount); GET_CONFIG(bool,enableVhost,General::kEnableVhost); GET_CONFIG(string,rootPath,Http::kRootPath); - string strFile = enableVhost ? rootPath + "/" + _mediaInfo._vhost + _parser.Url() :rootPath + _parser.Url(); + auto strFile = File::absolutePath(enableVhost ? _mediaInfo._vhost + _parser.Url() : _parser.Url(),rootPath); bool bClose = (strcasecmp(_parser["Connection"].data(),"close") == 0) || ( ++_iReqCnt > reqCnt); do{ @@ -533,139 +630,38 @@ void HttpSession::Handle_Req_GET(int64_t &content_len) { if(!errMsg.empty()){ const_cast(strMeun) = errMsg; } - auto headerOut = makeHttpHeader(bClose,strMeun.size()); + KeyValue headerOut; if(cookie){ headerOut["Set-Cookie"] = cookie->getCookie((*cookie)[kCookiePathKey].get()); } - sendResponse(errMsg.empty() ? "200 OK" : "401 Unauthorized" , headerOut, strMeun); + sendResponse(errMsg.empty() ? "200 OK" : "401 Unauthorized" ,bClose, "text/html", headerOut, std::make_shared(strMeun)); throw SockException(bClose ? Err_shutdown : Err_success,"close connection after access folder"); }); return; } }while(0); - //访问的是文件 - struct stat tFileStat; - if (0 != stat(strFile.data(), &tFileStat)) { - //文件不存在 - sendNotFound(bClose); - throw SockException(bClose ? Err_shutdown : Err_success,"close connection after send 404 not found on file"); - } - //文件智能指针,防止退出时未关闭 - std::shared_ptr pFilePtr(fopen(strFile.data(), "rb"), [](FILE *pFile) { - if(pFile){ - fclose(pFile); - } - }); - - if (!pFilePtr) { - //打开文件失败 - sendNotFound(bClose); - throw SockException(bClose ? Err_shutdown : Err_success,"close connection after send 404 not found on open file failed"); - } - auto parser = _parser; //判断是否有权限访问该文件 - canAccessPath(_parser.Url(),false,[this,parser,tFileStat,pFilePtr,bClose,strFile](const string &errMsg,const HttpServerCookie::Ptr &cookie){ + canAccessPath(_parser.Url(),false,[this,parser,bClose,strFile](const string &errMsg,const HttpServerCookie::Ptr &cookie){ if(!errMsg.empty()){ - auto headerOut = makeHttpHeader(bClose,errMsg.size()); + KeyValue headerOut; if(cookie){ headerOut["Set-Cookie"] = cookie->getCookie((*cookie)[kCookiePathKey].get()); } - sendResponse("401 Unauthorized" , headerOut, errMsg); + sendResponse("401 Unauthorized" ,bClose, nullptr, headerOut, std::make_shared(errMsg)); throw SockException(bClose ? Err_shutdown : Err_success,"close connection after access file failed"); } - //判断是不是分节下载 - auto &strRange = parser["Range"]; - int64_t iRangeStart = 0, iRangeEnd = 0; - iRangeStart = atoll(FindField(strRange.data(), "bytes=", "-").data()); - iRangeEnd = atoll(FindField(strRange.data(), "-", "\r\n").data()); - if (iRangeEnd == 0) { - iRangeEnd = tFileStat.st_size - 1; - } - const char *pcHttpResult = NULL; - if (strRange.size() == 0) { - //全部下载 - pcHttpResult = "200 OK"; - } else { - //分节下载 - pcHttpResult = "206 Partial Content"; - fseek(pFilePtr.get(), iRangeStart, SEEK_SET); - } - auto httpHeader = makeHttpHeader(bClose, iRangeEnd - iRangeStart + 1, get_mime_type(strFile.data())); - if (strRange.size() != 0) { - //分节下载返回Content-Range头 - httpHeader.emplace("Content-Range",StrPrinter<<"bytes " << iRangeStart << "-" << iRangeEnd << "/" << tFileStat.st_size<< endl); - } - + KeyValue httpHeader; if(cookie){ httpHeader["Set-Cookie"] = cookie->getCookie((*cookie)[kCookiePathKey].get()); } - //先回复HTTP头部分 - sendResponse(pcHttpResult,httpHeader,""); - - if (iRangeEnd - iRangeStart < 0) { - //文件是空的! - throw SockException(bClose ? Err_shutdown : Err_success,"close connection after access file"); - } - //回复Content部分 - std::shared_ptr piLeft(new int64_t(iRangeEnd - iRangeStart + 1)); - GET_CONFIG(uint32_t,sendBufSize,Http::kSendBufSize); - - weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - auto onFlush = [pFilePtr,bClose,weakSelf,piLeft]() { - TimeTicker(); - auto strongSelf = weakSelf.lock(); - while(*piLeft && strongSelf){ - //更新超时定时器 - strongSelf->_ticker.resetTime(); - //从循环池获取一个内存片 - auto sendBuf = strongSelf->obtainBuffer(); - sendBuf->setCapacity(sendBufSize); - //本次需要读取文件字节数 - int64_t iReq = MIN(sendBufSize,*piLeft); - //读文件 - int iRead; - do{ - iRead = fread(sendBuf->data(), 1, iReq, pFilePtr.get()); - }while(-1 == iRead && UV_EINTR == get_uv_error(false)); - //文件剩余字节数 - *piLeft -= iRead; - - if (iRead < iReq || !*piLeft) { - //文件读完 - if(iRead>0) { - sendBuf->setSize(iRead); - strongSelf->send(sendBuf); - } - if(bClose) { - strongSelf->shutdown(SockException(Err_shutdown,"read file eof")); - } - return false; - } - //文件还未读完 - sendBuf->setSize(iRead); - int iSent = strongSelf->send(sendBuf); - if(iSent == -1) { - //套机制销毁 - return false; - } - if(strongSelf->isSocketBusy()){ - //套接字忙,那么停止继续写 - return true; - } - //继续写套接字 - } - return false; + HttpResponseInvoker invoker = [this,bClose,&strFile](const string &codeOut, const KeyValue &headerOut, const HttpBody::Ptr &body){ + sendResponse(codeOut.data(), bClose, get_mime_type(strFile.data()), headerOut, body); }; - - //文件下载提升发送性能 - setSocketFlags(); - - onFlush(); - _sock->setOnFlush(onFlush); + invoker.responseFile(parser.getValues(),httpHeader,strFile); }); } @@ -768,43 +764,137 @@ bool makeMeun(const string &httpPath,const string &strFullPath, string &strRet) return true; } -void HttpSession::sendResponse(const char* pcStatus, const KeyValue& header, const string& strContent) { - _StrPrinter printer; - printer << "HTTP/1.1 " << pcStatus << "\r\n"; - for (auto &pr : header) { - printer << pr.first << ": " << pr.second << "\r\n"; - } - printer << "\r\n" << strContent; - auto strSend = printer << endl; - send(strSend); - _ticker.resetTime(); -} -HttpSession::KeyValue HttpSession::makeHttpHeader(bool bClose, int64_t iContentSize,const char* pcContentType) { - KeyValue headerOut; +void HttpSession::sendResponse(const char *pcStatus, + bool bClose, + const char *pcContentType, + const HttpSession::KeyValue &header, + const HttpBody::Ptr &body, + bool set_content_len ){ + GET_CONFIG(string,charSet,Http::kCharSet); GET_CONFIG(uint32_t,keepAliveSec,Http::kKeepAliveSecond); GET_CONFIG(uint32_t,reqCnt,Http::kMaxReqCount); - headerOut.emplace("Date", dateStr()); - headerOut.emplace("Server", SERVER_NAME); - headerOut.emplace("Connection", bClose ? "close" : "keep-alive"); - if(!bClose){ - headerOut.emplace("Keep-Alive",StrPrinter << "timeout=" << keepAliveSec << ", max=" << reqCnt << endl); - } - if(pcContentType){ - auto strContentType = StrPrinter << pcContentType << "; charset=" << charSet << endl; - headerOut.emplace("Content-Type",strContentType); - } - if(iContentSize > 0){ - headerOut.emplace("Content-Length", StrPrinter<remainSize()) { + //有body,获取body大小 + size = body->remainSize(); + if (size >= INT64_MAX) { + //不固定长度的body,那么不设置content-length字段 + size = -1; + } + } + + if(!set_content_len || size == -1){ + //如果是不定长度body,或者不设置conten-length, + //那么一定是Keep-Alive类型 + bClose = false; + } + + HttpSession::KeyValue &headerOut = const_cast(header); + headerOut.emplace("Date", dateStr()); + headerOut.emplace("Server", SERVER_NAME); + headerOut.emplace("Connection", bClose ? "close" : "keep-alive"); + if(!bClose){ + headerOut.emplace("Keep-Alive",StrPrinter << "timeout=" << keepAliveSec << ", max=" << reqCnt << endl); + } if(!_origin.empty()){ + //设置跨域 headerOut.emplace("Access-Control-Allow-Origin",_origin); headerOut.emplace("Access-Control-Allow-Credentials", "true"); } - return headerOut; + + if(set_content_len && size >= 0){ + //文件长度为定值或者,且不是http-flv强制设置Content-Length + headerOut["Content-Length"] = StrPrinter << size << endl; + } + + if(size && !pcContentType){ + //有body时,设置缺省类型 + pcContentType = "text/plain"; + } + + if(size && pcContentType){ + //有body时,设置文件类型 + auto strContentType = StrPrinter << pcContentType << "; charset=" << charSet << endl; + headerOut.emplace("Content-Type",strContentType); + } + + //发送http头 + _StrPrinter printer; + printer << "HTTP/1.1 " << pcStatus << "\r\n"; + for (auto &pr : header) { + printer << pr.first << ": " << pr.second << "\r\n"; + } + + printer << "\r\n"; + send(printer << endl); + _ticker.resetTime(); + + if(!size){ + //没有body + if(bClose){ + shutdown(SockException(Err_shutdown,"close connection after send http header completed")); + } + return; + } + + //发送http body + GET_CONFIG(uint32_t,sendBufSize,Http::kSendBufSize); + weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); + + auto onFlush = [body,bClose,weakSelf]() { + auto strongSelf = weakSelf.lock(); + if(!strongSelf){ + //本对象已经销毁 + return false; + } + while(true){ + //更新超时计时器 + strongSelf->_ticker.resetTime(); + //读取文件 + auto sendBuf = body->readData(sendBufSize); + if (!sendBuf) { + //文件读完 + if(strongSelf->isSocketBusy() && bClose){ + //套接字忙,我们等待触发下一次onFlush事件 + //待所有数据flush到socket fd再移除onFlush事件监听 + //标记文件读写完毕 + return true; + } + //文件全部flush到socket fd,可以直接关闭socket了 + break; + } + + //文件还未读完 + if(strongSelf->send(sendBuf) == -1) { + //socket已经销毁,不再监听onFlush事件 + return false; + } + if(strongSelf->isSocketBusy()){ + //socket忙,那么停止继续写,等待下一次onFlush事件,然后再读文件写socket + return true; + } + //socket还可写,继续写socket + } + + if(bClose) { + //最后一次flush事件,文件也发送完毕了,可以关闭socket了 + strongSelf->shutdown(SockException(Err_shutdown,"close connection after send http body completed")); + } + //不再监听onFlush事件 + return false; + }; + + if(body->remainSize() > sendBufSize){ + //文件下载提升发送性能 + setSocketFlags(); + } + onFlush(); + _sock->setOnFlush(onFlush); } string HttpSession::urlDecode(const string &str){ @@ -833,20 +923,25 @@ bool HttpSession::emitHttpEvent(bool doInvoke){ bool bClose = (strcasecmp(_parser["Connection"].data(),"close") == 0) || ( ++_iReqCnt > reqCnt); /////////////////////异步回复Invoker/////////////////////////////// weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - HttpResponseInvoker invoker = [weakSelf,bClose](const string &codeOut, const KeyValue &headerOut, const string &contentOut){ + HttpResponseInvoker invoker = [weakSelf,bClose](const string &codeOut, const KeyValue &headerOut, const HttpBody::Ptr &body){ auto strongSelf = weakSelf.lock(); if(!strongSelf) { return; } - strongSelf->async([weakSelf,bClose,codeOut,headerOut,contentOut]() { + strongSelf->async([weakSelf,bClose,codeOut,headerOut,body]() { auto strongSelf = weakSelf.lock(); if(!strongSelf) { + //本对象已经销毁 return; } - strongSelf->responseDelay(bClose,codeOut,headerOut,contentOut); - if(bClose){ - strongSelf->shutdown(SockException(Err_shutdown,"Connection: close")); - } + + if(codeOut.empty()){ + //回调提供的参数异常 + strongSelf->sendNotFound(bClose); + return; + } + + strongSelf->sendResponse(codeOut.data(), bClose, nullptr, headerOut, body); }); }; ///////////////////广播HTTP事件/////////////////////////// @@ -854,7 +949,7 @@ bool HttpSession::emitHttpEvent(bool doInvoke){ NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastHttpRequest,_parser,invoker,consumed,*this); if(!consumed && doInvoke){ //该事件无人消费,所以返回404 - invoker("404 Not Found",KeyValue(),""); + invoker("404 Not Found",KeyValue(), HttpBody::Ptr()); if(bClose){ //close类型,回复完毕,关闭连接 shutdown(SockException(Err_shutdown,"404 Not Found")); @@ -935,25 +1030,10 @@ void HttpSession::Handle_Req_POST(int64_t &content_len) { } //有后续content数据要处理,暂时不关闭连接 } -void HttpSession::responseDelay(bool bClose, - const string &codeOut, - const KeyValue &headerOut, - const string &contentOut){ - if(codeOut.empty()){ - sendNotFound(bClose); - return; - } - auto headerOther = makeHttpHeader(bClose,contentOut.size(),"text/plain"); - for (auto &pr : headerOther){ - //添加默认http头,默认http头不能覆盖用户自定义的头 - const_cast(headerOut).emplace(pr.first,pr.second); - } - sendResponse(codeOut.data(), headerOut, contentOut); -} void HttpSession::sendNotFound(bool bClose) { GET_CONFIG(string,notFound,Http::kNotFound); - sendResponse("404 Not Found", makeHttpHeader(bClose, notFound.size()), notFound); + sendResponse("404 Not Found", bClose,"text/html",KeyValue(),std::make_shared(notFound)); } void HttpSession::setSocketFlags(){ diff --git a/src/Http/HttpSession.h b/src/Http/HttpSession.h index e6199668..98f234c3 100644 --- a/src/Http/HttpSession.h +++ b/src/Http/HttpSession.h @@ -36,21 +36,44 @@ #include "HttpRequestSplitter.h" #include "WebSocketSplitter.h" #include "HttpCookieManager.h" +#include "HttpBody.h" +#include "Util/function_traits.h" using namespace std; using namespace toolkit; namespace mediakit { +/** + * 该类实现与老代码的兼容适配 + */ +class HttpResponseInvokerImp{ +public: + typedef std::function HttpResponseInvokerLambda0; + typedef std::function HttpResponseInvokerLambda1; + + HttpResponseInvokerImp(){} + ~HttpResponseInvokerImp(){} + template + HttpResponseInvokerImp(const C &c):HttpResponseInvokerImp(typename function_traits::stl_function_type(c)) {} + HttpResponseInvokerImp(const HttpResponseInvokerLambda0 &lambda); + HttpResponseInvokerImp(const HttpResponseInvokerLambda1 &lambda); + + void operator()(const string &codeOut, const StrCaseMap &headerOut, const HttpBody::Ptr &body) const; + void operator()(const string &codeOut, const StrCaseMap &headerOut, const string &body) const; + void responseFile(const StrCaseMap &requestHeader,const StrCaseMap &responseHeader,const string &filePath) const; + operator bool(); +private: + HttpResponseInvokerLambda0 _lambad; +}; + class HttpSession: public TcpSession, public FlvMuxer, public HttpRequestSplitter, public WebSocketSplitter { public: typedef StrCaseMap KeyValue; - typedef std::function HttpResponseInvoker; + typedef HttpResponseInvokerImp HttpResponseInvoker; /** * @param errMsg 如果为空,则代表鉴权通过,否则为错误提示 @@ -67,6 +90,7 @@ public: virtual void onManager() override; static string urlDecode(const string &str); + static const char* get_mime_type(const char* name); protected: //FlvMuxer override void onWrite(const Buffer::Ptr &data) override ; @@ -118,13 +142,9 @@ private: bool emitHttpEvent(bool doInvoke); void urlDecode(Parser &parser); void sendNotFound(bool bClose); - void sendResponse(const char *pcStatus,const KeyValue &header,const string &strContent); - KeyValue makeHttpHeader(bool bClose=false,int64_t iContentSize=-1,const char *pcContentType="text/html"); - void responseDelay(bool bClose, - const string &codeOut, - const KeyValue &headerOut, - const string &contentOut); - + void sendResponse(const char *pcStatus, bool bClose, const char *pcContentType = nullptr, + const HttpSession::KeyValue &header = HttpSession::KeyValue(), + const HttpBody::Ptr &body = nullptr,bool set_content_len = true); /** * 判断http客户端是否有权限访问文件的逻辑步骤 * @@ -160,6 +180,7 @@ private: //处理content数据的callback function _contentCallBack; bool _flv_over_websocket = false; + bool _is_flv_stream = false; }; diff --git a/src/MediaFile/HlsMaker.cpp b/src/MediaFile/HlsMaker.cpp index 36f88c39..959fcc36 100644 --- a/src/MediaFile/HlsMaker.cpp +++ b/src/MediaFile/HlsMaker.cpp @@ -42,6 +42,7 @@ HlsMaker::~HlsMaker() { void HlsMaker::makeIndexFile(bool eof) { char file_content[1024]; int maxSegmentDuration = 0; + for (auto &tp : _seg_dur_list) { int dur = std::get<0>(tp); if (dur > maxSegmentDuration) { @@ -57,7 +58,7 @@ void HlsMaker::makeIndexFile(bool eof) { "#EXT-X-TARGETDURATION:%u\n" "#EXT-X-MEDIA-SEQUENCE:%llu\n", (maxSegmentDuration + 999) / 1000, - _file_index); + _seg_number ? _file_index : 0); m3u8.assign(file_content); @@ -75,11 +76,18 @@ void HlsMaker::makeIndexFile(bool eof) { void HlsMaker::inputData(void *data, uint32_t len, uint32_t timestamp) { - addNewFile(timestamp); - onWriteFile((char *) data, len); + //分片数据中断结束 + if (data && len) { + addNewSegment(timestamp); + onWriteSegment((char *) data, len); + //记录上次写入数据时间 + _ticker_last_data.resetTime(); + } else { + flushLastSegment(true); + } } -void HlsMaker::delOldFile() { +void HlsMaker::delOldSegment() { if(_seg_number == 0){ //如果设置为保留0个切片,则认为是保存为点播 return; @@ -91,22 +99,38 @@ void HlsMaker::delOldFile() { //但是实际保存的切片个数比m3u8所述多两个,这样做的目的是防止播放器在切片删除前能下载完毕 if (_file_index >= _seg_number + 4) { - onDelFile(_file_index - _seg_number - 4); + onDelSegment(_file_index - _seg_number - 4); } } -void HlsMaker::addNewFile(uint32_t) { - int stampInc = _ticker.elapsedTime(); - if (stampInc >= _seg_duration * 1000) { - _ticker.resetTime(); - auto file_name = onOpenFile(_file_index); - if (_file_index++ > 0) { - _seg_dur_list.push_back(std::make_tuple(stampInc, _last_file_name)); - delOldFile(); - makeIndexFile(); - } - _last_file_name = file_name; +void HlsMaker::addNewSegment(uint32_t) { + if(!_last_file_name.empty() && _ticker.elapsedTime() < _seg_duration * 1000){ + //存在上个切片,并且未到分片时间 + return; } + + //关闭并保存上一个切片 + flushLastSegment(); + //新增切片 + _last_file_name = onOpenSegment(_file_index++); + //重置切片计时器 + _ticker.resetTime(); +} + +void HlsMaker::flushLastSegment(bool eof){ + if(_last_file_name.empty()){ + //不存在上个切片 + return; + } + //文件创建到最后一次数据写入的时间即为切片长度 + auto seg_dur = _ticker.elapsedTime() - _ticker_last_data.elapsedTime(); + if(seg_dur <= 0){ + seg_dur = 100; + } + _seg_dur_list.push_back(std::make_tuple(seg_dur, _last_file_name)); + delOldSegment(); + makeIndexFile(eof); + _last_file_name.clear(); } }//namespace mediakit \ No newline at end of file diff --git a/src/MediaFile/HlsMaker.h b/src/MediaFile/HlsMaker.h index 0a1e9617..49635a60 100644 --- a/src/MediaFile/HlsMaker.h +++ b/src/MediaFile/HlsMaker.h @@ -60,20 +60,20 @@ protected: * @param index * @return */ - virtual string onOpenFile(int index) = 0; + virtual string onOpenSegment(int index) = 0; /** * 删除ts切片文件回调 * @param index */ - virtual void onDelFile(int index) = 0; + virtual void onDelSegment(int index) = 0; /** * 写ts切片文件回调 * @param data * @param len */ - virtual void onWriteFile(const char *data, int len) = 0; + virtual void onWriteSegment(const char *data, int len) = 0; /** * 写m3u8文件回调 @@ -82,19 +82,34 @@ protected: */ virtual void onWriteHls(const char *data, int len) = 0; + /** + * 关闭上个ts切片并且写入m3u8索引 + * @param eof + */ + void flushLastSegment(bool eof = false); +private: /** * 生成m3u8文件 * @param eof true代表点播 */ void makeIndexFile(bool eof = false); - void delOldFile(); - void addNewFile(uint32_t timestamp); -protected: - uint32_t _seg_number = 0; + + /** + * 删除旧的ts切片 + */ + void delOldSegment(); + + /** + * 添加新的ts切片 + * @param timestamp + */ + void addNewSegment(uint32_t timestamp); private: + uint32_t _seg_number = 0; float _seg_duration = 0; uint64_t _file_index = 0; Ticker _ticker; + Ticker _ticker_last_data; string _last_file_name; std::deque > _seg_dur_list; }; diff --git a/src/MediaFile/HlsMakerImp.cpp b/src/MediaFile/HlsMakerImp.cpp index 77763e30..8fa9d4fd 100644 --- a/src/MediaFile/HlsMakerImp.cpp +++ b/src/MediaFile/HlsMakerImp.cpp @@ -40,6 +40,7 @@ HlsMakerImp::HlsMakerImp(const string &m3u8_file, _path_hls = m3u8_file; _params = params; _buf_size = bufSize; + _is_vod = seg_number == 0; _file_buf.reset(new char[bufSize],[](char *ptr){ delete[] ptr; }); @@ -47,14 +48,14 @@ HlsMakerImp::HlsMakerImp(const string &m3u8_file, HlsMakerImp::~HlsMakerImp() { //录制完了 - makeIndexFile(true); - if(_seg_number){ + flushLastSegment(true); + if(!_is_vod){ //hls直播才删除文件 File::delete_file(_path_prefix.data()); } } -string HlsMakerImp::onOpenFile(int index) { +string HlsMakerImp::onOpenSegment(int index) { auto full_path = fullPath(index); _file = makeFile(full_path, true); if(!_file){ @@ -67,12 +68,12 @@ string HlsMakerImp::onOpenFile(int index) { return StrPrinter << index << ".ts" << "?" << _params; } -void HlsMakerImp::onDelFile(int index) { +void HlsMakerImp::onDelSegment(int index) { //WarnL << index; File::delete_file(fullPath(index).data()); } -void HlsMakerImp::onWriteFile(const char *data, int len) { +void HlsMakerImp::onWriteSegment(const char *data, int len) { if (_file) { fwrite(data, len, 1, _file.get()); } diff --git a/src/MediaFile/HlsMakerImp.h b/src/MediaFile/HlsMakerImp.h index d04de216..55637fd8 100644 --- a/src/MediaFile/HlsMakerImp.h +++ b/src/MediaFile/HlsMakerImp.h @@ -44,9 +44,9 @@ public: uint32_t seg_number = 3); virtual ~HlsMakerImp(); protected: - string onOpenFile(int index) override ; - void onDelFile(int index) override; - void onWriteFile(const char *data, int len) override; + string onOpenSegment(int index) override ; + void onDelSegment(int index) override; + void onWriteSegment(const char *data, int len) override; void onWriteHls(const char *data, int len) override; private: string fullPath(int index); @@ -58,6 +58,8 @@ private: string _path_hls; string _params; int _buf_size; + //是否为点播 + bool _is_vod; }; }//namespace mediakit diff --git a/src/MediaFile/HlsRecorder.h b/src/MediaFile/HlsRecorder.h index 7b876ae0..ca20c5f0 100644 --- a/src/MediaFile/HlsRecorder.h +++ b/src/MediaFile/HlsRecorder.h @@ -32,7 +32,7 @@ namespace mediakit { -class HlsRecorder : public HlsMakerImp , public TsMuxer { +class HlsRecorder : public HlsMakerImp, public TsMuxer { public: template HlsRecorder(ArgsType &&...args):HlsMakerImp(std::forward(args)...){} diff --git a/src/MediaFile/MP4Muxer.cpp b/src/MediaFile/MP4Muxer.cpp index a8a38ab4..c1b5bad3 100644 --- a/src/MediaFile/MP4Muxer.cpp +++ b/src/MediaFile/MP4Muxer.cpp @@ -33,11 +33,11 @@ namespace mediakit{ #if defined(_WIN32) || defined(_WIN64) -#define fseek64 _fseeki64 -#define ftell64 _ftelli64 + #define fseek64 _fseeki64 + #define ftell64 _ftelli64 #else -#define fseek64 fseek -#define ftell64 ftell + #define fseek64 fseek + #define ftell64 ftell #endif void MP4MuxerBase::init(int flags) { @@ -236,7 +236,9 @@ MP4MuxerFile::MP4MuxerFile(const char *file) { fclose(fp); }); - init(MOV_FLAG_FASTSTART); + GET_CONFIG(bool, mp4FastStart, Record::kFastStart); + + init(mp4FastStart ? MOV_FLAG_FASTSTART : 0); } MP4MuxerFile::~MP4MuxerFile() { @@ -254,15 +256,6 @@ int MP4MuxerFile::onWrite(const void *data, uint64_t bytes) { return bytes == fwrite(data, 1, bytes, _file.get()) ? 0 : ferror(_file.get()); } - -#if defined(_WIN32) || defined(_WIN64) - #define fseek64 _fseeki64 - #define ftell64 _ftelli64 -#else - #define fseek64 fseek - #define ftell64 ftell -#endif - int MP4MuxerFile::onSeek(uint64_t offset) { return fseek64(_file.get(), offset, SEEK_SET); } diff --git a/src/MediaFile/MP4Recorder.cpp b/src/MediaFile/MP4Recorder.cpp index 658243c9..0f36561c 100644 --- a/src/MediaFile/MP4Recorder.cpp +++ b/src/MediaFile/MP4Recorder.cpp @@ -153,6 +153,14 @@ void MP4Recorder::onTrackReady(const Track::Ptr & track){ } } +void MP4Recorder::resetTracks() { + closeFile(); + _tracks.clear(); + _haveVideo = false; + _createFileTicker.resetTime(); + MediaSink::resetTracks(); +} + } /* namespace mediakit */ diff --git a/src/MediaFile/MP4Recorder.h b/src/MediaFile/MP4Recorder.h index ca97cb3f..0c619461 100644 --- a/src/MediaFile/MP4Recorder.h +++ b/src/MediaFile/MP4Recorder.h @@ -63,6 +63,11 @@ public: const string &strApp, const string &strStreamId); virtual ~MP4Recorder(); + + /** + * 重置所有Track + */ + void resetTracks() override; private: /** * 某Track输出frame,在onAllTrackReady触发后才会调用此方法 diff --git a/src/MediaFile/MediaReader.cpp b/src/MediaFile/MediaReader.cpp index 22683b4e..eae430bd 100644 --- a/src/MediaFile/MediaReader.cpp +++ b/src/MediaFile/MediaReader.cpp @@ -44,10 +44,11 @@ MediaReader::MediaReader(const string &strVhost,const string &strApp, const stri GET_CONFIG(string,recordPath,Record::kFilePath); GET_CONFIG(bool,enableVhost,General::kEnableVhost); if(enableVhost){ - strFileName = recordPath + "/" + strVhost + "/" + strApp + "/" + strId; + strFileName = strVhost + "/" + strApp + "/" + strId; }else{ - strFileName = recordPath + "/" + strApp + "/" + strId; + strFileName = strApp + "/" + strId; } + strFileName = File::absolutePath(strFileName,recordPath); } _hMP4File = MP4Read(strFileName.data()); diff --git a/src/MediaFile/MediaRecorder.cpp b/src/MediaFile/MediaRecorder.cpp index 24309dfa..e422b6e5 100644 --- a/src/MediaFile/MediaRecorder.cpp +++ b/src/MediaFile/MediaRecorder.cpp @@ -56,13 +56,15 @@ MediaRecorder::MediaRecorder(const string &strVhost_tmp, #if defined(ENABLE_HLS) if(enableHls) { string m3u8FilePath; + string params; if(enableVhost){ - m3u8FilePath = hlsPath + "/" + strVhost + "/" + strApp + "/" + strId + "/hls.m3u8"; - _hlsRecorder.reset(new HlsRecorder(m3u8FilePath,string(VHOST_KEY) + "=" + strVhost ,hlsBufSize, hlsDuration, hlsNum)); + m3u8FilePath = strVhost + "/" + strApp + "/" + strId + "/hls.m3u8"; + params = string(VHOST_KEY) + "=" + strVhost; }else{ - m3u8FilePath = hlsPath + "/" + strApp + "/" + strId + "/hls.m3u8"; - _hlsRecorder.reset(new HlsRecorder(m3u8FilePath,"",hlsBufSize, hlsDuration, hlsNum)); + m3u8FilePath = strApp + "/" + strId + "/hls.m3u8"; } + m3u8FilePath = File::absolutePath(m3u8FilePath,hlsPath); + _hlsRecorder.reset(new HlsRecorder(m3u8FilePath,params,hlsBufSize, hlsDuration, hlsNum)); } #endif //defined(ENABLE_HLS) @@ -73,10 +75,11 @@ MediaRecorder::MediaRecorder(const string &strVhost_tmp, if(enableMp4){ string mp4FilePath; if(enableVhost){ - mp4FilePath = recordPath + "/" + strVhost + "/" + recordAppName + "/" + strApp + "/" + strId + "/"; + mp4FilePath = strVhost + "/" + recordAppName + "/" + strApp + "/" + strId + "/"; } else { - mp4FilePath = recordPath + "/" + recordAppName + "/" + strApp + "/" + strId + "/"; + mp4FilePath = recordAppName + "/" + strApp + "/" + strId + "/"; } + mp4FilePath = File::absolutePath(mp4FilePath,recordPath); _mp4Recorder.reset(new MP4Recorder(mp4FilePath,strVhost,strApp,strId)); } #endif //defined(ENABLE_MP4RECORD) @@ -113,4 +116,18 @@ void MediaRecorder::addTrack(const Track::Ptr &track) { #endif //defined(ENABLE_MP4RECORD) } +void MediaRecorder::resetTracks() { +#if defined(ENABLE_HLS) + if (_hlsRecorder) { + _hlsRecorder->resetTracks(); + } +#endif //defined(ENABLE_HLS) + +#if defined(ENABLE_MP4RECORD) + if (_mp4Recorder) { + _mp4Recorder->resetTracks(); + } +#endif //defined(ENABLE_MP4RECORD) +} + } /* namespace mediakit */ diff --git a/src/MediaFile/MediaRecorder.h b/src/MediaFile/MediaRecorder.h index 70ff0093..c36f8613 100644 --- a/src/MediaFile/MediaRecorder.h +++ b/src/MediaFile/MediaRecorder.h @@ -51,14 +51,19 @@ public: * 输入frame * @param frame */ - void inputFrame(const Frame::Ptr &frame) override ; + void inputFrame(const Frame::Ptr &frame) override; /** * 添加track,内部会调用Track的clone方法 * 只会克隆sps pps这些信息 ,而不会克隆Delegate相关关系 * @param track */ - void addTrack(const Track::Ptr & track) override; + void addTrack(const Track::Ptr &track) override; + + /** + * 重置track + */ + void resetTracks() override; private: #if defined(ENABLE_HLS) std::shared_ptr _hlsRecorder; diff --git a/src/MediaFile/Stamp.cpp b/src/MediaFile/Stamp.cpp index cf89e6e4..ab77a1bf 100644 --- a/src/MediaFile/Stamp.cpp +++ b/src/MediaFile/Stamp.cpp @@ -28,48 +28,48 @@ namespace mediakit { -void Stamp::revise(uint32_t dts, uint32_t pts, int64_t &dts_out, int64_t &pts_out) { +int64_t DeltaStamp::deltaStamp(int64_t stamp) { + if(!_last_stamp){ + //第一次计算时间戳增量,时间戳增量为0 + _last_stamp = stamp; + return 0; + } + + int64_t ret = stamp - _last_stamp; + if(ret >= 0){ + //时间戳增量为正,返回之 + _last_stamp = stamp; + return ret; + } + + //时间戳增量为负,说明时间戳回环了或回退了 + _last_stamp = stamp; + return _playback ? ret : 0; +} + +void DeltaStamp::setPlayBack(bool playback) { + _playback = playback; +} + +void Stamp::revise(int64_t dts, int64_t pts, int64_t &dts_out, int64_t &pts_out,bool modifyStamp) { if(!pts){ //没有播放时间戳,使其赋值为解码时间戳 pts = dts; } + //pts和dts的差值 int pts_dts_diff = pts - dts; - if(_first){ - //记录第一次时间戳,后面好计算时间戳增量 - _start_dts = dts; - _first = false; - _ticker.resetTime(); - } - if (!dts) { - //没有解码时间戳,我们生成解码时间戳 - dts = _ticker.elapsedTime(); - } - //相对时间戳 - dts_out = dts - _start_dts; - if(dts_out < _dts_inc && !_playback){ - //本次相对时间戳竟然小于上次? - if(dts_out < 0 || _dts_inc - dts_out > 0xFFFF){ - //时间戳回环,保证下次相对时间戳与本次相对合理增长 - _start_dts = dts - _dts_inc; - //本次时间戳强制等于上次时间戳 - dts_out = _dts_inc; - }else{ - //时间戳变小了?,那么取上次时间戳 - dts_out = _dts_inc; - } - } - - //保留这次相对时间戳,以便下次对比是否回环或乱序 - _dts_inc = dts_out; + _relativeStamp += deltaStamp(modifyStamp ? _ticker.elapsedTime() : dts); + dts_out = _relativeStamp; //////////////以下是播放时间戳的计算////////////////// if(pts_dts_diff > 200 || pts_dts_diff < -200){ //如果差值大于200毫秒,则认为由于回环导致时间戳错乱了 pts_dts_diff = 0; } + pts_out = dts_out + pts_dts_diff; if(pts_out < 0){ //时间戳不能小于0 @@ -77,8 +77,13 @@ void Stamp::revise(uint32_t dts, uint32_t pts, int64_t &dts_out, int64_t &pts_ou } } -void Stamp::setPlayBack(bool playback) { - _playback = playback; +void Stamp::setRelativeStamp(int64_t relativeStamp) { + _relativeStamp = relativeStamp; } +int64_t Stamp::getRelativeStamp() const { + return _relativeStamp; +} + + }//namespace mediakit \ No newline at end of file diff --git a/src/MediaFile/Stamp.h b/src/MediaFile/Stamp.h index 8616f8a7..82458fda 100644 --- a/src/MediaFile/Stamp.h +++ b/src/MediaFile/Stamp.h @@ -33,18 +33,33 @@ using namespace toolkit; namespace mediakit { -//该类解决时间戳回环、回退问题 -//计算相对时间戳或者产生平滑时间戳 -class Stamp { +class DeltaStamp{ public: - Stamp() = default; - ~Stamp() = default; + DeltaStamp() = default; + ~DeltaStamp() = default; /** - * 设置回放模式,回放模式时间戳可以回退 + * 计算时间戳增量 + * @param stamp 绝对时间戳 + * @return 时间戳增量 + */ + int64_t deltaStamp(int64_t stamp); + + /** + * 设置是否为回放模式,回放模式运行时间戳回退 * @param playback 是否为回放模式 */ void setPlayBack(bool playback = true); +private: + int64_t _last_stamp = 0; + bool _playback = false; +}; +//该类解决时间戳回环、回退问题 +//计算相对时间戳或者产生平滑时间戳 +class Stamp : public DeltaStamp{ +public: + Stamp() = default; + ~Stamp() = default; /** * 修正时间戳 @@ -52,13 +67,23 @@ public: * @param pts 输入pts,如果为0则等于dts * @param dts_out 输出dts * @param pts_out 输出pts + * @param modifyStamp 是否用系统时间戳覆盖 */ - void revise(uint32_t dts, uint32_t pts, int64_t &dts_out, int64_t &pts_out); + void revise(int64_t dts, int64_t pts, int64_t &dts_out, int64_t &pts_out,bool modifyStamp = false); + + /** + * 再设置相对时间戳,用于seek用 + * @param relativeStamp 相对时间戳 + */ + void setRelativeStamp(int64_t relativeStamp); + + /** + * 获取当前相对时间戳 + * @return + */ + int64_t getRelativeStamp() const ; private: - bool _playback = false; - int64_t _start_dts = 0; - int64_t _dts_inc = 0; - bool _first = true; + int64_t _relativeStamp = 0; SmoothTicker _ticker; }; diff --git a/src/MediaFile/TsMuxer.cpp b/src/MediaFile/TsMuxer.cpp index f0d1092a..fe63624b 100644 --- a/src/MediaFile/TsMuxer.cpp +++ b/src/MediaFile/TsMuxer.cpp @@ -101,6 +101,8 @@ void TsMuxer::inputFrame(const Frame::Ptr &frame) { } void TsMuxer::resetTracks() { + //通知片段中断 + onTs(nullptr, 0, 0, 0); uninit(); init(); } diff --git a/src/MediaFile/TsMuxer.h b/src/MediaFile/TsMuxer.h index b4ea0fab..a876c8c3 100644 --- a/src/MediaFile/TsMuxer.h +++ b/src/MediaFile/TsMuxer.h @@ -43,10 +43,10 @@ public: TsMuxer(); virtual ~TsMuxer(); void addTrack(const Track::Ptr &track) override; - void inputFrame(const Frame::Ptr &frame) override; + void resetTracks() override; + void inputFrame(const Frame::Ptr &frame) override; protected: virtual void onTs(const void *packet, int bytes,uint32_t timestamp,int flags) = 0; - void resetTracks(); private: void init(); void uninit(); diff --git a/src/Player/PlayerProxy.cpp b/src/Player/PlayerProxy.cpp index 5b821d85..075394c9 100644 --- a/src/Player/PlayerProxy.cpp +++ b/src/Player/PlayerProxy.cpp @@ -122,7 +122,13 @@ void PlayerProxy::play(const string &strUrlTmp) { for (auto & track : tracks){ track->delDelegate(strongSelf->_mediaMuxer.get()); } - strongSelf->_mediaMuxer.reset(); + + GET_CONFIG(bool,resetWhenRePlay,General::kResetWhenRePlay); + if (resetWhenRePlay) { + strongSelf->_mediaMuxer.reset(); + } else { + strongSelf->_mediaMuxer->resetTracks(); + } } //播放异常中断,延时重试播放 if(*piFailedCnt < strongSelf->_iRetryCount || strongSelf->_iRetryCount < 0) { @@ -138,7 +144,7 @@ void PlayerProxy::play(const string &strUrlTmp) { if(directProxy && _bEnableRtsp){ mediaSource = std::make_shared(_strVhost,_strApp,_strSrc); } - }else if(dynamic_pointer_cast(_parser)){ + } else if(dynamic_pointer_cast(_parser)){ //rtmp拉流 if(_bEnableRtmp){ mediaSource = std::make_shared(_strVhost,_strApp,_strSrc); @@ -154,7 +160,7 @@ PlayerProxy::~PlayerProxy() { _timer.reset(); } void PlayerProxy::rePlay(const string &strUrl,int iFailedCnt){ - auto iDelay = MAX(2 * 1000, MIN(iFailedCnt * 3000,60*1000)); + auto iDelay = MAX(2 * 1000, MIN(iFailedCnt * 3000, 60*1000)); weak_ptr weakSelf = shared_from_this(); _timer = std::make_shared(iDelay / 1000.0f,[weakSelf,strUrl,iFailedCnt]() { //播放失败次数越多,则延时越长 @@ -213,7 +219,7 @@ public: if(_iAudioIndex != iAudioIndex){ _iAudioIndex = iAudioIndex; auto aacFrame = std::make_shared((char *)MUTE_ADTS_DATA, - MUTE_ADTS_DATA_LEN, + MUTE_ADTS_DATA_LEN, _iAudioIndex * MUTE_ADTS_DATA_MS); FrameRingInterfaceDelegate::inputFrame(aacFrame); } @@ -224,15 +230,22 @@ private: }; void PlayerProxy::onPlaySuccess() { + GET_CONFIG(bool,resetWhenRePlay,General::kResetWhenRePlay); if (dynamic_pointer_cast(_pMediaSrc)) { //rtsp拉流代理 - _mediaMuxer.reset(new MultiMediaSourceMuxer(_strVhost, _strApp, _strSrc, getDuration(), false, _bEnableRtmp, _bEnableHls, _bEnableMp4)); + if (resetWhenRePlay || !_mediaMuxer) { + _mediaMuxer.reset(new MultiMediaSourceMuxer(_strVhost, _strApp, _strSrc, getDuration(), false, _bEnableRtmp, _bEnableHls, _bEnableMp4)); + } } else if (dynamic_pointer_cast(_pMediaSrc)) { //rtmp拉流代理 - _mediaMuxer.reset(new MultiMediaSourceMuxer(_strVhost, _strApp, _strSrc, getDuration(), _bEnableRtsp, false, _bEnableHls, _bEnableMp4)); + if (resetWhenRePlay || !_mediaMuxer) { + _mediaMuxer.reset(new MultiMediaSourceMuxer(_strVhost, _strApp, _strSrc, getDuration(), _bEnableRtsp, false, _bEnableHls, _bEnableMp4)); + } } else { //其他拉流代理 - _mediaMuxer.reset(new MultiMediaSourceMuxer(_strVhost, _strApp, _strSrc, getDuration(), _bEnableRtsp, _bEnableRtmp, _bEnableHls, _bEnableMp4)); + if (resetWhenRePlay || !_mediaMuxer) { + _mediaMuxer.reset(new MultiMediaSourceMuxer(_strVhost, _strApp, _strSrc, getDuration(), _bEnableRtsp, _bEnableRtmp, _bEnableHls, _bEnableMp4)); + } } _mediaMuxer->setListener(shared_from_this()); @@ -244,13 +257,16 @@ void PlayerProxy::onPlaySuccess() { videoTrack->addDelegate(_mediaMuxer); } + //是否添加静音音频 + GET_CONFIG(bool,addMuteAudio,General::kAddMuteAudio); + auto audioTrack = getTrack(TrackAudio, false); if(audioTrack){ //添加音频 _mediaMuxer->addTrack(audioTrack); //音频数据写入_mediaMuxer audioTrack->addDelegate(_mediaMuxer); - }else if(videoTrack){ + }else if(addMuteAudio && videoTrack){ //没有音频信息,产生一个静音音频 MuteAudioMaker::Ptr audioMaker = std::make_shared(); //videoTrack把数据写入MuteAudioMaker diff --git a/src/Rtmp/RtmpSession.cpp b/src/Rtmp/RtmpSession.cpp index 3b4a1663..62787b1d 100644 --- a/src/Rtmp/RtmpSession.cpp +++ b/src/Rtmp/RtmpSession.cpp @@ -44,13 +44,17 @@ RtmpSession::~RtmpSession() { } void RtmpSession::onError(const SockException& err) { - WarnP(this) << err.what(); + bool isPlayer = !_pPublisherSrc; + WarnP(this) << (isPlayer ? "播放器(" : "推流器(") + << _mediaInfo._vhost << "/" + << _mediaInfo._app << "/" + << _mediaInfo._streamid + << ")断开:" << err.what(); //流量统计事件广播 GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold); if(_ui64TotalBytes > iFlowThreshold * 1024){ - bool isPlayer = !_pPublisherSrc; NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _mediaInfo, _ui64TotalBytes, @@ -486,7 +490,7 @@ void RtmpSession::onRtmpChunk(RtmpPacket &chunkData) { GET_CONFIG(bool,rtmp_modify_stamp,Rtmp::kModifyStamp); if(rtmp_modify_stamp){ int64_t dts_out; - _stamp[chunkData.typeId % 2].revise(0, 0, dts_out, dts_out); + _stamp[chunkData.typeId % 2].revise(0, 0, dts_out, dts_out, true); chunkData.timeStamp = dts_out; } if(!_metadata_got && !chunkData.isCfgFrame()){ diff --git a/src/Rtsp/RtspPlayer.cpp b/src/Rtsp/RtspPlayer.cpp index 126135fe..bd3b83b9 100644 --- a/src/Rtsp/RtspPlayer.cpp +++ b/src/Rtsp/RtspPlayer.cpp @@ -68,35 +68,43 @@ void RtspPlayer::teardown(){ CLEAR_ARR(_aui64RtpRecv) CLEAR_ARR(_aui64RtpRecv) CLEAR_ARR(_aui16NowSeq) - CLEAR_ARR(_aiFistStamp); - CLEAR_ARR(_aiNowStamp); - _pPlayTimer.reset(); + _pPlayTimer.reset(); _pRtpTimer.reset(); - _iSeekTo = 0; _uiCseq = 1; _onHandshake = nullptr; } void RtspPlayer::play(const string &strUrl){ - auto userAndPwd = FindField(strUrl.data(),"://","@"); Rtsp::eRtpType eType = (Rtsp::eRtpType)(int)(*this)[kRtpType]; - if(userAndPwd.empty()){ - play(strUrl,"","",eType); - return; + auto schema = FindField(strUrl.data(), nullptr,"://"); + bool isSSL = strcasecmp(schema.data(),"rtsps") == 0; + //查找"://"与"/"之间的字符串,用于提取用户名密码 + auto middle_url = FindField(strUrl.data(),"://","/"); + if(middle_url.empty()){ + middle_url = FindField(strUrl.data(),"://", nullptr); } - auto suffix = FindField(strUrl.data(),"@",nullptr); + auto pos = middle_url.rfind('@'); + if(pos == string::npos){ + //并没有用户名密码 + play(isSSL,strUrl,"","",eType); + return; + } + + //包含用户名密码 + auto user_pwd = middle_url.substr(0,pos); + auto suffix = strUrl.substr(schema.size() + 3 + pos + 1); auto url = StrPrinter << "rtsp://" << suffix << endl; - if(userAndPwd.find(":") == string::npos){ - play(url,userAndPwd,"",eType); + if(user_pwd.find(":") == string::npos){ + play(isSSL,url,user_pwd,"",eType); return; } - auto user = FindField(userAndPwd.data(),nullptr,":"); - auto pwd = FindField(userAndPwd.data(),":",nullptr); - play(url,user,pwd,eType); + auto user = FindField(user_pwd.data(),nullptr,":"); + auto pwd = FindField(user_pwd.data(),":",nullptr); + play(isSSL,url,user,pwd,eType); } -//播放,指定是否走rtp over tcp -void RtspPlayer::play(const string &strUrl, const string &strUser, const string &strPwd, Rtsp::eRtpType eType ) { + +void RtspPlayer::play(bool isSSL,const string &strUrl, const string &strUser, const string &strPwd, Rtsp::eRtpType eType ) { DebugL << strUrl << " " << (strUser.size() ? strUser : "null") << " " << (strPwd.size() ? strPwd:"null") << " " @@ -115,12 +123,12 @@ void RtspPlayer::play(const string &strUrl, const string &strUser, const string auto ip = FindField(strUrl.data(), "://", "/"); if (!ip.size()) { - ip = FindField(strUrl.data(), "://", NULL); + ip = split(FindField(strUrl.data(), "://", NULL),"?")[0]; } auto port = atoi(FindField(ip.data(), ":", NULL).data()); if (port <= 0) { //rtsp 默认端口554 - port = 554; + port = isSSL ? 322 : 554; } else { //服务器域名 ip = FindField(ip.data(), NULL, ":"); @@ -222,6 +230,16 @@ void RtspPlayer::handleResDESCRIBE(const Parser& parser) { SdpParser sdpParser(parser.Content()); //解析sdp _aTrackInfo = sdpParser.getAvailableTrack(); + auto title = sdpParser.getTrack(TrackTitle); + bool isPlayback = false; + if(title && title->_duration ){ + isPlayback = true; + } + + for(auto &stamp : _stamp){ + stamp.setPlayBack(isPlayback); + stamp.setRelativeStamp(0); + } if (_aTrackInfo.empty()) { throw std::runtime_error("无有效的Sdp Track"); @@ -386,7 +404,7 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex) } //所有setup命令发送完毕 //发送play命令 - pause(false); + sendPause(false, 0,false); } void RtspPlayer::sendOptions() { @@ -403,25 +421,19 @@ void RtspPlayer::sendDescribe() { } -void RtspPlayer::sendPause(bool bPause,uint32_t seekMS){ - if(!bPause){ - //修改时间轴 - int iTimeInc = seekMS - getProgressMilliSecond(); - for(unsigned int i = 0 ;i < _aTrackInfo.size() ;i++){ - _aiFistStamp[i] = _aiNowStamp[i] + iTimeInc; - _aiNowStamp[i] = _aiFistStamp[i]; - } - _iSeekTo = seekMS; - } - +void RtspPlayer::sendPause(bool bPause,uint32_t seekMS,bool range){ //开启或暂停rtsp _onHandshake = std::bind(&RtspPlayer::handleResPAUSE,this, placeholders::_1,bPause); - sendRtspRequest(bPause ? "PAUSE" : "PLAY", - _strContentBase, - {"Range",StrPrinter << "npt=" << setiosflags(ios::fixed) << setprecision(2) << seekMS / 1000.0 << "-"}); + if(!bPause && range){ + sendRtspRequest(bPause ? "PAUSE" : "PLAY", _strContentBase, + {"Range",StrPrinter << "npt=" << setiosflags(ios::fixed) << setprecision(2) << seekMS / 1000.0 << "-"}); + } else{ + sendRtspRequest(bPause ? "PAUSE" : "PLAY", _strContentBase); + } + } void RtspPlayer::pause(bool bPause) { - sendPause(bPause, getProgressMilliSecond()); + sendPause(bPause, getProgressMilliSecond(),false); } void RtspPlayer::handleResPAUSE(const Parser& parser, bool bPause) { @@ -430,6 +442,7 @@ void RtspPlayer::handleResPAUSE(const Parser& parser, bool bPause) { return; } if (!bPause) { + uint32_t iSeekTo = 0; //修正时间轴 auto strRange = parser["Range"]; if (strRange.size()) { @@ -437,25 +450,12 @@ void RtspPlayer::handleResPAUSE(const Parser& parser, bool bPause) { if (strStart == "now") { strStart = "0"; } - _iSeekTo = 1000 * atof(strStart.data()); - DebugL << "seekTo(ms):" << _iSeekTo ; - } - auto strRtpInfo = parser["RTP-Info"]; - if (strRtpInfo.size()) { - strRtpInfo.append(","); - vector vec = split(strRtpInfo, ","); - for(auto &strTrack : vec){ - strTrack.append(";"); - auto strControlSuffix = strTrack.substr(1 + strTrack.rfind('/'),strTrack.find(';') - strTrack.rfind('/') - 1); - auto strRtpTime = FindField(strTrack.data(), "rtptime=", ";"); - auto idx = getTrackIndexByControlSuffix(strControlSuffix); - if(idx != -1){ - _aiFistStamp[idx] = _aTrackInfo[idx]->_samplerate>0?atoll(strRtpTime.data()) * 1000 / _aTrackInfo[idx]->_samplerate :1; - _aiNowStamp[idx] = _aiFistStamp[idx]; - DebugL << "rtptime(ms):" << strControlSuffix <<" " << strRtpTime; - } - } + iSeekTo = 1000 * atof(strStart.data()); + DebugL << "seekTo(ms):" << iSeekTo ; } + //设置相对时间戳 + _stamp[0].setRelativeStamp(iSeekTo); + _stamp[1].setRelativeStamp(iSeekTo); onPlayResult_l(SockException(Err_success, "rtsp play success")); } else { _pRtpTimer.reset(); @@ -630,12 +630,11 @@ void RtspPlayer::onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx){ } _aui64RtpRecv[trackidx] ++; _aui16NowSeq[trackidx] = rtppt->sequence; - _aiNowStamp[trackidx] = rtppt->timeStamp; - if( _aiFistStamp[trackidx] == 0){ - _aiFistStamp[trackidx] = _aiNowStamp[trackidx]; - } - rtppt->timeStamp -= _aiFistStamp[trackidx]; + //计算相对时间戳 + int64_t dts_out; + _stamp[trackidx].revise(rtppt->timeStamp,rtppt->timeStamp,dts_out,dts_out); + rtppt->timeStamp = dts_out; onRecvRTP_l(rtppt,_aTrackInfo[trackidx]); } float RtspPlayer::getPacketLossRate(TrackType type) const{ @@ -653,7 +652,6 @@ float RtspPlayer::getPacketLossRate(TrackType type) const{ return 1.0 - (double)totalRecv / totalSend; } - if(_aui16NowSeq[iTrackIdx] - _aui16FirstSeq[iTrackIdx] + 1 == 0){ return 0; } @@ -661,14 +659,10 @@ float RtspPlayer::getPacketLossRate(TrackType type) const{ } uint32_t RtspPlayer::getProgressMilliSecond() const{ - uint32_t iTime[2] = {0,0}; - for(unsigned int i = 0 ;i < _aTrackInfo.size() ;i++){ - iTime[i] = _aiNowStamp[i] - _aiFistStamp[i]; - } - return _iSeekTo + MAX(iTime[0],iTime[1]); + return MAX(_stamp[0].getRelativeStamp(),_stamp[1].getRelativeStamp()); } void RtspPlayer::seekToMilliSecond(uint32_t ms) { - sendPause(false,ms); + sendPause(false,ms, true); } void RtspPlayer::sendRtspRequest(const string &cmd, const string &url, const std::initializer_list &header) { diff --git a/src/Rtsp/RtspPlayer.h b/src/Rtsp/RtspPlayer.h index 5977539b..be170953 100644 --- a/src/Rtsp/RtspPlayer.h +++ b/src/Rtsp/RtspPlayer.h @@ -40,6 +40,7 @@ #include "Network/TcpClient.h" #include "RtspSplitter.h" #include "RtpReceiver.h" +#include "MediaFile/Stamp.h" using namespace std; using namespace toolkit; @@ -106,7 +107,7 @@ private: int getTrackIndexByInterleaved(int interleaved) const; int getTrackIndexByTrackType(TrackType trackType) const; - void play(const string &strUrl, const string &strUser, const string &strPwd, Rtsp::eRtpType eType); + void play(bool isSSL,const string &strUrl, const string &strUser, const string &strPwd, Rtsp::eRtpType eType); void handleResSETUP(const Parser &parser, unsigned int uiTrackIndex); void handleResDESCRIBE(const Parser &parser); bool handleAuthenticationFailure(const string &wwwAuthenticateParamsStr); @@ -114,7 +115,7 @@ private: //发送SETUP命令 void sendSetup(unsigned int uiTrackIndex); - void sendPause(bool bPause,uint32_t ms); + void sendPause(bool bPause,uint32_t ms, bool range); void sendOptions(); void sendDescribe(); @@ -148,12 +149,8 @@ private: std::shared_ptr _pPlayTimer; std::shared_ptr _pRtpTimer; - //播放进度控制,单位毫秒 - uint32_t _iSeekTo = 0; - - //单位毫秒 - uint32_t _aiFistStamp[2] = {0,0}; - uint32_t _aiNowStamp[2] = {0,0}; + //时间戳 + Stamp _stamp[2]; //rtcp相关 RtcpCounter _aRtcpCnt[2]; //rtcp统计,trackid idx 为数组下标 diff --git a/src/Rtsp/RtspSession.cpp b/src/Rtsp/RtspSession.cpp index aa10aa84..3985ebec 100644 --- a/src/Rtsp/RtspSession.cpp +++ b/src/Rtsp/RtspSession.cpp @@ -85,7 +85,13 @@ RtspSession::~RtspSession() { } void RtspSession::onError(const SockException& err) { - WarnP(this) << err.what(); + bool isPlayer = !_pushSrc; + WarnP(this) << (isPlayer ? "播放器(" : "推流器(") + << _mediaInfo._vhost << "/" + << _mediaInfo._app << "/" + << _mediaInfo._streamid + << ")断开:" << err.what(); + if (_rtpType == Rtsp::RTP_MULTICAST) { //取消UDP端口监听 UDPServer::Instance().stopListenPeer(get_peer_ip().data(), this); @@ -100,7 +106,6 @@ void RtspSession::onError(const SockException& err) { //流量统计事件广播 GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold); if(_ui64TotalBytes > iFlowThreshold * 1024){ - bool isPlayer = !_pushSrc; NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _mediaInfo, _ui64TotalBytes, @@ -932,7 +937,7 @@ void RtspSession::onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx) { GET_CONFIG(bool,modify_stamp,Rtsp::kModifyStamp); if(modify_stamp){ int64_t dts_out; - _stamp[trackidx].revise(0, 0, dts_out, dts_out); + _stamp[trackidx].revise(0, 0, dts_out, dts_out, true); rtppt->timeStamp = dts_out; } _pushSrc->onWrite(rtppt, false); diff --git a/tests/test_player.cpp b/tests/test_player.cpp index 626c3858..8fd33159 100644 --- a/tests/test_player.cpp +++ b/tests/test_player.cpp @@ -101,6 +101,7 @@ int main(int argc, char *argv[]) { #endif + static char *url = argv[1]; //设置退出信号处理函数 signal(SIGINT, [](int) { SDLDisplayerHelper::Instance().shutdown(); }); //设置日志 @@ -140,7 +141,7 @@ int main(int argc, char *argv[]) { decoder.set(); } if(!displayer){ - displayer.set(); + displayer.set(nullptr,url); } if(!merger){ merger.set();