diff --git a/api/source/mk_events_objects.cpp b/api/source/mk_events_objects.cpp index 01709f2d..dd15424f 100644 --- a/api/source/mk_events_objects.cpp +++ b/api/source/mk_events_objects.cpp @@ -211,7 +211,14 @@ API_EXPORT int API_CALL mk_media_source_seek_to(const mk_media_source ctx,uint32 API_EXPORT void API_CALL mk_media_source_start_send_rtp(const mk_media_source ctx, const char *dst_url, uint16_t dst_port, const char *ssrc, int is_udp, on_mk_media_source_send_rtp_result cb, void *user_data){ assert(ctx && dst_url && ssrc); MediaSource *src = (MediaSource *)ctx; - src->startSendRtp(dst_url, dst_port, ssrc, is_udp, 0, [cb, user_data](uint16_t local_port, const SockException &ex){ + + MediaSourceEvent::SendRtpArgs args; + args.dst_url = dst_url; + args.dst_port = dst_port; + args.ssrc = ssrc; + args.is_udp = is_udp; + + src->startSendRtp(args, [cb, user_data](uint16_t local_port, const SockException &ex){ if (cb) { cb(user_data, local_port, ex.getErrCode(), ex.what()); } diff --git a/api/source/mk_media.cpp b/api/source/mk_media.cpp index 58f83706..02465890 100755 --- a/api/source/mk_media.cpp +++ b/api/source/mk_media.cpp @@ -239,8 +239,15 @@ API_EXPORT int API_CALL mk_media_input_audio(mk_media ctx, const void* data, int API_EXPORT void API_CALL mk_media_start_send_rtp(mk_media ctx, const char *dst_url, uint16_t dst_port, const char *ssrc, int is_udp, on_mk_media_send_rtp_result cb, void *user_data){ assert(ctx && dst_url && ssrc); MediaHelper::Ptr* obj = (MediaHelper::Ptr*) ctx; + + MediaSourceEvent::SendRtpArgs args; + args.dst_url = dst_url; + args.dst_port = dst_port; + args.ssrc = ssrc; + args.is_udp = is_udp; + //sender参数无用 - (*obj)->getChannel()->startSendRtp(*MediaSource::NullMediaSource, dst_url, dst_port, ssrc, is_udp, 0, [cb, user_data](uint16_t local_port, const SockException &ex){ + (*obj)->getChannel()->startSendRtp(*MediaSource::NullMediaSource, args, [cb, user_data](uint16_t local_port, const SockException &ex){ if (cb) { cb(user_data, local_port, ex.getErrCode(), ex.what()); } diff --git a/postman/ZLMediaKit.postman_collection.json b/postman/ZLMediaKit.postman_collection.json index ee80aa49..035870b6 100644 --- a/postman/ZLMediaKit.postman_collection.json +++ b/postman/ZLMediaKit.postman_collection.json @@ -1481,6 +1481,24 @@ "value": "0", "description": "是否推送本地MP4录像,该参数非必选参数", "disabled": true + }, + { + "key": "use_ps", + "value": "1", + "description": "rtp打包采用ps还是es模式,默认采用ps模式,该参数非必选参数", + "disabled": true + }, + { + "key": "pt", + "value": "96", + "description": "rtp payload type,默认96,该参数非必选参数", + "disabled": true + }, + { + "key": "only_audio", + "value": "1", + "description": "rtp es方式打包时,是否只打包音频,该参数非必选参数", + "disabled": true } ] } diff --git a/server/WebApi.cpp b/server/WebApi.cpp index 4bf7a9fb..16478ca1 100755 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -1104,19 +1104,26 @@ void installWebApi() { if (!src) { throw ApiRetException("该媒体流不存在", API::OtherFailed); } - uint8_t pt = allArgs["pt"].empty() ? 96 : allArgs["pt"].as(); - bool use_ps = allArgs["use_ps"].empty() ? true : allArgs["use_ps"].as(); - bool only_audio = allArgs["only_audio"].empty() ? true : allArgs["only_audio"].as(); - TraceL << "pt "<startSendRtp(allArgs["dst_url"], allArgs["dst_port"], allArgs["ssrc"], allArgs["is_udp"], allArgs["src_port"], [val, headerOut, invoker](uint16_t local_port, const SockException &ex) mutable{ + + MediaSourceEvent::SendRtpArgs args; + args.dst_url = allArgs["dst_url"]; + args.dst_port = allArgs["dst_port"]; + args.ssrc = allArgs["ssrc"]; + args.is_udp = allArgs["is_udp"]; + args.src_port = allArgs["src_port"]; + args.pt = allArgs["pt"].empty() ? 96 : allArgs["pt"].as(); + args.use_ps = allArgs["use_ps"].empty() ? true : allArgs["use_ps"].as(); + args.only_audio = allArgs["only_audio"].empty() ? false : allArgs["only_audio"].as(); + TraceL << "pt " << int(args.pt) << " ps " << args.use_ps << " audio " << args.only_audio; + + src->startSendRtp(args, [val, headerOut, invoker](uint16_t local_port, const SockException &ex) mutable { if (ex) { val["code"] = API::OtherFailed; val["msg"] = ex.what(); } val["local_port"] = local_port; invoker(200, headerOut, val.toStyledString()); - },pt,use_ps,only_audio); + }); }); api_regist("/index/api/stopSendRtp",[](API_ARGS_MAP){ diff --git a/src/Common/MediaSource.cpp b/src/Common/MediaSource.cpp index 8c50f3b0..8b29bc91 100644 --- a/src/Common/MediaSource.cpp +++ b/src/Common/MediaSource.cpp @@ -237,13 +237,13 @@ bool MediaSource::isRecording(Recorder::type type){ return listener->isRecording(*this, type); } -void MediaSource::startSendRtp(const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, uint16_t src_port, const function &cb,uint8_t pt, bool use_ps,bool only_audio){ +void MediaSource::startSendRtp(const MediaSourceEvent::SendRtpArgs &args, const std::function cb) { auto listener = _listener.lock(); if (!listener) { cb(0, SockException(Err_other, "尚未设置事件监听器")); return; } - return listener->startSendRtp(*this, dst_url, dst_port, ssrc, is_udp, src_port, cb, pt, use_ps, only_audio); + return listener->startSendRtp(*this, args, cb); } bool MediaSource::stopSendRtp(const string &ssrc) { @@ -720,12 +720,12 @@ vector MediaSourceEventInterceptor::getMediaTracks(MediaSource &send return listener->getMediaTracks(sender, trackReady); } -void MediaSourceEventInterceptor::startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, uint16_t src_port, const function &cb, uint8_t pt, bool use_ps,bool only_audio ){ +void MediaSourceEventInterceptor::startSendRtp(MediaSource &sender, const MediaSourceEvent::SendRtpArgs &args, const std::function cb) { auto listener = _listener.lock(); if (listener) { - listener->startSendRtp(sender, dst_url, dst_port, ssrc, is_udp, src_port, cb, pt, use_ps, only_audio); + listener->startSendRtp(sender, args, cb); } else { - MediaSourceEvent::startSendRtp(sender, dst_url, dst_port, ssrc, is_udp, src_port, cb, pt, use_ps, only_audio); + MediaSourceEvent::startSendRtp(sender, args, cb); } } diff --git a/src/Common/MediaSource.h b/src/Common/MediaSource.h index 94d1c11a..f97f751d 100644 --- a/src/Common/MediaSource.h +++ b/src/Common/MediaSource.h @@ -50,7 +50,7 @@ enum class MediaOriginType : uint8_t { std::string getOriginTypeString(MediaOriginType type); class MediaSource; -class MediaSourceEvent{ +class MediaSourceEvent { public: friend class MediaSource; MediaSourceEvent(){}; @@ -85,8 +85,29 @@ public: virtual bool isRecording(MediaSource &sender, Recorder::type type) { return false; }; // 获取所有track相关信息 virtual std::vector getMediaTracks(MediaSource &sender, bool trackReady = true) const { return std::vector(); }; + + class SendRtpArgs { + public: + // 是否采用udp方式发送rtp + bool is_udp = true; + // rtp采用ps还是es方式 + bool use_ps = true; + //发送es流时指定是否只发送纯音频流 + bool only_audio = true; + // rtp payload type + uint8_t pt = 96; + // 指定rtp ssrc + std::string ssrc; + // 指定本地发送端口 + uint16_t src_port = 0; + // 发送目标端口 + uint16_t dst_port; + // 发送目标主机地址,可以是ip或域名 + std::string dst_url; + }; + // 开始发送ps-rtp - virtual void startSendRtp(MediaSource &sender, const std::string &dst_url, uint16_t dst_port, const std::string &ssrc, bool is_udp, uint16_t src_port, const std::function &cb, uint8_t pt=96, bool use_ps = true,bool only_audio = true) { cb(0, toolkit::SockException(toolkit::Err_other, "not implemented"));}; + virtual void startSendRtp(MediaSource &sender, const SendRtpArgs &args, const std::function cb) { cb(0, toolkit::SockException(toolkit::Err_other, "not implemented"));}; // 停止发送ps-rtp virtual bool stopSendRtp(MediaSource &sender, const std::string &ssrc) {return false; } @@ -117,7 +138,7 @@ public: bool setupRecord(MediaSource &sender, Recorder::type type, bool start, const std::string &custom_path, size_t max_second) override; bool isRecording(MediaSource &sender, Recorder::type type) override; std::vector getMediaTracks(MediaSource &sender, bool trackReady = true) const override; - void startSendRtp(MediaSource &sender, const std::string &dst_url, uint16_t dst_port, const std::string &ssrc, bool is_udp, uint16_t src_port, const std::function &cb, uint8_t pt=96, bool use_ps = true,bool only_audio = true) override; + void startSendRtp(MediaSource &sender, const SendRtpArgs &args, const std::function cb) override; bool stopSendRtp(MediaSource &sender, const std::string &ssrc) override; private: @@ -269,7 +290,7 @@ public: // 获取录制状态 bool isRecording(Recorder::type type); // 开始发送ps-rtp - void startSendRtp(const std::string &dst_url, uint16_t dst_port, const std::string &ssrc, bool is_udp, uint16_t src_port, const std::function &cb , uint8_t pt = 96, bool use_ps = true,bool only_audio = true); + void startSendRtp(const MediaSourceEvent::SendRtpArgs &args, const std::function cb); // 停止发送ps-rtp bool stopSendRtp(const std::string &ssrc); diff --git a/src/Common/MultiMediaSourceMuxer.cpp b/src/Common/MultiMediaSourceMuxer.cpp index 028a765d..7606fb6a 100644 --- a/src/Common/MultiMediaSourceMuxer.cpp +++ b/src/Common/MultiMediaSourceMuxer.cpp @@ -213,11 +213,11 @@ bool MultiMediaSourceMuxer::isRecording(MediaSource &sender, Recorder::type type } } -void MultiMediaSourceMuxer::startSendRtp(MediaSource &, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, uint16_t src_port, const function &cb ,uint8_t pt, bool use_ps,bool only_audio){ +void MultiMediaSourceMuxer::startSendRtp(MediaSource &, const MediaSourceEvent::SendRtpArgs &args, const std::function cb) { #if defined(ENABLE_RTPPROXY) - RtpSender::Ptr rtp_sender = std::make_shared(atoi(ssrc.data()),pt,use_ps,only_audio); + auto rtp_sender = std::make_shared(); weak_ptr weak_self = shared_from_this(); - rtp_sender->startSend(dst_url, dst_port, is_udp, src_port, [weak_self, rtp_sender, cb, ssrc](uint16_t local_port, const SockException &ex) { + rtp_sender->startSend(args, [args, weak_self, rtp_sender, cb](uint16_t local_port, const SockException &ex) { cb(local_port, ex); auto strong_self = weak_self.lock(); if (!strong_self || ex) { @@ -228,7 +228,7 @@ void MultiMediaSourceMuxer::startSendRtp(MediaSource &, const string &dst_url, u } rtp_sender->addTrackCompleted(); lock_guard lck(strong_self->_rtp_sender_mtx); - strong_self->_rtp_sender[ssrc] = rtp_sender; + strong_self->_rtp_sender[args.ssrc] = rtp_sender; }); #else cb(0, SockException(Err_other, "该功能未启用,编译时请打开ENABLE_RTPPROXY宏")); diff --git a/src/Common/MultiMediaSourceMuxer.h b/src/Common/MultiMediaSourceMuxer.h index 48588953..46dc563f 100644 --- a/src/Common/MultiMediaSourceMuxer.h +++ b/src/Common/MultiMediaSourceMuxer.h @@ -134,7 +134,7 @@ public: * @param is_udp 是否为udp * @param cb 启动成功或失败回调 */ - void startSendRtp(MediaSource &sender, const std::string &dst_url, uint16_t dst_port, const std::string &ssrc, bool is_udp, uint16_t src_port, const std::function &cb ,uint8_t pt = 96, bool use_ps = true,bool only_audio = true) override; + void startSendRtp(MediaSource &sender, const MediaSourceEvent::SendRtpArgs &args, const std::function cb) override; /** * 停止ps-rtp发送 diff --git a/src/Rtp/RtpCache.h b/src/Rtp/RtpCache.h index fa9da2ae..b393bca4 100644 --- a/src/Rtp/RtpCache.h +++ b/src/Rtp/RtpCache.h @@ -51,7 +51,7 @@ protected: class RtpCacheRaw : public RtpCache, public RawEncoderImp{ public: - RtpCacheRaw(onFlushed cb, uint32_t ssrc, uint8_t payload_type = 96,bool sendAudio = true) : RtpCache(std::move(cb)), RawEncoderImp(ssrc, payload_type,sendAudio) {}; + RtpCacheRaw(onFlushed cb, uint32_t ssrc, uint8_t payload_type = 96, bool sendAudio = true) : RtpCache(std::move(cb)), RawEncoderImp(ssrc, payload_type,sendAudio) {}; ~RtpCacheRaw() override = default; protected: diff --git a/src/Rtp/RtpSender.cpp b/src/Rtp/RtpSender.cpp index 5d93275f..71f52384 100644 --- a/src/Rtp/RtpSender.cpp +++ b/src/Rtp/RtpSender.cpp @@ -19,37 +19,28 @@ using namespace toolkit; namespace mediakit{ -RtpSender::RtpSender(uint32_t ssrc, uint8_t payload_type,bool use_ps, bool only_audio) { +void RtpSender::startSend(const MediaSourceEvent::SendRtpArgs &args, const function &cb){ + _args = args; _poller = EventPollerPool::Instance().getPoller(); - if (use_ps) { - _interface = std::make_shared( - [this](std::shared_ptr> list) { onFlushRtpList(std::move(list)); }, ssrc, payload_type); - }else{ - _interface = std::make_shared( - [this](std::shared_ptr> list) { onFlushRtpList(std::move(list)); }, ssrc, payload_type,only_audio); + auto lam = [this](std::shared_ptr> list) { onFlushRtpList(std::move(list)); }; + if (args.use_ps) { + _interface = std::make_shared(lam, atoi(args.ssrc.data()), args.pt); + } else { + _interface = std::make_shared(lam, atoi(args.ssrc.data()), args.pt, args.only_audio); } -} - -RtpSender::~RtpSender() {} - -void RtpSender::startSend(const string &dst_url, uint16_t dst_port, bool is_udp, uint16_t src_port, const function &cb){ - _is_udp = is_udp; _socket = Socket::createSocket(_poller, false); - _dst_url = dst_url; - _dst_port = dst_port; - _src_port = src_port; weak_ptr weak_self = shared_from_this(); - if (is_udp) { - _socket->bindUdpSock(src_port); + if (args.is_udp) { + _socket->bindUdpSock(args.src_port); auto poller = _poller; auto local_port = _socket->get_local_port(); - WorkThreadPool::Instance().getPoller()->async([cb, dst_url, dst_port, weak_self, poller, local_port]() { + WorkThreadPool::Instance().getPoller()->async([cb, args, weak_self, poller, local_port]() { struct sockaddr addr; //切换线程目的是为了dns解析放在后台线程执行 - if (!SockUtil::getDomainIP(dst_url.data(), dst_port, addr)) { - poller->async([dst_url, cb, local_port]() { + if (!SockUtil::getDomainIP(args.dst_url.data(), args.dst_port, addr)) { + poller->async([args, cb, local_port]() { //切回自己的线程 - cb(local_port, SockException(Err_dns, StrPrinter << "dns解析域名失败:" << dst_url)); + cb(local_port, SockException(Err_dns, StrPrinter << "dns解析域名失败:" << args.dst_url)); }); return; } @@ -66,7 +57,7 @@ void RtpSender::startSend(const string &dst_url, uint16_t dst_port, bool is_udp, }); }); } else { - _socket->connect(dst_url, dst_port, [cb, weak_self](const SockException &err) { + _socket->connect(args.dst_url, args.dst_port, [cb, weak_self](const SockException &err) { auto strong_self = weak_self.lock(); if (strong_self) { if (!err) { @@ -77,7 +68,7 @@ void RtpSender::startSend(const string &dst_url, uint16_t dst_port, bool is_udp, } else { cb(0, err); } - }, 5.0F, "0.0.0.0", src_port); + }, 5.0F, "0.0.0.0", args.src_port); } } @@ -85,7 +76,7 @@ void RtpSender::onConnect(){ _is_connect = true; //加大发送缓存,防止udp丢包之类的问题 SockUtil::setSendBuf(_socket->rawFD(), 4 * 1024 * 1024); - if (!_is_udp) { + if (!_args.is_udp) { //关闭tcp no_delay并开启MSG_MORE, 提高发送性能 SockUtil::setNoDelay(_socket->rawFD(), false); _socket->setSendFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE); @@ -99,8 +90,8 @@ void RtpSender::onConnect(){ } }); //获取本地端口,断开重连后确保端口不变 - _src_port = _socket->get_local_port(); - InfoL << "开始发送 rtp:" << _socket->get_peer_ip() << ":" << _socket->get_peer_port() << ", 是否为udp方式:" << _is_udp; + _args.src_port = _socket->get_local_port(); + InfoL << "开始发送 rtp:" << _socket->get_peer_ip() << ":" << _socket->get_peer_port() << ", 是否为udp方式:" << _args.is_udp; } bool RtpSender::addTrack(const Track::Ptr &track){ @@ -128,7 +119,7 @@ void RtpSender::onFlushRtpList(shared_ptr > rtp_list) { return; } - auto is_udp = _is_udp; + auto is_udp = _args.is_udp; auto socket = _socket; _poller->async([rtp_list, is_udp, socket]() { size_t i = 0; @@ -150,9 +141,9 @@ void RtpSender::onErr(const SockException &ex, bool is_connect) { //监听socket断开事件,方便重连 if (is_connect) { - WarnL << "重连" << _dst_url << ":" << _dst_port << "失败, 原因为:" << ex.what(); + WarnL << "重连" << _args.dst_url << ":" << _args.dst_port << "失败, 原因为:" << ex.what(); } else { - WarnL << "停止发送 rtp:" << _dst_url << ":" << _dst_port << ", 原因为:" << ex.what(); + WarnL << "停止发送 rtp:" << _args.dst_url << ":" << _args.dst_port << ", 原因为:" << ex.what(); } weak_ptr weak_self = shared_from_this(); @@ -161,7 +152,7 @@ void RtpSender::onErr(const SockException &ex, bool is_connect) { if (!strong_self) { return false; } - strong_self->startSend(strong_self->_dst_url, strong_self->_dst_port, strong_self->_is_udp, strong_self->_src_port, [weak_self](uint16_t local_port, const SockException &ex){ + strong_self->startSend(strong_self->_args, [weak_self](uint16_t local_port, const SockException &ex){ auto strong_self = weak_self.lock(); if (strong_self && ex) { //连接失败且本对象未销毁,那么重试连接 diff --git a/src/Rtp/RtpSender.h b/src/Rtp/RtpSender.h index 3eee7d69..aba98dd2 100644 --- a/src/Rtp/RtpSender.h +++ b/src/Rtp/RtpSender.h @@ -21,25 +21,15 @@ class RtpSender : public MediaSinkInterface, public std::enable_shared_from_this public: typedef std::shared_ptr Ptr; - ~RtpSender() override; - - /** - * 构造函数,创建GB28181 RTP发送客户端 - * @param ssrc rtp的ssrc - * @param payload_type 国标中ps-rtp的pt一般为96 - * @param use_ps 是否打包为PS然后发送 - * @param only_audio use_ps 为false 时有效,指定发送音频还是视频 - */ - RtpSender(uint32_t ssrc, uint8_t payload_type = 96,bool use_ps = true,bool only_audio = true); + RtpSender() = default; + ~RtpSender() override = default; /** * 开始发送ps-rtp包 - * @param dst_url 目标ip或域名 - * @param dst_port 目标端口 - * @param is_udp 是否采用udp方式发送rtp + * @param args 发送参数 * @param cb 连接目标端口是否成功的回调 */ - void startSend(const std::string &dst_url, uint16_t dst_port, bool is_udp, uint16_t src_port, const std::function &cb); + void startSend(const MediaSourceEvent::SendRtpArgs &args, const std::function &cb); /** * 输入帧数据 @@ -72,11 +62,8 @@ private: void onErr(const toolkit::SockException &ex, bool is_connect = false); private: - bool _is_udp; bool _is_connect = false; - std::string _dst_url; - uint16_t _dst_port; - uint16_t _src_port; + MediaSourceEvent::SendRtpArgs _args; toolkit::Socket::Ptr _socket; toolkit::EventPoller::Ptr _poller; toolkit::Timer::Ptr _connect_timer;