Skip to content

asyncio Module Complexity

The asyncio module provides infrastructure for writing single-threaded concurrent code using async/await, managing coroutines and event loops.

Complexity Reference

Operation Time Space Notes
asyncio.run() O(1) O(n) n = event loop state
await coroutine O(1) O(1) Suspend execution
Task creation O(1) O(1) Wrap coroutine
asyncio.gather() O(1) O(n) n = number of tasks
asyncio.wait() O(n) O(n) n = tasks waiting
Event queue operation O(log n) O(1) Heap-based scheduling; n = pending events

Async Basics

Simple Coroutine

import asyncio

# Define coroutine - O(1)
async def greet(name):
    print(f"Hello {name}")
    await asyncio.sleep(1)  # Non-blocking wait - O(1)
    print(f"Goodbye {name}")

# Run coroutine - O(1) setup, O(?) execution
asyncio.run(greet("Alice"))

# Output:
# Hello Alice
# (1 second pause)
# Goodbye Alice

Multiple Concurrent Tasks

import asyncio

async def task(name, duration):
    """Async task - O(1) execution"""
    print(f"Task {name} starting")
    await asyncio.sleep(duration)
    print(f"Task {name} complete")

async def main():
    # Create tasks - O(1) each
    task1 = asyncio.create_task(task("A", 2))
    task2 = asyncio.create_task(task("B", 1))
    task3 = asyncio.create_task(task("C", 3))

    # Wait for all - O(n) where n = number of tasks
    await asyncio.gather(task1, task2, task3)

# Run - total time = max(2, 1, 3) = 3 seconds
asyncio.run(main())

# Output:
# Task A starting
# Task B starting
# Task C starting
# Task B complete
# Task A complete
# Task C complete

Async/Await

Async Function Basics

import asyncio

# Async function definition
async def fetch_data(url):
    """Fetch data from URL - O(1) per await"""

    print(f"Fetching {url}")

    # Simulate async I/O - O(1) context switch
    await asyncio.sleep(2)

    return {"data": f"Content from {url}"}

async def main():
    # Await coroutine - O(1) suspension
    result = await fetch_data("http://example.com")
    print(result)

# Run event loop
asyncio.run(main())

Sequential vs Concurrent

import asyncio
import time

async def task(name, duration):
    await asyncio.sleep(duration)
    return f"{name} done"

async def sequential():
    """Sequential execution - O(t1 + t2 + t3)"""

    # Each task waits for previous
    r1 = await task("Task1", 1)  # 1 second
    r2 = await task("Task2", 1)  # 1 second
    r3 = await task("Task3", 1)  # 1 second

    return [r1, r2, r3]  # Total: 3 seconds

async def concurrent():
    """Concurrent execution - O(max(t1, t2, t3))"""

    # All tasks run concurrently
    results = await asyncio.gather(
        task("Task1", 1),   # 
        task("Task2", 1),   # All parallel
        task("Task3", 1)    #
    )

    return results  # Total: 1 second

# Sequential: 3 seconds
start = time.time()
asyncio.run(sequential())
print(f"Sequential: {time.time() - start:.1f}s")

# Concurrent: 1 second
start = time.time()
asyncio.run(concurrent())
print(f"Concurrent: {time.time() - start:.1f}s")

Tasks and Coroutines

Creating Tasks

import asyncio

async def background_task():
    """Background task - O(1) per iteration"""
    for i in range(5):
        print(f"Background: {i}")
        await asyncio.sleep(0.5)

async def main():
    # Create task (scheduled immediately) - O(1)
    task = asyncio.create_task(background_task())

    # Do other work while task runs
    for i in range(3):
        print(f"Main: {i}")
        await asyncio.sleep(1)

    # Wait for task - O(1)
    await task

asyncio.run(main())

# Output interleaves:
# Main: 0
# Background: 0
# Background: 1
# Main: 1
# ...

Task Result and Exceptions

import asyncio

async def might_fail(should_fail):
    """Task that might raise exception"""
    if should_fail:
        raise ValueError("Task failed")
    return "Success"

async def main():
    # Create task - O(1)
    task = asyncio.create_task(might_fail(False))

    # Wait and get result - O(1)
    try:
        result = await task
        print(f"Result: {result}")
    except ValueError as e:
        print(f"Error: {e}")

    # Check task state - O(1)
    print(f"Done: {task.done()}")
    print(f"Result: {task.result()}")

Gathering and Waiting

Gather Multiple Tasks

import asyncio

async def fetch_url(url):
    await asyncio.sleep(1)
    return f"Data from {url}"

async def main():
    # Create multiple coroutines - O(1) each
    urls = [
        "http://example.com/1",
        "http://example.com/2",
        "http://example.com/3"
    ]

    # Gather - O(1) setup, concurrent execution
    results = await asyncio.gather(
        *[fetch_url(url) for url in urls]
    )

    # Results in order - O(1) per result
    for url, result in zip(urls, results):
        print(f"{url}: {result}")

asyncio.run(main())

Wait with Conditions

import asyncio

async def task(name, duration):
    await asyncio.sleep(duration)
    return f"{name} done"

async def main():
    # Create tasks - O(1) each
    tasks = [
        asyncio.create_task(task("A", 1)),
        asyncio.create_task(task("B", 2)),
        asyncio.create_task(task("C", 3))
    ]

    # Wait for first - O(n) check, ~O(min(durations))
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED
    )

    print(f"Done: {len(done)}")  # 1
    print(f"Pending: {len(pending)}")  # 2

    # Wait for others
    for task in pending:
        await task

