diff --git a/.gitignore b/.gitignore index 61eb6410..ba302fa1 100644 --- a/.gitignore +++ b/.gitignore @@ -45,3 +45,151 @@ /build/ /3rdpart/media-server/.idea/ /ios/ + +### VisualStudioCode template +.vscode/* +!.vscode/settings.json +!.vscode/tasks.json +!.vscode/launch.json +!.vscode/extensions.json +*.code-workspace + +# Windows Installer files +*.cab +*.msi +*.msix +*.msm +*.msp + +# Windows shortcuts +*.lnk + +# Shared objects (inc. Windows DLLs) +*.dll +*.so +*.so.* +*.dylib + +# Executables +*.exe +*.out +*.app +*.i*86 +*.x86_64 +*.hex + +# Debug files +*.dSYM/ +*.su +*.idb +*.pdb + +# Kernel Module Compile Results +*.mod* +*.cmd +.tmp_versions/ +modules.order +Module.symvers +Mkfile.old +dkms.conf + +### Xcode template +# Xcode +# +# gitignore contributors: remember to update Global/Xcode.gitignore, Objective-C.gitignore & Swift.gitignore + +## User settings +xcuserdata/ + +## compatibility with Xcode 8 and earlier (ignoring not required starting Xcode 9) +*.xcscmblueprint +*.xccheckout + +## compatibility with Xcode 3 and earlier (ignoring not required starting Xcode 4) +build/ +DerivedData/ +*.moved-aside +*.pbxuser +!default.pbxuser +*.mode1v3 +!default.mode1v3 +*.mode2v3 +!default.mode2v3 +*.perspectivev3 +!default.perspectivev3 + +## Gcc Patch +/*.gcno + +### CMake template +CMakeLists.txt.user +CMakeCache.txt +CMakeFiles +CMakeScripts +Testing +Makefile +cmake_install.cmake +install_manifest.txt +compile_commands.json +CTestTestfile.cmake +_deps + + +# User-specific stuff +.idea/ +.idea/**/workspace.xml +.idea/**/tasks.xml +.idea/**/usage.statistics.xml +.idea/**/dictionaries +.idea/**/shelf + +# Generated files +.idea/**/contentModel.xml + +# Sensitive or high-churn files +.idea/**/dataSources/ +.idea/**/dataSources.ids +.idea/**/dataSources.local.xml +.idea/**/sqlDataSources.xml +.idea/**/dynamic.xml +.idea/**/uiDesigner.xml +.idea/**/dbnavigator.xml + +# Gradle +.idea/**/gradle.xml +.idea/**/libraries + +# Gradle and Maven with auto-import +# When using Gradle or Maven with auto-import, you should exclude module files, +# since they will be recreated, and may cause churn. Uncomment if using +# auto-import. +# .idea/artifacts +# .idea/compiler.xml +# .idea/jarRepositories.xml +# .idea/modules.xml +# .idea/*.iml +# .idea/modules +# *.iml +# *.ipr + +# CMake +cmake-build-*/ + +# Mongo Explorer plugin +.idea/**/mongoSettings.xml + +# File-based project format +*.iws + +# IntelliJ +out/ + +# mpeltonen/sbt-idea plugin +.idea_modules/ + +# JIRA plugin +atlassian-ide-plugin.xml + +# Cursive Clojure plugin +.idea/replstate.xml + diff --git a/conf/config.ini b/conf/config.ini index 72ca1b43..b4c54b73 100644 --- a/conf/config.ini +++ b/conf/config.ini @@ -101,6 +101,8 @@ on_stream_not_found=https://127.0.0.1/index/hook/on_stream_not_found on_server_started=https://127.0.0.1/index/hook/on_server_started #hook api最大等待回复时间,单位秒 timeoutSec=10 +on_record_hls=https://127.0.0.1/index/hook/on_record_hls +on_proxy_pusher_failed=https://127.0.0.1/index/hook/on_proxy_pusher_failed [http] #http服务器字符编码,windows上默认gb2312 diff --git a/server/WebApi.cpp b/server/WebApi.cpp index 0064a462..db39fa06 100644 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -19,6 +19,8 @@ #include "Util/logger.h" #include "Util/onceToken.h" #include "Util/NoticeCenter.h" +#include "Pusher/MediaPusher.h" + #ifdef ENABLE_MYSQL #include "Util/SqlPool.h" #endif //ENABLE_MYSQL @@ -37,6 +39,8 @@ #if defined(ENABLE_RTPPROXY) #include "Rtp/RtpServer.h" #endif +#include "Util/base64.h" + using namespace Json; using namespace toolkit; using namespace mediakit; @@ -251,6 +255,10 @@ bool checkArgs(Args &&args,First &&first,KeyTypes && ...keys){ static unordered_map s_proxyMap; static recursive_mutex s_proxyMapMtx; +//推流代理器列表 +static unordered_map s_proxyPusherMap; +static recursive_mutex s_proxyPusherMapMtx; + //FFmpeg拉流代理器列表 static unordered_map s_ffmpegMap; static recursive_mutex s_ffmpegMapMtx; @@ -592,6 +600,114 @@ void installWebApi() { val["count_hit"] = (Json::UInt64)count_hit; }); + static auto addStreamPusherProxy = [](const string &schema, + const string &vhost, + const string &app, + const string &stream, + const string &url, + const function &cb){ + auto key = getProxyKey(vhost,app,stream); + lock_guard lck(s_proxyPusherMapMtx); + if(s_proxyPusherMap.find(key) != s_proxyPusherMap.end()){ + //已经在推流了 + WarnL << "the " << key << "is already pusher."; + cb(SockException(Err_success),key); + return; + } + + auto poller = EventPollerPool::Instance().getPoller(); + + //添加推流代理 + MediaPusher::Ptr pusher(new MediaPusher(schema,vhost, app, stream,poller)); + s_proxyPusherMap[key] = pusher; + + //设置推流中断处理逻辑 + pusher->setOnShutdown([poller,schema,vhost, app, stream, url, cb, key](const SockException &ex) { + WarnL << "Server connection is closed:" << ex.getErrCode() << " " << ex.what(); + //重试 + //rePushDelay(poller,schema,vhost,app, stream, url); + lock_guard lck(s_proxyPusherMapMtx); + s_proxyPusherMap.erase(key); + cb(ex, key); + + ProxyPusherInfo info; + info.key = key; + info.proxy_pusher_url = url; + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcaseProxyPusherFailed, info); + }); + //设置发布结果处理逻辑 + pusher->setOnPublished([poller,schema,vhost, app, stream, url, cb, key](const SockException &ex) { + if (ex) { + WarnL << "Publish fail:" << ex.getErrCode() << " " << ex.what(); + lock_guard lck(s_proxyPusherMapMtx); + s_proxyPusherMap.erase(key); + //如果发布失败,就重试 + // rePushDelay(poller,schema,vhost,app, stream, url); + + //上报失败事件,由业务决定是否重推 + ProxyPusherInfo info; + info.key = key; + info.proxy_pusher_url = url; + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcaseProxyPusherFailed, info); + } + cb(ex, key); + }); + pusher->publish(url); + }; + + //动态添加rtsp/rtmp推流代理 + //测试url http://127.0.0.1/index/api/addStreamPusherProxy?schema=rtmp&vhost=__defaultVhost__&app=proxy&stream=0&dst_url=rtmp://127.0.0.1/live/obs + api_regist2("/index/api/addStreamPusherProxy", [](API_ARGS2) { + CHECK_SECRET(); + CHECK_ARGS("schema","vhost","app","stream"); + + InfoL << allArgs["schema"] << ", " << allArgs["vhost"] << ", " << allArgs["app"] << ", " << allArgs["stream"]; + + //查找源 + auto src = MediaSource::find(allArgs["schema"], + allArgs["vhost"], + allArgs["app"], + allArgs["stream"]); + if(!src){ + InfoL << "addStreamPusherProxy, canont find source stream!"; + const_cast(val)["code"] = API::OtherFailed; + const_cast(val)["msg"] = "can not find the source stream"; + invoker("200 OK", headerOut, val.toStyledString()); + return; + } + + std::string srcUrl = allArgs["schema"] + "://" + "127.0.0.1" + "/" + allArgs["app"] + "/" + allArgs["stream"]; + std::string pushUrl = decodeBase64(allArgs["dst_url"]); + InfoL << "addStreamPusherProxy, find stream: " << srcUrl << ", push dst url: " << pushUrl; + + addStreamPusherProxy(allArgs["schema"], + allArgs["vhost"], + allArgs["app"], + allArgs["stream"], + pushUrl, + [invoker,val,headerOut, pushUrl](const SockException &ex,const string &key){ + if(ex){ + const_cast(val)["code"] = API::OtherFailed; + const_cast(val)["msg"] = ex.what(); + InfoL << "Publish error url: " << pushUrl; + }else{ + const_cast(val)["data"]["key"] = key; + InfoL << "Publish success,Please play with player:" << pushUrl; + } + invoker("200 OK", headerOut, val.toStyledString()); + }); + + }); + + //关闭推流代理 + //测试url http://127.0.0.1/index/api/delStreamPusherProxy?key=__defaultVhost__/proxy/0 + api_regist1("/index/api/delStreamPusherProxy",[](API_ARGS1){ + CHECK_SECRET(); + CHECK_ARGS("key"); + lock_guard lck(s_proxyPusherMapMtx); + val["data"]["flag"] = s_proxyPusherMap.erase(allArgs["key"]) == 1; + }); + static auto addStreamProxy = [](const string &vhost, const string &app, const string &stream, @@ -632,6 +748,26 @@ void installWebApi() { player->play(url); }; + api_regist1("/index/api/getSourceStreamInfo",[](API_ARGS1){ + CHECK_SECRET(); + CHECK_ARGS("schema","vhost","app","stream"); + + InfoL << "getSourceStreamInfo: " << allArgs["schema"] << ", " << allArgs["vhost"] << ", " << allArgs["app"] << ", " << allArgs["stream"]; + + //查找源 + auto src = MediaSource::find(allArgs["schema"], + allArgs["vhost"], + allArgs["app"], + allArgs["stream"]); + + if(!src){ + val["exist"] = false; + return; + } + + val["exist"] = true; + }); + //动态添加rtsp/rtmp拉流代理 //测试url http://127.0.0.1/index/api/addStreamProxy?vhost=__defaultVhost__&app=proxy&enable_rtsp=1&enable_rtmp=1&stream=0&url=rtmp://127.0.0.1/live/obs api_regist2("/index/api/addStreamProxy",[](API_ARGS2){ @@ -1091,6 +1227,10 @@ void installWebApi() { //录制mp4分片完毕事件 }); + api_regist1("/index/hook/on_record_hls",[](API_ARGS1){ + //录制hls分片完毕事件 + }); + api_regist1("/index/hook/on_shell_login",[](API_ARGS1){ //shell登录调试事件 }); diff --git a/server/WebApi.h b/server/WebApi.h index 028a5445..cebe2b2a 100644 --- a/server/WebApi.h +++ b/server/WebApi.h @@ -34,4 +34,10 @@ void unInstallWebApi(); //配置文件路径 extern string g_ini_file; +class ProxyPusherInfo { +public: + string key; //流id的key + string proxy_pusher_url;//转推地址 +}; + #endif //ZLMEDIAKIT_WEBAPI_H diff --git a/server/WebHook.cpp b/server/WebHook.cpp index 05943403..ec82e974 100644 --- a/server/WebHook.cpp +++ b/server/WebHook.cpp @@ -24,6 +24,8 @@ #include "Http/HttpSession.h" #include "WebHook.h" #include "Record/MP4Recorder.h" +#include "WebApi.h" +#include "Util/base64.h" using namespace Json; using namespace toolkit; @@ -58,6 +60,8 @@ const string kOnStreamNoneReader = HOOK_FIELD"on_stream_none_reader"; const string kOnHttpAccess = HOOK_FIELD"on_http_access"; const string kOnServerStarted = HOOK_FIELD"on_server_started"; const string kAdminParams = HOOK_FIELD"admin_params"; +const string kOnRecordHls = HOOK_FIELD"on_record_hls"; +const string kOnProxyPusherFailed = HOOK_FIELD"on_proxy_pusher_failed"; onceToken token([](){ mINI::Instance()[kEnable] = false; @@ -75,10 +79,11 @@ onceToken token([](){ mINI::Instance()[kOnHttpAccess] = "https://127.0.0.1/index/hook/on_http_access"; mINI::Instance()[kOnServerStarted] = "https://127.0.0.1/index/hook/on_server_started"; mINI::Instance()[kAdminParams] = "secret=035c73f7-bb6b-4889-a715-d9eb2d1925cc"; + mINI::Instance()[kOnRecordHls] = "https://127.0.0.1/index/hook/on_record_hls"; + mINI::Instance()[kOnProxyPusherFailed] = "https://127.0.0.1/index/hook/on_proxy_pusher_failed"; },nullptr); }//namespace Hook - static void parse_http_response(const SockException &ex, const string &status, const HttpClient::HttpHeader &header, @@ -193,6 +198,37 @@ void installWebHook(){ GET_CONFIG(string,hook_shell_login,Hook::kOnShellLogin); GET_CONFIG(string,hook_stream_none_reader,Hook::kOnStreamNoneReader); GET_CONFIG(string,hook_http_access,Hook::kOnHttpAccess); + GET_CONFIG(string,hook_record_hls,Hook::kOnRecordHls); + GET_CONFIG(string,hook_proxy_pusher_failed, Hook::kOnProxyPusherFailed); + + //录制hls文件成功后广播 + NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastRecordHls,[](BroadcastRecordHlsArgs){ + if(!hook_enable || hook_record_hls.empty()){ + return; + } + ArgsType body; + body["file_path"] = info.strFilePath; + body["app"] = info.strAppName; + body["stream"] = info.strStreamId; + body["start_time"] = (Json::UInt64)info.ui64StartedTime; + body["time_len"] = (Json::UInt64)info.ui64TimeLen; + + //执行hook + do_http_hook(hook_record_hls,body, nullptr); + }); + + //转推流失败后广播 + NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcaseProxyPusherFailed, [](BroadcaseProxyPusherFailedArgs){ + if(!hook_enable || hook_proxy_pusher_failed.empty()){ + return; + } + ArgsType body; + body["key"] = info.key; + body["proxy_pusher_url"] = encodeBase64(info.proxy_pusher_url); + + //执行hook + do_http_hook(hook_proxy_pusher_failed, body, nullptr); + }); NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastMediaPublish,[](BroadcastMediaPublishArgs){ GET_CONFIG(bool,toHls,General::kPublishToHls); diff --git a/src/Common/MediaSource.cpp b/src/Common/MediaSource.cpp index 84cc2fac..b19b1735 100644 --- a/src/Common/MediaSource.cpp +++ b/src/Common/MediaSource.cpp @@ -219,8 +219,9 @@ static MediaSource::Ptr find_l(const string &schema, const string &vhost_in, con }); } - if(!ret && create_new){ + if(!ret && create_new && schema != HLS_SCHEMA){ //未查找媒体源,则创建一个 + //播放hls不触发mp4点播(因为HLS也可以用于录像,不是纯粹的直播) ret = MediaSource::createFromMP4(schema, vhost, app, id); } return ret; diff --git a/src/Common/MultiMediaSourceMuxer.cpp b/src/Common/MultiMediaSourceMuxer.cpp index 0bbe8959..2d9ab082 100644 --- a/src/Common/MultiMediaSourceMuxer.cpp +++ b/src/Common/MultiMediaSourceMuxer.cpp @@ -109,6 +109,25 @@ bool MultiMuxerPrivate::setupRecord(MediaSource &sender, Recorder::type type, bo } return true; } + case Recorder::type_hls_record : { + if (start && !_hls_record) { + //开始录制 + auto hls_record = dynamic_pointer_cast(makeRecorder(getTracks(true), type, custom_path, sender)); + if (hls_record) { + //设置HlsMediaSource的事件监听器 + InfoL << "find record hls ms "; + hls_record->setListener(_listener); + //hls_src->setTrackSource(shared_from_this()); + } + _hls_record = hls_record; + } else if (!start && _hls_record) { + //停止录制 + InfoL << "stop record hls"; + _hls_record = nullptr; + } + + return true; + } default : return false; } } @@ -120,6 +139,8 @@ bool MultiMuxerPrivate::isRecording(MediaSource &sender, Recorder::type type){ return _hls ? true : false; case Recorder::type_mp4 : return _mp4 ? true : false; + case Recorder::type_hls_record : + return _hls_record ? true : false; default: return false; } @@ -155,6 +176,11 @@ void MultiMuxerPrivate::onTrackReady(const Track::Ptr &track) { if (mp4) { mp4->addTrack(track); } + + auto rhls = _hls_record; + if (rhls) { + rhls->addTrack(track); + } } bool MultiMuxerPrivate::isEnabled(){ @@ -181,6 +207,11 @@ void MultiMuxerPrivate::onTrackFrame(const Frame::Ptr &frame) { if (mp4) { mp4->inputFrame(frame); } + + auto rhls = _hls_record; + if (rhls) { + rhls->inputFrame(frame); + } } static string getTrackInfoStr(const TrackSource *track_src){ @@ -213,6 +244,7 @@ static string getTrackInfoStr(const TrackSource *track_src){ return codec_info; } + void MultiMuxerPrivate::onAllTrackReady() { if (_rtmp) { _rtmp->onAllTrackReady(); @@ -404,4 +436,4 @@ bool MultiMediaSourceMuxer::isEnabled(){ } -}//namespace mediakit +}//namespace mediakit \ No newline at end of file diff --git a/src/Common/MultiMediaSourceMuxer.h b/src/Common/MultiMediaSourceMuxer.h index 7e38a4f8..da328111 100644 --- a/src/Common/MultiMediaSourceMuxer.h +++ b/src/Common/MultiMediaSourceMuxer.h @@ -56,6 +56,7 @@ private: RtspMediaSourceMuxer::Ptr _rtsp; HlsRecorder::Ptr _hls; MediaSinkInterface::Ptr _mp4; + MediaSinkInterface::Ptr _hls_record; std::weak_ptr _listener; }; diff --git a/src/Common/config.cpp b/src/Common/config.cpp index 69e7ed42..69e80d8e 100644 --- a/src/Common/config.cpp +++ b/src/Common/config.cpp @@ -52,6 +52,8 @@ const string kBroadcastShellLogin = "kBroadcastShellLogin"; const string kBroadcastNotFoundStream = "kBroadcastNotFoundStream"; const string kBroadcastStreamNoneReader = "kBroadcastStreamNoneReader"; const string kBroadcastHttpBeforeAccess = "kBroadcastHttpBeforeAccess"; +const string kBroadcastRecordHls = "kBroadcastRecordHls"; +const string kBroadcaseProxyPusherFailed = "kBroadcaseProxyPusherFailed"; } //namespace Broadcast //通用配置项目 diff --git a/src/Common/config.h b/src/Common/config.h index 0ebdf980..d374c53d 100644 --- a/src/Common/config.h +++ b/src/Common/config.h @@ -60,6 +60,14 @@ extern const string kBroadcastMediaChanged; extern const string kBroadcastRecordMP4; #define BroadcastRecordMP4Args const MP4Info &info +//录制hls文件成功后广播 +extern const string kBroadcastRecordHls; +#define BroadcastRecordHlsArgs const HlsInfo &info + +//转推流失败后广播 +extern const string kBroadcaseProxyPusherFailed; +#define BroadcaseProxyPusherFailedArgs const ProxyPusherInfo &info + //收到http api请求广播 extern const string kBroadcastHttpRequest; #define BroadcastHttpRequestArgs const Parser &parser,const HttpSession::HttpResponseInvoker &invoker,bool &consumed,SockInfo &sender diff --git a/src/Record/HlsMaker.cpp b/src/Record/HlsMaker.cpp index ff8ead52..62a741c7 100644 --- a/src/Record/HlsMaker.cpp +++ b/src/Record/HlsMaker.cpp @@ -9,18 +9,19 @@ */ #include "HlsMaker.h" +using namespace toolkit; namespace mediakit { -HlsMaker::HlsMaker(float seg_duration, uint32_t seg_number) { +HlsMaker::HlsMaker(float seg_duration, uint32_t seg_number, int record_type) { //最小允许设置为0,0个切片代表点播 _seg_number = seg_number; _seg_duration = seg_duration; + _hls_record_type = record_type; } HlsMaker::~HlsMaker() { } - void HlsMaker::makeIndexFile(bool eof) { char file_content[1024]; int maxSegmentDuration = 0; @@ -32,32 +33,46 @@ void HlsMaker::makeIndexFile(bool eof) { } } - auto sequence = _seg_number ? (_file_index > _seg_number ? _file_index - _seg_number : 0LL) : 0LL; + auto sequence = _hls_record_type == 0 ? ( _seg_number ? (_file_index > _seg_number ? _file_index - _seg_number : 0LL) : 0LL): 0LL; string m3u8; - snprintf(file_content, sizeof(file_content), - "#EXTM3U\n" - "#EXT-X-VERSION:3\n" - "#EXT-X-ALLOW-CACHE:NO\n" - "#EXT-X-TARGETDURATION:%u\n" - "#EXT-X-MEDIA-SEQUENCE:%llu\n", - (maxSegmentDuration + 999) / 1000, - sequence); + snprintf(file_content,sizeof(file_content), + "#EXTM3U\n" + "#EXT-X-VERSION:3\n" + "#EXT-X-ALLOW-CACHE:NO\n" + "#EXT-X-TARGETDURATION:%u\n" + "#EXT-X-MEDIA-SEQUENCE:%llu\n", + (maxSegmentDuration + 999) / 1000, + sequence); m3u8.assign(file_content); + string rm3u8 = m3u8; + string rcontent; - for (auto &tp : _seg_dur_list) { - snprintf(file_content, sizeof(file_content), "#EXTINF:%.3f,\n%s\n", std::get<0>(tp) / 1000.0, std::get<1>(tp).data()); - m3u8.append(file_content); + if (_hls_record_type == 2) { + auto &tp = _seg_dur_list.back(); + snprintf(file_content,sizeof(file_content), "#EXTINF:%.3f,\n%s\n", std::get<0>(tp) / 1000.0, std::get<1>(tp).data()); + rcontent.assign(file_content); + }else{ + for (auto &tp : _seg_dur_list) { + snprintf(file_content,sizeof(file_content), "#EXTINF:%.3f,\n%s\n", std::get<0>(tp) / 1000.0, std::get<1>(tp).data()); + m3u8.append(file_content); + } } if (eof) { - snprintf(file_content, sizeof(file_content), "#EXT-X-ENDLIST\n"); + snprintf(file_content, sizeof(file_content),"#EXT-X-ENDLIST\n"); m3u8.append(file_content); + rcontent.append(file_content); } - onWriteHls(m3u8.data(), m3u8.size()); -} + if (_hls_record_type == 2) { + onWriteRecordM3u8(rm3u8.data(), rm3u8.size(), rcontent.data(), rcontent.size()); + }else{ + onWriteHls(m3u8.data(), m3u8.size()); + } + +} void HlsMaker::inputData(void *data, uint32_t len, uint32_t timestamp, bool is_idr_fast_packet) { if (data && len) { @@ -68,16 +83,17 @@ void HlsMaker::inputData(void *data, uint32_t len, uint32_t timestamp, bool is_i if (!_last_file_name.empty()) { //存在切片才写入ts数据 onWriteSegment((char *) data, len); + _last_timestamp = timestamp; } } else { //resetTracks时触发此逻辑 - flushLastSegment(timestamp, true); + flushLastSegment(true); } } void HlsMaker::delOldSegment() { - if (_seg_number == 0) { - //如果设置为保留0个切片,则认为是保存为点播 + if(_seg_number == 0 || _hls_record_type == 2){ + //如果设置为保留0个切片,则认为是保存为点播 ,record_type为2则是hls录制 return; } //在hls m3u8索引文件中,我们保存的切片个数跟_seg_number相关设置一致 @@ -99,31 +115,33 @@ void HlsMaker::addNewSegment(uint32_t stamp) { } //关闭并保存上一个切片,如果_seg_number==0,那么是点播。 - flushLastSegment(stamp, _seg_number == 0); + flushLastSegment((_seg_number == 0 || _hls_record_type == 2)); //新增切片 _last_file_name = onOpenSegment(_file_index++); //记录本次切片的起始时间戳 _last_seg_timestamp = stamp; } -void HlsMaker::flushLastSegment(uint32_t timestamp, bool eof){ +void HlsMaker::flushLastSegment(bool eof){ if (_last_file_name.empty()) { //不存在上个切片 return; } + //文件创建到最后一次数据写入的时间即为切片长度 - auto seg_dur = timestamp - _last_seg_timestamp; + auto seg_dur = _last_timestamp - _last_seg_timestamp; if (seg_dur <= 0) { seg_dur = 100; } - _seg_dur_list.push_back(std::make_tuple(seg_dur, _last_file_name)); + + _seg_dur_list.push_back(std::make_tuple(seg_dur, std::move(_last_file_name))); delOldSegment(); makeIndexFile(eof); _last_file_name.clear(); } bool HlsMaker::isLive() { - return _seg_number != 0; + return _seg_number != 0 && _hls_record_type == 0; } void HlsMaker::clear() { diff --git a/src/Record/HlsMaker.h b/src/Record/HlsMaker.h index 37666b93..04e2ea21 100644 --- a/src/Record/HlsMaker.h +++ b/src/Record/HlsMaker.h @@ -28,7 +28,7 @@ public: * @param seg_duration 切片文件长度 * @param seg_number 切片个数 */ - HlsMaker(float seg_duration = 5, uint32_t seg_number = 3); + HlsMaker(float seg_duration = 5, uint32_t seg_number = 3, int record_type = 0); virtual ~HlsMaker(); /** @@ -81,9 +81,11 @@ protected: /** * 关闭上个ts切片并且写入m3u8索引 * @param timestamp 毫秒时间戳 - * @param eof + * @param eof HLS直播是否已结束 */ - void flushLastSegment(uint32_t timestamp, bool eof = false); + void flushLastSegment(bool eof = false); + + virtual void onWriteRecordM3u8(const char *header, int hlen,const char *body,int blen) = 0; private: /** @@ -106,10 +108,12 @@ private: private: float _seg_duration = 0; uint32_t _seg_number = 0; + uint32_t _last_timestamp = 0; uint32_t _last_seg_timestamp = 0; uint64_t _file_index = 0; string _last_file_name; std::deque > _seg_dur_list; + int32_t _hls_record_type; }; }//namespace mediakit diff --git a/src/Record/HlsMakerImp.cpp b/src/Record/HlsMakerImp.cpp index 10521e36..b0d1aa2f 100644 --- a/src/Record/HlsMakerImp.cpp +++ b/src/Record/HlsMakerImp.cpp @@ -11,7 +11,6 @@ #include "HlsMakerImp.h" #include "Util/util.h" #include "Util/uv_errno.h" - using namespace toolkit; namespace mediakit { @@ -20,14 +19,15 @@ HlsMakerImp::HlsMakerImp(const string &m3u8_file, const string ¶ms, uint32_t bufSize, float seg_duration, - uint32_t seg_number) : HlsMaker(seg_duration, seg_number) { + uint32_t seg_number,int record_type) : HlsMaker(seg_duration, seg_number,record_type) { _path_prefix = m3u8_file.substr(0, m3u8_file.rfind('/')); _path_hls = m3u8_file; _params = params; _buf_size = bufSize; - _file_buf.reset(new char[bufSize], [](char *ptr) { + _file_buf.reset(new char[bufSize],[](char *ptr){ delete[] ptr; }); + _ui64StartedTime = ::time(nullptr); } HlsMakerImp::~HlsMakerImp() { @@ -37,32 +37,44 @@ HlsMakerImp::~HlsMakerImp() { void HlsMakerImp::clearCache() { //录制完了 flushLastSegment(true); - if (isLive()) { + if(isLive()){ //hls直播才删除文件 clear(); _file = nullptr; _segment_file_paths.clear(); File::delete_file(_path_prefix.data()); + }else{ + //hook接口 + HlsInfo info; + if (_media_src) { + info.strAppName = _media_src.get()->getApp(); + info.strStreamId = _media_src.get()->getId(); + info.strFilePath = _path_hls; + info.ui64StartedTime = _ui64StartedTime; + info.ui64TimeLen = ::time(NULL) - info.ui64StartedTime; + } + + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastRecordHls, info); } } string HlsMakerImp::onOpenSegment(int index) { - string segment_name, segment_path; + string segment_name , segment_path; { auto strDate = getTimeStr("%Y-%m-%d"); auto strHour = getTimeStr("%H"); auto strTime = getTimeStr("%M-%S"); segment_name = StrPrinter << strDate + "/" + strHour + "/" + strTime << "_" << index << ".ts"; - segment_path = _path_prefix + "/" + segment_name; - if (isLive()) { - _segment_file_paths.emplace(index, segment_path); + segment_path = _path_prefix + "/" + segment_name; + if(isLive()){ + _segment_file_paths.emplace(index,segment_path); } } _file = makeFile(segment_path, true); - if (!_file) { + if(!_file){ WarnL << "create file failed," << segment_path << " " << get_uv_errmsg(); } - if (_params.empty()) { + if(_params.empty()){ return std::move(segment_name); } return std::move(segment_name + "?" + _params); @@ -70,7 +82,7 @@ string HlsMakerImp::onOpenSegment(int index) { void HlsMakerImp::onDelSegment(int index) { auto it = _segment_file_paths.find(index); - if (it == _segment_file_paths.end()) { + if(it == _segment_file_paths.end()){ return; } File::delete_file(it->second.data()); @@ -85,32 +97,80 @@ void HlsMakerImp::onWriteSegment(const char *data, int len) { void HlsMakerImp::onWriteHls(const char *data, int len) { auto hls = makeFile(_path_hls); - if (hls) { - fwrite(data, len, 1, hls.get()); + if(hls){ + fwrite(data,len,1,hls.get()); hls.reset(); - if (_media_src) { + if(_media_src){ _media_src->registHls(true); } - } else { + } else{ WarnL << "create hls file failed," << _path_hls << " " << get_uv_errmsg(); } //DebugL << "\r\n" << string(data,len); } - -std::shared_ptr HlsMakerImp::makeFile(const string &file, bool setbuf) { +std::shared_ptr HlsMakerImp::makeFile(const string &file,bool setbuf) { auto file_buf = _file_buf; - auto ret = shared_ptr(File::create_file(file.data(), "wb"), [file_buf](FILE *fp) { + auto ret= shared_ptr(File::create_file(file.data(), "wb"), [file_buf](FILE *fp) { if (fp) { fclose(fp); } }); - if (ret && setbuf) { + if(ret && setbuf){ setvbuf(ret.get(), _file_buf.get(), _IOFBF, _buf_size); } return ret; } + +void HlsMakerImp::onWriteRecordM3u8(const char *header, int hlen,const char *body,int blen){ + bool exist = true; + string mode = "r+"; + if (access(_path_hls.c_str(), 0) == -1) { + exist = false; + WarnL << "hls m3u8 not exist" << _path_hls; + mode = "w+"; + }else{ + WarnL << "hls m3u8 exist" << _path_hls; + } + + auto hls = makeRecordM3u8(_path_hls, mode); + InfoL << "makeFile hls " << hls; + + if(hls){ + fwrite(header, hlen,1,hls.get()); + if (exist) { + fseek(hls.get(),-15L,SEEK_END); + } + fwrite(body, blen,1,hls.get()); + hls.reset(); + if(_media_src){ + _media_src->registHls(true); + } + } else{ + WarnL << "create hls file falied," << _path_hls << " " << get_uv_errmsg(); + } + DebugL << "\r\n" << string(body,blen); + DebugL << "_path_hls " << _path_hls; +} + + +std::shared_ptr HlsMakerImp::makeRecordM3u8(const string &file,const string &mode,bool setbuf) { + auto file_buf = _file_buf; + + auto ret= shared_ptr(File::create_file(file.data(), mode.data()), [file_buf](FILE *fp) { + if (fp) { + fclose(fp); + } + }); + if(ret && setbuf){ + setvbuf(ret.get(), _file_buf.get(), _IOFBF, _buf_size); + } + return ret; +} + + + void HlsMakerImp::setMediaSource(const string &vhost, const string &app, const string &stream_id) { _media_src = std::make_shared(vhost, app, stream_id); } diff --git a/src/Record/HlsMakerImp.h b/src/Record/HlsMakerImp.h index 7764644f..ca12b003 100644 --- a/src/Record/HlsMakerImp.h +++ b/src/Record/HlsMakerImp.h @@ -20,14 +20,23 @@ using namespace std; namespace mediakit { +class HlsInfo { +public: + string strFilePath;//m3u8文件路径 + string strAppName;//应用名称 + string strStreamId;//流ID + time_t ui64StartedTime; //GMT标准时间,单位秒 + time_t ui64TimeLen;//录像长度,单位秒 +}; + class HlsMakerImp : public HlsMaker{ public: HlsMakerImp(const string &m3u8_file, const string ¶ms, uint32_t bufSize = 64 * 1024, float seg_duration = 5, - uint32_t seg_number = 3); - + uint32_t seg_number = 3, + int record_type = 0); ~HlsMakerImp() override; /** @@ -54,10 +63,11 @@ protected: void onDelSegment(int index) override; void onWriteSegment(const char *data, int len) override; void onWriteHls(const char *data, int len) override; + void onWriteRecordM3u8(const char *header, int hlen, const char *body, int blen) override; private: std::shared_ptr makeFile(const string &file,bool setbuf = false); - + std::shared_ptr makeRecordM3u8(const string &file,const string &mode,bool setbuf = false); private: int _buf_size; string _params; @@ -67,6 +77,7 @@ private: std::shared_ptr _file_buf; HlsMediaSource::Ptr _media_src; map _segment_file_paths; + time_t _ui64StartedTime; }; }//namespace mediakit diff --git a/src/Record/HlsRecorder.h b/src/Record/HlsRecorder.h index c753c8b4..3d1bc4f7 100644 --- a/src/Record/HlsRecorder.h +++ b/src/Record/HlsRecorder.h @@ -18,12 +18,12 @@ namespace mediakit { class HlsRecorder : public MediaSourceEventInterceptor, public TsMuxer, public std::enable_shared_from_this { public: typedef std::shared_ptr Ptr; - HlsRecorder(const string &m3u8_file, const string ¶ms){ - GET_CONFIG(uint32_t, hlsNum, Hls::kSegmentNum); - GET_CONFIG(uint32_t, hlsBufSize, Hls::kFileBufSize); - GET_CONFIG(uint32_t, hlsDuration, Hls::kSegmentDuration); - _hls = std::make_shared(m3u8_file, params, hlsBufSize, hlsDuration, hlsNum); - //清空上次的残余文件 + HlsRecorder(const string &m3u8_file, const string ¶ms,int record_type){ + GET_CONFIG(uint32_t,hlsNum,Hls::kSegmentNum); + GET_CONFIG(uint32_t,hlsBufSize,Hls::kFileBufSize); + GET_CONFIG(uint32_t,hlsDuration,Hls::kSegmentDuration); + _hls = std::make_shared(m3u8_file,params,hlsBufSize,hlsDuration,hlsNum,record_type); + //清空上次的残余文件 _hls->clearCache(); } diff --git a/src/Record/Recorder.cpp b/src/Record/Recorder.cpp index 100a2951..21c59b5a 100644 --- a/src/Record/Recorder.cpp +++ b/src/Record/Recorder.cpp @@ -13,6 +13,7 @@ #include "Common/MediaSource.h" #include "MP4Recorder.h" #include "HlsRecorder.h" +#include "Util/logger.h" using namespace toolkit; @@ -50,6 +51,22 @@ string Recorder::getRecordPath(Recorder::type type, const string &vhost, const s } return File::absolutePath(mp4FilePath, recordPath); } + + case Recorder::type_hls_record: { + GET_CONFIG(string, hlsPath, Record::kFilePath); + string m3u8FilePath; + if (enableVhost) { + m3u8FilePath = vhost + "/" + app + "/" + stream_id + "/hls.m3u8"; + } else { + m3u8FilePath = app + "/" + stream_id + "/hls.m3u8"; + } + //Here we use the customized file path. + if (!customized_path.empty()) { + m3u8FilePath = customized_path + "/hls.m3u8"; + } + return File::absolutePath(m3u8FilePath, hlsPath); + } + default: return ""; } @@ -57,10 +74,12 @@ string Recorder::getRecordPath(Recorder::type type, const string &vhost, const s std::shared_ptr Recorder::createRecorder(type type, const string &vhost, const string &app, const string &stream_id, const string &customized_path){ auto path = Recorder::getRecordPath(type, vhost, app, stream_id, customized_path); + InfoL << "createRecorder type " << type << " path " << path; switch (type) { case Recorder::type_hls: { #if defined(ENABLE_HLS) - auto ret = std::make_shared(path, string(VHOST_KEY) + "=" + vhost); + auto ret = std::make_shared(path, string(VHOST_KEY) + "=" + vhost, 0); + InfoL << "create Hls Record ret "<setMediaSource(vhost, app, stream_id); return ret; #endif @@ -74,6 +93,16 @@ std::shared_ptr Recorder::createRecorder(type type, const st return nullptr; } + case Recorder::type_hls_record: { +#if defined(ENABLE_HLS) + auto ret = std::make_shared(path, string(VHOST_KEY) + "=" + vhost, 2); + InfoL << "create Hls Record ret "<setMediaSource(vhost, app, stream_id); + return ret; +#endif + return nullptr; + } + default: return nullptr; } diff --git a/src/Record/Recorder.h b/src/Record/Recorder.h index 670de9f8..c9ff78f8 100644 --- a/src/Record/Recorder.h +++ b/src/Record/Recorder.h @@ -22,7 +22,9 @@ public: // 录制hls type_hls = 0, // 录制MP4 - type_mp4 = 1 + type_mp4 = 1, + + type_hls_record = 2 } type; /**