Merge pull request #5 from xiongziliang/master

update from origin
This commit is contained in:
lyg1949 2020-07-14 17:29:26 +08:00 committed by GitHub
commit 2d89322b8e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 1491 additions and 393 deletions

@ -1 +1 @@
Subproject commit ed47015f92cc79dfe3344b3666aafb54f1bbc2f4
Subproject commit 60aab3b9f6230312c882b9d3d360ed27c94ebd9f

@ -1 +1 @@
Subproject commit 7f7906b05d84c5efeceecb8d6f540a71c8153431
Subproject commit 43facc343afc2b5b70bbbc3c177f20dfa936f2bf

View File

@ -0,0 +1,24 @@
{
"id": "95afe791-f716-426e-99c4-a797e112ab2c",
"name": "127.0.0.1",
"values": [
{
"key": "ZLMediaKit_URL",
"value": "127.0.0.1",
"enabled": true
},
{
"key": "ZLMediaKit_secret",
"value": "035c73f7-bb6b-4889-a715-d9eb2d1925cc",
"enabled": true
},
{
"key": "defaultVhost",
"value": "__defaultVhost__",
"enabled": true
}
],
"_postman_variable_scope": "environment",
"_postman_exported_at": "2020-07-11T15:16:04.479Z",
"_postman_exported_using": "Postman/7.27.1"
}

File diff suppressed because it is too large Load Diff

1
postman/readme.md Normal file
View File

@ -0,0 +1 @@
把这两个json文件导入postman就可以愉快的测试ZLMediaKit的restful接口了

View File

