Older/ToolKit/Util/RingBuffer.h
amass 9de3af15eb
All checks were successful
Deploy / PullDocker (push) Successful in 12s
Deploy / Build (push) Successful in 1m51s
add ZLMediaKit code for learning.
2024-09-28 23:55:00 +08:00

472 lines
14 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/*
* Copyright (c) 2016 The ZLToolKit project authors. All Rights Reserved.
*
* This file is part of ZLToolKit(https://github.com/ZLMediaKit/ZLToolKit).
*
* Use of this source code is governed by MIT license that can be found in the
* LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree.
*/
#ifndef UTIL_RINGBUFFER_H_
#define UTIL_RINGBUFFER_H_
#include <assert.h>
#include <atomic>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <condition_variable>
#include <functional>
#include "util.h"
#include "List.h"
#include "Poller/EventPoller.h"
// GOP缓存最大长度下限值
#define RING_MIN_SIZE 32
#define LOCK_GUARD(mtx) std::lock_guard<decltype(mtx)> lck(mtx)
namespace toolkit {
template <typename T>
class RingDelegate {
public:
using Ptr = std::shared_ptr<RingDelegate>;
RingDelegate() = default;
virtual ~RingDelegate() = default;
virtual void onWrite(T in, bool is_key = true) = 0;
};
template <typename T>
class _RingStorage;
template <typename T>
class _RingReaderDispatcher;
/**
* 环形缓存读取器
* 该对象的事件触发都会在绑定的poller线程中执行
* 所以把锁去掉了
* 对该对象的一切操作都应该在poller线程中执行
*/
template <typename T>
class _RingReader {
public:
using Ptr = std::shared_ptr<_RingReader>;
friend class _RingReaderDispatcher<T>;
_RingReader(std::shared_ptr<_RingStorage<T>> storage) {
_storage = std::move(storage);
setReadCB(nullptr);
setDetachCB(nullptr);
setGetInfoCB(nullptr);
setMessageCB(nullptr);
}
~_RingReader() = default;
void setReadCB(std::function<void(const T &)> cb) {
if (!cb) {
_read_cb = [](const T &) {};
} else {
_read_cb = std::move(cb);
flushGop();
}
}
void setDetachCB(std::function<void()> cb) {
_detach_cb = cb ? std::move(cb) : []() {};
}
void setGetInfoCB(std::function<Any()> cb) {
_info_cb = cb ? std::move(cb) : []() { return Any(); };
}
void setMessageCB(std::function<void(const Any &data)> cb) {
_msg_cb = cb ? std::move(cb) : [](const Any &data) {};
}
private:
void onRead(const T &data, bool /*is_key*/) { _read_cb(data); }
void onMessage(const Any &data) { _msg_cb(data); }
void onDetach() const { _detach_cb(); }
Any getInfo() { return _info_cb(); }
void flushGop() {
if (!_storage) {
return;
}
_storage->getCache().for_each([this](const List<std::pair<bool, T>> &lst) {
lst.for_each([this](const std::pair<bool, T> &pr) { onRead(pr.second, pr.first); });
});
}
private:
std::shared_ptr<_RingStorage<T>> _storage;
std::function<void(void)> _detach_cb;
std::function<void(const T &)> _read_cb;
std::function<Any()> _info_cb;
std::function<void(const Any &data)> _msg_cb;
};
template <typename T>
class _RingStorage {
public:
using Ptr = std::shared_ptr<_RingStorage>;
using GopType = List<List<std::pair<bool, T>>>;
_RingStorage(size_t max_size, size_t max_gop_size) {
// gop缓存个数不能小于32
if (max_size < RING_MIN_SIZE) {
max_size = RING_MIN_SIZE;
}
_max_size = max_size;
_max_gop_size = max_gop_size;
clearCache();
}
~_RingStorage() = default;
/**
* 写入环形缓存数据
* @param in 数据
* @param is_key 是否为关键帧
* @return 是否触发重置环形缓存大小
*/
void write(T in, bool is_key = true) {
if (is_key) {
_have_idr = true;
_started = true;
if (!_data_cache.back().empty()) {
//当前gop列队还没收到任意缓存
_data_cache.emplace_back();
}
if (_data_cache.size() > _max_gop_size) {
// GOP个数超过限制那么移除最早的GOP
popFrontGop();
}
}
if (!_have_idr && _started) {
//缓存中没有关键帧那么gop缓存无效
return;
}
_data_cache.back().emplace_back(std::make_pair(is_key, std::move(in)));
if (++_size > _max_size) {
// GOP缓存溢出
while (_data_cache.size() > 1) {
//先尝试清除老的GOP缓存
popFrontGop();
}
if (_size > _max_size) {
//还是大于最大缓冲限制那么清空所有GOP
clearCache();
}
}
}
Ptr clone() const {
Ptr ret(new _RingStorage());
ret->_size = _size;
ret->_have_idr = _have_idr;
ret->_started = _started;
ret->_max_size = _max_size;
ret->_max_gop_size = _max_gop_size;
ret->_data_cache = _data_cache;
return ret;
}
const GopType &getCache() const { return _data_cache; }
void clearCache() {
_size = 0;
_have_idr = false;
_data_cache.clear();
_data_cache.emplace_back();
}
private:
_RingStorage() = default;
void popFrontGop() {
if (!_data_cache.empty()) {
_size -= _data_cache.front().size();
_data_cache.pop_front();
if (_data_cache.empty()) {
_data_cache.emplace_back();
}
}
}
private:
bool _started = false;
bool _have_idr;
size_t _size;
size_t _max_size;
size_t _max_gop_size;
GopType _data_cache;
};
template <typename T>
class RingBuffer;
/**
* 环形缓存事件派发器只能一个poller线程操作它
* @tparam T
*/
template <typename T>
class _RingReaderDispatcher : public std::enable_shared_from_this<_RingReaderDispatcher<T>> {
public:
using Ptr = std::shared_ptr<_RingReaderDispatcher>;
using RingReader = _RingReader<T>;
using RingStorage = _RingStorage<T>;
using onChangeInfoCB = std::function<Any(Any &&info)>;
friend class RingBuffer<T>;
~_RingReaderDispatcher() {
decltype(_reader_map) reader_map;
reader_map.swap(_reader_map);
for (auto &pr : reader_map) {
auto reader = pr.second.lock();
if (reader) {
reader->onDetach();
}
}
}
private:
_RingReaderDispatcher(
const typename RingStorage::Ptr &storage, std::function<void(int, bool)> onSizeChanged) {
_reader_size = 0;
_storage = storage;
_on_size_changed = std::move(onSizeChanged);
assert(_on_size_changed);
}
void write(T in, bool is_key = true) {
for (auto it = _reader_map.begin(); it != _reader_map.end();) {
auto reader = it->second.lock();
if (!reader) {
it = _reader_map.erase(it);
--_reader_size;
onSizeChanged(false);
continue;
}
reader->onRead(in, is_key);
++it;
}
_storage->write(std::move(in), is_key);
}
void sendMessage(const Any &data) {
for (auto it = _reader_map.begin(); it != _reader_map.end();) {
auto reader = it->second.lock();
if (!reader) {
it = _reader_map.erase(it);
--_reader_size;
onSizeChanged(false);
continue;
}
reader->onMessage(data);
++it;
}
}
std::shared_ptr<RingReader> attach(const EventPoller::Ptr &poller, bool use_cache) {
if (!poller->isCurrentThread()) {
throw std::runtime_error("You can attach RingBuffer only in it's poller thread");
}
std::weak_ptr<_RingReaderDispatcher> weak_self = this->shared_from_this();
auto on_dealloc = [weak_self, poller](RingReader *ptr) {
poller->async([weak_self, ptr]() {
auto strong_self = weak_self.lock();
if (strong_self && strong_self->_reader_map.erase(ptr)) {
--strong_self->_reader_size;
strong_self->onSizeChanged(false);
}
delete ptr;
});
};
std::shared_ptr<RingReader> reader(new RingReader(use_cache ? _storage : nullptr), on_dealloc);
_reader_map[reader.get()] = reader;
++_reader_size;
onSizeChanged(true);
return reader;
}
void onSizeChanged(bool add_flag) { _on_size_changed(_reader_size, add_flag); }
void clearCache() {
if (_reader_size == 0) {
_storage->clearCache();
}
}
std::list<Any> getInfoList(const onChangeInfoCB &on_change) {
std::list<Any> ret;
for (auto &pr : _reader_map) {
auto reader = pr.second.lock();
if (!reader) {
continue;
}
auto info = reader->getInfo();
if (!info) {
continue;
}
ret.emplace_back(on_change(std::move(info)));
}
return ret;
}
private:
std::atomic_int _reader_size;
std::function<void(int, bool)> _on_size_changed;
typename RingStorage::Ptr _storage;
std::unordered_map<void *, std::weak_ptr<RingReader>> _reader_map;
};
template <typename T>
class RingBuffer : public std::enable_shared_from_this<RingBuffer<T>> {
public:
using Ptr = std::shared_ptr<RingBuffer>;
using RingReader = _RingReader<T>;
using RingStorage = _RingStorage<T>;
using RingReaderDispatcher = _RingReaderDispatcher<T>;
using onReaderChanged = std::function<void(int size)>;
using onGetInfoCB = std::function<void(std::list<Any> &info_list)>;
RingBuffer(size_t max_size = 1024, onReaderChanged cb = nullptr, size_t max_gop_size = 1) {
_storage = std::make_shared<RingStorage>(max_size, max_gop_size);
_on_reader_changed = cb ? std::move(cb) : [](int size) {};
//先触发无人观看
_on_reader_changed(0);
}
~RingBuffer() = default;
void write(T in, bool is_key = true) {
if (_delegate) {
_delegate->onWrite(std::move(in), is_key);
return;
}
LOCK_GUARD(_mtx_map);
for (auto &pr : _dispatcher_map) {
auto &second = pr.second;
//切换线程后触发onRead事件
pr.first->async([second, in, is_key]() mutable { second->write(std::move(in), is_key); }, false);
}
_storage->write(std::move(in), is_key);
}
void sendMessage(const Any &data) {
LOCK_GUARD(_mtx_map);
for (auto &pr : _dispatcher_map) {
auto &second = pr.second;
// 切换线程后触发sendMessage
pr.first->async([second, data]() { second->sendMessage(data); }, false);
}
}
void setDelegate(const typename RingDelegate<T>::Ptr &delegate) { _delegate = delegate; }
std::shared_ptr<RingReader> attach(const EventPoller::Ptr &poller, bool use_cache = true) {
typename RingReaderDispatcher::Ptr dispatcher;
{
LOCK_GUARD(_mtx_map);
auto &ref = _dispatcher_map[poller];
if (!ref) {
std::weak_ptr<RingBuffer> weak_self = this->shared_from_this();
auto onSizeChanged = [weak_self, poller](int size, bool add_flag) {
if (auto strong_self = weak_self.lock()) {
strong_self->onSizeChanged(poller, size, add_flag);
}
};
auto onDealloc = [poller](RingReaderDispatcher *ptr) { poller->async([ptr]() { delete ptr; }); };
ref.reset(new RingReaderDispatcher(_storage->clone(), std::move(onSizeChanged)), std::move(onDealloc));
}
dispatcher = ref;
}
return dispatcher->attach(poller, use_cache);
}
int readerCount() { return _total_count; }
void clearCache() {
LOCK_GUARD(_mtx_map);
_storage->clearCache();
for (auto &pr : _dispatcher_map) {
auto &second = pr.second;
//切换线程后清空缓存
pr.first->async([second]() { second->clearCache(); }, false);
}
}
void getInfoList(const onGetInfoCB &cb, const typename RingReaderDispatcher::onChangeInfoCB &on_change = nullptr) {
if (!cb) {
return;
}
if (!on_change) {
const_cast<typename RingReaderDispatcher::onChangeInfoCB &>(on_change) = [](Any &&info) { return std::move(info); };
}
LOCK_GUARD(_mtx_map);
auto info_vec = std::make_shared<std::vector<std::list<Any>>>();
// 1、最少确保一个元素
info_vec->resize(_dispatcher_map.empty() ? 1 : _dispatcher_map.size());
std::shared_ptr<void> on_finished(nullptr, [cb, info_vec](void *) mutable {
// 2、防止这里为空
auto &lst = *info_vec->begin();
for (auto &item : *info_vec) {
if (&lst != &item) {
lst.insert(lst.end(), item.begin(), item.end());
}
}
cb(lst);
});
auto i = 0U;
for (auto &pr : _dispatcher_map) {
auto &second = pr.second;
pr.first->async([second, info_vec, on_finished, i, on_change]() { (*info_vec)[i] = second->getInfoList(on_change); });
++i;
}
}
private:
void onSizeChanged(const EventPoller::Ptr &poller, int size, bool add_flag) {
if (size == 0) {
LOCK_GUARD(_mtx_map);
_dispatcher_map.erase(poller);
}
if (add_flag) {
++_total_count;
} else {
--_total_count;
}
_on_reader_changed(_total_count);
}
private:
struct HashOfPtr {
std::size_t operator()(const EventPoller::Ptr &key) const { return (std::size_t)key.get(); }
};
private:
std::mutex _mtx_map;
std::atomic_int _total_count { 0 };
typename RingStorage::Ptr _storage;
typename RingDelegate<T>::Ptr _delegate;
onReaderChanged _on_reader_changed;
std::unordered_map<EventPoller::Ptr, typename RingReaderDispatcher::Ptr, HashOfPtr> _dispatcher_map;
};
} /* namespace toolkit */
#endif /* UTIL_RINGBUFFER_H_ */