整理代码并完善rtcp相关功能

This commit is contained in:
xiongziliang 2021-04-03 08:32:20 +08:00
parent e9c963dc82
commit cfda7d8ba6

View File

@ -224,6 +224,12 @@ void WebRtcTransportImp::attach(const RtspMediaSource::Ptr &src) {
_src = src;
}
void WebRtcTransportImp::onSendSockData(const char *buf, size_t len, struct sockaddr_in *dst, bool flush) {
auto ptr = BufferRaw::create();
ptr->assign(buf, len);
_socket->send(ptr, (struct sockaddr *)(dst), sizeof(struct sockaddr), flush);
}
void WebRtcTransportImp::onStartWebRTC() {
if (canRecvRtp()) {
_push_src = std::make_shared<RtspMediaSourceImp>(DEFAULT_VHOST, "live", "push");
@ -251,35 +257,20 @@ void WebRtcTransportImp::onStartWebRTC() {
}
}
}
if (!canSendRtp()) {
return;
}
_reader = _src->getRing()->attach(_socket->getPoller(), true);
weak_ptr<WebRtcTransportImp> weak_self = shared_from_this();
_reader->setReadCB([weak_self](const RtspMediaSource::RingDataType &pkt){
auto strongSelf = weak_self.lock();
if (!strongSelf) {
return;
}
size_t i = 0;
pkt->for_each([&](const RtpPacket::Ptr &rtp) {
strongSelf->onSendRtp(rtp, ++i == pkt->size());
if (canSendRtp()) {
_reader = _src->getRing()->attach(_socket->getPoller(), true);
weak_ptr<WebRtcTransportImp> weak_self = shared_from_this();
_reader->setReadCB([weak_self](const RtspMediaSource::RingDataType &pkt) {
auto strongSelf = weak_self.lock();
if (!strongSelf) {
return;
}
size_t i = 0;
pkt->for_each([&](const RtpPacket::Ptr &rtp) {
strongSelf->onSendRtp(rtp, ++i == pkt->size());
});
});
});
}
void WebRtcTransportImp::onSendRtp(const RtpPacket::Ptr &rtp, bool flush){
if (!_send_rtp_pt[rtp->type]) {
//忽略,对方不支持该编码类型
return;
}
auto tmp = rtp->getHeader()->pt;
//设置pt
rtp->getHeader()->pt = _send_rtp_pt[rtp->type];
sendRtpPacket(rtp->data() + RtpPacket::kRtpTcpHeaderSize, rtp->size() - RtpPacket::kRtpTcpHeaderSize, flush);
_rtp_receiver[_send_rtp_pt[rtp->type]].rtcp_context_send->onRtp(rtp->getSeq(), rtp->getStampMS(), rtp->size() - RtpPacket::kRtpTcpHeaderSize);
//还原pt
rtp->getHeader()->pt = tmp;
}
bool WebRtcTransportImp::canSendRtp() const{
@ -343,13 +334,6 @@ void WebRtcTransportImp::onRtcConfigure(RtcConfigure &configure) const {
configure.addCandidate(*getIceCandidate());
}
void WebRtcTransportImp::onSendSockData(const char *buf, size_t len, struct sockaddr_in *dst, bool flush) {
auto ptr = BufferRaw::create();
ptr->assign(buf, len);
_socket->send(ptr, (struct sockaddr *)(dst), sizeof(struct sockaddr), flush);
}
SdpAttrCandidate::Ptr WebRtcTransportImp::getIceCandidate() const{
auto candidate = std::make_shared<SdpAttrCandidate>();
candidate->foundation = "udpcandidate";
@ -391,17 +375,6 @@ private:
function<void(const RtpPacket::Ptr &rtp)> _on_before_sort;
};
void WebRtcTransportImp::onRtp(const char *buf, size_t len) {
RtpHeader *rtp = (RtpHeader *) buf;
auto it = _rtp_receiver.find(rtp->pt);
if (it == _rtp_receiver.end()) {
WarnL;
return;
}
auto &info = it->second;
info.receiver->inputRtp(info.media->type, info.plan->sample_rate, (uint8_t *) buf, len);
}
void WebRtcTransportImp::onRtcp(const char *buf, size_t len) {
auto rtcps = RtcpHeader::loadFromBytes((char *) buf, len);
for (auto rtcp : rtcps) {
@ -409,10 +382,10 @@ void WebRtcTransportImp::onRtcp(const char *buf, size_t len) {
case RtcpType::RTCP_SR : {
//对方汇报rtp发送情况
RtcpSR *sr = (RtcpSR *) rtcp;
auto it = _ssrc_info.find(sr->items.ssrc);
auto it = _ssrc_info.find(sr->ssrc);
if (it != _ssrc_info.end()) {
it->second->rtcp_context_recv->onRtcp(sr);
auto rr = it->second->rtcp_context_recv->createRtcpRR(sr->ssrc, sr->items.ssrc);
auto rr = it->second->rtcp_context_recv->createRtcpRR(sr->items.ssrc, sr->ssrc);
sendRtcpPacket(rr->data(), rr->size(), true);
InfoL << "send rtcp rr";
}
@ -421,9 +394,9 @@ void WebRtcTransportImp::onRtcp(const char *buf, size_t len) {
case RtcpType::RTCP_RR : {
//对方汇报rtp接收情况
RtcpRR *rr = (RtcpRR *) rtcp;
auto it = _ssrc_info.find(rr->items.ssrc);
auto it = _ssrc_info.find(rr->ssrc);
if (it != _ssrc_info.end()) {
auto sr = it->second->rtcp_context_send->createRtcpSR(rr->ssrc);
auto sr = it->second->rtcp_context_send->createRtcpSR(rr->items.ssrc);
sendRtcpPacket(sr->data(), sr->size(), true);
InfoL << "send rtcp sr";
}
@ -446,6 +419,17 @@ int makeRtcpPli(char *packet, int len) {
return 12;
}
void WebRtcTransportImp::onRtp(const char *buf, size_t len) {
RtpHeader *rtp = (RtpHeader *) buf;
auto it = _rtp_receiver.find(rtp->pt);
if (it == _rtp_receiver.end()) {
WarnL;
return;
}
auto &info = it->second;
info.receiver->inputRtp(info.media->type, info.plan->sample_rate, (uint8_t *) buf, len);
}
void WebRtcTransportImp::onSortedRtp(const RtpPayloadInfo &info, RtpPacket::Ptr rtp) {
if(!info.is_common_rtp){
WarnL;
@ -466,6 +450,20 @@ void WebRtcTransportImp::onBeforeSortedRtp(const RtpPayloadInfo &info, const Rtp
//todo rtcp相关
info.rtcp_context_recv->onRtp(rtp->getSeq(), rtp->getStampMS(), rtp->size() - RtpPacket::kRtpTcpHeaderSize);
}
void WebRtcTransportImp::onSendRtp(const RtpPacket::Ptr &rtp, bool flush){
if (!_send_rtp_pt[rtp->type]) {
//忽略,对方不支持该编码类型
return;
}
auto tmp = rtp->getHeader()->pt;
//设置pt
rtp->getHeader()->pt = _send_rtp_pt[rtp->type];
sendRtpPacket(rtp->data() + RtpPacket::kRtpTcpHeaderSize, rtp->size() - RtpPacket::kRtpTcpHeaderSize, flush);
_rtp_receiver[_send_rtp_pt[rtp->type]].rtcp_context_send->onRtp(rtp->getSeq(), rtp->getStampMS(), rtp->size() - RtpPacket::kRtpTcpHeaderSize);
//还原pt
rtp->getHeader()->pt = tmp;
}
///////////////////////////////////////////////////////////////////