Thread-Safe Queue - Two Serious Errors
This post is a cross-post from www.ModernesCpp.com.
In my last post, "Monitor Object", I implemented a thread-safe queue and made two serious errors. Sorry. Today, I will fix these issues.
First, here is the erroneous implementation from my last post again, to provide the context.
// monitorObject.cpp

#include <condition_variable>
#include <functional>
#include <queue>
#include <iostream>
#include <mutex>
#include <random>
#include <thread>
#include <vector>

class Monitor {
public:
    void lock() const {
        monitMutex.lock();
    }
    void unlock() const {
        monitMutex.unlock();
    }
    void notify_one() const noexcept {
        monitCond.notify_one();
    }
    template <typename Predicate>
    void wait(Predicate pred) const {                     // (10)
        std::unique_lock<std::mutex> monitLock(monitMutex);
        monitCond.wait(monitLock, pred);
    }
private:
    mutable std::mutex monitMutex;
    mutable std::condition_variable monitCond;
};

template <typename T>                                     // (1)
class ThreadSafeQueue: public Monitor {
public:
    void add(T val){
        lock();
        myQueue.push(val);                                // (6)
        unlock();
        notify_one();
    }
    T get(){
        wait( [this] { return ! myQueue.empty(); } );     // (2)
        lock();
        auto val = myQueue.front();                       // (4)
        myQueue.pop();                                    // (5)
        unlock();
        return val;
    }
private:
    std::queue<T> myQueue;                                // (3)
};

class Dice {
public:
    int operator()(){ return rand(); }
private:
    std::function<int()> rand = std::bind(std::uniform_int_distribution<>(1, 6),
                                          std::default_random_engine());
};

int main(){
    std::cout << '\n';
    constexpr auto NumberThreads = 100;
    ThreadSafeQueue<int> safeQueue;                       // (7)
    auto addLambda = [&safeQueue](int val){ safeQueue.add(val);   // (8)
                                            std::cout << val << " "
                                            << std::this_thread::get_id() << "; ";
                                          };
    auto getLambda = [&safeQueue]{ safeQueue.get(); };    // (9)
    std::vector<std::thread> addThreads(NumberThreads);
    Dice dice;
    for (auto& thr: addThreads) thr = std::thread(addLambda, dice());
    std::vector<std::thread> getThreads(NumberThreads);
    for (auto& thr: getThreads) thr = std::thread(getLambda);
    for (auto& thr: addThreads) thr.join();
    for (auto& thr: getThreads) thr.join();
    std::cout << "\n\n";
}
The key idea of the example is that the Monitor Object is encapsulated in a class and can, therefore, be reused. The class Monitor uses a std::mutex as monitor lock and a std::condition_variable as monitor condition. The class Monitor provides the minimal interface that a Monitor Object should support.
ThreadSafeQueue in line (1) extends std::queue in line (3) with a thread-safe interface. ThreadSafeQueue derives from the class Monitor and uses its member functions to support the synchronized member functions add and get. The member functions add and get use the monitor lock to protect the Monitor Object, particularly the non-thread-safe myQueue. add notifies a waiting thread when a new item has been added to myQueue. This notification is thread-safe.
The member function get (line (2)) deserves more attention. First, the wait member function of the underlying condition variable is called. This wait call needs an additional predicate to protect against spurious and lost wakeups (C++ Core Guidelines: Be Aware of the Traps of Condition Variables). The operations modifying myQueue (lines (4) and (5)) must also be protected because they can interleave with the call myQueue.push(val) (line (6)).
The Monitor Object safeQueue in line (7) uses the lambda functions in lines (8) and (9) to add a number to or remove a number from the synchronized safeQueue. ThreadSafeQueue itself is a class template and can hold values of an arbitrary type. One hundred clients add 100 random numbers between 1 and 6 to safeQueue (line (7)), while another hundred clients concurrently remove these 100 numbers from safeQueue. The output of the program shows the numbers and the thread ids.
This program has two serious issues. Dietmar Kühl and Frank Birbacher wrote me an e-mail about them. These are their words, which I translated into English. My additions are in italics and bold.
The fix is that Monitor::wait() can only be called if a unique_lock is held. This can be achieved, for example, by having Monitor provide an appropriate (protected?) function that returns a suitable lock object, and by having wait() require a reference to that object:
struct Monitor {
    using Lock = std::unique_lock<std::mutex>; // could be wrapper if you prefer
    [[nodiscard]] Lock receiveGuard() { return Lock(monitMutex); }
    template <typename Predicate>
    void wait(Lock& kerberos, Predicate pred) { monitCond.wait(kerberos, pred); }
    // …
};

