diff --git a/src/Rtp/RtpProcess.cpp b/src/Rtp/RtpProcess.cpp index b0f23721..12bcf4e3 100644 --- a/src/Rtp/RtpProcess.cpp +++ b/src/Rtp/RtpProcess.cpp @@ -76,7 +76,7 @@ RtpProcess::RtpProcess(uint32_t ssrc) { GET_CONFIG(string,dump_dir,RtpProxy::kDumpDir); { - FILE *fp = !dump_dir.empty() ? File::createfile_file(File::absolutePath(printSSRC(_ssrc) + ".rtp",dump_dir).data(),"wb") : nullptr; + FILE *fp = !dump_dir.empty() ? File::createfile_file(File::absolutePath(_media_info._streamid + ".rtp",dump_dir).data(),"wb") : nullptr; if(fp){ _save_file_rtp.reset(fp,[](FILE *fp){ fclose(fp); @@ -85,7 +85,7 @@ RtpProcess::RtpProcess(uint32_t ssrc) { } { - FILE *fp = !dump_dir.empty() ? File::createfile_file(File::absolutePath(printSSRC(_ssrc) + ".mp2",dump_dir).data(),"wb") : nullptr; + FILE *fp = !dump_dir.empty() ? File::createfile_file(File::absolutePath(_media_info._streamid + ".mp2",dump_dir).data(),"wb") : nullptr; if(fp){ _save_file_ps.reset(fp,[](FILE *fp){ fclose(fp); @@ -94,7 +94,7 @@ RtpProcess::RtpProcess(uint32_t ssrc) { } { - FILE *fp = !dump_dir.empty() ? File::createfile_file(File::absolutePath(printSSRC(_ssrc) + ".video",dump_dir).data(),"wb") : nullptr; + FILE *fp = !dump_dir.empty() ? File::createfile_file(File::absolutePath(_media_info._streamid + ".video",dump_dir).data(),"wb") : nullptr; if(fp){ _save_file_video.reset(fp,[](FILE *fp){ fclose(fp); @@ -124,7 +124,7 @@ RtpProcess::~RtpProcess() { } } -bool RtpProcess::inputRtp(const char *data, int data_len,const struct sockaddr *addr,uint32_t *dts_out) { +bool RtpProcess::inputRtp(const Socket::Ptr &sock, const char *data, int data_len,const struct sockaddr *addr,uint32_t *dts_out) { GET_CONFIG(bool,check_source,RtpProxy::kCheckSource); //检查源是否合法 if(!_addr){ @@ -133,6 +133,7 @@ bool RtpProcess::inputRtp(const char *data, int data_len,const struct sockaddr * DebugP(this) << "bind to address:" << printAddress(_addr); //推流鉴权 emitOnPublish(); + _sock = sock; } if(!_muxer){ @@ -182,11 +183,11 @@ void RtpProcess::onRtpDecode(const uint8_t *packet, int bytes, uint32_t timestam //创建解码器 if(checkTS(packet, bytes)){ //猜测是ts负载 - InfoP(this) << "judged to be TS: " << printSSRC(_ssrc); + InfoP(this) << "judged to be TS"; _decoder = Decoder::createDecoder(Decoder::decoder_ts); }else{ //猜测是ps负载 - InfoP(this) << "judged to be PS: " << printSSRC(_ssrc); + InfoP(this) << "judged to be PS"; _decoder = Decoder::createDecoder(Decoder::decoder_ps); } _decoder->setOnDecode([this](int stream,int codecid,int flags,int64_t pts,int64_t dts,const void *data,int bytes){ @@ -342,12 +343,16 @@ uint16_t RtpProcess::get_peer_port() { } const string& RtpProcess::get_local_ip() { - //todo + if(_sock){ + _local_ip = _sock->get_local_ip(); + } return _local_ip; } uint16_t RtpProcess::get_local_port() { - //todo + if(_sock){ + return _sock->get_local_port(); + } return 0; } diff --git a/src/Rtp/RtpProcess.h b/src/Rtp/RtpProcess.h index b8841402..a5e9b937 100644 --- a/src/Rtp/RtpProcess.h +++ b/src/Rtp/RtpProcess.h @@ -29,7 +29,7 @@ public: typedef std::shared_ptr Ptr; RtpProcess(uint32_t ssrc); ~RtpProcess(); - bool inputRtp(const char *data,int data_len, const struct sockaddr *addr , uint32_t *dts_out = nullptr); + bool inputRtp(const Socket::Ptr &sock, const char *data,int data_len, const struct sockaddr *addr , uint32_t *dts_out = nullptr); bool alive(); const string &get_local_ip() override; @@ -70,6 +70,7 @@ private: std::weak_ptr _listener; MediaInfo _media_info; uint64_t _ui64TotalBytes = 0; + Socket::Ptr _sock; }; }//namespace mediakit diff --git a/src/Rtp/RtpSelector.cpp b/src/Rtp/RtpSelector.cpp index fbddccdc..d40e5ceb 100644 --- a/src/Rtp/RtpSelector.cpp +++ b/src/Rtp/RtpSelector.cpp @@ -15,7 +15,7 @@ namespace mediakit{ INSTANCE_IMP(RtpSelector); -bool RtpSelector::inputRtp(const char *data, int data_len,const struct sockaddr *addr,uint32_t *dts_out) { +bool RtpSelector::inputRtp(const Socket::Ptr &sock, const char *data, int data_len,const struct sockaddr *addr,uint32_t *dts_out) { uint32_t ssrc = 0; if(!getSSRC(data,data_len,ssrc)){ WarnL << "get ssrc from rtp failed:" << data_len; @@ -23,7 +23,7 @@ bool RtpSelector::inputRtp(const char *data, int data_len,const struct sockaddr } auto process = getProcess(ssrc, true); if(process){ - return process->inputRtp(data,data_len, addr,dts_out); + return process->inputRtp(sock, data,data_len, addr,dts_out); } return false; } diff --git a/src/Rtp/RtpSelector.h b/src/Rtp/RtpSelector.h index 8e4ca205..d6354f48 100644 --- a/src/Rtp/RtpSelector.h +++ b/src/Rtp/RtpSelector.h @@ -45,7 +45,7 @@ public: ~RtpSelector(); static RtpSelector &Instance(); - bool inputRtp(const char *data,int data_len,const struct sockaddr *addr ,uint32_t *dts_out = nullptr ); + bool inputRtp(const Socket::Ptr &sock, const char *data,int data_len,const struct sockaddr *addr ,uint32_t *dts_out = nullptr ); static bool getSSRC(const char *data,int data_len, uint32_t &ssrc); RtpProcess::Ptr getProcess(uint32_t ssrc,bool makeNew); void delProcess(uint32_t ssrc,const RtpProcess *ptr); diff --git a/src/Rtp/RtpSession.cpp b/src/Rtp/RtpSession.cpp index 57d71d41..b51bbda4 100644 --- a/src/Rtp/RtpSession.cpp +++ b/src/Rtp/RtpSession.cpp @@ -57,7 +57,7 @@ void RtpSession::onRtpPacket(const char *data, uint64_t len) { _process = RtpSelector::Instance().getProcess(_ssrc, true); _process->setListener(dynamic_pointer_cast(shared_from_this())); } - _process->inputRtp(data + 2, len - 2, &addr); + _process->inputRtp(_sock,data + 2, len - 2, &addr); _ticker.resetTime(); } diff --git a/src/Rtp/UdpRecver.cpp b/src/Rtp/UdpRecver.cpp index 484855cf..365b78a3 100644 --- a/src/Rtp/UdpRecver.cpp +++ b/src/Rtp/UdpRecver.cpp @@ -17,6 +17,7 @@ UdpRecver::UdpRecver() { } UdpRecver::~UdpRecver() { + _sock->setOnRead(nullptr); } bool UdpRecver::initSock(uint16_t local_port,const char *local_ip) { @@ -26,8 +27,9 @@ bool UdpRecver::initSock(uint16_t local_port,const char *local_ip) { }); auto &ref = RtpSelector::Instance(); - _sock->setOnRead([&ref](const Buffer::Ptr &buf, struct sockaddr *addr, int ){ - ref.inputRtp(buf->data(),buf->size(),addr); + auto sock = _sock; + _sock->setOnRead([&ref, sock](const Buffer::Ptr &buf, struct sockaddr *addr, int ){ + ref.inputRtp(sock, buf->data(),buf->size(),addr); }); return _sock->bindUdpSock(local_port,local_ip); } diff --git a/tests/test_rtp.cpp b/tests/test_rtp.cpp index 4f16899c..58686f2c 100644 --- a/tests/test_rtp.cpp +++ b/tests/test_rtp.cpp @@ -55,7 +55,7 @@ static bool loadFile(const char *path){ } uint32_t timeStamp; - RtpSelector::Instance().inputRtp(rtp,len, &addr,&timeStamp); + RtpSelector::Instance().inputRtp(nullptr,rtp,len, &addr,&timeStamp); if(timeStamp_last){ auto diff = timeStamp - timeStamp_last; if(diff > 0){