implement-your-own blocking queue in java
Categories:
Implementing Your Own Blocking Queue in Java

Learn how to build a custom blocking queue in Java using wait()
, notifyAll()
, and ReentrantLock
with Condition
for robust multithreaded communication.
In concurrent programming, a blocking queue is a crucial data structure that facilitates communication and synchronization between producer and consumer threads. Unlike a regular queue, a blocking queue will block a thread trying to add an element to a full queue or retrieve an element from an empty queue, until space becomes available or an element is present, respectively. Java's java.util.concurrent
package provides excellent implementations like ArrayBlockingQueue
and LinkedBlockingQueue
. However, understanding how to implement one from scratch offers deep insights into multithreading primitives like wait()
, notifyAll()
, synchronized
, and ReentrantLock
with Condition
.
Understanding the Core Concepts
A blocking queue needs to handle two primary scenarios:
- Producer attempts to add to a full queue: The producer thread must wait until space becomes available.
- Consumer attempts to take from an empty queue: The consumer thread must wait until an element is available.
To achieve this, we rely on Java's built-in synchronization mechanisms. The Object.wait()
and Object.notifyAll()
methods, used in conjunction with synchronized
blocks, provide a fundamental way to achieve inter-thread communication. Alternatively, the java.util.concurrent.locks.ReentrantLock
and java.util.concurrent.locks.Condition
interfaces offer more fine-grained control over locking and waiting.
flowchart TD Producer[Producer Thread] Consumer[Consumer Thread] Queue[Blocking Queue] Producer -- "add(item)" --> Queue Queue -- "take()" --> Consumer subgraph Producer Logic P1[Check if Queue is Full] P2{Is Full?} P3["Wait (on 'notFull' condition)"] P4[Add Item] P5["Notify (on 'notEmpty' condition)"] end subgraph Consumer Logic C1[Check if Queue is Empty] C2{Is Empty?} C3["Wait (on 'notEmpty' condition)"] C4[Take Item] C5["Notify (on 'notFull' condition)"] end Producer --> P1 P1 --> P2 P2 -- Yes --> P3 P3 --> P1 P2 -- No --> P4 P4 --> P5 P5 --> Producer Consumer --> C1 C1 --> C2 C2 -- Yes --> C3 C3 --> C1 C2 -- No --> C4 C4 --> C5 C5 --> Consumer
Flowchart of Producer-Consumer interaction with a Blocking Queue
Implementation with synchronized
, wait()
, and notifyAll()
This approach uses the intrinsic lock of an object and its associated wait()
and notifyAll()
methods. The synchronized
keyword ensures that only one thread can execute the critical section at a time, preventing race conditions. wait()
releases the lock and puts the thread into a waiting state, while notifyAll()
wakes up all waiting threads on that object.
import java.util.LinkedList;
import java.util.Queue;
public class SimpleBlockingQueue<T> {
private final Queue<T> queue;
private final int capacity;
public SimpleBlockingQueue(int capacity) {
this.queue = new LinkedList<>();
this.capacity = capacity;
}
public synchronized void put(T element) throws InterruptedException {
while (queue.size() == capacity) {
wait(); // Queue is full, wait for space
}
queue.add(element);
notifyAll(); // Element added, notify waiting consumers
}
public synchronized T take() throws InterruptedException {
while (queue.isEmpty()) {
wait(); // Queue is empty, wait for elements
}
T element = queue.remove();
notifyAll(); // Element removed, notify waiting producers
return element;
}
public synchronized int size() {
return queue.size();
}
public static void main(String[] args) {
SimpleBlockingQueue<Integer> queue = new SimpleBlockingQueue<>(5);
Runnable producer = () -> {
for (int i = 0; i < 10; i++) {
try {
queue.put(i);
System.out.println("Produced: " + i + ", Queue size: " + queue.size());
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
};
Runnable consumer = () -> {
for (int i = 0; i < 10; i++) {
try {
Integer element = queue.take();
System.out.println("Consumed: " + element + ", Queue size: " + queue.size());
Thread.sleep(200);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
};
new Thread(producer).start();
new Thread(consumer).start();
}
}
A basic blocking queue implementation using synchronized
, wait()
, and notifyAll()
.
while
loops instead of if
statements when calling wait()
. This protects against spurious wakeups and ensures the condition is truly met before proceeding, as a thread might wake up without being notified or the condition might have changed since notification.Implementation with ReentrantLock
and Condition
The java.util.concurrent.locks
package provides more advanced and flexible synchronization primitives. ReentrantLock
offers explicit locking and unlocking, which can be more robust than synchronized
blocks in complex scenarios. Condition
objects, obtained from a Lock
instance, allow threads to wait and signal on specific conditions, providing more control than Object.wait()
/notifyAll()
which operate on the intrinsic lock of an object.
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class AdvancedBlockingQueue<T> {
private final Queue<T> queue;
private final int capacity;
private final ReentrantLock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
public AdvancedBlockingQueue(int capacity) {
this.queue = new LinkedList<>();
this.capacity = capacity;
}
public void put(T element) throws InterruptedException {
lock.lock();
try {
while (queue.size() == capacity) {
notFull.await(); // Queue is full, wait for space
}
queue.add(element);
notEmpty.signalAll(); // Element added, notify waiting consumers
} finally {
lock.unlock();
}
}
public T take() throws InterruptedException {
lock.lock();
try {
while (queue.isEmpty()) {
notEmpty.await(); // Queue is empty, wait for elements
}
T element = queue.remove();
notFull.signalAll(); // Element removed, notify waiting producers
return element;
} finally {
lock.unlock();
}
}
public int size() {
lock.lock();
try {
return queue.size();
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
AdvancedBlockingQueue<Integer> queue = new AdvancedBlockingQueue<>(5);
Runnable producer = () -> {
for (int i = 0; i < 10; i++) {
try {
queue.put(i);
System.out.println("Produced: " + i + ", Queue size: " + queue.size());
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
};
Runnable consumer = () -> {
for (int i = 0; i < 10; i++) {
try {
Integer element = queue.take();
System.out.println("Consumed: " + element + ", Queue size: " + queue.size());
Thread.sleep(200);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
};
new Thread(producer).start();
new Thread(consumer).start();
}
}
An advanced blocking queue implementation using ReentrantLock
and Condition
objects.
Using ReentrantLock
and Condition
provides several advantages over synchronized
and wait()
/notifyAll()
:
- Fairness:
ReentrantLock
can be constructed with a fairness policy. - Multiple Conditions: You can have multiple
Condition
objects associated with a singleLock
, allowing threads to wait on different conditions independently. - Interruptible Waits:
await()
methods are interruptible, offering more control over thread interruption.
Choosing the Right Implementation
For most common scenarios, Java's built-in ArrayBlockingQueue
or LinkedBlockingQueue
from java.util.concurrent
are highly optimized and recommended. They handle edge cases, fairness, and performance considerations far better than a custom implementation. However, building your own blocking queue is an excellent educational exercise to solidify your understanding of concurrent programming primitives. If you find yourself needing highly specialized behavior not offered by existing classes, then a custom implementation might be considered, but always benchmark and thoroughly test it.