#ifndef FTXUI_COMPONENT_RECEIVER_HPP_
#define FTXUI_COMPONENT_RECEIVER_HPP_

#include <algorithm>           // for copy, max
#include <atomic>              // for atomic, __atomic_base
#include <condition_variable>  // for condition_variable
#include <functional>
#include <iostream>
#include <memory>   // for unique_ptr, make_unique
#include <mutex>    // for mutex, unique_lock, lock_guard
#include <queue>    // for queue
#include <utility>  // for move

namespace ftxui {

// Usage:
//
// Initialization:
// ---------------
//
// auto receiver = MakeReceiver<std::string>();
// auto sender_1 = receiver->MakeSender();
// auto sender_2 = receiver->MakeSender();
//
// Then move the senders elsewhere, potentially in a different thread.
//
// On the producer side:
// ----------------------
// [thread 1] sender_1->Send("hello");
// [thread 2] sender_2->Send("world");
//
// On the consumer side:
// ---------------------
// std::string message;
// while (receiver->Receive(&message))  // True as long as a value was received.
//   print(message);
//
// Receiver::Receive() returns false once there are no more senders and every
// queued value has been consumed.
//
// Lifetime: a sender keeps a raw pointer to its receiver, so the receiver must
// outlive every sender created from it.

// clang-format off
template<class T> class SenderImpl;
template<class T> class ReceiverImpl;

template<class T> using Sender = std::unique_ptr<SenderImpl<T>>;
template<class T> using Receiver = std::unique_ptr<ReceiverImpl<T>>;
template<class T> Receiver<T> MakeReceiver();
// clang-format on

// ---- Implementation part ----

// Producer endpoint. Instances are only created by ReceiverImpl::MakeSender()
// (or Clone()), and each live instance counts as one sender on its receiver.
template <class T>
class SenderImpl {
 public:
  // The destructor releases exactly one sender slot on the receiver, so a
  // copy would release the same slot twice. Use Clone() to get a new sender.
  SenderImpl(const SenderImpl&) = delete;
  SenderImpl& operator=(const SenderImpl&) = delete;

  // Tells the receiver that this producer is gone.
  ~SenderImpl() { receiver_->ReleaseSender(); }

  // Enqueues `t` on the receiver and wakes up one waiting consumer.
  void Send(T t) { receiver_->Receive(std::move(t)); }

  // Returns a new, independent sender attached to the same receiver.
  Sender<T> Clone() { return receiver_->MakeSender(); }

 private:
  friend class ReceiverImpl<T>;

  explicit SenderImpl(ReceiverImpl<T>* consumer) : receiver_(consumer) {}

  ReceiverImpl<T>* receiver_;  // Not owned.
};

// Consumer endpoint: a queue of T fed by any number of senders. Both the queue
// and the sender count are only read or written while holding `mutex_`.
template <class T>
class ReceiverImpl {
 public:
  ReceiverImpl() = default;

  // Creates a new sender attached to this receiver.
  Sender<T> MakeSender() {
    std::lock_guard<std::mutex> lock(mutex_);
    // SenderImpl's constructor is private, so std::make_unique cannot be used.
    // Allocate before counting: if the allocation throws, no slot is leaked.
    Sender<T> sender(new SenderImpl<T>(this));
    ++senders_;
    return sender;
  }

  // Blocks until a value is available or every sender has been destroyed.
  // Returns true and moves the oldest value into `*t` when one is available.
  // Returns false when the queue is empty and no sender is left.
  bool Receive(T* t) {
    std::unique_lock<std::mutex> lock(mutex_);
    // The predicate is evaluated under the lock, so a Send() or a sender
    // release happening between the check and the wait cannot be missed.
    notifier_.wait(lock, [this] { return !queue_.empty() || senders_ == 0; });
    if (queue_.empty())
      return false;
    *t = std::move(queue_.front());
    queue_.pop();
    return true;
  }

  // Same as Receive(), but returns false immediately instead of blocking when
  // the queue is empty.
  bool ReceiveNonBlocking(T* t) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (queue_.empty())
      return false;
    // Move, like Receive() does: avoids a copy and supports move-only T.
    *t = std::move(queue_.front());
    queue_.pop();
    return true;
  }

  // Returns true when at least one value is waiting in the queue.
  bool HasPending() {
    std::lock_guard<std::mutex> lock(mutex_);
    return !queue_.empty();
  }

  // Returns true when the queue is empty and no sender is left, i.e. when
  // Receive() would return false.
  bool HasQuitted() {
    std::lock_guard<std::mutex> lock(mutex_);
    return queue_.empty() && senders_ == 0;
  }

 private:
  friend class SenderImpl<T>;

  // Called by SenderImpl::Send(): enqueues `t` and wakes up one consumer.
  void Receive(T t) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      queue_.push(std::move(t));
    }
    notifier_.notify_one();
  }

  // Called by ~SenderImpl(): drops one sender and wakes up the consumers so
  // they can observe that the stream may have ended.
  void ReleaseSender() {
    std::lock_guard<std::mutex> lock(mutex_);
    --senders_;
    // Notify while still holding the lock: once the count reaches zero the
    // consumer may return from Receive() and destroy this object, so nothing
    // here may touch a member after the lock is released. notify_all is used
    // because the end of the stream concerns every blocked consumer.
    notifier_.notify_all();
  }

  std::mutex mutex_;
  std::queue<T> queue_;
  std::condition_variable notifier_;
  std::atomic<int> senders_{0};
};

// Creates a receiver with no sender attached yet.
template <class T>
Receiver<T> MakeReceiver() {
  return std::make_unique<ReceiverImpl<T>>();
}

}  // namespace ftxui

#endif  // FTXUI_COMPONENT_RECEIVER_HPP_

// Copyright 2020 Arthur Sonzogni. All rights reserved.
// Use of this source code is governed by the MIT license that can be found in
// the LICENSE file.