Joining Async Tasks in Rust

Concurrent Execution of async Functions

In Rust's asynchronous programming model, you can run multiple async functions concurrently, allowing several tasks to make progress without blocking the thread they run on. This concurrency is essential for achieving better performance and responsiveness in our applications, especially when dealing with I/O-bound operations.

The basic idea is that you can create several futures by calling async functions and then drive them concurrently. Keep in mind that futures in Rust are lazy: calling an async function does nothing by itself until the returned future is awaited or spawned on a runtime.

In asynchronous programming, it's common to deal with multiple tasks that run concurrently. These tasks might perform various operations, such as making network requests, performing calculations, or handling user input. To work with these tasks efficiently, we often need to:

  1. Run Multiple Tasks Concurrently: We want to execute independent tasks concurrently, overlapping their waiting time (and, on a multi-threaded runtime, taking advantage of the available CPU cores) to improve the efficiency of the application.
  2. Wait for Task Completion: After launching these tasks, we need a way to wait for their completion before proceeding to the next steps of our program, rather than awaiting them one after another (see the sketch below).
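
Here is that naive, sequential alternative as a minimal sketch (the fetch_a/fetch_b names and delays are made up for illustration): awaiting futures one after another means the total wait is the sum of the individual delays rather than the longest one.

use tokio::time::{sleep, Duration};

async fn fetch_a() -> i32 {
    sleep(Duration::from_secs(2)).await; // simulate slow I/O
    1
}

async fn fetch_b() -> i32 {
    sleep(Duration::from_secs(3)).await; // simulate slow I/O
    2
}

#[tokio::main]
async fn main() {
    // Sequential awaits: fetch_b() only starts after fetch_a() has finished,
    // so the whole program takes roughly 2 + 3 = 5 seconds.
    let a = fetch_a().await;
    let b = fetch_b().await;
    println!("a = {}, b = {}", a, b);
}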

Here's where join, join_all, and JoinSet come into play:

Join:

Reference: Join in futures::future - Rust (docs.rs)

The join function in the futures crate allows you to concurrently run two asynchronous tasks and wait for both of them to complete. (For more than two futures there are join3, join4, and the join! macro.)


It joins the results of two futures, waiting for them both to complete.

This function returns a new future which awaits both futures to complete. The returned future finishes with a tuple of both results.

Note that this function consumes the passed futures and returns a wrapped version of them.

In other words, it combines the two futures into one, and when both individual futures are resolved, it produces a tuple of their results.

#cargo.toml
[package]
name = "futures_block_on"
version = "0.1.0"
edition = "2021"

[dependencies]
futures = "0.3.28"
tokio = { version = "1.29.1", features = ["full"] }
        

Example 1:


use tokio::time::{sleep, Duration};
use futures::future::join;

async fn task1() -> i32 {
    // Perform some asynchronous computation
    sleep(Duration::from_secs(3)).await;
    101
}

async fn task2() -> f64 {
    // Perform another asynchronous computation
    sleep(Duration::from_secs(5)).await;
    3.14
}

#[tokio::main]
async fn main() {
    let (result1, result2) = join(task1(), task2()).await;
    println!("Result 1: {}", result1);
    println!("Result 2: {}", result2);
}

/*
Running `target/debug/join_demo`
Result 1: 101
Result 2: 3.14

// Please note that both results are printed together after about 5 seconds (the longer of the two delays), even though task1 finishes after 3 seconds.
*/        

Example 2:

use tokio::time::{sleep, Duration};
use futures::future::join;

async fn async_operation_1() -> i32 {
    println!("Start async operation 1");

    sleep(Duration::from_secs(2)).await;
    println!("Async operation 1 completed");
    101
}

async fn async_operation_2() -> String {
    println!("Start async operation 2");
    sleep(Duration::from_secs(3)).await;
    println!("Async operation 2 completed");
    "Jai Shree Ram, Jai Bajrang bali - Async!".to_string()
}

#[tokio::main]
async fn main() {
    println!("Start of main function");

    let future1 = async_operation_1();
    let future2 = async_operation_2();

    // Use the `join` combinator to run both futures concurrently.
    let result = join(future1, future2).await;

    // Extract results from the futures.
    let result1 = result.0;
    let result2 = result.1;

    println!("Result of async operation 1: {}", result1);
    println!("Result of async operation 2: {}", result2);

    println!("End of main function");
}
/*
     Running `target/debug/asyncProgram`
Start of main function
Start async operation 1
Start async operation 2
Async operation 1 completed
Async operation 2 completed
Result of async operation 1: 101
Result of async operation 2: Jai Shree Ram, Jai Bajrang bali - Async!
End of main function
*/


join_all:

join_all in futures::future - Rust (docs.rs)

join_all is a function provided by the futures crate for asynchronous programming in Rust.

It allows you to concurrently await a collection of asynchronous tasks and gather their results into a single result. This function is commonly used when you want to execute multiple asynchronous tasks concurrently and wait for all of them to complete.

For example, in the futures crate, you can use the join_all function to concurrently await a collection of Future items and get a Vec of their results. Here's a simplified example:

#cargo.toml
[dependencies]
futures = "0.3.28"
tokio = { version = "1.29.1", features = ["full"] }

//main.rs
use futures::future::join_all;
use tokio::time::{sleep, Duration};

async fn expensive_computation(data: i32, task_id: i32, delay_sec: u8) -> i32 {
    // Simulate a time-consuming computation
    println!("Task_id = {} <=> delay_sec = {}", task_id, delay_sec);
    sleep(Duration::from_secs(delay_sec.into())).await;
    data * 2
}

#[tokio::main]
async fn main() {
    let data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9];
    let results = join_all(data.iter().map(|&d| expensive_computation(d, d, (10 - d).try_into().unwrap()))).await;

    println!("Results: {:?}", results);

    for (index, result) in results.into_iter().enumerate() {
        println!("Result {}: {}", index, result);
    }
}
/*
Op => 
   Running `target/debug/join_all_demo1`
Task_id = 1 <=> delay_sec = 9
Task_id = 2 <=> delay_sec = 8
Task_id = 3 <=> delay_sec = 7
Task_id = 4 <=> delay_sec = 6
Task_id = 5 <=> delay_sec = 5
Task_id = 6 <=> delay_sec = 4
Task_id = 7 <=> delay_sec = 3
Task_id = 8 <=> delay_sec = 2
Task_id = 9 <=> delay_sec = 1


Results: [2, 4, 6, 8, 10, 12, 14, 16, 18]

Result 0: 2
Result 1: 4
Result 2: 6
Result 3: 8
Result 4: 10
Result 5: 12
Result 6: 14
Result 7: 16
Result 8: 18
*/        

In the above program you can see that the results are obtained exactly in the order of the input, even though the async tasks have different sleep periods; in fact the delays run in exactly the reverse direction, so the tasks complete in reverse order.

In other words:

Use join_all when you have a collection of asynchronous tasks and you want to run them concurrently and obtain their results, stored in a Vec, in the same order as the input tasks.


Here's another example, using a mutex to safely share a counter between concurrent tasks:

use std::sync::{Arc, Mutex};
use tokio::time::{sleep, Duration};
use futures::future::join_all;

async fn increment_counter(counter: Arc<Mutex<i32>>, task_id: u8, timeout: u32) -> i32 {
    // Keep the critical section short: the MutexGuard is dropped at the end of
    // this inner block, before the .await point below.
    let ctr = {
        let mut guard = counter.lock().unwrap();
        *guard += 1;
        *guard
    };
    sleep(Duration::from_secs(timeout.into())).await;
    // Perform other asynchronous operations...
    println!("Counter incremented with TaskId {} and counter_val = {}", task_id, ctr);
    ctr
}

#[tokio::main]
async fn main() {
    let counter = Arc::new(Mutex::new(0));

    let tasks = vec![
        increment_counter(Arc::clone(&counter),0,5),
        increment_counter(Arc::clone(&counter),1,2),
    ];

    let result = join_all(tasks).await;
    println!("result = {:?}",result);

    let final_value = counter.lock().unwrap();
    println!("Final counter value: {}", *final_value);
} 
/*

     Running `target/debug/join_all`
Counter incremented with TaskId 1 and counter_val = 2
Counter incremented with TaskId 0 and counter_val = 1
result = [1, 2]
Final counter value: 2

*/        

In the above example, both increment_counter tasks share the same counter data wrapped in a mutex. The use of Arc ensures safe reference counting, and the mutex guarantees exclusive access while a task is modifying the data.

Shared state and concurrency management are essential topics in asynchronous programming. Properly using mutexes, atomics, or other synchronization primitives helps you avoid race conditions and data corruption in your concurrent async code.

Please note that in this program, too, the results are in the same order as the input tasks and are stored in a Vec.
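
As a variation on the same idea, here is a minimal sketch (the function names and delays are made up for illustration) that replaces the Mutex with an AtomicI32; for a simple counter, an atomic avoids locking altogether, so there is no guard that could accidentally be held across an .await.

use std::sync::Arc;
use std::sync::atomic::{AtomicI32, Ordering};
use futures::future::join_all;
use tokio::time::{sleep, Duration};

async fn increment_counter(counter: Arc<AtomicI32>, delay_sec: u64) -> i32 {
    sleep(Duration::from_secs(delay_sec)).await;
    // fetch_add returns the previous value, so add 1 to get the value this task produced.
    counter.fetch_add(1, Ordering::SeqCst) + 1
}

#[tokio::main]
async fn main() {
    let counter = Arc::new(AtomicI32::new(0));

    let tasks = vec![
        increment_counter(Arc::clone(&counter), 2),
        increment_counter(Arc::clone(&counter), 1),
    ];

    let results = join_all(tasks).await;
    println!("results = {:?}", results);
    println!("Final counter value: {}", counter.load(Ordering::SeqCst));
}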

JoinSet:

JoinSet in tokio::task - Rust (docs.rs)

JoinSet is a struct in the tokio::task module. It is used to collect a set of tasks spawned on a Tokio runtime and await the completion of some or all of the tasks in the set. The set is not ordered, and the tasks are returned in the order they complete. All of the tasks must have the same return type T. When the JoinSet is dropped, all tasks in the JoinSet are immediately aborted.

Example of usage:

[dependencies]
tokio = { version = "1.29.1", features = ["full"] }        
//main.rs
use std::sync::{Arc, Mutex};
use tokio::task::JoinSet;

async fn increment_counter(counter: Arc<Mutex<i32>>) {
    {
        let mut guard = counter.lock().unwrap();
        *guard += 1;
    }

    // Perform other asynchronous operations...
    println!("Counter incremented");
}

#[tokio::main]
async fn main() {
    let counter = Arc::new(Mutex::new(0));

    // Using Tokio JoinSet
    let mut join_set = JoinSet::new();
    for _ in 0..10 {
        join_set.spawn(increment_counter(Arc::clone(&counter)));
    }

    while let Some(res) = join_set.join_next().await {
        println!("{res:?}");
    }

    let final_value = counter.lock().unwrap();
    println!("Final counter value: {}", *final_value);
} 
/*
    Finished dev [unoptimized + debuginfo] target(s) in 0.03s
     Running `target/debug/joinSet_demo`
Counter incremented
Counter incremented
Ok(())
Ok(())
Counter incremented
Counter incremented
Ok(())
Ok(())
Counter incremented
Counter incremented
Ok(())
Ok(())
Counter incremented
Ok(())
Counter incremented
Ok(())
Counter incremented
Counter incremented
Ok(())
Ok(())
Final counter value: 10
*/
        

Please note in the above output that the resulting set is not ordered: the tasks are returned in the order they complete.


pub fn spawn<F>(&mut self, task: F) -> AbortHandle

Spawn the provided task on the JoinSet, returning an AbortHandle that can be used to remotely cancel the task.

The provided future will start running in the background immediately when this method is called, even if you don’t await anything on this JoinSet.


async fn join_next(&mut self) -> Option<Result<T, JoinError>>

Waits until one of the tasks in the set completes and returns its output.

Returns None if the set is empty.
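
To illustrate the AbortHandle returned by spawn, here is a minimal sketch (the task bodies and delays are made up for illustration): one long-running task is cancelled through its handle, and join_next reports it as a cancelled JoinError.

use tokio::task::JoinSet;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let mut set = JoinSet::new();

    // A quick task and a long-running one; keep the long task's AbortHandle.
    set.spawn(async {
        sleep(Duration::from_millis(100)).await;
        "quick task done"
    });
    let slow_handle = set.spawn(async {
        sleep(Duration::from_secs(60)).await;
        "slow task done"
    });

    // Cancel the slow task without waiting for it.
    slow_handle.abort();

    while let Some(res) = set.join_next().await {
        match res {
            Ok(msg) => println!("completed: {}", msg),
            Err(e) if e.is_cancelled() => println!("a task was cancelled"),
            Err(e) => println!("a task failed: {}", e),
        }
    }
}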


Ideal scenarios to use join, join_all, and JoinSet

The choice between join, join_all, and JoinSet depends on the specific requirements of your application and how you want to handle concurrent tasks. Let's discuss when to use each of these tools:

join:

Use Case: Use join when you want to run a small, fixed number of asynchronous tasks concurrently and obtain their results as a tuple.

Example Scenario: You have two or more related tasks that need to be executed concurrently (join itself takes exactly two; use join3/join4 or the join! macro for more). You want to wait for all of them to complete and obtain their results to work with together.

Example:

let (result1, result2) = join(task1(), task2()).await;        
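
For more than two futures, the futures crate also provides join3/join4 and a join! macro (tokio has an equivalent tokio::join!). A minimal sketch, with three made-up async functions:

use futures::join;

async fn task_a() -> i32 { 1 }
async fn task_b() -> f64 { 2.0 }
async fn task_c() -> &'static str { "three" }

#[tokio::main]
async fn main() {
    // join! polls all three futures concurrently and yields a tuple of their results.
    let (a, b, c) = join!(task_a(), task_b(), task_c());
    println!("{} {} {}", a, b, c);
}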

join_all:

Use Case: Use join_all when you have a collection of asynchronous tasks, and you want to run them concurrently and obtain their results in the same order as the input tasks.

Use join_all when you have a collection of tasks that you want to execute concurrently and whose results you want to receive as a vector. This is valuable when you have multiple independent tasks to run.

Example Scenario: You have a vector of tasks or futures, and you want to execute them concurrently and collect their results as a vector.

Example:

let tasks = vec![task1(), task2(), task3()];
let results = join_all(tasks).await;


JoinSet:

Use Case: Use JoinSet when you have a set of asynchronous tasks that you want to run concurrently, and you want to obtain each result as soon as it becomes available, irrespective of the order of the input tasks. The set is not ordered, and the tasks are returned in the order they complete.

Example Scenario: You have a set of tasks or futures, and you want to execute them concurrently and collect results in the order of their completion instead of waiting for all the tasks to complete, as join_all does.
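
Here is a minimal sketch of that behaviour (the delays are made up for illustration): tasks spawned with decreasing delays come back from join_next in completion order, i.e. the reverse of the spawn order.

use tokio::task::JoinSet;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let mut set = JoinSet::new();

    // Task 1 sleeps 3s, task 2 sleeps 2s, task 3 sleeps 1s.
    for (task_id, delay) in [(1u64, 3u64), (2, 2), (3, 1)] {
        set.spawn(async move {
            sleep(Duration::from_secs(delay)).await;
            task_id
        });
    }

    // Prints "finished task 3", then 2, then 1: completion order, not spawn order.
    while let Some(res) = set.join_next().await {
        println!("finished task {}", res.unwrap());
    }
}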


Thanks for reading till the end; please comment if you have any questions.

Reference:

Join in futures::future - Rust (docs.rs)

join_all in futures::prelude::future - Rust (docs.rs)

JoinSet in tokio::task - Rust (docs.rs)

thebracket/Ardan-1HourAsync: Code accompanying the 1 Hour Dive into Async live stream. (github.com)

