diff --git a/README.md b/README.md index b8166e6e..38c1c4c3 100644 --- a/README.md +++ b/README.md @@ -112,6 +112,8 @@ - 支持虚拟主机,可以隔离不同域名 - 支持按需拉流,无人观看自动关断拉流 - 支持先拉流后推流,提高及时推流画面打开率 + - 支持先播放后推流 + - 支持推流异常断开重连续推播放器不断开 - 提供c api sdk - 支持FFmpeg拉流代理任意格式的流 - 支持http api生成并返回实时截图 diff --git a/conf/config.ini b/conf/config.ini index 2d608cd2..c064e21b 100644 --- a/conf/config.ini +++ b/conf/config.ini @@ -89,6 +89,10 @@ wait_track_ready_ms=10000 wait_add_track_ms=3000 #如果track未就绪,我们先缓存帧数据,但是有最大个数限制,防止内存溢出 unready_frame_cache=100 +#推流断开后可以在超时时间内重新连接上继续推流,这样播放器会接着播放。 +#置0关闭此特性(推流断开会导致立即断开播放器) +#此参数不应大于播放器超时时间 +continue_push_ms=15000 [hls] #hls写文件的buf大小,调整参数可以提高文件io性能 diff --git a/src/Common/MediaSource.cpp b/src/Common/MediaSource.cpp index 414c7c76..9de0db48 100644 --- a/src/Common/MediaSource.cpp +++ b/src/Common/MediaSource.cpp @@ -85,6 +85,20 @@ const string& MediaSource::getId() const { return _stream_id; } +std::shared_ptr MediaSource::getOwnership() { + if (_owned.test_and_set()) { + //已经被所有 + return nullptr; + } + weak_ptr weak_self = shared_from_this(); + return std::shared_ptr(this, [weak_self](void *ptr) { + auto strong_self = weak_self.lock(); + if (strong_self) { + strong_self->_owned.clear(); + } + }); +} + int MediaSource::getBytesSpeed(TrackType type){ if(type == TrackInvalid){ return _speed[TrackVideo].getSpeed() + _speed[TrackAudio].getSpeed(); @@ -419,8 +433,12 @@ void MediaSource::regist() { //减小互斥锁临界区 lock_guard lock(s_media_source_mtx); auto &ref = s_media_source_map[_schema][_vhost][_app][_stream_id]; - // 增加判断, 防止当前流已注册时再次注册 - if (ref.lock() && ref.lock().get() != this) { + auto src = ref.lock(); + if (src) { + if (src.get() == this) { + return; + } + //增加判断, 防止当前流已注册时再次注册 throw std::invalid_argument("media source already existed:" + _schema + "/" + _vhost + "/" + _app + "/" + _stream_id); } ref = shared_from_this(); diff --git a/src/Common/MediaSource.h b/src/Common/MediaSource.h index 827bf6e5..14f0fc64 100644 --- a/src/Common/MediaSource.h +++ b/src/Common/MediaSource.h @@ -13,6 +13,7 @@ #include #include +#include #include #include #include @@ -219,6 +220,9 @@ public: // 流id const string& getId() const; + //获取对象所有权 + std::shared_ptr getOwnership(); + // 获取所有Track vector getTracks(bool ready = true) const override; @@ -301,6 +305,7 @@ protected: BytesSpeed _speed[TrackMax]; private: + atomic_flag _owned { false }; time_t _create_stamp; Ticker _ticker; string _schema; diff --git a/src/Common/config.cpp b/src/Common/config.cpp index 0750d16f..3256402d 100644 --- a/src/Common/config.cpp +++ b/src/Common/config.cpp @@ -78,6 +78,7 @@ const string kEnableAudio = GENERAL_FIELD"enable_audio"; const string kWaitTrackReadyMS = GENERAL_FIELD"wait_track_ready_ms"; const string kWaitAddTrackMS = GENERAL_FIELD"wait_add_track_ms"; const string kUnreadyFrameCache = GENERAL_FIELD"unready_frame_cache"; +const string kContinuePushMS = GENERAL_FIELD"continue_push_ms"; onceToken token([](){ mINI::Instance()[kFlowThreshold] = 1024; @@ -100,6 +101,7 @@ onceToken token([](){ mINI::Instance()[kWaitTrackReadyMS] = 10000; mINI::Instance()[kWaitAddTrackMS] = 3000; mINI::Instance()[kUnreadyFrameCache] = 100; + mINI::Instance()[kContinuePushMS] = 15 * 1000; },nullptr); }//namespace General diff --git a/src/Common/config.h b/src/Common/config.h index ab35a566..d2cb9ef5 100644 --- a/src/Common/config.h +++ b/src/Common/config.h @@ -193,6 +193,9 @@ extern const string kWaitTrackReadyMS; extern const string kWaitAddTrackMS; //如果track未就绪,我们先缓存帧数据,但是有最大个数限制(100帧时大约4秒),防止内存溢出 extern const string kUnreadyFrameCache; +//推流断开后可以在超时时间内重新连接上继续推流,这样播放器会接着播放。 +//置0关闭此特性(推流断开会导致立即断开播放器) +extern const string kContinuePushMS; }//namespace General diff --git a/src/Rtmp/RtmpSession.cpp b/src/Rtmp/RtmpSession.cpp index 73a115cb..9cb3d978 100644 --- a/src/Rtmp/RtmpSession.cpp +++ b/src/Rtmp/RtmpSession.cpp @@ -24,9 +24,9 @@ RtmpSession::~RtmpSession() { } void RtmpSession::onError(const SockException& err) { - bool isPlayer = !_publisher_src; - uint64_t duration = _ticker.createdTime()/1000; - WarnP(this) << (isPlayer ? "RTMP播放器(" : "RTMP推流器(") + bool is_player = !_push_src; + uint64_t duration = _ticker.createdTime() / 1000; + WarnP(this) << (is_player ? "RTMP播放器(" : "RTMP推流器(") << _media_info._vhost << "/" << _media_info._app << "/" << _media_info._streamid @@ -34,26 +34,35 @@ void RtmpSession::onError(const SockException& err) { << ",耗时(s):" << duration; //流量统计事件广播 - GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold); + GET_CONFIG(uint32_t, iFlowThreshold, General::kFlowThreshold); - if(_total_bytes >= iFlowThreshold * 1024){ - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _total_bytes, duration, isPlayer, static_cast(*this)); + if (_total_bytes >= iFlowThreshold * 1024) { + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _total_bytes, duration, is_player, static_cast(*this)); + } + + GET_CONFIG(uint32_t, continue_push_ms, General::kContinuePushMS); + if (_push_src && continue_push_ms) { + //取消所有权 + _push_src_ownership = nullptr; + //延时10秒注销流 + auto push_src = std::move(_push_src); + getPoller()->doDelayTask(continue_push_ms, [push_src]() { return 0; }); } } void RtmpSession::onManager() { - GET_CONFIG(uint32_t,handshake_sec,Rtmp::kHandshakeSecond); - GET_CONFIG(uint32_t,keep_alive_sec,Rtmp::kKeepAliveSecond); + GET_CONFIG(uint32_t, handshake_sec, Rtmp::kHandshakeSecond); + GET_CONFIG(uint32_t, keep_alive_sec, Rtmp::kKeepAliveSecond); if (_ticker.createdTime() > handshake_sec * 1000) { - if (!_ring_reader && !_publisher_src) { - shutdown(SockException(Err_timeout,"illegal connection")); + if (!_ring_reader && !_push_src) { + shutdown(SockException(Err_timeout, "illegal connection")); } } - if (_publisher_src) { - //publisher + if (_push_src) { + // push if (_ticker.elapsedTime() > keep_alive_sec * 1000) { - shutdown(SockException(Err_timeout,"recv data from rtmp pusher timeout")); + shutdown(SockException(Err_timeout, "recv data from rtmp pusher timeout")); } } } @@ -121,31 +130,61 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) { _media_info.parse(_tc_url + "/" + getStreamId(dec.load())); _media_info._schema = RTMP_SCHEMA; - auto on_res = [this,pToken](const string &err, bool enableHls, bool enableMP4){ - auto src = dynamic_pointer_cast(MediaSource::find(RTMP_SCHEMA, - _media_info._vhost, - _media_info._app, - _media_info._streamid)); - bool auth_success = err.empty(); - bool ok = (!src && !_publisher_src && auth_success); - AMFValue status(AMF_OBJECT); - status.set("level", ok ? "status" : "error"); - status.set("code", ok ? "NetStream.Publish.Start" : (auth_success ? "NetStream.Publish.BadName" : "NetStream.Publish.BadAuth")); - status.set("description", ok ? "Started publishing stream." : (auth_success ? "Already publishing." : err.data())); - status.set("clientid", "0"); - sendReply("onStatus", nullptr, status); - if (!ok) { - string errMsg = StrPrinter << (auth_success ? "already publishing:" : err.data()) << " " - << _media_info._vhost << " " - << _media_info._app << " " - << _media_info._streamid; - shutdown(SockException(Err_shutdown,errMsg)); + auto on_res = [this, pToken](const string &err, bool enableHls, bool enableMP4) { + if (!err.empty()) { + sendStatus({ "level", "error", + "code", "NetStream.Publish.BadAuth", + "description", err, + "clientid", "0" }); + shutdown(SockException(Err_shutdown, StrPrinter << "Unauthorized:" << err)); return; } - _publisher_src.reset(new RtmpMediaSourceImp(_media_info._vhost, _media_info._app, _media_info._streamid)); - _publisher_src->setListener(dynamic_pointer_cast(shared_from_this())); - //设置转协议 - _publisher_src->setProtocolTranslation(enableHls, enableMP4); + + assert(!_push_src); + auto src = MediaSource::find(RTMP_SCHEMA, _media_info._vhost, _media_info._app, _media_info._streamid); + auto push_failed = (bool)src; + + while (src) { + //尝试断连后继续推流 + auto rtmp_src = dynamic_pointer_cast(src); + if (!rtmp_src) { + //源不是rtmp推流产生的 + break; + } + auto ownership = rtmp_src->getOwnership(); + if (!ownership) { + //获取推流源所有权失败 + break; + } + _push_src = std::move(rtmp_src); + _push_src_ownership = std::move(ownership); + push_failed = false; + break; + } + + if (push_failed) { + sendStatus({"level", "error", + "code", "NetStream.Publish.BadName", + "description", "Already publishing.", + "clientid", "0" }); + shutdown(SockException(Err_shutdown, StrPrinter << "Already publishing:" << err)); + return; + } + + if (!_push_src) { + _push_src = std::make_shared(_media_info._vhost, _media_info._app, _media_info._streamid); + //获取所有权 + _push_src_ownership = _push_src->getOwnership(); + _push_src->setProtocolTranslation(enableHls, enableMP4); + } + + _push_src->setListener(dynamic_pointer_cast(shared_from_this())); + + sendStatus({"level", "status", + "code", "NetStream.Publish.Start", + "description", "Started publishing stream.", + "clientid", "0" }); + setSocketFlags(); }; @@ -178,15 +217,27 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) { } void RtmpSession::onCmd_deleteStream(AMFDecoder &dec) { - AMFValue status(AMF_OBJECT); - status.set("level", "status"); - status.set("code", "NetStream.Unpublish.Success"); - status.set("description", "Stop publishing."); - sendReply("onStatus", nullptr, status); + sendStatus({ "level", "status", + "code", "NetStream.Unpublish.Success", + "description", "Stop publishing." }); throw std::runtime_error(StrPrinter << "Stop publishing" << endl); } -void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr &src){ +void RtmpSession::sendStatus(const std::initializer_list &key_value) { + AMFValue status(AMF_OBJECT); + int i = 0; + string key; + for (auto &val : key_value) { + if (++i % 2 == 0) { + status.set(key, val); + } else { + key = val; + } + } + sendReply("onStatus", nullptr, status); +} + +void RtmpSession::sendPlayResponse(const string &err, const RtmpMediaSource::Ptr &src) { bool auth_success = err.empty(); bool ok = (src.operator bool() && auth_success); if (ok) { @@ -194,13 +245,12 @@ void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr sendUserControl(CONTROL_STREAM_BEGIN, STREAM_MEDIA); } // onStatus(NetStream.Play.Reset) - AMFValue status(AMF_OBJECT); - status.set("level", ok ? "status" : "error"); - status.set("code", ok ? "NetStream.Play.Reset" : (auth_success ? "NetStream.Play.StreamNotFound" : "NetStream.Play.BadAuth")); - status.set("description", ok ? "Resetting and playing." : (auth_success ? "No such stream." : err.data())); - status.set("details", _media_info._streamid); - status.set("clientid", "0"); - sendReply("onStatus", nullptr, status); + sendStatus({ "level", (ok ? "status" : "error"), + "code", (ok ? "NetStream.Play.Reset" : (auth_success ? "NetStream.Play.StreamNotFound" : "NetStream.Play.BadAuth")), + "description", (ok ? "Resetting and playing." : (auth_success ? "No such stream." : err.data())), + "details", _media_info._streamid, + "clientid", "0" }); + if (!ok) { string err_msg = StrPrinter << (auth_success ? "no such stream:" : err.data()) << " " << _media_info._vhost << " " @@ -211,13 +261,12 @@ void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr } // onStatus(NetStream.Play.Start) - status.clear(); - status.set("level", "status"); - status.set("code", "NetStream.Play.Start"); - status.set("description", "Started playing."); - status.set("details", _media_info._streamid); - status.set("clientid", "0"); - sendReply("onStatus", nullptr, status); + + sendStatus({ "level", "status", + "code", "NetStream.Play.Start", + "description", "Started playing." , + "details", _media_info._streamid, + "clientid", "0"}); // |RtmpSampleAccess(true, true) AMFEncoder invoke; @@ -232,13 +281,11 @@ void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr sendResponse(MSG_DATA, invoke.data()); //onStatus(NetStream.Play.PublishNotify) - status.clear(); - status.set("level", "status"); - status.set("code", "NetStream.Play.PublishNotify"); - status.set("description", "Now published."); - status.set("details", _media_info._streamid); - status.set("clientid", "0"); - sendReply("onStatus", nullptr, status); + sendStatus({ "level", "status", + "code", "NetStream.Play.PublishNotify", + "description", "Now published." , + "details", _media_info._streamid, + "clientid", "0"}); auto &metadata = src->getMetaData(); if(metadata){ @@ -280,7 +327,7 @@ void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr strongSelf->shutdown(SockException(Err_shutdown,"rtmp ring buffer detached")); }); src->pause(false); - _player_src = src; + _play_src = src; //提高服务器发送性能 setSocketFlags(); } @@ -386,14 +433,14 @@ void RtmpSession::onCmd_pause(AMFDecoder &dec) { dec.load();/* NULL */ bool paused = dec.load(); TraceP(this) << paused; - AMFValue status(AMF_OBJECT); - status.set("level", "status"); - status.set("code", paused ? "NetStream.Pause.Notify" : "NetStream.Unpause.Notify"); - status.set("description", paused ? "Paused stream." : "Unpaused stream."); - sendReply("onStatus", nullptr, status); + + sendStatus({ "level", "status", + "code", (paused ? "NetStream.Pause.Notify" : "NetStream.Unpause.Notify"), + "description", (paused ? "Paused stream." : "Unpaused stream.")}); + //streamBegin sendUserControl(paused ? CONTROL_STREAM_EOF : CONTROL_STREAM_BEGIN, STREAM_MEDIA); - auto strongSrc = _player_src.lock(); + auto strongSrc = _play_src.lock(); if (strongSrc) { strongSrc->pause(paused); } @@ -405,17 +452,16 @@ void RtmpSession::onCmd_playCtrl(AMFDecoder &dec) { int ctrlType = ctrlObj["ctrlType"].as_integer(); float speed = ctrlObj["speed"].as_number(); - AMFValue status(AMF_OBJECT); - status.set("level", "status"); - status.set("code", "NetStream.Speed.Notify"); - status.set("description", "Speeding"); - sendReply("onStatus", nullptr, status); + sendStatus({ "level", "status", + "code", "NetStream.Speed.Notify", + "description", "Speeding"}); + //streamBegin sendUserControl(CONTROL_STREAM_EOF, STREAM_MEDIA); - auto stongSrc = _player_src.lock(); - if (stongSrc) { - stongSrc->speed(speed); + auto strong_src = _play_src.lock(); + if (strong_src) { + strong_src->speed(speed); } } @@ -424,7 +470,7 @@ void RtmpSession::setMetaData(AMFDecoder &dec) { if (type != "onMetaData") { throw std::runtime_error("can only set metadata"); } - _publisher_metadata = dec.load(); + _push_metadata = dec.load(); } void RtmpSession::onProcessCmd(AMFDecoder &dec) { @@ -471,7 +517,7 @@ void RtmpSession::onRtmpChunk(RtmpPacket::Ptr packet) { setMetaData(dec); } else if (type == "onMetaData") { //兼容某些不规范的推流器 - _publisher_metadata = dec.load(); + _push_metadata = dec.load(); } else { TraceP(this) << "unknown notify:" << type; } @@ -480,8 +526,8 @@ void RtmpSession::onRtmpChunk(RtmpPacket::Ptr packet) { case MSG_AUDIO: case MSG_VIDEO: { - if (!_publisher_src) { - WarnL << "Not a rtmp publisher!"; + if (!_push_src) { + WarnL << "Not a rtmp push!"; return; } GET_CONFIG(bool, rtmp_modify_stamp, Rtmp::kModifyStamp); @@ -493,9 +539,9 @@ void RtmpSession::onRtmpChunk(RtmpPacket::Ptr packet) { if (!_set_meta_data) { _set_meta_data = true; - _publisher_src->setMetaData(_publisher_metadata ? _publisher_metadata : TitleMeta().getMetadata()); + _push_src->setMetaData(_push_metadata ? _push_metadata : TitleMeta().getMetadata()); } - _publisher_src->onWrite(std::move(packet)); + _push_src->onWrite(std::move(packet)); break; } @@ -507,15 +553,13 @@ void RtmpSession::onRtmpChunk(RtmpPacket::Ptr packet) { void RtmpSession::onCmd_seek(AMFDecoder &dec) { dec.load();/* NULL */ - AMFValue status(AMF_OBJECT); - status.set("level", "status"); - status.set("code", "NetStream.Seek.Notify"); - status.set("description", "Seeking."); - sendReply("onStatus", nullptr, status); + sendStatus({ "level", "status", + "code", "NetStream.Seek.Notify", + "description", "Seeking."}); auto milliSeconds = (uint32_t)(dec.load().as_number()); InfoP(this) << "rtmp seekTo(ms):" << milliSeconds; - auto strong_src = _player_src.lock(); + auto strong_src = _play_src.lock(); if (strong_src) { strong_src->seekTo(milliSeconds); } @@ -527,7 +571,7 @@ void RtmpSession::onSendMedia(const RtmpPacket::Ptr &pkt) { bool RtmpSession::close(MediaSource &sender,bool force) { //此回调在其他线程触发 - if(!_publisher_src || (!force && _publisher_src->totalReaderCount())){ + if(!_push_src || (!force && _push_src->totalReaderCount())){ return false; } string err = StrPrinter << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force; @@ -536,7 +580,7 @@ bool RtmpSession::close(MediaSource &sender,bool force) { } int RtmpSession::totalReaderCount(MediaSource &sender) { - return _publisher_src ? _publisher_src->totalReaderCount() : sender.readerCount(); + return _push_src ? _push_src->totalReaderCount() : sender.readerCount(); } MediaOriginType RtmpSession::getOriginType(MediaSource &sender) const{ diff --git a/src/Rtmp/RtmpSession.h b/src/Rtmp/RtmpSession.h index 62626546..7e289695 100644 --- a/src/Rtmp/RtmpSession.h +++ b/src/Rtmp/RtmpSession.h @@ -85,6 +85,7 @@ private: void setSocketFlags(); string getStreamId(const string &str); void dumpMetadata(const AMFValue &metadata); + void sendStatus(const std::initializer_list &key_value); private: bool _set_meta_data = false; @@ -97,9 +98,10 @@ private: //数据接收超时计时器 Ticker _ticker; MediaInfo _media_info; - std::weak_ptr _player_src; - AMFValue _publisher_metadata; - std::shared_ptr _publisher_src; + std::weak_ptr _play_src; + AMFValue _push_metadata; + RtmpMediaSourceImp::Ptr _push_src; + std::shared_ptr _push_src_ownership; RtmpMediaSource::RingType::RingReader::Ptr _ring_reader; }; diff --git a/src/Rtsp/RtspSession.cpp b/src/Rtsp/RtspSession.cpp index af1cd266..45e641b3 100644 --- a/src/Rtsp/RtspSession.cpp +++ b/src/Rtsp/RtspSession.cpp @@ -60,9 +60,9 @@ RtspSession::~RtspSession() { } void RtspSession::onError(const SockException &err) { - bool isPlayer = !_push_src; + bool is_player = !_push_src; uint64_t duration = _alive_ticker.createdTime() / 1000; - WarnP(this) << (isPlayer ? "RTSP播放器(" : "RTSP推流器(") + WarnP(this) << (is_player ? "RTSP播放器(" : "RTSP推流器(") << _media_info._vhost << "/" << _media_info._app << "/" << _media_info._streamid @@ -81,16 +81,24 @@ void RtspSession::onError(const SockException &err) { } //流量统计事件广播 - GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold); - if(_bytes_usage >= iFlowThreshold * 1024){ - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _bytes_usage, duration, isPlayer, static_cast(*this)); + GET_CONFIG(uint32_t, iFlowThreshold, General::kFlowThreshold); + if (_bytes_usage >= iFlowThreshold * 1024) { + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _bytes_usage, duration, is_player, static_cast(*this)); } + GET_CONFIG(uint32_t, continue_push_ms, General::kContinuePushMS); + if (_push_src && continue_push_ms) { + //取消所有权 + _push_src_ownership = nullptr; + //延时10秒注销流 + auto push_src = std::move(_push_src); + getPoller()->doDelayTask(continue_push_ms, [push_src]() { return 0; }); + } } void RtspSession::onManager() { - GET_CONFIG(uint32_t,handshake_sec,Rtsp::kHandshakeSecond); - GET_CONFIG(uint32_t,keep_alive_sec,Rtsp::kKeepAliveSecond); + GET_CONFIG(uint32_t, handshake_sec, Rtsp::kHandshakeSecond); + GET_CONFIG(uint32_t, keep_alive_sec, Rtsp::kKeepAliveSecond); if (_alive_ticker.createdTime() > handshake_sec * 1000) { if (_sessionid.size() == 0) { @@ -198,20 +206,6 @@ void RtspSession::handleReq_Options(const Parser &parser) { } void RtspSession::handleReq_ANNOUNCE(const Parser &parser) { - auto src = dynamic_pointer_cast(MediaSource::find(RTSP_SCHEMA, - _media_info._vhost, - _media_info._app, - _media_info._streamid)); - if (src) { - sendRtspResponse("406 Not Acceptable", {"Content-Type", "text/plain"}, "Already publishing."); - string err = StrPrinter << "ANNOUNCE:" - << "Already publishing:" - << _media_info._vhost << " " - << _media_info._app << " " - << _media_info._streamid << endl; - throw SockException(Err_shutdown, err); - } - auto full_url = parser.FullUrl(); _content_base = full_url; if (end_with(full_url, ".sdp")) { @@ -227,21 +221,50 @@ void RtspSession::handleReq_ANNOUNCE(const Parser &parser) { throw SockException(Err_shutdown, StrPrinter << err << ":" << full_url); } - auto onRes = [this, parser, full_url](const string &err, bool enableHls, bool enableMP4){ - bool authSuccess = err.empty(); - if (!authSuccess) { - sendRtspResponse("401 Unauthorized", {"Content-Type", "text/plain"}, err); + auto onRes = [this, parser, full_url](const string &err, bool enableHls, bool enableMP4) { + if (!err.empty()) { + sendRtspResponse("401 Unauthorized", { "Content-Type", "text/plain" }, err); shutdown(SockException(Err_shutdown, StrPrinter << "401 Unauthorized:" << err)); return; } + assert(!_push_src); + auto src = MediaSource::find(RTSP_SCHEMA, _media_info._vhost, _media_info._app, _media_info._streamid); + auto push_failed = (bool)src; + + while (src) { + //尝试断连后继续推流 + auto rtsp_src = dynamic_pointer_cast(src); + if (!rtsp_src) { + //源不是rtmp推流产生的 + break; + } + auto ownership = rtsp_src->getOwnership(); + if (!ownership) { + //获取推流源所有权失败 + break; + } + _push_src = std::move(rtsp_src); + _push_src_ownership = std::move(ownership); + push_failed = false; + break; + } + + if (push_failed) { + sendRtspResponse("406 Not Acceptable", { "Content-Type", "text/plain" }, "Already publishing."); + string err = StrPrinter << "ANNOUNCE:" + << "Already publishing:" << _media_info._vhost << " " << _media_info._app << " " + << _media_info._streamid << endl; + throw SockException(Err_shutdown, err); + } + SdpParser sdpParser(parser.Content()); _sessionid = makeRandStr(12); _sdp_track = sdpParser.getAvailableTrack(); if (_sdp_track.empty()) { - //sdp无效 + // sdp无效 static constexpr auto err = "sdp中无有效track"; - sendRtspResponse("403 Forbidden", {"Content-Type", "text/plain"}, err); + sendRtspResponse("403 Forbidden", { "Content-Type", "text/plain" }, err); shutdown(SockException(Err_shutdown, StrPrinter << err << ":" << full_url)); return; } @@ -249,11 +272,16 @@ void RtspSession::handleReq_ANNOUNCE(const Parser &parser) { for (auto &track : _sdp_track) { _rtcp_context.emplace_back(std::make_shared()); } - auto push_src = std::make_shared(_media_info._vhost, _media_info._app, _media_info._streamid); - push_src->setListener(dynamic_pointer_cast(shared_from_this())); - push_src->setProtocolTranslation(enableHls, enableMP4); - push_src->setSdp(parser.Content()); - _push_src = std::move(push_src); + + if (!_push_src) { + _push_src = std::make_shared(_media_info._vhost, _media_info._app, _media_info._streamid); + //获取所有权 + _push_src_ownership = _push_src->getOwnership(); + _push_src->setProtocolTranslation(enableHls, enableMP4); + _push_src->setSdp(parser.Content()); + } + + _push_src->setListener(dynamic_pointer_cast(shared_from_this())); sendRtspResponse("200 OK"); }; diff --git a/src/Rtsp/RtspSession.h b/src/Rtsp/RtspSession.h index 9911b0d4..87712453 100644 --- a/src/Rtsp/RtspSession.h +++ b/src/Rtsp/RtspSession.h @@ -185,7 +185,9 @@ private: //url解析后保存的相关信息 MediaInfo _media_info; //rtsp推流相关绑定的源 - RtspMediaSource::Ptr _push_src; + RtspMediaSourceImp::Ptr _push_src; + //推流器所有权 + std::shared_ptr _push_src_ownership; //rtsp播放器绑定的直播源 std::weak_ptr _play_src; //直播源读取器