基本完成webrtc单端口改造

This commit is contained in:
xiongziliang 2021-09-10 22:31:44 +08:00
parent 7ba44d1ac8
commit d2a0b1e3e6
7 changed files with 98 additions and 55 deletions

@ -1 +1 @@
Subproject commit b8e066222b757a2c11b5e44c49ef6982acb95fe2 Subproject commit eb89de6e349202c9b6c85d55544faec0cdc7d581

View File

@ -227,6 +227,10 @@ timeoutSec=15
timeoutSec=15 timeoutSec=15
#本机对rtc客户端的可见ip作为服务器时一般为公网ip置空时会自动获取网卡ip #本机对rtc客户端的可见ip作为服务器时一般为公网ip置空时会自动获取网卡ip
externIP= externIP=
#rtc udp服务器监听端口号所有rtc客户端将通过该端口传输stun/dtls/srtp/srtcp数据
#该端口是多线程的,同时支持客户端网络切换导致的连接迁移
#需要注意的是如果服务器在nat内需要做端口映射时必须确保外网映射端口跟该端口一致
port=8000
#设置remb比特率非0时关闭twcc并开启remb。该设置在rtc推流时有效可以控制推流画质 #设置remb比特率非0时关闭twcc并开启remb。该设置在rtc推流时有效可以控制推流画质
rembBitRate=1000000 rembBitRate=1000000

View File

@ -165,7 +165,7 @@ namespace RTC
EC_KEY* ecKey{ nullptr }; EC_KEY* ecKey{ nullptr };
X509_NAME* certName{ nullptr }; X509_NAME* certName{ nullptr };
std::string subject = std::string subject =
std::string("mediasoup") + std::to_string(rand() % 999999 + 100000); std::string("mediasoup") + to_string(rand() % 999999 + 100000);
// Create key with curve. // Create key with curve.
ecKey = EC_KEY_new_by_curve_name(NID_X9_62_prime256v1); ecKey = EC_KEY_new_by_curve_name(NID_X9_62_prime256v1);

View File

@ -45,7 +45,7 @@ EventPoller::Ptr WebRtcSession::getPoller(const Buffer::Ptr &buffer) {
if (user_name.empty()) { if (user_name.empty()) {
return nullptr; return nullptr;
} }
auto ret = WebRtcTransportImp::getTransport(user_name); auto ret = WebRtcTransportImp::getRtcTransport(user_name, false);
return ret ? ret->getPoller() : nullptr; return ret ? ret->getPoller() : nullptr;
} }
@ -60,24 +60,34 @@ void WebRtcSession::onRecv(const Buffer::Ptr &buffer) {
WarnL << user_name; WarnL << user_name;
return; return;
} }
_transport = WebRtcTransportImp::getTransport(user_name); _transport = WebRtcTransportImp::getRtcTransport(user_name, true);
if (!_transport) { if (!_transport) {
//逻辑分支不太可能走到这里 //逻辑分支不太可能走到这里
WarnL << user_name; WarnL << user_name;
return; return;
} }
_transport->setSession(this); _transport->setSession(shared_from_this());
} }
_ticker.resetTime();
_transport->inputSockData(buf, len, &_peer_addr); _transport->inputSockData(buf, len, &_peer_addr);
} }
void WebRtcSession::onError(const SockException &err) { void WebRtcSession::onError(const SockException &err) {
if (_transport) { //udp链接超时但是rtc链接不一定超时因为可能存在udp链接迁移的情况
_transport->unrefSelf(err); //在udp链接迁移时新的WebRtcSession对象将接管WebRtcTransport对象的生命周期
//本WebRtcSession对象将在超时后自动销毁
WarnP(this) << err.what();
_transport = nullptr; _transport = nullptr;
}
} }
void WebRtcSession::onManager() { void WebRtcSession::onManager() {
GET_CONFIG(float, timeoutSec, RTC::kTimeOutSec);
if (!_transport && _ticker.createdTime() > timeoutSec * 1000) {
shutdown(SockException(Err_timeout, "illegal webrtc connection"));
return;
}
if (_ticker.elapsedTime() > timeoutSec * 1000) {
shutdown(SockException(Err_timeout, "webrtc connection timeout"));
return;
}
} }

View File

