初步支持simulcast流的接收和分流

This commit is contained in:
ziyue 2021-06-23 21:44:04 +08:00
parent faa43adbbc
commit dcb91e3b50
2 changed files with 88 additions and 27 deletions

View File

@ -408,14 +408,7 @@ void WebRtcTransportImp::onStartWebRTC() {
info->offer_ssrc_rtx = m_offer->getRtxSSRC(); info->offer_ssrc_rtx = m_offer->getRtxSSRC();
info->plan_rtp = &m_answer.plan[0];; info->plan_rtp = &m_answer.plan[0];;
info->plan_rtx = m_answer.getRelatedRtxPlan(info->plan_rtp->pt); info->plan_rtx = m_answer.getRelatedRtxPlan(info->plan_rtp->pt);
info->rtcp_context_recv = std::make_shared<RtcpContext>(info->plan_rtp->sample_rate, true);
info->rtcp_context_send = std::make_shared<RtcpContext>(info->plan_rtp->sample_rate, false); info->rtcp_context_send = std::make_shared<RtcpContext>(info->plan_rtp->sample_rate, false);
info->receiver = std::make_shared<RtpReceiverImp>([info, this](RtpPacket::Ptr rtp) mutable {
onSortedRtp(*info, std::move(rtp));
});
info->nack_ctx.setOnNack([info, this](const FCI_NACK &nack) mutable {
onSendNack(*info, nack);
});
//send ssrc --> RtpPayloadInfo //send ssrc --> RtpPayloadInfo
_rtp_info_ssrc[info->answer_ssrc_rtp] = std::make_pair(false, info); _rtp_info_ssrc[info->answer_ssrc_rtp] = std::make_pair(false, info);
@ -599,9 +592,14 @@ void WebRtcTransportImp::onRtcp(const char *buf, size_t len) {
auto rtx = it->second.first; auto rtx = it->second.first;
if (!rtx) { if (!rtx) {
auto &info = it->second.second; auto &info = it->second.second;
info->rtcp_context_recv->onRtcp(sr); auto it = info->rtcp_context_recv.find(sr->ssrc);
auto rr = info->rtcp_context_recv->createRtcpRR(info->answer_ssrc_rtp, info->offer_ssrc_rtp); if (it != info->rtcp_context_recv.end()) {
it->second->onRtcp(sr);
auto rr = it->second->createRtcpRR(info->answer_ssrc_rtp, sr->ssrc);
sendRtcpPacket(rr->data(), rr->size(), true); sendRtcpPacket(rr->data(), rr->size(), true);
} else {
WarnL << "未识别的sr rtcp包:" << rtcp->dumpString();
}
} }
} else { } else {
WarnL << "未识别的sr rtcp包:" << rtcp->dumpString(); WarnL << "未识别的sr rtcp包:" << rtcp->dumpString();
@ -677,7 +675,7 @@ void WebRtcTransportImp::onRtcp(const char *buf, size_t len) {
/////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////
void WebRtcTransportImp::changeRtpExtId(const RtpPayloadInfo *info, const RtpHeader *header, bool is_recv, bool is_rtx) const{ void WebRtcTransportImp::changeRtpExtId(RtpPayloadInfo &info, const RtpHeader *header, bool is_recv, bool is_rtx, string *rid_ptr) const{
auto ext_map = RtpExt::getExtValue(header); auto ext_map = RtpExt::getExtValue(header);
for (auto &pr : ext_map) { for (auto &pr : ext_map) {
if (is_recv) { if (is_recv) {
@ -690,6 +688,35 @@ void WebRtcTransportImp::changeRtpExtId(const RtpPayloadInfo *info, const RtpHea
pr.second.setType(it->second); pr.second.setType(it->second);
//重新赋值ext id为 ext type作为后面处理ext的统一中间类型 //重新赋值ext id为 ext type作为后面处理ext的统一中间类型
pr.second.setExtId((uint8_t) it->second); pr.second.setExtId((uint8_t) it->second);
switch(it->second){
case RtpExtType::sdes_repaired_rtp_stream_id :
case RtpExtType::sdes_rtp_stream_id : {
auto ssrc = ntohl(header->ssrc);
auto rid = it->second == RtpExtType::sdes_rtp_stream_id ? pr.second.getRtpStreamId() : pr.second.getRepairedRtpStreamId();
//根据rid获取rtp或rtx的ssrc
auto &ssrc_ref = is_rtx ? info.rid_ssrc[rid].second : info.rid_ssrc[rid].first;
if (!ssrc_ref) {
//ssrc未赋值赋值
ssrc_ref = ssrc;
DebugL << (is_rtx ? "got rid of rtx:" : "got rid:") << rid << ", ssrc:" << ssrc;
}
if (is_rtx) {
//rtx ssrc --> rtp ssrc
auto &rtp_ssrc_ref = info.rtx_ssrc_to_rtp_ssrc[ssrc];
if (!rtp_ssrc_ref && info.rid_ssrc[rid].first) {
//未找到rtx到rtp ssrc的映射关系且已经获取rtp的ssrc那么设置映射关系
rtp_ssrc_ref = info.rid_ssrc[rid].first;
DebugL << "got ssrc of rid:" << rid << ", [rtx-rtp]:" << ssrc << "-" << rtp_ssrc_ref;
}
}
if (rid_ptr) {
*rid_ptr = rid;
}
break;
}
default : break;
}
} else { } else {
pr.second.setType((RtpExtType) pr.first); pr.second.setType((RtpExtType) pr.first);
auto it = _rtp_ext_type_to_id.find((RtpExtType) pr.first); auto it = _rtp_ext_type_to_id.find((RtpExtType) pr.first);
@ -715,6 +742,8 @@ void WebRtcTransportImp::onRtp_l(const char *buf, size_t len, bool rtx) {
} }
RtpHeader *rtp = (RtpHeader *) buf; RtpHeader *rtp = (RtpHeader *) buf;
auto ssrc = ntohl(rtp->ssrc);
//根据接收到的rtp的pt信息找到该流的信息 //根据接收到的rtp的pt信息找到该流的信息
auto it = _rtp_info_pt.find(rtp->pt); auto it = _rtp_info_pt.find(rtp->pt);
if (it == _rtp_info_pt.end()) { if (it == _rtp_info_pt.end()) {
@ -734,16 +763,34 @@ void WebRtcTransportImp::onRtp_l(const char *buf, size_t len, bool rtx) {
#endif #endif
if (!rtx) { if (!rtx) {
//统计rtp接受情况便于生成nack rtcp包 //统计rtp接受情况便于生成nack rtcp包
info->nack_ctx.received(seq); info->nack_ctx[ssrc].received(seq);
//时间戳转换成毫秒 //时间戳转换成毫秒
auto stamp_ms = ntohl(rtp->stamp) * uint64_t(1000) / info->plan_rtp->sample_rate; auto stamp_ms = ntohl(rtp->stamp) * uint64_t(1000) / info->plan_rtp->sample_rate;
//统计rtp收到的情况好做rr汇报 //统计rtp收到的情况好做rr汇报
info->rtcp_context_recv->onRtp(seq, stamp_ms, len); auto &ref = info->rtcp_context_recv[ssrc];
if (!ref) {
ref = std::make_shared<RtcpContext>(info->plan_rtp->sample_rate, true);
} }
ref->onRtp(seq, stamp_ms, len);
//修改ext id至统一 //修改ext id至统一
changeRtpExtId(info.get(), rtp, true, rtx); changeRtpExtId(*info, rtp, true, false);
}
//解析并排序rtp //解析并排序rtp
info->receiver->inputRtp(info->media->type, info->plan_rtp->sample_rate, (uint8_t *) buf, len); auto &ref = info->receiver[ssrc];
if (!ref) {
ref = std::make_shared<RtpReceiverImp>([info, this](RtpPacket::Ptr rtp) mutable {
onSortedRtp(*info, std::move(rtp));
});
info->nack_ctx[ssrc].setOnNack([info, this, ssrc](const FCI_NACK &nack) mutable {
onSendNack(*info, nack, ssrc);
});
//recv simulcast ssrc --> RtpPayloadInfo
_rtp_info_ssrc[ssrc] = std::make_pair(false, info);
InfoL << "receive rtp of ssrc:" << ssrc;
}
ref->inputRtp(info->media->type, info->plan_rtp->sample_rate, (uint8_t *) buf, len);
return; return;
} }
@ -754,11 +801,22 @@ void WebRtcTransportImp::onRtp_l(const char *buf, size_t len, bool rtx) {
if (size < 2) { if (size < 2) {
return; return;
} }
//修改ext id至统一
changeRtpExtId(*info, rtp, true, true);
//前两个字节是原始的rtp的seq //前两个字节是原始的rtp的seq
auto origin_seq = payload[0] << 8 | payload[1]; auto origin_seq = payload[0] << 8 | payload[1];
InfoL << "received rtx rtp: " << origin_seq;
rtp->seq = htons(origin_seq); rtp->seq = htons(origin_seq);
if (info->offer_ssrc_rtp) {
//非simulcast
rtp->ssrc = htonl(info->offer_ssrc_rtp); rtp->ssrc = htonl(info->offer_ssrc_rtp);
TraceL << "received rtx rtp,ssrc: " << ssrc << ", seq:" << origin_seq;
} else {
//todo simulcast下辅码流通过rtx传输
//simulcast情况下根据rtx的ssrc查找rtp的ssrc
rtp->ssrc = htonl(info->rtx_ssrc_to_rtp_ssrc[ntohl(rtp->ssrc)]);
}
rtp->pt = info->plan_rtp->pt; rtp->pt = info->plan_rtp->pt;
memmove((uint8_t *) buf + 2, buf, payload - (uint8_t *) buf); memmove((uint8_t *) buf + 2, buf, payload - (uint8_t *) buf);
buf += 2; buf += 2;
@ -766,10 +824,11 @@ void WebRtcTransportImp::onRtp_l(const char *buf, size_t len, bool rtx) {
onRtp_l(buf, len, true); onRtp_l(buf, len, true);
} }
void WebRtcTransportImp::onSendNack(RtpPayloadInfo &info, const FCI_NACK &nack) { void WebRtcTransportImp::onSendNack(RtpPayloadInfo &info, const FCI_NACK &nack, uint32_t ssrc) {
auto rtcp = RtcpFB::create(RTPFBType::RTCP_RTPFB_NACK, &nack, FCI_NACK::kSize); auto rtcp = RtcpFB::create(RTPFBType::RTCP_RTPFB_NACK, &nack, FCI_NACK::kSize);
rtcp->ssrc = htons(info.answer_ssrc_rtp); rtcp->ssrc = htons(info.answer_ssrc_rtp);
rtcp->ssrc_media = htonl(info.offer_ssrc_rtp); rtcp->ssrc_media = htonl(ssrc);
DebugL << htonl(ssrc) << " " << nack.getPid();
sendRtcpPacket((char *) rtcp.get(), rtcp->getSize(), true); sendRtcpPacket((char *) rtcp.get(), rtcp->getSize(), true);
} }
@ -826,12 +885,12 @@ void WebRtcTransportImp::onBeforeEncryptRtp(const char *buf, size_t &len, void *
if (!pr->first || !pr->second->plan_rtx) { if (!pr->first || !pr->second->plan_rtx) {
//普通的rtp,或者不支持rtx, 修改目标pt和ssrc //普通的rtp,或者不支持rtx, 修改目标pt和ssrc
changeRtpExtId(pr->second, header, false, false); changeRtpExtId(*pr->second, header, false, false);
header->pt = pr->second->plan_rtp->pt; header->pt = pr->second->plan_rtp->pt;
header->ssrc = htonl(pr->second->answer_ssrc_rtp); header->ssrc = htonl(pr->second->answer_ssrc_rtp);
} else { } else {
//重传的rtp, rtx //重传的rtp, rtx
changeRtpExtId(pr->second, header, false, true); changeRtpExtId(*pr->second, header, false, true);
header->pt = pr->second->plan_rtx->pt; header->pt = pr->second->plan_rtx->pt;
if (pr->second->answer_ssrc_rtx) { if (pr->second->answer_ssrc_rtx) {
//有rtx单独的ssrc,有些情况下浏览器支持rtx但是未指定rtx单独的ssrc //有rtx单独的ssrc,有些情况下浏览器支持rtx但是未指定rtx单独的ssrc

View File

@ -350,15 +350,17 @@ private:
uint32_t answer_ssrc_rtx = 0; uint32_t answer_ssrc_rtx = 0;
const RtcMedia *media; const RtcMedia *media;
NackList nack_list; NackList nack_list;
NackContext nack_ctx;
RtcpContext::Ptr rtcp_context_recv;
RtcpContext::Ptr rtcp_context_send; RtcpContext::Ptr rtcp_context_send;
std::shared_ptr<RtpReceiverImp> receiver; unordered_map<string/*rid*/, std::pair<uint32_t/*rtp ssrc*/, uint32_t/*rtx ssrc*/> > rid_ssrc;
unordered_map<uint32_t/*rtx ssrc*/, uint32_t/*rtp ssrc*/> rtx_ssrc_to_rtp_ssrc;
unordered_map<uint32_t/*simulcast ssrc*/, NackContext> nack_ctx;
unordered_map<uint32_t/*simulcast ssrc*/, RtcpContext::Ptr> rtcp_context_recv;
unordered_map<uint32_t/*simulcast ssrc*/, std::shared_ptr<RtpReceiverImp> > receiver;
}; };
void onSortedRtp(RtpPayloadInfo &info, RtpPacket::Ptr rtp); void onSortedRtp(RtpPayloadInfo &info, RtpPacket::Ptr rtp);
void onSendNack(RtpPayloadInfo &info, const FCI_NACK &nack); void onSendNack(RtpPayloadInfo &info, const FCI_NACK &nack, uint32_t ssrc);
void changeRtpExtId(const RtpPayloadInfo *info, const RtpHeader *header, bool is_recv, bool is_rtx = false) const; void changeRtpExtId(RtpPayloadInfo &info, const RtpHeader *header, bool is_recv, bool is_rtx = false, string *rid_ptr = nullptr) const;
private: private:
uint16_t _rtx_seq[2] = {0, 0}; uint16_t _rtx_seq[2] = {0, 0};