C++: Efficient parallelization of small tasks

In the realm of concurrent programming, we often encounter scenarios where numerous independent yet tiny tasks need to be processed in parallel efficiently. This article delves into optimizing the parallel execution of such tasks and explores different solutions. At the end, I share a tailor-made solution I created for small tasks, along with benchmarks.

My goal here is to share what I learned and to get feedback on further improvements or possible bugs.

Problem statement: Consider a scenario where a collection of very small tasks, each encapsulated within a class with a thread-safe method Task::process(), needs to be efficiently processed in batches.

We need to design an API processTasks (Collection<Task> tasks)

  • It can be called multiple (N) times.
  • It needs to process (call task.process() on) each task in the collection (of size S).
  • Assume that 'Task::process()' has an average runtime of 'T'.
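
For concreteness, here is a minimal sketch of the interfaces assumed in the rest of the article (Task and Collection are placeholders from the problem statement; only the process() method matters):

#include <vector>

// Placeholder interfaces assumed by the rest of this article.
struct Task {
    void process();                      // thread-safe, average runtime T
};

template <typename T>
using Collection = std::vector<T>;       // std::vector is used from Solution 1 onward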


Solution 0: Here is a basic implementation of our function.

using namespace std;
void processTasks (Collection<Task>& tasks) {
     for(Task& task : tasks)
         task.process();
}        

We can also write it as follows, using 'std::for_each' and 'std::mem_fn':

void processTasks (Collection<Task>& tasks) {
     for_each(tasks.begin(), tasks.end(), mem_fn(&Task::process));
     // for_each(tasks.begin(), tasks.end(), [](Task& task) { task.process(); } );
}        

Given the choice of a container type, std::vector is preferred for its data locality, fast traversal, and random access, making it suitable for parallelization.

We have independent tasks to process, so this is an ideal case for parallelization. Parallel execution is beneficial when the runtime of the tasks (or task group) is large enough to justify the overhead of parallelization.

Solution 1: C++17 introduced a parallel version of std::for_each which can use background threads for parallel execution. So just by adding one extra argument, 'execution::par_unseq', to the code above, we achieve parallelization.

void processTasks (vector<Task>& tasks) {
     // Requires the <execution> header (and an implementation with parallel algorithm support).
     for_each(execution::par_unseq, tasks.begin(), tasks.end(), mem_fn(&Task::process));
}

Pros:

  • Minimal code changes are required, and it is easy to switch between the single- and multi-threaded versions by changing just one argument.
  • This can be an ideal solution when we need to call 'processTasks' only once, i.e. N = 1.

Cons:

  • A non-ideal solution if this API is called multiple times. The parallel std::for_each may create/destroy threads on each call, which has runtime overhead. This overhead can become significant for a large number of API calls, especially when the tasks are very small.
  • No load balancing and no control over the number of threads!


Solution 2: We can use a thread pool to avoid creating/destroying threads on each call. This also gives us control over the number of threads.

So let's submit all the tasks to the thread pool to be processed on background threads.

size_t numThreads = 2;
ThreadPool pool(numThreads);   // 'ThreadPool' stands for a generic thread-pool class with a submit()/wait API
void processTasks (vector<Task>& tasks) {
     for(Task& task : tasks)
         pool.submit([&task] { task.process(); });
     pool.wait_for_background_work_to_finish();
}

Pros:

  • Threads are reused across calls, so there is no overhead of creating/destroying threads, and we get better control over the number of threads.

Cons:

  • The ThreadPool's internal work queue needs to be locked every time a task is inserted by the user through 'submit' or popped by a worker thread.
  • This contention can be reduced by giving the ThreadPool thread-specific queues with work stealing, at the cost of additional complexity (see https://www.youtube.com/watch?v=zULU6Hhp42w).
  • Since a ThreadPool must support heterogeneous tasks, it generally uses type erasure (through std::function) to store different kinds of tasks in a single queue. But the main strength of std::function, its type-erasure mechanism, does not come for free: it involves dynamic memory allocation, virtual dispatch, and no inlining (see the sketch below).
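
To make these two costs concrete, here is a minimal sketch of what a generic pool's submit path typically looks like. This is an illustrative assumption about ThreadPool's internals, not the actual implementation used in the benchmarks:

#include <functional>
#include <mutex>
#include <queue>

class ThreadPool {
public:
    void submit(std::function<void()> job) {
        // Type erasure: any callable is wrapped in std::function, which may
        // heap-allocate and is later invoked through an indirect call (no inlining).
        std::lock_guard<std::mutex> lock(mutex_);   // contention point shared by all threads
        queue_.push(std::move(job));
    }
    // ... worker threads pop from queue_ under the same mutex ...
private:
    std::mutex mutex_;
    std::queue<std::function<void()>> queue_;
};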

Solution 3: We can reduce this contention by submitting clubbed (grouped) tasks instead of individual tasks. In the following code, for simplicity, we club the tasks into two groups and submit them to the thread pool.

size_t numThreads = 2;
ThreadPool pool(numThreads);
void processTasks (vector<Task>& tasks) {
     size_t midPoint = tasks.size()/2;
     // SubGroup1: Submit first half for background processing
     pool.submit([&] {
         for(size_t i = 0; i < midPoint; i++)
             tasks[i].process();
     });
     // SubGroup2: Submit second half for background processing
     pool.submit([&] {
         for(size_t i = midPoint; i < tasks.size(); i++)
             tasks[i].process();
     });
     pool.wait_for_background_work_to_finish();
}

I generally break the 'S' tasks into (numThreads * 5) groups, which strikes a balance between load balancing and contention on the thread pool's internal queue; a sketch of this chunking follows.
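
A minimal sketch of that chunking, assuming the same hypothetical ThreadPool interface as above:

#include <algorithm>   // std::min

void processTasksChunked (vector<Task>& tasks, ThreadPool& pool, size_t numThreads) {
     size_t numChunks = numThreads * 5;
     size_t chunkSize = (tasks.size() + numChunks - 1) / numChunks;   // ceiling division
     for(size_t begin = 0; begin < tasks.size(); begin += chunkSize) {
         size_t end = std::min(tasks.size(), begin + chunkSize);
         // Submit one contiguous chunk as a single unit of work.
         pool.submit([&tasks, begin, end] {
             for(size_t i = begin; i < end; i++)
                 tasks[i].process();
         });
     }
     pool.wait_for_background_work_to_finish();
}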

Pros:

  • Reduced contention compared to Solution 2.
  • A ThreadPool lets us process heterogeneous tasks, so it can also be doing other work in addition to the work we assign it from 'processTasks'.

Cons:

  • Contention and std::function overheads, as described above


This approach works fine when the runtime of one "processTasks" call is larger than a threshold set by the contention, context-switching, and std::function overheads:

  • i.e. when (S * T) is greater than the overhead of submitting work to the ThreadPool's background threads,
  • tasks.size() * Runtime(Task::process()) > ThreadPool contention + context-switching overhead.

For example, in the benchmarks below at N = 12, the actual work takes about 1.3 µs single-threaded, while the ThreadPool-based call costs about 20.6 µs, so the overhead dominates.

How can we optimize further by customizing for our use case?

Let's look at why a generic thread pool is not an ideal solution for us.

A generic threadpool

  1. is optimized for throughput,
  2. needs to support heterogeneous tasks, and
  3. can't make any assumptions about the usage pattern.

Since our priorities and requirements are different, let's try to create a tailor-made solution.

  • Optimize for Latency rather than throughput (Do you know why?)
  • Homogeneous tasks: the same function needs to be called on each element of a vector (SFMD, or Single Function Multiple Data).
  • Usage pattern: we get work in batches. We receive a batch of work, all threads work on it until it is finished, and only then does the next batch start.


Solution 4: Here is the implementation of a tailor-made thread pool I created for our problem.

It directly keeps a pointer to a wrapper over "vector<Task>", and each thread works on a non-overlapping index range of this vector to process tasks.

  • Avoids the overheads associated with type erasure / std::function (dynamic memory allocation, virtual dispatch, and no inlining due to runtime binding), which can become comparatively significant when the task itself is small.
  • No need to maintain a task queue.
  • No contention on a common queue while submitting/popping a task.

It also uses busy-waiting instead of sleep/yield to prioritize latency over throughput (a sketch of the internals follows the API below).

With this, our final API becomes:

size_t numThreads = 2;
HTaskVecThreadPool<Task> pool(numThreads);
void processTasks (vector<Task>& tasks) {
     pool.submit(tasks);
     pool.waitForAll();
}        
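
For reference, here is a minimal sketch of how such a pool can be structured. The class name and the submit()/waitForAll() API follow the article; the internals below (an atomic generation counter plus per-thread index ranges) are an illustrative assumption, not the exact implementation that was benchmarked:

#include <algorithm>
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <thread>
#include <vector>

template <typename Task>
class HTaskVecThreadPool {
public:
    explicit HTaskVecThreadPool(std::size_t numThreads) : numThreads_(numThreads) {
        for (std::size_t i = 0; i < numThreads_; ++i)
            workers_.emplace_back([this, i] { workerLoop(i); });
    }

    ~HTaskVecThreadPool() {
        stop_.store(true, std::memory_order_release);
        generation_.fetch_add(1, std::memory_order_release);   // wake the busy-waiters
        for (auto& t : workers_) t.join();
    }

    // Publish the batch; each worker picks its own non-overlapping index range.
    void submit(std::vector<Task>& tasks) {
        tasks_ = &tasks;
        done_.store(0, std::memory_order_release);
        generation_.fetch_add(1, std::memory_order_release);
    }

    // Busy-wait until every worker has finished its range (latency over throughput).
    void waitForAll() {
        while (done_.load(std::memory_order_acquire) != numThreads_) { /* spin */ }
    }

private:
    void workerLoop(std::size_t threadIdx) {
        std::uint64_t seenGen = 0;
        while (true) {
            // Busy-wait for the next batch (a new generation number).
            while (generation_.load(std::memory_order_acquire) == seenGen) { /* spin */ }
            if (stop_.load(std::memory_order_acquire)) return;
            seenGen = generation_.load(std::memory_order_acquire);

            // Process this thread's slice of the vector: no queue, no locks.
            std::vector<Task>& tasks = *tasks_;
            std::size_t chunk = (tasks.size() + numThreads_ - 1) / numThreads_;
            std::size_t begin = std::min(tasks.size(), threadIdx * chunk);
            std::size_t end   = std::min(tasks.size(), begin + chunk);
            for (std::size_t i = begin; i < end; ++i)
                tasks[i].process();

            done_.fetch_add(1, std::memory_order_acq_rel);
        }
    }

    std::size_t numThreads_;
    std::vector<Task>* tasks_ = nullptr;
    std::atomic<std::uint64_t> generation_{0};
    std::atomic<std::size_t> done_{0};
    std::atomic<bool> stop_{false};
    std::vector<std::thread> workers_;
};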

So how does this compare to the ThreadPool (single-queue) approach? In my experiment, I compute the std::sin value of N numbers, 1000 times, on K threads.


For N = 12, K= 4

  • SingleThread: 1258.04 ns
  • ThreadPool: 20608.1 ns
  • FinalSolution: 1414.49 ns

Our solution is 14X faster than ThreadPool, with only 12% degradation over the single-threaded solution for such small tasks.


For N = 120, K=4

  • SingleThread: 10381.6 ns
  • ThreadPool: 24378.2 ns
  • FinalSolution: 3994.21 ns

Our solution is 6X faster than ThreadPool and 2.6X faster than the single-threaded solution.


For N = 1200, K=4

  • SingleThread: 107846 ns
  • ThreadPool: 62471.2 ns
  • FinalSolution: 31352.4 ns

Now, even for medium-sized tasks, we are still 2X faster than ThreadPool and 3.43X faster than SingleThread.


Future work: For small tasks, we can't use a large number of threads effectively, so we can try the following approaches:

  • Pin a thread to a specific CPU: the OS can sometimes migrate a busy-waiting/sleeping thread to a different CPU, which can hurt latency. So we can pin each thread to a specific CPU. On Linux, we can use the pthread-specific pthread_setaffinity_np function (see the sketch after this list).
  • Avoid core sharing: the L1 and L2 caches are shared between the two hardware threads of a core. We can avoid core sharing by not assigning work to two threads of the same core, so that each thread gets exclusive access to its L1/L2 caches.
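
As a reference, a minimal sketch of pinning a std::thread to a CPU on Linux (error handling omitted; picking CPU indices that map to distinct physical cores also addresses the core-sharing point, though that mapping is machine-specific):

#include <pthread.h>
#include <sched.h>
#include <thread>

// Pin a std::thread to a single CPU on Linux via pthread_setaffinity_np.
void pinThreadToCpu(std::thread& t, int cpu) {
    cpu_set_t cpuset;
    CPU_ZERO(&cpuset);
    CPU_SET(cpu, &cpuset);
    pthread_setaffinity_np(t.native_handle(), sizeof(cpu_set_t), &cpuset);
}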