@ -30,6 +30,7 @@ public:
void onManager() override; void onManager() override;
private: private:
Ticker _ticker;
struct sockaddr _peer_addr; struct sockaddr _peer_addr;
std::shared_ptr<WebRtcTransportImp> _transport; std::shared_ptr<WebRtcTransportImp> _transport;
}; };

View File

@ -50,45 +50,21 @@ void WebRtcTransport::onCreate(){
_key = to_string(reinterpret_cast<uint64_t>(this)); _key = to_string(reinterpret_cast<uint64_t>(this));
_dtls_transport = std::make_shared<RTC::DtlsTransport>(_poller, this); _dtls_transport = std::make_shared<RTC::DtlsTransport>(_poller, this);
_ice_server = std::make_shared<RTC::IceServer>(this, _key, makeRandStr(24)); _ice_server = std::make_shared<RTC::IceServer>(this, _key, makeRandStr(24));
refSelf();
} }
void WebRtcTransport::onDestory(){ void WebRtcTransport::onDestory(){
_dtls_transport = nullptr; _dtls_transport = nullptr;
_ice_server = nullptr; _ice_server = nullptr;
unrefSelf(SockException());
}
static mutex s_rtc_mtx;
static unordered_map<string, weak_ptr<WebRtcTransportImp> > s_rtc_map;
void WebRtcTransport::refSelf() {
_self = shared_from_this();
lock_guard<mutex> lck(s_rtc_mtx);
s_rtc_map[_key] = static_pointer_cast<WebRtcTransportImp>(_self);
}
void WebRtcTransport::unrefSelf(const SockException &ex) {
_self = nullptr;
lock_guard<mutex> lck(s_rtc_mtx);
s_rtc_map.erase(_key);
}
WebRtcTransportImp::Ptr WebRtcTransportImp::getTransport(const string &key){
lock_guard<mutex> lck(s_rtc_mtx);
auto it = s_rtc_map.find(key);
if (it == s_rtc_map.end()) {
return nullptr;
}
return it->second.lock();
} }
const EventPoller::Ptr& WebRtcTransport::getPoller() const{ const EventPoller::Ptr& WebRtcTransport::getPoller() const{
return _poller; return _poller;
} }
const string &WebRtcTransport::getKey() const {
return _key;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
void WebRtcTransport::OnIceServerSendStunPacket(const RTC::IceServer *iceServer, const RTC::StunPacket *packet, RTC::TransportTuple *tuple) { void WebRtcTransport::OnIceServerSendStunPacket(const RTC::IceServer *iceServer, const RTC::StunPacket *packet, RTC::TransportTuple *tuple) {
@ -330,6 +306,8 @@ WebRtcTransportImp::Ptr WebRtcTransportImp::create(const EventPoller::Ptr &polle
void WebRtcTransportImp::onCreate(){ void WebRtcTransportImp::onCreate(){
WebRtcTransport::onCreate(); WebRtcTransport::onCreate();
registerSelf();
weak_ptr<WebRtcTransportImp> weak_self = static_pointer_cast<WebRtcTransportImp>(shared_from_this()); weak_ptr<WebRtcTransportImp> weak_self = static_pointer_cast<WebRtcTransportImp>(shared_from_this());
GET_CONFIG(float, timeoutSec, RTC::kTimeOutSec); GET_CONFIG(float, timeoutSec, RTC::kTimeOutSec);
_timer = std::make_shared<Timer>(timeoutSec / 2, [weak_self]() { _timer = std::make_shared<Timer>(timeoutSec / 2, [weak_self]() {
@ -354,8 +332,14 @@ WebRtcTransportImp::~WebRtcTransportImp() {
void WebRtcTransportImp::onDestory() { void WebRtcTransportImp::onDestory() {
WebRtcTransport::onDestory(); WebRtcTransport::onDestory();
uint64_t duration = _alive_ticker.createdTime() / 1000; unregisterSelf();
auto session = _session.lock();
if (!session) {
return;
}
uint64_t duration = _alive_ticker.createdTime() / 1000;
//流量统计事件广播 //流量统计事件广播
GET_CONFIG(uint32_t, iFlowThreshold, General::kFlowThreshold); GET_CONFIG(uint32_t, iFlowThreshold, General::kFlowThreshold);
@ -366,7 +350,7 @@ void WebRtcTransportImp::onDestory() {
<< _media_info._streamid << _media_info._streamid
<< ")结束播放,耗时(s):" << duration; << ")结束播放,耗时(s):" << duration;
if (_bytes_usage >= iFlowThreshold * 1024) { if (_bytes_usage >= iFlowThreshold * 1024) {
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _bytes_usage, duration, true, *static_cast<SockInfo *>(_session)); NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _bytes_usage, duration, true, static_cast<SockInfo &>(*session));
} }
} }
@ -377,7 +361,7 @@ void WebRtcTransportImp::onDestory() {
<< _media_info._streamid << _media_info._streamid
<< ")结束推流,耗时(s):" << duration; << ")结束推流,耗时(s):" << duration;
if (_bytes_usage >= iFlowThreshold * 1024) { if (_bytes_usage >= iFlowThreshold * 1024) {
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _bytes_usage, duration, false, *static_cast<SockInfo *>(_session)); NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _bytes_usage, duration, false, static_cast<SockInfo &>(*session));
} }
} }
} }
@ -393,9 +377,14 @@ void WebRtcTransportImp::attach(const RtspMediaSource::Ptr &src, const MediaInfo
} }
void WebRtcTransportImp::onSendSockData(const char *buf, size_t len, struct sockaddr_in *dst, bool flush) { void WebRtcTransportImp::onSendSockData(const char *buf, size_t len, struct sockaddr_in *dst, bool flush) {
auto session = _session.lock();
if (!session) {
WarnL << "send data failed:" << len;
return;
}
auto ptr = BufferRaw::create(); auto ptr = BufferRaw::create();
ptr->assign(buf, len); ptr->assign(buf, len);
_session->send(std::move(ptr)); session->send(std::move(ptr));
} }
/////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////
@ -966,8 +955,11 @@ void WebRtcTransportImp::onBeforeEncryptRtp(const char *buf, int &len, void *ctx
void WebRtcTransportImp::onShutdown(const SockException &ex){ void WebRtcTransportImp::onShutdown(const SockException &ex){
WarnL << ex.what(); WarnL << ex.what();
unrefSelf(ex); unrefSelf();
_session->shutdown(ex); auto session = _session.lock();
if (session) {
session->shutdown(ex);
}
} }
///////////////////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////////////////
@ -999,9 +991,43 @@ string WebRtcTransportImp::getOriginUrl(MediaSource &sender) const {
} }
std::shared_ptr<SockInfo> WebRtcTransportImp::getOriginSock(MediaSource &sender) const { std::shared_ptr<SockInfo> WebRtcTransportImp::getOriginSock(MediaSource &sender) const {
return static_pointer_cast<SockInfo>(const_cast<Session *>(_session)->shared_from_this()); return static_pointer_cast<SockInfo>(_session.lock());
} }
void WebRtcTransportImp::setSession(Session *session) { void WebRtcTransportImp::setSession(weak_ptr<Session> session) {
_session = session; _session = std::move(session);
}
static mutex s_rtc_mtx;
static unordered_map<string, weak_ptr<WebRtcTransportImp> > s_rtc_map;
void WebRtcTransportImp::registerSelf() {
_self = static_pointer_cast<WebRtcTransportImp>(shared_from_this());
lock_guard<mutex> lck(s_rtc_mtx);
s_rtc_map[getKey()] = static_pointer_cast<WebRtcTransportImp>(_self);
}
void WebRtcTransportImp::unrefSelf() {
_self = nullptr;
}
void WebRtcTransportImp::unregisterSelf() {
unrefSelf();
lock_guard<mutex> lck(s_rtc_mtx);
s_rtc_map.erase(getKey());
}
WebRtcTransportImp::Ptr WebRtcTransportImp::getRtcTransport(const string &key, bool unref_self){
lock_guard<mutex> lck(s_rtc_mtx);
auto it = s_rtc_map.find(key);
if (it == s_rtc_map.end()) {
return nullptr;
}
auto ret = it->second.lock();
if (unref_self) {
//此对象不再强引用自己因为自己将被WebRtcSession对象持有
ret->unrefSelf();
}
return ret;
} }

View File

@ -31,6 +31,7 @@ using namespace mediakit;
//RTC配置项目 //RTC配置项目
namespace RTC { namespace RTC {
extern const string kPort; extern const string kPort;
extern const string kTimeOutSec;
}//namespace RTC }//namespace RTC
class WebRtcTransport : public RTC::DtlsTransport::Listener, public RTC::IceServer::Listener, public std::enable_shared_from_this<WebRtcTransport> { class WebRtcTransport : public RTC::DtlsTransport::Listener, public RTC::IceServer::Listener, public std::enable_shared_from_this<WebRtcTransport> {
@ -39,8 +40,6 @@ public:
WebRtcTransport(const EventPoller::Ptr &poller); WebRtcTransport(const EventPoller::Ptr &poller);
~WebRtcTransport() override = default; ~WebRtcTransport() override = default;
void unrefSelf(const SockException &ex);
/** /**
* *
*/ */
@ -77,6 +76,7 @@ public:
void sendRtcpPacket(const char *buf, int len, bool flush, void *ctx = nullptr); void sendRtcpPacket(const char *buf, int len, bool flush, void *ctx = nullptr);
const EventPoller::Ptr& getPoller() const; const EventPoller::Ptr& getPoller() const;
const string& getKey() const;
protected: protected:
//// dtls相关的回调 //// //// dtls相关的回调 ////
@ -123,7 +123,6 @@ protected:
private: private:
void onSendSockData(const char *buf, size_t len, bool flush = true); void onSendSockData(const char *buf, size_t len, bool flush = true);
void setRemoteDtlsFingerprint(const RtcSession &remote); void setRemoteDtlsFingerprint(const RtcSession &remote);
void refSelf();
private: private:
uint8_t _srtp_buf[2000]; uint8_t _srtp_buf[2000];
@ -135,8 +134,6 @@ private:
std::shared_ptr<RTC::SrtpSession> _srtp_session_recv; std::shared_ptr<RTC::SrtpSession> _srtp_session_recv;
RtcSession::Ptr _offer_sdp; RtcSession::Ptr _offer_sdp;
RtcSession::Ptr _answer_sdp; RtcSession::Ptr _answer_sdp;
//保持自我强引用
WebRtcTransport::Ptr _self;
}; };
class RtpChannel; class RtpChannel;
@ -172,9 +169,9 @@ public:
* @return * @return
*/ */
static Ptr create(const EventPoller::Ptr &poller); static Ptr create(const EventPoller::Ptr &poller);
static Ptr getTransport(const string &key); static Ptr getRtcTransport(const string &key, bool unref_self);
void setSession(Session *session); void setSession(weak_ptr<Session> session);
/** /**
* rtsp媒体源 * rtsp媒体源
@ -220,12 +217,17 @@ private:
void onSortedRtp(MediaTrack &track, const string &rid, RtpPacket::Ptr rtp); void onSortedRtp(MediaTrack &track, const string &rid, RtpPacket::Ptr rtp);
void onSendNack(MediaTrack &track, const FCI_NACK &nack, uint32_t ssrc); void onSendNack(MediaTrack &track, const FCI_NACK &nack, uint32_t ssrc);
void createRtpChannel(const string &rid, uint32_t ssrc, MediaTrack &track); void createRtpChannel(const string &rid, uint32_t ssrc, MediaTrack &track);
void registerSelf();
void unregisterSelf();
void unrefSelf();
private: private:
bool _simulcast = false; bool _simulcast = false;
uint16_t _rtx_seq[2] = {0, 0}; uint16_t _rtx_seq[2] = {0, 0};
//用掉的总流量 //用掉的总流量
uint64_t _bytes_usage = 0; uint64_t _bytes_usage = 0;
//保持自我强引用
Ptr _self;
//媒体相关元数据 //媒体相关元数据
MediaInfo _media_info; MediaInfo _media_info;
//检测超时的定时器 //检测超时的定时器
@ -235,7 +237,7 @@ private:
//pli rtcp计时器 //pli rtcp计时器
Ticker _pli_ticker; Ticker _pli_ticker;
//udp session //udp session
Session *_session; weak_ptr<Session> _session;
//推流的rtsp源 //推流的rtsp源
RtspMediaSource::Ptr _push_src; RtspMediaSource::Ptr _push_src;
unordered_map<string/*rid*/, RtspMediaSource::Ptr> _push_src_simulcast; unordered_map<string/*rid*/, RtspMediaSource::Ptr> _push_src_simulcast;