ZLMediaKit/src/Rtsp/RtspSession.cpp

1188 lines
37 KiB
C++
Raw Normal View History

2018-09-07 16:28:58 +08:00
/*
2017-09-27 16:20:30 +08:00
* MIT License
2017-04-01 16:35:56 +08:00
*
2017-09-27 16:20:30 +08:00
* Copyright (c) 2016 xiongziliang <771730766@qq.com>
*
* This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
2017-04-01 16:35:56 +08:00
*/
2017-08-09 18:39:30 +08:00
2017-04-25 11:35:41 +08:00
#include <atomic>
2017-05-02 17:15:12 +08:00
#include "Common/config.h"
2017-04-25 11:35:41 +08:00
#include "UDPServer.h"
2017-04-01 16:35:56 +08:00
#include "RtspSession.h"
2017-04-25 11:35:41 +08:00
#include "Util/mini.h"
2017-12-10 01:34:43 +08:00
#include "Util/MD5.h"
2018-09-20 15:43:49 +08:00
#include "Util/base64.h"
2017-04-25 11:35:41 +08:00
#include "Util/onceToken.h"
2017-04-01 16:35:56 +08:00
#include "Util/TimeTicker.h"
#include "Util/NoticeCenter.h"
2017-04-25 11:35:41 +08:00
#include "Network/sockutil.h"
2017-04-01 16:35:56 +08:00
2018-10-24 17:17:55 +08:00
using namespace std;
using namespace toolkit;
2017-04-01 16:35:56 +08:00
2018-10-24 17:17:55 +08:00
namespace mediakit {
2017-04-01 16:35:56 +08:00
// Socket flags installed on the session by handleReq_Play once the PLAY reply
// has been sent: the default flags plus FLAG_MORE.
static int kSockFlags = SOCKET_DEFAULE_FLAGS | FLAG_MORE;
2017-04-01 16:35:56 +08:00
/**
 * Build the "Date:" header line for the current time, expressed in GMT.
 *
 * Uses the re-entrant gmtime variant so concurrent sessions on different
 * threads cannot clobber each other's broken-down time.
 *
 * @return "Date: <wday>, <mon> <day> <year> <HH:MM:SS> GMT\r\n", or an empty
 *         string if the time could not be converted or formatted.
 */
std::string dateHeader() {
    time_t now = time(NULL);
    struct tm utc;
#if defined(_WIN32)
    if (gmtime_s(&utc, &now) != 0) {
        return std::string();
    }
#else
    if (gmtime_r(&now, &utc) == NULL) {
        return std::string();
    }
#endif
    char buf[200];
    // strftime returns 0 (and leaves buf unspecified) when the result does not fit
    size_t len = strftime(buf, sizeof buf, "Date: %a, %b %d %Y %H:%M:%S GMT\r\n", &utc);
    return std::string(buf, len);
}
// QuickTime RTSP-over-HTTP tunnelling: GET-side sessions, stored as weak references
// and keyed by their "x-sessioncookie" value (see handleReq_Get / handleReq_Post).
unordered_map<string, weak_ptr<RtspSession> > RtspSession::g_mapGetter;
// POST-side sessions that keep a strong reference to themselves after their
// connection reported EOF (see onError), keyed by object address.
unordered_map<void *, std::shared_ptr<RtspSession> > RtspSession::g_mapPostter;
recursive_mutex RtspSession::g_mtxGetter; // guards g_mapGetter (QuickTime tunnelling)
recursive_mutex RtspSession::g_mtxPostter; // guards g_mapPostter (QuickTime tunnelling)
/**
 * Create an RTSP session on top of an accepted TCP connection.
 * The connection's socket doubles as the initial sender (_pSender).
 *
 * @param pTh   thread pool handed through to TcpSession
 * @param pSock socket of the accepted connection
 */
RtspSession::RtspSession(const std::shared_ptr<ThreadPool> &pTh, const Socket::Ptr &pSock)
        : TcpSession(pTh, pSock),
          _pSender(pSock) {
    // send buffer: 10 seconds
    pSock->setSendBufSecond(10);
    // send timeout: 15 seconds
    pSock->setSendTimeOutSecond(15);

    DebugL << get_peer_ip();
}
/**
 * Destroy the session: fire the optional _onDestory hook, then log the peer.
 */
RtspSession::~RtspSession() {
    const bool hasDestroyHook = static_cast<bool>(_onDestory);
    if (hasDestroyHook) {
        _onDestory();
    }
    DebugL << get_peer_ip();
}
void RtspSession::shutdown(){
2018-09-14 18:04:41 +08:00
shutdown_l(true);
}
void RtspSession::shutdown_l(bool close){
2018-02-23 15:36:51 +08:00
if (_sock) {
2018-09-14 18:04:41 +08:00
_sock->emitErr(SockException(Err_other, "self shutdown"),close);
2017-04-01 16:35:56 +08:00
}
2018-10-24 15:43:52 +08:00
if (_bBase64need && !_sock) {
2017-04-01 16:35:56 +08:00
//quickTime http postter,and self is detached from tcpServer
lock_guard<recursive_mutex> lock(g_mtxPostter);
g_mapPostter.erase(this);
}
2018-10-24 15:43:52 +08:00
if (_pBrdcaster) {
_pBrdcaster->setDetachCB(this, nullptr);
_pBrdcaster.reset();
2017-04-01 16:35:56 +08:00
}
2018-10-24 15:43:52 +08:00
if (_pRtpReader) {
_pRtpReader.reset();
2017-04-01 16:35:56 +08:00
}
}
void RtspSession::onError(const SockException& err) {
TraceL << err.getErrCode() << " " << err.what();
2018-10-24 15:43:52 +08:00
if (_bListenPeerUdpData) {
2017-11-08 16:56:02 +08:00
//取消UDP端口监听
2018-02-23 15:36:51 +08:00
UDPServer::Instance().stopListenPeer(get_peer_ip().data(), this);
2018-10-24 15:43:52 +08:00
_bListenPeerUdpData = false;
2017-04-01 16:35:56 +08:00
}
2018-10-24 15:43:52 +08:00
if (!_bBase64need && _strSessionCookie.size() != 0) {
2017-04-01 16:35:56 +08:00
//quickTime http getter
lock_guard<recursive_mutex> lock(g_mtxGetter);
2018-10-24 15:43:52 +08:00
g_mapGetter.erase(_strSessionCookie);
2017-04-01 16:35:56 +08:00
}
2018-10-24 15:43:52 +08:00
if (_bBase64need && err.getErrCode() == Err_eof) {
2017-04-01 16:35:56 +08:00
//quickTime http postter,正在发送rtp; QuickTime只是断开了请求连接,请继续发送rtp
2018-02-23 15:36:51 +08:00
_sock = nullptr;
2017-04-01 16:35:56 +08:00
lock_guard<recursive_mutex> lock(g_mtxPostter);
//为了保证脱离TCPServer后还能正常运作,需要保持本对象的强引用
2018-09-14 18:04:41 +08:00
try {
g_mapPostter.emplace(this, dynamic_pointer_cast<RtspSession>(shared_from_this()));
}catch (std::exception &ex){
}
2017-08-09 18:39:30 +08:00
TraceL << "quickTime will not send request any more!";
2017-04-01 16:35:56 +08:00
}
//流量统计事件广播
2018-02-09 11:42:55 +08:00
GET_CONFIG_AND_REGISTER(uint32_t,iFlowThreshold,Broadcast::kFlowThreshold);
2018-10-24 15:43:52 +08:00
if(_ui64TotalBytes > iFlowThreshold * 1024){
2018-10-09 09:36:03 +08:00
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport,
2018-10-24 15:43:52 +08:00
_mediaInfo,
_ui64TotalBytes,
_ticker.createdTime()/1000,
2018-10-09 09:36:03 +08:00
*this);
}
2017-04-01 16:35:56 +08:00
}
void RtspSession::onManager() {
2018-10-24 15:43:52 +08:00
if (_ticker.createdTime() > 15 * 1000) {
if (_strSession.size() == 0) {
2018-02-23 15:36:51 +08:00
WarnL << "非法链接:" << get_peer_ip();
2017-04-01 16:35:56 +08:00
shutdown();
return;
}
}
2018-10-24 15:43:52 +08:00
if (_rtpType != PlayerBase::RTP_TCP && _ticker.elapsedTime() > 15 * 1000) {
2018-02-23 15:36:51 +08:00
WarnL << "RTSP会话超时:" << get_peer_ip();
2017-04-01 16:35:56 +08:00
shutdown();
return;
}
if(_delayTask){
if(time(NULL) > _iTaskTimeLine){
_delayTask();
_delayTask = nullptr;
}
}
2017-04-01 16:35:56 +08:00
}
2018-09-20 18:44:32 +08:00
2018-09-23 00:55:00 +08:00
int64_t RtspSession::onRecvHeader(const char *header,uint64_t len) {
char tmp[2 * 1024];
2018-10-24 15:43:52 +08:00
_pcBuf = tmp;
2018-10-24 15:43:52 +08:00
_parser.Parse(header); //rtsp请求解析
string strCmd = _parser.Method(); //提取出请求命令字
_iCseq = atoi(_parser["CSeq"].data());
2017-04-01 16:35:56 +08:00
typedef bool (RtspSession::*rtspCMDHandle)();
static unordered_map<string, rtspCMDHandle> g_mapCmd;
static onceToken token( []() {
g_mapCmd.emplace("OPTIONS",&RtspSession::handleReq_Options);
g_mapCmd.emplace("DESCRIBE",&RtspSession::handleReq_Describe);
g_mapCmd.emplace("SETUP",&RtspSession::handleReq_Setup);
g_mapCmd.emplace("PLAY",&RtspSession::handleReq_Play);
g_mapCmd.emplace("PAUSE",&RtspSession::handleReq_Pause);
g_mapCmd.emplace("TEARDOWN",&RtspSession::handleReq_Teardown);
g_mapCmd.emplace("GET",&RtspSession::handleReq_Get);
g_mapCmd.emplace("POST",&RtspSession::handleReq_Post);
g_mapCmd.emplace("SET_PARAMETER",&RtspSession::handleReq_SET_PARAMETER);
g_mapCmd.emplace("GET_PARAMETER",&RtspSession::handleReq_SET_PARAMETER);
}, []() {});
2017-04-01 16:35:56 +08:00
auto it = g_mapCmd.find(strCmd);
if (it != g_mapCmd.end()) {
auto fun = it->second;
2018-09-20 18:44:32 +08:00
if(!(this->*fun)()){
shutdown();
}
} else{
2017-04-01 16:35:56 +08:00
shutdown();
WarnL << "cmd=" << strCmd;
}
2018-09-20 18:44:32 +08:00
2018-10-24 15:43:52 +08:00
_parser.Clear();
2018-09-20 18:44:32 +08:00
return 0;
}
/**
 * Data-received callback: resets the activity ticker, counts the bytes and
 * feeds the payload (base64-decoded first when _bBase64need is set) to
 * inputRtspOrRtcp.
 *
 * @param pBuf received buffer
 */
