推流鉴权事件支持是否允许转rtsp/rtmp、hls 、mp4

This commit is contained in:
xiongziliang 2019-09-10 11:06:31 +08:00
parent 15539bc27e
commit e67894a085
10 changed files with 92 additions and 38 deletions

View File

@ -168,7 +168,7 @@ static void initEvent() {
<< args._vhost << " " << args._app << " " << args._vhost << " " << args._app << " "
<< args._streamid << " " << args._streamid << " "
<< args._param_strs; << args._param_strs;
invoker("");//鉴权成功 invoker("",true,true,false);//鉴权成功
//invoker("this is auth failed message");//鉴权失败 //invoker("this is auth failed message");//鉴权失败
}); });

View File

@ -581,7 +581,12 @@ void installWebApi() {
////////////以下是注册的Hook API//////////// ////////////以下是注册的Hook API////////////
API_REGIST(hook,on_publish,{ API_REGIST(hook,on_publish,{
//开始推流事件 //开始推流事件
throw SuccessException(); //转换成rtsp或rtmp
val["enableRtxp"] = true;
//转换hls
val["enableHls"] = true;
//不录制mp4
val["enableMP4"] = false;
}); });
API_REGIST(hook,on_play,{ API_REGIST(hook,on_play,{

View File

@ -195,7 +195,7 @@ void installWebHook(){
NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastMediaPublish,[](BroadcastMediaPublishArgs){ NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastMediaPublish,[](BroadcastMediaPublishArgs){
if(!hook_enable || args._param_strs == hook_adminparams || hook_publish.empty() || sender.get_peer_ip() == "127.0.0.1"){ if(!hook_enable || args._param_strs == hook_adminparams || hook_publish.empty() || sender.get_peer_ip() == "127.0.0.1"){
invoker(""); invoker("",true, true,false);
return; return;
} }
//异步执行该hook api防止阻塞NoticeCenter //异步执行该hook api防止阻塞NoticeCenter
@ -205,7 +205,23 @@ void installWebHook(){
body["id"] = sender.getIdentifier(); body["id"] = sender.getIdentifier();
//执行hook //执行hook
do_http_hook(hook_publish,body,[invoker](const Value &obj,const string &err){ do_http_hook(hook_publish,body,[invoker](const Value &obj,const string &err){
invoker(err); if(err.empty()){
//推流鉴权成功
bool enableRtxp = true;
bool enableHls = true;
bool enableMP4 = false;
//加try catch目的是兼容之前的hook接口用户可以不传递enableRtxp、enableHls、enableMP4参数
try { enableRtxp = obj["enableRtxp"].asBool(); } catch (...) {}
try { enableHls = obj["enableHls"].asBool(); } catch (...) {}
try { enableMP4 = obj["enableMP4"].asBool(); } catch (...) {}
invoker(err,enableRtxp,enableHls,enableMP4);
}else{
//推流鉴权失败
invoker(err,false, false, false);
}
}); });
}); });

View File

@ -92,13 +92,20 @@ extern const string kBroadcastOnGetRtspRealm;
extern const string kBroadcastOnRtspAuth; extern const string kBroadcastOnRtspAuth;
#define BroadcastOnRtspAuthArgs const MediaInfo &args,const string &realm,const string &user_name,const bool &must_no_encrypt,const RtspSession::onAuth &invoker,TcpSession &sender #define BroadcastOnRtspAuthArgs const MediaInfo &args,const string &realm,const string &user_name,const bool &must_no_encrypt,const RtspSession::onAuth &invoker,TcpSession &sender
//鉴权结果回调对象 //推流鉴权结果回调对象
//如果errMessage为空则代表鉴权成功 //如果errMessage为空则代表鉴权成功
typedef std::function<void(const string &errMessage)> AuthInvoker; //enableHls: 是否允许转换hls
//enableMP4: 是否运行MP4录制
//enableRtxp: rtmp推流时是否运行转rtsprtsp推流时是否允许转rtmp
typedef std::function<void(const string &errMessage,bool enableRtxp,bool enableHls,bool enableMP4)> PublishAuthInvoker;
//收到rtsp/rtmp推流事件广播通过该事件控制推流鉴权 //收到rtsp/rtmp推流事件广播通过该事件控制推流鉴权
extern const string kBroadcastMediaPublish; extern const string kBroadcastMediaPublish;
#define BroadcastMediaPublishArgs const MediaInfo &args,const Broadcast::AuthInvoker &invoker,TcpSession &sender #define BroadcastMediaPublishArgs const MediaInfo &args,const Broadcast::PublishAuthInvoker &invoker,TcpSession &sender
//播放鉴权结果回调对象
//如果errMessage为空则代表鉴权成功
typedef std::function<void(const string &errMessage)> AuthInvoker;
//播放rtsp/rtmp/http-flv事件广播通过该事件控制播放鉴权 //播放rtsp/rtmp/http-flv事件广播通过该事件控制播放鉴权
extern const string kBroadcastMediaPlayed; extern const string kBroadcastMediaPlayed;

View File

@ -143,7 +143,7 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) {
_mediaInfo.parse(_strTcUrl + "/" + getStreamId(dec.load<std::string>())); _mediaInfo.parse(_strTcUrl + "/" + getStreamId(dec.load<std::string>()));
_mediaInfo._schema = RTMP_SCHEMA; _mediaInfo._schema = RTMP_SCHEMA;
auto onRes = [this,pToken](const string &err){ auto onRes = [this,pToken](const string &err,bool enableRtxp,bool enableHls,bool enableMP4){
auto src = dynamic_pointer_cast<RtmpMediaSource>(MediaSource::find(RTMP_SCHEMA, auto src = dynamic_pointer_cast<RtmpMediaSource>(MediaSource::find(RTMP_SCHEMA,
_mediaInfo._vhost, _mediaInfo._vhost,
_mediaInfo._app, _mediaInfo._app,
@ -167,22 +167,25 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) {
} }
_pPublisherSrc.reset(new RtmpToRtspMediaSource(_mediaInfo._vhost,_mediaInfo._app,_mediaInfo._streamid)); _pPublisherSrc.reset(new RtmpToRtspMediaSource(_mediaInfo._vhost,_mediaInfo._app,_mediaInfo._streamid));
_pPublisherSrc->setListener(dynamic_pointer_cast<MediaSourceEvent>(shared_from_this())); _pPublisherSrc->setListener(dynamic_pointer_cast<MediaSourceEvent>(shared_from_this()));
//设置转协议
_pPublisherSrc->setProtocolTranslation(enableRtxp,enableHls,enableMP4);
//如果是rtmp推流客户端那么加大TCP接收缓存这样能提升接收性能 //如果是rtmp推流客户端那么加大TCP接收缓存这样能提升接收性能
_sock->setReadBuffer(std::make_shared<BufferRaw>(256 * 1024)); _sock->setReadBuffer(std::make_shared<BufferRaw>(256 * 1024));
setSocketFlags(); setSocketFlags();
}; };
Broadcast::AuthInvoker invoker = [weakSelf,onRes,pToken](const string &err){ Broadcast::PublishAuthInvoker invoker = [weakSelf,onRes,pToken](const string &err,bool enableRtxp,bool enableHls,bool enableMP4){
auto strongSelf = weakSelf.lock(); auto strongSelf = weakSelf.lock();
if(!strongSelf){ if(!strongSelf){
return; return;
} }
strongSelf->async([weakSelf,onRes,err,pToken](){ strongSelf->async([weakSelf,onRes,err,pToken,enableRtxp,enableHls,enableMP4](){
auto strongSelf = weakSelf.lock(); auto strongSelf = weakSelf.lock();
if(!strongSelf){ if(!strongSelf){
return; return;
} }
onRes(err); onRes(err,enableRtxp,enableHls,enableMP4);
}); });
}; };
auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish, auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish,
@ -191,7 +194,7 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) {
*this); *this);
if(!flag){ if(!flag){
//该事件无人监听,默认鉴权成功 //该事件无人监听,默认鉴权成功
onRes(""); onRes("",true,true,false);
} }
} }

View File

@ -93,7 +93,7 @@ private:
double _dNowReqID = 0; double _dNowReqID = 0;
Ticker _ticker;//数据接收时间 Ticker _ticker;//数据接收时间
RingBuffer<RtmpPacket::Ptr>::RingReader::Ptr _pRingReader; RingBuffer<RtmpPacket::Ptr>::RingReader::Ptr _pRingReader;
std::shared_ptr<RtmpMediaSource> _pPublisherSrc; std::shared_ptr<RtmpToRtspMediaSource> _pPublisherSrc;
std::weak_ptr<RtmpMediaSource> _pPlayerSrc; std::weak_ptr<RtmpMediaSource> _pPlayerSrc;
//时间戳修整器 //时间戳修整器
Stamp _stamp[2]; Stamp _stamp[2];

View File

@ -52,11 +52,7 @@ public:
RtmpToRtspMediaSource(const string &vhost, RtmpToRtspMediaSource(const string &vhost,
const string &app, const string &app,
const string &id, const string &id,
bool bEnableHls = true,
bool bEnableMp4 = false,
int ringSize = 0) : RtmpMediaSource(vhost, app, id,ringSize){ int ringSize = 0) : RtmpMediaSource(vhost, app, id,ringSize){
_bEnableHls = bEnableHls;
_bEnableMp4 = bEnableMp4;
_demuxer = std::make_shared<RtmpDemuxer>(); _demuxer = std::make_shared<RtmpDemuxer>();
} }
virtual ~RtmpToRtspMediaSource(){} virtual ~RtmpToRtspMediaSource(){}
@ -66,17 +62,17 @@ public:
RtmpMediaSource::onGetMetaData(metadata); RtmpMediaSource::onGetMetaData(metadata);
} }
void onWrite(const RtmpPacket::Ptr &pkt,bool key_pos) override { void onWrite(const RtmpPacket::Ptr &pkt,bool key_pos = true) override {
_demuxer->inputRtmp(pkt); _demuxer->inputRtmp(pkt);
if(!_muxer && _demuxer->isInited(2000)){ if(!_muxer && _demuxer->isInited(2000)){
_muxer = std::make_shared<MultiMediaSourceMuxer>(getVhost(), _muxer = std::make_shared<MultiMediaSourceMuxer>(getVhost(),
getApp(), getApp(),
getId(), getId(),
_demuxer->getDuration(), _demuxer->getDuration(),
true,//转rtsp _enableRtsp,
false,//不重复生成rtmp false,//不重复生成rtmp
_bEnableHls, _enableHls,
_bEnableMp4); _enableMP4);
for (auto &track : _demuxer->getTracks(false)){ for (auto &track : _demuxer->getTracks(false)){
_muxer->addTrack(track); _muxer->addTrack(track);
track->addDelegate(_muxer); track->addDelegate(_muxer);
@ -107,11 +103,25 @@ public:
} }
return _demuxer->getTracks(trackReady); return _demuxer->getTracks(trackReady);
} }
/**
*
* @param enableRtsp rtsp
* @param enableHls hls
* @param enableMP4 mp4录制
*/
void setProtocolTranslation(bool enableRtsp,bool enableHls,bool enableMP4){
// DebugL << enableRtsp << " " << enableHls << " " << enableMP4;
_enableRtsp = enableRtsp;
_enableHls = enableHls;
_enableMP4 = enableMP4;
}
private: private:
RtmpDemuxer::Ptr _demuxer; RtmpDemuxer::Ptr _demuxer;
MultiMediaSourceMuxer::Ptr _muxer; MultiMediaSourceMuxer::Ptr _muxer;
bool _bEnableHls; bool _enableHls = true;
bool _bEnableMp4; bool _enableMP4 = false;
bool _enableRtsp = true;
}; };
} /* namespace mediakit */ } /* namespace mediakit */

