From ab9a605a66cda08b87b843e8bff7edeb880e5e40 Mon Sep 17 00:00:00 2001 From: xiongziliang <771730766@qq.com> Date: Sat, 24 Oct 2020 23:33:13 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81=E7=BB=93?= =?UTF-8?q?=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Common/MediaSource.cpp | 4 +- src/Common/MediaSource.h | 6 +- src/Common/MultiMediaSourceMuxer.cpp | 24 ++-- src/Common/MultiMediaSourceMuxer.h | 6 +- src/Rtp/GB28181Process.cpp | 116 ++++++++++++++++ src/Rtp/GB28181Process.h | 56 ++++++++ src/Rtp/PSEncoder.cpp | 38 ++++++ src/Rtp/PSEncoder.h | 17 +++ src/Rtp/ProcessInterface.h | 37 +++++ src/Rtp/RtpCache.cpp | 33 +++++ src/Rtp/RtpCache.h | 49 +++++++ src/Rtp/RtpProcess.cpp | 152 +++++++-------------- src/Rtp/RtpProcess.h | 41 ++---- src/Rtp/RtpSelector.cpp | 12 +- src/Rtp/{PSRtpSender.cpp => RtpSender.cpp} | 69 +++++----- src/Rtp/{PSRtpSender.h => RtpSender.h} | 59 ++++---- src/Rtp/RtpServer.cpp | 2 +- src/Rtp/RtpSession.cpp | 2 +- src/Rtp/RtpSession.h | 7 +- 19 files changed, 500 insertions(+), 230 deletions(-) create mode 100644 src/Rtp/GB28181Process.cpp create mode 100644 src/Rtp/GB28181Process.h create mode 100644 src/Rtp/ProcessInterface.h create mode 100644 src/Rtp/RtpCache.cpp create mode 100644 src/Rtp/RtpCache.h rename src/Rtp/{PSRtpSender.cpp => RtpSender.cpp} (66%) rename src/Rtp/{PSRtpSender.h => RtpSender.h} (53%) diff --git a/src/Common/MediaSource.cpp b/src/Common/MediaSource.cpp index da4f750d..2d6b04ab 100644 --- a/src/Common/MediaSource.cpp +++ b/src/Common/MediaSource.cpp @@ -179,7 +179,7 @@ bool MediaSource::isRecording(Recorder::type type){ return listener->isRecording(*this, type); } -void MediaSource::startSendRtp(const string &dst_url, uint16_t dst_port, uint32_t ssrc, bool is_udp, const function &cb){ +void MediaSource::startSendRtp(const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, const function &cb){ auto listener = _listener.lock(); if (!listener) { cb(SockException(Err_other, "尚未设置事件监听器")); @@ -638,7 +638,7 @@ vector MediaSourceEventInterceptor::getTracks(MediaSource &sender, b return listener->getTracks(sender, trackReady); } -void MediaSourceEventInterceptor::startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, uint32_t ssrc, bool is_udp, const function &cb){ +void MediaSourceEventInterceptor::startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, const function &cb){ auto listener = _listener.lock(); if (listener) { listener->startSendRtp(sender, dst_url, dst_port, ssrc, is_udp, cb); diff --git a/src/Common/MediaSource.h b/src/Common/MediaSource.h index f2043658..5df31f50 100644 --- a/src/Common/MediaSource.h +++ b/src/Common/MediaSource.h @@ -83,7 +83,7 @@ public: // 获取所有track相关信息 virtual vector getTracks(MediaSource &sender, bool trackReady = true) const { return vector(); }; // 开始发送ps-rtp - virtual void startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, uint32_t ssrc, bool is_udp, const function &cb) { cb(SockException(Err_other, "not implemented"));}; + virtual void startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, const function &cb) { cb(SockException(Err_other, "not implemented"));}; // 停止发送ps-rtp virtual bool stopSendRtp(MediaSource &sender) {return false; } @@ -112,7 +112,7 @@ public: bool setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path) override; bool isRecording(MediaSource &sender, Recorder::type type) override; vector getTracks(MediaSource &sender, bool trackReady = true) const override; - void startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, uint32_t ssrc, bool is_udp, const function &cb) override; + void startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, const function &cb) override; bool stopSendRtp(MediaSource &sender) override; private: @@ -256,7 +256,7 @@ public: // 获取录制状态 bool isRecording(Recorder::type type); // 开始发送ps-rtp - void startSendRtp(const string &dst_url, uint16_t dst_port, uint32_t ssrc, bool is_udp, const function &cb); + void startSendRtp(const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, const function &cb); // 停止发送ps-rtp bool stopSendRtp(); diff --git a/src/Common/MultiMediaSourceMuxer.cpp b/src/Common/MultiMediaSourceMuxer.cpp index e7f31bdb..5774f45c 100644 --- a/src/Common/MultiMediaSourceMuxer.cpp +++ b/src/Common/MultiMediaSourceMuxer.cpp @@ -328,21 +328,21 @@ bool MultiMediaSourceMuxer::isRecording(MediaSource &sender, Recorder::type type return _muxer->isRecording(sender,type); } -void MultiMediaSourceMuxer::startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, uint32_t ssrc, bool is_udp, const function &cb){ +void MultiMediaSourceMuxer::startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, const function &cb){ #if defined(ENABLE_RTPPROXY) - auto ps_rtp_sender = std::make_shared(ssrc); + RtpSender::Ptr rtp_sender = std::make_shared(atoi(ssrc.data())); weak_ptr weak_self = shared_from_this(); - ps_rtp_sender->startSend(dst_url, dst_port, is_udp, [weak_self, ps_rtp_sender, cb](const SockException &ex) { + rtp_sender->startSend(dst_url, dst_port, is_udp, [weak_self, rtp_sender, cb](const SockException &ex) { cb(ex); auto strong_self = weak_self.lock(); if (!strong_self || ex) { return; } for (auto &track : strong_self->_muxer->getTracks(false)) { - ps_rtp_sender->addTrack(track); + rtp_sender->addTrack(track); } - ps_rtp_sender->addTrackCompleted(); - strong_self->_ps_rtp_sender = ps_rtp_sender; + rtp_sender->addTrackCompleted(); + strong_self->_rtp_sender = rtp_sender; }); #else cb(SockException(Err_other, "该功能未启用,编译时请打开ENABLE_RTPPROXY宏")); @@ -351,8 +351,8 @@ void MultiMediaSourceMuxer::startSendRtp(MediaSource &sender, const string &dst_ bool MultiMediaSourceMuxer::stopSendRtp(MediaSource &sender){ #if defined(ENABLE_RTPPROXY) - if (_ps_rtp_sender) { - _ps_rtp_sender = nullptr; + if (_rtp_sender) { + _rtp_sender = nullptr; return true; } #endif//ENABLE_RTPPROXY @@ -441,9 +441,9 @@ void MultiMediaSourceMuxer::inputFrame(const Frame::Ptr &frame_in) { _muxer->inputFrame(frame); #if defined(ENABLE_RTPPROXY) - auto ps_rtp_sender = _ps_rtp_sender; - if (ps_rtp_sender) { - ps_rtp_sender->inputFrame(frame); + auto rtp_sender = _rtp_sender; + if (rtp_sender) { + rtp_sender->inputFrame(frame); } #endif //ENABLE_RTPPROXY @@ -451,7 +451,7 @@ void MultiMediaSourceMuxer::inputFrame(const Frame::Ptr &frame_in) { bool MultiMediaSourceMuxer::isEnabled(){ #if defined(ENABLE_RTPPROXY) - return (_muxer->isEnabled() || _ps_rtp_sender); + return (_muxer->isEnabled() || _rtp_sender); #else return _muxer->isEnabled(); #endif //ENABLE_RTPPROXY diff --git a/src/Common/MultiMediaSourceMuxer.h b/src/Common/MultiMediaSourceMuxer.h index 475268d1..11ef0e81 100644 --- a/src/Common/MultiMediaSourceMuxer.h +++ b/src/Common/MultiMediaSourceMuxer.h @@ -12,7 +12,7 @@ #define ZLMEDIAKIT_MULTIMEDIASOURCEMUXER_H #include "Common/Stamp.h" -#include "Rtp/PSRtpSender.h" +#include "Rtp/RtpSender.h" #include "Record/Recorder.h" #include "Record/HlsRecorder.h" #include "Record/HlsMediaSource.h" @@ -142,7 +142,7 @@ public: * @param is_udp 是否为udp * @param cb 启动成功或失败回调 */ - void startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, uint32_t ssrc, bool is_udp, const function &cb) override; + void startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, const function &cb) override; /** * 停止ps-rtp发送 @@ -187,7 +187,7 @@ private: MultiMuxerPrivate::Ptr _muxer; std::weak_ptr _track_listener; #if defined(ENABLE_RTPPROXY) - PSRtpSender::Ptr _ps_rtp_sender; + RtpSender::Ptr _rtp_sender; #endif //ENABLE_RTPPROXY }; diff --git a/src/Rtp/GB28181Process.cpp b/src/Rtp/GB28181Process.cpp new file mode 100644 index 00000000..cc8255a8 --- /dev/null +++ b/src/Rtp/GB28181Process.cpp @@ -0,0 +1,116 @@ +/* + * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). + * + * Use of this source code is governed by MIT license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#if defined(ENABLE_RTPPROXY) +#include "GB28181Process.h" +#include "Util/File.h" +#include "Http/HttpTSPlayer.h" +#include "Extension/CommonRtp.h" + +namespace mediakit{ + +//判断是否为ts负载 +static inline bool checkTS(const uint8_t *packet, int bytes){ + return bytes % TS_PACKET_SIZE == 0 && packet[0] == TS_SYNC_BYTE; +} + +GB28181Process::GB28181Process(const MediaInfo &media_info, MediaSinkInterface *interface) { + assert(interface); + _media_info = media_info; + _interface = interface; +} + +GB28181Process::~GB28181Process() {} + +bool GB28181Process::inputRtp(bool, const char *data, int data_len) { + return handleOneRtp(0, TrackVideo, 90000, (unsigned char *) data, data_len); +} + +void GB28181Process::onRtpSorted(const RtpPacket::Ptr &rtp, int) { + if (!_rtp_decoder) { + switch (rtp->PT) { + case 33: + case 96: { + //ts或ps负载 + _rtp_decoder = std::make_shared(CodecInvalid, 256 * 1024); + + //设置dump目录 + GET_CONFIG(string, dump_dir, RtpProxy::kDumpDir); + if (!dump_dir.empty()) { + auto save_path = File::absolutePath(_media_info._streamid + ".mp2", dump_dir); + _save_file_ps.reset(File::create_file(save_path.data(), "wb"), [](FILE *fp) { + if (fp) { + fclose(fp); + } + }); + } + break; + } + + default: + WarnL << "不支持的rtp负载类型:" << (int) rtp->PT; + return; + } + + //设置frame回调 + _rtp_decoder->addDelegate(std::make_shared([this](const Frame::Ptr &frame) { + onRtpDecode(frame); + })); + } + + //解码rtp + _rtp_decoder->inputRtp(rtp, false); +} + +const char *GB28181Process::onSearchPacketTail(const char *packet,int bytes){ + try { + auto ret = _decoder->input((uint8_t *) packet, bytes); + if (ret > 0) { + return packet + ret; + } + return nullptr; + } catch (std::exception &ex) { + InfoL << "解析ps或ts异常: bytes=" << bytes + << " ,exception=" << ex.what() + << " ,hex=" << hexdump((uint8_t *) packet, bytes); + if (remainDataSize() > 256 * 1024) { + //缓存太多数据无法处理则上抛异常 + throw; + } + return nullptr; + } +} + +void GB28181Process::onRtpDecode(const Frame::Ptr &frame) { + //这是TS或PS + if (_save_file_ps) { + fwrite(frame->data(), frame->size(), 1, _save_file_ps.get()); + } + + if (!_decoder) { + //创建解码器 + if (checkTS((uint8_t *) frame->data(), frame->size())) { + //猜测是ts负载 + InfoL << _media_info._streamid << " judged to be TS"; + _decoder = DecoderImp::createDecoder(DecoderImp::decoder_ts, _interface); + } else { + //猜测是ps负载 + InfoL << _media_info._streamid << " judged to be PS"; + _decoder = DecoderImp::createDecoder(DecoderImp::decoder_ps, _interface); + } + } + + if (_decoder) { + HttpRequestSplitter::input(frame->data(), frame->size()); + } +} + +}//namespace mediakit +#endif//defined(ENABLE_RTPPROXY) \ No newline at end of file diff --git a/src/Rtp/GB28181Process.h b/src/Rtp/GB28181Process.h new file mode 100644 index 00000000..1dc2ee8b --- /dev/null +++ b/src/Rtp/GB28181Process.h @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). + * + * Use of this source code is governed by MIT license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef ZLMEDIAKIT_GB28181ROCESS_H +#define ZLMEDIAKIT_GB28181ROCESS_H + +#if defined(ENABLE_RTPPROXY) + +#include "Decoder.h" +#include "ProcessInterface.h" +#include "Rtsp/RtpCodec.h" +#include "Rtsp/RtpReceiver.h" +#include "Http/HttpRequestSplitter.h" + +namespace mediakit{ + +class GB28181Process : public HttpRequestSplitter, public RtpReceiver, public ProcessInterface{ +public: + typedef std::shared_ptr Ptr; + GB28181Process(const MediaInfo &media_info, MediaSinkInterface *interface); + ~GB28181Process() override; + + /** + * 输入rtp + * @param data rtp数据指针 + * @param data_len rtp数据长度 + * @return 是否解析成功 + */ + bool inputRtp(bool, const char *data, int data_len) override; + +protected: + void onRtpSorted(const RtpPacket::Ptr &rtp, int track_index) override ; + const char *onSearchPacketTail(const char *data,int len) override; + int64_t onRecvHeader(const char *data,uint64_t len) override { return 0; }; + +private: + void onRtpDecode(const Frame::Ptr &frame); + +private: + MediaInfo _media_info; + DecoderImp::Ptr _decoder; + MediaSinkInterface *_interface; + std::shared_ptr _save_file_ps; + std::shared_ptr _rtp_decoder; +}; + +}//namespace mediakit +#endif//defined(ENABLE_RTPPROXY) +#endif //ZLMEDIAKIT_GB28181ROCESS_H diff --git a/src/Rtp/PSEncoder.cpp b/src/Rtp/PSEncoder.cpp index b6688af0..f2b47693 100644 --- a/src/Rtp/PSEncoder.cpp +++ b/src/Rtp/PSEncoder.cpp @@ -167,5 +167,43 @@ void PSEncoder::inputFrame(const Frame::Ptr &frame) { } } +//////////////////////////////////////////////////////////////////////////////////////////////// + +class RingDelegateHelper : public RingDelegate { +public: + typedef function onRtp; + + ~RingDelegateHelper() override{} + RingDelegateHelper(onRtp on_rtp){ + _on_rtp = std::move(on_rtp); + } + void onWrite(RtpPacket::Ptr in, bool is_key) override{ + _on_rtp(std::move(in), is_key); + } + +private: + onRtp _on_rtp; +}; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +PSEncoderImp::PSEncoderImp(uint32_t ssrc, uint8_t payload_type) { + GET_CONFIG(uint32_t,video_mtu,Rtp::kVideoMtuSize); + _rtp_encoder = std::make_shared(CodecInvalid, ssrc, video_mtu, 90000, payload_type, 0); + _rtp_encoder->setRtpRing(std::make_shared()); + _rtp_encoder->getRtpRing()->setDelegate(std::make_shared([this](RtpPacket::Ptr rtp, bool is_key){ + onRTP(std::move(rtp)); + })); + InfoL << this << " " << printSSRC(_rtp_encoder->getSsrc()); +} + +PSEncoderImp::~PSEncoderImp() { + InfoL << this << " " << printSSRC(_rtp_encoder->getSsrc()); +} + +void PSEncoderImp::onPS(uint32_t stamp, void *packet, size_t bytes) { + _rtp_encoder->inputFrame(std::make_shared((char *) packet, bytes, stamp)); +} + }//namespace mediakit #endif//defined(ENABLE_RTPPROXY) diff --git a/src/Rtp/PSEncoder.h b/src/Rtp/PSEncoder.h index 5b63d3fc..4e643cc6 100644 --- a/src/Rtp/PSEncoder.h +++ b/src/Rtp/PSEncoder.h @@ -14,6 +14,7 @@ #include "mpeg-ps.h" #include "Common/MediaSink.h" #include "Common/Stamp.h" +#include "Extension/CommonRtp.h" namespace mediakit{ //该类实现mpeg-ps容器格式的打包 @@ -65,6 +66,22 @@ private: unordered_map _codec_to_trackid; }; +class PSEncoderImp : public PSEncoder{ +public: + PSEncoderImp(uint32_t ssrc, uint8_t payload_type = 96); + ~PSEncoderImp() override; + +protected: + //rtp打包后回调 + virtual void onRTP(Buffer::Ptr rtp) = 0; + +protected: + void onPS(uint32_t stamp, void *packet, size_t bytes) override; + +private: + std::shared_ptr _rtp_encoder; +}; + }//namespace mediakit #endif //ENABLE_RTPPROXY #endif //ZLMEDIAKIT_PSENCODER_H diff --git a/src/Rtp/ProcessInterface.h b/src/Rtp/ProcessInterface.h new file mode 100644 index 00000000..bf6c2d98 --- /dev/null +++ b/src/Rtp/ProcessInterface.h @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). + * + * Use of this source code is governed by MIT license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + + +#ifndef ZLMEDIAKIT_PROCESSINTERFACE_H +#define ZLMEDIAKIT_PROCESSINTERFACE_H + +#include +#include + +namespace mediakit { + +class ProcessInterface { +public: + using Ptr = std::shared_ptr; + ProcessInterface() = default; + virtual ~ProcessInterface() = default; + + /** + * 输入rtp + * @param is_udp 是否为udp模式 + * @param data rtp数据指针 + * @param data_len rtp数据长度 + * @return 是否解析成功 + */ + virtual bool inputRtp(bool is_udp, const char *data, int data_len) = 0; +}; + +}//namespace mediakit +#endif //ZLMEDIAKIT_PROCESSINTERFACE_H diff --git a/src/Rtp/RtpCache.cpp b/src/Rtp/RtpCache.cpp new file mode 100644 index 00000000..0ff2961e --- /dev/null +++ b/src/Rtp/RtpCache.cpp @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). + * + * Use of this source code is governed by MIT license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#include "RtpCache.h" + +namespace mediakit{ + +RtpCache::RtpCache(onFlushed cb) { + _cb = std::move(cb); +} + +void RtpCache::onFlush(std::shared_ptr > rtp_list, bool) { + _cb(std::move(rtp_list)); +} + +void RtpCache::input(uint64_t stamp, Buffer::Ptr buffer) { + inputPacket(stamp, true, std::move(buffer), false); +} + +void RtpCachePS::onRTP(Buffer::Ptr buffer) { + auto rtp = static_pointer_cast(buffer); + auto stamp = rtp->timeStamp; + input(stamp, std::move(buffer)); +} + +}//namespace mediakit diff --git a/src/Rtp/RtpCache.h b/src/Rtp/RtpCache.h new file mode 100644 index 00000000..95545f79 --- /dev/null +++ b/src/Rtp/RtpCache.h @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). + * + * Use of this source code is governed by MIT license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef ZLMEDIAKIT_RTPCACHE_H +#define ZLMEDIAKIT_RTPCACHE_H + +#include "PSEncoder.h" +#include "Extension/CommonRtp.h" + +namespace mediakit{ + +class RtpCache : public PacketCache { +public: + using onFlushed = function >)>; + RtpCache(onFlushed cb); + ~RtpCache() override = default; + +protected: + /** + * 输入rtp(目的是为了合并写) + * @param buffer rtp数据 + */ + void input(uint64_t stamp, Buffer::Ptr buffer); + +protected: + void onFlush(std::shared_ptr > rtp_list, bool) override; + +private: + onFlushed _cb; +}; + +class RtpCachePS : public RtpCache, public PSEncoderImp{ +public: + RtpCachePS(onFlushed cb, uint32_t ssrc, uint8_t payload_type = 96) : RtpCache(std::move(cb)), PSEncoderImp(ssrc, payload_type) {}; + ~RtpCachePS() override = default; + +protected: + void onRTP(Buffer::Ptr rtp) override; +}; + +}//namespace mediakit +#endif //ZLMEDIAKIT_RTPCACHE_H diff --git a/src/Rtp/RtpProcess.cpp b/src/Rtp/RtpProcess.cpp index 23b4c754..1713574c 100644 --- a/src/Rtp/RtpProcess.cpp +++ b/src/Rtp/RtpProcess.cpp @@ -9,14 +9,17 @@ */ #if defined(ENABLE_RTPPROXY) +#include "GB28181Process.h" #include "RtpProcess.h" +#include "RtpSplitter.h" #include "Util/File.h" #include "Http/HttpTSPlayer.h" + #define RTP_APP_NAME "rtp" -namespace mediakit{ +namespace mediakit { -static string printAddress(const struct sockaddr *addr){ +static string printAddress(const struct sockaddr *addr) { return StrPrinter << SockUtil::inet_ntoa(((struct sockaddr_in *) addr)->sin_addr) << ":" << ntohs(((struct sockaddr_in *) addr)->sin_port); } @@ -26,20 +29,11 @@ RtpProcess::RtpProcess(const string &stream_id) { _media_info._app = RTP_APP_NAME; _media_info._streamid = stream_id; - GET_CONFIG(string,dump_dir,RtpProxy::kDumpDir); + GET_CONFIG(string, dump_dir, RtpProxy::kDumpDir); { FILE *fp = !dump_dir.empty() ? File::create_file(File::absolutePath(_media_info._streamid + ".rtp", dump_dir).data(), "wb") : nullptr; - if(fp){ - _save_file_rtp.reset(fp,[](FILE *fp){ - fclose(fp); - }); - } - } - - { - FILE *fp = !dump_dir.empty() ? File::create_file(File::absolutePath(_media_info._streamid + ".mp2", dump_dir).data(), "wb") : nullptr; - if(fp){ - _save_file_ps.reset(fp,[](FILE *fp){ + if (fp) { + _save_file_rtp.reset(fp, [](FILE *fp) { fclose(fp); }); } @@ -47,20 +41,16 @@ RtpProcess::RtpProcess(const string &stream_id) { { FILE *fp = !dump_dir.empty() ? File::create_file(File::absolutePath(_media_info._streamid + ".video", dump_dir).data(), "wb") : nullptr; - if(fp){ - _save_file_video.reset(fp,[](FILE *fp){ + if (fp) { + _save_file_video.reset(fp, [](FILE *fp) { fclose(fp); }); } } - _rtp_decoder = std::make_shared(CodecInvalid, 256 * 1024); - _rtp_decoder->addDelegate(std::make_shared([this](const Frame::Ptr &frame){ - onRtpDecode((uint8_t *) frame->data(), frame->size(), frame->dts()); - })); } RtpProcess::~RtpProcess() { - uint64_t duration = (_last_rtp_time.createdTime() - _last_rtp_time.elapsedTime()) / 1000; + uint64_t duration = (_last_frame_time.createdTime() - _last_frame_time.elapsedTime()) / 1000; WarnP(this) << "RTP推流器(" << _media_info._vhost << "/" << _media_info._app << "/" @@ -79,95 +69,47 @@ RtpProcess::~RtpProcess() { } } -bool RtpProcess::inputRtp(const Socket::Ptr &sock, const char *data, int data_len,const struct sockaddr *addr,uint32_t *dts_out) { - GET_CONFIG(bool,check_source,RtpProxy::kCheckSource); +bool RtpProcess::inputRtp(bool is_udp, const Socket::Ptr &sock, const char *data, int len, const struct sockaddr *addr, uint32_t *dts_out) { + GET_CONFIG(bool, check_source, RtpProxy::kCheckSource); //检查源是否合法 - if(!_addr){ + if (!_addr) { _addr = new struct sockaddr; _sock = sock; - memcpy(_addr,addr, sizeof(struct sockaddr)); + memcpy(_addr, addr, sizeof(struct sockaddr)); DebugP(this) << "bind to address:" << printAddress(_addr); //推流鉴权 emitOnPublish(); } - if(!_muxer){ + if (!_muxer) { //无权限推流 return false; } - if(check_source && memcmp(_addr,addr,sizeof(struct sockaddr)) != 0){ + if (check_source && memcmp(_addr, addr, sizeof(struct sockaddr)) != 0) { DebugP(this) << "address dismatch:" << printAddress(addr) << " != " << printAddress(_addr); return false; } - _total_bytes += data_len; - bool ret = handleOneRtp(0, TrackVideo, 90000, (unsigned char *) data, data_len); - if(dts_out){ + _total_bytes += len; + if (_save_file_rtp) { + uint16_t size = len; + size = htons(size); + fwrite((uint8_t *) &size, 2, 1, _save_file_rtp.get()); + fwrite((uint8_t *) data, len, 1, _save_file_rtp.get()); + } + if (!_process) { + _process = std::make_shared(_media_info, this); + } + bool ret = _process ? _process->inputRtp(is_udp, data, len) : false; + if (dts_out) { *dts_out = _dts; } return ret; } -//判断是否为ts负载 -static inline bool checkTS(const uint8_t *packet, int bytes){ - return bytes % TS_PACKET_SIZE == 0 && packet[0] == TS_SYNC_BYTE; -} - -void RtpProcess::onRtpSorted(const RtpPacket::Ptr &rtp, int) { - if(rtp->sequence != (uint16_t)(_sequence + 1) && _sequence != 0){ - WarnP(this) << "rtp丢包:" << rtp->sequence << " != " << _sequence << "+1" << ",公网环境下请使用tcp方式推流"; - } - _sequence = rtp->sequence; - if(_save_file_rtp){ - uint16_t size = rtp->size() - 4; - size = htons(size); - fwrite((uint8_t *) &size, 2, 1, _save_file_rtp.get()); - fwrite((uint8_t *) rtp->data() + 4, rtp->size() - 4, 1, _save_file_rtp.get()); - } - _rtp_decoder->inputRtp(rtp); -} - -const char *RtpProcess::onSearchPacketTail(const char *packet,int bytes){ - try { - auto ret = _decoder->input((uint8_t *) packet, bytes); - if (ret > 0) { - return packet + ret; - } - return nullptr; - } catch (std::exception &ex) { - InfoL << "解析ps或ts异常: bytes=" << bytes - << " ,exception=" << ex.what() - << " ,hex=" << hexdump((uint8_t *) packet, bytes); - return nullptr; - } -} - -void RtpProcess::onRtpDecode(const uint8_t *packet, int bytes, uint32_t timestamp) { - if(_save_file_ps){ - fwrite((uint8_t *)packet,bytes, 1, _save_file_ps.get()); - } - - if (!_decoder) { - //创建解码器 - if (checkTS(packet, bytes)) { - //猜测是ts负载 - InfoP(this) << "judged to be TS"; - _decoder = DecoderImp::createDecoder(DecoderImp::decoder_ts, this); - } else { - //猜测是ps负载 - InfoP(this) << "judged to be PS"; - _decoder = DecoderImp::createDecoder(DecoderImp::decoder_ps, this); - } - } - - if (_decoder) { - HttpRequestSplitter::input((char *) packet, bytes); - } -} - -void RtpProcess::inputFrame(const Frame::Ptr &frame){ - _last_rtp_time.resetTime(); +void RtpProcess::inputFrame(const Frame::Ptr &frame) { + _last_frame_time.resetTime(); _dts = frame->dts(); if (_save_file_video && frame->getTrackType() == TrackVideo) { fwrite((uint8_t *) frame->data(), frame->size(), 1, _save_file_video.get()); @@ -175,20 +117,20 @@ void RtpProcess::inputFrame(const Frame::Ptr &frame){ _muxer->inputFrame(frame); } -void RtpProcess::addTrack(const Track::Ptr & track){ +void RtpProcess::addTrack(const Track::Ptr &track) { _muxer->addTrack(track); } bool RtpProcess::alive() { - GET_CONFIG(int,timeoutSec,RtpProxy::kTimeoutSec) - if(_last_rtp_time.elapsedTime() / 1000 < timeoutSec){ + GET_CONFIG(int, timeoutSec, RtpProxy::kTimeoutSec) + if (_last_frame_time.elapsedTime() / 1000 < timeoutSec) { return true; } return false; } -void RtpProcess::onDetach(){ - if(_on_detach){ +void RtpProcess::onDetach() { + if (_on_detach) { _on_detach(); } } @@ -198,45 +140,45 @@ void RtpProcess::setOnDetach(const function &cb) { } string RtpProcess::get_peer_ip() { - if(_addr){ + if (_addr) { return SockUtil::inet_ntoa(((struct sockaddr_in *) _addr)->sin_addr); } return "0.0.0.0"; } uint16_t RtpProcess::get_peer_port() { - if(!_addr){ + if (!_addr) { return 0; } return ntohs(((struct sockaddr_in *) _addr)->sin_port); } string RtpProcess::get_local_ip() { - if(_sock){ + if (_sock) { return _sock->get_local_ip(); } return "0.0.0.0"; } uint16_t RtpProcess::get_local_port() { - if(_sock){ - return _sock->get_local_port(); + if (_sock) { + return _sock->get_local_port(); } return 0; } -string RtpProcess::getIdentifier() const{ +string RtpProcess::getIdentifier() const { return _media_info._streamid; } -int RtpProcess::totalReaderCount(){ +int RtpProcess::totalReaderCount() { return _muxer ? _muxer->totalReaderCount() : 0; } -void RtpProcess::setListener(const std::weak_ptr &listener){ - if(_muxer){ +void RtpProcess::setListener(const std::weak_ptr &listener) { + if (_muxer) { _muxer->setMediaListener(listener); - }else{ + } else { _listener = listener; } } @@ -262,7 +204,7 @@ void RtpProcess::emitOnPublish() { //触发推流鉴权事件 auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish, _media_info, invoker, static_cast(*this)); - if(!flag){ + if (!flag) { //该事件无人监听,默认不鉴权 GET_CONFIG(bool, toHls, General::kPublishToHls); GET_CONFIG(bool, toMP4, General::kPublishToMP4); diff --git a/src/Rtp/RtpProcess.h b/src/Rtp/RtpProcess.h index 7715eda0..807a3e49 100644 --- a/src/Rtp/RtpProcess.h +++ b/src/Rtp/RtpProcess.h @@ -12,18 +12,14 @@ #define ZLMEDIAKIT_RTPPROCESS_H #if defined(ENABLE_RTPPROXY) +#include "ProcessInterface.h" +#include "Common/MultiMediaSourceMuxer.h" -#include "Rtsp/RtpReceiver.h" -#include "Decoder.h" -#include "Common/Device.h" -#include "Common/Stamp.h" -#include "Http/HttpRequestSplitter.h" -#include "Extension/CommonRtp.h" using namespace mediakit; -namespace mediakit{ +namespace mediakit { -class RtpProcess : public HttpRequestSplitter, public RtpReceiver, public SockInfo, public MediaSinkInterface, public std::enable_shared_from_this{ +class RtpProcess : public SockInfo, public MediaSinkInterface, public std::enable_shared_from_this{ public: typedef std::shared_ptr Ptr; friend class RtpProcessHelper; @@ -32,14 +28,15 @@ public: /** * 输入rtp + * @param is_udp 是否为udp模式 * @param sock 本地监听的socket * @param data rtp数据指针 - * @param data_len rtp数据长度 + * @param len rtp数据长度 * @param addr 数据源地址 * @param dts_out 解析出最新的dts * @return 是否解析成功 */ - bool inputRtp(const Socket::Ptr &sock, const char *data,int data_len, const struct sockaddr *addr , uint32_t *dts_out = nullptr); + bool inputRtp(bool is_udp, const Socket::Ptr &sock, const char *data, int len, const struct sockaddr *addr , uint32_t *dts_out = nullptr); /** * 是否超时,用于超时移除对象 @@ -67,34 +64,26 @@ public: void setListener(const std::weak_ptr &listener); protected: - void onRtpSorted(const RtpPacket::Ptr &rtp, int track_index) override ; void inputFrame(const Frame::Ptr &frame) override; void addTrack(const Track::Ptr & track) override; void resetTracks() override {}; - const char *onSearchPacketTail(const char *data,int len) override; - int64_t onRecvHeader(const char *data,uint64_t len) override { return 0; }; - private: void emitOnPublish(); - void onRtpDecode(const uint8_t *packet, int bytes, uint32_t timestamp); private: - std::shared_ptr _rtp_decoder; - std::shared_ptr _save_file_rtp; - std::shared_ptr _save_file_ps; - std::shared_ptr _save_file_video; - struct sockaddr *_addr = nullptr; - uint16_t _sequence = 0; - MultiMediaSourceMuxer::Ptr _muxer; - Ticker _last_rtp_time; uint32_t _dts = 0; - DecoderImp::Ptr _decoder; - std::weak_ptr _listener; - MediaInfo _media_info; uint64_t _total_bytes = 0; + struct sockaddr *_addr = nullptr; Socket::Ptr _sock; + MediaInfo _media_info; + Ticker _last_frame_time; function _on_detach; + std::shared_ptr _save_file_rtp; + std::shared_ptr _save_file_video; + std::weak_ptr _listener; + ProcessInterface::Ptr _process; + MultiMediaSourceMuxer::Ptr _muxer; }; }//namespace mediakit diff --git a/src/Rtp/RtpSelector.cpp b/src/Rtp/RtpSelector.cpp index b895de25..0f4196c5 100644 --- a/src/Rtp/RtpSelector.cpp +++ b/src/Rtp/RtpSelector.cpp @@ -9,7 +9,9 @@ */ #if defined(ENABLE_RTPPROXY) +#include #include "RtpSelector.h" +#include "RtpSplitter.h" namespace mediakit{ @@ -22,17 +24,19 @@ void RtpSelector::clear(){ bool RtpSelector::inputRtp(const Socket::Ptr &sock, const char *data, int data_len, const struct sockaddr *addr,uint32_t *dts_out) { - //使用ssrc为流id uint32_t ssrc = 0; if (!getSSRC(data, data_len, ssrc)) { WarnL << "get ssrc from rtp failed:" << data_len; return false; } - - //假定指定了流id,那么通过流id来区分是否为一路流(哪怕可能同时收到多路流) auto process = getProcess(printSSRC(ssrc), true); if (process) { - return process->inputRtp(sock, data, data_len, addr, dts_out); + try { + return process->inputRtp(true, sock, data, data_len, addr, dts_out); + } catch (...) { + delProcess(printSSRC(ssrc), process.get()); + throw; + } } return false; } diff --git a/src/Rtp/PSRtpSender.cpp b/src/Rtp/RtpSender.cpp similarity index 66% rename from src/Rtp/PSRtpSender.cpp rename to src/Rtp/RtpSender.cpp index 7f0a335a..8fdb531a 100644 --- a/src/Rtp/PSRtpSender.cpp +++ b/src/Rtp/RtpSender.cpp @@ -9,33 +9,29 @@ */ #if defined(ENABLE_RTPPROXY) -#include "PSRtpSender.h" +#include "RtpSender.h" #include "Rtsp/RtspSession.h" #include "Thread/WorkThreadPool.h" +#include "RtpCache.h" namespace mediakit{ -PSRtpSender::PSRtpSender(uint32_t ssrc, uint8_t payload_type) { - GET_CONFIG(uint32_t,video_mtu,Rtp::kVideoMtuSize); - _rtp_encoder = std::make_shared(CodecInvalid, ssrc, video_mtu, 90000, payload_type, 0); - _rtp_encoder->setRtpRing(std::make_shared()); - _rtp_encoder->getRtpRing()->setDelegate(std::make_shared([this](const RtpPacket::Ptr &rtp, bool is_key){ - onRtp(rtp, is_key); - })); +RtpSender::RtpSender(uint32_t ssrc, uint8_t payload_type) { _poller = EventPollerPool::Instance().getPoller(); - InfoL << this << " " << printSSRC(_rtp_encoder->getSsrc()); + _interface = std::make_shared([this](std::shared_ptr > list) { + onFlushRtpList(std::move(list)); + }, ssrc, payload_type); } -PSRtpSender::~PSRtpSender() { - InfoL << this << " " << printSSRC(_rtp_encoder->getSsrc()); +RtpSender::~RtpSender() { } -void PSRtpSender::startSend(const string &dst_url, uint16_t dst_port, bool is_udp, const function &cb){ +void RtpSender::startSend(const string &dst_url, uint16_t dst_port, bool is_udp, const function &cb){ _is_udp = is_udp; _socket = Socket::createSocket(_poller, false); _dst_url = dst_url; _dst_port = dst_port; - weak_ptr weak_self = shared_from_this(); + weak_ptr weak_self = shared_from_this(); if (is_udp) { _socket->bindUdpSock(0); auto poller = _poller; @@ -73,7 +69,7 @@ void PSRtpSender::startSend(const string &dst_url, uint16_t dst_port, bool is_ud } } -void PSRtpSender::onConnect(){ +void RtpSender::onConnect(){ _is_connect = true; //加大发送缓存,防止udp丢包之类的问题 SockUtil::setSendBuf(_socket->rawFD(), 4 * 1024 * 1024); @@ -83,37 +79,38 @@ void PSRtpSender::onConnect(){ _socket->setSendFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE); } //连接建立成功事件 - weak_ptr weak_self = shared_from_this(); + weak_ptr weak_self = shared_from_this(); _socket->setOnErr([weak_self](const SockException &err) { auto strong_self = weak_self.lock(); if (strong_self) { strong_self->onErr(err); } }); - InfoL << "开始发送 ps rtp:" << _socket->get_peer_ip() << ":" << _socket->get_peer_port() << ", 是否为udp方式:" << _is_udp; + InfoL << "开始发送 rtp:" << _socket->get_peer_ip() << ":" << _socket->get_peer_port() << ", 是否为udp方式:" << _is_udp; +} + +void RtpSender::addTrack(const Track::Ptr &track){ + _interface->addTrack(track); +} + +void RtpSender::addTrackCompleted(){ + _interface->addTrackCompleted(); +} + +void RtpSender::resetTracks(){ + _interface->resetTracks(); } //此函数在其他线程执行 -void PSRtpSender::inputFrame(const Frame::Ptr &frame) { +void RtpSender::inputFrame(const Frame::Ptr &frame) { if (_is_connect) { //连接成功后才做实质操作(节省cpu资源) - PSEncoder::inputFrame(frame); + _interface->inputFrame(frame); } } //此函数在其他线程执行 -void PSRtpSender::onPS(uint32_t stamp, void *packet, size_t bytes) { - _rtp_encoder->inputFrame(std::make_shared((char *) packet, bytes, stamp)); -} - -//此函数在其他线程执行 -void PSRtpSender::onRtp(const RtpPacket::Ptr &rtp, bool) { - //开启合并写提高发送性能 - PacketCache::inputPacket(true, rtp, false); -} - -//此函数在其他线程执行 -void PSRtpSender::onFlush(shared_ptr > rtp_list, bool) { +void RtpSender::onFlushRtpList(shared_ptr > rtp_list) { if(!_is_connect){ //连接成功后才能发送数据 return; @@ -124,29 +121,29 @@ void PSRtpSender::onFlush(shared_ptr > rtp_list, bool) { _poller->async([rtp_list, is_udp, socket]() { int i = 0; int size = rtp_list->size(); - rtp_list->for_each([&](const RtpPacket::Ptr &packet) { + rtp_list->for_each([&](Buffer::Ptr &packet) { if (is_udp) { //udp模式,rtp over tcp前4个字节可以忽略 - socket->send(std::make_shared(packet, 4), nullptr, 0, ++i == size); + socket->send(std::make_shared(std::move(packet), 4), nullptr, 0, ++i == size); } else { //tcp模式, rtp over tcp前2个字节可以忽略,只保留后续rtp长度的2个字节 - socket->send(std::make_shared(packet, 2), nullptr, 0, ++i == size); + socket->send(std::make_shared(std::move(packet), 2), nullptr, 0, ++i == size); } }); }); } -void PSRtpSender::onErr(const SockException &ex, bool is_connect) { +void RtpSender::onErr(const SockException &ex, bool is_connect) { _is_connect = false; //监听socket断开事件,方便重连 if (is_connect) { WarnL << "重连" << _dst_url << ":" << _dst_port << "失败, 原因为:" << ex.what(); } else { - WarnL << "停止发送 ps rtp:" << _dst_url << ":" << _dst_port << ", 原因为:" << ex.what(); + WarnL << "停止发送 rtp:" << _dst_url << ":" << _dst_port << ", 原因为:" << ex.what(); } - weak_ptr weak_self = shared_from_this(); + weak_ptr weak_self = shared_from_this(); _connect_timer = std::make_shared(10.0, [weak_self]() { auto strong_self = weak_self.lock(); if (!strong_self) { diff --git a/src/Rtp/PSRtpSender.h b/src/Rtp/RtpSender.h similarity index 53% rename from src/Rtp/PSRtpSender.h rename to src/Rtp/RtpSender.h index da817905..86a58894 100644 --- a/src/Rtp/PSRtpSender.h +++ b/src/Rtp/RtpSender.h @@ -8,42 +8,27 @@ * may be found in the AUTHORS file in the root of the source tree. */ -#ifndef ZLMEDIAKIT_PSRTPSENDER_H -#define ZLMEDIAKIT_PSRTPSENDER_H +#ifndef ZLMEDIAKIT_RTPSENDER_H +#define ZLMEDIAKIT_RTPSENDER_H #if defined(ENABLE_RTPPROXY) #include "PSEncoder.h" #include "Extension/CommonRtp.h" namespace mediakit{ -class RingDelegateHelper : public RingDelegate { +//rtp发送客户端,支持发送GB28181协议 +class RtpSender : public MediaSinkInterface, public std::enable_shared_from_this{ public: - typedef function onRtp; + typedef std::shared_ptr Ptr; - ~RingDelegateHelper() override{} - RingDelegateHelper(onRtp on_rtp){ - _on_rtp = std::move(on_rtp); - } - void onWrite(RtpPacket::Ptr in, bool is_key) override{ - _on_rtp(in, is_key); - } - -private: - onRtp _on_rtp; -}; - -//该类在PSEncoder的基础上,实现了mpeg-ps的rtp打包以及发送 -class PSRtpSender : public PSEncoder, public std::enable_shared_from_this, public PacketCache{ -public: - typedef std::shared_ptr Ptr; + ~RtpSender() override; /** - * 构造函数 + * 构造函数,创建GB28181 RTP发送客户端 * @param ssrc rtp的ssrc * @param payload_type 国标中ps-rtp的pt一般为96 */ - PSRtpSender(uint32_t ssrc, uint8_t payload_type = 96); - ~PSRtpSender() override; + RtpSender(uint32_t ssrc, uint8_t payload_type = 96); /** * 开始发送ps-rtp包 @@ -59,20 +44,26 @@ public: */ void inputFrame(const Frame::Ptr &frame) override; -protected: - //mpeg-ps回调 - void onPS(uint32_t stamp, void *packet, size_t bytes) override; + /** + * 添加track,内部会调用Track的clone方法 + * 只会克隆sps pps这些信息 ,而不会克隆Delegate相关关系 + * @param track + */ + virtual void addTrack(const Track::Ptr & track) override; /** - * 批量flush rtp包时触发该函数 - * @param rtp_list rtp包列表 - * @param key_pos 是否包含关键帧 + * 添加所有Track完毕 */ - void onFlush(std::shared_ptr > rtp_list, bool key_pos) override; + virtual void addTrackCompleted() override; + + /** + * 重置track + */ + virtual void resetTracks() override; private: - //rtp打包后回调 - void onRtp(const RtpPacket::Ptr &in, bool is_key); + //合并写输出 + void onFlushRtpList(std::shared_ptr > rtp_list); //udp/tcp连接成功回调 void onConnect(); //异常断开socket事件 @@ -86,9 +77,9 @@ private: Socket::Ptr _socket; EventPoller::Ptr _poller; Timer::Ptr _connect_timer; - std::shared_ptr _rtp_encoder; + MediaSinkInterface::Ptr _interface; }; }//namespace mediakit #endif// defined(ENABLE_RTPPROXY) -#endif //ZLMEDIAKIT_PSRTPSENDER_H +#endif //ZLMEDIAKIT_RTPSENDER_H diff --git a/src/Rtp/RtpServer.cpp b/src/Rtp/RtpServer.cpp index d417e148..16196fdf 100644 --- a/src/Rtp/RtpServer.cpp +++ b/src/Rtp/RtpServer.cpp @@ -52,7 +52,7 @@ void RtpServer::start(uint16_t local_port, const string &stream_id, bool enable //指定了流id,那么一个端口一个流(不管是否包含多个ssrc的多个流,绑定rtp源后,会筛选掉ip端口不匹配的流) process = RtpSelector::Instance().getProcess(stream_id, true); udp_server->setOnRead([udp_server, process](const Buffer::Ptr &buf, struct sockaddr *addr, int) { - process->inputRtp(udp_server, buf->data(), buf->size(), addr); + process->inputRtp(true, udp_server, buf->data(), buf->size(), addr); }); } else { //未指定流id,一个端口多个流,通过ssrc来分流 diff --git a/src/Rtp/RtpSession.cpp b/src/Rtp/RtpSession.cpp index 9104ffdd..21fb5e2d 100644 --- a/src/Rtp/RtpSession.cpp +++ b/src/Rtp/RtpSession.cpp @@ -73,7 +73,7 @@ void RtpSession::onRtpPacket(const char *data, uint64_t len) { _process = RtpSelector::Instance().getProcess(_stream_id, true); _process->setListener(dynamic_pointer_cast(shared_from_this())); } - _process->inputRtp(getSock(), data, len, &addr); + _process->inputRtp(false, getSock(), data, len, &addr); _ticker.resetTime(); } diff --git a/src/Rtp/RtpSession.h b/src/Rtp/RtpSession.h index 07c9166b..9c411b9a 100644 --- a/src/Rtp/RtpSession.h +++ b/src/Rtp/RtpSession.h @@ -35,13 +35,14 @@ protected: bool close(MediaSource &sender,bool force) override; // 观看总人数 int totalReaderCount(MediaSource &sender) override; - void onRtpPacket(const char *data,uint64_t len) override; + // 收到rtp回调 + void onRtpPacket(const char *data, uint64_t len) override; private: - RtpProcess::Ptr _process; Ticker _ticker; - struct sockaddr addr; string _stream_id; + struct sockaddr addr; + RtpProcess::Ptr _process; }; }//namespace mediakit