World's smallest thread pool in C++

OK, the title is a trap: this may not be the smallest thread pool, but I hope it comes close. I finally put the sacred red book to good use, trying to open my mind to Uncle Bjarne's elegant simplicity. I used the just-released 3rd edition of "A Tour of C++" for this article.

The basic "anatomy" of a thread pool

A function running on the main thread will, under certain conditions, create a task (for instance, a socket descriptor that requires read/write operations), add it to the Task Queue, and notify the pool of threads so that one of them picks up the task and executes some procedure on it.

In the sacred red book, Section 18.4 "Waiting for events", page 243 (Kindle edition), explains the Producer/Consumer model using std::condition_variable and the locking facilities to pass notifications from Producer to Consumer. It is a very clear and simple example; it is not a thread pool, but it provides the basic building block of the mechanism.
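To make that building block concrete, here is a minimal sketch of my own (it is not the book's listing): one producer hands integers to one consumer through a queue, a mutex and a condition variable. The function name hand_off and its parameter are made up for illustration.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical helper: one producer (the calling thread) hands `count`
// integers to one consumer thread and returns what the consumer received.
std::vector<int> hand_off(int count)
{
	std::queue<int> queue;
	std::mutex mutex;
	std::condition_variable cond;
	std::vector<int> received;   // touched only by the consumer until join()

	std::thread consumer([&] {
		for (int n = 0; n < count; ++n) {
			std::unique_lock lock{mutex};
			// wait() releases the lock while sleeping and re-acquires it
			// before returning; the predicate guards against spurious wakeups
			cond.wait(lock, [&] { return !queue.empty(); });
			received.push_back(queue.front());
			queue.pop();
		}
	});

	for (int n = 0; n < count; ++n) {
		{
			std::scoped_lock lock{mutex};
			queue.push(n);
		}
		cond.notify_one();
	}

	consumer.join();
	return received;
}
```

Calling hand_off(3) should give back the values 0, 1, 2 in that order, because a single consumer pops from a FIFO queue.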

In my case, the producer is the main() function and the task is a simple integer representing a file descriptor. This is the whole function; let's dissect it below.

int main() 
{

	//---not relevant, just to simulate intervals and signals
	std::random_device rd;
	std::mt19937 mt{rd()};
	std::uniform_int_distribution<int> dist(1, 50);
	std::uniform_int_distribution<int> dist2(1, 10);
	//--


	//create thread pool
	constexpr int pool_size {5};
	std::vector<std::stop_source> stops(pool_size);
	std::vector<std::jthread> pool(pool_size);
	for (int i = 0; i < pool_size; i++) {
		stops[i] = std::stop_source();
		pool[i] = std::jthread(consumer, stops[i].get_token());
	}


	//producer loop that creates task on certain signals (epoll for instance)	
	int i{0};
	while (true)
	{
		//create task and dispatch work
		++i;
		int fd {dist(mt)};
		{
			std::scoped_lock lock {m_mutex};
			m_queue.push(fd);
			m_cond.notify_one();
		}


		//simulate interval waiting for signal
		std::this_thread::sleep_for (std::chrono::milliseconds(dist2(mt)));
		
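		//on a termination signal (e.g. SIGTERM or SIGINT) stop all threads in pool and terminate program
		if (i > 100)
		{
			for (auto& s: stops) {
				s.request_stop();
				std::scoped_lock lock {m_mutex};
				//notify_all, not notify_one: notify_one may wake a worker
				//whose stop was not requested yet and leave this one asleep
				m_cond.notify_all();
			}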
			break;
		}
	}
	
	return 0;
}

        

We are using C++20's std::jthread, which provides a few more features than plain std::thread, such as joining automatically on destruction and the ability to send a "stop" signal to the thread, which we will use in the example. Before we dive into the Producer's code, let's see the variables shared between main() and consumer(), the producer and the consumer. These are C++ abstractions that help build the whole mechanism with very little effort:


std::queue<int> m_queue;
std::condition_variable m_cond;
std::mutex m_mutex;
        

The first relevant part of the Producer in main() is the creation of the thread pool. C++20 helps us here with std::stop_source, the abstraction used to send the stop signal to a thread. We pass the std::stop_token obtained from it to the jthread constructor, alongside the function that represents the Consumer.

	//create thread pool
	constexpr int pool_size {5};
	std::vector<std::stop_source> stops(pool_size);
	std::vector<std::jthread> pool(pool_size);
	for (int i = 0; i < pool_size; i++) {
		stops[i] = std::stop_source();
		pool[i] = std::jthread(consumer, stops[i].get_token());
	}        

When a certain signal happens, such as a socket becoming ready for read operations, the Producer creates a task (a file descriptor) and adds it to the Task Queue while holding the lock, because the queue is shared with the Consumers, and then notifies the pool that a task is available:

int fd {dist(mt)};
{
	std::scoped_lock lock {m_mutex};
	m_queue.push(fd);
	m_cond.notify_one();
}        

If a termination signal occurs, such as SIGTERM or SIGINT (SIGKILL cannot be caught by the program), the Producer must notify the worker threads, the Consumers, to stop. This part is explained in section 18.5.4 "Stopping a thread" and makes use of std::stop_source.

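if (i > 100)
{
	for (auto& s: stops) {
		s.request_stop();
		std::scoped_lock lock {m_mutex};
		//notify_all, not notify_one: notify_one may wake a worker
		//whose stop was not requested yet and leave this one asleep
		m_cond.notify_all();
	}
	break;
}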

And that's all for the Producer. In the real world it could be a Linux epoll event loop: when the event is "socket ready for read/write", a task is created with that socket's FD and dispatched to the pool, so the thread running the main epoll loop is free to keep processing kernel events while the request is handled in the background by another thread. That's my plan, by the way, for playing a bit more with this code.
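As a rough idea of how that epoll-based Producer might look, here is a Linux-only sketch. It is an assumption about a future version, not code from this article; the helper names watch_for_read and dispatch_ready_fds are invented, and error handling is kept to a minimum.

```cpp
#include <sys/epoll.h>
#include <unistd.h>
#include <condition_variable>
#include <mutex>
#include <queue>

// Hypothetical helper: registers fd on epfd for "ready to read" events.
bool watch_for_read(int epfd, int fd)
{
	epoll_event ev {};
	ev.events = EPOLLIN;
	ev.data.fd = fd;
	return epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev) == 0;
}

// Hypothetical helper: waits up to timeout_ms for readable descriptors,
// pushes each ready fd onto the task queue and notifies one worker per
// task. Returns the number of tasks queued, or -1 if epoll_wait fails.
int dispatch_ready_fds(int epfd, std::queue<int>& queue, std::mutex& mutex,
                       std::condition_variable& cond, int timeout_ms)
{
	constexpr int max_events {16};
	epoll_event events[max_events];

	const int n = epoll_wait(epfd, events, max_events, timeout_ms);
	if (n < 0) return -1;

	int queued {0};
	for (int i = 0; i < n; ++i) {
		if (events[i].events & EPOLLIN) {
			{
				std::scoped_lock lock {mutex};
				queue.push(events[i].data.fd);
			}
			cond.notify_one();
			++queued;
		}
	}
	return queued;
}
```

A Producer loop would create the epoll instance with epoll_create1(0), register its sockets with watch_for_read, and call dispatch_ready_fds repeatedly. Because this sketch uses level-triggered events, a descriptor keeps being reported until a worker has read from it, so a real loop would also need a way to avoid queueing the same FD twice (EPOLLONESHOT is one option).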

The Consumer

void consumer(std::stop_token tok)
{
	//----not relevant - just to simulate random amount of time processing task
	std::random_device rd;
	std::mt19937 mt{rd()};
	std::uniform_int_distribution<int> dist(1, 50);
	//---


	//start task - init resources
	fprintf( stderr, "[pool] %ld started\n", pthread_self());
	
	//main worker loop
	while(!tok.stop_requested())
	{
		//prepare lock
		std::unique_lock lock{m_mutex};
		//release lock, reacquire it if conditions met
		m_cond.wait(lock, [&tok] { return (!m_queue.empty() || tok.stop_requested()); });
		
		if (tok.stop_requested()) { lock.unlock(); break; }
		
		//get task
		auto fd = m_queue.front();
		m_queue.pop();
		lock.unlock();
		
		//---simulate processing task
		fprintf(stderr, "[pool] %ld processing FD: %d\n", pthread_self(), fd);
		std::this_thread::sleep_for (std::chrono::milliseconds(dist(mt)));
		//---
	}
	
	//ending task - free resources if necessary
	fprintf( stderr, "[pool] %ld stopping thread\n", pthread_self());
	
}


A consumer is a function with a loop that processes events. The consumer waits until it is notified, locks the queue and extracts the task at the front, unlocks as soon as possible, and then starts processing the task (reading from and writing to a socket). On every iteration it also checks whether the Producer has triggered the stop signal; if so, the Consumer exits the loop and finishes running.

The relevant parts

//prepare lock
std::unique_lock lock{m_mutex};

//release lock, reacquire it if conditions met
m_cond.wait(lock, [&tok] { return (!m_queue.empty() || tok.stop_requested()); });
		
if (tok.stop_requested()) { lock.unlock(); break; }
		
//get task
auto fd = m_queue.front();
m_queue.pop();
lock.unlock();        

std::condition_variable is the magic behind this: it provides the communication between the producer and the consumer when a task is ready in the queue. The other part is the use of std::stop_token to send the "stop and exit" signal to the thread.

And that's all. Meet helloworld.cpp, the smallest Thread Pool in the world, possibly in the universe!

#include <iostream>
#include <queue>
#include <vector>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <chrono>
#include <random>
#include <stop_token>
#include <string>
#include <cstdio>
#include <pthread.h> //pthread_self

std::queue<int> m_queue;
std::condition_variable m_cond;
std::mutex m_mutex;


void consumer(std::stop_token tok)
{
	//----not relevant - just to simulate random amount of time processing task
	std::random_device rd;
	std::mt19937 mt{rd()};
	std::uniform_int_distribution<int> dist(1, 50);
	//---


	//start task - init resources
	fprintf( stderr, "[pool] %ld started\n", pthread_self());
	
	//main worker loop
	while(!tok.stop_requested())
	{
		//prepare lock
		std::unique_lock lock{m_mutex};
		//release lock, reacquire it if conditions met
		m_cond.wait(lock, [&tok] { return (!m_queue.empty() || tok.stop_requested()); });
		
		if (tok.stop_requested()) { lock.unlock(); break; }
		
		//get task
		auto fd = m_queue.front();
		m_queue.pop();
		lock.unlock();
		
		//---simulate processing task
		fprintf(stderr, "[pool] %ld processing FD: %d\n", pthread_self(), fd);
		std::this_thread::sleep_for (std::chrono::milliseconds(dist(mt)));
		//---
	}
	
	//ending task - free resources if necessary
	fprintf( stderr, "[pool] %ld stopping thread\n", pthread_self());
	
}


int main() {


	//---not relevant, just to simulate intervals and signals
	std::random_device rd;
	std::mt19937 mt{rd()};
	std::uniform_int_distribution<int> dist(1, 50);
	std::uniform_int_distribution<int> dist2(1, 10);
	//--


	//create thread pool
	constexpr int pool_size {5};
	std::vector<std::stop_source> stops(pool_size);
	std::vector<std::jthread> pool(pool_size);
	for (int i = 0; i < pool_size; i++) {
		stops[i] = std::stop_source();
		pool[i] = std::jthread(consumer, stops[i].get_token());
	}


	//producer loop that creates task on certain signals (epoll for instance)	
	int i{0};
	while (true)
	{
		//create task and dispatch work
		++i;
		int fd {dist(mt)};
		{
			std::scoped_lock lock {m_mutex};
			m_queue.push(fd);
			m_cond.notify_one();
		}


		//simulate interval waiting for signal
		std::this_thread::sleep_for (std::chrono::milliseconds(dist2(mt)));
		
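		//on a termination signal (e.g. SIGTERM or SIGINT) stop all threads in pool and terminate program
		if (i > 100)
		{
			for (auto& s: stops) {
				s.request_stop();
				std::scoped_lock lock {m_mutex};
				//notify_all, not notify_one: notify_one may wake a worker
				//whose stop was not requested yet and leave this one asleep
				m_cond.notify_all();
			}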
			break;
		}
	}
	
	return 0;
}        

To compile it using GNU C++ (tested with 11.2/12.2):

g++ -O3 -std=c++20 -pedantic -Wall -pthread helloworld.cpp -o a        

The output looks like this:

./a
[pool] 140514633270848 started
[pool] 140514616485440 started
[pool] 140514616485440 processing FD: 50
[pool] 140514608092736 started
[pool] 140514624878144 started
[pool] 140514599700032 started
[pool] 140514608092736 processing FD: 36
[pool] 140514633270848 processing FD: 17
[pool] 140514624878144 processing FD: 29
.... more of these messages
[pool] 140514608092736 processing FD: 16
[pool] 140514624878144 processing FD: 39
[pool] 140514633270848 processing FD: 29
... stop signal sent to all worker threads after 100 tasks
[pool] 140514608092736 stopping thread
[pool] 140514616485440 stopping thread
[pool] 140514599700032 stopping thread
[pool] 140514633270848 stopping thread
[pool] 140514624878144 stopping thread
... the program ends here

Please note that this code compiles on the terrific Compiler Explorer website but does not run there (an include error; I think it's a CE bug). It will run on your machine, on Linux or Windows (MinGW-w64).

Never mind the use of random and chrono; they are there only for the sake of the example, to simulate running tasks and signals. If you remove those lines, you can see how brief the relevant code is.

If you find shorter or simpler code for a thread pool, please let me know in the comments.

Any errors in the code above are my own; Uncle Bjarne is free from all responsibility. I just interpreted his valuable lessons on the power and simplicity of C++ abstractions to create this little toy. Naive? Maybe, but it is running as expected on Windows and Linux.
