From b5880535712c5bb3eb5e5ca5e356a842873f6ce2 Mon Sep 17 00:00:00 2001 From: xiongziliang <771730766@qq.com> Date: Fri, 10 Jul 2020 10:42:23 +0800 Subject: [PATCH] =?UTF-8?q?=E6=95=B4=E7=90=86rtsp=E6=9C=8D=E5=8A=A1?= =?UTF-8?q?=E5=99=A8=E7=9B=B8=E5=85=B3=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Rtsp/RtspSession.cpp | 528 +++++++++++++++++++-------------------- src/Rtsp/RtspSession.h | 165 ++++++------ 2 files changed, 335 insertions(+), 358 deletions(-) diff --git a/src/Rtsp/RtspSession.cpp b/src/Rtsp/RtspSession.cpp index 8a160e75..f62e66f4 100644 --- a/src/Rtsp/RtspSession.cpp +++ b/src/Rtsp/RtspSession.cpp @@ -56,29 +56,29 @@ static unordered_map > g_mapGetter; //对g_mapGetter上锁保护 static recursive_mutex g_mtxGetter; -RtspSession::RtspSession(const Socket::Ptr &pSock) : TcpSession(pSock) { +RtspSession::RtspSession(const Socket::Ptr &sock) : TcpSession(sock) { DebugP(this); GET_CONFIG(uint32_t,keep_alive_sec,Rtsp::kKeepAliveSecond); - pSock->setSendTimeOutSecond(keep_alive_sec); + sock->setSendTimeOutSecond(keep_alive_sec); //起始接收buffer缓存设置为4K,节省内存 - pSock->setReadBuffer(std::make_shared(4 * 1024)); + sock->setReadBuffer(std::make_shared(4 * 1024)); } RtspSession::~RtspSession() { DebugP(this); } -void RtspSession::onError(const SockException& err) { - bool isPlayer = !_pushSrc; - uint64_t duration = _ticker.createdTime()/1000; +void RtspSession::onError(const SockException &err) { + bool isPlayer = !_push_src; + uint64_t duration = _alive_ticker.createdTime() / 1000; WarnP(this) << (isPlayer ? "RTSP播放器(" : "RTSP推流器(") - << _mediaInfo._vhost << "/" - << _mediaInfo._app << "/" - << _mediaInfo._streamid + << _media_info._vhost << "/" + << _media_info._app << "/" + << _media_info._streamid << ")断开:" << err.what() << ",耗时(s):" << duration; - if (_rtpType == Rtsp::RTP_MULTICAST) { + if (_rtp_type == Rtsp::RTP_MULTICAST) { //取消UDP端口监听 UDPServer::Instance().stopListenPeer(get_peer_ip().data(), this); } @@ -91,8 +91,8 @@ void RtspSession::onError(const SockException& err) { //流量统计事件广播 GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold); - if(_ui64TotalBytes > iFlowThreshold * 1024){ - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _mediaInfo, _ui64TotalBytes, duration, isPlayer, static_cast(*this)); + if(_bytes_usage > iFlowThreshold * 1024){ + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _bytes_usage, duration, isPlayer, static_cast(*this)); } } @@ -101,30 +101,28 @@ void RtspSession::onManager() { GET_CONFIG(uint32_t,handshake_sec,Rtsp::kHandshakeSecond); GET_CONFIG(uint32_t,keep_alive_sec,Rtsp::kKeepAliveSecond); - if (_ticker.createdTime() > handshake_sec * 1000) { - if (_strSession.size() == 0) { + if (_alive_ticker.createdTime() > handshake_sec * 1000) { + if (_sessionid.size() == 0) { shutdown(SockException(Err_timeout,"illegal connection")); return; } } - - if ((_rtpType == Rtsp::RTP_UDP || _pushSrc ) && _ticker.elapsedTime() > keep_alive_sec * 1000) { + if ((_rtp_type == Rtsp::RTP_UDP || _push_src ) && _alive_ticker.elapsedTime() > keep_alive_sec * 1000) { //如果是推流端或者rtp over udp类型的播放端,那么就做超时检测 shutdown(SockException(Err_timeout,"rtp over udp session timeouted")); return; } } -void RtspSession::onRecv(const Buffer::Ptr &pBuf) { - _ticker.resetTime(); - _ui64TotalBytes += pBuf->size(); - if (_onRecv) { +void RtspSession::onRecv(const Buffer::Ptr &buf) { + _alive_ticker.resetTime(); + _bytes_usage += buf->size(); + if (_on_recv) { //http poster的请求数据转发给http getter处理 - _onRecv(pBuf); + _on_recv(buf); } else { -// TraceP(this) << pBuf->size() << "\r\n" << pBuf->data(); - input(pBuf->data(),pBuf->size()); + input(buf->data(), buf->size()); } } @@ -132,15 +130,15 @@ void RtspSession::onRecv(const Buffer::Ptr &pBuf) { static inline bool end_of(const string &str, const string &substr){ auto pos = str.rfind(substr); return pos != string::npos && pos == str.size() - substr.size(); -}; +} void RtspSession::onWholeRtspPacket(Parser &parser) { - string strCmd = parser.Method(); //提取出请求命令字 - _iCseq = atoi(parser["CSeq"].data()); - if(_strContentBase.empty() && strCmd != "GET"){ - _strContentBase = parser.Url(); - _mediaInfo.parse(parser.FullUrl()); - _mediaInfo._schema = RTSP_SCHEMA; + string method = parser.Method(); //提取出请求命令字 + _cseq = atoi(parser["CSeq"].data()); + if(_content_base.empty() && method != "GET"){ + _content_base = parser.Url(); + _media_info.parse(parser.FullUrl()); + _media_info._schema = RTSP_SCHEMA; } typedef void (RtspSession::*rtsp_request_handler)(const Parser &parser); @@ -160,10 +158,10 @@ void RtspSession::onWholeRtspPacket(Parser &parser) { s_cmd_functions.emplace("GET_PARAMETER",&RtspSession::handleReq_SET_PARAMETER); }, []() {}); - auto it = s_cmd_functions.find(strCmd); + auto it = s_cmd_functions.find(method); if (it == s_cmd_functions.end()) { sendRtspResponse("403 Forbidden"); - shutdown(SockException(Err_shutdown,StrPrinter << "403 Forbidden:" << strCmd)); + shutdown(SockException(Err_shutdown,StrPrinter << "403 Forbidden:" << method)); return; } @@ -181,24 +179,22 @@ void RtspSession::onWholeRtspPacket(Parser &parser) { } void RtspSession::onRtpPacket(const char *data, uint64_t len) { - if(!_pushSrc){ + if(!_push_src){ return; } - int trackIdx = -1; uint8_t interleaved = data[1]; if(interleaved %2 == 0){ - trackIdx = getTrackIndexByInterleaved(interleaved); - handleOneRtp(trackIdx, _aTrackInfo[trackIdx]->_type, _aTrackInfo[trackIdx]->_samplerate, (unsigned char *) data + 4, len - 4); + auto track_idx = getTrackIndexByInterleaved(interleaved); + handleOneRtp(track_idx, _sdp_track[track_idx]->_type, _sdp_track[track_idx]->_samplerate, (unsigned char *) data + 4, len - 4); }else{ - trackIdx = getTrackIndexByInterleaved(interleaved - 1); - onRtcpPacket(trackIdx, _aTrackInfo[trackIdx], (unsigned char *) data + 4, len - 4); + auto track_idx = getTrackIndexByInterleaved(interleaved - 1); + onRtcpPacket(track_idx, _sdp_track[track_idx], (unsigned char *) data + 4, len - 4); } } -void RtspSession::onRtcpPacket(int iTrackidx, SdpTrack::Ptr &track, unsigned char *pucData, unsigned int uiLen){ +void RtspSession::onRtcpPacket(int track_idx, SdpTrack::Ptr &track, unsigned char *data, unsigned int len){} -} int64_t RtspSession::getContentLength(Parser &parser) { if(parser.Method() == "POST"){ //http post请求的content数据部分是base64编码后的rtsp请求信令包 @@ -207,7 +203,6 @@ int64_t RtspSession::getContentLength(Parser &parser) { return RtspSplitter::getContentLength(parser); } - void RtspSession::handleReq_Options(const Parser &parser) { //支持这些命令 sendRtspResponse("200 OK",{"Public" , "OPTIONS, DESCRIBE, SETUP, TEARDOWN, PLAY, PAUSE, ANNOUNCE, RECORD, SET_PARAMETER, GET_PARAMETER"}); @@ -215,16 +210,16 @@ void RtspSession::handleReq_Options(const Parser &parser) { void RtspSession::handleReq_ANNOUNCE(const Parser &parser) { auto src = dynamic_pointer_cast(MediaSource::find(RTSP_SCHEMA, - _mediaInfo._vhost, - _mediaInfo._app, - _mediaInfo._streamid)); + _media_info._vhost, + _media_info._app, + _media_info._streamid)); if(src){ sendRtspResponse("406 Not Acceptable", {"Content-Type", "text/plain"}, "Already publishing."); string err = StrPrinter << "ANNOUNCE:" << "Already publishing:" - << _mediaInfo._vhost << " " - << _mediaInfo._app << " " - << _mediaInfo._streamid << endl; + << _media_info._vhost << " " + << _media_info._app << " " + << _media_info._streamid << endl; throw SockException(Err_shutdown,err); } @@ -232,30 +227,30 @@ void RtspSession::handleReq_ANNOUNCE(const Parser &parser) { if(end_of(full_url,".sdp")){ //去除.sdp后缀,防止EasyDarwin推流器强制添加.sdp后缀 full_url = full_url.substr(0,full_url.length() - 4); - _mediaInfo.parse(full_url); + _media_info.parse(full_url); } - if(_mediaInfo._app.empty() || _mediaInfo._streamid.empty()){ + if(_media_info._app.empty() || _media_info._streamid.empty()){ //推流rtsp url必须最少两级(rtsp://host/app/stream_id),不允许莫名其妙的推流url sendRtspResponse("403 Forbidden", {"Content-Type", "text/plain"}, "rtsp推流url非法,最少确保两级rtsp url"); throw SockException(Err_shutdown,StrPrinter << "rtsp推流url非法:" << full_url); } SdpParser sdpParser(parser.Content()); - _strSession = makeRandStr(12); - _aTrackInfo = sdpParser.getAvailableTrack(); + _sessionid = makeRandStr(12); + _sdp_track = sdpParser.getAvailableTrack(); - _pushSrc = std::make_shared(_mediaInfo._vhost,_mediaInfo._app,_mediaInfo._streamid); - _pushSrc->setListener(dynamic_pointer_cast(shared_from_this())); - _pushSrc->setSdp(sdpParser.toString()); + _push_src = std::make_shared(_media_info._vhost, _media_info._app, _media_info._streamid); + _push_src->setListener(dynamic_pointer_cast(shared_from_this())); + _push_src->setSdp(sdpParser.toString()); - sendRtspResponse("200 OK",{"Content-Base",_strContentBase + "/"}); + sendRtspResponse("200 OK",{"Content-Base", _content_base + "/"}); } void RtspSession::handleReq_RECORD(const Parser &parser){ - if (_aTrackInfo.empty() || parser["Session"] != _strSession) { + if (_sdp_track.empty() || parser["Session"] != _sessionid) { send_SessionNotFound(); - throw SockException(Err_shutdown,_aTrackInfo.empty() ? "can not find any availabe track when record" : "session not found when record"); + throw SockException(Err_shutdown, _sdp_track.empty() ? "can not find any availabe track when record" : "session not found when record"); } auto onRes = [this](const string &err,bool enableRtxp,bool enableHls,bool enableMP4){ bool authSuccess = err.empty(); @@ -266,21 +261,21 @@ void RtspSession::handleReq_RECORD(const Parser &parser){ } //设置转协议 - _pushSrc->setProtocolTranslation(enableRtxp,enableHls,enableMP4); + _push_src->setProtocolTranslation(enableRtxp, enableHls, enableMP4); _StrPrinter rtp_info; - for(auto &track : _aTrackInfo){ + for(auto &track : _sdp_track){ if (track->_inited == false) { //还有track没有setup shutdown(SockException(Err_shutdown,"track not setuped")); return; } - rtp_info << "url=" << _strContentBase << "/" << track->_control_surffix << ","; + rtp_info << "url=" << _content_base << "/" << track->_control_surffix << ","; } rtp_info.pop_back(); sendRtspResponse("200 OK", {"RTP-Info",rtp_info}); - if(_rtpType == Rtsp::RTP_TCP){ + if(_rtp_type == Rtsp::RTP_TCP){ //如果是rtsp推流服务器,并且是TCP推流,那么加大TCP接收缓存,这样能提升接收性能 _sock->setReadBuffer(std::make_shared(256 * 1024)); setSocketFlags(); @@ -303,7 +298,7 @@ void RtspSession::handleReq_RECORD(const Parser &parser){ }; //rtsp推流需要鉴权 - auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish,_mediaInfo,invoker,static_cast(*this)); + auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish, _media_info, invoker, static_cast(*this)); if(!flag){ //该事件无人监听,默认不鉴权 GET_CONFIG(bool,toRtxp,General::kPublishToRtxp); @@ -341,7 +336,7 @@ void RtspSession::emitOnPlay(){ }; //广播通用播放url鉴权事件 - auto flag = _emit_on_play ? false : NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed, _mediaInfo, invoker, static_cast(*this)); + auto flag = _emit_on_play ? false : NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed, _media_info, invoker, static_cast(*this)); if (!flag) { //该事件无人监听,默认不鉴权 onRes(""); @@ -381,7 +376,7 @@ void RtspSession::handleReq_Describe(const Parser &parser) { if(_rtsp_realm.empty()){ //广播是否需要rtsp专属认证事件 - if (!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnGetRtspRealm, _mediaInfo, invoker, static_cast(*this))) { + if (!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnGetRtspRealm, _media_info, invoker, static_cast(*this))) { //无人监听此事件,说明无需认证 invoker(""); } @@ -389,10 +384,11 @@ void RtspSession::handleReq_Describe(const Parser &parser) { invoker(_rtsp_realm); } } + void RtspSession::onAuthSuccess() { TraceP(this); weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - MediaSource::findAsync(_mediaInfo,weakSelf.lock(),[weakSelf](const MediaSource::Ptr &src){ + MediaSource::findAsync(_media_info, weakSelf.lock(), [weakSelf](const MediaSource::Ptr &src){ auto strongSelf = weakSelf.lock(); if(!strongSelf){ return; @@ -400,43 +396,44 @@ void RtspSession::onAuthSuccess() { auto rtsp_src = dynamic_pointer_cast(src); if (!rtsp_src) { //未找到相应的MediaSource - string err = StrPrinter << "no such stream:" << strongSelf->_mediaInfo._vhost << " " << strongSelf->_mediaInfo._app << " " << strongSelf->_mediaInfo._streamid; + string err = StrPrinter << "no such stream:" << strongSelf->_media_info._vhost << " " << strongSelf->_media_info._app << " " << strongSelf->_media_info._streamid; strongSelf->send_StreamNotFound(); strongSelf->shutdown(SockException(Err_shutdown,err)); return; } //找到了相应的rtsp流 - strongSelf->_aTrackInfo = SdpParser(rtsp_src->getSdp()).getAvailableTrack(); - if (strongSelf->_aTrackInfo.empty()) { + strongSelf->_sdp_track = SdpParser(rtsp_src->getSdp()).getAvailableTrack(); + if (strongSelf->_sdp_track.empty()) { //该流无效 DebugL << "无trackInfo,该流无效"; strongSelf->send_StreamNotFound(); strongSelf->shutdown(SockException(Err_shutdown,"can not find any available track in sdp")); return; } - strongSelf->_strSession = makeRandStr(12); - strongSelf->_pMediaSrc = rtsp_src; - for(auto &track : strongSelf->_aTrackInfo){ + strongSelf->_sessionid = makeRandStr(12); + strongSelf->_play_src = rtsp_src; + for(auto &track : strongSelf->_sdp_track){ track->_ssrc = rtsp_src->getSsrc(track->_type); track->_seq = rtsp_src->getSeqence(track->_type); track->_time_stamp = rtsp_src->getTimeStamp(track->_type); } strongSelf->sendRtspResponse("200 OK", - {"Content-Base",strongSelf->_strContentBase + "/", + {"Content-Base", strongSelf->_content_base + "/", "x-Accept-Retransmit","our-retransmit", "x-Accept-Dynamic-Rate","1" },rtsp_src->getSdp()); }); } + void RtspSession::onAuthFailed(const string &realm,const string &why,bool close) { GET_CONFIG(bool,authBasic,Rtsp::kAuthBasic); if (!authBasic) { //我们需要客户端优先以md5方式认证 - _strNonce = makeRandStr(32); + _auth_nonce = makeRandStr(32); sendRtspResponse("401 Unauthorized", {"WWW-Authenticate", - StrPrinter << "Digest realm=\"" << realm << "\",nonce=\"" << _strNonce << "\"" }); + StrPrinter << "Digest realm=\"" << realm << "\",nonce=\"" << _auth_nonce << "\"" }); }else { //当然我们也支持base64认证,但是我们不建议这样做 sendRtspResponse("401 Unauthorized", @@ -448,56 +445,56 @@ void RtspSession::onAuthFailed(const string &realm,const string &why,bool close) } } -void RtspSession::onAuthBasic(const string &realm,const string &strBase64){ +void RtspSession::onAuthBasic(const string &realm,const string &auth_base64){ //base64认证 char user_pwd_buf[512]; - av_base64_decode((uint8_t *)user_pwd_buf,strBase64.data(),strBase64.size()); - auto user_pwd_vec = split(user_pwd_buf,":"); - if(user_pwd_vec.size() < 2){ + av_base64_decode((uint8_t *) user_pwd_buf, auth_base64.data(), auth_base64.size()); + auto user_pwd_vec = split(user_pwd_buf, ":"); + if (user_pwd_vec.size() < 2) { //认证信息格式不合法,回复401 Unauthorized - onAuthFailed(realm,"can not find user and passwd when basic64 auth"); + onAuthFailed(realm, "can not find user and passwd when basic64 auth"); return; } auto user = user_pwd_vec[0]; auto pwd = user_pwd_vec[1]; weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - onAuth invoker = [pwd,realm,weakSelf](bool encrypted,const string &good_pwd){ + onAuth invoker = [pwd, realm, weakSelf](bool encrypted, const string &good_pwd) { auto strongSelf = weakSelf.lock(); - if(!strongSelf){ + if (!strongSelf) { //本对象已经销毁 return; } //切换到自己的线程执行 - strongSelf->async([weakSelf,good_pwd,pwd,realm](){ + strongSelf->async([weakSelf, good_pwd, pwd, realm]() { auto strongSelf = weakSelf.lock(); - if(!strongSelf){ + if (!strongSelf) { //本对象已经销毁 return; } //base64忽略encrypted参数,上层必须传入明文密码 - if(pwd == good_pwd){ + if (pwd == good_pwd) { //提供的密码且匹配正确 strongSelf->onAuthSuccess(); return; } //密码错误 - strongSelf->onAuthFailed(realm,StrPrinter << "password mismatch when base64 auth:" << pwd << " != " << good_pwd); + strongSelf->onAuthFailed(realm, StrPrinter << "password mismatch when base64 auth:" << pwd << " != " << good_pwd); }); }; //此时必须提供明文密码 - if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth,_mediaInfo,realm,user, true,invoker,static_cast(*this))){ + if (!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth, _media_info, realm, user, true, invoker, static_cast(*this))) { //表明该流需要认证却没监听请求密码事件,这一般是大意的程序所为,警告之 WarnP(this) << "请监听kBroadcastOnRtspAuth事件!"; //但是我们还是忽略认证以便完成播放 //我们输入的密码是明文 - invoker(false,pwd); + invoker(false, pwd); } } -void RtspSession::onAuthDigest(const string &realm,const string &strMd5){ - DebugP(this) << strMd5; - auto mapTmp = Parser::parseArgs(strMd5,",","="); +void RtspSession::onAuthDigest(const string &realm,const string &auth_md5){ + DebugP(this) << auth_md5; + auto mapTmp = Parser::parseArgs(auth_md5, ",", "="); decltype(mapTmp) map; for(auto &pr : mapTmp){ map[trim(string(pr.first)," \"")] = trim(pr.second," \""); @@ -509,8 +506,8 @@ void RtspSession::onAuthDigest(const string &realm,const string &strMd5){ } //check nonce auto nonce = map["nonce"]; - if(_strNonce != nonce){ - onAuthFailed(realm,StrPrinter << "nonce not mached:" << nonce << " != " << _strNonce); + if(_auth_nonce != nonce){ + onAuthFailed(realm,StrPrinter << "nonce not mached:" << nonce << " != " << _auth_nonce); return ; } //check username and uri @@ -570,7 +567,7 @@ void RtspSession::onAuthDigest(const string &realm,const string &strMd5){ }; //此时可以提供明文或md5加密的密码 - if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth,_mediaInfo,realm,username, false,invoker,static_cast(*this))){ + if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth, _media_info, realm, username, false, invoker, static_cast(*this))){ //表明该流需要认证却没监听请求密码事件,这一般是大意的程序所为,警告之 WarnP(this) << "请监听kBroadcastOnRtspAuth事件!"; //但是我们还是忽略认证以便完成播放 @@ -602,9 +599,11 @@ void RtspSession::onAuthUser(const string &realm,const string &authorization){ onAuthFailed(realm,StrPrinter << "unsupported auth type:" << authType); } } + inline void RtspSession::send_StreamNotFound() { sendRtspResponse("404 Stream Not Found",{"Connection","Close"}); } + inline void RtspSession::send_UnsupportedTransport() { sendRtspResponse("461 Unsupported Transport",{"Connection","Close"}); } @@ -614,36 +613,36 @@ inline void RtspSession::send_SessionNotFound() { } void RtspSession::handleReq_Setup(const Parser &parser) { -//处理setup命令,该函数可能进入多次 - auto controlSuffix = split(parser.FullUrl(),"/").back();// parser.FullUrl().substr(_strContentBase.size()); + //处理setup命令,该函数可能进入多次 + auto controlSuffix = split(parser.FullUrl(),"/").back(); if(controlSuffix.front() == '/'){ controlSuffix = controlSuffix.substr(1); } int trackIdx = getTrackIndexByControlSuffix(controlSuffix); - SdpTrack::Ptr &trackRef = _aTrackInfo[trackIdx]; + SdpTrack::Ptr &trackRef = _sdp_track[trackIdx]; if (trackRef->_inited) { //已经初始化过该Track throw SockException(Err_shutdown, "can not setup one track twice"); } trackRef->_inited = true; //现在初始化 - if(_rtpType == Rtsp::RTP_Invalid){ + if(_rtp_type == Rtsp::RTP_Invalid){ auto &strTransport = parser["Transport"]; if(strTransport.find("TCP") != string::npos){ - _rtpType = Rtsp::RTP_TCP; + _rtp_type = Rtsp::RTP_TCP; }else if(strTransport.find("multicast") != string::npos){ - _rtpType = Rtsp::RTP_MULTICAST; + _rtp_type = Rtsp::RTP_MULTICAST; }else{ - _rtpType = Rtsp::RTP_UDP; + _rtp_type = Rtsp::RTP_UDP; } } //允许接收rtp、rtcp包 - RtspSplitter::enableRecvRtp(_rtpType == Rtsp::RTP_TCP); + RtspSplitter::enableRecvRtp(_rtp_type == Rtsp::RTP_TCP); - switch (_rtpType) { + switch (_rtp_type) { case Rtsp::RTP_TCP: { - if(_pushSrc){ + if(_push_src){ //rtsp推流时,interleaved由推流者决定 auto key_values = Parser::parseArgs(parser["Transport"],";","="); int interleaved_rtp = -1 , interleaved_rtcp = -1; @@ -657,14 +656,16 @@ void RtspSession::handleReq_Setup(const Parser &parser) { trackRef->_interleaved = 2 * trackRef->_type; } sendRtspResponse("200 OK", - {"Transport",StrPrinter << "RTP/AVP/TCP;unicast;" - << "interleaved=" << (int)trackRef->_interleaved << "-" << (int)trackRef->_interleaved + 1 << ";" - << "ssrc=" << printSSRC(trackRef->_ssrc), - "x-Transport-Options" , "late-tolerance=1.400000", - "x-Dynamic-Rate" , "1" + {"Transport", StrPrinter << "RTP/AVP/TCP;unicast;" + << "interleaved=" << (int) trackRef->_interleaved << "-" + << (int) trackRef->_interleaved + 1 << ";" + << "ssrc=" << printSSRC(trackRef->_ssrc), + "x-Transport-Options", "late-tolerance=1.400000", + "x-Dynamic-Rate", "1" }); } break; + case Rtsp::RTP_UDP: { std::pair pr; try{ @@ -675,8 +676,8 @@ void RtspSession::handleReq_Setup(const Parser &parser) { throw SockException(Err_shutdown, ex.what()); } - _apRtpSock[trackIdx] = pr.first; - _apRtcpSock[trackIdx] = pr.second; + _rtp_socks[trackIdx] = pr.first; + _rtcp_socks[trackIdx] = pr.second; //设置客户端内网端口信息 string strClientPort = FindField(parser["Transport"].data(), "client_port=", NULL); @@ -705,14 +706,15 @@ void RtspSession::handleReq_Setup(const Parser &parser) { sendRtspResponse("200 OK", {"Transport", StrPrinter << "RTP/AVP/UDP;unicast;" << "client_port=" << strClientPort << ";" - << "server_port=" << pr.first->get_local_port() << "-" << pr.second->get_local_port() << ";" + << "server_port=" << pr.first->get_local_port() << "-" + << pr.second->get_local_port() << ";" << "ssrc=" << printSSRC(trackRef->_ssrc) }); } break; case Rtsp::RTP_MULTICAST: { if(!_multicaster){ - _multicaster = RtpMultiCaster::get(getPoller(),get_local_ip(),_mediaInfo._vhost, _mediaInfo._app, _mediaInfo._streamid); + _multicaster = RtpMultiCaster::get(getPoller(), get_local_ip(), _media_info._vhost, _media_info._app, _media_info._streamid); if (!_multicaster) { send_NotAcceptable(); throw SockException(Err_shutdown, "can not get a available udp multicast socket"); @@ -739,12 +741,12 @@ void RtspSession::handleReq_Setup(const Parser &parser) { GET_CONFIG(uint32_t,udpTTL,MultiCast::kUdpTTL); sendRtspResponse("200 OK", - {"Transport",StrPrinter << "RTP/AVP;multicast;" - << "destination=" << _multicaster->getIP() << ";" - << "source=" << get_local_ip() << ";" - << "port=" << iSrvPort << "-" << pSockRtcp->get_local_port() << ";" - << "ttl=" << udpTTL << ";" - << "ssrc=" << printSSRC(trackRef->_ssrc) + {"Transport", StrPrinter << "RTP/AVP;multicast;" + << "destination=" << _multicaster->getIP() << ";" + << "source=" << get_local_ip() << ";" + << "port=" << iSrvPort << "-" << pSockRtcp->get_local_port() << ";" + << "ttl=" << udpTTL << ";" + << "ssrc=" << printSSRC(trackRef->_ssrc) }); } break; @@ -754,19 +756,19 @@ void RtspSession::handleReq_Setup(const Parser &parser) { } void RtspSession::handleReq_Play(const Parser &parser) { - if (_aTrackInfo.empty() || parser["Session"] != _strSession) { + if (_sdp_track.empty() || parser["Session"] != _sessionid) { send_SessionNotFound(); - throw SockException(Err_shutdown,_aTrackInfo.empty() ? "can not find any available track when play" : "session not found when play"); + throw SockException(Err_shutdown, _sdp_track.empty() ? "can not find any available track when play" : "session not found when play"); } - auto pMediaSrc = _pMediaSrc.lock(); - if(!pMediaSrc){ + auto play_src = _play_src.lock(); + if(!play_src){ send_StreamNotFound(); shutdown(SockException(Err_shutdown,"rtsp stream released")); return; } - bool useBuf = true; - _enableSendRtp = false; + bool useGOP = true; + _enable_send_rtp = false; float iStartTime = 0; auto strRange = parser["Range"]; if (strRange.size()) { @@ -777,53 +779,53 @@ void RtspSession::handleReq_Play(const Parser &parser) { } iStartTime = 1000 * atof(strStart.data()); InfoP(this) << "rtsp seekTo(ms):" << iStartTime; - useBuf = !pMediaSrc->seekTo(iStartTime); - } else if (pMediaSrc->totalReaderCount() == 0) { + useGOP = !play_src->seekTo(iStartTime); + } else if (play_src->totalReaderCount() == 0) { //第一个消费者 - pMediaSrc->seekTo(0); + play_src->seekTo(0); } _StrPrinter rtp_info; - for (auto &track : _aTrackInfo) { + for (auto &track : _sdp_track) { if (track->_inited == false) { //还有track没有setup shutdown(SockException(Err_shutdown, "track not setuped")); return; } - track->_ssrc = pMediaSrc->getSsrc(track->_type); - track->_seq = pMediaSrc->getSeqence(track->_type); - track->_time_stamp = pMediaSrc->getTimeStamp(track->_type); + track->_ssrc = play_src->getSsrc(track->_type); + track->_seq = play_src->getSeqence(track->_type); + track->_time_stamp = play_src->getTimeStamp(track->_type); - rtp_info << "url=" << _strContentBase << "/" << track->_control_surffix << ";" + rtp_info << "url=" << _content_base << "/" << track->_control_surffix << ";" << "seq=" << track->_seq << ";" << "rtptime=" << (int) (track->_time_stamp * (track->_samplerate / 1000)) << ","; } rtp_info.pop_back(); sendRtspResponse("200 OK", - {"Range", StrPrinter << "npt=" << setiosflags(ios::fixed) << setprecision(2) << (useBuf? pMediaSrc->getTimeStamp(TrackInvalid) / 1000.0 : iStartTime / 1000), + {"Range", StrPrinter << "npt=" << setiosflags(ios::fixed) << setprecision(2) << (useGOP ? play_src->getTimeStamp(TrackInvalid) / 1000.0 : iStartTime / 1000), "RTP-Info",rtp_info }); - _enableSendRtp = true; + _enable_send_rtp = true; setSocketFlags(); - if (!_pRtpReader && _rtpType != Rtsp::RTP_MULTICAST) { + if (!_play_reader && _rtp_type != Rtsp::RTP_MULTICAST) { weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - _pRtpReader = pMediaSrc->getRing()->attach(getPoller(), useBuf); - _pRtpReader->setDetachCB([weakSelf]() { + _play_reader = play_src->getRing()->attach(getPoller(), useGOP); + _play_reader->setDetachCB([weakSelf]() { auto strongSelf = weakSelf.lock(); if (!strongSelf) { return; } strongSelf->shutdown(SockException(Err_shutdown, "rtsp ring buffer detached")); }); - _pRtpReader->setReadCB([weakSelf](const RtspMediaSource::RingDataType &pack) { + _play_reader->setReadCB([weakSelf](const RtspMediaSource::RingDataType &pack) { auto strongSelf = weakSelf.lock(); if (!strongSelf) { return; } - if (strongSelf->_enableSendRtp) { + if (strongSelf->_enable_send_rtp) { strongSelf->sendRtpPacket(pack); } }); @@ -831,13 +833,13 @@ void RtspSession::handleReq_Play(const Parser &parser) { } void RtspSession::handleReq_Pause(const Parser &parser) { - if (parser["Session"] != _strSession) { + if (parser["Session"] != _sessionid) { send_SessionNotFound(); throw SockException(Err_shutdown,"session not found when pause"); } sendRtspResponse("200 OK"); - _enableSendRtp = false; + _enable_send_rtp = false; } void RtspSession::handleReq_Teardown(const Parser &parser) { @@ -873,7 +875,7 @@ void RtspSession::handleReq_Post(const Parser &parser) { g_mapGetter.erase(sessioncookie); //http poster收到请求后转发给http getter处理 - _onRecv = [this,httpGetterWeak](const Buffer::Ptr &pBuf){ + _on_recv = [this,httpGetterWeak](const Buffer::Ptr &buf){ auto httpGetterStrong = httpGetterWeak.lock(); if(!httpGetterStrong){ shutdown(SockException(Err_shutdown,"http getter released")); @@ -881,18 +883,18 @@ void RtspSession::handleReq_Post(const Parser &parser) { } //切换到http getter的线程 - httpGetterStrong->async([pBuf,httpGetterWeak](){ + httpGetterStrong->async([buf,httpGetterWeak](){ auto httpGetterStrong = httpGetterWeak.lock(); if(!httpGetterStrong){ return; } - httpGetterStrong->onRecv(std::make_shared(decodeBase64(string(pBuf->data(),pBuf->size())))); + httpGetterStrong->onRecv(std::make_shared(decodeBase64(string(buf->data(), buf->size())))); }); }; if(!parser.Content().empty()){ //http poster后面的粘包 - _onRecv(std::make_shared(parser.Content())); + _on_recv(std::make_shared(parser.Content())); } sendRtspResponse("200 OK", @@ -911,82 +913,82 @@ inline void RtspSession::send_NotAcceptable() { sendRtspResponse("406 Not Acceptable",{"Connection","Close"}); } - -void RtspSession::onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx) { - _pushSrc->onWrite(rtppt, false); +void RtspSession::onRtpSorted(const RtpPacket::Ptr &rtp, int) { + _push_src->onWrite(rtp, false); } -inline void RtspSession::onRcvPeerUdpData(int intervaled, const Buffer::Ptr &pBuf, const struct sockaddr& addr) { - //这是rtcp心跳包,说明播放器还存活 - _ticker.resetTime(); - if(intervaled % 2 == 0){ - if(_pushSrc){ +inline void RtspSession::onRcvPeerUdpData(int interleaved, const Buffer::Ptr &buf, const struct sockaddr &addr) { + //这是rtcp心跳包,说明播放器还存活 + _alive_ticker.resetTime(); + + if (interleaved % 2 == 0) { + if (_push_src) { //这是rtsp推流上来的rtp包 - auto &ref = _aTrackInfo[intervaled / 2]; - handleOneRtp(intervaled / 2, ref->_type, ref->_samplerate, (unsigned char *) pBuf->data(), pBuf->size()); - }else if(!_udpSockConnected.count(intervaled)){ + auto &ref = _sdp_track[interleaved / 2]; + handleOneRtp(interleaved / 2, ref->_type, ref->_samplerate, (unsigned char *) buf->data(), buf->size()); + } else if (!_udp_connected_flags.count(interleaved)) { //这是rtsp播放器的rtp打洞包 - _udpSockConnected.emplace(intervaled); - _apRtpSock[intervaled / 2]->setSendPeerAddr(&addr); + _udp_connected_flags.emplace(interleaved); + _rtp_socks[interleaved / 2]->setSendPeerAddr(&addr); } - }else{ + } else { //rtcp包 - if(!_udpSockConnected.count(intervaled)){ - _udpSockConnected.emplace(intervaled); - _apRtcpSock[(intervaled - 1) / 2]->setSendPeerAddr(&addr); + if (!_udp_connected_flags.count(interleaved)) { + _udp_connected_flags.emplace(interleaved); + _rtcp_socks[(interleaved - 1) / 2]->setSendPeerAddr(&addr); } - onRtcpPacket((intervaled - 1) / 2, _aTrackInfo[(intervaled - 1) / 2], (unsigned char *) pBuf->data(),pBuf->size()); + onRtcpPacket((interleaved - 1) / 2, _sdp_track[(interleaved - 1) / 2], (unsigned char *) buf->data(), + buf->size()); } } - -inline void RtspSession::startListenPeerUdpData(int trackIdx) { +inline void RtspSession::startListenPeerUdpData(int track_idx) { weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); auto srcIP = inet_addr(get_peer_ip().data()); - auto onUdpData = [weakSelf,srcIP](const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr,int intervaled){ - auto strongSelf=weakSelf.lock(); - if(!strongSelf) { + auto onUdpData = [weakSelf,srcIP](const Buffer::Ptr &buf, struct sockaddr *peer_addr, int interleaved){ + auto strongSelf = weakSelf.lock(); + if (!strongSelf) { return false; } - if (((struct sockaddr_in *) pPeerAddr)->sin_addr.s_addr != srcIP) { - WarnP(strongSelf.get()) << ((intervaled % 2 == 0) ? "收到其他地址的rtp数据:" : "收到其他地址的rtcp数据:") - << SockUtil::inet_ntoa(((struct sockaddr_in *) pPeerAddr)->sin_addr); + if (((struct sockaddr_in *) peer_addr)->sin_addr.s_addr != srcIP) { + WarnP(strongSelf.get()) << ((interleaved % 2 == 0) ? "收到其他地址的rtp数据:" : "收到其他地址的rtcp数据:") + << SockUtil::inet_ntoa(((struct sockaddr_in *) peer_addr)->sin_addr); return true; } - struct sockaddr addr=*pPeerAddr; - strongSelf->async([weakSelf,pBuf,addr,intervaled]() { - auto strongSelf=weakSelf.lock(); - if(!strongSelf) { + struct sockaddr addr = *peer_addr; + strongSelf->async([weakSelf, buf, addr, interleaved]() { + auto strongSelf = weakSelf.lock(); + if (!strongSelf) { return; } - strongSelf->onRcvPeerUdpData(intervaled,pBuf,addr); + strongSelf->onRcvPeerUdpData(interleaved, buf, addr); }); return true; }; - switch (_rtpType){ + switch (_rtp_type){ case Rtsp::RTP_MULTICAST:{ //组播使用的共享rtcp端口 - UDPServer::Instance().listenPeer(get_peer_ip().data(), this, [onUdpData]( - int intervaled, const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr) { - return onUdpData(pBuf,pPeerAddr,intervaled); + UDPServer::Instance().listenPeer(get_peer_ip().data(), this, + [onUdpData]( int interleaved, const Buffer::Ptr &buf, struct sockaddr *peer_addr) { + return onUdpData(buf, peer_addr, interleaved); }); } break; case Rtsp::RTP_UDP:{ - auto setEvent = [&](Socket::Ptr &sock,int intervaled){ + auto setEvent = [&](Socket::Ptr &sock,int interleaved){ if(!sock){ - WarnP(this) << "udp端口为空:" << intervaled; + WarnP(this) << "udp端口为空:" << interleaved; return; } - sock->setOnRead([onUdpData,intervaled](const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr , int addr_len){ - onUdpData(pBuf,pPeerAddr,intervaled); + sock->setOnRead([onUdpData,interleaved](const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr , int addr_len){ + onUdpData(pBuf, pPeerAddr, interleaved); }); }; - setEvent(_apRtpSock[trackIdx], 2*trackIdx ); - setEvent(_apRtcpSock[trackIdx], 2*trackIdx + 1 ); + setEvent(_rtp_socks[track_idx], 2 * track_idx ); + setEvent(_rtcp_socks[track_idx], 2 * track_idx + 1 ); } break; @@ -1003,14 +1005,11 @@ static string dateStr(){ return buf; } -bool RtspSession::sendRtspResponse(const string &res_code, - const StrCaseMap &header_const, - const string &sdp, - const char *protocol){ +bool RtspSession::sendRtspResponse(const string &res_code, const StrCaseMap &header_const, const string &sdp, const char *protocol){ auto header = header_const; - header.emplace("CSeq",StrPrinter << _iCseq); - if(!_strSession.empty()){ - header.emplace("Session",_strSession); + header.emplace("CSeq",StrPrinter << _cseq); + if(!_sessionid.empty()){ + header.emplace("Session", _sessionid); } header.emplace("Server",SERVER_NAME); @@ -1040,14 +1039,11 @@ int RtspSession::send(const Buffer::Ptr &pkt){ // if(!_enableSendRtp){ // DebugP(this) << pkt->data(); // } - _ui64TotalBytes += pkt->size(); + _bytes_usage += pkt->size(); return TcpSession::send(pkt); } -bool RtspSession::sendRtspResponse(const string &res_code, - const std::initializer_list &header, - const string &sdp, - const char *protocol) { +bool RtspSession::sendRtspResponse(const string &res_code, const std::initializer_list &header, const string &sdp, const char *protocol) { string key; StrCaseMap header_map; int i = 0; @@ -1062,43 +1058,44 @@ bool RtspSession::sendRtspResponse(const string &res_code, } inline int RtspSession::getTrackIndexByTrackType(TrackType type) { - for (unsigned int i = 0; i < _aTrackInfo.size(); i++) { - if (type == _aTrackInfo[i]->_type) { + for (unsigned int i = 0; i < _sdp_track.size(); i++) { + if (type == _sdp_track[i]->_type) { return i; } } - if(_aTrackInfo.size() == 1){ + if(_sdp_track.size() == 1){ return 0; } throw SockException(Err_shutdown, StrPrinter << "no such track with type:" << (int) type); } + inline int RtspSession::getTrackIndexByControlSuffix(const string &controlSuffix) { - for (unsigned int i = 0; i < _aTrackInfo.size(); i++) { - if (controlSuffix == _aTrackInfo[i]->_control_surffix) { + for (unsigned int i = 0; i < _sdp_track.size(); i++) { + if (controlSuffix == _sdp_track[i]->_control_surffix) { return i; } } - if(_aTrackInfo.size() == 1){ + if(_sdp_track.size() == 1){ return 0; } throw SockException(Err_shutdown, StrPrinter << "no such track with suffix:" << controlSuffix); } inline int RtspSession::getTrackIndexByInterleaved(int interleaved){ - for (unsigned int i = 0; i < _aTrackInfo.size(); i++) { - if (_aTrackInfo[i]->_interleaved == interleaved) { + for (unsigned int i = 0; i < _sdp_track.size(); i++) { + if (_sdp_track[i]->_interleaved == interleaved) { return i; } } - if(_aTrackInfo.size() == 1){ + if(_sdp_track.size() == 1){ return 0; } throw SockException(Err_shutdown, StrPrinter << "no such track with interleaved:" << interleaved); } -bool RtspSession::close(MediaSource &sender,bool force) { +bool RtspSession::close(MediaSource &sender, bool force) { //此回调在其他线程触发 - if(!_pushSrc || (!force && _pushSrc->totalReaderCount())){ + if(!_push_src || (!force && _push_src->totalReaderCount())){ return false; } string err = StrPrinter << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force; @@ -1107,17 +1104,34 @@ bool RtspSession::close(MediaSource &sender,bool force) { } int RtspSession::totalReaderCount(MediaSource &sender) { - return _pushSrc ? _pushSrc->totalReaderCount() : sender.readerCount(); + return _push_src ? _push_src->totalReaderCount() : sender.readerCount(); +} + +inline void RtspSession::onSendRtpPacket(const RtpPacket::Ptr &pkt){ +#if RTSP_SERVER_SEND_RTCP + int track_index = getTrackIndexByTrackType(pkt->type); + RtcpCounter &counter = _rtcp_counter[track_index]; + counter.pktCnt += 1; + counter.octCount += (pkt->size() - pkt->offset); + auto &ticker = _rtcp_send_tickers[track_index]; + if (ticker.elapsedTime() > 5 * 1000) { + //send rtcp every 5 second + ticker.resetTime(); + //直接保存网络字节序 + memcpy(&counter.timeStamp, pkt->data() + 8, 4); + sendSenderReport(_rtp_type == Rtsp::RTP_TCP, track_index); + } +#endif } void RtspSession::sendRtpPacket(const RtspMediaSource::RingDataType &pkt) { - //InfoP(this) <<(int)pkt.Interleaved; - switch (_rtpType) { + switch (_rtp_type) { case Rtsp::RTP_TCP: { int i = 0; int size = pkt->size(); setSendFlushFlag(false); pkt->for_each([&](const RtpPacket::Ptr &rtp) { + onSendRtpPacket(rtp); if (++i == size) { setSendFlushFlag(true); } @@ -1129,15 +1143,15 @@ void RtspSession::sendRtpPacket(const RtspMediaSource::RingDataType &pkt) { int i = 0; int size = pkt->size(); pkt->for_each([&](const RtpPacket::Ptr &rtp) { - int iTrackIndex = getTrackIndexByTrackType(rtp->type); - auto &pSock = _apRtpSock[iTrackIndex]; + onSendRtpPacket(rtp); + int track_index = getTrackIndexByTrackType(rtp->type); + auto &pSock = _rtp_socks[track_index]; if (!pSock) { shutdown(SockException(Err_shutdown, "udp sock not opened yet")); return; } - BufferRtp::Ptr buffer(new BufferRtp(rtp, 4)); - _ui64TotalBytes += buffer->size(); + _bytes_usage += buffer->size(); pSock->send(buffer, nullptr, 0, ++i == size); }); } @@ -1145,42 +1159,27 @@ void RtspSession::sendRtpPacket(const RtspMediaSource::RingDataType &pkt) { default: break; } - -#if RTSP_SERVER_SEND_RTCP - int iTrackIndex = getTrackIndexByTrackType(pkt->type); - RtcpCounter &counter = _aRtcpCnt[iTrackIndex]; - counter.pktCnt += 1; - counter.octCount += (pkt->length - pkt->offset); - auto &ticker = _aRtcpTicker[iTrackIndex]; - if (ticker.elapsedTime() > 5 * 1000) { - //send rtcp every 5 second - ticker.resetTime(); - //直接保存网络字节序 - memcpy(&counter.timeStamp, pkt->payload + 8 , 4); - sendSenderReport(_rtpType == Rtsp::RTP_TCP,iTrackIndex); - } -#endif } -void RtspSession::sendSenderReport(bool overTcp,int iTrackIndex) { +void RtspSession::sendSenderReport(bool over_tcp, int track_index) { static const char s_cname[] = "ZLMediaKitRtsp"; - uint8_t aui8Rtcp[4 + 28 + 10 + sizeof(s_cname) + 1] = {0}; - uint8_t *pui8Rtcp_SR = aui8Rtcp + 4, *pui8Rtcp_SDES = pui8Rtcp_SR + 28; - auto &track = _aTrackInfo[iTrackIndex]; - auto &counter = _aRtcpCnt[iTrackIndex]; + uint8_t rtcp_buf[4 + 28 + 10 + sizeof(s_cname) + 1] = {0}; + uint8_t *rtcp_sr = rtcp_buf + 4, *rtcp_sdes = rtcp_sr + 28; + auto &track = _sdp_track[track_index]; + auto &counter = _rtcp_counter[track_index]; - aui8Rtcp[0] = '$'; - aui8Rtcp[1] = track->_interleaved + 1; - aui8Rtcp[2] = (sizeof(aui8Rtcp) - 4) >> 8; - aui8Rtcp[3] = (sizeof(aui8Rtcp) - 4) & 0xFF; + rtcp_buf[0] = '$'; + rtcp_buf[1] = track->_interleaved + 1; + rtcp_buf[2] = (sizeof(rtcp_buf) - 4) >> 8; + rtcp_buf[3] = (sizeof(rtcp_buf) - 4) & 0xFF; - pui8Rtcp_SR[0] = 0x80; - pui8Rtcp_SR[1] = 0xC8; - pui8Rtcp_SR[2] = 0x00; - pui8Rtcp_SR[3] = 0x06; + rtcp_sr[0] = 0x80; + rtcp_sr[1] = 0xC8; + rtcp_sr[2] = 0x00; + rtcp_sr[3] = 0x06; - uint32_t ssrc=htonl(track->_ssrc); - memcpy(&pui8Rtcp_SR[4], &ssrc, 4); + uint32_t ssrc = htonl(track->_ssrc); + memcpy(&rtcp_sr[4], &ssrc, 4); uint64_t msw; uint64_t lsw; @@ -1190,35 +1189,35 @@ void RtspSession::sendSenderReport(bool overTcp,int iTrackIndex) { lsw = (uint32_t) ((double) tv.tv_usec * (double) (((uint64_t) 1) << 32) * 1.0e-6); msw = htonl(msw); - memcpy(&pui8Rtcp_SR[8], &msw, 4); + memcpy(&rtcp_sr[8], &msw, 4); lsw = htonl(lsw); - memcpy(&pui8Rtcp_SR[12], &lsw, 4); + memcpy(&rtcp_sr[12], &lsw, 4); //直接使用网络字节序 - memcpy(&pui8Rtcp_SR[16], &counter.timeStamp, 4); + memcpy(&rtcp_sr[16], &counter.timeStamp, 4); uint32_t pktCnt = htonl(counter.pktCnt); - memcpy(&pui8Rtcp_SR[20], &pktCnt, 4); + memcpy(&rtcp_sr[20], &pktCnt, 4); uint32_t octCount = htonl(counter.octCount); - memcpy(&pui8Rtcp_SR[24], &octCount, 4); + memcpy(&rtcp_sr[24], &octCount, 4); - pui8Rtcp_SDES[0] = 0x81; - pui8Rtcp_SDES[1] = 0xCA; - pui8Rtcp_SDES[2] = 0x00; - pui8Rtcp_SDES[3] = 0x06; + rtcp_sdes[0] = 0x81; + rtcp_sdes[1] = 0xCA; + rtcp_sdes[2] = 0x00; + rtcp_sdes[3] = 0x06; - memcpy(&pui8Rtcp_SDES[4], &ssrc, 4); + memcpy(&rtcp_sdes[4], &ssrc, 4); - pui8Rtcp_SDES[8] = 0x01; - pui8Rtcp_SDES[9] = 0x0f; - memcpy(&pui8Rtcp_SDES[10], s_cname, sizeof(s_cname)); - pui8Rtcp_SDES[10 + sizeof(s_cname)] = 0x00; + rtcp_sdes[8] = 0x01; + rtcp_sdes[9] = 0x0f; + memcpy(&rtcp_sdes[10], s_cname, sizeof(s_cname)); + rtcp_sdes[10 + sizeof(s_cname)] = 0x00; - if(overTcp){ - send(obtainBuffer((char *) aui8Rtcp, sizeof(aui8Rtcp))); - }else { - _apRtcpSock[iTrackIndex]->send((char *) aui8Rtcp + 4, sizeof(aui8Rtcp) - 4); + if (over_tcp) { + send(obtainBuffer((char *) rtcp_buf, sizeof(rtcp_buf))); + } else { + _rtcp_socks[track_index]->send((char *) rtcp_buf + 4, sizeof(rtcp_buf) - 4); } } @@ -1234,4 +1233,3 @@ void RtspSession::setSocketFlags(){ } /* namespace mediakit */ - diff --git a/src/Rtsp/RtspSession.h b/src/Rtsp/RtspSession.h index eeba0d85..2436efb6 100644 --- a/src/Rtsp/RtspSession.h +++ b/src/Rtsp/RtspSession.h @@ -59,51 +59,31 @@ public: //在请求明文密码时如果提供md5密码者则会导致认证失败 typedef std::function onAuth; - RtspSession(const Socket::Ptr &pSock); + RtspSession(const Socket::Ptr &sock); virtual ~RtspSession(); ////TcpSession override//// - void onRecv(const Buffer::Ptr &pBuf) override; + void onRecv(const Buffer::Ptr &buf) override; void onError(const SockException &err) override; void onManager() override; + protected: - //RtspSplitter override - /** - * 收到完整的rtsp包回调,包括sdp等content数据 - * @param parser rtsp包 - */ + /////RtspSplitter override///// + //收到完整的rtsp包回调,包括sdp等content数据 void onWholeRtspPacket(Parser &parser) override; - - /** - * 收到rtp包回调 - * @param data - * @param len - */ - void onRtpPacket(const char *data,uint64_t len) override; - - /** - * 从rtsp头中获取Content长度 - * @param parser - * @return - */ + //收到rtp包回调 + void onRtpPacket(const char *data, uint64_t len) override; + //从rtsp头中获取Content长度 int64_t getContentLength(Parser &parser) override; - - //RtpReceiver override - void onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx) override; - //MediaSourceEvent override - bool close(MediaSource &sender,bool force) override ; + ////RtpReceiver override//// + void onRtpSorted(const RtpPacket::Ptr &rtp, int track_idx) override; + ////MediaSourceEvent override//// + bool close(MediaSource &sender, bool force) override ; int totalReaderCount(MediaSource &sender) override; - - //TcpSession override + /////TcpSession override//// int send(const Buffer::Ptr &pkt) override; + //收到RTCP包回调 + virtual void onRtcpPacket(int track_idx, SdpTrack::Ptr &track, unsigned char *data, unsigned int len); - /** - * 收到RTCP包回调 - * @param iTrackidx - * @param track - * @param pucData - * @param uiLen - */ - virtual void onRtcpPacket(int iTrackidx, SdpTrack::Ptr &track, unsigned char *pucData, unsigned int uiLen); private: //处理options方法,获取服务器能力 void handleReq_Options(const Parser &parser); @@ -127,101 +107,100 @@ private: void handleReq_Post(const Parser &parser); //处理SET_PARAMETER、GET_PARAMETER方法,一般用于心跳 void handleReq_SET_PARAMETER(const Parser &parser); - //rtsp资源未找到 - void inline send_StreamNotFound(); + void send_StreamNotFound(); //不支持的传输模式 - void inline send_UnsupportedTransport(); + void send_UnsupportedTransport(); //会话id错误 - void inline send_SessionNotFound(); + void send_SessionNotFound(); //一般rtsp服务器打开端口失败时触发 - void inline send_NotAcceptable(); - + void send_NotAcceptable(); //获取track下标 - inline int getTrackIndexByTrackType(TrackType type); - inline int getTrackIndexByControlSuffix(const string &controlSuffix); - inline int getTrackIndexByInterleaved(int interleaved); - + int getTrackIndexByTrackType(TrackType type); + int getTrackIndexByControlSuffix(const string &control_suffix); + int getTrackIndexByInterleaved(int interleaved); //一般用于接收udp打洞包,也用于rtsp推流 - inline void onRcvPeerUdpData(int intervaled, const Buffer::Ptr &pBuf, const struct sockaddr &addr); + void onRcvPeerUdpData(int interleaved, const Buffer::Ptr &buf, const struct sockaddr &addr); //配合onRcvPeerUdpData使用 - inline void startListenPeerUdpData(int iTrackIdx); - + void startListenPeerUdpData(int track_idx); ////rtsp专有认证相关//// //认证成功 void onAuthSuccess(); //认证失败 - void onAuthFailed(const string &realm,const string &why,bool close = true); + void onAuthFailed(const string &realm, const string &why, bool close = true); //开始走rtsp专有认证流程 - void onAuthUser(const string &realm,const string &authorization); + void onAuthUser(const string &realm, const string &authorization); //校验base64方式的认证加密 - void onAuthBasic(const string &realm,const string &strBase64); + void onAuthBasic(const string &realm, const string &auth_base64); //校验md5方式的认证加密 - void onAuthDigest(const string &realm,const string &strMd5); + void onAuthDigest(const string &realm, const string &auth_md5); //触发url鉴权事件 void emitOnPlay(); - //发送rtp给客户端 void sendRtpPacket(const RtspMediaSource::RingDataType &pkt); + //触发rtcp发送 + void onSendRtpPacket(const RtpPacket::Ptr &rtp); //回复客户端 - bool sendRtspResponse(const string &res_code,const std::initializer_list &header, const string &sdp = "" , const char *protocol = "RTSP/1.0"); - bool sendRtspResponse(const string &res_code,const StrCaseMap &header = StrCaseMap(), const string &sdp = "",const char *protocol = "RTSP/1.0"); + bool sendRtspResponse(const string &res_code, const std::initializer_list &header, const string &sdp = "", const char *protocol = "RTSP/1.0"); + bool sendRtspResponse(const string &res_code, const StrCaseMap &header = StrCaseMap(), const string &sdp = "", const char *protocol = "RTSP/1.0"); //服务器发送rtcp - void sendSenderReport(bool overTcp,int iTrackIndex); + void sendSenderReport(bool over_tcp, int track_idx); //设置socket标志 void setSocketFlags(); + private: - //用于判断客户端是否超时 - Ticker _ticker; - //收到的seq,回复时一致 - int _iCseq = 0; - //ContentBase - string _strContentBase; - //Session号 - string _strSession; - //记录是否需要rtsp专属鉴权,防止重复触发事件 - string _rtsp_realm; //是否已经触发on_play事件 bool _emit_on_play = false; - //url解析后保存的相关信息 - MediaInfo _mediaInfo; - //rtsp播放器绑定的直播源 - std::weak_ptr _pMediaSrc; - //直播源读取器 - RtspMediaSource::RingType::RingReader::Ptr _pRtpReader; + //是否开始发送rtp + bool _enable_send_rtp; //推流或拉流客户端采用的rtp传输方式 - Rtsp::eRtpType _rtpType = Rtsp::RTP_Invalid; + Rtsp::eRtpType _rtp_type = Rtsp::RTP_Invalid; + //收到的seq,回复时一致 + int _cseq = 0; + //消耗的总流量 + uint64_t _bytes_usage = 0; + //ContentBase + string _content_base; + //Session号 + string _sessionid; + //记录是否需要rtsp专属鉴权,防止重复触发事件 + string _rtsp_realm; + //登录认证 + string _auth_nonce; + //用于判断客户端是否超时 + Ticker _alive_ticker; + + //url解析后保存的相关信息 + MediaInfo _media_info; + //rtsp推流相关绑定的源 + RtspMediaSourceImp::Ptr _push_src; + //rtsp播放器绑定的直播源 + std::weak_ptr _play_src; + //直播源读取器 + RtspMediaSource::RingType::RingReader::Ptr _play_reader; //sdp里面有效的track,包含音频或视频 - vector _aTrackInfo; + vector _sdp_track; + + //rtcp统计,trackid idx 为数组下标 + RtcpCounter _rtcp_counter[2]; + //rtcp发送时间,trackid idx 为数组下标 + Ticker _rtcp_send_tickers[2]; + ////////RTP over udp//////// //RTP端口,trackid idx 为数组下标 - Socket::Ptr _apRtpSock[2]; + Socket::Ptr _rtp_socks[2]; //RTCP端口,trackid idx 为数组下标 - Socket::Ptr _apRtcpSock[2]; + Socket::Ptr _rtcp_socks[2]; //标记是否收到播放的udp打洞包,收到播放的udp打洞包后才能知道其外网udp端口号 - unordered_set _udpSockConnected; + unordered_set _udp_connected_flags; ////////RTP over udp_multicast//////// //共享的rtp组播对象 RtpMultiCaster::Ptr _multicaster; - - //登录认证 - string _strNonce; - //消耗的总流量 - uint64_t _ui64TotalBytes = 0; - - //RTSP over HTTP + ////////RTSP over HTTP //////// //quicktime 请求rtsp会产生两次tcp连接, //一次发送 get 一次发送post,需要通过x-sessioncookie关联起来 string _http_x_sessioncookie; - function _onRecv; - //是否开始发送rtp - bool _enableSendRtp; - //rtsp推流相关 - RtspMediaSourceImp::Ptr _pushSrc; - //rtcp统计,trackid idx 为数组下标 - RtcpCounter _aRtcpCnt[2]; - //rtcp发送时间,trackid idx 为数组下标 - Ticker _aRtcpTicker[2]; + function _on_recv; }; /**