初步添加RTCP包接收接口

This commit is contained in:
xiongziliang 2019-05-08 14:23:18 +08:00
parent 962dca081c
commit 7997c06010
5 changed files with 50 additions and 30 deletions

@ -1 +1 @@
Subproject commit 0d9ed0a73e11193c880b6a7f3d923e4e1e4e65a2 Subproject commit 7cfb157c63ce33c2e14461c0b9fe444318bfd217

View File

@ -191,12 +191,20 @@ void RtspSession::onRtpPacket(const char *data, uint64_t len) {
uint8_t interleaved = data[1]; uint8_t interleaved = data[1];
if(interleaved %2 == 0){ if(interleaved %2 == 0){
trackIdx = getTrackIndexByInterleaved(interleaved); trackIdx = getTrackIndexByInterleaved(interleaved);
} if (trackIdx != -1) {
if (trackIdx != -1) { handleOneRtp(trackIdx,_aTrackInfo[trackIdx],(unsigned char *)data + 4, len - 4);
handleOneRtp(trackIdx,_aTrackInfo[trackIdx],(unsigned char *)data + 4, len - 4); }
}else{
trackIdx = getTrackIndexByInterleaved(interleaved - 1);
if (trackIdx != -1) {
onRecvRtcp(trackIdx,_aTrackInfo[trackIdx],(unsigned char *)data + 4, len - 4);
}
} }
} }
void RtspSession::onRecvRtcp(int iTrackidx,SdpTrack::Ptr &track, unsigned char *pucData, unsigned int uiLen){
}
int64_t RtspSession::getContentLength(Parser &parser) { int64_t RtspSession::getContentLength(Parser &parser) {
if(parser.Method() == "POST"){ if(parser.Method() == "POST"){
//http post请求的content数据部分是base64编码后的rtsp请求信令包 //http post请求的content数据部分是base64编码后的rtsp请求信令包
@ -998,14 +1006,13 @@ inline void RtspSession::sendRtpPacket(const RtpPacket::Ptr & pkt) {
void RtspSession::onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx) { void RtspSession::onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx) {
_pushSrc->onWrite(rtppt, false); _pushSrc->onWrite(rtppt, false);
} }
inline void RtspSession::onRcvPeerUdpData(int iTrackIdx, const Buffer::Ptr &pBuf, const struct sockaddr& addr) { inline void RtspSession::onRcvPeerUdpData(int intervaled, const Buffer::Ptr &pBuf, const struct sockaddr& addr) {
//这是rtcp心跳包说明播放器还存活 //这是rtcp心跳包说明播放器还存活
_ticker.resetTime(); _ticker.resetTime();
if(iTrackIdx % 2 == 0){ if(intervaled % 2 == 0){
if(_pushSrc){ if(_pushSrc){
handleOneRtp(iTrackIdx / 2,_aTrackInfo[iTrackIdx / 2],( unsigned char *)pBuf->data(),pBuf->size()); handleOneRtp(intervaled / 2,_aTrackInfo[intervaled / 2],( unsigned char *)pBuf->data(),pBuf->size());
} }
//这是rtp探测包 //这是rtp探测包
@ -1017,8 +1024,8 @@ inline void RtspSession::onRcvPeerUdpData(int iTrackIdx, const Buffer::Ptr &pBuf
return; return;
} }
//设置真实的客户端nat映射端口号 //设置真实的客户端nat映射端口号
_apRtpSock[iTrackIdx / 2]->setSendPeerAddr(&addr); _apRtpSock[intervaled / 2]->setSendPeerAddr(&addr);
_abGotPeerUdp[iTrackIdx / 2] = true; _abGotPeerUdp[intervaled / 2] = true;
_bGotAllPeerUdp = true;//先假设获取到完整的rtp探测包 _bGotAllPeerUdp = true;//先假设获取到完整的rtp探测包
for (unsigned int i = 0; i < _aTrackInfo.size(); i++) { for (unsigned int i = 0; i < _aTrackInfo.size(); i++) {
if (!_abGotPeerUdp[i]) { if (!_abGotPeerUdp[i]) {
@ -1028,25 +1035,28 @@ inline void RtspSession::onRcvPeerUdpData(int iTrackIdx, const Buffer::Ptr &pBuf
} }
} }
} }
} }else{
//rtcp包
onRecvRtcp((intervaled - 1) / 2,_aTrackInfo[(intervaled - 1) / 2],( unsigned char *)pBuf->data(),pBuf->size());
}
} }
inline void RtspSession::startListenPeerUdpData(int trackIdx) { inline void RtspSession::startListenPeerUdpData(int trackIdx) {
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this()); weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
auto onUdpData = [weakSelf](const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr,int iTrackIdx){ auto onUdpData = [weakSelf](const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr,int intervaled){
auto strongSelf=weakSelf.lock(); auto strongSelf=weakSelf.lock();
if(!strongSelf) { if(!strongSelf) {
return false; return false;
} }
struct sockaddr addr=*pPeerAddr; struct sockaddr addr=*pPeerAddr;
strongSelf->async([weakSelf,pBuf,addr,iTrackIdx]() { strongSelf->async([weakSelf,pBuf,addr,intervaled]() {
auto strongSelf=weakSelf.lock(); auto strongSelf=weakSelf.lock();
if(!strongSelf) { if(!strongSelf) {
return; return;
} }
strongSelf->onRcvPeerUdpData(iTrackIdx,pBuf,addr); strongSelf->onRcvPeerUdpData(intervaled,pBuf,addr);
}); });
return true; return true;
}; };
@ -1055,19 +1065,19 @@ inline void RtspSession::startListenPeerUdpData(int trackIdx) {
case Rtsp::RTP_MULTICAST:{ case Rtsp::RTP_MULTICAST:{
//组播使用的共享rtcp端口 //组播使用的共享rtcp端口
UDPServer::Instance().listenPeer(get_peer_ip().data(), this, [onUdpData]( UDPServer::Instance().listenPeer(get_peer_ip().data(), this, [onUdpData](
int iTrackIdx, const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr) { int intervaled, const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr) {
return onUdpData(pBuf,pPeerAddr,iTrackIdx); return onUdpData(pBuf,pPeerAddr,intervaled);
}); });
} }
break; break;
case Rtsp::RTP_UDP:{ case Rtsp::RTP_UDP:{
auto setEvent = [&](Socket::Ptr &sock,int iTrackIdx){ auto setEvent = [&](Socket::Ptr &sock,int intervaled){
if(!sock){ if(!sock){
WarnL << "udp端口为空:" << iTrackIdx; WarnL << "udp端口为空:" << intervaled;
return; return;
} }
sock->setOnRead([onUdpData,iTrackIdx](const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr){ sock->setOnRead([onUdpData,intervaled](const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr){
onUdpData(pBuf,pPeerAddr,iTrackIdx); onUdpData(pBuf,pPeerAddr,intervaled);
}); });
}; };
setEvent(_apRtpSock[trackIdx], 2*trackIdx ); setEvent(_apRtpSock[trackIdx], 2*trackIdx );

View File

@ -108,6 +108,16 @@ protected:
//TcpSession override //TcpSession override
int send(const Buffer::Ptr &pkt) override; int send(const Buffer::Ptr &pkt) override;
/**
* RTCP包回调
* @param iTrackidx
* @param track
* @param pucData
* @param uiLen
*/
void onRecvRtcp(int iTrackidx,SdpTrack::Ptr &track, unsigned char *pucData, unsigned int uiLen);
private: private:
bool handleReq_Options(const Parser &parser); //处理options方法 bool handleReq_Options(const Parser &parser); //处理options方法
bool handleReq_Describe(const Parser &parser); //处理describe方法 bool handleReq_Describe(const Parser &parser); //处理describe方法
@ -133,7 +143,7 @@ private:
inline int getTrackIndexByControlSuffix(const string &controlSuffix); inline int getTrackIndexByControlSuffix(const string &controlSuffix);
inline int getTrackIndexByInterleaved(int interleaved); inline int getTrackIndexByInterleaved(int interleaved);
inline void onRcvPeerUdpData(int iTrackIdx, const Buffer::Ptr &pBuf, const struct sockaddr &addr); inline void onRcvPeerUdpData(int intervaled, const Buffer::Ptr &pBuf, const struct sockaddr &addr);
inline void startListenPeerUdpData(int iTrackIdx); inline void startListenPeerUdpData(int iTrackIdx);
//认证相关 //认证相关

View File

@ -41,9 +41,9 @@ UDPServer::~UDPServer() {
InfoL; InfoL;
} }
Socket::Ptr UDPServer::getSock(const char* strLocalIp, int iTrackIndex,uint16_t iLocalPort) { Socket::Ptr UDPServer::getSock(const char* strLocalIp, int intervaled,uint16_t iLocalPort) {
lock_guard<mutex> lck(_mtxUpdSock); lock_guard<mutex> lck(_mtxUpdSock);
string strKey = StrPrinter << strLocalIp << ":" << iTrackIndex << endl; string strKey = StrPrinter << strLocalIp << ":" << intervaled << endl;
auto it = _mapUpdSock.find(strKey); auto it = _mapUpdSock.find(strKey);
if (it == _mapUpdSock.end()) { if (it == _mapUpdSock.end()) {
Socket::Ptr pSock(new Socket()); Socket::Ptr pSock(new Socket());
@ -53,10 +53,10 @@ Socket::Ptr UDPServer::getSock(const char* strLocalIp, int iTrackIndex,uint16_t
return nullptr; return nullptr;
} }
pSock->setOnRead(bind(&UDPServer::onRcvData, this, iTrackIndex, placeholders::_1,placeholders::_2)); pSock->setOnRead(bind(&UDPServer::onRcvData, this, intervaled, placeholders::_1,placeholders::_2));
pSock->setOnErr(bind(&UDPServer::onErr, this, strKey, placeholders::_1)); pSock->setOnErr(bind(&UDPServer::onErr, this, strKey, placeholders::_1));
_mapUpdSock[strKey] = pSock; _mapUpdSock[strKey] = pSock;
DebugL << strLocalIp << " " << pSock->get_local_port() << " " << iTrackIndex; DebugL << strLocalIp << " " << pSock->get_local_port() << " " << intervaled;
return pSock; return pSock;
} }
return it->second; return it->second;
@ -89,7 +89,7 @@ void UDPServer::onErr(const string& strKey, const SockException& err) {
lock_guard<mutex> lck(_mtxUpdSock); lock_guard<mutex> lck(_mtxUpdSock);
_mapUpdSock.erase(strKey); _mapUpdSock.erase(strKey);
} }
void UDPServer::onRcvData(int iTrackIndex, const Buffer::Ptr &pBuf, struct sockaddr* pPeerAddr) { void UDPServer::onRcvData(int intervaled, const Buffer::Ptr &pBuf, struct sockaddr* pPeerAddr) {
//TraceL << trackIndex; //TraceL << trackIndex;
struct sockaddr_in *in = (struct sockaddr_in *) pPeerAddr; struct sockaddr_in *in = (struct sockaddr_in *) pPeerAddr;
string peerIp = inet_ntoa(in->sin_addr); string peerIp = inet_ntoa(in->sin_addr);
@ -101,7 +101,7 @@ void UDPServer::onRcvData(int iTrackIndex, const Buffer::Ptr &pBuf, struct socka
auto &mapRef = it0->second; auto &mapRef = it0->second;
for (auto it1 = mapRef.begin(); it1 != mapRef.end(); ++it1) { for (auto it1 = mapRef.begin(); it1 != mapRef.end(); ++it1) {
onRecvData &funRef = it1->second; onRecvData &funRef = it1->second;
if (!funRef(iTrackIndex, pBuf, pPeerAddr)) { if (!funRef(intervaled, pBuf, pPeerAddr)) {
it1 = mapRef.erase(it1); it1 = mapRef.erase(it1);
} }
} }

View File

@ -43,15 +43,15 @@ namespace mediakit {
class UDPServer : public std::enable_shared_from_this<UDPServer> { class UDPServer : public std::enable_shared_from_this<UDPServer> {
public: public:
typedef function< bool(int, const Buffer::Ptr &, struct sockaddr *)> onRecvData; typedef function< bool(int intervaled, const Buffer::Ptr &buffer, struct sockaddr *peer_addr)> onRecvData;
~UDPServer(); ~UDPServer();
static UDPServer &Instance(); static UDPServer &Instance();
Socket::Ptr getSock(const char *strLocalIp, int iTrackIndex,uint16_t iLocalPort = 0); Socket::Ptr getSock(const char *strLocalIp, int intervaled,uint16_t iLocalPort = 0);
void listenPeer(const char *strPeerIp, void *pSelf, const onRecvData &cb); void listenPeer(const char *strPeerIp, void *pSelf, const onRecvData &cb);
void stopListenPeer(const char *strPeerIp, void *pSelf); void stopListenPeer(const char *strPeerIp, void *pSelf);
private: private:
UDPServer(); UDPServer();
void onRcvData(int iTrackId, const Buffer::Ptr &pBuf,struct sockaddr *pPeerAddr); void onRcvData(int intervaled, const Buffer::Ptr &pBuf,struct sockaddr *pPeerAddr);
void onErr(const string &strKey,const SockException &err); void onErr(const string &strKey,const SockException &err);
unordered_map<string, Socket::Ptr> _mapUpdSock; unordered_map<string, Socket::Ptr> _mapUpdSock;
mutex _mtxUpdSock; mutex _mtxUpdSock;