Python async/await — Complete Guide to asyncio and Concurrent Programming

Stop blocking your I/O. Learn how Python's async/await syntax and asyncio event loop work under the hood, with practical patterns for concurrent HTTP requests, rate limiting, and production-grade async code.

Why async/await Matters

Python's Global Interpreter Lock (GIL) means threads don't give you true parallelism for CPU-bound work. But for I/O-bound tasks — HTTP requests, database queries, file reads — threads waste CPU time waiting. asyncio solves this with cooperative multitasking: a single thread handles thousands of concurrent I/O operations by switching between coroutines while one is waiting.

The result: 10–100x more concurrent connections on the same hardware, with code that is often simpler than threads. Shared state is still mutable, but a coroutine can only be interrupted at an await point, which eliminates many (not all) of the race conditions threads suffer from.
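To see the effect, here is a minimal, self-contained timing sketch; no network is involved, asyncio.sleep simply stands in for an I/O wait. Fifty "requests" of 100 ms each finish in roughly 100 ms total, because they all wait concurrently:

```python
import asyncio
import time

async def io_task(i: int) -> int:
    await asyncio.sleep(0.1)  # stand-in for a 100 ms network call
    return i

async def main() -> None:
    start = time.monotonic()
    # All 50 coroutines sleep at the same time, not one after another
    results = await asyncio.gather(*(io_task(i) for i in range(50)))
    elapsed = time.monotonic() - start
    print(f"{len(results)} tasks in {elapsed:.2f}s")  # well under 1s

asyncio.run(main())
```

Run sequentially, the same 50 awaits would take about 5 seconds.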

The Core Concepts

Coroutines vs Regular Functions

# Regular function — blocks the thread while sleeping
import time

def slow_fetch():
    time.sleep(1)  # entire thread frozen
    return "data"

# Coroutine — yields control back to event loop while waiting
import asyncio

async def fast_fetch():
    await asyncio.sleep(1)  # event loop runs other tasks
    return "data"

async def defines a coroutine function. Calling it returns a coroutine object; nothing executes until you await it or schedule it as a task.
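A quick way to see this for yourself (the greet function is illustrative):

```python
import asyncio

async def greet() -> str:
    return "hello"

coro = greet()               # nothing has run yet
print(type(coro).__name__)   # coroutine
print(asyncio.run(coro))     # hello -- the event loop drives it to completion
```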

Running a Coroutine

import asyncio

async def main():
    result = await fast_fetch()
    print(result)

# Entry point — starts the event loop
asyncio.run(main())

Concurrent Tasks with asyncio.gather

The real power comes from running multiple coroutines concurrently:

import asyncio
import aiohttp

async def fetch_url(session: aiohttp.ClientSession, url: str) -> dict:
    async with session.get(url) as response:
        return {"url": url, "status": response.status}

async def fetch_all(urls: list[str]) -> list[dict]:
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    return results

# Fetch 100 URLs concurrently instead of sequentially
urls = [f"https://api.example.com/item/{i}" for i in range(100)]
results = asyncio.run(fetch_all(urls))
print(f"Fetched {len(results)} URLs")
# Total time is roughly that of the slowest request, not the sum of all 100

Error Handling in Concurrent Tasks

By default, asyncio.gather raises the first exception it encounters, but the sibling tasks keep running in the background; they are not cancelled. Pass return_exceptions=True to receive exceptions as results instead of having them raised:

async def fetch_with_retry(session, url, retries=3):
    for attempt in range(retries):
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as resp:
                resp.raise_for_status()
                return await resp.json()
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            if attempt == retries - 1:
                return {"error": str(e), "url": url}
            await asyncio.sleep(2 ** attempt)  # exponential backoff

async def fetch_all_safe(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_retry(session, url) for url in urls]
        # return_exceptions=True: exceptions come back as results, not raised
        results = await asyncio.gather(*tasks, return_exceptions=True)
    return [r for r in results if not isinstance(r, Exception)]

Rate Limiting with Semaphores

Firing 1000 concurrent requests will get you rate-limited or banned. Use asyncio.Semaphore to cap concurrency:

import asyncio
import aiohttp

async def fetch_with_semaphore(
    session: aiohttp.ClientSession,
    url: str,
    semaphore: asyncio.Semaphore
) -> dict:
    async with semaphore:  # waits here if max_concurrent tasks are in flight
        async with session.get(url) as response:
            return await response.json()

async def fetch_rate_limited(urls: list[str], max_concurrent: int = 10):
    semaphore = asyncio.Semaphore(max_concurrent)
    async with aiohttp.ClientSession() as session:
        tasks = [
            fetch_with_semaphore(session, url, semaphore)
            for url in urls
        ]
        return await asyncio.gather(*tasks)

# Max 10 in-flight requests at any time
results = asyncio.run(fetch_rate_limited(urls, max_concurrent=10))

Token Bucket Rate Limiter

For API rate limits measured in requests-per-minute, a token bucket gives smoother pacing than a semaphore:

import asyncio
import time

class TokenBucketRateLimiter:
    def __init__(self, requests_per_minute: int):
        self.rpm = requests_per_minute
        self.tokens = float(requests_per_minute)
        self.last_refill = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self):
        async with self._lock:
            now = time.monotonic()
            elapsed = now - self.last_refill
            self.tokens = min(self.rpm, self.tokens + elapsed * (self.rpm / 60.0))
            self.last_refill = now

            if self.tokens < 1:
                wait = (1 - self.tokens) * (60.0 / self.rpm)
                await asyncio.sleep(wait)
                # The missing fraction of a token accrues during the sleep and
                # is spent by this request; reset the clock so the sleep time
                # isn't refilled a second time on the next call
                self.last_refill = time.monotonic()
                self.tokens = 0
            else:
                self.tokens -= 1

# Usage sketch: assumes an async OpenAI client (openai.AsyncOpenAI), 60 RPM tier
limiter = TokenBucketRateLimiter(requests_per_minute=55)  # leave a small buffer

async def call_openai(client, prompt: str) -> str:
    await limiter.acquire()
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

Async Context Managers and Generators

import asyncpg  # pip install asyncpg

class AsyncDatabasePool:
    """Async context manager for connection pool lifecycle."""

    async def __aenter__(self):
        self.pool = await asyncpg.create_pool(dsn="postgresql://...")
        return self.pool

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.pool.close()
        return False  # don't suppress exceptions

async def main():
    async with AsyncDatabasePool() as pool:
        rows = await pool.fetch("SELECT * FROM users LIMIT 10")

# Async generator for streaming large result sets
async def stream_large_table(pool, batch_size: int = 100):
    offset = 0
    while True:
        rows = await pool.fetch(
            # $1/$2 are asyncpg query parameters; ORDER BY (here assuming an
            # id column) keeps OFFSET pagination stable between batches
            "SELECT * FROM events ORDER BY id LIMIT $1 OFFSET $2",
            batch_size, offset,
        )
        if not rows:
            break
        for row in rows:
            yield dict(row)
        offset += batch_size

async def process_events():
    async with AsyncDatabasePool() as pool:
        async for event in stream_large_table(pool):
            await process_event(event)  # no OOM from loading millions of rows

asyncio.TaskGroup (Python 3.11+)

The newer TaskGroup API is safer than gather: if any task fails, the remaining tasks in the group are cancelled, and the failures are re-raised together as an ExceptionGroup when the block exits:

import asyncio

async def main():
    results = []

    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(fetch_data("https://api.example.com/users"))
        task2 = tg.create_task(fetch_data("https://api.example.com/orders"))
        task3 = tg.create_task(fetch_data("https://api.example.com/products"))

    # All tasks done here — exceptions raised as ExceptionGroup
    results = [task1.result(), task2.result(), task3.result()]
    return results

Common Pitfalls

Mixing sync and async code

# BAD: calling blocking function inside coroutine freezes the event loop
async def bad():
    import requests
    data = requests.get("https://api.example.com")  # blocks entire event loop!

# GOOD: use asyncio.to_thread for blocking calls
async def good():
    import requests
    data = await asyncio.to_thread(requests.get, "https://api.example.com")

Forgetting to await

# BAD: returns a coroutine object, not the result
result = fetch_data()  # never runs; only a RuntimeWarning at garbage collection

# GOOD
result = await fetch_data()

Fire-and-forget tasks without a reference

# BAD: the event loop holds only a weak reference to the task
asyncio.create_task(background_work())  # may be garbage collected mid-execution

# GOOD: hold a reference
background_tasks = set()
task = asyncio.create_task(background_work())
background_tasks.add(task)
task.add_done_callback(background_tasks.discard)

Production Patterns

Structured Concurrency Pattern

import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def managed_background_tasks():
    """Ensure background tasks are cleaned up on shutdown."""
    tasks = set()

    def create_task(coro):
        task = asyncio.create_task(coro)
        tasks.add(task)
        task.add_done_callback(tasks.discard)
        return task

    try:
        yield create_task
    finally:
        if tasks:
            # Cancel remaining tasks and wait for cleanup
            for task in tasks:
                task.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)

async def main():
    async with managed_background_tasks() as create_task:
        create_task(poll_queue())
        create_task(sync_cache())
        await serve_requests()  # main work

Frequently Asked Questions

When should I use asyncio vs threading vs multiprocessing?

Use asyncio for I/O-bound work (HTTP, databases, file I/O). Use threading when you need to call blocking third-party libraries that can't be made async. Use multiprocessing for CPU-bound work (number crunching, image processing) to bypass the GIL.
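As a sketch of the multiprocessing case, CPU-bound work can be handed to a process pool from inside a coroutine via run_in_executor (the crunch function and worker count here are illustrative):

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

def crunch(n: int) -> int:
    # Pure CPU work: runs in a worker process, outside the GIL
    return sum(i * i for i in range(n))

async def main() -> None:
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor(max_workers=2) as pool:
        # run_in_executor bridges blocking CPU-bound work into async code
        result = await loop.run_in_executor(pool, crunch, 100_000)
    print(result)

if __name__ == "__main__":  # required for process pools on spawn platforms
    asyncio.run(main())
```

The event loop stays responsive while the worker process does the arithmetic.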

What is the event loop?

The event loop is asyncio's scheduler. It runs coroutines, handles I/O events from the OS, and decides which coroutine to run next. There's one event loop per thread. asyncio.run() creates one, runs your coroutine, and cleans it up.

Can I use async/await with Django or Flask?

Django 3.1+ supports async views. Flask added async support in 2.0 via flask[async]. For fully async web frameworks, consider FastAPI or Starlette which were built async-first.

What is the difference between asyncio.gather and asyncio.wait?

gather takes coroutines or futures, runs them concurrently, and returns results in order. wait takes a set of tasks and returns when conditions are met (ALL_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION). Use gather for most cases; wait when you need fine-grained control over completion.
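Here is a small sketch of FIRST_COMPLETED, the main capability wait offers over gather (the names and delays are illustrative):

```python
import asyncio

async def fetch(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return name

async def main() -> None:
    # asyncio.wait requires Task objects, not bare coroutines (3.11+)
    tasks = [
        asyncio.create_task(fetch("slow", 0.2)),
        asyncio.create_task(fetch("fast", 0.05)),
    ]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    print(next(iter(done)).result())  # fast
    for task in pending:              # clean up the unfinished task
        task.cancel()

asyncio.run(main())
```

This "first response wins" pattern is awkward to express with gather, which always waits for everything.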
