Rust Stream

In Rust, streams are a core part of asynchronous programming, commonly used to handle sequences of values produced asynchronously over time. They work similarly to iterators, but instead of blocking until each item is ready, they yield items as they become available.

Here's a breakdown of how streams work in Rust with examples. We'll use the futures crate, as it provides useful traits and utilities for working with async streams.

1. Setting up futures in Your Project

First, add futures and tokio to Cargo.toml to use async streams:

cargo add tokio -F full
cargo add futures        

The resulting Cargo.toml looks like this:

[package]
name = "async_streams"
version = "0.1.0"
edition = "2021"

[dependencies]
futures = "0.3.31"
tokio = { version = "1.41.1", features = ["full"] }        

2. Basic Stream Example

A stream is like an iterator, but instead of using next() to get items synchronously, it uses next().await to get items asynchronously.

Let's create a simple stream that yields numbers from 1 to 5 with a delay between each number.

use futures::stream;
use futures::StreamExt;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    // Create a stream of numbers with a delay
    let num_stream = stream::iter(1..=5).then(|num| async move {
        sleep(Duration::from_secs(1)).await; // Delay for 1 second
        num
    });

    // Use the stream
    num_stream.for_each(|num| async move {
        println!("Received number: {}", num);
    }).await;
}
/*
Each line of the output below is printed after a 1-second delay.
     Running `target\debug\async_streams.exe`
Received number: 1  
Received number: 2
Received number: 3
Received number: 4
Received number: 5
*/        

  • stream::iter(1..=5) creates a stream that yields numbers from 1 to 5.
  • .then(...) is used to asynchronously process each item in the stream, adding a 1-second delay.
  • for_each is a combinator that asynchronously waits for each item in the stream and processes it. Here, we're simply printing each received number.


What Does async move Do?

The async move block appears throughout stream code because combinators like then, filter_map, and for_each take a closure that returns a future, and an async block is the most convenient way to write one.


  • async: Makes the block of code an asynchronous computation, allowing it to be awaited.
  • move: Ensures that any variables or resources used within the block are moved into the block's scope, ensuring they live as long as the asynchronous task.

This is essential for:

  1. Asynchronous Operations: Without async, you cannot await inside the block, which is crucial when dealing with tasks like I/O or timers.
  2. Ownership: The move keyword makes the block take ownership of the variables it captures, so the resulting future does not borrow from the enclosing scope. This is also what lets a future be handed to another thread when a runtime requires it.
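
As a minimal sketch of what move changes, the dependency-free example below returns a future from a function. The names make_greeting and block_on are illustrative, and block_on is a deliberately naive busy-polling executor written only for this sketch (a real program would use a runtime such as tokio). Because the async move block takes ownership of name, the future can outlive the function that created it; with a plain async block it would only borrow name and the function would be rejected by the compiler.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Waker that does nothing; enough for futures that never actually wait.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Naive executor for illustration only: polls the future until it is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

// `async move` transfers ownership of `name` into the returned future,
// so the future stays valid after this function has returned.
fn make_greeting(name: String) -> impl Future<Output = String> {
    async move { format!("Hello, {name}!") }
}

fn main() {
    let fut = make_greeting(String::from("stream"));
    println!("{}", block_on(fut));
}
```

The same reasoning applies inside stream combinators: the future returned for each item must own whatever it needs, because it is polled after the closure call that created it has finished.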


Why Is It Used in Streams?

Stream combinators (then, filter_map, for_each, etc.) take a closure as their argument, and that closure must return a future. Writing the closure body as an async move block is a convenient way to do this.


async move in .then:

  • The block allows performing computations asynchronously for each item.
  • The move ensures each item is moved into the block's scope, avoiding lifetime and borrowing issues.


async move in .for_each:

  • Lets the per-item side effect run as an async block. Here it is just println!, but it could be replaced with real asynchronous work (e.g., writing to a file or sending network requests).


Key Scenarios Where async move is Essential

Capturing Variables:

If you want to capture variables by value into the closure, move is required.

use futures::stream::{self, StreamExt};

#[tokio::main]
async fn main() {
    let multiplier = 2;

    let stream = stream::iter(1..=5).then(move |item| async move {
        item * multiplier // 'multiplier' is moved into the block
    });

    stream.for_each(|item| async move {
        println!("Processed: {}", item);
    }).await;
}
/*
Processed: 2
Processed: 4
Processed: 6
Processed: 8
Processed: 10
*/

Async Operations Inside Closures:

If a combinator needs to call an async function (e.g., a database query or API call), async is necessary.

use futures::stream::{self, StreamExt};
use tokio::time::{sleep, Duration};

async fn async_operation(num: u32) -> u32 {
    sleep(Duration::from_secs(1)).await; // Simulate I/O
    num * 10
}

#[tokio::main]
async fn main() {
    let stream = stream::iter(1..=5).then(|item| async move {
        async_operation(item).await
    });

    stream.for_each(|result| async move {
        println!("Result: {}", result);
    }).await;
}
        

What Happens Without async move?

Without async, you can't use await:

let stream = stream::iter(1..=5).then(|item| {
    // ERROR: Cannot use `.await` here without `async`
    let result = async_operation(item).await;
    result
});        

Without move, variables might not live long enough, causing lifetime issues:

let multiplier = 2;
let stream = stream::iter(1..=5).then(|item| async {
    // ERROR: the async block only borrows `item` and `multiplier`, which may not live long enough
    item * multiplier
});        


In summary, the async move block:

  1. Allows asynchronous operations to be used in stream combinators.
  2. Gives each future ownership of the values it captures, so it does not borrow from the enclosing scope.
  3. Makes it convenient to chain complex asynchronous tasks in a clean and readable manner.

3. Creating a Custom Stream

You can create your own custom stream by implementing the Stream trait directly, but it's usually easier to build one with a helper such as stream::unfold from the futures crate. Here's an example of a custom stream that produces even numbers (note that it never ends on its own):

use futures::stream::{self, Stream, StreamExt};
use tokio::time::{sleep, Duration};

fn even_numbers() -> impl Stream<Item = u32> {
    stream::unfold(0, |mut state| async move {
        state += 2;
        sleep(Duration::from_millis(500)).await; // Delay to simulate async operation
        Some((state, state)) // Return the next number and update state
    })
}

#[tokio::main]
async fn main() {
    let even_stream = even_numbers();

    even_stream.for_each(|num| async move {
        println!("Even number: {}", num);
    }).await;
}
/*
     Running `target\debug\async_streams.exe`
Even number: 2
Even number: 4
Even number: 6
Even number: 8
:
:
:
*/        

  • stream::unfold: This function allows you to create a stream by starting from an initial state (0 in this case) and applying a closure repeatedly to produce items and update the state.
  • async move block: As the stream yields each even number, it waits for 500 ms to simulate an asynchronous process.
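
For comparison, the sketch below shows roughly what writing a stream by hand involves. To stay dependency-free it declares a local trait, PollStream, that mirrors the shape of poll_next on futures::Stream; with the futures crate you would implement futures::Stream itself and consume it with next().await. The EvenNumbers type, its fields, and the collect_all helper are illustrative names, and this stream is always ready, whereas an I/O-backed stream would sometimes return Poll::Pending.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Local stand-in with the same method shape as `futures::Stream`.
trait PollStream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

// Yields 2, 4, 6, ... while the next value does not exceed `limit`.
struct EvenNumbers {
    current: u32,
    limit: u32,
}

impl PollStream for EvenNumbers {
    type Item = u32;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if self.current + 2 > self.limit {
            return Poll::Ready(None); // `None` signals the end of the stream
        }
        self.current += 2;
        Poll::Ready(Some(self.current))
    }
}

// Waker that does nothing; sufficient because this stream is never pending.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Drives a stream to completion by polling it directly.
fn collect_all<S: PollStream + Unpin>(mut stream: S) -> Vec<S::Item> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut items = Vec::new();
    loop {
        match Pin::new(&mut stream).poll_next(&mut cx) {
            Poll::Ready(Some(item)) => items.push(item),
            Poll::Ready(None) => return items,
            // A real executor would park here until the waker is triggered.
            Poll::Pending => std::thread::yield_now(),
        }
    }
}

fn main() {
    let evens = collect_all(EvenNumbers { current: 0, limit: 8 });
    println!("{:?}", evens);
}
```

Compared with this, stream::unfold keeps the state handling and the polling logic out of sight, which is why it is usually the more convenient choice.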


4. Combining Streams

Streams can be combined using the stream::select function, which interleaves the values from multiple streams into one stream.

use futures::stream::{self, StreamExt};
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let stream1 = stream::iter(vec![1, 2, 3]).then(|num| async move {
        sleep(Duration::from_secs(1)).await;
        num
    });
    
    let stream2 = stream::iter(vec![4, 5, 6]).then(|num| async move {
        sleep(Duration::from_millis(700)).await;
        num
    });

    let combined_stream = stream::select(stream1, stream2);

    combined_stream.for_each(|num| async move {
        println!("Combined stream value: {}", num);
    }).await;
}
/*
     Running `target\debug\async_streams.exe`
Combined stream value: 4
Combined stream value: 1
Combined stream value: 5
Combined stream value: 2
Combined stream value: 6
Combined stream value: 3
*/        

  • stream::select combines two streams into one by interleaving their values.
  • Delays are different between stream1 and stream2, so values are printed as they are ready.


5. Collecting Stream Data

You can collect a stream’s output into a Vec or another collection.

use futures::stream;
use futures::StreamExt;

#[tokio::main]
async fn main() {
    let stream = stream::iter(1..=5);
    
    let collected: Vec<i32> = stream.collect().await;
    
    println!("Collected values: {:?}", collected);
}
/*
     Running `target\debug\async_streams.exe`
Collected values: [1, 2, 3, 4, 5]
*/        

Here, the stream is collected into a Vec, and all values are printed at once after the stream completes.


Key Stream APIs

The following is a list of key APIs commonly used with streams in Rust when working with the futures crate, together with their usage. These APIs, mostly available through the StreamExt trait, help with creating, transforming, filtering, and controlling streams.

1. Creating Streams

stream::iter(iterable)

  • Converts an iterable (like a vector or range) into a stream.
  • Example: stream::iter(vec![1, 2, 3, 4]) creates a stream that yields 1, 2, 3, 4.


stream::unfold(initial_state, async_closure)

  • Generates a stream by repeatedly applying an async closure to the current state. The closure returns Some((item, new_state)) to yield an item, or None to end the stream.
  • Example: stream::unfold(0, |state| async move { Some((state, state + 1)) }) yields a stream of incrementing numbers.


stream::repeat(item)

  • Creates an endless stream that repeats the given item.
  • Example: stream::repeat(108) yields an endless stream of 108.

2. Basic Stream Transformation and Control

next().await

  • Retrieves the next item in the stream asynchronously.
  • Usage: Used within async functions to await each item in a stream.


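map(closure)

  • Applies a synchronous transformation to each item in the stream. If the closure returns a future, map yields that future unawaited; use then when the transformation itself is asynchronous.
  • Example: stream.map(|x| x * 2) doubles each item.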


for_each(async_closure)

  • Applies an async function to each item, typically used when you want to perform actions on each item without needing to modify the stream’s output.
  • Example: stream.for_each(|x| async move { println!("{}", x) }).


for_each_concurrent(limit, async_closure)

  • Similar to for_each, but runs the futures for up to limit items concurrently on the current task (passing None removes the limit).
  • Example: stream.for_each_concurrent(3, |x| async move { process(x).await }) allows up to 3 items to be in progress at once, where process stands for any async function.


3. Filtering and Conditional Processing

filter(async_closure): Process items based on a condition.

  • Keeps only the items that satisfy a condition in the async closure.
  • Example: stream.filter(|&x| async move { x % 2 == 0 }) filters out odd numbers.


filter_map(async_closure): Filter and transform each item in one step.

  • Applies an async function that returns an Option<T>; if Some(item) is returned, the item is kept; otherwise, it’s discarded.
  • Example: stream.filter_map(|x| async move { if x > 5 { Some(x) } else { None } }).


take(n): Control how many items are taken from a stream.

  • Limits the stream to the first n items.
  • Example: stream.take(3) yields only the first 3 items.


skip(n): Control how many items are skipped from a stream.

  • Skips the first n items in the stream.
  • Example: stream.skip(2) skips the first 2 items and then continues.

use futures::{stream, StreamExt};

#[tokio::main]
async fn main() {
    let stream = stream::iter(1..=5);

    stream.skip(2).for_each(|item| async move {
        println!("Item: {}", item);
    }).await;
}
/*
Item: 3
Item: 4
Item: 5
*/        

How to filter a stream to include only even numbers and buffer the items:

use futures::stream::{self, StreamExt};
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let numbers = stream::iter(1..=10)
        .filter(|&num| async move { num % 2 == 0 }) // Keep only even numbers
        .map(|num| async move {
            sleep(Duration::from_millis(500)).await; // Simulate async processing
            num * 2 // Transform each number
        })
        .buffered(2); // Process up to 2 items concurrently

    numbers.for_each(|num| async move {
        println!("Processed number: {}", num);
    }).await;
}
/*
Processed number: 4
Processed number: 8
Processed number: 12
Processed number: 16
Processed number: 20
*/        

  • buffered(2): Drives up to two of the futures produced by map concurrently while yielding results in the original order, improving throughput.
  • filter and map: filter applies an async predicate to each item, and map turns each remaining item into a future that buffered then runs.


4. Stream Collection and Aggregation

collect::<T>().await

  • Collects all items in the stream into a collection, such as a Vec.
  • Example: stream.collect::<Vec<_>>().await collects items into a vector.

use futures::{stream, StreamExt};

#[tokio::main]
async fn main() {
    let stream = stream::iter(1..=5);

    let vec: Vec<_> = stream.collect().await;

    println!("Collected: {:?}", vec);
}
/*
Collected: [1, 2, 3, 4, 5]
*/        


fold(initial, async_closure)

  • Accumulates items in the stream by applying an async function to an initial value.
  • Example: stream.fold(0, |sum, x| async move { sum + x }) sums all items in the stream.

use futures::{stream, StreamExt};

#[tokio::main]
async fn main() {
    let stream = stream::iter(1..=5);

    let sum = stream.fold(0, |acc, item| async move { acc + item }).await;

    println!("Sum: {}", sum);
}
/*
Sum: 15
*/        

try_fold: Accumulates items into a result, halting on errors.

  • Similar to fold, but for streams whose items are Result<T, E>. The closure receives the unwrapped Ok value and returns a Result; folding stops at the first error.
  • Example: stream.try_fold(0, |sum, x| async move { Ok(sum + x) }).

use futures::{stream, TryStreamExt};

#[tokio::main]
async fn main() {
    let stream = stream::iter(vec![Ok(1), Ok(2), Err("Error"), Ok(3)]);

    let result: Result<i32, &str> = stream.try_fold(0, |acc, val| async move {
        Ok(acc + val)
    }).await;

    println!("Final result: {:?}", result);
}
/*
Final result: Err("Error")
*/        


5. Concurrency and Buffering

buffered(limit)

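  • Takes a stream of futures and runs up to limit of them concurrently, yielding results in the original stream order. Its counterpart buffer_unordered yields each result as soon as it completes, regardless of order.
  • Example: stream.buffered(5) allows up to 5 futures to be in progress at once while keeping their results in order.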


7. Combining and Merging Streams

chain(other_stream)

  • Appends another stream to the current stream, creating a single stream.
  • Example: stream.chain(other_stream) yields all items from stream followed by other_stream.

use futures::{stream, StreamExt};

#[tokio::main]
async fn main() {
    let stream1 = stream::iter(vec![1, 2, 3]);
    let stream2 = stream::iter(vec![4, 5, 6]);

    let combined_stream = stream1.chain(stream2);

    combined_stream.for_each(|item| async move {
        println!("Item: {}", item);
    }).await;
}
/*
Item: 1
Item: 2
Item: 3
Item: 4
Item: 5
Item: 6
*/        

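select(stream1, stream2)

  • Interleaves two streams, yielding items as they’re ready from either stream. In the futures crate this is the free function stream::select rather than a method on StreamExt.
  • Example: stream::select(stream1, stream2) yields items from both streams, as available.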


9. Advanced Usage in Blockchain Context

  • Transaction Pool Filtering: Use filter and try_filter to filter and validate transactions in the pool before they are added to blocks.
  • Event Monitoring and Logging: Use collect to gather a list of events for logging purposes, or use for_each to process each event in real-time.
  • Peer-to-Peer Block Propagation: Use for_each_concurrent or buffer_unordered to broadcast blocks to peers, allowing multiple peers to be processed concurrently.


Blockchain Usage:

1. Event Streams for Monitoring Blockchain State

A blockchain may need to broadcast real-time events, such as when a new block is mined or a transaction is confirmed. Event listeners can respond to these events as they arrive, providing an efficient way to monitor blockchain state changes. The example below uses tokio's watch channel and awaits changes in a loop; the receiver is not itself a Stream, but the pattern of awaiting one update at a time is the same.

use tokio::sync::watch;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let (tx, rx) = watch::channel("No new blocks yet");

    // Task to send new blocks
    tokio::spawn(async move {
        let new_blocks = vec!["Block1", "Block2", "Block3"];
        for block in new_blocks {
            sleep(Duration::from_secs(1)).await; // Simulate time between new blocks
            tx.send(block).unwrap();
        }
    });

    // Task to receive and process blocks
    let mut rx = rx; // Use mutable receiver to track changes
    loop {
        if rx.changed().await.is_ok() {
            let block = *rx.borrow(); // Access the new value
            println!("New block received: {}", block);
        } else {
            break; // Exit loop if sender is dropped
        }
    }
}

/*
New block received: Block1
New block received: Block2
New block received: Block3
*/        

watch::channel:

  • This is a channel that retains only the most recent value. Receivers can always read the latest value, and a slow receiver may skip intermediate updates, which makes it a lightweight way to publish state changes such as new blocks.


rx.changed().await:

  • Waits for the value to change.
  • Returns Ok(()) if the sender (tx) updates the value, or an error if the sender is dropped.


rx.borrow(): Retrieves the current value of the watch::Receiver.

Loop Exit Condition:

  • The loop breaks if rx.changed().await returns an error, indicating that the sender has been dropped.


2. Block Propagation Across Peers

In decentralized networks, a node needs to broadcast new blocks to peers as they are mined or received. With streams, you can handle peer-to-peer communication efficiently by broadcasting blocks asynchronously to peers.

For example, suppose a node has a new block to send to each of its peers. Using a stream of peers with for_each_concurrent lets the sends overlap, so several peers are served concurrently:

use futures::stream::{self, StreamExt};
use tokio::time::{sleep, Duration};

async fn propagate_block_to_peer(peer_id: u32, block: &str) {
    println!("Propagating block {} to peer {}", block, peer_id);
    sleep(Duration::from_secs(1)).await; // Simulate network delay
    println!("Finished propagating to peer {}", peer_id);
}

#[tokio::main]
async fn main() {
    let peers = vec![1, 2, 3];
    let block = "Block12345";

    let propagation_stream = stream::iter(peers)
        .for_each_concurrent(2, |peer| propagate_block_to_peer(peer, block));

    propagation_stream.await;
}

/*
Propagating block Block12345 to peer 1
Propagating block Block12345 to peer 2
Finished propagating to peer 1
Finished propagating to peer 2
Propagating block Block12345 to peer 3
Finished propagating to peer 3
*/        

  • for_each_concurrent(2, ...): This function limits concurrency, broadcasting the block to up to two peers at a time.
  • Network propagation: This stream-based approach enables nodes to asynchronously propagate blocks while avoiding bottlenecks, which is crucial in a large blockchain network.


3. Transaction Pools and Real-Time Validation

In a blockchain node, a transaction pool is a collection of unconfirmed transactions waiting to be added to a block. Each new transaction needs to be validated asynchronously to avoid blocking the node’s main tasks.

Streams help efficiently manage and process this transaction pool. For example, as new transactions are added to the pool, you can use a stream to validate and prioritize them in real time:


use futures::stream::{self, StreamExt};
use tokio::time::sleep;
use std::time::Duration;

#[derive(Debug)]
struct Transaction {
    id: u32,
    valid: bool,
}

// Mock validation function
async fn validate_transaction(tx: &Transaction) -> bool {
    sleep(Duration::from_millis(100)).await;
    tx.valid
}

#[tokio::main]
async fn main() {
    let transactions = vec![
        Transaction { id: 1, valid: true },
        Transaction { id: 2, valid: false },
        Transaction { id: 3, valid: true },
    ];

    let transaction_stream = stream::iter(transactions)
        .then(|tx| async move {
            let is_valid = validate_transaction(&tx).await;
            if is_valid {
                Some(tx) // Keep the transaction if valid
            } else {
                None // Discard invalid transactions
            }
        })
        .filter_map(|tx| async move { tx }); // Filter out None values

    transaction_stream
        .for_each(|tx| async move {
            println!("Valid transaction: {:?}", tx);
        })
        .await;
}
/*
Valid transaction: Transaction { id: 1, valid: true }
Valid transaction: Transaction { id: 3, valid: true }
*/        

filter_map:

  • filter_map is used to apply an asynchronous transformation and filter at the same time.
  • It expects a closure that returns Option<T>, where Some(value) keeps the item, and None discards it.


Inside then:

  • Instead of returning (tx, is_valid) and filtering later, then directly maps valid transactions to Some(tx) and invalid transactions to None.

Using filter_map to process results:

  • The filter_map combinator removes None values, leaving only valid transactions.


Thank you for reading. Please leave a comment if you have any questions or feedback.
