rtp推流初步添加事件

This commit is contained in:
xiongziliang 2020-04-23 23:18:24 +08:00
parent 00c21cd99e
commit 0df25942aa
3 changed files with 120 additions and 34 deletions

@ -1 +1 @@
Subproject commit 07d21ac61be6c7a4eba90a5d2d26b15daa882cf7 Subproject commit ebd96d983d8dd3268e3e77ed08fb57d67666061c

View File

@ -68,13 +68,11 @@ RtpProcess::RtpProcess(uint32_t ssrc) {
_track->_samplerate = 90000; _track->_samplerate = 90000;
_track->_type = TrackVideo; _track->_type = TrackVideo;
_track->_ssrc = _ssrc; _track->_ssrc = _ssrc;
DebugL << printSSRC(_ssrc);
GET_CONFIG(bool,toRtxp,General::kPublishToRtxp); _media_info._schema = RTP_APP_NAME;
GET_CONFIG(bool,toHls,General::kPublishToHls); _media_info._vhost = DEFAULT_VHOST;
GET_CONFIG(bool,toMP4,General::kPublishToMP4); _media_info._app = RTP_APP_NAME;
_media_info._streamid = printSSRC(_ssrc);
_muxer = std::make_shared<MultiMediaSourceMuxer>(DEFAULT_VHOST,RTP_APP_NAME,printSSRC(_ssrc),0,toRtxp,toRtxp,toHls,toMP4);
GET_CONFIG(string,dump_dir,RtpProxy::kDumpDir); GET_CONFIG(string,dump_dir,RtpProxy::kDumpDir);
{ {
@ -107,11 +105,22 @@ RtpProcess::RtpProcess(uint32_t ssrc) {
} }
RtpProcess::~RtpProcess() { RtpProcess::~RtpProcess() {
if(_addr){ DebugP(this);
DebugL << printSSRC(_ssrc) << " " << printAddress(_addr); if (_addr) {
delete _addr; delete _addr;
}else{ }
DebugL << printSSRC(_ssrc);
uint64_t duration = (_last_rtp_time.createdTime() - _last_rtp_time.elapsedTime()) / 1000;
WarnP(this) << "RTP推流器("
<< _media_info._vhost << "/"
<< _media_info._app << "/"
<< _media_info._streamid
<< ")断开,耗时(s):" << duration;
//流量统计事件广播
GET_CONFIG(uint32_t, iFlowThreshold, General::kFlowThreshold);
if (_ui64TotalBytes > iFlowThreshold * 1024) {
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _ui64TotalBytes, duration, false, static_cast<SockInfo &>(*this));
} }
} }
@ -121,14 +130,22 @@ bool RtpProcess::inputRtp(const char *data, int data_len,const struct sockaddr *
if(!_addr){ if(!_addr){
_addr = new struct sockaddr; _addr = new struct sockaddr;
memcpy(_addr,addr, sizeof(struct sockaddr)); memcpy(_addr,addr, sizeof(struct sockaddr));
DebugL << "RtpProcess(" << printSSRC(_ssrc) << ") bind to address:" << printAddress(_addr); DebugP(this) << "bind to address:" << printAddress(_addr);
//推流鉴权
emitOnPublish();
} }
if(check_source && memcmp(_addr,addr,sizeof(struct sockaddr)) != 0){ if(!_muxer){
DebugL << "RtpProcess(" << printSSRC(_ssrc) << ") address dismatch:" << printAddress(addr) << " != " << printAddress(_addr); //无权限推流
return false; return false;
} }
if(check_source && memcmp(_addr,addr,sizeof(struct sockaddr)) != 0){
DebugP(this) << "address dismatch:" << printAddress(addr) << " != " << printAddress(_addr);
return false;
}
_ui64TotalBytes += data_len;
_last_rtp_time.resetTime(); _last_rtp_time.resetTime();
bool ret = handleOneRtp(0,_track,(unsigned char *)data,data_len); bool ret = handleOneRtp(0,_track,(unsigned char *)data,data_len);
if(dts_out){ if(dts_out){
@ -144,7 +161,7 @@ static inline bool checkTS(const uint8_t *packet, int bytes){
void RtpProcess::onRtpSorted(const RtpPacket::Ptr &rtp, int) { void RtpProcess::onRtpSorted(const RtpPacket::Ptr &rtp, int) {
if(rtp->sequence != _sequence + 1){ if(rtp->sequence != _sequence + 1){
WarnL << rtp->sequence << " != " << _sequence << "+1"; WarnP(this) << rtp->sequence << " != " << _sequence << "+1";
} }
_sequence = rtp->sequence; _sequence = rtp->sequence;
if(_save_file_rtp){ if(_save_file_rtp){
@ -165,11 +182,11 @@ void RtpProcess::onRtpDecode(const uint8_t *packet, int bytes, uint32_t timestam
//创建解码器 //创建解码器
if(checkTS(packet, bytes)){ if(checkTS(packet, bytes)){
//猜测是ts负载 //猜测是ts负载
InfoL << "judged to be TS: " << printSSRC(_ssrc); InfoP(this) << "judged to be TS: " << printSSRC(_ssrc);
_decoder = Decoder::createDecoder(Decoder::decoder_ts); _decoder = Decoder::createDecoder(Decoder::decoder_ts);
}else{ }else{
//猜测是ps负载 //猜测是ps负载
InfoL << "judged to be PS: " << printSSRC(_ssrc); InfoP(this) << "judged to be PS: " << printSSRC(_ssrc);
_decoder = Decoder::createDecoder(Decoder::decoder_ps); _decoder = Decoder::createDecoder(Decoder::decoder_ps);
} }
_decoder->setOnDecode([this](int stream,int codecid,int flags,int64_t pts,int64_t dts,const void *data,int bytes){ _decoder->setOnDecode([this](int stream,int codecid,int flags,int64_t pts,int64_t dts,const void *data,int bytes){
@ -179,7 +196,7 @@ void RtpProcess::onRtpDecode(const uint8_t *packet, int bytes, uint32_t timestam
auto ret = _decoder->input((uint8_t *)packet,bytes); auto ret = _decoder->input((uint8_t *)packet,bytes);
if(ret != bytes){ if(ret != bytes){
WarnL << ret << " != " << bytes << " " << flags; WarnP(this) << ret << " != " << bytes << " " << flags;
} }
} }
@ -213,13 +230,13 @@ void RtpProcess::onDecode(int stream,int codecid,int flags,int64_t pts,int64_t d
if (!_codecid_video) { if (!_codecid_video) {
//获取到视频 //获取到视频
_codecid_video = codecid; _codecid_video = codecid;
InfoL << "got video track: H264"; InfoP(this) << "got video track: H264";
auto track = std::make_shared<H264Track>(); auto track = std::make_shared<H264Track>();
_muxer->addTrack(track); _muxer->addTrack(track);
} }
if (codecid != _codecid_video) { if (codecid != _codecid_video) {
WarnL << "video track change to H264 from codecid:" << getCodecName(_codecid_video); WarnP(this) << "video track change to H264 from codecid:" << getCodecName(_codecid_video);
return; return;
} }
@ -238,12 +255,12 @@ void RtpProcess::onDecode(int stream,int codecid,int flags,int64_t pts,int64_t d
if (!_codecid_video) { if (!_codecid_video) {
//获取到视频 //获取到视频
_codecid_video = codecid; _codecid_video = codecid;
InfoL << "got video track: H265"; InfoP(this) << "got video track: H265";
auto track = std::make_shared<H265Track>(); auto track = std::make_shared<H265Track>();
_muxer->addTrack(track); _muxer->addTrack(track);
} }
if (codecid != _codecid_video) { if (codecid != _codecid_video) {
WarnL << "video track change to H265 from codecid:" << getCodecName(_codecid_video); WarnP(this) << "video track change to H265 from codecid:" << getCodecName(_codecid_video);
return; return;
} }
if(_save_file_video){ if(_save_file_video){
@ -261,13 +278,13 @@ void RtpProcess::onDecode(int stream,int codecid,int flags,int64_t pts,int64_t d
if (!_codecid_audio) { if (!_codecid_audio) {
//获取到音频 //获取到音频
_codecid_audio = codecid; _codecid_audio = codecid;
InfoL << "got audio track: AAC"; InfoP(this) << "got audio track: AAC";
auto track = std::make_shared<AACTrack>(); auto track = std::make_shared<AACTrack>();
_muxer->addTrack(track); _muxer->addTrack(track);
} }
if (codecid != _codecid_audio) { if (codecid != _codecid_audio) {
WarnL << "audio track change to AAC from codecid:" << getCodecName(_codecid_audio); WarnP(this) << "audio track change to AAC from codecid:" << getCodecName(_codecid_audio);
return; return;
} }
_muxer->inputFrame(std::make_shared<AACFrameNoCacheAble>((char *) data, bytes, dts, 0, 7)); _muxer->inputFrame(std::make_shared<AACFrameNoCacheAble>((char *) data, bytes, dts, 0, 7));
@ -281,14 +298,14 @@ void RtpProcess::onDecode(int stream,int codecid,int flags,int64_t pts,int64_t d
if (!_codecid_audio) { if (!_codecid_audio) {
//获取到音频 //获取到音频
_codecid_audio = codecid; _codecid_audio = codecid;
InfoL << "got audio track: G711"; InfoP(this) << "got audio track: G711";
//G711传统只支持 8000/1/16的规格FFmpeg貌似做了扩展但是这里不管它了 //G711传统只支持 8000/1/16的规格FFmpeg貌似做了扩展但是这里不管它了
auto track = std::make_shared<G711Track>(codec, 8000, 1, 16); auto track = std::make_shared<G711Track>(codec, 8000, 1, 16);
_muxer->addTrack(track); _muxer->addTrack(track);
} }
if (codecid != _codecid_audio) { if (codecid != _codecid_audio) {
WarnL << "audio track change to G711 from codecid:" << getCodecName(_codecid_audio); WarnP(this) << "audio track change to G711 from codecid:" << getCodecName(_codecid_audio);
return; return;
} }
_muxer->inputFrame(std::make_shared<G711FrameNoCacheAble>(codec, (char *) data, bytes, dts)); _muxer->inputFrame(std::make_shared<G711FrameNoCacheAble>(codec, (char *) data, bytes, dts));
@ -296,7 +313,7 @@ void RtpProcess::onDecode(int stream,int codecid,int flags,int64_t pts,int64_t d
} }
default: default:
if(codecid != 0){ if(codecid != 0){
WarnL << "unsupported codec type:" << getCodecName(codecid) << " " << (int)codecid; WarnP(this) << "unsupported codec type:" << getCodecName(codecid) << " " << (int)codecid;
} }
return; return;
} }
@ -310,20 +327,74 @@ bool RtpProcess::alive() {
return false; return false;
} }
string RtpProcess::get_peer_ip() { const string& RtpProcess::get_peer_ip() {
return SockUtil::inet_ntoa(((struct sockaddr_in *) _addr)->sin_addr); if(_peer_ip.empty() && _addr){
_peer_ip = SockUtil::inet_ntoa(((struct sockaddr_in *) _addr)->sin_addr);
}
return _peer_ip;
} }
uint16_t RtpProcess::get_peer_port() { uint16_t RtpProcess::get_peer_port() {
if(!_addr){
return 0;
}
return ntohs(((struct sockaddr_in *) _addr)->sin_port); return ntohs(((struct sockaddr_in *) _addr)->sin_port);
} }
const string& RtpProcess::get_local_ip() {
//todo
return _local_ip;
}
uint16_t RtpProcess::get_local_port() {
//todo
return 0;
}
string RtpProcess::getIdentifier() const{
return _media_info._streamid;
}
int RtpProcess::totalReaderCount(){ int RtpProcess::totalReaderCount(){
return _muxer->totalReaderCount(); return _muxer ? _muxer->totalReaderCount() : 0;
} }
void RtpProcess::setListener(const std::weak_ptr<MediaSourceEvent> &listener){ void RtpProcess::setListener(const std::weak_ptr<MediaSourceEvent> &listener){
if(_muxer){
_muxer->setMediaListener(listener); _muxer->setMediaListener(listener);
}else{
_listener = listener;
}
}
void RtpProcess::emitOnPublish() {
weak_ptr<RtpProcess> weak_self = shared_from_this();
Broadcast::PublishAuthInvoker invoker = [weak_self](const string &err, bool enableRtxp, bool enableHls, bool enableMP4) {
auto strongSelf = weak_self.lock();
if (!strongSelf) {
return;
}
if (err.empty()) {
strongSelf->_muxer = std::make_shared<MultiMediaSourceMuxer>(strongSelf->_media_info._vhost,
strongSelf->_media_info._app,
strongSelf->_media_info._streamid, 0,
enableRtxp, enableRtxp, enableHls, enableMP4);
strongSelf->_muxer->setMediaListener(strongSelf->_listener);
InfoP(strongSelf) << "允许RTP推流";
} else {
WarnP(strongSelf) << "禁止RTP推流:" << err;
}
};
//触发推流鉴权事件
auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish, _media_info, invoker, static_cast<SockInfo &>(*this));
if(!flag){
//该事件无人监听,默认不鉴权
GET_CONFIG(bool, toRtxp, General::kPublishToRtxp);
GET_CONFIG(bool, toHls, General::kPublishToHls);
GET_CONFIG(bool, toMP4, General::kPublishToMP4);
invoker("", toRtxp, toHls, toMP4);
}
} }

View File

@ -24,21 +24,31 @@ namespace mediakit{
string printSSRC(uint32_t ui32Ssrc); string printSSRC(uint32_t ui32Ssrc);
class FrameMerger; class FrameMerger;
class RtpProcess : public RtpReceiver , public RtpDecoder{ class RtpProcess : public RtpReceiver , public RtpDecoder, public SockInfo, public std::enable_shared_from_this<RtpProcess>{
public: public:
typedef std::shared_ptr<RtpProcess> Ptr; typedef std::shared_ptr<RtpProcess> Ptr;
RtpProcess(uint32_t ssrc); RtpProcess(uint32_t ssrc);
~RtpProcess(); ~RtpProcess();
bool inputRtp(const char *data,int data_len, const struct sockaddr *addr , uint32_t *dts_out = nullptr); bool inputRtp(const char *data,int data_len, const struct sockaddr *addr , uint32_t *dts_out = nullptr);
bool alive(); bool alive();
string get_peer_ip();
uint16_t get_peer_port(); const string &get_local_ip() override;
uint16_t get_local_port() override;
const string &get_peer_ip() override;
uint16_t get_peer_port() override;
string getIdentifier() const override;
int totalReaderCount(); int totalReaderCount();
void setListener(const std::weak_ptr<MediaSourceEvent> &listener); void setListener(const std::weak_ptr<MediaSourceEvent> &listener);
protected: protected:
void onRtpSorted(const RtpPacket::Ptr &rtp, int track_index) override ; void onRtpSorted(const RtpPacket::Ptr &rtp, int track_index) override ;
void onRtpDecode(const uint8_t *packet, int bytes, uint32_t timestamp, int flags) override; void onRtpDecode(const uint8_t *packet, int bytes, uint32_t timestamp, int flags) override;
void onDecode(int stream,int codecid,int flags,int64_t pts,int64_t dts, const void *data,int bytes); void onDecode(int stream,int codecid,int flags,int64_t pts,int64_t dts, const void *data,int bytes);
private:
void emitOnPublish();
private: private:
std::shared_ptr<FILE> _save_file_rtp; std::shared_ptr<FILE> _save_file_rtp;
std::shared_ptr<FILE> _save_file_ps; std::shared_ptr<FILE> _save_file_ps;
@ -55,6 +65,11 @@ private:
unordered_map<int,Stamp> _stamps; unordered_map<int,Stamp> _stamps;
uint32_t _dts = 0; uint32_t _dts = 0;
Decoder::Ptr _decoder; Decoder::Ptr _decoder;
string _peer_ip;
string _local_ip;
std::weak_ptr<MediaSourceEvent> _listener;
MediaInfo _media_info;
uint64_t _ui64TotalBytes = 0;
}; };
}//namespace mediakit }//namespace mediakit