Python Async/Await — Asynchronous Programming
Async programming handles thousands of concurrent operations efficiently by using cooperative multitasking. Instead of threads, it uses coroutines that yield control back to the event loop when waiting for I/O.
Learning Objectives
- Understand the event loop and coroutine execution model
- Write async functions with async/await syntax
- Use asyncio.gather for concurrent operations
- Build async context managers and async generators
- Use aiohttp for async HTTP requests
- Handle errors in async code properly
- Know when async is appropriate vs threading or multiprocessing
Basics of Async/Await
import asyncio
# Define a coroutine with async def
async def greet(name):
    """Return a greeting for *name* after a one-second non-blocking pause."""
    # asyncio.sleep() suspends only this coroutine, not the whole program.
    await asyncio.sleep(1)
    greeting = f"Hello, {name}"
    return greeting
# Run the coroutine
result = asyncio.run(greet("Alice"))
print(result) # "Hello, Alice"
How the Event Loop Works
import asyncio
async def task1():
    """Print start/finish markers around a two-second pause; return "result1"."""
    print("Task 1 started")
    # Awaiting the sleep hands control back to the event loop.
    await asyncio.sleep(2)
    print("Task 1 completed")
    outcome = "result1"
    return outcome
async def task2():
    """Print start/finish markers around a one-second pause; return "result2"."""
    print("Task 2 started")
    # Awaiting the sleep hands control back to the event loop.
    await asyncio.sleep(1)
    print("Task 2 completed")
    outcome = "result2"
    return outcome
async def main():
    """Run task1 and then task2 one after the other and print both results."""
    # Each await finishes before the next coroutine is created, so the two
    # sleeps add up (~3 seconds) instead of overlapping.
    result1 = await task1()
    result2 = await task2()
    print(f"Sequential: {result1}, {result2}")
asyncio.run(main())
Concurrent Execution with gather
import asyncio
async def main():
    """Run task1 and task2 concurrently and print the list of results."""
    # gather() drives both coroutines at once, so the total wait is the
    # longest sleep (~2 seconds) rather than the sum (~3 seconds).
    jobs = (task1(), task2())
    results = await asyncio.gather(*jobs)
    print(f"Concurrent: {results}")
asyncio.run(main())
Running Coroutines Concurrently
asyncio.gather — Run Multiple Coroutines
import asyncio
import aiohttp
import time
async def fetch_page(session, url):
    """Fetch *url* through *session* and summarise the response.

    Returns a dict with the requested ``url``, the response ``status`` and
    ``length`` (``len()`` of the response text).
    """
    async with session.get(url) as response:
        body = await response.text()
        summary = {'url': url, 'status': response.status, 'length': len(body)}
    return summary
async def main():
    """Fetch a fixed list of httpbin URLs concurrently and report the timing."""
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/get",
        "https://httpbin.org/ip"
    ]
    start = time.time()
    async with aiohttp.ClientSession() as session:
        # One coroutine per URL; gather() awaits them together and returns
        # the results in the same order as ``urls``.
        fetches = [fetch_page(session, url) for url in urls]
        results = await asyncio.gather(*fetches)
    elapsed = time.time() - start
    print(f"Fetched {len(results)} pages in {elapsed:.2f}s")
    for result in results:
        print(f" {result['url']}: {result['status']} ({result['length']} bytes)")
asyncio.run(main())
asyncio.create_task — Schedule Coroutines
import asyncio
async def background_task(name, duration):
    """Announce *name*, sleep for *duration* seconds, then return "<name> done"."""
    print(f"Starting {name}")
    await asyncio.sleep(duration)
    print(f"Completed {name}")
    summary = f"{name} done"
    return summary
