πŸŽ‰ 75% of content is free forever β€” Unlock Premium from $10/mo β†’
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

Async/Await & Event Loop in Python

Python InterviewAsynchronous Programming⭐ Premium

Advertisement

Async/Await & Event Loop in Python

Difficulty: Hard | Companies: Google, Meta, Amazon, Netflix, Stripe

Coroutine Fundamentals

import asyncio
from typing import Coroutine, Any
import time

# Basic coroutine
async def fetch_data(url: str) -> dict:
    """Simulate async data fetching."""
    print(f"Starting fetch from {url}")
    await asyncio.sleep(1)  # Simulate network delay
    return {"url": url, "data": "response"}

# Running coroutines
async def main():
    """Main function demonstrating coroutine usage."""
    # Sequential execution
    start = time.time()
    result1 = await fetch_data("api.example.com/1")
    result2 = await fetch_data("api.example.com/2")
    print(f"Sequential: {time.time() - start:.2f}s")  # ~2s
    
    # Concurrent execution
    start = time.time()
    result1, result2 = await asyncio.gather(
        fetch_data("api.example.com/1"),
        fetch_data("api.example.com/2")
    )
    print(f"Concurrent: {time.time() - start:.2f}s")  # ~1s

# Run the async function
asyncio.run(main())

ℹ️

asyncio.run() is the entry point for running async code. It creates a new event loop and runs the coroutine until it completes.

Event Loop Deep Dive

import asyncio
from asyncio import Queue
import random

class EventLoopDemo:
    """Demonstrate event loop concepts."""
    
    def __init__(self):
        self.loop = None
    
    async def producer(self, queue: Queue, name: str):
        """Async producer."""
        for i in range(5):
            item = f"{name}-{i}"
            await queue.put(item)
            print(f"Produced: {item}")
            await asyncio.sleep(random.uniform(0.1, 0.5))
        
        await queue.put(None)  # Sentinel
    
    async def consumer(self, queue: Queue, name: str):
        """Async consumer."""
        while True:
            item = await queue.get()
            if item is None:
                await queue.put(None)  # Pass sentinel to other consumers
                break
            
            print(f"{name} consumed: {item}")
            await asyncio.sleep(random.uniform(0.1, 0.3))
            queue.task_done()
    
    async def run_producer_consumer(self):
        """Run producer-consumer pattern."""
        queue = Queue(maxsize=10)
        
        producers = [
            self.producer(queue, f"Producer-{i}")
            for i in range(2)
        ]
        consumers = [
            self.consumer(queue, f"Consumer-{i}")
            for i in range(3)
        ]
        
        await asyncio.gather(*producers, *consumers)

# Custom event loop integration
async def custom_event_loop_demo():
    """Demonstrate custom event loop usage."""
    loop = asyncio.get_event_loop()
    
    # Schedule a callback
    def callback(future):
        print(f"Callback result: {future.result()}")
    
    # Create task
    async def background_task():
        await asyncio.sleep(1)
        return "Task complete"
    
    task = asyncio.create_task(background_task())
    task.add_done_callback(callback)
    
    await task

asyncio.run(custom_event_loop_demo())

Async Context Managers

import asyncio
from contextlib import asynccontextmanager
from typing import AsyncGenerator

# Async context manager class
class AsyncDatabaseConnection:
    """Async database connection with context manager."""
    
    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.connection = None
    
    async def __aenter__(self):
        print(f"Connecting to {self.connection_string}")
        await asyncio.sleep(0.1)  # Simulate connection
        self.connection = {"status": "connected", "string": self.connection_string}
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Closing connection")
        await asyncio.sleep(0.05)
        self.connection = None
        return False  # Don't suppress exceptions
    
    async def execute(self, query: str):
        if not self.connection:
            raise RuntimeError("Not connected")
        await asyncio.sleep(0.05)
        return f"Result for: {query}"

# Function-based async context manager
@asynccontextmanager
async def async_file_handler(filename: str, mode: str = 'r'):
    """Async context manager for file operations."""
    print(f"Opening file: {filename}")
    # Simulate async file open
    await asyncio.sleep(0.01)
    file_obj = {"name": filename, "mode": mode, "closed": False}
    
    try:
        yield file_obj
    finally:
        print(f"Closing file: {filename}")
        await asyncio.sleep(0.01)
        file_obj["closed"] = True

# Usage
async def async_context_demo():
    # Class-based
    async with AsyncDatabaseConnection("postgresql://localhost/db") as db:
        result = await db.execute("SELECT * FROM users")
        print(result)
    
    # Function-based
    async with async_file_handler("data.txt", "w") as f:
        print(f"Writing to {f['name']}")

asyncio.run(async_context_demo())

Async Generators and Iterators

import asyncio
from typing import AsyncIterator, AsyncGenerator

# Async iterator
class AsyncCounter:
    """Async iterator implementation."""
    
    def __init__(self, start: int = 0, stop: int = 10):
        self.current = start
        self.stop = stop
    
    def __aiter__(self):
        return self
    
    async def __anext__(self):
        if self.current >= self.stop:
            raise StopAsyncIteration
        
        await asyncio.sleep(0.1)  # Simulate async operation
        value = self.current
        self.current += 1
        return value

# Async generator
async def async_range(start: int, stop: int) -> AsyncGenerator[int, None]:
    """Async generator yielding numbers."""
    for i in range(start, stop):
        await asyncio.sleep(0.01)
        yield i

async def async_filter(predicate, async_iterable) -> AsyncGenerator:
    """Async filter function."""
    async for item in async_iterable:
        if predicate(item):
            yield item

