ZLMediaKit/webrtc/WebRtcTransport.cpp

1061 lines
38 KiB
C++
Raw Normal View History

2021-04-09 20:42:36 +08:00
/*
* Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
*
* This file is part of ZLMediaKit(https://github.com/xia-chu/ZLMediaKit).
*
* Use of this source code is governed by MIT license that can be found in the
* LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree.
*/
#include "WebRtcTransport.h"
2021-03-24 16:52:41 +08:00
#include <iostream>
2021-05-07 18:34:04 +08:00
#include "RtpExt.h"
2021-03-24 16:52:41 +08:00
#include "Rtcp/Rtcp.h"
2021-04-28 15:41:36 +08:00
#include "Rtcp/RtcpFCI.h"
2021-04-02 23:01:58 +08:00
#include "Rtsp/RtpReceiver.h"
2021-04-28 15:41:36 +08:00
#define RTP_SSRC_OFFSET 1
2021-04-02 23:01:58 +08:00
#define RTX_SSRC_OFFSET 2
#define RTP_CNAME "zlmediakit-rtp"
2021-05-08 15:01:14 +08:00
#define RTP_LABEL "zlmediakit-label"
#define RTP_MSLABEL "zlmediakit-mslabel"
#define RTP_MSID RTP_MSLABEL " " RTP_LABEL
2021-04-02 23:01:58 +08:00
using namespace std;
using namespace mediakit;
2021-04-07 17:51:06 +08:00
//RTC配置项目
namespace RTC {
#define RTC_FIELD "rtc."
//rtp和rtcp接受超时时间
const string kTimeOutSec = RTC_FIELD"timeoutSec";
//服务器外网ip
const string kExternIP = RTC_FIELD"externIP";
2021-04-28 15:41:36 +08:00
//设置remb比特率非0时关闭twcc并开启remb。该设置在rtc推流时有效可以控制推流画质
const string kRembBitRate = RTC_FIELD"rembBitRate";
2021-09-08 18:00:55 +08:00
//webrtc单端口udp服务器
const string kPort = RTC_FIELD"port";
2021-04-07 17:51:06 +08:00
static onceToken token([]() {
mINI::Instance()[kTimeOutSec] = 15;
mINI::Instance()[kExternIP] = "";
2021-04-28 15:41:36 +08:00
mINI::Instance()[kRembBitRate] = 0;
2021-09-08 18:00:55 +08:00
mINI::Instance()[kPort] = 8000;
2021-04-07 17:51:06 +08:00
});
}//namespace RTC
2021-11-28 21:43:21 +08:00
static atomic<uint64_t> s_key{0};
2021-03-27 10:16:49 +08:00
WebRtcTransport::WebRtcTransport(const EventPoller::Ptr &poller) {
2021-04-04 23:20:10 +08:00
_poller = poller;
_identifier = "zlm_" + to_string(++s_key);
_packet_pool.setSize(64);
2021-03-24 16:52:41 +08:00
}
2021-04-07 17:21:59 +08:00
void WebRtcTransport::onCreate(){
2021-09-08 18:00:55 +08:00
_dtls_transport = std::make_shared<RTC::DtlsTransport>(_poller, this);
2021-10-15 17:12:39 +08:00
_ice_server = std::make_shared<RTC::IceServer>(this, _identifier, makeRandStr(24));
2021-04-07 17:21:59 +08:00
}
2021-03-27 10:16:49 +08:00
void WebRtcTransport::onDestory(){
2021-04-02 17:08:11 +08:00
_dtls_transport = nullptr;
_ice_server = nullptr;
2021-03-27 09:38:13 +08:00
}
2021-03-24 16:52:41 +08:00
2021-04-04 23:20:10 +08:00
// Returns the poller this transport was constructed with.
const EventPoller::Ptr &WebRtcTransport::getPoller() const {
    return _poller;
}
2021-10-15 17:12:39 +08:00
// Returns the identifier ("zlm_<n>") assigned in the constructor.
const string &WebRtcTransport::getIdentifier() const {
    return _identifier;
}
2021-03-26 11:07:03 +08:00
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// ICE server callback: forwards the bytes of an outgoing STUN packet to the socket layer,
// addressed to `tuple`.
void WebRtcTransport::OnIceServerSendStunPacket(const RTC::IceServer *iceServer,
                                                const RTC::StunPacket *packet,
                                                RTC::TransportTuple *tuple) {
    const auto stun_size = packet->GetSize();
    sendSockData(reinterpret_cast<const char *>(packet->GetData()), stun_size, tuple);
}
// ICE server callback for a newly selected tuple; only emits an info log entry.
void WebRtcTransport::OnIceServerSelectedTuple(const RTC::IceServer *iceServer,
                                               RTC::TransportTuple *tuple) {
    InfoL;
}
// ICE server callback for the "connected" state; only emits an info log entry.
void WebRtcTransport::OnIceServerConnected(const RTC::IceServer *iceServer) {
    InfoL;
}
// ICE server callback for the "completed" state: starts the DTLS transport.
// The DTLS role follows the first media section of the answer sdp: a passive role
// runs as SERVER, anything else runs as CLIENT.
void WebRtcTransport::OnIceServerCompleted(const RTC::IceServer *iceServer) {
    InfoL;
    const bool run_as_server = (_answer_sdp->media[0].role == DtlsRole::passive);
    _dtls_transport->Run(run_as_server ? RTC::DtlsTransport::Role::SERVER
                                       : RTC::DtlsTransport::Role::CLIENT);
}
// ICE server callback for the "disconnected" state; only emits an info log entry.
void WebRtcTransport::OnIceServerDisconnected(const RTC::IceServer *iceServer) {
    InfoL;
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// DTLS callback invoked once the handshake has produced SRTP keying material.
// Creates the outbound SRTP session from the local key and the inbound SRTP session
// from the remote key (both with the negotiated crypto suite), then calls onStartWebRTC().
void WebRtcTransport::OnDtlsTransportConnected(
        const RTC::DtlsTransport *dtlsTransport,
        RTC::SrtpSession::CryptoSuite srtpCryptoSuite,
        uint8_t *srtpLocalKey,
        size_t srtpLocalKeyLen,
        uint8_t *srtpRemoteKey,
        size_t srtpRemoteKeyLen,
        std::string &remoteCert) {
    using SrtpType = RTC::SrtpSession::Type;

    InfoL;
    _srtp_session_send = std::make_shared<RTC::SrtpSession>(
            SrtpType::OUTBOUND, srtpCryptoSuite, srtpLocalKey, srtpLocalKeyLen);
    _srtp_session_recv = std::make_shared<RTC::SrtpSession>(
            SrtpType::INBOUND, srtpCryptoSuite, srtpRemoteKey, srtpRemoteKeyLen);
    onStartWebRTC();
}
// DTLS callback: sends handshake/record bytes to the peer. No tuple is given, so
// sendSockData() falls back to the ICE server's selected tuple.
void WebRtcTransport::OnDtlsTransportSendData(const RTC::DtlsTransport *dtlsTransport,
                                              const uint8_t *data,
                                              size_t len) {
    const char *dtls_bytes = reinterpret_cast<const char *>(data);
    sendSockData(dtls_bytes, len, nullptr);
}
2021-04-07 17:51:06 +08:00
// DTLS callback for the "connecting" state; only emits an info log entry.
void WebRtcTransport::OnDtlsTransportConnecting(const RTC::DtlsTransport *dtlsTransport) {
    InfoL;
}
// DTLS callback for a failed transport: logs, then shuts the transport down with Err_shutdown.
void WebRtcTransport::OnDtlsTransportFailed(const RTC::DtlsTransport *dtlsTransport) {
    InfoL;
    SockException reason(Err_shutdown, "dtls transport failed");
    onShutdown(reason);
}
// DTLS callback for a closed transport: logs, then shuts the transport down with Err_shutdown.
void WebRtcTransport::OnDtlsTransportClosed(const RTC::DtlsTransport *dtlsTransport) {
    InfoL;
    SockException reason(Err_shutdown, "dtls close notify received");
    onShutdown(reason);
}
// DTLS callback for received application data; the payload is only logged as a hex dump.
void WebRtcTransport::OnDtlsTransportApplicationDataReceived(const RTC::DtlsTransport *dtlsTransport,
                                                             const uint8_t *data,
                                                             size_t len) {
    InfoL << hexdump(data, len);
}
2021-03-26 11:07:03 +08:00
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
void WebRtcTransport::sendSockData(const char *buf, size_t len, RTC::TransportTuple *tuple){
2022-01-06 14:30:44 +08:00
auto pkt = _packet_pool.obtain2();
pkt->assign(buf, len);
onSendSockData(std::move(pkt), true, tuple ? tuple : _ice_server->GetSelectedTuple());
2021-03-26 11:07:03 +08:00
}
// Returns the tuple currently selected by the ICE server.
RTC::TransportTuple *WebRtcTransport::getSelectedTuple() const {
    auto *selected = _ice_server->GetSelectedTuple();
    return selected;
}
2021-04-28 15:41:36 +08:00
// Builds and sends (with flush) a PSFB REMB feedback packet.
//   ssrc:     the ssrc listed in the REMB FCI and used as the media ssrc.
//   bit_rate: bit rate carried in the REMB FCI (narrowed to uint32_t).
// The sender ssrc field is set to 0.
void WebRtcTransport::sendRtcpRemb(uint32_t ssrc, size_t bit_rate) {
    auto remb_fci = FCI_REMB::create({ssrc}, (uint32_t) bit_rate);
    auto feedback = RtcpFB::create(PSFBType::RTCP_PSFB_REMB, remb_fci.data(), remb_fci.size());
    feedback->ssrc = htonl(0);
    feedback->ssrc_media = htonl(ssrc);
    sendRtcpPacket(reinterpret_cast<char *>(feedback.get()), feedback->getSize(), true);
}
// Builds and sends (with flush) a PSFB PLI feedback packet for media `ssrc`.
// The sender ssrc field is set to 0.
void WebRtcTransport::sendRtcpPli(uint32_t ssrc) {
    auto feedback = RtcpFB::create(PSFBType::RTCP_PSFB_PLI);
    feedback->ssrc = htonl(0);
    feedback->ssrc_media = htonl(ssrc);
    sendRtcpPacket(reinterpret_cast<char *>(feedback.get()), feedback->getSize(), true);
}
2021-04-01 10:09:50 +08:00
// Looks up the local DTLS fingerprint for the algorithm named by `algorithm_str`.
// Returns the fingerprint value of the first local fingerprint whose algorithm matches.
// Throws std::invalid_argument when the transport has no fingerprint for that algorithm.
string getFingerprint(const string &algorithm_str, const std::shared_ptr<RTC::DtlsTransport> &transport) {
    const auto wanted = RTC::DtlsTransport::GetFingerprintAlgorithm(algorithm_str);
    for (auto &candidate : transport->GetLocalFingerprints()) {
        if (candidate.algorithm != wanted) {
            continue;
        }
        return candidate.value;
    }
    throw std::invalid_argument(StrPrinter << "不支持的加密算法:" << algorithm_str);
}
2021-04-02 17:08:11 +08:00
void WebRtcTransport::setRemoteDtlsFingerprint(const RtcSession &remote){
//设置远端dtls签名
RTC::DtlsTransport::Fingerprint remote_fingerprint;
remote_fingerprint.algorithm = RTC::DtlsTransport::GetFingerprintAlgorithm(_offer_sdp->media[0].fingerprint.algorithm);
remote_fingerprint.value = _offer_sdp->media[0].fingerprint.hash;
_dtls_transport->SetRemoteFingerprint(remote_fingerprint);
}
2021-04-28 15:41:36 +08:00
// Adjusts the rtc configuration before the answer is created.
// twcc is enabled only when the configured remb bit rate is 0: once remb is turned on,
// twcc is turned off, because remb has no effect while twcc is enabled.
void WebRtcTransport::onRtcConfigure(RtcConfigure &configure) const {
    GET_CONFIG(size_t, remb_bit_rate, RTC::kRembBitRate);
    const bool remb_disabled = !remb_bit_rate;
    configure.enableTWCC(remb_disabled);
}
2021-03-31 17:15:26 +08:00
std::string WebRtcTransport::getAnswerSdp(const string &offer){
try {
//// 解析offer sdp ////
_offer_sdp = std::make_shared<RtcSession>();
_offer_sdp->loadFrom(offer);
2021-10-12 17:32:22 +08:00
onCheckSdp(SdpType::offer, *_offer_sdp);
_offer_sdp->checkValid();
setRemoteDtlsFingerprint(*_offer_sdp);
//// sdp 配置 ////
SdpAttrFingerprint fingerprint;
fingerprint.algorithm = _offer_sdp->media[0].fingerprint.algorithm;
fingerprint.hash = getFingerprint(fingerprint.algorithm, _dtls_transport);
RtcConfigure configure;
configure.setDefaultSetting(_ice_server->GetUsernameFragment(), _ice_server->GetPassword(),
RtpDirection::sendrecv, fingerprint);
onRtcConfigure(configure);
//// 生成answer sdp ////
_answer_sdp = configure.createAnswer(*_offer_sdp);
onCheckSdp(SdpType::answer, *_answer_sdp);
_answer_sdp->checkValid();
return _answer_sdp->toString();
} catch (exception &ex) {
onShutdown(SockException(Err_shutdown, ex.what()));
throw;
}
2021-03-31 17:15:26 +08:00
}
// Returns true when the first byte of `buf` lies in 20..63, the range this file treats
// as a DTLS record. `buf` must point to at least one readable byte.
// The byte is compared as unsigned so the result does not depend on whether plain
// `char` is signed on the target platform.
static bool is_dtls(char *buf) {
    const auto first_byte = static_cast<uint8_t>(*buf);
    return first_byte > 19 && first_byte < 64;
}
static bool is_rtp(char *buf) {
2021-03-24 16:52:41 +08:00
RtpHeader *header = (RtpHeader *) buf;
return ((header->pt < 64) || (header->pt >= 96));
}
static bool is_rtcp(char *buf) {
2021-03-24 16:52:41 +08:00
RtpHeader *header = (RtpHeader *) buf;
return ((header->pt >= 64) && (header->pt < 96));
}
2021-09-24 11:29:04 +08:00
// Formats the peer address behind `tuple` as "ip:port".
// The tuple is reinterpreted as a struct sockaddr_in (IPv4 layout).
static string getPeerAddress(RTC::TransportTuple *tuple) {
    auto *peer = (struct sockaddr_in *) tuple;
    auto ip = SockUtil::inet_ntoa(peer->sin_addr);
    auto port = to_string(ntohs(peer->sin_port));
    return ip + ":" + port;
}
void WebRtcTransport::inputSockData(char *buf, int len, RTC::TransportTuple *tuple) {
2021-03-24 16:52:41 +08:00
if (RTC::StunPacket::IsStun((const uint8_t *) buf, len)) {
2021-08-19 19:14:58 +08:00
std::unique_ptr<RTC::StunPacket> packet(RTC::StunPacket::Parse((const uint8_t *) buf, len));
if (!packet) {
2021-03-24 16:52:41 +08:00
WarnL << "parse stun error" << std::endl;
return;
}
2021-08-19 19:14:58 +08:00
_ice_server->ProcessStunPacket(packet.get(), tuple);
2021-03-24 16:52:41 +08:00
return;
}
2021-03-26 11:07:03 +08:00
if (is_dtls(buf)) {
2021-04-02 17:08:11 +08:00
_dtls_transport->ProcessDtlsData((uint8_t *) buf, len);
2021-03-24 16:52:41 +08:00
return;
}
2021-04-01 14:16:42 +08:00
if (is_rtp(buf)) {
2021-09-24 11:29:04 +08:00
if (!_srtp_session_recv) {
WarnL << "received rtp packet when dtls not completed from:" << getPeerAddress(tuple);
return;
}
2021-07-28 11:18:09 +08:00
if (_srtp_session_recv->DecryptSrtp((uint8_t *) buf, &len)) {
onRtp(buf, len, _ticker.createdTime());
2021-04-02 17:56:24 +08:00
}
2021-04-01 14:16:42 +08:00
return;
}
if (is_rtcp(buf)) {
2021-09-24 11:29:04 +08:00
if (!_srtp_session_recv) {
WarnL << "received rtcp packet when dtls not completed from:" << getPeerAddress(tuple);
return;
}
2021-07-28 11:18:09 +08:00
if (_srtp_session_recv->DecryptSrtcp((uint8_t *) buf, &len)) {
2021-04-02 17:56:24 +08:00
onRtcp(buf, len);
}
2021-04-01 14:16:42 +08:00
return;
}
2021-03-24 16:52:41 +08:00
}
void WebRtcTransport::sendRtpPacket(const char *buf, int len, bool flush, void *ctx) {
2021-04-02 17:08:11 +08:00
if (_srtp_session_send) {
2022-01-06 14:30:44 +08:00
auto pkt = _packet_pool.obtain2();
2021-05-12 20:51:51 +08:00
//预留rtx加入的两个字节
pkt->setCapacity((size_t) len + SRTP_MAX_TRAILER_LEN + 2);
pkt->assign(buf, len);
onBeforeEncryptRtp(pkt->data(), len, ctx);
if (_srtp_session_send->EncryptRtp(reinterpret_cast<uint8_t *>(pkt->data()), &len)) {
pkt->setSize(len);
onSendSockData(std::move(pkt), flush);
}
2021-03-24 16:52:41 +08:00
}
}
void WebRtcTransport::sendRtcpPacket(const char *buf, int len, bool flush, void *ctx) {
2021-04-02 23:12:37 +08:00
if (_srtp_session_send) {
2022-01-06 14:30:44 +08:00
auto pkt = _packet_pool.obtain2();
//预留rtx加入的两个字节
pkt->setCapacity((size_t) len + SRTP_MAX_TRAILER_LEN + 2);
pkt->assign(buf, len);
onBeforeEncryptRtcp(pkt->data(), len, ctx);
if (_srtp_session_send->EncryptRtcp(reinterpret_cast<uint8_t *>(pkt->data()), &len)) {
pkt->setSize(len);
onSendSockData(std::move(pkt), flush);
}
2021-04-02 23:12:37 +08:00
}
}
2021-03-24 16:52:41 +08:00
///////////////////////////////////////////////////////////////////////////////////
2021-04-07 17:21:59 +08:00
void WebRtcTransportImp::onCreate(){
WebRtcTransport::onCreate();
2021-09-10 22:31:44 +08:00
registerSelf();
2021-09-08 18:00:55 +08:00
weak_ptr<WebRtcTransportImp> weak_self = static_pointer_cast<WebRtcTransportImp>(shared_from_this());
2021-04-07 17:51:06 +08:00
GET_CONFIG(float, timeoutSec, RTC::kTimeOutSec);
_timer = std::make_shared<Timer>(timeoutSec / 2, [weak_self]() {
auto strong_self = weak_self.lock();
if (!strong_self) {
return false;
}
if (strong_self->_alive_ticker.elapsedTime() > timeoutSec * 1000) {
strong_self->onShutdown(SockException(Err_timeout, "接受rtp和rtcp超时"));
}
return true;
}, getPoller());
2021-10-07 14:22:12 +08:00
_twcc_ctx.setOnSendTwccCB([this](uint32_t ssrc, string fci) {
onSendTwcc(ssrc, fci);
});
2021-03-24 16:52:41 +08:00
}
2021-04-07 17:21:59 +08:00
// Constructs the transport on `poller` and logs its identifier.
WebRtcTransportImp::WebRtcTransportImp(const EventPoller::Ptr &poller)
        : WebRtcTransport(poller) {
    InfoL << getIdentifier();
}
2021-04-07 17:21:59 +08:00
2021-04-07 17:51:06 +08:00
// Logs the identifier of the transport being destroyed.
WebRtcTransportImp::~WebRtcTransportImp() {
    InfoL << getIdentifier();
}
2021-03-27 10:16:49 +08:00
void WebRtcTransportImp::onDestory() {
WebRtcTransport::onDestory();
2021-09-10 22:31:44 +08:00
unregisterSelf();
2021-03-24 16:52:41 +08:00
}
// Sends `buf` through the currently selected session.
//   flush: applied as the session's send-flush flag, so that the rtp packets of one
//          frame can be sent in one go to improve network io performance.
// When no session is selected, the data is dropped and its size is logged.
void WebRtcTransportImp::onSendSockData(Buffer::Ptr buf, bool flush, RTC::TransportTuple *tuple) {
    if (_selected_session) {
        _selected_session->setSendFlushFlag(flush);
        _selected_session->send(std::move(buf));
        return;
    }
    WarnL << "send data failed:" << buf->size();
}
2021-04-03 08:45:18 +08:00
///////////////////////////////////////////////////////////////////
// Returns true when at least one media section of the answer sdp has direction
// sendrecv or sendonly.
bool WebRtcTransportImp::canSendRtp() const {
    for (auto &media : _answer_sdp->media) {
        switch (media.direction) {
            case RtpDirection::sendrecv:
            case RtpDirection::sendonly:
                return true;
            default:
                break;
        }
    }
    return false;
}
// Returns true when at least one media section of the answer sdp has direction
// sendrecv or recvonly.
bool WebRtcTransportImp::canRecvRtp() const {
    for (auto &media : _answer_sdp->media) {
        switch (media.direction) {
            case RtpDirection::sendrecv:
            case RtpDirection::recvonly:
                return true;
            default:
                break;
        }
    }
    return false;
}
2021-04-02 17:08:11 +08:00
void WebRtcTransportImp::onStartWebRTC() {
2021-05-08 10:49:09 +08:00
//获取ssrc和pt相关信息,届时收到rtp和rtcp时分别可以根据pt和ssrc找到相关的信息
for (auto &m_answer : _answer_sdp->media) {
auto m_offer = _offer_sdp->getMedia(m_answer.type);
2021-06-25 17:17:48 +08:00
auto track = std::make_shared<MediaTrack>();
2021-05-16 16:12:10 +08:00
2021-06-25 17:17:48 +08:00
track->media = &m_answer;
track->answer_ssrc_rtp = m_answer.getRtpSSRC();
track->answer_ssrc_rtx = m_answer.getRtxSSRC();
track->offer_ssrc_rtp = m_offer->getRtpSSRC();
track->offer_ssrc_rtx = m_offer->getRtxSSRC();
track->plan_rtp = &m_answer.plan[0];;
track->plan_rtx = m_answer.getRelatedRtxPlan(track->plan_rtp->pt);
track->rtcp_context_send = std::make_shared<RtcpContextForSend>();
2021-05-16 16:12:10 +08:00
//rtp track type --> MediaTrack
if (m_answer.direction == RtpDirection::sendonly || m_answer.direction == RtpDirection::sendrecv) {
//该类型的track 才支持发送
_type_to_track[m_answer.type] = track;
}
2021-06-25 14:37:11 +08:00
//send ssrc --> MediaTrack
2021-06-25 17:17:48 +08:00
_ssrc_to_track[track->answer_ssrc_rtp] = track;
_ssrc_to_track[track->answer_ssrc_rtx] = track;
2021-05-16 16:12:10 +08:00
2021-06-25 14:37:11 +08:00
//recv ssrc --> MediaTrack
2021-06-25 17:17:48 +08:00
_ssrc_to_track[track->offer_ssrc_rtp] = track;
_ssrc_to_track[track->offer_ssrc_rtx] = track;
2021-05-16 16:12:10 +08:00
2021-06-25 14:37:11 +08:00
//rtp pt --> MediaTrack
2021-11-19 17:03:54 +08:00
_pt_to_track.emplace(track->plan_rtp->pt, std::unique_ptr<WrappedMediaTrack>(new WrappedRtpTrack(track, _twcc_ctx, *this)));
2021-06-25 17:17:48 +08:00
if (track->plan_rtx) {
2021-06-25 14:37:11 +08:00
//rtx pt --> MediaTrack
2021-11-19 17:03:54 +08:00
_pt_to_track.emplace(track->plan_rtx->pt, std::unique_ptr<WrappedMediaTrack>(new WrappedRtxTrack(track)));
2021-04-02 23:01:58 +08:00
}
2021-05-16 16:12:10 +08:00
if (m_offer->type != TrackApplication) {
2021-05-08 10:49:09 +08:00
//记录rtp ext类型与id的关系方便接收或发送rtp时修改rtp ext id
2021-06-25 17:17:48 +08:00
track->rtp_ext_ctx = std::make_shared<RtpExtContext>(*m_offer);
weak_ptr<MediaTrack> weak_track = track;
track->rtp_ext_ctx->setOnGetRtp([this, weak_track](uint8_t pt, uint32_t ssrc, const string &rid) {
2021-06-25 16:57:38 +08:00
//ssrc --> MediaTrack
auto track = weak_track.lock();
assert(track);
_ssrc_to_track[ssrc] = std::move(track);
2021-06-25 16:57:38 +08:00
InfoL << "get rtp, pt:" << (int) pt << ", ssrc:" << ssrc << ", rid:" << rid;
});
2021-06-25 18:08:21 +08:00
2021-09-03 11:45:44 +08:00
size_t index = 0;
2021-06-25 18:08:21 +08:00
for (auto &ssrc : m_offer->rtp_ssrc_sim) {
//记录ssrc对应的MediaTrack
_ssrc_to_track[ssrc.ssrc] = track;
if (m_offer->rtp_rids.size() > index) {
//支持firefox的simulcast, 提前映射好ssrc和rid的关系
track->rtp_ext_ctx->setRid(ssrc.ssrc, m_offer->rtp_rids[index]);
}
++index;
}
2021-05-08 10:49:09 +08:00
}
2021-04-02 23:01:58 +08:00
}
2021-04-02 20:35:43 +08:00
}
// Post-processes the generated answer sdp.
//   1. Rewrites the ip and port of every media section (and the origin address) to the
//      configured external ip, or the local ip when none is configured, and to the
//      configured rtc port.
//   2. When we are allowed to send rtp, adds ssrc information to every non-application
//      media section that does not have any yet.
void WebRtcTransportImp::onCheckAnswer(RtcSession &sdp) {
    GET_CONFIG(string, extern_ip, RTC::kExternIP);
    for (auto &media : sdp.media) {
        media.addr.reset();
        if (extern_ip.empty()) {
            media.addr.address = SockUtil::get_local_ip();
        } else {
            media.addr.address = extern_ip;
        }
        media.rtcp_addr.reset();
        media.rtcp_addr.address = media.addr.address;

        GET_CONFIG(uint16_t, local_port, RTC::kPort);
        media.rtcp_addr.port = local_port;
        media.port = media.rtcp_addr.port;
        sdp.origin.address = media.addr.address;
    }

    if (!canSendRtp()) {
        // We never send rtp, so no ssrc has to be announced.
        return;
    }

    for (auto &media : sdp.media) {
        // Skip application sections and sections whose ssrc was generated already.
        if (media.type == TrackApplication || !media.rtp_rtx_ssrc.empty()) {
            continue;
        }

        // Add the ssrc information of the answer sdp.
        media.rtp_rtx_ssrc.emplace_back();
        auto &entry = media.rtp_rtx_ssrc.back();
        // The ssrc value is arbitrary: outgoing rtp is rewritten to this value on send.
        entry.ssrc = media.type + RTP_SSRC_OFFSET;
        entry.cname = RTP_CNAME;
        entry.label = RTP_LABEL;
        entry.mslabel = RTP_MSLABEL;
        entry.msid = RTP_MSID;

        if (media.getRelatedRtxPlan(media.plan[0].pt)) {
            // The first plan has a related rtx plan, so announce an rtx ssrc too.
            entry.rtx_ssrc = entry.ssrc + RTX_SSRC_OFFSET;
        }
    }
}
// Sdp hook: answers are post-processed by onCheckAnswer(), offers are left untouched.
// Any other SdpType is considered unreachable and trips an assert.
void WebRtcTransportImp::onCheckSdp(SdpType type, RtcSession &sdp) {
    if (type == SdpType::answer) {
        onCheckAnswer(sdp);
    } else if (type == SdpType::offer) {
        // nothing to do for offers
    } else {
        // unreachable
        assert(0);
    }
}
2021-04-02 18:28:01 +08:00
// Applies the base-class configuration, then adds the candidate that describes our
// receiving port.
void WebRtcTransportImp::onRtcConfigure(RtcConfigure &configure) const {
    WebRtcTransport::onRtcConfigure(configure);
    auto candidate = getIceCandidate();
    configure.addCandidate(*candidate);
}
2021-04-02 17:08:11 +08:00
// Builds the single udp "host" candidate advertised in the answer.
// The address is the configured external ip (or the local ip when none is configured)
// and the port is the configured rtc port.
SdpAttrCandidate::Ptr WebRtcTransportImp::getIceCandidate() const {
    GET_CONFIG(string, extern_ip, RTC::kExternIP);
    GET_CONFIG(uint16_t, local_port, RTC::kPort);

    auto result = std::make_shared<SdpAttrCandidate>();
    result->foundation = "udpcandidate";
    // component 1 is the rtp port
    result->component = 1;
    result->transport = "udp";
    // With a single candidate the priority value is arbitrary.
    result->priority = 100;
    if (extern_ip.empty()) {
        result->address = SockUtil::get_local_ip();
    } else {
        result->address = extern_ip;
    }
    result->port = local_port;
    result->type = "host";
    return result;
}
2021-04-03 08:45:18 +08:00
///////////////////////////////////////////////////////////////////
class RtpChannel : public RtpTrackImp, public std::enable_shared_from_this<RtpChannel> {
2021-06-25 14:37:11 +08:00
public:
RtpChannel(EventPoller::Ptr poller, RtpTrackImp::OnSorted cb, function<void(const FCI_NACK &nack)> on_nack) {
_poller = std::move(poller);
_on_nack = std::move(on_nack);
2021-06-25 16:24:44 +08:00
setOnSorted(std::move(cb));
_nack_ctx.setOnNack([this](const FCI_NACK &nack) {
onNack(nack);
});
2021-04-02 23:01:58 +08:00
}
2021-06-25 14:37:11 +08:00
~RtpChannel() override = default;
2021-04-02 23:01:58 +08:00
RtpPacket::Ptr inputRtp(TrackType type, int sample_rate, uint8_t *ptr, size_t len, bool is_rtx) {
auto rtp = RtpTrack::inputRtp(type, sample_rate, ptr, len);
if (!rtp) {
return rtp;
}
auto seq = rtp->getSeq();
_nack_ctx.received(seq, is_rtx);
if (!is_rtx) {
2021-06-25 14:37:11 +08:00
//统计rtp接受情况便于生成nack rtcp包
_rtcp_context.onRtp(seq, rtp->getStamp(), rtp->ntp_stamp, sample_rate, len);
2021-06-25 14:37:11 +08:00
}
return rtp;
2021-04-02 23:01:58 +08:00
}
Buffer::Ptr createRtcpRR(RtcpHeader *sr, uint32_t ssrc) {
2021-06-25 16:24:44 +08:00
_rtcp_context.onRtcp(sr);
2021-06-25 17:10:27 +08:00
return _rtcp_context.createRtcpRR(ssrc, getSSRC());
2021-04-02 23:01:58 +08:00
}
int getLossRate() {
return _rtcp_context.geLostInterval() * 100 / _rtcp_context.getExpectedPacketsInterval();
}
private:
void starNackTimer(){
if (_delay_task) {
return;
}
weak_ptr<RtpChannel> weak_self = shared_from_this();
_delay_task = _poller->doDelayTask(10, [weak_self]() -> uint64_t {
auto strong_self = weak_self.lock();
if (!strong_self) {
return 0;
}
auto ret = strong_self->_nack_ctx.reSendNack();
if (!ret) {
strong_self->_delay_task = nullptr;
}
return ret;
});
}
void onNack(const FCI_NACK &nack) {
_on_nack(nack);
starNackTimer();
}
2021-04-02 23:01:58 +08:00
private:
2021-06-25 16:24:44 +08:00
NackContext _nack_ctx;
RtcpContextForRecv _rtcp_context;
EventPoller::Ptr _poller;
EventPoller::DelayTask::Ptr _delay_task;
function<void(const FCI_NACK &nack)> _on_nack;
2021-04-02 23:01:58 +08:00
};
2021-06-25 15:31:13 +08:00
std::shared_ptr<RtpChannel> MediaTrack::getRtpChannel(uint32_t ssrc) const{
auto it_chn = rtp_channel.find(rtp_ext_ctx->getRid(ssrc));
2021-06-25 14:37:11 +08:00
if (it_chn == rtp_channel.end()) {
return nullptr;
}
return it_chn->second;
}
2021-04-02 17:56:24 +08:00
void WebRtcTransportImp::onRtcp(const char *buf, size_t len) {
2021-04-07 18:35:38 +08:00
_bytes_usage += len;
2021-04-03 00:04:52 +08:00
auto rtcps = RtcpHeader::loadFromBytes((char *) buf, len);
for (auto rtcp : rtcps) {
switch ((RtcpType) rtcp->pt) {
case RtcpType::RTCP_SR : {
//对方汇报rtp发送情况
RtcpSR *sr = (RtcpSR *) rtcp;
auto it = _ssrc_to_track.find(sr->ssrc);
if (it != _ssrc_to_track.end()) {
2021-06-25 17:17:48 +08:00
auto &track = it->second;
auto rtp_chn = track->getRtpChannel(sr->ssrc);
2021-06-25 14:37:11 +08:00
if(!rtp_chn){
WarnL << "未识别的sr rtcp包:" << rtcp->dumpString();
2021-06-25 14:37:11 +08:00
} else {
//InfoL << "接收丢包率,ssrc:" << sr->ssrc << ",loss rate(%):" << rtp_chn->getLossRate();
//设置rtp时间戳与ntp时间戳的对应关系
2021-09-02 21:17:59 +08:00
rtp_chn->setNtpStamp(sr->rtpts, sr->getNtpUnixStampMS());
2021-06-25 17:17:48 +08:00
auto rr = rtp_chn->createRtcpRR(sr, track->answer_ssrc_rtp);
2021-06-25 14:37:11 +08:00
sendRtcpPacket(rr->data(), rr->size(), true);
2021-05-16 16:12:10 +08:00
}
2021-05-08 10:49:09 +08:00
} else {
2021-05-12 20:51:51 +08:00
WarnL << "未识别的sr rtcp包:" << rtcp->dumpString();
2021-04-03 00:04:52 +08:00
}
break;
}
case RtcpType::RTCP_RR : {
2021-04-07 17:51:06 +08:00
_alive_ticker.resetTime();
2021-04-03 00:04:52 +08:00
//对方汇报rtp接收情况
RtcpRR *rr = (RtcpRR *) rtcp;
2021-05-12 20:51:51 +08:00
for (auto item : rr->getItemList()) {
auto it = _ssrc_to_track.find(item->ssrc);
if (it != _ssrc_to_track.end()) {
2021-06-25 17:17:48 +08:00
auto &track = it->second;
2021-07-06 23:09:56 +08:00
track->rtcp_context_send->onRtcp(rtcp);
2021-06-25 17:17:48 +08:00
auto sr = track->rtcp_context_send->createRtcpSR(track->answer_ssrc_rtp);
sendRtcpPacket(sr->data(), sr->size(), true);
2021-05-12 20:51:51 +08:00
} else {
WarnL << "未识别的rr rtcp包:" << rtcp->dumpString();
}
2021-04-03 00:04:52 +08:00
}
break;
}
2021-04-03 08:45:18 +08:00
case RtcpType::RTCP_BYE : {
2021-04-07 17:21:59 +08:00
//对方汇报停止发送rtp
RtcpBye *bye = (RtcpBye *) rtcp;
for (auto ssrc : bye->getSSRC()) {
auto it = _ssrc_to_track.find(*ssrc);
if (it == _ssrc_to_track.end()) {
2021-05-12 20:51:51 +08:00
WarnL << "未识别的bye rtcp包:" << rtcp->dumpString();
2021-04-07 17:21:59 +08:00
continue;
}
_ssrc_to_track.erase(it);
2021-04-07 17:21:59 +08:00
}
onShutdown(SockException(Err_eof, "rtcp bye message received"));
2021-04-03 08:45:18 +08:00
break;
}
2021-04-23 18:27:47 +08:00
case RtcpType::RTCP_PSFB:
2021-04-22 17:34:26 +08:00
case RtcpType::RTCP_RTPFB: {
2021-05-11 00:54:33 +08:00
if ((RtcpType) rtcp->pt == RtcpType::RTCP_PSFB) {
break;
}
//RTPFB
switch ((RTPFBType) rtcp->report_count) {
case RTPFBType::RTCP_RTPFB_NACK : {
2021-05-12 20:51:51 +08:00
RtcpFB *fb = (RtcpFB *) rtcp;
auto it = _ssrc_to_track.find(fb->ssrc_media);
if (it == _ssrc_to_track.end()) {
2021-05-12 20:51:51 +08:00
WarnL << "未识别的 rtcp包:" << rtcp->dumpString();
return;
}
2021-06-25 17:17:48 +08:00
auto &track = it->second;
auto &fci = fb->getFci<FCI_NACK>();
2021-06-25 17:17:48 +08:00
track->nack_list.for_each_nack(fci, [&](const RtpPacket::Ptr &rtp) {
//rtp重传
onSendRtp(rtp, true, true);
});
2021-05-11 00:54:33 +08:00
break;
}
2021-05-11 15:48:01 +08:00
default: break;
2021-05-11 00:54:33 +08:00
}
2021-04-22 17:34:26 +08:00
break;
}
2021-04-03 00:04:52 +08:00
default: break;
}
}
}
2021-05-11 15:48:01 +08:00
///////////////////////////////////////////////////////////////////
void WebRtcTransportImp::createRtpChannel(const string &rid, uint32_t ssrc, MediaTrack &track) {
2021-06-25 14:37:11 +08:00
//rid --> RtpReceiverImp
auto &ref = track.rtp_channel[rid];
weak_ptr<WebRtcTransportImp> weak_self = dynamic_pointer_cast<WebRtcTransportImp>(shared_from_this());
ref = std::make_shared<RtpChannel>(getPoller(), [&track, this, rid](RtpPacket::Ptr rtp) mutable {
onSortedRtp(track, rid, std::move(rtp));
}, [&track, weak_self, ssrc](const FCI_NACK &nack) mutable {
//nack发送可能由定时器异步触发
auto strong_self = weak_self.lock();
if (strong_self) {
strong_self->onSendNack(track, nack, ssrc);
}
});
InfoL << "create rtp receiver of ssrc:" << ssrc << ", rid:" << rid << ", codec:" << track.plan_rtp->codec;
}
// Resets the alive ticker that the receive-timeout timer checks.
void WebRtcTransportImp::updateTicker() {
    _alive_ticker.resetTime();
}
void WebRtcTransportImp::onRtp(const char *buf, size_t len, uint64_t stamp_ms) {
_bytes_usage += len;
_alive_ticker.resetTime();
RtpHeader *rtp = (RtpHeader *) buf;
2021-04-03 08:45:18 +08:00
//根据接收到的rtp的pt信息找到该流的信息
auto it = _pt_to_track.find(rtp->pt);
if (it == _pt_to_track.end()) {
WarnL << "unknown rtp pt:" << (int)rtp->pt;
2021-04-03 08:45:18 +08:00
return;
}
2021-11-18 18:41:23 +08:00
it->second->inputRtp(buf, len, stamp_ms, rtp);
}
void WrappedRtpTrack::inputRtp(const char *buf, size_t len, uint64_t stamp_ms, RtpHeader *rtp) {
#if 0
auto seq = ntohs(rtp->seq);
if (track->media->type == TrackVideo && seq % 100 == 0) {
//此处模拟接受丢包
return;
}
#endif
auto ssrc = ntohl(rtp->ssrc);
//修改ext id至统一
string rid;
2021-10-06 22:17:38 +08:00
auto twcc_ext = track->rtp_ext_ctx->changeRtpExtId(rtp, true, &rid, RtpExtType::transport_cc);
2021-11-18 18:41:23 +08:00
if (twcc_ext) {
_twcc_ctx.onRtp(ssrc, twcc_ext.getTransportCCSeq(), stamp_ms);
2021-10-06 22:17:38 +08:00
}
2021-06-25 17:17:48 +08:00
auto &ref = track->rtp_channel[rid];
if (!ref) {
2021-11-18 18:41:23 +08:00
_transport.createRtpChannel(rid, ssrc, *track);
}
2021-11-18 18:41:23 +08:00
//解析并排序rtp
ref->inputRtp(track->media->type, track->plan_rtp->sample_rate, (uint8_t *) buf, len, false);
}
void WrappedRtxTrack::inputRtp(const char *buf, size_t len, uint64_t stamp_ms, RtpHeader *rtp) {
//修改ext id至统一
string rid;
track->rtp_ext_ctx->changeRtpExtId(rtp, true, &rid, RtpExtType::transport_cc);
auto &ref = track->rtp_channel[rid];
if (!ref) {
//再接收到对应的rtp前丢弃rtx包
WarnL << "unknown rtx rtp, rid:" << rid << ", ssrc:" << ntohl(rtp->ssrc) << ", codec:" << track->plan_rtp->codec << ", seq:" << ntohs(rtp->seq);
2021-05-11 15:48:01 +08:00
return;
2021-05-11 11:18:55 +08:00
}
2021-05-11 15:48:01 +08:00
//这里是rtx重传包
2021-11-18 18:41:23 +08:00
// https://datatracker.ietf.org/doc/html/rfc4588#section-4
2021-05-11 15:48:01 +08:00
auto payload = rtp->getPayloadData();
auto size = rtp->getPayloadSize(len);
if (size < 2) {
return;
}
2021-05-11 15:48:01 +08:00
//前两个字节是原始的rtp的seq
auto origin_seq = payload[0] << 8 | payload[1];
2021-11-18 18:41:23 +08:00
// rtx 转换为 rtp
2021-06-25 17:17:48 +08:00
rtp->pt = track->plan_rtp->pt;
2021-06-25 14:37:11 +08:00
rtp->seq = htons(origin_seq);
2021-06-25 17:10:27 +08:00
rtp->ssrc = htonl(ref->getSSRC());
2021-06-25 14:37:11 +08:00
2021-05-11 15:48:01 +08:00
memmove((uint8_t *) buf + 2, buf, payload - (uint8_t *) buf);
buf += 2;
len -= 2;
2021-06-25 17:17:48 +08:00
ref->inputRtp(track->media->type, track->plan_rtp->sample_rate, (uint8_t *) buf, len, true);
2021-04-03 08:45:18 +08:00
}
2021-06-25 17:17:48 +08:00
// Sends (with flush) an RTPFB NACK feedback packet.
//   track: supplies the sender ssrc (its answer rtp ssrc).
//   nack:  the FCI to carry; FCI_NACK::kSize bytes are copied.
//   ssrc:  the media ssrc the nack refers to.
void WebRtcTransportImp::onSendNack(MediaTrack &track, const FCI_NACK &nack, uint32_t ssrc) {
    auto feedback = RtcpFB::create(RTPFBType::RTCP_RTPFB_NACK, &nack, FCI_NACK::kSize);
    feedback->ssrc = htonl(track.answer_ssrc_rtp);
    feedback->ssrc_media = htonl(ssrc);
    sendRtcpPacket(reinterpret_cast<char *>(feedback.get()), feedback->getSize(), true);
}
2021-10-07 14:22:12 +08:00
void WebRtcTransportImp::onSendTwcc(uint32_t ssrc, const string &twcc_fci) {
auto rtcp = RtcpFB::create(RTPFBType::RTCP_RTPFB_TWCC, twcc_fci.data(), twcc_fci.size());
2022-02-26 22:33:00 +08:00
rtcp->ssrc = htonl(0);
2021-10-07 14:22:12 +08:00
rtcp->ssrc_media = htonl(ssrc);
sendRtcpPacket((char *) rtcp.get(), rtcp->getSize(), true);
}
2021-04-03 08:45:18 +08:00
///////////////////////////////////////////////////////////////////
2021-04-03 08:32:20 +08:00
2021-06-25 17:17:48 +08:00
// Called with every rtp packet after sorting.
// For video tracks, at most once per 2000 (per the pli ticker's elapsedTime()), a PLI
// is sent to request a key frame, which helps non-rtc protocols; when a remb bit rate
// is configured and the answer supports remb, a REMB is sent as well to adjust the
// bit rate. The packet is then passed on to onRecvRtp().
void WebRtcTransportImp::onSortedRtp(MediaTrack &track, const string &rid, RtpPacket::Ptr rtp) {
    const bool pli_due = track.media->type == TrackVideo && _pli_ticker.elapsedTime() > 2000;
    if (pli_due) {
        _pli_ticker.resetTime();
        const auto media_ssrc = rtp->getSSRC();
        sendRtcpPli(media_ssrc);

        GET_CONFIG(size_t, remb_bit_rate, RTC::kRembBitRate);
        if (remb_bit_rate && _answer_sdp->supportRtcpFb(SdpConst::kRembRtcpFb)) {
            sendRtcpRemb(rtp->getSSRC(), remb_bit_rate);
        }
    }
    onRecvRtp(track, rid, std::move(rtp));
}
2021-05-11 15:48:01 +08:00
///////////////////////////////////////////////////////////////////
2021-04-03 08:32:20 +08:00
// Sends one rtp packet to the peer.
//   rtp:   the packet; the first RtpPacket::kRtpTcpHeaderSize bytes are skipped.
//   flush: forwarded to sendRtpPacket().
//   rtx:   true when this is a retransmission triggered by a nack.
// Packets whose track type has no sendable track are ignored.
void WebRtcTransportImp::onSendRtp(const RtpPacket::Ptr &rtp, bool flush, bool rtx) {
    auto &track = _type_to_track[rtp->type];
    if (!track) {
        // Ignore: the peer does not support this codec type.
        return;
    }

    if (rtx) {
        // Sending an rtx retransmission.
        TraceL << "send rtx rtp:" << rtp->getSeq();
    } else {
        // Record rtp send statistics for the sr report and keep the packet for nack.
        track->rtcp_context_send->onRtp(rtp->getSeq(), rtp->getStamp(), rtp->ntp_stamp, rtp->sample_rate, rtp->size() - RtpPacket::kRtpTcpHeaderSize);
        track->nack_list.push_back(rtp);
#if 0
        // simulate packet loss on the sending side
        if (rtp->type == TrackVideo && rtp->getSeq() % 100 == 0) {
            return;
        }
#endif
    }

    // Passed through sendRtpPacket() to onBeforeEncryptRtp() as its opaque context.
    pair<bool/*rtx*/, MediaTrack *> send_ctx{rtx, track.get()};
    sendRtpPacket(rtp->data() + RtpPacket::kRtpTcpHeaderSize, rtp->size() - RtpPacket::kRtpTcpHeaderSize, flush, &send_ctx);
    _bytes_usage += rtp->size() - RtpPacket::kRtpTcpHeaderSize;
}
2021-04-07 17:21:59 +08:00
void WebRtcTransportImp::onBeforeEncryptRtp(const char *buf, int &len, void *ctx) {
2021-06-25 14:37:11 +08:00
auto pr = (pair<bool/*rtx*/, MediaTrack *> *) ctx;
2021-05-12 20:51:51 +08:00
auto header = (RtpHeader *) buf;
2021-05-12 20:51:51 +08:00
if (!pr->first || !pr->second->plan_rtx) {
//普通的rtp,或者不支持rtx, 修改目标pt和ssrc
2021-06-25 15:31:13 +08:00
pr->second->rtp_ext_ctx->changeRtpExtId(header, false);
2021-05-12 20:51:51 +08:00
header->pt = pr->second->plan_rtp->pt;
2021-05-16 16:12:10 +08:00
header->ssrc = htonl(pr->second->answer_ssrc_rtp);
2021-05-12 20:51:51 +08:00
} else {
//重传的rtp, rtx
2021-06-25 15:31:13 +08:00
pr->second->rtp_ext_ctx->changeRtpExtId(header, false);
2021-05-12 20:51:51 +08:00
header->pt = pr->second->plan_rtx->pt;
2021-05-16 16:12:10 +08:00
if (pr->second->answer_ssrc_rtx) {
//有rtx单独的ssrc,有些情况下浏览器支持rtx但是未指定rtx单独的ssrc
header->ssrc = htonl(pr->second->answer_ssrc_rtx);
2021-05-16 18:06:34 +08:00
} else {
//未单独指定rtx的ssrc那么使用rtp的ssrc
header->ssrc = htonl(pr->second->answer_ssrc_rtp);
2021-05-12 20:51:51 +08:00
}
2021-05-11 00:54:33 +08:00
2021-05-12 20:51:51 +08:00
auto origin_seq = ntohs(header->seq);
//seq跟原来的不一样
2021-12-30 15:28:02 +08:00
header->seq = htons(_rtx_seq[pr->second->media->type]);
++_rtx_seq[pr->second->media->type];
2021-05-12 20:51:51 +08:00
auto payload = header->getPayloadData();
auto payload_size = header->getPayloadSize(len);
if (payload_size) {
//rtp负载后移两个字节这两个字节用于存放osn
//https://datatracker.ietf.org/doc/html/rfc4588#section-4
memmove(payload + 2, payload, payload_size);
}
payload[0] = origin_seq >> 8;
payload[1] = origin_seq & 0xFF;
len += 2;
}
2021-05-11 00:54:33 +08:00
}
2021-04-07 17:21:59 +08:00
void WebRtcTransportImp::onShutdown(const SockException &ex){
WarnL << ex.what();
2021-09-10 22:31:44 +08:00
unrefSelf();
for (auto &pr : _history_sessions) {
auto session = pr.second.lock();
if (session) {
session->shutdown(ex);
}
2021-09-10 22:31:44 +08:00
}
2021-04-07 17:21:59 +08:00
}
/**
 * Selects the session currently bound to this transport.
 * The session is also recorded in _history_sessions (keyed by its raw pointer),
 * and the self-reference is dropped afterwards.
 * @param session session to select
 */
void WebRtcTransportImp::setSession(Session::Ptr session) {
    auto *session_key = session.get();
    _history_sessions.emplace(session_key, session);
    _selected_session = std::move(session);
    unrefSelf();
}
/**
 * @return the session most recently passed to setSession()
 */
const Session::Ptr &WebRtcTransportImp::getSession() const
{
    return _selected_session;
}
/**
 * @return the value of the _bytes_usage counter
 */
uint64_t WebRtcTransportImp::getBytesUsage() const
{
    return _bytes_usage;
}
/**
 * @return _alive_ticker.createdTime() divided by 1000
 *         (presumably milliseconds converted to seconds; confirm against the ticker type)
 */
uint64_t WebRtcTransportImp::getDuration() const {
    const auto created_time = _alive_ticker.createdTime();
    return created_time / 1000;
}
/////////////////////////////////////////////////////////////////////////////////////////////
2021-09-10 22:31:44 +08:00
void WebRtcTransportImp::registerSelf() {
_self = static_pointer_cast<WebRtcTransportImp>(shared_from_this());
2021-10-16 10:25:23 +08:00
WebRtcTransportManager::Instance().addItem(getIdentifier(), _self);
}
2021-09-10 22:31:44 +08:00
/**
 * Drops the strong self-reference held in _self.
 */
void WebRtcTransportImp::unrefSelf()
{
    _self = nullptr;
}
void WebRtcTransportImp::unregisterSelf() {
unrefSelf();
2021-10-16 10:25:23 +08:00
WebRtcTransportManager::Instance().removeItem(getIdentifier());
2021-09-10 22:31:44 +08:00
}
2021-10-16 10:25:23 +08:00
/**
 * @return the process-wide WebRtcTransportManager, created on first use
 */
WebRtcTransportManager &WebRtcTransportManager::Instance()
{
    static WebRtcTransportManager manager;
    return manager;
}
2021-10-16 10:25:23 +08:00
2021-10-19 15:23:12 +08:00
/**
 * Stores ptr under key, replacing any existing entry for that key.
 * @param key identifier of the transport
 * @param ptr transport to register
 */
void WebRtcTransportManager::addItem(const string &key, const WebRtcTransportImp::Ptr &ptr)
{
    lock_guard<mutex> guard(_mtx);
    _map[key] = ptr;
}
2021-10-16 10:25:23 +08:00
/**
 * Looks up a registered transport.
 * @param key identifier of the transport
 * @return the transport, or an empty pointer when key is empty, unknown,
 *         or the stored reference can no longer be locked
 */
WebRtcTransportImp::Ptr WebRtcTransportManager::getItem(const string &key) {
    WebRtcTransportImp::Ptr found;
    if (!key.empty()) {
        lock_guard<mutex> guard(_mtx);
        auto pos = _map.find(key);
        if (pos != _map.end()) {
            found = pos->second.lock();
        }
    }
    return found;
}
2021-10-16 10:25:23 +08:00
2021-10-19 15:23:12 +08:00
void WebRtcTransportManager::removeItem(const string &key) {
lock_guard<mutex> lck(_mtx);
_map.erase(key);
}
2021-10-19 15:23:12 +08:00
//////////////////////////////////////////////////////////////////////////////////////////////
/**
 * @return the process-wide WebRtcPluginManager, created on first use
 */
WebRtcPluginManager &WebRtcPluginManager::Instance()
{
    static WebRtcPluginManager manager;
    return manager;
}
void WebRtcPluginManager::registerPlugin(const string &type, Plugin cb) {
lock_guard<mutex> lck(_mtx_creator);
_map_creator[type] = std::move(cb);
}
/**
 * Dispatches an SDP offer to the plugin registered for the given type.
 * @param sender session that issued the request
 * @param type   plugin type name used at registerPlugin()
 * @param offer  offer SDP, forwarded to the plugin
 * @param args   request arguments, forwarded to the plugin
 * @param cb     result callback; receives a WebRtcException when no plugin is
 *               registered for the type, otherwise it is passed on to the plugin
 */
void WebRtcPluginManager::getAnswerSdp(Session &sender, const string &type, const string &offer, const WebRtcArgs &args,
                                       const onCreateRtc &cb) {
    // Copy the plugin out while holding the lock, then release the lock before
    // running any user code. _mtx_creator is a non-recursive mutex: invoking the
    // plugin or cb with it held would deadlock if either re-entered this manager
    // on the same thread, and would serialize all concurrent requests.
    Plugin plugin = nullptr;
    bool found = false;
    {
        lock_guard<mutex> lck(_mtx_creator);
        auto it = _map_creator.find(type);
        if (it != _map_creator.end()) {
            plugin = it->second;
            found = true;
        }
    }

    if (!found) {
        cb(WebRtcException(SockException(Err_other, "the type can not supported")));
        return;
    }
    plugin(sender, offer, args, cb);
}
#include "WebRtcPlayer.h"
#include "WebRtcPusher.h"
#include "WebRtcEchoTest.h"
/**
 * "echo" plugin: creates a WebRtcEchoTest on a poller taken from the
 * EventPollerPool and hands it to cb.
 */
void echo_plugin(Session &sender, const string &offer, const WebRtcArgs &args, const WebRtcPluginManager::onCreateRtc &cb) {
    auto echo_rtc = WebRtcEchoTest::create(EventPollerPool::Instance().getPoller());
    cb(*echo_rtc);
}
/**
 * "push" plugin: authenticates a publish request for args["url"] and, on
 * success, creates a WebRtcPusher bound to an RTSP media source.
 * An existing RTSP source is reused when its ownership can be obtained;
 * otherwise the request fails with "already publishing".
 * @param sender    session that issued the request
 * @param offer_sdp offer SDP (captured by the auth invoker)
 * @param args      request arguments; "url" identifies the stream
 * @param cb        receives the created pusher or a WebRtcException
 */
void push_plugin(Session &sender, const string &offer_sdp, const WebRtcArgs &args, const WebRtcPluginManager::onCreateRtc &cb) {
    MediaInfo info(args["url"]);
    Broadcast::PublishAuthInvoker invoker = [cb, offer_sdp, info](const string &err, bool enable_hls, bool enable_mp4) mutable {
        if (!err.empty()) {
            cb(WebRtcException(SockException(Err_other, err)));
            return;
        }

        RtspMediaSourceImp::Ptr push_src;
        std::shared_ptr<void> push_src_ownership;
        auto existing = MediaSource::find(RTSP_SCHEMA, info._vhost, info._app, info._streamid);
        if (existing) {
            // Try to resume publishing after a disconnect. This only works when the
            // source was produced by an rtsp-style push and its ownership can be taken.
            auto rtsp_src = dynamic_pointer_cast<RtspMediaSourceImp>(existing);
            if (rtsp_src) {
                auto ownership = rtsp_src->getOwnership();
                if (ownership) {
                    push_src = std::move(rtsp_src);
                    push_src_ownership = std::move(ownership);
                }
            }
            if (!push_src) {
                cb(WebRtcException(SockException(Err_other, "already publishing")));
                return;
            }
        }

        if (!push_src) {
            // No existing source: create a fresh one and take its ownership.
            push_src = std::make_shared<RtspMediaSourceImp>(info._vhost, info._app, info._streamid);
            push_src_ownership = push_src->getOwnership();
            push_src->setProtocolTranslation(enable_hls, enable_mp4);
        }

        auto pusher = WebRtcPusher::create(EventPollerPool::Instance().getPoller(), push_src, push_src_ownership, info);
        push_src->setListener(pusher);
        cb(*pusher);
    };

    // Publishing requires authentication.
    auto handled = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish, info, invoker, static_cast<SockInfo &>(sender));
    if (!handled) {
        // Nobody listens to the event: skip authentication by default.
        GET_CONFIG(bool, to_hls, General::kPublishToHls);
        GET_CONFIG(bool, to_mp4, General::kPublishToMP4);
        invoker("", to_hls, to_mp4);
    }
}
/**
 * "play" plugin: authenticates a play request for args["url"] and, on success,
 * looks up the RTSP source asynchronously and creates a WebRtcPlayer for it.
 * @param sender    session that issued the request
 * @param offer_sdp offer SDP (captured by the auth invoker)
 * @param args      request arguments; "url" identifies the stream
 * @param cb        receives the created player or a WebRtcException
 */
