Skip to content

queue - Synchronized Queue Data Structures

The queue module provides thread-safe queue implementations for coordinating work between multiple producer and consumer threads or processes.

Complexity Reference

Operation Time Space Notes
Queue.put() O(1) O(1) Add item (blocks if full); uses deque internally
Queue.get() O(1) O(1) Remove item (blocks if empty)
Queue.put_nowait() O(1) O(1) Add item (raises Full if full)
Queue.get_nowait() O(1) O(1) Remove item (raises Empty if empty)
Queue.qsize() O(1) O(1) Get approximate size
PriorityQueue.put() O(log n) O(1) Add with priority
PriorityQueue.get() O(log n) O(1) Get highest priority
LifoQueue.put() O(1) amortized O(1) Add item (LIFO); uses list internally
LifoQueue.get() O(1) O(1) Remove item (LIFO)

Basic Queue (FIFO)

from queue import Queue
import threading
import time

# Create queue with max size - O(1)
q = Queue(maxsize=5)

# Put items - O(1) amortized
q.put('item1')  # O(1)
q.put('item2')  # O(1)
q.put('item3')  # O(1)

# Get items FIFO - O(1) amortized
print(q.get())  # O(1) - 'item1'
print(q.get())  # O(1) - 'item2'
print(q.get())  # O(1) - 'item3'

Producer-Consumer Pattern

from queue import Queue
import threading
import time

def producer(q, items):
    """Producer thread adds items to queue - O(n)"""
    for item in items:  # O(n)
        print(f"Producing {item}")
        q.put(item)  # O(1) amortized
        time.sleep(0.1)

def consumer(q, count):
    """Consumer thread retrieves items - O(n)"""
    for _ in range(count):  # O(n)
        item = q.get()  # O(1) amortized, blocks if empty
        print(f"Consuming {item}")
        q.task_done()  # O(1) - mark task complete

# Create queue - O(1)
q = Queue()

# Create producer thread - O(1)
prod = threading.Thread(
    target=producer,
    args=(q, ['A', 'B', 'C', 'D', 'E'])
)

# Create consumer thread - O(1)
cons = threading.Thread(
    target=consumer,
    args=(q, 5)
)

# Start threads
prod.start()  # O(1)
cons.start()  # O(1)

# Wait for completion
q.join()  # O(n) - blocks until all tasks done
print("All tasks complete")

Priority Queue

from queue import PriorityQueue
import heapq

# Create priority queue - O(1)
pq = PriorityQueue()

# Add items with priority - O(log n)
pq.put((1, 'high'))      # O(log 1)
pq.put((3, 'low'))       # O(log 2)
pq.put((2, 'medium'))    # O(log 3)

# Get items by priority (lowest first) - O(log n)
print(pq.get())  # O(log 3) - (1, 'high')
print(pq.get())  # O(log 2) - (2, 'medium')
print(pq.get())  # O(log 1) - (3, 'low')

LIFO Queue (Stack)

from queue import LifoQueue

# Create LIFO queue (stack) - O(1)
lifo = LifoQueue()

# Push items - O(1) amortized
lifo.put('a')  # O(1)
lifo.put('b')  # O(1)
lifo.put('c')  # O(1)

# Pop items (last in, first out) - O(1) amortized
print(lifo.get())  # O(1) - 'c'
print(lifo.get())  # O(1) - 'b'
print(lifo.get())  # O(1) - 'a'

Non-blocking Operations

from queue import Queue, Empty, Full

# Create queue - O(1)
q = Queue(maxsize=2)

try:
    # Put without blocking - O(1)
    q.put_nowait('item1')  # O(1)
    q.put_nowait('item2')  # O(1)
    q.put_nowait('item3')  # Raises Full
except Full:
    print("Queue is full")

try:
    # Get without blocking - O(1)
    print(q.get_nowait())  # O(1) - 'item1'
    print(q.get_nowait())  # O(1) - 'item2'
    print(q.get_nowait())  # Raises Empty
except Empty:
    print("Queue is empty")

Timeout Operations

from queue import Queue, Empty, Full
import time

# Create queue - O(1)
q = Queue()

# Put with timeout - O(1) amortized
try:
    q.put('item', timeout=1.0)  # O(1), waits up to 1 second
    print("Item added")
