Thread-Safe Queue - Two Serious Errors

This post is a cross-post from www.ModernesCpp.com.

In my last post, "Monitor Object", I implemented a thread-safe queue. I made two serious errors. Sorry. Today, I will fix these issues.

First, I want to show you the erroneous implementation from my last post again so that you understand the context.

// monitorObject.cpp

#include <condition_variable>
#include <functional>
#include <queue>
#include <iostream>
#include <mutex>
#include <random>
#include <thread>
#include <vector>

class Monitor {
public:
    void lock() const {
        monitMutex.lock();
    }

    void unlock() const {
        monitMutex.unlock();
    }

    void notify_one() const noexcept {
        monitCond.notify_one();
    }

    template <typename Predicate>
    void wait(Predicate pred) const {                 // (10)
        std::unique_lock<std::mutex> monitLock(monitMutex);
        monitCond.wait(monitLock, pred);
    }
    
private:
    mutable std::mutex monitMutex;
    mutable std::condition_variable monitCond;
};

template <typename T>                                  // (1)
class ThreadSafeQueue: public Monitor {
 public:
    void add(T val){ 
        lock();
        myQueue.push(val);                             // (6)
        unlock();
        notify_one();
    }
    
    T get(){ 
        wait( [this] { return ! myQueue.empty(); } );  // (2)
        lock();
        auto val = myQueue.front();                    // (4)
        myQueue.pop();                                 // (5)
        unlock();
        return val;
    }

private:
    std::queue<T> myQueue;                            // (3)
};


class Dice {
public:
    int operator()(){ return rand(); }
private:
    std::function<int()> rand = std::bind(std::uniform_int_distribution<>(1, 6), 
                                          std::default_random_engine());
};


int main(){
    
    std::cout << '\n';
    
    constexpr auto NumberThreads = 100;
    
    ThreadSafeQueue<int> safeQueue;                      // (7)

    auto addLambda = [&safeQueue](int val){ safeQueue.add(val);          // (8)
                                            std::cout << val << " "
                                            << std::this_thread::get_id() << "; "; 
                                          }; 
    auto getLambda = [&safeQueue]{ safeQueue.get(); };  // (9)

    std::vector<std::thread> addThreads(NumberThreads);
    Dice dice;
    for (auto& thr: addThreads) thr = std::thread(addLambda, dice());

    std::vector<std::thread> getThreads(NumberThreads);
    for (auto& thr: getThreads) thr = std::thread(getLambda);

    for (auto& thr: addThreads) thr.join();
    for (auto& thr: getThreads) thr.join();
    
    std::cout << "\n\n";
     
}        


The key idea of the example is that the Monitor Object is encapsulated in a class and can, therefore, be reused. The class Monitor uses a std::mutex as the monitor lock and a std::condition_variable as the monitor condition. The class Monitor provides the minimal interface that a Monitor Object should support.


ThreadSafeQueue in line (1) extends std::queue in line (3) with a thread-safe interface. ThreadSafeQueue derives from the class Monitor and uses its member functions to support the synchronized member functions add and get. The member functions add and get use the monitor lock to protect the Monitor Object, particularly the non-thread-safe myQueue. add notifies the waiting thread when a new item has been added to myQueue. This notification is thread-safe.

The member function get (line (2)) deserves more attention. First, Monitor::wait (line (10)) is called, which calls the wait member function of the underlying condition variable. This wait call needs an additional predicate to protect against spurious and lost wakeups (C++ Core Guidelines: Be Aware of the Traps of Condition Variables). The operations on myQueue (lines (4) and (5)) must also be protected because they can interleave with the call myQueue.push(val) (line (6)).

The Monitor Object safeQueue (line (7)) uses the lambda functions in lines (8) and (9) to add a number to or remove a number from the synchronized safeQueue. ThreadSafeQueue itself is a class template and can hold values of an arbitrary type. One hundred clients add 100 random numbers between 1 and 6 to safeQueue, while one hundred other clients concurrently remove these 100 numbers from safeQueue. The output of the program shows the numbers and the thread ids.


This program has two serious issues. Dietmar Kühl and Frank Birbacher wrote me an e-mail about them. These are their words, which I translated into English. My additions are in bold italics.


  1. In ThreadSafeQueue::get(), Monitor::wait() checks whether myQueue contains an element or waits until it does. However, the lock is held only inside wait(); back in get(), you cannot be sure that the element is still in myQueue: another thread may acquire the lock and remove the element, resulting in undefined behavior on the call to myQueue.front().
  2. If the copy/move constructor of T throws an exception, the ThreadSafeQueue is left in an inconsistent state: no member function is active, but the mutex is still locked.


