Implementing Your Own Blocking Queue in Java

Learn how to build a custom blocking queue in Java using wait(), notifyAll(), and ReentrantLock with Condition for robust multithreaded communication.
In concurrent programming, a blocking queue is a crucial data structure that facilitates communication and synchronization between producer and consumer threads. Unlike a regular queue, a blocking queue will block a thread trying to add an element to a full queue or retrieve an element from an empty queue, until space becomes available or an element is present, respectively. Java's java.util.concurrent package provides excellent implementations like ArrayBlockingQueue and LinkedBlockingQueue. However, understanding how to implement one from scratch offers deep insights into multithreading primitives like wait(), notifyAll(), synchronized, and ReentrantLock with Condition.
Understanding the Core Concepts
A blocking queue needs to handle two primary scenarios:
- Producer attempts to add to a full queue: The producer thread must wait until space becomes available.
- Consumer attempts to take from an empty queue: The consumer thread must wait until an element is available.
To achieve this, we rely on Java's built-in synchronization mechanisms. The Object.wait() and Object.notifyAll() methods, used in conjunction with synchronized blocks, provide a fundamental way to achieve inter-thread communication. Alternatively, the java.util.concurrent.locks.ReentrantLock class and the java.util.concurrent.locks.Condition interface offer more fine-grained control over locking and waiting.
flowchart TD
Producer[Producer Thread]
Consumer[Consumer Thread]
Queue[Blocking Queue]
Producer -- "put(item)" --> Queue
Queue -- "take()" --> Consumer
subgraph Producer Logic
P1[Check if Queue is Full]
P2{Is Full?}
P3["Wait (on 'notFull' condition)"]
P4[Add Item]
P5["Notify (on 'notEmpty' condition)"]
end
subgraph Consumer Logic
C1[Check if Queue is Empty]
C2{Is Empty?}
C3["Wait (on 'notEmpty' condition)"]
C4[Take Item]
C5["Notify (on 'notFull' condition)"]
end
Producer --> P1
P1 --> P2
P2 -- Yes --> P3
P3 --> P1
P2 -- No --> P4
P4 --> P5
P5 --> Producer
Consumer --> C1
C1 --> C2
C2 -- Yes --> C3
C3 --> C1
C2 -- No --> C4
C4 --> C5
C5 --> Consumer
Flowchart of Producer-Consumer interaction with a Blocking Queue
Implementation with synchronized, wait(), and notifyAll()
This approach uses the intrinsic lock of an object and its associated wait() and notifyAll() methods. The synchronized keyword ensures that only one thread can execute the critical section at a time, preventing race conditions. wait() releases the lock and puts the thread into a waiting state, while notifyAll() wakes up all waiting threads on that object.
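The handshake described above can be seen in isolation before looking at the full queue. The following is a minimal sketch, not part of the queue itself: the `Gate` class, its `opened` flag, and the `demo()` helper are hypothetical names introduced only for illustration. It shows that a second thread can enter a synchronized method while the first is parked in wait(), which is only possible because wait() releases the intrinsic lock.

```java
class Gate {
    private boolean opened = false; // guarded by this object's intrinsic lock

    // Blocks the caller until open() has been called.
    synchronized void awaitOpen() throws InterruptedException {
        while (!opened) {
            wait(); // releases the monitor while waiting, reacquires it before returning
        }
    }

    // Opens the gate and wakes every thread waiting on this object.
    synchronized void open() {
        opened = true;
        notifyAll();
    }

    // Hypothetical helper: returns true if the waiting thread was released.
    static boolean demo() {
        Gate gate = new Gate();
        Thread waiter = new Thread(() -> {
            try {
                gate.awaitOpen();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        try {
            Thread.sleep(50);   // best effort: give the waiter time to block
            gate.open();        // acquires the monitor that wait() released
            waiter.join(2000);  // bounded wait so a bug cannot hang the demo
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !waiter.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("waiter finished: " + demo()); // prints "waiter finished: true"
    }
}
```

If wait() held on to the lock, the call to open() would never get into its synchronized method and the waiter would never be released.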
import java.util.LinkedList;
import java.util.Queue;

public class SimpleBlockingQueue<T> {
    private final Queue<T> queue;
    private final int capacity;

    public SimpleBlockingQueue(int capacity) {
        if (capacity <= 0) {
            // With a capacity of zero, every put() would block forever
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.queue = new LinkedList<>();
        this.capacity = capacity;
    }

    public synchronized void put(T element) throws InterruptedException {
        while (queue.size() == capacity) {
            wait(); // Queue is full, wait for space
        }
        queue.add(element);
        notifyAll(); // Element added, notify waiting consumers
    }

    public synchronized T take() throws InterruptedException {
        while (queue.isEmpty()) {
            wait(); // Queue is empty, wait for elements
        }
        T element = queue.remove();
        notifyAll(); // Element removed, notify waiting producers
        return element;
    }

    public synchronized int size() {
        return queue.size();
    }

    public static void main(String[] args) {
        SimpleBlockingQueue<Integer> queue = new SimpleBlockingQueue<>(5);

        Runnable producer = () -> {
            for (int i = 0; i < 10; i++) {
                try {
                    queue.put(i);
                    // size() is a separate call, so the printed value may already be stale
                    System.out.println("Produced: " + i + ", Queue size: " + queue.size());
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // Restore the interrupt flag
                    return; // Stop producing instead of looping while interrupted
                }
            }
        };

        Runnable consumer = () -> {
            for (int i = 0; i < 10; i++) {
                try {
                    Integer element = queue.take();
                    System.out.println("Consumed: " + element + ", Queue size: " + queue.size());
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // Restore the interrupt flag
                    return; // Stop consuming instead of looping while interrupted
                }
            }
        };

        new Thread(producer).start();
        new Thread(consumer).start();
    }
}
A basic blocking queue implementation using synchronized, wait(), and notifyAll().
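The wait() calls in the code above sit inside while loops, and the reason becomes visible once several consumers share one queue. The sketch below is an illustration under assumptions: `SharedSlot` and `run()` are hypothetical names, and the buffer is unbounded to keep the example short. Because notifyAll() wakes every waiting consumer, a consumer that loses the race for an item must re-check the condition and go back to waiting.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class SharedSlot {
    private final Queue<String> items = new ArrayDeque<>();

    synchronized void put(String item) {
        items.add(item);
        notifyAll(); // wakes every waiting consumer, not just one
    }

    synchronized String take() throws InterruptedException {
        // Re-check after every wakeup: another consumer may have taken the item first.
        // With "if" instead of "while", the losing consumer would call remove()
        // on an empty queue and fail with NoSuchElementException.
        while (items.isEmpty()) {
            wait();
        }
        return items.remove();
    }

    // Hypothetical helper: two consumers compete for two items.
    static Set<String> run() {
        SharedSlot slot = new SharedSlot();
        Set<String> received = ConcurrentHashMap.newKeySet();
        Runnable consumer = () -> {
            try {
                received.add(slot.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        Thread c1 = new Thread(consumer);
        Thread c2 = new Thread(consumer);
        c1.start();
        c2.start();
        slot.put("first");
        slot.put("second");
        try {
            c1.join();
            c2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println("items received: " + run().size()); // prints "items received: 2"
    }
}
```

Each consumer ends up with exactly one item regardless of how the threads are scheduled, which is the guarantee the loop provides.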
Always use while loops instead of if statements when calling wait(). This protects against spurious wakeups and ensures the condition is truly met before proceeding, as a thread might wake up without being notified or the condition might have changed since notification.
Implementation with ReentrantLock and Condition
The java.util.concurrent.locks package provides more advanced and flexible synchronization primitives. ReentrantLock offers explicit locking and unlocking, which is more flexible than synchronized blocks in complex scenarios, provided unlock() is always called in a finally block. Condition objects, obtained from a Lock instance, allow threads to wait and signal on specific conditions, whereas Object.wait() and notifyAll() share the single wait set of an object's intrinsic lock.
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class AdvancedBlockingQueue<T> {
    private final Queue<T> queue;
    private final int capacity;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();  // Producers wait here
    private final Condition notEmpty = lock.newCondition(); // Consumers wait here

    public AdvancedBlockingQueue(int capacity) {
        if (capacity <= 0) {
            // With a capacity of zero, every put() would block forever
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.queue = new LinkedList<>();
        this.capacity = capacity;
    }

    public void put(T element) throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() == capacity) {
                notFull.await(); // Queue is full, wait for space
            }
            queue.add(element);
            notEmpty.signalAll(); // Element added, notify waiting consumers
        } finally {
            lock.unlock(); // Always release the lock, even if await() throws
        }
    }

    public T take() throws InterruptedException {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                notEmpty.await(); // Queue is empty, wait for elements
            }
            T element = queue.remove();
            notFull.signalAll(); // Element removed, notify waiting producers
            return element;
        } finally {
            lock.unlock();
        }
    }

    public int size() {
        lock.lock();
        try {
            return queue.size();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        AdvancedBlockingQueue<Integer> queue = new AdvancedBlockingQueue<>(5);

        Runnable producer = () -> {
            for (int i = 0; i < 10; i++) {
                try {
                    queue.put(i);
                    // size() is a separate call, so the printed value may already be stale
                    System.out.println("Produced: " + i + ", Queue size: " + queue.size());
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // Restore the interrupt flag
                    return; // Stop producing instead of looping while interrupted
                }
            }
        };

        Runnable consumer = () -> {
            for (int i = 0; i < 10; i++) {
                try {
                    Integer element = queue.take();
                    System.out.println("Consumed: " + element + ", Queue size: " + queue.size());
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // Restore the interrupt flag
                    return; // Stop consuming instead of looping while interrupted
                }
            }
        };

        new Thread(producer).start();
        new Thread(consumer).start();
    }
}
An advanced blocking queue implementation using ReentrantLock and Condition objects.
Using ReentrantLock and Condition provides several advantages over synchronized and wait()/notifyAll():
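- Fairness: ReentrantLock can be constructed with a fairness policy (new ReentrantLock(true)), which favors the longest-waiting thread, usually at some cost in throughput.
- Multiple Conditions: You can have multiple Condition objects associated with a single Lock, allowing threads to wait on different conditions independently.
- Interruptible Waits: await() is interruptible, as Object.wait() is, and Condition additionally offers awaitNanos(), awaitUntil(), and awaitUninterruptibly(). Lock acquisition itself can also be interrupted or time-limited with lockInterruptibly() and tryLock(), which synchronized cannot do.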
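The points in this list can be combined in a timed variant of the queue. The following is a sketch under assumptions, not a replacement for AdvancedBlockingQueue: the class name `TimedBlockingQueue`, its `offer`/`poll` signatures (modeled loosely on the BlockingQueue interface), and the `demo()` helper are introduced here for illustration. It uses a fair lock, two separate conditions, interruptible lock acquisition, and Condition.awaitNanos(), which returns the remaining wait time so the loop can give up once the timeout is exhausted.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class TimedBlockingQueue<T> {
    private final Queue<T> queue = new ArrayDeque<>(); // note: ArrayDeque rejects null elements
    private final int capacity;
    private final ReentrantLock lock = new ReentrantLock(true); // true = fair ordering
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    TimedBlockingQueue(int capacity) {
        this.capacity = capacity;
    }

    // Returns false if no space became available within the timeout.
    boolean offer(T element, long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lockInterruptibly(); // an interrupt can cancel the wait for the lock itself
        try {
            while (queue.size() == capacity) {
                if (nanos <= 0L) {
                    return false; // timed out
                }
                nanos = notFull.awaitNanos(nanos); // returns the time still remaining
            }
            queue.add(element);
            notEmpty.signal(); // one new element, so waking one consumer is enough
            return true;
        } finally {
            lock.unlock();
        }
    }

    // Returns null if no element became available within the timeout.
    T poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lockInterruptibly();
        try {
            while (queue.isEmpty()) {
                if (nanos <= 0L) {
                    return null; // timed out
                }
                nanos = notEmpty.awaitNanos(nanos);
            }
            T element = queue.remove();
            notFull.signal(); // one free slot, so waking one producer is enough
            return element;
        } finally {
            lock.unlock();
        }
    }

    // Hypothetical single-threaded walkthrough of the four outcomes.
    static String demo() {
        TimedBlockingQueue<String> q = new TimedBlockingQueue<>(1);
        try {
            boolean first = q.offer("a", 50, TimeUnit.MILLISECONDS);  // space available
            boolean second = q.offer("b", 50, TimeUnit.MILLISECONDS); // full, times out
            String head = q.poll(50, TimeUnit.MILLISECONDS);          // returns "a"
            String none = q.poll(50, TimeUnit.MILLISECONDS);          // empty, times out
            return first + "," + second + "," + head + "," + none;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "true,false,a,null"
    }
}
```

Because producers and consumers wait on separate conditions, signal() can target exactly the side that benefits from the state change, something a single intrinsic wait set cannot express.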
Choosing the Right Implementation
For most common scenarios, Java's built-in ArrayBlockingQueue or LinkedBlockingQueue from java.util.concurrent are highly optimized and recommended. They handle edge cases, fairness, and performance considerations far better than a custom implementation. However, building your own blocking queue is an excellent educational exercise to solidify your understanding of concurrent programming primitives. If you find yourself needing highly specialized behavior not offered by existing classes, then a custom implementation might be considered, but always benchmark and thoroughly test it.
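For comparison, the same producer-consumer pattern with the built-in ArrayBlockingQueue might look like the sketch below. The class name `BuiltInQueueDemo` and the `transfer()` helper are hypothetical; the queue calls (put(), take(), and the capacity constructor) are the standard java.util.concurrent API.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BuiltInQueueDemo {
    // Sends the integers 0..count-1 through a bounded queue and returns their sum.
    static int transfer(int count) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5); // capacity of 5
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    queue.put(i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        int sum = 0;
        try {
            for (int i = 0; i < count; i++) {
                sum += queue.take(); // blocks while the queue is empty
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println("sum: " + transfer(10)); // prints "sum: 45"
    }
}
```

The calling code is the same shape as with the custom classes, so swapping a hand-written queue for the library one is usually a small change.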