
multiprocessing Module Complexity

The multiprocessing module provides process-based parallelism for CPU-bound tasks, bypassing the Global Interpreter Lock (GIL).

Classes & Functions

Operation           | Time | Space | Notes
Process(target)     | O(1) | O(1)  | Create process object
process.start()     | O(w) | O(w)  | Start process; w = per-process startup overhead
process.join()      | O(1) | O(1)  | Wait for process to finish (blocks)
Queue()             | O(1) | O(1)  | Create queue
queue.put(item)     | O(n) | O(n)  | Add to queue; n = serialized item size (pickling)
queue.get()         | O(n) | O(n)  | Remove from queue; n = serialized item size (unpickling)
Pool(processes)     | O(w) | O(w)  | Create pool; w = number of worker processes
pool.map(fn, items) | O(n) | O(n)  | Map over items; n = number of items
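
To tie these operations together, here is a minimal end-to-end sketch (with square as a stand-in workload). Note the if __name__ == "__main__": guard: it is required on platforms that use the spawn start method (Windows, and macOS since Python 3.8), because each worker re-imports the main module.

from multiprocessing import Pool, cpu_count

def square(x):
    return x * x

if __name__ == "__main__":
    # Workers are created once (O(w)), then the items are split among them
    with Pool(processes=cpu_count()) as pool:
        print(pool.map(square, range(10)))  # [0, 1, 4, ..., 81]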

Process Creation

Time Complexity: O(w)

Where w = process startup overhead.

from multiprocessing import Process

def worker(name):
    print(f"Worker {name}")

# Create process: O(1)
p = Process(target=worker, args=("A",))  # O(1) to create object

# Start process: O(w)
# w = startup overhead (fork or spawn of a new interpreter, plus initialization)
p.start()  # O(w)

# Join process: O(1) operation (wait for completion)
p.join()  # O(1) operation, blocks until done

Space Complexity: O(w)

from multiprocessing import Process

# Each process uses significant memory
# w = memory per process (typically 10-50MB)

processes = []
for i in range(10):
    p = Process(target=task, args=(i,))
    p.start()
    processes.append(p)  # O(10w) total memory

for p in processes:
    p.join()  # O(1) per join

# Note: more processes is not automatically faster; each one adds memory and startup cost.
# For CPU-bound work, match the number of CPU cores (multiprocessing.cpu_count()).
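
If a worker may hang, join() accepts a timeout and can be followed by terminate(). A sketch of that pattern, with slow_task standing in for a stuck job:

from multiprocessing import Process
import time

def slow_task():
    time.sleep(60)  # stand-in for a job that takes too long

if __name__ == "__main__":
    p = Process(target=slow_task)
    p.start()

    p.join(timeout=2)     # wait at most 2 seconds
    if p.is_alive():      # still running after the timeout
        p.terminate()     # forcefully stop the worker
        p.join()          # reap the terminated process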

Queues

Queue Operations

from multiprocessing import Queue

# Create queue: O(1)
q = Queue()  # O(1)

# Put item: O(n) for pickling where n = item size
# Item is serialized before sending
q.put("item")  # O(n) for serialization

# Get item: O(n) for unpickling where n = item size
# Item is deserialized after receiving
item = q.get()  # O(n) for deserialization

# get(timeout=...) blocks for up to the timeout waiting for an item,
# then spends O(n) unpickling it
item = q.get(timeout=5)  # raises queue.Empty if nothing arrives in time
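
A sketch of the timeout path: when nothing arrives in time, get(timeout=...) raises queue.Empty — the exception class from the standard queue module, not a multiprocessing-specific one.

from multiprocessing import Queue
from queue import Empty   # multiprocessing.Queue reuses the queue module's exceptions

q = Queue()

try:
    item = q.get(timeout=5)   # blocks for up to 5 seconds
except Empty:
    item = None               # nothing arrived before the timeout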

Space Complexity

from multiprocessing import Queue

q = Queue(maxsize=100)  # O(1) to create

# Space grows as items accumulate
for i in range(100):
    q.put(i)  # O(1) per put, O(100) total space

# Space reduces as items consumed
while not q.empty():
    item = q.get()  # O(1) per get
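
A bounded queue also provides backpressure: put() blocks once maxsize items are waiting, and the non-blocking variants raise queue.Full. A short sketch (note that empty() and qsize() are only approximate once multiple processes are involved):

from multiprocessing import Queue
from queue import Full

q = Queue(maxsize=100)    # bounds the memory held by in-flight items

# q.put(item) blocks once 100 items are waiting, throttling fast producers

try:
    q.put_nowait("item")  # non-blocking put
except Full:
    pass                  # queue is at maxsize: drop, retry, or back off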

Pool

Pool Creation

from multiprocessing import Pool
import os

# Create pool: O(w) time
# w = number of worker processes
pool = Pool(processes=4)  # O(w) to create

# Default: os.cpu_count() processes
pool = Pool()  # O(w) where w = CPU count

# Each process has significant overhead
# Time: 10-100ms per process to start
# Space: 10-50MB per process
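
Per-task setup costs can be paid once per worker instead of once per item via initializer/initargs. A sketch where compiling a regex stands in for genuinely expensive setup (loading a model, opening a connection, etc.):

from multiprocessing import Pool
import re

_pattern = None

def init_worker():
    # Runs once in each worker process: O(w) setup total instead of O(n)
    global _pattern
    _pattern = re.compile(r"\d+")

def count_digits(text):
    return len(_pattern.findall(text))

if __name__ == "__main__":
    texts = ["abc123", "4 5 6", "no digits"]
    with Pool(processes=4, initializer=init_worker) as pool:
        print(pool.map(count_digits, texts))  # [1, 3, 0]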

Pool.map() - Apply Function to Items

from multiprocessing import Pool

def square(x):
    return x * x

pool = Pool(processes=4)

# Map: O(n) to submit tasks
# n = number of items
items = range(1000)
results = pool.map(square, items)  # O(n)

# Collects all results (blocks until done)
# Returns list in original order
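
For functions that take several positional arguments, pool.starmap() unpacks each tuple before calling the function; a minimal sketch:

from multiprocessing import Pool

def power(base, exp):
    return base ** exp

if __name__ == "__main__":
    pairs = [(2, 3), (3, 2), (10, 4)]
    with Pool(processes=4) as pool:
        # Each tuple becomes positional arguments: power(2, 3), power(3, 2), ...
        results = pool.starmap(power, pairs)  # [8, 9, 10000]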

Pool.map_async() - Non-blocking Map

from multiprocessing import Pool, TimeoutError

pool = Pool(processes=4)

# Submit: O(n) to queue tasks
items = range(1000)
result_async = pool.map_async(square, items)  # O(n)

# map_async() returns immediately: O(1)
# get() blocks until all results are ready and returns the full list: O(n) to collect
results = result_async.get()

# get(timeout=...) raises multiprocessing.TimeoutError if results are not ready in time
try:
    results = result_async.get(timeout=5)
except TimeoutError:
    pass

Pool.apply_async() - Single Task

from multiprocessing import Pool

def compute(x):
    return x * x

pool = Pool(processes=4)

# Submit single task: O(1)
result_async = pool.apply_async(compute, (10,))  # O(1)

# Get result: blocks until the task finishes
result = result_async.get()  # 100
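
Instead of calling get() on each result, apply_async() also accepts callback and error_callback, which the pool invokes in the parent process as tasks finish. A sketch with one deliberately failing input:

from multiprocessing import Pool

def risky(x):
    if x < 0:
        raise ValueError("negative input")
    return x * x

def on_done(result):
    print("got", result)

def on_error(exc):
    print("failed:", exc)

if __name__ == "__main__":
    with Pool(processes=4) as pool:
        for x in (3, -1, 5):
            # Callbacks fire in the parent as each task completes
            pool.apply_async(risky, (x,), callback=on_done, error_callback=on_error)
        pool.close()
        pool.join()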

Pool.imap() - Lazy Iterator

from multiprocessing import Pool

pool = Pool(processes=4)

# Create iterator: O(1)
# Processes items lazily
results_iter = pool.imap(process, items)  # O(1) to create

# Iterate: O(n) total across all items
for result in results_iter:  # results are produced lazily as workers finish
    process_result(result)  # O(1) per result
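
When result order does not matter, imap_unordered() yields results as workers finish them, and a larger chunksize cuts per-item IPC overhead. A sketch:

from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == "__main__":
    with Pool(processes=4) as pool:
        # Results arrive in completion order, not submission order
        for result in pool.imap_unordered(square, range(1000), chunksize=50):
            pass  # handle each result as soon as any worker produces it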

Shutdown

from multiprocessing import Pool

pool = Pool(processes=4)

# Submit tasks
results = pool.map(task, items)  # O(n)

# Close: O(1)
# Prevents new tasks
pool.close()  # O(1)

# Join: O(w)
# Wait for all tasks to complete
pool.join()  # O(w) to wait for w processes

# Or use a context manager: exiting the with block terminates the pool
with Pool(processes=4) as pool:
    results = pool.map(task, items)  # map() has already returned, so no work is lost
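
For an abrupt shutdown (abandoning remaining work), terminate() stops the workers without draining pending tasks; this is also what the context manager calls on exit. A sketch, with task as a stand-in slow job:

from multiprocessing import Pool
import time

def task(x):
    time.sleep(1)  # stand-in for slow work
    return x

if __name__ == "__main__":
    pool = Pool(processes=4)
    result_async = pool.map_async(task, range(100))

    pool.terminate()  # stop workers immediately; queued and in-flight tasks are discarded
    pool.join()       # reap the terminated worker processes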

Pipes

Pipe Communication

from multiprocessing import Pipe, Process

# Create pipe: O(1)
parent_conn, child_conn = Pipe()  # O(1)

# Send: O(n) in message size (the object is pickled before sending)
parent_conn.send("message")

# Receive: O(n) to unpickle; blocks until a message is available
msg = child_conn.recv()

# Bidirectional by default: both ends can send and receive
child_conn.send("reply")
reply = parent_conn.recv()
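
Two refinements worth knowing: Pipe(duplex=False) gives a one-way pipe (the first end is receive-only, the second send-only), and poll(timeout) checks for data without committing to a blocking recv(). A sketch:

from multiprocessing import Pipe

# One-way pipe: recv_end can only receive, send_end can only send
recv_end, send_end = Pipe(duplex=False)

send_end.send("ping")

if recv_end.poll(timeout=1):   # True if data arrives within 1 second
    print(recv_end.recv())     # "ping"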

Common Patterns

Simple Parallel Processing

from multiprocessing import Pool

def process_item(item):
    """Process single item."""
    return item * 2

def parallel_map(items):
    """Process items in parallel."""
    with Pool() as pool:
        # Submit all: O(n)
        results = pool.map(process_item, items)  # O(n)

    return results
    # Total: O(n) time, O(w) processes

Producer-Consumer with Queues

from multiprocessing import Queue, Process

def producer(q, items):
    """Produce items."""
    for item in items:  # O(n)
        q.put(item)  # O(1) per put
    q.put(None)  # Signal end

def consumer(q):
    """Consume items."""
    results = []
    while True:
        item = q.get()  # O(1) per get
        if item is None:
            break
        results.append(process(item))  # O(m) per item, m = per-item work
    return results

# Create queue and processes
q = Queue()
producer_p = Process(target=producer, args=(q, items))
consumer_p = Process(target=consumer, args=(q,))

producer_p.start()
consumer_p.start()

producer_p.join()  # O(1)
consumer_p.join()  # O(1)
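
One caveat with the pattern above: a Process target's return value is discarded, so consumer()'s results list never reaches the parent. A common fix is a second queue that carries results back, sketched here with doubling as a stand-in for the real per-item work:

from multiprocessing import Process, Queue

def producer(task_q, items):
    for item in items:
        task_q.put(item)
    task_q.put(None)              # sentinel: no more work

def consumer(task_q, result_q):
    while True:
        item = task_q.get()
        if item is None:
            break
        result_q.put(item * 2)    # stand-in for real per-item work
    result_q.put(None)            # sentinel: no more results

if __name__ == "__main__":
    task_q, result_q = Queue(), Queue()
    p1 = Process(target=producer, args=(task_q, range(5)))
    p2 = Process(target=consumer, args=(task_q, result_q))
    p1.start()
    p2.start()

    results = []
    while (r := result_q.get()) is not None:
        results.append(r)

    p1.join()
    p2.join()
    print(results)                # [0, 2, 4, 6, 8]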

Parallel Computation with Results

from multiprocessing import Pool

def compute(data_chunk):
    """Compute on data chunk."""
    return sum(data_chunk) * 2

def parallel_compute(data, num_processes=4):
    """Compute across processes."""
    # Chunk data
    chunk_size = max(1, len(data) // num_processes)  # avoid a zero-size step for short inputs
    chunks = [
        data[i:i+chunk_size]
        for i in range(0, len(data), chunk_size)
    ]

    # Process chunks: O(n)
    with Pool(processes=num_processes) as pool:
        results = pool.map(compute, chunks)  # O(n)

    return sum(results)  # Aggregate

Pipe Communication

from multiprocessing import Pipe, Process

def worker(conn, task_id):
    """Worker receives and sends."""
    msg = conn.recv()  # O(1)
    result = process_message(msg)
    conn.send(result)  # O(1)

# Create communication channel
parent_conn, child_conn = Pipe()  # O(1)

# Start worker process
p = Process(target=worker, args=(child_conn, 1))
p.start()

# Parent sends task
parent_conn.send(("task_data",))  # O(1)

# Parent receives result
result = parent_conn.recv()  # O(1)

p.join()  # O(1)

Performance Characteristics

Process Overhead

from multiprocessing import Pool, Process

# Process creation is expensive
# Startup time: 10-100ms per process
# Memory: 10-50MB per process

# Rule: Use Pool for multiple tasks
# Creates workers once, reuses them

# Good: Reuse pool for multiple operations
with Pool(processes=4) as pool:
    results1 = pool.map(task1, items)  # O(n1)
    results2 = pool.map(task2, items)  # O(n2)

# Avoid: Creating new pool each time
for items_batch in batches:
    pool = Pool(processes=4)  # O(w) overhead each time!
    results = pool.map(task, items_batch)
    pool.close()
    pool.join()

Best Practices

from multiprocessing import Pool, cpu_count

# Good: Use CPU count for CPU-bound tasks
num_processes = cpu_count()
with Pool(processes=num_processes) as pool:
    results = pool.map(cpu_intensive_task, items)

# Avoid: Using many more processes than cores
# Extra workers add memory and context-switch overhead without extra speedup
with Pool(processes=64) as pool:  # far more than the core count on most machines
    results = pool.map(task, items)

# Good: Use context manager for cleanup
with Pool(processes=4) as pool:
    results = pool.map(task, items)
    # Automatically closed and joined

# Avoid: Manual cleanup
pool = Pool(processes=4)
results = pool.map(task, items)
pool.close()  # Easy to forget!
pool.join()

# Good: Use chunksize for large iterables
results = pool.map(task, large_items, chunksize=100)

# Avoid: imap()/imap_unordered() with the default chunksize=1 on large iterables
# (one IPC round trip per item; map() computes a sensible chunksize automatically)
results = list(pool.imap(task, large_items))

When to Use multiprocessing

from multiprocessing import Pool
from concurrent.futures import ThreadPoolExecutor

# CPU-bound: Use multiprocessing
# - Heavy computation
# - No I/O waiting
# - True parallelism
with Pool(processes=4) as pool:
    results = pool.map(cpu_bound_task, items)

# I/O-bound: Use concurrent.futures.ThreadPoolExecutor
# - Network requests
# - File I/O
# - Database queries
with ThreadPoolExecutor(max_workers=8) as executor:
    futures = [executor.submit(io_task, item) for item in items]
    results = [f.result() for f in futures]
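
For CPU-bound work, concurrent.futures.ProcessPoolExecutor offers the same process-based parallelism behind the futures API shown above; a minimal sketch:

from concurrent.futures import ProcessPoolExecutor

def cpu_bound_task(n):
    return sum(i * i for i in range(n))

if __name__ == "__main__":
    items = [100_000, 200_000, 300_000]
    with ProcessPoolExecutor(max_workers=4) as executor:
        # executor.map() mirrors pool.map(); futures are created under the hood
        results = list(executor.map(cpu_bound_task, items))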

Version Notes

  • Python 2.6: multiprocessing module introduced
  • Python 3.2+: concurrent.futures.ProcessPoolExecutor offers a futures-style interface over process pools
  • Python 3.4+: selectable start methods (spawn, fork, forkserver) via set_start_method()
  • Python 3.8+: multiprocessing.shared_memory for sharing raw buffers without pickling (see the sketch below)
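
A minimal sketch of the 3.8+ shared memory API: one process creates a named block, another attaches to it by name and writes into the buffer directly, with no pickling of the payload.

from multiprocessing import Process, shared_memory

def fill(name):
    shm = shared_memory.SharedMemory(name=name)  # attach to the existing block
    shm.buf[:5] = b"hello"                       # write bytes directly, no pickling
    shm.close()

if __name__ == "__main__":
    shm = shared_memory.SharedMemory(create=True, size=1024)
    p = Process(target=fill, args=(shm.name,))
    p.start()
    p.join()
    print(bytes(shm.buf[:5]))  # b'hello'
    shm.close()
    shm.unlink()               # release the block once no process needs it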

Differences from Threading

# Threading (I/O-bound):
# - Lightweight
# - Shared memory
# - GIL prevents true parallelism
# - Good for I/O operations

# Multiprocessing (CPU-bound):
# - Heavy overhead
# - Separate memory space
# - True parallelism
# - Good for CPU-intensive operations