Python Async/Await — Asynchronous Programming

Async programming can handle thousands of concurrent I/O operations on a single thread by using cooperative multitasking. Instead of dedicating a thread to each operation, it uses coroutines that yield control back to the event loop at each await, typically while waiting for I/O.
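The hand-off between coroutines can be made visible with a minimal sketch. The names worker and log are hypothetical, and asyncio.sleep(0) stands in for a real I/O wait; it simply gives the event loop a chance to run the other coroutine.

```python
import asyncio

async def worker(name, log):
    """Record two steps, yielding to the event loop after each one."""
    for step in range(2):
        log.append(f"{name}-{step}")
        await asyncio.sleep(0)  # Stand-in for I/O: hands control back to the loop

async def main():
    log = []
    # Both workers share one thread; they take turns at each await
    await asyncio.gather(worker("a", log), worker("b", log))
    return log

log = asyncio.run(main())
print(log)  # ['a-0', 'b-0', 'a-1', 'b-1']
```

The interleaved order shows that neither worker runs to completion before the other starts: each one runs until its next await, then the other gets a turn.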

Learning Objectives

  • Understand the event loop and coroutine execution model
  • Write async functions with async/await syntax
  • Use asyncio.gather for concurrent operations
  • Build async context managers and async generators
  • Use aiohttp for async HTTP requests
  • Handle errors in async code properly
  • Know when async is appropriate vs threading or multiprocessing

Basics of Async/Await

import asyncio

# Define a coroutine with async def
async def greet(name):
    await asyncio.sleep(1)  # Non-blocking sleep
    return f"Hello, {name}"

# Run the coroutine
result = asyncio.run(greet("Alice"))
print(result)  # "Hello, Alice"

How the Event Loop Works

import asyncio

async def task1():
    print("Task 1 started")
    await asyncio.sleep(2)  # Yields control to event loop
    print("Task 1 completed")
    return "result1"

async def task2():
    print("Task 2 started")
    await asyncio.sleep(1)  # Yields control to event loop
    print("Task 2 completed")
    return "result2"

async def main():
    # Sequential execution (total: ~3 seconds)
    result1 = await task1()
    result2 = await task2()
    print(f"Sequential: {result1}, {result2}")

asyncio.run(main())

Concurrent Execution with gather

import asyncio

# Reuses task1() and task2() from the previous example
async def main():
    # Concurrent execution (total: ~2 seconds, not 3)
    results = await asyncio.gather(
        task1(),
        task2()
    )
    print(f"Concurrent: {results}")

asyncio.run(main())

Running Coroutines Concurrently

asyncio.gather — Run Multiple Coroutines

import asyncio
import aiohttp
import time

async def fetch_page(session, url):
    """Fetch a single page."""
    async with session.get(url) as response:
        text = await response.text()
        return {'url': url, 'status': response.status, 'length': len(text)}

async def main():
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/get",
        "https://httpbin.org/ip"
    ]

    start = time.time()

    async with aiohttp.ClientSession() as session:
        # Run all fetches concurrently
        results = await asyncio.gather(
            *[fetch_page(session, url) for url in urls]
        )

    elapsed = time.time() - start
    print(f"Fetched {len(results)} pages in {elapsed:.2f}s")

    for result in results:
        print(f"  {result['url']}: {result['status']} ({result['length']} bytes)")

asyncio.run(main())

asyncio.create_task — Schedule Coroutines

import asyncio

async def background_task(name, duration):
    """Run in background."""
    print(f"Starting {name}")
    await asyncio.sleep(duration)
    print(f"Completed {name}")
    return f"{name} done"

async def main():
    # Create tasks (they are scheduled now and start at the next await)
    task1 = asyncio.create_task(background_task("Task1", 2))
    task2 = asyncio.create_task(background_task("Task2", 1))
    task3 = asyncio.create_task(background_task("Task3", 3))

    # Do other work while tasks run
    print("Main coroutine doing other work...")
    await asyncio.sleep(0.5)

    # Wait for all tasks to complete
    result1 = await task1
    result2 = await task2
    result3 = await task3

    print(f"Results: {result1}, {result2}, {result3}")

asyncio.run(main())

asyncio.wait — Fine-grained Control

import asyncio

async def slow_task():
    await asyncio.sleep(3)
    return "slow"

async def fast_task():
    await asyncio.sleep(1)
    return "fast"

async def main():
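    # asyncio.wait() needs Task objects; bare coroutines are rejected since Python 3.11
    tasks = [asyncio.create_task(slow_task()), asyncio.create_task(fast_task())]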

    # Wait with FIRST_COMPLETED
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in done:
        print(f"First completed: {task.result()}")

    # Wait for remaining
    done, pending = await asyncio.wait(pending)
    for task in done:
        print(f"Next completed: {task.result()}")

asyncio.run(main())

Async Context Managers

Async context managers handle setup and teardown of async resources:

import aiohttp
import asyncio

# Using built-in async context manager
async def fetch_with_session():
    async with aiohttp.ClientSession() as session:
        async with session.get('https://httpbin.org/get') as response:
            return await response.json()

# Custom async context manager
class AsyncDatabaseConnection:
    """Custom async context manager for database connections."""

    def __init__(self, db_url):
        self.db_url = db_url
        self.connection = None

    async def __aenter__(self):
        """Async setup."""
        print(f"Connecting to {self.db_url}")
        # Simulate async connection
        await asyncio.sleep(0.1)
        self.connection = f"Connection to {self.db_url}"
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async teardown."""
        print(f"Closing connection to {self.db_url}")
        await asyncio.sleep(0.1)
        self.connection = None
        return False  # Don't suppress exceptions

    async def execute(self, query):
        print(f"Executing: {query}")
        await asyncio.sleep(0.1)
        return f"Result of: {query}"

async def main():
    async with AsyncDatabaseConnection("localhost:5432") as db:
        result = await db.execute("SELECT * FROM users")
        print(result)

asyncio.run(main())

Async Generator Context Manager

import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def managed_resource(name):
    """Async context manager using decorator."""
    print(f"Setting up {name}")
    await asyncio.sleep(0.1)
    resource = {"name": name, "active": True}
    try:
        yield resource
    finally:
        print(f"Tearing down {name}")
        resource["active"] = False
        await asyncio.sleep(0.1)

async def main():
    async with managed_resource("database") as db:
        print(f"Using {db}")
        await asyncio.sleep(0.5)
    print(f"Resource released: {db}")

asyncio.run(main())

Async Iterators and Generators

import asyncio

# Async generator
async def async_range(start, stop, delay=0.1):
    """Async generator that yields numbers with delay."""
    for i in range(start, stop):
        await asyncio.sleep(delay)
        yield i

# Async iterator usage
async def main():
    async for num in async_range(0, 5):
        print(f"Got: {num}")

asyncio.run(main())

# Async generator in class
class AsyncCounter:
    def __init__(self, limit):
        self.limit = limit
        self.current = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.current >= self.limit:
            raise StopAsyncIteration
        await asyncio.sleep(0.1)
        self.current += 1
        return self.current

async def main():
    async for num in AsyncCounter(5):
        print(f"Count: {num}")

asyncio.run(main())

Async HTTP with aiohttp

import asyncio
import aiohttp
from typing import List, Dict

class AsyncHTTPClient:
    """High-performance async HTTP client."""

    def __init__(self, max_concurrent=10):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.session = None

    async def __aenter__(self):
        connector = aiohttp.TCPConnector(limit=100, limit_per_host=10)
        timeout = aiohttp.ClientTimeout(total=30)
        self.session = aiohttp.ClientSession(connector=connector, timeout=timeout)
        return self

    async def __aexit__(self, *args):
        await self.session.close()

    async def fetch(self, url: str) -> Dict:
        """Fetch a single URL with rate limiting."""
        async with self.semaphore:
            try:
                async with self.session.get(url) as response:
                    return {
                        'url': url,
                        'status': response.status,
                        'content': await response.text(),
                        'headers': dict(response.headers)
                    }
            except aiohttp.ClientError as e:
                return {'url': url, 'error': str(e)}

    async def fetch_many(self, urls: List[str]) -> List[Dict]:
        """Fetch multiple URLs concurrently."""
        tasks = [self.fetch(url) for url in urls]
        return await asyncio.gather(*tasks)

async def main():
    urls = [
        "https://httpbin.org/get",
        "https://httpbin.org/ip",
        "https://httpbin.org/user-agent",
        "https://httpbin.org/headers",
    ]

    async with AsyncHTTPClient(max_concurrent=5) as client:
        results = await client.fetch_many(urls)
        for result in results:
            if 'error' in result:
                print(f"Error: {result['url']}: {result['error']}")
            else:
                print(f"Success: {result['url']}: {result['status']}")

asyncio.run(main())

Async Web Scraper

import asyncio
import aiohttp
from bs4 import BeautifulSoup
from urllib.parse import urljoin

class AsyncWebScraper:
    """Asynchronous web scraper."""

    def __init__(self, max_concurrent=5):
        self.max_concurrent = max_concurrent
        self.visited = set()
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch_page(self, session, url):
        """Fetch a single page."""
        async with self.semaphore:
            try:
                async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
                    if response.status == 200:
                        return await response.text()
                    return None
            except Exception as e:
                print(f"Error fetching {url}: {e}")
                return None

    def extract_links(self, html, base_url):
        """Extract links from HTML."""
        soup = BeautifulSoup(html, 'html.parser')
        links = set()
        for a in soup.find_all('a', href=True):
            href = urljoin(base_url, a['href'])
            if href.startswith('http'):
                links.add(href)
        return links

    async def scrape(self, start_url, max_pages=10):
        """Scrape pages starting from URL."""
        visited = set()
        to_visit = {start_url}

        async with aiohttp.ClientSession() as session:
            while to_visit and len(visited) < max_pages:
                url = to_visit.pop()
                if url in visited:
                    continue

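                # Note: pages are fetched one at a time here, so the semaphore
                # only matters once fetches are scheduled concurrently
                html = await self.fetch_page(session, url)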
                if html:
                    visited.add(url)
                    print(f"Scraped ({len(visited)}): {url}")

                    # Extract and queue new links
                    new_links = self.extract_links(html, url)
                    to_visit.update(new_links - visited)

        return visited

async def main():
    scraper = AsyncWebScraper(max_concurrent=3)
    pages = await scraper.scrape("https://example.com", max_pages=20)
    print(f"Scraped {len(pages)} pages")

asyncio.run(main())

Error Handling in Async Code

import asyncio

async def risky_operation():
    await asyncio.sleep(1)
    raise ValueError("Something went wrong")

async def safe_operation():
    await asyncio.sleep(0.5)
    return "success"

async def main():
    # Handle errors with gather
    results = await asyncio.gather(
        risky_operation(),
        safe_operation(),
        return_exceptions=True  # Don't raise, return exception objects
    )

    for result in results:
        if isinstance(result, Exception):
            print(f"Error: {result}")
        else:
            print(f"Success: {result}")

    # Handle errors with create_task
    task = asyncio.create_task(risky_operation())
    try:
        result = await task
    except ValueError as e:
        print(f"Task failed: {e}")
        # The task has already finished, so there is nothing left to cancel

asyncio.run(main())

Timeout Handling

import asyncio

async def slow_operation():
    await asyncio.sleep(10)
    return "done"

async def main():
    # Method 1: asyncio.wait_for
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=2.0)
        print(result)
    except asyncio.TimeoutError:
        print("Operation timed out")

    # Method 2: asyncio.timeout (Python 3.11+)
    try:
        async with asyncio.timeout(2.0):
            result = await slow_operation()
            print(result)
    except TimeoutError:
        print("Operation timed out")

asyncio.run(main())

Real-World Examples

Example 1: Async API Polling

import asyncio
import aiohttp
from datetime import datetime

class AsyncAPIPoller:
    """Poll multiple API endpoints concurrently."""

    def __init__(self, endpoints, interval=60):
        self.endpoints = endpoints
        self.interval = interval
        self.results = {}

    async def poll_endpoint(self, session, name, url):
        """Poll a single endpoint."""
        async with session.get(url) as response:
            data = await response.json()
            self.results[name] = {
                'data': data,
                'timestamp': datetime.now().isoformat(),
                'status': response.status
            }

    async def poll_all(self):
        """Poll all endpoints concurrently."""
        async with aiohttp.ClientSession() as session:
            tasks = [
                self.poll_endpoint(session, name, url)
                for name, url in self.endpoints.items()
            ]
            await asyncio.gather(*tasks, return_exceptions=True)

    async def run(self):
        """Run polling loop."""
        while True:
            await self.poll_all()
            print(f"Polled {len(self.results)} endpoints at {datetime.now()}")
            await asyncio.sleep(self.interval)

# Usage
poller = AsyncAPIPoller({
    'users': 'https://api.example.com/users',
    'orders': 'https://api.example.com/orders',
    'inventory': 'https://api.example.com/inventory',
}, interval=30)
# asyncio.run(poller.run())

Example 2: Async Task Queue

import asyncio
from dataclasses import dataclass
from typing import Callable, Any
import time

@dataclass
class Task:
    func: Callable
    args: tuple = ()
    kwargs: dict = None
    priority: int = 0

    def __post_init__(self):
        if self.kwargs is None:
            self.kwargs = {}

class AsyncTaskQueue:
    """Async task queue with worker pool."""

    def __init__(self, max_workers=5):
        self.max_workers = max_workers
        self.task_queue = asyncio.Queue()
        self.results = []

    async def worker(self, worker_id):
        """Worker that processes tasks."""
        while True:
            task = await self.task_queue.get()
            try:
                if asyncio.iscoroutinefunction(task.func):
                    result = await task.func(*task.args, **task.kwargs)
                else:
                    result = task.func(*task.args, **task.kwargs)
                self.results.append({
                    'worker': worker_id,
                    'result': result,
                    'task': task.func.__name__
                })
            except Exception as e:
                self.results.append({
                    'worker': worker_id,
                    'error': str(e),
                    'task': task.func.__name__
                })
            finally:
                self.task_queue.task_done()

    async def add_task(self, task):
        """Add a task to the queue."""
        await self.task_queue.put(task)

    async def run(self):
        """Start workers and process tasks."""
        workers = [
            asyncio.create_task(self.worker(i))
            for i in range(self.max_workers)
        ]

        # Wait for all tasks to be processed
        await self.task_queue.join()

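        # Cancel the idle workers and wait for the cancellations to finish
        for worker in workers:
            worker.cancel()
        await asyncio.gather(*workers, return_exceptions=True)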

        return self.results

# Usage
async def process_item(item):
    await asyncio.sleep(0.5)
    return f"Processed: {item}"

async def main():
    queue = AsyncTaskQueue(max_workers=3)

    # Add tasks
    for i in range(20):
        await queue.add_task(Task(process_item, args=(i,)))

    results = await queue.run()
    print(f"Completed {len(results)} tasks")

asyncio.run(main())

Example 3: Async File Processor

import asyncio
import aiofiles
import aiohttp
import os

class AsyncFileProcessor:
    """Process files asynchronously."""

    def __init__(self, max_concurrent=10):
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def download_file(self, session, url, output_path):
        """Download a file asynchronously."""
        async with self.semaphore:
            async with session.get(url) as response:
                async with aiofiles.open(output_path, 'wb') as f:
                    async for chunk in response.content.iter_chunked(8192):
                        await f.write(chunk)
                return output_path

    async def process_text_file(self, filepath):
        """Process a text file asynchronously."""
        async with aiofiles.open(filepath, 'r') as f:
            content = await f.read()
            # Process content
            lines = content.split('\n')
            word_count = sum(len(line.split()) for line in lines)
            return {
                'file': filepath,
                'lines': len(lines),
                'words': word_count
            }

    async def batch_download(self, urls, output_dir='./downloads'):
        """Download multiple files concurrently."""
        os.makedirs(output_dir, exist_ok=True)

        async with aiohttp.ClientSession() as session:
            tasks = []
            for i, url in enumerate(urls):
                filename = f"file_{i}.dat"
                output_path = os.path.join(output_dir, filename)
                tasks.append(self.download_file(session, url, output_path))

            results = await asyncio.gather(*tasks, return_exceptions=True)
            return results

async def main():
    processor = AsyncFileProcessor(max_concurrent=5)

    # Example: Process multiple text files
    files = ['file1.txt', 'file2.txt', 'file3.txt']
    tasks = [processor.process_text_file(f) for f in files if os.path.exists(f)]
    results = await asyncio.gather(*tasks)

    for result in results:
        print(f"{result['file']}: {result['lines']} lines, {result['words']} words")

asyncio.run(main())

Common Mistakes

Mistake | Problem | Solution
Blocking in async code | Event loop freezes | Use await for I/O, asyncio.to_thread() for blocking calls, and a process pool for CPU-bound work
Forgetting await | Coroutine never runs (Python emits a "never awaited" RuntimeWarning) | Always await coroutines
Mixing sync and async | Unexpected blocking | Use asyncio.to_thread() for sync code
Not using return_exceptions=True | First error propagates out of gather() and the other results are lost | Use it in gather() when every result matters
Creating tasks without awaiting | Silent failures | Always await or track created tasks
Using time.sleep() in async | Blocks entire event loop | Use asyncio.sleep() instead
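The "Forgetting await" row can be illustrated with a small sketch. The function name compute is hypothetical; the point is that calling a coroutine function only builds a coroutine object, and nothing runs until it is awaited.

```python
import asyncio
import inspect

async def compute():
    """Trivial coroutine used only for illustration."""
    return 42

async def main():
    pending = compute()                      # No await: nothing has run yet
    is_coro = inspect.iscoroutine(pending)   # It is just a coroutine object
    value = await pending                    # Awaiting actually executes it
    return is_coro, value

outcome = asyncio.run(main())
print(outcome)  # (True, 42)
```

If pending were never awaited, the body of compute() would never execute and Python would report a "coroutine ... was never awaited" warning when the object is garbage collected.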

Best Practices

# 1. Use asyncio.run() as entry point
async def main():
    await asyncio.gather(task1(), task2())

asyncio.run(main())

# 2. Use semaphore for rate limiting
semaphore = asyncio.Semaphore(10)  # Max 10 concurrent

async def limited_fetch(url):
    async with semaphore:
        return await fetch(url)

# 3. Use asyncio.to_thread() for blocking code
import asyncio

def blocking_io():
    import time
    time.sleep(1)
    return "done"

async def main():
    result = await asyncio.to_thread(blocking_io)

# 4. Use TaskGroups for structured concurrency (Python 3.11+)
async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(fetch(url1))
        task2 = tg.create_task(fetch(url2))
    # On normal exit from the block, every task has finished;
    # if one task fails, the remaining tasks are cancelled

# 5. Set timeouts to prevent hanging
try:
    result = await asyncio.wait_for(slow_operation(), timeout=5.0)
except asyncio.TimeoutError:
    print("Operation timed out")

Key Takeaways

  1. async def defines a coroutine; await suspends the current coroutine until the awaited operation completes
  2. asyncio.gather() runs multiple coroutines concurrently — use it for concurrent I/O
  3. Use asyncio.create_task() to schedule coroutines that run in the background
  4. Async is ideal for I/O-bound tasks — use multiprocessing for CPU-bound work
  5. Always use asyncio.sleep() instead of time.sleep() in async code
  6. Use return_exceptions=True in gather() to handle errors gracefully
  7. Async context managers (async with) handle resource cleanup for async operations
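One caveat on takeaway 3: the event loop keeps only weak references to tasks, so a background task that nothing else references can be garbage collected before it finishes. A common pattern is to hold tasks in a set until they complete. The sketch below assumes hypothetical names (background_tasks, spawn, record) and is one way to do it, not the only one.

```python
import asyncio

background_tasks = set()  # Hypothetical registry that keeps tasks alive

async def record(results, value):
    """Simulated background job that stores a value after a short wait."""
    await asyncio.sleep(0.01)
    results.append(value)

def spawn(coro):
    """Schedule a coroutine and hold a strong reference until it is done."""
    task = asyncio.create_task(coro)
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)  # Drop it once finished
    return task

async def main():
    results = []
    for i in range(3):
        spawn(record(results, i))
    # Wait for everything still registered before leaving main()
    await asyncio.gather(*background_tasks)
    return sorted(results)

results = asyncio.run(main())
print(results)  # [0, 1, 2]
```

The final gather() matters as much as the registry: asyncio.run() cancels any tasks still pending when main() returns.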
