Older/ToolKit/Poller/EventPoller.cpp
amass 9de3af15eb
All checks were successful
Deploy / PullDocker (push) Successful in 12s
Deploy / Build (push) Successful in 1m51s
add ZLMediaKit code for learning.
2024-09-28 23:55:00 +08:00

607 lines
18 KiB
C++
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/*
* Copyright (c) 2016 The ZLToolKit project authors. All Rights Reserved.
*
* This file is part of ZLToolKit(https://github.com/ZLMediaKit/ZLToolKit).
*
* Use of this source code is governed by MIT license that can be found in the
* LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree.
*/
#include "SelectWrap.h"
#include "EventPoller.h"
#include "Util/util.h"
#include "Util/uv_errno.h"
#include "Util/TimeTicker.h"
#include "Util/NoticeCenter.h"
#include "Network/sockutil.h"
#if defined(HAS_EPOLL)
#include <sys/epoll.h>

// EPOLLEXCLUSIVE (used to avoid the epoll thundering-herd problem) is missing
// from some system headers; fall back to a no-op flag in that case.
#if !defined(EPOLLEXCLUSIVE)
#define EPOLLEXCLUSIVE 0
#endif

// Size hint passed to epoll_create() and capacity of the event array handed
// to a single epoll_wait() call.
#define EPOLL_SIZE 1024

// Convert an EventPoller event mask into an epoll event mask.
// Edge-triggered mode is used unless Event_LT is requested.
// The whole expansion is parenthesised so it is safe inside larger expressions.
#define toEpoll(event) ((((event) & Event_Read) ? EPOLLIN : 0) \
    | (((event) & Event_Write) ? EPOLLOUT : 0) \
    | (((event) & Event_Error) ? (EPOLLHUP | EPOLLERR) : 0) \
    | (((event) & Event_LT) ? 0 : EPOLLET))

// Convert an epoll event mask back into an EventPoller event mask.
// EPOLLHUP is reported both as readable and as an error.
#define toPoller(epoll_event) ((((epoll_event) & (EPOLLIN | EPOLLRDNORM | EPOLLHUP)) ? Event_Read : 0) \
    | (((epoll_event) & (EPOLLOUT | EPOLLWRNORM)) ? Event_Write : 0) \
    | (((epoll_event) & EPOLLHUP) ? Event_Error : 0) \
    | (((epoll_event) & EPOLLERR) ? Event_Error : 0))

// Create the backend event descriptor.
#define create_event() epoll_create(EPOLL_SIZE)
#endif //HAS_EPOLL

#if defined(HAS_KQUEUE)
#include <sys/event.h>

// Capacity of the event array handed to a single kevent() call.
#define KEVENT_SIZE 1024

