Rust Stream
Amit Nadiger
Polyglot (Rust, Move, C++, C, Kotlin, Java), Blockchain, Polkadot, UTXO, Substrate, Sui, Aptos, Wasm, Proxy-wasm, AndroidTV, Dvb, STB, Linux, Cas, Engineering management.
In Rust, streams are a core part of asynchronous programming, commonly used to handle sequences of values produced asynchronously over time. They work similarly to iterators, but instead of blocking until each item is ready, they yield items as they become available.
Here's a breakdown of how streams work in Rust with examples. We'll use the futures crate, as it provides useful traits and utilities for working with async streams.
1. Setting up futures in Your Project
First, add futures and tokio to Cargo.toml to use async streams:
cargo add tokio -F full
cargo add futures
The resulting Cargo.toml looks like this:
[package]
name = "async_streams"
version = "0.1.0"
edition = "2021"
[dependencies]
futures = "0.3.31"
tokio = { version = "1.41.1", features = ["full"] }
2. Basic Stream Example
A stream is like an iterator, but instead of using next() to get items synchronously, it uses next().await to get items asynchronously.
Let's create a simple stream that yields numbers from 1 to 5 with a delay between each number.
use futures::stream;
use futures::StreamExt;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
    // Create a stream of numbers with a delay
    let num_stream = stream::iter(1..=5).then(|num| async move {
        sleep(Duration::from_secs(1)).await; // Delay for 1 second
        num
    });
    // Use the stream
    num_stream.for_each(|num| async move {
        println!("Received number: {}", num);
    }).await;
}
/*
Each line of the output below appears after a 1-second delay.
Running `target\debug\async_streams.exe`
Received number: 1
Received number: 2
Received number: 3
Received number: 4
Received number: 5
*/
What Does async move Do?
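An async block turns the code inside it into a future, which is what lets you use .await there. Adding move makes that future take ownership of every variable it uses instead of borrowing it. Stream combinators such as then, filter_map, and for_each expect a closure that returns a future, so a closure whose body is an async move block is the usual way to write one.
Why Is It Used in Streams?
The combinator calls the closure once per item and then polls the returned future, often after the closure itself has returned. The future therefore has to own the item and any captured values, and async move guarantees that.
async move in .then: The future's output becomes the next item of the resulting stream.
async move in .for_each: The future runs for its side effects and must resolve to ().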
Key Scenarios Where async move is Essential
Capturing Variables:
If you want to capture variables by value into the closure, move is required.
use futures::stream::{self, StreamExt};
#[tokio::main]
async fn main() {
    let multiplier = 2;
    let stream = stream::iter(1..=5).then(move |item| async move {
        item * multiplier // `multiplier` is captured by value (copied, since i32 is Copy)
    });
    stream.for_each(|item| async move {
        println!("Processed: {}", item);
    }).await;
}
/*
Processed: 2
Processed: 4
Processed: 6
Processed: 8
Processed: 10
*/
Async Operations Inside Closures:
If a combinator needs to call an async function (e.g., a database query or API call), async is necessary.
use futures::stream::{self, StreamExt};
use tokio::time::{sleep, Duration};
async fn async_operation(num: u32) -> u32 {
    sleep(Duration::from_secs(1)).await; // Simulate I/O
    num * 10
}
#[tokio::main]
async fn main() {
    let stream = stream::iter(1..=5).then(|item| async move {
        async_operation(item).await
    });
    stream.for_each(|result| async move {
        println!("Result: {}", result);
    }).await;
}
What Happens Without async move?
Without async, you can't use await:
let stream = stream::iter(1..=5).then(|item| {
    // ERROR: Cannot use `.await` here without `async`
    let result = async_operation(item).await;
    result
});
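Without move, the async block only borrows item and multiplier, so the returned future can outlive the values it refers to and the compiler rejects it:
let multiplier = 2;
let stream = stream::iter(1..=5).then(|item| async {
    // ERROR: the async block borrows `item` and `multiplier`, which may not live long enough
    item * multiplier
});
Finally, the async move block combines both: it produces a future that owns everything it uses, so the future stays valid after the closure that created it has returned.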
3. Creating a Custom Stream
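You can create a custom stream by implementing the Stream trait yourself, but it is usually easier to build one with a helper such as stream::unfold from the futures crate. Here's an example of a custom stream that produces even numbers. Because the closure never returns None, the stream is infinite: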
use futures::stream::{self, Stream};
use futures::StreamExt;
use tokio::time::{sleep, Duration};
fn even_numbers() -> impl Stream<Item = u32> {
    stream::unfold(0, |mut state| async move {
        state += 2;
        sleep(Duration::from_millis(500)).await; // Delay to simulate async operation
        Some((state, state)) // Yield `state` and carry it forward as the next state
    })
}
#[tokio::main]
async fn main() {
    let even_stream = even_numbers();
    even_stream.for_each(|num| async move {
        println!("Even number: {}", num);
    }).await;
}
/*
Running `target\debug\async_streams.exe`
Even number: 2
Even number: 4
Even number: 6
Even number: 8
...
(continues until the program is interrupted)
*/
4. Combining Streams
Streams can be combined with the stream::select function, which interleaves the values from two streams into one stream, yielding each value as soon as either stream produces it.
use futures::stream::{self, StreamExt};
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
    let stream1 = stream::iter(vec![1, 2, 3]).then(|num| async move {
        sleep(Duration::from_secs(1)).await;
        num
    });
    let stream2 = stream::iter(vec![4, 5, 6]).then(|num| async move {
        sleep(Duration::from_millis(700)).await;
        num
    });
    let combined_stream = stream::select(stream1, stream2);
    combined_stream.for_each(|num| async move {
        println!("Combined stream value: {}", num);
    }).await;
}
/*
Running `target\debug\async_streams.exe`
Combined stream value: 4
Combined stream value: 1
Combined stream value: 5
Combined stream value: 2
Combined stream value: 6
Combined stream value: 3
*/
5. Collecting Stream Data
You can collect a stream’s output into a Vec or another collection.
use futures::stream;
use futures::StreamExt;
#[tokio::main]
async fn main() {
    let stream = stream::iter(1..=5);
    let collected: Vec<i32> = stream.collect().await;
    println!("Collected values: {:?}", collected);
}
/*
Running `target\debug\async_streams.exe`
Collected values: [1, 2, 3, 4, 5]
*/
Here, the stream is collected into a Vec, and all values are printed at once after the stream completes.
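Key Stream APIs:
Below is a list of key APIs commonly used with streams in Rust, especially when working with the futures crate, along with how they can be applied in different scenarios. These APIs, mostly available through the StreamExt trait, help with creating, transforming, filtering, and controlling streams.
1. Creating Streams
stream::iter(iterable): Converts any iterator into a stream that yields its items immediately.
stream::unfold(initial_state, async_closure): Builds a stream from a state value; the closure returns Some((item, next_state)) to continue or None to finish.
stream::repeat(item): Yields clones of the same item forever.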
2. Basic Stream Transformation and Control
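next().await: Returns the next item as Some(item), or None once the stream has ended.
map(closure): Transforms each item with an ordinary (synchronous) closure; use then when the transformation itself needs to await.
for_each(async_closure): Runs an async closure on every item, one at a time, until the stream ends.
for_each_concurrent(limit, async_closure): Like for_each, but runs up to limit closures concurrently.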
3. Filtering and Conditional Processing
filter(async_closure): Keeps only the items for which the async predicate resolves to true.
filter_map(async_closure): Filters and transforms in one step, keeping the values returned as Some and dropping None.
take(n): Yields at most the first n items, then ends the stream.
skip(n): Discards the first n items and yields the rest.
use futures::{stream, StreamExt};
#[tokio::main]
async fn main() {
    let stream = stream::iter(1..=5);
    stream.skip(2).for_each(|item| async move {
        println!("Item: {}", item);
    }).await;
}
/*
Item: 3
Item: 4
Item: 5
*/
How to filter a stream to include only even numbers and buffer the items:
use futures::stream::{self, StreamExt};
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
    let numbers = stream::iter(1..=10)
        .filter(|&num| async move { num % 2 == 0 }) // Keep only even numbers
        .map(|num| async move {
            sleep(Duration::from_millis(500)).await; // Simulate async processing
            num * 2 // Transform each number
        })
        .buffered(2); // Process up to 2 items concurrently
    numbers.for_each(|num| async move {
        println!("Processed number: {}", num);
    }).await;
}
/*
Processed number: 4
Processed number: 8
Processed number: 12
Processed number: 16
Processed number: 20
*/
4. Stream Collection and Aggregation
collect::<T>().await: Gathers all items into a collection such as a Vec once the stream ends.
use futures::{stream, StreamExt};
#[tokio::main]
async fn main() {
    let stream = stream::iter(1..=5);
    let vec: Vec<_> = stream.collect().await;
    println!("Collected: {:?}", vec);
}
/*
Collected: [1, 2, 3, 4, 5]
*/
fold(initial, async_closure): Reduces the stream to a single value by combining each item with an accumulator.
use futures::{stream, StreamExt};
#[tokio::main]
async fn main() {
    let stream = stream::iter(1..=5);
    let sum = stream.fold(0, |acc, item| async move { acc + item }).await;
    println!("Sum: {}", sum);
}
/*
Sum: 15
*/
try_fold(initial, async_closure): Accumulates the Ok items of a stream of Results into a single result, halting at the first error.
use futures::{stream, TryStreamExt};
#[tokio::main]
async fn main() {
    let stream = stream::iter(vec![Ok(1), Ok(2), Err("Error"), Ok(3)]);
    let result: Result<i32, &str> = stream.try_fold(0, |acc, val| async move {
        Ok(acc + val)
    }).await;
    println!("Final result: {:?}", result);
}
/*
Final result: Err("Error")
*/
5. Concurrency and Buffering
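buffered(limit): For a stream whose items are futures, runs up to limit of them concurrently and yields their outputs in the original order.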
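6. Combining and Merging Streams
chain(other_stream): Yields every item of the first stream, then every item of the second.
use futures::{stream, StreamExt};
#[tokio::main]
async fn main() {
    let stream1 = stream::iter(vec![1, 2, 3]);
    let stream2 = stream::iter(vec![4, 5, 6]);
    let combined_stream = stream1.chain(stream2);
    combined_stream.for_each(|item| async move {
        println!("Item: {}", item);
    }).await;
}
/*
Item: 1
Item: 2
Item: 3
Item: 4
Item: 5
Item: 6
*/
stream::select(stream1, stream2): Interleaves items from both streams as each becomes ready. In the futures crate this is a free function in futures::stream rather than a StreamExt method.
7. Advanced Usage in Blockchain Context
Blockchain Usage: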
1. Event Streams for Monitoring Blockchain State
A blockchain may need to broadcast real-time events, such as when a new block is mined or a transaction is confirmed. Event listeners can respond to these events as they arrive, providing an efficient way to monitor blockchain state changes. The example below uses a tokio watch channel, which always holds the latest value and notifies receivers when it changes.
use tokio::sync::watch;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
    let (tx, rx) = watch::channel("No new blocks yet");
    // Task to send new blocks
    tokio::spawn(async move {
        let new_blocks = vec!["Block1", "Block2", "Block3"];
        for block in new_blocks {
            sleep(Duration::from_secs(1)).await; // Simulate time between new blocks
            tx.send(block).unwrap();
        }
    });
    // Task to receive and process blocks
    let mut rx = rx; // Use mutable receiver to track changes
    loop {
        if rx.changed().await.is_ok() {
            let block = *rx.borrow(); // Access the new value
            println!("New block received: {}", block);
        } else {
            break; // Exit loop if sender is dropped
        }
    }
}
/*
New block received: Block1
New block received: Block2
New block received: Block3
*/
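watch::channel: Creates a single-producer, multi-consumer channel that stores only the most recent value; the argument is the initial value.
rx.changed().await: Waits until a value the receiver has not yet seen is sent, and returns an error once the sender has been dropped.
rx.borrow(): Retrieves the current value of the watch::Receiver.
Loop Exit Condition: When the spawned task finishes, tx is dropped, changed() returns Err, and the loop breaks.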
2. Block Propagation Across Peers
In decentralized networks, a node needs to broadcast new blocks to peers as they are mined or received. With streams, you can handle peer-to-peer communication efficiently by broadcasting blocks asynchronously to peers.
For example, suppose a node propagates a new block to several peers. Using for_each_concurrent lets the node work on multiple peers at the same time, up to a chosen limit, instead of waiting for each peer in turn:
use futures::stream::{self, StreamExt};
use tokio::time::{sleep, Duration};
async fn propagate_block_to_peer(peer_id: u32, block: &str) {
    println!("Propagating block {} to peer {}", block, peer_id);
    sleep(Duration::from_secs(1)).await; // Simulate network delay
    println!("Finished propagating to peer {}", peer_id);
}
#[tokio::main]
async fn main() {
    let peers = vec![1, 2, 3];
    let block = "Block12345";
    // for_each_concurrent returns a future; at most 2 peers are handled at a time
    let propagation_stream = stream::iter(peers)
        .for_each_concurrent(2, |peer| propagate_block_to_peer(peer, block));
    // Nothing runs until the future is awaited
    propagation_stream.await;
}
/*
Propagating block Block12345 to peer 1
Propagating block Block12345 to peer 2
Finished propagating to peer 1
Finished propagating to peer 2
Propagating block Block12345 to peer 3
Finished propagating to peer 3
*/
3. Transaction Pools and Real-Time Validation
In a blockchain node, a transaction pool is a collection of unconfirmed transactions waiting to be added to a block. Each new transaction needs to be validated asynchronously to avoid blocking the node’s main tasks.
Streams help efficiently manage and process this transaction pool. For example, as new transactions are added to the pool, you can use a stream to validate and prioritize them in real time:
use futures::stream::{self, StreamExt};
use tokio::time::sleep;
use std::time::Duration;
#[derive(Debug)]
struct Transaction {
    id: u32,
    valid: bool,
}
// Mock validation function
async fn validate_transaction(tx: &Transaction) -> bool {
    sleep(Duration::from_millis(100)).await;
    tx.valid
}
#[tokio::main]
async fn main() {
    let transactions = vec![
        Transaction { id: 1, valid: true },
        Transaction { id: 2, valid: false },
        Transaction { id: 3, valid: true },
    ];
    let transaction_stream = stream::iter(transactions)
        .then(|tx| async move {
            let is_valid = validate_transaction(&tx).await;
            if is_valid {
                Some(tx) // Keep the transaction if valid
            } else {
                None // Discard invalid transactions
            }
        })
        .filter_map(|tx| async move { tx }); // Filter out None values
    transaction_stream
        .for_each(|tx| async move {
            println!("Valid transaction: {:?}", tx);
        })
        .await;
}
/*
Valid transaction: Transaction { id: 1, valid: true }
Valid transaction: Transaction { id: 3, valid: true }
*/
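filter_map: Runs an async closure on each item and keeps only the values returned as Some, discarding None.
Inside then: Each transaction is validated asynchronously and wrapped in Some if it is valid, or replaced by None if it is not.
Used filter_map to Process Results: The closure passes each Option through unchanged, so the resulting stream contains only the valid transactions.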
Thank you for reading. Please leave a comment if you have any questions or feedback.