From c353c626c181eb79c0427f1842300b6f8a07f097 Mon Sep 17 00:00:00 2001 From: xiongziliang <771730766@qq.com> Date: Mon, 16 Dec 2019 11:00:40 +0800 Subject: [PATCH] =?UTF-8?q?1=E3=80=81=E6=A0=B9=E6=8D=AEPS=E8=8E=B7?= =?UTF-8?q?=E5=8F=96=E6=97=B6=E9=97=B4=E6=88=B3=202=E3=80=81=E4=BF=AE?= =?UTF-8?q?=E5=A4=8Drtp=E4=BB=A3=E7=90=86=E5=8F=AF=E8=83=BD=E8=8A=B1?= =?UTF-8?q?=E5=B1=8F=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Rtp/RtpProcess.cpp | 51 ++++++++++++++++++++++++++++++++++++----- src/Rtp/RtpProcess.h | 6 +++-- src/Rtp/RtpSelector.cpp | 4 ++-- src/Rtp/RtpSelector.h | 2 +- tests/test_rtp.cpp | 7 +----- 5 files changed, 53 insertions(+), 17 deletions(-) diff --git a/src/Rtp/RtpProcess.cpp b/src/Rtp/RtpProcess.cpp index e328063d..d26e2e56 100644 --- a/src/Rtp/RtpProcess.cpp +++ b/src/Rtp/RtpProcess.cpp @@ -33,6 +33,35 @@ namespace mediakit{ + +/** +* 合并一些时间戳相同的frame +*/ +class FrameMerger { +public: + FrameMerger() = default; + virtual ~FrameMerger() = default; + + void inputFrame(const Frame::Ptr &frame,const function &cb){ + if (!_frameCached.empty() && _frameCached.back()->dts() != frame->dts()) { + Frame::Ptr back = _frameCached.back(); + Buffer::Ptr merged_frame = back; + if(_frameCached.size() != 1){ + string merged; + _frameCached.for_each([&](const Frame::Ptr &frame){ + merged.append(frame->data(),frame->size()); + }); + merged_frame = std::make_shared(std::move(merged)); + } + cb(back->dts(),back->pts(),merged_frame); + _frameCached.clear(); + } + _frameCached.emplace_back(Frame::getCacheAbleFrame(frame)); + } +private: + List _frameCached; +}; + string printSSRC(uint32_t ui32Ssrc) { char tmp[9] = { 0 }; ui32Ssrc = htonl(ui32Ssrc); @@ -90,6 +119,7 @@ RtpProcess::RtpProcess(uint32_t ssrc) { }); } } + _merger = std::make_shared(); } RtpProcess::~RtpProcess() { @@ -101,7 +131,7 @@ RtpProcess::~RtpProcess() { } } -bool RtpProcess::inputRtp(const char *data, int data_len,const struct sockaddr *addr) { +bool RtpProcess::inputRtp(const char *data, int data_len,const struct sockaddr *addr,uint32_t *dts_out) { GET_CONFIG(bool,check_source,RtpProxy::kCheckSource); //检查源是否合法 if(!_addr){ @@ -116,7 +146,11 @@ bool RtpProcess::inputRtp(const char *data, int data_len,const struct sockaddr * } _last_rtp_time.resetTime(); - return handleOneRtp(0,_track,(unsigned char *)data,data_len); + bool ret = handleOneRtp(0,_track,(unsigned char *)data,data_len); + if(dts_out){ + *dts_out = _dts; + } + return ret; } void RtpProcess::onRtpSorted(const RtpPacket::Ptr &rtp, int) { @@ -156,6 +190,7 @@ void RtpProcess::onPSDecode(int stream, int bytes) { pts /= 90; dts /= 90; + _dts = dts; _stamps[codecid].revise(dts,pts,dts,pts,false); switch (codecid) { @@ -176,8 +211,10 @@ void RtpProcess::onPSDecode(int stream, if(_save_file_video){ fwrite((uint8_t *)data,bytes, 1, _save_file_video.get()); } - auto frame = std::make_shared((char *) data, bytes, dts, pts,4); - _muxer->inputFrame(frame); + auto frame = std::make_shared((char *) data, bytes, dts, pts,0); + _merger->inputFrame(frame,[this](uint32_t dts, uint32_t pts, const Buffer::Ptr &buffer) { + _muxer->inputFrame(std::make_shared(buffer->data(), buffer->size(), dts, pts,4)); + }); break; } @@ -196,8 +233,10 @@ void RtpProcess::onPSDecode(int stream, if(_save_file_video){ fwrite((uint8_t *)data,bytes, 1, _save_file_video.get()); } - auto frame = std::make_shared((char *) data, bytes, dts, pts, 4); - _muxer->inputFrame(frame); + auto frame = std::make_shared((char *) data, bytes, dts, pts, 0); + _merger->inputFrame(frame,[this](uint32_t dts, uint32_t pts, const Buffer::Ptr &buffer) { + _muxer->inputFrame(std::make_shared(buffer->data(), buffer->size(), dts, pts, 4)); + }); break; } diff --git a/src/Rtp/RtpProcess.h b/src/Rtp/RtpProcess.h index 08d56b6b..7053956e 100644 --- a/src/Rtp/RtpProcess.h +++ b/src/Rtp/RtpProcess.h @@ -39,13 +39,13 @@ using namespace mediakit; namespace mediakit{ string printSSRC(uint32_t ui32Ssrc); - +class FrameMerger; class RtpProcess : public RtpReceiver , public RtpDecoder , public PSDecoder { public: typedef std::shared_ptr Ptr; RtpProcess(uint32_t ssrc); ~RtpProcess(); - bool inputRtp(const char *data,int data_len, const struct sockaddr *addr); + bool inputRtp(const char *data,int data_len, const struct sockaddr *addr , uint32_t *dts_out = nullptr); bool alive(); string get_peer_ip(); uint16_t get_peer_port(); @@ -70,8 +70,10 @@ private: int _codecid_video = 0; int _codecid_audio = 0; MultiMediaSourceMuxer::Ptr _muxer; + std::shared_ptr _merger; Ticker _last_rtp_time; map _stamps; + uint32_t _dts = 0; }; }//namespace mediakit diff --git a/src/Rtp/RtpSelector.cpp b/src/Rtp/RtpSelector.cpp index 2ff997a7..f57081d2 100644 --- a/src/Rtp/RtpSelector.cpp +++ b/src/Rtp/RtpSelector.cpp @@ -31,7 +31,7 @@ namespace mediakit{ INSTANCE_IMP(RtpSelector); -bool RtpSelector::inputRtp(const char *data, int data_len,const struct sockaddr *addr) { +bool RtpSelector::inputRtp(const char *data, int data_len,const struct sockaddr *addr,uint32_t *dts_out) { if(_last_rtp_time.elapsedTime() > 3000){ _last_rtp_time.resetTime(); onManager(); @@ -43,7 +43,7 @@ bool RtpSelector::inputRtp(const char *data, int data_len,const struct sockaddr } auto process = getProcess(ssrc, true); if(process){ - return process->inputRtp(data,data_len, addr); + return process->inputRtp(data,data_len, addr,dts_out); } return false; } diff --git a/src/Rtp/RtpSelector.h b/src/Rtp/RtpSelector.h index 872d3ada..bbbd9665 100644 --- a/src/Rtp/RtpSelector.h +++ b/src/Rtp/RtpSelector.h @@ -41,7 +41,7 @@ public: ~RtpSelector(); static RtpSelector &Instance(); - bool inputRtp(const char *data,int data_len,const struct sockaddr *addr); + bool inputRtp(const char *data,int data_len,const struct sockaddr *addr ,uint32_t *dts_out = nullptr ); static uint32_t getSSRC(const char *data,int data_len); RtpProcess::Ptr getProcess(uint32_t ssrc,bool makeNew); void delProcess(uint32_t ssrc,const RtpProcess *ptr); diff --git a/tests/test_rtp.cpp b/tests/test_rtp.cpp index 14b3d61a..52c8fe03 100644 --- a/tests/test_rtp.cpp +++ b/tests/test_rtp.cpp @@ -71,19 +71,14 @@ static bool loadFile(const char *path){ } uint32_t timeStamp; - memcpy(&timeStamp, rtp + 4, 4); - timeStamp = ntohl(timeStamp); - timeStamp /= 90; - + RtpSelector::Instance().inputRtp(rtp,len, &addr,&timeStamp); if(timeStamp_last){ auto diff = timeStamp - timeStamp_last; if(diff > 0){ usleep(diff * 1000); } } - timeStamp_last = timeStamp; - RtpSelector::Instance().inputRtp(rtp,len, &addr); } fclose(fp); return true;