Async, await and futures in Rust
Amit Nadiger
Asynchronous programming is a crucial aspect of modern software development, allowing applications to efficiently handle I/O-bound operations and concurrency without blocking execution threads.
Rust's asynchronous programming support in its core libraries provides the foundational components needed for writing async code, including language constructs like async and await. This support is designed to minimize memory allocation, crucial for performance-sensitive applications. However, the actual execution of async tasks is delegated to external runtimes or executors like Tokio or async-std, which handle task scheduling and I/O operations efficiently. This separation allows developers to write async code using familiar Rust syntax while benefiting from optimized runtime execution.
An async function is a special type of function in Rust that allows you to write asynchronous code in a more readable and familiar way. It indicates that the function contains asynchronous operations and may use the await keyword to pause execution without blocking the thread.
How Rust implements Async:
What happens when we add the async keyword to a function?
When we decorate a function with async, it transforms the return type from a regular value into a Future.
Below are more details:
Here's how we define an async function:
async fn do_something_async() {
// Asynchronous code here
}
An async function returns a type called a future. A future represents a value that might not be available immediately but will be available at some point in the future when the asynchronous computation completes.
In order to execute an async task, we need a runtime such as Tokio: an execution environment that provides the infrastructure necessary for handling asynchronous tasks. It coordinates the execution of tasks, manages event loops, and handles I/O operations, timers, and other asynchronous operations.
So, first we need to choose an appropriate runtime from external crates, then start the runtime, and only then can async tasks be executed on it.
In Rust we can choose the runtime of our choice to execute async code. This is one example of Rust's modular, plug-and-play nature!
Asynchronous code needs an execution environment provided by a runtime in order to function properly. There are different runtime options available, and choosing the appropriate runtime depends on the use case, requirements, and the libraries we are working with.
Here are a few popular asynchronous runtimes in the Rust ecosystem: Tokio, async-std, and smol.
For async programming in Rust we first need to start the runtime. This is explained in the previous article: Tokio Runtime start & config | LinkedIn.
Let me use the Tokio runtime for the rest of this article.
Below is the Cargo.toml to be used for all Tokio examples.
Add the Tokio runtime using the following command on the command line:
cargo add tokio --features full
[package]
name = "NameOfProgram"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tokio = { version = "1.29.1", features = ["full"] }
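As a quick reminder (a minimal sketch, not covering the configuration options from the earlier article), the #[tokio::main] attribute is the simplest way to start the runtime; the commented-out form shows a roughly equivalent explicit runtime creation:

// Minimal sketch: #[tokio::main] builds a runtime and blocks on the async main body.
#[tokio::main]
async fn main() {
    println!("Tokio runtime is running");
}

// Roughly equivalent explicit form:
// fn main() {
//     let rt = tokio::runtime::Runtime::new().unwrap();
//     rt.block_on(async {
//         println!("Tokio runtime is running");
//     });
// }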
Pausing Execution with await
The await keyword is used within an async function to pause its execution until a future is resolved. When an await is encountered, the function yields control back to the event loop, allowing other tasks to run in the meantime. Once the awaited future is ready, the function resumes its execution from where it left off.
Before jumping into the code, note that the Tokio dependency added above is all these examples rely on. Here's an example of using await within an async function:
use tokio::time::sleep;
use std::time::Duration;

async fn async_operation() {
    println!("Start of async operation");
    // Suspend this task for 2 seconds without blocking the thread.
    sleep(Duration::from_secs(2)).await;
    println!("Async operation completed");
}
In this example, the sleep function from the tokio crate (called delay_for in older Tokio 0.2 releases) returns a future that completes after a specified duration. By using await, the async_operation function pauses its execution for 2 seconds without blocking the thread.
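For completeness, here is a minimal sketch of driving async_operation from main; it assumes only the Tokio dependency shown above:

#[tokio::main]
async fn main() {
    // Nothing runs until the future is awaited.
    async_operation().await;
}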
Returning Values from async Functions
An async function can return a value just like a regular function. However, the returned value is wrapped in a future that resolves to the value when the asynchronous computation is complete as described earlier. This allows us to obtain the result of an asynchronous operation without blocking the thread.
Here's an example of an async function that returns a value:
async fn fetch_data_from_server() -> Result<String, reqwest::Error> {
    // Send the GET request and await the response headers.
    let response = reqwest::get("https://example.com/data").await?;
    // Await the full response body as a String.
    response.text().await
}
In this example, the fetch_data_from_server function returns a Result<String, reqwest::Error>. The await keyword is used to wait for the response of the HTTP request to be available without blocking the thread. If the request is successful, the response body is returned as a String. If an error occurs, it's propagated through the Result type.
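Here is a small sketch of calling this function; it assumes reqwest (in addition to tokio) is declared in Cargo.toml, and the URL is just the placeholder from the example above:

#[tokio::main]
async fn main() {
    // Await the future returned by fetch_data_from_server and handle its Result.
    match fetch_data_from_server().await {
        Ok(body) => println!("Received {} bytes", body.len()),
        Err(e) => eprintln!("Request failed: {}", e),
    }
}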
Futures
In Rust, a future is a placeholder for a value that may not be available yet but will be at some point in the future. Futures are a fundamental building block of asynchronous programming and are used to represent the result of asynchronous computations or operations. They enable us to work with asynchronous tasks in a structured and composable manner.
Futures allow us to perform tasks concurrently, efficiently manage I/O-bound operations, and create complex asynchronous workflows without explicitly managing low-level threading or synchronization details.
Please note that: Futures are inert in Rust and make progress only when polled. Dropping a future stops it from making further progress.
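A small sketch illustrating this inertness (the function and variable names are just for illustration):

use tokio::time::{sleep, Duration};

async fn say_later() {
    sleep(Duration::from_secs(1)).await;
    println!("This prints only because the future was awaited");
}

#[tokio::main]
async fn main() {
    let fut = say_later(); // Creating the future runs nothing yet.
    println!("Future created, but not polled");
    fut.await; // Only now does the body start executing.
}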
The Future Trait
The Future trait in Rust defines the interface for asynchronous computations that will eventually produce a value or an error. This trait is part of the standard library and provides a common API for interacting with different types of futures. It has a single associated type, Output, which represents the type of the value that the future will eventually produce.
Here's a simplified definition of the Future trait:
trait SimpleFuture {
    type Output;
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
}

enum Poll<T> {
    Ready(T),
    Pending,
}
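For reference, the actual trait in std::future uses Pin and a Context carrying a Waker instead of a bare wake function (shown here without its attribute annotations):

use std::pin::Pin;
use std::task::{Context, Poll};

pub trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}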
Please also note the futures crate: futures - Rust (docs.rs).
The futures crate contains traits and functions useful for writing async code. This includes the Stream, Sink, AsyncRead, and AsyncWrite traits, and utilities such as combinators. These utilities and traits may eventually become part of the standard library.
futures has its own executor, but not its own reactor, so it does not support execution of async I/O or timer futures. For this reason, it's not considered a full runtime. A common choice is to use utilities from futures with an executor from another crate.
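For example, futures::executor::block_on can drive a future to completion on the current thread; since the futures executor has no reactor, this sketch uses a purely computational future (no timers or I/O):

use futures::executor::block_on;

async fn compute() -> i32 {
    40 + 2
}

fn main() {
    // block_on polls the future on the current thread until it is Ready.
    let answer = block_on(compute());
    println!("answer = {}", answer);
}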
Chaining and Composing Futures
One of the powerful features of futures is their ability to be combined, chained, and composed to create more complex asynchronous workflows. You can use combinators and methods provided on top of the Future trait (via extension traits such as FutureExt and TryFutureExt) to create new futures by transforming or combining existing ones.
FutureExt and TryFutureExt:
The futures::future::FutureExt and futures::TryFutureExt traits are part of the futures crate, which supports asynchronous programming with the async/await syntax. They provide a set of extension methods for working with Future and TryFuture types.
Both of these traits are useful for writing clean and expressive asynchronous code. They enable you to chain together asynchronous operations, handle errors, transform values, and more, all while maintaining the async/await syntax.
For example, you can chain futures using the map combinator to perform a computation on the result of a future:
Cargo.toml
[dependencies]
futures = "0.3"
tokio = { version = "1", features = ["full"] }
use futures::future::FutureExt; // Import the extension trait for map()
use tokio::time::{sleep, Duration};

async fn async_operation() -> i32 {
    /*
    Important note: never use std::thread::sleep() here. It is a blocking
    system sleep that puts the whole thread to sleep, so all tasks on it
    run serially. Tokio provides tokio::time::sleep as the async alternative.
    */
    sleep(Duration::from_secs(3)).await;
    101
}
#[tokio::main]
async fn main() {
    let future = async_operation().map(|result| {
        if result > 99 {
            Ok("Value is greater than 99")
        } else {
            Err("Value is not greater than 99")
        }
    });
    println!("Continuing in main thread execution");
    // Await the result of the future and print it.
    match future.await {
        Ok(str) => {
            println!("OK => {}", str);
        }
        Err(str) => {
            println!("Err => {}", str);
        }
    }
}
/*
Running `target/debug/future_chaining_ex`
Continuing in main thread execution
OK => Value is greater than 99
*/
In the above example, the FutureExt trait's .map() method transforms the value produced by the asynchronous computation, and the resulting future is then awaited in main.
Important note: never use std::thread::sleep() in async code. It is a blocking system sleep that puts the whole thread to sleep, so all tasks on that thread run serially, which is a very bad thing to do. Tokio provides tokio::time::sleep as the asynchronous alternative.
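A minimal sketch of the difference, using a single-threaded runtime to make the effect obvious (the task names are illustrative):

use tokio::time::{sleep, Duration};

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let blocking = tokio::spawn(async {
        // BAD: blocks the only worker thread for 2 seconds, stalling every other task.
        std::thread::sleep(Duration::from_secs(2));
        println!("blocking task done");
    });

    let cooperative = tokio::spawn(async {
        // GOOD: suspends this task and lets the executor run other tasks meanwhile.
        sleep(Duration::from_millis(100)).await;
        println!("cooperative task done");
    });

    let _ = tokio::join!(blocking, cooperative);
}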
Additionally, you can use the and_then combinator to chain futures and perform subsequent asynchronous operations based on the result of a previous one:
Example :
use tokio::time::{sleep, Duration};
use futures::TryFutureExt; // Import the extension trait for and_then()

async fn async_operation() -> Result<i32, String> {
    sleep(Duration::from_secs(3)).await;
    Ok(101)
}

#[tokio::main]
async fn main() {
    // Chain a second asynchronous step onto the first future.
    let future1 = async_operation().and_then(|result| async move {
        if result > 99 {
            println!("Value is greater than 99");
            Ok("Value is greater than 99".to_string())
        } else {
            println!("Value is less than 99");
            Err("Value is less than 99".to_string())
        }
    });
    // Drive the chained future to completion.
    let _ = future1.await;
}
/*
Running `target/debug/asyncProgram`
Value is greater than 99
*/
The futures crate provides a wide range of combinators and utility functions that enable you to build complex asynchronous workflows. These combinators allow you to transform, filter, and combine futures in various ways, enabling you to create elegant and efficient asynchronous code.
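As one more small sketch, the then combinator from FutureExt chains a second asynchronous step onto an existing future (the names produce and pipeline are just illustrative):

use futures::future::FutureExt;

async fn produce() -> i32 {
    7
}

#[tokio::main]
async fn main() {
    // `then` runs a second async step on the output of the first future.
    let pipeline = produce().then(|n| async move { n * 2 });
    println!("result = {}", pipeline.await); // result = 14
}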
Concurrent Execution of async Functions
In Rust's asynchronous programming model, you can run multiple async functions concurrently, allowing tasks to be executed simultaneously without blocking the main thread. This concurrency is essential for achieving better performance and responsiveness in your applications, especially when dealing with I/O-bound operations.
The basic idea is that you can start multiple async functions, and they will execute concurrently as long as they perform asynchronous operations.
Example :
use tokio::time::{sleep, Duration};
use futures::future::join;

async fn async_operation_1() -> i32 {
    println!("Start async operation 1");
    sleep(Duration::from_secs(2)).await;
    println!("Async operation 1 completed");
    101
}

async fn async_operation_2() -> String {
    println!("Start async operation 2");
    sleep(Duration::from_secs(3)).await;
    println!("Async operation 2 completed");
    "Jai Shree Ram, Jai Bajrang bali - Async!".to_string()
}

#[tokio::main]
async fn main() {
    println!("Start of main function");
    let future1 = async_operation_1();
    let future2 = async_operation_2();
    // Use the `join` combinator to run both futures concurrently.
    let result = join(future1, future2).await;
    // Extract results from the futures.
    let result1 = result.0;
    let result2 = result.1;
    println!("Result of async operation 1: {}", result1);
    println!("Result of async operation 2: {}", result2);
    println!("End of main function");
}
/*
Running `target/debug/asyncProgram`
Start of main function
Start async operation 1
Start async operation 2
Async operation 1 completed
Async operation 2 completed
Result of async operation 1: 101
Result of async operation 2: Jai Shree Ram, Jai Bajrang bali - Async!
End of main function
*/
cargo.toml :
[package]
name = "asyncProgram"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tokio = { version = "1", features = ["full"] }
futures = "0.3"
Dealing with Shared State and Mutexes
When working with concurrent async functions, it's common to encounter situations where multiple tasks need access to shared data. Rust's ownership and borrowing rules still apply in asynchronous programming. To safely share mutable data between tasks, you can use Arc<Mutex<T>> to wrap the data in a mutex-protected smart pointer.
#cargo.toml
[package]
name = "futures_block_on"
version = "0.1.0"
edition = "2021"
[dependencies]
tokio = { version = "1.29.1", features = ["full"] }
use tokio::sync::Mutex;
use std::sync::Arc;

#[tokio::main]
async fn main() {
    // Create a shared data structure using an Arc (atomic reference counter).
    let shared_data = Arc::new(Mutex::new(0));

    // Clone the Arc for use in multiple tasks.
    let data_clone1 = Arc::clone(&shared_data);
    let data_clone2 = Arc::clone(&shared_data);

    // Spawn an asynchronous task that modifies the shared data.
    let async_task = tokio::spawn(async move {
        let mut data = data_clone1.lock().await;
        *data += 42;
        // Simulate some asynchronous work here...
        tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
        println!("Async Task: Shared Data after modification: {}", *data);
    });

    // Spawn a blocking (synchronous) task that also modifies the shared data.
    let sync_task = tokio::task::spawn_blocking(move || {
        let mut data = data_clone2.blocking_lock();
        *data += 23;
        // Simulate some synchronous work here...
        std::thread::sleep(std::time::Duration::from_secs(1));
        println!("Synchronous task: Shared Data after modification: {}", *data);
    });

    // Wait for the asynchronous task to complete.
    async_task.await.unwrap();
    // Wait for the blocking task to complete.
    sync_task.await.unwrap();

    // After both tasks are done, the shared data can be accessed safely.
    let shared_data = shared_data.lock().await;
    println!("Main Function: Shared Data after both modifications: {}", *shared_data);
}
/*
Running `/home/amit/OmPracticeRust/AsyncExperements/Ardan-1HourAsync/target/debug/tokio_block_on`
Async Task: Shared Data after modification: 42
Synchronous Task: Shared Data after modification: 65
Main Function: Shared Data after both modifications: 65
*/
By using mutexes, we can safely share data between asynchronous and synchronous contexts without encountering data races.
join_all
join_all is a function provided by asynchronous libraries in the Rust ecosystem, particularly the futures crate.
It allows you to concurrently await a collection of asynchronous tasks and gather their results into a single result. It is commonly used when you want to execute multiple asynchronous tasks concurrently and wait for all of them to complete.
For example, in the futures crate, you can use the join_all function to concurrently await a collection of Future items and get a Vec of their results. Here's a simplified example that also uses a mutex to safely share a counter between concurrent tasks:
use std::sync::{Arc, Mutex};
use tokio::time::{sleep, Duration};
use futures::future::join_all;
async fn increment_counter(counter: Arc<Mutex<i32>>) {
    {
        let mut guard = counter.lock().unwrap();
        *guard += 1;
    }
    // Perform other asynchronous operations...
    println!("Counter incremented");
}
#[tokio::main]
async fn main() {
    let counter = Arc::new(Mutex::new(0));
    let tasks = vec![
        increment_counter(Arc::clone(&counter)),
        increment_counter(Arc::clone(&counter)),
    ];
    join_all(tasks).await;
    let final_value = counter.lock().unwrap();
    println!("Final counter value: {}", *final_value);
}
/*
Running `target/debug/asyncProgram`
Counter incremented
Counter incremented
Final counter value: 2
*/
In the above example, both increment_counter tasks share the same counter data wrapped in a mutex. The use of Arc ensures safe reference counting, and the mutex guarantees exclusive access while a task is modifying the data.
Shared state and concurrency management are essential topics in asynchronous programming. Properly using mutexes, atomics, or other synchronization primitives helps you avoid race conditions and data corruption in your concurrent async code.
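For a counter this simple, an atomic is a lighter-weight alternative to the mutex; here is a sketch with the same structure as the join_all example above:

use std::sync::Arc;
use std::sync::atomic::{AtomicI32, Ordering};
use futures::future::join_all;

async fn increment(counter: Arc<AtomicI32>) {
    // Atomically add 1 without taking a lock.
    counter.fetch_add(1, Ordering::SeqCst);
}

#[tokio::main]
async fn main() {
    let counter = Arc::new(AtomicI32::new(0));
    let tasks = vec![
        increment(Arc::clone(&counter)),
        increment(Arc::clone(&counter)),
    ];
    join_all(tasks).await;
    println!("Final counter value: {}", counter.load(Ordering::SeqCst));
}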
Using .await Inside Synchronous Code
In Rust, asynchronous programming often involves working with asynchronous functions that are marked with the async keyword. These functions can pause their execution using the await keyword to await the completion of asynchronous operations. However, there are scenarios where you might want to use .await inside synchronous code, such as traditional functions or methods that are not marked as async. In such cases you can build a runtime by hand and use its block_on method to drive the future to completion, as the example below shows.
use tokio::time::{self, Duration};
async fn async_operation() -> i32 {
    println!("Start async operation");
    time::sleep(Duration::from_secs(2)).await; // Use time::sleep instead of sleep
    println!("Async operation completed");
    101
}

fn synchronous_function() {
    println!("Start synchronous function");
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_time() // Enable timers
        .build()
        .unwrap();
    let result = runtime.block_on(async {
        let value = async_operation().await;
        value
    });
    println!("Synchronous function completed with value: {:?}", result);
}

fn main() {
    synchronous_function();
}
/*
Finished dev [unoptimized + debuginfo] target(s) in 0.81s
Running `target/debug/asyncProgram`
Start synchronous function
Start async operation
Async operation completed
Synchronous function completed with value: 101
*/
Asynchronous programming in Rust empowers developers to create responsive and efficient applications that can handle concurrency and I/O-bound operations with ease. By mastering the async, await, and futures concepts, you'll be well-equipped to write robust, scalable, and performant asynchronous code in Rust.
Running async code using Tokio tasks:
Tokio provides the tokio::spawn function to launch asynchronous tasks. Here's a simple example:
Async tasks and libraries like Tokio are essential tools for building scalable, efficient, and responsive software systems in Rust, especially in scenarios where concurrency and parallelism are required. They are well-suited for network servers, web applications, and any program that needs to handle many simultaneous operations efficiently.
Using asynchronous tasks with tokio::spawn and similar constructs offers several advantages in concurrent programming: tasks are much cheaper to create than OS threads, they can run in parallel on a multi-threaded runtime, and they never block the executor while waiting on I/O or timers.
For the example below, first make sure to include the tokio crate in your Cargo.toml:
[dependencies]
tokio = { version = "1", features = ["full"] }
Now, you can use tokio::spawn to run asynchronous tasks concurrently. Here's an example where we spawn multiple asynchronous tasks:
use tokio::time::{Duration, sleep};

async fn task_with_delay(id: u32) {
    println!("Task {} started", id);
    // Simulate some async work (e.g., fetching data from a web API)
    sleep(Duration::from_secs(2)).await;
    println!("Task {} completed", id);
}

#[tokio::main]
async fn main() {
    // Spawn multiple asynchronous tasks concurrently
    let tasks = vec![
        tokio::spawn(task_with_delay(1)),
        tokio::spawn(task_with_delay(2)),
        tokio::spawn(task_with_delay(3)),
    ];
    // Wait for all tasks to complete
    for task in tasks {
        task.await.expect("Failed to await task");
    }
    println!("All tasks have completed");
}
/*
Op=>
Finished dev [unoptimized + debuginfo] target(s) in 22.67s
Running `target/debug/Async_tasks`
Task 1 started
Task 2 started
Task 3 started
Task 2 completed
Task 3 completed
Task 1 completed
All tasks have completed
*/
When you run the above example, you will see that the tasks run concurrently and complete independently of each other, thanks to Tokio's asynchronous runtime.
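A spawned task can also hand a value back: tokio::spawn returns a JoinHandle, and awaiting it yields a Result (an error if the task panicked). A small sketch, where compute is just an illustrative name:

async fn compute(id: u32) -> u32 {
    id * 10
}

#[tokio::main]
async fn main() {
    // The JoinHandle is itself a future that resolves to the task's return value.
    let handle = tokio::spawn(compute(7));
    let value = handle.await.expect("task panicked");
    println!("spawned task returned {}", value);
}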
Thanks for reading this article; please comment if you have anything to add.