From b23cbaa0f8e731ace12c382c2b6b387c3a630cfc Mon Sep 17 00:00:00 2001 From: xiongziliang <771730766@qq.com> Date: Sat, 4 Jun 2022 11:06:35 +0800 Subject: [PATCH] =?UTF-8?q?on=5Fpublish=20hook=E6=96=B0=E5=A2=9Econtinue?= =?UTF-8?q?=5Fpush=5Fms=E5=8F=82=E6=95=B0=EF=BC=8C=E7=94=A8=E4=BA=8E?= =?UTF-8?q?=E6=96=AD=E8=BF=9E=E7=BB=AD=E6=8E=A8=E5=BB=B6=E6=97=B6=E6=8E=A7?= =?UTF-8?q?=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/WebHook.cpp | 3 +++ src/Common/MultiMediaSourceMuxer.cpp | 3 +++ src/Common/MultiMediaSourceMuxer.h | 3 +++ src/Rtmp/RtmpSession.cpp | 7 +++---- src/Rtmp/RtmpSession.h | 2 ++ src/Rtsp/RtspSession.cpp | 6 +++--- src/Rtsp/RtspSession.h | 4 +++- webrtc/WebRtcPusher.cpp | 14 ++++++++------ webrtc/WebRtcPusher.h | 6 ++++-- webrtc/WebRtcTransport.cpp | 2 +- 10 files changed, 33 insertions(+), 17 deletions(-) diff --git a/server/WebHook.cpp b/server/WebHook.cpp index 5dc6f23a..d0470af0 100755 --- a/server/WebHook.cpp +++ b/server/WebHook.cpp @@ -323,6 +323,9 @@ void installWebHook(){ if (obj.isMember("enable_fmp4")) { option.enable_fmp4 = obj["enable_fmp4"].asBool(); } + if (obj.isMember("continue_push_ms")) { + option.continue_push_ms = obj["continue_push_ms"].asUInt(); + } invoker(err, option); } else { //推流鉴权失败 diff --git a/src/Common/MultiMediaSourceMuxer.cpp b/src/Common/MultiMediaSourceMuxer.cpp index 7606fb6a..88886330 100644 --- a/src/Common/MultiMediaSourceMuxer.cpp +++ b/src/Common/MultiMediaSourceMuxer.cpp @@ -26,11 +26,14 @@ ProtocolOption::ProtocolOption() { GET_CONFIG(bool, s_to_mp4, General::kPublishToMP4); GET_CONFIG(bool, s_enabel_audio, General::kEnableAudio); GET_CONFIG(bool, s_add_mute_audio, General::kAddMuteAudio); + GET_CONFIG(uint32_t, s_continue_push_ms, General::kContinuePushMS); + enable_hls = s_to_hls; enable_mp4 = s_to_mp4; enable_audio = s_enabel_audio; add_mute_audio = s_add_mute_audio; + continue_push_ms = s_continue_push_ms; } static std::shared_ptr makeRecorder(MediaSource &sender, const vector &tracks, Recorder::type type, const string &custom_path, size_t max_second){ diff --git a/src/Common/MultiMediaSourceMuxer.h b/src/Common/MultiMediaSourceMuxer.h index 46dc563f..92c98f1b 100644 --- a/src/Common/MultiMediaSourceMuxer.h +++ b/src/Common/MultiMediaSourceMuxer.h @@ -52,6 +52,9 @@ public: //hls录制保存路径 std::string hls_save_path; + + //断连续推延时,单位毫秒,默认采用配置文件 + uint32_t continue_push_ms; }; class MultiMediaSourceMuxer : public MediaSourceEventInterceptor, public MediaSink, public std::enable_shared_from_this{ diff --git a/src/Rtmp/RtmpSession.cpp b/src/Rtmp/RtmpSession.cpp index b339a4ff..9475fc98 100644 --- a/src/Rtmp/RtmpSession.cpp +++ b/src/Rtmp/RtmpSession.cpp @@ -44,14 +44,13 @@ void RtmpSession::onError(const SockException& err) { NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _total_bytes, duration, is_player, static_cast(*this)); } - GET_CONFIG(uint32_t, continue_push_ms, General::kContinuePushMS); //如果是主动关闭的,那么不延迟注销 - if (_push_src && continue_push_ms && err.getErrCode() != Err_shutdown) { + if (_push_src && _continue_push_ms && err.getErrCode() != Err_shutdown) { //取消所有权 _push_src_ownership = nullptr; //延时10秒注销流 auto push_src = std::move(_push_src); - getPoller()->doDelayTask(continue_push_ms, [push_src]() { return 0; }); + getPoller()->doDelayTask(_continue_push_ms, [push_src]() { return 0; }); } } @@ -184,7 +183,7 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) { } _push_src->setListener(dynamic_pointer_cast(shared_from_this())); - + _continue_push_ms = option.continue_push_ms; sendStatus({"level", "status", "code", "NetStream.Publish.Start", "description", "Started publishing stream.", diff --git a/src/Rtmp/RtmpSession.h b/src/Rtmp/RtmpSession.h index 336c8292..7bce876c 100644 --- a/src/Rtmp/RtmpSession.h +++ b/src/Rtmp/RtmpSession.h @@ -89,6 +89,8 @@ private: private: bool _set_meta_data = false; double _recv_req_id = 0; + //断连续推延时 + uint32_t _continue_push_ms = 0; //消耗的总流量 uint64_t _total_bytes = 0; std::string _tc_url; diff --git a/src/Rtsp/RtspSession.cpp b/src/Rtsp/RtspSession.cpp index 4e6bdd1d..f4f8e9f4 100644 --- a/src/Rtsp/RtspSession.cpp +++ b/src/Rtsp/RtspSession.cpp @@ -86,14 +86,13 @@ void RtspSession::onError(const SockException &err) { NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _bytes_usage, duration, is_player, static_cast(*this)); } - GET_CONFIG(uint32_t, continue_push_ms, General::kContinuePushMS); //如果是主动关闭的,那么不延迟注销 - if (_push_src && continue_push_ms && err.getErrCode() != Err_shutdown) { + if (_push_src && _continue_push_ms && err.getErrCode() != Err_shutdown) { //取消所有权 _push_src_ownership = nullptr; //延时10秒注销流 auto push_src = std::move(_push_src); - getPoller()->doDelayTask(continue_push_ms, [push_src]() { return 0; }); + getPoller()->doDelayTask(_continue_push_ms, [push_src]() { return 0; }); } } @@ -280,6 +279,7 @@ void RtspSession::handleReq_ANNOUNCE(const Parser &parser) { } _push_src->setListener(dynamic_pointer_cast(shared_from_this())); + _continue_push_ms = option.continue_push_ms; sendRtspResponse("200 OK"); }; diff --git a/src/Rtsp/RtspSession.h b/src/Rtsp/RtspSession.h index d6dbe9d8..b07428aa 100644 --- a/src/Rtsp/RtspSession.h +++ b/src/Rtsp/RtspSession.h @@ -163,6 +163,9 @@ private: private: //是否已经触发on_play事件 bool _emit_on_play = false; + bool _send_sr_rtcp[2] = {true, true}; + //断连续推延时 + uint32_t _continue_push_ms = 0; //推流或拉流客户端采用的rtp传输方式 Rtsp::eRtpType _rtp_type = Rtsp::RTP_Invalid; //收到的seq,回复时一致 @@ -213,7 +216,6 @@ private: toolkit::Ticker _rtcp_send_tickers[2]; //统计rtp并发送rtcp std::vector _rtcp_context; - bool _send_sr_rtcp[2] = {true, true}; }; /** diff --git a/webrtc/WebRtcPusher.cpp b/webrtc/WebRtcPusher.cpp index 05929dc6..111cda38 100644 --- a/webrtc/WebRtcPusher.cpp +++ b/webrtc/WebRtcPusher.cpp @@ -16,8 +16,9 @@ using namespace mediakit; WebRtcPusher::Ptr WebRtcPusher::create(const EventPoller::Ptr &poller, const RtspMediaSourceImp::Ptr &src, const std::shared_ptr &ownership, - const MediaInfo &info) { - WebRtcPusher::Ptr ret(new WebRtcPusher(poller, src, ownership, info), [](WebRtcPusher *ptr) { + const MediaInfo &info, + const mediakit::ProtocolOption &option) { + WebRtcPusher::Ptr ret(new WebRtcPusher(poller, src, ownership, info, option), [](WebRtcPusher *ptr) { ptr->onDestory(); delete ptr; }); @@ -28,10 +29,12 @@ WebRtcPusher::Ptr WebRtcPusher::create(const EventPoller::Ptr &poller, WebRtcPusher::WebRtcPusher(const EventPoller::Ptr &poller, const RtspMediaSourceImp::Ptr &src, const std::shared_ptr &ownership, - const MediaInfo &info) : WebRtcTransportImp(poller) { + const MediaInfo &info, + const mediakit::ProtocolOption &option) : WebRtcTransportImp(poller) { _media_info = info; _push_src = src; _push_src_ownership = ownership; + _continue_push_ms = option.continue_push_ms; CHECK(_push_src); } @@ -130,13 +133,12 @@ void WebRtcPusher::onDestory() { } } - GET_CONFIG(uint32_t, continue_push_ms, General::kContinuePushMS); - if (_push_src && continue_push_ms) { + if (_push_src && _continue_push_ms) { //取消所有权 _push_src_ownership = nullptr; //延时10秒注销流 auto push_src = std::move(_push_src); - getPoller()->doDelayTask(continue_push_ms, [push_src]() { return 0; }); + getPoller()->doDelayTask(_continue_push_ms, [push_src]() { return 0; }); } } diff --git a/webrtc/WebRtcPusher.h b/webrtc/WebRtcPusher.h index 9b52c890..865c5672 100644 --- a/webrtc/WebRtcPusher.h +++ b/webrtc/WebRtcPusher.h @@ -18,7 +18,7 @@ public: using Ptr = std::shared_ptr; ~WebRtcPusher() override = default; static Ptr create(const EventPoller::Ptr &poller, const mediakit::RtspMediaSourceImp::Ptr &src, - const std::shared_ptr &ownership, const mediakit::MediaInfo &info); + const std::shared_ptr &ownership, const mediakit::MediaInfo &info, const mediakit::ProtocolOption &option); protected: ///////WebRtcTransportImp override/////// @@ -42,10 +42,12 @@ protected: private: WebRtcPusher(const EventPoller::Ptr &poller, const mediakit::RtspMediaSourceImp::Ptr &src, - const std::shared_ptr &ownership, const mediakit::MediaInfo &info); + const std::shared_ptr &ownership, const mediakit::MediaInfo &info, const mediakit::ProtocolOption &option); private: bool _simulcast = false; + //断连续推延时 + uint32_t _continue_push_ms = 0; //媒体相关元数据 mediakit::MediaInfo _media_info; //推流的rtsp源 diff --git a/webrtc/WebRtcTransport.cpp b/webrtc/WebRtcTransport.cpp index 98086caa..42e97e92 100644 --- a/webrtc/WebRtcTransport.cpp +++ b/webrtc/WebRtcTransport.cpp @@ -1086,7 +1086,7 @@ void push_plugin(Session &sender, const string &offer_sdp, const WebRtcArgs &arg push_src_ownership = push_src->getOwnership(); push_src->setProtocolOption(option); } - auto rtc = WebRtcPusher::create(EventPollerPool::Instance().getPoller(), push_src, push_src_ownership, info); + auto rtc = WebRtcPusher::create(EventPollerPool::Instance().getPoller(), push_src, push_src_ownership, info, option); push_src->setListener(rtc); cb(*rtc); };