diff --git a/server/WebApi.cpp b/server/WebApi.cpp index 22c2feea..1ae2b0cd 100755 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -1082,19 +1082,87 @@ void installWebApi() { #ifdef ENABLE_WEBRTC static list rtcs; - api_regist("/index/api/webrtc",[](API_ARGS_STRING){ + api_regist("/index/api/webrtc",[](API_ARGS_STRING_ASYNC){ CHECK_ARGS("app", "stream"); - auto src = dynamic_pointer_cast(MediaSource::find(RTSP_SCHEMA, DEFAULT_VHOST, allArgs.getUrlArgs()["app"], allArgs.getUrlArgs()["stream"])); - if (!src) { - throw ApiRetException("流不存在", API::NotFound); - } - headerOut["Content-Type"] = "text/plain"; + + auto offer_sdp = allArgs.Content(); + auto type = allArgs.getUrlArgs()["type"]; + MediaInfo info(StrPrinter << "rtc://" << headerIn["Host"] << "/" << allArgs.getUrlArgs()["app"] << "/" << allArgs.getUrlArgs()["stream"] << "?" << allArgs.Params()); + + //设置返回类型 + headerOut["Content-Type"] = HttpFileManager::getContentType(".json"); + //设置跨域 headerOut["Access-Control-Allow-Origin"] = "*"; - auto rtc = WebRtcTransportImp::create(EventPollerPool::Instance().getPoller()); - rtc->attach(src); - val["sdp"] = rtc->getAnswerSdp(allArgs.Content()); - val["type"] = "answer"; - rtcs.emplace_back(rtc); + + if (type.empty() || !strcasecmp(type.data(), "play")) { + Broadcast::AuthInvoker authInvoker = [invoker, offer_sdp, val, info, headerOut](const string &err) mutable { + try { + auto src = dynamic_pointer_cast(MediaSource::find(RTSP_SCHEMA, info._vhost, info._app, info._streamid)); + if (!src) { + throw runtime_error("流不存在"); + } + if (!err.empty()) { + throw runtime_error(StrPrinter << "播放鉴权失败:" << err); + } + auto rtc = WebRtcTransportImp::create(EventPollerPool::Instance().getPoller()); + rtc->attach(src); + val["sdp"] = rtc->getAnswerSdp(offer_sdp); + val["type"] = "answer"; + rtcs.emplace_back(rtc); + invoker(200, headerOut, val.toStyledString()); + } catch (std::exception &ex) { + val["code"] = API::Exception; + val["msg"] = ex.what(); + invoker(200, headerOut, val.toStyledString()); + } + }; + + //广播通用播放url鉴权事件 + auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed, info, authInvoker, sender); + if (!flag) { + //该事件无人监听,默认不鉴权 + authInvoker(""); + } + return; + } + + if (!strcasecmp(type.data(), "push")) { + Broadcast::PublishAuthInvoker authInvoker = [invoker, offer_sdp, val, info, headerOut](const string &err, bool enableHls, bool enableMP4) mutable { + try { + auto src = dynamic_pointer_cast(MediaSource::find(RTSP_SCHEMA, info._vhost, info._app, info._streamid)); + if (src) { + throw std::runtime_error("已经在推流"); + } + if (!err.empty()) { + throw runtime_error(StrPrinter << "推流鉴权失败:" << err); + } + auto push_src = std::make_shared(info._vhost, info._app, info._streamid); + push_src->setProtocolTranslation(enableHls, enableMP4); + auto rtc = WebRtcTransportImp::create(EventPollerPool::Instance().getPoller()); + rtc->attach(push_src); + val["sdp"] = rtc->getAnswerSdp(offer_sdp); + val["type"] = "answer"; + rtcs.emplace_back(rtc); + invoker(200, headerOut, val.toStyledString()); + } catch (std::exception &ex) { + val["code"] = API::Exception; + val["msg"] = ex.what(); + invoker(200, headerOut, val.toStyledString()); + } + }; + + //rtsp推流需要鉴权 + auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish, info, authInvoker, sender); + if (!flag) { + //该事件无人监听,默认不鉴权 + GET_CONFIG(bool, toHls, General::kPublishToHls); + GET_CONFIG(bool, toMP4, General::kPublishToMP4); + authInvoker("", toHls, toMP4); + } + return; + } + + throw ApiRetException("不支持该类型", API::InvalidArgs); }); #endif diff --git a/webrtc/WebRtcTransport.cpp b/webrtc/WebRtcTransport.cpp index 9920157e..0e32859b 100644 --- a/webrtc/WebRtcTransport.cpp +++ b/webrtc/WebRtcTransport.cpp @@ -6,10 +6,10 @@ #define RTP_CNAME "zlmediakit-rtp" #define RTX_CNAME "zlmediakit-rtx" - WebRtcTransport::WebRtcTransport(const EventPoller::Ptr &poller) { + _poller = poller; _dtls_transport = std::make_shared(poller, this); - _ice_server = std::make_shared(this, makeRandStr(4), makeRandStr(24)); + _ice_server = std::make_shared(this, makeRandStr(4), makeRandStr(28).substr(4)); } void WebRtcTransport::onDestory(){ @@ -17,6 +17,10 @@ void WebRtcTransport::onDestory(){ _ice_server = nullptr; } +const EventPoller::Ptr& WebRtcTransport::getPoller() const{ + return _poller; +} + ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// void WebRtcTransport::OnIceServerSendStunPacket(const RTC::IceServer *iceServer, const RTC::StunPacket *packet, RTC::TransportTuple *tuple) { @@ -127,10 +131,7 @@ std::string WebRtcTransport::getAnswerSdp(const string &offer){ //// 生成answer sdp //// _answer_sdp = configure.createAnswer(*_offer_sdp); onCheckSdp(SdpType::answer, *_answer_sdp); - - auto str = _answer_sdp->toString(); - TraceL << "\r\n" << str; - return str; + return _answer_sdp->toString(); } bool is_dtls(char *buf) { @@ -247,35 +248,33 @@ bool WebRtcTransportImp::canRecvRtp() const{ } void WebRtcTransportImp::onStartWebRTC() { - if (canRecvRtp()) { - _push_src = std::make_shared(DEFAULT_VHOST, "live", "push"); - auto rtsp_sdp = getSdp(SdpType::answer).toRtspSdp(); - _push_src->setSdp(rtsp_sdp); - - for (auto &m : getSdp(SdpType::offer).media) { - if (m.type == TrackVideo) { - _recv_video_ssrc = m.rtp_ssrc.ssrc; - } - for (auto &plan : m.plan) { - auto hit_pan = getSdp(SdpType::answer).getMedia(m.type)->getPlan(plan.pt); - if (!hit_pan) { - continue; - } - //获取offer端rtp的ssrc和pt相关信息 - auto &ref = _rtp_receiver[plan.pt]; - _ssrc_info[m.rtp_ssrc.ssrc] = &ref; - ref.plan = &plan; - ref.media = &m; - ref.is_common_rtp = getCodecId(plan.codec) != CodecInvalid; - ref.rtcp_context_recv = std::make_shared(ref.plan->sample_rate, true); - ref.rtcp_context_send = std::make_shared(ref.plan->sample_rate, false); - ref.receiver = std::make_shared([&ref, this](RtpPacket::Ptr rtp) { - onSortedRtp(ref, std::move(rtp)); - }, [ref, this](const RtpPacket::Ptr &rtp) { - onBeforeSortedRtp(ref, rtp); - }); - } + for (auto &m : getSdp(SdpType::offer).media) { + if (m.type == TrackVideo) { + _recv_video_ssrc = m.rtp_ssrc.ssrc; } + for (auto &plan : m.plan) { + auto hit_pan = getSdp(SdpType::answer).getMedia(m.type)->getPlan(plan.pt); + if (!hit_pan) { + continue; + } + //获取offer端rtp的ssrc和pt相关信息 + auto &ref = _rtp_info_pt[plan.pt]; + _rtp_info_ssrc[m.rtp_ssrc.ssrc] = &ref; + ref.plan = &plan; + ref.media = &m; + ref.is_common_rtp = getCodecId(plan.codec) != CodecInvalid; + ref.rtcp_context_recv = std::make_shared(ref.plan->sample_rate, true); + ref.rtcp_context_send = std::make_shared(ref.plan->sample_rate, false); + ref.receiver = std::make_shared([&ref, this](RtpPacket::Ptr rtp) { + onSortedRtp(ref, std::move(rtp)); + }, [ref, this](const RtpPacket::Ptr &rtp) { + onBeforeSortedRtp(ref, rtp); + }); + } + } + + if (canRecvRtp()) { + _src->setSdp(getSdp(SdpType::answer).toRtspSdp()); } if (canSendRtp()) { _reader = _src->getRing()->attach(_socket->getPoller(), true); @@ -320,22 +319,31 @@ void WebRtcTransportImp::onCheckSdp(SdpType type, RtcSession &sdp) const{ void WebRtcTransportImp::onRtcConfigure(RtcConfigure &configure) const { WebRtcTransport::onRtcConfigure(configure); - _rtsp_send_sdp.loadFrom(_src->getSdp(), false); - //根据rtsp流的相关信息,设置rtc最佳编码 - for (auto &m : _rtsp_send_sdp.media) { - switch (m.type) { - case TrackVideo: { - configure.video.preferred_codec.insert(configure.video.preferred_codec.begin(), getCodecId(m.plan[0].codec)); - break; + if (!_src->getSdp().empty()) { + //这是播放 + configure.video.direction = RtpDirection::sendonly; + configure.audio.direction = RtpDirection::sendonly; + _rtsp_send_sdp.loadFrom(_src->getSdp(), false); + //根据rtsp流的相关信息,设置rtc最佳编码 + for (auto &m : _rtsp_send_sdp.media) { + switch (m.type) { + case TrackVideo: { + configure.video.preferred_codec.insert(configure.video.preferred_codec.begin(), getCodecId(m.plan[0].codec)); + break; + } + case TrackAudio: { + configure.audio.preferred_codec.insert(configure.audio.preferred_codec.begin(),getCodecId(m.plan[0].codec)); + break; + } + default: + break; } - case TrackAudio: { - configure.audio.preferred_codec.insert(configure.audio.preferred_codec.begin(),getCodecId(m.plan[0].codec)); - break; - } - default: - break; } + } else { + //这是推流 + configure.video.direction = RtpDirection::recvonly; + configure.audio.direction = RtpDirection::recvonly; } //添加接收端口candidate信息 @@ -395,8 +403,8 @@ void WebRtcTransportImp::onRtcp(const char *buf, size_t len) { case RtcpType::RTCP_SR : { //对方汇报rtp发送情况 RtcpSR *sr = (RtcpSR *) rtcp; - auto it = _ssrc_info.find(sr->ssrc); - if (it != _ssrc_info.end()) { + auto it = _rtp_info_ssrc.find(sr->ssrc); + if (it != _rtp_info_ssrc.end()) { it->second->rtcp_context_recv->onRtcp(sr); auto rr = it->second->rtcp_context_recv->createRtcpRR(sr->items.ssrc, sr->ssrc); sendRtcpPacket(rr->data(), rr->size(), true); @@ -407,8 +415,8 @@ void WebRtcTransportImp::onRtcp(const char *buf, size_t len) { case RtcpType::RTCP_RR : { //对方汇报rtp接收情况 RtcpRR *rr = (RtcpRR *) rtcp; - auto it = _ssrc_info.find(rr->ssrc); - if (it != _ssrc_info.end()) { + auto it = _rtp_info_ssrc.find(rr->ssrc); + if (it != _rtp_info_ssrc.end()) { auto sr = it->second->rtcp_context_send->createRtcpSR(rr->items.ssrc); sendRtcpPacket(sr->data(), sr->size(), true); InfoL << "send rtcp sr"; @@ -431,8 +439,8 @@ void WebRtcTransportImp::onRtcp(const char *buf, size_t len) { void WebRtcTransportImp::onRtp(const char *buf, size_t len) { RtpHeader *rtp = (RtpHeader *) buf; //根据接收到的rtp的pt信息,找到该流的信息 - auto it = _rtp_receiver.find(rtp->pt); - if (it == _rtp_receiver.end()) { + auto it = _rtp_info_pt.find(rtp->pt); + if (it == _rtp_info_pt.end()) { WarnL; return; } @@ -458,7 +466,7 @@ void WebRtcTransportImp::onSortedRtp(const RtpPayloadInfo &info, RtpPacket::Ptr sendRtcpPacket((char *) pli.get(), sizeof(RtcpPli), true); InfoL << "send pli"; } - _push_src->onWrite(std::move(rtp), false); + _src->onWrite(std::move(rtp), false); } void WebRtcTransportImp::onBeforeSortedRtp(const RtpPayloadInfo &info, const RtpPacket::Ptr &rtp) { @@ -474,5 +482,5 @@ void WebRtcTransportImp::onSendRtp(const RtpPacket::Ptr &rtp, bool flush){ } sendRtpPacket(rtp->data() + RtpPacket::kRtpTcpHeaderSize, rtp->size() - RtpPacket::kRtpTcpHeaderSize, flush, pt); //统计rtp发送情况,好做sr汇报 - _rtp_receiver[pt].rtcp_context_send->onRtp(rtp->getSeq(), rtp->getStampMS(), rtp->size() - RtpPacket::kRtpTcpHeaderSize); + _rtp_info_pt[pt].rtcp_context_send->onRtp(rtp->getSeq(), rtp->getStampMS(), rtp->size() - RtpPacket::kRtpTcpHeaderSize); } diff --git a/webrtc/WebRtcTransport.h b/webrtc/WebRtcTransport.h index 3495fe84..000c4cc7 100644 --- a/webrtc/WebRtcTransport.h +++ b/webrtc/WebRtcTransport.h @@ -44,10 +44,14 @@ public: * 发送rtp * @param buf rtcp内容 * @param len rtcp长度 + * @param flush 是否flush socket + * @param pt rtp payload type */ void sendRtpPacket(char *buf, size_t len, bool flush, uint8_t pt); void sendRtcpPacket(char *buf, size_t len, bool flush); + const EventPoller::Ptr& getPoller() const; + protected: //// dtls相关的回调 //// void OnDtlsTransportConnecting(const RTC::DtlsTransport *dtlsTransport) override {}; @@ -89,6 +93,7 @@ private: void setRemoteDtlsFingerprint(const RtcSession &remote); private: + EventPoller::Ptr _poller; std::shared_ptr _ice_server; std::shared_ptr _dtls_transport; std::shared_ptr _srtp_session_send; @@ -148,17 +153,16 @@ private: void onBeforeSortedRtp(const RtpPayloadInfo &info,const RtpPacket::Ptr &rtp); private: - Socket::Ptr _socket; - RtspMediaSource::Ptr _src; - RtspMediaSource::RingType::RingReader::Ptr _reader; - RtcSession _answer_sdp; - mutable RtcSession _rtsp_send_sdp; - mutable uint8_t _send_rtp_pt[2] = {0, 0}; - RtspMediaSourceImp::Ptr _push_src; - unordered_map _rtp_receiver; - unordered_map _ssrc_info; uint32_t _recv_video_ssrc; + mutable uint8_t _send_rtp_pt[2] = {0, 0}; Ticker _pli_ticker; + Socket::Ptr _socket; + RtcSession _answer_sdp; + RtspMediaSource::Ptr _src; + mutable RtcSession _rtsp_send_sdp; + RtspMediaSource::RingType::RingReader::Ptr _reader; + unordered_map _rtp_info_pt; + unordered_map _rtp_info_ssrc; };