The fix is to ensure that Monitor::wait() can only be called while a unique_lock is held. This can be achieved, for example, by having Monitor provide a suitable (protected?) function that returns such a lock object and by having wait() require a reference to it:

struct Monitor {
   using Lock = std::unique_lock<std::mutex>; // could be wrapper if you prefer
   [[nodiscard]] Lock receiveGuard() { return Lock(monitMutex); }
   template <typename Predicate>
   void wait(Lock& kerberos, Predicate pred) { monitCond.wait(kerberos, pred); }
   // …
};

template <typename T>
T ThreadSafeQueue<T>::get() {
   auto kerberos = receiveGuard();
   wait(kerberos, [this]{ return not myQueue.empty(); });
   T rc = std::move(myQueue.front());
   myQueue.pop();
   return rc;
}        

This version also corrects the exception problem for get(). For add(), you can simply use the monitor object with a lock_guard:

template <typename T>
void ThreadSafeQueue<T>::add(T val) {
    {
        std::lock_guard<Monitor> kerberos(*this);   // Monitor provides lock()/unlock()
        myQueue.push(std::move(val));
    }
    notify_one();                                   // notify after the lock is released
}

I would probably wrap the notification in a "SendGuard" that contains a lock_guard and a reference to the condition_variable and sends the notification upon destruction:

class SendGuard {
    friend class Monitor;
    // The unique_ptr passes a std::condition_variable* to its deleter.
    using deleter = decltype([](auto* cond){ cond->notify_one(); });
    std::unique_ptr<std::condition_variable, deleter> notifier;
    std::lock_guard<std::mutex> kerberos;
    SendGuard(auto& mutex, auto& cond): notifier(&cond), kerberos(mutex) {}
};

The move constructor and destructor should still be public and represent the whole interface! This would also make the guard much easier to use in add():

template <typename T>
void ThreadSafeQueue<T>::add(T val) {
    auto kerberos = sendGuard();
    myQueue.push(std::move(val));
}

Finally, here is Dietmar's full implementation. The numbered comments correspond to those in my monitorObject.cpp example.

// threadsafequeue1.cpp

#include <condition_variable>
#include <functional>
#include <queue>
#include <iostream>
#include <memory>
#include <mutex>
#include <random>
#include <thread>
#include <vector>

class Monitor {
public:
    using Lock = std::unique_lock<std::mutex>;
    [[nodiscard]] Lock receiveGuard() {
        return Lock(monitMutex);
    }

    template <typename Predicate>
    void wait(Lock& kerberos, Predicate pred) {
        monitCond.wait(kerberos, pred);
    }

    class SendGuard {
        friend class Monitor;
        using deleter = decltype([](auto* cond){ cond->notify_one(); });
        std::unique_ptr<std::condition_variable, deleter> notifier;
        std::lock_guard<std::mutex> kerberos;
        SendGuard(auto& mutex, auto& cond): notifier(&cond), kerberos(mutex) {}
    };

    SendGuard sendGuard() { return {monitMutex, monitCond}; }
    
private:
    mutable std::mutex monitMutex;
    mutable std::condition_variable monitCond;
};

template <typename T>                                  // (1)
class ThreadSafeQueue: public Monitor {
 public:
    void add(T val){ 
        auto kerberos = sendGuard();
        myQueue.push(val);                             // (6)
    }
    
    T get(){ 
        auto kerberos = receiveGuard();
        wait(kerberos, [this] { return ! myQueue.empty(); } );  // (2)
        auto val = myQueue.front();                    // (4)
        myQueue.pop();                                 // (5)
        return val;
    }

private:
    std::queue<T> myQueue;                            // (3)
};


class Dice {
public:
    int operator()(){ return rand(); }
private:
    std::function<int()> rand = std::bind(std::uniform_int_distribution<>(1, 6), 
                                          std::default_random_engine());
};


int main(){
    
    std::cout << '\n';
    
    constexpr auto NumberThreads = 100;
    
    ThreadSafeQueue<int> safeQueue;                      // (7)

    auto addLambda = [&safeQueue](int val){ safeQueue.add(val);          // (8)
                                            std::cout << val << " "
                                            << std::this_thread::get_id() << "; "; 
                                          }; 
    auto getLambda = [&safeQueue]{ safeQueue.get(); };  // (9)

    std::vector<std::thread> addThreads(NumberThreads);
    Dice dice;
    for (auto& thr: addThreads) thr = std::thread(addLambda, dice());

    std::vector<std::thread> getThreads(NumberThreads);
    for (auto& thr: getThreads) thr = std::thread(getLambda);

    for (auto& thr: addThreads) thr.join();
    for (auto& thr: getThreads) thr.join();
    
    std::cout << "\n\n";
     
}        

