Dead Simple Multiprocessing: Queue, Pool, and Locking in Python

Explore the fundamentals of Python's multiprocessing module with practical examples of Queues for inter-process communication, Pools for parallel task execution, and Locks for resource synchronization.
Python's `multiprocessing` module is a powerful tool for leveraging multiple CPU cores, allowing you to run tasks in parallel and significantly speed up computations. However, managing shared resources and communication between processes can be tricky. This article provides a dead simple, yet comprehensive, introduction to three core components of `multiprocessing`: `Queue`, `Pool`, and `Lock`.
Understanding Multiprocessing Concepts
Before diving into code, it's crucial to grasp why these tools are necessary. Unlike threads, which share memory within a single process, each process runs in its own memory space. (Processes are also the standard way to get true CPU parallelism in CPython, since the Global Interpreter Lock, or GIL, prevents multiple threads from executing Python bytecode at the same time.) This isolation prevents direct memory sharing, necessitating explicit mechanisms for communication and synchronization.
- Queue: A `Queue` provides a way for processes to safely exchange data. It's a first-in, first-out (FIFO) data structure that handles all the locking required to ensure data integrity when multiple processes are adding or removing items.
- Pool: A `Pool` offers a convenient way to parallelize the execution of a function across multiple input values, distributing tasks among a fixed number of worker processes.
- Lock: A `Lock` is a synchronization primitive used to protect shared resources from being accessed by multiple processes simultaneously, preventing race conditions and ensuring data consistency.
```mermaid
flowchart TD
    A[Main Process] --> B{Create Queue, Pool, Lock}
    B --> C["Spawn Worker Processes (Pool)"]
    C --> D[Workers get tasks from Queue]
    D --> E[Process data]
    E --> F{Access Shared Resource?}
    F -->|Yes| G[Acquire Lock]
    G --> H[Modify Shared Resource]
    H --> I[Release Lock]
    I --> J[Put results into Queue]
    F -->|No| J
    J --> D
    C --> K[Main Process collects results from Queue]
    K --> L[Terminate Pool]
    L --> M[End]
```
Conceptual flow of multiprocessing with Queue, Pool, and Lock
Using multiprocessing.Queue for Inter-Process Communication

The `Queue` is your go-to for passing messages or data between processes. It's thread-safe and process-safe, meaning you don't have to worry about low-level synchronization when using it. One process can put items into the queue, and another can get items from it.
```python
import multiprocessing
import time

def worker_queue(q):
    print(f"Worker {multiprocessing.current_process().name} started.")
    while True:
        item = q.get()  # Blocks until an item is available
        if item is None:  # Sentinel value to signal termination
            break
        print(f"Worker {multiprocessing.current_process().name} processing: {item}")
        time.sleep(0.1)  # Simulate work
    print(f"Worker {multiprocessing.current_process().name} finished.")

if __name__ == '__main__':
    my_queue = multiprocessing.Queue()

    # Start a worker process
    p = multiprocessing.Process(target=worker_queue, args=(my_queue,))
    p.start()

    # Put some items into the queue
    for i in range(5):
        my_queue.put(f"Task {i}")
        time.sleep(0.05)

    # Put a sentinel value to tell the worker to exit
    my_queue.put(None)

    # Wait for the worker process to finish
    p.join()
    print("Main process finished.")
```
Example demonstrating inter-process communication using `multiprocessing.Queue`.
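A sentinel isn't the only way to stop a consumer loop. `Queue.get` also accepts a `timeout` argument; when it expires with no item available, it raises `queue.Empty`, which the consumer can treat as an exit condition. A minimal sketch of this variation (the `drain` helper is hypothetical, not part of the example above):

```python
import multiprocessing
import queue  # queue.Empty is the exception raised on timeout

def drain(q):
    """Collect items until the queue stays empty for 0.2 seconds."""
    items = []
    while True:
        try:
            # Block for at most 0.2 s; raise queue.Empty if nothing arrives
            items.append(q.get(timeout=0.2))
        except queue.Empty:
            break
    return items

if __name__ == '__main__':
    q = multiprocessing.Queue()
    for i in range(3):
        q.put(i)
    print(drain(q))  # [0, 1, 2]
```

A timeout-based exit is handy when the producer can't easily send a sentinel, though it trades a clean shutdown signal for a small idle wait.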
Tip: always send a sentinel value (such as `None`) or another explicit termination signal when using a `Queue` in a loop, so worker processes can shut down gracefully. Otherwise, `q.get()` will block indefinitely, preventing your program from exiting.

Parallelizing Tasks with multiprocessing.Pool
The `Pool` class simplifies parallel execution by managing a pool of worker processes. You submit tasks to the pool, and it automatically distributes them among its workers. This is ideal for 'embarrassingly parallel' problems where tasks are independent.
```python
import multiprocessing
import time

def square(number):
    print(f"Worker {multiprocessing.current_process().name} squaring {number}")
    time.sleep(0.1)  # Simulate work
    return number * number

if __name__ == '__main__':
    numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

    # Create a Pool with 4 worker processes
    with multiprocessing.Pool(processes=4) as pool:
        # Map the square function to all numbers in parallel
        results = pool.map(square, numbers)

    print("Original numbers:", numbers)
    print("Squared results:", results)
    print("Main process finished.")
```
Using `multiprocessing.Pool` to parallelize a simple squaring function.
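`pool.map` is not the only way to submit work. `imap_unordered` yields results as workers finish (not in input order), and `apply_async` submits a single task and returns an `AsyncResult`. A small sketch of both, using a hypothetical `cube` function in place of the squaring example:

```python
import multiprocessing

def cube(n):
    return n ** 3

if __name__ == '__main__':
    with multiprocessing.Pool(processes=4) as pool:
        # imap_unordered yields results in completion order,
        # so we sort to get a deterministic list
        unordered = sorted(pool.imap_unordered(cube, range(5)))

        # apply_async submits one task; .get() blocks until it is done
        async_result = pool.apply_async(cube, (10,))
        single = async_result.get(timeout=30)

    print(unordered)  # [0, 1, 8, 27, 64]
    print(single)     # 1000
```

`imap_unordered` is useful when you want to start consuming results before the slowest task finishes; `apply_async` suits one-off tasks or fire-and-forget submission with a callback.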
Tip: the `with` statement ensures that the `Pool` is properly closed and its worker processes are terminated, even if errors occur. This is the recommended way to use `Pool`.

Synchronizing Access with multiprocessing.Lock
When multiple processes need to access or modify a shared resource (like a global variable, a file, or a database connection), a `Lock` is essential to prevent race conditions. A `Lock` ensures that only one process can hold it at any given time, thereby guaranteeing exclusive access to the protected resource.
```python
import multiprocessing

def increment_counter(counter, lock):
    print(f"Worker {multiprocessing.current_process().name} started.")
    for _ in range(100000):
        lock.acquire()  # Acquire the lock
        try:
            counter.value += 1  # Access shared resource
        finally:
            lock.release()  # Release the lock
    print(f"Worker {multiprocessing.current_process().name} finished.")

if __name__ == '__main__':
    # Use Value for shared mutable data between processes
    shared_counter = multiprocessing.Value('i', 0)  # 'i' for integer
    counter_lock = multiprocessing.Lock()

    processes = []
    for _ in range(5):
        p = multiprocessing.Process(target=increment_counter, args=(shared_counter, counter_lock))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    print(f"Final counter value: {shared_counter.value}")
    print("Main process finished.")
```
Protecting a shared counter with `multiprocessing.Lock`.

Tip: always ensure a `Lock` is released after it's acquired, even if an error occurs. Using a `try...finally` block with `lock.release()` is a robust pattern. For simpler cases, `with lock:` can be used, which handles acquisition and release automatically.

This example uses `multiprocessing.Value` to create a shared integer that can be safely modified by multiple processes. Without the `Lock`, the final `shared_counter.value` would likely be less than the expected 500,000, because unsynchronized read-modify-write increments can overwrite each other in a race condition.
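As the tip above mentions, `with lock:` is the more concise pattern. It also pairs nicely with a detail of `multiprocessing.Value`: a `Value` created with its default settings carries its own lock, accessible via `get_lock()`, so a separate `Lock` object isn't strictly required. A minimal sketch (the `increment` helper is hypothetical):

```python
import multiprocessing

def increment(counter):
    for _ in range(10000):
        # Acquire and release the Value's built-in lock automatically
        with counter.get_lock():
            counter.value += 1

if __name__ == '__main__':
    shared = multiprocessing.Value('i', 0)

    workers = [multiprocessing.Process(target=increment, args=(shared,))
               for _ in range(4)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()

    print(shared.value)  # 40000
```

Whether you use an explicit `Lock` or `get_lock()` is mostly a matter of scope: a separate `Lock` can guard several resources at once, while `get_lock()` is tied to that one `Value`.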