#ifndef FTXUI_COMPONENT_RECEIVER_HPP_
#define FTXUI_COMPONENT_RECEIVER_HPP_

#include <atomic>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
#include <utility>

namespace ftxui {

// A multi-producer message queue: any number of Sender<T> push values, one
// Receiver<T> pops them in FIFO order.
//
// Usage:
//
// Initialization:
// ---------------
//
// auto receiver = MakeReceiver<std::string>();
// auto sender_1 = receiver->MakeSender();
// auto sender_2 = receiver->MakeSender();
//
// Then move the senders elsewhere, potentially in a different thread.
//
// On the producer side:
// ----------------------
// [thread 1] sender_1->Send("hello");
// [thread 2] sender_2->Send("world");
//
// On the consumer side:
// ---------------------
// std::string message;
// while (receiver->Receive(&message))  // True as long as a message arrives.
//   print(message);
//
// Receiver::Receive() returns false once every sender has been destroyed and
// every queued message has been consumed.
//
// Lifetime: a sender stores a raw pointer to the receiver that created it, so
// the receiver must outlive all of its senders.

// clang-format off
template<class T> class SenderImpl;
template<class T> class ReceiverImpl;

template<class T> using Sender = std::unique_ptr<SenderImpl<T>>;
template<class T> using Receiver = std::unique_ptr<ReceiverImpl<T>>;
template<class T> Receiver<T> MakeReceiver();
// clang-format on

// ---- Implementation part ----

// Producer endpoint. Instances are only created by ReceiverImpl::MakeSender().
template <class T>
class SenderImpl {
 public:
  // Enqueues |t| on the receiver and wakes up one waiting consumer.
  void Send(T t) { receiver_->Receive(std::move(t)); }

  // Unregisters this sender. When it was the last one, consumers blocked in
  // ReceiverImpl::Receive() are woken up so they can return false.
  ~SenderImpl() { receiver_->ReleaseSender(); }

  // Each instance accounts for exactly one unit of the receiver's sender
  // count; a copy would decrement that count twice.
  SenderImpl(const SenderImpl&) = delete;
  SenderImpl& operator=(const SenderImpl&) = delete;

 private:
  friend class ReceiverImpl<T>;
  explicit SenderImpl(ReceiverImpl<T>* consumer) : receiver_(consumer) {}

  ReceiverImpl<T>* receiver_;  // Not owned.
};

// Consumer endpoint. Owns the queue and counts the senders still alive.
template <class T>
class ReceiverImpl {
 public:
  // Creates a new sender attached to this receiver.
  Sender<T> MakeSender() {
    // Allocate first: if `new` throws, the sender count is left untouched.
    // std::make_unique cannot be used because the constructor is private.
    Sender<T> sender(new SenderImpl<T>(this));
    std::lock_guard<std::mutex> lock(mutex_);
    ++senders_;
    return sender;
  }

  // Blocks until a message is available or no sender remains.
  //
  // Returns true and moves the oldest queued message into |*t| when there is
  // one. Returns false, leaving |*t| untouched, when the queue is empty and
  // every sender has been destroyed. Messages that are still queued after the
  // last sender is gone are delivered before false is returned.
  bool Receive(T* t) {
    std::unique_lock<std::mutex> lock(mutex_);
    // The predicate is evaluated with |mutex_| held, and both the queue and
    // the sender count are only modified with |mutex_| held, so a
    // notification cannot slip in between the check and the wait.
    notifier_.wait(lock, [this] { return !queue_.empty() || senders_ == 0; });
    if (queue_.empty())
      return false;
    *t = std::move(queue_.front());
    queue_.pop();
    return true;
  }

 private:
  friend class SenderImpl<T>;

  // Called by SenderImpl::Send().
  void Receive(T t) {
    std::lock_guard<std::mutex> lock(mutex_);
    queue_.push(std::move(t));
    notifier_.notify_one();
  }

  // Called by SenderImpl::~SenderImpl().
  void ReleaseSender() {
    std::lock_guard<std::mutex> lock(mutex_);
    --senders_;
    // Wake every waiter: all of them must observe that the last sender is
    // gone. Notifying while the lock is held keeps a woken consumer from
    // returning before this function has finished using the receiver.
    notifier_.notify_all();
  }

  std::mutex mutex_;
  std::queue<T> queue_;
  std::condition_variable notifier_;
  std::atomic<int> senders_{0};
};

// Creates a receiver with an empty queue and no sender.
template <class T>
Receiver<T> MakeReceiver() {
  return std::make_unique<ReceiverImpl<T>>();
}

}  // namespace ftxui

#endif  // FTXUI_COMPONENT_RECEIVER_HPP_