ZLMediaKit/webrtc/WebRtcTransport.cpp

871 lines
31 KiB
C++
Raw Normal View History

2021-04-09 20:42:36 +08:00
/*
* Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
*
* This file is part of ZLMediaKit(https://github.com/xia-chu/ZLMediaKit).
*
* Use of this source code is governed by MIT license that can be found in the
* LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree.
*/
#include "WebRtcTransport.h"
2021-03-24 16:52:41 +08:00
#include <iostream>
2021-05-07 18:34:04 +08:00
#include "RtpExt.h"
2021-03-24 16:52:41 +08:00
#include "Rtcp/Rtcp.h"
2021-04-28 15:41:36 +08:00
#include "Rtcp/RtcpFCI.h"
2021-04-02 23:01:58 +08:00
#include "Rtsp/RtpReceiver.h"
2021-04-28 15:41:36 +08:00
2021-04-02 23:01:58 +08:00
// Offset added to a media SSRC to derive the SSRC of its RTX entry in the answer SDP.
#define RTX_SSRC_OFFSET 2
// CNAME written into the SSRC entries this file adds to the answer SDP.
#define RTP_CNAME "zlmediakit-rtp"
2021-05-08 15:01:14 +08:00
// label / mslabel written into the SSRC entries this file adds to the answer SDP.
#define RTP_LABEL "zlmediakit-label"
#define RTP_MSLABEL "zlmediakit-mslabel"
// msid is "<mslabel> <label>", assembled by string-literal concatenation.
#define RTP_MSID RTP_MSLABEL " " RTP_LABEL
2021-04-02 23:01:58 +08:00
2021-04-07 17:51:06 +08:00
//RTC配置项目
namespace RTC {
#define RTC_FIELD "rtc."
//rtp和rtcp接受超时时间
const string kTimeOutSec = RTC_FIELD"timeoutSec";
//服务器外网ip
const string kExternIP = RTC_FIELD"externIP";
2021-04-28 15:41:36 +08:00
//设置remb比特率非0时关闭twcc并开启remb。该设置在rtc推流时有效可以控制推流画质
const string kRembBitRate = RTC_FIELD"rembBitRate";
2021-04-07 17:51:06 +08:00
static onceToken token([]() {
mINI::Instance()[kTimeOutSec] = 15;
mINI::Instance()[kExternIP] = "";
2021-04-28 15:41:36 +08:00
mINI::Instance()[kRembBitRate] = 0;
2021-04-07 17:51:06 +08:00
});
}//namespace RTC
2021-03-27 10:16:49 +08:00
/**
 * Creates the transport on the given poller together with its DTLS transport
 * and ICE server; this object is handed to both as their listener.
 * @param poller event poller this transport is bound to
 */
