From 8fdfc14f6feb2d9b3837a1281974155f450149ea Mon Sep 17 00:00:00 2001 From: ziyue <1213642868@qq.com> Date: Wed, 12 May 2021 20:51:51 +0800 Subject: [PATCH] =?UTF-8?q?=E6=95=B4=E7=90=86=20nack/rtx/ssrc?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- webrtc/WebRtcTransport.cpp | 131 ++++++++++++++++++++++--------------- webrtc/WebRtcTransport.h | 12 ++-- 2 files changed, 85 insertions(+), 58 deletions(-) diff --git a/webrtc/WebRtcTransport.cpp b/webrtc/WebRtcTransport.cpp index 50cf0c64..b599fd5a 100644 --- a/webrtc/WebRtcTransport.cpp +++ b/webrtc/WebRtcTransport.cpp @@ -271,7 +271,8 @@ void WebRtcTransport::inputSockData(char *buf, size_t len, RTC::TransportTuple * void WebRtcTransport::sendRtpPacket(char *buf, size_t len, bool flush, void *ctx) { if (_srtp_session_send) { - CHECK(len + SRTP_MAX_TRAILER_LEN <= sizeof(_srtp_buf)); + //预留rtx加入的两个字节 + CHECK(len + SRTP_MAX_TRAILER_LEN + 2 <= sizeof(_srtp_buf)); memcpy(_srtp_buf, buf, len); onBeforeEncryptRtp((char *) _srtp_buf, len, ctx); if (_srtp_session_send->EncryptRtp(_srtp_buf, &len)) { @@ -395,40 +396,40 @@ bool WebRtcTransportImp::canRecvRtp() const{ } const RtcSession& WebRtcTransportImp::getSdpWithSSRC() const{ - auto &offer = getSdp(SdpType::offer); + auto &offer = getSdp(SdpType::answer); if (offer.haveSSRC()) { return offer; } - auto &answer = getSdp(SdpType::answer); + auto &answer = getSdp(SdpType::offer); CHECK(answer.haveSSRC()); return answer; } void WebRtcTransportImp::onStartWebRTC() { //获取ssrc和pt相关信息,届时收到rtp和rtcp时分别可以根据pt和ssrc找到相关的信息 - for (auto &m : getSdp(SdpType::offer).media) { - for (auto &plan : m.plan) { - auto hit_pan = getSdp(SdpType::answer).getMedia(m.type)->getPlan(plan.pt); - if (!hit_pan) { - continue; - } - auto m_with_ssrc = getSdpWithSSRC().getMedia(m.type); + for (auto &m_answer : getSdp(SdpType::answer).media) { + auto m_with_ssrc = getSdpWithSSRC().getMedia(m_answer.type); + for (auto &plan_answer : m_answer.plan) { //获取offer端rtp的ssrc和pt相关信息 auto info = std::make_shared(); - _rtp_info_pt.emplace(plan.pt, info); - info->plan = &plan; + _rtp_info_pt.emplace(plan_answer.pt, info); info->media = m_with_ssrc; - info->is_common_rtp = getCodecId(plan.codec) != CodecInvalid; + info->is_common_rtp = getCodecId(plan_answer.codec) != CodecInvalid; if (info->is_common_rtp) { //rtp - _rtp_info_ssrc[m_with_ssrc->rtp_rtx_ssrc[0].ssrc] = info; + _rtp_info_ssrc[info->media->rtp_rtx_ssrc[0].ssrc] = info; + info->plan_rtp = &plan_answer; + info->plan_rtx = m_answer.getRelatedRtxPlan(plan_answer.pt); } else { //rtx - auto apt = atoi(plan.getFmtp("apt").data()); - info->plan_apt = m.getPlan(apt); + if (info->media->rtp_rtx_ssrc.size() > 1) { + _rtp_info_ssrc[info->media->rtp_rtx_ssrc[1].ssrc] = info; + } + info->plan_rtp = m_answer.getPlan(atoi(plan_answer.getFmtp("apt").data())); + info->plan_rtx = &plan_answer; } - info->rtcp_context_recv = std::make_shared(info->plan->sample_rate, true); - info->rtcp_context_send = std::make_shared(info->plan->sample_rate, false); + info->rtcp_context_recv = std::make_shared(info->plan_rtp->sample_rate, true); + info->rtcp_context_send = std::make_shared(info->plan_rtp->sample_rate, false); info->receiver = std::make_shared([info, this](RtpPacket::Ptr rtp) mutable { onSortedRtp(*info, std::move(rtp)); }); @@ -436,9 +437,9 @@ void WebRtcTransportImp::onStartWebRTC() { onNack(*info, nack); }); } - if (m.type != TrackApplication) { + if (m_answer.type != TrackApplication) { //记录rtp ext类型与id的关系,方便接收或发送rtp时修改rtp ext id - for (auto &ext : m.extmap) { + for (auto &ext : m_answer.extmap) { auto ext_type = RtpExt::getExtType(ext.ext); _rtp_ext_id_to_type.emplace(ext.id, ext_type); _rtp_ext_type_to_id.emplace(ext_type, ext.id); @@ -499,8 +500,8 @@ void WebRtcTransportImp::onCheckSdp(SdpType type, RtcSession &sdp){ sdp.origin.address = m.addr.address; } - if (!canSendRtp() || getSdp(SdpType::offer).haveSSRC()) { - //offer sdp未包含ssrc相关信息,那么我们才在answer sdp中回复ssrc相关信息 + if (!canSendRtp()) { + //设置我们发送的rtp的ssrc return; } @@ -596,7 +597,7 @@ void WebRtcTransportImp::onRtcp(const char *buf, size_t len) { auto rr = it->second->rtcp_context_recv->createRtcpRR(sr->items.ssrc, sr->ssrc); sendRtcpPacket(rr->data(), rr->size(), true); } else { - WarnL << "未识别的sr rtcp包:" << (uint32_t)sr->ssrc; + WarnL << "未识别的sr rtcp包:" << rtcp->dumpString(); } break; } @@ -604,12 +605,14 @@ void WebRtcTransportImp::onRtcp(const char *buf, size_t len) { _alive_ticker.resetTime(); //对方汇报rtp接收情况 RtcpRR *rr = (RtcpRR *) rtcp; - auto it = _rtp_info_ssrc.find(rr->ssrc); - if (it != _rtp_info_ssrc.end()) { - auto sr = it->second->rtcp_context_send->createRtcpSR(rr->items.ssrc); - sendRtcpPacket(sr->data(), sr->size(), true); - } else { - WarnL << "未识别的rr rtcp包:" << (uint32_t)rr->ssrc; + for (auto item : rr->getItemList()) { + auto it = _rtp_info_ssrc.find(item->ssrc); + if (it != _rtp_info_ssrc.end()) { + auto sr = it->second->rtcp_context_send->createRtcpSR(item->ssrc); + sendRtcpPacket(sr->data(), sr->size(), true); + } else { + WarnL << "未识别的rr rtcp包:" << rtcp->dumpString(); + } } break; } @@ -619,10 +622,13 @@ void WebRtcTransportImp::onRtcp(const char *buf, size_t len) { for (auto ssrc : bye->getSSRC()) { auto it = _rtp_info_ssrc.find(*ssrc); if (it == _rtp_info_ssrc.end()) { - WarnL << "未识别的bye rtcp包:" << *ssrc; + WarnL << "未识别的bye rtcp包:" << rtcp->dumpString(); continue; } - _rtp_info_pt.erase(it->second->plan->pt); + _rtp_info_pt.erase(it->second->plan_rtp->pt); + if (it->second->plan_rtx) { + _rtp_info_pt.erase(it->second->plan_rtx->pt); + } _rtp_info_ssrc.erase(it); } onShutdown(SockException(Err_eof, "rtcp bye message received")); @@ -630,18 +636,18 @@ void WebRtcTransportImp::onRtcp(const char *buf, size_t len) { } case RtcpType::RTCP_PSFB: case RtcpType::RTCP_RTPFB: { - RtcpFB *fb = (RtcpFB *) rtcp; - auto it = _rtp_info_ssrc.find(fb->ssrc); - if (it == _rtp_info_ssrc.end()) { - WarnL << "未识别的 rtcp包:" << rtcp->dumpString(); - return; - } if ((RtcpType) rtcp->pt == RtcpType::RTCP_PSFB) { break; } //RTPFB switch ((RTPFBType) rtcp->report_count) { case RTPFBType::RTCP_RTPFB_NACK : { + RtcpFB *fb = (RtcpFB *) rtcp; + auto it = _rtp_info_ssrc.find(fb->ssrc_media); + if (it == _rtp_info_ssrc.end()) { + WarnL << "未识别的 rtcp包:" << rtcp->dumpString(); + return; + } auto &fci = fb->getFci(); it->second->nack_list.for_each_nack(fci, [&](const RtpPacket::Ptr &rtp) { //rtp重传 @@ -702,7 +708,7 @@ void WebRtcTransportImp::onRtp_l(const char *buf, size_t len, bool rtx) { if (info->is_common_rtp) { //这是普通的rtp数据 auto seq = ntohs(rtp->seq); -#if 0 +#if 1 if (!rtx && info->media->type == TrackVideo && seq % 100 == 0) { //此处模拟接受丢包 DebugL << "recv dropped:" << seq; @@ -715,12 +721,12 @@ void WebRtcTransportImp::onRtp_l(const char *buf, size_t len, bool rtx) { //修改ext id至统一 changeRtpExtId(rtp, _rtp_ext_id_to_type); //时间戳转换成毫秒 - auto stamp_ms = ntohl(rtp->stamp) * uint64_t(1000) / info->plan->sample_rate; + auto stamp_ms = ntohl(rtp->stamp) * uint64_t(1000) / info->plan_rtp->sample_rate; //统计rtp收到的情况,好做rr汇报 info->rtcp_context_recv->onRtp(seq, stamp_ms, len); } //解析并排序rtp - info->receiver->inputRtp(info->media->type, info->plan->sample_rate, (uint8_t *) buf, len); + info->receiver->inputRtp(info->media->type, info->plan_rtp->sample_rate, (uint8_t *) buf, len); return; } @@ -736,7 +742,7 @@ void WebRtcTransportImp::onRtp_l(const char *buf, size_t len, bool rtx) { InfoL << "received rtx rtp: " << origin_seq; rtp->seq = htons(origin_seq); rtp->ssrc = htonl(info->media->rtp_rtx_ssrc[0].ssrc); - rtp->pt = info->plan_apt->pt; + rtp->pt = info->plan_rtp->pt; memmove((uint8_t *) buf + 2, buf, payload - (uint8_t *) buf); buf += 2; len -= 2; @@ -782,31 +788,52 @@ void WebRtcTransportImp::onSendRtp(const RtpPacket::Ptr &rtp, bool flush, bool r //统计rtp发送情况,好做sr汇报 info->rtcp_context_send->onRtp(rtp->getSeq(), rtp->getStampMS(), rtp->size() - RtpPacket::kRtpTcpHeaderSize); info->nack_list.push_back(rtp); -#if 0 +#if 1 //此处模拟发送丢包 if(rtp->getSeq() % 100 == 0){ - DebugL << "send droped:" << rtp->getSeq(); + DebugL << "send dropped:" << rtp->getSeq(); return; } #endif } else { WarnL << "send rtx rtp:" << rtp->getSeq(); } - sendRtpPacket(rtp->data() + RtpPacket::kRtpTcpHeaderSize, rtp->size() - RtpPacket::kRtpTcpHeaderSize, flush, info.get()); + pair ctx{rtx, info.get()}; + sendRtpPacket(rtp->data() + RtpPacket::kRtpTcpHeaderSize, rtp->size() - RtpPacket::kRtpTcpHeaderSize, flush, &ctx); _bytes_usage += rtp->size() - RtpPacket::kRtpTcpHeaderSize; } -void WebRtcTransportImp::onBeforeEncryptRtp(const char *buf, size_t len, void *ctx) { - RtpPayloadInfo *info = reinterpret_cast(ctx); - auto header = (RtpHeader *)buf; - //修改目标pt和ssrc - header->pt = info->plan->pt; - header->ssrc = htons(info->media->rtp_rtx_ssrc[0].ssrc); +void WebRtcTransportImp::onBeforeEncryptRtp(const char *buf, size_t &len, void *ctx) { + auto pr = (pair *) ctx; + auto header = (RtpHeader *) buf; changeRtpExtId(header, _rtp_ext_type_to_id); -} -void WebRtcTransportImp::onBeforeEncryptRtcp(const char *buf, size_t len, void *ctx) { + if (!pr->first || !pr->second->plan_rtx) { + //普通的rtp,或者不支持rtx, 修改目标pt和ssrc + header->pt = pr->second->plan_rtp->pt; + header->ssrc = htonl(pr->second->media->rtp_rtx_ssrc[0].ssrc); + } else { + //重传的rtp, rtx + header->pt = pr->second->plan_rtx->pt; + if (pr->second->media->rtp_rtx_ssrc.size() > 1) { + //有rtx单独的ssrc + header->ssrc = htonl(pr->second->media->rtp_rtx_ssrc[1].ssrc); + } + auto origin_seq = ntohs(header->seq); + //seq跟原来的不一样 + header->seq = htons(origin_seq + 100); + auto payload = header->getPayloadData(); + auto payload_size = header->getPayloadSize(len); + if (payload_size) { + //rtp负载后移两个字节,这两个字节用于存放osn + //https://datatracker.ietf.org/doc/html/rfc4588#section-4 + memmove(payload + 2, payload, payload_size); + } + payload[0] = origin_seq >> 8; + payload[1] = origin_seq & 0xFF; + len += 2; + } } void WebRtcTransportImp::onShutdown(const SockException &ex){ diff --git a/webrtc/WebRtcTransport.h b/webrtc/WebRtcTransport.h index 8fc1ddca..f9ca0d31 100644 --- a/webrtc/WebRtcTransport.h +++ b/webrtc/WebRtcTransport.h @@ -101,8 +101,8 @@ protected: virtual void onRtp(const char *buf, size_t len) = 0; virtual void onRtcp(const char *buf, size_t len) = 0; virtual void onShutdown(const SockException &ex) = 0; - virtual void onBeforeEncryptRtp(const char *buf, size_t len, void *ctx) = 0; - virtual void onBeforeEncryptRtcp(const char *buf, size_t len, void *ctx) = 0; + virtual void onBeforeEncryptRtp(const char *buf, size_t &len, void *ctx) = 0; + virtual void onBeforeEncryptRtcp(const char *buf, size_t &len, void *ctx) = 0; protected: const RtcSession& getSdp(SdpType type) const; @@ -301,8 +301,8 @@ protected: void onRtp_l(const char *buf, size_t len, bool rtx); void onRtcp(const char *buf, size_t len) override; - void onBeforeEncryptRtp(const char *buf, size_t len, void *ctx) override; - void onBeforeEncryptRtcp(const char *buf, size_t len, void *ctx) override; + void onBeforeEncryptRtp(const char *buf, size_t &len, void *ctx) override; + void onBeforeEncryptRtcp(const char *buf, size_t &len, void *ctx) override {}; void onShutdown(const SockException &ex) override; @@ -345,8 +345,8 @@ private: using Ptr = std::shared_ptr; bool is_common_rtp; - const RtcCodecPlan *plan; - const RtcCodecPlan *plan_apt; + const RtcCodecPlan *plan_rtp; + const RtcCodecPlan *plan_rtx; const RtcMedia *media; std::shared_ptr receiver; RtcpContext::Ptr rtcp_context_recv;