diff --git a/webrtc/Sdp.cpp b/webrtc/Sdp.cpp index 5f487482..f83b2b7e 100644 --- a/webrtc/Sdp.cpp +++ b/webrtc/Sdp.cpp @@ -923,9 +923,74 @@ std::shared_ptr wrapSdpAttr(SdpItem::Ptr item){ return ret; } -string RtcSession::toString() const{ +static void toRtsp(vector &items) { + for (auto it = items.begin(); it != items.end();) { + switch ((*it)->getKey()[0]) { + case 'v': + case 'o': + case 's': + case 'i': + case 't': + case 'c': + case 'b':{ + ++it; + break; + } + + case 'm': { + auto m = dynamic_pointer_cast(*it); + CHECK(m); + m->proto = "RTP/AVP"; + ++it; + break; + } + case 'a': { + auto attr = dynamic_pointer_cast(*it); + CHECK(attr); + if (!strcasecmp(attr->detail->getKey(), "rtpmap") + || !strcasecmp(attr->detail->getKey(), "fmtp")) { + ++it; + break; + } + } + default: { + it = items.erase(it); + break; + } + } + } +} + +string RtcSession::toRtspSdp() const{ checkValid(); - RtcSessionSdp sdp; + RtcSession copy = *this; + copy.media.clear(); + for (auto &m : media) { + switch (m.type) { + case TrackAudio: + case TrackVideo: { + copy.media.emplace_back(m); + copy.media.back().plan.resize(1); + break; + } + default: + continue; + } + } + + auto sdp = copy.toRtcSessionSdp(); + toRtsp(sdp->items); + int i = 0; + for (auto &m : sdp->medias) { + toRtsp(m.items); + m.items.push_back(wrapSdpAttr(std::make_shared("control", string("trackID=") + to_string(i++)))); + } + return sdp->toString(); +} + +RtcSessionSdp::Ptr RtcSession::toRtcSessionSdp() const{ + RtcSessionSdp::Ptr ret = std::make_shared(); + auto &sdp = *ret; sdp.items.emplace_back(std::make_shared >(to_string(version))); sdp.items.emplace_back(std::make_shared(origin)); sdp.items.emplace_back(std::make_shared >(session_name)); @@ -1076,7 +1141,12 @@ string RtcSession::toString() const{ sdp_media.items.emplace_back(wrapSdpAttr(std::make_shared(cand))); } } - return sdp.toString(); + return ret; +} + +string RtcSession::toString() const{ + checkValid(); + return toRtcSessionSdp()->toString(); } string RtcCodecPlan::getFmtp(const char *key) const{ @@ -1146,7 +1216,7 @@ void RtcSession::checkValid() const{ } } -RtcMedia *RtcSession::getMedia(TrackType type){ +const RtcMedia *RtcSession::getMedia(TrackType type) const{ for(auto &m : media){ if(m.type == type){ return &m; diff --git a/webrtc/Sdp.h b/webrtc/Sdp.h index c6720618..0d0f972a 100644 --- a/webrtc/Sdp.h +++ b/webrtc/Sdp.h @@ -550,6 +550,8 @@ public: class RtcSessionSdp : public RtcSdpBase{ public: + using Ptr = std::shared_ptr; + vector medias; void parse(const string &str); string toString() const override; @@ -652,7 +654,11 @@ public: void loadFrom(const string &sdp, bool check = true); void checkValid() const; string toString() const; - RtcMedia *getMedia(TrackType type); + string toRtspSdp() const; + const RtcMedia *getMedia(TrackType type) const; + +private: + RtcSessionSdp::Ptr toRtcSessionSdp() const; }; class RtcConfigure { diff --git a/webrtc/WebRtcTransport.cpp b/webrtc/WebRtcTransport.cpp index e35381c6..46ff8477 100644 --- a/webrtc/WebRtcTransport.cpp +++ b/webrtc/WebRtcTransport.cpp @@ -1,6 +1,11 @@ #include "WebRtcTransport.h" #include #include "Rtcp/Rtcp.h" +#include "Rtsp/RtpReceiver.h" +#define RTX_SSRC_OFFSET 2 +#define RTP_CNAME "zlmediakit-rtp" +#define RTX_CNAME "zlmediakit-rtx" + WebRtcTransport::WebRtcTransport(const EventPoller::Ptr &poller) { _dtls_transport = std::make_shared(poller, this); @@ -51,7 +56,7 @@ void WebRtcTransport::OnDtlsTransportConnected( std::string &remoteCert) { InfoL; _srtp_session_send = std::make_shared(RTC::SrtpSession::Type::OUTBOUND, srtpCryptoSuite, srtpLocalKey, srtpLocalKeyLen); - _srtp_session_recv = std::make_shared(RTC::SrtpSession::Type::OUTBOUND, srtpCryptoSuite, srtpRemoteKey, srtpRemoteKeyLen); + _srtp_session_recv = std::make_shared(RTC::SrtpSession::Type::INBOUND, srtpCryptoSuite, srtpRemoteKey, srtpRemoteKeyLen); onStartWebRTC(); } @@ -209,6 +214,29 @@ void WebRtcTransportImp::attach(const RtspMediaSource::Ptr &src) { } void WebRtcTransportImp::onStartWebRTC() { + if (canRecvRtp()) { + _push_src = std::make_shared(DEFAULT_VHOST, "live", "push"); + auto rtsp_sdp = getSdp(SdpType::answer).toRtspSdp(); + _push_src->setSdp(rtsp_sdp); + + for (auto &m : getSdp(SdpType::offer).media) { + for (auto &plan : m.plan) { + auto hit_pan = getSdp(SdpType::answer).getMedia(m.type)->getPlan(plan.pt); + if (!hit_pan) { + continue; + } + auto &ref = _rtp_receiver[plan.pt]; + ref.plan = &plan; + ref.media = &m; + ref.is_common_rtp = getCodecId(plan.codec) != CodecInvalid; + ref.receiver = std::make_shared([&ref, this](RtpPacket::Ptr rtp) { + onSortedRtp(ref, std::move(rtp)); + }, [ref, this](const RtpPacket::Ptr &rtp) { + onBeforeSortedRtp(ref, rtp); + }); + } + } + } if (!canSendRtp()) { return; } @@ -244,6 +272,11 @@ bool WebRtcTransportImp::canSendRtp() const{ return sdp.media[0].direction == RtpDirection::sendrecv || sdp.media[0].direction == RtpDirection::sendonly; } +bool WebRtcTransportImp::canRecvRtp() const{ + auto &sdp = getSdp(SdpType::answer); + return sdp.media[0].direction == RtpDirection::sendrecv || sdp.media[0].direction == RtpDirection::recvonly; +} + void WebRtcTransportImp::onCheckSdp(SdpType type, RtcSession &sdp) const{ WebRtcTransport::onCheckSdp(type, sdp); if (type != SdpType::answer || !canSendRtp()) { @@ -255,7 +288,12 @@ void WebRtcTransportImp::onCheckSdp(SdpType type, RtcSession &sdp) const{ continue; } m.rtp_ssrc.ssrc = _src->getSsrc(m.type); - m.rtp_ssrc.cname = "zlmediakit-rtc"; + m.rtp_ssrc.cname = RTP_CNAME; + //todo 先屏蔽rtx,因为chrome报错 + if (false && m.getRelatedRtxPlan(m.plan[0].pt)) { + m.rtx_ssrc.ssrc = RTX_SSRC_OFFSET + m.rtp_ssrc.ssrc; + m.rtx_ssrc.cname = RTX_CNAME; + } auto rtsp_media = _rtsp_send_sdp.getMedia(m.type); if (rtsp_media && getCodecId(rtsp_media->plan[0].codec) == getCodecId(m.plan[0].codec)) { _send_rtp_pt[m.type] = m.plan[0].pt; @@ -309,14 +347,60 @@ SdpAttrCandidate::Ptr WebRtcTransportImp::getIceCandidate() const{ return candidate; } +class RtpReceiverImp : public RtpReceiver { +public: + RtpReceiverImp( function cb, function cb_before = nullptr){ + _on_sort = std::move(cb); + _on_before_sort = std::move(cb_before); + } + + ~RtpReceiverImp() override = default; + + bool inputRtp(TrackType type, int samplerate, uint8_t *ptr, size_t len){ + return handleOneRtp((int) type, type, samplerate, ptr, len); + } + +protected: + void onRtpSorted(RtpPacket::Ptr rtp, int track_index) override { + _on_sort(std::move(rtp)); + } + + void onBeforeRtpSorted(const RtpPacket::Ptr &rtp, int track_index) override { + if (_on_before_sort) { + _on_before_sort(rtp); + } + } + +private: + function _on_sort; + function _on_before_sort; +}; + void WebRtcTransportImp::onRtp(const char *buf, size_t len) { RtpHeader *rtp = (RtpHeader *) buf; + auto it = _rtp_receiver.find(rtp->pt); + if (it == _rtp_receiver.end()) { + WarnL; + return; + } + auto &info = it->second; + info.receiver->inputRtp(info.media->type, info.plan->sample_rate, (uint8_t *) buf, len); } void WebRtcTransportImp::onRtcp(const char *buf, size_t len) { RtcpHeader *rtcp = (RtcpHeader *) buf; } +void WebRtcTransportImp::onSortedRtp(const RtpPayloadInfo &info, RtpPacket::Ptr rtp) { + if(!info.is_common_rtp){ + WarnL; + } + _push_src->onWrite(std::move(rtp), true); +} + +void WebRtcTransportImp::onBeforeSortedRtp(const RtpPayloadInfo &info, const RtpPacket::Ptr &rtp) { + +} /////////////////////////////////////////////////////////////////// diff --git a/webrtc/WebRtcTransport.h b/webrtc/WebRtcTransport.h index 441be5db..d754f2ca 100644 --- a/webrtc/WebRtcTransport.h +++ b/webrtc/WebRtcTransport.h @@ -9,7 +9,7 @@ #include "Sdp.h" #include "Poller/EventPoller.h" #include "Network/Socket.h" -#include "Rtsp/RtspMediaSource.h" +#include "Rtsp/RtspMediaSourceImp.h" using namespace toolkit; using namespace mediakit; @@ -95,6 +95,8 @@ private: RtcSession::Ptr _answer_sdp; }; +class RtpReceiverImp; + class WebRtcTransportImp : public WebRtcTransport, public std::enable_shared_from_this{ public: using Ptr = std::shared_ptr; @@ -128,6 +130,18 @@ private: void onSendRtp(const RtpPacket::Ptr &rtp, bool flush); SdpAttrCandidate::Ptr getIceCandidate() const; bool canSendRtp() const; + bool canRecvRtp() const; + + class RtpPayloadInfo { + public: + bool is_common_rtp; + const RtcCodecPlan *plan; + const RtcMedia *media; + std::shared_ptr receiver; + }; + + void onSortedRtp(const RtpPayloadInfo &info,RtpPacket::Ptr rtp); + void onBeforeSortedRtp(const RtpPayloadInfo &info,const RtpPacket::Ptr &rtp); private: Socket::Ptr _socket; @@ -136,6 +150,8 @@ private: RtcSession _answer_sdp; mutable RtcSession _rtsp_send_sdp; mutable uint8_t _send_rtp_pt[2] = {0, 0}; + RtspMediaSourceImp::Ptr _push_src; + unordered_map _rtp_receiver; };