Why async/await Matters
Python's Global Interpreter Lock (GIL) means threads don't give you true parallelism for CPU-bound work. But for I/O-bound tasks — HTTP requests, database queries, file reads — threads waste CPU time waiting. asyncio solves this with cooperative multitasking: a single thread handles thousands of concurrent I/O operations by switching between coroutines while one is waiting.
The result: 10–100x more concurrent connections on the same hardware, with simpler code than threads. And because context switches happen only at await points, most of the shared-state race conditions that plague threading simply cannot occur.
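You can see the cooperative switching directly with a minimal, dependency-free sketch: two coroutines that each wait one second finish in roughly one second total, not two, because the event loop runs the second one while the first is sleeping.

```python
import asyncio
import time

async def wait_one(name: str) -> str:
    # Yields control to the event loop for ~1 second
    await asyncio.sleep(1)
    return name

async def main() -> float:
    start = time.monotonic()
    # Both coroutines run concurrently on a single thread
    await asyncio.gather(wait_one("a"), wait_one("b"))
    return time.monotonic() - start

elapsed = asyncio.run(main())
print(f"elapsed: {elapsed:.2f}s")  # ~1s, not ~2s
```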
The Core Concepts
Coroutines vs Regular Functions
# Regular function — blocks the thread while sleeping
import time

def slow_fetch():
    time.sleep(1)  # entire thread frozen
    return "data"

# Coroutine — yields control back to event loop while waiting
import asyncio

async def fast_fetch():
    await asyncio.sleep(1)  # event loop runs other tasks
    return "data"
The async def statement defines a coroutine function. Calling it returns a coroutine object; nothing executes until you await it or schedule it as a task.
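You can observe this lazy behavior directly. A self-contained sketch (the sleep is shortened to keep the demo instant):

```python
import asyncio

async def fast_fetch() -> str:
    await asyncio.sleep(0)  # shortened for the demo
    return "data"

coro = fast_fetch()          # nothing has run yet
print(type(coro).__name__)   # coroutine

result = asyncio.run(coro)   # now it actually executes
print(result)                # data
```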
Running a Coroutine
import asyncio

async def main():
    result = await fast_fetch()
    print(result)

# Entry point — starts the event loop
asyncio.run(main())
Concurrent Tasks with asyncio.gather
The real power comes from running multiple coroutines concurrently:
import asyncio
import aiohttp

async def fetch_url(session: aiohttp.ClientSession, url: str) -> dict:
    async with session.get(url) as response:
        return {"url": url, "status": response.status}

async def fetch_all(urls: list[str]) -> list[dict]:
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

# Fetch 100 URLs concurrently instead of sequentially
urls = [f"https://api.example.com/item/{i}" for i in range(100)]
results = asyncio.run(fetch_all(urls))
print(f"Fetched {len(results)} URLs")
# Takes ~1s instead of 100s with sequential requests
Error Handling in Concurrent Tasks
By default, asyncio.gather raises the first exception immediately in the awaiting coroutine; the other tasks are not cancelled and keep running in the background, but you lose easy access to their results. Use return_exceptions=True to collect exceptions alongside successful results instead:
async def fetch_with_retry(session, url, retries=3):
    for attempt in range(retries):
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as resp:
                resp.raise_for_status()
                return await resp.json()
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            if attempt == retries - 1:
                return {"error": str(e), "url": url}
            await asyncio.sleep(2 ** attempt)  # exponential backoff

async def fetch_all_safe(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_retry(session, url) for url in urls]
        # return_exceptions=True: exceptions come back as results instead of raising
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return [r for r in results if not isinstance(r, Exception)]
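A small self-contained demonstration of what return_exceptions=True actually returns, using plain coroutines instead of HTTP calls:

```python
import asyncio

async def ok(x: int) -> int:
    return x

async def boom() -> int:
    raise ValueError("failed")

async def main() -> list:
    # Exceptions are placed into the result list in order, not raised
    return await asyncio.gather(ok(1), boom(), ok(3), return_exceptions=True)

results = asyncio.run(main())
print(results)  # [1, ValueError('failed'), 3]
```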
Rate Limiting with Semaphores
Firing 1000 concurrent requests will get you rate-limited or banned. Use asyncio.Semaphore to cap concurrency:
import asyncio
import aiohttp

async def fetch_with_semaphore(
    session: aiohttp.ClientSession,
    url: str,
    semaphore: asyncio.Semaphore,
) -> dict:
    async with semaphore:  # waits if max_concurrent tasks already hold the semaphore
        async with session.get(url) as response:
            return await response.json()

async def fetch_rate_limited(urls: list[str], max_concurrent: int = 10):
    semaphore = asyncio.Semaphore(max_concurrent)
    async with aiohttp.ClientSession() as session:
        tasks = [
            fetch_with_semaphore(session, url, semaphore)
            for url in urls
        ]
        return await asyncio.gather(*tasks)

# Max 10 in-flight requests at any time
results = asyncio.run(fetch_rate_limited(urls, max_concurrent=10))
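To convince yourself the cap actually holds, here's a dependency-free sketch that tracks the peak number of in-flight workers (the sleep stands in for network I/O):

```python
import asyncio

async def worker(sem: asyncio.Semaphore, state: dict) -> None:
    async with sem:
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.01)  # simulate I/O
        state["active"] -= 1

async def main(n_tasks: int = 50, limit: int = 10) -> int:
    sem = asyncio.Semaphore(limit)
    state = {"active": 0, "peak": 0}
    await asyncio.gather(*(worker(sem, state) for _ in range(n_tasks)))
    return state["peak"]

peak = asyncio.run(main())
print(f"peak concurrency: {peak}")  # never exceeds the limit of 10
```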
Token Bucket Rate Limiter
For API rate limits measured in requests-per-minute, a token bucket gives smoother pacing than a semaphore:
import asyncio
import time

class TokenBucketRateLimiter:
    def __init__(self, requests_per_minute: int):
        self.rpm = requests_per_minute
        self.tokens = float(requests_per_minute)
        self.last_refill = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self):
        async with self._lock:
            now = time.monotonic()
            elapsed = now - self.last_refill
            # Refill at rpm/60 tokens per second, capped at the bucket size
            self.tokens = min(self.rpm, self.tokens + elapsed * (self.rpm / 60.0))
            self.last_refill = now
            if self.tokens < 1:
                # Sleep just long enough for one token to accrue, then spend it
                wait = (1 - self.tokens) * (60.0 / self.rpm)
                await asyncio.sleep(wait)
                self.tokens = 0
            else:
                self.tokens -= 1

# Usage with OpenAI API (60 RPM tier)
limiter = TokenBucketRateLimiter(requests_per_minute=55)  # small buffer below the limit

async def call_openai(client, prompt: str) -> str:
    await limiter.acquire()
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
    )
    return response.choices[0].message.content
Async Context Managers and Generators
import asyncpg

class AsyncDatabasePool:
    """Async context manager for connection pool lifecycle."""

    async def __aenter__(self):
        self.pool = await asyncpg.create_pool(dsn="postgresql://...")
        return self.pool

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.pool.close()
        return False  # don't suppress exceptions

async def main():
    async with AsyncDatabasePool() as pool:
        rows = await pool.fetch("SELECT * FROM users LIMIT 10")

# Async generator for streaming large result sets
async def stream_large_table(pool, batch_size: int = 100):
    offset = 0
    while True:
        # Parameterized query; in practice add an ORDER BY so pages are deterministic
        rows = await pool.fetch(
            "SELECT * FROM events LIMIT $1 OFFSET $2",
            batch_size, offset,
        )
        if not rows:
            break
        for row in rows:
            yield dict(row)
        offset += batch_size

async def process_events():
    async with AsyncDatabasePool() as pool:
        async for event in stream_large_table(pool):
            await process_event(event)  # no OOM from loading millions of rows
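The async-generator pattern isn't tied to databases; the same shape works for any awaited source. A dependency-free sketch (the sleep stands in for the awaited fetch):

```python
import asyncio

async def number_stream(limit: int, batch_size: int = 3):
    """Async generator: yields items batch by batch, awaiting between batches."""
    for start in range(0, limit, batch_size):
        await asyncio.sleep(0)  # stand-in for an awaited DB/network fetch
        for n in range(start, min(start + batch_size, limit)):
            yield n

async def main() -> list[int]:
    collected = []
    async for n in number_stream(7):
        collected.append(n)
    return collected

items = asyncio.run(main())
print(items)  # [0, 1, 2, 3, 4, 5, 6]
```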
asyncio.TaskGroup (Python 3.11+)
The newer TaskGroup API is safer than gather — it cancels all tasks in the group if any raises, and collects exceptions properly:
import asyncio

async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(fetch_data("https://api.example.com/users"))
        task2 = tg.create_task(fetch_data("https://api.example.com/orders"))
        task3 = tg.create_task(fetch_data("https://api.example.com/products"))
    # All tasks done here — exceptions raised as ExceptionGroup
    results = [task1.result(), task2.result(), task3.result()]
    return results
Common Pitfalls
Mixing sync and async code
# BAD: calling a blocking function inside a coroutine freezes the event loop
async def bad():
    import requests
    data = requests.get("https://api.example.com")  # blocks entire event loop!

# GOOD: use asyncio.to_thread for blocking calls
async def good():
    import requests
    data = await asyncio.to_thread(requests.get, "https://api.example.com")
Forgetting to await
# BAD: returns coroutine object, not the result
result = fetch_data() # forgot await -- silent bug!
# GOOD
result = await fetch_data()
Creating tasks in wrong context
# BAD: task created without reference may be garbage collected
asyncio.create_task(background_work()) # no reference = may cancel silently
# GOOD: hold a reference
background_tasks = set()
task = asyncio.create_task(background_work())
background_tasks.add(task)
task.add_done_callback(background_tasks.discard)
Production Patterns
Structured Concurrency Pattern
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def managed_background_tasks():
    """Ensure background tasks are cleaned up on shutdown."""
    tasks = set()

    def create_task(coro):
        task = asyncio.create_task(coro)
        tasks.add(task)
        task.add_done_callback(tasks.discard)
        return task

    try:
        yield create_task
    finally:
        if tasks:
            # Cancel remaining tasks and wait for cleanup
            for task in tasks:
                task.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)

async def main():
    async with managed_background_tasks() as create_task:
        create_task(poll_queue())
        create_task(sync_cache())
        await serve_requests()  # main work
Frequently Asked Questions
When should I use asyncio vs threading vs multiprocessing?
Use asyncio for I/O-bound work (HTTP, databases, file I/O). Use threading when you need to call blocking third-party libraries that can't be made async. Use multiprocessing for CPU-bound work (number crunching, image processing) to bypass the GIL.
What is the event loop?
The event loop is asyncio's scheduler. It runs coroutines, handles I/O events from the OS, and decides which coroutine to run next. There's one event loop per thread. asyncio.run() creates one, runs your coroutine, and cleans it up.
Can I use async/await with Django or Flask?
Django 3.1+ supports async views. Flask added async support in 2.0 via flask[async]. For fully async web frameworks, consider FastAPI or Starlette which were built async-first.
What is the difference between asyncio.gather and asyncio.wait?
gather takes coroutines or futures, runs them concurrently, and returns results in order. wait takes a set of tasks and returns when conditions are met (ALL_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION). Use gather for most cases; wait when you need fine-grained control over completion.
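A small sketch of FIRST_COMPLETED in action; note that on Python 3.11+ asyncio.wait requires Task objects, not bare coroutines:

```python
import asyncio

async def delayed(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return name

async def main() -> str:
    tasks = {
        asyncio.create_task(delayed("fast", 0.01)),
        asyncio.create_task(delayed("slow", 1.0)),
    }
    # Returns as soon as the first task finishes
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()  # discard the tasks still in flight
    return done.pop().result()

winner = asyncio.run(main())
print(winner)  # fast
```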