
Concurrency: ThreadPoolExecutor, ProcessPool, asyncio.gather


Advanced concurrency patterns for production applications

Interview Question

"Compare ThreadPoolExecutor, ProcessPoolExecutor, and asyncio.gather. When would you use each? How do you handle errors, timeouts, and resource limits? Write production-ready concurrent code."

Difficulty: Hard | Frequently asked at Google, Meta, Amazon


Theoretical Foundation

Concurrency Models Overview

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from multiprocessing import cpu_count

# Task types
def io_bound_task(n):
    """Simulate I/O-bound task."""
    time.sleep(0.1)
    return f"IO result: {n}"

def cpu_bound_task(n):
    """Simulate CPU-bound task."""
    return sum(i * i for i in range(n))

# Comparison
print(f"CPU cores: {cpu_count()}")
print("I/O-bound: Use threading or asyncio")
print("CPU-bound: Use multiprocessing")

ℹ️

Key Concept: Choose concurrency model based on task type: I/O-bound (asyncio/threading) vs CPU-bound (multiprocessing).


ThreadPoolExecutor

Basic Usage

from concurrent.futures import ThreadPoolExecutor, as_completed, wait
import time
from typing import List, Callable, Any

def fetch_url(url: str) -> dict:
    """Simulate HTTP request."""
    time.sleep(0.1)
    return {"url": url, "status": 200, "size": 1024}

def process_urls_basic(urls: List[str]) -> List[dict]:
    """Process URLs with thread pool."""
    with ThreadPoolExecutor(max_workers=5) as executor:
        results = list(executor.map(fetch_url, urls))
    return results

def process_urls_with_progress(urls: List[str]) -> List[dict]:
    """Process URLs with progress tracking."""
    results = []
    
    with ThreadPoolExecutor(max_workers=5) as executor:
        # Submit all tasks
        future_to_url = {
            executor.submit(fetch_url, url): url 
            for url in urls
        }
        
        # Process completed tasks
        for future in as_completed(future_to_url):
            url = future_to_url[future]
            try:
                result = future.result()
                results.append(result)
                print(f"Completed: {url}")
            except Exception as e:
                print(f"Failed {url}: {e}")
                results.append({"url": url, "error": str(e)})
    
    return results

# Usage
urls = [f"https://api.example.com/{i}" for i in range(10)]
results = process_urls_with_progress(urls)
print(f"Processed {len(results)} URLs")

Error Handling

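# concurrent.futures.TimeoutError is an alias of the built-in TimeoutError only
# from Python 3.11; importing it keeps the except clause correct on older versions
from concurrent.futures import ThreadPoolExecutor, TimeoutError, as_completed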
import time
from typing import List

def unreliable_task(n: int) -> int:
    """Task that might fail."""
    time.sleep(0.1)
    if n % 3 == 0:
        raise ValueError(f"Failed on {n}")
    return n * 2

def process_with_error_handling(tasks: List[int]) -> dict:
    """Process tasks with comprehensive error handling."""
    results = {"success": [], "failed": []}
    
    with ThreadPoolExecutor(max_workers=3) as executor:
        future_to_task = {
            executor.submit(unreliable_task, task): task
            for task in tasks
        }
        
        for future in as_completed(future_to_task):
            task = future_to_task[future]
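            try:
                # as_completed() only yields finished futures, so this
                # timeout is a safeguard and never fires here
                result = future.result(timeout=5)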
                results["success"].append({"task": task, "result": result})
            except ValueError as e:
                results["failed"].append({"task": task, "error": str(e)})
            except TimeoutError:
                results["failed"].append({"task": task, "error": "Timeout"})
            except Exception as e:
                results["failed"].append({"task": task, "error": f"Unexpected: {e}"})
    
    return results

# Usage
tasks = list(range(10))
results = process_with_error_handling(tasks)
print(f"Success: {len(results['success'])}")
print(f"Failed: {len(results['failed'])}")

Output:

Success: 6
Failed: 4
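
Because as_completed() hands back only finished futures, a per-future timeout does not bound the overall run. One way to enforce a deadline across the whole batch is concurrent.futures.wait() with a timeout, which splits the futures into done and not-done sets. The sketch below is illustrative only: slow_job, run_with_deadline, and the delay values are hypothetical names and numbers, not part of the original example.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait


def slow_job(delay: float) -> float:
    """Hypothetical task: sleep for `delay` seconds, then return it."""
    time.sleep(delay)
    return delay


def run_with_deadline(delays, deadline):
    """Split jobs into those finished before `deadline` and those still running."""
    with ThreadPoolExecutor(max_workers=len(delays)) as executor:
        future_to_delay = {executor.submit(slow_job, d): d for d in delays}

        # wait() returns after `deadline` seconds even if work is unfinished
        done, not_done = wait(future_to_delay, timeout=deadline)

        for future in not_done:
            # cancel() only succeeds for futures that have not started yet;
            # a running thread cannot be interrupted
            future.cancel()

        finished = sorted(future_to_delay[f] for f in done)
        late = sorted(future_to_delay[f] for f in not_done)
    # Leaving the with-block still waits for running threads to finish
    return finished, late


finished, late = run_with_deadline([0.05, 0.1, 0.6], deadline=0.3)
print(f"Finished in time: {finished}")
print(f"Missed deadline: {late}")
```

Note that the deadline limits how long the caller waits for results, not how long the threads run: a job that has already started keeps running until it returns.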

Resource Management

from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
import time
from typing import List
from contextlib import contextmanager

class RateLimiter:
    """Rate limiter for thread pool."""
    
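    def __init__(self, max_calls_per_second: float):
        self.min_interval = 1.0 / max_calls_per_second
        self.last_called = None  # monotonic timestamp of the previous call
        self.lock = threading.Lock()

    def wait(self):
        # Sleeping while holding the lock is deliberate: it serializes callers
        # so that consecutive calls are spaced at least min_interval apart
        with self.lock:
            if self.last_called is not None:
                elapsed = time.monotonic() - self.last_called
                if elapsed < self.min_interval:
                    time.sleep(self.min_interval - elapsed)
            self.last_called = time.monotonic()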

def rate_limited_task(url: str, limiter: RateLimiter) -> dict:
    """Task with rate limiting."""
    limiter.wait()
    time.sleep(0.1)  # Simulate work
    return {"url": url, "status": 200}

def process_with_rate_limiting(urls: List[str], max_rps: float = 10) -> List[dict]:
    """Process URLs with rate limiting."""
    limiter = RateLimiter(max_rps)
    results = []
    
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [
            executor.submit(rate_limited_task, url, limiter)
            for url in urls
        ]
        
        for future in as_completed(futures):
            try:
                result = future.result()
                results.append(result)
            except Exception as e:
                print(f"Error: {e}")
    
    return results

# Usage
urls = [f"https://api.example.com/{i}" for i in range(20)]
results = process_with_rate_limiting(urls, max_rps=5)
print(f"Processed {len(results)} URLs with rate limiting")

💡

Interview Tip: Discuss thread safety, resource limits, and error handling when talking about thread pools.
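
To make the thread-safety point concrete, here is a minimal sketch of a counter shared by pool workers. The SafeCounter class and count_in_threads helper are hypothetical names introduced for illustration. The lock makes each read-modify-write step atomic; without it, concurrent increments could be lost.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class SafeCounter:
    """Counter whose increments are protected by a lock."""

    def __init__(self):
        self._value = 0
        self._lock = threading.Lock()

    def increment(self):
        # "+=" is a read-modify-write sequence, so it must be guarded
        with self._lock:
            self._value += 1

    @property
    def value(self):
        with self._lock:
            return self._value


def count_in_threads(n_workers: int, increments_per_worker: int) -> int:
    """Have every worker increment the same counter; return the final total."""
    counter = SafeCounter()

    def work():
        for _ in range(increments_per_worker):
            counter.increment()

    with ThreadPoolExecutor(max_workers=n_workers) as executor:
        futures = [executor.submit(work) for _ in range(n_workers)]
        for future in futures:
            future.result()  # re-raise any exception from a worker

    return counter.value


print(count_in_threads(8, 1000))  # 8000
```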


ProcessPoolExecutor

Basic Usage

from concurrent.futures import ProcessPoolExecutor, as_completed
import os
import time
from typing import List

def cpu_intensive_task(n: int) -> dict:
    """CPU-bound task."""
    start = time.time()
    result = sum(i * i for i in range(n))
    return {
        "input": n,
        "result": result,
        "pid": os.getpid(),
        "time": time.time() - start
    }

def parallel_computation(numbers: List[int], max_workers: int = None) -> List[dict]:
    """Process CPU-bound tasks in parallel."""
    if max_workers is None:
        max_workers = os.cpu_count()
    
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(cpu_intensive_task, n): n
            for n in numbers
        }
        
        results = []
        for future in as_completed(futures):
            try:
                result = future.result()
                results.append(result)
            except Exception as e:
                print(f"Error: {e}")
        
        return results

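# Usage (the __main__ guard is required when worker processes are spawned,
# which is the default start method on Windows and macOS)
if __name__ == "__main__":
    numbers = [100000, 200000, 300000, 400000]
    results = parallel_computation(numbers)

    for result in results:
        print(f"Input: {result['input']}, PID: {result['pid']}, Time: {result['time']:.3f}s")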

Shared State

from concurrent.futures import ProcessPoolExecutor
import multiprocessing
import time
from typing import List

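# Shared ctypes objects cannot be pickled into submit() arguments (that raises
# RuntimeError), so each worker receives them once through the pool initializer.
_shared_counter = None
_shared_array = None

def init_worker(counter, array):
    """Store the shared objects as globals in each worker process."""
    global _shared_counter, _shared_array
    _shared_counter = counter
    _shared_array = array

def worker_with_shared_state(task_id):
    """Worker that uses shared state."""
    # Modify shared counter
    with _shared_counter.get_lock():
        _shared_counter.value += 1

    # Modify shared array
    with _shared_array.get_lock():
        _shared_array[task_id % len(_shared_array)] += 1

    time.sleep(0.1)
    return f"Task {task_id} completed"

def parallel_with_shared_state():
    """Demonstrate shared state in process pool."""
    shared_counter = multiprocessing.Value('i', 0)
    shared_array = multiprocessing.Array('i', [0, 0, 0, 0, 0])

    tasks = list(range(20))

    with ProcessPoolExecutor(
        max_workers=4,
        initializer=init_worker,
        initargs=(shared_counter, shared_array),
    ) as executor:
        futures = [
            executor.submit(worker_with_shared_state, task)
            for task in tasks
        ]

        results = [f.result() for f in futures]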
    
    print(f"Counter: {shared_counter.value}")
    print(f"Array: {list(shared_array)}")
    print(f"Results: {len(results)}")

if __name__ == "__main__":
    parallel_with_shared_state()

Inter-Process Communication

from concurrent.futures import ProcessPoolExecutor
import multiprocessing
import time
from typing import List

def producer(queue, n_items, n_consumers):
    """Producer task."""
    for i in range(n_items):
        item = {"id": i, "data": f"item_{i}"}
        queue.put(item)
        time.sleep(0.01)
    # One sentinel per consumer; a single sentinel would stop only one
    # consumer and leave the others blocked on queue.get() forever
    for _ in range(n_consumers):
        queue.put(None)

def consumer(queue, results):
    """Consumer task."""
    while True:
        item = queue.get()
        if item is None:
            break
        results.append(item)

def producer_consumer_example():
    """Producer-consumer with multiprocessing.Process and a shared queue."""
    n_consumers = 3
    queue = multiprocessing.Queue()
    manager = multiprocessing.Manager()
    results = manager.list()

    # Start producer
    producer_process = multiprocessing.Process(
        target=producer, args=(queue, 10, n_consumers)
    )
    producer_process.start()

    # Start consumers
    consumer_processes = []
    for _ in range(n_consumers):
        p = multiprocessing.Process(
            target=consumer, args=(queue, results)
        )
        consumer_processes.append(p)
        p.start()
    
    # Wait for completion
    producer_process.join()
    for p in consumer_processes:
        p.join()
    
    print(f"Processed {len(results)} items")

if __name__ == "__main__":
    producer_consumer_example()

⚠️

Memory Note: ProcessPoolExecutor has higher memory overhead than ThreadPoolExecutor because every worker is a separate process with its own Python interpreter and its own copy of the data it works on.
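
Separate processes also mean that arguments and return values are pickled to cross the process boundary, so anything submitted to a process pool must be picklable. The sketch below is a quick pre-flight check under that assumption; can_cross_process_boundary and square are hypothetical helpers, not part of concurrent.futures.

```python
import pickle
import threading


def square(n: int) -> int:
    """Module-level function: pickled by reference to its qualified name."""
    return n * n


def can_cross_process_boundary(obj) -> bool:
    """Return True if `obj` survives pickling, as a process pool requires."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


# Plain data is fine; lambdas and locks cannot be pickled
print("dict:", can_cross_process_boundary({"id": 1, "data": [1, 2, 3]}))
print("module-level function:", can_cross_process_boundary(square))
print("lambda:", can_cross_process_boundary(lambda n: n * n))
print("lock:", can_cross_process_boundary(threading.Lock()))
```

This is why the worker functions in this section are defined at module level rather than as lambdas or nested functions.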


asyncio.gather

Basic Usage

import asyncio
import time
from typing import List

async def async_fetch_url(url: str) -> dict:
    """Async HTTP request (simulated)."""
    await asyncio.sleep(0.1)  # Simulate I/O
    return {"url": url, "status": 200, "size": 1024}

async def fetch_all_urls(urls: List[str]) -> List[dict]:
    """Fetch multiple URLs concurrently."""
    tasks = [async_fetch_url(url) for url in urls]
    results = await asyncio.gather(*tasks)
    return results

async def fetch_with_progress(urls: List[str]) -> List[dict]:
    """Fetch URLs with progress tracking."""
    results = []
    tasks = []
    
    for url in urls:
        task = asyncio.create_task(async_fetch_url(url))
        tasks.append(task)
    
    for task in asyncio.as_completed(tasks):
        result = await task
        results.append(result)
        print(f"Completed: {result['url']}")
    
    return results

# Usage
async def main():
    urls = [f"https://api.example.com/{i}" for i in range(10)]
    results = await fetch_all_urls(urls)
    print(f"Fetched {len(results)} URLs")

asyncio.run(main())

Error Handling

import asyncio
from typing import List

async def unreliable_async_task(n: int) -> int:
    """Async task that might fail."""
    await asyncio.sleep(0.1)
    if n % 3 == 0:
        raise ValueError(f"Failed on {n}")
    return n * 2

async def gather_with_error_handling(tasks: List[int]) -> dict:
    """asyncio.gather with error handling."""
    results = {"success": [], "failed": []}
    
    # Create coroutine objects
    coroutines = [unreliable_async_task(task) for task in tasks]
    
    # Use return_exceptions=True to get exceptions as results
    task_results = await asyncio.gather(*coroutines, return_exceptions=True)
    
    for task, result in zip(tasks, task_results):
        if isinstance(result, Exception):
            results["failed"].append({"task": task, "error": str(result)})
        else:
            results["success"].append({"task": task, "result": result})
    
    return results

# Usage
async def main():
    tasks = list(range(10))
    results = await gather_with_error_handling(tasks)
    print(f"Success: {len(results['success'])}")
    print(f"Failed: {len(results['failed'])}")

asyncio.run(main())

Output:

Success: 6
Failed: 4

Timeout and Cancellation

import asyncio
from typing import List

async def slow_task(n: int, delay: float) -> int:
    """Task that might be slow."""
    await asyncio.sleep(delay)
    return n

async def fetch_with_timeout(tasks: List[tuple], timeout: float) -> dict:
    """Fetch with timeout and cancellation."""
    results = {"completed": [], "timeout": [], "cancelled": []}
    
    async def run_with_timeout(task_id, coro, task_timeout):
        try:
            result = await asyncio.wait_for(coro, timeout=task_timeout)
            results["completed"].append({"task": task_id, "result": result})
        except asyncio.TimeoutError:
            results["timeout"].append(task_id)
    
    # Create tasks
    coroutines = [
        run_with_timeout(task_id, slow_task(task_id, delay), timeout)
        for task_id, delay in tasks
    ]
    
    await asyncio.gather(*coroutines, return_exceptions=True)
    return results

# Usage
async def main():
    tasks = [(1, 0.1), (2, 0.2), (3, 1.0), (4, 0.3), (5, 2.0)]
    results = await fetch_with_timeout(tasks, timeout=0.5)
    
    print(f"Completed: {results['completed']}")
    print(f"Timeout: {results['timeout']}")

asyncio.run(main())

Output:

Completed: [{'task': 1, 'result': 1}, {'task': 2, 'result': 2}, {'task': 4, 'result': 4}]
Timeout: [3, 5]

ℹ️

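Key Feature: asyncio.gather(return_exceptions=True) returns exceptions in the result list, in input order, instead of raising. By default, gather raises the first exception to the caller; the other awaitables are not cancelled, but their results are no longer delivered.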
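
The difference between the two modes can be sketched side by side. The work coroutine and compare_gather_modes helper below are hypothetical names used only for this illustration.

```python
import asyncio


async def work(n: int) -> int:
    """Hypothetical task that fails for n == 2."""
    await asyncio.sleep(0.01)
    if n == 2:
        raise ValueError(f"Failed on {n}")
    return n * 10


async def compare_gather_modes():
    # Default mode: the first exception propagates out of gather()
    raised = None
    try:
        await asyncio.gather(*(work(n) for n in range(4)))
    except ValueError as exc:
        raised = str(exc)

    # return_exceptions=True: exceptions come back as ordinary list items
    collected = await asyncio.gather(
        *(work(n) for n in range(4)), return_exceptions=True
    )
    return raised, collected


raised, collected = asyncio.run(compare_gather_modes())
print(f"Default mode raised: {raised}")
print(f"With return_exceptions=True: {collected}")
```

In the second call the list keeps one slot per input, so position 2 holds the ValueError instance while the other positions hold normal results.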


Advanced Patterns

Combining asyncio with ProcessPool

import asyncio
from concurrent.futures import ProcessPoolExecutor
import time
from typing import List

def cpu_intensive_task(data: List[int]) -> int:
    """CPU-bound task in separate process."""
    return sum(x * x for x in data)

async def async_cpu_task(pool: ProcessPoolExecutor, data: List[int]) -> int:
    """Run CPU-bound task in process pool from async code."""
    # get_running_loop() is the recommended call inside a coroutine
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(pool, cpu_intensive_task, data)

async def process_concurrent_cpu_tasks(data_chunks: List[List[int]]) -> List[int]:
    """Process multiple CPU-bound tasks concurrently."""
    # Share one pool; creating a pool per task would start a full set of
    # worker processes for every chunk
    with ProcessPoolExecutor() as pool:
        tasks = [async_cpu_task(pool, chunk) for chunk in data_chunks]
        results = await asyncio.gather(*tasks)
    return results

async def main():
    # Split data into chunks
    data = list(range(1000000))
    chunk_size = len(data) // 4
    chunks = [data[i:i+chunk_size] for i in range(0, len(data), chunk_size)]

    # Process concurrently
    start = time.time()
    results = await process_concurrent_cpu_tasks(chunks)
    elapsed = time.time() - start

    print(f"Processed {len(chunks)} chunks in {elapsed:.3f}s")
    print(f"Results: {results[:5]}...")

# The guard is required when worker processes are spawned (Windows, macOS)
if __name__ == "__main__":
    asyncio.run(main())

Combining asyncio with ThreadPool

import asyncio
from concurrent.futures import ThreadPoolExecutor
import time
from typing import List

def blocking_io_operation(url: str) -> dict:
    """Blocking I/O operation."""
    time.sleep(0.1)  # Simulate blocking I/O
    return {"url": url, "status": 200}

async def async_io_operation(pool: ThreadPoolExecutor, url: str) -> dict:
    """Run blocking I/O in thread pool."""
    # get_running_loop() is the recommended call inside a coroutine
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(pool, blocking_io_operation, url)

async def process_concurrent_io_tasks(urls: List[str]) -> List[dict]:
    """Process multiple I/O-bound tasks concurrently."""
    # One shared pool bounds the total number of threads; a pool per task
    # would create and tear down an executor for every URL
    with ThreadPoolExecutor() as pool:
        tasks = [async_io_operation(pool, url) for url in urls]
        results = await asyncio.gather(*tasks)
    return results

async def main():
    urls = [f"https://api.example.com/{i}" for i in range(20)]
    
    start = time.time()
    results = await process_concurrent_io_tasks(urls)
    elapsed = time.time() - start
    
    print(f"Processed {len(urls)} URLs in {elapsed:.3f}s")

asyncio.run(main())

Task Groups (Python 3.11+)

import asyncio
from typing import List

async def async_task(n: int) -> int:
    """Simple async task."""
    await asyncio.sleep(0.1)
    if n % 3 == 0:
        raise ValueError(f"Failed on {n}")
    return n * 2

async def process_with_task_group(tasks: List[int]) -> dict:
    """Process tasks using TaskGroup."""
    results = {"success": [], "failed": []}
    
    try:
        async with asyncio.TaskGroup() as tg:
            # All tasks run concurrently
            task_objects = [tg.create_task(async_task(task)) for task in tasks]

        # Reached only if every task succeeded
        for task, result in zip(tasks, task_objects):
            results["success"].append({"task": task, "result": result.result()})

    except* ValueError as eg:
        # If any task fails, the group cancels the tasks still running and
        # raises an ExceptionGroup, so "success" stays empty in that case
        for exc in eg.exceptions:
            results["failed"].append({"error": str(exc)})
    
    return results

# Usage (Python 3.11+)
async def main():
    tasks = list(range(10))
    results = await process_with_task_group(tasks)
    print(f"Success: {len(results['success'])}")
    print(f"Failed: {len(results['failed'])}")

asyncio.run(main())

💡

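Python 3.11+: Use TaskGroup for structured concurrency. When one task fails, the remaining tasks are cancelled and the errors are raised together as an ExceptionGroup, which is often cleaner than gather when tasks should succeed or fail as a unit.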


Production Patterns

Circuit Breaker

import asyncio
import time
from enum import Enum
from typing import Callable, Any

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class CircuitBreaker:
    """Circuit breaker for async operations."""
    
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 30.0,
        success_threshold: int = 3
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold
        
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED
    
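    def record_success(self):
        if self.state == CircuitState.HALF_OPEN:
            # Count only trial successes; successes recorded while CLOSED
            # must not pre-fill the counter
            self.success_count += 1
            if self.success_count >= self.success_threshold:
                self.state = CircuitState.CLOSED
                self.failure_count = 0
                self.success_count = 0

    def record_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()

        # A failure during HALF_OPEN reopens the circuit immediately
        if (self.state == CircuitState.HALF_OPEN
                or self.failure_count >= self.failure_threshold):
            self.state = CircuitState.OPEN
            self.success_count = 0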
    
    def can_execute(self) -> bool:
        if self.state == CircuitState.CLOSED:
            return True
        elif self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time >= self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                return True
            return False
        else:  # HALF_OPEN
            return True

async def call_with_circuit_breaker(
    func: Callable,
    circuit_breaker: CircuitBreaker,
    *args,
    **kwargs
) -> Any:
    """Execute function with circuit breaker."""
    if not circuit_breaker.can_execute():
        raise Exception("Circuit breaker is OPEN")
    
    try:
        result = await func(*args, **kwargs)
        circuit_breaker.record_success()
        return result
    except Exception as e:
        circuit_breaker.record_failure()
        raise

# Usage
async def unreliable_api_call(url: str) -> dict:
    """Simulated unreliable API call."""
    await asyncio.sleep(0.1)
    if hash(url) % 3 == 0:
        raise ConnectionError("API failed")
    return {"url": url, "status": 200}

async def main():
    circuit = CircuitBreaker(failure_threshold=3, recovery_timeout=5.0)
    
    urls = [f"https://api.example.com/{i}" for i in range(20)]
    
    results = []
    for url in urls:
        try:
            result = await call_with_circuit_breaker(
                unreliable_api_call, circuit, url
            )
            results.append(result)
        except Exception as e:
            results.append({"url": url, "error": str(e)})
    
    print(f"Results: {len(results)}")
    print(f"Circuit state: {circuit.state}")

asyncio.run(main())

Rate Limiting

import asyncio
import time
from collections import deque

class AsyncRateLimiter:
    """Async rate limiter using sliding window."""
    
    def __init__(self, max_calls: int, time_window: float):
        self.max_calls = max_calls
        self.time_window = time_window
        self.calls = deque()
        self.lock = asyncio.Lock()
    
    async def acquire(self):
        async with self.lock:
            now = time.time()
            
            # Remove old calls outside time window
            while self.calls and now - self.calls[0] >= self.time_window:
                self.calls.popleft()
            
            if len(self.calls) >= self.max_calls:
                # Wait until oldest call expires
                wait_time = self.time_window - (now - self.calls[0])
                await asyncio.sleep(wait_time)
                self.calls.popleft()
            
            self.calls.append(time.time())

async def rate_limited_api_call(
    limiter: AsyncRateLimiter,
    url: str
) -> dict:
    """API call with rate limiting."""
    await limiter.acquire()
    await asyncio.sleep(0.1)  # Simulate API call
    return {"url": url, "status": 200}

async def main():
    limiter = AsyncRateLimiter(max_calls=5, time_window=1.0)
    
    urls = [f"https://api.example.com/{i}" for i in range(20)]
    
    tasks = [rate_limited_api_call(limiter, url) for url in urls]
    results = await asyncio.gather(*tasks)
    
    print(f"Processed {len(results)} URLs")

asyncio.run(main())

ℹ️

Production Pattern: Rate limiting prevents overwhelming APIs and respects usage quotas.


Complexity Analysis

Performance Comparison

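| Pattern | I/O-bound | CPU-bound | Memory | Complexity |
| --- | --- | --- | --- | --- |
| ThreadPool | Good | Poor | Medium | Low |
| ProcessPool | Poor | Excellent | High | Medium |
| asyncio | Excellent | Poor | Low | High |
| Combined | Excellent | Excellent | Medium | Very High |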

Time Complexity

import time
import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def io_task(n):
    time.sleep(0.1)
    return n

def cpu_task(n):
    return sum(i * i for i in range(n))

# Sequential: O(n * t_io)
# ThreadPool: O(n * t_io / max_workers)
# ProcessPool: O(n * t_cpu / num_cores)
# asyncio: O(n * t_io / concurrency)

# Example timing
async def benchmark():
    n = 20
    
    # Sequential
    start = time.time()
    for i in range(n):
        await asyncio.sleep(0.1)
    seq_time = time.time() - start
    
    # asyncio.gather
    start = time.time()
    await asyncio.gather(*[asyncio.sleep(0.1) for _ in range(n)])
    async_time = time.time() - start
    
    print(f"Sequential: {seq_time:.2f}s")
    print(f"asyncio: {async_time:.2f}s")
    print(f"Speedup: {seq_time/async_time:.1f}x")

asyncio.run(benchmark())

Output:

Sequential: 2.01s
asyncio: 0.11s
Speedup: 18.3x
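
As a worked example of the ThreadPool estimate, 20 tasks of 0.1 s on 5 workers run in ceil(20 / 5) = 4 waves, so roughly 0.4 s instead of 2.0 s sequentially. The sketch below compares that idealized figure with a measurement. The estimate_wall_time and measure_thread_pool helpers are hypothetical, and the model ignores scheduling and startup overhead.

```python
import math
import time
from concurrent.futures import ThreadPoolExecutor


def estimate_wall_time(n_tasks: int, task_seconds: float, workers: int) -> float:
    """Idealized estimate: equal-length tasks run in ceil(n / workers) waves."""
    return math.ceil(n_tasks / workers) * task_seconds


def measure_thread_pool(n_tasks: int, task_seconds: float, workers: int) -> float:
    """Measure the wall time of n sleeping tasks on a thread pool."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=workers) as executor:
        # list() forces all results, so every task has finished
        list(executor.map(time.sleep, [task_seconds] * n_tasks))
    return time.perf_counter() - start


estimated = estimate_wall_time(20, 0.1, 5)
measured = measure_thread_pool(20, 0.1, 5)
print(f"Estimated: {estimated:.2f}s")
print(f"Measured:  {measured:.2f}s")
```

The measured value should land slightly above the estimate; a large gap usually means the tasks are not purely I/O-bound or the pool is smaller than assumed.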

Interview Tips

Common Follow-up Questions

  1. "When would you use ProcessPoolExecutor over ThreadPoolExecutor?"

    • CPU-bound tasks: ProcessPool
    • I/O-bound tasks: ThreadPool
    • Memory isolation: ProcessPool
    • Shared state: ThreadPool
  2. "How do you handle exceptions in asyncio.gather?"

    • Use return_exceptions=True
    • Use try/except with individual tasks
    • Use TaskGroup (Python 3.11+)
  3. "What are the limits of each approach?"

    • ThreadPool: GIL limits CPU parallelism
    • ProcessPool: High memory overhead
    • asyncio: Single-threaded, async-only
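
For the second follow-up question, the "try/except with individual tasks" option can be sketched as a wrapper coroutine that converts a failure into a result record, so gather never sees an exception. The names flaky, guarded, and run_all are hypothetical and chosen for this illustration.

```python
import asyncio


async def flaky(n: int) -> int:
    """Hypothetical task that fails for multiples of 3."""
    await asyncio.sleep(0.01)
    if n % 3 == 0:
        raise ValueError(f"Failed on {n}")
    return n * 2


async def guarded(n: int) -> dict:
    """Run one task and turn its outcome into a plain record."""
    try:
        return {"task": n, "result": await flaky(n)}
    except ValueError as exc:
        # Handle the expected failure here, next to the task that caused it
        return {"task": n, "error": str(exc)}


async def run_all(numbers) -> list:
    """Gather guarded tasks; results keep the input order."""
    return await asyncio.gather(*(guarded(n) for n in numbers))


for record in asyncio.run(run_all(range(4))):
    print(record)
```

Compared with return_exceptions=True, this keeps the handling logic per task and lets unexpected exception types still propagate.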

Code Review Tips

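# BAD: No error handling (map re-raises the first failure when the results
# are iterated, and the remaining results are lost)
with ThreadPoolExecutor() as executor:
    results = executor.map(unreliable_task, tasks)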

# GOOD: With error handling
with ThreadPoolExecutor() as executor:
    futures = [executor.submit(unreliable_task, task) for task in tasks]
    for future in as_completed(futures):
        try:
            result = future.result()
        except Exception as e:
            print(f"Error: {e}")

# BAD: Blocking in asyncio
async def bad_example():
    import time
    time.sleep(1)  # Blocks event loop!

# GOOD: Non-blocking
async def good_example():
    await asyncio.sleep(1)  # Yields control

# BAD: No resource limits
async def bad_unlimited():
    tasks = [asyncio.sleep(0.1) for _ in range(10000)]
    await asyncio.gather(*tasks)

# GOOD: With semaphore
async def good_limited():
    sem = asyncio.Semaphore(100)
    async def limited_task():
        async with sem:
            await asyncio.sleep(0.1)
    tasks = [limited_task() for _ in range(10000)]
    await asyncio.gather(*tasks)

⚠️

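Common Mistake: Calling time.sleep() inside a coroutine blocks the event loop and stalls every other task. Use await asyncio.sleep() instead, and move unavoidable blocking calls to a thread, for example with loop.run_in_executor().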
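
When a blocking call cannot be replaced with an async equivalent, it can be pushed onto a worker thread so the loop stays responsive. The sketch below uses asyncio.to_thread (available from Python 3.9); blocking_call, heartbeat, and demo are hypothetical names. The heartbeat counter shows that the loop keeps running while the blocking call sleeps.

```python
import asyncio
import time


def blocking_call(seconds: float) -> str:
    """Hypothetical blocking function, e.g. a synchronous client library."""
    time.sleep(seconds)
    return "done"


async def heartbeat(stop: asyncio.Event) -> int:
    """Count loop iterations until asked to stop."""
    ticks = 0
    while not stop.is_set():
        await asyncio.sleep(0.01)
        ticks += 1
    return ticks


async def demo():
    stop = asyncio.Event()
    beat = asyncio.create_task(heartbeat(stop))

    # The blocking call runs in a worker thread; the event loop stays free
    result = await asyncio.to_thread(blocking_call, 0.2)

    stop.set()
    ticks = await beat
    return result, ticks


result, ticks = asyncio.run(demo())
print(f"Blocking call returned {result!r}; heartbeat ticked {ticks} times meanwhile")
```

Calling blocking_call(0.2) directly inside demo would leave the heartbeat at zero ticks until the call returned.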


Summary

| Approach | Best For | Limitations | Complexity |
| --- | --- | --- | --- |
| ThreadPool | I/O-bound, shared state | GIL, CPU-bound | Low |
| ProcessPool | CPU-bound, isolation | Memory, IPC | Medium |
| asyncio | High concurrency I/O | Async-only, single-thread | High |
| Combined | Mixed workloads | Complex | Very High |

Decision Framework

  1. I/O-bound (< 100 tasks): ThreadPool
  2. I/O-bound (> 100 tasks): asyncio
  3. CPU-bound: ProcessPool
  4. Mixed: asyncio + ProcessPool/ThreadPool
  5. High concurrency: asyncio with semaphores
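
The framework above can be written down as a small lookup function, which is sometimes handy for talking through the decision in an interview. This is only a sketch: suggest_model and its workload labels are hypothetical, the threshold of 100 tasks is the rule of thumb from the list, and assigning exactly 100 tasks to asyncio is an assumption since the list leaves that case open.

```python
def suggest_model(workload: str, n_tasks: int = 1) -> str:
    """Map a workload description to the concurrency model from the framework.

    workload: "io", "cpu", or "mixed" (labels assumed for this sketch).
    n_tasks: number of concurrent tasks expected.
    """
    if workload == "cpu":
        return "ProcessPoolExecutor"
    if workload == "mixed":
        return "asyncio + ProcessPoolExecutor/ThreadPoolExecutor"
    if workload == "io":
        # Rule-of-thumb threshold; exactly 100 goes to asyncio by assumption
        if n_tasks < 100:
            return "ThreadPoolExecutor"
        return "asyncio (with a semaphore to cap concurrency)"
    raise ValueError(f"Unknown workload type: {workload!r}")


for workload, n in [("io", 20), ("io", 5000), ("cpu", 8), ("mixed", 50)]:
    print(f"{workload:>5} x {n:<5} -> {suggest_model(workload, n)}")
```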

ℹ️

Key Takeaway: Choose the right concurrency model for your workload. In production, combine approaches for optimal performance.


Practice Problems

  1. Web Scraper: Build a scraper using asyncio for thousands of pages
  2. Image Processor: Process images in parallel with ProcessPool
  3. API Gateway: Create a gateway with rate limiting and circuit breaker
  4. Task Scheduler: Build a scheduler using asyncio and thread pools
  5. Real-time Dashboard: Create a dashboard with concurrent data fetching

Further Reading

  • Python Docs: concurrent.futures, asyncio
  • PEP 3156: asyncio API
  • Books: "Python Concurrency with asyncio" by Matthew Fowler
  • Advanced: trio, anyio libraries

Remember: Mastering concurrency patterns is essential for building scalable Python applications.