void RtspSession::onRecv(const Buffer::Ptr &pBuf) {
    _ticker.resetTime();
    _ui64TotalBytes += pBuf->size();

    if (!_bBase64need) {
        inputRtspOrRtcp(pBuf->data(), pBuf->size());
        return;
    }

    // QuickTime sends its RTSP requests base64-encoded; decode them first
    auto decoded = decodeBase64(string(pBuf->data(), pBuf->size()));
    inputRtspOrRtcp(decoded.data(), decoded.size());
}
2018-09-23 00:55:00 +08:00
void RtspSession::inputRtspOrRtcp(const char *data,uint64_t len) {
2018-10-24 15:43:52 +08:00
if(data[0] == '$' && _rtpType == PlayerBase::RTP_TCP){
2018-09-20 18:44:32 +08:00
//这是rtcp
return;
}
2018-09-23 00:55:00 +08:00
input(data,len);
2017-04-01 16:35:56 +08:00
}
bool RtspSession::handleReq_Options() {
//支持这些命令
2018-10-26 22:32:50 +08:00
int n = sprintf(_pcBuf,
"RTSP/1.0 200 OK\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"Public: OPTIONS, DESCRIBE, SETUP, TEARDOWN, PLAY,"
" PAUSE, SET_PARAMETER, GET_PARAMETER\r\n\r\n",
_iCseq, SERVER_NAME,
RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data());
2018-10-24 15:43:52 +08:00
SocketHelper::send(_pcBuf, n);
2017-04-01 16:35:56 +08:00
return true;
}
bool RtspSession::handleReq_Describe() {
{
//解析url获取媒体名称
2018-10-24 15:43:52 +08:00
_strUrl = _parser.Url();
_mediaInfo.parse(_parser.FullUrl());
}
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());\
auto parserCopy = _parser;
findStream([weakSelf,parserCopy](bool success){
auto strongSelf = weakSelf.lock();
if(!strongSelf){
return;
}
//恢复现场
strongSelf->_parser = parserCopy;
if(!success){
//未找到相应的MediaSource
strongSelf->send_StreamNotFound();
strongSelf->shutdown();
return;
}
//该请求中的认证信息
auto authorization = strongSelf->_parser["Authorization"];
onGetRealm invoker = [weakSelf,authorization](const string &realm){
if(realm.empty()){
//无需认证,回复sdp
onAuthSuccess(weakSelf);
return;
}
//该流需要认证
onAuthUser(weakSelf,realm,authorization);
};
//广播是否需要认证事件
if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnGetRtspRealm,
strongSelf->_mediaInfo,
invoker,
*strongSelf)){
//无人监听此事件,说明无需认证
invoker("");
}
});
2017-12-10 01:34:43 +08:00
return true;
2017-04-01 16:35:56 +08:00
}
2017-12-10 01:34:43 +08:00
void RtspSession::onAuthSuccess(const weak_ptr<RtspSession> &weakSelf) {
auto strongSelf = weakSelf.lock();
if(!strongSelf){
//本对象已销毁
return;
}
strongSelf->async([weakSelf](){
auto strongSelf = weakSelf.lock();
if(!strongSelf){
//本对象已销毁
return;
}
char response[2 * 1024];
int n = sprintf(response,
"RTSP/1.0 200 OK\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"x-Accept-Retransmit: our-retransmit\r\n"
"x-Accept-Dynamic-Rate: 1\r\n"
"Content-Base: %s/\r\n"
"Content-Type: application/sdp\r\n"
"Content-Length: %d\r\n\r\n%s",
2018-10-24 15:43:52 +08:00
strongSelf->_iCseq, SERVER_NAME,
2017-12-10 01:34:43 +08:00
RTSP_VERSION, RTSP_BUILDTIME,
2018-10-24 15:43:52 +08:00
dateHeader().data(), strongSelf->_strUrl.data(),
(int) strongSelf->_strSdp.length(), strongSelf->_strSdp.data());
strongSelf->SocketHelper::send(response, n);
2017-12-10 01:34:43 +08:00
});
}
void RtspSession::onAuthFailed(const weak_ptr<RtspSession> &weakSelf,const string &realm) {
auto strongSelf = weakSelf.lock();
if(!strongSelf){
//本对象已销毁
return;
}
strongSelf->async([weakSelf,realm]() {
auto strongSelf = weakSelf.lock();
if (!strongSelf) {
//本对象已销毁
return;
}
2017-04-01 16:35:56 +08:00
2017-12-10 01:34:43 +08:00
int n;
char response[2 * 1024];
2018-10-24 17:17:55 +08:00
GET_CONFIG_AND_REGISTER(bool,authBasic,Rtsp::kAuthBasic);
2017-12-10 01:34:43 +08:00
if (!authBasic) {
//我们需要客户端优先以md5方式认证
2018-10-24 15:43:52 +08:00
strongSelf->_strNonce = makeRandStr(32);
2017-12-10 01:34:43 +08:00
n = sprintf(response,
"RTSP/1.0 401 Unauthorized\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"WWW-Authenticate: Digest realm=\"%s\",nonce=\"%s\"\r\n\r\n",
2018-10-24 15:43:52 +08:00
strongSelf->_iCseq, SERVER_NAME,
2017-12-10 01:34:43 +08:00
RTSP_VERSION, RTSP_BUILDTIME,
2018-10-24 15:43:52 +08:00
dateHeader().data(), realm.data(), strongSelf->_strNonce.data());
2017-12-10 01:34:43 +08:00
}else {
//当然我们也支持base64认证,但是我们不建议这样做
n = sprintf(response,
"RTSP/1.0 401 Unauthorized\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"WWW-Authenticate: Basic realm=\"%s\"\r\n\r\n",
2018-10-24 15:43:52 +08:00
strongSelf->_iCseq, SERVER_NAME,
2017-12-10 01:34:43 +08:00
RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data(), realm.data());
}
strongSelf->SocketHelper::send(response, n);
2017-12-10 01:34:43 +08:00
});
}
void RtspSession::onAuthBasic(const weak_ptr<RtspSession> &weakSelf,const string &realm,const string &strBase64){
//base64认证
char user_pwd_buf[512];
av_base64_decode((uint8_t *)user_pwd_buf,strBase64.data(),strBase64.size());
auto user_pwd_vec = split(user_pwd_buf,":");
2017-12-10 01:34:43 +08:00
if(user_pwd_vec.size() < 2){
//认证信息格式不合法回复401 Unauthorized
onAuthFailed(weakSelf,realm);
return;
}
auto user = user_pwd_vec[0];
auto pwd = user_pwd_vec[1];
onAuth invoker = [pwd,realm,weakSelf](bool encrypted,const string &good_pwd){
if(!encrypted && pwd == good_pwd){
//提供的是明文密码且匹配正确
onAuthSuccess(weakSelf);
}else{
//密码错误
onAuthFailed(weakSelf,realm);
}
};
2018-02-09 15:50:21 +08:00
auto strongSelf = weakSelf.lock();
if(!strongSelf){
//本对象已销毁
return;
}
2017-12-10 01:34:43 +08:00
//此时必须提供明文密码
2018-10-24 15:43:52 +08:00
if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth,strongSelf->_mediaInfo,user, true,invoker,*strongSelf)){
2017-12-10 01:34:43 +08:00
//表明该流需要认证却没监听请求密码事件,这一般是大意的程序所为,警告之
WarnL << "请监听kBroadcastOnRtspAuth事件";
//但是我们还是忽略认证以便完成播放
//我们输入的密码是明文
invoker(false,pwd);
}
}
2017-12-10 01:34:43 +08:00
void RtspSession::onAuthDigest(const weak_ptr<RtspSession> &weakSelf,const string &realm,const string &strMd5){
auto strongSelf = weakSelf.lock();
if(!strongSelf){
return;
}
DebugL << strMd5;
auto mapTmp = Parser::parseArgs(strMd5,",","=");
decltype(mapTmp) map;
for(auto &pr : mapTmp){
map[trim(string(pr.first)," \"")] = trim(pr.second," \"");
2017-12-10 01:34:43 +08:00
}
//check realm
if(realm != map["realm"]){
TraceL << "realm not mached:" << realm << "," << map["realm"];
onAuthFailed(weakSelf,realm);
return ;
}
//check nonce
auto nonce = map["nonce"];
2018-10-24 15:43:52 +08:00
if(strongSelf->_strNonce != nonce){
TraceL << "nonce not mached:" << nonce << "," << strongSelf->_strNonce;
2017-12-10 01:34:43 +08:00
onAuthFailed(weakSelf,realm);
return ;
}
//check username and uri
auto username = map["username"];
auto uri = map["uri"];
auto response = map["response"];
if(username.empty() || uri.empty() || response.empty()){
TraceL << "username/uri/response empty:" << username << "," << uri << "," << response;
onAuthFailed(weakSelf,realm);
return ;
}
auto realInvoker = [weakSelf,realm,nonce,uri,username,response](bool ignoreAuth,bool encrypted,const string &good_pwd){
if(ignoreAuth){
//忽略认证
onAuthSuccess(weakSelf);
TraceL << "auth ignored";
return;
}
/*
response计算方法如下
RTSP客户端应该使用username + password并计算response如下:
(1)password为MD5编码,
response = md5( password:nonce:md5(public_method:url) );
(2)password为ANSI字符串,
response= md5( md5(username:realm:password):nonce:md5(public_method:url) );
*/
auto encrypted_pwd = good_pwd;
if(!encrypted){
//提供的是明文密码
encrypted_pwd = MD5(username+ ":" + realm + ":" + good_pwd).hexdigest();
}
auto good_response = MD5( encrypted_pwd + ":" + nonce + ":" + MD5(string("DESCRIBE") + ":" + uri).hexdigest()).hexdigest();
if(strcasecmp(good_response.data(),response.data()) == 0){
//认证成功md5不区分大小写
onAuthSuccess(weakSelf);
TraceL << "onAuthSuccess";
}else{
//认证失败!
onAuthFailed(weakSelf,realm);
TraceL << "onAuthFailed";
}
};
onAuth invoker = [realInvoker](bool encrypted,const string &good_pwd){
realInvoker(false,encrypted,good_pwd);
};
//此时可以提供明文或md5加密的密码
2018-10-24 15:43:52 +08:00
if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth,strongSelf->_mediaInfo,username, false,invoker,*strongSelf)){
2017-12-10 01:34:43 +08:00
//表明该流需要认证却没监听请求密码事件,这一般是大意的程序所为,警告之
WarnL << "请监听kBroadcastOnRtspAuth事件";
//但是我们还是忽略认证以便完成播放
realInvoker(true,true,"");
}
}
/**
 * Dispatch an Authorization header value to the matching scheme handler.
 * Missing, malformed or unsupported credentials lead to a 401 challenge.
 *
 * @param weakSelf      weak reference to the session
 * @param realm         realm the stream is protected with
 * @param authorization value of the request's Authorization header
 */
