diff --git a/webrtc/WebRtcTransport.cpp b/webrtc/WebRtcTransport.cpp index 98aa3ee5..b5891ab2 100644 --- a/webrtc/WebRtcTransport.cpp +++ b/webrtc/WebRtcTransport.cpp @@ -148,7 +148,6 @@ void WebRtcTransport::sendRtcpRemb(uint32_t ssrc, size_t bit_rate) { fb->ssrc = htonl(0); fb->ssrc_media = htonl(ssrc); sendRtcpPacket((char *) fb.get(), fb->getSize(), true); - TraceL << ssrc << " " << bit_rate; } void WebRtcTransport::sendRtcpPli(uint32_t ssrc) { @@ -400,7 +399,7 @@ void WebRtcTransportImp::onStartWebRTC() { //获取ssrc和pt相关信息,届时收到rtp和rtcp时分别可以根据pt和ssrc找到相关的信息 for (auto &m_answer : getSdp(SdpType::answer).media) { auto m_offer = getSdp(SdpType::offer).getMedia(m_answer.type); - auto info = std::make_shared(); + auto info = std::make_shared(); info->media = &m_answer; info->answer_ssrc_rtp = m_answer.getRtpSSRC(); @@ -411,16 +410,16 @@ void WebRtcTransportImp::onStartWebRTC() { info->plan_rtx = m_answer.getRelatedRtxPlan(info->plan_rtp->pt); info->rtcp_context_send = std::make_shared(info->plan_rtp->sample_rate, false); - //send ssrc --> RtpPayloadInfo + //send ssrc --> MediaTrack _rtp_info_ssrc[info->answer_ssrc_rtp] = info; - //recv ssrc --> RtpPayloadInfo + //recv ssrc --> MediaTrack _rtp_info_ssrc[info->offer_ssrc_rtp] = info; - //rtp pt --> RtpPayloadInfo + //rtp pt --> MediaTrack _rtp_info_pt.emplace(info->plan_rtp->pt, std::make_pair(false, info)); if (info->plan_rtx) { - //rtx pt --> RtpPayloadInfo + //rtx pt --> MediaTrack _rtp_info_pt.emplace(info->plan_rtx->pt, std::make_pair(true, info)); } if (m_offer->type != TrackApplication) { @@ -557,16 +556,29 @@ SdpAttrCandidate::Ptr WebRtcTransportImp::getIceCandidate() const{ /////////////////////////////////////////////////////////////////// -class RtpReceiverImp : public RtpReceiver { +class RtpChannel : public RtpReceiver { public: - RtpReceiverImp( function cb){ - _on_sort = std::move(cb); + uint32_t ssrc; + RtcpContext::Ptr rtcp_context; + +public: + RtpChannel(function on_rtp, function on_nack) { + _on_sort = std::move(on_rtp); + nack_ctx.setOnNack(std::move(on_nack)); } - ~RtpReceiverImp() override = default; + ~RtpChannel() override = default; - bool inputRtp(TrackType type, int samplerate, uint8_t *ptr, size_t len){ - return handleOneRtp((int) type, type, samplerate, ptr, len); + bool inputRtp(TrackType type, int sample_rate, uint8_t *ptr, size_t len, bool is_rtx){ + if (!is_rtx) { + RtpHeader *rtp = (RtpHeader *) ptr; + auto seq = ntohs(rtp->seq); + //统计rtp接受情况,便于生成nack rtcp包 + nack_ctx.received(seq); + //统计rtp收到的情况,好做rr汇报 + rtcp_context->onRtp(seq, ntohl(rtp->stamp) * uint64_t(1000) / sample_rate, len); + } + return handleOneRtp((int) type, type, sample_rate, ptr, len); } protected: @@ -575,9 +587,22 @@ protected: } private: + NackContext nack_ctx; function _on_sort; }; +std::shared_ptr WebRtcTransportImp::MediaTrack::getRtpChannel(uint32_t ssrc) const{ + auto it_rid = ssrc_to_rid.find(ssrc); + if (it_rid == ssrc_to_rid.end()) { + return nullptr; + } + auto it_chn = rtp_channel.find(it_rid->second); + if (it_chn == rtp_channel.end()) { + return nullptr; + } + return it_chn->second; +} + void WebRtcTransportImp::onRtcp(const char *buf, size_t len) { _bytes_usage += len; auto rtcps = RtcpHeader::loadFromBytes((char *) buf, len); @@ -589,13 +614,13 @@ void WebRtcTransportImp::onRtcp(const char *buf, size_t len) { auto it = _rtp_info_ssrc.find(sr->ssrc); if (it != _rtp_info_ssrc.end()) { auto &info = it->second; - auto it = info->rtcp_context_recv.find(sr->ssrc); - if (it != info->rtcp_context_recv.end()) { - it->second->onRtcp(sr); - auto rr = it->second->createRtcpRR(info->answer_ssrc_rtp, sr->ssrc); - sendRtcpPacket(rr->data(), rr->size(), true); - } else { + auto rtp_chn = info->getRtpChannel(sr->ssrc); + if(!rtp_chn){ WarnL << "未识别的sr rtcp包:" << rtcp->dumpString(); + } else { + rtp_chn->rtcp_context->onRtcp(sr); + auto rr = rtp_chn->rtcp_context->createRtcpRR(info->answer_ssrc_rtp, sr->ssrc); + sendRtcpPacket(rr->data(), rr->size(), true); } } else { WarnL << "未识别的sr rtcp包:" << rtcp->dumpString(); @@ -665,7 +690,7 @@ void WebRtcTransportImp::onRtcp(const char *buf, size_t len) { /////////////////////////////////////////////////////////////////// -void WebRtcTransportImp::changeRtpExtId(RtpPayloadInfo &info, const RtpHeader *header, bool is_recv, string *rid_ptr) const{ +void WebRtcTransportImp::changeRtpExtId(MediaTrack &info, const RtpHeader *header, bool is_recv, string *rid_ptr) const{ string rid, repaired_rid; auto ext_map = RtpExt::getExtValue(header); for (auto &pr : ext_map) { @@ -709,29 +734,30 @@ void WebRtcTransportImp::changeRtpExtId(RtpPayloadInfo &info, const RtpHeader *h rid = info.ssrc_to_rid[ssrc]; } else { //设置rid - info.ssrc_to_rid[ssrc] = rid; + if (info.ssrc_to_rid.emplace(ssrc, rid).second) { + InfoL << "rid of ssrc " << ssrc << " is:" << rid; + } } if (rid_ptr) { *rid_ptr = rid; } } -std::shared_ptr WebRtcTransportImp::createRtpReceiver(const string &rid, uint32_t ssrc, bool is_rtx, const RtpPayloadInfo::Ptr &info){ - auto ref = std::make_shared([info, this, rid](RtpPacket::Ptr rtp) mutable { +void WebRtcTransportImp::createRtpChannel(const string &rid, uint32_t ssrc, const MediaTrack::Ptr &info) { + //rid --> RtpReceiverImp + auto &ref = info->rtp_channel[rid]; + ref = std::make_shared([info, this, rid](RtpPacket::Ptr rtp) mutable { onSortedRtp(*info, rid, std::move(rtp)); + }, [info, this, ssrc](const FCI_NACK &nack) mutable { + onSendNack(*info, nack, ssrc); }); - - if (!is_rtx) { - //rtx没nack - info->nack_ctx[rid].setOnNack([info, this, ssrc](const FCI_NACK &nack) mutable { - onSendNack(*info, nack, ssrc); - }); - //ssrc --> RtpPayloadInfo - _rtp_info_ssrc[ssrc] = info; - } - - InfoL << "receive rtp of ssrc:" << ssrc << ", rid:" << rid << ", is rtx:" << is_rtx << ", codec:" << info->plan_rtp->codec; - return ref; + //rid --> rtp ssrc + ref->ssrc = ssrc; + //rtp ssrc --> RtcpContext + ref->rtcp_context = std::make_shared(info->plan_rtp->sample_rate, true); + //rtp ssrc --> MediaTrack + _rtp_info_ssrc[ssrc] = info; + InfoL << "create rtp receiver of ssrc:" << ssrc << ", rid:" << rid << ", codec:" << info->plan_rtp->codec; } void WebRtcTransportImp::onRtp(const char *buf, size_t len) { @@ -758,36 +784,27 @@ void WebRtcTransportImp::onRtp(const char *buf, size_t len) { WarnL << "ssrc:" << ssrc << ", rtx:" << is_rtx << ",seq:" << ntohs((uint16_t) rtp->seq); } #endif - auto &ref = info->receiver[rid]; + auto &ref = info->rtp_channel[rid]; if (!ref) { - ref = createRtpReceiver(rid, ssrc, is_rtx, info); + if (is_rtx) { + WarnL << "dropped rtx rtp, rid:" << rid << ", ssrc:" << ssrc << ", codec:" << info->plan_rtp->codec << ", seq:" << ntohs(rtp->seq); + return; + } + createRtpChannel(rid, ssrc, info); } if (!is_rtx) { //这是普通的rtp数据 - auto seq = ntohs(rtp->seq); #if 0 + auto seq = ntohs(rtp->seq); if (info->media->type == TrackVideo && seq % 100 == 0) { //此处模拟接受丢包 - DebugL << "recv dropped:" << seq; + DebugL << "recv dropped:" << seq << ", rid:" << rid << ", ssrc:" << ssrc; return; } #endif - //统计rtp接受情况,便于生成nack rtcp包 - info->nack_ctx[rid].received(seq); - - auto &cxt_ref = info->rtcp_context_recv[ssrc]; - if (!cxt_ref) { - cxt_ref = std::make_shared(info->plan_rtp->sample_rate, true); - info->rid_to_ssrc[rid] = ssrc; - } - //时间戳转换成毫秒 - auto stamp_ms = ntohl(rtp->stamp) * uint64_t(1000) / info->plan_rtp->sample_rate; - //统计rtp收到的情况,好做rr汇报 - cxt_ref->onRtp(seq, stamp_ms, len); - //解析并排序rtp - ref->inputRtp(info->media->type, info->plan_rtp->sample_rate, (uint8_t *) buf, len); + ref->inputRtp(info->media->type, info->plan_rtp->sample_rate, (uint8_t *) buf, len, false); return; } @@ -801,19 +818,19 @@ void WebRtcTransportImp::onRtp(const char *buf, size_t len) { //前两个字节是原始的rtp的seq auto origin_seq = payload[0] << 8 | payload[1]; - //rtx的seq转换为rtp的seq - rtp->seq = htons(origin_seq); - //rtx的ssrc转换为rtp的ssrc - rtp->ssrc = htonl(info->rid_to_ssrc[rid]); - //rtx的pt转换为rtp的pt + //InfoL << "rtx rtp, rid:" << rid << ", seq:" << origin_seq << ", ssrc:" << ssrc; + //rtx 转换为 rtp rtp->pt = info->plan_rtp->pt; + rtp->seq = htons(origin_seq); + rtp->ssrc = htonl(ref->ssrc); + memmove((uint8_t *) buf + 2, buf, payload - (uint8_t *) buf); buf += 2; len -= 2; - ref->inputRtp(info->media->type, info->plan_rtp->sample_rate, (uint8_t *) buf, len); + ref->inputRtp(info->media->type, info->plan_rtp->sample_rate, (uint8_t *) buf, len, true); } -void WebRtcTransportImp::onSendNack(RtpPayloadInfo &info, const FCI_NACK &nack, uint32_t ssrc) { +void WebRtcTransportImp::onSendNack(MediaTrack &info, const FCI_NACK &nack, uint32_t ssrc) { auto rtcp = RtcpFB::create(RTPFBType::RTCP_RTPFB_NACK, &nack, FCI_NACK::kSize); rtcp->ssrc = htons(info.answer_ssrc_rtp); rtcp->ssrc_media = htonl(ssrc); @@ -823,7 +840,7 @@ void WebRtcTransportImp::onSendNack(RtpPayloadInfo &info, const FCI_NACK &nack, /////////////////////////////////////////////////////////////////// -void WebRtcTransportImp::onSortedRtp(RtpPayloadInfo &info, const string &rid, RtpPacket::Ptr rtp) { +void WebRtcTransportImp::onSortedRtp(MediaTrack &info, const string &rid, RtpPacket::Ptr rtp) { if (info.media->type == TrackVideo && _pli_ticker.elapsedTime() > 2000) { //定期发送pli请求关键帧,方便非rtc等协议 _pli_ticker.resetTime(); @@ -880,13 +897,13 @@ void WebRtcTransportImp::onSendRtp(const RtpPacket::Ptr &rtp, bool flush, bool r } else { WarnL << "send rtx rtp:" << rtp->getSeq(); } - pair ctx{rtx, info.get()}; + pair ctx{rtx, info.get()}; sendRtpPacket(rtp->data() + RtpPacket::kRtpTcpHeaderSize, rtp->size() - RtpPacket::kRtpTcpHeaderSize, flush, &ctx); _bytes_usage += rtp->size() - RtpPacket::kRtpTcpHeaderSize; } void WebRtcTransportImp::onBeforeEncryptRtp(const char *buf, size_t &len, void *ctx) { - auto pr = (pair *) ctx; + auto pr = (pair *) ctx; auto header = (RtpHeader *) buf; if (!pr->first || !pr->second->plan_rtx) { diff --git a/webrtc/WebRtcTransport.h b/webrtc/WebRtcTransport.h index ef863fcd..4b24f488 100644 --- a/webrtc/WebRtcTransport.h +++ b/webrtc/WebRtcTransport.h @@ -125,7 +125,7 @@ private: RtcSession::Ptr _answer_sdp; }; -class RtpReceiverImp; +class RtpChannel; class NackList { public: @@ -337,9 +337,9 @@ private: bool canSendRtp() const; bool canRecvRtp() const; - class RtpPayloadInfo { + class MediaTrack { public: - using Ptr = std::shared_ptr; + using Ptr = std::shared_ptr; const RtcCodecPlan *plan_rtp; const RtcCodecPlan *plan_rtx; uint32_t offer_ssrc_rtp = 0; @@ -349,19 +349,16 @@ private: const RtcMedia *media; NackList nack_list; RtcpContext::Ptr rtcp_context_send; - //只生成rtp的nack,rtx的不生成 - unordered_map nack_ctx; - unordered_map rid_to_ssrc; - unordered_map > receiver; + unordered_map > rtp_channel; unordered_map ssrc_to_rid; - //只统计rtp的接收情况,rtx的不统计 - unordered_map rtcp_context_recv; + + std::shared_ptr getRtpChannel(uint32_t ssrc) const; }; - void onSortedRtp(RtpPayloadInfo &info, const string &rid, RtpPacket::Ptr rtp); - void onSendNack(RtpPayloadInfo &info, const FCI_NACK &nack, uint32_t ssrc); - void changeRtpExtId(RtpPayloadInfo &info, const RtpHeader *header, bool is_recv, string *rid_ptr = nullptr) const; - std::shared_ptr createRtpReceiver(const string &rid, uint32_t ssrc, bool is_rtx, const RtpPayloadInfo::Ptr &info); + void onSortedRtp(MediaTrack &track, const string &rid, RtpPacket::Ptr rtp); + void onSendNack(MediaTrack &track, const FCI_NACK &nack, uint32_t ssrc); + void changeRtpExtId(MediaTrack &track, const RtpHeader *header, bool is_recv, string *rid_ptr = nullptr) const; + void createRtpChannel(const string &rid, uint32_t ssrc, const MediaTrack::Ptr &info); private: uint16_t _rtx_seq[2] = {0, 0}; @@ -387,11 +384,11 @@ private: //播放rtsp源的reader对象 RtspMediaSource::RingType::RingReader::Ptr _reader; //根据发送rtp的track类型获取相关信息 - RtpPayloadInfo::Ptr _send_rtp_info[2]; + MediaTrack::Ptr _send_rtp_info[2]; //根据接收rtp的pt获取相关信息 - unordered_map > _rtp_info_pt; + unordered_map > _rtp_info_pt; //根据rtcp的ssrc获取相关信息,只记录rtp的ssrc,rtx的ssrc不记录 - unordered_map _rtp_info_ssrc; + unordered_map _rtp_info_ssrc; //发送rtp时需要修改rtp ext id map _rtp_ext_type_to_id; //接收rtp时需要修改rtp ext id