Concurrent Queue
Did you hear about the queue as a data structure ?? ---- of course yes, But what is the concurrent queue, in nutshell the concurrent queue is a thread-safe data structure that allows multiple threads to insert and remove elements concurrently without causing data corruption or race conditions. It follows the First-In-First-Out (FIFO) principle, ensuring that the order of elements is preserved during concurrent access.
The simplest way to write a concurrent queue is to write a "wrapper" class. This will have an std::queue object as a member, and an std::mutex. Each public member function will lock the mutex, then call the appropriate member function(s) of the queue.
This will be thread-safe, because only one thread can call a member function of the std::queue at any time. There is no possibility of a data race, because only one thread can access the queue at any one time.
Concurrent Queue Member Functions
For the special member functions, nothing unusual is required. The defaults will call the corresponding member functions of the std::queue member and the mutex, and this is sufficient.
The push() member function will lock the mutex member, then forward its argument to the push() member function of the queue member. It then unlocks the mutex and returns.
The pop() member function takes the element value as an output parameter (by non-const reference.) It will lock the mutex, call the queue member's front() member function, and store the returned value in its argument. It then calls the queue member's pop() member function and unlocks the mutex.
If pop() is called when the queue member is empty, it should behave in a well-defined way, for example by throwing an exception.
Concurrent Queue Class Definition
#include <queue>
#include <mutex>
// Define a namespace for the concurrent queue
namespace cq {
// Exception to throw when popping an empty queue
class concurrent_queue_exception : public std::runtime_error {
public:
cq_exception() : std::runtime_error("Queue is empty") {}
cq_exception(const char *s) : std::runtime_error(s) {}
};
// The concurrent queue class
template <class T>
class concurrent_queue {
// The mutex to protect the queue
std::mutex m;
// The wrapped queue object
std::queue<T> q;
public:
// Use defaults for the special member functions
concurrent_queue() = default;
// Thread-safe call to std::queue::push()
void push(T value)
领英推荐
{
std::lock_guard<std::mutex> lckg(m);
q.push(value);
}
void pop(T& value)
{
// Lock the mutex
std::lock_guard<std::mutex> lckg(m);
// Do not pop() an empty queue!
if (q.empty()) {
throw cq_exception("Queue is empty");
}
// Save the front element's value
value = q.front();
// Remove the front element
q.pop();
}
};
}//end of namespace
We define a class for the exception object which is thrown when popping an empty queue. This class is derived from std::exception, so it can be caught by a generic exception handler. This is thrown before calling the queue member's pop(), so the queue member will not have been modified.
The only other place where an exception can be thrown is when copying the return value from calling the queue member's front(), because the element's copy constructor could throw an exception. Again, the queue member has not been modified.
In both cases, the mutex will be unlocked when the local destructors are called. This implementation provides the strong exception guarantee, in which everything is left as it was before the member function call.
Conclusions:
Concurrent queue can be implemented using mutex, conditional variable and lock free programming.
Implementation using mutex:
Only one thread can access the queue at any one time. In effect, when a multi-threaded program accesses the queue, it temporarily becomes single-threaded.
Implementation using condition variable:
this ?makes the program more flexible, and it also makes the graining slightly less coarse grained. Other threads can call different member functions while pop() is waiting for data to arrive.
A much more efficient solution would be to implement the queue ourselves, using lock-free programming techniques. The queue would perform atomic operations instead of using mutexes. In high-performance computing, a ring buffer (circular queue) is often used instead of a linear queue.
It's important to note that implementing a fully functional and efficient lock-free concurrent queue is a non-trivial task and requires careful consideration of memory management, synchronization, and atomic operations.
Specialized libraries that provide robust lock-free data structures, such as Intel TBB or Boost.Lockfree, are recommended for production scenarios.
Senior Software Developer
1 年I’m guessibg the lock guard is a smart pointer that locks the mutex on construction and unlocks the mutex on destruction as soon as it goes out of scope.