async def main():
    """Schedule three background tasks, do other work, then collect results."""
    # create_task() schedules each coroutine on the running loop right away;
    # they get to run whenever main() awaits something.
    specs = (("Task1", 2), ("Task2", 1), ("Task3", 3))
    task1, task2, task3 = (
        asyncio.create_task(background_task(label, seconds))
        for label, seconds in specs
    )
    # main() keeps working while the tasks make progress in the background.
    print("Main thread doing other work...")
    await asyncio.sleep(0.5)
    # Awaiting a Task waits for it and yields its return value.
    result1 = await task1
    result2 = await task2
    result3 = await task3
    print(f"Results: {result1}, {result2}, {result3}")
asyncio.run(main())
asyncio.wait — Fine-grained Control
import asyncio
async def slow_task():
    """Sleep for three seconds without blocking, then return "slow"."""
    await asyncio.sleep(3)
    label = "slow"
    return label
async def fast_task():
    """Sleep for one second without blocking, then return "fast"."""
    await asyncio.sleep(1)
    label = "fast"
    return label
async def main():
    """Report whichever task finishes first, then the remaining one(s)."""
    # asyncio.wait() needs Task/Future objects: passing bare coroutines was
    # deprecated in Python 3.8 and raises TypeError since Python 3.11.
    tasks = [
        asyncio.create_task(slow_task()),
        asyncio.create_task(fast_task()),
    ]
    # Return as soon as at least one task has finished.
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in done:
        print(f"First completed: {task.result()}")
    # asyncio.wait() rejects an empty collection, so only wait again when
    # something is still running.
    if pending:
        done, pending = await asyncio.wait(pending)
        for task in done:
            print(f"Next completed: {task.result()}")
asyncio.run(main())
Async Context Managers
Async context managers handle setup and teardown of async resources:
import aiohttp
import asyncio
# Using built-in async context manager
async def fetch_with_session():
    """GET https://httpbin.org/get and return the decoded JSON body.

    Both the session and the response are async context managers, so each is
    released when its ``async with`` block exits.
    """
    async with aiohttp.ClientSession() as session:
        async with session.get('https://httpbin.org/get') as response:
            payload = await response.json()
            return payload
