Active Object
This post is a cross-post from www.ModernesCpp.com.
The active object design pattern decouples method execution from method invocation for objects that each reside in their own thread of control. The goal is to introduce concurrency, by using asynchronous method invocation and a scheduler for handling requests. (Wikipedia: Active Object)
The Active Object decouples method invocation from method execution. The method invocation is performed on the client thread, but the method execution takes place on the Active Object's thread. The Active Object has its own thread and a list of method request objects (requests for short) to be executed. The client's method invocation enqueues a request on the Active Object's list. The requests are then dispatched to the servant.
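The decoupling described above can be sketched in a few lines. This is only a minimal illustration, not the example developed later in this post: the class name MiniActiveObject, its member function square, and the use of a condition variable are assumptions made for the sketch.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <future>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical minimal Active Object: one worker thread, one request list.
class MiniActiveObject {
 public:
    MiniActiveObject() : worker([this] { serve(); }) {}

    ~MiniActiveObject() {
        {
            std::lock_guard<std::mutex> lock(mut);
            done = true;
        }
        condVar.notify_one();
        worker.join();                 // pending requests are drained first
    }

    // Invocation: runs on the client's thread and only enqueues a request.
    std::future<int> square(int value) {
        std::packaged_task<int()> request([value] { return value * value; });
        auto result = request.get_future();
        {
            std::lock_guard<std::mutex> lock(mut);
            assert(!done);             // no new requests during shutdown
            requests.push_back(std::move(request));
        }
        condVar.notify_one();
        return result;
    }

 private:
    // Execution: runs on the Active Object's own thread.
    void serve() {
        for (;;) {
            std::packaged_task<int()> request;
            {
                std::unique_lock<std::mutex> lock(mut);
                condVar.wait(lock, [this] { return done || !requests.empty(); });
                if (requests.empty()) return;   // done and nothing left to do
                request = std::move(requests.front());
                requests.pop_front();
            }
            request();                 // executed outside the lock
        }
    }

    std::mutex mut;
    std::condition_variable condVar;
    std::deque<std::packaged_task<int()>> requests;
    bool done = false;
    std::thread worker;                // declared last: starts after the other members exist
};
```

A client would call something like auto fut = ao.square(3); and later fut.get(). The call returns immediately, and the multiplication itself happens on the worker thread.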
Also known as
Problem
When many threads access a shared object in a synchronized way, the following challenges must be solved:
Solution
Components
The Active Object pattern consists of six components:
Dynamic Behavior
The dynamic behavior of the Active Object consists of three phases:
The following picture shows the sequence of messages.
Pros and Cons
Before I present a minimal example of the Active Object, here are its pros and cons.
Pros
Cons
Example
The following example presents a simplified implementation of the Active Object. In particular, I don't define an interface for the method requests on the Active Object, which the proxy and the servant should implement. Further, the scheduler executes the next job when asked for it, and the run member function of the Active Object creates the thread.
The types involved, such as future<vector<future<pair<bool, int>>>>, are often quite verbose. To improve readability, I made heavy use of using declarations (line 1). This example requires a solid knowledge of promises and futures in C++. My posts about tasks provide more details.
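Before the full listing, here is a minimal sketch of the task mechanics it relies on: a packaged_task wraps a callable, its future is handed out immediately, and the result only becomes available once the task is invoked. The names isEven and invokeLater are hypothetical and only stand in for the servant used in the example.

```cpp
#include <cassert>
#include <chrono>
#include <future>
#include <utility>

using Result = std::pair<bool, int>;

// Hypothetical stand-in for the servant: is the number even?
Result isEven(int number) { return {number % 2 == 0, number}; }

Result invokeLater(int number) {
    // Wrap the work; nothing is executed at this point.
    std::packaged_task<Result()> task([number] { return isEven(number); });
    std::future<Result> fut = task.get_future();   // handle for the client

    // The task has not run yet, so the result is not ready.
    auto status = fut.wait_for(std::chrono::seconds(0));
    assert(status == std::future_status::timeout);

    task();              // execution happens here, possibly much later
    return fut.get();    // the result is now available
}
```

In the Active Object, the two halves are separated: the client keeps the future, and the task is moved onto the activation list to be invoked by another thread.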
// activeObject.cpp
#include <algorithm>
#include <deque>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <numeric>
#include <random>
#include <thread>
#include <utility>
#include <vector>
using std::async;                                                      // (1)
using std::boolalpha;
using std::cout;
using std::deque;
using std::distance;
using std::for_each;
using std::find_if;
using std::future;
using std::lock_guard;
using std::make_move_iterator;
using std::make_pair;
using std::move;
using std::mt19937;
using std::mutex;
using std::packaged_task;
using std::pair;
using std::random_device;
using std::sort;
using std::jthread;
using std::uniform_int_distribution;
using std::vector;
class IsPrime {                                                        // (8)
 public:
    IsPrime(int num): numb{num} {}
    pair<bool, int> operator()() {
        // trial division up to the square root of numb
        for (int j = 2; j * j <= numb; ++j){
            if (numb % j == 0) return make_pair(false, numb);
        }
        return make_pair(true, numb);
    }
 private:
    int numb;
};
class ActiveObject {
 public:

    // called on the client's thread: wraps the work and enqueues it
    future<pair<bool, int>> enqueueTask(int i) {
        IsPrime isPrime(i);
        packaged_task<pair<bool, int>()> newJob(isPrime);
        auto isPrimeFuture = newJob.get_future();
        {
            lock_guard<mutex> lockGuard(activationListMutex);
            activationList.push_back(move(newJob));                    // (6)
        }
        return isPrimeFuture;
    }

    // j joins in its destructor, so run() returns once the list is drained
    void run() {
        std::jthread j([this] {                                        // (12)
            while ( !runNextTask() );                                  // (13)
        });
    }

 private:

    // runs the first task, if any; returns true when the list was empty
    bool runNextTask() {                                               // (14)
        lock_guard<mutex> lockGuard(activationListMutex);
        auto empty = activationList.empty();
        if (!empty) {                                                  // (15)
            auto myTask= std::move(activationList.front());
            activationList.pop_front();
            myTask();
        }
        return empty;
    }

    deque<packaged_task<pair<bool, int>()>> activationList;            // (7)
    mutex activationListMutex;
};
vector<int> getRandNumbers(int number) {
    random_device seed;
    mt19937 engine(seed());
    uniform_int_distribution<> dist(1'000'000, 1'000'000'000);         // (4)
    vector<int> numbers;
    for (long long i = 0 ; i < number; ++i) numbers.push_back(dist(engine));
    return numbers;
}

future<vector<future<pair<bool, int>>>> getFutures(ActiveObject& activeObject,
                                                   int numberPrimes) {
    return async([&activeObject, numberPrimes] {
        vector<future<pair<bool, int>>> futures;
        auto randNumbers = getRandNumbers(numberPrimes);               // (3)
        for (auto numb: randNumbers){
            futures.push_back(activeObject.enqueueTask(numb));         // (5)
        }
        return futures;
    });
}
int main() {

    cout << boolalpha << '\n';

    ActiveObject activeObject;

    // a few clients enqueue work concurrently                         // (2)
    auto client1 = getFutures(activeObject, 1998);
    auto client2 = getFutures(activeObject, 2003);
    auto client3 = getFutures(activeObject, 2011);
    auto client4 = getFutures(activeObject, 2014);
    auto client5 = getFutures(activeObject, 2017);

    // give me the futures                                             // (9)
    auto futures = client1.get();
    auto futures2 = client2.get();
    auto futures3 = client3.get();
    auto futures4 = client4.get();
    auto futures5 = client5.get();

    // put all futures together                                        // (10)
    futures.insert(futures.end(),make_move_iterator(futures2.begin()),
                                 make_move_iterator(futures2.end()));

    futures.insert(futures.end(),make_move_iterator(futures3.begin()),
                                 make_move_iterator(futures3.end()));

    futures.insert(futures.end(),make_move_iterator(futures4.begin()),
                                 make_move_iterator(futures4.end()));

    futures.insert(futures.end(),make_move_iterator(futures5.begin()),
                                 make_move_iterator(futures5.end()));

    // run the promises                                                // (11)
    activeObject.run();

    // get the results from the futures
    vector<pair<bool, int>> futResults;
    futResults.reserve(futures.size());
    for (auto& fut: futures) futResults.push_back(fut.get());          // (16)

    sort(futResults.begin(), futResults.end());                        // (17)

    // separate the primes from the non-primes
    auto prIt = find_if(futResults.begin(), futResults.end(),          // (18)
                        [](pair<bool, int> pa){ return pa.first == true; });

    cout << "Number primes: " << distance(prIt, futResults.end()) << '\n';   // (19)
    cout << "Primes:" << '\n';
    for_each(prIt, futResults.end(), [](auto p){ cout << p.second << " ";} ); // (20)

    cout << "\n\n";

    cout << "Number no primes: " << distance(futResults.begin(), prIt) << '\n';
    cout << "No primes:" << '\n';
    for_each(futResults.begin(), prIt, [](auto p){ cout << p.second << " ";} );

    cout << '\n';

}
First, the example's general idea is that clients can enqueue jobs concurrently on the activation list. The servant determines which numbers are prime, and the activation list is part of the Active Object. The Active Object runs the jobs enqueued in the activation list on a separate thread, and the clients can ask for the results.
Here are the details.
The five clients enqueue the work (line 2) on the activeObject via the getFutures function. getFutures takes the activeObject and a number numberPrimes. numberPrimes random numbers are generated (line 3) between 1'000'000 and 1'000'000'000 (line 4), and a future for each of them is pushed onto the return value: vector<future<pair<bool, int>>>. A future<pair<bool, int>> holds a bool and an int. The bool indicates if the int is a prime. Let's have a closer look at line (5): futures.push_back(activeObject.enqueueTask(numb)). This call enqueues a new job on the activation list (line 6). All accesses to the activation list have to be protected by a mutex. The activation list is a deque of packaged tasks (line 7): deque<packaged_task<pair<bool, int>()>>. Each packaged task performs the function object IsPrime (line 8) when called and returns the pair of bool and int described above.
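The servant's check is plain trial division. The following sketch shows the same idea as a free function. It is not the listing's IsPrime: the names checkPrime and countPrimes are hypothetical, and, as assumptions of this sketch, values below 2 are rejected and the loop counter is a long long so that j * j cannot overflow for inputs close to the largest int.

```cpp
#include <cassert>
#include <utility>

// Hypothetical free-function variant of the trial-division check.
std::pair<bool, int> checkPrime(int number) {
    if (number < 2) return {false, number};          // 0, 1, and negatives are not prime
    for (long long j = 2; j * j <= number; ++j) {    // test divisors up to sqrt(number)
        if (number % j == 0) return {false, number};
    }
    return {true, number};
}

// Hypothetical helper: counts the primes in the closed range [first, last].
int countPrimes(int first, int last) {
    assert(first <= last);                           // precondition of this sketch
    int count = 0;
    for (int n = first; n <= last; ++n) {
        if (checkPrime(n).first) ++count;
    }
    return count;
}
```

The numbers in the example are drawn from 1'000'000 to 1'000'000'000, so the listing's int counter stays within range there.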
Now, the work packages are prepared. Let's start the calculation. In line (9), all clients return their handles to the associated futures. Putting all futures together (line 10) makes my job easier. The call activeObject.run() in line (11) starts the execution. The member function run (line 12) creates the thread that repeatedly executes the member function runNextTask (line 13). runNextTask (line 14) checks whether the deque is not empty (line 15); if so, it takes the first task off the deque and runs it. By calling futResults.push_back(fut.get()) (line 16) on each future, all results are requested and pushed onto futResults. Line (17) sorts the vector of pairs: vector<pair<bool, int>>. Because false sorts before true, the non-primes come first. The remaining lines present the calculation. The iterator prIt in line 18 points to the first pair that holds a prime number. The screenshot shows the number of primes distance(prIt, futResults.end()) (line 19) and the primes (line 20). Only the first non-primes are displayed.
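The sort-then-find step can be tried in isolation. The sketch below assumes a hypothetical result type Split and a hypothetical function splitResults. It relies only on the fact that pairs compare by their first element and then by their second, and that false is less than true.

```cpp
#include <algorithm>
#include <cassert>
#include <utility>
#include <vector>

using Result = std::pair<bool, int>;

struct Split {                       // hypothetical result type for this sketch
    std::vector<int> nonPrimes;
    std::vector<int> primes;
};

Split splitResults(std::vector<Result> results) {
    // false < true, so all non-primes end up in front of all primes;
    // within each group the numbers are in ascending order.
    std::sort(results.begin(), results.end());

    // The first pair flagged true marks the border between the groups.
    auto prIt = std::find_if(results.begin(), results.end(),
                             [](const Result& r) { return r.first; });
    assert(std::none_of(results.begin(), prIt,
                        [](const Result& r) { return r.first; }));

    Split split;
    for (auto it = results.begin(); it != prIt; ++it) split.nonPrimes.push_back(it->second);
    for (auto it = prIt; it != results.end(); ++it) split.primes.push_back(it->second);
    return split;
}
```

For the input {true, 7}, {false, 8}, {true, 3}, {false, 4}, the sorted order is (false, 4), (false, 8), (true, 3), (true, 7), so the non-primes are 4 and 8 and the primes are 3 and 7.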
What's Next?
The Active Object and the Monitor Object both synchronize and schedule member function invocations. The main difference is that the Active Object executes its member functions in a different thread, whereas the Monitor Object executes them in the same thread as the client. In my next post, I will present the Monitor Object.