ZLMediaKit/src/Rtsp/RtspSession.cpp

1254 lines
47 KiB
C++
Raw Normal View History

2019-08-08 19:01:45 +08:00
/*
 * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
 *
 * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
 *
 * Use of this source code is governed by MIT license that can be found in the
 * LICENSE file in the root of the source tree. All contributing project authors
 * may be found in the AUTHORS file in the root of the source tree.
 */
2017-08-09 18:39:30 +08:00
2017-04-25 11:35:41 +08:00
#include <atomic>
#include <iomanip>
2017-05-02 17:15:12 +08:00
#include "Common/config.h"
2017-04-25 11:35:41 +08:00
#include "UDPServer.h"
2017-04-01 16:35:56 +08:00
#include "RtspSession.h"
2017-04-25 11:35:41 +08:00
#include "Util/mini.h"
2017-12-10 01:34:43 +08:00
#include "Util/MD5.h"
2018-09-20 15:43:49 +08:00
#include "Util/base64.h"
2017-04-25 11:35:41 +08:00
#include "Util/onceToken.h"
2017-04-01 16:35:56 +08:00
#include "Util/TimeTicker.h"
#include "Util/NoticeCenter.h"
2017-04-25 11:35:41 +08:00
#include "Network/sockutil.h"
2017-04-01 16:35:56 +08:00
2019-06-11 18:31:34 +08:00
// Compile-time switch, defined as 0 here.
// NOTE(review): judging by the name it presumably controls whether the server sends RTCP;
// its use sites are outside this excerpt — confirm before changing the value.
#define RTSP_SERVER_SEND_RTCP 0
2018-10-24 17:17:55 +08:00
using namespace std;
using namespace toolkit;
2017-04-01 16:35:56 +08:00
2018-10-24 17:17:55 +08:00
namespace mediakit {
2017-04-01 16:35:56 +08:00
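/**
 * The RTSP protocol can carry RTP packets in several ways; 4 of them are listed here:
 * 1: rtp over udp           - RTP is sent through dedicated UDP ports
 * 2: rtp over udp_multicast - RTP is sent through a shared UDP multicast port
 * 3: rtp over tcp           - RTP is sent over the RTSP signalling TCP connection
 * 4: rtp over http          - described in detail below
 *
 * rtp over http disguises the RTSP protocol as HTTP in order to get through firewalls.
 * Two HTTP requests are sent to the RTSP server: an HTTP GET request
 * and an HTTP POST request.
 *
 * The two requests are bound together by the x-sessioncookie key of the HTTP request header.
 *
 * The HTTP GET request is used to receive RTP, RTCP and the RTSP replies.
 * The HTTP POST request is used to send the RTSP requests; its connection may be closed once
 * the RTSP handshake has finished, while RTP sending goes on.
 * The content payload of the HTTP POST request is the base64-encoded RTSP request packet:
 * the RTSP request is disguised as HTTP content and sent to the RTSP server, and the RTSP
 * server sends its reply to the TCP connection of the first HTTP GET request.
 * Seen from outside, the RTSP session is therefore just two HTTP requests.
 *
 * When handling rtsp over http, ZLMediaKit base64-decodes the content data of the HTTP poster
 * and forwards it to the HTTP getter for processing.
 */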
// rtsp over http: registry of the HTTP GET request sessions (held as weak references);
// the GET request session is the one used for receiving the RTP packets.
static unordered_map<string, weak_ptr<RtspSession> > g_mapGetter;
// Lock protecting g_mapGetter
static recursive_mutex g_mtxGetter;
2018-12-17 15:21:23 +08:00
2018-12-20 10:42:51 +08:00
/**
 * Creates an RTSP session on top of the given socket.
 * The configured keep-alive interval is applied as the socket send timeout and
 * the socket starts with a 4K receive buffer.
 * @param pSock socket of the connection, forwarded to TcpSession
 */
RtspSession::RtspSession(const Socket::Ptr &pSock)
    : TcpSession(pSock) {
    DebugP(this);

    GET_CONFIG(uint32_t, keepAliveSeconds, Rtsp::kKeepAliveSecond);
    pSock->setSendTimeOutSecond(keepAliveSeconds);

    // start with a 4K receive buffer to save memory
    pSock->setReadBuffer(std::make_shared<BufferRaw>(4 * 1024));
}
/** Destructor; it only writes a debug log entry for this session. */
RtspSession::~RtspSession()
{
    DebugP(this);
}
void RtspSession::onError(const SockException& err) {
2019-10-23 12:00:53 +08:00
bool isPlayer = !_pushSrc;
2020-02-13 12:10:08 +08:00
uint64_t duration = _ticker.createdTime()/1000;
2019-12-29 15:38:29 +08:00
WarnP(this) << (isPlayer ? "RTSP播放器(" : "RTSP推流器(")
2019-10-23 12:00:53 +08:00
<< _mediaInfo._vhost << "/"
<< _mediaInfo._app << "/"
<< _mediaInfo._streamid
2020-02-13 12:10:08 +08:00
<< ")断开:" << err.what()
<< ",耗时(s):" << duration;
2019-10-23 12:00:53 +08:00
2020-03-20 11:51:24 +08:00
if (_rtpType == Rtsp::RTP_MULTICAST) {
//取消UDP端口监听
UDPServer::Instance().stopListenPeer(get_peer_ip().data(), this);
}
2017-04-01 16:35:56 +08:00
2020-03-20 11:51:24 +08:00
if (_http_x_sessioncookie.size() != 0) {
//移除http getter的弱引用记录
lock_guard<recursive_mutex> lock(g_mtxGetter);
g_mapGetter.erase(_http_x_sessioncookie);
}
//流量统计事件广播
2019-05-28 17:14:36 +08:00
GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold);
2018-10-24 15:43:52 +08:00
if(_ui64TotalBytes > iFlowThreshold * 1024){
2020-04-23 22:04:59 +08:00
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _mediaInfo, _ui64TotalBytes, duration, isPlayer, static_cast<SockInfo &>(*this));
}
2017-04-01 16:35:56 +08:00
}
void RtspSession::onManager() {
2019-08-28 18:20:40 +08:00
GET_CONFIG(uint32_t,handshake_sec,Rtsp::kHandshakeSecond);
2019-05-29 18:24:35 +08:00
GET_CONFIG(uint32_t,keep_alive_sec,Rtsp::kKeepAliveSecond);
if (_ticker.createdTime() > handshake_sec * 1000) {
2020-03-20 11:51:24 +08:00
if (_strSession.size() == 0) {
shutdown(SockException(Err_timeout,"illegal connection"));
return;
}
}
2019-01-16 17:58:54 +08:00
2020-03-20 11:51:24 +08:00
if ((_rtpType == Rtsp::RTP_UDP || _pushSrc ) && _ticker.elapsedTime() > keep_alive_sec * 1000) {
//如果是推流端或者rtp over udp类型的播放端那么就做超时检测
2019-05-29 18:08:50 +08:00
shutdown(SockException(Err_timeout,"rtp over udp session timeouted"));
return;
2020-03-20 11:51:24 +08:00
}
2017-04-01 16:35:56 +08:00
}
2018-12-17 15:21:23 +08:00
/**
 * Called with every chunk of data received from the socket.
 * Resets the activity ticker, adds the chunk size to the byte counter and then
 * either hands the chunk to the installed _onRecv callback or feeds it into the
 * splitter through input().
 * @param pBuf received data
 */
void RtspSession::onRecv(const Buffer::Ptr &pBuf) {
    _ticker.resetTime();
    _ui64TotalBytes += pBuf->size();

    if (!_onRecv) {
        // regular case: parse the data in this session
        input(pBuf->data(), pBuf->size());
        return;
    }
    // http poster: the request data is forwarded to the http getter
    _onRecv(pBuf);
}
2018-09-20 18:44:32 +08:00
/**
 * Tells whether a string ends with the given suffix.
 * Only the tail of str is compared, so the cost depends on the suffix length
 * and not on the length of str.
 * @param str    string to inspect
 * @param substr suffix to look for; an empty suffix always matches
 * @return true when str ends with substr
 */
static inline bool end_of(const string &str, const string &substr){
    return str.size() >= substr.size()
           && str.compare(str.size() - substr.size(), substr.size(), substr) == 0;
}
2018-12-17 15:21:23 +08:00
/**
 * Called once a complete RTSP (or tunnelled HTTP) request has been parsed.
 * Records the CSeq, remembers the content base and media info from the first
 * non-GET request, then dispatches to the handler registered for the method.
 * Unknown methods are answered with 403 and the session is shut down.
 * Exceptions thrown by a handler end the session.
 * @param parser parsed request; it is cleared after the handler has run
 */
void RtspSession::onWholeRtspPacket(Parser &parser) {
    // request method
    string method = parser.Method();
    _iCseq = atoi(parser["CSeq"].data());

    if (_strContentBase.empty() && method != "GET") {
        _strContentBase = parser.Url();
        _mediaInfo.parse(parser.FullUrl());
        _mediaInfo._schema = RTSP_SCHEMA;
    }

    using RequestHandler = void (RtspSession::*)(const Parser &parser);
    // method name -> member function handling it
    static const unordered_map<string, RequestHandler> s_handlers = {
        {"OPTIONS",       &RtspSession::handleReq_Options},
        {"DESCRIBE",      &RtspSession::handleReq_Describe},
        {"ANNOUNCE",      &RtspSession::handleReq_ANNOUNCE},
        {"RECORD",        &RtspSession::handleReq_RECORD},
        {"SETUP",         &RtspSession::handleReq_Setup},
        {"PLAY",          &RtspSession::handleReq_Play},
        {"PAUSE",         &RtspSession::handleReq_Pause},
        {"TEARDOWN",      &RtspSession::handleReq_Teardown},
        {"GET",           &RtspSession::handleReq_Get},
        {"POST",          &RtspSession::handleReq_Post},
        {"SET_PARAMETER", &RtspSession::handleReq_SET_PARAMETER},
        // GET_PARAMETER shares the SET_PARAMETER handler
        {"GET_PARAMETER", &RtspSession::handleReq_SET_PARAMETER},
    };

    const auto found = s_handlers.find(method);
    if (found == s_handlers.end()) {
        sendRtspResponse("403 Forbidden");
        shutdown(SockException(Err_shutdown,StrPrinter << "403 Forbidden:" << method));
        return;
    }

    const RequestHandler handler = found->second;
    try {
        (this->*handler)(parser);
    } catch (SockException &ex) {
        // only close the session when the exception evaluates to true
        if (ex) {
            shutdown(ex);
        }
    } catch (exception &ex) {
        shutdown(SockException(Err_shutdown,ex.what()));
    }
    parser.Clear();
}
2018-12-17 15:21:23 +08:00
void RtspSession::onRtpPacket(const char *data, uint64_t len) {
2020-03-20 11:51:24 +08:00
if(!_pushSrc){
return;
}
int trackIdx = -1;
uint8_t interleaved = data[1];
if(interleaved %2 == 0){
trackIdx = getTrackIndexByInterleaved(interleaved);
2019-05-08 14:23:18 +08:00
if (trackIdx != -1) {
handleOneRtp(trackIdx,_aTrackInfo[trackIdx],(unsigned char *)data + 4, len - 4);
}
2020-03-20 11:51:24 +08:00
}else{
2019-05-08 14:23:18 +08:00
trackIdx = getTrackIndexByInterleaved(interleaved - 1);
if (trackIdx != -1) {
2019-05-09 10:49:50 +08:00
onRtcpPacket(trackIdx, _aTrackInfo[trackIdx], (unsigned char *) data + 4, len - 4);
2019-05-08 14:23:18 +08:00
}
2020-03-20 11:51:24 +08:00
}
2018-09-20 18:44:32 +08:00
}
2019-05-09 10:49:50 +08:00
/**
 * Receives the RTCP packets of a track; currently a no-op.
 * @param iTrackidx index of the track
 * @param track     track description
 * @param pucData   RTCP data
 * @param uiLen     length of the RTCP data
 */
void RtspSession::onRtcpPacket(int iTrackidx,
                               SdpTrack::Ptr &track,
                               unsigned char *pucData,
                               unsigned int uiLen)
{
}
2018-12-17 15:21:23 +08:00
/**
 * Returns the content length of the request described by parser.
 * For HTTP POST the content is the base64 encoded rtsp signalling, so all
 * remaining data is taken as content; every other method uses the
 * RtspSplitter implementation.
 * @param parser parsed request header
 */
int64_t RtspSession::getContentLength(Parser &parser) {
    if (parser.Method() != "POST") {
        return RtspSplitter::getContentLength(parser);
    }
    // http post: the content part is the base64 encoded rtsp request
    return remainDataSize();
}
2018-12-17 15:21:23 +08:00
2019-05-30 12:14:20 +08:00
void RtspSession::handleReq_Options(const Parser &parser) {
2020-03-20 11:51:24 +08:00
//支持这些命令
sendRtspResponse("200 OK",{"Public" , "OPTIONS, DESCRIBE, SETUP, TEARDOWN, PLAY, PAUSE, ANNOUNCE, RECORD, SET_PARAMETER, GET_PARAMETER"});
2017-04-01 16:35:56 +08:00
}
2019-05-30 12:14:20 +08:00
void RtspSession::handleReq_ANNOUNCE(const Parser &parser) {
2020-03-20 11:51:24 +08:00
auto src = dynamic_pointer_cast<RtmpMediaSource>(MediaSource::find(RTSP_SCHEMA,
_mediaInfo._vhost,
_mediaInfo._app,
_mediaInfo._streamid,
false));
if(src){
sendRtspResponse("406 Not Acceptable", {"Content-Type", "text/plain"}, "Already publishing.");
2019-05-30 12:14:20 +08:00
string err = StrPrinter << "ANNOUNCE:"
<< "Already publishing:"
<< _mediaInfo._vhost << " "
<< _mediaInfo._app << " "
<< _mediaInfo._streamid << endl;
2020-03-20 11:51:24 +08:00
throw SockException(Err_shutdown,err);
}
2018-12-14 18:13:05 +08:00
auto full_url = parser.FullUrl();
if(end_of(full_url,".sdp")){
//去除.sdp后缀防止EasyDarwin推流器强制添加.sdp后缀
2019-09-10 17:08:24 +08:00
full_url = full_url.substr(0,full_url.length() - 4);
_mediaInfo.parse(full_url);
}
2020-05-25 16:40:41 +08:00
if(_mediaInfo._app.empty() || _mediaInfo._streamid.empty()){
//推流rtsp url必须最少两级(rtsp://host/app/stream_id)不允许莫名其妙的推流url
sendRtspResponse("403 Forbidden", {"Content-Type", "text/plain"}, "rtsp推流url非法,最少确保两级rtsp url");
throw SockException(Err_shutdown,StrPrinter << "rtsp推流url非法:" << full_url);
}
SdpParser sdpParser(parser.Content());
_strSession = makeRandStr(12);
_aTrackInfo = sdpParser.getAvailableTrack();
2018-12-14 18:13:05 +08:00
2020-03-20 11:51:24 +08:00
_pushSrc = std::make_shared<RtspMediaSourceImp>(_mediaInfo._vhost,_mediaInfo._app,_mediaInfo._streamid);
_pushSrc->setListener(dynamic_pointer_cast<MediaSourceEvent>(shared_from_this()));
2019-12-25 11:04:12 +08:00
_pushSrc->setSdp(sdpParser.toString());
2019-09-10 16:00:39 +08:00
2020-03-20 11:51:24 +08:00
sendRtspResponse("200 OK",{"Content-Base",_strContentBase + "/"});
}
2019-05-30 12:14:20 +08:00
void RtspSession::handleReq_RECORD(const Parser &parser){
2020-03-20 11:51:24 +08:00
if (_aTrackInfo.empty() || parser["Session"] != _strSession) {
send_SessionNotFound();
2019-05-30 12:14:20 +08:00
throw SockException(Err_shutdown,_aTrackInfo.empty() ? "can not find any availabe track when record" : "session not found when record");
2020-03-20 11:51:24 +08:00
}
auto onRes = [this](const string &err,bool enableRtxp,bool enableHls,bool enableMP4){
bool authSuccess = err.empty();
if(!authSuccess){
sendRtspResponse("401 Unauthorized", {"Content-Type", "text/plain"}, err);
shutdown(SockException(Err_shutdown,StrPrinter << "401 Unauthorized:" << err));
return;
}
//设置转协议
_pushSrc->setProtocolTranslation(enableRtxp,enableHls,enableMP4);
2020-03-20 11:51:24 +08:00
_StrPrinter rtp_info;
for(auto &track : _aTrackInfo){
if (track->_inited == false) {
//还有track没有setup
2019-05-29 18:08:50 +08:00
shutdown(SockException(Err_shutdown,"track not setuped"));
return;
2020-03-20 11:51:24 +08:00
}
rtp_info << "url=" << _strContentBase << "/" << track->_control_surffix << ",";
}
rtp_info.pop_back();
sendRtspResponse("200 OK", {"RTP-Info",rtp_info});
if(_rtpType == Rtsp::RTP_TCP){
//如果是rtsp推流服务器并且是TCP推流那么加大TCP接收缓存这样能提升接收性能
_sock->setReadBuffer(std::make_shared<BufferRaw>(256 * 1024));
setSocketFlags();
2020-03-20 11:51:24 +08:00
}
};
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
Broadcast::PublishAuthInvoker invoker = [weakSelf,onRes](const string &err,bool enableRtxp,bool enableHls,bool enableMP4){
auto strongSelf = weakSelf.lock();
if(!strongSelf){
return;
}
strongSelf->async([weakSelf,onRes,err,enableRtxp,enableHls,enableMP4](){
auto strongSelf = weakSelf.lock();
if(!strongSelf){
return;
}
onRes(err,enableRtxp,enableHls,enableMP4);
});
};
//rtsp推流需要鉴权
auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish,_mediaInfo,invoker,static_cast<SockInfo &>(*this));
2020-03-20 11:51:24 +08:00
if(!flag){
//该事件无人监听,默认不鉴权
GET_CONFIG(bool,toRtxp,General::kPublishToRtxp);
GET_CONFIG(bool,toHls,General::kPublishToHls);
GET_CONFIG(bool,toMP4,General::kPublishToMP4);
onRes("",toRtxp,toHls,toMP4);
}
}
2019-05-30 12:14:20 +08:00
/**
 * Handles DESCRIBE.
 * Broadcasts kBroadcastOnGetRtspRealm to learn whether the stream requires
 * authentication. An empty realm means no authentication and the sdp is sent;
 * otherwise the Authorization header of this request is checked.
 * @param parser the request
 */
void RtspSession::handleReq_Describe(const Parser &parser) {
    weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
    // authentication information carried by this request
    auto authHeader = parser["Authorization"];

    onGetRealm invoker = [weakSelf, authHeader](const string &realm) {
        auto session = weakSelf.lock();
        if (!session) {
            // the session has been destroyed
            return;
        }
        // continue in the thread of the session
        session->async([weakSelf, realm, authHeader]() {
            auto session = weakSelf.lock();
            if (!session) {
                // the session has been destroyed
                return;
            }
            if (!realm.empty()) {
                // the stream requires authentication
                session->onAuthUser(realm, authHeader);
            } else {
                // no authentication required, reply with the sdp
                session->onAuthSuccess();
            }
        });
    };

    // broadcast the "is authentication required" event
    const bool handled = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnGetRtspRealm,_mediaInfo,invoker,static_cast<SockInfo &>(*this));
    if (!handled) {
        // nobody listens to the event, so no authentication is required
        invoker("");
    }
}
void RtspSession::onAuthSuccess() {
TraceP(this);
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
2019-12-03 16:10:02 +08:00
MediaSource::findAsync(_mediaInfo,weakSelf.lock(),[weakSelf](const MediaSource::Ptr &src){
auto strongSelf = weakSelf.lock();
if(!strongSelf){
return;
}
auto rtsp_src = dynamic_pointer_cast<RtspMediaSource>(src);
if (!rtsp_src) {
//未找到相应的MediaSource
2019-05-29 18:08:50 +08:00
string err = StrPrinter << "no such stream:" << strongSelf->_mediaInfo._vhost << " " << strongSelf->_mediaInfo._app << " " << strongSelf->_mediaInfo._streamid;
strongSelf->send_StreamNotFound();
2019-05-29 18:08:50 +08:00
strongSelf->shutdown(SockException(Err_shutdown,err));
return;
}
2019-08-30 11:17:27 +08:00
//找到了相应的rtsp流
strongSelf->_aTrackInfo = SdpParser(rtsp_src->getSdp()).getAvailableTrack();
if (strongSelf->_aTrackInfo.empty()) {
//该流无效
2020-03-20 11:51:24 +08:00
DebugL << "无trackInfo该流无效";
strongSelf->send_StreamNotFound();
2019-09-20 10:36:37 +08:00
strongSelf->shutdown(SockException(Err_shutdown,"can not find any available track in sdp"));
return;
}
strongSelf->_strSession = makeRandStr(12);
strongSelf->_pMediaSrc = rtsp_src;
for(auto &track : strongSelf->_aTrackInfo){
track->_ssrc = rtsp_src->getSsrc(track->_type);
track->_seq = rtsp_src->getSeqence(track->_type);
track->_time_stamp = rtsp_src->getTimeStamp(track->_type);
}
2018-10-29 15:02:18 +08:00
strongSelf->sendRtspResponse("200 OK",
{"Content-Base",strongSelf->_strContentBase + "/",
"x-Accept-Retransmit","our-retransmit",
"x-Accept-Dynamic-Rate","1"
},rtsp_src->getSdp());
});
2017-04-01 16:35:56 +08:00
}
2019-05-30 12:14:20 +08:00
void RtspSession::onAuthFailed(const string &realm,const string &why,bool close) {
GET_CONFIG(bool,authBasic,Rtsp::kAuthBasic);
if (!authBasic) {
//我们需要客户端优先以md5方式认证
_strNonce = makeRandStr(32);
sendRtspResponse("401 Unauthorized",
{"WWW-Authenticate",
StrPrinter << "Digest realm=\"" << realm << "\",nonce=\"" << _strNonce << "\"" });
}else {
//当然我们也支持base64认证,但是我们不建议这样做
sendRtspResponse("401 Unauthorized",
{"WWW-Authenticate",
StrPrinter << "Basic realm=\"" << realm << "\"" });
}
if(close){
shutdown(SockException(Err_shutdown,StrPrinter << "401 Unauthorized:" << why));
2017-12-10 01:34:43 +08:00
}
}
2019-05-30 12:14:20 +08:00
void RtspSession::onAuthBasic(const string &realm,const string &strBase64){
2017-12-10 01:34:43 +08:00
//base64认证
char user_pwd_buf[512];
av_base64_decode((uint8_t *)user_pwd_buf,strBase64.data(),strBase64.size());
auto user_pwd_vec = split(user_pwd_buf,":");
2017-12-10 01:34:43 +08:00
if(user_pwd_vec.size() < 2){
//认证信息格式不合法回复401 Unauthorized
2019-05-30 12:14:20 +08:00
onAuthFailed(realm,"can not find user and passwd when basic64 auth");
2017-12-10 01:34:43 +08:00
return;
}
auto user = user_pwd_vec[0];
auto pwd = user_pwd_vec[1];
2019-05-30 12:14:20 +08:00
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
2017-12-10 01:34:43 +08:00
onAuth invoker = [pwd,realm,weakSelf](bool encrypted,const string &good_pwd){
2019-05-30 12:14:20 +08:00
auto strongSelf = weakSelf.lock();
if(!strongSelf){
//本对象已经销毁
return;
2017-12-10 01:34:43 +08:00
}
2019-05-30 12:14:20 +08:00
//切换到自己的线程执行
strongSelf->async([weakSelf,good_pwd,pwd,realm](){
auto strongSelf = weakSelf.lock();
if(!strongSelf){
//本对象已经销毁
return;
}
//base64忽略encrypted参数上层必须传入明文密码
if(pwd == good_pwd){
//提供的密码且匹配正确
strongSelf->onAuthSuccess();
return;
}
//密码错误
strongSelf->onAuthFailed(realm,StrPrinter << "password mismatch when base64 auth:" << pwd << " != " << good_pwd);
});
2017-12-10 01:34:43 +08:00
};
2018-02-09 15:50:21 +08:00
2017-12-10 01:34:43 +08:00
//此时必须提供明文密码
if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth,_mediaInfo,realm,user, true,invoker,static_cast<SockInfo &>(*this))){
2017-12-10 01:34:43 +08:00
//表明该流需要认证却没监听请求密码事件,这一般是大意的程序所为,警告之
2019-05-30 12:14:20 +08:00
WarnP(this) << "请监听kBroadcastOnRtspAuth事件";
2017-12-10 01:34:43 +08:00
//但是我们还是忽略认证以便完成播放
//我们输入的密码是明文
invoker(false,pwd);
}
}
2019-05-30 12:14:20 +08:00
void RtspSession::onAuthDigest(const string &realm,const string &strMd5){
2020-03-20 11:51:24 +08:00
DebugP(this) << strMd5;
2017-12-10 01:34:43 +08:00
auto mapTmp = Parser::parseArgs(strMd5,",","=");
decltype(mapTmp) map;
for(auto &pr : mapTmp){
map[trim(string(pr.first)," \"")] = trim(pr.second," \"");
2017-12-10 01:34:43 +08:00
}
//check realm
if(realm != map["realm"]){
2019-05-30 12:14:20 +08:00
onAuthFailed(realm,StrPrinter << "realm not mached:" << realm << " != " << map["realm"]);
2017-12-10 01:34:43 +08:00
return ;
}
//check nonce
auto nonce = map["nonce"];
2019-05-30 12:14:20 +08:00
if(_strNonce != nonce){
onAuthFailed(realm,StrPrinter << "nonce not mached:" << nonce << " != " << _strNonce);
2017-12-10 01:34:43 +08:00
return ;
}
//check username and uri
auto username = map["username"];
auto uri = map["uri"];
auto response = map["response"];
if(username.empty() || uri.empty() || response.empty()){
2019-05-30 12:14:20 +08:00
onAuthFailed(realm,StrPrinter << "username/uri/response empty:" << username << "," << uri << "," << response);
2017-12-10 01:34:43 +08:00
return ;
}
2019-05-30 12:14:20 +08:00
auto realInvoker = [this,realm,nonce,uri,username,response](bool ignoreAuth,bool encrypted,const string &good_pwd){
2017-12-10 01:34:43 +08:00
if(ignoreAuth){
//忽略认证
2019-05-30 12:14:20 +08:00
TraceP(this) << "auth ignored";
onAuthSuccess();
2017-12-10 01:34:43 +08:00
return;
}
/*
response计算方法如下
RTSP客户端应该使用username + password并计算response如下:
(1)password为MD5编码,
response = md5( password:nonce:md5(public_method:url) );
(2)password为ANSI字符串,
response= md5( md5(username:realm:password):nonce:md5(public_method:url) );
*/
auto encrypted_pwd = good_pwd;
if(!encrypted){
//提供的是明文密码
encrypted_pwd = MD5(username+ ":" + realm + ":" + good_pwd).hexdigest();
}
auto good_response = MD5( encrypted_pwd + ":" + nonce + ":" + MD5(string("DESCRIBE") + ":" + uri).hexdigest()).hexdigest();
2018-11-27 11:05:44 +08:00
if(strcasecmp(good_response.data(),response.data()) == 0){
2017-12-10 01:34:43 +08:00
//认证成功md5不区分大小写
2019-05-30 12:14:20 +08:00
onAuthSuccess();
2017-12-10 01:34:43 +08:00
}else{
//认证失败!
2019-05-30 12:14:20 +08:00
onAuthFailed(realm, StrPrinter << "password mismatch when md5 auth:" << good_response << " != " << response );
2017-12-10 01:34:43 +08:00
}
};
2019-05-30 12:14:20 +08:00
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
onAuth invoker = [realInvoker,weakSelf](bool encrypted,const string &good_pwd){
auto strongSelf = weakSelf.lock();
if(!strongSelf){
return;
}
//切换到自己的线程确保realInvoker执行时this指针有效
strongSelf->async([realInvoker,weakSelf,encrypted,good_pwd](){
auto strongSelf = weakSelf.lock();
if(!strongSelf){
return;
}
realInvoker(false,encrypted,good_pwd);
});
2017-12-10 01:34:43 +08:00
};
//此时可以提供明文或md5加密的密码
if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth,_mediaInfo,realm,username, false,invoker,static_cast<SockInfo &>(*this))){
2017-12-10 01:34:43 +08:00
//表明该流需要认证却没监听请求密码事件,这一般是大意的程序所为,警告之
2019-05-30 12:14:20 +08:00
WarnP(this) << "请监听kBroadcastOnRtspAuth事件";
2017-12-10 01:34:43 +08:00
//但是我们还是忽略认证以便完成播放
realInvoker(true,true,"");
}
}
2019-05-30 12:14:20 +08:00
void RtspSession::onAuthUser(const string &realm,const string &authorization){
if(authorization.empty()){
onAuthFailed(realm,"", false);
return;
}
2017-12-10 01:34:43 +08:00
//请求中包含认证信息
auto authType = FindField(authorization.data(),NULL," ");
2020-03-20 11:51:24 +08:00
auto authStr = FindField(authorization.data()," ",NULL);
2017-12-10 01:34:43 +08:00
if(authType.empty() || authStr.empty()){
//认证信息格式不合法回复401 Unauthorized
2019-05-30 12:14:20 +08:00
onAuthFailed(realm,"can not find auth type or auth string");
2017-12-10 01:34:43 +08:00
return;
}
if(authType == "Basic"){
//base64认证需要明文密码
2019-05-30 12:14:20 +08:00
onAuthBasic(realm,authStr);
2017-12-10 01:34:43 +08:00
}else if(authType == "Digest"){
//md5认证
2019-05-30 12:14:20 +08:00
onAuthDigest(realm,authStr);
2017-12-10 01:34:43 +08:00
}else{
//其他认证方式?不支持!
2019-05-30 12:14:20 +08:00
onAuthFailed(realm,StrPrinter << "unsupported auth type:" << authType);
2017-12-10 01:34:43 +08:00
}
}
2017-04-01 16:35:56 +08:00
inline void RtspSession::send_StreamNotFound() {
2020-03-20 11:51:24 +08:00
sendRtspResponse("404 Stream Not Found",{"Connection","Close"});
2017-04-01 16:35:56 +08:00
}
inline void RtspSession::send_UnsupportedTransport() {
2020-03-20 11:51:24 +08:00
sendRtspResponse("461 Unsupported Transport",{"Connection","Close"});
2017-04-01 16:35:56 +08:00
}
inline void RtspSession::send_SessionNotFound() {
2020-03-20 11:51:24 +08:00
sendRtspResponse("454 Session Not Found",{"Connection","Close"});
2017-04-01 16:35:56 +08:00
}
2019-05-30 12:14:20 +08:00
void RtspSession::handleReq_Setup(const Parser &parser) {
2017-04-01 16:35:56 +08:00
//处理setup命令该函数可能进入多次
2019-09-10 16:00:39 +08:00
auto controlSuffix = split(parser.FullUrl(),"/").back();// parser.FullUrl().substr(_strContentBase.size());
2018-12-21 17:12:26 +08:00
if(controlSuffix.front() == '/'){
2020-03-20 11:51:24 +08:00
controlSuffix = controlSuffix.substr(1);
2018-12-21 17:12:26 +08:00
}
2020-03-20 11:51:24 +08:00
int trackIdx = getTrackIndexByControlSuffix(controlSuffix);
if (trackIdx == -1) {
//未找到相应track
2019-05-30 12:14:20 +08:00
throw SockException(Err_shutdown, StrPrinter << "can not find any track by control suffix:" << controlSuffix);
}
2020-03-20 11:51:24 +08:00
SdpTrack::Ptr &trackRef = _aTrackInfo[trackIdx];
if (trackRef->_inited) {
//已经初始化过该Track
2019-05-30 12:14:20 +08:00
throw SockException(Err_shutdown, "can not setup one track twice");
2020-03-20 11:51:24 +08:00
}
trackRef->_inited = true; //现在初始化
if(_rtpType == Rtsp::RTP_Invalid){
auto &strTransport = parser["Transport"];
if(strTransport.find("TCP") != string::npos){
_rtpType = Rtsp::RTP_TCP;
}else if(strTransport.find("multicast") != string::npos){
_rtpType = Rtsp::RTP_MULTICAST;
}else{
_rtpType = Rtsp::RTP_UDP;
}
}
2017-04-01 16:35:56 +08:00
//允许接收rtp、rtcp包
2020-03-20 11:51:24 +08:00
RtspSplitter::enableRecvRtp(_rtpType == Rtsp::RTP_TCP);
2018-12-17 15:21:23 +08:00
2020-03-20 11:51:24 +08:00
switch (_rtpType) {
case Rtsp::RTP_TCP: {
if(_pushSrc){
//rtsp推流时interleaved由推流者决定
auto key_values = Parser::parseArgs(parser["Transport"],";","=");
int interleaved_rtp = -1 , interleaved_rtcp = -1;
if(2 == sscanf(key_values["interleaved"].data(),"%d-%d",&interleaved_rtp,&interleaved_rtcp)){
trackRef->_interleaved = interleaved_rtp;
}else{
throw SockException(Err_shutdown, "can not find interleaved when setup of rtp over tcp");
}
}else{
//rtsp播放时由于数据共享分发所以interleaved必须由服务器决定
trackRef->_interleaved = 2 * trackRef->_type;
}
2020-03-20 11:51:24 +08:00
sendRtspResponse("200 OK",
{"Transport",StrPrinter << "RTP/AVP/TCP;unicast;"
<< "interleaved=" << (int)trackRef->_interleaved << "-" << (int)trackRef->_interleaved + 1 << ";"
<< "ssrc=" << printSSRC(trackRef->_ssrc),
"x-Transport-Options" , "late-tolerance=1.400000",
"x-Dynamic-Rate" , "1"
});
}
break;
case Rtsp::RTP_UDP: {
2020-05-12 10:22:21 +08:00
std::pair<Socket::Ptr, Socket::Ptr> pr;
try{
pr = makeSockPair(_sock->getPoller(), get_local_ip());
}catch(std::exception &ex) {
2020-03-20 11:51:24 +08:00
//分配端口失败
send_NotAcceptable();
2020-05-12 10:22:21 +08:00
throw SockException(Err_shutdown, ex.what());
2019-05-30 12:14:20 +08:00
}
2020-05-12 09:26:02 +08:00
2020-05-12 10:22:21 +08:00
_apRtpSock[trackIdx] = pr.first;
_apRtcpSock[trackIdx] = pr.second;
2020-05-12 09:26:02 +08:00
2020-03-20 11:51:24 +08:00
//设置客户端内网端口信息
string strClientPort = FindField(parser["Transport"].data(), "client_port=", NULL);
uint16_t ui16RtpPort = atoi( FindField(strClientPort.data(), NULL, "-").data());
2019-05-08 16:19:00 +08:00
uint16_t ui16RtcpPort = atoi( FindField(strClientPort.data(), "-" , NULL).data());
struct sockaddr_in peerAddr;
//设置rtp发送目标地址
2020-03-20 11:51:24 +08:00
peerAddr.sin_family = AF_INET;
peerAddr.sin_port = htons(ui16RtpPort);
peerAddr.sin_addr.s_addr = inet_addr(get_peer_ip().data());
bzero(&(peerAddr.sin_zero), sizeof peerAddr.sin_zero);
2020-05-12 10:22:21 +08:00
pr.first->setSendPeerAddr((struct sockaddr *)(&peerAddr));
2019-05-08 16:19:00 +08:00
2020-03-20 11:51:24 +08:00
//设置rtcp发送目标地址
2019-05-08 16:19:00 +08:00
peerAddr.sin_family = AF_INET;
peerAddr.sin_port = htons(ui16RtcpPort);
peerAddr.sin_addr.s_addr = inet_addr(get_peer_ip().data());
bzero(&(peerAddr.sin_zero), sizeof peerAddr.sin_zero);
2020-05-12 10:22:21 +08:00
pr.second->setSendPeerAddr((struct sockaddr *)(&peerAddr));
2019-05-08 16:19:00 +08:00
2020-03-20 11:51:24 +08:00
//尝试获取客户端nat映射地址
startListenPeerUdpData(trackIdx);
//InfoP(this) << "分配端口:" << srv_port;
sendRtspResponse("200 OK",
2020-05-12 10:22:21 +08:00
{"Transport", StrPrinter << "RTP/AVP/UDP;unicast;"
<< "client_port=" << strClientPort << ";"
<< "server_port=" << pr.first->get_local_port() << "-" << pr.second->get_local_port() << ";"
<< "ssrc=" << printSSRC(trackRef->_ssrc)
2020-03-20 11:51:24 +08:00
});
}
break;
case Rtsp::RTP_MULTICAST: {
if(!_multicaster){
_multicaster = RtpMultiCaster::get(getPoller(),get_local_ip(),_mediaInfo._vhost, _mediaInfo._app, _mediaInfo._streamid);
if (!_multicaster) {
send_NotAcceptable();
2019-05-30 12:14:20 +08:00
throw SockException(Err_shutdown, "can not get a available udp multicast socket");
}
2020-03-20 11:51:24 +08:00
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
_multicaster->setDetachCB(this, [weakSelf]() {
auto strongSelf = weakSelf.lock();
if(!strongSelf) {
return;
}
strongSelf->safeShutdown(SockException(Err_shutdown,"ring buffer detached"));
});
}
int iSrvPort = _multicaster->getPort(trackRef->_type);
//我们用trackIdx区分rtp和rtcp包
//由于组播udp端口是共享的而rtcp端口为组播udp端口+1所以rtcp端口需要改成共享端口
auto pSockRtcp = UDPServer::Instance().getSock(getPoller(),get_local_ip().data(),2*trackIdx + 1,iSrvPort + 1);
if (!pSockRtcp) {
//分配端口失败
send_NotAcceptable();
2019-05-30 12:14:20 +08:00
throw SockException(Err_shutdown, "open shared rtcp socket failed");
2020-03-20 11:51:24 +08:00
}
startListenPeerUdpData(trackIdx);
2019-05-28 17:14:36 +08:00
GET_CONFIG(uint32_t,udpTTL,MultiCast::kUdpTTL);
2020-03-20 11:51:24 +08:00
sendRtspResponse("200 OK",
{"Transport",StrPrinter << "RTP/AVP;multicast;"
<< "destination=" << _multicaster->getIP() << ";"
<< "source=" << get_local_ip() << ";"
<< "port=" << iSrvPort << "-" << pSockRtcp->get_local_port() << ";"
<< "ttl=" << udpTTL << ";"
<< "ssrc=" << printSSRC(trackRef->_ssrc)
});
}
break;
default:
break;
}
2017-04-01 16:35:56 +08:00
}
2019-05-30 12:14:20 +08:00
void RtspSession::handleReq_Play(const Parser &parser) {
2020-03-20 11:51:24 +08:00
if (_aTrackInfo.empty() || parser["Session"] != _strSession) {
send_SessionNotFound();
2019-05-30 12:14:20 +08:00
throw SockException(Err_shutdown,_aTrackInfo.empty() ? "can not find any availabe track when play" : "session not found when play");
}
2020-03-20 11:51:24 +08:00
auto strRange = parser["Range"];
2018-05-22 18:41:56 +08:00
auto onRes = [this,strRange](const string &err){
2018-02-06 16:17:37 +08:00
bool authSuccess = err.empty();
2018-10-26 22:48:03 +08:00
if(!authSuccess){
//第一次play是播放否则是恢复播放。只对播放鉴权
2020-03-20 11:51:24 +08:00
sendRtspResponse("401 Unauthorized", {"Content-Type", "text/plain"}, err);
2019-05-29 18:08:50 +08:00
shutdown(SockException(Err_shutdown,StrPrinter << "401 Unauthorized:" << err));
return;
}
2017-04-01 16:35:56 +08:00
2018-10-24 15:43:52 +08:00
auto pMediaSrc = _pMediaSrc.lock();
2018-10-26 22:32:50 +08:00
if(!pMediaSrc){
2020-03-20 11:51:24 +08:00
send_StreamNotFound();
shutdown(SockException(Err_shutdown,"rtsp stream released"));
return;
}
2018-10-26 22:32:50 +08:00
bool useBuf = true;
_enableSendRtp = false;
float iStartTime = 0;
if (strRange.size() && !_bFirstPlay) {
2018-10-26 22:48:03 +08:00
//这个是seek操作
auto strStart = FindField(strRange.data(), "npt=", "-");
if (strStart == "now") {
strStart = "0";
}
iStartTime = 1000 * atof(strStart.data());
InfoP(this) << "rtsp seekTo(ms):" << iStartTime;
useBuf = !pMediaSrc->seekTo(iStartTime);
}else if(pMediaSrc->totalReaderCount() == 0){
//第一个消费者
pMediaSrc->seekTo(0);
}
_bFirstPlay = false;
2020-03-20 11:51:24 +08:00
_StrPrinter rtp_info;
for(auto &track : _aTrackInfo){
if (track->_inited == false) {
//还有track没有setup
2019-05-29 18:08:50 +08:00
shutdown(SockException(Err_shutdown,"track not setuped"));
return;
2020-03-20 11:51:24 +08:00
}
track->_ssrc = pMediaSrc->getSsrc(track->_type);
track->_seq = pMediaSrc->getSeqence(track->_type);
track->_time_stamp = pMediaSrc->getTimeStamp(track->_type);
2018-10-26 22:32:50 +08:00
2020-03-20 11:51:24 +08:00
rtp_info << "url=" << _strContentBase << "/" << track->_control_surffix << ";"
<< "seq=" << track->_seq << ";"
<< "rtptime=" << (int)(track->_time_stamp * (track->_samplerate / 1000)) << ",";
}
2018-10-26 10:12:37 +08:00
2020-03-20 11:51:24 +08:00
rtp_info.pop_back();
sendRtspResponse("200 OK",
{"Range", StrPrinter << "npt=" << setiosflags(ios::fixed) << setprecision(2) << (useBuf? pMediaSrc->getTimeStamp(TrackInvalid) / 1000.0 : iStartTime / 1000),
"RTP-Info",rtp_info
});
2020-03-20 11:51:24 +08:00
_enableSendRtp = true;
setSocketFlags();
2018-10-26 22:48:03 +08:00
2020-03-20 11:51:24 +08:00
if (!_pRtpReader && _rtpType != Rtsp::RTP_MULTICAST) {
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
_pRtpReader = pMediaSrc->getRing()->attach(getPoller(),useBuf);
_pRtpReader->setDetachCB([weakSelf]() {
auto strongSelf = weakSelf.lock();
if(!strongSelf) {
return;
}
2019-05-29 18:08:50 +08:00
strongSelf->shutdown(SockException(Err_shutdown,"rtsp ring buffer detached"));
});
2020-04-07 13:03:53 +08:00
_pRtpReader->setReadCB([weakSelf](const RtspMediaSource::RingDataType &pack) {
2020-03-20 11:51:24 +08:00
auto strongSelf = weakSelf.lock();
if(!strongSelf) {
return;
}
if(strongSelf->_enableSendRtp) {
strongSelf->sendRtpPacket(pack);
}
});
}
};
2017-04-01 16:35:56 +08:00
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
2018-02-06 16:17:37 +08:00
Broadcast::AuthInvoker invoker = [weakSelf,onRes](const string &err){
auto strongSelf = weakSelf.lock();
if(!strongSelf){
return;
2017-04-01 16:35:56 +08:00
}
2018-02-06 16:17:37 +08:00
strongSelf->async([weakSelf,onRes,err](){
auto strongSelf = weakSelf.lock();
if(!strongSelf){
return;
}
2018-02-06 16:17:37 +08:00
onRes(err);
});
};
2018-10-26 22:48:03 +08:00
if(_bFirstPlay){
//第一次收到play命令需要鉴权
auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed,_mediaInfo,invoker,static_cast<SockInfo &>(*this));
2018-10-26 22:48:03 +08:00
if(!flag){
//该事件无人监听,默认不鉴权
onRes("");
}
}else{
//后面是seek或恢复命令不需要鉴权
2018-02-06 16:17:37 +08:00
onRes("");
}
2017-04-01 16:35:56 +08:00
}
2019-05-30 12:14:20 +08:00
void RtspSession::handleReq_Pause(const Parser &parser) {
2020-03-20 11:51:24 +08:00
if (parser["Session"] != _strSession) {
send_SessionNotFound();
2019-05-30 12:14:20 +08:00
throw SockException(Err_shutdown,"session not found when pause");
}
2020-03-20 11:51:24 +08:00
sendRtspResponse("200 OK");
_enableSendRtp = false;
2017-04-01 16:35:56 +08:00
}
2019-05-30 12:14:20 +08:00
void RtspSession::handleReq_Teardown(const Parser &parser) {
2020-03-20 11:51:24 +08:00
sendRtspResponse("200 OK");
2019-05-30 12:14:20 +08:00
throw SockException(Err_shutdown,"rtsp player send teardown request");
2017-04-01 16:35:56 +08:00
}
2019-05-30 12:14:20 +08:00
/**
 * Handle the HTTP GET half of an RTSP-over-HTTP tunnel.
 *
 * Records the request's "x-sessioncookie", registers this session as the
 * "http getter" for that cookie so the matching HTTP POST connection can find
 * it, and answers with an HTTP/1.0 200 tunnel response.
 *
 * The getter is registered BEFORE the reply is sent: a client may issue its
 * POST as soon as it sees the GET reply, and that POST may be handled on
 * another thread. Registering afterwards leaves a window in which
 * handleReq_Post cannot find the getter and drops the connection.
 *
 * @param parser parsed HTTP GET request
 */
void RtspSession::handleReq_Get(const Parser &parser) {
    _http_x_sessioncookie = parser["x-sessioncookie"];
    {
        // Register the http getter so that the http poster can bind to it.
        lock_guard<recursive_mutex> lock(g_mtxGetter);
        g_mapGetter[_http_x_sessioncookie] = dynamic_pointer_cast<RtspSession>(shared_from_this());
    }
    sendRtspResponse("200 OK",
                     {"Cache-Control","no-store",
                      "Pragma","no-store",
                      "Content-Type","application/x-rtsp-tunnelled",
                     },"","HTTP/1.0");
}
2019-05-30 12:14:20 +08:00
void RtspSession::handleReq_Post(const Parser &parser) {
2020-03-20 11:51:24 +08:00
lock_guard<recursive_mutex> lock(g_mtxGetter);
string sessioncookie = parser["x-sessioncookie"];
//Poster 找到 Getter
auto it = g_mapGetter.find(sessioncookie);
if (it == g_mapGetter.end()) {
2019-05-30 12:14:20 +08:00
throw SockException(Err_shutdown,"can not find http getter by x-sessioncookie");
}
2020-03-20 11:51:24 +08:00
//Poster 找到Getter的SOCK
auto httpGetterWeak = it->second;
//移除http getter的弱引用记录
g_mapGetter.erase(sessioncookie);
//http poster收到请求后转发给http getter处理
_onRecv = [this,httpGetterWeak](const Buffer::Ptr &pBuf){
auto httpGetterStrong = httpGetterWeak.lock();
if(!httpGetterStrong){
shutdown(SockException(Err_shutdown,"http getter released"));
return;
}
//切换到http getter的线程
httpGetterStrong->async([pBuf,httpGetterWeak](){
auto httpGetterStrong = httpGetterWeak.lock();
if(!httpGetterStrong){
return;
}
httpGetterStrong->onRecv(std::make_shared<BufferString>(decodeBase64(string(pBuf->data(),pBuf->size()))));
});
};
if(!parser.Content().empty()){
//http poster后面的粘包
_onRecv(std::make_shared<BufferString>(parser.Content()));
}
sendRtspResponse("200 OK",
{"Cache-Control","no-store",
"Pragma","no-store",
"Content-Type","application/x-rtsp-tunnelled",
},"","HTTP/1.0");
2017-04-01 16:35:56 +08:00
}
2019-05-30 12:14:20 +08:00
void RtspSession::handleReq_SET_PARAMETER(const Parser &parser) {
2020-03-20 11:51:24 +08:00
//TraceP(this) <<endl;
sendRtspResponse("200 OK");
2017-04-01 16:35:56 +08:00
}
inline void RtspSession::send_NotAcceptable() {
2020-03-20 11:51:24 +08:00
sendRtspResponse("406 Not Acceptable",{"Connection","Close"});
2017-04-01 16:35:56 +08:00
}
2018-01-30 09:35:54 +08:00
2018-12-14 18:13:05 +08:00
/**
 * Forward one sorted RTP packet to the pushed media source (_pushSrc).
 *
 * @param rtppt    the RTP packet
 * @param trackidx track index of the packet (unused here)
 */
void RtspSession::onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx) {
    // NOTE(review): _pushSrc is dereferenced unchecked — presumably only reached while pushing; confirm.
    _pushSrc->onWrite(rtppt, false);
}
2019-05-08 14:23:18 +08:00
/**
 * Process one UDP datagram received from the peer.
 *
 * Even channel ids (2 * track) carry RTP, odd ids (2 * track + 1) carry RTCP.
 *  - RTP while pushing: the packet is fed to handleOneRtp().
 *  - RTP while playing: the first packet on a channel is treated as a
 *    hole-punch packet and its source address becomes the RTP send address.
 *  - RTCP: the first packet on a channel fixes the RTCP send address; every
 *    packet is passed to onRtcpPacket().
 *
 * Datagrams whose channel id does not map to a known track are dropped, and
 * unset sockets are skipped, instead of indexing out of range or
 * dereferencing a null pointer.
 *
 * @param intervaled channel id: 2 * track for RTP, 2 * track + 1 for RTCP
 * @param pBuf       datagram payload
 * @param addr       source address of the datagram
 */
inline void RtspSession::onRcvPeerUdpData(int intervaled, const Buffer::Ptr &pBuf, const struct sockaddr& addr) {
    // Any datagram from the peer counts as a keep-alive.
    _ticker.resetTime();

    if (intervaled < 0) {
        return;
    }
    // For non-negative ids, intervaled / 2 equals (intervaled - 1) / 2 on odd values.
    const int trackIdx = intervaled / 2;
    if ((size_t) trackIdx >= _aTrackInfo.size()) {
        return;
    }

    if(intervaled % 2 == 0){
        if(_pushSrc){
            // RTP packet pushed to us by an RTSP publisher.
            handleOneRtp(trackIdx,_aTrackInfo[trackIdx],( unsigned char *)pBuf->data(),pBuf->size());
        }else if(!_udpSockConnected.count(intervaled)){
            // RTP hole-punch packet sent by an RTSP player.
            _udpSockConnected.emplace(intervaled);
            auto &rtpSock = _apRtpSock[trackIdx];
            if(rtpSock){
                rtpSock->setSendPeerAddr(&addr);
            }
        }
    }else{
        // RTCP packet.
        if(!_udpSockConnected.count(intervaled)){
            _udpSockConnected.emplace(intervaled);
            auto &rtcpSock = _apRtcpSock[trackIdx];
            if(rtcpSock){
                rtcpSock->setSendPeerAddr(&addr);
            }
        }
        onRtcpPacket(trackIdx, _aTrackInfo[trackIdx], (unsigned char *) pBuf->data(),pBuf->size());
    }
}
inline void RtspSession::startListenPeerUdpData(int trackIdx) {
2020-03-20 11:51:24 +08:00
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
2019-05-08 16:24:45 +08:00
auto srcIP = inet_addr(get_peer_ip().data());
2020-03-20 11:51:24 +08:00
auto onUdpData = [weakSelf,srcIP](const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr,int intervaled){
auto strongSelf=weakSelf.lock();
if(!strongSelf) {
return false;
}
2019-05-28 18:46:52 +08:00
if (((struct sockaddr_in *) pPeerAddr)->sin_addr.s_addr != srcIP) {
WarnP(strongSelf.get()) << ((intervaled % 2 == 0) ? "收到其他地址的rtp数据:" : "收到其他地址的rtcp数据:")
2020-04-23 16:14:24 +08:00
<< SockUtil::inet_ntoa(((struct sockaddr_in *) pPeerAddr)->sin_addr);
2019-05-28 18:46:52 +08:00
return true;
}
2020-03-20 11:51:24 +08:00
struct sockaddr addr=*pPeerAddr;
strongSelf->async([weakSelf,pBuf,addr,intervaled]() {
auto strongSelf=weakSelf.lock();
if(!strongSelf) {
return;
}
strongSelf->onRcvPeerUdpData(intervaled,pBuf,addr);
});
return true;
};
switch (_rtpType){
case Rtsp::RTP_MULTICAST:{
//组播使用的共享rtcp端口
UDPServer::Instance().listenPeer(get_peer_ip().data(), this, [onUdpData](
int intervaled, const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr) {
return onUdpData(pBuf,pPeerAddr,intervaled);
});
}
break;
case Rtsp::RTP_UDP:{
auto setEvent = [&](Socket::Ptr &sock,int intervaled){
if(!sock){
WarnP(this) << "udp端口为空:" << intervaled;
return;
}
sock->setOnRead([onUdpData,intervaled](const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr , int addr_len){
onUdpData(pBuf,pPeerAddr,intervaled);
});
};
setEvent(_apRtpSock[trackIdx], 2*trackIdx );
setEvent(_apRtcpSock[trackIdx], 2*trackIdx + 1 );
}
break;
default:
break;
}
2017-04-01 16:35:56 +08:00
}
/**
 * Build the value of the "Date" response header for the current time, in UTC.
 *
 * Uses the RFC 1123 HTTP-date layout ("Sun, 06 Nov 1994 08:49:37 GMT"), with
 * the day of month before the month name. The calendar conversion goes
 * through the re-entrant gmtime_r/gmtime_s, because gmtime() returns a
 * pointer to shared static storage and is not safe to call from several
 * threads at once.
 *
 * @return the formatted date, or an empty string if the time cannot be converted
 */
static std::string dateStr(){
    char buf[64];
    time_t tt = time(NULL);
    struct tm utc;
#if defined(_WIN32)
    if (gmtime_s(&utc, &tt) != 0) {
        return std::string();
    }
#else
    if (gmtime_r(&tt, &utc) == nullptr) {
        return std::string();
    }
#endif
    // %a / %b follow the current C locale; presumably left at the default "C" locale here.
    const size_t len = strftime(buf, sizeof buf, "%a, %d %b %Y %H:%M:%S GMT", &utc);
    return std::string(buf, len);
}
bool RtspSession::sendRtspResponse(const string &res_code,
2020-03-20 11:51:24 +08:00
const StrCaseMap &header_const,
const string &sdp,
const char *protocol){
auto header = header_const;
header.emplace("CSeq",StrPrinter << _iCseq);
if(!_strSession.empty()){
header.emplace("Session",_strSession);
}
2020-04-04 19:55:11 +08:00
header.emplace("Server",SERVER_NAME);
2020-03-20 11:51:24 +08:00
header.emplace("Date",dateStr());
if(!sdp.empty()){
header.emplace("Content-Length",StrPrinter << sdp.size());
header.emplace("Content-Type","application/sdp");
}
_StrPrinter printer;
printer << protocol << " " << res_code << "\r\n";
for (auto &pr : header){
printer << pr.first << ": " << pr.second << "\r\n";
}
printer << "\r\n";
if(!sdp.empty()){
printer << sdp;
}
2019-05-28 18:46:52 +08:00
// DebugP(this) << printer;
2020-03-20 11:51:24 +08:00
return send(std::make_shared<BufferString>(printer)) > 0 ;
}
/**
 * Send a buffer through TcpSession::send() and add its size to the
 * session's byte counter.
 *
 * @param pkt buffer to send
 * @return whatever TcpSession::send() returns
 */
int RtspSession::send(const Buffer::Ptr &pkt){
    const auto bytes = pkt->size();
    _ui64TotalBytes += bytes;
    return TcpSession::send(pkt);
}
bool RtspSession::sendRtspResponse(const string &res_code,
2020-03-20 11:51:24 +08:00
const std::initializer_list<string> &header,
const string &sdp,
const char *protocol) {
string key;
StrCaseMap header_map;
int i = 0;
for(auto &val : header){
if(++i % 2 == 0){
header_map.emplace(key,val);
}else{
key = val;
}
}
return sendRtspResponse(res_code,header_map,sdp,protocol);
}
/**
 * Format an SSRC as 8 upper-case hexadecimal digits, most significant byte first.
 *
 * @param ui32Ssrc SSRC in host byte order
 * @return e.g. "0A1B2C3D"
 */
inline string RtspSession::printSSRC(uint32_t ui32Ssrc) {
    static const char kHexDigits[] = "0123456789ABCDEF";
    string text;
    text.reserve(8);
    // Walk the nibbles from the most significant to the least significant.
    for (int shift = 28; shift >= 0; shift -= 4) {
        text.push_back(kHexDigits[(ui32Ssrc >> shift) & 0x0F]);
    }
    return text;
}
/**
 * Find the index of the first track with the given type.
 *
 * @param type track type to look for
 * @return the matching index; 0 when nothing matches but exactly one track
 *         exists; -1 otherwise
 */
inline int RtspSession::getTrackIndexByTrackType(TrackType type) {
    const size_t trackCount = _aTrackInfo.size();
    for (size_t idx = 0; idx != trackCount; ++idx) {
        if (_aTrackInfo[idx]->_type == type) {
            return static_cast<int>(idx);
        }
    }
    // With a single track there is only one possible answer.
    return trackCount == 1 ? 0 : -1;
}
/**
 * Find the index of the first track whose control suffix equals the given one.
 *
 * @param controlSuffix control suffix to look for
 * @return the matching index; 0 when nothing matches but exactly one track
 *         exists; -1 otherwise
 */
inline int RtspSession::getTrackIndexByControlSuffix(const string &controlSuffix) {
    const size_t trackCount = _aTrackInfo.size();
    for (size_t idx = 0; idx != trackCount; ++idx) {
        if (controlSuffix == _aTrackInfo[idx]->_control_surffix) {
            return static_cast<int>(idx);
        }
    }
    // With a single track there is only one possible answer.
    return trackCount == 1 ? 0 : -1;
}
2018-12-17 15:21:23 +08:00
/**
 * Find the index of the first track whose interleaved channel id equals the given one.
 *
 * @param interleaved interleaved channel id to look for
 * @return the matching index; 0 when nothing matches but exactly one track
 *         exists; -1 otherwise
 */
inline int RtspSession::getTrackIndexByInterleaved(int interleaved){
    const size_t trackCount = _aTrackInfo.size();
    for (size_t idx = 0; idx != trackCount; ++idx) {
        if (_aTrackInfo[idx]->_interleaved == interleaved) {
            return static_cast<int>(idx);
        }
    }
    // With a single track there is only one possible answer.
    return trackCount == 1 ? 0 : -1;
}
2019-05-29 18:08:50 +08:00
/**
 * Request that the pushed media source served by this session be closed.
 *
 * Refuses when this session is not pushing, or when the source still has
 * readers and the close is not forced. Otherwise shuts the session down
 * through safeShutdown().
 *
 * This callback is triggered from another thread.
 *
 * @param sender the media source being closed (used for the shutdown message)
 * @param force  close even when readers are still attached
 * @return true when the shutdown was requested, false when refused
 */
bool RtspSession::close(MediaSource &sender,bool force) {
    if(!_pushSrc){
        return false;
    }
    if(!force && _pushSrc->totalReaderCount()){
        return false;
    }

    string err = StrPrinter << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force;
    safeShutdown(SockException(Err_shutdown,err));
    return true;
}
2019-12-28 16:48:11 +08:00
/**
 * Report the reader count for a media source.
 *
 * @param sender the media source asking
 * @return the pushed source's total reader count when this session is
 *         pushing, otherwise the sender's own reader count
 */
int RtspSession::totalReaderCount(MediaSource &sender) {
    if(_pushSrc){
        return _pushSrc->totalReaderCount();
    }
    return sender.readerCount();
}
2019-05-31 15:40:55 +08:00
2020-04-07 13:03:53 +08:00
/**
 * Send one batch of RTP packets read from the ring buffer to the client.
 *
 *  - RTP_TCP: every packet goes through send() on the RTSP connection. The
 *    send-flush flag is cleared first and set again on the last packet of
 *    the batch.
 *  - RTP_UDP: every packet is wrapped in a BufferRtp with offset 4 and sent
 *    on the RTP socket of its track; the last packet of the batch is sent
 *    with the flush argument set.
 *  - other transports: nothing is sent here.
 *
 * A packet whose type matches no track is skipped. getTrackIndexByTrackType()
 * returns -1 in that case, which previously indexed _apRtpSock out of bounds.
 *
 * @param pkt batch of RTP packets
 */
void RtspSession::sendRtpPacket(const RtspMediaSource::RingDataType &pkt) {
    switch (_rtpType) {
        case Rtsp::RTP_TCP: {
            int i = 0;
            int size = pkt->size();
            setSendFlushFlag(false);
            pkt->for_each([&](const RtpPacket::Ptr &rtp) {
                if (++i == size) {
                    // Last packet of the batch: allow the socket to flush.
                    setSendFlushFlag(true);
                }
                send(rtp);
            });
        }
            break;
        case Rtsp::RTP_UDP: {
            int i = 0;
            int size = pkt->size();
            pkt->for_each([&](const RtpPacket::Ptr &rtp) {
                // Count every packet, including skipped ones, so the last one is still recognised.
                const bool isLast = (++i == size);
                int iTrackIndex = getTrackIndexByTrackType(rtp->type);
                if (iTrackIndex < 0) {
                    // No track for this packet type: never index the socket array with -1.
                    return;
                }
                auto &pSock = _apRtpSock[iTrackIndex];
                if (!pSock) {
                    shutdown(SockException(Err_shutdown, "udp sock not opened yet"));
                    return;
                }
                BufferRtp::Ptr buffer(new BufferRtp(rtp, 4));
                _ui64TotalBytes += buffer->size();
                pSock->send(buffer, nullptr, 0, isLast);
            });
        }
            break;
        default:
            break;
    }

#if RTSP_SERVER_SEND_RTCP
    // NOTE(review): compiled out (RTSP_SERVER_SEND_RTCP is 0 in this file). This section
    // treats pkt as a single RTP packet and presumably predates the batched ring data
    // type — it needs porting before the macro can be enabled.
    int iTrackIndex = getTrackIndexByTrackType(pkt->type);
    if(iTrackIndex == -1){
        return;
    }
    RtcpCounter &counter = _aRtcpCnt[iTrackIndex];
    counter.pktCnt += 1;
    counter.octCount += (pkt->length - pkt->offset);
    auto &ticker = _aRtcpTicker[iTrackIndex];
    if (ticker.elapsedTime() > 5 * 1000) {
        //send rtcp every 5 second
        ticker.resetTime();
        // Store the timestamp as-is, in network byte order.
        memcpy(&counter.timeStamp, pkt->payload + 8 , 4);
        sendSenderReport(_rtpType == Rtsp::RTP_TCP,iTrackIndex);
    }
#endif
}
2019-06-26 10:01:04 +08:00
void RtspSession::sendSenderReport(bool overTcp,int iTrackIndex) {
2019-05-09 13:35:54 +08:00
static const char s_cname[] = "ZLMediaKitRtsp";
uint8_t aui8Rtcp[4 + 28 + 10 + sizeof(s_cname) + 1] = {0};
2019-05-08 17:49:05 +08:00
uint8_t *pui8Rtcp_SR = aui8Rtcp + 4, *pui8Rtcp_SDES = pui8Rtcp_SR + 28;
auto &track = _aTrackInfo[iTrackIndex];
auto &counter = _aRtcpCnt[iTrackIndex];
aui8Rtcp[0] = '$';
aui8Rtcp[1] = track->_interleaved + 1;
2019-05-09 13:35:54 +08:00
aui8Rtcp[2] = (sizeof(aui8Rtcp) - 4) >> 8;
aui8Rtcp[3] = (sizeof(aui8Rtcp) - 4) & 0xFF;
2019-05-08 17:49:05 +08:00
pui8Rtcp_SR[0] = 0x80;
pui8Rtcp_SR[1] = 0xC8;
pui8Rtcp_SR[2] = 0x00;
pui8Rtcp_SR[3] = 0x06;
uint32_t ssrc=htonl(track->_ssrc);
memcpy(&pui8Rtcp_SR[4], &ssrc, 4);
uint64_t msw;
uint64_t lsw;
struct timeval tv;
gettimeofday(&tv, NULL);
msw = tv.tv_sec + 0x83AA7E80; /* 0x83AA7E80 is the number of seconds from 1900 to 1970 */
lsw = (uint32_t) ((double) tv.tv_usec * (double) (((uint64_t) 1) << 32) * 1.0e-6);
msw = htonl(msw);
memcpy(&pui8Rtcp_SR[8], &msw, 4);
lsw = htonl(lsw);
memcpy(&pui8Rtcp_SR[12], &lsw, 4);
//直接使用网络字节序
memcpy(&pui8Rtcp_SR[16], &counter.timeStamp, 4);
uint32_t pktCnt = htonl(counter.pktCnt);
memcpy(&pui8Rtcp_SR[20], &pktCnt, 4);
uint32_t octCount = htonl(counter.octCount);
memcpy(&pui8Rtcp_SR[24], &octCount, 4);
pui8Rtcp_SDES[0] = 0x81;
pui8Rtcp_SDES[1] = 0xCA;
pui8Rtcp_SDES[2] = 0x00;
pui8Rtcp_SDES[3] = 0x06;
memcpy(&pui8Rtcp_SDES[4], &ssrc, 4);
pui8Rtcp_SDES[8] = 0x01;
pui8Rtcp_SDES[9] = 0x0f;
2019-05-09 13:35:54 +08:00
memcpy(&pui8Rtcp_SDES[10], s_cname, sizeof(s_cname));
pui8Rtcp_SDES[10 + sizeof(s_cname)] = 0x00;
2019-05-08 17:49:05 +08:00
if(overTcp){
send(obtainBuffer((char *) aui8Rtcp, sizeof(aui8Rtcp)));
}else {
_apRtcpSock[iTrackIndex]->send((char *) aui8Rtcp + 4, sizeof(aui8Rtcp) - 4);
}
2017-04-01 16:35:56 +08:00
}
void RtspSession::setSocketFlags(){
2020-04-29 11:08:43 +08:00
GET_CONFIG(int, mergeWriteMS, General::kMergeWriteMS);
if(mergeWriteMS > 0) {
//推流模式下关闭TCP_NODELAY会增加推流端的延时但是服务器性能将提高
SockUtil::setNoDelay(_sock->rawFD(), false);
//播放模式下开启MSG_MORE会增加延时但是能提高发送性能
2020-04-23 17:50:12 +08:00
setSendFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE);
}
}
2017-04-01 16:35:56 +08:00
}
2018-10-24 17:17:55 +08:00
/* namespace mediakit */
2017-04-01 16:35:56 +08:00