新增发送rtp被动关闭hook

This commit is contained in:
xiongziliang 2022-08-27 10:53:47 +08:00
parent c2ab45f78d
commit 6a4297845f
9 changed files with 81 additions and 21 deletions

View File

@ -153,6 +153,8 @@ on_stream_not_found=https://127.0.0.1/index/hook/on_stream_not_found
on_server_started=https://127.0.0.1/index/hook/on_server_started on_server_started=https://127.0.0.1/index/hook/on_server_started
#server保活上报 #server保活上报
on_server_keepalive=https://127.0.0.1/index/hook/on_server_keepalive on_server_keepalive=https://127.0.0.1/index/hook/on_server_keepalive
#发送rtp(startSendRtp)被动关闭时回调
on_send_rtp_stopped=https://127.0.0.1/index/hook/on_send_rtp_stopped
#hook api最大等待回复时间单位秒 #hook api最大等待回复时间单位秒
timeoutSec=10 timeoutSec=10
#keepalive hook触发间隔,单位秒float类型 #keepalive hook触发间隔,单位秒float类型

View File

@ -1673,6 +1673,10 @@ void installWebApi() {
val["close"] = true; val["close"] = true;
}); });
api_regist("/index/hook/on_send_rtp_stopped",[](API_ARGS_JSON){
//发送rtp(startSendRtp)被动关闭时回调
});
static auto checkAccess = [](const string &params){ static auto checkAccess = [](const string &params){
//我们假定大家都要权限访问 //我们假定大家都要权限访问
return true; return true;

View File

@ -45,6 +45,7 @@ const string kOnStreamNoneReader = HOOK_FIELD"on_stream_none_reader";
const string kOnHttpAccess = HOOK_FIELD"on_http_access"; const string kOnHttpAccess = HOOK_FIELD"on_http_access";
const string kOnServerStarted = HOOK_FIELD"on_server_started"; const string kOnServerStarted = HOOK_FIELD"on_server_started";
const string kOnServerKeepalive = HOOK_FIELD"on_server_keepalive"; const string kOnServerKeepalive = HOOK_FIELD"on_server_keepalive";
const string kOnSendRtpStopped = HOOK_FIELD"on_send_rtp_stopped";
const string kAdminParams = HOOK_FIELD"admin_params"; const string kAdminParams = HOOK_FIELD"admin_params";
const string kAliveInterval = HOOK_FIELD"alive_interval"; const string kAliveInterval = HOOK_FIELD"alive_interval";
const string kRetry = HOOK_FIELD"retry"; const string kRetry = HOOK_FIELD"retry";
@ -68,6 +69,7 @@ onceToken token([](){
mINI::Instance()[kOnHttpAccess] = ""; mINI::Instance()[kOnHttpAccess] = "";
mINI::Instance()[kOnServerStarted] = ""; mINI::Instance()[kOnServerStarted] = "";
mINI::Instance()[kOnServerKeepalive] = ""; mINI::Instance()[kOnServerKeepalive] = "";
mINI::Instance()[kOnSendRtpStopped] = "";
mINI::Instance()[kAdminParams] = "secret=035c73f7-bb6b-4889-a715-d9eb2d1925cc"; mINI::Instance()[kAdminParams] = "secret=035c73f7-bb6b-4889-a715-d9eb2d1925cc";
mINI::Instance()[kAliveInterval] = 30.0; mINI::Instance()[kAliveInterval] = 30.0;
mINI::Instance()[kRetry] = 1; mINI::Instance()[kRetry] = 1;
@ -589,6 +591,26 @@ void installWebHook(){
}); });
}); });
NoticeCenter::Instance().addListener(&web_hook_tag, Broadcast::kBroadcastSendRtpStopped, [](BroadcastSendRtpStopped) {
GET_CONFIG(string, hook_send_rtp_stopped, Hook::kOnSendRtpStopped);
if (!hook_enable || hook_send_rtp_stopped.empty()) {
return;
}
ArgsType body;
body[VHOST_KEY] = sender.getVhost();
body["app"] = sender.getApp();
body["stream"] = sender.getStreamId();
body["ssrc"] = ssrc;
body["originType"] = (int)sender.getOriginType(MediaSource::NullMediaSource());
body["originTypeStr"] = getOriginTypeString(sender.getOriginType(MediaSource::NullMediaSource()));
body["originUrl"] = sender.getOriginUrl(MediaSource::NullMediaSource());
body["msg"] = ex.what();
body["err"] = ex.getErrCode();
//执行hook
do_http_hook(hook_send_rtp_stopped, body, nullptr);
});
/** /**
* kBroadcastHttpAccess事件触发机制 * kBroadcastHttpAccess事件触发机制
* 1http请求头查找cookie3 * 1http请求头查找cookie3

View File

@ -75,7 +75,22 @@ static string getTrackInfoStr(const TrackSource *track_src){
return std::move(codec_info); return std::move(codec_info);
} }
const std::string &MultiMediaSourceMuxer::getVhost() const {
return _vhost;
}
const std::string &MultiMediaSourceMuxer::getApp() const {
return _app;
}
const std::string &MultiMediaSourceMuxer::getStreamId() const {
return _stream_id;
}
MultiMediaSourceMuxer::MultiMediaSourceMuxer(const string &vhost, const string &app, const string &stream, float dur_sec, const ProtocolOption &option) { MultiMediaSourceMuxer::MultiMediaSourceMuxer(const string &vhost, const string &app, const string &stream, float dur_sec, const ProtocolOption &option) {
_vhost = vhost;
_app = app;
_stream_id = stream;
_option = option; _option = option;
_get_origin_url = [this, vhost, app, stream]() { _get_origin_url = [this, vhost, app, stream]() {
auto ret = getOriginUrl(MediaSource::NullMediaSource()); auto ret = getOriginUrl(MediaSource::NullMediaSource());
@ -235,12 +250,13 @@ void MultiMediaSourceMuxer::startSendRtp(MediaSource &sender, const MediaSourceE
rtp_sender->addTrackCompleted(); rtp_sender->addTrackCompleted();
auto ssrc = args.ssrc; auto ssrc = args.ssrc;
rtp_sender->setOnClose([weak_self, ssrc]() { rtp_sender->setOnClose([weak_self, ssrc](const toolkit::SockException &ex) {
if (auto strong_self = weak_self.lock()) { if (auto strong_self = weak_self.lock()) {
WarnL << "stream:" << strong_self->_get_origin_url() << " stop send rtp:" << ssrc; WarnL << "stream:" << strong_self->_get_origin_url() << " stop send rtp:" << ssrc << ", reason:" << ex.what();
strong_self->_rtp_sender.erase(ssrc); strong_self->_rtp_sender.erase(ssrc);
//触发观看人数统计 //触发观看人数统计
strong_self->onReaderChanged(MediaSource::NullMediaSource(), strong_self->totalReaderCount()); strong_self->onReaderChanged(MediaSource::NullMediaSource(), strong_self->totalReaderCount());
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastSendRtpStopped, *strong_self, ssrc, ex);
} }
}); });
strong_self->_rtp_sender[args.ssrc] = std::move(rtp_sender); strong_self->_rtp_sender[args.ssrc] = std::move(rtp_sender);

View File

@ -154,6 +154,10 @@ public:
*/ */
std::vector<Track::Ptr> getMediaTracks(MediaSource &sender, bool trackReady = true) const override; std::vector<Track::Ptr> getMediaTracks(MediaSource &sender, bool trackReady = true) const override;
const std::string& getVhost() const;
const std::string& getApp() const;
const std::string& getStreamId() const;
protected: protected:
/////////////////////////////////MediaSink override///////////////////////////////// /////////////////////////////////MediaSink override/////////////////////////////////
@ -177,6 +181,9 @@ protected:
private: private:
bool _is_enable = false; bool _is_enable = false;
std::string _vhost;
std::string _app;
std::string _stream_id;
ProtocolOption _option; ProtocolOption _option;
toolkit::Ticker _last_check; toolkit::Ticker _last_check;
Stamp _stamp[2]; Stamp _stamp[2];

View File

@ -55,6 +55,8 @@ const string kBroadcastShellLogin = "kBroadcastShellLogin";
const string kBroadcastNotFoundStream = "kBroadcastNotFoundStream"; const string kBroadcastNotFoundStream = "kBroadcastNotFoundStream";
const string kBroadcastStreamNoneReader = "kBroadcastStreamNoneReader"; const string kBroadcastStreamNoneReader = "kBroadcastStreamNoneReader";
const string kBroadcastHttpBeforeAccess = "kBroadcastHttpBeforeAccess"; const string kBroadcastHttpBeforeAccess = "kBroadcastHttpBeforeAccess";
const string kBroadcastSendRtpStopped = "kBroadcastSendRtpStopped";
} // namespace Broadcast } // namespace Broadcast
// 通用配置项目 // 通用配置项目

View File

@ -106,6 +106,10 @@ extern const std::string kBroadcastNotFoundStream;
extern const std::string kBroadcastStreamNoneReader; extern const std::string kBroadcastStreamNoneReader;
#define BroadcastStreamNoneReaderArgs MediaSource &sender #define BroadcastStreamNoneReaderArgs MediaSource &sender
// rtp推流被动停止时触发
extern const std::string kBroadcastSendRtpStopped;
#define BroadcastSendRtpStopped MultiMediaSourceMuxer &sender, const std::string &ssrc, const SockException &ex
// 更新配置文件事件广播,执行loadIniConfig函数加载配置文件成功后会触发该广播 // 更新配置文件事件广播,执行loadIniConfig函数加载配置文件成功后会触发该广播
extern const std::string kBroadcastReloadConfig; extern const std::string kBroadcastReloadConfig;
#define BroadcastReloadConfigArgs void #define BroadcastReloadConfigArgs void
@ -295,7 +299,7 @@ extern const std::string kFileBufSize;
extern const std::string kFastStart; extern const std::string kFastStart;
// mp4文件是否重头循环读取 // mp4文件是否重头循环读取
extern const std::string kFileRepeat; extern const std::string kFileRepeat;
//MP4录制是否当做播放器参与播放人数统计 // MP4录制是否当做播放器参与播放人数统计
extern const std::string kMP4AsPlayer; extern const std::string kMP4AsPlayer;
} // namespace Record } // namespace Record

