diff --git a/api/source/mk_media.cpp b/api/source/mk_media.cpp index 93a0929b..a3462b25 100755 --- a/api/source/mk_media.cpp +++ b/api/source/mk_media.cpp @@ -72,7 +72,7 @@ protected: } //请在回调中调用mk_media_release函数释放资源,否则MediaSource::close()操作不会生效 _on_close(_on_close_data); - WarnL << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force; + WarnL << "close media:" << sender.getUrl() << " " << force; return true; } diff --git a/server/WebApi.cpp b/server/WebApi.cpp index d72dea53..0bf0d224 100755 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -312,6 +312,14 @@ static inline string getPusherKey(const string &schema, const string &vhost, con return schema + "/" + vhost + "/" + app + "/" + stream + "/" + MD5(dst_url).hexdigest(); } +static void fillSockInfo(Value& val, SockInfo* info) { + val["peer_ip"] = info->get_peer_ip(); + val["peer_port"] = info->get_peer_port(); + val["local_port"] = info->get_local_port(); + val["local_ip"] = info->get_local_ip(); + val["identifier"] = info->getIdentifier(); +} + Value makeMediaSourceJson(MediaSource &media){ Value item; item["schema"] = media.getSchema(); @@ -330,11 +338,7 @@ Value makeMediaSourceJson(MediaSource &media){ item["isRecordingHLS"] = media.isRecording(Recorder::type_hls); auto originSock = media.getOriginSock(); if (originSock) { - item["originSock"]["local_ip"] = originSock->get_local_ip(); - item["originSock"]["local_port"] = originSock->get_local_port(); - item["originSock"]["peer_ip"] = originSock->get_peer_ip(); - item["originSock"]["peer_port"] = originSock->get_peer_port(); - item["originSock"]["identifier"] = originSock->getIdentifier(); + fillSockInfo(item["originSock"], originSock.get()); } else { item["originSock"] = Json::nullValue; } @@ -782,9 +786,7 @@ void installWebApi() { [](std::shared_ptr &&info) -> std::shared_ptr { auto obj = std::make_shared(); auto session = static_pointer_cast(info); - (*obj)["peer_ip"] = session->get_peer_ip(); - (*obj)["peer_port"] = session->get_peer_port(); - (*obj)["id"] = session->getIdentifier(); + fillSockInfo(*obj, session.get()); (*obj)["typeid"] = toolkit::demangle(typeid(*session).name()); return obj; }); @@ -868,10 +870,7 @@ void installWebApi() { if(!peer_ip.empty() && peer_ip != session->get_peer_ip()){ return; } - jsession["peer_ip"] = session->get_peer_ip(); - jsession["peer_port"] = session->get_peer_port(); - jsession["local_ip"] = session->get_local_ip(); - jsession["local_port"] = session->get_local_port(); + fillSockInfo(jsession, session.get()); jsession["id"] = id; jsession["typeid"] = toolkit::demangle(typeid(*session).name()); val["data"].append(jsession); @@ -1137,10 +1136,7 @@ void installWebApi() { return; } val["exist"] = true; - val["peer_ip"] = process->get_peer_ip(); - val["peer_port"] = process->get_peer_port(); - val["local_port"] = process->get_local_port(); - val["local_ip"] = process->get_local_ip(); + fillSockInfo(val, process.get()); }); api_regist("/index/api/openRtpServer",[](API_ARGS_MAP){ diff --git a/src/Common/MediaSource.cpp b/src/Common/MediaSource.cpp index 810aead7..ea2ee033 100644 --- a/src/Common/MediaSource.cpp +++ b/src/Common/MediaSource.cpp @@ -13,6 +13,7 @@ #include "Util/util.h" #include "Network/sockutil.h" #include "Network/TcpSession.h" +#include "Util/NoticeCenter.h" using namespace std; using namespace toolkit; @@ -24,7 +25,11 @@ namespace toolkit { namespace mediakit { static recursive_mutex s_media_source_mtx; -static MediaSource::SchemaVhostAppStreamMap s_media_source_map; +using StreamMap = unordered_map >; +using AppStreamMap = unordered_map; +using VhostAppStreamMap = unordered_map; +using SchemaVhostAppStreamMap = unordered_map; +static SchemaVhostAppStreamMap s_media_source_map; string getOriginTypeString(MediaOriginType type){ #define SWITCH_CASE(type) case MediaOriginType::type : return #type @@ -43,10 +48,6 @@ string getOriginTypeString(MediaOriginType type){ } } -static string getOriginUrl_l(const MediaSource *thiz) { - return thiz->getSchema() + "://" + thiz->getVhost() + "/" + thiz->getApp() + "/" + thiz->getId(); -} - ////////////////////////////////////////////////////////////////////////////////////////////////////////////// struct MediaSourceNull : public MediaSource { MediaSourceNull() : MediaSource("schema", "vhost", "app", "stream") {}; @@ -109,16 +110,12 @@ std::shared_ptr MediaSource::getOwnership() { } int MediaSource::getBytesSpeed(TrackType type){ - if(type == TrackInvalid){ + if(type == TrackInvalid || type == TrackMax){ return _speed[TrackVideo].getSpeed() + _speed[TrackAudio].getSpeed(); } return _speed[type].getSpeed(); } -uint64_t MediaSource::getCreateStamp() const { - return _create_stamp; -} - uint64_t MediaSource::getAliveSecond() const { //使用Ticker对象获取存活时间的目的是防止修改系统时间导致回退 return _ticker.createdTime() / 1000; @@ -140,6 +137,7 @@ std::weak_ptr MediaSource::getListener(bool next) const{ if (!next) { return _listener; } + auto listener = dynamic_pointer_cast(_listener.lock()); if (!listener) { //不是MediaSourceEventInterceptor对象或者对象已经销毁 @@ -170,13 +168,13 @@ MediaOriginType MediaSource::getOriginType() const { string MediaSource::getOriginUrl() const { auto listener = _listener.lock(); if (!listener) { - return getOriginUrl_l(this); + return getUrl(); } auto ret = listener->getOriginUrl(const_cast(*this)); if (!ret.empty()) { return ret; } - return getOriginUrl_l(this); + return getUrl(); } std::shared_ptr MediaSource::getOriginSock() const { @@ -253,7 +251,7 @@ void MediaSource::onReaderChanged(int size) { bool MediaSource::setupRecord(Recorder::type type, bool start, const string &custom_path, size_t max_second){ auto listener = _listener.lock(); if (!listener) { - WarnL << "未设置MediaSource的事件监听者,setupRecord失败:" << getSchema() << "/" << getVhost() << "/" << getApp() << "/" << getId(); + WarnL << "未设置MediaSource的事件监听者,setupRecord失败:" << getUrl(); return false; } return listener->setupRecord(*this, type, start, custom_path, max_second); @@ -337,7 +335,7 @@ void MediaSource::for_each_media(const function &cb, static MediaSource::Ptr find_l(const string &schema, const string &vhost_in, const string &app, const string &id, bool from_mp4) { string vhost = vhost_in; - GET_CONFIG(bool,enableVhost,General::kEnableVhost); + GET_CONFIG(bool, enableVhost, General::kEnableVhost); if(vhost.empty() || !enableVhost){ vhost = DEFAULT_VHOST; } @@ -351,7 +349,7 @@ static MediaSource::Ptr find_l(const string &schema, const string &vhost_in, con MediaSource::for_each_media([&](const MediaSource::Ptr &src) { ret = std::move(const_cast(src)); }, schema, vhost, app, id); if(!ret && from_mp4 && schema != HLS_SCHEMA){ - //未查找媒体源,则读取mp4创建一个 + //未找到媒体源,则读取mp4创建一个 //播放hls不触发mp4点播(因为HLS也可以用于录像,不是纯粹的直播) ret = MediaSource::createFromMP4(schema, vhost, app, id); } @@ -379,7 +377,7 @@ static void findAsync_l(const MediaInfo &info, const std::shared_ptr &s }; auto on_timeout = poller->doDelayTask(maxWaitMS, [cb_once, listener_tag]() { - //最多等待一定时间,如果这个时间内,流未注册上,那么返回未找到流 + // 最多等待一定时间,如在这个时间内,流还未注册上,则返回空 NoticeCenter::Instance().delListener(listener_tag, Broadcast::kBroadcastMediaChanged); cb_once(nullptr); return 0; @@ -402,17 +400,15 @@ static void findAsync_l(const MediaInfo &info, const std::shared_ptr &s //不是自己感兴趣的事件,忽略之 return; } + poller->async([weak_session, cancel_all, info, cb_once]() { cancel_all(); - auto strong_session = weak_session.lock(); - if (!strong_session) { - //自己已经销毁 - return; + if (auto strong_session = weak_session.lock()) { + //播发器请求的流终于注册上了,切换到自己的线程再回复 + DebugL << "收到媒体注册事件,回复播放器:" << info.getUrl(); + //再找一遍媒体源,一般能找到 + findAsync_l(info, strong_session, false, cb_once); } - //播发器请求的流终于注册上了,切换到自己的线程再回复 - DebugL << "收到媒体注册事件,回复播放器:" << info._schema << "/" << info._vhost << "/" << info._app << "/" << info._streamid; - //再找一遍媒体源,一般能找到 - findAsync_l(info, strong_session, false, cb_once); }, false); }; @@ -458,7 +454,7 @@ void MediaSource::emitEvent(bool regist){ } //触发广播 NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaChanged, regist, *this); - InfoL << (regist ? "媒体注册:" : "媒体注销:") << _schema << " " << _vhost << " " << _app << " " << _stream_id; + InfoL << (regist ? "媒体注册:" : "媒体注销:") << getUrl(); } void MediaSource::regist() { @@ -472,7 +468,7 @@ void MediaSource::regist() { return; } //增加判断, 防止当前流已注册时再次注册 - throw std::invalid_argument("media source already existed:" + _schema + "/" + _vhost + "/" + _app + "/" + _stream_id); + throw std::invalid_argument("media source already existed:" + getUrl()); } ref = shared_from_this(); } @@ -519,9 +515,9 @@ bool MediaSource::unregist() { /////////////////////////////////////MediaInfo////////////////////////////////////// -void MediaInfo::parse(const string &url_in){ +void MediaInfo::parse(const std::string &url_in){ _full_url = url_in; - string url = url_in; + auto url = url_in; auto pos = url.find("?"); if (pos != string::npos) { _param_strs = url.substr(pos + 1); @@ -621,11 +617,7 @@ void MediaSourceEvent::onReaderChanged(MediaSource &sender, int size){ NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastStreamNoneReader, *strong_sender); } else { //这个是mp4点播,我们自动关闭 - WarnL << "MP4点播无人观看,自动关闭:" - << strong_sender->getSchema() << "/" - << strong_sender->getVhost() << "/" - << strong_sender->getApp() << "/" - << strong_sender->getId(); + WarnL << "MP4点播无人观看,自动关闭:" << strong_sender->getUrl(); strong_sender->close(false); } return false; @@ -633,7 +625,7 @@ void MediaSourceEvent::onReaderChanged(MediaSource &sender, int size){ } string MediaSourceEvent::getOriginUrl(MediaSource &sender) const { - return getOriginUrl_l(&sender); + return sender.getUrl(); } MediaOriginType MediaSourceEventInterceptor::getOriginType(MediaSource &sender) const { diff --git a/src/Common/MediaSource.h b/src/Common/MediaSource.h index 7c7a96b1..2b67773b 100644 --- a/src/Common/MediaSource.h +++ b/src/Common/MediaSource.h @@ -19,13 +19,8 @@ #include #include "Common/config.h" #include "Common/Parser.h" -#include "Util/logger.h" -#include "Util/TimeTicker.h" -#include "Util/NoticeCenter.h" #include "Util/List.h" #include "Network/Socket.h" -#include "Rtsp/Rtsp.h" -#include "Rtmp/Rtmp.h" #include "Extension/Track.h" #include "Record/Recorder.h" @@ -85,7 +80,7 @@ public: // 通知观看人数变化 virtual void onReaderChanged(MediaSource &sender, int size); //流注册或注销事件 - virtual void onRegist(MediaSource &sender, bool regist) {}; + virtual void onRegist(MediaSource &sender, bool regist) {} // 获取丢包率 virtual float getLossRate(MediaSource &sender, TrackType type) { return -1; } // 获取所在线程, 此函数一般强制重载 @@ -95,7 +90,7 @@ public: // 开启或关闭录制 virtual bool setupRecord(MediaSource &sender, Recorder::type type, bool start, const std::string &custom_path, size_t max_second) { return false; }; // 获取录制状态 - virtual bool isRecording(MediaSource &sender, Recorder::type type) { return false; }; + virtual bool isRecording(MediaSource &sender, Recorder::type type) { return false; } // 获取所有track相关信息 virtual std::vector getMediaTracks(MediaSource &sender, bool trackReady = true) const { return std::vector(); }; @@ -180,7 +175,12 @@ public: MediaInfo() {} MediaInfo(const std::string &url) { parse(url); } void parse(const std::string &url); - + std::string shortUrl() const { + return _vhost + "/" + _app + "/" + _streamid; + } + std::string getUrl() const { + return _schema + "://" + shortUrl(); + } public: std::string _full_url; std::string _schema; @@ -199,10 +199,6 @@ class MediaSource: public TrackSource, public std::enable_shared_from_this; - using StreamMap = std::unordered_map >; - using AppStreamMap = std::unordered_map; - using VhostAppStreamMap = std::unordered_map; - using SchemaVhostAppStreamMap = std::unordered_map; MediaSource(const std::string &schema, const std::string &vhost, const std::string &app, const std::string &stream_id) ; virtual ~MediaSource(); @@ -218,6 +214,13 @@ public: // 流id const std::string& getId() const; + std::string shortUrl() const { + return _vhost + "/" + _app + "/" + _stream_id; + } + std::string getUrl() const { + return _schema + "://" + shortUrl(); + } + //获取对象所有权 std::shared_ptr getOwnership(); @@ -232,7 +235,7 @@ public: // 获取数据速率,单位bytes/s int getBytesSpeed(TrackType type = TrackInvalid); // 获取流创建GMT unix时间戳,单位秒 - uint64_t getCreateStamp() const; + uint64_t getCreateStamp() const {return _create_stamp;} // 获取流上线时间,单位秒 uint64_t getAliveSecond() const; @@ -288,8 +291,11 @@ public: // 同步查找流 static Ptr find(const std::string &schema, const std::string &vhost, const std::string &app, const std::string &id, bool from_mp4 = false); + static Ptr find(const MediaInfo &info, bool from_mp4 = false) { + return find(info._schema, info._vhost, info._app, info._streamid, from_mp4); + } - // 忽略类型,同步查找流,可能返回rtmp/rtsp/hls类型 + // 忽略schema,同步查找流,可能返回rtmp/rtsp/hls类型 static Ptr find(const std::string &vhost, const std::string &app, const std::string &stream_id, bool from_mp4 = false); // 异步查找流 @@ -335,6 +341,7 @@ public: bool isFlushAble(bool is_video, bool is_key, uint64_t new_stamp, size_t cache_size); private: + // 音视频的最后时间戳 uint64_t _last_stamp[2] = {0, 0}; }; diff --git a/src/Http/HttpSession.cpp b/src/Http/HttpSession.cpp index af2eeaab..a0d3ad28 100644 --- a/src/Http/HttpSession.cpp +++ b/src/Http/HttpSession.cpp @@ -103,9 +103,7 @@ void HttpSession::onError(const SockException& err) { //flv/ts播放器 uint64_t duration = _ticker.createdTime() / 1000; WarnP(this) << "FLV/TS/FMP4播放器(" - << _mediaInfo._vhost << "/" - << _mediaInfo._app << "/" - << _mediaInfo._streamid + << _mediaInfo.shortUrl() << ")断开:" << err.what() << ",耗时(s):" << duration; diff --git a/src/Player/PlayerProxy.cpp b/src/Player/PlayerProxy.cpp index ec0eb210..d3f51d54 100644 --- a/src/Player/PlayerProxy.cpp +++ b/src/Player/PlayerProxy.cpp @@ -160,7 +160,7 @@ bool PlayerProxy::close(MediaSource &sender, bool force) { strongSelf->teardown(); }); _on_close(SockException(Err_shutdown, "closed by user")); - WarnL << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force; + WarnL << sender.getUrl() << " " << force; return true; } diff --git a/src/Record/HlsMediaSource.cpp b/src/Record/HlsMediaSource.cpp index 63c0085b..adcc909a 100644 --- a/src/Record/HlsMediaSource.cpp +++ b/src/Record/HlsMediaSource.cpp @@ -40,8 +40,7 @@ HlsCookieData::~HlsCookieData() { if (*_added) { uint64_t duration = (_ticker.createdTime() - _ticker.elapsedTime()) / 1000; WarnL << _sock_info->getIdentifier() << "(" << _sock_info->get_peer_ip() << ":" << _sock_info->get_peer_port() - << ") " << "HLS播放器(" << _info._vhost << "/" << _info._app << "/" << _info._streamid - << ")断开,耗时(s):" << duration; + << ") " << "HLS播放器(" << _info.shortUrl() << ")断开,耗时(s):" << duration; GET_CONFIG(uint32_t, iFlowThreshold, General::kFlowThreshold); uint64_t bytes = _bytes.load(); diff --git a/src/Record/MP4Reader.cpp b/src/Record/MP4Reader.cpp index f58299fa..10377092 100644 --- a/src/Record/MP4Reader.cpp +++ b/src/Record/MP4Reader.cpp @@ -235,7 +235,7 @@ bool MP4Reader::close(MediaSource &sender, bool force) { return false; } _timer.reset(); - WarnL << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force; + WarnL << sender.getUrl() << " " << force; return true; } diff --git a/src/Record/MP4Recorder.cpp b/src/Record/MP4Recorder.cpp index 35dbeaa0..d527bb27 100644 --- a/src/Record/MP4Recorder.cpp +++ b/src/Record/MP4Recorder.cpp @@ -63,6 +63,7 @@ void MP4Recorder::createFile() { WarnL << ex.what(); } } + void MP4Recorder::asyncClose() { auto muxer = _muxer; auto full_path_tmp = _full_path_tmp; diff --git a/src/Rtmp/RtmpSession.cpp b/src/Rtmp/RtmpSession.cpp index aa9d5e9c..a5323853 100644 --- a/src/Rtmp/RtmpSession.cpp +++ b/src/Rtmp/RtmpSession.cpp @@ -31,9 +31,7 @@ void RtmpSession::onError(const SockException& err) { bool is_player = !_push_src_ownership; uint64_t duration = _ticker.createdTime() / 1000; WarnP(this) << (is_player ? "RTMP播放器(" : "RTMP推流器(") - << _media_info._vhost << "/" - << _media_info._app << "/" - << _media_info._streamid + << _media_info.shortUrl() << ")断开:" << err.what() << ",耗时(s):" << duration; @@ -256,10 +254,7 @@ void RtmpSession::sendPlayResponse(const string &err, const RtmpMediaSource::Ptr "clientid", "0" }); if (!ok) { - string err_msg = StrPrinter << (auth_success ? "no such stream:" : err.data()) << " " - << _media_info._vhost << " " - << _media_info._app << " " - << _media_info._streamid; + string err_msg = StrPrinter << (auth_success ? "no such stream:" : err.data()) << " " << _media_info.shortUrl(); shutdown(SockException(Err_shutdown, err_msg)); return; } @@ -579,7 +574,7 @@ bool RtmpSession::close(MediaSource &sender,bool force) { if(!_push_src || (!force && _push_src->totalReaderCount())){ return false; } - string err = StrPrinter << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force; + string err = StrPrinter << "close media:" << sender.getUrl() << " " << force; safeShutdown(SockException(Err_shutdown,err)); return true; } @@ -619,6 +614,6 @@ void RtmpSession::dumpMetadata(const AMFValue &metadata) { metadata.object_for_each([&](const string &key, const AMFValue &val) { printer << "\r\n" << key << "\t:" << val.to_string(); }); - InfoL << _media_info._vhost << " " << _media_info._app << " " << _media_info._streamid << (string) printer; + InfoL << _media_info.shortUrl() << (string) printer; } } /* namespace mediakit */ diff --git a/src/Rtp/RtpProcess.cpp b/src/Rtp/RtpProcess.cpp index 0e1822f8..df4e0900 100644 --- a/src/Rtp/RtpProcess.cpp +++ b/src/Rtp/RtpProcess.cpp @@ -52,9 +52,7 @@ RtpProcess::RtpProcess(const string &stream_id) { RtpProcess::~RtpProcess() { uint64_t duration = (_last_frame_time.createdTime() - _last_frame_time.elapsedTime()) / 1000; WarnP(this) << "RTP推流器(" - << _media_info._vhost << "/" - << _media_info._app << "/" - << _media_info._streamid + << _media_info.shortUrl() << ")断开,耗时(s):" << duration; //流量统计事件广播 @@ -272,7 +270,7 @@ MediaOriginType RtpProcess::getOriginType(MediaSource &sender) const{ } string RtpProcess::getOriginUrl(MediaSource &sender) const { - return _media_info._schema + "://" + _media_info._vhost + "/" + _media_info._app + "/" + _media_info._streamid; + return _media_info.getUrl(); } std::shared_ptr RtpProcess::getOriginSock(MediaSource &sender) const { diff --git a/src/Rtp/RtpSelector.cpp b/src/Rtp/RtpSelector.cpp index 507ec2fc..e60e7e10 100644 --- a/src/Rtp/RtpSelector.cpp +++ b/src/Rtp/RtpSelector.cpp @@ -144,7 +144,7 @@ bool RtpProcessHelper::close(MediaSource &sender, bool force) { return false; } parent->delProcess(_stream_id, _process.get()); - WarnL << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force; + WarnL << "close media:" << sender.getUrl() << " " << force; return true; } diff --git a/src/Rtp/RtpSession.cpp b/src/Rtp/RtpSession.cpp index 4b3213e6..8c9de0c3 100644 --- a/src/Rtp/RtpSession.cpp +++ b/src/Rtp/RtpSession.cpp @@ -129,7 +129,7 @@ bool RtpSession::close(MediaSource &sender, bool force) { if(!_process || (!force && static_pointer_cast(_process)->totalReaderCount(sender))){ return false; } - string err = StrPrinter << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force; + string err = StrPrinter << "close media:" << sender.getUrl() << " " << force; safeShutdown(SockException(Err_shutdown,err)); return true; } diff --git a/src/Rtsp/RtspSession.cpp b/src/Rtsp/RtspSession.cpp index 91a51dc7..196d222f 100644 --- a/src/Rtsp/RtspSession.cpp +++ b/src/Rtsp/RtspSession.cpp @@ -63,9 +63,7 @@ void RtspSession::onError(const SockException &err) { bool is_player = !_push_src_ownership; uint64_t duration = _alive_ticker.createdTime() / 1000; WarnP(this) << (is_player ? "RTSP播放器(" : "RTSP推流器(") - << _media_info._vhost << "/" - << _media_info._app << "/" - << _media_info._streamid + << _media_info.shortUrl() << ")断开:" << err.what() << ",耗时(s):" << duration; @@ -249,9 +247,7 @@ void RtspSession::handleReq_ANNOUNCE(const Parser &parser) { if (push_failed) { sendRtspResponse("406 Not Acceptable", { "Content-Type", "text/plain" }, "Already publishing."); - string err = StrPrinter << "ANNOUNCE:" - << "Already publishing:" << _media_info._vhost << " " << _media_info._app << " " - << _media_info._streamid << endl; + string err = StrPrinter << "ANNOUNCE: Already publishing:" << _media_info.shortUrl() << endl; throw SockException(Err_shutdown, err); } @@ -417,7 +413,7 @@ void RtspSession::onAuthSuccess() { auto rtsp_src = dynamic_pointer_cast(src); if (!rtsp_src) { //未找到相应的MediaSource - string err = StrPrinter << "no such stream:" << strong_self->_media_info._vhost << " " << strong_self->_media_info._app << " " << strong_self->_media_info._streamid; + string err = StrPrinter << "no such stream:" << strong_self->_media_info.shortUrl(); strong_self->send_StreamNotFound(); strong_self->shutdown(SockException(Err_shutdown,err)); return; @@ -1134,7 +1130,7 @@ bool RtspSession::close(MediaSource &sender, bool force) { if(!_push_src || (!force && _push_src->totalReaderCount())){ return false; } - string err = StrPrinter << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force; + string err = StrPrinter << "close media:" << sender.getUrl() << " " << force; safeShutdown(SockException(Err_shutdown,err)); return true; } diff --git a/src/Shell/ShellCMD.h b/src/Shell/ShellCMD.h index 95c99a20..2e283b00 100644 --- a/src/Shell/ShellCMD.h +++ b/src/Shell/ShellCMD.h @@ -23,12 +23,7 @@ public: MediaSource::for_each_media([&](const MediaSource::Ptr &media) { if (ini.find("list") != ini.end()) { //列出源 - (*stream) << "\t" - << media->getSchema() << "/" - << media->getVhost() << "/" - << media->getApp() << "/" - << media->getId() - << "\r\n"; + (*stream) << "\t" << media->getUrl() << "\r\n"; return; } @@ -42,20 +37,10 @@ public: if (!media->close(true)) { break; } - (*stream) << "\t踢出成功:" - << media->getSchema() << "/" - << media->getVhost() << "/" - << media->getApp() << "/" - << media->getId() - << "\r\n"; + (*stream) << "\t踢出成功:" << media->getUrl() << "\r\n"; return; } while (0); - (*stream) << "\t踢出失败:" - << media->getSchema() << "/" - << media->getVhost() << "/" - << media->getApp() << "/" - << media->getId() - << "\r\n"; + (*stream) << "\t踢出失败:" << media->getUrl() << "\r\n"; } }, false); diff --git a/srt/SrtTransportImp.cpp b/srt/SrtTransportImp.cpp index 036aa0b7..9147f573 100644 --- a/srt/SrtTransportImp.cpp +++ b/srt/SrtTransportImp.cpp @@ -10,8 +10,7 @@ SrtTransportImp::SrtTransportImp(const EventPoller::Ptr &poller) SrtTransportImp::~SrtTransportImp() { InfoP(this); uint64_t duration = _alive_ticker.createdTime() / 1000; - WarnP(this) << (_is_pusher ? "srt 推流器(" : "srt 播放器(") << _media_info._vhost << "/" << _media_info._app << "/" - << _media_info._streamid << ")断开,耗时(s):" << duration; + WarnP(this) << (_is_pusher ? "srt 推流器(" : "srt 播放器(") << _media_info.shortUrl() << ")断开,耗时(s):" << duration; // 流量统计事件广播 GET_CONFIG(uint32_t, iFlowThreshold, General::kFlowThreshold); @@ -89,8 +88,7 @@ bool SrtTransportImp::parseStreamid(std::string &streamid) { _media_info._app = app; _media_info._streamid = stream_name; - TraceL << " vhost=" << _media_info._vhost << " app=" << _media_info._app << " streamid=" << _media_info._streamid - << " params=" << _media_info._param_strs; + TraceL << " mediainfo=" << _media_info.shortUrl() << " params=" << _media_info._param_strs; return true; } @@ -115,8 +113,7 @@ bool SrtTransportImp::close(mediakit::MediaSource &sender, bool force) { if (!force && totalReaderCount(sender)) { return false; } - std::string err = StrPrinter << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" - << sender.getApp() << "/" << sender.getId() << " " << force; + std::string err = StrPrinter << "close media:" << sender.getUrl() << " " << force; weak_ptr weak_self = static_pointer_cast(shared_from_this()); getPoller()->async([weak_self, err]() { auto strong_self = weak_self.lock(); diff --git a/tests/test_server.cpp b/tests/test_server.cpp index da0d7bd1..d488e61a 100644 --- a/tests/test_server.cpp +++ b/tests/test_server.cpp @@ -90,26 +90,22 @@ static mutex s_mtxFlvRecorder; void initEventListener() { static onceToken s_token([]() { //监听kBroadcastOnGetRtspRealm事件决定rtsp链接是否需要鉴权(传统的rtsp鉴权方案)才能访问 - NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastOnGetRtspRealm, - [](BroadcastOnGetRtspRealmArgs) { - DebugL << "RTSP是否需要鉴权事件:" << args._schema << " " << args._vhost << " " - << args._app << " " << args._streamid << " " - << args._param_strs; - if (string("1") == args._streamid) { - // live/1需要认证 - //该流需要认证,并且设置realm - invoker(REALM); - } else { - //有时我们要查询redis或数据库来判断该流是否需要认证,通过invoker的方式可以做到完全异步 - //该流我们不需要认证 - invoker(""); - } - }); + NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastOnGetRtspRealm, [](BroadcastOnGetRtspRealmArgs) { + DebugL << "RTSP是否需要鉴权事件:" << args.getUrl() << " " << args._param_strs; + if (string("1") == args._streamid) { + // live/1需要认证 + //该流需要认证,并且设置realm + invoker(REALM); + } else { + //有时我们要查询redis或数据库来判断该流是否需要认证,通过invoker的方式可以做到完全异步 + //该流我们不需要认证 + invoker(""); + } + }); //监听kBroadcastOnRtspAuth事件返回正确的rtsp鉴权用户密码 NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastOnRtspAuth, [](BroadcastOnRtspAuthArgs) { - DebugL << "RTSP播放鉴权:" << args._schema << " " << args._vhost << " " << args._app << " " << args._streamid - << " " << args._param_strs; + DebugL << "RTSP播放鉴权:" << args.getUrl() << " " << args._param_strs; DebugL << "RTSP用户:" << user_name << (must_no_encrypt ? " Base64" : " MD5") << " 方式登录"; string user = user_name; //假设我们异步读取数据库 @@ -139,16 +135,14 @@ void initEventListener() { //监听rtsp/rtmp推流事件,返回结果告知是否有推流权限 NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastMediaPublish, [](BroadcastMediaPublishArgs) { - DebugL << "推流鉴权:" << args._schema << " " << args._vhost << " " << args._app << " " << args._streamid << " " - << args._param_strs; + DebugL << "推流鉴权:" << args.getUrl() << " " << args._param_strs; invoker("", ProtocolOption());//鉴权成功 //invoker("this is auth failed message");//鉴权失败 }); //监听rtsp/rtsps/rtmp/http-flv播放事件,返回结果告知是否有播放权限(rtsp通过kBroadcastOnRtspAuth或此事件都可以实现鉴权) NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastMediaPlayed, [](BroadcastMediaPlayedArgs) { - DebugL << "播放鉴权:" << args._schema << " " << args._vhost << " " << args._app << " " << args._streamid << " " - << args._param_strs; + DebugL << "播放鉴权:" << args.getUrl() << " " << args._param_strs; invoker("");//鉴权成功 //invoker("this is auth failed message");//鉴权失败 }); @@ -164,42 +158,38 @@ void initEventListener() { NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastMediaChanged, [](BroadcastMediaChangedArgs) { if (sender.getSchema() == RTMP_SCHEMA && sender.getApp() == "live") { lock_guard lck(s_mtxFlvRecorder); + auto key = sender.shortUrl(); if (bRegist) { - DebugL << "开始录制RTMP:" << sender.getSchema() << " " << sender.getVhost() << " " << sender.getApp() << " " << sender.getId(); + DebugL << "开始录制RTMP:" << sender.getUrl(); GET_CONFIG(string, http_root, Http::kRootPath); - auto path = - http_root + "/" + sender.getVhost() + "/" + sender.getApp() + "/" + sender.getId() + "_" + to_string(time(NULL)) + ".flv"; + auto path = http_root + "/" + key + "_" + to_string(time(NULL)) + ".flv"; FlvRecorder::Ptr recorder(new FlvRecorder); try { recorder->startRecord(EventPollerPool::Instance().getPoller(), dynamic_pointer_cast(sender.shared_from_this()), path); - s_mapFlvRecorder[sender.getVhost() + "/" + sender.getApp() + "/" + sender.getId()] = recorder; + s_mapFlvRecorder[key] = recorder; } catch (std::exception &ex) { WarnL << ex.what(); } } else { - s_mapFlvRecorder.erase(sender.getVhost() + "/" + sender.getApp() + "/" + sender.getId()); + s_mapFlvRecorder.erase(key); } } }); //监听播放失败(未找到特定的流)事件 - NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastNotFoundStream, - [](BroadcastNotFoundStreamArgs) { - /** - * 你可以在这个事件触发时再去拉流,这样就可以实现按需拉流 - * 拉流成功后,ZLMediaKit会把其立即转发给播放器(最大等待时间约为5秒,如果5秒都未拉流成功,播放器会播放失败) - */ - DebugL << "未找到流事件:" << args._schema << " " << args._vhost << " " - << args._app << " " << args._streamid << " " - << args._param_strs; - }); + NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastNotFoundStream, [](BroadcastNotFoundStreamArgs) { + /** + * 你可以在这个事件触发时再去拉流,这样就可以实现按需拉流 + * 拉流成功后,ZLMediaKit会把其立即转发给播放器(最大等待时间约为5秒,如果5秒都未拉流成功,播放器会播放失败) + */ + DebugL << "未找到流事件:" << args.getUrl() << " " << args._param_strs; + }); //监听播放或推流结束时消耗流量事件 NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastFlowReport, [](BroadcastFlowReportArgs) { - DebugL << "播放器(推流器)断开连接事件:" << args._schema << " " << args._vhost << " " << args._app << " " - << args._streamid << " " << args._param_strs + DebugL << "播放器(推流器)断开连接事件:" << args.getUrl() << " " << args._param_strs << "\r\n使用流量:" << totalBytes << " bytes,连接时长:" << totalDuration << "秒"; }); diff --git a/webrtc/WebRtcPlayer.cpp b/webrtc/WebRtcPlayer.cpp index 520ca79d..74f216d9 100644 --- a/webrtc/WebRtcPlayer.cpp +++ b/webrtc/WebRtcPlayer.cpp @@ -72,9 +72,7 @@ void WebRtcPlayer::onDestory() { GET_CONFIG(uint32_t, iFlowThreshold, General::kFlowThreshold); if (_reader && getSession()) { WarnL << "RTC播放器(" - << _media_info._vhost << "/" - << _media_info._app << "/" - << _media_info._streamid + << _media_info.shortUrl() << ")结束播放,耗时(s):" << duration; if (bytes_usage >= iFlowThreshold * 1024) { NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, bytes_usage, duration, diff --git a/webrtc/WebRtcPusher.cpp b/webrtc/WebRtcPusher.cpp index 9d006493..30182039 100644 --- a/webrtc/WebRtcPusher.cpp +++ b/webrtc/WebRtcPusher.cpp @@ -43,8 +43,7 @@ bool WebRtcPusher::close(MediaSource &sender, bool force) { if (!force && totalReaderCount(sender)) { return false; } - string err = StrPrinter << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" - << sender.getApp() << "/" << sender.getId() << " " << force; + string err = StrPrinter << "close media:" << sender.getUrl() << " " << force; weak_ptr weak_self = static_pointer_cast(shared_from_this()); getPoller()->async([weak_self, err]() { auto strong_self = weak_self.lock(); @@ -123,9 +122,7 @@ void WebRtcPusher::onDestory() { if (getSession()) { WarnL << "RTC推流器(" - << _media_info._vhost << "/" - << _media_info._app << "/" - << _media_info._streamid + << _media_info.shortUrl() << ")结束推流,耗时(s):" << duration; if (bytes_usage >= iFlowThreshold * 1024) { NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, bytes_usage, duration,