As a result of this discussion, Frank proposed the following version, which gives Monitor a consistent and easy-to-use interface.

// threadSafeQueue.cpp

#include <atomic>
#include <algorithm>
#include <condition_variable>
#include <deque>
#include <iterator>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <vector>


class Monitor {
public:
    struct UnlockAndNotify {
        std::mutex d_mutex;
        std::condition_variable d_condition;

        void lock() { d_mutex.lock(); }
        void unlock() { d_mutex.unlock(); d_condition.notify_one(); }
    };

private:
    UnlockAndNotify d_combined;

public:
    std::unique_lock<UnlockAndNotify> makeLockWithNotify() {
        return std::unique_lock{d_combined};
    }

    template <typename PRED>
    std::unique_lock<std::mutex> makeLockWithWait(PRED waitForCondition) {
        std::unique_lock lock{d_combined.d_mutex};
        d_combined.d_condition.wait(lock, waitForCondition);
        return lock;
    }
};

class ThreadQueue {
    Monitor d_monitor;
    std::deque<int> d_numberQueue;

    auto makeLockWhenNotEmpty() {
        return d_monitor.makeLockWithWait([this] { return !d_numberQueue.empty(); });
    }

public:
    void addNumber(int number) {
        const auto lock = d_monitor.makeLockWithNotify();
        d_numberQueue.push_back(number);
    }

    int removeNumber() {
        const auto lock = makeLockWhenNotEmpty();
        const auto number = d_numberQueue.front();
        d_numberQueue.pop_front();
        return number;
    }
};

int main() {
   ThreadQueue queue;
   std::atomic<int> sharedSum{};
   std::atomic<int> sharedCounter{};

   std::vector<std::jthread> threads;
   threads.reserve(200);
   std::generate_n(std::back_inserter(threads), 100, [&] {
       return std::jthread{[&] { sharedSum += queue.removeNumber(); }};
   });
   std::generate_n(std::back_inserter(threads), 100, [&] {
       return std::jthread{[&] { queue.addNumber(++sharedCounter); }};
   });

   threads.clear(); // wait for all threads to finish
   if (sharedSum.load() != 5050) {
       throw std::logic_error("Wrong result for sum of 1..100");
   }
}        

The implementation of the monitor pattern here relies on the flexibility that std::unique_lock gains from its template parameter. All of the standard RAII lock guards can be used with any class that has lock() and unlock() member functions. The UnlockAndNotify class implements this interface and notifies its condition variable from within its unlock() member function. On top of that, the Monitor class provides a reduced public interface for creating two different kinds of locks, one with notification and one without, by creating a std::unique_lock on either the whole UnlockAndNotify instance or just the contained std::mutex.

As for the choice between std::unique_lock and std::lock_guard, I (Frank) prefer unique_lock in the interface. This choice gives the user of the Monitor class more flexibility. I value this flexibility more than a possible performance advantage of lock_guard, which would have to be measured first anyway. I acknowledge that the given examples don't make use of this extra flexibility.

Afterward, Dietmar developed Frank's idea further: here, the protected data is kept inside the Monitor, which makes it harder to access without protection.

// threadsafequeue2.cpp

#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <deque>
#include <functional>
#include <iostream>
#include <iterator>
#include <mutex>
#include <random>
#include <stdexcept>
#include <thread>
#include <tuple>
#include <vector>

namespace patterns::monitor3 {

template <typename T>
class Monitor {
public:
   struct UnlockAndNotify {
       std::mutex d_mutex;
       std::condition_variable d_condition;
   
       void lock() { d_mutex.lock(); }
       void unlock() { d_mutex.unlock(); d_condition.notify_one(); }
   };

private:
   mutable UnlockAndNotify d_combined;
   mutable T               d_data;

public:
   std::tuple<T&, std::unique_lock<UnlockAndNotify>> makeProducerLock() const {
       return { d_data, std::unique_lock{d_combined} };
   }

   template <typename PRED>
   std::tuple<T&, std::unique_lock<std::mutex>> makeConsumerLockWhen(PRED predicate) const {
       std::unique_lock lock{d_combined.d_mutex};
       d_combined.d_condition.wait(lock, [this, predicate]{ return predicate(d_data); });
       return { d_data, std::move(lock) };
   }
};

template <typename T>
class ThreadQueue {
   Monitor<std::deque<T>> d_monitor;

public:
   void add(T number) {
       auto[numberQueue, lock] = d_monitor.makeProducerLock();
       numberQueue.push_back(number);
   }

   T remove() {
       auto[numberQueue, lock] = d_monitor.makeConsumerLockWhen([](auto& numberQueue) { return !numberQueue.empty(); });
       const auto number = numberQueue.front();
       numberQueue.pop_front();
       return number;
   }
};
}

class Dice {
public:
    int operator()(){ return rand(); }
private:
    std::function<int()> rand = std::bind(std::uniform_int_distribution<>(1, 6), 
                                          std::default_random_engine());
};

int main(){
    
    std::cout << '\n';
    
    constexpr auto NumberThreads = 100;
    
    patterns::monitor3::ThreadQueue<int> safeQueue;                     

    auto addLambda = [&safeQueue](int val){ safeQueue.add(val);         
                                            std::cout << val << " "
                                            << std::this_thread::get_id() << "; "; 
                                          }; 
    auto getLambda = [&safeQueue]{ safeQueue.remove(); };  

    std::vector<std::thread> addThreads(NumberThreads);
    Dice dice;
    for (auto& thr: addThreads) thr = std::thread(addLambda, dice());

    std::vector<std::thread> getThreads(NumberThreads);
    for (auto& thr: getThreads) thr = std::thread(getLambda);

    for (auto& thr: addThreads) thr.join();
    for (auto& thr: getThreads) thr.join();
    
    std::cout << "\n\n";
     
}        

Once more, thanks a lot to Frank and Dietmar. With my erroneous implementation of a thread-safe queue in my previous post, I didn't intend to prove that concurrency is hard to get right. I'm particularly annoyed that I didn't put the mutex inside a lock (error 2). I teach this in my C++ classes: NNM (No Naked Mutex).

What's next?

In my next post, I will dive into the future of C++: C++23.


Thanks a lot to my Patreon Supporters: Matt Braun, Roman Postanciuc, Tobias Zindl, G Prvulovic, Reinhold Dröge, Abernitzke, Frank Grimm, Sakib, Broeserl, António Pina, Sergey Agafyin, Андрей Бурмистров, Jake, GS, Lawton Shoemake, Animus24, Jozo Leko, John Breland, Venkat Nandam, Jose Francisco, Douglas Tinkham, Kuchlong Kuchlong, Robert Blanch, Truels Wissneth, Kris Kafka, Mario Luoni, Friedrich Huber, lennonli, Pramod Tikare Muralidhara, Peter Ware, Daniel Hufschläger, Alessandro Pezzato, Bob Perry, Satish Vangipuram, Andi Ireland, Richard Ohnemus, Michael Dunsky, Leo Goodstadt, John Wiederhirn, Yacob Cohen-Arazi, Florian Tischler, Robin Furness, Michael Young, Holger Detering, Bernd Mühlhaus, Matthieu Bolt, Stephen Kelley, Kyle Dean, Tusar Palauri, Dmitry Farberov, Juan Dent, George Liao, Daniel Ceperley, Jon T Hess, Stephen Totten, Wolfgang Fütterer, Matthias Grün, Phillip Diekmann, Ben Atakora, Ann Shatoff, and Rob North.

Thanks, in particular, to Jon Hess, Lakshman, Christian Wittenhorst, Sherhy Pyton, Dendi Suhubdy, Sudhakar Belagurusamy, Richard Sargeant, Rusty Fleming, John Nebel, Mipko, Alicja Kaminska, and Slavko Radman.

My special thanks to Embarcadero, PVS-Studio, Tipi.build, and Take Up Code.


Seminars

I'm happy to give online seminars or face-to-face seminars worldwide. Please call me if you have any questions.

Bookable (Online)

German

Standard Seminars (English/German)

Here is a compilation of my standard seminars. These seminars are only meant to give you a first orientation.

  • C++ - The Core Language
  • C++ - The Standard Library
  • C++ - Compact
  • C++11 and C++14
  • Concurrency with Modern C++
  • Design Pattern and Architectural Pattern with C++
  • Embedded Programming with Modern C++
  • Generic Programming (Templates) with C++

New

  • Clean Code with Modern C++
  • C++20

Contact Me

Modernes C++,
