整理RtpReceiver相关代码

This commit is contained in:
ziyue 2021-06-25 16:24:44 +08:00
parent 18b7e45906
commit a7f75774e9
5 changed files with 141 additions and 88 deletions

View File

@ -24,36 +24,23 @@ static inline bool checkTS(const uint8_t *packet, size_t bytes){
return bytes % TS_PACKET_SIZE == 0 && packet[0] == TS_SYNC_BYTE; return bytes % TS_PACKET_SIZE == 0 && packet[0] == TS_SYNC_BYTE;
} }
class RtpReceiverImp : public RtpReceiver { class RtpReceiverImp : public RtpTrackImp {
public: public:
using Ptr = std::shared_ptr<RtpReceiverImp>; using Ptr = std::shared_ptr<RtpReceiverImp>;
RtpReceiverImp(int sample_rate, function<void(RtpPacket::Ptr rtp)> cb, function<void(const RtpPacket::Ptr &rtp)> cb_before = nullptr){ RtpReceiverImp(int sample_rate, RtpTrackImp::OnSorted cb, RtpTrackImp::BeforeSorted cb_before = nullptr){
_sample_rate = sample_rate; _sample_rate = sample_rate;
_on_sort = std::move(cb); setOnSorted(std::move(cb));
_on_before_sort = std::move(cb_before); setBeforeSorted(std::move(cb_before));
} }
~RtpReceiverImp() override = default; ~RtpReceiverImp() override = default;
bool inputRtp(TrackType type, uint8_t *ptr, size_t len){ bool inputRtp(TrackType type, uint8_t *ptr, size_t len){
return handleOneRtp((int) type, type, _sample_rate, ptr, len); return RtpTrack::inputRtp(type, _sample_rate, ptr, len);
}
protected:
void onRtpSorted(RtpPacket::Ptr rtp, int track_index) override {
_on_sort(std::move(rtp));
}
void onBeforeRtpSorted(const RtpPacket::Ptr &rtp, int track_index) override {
if (_on_before_sort) {
_on_before_sort(rtp);
}
} }
private: private:
int _sample_rate; int _sample_rate;
function<void(RtpPacket::Ptr rtp)> _on_sort;
function<void(const RtpPacket::Ptr &rtp)> _on_before_sort;
}; };
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////

View File

