add MediaTuple

This commit is contained in:
Johnny 2023-05-25 16:23:24 +08:00 committed by 夏楚
parent f4ee607feb
commit 0232caf068
41 changed files with 242 additions and 273 deletions

View File

@ -61,19 +61,19 @@ API_EXPORT const char* API_CALL mk_parser_get_content(const mk_parser ctx, size_
///////////////////////////////////////////MediaInfo/////////////////////////////////////////////
//MediaInfo对象的C映射
typedef struct mk_media_info_t *mk_media_info;
//MediaInfo::_param_strs
//MediaInfo::param_strs
API_EXPORT const char* API_CALL mk_media_info_get_params(const mk_media_info ctx);
//MediaInfo::_schema
//MediaInfo::schema
API_EXPORT const char* API_CALL mk_media_info_get_schema(const mk_media_info ctx);
//MediaInfo::_vhost
//MediaInfo::vhost
API_EXPORT const char* API_CALL mk_media_info_get_vhost(const mk_media_info ctx);
//MediaInfo::_app
//MediaInfo::app
API_EXPORT const char* API_CALL mk_media_info_get_app(const mk_media_info ctx);
//MediaInfo::_streamid
//MediaInfo::stream
API_EXPORT const char* API_CALL mk_media_info_get_stream(const mk_media_info ctx);
//MediaInfo::_host
//MediaInfo::host
API_EXPORT const char* API_CALL mk_media_info_get_host(const mk_media_info ctx);
//MediaInfo::_port
//MediaInfo::port
API_EXPORT uint16_t API_CALL mk_media_info_get_port(const mk_media_info ctx);

View File

@ -126,43 +126,43 @@ API_EXPORT const char* API_CALL mk_parser_get_content(const mk_parser ctx, size_
API_EXPORT const char* API_CALL mk_media_info_get_params(const mk_media_info ctx){
assert(ctx);
MediaInfo *info = (MediaInfo *)ctx;
return info->_param_strs.c_str();
return info->param_strs.c_str();
}
API_EXPORT const char* API_CALL mk_media_info_get_schema(const mk_media_info ctx){
assert(ctx);
MediaInfo *info = (MediaInfo *)ctx;
return info->_schema.c_str();
return info->schema.c_str();
}
API_EXPORT const char* API_CALL mk_media_info_get_vhost(const mk_media_info ctx){
assert(ctx);
MediaInfo *info = (MediaInfo *)ctx;
return info->_vhost.c_str();
return info->vhost.c_str();
}
API_EXPORT const char* API_CALL mk_media_info_get_host(const mk_media_info ctx){
assert(ctx);
MediaInfo *info = (MediaInfo *)ctx;
return info->_host.c_str();
return info->host.c_str();
}
API_EXPORT uint16_t API_CALL mk_media_info_get_port(const mk_media_info ctx){
assert(ctx);
MediaInfo *info = (MediaInfo *)ctx;
return info->_port;
return info->port;
}
API_EXPORT const char* API_CALL mk_media_info_get_app(const mk_media_info ctx){
assert(ctx);
MediaInfo *info = (MediaInfo *)ctx;
return info->_app.c_str();
return info->app.c_str();
}
API_EXPORT const char* API_CALL mk_media_info_get_stream(const mk_media_info ctx){
assert(ctx);
MediaInfo *info = (MediaInfo *)ctx;
return info->_streamid.c_str();
return info->stream.c_str();
}
///////////////////////////////////////////MediaSource/////////////////////////////////////////////

View File

@ -22,7 +22,8 @@ public:
MediaHelper(const char *vhost, const char *app, const char *stream, float duration, const ProtocolOption &option) {
_poller = EventPollerPool::Instance().getPoller();
// 在poller线程中创建DevChannel(MultiMediaSourceMuxer)对象,确保严格的线程安全限制
_poller->sync([&]() { _channel = std::make_shared<DevChannel>(vhost, app, stream, duration, option); });
auto tuple = MediaTuple{vhost, app, stream};
_poller->sync([&]() { _channel = std::make_shared<DevChannel>(tuple, duration, option); });
}
~MediaHelper() = default;

View File

@ -102,9 +102,9 @@ void FFmpegSource::play(const string &ffmpeg_cmd_key, const string &src_url,cons
_process.run(cmd, log_file);
InfoL << cmd;
if (is_local_ip(_media_info._host)) {
if (is_local_ip(_media_info.host)) {
//推流给自己的,通过判断流是否注册上来判断是否正常
if(_media_info._schema != RTSP_SCHEMA && _media_info._schema != RTMP_SCHEMA){
if (_media_info.schema != RTSP_SCHEMA && _media_info.schema != RTMP_SCHEMA) {
cb(SockException(Err_other,"本服务只支持rtmp/rtsp推流"));
return;
}
@ -154,10 +154,10 @@ void FFmpegSource::play(const string &ffmpeg_cmd_key, const string &src_url,cons
}
void FFmpegSource::findAsync(int maxWaitMS, const function<void(const MediaSource::Ptr &src)> &cb) {
auto src = MediaSource::find(_media_info._schema,
_media_info._vhost,
_media_info._app,
_media_info._streamid);
auto src = MediaSource::find(_media_info.schema,
_media_info.vhost,
_media_info.app,
_media_info.stream);
if(src || !maxWaitMS){
cb(src);
return;
@ -183,10 +183,10 @@ void FFmpegSource::findAsync(int maxWaitMS, const function<void(const MediaSourc
}
if (!bRegist ||
sender.getSchema() != strongSelf->_media_info._schema ||
sender.getVhost() != strongSelf->_media_info._vhost ||
sender.getApp() != strongSelf->_media_info._app ||
sender.getId() != strongSelf->_media_info._streamid) {
sender.getSchema() != strongSelf->_media_info.schema ||
sender.getVhost() != strongSelf->_media_info.vhost ||
sender.getApp() != strongSelf->_media_info.app ||
sender.getId() != strongSelf->_media_info.stream) {
//不是自己感兴趣的事件,忽略之
return;
}
@ -223,7 +223,7 @@ void FFmpegSource::startTimer(int timeout_ms) {
return false;
}
bool needRestart = ffmpeg_restart_sec > 0 && strongSelf->_replay_ticker.elapsedTime() > ffmpeg_restart_sec * 1000;
if (is_local_ip(strongSelf->_media_info._host)) {
if (is_local_ip(strongSelf->_media_info.host)) {
//推流给自己的我们通过检查是否已经注册来判断FFmpeg是否工作正常
strongSelf->findAsync(0, [&](const MediaSource::Ptr &src) {
//同步查找流

View File

@ -1467,7 +1467,8 @@ void installWebApi() {
api_regist("/index/api/deleteRecordDirectory", [](API_ARGS_MAP) {
CHECK_SECRET();
CHECK_ARGS("vhost", "app", "stream");
auto record_path = Recorder::getRecordPath(Recorder::type_mp4, allArgs["vhost"], allArgs["app"], allArgs["stream"], allArgs["customized_path"]);
auto tuple = MediaTuple{allArgs["vhost"], allArgs["app"], allArgs["stream"]};
auto record_path = Recorder::getRecordPath(Recorder::type_mp4, tuple, allArgs["customized_path"]);
auto period = allArgs["period"];
record_path = record_path + period + "/";
int result = File::delete_file(record_path.data());
@ -1484,7 +1485,8 @@ void installWebApi() {
api_regist("/index/api/getMp4RecordFile", [](API_ARGS_MAP){
CHECK_SECRET();
CHECK_ARGS("vhost", "app", "stream");
auto record_path = Recorder::getRecordPath(Recorder::type_mp4, allArgs["vhost"], allArgs["app"], allArgs["stream"], allArgs["customized_path"]);
auto tuple = MediaTuple{allArgs["vhost"], allArgs["app"], allArgs["stream"]};
auto record_path = Recorder::getRecordPath(Recorder::type_mp4, tuple, allArgs["customized_path"]);
auto period = allArgs["period"];
//判断是获取mp4文件列表还是获取文件夹列表

View File

@ -215,11 +215,11 @@ void do_http_hook(const string &url, const ArgsType &body, const function<void(c
static ArgsType make_json(const MediaInfo &args) {
ArgsType body;
body["schema"] = args._schema;
body[VHOST_KEY] = args._vhost;
body["app"] = args._app;
body["stream"] = args._streamid;
body["params"] = args._param_strs;
body["schema"] = args.schema;
body[VHOST_KEY] = args.vhost;
body["app"] = args.app;
body["stream"] = args.stream;
body["params"] = args.param_strs;
return body;
}
@ -263,12 +263,12 @@ static const string kEdgeServerParam = "edge=1";
static string getPullUrl(const string &origin_fmt, const MediaInfo &info) {
char url[1024] = { 0 };
if ((ssize_t)origin_fmt.size() > snprintf(url, sizeof(url), origin_fmt.data(), info._app.data(), info._streamid.data())) {
if ((ssize_t)origin_fmt.size() > snprintf(url, sizeof(url), origin_fmt.data(), info.app.data(), info.stream.data())) {
WarnL << "get origin url failed, origin_fmt:" << origin_fmt;
return "";
}
// 告知源站这是来自边沿站的拉流请求,如果未找到流请立即返回拉流失败
return string(url) + '?' + kEdgeServerParam + '&' + VHOST_KEY + '=' + info._vhost + '&' + info._param_strs;
return string(url) + '?' + kEdgeServerParam + '&' + VHOST_KEY + '=' + info.vhost + '&' + info.param_strs;
}
static void pullStreamFromOrigin(const vector<string> &urls, size_t index, size_t failed_cnt, const MediaInfo &args, const function<void()> &closePlayer) {
@ -280,10 +280,10 @@ static void pullStreamFromOrigin(const vector<string> &urls, size_t index, size_
InfoL << "pull stream from origin, failed_cnt: " << failed_cnt << ", timeout_sec: " << timeout_sec << ", url: " << url;
ProtocolOption option;
option.enable_hls = option.enable_hls || (args._schema == HLS_SCHEMA);
option.enable_hls = option.enable_hls || (args.schema == HLS_SCHEMA);
option.enable_mp4 = false;
addStreamProxy(args._vhost, args._app, args._streamid, url, retry_count, option, Rtsp::RTP_TCP, timeout_sec, [=](const SockException &ex, const string &key) mutable {
addStreamProxy(args.vhost, args.app, args.stream, url, retry_count, option, Rtsp::RTP_TCP, timeout_sec, [=](const SockException &ex, const string &key) mutable {
if (!ex) {
return;
}
@ -321,7 +321,7 @@ void installWebHook() {
NoticeCenter::Instance().addListener(&web_hook_tag, Broadcast::kBroadcastMediaPublish, [](BroadcastMediaPublishArgs) {
GET_CONFIG(string, hook_publish, Hook::kOnPublish);
if (!hook_enable || args._param_strs == hook_adminparams || hook_publish.empty() || sender.get_peer_ip() == "127.0.0.1") {
if (!hook_enable || args.param_strs == hook_adminparams || hook_publish.empty() || sender.get_peer_ip() == "127.0.0.1") {
invoker("", ProtocolOption());
return;
}
@ -346,7 +346,7 @@ void installWebHook() {
NoticeCenter::Instance().addListener(&web_hook_tag, Broadcast::kBroadcastMediaPlayed, [](BroadcastMediaPlayedArgs) {
GET_CONFIG(string, hook_play, Hook::kOnPlay);
if (!hook_enable || args._param_strs == hook_adminparams || hook_play.empty() || sender.get_peer_ip() == "127.0.0.1") {
if (!hook_enable || args.param_strs == hook_adminparams || hook_play.empty() || sender.get_peer_ip() == "127.0.0.1") {
invoker("");
return;
}
@ -360,7 +360,7 @@ void installWebHook() {
NoticeCenter::Instance().addListener(&web_hook_tag, Broadcast::kBroadcastFlowReport, [](BroadcastFlowReportArgs) {
GET_CONFIG(string, hook_flowreport, Hook::kOnFlowReport);
if (!hook_enable || args._param_strs == hook_adminparams || hook_flowreport.empty() || sender.get_peer_ip() == "127.0.0.1") {
if (!hook_enable || args.param_strs == hook_adminparams || hook_flowreport.empty() || sender.get_peer_ip() == "127.0.0.1") {
return;
}
auto body = make_json(args);
@ -379,7 +379,7 @@ void installWebHook() {
// 监听kBroadcastOnGetRtspRealm事件决定rtsp链接是否需要鉴权(传统的rtsp鉴权方案)才能访问
NoticeCenter::Instance().addListener(&web_hook_tag, Broadcast::kBroadcastOnGetRtspRealm, [](BroadcastOnGetRtspRealmArgs) {
GET_CONFIG(string, hook_rtsp_realm, Hook::kOnRtspRealm);
if (!hook_enable || args._param_strs == hook_adminparams || hook_rtsp_realm.empty() || sender.get_peer_ip() == "127.0.0.1") {
if (!hook_enable || args.param_strs == hook_adminparams || hook_rtsp_realm.empty() || sender.get_peer_ip() == "127.0.0.1") {
// 无需认证
invoker("");
return;
@ -467,7 +467,7 @@ void installWebHook() {
return;
}
if (start_with(args._param_strs, kEdgeServerParam)) {
if (start_with(args.param_strs, kEdgeServerParam)) {
// 源站收到来自边沿站的溯源请求,流不存在时立即返回拉流失败
closePlayer();
return;

View File

@ -47,10 +47,8 @@ public:
using Ptr = std::shared_ptr<DevChannel>;
//fDuration<=0为直播否则为点播
DevChannel(
const std::string &vhost, const std::string &app, const std::string &stream_id, float duration = 0,
const ProtocolOption &option = ProtocolOption())
: MultiMediaSourceMuxer(vhost, app, stream_id, duration, option) {}
DevChannel(const MediaTuple& tuple, float duration = 0, const ProtocolOption &option = ProtocolOption())
: MultiMediaSourceMuxer(tuple, duration, option) {}
~DevChannel() override = default;
/**

View File

@ -105,7 +105,7 @@ ProtocolOption::ProtocolOption() {
//////////////////////////////////////////////////////////////////////////////////////////////////////////////
struct MediaSourceNull : public MediaSource {
MediaSourceNull() : MediaSource("schema", "vhost", "app", "stream") {};
MediaSourceNull() : MediaSource("schema", MediaTuple{"vhost", "app", "stream"}) {};
int readerCount() override { return 0; }
};
@ -114,16 +114,12 @@ MediaSource &MediaSource::NullMediaSource() {
return *s_null;
}
MediaSource::MediaSource(const string &schema, const string &vhost, const string &app, const string &stream_id){
MediaSource::MediaSource(const string &schema, const MediaTuple& tuple): _tuple(tuple) {
GET_CONFIG(bool, enableVhost, General::kEnableVhost);
if (!enableVhost) {
_vhost = DEFAULT_VHOST;
} else {
_vhost = vhost.empty() ? DEFAULT_VHOST : vhost;
if (!enableVhost || _tuple.vhost.empty()) {
_tuple.vhost = DEFAULT_VHOST;
}
_schema = schema;
_app = app;
_stream_id = stream_id;
_create_stamp = time(NULL);
}
@ -136,16 +132,16 @@ const string& MediaSource::getSchema() const {
}
const string& MediaSource::getVhost() const {
return _vhost;
return _tuple.vhost;
}
const string& MediaSource::getApp() const {
//获取该源的id
return _app;
return _tuple.app;
}
const string& MediaSource::getId() const {
return _stream_id;
return _tuple.stream;
}
std::shared_ptr<void> MediaSource::getOwnership() {
@ -424,7 +420,7 @@ static MediaSource::Ptr find_l(const string &schema, const string &vhost_in, con
static void findAsync_l(const MediaInfo &info, const std::shared_ptr<Session> &session, bool retry,
const function<void(const MediaSource::Ptr &src)> &cb){
auto src = find_l(info._schema, info._vhost, info._app, info._streamid, true);
auto src = find_l(info.schema, info.vhost, info.app, info.stream, true);
if (src || !retry) {
cb(src);
return;
@ -459,10 +455,10 @@ static void findAsync_l(const MediaInfo &info, const std::shared_ptr<Session> &s
weak_ptr<Session> weak_session = session;
auto on_register = [weak_session, info, cb_once, cancel_all, poller](BroadcastMediaChangedArgs) {
if (!bRegist ||
sender.getSchema() != info._schema ||
sender.getVhost() != info._vhost ||
sender.getApp() != info._app ||
sender.getId() != info._streamid) {
sender.getSchema() != info.schema ||
sender.getVhost() != info.vhost ||
sender.getApp() != info.app ||
sender.getId() != info.stream) {
//不是自己感兴趣的事件,忽略之
return;
}
@ -527,7 +523,7 @@ void MediaSource::regist() {
{
//减小互斥锁临界区
lock_guard<recursive_mutex> lock(s_media_source_mtx);
auto &ref = s_media_source_map[_schema][_vhost][_app][_stream_id];
auto &ref = s_media_source_map[_schema][_tuple.vhost][_tuple.app][_tuple.stream];
auto src = ref.lock();
if (src) {
if (src.get() == this) {
@ -570,7 +566,7 @@ bool MediaSource::unregist() {
{
//减小互斥锁临界区
lock_guard<recursive_mutex> lock(s_media_source_mtx);
erase_media_source(ret, this, s_media_source_map, _schema, _vhost, _app, _stream_id);
erase_media_source(ret, this, s_media_source_map, _schema, _tuple.vhost, _tuple.app, _tuple.stream);
}
if (ret) {
@ -582,31 +578,31 @@ bool MediaSource::unregist() {
/////////////////////////////////////MediaInfo//////////////////////////////////////
void MediaInfo::parse(const std::string &url_in){
_full_url = url_in;
full_url = url_in;
auto url = url_in;
auto pos = url.find("?");
if (pos != string::npos) {
_param_strs = url.substr(pos + 1);
param_strs = url.substr(pos + 1);
url.erase(pos);
}
auto schema_pos = url.find("://");
if (schema_pos != string::npos) {
_schema = url.substr(0, schema_pos);
schema = url.substr(0, schema_pos);
} else {
schema_pos = -3;
}
auto split_vec = split(url.substr(schema_pos + 3), "/");
if (split_vec.size() > 0) {
splitUrl(split_vec[0], _host, _port);
_vhost = _host;
if (_vhost == "localhost" || isIP(_vhost.data())) {
splitUrl(split_vec[0], host, port);
vhost = host;
if (vhost == "localhost" || isIP(vhost.data())) {
//如果访问的是localhost或ip那么则为默认虚拟主机
_vhost = DEFAULT_VHOST;
vhost = DEFAULT_VHOST;
}
}
if (split_vec.size() > 1) {
_app = split_vec[1];
app = split_vec[1];
}
if (split_vec.size() > 2) {
string stream_id;
@ -616,18 +612,18 @@ void MediaInfo::parse(const std::string &url_in){
if (stream_id.back() == '/') {
stream_id.pop_back();
}
_streamid = stream_id;
stream = stream_id;
}
auto params = Parser::parseArgs(_param_strs);
auto params = Parser::parseArgs(param_strs);
if (params.find(VHOST_KEY) != params.end()) {
_vhost = params[VHOST_KEY];
vhost = params[VHOST_KEY];
}
GET_CONFIG(bool, enableVhost, General::kEnableVhost);
if (!enableVhost || _vhost.empty()) {
if (!enableVhost || vhost.empty()) {
//如果关闭虚拟主机或者虚拟主机为空,则设置虚拟主机为默认
_vhost = DEFAULT_VHOST;
vhost = DEFAULT_VHOST;
}
}

View File

@ -252,24 +252,20 @@ private:
/**
* url获取媒体相关信息
*/
class MediaInfo {
class MediaInfo: public MediaTuple {
public:
~MediaInfo() = default;
MediaInfo() = default;
MediaInfo(const std::string &url) { parse(url); }
void parse(const std::string &url);
std::string shortUrl() const { return _vhost + "/" + _app + "/" + _streamid; }
std::string getUrl() const { return _schema + "://" + shortUrl(); }
std::string getUrl() const { return schema + "://" + shortUrl(); }
public:
uint16_t _port = 0;
std::string _full_url;
std::string _schema;
std::string _host;
std::string _vhost;
std::string _app;
std::string _streamid;
std::string _param_strs;
uint16_t port = 0;
std::string full_url;
std::string schema;
std::string host;
std::string param_strs;
};
/**
@ -280,7 +276,7 @@ public:
static MediaSource& NullMediaSource();
using Ptr = std::shared_ptr<MediaSource>;
MediaSource(const std::string &schema, const std::string &vhost, const std::string &app, const std::string &stream_id);
MediaSource(const std::string &schema, const MediaTuple& tuple);
virtual ~MediaSource();
////////////////获取MediaSource相关信息////////////////
@ -294,7 +290,11 @@ public:
// 流id
const std::string& getId() const;
std::string shortUrl() const { return _vhost + "/" + _app + "/" + _stream_id; }
const MediaTuple& getMediaTuple() const {
return _tuple;
}
std::string shortUrl() const { return _tuple.shortUrl(); }
std::string getUrl() const { return _schema + "://" + shortUrl(); }
@ -369,7 +369,7 @@ public:
// 同步查找流
static Ptr find(const std::string &schema, const std::string &vhost, const std::string &app, const std::string &id, bool from_mp4 = false);
static Ptr find(const MediaInfo &info, bool from_mp4 = false) {
return find(info._schema, info._vhost, info._app, info._streamid, from_mp4);
return find(info.schema, info.vhost, info.app, info.stream, from_mp4);
}
// 忽略schema同步查找流可能返回rtmp/rtsp/hls类型
@ -394,15 +394,13 @@ private:
protected:
toolkit::BytesSpeed _speed[TrackMax];
MediaTuple _tuple;
private:
std::atomic_flag _owned { false };
time_t _create_stamp;
toolkit::Ticker _ticker;
std::string _schema;
std::string _vhost;
std::string _app;
std::string _stream_id;
std::weak_ptr<MediaSourceEvent> _listener;
// 对象个数统计
toolkit::ObjectStatistic<MediaSource> _statistic;

View File

@ -25,7 +25,7 @@ namespace {
class MediaSourceForMuxer : public MediaSource {
public:
MediaSourceForMuxer(const MultiMediaSourceMuxer::Ptr &muxer)
: MediaSource("muxer", muxer->getVhost(), muxer->getApp(), muxer->getStreamId()) {
: MediaSource("muxer", muxer->getMediaTuple()) {
MediaSource::setListener(muxer);
}
int readerCount() override { return 0; }
@ -33,7 +33,7 @@ public:
} // namespace
static std::shared_ptr<MediaSinkInterface> makeRecorder(MediaSource &sender, const vector<Track::Ptr> &tracks, Recorder::type type, const ProtocolOption &option){
auto recorder = Recorder::createRecorder(type, sender.getVhost(), sender.getApp(), sender.getId(), option);
auto recorder = Recorder::createRecorder(type, sender.getMediaTuple(), option);
for (auto &track : tracks) {
recorder->addTrack(track);
}
@ -71,15 +71,15 @@ static string getTrackInfoStr(const TrackSource *track_src){
}
const std::string &MultiMediaSourceMuxer::getVhost() const {
return _vhost;
return _tuple.vhost;
}
const std::string &MultiMediaSourceMuxer::getApp() const {
return _app;
return _tuple.app;
}
const std::string &MultiMediaSourceMuxer::getStreamId() const {
return _stream_id;
return _tuple.stream;
}
std::string MultiMediaSourceMuxer::shortUrl() const {
@ -87,35 +87,32 @@ std::string MultiMediaSourceMuxer::shortUrl() const {
if (!ret.empty()) {
return ret;
}
return _vhost + "/" + _app + "/" + _stream_id;
return _tuple.shortUrl();
}
MultiMediaSourceMuxer::MultiMediaSourceMuxer(const string &vhost, const string &app, const string &stream, float dur_sec, const ProtocolOption &option) {
MultiMediaSourceMuxer::MultiMediaSourceMuxer(const MediaTuple& tuple, float dur_sec, const ProtocolOption &option): _tuple(tuple) {
_poller = EventPollerPool::Instance().getPoller();
_create_in_poller = _poller->isCurrentThread();
_vhost = vhost;
_app = app;
_stream_id = stream;
_option = option;
if (option.enable_rtmp) {
_rtmp = std::make_shared<RtmpMediaSourceMuxer>(vhost, app, stream, option, std::make_shared<TitleMeta>(dur_sec));
_rtmp = std::make_shared<RtmpMediaSourceMuxer>(_tuple, option, std::make_shared<TitleMeta>(dur_sec));
}
if (option.enable_rtsp) {
_rtsp = std::make_shared<RtspMediaSourceMuxer>(vhost, app, stream, option, std::make_shared<TitleSdp>(dur_sec));
_rtsp = std::make_shared<RtspMediaSourceMuxer>(_tuple, option, std::make_shared<TitleSdp>(dur_sec));
}
if (option.enable_hls) {
_hls = dynamic_pointer_cast<HlsRecorder>(Recorder::createRecorder(Recorder::type_hls, vhost, app, stream, option));
_hls = dynamic_pointer_cast<HlsRecorder>(Recorder::createRecorder(Recorder::type_hls, _tuple, option));
}
if (option.enable_mp4) {
_mp4 = Recorder::createRecorder(Recorder::type_mp4, vhost, app, stream, option);
_mp4 = Recorder::createRecorder(Recorder::type_mp4, _tuple, option);
}
if (option.enable_ts) {
_ts = std::make_shared<TSMediaSourceMuxer>(vhost, app, stream, option);
_ts = std::make_shared<TSMediaSourceMuxer>(_tuple, option);
}
#if defined(ENABLE_MP4)
if (option.enable_fmp4) {
_fmp4 = std::make_shared<FMP4MediaSourceMuxer>(vhost, app, stream, option);
_fmp4 = std::make_shared<FMP4MediaSourceMuxer>(_tuple, option);
}
#endif

View File

@ -37,7 +37,7 @@ public:
virtual void onAllTrackReady() = 0;
};
MultiMediaSourceMuxer(const std::string &vhost, const std::string &app, const std::string &stream, float dur_sec = 0.0,const ProtocolOption &option = ProtocolOption());
MultiMediaSourceMuxer(const MediaTuple& tuple, float dur_sec = 0.0,const ProtocolOption &option = ProtocolOption());
~MultiMediaSourceMuxer() override = default;
/**
@ -129,6 +129,9 @@ public:
const std::string& getVhost() const;
const std::string& getApp() const;
const std::string& getStreamId() const;
const MediaTuple& getMediaTuple() const {
return _tuple;
}
std::string shortUrl() const;
protected:
@ -159,9 +162,7 @@ private:
bool _is_enable = false;
bool _create_in_poller = false;
bool _video_key_pos = false;
std::string _vhost;
std::string _app;
std::string _stream_id;
MediaTuple _tuple;
ProtocolOption _option;
toolkit::Ticker _last_check;
Stamp _stamp[2];

View File

@ -39,10 +39,8 @@ public:
using RingDataType = std::shared_ptr<toolkit::List<FMP4Packet::Ptr> >;
using RingType = toolkit::RingBuffer<RingDataType>;
FMP4MediaSource(const std::string &vhost,
const std::string &app,
const std::string &stream_id,
int ring_size = FMP4_GOP_SIZE) : MediaSource(FMP4_SCHEMA, vhost, app, stream_id), _ring_size(ring_size) {}
FMP4MediaSource(const MediaTuple& tuple,
int ring_size = FMP4_GOP_SIZE) : MediaSource(FMP4_SCHEMA, tuple), _ring_size(ring_size) {}
~FMP4MediaSource() override { flush(); }

View File

@ -23,12 +23,9 @@ class FMP4MediaSourceMuxer final : public MP4MuxerMemory, public MediaSourceEven
public:
using Ptr = std::shared_ptr<FMP4MediaSourceMuxer>;
FMP4MediaSourceMuxer(const std::string &vhost,
const std::string &app,
const std::string &stream_id,
const ProtocolOption &option) {
FMP4MediaSourceMuxer(const MediaTuple& tuple, const ProtocolOption &option) {
_option = option;
_media_src = std::make_shared<FMP4MediaSource>(vhost, app, stream_id);
_media_src = std::make_shared<FMP4MediaSource>(tuple);
}
~FMP4MediaSourceMuxer() override { MP4MuxerMemory::flush(); };

View File

@ -278,7 +278,7 @@ static void canAccessPath(Session &sender, const Parser &parser, const MediaInfo
HttpCookieManager::Instance().delCookie(cookie);
}
bool is_hls = media_info._schema == HLS_SCHEMA;
bool is_hls = media_info.schema == HLS_SCHEMA;
SockInfoImp::Ptr info = std::make_shared<SockInfoImp>();
info->_identifier = sender.getIdentifier();
@ -363,8 +363,8 @@ static void accessFile(Session &sender, const Parser &parser, const MediaInfo &m
}
if (is_hls) {
// hls那么移除掉后缀获取真实的stream_id并且修改协议为HLS
const_cast<string &>(media_info._schema) = HLS_SCHEMA;
replace(const_cast<string &>(media_info._streamid), kHlsSuffix, "");
const_cast<string &>(media_info.schema) = HLS_SCHEMA;
replace(const_cast<string &>(media_info.stream), kHlsSuffix, "");
}
weak_ptr<Session> weakSession = static_pointer_cast<Session>(sender.shared_from_this());
@ -465,11 +465,11 @@ static string getFilePath(const Parser &parser,const MediaInfo &media_info, Sess
});
string url, path;
auto it = virtualPathMap.find(media_info._app);
auto it = virtualPathMap.find(media_info.app);
if (it != virtualPathMap.end()) {
//访问的是virtualPath
path = it->second;
url = parser.Url().substr(1 + media_info._app.size());
url = parser.Url().substr(1 + media_info.app.size());
} else {
//访问的是rootPath
path = rootPath;
@ -481,7 +481,7 @@ static string getFilePath(const Parser &parser,const MediaInfo &media_info, Sess
ch = '/';
}
}
auto ret = File::absolutePath(enableVhost ? media_info._vhost + url : url, path);
auto ret = File::absolutePath(enableVhost ? media_info.vhost + url : url, path);
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastHttpBeforeAccess, parser, ret, static_cast<SockInfo &>(sender));
return ret;
}

View File

@ -202,7 +202,7 @@ bool HttpSession::checkLiveStream(const string &schema, const string &url_suffi
//解析带上协议+参数完整的url
_mediaInfo.parse(schema + "://" + _parser["Host"] + url);
if (_mediaInfo._app.empty() || _mediaInfo._streamid.empty()) {
if (_mediaInfo.app.empty() || _mediaInfo.stream.empty()) {
//url不合法
return false;
}

View File

@ -29,9 +29,9 @@ PlayerProxy::PlayerProxy(
const EventPoller::Ptr &poller, int reconnect_delay_min, int reconnect_delay_max, int reconnect_delay_step)
: MediaPlayer(poller)
, _option(option) {
_vhost = vhost;
_app = app;
_stream_id = stream_id;
_tuple.vhost = vhost;
_tuple.app = app;
_tuple.stream = stream_id;
_retry_count = retry_count;
setOnClose(nullptr);
@ -187,11 +187,11 @@ void PlayerProxy::setDirectProxy() {
// rtsp拉流
GET_CONFIG(bool, directProxy, Rtsp::kDirectProxy);
if (directProxy) {
mediaSource = std::make_shared<RtspMediaSource>(_vhost, _app, _stream_id);
mediaSource = std::make_shared<RtspMediaSource>(_tuple);
}
} else if (dynamic_pointer_cast<RtmpPlayer>(_delegate)) {
// rtmp拉流,rtmp强制直接代理
mediaSource = std::make_shared<RtmpMediaSource>(_vhost, _app, _stream_id);
mediaSource = std::make_shared<RtmpMediaSource>(_tuple);
}
if (mediaSource) {
setMediaSource(mediaSource);
@ -277,18 +277,18 @@ void PlayerProxy::onPlaySuccess() {
// rtsp拉流代理
if (reset_when_replay || !_muxer) {
_option.enable_rtsp = false;
_muxer = std::make_shared<MultiMediaSourceMuxer>(_vhost, _app, _stream_id, getDuration(), _option);
_muxer = std::make_shared<MultiMediaSourceMuxer>(_tuple, getDuration(), _option);
}
} else if (dynamic_pointer_cast<RtmpMediaSource>(_media_src)) {
// rtmp拉流代理
if (reset_when_replay || !_muxer) {
_option.enable_rtmp = false;
_muxer = std::make_shared<MultiMediaSourceMuxer>(_vhost, _app, _stream_id, getDuration(), _option);
_muxer = std::make_shared<MultiMediaSourceMuxer>(_tuple, getDuration(), _option);
}
} else {
// 其他拉流代理
if (reset_when_replay || !_muxer) {
_muxer = std::make_shared<MultiMediaSourceMuxer>(_vhost, _app, _stream_id, getDuration(), _option);
_muxer = std::make_shared<MultiMediaSourceMuxer>(_tuple, getDuration(), _option);
}
}
_muxer->setMediaListener(shared_from_this());

View File

@ -134,9 +134,7 @@ private:
int _reconnect_delay_min;
int _reconnect_delay_max;
int _reconnect_delay_step;
std::string _vhost;
std::string _app;
std::string _stream_id;
MediaTuple _tuple;
std::string _pull_url;
toolkit::Timer::Ptr _timer;
std::function<void()> _on_disconnect;

View File

@ -163,10 +163,10 @@ std::shared_ptr<FILE> HlsMakerImp::makeFile(const string &file, bool setbuf) {
}
void HlsMakerImp::setMediaSource(const string &vhost, const string &app, const string &stream_id) {
_media_src = std::make_shared<HlsMediaSource>(vhost, app, stream_id);
_info.app = app;
_info.stream = stream_id;
_info.vhost = vhost;
_media_src = std::make_shared<HlsMediaSource>(_info);
}
HlsMediaSource::Ptr HlsMakerImp::getMediaSource() const {

View File

@ -25,8 +25,7 @@ public:
using RingType = toolkit::RingBuffer<std::string>;
using Ptr = std::shared_ptr<HlsMediaSource>;
HlsMediaSource(const std::string &vhost, const std::string &app, const std::string &stream_id)
: MediaSource(HLS_SCHEMA, vhost, app, stream_id) {}
HlsMediaSource(const MediaTuple& tuple): MediaSource(HLS_SCHEMA, tuple) {}
~HlsMediaSource() override = default;
/**

View File

@ -35,8 +35,8 @@ public:
~HlsRecorder() { MpegMuxer::flush(); };
void setMediaSource(const std::string &vhost, const std::string &app, const std::string &stream_id) {
_hls->setMediaSource(vhost, app, stream_id);
void setMediaSource(const MediaTuple& tuple) {
_hls->setMediaSource(tuple.vhost, tuple.app, tuple.stream);
}
void setListener(const std::weak_ptr<MediaSourceEvent> &listener) {

View File

@ -20,17 +20,18 @@ using namespace toolkit;
namespace mediakit {
MP4Reader::MP4Reader(const string &vhost, const string &app, const string &stream_id, const string &file_path) {
MP4Reader::MP4Reader(const std::string &vhost, const std::string &app, const std::string &stream_id, const string &file_path) {
//读写文件建议放在后台线程
auto tuple = MediaTuple{vhost, app, stream_id};
_poller = WorkThreadPool::Instance().getPoller();
_file_path = file_path;
if (_file_path.empty()) {
GET_CONFIG(string, recordPath, Protocol::kMP4SavePath);
GET_CONFIG(bool, enableVhost, General::kEnableVhost);
if (enableVhost) {
_file_path = vhost + "/" + app + "/" + stream_id;
_file_path = tuple.shortUrl();
} else {
_file_path = app + "/" + stream_id;
_file_path = tuple.app + "/" + tuple.stream;
}
_file_path = File::absolutePath(_file_path, recordPath);
}
@ -38,14 +39,14 @@ MP4Reader::MP4Reader(const string &vhost, const string &app, const string &strea
_demuxer = std::make_shared<MP4Demuxer>();
_demuxer->openMP4(_file_path);
if (stream_id.empty()) {
if (tuple.stream.empty()) {
return;
}
ProtocolOption option;
//读取mp4文件并流化时不重复生成mp4/hls文件
option.enable_mp4 = false;
option.enable_hls = false;
_muxer = std::make_shared<MultiMediaSourceMuxer>(vhost, app, stream_id, _demuxer->getDurationMS() / 1000.0f, option);
_muxer = std::make_shared<MultiMediaSourceMuxer>(tuple, _demuxer->getDurationMS() / 1000.0f, option);
auto tracks = _demuxer->getTracks(false);
if (tracks.empty()) {
throw std::runtime_error(StrPrinter << "该mp4文件没有有效的track:" << _file_path);

View File

@ -21,16 +21,16 @@ using namespace toolkit;
namespace mediakit {
string Recorder::getRecordPath(Recorder::type type, const string &vhost, const string &app, const string &stream_id, const string &customized_path) {
string Recorder::getRecordPath(Recorder::type type, const MediaTuple& tuple, const string &customized_path) {
GET_CONFIG(bool, enableVhost, General::kEnableVhost);
switch (type) {
case Recorder::type_hls: {
GET_CONFIG(string, hlsPath, Protocol::kHlsSavePath);
string m3u8FilePath;
if (enableVhost) {
m3u8FilePath = vhost + "/" + app + "/" + stream_id + "/hls.m3u8";
m3u8FilePath = tuple.shortUrl() + "/hls.m3u8";
} else {
m3u8FilePath = app + "/" + stream_id + "/hls.m3u8";
m3u8FilePath = tuple.app + "/" + tuple.stream + "/hls.m3u8";
}
//Here we use the customized file path.
if (!customized_path.empty()) {
@ -43,9 +43,9 @@ string Recorder::getRecordPath(Recorder::type type, const string &vhost, const s
GET_CONFIG(string, recordAppName, Record::kAppName);
string mp4FilePath;
if (enableVhost) {
mp4FilePath = vhost + "/" + recordAppName + "/" + app + "/" + stream_id + "/";
mp4FilePath = tuple.vhost + "/" + recordAppName + "/" + tuple.app + "/" + tuple.stream + "/";
} else {
mp4FilePath = recordAppName + "/" + app + "/" + stream_id + "/";
mp4FilePath = recordAppName + "/" + tuple.app + "/" + tuple.stream + "/";
}
//Here we use the customized file path.
if (!customized_path.empty()) {
@ -58,14 +58,14 @@ string Recorder::getRecordPath(Recorder::type type, const string &vhost, const s
}
}
std::shared_ptr<MediaSinkInterface> Recorder::createRecorder(type type, const string &vhost, const string &app, const string &stream_id, const ProtocolOption &option){
std::shared_ptr<MediaSinkInterface> Recorder::createRecorder(type type, const MediaTuple& tuple, const ProtocolOption &option){
switch (type) {
case Recorder::type_hls: {
#if defined(ENABLE_HLS)
auto path = Recorder::getRecordPath(type, vhost, app, stream_id, option.hls_save_path);
auto path = Recorder::getRecordPath(type, tuple, option.hls_save_path);
GET_CONFIG(bool, enable_vhost, General::kEnableVhost);
auto ret = std::make_shared<HlsRecorder>(path, enable_vhost ? string(VHOST_KEY) + "=" + vhost : "", option);
ret->setMediaSource(vhost, app, stream_id);
auto ret = std::make_shared<HlsRecorder>(path, enable_vhost ? string(VHOST_KEY) + "=" + tuple.vhost : "", option);
ret->setMediaSource(tuple);
return ret;
#else
throw std::invalid_argument("hls相关功能未打开请开启ENABLE_HLS宏后编译再测试");
@ -75,8 +75,8 @@ std::shared_ptr<MediaSinkInterface> Recorder::createRecorder(type type, const st
case Recorder::type_mp4: {
#if defined(ENABLE_MP4)
auto path = Recorder::getRecordPath(type, vhost, app, stream_id, option.mp4_save_path);
return std::make_shared<MP4Recorder>(path, vhost, app, stream_id, option.mp4_max_second);
auto path = Recorder::getRecordPath(type, tuple, option.mp4_save_path);
return std::make_shared<MP4Recorder>(path, tuple.vhost, tuple.app, tuple.stream, option.mp4_max_second);
#else
throw std::invalid_argument("mp4相关功能未打开请开启ENABLE_MP4宏后编译再测试");
#endif

View File

@ -18,7 +18,16 @@ namespace mediakit {
class MediaSinkInterface;
class ProtocolOption;
class RecordInfo {
struct MediaTuple {
std::string vhost;
std::string app;
std::string stream;
std::string shortUrl() const {
return vhost + '/' + app + '/' + stream;
}
};
class RecordInfo: public MediaTuple {
public:
time_t start_time; // GMT 标准时间,单位秒
float time_len; // 录像长度,单位秒
@ -27,9 +36,6 @@ public:
std::string file_name; // 文件名称
std::string folder; // 文件夹路径
std::string url; // 播放路径
std::string app; // 应用名称
std::string stream; // 流 ID
std::string vhost; // 虚拟主机
};
class Recorder{
@ -50,7 +56,7 @@ public:
* @param customized_path
* @return
*/
static std::string getRecordPath(type type, const std::string &vhost, const std::string &app, const std::string &stream_id,const std::string &customized_path = "");
static std::string getRecordPath(type type, const MediaTuple& tuple, const std::string &customized_path = "");
/**
*
@ -62,7 +68,7 @@ public:
* @param max_second mp4录制最大切片时间0
* @return nullptr
*/
static std::shared_ptr<MediaSinkInterface> createRecorder(type type, const std::string &vhost, const std::string &app, const std::string &stream_id, const ProtocolOption &option);
static std::shared_ptr<MediaSinkInterface> createRecorder(type type, const MediaTuple& tuple, const ProtocolOption &option);
private:
Recorder() = delete;

View File

@ -46,12 +46,7 @@ public:
* @param stream_id id
* @param ring_size 0
*/
RtmpMediaSource(const std::string &vhost,
const std::string &app,
const std::string &stream_id,
int ring_size = RTMP_GOP_SIZE) :
MediaSource(RTMP_SCHEMA, vhost, app, stream_id), _ring_size(ring_size) {
}
RtmpMediaSource(const MediaTuple& tuple, int ring_size = RTMP_GOP_SIZE): MediaSource(RTMP_SCHEMA, tuple), _ring_size(ring_size) {}
~RtmpMediaSource() override { flush(); }

View File

@ -63,7 +63,7 @@ void RtmpMediaSource::onWrite(RtmpPacket::Ptr pkt, bool /*= true*/)
}
RtmpMediaSourceImp::RtmpMediaSourceImp(const std::string &vhost, const std::string &app, const std::string &id, int ringSize) : RtmpMediaSource(vhost, app, id, ringSize)
RtmpMediaSourceImp::RtmpMediaSourceImp(const MediaTuple& tuple, int ringSize) : RtmpMediaSource(tuple, ringSize)
{
_demuxer = std::make_shared<RtmpDemuxer>();
_demuxer->setTrackListener(this);
@ -99,7 +99,7 @@ void RtmpMediaSourceImp::setProtocolOption(const ProtocolOption &option)
_option = option;
//不重复生成rtmp协议
_option.enable_rtmp = false;
_muxer = std::make_shared<MultiMediaSourceMuxer>(getVhost(), getApp(), getId(), _demuxer->getDuration(), _option);
_muxer = std::make_shared<MultiMediaSourceMuxer>(_tuple, _demuxer->getDuration(), _option);
_muxer->setMediaListener(getListener());
_muxer->setTrackListener(std::static_pointer_cast<RtmpMediaSourceImp>(shared_from_this()));
//让_muxer对象拦截一部分事件(比如说录像相关事件)

View File

@ -35,7 +35,7 @@ public:
* @param id id
* @param ringSize
*/
RtmpMediaSourceImp(const std::string &vhost, const std::string &app, const std::string &id, int ringSize = RTMP_GOP_SIZE);
RtmpMediaSourceImp(const MediaTuple& tuple, int ringSize = RTMP_GOP_SIZE);
~RtmpMediaSourceImp() override = default;

View File

@ -21,13 +21,11 @@ class RtmpMediaSourceMuxer final : public RtmpMuxer, public MediaSourceEventInte
public:
using Ptr = std::shared_ptr<RtmpMediaSourceMuxer>;
RtmpMediaSourceMuxer(const std::string &vhost,
const std::string &strApp,
const std::string &strId,
RtmpMediaSourceMuxer(const MediaTuple& tuple,
const ProtocolOption &option,
const TitleMeta::Ptr &title = nullptr) : RtmpMuxer(title) {
_option = option;
_media_src = std::make_shared<RtmpMediaSource>(vhost, strApp, strId);
_media_src = std::make_shared<RtmpMediaSource>(tuple);
getRtmpRing()->setDelegate(_media_src);
}

View File

@ -84,7 +84,7 @@ void RtmpSession::onCmd_connect(AMFDecoder &dec) {
auto tc_url = params["tcUrl"].as_string();
if (tc_url.empty()) {
// defaultVhost:默认vhost
tc_url = string(RTMP_SCHEMA) + "://" + DEFAULT_VHOST + "/" + _media_info._app;
tc_url = string(RTMP_SCHEMA) + "://" + DEFAULT_VHOST + "/" + _media_info.app;
} else {
auto pos = tc_url.rfind('?');
if (pos != string::npos) {
@ -94,9 +94,9 @@ void RtmpSession::onCmd_connect(AMFDecoder &dec) {
}
// 初步解析只用于获取vhost信息
_media_info.parse(tc_url);
_media_info._schema = RTMP_SCHEMA;
_media_info.schema = RTMP_SCHEMA;
// 赋值rtmp app
_media_info._app = params["app"].as_string();
_media_info.app = params["app"].as_string();
bool ok = true; //(app == APP_NAME);
AMFValue version(AMF_OBJECT);
@ -109,7 +109,7 @@ void RtmpSession::onCmd_connect(AMFDecoder &dec) {
status.set("objectEncoding", params["objectEncoding"]);
sendReply(ok ? "_result" : "_error", version, status);
if (!ok) {
throw std::runtime_error("Unsupported application: " + _media_info._app);
throw std::runtime_error("Unsupported application: " + _media_info.app);
}
AMFEncoder invoke;
@ -132,9 +132,9 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) {
}));
dec.load<AMFValue>();/* NULL */
// 赋值为rtmp stream id 信息
_media_info._streamid = getStreamId(dec.load<std::string>());
_media_info.stream = getStreamId(dec.load<std::string>());
// 再解析url切割url为app/stream_id (不一定符合rtmp url切割规范)
_media_info.parse(_media_info._schema + "://" + _media_info._vhost + '/' + _media_info._app + '/' + _media_info._streamid);
_media_info.parse(_media_info.getUrl());
auto now_stream_index = _now_stream_index;
auto on_res = [this, token, now_stream_index](const string &err, const ProtocolOption &option) {
@ -149,7 +149,7 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) {
}
assert(!_push_src);
auto src = MediaSource::find(RTMP_SCHEMA, _media_info._vhost, _media_info._app, _media_info._streamid);
auto src = MediaSource::find(RTMP_SCHEMA, _media_info.vhost, _media_info.app, _media_info.stream);
auto push_failed = (bool)src;
while (src) {
@ -180,7 +180,7 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) {
}
if (!_push_src) {
_push_src = std::make_shared<RtmpMediaSourceImp>(_media_info._vhost, _media_info._app, _media_info._streamid);
_push_src = std::make_shared<RtmpMediaSourceImp>(_media_info);
//获取所有权
_push_src_ownership = _push_src->getOwnership();
_push_src->setProtocolOption(option);
@ -196,7 +196,7 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) {
setSocketFlags();
};
if(_media_info._app.empty() || _media_info._streamid.empty()){
if(_media_info.app.empty() || _media_info.stream.empty()){
//不允许莫名其妙的推流url
on_res("rtmp推流url非法", ProtocolOption());
return;
@ -256,7 +256,7 @@ void RtmpSession::sendPlayResponse(const string &err, const RtmpMediaSource::Ptr
sendStatus({ "level", (ok ? "status" : "error"),
"code", (ok ? "NetStream.Play.Reset" : (auth_success ? "NetStream.Play.StreamNotFound" : "NetStream.Play.BadAuth")),
"description", (ok ? "Resetting and playing." : (auth_success ? "No such stream." : err.data())),
"details", _media_info._streamid,
"details", _media_info.stream,
"clientid", "0" });
if (!ok) {
@ -270,7 +270,7 @@ void RtmpSession::sendPlayResponse(const string &err, const RtmpMediaSource::Ptr
sendStatus({ "level", "status",
"code", "NetStream.Play.Start",
"description", "Started playing." ,
"details", _media_info._streamid,
"details", _media_info.stream,
"clientid", "0"});
// |RtmpSampleAccess(true, true)
@ -289,7 +289,7 @@ void RtmpSession::sendPlayResponse(const string &err, const RtmpMediaSource::Ptr
sendStatus({ "level", "status",
"code", "NetStream.Play.PublishNotify",
"description", "Now published." ,
"details", _media_info._streamid,
"details", _media_info.stream,
"clientid", "0"});
auto &metadata = src->getMetaData();
@ -433,9 +433,9 @@ string RtmpSession::getStreamId(const string &str){
void RtmpSession::onCmd_play(AMFDecoder &dec) {
dec.load<AMFValue>(); /* NULL */
// 赋值为rtmp stream id 信息
_media_info._streamid = getStreamId(dec.load<std::string>());
_media_info.stream = getStreamId(dec.load<std::string>());
// 再解析url切割url为app/stream_id (不一定符合rtmp url切割规范)
_media_info.parse(_media_info._schema + "://" + _media_info._vhost + '/' + _media_info._app + '/' + _media_info._streamid);
_media_info.parse(_media_info.getUrl());
doPlay(dec);
}
@ -589,7 +589,7 @@ MediaOriginType RtmpSession::getOriginType(MediaSource &sender) const{
}
string RtmpSession::getOriginUrl(MediaSource &sender) const {
return _media_info._full_url;
return _media_info.full_url;
}
std::shared_ptr<SockInfo> RtmpSession::getOriginSock(MediaSource &sender) const {

View File

@ -134,7 +134,7 @@ bool GB28181Process::inputRtp(bool, const char *data, size_t data_len) {
// 设置dump目录
GET_CONFIG(string, dump_dir, RtpProxy::kDumpDir);
if (!dump_dir.empty()) {
auto save_path = File::absolutePath(_media_info._streamid + ".mpeg", dump_dir);
auto save_path = File::absolutePath(_media_info.stream + ".mpeg", dump_dir);
_save_file_ps.reset(File::create_file(save_path.data(), "wb"), [](FILE *fp) {
if (fp) {
fclose(fp);
@ -171,11 +171,11 @@ void GB28181Process::onRtpDecode(const Frame::Ptr &frame) {
// 创建解码器
if (checkTS((uint8_t *)frame->data(), frame->size())) {
// 猜测是ts负载
InfoL << _media_info._streamid << " judged to be TS";
InfoL << _media_info.stream << " judged to be TS";
_decoder = DecoderImp::createDecoder(DecoderImp::decoder_ts, _interface);
} else {
// 猜测是ps负载
InfoL << _media_info._streamid << " judged to be PS";
InfoL << _media_info.stream << " judged to be PS";
_decoder = DecoderImp::createDecoder(DecoderImp::decoder_ps, _interface);
}
}

View File

@ -26,14 +26,14 @@ static constexpr size_t kMaxCachedFrame = 200;
namespace mediakit {
RtpProcess::RtpProcess(const string &stream_id) {
_media_info._schema = kRtpAppName;
_media_info._vhost = DEFAULT_VHOST;
_media_info._app = kRtpAppName;
_media_info._streamid = stream_id;
_media_info.schema = kRtpAppName;
_media_info.vhost = DEFAULT_VHOST;
_media_info.app = kRtpAppName;
_media_info.stream = stream_id;
GET_CONFIG(string, dump_dir, RtpProxy::kDumpDir);
{
FILE *fp = !dump_dir.empty() ? File::create_file(File::absolutePath(_media_info._streamid + ".rtp", dump_dir).data(), "wb") : nullptr;
FILE *fp = !dump_dir.empty() ? File::create_file(File::absolutePath(_media_info.stream + ".rtp", dump_dir).data(), "wb") : nullptr;
if (fp) {
_save_file_rtp.reset(fp, [](FILE *fp) {
fclose(fp);
@ -42,7 +42,7 @@ RtpProcess::RtpProcess(const string &stream_id) {
}
{
FILE *fp = !dump_dir.empty() ? File::create_file(File::absolutePath(_media_info._streamid + ".video", dump_dir).data(), "wb") : nullptr;
FILE *fp = !dump_dir.empty() ? File::create_file(File::absolutePath(_media_info.stream + ".video", dump_dir).data(), "wb") : nullptr;
if (fp) {
_save_file_video.reset(fp, [](FILE *fp) {
fclose(fp);
@ -234,7 +234,7 @@ uint16_t RtpProcess::get_local_port() {
}
string RtpProcess::getIdentifier() const {
return _media_info._streamid;
return _media_info.stream;
}
void RtpProcess::emitOnPublish() {
@ -251,9 +251,7 @@ void RtpProcess::emitOnPublish() {
return;
}
if (err.empty()) {
strong_self->_muxer = std::make_shared<MultiMediaSourceMuxer>(strong_self->_media_info._vhost,
strong_self->_media_info._app,
strong_self->_media_info._streamid,0.0f,
strong_self->_muxer = std::make_shared<MultiMediaSourceMuxer>(strong_self->_media_info, 0.0f,
option);
if (strong_self->_only_audio) {
strong_self->_muxer->setOnlyAudio();
@ -291,7 +289,7 @@ toolkit::EventPoller::Ptr RtpProcess::getOwnerPoller(MediaSource &sender) {
if (_sock) {
return _sock->getPoller();
}
throw std::runtime_error("RtpProcess::getOwnerPoller failed:" + _media_info._streamid);
throw std::runtime_error("RtpProcess::getOwnerPoller failed:" + _media_info.stream);
}
float RtpProcess::getLossRate(MediaSource &sender, TrackType type) {

View File

@ -42,11 +42,7 @@ public:
* @param stream_id id
* @param ring_size 0
*/
RtspMediaSource(const std::string &vhost,
const std::string &app,
const std::string &stream_id,
int ring_size = RTP_GOP_SIZE) :
MediaSource(RTSP_SCHEMA, vhost, app, stream_id), _ring_size(ring_size) {}
RtspMediaSource(const MediaTuple& tuple, int ring_size = RTP_GOP_SIZE): MediaSource(RTSP_SCHEMA, tuple), _ring_size(ring_size) {}
~RtspMediaSource() override { flush(); }
@ -76,7 +72,7 @@ public:
return _sdp;
}
virtual RtspMediaSource::Ptr Clone(const std::string& stream) {
virtual RtspMediaSource::Ptr clone(const std::string& stream) {
return nullptr;
}

View File

@ -74,8 +74,7 @@ void RtspMediaSource::onWrite(RtpPacket::Ptr rtp, bool keyPos) {
PacketCache<RtpPacket>::inputPacket(stamp, is_video, std::move(rtp), keyPos);
}
RtspMediaSourceImp::RtspMediaSourceImp(const std::string &vhost, const std::string &app, const std::string &id, int ringSize)
: RtspMediaSource(vhost, app, id, ringSize)
RtspMediaSourceImp::RtspMediaSourceImp(const MediaTuple& tuple, int ringSize): RtspMediaSource(tuple, ringSize)
{
_demuxer = std::make_shared<RtspDemuxer>();
_demuxer->setTrackListener(this);
@ -114,7 +113,7 @@ void RtspMediaSourceImp::setProtocolOption(const ProtocolOption &option)
//导致rtc无法播放所以在rtsp推流rtc播放时建议关闭直接代理模式
_option = option;
_option.enable_rtsp = !direct_proxy;
_muxer = std::make_shared<MultiMediaSourceMuxer>(getVhost(), getApp(), getId(), _demuxer->getDuration(), _option);
_muxer = std::make_shared<MultiMediaSourceMuxer>(_tuple, _demuxer->getDuration(), _option);
_muxer->setMediaListener(getListener());
_muxer->setTrackListener(std::static_pointer_cast<RtspMediaSourceImp>(shared_from_this()));
//让_muxer对象拦截一部分事件(比如说录像相关事件)
@ -126,8 +125,10 @@ void RtspMediaSourceImp::setProtocolOption(const ProtocolOption &option)
}
}
RtspMediaSource::Ptr RtspMediaSourceImp::Clone(const std::string &stream) {
auto src_imp = std::make_shared<RtspMediaSourceImp>(getVhost(), getApp(), stream);
RtspMediaSource::Ptr RtspMediaSourceImp::clone(const std::string &stream) {
auto tuple = _tuple;
tuple.stream = stream;
auto src_imp = std::make_shared<RtspMediaSourceImp>(tuple);
src_imp->setSdp(getSdp());
src_imp->setProtocolOption(getProtocolOption());
return src_imp;

View File

@ -28,7 +28,7 @@ public:
* @param id id
* @param ringSize
*/
RtspMediaSourceImp(const std::string &vhost, const std::string &app, const std::string &id, int ringSize = RTP_GOP_SIZE);
RtspMediaSourceImp(const MediaTuple& tuple, int ringSize = RTP_GOP_SIZE);
~RtspMediaSourceImp() override = default;
@ -107,7 +107,7 @@ public:
}
}
RtspMediaSource::Ptr Clone(const std::string& stream) override;
RtspMediaSource::Ptr clone(const std::string& stream) override;
private:
bool _all_track_ready = false;
ProtocolOption _option;

View File

@ -21,13 +21,11 @@ class RtspMediaSourceMuxer final : public RtspMuxer, public MediaSourceEventInte
public:
using Ptr = std::shared_ptr<RtspMediaSourceMuxer>;
RtspMediaSourceMuxer(const std::string &vhost,
const std::string &strApp,
const std::string &strId,
RtspMediaSourceMuxer(const MediaTuple& tuple,
const ProtocolOption &option,
const TitleSdp::Ptr &title = nullptr) : RtspMuxer(title) {
_option = option;
_media_src = std::make_shared<RtspMediaSource>(vhost,strApp,strId);
_media_src = std::make_shared<RtspMediaSource>(tuple);
getRtpRing()->setDelegate(_media_src);
}

View File

@ -133,7 +133,7 @@ void RtspSession::onWholeRtspPacket(Parser &parser) {
if (_content_base.empty() && method != "GET") {
_content_base = parser.Url();
_media_info.parse(parser.FullUrl());
_media_info._schema = RTSP_SCHEMA;
_media_info.schema = RTSP_SCHEMA;
}
using rtsp_request_handler = void (RtspSession::*)(const Parser &parser);
@ -208,7 +208,7 @@ void RtspSession::handleReq_ANNOUNCE(const Parser &parser) {
_media_info.parse(full_url);
}
if (_media_info._app.empty() || _media_info._streamid.empty()) {
if (_media_info.app.empty() || _media_info.stream.empty()) {
//推流rtsp url必须最少两级(rtsp://host/app/stream_id)不允许莫名其妙的推流url
static constexpr auto err = "rtsp推流url非法,最少确保两级rtsp url";
sendRtspResponse("403 Forbidden", {"Content-Type", "text/plain"}, err);
@ -223,7 +223,7 @@ void RtspSession::handleReq_ANNOUNCE(const Parser &parser) {
}
assert(!_push_src);
auto src = MediaSource::find(RTSP_SCHEMA, _media_info._vhost, _media_info._app, _media_info._streamid);
auto src = MediaSource::find(RTSP_SCHEMA, _media_info.vhost, _media_info.app, _media_info.stream);
auto push_failed = (bool)src;
while (src) {
@ -266,7 +266,7 @@ void RtspSession::handleReq_ANNOUNCE(const Parser &parser) {
}
if (!_push_src) {
_push_src = std::make_shared<RtspMediaSourceImp>(_media_info._vhost, _media_info._app, _media_info._streamid);
_push_src = std::make_shared<RtspMediaSourceImp>(_media_info);
//获取所有权
_push_src_ownership = _push_src->getOwnership();
_push_src->setProtocolOption(option);
@ -717,7 +717,7 @@ void RtspSession::handleReq_Setup(const Parser &parser) {
break;
case Rtsp::RTP_MULTICAST: {
if(!_multicaster){
_multicaster = RtpMultiCaster::get(*this, get_local_ip(), _media_info._vhost, _media_info._app, _media_info._streamid, _multicast_ip, _multicast_video_port, _multicast_audio_port);
_multicaster = RtpMultiCaster::get(*this, get_local_ip(), _media_info.vhost, _media_info.app, _media_info.stream, _multicast_ip, _multicast_video_port, _multicast_audio_port);
if (!_multicaster) {
send_NotAcceptable();
throw SockException(Err_shutdown, "can not get a available udp multicast socket");
@ -1145,7 +1145,7 @@ MediaOriginType RtspSession::getOriginType(MediaSource &sender) const{
}
string RtspSession::getOriginUrl(MediaSource &sender) const {
return _media_info._full_url;
return _media_info.full_url;
}
std::shared_ptr<SockInfo> RtspSession::getOriginSock(MediaSource &sender) const {

View File

@ -39,10 +39,7 @@ public:
using RingDataType = std::shared_ptr<toolkit::List<TSPacket::Ptr> >;
using RingType = toolkit::RingBuffer<RingDataType>;
TSMediaSource(const std::string &vhost,
const std::string &app,
const std::string &stream_id,
int ring_size = TS_GOP_SIZE) : MediaSource(TS_SCHEMA, vhost, app, stream_id), _ring_size(ring_size) {}
TSMediaSource(const MediaTuple& tuple, int ring_size = TS_GOP_SIZE): MediaSource(TS_SCHEMA, tuple), _ring_size(ring_size) {}
~TSMediaSource() override { flush(); }

View File

@ -21,12 +21,9 @@ class TSMediaSourceMuxer final : public MpegMuxer, public MediaSourceEventInterc
public:
using Ptr = std::shared_ptr<TSMediaSourceMuxer>;
TSMediaSourceMuxer(const std::string &vhost,
const std::string &app,
const std::string &stream_id,
const ProtocolOption &option) : MpegMuxer(false) {
TSMediaSourceMuxer(const MediaTuple& tuple, const ProtocolOption &option) : MpegMuxer(false) {
_option = option;
_media_src = std::make_shared<TSMediaSource>(vhost, app, stream_id);
_media_src = std::make_shared<TSMediaSource>(tuple);
}
~TSMediaSourceMuxer() override { MpegMuxer::flush(); };

View File

@ -35,7 +35,7 @@ void SrtTransportImp::onHandShakeFinished(std::string &streamid, struct sockaddr
return;
}
auto params = Parser::parseArgs(_media_info._param_strs);
auto params = Parser::parseArgs(_media_info.param_strs);
if (params["m"] == "publish") {
_is_pusher = true;
_decoder = DecoderImp::createDecoder(DecoderImp::decoder_ts, this);
@ -52,7 +52,7 @@ bool SrtTransportImp::parseStreamid(std::string &streamid) {
if (!toolkit::start_with(streamid, "#!::")) {
return false;
}
_media_info._schema = SRT_SCHEMA;
_media_info.schema = SRT_SCHEMA;
std::string real_streamid = streamid.substr(4);
std::string vhost, app, stream_name;
@ -70,10 +70,10 @@ bool SrtTransportImp::parseStreamid(std::string &streamid) {
app = tmps[0];
stream_name = tmps[1];
} else {
if (_media_info._param_strs.empty()) {
_media_info._param_strs = it.first + "=" + it.second;
if (_media_info.param_strs.empty()) {
_media_info.param_strs = it.first + "=" + it.second;
} else {
_media_info._param_strs += "&" + it.first + "=" + it.second;
_media_info.param_strs += "&" + it.first + "=" + it.second;
}
}
}
@ -82,15 +82,15 @@ bool SrtTransportImp::parseStreamid(std::string &streamid) {
}
if (vhost != "") {
_media_info._vhost = vhost;
_media_info.vhost = vhost;
} else {
_media_info._vhost = DEFAULT_VHOST;
_media_info.vhost = DEFAULT_VHOST;
}
_media_info._app = app;
_media_info._streamid = stream_name;
_media_info.app = app;
_media_info.stream = stream_name;
TraceL << " mediainfo=" << _media_info.shortUrl() << " params=" << _media_info._param_strs;
TraceL << " mediainfo=" << _media_info.shortUrl() << " params=" << _media_info.param_strs;
return true;
}
@ -136,7 +136,7 @@ mediakit::MediaOriginType SrtTransportImp::getOriginType(mediakit::MediaSource &
// 获取媒体源url或者文件路径
std::string SrtTransportImp::getOriginUrl(mediakit::MediaSource &sender) const {
return _media_info._full_url;
return _media_info.full_url;
}
// 获取媒体源客户端相关信息
@ -157,9 +157,7 @@ void SrtTransportImp::emitOnPublish() {
return;
}
if (err.empty()) {
strong_self->_muxer = std::make_shared<MultiMediaSourceMuxer>(strong_self->_media_info._vhost,
strong_self->_media_info._app,
strong_self->_media_info._streamid,0.0f,
strong_self->_muxer = std::make_shared<MultiMediaSourceMuxer>(strong_self->_media_info,0.0f,
option);
strong_self->_muxer->setMediaListener(strong_self);
strong_self->doCachedFunc();
@ -207,7 +205,7 @@ void SrtTransportImp::emitOnPlay() {
void SrtTransportImp::doPlay() {
// 异步查找直播流
MediaInfo info = _media_info;
info._schema = TS_SCHEMA;
info.schema = TS_SCHEMA;
std::weak_ptr<SrtTransportImp> weak_self = static_pointer_cast<SrtTransportImp>(shared_from_this());
MediaSource::findAsync(info, getSession(), [weak_self](const MediaSource::Ptr &src) {
auto strong_self = weak_self.lock();
@ -281,7 +279,7 @@ uint16_t SrtTransportImp::get_local_port() {
}
std::string SrtTransportImp::getIdentifier() const {
return _media_info._streamid;
return _media_info.stream;
}
bool SrtTransportImp::inputFrame(const Frame::Ptr &frame) {
@ -293,7 +291,7 @@ bool SrtTransportImp::inputFrame(const Frame::Ptr &frame) {
auto diff = _type_to_stamp[TrackType::TrackVideo].getRelativeStamp() - _type_to_stamp[TrackType::TrackAudio].getRelativeStamp();
if(std::abs(diff) > 5000){
// 超过5s应该同步 TODO
WarnL << _media_info._full_url<<" video or audio not sync : "<<diff;
WarnL << _media_info.full_url <<" video or audio not sync : "<<diff;
}
}
//TraceL<<"after type "<<frame_tmp->getCodecName()<<" dts "<<frame_tmp->dts()<<" pts "<<frame_tmp->pts();

View File

@ -90,8 +90,8 @@ void initEventListener() {
static onceToken s_token([]() {
//监听kBroadcastOnGetRtspRealm事件决定rtsp链接是否需要鉴权(传统的rtsp鉴权方案)才能访问
NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastOnGetRtspRealm, [](BroadcastOnGetRtspRealmArgs) {
DebugL << "RTSP是否需要鉴权事件" << args.getUrl() << " " << args._param_strs;
if (string("1") == args._streamid) {
DebugL << "RTSP是否需要鉴权事件" << args.getUrl() << " " << args.param_strs;
if (string("1") == args.stream) {
// live/1需要认证
//该流需要认证并且设置realm
invoker(REALM);
@ -104,7 +104,7 @@ void initEventListener() {
//监听kBroadcastOnRtspAuth事件返回正确的rtsp鉴权用户密码
NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastOnRtspAuth, [](BroadcastOnRtspAuthArgs) {
DebugL << "RTSP播放鉴权:" << args.getUrl() << " " << args._param_strs;
DebugL << "RTSP播放鉴权:" << args.getUrl() << " " << args.param_strs;
DebugL << "RTSP用户" << user_name << (must_no_encrypt ? " Base64" : " MD5") << " 方式登录";
string user = user_name;
//假设我们异步读取数据库
@ -134,14 +134,14 @@ void initEventListener() {
//监听rtsp/rtmp推流事件返回结果告知是否有推流权限
NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastMediaPublish, [](BroadcastMediaPublishArgs) {
DebugL << "推流鉴权:" << args.getUrl() << " " << args._param_strs;
DebugL << "推流鉴权:" << args.getUrl() << " " << args.param_strs;
invoker("", ProtocolOption());//鉴权成功
//invoker("this is auth failed message");//鉴权失败
});
//监听rtsp/rtsps/rtmp/http-flv播放事件返回结果告知是否有播放权限(rtsp通过kBroadcastOnRtspAuth或此事件都可以实现鉴权)
NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastMediaPlayed, [](BroadcastMediaPlayedArgs) {
DebugL << "播放鉴权:" << args.getUrl() << " " << args._param_strs;
DebugL << "播放鉴权:" << args.getUrl() << " " << args.param_strs;
invoker("");//鉴权成功
//invoker("this is auth failed message");//鉴权失败
});
@ -182,14 +182,13 @@ void initEventListener() {
*
* ZLMediaKit会把其立即转发给播放器(55)
*/
DebugL << "未找到流事件:" << args.getUrl() << " " << args._param_strs;
DebugL << "未找到流事件:" << args.getUrl() << " " << args.param_strs;
});
//监听播放或推流结束时消耗流量事件
NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastFlowReport, [](BroadcastFlowReportArgs) {
DebugL << "播放器(推流器)断开连接事件:" << args.getUrl() << " " << args._param_strs
<< "\r\n使用流量:" << totalBytes << " bytes,连接时长:" << totalDuration << "";
DebugL << "播放器(推流器)断开连接事件:" << args.getUrl() << " " << args.param_strs << "\r\n使用流量:" << totalBytes << " bytes,连接时长:" << totalDuration << "";
});

View File

@ -71,7 +71,7 @@ MediaOriginType WebRtcPusher::getOriginType(MediaSource &sender) const {
}
string WebRtcPusher::getOriginUrl(MediaSource &sender) const {
return _media_info._full_url;
return _media_info.full_url;
}
std::shared_ptr<SockInfo> WebRtcPusher::getOriginSock(MediaSource &sender) const {
@ -99,7 +99,7 @@ void WebRtcPusher::onRecvRtp(MediaTrack &track, const string &rid, RtpPacket::Pt
auto &src = _push_src_sim[rid];
if (!src) {
auto stream_id = rid.empty() ? _push_src->getId() : _push_src->getId() + "_" + rid;
auto src_imp = _push_src->Clone(stream_id);
auto src_imp = _push_src->clone(stream_id);
_push_src_sim_ownership[rid] = src_imp->getOwnership();
src_imp->setListener(static_pointer_cast<WebRtcPusher>(shared_from_this()));
src = src_imp;

View File

@ -1179,7 +1179,7 @@ void push_plugin(Session &sender, const WebRtcArgs &args, const WebRtcPluginMana
RtspMediaSourceImp::Ptr push_src;
std::shared_ptr<void> push_src_ownership;
auto src = MediaSource::find(RTSP_SCHEMA, info._vhost, info._app, info._streamid);
auto src = MediaSource::find(RTSP_SCHEMA, info.vhost, info.app, info.stream);
auto push_failed = (bool)src;
while (src) {
@ -1206,7 +1206,7 @@ void push_plugin(Session &sender, const WebRtcArgs &args, const WebRtcPluginMana
}
if (!push_src) {
push_src = std::make_shared<RtspMediaSourceImp>(info._vhost, info._app, info._streamid);
push_src = std::make_shared<RtspMediaSourceImp>(info);
push_src_ownership = push_src->getOwnership();
push_src->setProtocolOption(option);
}
@ -1235,7 +1235,7 @@ void play_plugin(Session &sender, const WebRtcArgs &args, const WebRtcPluginMana
}
// webrtc播放的是rtsp的源
info._schema = RTSP_SCHEMA;
info.schema = RTSP_SCHEMA;
MediaSource::findAsync(info, session_ptr, [=](const MediaSource::Ptr &src_in) mutable {
auto src = dynamic_pointer_cast<RtspMediaSource>(src_in);
if (!src) {
@ -1243,7 +1243,7 @@ void play_plugin(Session &sender, const WebRtcArgs &args, const WebRtcPluginMana
return;
}
// 还原成rtc目的是为了hook时识别哪种播放协议
info._schema = RTC_SCHEMA;
info.schema = RTC_SCHEMA;
auto rtc = WebRtcPlayer::create(EventPollerPool::Instance().getPoller(), src, info, preferred_tcp);
cb(*rtc);
});