WebRtcTransport::WebRtcTransport(const EventPoller::Ptr &poller) : _poller(poller) {
    _dtls_transport = std::make_shared<RTC::DtlsTransport>(poller, this);
    // Two random strings: 4 chars, and the last 24 chars of a 28-char string
    // (presumably the ICE username fragment and password -- confirm against RTC::IceServer).
    _ice_server = std::make_shared<RTC::IceServer>(this, makeRandStr(4), makeRandStr(28).substr(4));
}
2021-04-07 17:21:59 +08:00
// Creation hook; the base class has nothing to set up.
void WebRtcTransport::onCreate() {}
2021-03-27 10:16:49 +08:00
void WebRtcTransport::onDestory(){
2021-04-02 17:08:11 +08:00
_dtls_transport = nullptr;
_ice_server = nullptr;
2021-03-27 09:38:13 +08:00
}
2021-03-24 16:52:41 +08:00
2021-04-04 23:20:10 +08:00
// Returns the event poller passed to the constructor.
const EventPoller::Ptr &WebRtcTransport::getPoller() const {
    return _poller;
}
2021-03-26 11:07:03 +08:00
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// ICE server callback: sends the serialized STUN packet to the address carried by `tuple`.
void WebRtcTransport::OnIceServerSendStunPacket(const RTC::IceServer *iceServer, const RTC::StunPacket *packet, RTC::TransportTuple *tuple) {
    auto data = (char *) packet->GetData();
    auto dst = (struct sockaddr_in *) tuple;
    onSendSockData(data, packet->GetSize(), dst);
}
// ICE server callback: only emits an info log line.
void WebRtcTransport::OnIceServerSelectedTuple(const RTC::IceServer *iceServer, RTC::TransportTuple *tuple) {
    InfoL;
}
// ICE server callback: only emits an info log line.
void WebRtcTransport::OnIceServerConnected(const RTC::IceServer *iceServer) {
    InfoL;
}
// ICE server callback: starts the DTLS transport. It runs as SERVER when the
// first media of the answer sdp has the passive role, otherwise as CLIENT.
void WebRtcTransport::OnIceServerCompleted(const RTC::IceServer *iceServer) {
    InfoL;
    const bool passive = _answer_sdp->media[0].role == DtlsRole::passive;
    _dtls_transport->Run(passive ? RTC::DtlsTransport::Role::SERVER : RTC::DtlsTransport::Role::CLIENT);
}
// ICE server callback: only emits an info log line.
void WebRtcTransport::OnIceServerDisconnected(const RTC::IceServer *iceServer) {
    InfoL;
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
/**
 * DTLS transport callback: builds the outbound SRTP session from the local key
 * and the inbound one from the remote key, then calls onStartWebRTC().
 */
void WebRtcTransport::OnDtlsTransportConnected(
        const RTC::DtlsTransport *dtlsTransport,
        RTC::SrtpSession::CryptoSuite srtpCryptoSuite,
        uint8_t *srtpLocalKey,
        size_t srtpLocalKeyLen,
        uint8_t *srtpRemoteKey,
        size_t srtpRemoteKeyLen,
        std::string &remoteCert) {
    InfoL;
    using SessionType = RTC::SrtpSession::Type;
    _srtp_session_send = std::make_shared<RTC::SrtpSession>(SessionType::OUTBOUND, srtpCryptoSuite, srtpLocalKey, srtpLocalKeyLen);
    _srtp_session_recv = std::make_shared<RTC::SrtpSession>(SessionType::INBOUND, srtpCryptoSuite, srtpRemoteKey, srtpRemoteKeyLen);
    onStartWebRTC();
}
// DTLS transport callback: sends the DTLS bytes to the tuple selected by the ICE server.
void WebRtcTransport::OnDtlsTransportSendData(const RTC::DtlsTransport *dtlsTransport, const uint8_t *data, size_t len) {
    auto bytes = (char *) data;
    onSendSockData(bytes, len);
}
2021-04-07 17:51:06 +08:00
// DTLS transport callback: only emits an info log line.
void WebRtcTransport::OnDtlsTransportConnecting(const RTC::DtlsTransport *dtlsTransport) {
    InfoL;
}
// DTLS transport callback: logs and shuts the transport down with Err_shutdown.
void WebRtcTransport::OnDtlsTransportFailed(const RTC::DtlsTransport *dtlsTransport) {
    InfoL;
    SockException reason(Err_shutdown, "dtls transport failed");
    onShutdown(reason);
}
// DTLS transport callback: logs and shuts the transport down with Err_shutdown.
void WebRtcTransport::OnDtlsTransportClosed(const RTC::DtlsTransport *dtlsTransport) {
    InfoL;
    SockException reason(Err_shutdown, "dtls close notify received");
    onShutdown(reason);
}
// DTLS transport callback: the application data is only hex-dumped to the info log.
void WebRtcTransport::OnDtlsTransportApplicationDataReceived(const RTC::DtlsTransport *dtlsTransport, const uint8_t *data, size_t len) {
    InfoL << hexdump(data, len);
}
2021-03-26 11:07:03 +08:00
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
2021-04-02 17:56:24 +08:00
void WebRtcTransport::onSendSockData(const char *buf, size_t len, bool flush){
2021-04-02 17:08:11 +08:00
auto tuple = _ice_server->GetSelectedTuple();
2021-03-26 11:07:03 +08:00
assert(tuple);
2021-04-02 17:56:24 +08:00
onSendSockData(buf, len, (struct sockaddr_in *) tuple, flush);
2021-03-26 11:07:03 +08:00
}
2021-04-02 20:35:43 +08:00
/**
 * Returns the stored offer or answer sdp.
 * @throws std::invalid_argument for any other SdpType value
 */
const RtcSession &WebRtcTransport::getSdp(SdpType type) const {
    if (type == SdpType::offer) {
        return *_offer_sdp;
    }
    if (type == SdpType::answer) {
        return *_answer_sdp;
    }
    throw std::invalid_argument("不识别的sdp类型");
}
// Returns the tuple currently selected by the ICE server.
RTC::TransportTuple *WebRtcTransport::getSelectedTuple() const {
    return _ice_server->GetSelectedTuple();
}
2021-04-28 15:41:36 +08:00
/**
 * Builds a PSFB REMB feedback packet for `ssrc` carrying `bit_rate` and sends it.
 * @param ssrc     media ssrc the feedback refers to
 * @param bit_rate bit rate placed in the REMB FCI (narrowed to uint32_t)
 */
void WebRtcTransport::sendRtcpRemb(uint32_t ssrc, size_t bit_rate) {
    auto fci = FCI_REMB::create({ssrc}, (uint32_t) bit_rate);
    auto packet = RtcpFB::create(PSFBType::RTCP_PSFB_REMB, fci.data(), fci.size());
    packet->ssrc = htonl(0);
    packet->ssrc_media = htonl(ssrc);
    sendRtcpPacket((char *) packet.get(), packet->getSize(), true);
    TraceL << ssrc << " " << bit_rate;
}
// Builds a PSFB PLI feedback packet for `ssrc` (sender ssrc 0) and sends it.
void WebRtcTransport::sendRtcpPli(uint32_t ssrc) {
    auto packet = RtcpFB::create(PSFBType::RTCP_PSFB_PLI);
    packet->ssrc = htonl(0);
    packet->ssrc_media = htonl(ssrc);
    sendRtcpPacket((char *) packet.get(), packet->getSize(), true);
}
2021-04-01 10:09:50 +08:00
/**
 * Looks up the local DTLS fingerprint computed with the given algorithm.
 * @param algorithm_str algorithm name, converted with GetFingerprintAlgorithm()
 * @param transport     DTLS transport providing the local fingerprints
 * @return the fingerprint value of the first matching entry
 * @throws std::invalid_argument when no local fingerprint uses that algorithm
 */
string getFingerprint(const string &algorithm_str, const std::shared_ptr<RTC::DtlsTransport> &transport) {
    auto wanted = RTC::DtlsTransport::GetFingerprintAlgorithm(algorithm_str);
    for (auto &candidate : transport->GetLocalFingerprints()) {
        if (candidate.algorithm != wanted) {
            continue;
        }
        return candidate.value;
    }
    throw std::invalid_argument(StrPrinter << "不支持的加密算法:" << algorithm_str);
}
2021-04-02 17:08:11 +08:00
void WebRtcTransport::setRemoteDtlsFingerprint(const RtcSession &remote){
//设置远端dtls签名
RTC::DtlsTransport::Fingerprint remote_fingerprint;
remote_fingerprint.algorithm = RTC::DtlsTransport::GetFingerprintAlgorithm(_offer_sdp->media[0].fingerprint.algorithm);
remote_fingerprint.value = _offer_sdp->media[0].fingerprint.hash;
_dtls_transport->SetRemoteFingerprint(remote_fingerprint);
}
2021-04-05 00:12:46 +08:00
/**
 * Validates an sdp: every non-application media must use rtcp-mux and the
 * session must carry a non-empty group. Offers additionally go through
 * checkValidSSRC().
 * @throws std::invalid_argument when a requirement is not met
 */
void WebRtcTransport::onCheckSdp(SdpType type, RtcSession &sdp) {
    for (auto &media : sdp.media) {
        if (media.type == TrackApplication) {
            continue;
        }
        if (!media.rtcp_mux) {
            throw std::invalid_argument("只支持rtcp-mux模式");
        }
    }
    if (sdp.group.mids.empty()) {
        throw std::invalid_argument("只支持group BUNDLE模式");
    }
    if (type != SdpType::offer) {
        return;
    }
    sdp.checkValidSSRC();
}
2021-04-28 15:41:36 +08:00
// TWCC is enabled only while the configured remb bit rate is 0: once remb is
// turned on twcc is switched off, because remb has no effect with twcc enabled.
void WebRtcTransport::onRtcConfigure(RtcConfigure &configure) const {
    GET_CONFIG(size_t, remb_bit_rate, RTC::kRembBitRate);
    const bool twcc_enabled = (remb_bit_rate == 0);
    configure.enableTWCC(twcc_enabled);
}
2021-03-31 17:15:26 +08:00
std::string WebRtcTransport::getAnswerSdp(const string &offer){
try {
//// 解析offer sdp ////
_offer_sdp = std::make_shared<RtcSession>();
_offer_sdp->loadFrom(offer);
onCheckSdp(SdpType::offer, *_offer_sdp);
setRemoteDtlsFingerprint(*_offer_sdp);
//// sdp 配置 ////
SdpAttrFingerprint fingerprint;
fingerprint.algorithm = _offer_sdp->media[0].fingerprint.algorithm;
fingerprint.hash = getFingerprint(fingerprint.algorithm, _dtls_transport);
RtcConfigure configure;
configure.setDefaultSetting(_ice_server->GetUsernameFragment(), _ice_server->GetPassword(),
RtpDirection::sendrecv, fingerprint);
onRtcConfigure(configure);
//// 生成answer sdp ////
_answer_sdp = configure.createAnswer(*_offer_sdp);
onCheckSdp(SdpType::answer, *_answer_sdp);
return _answer_sdp->toString();
} catch (exception &ex) {
onShutdown(SockException(Err_shutdown, ex.what()));
throw;
}
2021-03-31 17:15:26 +08:00
}
2021-03-24 16:52:41 +08:00
// True when the first byte of the packet lies in [20, 63].
bool is_dtls(char *buf) {
    const char first = *buf;
    return first >= 20 && first <= 63;
}
// True when the payload type read through RtpHeader lies outside [64, 96).
bool is_rtp(char *buf) {
    auto hdr = (RtpHeader *) buf;
    return !(hdr->pt >= 64 && hdr->pt < 96);
}
// True when the payload type read through RtpHeader lies inside [64, 96).
bool is_rtcp(char *buf) {
    auto hdr = (RtpHeader *) buf;
    return hdr->pt >= 64 && hdr->pt < 96;
}
2021-04-02 17:08:11 +08:00
void WebRtcTransport::inputSockData(char *buf, size_t len, RTC::TransportTuple *tuple) {
2021-03-24 16:52:41 +08:00
if (RTC::StunPacket::IsStun((const uint8_t *) buf, len)) {
RTC::StunPacket *packet = RTC::StunPacket::Parse((const uint8_t *) buf, len);
if (packet == nullptr) {
WarnL << "parse stun error" << std::endl;
return;
}
2021-04-02 17:08:11 +08:00
_ice_server->ProcessStunPacket(packet, tuple);
2021-03-24 16:52:41 +08:00
return;
}
2021-03-26 11:07:03 +08:00
if (is_dtls(buf)) {
2021-04-02 17:08:11 +08:00
_dtls_transport->ProcessDtlsData((uint8_t *) buf, len);
2021-03-24 16:52:41 +08:00
return;
}
2021-04-01 14:16:42 +08:00
if (is_rtp(buf)) {
2021-04-02 17:56:24 +08:00
if (_srtp_session_recv->DecryptSrtp((uint8_t *) buf, &len)) {
onRtp(buf, len);
2021-04-03 08:45:18 +08:00
} else {
WarnL;
2021-04-02 17:56:24 +08:00
}
2021-04-01 14:16:42 +08:00
return;
}
if (is_rtcp(buf)) {
2021-04-02 17:56:24 +08:00
if (_srtp_session_recv->DecryptSrtcp((uint8_t *) buf, &len)) {
onRtcp(buf, len);
2021-04-03 08:45:18 +08:00
} else {
WarnL;
2021-04-02 17:56:24 +08:00
}
2021-04-01 14:16:42 +08:00
return;
}
2021-03-24 16:52:41 +08:00
}
2021-05-11 00:54:33 +08:00
void WebRtcTransport::sendRtpPacket(char *buf, size_t len, bool flush, void *ctx) {
2021-04-02 17:08:11 +08:00
if (_srtp_session_send) {
CHECK(len + SRTP_MAX_TRAILER_LEN <= sizeof(_srtp_buf));
memcpy(_srtp_buf, buf, len);
2021-05-11 00:54:33 +08:00
onBeforeEncryptRtp((char *) _srtp_buf, len, ctx);
if (_srtp_session_send->EncryptRtp(_srtp_buf, &len)) {
onSendSockData((char *) _srtp_buf, len, flush);
}
2021-03-24 16:52:41 +08:00
}
}
2021-05-11 00:54:33 +08:00
void WebRtcTransport::sendRtcpPacket(char *buf, size_t len, bool flush, void *ctx){
2021-04-02 23:12:37 +08:00
if (_srtp_session_send) {
CHECK(len + SRTP_MAX_TRAILER_LEN <= sizeof(_srtp_buf));
memcpy(_srtp_buf, buf, len);
2021-05-11 00:54:33 +08:00
onBeforeEncryptRtcp((char *) _srtp_buf, len, ctx);
if (_srtp_session_send->EncryptRtcp(_srtp_buf, &len)) {
onSendSockData((char *) _srtp_buf, len, flush);
}
2021-04-02 23:12:37 +08:00
}
}
2021-03-24 16:52:41 +08:00
///////////////////////////////////////////////////////////////////////////////////
2021-03-27 10:16:49 +08:00
/**
 * Factory: creates a transport whose deleter calls onDestory() before the
 * object is deleted, and calls onCreate() once the smart pointer exists.
 */
WebRtcTransportImp::Ptr WebRtcTransportImp::create(const EventPoller::Ptr &poller) {
    auto deleter = [](WebRtcTransportImp *self) {
        self->onDestory();
        delete self;
    };
    WebRtcTransportImp::Ptr transport(new WebRtcTransportImp(poller), deleter);
    transport->onCreate();
    return transport;
}
2021-03-24 16:52:41 +08:00
2021-04-07 17:21:59 +08:00
void WebRtcTransportImp::onCreate(){
WebRtcTransport::onCreate();
_socket = Socket::createSocket(getPoller(), false);
2021-03-24 16:52:41 +08:00
//随机端口,绑定全部网卡
_socket->bindUdpSock(0);
2021-04-07 17:21:59 +08:00
weak_ptr<WebRtcTransportImp> weak_self = shared_from_this();
_socket->setOnRead([weak_self](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) mutable {
auto strong_self = weak_self.lock();
if (strong_self) {
strong_self->inputSockData(buf->data(), buf->size(), addr);
}
2021-03-24 16:52:41 +08:00
});
2021-04-07 17:51:06 +08:00
_self = shared_from_this();
GET_CONFIG(float, timeoutSec, RTC::kTimeOutSec);
_timer = std::make_shared<Timer>(timeoutSec / 2, [weak_self]() {
auto strong_self = weak_self.lock();
if (!strong_self) {
return false;
}
if (strong_self->_alive_ticker.elapsedTime() > timeoutSec * 1000) {
strong_self->onShutdown(SockException(Err_timeout, "接受rtp和rtcp超时"));
}
return true;
}, getPoller());
2021-03-24 16:52:41 +08:00
}
2021-04-07 17:21:59 +08:00
// Forwards the poller to the base class and logs the object address.
WebRtcTransportImp::WebRtcTransportImp(const EventPoller::Ptr &poller)
        : WebRtcTransport(poller) {
    InfoL << this;
}
2021-04-07 17:21:59 +08:00
2021-04-07 17:51:06 +08:00
// Logs the object address on destruction.
WebRtcTransportImp::~WebRtcTransportImp() {
    InfoL << this;
}
2021-03-27 10:16:49 +08:00
void WebRtcTransportImp::onDestory() {
WebRtcTransport::onDestory();
2021-04-07 18:35:38 +08:00
uint64_t duration = _alive_ticker.createdTime() / 1000;
//流量统计事件广播
GET_CONFIG(uint32_t, iFlowThreshold, General::kFlowThreshold);
if (_play_src) {
2021-04-07 20:40:42 +08:00
WarnL << "RTC播放器("
<< _media_info._vhost << "/"
<< _media_info._app << "/"
<< _media_info._streamid
<< ")结束播放,耗时(s):" << duration;
2021-04-07 18:35:38 +08:00
if (_bytes_usage >= iFlowThreshold * 1024) {
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _bytes_usage, duration, true, static_cast<SockInfo &>(*_socket));
}
}
if (_push_src) {
2021-04-07 20:40:42 +08:00
WarnL << "RTC推流器("
<< _media_info._vhost << "/"
<< _media_info._app << "/"
<< _media_info._streamid
<< ")结束推流,耗时(s):" << duration;
2021-04-07 18:35:38 +08:00
if (_bytes_usage >= iFlowThreshold * 1024) {
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _bytes_usage, duration, false, static_cast<SockInfo &>(*_socket));
}
}
2021-03-27 10:16:49 +08:00
}
2021-04-07 18:35:38 +08:00
/**
 * Binds a media source to this transport.
 * @param src     source to play from or to push into; must not be null
 * @param info    media info stored for logging and flow reports
 * @param is_play true stores `src` as the play source, false as the push source
 */
