diff --git a/webrtc/WebRtcTransport.cpp b/webrtc/WebRtcTransport.cpp index 5321d987..eaf7b97c 100644 --- a/webrtc/WebRtcTransport.cpp +++ b/webrtc/WebRtcTransport.cpp @@ -224,6 +224,12 @@ void WebRtcTransportImp::attach(const RtspMediaSource::Ptr &src) { _src = src; } +void WebRtcTransportImp::onSendSockData(const char *buf, size_t len, struct sockaddr_in *dst, bool flush) { + auto ptr = BufferRaw::create(); + ptr->assign(buf, len); + _socket->send(ptr, (struct sockaddr *)(dst), sizeof(struct sockaddr), flush); +} + void WebRtcTransportImp::onStartWebRTC() { if (canRecvRtp()) { _push_src = std::make_shared(DEFAULT_VHOST, "live", "push"); @@ -251,35 +257,20 @@ void WebRtcTransportImp::onStartWebRTC() { } } } - if (!canSendRtp()) { - return; - } - _reader = _src->getRing()->attach(_socket->getPoller(), true); - weak_ptr weak_self = shared_from_this(); - _reader->setReadCB([weak_self](const RtspMediaSource::RingDataType &pkt){ - auto strongSelf = weak_self.lock(); - if (!strongSelf) { - return; - } - size_t i = 0; - pkt->for_each([&](const RtpPacket::Ptr &rtp) { - strongSelf->onSendRtp(rtp, ++i == pkt->size()); + if (canSendRtp()) { + _reader = _src->getRing()->attach(_socket->getPoller(), true); + weak_ptr weak_self = shared_from_this(); + _reader->setReadCB([weak_self](const RtspMediaSource::RingDataType &pkt) { + auto strongSelf = weak_self.lock(); + if (!strongSelf) { + return; + } + size_t i = 0; + pkt->for_each([&](const RtpPacket::Ptr &rtp) { + strongSelf->onSendRtp(rtp, ++i == pkt->size()); + }); }); - }); -} - -void WebRtcTransportImp::onSendRtp(const RtpPacket::Ptr &rtp, bool flush){ - if (!_send_rtp_pt[rtp->type]) { - //忽略,对方不支持该编码类型 - return; } - auto tmp = rtp->getHeader()->pt; - //设置pt - rtp->getHeader()->pt = _send_rtp_pt[rtp->type]; - sendRtpPacket(rtp->data() + RtpPacket::kRtpTcpHeaderSize, rtp->size() - RtpPacket::kRtpTcpHeaderSize, flush); - _rtp_receiver[_send_rtp_pt[rtp->type]].rtcp_context_send->onRtp(rtp->getSeq(), rtp->getStampMS(), rtp->size() - RtpPacket::kRtpTcpHeaderSize); - //还原pt - rtp->getHeader()->pt = tmp; } bool WebRtcTransportImp::canSendRtp() const{ @@ -343,13 +334,6 @@ void WebRtcTransportImp::onRtcConfigure(RtcConfigure &configure) const { configure.addCandidate(*getIceCandidate()); } - -void WebRtcTransportImp::onSendSockData(const char *buf, size_t len, struct sockaddr_in *dst, bool flush) { - auto ptr = BufferRaw::create(); - ptr->assign(buf, len); - _socket->send(ptr, (struct sockaddr *)(dst), sizeof(struct sockaddr), flush); -} - SdpAttrCandidate::Ptr WebRtcTransportImp::getIceCandidate() const{ auto candidate = std::make_shared(); candidate->foundation = "udpcandidate"; @@ -391,17 +375,6 @@ private: function _on_before_sort; }; -void WebRtcTransportImp::onRtp(const char *buf, size_t len) { - RtpHeader *rtp = (RtpHeader *) buf; - auto it = _rtp_receiver.find(rtp->pt); - if (it == _rtp_receiver.end()) { - WarnL; - return; - } - auto &info = it->second; - info.receiver->inputRtp(info.media->type, info.plan->sample_rate, (uint8_t *) buf, len); -} - void WebRtcTransportImp::onRtcp(const char *buf, size_t len) { auto rtcps = RtcpHeader::loadFromBytes((char *) buf, len); for (auto rtcp : rtcps) { @@ -409,10 +382,10 @@ void WebRtcTransportImp::onRtcp(const char *buf, size_t len) { case RtcpType::RTCP_SR : { //对方汇报rtp发送情况 RtcpSR *sr = (RtcpSR *) rtcp; - auto it = _ssrc_info.find(sr->items.ssrc); + auto it = _ssrc_info.find(sr->ssrc); if (it != _ssrc_info.end()) { it->second->rtcp_context_recv->onRtcp(sr); - auto rr = it->second->rtcp_context_recv->createRtcpRR(sr->ssrc, sr->items.ssrc); + auto rr = it->second->rtcp_context_recv->createRtcpRR(sr->items.ssrc, sr->ssrc); sendRtcpPacket(rr->data(), rr->size(), true); InfoL << "send rtcp rr"; } @@ -421,9 +394,9 @@ void WebRtcTransportImp::onRtcp(const char *buf, size_t len) { case RtcpType::RTCP_RR : { //对方汇报rtp接收情况 RtcpRR *rr = (RtcpRR *) rtcp; - auto it = _ssrc_info.find(rr->items.ssrc); + auto it = _ssrc_info.find(rr->ssrc); if (it != _ssrc_info.end()) { - auto sr = it->second->rtcp_context_send->createRtcpSR(rr->ssrc); + auto sr = it->second->rtcp_context_send->createRtcpSR(rr->items.ssrc); sendRtcpPacket(sr->data(), sr->size(), true); InfoL << "send rtcp sr"; } @@ -446,6 +419,17 @@ int makeRtcpPli(char *packet, int len) { return 12; } +void WebRtcTransportImp::onRtp(const char *buf, size_t len) { + RtpHeader *rtp = (RtpHeader *) buf; + auto it = _rtp_receiver.find(rtp->pt); + if (it == _rtp_receiver.end()) { + WarnL; + return; + } + auto &info = it->second; + info.receiver->inputRtp(info.media->type, info.plan->sample_rate, (uint8_t *) buf, len); +} + void WebRtcTransportImp::onSortedRtp(const RtpPayloadInfo &info, RtpPacket::Ptr rtp) { if(!info.is_common_rtp){ WarnL; @@ -466,6 +450,20 @@ void WebRtcTransportImp::onBeforeSortedRtp(const RtpPayloadInfo &info, const Rtp //todo rtcp相关 info.rtcp_context_recv->onRtp(rtp->getSeq(), rtp->getStampMS(), rtp->size() - RtpPacket::kRtpTcpHeaderSize); } + +void WebRtcTransportImp::onSendRtp(const RtpPacket::Ptr &rtp, bool flush){ + if (!_send_rtp_pt[rtp->type]) { + //忽略,对方不支持该编码类型 + return; + } + auto tmp = rtp->getHeader()->pt; + //设置pt + rtp->getHeader()->pt = _send_rtp_pt[rtp->type]; + sendRtpPacket(rtp->data() + RtpPacket::kRtpTcpHeaderSize, rtp->size() - RtpPacket::kRtpTcpHeaderSize, flush); + _rtp_receiver[_send_rtp_pt[rtp->type]].rtcp_context_send->onRtp(rtp->getSeq(), rtp->getStampMS(), rtp->size() - RtpPacket::kRtpTcpHeaderSize); + //还原pt + rtp->getHeader()->pt = tmp; +} ///////////////////////////////////////////////////////////////////