完善线程安全设计

This commit is contained in:
xiongziliang 2022-08-27 10:17:06 +08:00
parent 8b0bd2d224
commit c2ab45f78d
15 changed files with 89 additions and 59 deletions

View File

@ -15,6 +15,7 @@
#include "mk_track.h"
#include "mk_frame.h"
#include "mk_events_objects.h"
#include "mk_thread.h"
#ifdef __cplusplus
extern "C" {
@ -246,7 +247,7 @@ API_EXPORT void API_CALL mk_media_set_on_regist(mk_media ctx, on_mk_media_source
typedef on_mk_media_source_send_rtp_result on_mk_media_send_rtp_result;
/**
* ps-rtp流(ssrc区分多路)
* ps-rtp流(ssrc区分多路)api线程安全
* @param ctx
* @param dst_url ip或域名
* @param dst_port
@ -258,12 +259,18 @@ typedef on_mk_media_source_send_rtp_result on_mk_media_send_rtp_result;
API_EXPORT void API_CALL mk_media_start_send_rtp(mk_media ctx, const char *dst_url, uint16_t dst_port, const char *ssrc, int is_udp, on_mk_media_send_rtp_result cb, void *user_data);
/**
* ps-rtp发送
* ps-rtp发送api线程安全
* @param ctx
* @param ssrc rtp的ssrc10null或空字符串rtp推流
* @return 10
*/
API_EXPORT int API_CALL mk_media_stop_send_rtp(mk_media ctx, const char *ssrc);
API_EXPORT void API_CALL mk_media_stop_send_rtp(mk_media ctx, const char *ssrc);
/**
* 线
* @param ctx
*/
API_EXPORT mk_thread API_CALL mk_media_get_owner_thread(mk_media ctx);
#ifdef __cplusplus
}

View File

@ -22,6 +22,7 @@ public:
template<typename ...ArgsType>
MediaHelper(ArgsType &&...args){
_channel = std::make_shared<DevChannel>(std::forward<ArgsType>(args)...);
_poller = EventPollerPool::Instance().getPoller();
}
~MediaHelper(){}
@ -58,6 +59,11 @@ public:
_on_regist_data = user_data;
}
// 获取所属线程
toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) override {
return _poller;
}
protected:
// 通知其停止推流
bool close(MediaSource &sender,bool force) override{
@ -111,6 +117,7 @@ protected:
}
private:
EventPoller::Ptr _poller;
DevChannel::Ptr _channel;
on_mk_media_close _on_close = nullptr;
on_mk_media_seek _on_seek = nullptr;
@ -265,17 +272,29 @@ API_EXPORT void API_CALL mk_media_start_send_rtp(mk_media ctx, const char *dst_u
args.ssrc = ssrc;
args.is_udp = is_udp;
//sender参数无用
(*obj)->getChannel()->startSendRtp(MediaSource::NullMediaSource(), args, [cb, user_data](uint16_t local_port, const SockException &ex){
// sender参数无用
auto ref = *obj;
(*obj)->getOwnerPoller(MediaSource::NullMediaSource())->async([args, ref, cb, user_data]() {
ref->getChannel()->startSendRtp(MediaSource::NullMediaSource(), args, [cb, user_data](uint16_t local_port, const SockException &ex) {
if (cb) {
cb(user_data, local_port, ex.getErrCode(), ex.what());
}
});
});
}
API_EXPORT int API_CALL mk_media_stop_send_rtp(mk_media ctx, const char *ssrc){
API_EXPORT void API_CALL mk_media_stop_send_rtp(mk_media ctx, const char *ssrc){
assert(ctx);
MediaHelper::Ptr *obj = (MediaHelper::Ptr *) ctx;
//sender参数无用
return (*obj)->getChannel()->stopSendRtp(MediaSource::NullMediaSource(), ssrc ? ssrc : "");
MediaHelper::Ptr *obj = (MediaHelper::Ptr *)ctx;
// sender参数无用
auto ref = *obj;
string ssrc_str = ssrc ? ssrc : "";
(*obj)->getOwnerPoller(MediaSource::NullMediaSource())->async([ref, ssrc_str]() {
ref->getChannel()->stopSendRtp(MediaSource::NullMediaSource(), ssrc_str);
});
}
API_EXPORT mk_thread API_CALL mk_media_get_owner_thread(mk_media ctx) {
MediaHelper::Ptr *obj = (MediaHelper::Ptr *)ctx;
return (*obj)->getOwnerPoller(MediaSource::NullMediaSource()).get();
}

View File

@ -576,8 +576,8 @@ MediaSource::Ptr MediaSource::createFromMP4(const string &schema, const string &
}
#ifdef ENABLE_MP4
try {
MP4Reader::Ptr pReader(new MP4Reader(vhost, app, stream, file_path));
pReader->startReadMP4();
auto reader = std::make_shared<MP4Reader>(vhost, app, stream, file_path);
reader->startReadMP4();
return MediaSource::find(schema, vhost, app, stream);
} catch (std::exception &ex) {
WarnL << ex.what();
@ -733,7 +733,7 @@ toolkit::EventPoller::Ptr MediaSourceEventInterceptor::getOwnerPoller(MediaSourc
if (listener) {
return listener->getOwnerPoller(sender);
}
return nullptr;
return EventPollerPool::Instance().getPoller();
}

View File

@ -81,7 +81,7 @@ public:
// 获取丢包率
virtual int getLossRate(MediaSource &sender, TrackType type) { return -1; }
// 获取所在线程
virtual toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) { return nullptr; }
virtual toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) = 0;
////////////////////////仅供MultiMediaSourceMuxer对象继承////////////////////////
// 开启或关闭录制

View File

@ -219,9 +219,9 @@ bool MultiMediaSourceMuxer::isRecording(MediaSource &sender, Recorder::type type
}
}
void MultiMediaSourceMuxer::startSendRtp(MediaSource &, const MediaSourceEvent::SendRtpArgs &args, const std::function<void(uint16_t, const toolkit::SockException &)> cb) {
void MultiMediaSourceMuxer::startSendRtp(MediaSource &sender, const MediaSourceEvent::SendRtpArgs &args, const std::function<void(uint16_t, const toolkit::SockException &)> cb) {
#if defined(ENABLE_RTPPROXY)
auto rtp_sender = std::make_shared<RtpSender>();
auto rtp_sender = std::make_shared<RtpSender>(getOwnerPoller(sender));
weak_ptr<MultiMediaSourceMuxer> weak_self = shared_from_this();
rtp_sender->startSend(args, [args, weak_self, rtp_sender, cb](uint16_t local_port, const SockException &ex) mutable {
cb(local_port, ex);

View File

@ -20,6 +20,8 @@ using namespace toolkit;
namespace mediakit {
MP4Reader::MP4Reader(const string &vhost, const string &app, const string &stream_id, const string &file_path) {
//读写文件建议放在后台线程
_poller = WorkThreadPool::Instance().getPoller();
_file_path = file_path;
if (_file_path.empty()) {
GET_CONFIG(string, recordPath, Record::kFilePath);
@ -105,7 +107,7 @@ void MP4Reader::stopReadMP4() {
_timer = nullptr;
}
void MP4Reader::startReadMP4(const EventPoller::Ptr &poller_in, uint64_t sample_ms, bool ref_self, bool file_repeat) {
void MP4Reader::startReadMP4(uint64_t sample_ms, bool ref_self, bool file_repeat) {
GET_CONFIG(uint32_t, sampleMS, Record::kSampleMS);
auto strong_self = shared_from_this();
if (_muxer) {
@ -114,8 +116,6 @@ void MP4Reader::startReadMP4(const EventPoller::Ptr &poller_in, uint64_t sample_
while (!_muxer->isAllTrackReady() && readNextSample()) {}
}
//未指定线程,那么使用后台线程(读写文件采用后台线程)
auto poller = poller_in ? poller_in : WorkThreadPool::Instance().getPoller();
auto timer_sec = (sample_ms ? sample_ms : sampleMS) / 1000.0f;
//启动定时器
@ -123,7 +123,7 @@ void MP4Reader::startReadMP4(const EventPoller::Ptr &poller_in, uint64_t sample_
_timer = std::make_shared<Timer>(timer_sec, [strong_self]() {
lock_guard<recursive_mutex> lck(strong_self->_mtx);
return strong_self->readSample();
}, poller);
}, _poller);
} else {
weak_ptr<MP4Reader> weak_self = strong_self;
_timer = std::make_shared<Timer>(timer_sec, [weak_self]() {
@ -133,7 +133,7 @@ void MP4Reader::startReadMP4(const EventPoller::Ptr &poller_in, uint64_t sample_
}
lock_guard<recursive_mutex> lck(strong_self->_mtx);
return strong_self->readSample();
}, poller);
}, _poller);
}
_file_repeat = file_repeat;
@ -251,5 +251,9 @@ string MP4Reader::getOriginUrl(MediaSource &sender) const {
return _file_path;
}
toolkit::EventPoller::Ptr MP4Reader::getOwnerPoller(MediaSource &sender) {
return _poller;
}
} /* namespace mediakit */
#endif //ENABLE_MP4

View File

@ -33,12 +33,11 @@ public:
/**
* MP4文件
* @param poller mp4定时器所绑定线程线
* @param sample_ms 0
* @param ref_self
* @param file_repeat
*/
void startReadMP4(const toolkit::EventPoller::Ptr &poller = nullptr, uint64_t sample_ms = 0, bool ref_self = true, bool file_repeat = false);
void startReadMP4(uint64_t sample_ms = 0, bool ref_self = true, bool file_repeat = false);
/**
* MP4定时器
@ -60,6 +59,7 @@ private:
int totalReaderCount(MediaSource &sender) override;
MediaOriginType getOriginType(MediaSource &sender) const override;
std::string getOriginUrl(MediaSource &sender) const override;
toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) override;
bool readSample();
bool readNextSample();
@ -80,6 +80,7 @@ private:
toolkit::Timer::Ptr _timer;
MP4Demuxer::Ptr _demuxer;
MultiMediaSourceMuxer::Ptr _muxer;
toolkit::EventPoller::Ptr _poller;
};
} /* namespace mediakit */

View File

@ -285,7 +285,7 @@ std::shared_ptr<SockInfo> RtpProcess::getOriginSock(MediaSource &sender) const {
}
toolkit::EventPoller::Ptr RtpProcess::getOwnerPoller(MediaSource &sender) {
return _sock ? _sock->getPoller() : nullptr;
return _sock ? _sock->getPoller() : EventPollerPool::Instance().getPoller();
}
void RtpProcess::setHelper(std::weak_ptr<RtcpContext> help) {

View File

@ -120,12 +120,6 @@ void RtpSelector::onManager() {
});
}
RtpSelector::RtpSelector() {
}
RtpSelector::~RtpSelector() {
}
RtpProcessHelper::RtpProcessHelper(const string &stream_id, const weak_ptr<RtpSelector> &parent) {
_stream_id = stream_id;
_parent = parent;
@ -136,6 +130,7 @@ RtpProcessHelper::~RtpProcessHelper() {
}
void RtpProcessHelper::attachEvent() {
//主要目的是close回调触发时能把对象从RtpSelector中删除
_process->setListener(shared_from_this());
}
@ -157,6 +152,10 @@ int RtpProcessHelper::totalReaderCount(MediaSource &sender) {
return _process ? _process->getTotalReaderCount() : sender.totalReaderCount();
}
toolkit::EventPoller::Ptr RtpProcessHelper::getOwnerPoller(MediaSource &sender) {
return toolkit::EventPollerPool::Instance().getPoller();
}
RtpProcess::Ptr &RtpProcessHelper::getProcess() {
return _process;
}

View File

@ -34,6 +34,8 @@ protected:
bool close(MediaSource &sender,bool force) override;
// 观看总人数
int totalReaderCount(MediaSource &sender) override;
// 获取所属线程
toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) override;
private:
std::weak_ptr<RtpSelector > _parent;
@ -43,8 +45,8 @@ private:
class RtpSelector : public std::enable_shared_from_this<RtpSelector>{
public:
RtpSelector();
~RtpSelector();
RtpSelector() = default;
~RtpSelector() = default;
static bool getSSRC(const char *data,size_t data_len, uint32_t &ssrc);
static RtpSelector &Instance();

View File

@ -20,8 +20,8 @@ using namespace toolkit;
namespace mediakit{
RtpSender::RtpSender() {
_poller = EventPollerPool::Instance().getPoller();
RtpSender::RtpSender(EventPoller::Ptr poller) {
_poller = poller ? std::move(poller) : EventPollerPool::Instance().getPoller();
_socket_rtp = Socket::createSocket(_poller, false);
}
@ -253,9 +253,8 @@ void RtpSender::onSendRtpUdp(const toolkit::Buffer::Ptr &buf, bool check) {
}
void RtpSender::onClose() {
auto cb = _on_close;
if (cb) {
_poller->async([cb]() { cb(); }, false);
if (_on_close) {
_on_close();
}
}
@ -266,25 +265,18 @@ void RtpSender::onFlushRtpList(shared_ptr<List<Buffer::Ptr> > rtp_list) {
return;
}
weak_ptr<RtpSender> weak_self = shared_from_this();
_poller->async([rtp_list, weak_self]() {
auto strong_self = weak_self.lock();
if (!strong_self) {
return;
}
size_t i = 0;
auto size = rtp_list->size();
rtp_list->for_each([&](Buffer::Ptr &packet) {
if (strong_self->_args.is_udp) {
strong_self->onSendRtpUdp(packet, i == 0);
//udp模式rtp over tcp前4个字节可以忽略
strong_self->_socket_rtp->send(std::make_shared<BufferRtp>(std::move(packet), RtpPacket::kRtpTcpHeaderSize), nullptr, 0, ++i == size);
if (_args.is_udp) {
onSendRtpUdp(packet, i == 0);
// udp模式rtp over tcp前4个字节可以忽略
_socket_rtp->send(std::make_shared<BufferRtp>(std::move(packet), RtpPacket::kRtpTcpHeaderSize), nullptr, 0, ++i == size);
} else {
//tcp模式, rtp over tcp前2个字节可以忽略,只保留后续rtp长度的2个字节
strong_self->_socket_rtp->send(std::make_shared<BufferRtp>(std::move(packet), 2), nullptr, 0, ++i == size);
// tcp模式, rtp over tcp前2个字节可以忽略,只保留后续rtp长度的2个字节
_socket_rtp->send(std::make_shared<BufferRtp>(std::move(packet), 2), nullptr, 0, ++i == size);
}
});
});
}
void RtpSender::onErr(const SockException &ex, bool is_connect) {

View File

@ -22,7 +22,7 @@ class RtpSender : public MediaSinkInterface, public std::enable_shared_from_this
public:
typedef std::shared_ptr<RtpSender> Ptr;
RtpSender();
RtpSender(toolkit::EventPoller::Ptr poller = nullptr);
~RtpSender() override = default;
/**

View File

@ -139,6 +139,10 @@ int RtpSession::totalReaderCount(MediaSource &sender) {
return _process ? _process->getTotalReaderCount() : sender.totalReaderCount();
}
toolkit::EventPoller::Ptr RtpSession::getOwnerPoller(MediaSource &sender) {
return getPoller();
}
static const char *findSSRC(const char *data, ssize_t len, uint32_t ssrc) {
//rtp前面必须预留两个字节的长度字段
for (ssize_t i = 2; i <= len - 4; ++i) {

View File

@ -38,6 +38,8 @@ protected:
bool close(MediaSource &sender,bool force) override;
// 观看总人数
int totalReaderCount(MediaSource &sender) override;
// 获取所属线程
toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) override;
// 收到rtp回调
void onRtpPacket(const char *data, size_t len) override;

View File

@ -151,7 +151,7 @@ std::shared_ptr<SockInfo> SrtTransportImp::getOriginSock(mediakit::MediaSource &
toolkit::EventPoller::Ptr SrtTransportImp::getOwnerPoller(MediaSource &sender){
auto session = getSession();
return session ? session->getPoller() : nullptr;
return session ? session->getPoller() : EventPollerPool::Instance().getPoller();
}
void SrtTransportImp::emitOnPublish() {