View File

@ -256,7 +256,7 @@ void RtspSession::handleReq_RECORD(const Parser &parser){
send_SessionNotFound(); send_SessionNotFound();
throw SockException(Err_shutdown,_aTrackInfo.empty() ? "can not find any availabe track when record" : "session not found when record"); throw SockException(Err_shutdown,_aTrackInfo.empty() ? "can not find any availabe track when record" : "session not found when record");
} }
auto onRes = [this](const string &err){ auto onRes = [this](const string &err,bool enableRtxp,bool enableHls,bool enableMP4){
bool authSuccess = err.empty(); bool authSuccess = err.empty();
if(!authSuccess){ if(!authSuccess){
sendRtspResponse("401 Unauthorized", {"Content-Type", "text/plain"}, err); sendRtspResponse("401 Unauthorized", {"Content-Type", "text/plain"}, err);
@ -264,6 +264,9 @@ void RtspSession::handleReq_RECORD(const Parser &parser){
return; return;
} }
//设置转协议
_pushSrc->setProtocolTranslation(enableRtxp,enableHls,enableMP4);
_StrPrinter rtp_info; _StrPrinter rtp_info;
for(auto &track : _aTrackInfo){ for(auto &track : _aTrackInfo){
if (track->_inited == false) { if (track->_inited == false) {
@ -284,17 +287,17 @@ void RtspSession::handleReq_RECORD(const Parser &parser){
}; };
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this()); weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
Broadcast::AuthInvoker invoker = [weakSelf,onRes](const string &err){ Broadcast::PublishAuthInvoker invoker = [weakSelf,onRes](const string &err,bool enableRtxp,bool enableHls,bool enableMP4){
auto strongSelf = weakSelf.lock(); auto strongSelf = weakSelf.lock();
if(!strongSelf){ if(!strongSelf){
return; return;
} }
strongSelf->async([weakSelf,onRes,err](){ strongSelf->async([weakSelf,onRes,err,enableRtxp,enableHls,enableMP4](){
auto strongSelf = weakSelf.lock(); auto strongSelf = weakSelf.lock();
if(!strongSelf){ if(!strongSelf){
return; return;
} }
onRes(err); onRes(err,enableRtxp,enableHls,enableMP4);
}); });
}; };
@ -302,7 +305,7 @@ void RtspSession::handleReq_RECORD(const Parser &parser){
auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish,_mediaInfo,invoker,*this); auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish,_mediaInfo,invoker,*this);
if(!flag){ if(!flag){
//该事件无人监听,默认不鉴权 //该事件无人监听,默认不鉴权
onRes(""); onRes("",true,true,false);
} }
} }

View File

@ -43,11 +43,7 @@ public:
RtspToRtmpMediaSource(const string &vhost, RtspToRtmpMediaSource(const string &vhost,
const string &app, const string &app,
const string &id, const string &id,
bool bEnableHls = true,
bool bEnableMp4 = false,
int ringSize = 0) : RtspMediaSource(vhost, app, id,ringSize) { int ringSize = 0) : RtspMediaSource(vhost, app, id,ringSize) {
_bEnableHls = bEnableHls;
_bEnableMp4 = bEnableMp4;
} }
virtual ~RtspToRtmpMediaSource() {} virtual ~RtspToRtmpMediaSource() {}
@ -66,9 +62,9 @@ public:
getId(), getId(),
_demuxer->getDuration(), _demuxer->getDuration(),
false,//不重复生成rtsp false,//不重复生成rtsp
true,//转rtmp _enableRtmp,
_bEnableHls, _enableHls,
_bEnableMp4); _enableMP4);
for (auto &track : _demuxer->getTracks(false)) { for (auto &track : _demuxer->getTracks(false)) {
_muxer->addTrack(track); _muxer->addTrack(track);
track->addDelegate(_muxer); track->addDelegate(_muxer);
@ -99,11 +95,25 @@ public:
} }
return _demuxer->getTracks(trackReady); return _demuxer->getTracks(trackReady);
} }
/**
*
* @param enableRtmp rtmp
* @param enableHls hls
* @param enableMP4 mp4录制
*/
void setProtocolTranslation(bool enableRtmp,bool enableHls,bool enableMP4){
// DebugL << enableRtmp << " " << enableHls << " " << enableMP4;
_enableRtmp = enableRtmp;
_enableHls = enableHls;
_enableMP4 = enableMP4;
}
private: private:
RtspDemuxer::Ptr _demuxer; RtspDemuxer::Ptr _demuxer;
MultiMediaSourceMuxer::Ptr _muxer; MultiMediaSourceMuxer::Ptr _muxer;
bool _bEnableHls; bool _enableHls = true;
bool _bEnableMp4; bool _enableMP4 = false;
bool _enableRtmp = true;
}; };
} /* namespace mediakit */ } /* namespace mediakit */

View File

@ -157,7 +157,7 @@ void initEventListener() {
NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastMediaPublish, [](BroadcastMediaPublishArgs) { NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastMediaPublish, [](BroadcastMediaPublishArgs) {
DebugL << "推流鉴权:" << args._schema << " " << args._vhost << " " << args._app << " " << args._streamid << " " DebugL << "推流鉴权:" << args._schema << " " << args._vhost << " " << args._app << " " << args._streamid << " "
<< args._param_strs; << args._param_strs;
invoker("");//鉴权成功 invoker("", true, true, false);//鉴权成功
//invoker("this is auth failed message");//鉴权失败 //invoker("this is auth failed message");//鉴权失败
}); });