Channels in Rust

Concurrency and communication between threads are essential aspects of many concurrent applications. Rust provides robust mechanisms for inter-thread communication (IPC) to facilitate safe and efficient communication between threads. One such mechanism is the Channel, which is available in the std::sync::mpsc module.


Channels provide a message-passing communication mechanism between threads. They allow threads to send and receive values across a synchronized channel. The mpsc acronym stands for "multiple producer, single consumer," meaning that multiple threads can produce or send values into the channel, but only a single thread can consume or receive those values.

Key points about channels in Rust:

  1. Communication: Channels enable communication between different threads or tasks by sending messages or data from one end (sender) and receiving them at the other end (receiver).
  2. Synchronization: Channels also provide synchronization capabilities, allowing threads or tasks to coordinate their actions by blocking or waiting until messages are available to be received.
  3. Multiple Producers, Single Consumer: Channels in Rust support multiple senders (Sender) but only a single receiver (Receiver). This design ensures safe and consistent message passing.
  4. Ownership and Move Semantics: Messages or data sent through channels are typically owned by the sender and transferred to the receiver using Rust's move semantics. This ensures memory safety and avoids data races.
  5. Blocking and Non-Blocking Operations: Channels support both blocking and non-blocking operations. Senders can either block until the message is sent or use non-blocking methods to attempt sending without blocking.
  6. Error Handling: Sending messages through a channel can result in errors, such as when the receiver has been dropped. Proper error handling is essential to handle such scenarios gracefully.


Creating a Channel

To create a channel, we use the channel function provided by the std::sync::mpsc module. This function returns a pair of objects: a Sender and a Receiver. The Sender can be used to send values into the channel, while the Receiver can be used to receive values from the channel.

Let's see an example of creating a channel:

use std::sync::mpsc; 
fn main() { 
    // Create a channel 
    let (sender, receiver) = mpsc::channel(); 
}         

Sending and Receiving Values

Once we have a Sender and Receiver, we can use them to send and receive values, respectively. The Sender provides the send method, which is used to send a value into the channel. The Receiver provides the recv method, which is used to receive a value from the channel.

Here's an example of sending and receiving values through a channel:

use std::sync::mpsc;
use std::thread;

fn main() {
? ? let (sender, receiver) = mpsc::channel();

? ? // Spawn a thread to send values
? ? let handle = thread::spawn(move || {
? ? ? ? sender.send(21).unwrap();
? ? ? ? sender.send(99).unwrap();
? ? ? ? sender.send(123).unwrap();
? ? });

? ? // Receive values in the main thread
    while let Ok(msg) = receiver.recv() {
        println!("Received: {:?}", msg);
    }

? ? let res = handle.join();
? ? match res {
        Ok(val) => println!("Success in joining thread "),
	Err(_msg) => {
	    println!("Eroor in joining thread");
	},
    }
}
/*
amit@DESKTOP-9LTOFUP:~/OmPracticeRust/DS/Threads$ ./asyncThread
Received: 21
Received: 99
Received: 123
Success in joining thread
*/        

In this example, we spawn a new thread that sends three values (21, 99, and 123) into the channel using the send method. In the main thread, we receive the values from the channel using the recv method. Note that the recv method blocks until a value is available in the channel.

Error Handling

When sending values into a channel, the send method returns a Result indicating whether the value was successfully sent or if an error occurred. Similarly, when receiving values from a channel, the recv method returns a Result indicating whether a value was received or if an error occurred.

It's important to handle these Result values properly to handle potential errors during channel communication. Additionally, if all senders are dropped, the recv method will return Err to signal that the channel has been closed.


Closing Channels

In Rust's std::sync::mpsc (multiple producer, single consumer) channels, we need to close the channels by dropping the sender side of the channel. Dropping the sender indicates to the receiver that no more messages will be sent, allowing the receiver to gracefully exit its loop or handle the termination signal accordingly. Failing to do so can lead to the receiver waiting indefinitely for messages that will never arrive.

