Older/ToolKit/Thread/TaskExecutor.h
amass 9de3af15eb
All checks were successful
Deploy / PullDocker (push) Successful in 12s
Deploy / Build (push) Successful in 1m51s
add ZLMediaKit code for learning.
2024-09-28 23:55:00 +08:00

254 lines
5.9 KiB
C++
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/*
* Copyright (c) 2016 The ZLToolKit project authors. All Rights Reserved.
*
* This file is part of ZLToolKit(https://github.com/ZLMediaKit/ZLToolKit).
*
* Use of this source code is governed by MIT license that can be found in the
* LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree.
*/
#ifndef ZLTOOLKIT_TASKEXECUTOR_H
#define ZLTOOLKIT_TASKEXECUTOR_H
#include <mutex>
#include <memory>
#include <functional>
#include "Util/List.h"
#include "Util/util.h"
namespace toolkit {
/**
* cpu负载计算器
*/
class ThreadLoadCounter {
public:
/**
* 构造函数
* @param max_size 统计样本数量
* @param max_usec 统计时间窗口,亦即最近{max_usec}的cpu负载率
*/
ThreadLoadCounter(uint64_t max_size, uint64_t max_usec);
~ThreadLoadCounter() = default;
/**
* 线程进入休眠
*/
void startSleep();
/**
* 休眠唤醒,结束休眠
*/
void sleepWakeUp();
/**
* 返回当前线程cpu使用率范围为 0 ~ 100
* @return 当前线程cpu使用率
*/
int load();
private:
struct TimeRecord {
TimeRecord(uint64_t tm, bool slp) {
_time = tm;
_sleep = slp;
}
bool _sleep;
uint64_t _time;
};
private:
bool _sleeping = true;
uint64_t _last_sleep_time;
uint64_t _last_wake_time;
uint64_t _max_size;
uint64_t _max_usec;
std::mutex _mtx;
List<TimeRecord> _time_list;
};
class TaskCancelable : public noncopyable {
public:
TaskCancelable() = default;
virtual ~TaskCancelable() = default;
virtual void cancel() = 0;
};
template<class R, class... ArgTypes>
class TaskCancelableImp;
template<class R, class... ArgTypes>
class TaskCancelableImp<R(ArgTypes...)> : public TaskCancelable {
public:
using Ptr = std::shared_ptr<TaskCancelableImp>;
using func_type = std::function<R(ArgTypes...)>;
~TaskCancelableImp() = default;
template<typename FUNC>
TaskCancelableImp(FUNC &&task) {
_strongTask = std::make_shared<func_type>(std::forward<FUNC>(task));
_weakTask = _strongTask;
}
void cancel() override {
_strongTask = nullptr;
}
operator bool() {
return _strongTask && *_strongTask;
}
void operator=(std::nullptr_t) {
_strongTask = nullptr;
}
R operator()(ArgTypes ...args) const {
auto strongTask = _weakTask.lock();
if (strongTask && *strongTask) {
return (*strongTask)(std::forward<ArgTypes>(args)...);
}
return defaultValue<R>();
}
template<typename T>
static typename std::enable_if<std::is_void<T>::value, void>::type
defaultValue() {}
template<typename T>
static typename std::enable_if<std::is_pointer<T>::value, T>::type
defaultValue() {
return nullptr;
}
template<typename T>
static typename std::enable_if<std::is_integral<T>::value, T>::type
defaultValue() {
return 0;
}
protected:
std::weak_ptr<func_type> _weakTask;
std::shared_ptr<func_type> _strongTask;
};
using TaskIn = std::function<void()>;
using Task = TaskCancelableImp<void()>;
class TaskExecutorInterface {
public:
TaskExecutorInterface() = default;
virtual ~TaskExecutorInterface() = default;
/**
* 异步执行任务
* @param task 任务
* @param may_sync 是否允许同步执行该任务
* @return 任务是否添加成功
*/
virtual Task::Ptr async(TaskIn task, bool may_sync = true) = 0;
/**
* 最高优先级方式异步执行任务
* @param task 任务
* @param may_sync 是否允许同步执行该任务
* @return 任务是否添加成功
*/
virtual Task::Ptr async_first(TaskIn task, bool may_sync = true);
/**
* 同步执行任务
* @param task
* @return
*/
void sync(const TaskIn &task);
/**
* 最高优先级方式同步执行任务
* @param task
* @return
*/
void sync_first(const TaskIn &task);
};
/**
* 任务执行器
*/
class TaskExecutor : public ThreadLoadCounter, public TaskExecutorInterface {
public:
using Ptr = std::shared_ptr<TaskExecutor>;
/**
* 构造函数
* @param max_size cpu负载统计样本数
* @param max_usec cpu负载统计时间窗口大小
*/
TaskExecutor(uint64_t max_size = 32, uint64_t max_usec = 2 * 1000 * 1000);
~TaskExecutor() = default;
};
class TaskExecutorGetter {
public:
using Ptr = std::shared_ptr<TaskExecutorGetter>;
virtual ~TaskExecutorGetter() = default;
/**
* 获取任务执行器
* @return 任务执行器
*/
virtual TaskExecutor::Ptr getExecutor() = 0;
/**
* 获取执行器个数
*/
virtual size_t getExecutorSize() const = 0;
};
class TaskExecutorGetterImp : public TaskExecutorGetter {
public:
TaskExecutorGetterImp() = default;
~TaskExecutorGetterImp() = default;
/**
* 根据线程负载情况,获取最空闲的任务执行器
* @return 任务执行器
*/
TaskExecutor::Ptr getExecutor() override;
/**
* 获取所有线程的负载率
* @return 所有线程的负载率
*/
std::vector<int> getExecutorLoad();
/**
* 获取所有线程任务执行延时,单位毫秒
* 通过此函数也可以大概知道线程负载情况
* @return
*/
void getExecutorDelay(const std::function<void(const std::vector<int> &)> &callback);
/**
* 遍历所有线程
*/
void for_each(const std::function<void(const TaskExecutor::Ptr &)> &cb);
/**
* 获取线程数
*/
size_t getExecutorSize() const override;
protected:
size_t addPoller(const std::string &name, size_t size, int priority, bool register_thread, bool enable_cpu_affinity = true);
protected:
size_t _thread_pos = 0;
std::vector<TaskExecutor::Ptr> _threads;
};
}//toolkit
#endif //ZLTOOLKIT_TASKEXECUTOR_H