# Custom async context manager
class AsyncDatabaseConnection:
    """Simulated database connection usable with ``async with``.

    Entering the context sets ``connection`` (a short sleep stands in for the
    real I/O); leaving it resets ``connection`` to ``None``.
    """

    def __init__(self, db_url):
        self.connection = None  # populated by __aenter__, cleared by __aexit__
        self.db_url = db_url

    async def __aenter__(self):
        """Open the simulated connection and return this object."""
        print(f"Connecting to {self.db_url}")
        await asyncio.sleep(0.1)  # stand-in for the connect round trip
        self.connection = f"Connection to {self.db_url}"
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the simulated connection; never suppresses an exception."""
        print(f"Closing connection to {self.db_url}")
        await asyncio.sleep(0.1)
        self.connection = None
        # A falsy return lets any exception from the body propagate.
        return False

    async def execute(self, query):
        """Pretend to run *query* and return a canned result string."""
        print(f"Executing: {query}")
        await asyncio.sleep(0.1)
        outcome = f"Result of: {query}"
        return outcome
async def main():
    """Open the simulated connection, run one query and print its result."""
    connection = AsyncDatabaseConnection("localhost:5432")
    # __aenter__ returns the connection object itself, bound here as ``db``.
    async with connection as db:
        result = await db.execute("SELECT * FROM users")
        print(result)
asyncio.run(main())
Async Generator Context Manager
import asyncio
from contextlib import asynccontextmanager
@asynccontextmanager
async def managed_resource(name):
    """Yield a resource dict for *name* and mark it inactive on exit.

    The dict has the keys ``"name"`` and ``"active"``. ``"active"`` is True
    while the ``async with`` body runs and False afterwards.
    """
    print(f"Setting up {name}")
    await asyncio.sleep(0.1)
    resource = dict(name=name, active=True)
    try:
        yield resource
    finally:
        # The finally clause runs whether the body finished or raised.
        print(f"Tearing down {name}")
        resource["active"] = False
        await asyncio.sleep(0.1)
async def main():
    """Use the managed resource, then print it again after it was released."""
    async with managed_resource("database") as db:
        print(f"Using {db}")
        await asyncio.sleep(0.5)
    # ``db`` is still bound after the block; teardown has already run.
    print(f"Resource released: {db}")
asyncio.run(main())
Async Iterators and Generators
import asyncio
# Async generator
async def async_range(start, stop, delay=0.1):
    """Asynchronously yield the integers of ``range(start, stop)``.

    Sleeps *delay* seconds (non-blocking) before each value is yielded.
    """
    numbers = range(start, stop)
    for number in numbers:
        await asyncio.sleep(delay)
        yield number
# Async iterator usage
async def main():
    """Print every value produced by ``async_range(0, 5)``."""
    numbers = async_range(0, 5)
    # ``async for`` awaits each next value from the async generator.
    async for num in numbers:
        print(f"Got: {num}")
asyncio.run(main())
# Async iterator implemented as a class (__aiter__ / __anext__)
class AsyncCounter:
    """Async iterator that counts from 1 up to *limit*, pausing before each value."""

    def __init__(self, limit):
        self.current = 0  # last value handed out; 0 means nothing yet
        self.limit = limit

    def __aiter__(self):
        # The object is its own iterator.
        return self

    async def __anext__(self):
        """Return the next count, or raise StopAsyncIteration at the limit."""
        if self.current < self.limit:
            await asyncio.sleep(0.1)
            self.current += 1
            return self.current
        raise StopAsyncIteration
async def main():
    """Print every value produced by ``AsyncCounter(5)``."""
    counter = AsyncCounter(5)
    async for num in counter:
        print(f"Count: {num}")
asyncio.run(main())
Async HTTP with aiohttp
import asyncio
import aiohttp
from typing import List, Dict
class AsyncHTTPClient:
    """Async HTTP client that caps the number of in-flight requests.

    Use it as an async context manager: the aiohttp session is created on
    entry and closed on exit. ``fetch`` never raises for network errors or
    timeouts; it reports them in the returned dict instead, so one bad URL
    cannot abort a whole ``fetch_many`` batch.
    """

    def __init__(self, max_concurrent=10):
        # Bounds how many fetch() calls may hold a request open at once.
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.session = None  # created in __aenter__

    async def __aenter__(self):
        """Create the shared session (connection limits, 30 s total timeout)."""
        connector = aiohttp.TCPConnector(limit=100, limit_per_host=10)
        timeout = aiohttp.ClientTimeout(total=30)
        self.session = aiohttp.ClientSession(connector=connector, timeout=timeout)
        return self

    async def __aexit__(self, *args):
        """Close the session if one was opened."""
        if self.session is not None:
            await self.session.close()
            self.session = None

    async def fetch(self, url: str) -> Dict:
        """Fetch a single URL while holding the concurrency semaphore.

        Returns a dict with ``url``, ``status``, ``content`` and ``headers``
        on success, or ``url`` and ``error`` when the request fails or times
        out.

        Raises:
            RuntimeError: if called outside the ``async with`` block.
        """
        if self.session is None:
            raise RuntimeError("AsyncHTTPClient must be used inside 'async with'")
        async with self.semaphore:
            try:
                async with self.session.get(url) as response:
                    return {
                        'url': url,
                        'status': response.status,
                        'content': await response.text(),
                        'headers': dict(response.headers)
                    }
            except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                # A timeout is not an aiohttp.ClientError and its message is
                # empty, so fall back to the exception class name.
                return {'url': url, 'error': str(e) or type(e).__name__}

    async def fetch_many(self, urls: List[str]) -> List[Dict]:
        """Fetch all *urls* concurrently; results keep the order of *urls*."""
        tasks = [self.fetch(url) for url in urls]
        return await asyncio.gather(*tasks)
async def main():
    """Fetch four httpbin URLs through AsyncHTTPClient and print each outcome."""
    urls = [
        "https://httpbin.org/get",
        "https://httpbin.org/ip",
        "https://httpbin.org/user-agent",
        "https://httpbin.org/headers",
    ]
    async with AsyncHTTPClient(max_concurrent=5) as client:
        results = await client.fetch_many(urls)
    for result in results:
        # A result dict carries an 'error' key only when the fetch failed.
        if 'error' not in result:
            print(f"Success: {result['url']}: {result['status']}")
            continue
        print(f"Error: {result['url']}: {result['error']}")
asyncio.run(main())
Async Web Scraper
import asyncio
import aiohttp
from bs4 import BeautifulSoup
from urllib.parse import urljoin
class AsyncWebScraper:
    """Asynchronous web scraper that follows links from a start URL.

    Pages are fetched in concurrent batches of at most ``max_concurrent``
    requests. ``visited`` holds the URLs scraped by the most recent
    ``scrape`` call.
    """

    def __init__(self, max_concurrent=5):
        self.max_concurrent = max_concurrent
        self.visited = set()
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch_page(self, session, url):
        """Return the body of *url*, or None on a non-200 status or any error."""
        async with self.semaphore:
            try:
                async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
                    if response.status == 200:
                        return await response.text()
                    return None
            except Exception as e:
                # Deliberately broad: one bad page must not stop the crawl.
                print(f"Error fetching {url}: {e}")
                return None

    def extract_links(self, html, base_url):
        """Return the set of absolute http(s) links found in *html*."""
        soup = BeautifulSoup(html, 'html.parser')
        links = set()
        for a in soup.find_all('a', href=True):
            # Resolve relative hrefs against the page they were found on.
            href = urljoin(base_url, a['href'])
            if href.startswith('http'):
                links.add(href)
        return links

    async def scrape(self, start_url, max_pages=10):
        """Scrape up to *max_pages* pages reachable from *start_url*.

        Returns the set of URLs that were fetched successfully. Every URL is
        attempted at most once, including the ones that fail.
        """
        visited = set()
        attempted = set()  # everything already handed to fetch_page
        to_visit = {start_url}
        async with aiohttp.ClientSession() as session:
            while to_visit and len(visited) < max_pages:
                # Never fetch more pages than the remaining budget, so the
                # result cannot exceed max_pages.
                budget = min(self.max_concurrent, max_pages - len(visited))
                batch_size = max(1, min(budget, len(to_visit)))
                batch = [to_visit.pop() for _ in range(batch_size)]
                attempted.update(batch)
                # Fetch the whole batch concurrently; the semaphore inside
                # fetch_page bounds the number of open requests.
                pages = await asyncio.gather(
                    *(self.fetch_page(session, url) for url in batch)
                )
                for url, html in zip(batch, pages):
                    if not html:
                        continue
                    visited.add(url)
                    print(f"Scraped ({len(visited)}): {url}")
                    # Queue only links that have not been tried yet.
                    to_visit.update(self.extract_links(html, url) - attempted)
        self.visited = visited
        return visited
async def main():
    """Scrape up to 20 pages starting at example.com and print the count."""
    start_url = "https://example.com"
    scraper = AsyncWebScraper(max_concurrent=3)
    pages = await scraper.scrape(start_url, max_pages=20)
    print(f"Scraped {len(pages)} pages")
asyncio.run(main())
Error Handling in Async Code
import asyncio
async def risky_operation():
    """Sleep for one second, then always raise ValueError."""
    await asyncio.sleep(1)
    failure = ValueError("Something went wrong")
    raise failure
async def safe_operation():
    """Sleep for half a second, then return "success"."""
    await asyncio.sleep(0.5)
    outcome = "success"
    return outcome
async def main():
    """Show two ways to handle failures: gather() results and an awaited Task."""
    # With return_exceptions=True, gather() returns exception objects in the
    # result list instead of raising the first one.
    results = await asyncio.gather(
        risky_operation(),
        safe_operation(),
        return_exceptions=True
    )
    for result in results:
        # BaseException rather than Exception: asyncio.CancelledError is a
        # BaseException and would otherwise be reported as a success.
        if isinstance(result, BaseException):
            print(f"Error: {result}")
        else:
            print(f"Success: {result}")
    # An exception raised inside a Task is re-raised where the Task is awaited.
    task = asyncio.create_task(risky_operation())
    try:
        result = await task
    except ValueError as e:
        print(f"Task failed: {e}")
        # The task has already finished here, so cancel() has no effect; it
        # only matters for a task that is still running.
        task.cancel()
asyncio.run(main())
Timeout Handling
import asyncio
async def slow_operation():
    """Sleep for ten seconds without blocking, then return "done"."""
    await asyncio.sleep(10)
    outcome = "done"
    return outcome
async def main():
    """Demonstrate two ways of bounding slow_operation() to two seconds."""
    limit = 2.0

    # Method 1: wait_for() wraps a single awaitable with a deadline.
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=limit)
        print(result)
    except asyncio.TimeoutError:
        print("Operation timed out")

    # Method 2: asyncio.timeout() (Python 3.11+) applies the deadline to
    # everything awaited inside the block.
    try:
        async with asyncio.timeout(limit):
            result = await slow_operation()
            print(result)
    except TimeoutError:
        print("Operation timed out")
asyncio.run(main())
Real-World Examples
Example 1: Async API Polling
import asyncio
import aiohttp
import json
from datetime import datetime
class AsyncAPIPoller:
    """Poll multiple API endpoints concurrently.

    ``results`` maps an endpoint name to its latest successful response.
    ``errors`` maps an endpoint name to the exception raised by its most
    recent poll and is cleared for a name once that endpoint succeeds again.
    """

    def __init__(self, endpoints, interval=60):
        self.endpoints = endpoints  # mapping of name -> URL
        self.interval = interval  # seconds slept between polling rounds
        self.results = {}
        self.errors = {}

    async def poll_endpoint(self, session, name, url):
        """Poll one endpoint and store its JSON body, timestamp and status."""
        async with session.get(url) as response:
            data = await response.json()
            self.results[name] = {
                'data': data,
                'timestamp': datetime.now().isoformat(),
                'status': response.status
            }

    async def poll_all(self):
        """Poll all endpoints concurrently and record which ones failed."""
        names = list(self.endpoints)
        async with aiohttp.ClientSession() as session:
            tasks = [
                self.poll_endpoint(session, name, self.endpoints[name])
                for name in names
            ]
            # return_exceptions=True keeps one failing endpoint from
            # aborting the others; failures come back as result values.
            outcomes = await asyncio.gather(*tasks, return_exceptions=True)
        for name, outcome in zip(names, outcomes):
            if isinstance(outcome, BaseException):
                # Keep the failure instead of dropping it silently.
                self.errors[name] = outcome
            else:
                self.errors.pop(name, None)

    async def run(self):
        """Poll forever, sleeping ``interval`` seconds between rounds."""
        while True:
            await self.poll_all()
            print(f"Polled {len(self.results)} endpoints at {datetime.now()}")
            if self.errors:
                print(f"Failed endpoints: {sorted(self.errors)}")
            await asyncio.sleep(self.interval)
# Usage
# Example poller: three endpoints, polled every 30 seconds once run() starts.
poller = AsyncAPIPoller(
    endpoints={
        'users': 'https://api.example.com/users',
        'orders': 'https://api.example.com/orders',
        'inventory': 'https://api.example.com/inventory',
    },
    interval=30,
)
# asyncio.run(poller.run())
Example 2: Async Task Queue
import asyncio
from dataclasses import dataclass
from typing import Callable, Any
import time
@dataclass
class Task:
    """A unit of work: a callable plus the arguments to call it with."""

    # Callable to invoke; may be a plain function or a coroutine function.
    func: Callable
    # Positional arguments passed to ``func``.
    args: tuple = ()
    # Keyword arguments passed to ``func``; None is replaced by a fresh dict.
    kwargs: dict = None
    priority: int = 0

    def __post_init__(self):
        # A None default (rather than ``{}``) keeps instances from sharing
        # one mutable dict.
        self.kwargs = {} if self.kwargs is None else self.kwargs
class AsyncTaskQueue:
    """Async task queue drained by a fixed pool of worker coroutines.

    Each processed task appends one dict to ``results``: ``worker``, ``task``
    (the callable's ``__name__``) and either ``result`` or ``error``.
    """

    def __init__(self, max_workers=5):
        if max_workers < 1:
            # Without a worker, run() would wait on the queue forever.
            raise ValueError("max_workers must be at least 1")
        self.max_workers = max_workers
        self.task_queue = asyncio.Queue()
        self.results = []

    async def worker(self, worker_id):
        """Process queued tasks until this worker is cancelled."""
        import inspect

        while True:
            task = await self.task_queue.get()
            try:
                result = task.func(*task.args, **task.kwargs)
                # Await based on what the call returned, so callables that
                # hand back an awaitable without being ``async def``
                # functions are awaited too.
                if inspect.isawaitable(result):
                    result = await result
                self.results.append({
                    'worker': worker_id,
                    'result': result,
                    'task': task.func.__name__
                })
            except Exception as e:
                # A failing task is recorded, not raised, so the worker
                # keeps serving the queue.
                self.results.append({
                    'worker': worker_id,
                    'error': str(e),
                    'task': task.func.__name__
                })
            finally:
                # Always balance get() so join() can complete.
                self.task_queue.task_done()

    async def add_task(self, task):
        """Add a task to the queue."""
        await self.task_queue.put(task)

    async def run(self):
        """Start the workers, wait until the queue is drained, return results."""
        workers = [
            asyncio.create_task(self.worker(i))
            for i in range(self.max_workers)
        ]
        try:
            # join() returns once task_done() was called for every item.
            await self.task_queue.join()
        finally:
            for worker in workers:
                worker.cancel()
            # Await the cancelled workers so they have really finished
            # before results are returned.
            await asyncio.gather(*workers, return_exceptions=True)
        return self.results
# Usage
async def process_item(item):
    """Sleep for half a second, then return "Processed: <item>"."""
    await asyncio.sleep(0.5)
    summary = f"Processed: {item}"
    return summary
async def main():
    """Queue 20 items, process them with three workers and print the count."""
    queue = AsyncTaskQueue(max_workers=3)
    # Fill the queue first; the workers only start inside run().
    for item in range(20):
        job = Task(process_item, args=(item,))
        await queue.add_task(job)
    results = await queue.run()
    print(f"Completed {len(results)} tasks")
asyncio.run(main())
Example 3: Async File Processor
import asyncio
import aiofiles
import aiohttp
import os
from pathlib import Path
class AsyncFileProcessor:
    """Download files and analyse text files asynchronously."""

    def __init__(self, max_concurrent=10):
        # Bounds the number of simultaneous downloads.
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def download_file(self, session, url, output_path):
        """Stream *url* into *output_path* and return the path.

        An HTTP error status raises before the output file is opened, so an
        error page is never saved as if it were the requested file.
        """
        async with self.semaphore:
            async with session.get(url) as response:
                response.raise_for_status()
                async with aiofiles.open(output_path, 'wb') as f:
                    # Write in 8 KiB chunks instead of loading the whole body.
                    async for chunk in response.content.iter_chunked(8192):
                        await f.write(chunk)
        return output_path

    @staticmethod
    def _text_stats(content):
        """Return ``(line_count, word_count)`` for *content*."""
        lines = content.split('\n')
        # A trailing newline ends the last line rather than starting a new
        # empty one; this also makes empty content count as zero lines.
        if lines[-1] == '':
            lines.pop()
        return len(lines), sum(len(line.split()) for line in lines)

    async def process_text_file(self, filepath):
        """Read *filepath* and return its ``file``, ``lines`` and ``words``."""
        async with aiofiles.open(filepath, 'r') as f:
            content = await f.read()
        line_count, word_count = self._text_stats(content)
        return {
            'file': filepath,
            'lines': line_count,
            'words': word_count
        }

    async def batch_download(self, urls, output_dir='./downloads'):
        """Download *urls* concurrently into *output_dir*.

        Returns one entry per URL, in order: the output path on success or
        the exception object on failure.
        """
        os.makedirs(output_dir, exist_ok=True)
        async with aiohttp.ClientSession() as session:
            tasks = [
                self.download_file(
                    session, url, os.path.join(output_dir, f"file_{i}.dat")
                )
                for i, url in enumerate(urls)
            ]
            results = await asyncio.gather(*tasks, return_exceptions=True)
        return results
async def main():
    """Analyse the example text files that exist and print their statistics."""
    processor = AsyncFileProcessor(max_concurrent=5)
    files = ['file1.txt', 'file2.txt', 'file3.txt']
    # Missing files are skipped rather than treated as errors.
    existing = filter(os.path.exists, files)
    tasks = [processor.process_text_file(path) for path in existing]
    results = await asyncio.gather(*tasks)
    for result in results:
        print(f"{result['file']}: {result['lines']} lines, {result['words']} words")
asyncio.run(main())
Common Mistakes
| Mistake | Problem | Solution |
|---|---|---|
| Blocking in async code | Event loop freezes | Use await for I/O, asyncio.to_thread() for blocking calls, a process pool for CPU-bound work |
| Forgetting await | Coroutine not executed | Always await coroutines |
| Mixing sync and async | Unexpected blocking | Use asyncio.to_thread() for sync code |
| Not using return_exceptions=True | First error propagates from gather() and the other results are lost | Use it in gather() for resilience |
| Creating tasks without awaiting | Silent failures | Always await or track created tasks |
| Using time.sleep() in async | Blocks entire event loop | Use asyncio.sleep() instead |
Best Practices
# 1. Use asyncio.run() as entry point
async def main():
    """Run task1 and task2 concurrently; intended to be started by asyncio.run()."""
    jobs = (task1(), task2())
    await asyncio.gather(*jobs)
asyncio.run(main())
# 2. Use semaphore for rate limiting
semaphore = asyncio.Semaphore(10)  # Max 10 concurrent


async def limited_fetch(url):
    """Call ``fetch(url)`` while holding the shared semaphore."""
    # The semaphore is released as soon as the fetch has finished.
    async with semaphore:
        response = await fetch(url)
    return response
# 3. Use asyncio.to_thread() for blocking code
import asyncio
def blocking_io():
    """Block the calling thread for one second, then return "done"."""
    import time

    # time.sleep() blocks the whole thread, which is why this function must
    # not be called directly from a coroutine.
    time.sleep(1)
    outcome = "done"
    return outcome
async def main():
    """Run blocking_io() in a worker thread so the event loop stays free."""
    worker_call = asyncio.to_thread(blocking_io)
    result = await worker_call
# 4. Use TaskGroups for structured concurrency (Python 3.11+)
async def main():
    """Start two fetches inside a TaskGroup and wait for both."""
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(fetch(url1))
        task2 = tg.create_task(fetch(url2))
    # The block exits only once every task in the group has finished; if one
    # fails, the remaining tasks are cancelled and an ExceptionGroup is raised.
# 5. Set timeouts to prevent hanging
async def main():
    """Bound slow_operation() to five seconds and report a timeout."""
    # ``await`` is only valid inside a coroutine, so the snippet is wrapped
    # in one; at module level it would be a SyntaxError.
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=5.0)
    except asyncio.TimeoutError:
        # wait_for() cancels the awaited operation before raising.
        print("Operation timed out")
Key Takeaways
- `async def` defines a coroutine; `await` pauses until the awaited coroutine completes
- `asyncio.gather()` runs multiple coroutines concurrently — use it for parallel I/O
- Use `asyncio.create_task()` to schedule coroutines that run in the background
- Async is ideal for I/O-bound tasks — use multiprocessing for CPU-bound work
- Always use `asyncio.sleep()` instead of `time.sleep()` in async code
- Use `return_exceptions=True` in `gather()` to handle errors gracefully
- Async context managers (`async with`) handle resource cleanup for async operations