View File

@ -248,13 +248,15 @@ void RtpSender::onSendRtpUdp(const toolkit::Buffer::Ptr &buf, bool check) {
//接收rr rtcp超时 //接收rr rtcp超时
WarnL << "recv rr rtcp timeout"; WarnL << "recv rr rtcp timeout";
_rtcp_recv_ticker.resetTime(); _rtcp_recv_ticker.resetTime();
onClose(); onClose(SockException(Err_timeout, "recv rr rtcp timeout"));
} }
} }
void RtpSender::onClose() { void RtpSender::onClose(const SockException &ex) {
if (_on_close) { auto cb = _on_close;
_on_close(); if (cb) {
//在下次循环时触发onClose原因是防止遍历map时删除元素
_poller->async([cb, ex]() { cb(ex); }, false);
} }
} }
@ -282,17 +284,18 @@ void RtpSender::onFlushRtpList(shared_ptr<List<Buffer::Ptr> > rtp_list) {
void RtpSender::onErr(const SockException &ex, bool is_connect) { void RtpSender::onErr(const SockException &ex, bool is_connect) {
_is_connect = false; _is_connect = false;
if (_args.passive) { if (_args.passive || !_args.is_udp) {
WarnL << "tcp passive connection lost: " << ex.what(); WarnL << "send rtp tcp connection lost: " << ex.what();
//tcp被动模式如果对方断开连接应该停止发送rtp //tcp模式如果对方断开连接应该停止发送rtp
onClose(); onClose(ex);
return;
}
//监听socket断开事件方便重连
if (is_connect) {
WarnL << "重连" << _args.dst_url << ":" << _args.dst_port << "失败, 原因为:" << ex.what();
} else { } else {
//监听socket断开事件方便重连 WarnL << "停止发送 rtp:" << _args.dst_url << ":" << _args.dst_port << ", 原因为:" << ex.what();
if (is_connect) {
WarnL << "重连" << _args.dst_url << ":" << _args.dst_port << "失败, 原因为:" << ex.what();
} else {
WarnL << "停止发送 rtp:" << _args.dst_url << ":" << _args.dst_port << ", 原因为:" << ex.what();
}
} }
weak_ptr<RtpSender> weak_self = shared_from_this(); weak_ptr<RtpSender> weak_self = shared_from_this();
@ -312,7 +315,7 @@ void RtpSender::onErr(const SockException &ex, bool is_connect) {
}, _poller); }, _poller);
} }
void RtpSender::setOnClose(std::function<void()> on_close){ void RtpSender::setOnClose(std::function<void(const toolkit::SockException &ex)> on_close){
_on_close = std::move(on_close); _on_close = std::move(on_close);
} }

View File

@ -54,7 +54,7 @@ public:
*/ */
virtual void resetTracks() override; virtual void resetTracks() override;
void setOnClose(std::function<void()> on_close); void setOnClose(std::function<void(const toolkit::SockException &ex)> on_close);
private: private:
//合并写输出 //合并写输出
@ -66,7 +66,7 @@ private:
void createRtcpSocket(); void createRtcpSocket();
void onRecvRtcp(RtcpHeader *rtcp); void onRecvRtcp(RtcpHeader *rtcp);
void onSendRtpUdp(const toolkit::Buffer::Ptr &buf, bool check); void onSendRtpUdp(const toolkit::Buffer::Ptr &buf, bool check);
void onClose(); void onClose(const toolkit::SockException &ex);
private: private:
bool _is_connect = false; bool _is_connect = false;
@ -79,7 +79,7 @@ private:
std::shared_ptr<RtcpContext> _rtcp_context; std::shared_ptr<RtcpContext> _rtcp_context;
toolkit::Ticker _rtcp_send_ticker; toolkit::Ticker _rtcp_send_ticker;
toolkit::Ticker _rtcp_recv_ticker; toolkit::Ticker _rtcp_recv_ticker;
std::function<void()> _on_close; std::function<void(const toolkit::SockException &ex)> _on_close;
}; };
}//namespace mediakit }//namespace mediakit