void RtspSession::onAuthUser(const weak_ptr<RtspSession> &weakSelf,const string &realm,const string &authorization){
    // "<scheme> <credentials>"
    auto scheme = FindField(authorization.data(),NULL," ");
    auto credentials = FindField(authorization.data()," ",NULL);
    if (scheme.empty() || credentials.empty()) {
        // malformed credentials: reply 401 Unauthorized
        onAuthFailed(weakSelf, realm);
        return;
    }
    if (scheme == "Basic") {
        // base64 authentication needs the plain-text password
        onAuthBasic(weakSelf, realm, credentials);
        return;
    }
    if (scheme == "Digest") {
        // md5 authentication
        onAuthDigest(weakSelf, realm, credentials);
        return;
    }
    // any other scheme is unsupported
    onAuthFailed(weakSelf, realm);
}
2017-04-01 16:35:56 +08:00
inline void RtspSession::send_StreamNotFound() {
2018-10-26 22:32:50 +08:00
int n = sprintf(_pcBuf,
"RTSP/1.0 404 Stream Not Found\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"Connection: Close\r\n\r\n",
_iCseq, SERVER_NAME,
RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data());
2018-10-24 15:43:52 +08:00
SocketHelper::send(_pcBuf, n);
2017-04-01 16:35:56 +08:00
}
inline void RtspSession::send_UnsupportedTransport() {
2018-10-26 22:32:50 +08:00
int n = sprintf(_pcBuf,
"RTSP/1.0 461 Unsupported Transport\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"Connection: Close\r\n\r\n",
_iCseq, SERVER_NAME,
RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data());
2018-10-24 15:43:52 +08:00
SocketHelper::send(_pcBuf, n);
2017-04-01 16:35:56 +08:00
}
inline void RtspSession::send_SessionNotFound() {
2018-10-26 22:32:50 +08:00
int n = sprintf(_pcBuf,
"RTSP/1.0 454 Session Not Found\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"Connection: Close\r\n\r\n",
_iCseq, SERVER_NAME,
RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data());
2018-10-24 15:43:52 +08:00
SocketHelper::send(_pcBuf, n);
2017-04-01 16:35:56 +08:00
2018-05-18 18:48:17 +08:00
/*40 Method Not Allowed*/
2017-04-01 16:35:56 +08:00
}
bool RtspSession::handleReq_Setup() {
//处理setup命令该函数可能进入多次
2018-10-24 15:43:52 +08:00
auto controlSuffix = _parser.FullUrl().substr(1 + _parser.FullUrl().rfind('/'));
2018-03-26 18:56:22 +08:00
int trackIdx = getTrackIndexByControlSuffix(controlSuffix);
2017-04-01 16:35:56 +08:00
if (trackIdx == -1) {
//未找到相应track
return false;
}
2018-10-26 09:56:29 +08:00
SdpTrack::Ptr &trackRef = _aTrackInfo[trackIdx];
2018-10-26 10:59:13 +08:00
if (trackRef->_inited) {
2017-04-01 16:35:56 +08:00
//已经初始化过该Track
return false;
}
2018-10-26 10:59:13 +08:00
trackRef->_inited = true; //现在初始化
2017-04-01 16:35:56 +08:00
2018-10-24 15:43:52 +08:00
auto strongRing = _pWeakRing.lock();
2017-04-01 16:35:56 +08:00
if (!strongRing) {
//the media source is released!
send_NotAcceptable();
return false;
}
2018-10-24 15:43:52 +08:00
if(!_bSetUped){
_bSetUped = true;
auto strTransport = _parser["Transport"];
2017-04-01 16:35:56 +08:00
if(strTransport.find("TCP") != string::npos){
2018-10-24 15:43:52 +08:00
_rtpType = PlayerBase::RTP_TCP;
2017-04-01 16:35:56 +08:00
}else if(strTransport.find("multicast") != string::npos){
2018-10-24 15:43:52 +08:00
_rtpType = PlayerBase::RTP_MULTICAST;
2017-04-01 16:35:56 +08:00
}else{
2018-10-24 15:43:52 +08:00
_rtpType = PlayerBase::RTP_UDP;
2017-04-01 16:35:56 +08:00
}
}
2018-10-24 15:43:52 +08:00
if (!_pRtpReader && _rtpType != PlayerBase::RTP_MULTICAST) {
_pRtpReader = strongRing->attach();
2017-04-01 16:35:56 +08:00
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
2018-10-24 15:43:52 +08:00
_pRtpReader->setDetachCB([weakSelf]() {
2017-04-01 16:35:56 +08:00
auto strongSelf = weakSelf.lock();
if(!strongSelf) {
return;
}
strongSelf->safeShutdown();
});
}
2018-10-24 15:43:52 +08:00
switch (_rtpType) {
2017-04-01 16:35:56 +08:00
case PlayerBase::RTP_TCP: {
2018-10-26 22:32:50 +08:00
int iLen = sprintf(_pcBuf,
"RTSP/1.0 200 OK\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"Transport: RTP/AVP/TCP;unicast;"
"interleaved=%d-%d;ssrc=%s;mode=play\r\n"
"Session: %s\r\n"
"x-Transport-Options: late-tolerance=1.400000\r\n"
"x-Dynamic-Rate: 1\r\n\r\n",
_iCseq, SERVER_NAME,
RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data(), trackRef->_type * 2,
trackRef->_type * 2 + 1,
printSSRC(trackRef->_ssrc).data(),
_strSession.data());
2018-10-24 15:43:52 +08:00
SocketHelper::send(_pcBuf, iLen);
2017-04-01 16:35:56 +08:00
}
break;
case PlayerBase::RTP_UDP: {
2017-11-08 16:56:02 +08:00
//我们用trackIdx区分rtp和rtcp包
2018-02-23 15:36:51 +08:00
auto pSockRtp = UDPServer::Instance().getSock(get_local_ip().data(),2*trackIdx);
2017-11-08 16:56:02 +08:00
if (!pSockRtp) {
//分配端口失败
WarnL << "分配rtp端口失败";
send_NotAcceptable();
return false;
}
2018-02-23 15:36:51 +08:00
auto pSockRtcp = UDPServer::Instance().getSock(get_local_ip().data(),2*trackIdx + 1 ,pSockRtp->get_local_port() + 1);
2017-11-08 16:56:02 +08:00
if (!pSockRtcp) {
2017-04-01 16:35:56 +08:00
//分配端口失败
2017-11-08 16:56:02 +08:00
WarnL << "分配rtcp端口失败";
2017-04-01 16:35:56 +08:00
send_NotAcceptable();
return false;
}
2018-10-24 15:43:52 +08:00
_apUdpSock[trackIdx] = pSockRtp;
2017-11-08 16:56:02 +08:00
//设置客户端内网端口信息
2018-10-24 15:43:52 +08:00
string strClientPort = FindField(_parser["Transport"].data(), "client_port=", NULL);
2017-04-01 16:35:56 +08:00
uint16_t ui16PeerPort = atoi( FindField(strClientPort.data(), NULL, "-").data());
struct sockaddr_in peerAddr;
peerAddr.sin_family = AF_INET;
peerAddr.sin_port = htons(ui16PeerPort);
2018-02-23 15:36:51 +08:00
peerAddr.sin_addr.s_addr = inet_addr(get_peer_ip().data());
2017-04-01 16:35:56 +08:00
bzero(&(peerAddr.sin_zero), sizeof peerAddr.sin_zero);
2018-10-24 15:43:52 +08:00
_apPeerUdpAddr[trackIdx].reset((struct sockaddr *) (new struct sockaddr_in(peerAddr)));
2017-11-08 16:56:02 +08:00
//尝试获取客户端nat映射地址
startListenPeerUdpData();
2017-04-01 16:35:56 +08:00
//InfoL << "分配端口:" << srv_port;
2018-10-26 22:32:50 +08:00
int n = sprintf(_pcBuf,
"RTSP/1.0 200 OK\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"Transport: RTP/AVP/UDP;unicast;"
"client_port=%s;server_port=%d-%d;ssrc=%s;mode=play\r\n"
"Session: %s\r\n\r\n",
_iCseq, SERVER_NAME,
RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data(), strClientPort.data(),
pSockRtp->get_local_port(), pSockRtcp->get_local_port(),
printSSRC(trackRef->_ssrc).data(),
_strSession.data());
2018-10-24 15:43:52 +08:00
SocketHelper::send(_pcBuf, n);
2017-04-01 16:35:56 +08:00
}
break;
case PlayerBase::RTP_MULTICAST: {
2018-10-24 15:43:52 +08:00
if(!_pBrdcaster){
_pBrdcaster = RtpBroadCaster::get(get_local_ip(),_mediaInfo._vhost, _mediaInfo._app, _mediaInfo._streamid);
if (!_pBrdcaster) {
2017-04-01 16:35:56 +08:00
send_NotAcceptable();
return false;
}
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
2018-10-24 15:43:52 +08:00
_pBrdcaster->setDetachCB(this, [weakSelf]() {
2017-04-01 16:35:56 +08:00
auto strongSelf = weakSelf.lock();
if(!strongSelf) {
return;
}
strongSelf->safeShutdown();
});
}
2018-10-26 10:59:13 +08:00
int iSrvPort = _pBrdcaster->getPort(trackRef->_type);
2017-11-08 16:56:02 +08:00
//我们用trackIdx区分rtp和rtcp包
2018-02-23 15:36:51 +08:00
auto pSockRtcp = UDPServer::Instance().getSock(get_local_ip().data(),2*trackIdx + 1,iSrvPort + 1);
2017-11-08 16:56:02 +08:00
if (!pSockRtcp) {
//分配端口失败
WarnL << "分配rtcp端口失败";
send_NotAcceptable();
return false;
}
startListenPeerUdpData();
2018-02-09 11:42:55 +08:00
GET_CONFIG_AND_REGISTER(uint32_t,udpTTL,MultiCast::kUdpTTL);
2018-10-26 22:32:50 +08:00
int n = sprintf(_pcBuf,
"RTSP/1.0 200 OK\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"Transport: RTP/AVP;multicast;destination=%s;"
"source=%s;port=%d-%d;ttl=%d;ssrc=%s\r\n"
"Session: %s\r\n\r\n",
_iCseq, SERVER_NAME,
RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data(), _pBrdcaster->getIP().data(),
get_local_ip().data(), iSrvPort, pSockRtcp->get_local_port(),
udpTTL, printSSRC(trackRef->_ssrc).data(),
_strSession.data());
2018-10-24 15:43:52 +08:00
SocketHelper::send(_pcBuf, n);
2017-04-01 16:35:56 +08:00
}
break;
default:
break;
}
return true;
}
bool RtspSession::handleReq_Play() {
2018-10-26 22:48:03 +08:00
if (_aTrackInfo.empty() || _parser["Session"] != _strSession) {
2017-04-01 16:35:56 +08:00
send_SessionNotFound();
return false;
}
2018-10-24 15:43:52 +08:00
auto strRange = _parser["Range"];
2018-05-22 18:41:56 +08:00
auto onRes = [this,strRange](const string &err){
2018-02-06 16:17:37 +08:00
bool authSuccess = err.empty();
char response[2 * 1024];
2018-10-24 15:43:52 +08:00
_pcBuf = response;
2018-10-26 22:48:03 +08:00
if(!authSuccess){
//第一次play是播放否则是恢复播放。只对播放鉴权
2018-10-24 15:43:52 +08:00
int n = sprintf(_pcBuf,
"RTSP/1.0 401 Unauthorized\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
2018-02-05 17:25:22 +08:00
"%s"
"Content-Type: text/plain\r\n"
"Content-Length: %d\r\n\r\n%s",
2018-10-24 15:43:52 +08:00
_iCseq, SERVER_NAME,
2018-02-05 17:25:22 +08:00
RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data(),(int)err.size(),err.data());
2018-10-24 15:43:52 +08:00
SocketHelper::send(_pcBuf,n);
shutdown();
return;
}
2017-04-01 16:35:56 +08:00
2018-10-24 15:43:52 +08:00
auto pMediaSrc = _pMediaSrc.lock();
2018-10-26 22:32:50 +08:00
if(!pMediaSrc){
send_StreamNotFound();
shutdown();
return;
}
2018-10-26 22:32:50 +08:00
if (strRange.size() && !_bFirstPlay) {
2018-10-26 22:48:03 +08:00
//这个是seek操作
2018-10-26 22:32:50 +08:00
auto strStart = FindField(strRange.data(), "npt=", "-");
if (strStart == "now") {
strStart = "0";
}
auto iStartTime = 1000 * atof(strStart.data());
InfoL << "rtsp seekTo(ms):" << iStartTime;
pMediaSrc->seekTo(iStartTime);
}else if(pMediaSrc->getRing()->readerCount() == 1){
//第一个消费者
pMediaSrc->seekTo(0);
}
_bFirstPlay = false;
int iLen = sprintf(_pcBuf,
"RTSP/1.0 200 OK\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"Session: %s\r\n"
"Range: npt=%.2f-\r\n"
"RTP-Info: ", _iCseq, SERVER_NAME, RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data(), _strSession.data(), pMediaSrc->getTimeStamp(TrackInvalid) / 1000.0);
2018-10-26 10:12:37 +08:00
for(auto &track : _aTrackInfo){
2018-10-26 10:59:13 +08:00
if (track->_inited == false) {
2018-10-26 10:12:37 +08:00
//还有track没有setup
shutdown();
return;
}
2018-10-26 22:32:50 +08:00
track->_ssrc = pMediaSrc->getSsrc(track->_type);
track->_seq = pMediaSrc->getSeqence(track->_type);
track->_time_stamp = pMediaSrc->getTimeStamp(track->_type);
2018-10-26 14:12:16 +08:00
iLen += sprintf(_pcBuf + iLen, "url=%s/%s;seq=%d;rtptime=%u,",
_strUrl.data(),
track->_control_surffix.data(),
track->_seq,
2018-10-26 22:32:50 +08:00
track->_time_stamp * (track->_samplerate / 1000));
2018-10-26 10:12:37 +08:00
}
iLen -= 1;
2018-10-24 15:43:52 +08:00
(_pcBuf)[iLen] = '\0';
iLen += sprintf(_pcBuf + iLen, "\r\n\r\n");
SocketHelper::send(_pcBuf, iLen);
//提高发送性能
(*this) << SocketFlags(kSockFlags);
2018-10-24 15:43:52 +08:00
SockUtil::setNoDelay(_pSender->rawFD(),false);
2018-10-26 22:48:03 +08:00
if(_pRtpReader){
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
SockUtil::setNoDelay(_pSender->rawFD(), false);
_pRtpReader->setReadCB([weakSelf](const RtpPacket::Ptr &pack) {
auto strongSelf = weakSelf.lock();
if(!strongSelf) {
return;
}
strongSelf->async([weakSelf,pack](){
auto strongSelf = weakSelf.lock();
if(!strongSelf) {
return;
}
strongSelf->sendRtpPacket(pack);
});
});
}
};
2017-04-01 16:35:56 +08:00
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
2018-02-06 16:17:37 +08:00
Broadcast::AuthInvoker invoker = [weakSelf,onRes](const string &err){
auto strongSelf = weakSelf.lock();
if(!strongSelf){
return;
2017-04-01 16:35:56 +08:00
}
2018-02-06 16:17:37 +08:00
strongSelf->async([weakSelf,onRes,err](){
auto strongSelf = weakSelf.lock();
if(!strongSelf){
return;
}
2018-02-06 16:17:37 +08:00
onRes(err);
});
};
2018-10-26 22:48:03 +08:00
if(_bFirstPlay){
//第一次收到play命令需要鉴权
auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed,_mediaInfo,invoker,*this);
if(!flag){
//该事件无人监听,默认不鉴权
onRes("");
}
}else{
//后面是seek或恢复命令不需要鉴权
2018-02-06 16:17:37 +08:00
onRes("");
}
2017-04-01 16:35:56 +08:00
return true;
}
bool RtspSession::handleReq_Pause() {
2018-10-24 15:43:52 +08:00
if (_parser["Session"] != _strSession) {
2017-04-01 16:35:56 +08:00
send_SessionNotFound();
return false;
}
2018-10-26 22:32:50 +08:00
int n = sprintf(_pcBuf,
"RTSP/1.0 200 OK\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"Session: %s\r\n\r\n", _iCseq, SERVER_NAME, RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data(), _strSession.data());
2018-10-24 15:43:52 +08:00
SocketHelper::send(_pcBuf, n);
2018-10-26 22:32:50 +08:00
if (_pRtpReader) {
2018-10-24 15:43:52 +08:00
_pRtpReader->setReadCB(nullptr);
2017-04-01 16:35:56 +08:00
}
return true;
}
bool RtspSession::handleReq_Teardown() {
2018-10-26 22:32:50 +08:00
int n = sprintf(_pcBuf,
"RTSP/1.0 200 OK\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"Session: %s\r\n\r\n", _iCseq, SERVER_NAME, RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data(), _strSession.data());
2018-10-24 15:43:52 +08:00
SocketHelper::send(_pcBuf, n);
2017-04-01 16:35:56 +08:00
TraceL << "播放器断开连接!";
return false;
}
bool RtspSession::handleReq_Get() {
2018-10-24 15:43:52 +08:00
_strSessionCookie = _parser["x-sessioncookie"];
2018-10-26 22:32:50 +08:00
int n = sprintf(_pcBuf,
"HTTP/1.0 200 OK\r\n"
"%s"
"Connection: close\r\n"
"Cache-Control: no-store\r\n"
"Pragma: no-cache\r\n"
"Content-Type: application/x-rtsp-tunnelled\r\n\r\n",
dateHeader().data());
2017-04-01 16:35:56 +08:00
//注册GET
lock_guard<recursive_mutex> lock(g_mtxGetter);
2018-10-24 15:43:52 +08:00
g_mapGetter[_strSessionCookie] = dynamic_pointer_cast<RtspSession>(shared_from_this());
//InfoL << _strSessionCookie;
SocketHelper::send(_pcBuf, n);
2017-04-01 16:35:56 +08:00
return true;
}
bool RtspSession::handleReq_Post() {
lock_guard<recursive_mutex> lock(g_mtxGetter);
2018-10-24 15:43:52 +08:00
string sessioncookie = _parser["x-sessioncookie"];
2017-04-01 16:35:56 +08:00
//Poster 找到 Getter
auto it = g_mapGetter.find(sessioncookie);
if (it == g_mapGetter.end()) {
2018-09-14 18:04:41 +08:00
//WarnL << sessioncookie;
2017-04-01 16:35:56 +08:00
return false;
}
2018-10-24 15:43:52 +08:00
_bBase64need = true;
2017-04-01 16:35:56 +08:00
//Poster 找到Getter的SOCK
auto strongSession = it->second.lock();
g_mapGetter.erase(sessioncookie);
if (!strongSession) {
send_SessionNotFound();
2018-09-14 18:04:41 +08:00
//WarnL;
2017-04-01 16:35:56 +08:00
return false;
}
initSender(strongSession);
return true;
}
bool RtspSession::handleReq_SET_PARAMETER() {
//TraceL<<endl;
2018-10-26 22:32:50 +08:00
int n = sprintf(_pcBuf,
"RTSP/1.0 200 OK\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"Session: %s\r\n\r\n", _iCseq, SERVER_NAME, RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data(), _strSession.data());
2018-10-24 15:43:52 +08:00
SocketHelper::send(_pcBuf, n);
2017-04-01 16:35:56 +08:00
return true;
}
inline void RtspSession::send_NotAcceptable() {
2018-10-26 22:32:50 +08:00
int n = sprintf(_pcBuf,
"RTSP/1.0 406 Not Acceptable\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"Connection: Close\r\n\r\n", _iCseq, SERVER_NAME, RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data());
2018-10-24 15:43:52 +08:00
SocketHelper::send(_pcBuf, n);
2017-04-01 16:35:56 +08:00
}
/**
 * Schedule a task that onManager runs once delaySec seconds have elapsed.
 * A task that is still pending is executed immediately before being replaced.
 *
 * @param delaySec delay in seconds, counted from now
 * @param fun      task to run
 */
void RtspSession::doDelay(int delaySec, const std::function<void()> &fun) {
    const bool hasPending = static_cast<bool>(_delayTask);
    if (hasPending) {
        _delayTask();
    }
    _iTaskTimeLine = time(NULL) + delaySec;
    _delayTask = fun;
}
// Drop the task scheduled with doDelay(), if any, without running it.
void RtspSession::cancelDelyaTask(){
_delayTask = nullptr;
}
/**
 * Look the requested stream up, waiting up to 5 seconds for it to register.
 *
 * If the stream exists the callback runs immediately with true. Otherwise a
 * kBroadcastMediaChanged listener waits for a matching registration and a
 * delayed task reports failure after 5 seconds. Whichever fires first also
 * removes the listener, so it cannot outlive the lookup once either outcome
 * has been delivered.
 *
 * @param cb invoked with the lookup result
 */
void RtspSession::findStream(const function<void(bool)> &cb) {
    bool success = findStream();
    if (success) {
        cb(true);
        return;
    }
    weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
    auto task_id = this;
    auto media_info = _mediaInfo;
    auto onRegist = [task_id, weakSelf, media_info, cb](BroadcastMediaChangedArgs) {
        if (bRegist &&
            schema == media_info._schema &&
            vhost == media_info._vhost &&
            app == media_info._app &&
            stream == media_info._streamid) {
            // the stream the player asked for has finally been registered
            auto strongSelf = weakSelf.lock();
            if (!strongSelf) {
                return;
            }
            // Reply from our own thread. If the thread emitting kBroadcastMediaChanged
            // is the one this session is bound to, async() could run synchronously;
            // passing may_sync = false defers NoticeCenter::delListener so the listener
            // map is not modified while it is being iterated.
            strongSelf->async([task_id, weakSelf, media_info, cb]() {
                auto strongSelf = weakSelf.lock();
                if (!strongSelf) {
                    return;
                }
                DebugL << "收到rtsp注册事件,回复播放器:" << media_info._schema << "/" << media_info._vhost << "/"
                       << media_info._app << "/" << media_info._streamid;
                cb(strongSelf->findStream());
                // cancel the timeout task so the player is not answered twice
                strongSelf->cancelDelyaTask();
                // Stop listening. This must not happen on the emitting thread while
                // the event is being dispatched, hence the deferred execution above.
                NoticeCenter::Instance().delListener(task_id, Broadcast::kBroadcastMediaChanged);
            }, false);
        }
    };
    NoticeCenter::Instance().addListener(task_id, Broadcast::kBroadcastMediaChanged, onRegist);
    // after 5 seconds give up: stop listening and report failure
    doDelay(5, [cb, task_id]() {
        NoticeCenter::Instance().delListener(task_id, Broadcast::kBroadcastMediaChanged);
        cb(false);
    });
}
2017-04-01 16:35:56 +08:00
inline bool RtspSession::findStream() {
RtspMediaSource::Ptr pMediaSrc =
2018-10-24 15:43:52 +08:00
dynamic_pointer_cast<RtspMediaSource>( MediaSource::find(RTSP_SCHEMA,_mediaInfo._vhost, _mediaInfo._app,_mediaInfo._streamid) );
2017-04-01 16:35:56 +08:00
if (!pMediaSrc) {
2018-10-24 15:43:52 +08:00
WarnL << "No such stream:" << _mediaInfo._vhost << " " << _mediaInfo._app << " " << _mediaInfo._streamid;
2017-04-01 16:35:56 +08:00
return false;
}
2018-10-24 15:43:52 +08:00
_strSdp = pMediaSrc->getSdp();
_pWeakRing = pMediaSrc->getRing();
2017-04-01 16:35:56 +08:00
2018-10-26 09:56:29 +08:00
_sdpAttr.load(_strSdp);
_aTrackInfo = _sdpAttr.getAvailableTrack();
2018-10-26 10:12:37 +08:00
if (_aTrackInfo.empty()) {
2017-04-01 16:35:56 +08:00
return false;
}
2018-10-24 15:43:52 +08:00
_strSession = makeRandStr(12);
_pMediaSrc = pMediaSrc;
2017-04-01 16:35:56 +08:00
2018-10-26 10:12:37 +08:00
for(auto &track : _aTrackInfo){
2018-10-26 10:59:13 +08:00
track->_ssrc = pMediaSrc->getSsrc(track->_type);
track->_seq = pMediaSrc->getSeqence(track->_type);
2018-10-26 14:12:16 +08:00
track->_time_stamp = pMediaSrc->getTimeStamp(track->_type);
2017-04-01 16:35:56 +08:00
}
return true;
}
2018-01-30 09:35:54 +08:00
// Forwards one RTP packet to the player using the transport negotiated for
// this session (_rtpType).
//
//  - RTP_TCP: the packet is wrapped in a BufferRtp and sent through send().
//    When RTSP_SEND_RTCP is defined, per-track packet/octet counters are
//    updated and sendRTCP() is triggered once the track's ticker exceeds 5 s.
//  - RTP_UDP: the packet is sent through the track's UDP socket to the peer
//    address recorded for that track. If the socket is gone the session is
//    shut down; if no peer address is known yet the packet is dropped.
//  - any other transport: nothing is sent.
inline void RtspSession::sendRtpPacket(const RtpPacket::Ptr & pkt) {
    switch (_rtpType) {
    case PlayerBase::RTP_TCP: {
        BufferRtp::Ptr buffer(new BufferRtp(pkt));
        send(buffer);
#ifdef RTSP_SEND_RTCP
        // pkt is a smart pointer, so its fields must be reached with '->'.
        int iTrackIndex = getTrackIndexByTrackId(pkt->interleaved / 2);
        RtcpCounter &counter = _aRtcpCnt[iTrackIndex];
        counter.pktCnt += 1;
        // 12 is subtracted from the packet length for the octet count
        // (presumably the fixed RTP header size - TODO confirm).
        counter.octCount += (pkt->length - 12);
        // Named rtcpTicker so it does not shadow the session-level _ticker.
        auto &rtcpTicker = _aRtcpTicker[iTrackIndex];
        if (rtcpTicker.elapsedTime() > 5 * 1000) {
            // send rtcp every 5 second
            rtcpTicker.resetTime();
            counter.timeStamp = pkt->timeStamp;
            sendRTCP();
        }
#endif
    }
        break;
    case PlayerBase::RTP_UDP: {
        int iTrackIndex = getTrackIndexByTrackType(pkt->type);
        if (iTrackIndex < 0) {
            // NOTE(review): assumes a negative index means "no such track";
            // indexing the per-track arrays with it would be out of bounds.
            // Confirm against getTrackIndexByTrackType().
            return;
        }
        auto pSock = _apUdpSock[iTrackIndex].lock();
        if (!pSock) {
            // The UDP socket for this track no longer exists.
            shutdown();
            return;
        }
        auto peerAddr = _apPeerUdpAddr[iTrackIndex];
        if (!peerAddr) {
            // Peer address not known yet; drop the packet.
            return;
        }
        // Second argument 4 is an offset into the packet (presumably skipping
        // the 4-byte interleaved header used for TCP - TODO confirm).
        BufferRtp::Ptr buffer(new BufferRtp(pkt, 4));
        _ui64TotalBytes += buffer->size();
        pSock->send(buffer, kSockFlags, peerAddr.get());
    }
        break;
    default:
        break;
    }
}
2018-02-23 15:36:51 +08:00
// Handles a UDP datagram received from the player.
//
// iTrackIdx: even values are treated as RTP probe packets for track
//            iTrackIdx / 2, every other value as an RTCP heartbeat.
// pBuf:      datagram payload (not inspected here).
// addr:      source address of the datagram.
//
// RTCP heartbeats only refresh the liveness ticker. RTP probes are used,
// until every track has been seen once, to learn the player's real
// (NAT-mapped) UDP address per track.
inline void RtspSession::onRcvPeerUdpData(int iTrackIdx, const Buffer::Ptr &pBuf, const struct sockaddr& addr) {
    if (iTrackIdx % 2 != 0) {
        // RTCP heartbeat: the player is still alive.
        _ticker.resetTime();
        //TraceL << "rtcp:" << (iTrackIdx-1)/2 ;
        return;
    }

    // RTP probe packet.
    if (_bGotAllPeerUdp) {
        // All probe packets were already collected; nothing left to learn.
        return;
    }

    if (SockUtil::in_same_lan(get_local_ip().data(), get_peer_ip().data())) {
        // Inside the same LAN the port reported by the client is the real
        // one, so the UDP hole-punching packet is ignored.
        _bGotAllPeerUdp = true;
        return;
    }

    // Record the client's real NAT-mapped address for this track.
    const int trackSlot = iTrackIdx / 2;
    _apPeerUdpAddr[trackSlot].reset(new struct sockaddr(addr));
    _abGotPeerUdp[trackSlot] = true;

    // The probing phase is finished only once every track has been seen.
    bool gotEveryTrack = true;
    for (unsigned int idx = 0; idx < _aTrackInfo.size(); ++idx) {
        if (!_abGotPeerUdp[idx]) {
            // At least one track has not delivered its probe packet yet.
            gotEveryTrack = false;
            break;
        }
    }
    _bGotAllPeerUdp = gotEveryTrack;
}
2017-11-08 16:56:02 +08:00
inline void RtspSession::startListenPeerUdpData() {
2018-10-24 15:43:52 +08:00
_bListenPeerUdpData = true;
2017-04-01 16:35:56 +08:00
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
2018-02-23 15:36:51 +08:00
UDPServer::Instance().listenPeer(get_peer_ip().data(), this,
[weakSelf](int iTrackIdx,const Buffer::Ptr &pBuf,struct sockaddr *pPeerAddr)->bool {
2017-04-01 16:35:56 +08:00
auto strongSelf=weakSelf.lock();
if(!strongSelf) {
return false;
}
struct sockaddr addr=*pPeerAddr;
strongSelf->async_first([weakSelf,pBuf,addr,iTrackIdx]() {
auto strongSelf=weakSelf.lock();
if(!strongSelf) {
return;
}
strongSelf->onRcvPeerUdpData(iTrackIdx,pBuf,addr);
});
return true;
});
}
inline void RtspSession::initSender(const std::shared_ptr<RtspSession>& session) {
2018-10-24 15:43:52 +08:00
_pSender = session->_sock;
2017-04-01 16:35:56 +08:00
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
2018-10-24 15:43:52 +08:00
session->_onDestory = [weakSelf]() {
2017-04-01 16:35:56 +08:00
auto strongSelf=weakSelf.lock();
2018-09-14 18:04:41 +08:00
if(!strongSelf) {
return;
}
//DebugL;
2018-10-24 15:43:52 +08:00
strongSelf->_pSender->setOnErr([weakSelf](const SockException &err) {
2017-04-01 16:35:56 +08:00
auto strongSelf=weakSelf.lock();
if(!strongSelf) {
return;
}
strongSelf->safeShutdown();
});
};
2018-09-14 18:04:41 +08:00
session->shutdown_l(false);
2017-04-01 16:35:56 +08:00
}
#ifdef RTSP_SEND_RTCP
// Sends, for every track, one 60-byte interleaved RTCP compound packet:
//   bytes  0..3   '$', channel (trackId * 2 + 1), 16-bit payload length (56)
//   bytes  4..31  Sender Report (PT 0xC8): ssrc, NTP timestamp, RTP
//                 timestamp, packet count, octet count
//   bytes 32..59  SDES (PT 0xCA) with a CNAME item "_ZL_RtspServer_"
//
// NOTE(review): this block is only compiled when RTSP_SEND_RTCP is defined
// and still uses _uiTrackCnt, track.trackId and track.ssrc, whereas the rest
// of the file uses _aTrackInfo.size() and track->_ssrc. These names look
// stale and must be checked against RtspSession.h before enabling the macro.
inline void RtspSession::sendRTCP() {
    //DebugL;
    uint8_t aui8Rtcp[60] = {0};
    uint8_t *pui8Rtcp_SR = aui8Rtcp + 4, *pui8Rtcp_SDES = pui8Rtcp_SR + 28;
    for (uint8_t i = 0; i < _uiTrackCnt; i++) {
        auto &track = _aTrackInfo[i];
        auto &counter = _aRtcpCnt[i];

        // Interleaved framing header.
        aui8Rtcp[0] = '$';
        aui8Rtcp[1] = track.trackId * 2 + 1;
        aui8Rtcp[2] = 56 / 256;
        aui8Rtcp[3] = 56 % 256;

        // Sender Report header: version 2, PT = 200 (SR), length = 6 words.
        pui8Rtcp_SR[0] = 0x80;
        pui8Rtcp_SR[1] = 0xC8;
        pui8Rtcp_SR[2] = 0x00;
        pui8Rtcp_SR[3] = 0x06;

        uint32_t ssrc = htonl(track.ssrc);
        memcpy(&pui8Rtcp_SR[4], &ssrc, 4);

        // NTP timestamp. Both halves are kept in 32-bit variables: the
        // 4-byte memcpy below must copy exactly the value produced by htonl.
        // Copying the first 4 bytes of a 64-bit variable only works on
        // little-endian hosts.
        struct timeval tv;
        gettimeofday(&tv, NULL);
        /* 0x83AA7E80 is the number of seconds from 1900 to 1970 */
        uint32_t msw = htonl(static_cast<uint32_t>(tv.tv_sec + 0x83AA7E80));
        uint32_t lsw = htonl(static_cast<uint32_t>((double) tv.tv_usec * (double) (((uint64_t) 1) << 32) * 1.0e-6));
        memcpy(&pui8Rtcp_SR[8], &msw, 4);
        memcpy(&pui8Rtcp_SR[12], &lsw, 4);

        uint32_t rtpStamp = htonl(counter.timeStamp);
        memcpy(&pui8Rtcp_SR[16], &rtpStamp, 4);
        uint32_t pktCnt = htonl(counter.pktCnt);
        memcpy(&pui8Rtcp_SR[20], &pktCnt, 4);
        uint32_t octCount = htonl(counter.octCount);
        memcpy(&pui8Rtcp_SR[24], &octCount, 4);

        // SDES header: version 2, one chunk, PT = 202 (SDES), length = 6 words.
        pui8Rtcp_SDES[0] = 0x81;
        pui8Rtcp_SDES[1] = 0xCA;
        pui8Rtcp_SDES[2] = 0x00;
        pui8Rtcp_SDES[3] = 0x06;
        memcpy(&pui8Rtcp_SDES[4], &ssrc, 4);
        // CNAME item (type 1), 15 bytes of text, then the terminating zero.
        pui8Rtcp_SDES[8] = 0x01;
        pui8Rtcp_SDES[9] = 0x0f;
        memcpy(&pui8Rtcp_SDES[10], "_ZL_RtspServer_", 15);
        pui8Rtcp_SDES[25] = 0x00;

        send((char *) aui8Rtcp, 60);
    }
}
#endif
}
2018-10-24 17:17:55 +08:00
/* namespace mediakit */
2017-04-01 16:35:56 +08:00