Switch name Producer/Consumer -> Sender/Receiver

The producer/consumer was created for:
https://github.com/ArthurSonzogni/FTXUI/pull/11

This patch makes rename everything from Producer/Consumer toward
Sender/Receiver.
This commit is contained in:
ArthurSonzogni 2020-03-25 00:07:41 +01:00
parent 09a1b16613
commit 0a7b556a12
7 changed files with 128 additions and 126 deletions

View File

@ -73,7 +73,7 @@ add_library(component
include/ftxui/component/menu.hpp include/ftxui/component/menu.hpp
include/ftxui/component/radiobox.hpp include/ftxui/component/radiobox.hpp
include/ftxui/component/screen_interactive.hpp include/ftxui/component/screen_interactive.hpp
include/ftxui/component/producer_consumer.hpp include/ftxui/component/receiver.hpp
include/ftxui/component/toggle.hpp include/ftxui/component/toggle.hpp
) )

View File

@ -2,10 +2,10 @@
#define FTXUI_COMPONENT_EVENT_HPP #define FTXUI_COMPONENT_EVENT_HPP
#include <array> #include <array>
#include <ftxui/component/receiver.hpp>
#include <functional> #include <functional>
#include <string> #include <string>
#include <vector> #include <vector>
#include <ftxui/component/producer_consumer.hpp>
namespace ftxui { namespace ftxui {
@ -21,7 +21,7 @@ struct Event {
static Event Character(const std::string&); static Event Character(const std::string&);
static Event Special(const std::string&); static Event Special(const std::string&);
static void Convert(Consumer<char>& in, Producer<Event>& out, char c); static void Convert(Receiver<char>& in, Sender<Event>& out, char c);
// --- Arrow --- // --- Arrow ---
static Event ArrowLeft; static Event ArrowLeft;

View File

@ -1,101 +0,0 @@
#ifndef FTXUI_COMPONENTS_CONSUMER_PRODUCER_H_
#define FTXUI_COMPONENTS_CONSUMER_PRODUCER_H_
#include <atomic>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
namespace ftxui {
// Usage:
//
// Initialization:
// ---------------
//
// auto consumer = MakeConsumer<std:string>();
// auto producer_1 = consumer.MakeProducer();
// auto producer_2 = consumer.MakeProducer();
//
// Then move one producers elsewhere, potentially in a different thread.
// ----------------------
// [thread 1] producer_1->Send("hello");
// [thread 2] producer_2->Send("world");
//
// On the consumer side:
// ---------------------
// char c;
// while(consumer_->Receive(&c)) // Return true as long as there is a producer.
// print(c)
//
// Consumer::Receive returns true when the last Producer is released.
// clang-format off
template<class T> class ProducerImpl;
template<class T> class ConsumerImpl;
template<class T> using Producer = std::unique_ptr<ProducerImpl<T>>;
template<class T> using Consumer = std::unique_ptr<ConsumerImpl<T>>;
template<class T> Consumer<T> MakeConsumer();
// clang-format on
// ---- Implementation part ----
template <class T>
class ProducerImpl {
public:
void Send(T t) { consumer_->Receive(std::move(t)); }
~ProducerImpl() { consumer_->producers_--; }
private:
friend class ConsumerImpl<T>;
ProducerImpl(ConsumerImpl<T>* consumer) : consumer_(consumer) {}
ConsumerImpl<T>* consumer_;
};
template <class T>
class ConsumerImpl {
public:
Producer<T> MakeProducer() {
producers_++;
return std::unique_ptr<ProducerImpl<T>>(new ProducerImpl<T>(this));
}
bool Receive(T* t) {
while (producers_) {
std::unique_lock<std::mutex> lock(mutex_);
while (queue_.empty())
notifier_.wait(lock);
if (queue_.empty())
continue;
*t = std::move(queue_.front());
queue_.pop();
return true;
}
return false;
}
private:
friend class ProducerImpl<T>;
void Receive(T t) {
std::unique_lock<std::mutex> lock(mutex_);
queue_.push(std::move(t));
notifier_.notify_one();
}
std::mutex mutex_;
std::queue<T> queue_;
std::condition_variable notifier_;
std::atomic<int> producers_ = 0;
};
template <class T>
Consumer<T> MakeConsumer() {
return std::make_unique<ConsumerImpl<T>>();
}
} // namespace ftxui
#endif // FTXUI_COMPONENTS_CONSUMER_PRODUCER_H_

View File

@ -0,0 +1,103 @@
#ifndef FTXUI_COMPONENT_RECEIVER_HPP_
#define FTXUI_COMPONENT_RECEIVER_HPP_
#include <atomic>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
namespace ftxui {
// Usage:
//
// Initialization:
// ---------------
//
// auto receiver = MakeReceiver<std:string>();
// auto sender_1= receiver.MakeSender();
// auto sender_2 = receiver.MakeSender();
//
// Then move the senders elsewhere, potentially in a different thread.
//
// On the producer side:
// ----------------------
// [thread 1] sender_1->Send("hello");
// [thread 2] sender_2->Send("world");
//
// On the consumer side:
// ---------------------
// char c;
// while(receiver->Receive(&c)) // Return true as long as there is a producer.
// print(c)
//
// Receiver::Receive() returns true when there are no more senders.
// clang-format off
template<class T> class SenderImpl;
template<class T> class ReceiverImpl;
template<class T> using Sender = std::unique_ptr<SenderImpl<T>>;
template<class T> using Receiver = std::unique_ptr<ReceiverImpl<T>>;
template<class T> Receiver<T> MakeReceiver();
// clang-format on
// ---- Implementation part ----
template <class T>
class SenderImpl {
public:
void Send(T t) { sender_->Receive(std::move(t)); }
~SenderImpl() { sender_->senders_--; }
private:
friend class ReceiverImpl<T>;
SenderImpl(ReceiverImpl<T>* consumer) : sender_(consumer) {}
ReceiverImpl<T>* sender_;
};
template <class T>
class ReceiverImpl {
public:
Sender<T> MakeSender() {
senders_++;
return std::unique_ptr<SenderImpl<T>>(new SenderImpl<T>(this));
}
bool Receive(T* t) {
while (senders_) {
std::unique_lock<std::mutex> lock(mutex_);
while (queue_.empty())
notifier_.wait(lock);
if (queue_.empty())
continue;
*t = std::move(queue_.front());
queue_.pop();
return true;
}
return false;
}
private:
friend class SenderImpl<T>;
void Receive(T t) {
std::unique_lock<std::mutex> lock(mutex_);
queue_.push(std::move(t));
notifier_.notify_one();
}
std::mutex mutex_;
std::queue<T> queue_;
std::condition_variable notifier_;
std::atomic<int> senders_ = 0;
};
template <class T>
Receiver<T> MakeReceiver() {
return std::make_unique<ReceiverImpl<T>>();
}
} // namespace ftxui
#endif // FTXUI_COMPONENT_RECEIVER_HPP_

View File

@ -3,6 +3,7 @@
#include <atomic> #include <atomic>
#include <condition_variable> #include <condition_variable>
#include <ftxui/component/receiver.hpp>
#include <functional> #include <functional>
#include <memory> #include <memory>
#include <mutex> #include <mutex>
@ -10,7 +11,6 @@
#include "ftxui/component/event.hpp" #include "ftxui/component/event.hpp"
#include "ftxui/screen/screen.hpp" #include "ftxui/screen/screen.hpp"
#include <ftxui/component/producer_consumer.hpp>
namespace ftxui { namespace ftxui {
class Component; class Component;
@ -41,13 +41,13 @@ class ScreenInteractive : public Screen {
Dimension dimension_ = Dimension::Fixed; Dimension dimension_ = Dimension::Fixed;
ScreenInteractive(int dimx, int dimy, Dimension dimension); ScreenInteractive(int dimx, int dimy, Dimension dimension);
Producer<Event> event_producer_; Sender<Event> event_sender_;
Consumer<Event> event_consumer_; Receiver<Event> event_receiver_;
std::string set_cursor_position; std::string set_cursor_position;
std::string reset_cursor_position; std::string reset_cursor_position;
std::atomic<bool>quit_ = false; std::atomic<bool> quit_ = false;
}; };
} // namespace ftxui } // namespace ftxui

View File

@ -36,7 +36,7 @@ Event Event::Special(const std::string& input) {
return event; return event;
} }
void ParseUTF8(Consumer<char>& in, Producer<Event>& out, std::string& input) { void ParseUTF8(Receiver<char>& in, Sender<Event>& out, std::string& input) {
char c; char c;
char mask = 0b11000000; char mask = 0b11000000;
for (int i = 0; i < 3; ++i) { for (int i = 0; i < 3; ++i) {
@ -50,7 +50,7 @@ void ParseUTF8(Consumer<char>& in, Producer<Event>& out, std::string& input) {
out->Send(Event::Character(input)); out->Send(Event::Character(input));
} }
void ParseCSI(Consumer<char>& in, Producer<Event>& out, std::string& input) { void ParseCSI(Receiver<char>& in, Sender<Event>& out, std::string& input) {
char c; char c;
while (1) { while (1) {
if (!in->Receive(&c)) if (!in->Receive(&c))
@ -72,7 +72,7 @@ void ParseCSI(Consumer<char>& in, Producer<Event>& out, std::string& input) {
} }
} }
void ParseDCS(Consumer<char>& in, Producer<Event>& out, std::string& input) { void ParseDCS(Receiver<char>& in, Sender<Event>& out, std::string& input) {
char c; char c;
// Parse until the string terminator ST. // Parse until the string terminator ST.
while (1) { while (1) {
@ -90,7 +90,7 @@ void ParseDCS(Consumer<char>& in, Producer<Event>& out, std::string& input) {
} }
} }
void ParseOSC(Consumer<char>& in, Producer<Event>& out, std::string& input) { void ParseOSC(Receiver<char>& in, Sender<Event>& out, std::string& input) {
char c; char c;
// Parse until the string terminator ST. // Parse until the string terminator ST.
while (1) { while (1) {
@ -108,7 +108,7 @@ void ParseOSC(Consumer<char>& in, Producer<Event>& out, std::string& input) {
} }
} }
void ParseESC(Consumer<char>& in, Producer<Event>& out, std::string& input) { void ParseESC(Receiver<char>& in, Sender<Event>& out, std::string& input) {
char c; char c;
if (!in->Receive(&c)) if (!in->Receive(&c))
return; return;
@ -129,7 +129,7 @@ void ParseESC(Consumer<char>& in, Producer<Event>& out, std::string& input) {
} }
// static // static
void Event::Convert(Consumer<char>& in, Producer<Event>& out, char c) { void Event::Convert(Receiver<char>& in, Sender<Event>& out, char c) {
std::string input; std::string input;
input += c; input += c;

View File

@ -58,8 +58,8 @@ void OnResize(int /* signal */) {
ScreenInteractive::ScreenInteractive(int dimx, int dimy, Dimension dimension) ScreenInteractive::ScreenInteractive(int dimx, int dimy, Dimension dimension)
: Screen(dimx, dimy), dimension_(dimension) { : Screen(dimx, dimy), dimension_(dimension) {
event_consumer_ = MakeConsumer<Event>(); event_receiver_ = MakeReceiver<Event>();
event_producer_ = event_consumer_->MakeProducer(); event_sender_ = event_receiver_->MakeSender();
} }
ScreenInteractive::~ScreenInteractive() {} ScreenInteractive::~ScreenInteractive() {}
@ -85,7 +85,7 @@ ScreenInteractive ScreenInteractive::FitComponent() {
} }
void ScreenInteractive::PostEvent(Event event) { void ScreenInteractive::PostEvent(Event event) {
event_producer_->Send(event); event_sender_->Send(event);
} }
void ScreenInteractive::Loop(Component* component) { void ScreenInteractive::Loop(Component* component) {
@ -140,25 +140,25 @@ void ScreenInteractive::Loop(Component* component) {
std::cout << std::endl; std::cout << std::endl;
}); });
auto char_consumer = MakeConsumer<char>(); auto char_receiver = MakeReceiver<char>();
// Spawn a thread to produce char. // Spawn a thread to produce char.
auto char_producer = char_consumer->MakeProducer(); auto char_sender = char_receiver->MakeSender();
std::thread read_char([&] { std::thread read_char([&] {
// TODO(arthursonzogni): Use a timeout so that it doesn't block even if the // TODO(arthursonzogni): Use a timeout so that it doesn't block even if the
// user doesn't generate new chars. // user doesn't generate new chars.
while (!quit_) while (!quit_)
char_producer->Send((char)getchar()); char_sender->Send((char)getchar());
char_producer.reset(); char_sender.reset();
}); });
// Spawn a thread producing events and consumer chars. // Spawn a thread producing events and consumer chars.
auto event_producer = event_consumer_->MakeProducer(); auto event_sender = event_receiver_->MakeSender();
std::thread convert_char_to_event([&] { std::thread convert_char_to_event([&] {
char c; char c;
while (char_consumer->Receive(&c)) while (char_receiver->Receive(&c))
Event::Convert(char_consumer, event_producer, c); Event::Convert(char_receiver, event_sender, c);
event_producer.reset(); event_sender.reset();
}); });
// The main loop. // The main loop.
@ -168,7 +168,7 @@ void ScreenInteractive::Loop(Component* component) {
std::cout << ToString() << set_cursor_position << std::flush; std::cout << ToString() << set_cursor_position << std::flush;
Clear(); Clear();
Event event; Event event;
if (event_consumer_->Receive(&event)) if (event_receiver_->Receive(&event))
component->OnEvent(event); component->OnEvent(event);
} }
read_char.join(); read_char.join();