void WebRtcTransportImp::attach(const RtspMediaSource::Ptr &src, const MediaInfo &info, bool is_play) {
    assert(src);
    _media_info = info;
    auto &slot = is_play ? _play_src : _push_src;
    slot = src;
}
2021-04-03 08:32:20 +08:00
// Copies the bytes into a fresh buffer and sends it to `dst` through the udp socket.
void WebRtcTransportImp::onSendSockData(const char *buf, size_t len, struct sockaddr_in *dst, bool flush) {
    auto packet = BufferRaw::create();
    packet->assign(buf, len);
    auto addr = (struct sockaddr *) (dst);
    _socket->send(packet, addr, sizeof(struct sockaddr), flush);
}
2021-04-03 08:45:18 +08:00
///////////////////////////////////////////////////////////////////
bool WebRtcTransportImp::canSendRtp() const{
auto &sdp = getSdp(SdpType::answer);
2021-04-05 11:32:38 +08:00
return _play_src && (sdp.media[0].direction == RtpDirection::sendrecv || sdp.media[0].direction == RtpDirection::sendonly);
2021-04-03 08:45:18 +08:00
}
bool WebRtcTransportImp::canRecvRtp() const{
auto &sdp = getSdp(SdpType::answer);
2021-04-05 11:32:38 +08:00
return _push_src && (sdp.media[0].direction == RtpDirection::sendrecv || sdp.media[0].direction == RtpDirection::recvonly);
2021-04-03 08:45:18 +08:00
}
2021-05-10 18:11:12 +08:00
// Returns the offer sdp when it carries ssrc information, otherwise the answer
// sdp, which is then CHECKed to carry it.
const RtcSession &WebRtcTransportImp::getSdpWithSSRC() const {
    auto &offer = getSdp(SdpType::offer);
    if (!offer.haveSSRC()) {
        auto &answer = getSdp(SdpType::answer);
        CHECK(answer.haveSSRC());
        return answer;
    }
    return offer;
}
2021-04-02 17:08:11 +08:00
void WebRtcTransportImp::onStartWebRTC() {
2021-05-08 10:49:09 +08:00
//获取ssrc和pt相关信息,届时收到rtp和rtcp时分别可以根据pt和ssrc找到相关的信息
2021-04-04 23:20:10 +08:00
for (auto &m : getSdp(SdpType::offer).media) {
for (auto &plan : m.plan) {
auto hit_pan = getSdp(SdpType::answer).getMedia(m.type)->getPlan(plan.pt);
if (!hit_pan) {
continue;
2021-04-02 23:01:58 +08:00
}
2021-05-10 18:11:12 +08:00
auto m_with_ssrc = getSdpWithSSRC().getMedia(m.type);
2021-04-04 23:20:10 +08:00
//获取offer端rtp的ssrc和pt相关信息
auto &ref = _rtp_info_pt[plan.pt];
ref.plan = &plan;
2021-05-10 18:11:12 +08:00
ref.media = m_with_ssrc;
2021-04-04 23:20:10 +08:00
ref.is_common_rtp = getCodecId(plan.codec) != CodecInvalid;
2021-05-11 00:54:33 +08:00
if (ref.is_common_rtp) {
//rtp
_rtp_info_ssrc[m_with_ssrc->rtp_rtx_ssrc[0].ssrc] = &ref;
2021-05-11 15:48:01 +08:00
} else {
//rtx
auto apt = atoi(plan.getFmtp("apt").data());
ref.plan_apt = m_with_ssrc->getPlan(apt);
2021-05-11 00:54:33 +08:00
}
2021-04-04 23:20:10 +08:00
ref.rtcp_context_recv = std::make_shared<RtcpContext>(ref.plan->sample_rate, true);
ref.rtcp_context_send = std::make_shared<RtcpContext>(ref.plan->sample_rate, false);
2021-05-11 11:18:55 +08:00
ref.receiver = std::make_shared<RtpReceiverImp>([&ref, this](RtpPacket::Ptr rtp) mutable{
2021-04-04 23:20:10 +08:00
onSortedRtp(ref, std::move(rtp));
2021-05-11 11:18:55 +08:00
}, [&ref, this](const RtpPacket::Ptr &rtp) mutable {
2021-04-04 23:20:10 +08:00
onBeforeSortedRtp(ref, rtp);
});
2021-05-11 12:12:28 +08:00
ref.nack_ctx.setOnNack([&ref, this](const FCI_NACK &nack) mutable{
onNack(ref, nack);
});
2021-04-02 23:01:58 +08:00
}
2021-05-08 10:49:09 +08:00
if (m.type != TrackApplication) {
//记录rtp ext类型与id的关系方便接收或发送rtp时修改rtp ext id
2021-05-10 10:14:42 +08:00
for (auto &ext : m.extmap) {
auto ext_type = RtpExt::getExtType(ext.ext);
_rtp_ext_id_to_type.emplace(ext.id, ext_type);
_rtp_ext_type_to_id.emplace(ext_type, ext.id);
2021-05-08 10:49:09 +08:00
}
}
2021-04-02 23:01:58 +08:00
}
2021-04-04 23:20:10 +08:00
if (canRecvRtp()) {
2021-04-05 11:32:38 +08:00
_push_src->setSdp(getSdp(SdpType::answer).toRtspSdp());
2021-04-04 23:20:10 +08:00
}
2021-04-03 08:32:20 +08:00
if (canSendRtp()) {
2021-04-07 17:21:59 +08:00
_reader = _play_src->getRing()->attach(getPoller(), true);
2021-04-03 08:32:20 +08:00
weak_ptr<WebRtcTransportImp> weak_self = shared_from_this();
_reader->setReadCB([weak_self](const RtspMediaSource::RingDataType &pkt) {
auto strongSelf = weak_self.lock();
if (!strongSelf) {
return;
}
size_t i = 0;
pkt->for_each([&](const RtpPacket::Ptr &rtp) {
strongSelf->onSendRtp(rtp, ++i == pkt->size());
});
2021-03-24 16:52:41 +08:00
});
2021-05-08 10:49:09 +08:00
RtcSession rtsp_send_sdp;
rtsp_send_sdp.loadFrom(_play_src->getSdp(), false);
2021-05-10 18:11:12 +08:00
for (auto &m : getSdp(SdpType::answer).media) {
2021-05-08 10:49:09 +08:00
if (m.type == TrackApplication) {
continue;
}
auto rtsp_media = rtsp_send_sdp.getMedia(m.type);
if (rtsp_media && getCodecId(rtsp_media->plan[0].codec) == getCodecId(m.plan[0].codec)) {
2021-05-10 18:11:12 +08:00
auto it = _rtp_info_pt.find(m.plan[0].pt);
CHECK(it != _rtp_info_pt.end());
//记录发送rtp时约定的信息届时发送rtp时需要修改pt和ssrc
_send_rtp_info[m.type] = &it->second;
2021-05-08 10:49:09 +08:00
}
}
2021-04-02 20:35:43 +08:00
}
}
2021-04-05 00:12:46 +08:00
/**
 * Runs the base checks and then completes the answer sdp: fills in address and
 * port information and, when playing to a peer whose offer carries no ssrc
 * information, adds ssrc entries (plus an rtx entry when an rtx plan exists).
 * Offers are only validated, never modified here.
 */
