diff --git a/src/Extension/H265.cpp b/src/Extension/H265.cpp index a4b2dde6..79248989 100644 --- a/src/Extension/H265.cpp +++ b/src/Extension/H265.cpp @@ -68,7 +68,6 @@ bool H265Frame::isKeyFrame(int type, const char *ptr) { return (((*((uint8_t *) ptr + 2)) >> 7) & 0x01) == 1 && (type == NAL_IDR_N_LP || type == NAL_IDR_W_RADL); } return false; - } H265Frame::H265Frame(){ diff --git a/src/Rtcp/RtcpContext.cpp b/src/Rtcp/RtcpContext.cpp index 9b4e31cc..384b15b2 100644 --- a/src/Rtcp/RtcpContext.cpp +++ b/src/Rtcp/RtcpContext.cpp @@ -18,8 +18,7 @@ void RtcpContext::clear() { memset(this, 0, sizeof(RtcpContext)); } -RtcpContext::RtcpContext(uint32_t sample_rate, bool is_receiver) { - _sample_rate = sample_rate; +RtcpContext::RtcpContext(bool is_receiver) { _is_receiver = is_receiver; } @@ -35,7 +34,6 @@ void RtcpContext::onRtp(uint16_t seq, uint32_t stamp, size_t bytes) { diff = -diff; } //抖动单位为采样次数 - diff *= (_sample_rate / 1000.0); _jitter += (diff - _jitter) / 16.0; } else { _jitter = 0; @@ -129,7 +127,7 @@ Buffer::Ptr RtcpContext::createRtcpSR(uint32_t rtcp_ssrc) { rtcp->setNtpStamp(tv); //转换成rtp时间戳 - rtcp->rtpts = htonl(uint32_t(_last_rtp_stamp * (_sample_rate / 1000.0))); + rtcp->rtpts = htonl(_last_rtp_stamp); rtcp->packet_count = htonl((uint32_t) _packets); rtcp->octet_count = htonl((uint32_t) _bytes); return RtcpHeader::toBuffer(std::move(rtcp)); diff --git a/src/Rtcp/RtcpContext.h b/src/Rtcp/RtcpContext.h index 85be0c29..d9f705ef 100644 --- a/src/Rtcp/RtcpContext.h +++ b/src/Rtcp/RtcpContext.h @@ -22,15 +22,14 @@ public: using Ptr = std::shared_ptr; /** * 创建rtcp上下文 - * @param sample_rate 音频采用率,视频一般为90000 * @param is_receiver 是否为rtp接收者,接收者更消耗性能 */ - RtcpContext(uint32_t sample_rate, bool is_receiver); + RtcpContext(bool is_receiver); /** * 输出或输入rtp时调用 * @param seq rtp的seq - * @param stamp rtp的时间戳,单位毫秒 + * @param stamp rtp的时间戳,单位采样数(非毫秒) * @param bytes rtp数据长度 */ void onRtp(uint16_t seq, uint32_t stamp, size_t bytes); @@ -87,8 +86,6 @@ private: bool _is_receiver; //时间戳抖动值 double _jitter = 0; - //视频默认90000,音频为采样率 - uint32_t _sample_rate; //收到或发送的rtp的字节数 size_t _bytes = 0; //收到或发送的rtp的个数 diff --git a/src/Rtp/GB28181Process.cpp b/src/Rtp/GB28181Process.cpp index 6400c98f..c4dc8c24 100644 --- a/src/Rtp/GB28181Process.cpp +++ b/src/Rtp/GB28181Process.cpp @@ -24,36 +24,23 @@ static inline bool checkTS(const uint8_t *packet, size_t bytes){ return bytes % TS_PACKET_SIZE == 0 && packet[0] == TS_SYNC_BYTE; } -class RtpReceiverImp : public RtpReceiver { +class RtpReceiverImp : public RtpTrackImp { public: using Ptr = std::shared_ptr; - RtpReceiverImp(int sample_rate, function cb, function cb_before = nullptr){ + RtpReceiverImp(int sample_rate, RtpTrackImp::OnSorted cb, RtpTrackImp::BeforeSorted cb_before = nullptr){ _sample_rate = sample_rate; - _on_sort = std::move(cb); - _on_before_sort = std::move(cb_before); + setOnSorted(std::move(cb)); + setBeforeSorted(std::move(cb_before)); } ~RtpReceiverImp() override = default; bool inputRtp(TrackType type, uint8_t *ptr, size_t len){ - return handleOneRtp((int) type, type, _sample_rate, ptr, len); - } - -protected: - void onRtpSorted(RtpPacket::Ptr rtp, int track_index) override { - _on_sort(std::move(rtp)); - } - - void onBeforeRtpSorted(const RtpPacket::Ptr &rtp, int track_index) override { - if (_on_before_sort) { - _on_before_sort(rtp); - } + return RtpTrack::inputRtp(type, _sample_rate, ptr, len); } private: int _sample_rate; - function _on_sort; - function _on_before_sort; }; /////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/Rtp/RtpServer.cpp b/src/Rtp/RtpServer.cpp index 6462abac..6827138f 100644 --- a/src/Rtp/RtpServer.cpp +++ b/src/Rtp/RtpServer.cpp @@ -27,7 +27,7 @@ class RtcpHelper : public RtcpContext, public std::enable_shared_from_this; - RtcpHelper(Socket::Ptr rtcp_sock, uint32_t sample_rate) : RtcpContext(sample_rate, true){ + RtcpHelper(Socket::Ptr rtcp_sock, uint32_t sample_rate) : RtcpContext(true){ _rtcp_sock = std::move(rtcp_sock); _sample_rate = sample_rate; } @@ -35,7 +35,7 @@ public: void onRecvRtp(const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len){ //统计rtp接受情况,用于发送rr包 auto header = (RtpHeader *) buf->data(); - onRtp(ntohs(header->seq), ntohl(header->stamp) * uint64_t(1000) / _sample_rate, buf->size()); + onRtp(ntohs(header->seq), ntohl(header->stamp), buf->size()); sendRtcp(ntohl(header->ssrc), addr, addr_len); } diff --git a/src/Rtp/RtpSession.cpp b/src/Rtp/RtpSession.cpp index 880fd6cc..c685114c 100644 --- a/src/Rtp/RtpSession.cpp +++ b/src/Rtp/RtpSession.cpp @@ -103,7 +103,7 @@ void RtpSession::onRtpPacket(const char *data, size_t len) { } try { _process->inputRtp(false, getSock(), data, len, &_addr); - } catch (RtpReceiver::BadRtpException &ex) { + } catch (RtpTrack::BadRtpException &ex) { if (!_is_udp) { WarnL << ex.what() << ",开始搜索ssrc以便恢复上下文"; _search_rtp = true; diff --git a/src/Rtsp/RtpReceiver.cpp b/src/Rtsp/RtpReceiver.cpp index 3cc50dbb..5819dd48 100644 --- a/src/Rtsp/RtpReceiver.cpp +++ b/src/Rtsp/RtpReceiver.cpp @@ -15,19 +15,23 @@ namespace mediakit { -RtpReceiver::RtpReceiver() { - int index = 0; - for (auto &sortor : _rtp_sortor) { - sortor.setOnSort([this, index](uint16_t seq, RtpPacket::Ptr &packet) { - onRtpSorted(std::move(packet), index); - }); - ++index; - } +RtpTrack::RtpTrack() { + setOnSort([this](uint16_t seq, RtpPacket::Ptr &packet) { + onRtpSorted(std::move(packet)); + }); } -RtpReceiver::~RtpReceiver() {} +uint32_t RtpTrack::getSSRC() const { + return _ssrc; +} -bool RtpReceiver::handleOneRtp(int index, TrackType type, int sample_rate, uint8_t *ptr, size_t len) { +void RtpTrack::clear() { + _ssrc = 0; + _ssrc_alive.resetTime(); + PacketSortor::clear(); +} + +bool RtpTrack::inputRtp(TrackType type, int sample_rate, uint8_t *ptr, size_t len) { if (len < RtpPacket::kRtpHeaderSize) { WarnL << "rtp包太小:" << len; return false; @@ -52,23 +56,23 @@ bool RtpReceiver::handleOneRtp(int index, TrackType type, int sample_rate, uint8 //比对缓存ssrc auto ssrc = ntohl(header->ssrc); - if (!_ssrc[index]) { + if (!_ssrc) { //记录并锁定ssrc - _ssrc[index] = ssrc; - _ssrc_alive[index].resetTime(); - } else if (_ssrc[index] == ssrc) { + _ssrc = ssrc; + _ssrc_alive.resetTime(); + } else if (_ssrc == ssrc) { //ssrc匹配正确,刷新计时器 - _ssrc_alive[index].resetTime(); + _ssrc_alive.resetTime(); } else { //ssrc错误 - if (_ssrc_alive[index].elapsedTime() < 3 * 1000) { + if (_ssrc_alive.elapsedTime() < 3 * 1000) { //接受正确ssrc的rtp在10秒内,那么我们认为存在多路rtp,忽略掉ssrc不匹配的rtp - WarnL << "ssrc不匹配,rtp已丢弃:" << ssrc << " != " << _ssrc[index]; + WarnL << "ssrc不匹配,rtp已丢弃:" << ssrc << " != " << _ssrc; return false; } - InfoL << "rtp流ssrc切换:" << _ssrc[index] << " -> " << ssrc; - _ssrc[index] = ssrc; - _ssrc_alive[index].resetTime(); + InfoL << "rtp流ssrc切换:" << _ssrc << " -> " << ssrc; + _ssrc = ssrc; + _ssrc_alive.resetTime(); } auto rtp = RtpPacket::create(); @@ -87,29 +91,32 @@ bool RtpReceiver::handleOneRtp(int index, TrackType type, int sample_rate, uint8 //拷贝rtp memcpy(&data[4], ptr, len); - onBeforeRtpSorted(rtp, index); + onBeforeRtpSorted(rtp); auto seq = rtp->getSeq(); - _rtp_sortor[index].sortPacket(seq, std::move(rtp)); + sortPacket(seq, std::move(rtp)); return true; } -void RtpReceiver::clear() { - CLEAR_ARR(_ssrc); - for (auto &sortor : _rtp_sortor) { - sortor.clear(); +//////////////////////////////////////////////////////////////////////////////////// + +void RtpTrackImp::setOnSorted(OnSorted cb) { + _on_sorted = std::move(cb); +} + +void RtpTrackImp::setBeforeSorted(BeforeSorted cb) { + _on_before_sorted = std::move(cb); +} + +void RtpTrackImp::onRtpSorted(RtpPacket::Ptr rtp) { + if (_on_sorted) { + _on_sorted(std::move(rtp)); } } -size_t RtpReceiver::getJitterSize(int index) const{ - return _rtp_sortor[index].getJitterSize(); -} - -size_t RtpReceiver::getCycleCount(int index) const{ - return _rtp_sortor[index].getCycleCount(); -} - -uint32_t RtpReceiver::getSSRC(int index) const{ - return _ssrc[index]; +void RtpTrackImp::onBeforeRtpSorted(const RtpPacket::Ptr &rtp) { + if (_on_before_sorted) { + _on_before_sorted(rtp); + } } }//namespace mediakit diff --git a/src/Rtsp/RtpReceiver.h b/src/Rtsp/RtpReceiver.h index 24cf8fb8..608fb3cd 100644 --- a/src/Rtsp/RtpReceiver.h +++ b/src/Rtsp/RtpReceiver.h @@ -160,11 +160,8 @@ private: function _cb; }; -class RtpReceiver { +class RtpTrack : private PacketSortor{ public: - RtpReceiver(); - virtual ~RtpReceiver(); - class BadRtpException : public invalid_argument { public: template @@ -172,7 +169,60 @@ public: ~BadRtpException() = default; }; + RtpTrack(); + virtual ~RtpTrack() = default; + + void clear(); + uint32_t getSSRC() const; + bool inputRtp(TrackType type, int sample_rate, uint8_t *ptr, size_t len); + protected: + virtual void onRtpSorted(RtpPacket::Ptr rtp) {} + virtual void onBeforeRtpSorted(const RtpPacket::Ptr &rtp) {} + +private: + uint32_t _ssrc = 0; + Ticker _ssrc_alive; +}; + +class RtpTrackImp : public RtpTrack{ +public: + using OnSorted = function; + using BeforeSorted = function; + + RtpTrackImp() = default; + ~RtpTrackImp() override = default; + + void setOnSorted(OnSorted cb); + void setBeforeSorted(BeforeSorted cb); + +protected: + void onRtpSorted(RtpPacket::Ptr rtp) override; + void onBeforeRtpSorted(const RtpPacket::Ptr &rtp) override; + +private: + OnSorted _on_sorted; + BeforeSorted _on_before_sorted; +}; + +template +class RtpMultiReceiver { +public: + RtpMultiReceiver() { + int index = 0; + for (auto &track : _track) { + track.setOnSorted([this, index](RtpPacket::Ptr rtp) { + onRtpSorted(std::move(rtp), index); + }); + track.setBeforeSorted([this, index](const RtpPacket::Ptr &rtp) { + onBeforeRtpSorted(rtp, index); + }); + ++index; + } + } + + virtual ~RtpMultiReceiver() = default; + /** * 输入数据指针生成并排序rtp包 * @param index track下标索引 @@ -182,34 +232,49 @@ protected: * @param len rtp数据指针长度 * @return 解析成功返回true */ - bool handleOneRtp(int index, TrackType type, int samplerate, uint8_t *ptr, size_t len); + bool handleOneRtp(int index, TrackType type, int sample_rate, uint8_t *ptr, size_t len){ + return _track[index].inputRtp(type, sample_rate, ptr, len); + } + void clear() { + for (auto &track : _track) { + track.clear(); + } + } + + size_t getJitterSize(int index) const { + return _track[index].getJitterSize(); + } + + size_t getCycleCount(int index) const { + return _track[index].getCycleCount(); + } + + uint32_t getSSRC(int index) const { + return _track[index].getSSRC(); + } + +protected: /** * rtp数据包排序后输出 * @param rtp rtp数据包 * @param track_index track索引 */ - virtual void onRtpSorted(RtpPacket::Ptr rtp, int track_index) {} + virtual void onRtpSorted(RtpPacket::Ptr rtp, int index) {} /** * 解析出rtp但还未排序 * @param rtp rtp数据包 * @param track_index track索引 */ - virtual void onBeforeRtpSorted(const RtpPacket::Ptr &rtp, int track_index) {} - - void clear(); - size_t getJitterSize(int track_index) const; - size_t getCycleCount(int track_index) const; - uint32_t getSSRC(int track_index) const; + virtual void onBeforeRtpSorted(const RtpPacket::Ptr &rtp, int index) {} private: - uint32_t _ssrc[2] = {0, 0}; - Ticker _ssrc_alive[2]; - //rtp排序缓存,根据seq排序 - PacketSortor _rtp_sortor[2]; + RtpTrackImp _track[kCount]; }; +using RtpReceiver = RtpMultiReceiver<2>; + }//namespace mediakit diff --git a/src/Rtsp/RtspPlayer.cpp b/src/Rtsp/RtspPlayer.cpp index f3b7004f..45d0f445 100644 --- a/src/Rtsp/RtspPlayer.cpp +++ b/src/Rtsp/RtspPlayer.cpp @@ -205,7 +205,7 @@ void RtspPlayer::handleResDESCRIBE(const Parser& parser) { } _rtcp_context.clear(); for (auto &track : _sdp_track) { - _rtcp_context.emplace_back(std::make_shared(track->_samplerate, true)); + _rtcp_context.emplace_back(std::make_shared(true)); } sendSetup(0); } @@ -591,7 +591,7 @@ void RtspPlayer::sendRtspRequest(const string &cmd, const string &url,const StrC void RtspPlayer::onBeforeRtpSorted(const RtpPacket::Ptr &rtp, int track_idx){ auto &rtcp_ctx = _rtcp_context[track_idx]; - rtcp_ctx->onRtp(rtp->getSeq(), rtp->getStampMS(), rtp->size() - RtpPacket::kRtpTcpHeaderSize); + rtcp_ctx->onRtp(rtp->getSeq(), ntohl(rtp->getHeader()->stamp), rtp->size() - RtpPacket::kRtpTcpHeaderSize); auto &ticker = _rtcp_send_ticker[track_idx]; if (ticker.elapsedTime() < 3 * 1000) { diff --git a/src/Rtsp/RtspPusher.cpp b/src/Rtsp/RtspPusher.cpp index c19b0ce7..2d0a8abe 100644 --- a/src/Rtsp/RtspPusher.cpp +++ b/src/Rtsp/RtspPusher.cpp @@ -179,7 +179,7 @@ void RtspPusher::sendAnnounce() { } _rtcp_context.clear(); for (auto &track : _track_vec) { - _rtcp_context.emplace_back(std::make_shared(track->_samplerate, false)); + _rtcp_context.emplace_back(std::make_shared(false)); } _on_res_func = std::bind(&RtspPusher::handleResAnnounce, this, placeholders::_1); sendRtspRequest("ANNOUNCE", _url, {}, src->getSdp()); @@ -360,7 +360,7 @@ void RtspPusher::updateRtcpContext(const RtpPacket::Ptr &rtp){ int track_index = getTrackIndexByTrackType(rtp->type); auto &ticker = _rtcp_send_ticker[track_index]; auto &rtcp_ctx = _rtcp_context[track_index]; - rtcp_ctx->onRtp(rtp->getSeq(), rtp->getStampMS(), rtp->size() - RtpPacket::kRtpTcpHeaderSize); + rtcp_ctx->onRtp(rtp->getSeq(), ntohl(rtp->getHeader()->stamp), rtp->size() - RtpPacket::kRtpTcpHeaderSize); //send rtcp every 5 second if (ticker.elapsedTime() > 5 * 1000) { diff --git a/src/Rtsp/RtspSession.cpp b/src/Rtsp/RtspSession.cpp index 1e12adee..e86eb399 100644 --- a/src/Rtsp/RtspSession.cpp +++ b/src/Rtsp/RtspSession.cpp @@ -252,7 +252,7 @@ void RtspSession::handleReq_ANNOUNCE(const Parser &parser) { } _rtcp_context.clear(); for (auto &track : _sdp_track) { - _rtcp_context.emplace_back(std::make_shared(track->_samplerate, true)); + _rtcp_context.emplace_back(std::make_shared(true)); } _push_src = std::make_shared(_media_info._vhost, _media_info._app, _media_info._streamid); _push_src->setListener(dynamic_pointer_cast(shared_from_this())); @@ -413,7 +413,7 @@ void RtspSession::onAuthSuccess() { } strongSelf->_rtcp_context.clear(); for (auto &track : strongSelf->_sdp_track) { - strongSelf->_rtcp_context.emplace_back(std::make_shared(track->_samplerate, false)); + strongSelf->_rtcp_context.emplace_back(std::make_shared(false)); } strongSelf->_sessionid = makeRandStr(12); strongSelf->_play_src = rtsp_src; @@ -1126,7 +1126,7 @@ void RtspSession::onBeforeRtpSorted(const RtpPacket::Ptr &rtp, int track_index){ void RtspSession::updateRtcpContext(const RtpPacket::Ptr &rtp){ int track_index = getTrackIndexByTrackType(rtp->type); auto &rtcp_ctx = _rtcp_context[track_index]; - rtcp_ctx->onRtp(rtp->getSeq(), rtp->getStampMS(), rtp->size() - RtpPacket::kRtpTcpHeaderSize); + rtcp_ctx->onRtp(rtp->getSeq(), ntohl(rtp->getHeader()->stamp), rtp->size() - RtpPacket::kRtpTcpHeaderSize); auto &ticker = _rtcp_send_tickers[track_index]; //send rtcp every 5 second diff --git a/webrtc/Nack.cpp b/webrtc/Nack.cpp new file mode 100644 index 00000000..8d4da792 --- /dev/null +++ b/webrtc/Nack.cpp @@ -0,0 +1,139 @@ +/* + * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/ZLMediaKit/ZLMediaKit). + * + * Use of this source code is governed by MIT license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#include "Nack.h" + +static constexpr uint32_t kMaxNackMS = 10 * 1000; + +void NackList::push_back(RtpPacket::Ptr rtp) { + auto seq = rtp->getSeq(); + _nack_cache_seq.emplace_back(seq); + _nack_cache_pkt.emplace(seq, std::move(rtp)); + while (get_cache_ms() > kMaxNackMS) { + //需要清除部分nack缓存 + pop_front(); + } +} + +void NackList::for_each_nack(const FCI_NACK &nack, const function &func) { + auto seq = nack.getPid(); + for (auto bit : nack.getBitArray()) { + if (bit) { + //丢包 + RtpPacket::Ptr *ptr = get_rtp(seq); + if (ptr) { + func(*ptr); + } + } + ++seq; + } +} + +void NackList::pop_front() { + if (_nack_cache_seq.empty()) { + return; + } + _nack_cache_pkt.erase(_nack_cache_seq.front()); + _nack_cache_seq.pop_front(); +} + +RtpPacket::Ptr *NackList::get_rtp(uint16_t seq) { + auto it = _nack_cache_pkt.find(seq); + if (it == _nack_cache_pkt.end()) { + return nullptr; + } + return &it->second; +} + +uint32_t NackList::get_cache_ms() { + if (_nack_cache_seq.size() < 2) { + return 0; + } + uint32_t back = _nack_cache_pkt[_nack_cache_seq.back()]->getStampMS(); + uint32_t front = _nack_cache_pkt[_nack_cache_seq.front()]->getStampMS(); + if (back > front) { + return back - front; + } + //很有可能回环了 + return back + (UINT32_MAX - front); +} + +//////////////////////////////////////////////////////////////////////////////////////////////// + +void NackContext::received(uint16_t seq) { + if (!_last_max_seq && _seq.empty()) { + _last_max_seq = seq - 1; + } + _seq.emplace(seq); + auto max_seq = *_seq.rbegin(); + auto min_seq = *_seq.begin(); + auto diff = max_seq - min_seq; + if (!diff) { + return; + } + + if (diff > UINT32_MAX / 2) { + //回环 + _seq.clear(); + _last_max_seq = min_seq; + return; + } + + if (_seq.size() == diff + 1 && _last_max_seq + 1 == min_seq) { + //都是连续的seq,未丢包 + _seq.clear(); + _last_max_seq = max_seq; + } else { + //seq不连续,有丢包 + if (min_seq == _last_max_seq + 1) { + //前面部分seq是连续的,未丢包,移除之 + eraseFrontSeq(); + } + + //有丢包,丢包从_last_max_seq开始 + if (max_seq - _last_max_seq > FCI_NACK::kBitSize) { + vector vec; + vec.resize(FCI_NACK::kBitSize); + for (auto i = 0; i < FCI_NACK::kBitSize; ++i) { + vec[i] = _seq.find(_last_max_seq + i + 2) == _seq.end(); + } + doNack(FCI_NACK(_last_max_seq + 1, vec)); + _last_max_seq += FCI_NACK::kBitSize + 1; + if (_last_max_seq >= max_seq) { + _seq.clear(); + } else { + auto it = _seq.emplace_hint(_seq.begin(), _last_max_seq); + _seq.erase(_seq.begin(), it); + } + } + } +} + +void NackContext::setOnNack(onNack cb) { + _cb = std::move(cb); +} + +void NackContext::doNack(const FCI_NACK &nack) { + if (_cb) { + _cb(nack); + } +} + +void NackContext::eraseFrontSeq() { + //前面部分seq是连续的,未丢包,移除之 + for (auto it = _seq.begin(); it != _seq.end();) { + if (*it != _last_max_seq + 1) { + //seq不连续,丢包了 + break; + } + _last_max_seq = *it; + it = _seq.erase(it); + } +} \ No newline at end of file diff --git a/webrtc/Nack.h b/webrtc/Nack.h new file mode 100644 index 00000000..e872f3f9 --- /dev/null +++ b/webrtc/Nack.h @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/ZLMediaKit/ZLMediaKit). + * + * Use of this source code is governed by MIT license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef ZLMEDIAKIT_NACK_H +#define ZLMEDIAKIT_NACK_H + +#include "Rtsp/Rtsp.h" +#include "Rtcp/RtcpFCI.h" + +using namespace mediakit; + +class NackList { +public: + NackList() = default; + ~NackList() = default; + + void push_back(RtpPacket::Ptr rtp); + void for_each_nack(const FCI_NACK &nack, const function &cb); + +private: + void pop_front(); + uint32_t get_cache_ms(); + RtpPacket::Ptr *get_rtp(uint16_t seq); + +private: + deque _nack_cache_seq; + unordered_map _nack_cache_pkt; +}; + +class NackContext { +public: + using onNack = function; + + NackContext() = default; + ~NackContext() = default; + + void received(uint16_t seq); + void setOnNack(onNack cb); + +private: + void eraseFrontSeq(); + void doNack(const FCI_NACK &nack); + +private: + onNack _cb; + set _seq; + uint16_t _last_max_seq = 0; +}; + +#endif //ZLMEDIAKIT_NACK_H diff --git a/webrtc/RtpExt.cpp b/webrtc/RtpExt.cpp index dd9a1e2e..7679abc7 100644 --- a/webrtc/RtpExt.cpp +++ b/webrtc/RtpExt.cpp @@ -561,4 +561,90 @@ void RtpExt::setType(RtpExtType type) { RtpExtType RtpExt::getType() const { return _type; -} \ No newline at end of file +} + +RtpExtContext::RtpExtContext(const RtcMedia &m){ + for (auto &ext : m.extmap) { + auto ext_type = RtpExt::getExtType(ext.ext); + _rtp_ext_id_to_type.emplace(ext.id, ext_type); + _rtp_ext_type_to_id.emplace(ext_type, ext.id); + } +} + +string RtpExtContext::getRid(uint32_t ssrc) const{ + auto it = _ssrc_to_rid.find(ssrc); + if (it == _ssrc_to_rid.end()) { + return ""; + } + return it->second; +} + +void RtpExtContext::setRid(uint32_t ssrc, const string &rid) { + _ssrc_to_rid[ssrc] = rid; +} + +void RtpExtContext::changeRtpExtId(const RtpHeader *header, bool is_recv, string *rid_ptr) { + string rid, repaired_rid; + auto ext_map = RtpExt::getExtValue(header); + for (auto &pr : ext_map) { + if (is_recv) { + auto it = _rtp_ext_id_to_type.find(pr.first); + if (it == _rtp_ext_id_to_type.end()) { + WarnL << "接收rtp时,忽略不识别的rtp ext, id=" << (int) pr.first; + pr.second.clearExt(); + continue; + } + pr.second.setType(it->second); + //重新赋值ext id为 ext type,作为后面处理ext的统一中间类型 + pr.second.setExtId((uint8_t) it->second); + switch (it->second) { + case RtpExtType::sdes_rtp_stream_id : rid = pr.second.getRtpStreamId(); break; + case RtpExtType::sdes_repaired_rtp_stream_id : repaired_rid = pr.second.getRepairedRtpStreamId(); break; + default : break; + } + } else { + pr.second.setType((RtpExtType) pr.first); + auto it = _rtp_ext_type_to_id.find((RtpExtType) pr.first); + if (it == _rtp_ext_type_to_id.end()) { + WarnL << "发送rtp时, 忽略不被客户端支持rtp ext:" << pr.second.dumpString(); + pr.second.clearExt(); + continue; + } + //重新赋值ext id为客户端sdp声明的类型 + pr.second.setExtId(it->second); + } + } + + if (!is_recv) { + return; + } + if (rid.empty()) { + rid = repaired_rid; + } + auto ssrc = ntohl(header->ssrc); + if (rid.empty()) { + //获取rid + rid = _ssrc_to_rid[ssrc]; + } else { + //设置rid + auto it = _ssrc_to_rid.find(ssrc); + if (it == _ssrc_to_rid.end() || it->second != rid) { + _ssrc_to_rid[ssrc] = rid; + onGetRtp(header->pt, ssrc, rid); + } + } + if (rid_ptr) { + *rid_ptr = rid; + } +} + +void RtpExtContext::setOnGetRtp(OnGetRtp cb) { + _cb = std::move(cb); +} + +void RtpExtContext::onGetRtp(uint8_t pt, uint32_t ssrc, const string &rid){ + if (_cb) { + _cb(pt, ssrc, rid); + } +} + diff --git a/webrtc/RtpExt.h b/webrtc/RtpExt.h index 9520bac6..3671bc90 100644 --- a/webrtc/RtpExt.h +++ b/webrtc/RtpExt.h @@ -108,5 +108,31 @@ private: RtpExtType _type = RtpExtType::padding; }; +class RtcMedia; +class RtpExtContext { +public: + using Ptr = std::shared_ptr; + using OnGetRtp = function; + + RtpExtContext(const RtcMedia &media); + ~RtpExtContext() = default; + + void setOnGetRtp(OnGetRtp cb); + string getRid(uint32_t ssrc) const; + void setRid(uint32_t ssrc, const string &rid); + void changeRtpExtId(const RtpHeader *header, bool is_recv, string *rid_ptr = nullptr); + +private: + void onGetRtp(uint8_t pt, uint32_t ssrc, const string &rid); + +private: + OnGetRtp _cb; + //发送rtp时需要修改rtp ext id + map _rtp_ext_type_to_id; + //接收rtp时需要修改rtp ext id + unordered_map _rtp_ext_id_to_type; + //ssrc --> rid + unordered_map _ssrc_to_rid; +}; #endif //ZLMEDIAKIT_RTPEXT_H diff --git a/webrtc/Sdp.cpp b/webrtc/Sdp.cpp index bcf6acb6..4842db03 100644 --- a/webrtc/Sdp.cpp +++ b/webrtc/Sdp.cpp @@ -1636,7 +1636,8 @@ RETRY: if (configure.direction != RtpDirection::recvonly && configure.direction != RtpDirection::sendrecv) { //我们不支持接收 - continue; + answer_media.direction = RtpDirection::inactive; + break; } answer_media.direction = RtpDirection::recvonly; break; @@ -1645,7 +1646,8 @@ RETRY: if (configure.direction != RtpDirection::sendonly && configure.direction != RtpDirection::sendrecv) { //我们不支持发送 - continue; + answer_media.direction = RtpDirection::inactive; + break; } answer_media.direction = RtpDirection::sendonly; break; diff --git a/webrtc/WebRtcTransport.cpp b/webrtc/WebRtcTransport.cpp index c5bc5bd1..6ad1b896 100644 --- a/webrtc/WebRtcTransport.cpp +++ b/webrtc/WebRtcTransport.cpp @@ -148,7 +148,6 @@ void WebRtcTransport::sendRtcpRemb(uint32_t ssrc, size_t bit_rate) { fb->ssrc = htonl(0); fb->ssrc_media = htonl(ssrc); sendRtcpPacket((char *) fb.get(), fb->getSize(), true); - TraceL << ssrc << " " << bit_rate; } void WebRtcTransport::sendRtcpPli(uint32_t ssrc) { @@ -255,7 +254,8 @@ void WebRtcTransport::inputSockData(char *buf, size_t len, RTC::TransportTuple * if (_srtp_session_recv->DecryptSrtp((uint8_t *) buf, &len)) { onRtp(buf, len); } else { - WarnL; + RtpHeader *rtp = (RtpHeader *) buf; + WarnL << "srtp_unprotect rtp failed, pt:" << (int)rtp->pt; } return; } @@ -399,37 +399,49 @@ void WebRtcTransportImp::onStartWebRTC() { //获取ssrc和pt相关信息,届时收到rtp和rtcp时分别可以根据pt和ssrc找到相关的信息 for (auto &m_answer : getSdp(SdpType::answer).media) { auto m_offer = getSdp(SdpType::offer).getMedia(m_answer.type); - auto info = std::make_shared(); + auto track = std::make_shared(); - info->media = &m_answer; - info->answer_ssrc_rtp = m_answer.getRtpSSRC(); - info->answer_ssrc_rtx = m_answer.getRtxSSRC(); - info->offer_ssrc_rtp = m_offer->getRtpSSRC(); - info->offer_ssrc_rtx = m_offer->getRtxSSRC(); - info->plan_rtp = &m_answer.plan[0];; - info->plan_rtx = m_answer.getRelatedRtxPlan(info->plan_rtp->pt); - info->rtcp_context_send = std::make_shared(info->plan_rtp->sample_rate, false); + track->media = &m_answer; + track->answer_ssrc_rtp = m_answer.getRtpSSRC(); + track->answer_ssrc_rtx = m_answer.getRtxSSRC(); + track->offer_ssrc_rtp = m_offer->getRtpSSRC(); + track->offer_ssrc_rtx = m_offer->getRtxSSRC(); + track->plan_rtp = &m_answer.plan[0];; + track->plan_rtx = m_answer.getRelatedRtxPlan(track->plan_rtp->pt); + track->rtcp_context_send = std::make_shared(false); - //send ssrc --> RtpPayloadInfo - _rtp_info_ssrc[info->answer_ssrc_rtp] = std::make_pair(false, info); - _rtp_info_ssrc[info->answer_ssrc_rtx] = std::make_pair(true, info); + //send ssrc --> MediaTrack + _ssrc_to_track[track->answer_ssrc_rtp] = track; + _ssrc_to_track[track->answer_ssrc_rtx] = track; - //recv ssrc --> RtpPayloadInfo - _rtp_info_ssrc[info->offer_ssrc_rtp] = std::make_pair(false, info);; - _rtp_info_ssrc[info->offer_ssrc_rtx] = std::make_pair(true, info);; + //recv ssrc --> MediaTrack + _ssrc_to_track[track->offer_ssrc_rtp] = track; + _ssrc_to_track[track->offer_ssrc_rtx] = track; - //rtp pt --> RtpPayloadInfo - _rtp_info_pt.emplace(info->plan_rtp->pt, std::make_pair(false, info)); - if (info->plan_rtx) { - //rtx pt --> RtpPayloadInfo - _rtp_info_pt.emplace(info->plan_rtx->pt, std::make_pair(true, info)); + //rtp pt --> MediaTrack + _pt_to_track.emplace(track->plan_rtp->pt, std::make_pair(false, track)); + if (track->plan_rtx) { + //rtx pt --> MediaTrack + _pt_to_track.emplace(track->plan_rtx->pt, std::make_pair(true, track)); } if (m_offer->type != TrackApplication) { //记录rtp ext类型与id的关系,方便接收或发送rtp时修改rtp ext id - for (auto &ext : m_offer->extmap) { - auto ext_type = RtpExt::getExtType(ext.ext); - _rtp_ext_id_to_type.emplace(ext.id, ext_type); - _rtp_ext_type_to_id.emplace(ext_type, ext.id); + track->rtp_ext_ctx = std::make_shared(*m_offer); + track->rtp_ext_ctx->setOnGetRtp([this, track](uint8_t pt, uint32_t ssrc, const string &rid) { + //ssrc --> MediaTrack + _ssrc_to_track[ssrc] = track; + InfoL << "get rtp, pt:" << (int) pt << ", ssrc:" << ssrc << ", rid:" << rid; + }); + + int index = 0; + for (auto &ssrc : m_offer->rtp_ssrc_sim) { + //记录ssrc对应的MediaTrack + _ssrc_to_track[ssrc.ssrc] = track; + if (m_offer->rtp_rids.size() > index) { + //支持firefox的simulcast, 提前映射好ssrc和rid的关系 + track->rtp_ext_ctx->setRid(ssrc.ssrc, m_offer->rtp_rids[index]); + } + ++index; } } } @@ -466,10 +478,10 @@ void WebRtcTransportImp::onStartWebRTC() { } auto rtsp_media = rtsp_send_sdp.getMedia(m.type); if (rtsp_media && getCodecId(rtsp_media->plan[0].codec) == getCodecId(m.plan[0].codec)) { - auto it = _rtp_info_pt.find(m.plan[0].pt); - CHECK(it != _rtp_info_pt.end()); + auto it = _pt_to_track.find(m.plan[0].pt); + CHECK(it != _pt_to_track.end()); //记录发送rtp时约定的信息,届时发送rtp时需要修改pt和ssrc - _send_rtp_info[m.type] = it->second.second; + _type_to_track[m.type] = it->second.second; } } } @@ -558,27 +570,45 @@ SdpAttrCandidate::Ptr WebRtcTransportImp::getIceCandidate() const{ /////////////////////////////////////////////////////////////////// -class RtpReceiverImp : public RtpReceiver { +class RtpChannel : public RtpTrackImp { public: - RtpReceiverImp( function cb){ - _on_sort = std::move(cb); + RtpChannel(RtpTrackImp::OnSorted cb, function on_nack) { + setOnSorted(std::move(cb)); + _nack_ctx.setOnNack(std::move(on_nack)); } - ~RtpReceiverImp() override = default; + ~RtpChannel() override = default; - bool inputRtp(TrackType type, int samplerate, uint8_t *ptr, size_t len){ - return handleOneRtp((int) type, type, samplerate, ptr, len); + bool inputRtp(TrackType type, int sample_rate, uint8_t *ptr, size_t len, bool is_rtx){ + if (!is_rtx) { + RtpHeader *rtp = (RtpHeader *) ptr; + auto seq = ntohs(rtp->seq); + //统计rtp接受情况,便于生成nack rtcp包 + _nack_ctx.received(seq); + //统计rtp收到的情况,好做rr汇报 + _rtcp_context.onRtp(seq, ntohl(rtp->stamp), len); + } + return RtpTrack::inputRtp(type, sample_rate, ptr, len); } -protected: - void onRtpSorted(RtpPacket::Ptr rtp, int track_index) override { - _on_sort(std::move(rtp)); + Buffer::Ptr createRtcpRR(RtcpHeader *sr, uint32_t ssrc) { + _rtcp_context.onRtcp(sr); + return _rtcp_context.createRtcpRR(ssrc, getSSRC()); } private: - function _on_sort; + NackContext _nack_ctx; + RtcpContext _rtcp_context{true}; }; +std::shared_ptr MediaTrack::getRtpChannel(uint32_t ssrc) const{ + auto it_chn = rtp_channel.find(rtp_ext_ctx->getRid(ssrc)); + if (it_chn == rtp_channel.end()) { + return nullptr; + } + return it_chn->second; +} + void WebRtcTransportImp::onRtcp(const char *buf, size_t len) { _bytes_usage += len; auto rtcps = RtcpHeader::loadFromBytes((char *) buf, len); @@ -587,19 +617,15 @@ void WebRtcTransportImp::onRtcp(const char *buf, size_t len) { case RtcpType::RTCP_SR : { //对方汇报rtp发送情况 RtcpSR *sr = (RtcpSR *) rtcp; - auto it = _rtp_info_ssrc.find(sr->ssrc); - if (it != _rtp_info_ssrc.end()) { - auto rtx = it->second.first; - if (!rtx) { - auto &info = it->second.second; - auto it = info->rtcp_context_recv.find(sr->ssrc); - if (it != info->rtcp_context_recv.end()) { - it->second->onRtcp(sr); - auto rr = it->second->createRtcpRR(info->answer_ssrc_rtp, sr->ssrc); - sendRtcpPacket(rr->data(), rr->size(), true); - } else { - WarnL << "未识别的sr rtcp包:" << rtcp->dumpString(); - } + auto it = _ssrc_to_track.find(sr->ssrc); + if (it != _ssrc_to_track.end()) { + auto &track = it->second; + auto rtp_chn = track->getRtpChannel(sr->ssrc); + if(!rtp_chn){ + WarnL << "未识别的sr rtcp包:" << rtcp->dumpString(); + } else { + auto rr = rtp_chn->createRtcpRR(sr, track->answer_ssrc_rtp); + sendRtcpPacket(rr->data(), rr->size(), true); } } else { WarnL << "未识别的sr rtcp包:" << rtcp->dumpString(); @@ -611,14 +637,11 @@ void WebRtcTransportImp::onRtcp(const char *buf, size_t len) { //对方汇报rtp接收情况 RtcpRR *rr = (RtcpRR *) rtcp; for (auto item : rr->getItemList()) { - auto it = _rtp_info_ssrc.find(item->ssrc); - if (it != _rtp_info_ssrc.end()) { - auto rtx = it->second.first; - if (!rtx) { - auto &info = it->second.second; - auto sr = info->rtcp_context_send->createRtcpSR(info->answer_ssrc_rtp); - sendRtcpPacket(sr->data(), sr->size(), true); - } + auto it = _ssrc_to_track.find(item->ssrc); + if (it != _ssrc_to_track.end()) { + auto &track = it->second; + auto sr = track->rtcp_context_send->createRtcpSR(track->answer_ssrc_rtp); + sendRtcpPacket(sr->data(), sr->size(), true); } else { WarnL << "未识别的rr rtcp包:" << rtcp->dumpString(); } @@ -629,12 +652,12 @@ void WebRtcTransportImp::onRtcp(const char *buf, size_t len) { //对方汇报停止发送rtp RtcpBye *bye = (RtcpBye *) rtcp; for (auto ssrc : bye->getSSRC()) { - auto it = _rtp_info_ssrc.find(*ssrc); - if (it == _rtp_info_ssrc.end()) { + auto it = _ssrc_to_track.find(*ssrc); + if (it == _ssrc_to_track.end()) { WarnL << "未识别的bye rtcp包:" << rtcp->dumpString(); continue; } - _rtp_info_ssrc.erase(it); + _ssrc_to_track.erase(it); } onShutdown(SockException(Err_eof, "rtcp bye message received")); break; @@ -648,20 +671,17 @@ void WebRtcTransportImp::onRtcp(const char *buf, size_t len) { switch ((RTPFBType) rtcp->report_count) { case RTPFBType::RTCP_RTPFB_NACK : { RtcpFB *fb = (RtcpFB *) rtcp; - auto it = _rtp_info_ssrc.find(fb->ssrc_media); - if (it == _rtp_info_ssrc.end()) { + auto it = _ssrc_to_track.find(fb->ssrc_media); + if (it == _ssrc_to_track.end()) { WarnL << "未识别的 rtcp包:" << rtcp->dumpString(); return; } - auto rtx = it->second.first; - if (!rtx) { - auto &info = it->second.second; - auto &fci = fb->getFci(); - info->nack_list.for_each_nack(fci, [&](const RtpPacket::Ptr &rtp) { - //rtp重传 - onSendRtp(rtp, true, true); - }); - } + auto &track = it->second; + auto &fci = fb->getFci(); + track->nack_list.for_each_nack(fci, [&](const RtpPacket::Ptr &rtp) { + //rtp重传 + onSendRtp(rtp, true, true); + }); break; } default: break; @@ -675,122 +695,55 @@ void WebRtcTransportImp::onRtcp(const char *buf, size_t len) { /////////////////////////////////////////////////////////////////// -void WebRtcTransportImp::changeRtpExtId(RtpPayloadInfo &info, const RtpHeader *header, bool is_recv, bool is_rtx, string *rid_ptr) const{ - auto ext_map = RtpExt::getExtValue(header); - for (auto &pr : ext_map) { - if (is_recv) { - auto it = _rtp_ext_id_to_type.find(pr.first); - if (it == _rtp_ext_id_to_type.end()) { - WarnL << "接收rtp时,忽略不识别的rtp ext, id=" << (int) pr.first; - pr.second.clearExt(); - continue; - } - pr.second.setType(it->second); - //重新赋值ext id为 ext type,作为后面处理ext的统一中间类型 - pr.second.setExtId((uint8_t) it->second); - switch(it->second){ - case RtpExtType::sdes_repaired_rtp_stream_id : - case RtpExtType::sdes_rtp_stream_id : { - auto ssrc = ntohl(header->ssrc); - auto rid = it->second == RtpExtType::sdes_rtp_stream_id ? pr.second.getRtpStreamId() : pr.second.getRepairedRtpStreamId(); - //根据rid获取rtp或rtx的ssrc - auto &ssrc_ref = is_rtx ? info.rid_ssrc[rid].second : info.rid_ssrc[rid].first; - if (!ssrc_ref) { - //ssrc未赋值,赋值 - ssrc_ref = ssrc; - DebugL << (is_rtx ? "got rid of rtx:" : "got rid:") << rid << ", ssrc:" << ssrc; - } - if (is_rtx) { - //rtx ssrc --> rtp ssrc - auto &rtp_ssrc_ref = info.rtx_ssrc_to_rtp_ssrc[ssrc]; - if (!rtp_ssrc_ref && info.rid_ssrc[rid].first) { - //未找到rtx到rtp ssrc的映射关系,且已经获取rtp的ssrc,那么设置映射关系 - rtp_ssrc_ref = info.rid_ssrc[rid].first; - DebugL << "got ssrc of rid:" << rid << ", [rtx-rtp]:" << ssrc << "-" << rtp_ssrc_ref; - } - } - if (rid_ptr) { - *rid_ptr = rid; - } - break; - } - - default : break; - } - } else { - pr.second.setType((RtpExtType) pr.first); - auto it = _rtp_ext_type_to_id.find((RtpExtType) pr.first); - if (it == _rtp_ext_type_to_id.end()) { - WarnL << "发送rtp时, 忽略不被客户端支持rtp ext:" << pr.second.dumpString(); - pr.second.clearExt(); - continue; - } - //重新赋值ext id为客户端sdp声明的类型 - pr.second.setExtId(it->second); - } - } +void WebRtcTransportImp::createRtpChannel(const string &rid, uint32_t ssrc, const MediaTrack::Ptr &track) { + //rid --> RtpReceiverImp + auto &ref = track->rtp_channel[rid]; + ref = std::make_shared([track, this, rid](RtpPacket::Ptr rtp) mutable { + onSortedRtp(*track, rid, std::move(rtp)); + }, [track, this, ssrc](const FCI_NACK &nack) mutable { + onSendNack(*track, nack, ssrc); + }); + InfoL << "create rtp receiver of ssrc:" << ssrc << ", rid:" << rid << ", codec:" << track->plan_rtp->codec; } void WebRtcTransportImp::onRtp(const char *buf, size_t len) { - onRtp_l(buf, len, false); -} - -void WebRtcTransportImp::onRtp_l(const char *buf, size_t len, bool rtx) { - if (!rtx) { - _bytes_usage += len; - _alive_ticker.resetTime(); - } + _bytes_usage += len; + _alive_ticker.resetTime(); RtpHeader *rtp = (RtpHeader *) buf; - auto ssrc = ntohl(rtp->ssrc); - //根据接收到的rtp的pt信息,找到该流的信息 - auto it = _rtp_info_pt.find(rtp->pt); - if (it == _rtp_info_pt.end()) { - WarnL; + auto it = _pt_to_track.find(rtp->pt); + if (it == _pt_to_track.end()) { + WarnL << "unknown rtp pt:" << (int)rtp->pt; return; } - auto &info = it->second.second; - if (!it->second.first) { + bool is_rtx = it->second.first; + auto ssrc = ntohl(rtp->ssrc); + auto &track = it->second.second; + + //修改ext id至统一 + string rid; + track->rtp_ext_ctx->changeRtpExtId(rtp, true, &rid); + + auto &ref = track->rtp_channel[rid]; + if (!ref) { + if (is_rtx) { + //再接收到对应的rtp前,丢弃rtx包 + WarnL << "unknown rtx rtp, rid:" << rid << ", ssrc:" << ssrc << ", codec:" << track->plan_rtp->codec << ", seq:" << ntohs(rtp->seq); + return; + } + createRtpChannel(rid, ssrc, track); + } + + if (!is_rtx) { //这是普通的rtp数据 - auto seq = ntohs(rtp->seq); #if 0 - if (!rtx && info->media->type == TrackVideo && seq % 100 == 0) { + auto seq = ntohs(rtp->seq); + if (track->media->type == TrackVideo && seq % 100 == 0) { //此处模拟接受丢包 - DebugL << "recv dropped:" << seq; return; } #endif - auto &ref = info->receiver[ssrc]; - if (!rtx) { - //统计rtp接受情况,便于生成nack rtcp包 - info->nack_ctx[ssrc].received(seq); - //时间戳转换成毫秒 - auto stamp_ms = ntohl(rtp->stamp) * uint64_t(1000) / info->plan_rtp->sample_rate; - - //统计rtp收到的情况,好做rr汇报 - auto &cxt_ref = info->rtcp_context_recv[ssrc]; - if (!cxt_ref) { - cxt_ref = std::make_shared(info->plan_rtp->sample_rate, true); - } - cxt_ref->onRtp(seq, stamp_ms, len); - - //修改ext id至统一 - string rid; - changeRtpExtId(*info, rtp, true, false, &rid); - - if (!ref) { - ref = std::make_shared([info, this, rid](RtpPacket::Ptr rtp) mutable { - onSortedRtp(*info, rid, std::move(rtp)); - }); - info->nack_ctx[ssrc].setOnNack([info, this, ssrc](const FCI_NACK &nack) mutable { - onSendNack(*info, nack, ssrc); - }); - //recv simulcast ssrc --> RtpPayloadInfo - _rtp_info_ssrc[ssrc] = std::make_pair(false, info); - InfoL << "receive rtp of ssrc:" << ssrc; - } - } //解析并排序rtp if(!ref){ InfoL << "ignore no rtp receiver of ssrc:" << ssrc<<" is rtx:"<pt = track->plan_rtp->pt; rtp->seq = htons(origin_seq); - if (info->offer_ssrc_rtp) { - //非simulcast或音频 - rtp->ssrc = htonl(info->offer_ssrc_rtp); - TraceL << "received rtx rtp,ssrc: " << ssrc << ", seq:" << origin_seq << ", pt:" << (int)rtp->pt; - } else { - //todo simulcast下,辅码流通过rtx传输? - //simulcast情况下,根据rtx的ssrc查找rtp的ssrc - rtp->ssrc = htonl(info->rtx_ssrc_to_rtp_ssrc[ntohl(rtp->ssrc)]); - } - rtp->pt = info->plan_rtp->pt; + rtp->ssrc = htonl(ref->getSSRC()); + memmove((uint8_t *) buf + 2, buf, payload - (uint8_t *) buf); buf += 2; len -= 2; - onRtp_l(buf, len, true); + ref->inputRtp(track->media->type, track->plan_rtp->sample_rate, (uint8_t *) buf, len, true); } -void WebRtcTransportImp::onSendNack(RtpPayloadInfo &info, const FCI_NACK &nack, uint32_t ssrc) { +void WebRtcTransportImp::onSendNack(MediaTrack &track, const FCI_NACK &nack, uint32_t ssrc) { auto rtcp = RtcpFB::create(RTPFBType::RTCP_RTPFB_NACK, &nack, FCI_NACK::kSize); - rtcp->ssrc = htons(info.answer_ssrc_rtp); + rtcp->ssrc = htons(track.answer_ssrc_rtp); rtcp->ssrc_media = htonl(ssrc); DebugL << htonl(ssrc) << " " << nack.getPid(); sendRtcpPacket((char *) rtcp.get(), rtcp->getSize(), true); @@ -841,8 +785,8 @@ void WebRtcTransportImp::onSendNack(RtpPayloadInfo &info, const FCI_NACK &nack, /////////////////////////////////////////////////////////////////// -void WebRtcTransportImp::onSortedRtp(RtpPayloadInfo &info, const string &rid, RtpPacket::Ptr rtp) { - if (info.media->type == TrackVideo && _pli_ticker.elapsedTime() > 2000) { +void WebRtcTransportImp::onSortedRtp(MediaTrack &track, const string &rid, RtpPacket::Ptr rtp) { + if (track.media->type == TrackVideo && _pli_ticker.elapsedTime() > 2000) { //定期发送pli请求关键帧,方便非rtc等协议 _pli_ticker.resetTime(); sendRtcpPli(rtp->getSSRC()); @@ -879,42 +823,41 @@ void WebRtcTransportImp::onSortedRtp(RtpPayloadInfo &info, const string &rid, Rt /////////////////////////////////////////////////////////////////// void WebRtcTransportImp::onSendRtp(const RtpPacket::Ptr &rtp, bool flush, bool rtx){ - auto &info = _send_rtp_info[rtp->type]; - if (!info) { + auto &track = _type_to_track[rtp->type]; + if (!track) { //忽略,对方不支持该编码类型 return; } if (!rtx) { //统计rtp发送情况,好做sr汇报 - info->rtcp_context_send->onRtp(rtp->getSeq(), rtp->getStampMS(), rtp->size() - RtpPacket::kRtpTcpHeaderSize); - info->nack_list.push_back(rtp); + track->rtcp_context_send->onRtp(rtp->getSeq(), ntohl(rtp->getHeader()->stamp), rtp->size() - RtpPacket::kRtpTcpHeaderSize); + track->nack_list.push_back(rtp); #if 0 //此处模拟发送丢包 if (rtp->type == TrackVideo && rtp->getSeq() % 100 == 0) { - DebugL << "send dropped:" << rtp->getSeq(); return; } #endif } else { WarnL << "send rtx rtp:" << rtp->getSeq(); } - pair ctx{rtx, info.get()}; + pair ctx{rtx, track.get()}; sendRtpPacket(rtp->data() + RtpPacket::kRtpTcpHeaderSize, rtp->size() - RtpPacket::kRtpTcpHeaderSize, flush, &ctx); _bytes_usage += rtp->size() - RtpPacket::kRtpTcpHeaderSize; } void WebRtcTransportImp::onBeforeEncryptRtp(const char *buf, size_t &len, void *ctx) { - auto pr = (pair *) ctx; + auto pr = (pair *) ctx; auto header = (RtpHeader *) buf; if (!pr->first || !pr->second->plan_rtx) { //普通的rtp,或者不支持rtx, 修改目标pt和ssrc - changeRtpExtId(*pr->second, header, false, false); + pr->second->rtp_ext_ctx->changeRtpExtId(header, false); header->pt = pr->second->plan_rtp->pt; header->ssrc = htonl(pr->second->answer_ssrc_rtp); } else { //重传的rtp, rtx - changeRtpExtId(*pr->second, header, false, true); + pr->second->rtp_ext_ctx->changeRtpExtId(header, false); header->pt = pr->second->plan_rtx->pt; if (pr->second->answer_ssrc_rtx) { //有rtx单独的ssrc,有些情况下,浏览器支持rtx,但是未指定rtx单独的ssrc @@ -949,7 +892,7 @@ void WebRtcTransportImp::onShutdown(const SockException &ex){ bool WebRtcTransportImp::close(MediaSource &sender, bool force) { //此回调在其他线程触发 - if(!_push_src || (!force && _push_src->totalReaderCount())){ + if (!force && totalReaderCount(sender)) { return false; } string err = StrPrinter << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force; @@ -958,7 +901,11 @@ bool WebRtcTransportImp::close(MediaSource &sender, bool force) { } int WebRtcTransportImp::totalReaderCount(MediaSource &sender) { - return _push_src ? _push_src->totalReaderCount() : sender.readerCount(); + auto total_count = 0; + for (auto &src : _push_src_simulcast) { + total_count += src.second->totalReaderCount(); + } + return total_count; } MediaOriginType WebRtcTransportImp::getOriginType(MediaSource &sender) const { diff --git a/webrtc/WebRtcTransport.h b/webrtc/WebRtcTransport.h index c62f549e..5e3523f1 100644 --- a/webrtc/WebRtcTransport.h +++ b/webrtc/WebRtcTransport.h @@ -22,6 +22,8 @@ #include "Rtsp/RtspMediaSourceImp.h" #include "Rtcp/RtcpContext.h" #include "Rtcp/RtcpFCI.h" +#include "Nack.h" + using namespace toolkit; using namespace mediakit; @@ -125,151 +127,26 @@ private: RtcSession::Ptr _answer_sdp; }; -class RtpReceiverImp; - -class NackList { +class RtpChannel; +class MediaTrack { public: - void push_back(RtpPacket::Ptr rtp) { - auto seq = rtp->getSeq(); - _nack_cache_seq.emplace_back(seq); - _nack_cache_pkt.emplace(seq, std::move(rtp)); - while (get_cache_ms() > kMaxNackMS) { - //需要清除部分nack缓存 - pop_front(); - } - } + using Ptr = std::shared_ptr; + const RtcCodecPlan *plan_rtp; + const RtcCodecPlan *plan_rtx; + uint32_t offer_ssrc_rtp = 0; + uint32_t offer_ssrc_rtx = 0; + uint32_t answer_ssrc_rtp = 0; + uint32_t answer_ssrc_rtx = 0; + const RtcMedia *media; + RtpExtContext::Ptr rtp_ext_ctx; - template - void for_each_nack(const FCI_NACK &nack, const FUNC &func) { - auto seq = nack.getPid(); - for (auto bit : nack.getBitArray()) { - if (bit) { - //丢包 - RtpPacket::Ptr *ptr = get_rtp(seq); - if (ptr) { - func(*ptr); - } - } - ++seq; - } - } + //for send rtp + NackList nack_list; + RtcpContext::Ptr rtcp_context_send; -private: - void pop_front() { - if (_nack_cache_seq.empty()) { - return; - } - _nack_cache_pkt.erase(_nack_cache_seq.front()); - _nack_cache_seq.pop_front(); - } - - RtpPacket::Ptr *get_rtp(uint16_t seq) { - auto it = _nack_cache_pkt.find(seq); - if (it == _nack_cache_pkt.end()) { - return nullptr; - } - return &it->second; - } - - uint32_t get_cache_ms() { - if (_nack_cache_seq.size() < 2) { - return 0; - } - uint32_t back = _nack_cache_pkt[_nack_cache_seq.back()]->getStampMS(); - uint32_t front = _nack_cache_pkt[_nack_cache_seq.front()]->getStampMS(); - if (back > front) { - return back - front; - } - //很有可能回环了 - return back + (UINT32_MAX - front); - } - -private: - static constexpr uint32_t kMaxNackMS = 10 * 1000; - deque _nack_cache_seq; - unordered_map _nack_cache_pkt; -}; - -class NackContext { -public: - using onNack = function; - - void received(uint16_t seq) { - if (!_last_max_seq && _seq.empty()) { - _last_max_seq = seq - 1; - } - _seq.emplace(seq); - auto max_seq = *_seq.rbegin(); - auto min_seq = *_seq.begin(); - auto diff = max_seq - min_seq; - if (!diff) { - return; - } - - if (diff > UINT32_MAX / 2) { - //回环 - _seq.clear(); - _last_max_seq = min_seq; - return; - } - - if (_seq.size() == diff + 1 && _last_max_seq + 1 == min_seq) { - //都是连续的seq,未丢包 - _seq.clear(); - _last_max_seq = max_seq; - } else { - //seq不连续,有丢包 - if (min_seq == _last_max_seq + 1) { - //前面部分seq是连续的,未丢包,移除之 - eraseFrontSeq(); - } - - //有丢包,丢包从_last_max_seq开始 - if (max_seq - _last_max_seq > FCI_NACK::kBitSize) { - vector vec; - vec.resize(FCI_NACK::kBitSize); - for (auto i = 0; i < FCI_NACK::kBitSize; ++i) { - vec[i] = _seq.find(_last_max_seq + i + 2) == _seq.end(); - } - doNack(FCI_NACK(_last_max_seq + 1, vec)); - _last_max_seq += FCI_NACK::kBitSize + 1; - if (_last_max_seq >= max_seq) { - _seq.clear(); - } else { - auto it = _seq.emplace_hint(_seq.begin(), _last_max_seq); - _seq.erase(_seq.begin(), it); - } - } - } - } - - void setOnNack(onNack cb) { - _cb = std::move(cb); - } - -private: - void doNack(const FCI_NACK &nack) { - if (_cb) { - _cb(nack); - } - } - - void eraseFrontSeq(){ - //前面部分seq是连续的,未丢包,移除之 - for (auto it = _seq.begin(); it != _seq.end();) { - if (*it != _last_max_seq + 1) { - //seq不连续,丢包了 - break; - } - _last_max_seq = *it; - it = _seq.erase(it); - } - } - -private: - onNack _cb; - set _seq; - uint16_t _last_max_seq = 0; + //for recv rtp + unordered_map > rtp_channel; + std::shared_ptr getRtpChannel(uint32_t ssrc) const; }; class WebRtcTransportImp : public WebRtcTransport, public MediaSourceEvent, public SockInfo, public std::enable_shared_from_this{ @@ -298,8 +175,6 @@ protected: void onRtcConfigure(RtcConfigure &configure) const override; void onRtp(const char *buf, size_t len) override; - void onRtp_l(const char *buf, size_t len, bool rtx); - void onRtcp(const char *buf, size_t len) override; void onBeforeEncryptRtp(const char *buf, size_t &len, void *ctx) override; void onBeforeEncryptRtcp(const char *buf, size_t &len, void *ctx) override {}; @@ -339,28 +214,9 @@ private: bool canSendRtp() const; bool canRecvRtp() const; - class RtpPayloadInfo { - public: - using Ptr = std::shared_ptr; - const RtcCodecPlan *plan_rtp; - const RtcCodecPlan *plan_rtx; - uint32_t offer_ssrc_rtp = 0; - uint32_t offer_ssrc_rtx = 0; - uint32_t answer_ssrc_rtp = 0; - uint32_t answer_ssrc_rtx = 0; - const RtcMedia *media; - NackList nack_list; - RtcpContext::Ptr rtcp_context_send; - unordered_map > rid_ssrc; - unordered_map rtx_ssrc_to_rtp_ssrc; - unordered_map nack_ctx; - unordered_map rtcp_context_recv; - unordered_map > receiver; - }; - - void onSortedRtp(RtpPayloadInfo &info, const string &rid, RtpPacket::Ptr rtp); - void onSendNack(RtpPayloadInfo &info, const FCI_NACK &nack, uint32_t ssrc); - void changeRtpExtId(RtpPayloadInfo &info, const RtpHeader *header, bool is_recv, bool is_rtx = false, string *rid_ptr = nullptr) const; + void onSortedRtp(MediaTrack &track, const string &rid, RtpPacket::Ptr rtp); + void onSendNack(MediaTrack &track, const FCI_NACK &nack, uint32_t ssrc); + void createRtpChannel(const string &rid, uint32_t ssrc, const MediaTrack::Ptr &track); private: uint16_t _rtx_seq[2] = {0, 0}; @@ -386,13 +242,9 @@ private: //播放rtsp源的reader对象 RtspMediaSource::RingType::RingReader::Ptr _reader; //根据发送rtp的track类型获取相关信息 - RtpPayloadInfo::Ptr _send_rtp_info[2]; + MediaTrack::Ptr _type_to_track[2]; //根据接收rtp的pt获取相关信息 - unordered_map > _rtp_info_pt; - //根据rtcp的ssrc获取相关信息 - unordered_map > _rtp_info_ssrc; - //发送rtp时需要修改rtp ext id - map _rtp_ext_type_to_id; - //接收rtp时需要修改rtp ext id - unordered_map _rtp_ext_id_to_type; + unordered_map > _pt_to_track; + //根据rtcp的ssrc获取相关信息,收发rtp和rtx的ssrc都会记录 + unordered_map _ssrc_to_track; }; \ No newline at end of file diff --git a/www/webrtc/ZLMRTCClient.js b/www/webrtc/ZLMRTCClient.js index a92ed4a4..9934e7b8 100644 --- a/www/webrtc/ZLMRTCClient.js +++ b/www/webrtc/ZLMRTCClient.js @@ -7399,18 +7399,11 @@ var ZLMRTCClient = (function (exports) { }; if (this.options.simulcast && stream.getVideoTracks().length > 0) { - VideoTransceiverInit.sendEncodings = [{ - rid: 'q', - active: true, - scaleResolutionDownBy: 4.0 - }, { - rid: 'h', - active: true, - scaleResolutionDownBy: 2.0 - }, { - rid: 'f', - active: true - }]; + VideoTransceiverInit.sendEncodings = [ + { rid: "h", active: true, maxBitrate: 1000000 }, + { rid: "m", active: true, maxBitrate: 500000, scaleResolutionDownBy: 2 }, + { rid: "l", active: true, maxBitrate: 200000, scaleResolutionDownBy: 4 } + ]; } if (stream.getAudioTracks().length > 0) {