diff --git a/api/include/mk_media.h b/api/include/mk_media.h index 39a0ae80..834a960e 100755 --- a/api/include/mk_media.h +++ b/api/include/mk_media.h @@ -15,6 +15,7 @@ #include "mk_track.h" #include "mk_frame.h" #include "mk_events_objects.h" +#include "mk_thread.h" #ifdef __cplusplus extern "C" { @@ -246,7 +247,7 @@ API_EXPORT void API_CALL mk_media_set_on_regist(mk_media ctx, on_mk_media_source typedef on_mk_media_source_send_rtp_result on_mk_media_send_rtp_result; /** - * 开始发送一路ps-rtp流(通过ssrc区分多路) + * 开始发送一路ps-rtp流(通过ssrc区分多路),此api线程安全 * @param ctx 对象指针 * @param dst_url 目标ip或域名 * @param dst_port 目标端口 @@ -258,12 +259,18 @@ typedef on_mk_media_source_send_rtp_result on_mk_media_send_rtp_result; API_EXPORT void API_CALL mk_media_start_send_rtp(mk_media ctx, const char *dst_url, uint16_t dst_port, const char *ssrc, int is_udp, on_mk_media_send_rtp_result cb, void *user_data); /** - * 停止某路或全部ps-rtp发送 + * 停止某路或全部ps-rtp发送,此api线程安全 * @param ctx 对象指针 * @param ssrc rtp的ssrc,10进制的字符串打印,如果为null或空字符串,则停止所有rtp推流 - * @return 1成功,0失败 */ -API_EXPORT int API_CALL mk_media_stop_send_rtp(mk_media ctx, const char *ssrc); +API_EXPORT void API_CALL mk_media_stop_send_rtp(mk_media ctx, const char *ssrc); + +/** + * 获取所属线程 + * @param ctx 对象指针 + */ +API_EXPORT mk_thread API_CALL mk_media_get_owner_thread(mk_media ctx); + #ifdef __cplusplus } diff --git a/api/source/mk_media.cpp b/api/source/mk_media.cpp index 747f57cf..55a7c0e7 100755 --- a/api/source/mk_media.cpp +++ b/api/source/mk_media.cpp @@ -22,6 +22,7 @@ public: template MediaHelper(ArgsType &&...args){ _channel = std::make_shared(std::forward(args)...); + _poller = EventPollerPool::Instance().getPoller(); } ~MediaHelper(){} @@ -58,6 +59,11 @@ public: _on_regist_data = user_data; } + // 获取所属线程 + toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) override { + return _poller; + } + protected: // 通知其停止推流 bool close(MediaSource &sender,bool force) override{ @@ -111,6 +117,7 @@ protected: } private: + EventPoller::Ptr _poller; DevChannel::Ptr _channel; on_mk_media_close _on_close = nullptr; on_mk_media_seek _on_seek = nullptr; @@ -265,17 +272,29 @@ API_EXPORT void API_CALL mk_media_start_send_rtp(mk_media ctx, const char *dst_u args.ssrc = ssrc; args.is_udp = is_udp; - //sender参数无用 - (*obj)->getChannel()->startSendRtp(MediaSource::NullMediaSource(), args, [cb, user_data](uint16_t local_port, const SockException &ex){ - if (cb) { - cb(user_data, local_port, ex.getErrCode(), ex.what()); - } + // sender参数无用 + auto ref = *obj; + (*obj)->getOwnerPoller(MediaSource::NullMediaSource())->async([args, ref, cb, user_data]() { + ref->getChannel()->startSendRtp(MediaSource::NullMediaSource(), args, [cb, user_data](uint16_t local_port, const SockException &ex) { + if (cb) { + cb(user_data, local_port, ex.getErrCode(), ex.what()); + } + }); }); } -API_EXPORT int API_CALL mk_media_stop_send_rtp(mk_media ctx, const char *ssrc){ +API_EXPORT void API_CALL mk_media_stop_send_rtp(mk_media ctx, const char *ssrc){ assert(ctx); - MediaHelper::Ptr *obj = (MediaHelper::Ptr *) ctx; - //sender参数无用 - return (*obj)->getChannel()->stopSendRtp(MediaSource::NullMediaSource(), ssrc ? ssrc : ""); + MediaHelper::Ptr *obj = (MediaHelper::Ptr *)ctx; + // sender参数无用 + auto ref = *obj; + string ssrc_str = ssrc ? ssrc : ""; + (*obj)->getOwnerPoller(MediaSource::NullMediaSource())->async([ref, ssrc_str]() { + ref->getChannel()->stopSendRtp(MediaSource::NullMediaSource(), ssrc_str); + }); +} + +API_EXPORT mk_thread API_CALL mk_media_get_owner_thread(mk_media ctx) { + MediaHelper::Ptr *obj = (MediaHelper::Ptr *)ctx; + return (*obj)->getOwnerPoller(MediaSource::NullMediaSource()).get(); } \ No newline at end of file diff --git a/src/Common/MediaSource.cpp b/src/Common/MediaSource.cpp index c89d5fc5..25c8dc8f 100644 --- a/src/Common/MediaSource.cpp +++ b/src/Common/MediaSource.cpp @@ -576,8 +576,8 @@ MediaSource::Ptr MediaSource::createFromMP4(const string &schema, const string & } #ifdef ENABLE_MP4 try { - MP4Reader::Ptr pReader(new MP4Reader(vhost, app, stream, file_path)); - pReader->startReadMP4(); + auto reader = std::make_shared(vhost, app, stream, file_path); + reader->startReadMP4(); return MediaSource::find(schema, vhost, app, stream); } catch (std::exception &ex) { WarnL << ex.what(); @@ -733,7 +733,7 @@ toolkit::EventPoller::Ptr MediaSourceEventInterceptor::getOwnerPoller(MediaSourc if (listener) { return listener->getOwnerPoller(sender); } - return nullptr; + return EventPollerPool::Instance().getPoller(); } diff --git a/src/Common/MediaSource.h b/src/Common/MediaSource.h index bdf6c58f..a6f9118f 100644 --- a/src/Common/MediaSource.h +++ b/src/Common/MediaSource.h @@ -81,7 +81,7 @@ public: // 获取丢包率 virtual int getLossRate(MediaSource &sender, TrackType type) { return -1; } // 获取所在线程 - virtual toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) { return nullptr; } + virtual toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) = 0; ////////////////////////仅供MultiMediaSourceMuxer对象继承//////////////////////// // 开启或关闭录制 diff --git a/src/Common/MultiMediaSourceMuxer.cpp b/src/Common/MultiMediaSourceMuxer.cpp index 9528dcad..fd0964f7 100644 --- a/src/Common/MultiMediaSourceMuxer.cpp +++ b/src/Common/MultiMediaSourceMuxer.cpp @@ -219,9 +219,9 @@ bool MultiMediaSourceMuxer::isRecording(MediaSource &sender, Recorder::type type } } -void MultiMediaSourceMuxer::startSendRtp(MediaSource &, const MediaSourceEvent::SendRtpArgs &args, const std::function cb) { +void MultiMediaSourceMuxer::startSendRtp(MediaSource &sender, const MediaSourceEvent::SendRtpArgs &args, const std::function cb) { #if defined(ENABLE_RTPPROXY) - auto rtp_sender = std::make_shared(); + auto rtp_sender = std::make_shared(getOwnerPoller(sender)); weak_ptr weak_self = shared_from_this(); rtp_sender->startSend(args, [args, weak_self, rtp_sender, cb](uint16_t local_port, const SockException &ex) mutable { cb(local_port, ex); diff --git a/src/Record/MP4Reader.cpp b/src/Record/MP4Reader.cpp index 3daccbae..29d92d79 100644 --- a/src/Record/MP4Reader.cpp +++ b/src/Record/MP4Reader.cpp @@ -20,6 +20,8 @@ using namespace toolkit; namespace mediakit { MP4Reader::MP4Reader(const string &vhost, const string &app, const string &stream_id, const string &file_path) { + //读写文件建议放在后台线程 + _poller = WorkThreadPool::Instance().getPoller(); _file_path = file_path; if (_file_path.empty()) { GET_CONFIG(string, recordPath, Record::kFilePath); @@ -105,7 +107,7 @@ void MP4Reader::stopReadMP4() { _timer = nullptr; } -void MP4Reader::startReadMP4(const EventPoller::Ptr &poller_in, uint64_t sample_ms, bool ref_self, bool file_repeat) { +void MP4Reader::startReadMP4(uint64_t sample_ms, bool ref_self, bool file_repeat) { GET_CONFIG(uint32_t, sampleMS, Record::kSampleMS); auto strong_self = shared_from_this(); if (_muxer) { @@ -114,8 +116,6 @@ void MP4Reader::startReadMP4(const EventPoller::Ptr &poller_in, uint64_t sample_ while (!_muxer->isAllTrackReady() && readNextSample()) {} } - //未指定线程,那么使用后台线程(读写文件采用后台线程) - auto poller = poller_in ? poller_in : WorkThreadPool::Instance().getPoller(); auto timer_sec = (sample_ms ? sample_ms : sampleMS) / 1000.0f; //启动定时器 @@ -123,7 +123,7 @@ void MP4Reader::startReadMP4(const EventPoller::Ptr &poller_in, uint64_t sample_ _timer = std::make_shared(timer_sec, [strong_self]() { lock_guard lck(strong_self->_mtx); return strong_self->readSample(); - }, poller); + }, _poller); } else { weak_ptr weak_self = strong_self; _timer = std::make_shared(timer_sec, [weak_self]() { @@ -133,7 +133,7 @@ void MP4Reader::startReadMP4(const EventPoller::Ptr &poller_in, uint64_t sample_ } lock_guard lck(strong_self->_mtx); return strong_self->readSample(); - }, poller); + }, _poller); } _file_repeat = file_repeat; @@ -251,5 +251,9 @@ string MP4Reader::getOriginUrl(MediaSource &sender) const { return _file_path; } +toolkit::EventPoller::Ptr MP4Reader::getOwnerPoller(MediaSource &sender) { + return _poller; +} + } /* namespace mediakit */ #endif //ENABLE_MP4 \ No newline at end of file diff --git a/src/Record/MP4Reader.h b/src/Record/MP4Reader.h index a0eca1d1..889e9590 100644 --- a/src/Record/MP4Reader.h +++ b/src/Record/MP4Reader.h @@ -33,12 +33,11 @@ public: /** * 开始解复用MP4文件 - * @param poller 解复用mp4定时器所绑定线程,置空则随机采用一条后台线程 * @param sample_ms 每次读取文件数据量,单位毫秒,置0时采用配置文件配置 * @param ref_self 是否让定时器引用此对象本身,如果无其他对象引用本身,在不循环读文件时,读取文件结束后本对象将自动销毁 * @param file_repeat 是否循环读取文件,如果配置文件设置为循环读文件,此参数无效 */ - void startReadMP4(const toolkit::EventPoller::Ptr &poller = nullptr, uint64_t sample_ms = 0, bool ref_self = true, bool file_repeat = false); + void startReadMP4(uint64_t sample_ms = 0, bool ref_self = true, bool file_repeat = false); /** * 停止解复用MP4定时器 @@ -60,6 +59,7 @@ private: int totalReaderCount(MediaSource &sender) override; MediaOriginType getOriginType(MediaSource &sender) const override; std::string getOriginUrl(MediaSource &sender) const override; + toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) override; bool readSample(); bool readNextSample(); @@ -80,6 +80,7 @@ private: toolkit::Timer::Ptr _timer; MP4Demuxer::Ptr _demuxer; MultiMediaSourceMuxer::Ptr _muxer; + toolkit::EventPoller::Ptr _poller; }; } /* namespace mediakit */ diff --git a/src/Rtp/RtpProcess.cpp b/src/Rtp/RtpProcess.cpp index bc5f69ec..dad0bb98 100644 --- a/src/Rtp/RtpProcess.cpp +++ b/src/Rtp/RtpProcess.cpp @@ -285,7 +285,7 @@ std::shared_ptr RtpProcess::getOriginSock(MediaSource &sender) const { } toolkit::EventPoller::Ptr RtpProcess::getOwnerPoller(MediaSource &sender) { - return _sock ? _sock->getPoller() : nullptr; + return _sock ? _sock->getPoller() : EventPollerPool::Instance().getPoller(); } void RtpProcess::setHelper(std::weak_ptr help) { diff --git a/src/Rtp/RtpSelector.cpp b/src/Rtp/RtpSelector.cpp index 80f87110..92ef4030 100644 --- a/src/Rtp/RtpSelector.cpp +++ b/src/Rtp/RtpSelector.cpp @@ -120,12 +120,6 @@ void RtpSelector::onManager() { }); } -RtpSelector::RtpSelector() { -} - -RtpSelector::~RtpSelector() { -} - RtpProcessHelper::RtpProcessHelper(const string &stream_id, const weak_ptr &parent) { _stream_id = stream_id; _parent = parent; @@ -136,6 +130,7 @@ RtpProcessHelper::~RtpProcessHelper() { } void RtpProcessHelper::attachEvent() { + //主要目的是close回调触发时能把对象从RtpSelector中删除 _process->setListener(shared_from_this()); } @@ -157,6 +152,10 @@ int RtpProcessHelper::totalReaderCount(MediaSource &sender) { return _process ? _process->getTotalReaderCount() : sender.totalReaderCount(); } +toolkit::EventPoller::Ptr RtpProcessHelper::getOwnerPoller(MediaSource &sender) { + return toolkit::EventPollerPool::Instance().getPoller(); +} + RtpProcess::Ptr &RtpProcessHelper::getProcess() { return _process; } diff --git a/src/Rtp/RtpSelector.h b/src/Rtp/RtpSelector.h index 87ac2a8a..7c7fd7c6 100644 --- a/src/Rtp/RtpSelector.h +++ b/src/Rtp/RtpSelector.h @@ -34,6 +34,8 @@ protected: bool close(MediaSource &sender,bool force) override; // 观看总人数 int totalReaderCount(MediaSource &sender) override; + // 获取所属线程 + toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) override; private: std::weak_ptr _parent; @@ -43,8 +45,8 @@ private: class RtpSelector : public std::enable_shared_from_this{ public: - RtpSelector(); - ~RtpSelector(); + RtpSelector() = default; + ~RtpSelector() = default; static bool getSSRC(const char *data,size_t data_len, uint32_t &ssrc); static RtpSelector &Instance(); diff --git a/src/Rtp/RtpSender.cpp b/src/Rtp/RtpSender.cpp index 68a75598..10bb78bf 100644 --- a/src/Rtp/RtpSender.cpp +++ b/src/Rtp/RtpSender.cpp @@ -20,8 +20,8 @@ using namespace toolkit; namespace mediakit{ -RtpSender::RtpSender() { - _poller = EventPollerPool::Instance().getPoller(); +RtpSender::RtpSender(EventPoller::Ptr poller) { + _poller = poller ? std::move(poller) : EventPollerPool::Instance().getPoller(); _socket_rtp = Socket::createSocket(_poller, false); } @@ -253,9 +253,8 @@ void RtpSender::onSendRtpUdp(const toolkit::Buffer::Ptr &buf, bool check) { } void RtpSender::onClose() { - auto cb = _on_close; - if (cb) { - _poller->async([cb]() { cb(); }, false); + if (_on_close) { + _on_close(); } } @@ -266,24 +265,17 @@ void RtpSender::onFlushRtpList(shared_ptr > rtp_list) { return; } - weak_ptr weak_self = shared_from_this(); - _poller->async([rtp_list, weak_self]() { - auto strong_self = weak_self.lock(); - if (!strong_self) { - return; + size_t i = 0; + auto size = rtp_list->size(); + rtp_list->for_each([&](Buffer::Ptr &packet) { + if (_args.is_udp) { + onSendRtpUdp(packet, i == 0); + // udp模式,rtp over tcp前4个字节可以忽略 + _socket_rtp->send(std::make_shared(std::move(packet), RtpPacket::kRtpTcpHeaderSize), nullptr, 0, ++i == size); + } else { + // tcp模式, rtp over tcp前2个字节可以忽略,只保留后续rtp长度的2个字节 + _socket_rtp->send(std::make_shared(std::move(packet), 2), nullptr, 0, ++i == size); } - size_t i = 0; - auto size = rtp_list->size(); - rtp_list->for_each([&](Buffer::Ptr &packet) { - if (strong_self->_args.is_udp) { - strong_self->onSendRtpUdp(packet, i == 0); - //udp模式,rtp over tcp前4个字节可以忽略 - strong_self->_socket_rtp->send(std::make_shared(std::move(packet), RtpPacket::kRtpTcpHeaderSize), nullptr, 0, ++i == size); - } else { - //tcp模式, rtp over tcp前2个字节可以忽略,只保留后续rtp长度的2个字节 - strong_self->_socket_rtp->send(std::make_shared(std::move(packet), 2), nullptr, 0, ++i == size); - } - }); }); } diff --git a/src/Rtp/RtpSender.h b/src/Rtp/RtpSender.h index 8bbbd6a6..5f966343 100644 --- a/src/Rtp/RtpSender.h +++ b/src/Rtp/RtpSender.h @@ -22,7 +22,7 @@ class RtpSender : public MediaSinkInterface, public std::enable_shared_from_this public: typedef std::shared_ptr Ptr; - RtpSender(); + RtpSender(toolkit::EventPoller::Ptr poller = nullptr); ~RtpSender() override = default; /** diff --git a/src/Rtp/RtpSession.cpp b/src/Rtp/RtpSession.cpp index 4a9d983c..a41a56b0 100644 --- a/src/Rtp/RtpSession.cpp +++ b/src/Rtp/RtpSession.cpp @@ -139,6 +139,10 @@ int RtpSession::totalReaderCount(MediaSource &sender) { return _process ? _process->getTotalReaderCount() : sender.totalReaderCount(); } +toolkit::EventPoller::Ptr RtpSession::getOwnerPoller(MediaSource &sender) { + return getPoller(); +} + static const char *findSSRC(const char *data, ssize_t len, uint32_t ssrc) { //rtp前面必须预留两个字节的长度字段 for (ssize_t i = 2; i <= len - 4; ++i) { diff --git a/src/Rtp/RtpSession.h b/src/Rtp/RtpSession.h index 8389c298..7ef5343e 100644 --- a/src/Rtp/RtpSession.h +++ b/src/Rtp/RtpSession.h @@ -38,6 +38,8 @@ protected: bool close(MediaSource &sender,bool force) override; // 观看总人数 int totalReaderCount(MediaSource &sender) override; + // 获取所属线程 + toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) override; // 收到rtp回调 void onRtpPacket(const char *data, size_t len) override; diff --git a/srt/SrtTransportImp.cpp b/srt/SrtTransportImp.cpp index 36ac8b05..be8daec9 100644 --- a/srt/SrtTransportImp.cpp +++ b/srt/SrtTransportImp.cpp @@ -151,7 +151,7 @@ std::shared_ptr SrtTransportImp::getOriginSock(mediakit::MediaSource & toolkit::EventPoller::Ptr SrtTransportImp::getOwnerPoller(MediaSource &sender){ auto session = getSession(); - return session ? session->getPoller() : nullptr; + return session ? session->getPoller() : EventPollerPool::Instance().getPoller(); } void SrtTransportImp::emitOnPublish() {