diff --git a/server/WebApi.cpp b/server/WebApi.cpp index d6379733..eb78f713 100755 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -38,7 +38,8 @@ #include "Rtp/RtpServer.h" #endif #ifdef ENABLE_WEBRTC -#include "../webrtc/WebRtcTransport.h" +#include "../webrtc/WebRtcPlayer.h" +#include "../webrtc/WebRtcPusher.h" #endif using namespace toolkit; @@ -1219,8 +1220,7 @@ void installWebApi() { } //还原成rtc,目的是为了hook时识别哪种播放协议 info._schema = "rtc"; - auto rtc = WebRtcTransportImp::create(EventPollerPool::Instance().getPoller()); - rtc->attach(src, info, true); + auto rtc = WebRtcPlayer::create(EventPollerPool::Instance().getPoller(), src, info); val["sdp"] = rtc->getAnswerSdp(offer_sdp); val["type"] = "answer"; invoker(200, headerOut, val.toStyledString()); @@ -1248,9 +1248,8 @@ void installWebApi() { } auto push_src = std::make_shared(info._vhost, info._app, info._streamid); push_src->setProtocolTranslation(enableHls, enableMP4); - auto rtc = WebRtcTransportImp::create(EventPollerPool::Instance().getPoller()); + auto rtc = WebRtcPusher::create(EventPollerPool::Instance().getPoller(), push_src, info); push_src->setListener(rtc); - rtc->attach(push_src, info, false); val["sdp"] = rtc->getAnswerSdp(offer_sdp); val["type"] = "answer"; invoker(200, headerOut, val.toStyledString()); diff --git a/webrtc/WebRtcPlayer.cpp b/webrtc/WebRtcPlayer.cpp new file mode 100644 index 00000000..85dcf869 --- /dev/null +++ b/webrtc/WebRtcPlayer.cpp @@ -0,0 +1,101 @@ +/* + * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/xia-chu/ZLMediaKit). + * + * Use of this source code is governed by MIT license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#include "WebRtcPlayer.h" + +WebRtcPlayer::Ptr WebRtcPlayer::create(const EventPoller::Ptr &poller, + const RtspMediaSource::Ptr &src, + const MediaInfo &info) { + WebRtcPlayer::Ptr ret(new WebRtcPlayer(poller, src, info), [](WebRtcPlayer *ptr) { + ptr->onDestory(); + delete ptr; + }); + ret->onCreate(); + return ret; +} + +WebRtcPlayer::WebRtcPlayer(const EventPoller::Ptr &poller, + const RtspMediaSource::Ptr &src, + const MediaInfo &info) : WebRtcTransportImp(poller) { + InfoL << this; + _media_info = info; + _play_src = src; + CHECK(_play_src); +} + +void WebRtcPlayer::onStartWebRTC() { + CHECK(_play_src); + WebRtcTransportImp::onStartWebRTC(); + if (canSendRtp()) { + _play_src->pause(false); + _reader = _play_src->getRing()->attach(getPoller(), true); + weak_ptr weak_self = static_pointer_cast(shared_from_this()); + _reader->setReadCB([weak_self](const RtspMediaSource::RingDataType &pkt) { + auto strongSelf = weak_self.lock(); + if (!strongSelf) { + return; + } + size_t i = 0; + pkt->for_each([&](const RtpPacket::Ptr &rtp) { + strongSelf->beforeSendRtp(rtp, ++i == pkt->size()); + }); + }); + _reader->setDetachCB([weak_self]() { + auto strongSelf = weak_self.lock(); + if (!strongSelf) { + return; + } + strongSelf->onShutdown(SockException(Err_shutdown, "rtsp ring buffer detached")); + }); + + //确保该rtp codec类型对方支持 + memset(_can_send_rtp, 0, sizeof(_can_send_rtp)); + for (auto &m : _answer_sdp->media) { + _can_send_rtp[m.type] = m.direction == RtpDirection::sendonly || m.direction == RtpDirection::sendrecv; + } + } + //使用完毕后,释放强引用,这样确保推流器断开后能及时注销媒体 + _play_src = nullptr; +} + +void WebRtcPlayer::onDestory() { + WebRtcTransportImp::onDestory(); + + auto duration = getDuration(); + auto bytes_usage = getBytesUsage(); + //流量统计事件广播 + GET_CONFIG(uint32_t, iFlowThreshold, General::kFlowThreshold); + if (_reader && getSession()) { + WarnL << "RTC播放器(" + << _media_info._vhost << "/" + << _media_info._app << "/" + << _media_info._streamid + << ")结束播放,耗时(s):" << duration; + if (bytes_usage >= iFlowThreshold * 1024) { + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, bytes_usage, duration, + true, static_cast(*getSession())); + } + } +} + +void WebRtcPlayer::onRtcConfigure(RtcConfigure &configure) const { + CHECK(_play_src); + WebRtcTransportImp::onRtcConfigure(configure); + //这是播放 + configure.audio.direction = configure.video.direction = RtpDirection::sendonly; + configure.setPlayRtspInfo(_play_src->getSdp()); +} + +void WebRtcPlayer::beforeSendRtp(const RtpPacket::Ptr &rtp, bool flush, bool rtx) { + if (!_can_send_rtp[rtp->type]) { + return; + } + onSendRtp(rtp, flush, rtx); +} diff --git a/webrtc/WebRtcPlayer.h b/webrtc/WebRtcPlayer.h new file mode 100644 index 00000000..4ca5eb75 --- /dev/null +++ b/webrtc/WebRtcPlayer.h @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/xia-chu/ZLMediaKit). + * + * Use of this source code is governed by MIT license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef ZLMEDIAKIT_WEBRTCPLAYER_H +#define ZLMEDIAKIT_WEBRTCPLAYER_H + +#include "WebRtcTransport.h" + +class WebRtcPlayer : public WebRtcTransportImp { +public: + using Ptr = std::shared_ptr; + ~WebRtcPlayer() override = default; + static Ptr create(const EventPoller::Ptr &poller, const RtspMediaSource::Ptr &src, const MediaInfo &info); + +protected: + ///////WebRtcTransportImp override/////// + void onStartWebRTC() override; + void onDestory() override; + void onRtcConfigure(RtcConfigure &configure) const override; + void onRecvRtp(MediaTrack &track, const string &rid, RtpPacket::Ptr rtp) {}; + +private: + WebRtcPlayer(const EventPoller::Ptr &poller, const RtspMediaSource::Ptr &src, const MediaInfo &info); + void beforeSendRtp(const RtpPacket::Ptr &rtp, bool flush, bool rtx = false); + +private: + bool _can_send_rtp[TrackMax]; + //媒体相关元数据 + MediaInfo _media_info; + //播放的rtsp源 + RtspMediaSource::Ptr _play_src; + //播放rtsp源的reader对象 + RtspMediaSource::RingType::RingReader::Ptr _reader; +}; + + +#endif //ZLMEDIAKIT_WEBRTCPLAYER_H diff --git a/webrtc/WebRtcPusher.cpp b/webrtc/WebRtcPusher.cpp new file mode 100644 index 00000000..b0d5e6d5 --- /dev/null +++ b/webrtc/WebRtcPusher.cpp @@ -0,0 +1,131 @@ +/* + * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/xia-chu/ZLMediaKit). + * + * Use of this source code is governed by MIT license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#include "WebRtcPusher.h" + +WebRtcPusher::Ptr WebRtcPusher::create(const EventPoller::Ptr &poller, + const RtspMediaSource::Ptr &src, + const MediaInfo &info) { + WebRtcPusher::Ptr ret(new WebRtcPusher(poller, src, info), [](WebRtcPusher *ptr) { + ptr->onDestory(); + delete ptr; + }); + ret->onCreate(); + return ret; +} + +WebRtcPusher::WebRtcPusher(const EventPoller::Ptr &poller, + const RtspMediaSource::Ptr &src, + const MediaInfo &info) : WebRtcTransportImp(poller) { + InfoL << this; + _media_info = info; + _push_src = src; + CHECK(_push_src); +} + +bool WebRtcPusher::close(MediaSource &sender, bool force) { + //此回调在其他线程触发 + if (!force && totalReaderCount(sender)) { + return false; + } + string err = StrPrinter << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" + << sender.getApp() << "/" << sender.getId() << " " << force; + weak_ptr weak_self = static_pointer_cast(shared_from_this()); + getPoller()->async([weak_self, err]() { + auto strong_self = weak_self.lock(); + if (strong_self) { + strong_self->onShutdown(SockException(Err_shutdown, err)); + } + }); + return true; +} + +int WebRtcPusher::totalReaderCount(MediaSource &sender) { + auto total_count = 0; + for (auto &src : _push_src_simulcast) { + total_count += src.second->totalReaderCount(); + } + return total_count + _push_src->totalReaderCount(); +} + +MediaOriginType WebRtcPusher::getOriginType(MediaSource &sender) const { + return MediaOriginType::rtc_push; +} + +string WebRtcPusher::getOriginUrl(MediaSource &sender) const { + return _media_info._full_url; +} + +std::shared_ptr WebRtcPusher::getOriginSock(MediaSource &sender) const { + return static_pointer_cast(getSession()); +} + +void WebRtcPusher::onRecvRtp(MediaTrack &track, const string &rid, RtpPacket::Ptr rtp) { + if (!_simulcast) { + assert(_push_src); + _push_src->onWrite(rtp, false); + return; + } + + if (rtp->type == TrackAudio) { + //音频 + for (auto &pr : _push_src_simulcast) { + pr.second->onWrite(rtp, false); + } + } else { + //视频 + auto &src = _push_src_simulcast[rid]; + if (!src) { + auto stream_id = rid.empty() ? _push_src->getId() : _push_src->getId() + "_" + rid; + auto src_imp = std::make_shared(_push_src->getVhost(), _push_src->getApp(), stream_id); + src_imp->setSdp(_push_src->getSdp()); + src_imp->setProtocolTranslation(_push_src->isRecording(Recorder::type_hls), + _push_src->isRecording(Recorder::type_mp4)); + src_imp->setListener(static_pointer_cast(shared_from_this())); + src = src_imp; + } + src->onWrite(std::move(rtp), false); + } +} + +void WebRtcPusher::onStartWebRTC() { + WebRtcTransportImp::onStartWebRTC(); + _simulcast = _answer_sdp->supportSimulcast(); + if (canRecvRtp()) { + _push_src->setSdp(_answer_sdp->toRtspSdp()); + } +} + +void WebRtcPusher::onDestory() { + WebRtcTransportImp::onDestory(); + + auto duration = getDuration(); + auto bytes_usage = getBytesUsage(); + //流量统计事件广播 + GET_CONFIG(uint32_t, iFlowThreshold, General::kFlowThreshold); + + if (getSession()) { + WarnL << "RTC推流器(" + << _media_info._vhost << "/" + << _media_info._app << "/" + << _media_info._streamid + << ")结束推流,耗时(s):" << duration; + if (bytes_usage >= iFlowThreshold * 1024) { + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, bytes_usage, duration, + false, static_cast(*getSession())); + } + } +} + +void WebRtcPusher::onRtcConfigure(RtcConfigure &configure) const { + WebRtcTransportImp::onRtcConfigure(configure); + //这只是推流 + configure.audio.direction = configure.video.direction = RtpDirection::recvonly; +} \ No newline at end of file diff --git a/webrtc/WebRtcPusher.h b/webrtc/WebRtcPusher.h new file mode 100644 index 00000000..30408d45 --- /dev/null +++ b/webrtc/WebRtcPusher.h @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/xia-chu/ZLMediaKit). + * + * Use of this source code is governed by MIT license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef ZLMEDIAKIT_WEBRTCPUSHER_H +#define ZLMEDIAKIT_WEBRTCPUSHER_H + +#include "WebRtcTransport.h" + +class WebRtcPusher : public WebRtcTransportImp, public MediaSourceEvent { +public: + using Ptr = std::shared_ptr; + ~WebRtcPusher() override = default; + static Ptr create(const EventPoller::Ptr &poller, const RtspMediaSource::Ptr &src, const MediaInfo &info); + +protected: + ///////WebRtcTransportImp override/////// + void onStartWebRTC() override; + void onDestory() override; + void onRtcConfigure(RtcConfigure &configure) const override; + void onRecvRtp(MediaTrack &track, const string &rid, RtpPacket::Ptr rtp) override; + +protected: + ///////MediaSourceEvent override/////// + // 关闭 + bool close(MediaSource &sender, bool force) override; + // 播放总人数 + int totalReaderCount(MediaSource &sender) override; + // 获取媒体源类型 + MediaOriginType getOriginType(MediaSource &sender) const override; + // 获取媒体源url或者文件路径 + string getOriginUrl(MediaSource &sender) const override; + // 获取媒体源客户端相关信息 + std::shared_ptr getOriginSock(MediaSource &sender) const override; + +private: + WebRtcPusher(const EventPoller::Ptr &poller, const RtspMediaSource::Ptr &src, const MediaInfo &info); + +private: + bool _simulcast = false; + //媒体相关元数据 + MediaInfo _media_info; + //推流的rtsp源 + RtspMediaSource::Ptr _push_src; + //推流的rtsp源,支持simulcast + unordered_map _push_src_simulcast; +}; + +#endif //ZLMEDIAKIT_WEBRTCPUSHER_H diff --git a/webrtc/WebRtcTransport.cpp b/webrtc/WebRtcTransport.cpp index 5aab8e10..fbbbe0b5 100644 --- a/webrtc/WebRtcTransport.cpp +++ b/webrtc/WebRtcTransport.cpp @@ -15,6 +15,7 @@ #include "Rtcp/RtcpFCI.h" #include "Rtsp/RtpReceiver.h" +#define RTP_SSRC_OFFSET 1 #define RTX_SSRC_OFFSET 2 #define RTP_CNAME "zlmediakit-rtp" #define RTP_LABEL "zlmediakit-label" @@ -207,16 +208,16 @@ std::string WebRtcTransport::getAnswerSdp(const string &offer){ } } -bool is_dtls(char *buf) { +static bool is_dtls(char *buf) { return ((*buf > 19) && (*buf < 64)); } -bool is_rtp(char *buf) { +static bool is_rtp(char *buf) { RtpHeader *header = (RtpHeader *) buf; return ((header->pt < 64) || (header->pt >= 96)); } -bool is_rtcp(char *buf) { +static bool is_rtcp(char *buf) { RtpHeader *header = (RtpHeader *) buf; return ((header->pt >= 64) && (header->pt < 96)); } @@ -285,14 +286,6 @@ void WebRtcTransport::sendRtcpPacket(const char *buf, int len, bool flush, void } /////////////////////////////////////////////////////////////////////////////////// -WebRtcTransportImp::Ptr WebRtcTransportImp::create(const EventPoller::Ptr &poller){ - WebRtcTransportImp::Ptr ret(new WebRtcTransportImp(poller), [](WebRtcTransportImp *ptr){ - ptr->onDestory(); - delete ptr; - }); - ret->onCreate(); - return ret; -} void WebRtcTransportImp::onCreate(){ WebRtcTransport::onCreate(); @@ -327,46 +320,6 @@ WebRtcTransportImp::~WebRtcTransportImp() { void WebRtcTransportImp::onDestory() { WebRtcTransport::onDestory(); unregisterSelf(); - - if (!_session) { - return; - } - - uint64_t duration = _alive_ticker.createdTime() / 1000; - //流量统计事件广播 - GET_CONFIG(uint32_t, iFlowThreshold, General::kFlowThreshold); - - if (_reader) { - WarnL << "RTC播放器(" - << _media_info._vhost << "/" - << _media_info._app << "/" - << _media_info._streamid - << ")结束播放,耗时(s):" << duration; - if (_bytes_usage >= iFlowThreshold * 1024) { - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _bytes_usage, duration, true, static_cast(*_session)); - } - } - - if (_push_src) { - WarnL << "RTC推流器(" - << _media_info._vhost << "/" - << _media_info._app << "/" - << _media_info._streamid - << ")结束推流,耗时(s):" << duration; - if (_bytes_usage >= iFlowThreshold * 1024) { - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _bytes_usage, duration, false, static_cast(*_session)); - } - } -} - -void WebRtcTransportImp::attach(const RtspMediaSource::Ptr &src, const MediaInfo &info, bool is_play) { - assert(src); - _media_info = info; - if (is_play) { - _play_src = src; - } else { - _push_src = src; - } } void WebRtcTransportImp::onSendSockData(const char *buf, size_t len, struct sockaddr_in *dst, bool flush) { @@ -384,9 +337,6 @@ void WebRtcTransportImp::onSendSockData(const char *buf, size_t len, struct sock /////////////////////////////////////////////////////////////////// bool WebRtcTransportImp::canSendRtp() const{ - if (!_play_src) { - return false; - } for (auto &m : _answer_sdp->media) { if (m.direction == RtpDirection::sendrecv || m.direction == RtpDirection::sendonly) { return true; @@ -396,9 +346,6 @@ bool WebRtcTransportImp::canSendRtp() const{ } bool WebRtcTransportImp::canRecvRtp() const{ - if (!_push_src) { - return false; - } for (auto &m : _answer_sdp->media) { if (m.direction == RtpDirection::sendrecv || m.direction == RtpDirection::recvonly) { return true; @@ -422,6 +369,8 @@ void WebRtcTransportImp::onStartWebRTC() { track->plan_rtx = m_answer.getRelatedRtxPlan(track->plan_rtp->pt); track->rtcp_context_send = std::make_shared(); + //rtp track type --> MediaTrack + _type_to_track[m_answer.type] = track; //send ssrc --> MediaTrack _ssrc_to_track[track->answer_ssrc_rtp] = track; _ssrc_to_track[track->answer_ssrc_rtx] = track; @@ -460,50 +409,6 @@ void WebRtcTransportImp::onStartWebRTC() { } } } - - if (canRecvRtp()) { - _push_src->setSdp(_answer_sdp->toRtspSdp()); - _simulcast = _answer_sdp->supportSimulcast(); - } - if (canSendRtp()) { - RtcSession rtsp_send_sdp; - rtsp_send_sdp.loadFrom(_play_src->getSdp(), false); - for (auto &m : _answer_sdp->media) { - if (m.type == TrackApplication) { - continue; - } - auto rtsp_media = rtsp_send_sdp.getMedia(m.type); - if (rtsp_media && getCodecId(rtsp_media->plan[0].codec) == getCodecId(m.plan[0].codec)) { - auto it = _pt_to_track.find(m.plan[0].pt); - CHECK(it != _pt_to_track.end()); - //记录发送rtp时约定的信息,届时发送rtp时需要修改pt和ssrc - _type_to_track[m.type] = it->second.second; - } - } - - _play_src->pause(false); - _reader = _play_src->getRing()->attach(getPoller(), true); - weak_ptr weak_self = static_pointer_cast(shared_from_this()); - _reader->setReadCB([weak_self](const RtspMediaSource::RingDataType &pkt) { - auto strongSelf = weak_self.lock(); - if (!strongSelf) { - return; - } - size_t i = 0; - pkt->for_each([&](const RtpPacket::Ptr &rtp) { - strongSelf->onSendRtp(rtp, ++i == pkt->size()); - }); - }); - _reader->setDetachCB([weak_self](){ - auto strongSelf = weak_self.lock(); - if (!strongSelf) { - return; - } - strongSelf->onShutdown(SockException(Err_shutdown, "rtsp ring buffer detached")); - }); - } - //使用完毕后,释放强引用,这样确保推流器断开后能及时注销媒体 - _play_src = nullptr; } void WebRtcTransportImp::onCheckAnswer(RtcSession &sdp) { @@ -533,7 +438,8 @@ void WebRtcTransportImp::onCheckAnswer(RtcSession &sdp) { //添加answer sdp的ssrc信息 m.rtp_rtx_ssrc.emplace_back(); auto &ssrc = m.rtp_rtx_ssrc.back(); - ssrc.ssrc = _play_src->getSsrc(m.type); + //发送的ssrc我们随便定义,因为在发送rtp时会修改为此值 + ssrc.ssrc = m.type + RTP_SSRC_OFFSET; ssrc.cname = RTP_CNAME; ssrc.label = RTP_LABEL; ssrc.mslabel = RTP_MSLABEL; @@ -557,20 +463,6 @@ void WebRtcTransportImp::onCheckSdp(SdpType type, RtcSession &sdp) { void WebRtcTransportImp::onRtcConfigure(RtcConfigure &configure) const { WebRtcTransport::onRtcConfigure(configure); - - if (_play_src) { - //这是播放,同时也可能有推流 - configure.video.direction = _push_src ? RtpDirection::sendrecv : RtpDirection::sendonly; - configure.audio.direction = configure.video.direction; - configure.setPlayRtspInfo(_play_src->getSdp()); - } else if (_push_src) { - //这只是推流 - configure.video.direction = RtpDirection::recvonly; - configure.audio.direction = RtpDirection::recvonly; - } else { - throw std::invalid_argument("未设置播放或推流的媒体源"); - } - //添加接收端口candidate信息 configure.addCandidate(*getIceCandidate()); } @@ -872,35 +764,12 @@ void WebRtcTransportImp::onSortedRtp(MediaTrack &track, const string &rid, RtpPa } } - if (!_simulcast) { - assert(_push_src); - _push_src->onWrite(rtp, false); - return; - } - - if (rtp->type == TrackAudio) { - //音频 - for (auto &pr : _push_src_simulcast) { - pr.second->onWrite(rtp, false); - } - } else { - //视频 - auto &src = _push_src_simulcast[rid]; - if (!src) { - auto stream_id = rid.empty() ? _push_src->getId() : _push_src->getId() + "_" + rid; - auto src_imp = std::make_shared(_push_src->getVhost(), _push_src->getApp(), stream_id); - src_imp->setSdp(_push_src->getSdp()); - src_imp->setProtocolTranslation(_push_src->isRecording(Recorder::type_hls),_push_src->isRecording(Recorder::type_mp4)); - src_imp->setListener(static_pointer_cast(shared_from_this())); - src = src_imp; - } - src->onWrite(std::move(rtp), false); - } + onRecvRtp(track, rid, std::move(rtp)); } /////////////////////////////////////////////////////////////////// -void WebRtcTransportImp::onSendRtp(const RtpPacket::Ptr &rtp, bool flush, bool rtx){ +void WebRtcTransportImp::onSendRtp(const RtpPacket::Ptr &rtp, bool flush, bool rtx) { auto &track = _type_to_track[rtp->type]; if (!track) { //忽略,对方不支持该编码类型 @@ -969,48 +838,24 @@ void WebRtcTransportImp::onShutdown(const SockException &ex){ } } -///////////////////////////////////////////////////////////////////////////////////////////// - -bool WebRtcTransportImp::close(MediaSource &sender, bool force) { - //此回调在其他线程触发 - if (!force && totalReaderCount(sender)) { - return false; - } - string err = StrPrinter << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force; - weak_ptr weak_self = static_pointer_cast(shared_from_this()); - getPoller()->async([weak_self, err]() { - auto strong_self = weak_self.lock(); - if (strong_self) { - strong_self->onShutdown(SockException(Err_shutdown, err)); - } - }); - return true; -} - -int WebRtcTransportImp::totalReaderCount(MediaSource &sender) { - auto total_count = 0; - for (auto &src : _push_src_simulcast) { - total_count += src.second->totalReaderCount(); - } - return total_count + _push_src->totalReaderCount(); -} - -MediaOriginType WebRtcTransportImp::getOriginType(MediaSource &sender) const { - return MediaOriginType::rtc_push; -} - -string WebRtcTransportImp::getOriginUrl(MediaSource &sender) const { - return _media_info._full_url; -} - -std::shared_ptr WebRtcTransportImp::getOriginSock(MediaSource &sender) const { - return static_pointer_cast(_session); -} - void WebRtcTransportImp::setSession(Session::Ptr session) { _session = std::move(session); } +const Session::Ptr &WebRtcTransportImp::getSession() const { + return _session; +} + +uint64_t WebRtcTransportImp::getBytesUsage() const{ + return _bytes_usage; +} + +uint64_t WebRtcTransportImp::getDuration() const{ + return _alive_ticker.createdTime() / 1000; +} + +///////////////////////////////////////////////////////////////////////////////////////////// + class WebRtcTransportManager { mutable mutex _mtx; unordered_map > _map; diff --git a/webrtc/WebRtcTransport.h b/webrtc/WebRtcTransport.h index 29ce2550..38109083 100644 --- a/webrtc/WebRtcTransport.h +++ b/webrtc/WebRtcTransport.h @@ -116,8 +116,6 @@ protected: virtual void onBeforeEncryptRtcp(const char *buf, int &len, void *ctx) = 0; protected: - RtcSession::Ptr _offer_sdp; - RtcSession::Ptr _answer_sdp; RTC::TransportTuple* getSelectedTuple() const; void sendRtcpRemb(uint32_t ssrc, size_t bit_rate); void sendRtcpPli(uint32_t ssrc); @@ -126,6 +124,10 @@ private: void onSendSockData(const char *buf, size_t len, bool flush = true); void setRemoteDtlsFingerprint(const RtcSession &remote); +protected: + RtcSession::Ptr _offer_sdp; + RtcSession::Ptr _answer_sdp; + private: uint8_t _srtp_buf[2000]; string _key; @@ -159,7 +161,7 @@ public: std::shared_ptr getRtpChannel(uint32_t ssrc) const; }; -class WebRtcTransportImp : public WebRtcTransport, public MediaSourceEvent{ +class WebRtcTransportImp : public WebRtcTransport { public: using Ptr = std::shared_ptr; ~WebRtcTransportImp() override; @@ -169,20 +171,19 @@ public: * @param poller 改对象需要绑定的线程 * @return 对象 */ - static Ptr create(const EventPoller::Ptr &poller); static Ptr get(const string &key); // 借用 static Ptr move(const string &key); // 所有权转移 void setSession(Session::Ptr session); - - /** - * 绑定rtsp媒体源 - * @param src 媒体源 - * @param is_play 是播放还是推流 - */ - void attach(const RtspMediaSource::Ptr &src, const MediaInfo &info, bool is_play = true); + const Session::Ptr& getSession() const; + uint64_t getBytesUsage() const; + uint64_t getDuration() const; + bool canSendRtp() const; + bool canRecvRtp() const; + void onSendRtp(const RtpPacket::Ptr &rtp, bool flush, bool rtx = false); protected: + WebRtcTransportImp(const EventPoller::Ptr &poller); void onStartWebRTC() override; void onSendSockData(const char *buf, size_t len, struct sockaddr_in *dst, bool flush = true) override; void onCheckSdp(SdpType type, RtcSession &sdp) override; @@ -192,30 +193,13 @@ protected: void onRtcp(const char *buf, size_t len) override; void onBeforeEncryptRtp(const char *buf, int &len, void *ctx) override; void onBeforeEncryptRtcp(const char *buf, int &len, void *ctx) override {}; - - void onShutdown(const SockException &ex) override; - - ///////MediaSourceEvent override/////// - // 关闭 - bool close(MediaSource &sender, bool force) override; - // 播放总人数 - int totalReaderCount(MediaSource &sender) override; - // 获取媒体源类型 - MediaOriginType getOriginType(MediaSource &sender) const override; - // 获取媒体源url或者文件路径 - string getOriginUrl(MediaSource &sender) const override; - // 获取媒体源客户端相关信息 - std::shared_ptr getOriginSock(MediaSource &sender) const override; - -private: - WebRtcTransportImp(const EventPoller::Ptr &poller); void onCreate() override; void onDestory() override; - void onSendRtp(const RtpPacket::Ptr &rtp, bool flush, bool rtx = false); - SdpAttrCandidate::Ptr getIceCandidate() const; - bool canSendRtp() const; - bool canRecvRtp() const; + void onShutdown(const SockException &ex) override; + virtual void onRecvRtp(MediaTrack &track, const string &rid, RtpPacket::Ptr rtp) = 0; +private: + SdpAttrCandidate::Ptr getIceCandidate() const; void onSortedRtp(MediaTrack &track, const string &rid, RtpPacket::Ptr rtp); void onSendNack(MediaTrack &track, const FCI_NACK &nack, uint32_t ssrc); void onSendTwcc(uint32_t ssrc, const string &twcc_fci); @@ -226,14 +210,11 @@ private: void onCheckAnswer(RtcSession &sdp); private: - bool _simulcast = false; uint16_t _rtx_seq[2] = {0, 0}; //用掉的总流量 uint64_t _bytes_usage = 0; //保持自我强引用 Ptr _self; - //媒体相关元数据 - MediaInfo _media_info; //检测超时的定时器 Timer::Ptr _timer; //刷新计时器 @@ -242,18 +223,12 @@ private: Ticker _pli_ticker; //udp session Session::Ptr _session; - //推流的rtsp源 - RtspMediaSource::Ptr _push_src; - unordered_map _push_src_simulcast; - //播放的rtsp源 - RtspMediaSource::Ptr _play_src; - //播放rtsp源的reader对象 - RtspMediaSource::RingType::RingReader::Ptr _reader; + //twcc rtcp发送上下文对象 + TwccContext _twcc_ctx; //根据发送rtp的track类型获取相关信息 MediaTrack::Ptr _type_to_track[2]; - //根据接收rtp的pt获取相关信息 - unordered_map > _pt_to_track; //根据rtcp的ssrc获取相关信息,收发rtp和rtx的ssrc都会记录 unordered_map _ssrc_to_track; - TwccContext _twcc_ctx; + //根据接收rtp的pt获取相关信息 + unordered_map > _pt_to_track; }; \ No newline at end of file