ZLMediaKit/src/Rtsp/RtspSession.cpp
2018-11-14 09:52:28 +08:00

1187 lines
37 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/*
* MIT License
*
* Copyright (c) 2016 xiongziliang <771730766@qq.com>
*
* This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
#include <atomic>
#include "Common/config.h"
#include "UDPServer.h"
#include "RtspSession.h"
#include "Util/mini.h"
#include "Util/MD5.h"
#include "Util/base64.h"
#include "Util/onceToken.h"
#include "Util/TimeTicker.h"
#include "Util/NoticeCenter.h"
#include "Network/sockutil.h"
using namespace std;
using namespace toolkit;
namespace mediakit {
static int kSockFlags = SOCKET_DEFAULE_FLAGS | FLAG_MORE;
string dateHeader() {
char buf[200];
time_t tt = time(NULL);
strftime(buf, sizeof buf, "Date: %a, %b %d %Y %H:%M:%S GMT\r\n", gmtime(&tt));
return buf;
}
unordered_map<string, weak_ptr<RtspSession> > RtspSession::g_mapGetter;
unordered_map<void *, std::shared_ptr<RtspSession> > RtspSession::g_mapPostter;
recursive_mutex RtspSession::g_mtxGetter; //对quicktime上锁保护
recursive_mutex RtspSession::g_mtxPostter; //对quicktime上锁保护
RtspSession::RtspSession(const std::shared_ptr<ThreadPool> &pTh, const Socket::Ptr &pSock) :
TcpSession(pTh, pSock), _pSender(pSock) {
//设置10秒发送缓存
pSock->setSendBufSecond(10);
//设置15秒发送超时时间
pSock->setSendTimeOutSecond(15);
DebugL << get_peer_ip();
}
RtspSession::~RtspSession() {
if (_onDestory) {
_onDestory();
}
DebugL << get_peer_ip();
}
void RtspSession::shutdown(){
shutdown_l(true);
}
void RtspSession::shutdown_l(bool close){
if (_sock) {
_sock->emitErr(SockException(Err_other, "self shutdown"),close);
}
if (_bBase64need && !_sock) {
//quickTime http postter,and self is detached from tcpServer
lock_guard<recursive_mutex> lock(g_mtxPostter);
g_mapPostter.erase(this);
}
if (_pBrdcaster) {
_pBrdcaster->setDetachCB(this, nullptr);
_pBrdcaster.reset();
}
if (_pRtpReader) {
_pRtpReader.reset();
}
}
void RtspSession::onError(const SockException& err) {
TraceL << err.getErrCode() << " " << err.what();
if (_bListenPeerUdpData) {
//取消UDP端口监听
UDPServer::Instance().stopListenPeer(get_peer_ip().data(), this);
_bListenPeerUdpData = false;
}
if (!_bBase64need && _strSessionCookie.size() != 0) {
//quickTime http getter
lock_guard<recursive_mutex> lock(g_mtxGetter);
g_mapGetter.erase(_strSessionCookie);
}
if (_bBase64need && err.getErrCode() == Err_eof) {
//quickTime http postter,正在发送rtp; QuickTime只是断开了请求连接,请继续发送rtp
_sock = nullptr;
lock_guard<recursive_mutex> lock(g_mtxPostter);
//为了保证脱离TCPServer后还能正常运作,需要保持本对象的强引用
try {
g_mapPostter.emplace(this, dynamic_pointer_cast<RtspSession>(shared_from_this()));
}catch (std::exception &ex){
}
TraceL << "quickTime will not send request any more!";
}
//流量统计事件广播
GET_CONFIG_AND_REGISTER(uint32_t,iFlowThreshold,Broadcast::kFlowThreshold);
if(_ui64TotalBytes > iFlowThreshold * 1024){
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport,
_mediaInfo,
_ui64TotalBytes,
_ticker.createdTime()/1000,
*this);
}
}
void RtspSession::onManager() {
if (_ticker.createdTime() > 15 * 1000) {
if (_strSession.size() == 0) {
WarnL << "非法链接:" << get_peer_ip();
shutdown();
return;
}
}
if (_rtpType != PlayerBase::RTP_TCP && _ticker.elapsedTime() > 15 * 1000) {
WarnL << "RTSP会话超时:" << get_peer_ip();
shutdown();
return;
}
if(_delayTask){
if(time(NULL) > _iTaskTimeLine){
_delayTask();
_delayTask = nullptr;
}
}
}
int64_t RtspSession::onRecvHeader(const char *header,uint64_t len) {
char tmp[2 * 1024];
_pcBuf = tmp;
_parser.Parse(header); //rtsp请求解析
string strCmd = _parser.Method(); //提取出请求命令字
_iCseq = atoi(_parser["CSeq"].data());
typedef bool (RtspSession::*rtspCMDHandle)();
static unordered_map<string, rtspCMDHandle> g_mapCmd;
static onceToken token( []() {
g_mapCmd.emplace("OPTIONS",&RtspSession::handleReq_Options);
g_mapCmd.emplace("DESCRIBE",&RtspSession::handleReq_Describe);
g_mapCmd.emplace("SETUP",&RtspSession::handleReq_Setup);
g_mapCmd.emplace("PLAY",&RtspSession::handleReq_Play);
g_mapCmd.emplace("PAUSE",&RtspSession::handleReq_Pause);
g_mapCmd.emplace("TEARDOWN",&RtspSession::handleReq_Teardown);
g_mapCmd.emplace("GET",&RtspSession::handleReq_Get);
g_mapCmd.emplace("POST",&RtspSession::handleReq_Post);
g_mapCmd.emplace("SET_PARAMETER",&RtspSession::handleReq_SET_PARAMETER);
g_mapCmd.emplace("GET_PARAMETER",&RtspSession::handleReq_SET_PARAMETER);
}, []() {});
auto it = g_mapCmd.find(strCmd);
if (it != g_mapCmd.end()) {
auto fun = it->second;
if(!(this->*fun)()){
shutdown();
}
} else{
shutdown();
WarnL << "cmd=" << strCmd;
}
_parser.Clear();
return 0;
}
void RtspSession::onRecv(const Buffer::Ptr &pBuf) {
_ticker.resetTime();
_ui64TotalBytes += pBuf->size();
if (_bBase64need) {
//quicktime 加密后的rtsp请求需要解密
auto str = decodeBase64(string(pBuf->data(),pBuf->size()));
inputRtspOrRtcp(str.data(),str.size());
} else {
inputRtspOrRtcp(pBuf->data(),pBuf->size());
}
}
void RtspSession::inputRtspOrRtcp(const char *data,uint64_t len) {
if(data[0] == '$' && _rtpType == PlayerBase::RTP_TCP){
//这是rtcp
return;
}
input(data,len);
}
bool RtspSession::handleReq_Options() {
//支持这些命令
int n = sprintf(_pcBuf,
"RTSP/1.0 200 OK\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"Public: OPTIONS, DESCRIBE, SETUP, TEARDOWN, PLAY,"
" PAUSE, SET_PARAMETER, GET_PARAMETER\r\n\r\n",
_iCseq, SERVER_NAME,
RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data());
SocketHelper::send(_pcBuf, n);
return true;
}
bool RtspSession::handleReq_Describe() {
{
//解析url获取媒体名称
_strUrl = _parser.Url();
_mediaInfo.parse(_parser.FullUrl());
}
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());\
auto parserCopy = _parser;
findStream([weakSelf,parserCopy](bool success){
auto strongSelf = weakSelf.lock();
if(!strongSelf){
return;
}
//恢复现场
strongSelf->_parser = parserCopy;
char tmp[2 * 1024];
strongSelf->_pcBuf = tmp;
if(!success){
//未找到相应的MediaSource
WarnL << "No such stream:" << strongSelf->_mediaInfo._vhost << " " << strongSelf->_mediaInfo._app << " " << strongSelf->_mediaInfo._streamid;
strongSelf->send_StreamNotFound();
strongSelf->shutdown();
return;
}
//该请求中的认证信息
auto authorization = strongSelf->_parser["Authorization"];
onGetRealm invoker = [weakSelf,authorization](const string &realm){
if(realm.empty()){
//无需认证,回复sdp
onAuthSuccess(weakSelf);
return;
}
//该流需要认证
onAuthUser(weakSelf,realm,authorization);
};
//广播是否需要认证事件
if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnGetRtspRealm,
strongSelf->_mediaInfo,
invoker,
*strongSelf)){
//无人监听此事件,说明无需认证
invoker("");
}
});
return true;
}
void RtspSession::onAuthSuccess(const weak_ptr<RtspSession> &weakSelf) {
auto strongSelf = weakSelf.lock();
if(!strongSelf){
//本对象已销毁
return;
}
strongSelf->async([weakSelf](){
auto strongSelf = weakSelf.lock();
if(!strongSelf){
//本对象已销毁
return;
}
char response[2 * 1024];
int n = sprintf(response,
"RTSP/1.0 200 OK\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"x-Accept-Retransmit: our-retransmit\r\n"
"x-Accept-Dynamic-Rate: 1\r\n"
"Content-Base: %s/\r\n"
"Content-Type: application/sdp\r\n"
"Content-Length: %d\r\n\r\n%s",
strongSelf->_iCseq, SERVER_NAME,
RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data(), strongSelf->_strUrl.data(),
(int) strongSelf->_strSdp.length(), strongSelf->_strSdp.data());
strongSelf->SocketHelper::send(response, n);
});
}
void RtspSession::onAuthFailed(const weak_ptr<RtspSession> &weakSelf,const string &realm) {
auto strongSelf = weakSelf.lock();
if(!strongSelf){
//本对象已销毁
return;
}
strongSelf->async([weakSelf,realm]() {
auto strongSelf = weakSelf.lock();
if (!strongSelf) {
//本对象已销毁
return;
}
int n;
char response[2 * 1024];
GET_CONFIG_AND_REGISTER(bool,authBasic,Rtsp::kAuthBasic);
if (!authBasic) {
//我们需要客户端优先以md5方式认证
strongSelf->_strNonce = makeRandStr(32);
n = sprintf(response,
"RTSP/1.0 401 Unauthorized\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"WWW-Authenticate: Digest realm=\"%s\",nonce=\"%s\"\r\n\r\n",
strongSelf->_iCseq, SERVER_NAME,
RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data(), realm.data(), strongSelf->_strNonce.data());
}else {
//当然我们也支持base64认证,但是我们不建议这样做
n = sprintf(response,
"RTSP/1.0 401 Unauthorized\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"WWW-Authenticate: Basic realm=\"%s\"\r\n\r\n",
strongSelf->_iCseq, SERVER_NAME,
RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data(), realm.data());
}
strongSelf->SocketHelper::send(response, n);
});
}
void RtspSession::onAuthBasic(const weak_ptr<RtspSession> &weakSelf,const string &realm,const string &strBase64){
//base64认证
char user_pwd_buf[512];
av_base64_decode((uint8_t *)user_pwd_buf,strBase64.data(),strBase64.size());
auto user_pwd_vec = split(user_pwd_buf,":");
if(user_pwd_vec.size() < 2){
//认证信息格式不合法回复401 Unauthorized
onAuthFailed(weakSelf,realm);
return;
}
auto user = user_pwd_vec[0];
auto pwd = user_pwd_vec[1];
onAuth invoker = [pwd,realm,weakSelf](bool encrypted,const string &good_pwd){
if(!encrypted && pwd == good_pwd){
//提供的是明文密码且匹配正确
onAuthSuccess(weakSelf);
}else{
//密码错误
onAuthFailed(weakSelf,realm);
}
};
auto strongSelf = weakSelf.lock();
if(!strongSelf){
//本对象已销毁
return;
}
//此时必须提供明文密码
if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth,strongSelf->_mediaInfo,user, true,invoker,*strongSelf)){
//表明该流需要认证却没监听请求密码事件,这一般是大意的程序所为,警告之
WarnL << "请监听kBroadcastOnRtspAuth事件";
//但是我们还是忽略认证以便完成播放
//我们输入的密码是明文
invoker(false,pwd);
}
}
void RtspSession::onAuthDigest(const weak_ptr<RtspSession> &weakSelf,const string &realm,const string &strMd5){
auto strongSelf = weakSelf.lock();
if(!strongSelf){
return;
}
DebugL << strMd5;
auto mapTmp = Parser::parseArgs(strMd5,",","=");
decltype(mapTmp) map;
for(auto &pr : mapTmp){
map[trim(string(pr.first)," \"")] = trim(pr.second," \"");
}
//check realm
if(realm != map["realm"]){
TraceL << "realm not mached:" << realm << "," << map["realm"];
onAuthFailed(weakSelf,realm);
return ;
}
//check nonce
auto nonce = map["nonce"];
if(strongSelf->_strNonce != nonce){
TraceL << "nonce not mached:" << nonce << "," << strongSelf->_strNonce;
onAuthFailed(weakSelf,realm);
return ;
}
//check username and uri
auto username = map["username"];
auto uri = map["uri"];
auto response = map["response"];
if(username.empty() || uri.empty() || response.empty()){
TraceL << "username/uri/response empty:" << username << "," << uri << "," << response;
onAuthFailed(weakSelf,realm);
return ;
}
auto realInvoker = [weakSelf,realm,nonce,uri,username,response](bool ignoreAuth,bool encrypted,const string &good_pwd){
if(ignoreAuth){
//忽略认证
onAuthSuccess(weakSelf);
TraceL << "auth ignored";
return;
}
/*
response计算方法如下
RTSP客户端应该使用username + password并计算response如下:
(1)当password为MD5编码,则
response = md5( password:nonce:md5(public_method:url) );
(2)当password为ANSI字符串,则
response= md5( md5(username:realm:password):nonce:md5(public_method:url) );
*/
auto encrypted_pwd = good_pwd;
if(!encrypted){
//提供的是明文密码
encrypted_pwd = MD5(username+ ":" + realm + ":" + good_pwd).hexdigest();
}
auto good_response = MD5( encrypted_pwd + ":" + nonce + ":" + MD5(string("DESCRIBE") + ":" + uri).hexdigest()).hexdigest();
if(strcasecmp(good_response.data(),response.data()) == 0){
//认证成功md5不区分大小写
onAuthSuccess(weakSelf);
TraceL << "onAuthSuccess";
}else{
//认证失败!
onAuthFailed(weakSelf,realm);
TraceL << "onAuthFailed";
}
};
onAuth invoker = [realInvoker](bool encrypted,const string &good_pwd){
realInvoker(false,encrypted,good_pwd);
};
//此时可以提供明文或md5加密的密码
if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth,strongSelf->_mediaInfo,username, false,invoker,*strongSelf)){
//表明该流需要认证却没监听请求密码事件,这一般是大意的程序所为,警告之
WarnL << "请监听kBroadcastOnRtspAuth事件";
//但是我们还是忽略认证以便完成播放
realInvoker(true,true,"");
}
}
void RtspSession::onAuthUser(const weak_ptr<RtspSession> &weakSelf,const string &realm,const string &authorization){
//请求中包含认证信息
auto authType = FindField(authorization.data(),NULL," ");
auto authStr = FindField(authorization.data()," ",NULL);
if(authType.empty() || authStr.empty()){
//认证信息格式不合法回复401 Unauthorized
onAuthFailed(weakSelf,realm);
return;
}
if(authType == "Basic"){
//base64认证需要明文密码
onAuthBasic(weakSelf,realm,authStr);
}else if(authType == "Digest"){
//md5认证
onAuthDigest(weakSelf,realm,authStr);
}else{
//其他认证方式?不支持!
onAuthFailed(weakSelf,realm);
}
}
inline void RtspSession::send_StreamNotFound() {
int n = sprintf(_pcBuf,
"RTSP/1.0 404 Stream Not Found\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"Connection: Close\r\n\r\n",
_iCseq, SERVER_NAME,
RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data());
SocketHelper::send(_pcBuf, n);
}
inline void RtspSession::send_UnsupportedTransport() {
int n = sprintf(_pcBuf,
"RTSP/1.0 461 Unsupported Transport\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"Connection: Close\r\n\r\n",
_iCseq, SERVER_NAME,
RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data());
SocketHelper::send(_pcBuf, n);
}
inline void RtspSession::send_SessionNotFound() {
int n = sprintf(_pcBuf,
"RTSP/1.0 454 Session Not Found\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"Connection: Close\r\n\r\n",
_iCseq, SERVER_NAME,
RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data());
SocketHelper::send(_pcBuf, n);
/*40 Method Not Allowed*/
}
bool RtspSession::handleReq_Setup() {
//处理setup命令该函数可能进入多次
auto controlSuffix = _parser.FullUrl().substr(1 + _parser.FullUrl().rfind('/'));
int trackIdx = getTrackIndexByControlSuffix(controlSuffix);
if (trackIdx == -1) {
//未找到相应track
return false;
}
SdpTrack::Ptr &trackRef = _aTrackInfo[trackIdx];
if (trackRef->_inited) {
//已经初始化过该Track
return false;
}
trackRef->_inited = true; //现在初始化
if(!_bSetUped){
_bSetUped = true;
auto strTransport = _parser["Transport"];
if(strTransport.find("TCP") != string::npos){
_rtpType = PlayerBase::RTP_TCP;
}else if(strTransport.find("multicast") != string::npos){
_rtpType = PlayerBase::RTP_MULTICAST;
}else{
_rtpType = PlayerBase::RTP_UDP;
}
}
switch (_rtpType) {
case PlayerBase::RTP_TCP: {
int iLen = sprintf(_pcBuf,
"RTSP/1.0 200 OK\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"Transport: RTP/AVP/TCP;unicast;"
"interleaved=%d-%d;ssrc=%s;mode=play\r\n"
"Session: %s\r\n"
"x-Transport-Options: late-tolerance=1.400000\r\n"
"x-Dynamic-Rate: 1\r\n\r\n",
_iCseq, SERVER_NAME,
RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data(), trackRef->_type * 2,
trackRef->_type * 2 + 1,
printSSRC(trackRef->_ssrc).data(),
_strSession.data());
SocketHelper::send(_pcBuf, iLen);
}
break;
case PlayerBase::RTP_UDP: {
//我们用trackIdx区分rtp和rtcp包
auto pSockRtp = UDPServer::Instance().getSock(get_local_ip().data(),2*trackIdx);
if (!pSockRtp) {
//分配端口失败
WarnL << "分配rtp端口失败";
send_NotAcceptable();
return false;
}
auto pSockRtcp = UDPServer::Instance().getSock(get_local_ip().data(),2*trackIdx + 1 ,pSockRtp->get_local_port() + 1);
if (!pSockRtcp) {
//分配端口失败
WarnL << "分配rtcp端口失败";
send_NotAcceptable();
return false;
}
_apUdpSock[trackIdx] = pSockRtp;
//设置客户端内网端口信息
string strClientPort = FindField(_parser["Transport"].data(), "client_port=", NULL);
uint16_t ui16PeerPort = atoi( FindField(strClientPort.data(), NULL, "-").data());
struct sockaddr_in peerAddr;
peerAddr.sin_family = AF_INET;
peerAddr.sin_port = htons(ui16PeerPort);
peerAddr.sin_addr.s_addr = inet_addr(get_peer_ip().data());
bzero(&(peerAddr.sin_zero), sizeof peerAddr.sin_zero);
_apPeerUdpAddr[trackIdx].reset((struct sockaddr *) (new struct sockaddr_in(peerAddr)));
//尝试获取客户端nat映射地址
startListenPeerUdpData();
//InfoL << "分配端口:" << srv_port;
int n = sprintf(_pcBuf,
"RTSP/1.0 200 OK\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"Transport: RTP/AVP/UDP;unicast;"
"client_port=%s;server_port=%d-%d;ssrc=%s;mode=play\r\n"
"Session: %s\r\n\r\n",
_iCseq, SERVER_NAME,
RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data(), strClientPort.data(),
pSockRtp->get_local_port(), pSockRtcp->get_local_port(),
printSSRC(trackRef->_ssrc).data(),
_strSession.data());
SocketHelper::send(_pcBuf, n);
}
break;
case PlayerBase::RTP_MULTICAST: {
if(!_pBrdcaster){
_pBrdcaster = RtpBroadCaster::get(get_local_ip(),_mediaInfo._vhost, _mediaInfo._app, _mediaInfo._streamid);
if (!_pBrdcaster) {
send_NotAcceptable();
return false;
}
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
_pBrdcaster->setDetachCB(this, [weakSelf]() {
auto strongSelf = weakSelf.lock();
if(!strongSelf) {
return;
}
strongSelf->safeShutdown();
});
}
int iSrvPort = _pBrdcaster->getPort(trackRef->_type);
//我们用trackIdx区分rtp和rtcp包
auto pSockRtcp = UDPServer::Instance().getSock(get_local_ip().data(),2*trackIdx + 1,iSrvPort + 1);
if (!pSockRtcp) {
//分配端口失败
WarnL << "分配rtcp端口失败";
send_NotAcceptable();
return false;
}
startListenPeerUdpData();
GET_CONFIG_AND_REGISTER(uint32_t,udpTTL,MultiCast::kUdpTTL);
int n = sprintf(_pcBuf,
"RTSP/1.0 200 OK\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"Transport: RTP/AVP;multicast;destination=%s;"
"source=%s;port=%d-%d;ttl=%d;ssrc=%s\r\n"
"Session: %s\r\n\r\n",
_iCseq, SERVER_NAME,
RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data(), _pBrdcaster->getIP().data(),
get_local_ip().data(), iSrvPort, pSockRtcp->get_local_port(),
udpTTL, printSSRC(trackRef->_ssrc).data(),
_strSession.data());
SocketHelper::send(_pcBuf, n);
}
break;
default:
break;
}
return true;
}
bool RtspSession::handleReq_Play() {
if (_aTrackInfo.empty() || _parser["Session"] != _strSession) {
send_SessionNotFound();
return false;
}
auto strRange = _parser["Range"];
auto onRes = [this,strRange](const string &err){
bool authSuccess = err.empty();
char response[2 * 1024];
_pcBuf = response;
if(!authSuccess){
//第一次play是播放否则是恢复播放。只对播放鉴权
int n = sprintf(_pcBuf,
"RTSP/1.0 401 Unauthorized\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"Content-Type: text/plain\r\n"
"Content-Length: %d\r\n\r\n%s",
_iCseq, SERVER_NAME,
RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data(),(int)err.size(),err.data());
SocketHelper::send(_pcBuf,n);
shutdown();
return;
}
auto pMediaSrc = _pMediaSrc.lock();
if(!pMediaSrc){
send_StreamNotFound();
shutdown();
return;
}
bool useBuf = true;
_enableSendRtp = false;
if (strRange.size() && !_bFirstPlay) {
//这个是seek操作
auto strStart = FindField(strRange.data(), "npt=", "-");
if (strStart == "now") {
strStart = "0";
}
auto iStartTime = 1000 * atof(strStart.data());
InfoL << "rtsp seekTo(ms):" << iStartTime;
useBuf = !pMediaSrc->seekTo(iStartTime);
}else if(pMediaSrc->getRing()->readerCount() == 0){
//第一个消费者
pMediaSrc->seekTo(0);
}
_bFirstPlay = false;
int iLen = sprintf(_pcBuf,
"RTSP/1.0 200 OK\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"Session: %s\r\n"
"Range: npt=%.2f-\r\n"
"RTP-Info: ", _iCseq, SERVER_NAME, RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data(), _strSession.data(), pMediaSrc->getTimeStamp(TrackInvalid) / 1000.0);
for(auto &track : _aTrackInfo){
if (track->_inited == false) {
//还有track没有setup
shutdown();
return;
}
track->_ssrc = pMediaSrc->getSsrc(track->_type);
track->_seq = pMediaSrc->getSeqence(track->_type);
track->_time_stamp = pMediaSrc->getTimeStamp(track->_type);
iLen += sprintf(_pcBuf + iLen, "url=%s/%s;seq=%d;rtptime=%u,",
_strUrl.data(),
track->_control_surffix.data(),
track->_seq,
track->_time_stamp * (track->_samplerate / 1000));
}
iLen -= 1;
(_pcBuf)[iLen] = '\0';
iLen += sprintf(_pcBuf + iLen, "\r\n\r\n");
SocketHelper::send(_pcBuf, iLen);
_enableSendRtp = true;
//提高发送性能
(*this) << SocketFlags(kSockFlags);
SockUtil::setNoDelay(_pSender->rawFD(),false);
if (!_pRtpReader && _rtpType != PlayerBase::RTP_MULTICAST) {
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
_pRtpReader = pMediaSrc->getRing()->attach(useBuf);
_pRtpReader->setDetachCB([weakSelf]() {
auto strongSelf = weakSelf.lock();
if(!strongSelf) {
return;
}
strongSelf->safeShutdown();
});
_pRtpReader->setReadCB([weakSelf](const RtpPacket::Ptr &pack) {
auto strongSelf = weakSelf.lock();
if(!strongSelf) {
return;
}
if(!strongSelf->_enableSendRtp) {
return;
}
strongSelf->async([weakSelf,pack](){
auto strongSelf = weakSelf.lock();
if(!strongSelf) {
return;
}
if(strongSelf->_enableSendRtp) {
strongSelf->sendRtpPacket(pack);
}
});
});
}
};
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
Broadcast::AuthInvoker invoker = [weakSelf,onRes](const string &err){
auto strongSelf = weakSelf.lock();
if(!strongSelf){
return;
}
strongSelf->async([weakSelf,onRes,err](){
auto strongSelf = weakSelf.lock();
if(!strongSelf){
return;
}
onRes(err);
});
};
if(_bFirstPlay){
//第一次收到play命令需要鉴权
auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed,_mediaInfo,invoker,*this);
if(!flag){
//该事件无人监听,默认不鉴权
onRes("");
}
}else{
//后面是seek或恢复命令不需要鉴权
onRes("");
}
return true;
}
bool RtspSession::handleReq_Pause() {
if (_parser["Session"] != _strSession) {
send_SessionNotFound();
return false;
}
int n = sprintf(_pcBuf,
"RTSP/1.0 200 OK\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"Session: %s\r\n\r\n", _iCseq, SERVER_NAME, RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data(), _strSession.data());
SocketHelper::send(_pcBuf, n);
_enableSendRtp = false;
return true;
}
bool RtspSession::handleReq_Teardown() {
int n = sprintf(_pcBuf,
"RTSP/1.0 200 OK\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"Session: %s\r\n\r\n", _iCseq, SERVER_NAME, RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data(), _strSession.data());
SocketHelper::send(_pcBuf, n);
TraceL << "播放器断开连接!";
return false;
}
bool RtspSession::handleReq_Get() {
_strSessionCookie = _parser["x-sessioncookie"];
int n = sprintf(_pcBuf,
"HTTP/1.0 200 OK\r\n"
"%s"
"Connection: close\r\n"
"Cache-Control: no-store\r\n"
"Pragma: no-cache\r\n"
"Content-Type: application/x-rtsp-tunnelled\r\n\r\n",
dateHeader().data());
//注册GET
lock_guard<recursive_mutex> lock(g_mtxGetter);
g_mapGetter[_strSessionCookie] = dynamic_pointer_cast<RtspSession>(shared_from_this());
//InfoL << _strSessionCookie;
SocketHelper::send(_pcBuf, n);
return true;
}
bool RtspSession::handleReq_Post() {
lock_guard<recursive_mutex> lock(g_mtxGetter);
string sessioncookie = _parser["x-sessioncookie"];
//Poster 找到 Getter
auto it = g_mapGetter.find(sessioncookie);
if (it == g_mapGetter.end()) {
//WarnL << sessioncookie;
return false;
}
_bBase64need = true;
//Poster 找到Getter的SOCK
auto strongSession = it->second.lock();
g_mapGetter.erase(sessioncookie);
if (!strongSession) {
send_SessionNotFound();
//WarnL;
return false;
}
initSender(strongSession);
return true;
}
bool RtspSession::handleReq_SET_PARAMETER() {
//TraceL<<endl;
int n = sprintf(_pcBuf,
"RTSP/1.0 200 OK\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"Session: %s\r\n\r\n", _iCseq, SERVER_NAME, RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data(), _strSession.data());
SocketHelper::send(_pcBuf, n);
return true;
}
inline void RtspSession::send_NotAcceptable() {
int n = sprintf(_pcBuf,
"RTSP/1.0 406 Not Acceptable\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"Connection: Close\r\n\r\n", _iCseq, SERVER_NAME, RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data());
SocketHelper::send(_pcBuf, n);
}
void RtspSession::doDelay(int delaySec, const std::function<void()> &fun) {
if(_delayTask){
_delayTask();
}
_delayTask = fun;
_iTaskTimeLine = time(NULL) + delaySec;
}
void RtspSession::cancelDelyaTask(){
_delayTask = nullptr;
}
void RtspSession::findStream(const function<void(bool)> &cb) {
bool success = findStream();
if (success) {
cb(true);
return;
}
//广播未找到流
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastNotFoundStream,_mediaInfo,*this);
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
auto task_id = this;
auto media_info = _mediaInfo;
auto onRegist = [task_id, weakSelf, media_info, cb](BroadcastMediaChangedArgs) {
if (bRegist &&
schema == media_info._schema &&
vhost == media_info._vhost &&
app == media_info._app &&
stream == media_info._streamid) {
//播发器请求的rtsp流终于注册上了
auto strongSelf = weakSelf.lock();
if (!strongSelf) {
return;
}
//切换到自己的线程再回复
//如果触发 kBroadcastMediaChanged 事件的线程与本RtspSession绑定的线程相同,
//那么strongSelf->async操作可能是同步操作,
//通过指定参数may_sync为false确保 NoticeCenter::delListener操作延后执行,
//以便防止遍历事件监听对象map时做删除操作
strongSelf->async([task_id, weakSelf, media_info, cb]() {
auto strongSelf = weakSelf.lock();
if (!strongSelf) {
return;
}
DebugL << "收到rtsp注册事件,回复播放器:" << media_info._schema << "/" << media_info._vhost << "/"
<< media_info._app << "/" << media_info._streamid;
cb(strongSelf->findStream());
//取消延时任务,防止多次回复
strongSelf->cancelDelyaTask();
//取消事件监听
//在事件触发时不能在当前线程移除事件监听,否则会导致遍历map时做删除操作导致程序崩溃
NoticeCenter::Instance().delListener(task_id, Broadcast::kBroadcastMediaChanged);
}, false);
}
};
NoticeCenter::Instance().addListener(task_id, Broadcast::kBroadcastMediaChanged, onRegist);
//5秒后执行失败回调
doDelay(5, [cb,task_id]() {
NoticeCenter::Instance().delListener(task_id,Broadcast::kBroadcastMediaChanged);
cb(false);
});
}
inline bool RtspSession::findStream() {
RtspMediaSource::Ptr pMediaSrc =
dynamic_pointer_cast<RtspMediaSource>( MediaSource::find(RTSP_SCHEMA,_mediaInfo._vhost, _mediaInfo._app,_mediaInfo._streamid) );
if (!pMediaSrc) {
return false;
}
_strSdp = pMediaSrc->getSdp();
_sdpAttr.load(_strSdp);
_aTrackInfo = _sdpAttr.getAvailableTrack();
if (_aTrackInfo.empty()) {
return false;
}
_strSession = makeRandStr(12);
_pMediaSrc = pMediaSrc;
for(auto &track : _aTrackInfo){
track->_ssrc = pMediaSrc->getSsrc(track->_type);
track->_seq = pMediaSrc->getSeqence(track->_type);
track->_time_stamp = pMediaSrc->getTimeStamp(track->_type);
}
return true;
}
inline void RtspSession::sendRtpPacket(const RtpPacket::Ptr & pkt) {
//InfoL<<(int)pkt.Interleaved;
switch (_rtpType) {
case PlayerBase::RTP_TCP: {
BufferRtp::Ptr buffer(new BufferRtp(pkt));
send(buffer);
#ifdef RTSP_SEND_RTCP
int iTrackIndex = getTrackIndexByTrackId(pkt.interleaved / 2);
RtcpCounter &counter = _aRtcpCnt[iTrackIndex];
counter.pktCnt += 1;
counter.octCount += (pkt.length - 12);
auto &_ticker = _aRtcpTicker[iTrackIndex];
if (_ticker.elapsedTime() > 5 * 1000) {
//send rtcp every 5 second
_ticker.resetTime();
counter.timeStamp = pkt.timeStamp;
sendRTCP();
}
#endif
}
break;
case PlayerBase::RTP_UDP: {
int iTrackIndex = getTrackIndexByTrackType(pkt->type);
auto pSock = _apUdpSock[iTrackIndex].lock();
if (!pSock) {
shutdown();
return;
}
auto peerAddr = _apPeerUdpAddr[iTrackIndex];
if (!peerAddr) {
return;
}
BufferRtp::Ptr buffer(new BufferRtp(pkt,4));
_ui64TotalBytes += buffer->size();
pSock->send(buffer,kSockFlags, peerAddr.get());
}
break;
default:
break;
}
}
inline void RtspSession::onRcvPeerUdpData(int iTrackIdx, const Buffer::Ptr &pBuf, const struct sockaddr& addr) {
if(iTrackIdx % 2 == 0){
//这是rtp探测包
if(!_bGotAllPeerUdp){
//还没有获取完整的rtp探测包
if(SockUtil::in_same_lan(get_local_ip().data(),get_peer_ip().data())){
//在内网中客户端上报的端口号是真实的所以我们忽略udp打洞包
_bGotAllPeerUdp = true;
return;
}
//设置真实的客户端nat映射端口号
_apPeerUdpAddr[iTrackIdx / 2].reset(new struct sockaddr(addr));
_abGotPeerUdp[iTrackIdx / 2] = true;
_bGotAllPeerUdp = true;//先假设获取到完整的rtp探测包
for (unsigned int i = 0; i < _aTrackInfo.size(); i++) {
if (!_abGotPeerUdp[i]) {
//还有track没获取到rtp探测包
_bGotAllPeerUdp = false;
break;
}
}
}
}else{
//这是rtcp心跳包说明播放器还存活
_ticker.resetTime();
//TraceL << "rtcp:" << (iTrackIdx-1)/2 ;
}
}
inline void RtspSession::startListenPeerUdpData() {
_bListenPeerUdpData = true;
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
UDPServer::Instance().listenPeer(get_peer_ip().data(), this,
[weakSelf](int iTrackIdx,const Buffer::Ptr &pBuf,struct sockaddr *pPeerAddr)->bool {
auto strongSelf=weakSelf.lock();
if(!strongSelf) {
return false;
}
struct sockaddr addr=*pPeerAddr;
strongSelf->async_first([weakSelf,pBuf,addr,iTrackIdx]() {
auto strongSelf=weakSelf.lock();
if(!strongSelf) {
return;
}
strongSelf->onRcvPeerUdpData(iTrackIdx,pBuf,addr);
});
return true;
});
}
inline void RtspSession::initSender(const std::shared_ptr<RtspSession>& session) {
_pSender = session->_sock;
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
session->_onDestory = [weakSelf]() {
auto strongSelf=weakSelf.lock();
if(!strongSelf) {
return;
}
//DebugL;
strongSelf->_pSender->setOnErr([weakSelf](const SockException &err) {
auto strongSelf=weakSelf.lock();
if(!strongSelf) {
return;
}
strongSelf->safeShutdown();
});
};
session->shutdown_l(false);
}
#ifdef RTSP_SEND_RTCP
inline void RtspSession::sendRTCP() {
//DebugL;
uint8_t aui8Rtcp[60] = {0};
uint8_t *pui8Rtcp_SR = aui8Rtcp + 4, *pui8Rtcp_SDES = pui8Rtcp_SR + 28;
for (uint8_t i = 0; i < _uiTrackCnt; i++) {
auto &track = _aTrackInfo[i];
auto &counter = _aRtcpCnt[i];
aui8Rtcp[0] = '$';
aui8Rtcp[1] = track.trackId * 2 + 1;
aui8Rtcp[2] = 56 / 256;
aui8Rtcp[3] = 56 % 256;
pui8Rtcp_SR[0] = 0x80;
pui8Rtcp_SR[1] = 0xC8;
pui8Rtcp_SR[2] = 0x00;
pui8Rtcp_SR[3] = 0x06;
uint32_t ssrc=htonl(track.ssrc);
memcpy(&pui8Rtcp_SR[4], &ssrc, 4);
uint64_t msw;
uint64_t lsw;
struct timeval tv;
gettimeofday(&tv, NULL);
msw = tv.tv_sec + 0x83AA7E80; /* 0x83AA7E80 is the number of seconds from 1900 to 1970 */
lsw = (uint32_t) ((double) tv.tv_usec * (double) (((uint64_t) 1) << 32) * 1.0e-6);
msw = htonl(msw);
memcpy(&pui8Rtcp_SR[8], &msw, 4);
lsw = htonl(lsw);
memcpy(&pui8Rtcp_SR[12], &lsw, 4);
uint32_t rtpStamp = htonl(counter.timeStamp);
memcpy(&pui8Rtcp_SR[16], &rtpStamp, 4);
uint32_t pktCnt = htonl(counter.pktCnt);
memcpy(&pui8Rtcp_SR[20], &pktCnt, 4);
uint32_t octCount = htonl(counter.octCount);
memcpy(&pui8Rtcp_SR[24], &octCount, 4);
pui8Rtcp_SDES[0] = 0x81;
pui8Rtcp_SDES[1] = 0xCA;
pui8Rtcp_SDES[2] = 0x00;
pui8Rtcp_SDES[3] = 0x06;
memcpy(&pui8Rtcp_SDES[4], &ssrc, 4);
pui8Rtcp_SDES[8] = 0x01;
pui8Rtcp_SDES[9] = 0x0f;
memcpy(&pui8Rtcp_SDES[10], "_ZL_RtspServer_", 15);
pui8Rtcp_SDES[25] = 0x00;
send((char *) aui8Rtcp, 60);
}
}
#endif
}
/* namespace mediakit */