From 6a4297845f516a79354166cd2c486001f9098c4d Mon Sep 17 00:00:00 2001 From: xiongziliang <771730766@qq.com> Date: Sat, 27 Aug 2022 10:53:47 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E5=8F=91=E9=80=81rtp?= =?UTF-8?q?=E8=A2=AB=E5=8A=A8=E5=85=B3=E9=97=ADhook?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- conf/config.ini | 2 ++ server/WebApi.cpp | 4 ++++ server/WebHook.cpp | 22 +++++++++++++++++++ src/Common/MultiMediaSourceMuxer.cpp | 20 +++++++++++++++-- src/Common/MultiMediaSourceMuxer.h | 7 ++++++ src/Common/config.cpp | 2 ++ src/Common/config.h | 6 ++++- src/Rtp/RtpSender.cpp | 33 +++++++++++++++------------- src/Rtp/RtpSender.h | 6 ++--- 9 files changed, 81 insertions(+), 21 deletions(-) diff --git a/conf/config.ini b/conf/config.ini index 6c48df31..a60fa4cc 100644 --- a/conf/config.ini +++ b/conf/config.ini @@ -153,6 +153,8 @@ on_stream_not_found=https://127.0.0.1/index/hook/on_stream_not_found on_server_started=https://127.0.0.1/index/hook/on_server_started #server保活上报 on_server_keepalive=https://127.0.0.1/index/hook/on_server_keepalive +#发送rtp(startSendRtp)被动关闭时回调 +on_send_rtp_stopped=https://127.0.0.1/index/hook/on_send_rtp_stopped #hook api最大等待回复时间,单位秒 timeoutSec=10 #keepalive hook触发间隔,单位秒,float类型 diff --git a/server/WebApi.cpp b/server/WebApi.cpp index fa998925..4de4449b 100755 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -1673,6 +1673,10 @@ void installWebApi() { val["close"] = true; }); + api_regist("/index/hook/on_send_rtp_stopped",[](API_ARGS_JSON){ + //发送rtp(startSendRtp)被动关闭时回调 + }); + static auto checkAccess = [](const string ¶ms){ //我们假定大家都要权限访问 return true; diff --git a/server/WebHook.cpp b/server/WebHook.cpp index a67a6b28..58d270fb 100755 --- a/server/WebHook.cpp +++ b/server/WebHook.cpp @@ -45,6 +45,7 @@ const string kOnStreamNoneReader = HOOK_FIELD"on_stream_none_reader"; const string kOnHttpAccess = HOOK_FIELD"on_http_access"; const string kOnServerStarted = HOOK_FIELD"on_server_started"; const string kOnServerKeepalive = HOOK_FIELD"on_server_keepalive"; +const string kOnSendRtpStopped = HOOK_FIELD"on_send_rtp_stopped"; const string kAdminParams = HOOK_FIELD"admin_params"; const string kAliveInterval = HOOK_FIELD"alive_interval"; const string kRetry = HOOK_FIELD"retry"; @@ -68,6 +69,7 @@ onceToken token([](){ mINI::Instance()[kOnHttpAccess] = ""; mINI::Instance()[kOnServerStarted] = ""; mINI::Instance()[kOnServerKeepalive] = ""; + mINI::Instance()[kOnSendRtpStopped] = ""; mINI::Instance()[kAdminParams] = "secret=035c73f7-bb6b-4889-a715-d9eb2d1925cc"; mINI::Instance()[kAliveInterval] = 30.0; mINI::Instance()[kRetry] = 1; @@ -589,6 +591,26 @@ void installWebHook(){ }); }); + NoticeCenter::Instance().addListener(&web_hook_tag, Broadcast::kBroadcastSendRtpStopped, [](BroadcastSendRtpStopped) { + GET_CONFIG(string, hook_send_rtp_stopped, Hook::kOnSendRtpStopped); + if (!hook_enable || hook_send_rtp_stopped.empty()) { + return; + } + + ArgsType body; + body[VHOST_KEY] = sender.getVhost(); + body["app"] = sender.getApp(); + body["stream"] = sender.getStreamId(); + body["ssrc"] = ssrc; + body["originType"] = (int)sender.getOriginType(MediaSource::NullMediaSource()); + body["originTypeStr"] = getOriginTypeString(sender.getOriginType(MediaSource::NullMediaSource())); + body["originUrl"] = sender.getOriginUrl(MediaSource::NullMediaSource()); + body["msg"] = ex.what(); + body["err"] = ex.getErrCode(); + //执行hook + do_http_hook(hook_send_rtp_stopped, body, nullptr); + }); + /** * kBroadcastHttpAccess事件触发机制 * 1、根据http请求头查找cookie,找到进入步骤3 diff --git a/src/Common/MultiMediaSourceMuxer.cpp b/src/Common/MultiMediaSourceMuxer.cpp index fd0964f7..a7d52106 100644 --- a/src/Common/MultiMediaSourceMuxer.cpp +++ b/src/Common/MultiMediaSourceMuxer.cpp @@ -75,7 +75,22 @@ static string getTrackInfoStr(const TrackSource *track_src){ return std::move(codec_info); } +const std::string &MultiMediaSourceMuxer::getVhost() const { + return _vhost; +} + +const std::string &MultiMediaSourceMuxer::getApp() const { + return _app; +} + +const std::string &MultiMediaSourceMuxer::getStreamId() const { + return _stream_id; +} + MultiMediaSourceMuxer::MultiMediaSourceMuxer(const string &vhost, const string &app, const string &stream, float dur_sec, const ProtocolOption &option) { + _vhost = vhost; + _app = app; + _stream_id = stream; _option = option; _get_origin_url = [this, vhost, app, stream]() { auto ret = getOriginUrl(MediaSource::NullMediaSource()); @@ -235,12 +250,13 @@ void MultiMediaSourceMuxer::startSendRtp(MediaSource &sender, const MediaSourceE rtp_sender->addTrackCompleted(); auto ssrc = args.ssrc; - rtp_sender->setOnClose([weak_self, ssrc]() { + rtp_sender->setOnClose([weak_self, ssrc](const toolkit::SockException &ex) { if (auto strong_self = weak_self.lock()) { - WarnL << "stream:" << strong_self->_get_origin_url() << " stop send rtp:" << ssrc; + WarnL << "stream:" << strong_self->_get_origin_url() << " stop send rtp:" << ssrc << ", reason:" << ex.what(); strong_self->_rtp_sender.erase(ssrc); //触发观看人数统计 strong_self->onReaderChanged(MediaSource::NullMediaSource(), strong_self->totalReaderCount()); + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastSendRtpStopped, *strong_self, ssrc, ex); } }); strong_self->_rtp_sender[args.ssrc] = std::move(rtp_sender); diff --git a/src/Common/MultiMediaSourceMuxer.h b/src/Common/MultiMediaSourceMuxer.h index 75dbd437..b513a989 100644 --- a/src/Common/MultiMediaSourceMuxer.h +++ b/src/Common/MultiMediaSourceMuxer.h @@ -154,6 +154,10 @@ public: */ std::vector getMediaTracks(MediaSource &sender, bool trackReady = true) const override; + const std::string& getVhost() const; + const std::string& getApp() const; + const std::string& getStreamId() const; + protected: /////////////////////////////////MediaSink override///////////////////////////////// @@ -177,6 +181,9 @@ protected: private: bool _is_enable = false; + std::string _vhost; + std::string _app; + std::string _stream_id; ProtocolOption _option; toolkit::Ticker _last_check; Stamp _stamp[2]; diff --git a/src/Common/config.cpp b/src/Common/config.cpp index d02365a9..5dd4a02c 100644 --- a/src/Common/config.cpp +++ b/src/Common/config.cpp @@ -55,6 +55,8 @@ const string kBroadcastShellLogin = "kBroadcastShellLogin"; const string kBroadcastNotFoundStream = "kBroadcastNotFoundStream"; const string kBroadcastStreamNoneReader = "kBroadcastStreamNoneReader"; const string kBroadcastHttpBeforeAccess = "kBroadcastHttpBeforeAccess"; +const string kBroadcastSendRtpStopped = "kBroadcastSendRtpStopped"; + } // namespace Broadcast // 通用配置项目 diff --git a/src/Common/config.h b/src/Common/config.h index 3442d2e4..7e6b68a9 100644 --- a/src/Common/config.h +++ b/src/Common/config.h @@ -106,6 +106,10 @@ extern const std::string kBroadcastNotFoundStream; extern const std::string kBroadcastStreamNoneReader; #define BroadcastStreamNoneReaderArgs MediaSource &sender +// rtp推流被动停止时触发 +extern const std::string kBroadcastSendRtpStopped; +#define BroadcastSendRtpStopped MultiMediaSourceMuxer &sender, const std::string &ssrc, const SockException &ex + // 更新配置文件事件广播,执行loadIniConfig函数加载配置文件成功后会触发该广播 extern const std::string kBroadcastReloadConfig; #define BroadcastReloadConfigArgs void @@ -295,7 +299,7 @@ extern const std::string kFileBufSize; extern const std::string kFastStart; // mp4文件是否重头循环读取 extern const std::string kFileRepeat; -//MP4录制是否当做播放器参与播放人数统计 +// MP4录制是否当做播放器参与播放人数统计 extern const std::string kMP4AsPlayer; } // namespace Record diff --git a/src/Rtp/RtpSender.cpp b/src/Rtp/RtpSender.cpp index 10bb78bf..47f4a900 100644 --- a/src/Rtp/RtpSender.cpp +++ b/src/Rtp/RtpSender.cpp @@ -248,13 +248,15 @@ void RtpSender::onSendRtpUdp(const toolkit::Buffer::Ptr &buf, bool check) { //接收rr rtcp超时 WarnL << "recv rr rtcp timeout"; _rtcp_recv_ticker.resetTime(); - onClose(); + onClose(SockException(Err_timeout, "recv rr rtcp timeout")); } } -void RtpSender::onClose() { - if (_on_close) { - _on_close(); +void RtpSender::onClose(const SockException &ex) { + auto cb = _on_close; + if (cb) { + //在下次循环时触发onClose,原因是防止遍历map时删除元素 + _poller->async([cb, ex]() { cb(ex); }, false); } } @@ -282,17 +284,18 @@ void RtpSender::onFlushRtpList(shared_ptr > rtp_list) { void RtpSender::onErr(const SockException &ex, bool is_connect) { _is_connect = false; - if (_args.passive) { - WarnL << "tcp passive connection lost: " << ex.what(); - //tcp被动模式,如果对方断开连接,应该停止发送rtp - onClose(); + if (_args.passive || !_args.is_udp) { + WarnL << "send rtp tcp connection lost: " << ex.what(); + //tcp模式,如果对方断开连接,应该停止发送rtp + onClose(ex); + return; + } + + //监听socket断开事件,方便重连 + if (is_connect) { + WarnL << "重连" << _args.dst_url << ":" << _args.dst_port << "失败, 原因为:" << ex.what(); } else { - //监听socket断开事件,方便重连 - if (is_connect) { - WarnL << "重连" << _args.dst_url << ":" << _args.dst_port << "失败, 原因为:" << ex.what(); - } else { - WarnL << "停止发送 rtp:" << _args.dst_url << ":" << _args.dst_port << ", 原因为:" << ex.what(); - } + WarnL << "停止发送 rtp:" << _args.dst_url << ":" << _args.dst_port << ", 原因为:" << ex.what(); } weak_ptr weak_self = shared_from_this(); @@ -312,7 +315,7 @@ void RtpSender::onErr(const SockException &ex, bool is_connect) { }, _poller); } -void RtpSender::setOnClose(std::function on_close){ +void RtpSender::setOnClose(std::function on_close){ _on_close = std::move(on_close); } diff --git a/src/Rtp/RtpSender.h b/src/Rtp/RtpSender.h index 5f966343..bebd2ac3 100644 --- a/src/Rtp/RtpSender.h +++ b/src/Rtp/RtpSender.h @@ -54,7 +54,7 @@ public: */ virtual void resetTracks() override; - void setOnClose(std::function on_close); + void setOnClose(std::function on_close); private: //合并写输出 @@ -66,7 +66,7 @@ private: void createRtcpSocket(); void onRecvRtcp(RtcpHeader *rtcp); void onSendRtpUdp(const toolkit::Buffer::Ptr &buf, bool check); - void onClose(); + void onClose(const toolkit::SockException &ex); private: bool _is_connect = false; @@ -79,7 +79,7 @@ private: std::shared_ptr _rtcp_context; toolkit::Ticker _rtcp_send_ticker; toolkit::Ticker _rtcp_recv_ticker; - std::function _on_close; + std::function _on_close; }; }//namespace mediakit