except Full:
    print("Queue full, timeout expired")

# Get with timeout - O(1) amortized
try:
    item = q.get(timeout=1.0)  # O(1), waits up to 1 second
    print(f"Got: {item}")
except Empty:
    print("Queue empty, timeout expired")

Thread Pool Pattern

from queue import Queue
import threading

class ThreadPool:
    """Simple thread pool using Queue - O(1) operations"""

    def __init__(self, num_workers):
        self.task_queue = Queue()  # O(1)
        self.workers = []

        # Create worker threads - O(num_workers)
        for _ in range(num_workers):  # O(num_workers)
            worker = threading.Thread(
                target=self._worker,
                daemon=True
            )
            worker.start()  # O(1)
            self.workers.append(worker)  # O(1)

    def _worker(self):
        """Worker thread processes tasks - O(n)"""
        while True:  # O(n)
            task, args = self.task_queue.get()  # O(1)
            try:
                task(*args)  # Execute task
            finally:
                self.task_queue.task_done()  # O(1)

    def submit(self, task, args=()):
        """Submit task to queue - O(1)"""
        self.task_queue.put((task, args))  # O(1)

    def wait(self):
        """Wait for all tasks to complete - O(n)"""
        self.task_queue.join()  # O(n) - blocks until done

# Usage
def work(task_id):
    """Simulate work - O(1)"""
    print(f"Processing task {task_id}")

# Create pool with 3 workers - O(3)
pool = ThreadPool(3)

# Submit 10 tasks - O(10)
for i in range(10):  # O(10)
    pool.submit(work, (i,))  # O(1)

# Wait for completion - O(10)
pool.wait()
print("All tasks done")

Common Patterns

Work Distribution

from queue import Queue
import threading
import random

# Create queue for work distribution - O(1)
work_queue = Queue()

# Add work items - O(n)
for i in range(100):  # O(100)
    work_queue.put(f"job_{i}")  # O(1)

# Distribute to 4 workers - O(4*n)
def worker(worker_id, queue):
    """Process work items - O(n)"""
    while True:  # O(n)
        try:
            job = queue.get_nowait()  # O(1)
            print(f"Worker {worker_id}: {job}")
            queue.task_done()  # O(1)
        except:
            break

# Start workers
for i in range(4):  # O(4)
    t = threading.Thread(target=worker, args=(i, work_queue))
    t.start()  # O(1)

Rate Limiting

from queue import Queue
import threading
import time

# Create queue with max size for rate limiting - O(1)
rate_limited_queue = Queue(maxsize=10)

def rate_limiter():
    """Control rate of processing - O(n)"""
    interval = 0.1  # Process one item every 100ms
    while True:  # O(n)
        item = rate_limited_queue.get()  # O(1)
        if item is None:  # Sentinel to exit
            break
        print(f"Processing at {time.time()}")
        rate_limited_queue.task_done()  # O(1)
        time.sleep(interval)

# Start rate limiter - O(1)
limiter = threading.Thread(target=rate_limiter, daemon=True)
limiter.start()

# Submit work (blocks if queue is full) - O(n)
for i in range(5):  # O(5)
    rate_limited_queue.put(f"item_{i}")  # O(1), blocks if full

Queue vs Other Structures

from queue import Queue
from collections import deque

# Queue (thread-safe) - O(1) operations
q = Queue()
q.put('item')  # O(1), thread-safe

# deque (not thread-safe) - O(1) operations
d = deque()
d.append('item')  # O(1), NOT thread-safe

# Use Queue for multi-threaded applications
# Use deque for single-threaded applications

Version Notes

  • Python 2.6+: queue module available
  • Python 3.x: Same functionality
  • All versions: O(1) for standard queue operations

Best Practices

Do:

  • Use Queue for thread-safe work distribution
  • Use PriorityQueue for priority-based processing
  • Use LifoQueue for stack-like behavior in threads
  • Use timeouts to prevent deadlocks
  • Call task_done() to track completion

Avoid:

  • Using Queue in single-threaded code (use deque)
  • Forgetting to call task_done()
  • Ignoring timeouts (risk of deadlock)
  • Creating excessive queues (reuse when possible)
  • Assuming thread safety elsewhere (only queue operations are safe)