@ -103,7 +103,7 @@ void RtpSession::onRtpPacket(const char *data, size_t len) {
} }
try { try {
_process->inputRtp(false, getSock(), data, len, &_addr); _process->inputRtp(false, getSock(), data, len, &_addr);
} catch (RtpReceiver::BadRtpException &ex) { } catch (RtpTrack::BadRtpException &ex) {
if (!_is_udp) { if (!_is_udp) {
WarnL << ex.what() << "开始搜索ssrc以便恢复上下文"; WarnL << ex.what() << "开始搜索ssrc以便恢复上下文";
_search_rtp = true; _search_rtp = true;

View File

@ -15,19 +15,23 @@
namespace mediakit { namespace mediakit {
RtpReceiver::RtpReceiver() { RtpTrack::RtpTrack() {
int index = 0; setOnSort([this](uint16_t seq, RtpPacket::Ptr &packet) {
for (auto &sortor : _rtp_sortor) { onRtpSorted(std::move(packet));
sortor.setOnSort([this, index](uint16_t seq, RtpPacket::Ptr &packet) {
onRtpSorted(std::move(packet), index);
}); });
++index;
}
} }
RtpReceiver::~RtpReceiver() {} uint32_t RtpTrack::getSSRC() const {
return _ssrc;
}
bool RtpReceiver::handleOneRtp(int index, TrackType type, int sample_rate, uint8_t *ptr, size_t len) { void RtpTrack::clear() {
_ssrc = 0;
_ssrc_alive.resetTime();
PacketSortor<RtpPacket::Ptr>::clear();
}
bool RtpTrack::inputRtp(TrackType type, int sample_rate, uint8_t *ptr, size_t len) {
if (len < RtpPacket::kRtpHeaderSize) { if (len < RtpPacket::kRtpHeaderSize) {
WarnL << "rtp包太小:" << len; WarnL << "rtp包太小:" << len;
return false; return false;
@ -52,23 +56,23 @@ bool RtpReceiver::handleOneRtp(int index, TrackType type, int sample_rate, uint8
//比对缓存ssrc //比对缓存ssrc
auto ssrc = ntohl(header->ssrc); auto ssrc = ntohl(header->ssrc);
if (!_ssrc[index]) { if (!_ssrc) {
//记录并锁定ssrc //记录并锁定ssrc
_ssrc[index] = ssrc; _ssrc = ssrc;
_ssrc_alive[index].resetTime(); _ssrc_alive.resetTime();
} else if (_ssrc[index] == ssrc) { } else if (_ssrc == ssrc) {
//ssrc匹配正确,刷新计时器 //ssrc匹配正确,刷新计时器
_ssrc_alive[index].resetTime(); _ssrc_alive.resetTime();
} else { } else {
//ssrc错误 //ssrc错误
if (_ssrc_alive[index].elapsedTime() < 3 * 1000) { if (_ssrc_alive.elapsedTime() < 3 * 1000) {
//接受正确ssrc的rtp在10秒内那么我们认为存在多路rtp,忽略掉ssrc不匹配的rtp //接受正确ssrc的rtp在10秒内那么我们认为存在多路rtp,忽略掉ssrc不匹配的rtp
WarnL << "ssrc不匹配,rtp已丢弃:" << ssrc << " != " << _ssrc[index]; WarnL << "ssrc不匹配,rtp已丢弃:" << ssrc << " != " << _ssrc;
return false; return false;
} }
InfoL << "rtp流ssrc切换:" << _ssrc[index] << " -> " << ssrc; InfoL << "rtp流ssrc切换:" << _ssrc << " -> " << ssrc;
_ssrc[index] = ssrc; _ssrc = ssrc;
_ssrc_alive[index].resetTime(); _ssrc_alive.resetTime();
} }
auto rtp = RtpPacket::create(); auto rtp = RtpPacket::create();
@ -87,29 +91,32 @@ bool RtpReceiver::handleOneRtp(int index, TrackType type, int sample_rate, uint8
//拷贝rtp //拷贝rtp
memcpy(&data[4], ptr, len); memcpy(&data[4], ptr, len);
onBeforeRtpSorted(rtp, index); onBeforeRtpSorted(rtp);
auto seq = rtp->getSeq(); auto seq = rtp->getSeq();
_rtp_sortor[index].sortPacket(seq, std::move(rtp)); sortPacket(seq, std::move(rtp));
return true; return true;
} }
void RtpReceiver::clear() { ////////////////////////////////////////////////////////////////////////////////////
CLEAR_ARR(_ssrc);
for (auto &sortor : _rtp_sortor) { void RtpTrackImp::setOnSorted(OnSorted cb) {
sortor.clear(); _on_sorted = std::move(cb);
}
void RtpTrackImp::setBeforeSorted(BeforeSorted cb) {
_on_before_sorted = std::move(cb);
}
void RtpTrackImp::onRtpSorted(RtpPacket::Ptr rtp) {
if (_on_sorted) {
_on_sorted(std::move(rtp));
} }
} }
size_t RtpReceiver::getJitterSize(int index) const{ void RtpTrackImp::onBeforeRtpSorted(const RtpPacket::Ptr &rtp) {
return _rtp_sortor[index].getJitterSize(); if (_on_before_sorted) {
} _on_before_sorted(rtp);
}
size_t RtpReceiver::getCycleCount(int index) const{
return _rtp_sortor[index].getCycleCount();
}
uint32_t RtpReceiver::getSSRC(int index) const{
return _ssrc[index];
} }
}//namespace mediakit }//namespace mediakit

View File

@ -160,11 +160,8 @@ private:
function<void(SEQ seq, T &packet)> _cb; function<void(SEQ seq, T &packet)> _cb;
}; };
class RtpReceiver { class RtpTrack : private PacketSortor<RtpPacket::Ptr>{
public: public:
RtpReceiver();
virtual ~RtpReceiver();
class BadRtpException : public invalid_argument { class BadRtpException : public invalid_argument {
public: public:
template<typename Type> template<typename Type>
@ -172,7 +169,60 @@ public:
~BadRtpException() = default; ~BadRtpException() = default;
}; };
RtpTrack();
virtual ~RtpTrack() = default;
void clear();
uint32_t getSSRC() const;
bool inputRtp(TrackType type, int sample_rate, uint8_t *ptr, size_t len);
protected: protected:
virtual void onRtpSorted(RtpPacket::Ptr rtp) {}
virtual void onBeforeRtpSorted(const RtpPacket::Ptr &rtp) {}
private:
uint32_t _ssrc = 0;
Ticker _ssrc_alive;
};
class RtpTrackImp : public RtpTrack{
public:
using OnSorted = function<void(RtpPacket::Ptr)>;
using BeforeSorted = function<void(const RtpPacket::Ptr &)>;
RtpTrackImp() = default;
~RtpTrackImp() override = default;
void setOnSorted(OnSorted cb);
void setBeforeSorted(BeforeSorted cb);
protected:
void onRtpSorted(RtpPacket::Ptr rtp) override;
void onBeforeRtpSorted(const RtpPacket::Ptr &rtp) override;
private:
OnSorted _on_sorted;
BeforeSorted _on_before_sorted;
};
template<int kCount = 2>
class RtpMultiReceiver {
public:
RtpMultiReceiver() {
int index = 0;
for (auto &track : _track) {
track.setOnSorted([this, index](RtpPacket::Ptr rtp) {
onRtpSorted(std::move(rtp), index);
});
track.setBeforeSorted([this, index](const RtpPacket::Ptr &rtp) {
onBeforeRtpSorted(rtp, index);
});
++index;
}
}
virtual ~RtpMultiReceiver() = default;
/** /**
* rtp包 * rtp包
* @param index track下标索引 * @param index track下标索引
@ -182,34 +232,49 @@ protected:
* @param len rtp数据指针长度 * @param len rtp数据指针长度
* @return true * @return true
*/ */
bool handleOneRtp(int index, TrackType type, int samplerate, uint8_t *ptr, size_t len); bool handleOneRtp(int index, TrackType type, int sample_rate, uint8_t *ptr, size_t len){
return _track[index].inputRtp(type, sample_rate, ptr, len);
}
void clear() {
for (auto &track : _track) {
track.clear();
}
}
size_t getJitterSize(int index) const {
return _track[index].getJitterSize();
}
size_t getCycleCount(int index) const {
return _track[index].getCycleCount();
}
uint32_t getSSRC(int index) const {
return _track[index].getSSRC();
}
protected:
/** /**
* rtp数据包排序后输出 * rtp数据包排序后输出
* @param rtp rtp数据包 * @param rtp rtp数据包
* @param track_index track索引 * @param track_index track索引
*/ */
virtual void onRtpSorted(RtpPacket::Ptr rtp, int track_index) {} virtual void onRtpSorted(RtpPacket::Ptr rtp, int index) {}
/** /**
* rtp但还未排序 * rtp但还未排序
* @param rtp rtp数据包 * @param rtp rtp数据包
* @param track_index track索引 * @param track_index track索引
*/ */
virtual void onBeforeRtpSorted(const RtpPacket::Ptr &rtp, int track_index) {} virtual void onBeforeRtpSorted(const RtpPacket::Ptr &rtp, int index) {}
void clear();
size_t getJitterSize(int track_index) const;
size_t getCycleCount(int track_index) const;
uint32_t getSSRC(int track_index) const;
private: private:
uint32_t _ssrc[2] = {0, 0}; RtpTrackImp _track[kCount];
Ticker _ssrc_alive[2];
//rtp排序缓存根据seq排序
PacketSortor<RtpPacket::Ptr> _rtp_sortor[2];
}; };
using RtpReceiver = RtpMultiReceiver<2>;
}//namespace mediakit }//namespace mediakit

View File

@ -552,14 +552,14 @@ SdpAttrCandidate::Ptr WebRtcTransportImp::getIceCandidate() const{
/////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////
class RtpChannel : public RtpReceiver { class RtpChannel : public RtpTrackImp {
public: public:
uint32_t rtp_ssrc; uint32_t rtp_ssrc;
public: public:
RtpChannel(function<void(RtpPacket::Ptr rtp)> on_rtp, function<void(const FCI_NACK &nack)> on_nack) { RtpChannel(RtpTrackImp::OnSorted cb, function<void(const FCI_NACK &nack)> on_nack) {
_on_sort = std::move(on_rtp); setOnSorted(std::move(cb));
nack_ctx.setOnNack(std::move(on_nack)); _nack_ctx.setOnNack(std::move(on_nack));
} }
~RtpChannel() override = default; ~RtpChannel() override = default;
@ -569,27 +569,21 @@ public:
RtpHeader *rtp = (RtpHeader *) ptr; RtpHeader *rtp = (RtpHeader *) ptr;
auto seq = ntohs(rtp->seq); auto seq = ntohs(rtp->seq);
//统计rtp接受情况便于生成nack rtcp包 //统计rtp接受情况便于生成nack rtcp包
nack_ctx.received(seq); _nack_ctx.received(seq);
//统计rtp收到的情况好做rr汇报 //统计rtp收到的情况好做rr汇报
rtcp_context.onRtp(seq, ntohl(rtp->stamp), len); _rtcp_context.onRtp(seq, ntohl(rtp->stamp), len);
} }
return handleOneRtp((int) type, type, sample_rate, ptr, len); return RtpTrack::inputRtp(type, sample_rate, ptr, len);
} }
Buffer::Ptr createRtcpRR(RtcpHeader *sr, uint32_t ssrc) { Buffer::Ptr createRtcpRR(RtcpHeader *sr, uint32_t ssrc) {
rtcp_context.onRtcp(sr); _rtcp_context.onRtcp(sr);
return rtcp_context.createRtcpRR(ssrc, rtp_ssrc); return _rtcp_context.createRtcpRR(ssrc, rtp_ssrc);
}
protected:
void onRtpSorted(RtpPacket::Ptr rtp, int track_index) override {
_on_sort(std::move(rtp));
} }
private: private:
NackContext nack_ctx; NackContext _nack_ctx;
RtcpContext rtcp_context{true}; RtcpContext _rtcp_context{true};
function<void(RtpPacket::Ptr rtp)> _on_sort;
}; };
std::shared_ptr<RtpChannel> MediaTrack::getRtpChannel(uint32_t ssrc) const{ std::shared_ptr<RtpChannel> MediaTrack::getRtpChannel(uint32_t ssrc) const{