Frequently used Thread APIs - Random notes

Thread Creation and Management:

  • thread::spawn: Creates a new thread and executes a closure within it. It returns a JoinHandle, which can be used to wait for the thread to finish and retrieve its result.

use std::thread;

fn main() {
    let handle = thread::spawn(|| {
        println!("Hello from another thread!");
    });

    println!("Hello from the main thread!");

    handle.join().unwrap();
}        

  • JoinHandle::join: Waits for the associated thread to finish and returns a std::thread::Result indicating whether it completed successfully. If the thread panicked, the Err variant contains the panic payload. Use this API when you need to wait for a thread to complete before continuing execution in the main thread or another thread.

use std::thread;

let handle = thread::spawn(|| {
    println!("Thread doing work...");
});

handle.join().unwrap(); // Waits for the thread to finish.

        

  • Transferring ownership of local variables to a thread and returning a result from it: ownership of numbers is transferred to the newly spawned thread because we use a move closure. Had we not used the move keyword, the closure would have captured numbers by reference, resulting in a compiler error, since the new thread might outlive that variable.

use std::thread;

fn main() {
    let numbers = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
    let handle = thread::spawn(move || {
        let len = numbers.len();
        let sum = numbers.iter().sum::<usize>();
        sum / len // (1)
    });

    let average = handle.join().unwrap(); // (2)
    println!("From main thread average: {average}");
}
//From main thread average: 5        

Here, the value returned by the thread’s closure (1) is sent back to the main thread through the join method (2).

  • thread::scope: Spawns threads that cannot outlive the scope of the closure we pass to that function, making it possible to safely borrow local variables. It guarantees that all threads spawned in the scope are joined before the scope ends, giving a more structured way to manage multiple threads.

use std::thread;

fn main() {
    thread::scope(|scope| {
        for i in 0..5 {
            scope.spawn(move || {
                println!("Hello from thread {}", i);
            });
        }
    }); // Here scope ends, all threads that haven’t been joined yet are automatically joined.
}        

This pattern guarantees that none of the threads spawned in the scope can outlive the scope. Because of that, the scoped spawn method does not have a 'static bound on its argument type, allowing its closures to borrow anything that outlives the scope, such as local variables of the surrounding function.

In the example above each thread only uses its own copy of the loop counter, but scoped threads may also share a borrow of the same local variable: several threads can read it concurrently as long as none of them (nor the main thread) mutates it. If one thread took a mutable borrow, the compiler would refuse to let another thread use that variable at the same time.
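As a minimal sketch of that borrowing (the sum_in_parallel helper and the half-and-half split are illustrative, not a std API), scoped threads can borrow a local slice and return partial results:

```rust
use std::thread;

// Sum a slice by splitting it across two scoped threads.
fn sum_in_parallel(numbers: &[i32]) -> i32 {
    let (left, right) = numbers.split_at(numbers.len() / 2);
    thread::scope(|s| {
        // Both closures borrow `numbers` through `left` and `right`;
        // no `move` of ownership and no 'static bound is required.
        let a = s.spawn(|| left.iter().sum::<i32>());
        let b = s.spawn(|| right.iter().sum::<i32>());
        a.join().unwrap() + b.join().unwrap()
    }) // Any unjoined scoped threads would be joined here.
}

fn main() {
    let numbers = vec![1, 2, 3, 4];
    println!("sum: {}", sum_in_parallel(&numbers)); // sum: 10
}
```

Because thread::scope joins every spawned thread before it returns, the borrows of numbers are guaranteed not to outlive the data.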


  • thread::current: Returns a handle to the thread that calls it. The handle can be used to read thread-specific data such as the ID or name, or to unpark the thread.

use std::thread;

fn main() {
    let id = thread::current().id();
    println!("Thread ID: {:?}", id); // ThreadId implements Debug, not Display.
}        

  • thread::sleep: Pauses the current thread for a specified duration. It can be used to introduce delays or control the execution flow.

use std::thread;
use std::time::Duration;

fn main() {
    thread::sleep(Duration::from_secs(2));
    println!("Waited for 2 seconds");
}        

  • thread::yield_now: Yields the CPU to another thread, allowing other threads to run. It can be useful for load balancing or preventing a thread from monopolizing the CPU.

use std::thread;

fn main() {
    for _ in 0..1000 {
        thread::yield_now();
        // Do some work
    }
}        

  • thread::park: Blocks the current thread until another thread unparks it (use park_timeout for a bounded wait). It can be used for synchronization or waiting for events.
  • thread::unpark: Unparks a parked thread, allowing it to resume execution. It can be used in conjunction with thread::park for synchronization or signaling events.

use std::thread;

fn main() {
    let parked_thread = thread::spawn(|| {
        println!("Thread is parking...");
        thread::park();
        println!("Thread is unparked!");
    });

    // Sleep briefly, then unpark. Even if unpark ran before park,
    // the unpark "token" is buffered, so the thread would not block forever.
    thread::sleep(std::time::Duration::from_secs(1));
    parked_thread.thread().unpark();

    parked_thread.join().unwrap();
}        


  • thread::park_timeout: Parks the current thread for at most the specified duration. If the thread is unparked before the timeout, it resumes immediately. Use this API when you want to pause a thread but have it resume automatically after a set period, or earlier if it is explicitly unparked.

use std::thread;
use std::time::Duration;

let handle = thread::spawn(|| {
    println!("Thread parking for 2 seconds...");
    thread::park_timeout(Duration::from_secs(2));
    println!("Thread resumed after timeout or unparked.");
});

handle.thread().unpark(); // Optionally, this could unpark the thread before the timeout.
handle.join().unwrap();
        

  • thread::available_parallelism: Returns the number of threads that the system can run in parallel. This is typically the number of CPU cores. Use it to determine how many threads you can spawn efficiently, especially for CPU-bound tasks.

use std::thread;

if let Ok(parallelism) = thread::available_parallelism() {
    println!("Available parallelism: {}", parallelism.get());
}        

  • thread::JoinHandle: A handle that represents a thread spawned using thread::spawn. It can be used to join (wait for) the thread.

use std::thread;

let handle = thread::spawn(|| {
    println!("Doing some work...");
});

// Wait for the thread to finish
handle.join().unwrap();
        

  • thread::Builder: Used to configure and build a new thread, such as giving it a name or setting its stack size.

use std::thread;

let handle = thread::Builder::new()
    .name("worker_thread".to_string())
    .spawn(|| {
        println!("Named thread running");
    })
    .unwrap();

handle.join().unwrap();        

  • Thread name: thread::current().name() returns the current thread's name, if one was set. Use named threads (via thread::Builder) for easier debugging or tracking.

use std::thread;

let handle = thread::spawn(|| {
    let thread = thread::current();
    println!("Thread name: {:?}", thread.name());
});

handle.join().unwrap();        

  • thread::Builder::stack_size: Sets a custom stack size for a new thread. Useful when dealing with recursive functions or deep call stacks that require more memory than the default.

use std::thread;

let handle = thread::Builder::new()
    .stack_size(4 * 1024 * 1024) // 4 MB
    .spawn(|| {
        println!("Thread with custom stack size");
    })
    .unwrap();

handle.join().unwrap();        

Thread Communication:

  • mpsc::channel: Creates a channel for communication between threads. It consists of a sender and a receiver: the sender sends messages to the receiver, and the receiver receives them.

use std::thread;
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel::<i32>();

    thread::spawn(move || {
        tx.send(101).unwrap();
    });

    let received = rx.recv().unwrap();
    println!("Received: {}", received);
}        

  • sync::mpsc: The module providing these multi-producer, single-consumer channels. Besides the unbounded channel, it offers bounded channels (mpsc::sync_channel) and non-blocking try_send/try_recv operations.
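A sketch of the bounded variant and the non-blocking receive (the roundtrip helper is illustrative):

```rust
use std::sync::mpsc::{self, TryRecvError};
use std::thread;

// Send `n` values through a bounded channel of capacity 2 and collect them.
fn roundtrip(n: i32) -> Vec<i32> {
    let (tx, rx) = mpsc::sync_channel::<i32>(2);
    let producer = thread::spawn(move || {
        for i in 0..n {
            tx.send(i).unwrap(); // Blocks while the buffer already holds 2 items.
        }
        // `tx` is dropped here, which makes `recv` start returning Err.
    });
    let mut out = Vec::new();
    while let Ok(v) = rx.recv() {
        out.push(v);
    }
    producer.join().unwrap();
    out
}

fn main() {
    println!("{:?}", roundtrip(5)); // [0, 1, 2, 3, 4]

    // try_recv never blocks: on an empty channel it returns Err(Empty).
    let (_tx, rx) = mpsc::sync_channel::<i32>(1);
    assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
}
```

The bounded capacity gives backpressure: the producer cannot run arbitrarily far ahead of the consumer.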
  • sync::Mutex: Creates a mutual exclusion lock that protects shared data. Only one thread can hold the lock at a time, ensuring exclusive access to the data.

use std::sync::Mutex;
use std::sync::Arc;
use std::thread;
fn main() {
    let counter = Arc::new(Mutex::new(0));

    let handles: Vec<_> = (0..10).map(|_| {
        let data = Arc::clone(&counter);
        thread::spawn(move || {
            let mut num = data.lock().unwrap();
            *num += 1;
        })
    }).collect();

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Counter: {}", counter.lock().unwrap());
}        

  • sync::RwLock: Creates a reader-writer lock that allows multiple readers to access shared data simultaneously but only one writer at a time.

use std::sync::RwLock;

fn main() {
    let lock = RwLock::new(0);

    // Acquire a read lock
    {
        let reader = lock.read().unwrap();
        println!("Read data: {}", *reader);
        // The read lock is automatically released at the end of this block
    }

    // Acquire a write lock
    {
        let mut writer = lock.write().unwrap();
        *writer += 10;
        println!("Modified data: {}", *writer);
        // The write lock is automatically released at the end of this block
    }
}
/*
Read data: 0
Modified data: 10
*/        


sync::Condvar:

  • Creates a condition variable that can be used to signal threads to wake up when a certain condition is met. It often works in conjunction with a mutex or RwLock.
  • Condition variables are a more commonly used option for waiting for something to happen to data protected by a mutex. They have two basic operations: wait and notify. Threads can wait on a condition variable, after which they can be woken up when another thread notifies that same condition variable. Multiple threads can wait on the same condition variable, and notifications can either be sent to one waiting thread, or to all of them.
  • Condition variables in Rust have two notify functions: notify_one to wake up just one waiting thread (if any), and notify_all to wake them all up.

use std::collections::VecDeque;
use std::sync::Mutex;
use std::time::Duration;
use std::thread;
use std::sync::Condvar; // For condition variables

fn main() {
    let queue = Mutex::new(VecDeque::new());
    let not_empty = Condvar::new();
    
    thread::scope(|s| {
        s.spawn(|| {
            loop {
                let mut q = queue.lock().unwrap();
                let item = loop {
                    if let Some(item) = q.pop_front() {
                        break item;
                    } else {
                        q = not_empty.wait(q).unwrap();
                    }
                };
                drop(q);
                dbg!(item);
            }
        });
    
        for i in 0.. {
            queue.lock().unwrap().push_back(i);
            not_empty.notify_one();
            thread::sleep(Duration::from_secs(1));
        }
    });
}        

Thread Safety and Synchronization:

  • Atomic types: Provide atomic operations for primitive types, ensuring that operations on these types are performed in a thread-safe manner.
  • Cell and RefCell: Provide interior mutability, allowing mutable data to be stored within immutable structures. They can be used to avoid unnecessary copying or cloning of data.
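As a minimal sketch of the atomic approach (the count_to helper is illustrative), an AtomicUsize can serve as a lock-free shared counter:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

// Increment a shared counter from several threads without a lock.
fn count_to(threads: usize, per_thread: usize) -> usize {
    let counter = AtomicUsize::new(0);
    thread::scope(|s| {
        for _ in 0..threads {
            s.spawn(|| {
                for _ in 0..per_thread {
                    // fetch_add is a single atomic read-modify-write,
                    // so no increments are lost.
                    counter.fetch_add(1, Ordering::Relaxed);
                }
            });
        }
    }); // All scoped threads are joined here.
    counter.load(Ordering::Relaxed)
}

fn main() {
    println!("count: {}", count_to(4, 1000)); // count: 4000
}
```

Ordering::Relaxed is enough here because we only need the counter itself to be consistent, not to order any other memory operations around it.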


Barrier:

  • Creates a barrier that blocks multiple threads until all threads have reached the barrier. It can be used for synchronization or coordinating the execution of multiple threads.
  • A Barrier allows multiple threads to synchronize at a certain point. When a thread reaches the barrier, it will wait until a predefined number of threads have also reached the barrier. Once all threads have reached, they are all allowed to proceed.

use std::sync::{Arc, Barrier};
use std::thread;

fn main() {
    let barrier = Arc::new(Barrier::new(3));

    let handles: Vec<_> = (0..3).map(|i| {
        let c = Arc::clone(&barrier);
        thread::spawn(move || {
            println!("Thread {} waiting at the barrier...", i);
            c.wait();
            println!("Thread {} has crossed the barrier!", i);
        })
    }).collect();

    // Join the handles instead of sleeping, so completion is guaranteed.
    for handle in handles {
        handle.join().unwrap();
    }
}
/*
Thread 0 waiting at the barrier...
Thread 1 waiting at the barrier...
Thread 2 waiting at the barrier...
Thread 2 has crossed the barrier!
Thread 0 has crossed the barrier!
Thread 1 has crossed the barrier!
*/        

  • Semaphore: Rust's standard library doesn't have a semaphore, but we can use tokio::sync::Semaphore if we are working with async code, or build a custom one from Mutex and Condvar.


use tokio::sync::Semaphore;
use std::sync::Arc;
use tokio::task;

#[tokio::main]
async fn main() {
    let semaphore = Arc::new(Semaphore::new(3));

    for i in 0..5 {
        let permit = semaphore.clone().acquire_owned().await.unwrap();
        task::spawn(async move {
            println!("Task {} started", i);
            drop(permit); // Release permit when done
        });
    }
}
/*
Task 1 started
Task 2 started
Task 0 started
Task 3 started
*/        
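A custom blocking semaphore built from Mutex and Condvar might be sketched like this (the Semaphore type below is illustrative, not a std or tokio API):

```rust
use std::sync::{Condvar, Mutex};

// A counting semaphore: `acquire` blocks while no permits are left.
struct Semaphore {
    permits: Mutex<usize>,
    cvar: Condvar,
}

impl Semaphore {
    fn new(permits: usize) -> Self {
        Semaphore { permits: Mutex::new(permits), cvar: Condvar::new() }
    }

    fn acquire(&self) {
        let mut permits = self.permits.lock().unwrap();
        // Loop to guard against spurious wakeups.
        while *permits == 0 {
            permits = self.cvar.wait(permits).unwrap();
        }
        *permits -= 1;
    }

    fn release(&self) {
        *self.permits.lock().unwrap() += 1;
        self.cvar.notify_one();
    }
}

fn main() {
    use std::sync::Arc;
    use std::thread;

    let sem = Arc::new(Semaphore::new(2)); // At most 2 tasks run at once.
    let handles: Vec<_> = (0..5)
        .map(|i| {
            let sem = Arc::clone(&sem);
            thread::spawn(move || {
                sem.acquire();
                println!("task {i} running");
                sem.release();
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
}
```

The while loop around wait handles spurious wakeups, and release adds a permit back before waking one waiter.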

Thread Pooling and Task Management

scoped_threadpool:

Link : scoped_threadpool - Rust

The scoped_threadpool crate allows you to create a scoped thread pool whose threads can borrow data from the parent thread. It ensures that all jobs complete before the scope exits, preventing data races.

It can be used to execute a number of short-lived jobs in parallel without the need to respawn the underlying threads.

Jobs are runnable by borrowing the pool for a given scope, during which an arbitrary number of them can be executed. These jobs can access data of any lifetime outside of the pool's scope, which allows working on non-'static references in parallel.

Add the dependency with cargo add scoped_threadpool, which updates Cargo.toml:

[dependencies]
scoped_threadpool = "0.1.9"        

Code:

use scoped_threadpool::Pool;

fn main() {
    let mut pool = Pool::new(4);
    let mut data = vec![1, 2, 3, 4];

    // Use the threads as scoped threads that can
    // reference anything outside this closure.
    pool.scoped(|scope| {
        // Create references to each element in the vector ...
        for item in &mut data {
            scope.execute(move || {
                // ... and add 1 to it in a separate thread.
                *item += 1; // Safe because of scoping
            });
        }
    });

    println!("{:?}", data); // Outputs: [2, 3, 4, 5]
}

Thread-Local Storage

thread_local!:

  • A macro for creating thread-local variables, which are variables that have unique values for each thread. This can be useful for storing thread-specific data.
  • thread_local! allows you to create data that is local to each thread, and each thread will get its own separate instance of the data.

use std::cell::RefCell;
use std::thread;

thread_local! {
    static THREAD_LOCAL_DATA: RefCell<u32> = RefCell::new(1);
}

fn main() {
    THREAD_LOCAL_DATA.with(|data| {
        *data.borrow_mut() = 5;
    });

    thread::spawn(|| {
        THREAD_LOCAL_DATA.with(|data| {
            *data.borrow_mut() += 1; // This will not affect other threads
            println!("Thread local value: {}", *data.borrow());
        });
    }).join().unwrap();

    THREAD_LOCAL_DATA.with(|data| {
        println!("Main thread local value: {}", *data.borrow());
    });
}

/*
Thread local value: 2
Main thread local value: 5
*/        

Thread Affinity

  • Thread affinity means pinning a thread to a specific CPU core, which can help with performance optimization or debugging. Rust's standard library has no API for this.

Thread Priority

  • Thread priority influences scheduling: higher-priority threads are more likely to be scheduled before lower-priority ones. The standard library has no API for this either.

  • To set thread priority or CPU affinity you would typically use external crates like thread-priority, or make OS-specific calls (using libc for Unix or winapi for Windows).

Example (Setting Priority with thread-priority crate):

Add thread-priority to your Cargo.toml:

cargo add thread-priority

use thread_priority::{set_current_thread_priority, ThreadPriority};

fn main() {
    if set_current_thread_priority(ThreadPriority::Max).is_ok() {
        println!("Thread priority set to maximum.");
    } else {
        println!("Failed to set thread priority.");
    }
}        


Thread Profiling and Debugging

  • perf: A command-line tool that can be used to profile Rust applications and measure their performance.
  • gdb: A GNU debugger that can be used to debug Rust applications, including multi-threaded applications.

Miscellaneous

Panics in threads:

  • panic! terminates the current thread. A panic in a spawned thread does not bring down other threads; joining that thread returns an Err containing the panic payload.
  • You can catch a panic using std::panic::catch_unwind.

use std::panic;
use std::thread;

fn main() {
    let result = panic::catch_unwind(|| {
        thread::spawn(|| {
            panic!("This thread panicked!");
        }).join().unwrap();
    });

    match result {
        Ok(_) => println!("Thread completed successfully."),
        Err(_) => println!("Thread panicked."),
    }
}

/*
thread '<unnamed>' panicked at 'This thread panicked!', /tmp/6ybPZgtgpg/main.rs:7:13
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: Any { .. }', /tmp/6ybPZgtgpg/main.rs:8:19
Thread panicked.
*/        

Note: The availability and functionality of some of these APIs may depend on the specific operating system and Rust version we are using.


Reference : Rust Atomics and Locks — Chapter 1. Basics of Rust Concurrency
