rtsp push replace pre pusher

This commit is contained in:
xiongguangjie 2023-03-24 23:13:25 +08:00
parent ca7efd5941
commit 67d5ca0246
2 changed files with 69 additions and 36 deletions

View File

@ -17,6 +17,7 @@
#include "Util/base64.h" #include "Util/base64.h"
#include "RtpMultiCaster.h" #include "RtpMultiCaster.h"
#include "Rtcp/RtcpContext.h" #include "Rtcp/RtcpContext.h"
#include "Network/Server.h"
using namespace std; using namespace std;
using namespace toolkit; using namespace toolkit;
@ -218,41 +219,7 @@ void RtspSession::handleReq_ANNOUNCE(const Parser &parser) {
throw SockException(Err_shutdown, StrPrinter << err << ":" << full_url); throw SockException(Err_shutdown, StrPrinter << err << ":" << full_url);
} }
auto onRes = [this, parser, full_url](const string &err, const ProtocolOption &option) { auto onResPushSrc = [this, parser, full_url](ProtocolOption option) {
if (!err.empty()) {
sendRtspResponse("401 Unauthorized", { "Content-Type", "text/plain" }, err);
shutdown(SockException(Err_shutdown, StrPrinter << "401 Unauthorized:" << err));
return;
}
assert(!_push_src);
auto src = MediaSource::find(RTSP_SCHEMA, _media_info._vhost, _media_info._app, _media_info._streamid);
auto push_failed = (bool)src;
while (src) {
//尝试断连后继续推流
auto rtsp_src = dynamic_pointer_cast<RtspMediaSourceImp>(src);
if (!rtsp_src) {
//源不是rtsp推流产生的
break;
}
auto ownership = rtsp_src->getOwnership();
if (!ownership) {
//获取推流源所有权失败
break;
}
_push_src = std::move(rtsp_src);
_push_src_ownership = std::move(ownership);
push_failed = false;
break;
}
if (push_failed) {
sendRtspResponse("406 Not Acceptable", { "Content-Type", "text/plain" }, "Already publishing.");
string err = StrPrinter << "ANNOUNCE: Already publishing:" << _media_info.shortUrl() << endl;
throw SockException(Err_shutdown, err);
}
SdpParser sdpParser(parser.Content()); SdpParser sdpParser(parser.Content());
_sessionid = makeRandStr(12); _sessionid = makeRandStr(12);
_sdp_track = sdpParser.getAvailableTrack(); _sdp_track = sdpParser.getAvailableTrack();
@ -270,7 +237,7 @@ void RtspSession::handleReq_ANNOUNCE(const Parser &parser) {
if (!_push_src) { if (!_push_src) {
_push_src = std::make_shared<RtspMediaSourceImp>(_media_info._vhost, _media_info._app, _media_info._streamid); _push_src = std::make_shared<RtspMediaSourceImp>(_media_info._vhost, _media_info._app, _media_info._streamid);
//获取所有权 // 获取所有权
_push_src_ownership = _push_src->getOwnership(); _push_src_ownership = _push_src->getOwnership();
_push_src->setProtocolOption(option); _push_src->setProtocolOption(option);
_push_src->setSdp(parser.Content()); _push_src->setSdp(parser.Content());
@ -281,6 +248,66 @@ void RtspSession::handleReq_ANNOUNCE(const Parser &parser) {
sendRtspResponse("200 OK"); sendRtspResponse("200 OK");
}; };
auto onRes = [this, parser, full_url,onResPushSrc](const string &err, const ProtocolOption &option) {
if (!err.empty()) {
sendRtspResponse("401 Unauthorized", { "Content-Type", "text/plain" }, err);
shutdown(SockException(Err_shutdown, StrPrinter << "401 Unauthorized:" << err));
return;
}
assert(!_push_src);
auto src = MediaSource::find(RTSP_SCHEMA, _media_info._vhost, _media_info._app, _media_info._streamid);
auto push_failed = (bool)src;
RtspMediaSource::Ptr rtsp_src;
RtspMediaSourceImp::Ptr rtsp_src_imp;
while (src) {
//尝试断连后继续推流
rtsp_src_imp = dynamic_pointer_cast<RtspMediaSourceImp>(src);
rtsp_src = dynamic_pointer_cast<RtspMediaSource>(src);
if (!rtsp_src_imp) {
//源不是rtsp推流产生的
break;
}
auto ownership = rtsp_src_imp->getOwnership();
if (!ownership) {
//获取推流源所有权失败
break;
}
_push_src = std::move(rtsp_src_imp);
_push_src_ownership = std::move(ownership);
push_failed = false;
break;
}
if (push_failed && rtsp_src) {
auto sock = rtsp_src->getOriginSock();
if(sock){
auto session = SessionMap::Instance().get(sock->getIdentifier());
auto rtsp_session = dynamic_pointer_cast<RtspSession>(session);
if(session && rtsp_session){
rtsp_src_imp = rtsp_session->getPushSrc();
session->getPoller()->async_first([this,rtsp_src,option,rtsp_src_imp,onResPushSrc,session](){
session->shutdown(SockException(Err_shutdown, "other pusher kick session"));
_push_src_ownership = rtsp_src_imp->getOwnership();
_push_src = std::move(rtsp_src_imp);
this->getPoller()->async_first([option,onResPushSrc](){
onResPushSrc(option);
});
});
return;
}
}
sendRtspResponse("406 Not Acceptable", { "Content-Type", "text/plain" }, "not reach this");
string err = StrPrinter << "ANNOUNCE: not reach this:" << _media_info.shortUrl() << endl;
throw SockException(Err_shutdown, err);
}
onResPushSrc(option);
};
weak_ptr<RtspSession> weak_self = dynamic_pointer_cast<RtspSession>(shared_from_this()); weak_ptr<RtspSession> weak_self = dynamic_pointer_cast<RtspSession>(shared_from_this());
Broadcast::PublishAuthInvoker invoker = [weak_self, onRes](const string &err, const ProtocolOption &option) { Broadcast::PublishAuthInvoker invoker = [weak_self, onRes](const string &err, const ProtocolOption &option) {
auto strong_self = weak_self.lock(); auto strong_self = weak_self.lock();
@ -1249,5 +1276,9 @@ void RtspSession::setSocketFlags(){
} }
} }
RtspMediaSourceImp::Ptr RtspSession::getPushSrc(){
return _push_src;
}
} }
/* namespace mediakit */ /* namespace mediakit */

View File

@ -40,6 +40,8 @@ public:
void onError(const toolkit::SockException &err) override; void onError(const toolkit::SockException &err) override;
void onManager() override; void onManager() override;
RtspMediaSourceImp::Ptr getPushSrc();
protected: protected:
/////RtspSplitter override///// /////RtspSplitter override/////
//收到完整的rtsp包回调包括sdp等content数据 //收到完整的rtsp包回调包括sdp等content数据