diff --git a/api/include/mk_events_objects.h b/api/include/mk_events_objects.h index f81cbe61..42b72814 100644 --- a/api/include/mk_events_objects.h +++ b/api/include/mk_events_objects.h @@ -159,9 +159,9 @@ API_EXPORT int API_CALL mk_media_source_seek_to(const mk_media_source ctx,uint32 */ typedef void(API_CALL *on_mk_media_source_send_rtp_result)(void *user_data, uint16_t local_port, int err, const char *msg); -//MediaSource::startSendRtp,请参考mk_media_start_send_rtp,注意ctx参数类型不一样 -API_EXPORT void API_CALL mk_media_source_start_send_rtp(const mk_media_source ctx, const char *dst_url, uint16_t dst_port, const char *ssrc, int is_udp, on_mk_media_source_send_rtp_result cb, void *user_data); -API_EXPORT void API_CALL mk_media_source_start_send_rtp2(const mk_media_source ctx, const char *dst_url, uint16_t dst_port, const char *ssrc, int is_udp, on_mk_media_source_send_rtp_result cb, void *user_data, on_user_data_free user_data_free); +// MediaSource::startSendRtp,请参考mk_media_start_send_rtp,注意ctx参数类型不一样 +API_EXPORT void API_CALL mk_media_source_start_send_rtp(const mk_media_source ctx, const char *dst_url, uint16_t dst_port, const char *ssrc, int con_type, on_mk_media_source_send_rtp_result cb, void *user_data); +API_EXPORT void API_CALL mk_media_source_start_send_rtp2(const mk_media_source ctx, const char *dst_url, uint16_t dst_port, const char *ssrc, int con_type, on_mk_media_source_send_rtp_result cb, void *user_data, on_user_data_free user_data_free); //MediaSource::stopSendRtp,请参考mk_media_stop_send_rtp,注意ctx参数类型不一样 API_EXPORT int API_CALL mk_media_source_stop_send_rtp(const mk_media_source ctx); diff --git a/api/include/mk_media.h b/api/include/mk_media.h index e7277462..85327dbf 100755 --- a/api/include/mk_media.h +++ b/api/include/mk_media.h @@ -269,12 +269,12 @@ typedef on_mk_media_source_send_rtp_result on_mk_media_send_rtp_result; * @param dst_url 目标ip或域名 * @param dst_port 目标端口 * @param ssrc rtp的ssrc,10进制的字符串打印 - * @param is_udp 是否为udp + * @param con_type 0: tcp主动,1:udp主动,2:tcp被动,3:udp被动 * @param cb 启动成功或失败回调 * @param user_data 回调用户指针 */ -API_EXPORT void API_CALL mk_media_start_send_rtp(mk_media ctx, const char *dst_url, uint16_t dst_port, const char *ssrc, int is_udp, on_mk_media_send_rtp_result cb, void *user_data); -API_EXPORT void API_CALL mk_media_start_send_rtp2(mk_media ctx, const char *dst_url, uint16_t dst_port, const char *ssrc, int is_udp, on_mk_media_send_rtp_result cb, void *user_data, on_user_data_free user_data_free); +API_EXPORT void API_CALL mk_media_start_send_rtp(mk_media ctx, const char *dst_url, uint16_t dst_port, const char *ssrc, int con_type, on_mk_media_send_rtp_result cb, void *user_data); +API_EXPORT void API_CALL mk_media_start_send_rtp2(mk_media ctx, const char *dst_url, uint16_t dst_port, const char *ssrc, int con_type, on_mk_media_send_rtp_result cb, void *user_data, on_user_data_free user_data_free); /** * 停止某路或全部ps-rtp发送,此api线程安全 diff --git a/api/source/mk_events_objects.cpp b/api/source/mk_events_objects.cpp index 46bc408e..91ab640d 100644 --- a/api/source/mk_events_objects.cpp +++ b/api/source/mk_events_objects.cpp @@ -295,11 +295,11 @@ API_EXPORT int API_CALL mk_media_source_seek_to(const mk_media_source ctx,uint32 MediaSource *src = (MediaSource *)ctx; return src->seekTo(stamp); } -API_EXPORT void API_CALL mk_media_source_start_send_rtp(const mk_media_source ctx, const char *dst_url, uint16_t dst_port, const char *ssrc, int is_udp, on_mk_media_source_send_rtp_result cb, void *user_data) { - mk_media_source_start_send_rtp2(ctx, dst_url, dst_port, ssrc, is_udp, cb, user_data, nullptr); +API_EXPORT void API_CALL mk_media_source_start_send_rtp(const mk_media_source ctx, const char *dst_url, uint16_t dst_port, const char *ssrc, int con_type, on_mk_media_source_send_rtp_result cb, void *user_data) { + mk_media_source_start_send_rtp2(ctx, dst_url, dst_port, ssrc, con_type, cb, user_data, nullptr); } -API_EXPORT void API_CALL mk_media_source_start_send_rtp2(const mk_media_source ctx, const char *dst_url, uint16_t dst_port, const char *ssrc, int is_udp, on_mk_media_source_send_rtp_result cb, void *user_data, on_user_data_free user_data_free){ +API_EXPORT void API_CALL mk_media_source_start_send_rtp2(const mk_media_source ctx, const char *dst_url, uint16_t dst_port, const char *ssrc, int con_type, on_mk_media_source_send_rtp_result cb, void *user_data, on_user_data_free user_data_free){ assert(ctx && dst_url && ssrc); MediaSource *src = (MediaSource *)ctx; @@ -307,7 +307,7 @@ API_EXPORT void API_CALL mk_media_source_start_send_rtp2(const mk_media_source c args.dst_url = dst_url; args.dst_port = dst_port; args.ssrc = ssrc; - args.is_udp = is_udp; + args.con_type = (mediakit::MediaSourceEvent::SendRtpArgs::ConType)con_type; std::shared_ptr ptr(user_data, user_data_free ? user_data_free : [](void *) {}); src->startSendRtp(args, [cb, ptr](uint16_t local_port, const SockException &ex){ diff --git a/api/source/mk_media.cpp b/api/source/mk_media.cpp index fda298af..df59aa0f 100755 --- a/api/source/mk_media.cpp +++ b/api/source/mk_media.cpp @@ -281,11 +281,11 @@ API_EXPORT int API_CALL mk_media_input_audio(mk_media ctx, const void *data, int return (*obj)->getChannel()->inputAudio((const char*)data, len, dts); } -API_EXPORT void API_CALL mk_media_start_send_rtp(mk_media ctx, const char *dst_url, uint16_t dst_port, const char *ssrc, int is_udp, on_mk_media_send_rtp_result cb, void *user_data) { - mk_media_start_send_rtp2(ctx, dst_url, dst_port, ssrc, is_udp, cb, user_data, nullptr); +API_EXPORT void API_CALL mk_media_start_send_rtp(mk_media ctx, const char *dst_url, uint16_t dst_port, const char *ssrc, int con_type, on_mk_media_send_rtp_result cb, void *user_data) { + mk_media_start_send_rtp2(ctx, dst_url, dst_port, ssrc, con_type, cb, user_data, nullptr); } -API_EXPORT void API_CALL mk_media_start_send_rtp2(mk_media ctx, const char *dst_url, uint16_t dst_port, const char *ssrc, int is_udp, on_mk_media_send_rtp_result cb, void *user_data, +API_EXPORT void API_CALL mk_media_start_send_rtp2(mk_media ctx, const char *dst_url, uint16_t dst_port, const char *ssrc, int con_type, on_mk_media_send_rtp_result cb, void *user_data, on_user_data_free user_data_free) { assert(ctx && dst_url && ssrc); MediaHelper::Ptr* obj = (MediaHelper::Ptr*) ctx; @@ -294,7 +294,7 @@ API_EXPORT void API_CALL mk_media_start_send_rtp2(mk_media ctx, const char *dst_ args.dst_url = dst_url; args.dst_port = dst_port; args.ssrc = ssrc; - args.is_udp = is_udp; + args.con_type = (mediakit::MediaSourceEvent::SendRtpArgs::ConType)con_type; // sender参数无用 auto ref = *obj; diff --git a/postman/ZLMediaKit.postman_collection.json b/postman/ZLMediaKit.postman_collection.json index 66464a82..8b80aede 100644 --- a/postman/ZLMediaKit.postman_collection.json +++ b/postman/ZLMediaKit.postman_collection.json @@ -1881,7 +1881,7 @@ "response": [] }, { - "name": "开始发送rtp(startSendRtp)", + "name": "开始active模式发送rtp(startSendRtp)", "request": { "method": "GET", "header": [], @@ -1940,7 +1940,7 @@ { "key": "is_udp", "value": "0", - "description": "是否为udp模式,否则为tcp模式" + "description": "1:udp active模式, 0:tcp active模式" }, { "key": "src_port", @@ -1955,9 +1955,9 @@ "disabled": true }, { - "key": "use_ps", + "key": "type", "value": "1", - "description": "rtp打包采用ps还是es模式,默认采用ps模式,该参数非必选参数", + "description": "rtp打包模式,0:es, 1: ps, 2: ts", "disabled": true }, { @@ -1990,7 +1990,7 @@ "response": [] }, { - "name": "开始tcp passive被动发送rtp(startSendRtpPassive)", + "name": "开始passive模式发送rtp(startSendRtpPassive)", "request": { "method": "GET", "header": [], @@ -2030,6 +2030,12 @@ "value": "1", "description": "rtp推流的ssrc,ssrc不同时,可以推流到多个上级服务器" }, + { + "key": "is_udp", + "value": "0", + "disabled": true, + "description": "1:udp passive模式, 0:tcp passive模式" + }, { "key": "src_port", "value": "0", @@ -2043,9 +2049,9 @@ "disabled": true }, { - "key": "use_ps", + "key": "type", "value": "1", - "description": "rtp打包采用ps还是es模式,默认采用ps模式,该参数非必选参数", + "description": "rtp打包模式,0:es, 1: ps, 2: ts", "disabled": true }, { diff --git a/server/WebApi.cpp b/server/WebApi.cpp index 2b08817a..e086fe6c 100755 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -1377,10 +1377,7 @@ void installWebApi() { } }); - api_regist("/index/api/startSendRtp",[](API_ARGS_MAP_ASYNC){ - CHECK_SECRET(); - CHECK_ARGS("vhost", "app", "stream", "ssrc", "dst_url", "dst_port", "is_udp"); - + static auto start_send_rtp = [] (bool passive, API_ARGS_MAP_ASYNC) { auto src = MediaSource::find(allArgs["vhost"], allArgs["app"], allArgs["stream"], allArgs["from_mp4"].as()); if (!src) { throw ApiRetException("can not find the source stream", API::NotFound); @@ -1391,30 +1388,50 @@ void installWebApi() { type = allArgs["use_ps"].as(); } MediaSourceEvent::SendRtpArgs args; - args.passive = false; + if (passive) { + args.con_type = allArgs["is_udp"].as() ? mediakit::MediaSourceEvent::SendRtpArgs::kUdpPassive : mediakit::MediaSourceEvent::SendRtpArgs::kTcpPassive; + } else { + args.con_type = allArgs["is_udp"].as() ? mediakit::MediaSourceEvent::SendRtpArgs::kUdpActive : mediakit::MediaSourceEvent::SendRtpArgs::kTcpActive; + } args.dst_url = allArgs["dst_url"]; args.dst_port = allArgs["dst_port"]; args.ssrc_multi_send = allArgs["ssrc_multi_send"].empty() ? false : allArgs["ssrc_multi_send"].as(); args.ssrc = allArgs["ssrc"]; - args.is_udp = allArgs["is_udp"]; args.src_port = allArgs["src_port"]; args.pt = allArgs["pt"].empty() ? 96 : allArgs["pt"].as(); - args.type = (MediaSourceEvent::SendRtpArgs::Type)type; + args.data_type = (MediaSourceEvent::SendRtpArgs::DataType)type; args.only_audio = allArgs["only_audio"].as(); args.udp_rtcp_timeout = allArgs["udp_rtcp_timeout"]; args.recv_stream_id = allArgs["recv_stream_id"]; - TraceL << "startSendRtp, pt " << int(args.pt) << " rtp type " << type << " audio " << args.only_audio; - + args.close_delay_ms = allArgs["close_delay_ms"]; src->getOwnerPoller()->async([=]() mutable { - src->startSendRtp(args, [val, headerOut, invoker](uint16_t local_port, const SockException &ex) mutable { - if (ex) { - val["code"] = API::OtherFailed; - val["msg"] = ex.what(); - } - val["local_port"] = local_port; + try { + src->startSendRtp(args, [val, headerOut, invoker](uint16_t local_port, const SockException &ex) mutable { + if (ex) { + val["code"] = API::OtherFailed; + val["msg"] = ex.what(); + } + val["local_port"] = local_port; + invoker(200, headerOut, val.toStyledString()); + }); + } catch (std::exception &ex) { + val["code"] = API::Exception; + val["msg"] = ex.what(); invoker(200, headerOut, val.toStyledString()); - }); + } }); + }; + + api_regist("/index/api/startSendRtp",[](API_ARGS_MAP_ASYNC){ + CHECK_SECRET(); + CHECK_ARGS("vhost", "app", "stream", "ssrc", "dst_url", "dst_port", "is_udp"); + start_send_rtp(false, API_ARGS_VALUE, invoker); + }); + + api_regist("/index/api/startSendRtpPassive",[](API_ARGS_MAP_ASYNC){ + CHECK_SECRET(); + CHECK_ARGS("vhost", "app", "stream", "ssrc"); + start_send_rtp(true, API_ARGS_VALUE, invoker); }); api_regist("/index/api/listRtpSender",[](API_ARGS_MAP_ASYNC){ @@ -1437,45 +1454,6 @@ void installWebApi() { }); }); - api_regist("/index/api/startSendRtpPassive",[](API_ARGS_MAP_ASYNC){ - CHECK_SECRET(); - CHECK_ARGS("vhost", "app", "stream", "ssrc"); - - auto src = MediaSource::find(allArgs["vhost"], allArgs["app"], allArgs["stream"], allArgs["from_mp4"].as()); - if (!src) { - throw ApiRetException("can not find the source stream", API::NotFound); - } - auto type = allArgs["type"].empty() ? (int)MediaSourceEvent::SendRtpArgs::kRtpPS : allArgs["type"].as(); - if (!allArgs["use_ps"].empty()) { - // 兼容之前的use_ps参数 - type = allArgs["use_ps"].as(); - } - - MediaSourceEvent::SendRtpArgs args; - args.passive = true; - args.ssrc = allArgs["ssrc"]; - args.is_udp = false; - args.src_port = allArgs["src_port"]; - args.pt = allArgs["pt"].empty() ? 96 : allArgs["pt"].as(); - args.type = (MediaSourceEvent::SendRtpArgs::Type)type; - args.only_audio = allArgs["only_audio"].as(); - args.recv_stream_id = allArgs["recv_stream_id"]; - //tcp被动服务器等待链接超时时间 - args.tcp_passive_close_delay_ms = allArgs["close_delay_ms"]; - TraceL << "startSendRtpPassive, pt " << int(args.pt) << " rtp type " << type << " audio " << args.only_audio; - - src->getOwnerPoller()->async([=]() mutable { - src->startSendRtp(args, [val, headerOut, invoker](uint16_t local_port, const SockException &ex) mutable { - if (ex) { - val["code"] = API::OtherFailed; - val["msg"] = ex.what(); - } - val["local_port"] = local_port; - invoker(200, headerOut, val.toStyledString()); - }); - }); - }); - api_regist("/index/api/stopSendRtp",[](API_ARGS_MAP_ASYNC){ CHECK_SECRET(); CHECK_ARGS("vhost", "app", "stream"); diff --git a/src/Common/MediaSource.h b/src/Common/MediaSource.h index 0792b75b..06499698 100644 --- a/src/Common/MediaSource.h +++ b/src/Common/MediaSource.h @@ -96,18 +96,29 @@ public: class SendRtpArgs { public: - enum Type { kRtpRAW = 0, kRtpPS = 1, kRtpTS = 2 }; - // 是否采用udp方式发送rtp - bool is_udp = true; + enum DataType { + kRtpES = 0, // 发送ES流 + kRtpPS = 1, // 发送PS流 + kRtpTS = 2 // 发送TS流 + }; + + enum ConType { + kTcpActive = 0, // tcp主动模式,tcp客户端主动连接对方并发送rtp + kUdpActive = 1, // udp主动模式,主动发送数据给对方 + kTcpPassive = 2, // tcp被动模式,tcp服务器,等待对方连接并回复rtp + kUdpPassive = 3 // udp被动方式,等待对方发送nat打洞包,然后回复rtp至打洞包源地址 + }; + // rtp类型 - Type type = kRtpPS; - //发送es流时指定是否只发送纯音频流 + DataType data_type = kRtpPS; + // 连接类型 + ConType con_type = kUdpActive; + + // 发送es流时指定是否只发送纯音频流 bool only_audio = false; - //tcp被动方式 - bool passive = false; // rtp payload type uint8_t pt = 96; - //是否支持同ssrc多服务器发送 + // 是否支持同ssrc多服务器发送 bool ssrc_multi_send = false; // 指定rtp ssrc std::string ssrc; @@ -118,16 +129,16 @@ public: // 发送目标主机地址,可以是ip或域名 std::string dst_url; - //udp发送时,是否开启rr rtcp接收超时判断 + // udp发送时,是否开启rr rtcp接收超时判断 bool udp_rtcp_timeout = false; - //tcp被动发送服务器延时关闭事件,单位毫秒;设置为0时,则使用默认值5000ms - uint32_t tcp_passive_close_delay_ms = 0; - //udp 发送时,rr rtcp包接收超时时间,单位毫秒 + // passive被动、tcp主动模式超时时间 + uint32_t close_delay_ms = 0; + // udp 发送时,rr rtcp包接收超时时间,单位毫秒 uint32_t rtcp_timeout_ms = 30 * 1000; - //udp 发送时,发送sr rtcp包间隔,单位毫秒 + // udp 发送时,发送sr rtcp包间隔,单位毫秒 uint32_t rtcp_send_interval_ms = 5 * 1000; - //发送rtp同时接收,一般用于双向语言对讲, 如果不为空,说明开启接收 + // 发送rtp同时接收,一般用于双向语言对讲, 如果不为空,说明开启接收 std::string recv_stream_id; }; diff --git a/src/Common/MultiMediaSourceMuxer.cpp b/src/Common/MultiMediaSourceMuxer.cpp index cf4c34b3..3f8e7668 100644 --- a/src/Common/MultiMediaSourceMuxer.cpp +++ b/src/Common/MultiMediaSourceMuxer.cpp @@ -386,6 +386,17 @@ void MultiMediaSourceMuxer::startSendRtp(MediaSource &sender, const MediaSourceE weak_ptr weak_self = shared_from_this(); + rtp_sender->setOnClose([weak_self, ssrc](const toolkit::SockException &ex) { + if (auto strong_self = weak_self.lock()) { + // 可能归属线程发生变更 + strong_self->getOwnerPoller(MediaSource::NullMediaSource())->async([=]() { + WarnL << "stream:" << strong_self->shortUrl() << " stop send rtp:" << ssrc << ", reason:" << ex; + strong_self->_rtp_sender.erase(ssrc); + NOTICE_EMIT(BroadcastSendRtpStoppedArgs, Broadcast::kBroadcastSendRtpStopped, *strong_self, ssrc, ex); + }); + } + }); + rtp_sender->startSend(args, [ssrc,ssrc_multi_send, weak_self, rtp_sender, cb, tracks, ring, poller](uint16_t local_port, const SockException &ex) mutable { cb(local_port, ex); auto strong_self = weak_self.lock(); @@ -397,16 +408,6 @@ void MultiMediaSourceMuxer::startSendRtp(MediaSource &sender, const MediaSourceE rtp_sender->addTrack(track); } rtp_sender->addTrackCompleted(); - rtp_sender->setOnClose([weak_self, ssrc](const toolkit::SockException &ex) { - if (auto strong_self = weak_self.lock()) { - // 可能归属线程发生变更 - strong_self->getOwnerPoller(MediaSource::NullMediaSource())->async([=]() { - WarnL << "stream:" << strong_self->shortUrl() << " stop send rtp:" << ssrc << ", reason:" << ex; - strong_self->_rtp_sender.erase(ssrc); - NOTICE_EMIT(BroadcastSendRtpStoppedArgs, Broadcast::kBroadcastSendRtpStopped, *strong_self, ssrc, ex); - }); - } - }); auto reader = ring->attach(poller); reader->setReadCB([rtp_sender](const Frame::Ptr &frame) { diff --git a/src/Rtp/RtpSender.cpp b/src/Rtp/RtpSender.cpp index ed0d836e..3012dda7 100644 --- a/src/Rtp/RtpSender.cpp +++ b/src/Rtp/RtpSender.cpp @@ -40,86 +40,99 @@ void RtpSender::startSend(const MediaSourceEvent::SendRtpArgs &args, const funct if (!_interface) { //重连时不重新创建对象 auto lam = [this](std::shared_ptr> list) { onFlushRtpList(std::move(list)); }; - switch (args.type) { + switch (args.data_type) { case MediaSourceEvent::SendRtpArgs::kRtpPS: _interface = std::make_shared(lam, atoi(args.ssrc.data()), args.pt, true); break; case MediaSourceEvent::SendRtpArgs::kRtpTS: _interface = std::make_shared(lam, atoi(args.ssrc.data()), args.pt, false); break; - case MediaSourceEvent::SendRtpArgs::kRtpRAW: _interface = std::make_shared(lam, atoi(args.ssrc.data()), args.pt, args.only_audio); break; - default: CHECK(0, "invalid rtp type:" + to_string(args.type)); break; + case MediaSourceEvent::SendRtpArgs::kRtpES: _interface = std::make_shared(lam, atoi(args.ssrc.data()), args.pt, args.only_audio); break; + default: CHECK(0, "invalid rtp type: " + to_string(args.data_type)); break; } } + auto delay_ms = _args.close_delay_ms ? _args.close_delay_ms : 5000; weak_ptr weak_self = shared_from_this(); - if (args.passive) { - // tcp被动发流模式 - _args.is_udp = false; - // 默认等待链接 - bool is_wait = true; - try { - auto tcp_listener = Socket::createSocket(_poller, false); - if (args.src_port) { - //指定端口 - if (!tcp_listener->listen(args.src_port)) { - throw std::invalid_argument(StrPrinter << "open tcp passive server failed on port:" << args.src_port - << ", err:" << get_uv_errmsg(true)); - } - is_wait = true; - } else { - auto pr = std::make_pair(tcp_listener, Socket::createSocket(_poller, false)); - //从端口池获取随机端口 - makeSockPair(pr, "::", false, false); - // 随机端口不等待,保证调用者可以知道端口 - is_wait = false; + if (args.con_type == MediaSourceEvent::SendRtpArgs::kTcpPassive) { + auto tcp_listener = Socket::createSocket(_poller, false); + if (args.src_port) { + // 指定端口 + if (!tcp_listener->listen(args.src_port)) { + throw std::invalid_argument(StrPrinter << "open tcp passive server failed on port: " << args.src_port << ", err: " << get_uv_errmsg(true)); } - // tcp服务器默认开启5秒 - auto delay = _args.tcp_passive_close_delay_ms ? _args.tcp_passive_close_delay_ms : 5000; - auto delay_task = _poller->doDelayTask(delay, [tcp_listener, cb, is_wait]() mutable { - if (is_wait) { - cb(0, SockException(Err_timeout, "wait tcp connection timeout")); - } - tcp_listener = nullptr; - return 0; - }); - tcp_listener->setOnAccept([weak_self, cb, delay_task,is_wait](Socket::Ptr &sock, std::shared_ptr &complete) { - auto strong_self = weak_self.lock(); - if (!strong_self) { - return; - } - //立即关闭tcp服务器 - delay_task->cancel(); - strong_self->_socket_rtp = sock; - strong_self->onConnect(); - if (is_wait) { - cb(sock->get_local_port(), SockException()); - } - InfoL << "accept connection from:" << sock->get_peer_ip() << ":" << sock->get_peer_port(); - }); - InfoL << "start tcp passive server on:" << tcp_listener->get_local_port(); - if (!is_wait) { - // 随机端口马上返回端口,保证调用者知道端口 - cb(tcp_listener->get_local_port(), SockException()); - } - } catch (std::exception &ex) { - cb(0, SockException(Err_other, ex.what())); - return; + } else { + auto pr = std::make_pair(tcp_listener, Socket::createSocket(_poller, false)); + // 从端口池获取随机端口 + makeSockPair(pr, "::", true, false); } - return; - } - if (args.is_udp) { + // 定时器持有tcp_listener,保证超时时间内保持监听 + auto delay_task = _poller->doDelayTask(delay_ms, [weak_self, tcp_listener]() mutable { + // 防止循环引用 + tcp_listener = nullptr; + if (auto strong_self = weak_self.lock()) { + strong_self->onClose(SockException(Err_timeout, "wait tcp connection timeout")); + } + return 0; + }); + tcp_listener->setOnAccept([weak_self, delay_task](Socket::Ptr &sock, std::shared_ptr &complete) { + auto strong_self = weak_self.lock(); + if (!strong_self) { + return; + } + delay_task->cancel(); + strong_self->_socket_rtp = sock; + strong_self->onConnect(); + InfoL << "accept tcp connection from: " << sock->get_peer_ip() << ":" << sock->get_peer_port(); + }); + InfoL << "start tcp passive server on: " << tcp_listener->get_local_port(); + cb(tcp_listener->get_local_port(), SockException()); + + } else if (args.con_type == MediaSourceEvent::SendRtpArgs::kUdpPassive) { + if (args.src_port) { + // 指定端口 + if (!_socket_rtp->bindUdpSock(args.src_port, "::", true)) { + throw std::invalid_argument(StrPrinter << "open udp passive server failed on port: " << args.src_port << ", err: " << get_uv_errmsg(true)); + } + } else { + auto pr = std::make_pair(_socket_rtp, Socket::createSocket(_poller, false)); + // 从端口池获取随机端口 + makeSockPair(pr, "::", true, true); + } + auto delay_task = _poller->doDelayTask(delay_ms, [weak_self]() mutable { + if (auto strong_self = weak_self.lock()) { + // 关闭端口 + strong_self->_socket_rtp->closeSock(); + strong_self->onClose(SockException(Err_timeout, "wait udp connection timeout")); + } + return 0; + }); + _socket_rtp->setOnRead([weak_self, delay_task](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) { + auto strong_self = weak_self.lock(); + if (!strong_self) { + return; + } + delay_task->cancel(); + strong_self->_socket_rtp->bindPeerAddr(addr, addr_len, true); + // 异步执行onConnect,防止在OnRead回调中调用setOnRead + strong_self->_poller->async([strong_self]() { strong_self->onConnect(); }, false); + InfoL << "accept udp connection from: " << strong_self->_socket_rtp->get_peer_ip() << ":" << strong_self->_socket_rtp->get_peer_port(); + }); + InfoL << "start udp passive server on: " << _socket_rtp->get_local_port(); + cb(_socket_rtp->get_local_port(), SockException()); + + } else if (args.con_type == MediaSourceEvent::SendRtpArgs::kUdpActive) { auto poller = _poller; WorkThreadPool::Instance().getPoller()->async([cb, args, weak_self, poller]() { struct sockaddr_storage addr; - //切换线程目的是为了dns解析放在后台线程执行 + // 切换线程目的是为了dns解析放在后台线程执行 if (!SockUtil::getDomainIP(args.dst_url.data(), args.dst_port, addr, AF_INET, SOCK_DGRAM, IPPROTO_UDP)) { poller->async([args, cb]() { - //切回自己的线程 - cb(0, SockException(Err_dns, StrPrinter << "dns解析域名失败:" << args.dst_url)); + // 切回自己的线程 + cb(0, SockException(Err_dns, StrPrinter << "dns resolution failed: " << args.dst_url)); }); return; } - //dns解析成功 + // dns解析成功 poller->async([args, addr, weak_self, cb]() { - //切回自己的线程 + // 切回自己的线程 auto strong_self = weak_self.lock(); if (!strong_self) { return; @@ -127,15 +140,14 @@ void RtpSender::startSend(const MediaSourceEvent::SendRtpArgs &args, const funct string ifr_ip = addr.ss_family == AF_INET ? "0.0.0.0" : "::"; try { if (args.src_port) { - //指定端口 - if (!strong_self->_socket_rtp->bindUdpSock(args.src_port, ifr_ip)) { - throw std::invalid_argument(StrPrinter << "bindUdpSock failed on port:" << args.src_port - << ", err:" << get_uv_errmsg(true)); + // 指定端口 + if (!strong_self->_socket_rtp->bindUdpSock(args.src_port, ifr_ip, true)) { + throw std::invalid_argument(StrPrinter << "open udp active client failed on port: " << args.src_port << ", err: " << get_uv_errmsg(true)); } } else { auto pr = std::make_pair(strong_self->_socket_rtp, Socket::createSocket(strong_self->_poller, false)); - //从端口池获取随机端口 - makeSockPair(pr, ifr_ip, true); + // 从端口池获取随机端口 + makeSockPair(pr, ifr_ip, true, true); } } catch (std::exception &ex) { cb(0, SockException(Err_other, ex.what())); @@ -146,19 +158,24 @@ void RtpSender::startSend(const MediaSourceEvent::SendRtpArgs &args, const funct cb(strong_self->_socket_rtp->get_local_port(), SockException()); }); }); - } else { - _socket_rtp->connect(args.dst_url, args.dst_port, [cb, weak_self](const SockException &err) { + InfoL << "start udp active send rtp to: " << args.dst_url << ":" << args.dst_port; + + } else if (args.con_type == MediaSourceEvent::SendRtpArgs::kTcpActive) { + _socket_rtp->connect(args.dst_url, args.dst_port,[cb, weak_self](const SockException &err) { auto strong_self = weak_self.lock(); if (strong_self) { if (!err) { - //tcp连接成功 + // tcp连接成功 strong_self->onConnect(); } cb(strong_self->_socket_rtp->get_local_port(), err); } else { cb(0, err); } - }, 5.0F, "::", args.src_port); + }, delay_ms / 1000.0, "::", args.src_port); + InfoL << "start tcp active send rtp to: " << args.dst_url << ":" << args.dst_port; + } else { + CHECK(0, "invalid con type"); } } @@ -168,8 +185,8 @@ void RtpSender::createRtcpSocket() { } _socket_rtcp = Socket::createSocket(_socket_rtp->getPoller(), false); //rtcp端口使用户rtp端口+1 - if(!_socket_rtcp->bindUdpSock(_socket_rtp->get_local_port() + 1, _socket_rtp->get_local_ip(), false)){ - WarnL << "bind rtcp udp socket failed:" << get_uv_errmsg(true); + if(!_socket_rtcp->bindUdpSock(_socket_rtp->get_local_port() + 1, _socket_rtp->get_local_ip(), true)){ + WarnL << "bind rtcp udp socket failed: " << get_uv_errmsg(true); _socket_rtcp = nullptr; return; } @@ -180,12 +197,18 @@ void RtpSender::createRtcpSocket() { _rtcp_context = std::make_shared(); weak_ptr weak_self = shared_from_this(); - _socket_rtcp->setOnRead([weak_self](const Buffer::Ptr &buf, struct sockaddr *, int) { + bool bind_addr = false; + _socket_rtcp->setOnRead([weak_self, bind_addr](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) mutable { //接收receive report rtcp auto strong_self = weak_self.lock(); if (!strong_self) { return; } + if (!bind_addr) { + // 收到对方rtcp打洞包后,再回复rtcp + bind_addr = true; + strong_self->_socket_rtcp->bindPeerAddr(addr, addr_len, true); + } auto rtcp_arr = RtcpHeader::loadFromBytes(buf->data(), buf->size()); for (auto &rtcp : rtcp_arr) { strong_self->onRecvRtcp(rtcp); @@ -199,19 +222,19 @@ void RtpSender::onRecvRtcp(RtcpHeader *rtcp) { _rtcp_recv_ticker.resetTime(); } -//连接建立成功事件 -void RtpSender::onConnect(){ +// 连接建立成功事件 +void RtpSender::onConnect() { _is_connect = true; - //加大发送缓存,防止udp丢包之类的问题 + // 加大发送缓存,防止udp丢包之类的问题 SockUtil::setSendBuf(_socket_rtp->rawFD(), 4 * 1024 * 1024); - if (!_args.is_udp) { - //关闭tcp no_delay并开启MSG_MORE, 提高发送性能 + if (_args.con_type == MediaSourceEvent::SendRtpArgs::kTcpActive || _args.con_type == MediaSourceEvent::SendRtpArgs::kTcpPassive) { + // 关闭tcp no_delay并开启MSG_MORE, 提高发送性能 SockUtil::setNoDelay(_socket_rtp->rawFD(), false); _socket_rtp->setSendFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE); } else if (_args.udp_rtcp_timeout) { createRtcpSocket(); } - //连接建立成功事件 + // 连接建立成功事件 weak_ptr weak_self = shared_from_this(); if (!_args.recv_stream_id.empty()) { mINI ini; @@ -226,11 +249,13 @@ void RtpSender::onConnect(){ } try { strong_self->_rtp_session->onRecv(buf); - } catch (std::exception &ex){ + } catch (std::exception &ex) { SockException err(toolkit::Err_shutdown, ex.what()); strong_self->_rtp_session->shutdown(err); } }); + } else { + _socket_rtp->setOnRead(nullptr); } _socket_rtp->setOnErr([weak_self](const SockException &err) { auto strong_self = weak_self.lock(); @@ -238,12 +263,10 @@ void RtpSender::onConnect(){ strong_self->onErr(err); } }); - //获取本地端口,断开重连后确保端口不变 - _args.src_port = _socket_rtp->get_local_port(); - InfoL << "开始发送 rtp:" << _socket_rtp->get_peer_ip() << ":" << _socket_rtp->get_peer_port() << ", 是否为udp方式:" << _args.is_udp; + InfoL << "startSend rtp success: " << _socket_rtp->get_peer_ip() << ":" << _socket_rtp->get_peer_port() << ", data_type: " << _args.data_type << ", con_type: " << _args.con_type; } -bool RtpSender::addTrack(const Track::Ptr &track){ +bool RtpSender::addTrack(const Track::Ptr &track) { if (_args.only_audio && track->getTrackType() == TrackVideo) { // 如果只发送音频则忽略视频 return false; @@ -251,11 +274,11 @@ bool RtpSender::addTrack(const Track::Ptr &track){ return _interface->addTrack(track); } -void RtpSender::addTrackCompleted(){ +void RtpSender::addTrackCompleted() { _interface->addTrackCompleted(); } -void RtpSender::resetTracks(){ +void RtpSender::resetTracks() { _interface->resetTracks(); } @@ -265,13 +288,12 @@ void RtpSender::flush() { } } -//此函数在其他线程执行 bool RtpSender::inputFrame(const Frame::Ptr &frame) { if (_args.only_audio && frame->getTrackType() == TrackVideo) { // 如果只发送音频则忽略视频 return false; } - //连接成功后才做实质操作(节省cpu资源) + // 连接成功后才做实质操作(节省cpu资源) return _is_connect ? _interface->inputFrame(frame) : false; } @@ -283,20 +305,20 @@ void RtpSender::onSendRtpUdp(const toolkit::Buffer::Ptr &buf, bool check) { _rtcp_context->onRtp(rtp->getSeq(), rtp->getStamp(), rtp->ntp_stamp, 90000 /*not used*/, rtp->size()); if (!check) { - //减少判断次数 + // 减少判断次数 return; } - //每5秒发送一次rtcp + // 每5秒发送一次rtcp if (_rtcp_send_ticker.elapsedTime() > _args.rtcp_send_interval_ms) { _rtcp_send_ticker.resetTime(); - //rtcp ssrc为rtp ssrc + 1 - auto sr = _rtcp_context->createRtcpSR(atoi(_args.ssrc.data()) + 1); - //send sender report rtcp - _socket_rtcp->send(sr); + // rtcp ssrc为rtp ssrc + 1 + auto sr = _rtcp_context->createRtcpSR(atoi(_args.ssrc.data()) + 1); + // send sender report rtcp + _socket_rtcp->send(sr); } if (_rtcp_recv_ticker.elapsedTime() > _args.rtcp_timeout_ms) { - //接收rr rtcp超时 + // 接收rr rtcp超时 WarnL << "recv rr rtcp timeout"; _rtcp_recv_ticker.resetTime(); onClose(SockException(Err_timeout, "recv rr rtcp timeout")); @@ -306,28 +328,36 @@ void RtpSender::onSendRtpUdp(const toolkit::Buffer::Ptr &buf, bool check) { void RtpSender::onClose(const SockException &ex) { auto cb = _on_close; if (cb) { - //在下次循环时触发onClose,原因是防止遍历map时删除元素 + // 在下次循环时触发onClose,原因是防止遍历map时删除元素 _poller->async([cb, ex]() { cb(ex); }, false); } } -//此函数在其他线程执行 -void RtpSender::onFlushRtpList(shared_ptr > rtp_list) { - if(!_is_connect){ - //连接成功后才能发送数据 +// 此函数在其他线程执行 +void RtpSender::onFlushRtpList(shared_ptr> rtp_list) { + if (!_is_connect) { + // 连接成功后才能发送数据 return; } size_t i = 0; auto size = rtp_list->size(); rtp_list->for_each([&](Buffer::Ptr &packet) { - if (_args.is_udp) { - onSendRtpUdp(packet, i == 0); - // udp模式,rtp over tcp前4个字节可以忽略 - _socket_rtp->send(std::make_shared(std::move(packet), RtpPacket::kRtpTcpHeaderSize), nullptr, 0, ++i == size); - } else { - // tcp模式, rtp over tcp前2个字节可以忽略,只保留后续rtp长度的2个字节 - _socket_rtp->send(std::make_shared(std::move(packet), 2), nullptr, 0, ++i == size); + switch (_args.con_type) { + case MediaSourceEvent::SendRtpArgs::kUdpActive: + case MediaSourceEvent::SendRtpArgs::kUdpPassive: { + onSendRtpUdp(packet, i == 0); + // udp模式,rtp over tcp前4个字节可以忽略 + _socket_rtp->send(std::make_shared(std::move(packet), RtpPacket::kRtpTcpHeaderSize), nullptr, 0, ++i == size); + break; + } + case MediaSourceEvent::SendRtpArgs::kTcpActive: + case MediaSourceEvent::SendRtpArgs::kTcpPassive: { + // tcp模式, rtp over tcp前2个字节可以忽略,只保留后续rtp长度的2个字节 + _socket_rtp->send(std::make_shared(std::move(packet), 2), nullptr, 0, ++i == size); + break; + } + default: CHECK(0); } }); } @@ -338,9 +368,9 @@ void RtpSender::onErr(const SockException &ex) { onClose(ex); } -void RtpSender::setOnClose(std::function on_close){ +void RtpSender::setOnClose(std::function on_close) { _on_close = std::move(on_close); } -}//namespace mediakit -#endif// defined(ENABLE_RTPPROXY) +} // namespace mediakit +#endif // defined(ENABLE_RTPPROXY) diff --git a/src/Rtp/RtpSender.h b/src/Rtp/RtpSender.h index 0902d12d..3ad09e8d 100644 --- a/src/Rtp/RtpSender.h +++ b/src/Rtp/RtpSender.h @@ -63,6 +63,9 @@ public: */ virtual void resetTracks() override; + /** + * 设置发送rtp停止回调 + */ void setOnClose(std::function on_close); private: