diff --git a/server/WebApi.cpp b/server/WebApi.cpp index 181eebf6..cdcc68ac 100755 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -1186,104 +1186,55 @@ void installWebApi() { }); #ifdef ENABLE_WEBRTC + class WebRtcArgsImp : public WebRtcArgs { + public: + WebRtcArgsImp(const HttpAllArgs &args) : _args(args) {} + ~WebRtcArgsImp() override = default; + + variant operator[](const string &key) const override { + if (key == "url") { + return getUrl(); + } + return _args[key]; + } + + private: + string getUrl() const{ + auto &allArgs = _args; + CHECK_ARGS("app", "stream"); + + return StrPrinter << RTC_SCHEMA << "://" << _args["Host"] << "/" << _args["app"] << "/" + << _args["stream"] << "?" << _args.getParser().Params(); + } + + private: + HttpAllArgs _args; + }; + api_regist("/index/api/webrtc",[](API_ARGS_STRING_ASYNC){ - auto offer_sdp = allArgs.getArgs(); + CHECK_ARGS("type"); auto type = allArgs["type"]; + auto offer = allArgs.getArgs(); + CHECK(!offer.empty(), "http body(webrtc offer sdp) is empty"); - //设置返回类型 - headerOut["Content-Type"] = HttpFileManager::getContentType(".json"); - //设置跨域 - headerOut["Access-Control-Allow-Origin"] = "*"; + WebRtcPluginManager::Instance().getAnswerSdp(*(static_cast(&sender)), type, offer, WebRtcArgsImp(allArgs), + [invoker, val, offer, headerOut](const WebRtcInterface &exchanger) mutable { + //设置返回类型 + headerOut["Content-Type"] = HttpFileManager::getContentType(".json"); + //设置跨域 + headerOut["Access-Control-Allow-Origin"] = "*"; - if (type.empty() || !strcasecmp(type.data(), "play")) { - CHECK_ARGS("app", "stream"); - MediaInfo info(StrPrinter << "rtc://" << allArgs["Host"] << "/" << allArgs["app"] << "/" << allArgs["stream"] << "?" << allArgs.getParser().Params()); - - auto session = static_cast(&sender); - auto session_ptr = session->shared_from_this(); - Broadcast::AuthInvoker authInvoker = [invoker, offer_sdp, val, info, headerOut, session_ptr](const string &err) mutable { - if (!err.empty()) { - val["code"] = API::AuthFailed; - val["msg"] = err; - invoker(200, headerOut, val.toStyledString()); - return; - } - - //webrtc播放的是rtsp的源 - info._schema = RTSP_SCHEMA; - MediaSource::findAsync(info, session_ptr, [=](const MediaSource::Ptr &src_in) mutable { - auto src = dynamic_pointer_cast(src_in); - if (!src) { - val["code"] = API::NotFound; - val["msg"] = "stream not found"; - invoker(200, headerOut, val.toStyledString()); - return; - } - //还原成rtc,目的是为了hook时识别哪种播放协议 - info._schema = "rtc"; - auto rtc = WebRtcPlayer::create(EventPollerPool::Instance().getPoller(), src, info); - val["sdp"] = rtc->getAnswerSdp(offer_sdp); - val["type"] = "answer"; - invoker(200, headerOut, val.toStyledString()); - }); - }; - - //广播通用播放url鉴权事件 - auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed, info, authInvoker, sender); - if (!flag) { - //该事件无人监听,默认不鉴权 - authInvoker(""); + try { + val["sdp"] = const_cast(exchanger).getAnswerSdp(offer); + val["id"] = exchanger.getIdentifier(); + val["type"] = "answer"; + invoker(200, headerOut, val.toStyledString()); + } catch (std::exception &ex) { + val["code"] = API::Exception; + val["msg"] = ex.what(); + invoker(200, headerOut, val.toStyledString()); } - return; - } - - if (!strcasecmp(type.data(), "push")) { - CHECK_ARGS("app", "stream"); - MediaInfo info(StrPrinter << "rtc://" << allArgs["Host"] << "/" << allArgs["app"] << "/" << allArgs["stream"] << "?" << allArgs.getParser().Params()); - - Broadcast::PublishAuthInvoker authInvoker = [invoker, offer_sdp, val, info, headerOut](const string &err, bool enableHls, bool enableMP4) mutable { - try { - auto src = dynamic_pointer_cast(MediaSource::find(RTSP_SCHEMA, info._vhost, info._app, info._streamid)); - if (src) { - throw std::runtime_error("已经在推流"); - } - if (!err.empty()) { - throw runtime_error(StrPrinter << "推流鉴权失败:" << err); - } - auto push_src = std::make_shared(info._vhost, info._app, info._streamid); - push_src->setProtocolTranslation(enableHls, enableMP4); - auto rtc = WebRtcPusher::create(EventPollerPool::Instance().getPoller(), push_src, info); - push_src->setListener(rtc); - val["sdp"] = rtc->getAnswerSdp(offer_sdp); - val["type"] = "answer"; - invoker(200, headerOut, val.toStyledString()); - } catch (std::exception &ex) { - val["code"] = API::Exception; - val["msg"] = ex.what(); - invoker(200, headerOut, val.toStyledString()); - } - }; - - //rtsp推流需要鉴权 - auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish, info, authInvoker, sender); - if (!flag) { - //该事件无人监听,默认不鉴权 - GET_CONFIG(bool, toHls, General::kPublishToHls); - GET_CONFIG(bool, toMP4, General::kPublishToMP4); - authInvoker("", toHls, toMP4); - } - return; - } - - if (!strcasecmp(type.data(), "echo")) { - auto rtc = WebRtcEchoTest::create(EventPollerPool::Instance().getPoller()); - val["sdp"] = rtc->getAnswerSdp(offer_sdp); - val["type"] = "answer"; - invoker(200, headerOut, val.toStyledString()); - return; - } - - throw ApiRetException("不支持该类型", API::InvalidArgs); + }); }); #endif diff --git a/webrtc/WebRtcTransport.cpp b/webrtc/WebRtcTransport.cpp index 2951c30c..c7a8106f 100644 --- a/webrtc/WebRtcTransport.cpp +++ b/webrtc/WebRtcTransport.cpp @@ -885,7 +885,7 @@ WebRtcTransportManager &WebRtcTransportManager::Instance() { return s_instance; } -void WebRtcTransportManager::addItem(string key, const WebRtcTransportImp::Ptr &ptr) { +void WebRtcTransportManager::addItem(const string &key, const WebRtcTransportImp::Ptr &ptr) { lock_guard lck(_mtx); _map[key] = ptr; } @@ -902,7 +902,106 @@ WebRtcTransportImp::Ptr WebRtcTransportManager::getItem(const string &key) { return it->second.lock(); } -void WebRtcTransportManager::removeItem(string key) { +void WebRtcTransportManager::removeItem(const string &key) { lock_guard lck(_mtx); _map.erase(key); } + +////////////////////////////////////////////////////////////////////////////////////////////// + +WebRtcPluginManager &WebRtcPluginManager::Instance() { + static WebRtcPluginManager s_instance; + return s_instance; +} + +void WebRtcPluginManager::registerPlugin(const string &type, Plugin cb) { + lock_guard lck(_mtx_creator); + _map_creator[type] = std::move(cb); +} + +void WebRtcPluginManager::getAnswerSdp(Session &sender, const string &type, const string &offer, const WebRtcArgs &args, + const onCreateRtc &cb) { + lock_guard lck(_mtx_creator); + auto it = _map_creator.find(type); + if (it == _map_creator.end()) { + cb(WebRtcException(SockException(Err_other, "the type can not supported"))); + return; + } + it->second(sender, offer, args, cb); +} + +#include "WebRtcPlayer.h" +#include "WebRtcPusher.h" +#include "WebRtcEchoTest.h" + +void echo_plugin(Session &sender, const string &offer, const WebRtcArgs &args, const WebRtcPluginManager::onCreateRtc &cb) { + cb(*WebRtcEchoTest::create(EventPollerPool::Instance().getPoller())); +} + +void push_plugin(Session &sender, const string &offer_sdp, const WebRtcArgs &args, const WebRtcPluginManager::onCreateRtc &cb) { + MediaInfo info(args["url"]); + Broadcast::PublishAuthInvoker invoker = [cb, offer_sdp, info](const string &err, bool enable_hls, bool enable_mp4) mutable { + if (!err.empty()) { + cb(WebRtcException(SockException(Err_other, err))); + return; + } + auto src = dynamic_pointer_cast(MediaSource::find(RTSP_SCHEMA, info._vhost, info._app, info._streamid)); + if (src) { + cb(WebRtcException(SockException(Err_other, "already publishing"))); + return; + } + + auto push_src = std::make_shared(info._vhost, info._app, info._streamid); + push_src->setProtocolTranslation(enable_hls, enable_mp4); + auto rtc = WebRtcPusher::create(EventPollerPool::Instance().getPoller(), push_src, info); + push_src->setListener(rtc); + cb(*rtc); + }; + + //rtsp推流需要鉴权 + auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish, info, invoker, sender); + if (!flag) { + //该事件无人监听,默认不鉴权 + GET_CONFIG(bool, to_hls, General::kPublishToHls); + GET_CONFIG(bool, to_mp4, General::kPublishToMP4); + invoker("", to_hls, to_mp4); + } +} + +void play_plugin(Session &sender, const string &offer_sdp, const WebRtcArgs &args, const WebRtcPluginManager::onCreateRtc &cb) { + MediaInfo info(args["url"]); + auto session_ptr = sender.shared_from_this(); + Broadcast::AuthInvoker invoker = [cb, offer_sdp, info, session_ptr](const string &err) mutable { + if (!err.empty()) { + cb(WebRtcException(SockException(Err_other, err))); + return; + } + + //webrtc播放的是rtsp的源 + info._schema = RTSP_SCHEMA; + MediaSource::findAsync(info, session_ptr, [=](const MediaSource::Ptr &src_in) mutable { + auto src = dynamic_pointer_cast(src_in); + if (!src) { + cb(WebRtcException(SockException(Err_other, "stream not found"))); + return; + } + //还原成rtc,目的是为了hook时识别哪种播放协议 + info._schema = RTC_SCHEMA; + auto rtc = WebRtcPlayer::create(EventPollerPool::Instance().getPoller(), src, info); + cb(*rtc); + }); + }; + + //广播通用播放url鉴权事件 + auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed, info, invoker, sender); + if (!flag) { + //该事件无人监听,默认不鉴权 + invoker(""); + } +} + +static onceToken s_rtc_auto_register([](){ + WebRtcPluginManager::Instance().registerPlugin("echo", echo_plugin); + WebRtcPluginManager::Instance().registerPlugin("push", push_plugin); + WebRtcPluginManager::Instance().registerPlugin("play", play_plugin); +}); diff --git a/webrtc/WebRtcTransport.h b/webrtc/WebRtcTransport.h index f5153e1f..b432dce5 100644 --- a/webrtc/WebRtcTransport.h +++ b/webrtc/WebRtcTransport.h @@ -35,7 +35,31 @@ extern const string kPort; extern const string kTimeOutSec; }//namespace RTC -class WebRtcTransport : public RTC::DtlsTransport::Listener, public RTC::IceServer::Listener, public std::enable_shared_from_this { +class WebRtcInterface { +public: + WebRtcInterface() = default; + virtual ~WebRtcInterface() = default; + virtual string getAnswerSdp(const string &offer) = 0; + virtual const string &getIdentifier() const = 0; +}; + +class WebRtcException : public WebRtcInterface { +public: + WebRtcException(const SockException &ex) : _ex(ex) {}; + ~WebRtcException() override = default; + string getAnswerSdp(const string &offer) override { + throw _ex; + } + const string &getIdentifier() const override { + static string s_null; + return s_null; + } + +private: + SockException _ex; +}; + +class WebRtcTransport : public WebRtcInterface, public RTC::DtlsTransport::Listener, public RTC::IceServer::Listener, public std::enable_shared_from_this { public: using Ptr = std::shared_ptr; WebRtcTransport(const EventPoller::Ptr &poller); @@ -56,7 +80,12 @@ public: * @param offer offer sdp * @return answer sdp */ - std::string getAnswerSdp(const string &offer); + string getAnswerSdp(const string &offer) override; + + /** + * 获取对象唯一id + */ + const string& getIdentifier() const override; /** * socket收到udp数据 @@ -77,7 +106,6 @@ public: void sendRtcpPacket(const char *buf, int len, bool flush, void *ctx = nullptr); const EventPoller::Ptr& getPoller() const; - const string& getIdentifier() const; protected: //// dtls相关的回调 //// @@ -228,13 +256,42 @@ private: class WebRtcTransportManager { public: + friend class WebRtcTransportImp; static WebRtcTransportManager &Instance(); - void addItem(string key, const WebRtcTransportImp::Ptr &ptr); - void removeItem(string key); WebRtcTransportImp::Ptr getItem(const string &key); private: WebRtcTransportManager() = default; + void addItem(const string &key, const WebRtcTransportImp::Ptr &ptr); + void removeItem(const string &key); + +private: mutable mutex _mtx; unordered_map > _map; +}; + +class WebRtcArgs { +public: + WebRtcArgs() = default; + virtual ~WebRtcArgs() = default; + + virtual variant operator[](const string &key) const = 0; +}; + +class WebRtcPluginManager { +public: + using onCreateRtc = function; + using Plugin = function; + + static WebRtcPluginManager &Instance(); + + void registerPlugin(const string &type, Plugin cb); + void getAnswerSdp(Session &sender, const string &type, const string &offer, const WebRtcArgs &args, const onCreateRtc &cb); + +private: + WebRtcPluginManager() = default; + +private: + mutable mutex _mtx_creator; + unordered_map _map_creator; }; \ No newline at end of file