void WebRtcTransportImp::onCheckSdp(SdpType type, RtcSession &sdp) {
    WebRtcTransport::onCheckSdp(type, sdp);
    if (type != SdpType::answer) {
        // Only the answer sdp is modified.
        return;
    }

    // Rewrite ip and port information of the answer sdp.
    GET_CONFIG(string, extern_ip, RTC::kExternIP);
    for (auto &media : sdp.media) {
        media.addr.reset();
        media.addr.address = extern_ip.empty() ? SockUtil::get_local_ip() : extern_ip;
        media.rtcp_addr.reset();
        media.rtcp_addr.address = media.addr.address;
        media.rtcp_addr.port = _socket->get_local_port();
        media.port = media.rtcp_addr.port;
        sdp.origin.address = media.addr.address;
    }

    // Ssrc information is added to the answer only when we send rtp and the
    // offer sdp does not already contain it.
    if (!canSendRtp() || getSdp(SdpType::offer).haveSSRC()) {
        return;
    }

    for (auto &media : sdp.media) {
        if (media.type == TrackApplication) {
            continue;
        }
        // Add the ssrc entry of the rtp stream.
        media.rtp_rtx_ssrc.emplace_back();
        media.rtp_rtx_ssrc[0].ssrc = _play_src->getSsrc(media.type);
        media.rtp_rtx_ssrc[0].cname = RTP_CNAME;
        media.rtp_rtx_ssrc[0].label = RTP_LABEL;
        media.rtp_rtx_ssrc[0].mslabel = RTP_MSLABEL;
        media.rtp_rtx_ssrc[0].msid = RTP_MSID;

        if (!media.getRelatedRtxPlan(media.plan[0].pt)) {
            continue;
        }
        // The rtx entry is a copy of the rtp entry with the ssrc shifted by RTX_SSRC_OFFSET.
        media.rtp_rtx_ssrc.emplace_back();
        media.rtp_rtx_ssrc[1] = media.rtp_rtx_ssrc[0];
        media.rtp_rtx_ssrc[1].ssrc += RTX_SSRC_OFFSET;
    }
}
2021-04-02 18:28:01 +08:00
/**
 * Sets the audio/video direction according to the attached sources and adds
 * the local ICE candidate.
 * @throws std::invalid_argument when neither a play nor a push source is attached
 */