template <typename T>
T ThreadSafeQueue<T>::get() {
    auto kerberos = receiveGuard();
    wait(kerberos, [this]{ return not myQueue.empty(); });
    T rc = std::move(myQueue.front());
    myQueue.pop();
    return rc;
}
This version corrects the exception problem for get(). For add() you can simply use the monitor object with a lock_guard:
template <typename T>
void ThreadSafeQueue<T>::add(T val) {
    {
        std::lock_guard<Monitor> kerberos(*this);
        myQueue.push(std::move(val));
    }
    notify_one();
}
I would probably wrap the notification in a "SendGuard" that contains a lock_guard and a reference to the condition_variable and sends the notification upon destruction:
class SendGuard {
    friend class Monitor;
    using deleter = decltype([](auto* cond){ cond->notify_one(); });
    std::unique_ptr<std::condition_variable, deleter> notifier;
    std::lock_guard<std::mutex> kerberos;
    SendGuard(auto& mutex, auto& cond): notifier(&cond), kerberos(mutex) {}
};
The move constructor and destructor should still be public and represent the whole interface! This would also make it much easier to use in add():
template <typename T>
void ThreadSafeQueue<T>::add(T val) {
    auto kerberos = sendGuard();
    myQueue.push(val);
}
Finally, here is Dietmar's full implementation. The numbers correspond to the numbers in my monitorObject.cpp example.
// threadsafequeue1.cpp

#include <condition_variable>
#include <functional>
#include <queue>
#include <iostream>
#include <memory>
#include <mutex>
#include <random>
#include <thread>
#include <vector>

class Monitor {
public:
    using Lock = std::unique_lock<std::mutex>;
    [[nodiscard]] Lock receiveGuard() {
        return Lock(monitMutex);
    }
    template <typename Predicate>
    void wait(Lock& kerberos, Predicate pred) {
        monitCond.wait(kerberos, pred);
    }
    class SendGuard {
        friend class Monitor;
        using deleter = decltype([](auto* cond){ cond->notify_one(); });
        // Declared before the lock, so it is destroyed (and notifies) after the unlock.
        std::unique_ptr<std::condition_variable, deleter> notifier;
        std::lock_guard<std::mutex> kerberos;
        SendGuard(auto& mutex, auto& cond): notifier(&cond), kerberos(mutex) {}
    };
    SendGuard sendGuard() { return {monitMutex, monitCond}; }
private:
    mutable std::mutex monitMutex;
    mutable std::condition_variable monitCond;
};

template <typename T>                                     // (1)
class ThreadSafeQueue: public Monitor {
public:
    void add(T val){
        auto kerberos = sendGuard();
        myQueue.push(val);                                // (6)
    }
    T get(){
        auto kerberos = receiveGuard();
        wait(kerberos, [this] { return ! myQueue.empty(); } );   // (2)
        auto val = myQueue.front();                       // (4)
        myQueue.pop();                                    // (5)
        return val;
    }
private:
    std::queue<T> myQueue;                                // (3)
};

class Dice {
public:
    int operator()(){ return rand(); }
private:
    std::function<int()> rand = std::bind(std::uniform_int_distribution<>(1, 6),
                                          std::default_random_engine());
};

