新增自定义rtmp command

This commit is contained in:
monktan 2020-11-30 10:00:26 +08:00
parent 8e575c0c78
commit 575a231ba0
9 changed files with 56 additions and 3 deletions

View File

@ -120,10 +120,14 @@ on_stream_none_reader=https://127.0.0.1/index/hook/on_stream_none_reader
on_stream_not_found=https://127.0.0.1/index/hook/on_stream_not_found on_stream_not_found=https://127.0.0.1/index/hook/on_stream_not_found
#服务器启动报告,可以用于服务器的崩溃重启事件监听 #服务器启动报告,可以用于服务器的崩溃重启事件监听
on_server_started=https://127.0.0.1/index/hook/on_server_started on_server_started=https://127.0.0.1/index/hook/on_server_started
#服务器录制hls_ts结束后通知不会删除文件
on_record_hls=https://127.0.0.1/index/hook/on_record_hls
#服务器转推流失败通知
on_proxy_pusher_failed=https://127.0.0.1/index/hook/on_proxy_pusher_failed
#服务器转推流,转推方无人观看通知
on_proxy_pusher_none_reader=https://127.0.0.1/index/on_proxy_pusher_none_reader;
#hook api最大等待回复时间单位秒 #hook api最大等待回复时间单位秒
timeoutSec=10 timeoutSec=10
on_record_hls=https://127.0.0.1/index/hook/on_record_hls
on_proxy_pusher_failed=https://127.0.0.1/index/hook/on_proxy_pusher_failed
[http] [http]
#http服务器字符编码windows上默认gb2312 #http服务器字符编码windows上默认gb2312

View File

@ -61,6 +61,7 @@ const string kOnServerStarted = HOOK_FIELD"on_server_started";
const string kAdminParams = HOOK_FIELD"admin_params"; const string kAdminParams = HOOK_FIELD"admin_params";
const string kOnRecordHls = HOOK_FIELD"on_record_hls"; const string kOnRecordHls = HOOK_FIELD"on_record_hls";
const string kOnProxyPusherFailed = HOOK_FIELD"on_proxy_pusher_failed"; const string kOnProxyPusherFailed = HOOK_FIELD"on_proxy_pusher_failed";
const string kOnProxyPusherNoneReader = HOOK_FIELD"on_proxy_pusher_none_reader";
onceToken token([](){ onceToken token([](){
mINI::Instance()[kEnable] = false; mINI::Instance()[kEnable] = false;
@ -81,6 +82,7 @@ onceToken token([](){
mINI::Instance()[kOnServerStarted] = ""; mINI::Instance()[kOnServerStarted] = "";
mINI::Instance()[kOnRecordHls] = ""; mINI::Instance()[kOnRecordHls] = "";
mINI::Instance()[kOnProxyPusherFailed] = ""; mINI::Instance()[kOnProxyPusherFailed] = "";
mINI::Instance()[kOnProxyPusherNoneReader] = "";
mINI::Instance()[kAdminParams] = "secret=035c73f7-bb6b-4889-a715-d9eb2d1925cc"; mINI::Instance()[kAdminParams] = "secret=035c73f7-bb6b-4889-a715-d9eb2d1925cc";
},nullptr); },nullptr);
}//namespace Hook }//namespace Hook
@ -223,6 +225,21 @@ void installWebHook(){
do_http_hook(hook_proxy_pusher_failed, body, nullptr); do_http_hook(hook_proxy_pusher_failed, body, nullptr);
}); });
NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcaseProxyPusherNoneReader, [](BroadcaseProxyPusherNoneReaderArgs){
GET_CONFIG(string,hook_proxy_pusher_none_reader,Hook::kOnProxyPusherNoneReader);
if(!hook_enable || hook_proxy_pusher_none_reader.empty()){
return;
}
ArgsType body;
body["key"] = key;
InfoL << "Received kBroadcaseProxyPusherNoneReader, Will perform hook, key: " << key;
//执行hook
do_http_hook(hook_proxy_pusher_none_reader, body, nullptr);
});
NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastMediaPublish,[](BroadcastMediaPublishArgs){ NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastMediaPublish,[](BroadcastMediaPublishArgs){
GET_CONFIG(string,hook_publish,Hook::kOnPublish); GET_CONFIG(string,hook_publish,Hook::kOnPublish);
GET_CONFIG(bool,toHls,General::kPublishToHls); GET_CONFIG(bool,toHls,General::kPublishToHls);

View File

@ -55,6 +55,7 @@ const string kBroadcastStreamNoneReader = "kBroadcastStreamNoneReader";
const string kBroadcastHttpBeforeAccess = "kBroadcastHttpBeforeAccess"; const string kBroadcastHttpBeforeAccess = "kBroadcastHttpBeforeAccess";
const string kBroadcastRecordHls = "kBroadcastRecordHls"; const string kBroadcastRecordHls = "kBroadcastRecordHls";
const string kBroadcaseProxyPusherFailed = "kBroadcaseProxyPusherFailed"; const string kBroadcaseProxyPusherFailed = "kBroadcaseProxyPusherFailed";
const string kBroadcaseProxyPusherNoneReader = "kBroadcaseProxyPusherNoneReader";
} //namespace Broadcast } //namespace Broadcast
//通用配置项目 //通用配置项目

View File

@ -74,6 +74,10 @@ extern const string kBroadcastRecordHls;
extern const string kBroadcaseProxyPusherFailed; extern const string kBroadcaseProxyPusherFailed;
#define BroadcaseProxyPusherFailedArgs const ProxyPusherInfo &info #define BroadcaseProxyPusherFailedArgs const ProxyPusherInfo &info
//转推流无人观看广播
extern const string kBroadcaseProxyPusherNoneReader;
#define BroadcaseProxyPusherNoneReaderArgs const std::string& key
//收到http api请求广播 //收到http api请求广播
extern const string kBroadcastHttpRequest; extern const string kBroadcastHttpRequest;
#define BroadcastHttpRequestArgs const Parser &parser,const HttpSession::HttpResponseInvoker &invoker,bool &consumed,SockInfo &sender #define BroadcastHttpRequestArgs const Parser &parser,const HttpSession::HttpResponseInvoker &invoker,bool &consumed,SockInfo &sender

View File

@ -57,6 +57,7 @@ using namespace toolkit;
#define CONTROL_STREAM_ISRECORDED 4 #define CONTROL_STREAM_ISRECORDED 4
#define CONTROL_PING_REQUEST 6 #define CONTROL_PING_REQUEST 6
#define CONTROL_PING_RESPONSE 7 #define CONTROL_PING_RESPONSE 7
#define CONTROL_CUSTOM_FREEZE 16
#define STREAM_CONTROL 0 #define STREAM_CONTROL 0
#define STREAM_MEDIA 1 #define STREAM_MEDIA 1

View File

@ -699,7 +699,20 @@ void RtmpProtocol::handle_chunk(RtmpPacket& chunk_data) {
break; break;
} }
default: /*WarnL << "unhandled user control:" << event_type; */ break; case CONTROL_CUSTOM_FREEZE: {
//无人观看
if (chunk_data.buffer.size() < 4) {
throw std::runtime_error("CONTROL_CUSTOM_FREEZE: Not enough data.");
}
uint32_t is_freeze = load_be32(&chunk_data.buffer[0]);
TraceL << "CONTROL_CUSTOM_FREEZE:" << is_freeze;
onStreamFreeze(is_freeze);
}
default: {
//WarnL << "unhandled user control:" << event_type;
break;
}
} }
break; break;
} }