void WebRtcTransportImp::onRtcConfigure(RtcConfigure &configure) const {
    WebRtcTransport::onRtcConfigure(configure);

    if (!_play_src && !_push_src) {
        throw std::invalid_argument("未设置播放或推流的媒体源");
    }

    if (_play_src) {
        // Playing, possibly pushing at the same time.
        auto direction = _push_src ? RtpDirection::sendrecv : RtpDirection::sendonly;
        configure.video.direction = direction;
        configure.audio.direction = direction;
        configure.setPlayRtspInfo(_play_src->getSdp());
    } else {
        // Pushing only.
        configure.video.direction = RtpDirection::recvonly;
        configure.audio.direction = RtpDirection::recvonly;
    }

    // Add the candidate describing the receiving port.
    configure.addCandidate(*getIceCandidate());
}
2021-04-02 17:08:11 +08:00
/**
 * Builds the single udp host candidate of this transport: the configured
 * external ip (or the local ip when none is configured) and the port of the udp socket.
 */
SdpAttrCandidate::Ptr WebRtcTransportImp::getIceCandidate() const {
    GET_CONFIG(string, extern_ip, RTC::kExternIP);

    auto result = std::make_shared<SdpAttrCandidate>();
    result->foundation = "udpcandidate";
    // rtp port
    result->component = 1;
    result->transport = "udp";
    // With a single candidate the priority value does not matter.
    result->priority = 100;
    result->address = extern_ip.empty() ? SockUtil::get_local_ip() : extern_ip;
    result->port = _socket->get_local_port();
    result->type = "host";
    return result;
}
2021-04-03 08:45:18 +08:00
///////////////////////////////////////////////////////////////////
2021-04-02 23:01:58 +08:00
/**
 * RtpReceiver adaptor that reports packets through std::function callbacks:
 * one invoked for every sorted packet and an optional one invoked before sorting.
 */
