Atomics in Rust
Amit Nadiger
Atomics in Rust are fundamental building blocks for achieving safe concurrent programming. They enable multiple threads to share and modify data without causing data races. Atomics are particularly important in scenarios where performance is critical, and using locking mechanisms (like Mutex or RwLock) could lead to undesirable contention and overhead.
In computer science, atomic describes an operation that is indivisible: it has either fully completed, or it has not happened at all.
What Are Atomics?
Atomics are low-level data types that provide lock-free, thread-safe operations. They allow threads to perform read-modify-write operations in a single, uninterruptible step, known as an atomic operation. Unlike traditional locks, which can cause a thread to sleep while waiting, atomics enable non-blocking behavior, allowing multiple threads to work concurrently without needing to coordinate through a locking mechanism.
Atomic operations are the main building block for anything involving multiple threads. All higher-level concurrency primitives, such as mutexes and condition variables, are implemented in terms of atomic operations.
Atomic Types Overview
Rust provides several atomic types in the std::sync::atomic module: AtomicBool, AtomicUsize, AtomicIsize, AtomicU8 through AtomicU64, AtomicI8 through AtomicI64, and AtomicPtr<T>. (Availability of the wider integer types depends on the target platform.)
Common Atomic Operations:
Each of the available atomic types has the same interface with methods for storing and loading, methods for atomic "fetch-and-modify" operations, and some more advanced "compare-and-exchange" methods.
The function signatures are as follows, using AtomicI32 as an example:
impl AtomicI32 {
pub fn fetch_add(&self, v: i32, ordering: Ordering) -> i32;
pub fn fetch_sub(&self, v: i32, ordering: Ordering) -> i32;
pub fn fetch_or(&self, v: i32, ordering: Ordering) -> i32;
pub fn fetch_and(&self, v: i32, ordering: Ordering) -> i32;
pub fn fetch_nand(&self, v: i32, ordering: Ordering) -> i32;
pub fn fetch_xor(&self, v: i32, ordering: Ordering) -> i32;
pub fn fetch_max(&self, v: i32, ordering: Ordering) -> i32;
pub fn fetch_min(&self, v: i32, ordering: Ordering) -> i32;
pub fn swap(&self, v: i32, ordering: Ordering) -> i32; // "fetch_store"
}
1. Fetch-and-Modify: fetch_add, fetch_sub, fetch_and, fetch_or, and fetch_xor atomically perform an arithmetic or bitwise operation, update the value, and return the previous value.
Example:
use std::sync::atomic::{AtomicUsize, Ordering};
fn main() {
let atomic_counter = AtomicUsize::new(1);
// Add 2 atomically
let old_val = atomic_counter.fetch_add(2, Ordering::SeqCst);
println!("Old Value: {}, New Value: {}", old_val, atomic_counter.load(Ordering::SeqCst));
}
2. Load and Store:
use std::sync::atomic::{AtomicUsize, Ordering};
fn main() {
let atomic_val = AtomicUsize::new(5);
// Load the current value
let current_val = atomic_val.load(Ordering::Relaxed);
println!("Current Value: {}", current_val);
// Store a new value
atomic_val.store(10, Ordering::Relaxed);
println!("Updated Value: {}", atomic_val.load(Ordering::Relaxed));
}
/*
Current Value: 5
Updated Value: 10
*/
3. Compare-and-Swap (CAS):
compare_exchange(current, new, success_order, failure_order): This is a more flexible version of CAS that allows specifying different memory orderings for the success and failure cases.
use std::sync::atomic::{AtomicUsize, Ordering};
fn main() {
let atomic_val = AtomicUsize::new(5);
// Attempt to change the value from 5 to 10
// let result = atomic_val.compare_and_swap(5, 10, Ordering::SeqCst); // compare_and_swap is deprecated
let result = atomic_val.compare_exchange(5, 10, Ordering::SeqCst, Ordering::SeqCst).unwrap();
println!("Old Value: {}", result);
println!("New Value: {}", atomic_val.load(Ordering::SeqCst));
}
/*
Old Value: 5
New Value: 10
*/
If the expected value does not match the actual value, compare_exchange performs no store and returns an Err containing the current value:
use std::sync::atomic::{AtomicUsize, Ordering};
fn main() {
let atomic_val = AtomicUsize::new(5);
// Attempt to change the value from 5 to 10
// let result = atomic_val.compare_and_swap(5, 10, Ordering::SeqCst); // compare_and_swap is deprecated
let result = atomic_val.compare_exchange(8, 10, Ordering::SeqCst, Ordering::SeqCst);
println!("Old Value: {:?}", result);
println!("New Value: {}", atomic_val.load(Ordering::SeqCst));
}
/*
Old Value: Err(5)
New Value: 5
*/
Note: compare_and_swap is deprecated; the preferred replacement is compare_exchange (compare-and-exchange).
4. Swap:
swap(new, Ordering): Atomically replaces the current value with the new value and returns the old one.
use std::sync::atomic::{AtomicUsize, Ordering};
fn main() {
// Create an AtomicUsize with an initial value of 5.
let atomic_value = AtomicUsize::new(5);
// Swap the value to 10 and retrieve the old value.
let old_value = atomic_value.swap(10, Ordering::SeqCst);
println!("Old value: {}", old_value);
println!("New value: {}", atomic_value.load(Ordering::SeqCst));
}
/*
Old value: 5
New value: 10
*/
Swapping across multiple threads:
use std::sync::{Arc, atomic::{AtomicUsize, Ordering}};
use std::thread;
fn main() {
// Use Arc to enable sharing of the AtomicUsize between threads.
let shared_value = Arc::new(AtomicUsize::new(100));
let handles: Vec<_> = (0..5).map(|i| {
let shared_value = Arc::clone(&shared_value);
thread::spawn(move || {
// Each thread will swap a new value into the atomic variable.
let old = shared_value.swap(i, Ordering::SeqCst);
println!("Thread {} swapped out old value: {}", i, old);
})
}).collect();
for handle in handles {
handle.join().unwrap();
}
println!("Final value: {}", shared_value.load(Ordering::SeqCst));
}
/* One possible output; the interleaving of threads may vary:
Thread 0 swapped out old value: 100
Thread 1 swapped out old value: 0
Thread 2 swapped out old value: 1
Thread 3 swapped out old value: 2
Thread 4 swapped out old value: 3
Final value: 4
*/
In the above code, Vec<_> is shorthand that lets the compiler infer the element type of the vector, rather than specifying it explicitly.
Ex:
let handles: Vec<_> = vec![];
The underscore (_) represents an unspecified type, so Rust will infer it from context. Vec<_> essentially means "a Vec of some element type, to be determined later." If you later add elements to handles (e.g., handles.push(some_value)), the compiler deduces the vector's type from the type of some_value.
Memory Ordering
One of the key concepts in atomic operations is memory ordering, which determines how atomic operations performed by different threads are observed relative to one another. Rust's atomic methods take a memory ordering argument:
Relaxed: guarantees atomicity only; imposes no ordering on other memory operations.
Acquire: for loads; reads and writes after it cannot be reordered before it.
Release: for stores; reads and writes before it cannot be reordered after it.
AcqRel: combines Acquire and Release, for read-modify-write operations.
SeqCst: the strongest ordering; all SeqCst operations form a single global order.
Example: Basic Atomic Counter
The following example demonstrates how to use AtomicUsize to create a basic atomic counter.
use std::sync::{Arc, atomic::{AtomicUsize, Ordering}};
use std::thread;
fn main() {
// Create an AtomicUsize counter wrapped in an Arc
let counter = Arc::new(AtomicUsize::new(0));
let mut handles = vec![];
// Spawn 10 threads, each incrementing the counter 10 times
for _ in 0..10 {
let counter_clone = Arc::clone(&counter);
let handle = thread::spawn(move || {
for _ in 0..10 {
// Print the current value, then increment the counter atomically
println!(" {}", counter_clone.load(Ordering::SeqCst));
counter_clone.fetch_add(1, Ordering::SeqCst);
}
});
handles.push(handle);
}
// Wait for all threads to finish
for handle in handles {
handle.join().unwrap();
}
// Print the final counter value
println!("Final counter value: {}", counter.load(Ordering::SeqCst));
}
/* One possible output; prints from different threads may interleave:
0
1
2
::::
97
98
99
Final counter value: 100
*/
Atomic vs Mutex/RefCell/Cell
Cell and RefCell provide interior mutability but are not Sync, so they cannot be shared between threads at all. Mutex and RwLock are thread-safe but may block a thread while another holds the lock. Atomics are both thread-safe and lock-free: each operation completes without blocking, which makes them cheap under contention but limits them to single primitive values.
Use Cases
The example below processes 100 items one by one on a background thread, while the main thread gives the user regular updates on the progress:
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use std::time::Duration;
fn process_item(item: usize) {
// Simulate processing time
println!("Processing item {item}");
thread::sleep(Duration::from_millis(100));
}
fn main() {
let num_done = AtomicUsize::new(0);
thread::scope(|s| {
// A background thread to process all 100 items.
s.spawn(|| {
for i in 0..100 {
process_item(i); // Assuming this takes some time.
num_done.store(i + 1, Ordering::Relaxed);
}
});
// The main thread shows status updates, every second.
loop {
let n = num_done.load(Ordering::Relaxed);
if n == 100 { break; }
println!("Working.. {n}/100 done");
thread::sleep(Duration::from_secs(1));
}
});
println!("Done!");
}
/*
Working.. 0/100 done
Processing item 0
Processing item 1
Processing item 2
Processing item 3
Processing item 4
Working.. 4/100 done
Processing item 5
Processing item 6
Processing item 7
Processing item 8
Processing item 9
Working.. 9/100 done
::::
::::
Processing item 98
Processing item 99
Working.. 99/100 done
Done!
*/
Every time the background thread finishes processing an item, it stores the number of processed items in an AtomicUsize. Meanwhile, the main thread shows that number to the user, about once per second. Once the main thread sees that all 100 items have been processed, it exits the scope, which implicitly joins the background thread, and informs the user that everything is done.
There is a problem with the above design:
Once the last item is processed, it might take up to a full second (the sleep interval) for the main thread to find out, introducing an unnecessary delay at the end. To solve this, we can use thread parking to wake the main thread from its sleep whenever there is new information it might be interested in.
Improved example, where the background thread unparks the main thread after every item:
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use std::time::Duration;
fn process_item(item: usize) {
// Simulate processing time
println!("Processing item {item}");
thread::sleep(Duration::from_millis(100));
}
fn main() {
let num_done = AtomicUsize::new(0);
let main_thread = thread::current();
thread::scope(|s| {
// A background thread to process all 100 items.
s.spawn(|| {
for i in 0..100 {
process_item(i); // Assuming this takes some time.
num_done.store(i + 1, Ordering::Relaxed);
main_thread.unpark(); // Wake up the main thread.
}
});
// The main thread shows status updates.
loop {
let n = num_done.load(Ordering::Relaxed);
if n == 100 { break; }
println!("Working.. {n}/100 done");
thread::park_timeout(Duration::from_secs(1));
}
});
println!("Done!");
}
/*
Working.. 0/100 done
Processing item 0
Processing item 1
Working.. 1/100 done
Processing item 2
Working.. 2/100 done
Processing item 3
Working.. 3/100 done
Processing item 4
::::
::::
Processing item 98
Working.. 98/100 done
Processing item 99
Working.. 99/100 done
Done!
*/
Practical Use Cases for Atomics
Example: Spinlock Using Atomics
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::Duration;
struct Spinlock {
locked: AtomicBool,
}
impl Spinlock {
fn new() -> Self {
Spinlock {
locked: AtomicBool::new(false),
}
}
fn lock(&self) {
// Attempt to acquire the lock by setting `locked` to `true`.
// If `locked` was `false`, it will change to `true` and proceed.
// If `locked` was `true`, it will keep retrying until it can acquire the lock.
while self.locked.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() {}
}
fn unlock(&self) {
// Release the lock by setting `locked` back to `false`.
self.locked.store(false, Ordering::Release);
}
}
fn main() {
let spinlock = Spinlock::new();
let handle = thread::spawn(move || {
spinlock.lock();
println!("Locked by thread");
thread::sleep(Duration::from_secs(2));
spinlock.unlock();
println!("Unlocked by thread");
});
handle.join().unwrap();
}
/*
Locked by thread
Unlocked by thread
*/