操作MediaSource对象时确保线程安全

This commit is contained in:
xiongziliang 2022-06-11 14:17:43 +08:00
parent 97116e1208
commit 09af12a183

View File

@ -699,22 +699,23 @@ void installWebApi() {
});
//测试url http://127.0.0.1/index/api/getMediaInfo?schema=rtsp&vhost=__defaultVhost__&app=live&stream=obs
api_regist("/index/api/getMediaInfo",[](API_ARGS_MAP){
api_regist("/index/api/getMediaInfo",[](API_ARGS_MAP_ASYNC){
CHECK_SECRET();
CHECK_ARGS("schema","vhost","app","stream");
auto src = MediaSource::find(allArgs["schema"],allArgs["vhost"],allArgs["app"],allArgs["stream"]);
if(!src){
val["online"] = false;
return;
throw ApiRetException("can not find the stream", API::NotFound);
}
val = makeMediaSourceJson(*src);
val["online"] = true;
val["code"] = API::Success;
src->getOwnerPoller()->async([=]() mutable {
auto val = makeMediaSourceJson(*src);
val["code"] = API::Success;
invoker(200, headerOut, val.toStyledString());
});
});
//主动关断流,包括关断拉流、推流
//测试url http://127.0.0.1/index/api/close_stream?schema=rtsp&vhost=__defaultVhost__&app=live&stream=obs&force=1
api_regist("/index/api/close_stream",[](API_ARGS_MAP){
api_regist("/index/api/close_stream",[](API_ARGS_MAP_ASYNC){
CHECK_SECRET();
CHECK_ARGS("schema","vhost","app","stream");
//踢掉推流器
@ -722,16 +723,17 @@ void installWebApi() {
allArgs["vhost"],
allArgs["app"],
allArgs["stream"]);
if (src) {
bool flag = src->close(allArgs["force"].as<bool>());
if (!src) {
throw ApiRetException("can not find the stream", API::NotFound);
}
bool force = allArgs["force"].as<bool>();
src->getOwnerPoller()->async([=]() mutable {
bool flag = src->close(force);
val["result"] = flag ? 0 : -1;
val["msg"] = flag ? "success" : "close failed";
val["code"] = flag ? API::Success : API::OtherFailed;
} else {
val["result"] = -2;
val["msg"] = "can not find the stream";
val["code"] = API::OtherFailed;
}
});
});
//批量主动关断流,包括关断拉流、推流
@ -748,8 +750,8 @@ void installWebApi() {
}, allArgs["schema"], allArgs["vhost"], allArgs["app"], allArgs["stream"]);
bool force = allArgs["force"].as<bool>();
for(auto &media : media_list){
if(media->close(force)){
for (auto &media : media_list) {
if (media->close(force)) {
++count_closed;
}
}
@ -1105,7 +1107,7 @@ void installWebApi() {
auto src = MediaSource::find(allArgs["vhost"], allArgs["app"], allArgs["stream"], allArgs["from_mp4"].as<int>());
if (!src) {
throw ApiRetException("该媒体流不存在", API::OtherFailed);
throw ApiRetException("can not find the source stream", API::NotFound);
}
MediaSourceEvent::SendRtpArgs args;
@ -1118,15 +1120,17 @@ void installWebApi() {
args.pt = allArgs["pt"].empty() ? 96 : allArgs["pt"].as<int>();
args.use_ps = allArgs["use_ps"].empty() ? true : allArgs["use_ps"].as<bool>();
args.only_audio = allArgs["only_audio"].empty() ? false : allArgs["only_audio"].as<bool>();
TraceL << "startSendRtp, pt " << int(args.pt) << " ps " << args.use_ps << " audio " << args.only_audio;
TraceL << "startSendRtp, pt " << int(args.pt) << " ps " << args.use_ps << " audio " << args.only_audio;
src->startSendRtp(args, [val, headerOut, invoker](uint16_t local_port, const SockException &ex) mutable {
if (ex) {
val["code"] = API::OtherFailed;
val["msg"] = ex.what();
}
val["local_port"] = local_port;
invoker(200, headerOut, val.toStyledString());
src->getOwnerPoller()->async([=]() mutable {
src->startSendRtp(args, [val, headerOut, invoker](uint16_t local_port, const SockException &ex) mutable {
if (ex) {
val["code"] = API::OtherFailed;
val["msg"] = ex.what();
}
val["local_port"] = local_port;
invoker(200, headerOut, val.toStyledString());
});
});
});
@ -1136,7 +1140,7 @@ void installWebApi() {
auto src = MediaSource::find(allArgs["vhost"], allArgs["app"], allArgs["stream"], allArgs["from_mp4"].as<int>());
if (!src) {
throw ApiRetException("该媒体流不存在", API::OtherFailed);
throw ApiRetException("can not find the source stream", API::NotFound);
}
MediaSourceEvent::SendRtpArgs args;
@ -1148,29 +1152,38 @@ void installWebApi() {
args.use_ps = allArgs["use_ps"].empty() ? true : allArgs["use_ps"].as<bool>();
args.only_audio = allArgs["only_audio"].empty() ? false : allArgs["only_audio"].as<bool>();
TraceL << "startSendRtpPassive, pt " << int(args.pt) << " ps " << args.use_ps << " audio " << args.only_audio;
src->startSendRtp(args, [val, headerOut, invoker](uint16_t local_port, const SockException &ex) mutable {
if (ex) {
val["code"] = API::OtherFailed;
val["msg"] = ex.what();
}
val["local_port"] = local_port;
invoker(200, headerOut, val.toStyledString());
src->getOwnerPoller()->async([=]() mutable {
src->startSendRtp(args, [val, headerOut, invoker](uint16_t local_port, const SockException &ex) mutable {
if (ex) {
val["code"] = API::OtherFailed;
val["msg"] = ex.what();
}
val["local_port"] = local_port;
invoker(200, headerOut, val.toStyledString());
});
});
});
api_regist("/index/api/stopSendRtp",[](API_ARGS_MAP){
api_regist("/index/api/stopSendRtp",[](API_ARGS_MAP_ASYNC){
CHECK_SECRET();
CHECK_ARGS("vhost", "app", "stream");
auto src = MediaSource::find(allArgs["vhost"], allArgs["app"], allArgs["stream"]);
if (!src) {
throw ApiRetException("该媒体流不存在", API::OtherFailed);
throw ApiRetException("can not find the stream", API::NotFound);
}
//ssrc如果为空关闭全部
if (!src->stopSendRtp(allArgs["ssrc"])) {
throw ApiRetException("尚未开始推流,停止失败", API::OtherFailed);
}
src->getOwnerPoller()->async([=]() mutable {
// ssrc如果为空关闭全部
if (!src->stopSendRtp(allArgs["ssrc"])) {
val["code"] = API::OtherFailed;
val["msg"] = "stopSendRtp failed";
invoker(200, headerOut, val.toStyledString());
return;
}
invoker(200, headerOut, val.toStyledString());
});
});
api_regist("/index/api/pauseRtpCheck", [](API_ARGS_MAP) {
@ -1199,80 +1212,102 @@ void installWebApi() {
#endif//ENABLE_RTPPROXY
// 开始录制hls或MP4
api_regist("/index/api/startRecord",[](API_ARGS_MAP){
api_regist("/index/api/startRecord",[](API_ARGS_MAP_ASYNC){
CHECK_SECRET();
CHECK_ARGS("type","vhost","app","stream");
auto result = Recorder::startRecord((Recorder::type) allArgs["type"].as<int>(),
allArgs["vhost"],
allArgs["app"],
allArgs["stream"],
allArgs["customized_path"],
allArgs["max_second"].as<size_t>());
val["result"] = result;
val["code"] = result ? API::Success : API::OtherFailed;
val["msg"] = result ? "success" : "start record failed";
auto src = MediaSource::find(allArgs["vhost"], allArgs["app"], allArgs["stream"] );
if (!src) {
throw ApiRetException("can not find the stream", API::NotFound);
}
src->getOwnerPoller()->async([=]() mutable {
auto result = src->setupRecord((Recorder::type)allArgs["type"].as<int>(), true, allArgs["customized_path"], allArgs["max_second"].as<size_t>());
val["result"] = result;
val["code"] = result ? API::Success : API::OtherFailed;
val["msg"] = result ? "success" : "start record failed";
invoker(200, headerOut, val.toStyledString());
});
});
//设置录像流播放速度
api_regist("/index/api/setRecordSpeed", [](API_ARGS_MAP) {
api_regist("/index/api/setRecordSpeed", [](API_ARGS_MAP_ASYNC) {
CHECK_SECRET();
CHECK_ARGS("schema", "vhost", "app", "stream", "speed");
auto src = MediaSource::find(allArgs["schema"],
allArgs["vhost"],
allArgs["app"],
allArgs["stream"]);
if (src) {
bool flag = src->speed(allArgs["speed"].as<float>());
if (!src) {
throw ApiRetException("can not find the stream", API::NotFound);
}
auto speed = allArgs["speed"].as<float>();
src->getOwnerPoller()->async([=]() mutable {
bool flag = src->speed(speed);
val["result"] = flag ? 0 : -1;
val["msg"] = flag ? "success" : "set failed";
val["code"] = flag ? API::Success : API::OtherFailed;
} else {
val["result"] = -2;
val["msg"] = "can not find the stream";
val["code"] = API::OtherFailed;
}
invoker(200, headerOut, val.toStyledString());
});
});
api_regist("/index/api/seekRecordStamp", [](API_ARGS_MAP) {
api_regist("/index/api/seekRecordStamp", [](API_ARGS_MAP_ASYNC) {
CHECK_SECRET();
CHECK_ARGS("schema", "vhost", "app", "stream", "stamp");
auto src = MediaSource::find(allArgs["schema"],
allArgs["vhost"],
allArgs["app"],
allArgs["stream"]);
if (src) {
bool flag = src->seekTo(allArgs["stamp"].as<size_t>());
if (!src) {
throw ApiRetException("can not find the stream", API::NotFound);
}
auto stamp = allArgs["stamp"].as<size_t>();
src->getOwnerPoller()->async([=]() mutable {
bool flag = src->seekTo(stamp);
val["result"] = flag ? 0 : -1;
val["msg"] = flag ? "success" : "seek failed";
val["code"] = flag ? API::Success : API::OtherFailed;
} else {
val["result"] = -2;
val["msg"] = "can not find the stream";
val["code"] = API::OtherFailed;
}
invoker(200, headerOut, val.toStyledString());
});
});
// 停止录制hls或MP4
api_regist("/index/api/stopRecord",[](API_ARGS_MAP){
api_regist("/index/api/stopRecord",[](API_ARGS_MAP_ASYNC){
CHECK_SECRET();
CHECK_ARGS("type","vhost","app","stream");
auto result = Recorder::stopRecord((Recorder::type) allArgs["type"].as<int>(),
allArgs["vhost"],
allArgs["app"],
allArgs["stream"]);
val["result"] = result;
val["code"] = result ? API::Success : API::OtherFailed;
val["msg"] = result ? "success" : "stop record failed";
auto src = MediaSource::find(allArgs["vhost"], allArgs["app"], allArgs["stream"] );
if (!src) {
throw ApiRetException("can not find the stream", API::NotFound);
}
auto type = (Recorder::type)allArgs["type"].as<int>();
src->getOwnerPoller()->async([=]() mutable {
auto result = src->setupRecord(type, false, "", 0);
val["result"] = result;
val["code"] = result ? API::Success : API::OtherFailed;
val["msg"] = result ? "success" : "stop record failed";
invoker(200, headerOut, val.toStyledString());
});
});
// 获取hls或MP4录制状态
api_regist("/index/api/isRecording",[](API_ARGS_MAP){
api_regist("/index/api/isRecording",[](API_ARGS_MAP_ASYNC){
CHECK_SECRET();
CHECK_ARGS("type","vhost","app","stream");
val["status"] = Recorder::isRecording((Recorder::type) allArgs["type"].as<int>(),
allArgs["vhost"],
allArgs["app"],
allArgs["stream"]);
auto src = MediaSource::find(allArgs["vhost"], allArgs["app"], allArgs["stream"]);
if (!src) {
throw ApiRetException("can not find the stream", API::NotFound);
}
auto type = (Recorder::type)allArgs["type"].as<int>();
src->getOwnerPoller()->async([=]() mutable {
val["status"] = src->isRecording(type);
invoker(200, headerOut, val.toStyledString());
});
});
//获取录像文件夹列表或mp4文件列表