class RtpReceiverImp : public RtpReceiver {
public:
    /**
     * @param cb        invoked with every sorted rtp packet
     * @param cb_before optional, invoked with every rtp packet before sorting
     */
    RtpReceiverImp(function<void(RtpPacket::Ptr rtp)> cb, function<void(const RtpPacket::Ptr &rtp)> cb_before = nullptr)
            : _on_sort(std::move(cb)), _on_before_sort(std::move(cb_before)) {}

    ~RtpReceiverImp() override = default;

    // Feeds one raw rtp packet; the track type doubles as the track index.
    bool inputRtp(TrackType type, int samplerate, uint8_t *ptr, size_t len) {
        const int track_index = (int) type;
        return handleOneRtp(track_index, type, samplerate, ptr, len);
    }

protected:
    void onRtpSorted(RtpPacket::Ptr rtp, int track_index) override {
        _on_sort(std::move(rtp));
    }

    void onBeforeRtpSorted(const RtpPacket::Ptr &rtp, int track_index) override {
        if (!_on_before_sort) {
            return;
        }
        _on_before_sort(rtp);
    }

private:
    function<void(RtpPacket::Ptr rtp)> _on_sort;
    function<void(const RtpPacket::Ptr &rtp)> _on_before_sort;
};
2021-04-02 17:56:24 +08:00
void WebRtcTransportImp::onRtcp(const char *buf, size_t len) {
2021-04-07 18:35:38 +08:00
_bytes_usage += len;
2021-04-03 00:04:52 +08:00
auto rtcps = RtcpHeader::loadFromBytes((char *) buf, len);
for (auto rtcp : rtcps) {
switch ((RtcpType) rtcp->pt) {
case RtcpType::RTCP_SR : {
//对方汇报rtp发送情况
RtcpSR *sr = (RtcpSR *) rtcp;
2021-04-04 23:20:10 +08:00
auto it = _rtp_info_ssrc.find(sr->ssrc);
if (it != _rtp_info_ssrc.end()) {
2021-04-03 00:04:52 +08:00
it->second->rtcp_context_recv->onRtcp(sr);
2021-04-03 08:32:20 +08:00
auto rr = it->second->rtcp_context_recv->createRtcpRR(sr->items.ssrc, sr->ssrc);
2021-04-03 00:04:52 +08:00
sendRtcpPacket(rr->data(), rr->size(), true);
2021-05-08 10:49:09 +08:00
} else {
2021-05-08 22:25:56 +08:00
WarnL << "未识别的sr rtcp包:" << (uint32_t)sr->ssrc;
2021-04-03 00:04:52 +08:00
}
break;
}
case RtcpType::RTCP_RR : {
2021-04-07 17:51:06 +08:00
_alive_ticker.resetTime();
2021-04-03 00:04:52 +08:00
//对方汇报rtp接收情况
RtcpRR *rr = (RtcpRR *) rtcp;
2021-04-04 23:20:10 +08:00
auto it = _rtp_info_ssrc.find(rr->ssrc);
if (it != _rtp_info_ssrc.end()) {
2021-04-03 08:32:20 +08:00
auto sr = it->second->rtcp_context_send->createRtcpSR(rr->items.ssrc);
2021-04-03 00:04:52 +08:00
sendRtcpPacket(sr->data(), sr->size(), true);
2021-05-08 10:49:09 +08:00
} else {
2021-05-08 22:25:56 +08:00
WarnL << "未识别的rr rtcp包:" << (uint32_t)rr->ssrc;
2021-04-03 00:04:52 +08:00
}
break;
}
2021-04-03 08:45:18 +08:00
case RtcpType::RTCP_BYE : {
2021-04-07 17:21:59 +08:00
//对方汇报停止发送rtp
RtcpBye *bye = (RtcpBye *) rtcp;
for (auto ssrc : bye->getSSRC()) {
auto it = _rtp_info_ssrc.find(*ssrc);
if (it == _rtp_info_ssrc.end()) {
2021-05-08 10:49:09 +08:00
WarnL << "未识别的bye rtcp包:" << *ssrc;
2021-04-07 17:21:59 +08:00
continue;
}
_rtp_info_pt.erase(it->second->plan->pt);
_rtp_info_ssrc.erase(it);
}
onShutdown(SockException(Err_eof, "rtcp bye message received"));
2021-04-03 08:45:18 +08:00
break;
}
2021-04-23 18:27:47 +08:00
case RtcpType::RTCP_PSFB:
2021-04-22 17:34:26 +08:00
case RtcpType::RTCP_RTPFB: {
2021-05-11 00:54:33 +08:00
RtcpFB *fb = (RtcpFB *) rtcp;
auto it = _rtp_info_ssrc.find(fb->ssrc);
if (it == _rtp_info_ssrc.end()) {
WarnL << "未识别的 rtcp包:" << rtcp->dumpString();
return;
}
if ((RtcpType) rtcp->pt == RtcpType::RTCP_PSFB) {
break;
}
//RTPFB
switch ((RTPFBType) rtcp->report_count) {
case RTPFBType::RTCP_RTPFB_NACK : {
auto &fci = fb->getFci<FCI_NACK>();
it->second->nack_list.for_each_nack(fci, [&](const RtpPacket::Ptr &rtp) {
//rtp重传
onSendRtp(rtp, true, true);
});
break;
}
2021-05-11 15:48:01 +08:00
default: break;
2021-05-11 00:54:33 +08:00
}
2021-04-22 17:34:26 +08:00
break;
}
2021-04-03 00:04:52 +08:00
default: break;
}
}
}
2021-05-11 15:48:01 +08:00
///////////////////////////////////////////////////////////////////
static void setExtType(RtpExt &ext, uint8_t tp) {}
static void setExtType(RtpExt &ext, RtpExtType tp) {
ext.setType(tp);
}
/**
 * Rewrites the id of every extension found in the rtp header using `map`.
 * An extension whose key is not in the map is logged and cleared. For mapped
 * extensions the type is set from whichever side of the map entry is an
 * RtpExtType, and the id is set to the mapped value.
 * @param header rtp header whose extensions are rewritten
 * @param map    id -> type map or type -> id map
 */