When a sender is dropped, it sends an internal termination signal to the receiver, notifying it that the channel has been closed and no further messages will be sent. The receiver can then gracefully exit its loop or handle the termination signal accordingly.

Here's an example to illustrate the importance of dropping the sender:

/*
* Demonstrate the dorpping the sender to signal the receiver that no 
*  message will be sent
*/

use std::sync::mpsc;
use std::thread;

fn main() {
? ? let (sender, receiver) = mpsc::channel();

? ? // Spawn a thread to receive messages
? ? let receiver_thread = thread::spawn(move || {
? ? ? ? loop {
? ? ? ? ? ? match receiver.recv() {
? ? ? ? ? ? ? ? Ok(message) => println!("Received: {}", message),
? ? ? ? ? ? ? ? Err(_) => {
? ? ? ? ? ? ? ? ? ? println!("Sender dropped, exiting receiver loop");
? ? ? ? ? ? ? ? ? ? break;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? }
? ? });

? ? // Send some messages
? ? sender.send("Jai Shree Ram").unwrap();
? ? sender.send("Jai Bajarang Bali").unwrap();

? ? // It's important to drop the sender to signal the end of messages
? ? drop(sender);

? ? // Wait for the receiver thread to finish
? ? receiver_thread.join().unwrap();
}
/*
amit@DESKTOP-9LTOFUP:~/OmPracticeRust/DS/Threads$ ./AsyncThreadDroppingChannel
Received: Jai Shree Ram
Received: Jai Bajarang Bali
Sender dropped, exiting receiver loop
*/        

In the example above, the drop(sender) statement is used to explicitly drop the sender side of the channel, indicating to the receiver that no more messages will be sent. When the receiver detects the termination signal by receiving an Err value from receiver.recv(), it exits the loop and the program can continue or terminate gracefully.

By properly dropping the sender, you ensure that both the sender and receiver sides of the channel are in sync and can correctly handle the end of communication.


Type of channels in Rust

In Rust, there are two main types of channels available for inter-thread communication: synchronous channels and asynchronous channels. These channels differ in their behavior and usage patterns.


Synchronous Channels (std::sync::mpsc)

Synchronous channels, also known as bounded channels, provide blocking operations for sending and receiving values. These channels have a fixed capacity, and when the channel is full, the sender will block until there is space available for the value to be sent. Similarly, when the channel is empty, the receiver will block until a value is available to be received.

Synchronous channels are created using the std::sync::mpsc module and the sync_channel function. Here's an example of creating a synchronous channel:

use std::sync::mpsc;

fn main() { 
    let (sender, receiver) = mpsc::sync_channel(5); 
}         

In this example, sync_channel creates a synchronous channel with a capacity of 5. Both the sender and receiver can block when the channel is full or empty, respectively.

Complete example:

We create a synchronous channel using mpsc::sync_channel. The channel has a capacity of 1, meaning it can hold only one message at a time. We spawn a new thread to send a message through the channel using the send method. On the receiving side, we use the recv method to block until a message is received from the channel.

use std::sync::mpsc;
use std::thread;

fn main() {
? ? // Create a synchronous channel with a capacity of 1
? ? let (tx, rx) = mpsc::sync_channel(1);

? ? // Spawn a thread to send a message
? ? thread::spawn(move || {
? ? ? ? let message = String::from("Hello from the sender!");
? ? ? ? tx.send(message).expect("Failed to send message");
? ? ? ? println!("Sender: Message sent");
? ? });

? ? // Receive the message from the channel
? ? let received = rx.recv().expect("Failed to receive message");
? ? println!("Receiver: Received message: {}", received);
}
/*
Op =>
amit@DESKTOP-9LTOFUP:~/OmPracticeRust/Threads$ ./syncChannel
Sender: Message sent
Receiver: Received message: Hello from the sender!
*/        

Advantages:

  1. Capacity Control: Bounded channels have a fixed capacity, allowing you to control the maximum number of items that can be held in the channel. This helps prevent resource exhaustion and manage backpressure.
  2. Backpressure: When a bounded channel reaches its capacity, it naturally applies backpressure to the sender, forcing it to either block or handle the situation gracefully.
  3. Resource Management: Bounded channels are useful in situations where resources (such as memory or processing time) need to be managed efficiently.

Disadvantages:

  1. Potential Blocking: If the sender tries to send a value to a full bounded channel, it will block until there is space available. This can lead to potential deadlock or waiting if not handled properly.
  2. Limited Capacity: Bounded channels have a fixed capacity, which means they can only hold a certain number of items. If the capacity is not sufficient for the workload, it can lead to dropped or delayed messages.

Suitable Scenarios:

  1. Producer-Consumer Patterns: Bounded channels are commonly used in producer-consumer scenarios, where you want to control the flow of items between producers and consumers, preventing overload or resource exhaustion.
  2. Rate Limiting: If you need to limit the rate at which data is processed or transmitted, a bounded channel can be used to enforce the rate by setting an appropriate capacity.
  3. Resource Pooling: Bounded channels can be used to manage pools of limited resources, ensuring that the number of available resources does not exceed the capacity.

Difference between Sync and Async channels:

1. Synchronous Channels (sync_channel):

  • Synchronous channels provide blocking send and receive operations.
  • When a sender attempts to send a message, it will block until there is a corresponding receiver ready to receive the message.
  • Similarly, when a receiver attempts to receive a message, it will block until there is a corresponding sender ready to send a message.
  • Synchronous channels have a bounded capacity, meaning they can hold a certain number of messages before the sender blocks.
  • They ensure that the sender and receiver are synchronized, as messages are exchanged in a request-response manner.
  • Synchronous channels are useful when you want to control the flow of communication explicitly and ensure that each message is processed in order.

2. Asynchronous Channels (channel()):

  • Asynchronous channels provide non-blocking send and receive operations.
  • When a sender attempts to send a message, it will immediately return without blocking, even if there is no receiver.
  • Similarly, when a receiver attempts to receive a message, it will immediately return without blocking, even if there is no sender.
  • Asynchronous channels have an unbounded capacity, meaning they can hold an unlimited number of messages.
  • They allow for decoupled communication between threads, where messages can be sent and received independently of each other.
  • Asynchronous channels are suitable when you want to achieve high concurrency and decouple the sender and receiver, allowing them to work at their own pace.

Here's an example to illustrate the difference between synchronous and asynchronous channels:

/*
Purpose of this program is demonstrate the 
difference between sync and async channels.
Espically w.r.t blocking natuture of sync channels 
and non-blocking nature of async channels.
*/

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
? ? // Synchronous channel with capacity 1
? ? let (sync_sender, sync_receiver) = mpsc::sync_channel(1);

? ? // Asynchronous channel
? ? let (async_sender, async_receiver) = mpsc::channel();

? ? // Spawn a thread for synchronous communication
? ? let sync_thread = thread::spawn(move || {
? ? ? ? sync_sender.send("SyncMsg1 from synchronous channel").unwrap();
? ? ? ? println!("Synchronous message1 sent");
? ? ? ? sync_sender.send("SyncMsg2 from synchronous channel").unwrap();
? ? ? ? println!("Synchronous message2 sent");
	sync_sender.send("SyncMsg3 from synchronous channel").unwrap();
? ? ? ? println!("Synchronous message3 sent");
	sync_sender.send("SyncMsg4 from synchronous channel").unwrap();
? ? ? ? println!("Synchronous message4 sent");
	sync_sender.send("SyncMsg5 from synchronous channel").unwrap();
? ? ? ? println!("Synchronous message5 sent");
? ? });

? ? // Spawn a thread for asynchronous communication
? ? let async_thread = thread::spawn(move || {
? ? ? ? async_sender.send("ASyncMsg1 from asynchronous channel").unwrap();
? ? ? ? println!("Asynchronous message1 sent");
? ? ? ? async_sender.send("ASyncMsg2 from asynchronous channel").unwrap();
? ? ? ? println!("Asynchronous message2 sent");
	async_sender.send("ASyncMsg3 from asynchronous channel").unwrap();
? ? ? ? println!("Asynchronous message3 sent");
	async_sender.send("ASyncMsg4from asynchronous channel").unwrap();
? ? ? ? println!("Asynchronous message4 sent");
	async_sender.send("ASyncMsg5 from asynchronous channel").unwrap();
? ? ? ? println!("Asynchronous message5 sent");
? ? });

? ? // Receive messages from synchronous channel
? ? for message in sync_receiver {
	thread::sleep(Duration::from_secs(1));
? ? ? ? println!("Received from synchronous channel: {}", message);
? ? }

    // Receive messages from asynchronous channel
? ? for message in async_receiver {
	thread::sleep(Duration::from_secs(1));
? ? ? ? println!("Received from asynchronous channel: {}", message);
? ? }

? ? // Wait for the threads to finish
? ? sync_thread.join().unwrap();
? ? async_thread.join().unwrap();
}

/*
amit@DESKTOP-9LTOFUP:~/OmPracticeRust/DS/Threads$ ./DiffBwSyncAndAsyncChannel
Synchronous message1 sent
Synchronous message2 sent
Asynchronous message1 sent
Asynchronous message2 sent
Asynchronous message3 sent
Asynchronous message4 sent
Asynchronous message5 sent
Received from synchronous channel: SyncMsg1 from synchronous channel
Synchronous message3 sent
Received from synchronous channel: SyncMsg2 from synchronous channel
Synchronous message4 sent
Received from synchronous channel: SyncMsg3 from synchronous channel
Synchronous message5 sent
Received from synchronous channel: SyncMsg4 from synchronous channel
Received from synchronous channel: SyncMsg5 from synchronous channel
Received from asynchronous channel: ASyncMsg1 from asynchronous channel
Received from asynchronous channel: ASyncMsg2 from asynchronous channel
Received from asynchronous channel: ASyncMsg3 from asynchronous channel
Received from asynchronous channel: ASyncMsg4from asynchronous channel
Received from asynchronous channel: ASyncMsg5 from asynchronous channel
*/        

Please see the o/p of above program.

Synchronous messages are waiting for message to be delivered, since channel capacity is only 1, then sender is waiting for receiver to consume.

Since receiver is receiving the message in 1 sec delay, sending is also sending the message with 1 sec delay in sync messages .


Whereas in Async message, you can see sending is not waiting for receiver to consume the message, it just sent all the message one by one.

And receiver consume the message with 1 sec delay.


Bidirectional communication between 2 threads using channels:

/*
Bi directional communication using 2 seperate sync channel.
The communication between thread 1 and 2 are synced .
1st thread1 sends msg
Then thread 2 receives
Then thread2 send message to thread1
Then thread 1 receives the message from Thread2
*/
use std::sync::mpsc;
use std::thread;


fn main() {
? ? let (tx1, rx1) = mpsc::sync_channel(1); // Channel 1 for communication 
                                            // from sender1 to receiver1
? ? let (tx2, rx2) = mpsc::sync_channel(1); // Channel 2 for communication 
                                            // from sender2 to receiver2
? ? // Spawn two threads for the senders
? ? let handle1 = thread::spawn(move || {
	let s = String::from("Hello(Jai ShreeRam)");
? ? ? ? tx1.send(s.clone()).unwrap();
	println!("Thread 1 sent the message to thrd 2 => {}: ",s);
? ? ? ? let received:String = rx2.recv().unwrap();
? ? ? ? println!("Thread 1 received: => {}", received);
? ? });

? ? let handle2 = thread::spawn(move || {
	let s = String::from("Hello(Jai Bajrang bali)");
? ? ? ? let received = rx1.recv().unwrap();
	//let received: String = rx1.recv().unwrap();
? ? ? ? println!("Thread 2 received: => {}", received);
	//tx1.send("Message from sender1, Hello(Jai ShreeRam? ?)".to_String()).unwrap();
	tx2.send(s.clone()).unwrap();
	println!("Thread 2 sent the message to thrd 1 =>{}: ",s);
? ? });

? ? // Wait for the sender threads to finish
? ? handle1.join().unwrap();
? ? handle2.join().unwrap();
}

/*
 Op=> 
amit@DESKTOP-9LTOFUP:~/OmPracticeRust/DS/Threads$ ./BiDirectionComm_SyncChnl
Thread 1 sent the message to thrd 2 => Hello(Jai ShreeRam):
Thread 2 received: => Hello(Jai ShreeRam)
Thread 2 sent the message to thrd 1 =>Hello(Jai Bajrang bali):
Thread 1 received: => Hello(Jai Bajrang bali)
*/*        

In the above code , using 2 sync channels with capacity 1 , we did 1 round trip from thread1 -> thread 2 -> thread 1.


There are different ways to create the async messages in Rust, please see below.

Asynchronous Channels (sync::mpsc)

Asynchronous channels can be created in 2 ways:

  1. using sync::mpsc
  2. tokio::sync::mpsc


1. Asynchronous Channels Using sync::mpsc

Asynchronous channels, also known as unbounded channels, provide non-blocking operations for sending and receiving values. These channels have an unlimited capacity, meaning that senders can always send values without blocking, and receivers can always receive values without blocking. If the receiver is not ready to receive a value, it can choose to await or poll the channel later.

/*
Demonstrate the multiple producer and snigle sender.
cloning the sender?
droping the unmoved sender to indicate the receiver no more message will be sent.
*/
use std::sync::mpsc;
use std::thread;

fn main() {
? ? let (tx, rx) = mpsc::channel(); // Unbounded channel
? ? let tx1 = tx.clone();
? ? let handle1 = thread::spawn(move || {
? ? ? ? println!("Hello from thread1");
? ? ? ? let message: String = String::from("Jai-Thread1");
? ? ? ? tx1.send(message).unwrap();
	// At the end of this block , tx1 is automatically dropped?
	// since tx1 is shared with this thread via move.?
? ? });

? ? let tx2 = tx.clone();
? ? let handle2 = thread::spawn(move || {
? ? ? ? println!("Hello from thread2");
? ? ? ? let message: String = String::from("Shree-Thread2");
? ? ? ? tx2.send(message).unwrap();
? ? ? ? // At the end of this block , tx2 is automatically dropped?
	// since tx2 is shared with this thread via move.?
? ? });

? ? let tx3 = tx.clone();
? ? let handle3 = thread::spawn(move || {
? ? ? ? println!("Hello from thread3");
? ? ? ? let message: String = String::from("Ram-Thread3");
? ? ? ? tx3.send(message).unwrap();
? ? ? ? // At the end of this block , tx3 is automatically dropped?
	// since tx3 is shared with this thread via move.?
? ? });

? ? println!("Om from-Thread1");

    // We need to drop tx as it is still in scope. 
    // if we don't drop tx  the below while loop?
    // looking for message will be waiting indefinitely to 
    // look for messages from tx

? ? drop(tx); // Drop the sender to close the channel

? ? while let Ok(message) = rx.recv() {
? ? ? ? println!("Received from Thread: {}", message);
? ? }

? ? handle1.join().unwrap();
? ? handle2.join().unwrap();
? ? handle3.join().unwrap();
}?
/*
amit@DESKTOP-9LTOFUP:~/OmPracticeRust/DS/Threads$ ./AsyncThreadWithMultipleProdeucer
Hello from thread1
Hello from thread2
Om from-Thread1
Hello from thread3
Received from Thread: Jai-Thread1
Received from Thread: Shree-Thread2
Received from Thread: Ram-Thread3
/*
/*
amit@DESKTOP-9LTOFUP:~/OmPracticeRust/DS/Threads$ ./AsyncThreadWithMultipleProdeucer
Hello from thread1
Hello from thread2
Om from-Thread1
Received from Thread: Jai-Thread1
Received from Thread: Shree-Thread2
Hello from thread3
Received from Thread: Ram-Thread3
*/        


Advantages:

  1. No Capacity Limit: Unbounded channels do not have a fixed capacity, allowing them to hold an unlimited number of items. This can be useful in situations where the workload is unpredictable or where you don't want to impose artificial limits.
  2. No Blocking on Send: Senders can always send items to an unbounded channel without being blocked, regardless of the current number of items in the channel.

Disadvantages:

  1. Potential Resource Exhaustion: Since unbounded channels have no capacity limit, they can potentially consume unlimited memory or other resources if items are produced faster than they are consumed.
  2. Lack of Backpressure: Unbounded channels do not provide built-in backpressure mechanisms. If the consumer is slower than the producer, the channel can grow indefinitely, leading to potential memory or resource issues.

Suitable Scenarios:

  1. Asynchronous Communication: Unbounded channels are commonly used in asynchronous communication patterns, where the workload or the rate of data production is unpredictable.
  2. Event Streams: If you are working with event streams or event-driven architectures, where events can arrive at any time and need to be processed without imposing limits, unbounded channels can be suitable.


2. Asynchronous channel using tokio:

Asynchronous channels are commonly used in asynchronous and concurrent programming scenarios and are provided by libraries like tokio through the tokio::sync::mpsc module. Here's an example of creating an asynchronous channel using tokio:

use tokio::sync::mpsc;
use tokio::task;

#[tokio::main]
async fn main() {
? ? // Create an asynchronous channel with an unbounded capacity
? ? let (mut tx, mut rx) = mpsc::channel(10);

? ? // Spawn a task to send a message
? ? let send_task = task::spawn(async move {
? ? ? ? let message = String::from("Hello from the sender!");
? ? ? ? tx.send(message).await.expect("Failed to send message");
? ? ? ? println!("Sender: Message sent");
? ? });

? ? // Receive the message from the channel
? ? let received = rx.recv().await.expect("Failed to receive message");
? ? println!("Receiver: Received message: {}", received);

? ? // Wait for the send task to complete
? ? send_task.await.expect("Failed to join send task");
}
/*
warning: `tokio_chnnl_demo` (bin "tokio_chnnl_demo") generated 2 warnings
? ? Finished dev [unoptimized + debuginfo] target(s) in 7.52s
? ? ?Running `target/debug/tokio_chnnl_demo`
Sender: Message sent
Receiver: Received message: Hello from the sender!
amit@DESKTOP-9LTOFUP:~/OmPracticeRust/tokio_chnnl_demo$

*/        

In the above example, we use the Tokio runtime and its asynchronous channel implementation (tokio::sync::mpsc::channel). The channel has an unbounded capacity, allowing multiple messages to be sent and received. We spawn a new asynchronous task to send a message using the send method. On the receiving side, we use the recv method with await to asynchronously wait for a message from the channel.

Note that in the async example, we use the #[tokio::main] attribute to create a Tokio runtime and execute the main function in an asynchronous context.

It's important to note that the tokio library is commonly used for asynchronous programming in Rust, and its asynchronous channels are specifically designed to work well with asynchronous execution models.

Will try to write a separate article on tokio lib .

FYI : Tutorial | Tokio - An asynchronous Rust runtime

Why tokio channels are preferred for async channels?

The main advantage of using channels provided by the Tokio library (tokio::sync::mpsc) compared to the channels in std::sync::mpsc is that Tokio channels are designed specifically for asynchronous programming and integrate well with the Tokio runtime.

Here are some key advantages and differences of Tokio channels:

  1. Asynchronous Nature: Tokio channels are designed to work seamlessly with asynchronous programming. They leverage non-blocking I/O operations and async/await syntax to enable efficient and scalable concurrent programming.
  2. Integration with Tokio Runtime: Tokio channels are built to integrate with the Tokio runtime, which is an asynchronous runtime for Rust. The Tokio runtime provides a high-performance, task-based execution model and handles scheduling of asynchronous tasks. Using Tokio channels within the Tokio runtime allows for efficient coordination and communication between async tasks.
  3. Backpressure and Capacity Management: Tokio channels support backpressure and allow controlling the capacity of the channel. Backpressure is a mechanism to control the flow of data when the receiver is slower than the sender. Tokio channels provide methods like send and try_send that return futures indicating whether the send operation was successful or not. This allows for more fine-grained control over handling backpressure.
  4. Multi-Producer, Single-Consumer (MPSC): Both std::sync::mpsc and tokio::sync::mpsc provide MPSC channels, which allow multiple senders but only one receiver. However, Tokio channels provide additional features for asynchronous programming, such as buffering options and capacity management, which are useful in scenarios where non-blocking communication is required.
  5. Runtime Integration: Tokio channels work well within the Tokio runtime and can be used alongside other Tokio utilities, such as timers, futures, and asynchronous I/O operations. This integration allows for building highly concurrent and efficient asynchronous systems.
  6. Tokio channels provide a more specialized and feature-rich solution for asynchronous communication and coordination in Rust. If you're building asynchronous applications or working with the Tokio runtime, using tokio::sync::mpsc can provide better integration and performance compared to std::sync::mpsc. However, if you're working with synchronous code or don't require the advanced features of Tokio channels, std::sync::mpsc can still be a suitable choice.


What is tokio runtime?

Tokio runtime can be thought of as a scheduler or task executor. It is responsible for managing the execution of asynchronous tasks in a Rust application and provides the necessary infrastructure for efficient and concurrent programming.

The Tokio runtime utilizes an event-driven architecture to handle asynchronous tasks. It includes an event loop, sometimes referred to as a reactor, that listens for events and dispatches tasks for execution based on those events. These events can include I/O operations completing, timers expiring, or other types of notifications.

When a task is executed within the Tokio runtime, it runs in a non-blocking manner, allowing other tasks to make progress concurrently. This concurrency is achieved through the use of asynchronous programming techniques, such as futures and async/await syntax, which allow tasks to suspend and resume execution without blocking threads.

The Tokio runtime manages task scheduling and coordination, ensuring that tasks are executed in an efficient and orderly manner. It handles task prioritization, event handling, and resource management, allowing developers to focus on writing asynchronous code without worrying about low-level details.

By abstracting away the complexities of managing threads and I/O operations, the Tokio runtime simplifies the process of building highly concurrent applications. It provides a unified and efficient way to handle I/O-bound operations and enables applications to efficiently utilize system resources.

Bidirectional communication using channels

In Rust, channels are typically unidirectional, meaning they allow communication in one direction (from sender to receiver). However, you can achieve bi-directional communication using two separate channels: one for each direction of communication.

Here's an example of how to implement bi-directional communication using two channels:

use std::sync::mpsc;
use std::thread;


fn main() {
? ? let (tx1, rx1) = mpsc::channel(); // Channel 1 for communication 
                                      // from sender1 to receiver1
? ? let (tx2, rx2) = mpsc::channel(); // Channel 2 for communication 
                                      // from sender2 to receiver2


? ? // Spawn two threads for the senders
? ? let handle1 = thread::spawn(move || {
? ? ? ? tx1.send("Message from thrd1").unwrap();
? ? ? ? let received = rx2.recv().unwrap();
? ? ? ? println!("Thrd1 received: {}", received);
? ? });


? ? let handle2 = thread::spawn(move || {
? ? ? ? tx2.send("Message from thrd22").unwrap();
? ? ? ? let received = rx1.recv().unwrap();
? ? ? ? println!("Thrd2 received: {}", received);
? ? });


? ? // Wait for the sender threads to finish
? ? handle1.join().unwrap();
? ? handle2.join().unwrap();
}

/* Op => 
amit@DESKTOP-9LTOFUP:~/OmPracticeRust/Threads$ ./BiDirectionComm_chnl
Thrd2 received: Message from thrd1
Thrd1 received: Message from thrd2
*/        

The two threads run concurrently, allowing bi-directional communication between Thrd1 and Thrd2 through the channels.

Keep in mind that in this example, the communication is synchronized, meaning the threads will block until both sides of the communication are ready. If you need asynchronous communication, We may need to explore other concurrency models or libraries like Tokio.

Choosing the Right Channel

The choice between synchronous and asynchronous channels depends on the requirements of your application. Synchronous channels are suitable for scenarios where you want blocking behavior and need to control the buffer capacity. Asynchronous channels are more appropriate for non-blocking and asynchronous programming scenarios.

Consider factors such as the expected load, concurrency requirements, and the design of your application when choosing between synchronous and asynchronous channels. Both types of channels provide efficient and safe inter-thread communication in Rust, allowing you to build robust concurrent applications.

Conclusion

Channels provide a powerful mechanism for inter-thread communication in Rust. They allow safe and synchronized message passing between threads, enabling coordination and data exchange. By using channels, Rust programs can achieve efficient and concurrent communication while maintaining the ownership and borrowing guarantees provided by the language.

Remember to handle the Result values returned by the send and recv methods appropriately to handle errors and gracefully handle channel closures. Channels are a valuable tool in your concurrency toolbox, allowing you to build robust and scalable concurrent applications in Rust.


Thanks for reading till end . Please comment if you have any !

Pankaj Singh Rathore

Chief Founding Engineer | Director of Engineering. Passionate About Blockchain & Data Science ??. Expert in ETH, Solana, Cosmos, Polkadot | JS, Python, Rust, Go. Driving Innovation & Building the Future of Technology ??

1 年

Too Informative, Thanks, buddy.

要查看或添加评论,请登录

Amit Nadiger的更多文章

  • List of C++ 17 additions

    List of C++ 17 additions

    1. std::variant and std::optional std::variant: A type-safe union that can hold one of several types, useful for…

  • List of C++ 14 additions

    List of C++ 14 additions

    1. Generic lambdas Lambdas can use auto parameters to accept any type.

    6 条评论
  • Passing imp DS(vec,map,set) to function

    Passing imp DS(vec,map,set) to function

    In Rust, we can pass imp data structures such as , , and to functions in different ways, depending on whether you want…

  • Atomics in C++

    Atomics in C++

    The C++11 standard introduced the library, providing a way to perform operations on shared data without explicit…

    1 条评论
  • List of C++ 11 additions

    List of C++ 11 additions

    1. Smart Pointers Types: std::unique_ptr, std::shared_ptr, and std::weak_ptr.

    2 条评论
  • std::lock, std::trylock in C++

    std::lock, std::trylock in C++

    std::lock - cppreference.com Concurrency and synchronization are essential aspects of modern software development.

    3 条评论
  • std::unique_lock,lock_guard, & scoped_lock

    std::unique_lock,lock_guard, & scoped_lock

    C++11 introduced several locking mechanisms to simplify thread synchronization and prevent race conditions. Among them,…

  • Understanding of virtual & final in C++ 11

    Understanding of virtual & final in C++ 11

    C++ provides powerful object-oriented programming features such as polymorphism through virtual functions and control…

  • Importance of Linux kernal in AOSP

    Importance of Linux kernal in AOSP

    The Linux kernel serves as the foundational layer of the Android Open Source Project (AOSP), acting as the bridge…

    1 条评论
  • AOSP

    AOSP

    Android System Development AOSP stands for the Android Open Source Project. Its the foundation of the Android operating…

社区洞察

其他会员也浏览了