Threads in Rust

Rust is a systems programming language that aims to provide memory safety and safe concurrency. Threads are one of the key features that enable concurrent programming in Rust. Threads allow a program to perform multiple tasks at the same time (in parallel on multi-core machines), which can improve the overall performance and responsiveness of an application. Rust's standard-library threads map one-to-one onto operating system threads, and the type system makes sharing data between them safe.

Must read: Rust Atomics and Locks by Mara Bos

Theory behind Rust Threads

Rust threads are operating system threads, the smallest unit of execution that the OS scheduler manages. Threads allow a program to divide its workload into smaller, independent tasks that can run concurrently. Each thread has its own stack but shares the process's heap and global memory with the other threads. Threads communicate through shared memory protected by synchronization primitives (such as Mutex and atomics) or by passing messages over channels.
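To make the shared-memory route concrete, here is a minimal std-only sketch (the function name collect_ids is hypothetical). Each thread gets its own Arc handle to one Vec, and a Mutex makes sure that only one thread modifies it at a time.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Spawns `n_threads` threads that each push their index into one shared Vec.
fn collect_ids(n_threads: usize) -> Vec<usize> {
    // Arc gives every thread its own handle to the same heap allocation;
    // Mutex ensures only one thread mutates the Vec at a time.
    let shared = Arc::new(Mutex::new(Vec::new()));
    let mut handles = Vec::new();
    for id in 0..n_threads {
        let shared = Arc::clone(&shared);
        handles.push(thread::spawn(move || {
            shared.lock().unwrap().push(id);
        }));
    }
    for handle in handles {
        handle.join().unwrap();
    }
    // All threads have finished, so this lock is uncontended.
    let mut ids = shared.lock().unwrap().clone();
    ids.sort(); // threads finish in an unspecified order
    ids
}

fn main() {
    println!("{:?}", collect_ids(4)); // [0, 1, 2, 3]
}
```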

One of the key differences between Rust threads and thread implementations in other languages is the ownership model. Every value in Rust has a single owner, and ownership can be moved into a thread. Together with the Send and Sync marker traits, this lets the compiler reject code that could cause a data race. Data is either moved to one thread, shared read-only, or shared through a synchronization type such as Mutex. Note that this rules out data races, not every kind of race condition or deadlock.
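Here is a minimal sketch of moving ownership into a thread, using only std (double_in_thread is a made-up name). The Vec is moved into the thread, so the caller can no longer use it, and join() hands the result back.

```rust
use std::thread;

/// Moves `data` into a worker thread, which doubles every element and
/// returns the new Vec to the caller through the JoinHandle.
fn double_in_thread(data: Vec<i32>) -> Vec<i32> {
    let handle = thread::spawn(move || {
        // `data` is now owned by this thread; the caller cannot touch it.
        data.into_iter().map(|x| x * 2).collect::<Vec<i32>>()
    });
    // join() transfers ownership of the result back to the calling thread.
    handle.join().expect("worker thread panicked")
}

fn main() {
    let numbers = vec![1, 2, 3];
    let doubled = double_in_thread(numbers);
    // println!("{:?}", numbers); // error[E0382]: borrow of moved value: `numbers`
    println!("{:?}", doubled); // [2, 4, 6]
}
```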

How are Rust threads built?

Rust uses operating system threads (OS threads) internally. Each call to std::thread::spawn() creates a new OS thread; there is no green-thread runtime in between. The standard library provides a thin, portable abstraction over the OS thread API, so you can create and manage threads without worrying about low-level details. Because the standard library calls the operating system's native thread API, details such as scheduling and default limits can vary between operating systems.

On Linux and other Unix-like systems, Rust's standard library uses pthreads, the POSIX threads API. On Windows, it uses the Windows threads API (CreateThread). The standard library hides these differences, so the same code works on both Linux and Windows without you having to worry about the underlying thread implementation.


How to create threads?

There are several ways to create threads in Rust:

  • Using the std::thread::spawn function, with either a function pointer or a closure.
  • Using the std::thread::Builder struct: this lets us specify additional options, such as the name and stack size of the new thread.
  • Using scoped threads (std::thread::scope), which can borrow data from the enclosing function (see Scoped threads in Rust).
  • Using the rayon crate: this provides a high-level interface for parallel programming, with automatic work stealing and load balancing on a thread pool.
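Scoped threads are only listed here. As a minimal sketch (the function name and data are made up for illustration), std::thread::scope, available since Rust 1.63, lets threads borrow local data without move or clone. This works because every thread spawned inside the scope is joined before scope returns.

```rust
use std::thread;

/// Counts the even numbers in each half of `data` on two scoped threads.
/// The threads borrow `data` directly; no Arc, clone, or move is needed.
fn count_evens_scoped(data: &[u32]) -> (usize, usize) {
    let (left, right) = data.split_at(data.len() / 2);
    thread::scope(|s| {
        let l = s.spawn(|| left.iter().filter(|&&x| x % 2 == 0).count());
        let r = s.spawn(|| right.iter().filter(|&&x| x % 2 == 0).count());
        // Joining explicitly lets us collect the return values.
        (l.join().unwrap(), r.join().unwrap())
    })
}

fn main() {
    let data = vec![1, 2, 3, 4, 5, 6, 8];
    println!("{:?}", count_evens_scoped(&data)); // (1, 3)
}
```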


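  1. Creating threads using the std::thread::spawn function:

Creating a new thread in Rust is simple and can be done with the std::thread::spawn function. It takes the code to run on the new thread, either a closure or a plain function, and returns a JoinHandle that can be used to wait for the thread. A closure can capture variables from the surrounding scope. However, because the new thread may outlive the current function, everything it captures must be owned ('static). In practice, this means moving owned values (or clones of them) into the closure with the move keyword.


Simple thread creation using a function pointer: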

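fn print_jai_shree_ram() {
    println!("Jai shree Ram");
}

fn main() {
    let mut thread_handles = Vec::new();
    // 0..=5 is inclusive, so this spawns 6 threads
    for _i in 0..=5 {
        // spawn accepts a plain function as well as a closure
        let th = std::thread::spawn(print_jai_shree_ram);
        thread_handles.push(th);
    }
    for handle in thread_handles {
        // join() returns a Result; unwrap() re-raises a panic from the thread
        handle.join().unwrap();
    }
}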
/*
Jai shree Ram
Jai shree Ram
Jai shree Ram
Jai shree Ram
Jai shree Ram
Jai shree Ram

*/        


Here is an example of creating threads using a closure:

use std::thread;

fn print_data(data: String) -> &'static str {
    println!("{}", data);
    "Jai Bajrang bali"
}

fn main() {
    let mut thread_handles = Vec::new();
    let data = String::from("Jai Shree Ram");
    for _i in 0..=5 {
        // Each thread gets its own copy of the string
        let clone_data = data.clone();
        // move transfers ownership of clone_data into the closure
        thread_handles.push(thread::spawn(move || print_data(clone_data)));
    }
    for handle in thread_handles {
        // join() returns the closure's return value (here a &'static str)
        let ret = handle.join().unwrap();
        println!("return from thread ->{}", ret);
    }
}
/*
Jai Shree Ram
Jai Shree Ram
Jai Shree Ram
Jai Shree Ram
Jai Shree Ram
return from thread ->Jai Bajrang bali
return from thread ->Jai Bajrang bali
return from thread ->Jai Bajrang bali
return from thread ->Jai Bajrang bali
return from thread ->Jai Bajrang bali
Jai Shree Ram
return from thread ->Jai Bajrang bali
*/        

In this example, each thread is created with thread::spawn and runs the closure passed to it. The join method waits for a thread to finish and returns a Result: Ok with the closure's return value, or Err if the thread panicked. Calling unwrap() on that Result propagates such a panic to the main thread.


Also notice that each thread returns a value, which the main thread collects through join().
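One detail is worth knowing when collecting return values: spawn all threads first, then join them. The sketch below (squares_in_threads is a hypothetical helper) collects the JoinHandles into a Vec before joining. If spawn and join were chained in one lazy iterator, each thread would be joined before the next was spawned, and the work would run sequentially.

```rust
use std::thread;

/// Spawns one thread per input and collects each thread's return value.
/// Results come back in spawn order, because the handles are joined in that
/// order, even if the threads finish in a different order.
fn squares_in_threads(inputs: Vec<u64>) -> Vec<u64> {
    // collect() here forces every thread to be spawned before any join.
    let handles: Vec<thread::JoinHandle<u64>> = inputs
        .into_iter()
        .map(|n| thread::spawn(move || n * n))
        .collect();
    handles
        .into_iter()
        .map(|h| h.join().expect("thread panicked"))
        .collect()
}

fn main() {
    println!("{:?}", squares_in_threads(vec![1, 2, 3, 4])); // [1, 4, 9, 16]
}
```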

2. Using the std::thread::Builder struct

use std::thread;

fn main() {
    // Create a new thread using the thread builder
    let builder = thread::Builder::new()
        .name("my_thread".to_string())
        .stack_size(32 * 1024);
    let child = builder
        .spawn(|| {
            println!("Hello from a new thread!");
            let thread = thread::current();
            println!("Thread ID: {:?}", thread.id());
            println!("Thread Name: {:?}", thread.name());
        })
        .unwrap();


    // Wait for the child thread to finish
    child.join().unwrap();
}
/*
Hello from a new thread!
Thread ID: ThreadId(2)
Thread Name: Some("my_thread")
*/        

In this example, we create a new thread using the thread::Builder struct and set two options: the thread's name ("my_thread") and its stack size (32 * 1024 bytes, i.e. 32 KiB). The name is visible inside the thread through thread::current().name(), as the output shows.

The std::thread::Builder struct lets us specify such options when creating a new thread. By default, a new thread gets a platform-dependent stack size. The std::thread documentation currently gives 2 MiB on Tier-1 platforms, and the RUST_MIN_STACK environment variable can change it. In some cases, such as deep recursion, we may want a larger stack, which the stack_size method sets; the name method sets the thread's name. Unlike thread::spawn, Builder::spawn returns an io::Result, so a failure to create the thread can be handled instead of causing a panic.

Another thing to understand is how panics behave. By default, a panic in a spawned thread does not terminate the whole program: it unwinds only that thread, and the parent sees it as an Err returned by join(). (A panic in the main thread, or a build with panic = "abort", does end the process.) Builder has no option for installing a panic handler. To customize panic handling, use std::panic::set_hook (process-wide) or std::panic::catch_unwind inside the thread, or simply inspect the Err returned by join() to log the error and continue execution.
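Here is a small std-only sketch of the join()-based approach (run_catching is a hypothetical name). It uses Builder to create a named thread, handles the io::Result from Builder::spawn, and turns a panic payload into an error message. It assumes the default panic = "unwind" setting. The default panic hook still prints the panic message to stderr.

```rust
use std::thread;

/// Runs a worker on a named thread and reports a panic as Err
/// instead of letting it crash the caller.
fn run_catching(name: &str, should_panic: bool) -> Result<u32, String> {
    let handle = thread::Builder::new()
        .name(name.to_string())
        .spawn(move || {
            if should_panic {
                panic!("worker gave up");
            }
            42
        })
        // Builder::spawn returns io::Result instead of panicking on failure.
        .map_err(|e| format!("spawn failed: {e}"))?;

    // join() returns Err(payload) if the thread panicked; the payload is a
    // Box<dyn Any + Send>, usually holding a &str or a String message.
    handle.join().map_err(|payload| {
        if let Some(msg) = payload.downcast_ref::<&str>() {
            msg.to_string()
        } else if let Some(msg) = payload.downcast_ref::<String>() {
            msg.clone()
        } else {
            "unknown panic".to_string()
        }
    })
}

fn main() {
    println!("{:?}", run_catching("ok-worker", false)); // Ok(42)
    println!("{:?}", run_catching("failing-worker", true)); // Err("worker gave up")
}
```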

3. Using the rayon crate

use rayon::prelude::*;

fn main() {
    // Create a vector of numbers
    let numbers = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];

    // Use rayon to parallelize the sum of the numbers
    let sum = numbers.par_iter().sum::<i32>();

    println!("The sum of the numbers is {}", sum);
}
/*
Op => 
Finished dev [unoptimized + debuginfo] target(s) in 18.03s
Running `target/debug/jaiShreeRam`
The sum of the numbers is 55
*/        

In this example, we use the rayon crate to provide a high-level interface for parallel programming. We create a vector of numbers, and then use the par_iter method to parallelize the sum of the numbers. This allows rayon to automatically handle work stealing and load balancing to efficiently utilize all available CPU resources.
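For comparison, here is a rough std-only equivalent of the sum above (chunked_sum is a made-up helper). It splits the slice into one contiguous chunk per thread using scoped threads. This is static chunking, not work stealing: if one chunk took longer, the other threads would sit idle, which is exactly what rayon's scheduler is designed to avoid.

```rust
use std::thread;

/// Sums `numbers` by giving each of up to `n_chunks` scoped threads one contiguous slice.
/// Static chunking: no work stealing, so a slow chunk leaves the other threads idle.
fn chunked_sum(numbers: &[i32], n_chunks: usize) -> i32 {
    // Round up so at most `n_chunks` chunks are produced; never pass 0 to chunks() (it panics).
    let n_chunks = n_chunks.max(1);
    let chunk_len = ((numbers.len() + n_chunks - 1) / n_chunks).max(1);
    thread::scope(|s| {
        // Spawn every thread before joining any of them.
        let handles: Vec<_> = numbers
            .chunks(chunk_len)
            .map(|chunk| s.spawn(move || chunk.iter().sum::<i32>()))
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).sum()
    })
}

fn main() {
    let numbers: Vec<i32> = (1..=10).collect();
    // One chunk per available CPU, falling back to 1 if the count is unknown.
    let workers = thread::available_parallelism().map(|n| n.get()).unwrap_or(1);
    // prints: The sum of the numbers is 55
    println!("The sum of the numbers is {}", chunked_sum(&numbers, workers));
}
```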

The above code is built and run with Cargo.

You need to add rayon as a dependency in your Cargo.toml file.

Below is my Cargo.toml:

[package]
name = "rayon_thread"
version = "0.1.0"
edition = "2021"


# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html


[dependencies]
rayon = "1.5.1"        


Async task creation using tokio crate

A Tokio task is not a thread. It is an async task, which is much lighter than a thread. Async Rust is discussed separately in another article: Intro to Async Rust | LinkedIn

One thread can run many async tasks, and with Tokio's multi-threaded runtime, a single task can be moved between worker threads at its .await points.
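The claim that one thread can run many async functions can be demonstrated without Tokio. What follows is a toy, single-threaded executor built only from std. Every name in it is hypothetical, and it is much simpler than Tokio's scheduler, which relies on wakers and a thread pool. Two async functions take turns on the same thread, and each .await on yield_now hands control back to the executor.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A future that is Pending on its first poll and Ready on the second,
/// so awaiting it hands control back to the executor once.
struct YieldOnce {
    yielded: bool,
}

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            Poll::Ready(())
        } else {
            self.yielded = true;
            cx.waker().wake_by_ref(); // ask to be polled again
            Poll::Pending
        }
    }
}

fn yield_now() -> YieldOnce {
    YieldOnce { yielded: false }
}

/// This toy executor re-polls every unfinished task anyway, so wake-ups are ignored.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls all tasks round-robin on the current thread until every one is done.
fn run_all<F: Future<Output = ()>>(mut tasks: Vec<Pin<Box<F>>>) {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    while !tasks.is_empty() {
        // Keep only the tasks that are still Pending after this poll.
        tasks.retain_mut(|task| task.as_mut().poll(&mut cx).is_pending());
    }
}

async fn worker(name: &'static str, steps: u32, log: &RefCell<Vec<String>>) {
    for i in 1..=steps {
        log.borrow_mut().push(format!("{name} step {i}"));
        yield_now().await; // let the other task run
    }
}

/// Runs two async functions on one thread and returns the order in which their steps ran.
fn demo() -> Vec<String> {
    let log = RefCell::new(Vec::new());
    run_all(vec![
        Box::pin(worker("task A", 2, &log)),
        Box::pin(worker("task B", 2, &log)),
    ]);
    log.into_inner()
}

fn main() {
    // prints: task A step 1, task B step 1, task A step 2, task B step 2 (one per line)
    for line in demo() {
        println!("{line}");
    }
}
```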

use tokio::runtime::Runtime;
use tokio::task;

async fn my_async_function() {
    println!("Starting async function");
    println!("Async function complete");
}

fn main() {
    // create a new (multi-threaded) Tokio runtime
    let rt = Runtime::new().unwrap();

    // block_on runs the async block to completion and provides
    // the runtime context that task::spawn needs
    rt.block_on(async {
        // spawn a new Tokio task to run the async function
        let handle = task::spawn(async {
            my_async_function().await;
        });

        println!("Spawned new task with handle {:?}", handle);

        // Wait for the task to complete; Err means it panicked or was cancelled
        handle.await.unwrap();
    });
}

/*
Op => 
Spawned new task with handle JoinHandle { id: Id(5) }
Starting async function
Async function complete
*/        

Please note that the above code is built and run with Cargo.

Add tokio as a dependency in your Cargo.toml file (the "full" feature includes the multi-threaded runtime used by Runtime::new):

[dependencies] 
tokio = { version = "1.16.1", features = ["full"] }         

Then, in your Rust code, import what you need at the beginning of the file (the example above already does this):

use tokio::task;

My Cargo.toml file:

[package]
name = "tokio_thread"
version = "0.1.0"
edition = "2021"


# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html


[dependencies]
tokio = { version = "1.16.1", features = ["full"] }        

tokio::task::spawn must be called from within the context of a Tokio runtime; calling it outside one panics. In other words, you need to start a Tokio runtime (here with Runtime::new, or with the #[tokio::main] attribute) before you can use task::spawn.

Run it with: cargo run


Does Rust support real-time threads?

Yes, but not through the standard library. Real-time threads are threads scheduled so that they can meet their deadlines, which is crucial for applications such as control systems, aerospace, and robotics. The Rust standard library has no real-time thread API: std::thread offers no way to set a thread's priority or scheduling policy. On Linux, however, a Rust program can request a real-time scheduling policy (SCHED_FIFO or SCHED_RR) through the libc crate, just as a C program would.

Whether deadlines are actually met depends on the operating system and its kernel configuration, not on Rust.

Here is an example that requests real-time scheduling for a thread on Linux, using the libc crate.

Please note that:

The code below sets the scheduling policy of the main thread to SCHED_FIFO using the sched_setscheduler function. It then creates a new thread with builder.spawn(). On Linux, a new thread inherits the scheduling policy of the thread that creates it by default, so the new thread runs with the same policy as the main thread.

After the new thread is created, the scheduling policy of the main thread is reset to SCHED_OTHER using sched_setscheduler. Finally, handle.join() waits for the new thread to complete before the program exits.

use std::thread;
use std::time::Duration;
use libc::{sched_param, sched_setscheduler, SCHED_FIFO, SCHED_OTHER}; // Linux scheduling functions and constants from the libc crate


fn main() {
    let stack_size = 1024 * 1024; // 1 MiB stack for the new thread; thread::Builder allocates it itself


    // SCHED_FIFO priorities on Linux range from 1 (lowest) to 99 (highest)
    let mut param = sched_param { sched_priority: 10 };
    let pid = unsafe { libc::getpid() }; // On Linux, the PID is also the main thread's thread ID
    let ret = unsafe { sched_setscheduler(pid, SCHED_FIFO, &param) }; // Request SCHED_FIFO with priority 10 for the main thread
    if ret != 0 {
        // -1 means failure; std::io::Error::last_os_error() reports the errno (for example EPERM)
        println!("Failed to set scheduler: {:?}", ret);
    }


    let builder = thread::Builder::new().stack_size(stack_size); // Create a new thread builder with the specified stack size


    let handle: thread::JoinHandle<()> = builder
        .spawn(|| {
            // The closure is the new thread's main function
            println!("Started new thread");
            for i in 1..=5 {
                println!("Thread count: {}", i);
                thread::sleep(Duration::from_millis(1000)); // Sleep the thread for 1 second
            }
        })
        .expect("Failed to spawn thread"); // Get a handle to the newly spawned thread


    param.sched_priority = 0; // SCHED_OTHER only accepts priority 0; any other value fails with EINVAL
    let ret = unsafe { sched_setscheduler(pid, SCHED_OTHER, &param) }; // Reset the main thread to SCHED_OTHER
    if ret != 0 {
        println!("Failed to reset scheduler: {:?}", ret);
    }


    let result = handle.join(); // Wait for the thread to complete and get the result
    match result {
        Ok(()) => println!("Thread exited successfully"),
        Err(_) => println!("Thread panicked"),
    }
}


/*
Op =>
    Finished dev [unoptimized + debuginfo] target(s) in 0.18s
     Running `target/debug/jaiShreeRam`
Failed to set scheduler: -1
Failed to reset scheduler: -1
Started new thread
Thread count: 1
Thread count: 2
Thread count: 3
Thread count: 4
Thread count: 5
Thread exited successfully
*/

The program ran to completion, but the sched_setscheduler system call returned -1, which indicates an error. This means that the real-time scheduling policy was not applied.

The reason is the same as with Linux pthreads:

Note: Sometimes (at least in my case), even as the root user, I was not able to change the scheduling policy from SCHED_OTHER to SCHED_FIFO or SCHED_RR. On a standard Linux kernel, these policies are available to processes that have the CAP_SYS_NICE capability (root has it) or a non-zero RLIMIT_RTPRIO limit. The environment can still block them, however: for example, a container started without CAP_SYS_NICE, or a cgroup that has no real-time CPU budget. So even with root privileges, you may not be able to use these policies.

The policies themselves do not require a special kernel. What a fully preemptible kernel adds is predictable, low scheduling latency, which real-time workloads actually need. To build one, change the kernel configuration (make menuconfig -> General setup -> Preemption Model -> Fully Preemptible Kernel (RT)).

OR

simply use a real-time operating system that supports these policies out of the box.

My Cargo.toml

amit@DESKTOP-9LTOFUP:~/OmPracticeRust/jaiShreeRam$ cat Cargo.toml
[package]
name = "rt_thread"
version = "0.1.0"
edition = "2021"


# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html


[dependencies]
libc = "0.2.101"

Below is an explanation of the code:

  • The main() function chooses a 1 MiB (1024 * 1024 bytes) stack size for the new thread. thread::Builder allocates the stack itself, so no separate buffer is needed.
  • Next, a sched_param with priority 10 is created using let mut param = sched_param { sched_priority: 10 };. The getpid() function from the C library returns the ID of the current process (on Linux, also the thread ID of the main thread) using let pid = unsafe { libc::getpid() };.
  • The policy is set to SCHED_FIFO using let ret = unsafe { sched_setscheduler(pid, SCHED_FIFO, &param) };. sched_setscheduler() returns 0 on success and -1 on error (the cause is stored in errno), so the return value is checked with an if statement, and a message is printed on failure.
  • A new thread is created with let builder = thread::Builder::new().stack_size(stack_size);. The spawn() method takes a closure containing the code to run in the new thread; here it prints a message once per second for five seconds using thread::sleep(Duration::from_millis(1000));.
  • The main thread's policy is then reset to SCHED_OTHER so that it goes back to normal scheduling. SCHED_OTHER only accepts a priority of 0, so param.sched_priority is set to 0 before calling sched_setscheduler(pid, SCHED_OTHER, &param). As before, the return value is checked.
  • Finally, join() waits for the new thread to complete, and a match statement checks the result. If the thread exited normally, "Thread exited successfully" is printed; if it panicked, "Thread panicked" is printed.

What is MPSC?

mpsc stands for "multiple producer, single consumer". In Rust, it refers to a module in the standard library that provides a way for multiple threads to send messages (or data) to a single receiving thread. It is useful for concurrent programming where multiple threads need to communicate with each other.

The mpsc module provides two main types: Sender and Receiver. A Sender can send messages to a Receiver, which receives and processes the messages. The Sender can be cloned, allowing multiple threads to send messages to the same Receiver.

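The channel's internal implementation belongs to the standard library and has changed between Rust versions. What it guarantees is that a Sender and a Receiver can be used from different threads without data races. It cannot rule out every deadlock, though. For example, a thread that calls recv() while it still holds the only Sender will block forever, because the channel never becomes disconnected.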

Here is an example of using mpsc in Rust:

use std::sync::mpsc;

fn main() {
    let (sender, receiver) = mpsc::channel();

    // Spawn a new thread that sends messages to the channel
    let sender_clone = sender.clone();
    std::thread::spawn(move || {
        for i in 0..10 {
            sender_clone.send(i).unwrap();
            std::thread::sleep(std::time::Duration::from_millis(100));
        }
    });

    std::thread::spawn(move || {
        for i in 0..10 {
            sender.send(i).unwrap();
            std::thread::sleep(std::time::Duration::from_millis(100));
        }
    });

    // Receive and process messages from the channel
    for received in receiver {
        println!("Received: {}", received);
    }
}
/*
Op => 
amit@DESKTOP-9LTOFUP:~/OmPracticeRust/Threads$ ./MpscThread
Received: 0
Received: 0
Received: 1
Received: 1
Received: 2
Received: 2
Received: 3
Received: 3
Received: 4
Received: 4
Received: 5
Received: 5
Received: 6
Received: 6
Received: 7
Received: 7
Received: 8
Received: 8
Received: 9
Received: 9
*/        

In this example, the channel is created with mpsc::channel, which is unbounded: send never blocks, and messages queue up until the receiver takes them. (For a bounded channel with a fixed capacity, use mpsc::sync_channel.) The Sender and Receiver are stored in the sender and receiver variables, respectively.

Two threads are spawned, and each sends 10 messages with a 100 millisecond delay between them. The first thread gets a clone of the Sender, and the second takes the original; the move keyword transfers ownership of each Sender into its thread.

In the main thread, the Receiver is used in a for loop to receive and process messages from the channel. Each iteration blocks until a message arrives. The loop ends only when every Sender has been dropped, that is, after both threads have finished and all 20 messages have been received.
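If you need the capacity limit of a bounded channel, std provides mpsc::sync_channel. Below is a minimal sketch (collect_from_producers is a hypothetical name) with two producers and a buffer of 2 messages. send blocks whenever the buffer is full, and the receiving iterator ends once both Senders have been dropped.

```rust
use std::sync::mpsc;
use std::thread;

/// Two producers send into a bounded channel; the single consumer collects everything.
fn collect_from_producers() -> Vec<u32> {
    // At most 2 messages can wait in the buffer; further sends block until one is received.
    let (tx, rx) = mpsc::sync_channel::<u32>(2);
    let tx2 = tx.clone();

    let p1 = thread::spawn(move || {
        for i in 0..5 {
            tx.send(i).unwrap();
        }
        // `tx` is dropped when this closure returns.
    });
    let p2 = thread::spawn(move || {
        for i in 100..105 {
            tx2.send(i).unwrap();
        }
    });

    // The iterator ends once every Sender (tx and tx2) has been dropped.
    let mut received: Vec<u32> = rx.iter().collect();
    p1.join().unwrap();
    p2.join().unwrap();
    received.sort(); // the interleaving between producers is not deterministic
    received
}

fn main() {
    println!("{:?}", collect_from_producers()); // [0, 1, 2, 3, 4, 100, 101, 102, 103, 104]
}
```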

Overall, mpsc provides a simple and effective way to implement message passing between multiple threads in Rust.

What is Rayon ?

Rayon is a data-parallelism library for Rust that provides a high-level interface for parallel programming. It is designed to make it easier to write efficient and scalable parallel code by abstracting away many of the low-level details of thread management and load balancing.

Rayon uses a technique called work stealing to distribute tasks across multiple threads. In this approach, each thread has its own deque of tasks to execute, and when a thread finishes its own tasks, it can steal tasks from other threads that still have work to do. This helps to balance the workload across all threads and avoid situations where some threads are idle while others are overloaded.
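Rayon's per-thread deques are too involved to reproduce here, but the load-balancing idea can be sketched with a simpler scheme, swapped in purely for illustration. A shared atomic counter lets each worker claim the next item, so fast workers simply claim more items. This is dynamic scheduling from one shared queue, not work stealing, and dynamic_squares is a hypothetical name.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

/// Squares every item using `workers` scoped threads that claim items one at a time
/// from a shared atomic counter. A simplified shared-queue scheme, not rayon's deques.
fn dynamic_squares(items: &[u64], workers: usize) -> Vec<u64> {
    let counter = AtomicUsize::new(0);
    let next = &counter; // shared reference that every worker copies
    let mut results = vec![0u64; items.len()];
    thread::scope(|s| {
        let handles: Vec<_> = (0..workers.max(1))
            .map(|_| {
                s.spawn(move || {
                    let mut done = Vec::new();
                    loop {
                        // fetch_add hands out each index exactly once across all threads.
                        let i = next.fetch_add(1, Ordering::Relaxed);
                        if i >= items.len() {
                            return done;
                        }
                        done.push((i, items[i] * items[i]));
                    }
                })
            })
            .collect();
        // join() makes each worker's results visible to this thread.
        for handle in handles {
            for (i, sq) in handle.join().unwrap() {
                results[i] = sq;
            }
        }
    });
    results
}

fn main() {
    println!("{:?}", dynamic_squares(&[1, 2, 3, 4, 5], 3)); // [1, 4, 9, 16, 25]
}
```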

Rayon supports several parallel programming patterns. These include parallel iterators, with map-reduce style operations such as map, filter, and reduce; fork-join with rayon::join; and scoped tasks with rayon::scope. It also adapts how finely work is split at run time, based on how busy the threads are, and lets you tune the granularity with methods such as with_min_len.

Overall, Rayon is a powerful and flexible library that can greatly simplify the task of writing parallel code in Rust, while also providing high performance and scalability.

What is tokio :

Tokio is an asynchronous runtime for Rust that provides a runtime for executing asynchronous operations and coordinating work on a thread pool. It is designed to support highly concurrent and scalable applications. It allows developers to write asynchronous code that is easy to read and maintain.

With Tokio, you write asynchronous code using futures, values that represent an asynchronous computation. The Future trait itself comes from the standard library. Tokio supplies the runtime that drives futures to completion, plus utilities for combining them, such as the tokio::join! and tokio::select! macros.

Tokio also provides asynchronous networking in its tokio::net module, including TCP (TcpListener, TcpStream) and UDP (UdpSocket) types, as well as Unix domain sockets on Unix. The API is similar to std::net, but it is asynchronous and non-blocking.

To use Tokio, you need to start a Tokio runtime. The runtime provides an event loop that manages asynchronous tasks and dispatches I/O events. It schedules tasks on its thread pool and wakes them up as I/O events occur.

To spawn an async task using Tokio, you typically define an asynchronous function with the async keyword and call it to obtain a future. You then pass that future to tokio::spawn, which creates a new task that runs on a thread pool managed by the Tokio runtime.

Tokio is a good choice when a program needs to handle many concurrent, mostly I/O-bound tasks; it does not replace OS threads for CPU-heavy work. Tokio is a popular asynchronous runtime for Rust, designed to enable high-throughput, low-latency applications. It is based on lightweight, non-blocking futures, which can be used to model both asynchronous I/O and task scheduling.

With Tokio, you create an async task with tokio::spawn, which creates a new task in the Tokio runtime. The function takes a future (for example, an async block), which must be Send and 'static, and runs it on the runtime's worker threads. This lets you write concurrent, asynchronous code without dealing with low-level threading primitives.

In addition to async task creation, Tokio provides a rich set of tools and abstractions for working with asynchronous I/O, network programming, and more. It is widely used in the Rust ecosystem and has a large and active community, making it a reliable and well-maintained choice for concurrent programming in Rust.


Tutorial | Tokio - An asynchronous Rust runtime

Advantages of Rust Threads

Rust threads offer several advantages over other thread implementations, including:

  1. Memory Safety: Rust's ownership model, together with the Send and Sync traits, ensures memory safety and prevents data races at compile time.
  2. Low Overhead: Rust threads are thin wrappers over OS threads, with no garbage collector or heavy runtime in between. Each thread still needs its own stack, so for very large numbers of concurrent tasks, async tasks (for example, with Tokio) are lighter.
  3. Easy to Use: Creating a new thread in Rust is simple and can be done using the thread::spawn function.
  4. Cross-Platform: the same std::thread code runs on all major platforms where the standard library supports threads, such as Linux, Windows, and macOS.

Limitations of Rust Threads

While Rust threads offer many advantages, there are also some limitations to be aware of, including:

  1. No Garbage Collection: Rust does not have a garbage collector, so data shared between threads must be managed explicitly, typically with Arc for shared ownership and Mutex, RwLock, or atomics for mutation.
  2. No Automatic Deadlock Detection: the compiler prevents data races but not deadlocks, so lock ordering and lock scope remain the programmer's responsibility.
  3. No Thread Prioritization in std: the standard library has no API for thread priority or scheduling policy. Controlling them requires platform-specific APIs, such as the libc calls in the real-time example above.
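To illustrate the deadlock limitation, here is a std-only sketch of one common manual technique: always acquire multiple locks in the same global order. The bank-account framing and all names (transfer, run_transfers) are hypothetical.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Moves `amount` from account `from` to account `to`.
/// Locks are always taken in index order (0 before 1), whatever the direction,
/// so two threads can never each hold one lock while waiting for the other.
fn transfer(accounts: &[Mutex<i64>; 2], from: usize, to: usize, amount: i64) {
    assert_ne!(from, to, "locking the same std Mutex twice on one thread may deadlock or panic");
    let (first, second) = if from < to { (from, to) } else { (to, from) };
    let mut low = accounts[first].lock().unwrap();
    let mut high = accounts[second].lock().unwrap();
    if from == first {
        *low -= amount;
        *high += amount;
    } else {
        *high -= amount;
        *low += amount;
    }
}

/// Runs 4 threads: two move money 0 -> 1, two move it back 1 -> 0.
fn run_transfers(rounds: usize) -> (i64, i64) {
    let accounts = Arc::new([Mutex::new(100_i64), Mutex::new(100_i64)]);
    let mut handles = Vec::new();
    for t in 0..4 {
        let accounts = Arc::clone(&accounts);
        handles.push(thread::spawn(move || {
            for _ in 0..rounds {
                if t % 2 == 0 {
                    transfer(&accounts, 0, 1, 1);
                } else {
                    transfer(&accounts, 1, 0, 1);
                }
            }
        }));
    }
    for handle in handles {
        handle.join().unwrap();
    }
    let a = *accounts[0].lock().unwrap();
    let b = *accounts[1].lock().unwrap();
    (a, b)
}

fn main() {
    println!("{:?}", run_transfers(10_000)); // (100, 100)
}
```

Because both directions lock account 0 before account 1, the program always finishes. If the 1 -> 0 transfers locked account 1 first, two threads could each hold one lock and wait on the other forever.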

Conclusion

Rust threads are a powerful and efficient way to implement concurrency in Rust programs. They offer memory safety, low overhead, and ease of use. However, there are also some limitations to be aware of, such as the lack of garbage collection and of automatic deadlock detection. Overall, Rust threads are a great choice for writing safe concurrent programs.


Thanks for reading to the end.

Good read: Rust: Multi threading | Emil Privér (priver.dev)
