Add a Producer/Consumer system.

It allow you to create the two end of a pipe: A producer and consumer.
The producer can be moved into another thread.
Several producer can be created if necessary.

This will ease merging:
https://github.com/ArthurSonzogni/FTXUI/pull/11
This commit is contained in:
ArthurSonzogni 2020-03-24 23:26:55 +01:00
parent 6de8c63907
commit 09a1b16613
7 changed files with 244 additions and 66 deletions

View File

@ -18,9 +18,17 @@ add_library(screen
src/ftxui/screen/string.cpp src/ftxui/screen/string.cpp
src/ftxui/screen/terminal.cpp src/ftxui/screen/terminal.cpp
src/ftxui/screen/wcwidth.cpp src/ftxui/screen/wcwidth.cpp
include/ftxui/screen/box.hpp
include/ftxui/screen/color.hpp
include/ftxui/screen/screen.hpp
include/ftxui/screen/string.hpp
) )
add_library(dom add_library(dom
include/ftxui/dom/elements.hpp
include/ftxui/dom/node.hpp
include/ftxui/dom/requirement.hpp
include/ftxui/dom/take_any_args.hpp
src/ftxui/dom/blink.cpp src/ftxui/dom/blink.cpp
src/ftxui/dom/bold.cpp src/ftxui/dom/bold.cpp
src/ftxui/dom/border.cpp src/ftxui/dom/border.cpp
@ -57,6 +65,16 @@ add_library(component
src/ftxui/component/radiobox.cpp src/ftxui/component/radiobox.cpp
src/ftxui/component/screen_interactive.cpp src/ftxui/component/screen_interactive.cpp
src/ftxui/component/toggle.cpp src/ftxui/component/toggle.cpp
include/ftxui/component/checkbox.hpp
include/ftxui/component/component.hpp
include/ftxui/component/container.hpp
include/ftxui/component/event.hpp
include/ftxui/component/input.hpp
include/ftxui/component/menu.hpp
include/ftxui/component/radiobox.hpp
include/ftxui/component/screen_interactive.hpp
include/ftxui/component/producer_consumer.hpp
include/ftxui/component/toggle.hpp
) )
add_library(ftxui::screen ALIAS screen) add_library(ftxui::screen ALIAS screen)

View File

@ -5,6 +5,7 @@
#include <functional> #include <functional>
#include <string> #include <string>
#include <vector> #include <vector>
#include <ftxui/component/producer_consumer.hpp>
namespace ftxui { namespace ftxui {
@ -20,7 +21,7 @@ struct Event {
static Event Character(const std::string&); static Event Character(const std::string&);
static Event Special(const std::string&); static Event Special(const std::string&);
static Event GetEvent(std::function<char()> getchar); static void Convert(Consumer<char>& in, Producer<Event>& out, char c);
// --- Arrow --- // --- Arrow ---
static Event ArrowLeft; static Event ArrowLeft;

View File

@ -0,0 +1,101 @@
#ifndef FTXUI_COMPONENTS_CONSUMER_PRODUCER_H_
#define FTXUI_COMPONENTS_CONSUMER_PRODUCER_H_
#include <atomic>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
namespace ftxui {
// Usage:
//
// Initialization:
// ---------------
//
// auto consumer = MakeConsumer<std:string>();
// auto producer_1 = consumer.MakeProducer();
// auto producer_2 = consumer.MakeProducer();
//
// Then move one producers elsewhere, potentially in a different thread.
// ----------------------
// [thread 1] producer_1->Send("hello");
// [thread 2] producer_2->Send("world");
//
// On the consumer side:
// ---------------------
// char c;
// while(consumer_->Receive(&c)) // Return true as long as there is a producer.
// print(c)
//
// Consumer::Receive returns true when the last Producer is released.
// clang-format off
template<class T> class ProducerImpl;
template<class T> class ConsumerImpl;
template<class T> using Producer = std::unique_ptr<ProducerImpl<T>>;
template<class T> using Consumer = std::unique_ptr<ConsumerImpl<T>>;
template<class T> Consumer<T> MakeConsumer();
// clang-format on
// ---- Implementation part ----
template <class T>
class ProducerImpl {
public:
void Send(T t) { consumer_->Receive(std::move(t)); }
~ProducerImpl() { consumer_->producers_--; }
private:
friend class ConsumerImpl<T>;
ProducerImpl(ConsumerImpl<T>* consumer) : consumer_(consumer) {}
ConsumerImpl<T>* consumer_;
};
template <class T>
class ConsumerImpl {
public:
Producer<T> MakeProducer() {
producers_++;
return std::unique_ptr<ProducerImpl<T>>(new ProducerImpl<T>(this));
}
bool Receive(T* t) {
while (producers_) {
std::unique_lock<std::mutex> lock(mutex_);
while (queue_.empty())
notifier_.wait(lock);
if (queue_.empty())
continue;
*t = std::move(queue_.front());
queue_.pop();
return true;
}
return false;
}
private:
friend class ProducerImpl<T>;
void Receive(T t) {
std::unique_lock<std::mutex> lock(mutex_);
queue_.push(std::move(t));
notifier_.notify_one();
}
std::mutex mutex_;
std::queue<T> queue_;
std::condition_variable notifier_;
std::atomic<int> producers_ = 0;
};
template <class T>
Consumer<T> MakeConsumer() {
return std::make_unique<ConsumerImpl<T>>();
}
} // namespace ftxui
#endif // FTXUI_COMPONENTS_CONSUMER_PRODUCER_H_

View File

@ -10,6 +10,7 @@
#include "ftxui/component/event.hpp" #include "ftxui/component/event.hpp"
#include "ftxui/screen/screen.hpp" #include "ftxui/screen/screen.hpp"
#include <ftxui/component/producer_consumer.hpp>
namespace ftxui { namespace ftxui {
class Component; class Component;
@ -40,13 +41,13 @@ class ScreenInteractive : public Screen {
Dimension dimension_ = Dimension::Fixed; Dimension dimension_ = Dimension::Fixed;
ScreenInteractive(int dimx, int dimy, Dimension dimension); ScreenInteractive(int dimx, int dimy, Dimension dimension);
std::condition_variable events_queue_cv; Producer<Event> event_producer_;
std::mutex events_queue_mutex; Consumer<Event> event_consumer_;
std::queue<Event> events_queue;
std::atomic<bool> quit_ = false;
std::string set_cursor_position; std::string set_cursor_position;
std::string reset_cursor_position; std::string reset_cursor_position;
std::atomic<bool>quit_ = false;
}; };
} // namespace ftxui } // namespace ftxui

View File

@ -36,19 +36,25 @@ Event Event::Special(const std::string& input) {
return event; return event;
} }
Event ParseUTF8(std::function<char()>& getchar, std::string& input) { void ParseUTF8(Consumer<char>& in, Producer<Event>& out, std::string& input) {
if ((input[0] & 0b11000000) == 0b11000000) char c;
input += getchar(); char mask = 0b11000000;
if ((input[0] & 0b11100000) == 0b11100000) for (int i = 0; i < 3; ++i) {
input += getchar(); if ((input[0] & mask) == mask) {
if ((input[0] & 0b11110000) == 0b11110000) if (!in->Receive(&c))
input += getchar(); return;
return Event::Character(input); input += c;
}
mask = mask >> 1 | 0b10000000;
}
out->Send(Event::Character(input));
} }
Event ParseCSI(std::function<char()> getchar, std::string& input) { void ParseCSI(Consumer<char>& in, Producer<Event>& out, std::string& input) {
char c;
while (1) { while (1) {
char c = getchar(); if (!in->Receive(&c))
return;
input += c; input += c;
if (c >= '0' && c <= '9') if (c >= '0' && c <= '9')
@ -58,80 +64,95 @@ Event ParseCSI(std::function<char()> getchar, std::string& input) {
continue; continue;
if (c >= ' ' && c <= '~') if (c >= ' ' && c <= '~')
return Event::Special(input); return out->Send(Event::Special(input));
// Invalid ESC in CSI. // Invalid ESC in CSI.
if (c == '\x1B') if (c == '\x1B')
return Event::Special(input); return out->Send(Event::Special(input));
} }
} }
Event ParseDCS(std::function<char()> getchar, std::string& input) { void ParseDCS(Consumer<char>& in, Producer<Event>& out, std::string& input) {
char c;
// Parse until the string terminator ST. // Parse until the string terminator ST.
while (1) { while (1) {
input += getchar(); if (!in->Receive(&c))
return;
input += c;
if (input.back() != '\x1B') if (input.back() != '\x1B')
continue; continue;
input += getchar(); if (!in->Receive(&c))
return;
input += c;
if (input.back() != '\\') if (input.back() != '\\')
continue; continue;
return Event::Special(input); return out->Send(Event::Special(input));
} }
} }
Event ParseOSC(std::function<char()> getchar, std::string& input) { void ParseOSC(Consumer<char>& in, Producer<Event>& out, std::string& input) {
char c;
// Parse until the string terminator ST. // Parse until the string terminator ST.
while (1) { while (1) {
input += getchar(); if (!in->Receive(&c))
return;
input += c;
if (input.back() != '\x1B') if (input.back() != '\x1B')
continue; continue;
input += getchar(); if (!in->Receive(&c))
return;
input += c;
if (input.back() != '\\') if (input.back() != '\\')
continue; continue;
return Event::Special(input); return out->Send(Event::Special(input));
} }
} }
Event ParseESC(std::function<char()> getchar, std::string& input) { void ParseESC(Consumer<char>& in, Producer<Event>& out, std::string& input) {
input += getchar(); char c;
switch (input.back()) { if (!in->Receive(&c))
return;
input += c;
switch (c) {
case 'P': case 'P':
return ParseDCS(getchar, input); return ParseDCS(in, out, input);
case '[': case '[':
return ParseCSI(getchar, input); return ParseCSI(in, out, input);
case ']': case ']':
return ParseOSC(getchar, input); return ParseOSC(in, out, input);
default: default:
input += getchar(); if (!in->Receive(&c))
return Event::Special(input); return;
input += c;
out->Send(Event::Special(input));
} }
} }
// static // static
Event Event::GetEvent(std::function<char()> getchar) { void Event::Convert(Consumer<char>& in, Producer<Event>& out, char c) {
std::string input; std::string input;
input += getchar(); input += c;
unsigned char head = input[0]; unsigned char head = input[0];
switch (head) { switch (head) {
case 24: // CAN case 24: // CAN
case 26: // SUB case 26: // SUB
return Event(); // Ignored. return;
case 'P': case 'P':
return ParseDCS(getchar, input); return ParseDCS(in, out, input);
case '\x1B': case '\x1B':
return ParseESC(getchar, input); return ParseESC(in, out, input);
} }
if (head < 32) // C0 if (head < 32) // C0
return Event::Special(input); return out->Send(Event::Special(input));
if (head == 127) // Delete if (head == 127) // Delete
return Event::Special(input); return out->Send(Event::Special(input));
return ParseUTF8(getchar, input); return ParseUTF8(in, out, input);
} }
// --- Arrow --- // --- Arrow ---

View File

@ -0,0 +1,30 @@
#ifndef FTXUI_COMPONENTS_TASK_QUEUE_H_
#define FTXUI_COMPONENTS_TASK_QUEUE_H_
#include <atomic>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
template <class T>
class TaskQueue {
public:
void Post(T task);
void Close();
bool Take(T& task);
private:
std::unique_lock<std::mutex> lock(events_queue_mutex);
events_queue.push(event);
events_queue_cv.notify_one();
std::condition_variable events_queue_cv;
std::mutex events_queue_mutex;
std::queue<Event> events_queue;
std::atomic<bool> quit_ = false;
};
#endif // FTXUI_COMPONENTS_TASK_QUEUE_H_

View File

@ -57,7 +57,11 @@ void OnResize(int /* signal */) {
} }
ScreenInteractive::ScreenInteractive(int dimx, int dimy, Dimension dimension) ScreenInteractive::ScreenInteractive(int dimx, int dimy, Dimension dimension)
: Screen(dimx, dimy), dimension_(dimension) {} : Screen(dimx, dimy), dimension_(dimension) {
event_consumer_ = MakeConsumer<Event>();
event_producer_ = event_consumer_->MakeProducer();
}
ScreenInteractive::~ScreenInteractive() {} ScreenInteractive::~ScreenInteractive() {}
// static // static
@ -81,21 +85,7 @@ ScreenInteractive ScreenInteractive::FitComponent() {
} }
void ScreenInteractive::PostEvent(Event event) { void ScreenInteractive::PostEvent(Event event) {
std::unique_lock<std::mutex> lock(events_queue_mutex); event_producer_->Send(event);
events_queue.push(event);
events_queue_cv.notify_one();
}
void ScreenInteractive::EventLoop(Component* component) {
std::unique_lock<std::mutex> lock(events_queue_mutex);
while (!quit_ && events_queue.empty())
events_queue_cv.wait(lock);
// After the wait, we own the lock.
while (!events_queue.empty()) {
component->OnEvent(events_queue.front());
events_queue.pop();
}
} }
void ScreenInteractive::Loop(Component* component) { void ScreenInteractive::Loop(Component* component) {
@ -150,12 +140,25 @@ void ScreenInteractive::Loop(Component* component) {
std::cout << std::endl; std::cout << std::endl;
}); });
// Spawn a thread. It will listen for new characters being typed. auto char_consumer = MakeConsumer<char>();
std::thread read_char([this] {
while (!quit_) { // Spawn a thread to produce char.
auto event = Event::GetEvent([] { return (char)getchar(); }); auto char_producer = char_consumer->MakeProducer();
PostEvent(std::move(event)); std::thread read_char([&] {
} // TODO(arthursonzogni): Use a timeout so that it doesn't block even if the
// user doesn't generate new chars.
while (!quit_)
char_producer->Send((char)getchar());
char_producer.reset();
});
// Spawn a thread producing events and consumer chars.
auto event_producer = event_consumer_->MakeProducer();
std::thread convert_char_to_event([&] {
char c;
while (char_consumer->Receive(&c))
Event::Convert(char_consumer, event_producer, c);
event_producer.reset();
}); });
// The main loop. // The main loop.
@ -164,9 +167,12 @@ void ScreenInteractive::Loop(Component* component) {
Draw(component); Draw(component);
std::cout << ToString() << set_cursor_position << std::flush; std::cout << ToString() << set_cursor_position << std::flush;
Clear(); Clear();
EventLoop(component); Event event;
if (event_consumer_->Receive(&event))
component->OnEvent(event);
} }
read_char.join(); read_char.join();
convert_char_to_event.join();
OnExit(0); OnExit(0);
} }