diff --git a/AUTHORS b/AUTHORS index 41b3771f..78e72dd2 100644 --- a/AUTHORS +++ b/AUTHORS @@ -45,4 +45,14 @@ Xinghua Zhao <(holychaossword@hotmail.com> 明月惊鹊 cgm <2958580318@qq.com> hejilin <1724010622@qq.com> -alexliyu7352 \ No newline at end of file +alexliyu7352 +cgm <2958580318@qq.com> +[haorui wang](https://github.com/HaoruiWang) +joshuafc +[JayChen0519](https://github.com/JayChen0519) +zx +wangcker +WuPeng +[starry](https://github.com/starry) +[mtdxc](https://github.com/mtdxc) +[胡刚风](https://github.com/hugangfeng333) \ No newline at end of file diff --git a/CMakeLists.txt b/CMakeLists.txt index 52b73270..8ed4356d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -11,6 +11,12 @@ if(CCACHE_FOUND) set_property(GLOBAL PROPERTY RULE_LAUNCH_LINK ccache) endif(CCACHE_FOUND) +#add_compile_options(-D__STDC_FORMAT_MACROS) +if(CMAKE_COMPILER_IS_GNUCXX) + add_compile_options(-D__STDC_FORMAT_MACROS) + message(STATUS "-D__STDC_FORMAT_MACROS") +endif(CMAKE_COMPILER_IS_GNUCXX) + #set(CMAKE_BUILD_TYPE "Release") if ("${CMAKE_BUILD_TYPE}" STREQUAL "") set(CMAKE_BUILD_TYPE "Debug") diff --git a/README.md b/README.md index b7dc819f..0563b2fc 100644 --- a/README.md +++ b/README.md @@ -176,6 +176,7 @@ bash build_docker_images.sh - 播放器 - [基于wasm支持H265的播放器](https://github.com/numberwolf/h265web.js) - [基于MSE的websocket-fmp4播放器](https://github.com/v354412101/wsPlayer) + - [全国产webrtc sdk(metaRTC)](https://github.com/metartc/metaRTC) ## 授权协议 @@ -263,6 +264,16 @@ bash build_docker_images.sh [cgm](mailto:2958580318@qq.com) [hejilin](mailto:1724010622@qq.com) [alexliyu7352](mailto:liyu7352@gmail.com) +[cgm](mailto:2958580318@qq.com) +[haorui wang](https://github.com/HaoruiWang) +[joshuafc](mailto:joshuafc@foxmail.com) +[JayChen0519](https://github.com/JayChen0519) +[zx](mailto:zuoxue@qq.com) +[wangcker](mailto:wangcker@163.com) +[WuPeng](mailto:wp@zafu.edu.cn) +[starry](https://github.com/starry) +[mtdxc](https://github.com/mtdxc) +[胡刚风](https://github.com/hugangfeng333) ## 使用案例 diff --git a/conf/config.ini b/conf/config.ini index 7094600b..c2d4613f 100644 --- a/conf/config.ini +++ b/conf/config.ini @@ -266,7 +266,7 @@ port_range=30000-35000 [rtc] #rtc播放推流、播放超时时间 timeoutSec=15 -#本机对rtc客户端的可见ip,作为服务器时一般为公网ip,置空时,会自动获取网卡ip +#本机对rtc客户端的可见ip,作为服务器时一般为公网ip,可有多个,用','分开,当置空时,会自动获取网卡ip externIP= #rtc udp服务器监听端口号,所有rtc客户端将通过该端口传输stun/dtls/srtp/srtcp数据, #该端口是多线程的,同时支持客户端网络切换导致的连接迁移 diff --git a/postman/ZLMediaKit.postman_collection.json b/postman/ZLMediaKit.postman_collection.json index aee6ba18..12f8fe4f 100644 --- a/postman/ZLMediaKit.postman_collection.json +++ b/postman/ZLMediaKit.postman_collection.json @@ -1050,7 +1050,7 @@ "method": "GET", "header": [], "url": { - "raw": "{{ZLMediaKit_URL}}/index/api/startRecord?secret={{ZLMediaKit_secret}}&vhost={{defaultVhost}}&app=live&stream=obs&stamp=1000", + "raw": "{{ZLMediaKit_URL}}/index/api/seekRecordStamp?secret={{ZLMediaKit_secret}}&vhost={{defaultVhost}}&app=live&stream=obs&stamp", "host": [ "{{ZLMediaKit_URL}}" ], @@ -1517,6 +1517,82 @@ }, "response": [] }, + { + "name": "开始tcp passive被动发送rtp(startSendRtpPassive)", + "request": { + "method": "GET", + "header": [], + "url": { + "raw": "{{ZLMediaKit_URL}}/index/api/startSendRtpPassive?secret={{ZLMediaKit_secret}}&vhost={{defaultVhost}}&app=live&stream=test&ssrc=1", + "host": [ + "{{ZLMediaKit_URL}}" + ], + "path": [ + "index", + "api", + "startSendRtpPassive" + ], + "query": [ + { + "key": "secret", + "value": "{{ZLMediaKit_secret}}", + "description": "api操作密钥(配置文件配置),如果操作ip是127.0.0.1,则不需要此参数" + }, + { + "key": "vhost", + "value": "{{defaultVhost}}", + "description": "虚拟主机,例如__defaultVhost__" + }, + { + "key": "app", + "value": "live", + "description": "应用名,例如 live" + }, + { + "key": "stream", + "value": "test", + "description": "流id,例如 obs" + }, + { + "key": "ssrc", + "value": "1", + "description": "rtp推流的ssrc,ssrc不同时,可以推流到多个上级服务器" + }, + { + "key": "src_port", + "value": "0", + "description": "指定tcp/udp客户端使用的本地端口,0时为随机端口,该参数非必选参数,不传时为随机端口。", + "disabled": true + }, + { + "key": "from_mp4", + "value": "0", + "description": "是否推送本地MP4录像,该参数非必选参数", + "disabled": true + }, + { + "key": "use_ps", + "value": "1", + "description": "rtp打包采用ps还是es模式,默认采用ps模式,该参数非必选参数", + "disabled": true + }, + { + "key": "pt", + "value": "96", + "description": "rtp payload type,默认96,该参数非必选参数", + "disabled": true + }, + { + "key": "only_audio", + "value": "1", + "description": "rtp es方式打包时,是否只打包音频,该参数非必选参数", + "disabled": true + } + ] + } + }, + "response": [] + }, { "name": "停止 发送rtp(stopSendRtp)", "request": { diff --git a/server/WebApi.cpp b/server/WebApi.cpp index 2224f068..9992cfa6 100755 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -1107,6 +1107,7 @@ void installWebApi() { } MediaSourceEvent::SendRtpArgs args; + args.passive = false; args.dst_url = allArgs["dst_url"]; args.dst_port = allArgs["dst_port"]; args.ssrc = allArgs["ssrc"]; @@ -1115,7 +1116,7 @@ void installWebApi() { args.pt = allArgs["pt"].empty() ? 96 : allArgs["pt"].as(); args.use_ps = allArgs["use_ps"].empty() ? true : allArgs["use_ps"].as(); args.only_audio = allArgs["only_audio"].empty() ? false : allArgs["only_audio"].as(); - TraceL << "pt " << int(args.pt) << " ps " << args.use_ps << " audio " << args.only_audio; + TraceL << "startSendRtp, pt " << int(args.pt) << " ps " << args.use_ps << " audio " << args.only_audio; src->startSendRtp(args, [val, headerOut, invoker](uint16_t local_port, const SockException &ex) mutable { if (ex) { @@ -1127,6 +1128,34 @@ void installWebApi() { }); }); + api_regist("/index/api/startSendRtpPassive",[](API_ARGS_MAP_ASYNC){ + CHECK_SECRET(); + CHECK_ARGS("vhost", "app", "stream", "ssrc"); + + auto src = MediaSource::find(allArgs["vhost"], allArgs["app"], allArgs["stream"], allArgs["from_mp4"].as()); + if (!src) { + throw ApiRetException("该媒体流不存在", API::OtherFailed); + } + + MediaSourceEvent::SendRtpArgs args; + args.passive = true; + args.ssrc = allArgs["ssrc"]; + args.is_udp = false; + args.src_port = allArgs["src_port"]; + args.pt = allArgs["pt"].empty() ? 96 : allArgs["pt"].as(); + args.use_ps = allArgs["use_ps"].empty() ? true : allArgs["use_ps"].as(); + args.only_audio = allArgs["only_audio"].empty() ? false : allArgs["only_audio"].as(); + TraceL << "startSendRtpPassive, pt " << int(args.pt) << " ps " << args.use_ps << " audio " << args.only_audio; + src->startSendRtp(args, [val, headerOut, invoker](uint16_t local_port, const SockException &ex) mutable { + if (ex) { + val["code"] = API::OtherFailed; + val["msg"] = ex.what(); + } + val["local_port"] = local_port; + invoker(200, headerOut, val.toStyledString()); + }); + }); + api_regist("/index/api/stopSendRtp",[](API_ARGS_MAP){ CHECK_SECRET(); CHECK_ARGS("vhost", "app", "stream"); diff --git a/src/Common/MediaSource.h b/src/Common/MediaSource.h index f274491d..dd6d3572 100644 --- a/src/Common/MediaSource.h +++ b/src/Common/MediaSource.h @@ -94,6 +94,8 @@ public: bool use_ps = true; //发送es流时指定是否只发送纯音频流 bool only_audio = true; + //tcp被动方式 + bool passive = false; // rtp payload type uint8_t pt = 96; // 指定rtp ssrc diff --git a/src/Common/Parser.cpp b/src/Common/Parser.cpp index 3b76adc5..6f9dc810 100644 --- a/src/Common/Parser.cpp +++ b/src/Common/Parser.cpp @@ -8,7 +8,7 @@ * may be found in the AUTHORS file in the root of the source tree. */ -#include +#include #include "Parser.h" #include "macros.h" #include "Network/sockutil.h" diff --git a/src/Http/HlsPlayer.cpp b/src/Http/HlsPlayer.cpp index e401ad2b..e7af45e4 100644 --- a/src/Http/HlsPlayer.cpp +++ b/src/Http/HlsPlayer.cpp @@ -275,6 +275,14 @@ void HlsDemuxer::start(const EventPoller::Ptr &poller, TrackListener *listener) }, poller); } +void HlsDemuxer::pushTask(std::function task) { + int64_t stamp = 0; + if (!_frame_cache.empty()) { + stamp = _frame_cache.back().first; + } + _frame_cache.emplace_back(std::make_pair(stamp, std::move(task))); +} + bool HlsDemuxer::inputFrame(const Frame::Ptr &frame) { //为了避免track准备时间过长, 因此在没准备好之前, 直接消费掉所有的帧 if (!_delegate.isAllTrackReady()) { @@ -287,12 +295,15 @@ bool HlsDemuxer::inputFrame(const Frame::Ptr &frame) { setPlayPosition(frame->dts()); } //根据时间戳缓存frame - _frame_cache.emplace(frame->dts(), Frame::getCacheAbleFrame(frame)); + auto cached_frame = Frame::getCacheAbleFrame(frame); + _frame_cache.emplace_back(std::make_pair(frame->dts(), [cached_frame, this]() { + _delegate.inputFrame(cached_frame); + })); if (getBufferMS() > 30 * 1000) { //缓存超过30秒,强制消费至15秒(减少延时或内存占用) while (getBufferMS() > 15 * 1000) { - _delegate.inputFrame(_frame_cache.begin()->second); + _frame_cache.begin()->second(); _frame_cache.erase(_frame_cache.begin()); } //接着播放缓存中最早的帧 @@ -332,7 +343,7 @@ void HlsDemuxer::onTick() { } //消费掉已经到期的帧 - _delegate.inputFrame(it->second); + it->second(); it = _frame_cache.erase(it); } } @@ -368,8 +379,15 @@ void HlsPlayerImp::onPlayResult(const SockException &ex) { void HlsPlayerImp::onShutdown(const SockException &ex) { if (_demuxer) { + std::weak_ptr weak_self = static_pointer_cast(shared_from_this()); + static_pointer_cast(_demuxer)->pushTask([weak_self, ex]() { + auto strong_self = weak_self.lock(); + if (strong_self) { + strong_self->PlayerImp::onShutdown(ex); + } + }); + } else { PlayerImp::onShutdown(ex); - _demuxer = nullptr; } } diff --git a/src/Http/HlsPlayer.h b/src/Http/HlsPlayer.h index 75610dcb..7db6316b 100644 --- a/src/Http/HlsPlayer.h +++ b/src/Http/HlsPlayer.h @@ -34,6 +34,7 @@ public: void addTrackCompleted() override { _delegate.addTrackCompleted(); } void resetTracks() override { ((MediaSink &)_delegate).resetTracks(); } std::vector getTracks(bool ready = true) const override { return _delegate.getTracks(ready); } + void pushTask(std::function task); private: void onTick(); @@ -46,7 +47,7 @@ private: toolkit::Ticker _ticker; toolkit::Timer::Ptr _timer; MediaSinkDelegate _delegate; - std::multimap _frame_cache; + std::deque > > _frame_cache; }; class HlsPlayer : public HttpClientImp , public PlayerBase , public HlsParser{ diff --git a/src/Http/HttpClientImp.cpp b/src/Http/HttpClientImp.cpp index 6dcf8437..7b36b46c 100644 --- a/src/Http/HttpClientImp.cpp +++ b/src/Http/HttpClientImp.cpp @@ -16,6 +16,8 @@ namespace mediakit { void HttpClientImp::onConnect(const SockException &ex) { if (!isHttps()) { + //https 302跳转 http时,需要关闭ssl + setDoNotUseSSL(); HttpClient::onConnect(ex); } else { TcpClientWithSSL::onConnect(ex); diff --git a/src/Http/TsplayerImp.cpp b/src/Http/TsplayerImp.cpp index c41abb97..ae988637 100644 --- a/src/Http/TsplayerImp.cpp +++ b/src/Http/TsplayerImp.cpp @@ -45,8 +45,17 @@ void TsPlayerImp::onPlayResult(const SockException &ex) { } void TsPlayerImp::onShutdown(const SockException &ex) { - PlayerImp::onShutdown(ex); - _demuxer = nullptr; + if (_demuxer) { + std::weak_ptr weak_self = static_pointer_cast(shared_from_this()); + static_pointer_cast(_demuxer)->pushTask([weak_self, ex]() { + auto strong_self = weak_self.lock(); + if (strong_self) { + strong_self->PlayerImp::onShutdown(ex); + } + }); + } else { + PlayerImp::onShutdown(ex); + } } vector TsPlayerImp::getTracks(bool ready) const { diff --git a/src/Rtp/RtpSender.cpp b/src/Rtp/RtpSender.cpp index fda55c5a..55f04907 100644 --- a/src/Rtp/RtpSender.cpp +++ b/src/Rtp/RtpSender.cpp @@ -12,6 +12,7 @@ #include "RtpSender.h" #include "Rtsp/RtspSession.h" #include "Thread/WorkThreadPool.h" +#include "Util/uv_errno.h" #include "RtpCache.h" using namespace std; @@ -19,49 +20,105 @@ using namespace toolkit; namespace mediakit{ +RtpSender::RtpSender() { + _poller = EventPollerPool::Instance().getPoller(); + _socket = Socket::createSocket(_poller, false); +} + void RtpSender::startSend(const MediaSourceEvent::SendRtpArgs &args, const function &cb){ _args = args; - _poller = EventPollerPool::Instance().getPoller(); - auto lam = [this](std::shared_ptr> list) { onFlushRtpList(std::move(list)); }; - if (args.use_ps) { - _interface = std::make_shared(lam, atoi(args.ssrc.data()), args.pt); - } else { - _interface = std::make_shared(lam, atoi(args.ssrc.data()), args.pt, args.only_audio); - } - _socket = Socket::createSocket(_poller, false); - weak_ptr weak_self = shared_from_this(); - if (args.is_udp) { - if (args.src_port) { - //指定端口 - _socket->bindUdpSock(args.src_port); + if (!_interface) { + //重连时不重新创建对象 + auto lam = [this](std::shared_ptr> list) { onFlushRtpList(std::move(list)); }; + if (args.use_ps) { + _interface = std::make_shared(lam, atoi(args.ssrc.data()), args.pt); } else { - auto pr = std::make_pair(std::move(_socket), Socket::createSocket(_poller, false)); - //从端口池获取随机端口 - makeSockPair(pr, "::", true); - _socket = std::move(pr.first); + _interface = std::make_shared(lam, atoi(args.ssrc.data()), args.pt, args.only_audio); } + } + + weak_ptr weak_self = shared_from_this(); + if (args.passive) { + // tcp被动发流模式 + _args.is_udp = false; + try { + auto tcp_listener = Socket::createSocket(_poller, false); + if (args.src_port) { + //指定端口 + if (!tcp_listener->listen(args.src_port)) { + throw std::invalid_argument(StrPrinter << "open tcp passive server failed on port:" << args.src_port + << ", err:" << get_uv_errmsg(true)); + } + } else { + auto pr = std::make_pair(tcp_listener, Socket::createSocket(_poller, false)); + //从端口池获取随机端口 + makeSockPair(pr, "::", false, false); + } + // tcp服务器默认开启5秒 + auto delay_task = _poller->doDelayTask(5 * 1000, [tcp_listener, cb]() mutable { + cb(0, SockException(Err_timeout, "wait tcp connection timeout")); + tcp_listener = nullptr; + return 0; + }); + tcp_listener->setOnAccept([weak_self, cb, delay_task](Socket::Ptr &sock, std::shared_ptr &complete) { + auto strong_self = weak_self.lock(); + if (!strong_self) { + return; + } + //立即关闭tcp服务器 + delay_task->cancel(); + strong_self->_socket = sock; + strong_self->onConnect(); + cb(sock->get_local_port(), SockException()); + InfoL << "accept connection from:" << sock->get_peer_ip() << ":" << sock->get_peer_port(); + }); + InfoL << "start tcp passive server on:" << tcp_listener->get_local_port(); + } catch (std::exception &ex) { + cb(0, SockException(Err_other, ex.what())); + return; + } + return; + } + if (args.is_udp) { auto poller = _poller; - auto local_port = _socket->get_local_port(); - WorkThreadPool::Instance().getPoller()->async([cb, args, weak_self, poller, local_port]() { + WorkThreadPool::Instance().getPoller()->async([cb, args, weak_self, poller]() { struct sockaddr_storage addr; //切换线程目的是为了dns解析放在后台线程执行 if (!SockUtil::getDomainIP(args.dst_url.data(), args.dst_port, addr, AF_INET, SOCK_DGRAM, IPPROTO_UDP)) { - poller->async([args, cb, local_port]() { + poller->async([args, cb]() { //切回自己的线程 - cb(local_port, SockException(Err_dns, StrPrinter << "dns解析域名失败:" << args.dst_url)); + cb(0, SockException(Err_dns, StrPrinter << "dns解析域名失败:" << args.dst_url)); }); return; } //dns解析成功 - poller->async([addr, weak_self, cb, local_port]() { + poller->async([args, addr, weak_self, cb]() { //切回自己的线程 - cb(local_port, SockException()); auto strong_self = weak_self.lock(); - if (strong_self) { - strong_self->_socket->bindPeerAddr((struct sockaddr *)&addr); - strong_self->onConnect(); + if (!strong_self) { + return; } + string ifr_ip = addr.ss_family == AF_INET ? "0.0.0.0" : "::"; + try { + if (args.src_port) { + //指定端口 + if (!strong_self->_socket->bindUdpSock(args.src_port, ifr_ip)) { + throw std::invalid_argument(StrPrinter << "bindUdpSock failed on port:" << args.src_port + << ", err:" << get_uv_errmsg(true)); + } + } else { + auto pr = std::make_pair(strong_self->_socket, Socket::createSocket(strong_self->_poller, false)); + //从端口池获取随机端口 + makeSockPair(pr, ifr_ip, true); + } + } catch (std::exception &ex) { + cb(0, SockException(Err_other, ex.what())); + return; + } + strong_self->_socket->bindPeerAddr((struct sockaddr *)&addr); + strong_self->onConnect(); + cb(strong_self->_socket->get_local_port(), SockException()); }); }); } else { @@ -76,7 +133,7 @@ void RtpSender::startSend(const MediaSourceEvent::SendRtpArgs &args, const funct } else { cb(0, err); } - }, 5.0F, "::", args.src_port); + }, 5.0F, "::", args.src_port); } } @@ -147,11 +204,15 @@ void RtpSender::onFlushRtpList(shared_ptr > rtp_list) { void RtpSender::onErr(const SockException &ex, bool is_connect) { _is_connect = false; - //监听socket断开事件,方便重连 - if (is_connect) { - WarnL << "重连" << _args.dst_url << ":" << _args.dst_port << "失败, 原因为:" << ex.what(); + if (_args.passive) { + WarnL << "tcp passive connection lost: " << ex.what(); } else { - WarnL << "停止发送 rtp:" << _args.dst_url << ":" << _args.dst_port << ", 原因为:" << ex.what(); + //监听socket断开事件,方便重连 + if (is_connect) { + WarnL << "重连" << _args.dst_url << ":" << _args.dst_port << "失败, 原因为:" << ex.what(); + } else { + WarnL << "停止发送 rtp:" << _args.dst_url << ":" << _args.dst_port << ", 原因为:" << ex.what(); + } } weak_ptr weak_self = shared_from_this(); diff --git a/src/Rtp/RtpSender.h b/src/Rtp/RtpSender.h index aba98dd2..3d1485e2 100644 --- a/src/Rtp/RtpSender.h +++ b/src/Rtp/RtpSender.h @@ -21,7 +21,7 @@ class RtpSender : public MediaSinkInterface, public std::enable_shared_from_this public: typedef std::shared_ptr Ptr; - RtpSender() = default; + RtpSender(); ~RtpSender() override = default; /** diff --git a/src/Rtp/RtpServer.cpp b/src/Rtp/RtpServer.cpp index 4893c302..c2f54d5c 100644 --- a/src/Rtp/RtpServer.cpp +++ b/src/Rtp/RtpServer.cpp @@ -99,9 +99,6 @@ void RtpServer::start(uint16_t local_port, const string &stream_id, bool enable_ //随机端口,rtp端口采用偶数 auto pair = std::make_pair(rtp_socket, rtcp_socket); makeSockPair(pair, local_ip, re_use_port); - //取偶数端口 - rtp_socket = pair.first; - rtcp_socket = pair.second; } else if (!rtp_socket->bindUdpSock(local_port, local_ip, re_use_port)) { //用户指定端口 throw std::runtime_error(StrPrinter << "创建rtp端口 " << local_ip << ":" << local_port << " 失败:" << get_uv_errmsg(true)); diff --git a/src/Rtsp/Rtsp.cpp b/src/Rtsp/Rtsp.cpp index 0aeeb63a..e18dfa97 100644 --- a/src/Rtsp/Rtsp.cpp +++ b/src/Rtsp/Rtsp.cpp @@ -312,7 +312,8 @@ string SdpParser::toString() const { return title + video + audio; } -class PortManager : public std::enable_shared_from_this { +template +class PortManager : public std::enable_shared_from_this > { public: PortManager() { static auto func = [](const string &str, int index) { @@ -331,28 +332,44 @@ public: return *instance; } - void bindUdpSock(std::pair &pair, const string &local_ip, bool re_use_port) { + void makeSockPair(std::pair &pair, const string &local_ip, bool re_use_port, bool is_udp) { auto &sock0 = pair.first; auto &sock1 = pair.second; auto sock_pair = getPortPair(); if (!sock_pair) { - throw runtime_error("none reserved udp port in pool"); + throw runtime_error("none reserved port in pool"); } + if (is_udp) { + if (!sock0->bindUdpSock(2 * *sock_pair, local_ip.data(), re_use_port)) { + //分配端口失败 + throw runtime_error("open udp socket[0] failed"); + } - if (!sock0->bindUdpSock(2 * *sock_pair, local_ip.data(), re_use_port)) { - //分配端口失败 - throw runtime_error("open udp socket[0] failed"); + if (!sock1->bindUdpSock(2 * *sock_pair + 1, local_ip.data(), re_use_port)) { + //分配端口失败 + throw runtime_error("open udp socket[1] failed"); + } + + auto on_cycle = [sock_pair](Socket::Ptr &, std::shared_ptr &) {}; + // udp socket没onAccept事件,设置该回调,目的是为了在销毁socket时,回收对象 + sock0->setOnAccept(on_cycle); + sock1->setOnAccept(on_cycle); + } else { + if (!sock0->listen(2 * *sock_pair, local_ip.data())) { + //分配端口失败 + throw runtime_error("listen tcp socket[0] failed"); + } + + if (!sock1->listen(2 * *sock_pair + 1, local_ip.data())) { + //分配端口失败 + throw runtime_error("listen tcp socket[1] failed"); + } + + auto on_cycle = [sock_pair](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) {}; + // udp socket没onAccept事件,设置该回调,目的是为了在销毁socket时,回收对象 + sock0->setOnRead(on_cycle); + sock1->setOnRead(on_cycle); } - - if (!sock1->bindUdpSock(2 * *sock_pair + 1, local_ip.data(), re_use_port)) { - //分配端口失败 - throw runtime_error("open udp socket[1] failed"); - } - - auto on_cycle = [sock_pair](Socket::Ptr &, std::shared_ptr &) {}; - // udp socket没onAccept事件,设置该回调,目的是为了在销毁socket时,回收对象 - sock0->setOnAccept(on_cycle); - sock1->setOnAccept(on_cycle); } private: @@ -372,7 +389,7 @@ private: _port_pair_pool.pop_front(); InfoL << "got port from pool:" << 2 * pos << "-" << 2 * pos + 1; - weak_ptr weak_self = shared_from_this(); + weak_ptr weak_self = this->shared_from_this(); std::shared_ptr ret(new uint16_t(pos), [weak_self, pos](uint16_t *ptr) { delete ptr; auto strong_self = weak_self.lock(); @@ -392,20 +409,22 @@ private: deque _port_pair_pool; }; -void makeSockPair(std::pair &pair, const string &local_ip, bool re_use_port) { - //全局互斥锁保护,防止端口重复分配 - static recursive_mutex s_mtx; - lock_guard lck(s_mtx); +void makeSockPair(std::pair &pair, const string &local_ip, bool re_use_port, bool is_udp) { int try_count = 0; while (true) { try { - PortManager::Instance().bindUdpSock(pair, local_ip, re_use_port); + //udp和tcp端口池使用相同算法和范围分配,但是互不相干 + if (is_udp) { + PortManager<0>::Instance().makeSockPair(pair, local_ip, re_use_port, is_udp); + } else { + PortManager<1>::Instance().makeSockPair(pair, local_ip, re_use_port, is_udp); + } break; } catch (exception &ex) { if (++try_count == 3) { throw; } - WarnL << "open udp socket failed:" << ex.what() << ", retry: " << try_count; + WarnL << "open socket failed:" << ex.what() << ", retry: " << try_count; } } } diff --git a/src/Rtsp/Rtsp.h b/src/Rtsp/Rtsp.h index 31e0c79b..4e784dcd 100644 --- a/src/Rtsp/Rtsp.h +++ b/src/Rtsp/Rtsp.h @@ -357,7 +357,7 @@ private: //创建rtp over tcp4个字节的头 toolkit::Buffer::Ptr makeRtpOverTcpPrefix(uint16_t size, uint8_t interleaved); //创建rtp-rtcp端口对 -void makeSockPair(std::pair &pair, const std::string &local_ip, bool re_use_port = false); +void makeSockPair(std::pair &pair, const std::string &local_ip, bool re_use_port = false, bool is_udp = true); //十六进制方式打印ssrc std::string printSSRC(uint32_t ui32Ssrc); diff --git a/webrtc/RtpExt.cpp b/webrtc/RtpExt.cpp index 51e40b01..b9568584 100644 --- a/webrtc/RtpExt.cpp +++ b/webrtc/RtpExt.cpp @@ -189,7 +189,7 @@ map RtpExt::getExtValue(const RtpHeader *header) appendExt(ret, ptr, end); return ret; } - if ((reserved & 0xFFF0) >> 4 == kTwoByteHeader) { + if ((reserved & 0xFFF0) == kTwoByteHeader) { appendExt(ret, ptr, end); return ret; } diff --git a/webrtc/Sdp.cpp b/webrtc/Sdp.cpp index 70e83730..4ed16d26 100644 --- a/webrtc/Sdp.cpp +++ b/webrtc/Sdp.cpp @@ -1591,8 +1591,13 @@ RETRY: if (type == TrackApplication) { RtcMedia answer_media = offer_media; answer_media.role = mathDtlsRole(offer_media.role); +#ifdef ENABLE_SCTP answer_media.direction = matchDirection(offer_media.direction, configure.direction); answer_media.candidate = configure.candidate; + +#else + answer_media.direction = RtpDirection::inactive; +#endif ret->media.emplace_back(answer_media); return; } diff --git a/webrtc/WebRtcTransport.cpp b/webrtc/WebRtcTransport.cpp index b6649597..b7deceb3 100644 --- a/webrtc/WebRtcTransport.cpp +++ b/webrtc/WebRtcTransport.cpp @@ -471,10 +471,15 @@ void WebRtcTransportImp::onStartWebRTC() { void WebRtcTransportImp::onCheckAnswer(RtcSession &sdp) { //修改answer sdp的ip、端口信息 - GET_CONFIG(string, extern_ip, RTC::kExternIP); + GET_CONFIG_FUNC(std::vector, extern_ips, RTC::kExternIP, [](string str){ + std::vector ret; + if (str.length()) + ret = split(str, ","); + return ret; + }); for (auto &m : sdp.media) { m.addr.reset(); - m.addr.address = extern_ip.empty() ? SockUtil::get_local_ip() : extern_ip; + m.addr.address = extern_ips.empty() ? SockUtil::get_local_ip() : extern_ips[0]; m.rtcp_addr.reset(); m.rtcp_addr.address = m.addr.address; @@ -522,28 +527,48 @@ void WebRtcTransportImp::onCheckSdp(SdpType type, RtcSession &sdp) { } } -void WebRtcTransportImp::onRtcConfigure(RtcConfigure &configure) const { - WebRtcTransport::onRtcConfigure(configure); - //添加接收端口candidate信息 - configure.addCandidate(*getIceCandidate()); -} - -SdpAttrCandidate::Ptr WebRtcTransportImp::getIceCandidate() const{ +SdpAttrCandidate::Ptr makeIceCandidate(std::string ip, uint16_t port, + uint32_t priority = 100, std::string proto = "udp") { auto candidate = std::make_shared(); - candidate->foundation = "udpcandidate"; //rtp端口 candidate->component = 1; - candidate->transport = "udp"; + candidate->transport = proto; + candidate->foundation = proto + "candidate"; //优先级,单candidate时随便 - candidate->priority = 100; - GET_CONFIG(string, extern_ip, RTC::kExternIP); - candidate->address = extern_ip.empty() ? SockUtil::get_local_ip() : extern_ip; - GET_CONFIG(uint16_t, local_port, RTC::kPort); - candidate->port = local_port; + candidate->priority = priority; + candidate->address = ip; + candidate->port = port; candidate->type = "host"; return candidate; } +void WebRtcTransportImp::onRtcConfigure(RtcConfigure &configure) const { + WebRtcTransport::onRtcConfigure(configure); + + GET_CONFIG(uint16_t, local_port, RTC::kPort); + //添加接收端口candidate信息 + GET_CONFIG_FUNC(std::vector, extern_ips, RTC::kExternIP, [](string str){ + std::vector ret; + if (str.length()) + ret = split(str, ","); + return ret; + }); + if (extern_ips.empty()) { + std::string localIp = SockUtil::get_local_ip(); + configure.addCandidate(*makeIceCandidate(localIp, local_port, 120, "udp")); + } + else { + const uint32_t delta = 10; + uint32_t priority = 100 + delta * extern_ips.size(); + for (auto ip : extern_ips) { + configure.addCandidate(*makeIceCandidate(ip, local_port, priority, "udp")); + priority -= delta; + } + } +} + + + /////////////////////////////////////////////////////////////////// class RtpChannel : public RtpTrackImp, public std::enable_shared_from_this { diff --git a/webrtc/WebRtcTransport.h b/webrtc/WebRtcTransport.h index f880c9d9..a5bd4147 100644 --- a/webrtc/WebRtcTransport.h +++ b/webrtc/WebRtcTransport.h @@ -264,7 +264,6 @@ protected: void updateTicker(); private: - SdpAttrCandidate::Ptr getIceCandidate() const; void onSortedRtp(MediaTrack &track, const std::string &rid, mediakit::RtpPacket::Ptr rtp); void onSendNack(MediaTrack &track, const mediakit::FCI_NACK &nack, uint32_t ssrc); void onSendTwcc(uint32_t ssrc, const std::string &twcc_fci);