支持rtcp心跳包

This commit is contained in:
xiongziliang 2017-11-08 16:56:02 +08:00
parent 8242ad674d
commit 101261c299
4 changed files with 64 additions and 42 deletions

View File

@ -107,10 +107,10 @@ void RtspSession::shutdown(){
void RtspSession::onError(const SockException& err) { void RtspSession::onError(const SockException& err) {
TraceL << err.getErrCode() << " " << err.what(); TraceL << err.getErrCode() << " " << err.what();
if (m_bListenPeerUdpPort) { if (m_bListenPeerUdpData) {
//取消UDP口监听 //取消UDP口监听
UDPServer::Instance().stopListenPeer(getPeerIp().data(), this); UDPServer::Instance().stopListenPeer(getPeerIp().data(), this);
m_bListenPeerUdpPort = false; m_bListenPeerUdpData = false;
} }
if (!m_bBase64need && m_strSessionCookie.size() != 0) { if (!m_bBase64need && m_strSessionCookie.size() != 0) {
//quickTime http getter //quickTime http getter
@ -135,10 +135,6 @@ void RtspSession::onManager() {
shutdown(); shutdown();
return; return;
} }
if (m_bListenPeerUdpPort) {
UDPServer::Instance().stopListenPeer(getPeerIp().data(), this);
m_bListenPeerUdpPort = false;
}
} }
if (m_rtpType != PlayerBase::RTP_TCP && m_ticker.elapsedTime() > 15 * 1000) { if (m_rtpType != PlayerBase::RTP_TCP && m_ticker.elapsedTime() > 15 * 1000) {
WarnL << "RTSP会话超时:" << getPeerIp(); WarnL << "RTSP会话超时:" << getPeerIp();
@ -330,15 +326,23 @@ bool RtspSession::handleReq_Setup() {
} }
break; break;
case PlayerBase::RTP_UDP: { case PlayerBase::RTP_UDP: {
auto pSock = UDPServer::Instance().getSock(getLocalIp().data(),trackIdx); //我们用trackIdx区分rtp和rtcp包
if (!pSock) { auto pSockRtp = UDPServer::Instance().getSock(getLocalIp().data(),2*trackIdx);
if (!pSockRtp) {
//分配端口失败 //分配端口失败
WarnL << "分配端口失败"; WarnL << "分配rtp端口失败";
send_NotAcceptable(); send_NotAcceptable();
return false; return false;
} }
m_apUdpSock[trackIdx] = pSock; auto pSockRtcp = UDPServer::Instance().getSock(getLocalIp().data(),2*trackIdx + 1 ,pSockRtp->get_local_port() + 1);
int iSrvPort = pSock->get_local_port(); if (!pSockRtcp) {
//分配端口失败
WarnL << "分配rtcp端口失败";
send_NotAcceptable();
return false;
}
m_apUdpSock[trackIdx] = pSockRtp;
//设置客户端内网端口信息
string strClientPort = FindField(m_parser["Transport"].data(), "client_port=", NULL); string strClientPort = FindField(m_parser["Transport"].data(), "client_port=", NULL);
uint16_t ui16PeerPort = atoi( FindField(strClientPort.data(), NULL, "-").data()); uint16_t ui16PeerPort = atoi( FindField(strClientPort.data(), NULL, "-").data());
struct sockaddr_in peerAddr; struct sockaddr_in peerAddr;
@ -347,7 +351,8 @@ bool RtspSession::handleReq_Setup() {
peerAddr.sin_addr.s_addr = inet_addr(getPeerIp().data()); peerAddr.sin_addr.s_addr = inet_addr(getPeerIp().data());
bzero(&(peerAddr.sin_zero), sizeof peerAddr.sin_zero); bzero(&(peerAddr.sin_zero), sizeof peerAddr.sin_zero);
m_apPeerUdpAddr[trackIdx].reset((struct sockaddr *) (new struct sockaddr_in(peerAddr))); m_apPeerUdpAddr[trackIdx].reset((struct sockaddr *) (new struct sockaddr_in(peerAddr)));
tryGetPeerUdpPort(); //尝试获取客户端nat映射地址
startListenPeerUdpData();
//InfoL << "分配端口:" << srv_port; //InfoL << "分配端口:" << srv_port;
int n = sprintf(m_pcBuf, "RTSP/1.0 200 OK\r\n" int n = sprintf(m_pcBuf, "RTSP/1.0 200 OK\r\n"
"CSeq: %d\r\n" "CSeq: %d\r\n"
@ -359,7 +364,7 @@ bool RtspSession::handleReq_Setup() {
m_iCseq, g_serverName.data(), m_iCseq, g_serverName.data(),
RTSP_VERSION, RTSP_BUILDTIME, RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data(), strClientPort.data(), dateHeader().data(), strClientPort.data(),
iSrvPort, iSrvPort + 1, pSockRtp->get_local_port(), pSockRtcp->get_local_port(),
printSSRC(trackRef.ssrc).data(), printSSRC(trackRef.ssrc).data(),
m_strSession.data()); m_strSession.data());
send(m_pcBuf, n); send(m_pcBuf, n);
@ -382,6 +387,15 @@ bool RtspSession::handleReq_Setup() {
}); });
} }
int iSrvPort = m_pBrdcaster->getPort(trackid); int iSrvPort = m_pBrdcaster->getPort(trackid);
//我们用trackIdx区分rtp和rtcp包
auto pSockRtcp = UDPServer::Instance().getSock(getLocalIp().data(),2*trackIdx + 1,iSrvPort + 1);
if (!pSockRtcp) {
//分配端口失败
WarnL << "分配rtcp端口失败";
send_NotAcceptable();
return false;
}
startListenPeerUdpData();
static uint32_t udpTTL = mINI::Instance()[MultiCast::kUdpTTL].as<uint32_t>(); static uint32_t udpTTL = mINI::Instance()[MultiCast::kUdpTTL].as<uint32_t>();
int n = sprintf(m_pcBuf, "RTSP/1.0 200 OK\r\n" int n = sprintf(m_pcBuf, "RTSP/1.0 200 OK\r\n"
"CSeq: %d\r\n" "CSeq: %d\r\n"
@ -393,7 +407,7 @@ bool RtspSession::handleReq_Setup() {
m_iCseq, g_serverName.data(), m_iCseq, g_serverName.data(),
RTSP_VERSION, RTSP_BUILDTIME, RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data(), m_pBrdcaster->getIP().data(), dateHeader().data(), m_pBrdcaster->getIP().data(),
getLocalIp().data(), iSrvPort, iSrvPort + 1, getLocalIp().data(), iSrvPort, pSockRtcp->get_local_port(),
udpTTL,printSSRC(trackRef.ssrc).data(), udpTTL,printSSRC(trackRef.ssrc).data(),
m_strSession.data()); m_strSession.data());
send(m_pcBuf, n); send(m_pcBuf, n);
@ -581,11 +595,11 @@ inline void RtspSession::send_NotAcceptable() {
} }
inline bool RtspSession::findStream() { inline bool RtspSession::findStream() {
string strHost = FindField(m_strUrl.data(), "://", "/"); string strHost = FindField(m_strUrl.data(), "://", "/");
m_strApp = FindField(m_strUrl.data(), (strHost + "/").data(), "/"); m_strApp = FindField(m_strUrl.data(), (strHost + "/").data(), "/");
m_strStream = FindField(m_strUrl.data(), (strHost + "/" + m_strApp + "/").data(), NULL); m_strStream = FindField(m_strUrl.data(), (strHost + "/" + m_strApp + "/").data(), NULL);
auto iPos = m_strStream.find('?'); auto iPos = m_strStream.find('?');
if(iPos != string::npos ){ if(iPos != string::npos ){
m_strStream.erase(iPos); m_strStream.erase(iPos);
@ -655,30 +669,37 @@ inline void RtspSession::sendRtpPacket(const RtpPacket& pkt) {
} }
inline void RtspSession::onRcvPeerUdpData(int iTrackIdx, const Socket::Buffer::Ptr &pBuf, const struct sockaddr& addr) { inline void RtspSession::onRcvPeerUdpData(int iTrackIdx, const Socket::Buffer::Ptr &pBuf, const struct sockaddr& addr) {
m_apPeerUdpAddr[iTrackIdx].reset(new struct sockaddr(addr)); if(iTrackIdx % 2 == 0){
m_abGotPeerUdp[iTrackIdx] = true; //这是rtp探测包
bool bGotAllPeerUdp = true; if(!m_bGotAllPeerUdp){
for (unsigned int i = 0; i < m_uiTrackCnt; i++) { //还没有获取完整的rtp探测包
if (!m_abGotPeerUdp[i]) { if(SockUtil::in_same_lan(getLocalIp().data(),getPeerIp().data())){
bGotAllPeerUdp = false; //在内网中客户端上报的端口号是真实的所以我们忽略udp打洞包
break; m_bGotAllPeerUdp = true;
} return;
} }
if (bGotAllPeerUdp) { //设置真实的客户端nat映射端口号
if (m_bListenPeerUdpPort) { m_apPeerUdpAddr[iTrackIdx / 2].reset(new struct sockaddr(addr));
UDPServer::Instance().stopListenPeer(getPeerIp().data(), this); m_abGotPeerUdp[iTrackIdx / 2] = true;
m_bListenPeerUdpPort = false; m_bGotAllPeerUdp = true;//先假设获取到完整的rtp探测包
InfoL << "获取到客户端端口"; for (unsigned int i = 0; i < m_uiTrackCnt; i++) {
if (!m_abGotPeerUdp[i]) {
//还有track没获取到rtp探测包
m_bGotAllPeerUdp = false;
break;
}
}
} }
}else{
//这是rtcp心跳包说明播放器还存活
m_ticker.resetTime();
TraceL << "rtcp:" << (iTrackIdx-1)/2 ;
} }
} }
inline void RtspSession::tryGetPeerUdpPort() { inline void RtspSession::startListenPeerUdpData() {
if(SockUtil::in_same_lan(getLocalIp().data(),getPeerIp().data())){ m_bListenPeerUdpData = true;
return;
}
m_bListenPeerUdpPort = true;
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this()); weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
UDPServer::Instance().listenPeer(getPeerIp().data(), this, UDPServer::Instance().listenPeer(getPeerIp().data(), this,
[weakSelf](int iTrackIdx,const Socket::Buffer::Ptr &pBuf,struct sockaddr *pPeerAddr)->bool { [weakSelf](int iTrackIdx,const Socket::Buffer::Ptr &pBuf,struct sockaddr *pPeerAddr)->bool {

View File

@ -102,7 +102,7 @@ private:
return -1; return -1;
} }
inline void onRcvPeerUdpData(int iTrackIdx, const Socket::Buffer::Ptr &pBuf, const struct sockaddr &addr); inline void onRcvPeerUdpData(int iTrackIdx, const Socket::Buffer::Ptr &pBuf, const struct sockaddr &addr);
inline void tryGetPeerUdpPort(); inline void startListenPeerUdpData();
char *m_pcBuf = nullptr; char *m_pcBuf = nullptr;
Ticker m_ticker; Ticker m_ticker;
@ -125,6 +125,7 @@ private:
int m_iCseq = 0; int m_iCseq = 0;
unsigned int m_uiTrackCnt = 0; //媒体track个数 unsigned int m_uiTrackCnt = 0; //媒体track个数
RtspTrack m_aTrackInfo[2]; //媒体track信息,trackid idx 为数组下标 RtspTrack m_aTrackInfo[2]; //媒体track信息,trackid idx 为数组下标
bool m_bGotAllPeerUdp = false;
#ifdef RTSP_SEND_RTCP #ifdef RTSP_SEND_RTCP
RtcpCounter m_aRtcpCnt[2]; //rtcp统计,trackid idx 为数组下标 RtcpCounter m_aRtcpCnt[2]; //rtcp统计,trackid idx 为数组下标
@ -136,7 +137,7 @@ private:
bool m_abGotPeerUdp[2] = { false, false }; //获取客户端udp端口计数 bool m_abGotPeerUdp[2] = { false, false }; //获取客户端udp端口计数
weak_ptr<Socket> m_apUdpSock[2]; //发送RTP的UDP端口,trackid idx 为数组下标 weak_ptr<Socket> m_apUdpSock[2]; //发送RTP的UDP端口,trackid idx 为数组下标
std::shared_ptr<struct sockaddr> m_apPeerUdpAddr[2]; //播放器接收RTP的地址,trackid idx 为数组下标 std::shared_ptr<struct sockaddr> m_apPeerUdpAddr[2]; //播放器接收RTP的地址,trackid idx 为数组下标
bool m_bListenPeerUdpPort = false; bool m_bListenPeerUdpData = false;
RtpBroadCaster::Ptr m_pBrdcaster; RtpBroadCaster::Ptr m_pBrdcaster;
//RTSP over HTTP //RTSP over HTTP

View File

@ -39,15 +39,15 @@ UDPServer::~UDPServer() {
InfoL; InfoL;
} }
Socket::Ptr UDPServer::getSock(const char* strLocalIp, int iTrackIndex) { Socket::Ptr UDPServer::getSock(const char* strLocalIp, int iTrackIndex,uint16_t iLocalPort) {
lock_guard<mutex> lck(m_mtxUpdSock); lock_guard<mutex> lck(m_mtxUpdSock);
string strKey = StrPrinter << strLocalIp << ":" << iTrackIndex << endl; string strKey = StrPrinter << strLocalIp << ":" << iTrackIndex << endl;
auto it = m_mapUpdSock.find(strKey); auto it = m_mapUpdSock.find(strKey);
if (it == m_mapUpdSock.end()) { if (it == m_mapUpdSock.end()) {
Socket::Ptr pSock(new Socket()); Socket::Ptr pSock(new Socket());
//InfoL<<localIp; //InfoL<<localIp;
if (!pSock->bindUdpSock(0, strLocalIp)) { if (!pSock->bindUdpSock(iLocalPort, strLocalIp)) {
//系统随机分配端口 //分配失败
return nullptr; return nullptr;
} }

View File

@ -54,7 +54,7 @@ public:
static void Destory() { static void Destory() {
delete &UDPServer::Instance(); delete &UDPServer::Instance();
} }
Socket::Ptr getSock(const char *strLocalIp, int iTrackIndex); Socket::Ptr getSock(const char *strLocalIp, int iTrackIndex,uint16_t iLocalPort = 0);
void listenPeer(const char *strPeerIp, void *pSelf, const onRecvData &cb); void listenPeer(const char *strPeerIp, void *pSelf, const onRecvData &cb);
void stopListenPeer(const char *strPeerIp, void *pSelf); void stopListenPeer(const char *strPeerIp, void *pSelf);
private: private: