webrtc新增自定义插件模式

This commit is contained in:
ziyue 2021-10-19 15:23:12 +08:00
parent 8aa2d0ce07
commit 5ee9b69568
3 changed files with 207 additions and 100 deletions

View File

@ -1186,75 +1186,47 @@ void installWebApi() {
}); });
#ifdef ENABLE_WEBRTC #ifdef ENABLE_WEBRTC
api_regist("/index/api/webrtc",[](API_ARGS_STRING_ASYNC){ class WebRtcArgsImp : public WebRtcArgs {
auto offer_sdp = allArgs.getArgs(); public:
auto type = allArgs["type"]; WebRtcArgsImp(const HttpAllArgs<string> &args) : _args(args) {}
~WebRtcArgsImp() override = default;
variant operator[](const string &key) const override {
if (key == "url") {
return getUrl();
}
return _args[key];
}
private:
string getUrl() const{
auto &allArgs = _args;
CHECK_ARGS("app", "stream");
return StrPrinter << RTC_SCHEMA << "://" << _args["Host"] << "/" << _args["app"] << "/"
<< _args["stream"] << "?" << _args.getParser().Params();
}
private:
HttpAllArgs<string> _args;
};
api_regist("/index/api/webrtc",[](API_ARGS_STRING_ASYNC){
CHECK_ARGS("type");
auto type = allArgs["type"];
auto offer = allArgs.getArgs();
CHECK(!offer.empty(), "http body(webrtc offer sdp) is empty");
WebRtcPluginManager::Instance().getAnswerSdp(*(static_cast<Session *>(&sender)), type, offer, WebRtcArgsImp(allArgs),
[invoker, val, offer, headerOut](const WebRtcInterface &exchanger) mutable {
//设置返回类型 //设置返回类型
headerOut["Content-Type"] = HttpFileManager::getContentType(".json"); headerOut["Content-Type"] = HttpFileManager::getContentType(".json");
//设置跨域 //设置跨域
headerOut["Access-Control-Allow-Origin"] = "*"; headerOut["Access-Control-Allow-Origin"] = "*";
if (type.empty() || !strcasecmp(type.data(), "play")) {
CHECK_ARGS("app", "stream");
MediaInfo info(StrPrinter << "rtc://" << allArgs["Host"] << "/" << allArgs["app"] << "/" << allArgs["stream"] << "?" << allArgs.getParser().Params());
auto session = static_cast<TcpSession*>(&sender);
auto session_ptr = session->shared_from_this();
Broadcast::AuthInvoker authInvoker = [invoker, offer_sdp, val, info, headerOut, session_ptr](const string &err) mutable {
if (!err.empty()) {
val["code"] = API::AuthFailed;
val["msg"] = err;
invoker(200, headerOut, val.toStyledString());
return;
}
//webrtc播放的是rtsp的源
info._schema = RTSP_SCHEMA;
MediaSource::findAsync(info, session_ptr, [=](const MediaSource::Ptr &src_in) mutable {
auto src = dynamic_pointer_cast<RtspMediaSource>(src_in);
if (!src) {
val["code"] = API::NotFound;
val["msg"] = "stream not found";
invoker(200, headerOut, val.toStyledString());
return;
}
//还原成rtc目的是为了hook时识别哪种播放协议
info._schema = "rtc";
auto rtc = WebRtcPlayer::create(EventPollerPool::Instance().getPoller(), src, info);
val["sdp"] = rtc->getAnswerSdp(offer_sdp);
val["type"] = "answer";
invoker(200, headerOut, val.toStyledString());
});
};
//广播通用播放url鉴权事件
auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed, info, authInvoker, sender);
if (!flag) {
//该事件无人监听,默认不鉴权
authInvoker("");
}
return;
}
if (!strcasecmp(type.data(), "push")) {
CHECK_ARGS("app", "stream");
MediaInfo info(StrPrinter << "rtc://" << allArgs["Host"] << "/" << allArgs["app"] << "/" << allArgs["stream"] << "?" << allArgs.getParser().Params());
Broadcast::PublishAuthInvoker authInvoker = [invoker, offer_sdp, val, info, headerOut](const string &err, bool enableHls, bool enableMP4) mutable {
try { try {
auto src = dynamic_pointer_cast<RtspMediaSource>(MediaSource::find(RTSP_SCHEMA, info._vhost, info._app, info._streamid)); val["sdp"] = const_cast<WebRtcInterface &>(exchanger).getAnswerSdp(offer);
if (src) { val["id"] = exchanger.getIdentifier();
throw std::runtime_error("已经在推流");
}
if (!err.empty()) {
throw runtime_error(StrPrinter << "推流鉴权失败:" << err);
}
auto push_src = std::make_shared<RtspMediaSourceImp>(info._vhost, info._app, info._streamid);
push_src->setProtocolTranslation(enableHls, enableMP4);
auto rtc = WebRtcPusher::create(EventPollerPool::Instance().getPoller(), push_src, info);
push_src->setListener(rtc);
val["sdp"] = rtc->getAnswerSdp(offer_sdp);
val["type"] = "answer"; val["type"] = "answer";
invoker(200, headerOut, val.toStyledString()); invoker(200, headerOut, val.toStyledString());
} catch (std::exception &ex) { } catch (std::exception &ex) {
@ -1262,28 +1234,7 @@ void installWebApi() {
val["msg"] = ex.what(); val["msg"] = ex.what();
invoker(200, headerOut, val.toStyledString()); invoker(200, headerOut, val.toStyledString());
} }
}; });
//rtsp推流需要鉴权
auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish, info, authInvoker, sender);
if (!flag) {
//该事件无人监听,默认不鉴权
GET_CONFIG(bool, toHls, General::kPublishToHls);
GET_CONFIG(bool, toMP4, General::kPublishToMP4);
authInvoker("", toHls, toMP4);
}
return;
}
if (!strcasecmp(type.data(), "echo")) {
auto rtc = WebRtcEchoTest::create(EventPollerPool::Instance().getPoller());
val["sdp"] = rtc->getAnswerSdp(offer_sdp);
val["type"] = "answer";
invoker(200, headerOut, val.toStyledString());
return;
}
throw ApiRetException("不支持该类型", API::InvalidArgs);
}); });
#endif #endif

View File

@ -885,7 +885,7 @@ WebRtcTransportManager &WebRtcTransportManager::Instance() {
return s_instance; return s_instance;
} }
void WebRtcTransportManager::addItem(string key, const WebRtcTransportImp::Ptr &ptr) { void WebRtcTransportManager::addItem(const string &key, const WebRtcTransportImp::Ptr &ptr) {
lock_guard<mutex> lck(_mtx); lock_guard<mutex> lck(_mtx);
_map[key] = ptr; _map[key] = ptr;
} }
@ -902,7 +902,106 @@ WebRtcTransportImp::Ptr WebRtcTransportManager::getItem(const string &key) {
return it->second.lock(); return it->second.lock();
} }
void WebRtcTransportManager::removeItem(string key) { void WebRtcTransportManager::removeItem(const string &key) {
lock_guard<mutex> lck(_mtx); lock_guard<mutex> lck(_mtx);
_map.erase(key); _map.erase(key);
} }
//////////////////////////////////////////////////////////////////////////////////////////////
WebRtcPluginManager &WebRtcPluginManager::Instance() {
static WebRtcPluginManager s_instance;
return s_instance;
}
void WebRtcPluginManager::registerPlugin(const string &type, Plugin cb) {
lock_guard<mutex> lck(_mtx_creator);
_map_creator[type] = std::move(cb);
}
void WebRtcPluginManager::getAnswerSdp(Session &sender, const string &type, const string &offer, const WebRtcArgs &args,
const onCreateRtc &cb) {
lock_guard<mutex> lck(_mtx_creator);
auto it = _map_creator.find(type);
if (it == _map_creator.end()) {
cb(WebRtcException(SockException(Err_other, "the type can not supported")));
return;
}
it->second(sender, offer, args, cb);
}
#include "WebRtcPlayer.h"
#include "WebRtcPusher.h"
#include "WebRtcEchoTest.h"
void echo_plugin(Session &sender, const string &offer, const WebRtcArgs &args, const WebRtcPluginManager::onCreateRtc &cb) {
cb(*WebRtcEchoTest::create(EventPollerPool::Instance().getPoller()));
}
void push_plugin(Session &sender, const string &offer_sdp, const WebRtcArgs &args, const WebRtcPluginManager::onCreateRtc &cb) {
MediaInfo info(args["url"]);
Broadcast::PublishAuthInvoker invoker = [cb, offer_sdp, info](const string &err, bool enable_hls, bool enable_mp4) mutable {
if (!err.empty()) {
cb(WebRtcException(SockException(Err_other, err)));
return;
}
auto src = dynamic_pointer_cast<RtspMediaSource>(MediaSource::find(RTSP_SCHEMA, info._vhost, info._app, info._streamid));
if (src) {
cb(WebRtcException(SockException(Err_other, "already publishing")));
return;
}
auto push_src = std::make_shared<RtspMediaSourceImp>(info._vhost, info._app, info._streamid);
push_src->setProtocolTranslation(enable_hls, enable_mp4);
auto rtc = WebRtcPusher::create(EventPollerPool::Instance().getPoller(), push_src, info);
push_src->setListener(rtc);
cb(*rtc);
};
//rtsp推流需要鉴权
auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish, info, invoker, sender);
if (!flag) {
//该事件无人监听,默认不鉴权
GET_CONFIG(bool, to_hls, General::kPublishToHls);
GET_CONFIG(bool, to_mp4, General::kPublishToMP4);
invoker("", to_hls, to_mp4);
}
}
void play_plugin(Session &sender, const string &offer_sdp, const WebRtcArgs &args, const WebRtcPluginManager::onCreateRtc &cb) {
MediaInfo info(args["url"]);
auto session_ptr = sender.shared_from_this();
Broadcast::AuthInvoker invoker = [cb, offer_sdp, info, session_ptr](const string &err) mutable {
if (!err.empty()) {
cb(WebRtcException(SockException(Err_other, err)));
return;
}
//webrtc播放的是rtsp的源
info._schema = RTSP_SCHEMA;
MediaSource::findAsync(info, session_ptr, [=](const MediaSource::Ptr &src_in) mutable {
auto src = dynamic_pointer_cast<RtspMediaSource>(src_in);
if (!src) {
cb(WebRtcException(SockException(Err_other, "stream not found")));
return;
}
//还原成rtc目的是为了hook时识别哪种播放协议
info._schema = RTC_SCHEMA;
auto rtc = WebRtcPlayer::create(EventPollerPool::Instance().getPoller(), src, info);
cb(*rtc);
});
};
//广播通用播放url鉴权事件
auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed, info, invoker, sender);
if (!flag) {
//该事件无人监听,默认不鉴权
invoker("");
}
}
static onceToken s_rtc_auto_register([](){
WebRtcPluginManager::Instance().registerPlugin("echo", echo_plugin);
WebRtcPluginManager::Instance().registerPlugin("push", push_plugin);
WebRtcPluginManager::Instance().registerPlugin("play", play_plugin);
});

View File

@ -35,7 +35,31 @@ extern const string kPort;
extern const string kTimeOutSec; extern const string kTimeOutSec;
}//namespace RTC }//namespace RTC
class WebRtcTransport : public RTC::DtlsTransport::Listener, public RTC::IceServer::Listener, public std::enable_shared_from_this<WebRtcTransport> { class WebRtcInterface {
public:
WebRtcInterface() = default;
virtual ~WebRtcInterface() = default;
virtual string getAnswerSdp(const string &offer) = 0;
virtual const string &getIdentifier() const = 0;
};
class WebRtcException : public WebRtcInterface {
public:
WebRtcException(const SockException &ex) : _ex(ex) {};
~WebRtcException() override = default;
string getAnswerSdp(const string &offer) override {
throw _ex;
}
const string &getIdentifier() const override {
static string s_null;
return s_null;
}
private:
SockException _ex;
};
class WebRtcTransport : public WebRtcInterface, public RTC::DtlsTransport::Listener, public RTC::IceServer::Listener, public std::enable_shared_from_this<WebRtcTransport> {
public: public:
using Ptr = std::shared_ptr<WebRtcTransport>; using Ptr = std::shared_ptr<WebRtcTransport>;
WebRtcTransport(const EventPoller::Ptr &poller); WebRtcTransport(const EventPoller::Ptr &poller);
@ -56,7 +80,12 @@ public:
* @param offer offer sdp * @param offer offer sdp
* @return answer sdp * @return answer sdp
*/ */
std::string getAnswerSdp(const string &offer); string getAnswerSdp(const string &offer) override;
/**
* id
*/
const string& getIdentifier() const override;
/** /**
* socket收到udp数据 * socket收到udp数据
@ -77,7 +106,6 @@ public:
void sendRtcpPacket(const char *buf, int len, bool flush, void *ctx = nullptr); void sendRtcpPacket(const char *buf, int len, bool flush, void *ctx = nullptr);
const EventPoller::Ptr& getPoller() const; const EventPoller::Ptr& getPoller() const;
const string& getIdentifier() const;
protected: protected:
//// dtls相关的回调 //// //// dtls相关的回调 ////
@ -228,13 +256,42 @@ private:
class WebRtcTransportManager { class WebRtcTransportManager {
public: public:
friend class WebRtcTransportImp;
static WebRtcTransportManager &Instance(); static WebRtcTransportManager &Instance();
void addItem(string key, const WebRtcTransportImp::Ptr &ptr);
void removeItem(string key);
WebRtcTransportImp::Ptr getItem(const string &key); WebRtcTransportImp::Ptr getItem(const string &key);
private: private:
WebRtcTransportManager() = default; WebRtcTransportManager() = default;
void addItem(const string &key, const WebRtcTransportImp::Ptr &ptr);
void removeItem(const string &key);
private:
mutable mutex _mtx; mutable mutex _mtx;
unordered_map<string, weak_ptr<WebRtcTransportImp> > _map; unordered_map<string, weak_ptr<WebRtcTransportImp> > _map;
}; };
class WebRtcArgs {
public:
WebRtcArgs() = default;
virtual ~WebRtcArgs() = default;
virtual variant operator[](const string &key) const = 0;
};
class WebRtcPluginManager {
public:
using onCreateRtc = function<void(const WebRtcInterface &rtc)>;
using Plugin = function<void(Session &sender, const string &offer, const WebRtcArgs &args, const onCreateRtc &cb)>;
static WebRtcPluginManager &Instance();
void registerPlugin(const string &type, Plugin cb);
void getAnswerSdp(Session &sender, const string &type, const string &offer, const WebRtcArgs &args, const onCreateRtc &cb);
private:
WebRtcPluginManager() = default;
private:
mutable mutex _mtx_creator;
unordered_map<string, Plugin> _map_creator;
};