提高线程安全性

This commit is contained in:
ziyue 2022-10-31 17:53:20 +08:00
parent 918b1fce6c
commit 520945c2e9
11 changed files with 81 additions and 58 deletions

View File

@ -70,7 +70,6 @@ MediaSource::MediaSource(const string &schema, const string &vhost, const string
_app = app; _app = app;
_stream_id = stream_id; _stream_id = stream_id;
_create_stamp = time(NULL); _create_stamp = time(NULL);
_default_poller = EventPollerPool::Instance().getPoller();
} }
MediaSource::~MediaSource() { MediaSource::~MediaSource() {
@ -233,22 +232,23 @@ toolkit::EventPoller::Ptr MediaSource::getOwnerPoller() {
toolkit::EventPoller::Ptr ret; toolkit::EventPoller::Ptr ret;
auto listener = _listener.lock(); auto listener = _listener.lock();
if (listener) { if (listener) {
ret = listener->getOwnerPoller(*this); return listener->getOwnerPoller(*this);
} }
return ret ? ret : _default_poller; throw std::runtime_error(toolkit::demangle(typeid(*this).name()) + "::getOwnerPoller failed:" + getUrl());
} }
void MediaSource::onReaderChanged(int size) { void MediaSource::onReaderChanged(int size) {
weak_ptr<MediaSource> weak_self = shared_from_this(); weak_ptr<MediaSource> weak_self = shared_from_this();
getOwnerPoller()->async([weak_self, size]() { auto listener = _listener.lock();
if (!listener) {
return;
}
getOwnerPoller()->async([weak_self, size, listener]() {
auto strong_self = weak_self.lock(); auto strong_self = weak_self.lock();
if (!strong_self) { if (!strong_self) {
return; return;
} }
auto listener = strong_self->_listener.lock(); listener->onReaderChanged(*strong_self, size);
if (listener) {
listener->onReaderChanged(*strong_self, size);
}
}); });
} }
@ -729,7 +729,7 @@ toolkit::EventPoller::Ptr MediaSourceEventInterceptor::getOwnerPoller(MediaSourc
if (listener) { if (listener) {
return listener->getOwnerPoller(sender); return listener->getOwnerPoller(sender);
} }
return EventPollerPool::Instance().getPoller(); throw std::runtime_error(toolkit::demangle(typeid(*this).name()) + "::getOwnerPoller failed");
} }
bool MediaSourceEventInterceptor::setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path, size_t max_second) { bool MediaSourceEventInterceptor::setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path, size_t max_second) {

View File

@ -24,9 +24,9 @@
#include "Extension/Track.h" #include "Extension/Track.h"
#include "Record/Recorder.h" #include "Record/Recorder.h"
namespace toolkit{ namespace toolkit {
class Session; class Session;
}// namespace toolkit } // namespace toolkit
namespace mediakit { namespace mediakit {
@ -57,8 +57,8 @@ public:
~NotImplemented() override = default; ~NotImplemented() override = default;
}; };
MediaSourceEvent(){}; MediaSourceEvent() {};
virtual ~MediaSourceEvent(){}; virtual ~MediaSourceEvent() {};
// 获取媒体源类型 // 获取媒体源类型
virtual MediaOriginType getOriginType(MediaSource &sender) const { return MediaOriginType::unknown; } virtual MediaOriginType getOriginType(MediaSource &sender) const { return MediaOriginType::unknown; }
@ -135,10 +135,10 @@ private:
}; };
//该对象用于拦截感兴趣的MediaSourceEvent事件 //该对象用于拦截感兴趣的MediaSourceEvent事件
class MediaSourceEventInterceptor : public MediaSourceEvent{ class MediaSourceEventInterceptor : public MediaSourceEvent {
public: public:
MediaSourceEventInterceptor(){} MediaSourceEventInterceptor() = default;
~MediaSourceEventInterceptor() override {} ~MediaSourceEventInterceptor() override = default;
void setDelegate(const std::weak_ptr<MediaSourceEvent> &listener); void setDelegate(const std::weak_ptr<MediaSourceEvent> &listener);
std::shared_ptr<MediaSourceEvent> getDelegate() const; std::shared_ptr<MediaSourceEvent> getDelegate() const;
@ -169,23 +169,20 @@ private:
/** /**
* url获取媒体相关信息 * url获取媒体相关信息
*/ */
class MediaInfo{ class MediaInfo {
public: public:
~MediaInfo() {} ~MediaInfo() = default;
MediaInfo() {} MediaInfo() = default;
MediaInfo(const std::string &url) { parse(url); } MediaInfo(const std::string &url) { parse(url); }
void parse(const std::string &url); void parse(const std::string &url);
std::string shortUrl() const { std::string shortUrl() const { return _vhost + "/" + _app + "/" + _streamid; }
return _vhost + "/" + _app + "/" + _streamid; std::string getUrl() const { return _schema + "://" + shortUrl(); }
}
std::string getUrl() const {
return _schema + "://" + shortUrl();
}
public: public:
uint16_t _port = 0;
std::string _full_url; std::string _full_url;
std::string _schema; std::string _schema;
std::string _host; std::string _host;
uint16_t _port = 0;
std::string _vhost; std::string _vhost;
std::string _app; std::string _app;
std::string _streamid; std::string _streamid;
@ -200,7 +197,7 @@ public:
static MediaSource& NullMediaSource(); static MediaSource& NullMediaSource();
using Ptr = std::shared_ptr<MediaSource>; using Ptr = std::shared_ptr<MediaSource>;
MediaSource(const std::string &schema, const std::string &vhost, const std::string &app, const std::string &stream_id) ; MediaSource(const std::string &schema, const std::string &vhost, const std::string &app, const std::string &stream_id);
virtual ~MediaSource(); virtual ~MediaSource();
////////////////获取MediaSource相关信息//////////////// ////////////////获取MediaSource相关信息////////////////
@ -214,12 +211,9 @@ public:
// 流id // 流id
const std::string& getId() const; const std::string& getId() const;
std::string shortUrl() const { std::string shortUrl() const { return _vhost + "/" + _app + "/" + _stream_id; }
return _vhost + "/" + _app + "/" + _stream_id;
} std::string getUrl() const { return _schema + "://" + shortUrl(); }
std::string getUrl() const {
return _schema + "://" + shortUrl();
}
//获取对象所有权 //获取对象所有权
std::shared_ptr<void> getOwnership(); std::shared_ptr<void> getOwnership();
@ -235,7 +229,7 @@ public:
// 获取数据速率单位bytes/s // 获取数据速率单位bytes/s
int getBytesSpeed(TrackType type = TrackInvalid); int getBytesSpeed(TrackType type = TrackInvalid);
// 获取流创建GMT unix时间戳单位秒 // 获取流创建GMT unix时间戳单位秒
uint64_t getCreateStamp() const {return _create_stamp;} uint64_t getCreateStamp() const { return _create_stamp; }
// 获取流上线时间,单位秒 // 获取流上线时间,单位秒
uint64_t getAliveSecond() const; uint64_t getAliveSecond() const;
@ -266,9 +260,9 @@ public:
// 拖动进度条 // 拖动进度条
bool seekTo(uint32_t stamp); bool seekTo(uint32_t stamp);
//暂停 // 暂停
bool pause(bool pause); bool pause(bool pause);
//倍数播放 // 倍数播放
bool speed(float speed); bool speed(float speed);
// 关闭该流 // 关闭该流
bool close(bool force); bool close(bool force);
@ -310,9 +304,9 @@ protected:
void regist(); void regist();
private: private:
//媒体注销 // 媒体注销
bool unregist(); bool unregist();
//触发媒体事件 // 触发媒体事件
void emitEvent(bool regist); void emitEvent(bool regist);
protected: protected:
@ -327,12 +321,11 @@ private:
std::string _app; std::string _app;
std::string _stream_id; std::string _stream_id;
std::weak_ptr<MediaSourceEvent> _listener; std::weak_ptr<MediaSourceEvent> _listener;
toolkit::EventPoller::Ptr _default_poller; // 对象个数统计
//对象个数统计
toolkit::ObjectStatistic<MediaSource> _statistic; toolkit::ObjectStatistic<MediaSource> _statistic;
}; };
///缓存刷新策略类 /// 缓存刷新策略类
class FlushPolicy { class FlushPolicy {
public: public:
FlushPolicy() = default; FlushPolicy() = default;
@ -342,7 +335,7 @@ public:
private: private:
// 音视频的最后时间戳 // 音视频的最后时间戳
uint64_t _last_stamp[2] = {0, 0}; uint64_t _last_stamp[2] = { 0, 0 };
}; };
/// 合并写缓存模板 /// 合并写缓存模板
@ -352,9 +345,7 @@ private:
template<typename packet, typename policy = FlushPolicy, typename packet_list = toolkit::List<std::shared_ptr<packet> > > template<typename packet, typename policy = FlushPolicy, typename packet_list = toolkit::List<std::shared_ptr<packet> > >
class PacketCache { class PacketCache {
public: public:
PacketCache(){ PacketCache() { _cache = std::make_shared<packet_list>(); }
_cache = std::make_shared<packet_list>();
}
virtual ~PacketCache() = default; virtual ~PacketCache() = default;
@ -392,15 +383,15 @@ public:
private: private:
bool flushImmediatelyWhenCloseMerge() { bool flushImmediatelyWhenCloseMerge() {
//一般的协议关闭合并写时立即刷新缓存这样可以减少一帧的延时但是rtp例外 // 一般的协议关闭合并写时立即刷新缓存这样可以减少一帧的延时但是rtp例外
//因为rtp的包很小一个RtpPacket包中也不是完整的一帧图像所以在关闭合并写时 // 因为rtp的包很小一个RtpPacket包中也不是完整的一帧图像所以在关闭合并写时
//还是有必要缓冲一帧的rtp(也就是时间戳相同的rtp)再输出,这样虽然会增加一帧的延时 // 还是有必要缓冲一帧的rtp(也就是时间戳相同的rtp)再输出,这样虽然会增加一帧的延时
//但是却对性能提升很大,这样做还是比较划算的 // 但是却对性能提升很大,这样做还是比较划算的
GET_CONFIG(int, mergeWriteMS, General::kMergeWriteMS); GET_CONFIG(int, mergeWriteMS, General::kMergeWriteMS);
GET_CONFIG(int, rtspLowLatency, Rtsp::kLowLatency); GET_CONFIG(int, rtspLowLatency, Rtsp::kLowLatency);
if(std::is_same<packet, RtpPacket>::value && rtspLowLatency){ if (std::is_same<packet, RtpPacket>::value && rtspLowLatency) {
return true; return true;
} }

View File

@ -91,6 +91,7 @@ const std::string &MultiMediaSourceMuxer::getStreamId() const {
MultiMediaSourceMuxer::MultiMediaSourceMuxer(const string &vhost, const string &app, const string &stream, float dur_sec, const ProtocolOption &option) { MultiMediaSourceMuxer::MultiMediaSourceMuxer(const string &vhost, const string &app, const string &stream, float dur_sec, const ProtocolOption &option) {
_poller = EventPollerPool::Instance().getPoller(); _poller = EventPollerPool::Instance().getPoller();
_create_in_poller = _poller->isCurrentThread();
_vhost = vhost; _vhost = vhost;
_app = app; _app = app;
_stream_id = stream; _stream_id = stream;
@ -310,7 +311,12 @@ EventPoller::Ptr MultiMediaSourceMuxer::getOwnerPoller(MediaSource &sender) {
return _poller; return _poller;
} }
try { try {
return listener->getOwnerPoller(sender); auto ret = listener->getOwnerPoller(sender);
if (ret != _poller) {
WarnL << "OwnerPoller changed:" << _get_origin_url();
_poller = ret;
}
return ret;
} catch (MediaSourceEvent::NotImplemented &) { } catch (MediaSourceEvent::NotImplemented &) {
// listener未重载getOwnerPoller // listener未重载getOwnerPoller
return _poller; return _poller;
@ -348,6 +354,7 @@ bool MultiMediaSourceMuxer::onTrackReady(const Track::Ptr &track) {
} }
void MultiMediaSourceMuxer::onAllTrackReady() { void MultiMediaSourceMuxer::onAllTrackReady() {
CHECK(!_create_in_poller || getOwnerPoller(MediaSource::NullMediaSource())->isCurrentThread());
setMediaListener(getDelegate()); setMediaListener(getDelegate());
if (_rtmp) { if (_rtmp) {

View File

@ -217,6 +217,7 @@ protected:
private: private:
bool _is_enable = false; bool _is_enable = false;
bool _create_in_poller = false;
std::string _vhost; std::string _vhost;
std::string _app; std::string _app;
std::string _stream_id; std::string _stream_id;

View File

@ -592,6 +592,10 @@ std::shared_ptr<SockInfo> RtmpSession::getOriginSock(MediaSource &sender) const
return const_cast<RtmpSession *>(this)->shared_from_this(); return const_cast<RtmpSession *>(this)->shared_from_this();
} }
toolkit::EventPoller::Ptr RtmpSession::getOwnerPoller(MediaSource &sender) {
return getPoller();
}
void RtmpSession::setSocketFlags(){ void RtmpSession::setSocketFlags(){
GET_CONFIG(int, merge_write_ms, General::kMergeWriteMS); GET_CONFIG(int, merge_write_ms, General::kMergeWriteMS);
if (merge_write_ms > 0) { if (merge_write_ms > 0) {

View File

@ -80,6 +80,8 @@ private:
std::string getOriginUrl(MediaSource &sender) const override; std::string getOriginUrl(MediaSource &sender) const override;
// 获取媒体源客户端相关信息 // 获取媒体源客户端相关信息
std::shared_ptr<SockInfo> getOriginSock(MediaSource &sender) const override; std::shared_ptr<SockInfo> getOriginSock(MediaSource &sender) const override;
// 由于支持断连续推存在OwnerPoller变更的可能
toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) override;
void setSocketFlags(); void setSocketFlags();
std::string getStreamId(const std::string &str); std::string getStreamId(const std::string &str);

View File

@ -66,11 +66,14 @@ RtpProcess::~RtpProcess() {
} }
bool RtpProcess::inputRtp(bool is_udp, const Socket::Ptr &sock, const char *data, size_t len, const struct sockaddr *addr, uint64_t *dts_out) { bool RtpProcess::inputRtp(bool is_udp, const Socket::Ptr &sock, const char *data, size_t len, const struct sockaddr *addr, uint64_t *dts_out) {
if (!_sock) { if (_sock != sock) {
//第一次运行本函数 // 第一次运行本函数
bool first = !_sock;
_sock = sock; _sock = sock;
_addr.reset(new sockaddr_storage(*((sockaddr_storage *)addr))); _addr.reset(new sockaddr_storage(*((sockaddr_storage *)addr)));
emitOnPublish(); if (first) {
emitOnPublish();
}
} }
_total_bytes += len; _total_bytes += len;
@ -228,7 +231,7 @@ void RtpProcess::emitOnPublish() {
if (!strong_self) { if (!strong_self) {
return; return;
} }
auto poller = strong_self->_sock ? strong_self->_sock->getPoller() : EventPollerPool::Instance().getPoller(); auto poller = strong_self->getOwnerPoller(MediaSource::NullMediaSource());
poller->async([weak_self, err, option]() { poller->async([weak_self, err, option]() {
auto strong_self = weak_self.lock(); auto strong_self = weak_self.lock();
if (!strong_self) { if (!strong_self) {
@ -269,7 +272,10 @@ std::shared_ptr<SockInfo> RtpProcess::getOriginSock(MediaSource &sender) const {
} }
toolkit::EventPoller::Ptr RtpProcess::getOwnerPoller(MediaSource &sender) { toolkit::EventPoller::Ptr RtpProcess::getOwnerPoller(MediaSource &sender) {
return _sock ? _sock->getPoller() : EventPollerPool::Instance().getPoller(); if (_sock) {
return _sock->getPoller();
}
throw std::runtime_error("RtpProcess::getOwnerPoller failed:" + _media_info._streamid);
} }
float RtpProcess::getLossRate(MediaSource &sender, TrackType type) { float RtpProcess::getLossRate(MediaSource &sender, TrackType type) {

View File

@ -1159,6 +1159,10 @@ std::shared_ptr<SockInfo> RtspSession::getOriginSock(MediaSource &sender) const
return const_cast<RtspSession *>(this)->shared_from_this(); return const_cast<RtspSession *>(this)->shared_from_this();
} }
toolkit::EventPoller::Ptr RtspSession::getOwnerPoller(MediaSource &sender) {
return getPoller();
}
void RtspSession::onBeforeRtpSorted(const RtpPacket::Ptr &rtp, int track_index){ void RtspSession::onBeforeRtpSorted(const RtpPacket::Ptr &rtp, int track_index){
updateRtcpContext(rtp); updateRtcpContext(rtp);
} }

View File

@ -91,6 +91,8 @@ protected:
std::string getOriginUrl(MediaSource &sender) const override; std::string getOriginUrl(MediaSource &sender) const override;
// 获取媒体源客户端相关信息 // 获取媒体源客户端相关信息
std::shared_ptr<SockInfo> getOriginSock(MediaSource &sender) const override; std::shared_ptr<SockInfo> getOriginSock(MediaSource &sender) const override;
// 由于支持断连续推存在OwnerPoller变更的可能
toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) override;
/////TcpSession override//// /////TcpSession override////
ssize_t send(toolkit::Buffer::Ptr pkt) override; ssize_t send(toolkit::Buffer::Ptr pkt) override;

View File

@ -74,6 +74,10 @@ std::shared_ptr<SockInfo> WebRtcPusher::getOriginSock(MediaSource &sender) const
return static_pointer_cast<SockInfo>(getSession()); return static_pointer_cast<SockInfo>(getSession());
} }
toolkit::EventPoller::Ptr WebRtcPusher::getOwnerPoller(MediaSource &sender) {
return getPoller();
}
void WebRtcPusher::onRecvRtp(MediaTrack &track, const string &rid, RtpPacket::Ptr rtp) { void WebRtcPusher::onRecvRtp(MediaTrack &track, const string &rid, RtpPacket::Ptr rtp) {
if (!_simulcast) { if (!_simulcast) {
assert(_push_src); assert(_push_src);

View File

@ -44,6 +44,8 @@ protected:
std::string getOriginUrl(MediaSource &sender) const override; std::string getOriginUrl(MediaSource &sender) const override;
// 获取媒体源客户端相关信息 // 获取媒体源客户端相关信息
std::shared_ptr<SockInfo> getOriginSock(MediaSource &sender) const override; std::shared_ptr<SockInfo> getOriginSock(MediaSource &sender) const override;
// 由于支持断连续推存在OwnerPoller变更的可能
toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) override;
// 获取丢包率 // 获取丢包率
float getLossRate(MediaSource &sender,TrackType type) override; float getLossRate(MediaSource &sender,TrackType type) override;