void play_plugin(Session &sender, const string &offer_sdp, const WebRtcArgs &args, const WebRtcPluginManager::onCreateRtc &cb) {
    MediaInfo info(args["url"]);
    auto session_ptr = sender.shared_from_this();
    Broadcast::AuthInvoker invoker = [cb, offer_sdp, info, session_ptr](const string &err) mutable {
        if (!err.empty()) {
            cb(WebRtcException(SockException(Err_other, err)));
            return;
        }

        // WebRTC playback is fed from the rtsp source.
        info._schema = RTSP_SCHEMA;
        MediaSource::findAsync(info, session_ptr, [=](const MediaSource::Ptr &src_in) mutable {
            auto rtsp_src = dynamic_pointer_cast<RtspMediaSource>(src_in);
            if (!rtsp_src) {
                cb(WebRtcException(SockException(Err_other, "stream not found")));
                return;
            }
            // Restore the schema to rtc so that hooks can tell which playback protocol is used.
            info._schema = RTC_SCHEMA;
            auto player = WebRtcPlayer::create(EventPollerPool::Instance().getPoller(), rtsp_src, info);
            cb(*player);
        });
    };

    // Broadcast the generic play-url authentication event.
    auto handled = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed, info, invoker, static_cast<SockInfo &>(sender));
    if (!handled) {
        // Nobody listens to the event: skip authentication by default.
        invoker("");
    }
}
// Registers the built-in "echo", "push" and "play" plugins when this static object is constructed.
static onceToken s_rtc_auto_register([]() {
    auto &plugins = WebRtcPluginManager::Instance();
    plugins.registerPlugin("echo", echo_plugin);
    plugins.registerPlugin("push", push_plugin);
    plugins.registerPlugin("play", play_plugin);
});