C++20: Thread Synchronization with Coroutines
This is a cross-post from www.ModernesCpp.com.
It's a typical requirement of thread management to synchronize threads: one thread prepares a work package that another thread is waiting for.
I assume most of you use condition variables for this sender/receiver or producer/consumer workflow. Condition variables have inherent risks such as the spurious wakeup and the lost wakeup. Before I implement thread synchronization with coroutines, let me recap a previous post about the inherent challenges of condition variables.
The Challenges of Condition Variables
Here is the pattern for the correct usage of condition variables.
```cpp
// conditionVariables.cpp

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

std::mutex mutex_;
std::condition_variable condVar;

bool dataReady{false};

void waitingForWork(){
    std::cout << "Waiting " << std::endl;
    std::unique_lock<std::mutex> lck(mutex_);
    condVar.wait(lck, []{ return dataReady; });   // (4)
    std::cout << "Running " << std::endl;
}

void setDataReady(){
    {
        std::lock_guard<std::mutex> lck(mutex_);
        dataReady = true;
    }
    std::cout << "Data prepared" << std::endl;
    condVar.notify_one();                         // (3)
}

int main(){

    std::cout << std::endl;

    std::thread t1(waitingForWork);               // (1)
    std::thread t2(setDataReady);                 // (2)

    t1.join();
    t2.join();

    std::cout << std::endl;

}
```
How does the synchronization work? The program has two child threads: t1 and t2. They get their work packages waitingForWork and setDataReady in lines (1) and (2). setDataReady notifies - using the condition variable condVar - that it is done with the preparation of the work: condVar.notify_one() (line 3). While holding the lock, thread t1 waits for its notification: condVar.wait(lck, []{ return dataReady; }) (line 4). The sender and the receiver both need a lock. In the case of the sender, a std::lock_guard is sufficient because it locks and unlocks the mutex only once. In the case of the receiver, a std::unique_lock is necessary because wait locks and unlocks the mutex repeatedly.
Finally, the output of the program.
Maybe you are wondering: why do you need a predicate for the wait call when you can invoke wait without one? Also, this workflow seems far too complicated for such a simple synchronization of threads.
The answer brings us back to the fact that condition variables have no memory, and to the two phenomena called lost wakeup and spurious wakeup.
Lost Wakeup and Spurious Wakeup
- Lost wakeup: The sender sends its notification before the receiver is in its wait state. The consequence is that the notification is lost.
- Spurious wakeup: It may happen that the receiver wakes up, although no notification happened.
To avoid falling victim to these two issues, you have to use an additional predicate as memory. If you don't, you have in this example a 50/50 chance of a lost wakeup. A lost wakeup is essentially a deadlock because a thread waits for something that never happens.
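To make the lost wakeup tangible, here is a minimal sketch (not from the original post) of the same program without the predicate. Whether it terminates depends on which thread wins the race: if notify_one runs before the receiver reaches wait, the receiver blocks forever.

```cpp
// lostWakeup.cpp - illustrative sketch, not part of the original program
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

std::mutex mutex_;
std::condition_variable condVar;

void waitingForWork(){
    std::unique_lock<std::mutex> lck(mutex_);
    condVar.wait(lck);                 // no predicate => no memory of the notification
    std::cout << "Running " << std::endl;
}

void setDataReady(){
    std::cout << "Data prepared" << std::endl;
    condVar.notify_one();              // may fire before the receiver reached wait
}

int main(){
    std::thread t1(waitingForWork);
    std::thread t2(setDataReady);
    t1.join();
    t2.join();
}
```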
This is not the end of the traps you can fall into with condition variables. Read the details in the previous post: C++ Core Guidelines: Be Aware of the Traps of Condition Variables.
Thanks to coroutines, thread synchronization is quite easy without the inherent risks of condition variables such as spurious wakeups and lost wakeups.
Thread Synchronization with co_await
```cpp
// senderReceiver.cpp

#include <coroutine>
#include <chrono>
#include <iostream>
#include <functional>
#include <string>
#include <stdexcept>
#include <atomic>
#include <thread>

class Event {
public:
    Event() = default;

    Event(const Event&) = delete;
    Event(Event&&) = delete;
    Event& operator=(const Event&) = delete;
    Event& operator=(Event&&) = delete;

    class Awaiter;
    Awaiter operator co_await() const noexcept;

    void notify() noexcept;

private:
    friend class Awaiter;

    mutable std::atomic<void*> suspendedWaiter{nullptr};
    mutable std::atomic<bool> notified{false};
};

class Event::Awaiter {
public:
    Awaiter(const Event& eve): event(eve) {}

    bool await_ready() const;
    bool await_suspend(std::coroutine_handle<> corHandle) noexcept;
    void await_resume() noexcept {}

private:
    friend class Event;

    const Event& event;
    std::coroutine_handle<> coroutineHandle;
};

bool Event::Awaiter::await_ready() const {                       // (7)

    // allow at most one waiter
    if (event.suspendedWaiter.load() != nullptr){
        throw std::runtime_error("More than one waiter is not valid");
    }

    // event.notified == false; suspends the coroutine
    // event.notified == true; the coroutine is executed such as a usual function
    return event.notified;
}

bool Event::Awaiter::await_suspend(std::coroutine_handle<> corHandle) noexcept {  // (8)

    coroutineHandle = corHandle;

    if (event.notified) return false;

    // store the waiter for later notification
    event.suspendedWaiter.store(this);

    return true;
}

void Event::notify() noexcept {                                  // (6)

    notified = true;

    // try to load the waiter
    auto* waiter = static_cast<Awaiter*>(suspendedWaiter.load());

    // check if a waiter is available
    if (waiter != nullptr) {
        // resume the coroutine => await_resume
        waiter->coroutineHandle.resume();
    }
}

Event::Awaiter Event::operator co_await() const noexcept {
    return Awaiter{ *this };
}

struct Task {
    struct promise_type {
        Task get_return_object() { return {}; }
        std::suspend_never initial_suspend() { return {}; }
        std::suspend_never final_suspend() noexcept { return {}; }   // noexcept is required in C++20
        void return_void() {}
        void unhandled_exception() {}
    };
};

Task receiver(Event& event) {                                    // (3)

    auto start = std::chrono::high_resolution_clock::now();
    co_await event;
    std::cout << "Got the notification! " << std::endl;
    auto end = std::chrono::high_resolution_clock::now();
    std::chrono::duration<double> elapsed = end - start;
    std::cout << "Waited " << elapsed.count() << " seconds." << std::endl;
}

using namespace std::chrono_literals;

int main(){

    std::cout << std::endl;

    std::cout << "Notification before waiting" << std::endl;
    Event event1{};
    auto senderThread1 = std::thread([&event1]{ event1.notify(); });  // (1)
    auto receiverThread1 = std::thread(receiver, std::ref(event1));   // (4)

    receiverThread1.join();
    senderThread1.join();

    std::cout << std::endl;

    std::cout << "Notification after 2 seconds waiting" << std::endl;
    Event event2{};
    auto receiverThread2 = std::thread(receiver, std::ref(event2));   // (5)
    auto senderThread2 = std::thread([&event2]{
        std::this_thread::sleep_for(2s);
        event2.notify();                                              // (2)
    });

    receiverThread2.join();
    senderThread2.join();

    std::cout << std::endl;

}
```
Thread synchronization with coroutines is straightforward from the user's perspective. Let's have a look at the program senderReceiver.cpp. The threads senderThread1 (line 1) and senderThread2 (line 2) each use an event to send their notification. The function receiver in line (3) is the coroutine, which is executed in the threads receiverThread1 (line 4) and receiverThread2 (line 5). I measure the time between the start and the end of the coroutine and display it. This number shows how long the coroutine waits. The following screenshot displays the output of the program on Wandbox. The Compiler Explorer does not support thread creation, but Matt is "on it".
The output shows that the execution of the second coroutine takes about two seconds. The reason is that event1 sends its notification (line 1) before the coroutine is suspended, whereas event2 sends its notification after a delay of two seconds (line 2).
Now, I put the implementer's hat on and give you a rough idea of the workflow of the coroutine framework.
The simplified workflow
If you compare the class Generator from the last post (C++20: An Infinite Data Stream with Coroutines) with the class Event in this example, there is a subtle difference: in the first case, the Generator is both the awaitable and the awaiter; in the second case, the Event uses the operator co_await to return the awaiter. This separation of concerns into the awaitable and the awaiter improves the structure of the code.
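To isolate this separation of concerns, here is a minimal sketch (not from the original post) of an awaitable whose operator co_await hands out a trivial awaiter that never suspends; the names Ready and demo are mine, and the Task type mirrors the one from senderReceiver.cpp.

```cpp
// awaitableAwaiter.cpp - illustrative sketch, not part of the original post
#include <coroutine>
#include <iostream>

struct Ready {                                    // the awaitable
    struct Awaiter {                              // the awaiter
        bool await_ready() const noexcept { return true; }   // never suspend
        void await_suspend(std::coroutine_handle<>) noexcept {}
        void await_resume() noexcept {}
    };
    Awaiter operator co_await() const noexcept { return {}; }  // hand out the awaiter
};

struct Task {                                     // same minimal Task as in senderReceiver.cpp
    struct promise_type {
        Task get_return_object() { return {}; }
        std::suspend_never initial_suspend() { return {}; }
        std::suspend_never final_suspend() noexcept { return {}; }
        void return_void() {}
        void unhandled_exception() {}
    };
};

Task demo() {
    co_await Ready{};   // calls operator co_await, then await_ready on the returned awaiter
    std::cout << "resumed without ever suspending" << std::endl;
}

int main() { demo(); }
```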
In my explanation of both workflows, I assume that in the first case the event notification happens before the coroutine awaits the event, and in the second case the other way around.
Let’s first look at event1 and the first workflow. event1 sends its notification before receiverThread1 is started. The call event1.notify() (line 1) triggers the member function notify (line 6). First, the notification flag is set, and then the call auto* waiter = static_cast<Awaiter*>(suspendedWaiter.load()); loads the potential waiter. In this case, the waiter is a nullptr because it was not set before. This means the subsequent resume call on the waiter is not executed. The subsequently invoked function await_ready (line 7) first checks whether there is more than one waiter. In that case, I throw a std::runtime_error for simplicity. The crucial part of this member function is its return value. event.notified was already set to true in the notify method. true means in this case that the coroutine is not suspended and executes like a normal function.
In the second workflow, the co_await event2 call happens before event2 sends its notification. co_await event2 triggers the call await_ready (line 7). The big difference to the first workflow is that event.notified is now false. This false value causes the suspension of the coroutine. Technically, the member function await_suspend (line 8) is executed. await_suspend gets the coroutine handle corHandle and stores it in the variable coroutineHandle for later invocation. Of course, later invocation means resumption. Second, the waiter is stored in the variable suspendedWaiter. When event2.notify later triggers its notification, the method notify (line 6) is executed. The difference to the first workflow is that the condition waiter != nullptr now evaluates to true. The consequence is that the waiter uses the coroutineHandle to resume the coroutine.
What's next?
If I have one conclusion from this and my last post (C++20: An Infinite Data Stream with Coroutines), then it is this one: don't implement your coroutines yourself. Use existing coroutines such as the ones available with cppcoro from Lewis Baker. I will follow exactly this advice in my next post.
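As a teaser, and assuming that cppcoro's single_consumer_event, task, and sync_wait behave as described in the cppcoro README, the sender/receiver workflow could shrink to a sketch like the following; the names consumer and sender are mine.

```cpp
// cppcoroEvent.cpp - sketch based on the cppcoro README, not part of this post
#include <cppcoro/single_consumer_event.hpp>
#include <cppcoro/sync_wait.hpp>
#include <cppcoro/task.hpp>

#include <iostream>
#include <thread>

cppcoro::single_consumer_event event;

cppcoro::task<> consumer() {
    co_await event;                            // suspends until event.set() is called
    std::cout << "Got the notification!" << std::endl;
}

int main() {
    std::thread sender([]{ event.set(); });    // resumes the consumer if it is already waiting
    cppcoro::sync_wait(consumer());            // drive the coroutine to completion
    sender.join();
}
```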
Four Vouchers for Educative
There are four vouchers for Educative to win: https://bit.ly/VoucherEducative. The vouchers allow you to access all educative.io courses for a quarter of a year.