// Create the backend event descriptor.
#define create_event() kqueue()
#endif // HAS_KQUEUE
using namespace std;
namespace toolkit {
/// Returns the first poller owned by the EventPollerPool singleton.
EventPoller &EventPoller::Instance() {
    auto first_poller = EventPollerPool::Instance().getFirstPoller();
    return *first_poller;
}
/// Switches both ends of the internal pipe to non-blocking mode and registers
/// its read end with the poller so that onPipeEvent() runs when it is readable.
/// @throws std::runtime_error if the read end cannot be registered.
void EventPoller::addEventPipe() {
    SockUtil::setNoBlocked(_pipe.readFD());
    SockUtil::setNoBlocked(_pipe.writeFD());

    // Register the internal pipe event
    const int rc = addEvent(_pipe.readFD(), EventPoller::Event_Read, [this](int) { onPipeEvent(); });
    if (rc != -1) {
        return;
    }
    throw std::runtime_error("Add pipe fd to poller failed");
}
/// Constructs a poller: creates the epoll/kqueue descriptor (when one of those
/// backends is enabled), stores the poller name and logger, and registers the
/// internal wake-up pipe.
/// @param name  Name reported by getThreadName().
/// @throws std::runtime_error if the event descriptor cannot be created or the
///         pipe cannot be registered.
EventPoller::EventPoller(std::string name) {
#if defined(HAS_EPOLL) || defined(HAS_KQUEUE)
    _event_fd = create_event();
    if (_event_fd == -1) {
        throw runtime_error(StrPrinter << "Create event fd failed: " << get_uv_errmsg());
    }
    SockUtil::setCloExec(_event_fd);
#endif //HAS_EPOLL
    _name = std::move(name);
    try {
        _logger = Logger::Instance().shared_from_this();
        addEventPipe();
    } catch (...) {
        // The destructor does not run when the constructor throws, so the
        // event descriptor has to be released here to avoid leaking it.
#if defined(HAS_EPOLL) || defined(HAS_KQUEUE)
        close(_event_fd);
        _event_fd = -1;
#endif
        throw;
    }
}
/// Asks the loop to exit and, if a loop thread exists, waits for it and frees it.
void EventPoller::shutdown() {
    // Queue, ahead of all other tasks, a task that throws ExitException;
    // onPipeEvent() turns that exception into the exit flag.
    async_l([]() { throw ExitException(); }, false, true);

    if (!_loop_thread) {
        return;
    }

    // Guard against a crash when running as a child process
    try {
        _loop_thread->join();
    } catch (...) {
        _loop_thread->detach();
    }
    delete _loop_thread;
    _loop_thread = nullptr;
}
/// Stops the loop, closes the backend event descriptor (if any), runs the
/// tasks still queued, and logs the poller name.
EventPoller::~EventPoller() {
    shutdown();

#if defined(HAS_EPOLL) || defined(HAS_KQUEUE)
    if (_event_fd != -1) {
        close(_event_fd);
        _event_fd = -1;
    }
#endif

    // Flush what is left in the task queue before going away
    onPipeEvent(true);
    InfoL << _name;
}
/// Starts watching `fd` for the events in `event` and invokes `cb` when they fire.
/// @param fd     Descriptor to watch.
/// @param event  Bitmask of Event_Read / Event_Write / Event_Error / Event_LT.
/// @param cb     Callback receiving the triggered event mask; must not be empty.
/// @return -1 if `cb` is empty or registration fails; 0 when the request was
///         forwarded to the poller thread; otherwise the backend result
///         (epoll_ctl/kevent return value, or 0 for the select backend).
int EventPoller::addEvent(int fd, int event, PollEventCB cb) {
    TimeTicker();
    if (!cb) {
        WarnL << "PollEventCB is empty";
        return -1;
    }

    if (!isCurrentThread()) {
        // Not on the poller thread: re-run this call there as an async task
        async([this, fd, event, cb]() mutable {
            addEvent(fd, event, std::move(cb));
        });
        return 0;
    }

#if defined(HAS_EPOLL)
    struct epoll_event ev = {0};
    ev.events = toEpoll(event) ;
    ev.data.fd = fd;
    const int rc = epoll_ctl(_event_fd, EPOLL_CTL_ADD, fd, &ev);
    if (rc == -1) {
        return rc;
    }
    _event_map.emplace(fd, std::make_shared<PollEventCB>(std::move(cb)));
    return rc;
#elif defined(HAS_KQUEUE)
    struct kevent changes[2];
    int count = 0;
    if (event & Event_Read) {
        EV_SET(&changes[count++], fd, EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, nullptr);
    }
    if (event & Event_Write) {
        EV_SET(&changes[count++], fd, EVFILT_WRITE, EV_ADD | EV_CLEAR, 0, 0, nullptr);
    }
    const int rc = kevent(_event_fd, changes, count, nullptr, 0, nullptr);
    if (rc == -1) {
        return rc;
    }
    _event_map.emplace(fd, std::make_shared<PollEventCB>(std::move(cb)));
    return rc;
#else
#ifndef _WIN32
    // On win32 a socket handle is not a file descriptor, so this limit may not apply there
    if (fd >= FD_SETSIZE) {
        WarnL << "select() can not watch fd bigger than " << FD_SETSIZE;
        return -1;
    }
#endif
    auto entry = std::make_shared<Poll_Record>();
    entry->fd = fd;
    entry->event = event;
    entry->call_back = std::move(cb);
    _event_map.emplace(fd, entry);
    return 0;
#endif
}
/// Stops watching `fd`.
/// @param fd  Descriptor to remove.
/// @param cb  Optional completion callback; receives true when removal succeeded.
/// @return 0 when the request was forwarded to the poller thread; otherwise -1
///         if `fd` was not registered or the backend call failed, else the
///         backend result (0 for the select backend).
int EventPoller::delEvent(int fd, PollCompleteCB cb) {
    TimeTicker();
    if (!cb) {
        cb = [](bool success) {};
    }

    if (!isCurrentThread()) {
        // Cross-thread operation: re-run on the poller thread
        async([this, fd, cb]() mutable {
            delEvent(fd, std::move(cb));
        });
        return 0;
    }

    int result = -1;
    if (_event_map.erase(fd)) {
        // Remember the fd so events already fetched for it in the current
        // batch are skipped by the loop.
        _event_cache_expired.emplace(fd);
#if defined(HAS_EPOLL)
        result = epoll_ctl(_event_fd, EPOLL_CTL_DEL, fd, nullptr);
#elif defined(HAS_KQUEUE)
        struct kevent changes[2];
        int count = 0;
        EV_SET(&changes[count++], fd, EVFILT_READ, EV_DELETE, 0, 0, nullptr);
        EV_SET(&changes[count++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, nullptr);
        result = kevent(_event_fd, changes, count, nullptr, 0, nullptr);
#else
        result = 0;
#endif //HAS_EPOLL
    }
    cb(result != -1);
    return result;
}
/// Changes the set of events watched on `fd`.
/// @param fd     Descriptor whose registration is updated.
/// @param event  New event bitmask.
/// @param cb     Optional completion callback; receives true on success.
/// @return 0 when the request was forwarded to the poller thread; otherwise the
///         backend result (epoll_ctl/kevent return value; for the select
///         backend 0 if `fd` is registered, -1 if not).
int EventPoller::modifyEvent(int fd, int event, PollCompleteCB cb) {
    TimeTicker();
    if (!cb) {
        cb = [](bool success) {};
    }

    if (!isCurrentThread()) {
        // Not on the poller thread: re-run this call there as an async task
        async([this, fd, event, cb]() mutable {
            modifyEvent(fd, event, std::move(cb));
        });
        return 0;
    }

#if defined(HAS_EPOLL)
    struct epoll_event ev = { 0 };
    ev.events = toEpoll(event);
    ev.data.fd = fd;
    const auto rc = epoll_ctl(_event_fd, EPOLL_CTL_MOD, fd, &ev);
    cb(rc != -1);
    return rc;
#elif defined(HAS_KQUEUE)
    struct kevent changes[2];
    int count = 0;
    // Each filter is (re)added when requested and deleted otherwise
    EV_SET(&changes[count++], fd, EVFILT_READ, event & Event_Read ? EV_ADD | EV_CLEAR : EV_DELETE, 0, 0, nullptr);
    EV_SET(&changes[count++], fd, EVFILT_WRITE, event & Event_Write ? EV_ADD | EV_CLEAR : EV_DELETE, 0, 0, nullptr);
    const int rc = kevent(_event_fd, changes, count, nullptr, 0, nullptr);
    cb(rc != -1);
    return rc;
#else
    auto pos = _event_map.find(fd);
    const bool found = pos != _event_map.end();
    if (found) {
        pos->second->event = event;
    }
    cb(found);
    return found ? 0 : -1;
#endif // HAS_EPOLL
}
/// Queues `task` at the back of the task list (see async_l()).
/// @param may_sync  When true and called on the poller thread, the task runs
///                  immediately and nullptr is returned.
Task::Ptr EventPoller::async(TaskIn task, bool may_sync) {
    const bool at_front = false;
    return async_l(std::move(task), may_sync, at_front);
}
/// Queues `task` at the front of the task list (see async_l()).
/// @param may_sync  When true and called on the poller thread, the task runs
///                  immediately and nullptr is returned.
Task::Ptr EventPoller::async_first(TaskIn task, bool may_sync) {
    const bool at_front = true;
    return async_l(std::move(task), may_sync, at_front);
}
/// Runs `task` on the poller thread.
/// @param task      Work to execute.
/// @param may_sync  When true and the caller is already on the poller thread,
///                  the task is executed inline and nullptr is returned.
/// @param first     Insert at the front of the task list instead of the back.
/// @return Handle to the queued task, or nullptr when it was executed inline.
Task::Ptr EventPoller::async_l(TaskIn task, bool may_sync, bool first) {
    TimeTicker();
    if (may_sync && isCurrentThread()) {
        task();
        return nullptr;
    }

    auto queued = std::make_shared<Task>(std::move(task));
    {
        lock_guard<mutex> guard(_mtx_task);
        if (!first) {
            _list_task.emplace_back(queued);
        } else {
            _list_task.emplace_front(queued);
        }
    }

    // Write to the pipe to wake up the poller thread
    _pipe.write("", 1);
    return queued;
}
/// Returns true when no loop thread has been created, or when the caller is
/// running on the loop thread.
bool EventPoller::isCurrentThread() {
    if (!_loop_thread) {
        return true;
    }
    return this_thread::get_id() == _loop_thread->get_id();
}
inline void EventPoller::onPipeEvent(bool flush) {
char buf[1024];
int err = 0;
if (!flush) {
for (;;) {
if ((err = _pipe.read(buf, sizeof(buf))) > 0) {
// 读到管道数据,继续读,直到读空为止
continue;
}
if (err == 0 || get_uv_error(true) != UV_EAGAIN) {
// 收到eof或非EAGAIN(无更多数据)错误,说明管道无效了,重新打开管道
ErrorL << "Invalid pipe fd of event poller, reopen it";
delEvent(_pipe.readFD());
_pipe.reOpen();
addEventPipe();
}
break;
}
}
decltype(_list_task) _list_swap;
{
lock_guard<mutex> lck(_mtx_task);
_list_swap.swap(_list_task);
}
_list_swap.for_each([&](const Task::Ptr &task) {
try {
(*task)();
} catch (ExitException &) {
_exit_flag = true;
} catch (std::exception &ex) {
ErrorL << "Exception occurred when do async task: " << ex.what();
}
});
}
/// Returns the receive buffer cached for this poller, creating one when the
/// cached entry is no longer alive.
/// @param is_udp  Selects the UDP or TCP slot (forced to the same slot on
///                non-Linux platforms).
SocketRecvBuffer::Ptr EventPoller::getSharedBuffer(bool is_udp) {
#if !defined(__linux) && !defined(__linux__)
    // On non-Linux platforms TCP and UDP both use the recvfrom approach and share one buffer
    is_udp = 0;
#endif
    auto buffer = _shared_buffer[is_udp].lock();
    if (buffer) {
        return buffer;
    }
    buffer = SocketRecvBuffer::create(is_udp);
    _shared_buffer[is_udp] = buffer;
    return buffer;
}
/// Returns the id of the loop thread, or a default-constructed id when no loop
/// thread exists.
thread::id EventPoller::getThreadId() const {
    if (_loop_thread) {
        return _loop_thread->get_id();
    }
    return thread::id();
}
const std::string& EventPoller::getThreadName() const {
return _name;
}
// Per-thread weak reference to the poller whose loop runs on this thread;
// assigned in runLoop() when ref_self is true.
static thread_local std::weak_ptr<EventPoller> s_current_poller;

// static
/// Returns the poller recorded for the calling thread, or an empty pointer
/// when none is recorded or it has already been destroyed.
EventPoller::Ptr EventPoller::getCurrentPoller() {
    EventPoller::Ptr current = s_current_poller.lock();
    return current;
}
/// Runs the event loop.
/// @param blocked   When true the loop runs in the calling thread until the
///                  exit flag is set. When false a new thread is started that
///                  runs the loop, and this call returns once that thread has
///                  signalled that it started.
/// @param ref_self  When true the loop thread records this poller in the
///                  thread-local s_current_poller (see getCurrentPoller()).
void EventPoller::runLoop(bool blocked, bool ref_self) {
    if (blocked) {
        if (ref_self) {
            s_current_poller = shared_from_this();
        }
        _sem_run_started.post();
        _exit_flag = false;
        uint64_t minDelay;
#if defined(HAS_EPOLL)
        // epoll_wait() takes its timeout as an int. Passing the 64-bit delay
        // directly would truncate it: a delay of exactly 2^32 ms would become 0
        // (busy loop) and other large values could turn negative (block forever).
        // Clamp to the largest int instead; the loop simply re-evaluates the
        // delay after waking up.
        const uint64_t max_wait_ms = 0x7fffffff;
        struct epoll_event events[EPOLL_SIZE];
        while (!_exit_flag) {
            minDelay = getMinDelay();
            // A delay of 0 means "no timers pending": block until an event arrives
            int timeout_ms = -1;
            if (minDelay) {
                timeout_ms = static_cast<int>(minDelay > max_wait_ms ? max_wait_ms : minDelay);
            }
            startSleep(); // used for tracking the load of the current thread
            int ret = epoll_wait(_event_fd, events, EPOLL_SIZE, timeout_ms);
            sleepWakeUp(); // used for tracking the load of the current thread
            if (ret <= 0) {
                // timed out or interrupted
                continue;
            }
            // Fds removed by delEvent() while this batch is processed are
            // collected here so their already-fetched events are skipped
            _event_cache_expired.clear();
            for (int i = 0; i < ret; ++i) {
                struct epoll_event &ev = events[i];
                int fd = ev.data.fd;
                if (_event_cache_expired.count(fd)) {
                    // event cache refresh
                    continue;
                }
                auto it = _event_map.find(fd);
                if (it == _event_map.end()) {
                    // No callback registered for this fd: stop watching it
                    epoll_ctl(_event_fd, EPOLL_CTL_DEL, fd, nullptr);
                    continue;
                }
                // Hold a copy so the callback stays alive even if it removes itself
                auto cb = it->second;
                try {
                    (*cb)(toPoller(ev.events));
                } catch (std::exception &ex) {
                    ErrorL << "Exception occurred when do event task: " << ex.what();
                }
            }
        }
#elif defined(HAS_KQUEUE)
        struct kevent kevents[KEVENT_SIZE];
        while (!_exit_flag) {
            minDelay = getMinDelay();
            struct timespec timeout = { (long)minDelay / 1000, (long)minDelay % 1000 * 1000000 };
            startSleep();
            // A delay of 0 means "no timers pending": block until an event arrives
            int ret = kevent(_event_fd, nullptr, 0, kevents, KEVENT_SIZE, minDelay ? &timeout : nullptr);
            sleepWakeUp();
            if (ret <= 0) {
                continue;
            }
            _event_cache_expired.clear();
            for (int i = 0; i < ret; ++i) {
                auto &kev = kevents[i];
                auto fd = kev.ident;
                if (_event_cache_expired.count(fd)) {
                    // event cache refresh
                    continue;
                }
                auto it = _event_map.find(fd);
                if (it == _event_map.end()) {
                    // No callback registered for this fd: remove the filter
                    EV_SET(&kev, fd, kev.filter, EV_DELETE, 0, 0, nullptr);
                    kevent(_event_fd, &kev, 1, nullptr, 0, nullptr);
                    continue;
                }
                auto cb = it->second;
                int event = 0;
                switch (kev.filter) {
                    case EVFILT_READ: event = Event_Read; break;
                    case EVFILT_WRITE: event = Event_Write; break;
                    default: WarnL << "unknown kevent filter: " << kev.filter; break;
                }
                try {
                    (*cb)(event);
                } catch (std::exception &ex) {
                    ErrorL << "Exception occurred when do event task: " << ex.what();
                }
            }
        }
#else
        int ret, max_fd;
        FdSet set_read, set_write, set_err;
        List<Poll_Record::Ptr> callback_list;
        struct timeval tv;
        while (!_exit_flag) {
            // Timer tasks may modify _event_map
            minDelay = getMinDelay();
            tv.tv_sec = (decltype(tv.tv_sec)) (minDelay / 1000);
            tv.tv_usec = 1000 * (minDelay % 1000);
            set_read.fdZero();
            set_write.fdZero();
            set_err.fdZero();
            max_fd = 0;
            for (auto &pr : _event_map) {
                if (pr.first > max_fd) {
                    max_fd = pr.first;
                }
                if (pr.second->event & Event_Read) {
                    set_read.fdSet(pr.first); // watch for readability
                }
                if (pr.second->event & Event_Write) {
                    set_write.fdSet(pr.first); // watch for writability
                }
                if (pr.second->event & Event_Error) {
                    set_err.fdSet(pr.first); // watch for errors
                }
            }
            startSleep(); // used for tracking the load of the current thread
            // A delay of 0 means "no timers pending": block until an event arrives
            ret = zl_select(max_fd + 1, &set_read, &set_write, &set_err, minDelay ? &tv : nullptr);
            sleepWakeUp(); // used for tracking the load of the current thread
            if (ret <= 0) {
                // timed out or interrupted
                continue;
            }
            _event_cache_expired.clear();
            // Collect the event types reported by select
            for (auto &pr : _event_map) {
                int event = 0;
                if (set_read.isSet(pr.first)) {
                    event |= Event_Read;
                }
                if (set_write.isSet(pr.first)) {
                    event |= Event_Write;
                }
                if (set_err.isSet(pr.first)) {
                    event |= Event_Error;
                }
                if (event != 0) {
                    pr.second->attach = event;
                    callback_list.emplace_back(pr.second);
                }
            }
            // Callbacks run from a separate list because they may modify _event_map
            callback_list.for_each([&](Poll_Record::Ptr &record) {
                if (_event_cache_expired.count(record->fd)) {
                    // event cache refresh
                    return;
                }
                try {
                    record->call_back(record->attach);
                } catch (std::exception &ex) {
                    ErrorL << "Exception occurred when do event task: " << ex.what();
                }
            });
            callback_list.clear();
        }
#endif //HAS_EPOLL
    } else {
        _loop_thread = new thread(&EventPoller::runLoop, this, true, ref_self);
        // Wait until the new thread has entered the loop
        _sem_run_started.wait();
    }
}
uint64_t EventPoller::flushDelayTask(uint64_t now_time) {
decltype(_delay_task_map) task_copy;
task_copy.swap(_delay_task_map);
for (auto it = task_copy.begin(); it != task_copy.end() && it->first <= now_time; it = task_copy.erase(it)) {
//已到期的任务
try {
auto next_delay = (*(it->second))();
if (next_delay) {
//可重复任务,更新时间截止线
_delay_task_map.emplace(next_delay + now_time, std::move(it->second));
}
} catch (std::exception &ex) {
ErrorL << "Exception occurred when do delay task: " << ex.what();
}
}
task_copy.insert(_delay_task_map.begin(), _delay_task_map.end());
task_copy.swap(_delay_task_map);
auto it = _delay_task_map.begin();
if (it == _delay_task_map.end()) {
//没有剩余的定时器了
return 0;
}
//最近一个定时器的执行延时
return it->first - now_time;
}
/// Returns how long the loop may sleep before the next delayed task is due.
/// Tasks that are already due are executed first via flushDelayTask().
/// @return Milliseconds until the next deadline, or 0 when there is no delayed task.
uint64_t EventPoller::getMinDelay() {
    auto earliest = _delay_task_map.begin();
    if (earliest == _delay_task_map.end()) {
        // No timers left
        return 0;
    }

    auto now = getCurrentMillisecond();
    if (earliest->first <= now) {
        // Run the tasks that are due and recompute the sleep delay
        return flushDelayTask(now);
    }
    // Nothing is due yet
    return earliest->first - now;
}
/// Schedules `task` to run after `delay_ms` milliseconds on the poller thread.
/// @param delay_ms  Delay before the first run, in milliseconds.
/// @param task      Work to run; a non-zero return value re-arms it with that
///                  delay (see flushDelayTask()).
/// @return Handle to the scheduled task.
EventPoller::DelayTask::Ptr EventPoller::doDelayTask(uint64_t delay_ms, function<uint64_t()> task) {
    DelayTask::Ptr delayed = std::make_shared<DelayTask>(std::move(task));
    auto deadline = getCurrentMillisecond() + delay_ms;

    // Inserting through an async task is meant to refresh the sleep time of select or epoll
    async_first([this, deadline, delayed]() {
        _delay_task_map.emplace(deadline, delayed);
    });
    return delayed;
}
///////////////////////////////////////////////
// Pool size handed to addPoller() by the EventPollerPool constructor;
// changed through EventPollerPool::setPoolSize().
static size_t s_pool_size = 0;
// CPU-affinity flag handed to addPoller() by the EventPollerPool constructor;
// changed through EventPollerPool::enableCpuAffinity().
static bool s_enable_cpu_affinity = true;
// NOTE(review): presumably expands to the EventPollerPool::Instance() singleton
// accessor — confirm against the macro definition.
INSTANCE_IMP(EventPollerPool)
/// Returns the first executor of the pool, cast to EventPoller.
EventPoller::Ptr EventPollerPool::getFirstPoller() {
    const auto &head = _threads.front();
    return static_pointer_cast<EventPoller>(head);
}
/// Picks a poller from the pool.
/// @param prefer_current_thread  When true, and the pool-wide preference is
///        enabled, and the calling thread has a current poller, that poller is
///        returned; otherwise the result of getExecutor() is returned.
EventPoller::Ptr EventPollerPool::getPoller(bool prefer_current_thread) {
    auto current = EventPoller::getCurrentPoller();
    const bool use_current = prefer_current_thread && _prefer_current_thread && current;
    if (!use_current) {
        return static_pointer_cast<EventPoller>(getExecutor());
    }
    return current;
}
void EventPollerPool::preferCurrentThread(bool flag) {
_prefer_current_thread = flag;
}
// Event name emitted through NOTICE_EMIT by the EventPollerPool constructor.
const std::string EventPollerPool::kOnStarted = "kBroadcastEventPollerPoolStarted";
/// Creates the pollers of the pool using the file-level pool size and
/// CPU-affinity settings, emits the kOnStarted notice and logs the result.
EventPollerPool::EventPollerPool() {
    auto created = addPoller("event poller", s_pool_size, ThreadPool::PRIORITY_HIGHEST, true, s_enable_cpu_affinity);
    NOTICE_EMIT(EventPollerPoolOnStartedArgs, kOnStarted, *this, created);
    InfoL << "EventPoller created size: " << created;
}
// Stores the pool size that the EventPollerPool constructor passes to addPoller().
// Only the file-level setting is updated here.
void EventPollerPool::setPoolSize(size_t size) {
s_pool_size = size;
}
// Stores the CPU-affinity flag that the EventPollerPool constructor passes to addPoller().
// Only the file-level setting is updated here.
void EventPollerPool::enableCpuAffinity(bool enable) {
s_enable_cpu_affinity = enable;
}
} // namespace toolkit