完善网络相关信息

This commit is contained in:
xiongziliang 2020-04-23 23:30:24 +08:00
parent 0df25942aa
commit 26cfb5ae73
7 changed files with 24 additions and 16 deletions

View File

@ -76,7 +76,7 @@ RtpProcess::RtpProcess(uint32_t ssrc) {
GET_CONFIG(string,dump_dir,RtpProxy::kDumpDir); GET_CONFIG(string,dump_dir,RtpProxy::kDumpDir);
{ {
FILE *fp = !dump_dir.empty() ? File::createfile_file(File::absolutePath(printSSRC(_ssrc) + ".rtp",dump_dir).data(),"wb") : nullptr; FILE *fp = !dump_dir.empty() ? File::createfile_file(File::absolutePath(_media_info._streamid + ".rtp",dump_dir).data(),"wb") : nullptr;
if(fp){ if(fp){
_save_file_rtp.reset(fp,[](FILE *fp){ _save_file_rtp.reset(fp,[](FILE *fp){
fclose(fp); fclose(fp);
@ -85,7 +85,7 @@ RtpProcess::RtpProcess(uint32_t ssrc) {
} }
{ {
FILE *fp = !dump_dir.empty() ? File::createfile_file(File::absolutePath(printSSRC(_ssrc) + ".mp2",dump_dir).data(),"wb") : nullptr; FILE *fp = !dump_dir.empty() ? File::createfile_file(File::absolutePath(_media_info._streamid + ".mp2",dump_dir).data(),"wb") : nullptr;
if(fp){ if(fp){
_save_file_ps.reset(fp,[](FILE *fp){ _save_file_ps.reset(fp,[](FILE *fp){
fclose(fp); fclose(fp);
@ -94,7 +94,7 @@ RtpProcess::RtpProcess(uint32_t ssrc) {
} }
{ {
FILE *fp = !dump_dir.empty() ? File::createfile_file(File::absolutePath(printSSRC(_ssrc) + ".video",dump_dir).data(),"wb") : nullptr; FILE *fp = !dump_dir.empty() ? File::createfile_file(File::absolutePath(_media_info._streamid + ".video",dump_dir).data(),"wb") : nullptr;
if(fp){ if(fp){
_save_file_video.reset(fp,[](FILE *fp){ _save_file_video.reset(fp,[](FILE *fp){
fclose(fp); fclose(fp);
@ -124,7 +124,7 @@ RtpProcess::~RtpProcess() {
} }
} }
bool RtpProcess::inputRtp(const char *data, int data_len,const struct sockaddr *addr,uint32_t *dts_out) { bool RtpProcess::inputRtp(const Socket::Ptr &sock, const char *data, int data_len,const struct sockaddr *addr,uint32_t *dts_out) {
GET_CONFIG(bool,check_source,RtpProxy::kCheckSource); GET_CONFIG(bool,check_source,RtpProxy::kCheckSource);
//检查源是否合法 //检查源是否合法
if(!_addr){ if(!_addr){
@ -133,6 +133,7 @@ bool RtpProcess::inputRtp(const char *data, int data_len,const struct sockaddr *
DebugP(this) << "bind to address:" << printAddress(_addr); DebugP(this) << "bind to address:" << printAddress(_addr);
//推流鉴权 //推流鉴权
emitOnPublish(); emitOnPublish();
_sock = sock;
} }
if(!_muxer){ if(!_muxer){
@ -182,11 +183,11 @@ void RtpProcess::onRtpDecode(const uint8_t *packet, int bytes, uint32_t timestam
//创建解码器 //创建解码器
if(checkTS(packet, bytes)){ if(checkTS(packet, bytes)){
//猜测是ts负载 //猜测是ts负载
InfoP(this) << "judged to be TS: " << printSSRC(_ssrc); InfoP(this) << "judged to be TS";
_decoder = Decoder::createDecoder(Decoder::decoder_ts); _decoder = Decoder::createDecoder(Decoder::decoder_ts);
}else{ }else{
//猜测是ps负载 //猜测是ps负载
InfoP(this) << "judged to be PS: " << printSSRC(_ssrc); InfoP(this) << "judged to be PS";
_decoder = Decoder::createDecoder(Decoder::decoder_ps); _decoder = Decoder::createDecoder(Decoder::decoder_ps);
} }
_decoder->setOnDecode([this](int stream,int codecid,int flags,int64_t pts,int64_t dts,const void *data,int bytes){ _decoder->setOnDecode([this](int stream,int codecid,int flags,int64_t pts,int64_t dts,const void *data,int bytes){
@ -342,12 +343,16 @@ uint16_t RtpProcess::get_peer_port() {
} }
const string& RtpProcess::get_local_ip() { const string& RtpProcess::get_local_ip() {
//todo if(_sock){
_local_ip = _sock->get_local_ip();
}
return _local_ip; return _local_ip;
} }
uint16_t RtpProcess::get_local_port() { uint16_t RtpProcess::get_local_port() {
//todo if(_sock){
return _sock->get_local_port();
}
return 0; return 0;
} }

View File

@ -29,7 +29,7 @@ public:
typedef std::shared_ptr<RtpProcess> Ptr; typedef std::shared_ptr<RtpProcess> Ptr;
RtpProcess(uint32_t ssrc); RtpProcess(uint32_t ssrc);
~RtpProcess(); ~RtpProcess();
bool inputRtp(const char *data,int data_len, const struct sockaddr *addr , uint32_t *dts_out = nullptr); bool inputRtp(const Socket::Ptr &sock, const char *data,int data_len, const struct sockaddr *addr , uint32_t *dts_out = nullptr);
bool alive(); bool alive();
const string &get_local_ip() override; const string &get_local_ip() override;
@ -70,6 +70,7 @@ private:
std::weak_ptr<MediaSourceEvent> _listener; std::weak_ptr<MediaSourceEvent> _listener;
MediaInfo _media_info; MediaInfo _media_info;
uint64_t _ui64TotalBytes = 0; uint64_t _ui64TotalBytes = 0;
Socket::Ptr _sock;
}; };
}//namespace mediakit }//namespace mediakit

View File

@ -15,7 +15,7 @@ namespace mediakit{
INSTANCE_IMP(RtpSelector); INSTANCE_IMP(RtpSelector);
bool RtpSelector::inputRtp(const char *data, int data_len,const struct sockaddr *addr,uint32_t *dts_out) { bool RtpSelector::inputRtp(const Socket::Ptr &sock, const char *data, int data_len,const struct sockaddr *addr,uint32_t *dts_out) {
uint32_t ssrc = 0; uint32_t ssrc = 0;
if(!getSSRC(data,data_len,ssrc)){ if(!getSSRC(data,data_len,ssrc)){
WarnL << "get ssrc from rtp failed:" << data_len; WarnL << "get ssrc from rtp failed:" << data_len;
@ -23,7 +23,7 @@ bool RtpSelector::inputRtp(const char *data, int data_len,const struct sockaddr
} }
auto process = getProcess(ssrc, true); auto process = getProcess(ssrc, true);
if(process){ if(process){
return process->inputRtp(data,data_len, addr,dts_out); return process->inputRtp(sock, data,data_len, addr,dts_out);
} }
return false; return false;
} }

View File

@ -45,7 +45,7 @@ public:
~RtpSelector(); ~RtpSelector();
static RtpSelector &Instance(); static RtpSelector &Instance();
bool inputRtp(const char *data,int data_len,const struct sockaddr *addr ,uint32_t *dts_out = nullptr ); bool inputRtp(const Socket::Ptr &sock, const char *data,int data_len,const struct sockaddr *addr ,uint32_t *dts_out = nullptr );
static bool getSSRC(const char *data,int data_len, uint32_t &ssrc); static bool getSSRC(const char *data,int data_len, uint32_t &ssrc);
RtpProcess::Ptr getProcess(uint32_t ssrc,bool makeNew); RtpProcess::Ptr getProcess(uint32_t ssrc,bool makeNew);
void delProcess(uint32_t ssrc,const RtpProcess *ptr); void delProcess(uint32_t ssrc,const RtpProcess *ptr);

View File

@ -57,7 +57,7 @@ void RtpSession::onRtpPacket(const char *data, uint64_t len) {
_process = RtpSelector::Instance().getProcess(_ssrc, true); _process = RtpSelector::Instance().getProcess(_ssrc, true);
_process->setListener(dynamic_pointer_cast<RtpSession>(shared_from_this())); _process->setListener(dynamic_pointer_cast<RtpSession>(shared_from_this()));
} }
_process->inputRtp(data + 2, len - 2, &addr); _process->inputRtp(_sock,data + 2, len - 2, &addr);
_ticker.resetTime(); _ticker.resetTime();
} }

View File

@ -17,6 +17,7 @@ UdpRecver::UdpRecver() {
} }
UdpRecver::~UdpRecver() { UdpRecver::~UdpRecver() {
_sock->setOnRead(nullptr);
} }
bool UdpRecver::initSock(uint16_t local_port,const char *local_ip) { bool UdpRecver::initSock(uint16_t local_port,const char *local_ip) {
@ -26,8 +27,9 @@ bool UdpRecver::initSock(uint16_t local_port,const char *local_ip) {
}); });
auto &ref = RtpSelector::Instance(); auto &ref = RtpSelector::Instance();
_sock->setOnRead([&ref](const Buffer::Ptr &buf, struct sockaddr *addr, int ){ auto sock = _sock;
ref.inputRtp(buf->data(),buf->size(),addr); _sock->setOnRead([&ref, sock](const Buffer::Ptr &buf, struct sockaddr *addr, int ){
ref.inputRtp(sock, buf->data(),buf->size(),addr);
}); });
return _sock->bindUdpSock(local_port,local_ip); return _sock->bindUdpSock(local_port,local_ip);
} }

View File

@ -55,7 +55,7 @@ static bool loadFile(const char *path){
} }
uint32_t timeStamp; uint32_t timeStamp;
RtpSelector::Instance().inputRtp(rtp,len, &addr,&timeStamp); RtpSelector::Instance().inputRtp(nullptr,rtp,len, &addr,&timeStamp);
if(timeStamp_last){ if(timeStamp_last){
auto diff = timeStamp - timeStamp_last; auto diff = timeStamp - timeStamp_last;
if(diff > 0){ if(diff > 0){