Mastering Multithreading in Java: Part 10 – Understanding Concurrent Collections
In the world of multithreading, ensuring safe and efficient access to shared resources is critical. While we’ve previously discussed mechanisms like locks, synchronization, and condition variables to manage thread access to shared data, Java provides an even higher-level solution: concurrent collections. These collections are specifically designed to handle concurrent access, allowing multiple threads to work on shared data without complex synchronization logic.
In this article, we will explore the power of concurrent collections, how they differ from regular collections, and how they can simplify multithreaded applications. We’ll cover key classes such as ConcurrentHashMap, CopyOnWriteArrayList, BlockingQueue, and others, with practical examples to demonstrate how they work.
The Problem with Regular Collections in a Concurrent Environment
Before we dive into concurrent collections, let’s revisit why regular collections fall short in multithreading scenarios. Java’s standard collections, like ArrayList, HashMap, and LinkedList, are not thread-safe. If multiple threads access these collections without proper synchronization, it can lead to issues like:
To address these challenges, Java provides concurrent collections that are designed to manage safe access to shared data across multiple threads without the risk of corruption or exceptions.
What Are Concurrent Collections?
Concurrent collections are part of the java.util.concurrent package and offer thread-safe operations for common data structures. Unlike regular collections, concurrent collections allow multiple threads to read and modify them concurrently without the need for explicit synchronization. This is done by applying efficient internal mechanisms like lock stripping or copy-on-write strategies, which ensure optimal performance even in multithreaded environments.
Let’s explore some of the most widely used concurrent collections in Java.
ConcurrentHashMap
One of the most well-known concurrent collections, ConcurrentHashMap, allows thread-safe operations on a hash map without locking the entire map for every operation.
How It Works:
Example:
public class ConcurrentHashMapExample {
public static void main(String[] args) {
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
map.put("A", 1);
map.put("B", 2);
System.out.println("Value for A: " + map.get("A"));
}
}
Use Cases:
CopyOnWriteArrayList
CopyOnWriteArrayList is a thread-safe variant of ArrayList, designed for scenarios where reads greatly outnumber writes.
How It Works:
Example:
public class CopyOnWriteArrayListExample {
public static void main(String[] args) {
List<String> list = new CopyOnWriteArrayList<>();
list.add("Apple");
list.add("Banana")
for (String fruit : list) {
System.out.println(fruit);
list.add("Orange");
}
System.out.println(list);
}
}
Use Cases:
BlockingQueue
BlockingQueue is an interface that provides thread-safe, blocking operations for inserting and retrieving elements. This is particularly useful for producer-consumer patterns.
How It Works:
Example (Producer-Consumer):
public class ProducerConsumerExample {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
// Producer thread
new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
queue.put(i);
System.out.println("Produced: " + i);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
// Consumer thread
new Thread(() -> {
try {
while (true) {
int value = queue.take();
System.out.println("Consumed: " + value);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
}
Use Cases:
ConcurrentLinkedQueue
ConcurrentLinkedQueue is an unbounded thread-safe queue based on a linked node structure, offering non-blocking operations for both insertion and retrieval.
How It Works:
Example:
public class ConcurrentLinkedQueueExample {
public static void main(String[] args) {
Queue<String> queue = new ConcurrentLinkedQueue<>();
queue.add("Task1");
queue.add("Task2");
System.out.println(queue.poll());
System.out.println(queue.poll());
}
}
Use Cases:
ConcurrentSkipListMap
ConcurrentSkipListMap is a thread-safe, sorted map implementation that guarantees log(n) time complexity for most operations. It is part of the java.util.concurrent package and offers a non-blocking way to maintain a sorted map.
How It Works:
Example:
public class ConcurrentSkipListMapExample {
public static void main(String[] args) {
ConcurrentSkipListMap<Integer, String> skipListMap = new ConcurrentSkipListMap<>();
skipListMap.put(3, "Three");
skipListMap.put(1, "One");
skipListMap.put(2, "Two");
// Concurrent access
System.out.println(skipListMap);
}
}
Use Cases:
领英推荐
ConcurrentSkipListSet
ConcurrentSkipListSet is a thread-safe, sorted set based on a skip list. It offers log(n) time complexity for common operations, like searching, adding, and removing elements, and ensures thread safety without external synchronization.
How It Works:
Example:
public class ConcurrentSkipListSetExample {
public static void main(String[] args) {
ConcurrentSkipListSet<Integer> skipListSet = new ConcurrentSkipListSet<>();
skipListSet.add(3);
skipListSet.add(1);
skipListSet.add(2);
for (Integer value : skipListSet) {
System.out.println(value);
}
}
}
Use Cases:
LinkedBlockingQueue
LinkedBlockingQueue is a variant of the BlockingQueue that uses a linked node structure internally. It supports both bounded and unbounded queues and provides blocking behavior for both producers and consumers.
How It Works:
Example:
public class LinkedBlockingQueueExample {
public static void main(String[] args) throws InterruptedException {
LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(2);
// Producer thread
new Thread(() -> {
try {
queue.put(1);
queue.put(2);
System.out.println("Produced: " + 1 + " and " + 2);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
// Consumer thread
new Thread(() -> {
try {
System.out.println("Consumed: " + queue.take());
System.out.println("Consumed: " + queue.take());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
}
Use Cases:
SynchronousQueue
SynchronousQueue is a special type of blocking queue where each insert operation must wait for a corresponding remove operation by another thread, and vice versa. It does not store elements but instead directly transfers them between producer and consumer threads.
How It Works:
Example:
public class SynchronousQueueExample {
public static void main(String[] args) {
SynchronousQueue<Integer> queue = new SynchronousQueue<>();
// Producer thread
new Thread(() -> {
try {
queue.put(1);
System.out.println("Produced: " + 1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
// Consumer thread
new Thread(() -> {
try {
System.out.println("Consumed: " + queue.take());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
}
Use Cases:
DelayQueue
DelayQueue is a specialized implementation of BlockingQueue where elements can only be retrieved after a given delay has expired. The elements must implement the Delayed interface, which defines the delay logic.
How It Works:
Example:
public class DelayQueueExample {
static class DelayedElement implements Delayed {
private final long delayTime;
private final long creationTime;
public DelayedElement(long delayTime) {
this.delayTime = delayTime;
this.creationTime = System.currentTimeMillis();
}
@Override
public long getDelay(TimeUnit unit) {
long diff = creationTime + delayTime - System.currentTimeMillis();
return unit.convert(diff, TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
}
@Override
public String toString() {
return "DelayedElement{" + "delayTime=" + delayTime + '}';
}
}
public static void main(String[] args) throws InterruptedException {
DelayQueue<DelayedElement> queue = new DelayQueue<>();
queue.put(new DelayedElement(5000));
queue.put(new DelayedElement(2000));
while (!queue.isEmpty()) {
DelayedElement element = queue.take();
System.out.println("Consumed: " + element);
}
}
}
Use Cases:
PriorityBlockingQueue
PriorityBlockingQueue is a thread-safe, unbounded blocking queue that orders its elements based on their natural ordering or a Comparator provided at queue creation. It extends BlockingQueue, but unlike other blocking queues, it does not enforce any specific capacity limits.
How It Works:
Example:
public class PriorityBlockingQueueExample {
public static void main(String[] args) throws InterruptedException {
PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
queue.put(5);
queue.put(1);
queue.put(3);
while (!queue.isEmpty()) {
System.out.println("Consumed: " + queue.take());
}
}
}
Use Cases:
Choosing the Right Concurrent Collection
When deciding which concurrent collection to use, consider the following:
Conclusion
Java’s concurrent collections significantly simplify multithreaded programming by abstracting the complex synchronization required to manage shared resources. Whether you’re working with maps, lists, or queues, these thread-safe collections allow multiple threads to interact efficiently without the risk of data corruption, exceptions, or performance bottlenecks.
Previously Covered Topics in This Series: