Concurrent Collections

Java's standard collections — HashMap, ArrayList, and LinkedList — are not thread-safe. When multiple threads read and write them simultaneously without external synchronisation you get corrupted state, infinite loops, or silent data loss. The java.util.concurrent package ships a set of purpose-built collections that handle concurrency internally, without requiring callers to hold locks. This lesson covers the three you will reach for most often: ConcurrentHashMap, CopyOnWriteArrayList, and the BlockingQueue family.

ConcurrentHashMap

ConcurrentHashMap is a thread-safe hash map designed for high concurrency. Before Java 8 it divided the underlying table into independently locked segments. Since Java 8 it works per bucket: inserting into an empty bucket uses a CAS operation, and any other update locks only the first node of the affected bucket. Retrieval operations such as get generally do not block. Threads updating keys that fall into different buckets therefore proceed in parallel, while updates to keys in the same bucket are serialised.

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicInteger;

    public class WordCounter {
        private final ConcurrentHashMap<String, AtomicInteger> counts =
                new ConcurrentHashMap<>();

        // Safe to call from any thread simultaneously
        public void record(String word) {
            counts.computeIfAbsent(word, k -> new AtomicInteger())
                  .incrementAndGet();
        }

        public int count(String word) {
            AtomicInteger val = counts.get(word);
            return val == null ? 0 : val.get();
        }
    }

The key design point above is computeIfAbsent. This is an atomic operation: the map inserts the new AtomicInteger only if the key is absent, and it does so without any external lock. Calling get followed by put separately would create a race condition — use the atomic compound operations (putIfAbsent, computeIfAbsent, compute, merge) instead.

Common pitfall — compound check-then-act: Code such as if (!map.containsKey(k)) map.put(k, v) is NOT thread-safe even on a ConcurrentHashMap. Another thread can insert the key between the check and the put. Always use putIfAbsent or computeIfAbsent for atomic conditional inserts.
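To make the contrast with check-then-act concrete, here is a minimal sketch of counting with merge, one of the atomic compound operations listed above. The class and method names (MergeCounterDemo, countConcurrently) are hypothetical and exist only for this illustration; the thread and iteration counts are arbitrary.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; names are illustrative, not part of the lesson's code.
class MergeCounterDemo {

    // Starts `threads` workers that each record `word` `perThread` times,
    // waits for them to finish, and returns the final count.
    static int countConcurrently(String word, int threads, int perThread) {
        ConcurrentHashMap<String, Integer> counts = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < perThread; i++) {
                    // Atomic read-modify-write: stores 1 if the key is absent,
                    // otherwise replaces the value with (old value + 1).
                    counts.merge(word, 1, Integer::sum);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counts.get(word);
    }

    public static void main(String[] args) {
        System.out.println(countConcurrently("hello", 4, 10_000)); // 40000
    }
}
```

Because each merge call is applied atomically for its key, no increments are lost. Replacing it with a separate get followed by put would typically produce a total below 40000 under contention.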

ConcurrentHashMap also exposes aggregation methods that traverse the map with configurable parallelism:

    ConcurrentHashMap<String, Integer> scores = new ConcurrentHashMap<>();
    scores.put("Alice", 95);
    scores.put("Bob", 82);
    scores.put("Carol", 91);

    // Parallel reduce: a parallelism threshold of 1 requests maximal parallelism
    int total = scores.reduceValues(1, Integer::sum);
    System.out.println("Total: " + total); // 268

Size accuracy: while other threads are modifying the map, both size() and mappingCount() return estimates; the value is exact only when no updates are in progress. Prefer mappingCount(), which returns a long and therefore still works if the map grows past Integer.MAX_VALUE mappings. If you need a precise count, maintain it yourself, for example in an external AtomicLong counter.
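As a small illustration of the size note, the sketch below fills a map from several threads and reads the count only after every writer has finished, at which point the reported value is exact. MapSizeDemo and fill are hypothetical names chosen for this example.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo class; names are illustrative only.
class MapSizeDemo {

    // Fills a map from `threads` writer threads, each inserting its own
    // disjoint range of keys, and returns the map once all writers are done.
    static ConcurrentHashMap<Integer, Integer> fill(int threads, int keysPerThread) {
        ConcurrentHashMap<Integer, Integer> map = new ConcurrentHashMap<>();
        Thread[] writers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            final int base = t * keysPerThread; // disjoint key range per thread
            writers[t] = new Thread(() -> {
                for (int i = 0; i < keysPerThread; i++) {
                    map.put(base + i, i);
                }
            });
            writers[t].start();
        }
        for (Thread w : writers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return map;
    }

    public static void main(String[] args) {
        ConcurrentHashMap<Integer, Integer> map = fill(4, 1_000);
        long mappings = map.mappingCount(); // long-valued count
        int size = map.size();              // int-valued count
        System.out.println(mappings + " " + size); // 4000 4000
    }
}
```

Calling either method while the writers are still running would return some value between 0 and 4000, which is why neither should drive correctness-critical logic during concurrent updates.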

CopyOnWriteArrayList

CopyOnWriteArrayList takes the opposite trade-off from ConcurrentHashMap: it is optimised for workloads where reads vastly outnumber writes. Every mutating operation (add, set, remove) creates a fresh copy of the entire backing array, performs the change on the copy, and then atomically replaces the reference. Readers never block and never see partial writes.

    import java.util.concurrent.CopyOnWriteArrayList;

    public class EventListeners {
        private final CopyOnWriteArrayList<Runnable> listeners =
                new CopyOnWriteArrayList<>();

        public void addListener(Runnable r) {
            listeners.add(r); // copies the array
        }

        public void removeListener(Runnable r) {
            listeners.remove(r); // copies the array
        }

        // Can be called from many threads simultaneously with zero blocking
        public void fireAll() {
            for (Runnable r : listeners) {
                // snapshot iteration: never throws ConcurrentModificationException
                r.run();
            }
        }
    }

The iterator returned by CopyOnWriteArrayList operates on the snapshot captured at the moment iterator() was called. If another thread adds an element after iteration started, the current iterator will not see it — and that is intentional and safe.
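The snapshot behaviour can be observed even on a single thread. The following sketch (SnapshotIterationDemo is a hypothetical name) creates an iterator, modifies the list, and then drains the iterator. It also shows that the snapshot iterator is read-only: its remove method throws UnsupportedOperationException.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical demo class; names are illustrative only.
class SnapshotIterationDemo {

    // Returns the elements seen by an iterator created before a later add.
    static List<String> iterateAcrossWrite() {
        CopyOnWriteArrayList<String> list =
                new CopyOnWriteArrayList<>(Arrays.asList("a", "b"));
        Iterator<String> it = list.iterator(); // snapshot of [a, b]
        list.add("c"); // swaps in a new backing array; the iterator keeps the old one
        List<String> seen = new ArrayList<>();
        while (it.hasNext()) {
            seen.add(it.next());
        }
        return seen;
    }

    // Returns true if removing through the snapshot iterator is rejected.
    static boolean iteratorRemoveIsUnsupported() {
        CopyOnWriteArrayList<String> list =
                new CopyOnWriteArrayList<>(Arrays.asList("a"));
        Iterator<String> it = list.iterator();
        it.next();
        try {
            it.remove();
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(iterateAcrossWrite());          // [a, b]
        System.out.println(iteratorRemoveIsUnsupported()); // true
    }
}
```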

When to use CopyOnWriteArrayList: event listener registries, plugin registries, and observer patterns, that is, any list that is modified rarely but iterated many times (e.g., on every incoming request). Avoid it when writes are frequent: the repeated full-array copy makes every write O(n) in both time and memory.

BlockingQueue

BlockingQueue is the foundation of producer-consumer patterns in Java. It extends Queue with operations that block the calling thread when the queue is empty (on take) or full (on put). This eliminates the need to write manual wait/notifyAll loops.

The interface offers four forms of each operation, which differ in what happens when the operation cannot be carried out immediately (the queue is full on insert, or empty on removal):

  • Throws exception: add, remove, element
  • Returns special value: offer, poll, peek
  • Blocks indefinitely: put, take
  • Times out: offer(e, timeout, unit), poll(timeout, unit)
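The differences between these forms are easiest to see on a queue that is already full or still empty. Below is a minimal sketch using an ArrayBlockingQueue of capacity 1; the class name QueueOperationModesDemo, its helper methods, and the 50 ms timeout are assumptions made for this illustration. The blocking forms put and take are left out because on a full or empty queue they would simply wait forever.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; names are illustrative only.
class QueueOperationModesDemo {

    // Helper: a queue of capacity 1 that is already full.
    private static BlockingQueue<String> fullQueue() {
        BlockingQueue<String> q = new ArrayBlockingQueue<>(1);
        q.add("first");
        return q;
    }

    // add: throws IllegalStateException when no space is available.
    static boolean addThrowsWhenFull() {
        try {
            fullQueue().add("second");
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    // offer: returns false instead of throwing.
    static boolean offerWhenFull() {
        return fullQueue().offer("second");
    }

    // Timed offer: waits up to the timeout, then gives up and returns false.
    static boolean timedOfferWhenFull() {
        try {
            return fullQueue().offer("second", 50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // poll: returns null when the queue is empty.
    static String pollWhenEmpty() {
        return new ArrayBlockingQueue<String>(1).poll();
    }

    // remove: throws NoSuchElementException when the queue is empty.
    static boolean removeThrowsWhenEmpty() {
        try {
            new ArrayBlockingQueue<String>(1).remove();
            return false;
        } catch (NoSuchElementException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(addThrowsWhenFull());     // true
        System.out.println(offerWhenFull());         // false
        System.out.println(timedOfferWhenFull());    // false
        System.out.println(pollWhenEmpty());         // null
        System.out.println(removeThrowsWhenEmpty()); // true
    }
}
```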

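The most common implementation is LinkedBlockingQueue, which is effectively unbounded (capacity Integer.MAX_VALUE) unless you pass a capacity to its constructor. For bounded work queues, use ArrayBlockingQueue: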

    import java.util.concurrent.*;

    public class ProducerConsumerDemo {
        public static void main(String[] args) throws InterruptedException {
            // Bounded queue: producer blocks when full, consumer blocks when empty
            BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

            // Producer thread
            Thread producer = new Thread(() -> {
                String[] items = {"task-1", "task-2", "task-3", "POISON"};
                for (String item : items) {
                    try {
                        queue.put(item); // blocks if queue is full
                        System.out.println("Produced: " + item);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            });

            // Consumer thread
            Thread consumer = new Thread(() -> {
                while (true) {
                    try {
                        String item = queue.take(); // blocks if queue is empty
                        if ("POISON".equals(item)) {
                            System.out.println("Consumer done.");
                            return;
                        }
                        System.out.println("Consumed: " + item);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            });

            producer.start();
            consumer.start();
            producer.join();
            consumer.join();
        }
    }

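The poison pill pattern above, in which the producer sends a sentinel value to signal completion, is a simple way to shut down a consumer loop without volatile flags or interrupts. With several consumers, the producer must enqueue one pill per consumer, because each consumer removes the pill it receives.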
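The single-consumer demo extends to several consumers as long as every consumer eventually receives its own sentinel. The sketch below is one way to arrange that, under the assumption of a FIFO queue so that all real tasks are dequeued before any pill; MultiConsumerShutdownDemo, run, and the queue capacity of 4 are hypothetical choices for this example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; names are illustrative only.
class MultiConsumerShutdownDemo {
    private static final String POISON = "POISON";

    // Starts `consumers` workers, feeds them `tasks` items followed by one
    // poison pill per worker, and returns how many real tasks were processed.
    static int run(int consumers, int tasks) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(4);
        AtomicInteger processed = new AtomicInteger();
        Thread[] workers = new Thread[consumers];
        for (int c = 0; c < consumers; c++) {
            workers[c] = new Thread(() -> {
                try {
                    while (true) {
                        String item = queue.take(); // blocks while the queue is empty
                        if (POISON.equals(item)) {
                            return; // this worker consumes exactly one pill, then stops
                        }
                        processed.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[c].start();
        }
        try {
            for (int i = 0; i < tasks; i++) {
                queue.put("task-" + i); // blocks while the queue is full
            }
            for (int c = 0; c < consumers; c++) {
                queue.put(POISON); // one pill per consumer
            }
            for (Thread w : workers) {
                w.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println(run(3, 20)); // 20
    }
}
```

Sending fewer pills than there are consumers would leave the remaining workers blocked in take forever, which is the most common mistake when adapting this pattern.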

Other notable BlockingQueue implementations:

  • PriorityBlockingQueue: unbounded; orders elements by natural order or a Comparator. The element that sorts first under that ordering is consumed first.
  • SynchronousQueue: zero capacity; every put blocks until a take is ready. Used by the cached thread pool factory internally to hand off tasks directly.
  • DelayQueue: elements become available only after a per-element delay expires. Useful for scheduled retries and TTL caches.
  • LinkedTransferQueue: combines features of SynchronousQueue and LinkedBlockingQueue; a producer can either enqueue an element or wait until a consumer receives it.

Choosing between BlockingQueue implementations: Use ArrayBlockingQueue when you need a hard back-pressure limit (bounded). Use LinkedBlockingQueue, constructed with a capacity, when you expect bursty producers. Use PriorityBlockingQueue when tasks have varying urgency, and limit the number of queued tasks some other way, because it is always unbounded. Avoid unbounded queues for work tasks in a real system: an unbounded queue absorbs back-pressure invisibly until the process runs out of memory.
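To illustrate the ordering behaviour of PriorityBlockingQueue described above, here is a minimal sketch. The Task class, its fields, and the convention that a lower number means higher urgency are assumptions made for this example, not part of the JDK.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical demo class; names are illustrative only.
class PriorityQueueDemo {

    // Hypothetical task type: a lower `priority` value means more urgent.
    static final class Task {
        final String name;
        final int priority;

        Task(String name, int priority) {
            this.name = name;
            this.priority = priority;
        }
    }

    // Enqueues three tasks out of order and returns their names in the
    // order the queue hands them out.
    static List<String> drainInPriorityOrder() {
        PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<>(
                11, Comparator.comparingInt((Task t) -> t.priority));
        queue.put(new Task("cleanup", 5)); // put never blocks: the queue is unbounded
        queue.put(new Task("alert", 1));
        queue.put(new Task("report", 3));

        List<String> order = new ArrayList<>();
        Task next;
        while ((next = queue.poll()) != null) { // poll returns null once empty
            order.add(next.name);
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(drainInPriorityOrder()); // [alert, report, cleanup]
    }
}
```

Note that the queue orders elements only as they are removed; iterating over it directly does not guarantee priority order.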

Summary

The concurrent collections remove the need for coarse-grained synchronized blocks around standard collections. ConcurrentHashMap is the default thread-safe map: use its atomic compound operations and never do your own check-then-act. CopyOnWriteArrayList shines for read-heavy listener lists but is expensive to mutate. BlockingQueue is the backbone of producer-consumer pipelines, providing built-in back-pressure and clean handoff semantics. Choosing the right concurrent collection is as important as choosing the right algorithm — each one encodes a specific concurrency contract and performance trade-off.