diff --git a/src/Rtsp/RtspSession.cpp b/src/Rtsp/RtspSession.cpp index fa1bf993..797c62e6 100644 --- a/src/Rtsp/RtspSession.cpp +++ b/src/Rtsp/RtspSession.cpp @@ -17,6 +17,7 @@ #include "Util/base64.h" #include "RtpMultiCaster.h" #include "Rtcp/RtcpContext.h" +#include "Network/Server.h" using namespace std; using namespace toolkit; @@ -218,41 +219,7 @@ void RtspSession::handleReq_ANNOUNCE(const Parser &parser) { throw SockException(Err_shutdown, StrPrinter << err << ":" << full_url); } - auto onRes = [this, parser, full_url](const string &err, const ProtocolOption &option) { - if (!err.empty()) { - sendRtspResponse("401 Unauthorized", { "Content-Type", "text/plain" }, err); - shutdown(SockException(Err_shutdown, StrPrinter << "401 Unauthorized:" << err)); - return; - } - - assert(!_push_src); - auto src = MediaSource::find(RTSP_SCHEMA, _media_info._vhost, _media_info._app, _media_info._streamid); - auto push_failed = (bool)src; - - while (src) { - //尝试断连后继续推流 - auto rtsp_src = dynamic_pointer_cast(src); - if (!rtsp_src) { - //源不是rtsp推流产生的 - break; - } - auto ownership = rtsp_src->getOwnership(); - if (!ownership) { - //获取推流源所有权失败 - break; - } - _push_src = std::move(rtsp_src); - _push_src_ownership = std::move(ownership); - push_failed = false; - break; - } - - if (push_failed) { - sendRtspResponse("406 Not Acceptable", { "Content-Type", "text/plain" }, "Already publishing."); - string err = StrPrinter << "ANNOUNCE: Already publishing:" << _media_info.shortUrl() << endl; - throw SockException(Err_shutdown, err); - } - + auto onResPushSrc = [this, parser, full_url](ProtocolOption option) { SdpParser sdpParser(parser.Content()); _sessionid = makeRandStr(12); _sdp_track = sdpParser.getAvailableTrack(); @@ -270,7 +237,7 @@ void RtspSession::handleReq_ANNOUNCE(const Parser &parser) { if (!_push_src) { _push_src = std::make_shared(_media_info._vhost, _media_info._app, _media_info._streamid); - //获取所有权 + // 获取所有权 _push_src_ownership = _push_src->getOwnership(); _push_src->setProtocolOption(option); _push_src->setSdp(parser.Content()); @@ -281,6 +248,66 @@ void RtspSession::handleReq_ANNOUNCE(const Parser &parser) { sendRtspResponse("200 OK"); }; + auto onRes = [this, parser, full_url,onResPushSrc](const string &err, const ProtocolOption &option) { + if (!err.empty()) { + sendRtspResponse("401 Unauthorized", { "Content-Type", "text/plain" }, err); + shutdown(SockException(Err_shutdown, StrPrinter << "401 Unauthorized:" << err)); + return; + } + + assert(!_push_src); + auto src = MediaSource::find(RTSP_SCHEMA, _media_info._vhost, _media_info._app, _media_info._streamid); + auto push_failed = (bool)src; + RtspMediaSource::Ptr rtsp_src; + RtspMediaSourceImp::Ptr rtsp_src_imp; + while (src) { + //尝试断连后继续推流 + rtsp_src_imp = dynamic_pointer_cast(src); + rtsp_src = dynamic_pointer_cast(src); + if (!rtsp_src_imp) { + //源不是rtsp推流产生的 + break; + } + auto ownership = rtsp_src_imp->getOwnership(); + if (!ownership) { + //获取推流源所有权失败 + break; + } + _push_src = std::move(rtsp_src_imp); + _push_src_ownership = std::move(ownership); + push_failed = false; + break; + } + + if (push_failed && rtsp_src) { + auto sock = rtsp_src->getOriginSock(); + if(sock){ + auto session = SessionMap::Instance().get(sock->getIdentifier()); + auto rtsp_session = dynamic_pointer_cast(session); + if(session && rtsp_session){ + rtsp_src_imp = rtsp_session->getPushSrc(); + session->getPoller()->async_first([this,rtsp_src,option,rtsp_src_imp,onResPushSrc,session](){ + session->shutdown(SockException(Err_shutdown, "other pusher kick session")); + _push_src_ownership = rtsp_src_imp->getOwnership(); + _push_src = std::move(rtsp_src_imp); + + this->getPoller()->async_first([option,onResPushSrc](){ + onResPushSrc(option); + }); + }); + return; + } + } + + sendRtspResponse("406 Not Acceptable", { "Content-Type", "text/plain" }, "not reach this"); + string err = StrPrinter << "ANNOUNCE: not reach this:" << _media_info.shortUrl() << endl; + throw SockException(Err_shutdown, err); + } + + onResPushSrc(option); + + }; + weak_ptr weak_self = dynamic_pointer_cast(shared_from_this()); Broadcast::PublishAuthInvoker invoker = [weak_self, onRes](const string &err, const ProtocolOption &option) { auto strong_self = weak_self.lock(); @@ -1249,5 +1276,9 @@ void RtspSession::setSocketFlags(){ } } +RtspMediaSourceImp::Ptr RtspSession::getPushSrc(){ + return _push_src; +} + } /* namespace mediakit */ diff --git a/src/Rtsp/RtspSession.h b/src/Rtsp/RtspSession.h index 4b476dd0..ffeb845d 100644 --- a/src/Rtsp/RtspSession.h +++ b/src/Rtsp/RtspSession.h @@ -40,6 +40,8 @@ public: void onError(const toolkit::SockException &err) override; void onManager() override; + RtspMediaSourceImp::Ptr getPushSrc(); + protected: /////RtspSplitter override///// //收到完整的rtsp包回调,包括sdp等content数据