ZLMediaKit/webrtc/WebRtcTransport.cpp
2021-06-25 10:08:53 +08:00

996 lines
37 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/*
* Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
*
* This file is part of ZLMediaKit(https://github.com/xia-chu/ZLMediaKit).
*
* Use of this source code is governed by MIT license that can be found in the
* LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree.
*/
#include "WebRtcTransport.h"
#include <iostream>
#include "RtpExt.h"
#include "Rtcp/Rtcp.h"
#include "Rtcp/RtcpFCI.h"
#include "Rtsp/RtpReceiver.h"
#define RTX_SSRC_OFFSET 2
#define RTP_CNAME "zlmediakit-rtp"
#define RTP_LABEL "zlmediakit-label"
#define RTP_MSLABEL "zlmediakit-mslabel"
#define RTP_MSID RTP_MSLABEL " " RTP_LABEL
//RTC配置项目
namespace RTC {
#define RTC_FIELD "rtc."
//rtp和rtcp接受超时时间
const string kTimeOutSec = RTC_FIELD"timeoutSec";
//服务器外网ip
const string kExternIP = RTC_FIELD"externIP";
//设置remb比特率非0时关闭twcc并开启remb。该设置在rtc推流时有效可以控制推流画质
const string kRembBitRate = RTC_FIELD"rembBitRate";
static onceToken token([]() {
mINI::Instance()[kTimeOutSec] = 15;
mINI::Instance()[kExternIP] = "";
mINI::Instance()[kRembBitRate] = 0;
});
}//namespace RTC
WebRtcTransport::WebRtcTransport(const EventPoller::Ptr &poller) {
_poller = poller;
_dtls_transport = std::make_shared<RTC::DtlsTransport>(poller, this);
_ice_server = std::make_shared<RTC::IceServer>(this, makeRandStr(4), makeRandStr(28).substr(4));
}
void WebRtcTransport::onCreate(){
}
void WebRtcTransport::onDestory(){
_dtls_transport = nullptr;
_ice_server = nullptr;
}
const EventPoller::Ptr& WebRtcTransport::getPoller() const{
return _poller;
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
void WebRtcTransport::OnIceServerSendStunPacket(const RTC::IceServer *iceServer, const RTC::StunPacket *packet, RTC::TransportTuple *tuple) {
onSendSockData((char *) packet->GetData(), packet->GetSize(), (struct sockaddr_in *) tuple);
}
void WebRtcTransport::OnIceServerSelectedTuple(const RTC::IceServer *iceServer, RTC::TransportTuple *tuple) {
InfoL;
}
void WebRtcTransport::OnIceServerConnected(const RTC::IceServer *iceServer) {
InfoL;
}
void WebRtcTransport::OnIceServerCompleted(const RTC::IceServer *iceServer) {
InfoL;
if (_answer_sdp->media[0].role == DtlsRole::passive) {
_dtls_transport->Run(RTC::DtlsTransport::Role::SERVER);
} else {
_dtls_transport->Run(RTC::DtlsTransport::Role::CLIENT);
}
}
void WebRtcTransport::OnIceServerDisconnected(const RTC::IceServer *iceServer) {
InfoL;
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
void WebRtcTransport::OnDtlsTransportConnected(
const RTC::DtlsTransport *dtlsTransport,
RTC::SrtpSession::CryptoSuite srtpCryptoSuite,
uint8_t *srtpLocalKey,
size_t srtpLocalKeyLen,
uint8_t *srtpRemoteKey,
size_t srtpRemoteKeyLen,
std::string &remoteCert) {
InfoL;
_srtp_session_send = std::make_shared<RTC::SrtpSession>(RTC::SrtpSession::Type::OUTBOUND, srtpCryptoSuite, srtpLocalKey, srtpLocalKeyLen);
_srtp_session_recv = std::make_shared<RTC::SrtpSession>(RTC::SrtpSession::Type::INBOUND, srtpCryptoSuite, srtpRemoteKey, srtpRemoteKeyLen);
onStartWebRTC();
}
void WebRtcTransport::OnDtlsTransportSendData(const RTC::DtlsTransport *dtlsTransport, const uint8_t *data, size_t len) {
onSendSockData((char *)data, len);
}
void WebRtcTransport::OnDtlsTransportConnecting(const RTC::DtlsTransport *dtlsTransport) {
InfoL;
}
void WebRtcTransport::OnDtlsTransportFailed(const RTC::DtlsTransport *dtlsTransport) {
InfoL;
onShutdown(SockException(Err_shutdown, "dtls transport failed"));
}
void WebRtcTransport::OnDtlsTransportClosed(const RTC::DtlsTransport *dtlsTransport) {
InfoL;
onShutdown(SockException(Err_shutdown, "dtls close notify received"));
}
void WebRtcTransport::OnDtlsTransportApplicationDataReceived(const RTC::DtlsTransport *dtlsTransport, const uint8_t *data, size_t len) {
InfoL << hexdump(data, len);
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
void WebRtcTransport::onSendSockData(const char *buf, size_t len, bool flush){
auto tuple = _ice_server->GetSelectedTuple();
assert(tuple);
onSendSockData(buf, len, (struct sockaddr_in *) tuple, flush);
}
const RtcSession& WebRtcTransport::getSdp(SdpType type) const{
switch (type) {
case SdpType::offer: return *_offer_sdp;
case SdpType::answer: return *_answer_sdp;
default: throw std::invalid_argument("不识别的sdp类型");
}
}
RTC::TransportTuple* WebRtcTransport::getSelectedTuple() const{
return _ice_server->GetSelectedTuple();
}
void WebRtcTransport::sendRtcpRemb(uint32_t ssrc, size_t bit_rate) {
auto remb = FCI_REMB::create({ssrc}, (uint32_t)bit_rate);
auto fb = RtcpFB::create(PSFBType::RTCP_PSFB_REMB, remb.data(), remb.size());
fb->ssrc = htonl(0);
fb->ssrc_media = htonl(ssrc);
sendRtcpPacket((char *) fb.get(), fb->getSize(), true);
TraceL << ssrc << " " << bit_rate;
}
void WebRtcTransport::sendRtcpPli(uint32_t ssrc) {
auto pli = RtcpFB::create(PSFBType::RTCP_PSFB_PLI);
pli->ssrc = htonl(0);
pli->ssrc_media = htonl(ssrc);
sendRtcpPacket((char *) pli.get(), pli->getSize(), true);
}
string getFingerprint(const string &algorithm_str, const std::shared_ptr<RTC::DtlsTransport> &transport){
auto algorithm = RTC::DtlsTransport::GetFingerprintAlgorithm(algorithm_str);
for (auto &finger_prints : transport->GetLocalFingerprints()) {
if (finger_prints.algorithm == algorithm) {
return finger_prints.value;
}
}
throw std::invalid_argument(StrPrinter << "不支持的加密算法:" << algorithm_str);
}
void WebRtcTransport::setRemoteDtlsFingerprint(const RtcSession &remote){
//设置远端dtls签名
RTC::DtlsTransport::Fingerprint remote_fingerprint;
remote_fingerprint.algorithm = RTC::DtlsTransport::GetFingerprintAlgorithm(_offer_sdp->media[0].fingerprint.algorithm);
remote_fingerprint.value = _offer_sdp->media[0].fingerprint.hash;
_dtls_transport->SetRemoteFingerprint(remote_fingerprint);
}
void WebRtcTransport::onCheckSdp(SdpType type, RtcSession &sdp){
for (auto &m : sdp.media) {
if (m.type != TrackApplication && !m.rtcp_mux) {
throw std::invalid_argument("只支持rtcp-mux模式");
}
}
if (sdp.group.mids.empty()) {
throw std::invalid_argument("只支持group BUNDLE模式");
}
if (type == SdpType::offer) {
sdp.checkValidSSRC();
}
}
void WebRtcTransport::onRtcConfigure(RtcConfigure &configure) const {
//开启remb后关闭twcc因为开启twcc后remb无效
GET_CONFIG(size_t, remb_bit_rate, RTC::kRembBitRate);
configure.enableTWCC(!remb_bit_rate);
}
std::string WebRtcTransport::getAnswerSdp(const string &offer){
try {
//// 解析offer sdp ////
_offer_sdp = std::make_shared<RtcSession>();
_offer_sdp->loadFrom(offer);
onCheckSdp(SdpType::offer, *_offer_sdp);
setRemoteDtlsFingerprint(*_offer_sdp);
//// sdp 配置 ////
SdpAttrFingerprint fingerprint;
fingerprint.algorithm = _offer_sdp->media[0].fingerprint.algorithm;
fingerprint.hash = getFingerprint(fingerprint.algorithm, _dtls_transport);
RtcConfigure configure;
configure.setDefaultSetting(_ice_server->GetUsernameFragment(), _ice_server->GetPassword(),
RtpDirection::sendrecv, fingerprint);
onRtcConfigure(configure);
//// 生成answer sdp ////
_answer_sdp = configure.createAnswer(*_offer_sdp);
onCheckSdp(SdpType::answer, *_answer_sdp);
return _answer_sdp->toString();
} catch (exception &ex) {
onShutdown(SockException(Err_shutdown, ex.what()));
throw;
}
}
bool is_dtls(char *buf) {
return ((*buf > 19) && (*buf < 64));
}
bool is_rtp(char *buf) {
RtpHeader *header = (RtpHeader *) buf;
return ((header->pt < 64) || (header->pt >= 96));
}
bool is_rtcp(char *buf) {
RtpHeader *header = (RtpHeader *) buf;
return ((header->pt >= 64) && (header->pt < 96));
}
void WebRtcTransport::inputSockData(char *buf, size_t len, RTC::TransportTuple *tuple) {
if (RTC::StunPacket::IsStun((const uint8_t *) buf, len)) {
RTC::StunPacket *packet = RTC::StunPacket::Parse((const uint8_t *) buf, len);
if (packet == nullptr) {
WarnL << "parse stun error" << std::endl;
return;
}
_ice_server->ProcessStunPacket(packet, tuple);
return;
}
if (is_dtls(buf)) {
_dtls_transport->ProcessDtlsData((uint8_t *) buf, len);
return;
}
if (is_rtp(buf)) {
if (_srtp_session_recv->DecryptSrtp((uint8_t *) buf, &len)) {
onRtp(buf, len);
} else {
WarnL;
}
return;
}
if (is_rtcp(buf)) {
if (_srtp_session_recv->DecryptSrtcp((uint8_t *) buf, &len)) {
onRtcp(buf, len);
} else {
WarnL;
}
return;
}
}
void WebRtcTransport::sendRtpPacket(const char *buf, size_t len, bool flush, void *ctx) {
if (_srtp_session_send) {
//预留rtx加入的两个字节
CHECK(len + SRTP_MAX_TRAILER_LEN + 2 <= sizeof(_srtp_buf));
memcpy(_srtp_buf, buf, len);
onBeforeEncryptRtp((char *) _srtp_buf, len, ctx);
if (_srtp_session_send->EncryptRtp(_srtp_buf, &len)) {
onSendSockData((char *) _srtp_buf, len, flush);
}
}
}
void WebRtcTransport::sendRtcpPacket(const char *buf, size_t len, bool flush, void *ctx){
if (_srtp_session_send) {
CHECK(len + SRTP_MAX_TRAILER_LEN <= sizeof(_srtp_buf));
memcpy(_srtp_buf, buf, len);
onBeforeEncryptRtcp((char *) _srtp_buf, len, ctx);
if (_srtp_session_send->EncryptRtcp(_srtp_buf, &len)) {
onSendSockData((char *) _srtp_buf, len, flush);
}
}
}
///////////////////////////////////////////////////////////////////////////////////
WebRtcTransportImp::Ptr WebRtcTransportImp::create(const EventPoller::Ptr &poller){
WebRtcTransportImp::Ptr ret(new WebRtcTransportImp(poller), [](WebRtcTransportImp *ptr){
ptr->onDestory();
delete ptr;
});
ret->onCreate();
return ret;
}
void WebRtcTransportImp::onCreate(){
WebRtcTransport::onCreate();
_socket = Socket::createSocket(getPoller(), false);
//随机端口,绑定全部网卡
_socket->bindUdpSock(0);
weak_ptr<WebRtcTransportImp> weak_self = shared_from_this();
_socket->setOnRead([weak_self](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) mutable {
auto strong_self = weak_self.lock();
if (strong_self) {
strong_self->inputSockData(buf->data(), buf->size(), addr);
}
});
_self = shared_from_this();
GET_CONFIG(float, timeoutSec, RTC::kTimeOutSec);
_timer = std::make_shared<Timer>(timeoutSec / 2, [weak_self]() {
auto strong_self = weak_self.lock();
if (!strong_self) {
return false;
}
if (strong_self->_alive_ticker.elapsedTime() > timeoutSec * 1000) {
strong_self->onShutdown(SockException(Err_timeout, "接受rtp和rtcp超时"));
}
return true;
}, getPoller());
}
WebRtcTransportImp::WebRtcTransportImp(const EventPoller::Ptr &poller) : WebRtcTransport(poller) {
InfoL << this;
}
WebRtcTransportImp::~WebRtcTransportImp() {
InfoL << this;
}
void WebRtcTransportImp::onDestory() {
WebRtcTransport::onDestory();
uint64_t duration = _alive_ticker.createdTime() / 1000;
//流量统计事件广播
GET_CONFIG(uint32_t, iFlowThreshold, General::kFlowThreshold);
if (_reader) {
WarnL << "RTC播放器("
<< _media_info._vhost << "/"
<< _media_info._app << "/"
<< _media_info._streamid
<< ")结束播放,耗时(s):" << duration;
if (_bytes_usage >= iFlowThreshold * 1024) {
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _bytes_usage, duration, true, static_cast<SockInfo &>(*_socket));
}
}
if (_push_src) {
WarnL << "RTC推流器("
<< _media_info._vhost << "/"
<< _media_info._app << "/"
<< _media_info._streamid
<< ")结束推流,耗时(s):" << duration;
if (_bytes_usage >= iFlowThreshold * 1024) {
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _bytes_usage, duration, false, static_cast<SockInfo &>(*_socket));
}
}
}
void WebRtcTransportImp::attach(const RtspMediaSource::Ptr &src, const MediaInfo &info, bool is_play) {
assert(src);
_media_info = info;
if (is_play) {
_play_src = src;
} else {
_push_src = src;
}
}
void WebRtcTransportImp::onSendSockData(const char *buf, size_t len, struct sockaddr_in *dst, bool flush) {
auto ptr = BufferRaw::create();
ptr->assign(buf, len);
_socket->send(ptr, (struct sockaddr *)(dst), sizeof(struct sockaddr), flush);
}
///////////////////////////////////////////////////////////////////
bool WebRtcTransportImp::canSendRtp() const{
auto &sdp = getSdp(SdpType::answer);
return _play_src && (sdp.media[0].direction == RtpDirection::sendrecv || sdp.media[0].direction == RtpDirection::sendonly);
}
bool WebRtcTransportImp::canRecvRtp() const{
auto &sdp = getSdp(SdpType::answer);
return _push_src && (sdp.media[0].direction == RtpDirection::sendrecv || sdp.media[0].direction == RtpDirection::recvonly);
}
void WebRtcTransportImp::onStartWebRTC() {
//获取ssrc和pt相关信息,届时收到rtp和rtcp时分别可以根据pt和ssrc找到相关的信息
for (auto &m_answer : getSdp(SdpType::answer).media) {
auto m_offer = getSdp(SdpType::offer).getMedia(m_answer.type);
auto info = std::make_shared<RtpPayloadInfo>();
info->media = &m_answer;
info->answer_ssrc_rtp = m_answer.getRtpSSRC();
info->answer_ssrc_rtx = m_answer.getRtxSSRC();
info->offer_ssrc_rtp = m_offer->getRtpSSRC();
info->offer_ssrc_rtx = m_offer->getRtxSSRC();
info->plan_rtp = &m_answer.plan[0];;
info->plan_rtx = m_answer.getRelatedRtxPlan(info->plan_rtp->pt);
info->rtcp_context_send = std::make_shared<RtcpContext>(info->plan_rtp->sample_rate, false);
//send ssrc --> RtpPayloadInfo
_rtp_info_ssrc[info->answer_ssrc_rtp] = std::make_pair(false, info);
_rtp_info_ssrc[info->answer_ssrc_rtx] = std::make_pair(true, info);
//recv ssrc --> RtpPayloadInfo
_rtp_info_ssrc[info->offer_ssrc_rtp] = std::make_pair(false, info);;
_rtp_info_ssrc[info->offer_ssrc_rtx] = std::make_pair(true, info);;
//rtp pt --> RtpPayloadInfo
_rtp_info_pt.emplace(info->plan_rtp->pt, std::make_pair(false, info));
if (info->plan_rtx) {
//rtx pt --> RtpPayloadInfo
_rtp_info_pt.emplace(info->plan_rtx->pt, std::make_pair(true, info));
}
if (m_offer->type != TrackApplication) {
//记录rtp ext类型与id的关系方便接收或发送rtp时修改rtp ext id
for (auto &ext : m_offer->extmap) {
auto ext_type = RtpExt::getExtType(ext.ext);
_rtp_ext_id_to_type.emplace(ext.id, ext_type);
_rtp_ext_type_to_id.emplace(ext_type, ext.id);
}
}
}
if (canRecvRtp()) {
_push_src->setSdp(getSdp(SdpType::answer).toRtspSdp());
}
if (canSendRtp()) {
_reader = _play_src->getRing()->attach(getPoller(), true);
weak_ptr<WebRtcTransportImp> weak_self = shared_from_this();
_reader->setReadCB([weak_self](const RtspMediaSource::RingDataType &pkt) {
auto strongSelf = weak_self.lock();
if (!strongSelf) {
return;
}
size_t i = 0;
pkt->for_each([&](const RtpPacket::Ptr &rtp) {
strongSelf->onSendRtp(rtp, ++i == pkt->size());
});
});
_reader->setDetachCB([weak_self](){
auto strongSelf = weak_self.lock();
if (!strongSelf) {
return;
}
strongSelf->onShutdown(SockException(Err_eof, "rtsp ring buffer detached"));
});
RtcSession rtsp_send_sdp;
rtsp_send_sdp.loadFrom(_play_src->getSdp(), false);
for (auto &m : getSdp(SdpType::answer).media) {
if (m.type == TrackApplication) {
continue;
}
auto rtsp_media = rtsp_send_sdp.getMedia(m.type);
if (rtsp_media && getCodecId(rtsp_media->plan[0].codec) == getCodecId(m.plan[0].codec)) {
auto it = _rtp_info_pt.find(m.plan[0].pt);
CHECK(it != _rtp_info_pt.end());
//记录发送rtp时约定的信息届时发送rtp时需要修改pt和ssrc
_send_rtp_info[m.type] = it->second.second;
}
}
}
//使用完毕后,释放强引用,这样确保推流器断开后能及时注销媒体
_play_src = nullptr;
}
void WebRtcTransportImp::onCheckSdp(SdpType type, RtcSession &sdp){
WebRtcTransport::onCheckSdp(type, sdp);
if (type != SdpType::answer) {
//我们只修改answer sdp
return;
}
//修改answer sdp的ip、端口信息
GET_CONFIG(string, extern_ip, RTC::kExternIP);
for (auto &m : sdp.media) {
m.addr.reset();
m.addr.address = extern_ip.empty() ? SockUtil::get_local_ip() : extern_ip;
m.rtcp_addr.reset();
m.rtcp_addr.address = m.addr.address;
m.rtcp_addr.port = _socket->get_local_port();
m.port = m.rtcp_addr.port;
sdp.origin.address = m.addr.address;
}
if (!canSendRtp()) {
//设置我们发送的rtp的ssrc
return;
}
for (auto &m : sdp.media) {
if (m.type == TrackApplication) {
continue;
}
//添加answer sdp的ssrc信息
m.rtp_rtx_ssrc.emplace_back();
m.rtp_rtx_ssrc[0].ssrc = _play_src->getSsrc(m.type);
m.rtp_rtx_ssrc[0].cname = RTP_CNAME;
m.rtp_rtx_ssrc[0].label = RTP_LABEL;
m.rtp_rtx_ssrc[0].mslabel = RTP_MSLABEL;
m.rtp_rtx_ssrc[0].msid = RTP_MSID;
if (m.getRelatedRtxPlan(m.plan[0].pt)) {
m.rtp_rtx_ssrc.emplace_back();
m.rtp_rtx_ssrc[1] = m.rtp_rtx_ssrc[0];
m.rtp_rtx_ssrc[1].ssrc += RTX_SSRC_OFFSET;
}
}
}
void WebRtcTransportImp::onRtcConfigure(RtcConfigure &configure) const {
WebRtcTransport::onRtcConfigure(configure);
if (_play_src) {
//这是播放,同时也可能有推流
configure.video.direction = _push_src ? RtpDirection::sendrecv : RtpDirection::sendonly;
configure.audio.direction = configure.video.direction;
configure.setPlayRtspInfo(_play_src->getSdp());
} else if (_push_src) {
//这只是推流
configure.video.direction = RtpDirection::recvonly;
configure.audio.direction = RtpDirection::recvonly;
} else {
throw std::invalid_argument("未设置播放或推流的媒体源");
}
//添加接收端口candidate信息
configure.addCandidate(*getIceCandidate());
}
SdpAttrCandidate::Ptr WebRtcTransportImp::getIceCandidate() const{
auto candidate = std::make_shared<SdpAttrCandidate>();
candidate->foundation = "udpcandidate";
//rtp端口
candidate->component = 1;
candidate->transport = "udp";
//优先级单candidate时随便
candidate->priority = 100;
GET_CONFIG(string, extern_ip, RTC::kExternIP);
candidate->address = extern_ip.empty() ? SockUtil::get_local_ip() : extern_ip;
candidate->port = _socket->get_local_port();
candidate->type = "host";
return candidate;
}
///////////////////////////////////////////////////////////////////
class RtpReceiverImp : public RtpReceiver {
public:
RtpReceiverImp( function<void(RtpPacket::Ptr rtp)> cb){
_on_sort = std::move(cb);
}
~RtpReceiverImp() override = default;
bool inputRtp(TrackType type, int samplerate, uint8_t *ptr, size_t len){
return handleOneRtp((int) type, type, samplerate, ptr, len);
}
protected:
void onRtpSorted(RtpPacket::Ptr rtp, int track_index) override {
_on_sort(std::move(rtp));
}
private:
function<void(RtpPacket::Ptr rtp)> _on_sort;
};
void WebRtcTransportImp::onRtcp(const char *buf, size_t len) {
_bytes_usage += len;
auto rtcps = RtcpHeader::loadFromBytes((char *) buf, len);
for (auto rtcp : rtcps) {
switch ((RtcpType) rtcp->pt) {
case RtcpType::RTCP_SR : {
//对方汇报rtp发送情况
RtcpSR *sr = (RtcpSR *) rtcp;
auto it = _rtp_info_ssrc.find(sr->ssrc);
if (it != _rtp_info_ssrc.end()) {
auto rtx = it->second.first;
if (!rtx) {
auto &info = it->second.second;
auto it = info->rtcp_context_recv.find(sr->ssrc);
if (it != info->rtcp_context_recv.end()) {
it->second->onRtcp(sr);
auto rr = it->second->createRtcpRR(info->answer_ssrc_rtp, sr->ssrc);
sendRtcpPacket(rr->data(), rr->size(), true);
} else {
WarnL << "未识别的sr rtcp包:" << rtcp->dumpString();
}
}
} else {
WarnL << "未识别的sr rtcp包:" << rtcp->dumpString();
}
break;
}
case RtcpType::RTCP_RR : {
_alive_ticker.resetTime();
//对方汇报rtp接收情况
RtcpRR *rr = (RtcpRR *) rtcp;
for (auto item : rr->getItemList()) {
auto it = _rtp_info_ssrc.find(item->ssrc);
if (it != _rtp_info_ssrc.end()) {
auto rtx = it->second.first;
if (!rtx) {
auto &info = it->second.second;
auto sr = info->rtcp_context_send->createRtcpSR(info->answer_ssrc_rtp);
sendRtcpPacket(sr->data(), sr->size(), true);
}
} else {
WarnL << "未识别的rr rtcp包:" << rtcp->dumpString();
}
}
break;
}
case RtcpType::RTCP_BYE : {
//对方汇报停止发送rtp
RtcpBye *bye = (RtcpBye *) rtcp;
for (auto ssrc : bye->getSSRC()) {
auto it = _rtp_info_ssrc.find(*ssrc);
if (it == _rtp_info_ssrc.end()) {
WarnL << "未识别的bye rtcp包:" << rtcp->dumpString();
continue;
}
_rtp_info_ssrc.erase(it);
}
onShutdown(SockException(Err_eof, "rtcp bye message received"));
break;
}
case RtcpType::RTCP_PSFB:
case RtcpType::RTCP_RTPFB: {
if ((RtcpType) rtcp->pt == RtcpType::RTCP_PSFB) {
break;
}
//RTPFB
switch ((RTPFBType) rtcp->report_count) {
case RTPFBType::RTCP_RTPFB_NACK : {
RtcpFB *fb = (RtcpFB *) rtcp;
auto it = _rtp_info_ssrc.find(fb->ssrc_media);
if (it == _rtp_info_ssrc.end()) {
WarnL << "未识别的 rtcp包:" << rtcp->dumpString();
return;
}
auto rtx = it->second.first;
if (!rtx) {
auto &info = it->second.second;
auto &fci = fb->getFci<FCI_NACK>();
info->nack_list.for_each_nack(fci, [&](const RtpPacket::Ptr &rtp) {
//rtp重传
onSendRtp(rtp, true, true);
});
}
break;
}
default: break;
}
break;
}
default: break;
}
}
}
///////////////////////////////////////////////////////////////////
void WebRtcTransportImp::changeRtpExtId(RtpPayloadInfo &info, const RtpHeader *header, bool is_recv, bool is_rtx, string *rid_ptr) const{
auto ext_map = RtpExt::getExtValue(header);
for (auto &pr : ext_map) {
if (is_recv) {
auto it = _rtp_ext_id_to_type.find(pr.first);
if (it == _rtp_ext_id_to_type.end()) {
WarnL << "接收rtp时,忽略不识别的rtp ext, id=" << (int) pr.first;
pr.second.clearExt();
continue;
}
pr.second.setType(it->second);
//重新赋值ext id为 ext type作为后面处理ext的统一中间类型
pr.second.setExtId((uint8_t) it->second);
switch(it->second){
case RtpExtType::sdes_repaired_rtp_stream_id :
case RtpExtType::sdes_rtp_stream_id : {
auto ssrc = ntohl(header->ssrc);
auto rid = it->second == RtpExtType::sdes_rtp_stream_id ? pr.second.getRtpStreamId() : pr.second.getRepairedRtpStreamId();
//根据rid获取rtp或rtx的ssrc
auto &ssrc_ref = is_rtx ? info.rid_ssrc[rid].second : info.rid_ssrc[rid].first;
if (!ssrc_ref) {
//ssrc未赋值赋值
ssrc_ref = ssrc;
DebugL << (is_rtx ? "got rid of rtx:" : "got rid:") << rid << ", ssrc:" << ssrc;
}
if (is_rtx) {
//rtx ssrc --> rtp ssrc
auto &rtp_ssrc_ref = info.rtx_ssrc_to_rtp_ssrc[ssrc];
if (!rtp_ssrc_ref && info.rid_ssrc[rid].first) {
//未找到rtx到rtp ssrc的映射关系且已经获取rtp的ssrc那么设置映射关系
rtp_ssrc_ref = info.rid_ssrc[rid].first;
DebugL << "got ssrc of rid:" << rid << ", [rtx-rtp]:" << ssrc << "-" << rtp_ssrc_ref;
}
}
if (rid_ptr) {
*rid_ptr = rid;
}
break;
}
default : break;
}
} else {
pr.second.setType((RtpExtType) pr.first);
auto it = _rtp_ext_type_to_id.find((RtpExtType) pr.first);
if (it == _rtp_ext_type_to_id.end()) {
WarnL << "发送rtp时, 忽略不被客户端支持rtp ext:" << pr.second.dumpString();
pr.second.clearExt();
continue;
}
//重新赋值ext id为客户端sdp声明的类型
pr.second.setExtId(it->second);
}
}
}
void WebRtcTransportImp::onRtp(const char *buf, size_t len) {
onRtp_l(buf, len, false);
}
void WebRtcTransportImp::onRtp_l(const char *buf, size_t len, bool rtx) {
if (!rtx) {
_bytes_usage += len;
_alive_ticker.resetTime();
}
RtpHeader *rtp = (RtpHeader *) buf;
auto ssrc = ntohl(rtp->ssrc);
//根据接收到的rtp的pt信息找到该流的信息
auto it = _rtp_info_pt.find(rtp->pt);
if (it == _rtp_info_pt.end()) {
WarnL;
return;
}
auto &info = it->second.second;
if (!it->second.first) {
//这是普通的rtp数据
auto seq = ntohs(rtp->seq);
#if 0
if (!rtx && info->media->type == TrackVideo && seq % 100 == 0) {
//此处模拟接受丢包
DebugL << "recv dropped:" << seq;
return;
}
#endif
auto &ref = info->receiver[ssrc];
if (!rtx) {
//统计rtp接受情况便于生成nack rtcp包
info->nack_ctx[ssrc].received(seq);
//时间戳转换成毫秒
auto stamp_ms = ntohl(rtp->stamp) * uint64_t(1000) / info->plan_rtp->sample_rate;
//统计rtp收到的情况好做rr汇报
auto &cxt_ref = info->rtcp_context_recv[ssrc];
if (!cxt_ref) {
cxt_ref = std::make_shared<RtcpContext>(info->plan_rtp->sample_rate, true);
}
cxt_ref->onRtp(seq, stamp_ms, len);
//修改ext id至统一
string rid;
changeRtpExtId(*info, rtp, true, false, &rid);
if (!ref) {
ref = std::make_shared<RtpReceiverImp>([info, this, rid](RtpPacket::Ptr rtp) mutable {
onSortedRtp(*info, rid, std::move(rtp));
});
info->nack_ctx[ssrc].setOnNack([info, this, ssrc](const FCI_NACK &nack) mutable {
onSendNack(*info, nack, ssrc);
});
//recv simulcast ssrc --> RtpPayloadInfo
_rtp_info_ssrc[ssrc] = std::make_pair(false, info);
InfoL << "receive rtp of ssrc:" << ssrc;
}
}
//解析并排序rtp
if(!ref){
InfoL << "ignore no rtp receiver of ssrc:" << ssrc<<" is rtx:"<<rtx;
return;
}
assert(ref);
ref->inputRtp(info->media->type, info->plan_rtp->sample_rate, (uint8_t *) buf, len);
return;
}
//这里是rtx重传包
//https://datatracker.ietf.org/doc/html/rfc4588#section-4
auto payload = rtp->getPayloadData();
auto size = rtp->getPayloadSize(len);
if (size < 2) {
return;
}
//修改ext id至统一
changeRtpExtId(*info, rtp, true, true);
//前两个字节是原始的rtp的seq
auto origin_seq = payload[0] << 8 | payload[1];
rtp->seq = htons(origin_seq);
if (info->offer_ssrc_rtp) {
//非simulcast或音频
rtp->ssrc = htonl(info->offer_ssrc_rtp);
TraceL << "received rtx rtp,ssrc: " << ssrc << ", seq:" << origin_seq << ", pt:" << (int)rtp->pt;
} else {
//todo simulcast下辅码流通过rtx传输
//simulcast情况下根据rtx的ssrc查找rtp的ssrc
rtp->ssrc = htonl(info->rtx_ssrc_to_rtp_ssrc[ntohl(rtp->ssrc)]);
}
rtp->pt = info->plan_rtp->pt;
memmove((uint8_t *) buf + 2, buf, payload - (uint8_t *) buf);
buf += 2;
len -= 2;
onRtp_l(buf, len, true);
}
void WebRtcTransportImp::onSendNack(RtpPayloadInfo &info, const FCI_NACK &nack, uint32_t ssrc) {
auto rtcp = RtcpFB::create(RTPFBType::RTCP_RTPFB_NACK, &nack, FCI_NACK::kSize);
rtcp->ssrc = htons(info.answer_ssrc_rtp);
rtcp->ssrc_media = htonl(ssrc);
DebugL << htonl(ssrc) << " " << nack.getPid();
sendRtcpPacket((char *) rtcp.get(), rtcp->getSize(), true);
}
///////////////////////////////////////////////////////////////////
void WebRtcTransportImp::onSortedRtp(RtpPayloadInfo &info, const string &rid, RtpPacket::Ptr rtp) {
if (info.media->type == TrackVideo && _pli_ticker.elapsedTime() > 2000) {
//定期发送pli请求关键帧方便非rtc等协议
_pli_ticker.resetTime();
sendRtcpPli(rtp->getSSRC());
//开启remb则发送remb包调节比特率
GET_CONFIG(size_t, remb_bit_rate, RTC::kRembBitRate);
if (remb_bit_rate && getSdp(SdpType::answer).supportRtcpFb(SdpConst::kRembRtcpFb)) {
sendRtcpRemb(rtp->getSSRC(), remb_bit_rate);
}
}
if (_push_src) {
if (rtp->type == TrackAudio) {
//音频
for (auto &pr : _push_src_simulcast) {
pr.second->onWrite(rtp, false);
}
} else {
//视频
auto &src = _push_src_simulcast[rid];
if (!src) {
auto stream_id = rid.empty() ? _push_src->getId() : _push_src->getId() + "_" + rid;
auto src_imp = std::make_shared<RtspMediaSourceImp>(_push_src->getVhost(), _push_src->getApp(), stream_id);
src_imp->setSdp(_push_src->getSdp());
src_imp->setProtocolTranslation(_push_src->isRecording(Recorder::type_hls),_push_src->isRecording(Recorder::type_mp4));
src_imp->setListener(shared_from_this());
src = src_imp;
}
src->onWrite(std::move(rtp), false);
}
}
}
///////////////////////////////////////////////////////////////////
void WebRtcTransportImp::onSendRtp(const RtpPacket::Ptr &rtp, bool flush, bool rtx){
auto &info = _send_rtp_info[rtp->type];
if (!info) {
//忽略,对方不支持该编码类型
return;
}
if (!rtx) {
//统计rtp发送情况好做sr汇报
info->rtcp_context_send->onRtp(rtp->getSeq(), rtp->getStampMS(), rtp->size() - RtpPacket::kRtpTcpHeaderSize);
info->nack_list.push_back(rtp);
#if 0
//此处模拟发送丢包
if (rtp->type == TrackVideo && rtp->getSeq() % 100 == 0) {
DebugL << "send dropped:" << rtp->getSeq();
return;
}
#endif
} else {
WarnL << "send rtx rtp:" << rtp->getSeq();
}
pair<bool/*rtx*/, RtpPayloadInfo *> ctx{rtx, info.get()};
sendRtpPacket(rtp->data() + RtpPacket::kRtpTcpHeaderSize, rtp->size() - RtpPacket::kRtpTcpHeaderSize, flush, &ctx);
_bytes_usage += rtp->size() - RtpPacket::kRtpTcpHeaderSize;
}
void WebRtcTransportImp::onBeforeEncryptRtp(const char *buf, size_t &len, void *ctx) {
auto pr = (pair<bool/*rtx*/, RtpPayloadInfo *> *) ctx;
auto header = (RtpHeader *) buf;
if (!pr->first || !pr->second->plan_rtx) {
//普通的rtp,或者不支持rtx, 修改目标pt和ssrc
changeRtpExtId(*pr->second, header, false, false);
header->pt = pr->second->plan_rtp->pt;
header->ssrc = htonl(pr->second->answer_ssrc_rtp);
} else {
//重传的rtp, rtx
changeRtpExtId(*pr->second, header, false, true);
header->pt = pr->second->plan_rtx->pt;
if (pr->second->answer_ssrc_rtx) {
//有rtx单独的ssrc,有些情况下浏览器支持rtx但是未指定rtx单独的ssrc
header->ssrc = htonl(pr->second->answer_ssrc_rtx);
} else {
//未单独指定rtx的ssrc那么使用rtp的ssrc
header->ssrc = htonl(pr->second->answer_ssrc_rtp);
}
auto origin_seq = ntohs(header->seq);
//seq跟原来的不一样
header->seq = htons(_rtx_seq[pr->second->media->type]++);
auto payload = header->getPayloadData();
auto payload_size = header->getPayloadSize(len);
if (payload_size) {
//rtp负载后移两个字节这两个字节用于存放osn
//https://datatracker.ietf.org/doc/html/rfc4588#section-4
memmove(payload + 2, payload, payload_size);
}
payload[0] = origin_seq >> 8;
payload[1] = origin_seq & 0xFF;
len += 2;
}
}
void WebRtcTransportImp::onShutdown(const SockException &ex){
WarnL << ex.what();
_self = nullptr;
}
/////////////////////////////////////////////////////////////////////////////////////////////
bool WebRtcTransportImp::close(MediaSource &sender, bool force) {
//此回调在其他线程触发
if(!_push_src || (!force && _push_src->totalReaderCount())){
return false;
}
string err = StrPrinter << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force;
onShutdown(SockException(Err_shutdown,err));
return true;
}
int WebRtcTransportImp::totalReaderCount(MediaSource &sender) {
return _push_src ? _push_src->totalReaderCount() : sender.readerCount();
}
MediaOriginType WebRtcTransportImp::getOriginType(MediaSource &sender) const {
return MediaOriginType::rtc_push;
}
string WebRtcTransportImp::getOriginUrl(MediaSource &sender) const {
return "";
}
std::shared_ptr<SockInfo> WebRtcTransportImp::getOriginSock(MediaSource &sender) const {
return const_cast<WebRtcTransportImp *>(this)->shared_from_this();
}
/////////////////////////////////////////////////////////////////////////////////////////////
string WebRtcTransportImp::get_local_ip() {
return getSdp(SdpType::answer).media[0].candidate[0].address;
}
uint16_t WebRtcTransportImp::get_local_port() {
return _socket->get_local_port();
}
string WebRtcTransportImp::get_peer_ip() {
return SockUtil::inet_ntoa(((struct sockaddr_in *) getSelectedTuple())->sin_addr);
}
uint16_t WebRtcTransportImp::get_peer_port() {
return ntohs(((struct sockaddr_in *) getSelectedTuple())->sin_port);
}
string WebRtcTransportImp::getIdentifier() const {
return StrPrinter << this;
}