Joining Async Tasks in Rust
Amit Nadiger
Polyglot (Rust, C++ 11, 14, 17, 20, C, Kotlin, Java), Android TV, Cas, Blockchain, Polkadot, UTXO, Substrate, Wasm, Proxy-wasm, Dvb, STB, Linux, Engineering management.
Concurrent Execution of async Functions
In Rust's asynchronous programming model, you can run multiple async functions concurrently: while one task is waiting (for example on I/O), others can make progress, without blocking the thread they run on. This concurrency is essential for better performance and responsiveness in our applications, especially when dealing with I/O-bound operations.
The basic idea: calling an async function only creates a future, and nothing in its body runs until that future is polled (for example by .await). When several futures are polled together, for instance by join, they run concurrently, interleaving at each .await where one is waiting and another can make progress.
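A minimal sketch of that laziness, written against the standard library only so it runs without tokio or futures. `block_on`, `ThreadWaker`, `task` and `demo` are hypothetical names: `block_on` is a toy single-future executor that parks the current thread until woken, standing in for the runtime that `#[tokio::main]` provides.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Hypothetical minimal executor: a wake-up unparks the thread that called block_on.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(), // sleep until some future calls wake
        }
    }
}

// Hypothetical async fn that records whether its body has run.
async fn task(ran: &Cell<bool>) -> i32 {
    ran.set(true);
    101
}

fn demo() -> (bool, bool, i32) {
    let ran = Cell::new(false);
    let fut = task(&ran); // only builds the future; the body has not run yet
    let before = ran.get();
    let value = block_on(fut); // polling runs the body to completion
    (before, ran.get(), value)
}

fn main() {
    let (before, after, value) = demo();
    println!("ran before polling: {before}, after: {after}, value: {value}");
}
```

`demo()` reports that the body had not run before polling and had run afterwards, returning 101.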
In asynchronous programming, it's common to deal with multiple tasks that run concurrently. These tasks might make network requests, perform calculations, or handle user input. To work with them efficiently, we often need to run them concurrently, wait for some or all of them to finish, and collect their results.
Here's where join, join_all, and JoinSet come into play:
join:
The join function in the futures crate (futures::future::join) runs two futures concurrently and waits for both to complete. For more futures, the crate also provides join3, join4 and join5, and the futures::join! macro accepts any number of futures.
join consumes the two futures and returns a new future that, when awaited, polls both of them. It finishes with a tuple of the two results once both futures have completed.
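The behaviour described above (poll both futures, keep each output, finish with a tuple once both are ready) can be sketched with the standard library alone. This is a simplified illustration, not the futures crate's implementation: `Join2`/`join2`, `YieldOnce` (a stand-in for an I/O wait such as sleep) and `block_on` are hypothetical names, and each future is boxed to sidestep pin projection.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Hypothetical single-thread executor used only to run the sketch.
struct ThreadWaker(Thread);
impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

// Stand-in for an I/O wait: Pending on the first poll, Ready on the next.
struct YieldOnce(bool);
impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            return Poll::Ready(());
        }
        self.0 = true;
        cx.waker().wake_by_ref(); // ask the executor to poll again
        Poll::Pending
    }
}

// Hypothetical two-future join: polls each unfinished future, stores its output,
// and completes with a tuple once both outputs are present.
struct Join2<A: Future, B: Future> {
    a: Pin<Box<A>>,
    b: Pin<Box<B>>,
    a_out: Option<A::Output>,
    b_out: Option<B::Output>,
}
// Outputs are stored by value and never pinned, so Join2 can be Unpin.
impl<A: Future, B: Future> Unpin for Join2<A, B> {}
impl<A: Future, B: Future> Future for Join2<A, B> {
    type Output = (A::Output, B::Output);
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        if this.a_out.is_none() {
            if let Poll::Ready(v) = this.a.as_mut().poll(cx) {
                this.a_out = Some(v);
            }
        }
        if this.b_out.is_none() {
            if let Poll::Ready(v) = this.b.as_mut().poll(cx) {
                this.b_out = Some(v);
            }
        }
        if this.a_out.is_some() && this.b_out.is_some() {
            Poll::Ready((this.a_out.take().unwrap(), this.b_out.take().unwrap()))
        } else {
            Poll::Pending
        }
    }
}
fn join2<A: Future, B: Future>(a: A, b: B) -> Join2<A, B> {
    Join2 { a: Box::pin(a), b: Box::pin(b), a_out: None, b_out: None }
}

fn demo() -> (Vec<&'static str>, (i32, &'static str)) {
    let log = RefCell::new(Vec::new());
    let out = block_on(join2(
        async {
            log.borrow_mut().push("1: start");
            YieldOnce(false).await;
            log.borrow_mut().push("1: done");
            101
        },
        async {
            log.borrow_mut().push("2: start");
            YieldOnce(false).await;
            log.borrow_mut().push("2: done");
            "Async!"
        },
    ));
    (log.into_inner(), out)
}

fn main() {
    let (log, out) = demo();
    println!("{log:?} -> {out:?}");
}
```

The log shows both bodies starting before either finishes, which is the interleaving that join provides.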
# Cargo.toml
[package]
name = "futures_block_on"
version = "0.1.0"
edition = "2021"
[dependencies]
futures = "0.3.28"
tokio = { version = "1.29.1", features = ["full"] }
Example 1:
use tokio::time::{sleep, Duration};
use futures::future::join;

async fn task1() -> i32 {
    // Perform some asynchronous computation
    sleep(Duration::from_secs(3)).await;
    101
}

async fn task2() -> f64 {
    // Perform another asynchronous computation
    sleep(Duration::from_secs(5)).await;
    3.14
}

#[tokio::main]
async fn main() {
    // Both futures are polled concurrently, so the total wait is ~5 s, not 3 + 5 = 8 s.
    let (result1, result2) = join(task1(), task2()).await;
    println!("Result 1: {}", result1);
    println!("Result 2: {}", result2);
}
/*
Running `target/debug/join_demo`
Result 1: 101
Result 2: 3.14
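// Note: both lines are printed together after about 5 seconds, the longer of the two sleeps. task1 is done after 3 seconds, but join waits for both; the total is ~5 s rather than 3 + 5 = 8 s because the sleeps overlap.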
*/
Example 2:
use tokio::time::{sleep, Duration};
use futures::future::join;

async fn async_operation_1() -> i32 {
    println!("Start async operation 1");
    sleep(Duration::from_secs(2)).await;
    println!("Async operation 1 completed");
    101
}

async fn async_operation_2() -> String {
    println!("Start async operation 2");
    sleep(Duration::from_secs(3)).await;
    println!("Async operation 2 completed");
    "Jai Shree Ram, Jai Bajrang bali - Async!".to_string()
}

#[tokio::main]
async fn main() {
    println!("Start of main function");
    let future1 = async_operation_1();
    let future2 = async_operation_2();
    // Use the `join` combinator to run both futures concurrently.
    let result = join(future1, future2).await;
    // Extract results from the futures.
    let result1 = result.0;
    let result2 = result.1;
    println!("Result of async operation 1: {}", result1);
    println!("Result of async operation 2: {}", result2);
    println!("End of main function");
}
/*
Running `target/debug/asyncProgram`
Start of main function
Start async operation 1
Start async operation 2
Async operation 1 completed
Async operation 2 completed
Result of async operation 1: 101
Result of async operation 2: Jai Shree Ram, Jai Bajrang bali - Async!
End of main function
*/
join_all:
join_all is provided by the futures crate (futures::future::join_all).
It concurrently awaits a collection of futures and gathers their results into a single Vec. It is commonly used when you want to run many asynchronous tasks concurrently and wait for all of them to complete. (Tokio itself has no join_all; its closest equivalents are the tokio::join! macro for a fixed number of futures and JoinSet.)
For example, you can pass an iterator of futures to join_all and get back a Vec of their results, in input order. Here's a simplified example:
# Cargo.toml
[dependencies]
futures = "0.3.28"
tokio = { version = "1.29.1", features = ["full"] }
use futures::future::join_all;
use tokio::time::{sleep, Duration};

async fn expensive_computation(data: i32, task_id: i32, delay_sec: u8) -> i32 {
    // Simulate a time-consuming computation
    println!("Task_id = {} <=> delay_sec = {}", task_id, delay_sec);
    sleep(Duration::from_secs(delay_sec.into())).await;
    data * 2
}

#[tokio::main]
async fn main() {
    let data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9];
    // Task d sleeps (10 - d) seconds, so task 9 finishes first and task 1 last.
    let results = join_all(
        data.iter()
            .map(|&d| expensive_computation(d, d, (10 - d).try_into().unwrap())),
    )
    .await;
    println!("Results: {:?}", results);
    for (index, result) in results.into_iter().enumerate() {
        println!("Result {}: {}", index, result);
    }
}
/*
Output:
Running `target/debug/join_all_demo1`
Task_id = 1 <=> delay_sec = 9
Task_id = 2 <=> delay_sec = 8
Task_id = 3 <=> delay_sec = 7
Task_id = 4 <=> delay_sec = 6
Task_id = 5 <=> delay_sec = 5
Task_id = 6 <=> delay_sec = 4
Task_id = 7 <=> delay_sec = 3
Task_id = 8 <=> delay_sec = 2
Task_id = 9 <=> delay_sec = 1
Results: [2, 4, 6, 8, 10, 12, 14, 16, 18]
Result 0: 2
Result 1: 4
Result 2: 6
Result 3: 8
Result 4: 10
Result 5: 12
Result 6: 14
Result 7: 16
Result 8: 18
*/
In the program above, the results come back in exactly the same order as the input, even though the tasks sleep for different periods (in exactly the reverse order, so task 9 finishes first and task 1 last).
In short: use join_all when you have a collection of asynchronous tasks that you want to run concurrently, with their results stored in a Vec in the same order as the input tasks.
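The ordering guarantee can be checked with a std-only sketch of a join_all-style combinator. Assumptions: `JoinAll`/`join_all` here are hypothetical stand-ins for futures::future::join_all, `YieldN(n)` returns Pending n times in place of a sleep of n seconds, and `block_on` is a hand-rolled single-thread executor. Output slots are indexed by input position, so results keep input order while the `finished` log records completion order.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Hypothetical single-thread executor used only to run the sketch.
struct ThreadWaker(Thread);
impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

// Stand-in for sleep(n): returns Pending n times before Ready.
struct YieldN(u32);
impl Future for YieldN {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 == 0 {
            return Poll::Ready(());
        }
        self.0 -= 1;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

// Hypothetical join_all: one output slot per input future, indexed by position.
struct JoinAll<F: Future> {
    futs: Vec<Pin<Box<F>>>,
    outs: Vec<Option<F::Output>>,
}
impl<F: Future> Unpin for JoinAll<F> {}
impl<F: Future> Future for JoinAll<F> {
    type Output = Vec<F::Output>;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        for (fut, out) in this.futs.iter_mut().zip(this.outs.iter_mut()) {
            if out.is_none() {
                if let Poll::Ready(v) = fut.as_mut().poll(cx) {
                    *out = Some(v);
                }
            }
        }
        if this.outs.iter().all(Option::is_some) {
            // Slots are read in input order, regardless of which future finished first.
            Poll::Ready(this.outs.iter_mut().map(|o| o.take().unwrap()).collect())
        } else {
            Poll::Pending
        }
    }
}
fn join_all<I>(iter: I) -> JoinAll<I::Item>
where
    I: IntoIterator,
    I::Item: Future,
{
    let futs: Vec<_> = iter.into_iter().map(Box::pin).collect();
    let outs = futs.iter().map(|_| None).collect();
    JoinAll { futs, outs }
}

async fn expensive_computation(data: u32, finished: &RefCell<Vec<u32>>) -> u32 {
    YieldN(10 - data).await; // task 9 waits least, task 1 waits most
    finished.borrow_mut().push(data);
    data * 2
}

fn demo() -> (Vec<u32>, Vec<u32>) {
    let finished = RefCell::new(Vec::new());
    let log = &finished;
    let results = block_on(join_all((1..=9).map(move |d| expensive_computation(d, log))));
    (results, finished.into_inner())
}

fn main() {
    let (results, finished) = demo();
    println!("completion order: {finished:?}");
    println!("results (input order): {results:?}");
}
```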
Here's another example, which uses a Mutex to share a counter safely between concurrent tasks:
use std::sync::{Arc, Mutex};
use tokio::time::{sleep, Duration};
use futures::future::join_all;

async fn increment_counter(counter: Arc<Mutex<i32>>, task_id: u8, timeout: u32) -> i32 {
    // Lock, increment and read in one scope so the guard is dropped before the .await.
    let ctr = {
        let mut guard = counter.lock().unwrap();
        *guard += 1;
        *guard
    };
    sleep(Duration::from_secs(timeout.into())).await;
    // Perform other asynchronous operations...
    println!("Counter incremented with TaskId {} and counter_val = {}", task_id, ctr);
    ctr
}

#[tokio::main]
async fn main() {
    let counter = Arc::new(Mutex::new(0));
    let tasks = vec![
        increment_counter(Arc::clone(&counter), 0, 5),
        increment_counter(Arc::clone(&counter), 1, 2),
    ];
    let result = join_all(tasks).await;
    println!("result = {:?}", result);
    let final_value = counter.lock().unwrap();
    println!("Final counter value: {}", *final_value);
}
/*
Running `target/debug/join_all`
Counter incremented with TaskId 1 and counter_val = 2
Counter incremented with TaskId 0 and counter_val = 1
result = [1, 2]
Final counter value: 2
*/
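In the example above, both increment_counter futures share the same counter, wrapped in a Mutex. Arc gives each future shared ownership of the counter through thread-safe reference counting, and the Mutex guarantees exclusive access while a task is modifying the value. The lock guard is dropped at the end of the inner block, before the .await: holding a std::sync::Mutex guard across an .await can deadlock if another task on the same thread tries to take the lock, and it makes the future non-Send, so it could not be passed to tokio::spawn.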
Shared state and concurrency management are essential topics in asynchronous programming. Properly using mutexes, atomics, or other synchronization primitives helps you avoid race conditions and data corruption in your concurrent async code.
Note that in this program, too, the results are in the same order as the input tasks and are stored in a Vec.
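For a plain counter like the one above, an atomic can replace the Mutex. A minimal std-only sketch, assuming AtomicI32 in place of Mutex<i32>; to keep it runnable without tokio, each task is driven by a hand-rolled `block_on` on its own OS thread (a swapped-in setup, not the join_all program above), so the increments genuinely race. `block_on` and `demo` are hypothetical names.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Hypothetical minimal executor so the sketch runs without tokio.
struct ThreadWaker(Thread);
impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

// Lock-free variant of increment_counter: fetch_add returns the previous value,
// so adding 1 gives the value this task produced, with no guard to drop.
async fn increment_counter(counter: Arc<AtomicI32>) -> i32 {
    counter.fetch_add(1, Ordering::SeqCst) + 1
}

fn demo(tasks: usize) -> (Vec<i32>, i32) {
    let counter = Arc::new(AtomicI32::new(0));
    // One OS thread per task, each driving its future with block_on.
    let handles: Vec<_> = (0..tasks)
        .map(|_| {
            let c = Arc::clone(&counter);
            thread::spawn(move || block_on(increment_counter(c)))
        })
        .collect();
    let mut seen: Vec<i32> = handles.into_iter().map(|h| h.join().unwrap()).collect();
    seen.sort();
    (seen, counter.load(Ordering::SeqCst))
}

fn main() {
    let (seen, final_value) = demo(10);
    println!("values seen: {seen:?}, final counter value: {final_value}");
}
```

Every task observes a distinct value, so the sorted values are 1 through 10 and the final counter is 10. A Mutex remains the better fit once the shared state is more than a single integer.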
JoinSet:
JoinSet is a struct in the tokio::task module of the Tokio crate. It collects a set of tasks spawned on a Tokio runtime and lets you await the completion of some or all of them. The set is not ordered: tasks are returned in the order they complete. All tasks in the set must have the same return type T. When the JoinSet is dropped, all tasks in it are immediately aborted.
Example usage:
[dependencies]
tokio = { version = "1.29.1", features = ["full"] }
// main.rs
use std::sync::{Arc, Mutex};
use tokio::task::JoinSet;

async fn increment_counter(counter: Arc<Mutex<i32>>) {
    {
        let mut guard = counter.lock().unwrap();
        *guard += 1;
    } // guard dropped here
    // Perform other asynchronous operations...
    println!("Counter incremented");
}

#[tokio::main]
async fn main() {
    let counter = Arc::new(Mutex::new(0));
    // Using Tokio JoinSet: each spawn starts the task on the runtime immediately.
    let mut join_set = JoinSet::new();
    for _ in 0..10 {
        join_set.spawn(increment_counter(Arc::clone(&counter)));
    }
    // join_next yields each task's Result<(), JoinError> in completion order.
    while let Some(res) = join_set.join_next().await {
        println!("{res:?}");
    }
    let final_value = counter.lock().unwrap();
    println!("Final counter value: {}", *final_value);
}
/*
Running `target/debug/joinSet_demo`
Counter incremented
Counter incremented
Ok(())
Ok(())
Counter incremented
Counter incremented
Ok(())
Ok(())
Counter incremented
Counter incremented
Ok(())
Ok(())
Counter incremented
Ok(())
Counter incremented
Ok(())
Counter incremented
Counter incremented
Ok(())
Ok(())
Final counter value: 10
*/
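In the output above, the "Counter incremented" and Ok(()) lines interleave because the spawned tasks run in the background while main awaits join_next. Results are returned in the order the tasks complete, not the order in which they were spawned (all ten tasks are identical here, so the difference isn't visible). Each res is a Result<(), JoinError>; every task returned () successfully, hence Ok(()).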
pub fn spawn<F>(&mut self, task: F) -> AbortHandle where F: Future<Output = T> + Send + 'static, T: Send
Spawn the provided task on the JoinSet, returning an AbortHandle that can be used to remotely cancel the task.
The provided future starts running in the background as soon as this method is called, even if you never await anything on the JoinSet. spawn must be called from within a Tokio runtime; it panics otherwise.
async fn join_next(&mut self) -> Option<Result<T, JoinError>>
Waits until one of the tasks in the set completes and returns its output.
Returns None if the set is empty.
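The join_next loop (Some for each finished task, then None once the set is empty) can be mimicked with a std-only sketch. It is not how tokio implements JoinSet: nothing is spawned, and `CompletionSet`, its `join_next` method, `YieldN` and `block_on` are hypothetical names; the futures are polled in place by whoever awaits `join_next()`. Task 5 needs the fewest polls, so it is returned first.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Hypothetical single-thread executor used only to run the sketch.
struct ThreadWaker(Thread);
impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

// Stand-in for work of varying length: Pending n times, then Ready.
struct YieldN(u32);
impl Future for YieldN {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 == 0 {
            return Poll::Ready(());
        }
        self.0 -= 1;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

// Hypothetical completion-ordered set (NOT tokio's JoinSet: nothing is spawned).
struct CompletionSet<F: Future> {
    futs: Vec<Pin<Box<F>>>,
}
impl<F: Future> CompletionSet<F> {
    fn new() -> Self {
        CompletionSet { futs: Vec::new() }
    }
    fn push(&mut self, fut: F) {
        self.futs.push(Box::pin(fut));
    }
    // Like join_next: resolves to Some(output) of the next future to finish, or None if empty.
    fn join_next(&mut self) -> JoinNext<'_, F> {
        JoinNext { set: self }
    }
}

struct JoinNext<'a, F: Future> {
    set: &'a mut CompletionSet<F>,
}
impl<F: Future> Future for JoinNext<'_, F> {
    type Output = Option<F::Output>;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let futs = &mut self.get_mut().set.futs;
        if futs.is_empty() {
            return Poll::Ready(None);
        }
        for i in 0..futs.len() {
            if let Poll::Ready(out) = futs[i].as_mut().poll(cx) {
                drop(futs.swap_remove(i)); // a finished future leaves the set
                return Poll::Ready(Some(out));
            }
        }
        Poll::Pending
    }
}

async fn task(id: u32, polls: u32) -> u32 {
    YieldN(polls).await;
    id
}

fn demo() -> Vec<u32> {
    block_on(async {
        let mut set = CompletionSet::new();
        for id in 1..=5 {
            set.push(task(id, 6 - id)); // task 5 needs the fewest polls
        }
        let mut order = Vec::new();
        while let Some(id) = set.join_next().await {
            order.push(id);
        }
        order
    })
}

fn main() {
    println!("completion order: {:?}", demo());
}
```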
Ideal scenarios for join, join_all and JoinSet
The choice between join, join_all, and JoinSet depends on the specific requirements of the application and on how we want to handle concurrent tasks. Let's discuss when to use each of them:
join:
Use Case: Use join when you want to run a small, known number of asynchronous tasks concurrently and obtain their results as a tuple.
Example Scenario: You have two or more tasks that are related and need to be executed concurrently. You want to wait for all of them to complete and obtain their results to work with together.
Example:
let (result1, result2) = join(task1(), task2()).await;
join_all:
Use Case: Use join_all when you have a collection of asynchronous tasks that you want to run concurrently, receiving their results as a Vec in the same order as the input tasks. This is valuable when you have many independent tasks to run.
Example Scenario: You have a vector of futures, and you want to execute them concurrently and collect their results as a vector.
let tasks = vec![task1(), task2(), task3()]; let results = join_all(tasks).await;
(This compiles only if all three calls produce the same future type, for example three calls to the same async fn. Futures from different async fns must first be boxed, e.g. with futures::FutureExt::boxed, or boxed_local for non-Send futures, and must still share one output type.)
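Different async fns return distinct future types, so collecting them into one Vec needs type erasure. A std-only sketch of that boxing step, assuming three hypothetical async fns with the same i32 output. `BoxedTask` is an illustrative alias for `Pin<Box<dyn Future<Output = i32>>>`, and the hand-rolled `block_on` drives the boxed futures one at a time only to keep the example free of tokio; in real code you would hand the Vec to join_all.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Hypothetical minimal executor so the sketch runs without tokio.
struct ThreadWaker(Thread);
impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

// Hypothetical tasks: different async fns, so each returns a different future type.
async fn task1() -> i32 {
    1
}
async fn task2() -> i32 {
    20
}
async fn task3() -> i32 {
    300
}

// Boxing erases the concrete types; only the shared Output = i32 remains.
type BoxedTask = Pin<Box<dyn Future<Output = i32>>>;

fn demo() -> Vec<i32> {
    let mut tasks: Vec<BoxedTask> = Vec::new();
    tasks.push(Box::pin(task1()));
    tasks.push(Box::pin(task2()));
    tasks.push(Box::pin(task3()));
    // Driven one by one only to stay std-only; join_all(tasks).await would poll them concurrently.
    tasks.into_iter().map(block_on).collect()
}

fn main() {
    println!("{:?}", demo());
}
```

A boxed future still implements Future, which is why the same Vec can be passed to join_all unchanged.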
JoinSet:
Use Case: Use JoinSet when you have a set of asynchronous tasks, and you want to run them concurrently and obtain their results as and when the results are available irrespective of order of the input tasks. The set is not ordered, and the tasks will be returned in the order they complete.
Example Scenario: You have a set of tasks, and you want to execute them concurrently and handle each result as soon as that task completes, instead of waiting for all of the tasks to finish as join_all does.
Thanks for reading to the end; please comment if you have any questions.