std::execution: More Senders
std::execution offers three types of senders: factories, adapters, and consumers. I’ll take a closer look at these today.
Most of the following content is from proposal P2300R10. I will try to represent it more concisely.
Sender Factory
A sender factory is an algorithm that takes no senders as parameters and returns a sender. I already presented them in my last post: std::execution: Sender.
Sender Adaptor
A sender adaptor is an algorithm that takes one or more senders as parameters and returns a sender.
Sender adaptors are lazy. Sender consumers such as this_thread::sync_wait start senders.
execution::let_*
execution::sender auto let_value(
execution::sender auto input,
std::invocable<values-sent-by(input)...> function
);
execution::sender auto let_error(
execution::sender auto input,
std::invocable<errors-sent-by(input)...> function
);
execution::sender auto let_stopped(
execution::sender auto input,
std::invocable auto function
);
let_value is very similar to then: when it is started, it invokes the provided function with the values sent by the input sender as arguments. However, where the sender returned from then sends exactly what that function ends up returning – let_value requires that the function return a sender, and the sender returned by let_value sends the values sent by the sender returned from the callback.
A nice example about let_value, let_error, and let_stopped has the prototype library stdexec.
The following example shows the main program of an HTTP server that handles multiple requests concurrently. (https://github.com/NVIDIA/stdexec/blob/main/examples/server_theme/let_value.cpp)
/*
* Copyright (c) 2022 Lucian Radu Teodorescu
*
* Licensed under the Apache License Version 2.0 with LLVM Exceptions
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://llvm.org/LICENSE.txt
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
int main() {
// Create a thread pool and get a scheduler from it
exec::static_thread_pool pool{8};
ex::scheduler auto sched = pool.get_scheduler();
// Fake a couple of requests
for (int i = 0; i < 10; i++) {
// The whole flow for transforming incoming requests into responses
ex::sender auto snd =
// get a sender when a new request comes
schedule_request_start(sched, i)
// make sure the request is valid; throw if not
| ex::let_value(validate_request)
// process the request in a function that may be using a different execution context
| ex::let_value(handle_request)
// If there are errors transform them into proper responses
| ex::let_error(error_to_response)
// If the flow is cancelled, send back a proper response
| ex::let_stopped(stopped_to_response)
// write the result back to the client
| ex::let_value(send_response)
// done
;
// execute the whole flow asynchronously
ex::start_detached(std::move(snd));
}
pool.request_stop();
return 0;
}
An exec::static_thread_pool pool is created with 8 threads, and the get_scheduler method is called to obtain a scheduler object named sched.
?
Modernes C++ Mentoring
Do you want to stay informed: Subscribe.
?
The program then simulates handling multiple requests by iterating over a loop 10 times. It constructs a pipeline of operations for each iteration to transform incoming requests into responses. An ex::sender snd represents this pipeline.
The pipeline starts with the schedule_request_start function, which creates a sender that represents the start of handling a new request. The request is then validated using the ex::let_value function, which applies the validate_request. If the request is not valid, an exception is thrown.
Next, the request is processed using the ex::let_value function that applies the handle_request function. If any errors occur during processing, they are transformed into proper responses using the ex::let_error function applying the error_to_response function. If the flow is cancelled, a proper response is sent back using the ex::let_stopped function applying the stopped_to_response function. Finally, the result is written back to the client using the ex::let_value function, which applies the send_response function.
The ex::start_detached call detaches the execution from the current thread.
execution::into_variant
领英推荐
execution::sender auto into_variant(
execution::sender auto snd
);
Returns a sender which sends a variant of tuples of all the possible sets of types sent by the input sender.
execution::stopped_as_optional
execution::sender auto stopped_as_optional(
single-sender auto snd
);
Returns a sender that maps the value channel from a T to an optional<decay_t<T>>, and maps the stopped channel to a value of an empty optional<decay_t<T>>.
execution::stopped_as_error
template<move_constructible Error>
execution::sender auto stopped_as_error(
execution::sender auto snd,
Error err
);
Returns a sender that maps the stopped channel to an error of err.
execution::bulk
execution::sender auto bulk(
execution::sender auto input,
std::integral auto shape,
invocable<decltype(size), values-sent-by(input)...> function
);
Returns a sender that describes the callable call invoked on input according to shape.
execution::split
execution::sender auto split(execution::sender auto sender);
If the provided sender is a multi-shot sender, return that sender. Otherwise, return a multi-shot sender that sends values equivalent to those sent by the provided sender.
Some senders may only support launching their operation once, while others may be repeatable.
execution::when_all*
execution::sender auto when_all(
execution::sender auto ...inputs
);
execution::sender auto when_all_with_variant(
execution::sender auto ...inputs
);
when_all returns a sender that completes once all of the input senders have completed. when_all_with_variant does the same, but it adapts all the input senders using into_variant, and so it does not constrain the input arguments as when_all does.
execution::scheduler auto sched = thread_pool.scheduler();
execution::sender auto sends_1 = ...;
execution::sender auto sends_abc = ...;
execution::sender auto both = execution::when_all(
sends_1,
sends_abc
);
execution::sender auto final = execution::then(both, [](auto... args){
std::cout << std::format("the two args: {}, {}", args...);
});
// when final executes, it will print "the two args: 1, abc"
Sender Consumer
A sender consumer is an algorithm that takes one or more senders as parameters and does not return a sender.
this_thread::sync_wait
auto sync_wait(
execution::sender auto sender
) requires (always-sends-same-values(sender))
-> std::optional<std::tuple<values-sent-by(sender)>>;
this_thread::sync_wait is a sender consumer that submits the work described by the provided sender for execution, blocking the current std::thread or thread of main until the work is completed, and returns an optional tuple of values sent by the provided sender on its completion of work. sync_wait is one way to exit the senders domain, retrieving the task graph result.
If the provided sender sends an error instead of values, sync_wait throws that error as an exception, or rethrows the original exception if the error is of type std::exception_ptr.
If the provided sender sends the “stopped” signal instead of values, sync_wait returns an empty optional.
A Short Christmas Break
I will take a two-week Christmas break. My next post will be published on January 13th.