diff --git a/CMakeLists.txt b/CMakeLists.txt index 851bcfc..4caa343 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -158,6 +158,7 @@ if (FTXUI_BUILD_TESTS AND GTEST_FOUND AND THREADS_FOUND) src/ftxui/component/toggle_test.cpp src/ftxui/component/radiobox_test.cpp src/ftxui/component/container_test.cpp + src/ftxui/component/receiver_test.cpp src/ftxui/dom/gauge_test.cpp src/ftxui/dom/hbox_test.cpp src/ftxui/dom/text_test.cpp diff --git a/include/ftxui/component/receiver.hpp b/include/ftxui/component/receiver.hpp index 7fe7400..e396d2f 100644 --- a/include/ftxui/component/receiver.hpp +++ b/include/ftxui/component/receiver.hpp @@ -1,6 +1,8 @@ #ifndef FTXUI_COMPONENT_RECEIVER_HPP_ #define FTXUI_COMPONENT_RECEIVER_HPP_ +#include + #include #include #include @@ -47,13 +49,13 @@ template Receiver MakeReceiver(); template class SenderImpl { public: - void Send(T t) { sender_->Receive(std::move(t)); } - ~SenderImpl() { sender_->senders_--; } + void Send(T t) { receiver_->Receive(std::move(t)); } + ~SenderImpl() { receiver_->ReleaseSender();} private: friend class ReceiverImpl; - SenderImpl(ReceiverImpl* consumer) : sender_(consumer) {} - ReceiverImpl* sender_; + SenderImpl(ReceiverImpl* consumer) : receiver_(consumer) {} + ReceiverImpl* receiver_; }; template @@ -65,9 +67,9 @@ class ReceiverImpl { } bool Receive(T* t) { - while (senders_) { + while (senders_ || !queue_.empty()) { std::unique_lock lock(mutex_); - while (queue_.empty()) + if (queue_.empty()) notifier_.wait(lock); if (queue_.empty()) continue; @@ -82,8 +84,16 @@ class ReceiverImpl { friend class SenderImpl; void Receive(T t) { - std::unique_lock lock(mutex_); - queue_.push(std::move(t)); + { + std::unique_lock lock(mutex_); + queue_.push(std::move(t)); + } + notifier_.notify_one(); + } + + void ReleaseSender() { + std::cerr << __func__ << std::endl; + senders_--; notifier_.notify_one(); } diff --git a/src/ftxui/component/receiver_test.cpp b/src/ftxui/component/receiver_test.cpp new file mode 100644 index 0000000..212e3af --- /dev/null +++ b/src/ftxui/component/receiver_test.cpp @@ -0,0 +1,70 @@ +#include "ftxui/component/receiver.hpp" + +#include + +#include "gtest/gtest.h" + +using namespace ftxui; + +TEST(Receiver, Basic) { + auto receiver = MakeReceiver(); + auto sender = receiver->MakeSender(); + + sender->Send('a'); + sender->Send('b'); + sender->Send('c'); + sender.reset(); + + char a, b, c, d; + EXPECT_TRUE(receiver->Receive(&a)); + EXPECT_TRUE(receiver->Receive(&b)); + EXPECT_TRUE(receiver->Receive(&c)); + EXPECT_FALSE(receiver->Receive(&d)); + + EXPECT_EQ(a, 'a'); + EXPECT_EQ(b, 'b'); + EXPECT_EQ(c, 'c'); +} + +TEST(Receiver, BasicWithThread) { + auto r1 = MakeReceiver(); + auto r2 = MakeReceiver(); + auto r3 = MakeReceiver(); + + auto s1 = r1->MakeSender(); + auto s2 = r2->MakeSender(); + auto s3 = r3->MakeSender(); + + auto s1_bis = r1->MakeSender(); + + auto stream = [](Receiver receiver, Sender sender) { + char c; + while (receiver->Receive(&c)) + sender->Send(c); + }; + + // Convert data from a different thread. + auto t12 = std::thread(stream, std::move(r1), std::move(s2)); + auto t23 = std::thread(stream, std::move(r2), std::move(s3)); + + // Send some data. + s1->Send('1'); + s1_bis->Send('2'); + s1->Send('3'); + s1_bis->Send('4'); + + // Close the stream. + s1.reset(); + s1_bis.reset(); + + char c; + EXPECT_TRUE(r3->Receive(&c));EXPECT_EQ(c, '1'); + EXPECT_TRUE(r3->Receive(&c)); EXPECT_EQ(c, '2'); + EXPECT_TRUE(r3->Receive(&c)); EXPECT_EQ(c, '3'); + EXPECT_TRUE(r3->Receive(&c)); EXPECT_EQ(c, '4'); + EXPECT_FALSE(r3->Receive(&c)); + + // Thread will end at the end of the stream. + t12.join(); + t23.join(); +}