int main(){
    std::cout << '\n';
    constexpr auto NumberThreads = 100;
    ThreadSafeQueue<int> safeQueue;                       // (7)
    auto addLambda = [&safeQueue](int val){ safeQueue.add(val);   // (8)
                                            std::cout << val << " "
                                            << std::this_thread::get_id() << "; ";
                                          };
    auto getLambda = [&safeQueue]{ safeQueue.get(); };    // (9)
    std::vector<std::thread> addThreads(NumberThreads);
    Dice dice;
    for (auto& thr: addThreads) thr = std::thread(addLambda, dice());
    std::vector<std::thread> getThreads(NumberThreads);
    for (auto& thr: getThreads) thr = std::thread(getLambda);
    for (auto& thr: addThreads) thr.join();
    for (auto& thr: getThreads) thr.join();
    std::cout << "\n\n";
}
As a result of the discussion above, Frank proposed the following version, which has a consistent and easy-to-use interface for Monitor.
// threadSafeQueue.cpp

#include <atomic>
#include <algorithm>
#include <condition_variable>
#include <deque>
#include <iterator>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <vector>

class Monitor {
public:
    struct UnlockAndNotify {
        std::mutex d_mutex;
        std::condition_variable d_condition;
        void lock() { d_mutex.lock(); }
        void unlock() { d_mutex.unlock(); d_condition.notify_one(); }
    };
private:
    UnlockAndNotify d_combined;
public:
    std::unique_lock<UnlockAndNotify> makeLockWithNotify() {
        return std::unique_lock{d_combined};
    }
    template <typename PRED>
    std::unique_lock<std::mutex> makeLockWithWait(PRED waitForCondition) {
        std::unique_lock lock{d_combined.d_mutex};
        d_combined.d_condition.wait(lock, waitForCondition);
        return lock;
    }
};

class ThreadQueue {
    Monitor d_monitor;
    std::deque<int> d_numberQueue;
    auto makeLockWhenNotEmpty() {
        return d_monitor.makeLockWithWait([this] { return !d_numberQueue.empty(); });
    }
public:
    void addNumber(int number) {
        const auto lock = d_monitor.makeLockWithNotify();
        d_numberQueue.push_back(number);
    }
    int removeNumber() {
        const auto lock = makeLockWhenNotEmpty();
        const auto number = d_numberQueue.front();
        d_numberQueue.pop_front();
        return number;
    }
};

int main() {
    ThreadQueue queue;
    std::atomic<int> sharedSum{};
    std::atomic<int> sharedCounter{};
    std::vector<std::jthread> threads;
    threads.reserve(200);
    std::generate_n(std::back_inserter(threads), 100, [&] {
        return std::jthread{[&] { sharedSum += queue.removeNumber(); }};
    });
    std::generate_n(std::back_inserter(threads), 100, [&] {
        return std::jthread{[&] { queue.addNumber(++sharedCounter); }};
    });
    threads.clear(); // wait for all threads to finish
    if (sharedSum.load() != 5050) {
        throw std::logic_error("Wrong result for sum of 1..100");
    }
}
The implementation of the monitor pattern here is based on the flexibility of std::unique_lock through its template parameter. All of the std RAII lock guards can be used with any class that has lock() and unlock() methods. The UnlockAndNotify class implements this interface and notifies its condition variable from within the unlock() method. On top of that, the Monitor class provides a reduced public interface to create two different kinds of locks, one with notification and one without, by creating a std::unique_lock on either the whole UnlockAndNotify instance or just the contained std::mutex.
On the choice of std::unique_lock versus std::lock_guard I (Frank) prefer the unique_lock in the interface. This choice allows the user of the Monitor class more flexibility. I value this flexibility higher than a possible performance difference to lock_guard which anyway needs to be measured first. I acknowledge that the given examples don't make use of this extra flexibility.
Afterward, Dietmar further developed Frank's idea: here, the protected data is kept inside the Monitor, which makes it harder to access the data unprotected.
// threadsafequeue2.cpp

#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <deque>
#include <functional>
#include <iostream>
#include <iterator>
#include <mutex>
#include <random>
#include <stdexcept>
#include <thread>
#include <tuple>
#include <vector>

namespace patterns::monitor3 {

