初步添加RTCP包接收接口已经发送接口

This commit is contained in:
xiongziliang 2019-05-08 15:08:57 +08:00
parent 7997c06010
commit 3e0e0ce2aa
2 changed files with 114 additions and 59 deletions

View File

@ -63,7 +63,8 @@ void RtspPlayer::teardown(){
_strContentBase.clear();
RtpReceiver::clear();
CLEAR_ARR(_apUdpSock);
CLEAR_ARR(_apRtpSock);
CLEAR_ARR(_apRtcpSock);
CLEAR_ARR(_aui16FirstSeq)
CLEAR_ARR(_aui64RtpRecv)
CLEAR_ARR(_aui64RtpRecv)
@ -248,13 +249,20 @@ void RtspPlayer::sendSetup(unsigned int trackIndex) {
}
break;
case Rtsp::RTP_UDP: {
_apUdpSock[trackIndex].reset(new Socket());
if (!_apUdpSock[trackIndex]->bindUdpSock(0, get_local_ip().data())) {
_apUdpSock[trackIndex].reset();
throw std::runtime_error("open udp sock err");
_apRtpSock[trackIndex].reset(new Socket());
if (!_apRtpSock[trackIndex]->bindUdpSock(0, get_local_ip().data())) {
_apRtpSock[trackIndex].reset();
throw std::runtime_error("open rtp sock err");
}
int port = _apUdpSock[trackIndex]->get_local_port();
sendRtspRequest("SETUP",baseUrl,{"Transport",StrPrinter << "RTP/AVP;unicast;client_port=" << port << "-" << port + 1});
_apRtcpSock[trackIndex].reset(new Socket());
if (!_apRtcpSock[trackIndex]->bindUdpSock(0, get_local_ip().data())) {
_apRtcpSock[trackIndex].reset();
throw std::runtime_error("open rtcp sock err");
}
sendRtspRequest("SETUP",baseUrl,{"Transport",
StrPrinter << "RTP/AVP;unicast;client_port="
<< _apRtpSock[trackIndex]->get_local_port() << "-"
<< _apRtcpSock[trackIndex]->get_local_port()});
}
break;
default:
@ -290,30 +298,69 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex)
}else{
const char *strPos = (_eType == Rtsp::RTP_MULTICAST ? "port=" : "server_port=") ;
auto port_str = FindField((strTransport + ";").data(), strPos, ";");
uint16_t port = atoi(FindField(port_str.data(), NULL, "-").data());
auto &pUdpSockRef = _apUdpSock[uiTrackIndex];
if(!pUdpSockRef){
pUdpSockRef.reset(new Socket());
}
uint16_t rtp_port = atoi(FindField(port_str.data(), NULL, "-").data());
uint16_t rtcp_port = atoi(FindField(port_str.data(), "-",NULL).data());
auto &pRtpSockRef = _apRtpSock[uiTrackIndex];
auto &pRtcpSockRef = _apRtcpSock[uiTrackIndex];
if (_eType == Rtsp::RTP_MULTICAST) {
//udp组播
auto multiAddr = FindField((strTransport + ";").data(), "destination=", ";");
if (!pUdpSockRef->bindUdpSock(port, "0.0.0.0")) {
pUdpSockRef.reset();
pRtpSockRef.reset(new Socket());
if (!pRtpSockRef->bindUdpSock(rtp_port, "0.0.0.0")) {
pRtpSockRef.reset();
throw std::runtime_error("open udp sock err");
}
auto fd = pUdpSockRef->rawFD();
auto fd = pRtpSockRef->rawFD();
if (-1 == SockUtil::joinMultiAddrFilter(fd, multiAddr.data(), get_peer_ip().data(),get_local_ip().data())) {
SockUtil::joinMultiAddr(fd, multiAddr.data(),get_local_ip().data());
}
} else {
//udp单播
struct sockaddr_in rtpto;
rtpto.sin_port = ntohs(port);
rtpto.sin_port = ntohs(rtp_port);
rtpto.sin_family = AF_INET;
rtpto.sin_addr.s_addr = inet_addr(get_peer_ip().data());
pUdpSockRef->setSendPeerAddr((struct sockaddr *)&(rtpto));
pUdpSockRef->send("\xce\xfa\xed\xfe", 4);
pRtpSockRef->setSendPeerAddr((struct sockaddr *)&(rtpto));
//发送rtp打洞包
pRtpSockRef->send("\xce\xfa\xed\xfe", 4);
//设置rtcp发送目标为后续发送rtcp做准备
rtpto.sin_port = ntohs(rtcp_port);
rtpto.sin_family = AF_INET;
rtpto.sin_addr.s_addr = inet_addr(get_peer_ip().data());
pRtcpSockRef->setSendPeerAddr((struct sockaddr *)&(rtpto));
}
auto srcIP = inet_addr(get_peer_ip().data());
weak_ptr<RtspPlayer> weakSelf = dynamic_pointer_cast<RtspPlayer>(shared_from_this());
//设置rtp over udp接收回调处理函数
pRtpSockRef->setOnRead([srcIP, uiTrackIndex, weakSelf](const Buffer::Ptr &buf, struct sockaddr *addr) {
auto strongSelf = weakSelf.lock();
if (!strongSelf) {
return;
}
if (((struct sockaddr_in *) addr)->sin_addr.s_addr != srcIP) {
WarnL << "收到其他地址的rtp数据:" << inet_ntoa(((struct sockaddr_in *) addr)->sin_addr);
return;
}
strongSelf->handleOneRtp(uiTrackIndex, strongSelf->_aTrackInfo[uiTrackIndex], (unsigned char *) buf->data(), buf->size());
});
if(pRtcpSockRef) {
//设置rtcp over udp接收回调处理函数
pRtcpSockRef->setOnRead([srcIP, uiTrackIndex, weakSelf](const Buffer::Ptr &buf, struct sockaddr *addr) {
auto strongSelf = weakSelf.lock();
if (!strongSelf) {
return;
}
if (((struct sockaddr_in *) addr)->sin_addr.s_addr != srcIP) {
WarnL << "收到其他地址的rtcp数据:" << inet_ntoa(((struct sockaddr_in *) addr)->sin_addr);
return;
}
strongSelf->onRecvRtcp(uiTrackIndex, strongSelf->_aTrackInfo[uiTrackIndex], (unsigned char *) buf->data(), buf->size());
});
}
}
if (uiTrackIndex < _aTrackInfo.size() - 1) {
@ -321,41 +368,19 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex)
sendSetup(uiTrackIndex + 1);
return;
}
//所有setup命令发送完毕
//设置心跳包发送定时器
weak_ptr<RtspPlayer> weakSelf = dynamic_pointer_cast<RtspPlayer>(shared_from_this());
_pBeatTimer.reset(new Timer((*this)[kBeatIntervalMS].as<int>() / 1000.0, [weakSelf](){
auto strongSelf = weakSelf.lock();
if (!strongSelf){
return false;
}
strongSelf->sendRtcpPacket();
return true;
},getPoller()));
for (unsigned int i = 0; i < _aTrackInfo.size() && _eType != Rtsp::RTP_TCP; i++) {
auto &pUdpSockRef = _apUdpSock[i];
if(!pUdpSockRef){
continue;
}
auto srcIP = inet_addr(get_peer_ip().data());
weak_ptr<RtspPlayer> weakSelf = dynamic_pointer_cast<RtspPlayer>(shared_from_this());
pUdpSockRef->setOnRead([srcIP,i,weakSelf](const Buffer::Ptr &buf, struct sockaddr *addr) {
auto strongSelf=weakSelf.lock();
if(!strongSelf) {
return;
}
if(((struct sockaddr_in *)addr)->sin_addr.s_addr != srcIP) {
WarnL << "收到其他地址的UDP数据:" << inet_ntoa(((struct sockaddr_in *) addr)->sin_addr);
return;
}
strongSelf->handleOneRtp(i,strongSelf->_aTrackInfo[i],(unsigned char *)buf->data(),buf->size());
});
}
/////////////////////////心跳/////////////////////////////////
//有些设备在rtp over tcp的情况下也需要定时发送心跳包(比较坑爹)
//if(_eType != Rtsp::RTP_TCP)
{
weak_ptr<RtspPlayer> weakSelf = dynamic_pointer_cast<RtspPlayer>(shared_from_this());
_pBeatTimer.reset(new Timer((*this)[kBeatIntervalMS].as<int>() / 1000.0, [weakSelf](){
auto strongSelf = weakSelf.lock();
if (!strongSelf){
return false;
}
strongSelf->sendOptions();
return true;
},getPoller()));
}
//发送play命令
pause(false);
}
@ -449,14 +474,27 @@ void RtspPlayer::onWholeRtspPacket(Parser &parser) {
void RtspPlayer::onRtpPacket(const char *data, uint64_t len) {
int trackIdx = -1;
uint8_t interleaved = data[1];
if(interleaved %2 ==0){
if(interleaved %2 == 0){
trackIdx = getTrackIndexByInterleaved(interleaved);
}
if (trackIdx != -1) {
handleOneRtp(trackIdx,_aTrackInfo[trackIdx],(unsigned char *)data + 4, len - 4);
if (trackIdx != -1) {
handleOneRtp(trackIdx,_aTrackInfo[trackIdx],(unsigned char *)data + 4, len - 4);
}
}else{
trackIdx = getTrackIndexByInterleaved(interleaved - 1);
if (trackIdx != -1) {
onRecvRtcp(trackIdx,_aTrackInfo[trackIdx],(unsigned char *)data + 4, len - 4);
}
}
}
void RtspPlayer::onRecvRtcp(int iTrackidx,SdpTrack::Ptr &track, unsigned char *pucData, unsigned int uiLen){
}
void RtspPlayer::sendRtcpPacket(){
//目前只实现了通过options命令实现心跳包
sendOptions();
}
void RtspPlayer::onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx){
//统计丢包率

View File

@ -84,13 +84,28 @@ protected:
* @param trackidx track索引
*/
void onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx) override;
/**
* RTCP包回调
* @param iTrackidx
* @param track
* @param pucData
* @param uiLen
*/
virtual void onRecvRtcp(int iTrackidx,SdpTrack::Ptr &track, unsigned char *pucData, unsigned int uiLen);
/**
* rtcp包维持心跳
*/
virtual void sendRtcpPacket();
private:
void onRecvRTP_l(const RtpPacket::Ptr &pRtppt, const SdpTrack::Ptr &track);
void onPlayResult_l(const SockException &ex);
int getTrackIndexByControlSuffix(const string &controlSuffix) const;
int getTrackIndexByInterleaved(int interleaved) const;
int getTrackIndexByTrackType(TrackType trackId) const;
int getTrackIndexByTrackType(TrackType trackType) const;
void play(const string &strUrl, const string &strUser, const string &strPwd, Rtsp::eRtpType eType);
void onConnect(const SockException &err) override;
@ -115,8 +130,10 @@ private:
SdpAttr _sdpAttr;
vector<SdpTrack::Ptr> _aTrackInfo;
function<void(const Parser&)> _onHandshake;
Socket::Ptr _apUdpSock[2];
//rtsp鉴权相关
Socket::Ptr _apRtpSock[2]; //RTP端口,trackid idx 为数组下标
Socket::Ptr _apRtcpSock[2];//RTCP端口,trackid idx 为数组下标
//rtsp鉴权相关
string _rtspMd5Nonce;
string _rtspRealm;
//rtsp info