FTXUI/include/ftxui/component/receiver.hpp

146 lines
3.6 KiB
C++
Raw Permalink Normal View History

2023-08-19 19:56:36 +08:00
// Copyright 2020 Arthur Sonzogni. All rights reserved.
// Use of this source code is governed by the MIT license that can be found in
// the LICENSE file.
#ifndef FTXUI_COMPONENT_RECEIVER_HPP_
#define FTXUI_COMPONENT_RECEIVER_HPP_
2023-05-02 19:32:37 +08:00
#include <algorithm> // for copy, max
2021-05-15 04:00:49 +08:00
#include <atomic> // for atomic, __atomic_base
2021-05-02 02:40:35 +08:00
#include <condition_variable> // for condition_variable
#include <memory> // for unique_ptr, make_unique
#include <mutex> // for mutex, unique_lock
#include <queue> // for queue
#include <utility> // for move
namespace ftxui {

// Multi-producer / single-queue message channel.
//
// Usage:
//
// Initialization:
// ---------------
//
// auto receiver = MakeReceiver<std::string>();
// auto sender_1 = receiver->MakeSender();
// auto sender_2 = receiver->MakeSender();
//
// Then move the senders elsewhere, potentially in a different thread.
//
// On the producer side:
// ----------------------
// [thread 1] sender_1->Send("hello");
// [thread 2] sender_2->Send("world");
//
// On the consumer side:
// ---------------------
// std::string message;
// while (receiver->Receive(&message))  // True while a message was delivered.
//   print(message);
//
// Receiver::Receive() returns false once there are no more senders and every
// queued message has been consumed.
//
// Lifetime: a sender keeps a raw pointer to its receiver, so every sender must
// be destroyed before the receiver it was created from.

// clang-format off
template<class T> class SenderImpl;
template<class T> class ReceiverImpl;

template<class T> using Sender = std::unique_ptr<SenderImpl<T>>;
template<class T> using Receiver = std::unique_ptr<ReceiverImpl<T>>;
template<class T> Receiver<T> MakeReceiver();
// clang-format on

// ---- Implementation part ----

// Producer endpoint of the channel. Instances are created only by
// ReceiverImpl::MakeSender() (or Clone()) and are neither copyable nor movable;
// they are handed around through the owning `Sender<T>` unique_ptr.
template <class T>
class SenderImpl {
 public:
  SenderImpl(const SenderImpl&) = delete;
  SenderImpl(SenderImpl&&) = delete;
  SenderImpl& operator=(const SenderImpl&) = delete;
  SenderImpl& operator=(SenderImpl&&) = delete;

  // Enqueue `t` on the receiver this sender was created from.
  void Send(T t) { receiver_->Receive(std::move(t)); }

  // Unregisters this sender from the receiver, which wakes a blocked consumer
  // so it can notice when the last producer has gone away.
  ~SenderImpl() { receiver_->ReleaseSender(); }

  // Create another, independent sender attached to the same receiver.
  Sender<T> Clone() { return receiver_->MakeSender(); }

 private:
  friend class ReceiverImpl<T>;
  explicit SenderImpl(ReceiverImpl<T>* consumer) : receiver_(consumer) {}

  ReceiverImpl<T>* receiver_;  // Not owned.
};

// Consumer endpoint of the channel. It owns the message queue and counts the
// senders currently attached to it. `queue_` and `senders_` are only modified
// while `mutex_` is held.
template <class T>
class ReceiverImpl {
 public:
  // Create a new sender attached to this receiver.
  Sender<T> MakeSender() {
    std::unique_lock<std::mutex> lock(mutex_);
    // SenderImpl's constructor is private (this class is a friend), hence the
    // explicit `new` instead of std::make_unique. Allocate before counting so
    // that a failed allocation does not leave a phantom sender registered.
    Sender<T> sender(new SenderImpl<T>(this));
    senders_++;
    return sender;
  }

  ReceiverImpl() = default;

  // Blocks until a message is available or no sender is left.
  //
  // Returns true and moves the oldest message into `*t` when one was
  // delivered. Returns false, leaving `*t` untouched, when the queue is empty
  // and there is no sender left to fill it.
  bool Receive(T* t) {
    std::unique_lock<std::mutex> lock(mutex_);
    // The predicate is evaluated with the mutex held, and both Receive(T) and
    // ReleaseSender() change the state under the same mutex. A wake-up can
    // therefore never slip in between the check and the wait.
    notifier_.wait(lock, [this] { return !queue_.empty() || senders_ == 0; });
    if (queue_.empty()) {
      return false;
    }
    *t = std::move(queue_.front());
    queue_.pop();
    return true;
  }

  // Same as Receive(), but never blocks: returns false right away when no
  // message is pending.
  bool ReceiveNonBlocking(T* t) {
    std::unique_lock<std::mutex> lock(mutex_);
    if (queue_.empty()) {
      return false;
    }
    // Move rather than copy: the element is popped right after, and moving
    // also makes this usable with move-only message types.
    *t = std::move(queue_.front());
    queue_.pop();
    return true;
  }

  // True when at least one message is waiting in the queue.
  bool HasPending() {
    std::unique_lock<std::mutex> lock(mutex_);
    return !queue_.empty();
  }

  // True when the queue is empty and no sender is attached anymore.
  bool HasQuitted() {
    std::unique_lock<std::mutex> lock(mutex_);
    return queue_.empty() && !senders_;
  }

 private:
  friend class SenderImpl<T>;

  // Called by SenderImpl::Send(): enqueue `t` and wake up one waiting consumer.
  void Receive(T t) {
    {
      std::unique_lock<std::mutex> lock(mutex_);
      queue_.push(std::move(t));
    }
    notifier_.notify_one();
  }

  // Called by ~SenderImpl(): unregister one sender and wake up the consumers.
  void ReleaseSender() {
    std::unique_lock<std::mutex> lock(mutex_);
    senders_--;
    // Notify while still holding the mutex. A woken consumer cannot leave
    // Receive() until the lock is released, so the receiver cannot be
    // destroyed while its condition variable is still being signalled.
    notifier_.notify_all();
  }

  std::mutex mutex_;
  std::queue<T> queue_;
  std::condition_variable notifier_;
  std::atomic<int> senders_{0};
};

// Create a receiver with an empty queue and no sender attached.
template <class T>
Receiver<T> MakeReceiver() {
  return std::make_unique<ReceiverImpl<T>>();
}

}  // namespace ftxui
#endif // FTXUI_COMPONENT_RECEIVER_HPP_