async def async_map(function, async_iterable) -> AsyncGenerator:
    """Async map function."""
    async for item in async_iterable:
        yield function(item)

# Usage
async def async_iteration_demo():
    # Async generator
    async for num in async_range(0, 5):
        print(num, end=" ")
    print()
    
    # Async iterator
    counter = AsyncCounter(0, 5)
    async for num in counter:
        print(num, end=" ")
    print()
    
    # Composing async operations
    async def double(x):
        return x * 2
    
    async def is_even(x):
        return x % 2 == 0
    
    doubled = async_map(double, async_range(0, 10))
    evens = async_filter(is_even, doubled)
    
    async for num in evens:
        print(num, end=" ")

asyncio.run(async_iteration_demo())

⚠️

Async generators cannot be used with standard list comprehensions. Use async comprehensions: [x async for x in async_gen]

Concurrent Async Operations

import asyncio
from asyncio import TaskGroup
from typing import List, Any
import random

# Gathering tasks
async def fetch_with_timeout(url: str, timeout: float) -> dict:
    """Fetch with timeout protection."""
    try:
        async with asyncio.timeout(timeout):
            await asyncio.sleep(random.uniform(0.1, 2.0))
            return {"url": url, "status": "success"}
    except asyncio.TimeoutError:
        return {"url": url, "status": "timeout"}

async def concurrent_fetch_demo():
    """Demonstrate concurrent fetching with different patterns."""
    urls = [
        "api.example.com/1",
        "api.example.com/2",
        "api.example.com/3",
        "api.example.com/4",
        "api.example.com/5",
    ]
    
    # Basic gather
    tasks = [fetch_with_timeout(url, 1.0) for url in urls]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    # Process results
    for result in results:
        if isinstance(result, Exception):
            print(f"Error: {result}")
        else:
            print(result)

# TaskGroup for structured concurrency
async def task_group_demo():
    """Demonstrate TaskGroup for structured concurrency."""
    async with TaskGroup() as tg:
        task1 = tg.create_task(fetch_with_timeout("api/1", 1.0))
        task2 = tg.create_task(fetch_with_timeout("api/2", 1.0))
        task3 = tg.create_task(fetch_with_timeout("api/3", 1.0))
    
    # All tasks guaranteed to be complete here
    print(task1.result())
    print(task2.result())
    print(task3.result())

# Semaphore for rate limiting
async def rate_limited_fetch(url: str, semaphore: asyncio.Semaphore):
    """Fetch with rate limiting."""
    async with semaphore:
        print(f"Fetching {url}")
        await asyncio.sleep(0.5)
        return {"url": url, "status": "success"}

async def rate_limiting_demo():
    """Demonstrate rate limiting with semaphore."""
    semaphore = asyncio.Semaphore(3)  # Max 3 concurrent requests
    
    urls = [f"api.example.com/{i}" for i in range(10)]
    tasks = [rate_limited_fetch(url, semaphore) for url in urls]
    
    results = await asyncio.gather(*tasks)
    print(f"Completed {len(results)} requests")

asyncio.run(concurrent_fetch_demo())

Async Patterns and Best Practices

import asyncio
from dataclasses import dataclass
from typing import Callable, Any
from functools import wraps

# Async retry decorator
def async_retry(max_attempts: int = 3, delay: float = 1.0):
    """Decorator for retrying async functions."""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            last_exception = None
            
            for attempt in range(max_attempts):
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    last_exception = e
                    if attempt < max_attempts - 1:
                        await asyncio.sleep(delay * (2 ** attempt))
            
            raise last_exception
        return wrapper
    return decorator

# Async caching
class AsyncLRUCache:
    """LRU cache for async functions."""
    
    def __init__(self, maxsize: int = 128):
        self.maxsize = maxsize
        self.cache = {}
        self.order = []
    
    def __call__(self, func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            key = (args, tuple(sorted(kwargs.items())))
            
            if key in self.cache:
                self.order.remove(key)
                self.order.append(key)
                return self.cache[key]
            
            result = await func(*args, **kwargs)
            
            if len(self.cache) >= self.maxsize:
                oldest = self.order.pop(0)
                del self.cache[oldest]
            
            self.cache[key] = result
            self.order.append(key)
            return result
        
        wrapper.cache_clear = lambda: (self.cache.clear(), self.order.clear())
        return wrapper

# Usage
@async_retry(max_attempts=3, delay=0.5)
async def unreliable_operation():
    import random
    if random.random() < 0.7:
        raise ConnectionError("Service unavailable")
    return "Success"

@AsyncLRUCache(maxsize=100)
async def expensive_computation(n: int) -> int:
    await asyncio.sleep(0.1)  # Simulate expensive work
    return n ** 2

async def best_practices_demo():
    # Use asyncio.gather for concurrent operations
    tasks = [expensive_computation(i) for i in range(10)]
    results = await asyncio.gather(*tasks)
    print(results)
    
    # Use asyncio.wait for more control
    tasks = [asyncio.create_task(expensive_computation(i)) for i in range(5)]
    done, pending = await asyncio.wait(tasks, timeout=0.5)
    
    for task in done:
        print(f"Completed: {task.result()}")

asyncio.run(best_practices_demo())

Follow-Up Questions

  1. Explain the difference between asyncio and threading for I/O-bound tasks.

  2. How does the event loop handle blocking calls in async code?

  3. What is structured concurrency and why is it important?

  4. How do you test async code effectively?

  5. Explain the concept of backpressure in async systems.

Advertisement