From 575a231ba0cf4574a4a62e8fb4ee8cd5aff2af5c Mon Sep 17 00:00:00 2001 From: monktan Date: Mon, 30 Nov 2020 10:00:26 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E8=87=AA=E5=AE=9A=E4=B9=89rt?= =?UTF-8?q?mp=20command?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- conf/config.ini | 8 ++++++-- server/WebHook.cpp | 17 +++++++++++++++++ src/Common/config.cpp | 1 + src/Common/config.h | 4 ++++ src/Rtmp/Rtmp.h | 1 + src/Rtmp/RtmpProtocol.cpp | 15 ++++++++++++++- src/Rtmp/RtmpProtocol.h | 4 ++++ src/Rtmp/RtmpPusher.cpp | 8 ++++++++ src/Rtmp/RtmpPusher.h | 1 + 9 files changed, 56 insertions(+), 3 deletions(-) diff --git a/conf/config.ini b/conf/config.ini index 8665d76f..404f6f8b 100644 --- a/conf/config.ini +++ b/conf/config.ini @@ -120,10 +120,14 @@ on_stream_none_reader=https://127.0.0.1/index/hook/on_stream_none_reader on_stream_not_found=https://127.0.0.1/index/hook/on_stream_not_found #服务器启动报告,可以用于服务器的崩溃重启事件监听 on_server_started=https://127.0.0.1/index/hook/on_server_started +#服务器录制hls_ts结束后通知(不会删除文件) +on_record_hls=https://127.0.0.1/index/hook/on_record_hls +#服务器转推流失败通知 +on_proxy_pusher_failed=https://127.0.0.1/index/hook/on_proxy_pusher_failed +#服务器转推流,转推方无人观看通知 +on_proxy_pusher_none_reader=https://127.0.0.1/index/on_proxy_pusher_none_reader; #hook api最大等待回复时间,单位秒 timeoutSec=10 -on_record_hls=https://127.0.0.1/index/hook/on_record_hls -on_proxy_pusher_failed=https://127.0.0.1/index/hook/on_proxy_pusher_failed [http] #http服务器字符编码,windows上默认gb2312 diff --git a/server/WebHook.cpp b/server/WebHook.cpp index 06971476..9bedf4c8 100644 --- a/server/WebHook.cpp +++ b/server/WebHook.cpp @@ -61,6 +61,7 @@ const string kOnServerStarted = HOOK_FIELD"on_server_started"; const string kAdminParams = HOOK_FIELD"admin_params"; const string kOnRecordHls = HOOK_FIELD"on_record_hls"; const string kOnProxyPusherFailed = HOOK_FIELD"on_proxy_pusher_failed"; +const string kOnProxyPusherNoneReader = HOOK_FIELD"on_proxy_pusher_none_reader"; onceToken token([](){ mINI::Instance()[kEnable] = false; @@ -81,6 +82,7 @@ onceToken token([](){ mINI::Instance()[kOnServerStarted] = ""; mINI::Instance()[kOnRecordHls] = ""; mINI::Instance()[kOnProxyPusherFailed] = ""; + mINI::Instance()[kOnProxyPusherNoneReader] = ""; mINI::Instance()[kAdminParams] = "secret=035c73f7-bb6b-4889-a715-d9eb2d1925cc"; },nullptr); }//namespace Hook @@ -223,6 +225,21 @@ void installWebHook(){ do_http_hook(hook_proxy_pusher_failed, body, nullptr); }); + NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcaseProxyPusherNoneReader, [](BroadcaseProxyPusherNoneReaderArgs){ + GET_CONFIG(string,hook_proxy_pusher_none_reader,Hook::kOnProxyPusherNoneReader); + if(!hook_enable || hook_proxy_pusher_none_reader.empty()){ + return; + } + + ArgsType body; + body["key"] = key; + + InfoL << "Received kBroadcaseProxyPusherNoneReader, Will perform hook, key: " << key; + + //执行hook + do_http_hook(hook_proxy_pusher_none_reader, body, nullptr); + }); + NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastMediaPublish,[](BroadcastMediaPublishArgs){ GET_CONFIG(string,hook_publish,Hook::kOnPublish); GET_CONFIG(bool,toHls,General::kPublishToHls); diff --git a/src/Common/config.cpp b/src/Common/config.cpp index c2a4edf9..4b4da7d5 100644 --- a/src/Common/config.cpp +++ b/src/Common/config.cpp @@ -55,6 +55,7 @@ const string kBroadcastStreamNoneReader = "kBroadcastStreamNoneReader"; const string kBroadcastHttpBeforeAccess = "kBroadcastHttpBeforeAccess"; const string kBroadcastRecordHls = "kBroadcastRecordHls"; const string kBroadcaseProxyPusherFailed = "kBroadcaseProxyPusherFailed"; +const string kBroadcaseProxyPusherNoneReader = "kBroadcaseProxyPusherNoneReader"; } //namespace Broadcast //通用配置项目 diff --git a/src/Common/config.h b/src/Common/config.h index 841780c1..d56f19e5 100644 --- a/src/Common/config.h +++ b/src/Common/config.h @@ -74,6 +74,10 @@ extern const string kBroadcastRecordHls; extern const string kBroadcaseProxyPusherFailed; #define BroadcaseProxyPusherFailedArgs const ProxyPusherInfo &info +//转推流无人观看广播 +extern const string kBroadcaseProxyPusherNoneReader; +#define BroadcaseProxyPusherNoneReaderArgs const std::string& key + //收到http api请求广播 extern const string kBroadcastHttpRequest; #define BroadcastHttpRequestArgs const Parser &parser,const HttpSession::HttpResponseInvoker &invoker,bool &consumed,SockInfo &sender diff --git a/src/Rtmp/Rtmp.h b/src/Rtmp/Rtmp.h index 1f8d1e3e..4c9e3d8e 100644 --- a/src/Rtmp/Rtmp.h +++ b/src/Rtmp/Rtmp.h @@ -57,6 +57,7 @@ using namespace toolkit; #define CONTROL_STREAM_ISRECORDED 4 #define CONTROL_PING_REQUEST 6 #define CONTROL_PING_RESPONSE 7 +#define CONTROL_CUSTOM_FREEZE 16 #define STREAM_CONTROL 0 #define STREAM_MEDIA 1 diff --git a/src/Rtmp/RtmpProtocol.cpp b/src/Rtmp/RtmpProtocol.cpp index deb9aebc..e6fc8900 100644 --- a/src/Rtmp/RtmpProtocol.cpp +++ b/src/Rtmp/RtmpProtocol.cpp @@ -699,7 +699,20 @@ void RtmpProtocol::handle_chunk(RtmpPacket& chunk_data) { break; } - default: /*WarnL << "unhandled user control:" << event_type; */ break; + case CONTROL_CUSTOM_FREEZE: { + //无人观看 + if (chunk_data.buffer.size() < 4) { + throw std::runtime_error("CONTROL_CUSTOM_FREEZE: Not enough data."); + } + uint32_t is_freeze = load_be32(&chunk_data.buffer[0]); + TraceL << "CONTROL_CUSTOM_FREEZE:" << is_freeze; + onStreamFreeze(is_freeze); + } + + default: { + //WarnL << "unhandled user control:" << event_type; + break; + } } break; } diff --git a/src/Rtmp/RtmpProtocol.h b/src/Rtmp/RtmpProtocol.h index e20fce30..1c7592b3 100644 --- a/src/Rtmp/RtmpProtocol.h +++ b/src/Rtmp/RtmpProtocol.h @@ -46,6 +46,10 @@ protected: virtual void onStreamEof(uint32_t stream_index){}; virtual void onStreamDry(uint32_t stream_index){}; + //custom rtmo command + //MSG_USER_CONTROL(4)下面定义CONTROL_CUSTOM_FREEZE(16) + virtual void onStreamFreeze(bool is_freeze){}; + protected: //// HttpRequestSplitter override //// int64_t onRecvHeader(const char *data,uint64_t len) override { return 0; } diff --git a/src/Rtmp/RtmpPusher.cpp b/src/Rtmp/RtmpPusher.cpp index 2b3c8d49..05a7b929 100644 --- a/src/Rtmp/RtmpPusher.cpp +++ b/src/Rtmp/RtmpPusher.cpp @@ -312,6 +312,14 @@ void RtmpPusher::onRtmpChunk(RtmpPacket &chunk_data) { } } +void RtmpPusher::onStreamFreeze(bool is_freeze) { + auto src = _publish_src.lock(); + if(is_freeze && src) { + std::string key = src->getVhost()+ "/" + src->getApp() + "/" + src->getId(); + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcaseProxyPusherNoneReader, key); + } +} + } /* namespace mediakit */ diff --git a/src/Rtmp/RtmpPusher.h b/src/Rtmp/RtmpPusher.h index 2f6fa6ed..d3cd7f96 100644 --- a/src/Rtmp/RtmpPusher.h +++ b/src/Rtmp/RtmpPusher.h @@ -47,6 +47,7 @@ protected: void onSendRawData(Buffer::Ptr buffer) override{ send(std::move(buffer)); } + void onStreamFreeze(bool is_freeze) override; private: void onPublishResult(const SockException &ex, bool handshake_done);