diff --git a/api/source/mk_common.cpp b/api/source/mk_common.cpp index ce36924e..1b84707e 100644 --- a/api/source/mk_common.cpp +++ b/api/source/mk_common.cpp @@ -30,10 +30,8 @@ static TcpServer::Ptr http_server[2]; static TcpServer::Ptr shell_server; #ifdef ENABLE_RTPPROXY -#include "Rtp/UdpRecver.h" -#include "Rtp/RtpSession.h" -static std::shared_ptr udpRtpServer; -static TcpServer::Ptr tcpRtpServer; +#include "Rtp/RtpServer.h" +static std::shared_ptr rtpServer; #endif //////////////////////////environment init/////////////////////////// @@ -57,8 +55,7 @@ API_EXPORT void API_CALL mk_stop_all_server(){ CLEAR_ARR(rtmp_server); CLEAR_ARR(http_server); #ifdef ENABLE_RTPPROXY - udpRtpServer = nullptr; - tcpRtpServer = nullptr; + rtpServer = nullptr; #endif stopAllTcpServer(); } @@ -184,18 +181,12 @@ API_EXPORT uint16_t API_CALL mk_rtmp_server_start(uint16_t port, int ssl) { API_EXPORT uint16_t API_CALL mk_rtp_server_start(uint16_t port){ #ifdef ENABLE_RTPPROXY try { - //创建rtp tcp服务器 - tcpRtpServer = std::make_shared(); - tcpRtpServer->start(port); - - //创建rtp udp服务器 - auto ret = tcpRtpServer->getPort(); - udpRtpServer = std::make_shared(); - udpRtpServer->initSock(port); - return ret; + //创建rtp 服务器 + rtpServer = std::make_shared(); + rtpServer->start(port); + return rtpServer->getPort(); } catch (std::exception &ex) { - tcpRtpServer.reset(); - udpRtpServer.reset(); + rtpServer.reset(); WarnL << ex.what(); return 0; } diff --git a/server/WebApi.cpp b/server/WebApi.cpp index 20a653aa..cc98d596 100644 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -34,6 +34,9 @@ #include "Thread/WorkThreadPool.h" #include "Rtp/RtpSelector.h" #include "FFmpegSource.h" +#if defined(ENABLE_RTPPROXY) +#include "Rtp/RtpServer.h" +#endif using namespace Json; using namespace toolkit; using namespace mediakit; @@ -244,15 +247,24 @@ bool checkArgs(Args &&args,First &&first,KeyTypes && ...keys){ } \ } +//拉流代理器列表 static unordered_map s_proxyMap; static recursive_mutex s_proxyMapMtx; + +//FFmpeg拉流代理器列表 +static unordered_map s_ffmpegMap; +static recursive_mutex s_ffmpegMapMtx; + +#if defined(ENABLE_RTPPROXY) +//rtp服务器列表 +static unordered_map s_rtpServerMap; +static recursive_mutex s_rtpServerMapMtx; +#endif + static inline string getProxyKey(const string &vhost,const string &app,const string &stream){ return vhost + "/" + app + "/" + stream; } -static unordered_map s_ffmpegMap; -static recursive_mutex s_ffmpegMapMtx; - /** * 安装api接口 * 所有api都支持GET和POST两种方式 @@ -745,6 +757,29 @@ void installWebApi() { val["peer_ip"] = process->get_peer_ip(); val["peer_port"] = process->get_peer_port(); }); + + api_regist1("/index/api/openRtpServer",[](API_ARGS1){ + CHECK_SECRET(); + CHECK_ARGS("port", "enable_tcp"); + + RtpServer::Ptr server = std::make_shared(); + server->start(allArgs["port"], allArgs["enable_tcp"].as()); + val["port"] = server->getPort(); + + //保存对象 + lock_guard lck(s_rtpServerMapMtx); + s_rtpServerMap.emplace(server->getPort(), server); + }); + + api_regist1("/index/api/closeRtpServer",[](API_ARGS1){ + CHECK_SECRET(); + CHECK_ARGS("port"); + + lock_guard lck(s_rtpServerMapMtx); + val["hit"] = (int)s_rtpServerMap.erase(allArgs["port"].as()); + }); + + #endif//ENABLE_RTPPROXY // 开始录制hls或MP4 @@ -1045,4 +1080,10 @@ void unInstallWebApi(){ lock_guard lck(s_ffmpegMapMtx); s_ffmpegMap.clear(); } + { +#if defined(ENABLE_RTPPROXY) + lock_guard lck(s_rtpServerMapMtx); + s_rtpServerMap.clear(); +#endif + } } \ No newline at end of file diff --git a/server/main.cpp b/server/main.cpp index cd1ee8a0..705092c4 100644 --- a/server/main.cpp +++ b/server/main.cpp @@ -20,13 +20,11 @@ #include "Network/TcpServer.h" #include "Poller/EventPoller.h" #include "Common/config.h" -#include "Rtsp/UDPServer.h" #include "Rtsp/RtspSession.h" -#include "Rtp/RtpSession.h" #include "Rtmp/RtmpSession.h" #include "Shell/ShellSession.h" #include "Http/WebSocketSession.h" -#include "Rtp/UdpRecver.h" +#include "Rtp/RtpServer.h" #include "WebApi.h" #include "WebHook.h" @@ -283,8 +281,7 @@ int start_main(int argc,char *argv[]) { #if defined(ENABLE_RTPPROXY) //GB28181 rtp推流端口,支持UDP/TCP - UdpRecver recver; - TcpServer::Ptr tcpRtpServer(new TcpServer()); + RtpServer::Ptr rtpServer = std::make_shared(); #endif//defined(ENABLE_RTPPROXY) try { @@ -307,12 +304,8 @@ int start_main(int argc,char *argv[]) { if(shellPort) { shellSrv->start(shellPort); } #if defined(ENABLE_RTPPROXY) - if(rtpPort){ - //创建rtp udp服务器 - recver.initSock(rtpPort); - //创建rtp tcp服务器 - tcpRtpServer->start(rtpPort); - } + //创建rtp服务器 + if(rtpPort){ rtpServer->start(rtpPort); } #endif//defined(ENABLE_RTPPROXY) }catch (std::exception &ex){ diff --git a/src/Rtp/RtpServer.cpp b/src/Rtp/RtpServer.cpp new file mode 100644 index 00000000..5e90c1f7 --- /dev/null +++ b/src/Rtp/RtpServer.cpp @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). + * + * Use of this source code is governed by MIT license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#if defined(ENABLE_RTPPROXY) +#include "RtpServer.h" +#include "RtpSelector.h" +namespace mediakit{ + +RtpServer::RtpServer() { +} + +RtpServer::~RtpServer() { + if(_udp_server){ + _udp_server->setOnRead(nullptr); + } +} + +void RtpServer::start(uint16_t local_port, bool enable_tcp, const char *local_ip) { + _udp_server.reset(new Socket(nullptr, false)); + auto &ref = RtpSelector::Instance(); + auto sock = _udp_server; + _udp_server->setOnRead([&ref, sock](const Buffer::Ptr &buf, struct sockaddr *addr, int) { + ref.inputRtp(sock, buf->data(), buf->size(), addr); + }); + + //创建udp服务器 + if (!_udp_server->bindUdpSock(local_port, local_ip)) { + _udp_server = nullptr; + string err = (StrPrinter << "bindUdpSock on " << local_ip << ":" << local_port << " failed:" << get_uv_errmsg(true)); + throw std::runtime_error(err); + } + //设置udp socket读缓存 + SockUtil::setRecvBuf(_udp_server->rawFD(), 4 * 1024 * 1024); + + if (enable_tcp) { + try { + //创建tcp服务器 + _tcp_server = std::make_shared(_udp_server->getPoller()); + _tcp_server->start(_udp_server->get_local_port(), local_ip); + } catch (...) { + _tcp_server = nullptr; + _udp_server = nullptr; + throw; + } + } +} + +EventPoller::Ptr RtpServer::getPoller() { + return _udp_server->getPoller(); +} + +uint16_t RtpServer::getPort() { + return _udp_server ? _udp_server->get_local_port() : 0; +} + +}//namespace mediakit +#endif//defined(ENABLE_RTPPROXY) \ No newline at end of file diff --git a/src/Rtp/RtpServer.h b/src/Rtp/RtpServer.h new file mode 100644 index 00000000..c81f9513 --- /dev/null +++ b/src/Rtp/RtpServer.h @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). + * + * Use of this source code is governed by MIT license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef ZLMEDIAKIT_RTPSERVER_H +#define ZLMEDIAKIT_RTPSERVER_H + +#if defined(ENABLE_RTPPROXY) +#include +#include "Network/Socket.h" +#include "Network/TcpServer.h" +#include "RtpSession.h" + +using namespace std; +using namespace toolkit; + +namespace mediakit{ + +/** + * RTP服务器,支持UDP/TCP + */ +class RtpServer { +public: + typedef std::shared_ptr Ptr; + typedef function onRecv; + + RtpServer(); + ~RtpServer(); + + /** + * 开启服务器,可能抛异常 + * @param local_port 本地端口,0时为随机端口 + * @param enable_tcp 是否启用tcp服务器 + * @param local_ip 绑定的本地网卡ip + */ + void start(uint16_t local_port, bool enable_tcp = true, const char *local_ip = "0.0.0.0"); + + /** + * 获取绑定的本地端口 + */ + uint16_t getPort(); + + /** + * 获取绑定的线程 + */ + EventPoller::Ptr getPoller(); + +protected: + Socket::Ptr _udp_server; + TcpServer::Ptr _tcp_server; +}; + +}//namespace mediakit +#endif//defined(ENABLE_RTPPROXY) +#endif //ZLMEDIAKIT_RTPSERVER_H diff --git a/src/Rtp/UdpRecver.cpp b/src/Rtp/UdpRecver.cpp deleted file mode 100644 index 2b4a6f3b..00000000 --- a/src/Rtp/UdpRecver.cpp +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved. - * - * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). - * - * Use of this source code is governed by MIT license that can be found in the - * LICENSE file in the root of the source tree. All contributing project authors - * may be found in the AUTHORS file in the root of the source tree. - */ - -#if defined(ENABLE_RTPPROXY) -#include "UdpRecver.h" -#include "RtpSelector.h" -namespace mediakit{ - -UdpRecver::UdpRecver() { -} - -UdpRecver::~UdpRecver() { - if(_sock){ - _sock->setOnRead(nullptr); - } -} - -bool UdpRecver::initSock(uint16_t local_port,const char *local_ip) { - _sock.reset(new Socket(nullptr, false)); - onceToken token(nullptr,[&](){ - SockUtil::setRecvBuf(_sock->rawFD(),4 * 1024 * 1024); - }); - - auto &ref = RtpSelector::Instance(); - auto sock = _sock; - _sock->setOnRead([&ref, sock](const Buffer::Ptr &buf, struct sockaddr *addr, int ){ - ref.inputRtp(sock, buf->data(),buf->size(),addr); - }); - return _sock->bindUdpSock(local_port,local_ip); -} - -EventPoller::Ptr UdpRecver::getPoller() { - return _sock->getPoller(); -} - -}//namespace mediakit -#endif//defined(ENABLE_RTPPROXY) \ No newline at end of file diff --git a/src/Rtp/UdpRecver.h b/src/Rtp/UdpRecver.h deleted file mode 100644 index 98361064..00000000 --- a/src/Rtp/UdpRecver.h +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved. - * - * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). - * - * Use of this source code is governed by MIT license that can be found in the - * LICENSE file in the root of the source tree. All contributing project authors - * may be found in the AUTHORS file in the root of the source tree. - */ - -#ifndef ZLMEDIAKIT_UDPRECVER_H -#define ZLMEDIAKIT_UDPRECVER_H - -#if defined(ENABLE_RTPPROXY) -#include -#include "Network/Socket.h" -using namespace std; -using namespace toolkit; - -namespace mediakit{ - -/** - * 组播接收器 - */ -class UdpRecver { -public: - typedef std::shared_ptr Ptr; - typedef function onRecv; - - UdpRecver(); - virtual ~UdpRecver(); - bool initSock(uint16_t local_port,const char *local_ip = "0.0.0.0"); - EventPoller::Ptr getPoller(); -protected: - Socket::Ptr _sock; -}; - -}//namespace mediakit -#endif//defined(ENABLE_RTPPROXY) -#endif //ZLMEDIAKIT_UDPRECVER_H