    template <typename T>
    class Monitor {
    public:
        struct UnlockAndNotify {
            std::mutex d_mutex;
            std::condition_variable d_condition;
            void lock() { d_mutex.lock(); }
            void unlock() { d_mutex.unlock(); d_condition.notify_one(); }
        };
    private:
        mutable UnlockAndNotify d_combined;
        mutable T d_data;
    public:
        std::tuple<T&, std::unique_lock<UnlockAndNotify>> makeProducerLock() const {
            return { d_data, std::unique_lock{d_combined} };
        }
        template <typename PRED>
        std::tuple<T&, std::unique_lock<std::mutex>> makeConsumerLockWhen(PRED predicate) const {
            std::unique_lock lock{d_combined.d_mutex};
            d_combined.d_condition.wait(lock, [this, predicate]{ return predicate(d_data); });
            return { d_data, std::move(lock) };
        }
    };

    template <typename T>
    class ThreadQueue {
        Monitor<std::deque<T>> d_monitor;
    public:
        void add(T number) {
            auto[numberQueue, lock] = d_monitor.makeProducerLock();
            numberQueue.push_back(number);
        }
        T remove() {
            auto[numberQueue, lock] = d_monitor.makeConsumerLockWhen(
                [](auto& numberQueue) { return !numberQueue.empty(); });
            const auto number = numberQueue.front();
            numberQueue.pop_front();
            return number;
        }
    };

}

class Dice {
public:
    int operator()(){ return rand(); }
private:
    std::function<int()> rand = std::bind(std::uniform_int_distribution<>(1, 6),
                                          std::default_random_engine());
};

int main(){
    std::cout << '\n';
    constexpr auto NumberThreads = 100;
    patterns::monitor3::ThreadQueue<int> safeQueue;
    auto addLambda = [&safeQueue](int val){ safeQueue.add(val);
                                            std::cout << val << " "
                                            << std::this_thread::get_id() << "; ";
                                          };
    auto getLambda = [&safeQueue]{ safeQueue.remove(); };
    std::vector<std::thread> addThreads(NumberThreads);
    Dice dice;
    for (auto& thr: addThreads) thr = std::thread(addLambda, dice());
    std::vector<std::thread> getThreads(NumberThreads);
    for (auto& thr: getThreads) thr = std::thread(getLambda);
    for (auto& thr: addThreads) thr.join();
    for (auto& thr: getThreads) thr.join();
    std::cout << "\n\n";
}
Once more, thanks a lot to Frank and Dietmar. With my erroneous implementation of a thread-safe queue in my previous post, I didn't intend to prove that concurrency is hard to get right. I'm particularly annoyed that I didn't put the mutex inside a lock (error 2). I teach this in my C++ classes: NNM (No Naked Mutex).
What's next?
In my next post, I dive into the future of C++: C++23.
Thanks a lot to my Patreon Supporters: Matt Braun, Roman Postanciuc, Tobias Zindl, G Prvulovic, Reinhold Dröge, Abernitzke, Frank Grimm, Sakib, Broeserl, António Pina, Sergey Agafyin, Андрей Бурмистров, Jake, GS, Lawton Shoemake, Animus24, Jozo Leko, John Breland, Venkat Nandam, Jose Francisco, Douglas Tinkham, Kuchlong Kuchlong, Robert Blanch, Truels Wissneth, Kris Kafka, Mario Luoni, Friedrich Huber, lennonli, Pramod Tikare Muralidhara, Peter Ware, Daniel Hufschläger, Alessandro Pezzato, Bob Perry, Satish Vangipuram, Andi Ireland, Richard Ohnemus, Michael Dunsky, Leo Goodstadt, John Wiederhirn, Yacob Cohen-Arazi, Florian Tischler, Robin Furness, Michael Young, Holger Detering, Bernd Mühlhaus, Matthieu Bolt, Stephen Kelley, Kyle Dean, Tusar Palauri, Dmitry Farberov, Juan Dent, George Liao, Daniel Ceperley, Jon T Hess, Stephen Totten, Wolfgang Fütterer, Matthias Grün, Phillip Diekmann, Ben Atakora, Ann Shatoff, and Rob North.
Thanks, in particular, to Jon Hess, Lakshman, Christian Wittenhorst, Sherhy Pyton, Dendi Suhubdy, Sudhakar Belagurusamy, Richard Sargeant, Rusty Fleming, John Nebel, Mipko, Alicja Kaminska, and Slavko Radman.
My special thanks to Embarcadero, PVS-Studio, Tipi.build, and Take Up Code.