asyncio.run(main())

Event Loop

Manual Event Loop

import asyncio

async def coro():
    print("Coroutine execution")
    await asyncio.sleep(0.1)
    return "Done"

# Create event loop - O(1)
loop = asyncio.new_event_loop()

try:
    # Run coroutine - O(1) setup
    result = loop.run_until_complete(coro())
    print(f"Result: {result}")
finally:
    # Close loop - O(1)
    loop.close()

Running in Event Loop

import asyncio

async def main():
    # Get current event loop - O(1)
    loop = asyncio.get_running_loop()

    # Schedule callback - O(1)
    loop.call_soon(print, "Scheduled immediately")

    # Schedule delayed callback - O(log n)
    loop.call_later(0.5, print, "Scheduled in 0.5s")

    # Wait - O(1)
    await asyncio.sleep(1)

asyncio.run(main())

Locks and Synchronization

Lock for Mutual Exclusion

import asyncio

lock = asyncio.Lock()
counter = 0

async def increment():
    """Increment with lock - O(1) per operation"""
    global counter

    async with lock:  # O(1) acquire
        temp = counter
        await asyncio.sleep(0.001)  # Simulate work
        counter = temp + 1

async def main():
    # Run concurrent increments - O(1) per task
    tasks = [increment() for _ in range(10)]

    # Wait for all - O(n) where n = tasks
    await asyncio.gather(*tasks)

    print(f"Counter: {counter}")  # Correctly 10

asyncio.run(main())

Event Synchronization

import asyncio

async def waiter(event):
    """Wait for event - O(1)"""
    print("Waiting for event")
    await event.wait()
    print("Event received!")

async def notifier(event):
    """Notify event - O(n)"""
    await asyncio.sleep(1)
    print("Setting event")
    event.set()

async def main():
    event = asyncio.Event()

    # Create tasks - O(1) each
    task1 = asyncio.create_task(waiter(event))
    task2 = asyncio.create_task(notifier(event))

    # Wait for all - O(n)
    await asyncio.gather(task1, task2)

asyncio.run(main())

Semaphore for Rate Limiting

import asyncio

semaphore = asyncio.Semaphore(2)  # Allow 2 concurrent

async def limited_task(name):
    """Task with limited concurrency - O(1) per operation"""
    async with semaphore:  # O(1) acquire
        print(f"{name} acquired semaphore")
        await asyncio.sleep(1)
        print(f"{name} released semaphore")

async def main():
    # Create 5 tasks, only 2 can run concurrently
    tasks = [limited_task(f"Task-{i}") for i in range(5)]

    # Run with rate limiting - O(1) scheduling
    await asyncio.gather(*tasks)

asyncio.run(main())

Common Patterns

Timeout Handling

import asyncio

async def slow_operation():
    await asyncio.sleep(10)
    return "Done"

async def main():
    try:
        # Wait with timeout - O(1) timeout check
        result = await asyncio.wait_for(
            slow_operation(),
            timeout=1.0
        )
    except asyncio.TimeoutError:
        print("Operation timed out")

asyncio.run(main())

Retry with Backoff

import asyncio

async def unreliable_api(attempt=1):
    """API that fails occasionally"""
    if attempt < 3:
        raise ConnectionError("API unavailable")
    return "Success"

async def retry_with_backoff(coro, max_retries=3, backoff=1):
    """Retry with exponential backoff - O(1) per retry"""

    for attempt in range(1, max_retries + 1):
        try:
            return await coro(attempt)
        except Exception as e:
            if attempt == max_retries:
                raise

            wait_time = backoff ** (attempt - 1)
            print(f"Retry {attempt} after {wait_time}s")
            await asyncio.sleep(wait_time)

async def main():
    result = await retry_with_backoff(unreliable_api)
    print(f"Result: {result}")

asyncio.run(main())

Queue Processing

import asyncio

async def producer(queue):
    """Produce items - O(1) per item"""
    for i in range(5):
        print(f"Producing {i}")
        await queue.put(i)
        await asyncio.sleep(0.5)

async def consumer(queue, name):
    """Consume items - O(1) per item"""
    while True:
        item = await queue.get()
        print(f"{name} processing {item}")
        await asyncio.sleep(1)
        queue.task_done()

async def main():
    queue = asyncio.Queue()

    # Create producer and consumers - O(1) each
    tasks = [
        asyncio.create_task(producer(queue)),
        asyncio.create_task(consumer(queue, "Consumer-1")),
        asyncio.create_task(consumer(queue, "Consumer-2"))
    ]

    # Wait for production to complete
    await asyncio.sleep(5)

    # Cancel consumers
    for task in tasks[1:]:
        task.cancel()

asyncio.run(main())

Performance Notes

Concurrency vs Parallelism

  • Concurrency: Multiple tasks, one thread (asyncio)
  • Parallelism: Multiple tasks, multiple threads (threading)
  • asyncio: Better for I/O-bound operations
  • threading: Better for CPU-bound operations

Time Complexity

  • Task scheduling: O(log n) for n tasks
  • Event loop iteration: O(n) for n ready tasks
  • Synchronization: O(1) per acquire/release

Space Complexity

  • Event loop: O(n) for n tasks
  • Task state: O(m) for m coroutine frames
  • Queues: O(n) for n items