Dead simple example of using Multiprocessing Queue, Pool and Locking


Dead Simple Multiprocessing: Queue, Pool, and Locking in Python


Explore the fundamentals of Python's multiprocessing module with practical examples of Queues for inter-process communication, Pools for parallel task execution, and Locks for resource synchronization.

Python's multiprocessing module is a powerful tool for leveraging multiple CPU cores, letting you run tasks in parallel and significantly speed up CPU-bound computations. However, managing shared resources and communication between processes can be tricky. This article provides a dead simple, yet comprehensive, introduction to three core components of multiprocessing: Queue, Pool, and Lock.

Understanding Multiprocessing Concepts

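Before diving into code, it's crucial to grasp why these tools are necessary. Unlike threads, which share their parent's memory, each process runs in its own separate memory space. That isolation is what lets processes sidestep CPython's Global Interpreter Lock (GIL), which allows only one thread at a time to execute Python bytecode within a single interpreter. It also prevents direct memory sharing, so processes need explicit mechanisms for communication and synchronization.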
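A minimal sketch of that isolation, assuming a hypothetical module-level `counter` used only for illustration: the child process changes its own copy, and the parent never sees the change.

```python
import multiprocessing

# Hypothetical module-level variable, used only to demonstrate isolation.
counter = 0

def bump():
    # Runs in the child process and changes only the child's copy of `counter`.
    global counter
    counter += 100
    print(f"Child sees counter = {counter}")

def run_demo():
    p = multiprocessing.Process(target=bump)
    p.start()
    p.join()
    # Back in the parent: its own copy of `counter` was never touched.
    return counter

if __name__ == "__main__":
    print(f"Parent sees counter = {run_demo()}")
```

Running it prints `Child sees counter = 100` followed by `Parent sees counter = 0`.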

  • Queue: A Queue provides a way for processes to safely exchange data. It's a first-in, first-out (FIFO) data structure that handles all the locking required to ensure data integrity when multiple processes are adding or removing items.
  • Pool: A Pool offers a convenient way to parallelize the execution of a function across multiple input values, distributing tasks among a fixed number of worker processes.
  • Lock: A Lock is a synchronization primitive used to protect shared resources from being accessed by multiple processes simultaneously, preventing race conditions and ensuring data consistency.
flowchart TD
    A[Main Process] --> B{Create Queue, Pool, Lock}
    B --> C["Spawn Worker Processes (Pool)"]
    C --> D[Workers get tasks from Queue]
    D --> E[Process data]
    E --> F{Access Shared Resource?}
    F -->|Yes| G[Acquire Lock]
    G --> H[Modify Shared Resource]
    H --> I[Release Lock]
    I --> J[Put results into Queue]
    F -->|No| J
    J --> D
    C --> K[Main Process collects results from Queue]
    K --> L[Terminate Pool]
    L --> M[End]

Conceptual flow of multiprocessing with Queue, Pool, and Lock
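The flow in the diagram can be sketched in a single script. This is one possible arrangement, not the only one: the helper names `init_worker`, `process_item`, and `run` are hypothetical. The Queue and Lock are handed to each pool worker through Pool's `initializer`/`initargs` parameters, because passing them as ordinary `pool.map()` arguments raises a RuntimeError.

```python
import multiprocessing

# Worker-side globals, filled in by init_worker() inside each pool process.
_results = None
_lock = None

def init_worker(results_queue, lock):
    # Runs once per worker when the Pool starts it. Queues and Locks cannot be
    # passed through pool.map() arguments, but they can be handed over here.
    global _results, _lock
    _results = results_queue
    _lock = lock

def process_item(n):
    value = n * n                    # "Process data"
    with _lock:                      # "Acquire Lock" ... "Release Lock"
        print(f"{multiprocessing.current_process().name} handled {n}")
    _results.put((n, value))         # "Put results into Queue"

def run(items, workers=3):
    results_queue = multiprocessing.Queue()
    lock = multiprocessing.Lock()
    with multiprocessing.Pool(workers, initializer=init_worker,
                              initargs=(results_queue, lock)) as pool:
        pool.map(process_item, items)
        # Collect one result per item while the workers are still alive,
        # so no queued data is lost when the pool is terminated on exit.
        collected = dict(results_queue.get() for _ in items)
    return collected

if __name__ == "__main__":
    print(run(range(1, 6)))
```

Collecting results inside the `with` block is deliberate: leaving the block calls `terminate()` on the pool, and a worker killed before its queue's background thread has flushed could lose data.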

Using multiprocessing.Queue for Inter-Process Communication

The Queue is your go-to for passing messages or data between processes. It is both thread-safe and process-safe, so you don't have to handle low-level synchronization yourself. One process puts items into the queue and another gets them out; each item is pickled on the way in and unpickled on the way out, so it must be a picklable object.
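The put/get contract itself can be tried within a single process. This sketch (the function name `queue_basics` is hypothetical) sends two picklable objects, reads them back in FIFO order, and shows that a `get()` with a `timeout` raises `queue.Empty`, from the standard `queue` module, when nothing arrives in time.

```python
import multiprocessing
import queue  # provides the queue.Empty exception raised by timed-out gets

def queue_basics():
    q = multiprocessing.Queue()
    q.put({"task": 1})          # any picklable object can be sent
    q.put("second")
    first = q.get()             # FIFO: the dict comes out first
    second = q.get(timeout=1)   # waits at most 1 second for an item
    try:
        q.get(timeout=0.1)      # nothing left, so this raises queue.Empty
        empty = False
    except queue.Empty:
        empty = True
    return first, second, empty

if __name__ == "__main__":
    print(queue_basics())
```

Calling `queue_basics()` returns `({'task': 1}, 'second', True)`.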

import multiprocessing
import time

def worker_queue(q):
    print(f"Worker {multiprocessing.current_process().name} started.")
    while True:
        item = q.get() # Blocks until an item is available
        if item is None: # Sentinel value to signal termination
            break
        print(f"Worker {multiprocessing.current_process().name} processing: {item}")
        time.sleep(0.1) # Simulate work
    print(f"Worker {multiprocessing.current_process().name} finished.")

if __name__ == '__main__':
    my_queue = multiprocessing.Queue()
    
    # Start a worker process
    p = multiprocessing.Process(target=worker_queue, args=(my_queue,))
    p.start()

    # Put some items into the queue
    for i in range(5):
        my_queue.put(f"Task {i}")
        time.sleep(0.05)

    # Put a sentinel value to tell the worker to exit
    my_queue.put(None)

    # Wait for the worker process to finish
    p.join()
    print("Main process finished.")

Example demonstrating inter-process communication using multiprocessing.Queue.
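The example above only sends data one way. A common extension, sketched here with hypothetical names, adds a second Queue for results and starts several workers. Each worker needs its own `None` sentinel, and the main process drains the result queue before calling `join()`, since a process that still has unflushed queue data may not exit.

```python
import multiprocessing

def worker(tasks, results):
    # Hypothetical worker: squares each number until it sees the None sentinel.
    for item in iter(tasks.get, None):
        results.put((item, item * item))

def run(numbers, n_workers=2):
    tasks = multiprocessing.Queue()
    results = multiprocessing.Queue()
    procs = [multiprocessing.Process(target=worker, args=(tasks, results))
             for _ in range(n_workers)]
    for p in procs:
        p.start()
    for n in numbers:
        tasks.put(n)
    for _ in procs:
        tasks.put(None)  # one sentinel per worker, so every worker exits
    # Drain all results before joining, so no worker is left blocked.
    collected = dict(results.get() for _ in numbers)
    for p in procs:
        p.join()
    return collected

if __name__ == "__main__":
    print(run([1, 2, 3, 4, 5]))
```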

Parallelizing Tasks with multiprocessing.Pool

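The Pool class simplifies parallel execution by managing a pool of worker processes. You submit tasks to the pool, and it automatically distributes them among its workers; map() blocks until every task is done and returns the results in the same order as the inputs. This is ideal for 'embarrassingly parallel' problems where tasks are independent. Because tasks are sent to the workers by pickling, the target function must be defined at the top level of a module (a lambda will not work).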

import multiprocessing
import time

def square(number):
    print(f"Worker {multiprocessing.current_process().name} squaring {number}")
    time.sleep(0.1) # Simulate work
    return number * number

if __name__ == '__main__':
    numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    
    # Create a Pool with 4 worker processes
    with multiprocessing.Pool(processes=4) as pool:
        # Map the square function to all numbers in parallel
        results = pool.map(square, numbers)
    
    print("Original numbers:", numbers)
    print("Squared results:", results)
    print("Main process finished.")

Using multiprocessing.Pool to parallelize a simple squaring function.
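`map()` is only one way to hand work to a Pool. The sketch below, with a hypothetical `power` helper, uses three other Pool methods: `starmap()` for functions that take several arguments, `apply_async()` for a single non-blocking call, and `imap_unordered()` for consuming results as soon as each one is ready.

```python
import multiprocessing

def power(base, exponent):
    # Hypothetical helper taking two arguments.
    return base ** exponent

def run():
    with multiprocessing.Pool(processes=2) as pool:
        # starmap unpacks each tuple into positional arguments.
        table = pool.starmap(power, [(2, 3), (3, 2), (10, 0)])
        # apply_async submits one call and returns an AsyncResult immediately.
        pending = pool.apply_async(power, (5, 2))
        single = pending.get(timeout=10)
        # imap_unordered yields results as workers finish, in any order.
        unordered = sorted(pool.imap_unordered(abs, [-3, -1, -2]))
    return table, single, unordered

if __name__ == "__main__":
    print(run())
```

`run()` returns `([8, 9, 1], 25, [1, 2, 3])`; the last list is sorted because `imap_unordered()` makes no ordering promise.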

Synchronizing Access with multiprocessing.Lock

When multiple processes need to access or modify a shared resource (such as a multiprocessing.Value, a file, or a database), a Lock is essential to prevent race conditions. Note that an ordinary global variable is not a shared resource: each process works on its own copy. A Lock ensures that only one process can hold it at any given time, guaranteeing exclusive access to the protected resource.

import multiprocessing
import time

def increment_counter(counter, lock):
    print(f"Worker {multiprocessing.current_process().name} started.")
    for _ in range(100000):
        with lock:  # Acquire the lock; it is released automatically, even on error
            counter.value += 1  # Access shared resource
    print(f"Worker {multiprocessing.current_process().name} finished.")

if __name__ == '__main__':
    # Use Value for shared mutable data between processes
    shared_counter = multiprocessing.Value('i', 0) # 'i' for integer
    counter_lock = multiprocessing.Lock()

    processes = []
    for _ in range(5):
        p = multiprocessing.Process(target=increment_counter, args=(shared_counter, counter_lock))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    print(f"Final counter value: {shared_counter.value}")
    print("Main process finished.")

Protecting a shared counter with multiprocessing.Lock.

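This example uses multiprocessing.Value to create an integer in shared memory that all five processes can see. Value carries its own internal lock for individual reads and writes, but counter.value += 1 is a read followed by a separate write, so another process can slip in between the two. Without the Lock, shared_counter.value would therefore likely end up below the expected 500,000 (5 processes × 100,000 increments) because of lost updates.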
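Because a `Value` created with the default `lock=True` already wraps its data in a (recursive) lock, a separate `Lock` object is optional for a single counter. A minimal variant, with hypothetical `increment`/`run` names, reuses that built-in lock via `get_lock()`:

```python
import multiprocessing

def increment(counter, n):
    for _ in range(n):
        # get_lock() returns the lock that Value('i', 0) created by default,
        # so the whole read-modify-write runs while holding it.
        with counter.get_lock():
            counter.value += 1

def run(n_procs=4, n=10_000):
    counter = multiprocessing.Value('i', 0)
    procs = [multiprocessing.Process(target=increment, args=(counter, n))
             for _ in range(n_procs)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return counter.value

if __name__ == "__main__":
    print(run())
```

A standalone multiprocessing.Lock, as in the example above, remains useful when one lock has to guard more than one Value, or a resource that is not in shared memory at all, such as a file.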