diff --git a/postman/ZLMediaKit.postman_collection.json b/postman/ZLMediaKit.postman_collection.json index 498e9386..b20e2b4d 100644 --- a/postman/ZLMediaKit.postman_collection.json +++ b/postman/ZLMediaKit.postman_collection.json @@ -1048,6 +1048,108 @@ } }, "response": [] + }, + { + "name": "开始发送rtp(startSendRtp)", + "request": { + "method": "GET", + "header": [], + "url": { + "raw": "{{ZLMediaKit_URL}}/index/api/startSendRtp?secret={{ZLMediaKit_secret}}&vhost={{defaultVhost}}&app=live&stream=obs&ssrc=1&dst_url=127.0.0.1&dst_port=10000&is_udp=0", + "host": [ + "{{ZLMediaKit_URL}}" + ], + "path": [ + "index", + "api", + "startSendRtp" + ], + "query": [ + { + "key": "secret", + "value": "{{ZLMediaKit_secret}}", + "description": "api操作密钥(配置文件配置),如果操作ip是127.0.0.1,则不需要此参数" + }, + { + "key": "vhost", + "value": "{{defaultVhost}}", + "description": "虚拟主机,例如__defaultVhost__" + }, + { + "key": "app", + "value": "live", + "description": "应用名,例如 live" + }, + { + "key": "stream", + "value": "obs", + "description": "流id,例如 obs" + }, + { + "key": "ssrc", + "value": "1", + "description": "rtp的ssrc" + }, + { + "key": "dst_url", + "value": "127.0.0.1", + "description": "目标ip或域名" + }, + { + "key": "dst_port", + "value": "10000", + "description": "目标端口" + }, + { + "key": "is_udp", + "value": "0", + "description": "是否为udp模式,否则为tcp模式" + } + ] + } + }, + "response": [] + }, + { + "name": "停止 发送rtp(stopSendRtp)", + "request": { + "method": "GET", + "header": [], + "url": { + "raw": "{{ZLMediaKit_URL}}/index/api/stopSendRtp?secret={{ZLMediaKit_secret}}&vhost={{defaultVhost}}&app=live&stream=obs", + "host": [ + "{{ZLMediaKit_URL}}" + ], + "path": [ + "index", + "api", + "stopSendRtp" + ], + "query": [ + { + "key": "secret", + "value": "{{ZLMediaKit_secret}}", + "description": "api操作密钥(配置文件配置),如果操作ip是127.0.0.1,则不需要此参数" + }, + { + "key": "vhost", + "value": "{{defaultVhost}}", + "description": "虚拟主机,例如__defaultVhost__" + }, + { + "key": "app", + "value": "live", + "description": "应用名,例如 live" + }, + { + "key": "stream", + "value": "obs", + "description": "流id,例如 obs" + } + ] + } + }, + "response": [] } ], "event": [ @@ -1074,17 +1176,17 @@ ], "variable": [ { - "id": "0e272976-965b-4f25-8b9e-5916c59234d7", + "id": "ce426571-eb1e-4067-8901-01978c982fed", "key": "ZLMediaKit_URL", "value": "zlmediakit.com:8880" }, { - "id": "321374c3-3357-4405-915e-9cb524d95fbc", + "id": "2d3dfd4a-a39c-47d8-a3e9-37d80352ea5f", "key": "ZLMediaKit_secret", "value": "035c73f7-bb6b-4889-a715-d9eb2d1925cc" }, { - "id": "468ce1f6-ec79-44d2-819e-5cb9f42cd396", + "id": "0aacc473-3a2e-4ef9-b415-e86ce71e0c42", "key": "defaultVhost", "value": "__defaultVhost__" } diff --git a/server/WebApi.cpp b/server/WebApi.cpp index d61547fd..3e2f3f90 100644 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -810,6 +810,45 @@ void installWebApi() { } }); + static auto getMediaSource = [](const string &vhost, const string &app, const string &stream_id){ + auto src = MediaSource::find(RTMP_SCHEMA, vhost, app, stream_id); + if(src){ + return src; + } + return MediaSource::find(RTSP_SCHEMA, vhost, app, stream_id); + }; + + api_regist2("/index/api/startSendRtp",[](API_ARGS2){ + CHECK_SECRET(); + CHECK_ARGS("vhost", "app", "stream", "ssrc", "dst_url", "dst_port", "is_udp"); + + auto src = getMediaSource(allArgs["vhost"], allArgs["app"], allArgs["stream"]); + if (!src) { + throw ApiRetException("该媒体流不存在", API::OtherFailed); + } + + src->startSendRtp(allArgs["dst_url"], allArgs["dst_port"], allArgs["ssrc"], allArgs["is_udp"], [val, headerOut, invoker](const SockException &ex){ + if (ex) { + const_cast(val)["code"] = API::OtherFailed; + const_cast(val)["msg"] = ex.what(); + } + invoker("200 OK", headerOut, val.toStyledString()); + }); + }); + + api_regist1("/index/api/stopSendRtp",[](API_ARGS1){ + CHECK_SECRET(); + CHECK_ARGS("vhost", "app", "stream"); + + auto src = getMediaSource(allArgs["vhost"], allArgs["app"], allArgs["stream"]); + if (!src) { + throw ApiRetException("该媒体流不存在", API::OtherFailed); + } + + val["result"] = src->stopSendRtp(); + }); + + #endif//ENABLE_RTPPROXY // 开始录制hls或MP4 diff --git a/src/Common/MediaSource.cpp b/src/Common/MediaSource.cpp index eae1999a..f0eb493d 100644 --- a/src/Common/MediaSource.cpp +++ b/src/Common/MediaSource.cpp @@ -120,6 +120,23 @@ bool MediaSource::isRecording(Recorder::type type){ return listener->isRecording(*this, type); } +void MediaSource::startSendRtp(const string &dst_url, uint16_t dst_port, uint32_t ssrc, bool is_udp, const function &cb){ + auto listener = _listener.lock(); + if (!listener) { + cb(SockException(Err_other, "尚未设置事件监听器")); + return; + } + return listener->startSendRtp(*this, dst_url, dst_port, ssrc, is_udp, cb); +} + +bool MediaSource::stopSendRtp() { + auto listener = _listener.lock(); + if (!listener) { + return false; + } + return listener->stopSendRtp(*this); +} + void MediaSource::for_each_media(const function &cb) { decltype(s_media_source_map) copy; { @@ -553,6 +570,23 @@ vector MediaSourceEventInterceptor::getTracks(MediaSource &sender, b return listener->getTracks(sender, trackReady); } +void MediaSourceEventInterceptor::startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, uint32_t ssrc, bool is_udp, const function &cb){ + auto listener = _listener.lock(); + if (listener) { + listener->startSendRtp(sender, dst_url, dst_port, ssrc, is_udp, cb); + } else { + MediaSourceEvent::startSendRtp(sender, dst_url, dst_port, ssrc, is_udp, cb); + } +} + +bool MediaSourceEventInterceptor::stopSendRtp(MediaSource &sender){ + auto listener = _listener.lock(); + if (listener) { + return listener->stopSendRtp(sender); + } + return false; +} + /////////////////////////////////////FlushPolicy////////////////////////////////////// static bool isFlushAble_default(bool is_video, uint32_t last_stamp, uint32_t new_stamp, int cache_size) { diff --git a/src/Common/MediaSource.h b/src/Common/MediaSource.h index 2d7d2ddd..6da20cb5 100644 --- a/src/Common/MediaSource.h +++ b/src/Common/MediaSource.h @@ -61,6 +61,10 @@ public: virtual bool isRecording(MediaSource &sender, Recorder::type type) { return false; }; // 获取所有track相关信息 virtual vector getTracks(MediaSource &sender, bool trackReady = true) const { return vector(); }; + // 开始发送ps-rtp + virtual void startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, uint32_t ssrc, bool is_udp, const function &cb) { cb(SockException(Err_other, "not implemented"));}; + // 停止发送ps-rtp + virtual bool stopSendRtp(MediaSource &sender) {return false; } private: Timer::Ptr _async_close_timer; @@ -80,6 +84,8 @@ public: bool setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path) override; bool isRecording(MediaSource &sender, Recorder::type type) override; vector getTracks(MediaSource &sender, bool trackReady = true) const override; + void startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, uint32_t ssrc, bool is_udp, const function &cb) override; + bool stopSendRtp(MediaSource &sender) override; protected: std::weak_ptr _listener; @@ -160,6 +166,10 @@ public: bool setupRecord(Recorder::type type, bool start, const string &custom_path); // 获取录制状态 bool isRecording(Recorder::type type); + // 开始发送ps-rtp + void startSendRtp(const string &dst_url, uint16_t dst_port, uint32_t ssrc, bool is_udp, const function &cb); + // 停止发送ps-rtp + bool stopSendRtp(); ////////////////static方法,查找或生成MediaSource//////////////// diff --git a/src/Common/MultiMediaSourceMuxer.cpp b/src/Common/MultiMediaSourceMuxer.cpp index 9138b61a..94bd49dd 100644 --- a/src/Common/MultiMediaSourceMuxer.cpp +++ b/src/Common/MultiMediaSourceMuxer.cpp @@ -255,6 +255,37 @@ bool MultiMediaSourceMuxer::isRecording(MediaSource &sender, Recorder::type type return _muxer->isRecording(sender,type); } +void MultiMediaSourceMuxer::startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, uint32_t ssrc, bool is_udp, const function &cb){ +#if defined(ENABLE_RTPPROXY) + auto ps_rtp_sender = std::make_shared(ssrc); + weak_ptr weak_self = shared_from_this(); + ps_rtp_sender->startSend(dst_url, dst_port, is_udp, [weak_self, ps_rtp_sender, cb](const SockException &ex) { + cb(ex); + auto strong_self = weak_self.lock(); + if (!strong_self || ex) { + return; + } + for (auto &track : strong_self->_muxer->getTracks(false)) { + ps_rtp_sender->addTrack(track); + } + ps_rtp_sender->addTrackCompleted(); + strong_self->_ps_rtp_sender = ps_rtp_sender; + }); +#else + cb(SockException(Err_other, "该功能未启用,编译时请打开ENABLE_RTPPROXY宏")); +#endif//ENABLE_RTPPROXY +} + +bool MultiMediaSourceMuxer::stopSendRtp(MediaSource &sender){ +#if defined(ENABLE_RTPPROXY) + if (_ps_rtp_sender) { + _ps_rtp_sender = nullptr; + return true; + } +#endif//ENABLE_RTPPROXY + return false; +} + void MultiMediaSourceMuxer::addTrack(const Track::Ptr &track) { _muxer->addTrack(track); } @@ -327,21 +358,26 @@ private: Frame::Ptr _frame; }; -void MultiMediaSourceMuxer::inputFrame(const Frame::Ptr &frame) { +void MultiMediaSourceMuxer::inputFrame(const Frame::Ptr &frame_in) { GET_CONFIG(bool, modify_stamp, General::kModifyStamp); - if (!modify_stamp) { - //未开启时间戳覆盖 - _muxer->inputFrame(frame); - } else { + auto frame = frame_in; + if (modify_stamp) { //开启了时间戳覆盖 - FrameModifyStamp::Ptr new_frame = std::make_shared(frame, _stamp[frame->getTrackType()]); - //输入时间戳覆盖后的帧 - _muxer->inputFrame(new_frame); + frame = std::make_shared(frame, _stamp[frame->getTrackType()]); } + _muxer->inputFrame(frame); + +#if defined(ENABLE_RTPPROXY) + auto ps_rtp_sender = _ps_rtp_sender; + if (ps_rtp_sender) { + ps_rtp_sender->inputFrame(frame); + } +#endif //ENABLE_RTPPROXY + } bool MultiMediaSourceMuxer::isEnabled(){ - return _muxer->isEnabled(); + return _muxer->isEnabled() || _ps_rtp_sender; } diff --git a/src/Common/MultiMediaSourceMuxer.h b/src/Common/MultiMediaSourceMuxer.h index 94166c7e..358f9b1b 100644 --- a/src/Common/MultiMediaSourceMuxer.h +++ b/src/Common/MultiMediaSourceMuxer.h @@ -15,6 +15,8 @@ #include "Record/Recorder.h" #include "Record/HlsMediaSource.h" #include "Record/HlsRecorder.h" +#include "Rtp/PSRtpSender.h" + namespace mediakit{ class MultiMuxerPrivate : public MediaSink, public std::enable_shared_from_this{ @@ -126,6 +128,22 @@ public: */ bool isRecording(MediaSource &sender, Recorder::type type) override; + /** + * 开始发送ps-rtp流 + * @param dst_url 目标ip或域名 + * @param dst_port 目标端口 + * @param ssrc rtp的ssrc + * @param is_udp 是否为udp + * @param cb 启动成功或失败回调 + */ + void startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, uint32_t ssrc, bool is_udp, const function &cb) override; + + /** + * 停止ps-rtp发送 + * @return 是否成功 + */ + bool stopSendRtp(MediaSource &sender) override; + /////////////////////////////////MediaSinkInterface override///////////////////////////////// /** @@ -133,7 +151,7 @@ public: * 只会克隆sps pps这些信息 ,而不会克隆Delegate相关关系 * @param track 添加音频或视频轨道 */ - void addTrack(const Track::Ptr & track) override; + void addTrack(const Track::Ptr &track) override; /** * 添加track完毕 @@ -162,6 +180,9 @@ private: Stamp _stamp[2]; MultiMuxerPrivate::Ptr _muxer; std::weak_ptr _track_listener; +#if defined(ENABLE_RTPPROXY) + PSRtpSender::Ptr _ps_rtp_sender; +#endif //ENABLE_RTPPROXY }; }//namespace mediakit diff --git a/src/Rtp/PSEncoder.cpp b/src/Rtp/PSEncoder.cpp new file mode 100644 index 00000000..4663dded --- /dev/null +++ b/src/Rtp/PSEncoder.cpp @@ -0,0 +1,170 @@ +/* + * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). + * + * Use of this source code is governed by MIT license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#if defined(ENABLE_RTPPROXY) +#include "PSEncoder.h" +#include "Extension/H264.h" +namespace mediakit{ + +PSEncoder::PSEncoder() { + _buffer = std::make_shared(); + init(); +} + +PSEncoder::~PSEncoder() { + +} + +void PSEncoder::init() { + static struct ps_muxer_func_t func = { + /*alloc*/ + [](void *param, size_t bytes) { + PSEncoder *thiz = (PSEncoder *) param; + thiz->_buffer->setCapacity(bytes + 1); + return (void *) thiz->_buffer->data(); + }, + /*free*/ + [](void *param, void *packet) { + //什么也不做 + }, + /*wtite*/ + [](void *param, int stream, void *packet, size_t bytes) { + PSEncoder *thiz = (PSEncoder *) param; + thiz->onPS(thiz->_timestamp, packet, bytes); + } + }; + + _muxer.reset(ps_muxer_create(&func, this), [](struct ps_muxer_t *ptr) { + ps_muxer_destroy(ptr); + }); +} + +void PSEncoder::addTrack(const Track::Ptr &track) { + switch (track->getCodecId()) { + case CodecH264: { + _codec_to_trackid[track->getCodecId()].track_id = ps_muxer_add_stream(_muxer.get(), STREAM_VIDEO_H264, nullptr, 0); + break; + } + + case CodecH265: { + _codec_to_trackid[track->getCodecId()].track_id = ps_muxer_add_stream(_muxer.get(), STREAM_VIDEO_H265, nullptr, 0); + break; + } + + case CodecAAC: { + _codec_to_trackid[track->getCodecId()].track_id = ps_muxer_add_stream(_muxer.get(), STREAM_AUDIO_AAC, nullptr, 0); + break; + } + + case CodecG711A: { + _codec_to_trackid[track->getCodecId()].track_id = ps_muxer_add_stream(_muxer.get(), STREAM_AUDIO_G711A, nullptr, 0); + break; + } + + case CodecG711U: { + _codec_to_trackid[track->getCodecId()].track_id = ps_muxer_add_stream(_muxer.get(), STREAM_AUDIO_G711U, nullptr, 0); + break; + } + + case CodecOpus: { + _codec_to_trackid[track->getCodecId()].track_id = ps_muxer_add_stream(_muxer.get(), STREAM_AUDIO_OPUS, nullptr, 0); + break; + } + + default: WarnL << "mpeg-ps 不支持该编码格式,已忽略:" << track->getCodecName(); break; + } + //尝试音视频同步 + stampSync(); +} + +void PSEncoder::stampSync(){ + if(_codec_to_trackid.size() < 2){ + return; + } + + Stamp *audio = nullptr, *video = nullptr; + for(auto &pr : _codec_to_trackid){ + switch (getTrackType((CodecId) pr.first)){ + case TrackAudio : audio = &pr.second.stamp; break; + case TrackVideo : video = &pr.second.stamp; break; + default : break; + } + } + + if(audio && video){ + //音频时间戳同步于视频,因为音频时间戳被修改后不影响播放 + audio->syncTo(*video); + } +} + +void PSEncoder::resetTracks() { + init(); +} + +void PSEncoder::inputFrame(const Frame::Ptr &frame) { + auto it = _codec_to_trackid.find(frame->getCodecId()); + if (it == _codec_to_trackid.end()) { + return; + } + auto &track_info = it->second; + int64_t dts_out, pts_out; + switch (frame->getCodecId()) { + case CodecH264: { + int type = H264_TYPE(*((uint8_t *) frame->data() + frame->prefixSize())); + if (type == H264Frame::NAL_SEI) { + break; + } + } + case CodecH265: { + //这里的代码逻辑是让SPS、PPS、IDR这些时间戳相同的帧打包到一起当做一个帧处理, + if (!_frameCached.empty() && _frameCached.back()->dts() != frame->dts()) { + Frame::Ptr back = _frameCached.back(); + Buffer::Ptr merged_frame = back; + if (_frameCached.size() != 1) { + string merged; + _frameCached.for_each([&](const Frame::Ptr &frame) { + if (frame->prefixSize()) { + merged.append(frame->data(), frame->size()); + } else { + merged.append("\x00\x00\x00\x01", 4); + merged.append(frame->data(), frame->size()); + } + }); + merged_frame = std::make_shared(std::move(merged)); + } + track_info.stamp.revise(back->dts(), back->pts(), dts_out, pts_out); + _timestamp = dts_out; + ps_muxer_input(_muxer.get(), track_info.track_id, back->keyFrame() ? 0x0001 : 0, pts_out * 90LL, + dts_out * 90LL, merged_frame->data(), merged_frame->size()); + _frameCached.clear(); + } + _frameCached.emplace_back(Frame::getCacheAbleFrame(frame)); + } + break; + + case CodecAAC: { + if (frame->prefixSize() == 0) { + WarnL << "必须提供adts头才能mpeg-ps打包"; + break; + } + } + + default: { + track_info.stamp.revise(frame->dts(), frame->pts(), dts_out, pts_out); + _timestamp = dts_out; + ps_muxer_input(_muxer.get(), track_info.track_id, frame->keyFrame() ? 0x0001 : 0, pts_out * 90LL, + dts_out * 90LL, frame->data(), frame->size()); + } + break; + } +} + +}//namespace mediakit +#endif//defined(ENABLE_RTPPROXY) diff --git a/src/Rtp/PSEncoder.h b/src/Rtp/PSEncoder.h new file mode 100644 index 00000000..3f258ea1 --- /dev/null +++ b/src/Rtp/PSEncoder.h @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). + * + * Use of this source code is governed by MIT license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef ZLMEDIAKIT_PSENCODER_H +#define ZLMEDIAKIT_PSENCODER_H +#if defined(ENABLE_RTPPROXY) +#include "mpeg-ps.h" +#include "Common/MediaSink.h" +#include "Common/Stamp.h" +namespace mediakit{ + +//该类实现mpeg-ps容器格式的打包 +class PSEncoder : public MediaSinkInterface { +public: + PSEncoder(); + ~PSEncoder() override; + + /** + * 添加音视频轨道 + */ + void addTrack(const Track::Ptr &track) override; + + /** + * 重置音视频轨道 + */ + void resetTracks() override; + + /** + * 输入帧数据 + */ + void inputFrame(const Frame::Ptr &frame) override; + +protected: + /** + * 输出mpeg-ps的回调函数 + * @param stamp 时间戳,毫秒 + * @param packet 数据指针 + * @param bytes 数据长度 + */ + virtual void onPS(uint32_t stamp, void *packet, size_t bytes) = 0; + +private: + void init(); + //音视频时间戳同步用 + void stampSync(); + +private: + struct track_info { + int track_id = -1; + Stamp stamp; + }; + +private: + uint32_t _timestamp = 0; + BufferRaw::Ptr _buffer; + List _frameCached; + std::shared_ptr _muxer; + unordered_map _codec_to_trackid; +}; + +}//namespace mediakit +#endif //ENABLE_RTPPROXY +#endif //ZLMEDIAKIT_PSENCODER_H diff --git a/src/Rtp/PSRtpSender.cpp b/src/Rtp/PSRtpSender.cpp new file mode 100644 index 00000000..64e3a0cb --- /dev/null +++ b/src/Rtp/PSRtpSender.cpp @@ -0,0 +1,142 @@ +/* + * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). + * + * Use of this source code is governed by MIT license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#include "PSRtpSender.h" +#include "Rtsp/RtspSession.h" +#include "Thread/WorkThreadPool.h" + +namespace mediakit{ + +PSRtpSender::PSRtpSender(uint32_t ssrc, uint8_t payload_type) { + GET_CONFIG(uint32_t,video_mtu,Rtp::kVideoMtuSize); + _rtp_encoder = std::make_shared(CodecInvalid, ssrc, video_mtu, 90000, payload_type, 0); + _rtp_encoder->setRtpRing(std::make_shared()); + _rtp_encoder->getRtpRing()->setDelegate(std::make_shared([this](const RtpPacket::Ptr &rtp, bool is_key){ + onRtp(rtp, is_key); + })); + _poller = EventPollerPool::Instance().getPoller(); + InfoL << this << " " << printSSRC(_rtp_encoder->getSsrc()); +} + +PSRtpSender::~PSRtpSender() { + InfoL << this << " " << printSSRC(_rtp_encoder->getSsrc()); +} + +void PSRtpSender::onPS(uint32_t stamp, void *packet, size_t bytes) { + _rtp_encoder->inputFrame(std::make_shared((char *) packet, bytes, stamp)); +} + +void PSRtpSender::startSend(const string &dst_url, uint16_t dst_port, bool is_udp, const function &cb){ + _is_udp = is_udp; + _socket = std::make_shared(); + _dst_url = dst_url; + _dst_port = dst_port; + weak_ptr weak_self = shared_from_this(); + if (is_udp) { + _socket->bindUdpSock(0); + WorkThreadPool::Instance().getPoller()->async([cb, dst_url, dst_port, weak_self]() { + //切换线程目的是为了dns解析放在后台线程执行 + struct sockaddr addr; + if (!SockUtil::getDomainIP(dst_url.data(), dst_port, addr)) { + cb(SockException(Err_dns, StrPrinter << "dns解析域名失败:" << dst_url)); + return; + } + cb(SockException()); + auto strong_self = weak_self.lock(); + if (strong_self) { + //dns解析成功 + strong_self->_socket->setSendPeerAddr(&addr); + strong_self->onConnect(); + } + }); + } else { + _socket->connect(dst_url, dst_port, [cb, weak_self](const SockException &err) { + cb(err); + auto strong_self = weak_self.lock(); + if (strong_self && !err) { + //tcp连接成功 + strong_self->onConnect(); + } + }); + } +} + +void PSRtpSender::onConnect(){ + _is_connect = true; + //加大发送缓存,防止udp丢包之类的问题 + SockUtil::setSendBuf(_socket->rawFD(), 4 * 1024 * 1024); + if(!_is_udp){ + //关闭tcp no_delay并开启MSG_MORE, 提高发送性能 + SockUtil::setNoDelay(_socket->rawFD(), false); + _socket->setSendFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE); + } + //连接建立成功事件 + weak_ptr weak_self = shared_from_this(); + _socket->setOnErr([weak_self](const SockException &err) { + auto strong_self = weak_self.lock(); + if (strong_self) { + strong_self->onErr(err); + } + }); + InfoL << "开始发送 ps rtp:" << _socket->get_peer_ip() << ":" << _socket->get_peer_port() << ", 是否为udp方式:" << _is_udp; +} + +void PSRtpSender::onRtp(const RtpPacket::Ptr &rtp, bool) { + if(!_is_connect){ + return; + } + + //开启合并写提高发送性能 + PacketCache::inputPacket(true, rtp, false); +} + +void PSRtpSender::onFlush(shared_ptr> &rtp_list, bool key_pos) { + int i = 0; + int size = rtp_list->size(); + rtp_list->for_each([&](const RtpPacket::Ptr &packet) { + if (_is_udp) { + //udp模式,rtp over tcp前4个字节可以忽略 + _socket->send(std::make_shared(packet, 4), nullptr, 0, ++i == size); + } else { + //tcp模式, rtp over tcp前2个字节可以忽略,只保留后续rtp长度的2个字节 + _socket->send(std::make_shared(packet, 2), nullptr, 0, ++i == size); + } + }); +} + +void PSRtpSender::onErr(const SockException &ex, bool is_connect) { + _is_connect = false; + + //监听socket断开事件,方便重连 + if (is_connect) { + WarnL << "重连" << _dst_url << ":" << _dst_port << "失败, 原因为:" << ex.what(); + } else { + WarnL << "停止发送 ps rtp:" << _dst_url << ":" << _dst_port << ", 原因为:" << ex.what(); + } + + weak_ptr weak_self = shared_from_this(); + _connect_timer = std::make_shared(10.0, [weak_self]() { + auto strong_self = weak_self.lock(); + if (!strong_self) { + return false; + } + strong_self->startSend(strong_self->_dst_url, strong_self->_dst_port, strong_self->_is_udp, [weak_self](const SockException &ex){ + auto strong_self = weak_self.lock(); + if (strong_self && ex) { + //连接失败且本对象未销毁,那么重试连接 + strong_self->onErr(ex, true); + } + }); + return false; + }, _poller); +} + +}//namespace mediakit + diff --git a/src/Rtp/PSRtpSender.h b/src/Rtp/PSRtpSender.h new file mode 100644 index 00000000..cb31230b --- /dev/null +++ b/src/Rtp/PSRtpSender.h @@ -0,0 +1,90 @@ +/* + * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). + * + * Use of this source code is governed by MIT license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef ZLMEDIAKIT_PSRTPSENDER_H +#define ZLMEDIAKIT_PSRTPSENDER_H +#if defined(ENABLE_RTPPROXY) +#include "PSEncoder.h" +#include "Extension/CommonRtp.h" + +namespace mediakit{ + +class RingDelegateHelper : public RingDelegate { +public: + typedef function onRtp; + + ~RingDelegateHelper() override{} + RingDelegateHelper(onRtp on_rtp){ + _on_rtp = std::move(on_rtp); + } + void onWrite(const RtpPacket::Ptr &in, bool is_key) override{ + _on_rtp(in, is_key); + } + +private: + onRtp _on_rtp; +}; + +//该类在PSEncoder的基础上,实现了mpeg-ps的rtp打包以及发送 +class PSRtpSender : public PSEncoder, public std::enable_shared_from_this, public PacketCache{ +public: + typedef std::shared_ptr Ptr; + + /** + * 构造函数 + * @param ssrc rtp的ssrc + * @param payload_type 国标中ps-rtp的pt一般为96 + */ + PSRtpSender(uint32_t ssrc, uint8_t payload_type = 96); + ~PSRtpSender() override; + + /** + * 开始发送ps-rtp包 + * @param dst_url 目标ip或域名 + * @param dst_port 目标端口 + * @param is_udp 是否采用udp方式发送rtp + * @param cb 连接目标端口是否成功的回调 + */ + void startSend(const string &dst_url, uint16_t dst_port, bool is_udp, const function &cb); + +protected: + //mpeg-ps回调 + void onPS(uint32_t stamp, void *packet, size_t bytes) override; + + /** + * 批量flush rtp包时触发该函数 + * @param rtp_list rtp包列表 + * @param key_pos 是否包含关键帧 + */ + void onFlush(std::shared_ptr > &rtp_list, bool key_pos) override; + + +private: + //rtp打包后回调 + void onRtp(const RtpPacket::Ptr &in, bool is_key); + //udp/tcp连接成功回调 + void onConnect(); + //异常断开socket事件 + void onErr(const SockException &ex, bool is_connect = false); + +private: + bool _is_udp; + bool _is_connect = false; + string _dst_url; + uint16_t _dst_port; + Socket::Ptr _socket; + EventPoller::Ptr _poller; + Timer::Ptr _connect_timer; + std::shared_ptr _rtp_encoder; +}; + +}//namespace mediakit +#endif// defined(ENABLE_RTPPROXY) +#endif //ZLMEDIAKIT_PSRTPSENDER_H