implement-your-own blocking queue in java

Learn implement-your-own blocking queue in java with practical examples, diagrams, and best practices. Covers java, multithreading, synchronization development techniques with visual explanations.

Implementing Your Own Blocking Queue in Java

Hero image for implement-your-own blocking queue in java

Learn how to build a custom blocking queue in Java using wait(), notifyAll(), and ReentrantLock with Condition for robust multithreaded communication.

In concurrent programming, a blocking queue is a crucial data structure that facilitates communication and synchronization between producer and consumer threads. Unlike a regular queue, a blocking queue will block a thread trying to add an element to a full queue or retrieve an element from an empty queue, until space becomes available or an element is present, respectively. Java's java.util.concurrent package provides excellent implementations like ArrayBlockingQueue and LinkedBlockingQueue. However, understanding how to implement one from scratch offers deep insights into multithreading primitives like wait(), notifyAll(), synchronized, and ReentrantLock with Condition.

Understanding the Core Concepts

A blocking queue needs to handle two primary scenarios:

  1. Producer attempts to add to a full queue: The producer thread must wait until space becomes available.
  2. Consumer attempts to take from an empty queue: The consumer thread must wait until an element is available.

To achieve this, we rely on Java's built-in synchronization mechanisms. The Object.wait() and Object.notifyAll() methods, used in conjunction with synchronized blocks, provide a fundamental way to achieve inter-thread communication. Alternatively, the java.util.concurrent.locks.ReentrantLock and java.util.concurrent.locks.Condition interfaces offer more fine-grained control over locking and waiting.

flowchart TD
    Producer[Producer Thread]
    Consumer[Consumer Thread]
    Queue[Blocking Queue]

    Producer -- "add(item)" --> Queue
    Queue -- "take()" --> Consumer

    subgraph Producer Logic
        P1[Check if Queue is Full]
        P2{Is Full?}
        P3["Wait (on 'notFull' condition)"]
        P4[Add Item]
        P5["Notify (on 'notEmpty' condition)"]
    end

    subgraph Consumer Logic
        C1[Check if Queue is Empty]
        C2{Is Empty?}
        C3["Wait (on 'notEmpty' condition)"]
        C4[Take Item]
        C5["Notify (on 'notFull' condition)"]
    end

    Producer --> P1
    P1 --> P2
    P2 -- Yes --> P3
    P3 --> P1
    P2 -- No --> P4
    P4 --> P5
    P5 --> Producer

    Consumer --> C1
    C1 --> C2
    C2 -- Yes --> C3
    C3 --> C1
    C2 -- No --> C4
    C4 --> C5
    C5 --> Consumer

Flowchart of Producer-Consumer interaction with a Blocking Queue

Implementation with synchronized, wait(), and notifyAll()

This approach uses the intrinsic lock of an object and its associated wait() and notifyAll() methods. The synchronized keyword ensures that only one thread can execute the critical section at a time, preventing race conditions. wait() releases the lock and puts the thread into a waiting state, while notifyAll() wakes up all waiting threads on that object.

import java.util.LinkedList;
import java.util.Queue;

public class SimpleBlockingQueue<T> {
    private final Queue<T> queue;
    private final int capacity;

    public SimpleBlockingQueue(int capacity) {
        this.queue = new LinkedList<>();
        this.capacity = capacity;
    }

    public synchronized void put(T element) throws InterruptedException {
        while (queue.size() == capacity) {
            wait(); // Queue is full, wait for space
        }
        queue.add(element);
        notifyAll(); // Element added, notify waiting consumers
    }

    public synchronized T take() throws InterruptedException {
        while (queue.isEmpty()) {
            wait(); // Queue is empty, wait for elements
        }
        T element = queue.remove();
        notifyAll(); // Element removed, notify waiting producers
        return element;
    }

    public synchronized int size() {
        return queue.size();
    }

    public static void main(String[] args) {
        SimpleBlockingQueue<Integer> queue = new SimpleBlockingQueue<>(5);

        Runnable producer = () -> {
            for (int i = 0; i < 10; i++) {
                try {
                    queue.put(i);
                    System.out.println("Produced: " + i + ", Queue size: " + queue.size());
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        };

        Runnable consumer = () -> {
            for (int i = 0; i < 10; i++) {
                try {
                    Integer element = queue.take();
                    System.out.println("Consumed: " + element + ", Queue size: " + queue.size());
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        };

        new Thread(producer).start();
        new Thread(consumer).start();
    }
}

A basic blocking queue implementation using synchronized, wait(), and notifyAll().

Implementation with ReentrantLock and Condition

The java.util.concurrent.locks package provides more advanced and flexible synchronization primitives. ReentrantLock offers explicit locking and unlocking, which can be more robust than synchronized blocks in complex scenarios. Condition objects, obtained from a Lock instance, allow threads to wait and signal on specific conditions, providing more control than Object.wait()/notifyAll() which operate on the intrinsic lock of an object.

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class AdvancedBlockingQueue<T> {
    private final Queue<T> queue;
    private final int capacity;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    public AdvancedBlockingQueue(int capacity) {
        this.queue = new LinkedList<>();
        this.capacity = capacity;
    }

    public void put(T element) throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() == capacity) {
                notFull.await(); // Queue is full, wait for space
            }
            queue.add(element);
            notEmpty.signalAll(); // Element added, notify waiting consumers
        } finally {
            lock.unlock();
        }
    }

    public T take() throws InterruptedException {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                notEmpty.await(); // Queue is empty, wait for elements
            }
            T element = queue.remove();
            notFull.signalAll(); // Element removed, notify waiting producers
            return element;
        } finally {
            lock.unlock();
        }
    }

    public int size() {
        lock.lock();
        try {
            return queue.size();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        AdvancedBlockingQueue<Integer> queue = new AdvancedBlockingQueue<>(5);

        Runnable producer = () -> {
            for (int i = 0; i < 10; i++) {
                try {
                    queue.put(i);
                    System.out.println("Produced: " + i + ", Queue size: " + queue.size());
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        };

        Runnable consumer = () -> {
            for (int i = 0; i < 10; i++) {
                try {
                    Integer element = queue.take();
                    System.out.println("Consumed: " + element + ", Queue size: " + queue.size());
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        };

        new Thread(producer).start();
        new Thread(consumer).start();
    }
}

An advanced blocking queue implementation using ReentrantLock and Condition objects.

Choosing the Right Implementation

For most common scenarios, Java's built-in ArrayBlockingQueue or LinkedBlockingQueue from java.util.concurrent are highly optimized and recommended. They handle edge cases, fairness, and performance considerations far better than a custom implementation. However, building your own blocking queue is an excellent educational exercise to solidify your understanding of concurrent programming primitives. If you find yourself needing highly specialized behavior not offered by existing classes, then a custom implementation might be considered, but always benchmark and thoroughly test it.