Concurrent Queue

You have almost certainly met the queue as a data structure. But what is a concurrent queue? In a nutshell, a concurrent queue is a thread-safe data structure that allows multiple threads to insert and remove elements concurrently without causing data corruption or data races. It follows the First-In-First-Out (FIFO) principle: elements are removed in the order in which they were inserted, even under concurrent access.

The simplest way to write a concurrent queue is to write a "wrapper" class. This will have an std::queue object as a member, and an std::mutex. Each public member function will lock the mutex, then call the appropriate member function(s) of the queue.

This is thread-safe because every access to the std::queue happens while the mutex is held, so only one thread can call a member function of the queue at any one time and no data race is possible.

Concurrent Queue Member Functions

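For the special member functions, little extra work is required. The defaulted default constructor and the destructor call the corresponding member functions of the std::queue member and the mutex, and this is sufficient. Note that std::mutex can be neither copied nor moved, so the implicitly generated copy and move operations of the wrapper are deleted: a concurrent queue object cannot be copied or moved unless these operations are written by hand.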
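The effect of the mutex member on the special member functions can be checked at compile time. The following is a minimal sketch, assuming C++17; the class name wrapper is a hypothetical stand-in with the same two data members as the concurrent queue, not the class from this article.

```cpp
#include <mutex>
#include <queue>
#include <type_traits>

// Hypothetical stand-in with the same data members as the concurrent queue.
template <class T>
class wrapper {
    std::mutex m;     // neither copyable nor movable
    std::queue<T> q;  // copyable and movable on its own
public:
    wrapper() = default;
};

// Default construction and destruction are generated as usual.
static_assert(std::is_default_constructible_v<wrapper<int>>);
static_assert(std::is_destructible_v<wrapper<int>>);

// The mutex member causes the implicit copy and move operations to be deleted.
static_assert(!std::is_copy_constructible_v<wrapper<int>>);
static_assert(!std::is_move_constructible_v<wrapper<int>>);
static_assert(!std::is_copy_assignable_v<wrapper<int>>);
static_assert(!std::is_move_assignable_v<wrapper<int>>);
```

If copying is really needed, one option is a hand-written copy constructor that locks the source object's mutex and copies only the queue member.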

The push() member function will lock the mutex member, then forward its argument to the push() member function of the queue member. It then unlocks the mutex and returns.

The pop() member function takes the element value as an output parameter (by non-const reference). It will lock the mutex, call the queue member's front() member function, and copy-assign the returned value to its argument. It then calls the queue member's pop() member function and unlocks the mutex.

If pop() is called when the queue member is empty, it should behave in a well-defined way, for example by throwing an exception.

Concurrent Queue Class Definition

#include <mutex>
#include <queue>
#include <stdexcept>  // std::runtime_error

// Define a namespace for the concurrent queue
namespace cq {

// Exception to throw when popping an empty queue
class cq_exception : public std::runtime_error {
public:
    cq_exception() : std::runtime_error("Queue is empty") {}
    cq_exception(const char *s) : std::runtime_error(s) {}
};

// The concurrent queue class
template <class T>
class concurrent_queue {
    // The mutex to protect the queue
    std::mutex m;

    // The wrapped queue object
    std::queue<T> q;

public:
    // Use defaults for the special member functions
    concurrent_queue() = default;

    // Thread-safe call to std::queue::push()
    void push(T value)
    {
        // Lock the mutex; it is unlocked when lckg goes out of scope
        std::lock_guard<std::mutex> lckg(m);
        q.push(value);
    }

    // Thread-safe combination of std::queue::front() and std::queue::pop()
    void pop(T& value)
    {
        // Lock the mutex
        std::lock_guard<std::mutex> lckg(m);

        // Do not pop() an empty queue!
        if (q.empty()) {
            throw cq_exception("Queue is empty");
        }

        // Save the front element's value
        value = q.front();

        // Remove the front element
        q.pop();
    }
};

} // end of namespace cq
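As a rough usage sketch, several producer threads can push into the same queue object while the wrapper serializes the calls. The class is repeated here in condensed form so that the example compiles on its own; it throws std::runtime_error directly instead of the article's exception class, and the driver function produce_and_drain() is a hypothetical name introduced only for this illustration.

```cpp
#include <cassert>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <vector>

// Condensed copy of the wrapper, repeated so the example is self-contained.
template <class T>
class concurrent_queue {
    std::mutex m;
    std::queue<T> q;
public:
    void push(T value) {
        std::lock_guard<std::mutex> lck(m);
        q.push(value);
    }
    void pop(T& value) {
        std::lock_guard<std::mutex> lck(m);
        if (q.empty()) throw std::runtime_error("Queue is empty");
        value = q.front();
        q.pop();
    }
};

// Hypothetical driver: `producers` threads each push `per_thread` ones,
// then the calling thread drains the queue and returns the sum.
inline long produce_and_drain(int producers, int per_thread) {
    assert(producers >= 0 && per_thread >= 0);
    concurrent_queue<int> cq;

    std::vector<std::thread> workers;
    for (int p = 0; p < producers; ++p) {
        workers.emplace_back([&cq, per_thread] {
            for (int i = 0; i < per_thread; ++i) cq.push(1);
        });
    }
    for (auto& w : workers) w.join();  // all pushes are complete after this

    long sum = 0;
    int value = 0;
    for (int i = 0; i < producers * per_thread; ++i) {
        cq.pop(value);  // cannot throw here: exactly this many elements exist
        sum += value;
    }
    return sum;
}
```

Calling produce_and_drain(4, 1000) from main() should return 4000 if no push was lost. On some toolchains, programs that use std::thread must be built with an option such as -pthread.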

We define a class for the exception object which is thrown when popping an empty queue. This class is derived from std::runtime_error, and therefore indirectly from std::exception, so it can be caught by a generic exception handler. The exception is thrown before calling the queue member's pop(), so the queue member will not have been modified.

The only other place in pop() where an exception can be thrown is when copying the value returned by the queue member's front() into the output argument, because the element's copy assignment operator could throw an exception. Again, the queue member has not been modified.

In both cases, the mutex is unlocked when the lock_guard's destructor runs during stack unwinding. As far as the queue is concerned, this implementation provides the strong exception guarantee: the queue is left exactly as it was before the member function call.
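The exception behaviour of pop() can be exercised with an element type that fails on purpose. This is only a sketch under stated assumptions: the type fragile, the size() member and the function pop_failure_leaves_queue_intact() are hypothetical additions for checking, and the condensed class throws std::runtime_error rather than the article's exception class.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <queue>
#include <stdexcept>

// Condensed copy of the wrapper; size() is a hypothetical helper for checking.
template <class T>
class concurrent_queue {
    std::mutex m;
    std::queue<T> q;
public:
    void push(T value) {
        std::lock_guard<std::mutex> lck(m);
        q.push(value);
    }
    void pop(T& value) {
        std::lock_guard<std::mutex> lck(m);
        if (q.empty()) throw std::runtime_error("Queue is empty");
        value = q.front();  // may throw; the queue is still untouched here
        q.pop();
    }
    std::size_t size() {
        std::lock_guard<std::mutex> lck(m);
        return q.size();
    }
};

// Hypothetical element type whose copy assignment always throws.
struct fragile {
    fragile() = default;
    fragile(const fragile&) = default;
    fragile& operator=(const fragile&) { throw std::runtime_error("copy failed"); }
};

// Returns true if two failed pop() calls left the element in the queue.
// The second call also shows that the mutex was released after the first failure.
inline bool pop_failure_leaves_queue_intact() {
    concurrent_queue<fragile> cq;
    cq.push(fragile{});
    assert(cq.size() == 1);

    fragile out;
    int failures = 0;
    for (int attempt = 0; attempt < 2; ++attempt) {
        try {
            cq.pop(out);
        } catch (const std::runtime_error&) {
            ++failures;
        }
    }
    return failures == 2 && cq.size() == 1;
}
```

Whether the output argument itself is left unchanged after a failed copy depends on the element type's assignment operator, which the queue cannot control.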

Conclusions:

A concurrent queue can be implemented using a mutex, a condition variable, or lock-free programming techniques.

Implementation using mutex:

Only one thread can access the queue at any one time. In effect, when a multi-threaded program accesses the queue, it temporarily becomes single-threaded.

Implementation using condition variable:

This makes the program more flexible, and it makes the locking slightly less coarse-grained. Instead of throwing when the queue is empty, pop() can wait until data arrives, and other threads can call other member functions while pop() is waiting.
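A condition-variable version might look like the following sketch. The names blocking_queue and blocking_handoff() are hypothetical and not taken from the article; the idea is only that pop() sleeps on a std::condition_variable until push() signals that an element is available.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical blocking variant of the wrapper.
template <class T>
class blocking_queue {
    std::mutex m;
    std::condition_variable cv;
    std::queue<T> q;
public:
    void push(T value) {
        {
            std::lock_guard<std::mutex> lck(m);
            q.push(std::move(value));
        }
        cv.notify_one();  // wake one waiting consumer after releasing the lock
    }
    void pop(T& value) {
        std::unique_lock<std::mutex> lck(m);
        // wait() releases the mutex while sleeping and re-checks the
        // predicate on every wake-up, which handles spurious wake-ups.
        cv.wait(lck, [this] { return !q.empty(); });
        value = std::move(q.front());
        q.pop();
    }
};

// Hypothetical driver: the consumer may block in pop() until the producer pushes.
inline int blocking_handoff(int payload) {
    blocking_queue<int> bq;
    int received = 0;
    std::thread consumer([&] { bq.pop(received); });
    std::thread producer([&] { bq.push(payload); });
    consumer.join();
    producer.join();
    assert(received == payload);
    return received;
}
```

While the consumer sleeps inside wait(), the mutex is not held, so other threads are free to call push() or other member functions.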

Implementation using lock-free programming:

A potentially more efficient solution, especially under heavy contention, would be to implement the queue ourselves, using lock-free programming techniques. The queue would perform atomic operations instead of using mutexes. In high-performance computing, a ring buffer (circular queue) is often used instead of a linear queue.
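To give a flavour of the atomic approach, here is a deliberately restricted sketch: a fixed-size ring buffer that is only safe with exactly one producer thread and one consumer thread. The names spsc_ring, try_push(), try_pop() and ring_preserves_order() are hypothetical, and the code assumes that std::atomic<std::size_t> is lock-free on the target platform, which is typical but not guaranteed by the standard.

```cpp
#include <array>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>

// Single-producer/single-consumer ring buffer.
// One slot always stays empty so that "full" and "empty" can be told apart.
template <class T, std::size_t N>
class spsc_ring {
    static_assert(N >= 2, "need at least two slots");
    std::array<T, N> buf{};
    std::atomic<std::size_t> head{0};  // next slot to read; written only by the consumer
    std::atomic<std::size_t> tail{0};  // next slot to write; written only by the producer
public:
    // Called by the producer thread only. Returns false if the buffer is full.
    bool try_push(const T& v) {
        const std::size_t t = tail.load(std::memory_order_relaxed);
        const std::size_t next = (t + 1) % N;
        if (next == head.load(std::memory_order_acquire)) return false;
        buf[t] = v;
        tail.store(next, std::memory_order_release);  // publish the element
        return true;
    }
    // Called by the consumer thread only. Returns false if the buffer is empty.
    bool try_pop(T& out) {
        const std::size_t h = head.load(std::memory_order_relaxed);
        if (h == tail.load(std::memory_order_acquire)) return false;
        out = buf[h];
        head.store((h + 1) % N, std::memory_order_release);  // free the slot
        return true;
    }
};

// Hypothetical driver: checks that values arrive in the order they were pushed.
inline bool ring_preserves_order(int count) {
    assert(count >= 0);
    spsc_ring<int, 8> ring;
    std::thread producer([&ring, count] {
        for (int i = 0; i < count; ++i)
            while (!ring.try_push(i)) std::this_thread::yield();
    });
    bool ordered = true;
    for (int expected = 0; expected < count; ++expected) {
        int v = -1;
        while (!ring.try_pop(v)) std::this_thread::yield();
        if (v != expected) ordered = false;
    }
    producer.join();
    return ordered;
}
```

Supporting several producers or several consumers needs considerably more machinery, which is one reason an existing library is usually the safer choice.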

It's important to note that implementing a fully functional and efficient lock-free concurrent queue is a non-trivial task and requires careful consideration of memory management, synchronization, and atomic operations.

Specialized libraries that provide robust lock-free data structures, such as Intel TBB or Boost.Lockfree, are recommended for production scenarios.

Ahmed Abou-Ghanem

Senior Software Developer

1 year ago

I'm guessing the lock guard is a smart pointer that locks the mutex on construction and unlocks the mutex on destruction as soon as it goes out of scope.

