diff --git a/webrtc/WebRtcTransport.cpp b/webrtc/WebRtcTransport.cpp index bad1648e..67d7dea8 100644 --- a/webrtc/WebRtcTransport.cpp +++ b/webrtc/WebRtcTransport.cpp @@ -12,6 +12,10 @@ WebRtcTransport::WebRtcTransport(const EventPoller::Ptr &poller) { _ice_server = std::make_shared(this, makeRandStr(4), makeRandStr(28).substr(4)); } +void WebRtcTransport::onCreate(){ + +} + void WebRtcTransport::onDestory(){ _dtls_transport = nullptr; _ice_server = nullptr; @@ -208,18 +212,28 @@ WebRtcTransportImp::Ptr WebRtcTransportImp::create(const EventPoller::Ptr &polle ptr->onDestory(); delete ptr; }); + ret->onCreate(); return ret; } -WebRtcTransportImp::WebRtcTransportImp(const EventPoller::Ptr &poller) : WebRtcTransport(poller) { - _socket = Socket::createSocket(poller, false); +void WebRtcTransportImp::onCreate(){ + WebRtcTransport::onCreate(); + _socket = Socket::createSocket(getPoller(), false); //随机端口,绑定全部网卡 _socket->bindUdpSock(0); - _socket->setOnRead([this](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) mutable { - inputSockData(buf->data(), buf->size(), addr); + weak_ptr weak_self = shared_from_this(); + _socket->setOnRead([weak_self](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) mutable { + auto strong_self = weak_self.lock(); + if (strong_self) { + strong_self->inputSockData(buf->data(), buf->size(), addr); + } }); } +WebRtcTransportImp::WebRtcTransportImp(const EventPoller::Ptr &poller) : WebRtcTransport(poller) { + +} + void WebRtcTransportImp::onDestory() { WebRtcTransport::onDestory(); } @@ -281,7 +295,7 @@ void WebRtcTransportImp::onStartWebRTC() { _push_src->setSdp(getSdp(SdpType::answer).toRtspSdp()); } if (canSendRtp()) { - _reader = _play_src->getRing()->attach(_socket->getPoller(), true); + _reader = _play_src->getRing()->attach(getPoller(), true); weak_ptr weak_self = shared_from_this(); _reader->setReadCB([weak_self](const RtspMediaSource::RingDataType &pkt) { auto strongSelf = weak_self.lock(); @@ -403,7 +417,6 @@ void WebRtcTransportImp::onRtcp(const char *buf, size_t len) { it->second->rtcp_context_recv->onRtcp(sr); auto rr = it->second->rtcp_context_recv->createRtcpRR(sr->items.ssrc, sr->ssrc); sendRtcpPacket(rr->data(), rr->size(), true); - InfoL << "send rtcp rr"; } break; } @@ -414,12 +427,21 @@ void WebRtcTransportImp::onRtcp(const char *buf, size_t len) { if (it != _rtp_info_ssrc.end()) { auto sr = it->second->rtcp_context_send->createRtcpSR(rr->items.ssrc); sendRtcpPacket(sr->data(), sr->size(), true); - InfoL << "send rtcp sr"; } break; } case RtcpType::RTCP_BYE : { - //todo 此处应该销毁对象 + //对方汇报停止发送rtp + RtcpBye *bye = (RtcpBye *) rtcp; + for (auto ssrc : bye->getSSRC()) { + auto it = _rtp_info_ssrc.find(*ssrc); + if (it == _rtp_info_ssrc.end()) { + continue; + } + _rtp_info_pt.erase(it->second->plan->pt); + _rtp_info_ssrc.erase(it); + } + onShutdown(SockException(Err_eof, "rtcp bye message received")); break; } case RtcpType::RTCP_PSFB: { @@ -459,7 +481,6 @@ void WebRtcTransportImp::onSortedRtp(const RtpPayloadInfo &info, RtpPacket::Ptr pli->ssrc = htonl(0); pli->ssrc_media = htonl(_recv_video_ssrc); sendRtcpPacket((char *) pli.get(), sizeof(RtcpPli), true); - InfoL << "send pli"; } if (_push_src) { _push_src->onWrite(std::move(rtp), false); @@ -481,3 +502,8 @@ void WebRtcTransportImp::onSendRtp(const RtpPacket::Ptr &rtp, bool flush){ //统计rtp发送情况,好做sr汇报 _rtp_info_pt[pt].rtcp_context_send->onRtp(rtp->getSeq(), rtp->getStampMS(), rtp->size() - RtpPacket::kRtpTcpHeaderSize); } + +void WebRtcTransportImp::onShutdown(const SockException &ex){ + InfoL << ex.what(); +} + diff --git a/webrtc/WebRtcTransport.h b/webrtc/WebRtcTransport.h index db84b186..29fe7cb5 100644 --- a/webrtc/WebRtcTransport.h +++ b/webrtc/WebRtcTransport.h @@ -21,7 +21,12 @@ public: ~WebRtcTransport() override = default; /** - * 消费对象 + * 创建对象 + */ + virtual void onCreate(); + + /** + * 销毁对象 */ virtual void onDestory(); @@ -84,6 +89,7 @@ protected: virtual void onRtp(const char *buf, size_t len) = 0; virtual void onRtcp(const char *buf, size_t len) = 0; + virtual void onShutdown(const SockException &ex) = 0; protected: const RtcSession& getSdp(SdpType type) const; @@ -131,9 +137,11 @@ protected: void onRtp(const char *buf, size_t len) override; void onRtcp(const char *buf, size_t len) override; + void onShutdown(const SockException &ex) override; private: WebRtcTransportImp(const EventPoller::Ptr &poller); + void onCreate() override; void onDestory() override; void onSendRtp(const RtpPacket::Ptr &rtp, bool flush); SdpAttrCandidate::Ptr getIceCandidate() const;