diff --git a/src/Rtmp/RtmpSession.cpp b/src/Rtmp/RtmpSession.cpp index c387ac91..ab3bd899 100644 --- a/src/Rtmp/RtmpSession.cpp +++ b/src/Rtmp/RtmpSession.cpp @@ -11,6 +11,7 @@ #include "RtmpSession.h" #include "Common/config.h" #include "Util/onceToken.h" +#include "Network/Server.h" using namespace std; using namespace toolkit; @@ -142,48 +143,7 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) { _media_info._schema = RTMP_SCHEMA; auto now_stream_index = _now_stream_index; - auto on_res = [this, token, now_stream_index](const string &err, const ProtocolOption &option) { - _now_stream_index = now_stream_index; - if (!err.empty()) { - sendStatus({ "level", "error", - "code", "NetStream.Publish.BadAuth", - "description", err, - "clientid", "0" }); - shutdown(SockException(Err_shutdown, StrPrinter << "Unauthorized:" << err)); - return; - } - - assert(!_push_src); - auto src = MediaSource::find(RTMP_SCHEMA, _media_info._vhost, _media_info._app, _media_info._streamid); - auto push_failed = (bool)src; - - while (src) { - //尝试断连后继续推流 - auto rtmp_src = dynamic_pointer_cast(src); - if (!rtmp_src) { - //源不是rtmp推流产生的 - break; - } - auto ownership = rtmp_src->getOwnership(); - if (!ownership) { - //获取推流源所有权失败 - break; - } - _push_src = std::move(rtmp_src); - _push_src_ownership = std::move(ownership); - push_failed = false; - break; - } - - if (push_failed) { - sendStatus({"level", "error", - "code", "NetStream.Publish.BadName", - "description", "Already publishing.", - "clientid", "0" }); - shutdown(SockException(Err_shutdown, StrPrinter << "Already publishing:" << err)); - return; - } - + auto on_res_push = [this](ProtocolOption option){ if (!_push_src) { _push_src = std::make_shared(_media_info._vhost, _media_info._app, _media_info._streamid); //获取所有权 @@ -200,6 +160,71 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) { setSocketFlags(); }; + auto on_res = [this, on_res_push,token, now_stream_index](const string &err, const ProtocolOption &option) { + _now_stream_index = now_stream_index; + if (!err.empty()) { + sendStatus({ "level", "error", + "code", "NetStream.Publish.BadAuth", + "description", err, + "clientid", "0" }); + shutdown(SockException(Err_shutdown, StrPrinter << "Unauthorized:" << err)); + return; + } + + assert(!_push_src); + auto src = MediaSource::find(RTMP_SCHEMA, _media_info._vhost, _media_info._app, _media_info._streamid); + auto push_failed = (bool)src; + RtmpMediaSourceImp::Ptr rtmp_src; + while (src) { + //尝试断连后继续推流 + rtmp_src = dynamic_pointer_cast(src); + if (!rtmp_src) { + //源不是rtmp推流产生的 + break; + } + auto ownership = rtmp_src->getOwnership(); + if (!ownership) { + //获取推流源所有权失败 + break; + } + _push_src = std::move(rtmp_src); + _push_src_ownership = std::move(ownership); + push_failed = false; + break; + } + + if (push_failed && rtmp_src) { + auto sock = rtmp_src->getOriginSock(); + if(sock){ + auto session = SessionMap::Instance().get(sock->getIdentifier()); + if(session){ + session->getPoller()->async_first([this,rtmp_src,on_res_push,option,session](){ + session->shutdown(SockException(Err_eof, "other pusher kick session")); + _push_src_ownership = rtmp_src->getOwnership(); + if(_push_src_ownership){ + _push_src = std::move(rtmp_src); + }else{ + WarnL<<"not reach this"; + } + this->getPoller()->async_first([option,on_res_push](){ + on_res_push(option); + }); + }); + return; + }else{ + TraceL; + } + } + sendStatus({"level", "error", + "code", "NetStream.Publish.BadName", + "description", "Already publishing.", + "clientid", "0" }); + shutdown(SockException(Err_shutdown, StrPrinter << "Already publishing:" << err)); + return; + } + + on_res_push(option); + }; if(_media_info._app.empty() || _media_info._streamid.empty()){ //不允许莫名其妙的推流url diff --git a/src/Rtsp/RtspSession.cpp b/src/Rtsp/RtspSession.cpp index 797c62e6..b1e32305 100644 --- a/src/Rtsp/RtspSession.cpp +++ b/src/Rtsp/RtspSession.cpp @@ -287,9 +287,13 @@ void RtspSession::handleReq_ANNOUNCE(const Parser &parser) { if(session && rtsp_session){ rtsp_src_imp = rtsp_session->getPushSrc(); session->getPoller()->async_first([this,rtsp_src,option,rtsp_src_imp,onResPushSrc,session](){ - session->shutdown(SockException(Err_shutdown, "other pusher kick session")); + session->shutdown(SockException(Err_eof, "other pusher kick session")); _push_src_ownership = rtsp_src_imp->getOwnership(); - _push_src = std::move(rtsp_src_imp); + if(_push_src_ownership){ + _push_src = std::move(rtsp_src_imp); + }else{ + WarnL<<"not reach this"; + } this->getPoller()->async_first([option,onResPushSrc](){ onResPushSrc(option);