template<typename Type>
static void changeRtpExtId(const RtpHeader *header, const Type &map) {
    using Key = typename Type::key_type;
    auto exts = RtpExt::getExtValue(header);
    for (auto &entry : exts) {
        auto &ext = entry.second;
        auto hit = map.find((Key) (entry.first));
        if (hit != map.end()) {
            setExtType(ext, hit->first);
            setExtType(ext, hit->second);
            ext.setExtId((uint8_t) hit->second);
            continue;
        }
        WarnL << "未处理的rtp ext, 类型不识别:" << (int) entry.first;
        ext.clearExt();
    }
}
2021-04-03 08:45:18 +08:00
void WebRtcTransportImp::onRtp(const char *buf, size_t len) {
2021-05-11 15:48:01 +08:00
onRtp_l(buf, len, false);
}
void WebRtcTransportImp::onRtp_l(const char *buf, size_t len, bool rtx) {
2021-04-07 18:35:38 +08:00
_bytes_usage += len;
2021-04-07 17:51:06 +08:00
_alive_ticker.resetTime();
2021-04-03 08:45:18 +08:00
RtpHeader *rtp = (RtpHeader *) buf;
//根据接收到的rtp的pt信息找到该流的信息
2021-04-04 23:20:10 +08:00
auto it = _rtp_info_pt.find(rtp->pt);
if (it == _rtp_info_pt.end()) {
2021-04-03 08:45:18 +08:00
WarnL;
return;
}
auto &info = it->second;
2021-05-11 12:12:28 +08:00
if (info.is_common_rtp) {
2021-05-11 15:48:01 +08:00
//这是普通的rtp数据
auto seq = ntohs(rtp->seq);
#if 0
if (!rtx && info.media->type == TrackVideo && seq % 100 == 0) {
//此处模拟接受丢包
DebugL << "recv dropped:" << seq;
2021-05-11 12:12:28 +08:00
return;
2021-05-11 15:48:01 +08:00
}
#endif
if (!rtx) {
//统计rtp接受情况便于生成nack rtcp包
2021-05-11 12:12:28 +08:00
info.nack_ctx.received(seq);
}
2021-05-11 15:48:01 +08:00
//解析并排序rtp
info.receiver->inputRtp(info.media->type, info.plan->sample_rate, (uint8_t *) buf, len);
return;
2021-05-11 11:18:55 +08:00
}
2021-05-11 15:48:01 +08:00
//这里是rtx重传包
//https://datatracker.ietf.org/doc/html/rfc4588#section-4
auto payload = rtp->getPayloadData();
auto size = rtp->getPayloadSize(len);
if (size < 2) {
return;
}
//前两个字节是原始的rtp的seq
auto origin_seq = payload[0] << 8 | payload[1];
InfoL << "received rtx rtp: " << origin_seq;
rtp->seq = htons(origin_seq);
rtp->ssrc = htonl(info.media->rtp_rtx_ssrc[0].ssrc);
rtp->pt = info.plan_apt->pt;
memmove((uint8_t *) buf + 2, buf, payload - (uint8_t *) buf);
buf += 2;
len -= 2;
onRtp_l(buf, len, true);
2021-04-03 08:45:18 +08:00
}
2021-05-11 12:12:28 +08:00
/**
 * Sends an RTPFB NACK feedback packet for the given stream.
 * @param info stream the nack refers to; its first ssrc entry is used as media ssrc
 * @param nack nack FCI to send
 */
void WebRtcTransportImp::onNack(RtpPayloadInfo &info, const FCI_NACK &nack) {
    auto rtcp = RtcpFB::create(RTPFBType::RTCP_RTPFB_NACK, &nack, FCI_NACK::kSize);
    // ssrc is a 32-bit field: use htonl like sendRtcpPli()/sendRtcpRemb() do
    // (the previous htons(0) produced the same value, only by accident of being 0).
    rtcp->ssrc = htonl(0);
    rtcp->ssrc_media = htonl(info.media->rtp_rtx_ssrc[0].ssrc);
    sendRtcpPacket((char *) rtcp.get(), rtcp->getSize(), true);
}
2021-04-03 08:45:18 +08:00
///////////////////////////////////////////////////////////////////
2021-04-03 08:32:20 +08:00
2021-05-11 15:48:01 +08:00
// Called for every received rtp packet before sorting: maps the extension ids
// through _rtp_ext_id_to_type and updates the receive statistics used for rr reports.
void WebRtcTransportImp::onBeforeSortedRtp(RtpPayloadInfo &info, const RtpPacket::Ptr &rtp) {
    changeRtpExtId(rtp->getHeader(), _rtp_ext_id_to_type);
    const auto rtp_size = rtp->size() - RtpPacket::kRtpTcpHeaderSize;
    info.rtcp_context_recv->onRtp(rtp->getSeq(), rtp->getStampMS(), rtp_size);
}
2021-05-11 11:18:55 +08:00
/**
 * Called for every sorted rtp packet. For video, once more than 2000 on the
 * pli ticker has elapsed, a PLI is sent (plus a REMB when configured and
 * supported by the answer sdp). The packet is then written to the push source.
 */
