Channels in Rust
Amit Nadiger
Polyglot(Rust??, C++ 11,14,17,20, C, Kotlin, Java) Android TV, Cas, Blockchain, Polkadot, UTXO, Substrate, Wasm, Proxy-wasm,AndroidTV, Dvb, STB, Linux, Engineering management.
Concurrency and communication between threads are essential aspects of many concurrent applications. Rust provides robust mechanisms for inter-thread communication (IPC) to facilitate safe and efficient communication between threads. One such mechanism is the Channel, which is available in the std::sync::mpsc module.
Channels provide a message-passing communication mechanism between threads. They allow threads to send and receive values across a synchronized channel. The mpsc acronym stands for "multiple producer, single consumer," meaning that multiple threads can produce or send values into the channel, but only a single thread can consume or receive those values.
Key points about channels in Rust:
Creating a Channel
To create a channel, we use the channel function provided by the std::sync::mpsc module. This function returns a pair of objects: a Sender and a Receiver. The Sender can be used to send values into the channel, while the Receiver can be used to receive values from the channel.
Let's see an example of creating a channel:
use std::sync::mpsc;
fn main() {
// Create a channel
let (sender, receiver) = mpsc::channel();
}
Sending and Receiving Values
Once we have a Sender and Receiver, we can use them to send and receive values, respectively. The Sender provides the send method, which is used to send a value into the channel. The Receiver provides the recv method, which is used to receive a value from the channel.
Here's an example of sending and receiving values through a channel:
use std::sync::mpsc;
use std::thread;
fn main() {
? ? let (sender, receiver) = mpsc::channel();
? ? // Spawn a thread to send values
? ? let handle = thread::spawn(move || {
? ? ? ? sender.send(21).unwrap();
? ? ? ? sender.send(99).unwrap();
? ? ? ? sender.send(123).unwrap();
? ? });
? ? // Receive values in the main thread
while let Ok(msg) = receiver.recv() {
println!("Received: {:?}", msg);
}
? ? let res = handle.join();
? ? match res {
Ok(val) => println!("Success in joining thread "),
Err(_msg) => {
println!("Eroor in joining thread");
},
}
}
/*
amit@DESKTOP-9LTOFUP:~/OmPracticeRust/DS/Threads$ ./asyncThread
Received: 21
Received: 99
Received: 123
Success in joining thread
*/
In this example, we spawn a new thread that sends three values (21, 99, and 123) into the channel using the send method. In the main thread, we receive the values from the channel using the recv method. Note that the recv method blocks until a value is available in the channel.
Error Handling
When sending values into a channel, the send method returns a Result indicating whether the value was successfully sent or if an error occurred. Similarly, when receiving values from a channel, the recv method returns a Result indicating whether a value was received or if an error occurred.
It's important to handle these Result values properly to handle potential errors during channel communication. Additionally, if all senders are dropped, the recv method will return Err to signal that the channel has been closed.
Closing Channels
In Rust's std::sync::mpsc (multiple producer, single consumer) channels, we need to close the channels by dropping the sender side of the channel. Dropping the sender indicates to the receiver that no more messages will be sent, allowing the receiver to gracefully exit its loop or handle the termination signal accordingly. Failing to do so can lead to the receiver waiting indefinitely for messages that will never arrive.
When a sender is dropped, it sends an internal termination signal to the receiver, notifying it that the channel has been closed and no further messages will be sent. The receiver can then gracefully exit its loop or handle the termination signal accordingly.
Here's an example to illustrate the importance of dropping the sender:
/*
* Demonstrate the dorpping the sender to signal the receiver that no
* message will be sent
*/
use std::sync::mpsc;
use std::thread;
fn main() {
? ? let (sender, receiver) = mpsc::channel();
? ? // Spawn a thread to receive messages
? ? let receiver_thread = thread::spawn(move || {
? ? ? ? loop {
? ? ? ? ? ? match receiver.recv() {
? ? ? ? ? ? ? ? Ok(message) => println!("Received: {}", message),
? ? ? ? ? ? ? ? Err(_) => {
? ? ? ? ? ? ? ? ? ? println!("Sender dropped, exiting receiver loop");
? ? ? ? ? ? ? ? ? ? break;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? }
? ? });
? ? // Send some messages
? ? sender.send("Jai Shree Ram").unwrap();
? ? sender.send("Jai Bajarang Bali").unwrap();
? ? // It's important to drop the sender to signal the end of messages
? ? drop(sender);
? ? // Wait for the receiver thread to finish
? ? receiver_thread.join().unwrap();
}
/*
amit@DESKTOP-9LTOFUP:~/OmPracticeRust/DS/Threads$ ./AsyncThreadDroppingChannel
Received: Jai Shree Ram
Received: Jai Bajarang Bali
Sender dropped, exiting receiver loop
*/
In the example above, the drop(sender) statement is used to explicitly drop the sender side of the channel, indicating to the receiver that no more messages will be sent. When the receiver detects the termination signal by receiving an Err value from receiver.recv(), it exits the loop and the program can continue or terminate gracefully.
By properly dropping the sender, you ensure that both the sender and receiver sides of the channel are in sync and can correctly handle the end of communication.
Type of channels in Rust
In Rust, there are two main types of channels available for inter-thread communication: synchronous channels and asynchronous channels. These channels differ in their behavior and usage patterns.
Synchronous Channels (std::sync::mpsc)
Synchronous channels, also known as bounded channels, provide blocking operations for sending and receiving values. These channels have a fixed capacity, and when the channel is full, the sender will block until there is space available for the value to be sent. Similarly, when the channel is empty, the receiver will block until a value is available to be received.
Synchronous channels are created using the std::sync::mpsc module and the sync_channel function. Here's an example of creating a synchronous channel:
use std::sync::mpsc;
fn main() {
let (sender, receiver) = mpsc::sync_channel(5);
}
In this example, sync_channel creates a synchronous channel with a capacity of 5. Both the sender and receiver can block when the channel is full or empty, respectively.
Complete example:
We create a synchronous channel using mpsc::sync_channel. The channel has a capacity of 1, meaning it can hold only one message at a time. We spawn a new thread to send a message through the channel using the send method. On the receiving side, we use the recv method to block until a message is received from the channel.
use std::sync::mpsc;
use std::thread;
fn main() {
? ? // Create a synchronous channel with a capacity of 1
? ? let (tx, rx) = mpsc::sync_channel(1);
? ? // Spawn a thread to send a message
? ? thread::spawn(move || {
? ? ? ? let message = String::from("Hello from the sender!");
? ? ? ? tx.send(message).expect("Failed to send message");
? ? ? ? println!("Sender: Message sent");
? ? });
? ? // Receive the message from the channel
? ? let received = rx.recv().expect("Failed to receive message");
? ? println!("Receiver: Received message: {}", received);
}
/*
Op =>
amit@DESKTOP-9LTOFUP:~/OmPracticeRust/Threads$ ./syncChannel
Sender: Message sent
Receiver: Received message: Hello from the sender!
*/
Advantages:
Disadvantages:
Suitable Scenarios:
Difference between Sync and Async channels:
1. Synchronous Channels (sync_channel):
2. Asynchronous Channels (channel()):
Here's an example to illustrate the difference between synchronous and asynchronous channels:
/*
Purpose of this program is demonstrate the
difference between sync and async channels.
Espically w.r.t blocking natuture of sync channels
and non-blocking nature of async channels.
*/
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
? ? // Synchronous channel with capacity 1
? ? let (sync_sender, sync_receiver) = mpsc::sync_channel(1);
? ? // Asynchronous channel
? ? let (async_sender, async_receiver) = mpsc::channel();
? ? // Spawn a thread for synchronous communication
? ? let sync_thread = thread::spawn(move || {
? ? ? ? sync_sender.send("SyncMsg1 from synchronous channel").unwrap();
? ? ? ? println!("Synchronous message1 sent");
? ? ? ? sync_sender.send("SyncMsg2 from synchronous channel").unwrap();
? ? ? ? println!("Synchronous message2 sent");
sync_sender.send("SyncMsg3 from synchronous channel").unwrap();
? ? ? ? println!("Synchronous message3 sent");
sync_sender.send("SyncMsg4 from synchronous channel").unwrap();
? ? ? ? println!("Synchronous message4 sent");
sync_sender.send("SyncMsg5 from synchronous channel").unwrap();
? ? ? ? println!("Synchronous message5 sent");
? ? });
? ? // Spawn a thread for asynchronous communication
? ? let async_thread = thread::spawn(move || {
? ? ? ? async_sender.send("ASyncMsg1 from asynchronous channel").unwrap();
? ? ? ? println!("Asynchronous message1 sent");
? ? ? ? async_sender.send("ASyncMsg2 from asynchronous channel").unwrap();
? ? ? ? println!("Asynchronous message2 sent");
async_sender.send("ASyncMsg3 from asynchronous channel").unwrap();
? ? ? ? println!("Asynchronous message3 sent");
async_sender.send("ASyncMsg4from asynchronous channel").unwrap();
? ? ? ? println!("Asynchronous message4 sent");
async_sender.send("ASyncMsg5 from asynchronous channel").unwrap();
? ? ? ? println!("Asynchronous message5 sent");
? ? });
? ? // Receive messages from synchronous channel
? ? for message in sync_receiver {
thread::sleep(Duration::from_secs(1));
? ? ? ? println!("Received from synchronous channel: {}", message);
? ? }
// Receive messages from asynchronous channel
? ? for message in async_receiver {
thread::sleep(Duration::from_secs(1));
? ? ? ? println!("Received from asynchronous channel: {}", message);
? ? }
? ? // Wait for the threads to finish
? ? sync_thread.join().unwrap();
? ? async_thread.join().unwrap();
}
/*
amit@DESKTOP-9LTOFUP:~/OmPracticeRust/DS/Threads$ ./DiffBwSyncAndAsyncChannel
Synchronous message1 sent
Synchronous message2 sent
Asynchronous message1 sent
Asynchronous message2 sent
Asynchronous message3 sent
Asynchronous message4 sent
Asynchronous message5 sent
Received from synchronous channel: SyncMsg1 from synchronous channel
Synchronous message3 sent
Received from synchronous channel: SyncMsg2 from synchronous channel
Synchronous message4 sent
Received from synchronous channel: SyncMsg3 from synchronous channel
Synchronous message5 sent
Received from synchronous channel: SyncMsg4 from synchronous channel
Received from synchronous channel: SyncMsg5 from synchronous channel
Received from asynchronous channel: ASyncMsg1 from asynchronous channel
Received from asynchronous channel: ASyncMsg2 from asynchronous channel
Received from asynchronous channel: ASyncMsg3 from asynchronous channel
Received from asynchronous channel: ASyncMsg4from asynchronous channel
Received from asynchronous channel: ASyncMsg5 from asynchronous channel
*/
Please see the o/p of above program.
Synchronous messages are waiting for message to be delivered, since channel capacity is only 1, then sender is waiting for receiver to consume.
Since receiver is receiving the message in 1 sec delay, sending is also sending the message with 1 sec delay in sync messages .
Whereas in Async message, you can see sending is not waiting for receiver to consume the message, it just sent all the message one by one.
领英推荐
And receiver consume the message with 1 sec delay.
Bidirectional communication between 2 threads using channels:
/*
Bi directional communication using 2 seperate sync channel.
The communication between thread 1 and 2 are synced .
1st thread1 sends msg
Then thread 2 receives
Then thread2 send message to thread1
Then thread 1 receives the message from Thread2
*/
use std::sync::mpsc;
use std::thread;
fn main() {
? ? let (tx1, rx1) = mpsc::sync_channel(1); // Channel 1 for communication
// from sender1 to receiver1
? ? let (tx2, rx2) = mpsc::sync_channel(1); // Channel 2 for communication
// from sender2 to receiver2
? ? // Spawn two threads for the senders
? ? let handle1 = thread::spawn(move || {
let s = String::from("Hello(Jai ShreeRam)");
? ? ? ? tx1.send(s.clone()).unwrap();
println!("Thread 1 sent the message to thrd 2 => {}: ",s);
? ? ? ? let received:String = rx2.recv().unwrap();
? ? ? ? println!("Thread 1 received: => {}", received);
? ? });
? ? let handle2 = thread::spawn(move || {
let s = String::from("Hello(Jai Bajrang bali)");
? ? ? ? let received = rx1.recv().unwrap();
//let received: String = rx1.recv().unwrap();
? ? ? ? println!("Thread 2 received: => {}", received);
//tx1.send("Message from sender1, Hello(Jai ShreeRam? ?)".to_String()).unwrap();
tx2.send(s.clone()).unwrap();
println!("Thread 2 sent the message to thrd 1 =>{}: ",s);
? ? });
? ? // Wait for the sender threads to finish
? ? handle1.join().unwrap();
? ? handle2.join().unwrap();
}
/*
Op=>
amit@DESKTOP-9LTOFUP:~/OmPracticeRust/DS/Threads$ ./BiDirectionComm_SyncChnl
Thread 1 sent the message to thrd 2 => Hello(Jai ShreeRam):
Thread 2 received: => Hello(Jai ShreeRam)
Thread 2 sent the message to thrd 1 =>Hello(Jai Bajrang bali):
Thread 1 received: => Hello(Jai Bajrang bali)
*/*
In the above code , using 2 sync channels with capacity 1 , we did 1 round trip from thread1 -> thread 2 -> thread 1.
There are different ways to create the async messages in Rust, please see below.
Asynchronous Channels (sync::mpsc)
Asynchronous channels can be created in 2 ways:
1. Asynchronous Channels Using sync::mpsc
Asynchronous channels, also known as unbounded channels, provide non-blocking operations for sending and receiving values. These channels have an unlimited capacity, meaning that senders can always send values without blocking, and receivers can always receive values without blocking. If the receiver is not ready to receive a value, it can choose to await or poll the channel later.
/*
Demonstrate the multiple producer and snigle sender.
cloning the sender?
droping the unmoved sender to indicate the receiver no more message will be sent.
*/
use std::sync::mpsc;
use std::thread;
fn main() {
? ? let (tx, rx) = mpsc::channel(); // Unbounded channel
? ? let tx1 = tx.clone();
? ? let handle1 = thread::spawn(move || {
? ? ? ? println!("Hello from thread1");
? ? ? ? let message: String = String::from("Jai-Thread1");
? ? ? ? tx1.send(message).unwrap();
// At the end of this block , tx1 is automatically dropped?
// since tx1 is shared with this thread via move.?
? ? });
? ? let tx2 = tx.clone();
? ? let handle2 = thread::spawn(move || {
? ? ? ? println!("Hello from thread2");
? ? ? ? let message: String = String::from("Shree-Thread2");
? ? ? ? tx2.send(message).unwrap();
? ? ? ? // At the end of this block , tx2 is automatically dropped?
// since tx2 is shared with this thread via move.?
? ? });
? ? let tx3 = tx.clone();
? ? let handle3 = thread::spawn(move || {
? ? ? ? println!("Hello from thread3");
? ? ? ? let message: String = String::from("Ram-Thread3");
? ? ? ? tx3.send(message).unwrap();
? ? ? ? // At the end of this block , tx3 is automatically dropped?
// since tx3 is shared with this thread via move.?
? ? });
? ? println!("Om from-Thread1");
// We need to drop tx as it is still in scope.
// if we don't drop tx the below while loop?
// looking for message will be waiting indefinitely to
// look for messages from tx
? ? drop(tx); // Drop the sender to close the channel
? ? while let Ok(message) = rx.recv() {
? ? ? ? println!("Received from Thread: {}", message);
? ? }
? ? handle1.join().unwrap();
? ? handle2.join().unwrap();
? ? handle3.join().unwrap();
}?
/*
amit@DESKTOP-9LTOFUP:~/OmPracticeRust/DS/Threads$ ./AsyncThreadWithMultipleProdeucer
Hello from thread1
Hello from thread2
Om from-Thread1
Hello from thread3
Received from Thread: Jai-Thread1
Received from Thread: Shree-Thread2
Received from Thread: Ram-Thread3
/*
/*
amit@DESKTOP-9LTOFUP:~/OmPracticeRust/DS/Threads$ ./AsyncThreadWithMultipleProdeucer
Hello from thread1
Hello from thread2
Om from-Thread1
Received from Thread: Jai-Thread1
Received from Thread: Shree-Thread2
Hello from thread3
Received from Thread: Ram-Thread3
*/
Advantages:
Disadvantages:
Suitable Scenarios:
2. Asynchronous channel using tokio:
Asynchronous channels are commonly used in asynchronous and concurrent programming scenarios and are provided by libraries like tokio through the tokio::sync::mpsc module. Here's an example of creating an asynchronous channel using tokio:
use tokio::sync::mpsc;
use tokio::task;
#[tokio::main]
async fn main() {
? ? // Create an asynchronous channel with an unbounded capacity
? ? let (mut tx, mut rx) = mpsc::channel(10);
? ? // Spawn a task to send a message
? ? let send_task = task::spawn(async move {
? ? ? ? let message = String::from("Hello from the sender!");
? ? ? ? tx.send(message).await.expect("Failed to send message");
? ? ? ? println!("Sender: Message sent");
? ? });
? ? // Receive the message from the channel
? ? let received = rx.recv().await.expect("Failed to receive message");
? ? println!("Receiver: Received message: {}", received);
? ? // Wait for the send task to complete
? ? send_task.await.expect("Failed to join send task");
}
/*
warning: `tokio_chnnl_demo` (bin "tokio_chnnl_demo") generated 2 warnings
? ? Finished dev [unoptimized + debuginfo] target(s) in 7.52s
? ? ?Running `target/debug/tokio_chnnl_demo`
Sender: Message sent
Receiver: Received message: Hello from the sender!
amit@DESKTOP-9LTOFUP:~/OmPracticeRust/tokio_chnnl_demo$
*/
In the above example, we use the Tokio runtime and its asynchronous channel implementation (tokio::sync::mpsc::channel). The channel has an unbounded capacity, allowing multiple messages to be sent and received. We spawn a new asynchronous task to send a message using the send method. On the receiving side, we use the recv method with await to asynchronously wait for a message from the channel.
Note that in the async example, we use the #[tokio::main] attribute to create a Tokio runtime and execute the main function in an asynchronous context.
It's important to note that the tokio library is commonly used for asynchronous programming in Rust, and its asynchronous channels are specifically designed to work well with asynchronous execution models.
Will try to write a separate article on tokio lib .
Why tokio channels are preferred for async channels?
The main advantage of using channels provided by the Tokio library (tokio::sync::mpsc) compared to the channels in std::sync::mpsc is that Tokio channels are designed specifically for asynchronous programming and integrate well with the Tokio runtime.
Here are some key advantages and differences of Tokio channels:
What is tokio runtime?
Tokio runtime can be thought of as a scheduler or task executor. It is responsible for managing the execution of asynchronous tasks in a Rust application and provides the necessary infrastructure for efficient and concurrent programming.
The Tokio runtime utilizes an event-driven architecture to handle asynchronous tasks. It includes an event loop, sometimes referred to as a reactor, that listens for events and dispatches tasks for execution based on those events. These events can include I/O operations completing, timers expiring, or other types of notifications.
When a task is executed within the Tokio runtime, it runs in a non-blocking manner, allowing other tasks to make progress concurrently. This concurrency is achieved through the use of asynchronous programming techniques, such as futures and async/await syntax, which allow tasks to suspend and resume execution without blocking threads.
The Tokio runtime manages task scheduling and coordination, ensuring that tasks are executed in an efficient and orderly manner. It handles task prioritization, event handling, and resource management, allowing developers to focus on writing asynchronous code without worrying about low-level details.
By abstracting away the complexities of managing threads and I/O operations, the Tokio runtime simplifies the process of building highly concurrent applications. It provides a unified and efficient way to handle I/O-bound operations and enables applications to efficiently utilize system resources.
Bidirectional communication using channels
In Rust, channels are typically unidirectional, meaning they allow communication in one direction (from sender to receiver). However, you can achieve bi-directional communication using two separate channels: one for each direction of communication.
Here's an example of how to implement bi-directional communication using two channels:
use std::sync::mpsc;
use std::thread;
fn main() {
? ? let (tx1, rx1) = mpsc::channel(); // Channel 1 for communication
// from sender1 to receiver1
? ? let (tx2, rx2) = mpsc::channel(); // Channel 2 for communication
// from sender2 to receiver2
? ? // Spawn two threads for the senders
? ? let handle1 = thread::spawn(move || {
? ? ? ? tx1.send("Message from thrd1").unwrap();
? ? ? ? let received = rx2.recv().unwrap();
? ? ? ? println!("Thrd1 received: {}", received);
? ? });
? ? let handle2 = thread::spawn(move || {
? ? ? ? tx2.send("Message from thrd22").unwrap();
? ? ? ? let received = rx1.recv().unwrap();
? ? ? ? println!("Thrd2 received: {}", received);
? ? });
? ? // Wait for the sender threads to finish
? ? handle1.join().unwrap();
? ? handle2.join().unwrap();
}
/* Op =>
amit@DESKTOP-9LTOFUP:~/OmPracticeRust/Threads$ ./BiDirectionComm_chnl
Thrd2 received: Message from thrd1
Thrd1 received: Message from thrd2
*/
The two threads run concurrently, allowing bi-directional communication between Thrd1 and Thrd2 through the channels.
Keep in mind that in this example, the communication is synchronized, meaning the threads will block until both sides of the communication are ready. If you need asynchronous communication, We may need to explore other concurrency models or libraries like Tokio.
Choosing the Right Channel
The choice between synchronous and asynchronous channels depends on the requirements of your application. Synchronous channels are suitable for scenarios where you want blocking behavior and need to control the buffer capacity. Asynchronous channels are more appropriate for non-blocking and asynchronous programming scenarios.
Consider factors such as the expected load, concurrency requirements, and the design of your application when choosing between synchronous and asynchronous channels. Both types of channels provide efficient and safe inter-thread communication in Rust, allowing you to build robust concurrent applications.
Conclusion
Channels provide a powerful mechanism for inter-thread communication in Rust. They allow safe and synchronized message passing between threads, enabling coordination and data exchange. By using channels, Rust programs can achieve efficient and concurrent communication while maintaining the ownership and borrowing guarantees provided by the language.
Remember to handle the Result values returned by the send and recv methods appropriately to handle errors and gracefully handle channel closures. Channels are a valuable tool in your concurrency toolbox, allowing you to build robust and scalable concurrent applications in Rust.
Thanks for reading till end . Please comment if you have any !
Chief Founding Engineer | Director of Engineering. Passionate About Blockchain & Data Science ??. Expert in ETH, Solana, Cosmos, Polkadot | JS, Python, Rust, Go. Driving Innovation & Building the Future of Technology ??
1 年Too Informative, Thanks, buddy.