View File

@ -46,6 +46,10 @@ protected:
virtual void onStreamEof(uint32_t stream_index){}; virtual void onStreamEof(uint32_t stream_index){};
virtual void onStreamDry(uint32_t stream_index){}; virtual void onStreamDry(uint32_t stream_index){};
//custom rtmo command
//MSG_USER_CONTROL(4)下面定义CONTROL_CUSTOM_FREEZE(16)
virtual void onStreamFreeze(bool is_freeze){};
protected: protected:
//// HttpRequestSplitter override //// //// HttpRequestSplitter override ////
int64_t onRecvHeader(const char *data,uint64_t len) override { return 0; } int64_t onRecvHeader(const char *data,uint64_t len) override { return 0; }

View File

@ -312,6 +312,14 @@ void RtmpPusher::onRtmpChunk(RtmpPacket &chunk_data) {
} }
} }
void RtmpPusher::onStreamFreeze(bool is_freeze) {
auto src = _publish_src.lock();
if(is_freeze && src) {
std::string key = src->getVhost()+ "/" + src->getApp() + "/" + src->getId();
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcaseProxyPusherNoneReader, key);
}
}
} /* namespace mediakit */ } /* namespace mediakit */

View File

@ -47,6 +47,7 @@ protected:
void onSendRawData(Buffer::Ptr buffer) override{ void onSendRawData(Buffer::Ptr buffer) override{
send(std::move(buffer)); send(std::move(buffer));
} }
void onStreamFreeze(bool is_freeze) override;
private: private:
void onPublishResult(const SockException &ex, bool handshake_done); void onPublishResult(const SockException &ex, bool handshake_done);