void WebRtcTransportImp::onSortedRtp(RtpPayloadInfo &info, RtpPacket::Ptr rtp) {
    const bool pli_due = info.media->type == TrackVideo && _pli_ticker.elapsedTime() > 2000;
    if (pli_due) {
        // Periodically request a key frame, which helps non-rtc protocols.
        _pli_ticker.resetTime();
        const auto ssrc = rtp->getSSRC();
        sendRtcpPli(ssrc);
        // When remb is enabled, send a remb packet to adjust the bit rate.
        GET_CONFIG(size_t, remb_bit_rate, RTC::kRembBitRate);
        if (remb_bit_rate && getSdp(SdpType::answer).supportRtcpFb(SdpConst::kRembRtcpFb)) {
            sendRtcpRemb(ssrc, remb_bit_rate);
        }
    }

    if (!_push_src) {
        return;
    }
    _push_src->onWrite(std::move(rtp), false);
}
2021-05-11 15:48:01 +08:00
///////////////////////////////////////////////////////////////////
2021-04-03 08:32:20 +08:00
2021-05-11 00:54:33 +08:00
/**
 * Sends one rtp packet of the play source to the peer.
 * @param rtp   packet to send
 * @param flush forwarded to sendRtpPacket()
 * @param rtx   true when the packet is retransmitted in response to a nack; such
 *              packets are not counted in the send statistics or added to the nack list
 */
void WebRtcTransportImp::onSendRtp(const RtpPacket::Ptr &rtp, bool flush, bool rtx) {
    auto info = _send_rtp_info[rtp->type];
    if (!info) {
        // Ignored: the peer does not support this codec.
        return;
    }

    const auto rtp_size = rtp->size() - RtpPacket::kRtpTcpHeaderSize;
    if (rtx) {
        WarnL << "send rtx rtp:" << rtp->getSeq();
    } else {
        // Track what has been sent for sr reports and keep the packet for retransmission.
        info->rtcp_context_send->onRtp(rtp->getSeq(), rtp->getStampMS(), rtp_size);
        info->nack_list.push_back(rtp);
#if 0
        // Simulate packet loss on sending.
        if(rtp->getSeq() % 100 == 0){
            DebugL << "send droped:" << rtp->getSeq();
            return;
        }
#endif
    }

    sendRtpPacket(rtp->data() + RtpPacket::kRtpTcpHeaderSize, rtp_size, flush, info);
    _bytes_usage += rtp_size;
}
2021-04-07 17:21:59 +08:00
2021-05-11 00:54:33 +08:00
void WebRtcTransportImp::onBeforeEncryptRtp(const char *buf, size_t len, void *ctx) {
RtpPayloadInfo *info = reinterpret_cast<RtpPayloadInfo *>(ctx);
auto header = (RtpHeader *)buf;
2021-05-10 18:11:12 +08:00
//修改目标pt和ssrc
header->pt = info->plan->pt;
header->ssrc = htons(info->media->rtp_rtx_ssrc[0].ssrc);
changeRtpExtId(header, _rtp_ext_type_to_id);
}
2021-05-11 00:54:33 +08:00
// Hook called before an rtcp packet is encrypted; nothing needs to be patched.
void WebRtcTransportImp::onBeforeEncryptRtcp(const char *buf, size_t len, void *ctx) {}
2021-04-07 17:21:59 +08:00
void WebRtcTransportImp::onShutdown(const SockException &ex){
InfoL << ex.what();
2021-04-07 17:51:06 +08:00
_self = nullptr;
2021-04-07 17:21:59 +08:00
}
/////////////////////////////////////////////////////////////////////////////////////////////
/**
 * MediaSource close request.
 * @param sender source asking for the close
 * @param force  close even while the push source still has readers
 * @return false when there is no push source, or when readers remain and
 *         `force` is false; true after the transport was shut down
 */
bool WebRtcTransportImp::close(MediaSource &sender, bool force) {
    // This callback is triggered from another thread.
    // NOTE(review): onShutdown() is therefore run on the calling thread here --
    // confirm whether it should be dispatched to the transport's poller instead.
    if (!_push_src) {
        return false;
    }
    if (!force && _push_src->totalReaderCount()) {
        return false;
    }
    string err = StrPrinter << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force;
    onShutdown(SockException(Err_shutdown, err));
    return true;
}
// Total reader count of the push source, or the sender's own reader count when
// no push source is attached.
int WebRtcTransportImp::totalReaderCount(MediaSource &sender) {
    if (_push_src) {
        return _push_src->totalReaderCount();
    }
    return sender.readerCount();
}
// The origin type is always rtc_push.
MediaOriginType WebRtcTransportImp::getOriginType(MediaSource &sender) const {
    return MediaOriginType::rtc_push;
}
// No origin url is reported; always returns an empty string.
string WebRtcTransportImp::getOriginUrl(MediaSource &sender) const {
    return "";
}
// This object itself is returned as the SockInfo of the origin.
std::shared_ptr<SockInfo> WebRtcTransportImp::getOriginSock(MediaSource &sender) const {
    auto self = const_cast<WebRtcTransportImp *>(this);
    return self->shared_from_this();
}
/////////////////////////////////////////////////////////////////////////////////////////////
// Address of the first candidate of the first media in the answer sdp.
string WebRtcTransportImp::get_local_ip() {
    auto &first_media = getSdp(SdpType::answer).media[0];
    return first_media.candidate[0].address;
}
// Local port of the udp socket.
uint16_t WebRtcTransportImp::get_local_port() {
    return _socket->get_local_port();
}
// Ip of the selected ICE tuple, which is read as a sockaddr_in.
string WebRtcTransportImp::get_peer_ip() {
    auto peer = (struct sockaddr_in *) getSelectedTuple();
    return SockUtil::inet_ntoa(peer->sin_addr);
}
// Port of the selected ICE tuple (read as a sockaddr_in), in host byte order.
uint16_t WebRtcTransportImp::get_peer_port() {
    auto peer = (struct sockaddr_in *) getSelectedTuple();
    return ntohs(peer->sin_port);
}
// The identifier is the textual form of this object's address.
string WebRtcTransportImp::getIdentifier() const {
    return StrPrinter << this;
}