@ -257,7 +257,7 @@ static recursive_mutex s_ffmpegMapMtx;
#if defined(ENABLE_RTPPROXY)
//rtp服务器列表
static unordered_map<uint16_t, RtpServer::Ptr> s_rtpServerMap;
static unordered_map<string, RtpServer::Ptr> s_rtpServerMap;
static recursive_mutex s_rtpServerMapMtx;
#endif
@ -753,36 +753,42 @@ void installWebApi() {
val["exist"] = true;
val["peer_ip"] = process->get_peer_ip();
val["peer_port"] = process->get_peer_port();
val["local_port"] = process->get_local_port();
val["local_ip"] = process->get_local_ip();
});
api_regist1("/index/api/openRtpServer",[](API_ARGS1){
CHECK_SECRET();
CHECK_ARGS("port", "enable_tcp", "stream_id");
RtpServer::Ptr server = std::make_shared<RtpServer>();
server->start(allArgs["port"], allArgs["stream_id"], allArgs["enable_tcp"].as<bool>());
auto stream_id = allArgs["stream_id"];
auto port = server->getPort();
server->setOnDetach([port]() {
lock_guard<recursive_mutex> lck(s_rtpServerMapMtx);
if(s_rtpServerMap.find(stream_id) != s_rtpServerMap.end()) {
//为了防止RtpProcess所有权限混乱的问题不允许重复添加相同的stream_id
throw InvalidArgsException("该stream_id已存在");
}
RtpServer::Ptr server = std::make_shared<RtpServer>();
server->start(allArgs["port"], stream_id, allArgs["enable_tcp"].as<bool>());
server->setOnDetach([stream_id]() {
//设置rtp超时移除事件
lock_guard<recursive_mutex> lck(s_rtpServerMapMtx);
s_rtpServerMap.erase(port);
s_rtpServerMap.erase(stream_id);
});
//保存对象
lock_guard<recursive_mutex> lck(s_rtpServerMapMtx);
s_rtpServerMap.emplace(port, server);
s_rtpServerMap.emplace(stream_id, server);
//回复json
val["port"] = port;
val["port"] = server->getPort();
});
api_regist1("/index/api/closeRtpServer",[](API_ARGS1){
CHECK_SECRET();
CHECK_ARGS("port");
CHECK_ARGS("stream_id");
lock_guard<recursive_mutex> lck(s_rtpServerMapMtx);
val["hit"] = (int)s_rtpServerMap.erase(allArgs["port"].as<uint16_t>());
val["hit"] = (int) s_rtpServerMap.erase(allArgs["stream_id"]);
});
api_regist1("/index/api/listRtpServer",[](API_ARGS1){
@ -790,7 +796,10 @@ void installWebApi() {
lock_guard<recursive_mutex> lck(s_rtpServerMapMtx);
for (auto &pr : s_rtpServerMap) {
val["data"].append(pr.first);
Value obj;
obj["stream_id"] = pr.first;
obj["port"] = pr.second->getPort();
val["data"].append(obj);
}
});

View File

@ -118,7 +118,7 @@ RtpCodec::Ptr Factory::getRtpEncoderBySdp(const Sdp::Ptr &sdp) {
case CodecH265 : return std::make_shared<H265RtpEncoder>(ssrc,mtu,sample_rate,pt,interleaved);
case CodecAAC : return std::make_shared<AACRtpEncoder>(ssrc,mtu,sample_rate,pt,interleaved);
case CodecG711A :
case CodecG711U : return std::make_shared<G711RtpEncoder>(ssrc, mtu, sample_rate, pt, interleaved);
case CodecG711U : return std::make_shared<G711RtpEncoder>(codec_id, ssrc, mtu, sample_rate, pt, interleaved);
default : WarnL << "暂不支持该CodecId:" << codec_id; return nullptr;
}
}
@ -129,7 +129,7 @@ RtpCodec::Ptr Factory::getRtpDecoderByTrack(const Track::Ptr &track) {
case CodecH265 : return std::make_shared<H265RtpDecoder>();
case CodecAAC : return std::make_shared<AACRtpDecoder>(track->clone());
case CodecG711A :
case CodecG711U : return std::make_shared<G711RtpDecoder>(track->clone());
case CodecG711U : return std::make_shared<G711RtpDecoder>(track->getCodecId());
default : WarnL << "暂不支持该CodecId:" << track->getCodecName(); return nullptr;
}
}

View File

@ -12,8 +12,8 @@
namespace mediakit{
G711RtpDecoder::G711RtpDecoder(const Track::Ptr &track){
_codecid = track->getCodecId();
G711RtpDecoder::G711RtpDecoder(CodecId codecid){
_codecid = codecid;
_frame = obtainFrame();
}
@ -59,16 +59,10 @@ void G711RtpDecoder::onGetG711(const G711Frame::Ptr &frame) {
/////////////////////////////////////////////////////////////////////////////////////
G711RtpEncoder::G711RtpEncoder(uint32_t ui32Ssrc,
uint32_t ui32MtuSize,
uint32_t ui32SampleRate,
uint8_t ui8PayloadType,
uint8_t ui8Interleaved) :
RtpInfo(ui32Ssrc,
ui32MtuSize,
ui32SampleRate,
ui8PayloadType,
ui8Interleaved) {
G711RtpEncoder::G711RtpEncoder(CodecId codecid, uint32_t ui32Ssrc, uint32_t ui32MtuSize,
uint32_t ui32SampleRate, uint8_t ui8PayloadType, uint8_t ui8Interleaved) :
G711RtpDecoder(codecid),
RtpInfo(ui32Ssrc, ui32MtuSize, ui32SampleRate, ui8PayloadType, ui8Interleaved) {
}
void G711RtpEncoder::inputFrame(const Frame::Ptr &frame) {

View File

@ -21,7 +21,7 @@ class G711RtpDecoder : public RtpCodec , public ResourcePoolHelper<G711Frame> {
public:
typedef std::shared_ptr<G711RtpDecoder> Ptr;
G711RtpDecoder(const Track::Ptr &track);
G711RtpDecoder(CodecId codecid);
~G711RtpDecoder() {}
/**
@ -35,9 +35,6 @@ public:
return _codecid;
}
protected:
G711RtpDecoder() {}
private:
void onGetG711(const G711Frame::Ptr &frame);
G711Frame::Ptr obtainFrame();
@ -61,7 +58,8 @@ public:
* @param ui8PayloadType pt类型
* @param ui8Interleaved rtsp interleaved
*/
G711RtpEncoder(uint32_t ui32Ssrc,
G711RtpEncoder(CodecId codecid,
uint32_t ui32Ssrc,
uint32_t ui32MtuSize,
uint32_t ui32SampleRate,
uint8_t ui8PayloadType = 0,

View File

@ -80,7 +80,9 @@ void RtpSelector::delProcess(const string &stream_id,const RtpProcess *ptr) {
if (it->second->getProcess().get() != ptr) {
return;
}
auto process = it->second->getProcess();
_map_rtp_process.erase(it);
process->onDetach();
}
void RtpSelector::onManager() {

View File

@ -56,12 +56,12 @@ static unordered_map<string, weak_ptr<RtspSession> > g_mapGetter;
//对g_mapGetter上锁保护
static recursive_mutex g_mtxGetter;
RtspSession::RtspSession(const Socket::Ptr &pSock) : TcpSession(pSock) {
RtspSession::RtspSession(const Socket::Ptr &sock) : TcpSession(sock) {
DebugP(this);
GET_CONFIG(uint32_t,keep_alive_sec,Rtsp::kKeepAliveSecond);
pSock->setSendTimeOutSecond(keep_alive_sec);
sock->setSendTimeOutSecond(keep_alive_sec);
//起始接收buffer缓存设置为4K节省内存
pSock->setReadBuffer(std::make_shared<BufferRaw>(4 * 1024));
sock->setReadBuffer(std::make_shared<BufferRaw>(4 * 1024));
}
RtspSession::~RtspSession() {
@ -69,16 +69,16 @@ RtspSession::~RtspSession() {
}
void RtspSession::onError(const SockException &err) {
bool isPlayer = !_pushSrc;
uint64_t duration = _ticker.createdTime()/1000;
bool isPlayer = !_push_src;
uint64_t duration = _alive_ticker.createdTime() / 1000;
WarnP(this) << (isPlayer ? "RTSP播放器(" : "RTSP推流器(")
<< _mediaInfo._vhost << "/"
<< _mediaInfo._app << "/"
<< _mediaInfo._streamid
<< _media_info._vhost << "/"
<< _media_info._app << "/"
<< _media_info._streamid
<< ")断开:" << err.what()
<< ",耗时(s):" << duration;
if (_rtpType == Rtsp::RTP_MULTICAST) {
if (_rtp_type == Rtsp::RTP_MULTICAST) {
//取消UDP端口监听
UDPServer::Instance().stopListenPeer(get_peer_ip().data(), this);
}
@ -91,8 +91,8 @@ void RtspSession::onError(const SockException& err) {
//流量统计事件广播
GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold);
if(_ui64TotalBytes > iFlowThreshold * 1024){
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _mediaInfo, _ui64TotalBytes, duration, isPlayer, static_cast<SockInfo &>(*this));
if(_bytes_usage > iFlowThreshold * 1024){
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _bytes_usage, duration, isPlayer, static_cast<SockInfo &>(*this));
}
}
@ -101,30 +101,28 @@ void RtspSession::onManager() {
GET_CONFIG(uint32_t,handshake_sec,Rtsp::kHandshakeSecond);
GET_CONFIG(uint32_t,keep_alive_sec,Rtsp::kKeepAliveSecond);
if (_ticker.createdTime() > handshake_sec * 1000) {
if (_strSession.size() == 0) {
if (_alive_ticker.createdTime() > handshake_sec * 1000) {
if (_sessionid.size() == 0) {
shutdown(SockException(Err_timeout,"illegal connection"));
return;
}
}
if ((_rtpType == Rtsp::RTP_UDP || _pushSrc ) && _ticker.elapsedTime() > keep_alive_sec * 1000) {
if ((_rtp_type == Rtsp::RTP_UDP || _push_src ) && _alive_ticker.elapsedTime() > keep_alive_sec * 1000) {
//如果是推流端或者rtp over udp类型的播放端那么就做超时检测
shutdown(SockException(Err_timeout,"rtp over udp session timeouted"));
return;
}
}
void RtspSession::onRecv(const Buffer::Ptr &pBuf) {
_ticker.resetTime();
_ui64TotalBytes += pBuf->size();
if (_onRecv) {
void RtspSession::onRecv(const Buffer::Ptr &buf) {
_alive_ticker.resetTime();
_bytes_usage += buf->size();
if (_on_recv) {
//http poster的请求数据转发给http getter处理
_onRecv(pBuf);
_on_recv(buf);
} else {
// TraceP(this) << pBuf->size() << "\r\n" << pBuf->data();
input(pBuf->data(),pBuf->size());
input(buf->data(), buf->size());
}
}
@ -132,15 +130,15 @@ void RtspSession::onRecv(const Buffer::Ptr &pBuf) {
static inline bool end_of(const string &str, const string &substr){
auto pos = str.rfind(substr);
return pos != string::npos && pos == str.size() - substr.size();
};
}
void RtspSession::onWholeRtspPacket(Parser &parser) {
string strCmd = parser.Method(); //提取出请求命令字
_iCseq = atoi(parser["CSeq"].data());
if(_strContentBase.empty() && strCmd != "GET"){
_strContentBase = parser.Url();
_mediaInfo.parse(parser.FullUrl());
_mediaInfo._schema = RTSP_SCHEMA;
string method = parser.Method(); //提取出请求命令字
_cseq = atoi(parser["CSeq"].data());
if(_content_base.empty() && method != "GET"){
_content_base = parser.Url();
_media_info.parse(parser.FullUrl());
_media_info._schema = RTSP_SCHEMA;
}
typedef void (RtspSession::*rtsp_request_handler)(const Parser &parser);
@ -160,10 +158,10 @@ void RtspSession::onWholeRtspPacket(Parser &parser) {
s_cmd_functions.emplace("GET_PARAMETER",&RtspSession::handleReq_SET_PARAMETER);
}, []() {});
auto it = s_cmd_functions.find(strCmd);
auto it = s_cmd_functions.find(method);
if (it == s_cmd_functions.end()) {
sendRtspResponse("403 Forbidden");
shutdown(SockException(Err_shutdown,StrPrinter << "403 Forbidden:" << strCmd));
shutdown(SockException(Err_shutdown,StrPrinter << "403 Forbidden:" << method));
return;
}
@ -181,24 +179,22 @@ void RtspSession::onWholeRtspPacket(Parser &parser) {
}
void RtspSession::onRtpPacket(const char *data, uint64_t len) {
if(!_pushSrc){
if(!_push_src){
return;
}
int trackIdx = -1;
uint8_t interleaved = data[1];
if(interleaved %2 == 0){
trackIdx = getTrackIndexByInterleaved(interleaved);
handleOneRtp(trackIdx, _aTrackInfo[trackIdx]->_type, _aTrackInfo[trackIdx]->_samplerate, (unsigned char *) data + 4, len - 4);
auto track_idx = getTrackIndexByInterleaved(interleaved);
handleOneRtp(track_idx, _sdp_track[track_idx]->_type, _sdp_track[track_idx]->_samplerate, (unsigned char *) data + 4, len - 4);
}else{
trackIdx = getTrackIndexByInterleaved(interleaved - 1);
onRtcpPacket(trackIdx, _aTrackInfo[trackIdx], (unsigned char *) data + 4, len - 4);
auto track_idx = getTrackIndexByInterleaved(interleaved - 1);
onRtcpPacket(track_idx, _sdp_track[track_idx], (unsigned char *) data + 4, len - 4);
}
}
void RtspSession::onRtcpPacket(int iTrackidx, SdpTrack::Ptr &track, unsigned char *pucData, unsigned int uiLen){
void RtspSession::onRtcpPacket(int track_idx, SdpTrack::Ptr &track, unsigned char *data, unsigned int len){}
}
int64_t RtspSession::getContentLength(Parser &parser) {
if(parser.Method() == "POST"){
//http post请求的content数据部分是base64编码后的rtsp请求信令包
@ -207,7 +203,6 @@ int64_t RtspSession::getContentLength(Parser &parser) {
return RtspSplitter::getContentLength(parser);
}
void RtspSession::handleReq_Options(const Parser &parser) {
//支持这些命令
sendRtspResponse("200 OK",{"Public" , "OPTIONS, DESCRIBE, SETUP, TEARDOWN, PLAY, PAUSE, ANNOUNCE, RECORD, SET_PARAMETER, GET_PARAMETER"});
@ -215,16 +210,16 @@ void RtspSession::handleReq_Options(const Parser &parser) {
void RtspSession::handleReq_ANNOUNCE(const Parser &parser) {
auto src = dynamic_pointer_cast<RtspMediaSource>(MediaSource::find(RTSP_SCHEMA,
_mediaInfo._vhost,
_mediaInfo._app,
_mediaInfo._streamid));
_media_info._vhost,
_media_info._app,
_media_info._streamid));
if(src){
sendRtspResponse("406 Not Acceptable", {"Content-Type", "text/plain"}, "Already publishing.");
string err = StrPrinter << "ANNOUNCE:"
<< "Already publishing:"
<< _mediaInfo._vhost << " "
<< _mediaInfo._app << " "
<< _mediaInfo._streamid << endl;
<< _media_info._vhost << " "
<< _media_info._app << " "
<< _media_info._streamid << endl;
throw SockException(Err_shutdown,err);
}
@ -232,30 +227,30 @@ void RtspSession::handleReq_ANNOUNCE(const Parser &parser) {
if(end_of(full_url,".sdp")){
//去除.sdp后缀防止EasyDarwin推流器强制添加.sdp后缀
full_url = full_url.substr(0,full_url.length() - 4);
_mediaInfo.parse(full_url);
_media_info.parse(full_url);
}
if(_mediaInfo._app.empty() || _mediaInfo._streamid.empty()){
if(_media_info._app.empty() || _media_info._streamid.empty()){
//推流rtsp url必须最少两级(rtsp://host/app/stream_id)不允许莫名其妙的推流url
sendRtspResponse("403 Forbidden", {"Content-Type", "text/plain"}, "rtsp推流url非法,最少确保两级rtsp url");
throw SockException(Err_shutdown,StrPrinter << "rtsp推流url非法:" << full_url);
}
SdpParser sdpParser(parser.Content());
_strSession = makeRandStr(12);
_aTrackInfo = sdpParser.getAvailableTrack();
_sessionid = makeRandStr(12);
_sdp_track = sdpParser.getAvailableTrack();
_pushSrc = std::make_shared<RtspMediaSourceImp>(_mediaInfo._vhost,_mediaInfo._app,_mediaInfo._streamid);
_pushSrc->setListener(dynamic_pointer_cast<MediaSourceEvent>(shared_from_this()));
_pushSrc->setSdp(sdpParser.toString());
_push_src = std::make_shared<RtspMediaSourceImp>(_media_info._vhost, _media_info._app, _media_info._streamid);
_push_src->setListener(dynamic_pointer_cast<MediaSourceEvent>(shared_from_this()));
_push_src->setSdp(sdpParser.toString());
sendRtspResponse("200 OK",{"Content-Base",_strContentBase + "/"});
sendRtspResponse("200 OK",{"Content-Base", _content_base + "/"});
}
void RtspSession::handleReq_RECORD(const Parser &parser){
if (_aTrackInfo.empty() || parser["Session"] != _strSession) {
if (_sdp_track.empty() || parser["Session"] != _sessionid) {
send_SessionNotFound();
throw SockException(Err_shutdown,_aTrackInfo.empty() ? "can not find any availabe track when record" : "session not found when record");
throw SockException(Err_shutdown, _sdp_track.empty() ? "can not find any availabe track when record" : "session not found when record");
}
auto onRes = [this](const string &err,bool enableRtxp,bool enableHls,bool enableMP4){
bool authSuccess = err.empty();
@ -266,21 +261,21 @@ void RtspSession::handleReq_RECORD(const Parser &parser){
}
//设置转协议
_pushSrc->setProtocolTranslation(enableRtxp,enableHls,enableMP4);
_push_src->setProtocolTranslation(enableRtxp, enableHls, enableMP4);
_StrPrinter rtp_info;
for(auto &track : _aTrackInfo){
for(auto &track : _sdp_track){
if (track->_inited == false) {
//还有track没有setup
shutdown(SockException(Err_shutdown,"track not setuped"));
return;
}
rtp_info << "url=" << _strContentBase << "/" << track->_control_surffix << ",";
rtp_info << "url=" << _content_base << "/" << track->_control_surffix << ",";
}
rtp_info.pop_back();
sendRtspResponse("200 OK", {"RTP-Info",rtp_info});
if(_rtpType == Rtsp::RTP_TCP){
if(_rtp_type == Rtsp::RTP_TCP){
//如果是rtsp推流服务器并且是TCP推流那么加大TCP接收缓存这样能提升接收性能
_sock->setReadBuffer(std::make_shared<BufferRaw>(256 * 1024));
setSocketFlags();
@ -303,7 +298,7 @@ void RtspSession::handleReq_RECORD(const Parser &parser){
};
//rtsp推流需要鉴权
auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish,_mediaInfo,invoker,static_cast<SockInfo &>(*this));
auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish, _media_info, invoker, static_cast<SockInfo &>(*this));
if(!flag){
//该事件无人监听,默认不鉴权
GET_CONFIG(bool,toRtxp,General::kPublishToRtxp);
@ -341,7 +336,7 @@ void RtspSession::emitOnPlay(){
};
//广播通用播放url鉴权事件
auto flag = _emit_on_play ? false : NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed, _mediaInfo, invoker, static_cast<SockInfo &>(*this));
auto flag = _emit_on_play ? false : NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed, _media_info, invoker, static_cast<SockInfo &>(*this));
if (!flag) {
//该事件无人监听,默认不鉴权
onRes("");
@ -381,7 +376,7 @@ void RtspSession::handleReq_Describe(const Parser &parser) {
if(_rtsp_realm.empty()){
//广播是否需要rtsp专属认证事件
if (!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnGetRtspRealm, _mediaInfo, invoker, static_cast<SockInfo &>(*this))) {
if (!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnGetRtspRealm, _media_info, invoker, static_cast<SockInfo &>(*this))) {
//无人监听此事件,说明无需认证
invoker("");
}
@ -389,10 +384,11 @@ void RtspSession::handleReq_Describe(const Parser &parser) {
invoker(_rtsp_realm);
}
}
void RtspSession::onAuthSuccess() {
TraceP(this);
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
MediaSource::findAsync(_mediaInfo,weakSelf.lock(),[weakSelf](const MediaSource::Ptr &src){
MediaSource::findAsync(_media_info, weakSelf.lock(), [weakSelf](const MediaSource::Ptr &src){
auto strongSelf = weakSelf.lock();
if(!strongSelf){
return;
@ -400,43 +396,44 @@ void RtspSession::onAuthSuccess() {
auto rtsp_src = dynamic_pointer_cast<RtspMediaSource>(src);
if (!rtsp_src) {
//未找到相应的MediaSource
string err = StrPrinter << "no such stream:" << strongSelf->_mediaInfo._vhost << " " << strongSelf->_mediaInfo._app << " " << strongSelf->_mediaInfo._streamid;
string err = StrPrinter << "no such stream:" << strongSelf->_media_info._vhost << " " << strongSelf->_media_info._app << " " << strongSelf->_media_info._streamid;
strongSelf->send_StreamNotFound();
strongSelf->shutdown(SockException(Err_shutdown,err));
return;
}
//找到了相应的rtsp流
strongSelf->_aTrackInfo = SdpParser(rtsp_src->getSdp()).getAvailableTrack();
if (strongSelf->_aTrackInfo.empty()) {
strongSelf->_sdp_track = SdpParser(rtsp_src->getSdp()).getAvailableTrack();
if (strongSelf->_sdp_track.empty()) {
//该流无效
DebugL << "无trackInfo该流无效";
strongSelf->send_StreamNotFound();
strongSelf->shutdown(SockException(Err_shutdown,"can not find any available track in sdp"));
return;
}
strongSelf->_strSession = makeRandStr(12);
strongSelf->_pMediaSrc = rtsp_src;
for(auto &track : strongSelf->_aTrackInfo){
strongSelf->_sessionid = makeRandStr(12);
strongSelf->_play_src = rtsp_src;
for(auto &track : strongSelf->_sdp_track){
track->_ssrc = rtsp_src->getSsrc(track->_type);
track->_seq = rtsp_src->getSeqence(track->_type);
track->_time_stamp = rtsp_src->getTimeStamp(track->_type);
}
strongSelf->sendRtspResponse("200 OK",
{"Content-Base",strongSelf->_strContentBase + "/",
{"Content-Base", strongSelf->_content_base + "/",
"x-Accept-Retransmit","our-retransmit",
"x-Accept-Dynamic-Rate","1"
},rtsp_src->getSdp());
});
}
void RtspSession::onAuthFailed(const string &realm,const string &why,bool close) {
GET_CONFIG(bool,authBasic,Rtsp::kAuthBasic);
if (!authBasic) {
//我们需要客户端优先以md5方式认证
_strNonce = makeRandStr(32);
_auth_nonce = makeRandStr(32);
sendRtspResponse("401 Unauthorized",
{"WWW-Authenticate",
StrPrinter << "Digest realm=\"" << realm << "\",nonce=\"" << _strNonce << "\"" });
StrPrinter << "Digest realm=\"" << realm << "\",nonce=\"" << _auth_nonce << "\"" });
}else {
//当然我们也支持base64认证,但是我们不建议这样做
sendRtspResponse("401 Unauthorized",
@ -448,10 +445,10 @@ void RtspSession::onAuthFailed(const string &realm,const string &why,bool close)
}
}
void RtspSession::onAuthBasic(const string &realm,const string &strBase64){
void RtspSession::onAuthBasic(const string &realm,const string &auth_base64){
//base64认证
char user_pwd_buf[512];
av_base64_decode((uint8_t *)user_pwd_buf,strBase64.data(),strBase64.size());
av_base64_decode((uint8_t *) user_pwd_buf, auth_base64.data(), auth_base64.size());
auto user_pwd_vec = split(user_pwd_buf, ":");
if (user_pwd_vec.size() < 2) {
//认证信息格式不合法回复401 Unauthorized
@ -486,7 +483,7 @@ void RtspSession::onAuthBasic(const string &realm,const string &strBase64){
};
//此时必须提供明文密码
if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth,_mediaInfo,realm,user, true,invoker,static_cast<SockInfo &>(*this))){
if (!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth, _media_info, realm, user, true, invoker, static_cast<SockInfo &>(*this))) {
//表明该流需要认证却没监听请求密码事件,这一般是大意的程序所为,警告之
WarnP(this) << "请监听kBroadcastOnRtspAuth事件";
//但是我们还是忽略认证以便完成播放
@ -495,9 +492,9 @@ void RtspSession::onAuthBasic(const string &realm,const string &strBase64){
}
}
void RtspSession::onAuthDigest(const string &realm,const string &strMd5){
DebugP(this) << strMd5;
auto mapTmp = Parser::parseArgs(strMd5,",","=");
void RtspSession::onAuthDigest(const string &realm,const string &auth_md5){
DebugP(this) << auth_md5;
auto mapTmp = Parser::parseArgs(auth_md5, ",", "=");
decltype(mapTmp) map;
for(auto &pr : mapTmp){
map[trim(string(pr.first)," \"")] = trim(pr.second," \"");
@ -509,8 +506,8 @@ void RtspSession::onAuthDigest(const string &realm,const string &strMd5){
}
//check nonce
auto nonce = map["nonce"];
if(_strNonce != nonce){
onAuthFailed(realm,StrPrinter << "nonce not mached:" << nonce << " != " << _strNonce);
if(_auth_nonce != nonce){
onAuthFailed(realm,StrPrinter << "nonce not mached:" << nonce << " != " << _auth_nonce);
return ;
}
//check username and uri
@ -570,7 +567,7 @@ void RtspSession::onAuthDigest(const string &realm,const string &strMd5){
};
//此时可以提供明文或md5加密的密码
if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth,_mediaInfo,realm,username, false,invoker,static_cast<SockInfo &>(*this))){
if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth, _media_info, realm, username, false, invoker, static_cast<SockInfo &>(*this))){
//表明该流需要认证却没监听请求密码事件,这一般是大意的程序所为,警告之
WarnP(this) << "请监听kBroadcastOnRtspAuth事件";
//但是我们还是忽略认证以便完成播放
@ -602,9 +599,11 @@ void RtspSession::onAuthUser(const string &realm,const string &authorization){
onAuthFailed(realm,StrPrinter << "unsupported auth type:" << authType);
}
}
inline void RtspSession::send_StreamNotFound() {
sendRtspResponse("404 Stream Not Found",{"Connection","Close"});
}
inline void RtspSession::send_UnsupportedTransport() {
sendRtspResponse("461 Unsupported Transport",{"Connection","Close"});
}
@ -615,35 +614,35 @@ inline void RtspSession::send_SessionNotFound() {
void RtspSession::handleReq_Setup(const Parser &parser) {
//处理setup命令该函数可能进入多次
auto controlSuffix = split(parser.FullUrl(),"/").back();// parser.FullUrl().substr(_strContentBase.size());
auto controlSuffix = split(parser.FullUrl(),"/").back();
if(controlSuffix.front() == '/'){
controlSuffix = controlSuffix.substr(1);
}
int trackIdx = getTrackIndexByControlSuffix(controlSuffix);
SdpTrack::Ptr &trackRef = _aTrackInfo[trackIdx];
SdpTrack::Ptr &trackRef = _sdp_track[trackIdx];
if (trackRef->_inited) {
//已经初始化过该Track
throw SockException(Err_shutdown, "can not setup one track twice");
}
trackRef->_inited = true; //现在初始化
if(_rtpType == Rtsp::RTP_Invalid){
if(_rtp_type == Rtsp::RTP_Invalid){
auto &strTransport = parser["Transport"];
if(strTransport.find("TCP") != string::npos){
_rtpType = Rtsp::RTP_TCP;
_rtp_type = Rtsp::RTP_TCP;
}else if(strTransport.find("multicast") != string::npos){
_rtpType = Rtsp::RTP_MULTICAST;
_rtp_type = Rtsp::RTP_MULTICAST;
}else{
_rtpType = Rtsp::RTP_UDP;
_rtp_type = Rtsp::RTP_UDP;
}
}
//允许接收rtp、rtcp包
RtspSplitter::enableRecvRtp(_rtpType == Rtsp::RTP_TCP);
RtspSplitter::enableRecvRtp(_rtp_type == Rtsp::RTP_TCP);
switch (_rtpType) {
switch (_rtp_type) {
case Rtsp::RTP_TCP: {
if(_pushSrc){
if(_push_src){
//rtsp推流时interleaved由推流者决定
auto key_values = Parser::parseArgs(parser["Transport"],";","=");
int interleaved_rtp = -1 , interleaved_rtcp = -1;
@ -658,13 +657,15 @@ void RtspSession::handleReq_Setup(const Parser &parser) {
}
sendRtspResponse("200 OK",
{"Transport", StrPrinter << "RTP/AVP/TCP;unicast;"
<< "interleaved=" << (int)trackRef->_interleaved << "-" << (int)trackRef->_interleaved + 1 << ";"
<< "interleaved=" << (int) trackRef->_interleaved << "-"
<< (int) trackRef->_interleaved + 1 << ";"
<< "ssrc=" << printSSRC(trackRef->_ssrc),
"x-Transport-Options", "late-tolerance=1.400000",
"x-Dynamic-Rate", "1"
});
}
break;
case Rtsp::RTP_UDP: {
std::pair<Socket::Ptr, Socket::Ptr> pr;
try{
@ -675,8 +676,8 @@ void RtspSession::handleReq_Setup(const Parser &parser) {
throw SockException(Err_shutdown, ex.what());
}
_apRtpSock[trackIdx] = pr.first;
_apRtcpSock[trackIdx] = pr.second;
_rtp_socks[trackIdx] = pr.first;
_rtcp_socks[trackIdx] = pr.second;
//设置客户端内网端口信息
string strClientPort = FindField(parser["Transport"].data(), "client_port=", NULL);
@ -705,14 +706,15 @@ void RtspSession::handleReq_Setup(const Parser &parser) {
sendRtspResponse("200 OK",
{"Transport", StrPrinter << "RTP/AVP/UDP;unicast;"
<< "client_port=" << strClientPort << ";"
<< "server_port=" << pr.first->get_local_port() << "-" << pr.second->get_local_port() << ";"
<< "server_port=" << pr.first->get_local_port() << "-"
<< pr.second->get_local_port() << ";"
<< "ssrc=" << printSSRC(trackRef->_ssrc)
});
}
break;
case Rtsp::RTP_MULTICAST: {
if(!_multicaster){
_multicaster = RtpMultiCaster::get(getPoller(),get_local_ip(),_mediaInfo._vhost, _mediaInfo._app, _mediaInfo._streamid);
_multicaster = RtpMultiCaster::get(getPoller(), get_local_ip(), _media_info._vhost, _media_info._app, _media_info._streamid);
if (!_multicaster) {
send_NotAcceptable();
throw SockException(Err_shutdown, "can not get a available udp multicast socket");
@ -754,19 +756,19 @@ void RtspSession::handleReq_Setup(const Parser &parser) {
}
void RtspSession::handleReq_Play(const Parser &parser) {
if (_aTrackInfo.empty() || parser["Session"] != _strSession) {
if (_sdp_track.empty() || parser["Session"] != _sessionid) {
send_SessionNotFound();
throw SockException(Err_shutdown,_aTrackInfo.empty() ? "can not find any available track when play" : "session not found when play");
throw SockException(Err_shutdown, _sdp_track.empty() ? "can not find any available track when play" : "session not found when play");
}
auto pMediaSrc = _pMediaSrc.lock();
if(!pMediaSrc){
auto play_src = _play_src.lock();
if(!play_src){
send_StreamNotFound();
shutdown(SockException(Err_shutdown,"rtsp stream released"));
return;
}
bool useBuf = true;
_enableSendRtp = false;
bool useGOP = true;
_enable_send_rtp = false;
float iStartTime = 0;
auto strRange = parser["Range"];
if (strRange.size()) {
@ -777,53 +779,53 @@ void RtspSession::handleReq_Play(const Parser &parser) {
}
iStartTime = 1000 * atof(strStart.data());
InfoP(this) << "rtsp seekTo(ms):" << iStartTime;
useBuf = !pMediaSrc->seekTo(iStartTime);
} else if (pMediaSrc->totalReaderCount() == 0) {
useGOP = !play_src->seekTo(iStartTime);
} else if (play_src->totalReaderCount() == 0) {
//第一个消费者
pMediaSrc->seekTo(0);
play_src->seekTo(0);
}
_StrPrinter rtp_info;
for (auto &track : _aTrackInfo) {
for (auto &track : _sdp_track) {
if (track->_inited == false) {
//还有track没有setup
shutdown(SockException(Err_shutdown, "track not setuped"));
return;
}
track->_ssrc = pMediaSrc->getSsrc(track->_type);
track->_seq = pMediaSrc->getSeqence(track->_type);
track->_time_stamp = pMediaSrc->getTimeStamp(track->_type);
track->_ssrc = play_src->getSsrc(track->_type);
track->_seq = play_src->getSeqence(track->_type);
track->_time_stamp = play_src->getTimeStamp(track->_type);
rtp_info << "url=" << _strContentBase << "/" << track->_control_surffix << ";"
rtp_info << "url=" << _content_base << "/" << track->_control_surffix << ";"
<< "seq=" << track->_seq << ";"
<< "rtptime=" << (int) (track->_time_stamp * (track->_samplerate / 1000)) << ",";
}
rtp_info.pop_back();
sendRtspResponse("200 OK",
{"Range", StrPrinter << "npt=" << setiosflags(ios::fixed) << setprecision(2) << (useBuf? pMediaSrc->getTimeStamp(TrackInvalid) / 1000.0 : iStartTime / 1000),
{"Range", StrPrinter << "npt=" << setiosflags(ios::fixed) << setprecision(2) << (useGOP ? play_src->getTimeStamp(TrackInvalid) / 1000.0 : iStartTime / 1000),
"RTP-Info",rtp_info
});
_enableSendRtp = true;
_enable_send_rtp = true;
setSocketFlags();
if (!_pRtpReader && _rtpType != Rtsp::RTP_MULTICAST) {
if (!_play_reader && _rtp_type != Rtsp::RTP_MULTICAST) {
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
_pRtpReader = pMediaSrc->getRing()->attach(getPoller(), useBuf);
_pRtpReader->setDetachCB([weakSelf]() {
_play_reader = play_src->getRing()->attach(getPoller(), useGOP);
_play_reader->setDetachCB([weakSelf]() {
auto strongSelf = weakSelf.lock();
if (!strongSelf) {
return;
}
strongSelf->shutdown(SockException(Err_shutdown, "rtsp ring buffer detached"));
});
_pRtpReader->setReadCB([weakSelf](const RtspMediaSource::RingDataType &pack) {
_play_reader->setReadCB([weakSelf](const RtspMediaSource::RingDataType &pack) {
auto strongSelf = weakSelf.lock();
if (!strongSelf) {
return;
}
if (strongSelf->_enableSendRtp) {
if (strongSelf->_enable_send_rtp) {
strongSelf->sendRtpPacket(pack);
}
});
@ -831,13 +833,13 @@ void RtspSession::handleReq_Play(const Parser &parser) {
}
void RtspSession::handleReq_Pause(const Parser &parser) {
if (parser["Session"] != _strSession) {
if (parser["Session"] != _sessionid) {
send_SessionNotFound();
throw SockException(Err_shutdown,"session not found when pause");
}
sendRtspResponse("200 OK");
_enableSendRtp = false;
_enable_send_rtp = false;
}
void RtspSession::handleReq_Teardown(const Parser &parser) {
@ -873,7 +875,7 @@ void RtspSession::handleReq_Post(const Parser &parser) {
g_mapGetter.erase(sessioncookie);
//http poster收到请求后转发给http getter处理
_onRecv = [this,httpGetterWeak](const Buffer::Ptr &pBuf){
_on_recv = [this,httpGetterWeak](const Buffer::Ptr &buf){
auto httpGetterStrong = httpGetterWeak.lock();
if(!httpGetterStrong){
shutdown(SockException(Err_shutdown,"http getter released"));
@ -881,18 +883,18 @@ void RtspSession::handleReq_Post(const Parser &parser) {
}
//切换到http getter的线程
httpGetterStrong->async([pBuf,httpGetterWeak](){
httpGetterStrong->async([buf,httpGetterWeak](){
auto httpGetterStrong = httpGetterWeak.lock();
if(!httpGetterStrong){
return;
}
httpGetterStrong->onRecv(std::make_shared<BufferString>(decodeBase64(string(pBuf->data(),pBuf->size()))));
httpGetterStrong->onRecv(std::make_shared<BufferString>(decodeBase64(string(buf->data(), buf->size()))));
});
};
if(!parser.Content().empty()){
//http poster后面的粘包
_onRecv(std::make_shared<BufferString>(parser.Content()));
_on_recv(std::make_shared<BufferString>(parser.Content()));
}
sendRtspResponse("200 OK",
@ -911,82 +913,82 @@ inline void RtspSession::send_NotAcceptable() {
sendRtspResponse("406 Not Acceptable",{"Connection","Close"});
}
void RtspSession::onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx) {
_pushSrc->onWrite(rtppt, false);
void RtspSession::onRtpSorted(const RtpPacket::Ptr &rtp, int) {
_push_src->onWrite(rtp, false);
}
inline void RtspSession::onRcvPeerUdpData(int intervaled, const Buffer::Ptr &pBuf, const struct sockaddr& addr) {
//这是rtcp心跳包说明播放器还存活
_ticker.resetTime();
if(intervaled % 2 == 0){
if(_pushSrc){
inline void RtspSession::onRcvPeerUdpData(int interleaved, const Buffer::Ptr &buf, const struct sockaddr &addr) {
//这是rtcp心跳包说明播放器还存活
_alive_ticker.resetTime();
if (interleaved % 2 == 0) {
if (_push_src) {
//这是rtsp推流上来的rtp包
auto &ref = _aTrackInfo[intervaled / 2];
handleOneRtp(intervaled / 2, ref->_type, ref->_samplerate, (unsigned char *) pBuf->data(), pBuf->size());
}else if(!_udpSockConnected.count(intervaled)){
auto &ref = _sdp_track[interleaved / 2];
handleOneRtp(interleaved / 2, ref->_type, ref->_samplerate, (unsigned char *) buf->data(), buf->size());
} else if (!_udp_connected_flags.count(interleaved)) {
//这是rtsp播放器的rtp打洞包
_udpSockConnected.emplace(intervaled);
_apRtpSock[intervaled / 2]->setSendPeerAddr(&addr);
_udp_connected_flags.emplace(interleaved);
_rtp_socks[interleaved / 2]->setSendPeerAddr(&addr);
}
} else {
//rtcp包
if(!_udpSockConnected.count(intervaled)){
_udpSockConnected.emplace(intervaled);
_apRtcpSock[(intervaled - 1) / 2]->setSendPeerAddr(&addr);
if (!_udp_connected_flags.count(interleaved)) {
_udp_connected_flags.emplace(interleaved);
_rtcp_socks[(interleaved - 1) / 2]->setSendPeerAddr(&addr);
}
onRtcpPacket((intervaled - 1) / 2, _aTrackInfo[(intervaled - 1) / 2], (unsigned char *) pBuf->data(),pBuf->size());
onRtcpPacket((interleaved - 1) / 2, _sdp_track[(interleaved - 1) / 2], (unsigned char *) buf->data(),
buf->size());
}
}
inline void RtspSession::startListenPeerUdpData(int trackIdx) {
inline void RtspSession::startListenPeerUdpData(int track_idx) {
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
auto srcIP = inet_addr(get_peer_ip().data());
auto onUdpData = [weakSelf,srcIP](const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr,int intervaled){
auto onUdpData = [weakSelf,srcIP](const Buffer::Ptr &buf, struct sockaddr *peer_addr, int interleaved){
auto strongSelf = weakSelf.lock();
if (!strongSelf) {
return false;
}
if (((struct sockaddr_in *) pPeerAddr)->sin_addr.s_addr != srcIP) {
WarnP(strongSelf.get()) << ((intervaled % 2 == 0) ? "收到其他地址的rtp数据:" : "收到其他地址的rtcp数据:")
<< SockUtil::inet_ntoa(((struct sockaddr_in *) pPeerAddr)->sin_addr);
if (((struct sockaddr_in *) peer_addr)->sin_addr.s_addr != srcIP) {
WarnP(strongSelf.get()) << ((interleaved % 2 == 0) ? "收到其他地址的rtp数据:" : "收到其他地址的rtcp数据:")
<< SockUtil::inet_ntoa(((struct sockaddr_in *) peer_addr)->sin_addr);
return true;
}
struct sockaddr addr=*pPeerAddr;
strongSelf->async([weakSelf,pBuf,addr,intervaled]() {
struct sockaddr addr = *peer_addr;
strongSelf->async([weakSelf, buf, addr, interleaved]() {
auto strongSelf = weakSelf.lock();
if (!strongSelf) {
return;
}
strongSelf->onRcvPeerUdpData(intervaled,pBuf,addr);
strongSelf->onRcvPeerUdpData(interleaved, buf, addr);
});
return true;
};
switch (_rtpType){
switch (_rtp_type){
case Rtsp::RTP_MULTICAST:{
//组播使用的共享rtcp端口
UDPServer::Instance().listenPeer(get_peer_ip().data(), this, [onUdpData](
int intervaled, const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr) {
return onUdpData(pBuf,pPeerAddr,intervaled);
UDPServer::Instance().listenPeer(get_peer_ip().data(), this,
[onUdpData]( int interleaved, const Buffer::Ptr &buf, struct sockaddr *peer_addr) {
return onUdpData(buf, peer_addr, interleaved);
});
}
break;
case Rtsp::RTP_UDP:{
auto setEvent = [&](Socket::Ptr &sock,int intervaled){
auto setEvent = [&](Socket::Ptr &sock,int interleaved){
if(!sock){
WarnP(this) << "udp端口为空:" << intervaled;
WarnP(this) << "udp端口为空:" << interleaved;
return;
}
sock->setOnRead([onUdpData,intervaled](const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr , int addr_len){
onUdpData(pBuf,pPeerAddr,intervaled);
sock->setOnRead([onUdpData,interleaved](const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr , int addr_len){
onUdpData(pBuf, pPeerAddr, interleaved);
});
};
setEvent(_apRtpSock[trackIdx], 2*trackIdx );
setEvent(_apRtcpSock[trackIdx], 2*trackIdx + 1 );
setEvent(_rtp_socks[track_idx], 2 * track_idx );
setEvent(_rtcp_socks[track_idx], 2 * track_idx + 1 );
}
break;
@ -1003,14 +1005,11 @@ static string dateStr(){
return buf;
}
bool RtspSession::sendRtspResponse(const string &res_code,
const StrCaseMap &header_const,
const string &sdp,
const char *protocol){
bool RtspSession::sendRtspResponse(const string &res_code, const StrCaseMap &header_const, const string &sdp, const char *protocol){
auto header = header_const;
header.emplace("CSeq",StrPrinter << _iCseq);
if(!_strSession.empty()){
header.emplace("Session",_strSession);
header.emplace("CSeq",StrPrinter << _cseq);
if(!_sessionid.empty()){
header.emplace("Session", _sessionid);
}
header.emplace("Server",SERVER_NAME);
@ -1040,14 +1039,11 @@ int RtspSession::send(const Buffer::Ptr &pkt){
// if(!_enableSendRtp){
// DebugP(this) << pkt->data();
// }
_ui64TotalBytes += pkt->size();
_bytes_usage += pkt->size();
return TcpSession::send(pkt);
}
bool RtspSession::sendRtspResponse(const string &res_code,
const std::initializer_list<string> &header,
const string &sdp,
const char *protocol) {
bool RtspSession::sendRtspResponse(const string &res_code, const std::initializer_list<string> &header, const string &sdp, const char *protocol) {
string key;
StrCaseMap header_map;
int i = 0;
@ -1062,35 +1058,36 @@ bool RtspSession::sendRtspResponse(const string &res_code,
}
inline int RtspSession::getTrackIndexByTrackType(TrackType type) {
for (unsigned int i = 0; i < _aTrackInfo.size(); i++) {
if (type == _aTrackInfo[i]->_type) {
for (unsigned int i = 0; i < _sdp_track.size(); i++) {
if (type == _sdp_track[i]->_type) {
return i;
}
}
if(_aTrackInfo.size() == 1){
if(_sdp_track.size() == 1){
return 0;
}
throw SockException(Err_shutdown, StrPrinter << "no such track with type:" << (int) type);
}
inline int RtspSession::getTrackIndexByControlSuffix(const string &controlSuffix) {
for (unsigned int i = 0; i < _aTrackInfo.size(); i++) {
if (controlSuffix == _aTrackInfo[i]->_control_surffix) {
for (unsigned int i = 0; i < _sdp_track.size(); i++) {
if (controlSuffix == _sdp_track[i]->_control_surffix) {
return i;
}
}
if(_aTrackInfo.size() == 1){
if(_sdp_track.size() == 1){
return 0;
}
throw SockException(Err_shutdown, StrPrinter << "no such track with suffix:" << controlSuffix);
}
inline int RtspSession::getTrackIndexByInterleaved(int interleaved){
for (unsigned int i = 0; i < _aTrackInfo.size(); i++) {
if (_aTrackInfo[i]->_interleaved == interleaved) {
for (unsigned int i = 0; i < _sdp_track.size(); i++) {
if (_sdp_track[i]->_interleaved == interleaved) {
return i;
}
}
if(_aTrackInfo.size() == 1){
if(_sdp_track.size() == 1){
return 0;
}
throw SockException(Err_shutdown, StrPrinter << "no such track with interleaved:" << interleaved);
@ -1098,7 +1095,7 @@ inline int RtspSession::getTrackIndexByInterleaved(int interleaved){
bool RtspSession::close(MediaSource &sender, bool force) {
//此回调在其他线程触发
if(!_pushSrc || (!force && _pushSrc->totalReaderCount())){
if(!_push_src || (!force && _push_src->totalReaderCount())){
return false;
}
string err = StrPrinter << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force;
@ -1107,17 +1104,34 @@ bool RtspSession::close(MediaSource &sender,bool force) {
}
int RtspSession::totalReaderCount(MediaSource &sender) {
return _pushSrc ? _pushSrc->totalReaderCount() : sender.readerCount();
return _push_src ? _push_src->totalReaderCount() : sender.readerCount();
}
inline void RtspSession::onSendRtpPacket(const RtpPacket::Ptr &pkt){
#if RTSP_SERVER_SEND_RTCP
int track_index = getTrackIndexByTrackType(pkt->type);
RtcpCounter &counter = _rtcp_counter[track_index];
counter.pktCnt += 1;
counter.octCount += (pkt->size() - pkt->offset);
auto &ticker = _rtcp_send_tickers[track_index];
if (ticker.elapsedTime() > 5 * 1000) {
//send rtcp every 5 second
ticker.resetTime();
//直接保存网络字节序
memcpy(&counter.timeStamp, pkt->data() + 8, 4);
sendSenderReport(_rtp_type == Rtsp::RTP_TCP, track_index);
}
#endif
}
void RtspSession::sendRtpPacket(const RtspMediaSource::RingDataType &pkt) {
//InfoP(this) <<(int)pkt.Interleaved;
switch (_rtpType) {
switch (_rtp_type) {
case Rtsp::RTP_TCP: {
int i = 0;
int size = pkt->size();
setSendFlushFlag(false);
pkt->for_each([&](const RtpPacket::Ptr &rtp) {
onSendRtpPacket(rtp);
if (++i == size) {
setSendFlushFlag(true);
}
@ -1129,15 +1143,15 @@ void RtspSession::sendRtpPacket(const RtspMediaSource::RingDataType &pkt) {
int i = 0;
int size = pkt->size();
pkt->for_each([&](const RtpPacket::Ptr &rtp) {
int iTrackIndex = getTrackIndexByTrackType(rtp->type);
auto &pSock = _apRtpSock[iTrackIndex];
onSendRtpPacket(rtp);
int track_index = getTrackIndexByTrackType(rtp->type);
auto &pSock = _rtp_socks[track_index];
if (!pSock) {
shutdown(SockException(Err_shutdown, "udp sock not opened yet"));
return;
}
BufferRtp::Ptr buffer(new BufferRtp(rtp, 4));
_ui64TotalBytes += buffer->size();
_bytes_usage += buffer->size();
pSock->send(buffer, nullptr, 0, ++i == size);
});
}
@ -1145,42 +1159,27 @@ void RtspSession::sendRtpPacket(const RtspMediaSource::RingDataType &pkt) {
default:
break;
}
#if RTSP_SERVER_SEND_RTCP
int iTrackIndex = getTrackIndexByTrackType(pkt->type);
RtcpCounter &counter = _aRtcpCnt[iTrackIndex];
counter.pktCnt += 1;
counter.octCount += (pkt->length - pkt->offset);
auto &ticker = _aRtcpTicker[iTrackIndex];
if (ticker.elapsedTime() > 5 * 1000) {
//send rtcp every 5 second
ticker.resetTime();
//直接保存网络字节序
memcpy(&counter.timeStamp, pkt->payload + 8 , 4);
sendSenderReport(_rtpType == Rtsp::RTP_TCP,iTrackIndex);
}
#endif
}
void RtspSession::sendSenderReport(bool overTcp,int iTrackIndex) {
void RtspSession::sendSenderReport(bool over_tcp, int track_index) {
static const char s_cname[] = "ZLMediaKitRtsp";
uint8_t aui8Rtcp[4 + 28 + 10 + sizeof(s_cname) + 1] = {0};
uint8_t *pui8Rtcp_SR = aui8Rtcp + 4, *pui8Rtcp_SDES = pui8Rtcp_SR + 28;
auto &track = _aTrackInfo[iTrackIndex];
auto &counter = _aRtcpCnt[iTrackIndex];
uint8_t rtcp_buf[4 + 28 + 10 + sizeof(s_cname) + 1] = {0};
uint8_t *rtcp_sr = rtcp_buf + 4, *rtcp_sdes = rtcp_sr + 28;
auto &track = _sdp_track[track_index];
auto &counter = _rtcp_counter[track_index];
aui8Rtcp[0] = '$';
aui8Rtcp[1] = track->_interleaved + 1;
aui8Rtcp[2] = (sizeof(aui8Rtcp) - 4) >> 8;
aui8Rtcp[3] = (sizeof(aui8Rtcp) - 4) & 0xFF;
rtcp_buf[0] = '$';
rtcp_buf[1] = track->_interleaved + 1;
rtcp_buf[2] = (sizeof(rtcp_buf) - 4) >> 8;
rtcp_buf[3] = (sizeof(rtcp_buf) - 4) & 0xFF;
pui8Rtcp_SR[0] = 0x80;
pui8Rtcp_SR[1] = 0xC8;
pui8Rtcp_SR[2] = 0x00;
pui8Rtcp_SR[3] = 0x06;
rtcp_sr[0] = 0x80;
rtcp_sr[1] = 0xC8;
rtcp_sr[2] = 0x00;
rtcp_sr[3] = 0x06;
uint32_t ssrc = htonl(track->_ssrc);
memcpy(&pui8Rtcp_SR[4], &ssrc, 4);
memcpy(&rtcp_sr[4], &ssrc, 4);
uint64_t msw;
uint64_t lsw;
@ -1190,35 +1189,35 @@ void RtspSession::sendSenderReport(bool overTcp,int iTrackIndex) {
lsw = (uint32_t) ((double) tv.tv_usec * (double) (((uint64_t) 1) << 32) * 1.0e-6);
msw = htonl(msw);
memcpy(&pui8Rtcp_SR[8], &msw, 4);
memcpy(&rtcp_sr[8], &msw, 4);
lsw = htonl(lsw);
memcpy(&pui8Rtcp_SR[12], &lsw, 4);
memcpy(&rtcp_sr[12], &lsw, 4);
//直接使用网络字节序
memcpy(&pui8Rtcp_SR[16], &counter.timeStamp, 4);
memcpy(&rtcp_sr[16], &counter.timeStamp, 4);
uint32_t pktCnt = htonl(counter.pktCnt);
memcpy(&pui8Rtcp_SR[20], &pktCnt, 4);
memcpy(&rtcp_sr[20], &pktCnt, 4);
uint32_t octCount = htonl(counter.octCount);
memcpy(&pui8Rtcp_SR[24], &octCount, 4);
memcpy(&rtcp_sr[24], &octCount, 4);
pui8Rtcp_SDES[0] = 0x81;
pui8Rtcp_SDES[1] = 0xCA;
pui8Rtcp_SDES[2] = 0x00;
pui8Rtcp_SDES[3] = 0x06;
rtcp_sdes[0] = 0x81;
rtcp_sdes[1] = 0xCA;
rtcp_sdes[2] = 0x00;
rtcp_sdes[3] = 0x06;
memcpy(&pui8Rtcp_SDES[4], &ssrc, 4);
memcpy(&rtcp_sdes[4], &ssrc, 4);
pui8Rtcp_SDES[8] = 0x01;
pui8Rtcp_SDES[9] = 0x0f;
memcpy(&pui8Rtcp_SDES[10], s_cname, sizeof(s_cname));
pui8Rtcp_SDES[10 + sizeof(s_cname)] = 0x00;
rtcp_sdes[8] = 0x01;
rtcp_sdes[9] = 0x0f;
memcpy(&rtcp_sdes[10], s_cname, sizeof(s_cname));
rtcp_sdes[10 + sizeof(s_cname)] = 0x00;
if(overTcp){
send(obtainBuffer((char *) aui8Rtcp, sizeof(aui8Rtcp)));
if (over_tcp) {
send(obtainBuffer((char *) rtcp_buf, sizeof(rtcp_buf)));
} else {
_apRtcpSock[iTrackIndex]->send((char *) aui8Rtcp + 4, sizeof(aui8Rtcp) - 4);
_rtcp_socks[track_index]->send((char *) rtcp_buf + 4, sizeof(rtcp_buf) - 4);
}
}
@ -1234,4 +1233,3 @@ void RtspSession::setSocketFlags(){
}
/* namespace mediakit */

View File

@ -59,51 +59,31 @@ public:
//在请求明文密码时如果提供md5密码者则会导致认证失败
typedef std::function<void(bool encrypted,const string &pwd_or_md5)> onAuth;
RtspSession(const Socket::Ptr &pSock);
RtspSession(const Socket::Ptr &sock);
virtual ~RtspSession();
////TcpSession override////
void onRecv(const Buffer::Ptr &pBuf) override;
void onRecv(const Buffer::Ptr &buf) override;
void onError(const SockException &err) override;
void onManager() override;
protected:
//RtspSplitter override
/**
* rtsp包回调sdp等content数据
* @param parser rtsp包
*/
/////RtspSplitter override/////
//收到完整的rtsp包回调包括sdp等content数据
void onWholeRtspPacket(Parser &parser) override;
/**
* rtp包回调
* @param data
* @param len
*/
//收到rtp包回调
void onRtpPacket(const char *data, uint64_t len) override;
/**
* rtsp头中获取Content长度
* @param parser
* @return
*/
//从rtsp头中获取Content长度
int64_t getContentLength(Parser &parser) override;
//RtpReceiver override
void onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx) override;
//MediaSourceEvent override
////RtpReceiver override////
void onRtpSorted(const RtpPacket::Ptr &rtp, int track_idx) override;
////MediaSourceEvent override////
bool close(MediaSource &sender, bool force) override ;
int totalReaderCount(MediaSource &sender) override;
//TcpSession override
/////TcpSession override////
int send(const Buffer::Ptr &pkt) override;
//收到RTCP包回调
virtual void onRtcpPacket(int track_idx, SdpTrack::Ptr &track, unsigned char *data, unsigned int len);
/**
* RTCP包回调
* @param iTrackidx
* @param track
* @param pucData
* @param uiLen
*/
virtual void onRtcpPacket(int iTrackidx, SdpTrack::Ptr &track, unsigned char *pucData, unsigned int uiLen);
private:
//处理options方法,获取服务器能力
void handleReq_Options(const Parser &parser);
@ -127,26 +107,22 @@ private:
void handleReq_Post(const Parser &parser);
//处理SET_PARAMETER、GET_PARAMETER方法一般用于心跳
void handleReq_SET_PARAMETER(const Parser &parser);
//rtsp资源未找到
void inline send_StreamNotFound();
void send_StreamNotFound();
//不支持的传输模式
void inline send_UnsupportedTransport();
void send_UnsupportedTransport();
//会话id错误
void inline send_SessionNotFound();
void send_SessionNotFound();
//一般rtsp服务器打开端口失败时触发
void inline send_NotAcceptable();
void send_NotAcceptable();
//获取track下标
inline int getTrackIndexByTrackType(TrackType type);
inline int getTrackIndexByControlSuffix(const string &controlSuffix);
inline int getTrackIndexByInterleaved(int interleaved);
int getTrackIndexByTrackType(TrackType type);
int getTrackIndexByControlSuffix(const string &control_suffix);
int getTrackIndexByInterleaved(int interleaved);
//一般用于接收udp打洞包也用于rtsp推流
inline void onRcvPeerUdpData(int intervaled, const Buffer::Ptr &pBuf, const struct sockaddr &addr);
void onRcvPeerUdpData(int interleaved, const Buffer::Ptr &buf, const struct sockaddr &addr);
//配合onRcvPeerUdpData使用
inline void startListenPeerUdpData(int iTrackIdx);
void startListenPeerUdpData(int track_idx);
////rtsp专有认证相关////
//认证成功
void onAuthSuccess();
@ -155,73 +131,76 @@ private:
//开始走rtsp专有认证流程
void onAuthUser(const string &realm, const string &authorization);
//校验base64方式的认证加密
void onAuthBasic(const string &realm,const string &strBase64);
void onAuthBasic(const string &realm, const string &auth_base64);
//校验md5方式的认证加密
void onAuthDigest(const string &realm,const string &strMd5);
void onAuthDigest(const string &realm, const string &auth_md5);
//触发url鉴权事件
void emitOnPlay();
//发送rtp给客户端
void sendRtpPacket(const RtspMediaSource::RingDataType &pkt);
//触发rtcp发送
void onSendRtpPacket(const RtpPacket::Ptr &rtp);
//回复客户端
bool sendRtspResponse(const string &res_code, const std::initializer_list<string> &header, const string &sdp = "", const char *protocol = "RTSP/1.0");
bool sendRtspResponse(const string &res_code, const StrCaseMap &header = StrCaseMap(), const string &sdp = "", const char *protocol = "RTSP/1.0");
//服务器发送rtcp
void sendSenderReport(bool overTcp,int iTrackIndex);
void sendSenderReport(bool over_tcp, int track_idx);
//设置socket标志
void setSocketFlags();
private:
//用于判断客户端是否超时
Ticker _ticker;
//收到的seq回复时一致
int _iCseq = 0;
//ContentBase
string _strContentBase;
//Session号
string _strSession;
//记录是否需要rtsp专属鉴权防止重复触发事件
string _rtsp_realm;
//是否已经触发on_play事件
bool _emit_on_play = false;
//url解析后保存的相关信息
MediaInfo _mediaInfo;
//rtsp播放器绑定的直播源
std::weak_ptr<RtspMediaSource> _pMediaSrc;
//直播源读取器
RtspMediaSource::RingType::RingReader::Ptr _pRtpReader;
//是否开始发送rtp
bool _enable_send_rtp;
//推流或拉流客户端采用的rtp传输方式
Rtsp::eRtpType _rtpType = Rtsp::RTP_Invalid;
Rtsp::eRtpType _rtp_type = Rtsp::RTP_Invalid;
//收到的seq回复时一致
int _cseq = 0;
//消耗的总流量
uint64_t _bytes_usage = 0;
//ContentBase
string _content_base;
//Session号
string _sessionid;
//记录是否需要rtsp专属鉴权防止重复触发事件
string _rtsp_realm;
//登录认证
string _auth_nonce;
//用于判断客户端是否超时
Ticker _alive_ticker;
//url解析后保存的相关信息
MediaInfo _media_info;
//rtsp推流相关绑定的源
RtspMediaSourceImp::Ptr _push_src;
//rtsp播放器绑定的直播源
std::weak_ptr<RtspMediaSource> _play_src;
//直播源读取器
RtspMediaSource::RingType::RingReader::Ptr _play_reader;
//sdp里面有效的track,包含音频或视频
vector<SdpTrack::Ptr> _aTrackInfo;
vector<SdpTrack::Ptr> _sdp_track;
//rtcp统计,trackid idx 为数组下标
RtcpCounter _rtcp_counter[2];
//rtcp发送时间,trackid idx 为数组下标
Ticker _rtcp_send_tickers[2];
////////RTP over udp////////
//RTP端口,trackid idx 为数组下标
Socket::Ptr _apRtpSock[2];
Socket::Ptr _rtp_socks[2];
//RTCP端口,trackid idx 为数组下标
Socket::Ptr _apRtcpSock[2];
Socket::Ptr _rtcp_socks[2];
//标记是否收到播放的udp打洞包,收到播放的udp打洞包后才能知道其外网udp端口号
unordered_set<int> _udpSockConnected;
unordered_set<int> _udp_connected_flags;
////////RTP over udp_multicast////////
//共享的rtp组播对象
RtpMultiCaster::Ptr _multicaster;
//登录认证
string _strNonce;
//消耗的总流量
uint64_t _ui64TotalBytes = 0;
//RTSP over HTTP
////////RTSP over HTTP ////////
//quicktime 请求rtsp会产生两次tcp连接
//一次发送 get 一次发送post需要通过x-sessioncookie关联起来
string _http_x_sessioncookie;
function<void(const Buffer::Ptr &pBuf)> _onRecv;
//是否开始发送rtp
bool _enableSendRtp;
//rtsp推流相关
RtspMediaSourceImp::Ptr _pushSrc;
//rtcp统计,trackid idx 为数组下标
RtcpCounter _aRtcpCnt[2];
//rtcp发送时间,trackid idx 为数组下标
Ticker _aRtcpTicker[2];
function<void(const Buffer::Ptr &)> _on_recv;
};
/**