From 81625e1964f0737f2f038c0e51e095b6fb2bba68 Mon Sep 17 00:00:00 2001 From: Simon Brunel Date: Sun, 4 Jun 2017 18:22:24 +0200 Subject: [PATCH] Add multithreading support Make QPromise thread safe but also ensure that continuation lambdas (then/fail/finally) are called in the thread of the promise instance they are attached to. --- README.md | 9 +- src/qtpromise/qpromise_p.h | 119 ++++++++++------ tests/auto/auto.pro | 3 +- tests/auto/benchmark/benchmark.pro | 1 - tests/auto/benchmark/tst_benchmark.cpp | 3 +- tests/auto/thread/thread.pro | 5 + tests/auto/thread/tst_thread.cpp | 186 +++++++++++++++++++++++++ 7 files changed, 281 insertions(+), 45 deletions(-) create mode 100644 tests/auto/thread/thread.pro create mode 100644 tests/auto/thread/tst_thread.cpp diff --git a/README.md b/README.md index b66e271..7d388a1 100644 --- a/README.md +++ b/README.md @@ -103,13 +103,13 @@ QFuture future = QtConcurrent::run([]() { return 42; }); -QPromise promise = qtPromise(future); +QPromise promise = qPromise(future); ``` or simply: ```cpp -auto promise = qtPromise(QtConcurrent::run([]() { +auto promise = qPromise(QtConcurrent::run([]() { // {...} })); ``` @@ -154,6 +154,11 @@ promise.then([](int res) { }); ``` +## Thread-Safety + QPromise is thread-safe and can be copied and accessed across different threads. QPromise relies on [explicitly data sharing](http://doc.qt.io/qt-5/qexplicitlyshareddatapointer.html#details) and thus `auto p2 = p1` represents the same promise: when `p1` resolves, handlers registered on `p1` and `p2` are called, the fulfilled value being shared between both instances. + +> **Note:** while it's safe to access the resolved value from different threads using [`then`](#qpromise-then), QPromise provides no guarantee about the object being pointed to. Thread-safety and reentrancy rules for that object still apply. + ## QPromise ### `QPromise::QPromise(resolver)` Creates a new promise that will be fulfilled or rejected by the given `resolver` lambda: diff --git a/src/qtpromise/qpromise_p.h b/src/qtpromise/qpromise_p.h index 8626854..86fac56 100644 --- a/src/qtpromise/qpromise_p.h +++ b/src/qtpromise/qpromise_p.h @@ -6,10 +6,14 @@ #include "qpromiseglobal.h" // Qt -#include +#include +#include +#include +#include +#include #include #include -#include +#include namespace QtPromise { @@ -26,10 +30,22 @@ class QPromiseReject; namespace QtPromisePrivate { +// https://stackoverflow.com/a/21653558 template -inline void qtpromise_defer(F&& f) +static void qtpromise_defer(F&& f, QThread* thread = nullptr) { - QTimer::singleShot(0, std::forward(f)); + struct Event : public QEvent + { + using FType = typename std::decay::type; + Event(FType&& f) : QEvent(QEvent::None), m_f(std::move(f)) { } + Event(const FType& f) : QEvent(QEvent::None), m_f(f) { } + ~Event() { m_f(); } + FType m_f; + }; + + QObject* target = QAbstractEventDispatcher::instance(thread); + Q_ASSERT_X(target, "postMetaCall", "Target thread must have an event loop"); + QCoreApplication::postEvent(target, new Event(std::forward(f))); } template @@ -320,21 +336,35 @@ class PromiseDataBase: public QSharedData { public: using Error = QtPromise::QPromiseError; - using Catcher = std::function; + using Catcher = std::pair, std::function >; virtual ~PromiseDataBase() {} - bool isFulfilled() const { return m_settled && m_error.isNull(); } - bool isRejected() const { return m_settled && !m_error.isNull(); } - bool isPending() const { return !m_settled; } - - void addCatcher(Catcher catcher) + bool isFulfilled() const { - m_catchers.append(std::move(catcher)); + return !isPending() && m_error.isNull(); + } + + bool isRejected() const + { + return !isPending() && !m_error.isNull(); + } + + bool isPending() const + { + QReadLocker lock(&m_lock); + return !m_settled; + } + + void addCatcher(std::function catcher) + { + QWriteLocker lock(&m_lock); + m_catchers.append({QThread::currentThread(), std::move(catcher)}); } void reject(Error error) { + Q_ASSERT(isPending()); Q_ASSERT(m_error.isNull()); m_error.reset(new Error(std::move(error))); setSettled(); @@ -342,26 +372,36 @@ public: void dispatch() { - Q_ASSERT(!isPending()); + if (isPending()) { + return; + } - if (isFulfilled()) { + if (m_error.isNull()) { notify(); return; } - Q_ASSERT(isRejected()); - QSharedPointer error = m_error; + m_lock.lockForWrite(); QVector catchers(std::move(m_catchers)); + m_lock.unlock(); + + QSharedPointer error = m_error; + Q_ASSERT(!error.isNull()); + for (const auto& catcher: catchers) { + const auto& fn = catcher.second; qtpromise_defer([=]() { - catcher(*error); - }); + fn(*error); + }, catcher.first); } } protected: + mutable QReadWriteLock m_lock; + void setSettled() { + QWriteLocker lock(&m_lock); Q_ASSERT(!m_settled); m_settled = true; } @@ -377,18 +417,13 @@ private: template class PromiseData: public PromiseDataBase { + using Handler = std::pair, std::function >; + public: - using Handler = std::function; - - void addHandler(Handler handler) + void addHandler(std::function handler) { - m_handlers.append(std::move(handler)); - } - - const T& value() const - { - Q_ASSERT(!m_value.isNull()); - return *m_value; + QWriteLocker lock(&this->m_lock); + m_handlers.append({QThread::currentThread(), std::move(handler)}); } void resolve(T&& value) @@ -407,13 +442,18 @@ public: void notify() Q_DECL_OVERRIDE { - Q_ASSERT(this->isFulfilled()); - QSharedPointer value(m_value); + this->m_lock.lockForWrite(); QVector handlers(std::move(m_handlers)); + this->m_lock.unlock(); + + QSharedPointer value(m_value); + Q_ASSERT(!value.isNull()); + for (const auto& handler: handlers) { + const auto& fn = handler.second; qtpromise_defer([=]() { - handler(*value); - }); + fn(*value); + }, handler.first); } } @@ -425,12 +465,13 @@ private: template <> class PromiseData: public PromiseDataBase { -public: - using Handler = std::function; + using Handler = std::pair, std::function >; - void addHandler(Handler handler) +public: + void addHandler(std::function handler) { - m_handlers.append(std::move(handler)); + QWriteLocker lock(&m_lock); + m_handlers.append({QThread::currentThread(), std::move(handler)}); } void resolve() { setSettled(); } @@ -438,12 +479,12 @@ public: protected: void notify() Q_DECL_OVERRIDE { - Q_ASSERT(isFulfilled()); + this->m_lock.lockForWrite(); QVector handlers(std::move(m_handlers)); + this->m_lock.unlock(); + for (const auto& handler: handlers) { - qtpromise_defer([=]() { - handler(); - }); + qtpromise_defer(handler.second, handler.first); } } diff --git a/tests/auto/auto.pro b/tests/auto/auto.pro index da47758..ff9600d 100644 --- a/tests/auto/auto.pro +++ b/tests/auto/auto.pro @@ -4,4 +4,5 @@ SUBDIRS += \ future \ helpers \ qpromise \ - requirements + requirements \ + thread diff --git a/tests/auto/benchmark/benchmark.pro b/tests/auto/benchmark/benchmark.pro index cc5bd31..e032662 100644 --- a/tests/auto/benchmark/benchmark.pro +++ b/tests/auto/benchmark/benchmark.pro @@ -1,4 +1,3 @@ -QT += concurrent TARGET = tst_benchmark SOURCES += $$PWD/tst_benchmark.cpp diff --git a/tests/auto/benchmark/tst_benchmark.cpp b/tests/auto/benchmark/tst_benchmark.cpp index 7d2587b..f2b6e25 100644 --- a/tests/auto/benchmark/tst_benchmark.cpp +++ b/tests/auto/benchmark/tst_benchmark.cpp @@ -2,7 +2,6 @@ #include // Qt -#include #include using namespace QtPromise; @@ -11,11 +10,11 @@ class tst_benchmark: public QObject { Q_OBJECT +private Q_SLOTS: void valueResolve(); void valueReject(); void valueThen(); void errorReject(); -private Q_SLOTS: void errorThen(); }; // class tst_benchmark diff --git a/tests/auto/thread/thread.pro b/tests/auto/thread/thread.pro new file mode 100644 index 0000000..eebfce3 --- /dev/null +++ b/tests/auto/thread/thread.pro @@ -0,0 +1,5 @@ +QT += concurrent +TARGET = tst_thread +SOURCES += $$PWD/tst_thread.cpp + +include(../tests.pri) diff --git a/tests/auto/thread/tst_thread.cpp b/tests/auto/thread/tst_thread.cpp new file mode 100644 index 0000000..2743a71 --- /dev/null +++ b/tests/auto/thread/tst_thread.cpp @@ -0,0 +1,186 @@ +// QtPromise +#include + +// Qt +#include +#include + +using namespace QtPromise; + +class tst_thread: public QObject +{ + Q_OBJECT + +private Q_SLOTS: + void resolve(); + void resolve_void(); + void reject(); + void then(); + void then_void(); + void fail(); + void finally(); + +}; // class tst_thread + +QTEST_MAIN(tst_thread) +#include "tst_thread.moc" + +void tst_thread::resolve() +{ + int value = -1; + size_t target = 0; + size_t source = 0; + + QPromise([&](const QPromiseResolve& resolve) { + QtConcurrent::run([=, &source]() { + source = (size_t)QThread::currentThread(); + resolve(42); + }); + }).then([&](int res) { + target = (size_t)QThread::currentThread(); + value = res; + }).wait(); + + QVERIFY(source != 0); + QVERIFY(source != target); + QCOMPARE(target, (size_t)QThread::currentThread()); + QCOMPARE(value, 42); +} + +void tst_thread::resolve_void() +{ + int value = -1; + size_t target = 0; + size_t source = 0; + + QPromise([&](const QPromiseResolve& resolve) { + QtConcurrent::run([=, &source]() { + source = (size_t)QThread::currentThread(); + resolve(); + }); + }).then([&]() { + target = (size_t)QThread::currentThread(); + value = 43; + }).wait(); + + QVERIFY(source != 0); + QVERIFY(source != target); + QCOMPARE(target, (size_t)QThread::currentThread()); + QCOMPARE(value, 43); +} + +void tst_thread::reject() +{ + QString error; + size_t target = 0; + size_t source = 0; + + QPromise([&](const QPromiseResolve&, const QPromiseReject& reject) { + QtConcurrent::run([=, &source]() { + source = (size_t)QThread::currentThread(); + reject(QString("foo")); + }); + }).fail([&](const QString& err) { + target = (size_t)QThread::currentThread(); + error = err; + return -1; + }).wait(); + + QVERIFY(source != 0); + QVERIFY(source != target); + QCOMPARE(target, (size_t)QThread::currentThread()); + QCOMPARE(error, QString("foo")); +} + +void tst_thread::then() +{ + size_t source; + QPromise p([&](const QPromiseResolve& resolve) { + source = (size_t)QThread::currentThread(); + resolve(42); + }); + + size_t target; + int value = -1; + qPromise(QtConcurrent::run([&](const QPromise& p) { + p.then([&](int res) { + target = (size_t)QThread::currentThread(); + value = res; + }).wait(); + }, p)).wait(); + + QVERIFY(target != 0); + QVERIFY(source != target); + QCOMPARE(source, (size_t)QThread::currentThread()); + QCOMPARE(value, 42); +} + +void tst_thread::then_void() +{ + size_t source; + QPromise p([&](const QPromiseResolve& resolve) { + source = (size_t)QThread::currentThread(); + resolve(); + }); + + size_t target; + int value = -1; + qPromise(QtConcurrent::run([&](const QPromise& p) { + p.then([&]() { + target = (size_t)QThread::currentThread(); + value = 43; + }).wait(); + }, p)).wait(); + + QVERIFY(target != 0); + QVERIFY(source != target); + QCOMPARE(source, (size_t)QThread::currentThread()); + QCOMPARE(value, 43); +} + +void tst_thread::fail() +{ + size_t source; + QPromise p([&](const QPromiseResolve&, const QPromiseReject& reject) { + source = (size_t)QThread::currentThread(); + reject(QString("foo")); + }); + + size_t target; + QString error; + qPromise(QtConcurrent::run([&](const QPromise& p) { + p.fail([&](const QString& err) { + target = (size_t)QThread::currentThread(); + error = err; + return -1; + }).wait(); + }, p)).wait(); + + QVERIFY(target != 0); + QVERIFY(source != target); + QCOMPARE(source, (size_t)QThread::currentThread()); + QCOMPARE(error, QString("foo")); +} + +void tst_thread::finally() +{ + size_t source; + QPromise p([&](const QPromiseResolve& resolve) { + source = (size_t)QThread::currentThread(); + resolve(42); + }); + + size_t target; + int value = -1; + qPromise(QtConcurrent::run([&](const QPromise& p) { + p.finally([&]() { + target = (size_t)QThread::currentThread(); + value = 43; + }).wait(); + }, p)).wait(); + + QVERIFY(target != 0); + QVERIFY(source != target); + QCOMPARE(source, (size_t)QThread::currentThread()); + QCOMPARE(value, 43); +}