Concurrency: ThreadPoolExecutor, ProcessPool, asyncio.gather
Advanced concurrency patterns for production applications
Interview Question
"Compare ThreadPoolExecutor, ProcessPoolExecutor, and asyncio.gather. When would you use each? How do you handle errors, timeouts, and resource limits? Write production-ready concurrent code."
Difficulty: Hard | Frequently asked at Google, Meta, Amazon
Theoretical Foundation
Concurrency Models Overview
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from multiprocessing import cpu_count
# Task types
def io_bound_task(n):
    """Pretend to wait on I/O for 100 ms, then report *n*.

    Returns:
        The string ``"IO result: <n>"``.
    """
    latency_s = 0.1  # stand-in for network/disk latency
    time.sleep(latency_s)
    message = f"IO result: {n}"
    return message
def cpu_bound_task(n):
    """Return ``0*0 + 1*1 + ... + (n-1)*(n-1)`` using a pure-Python loop.

    The work is pure arithmetic with no waiting, so it keeps a core busy for
    its whole duration.
    """
    total = 0
    for value in range(n):
        total += value * value
    return total
# Comparison: report the available cores and which model suits each workload.
# Only the first line interpolates a value, so only it needs an f-string.
print(f"CPU cores: {cpu_count()}")
print("I/O-bound: Use threading or asyncio")
print("CPU-bound: Use multiprocessing")
ℹ️
Key Concept: Choose concurrency model based on task type: I/O-bound (asyncio/threading) vs CPU-bound (multiprocessing).
ThreadPoolExecutor
Basic Usage
from concurrent.futures import ThreadPoolExecutor, as_completed, wait
import time
from typing import List, Callable, Any
def fetch_url(url: str) -> dict:
    """Fake an HTTP GET: block for 100 ms and return a canned response.

    Args:
        url: Address being "fetched"; echoed back in the result.

    Returns:
        ``{"url": url, "status": 200, "size": 1024}``.
    """
    time.sleep(0.1)  # stand-in for network latency
    response = {"url": url, "status": 200, "size": 1024}
    return response
def process_urls_basic(urls: List[str]) -> List[dict]:
    """Fetch every URL on a 5-thread pool and return the responses in input order.

    ``Executor.map`` yields results in the order of *urls*; the first
    exception raised by ``fetch_url`` propagates to the caller.
    """
    with ThreadPoolExecutor(max_workers=5) as pool:
        ordered = pool.map(fetch_url, urls)
        return list(ordered)
def process_urls_with_progress(urls: List[str]) -> List[dict]:
    """Fetch URLs on a 5-thread pool, printing each one as it finishes.

    Results are appended in completion order, not input order. A failed
    fetch is printed and recorded as ``{"url": ..., "error": ...}`` instead
    of aborting the batch.
    """
    collected: List[dict] = []
    with ThreadPoolExecutor(max_workers=5) as pool:
        # Map each future back to its URL so every outcome can be labelled.
        pending = {pool.submit(fetch_url, u): u for u in urls}
        for done in as_completed(pending):
            source = pending[done]
            try:
                payload = done.result()
            except Exception as exc:
                print(f"Failed {source}: {exc}")
                collected.append({"url": source, "error": str(exc)})
            else:
                collected.append(payload)
                print(f"Completed: {source}")
    return collected
# Usage: demo run that executes at import time. It fetches ten fake
# endpoints, prints one progress line per URL, then prints the total count.
urls = [f"https://api.example.com/{i}" for i in range(10)]
results = process_urls_with_progress(urls)
print(f"Processed {len(results)} URLs")
Error Handling
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
from typing import List
def unreliable_task(n: int) -> int:
    """Return ``n * 2`` after 100 ms, failing deterministically for multiples of 3.

    Raises:
        ValueError: if ``n % 3 == 0`` (this includes ``n == 0``).
    """
    time.sleep(0.1)
    if n % 3:
        return n * 2
    raise ValueError(f"Failed on {n}")
def process_with_error_handling(tasks: List[int]) -> dict:
    """Run ``unreliable_task`` over *tasks* on a 3-thread pool and bucket the outcomes.

    Returns:
        ``{"success": [{"task", "result"}, ...], "failed": [{"task", "error"}, ...]}``
        with entries in completion order.
    """
    # Before Python 3.11, concurrent.futures.TimeoutError is a distinct class
    # from the builtin TimeoutError, so `except TimeoutError` alone would not
    # catch a Future.result() timeout there. Catch both to work everywhere.
    from concurrent.futures import TimeoutError as FuturesTimeoutError

    results = {"success": [], "failed": []}
    with ThreadPoolExecutor(max_workers=3) as executor:
        future_to_task = {
            executor.submit(unreliable_task, task): task
            for task in tasks
        }
        for future in as_completed(future_to_task):
            task = future_to_task[future]
            try:
                # as_completed only yields finished futures, so this timeout
                # is a safety net rather than an effective deadline.
                result = future.result(timeout=5)
            except ValueError as e:
                results["failed"].append({"task": task, "error": str(e)})
            except (FuturesTimeoutError, TimeoutError):
                results["failed"].append({"task": task, "error": "Timeout"})
            except Exception as e:
                results["failed"].append({"task": task, "error": f"Unexpected: {e}"})
            else:
                results["success"].append({"task": task, "result": result})
    return results
# Usage: demo run over tasks 0..9. unreliable_task raises ValueError for
# 0, 3, 6 and 9, so this prints 6 successes and 4 failures.
tasks = list(range(10))
results = process_with_error_handling(tasks)
print(f"Success: {len(results['success'])}")
print(f"Failed: {len(results['failed'])}")
Output:
Success: 6
Failed: 4
Resource Management
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
import time
from typing import List
from contextlib import contextmanager
class RateLimiter:
    """Thread-safe limiter that spaces calls at least ``1 / max_calls_per_second`` apart.

    A caller sleeps inside :meth:`wait` while holding the lock, so concurrent
    threads are released one at a time at the configured rate.
    """

    def __init__(self, max_calls_per_second: float):
        # Minimum gap, in seconds, between two successive releases.
        self.min_interval = 1.0 / max_calls_per_second
        # Single-element list kept for compatibility with code reading
        # ``last_called[0]``. It holds a time.monotonic() timestamp and starts
        # at -inf so the very first call never waits.
        self.last_called = [float("-inf")]
        self.lock = threading.Lock()

    def wait(self) -> None:
        """Block until at least ``min_interval`` has passed since the previous release."""
        with self.lock:
            # time.monotonic() never goes backwards. time.time() follows
            # wall-clock/NTP adjustments, and a backwards jump would turn
            # ``elapsed`` negative and cause an arbitrarily long sleep.
            elapsed = time.monotonic() - self.last_called[0]
            if elapsed < self.min_interval:
                time.sleep(self.min_interval - elapsed)
            self.last_called[0] = time.monotonic()
def rate_limited_task(url: str, limiter: RateLimiter) -> dict:
    """Wait for a slot from *limiter*, simulate 100 ms of work, then report success.

    Returns:
        ``{"url": url, "status": 200}``.
    """
    limiter.wait()  # blocks until the shared limiter lets this caller through
    time.sleep(0.1)
    outcome = {"url": url, "status": 200}
    return outcome
def process_with_rate_limiting(urls: List[str], max_rps: float = 10) -> List[dict]:
    """Fetch *urls* on 5 threads while one shared limiter caps throughput at *max_rps*.

    Failures are printed and dropped. Successful results are returned in
    completion order.
    """
    shared_limiter = RateLimiter(max_rps)
    successes = []
    with ThreadPoolExecutor(max_workers=5) as pool:
        in_flight = [pool.submit(rate_limited_task, u, shared_limiter) for u in urls]
        for finished in as_completed(in_flight):
            try:
                successes.append(finished.result())
            except Exception as exc:
                print(f"Error: {exc}")
    return successes
# Usage: demo run. 20 fake URLs go through 5 threads, throttled by the shared
# RateLimiter to one release every 0.2 s (max_rps=5), so the run takes about 4 s.
urls = [f"https://api.example.com/{i}" for i in range(20)]
results = process_with_rate_limiting(urls, max_rps=5)
print(f"Processed {len(results)} URLs with rate limiting")
💡
Interview Tip: Discuss thread safety, resource limits, and error handling when talking about thread pools.
ProcessPoolExecutor
Basic Usage
from concurrent.futures import ProcessPoolExecutor, as_completed
import os
import time
from typing import List
def cpu_intensive_task(n: int) -> dict:
    """Sum the squares below *n*, reporting which process did the work and how long it took.

    Returns:
        Dict with keys ``input``, ``result``, ``pid`` and ``time`` (elapsed
        seconds measured with ``time.time()``).
    """
    began = time.time()
    total = 0
    for k in range(n):
        total += k * k
    return {
        "input": n,
        "result": total,
        "pid": os.getpid(),
        "time": time.time() - began,
    }
def parallel_computation(numbers: List[int], max_workers: int = None) -> List[dict]:
    """Run ``cpu_intensive_task`` for each number across a process pool.

    Args:
        numbers: Inputs to process.
        max_workers: Pool size; ``None`` means ``os.cpu_count()``.

    Returns:
        Successful results in completion order. Failures are printed and omitted.
    """
    workers = os.cpu_count() if max_workers is None else max_workers
    with ProcessPoolExecutor(max_workers=workers) as pool:
        pending = {pool.submit(cpu_intensive_task, value): value for value in numbers}
        outputs = []
        for done in as_completed(pending):
            try:
                outputs.append(done.result())
            except Exception as exc:
                print(f"Error: {exc}")
    return outputs
# Usage. The guard is required here: with the "spawn" start method (default
# on Windows and macOS) every worker re-imports this module, and an unguarded
# call would try to start a new process pool from inside each child.
if __name__ == "__main__":
    numbers = [100000, 200000, 300000, 400000]
    results = parallel_computation(numbers)
    for result in results:
        print(f"Input: {result['input']}, PID: {result['pid']}, Time: {result['time']:.3f}s")
Shared State
from concurrent.futures import ProcessPoolExecutor
import multiprocessing
import time
from typing import List
def worker_with_shared_state(shared_counter, shared_array, task_id):
    """Bump a shared counter and one slot of a shared array, then simulate work.

    Args:
        shared_counter: Lock-protected int with ``.value`` and ``.get_lock()``
            (e.g. ``multiprocessing.Value``).
        shared_array: Lock-protected int array with ``.get_lock()``
            (e.g. ``multiprocessing.Array``).
        task_id: Selects slot ``task_id % len(shared_array)``.

    Returns:
        ``"Task <task_id> completed"``.
    """
    counter_lock = shared_counter.get_lock()
    with counter_lock:
        shared_counter.value += 1
    array_lock = shared_array.get_lock()
    with array_lock:
        slot = task_id % len(shared_array)
        shared_array[slot] += 1
    time.sleep(0.1)
    return f"Task {task_id} completed"
# Worker-process globals filled in by _init_shared_state. multiprocessing
# Value/Array objects may only reach a child process by inheritance, i.e. when
# the process is created. They cannot be pickled as per-task submit() arguments.
_shared_counter = None
_shared_array = None


def _init_shared_state(counter, array):
    """Pool initializer: store the inherited shared objects in module globals."""
    global _shared_counter, _shared_array
    _shared_counter = counter
    _shared_array = array


def _worker_from_globals(task_id):
    """Run ``worker_with_shared_state`` against the objects installed by the initializer."""
    return worker_with_shared_state(_shared_counter, _shared_array, task_id)


def parallel_with_shared_state():
    """Demonstrate shared state in a process pool.

    Passing the Value/Array straight to ``executor.submit`` raises
    "Synchronized objects should only be shared between processes through
    inheritance". Handing them to the pool's ``initializer`` instead ships
    them once, when each worker process starts.
    """
    shared_counter = multiprocessing.Value('i', 0)
    shared_array = multiprocessing.Array('i', [0, 0, 0, 0, 0])
    tasks = list(range(20))
    with ProcessPoolExecutor(
        max_workers=4,
        initializer=_init_shared_state,
        initargs=(shared_counter, shared_array),
    ) as executor:
        futures = [executor.submit(_worker_from_globals, task) for task in tasks]
        results = [f.result() for f in futures]
    print(f"Counter: {shared_counter.value}")
    print(f"Array: {list(shared_array)}")
    print(f"Results: {len(results)}")
# Entry point. The guard keeps child processes that re-import this module
# (spawn/forkserver start methods) from re-running the demo.
if __name__ == "__main__":
    parallel_with_shared_state()
Inter-Process Communication
from concurrent.futures import ProcessPoolExecutor
import multiprocessing
import time
from typing import List
def producer(queue, n_items):
    """Put *n_items* items on *queue*, then a single ``None`` sentinel.

    Each item is ``{"id": i, "data": "item_<i>"}``. A 10 ms pause between
    puts simulates production time.
    """
    for index in range(n_items):
        record = {"id": index, "data": f"item_{index}"}
        queue.put(record)
        time.sleep(0.01)
    queue.put(None)  # sentinel: stops exactly one consumer
def consumer(queue, results):
    """Drain *queue* into *results* until a ``None`` sentinel arrives.

    The sentinel is consumed but not appended. Anything queued after it is
    left for other consumers.
    """
    while (item := queue.get()) is not None:
        results.append(item)
def producer_consumer_example():
    """Producer-consumer with one producer process and three consumer processes.

    Items travel over a ``multiprocessing.Queue``. Consumers append them to a
    ``Manager().list()`` proxy so the parent can read the combined result.
    """
    n_consumers = 3
    queue = multiprocessing.Queue()
    # Context manager shuts the manager's server process down when done.
    with multiprocessing.Manager() as manager:
        results = manager.list()
        producer_process = multiprocessing.Process(
            target=producer, args=(queue, 10)
        )
        producer_process.start()
        consumer_processes = [
            multiprocessing.Process(target=consumer, args=(queue, results))
            for _ in range(n_consumers)
        ]
        for p in consumer_processes:
            p.start()
        producer_process.join()
        # producer() enqueues only ONE None sentinel, but each consumer needs
        # its own to exit. Without these extra sentinels the other consumers
        # block in queue.get() forever and the joins below never return.
        # They go in after the producer finishes, so they follow every item.
        for _ in range(n_consumers - 1):
            queue.put(None)
        for p in consumer_processes:
            p.join()
        # Read the proxy while the manager is still running.
        print(f"Processed {len(results)} items")
# Entry point. Required because child processes may re-import this module.
if __name__ == "__main__":
    producer_consumer_example()
⚠️
Memory Note: ProcessPoolExecutor has higher memory overhead than ThreadPoolExecutor due to separate Python interpreters.
asyncio.gather
Basic Usage
import asyncio
import time
from typing import List
async def async_fetch_url(url: str) -> dict:
    """Simulated non-blocking HTTP GET: yield to the event loop for 100 ms.

    Returns:
        ``{"url": url, "status": 200, "size": 1024}``.
    """
    await asyncio.sleep(0.1)  # other coroutines run during this wait
    response = {"url": url, "status": 200, "size": 1024}
    return response
async def fetch_all_urls(urls: List[str]) -> List[dict]:
    """Fetch all *urls* concurrently; results come back in input order.

    ``asyncio.gather`` propagates the first exception raised by any fetch.
    """
    return await asyncio.gather(*(async_fetch_url(u) for u in urls))
async def fetch_with_progress(urls: List[str]) -> List[dict]:
    """Fetch *urls* concurrently, printing each URL as soon as its fetch finishes.

    Results are returned in completion order. An exception from any fetch
    propagates out of the loop.
    """
    scheduled = [asyncio.create_task(async_fetch_url(u)) for u in urls]
    finished: List[dict] = []
    for next_done in asyncio.as_completed(scheduled):
        response = await next_done
        finished.append(response)
        print(f"Completed: {response['url']}")
    return finished
# Usage
async def main():
    """Demo: fetch ten fake endpoints concurrently and report how many came back."""
    endpoints = [f"https://api.example.com/{i}" for i in range(10)]
    fetched = await fetch_all_urls(endpoints)
    print(f"Fetched {len(fetched)} URLs")


asyncio.run(main())
Error Handling
import asyncio
from typing import List
async def unreliable_async_task(n: int) -> int:
    """After a 100 ms non-blocking pause, return ``n * 2``; multiples of 3 fail.

    Raises:
        ValueError: when ``n % 3 == 0``.
    """
    await asyncio.sleep(0.1)
    if n % 3 != 0:
        return n * 2
    raise ValueError(f"Failed on {n}")
async def gather_with_error_handling(tasks: List[int]) -> dict:
    """Run ``unreliable_async_task`` for every input and bucket the outcomes.

    ``return_exceptions=True`` makes ``gather`` return exceptions as ordinary
    results, in input order, instead of raising the first one. That way every
    task is accounted for.

    Returns:
        ``{"success": [{"task", "result"}, ...], "failed": [{"task", "error"}, ...]}``.
    """
    results = {"success": [], "failed": []}
    coroutines = [unreliable_async_task(task) for task in tasks]
    task_results = await asyncio.gather(*coroutines, return_exceptions=True)
    for task, result in zip(tasks, task_results):
        # Test against BaseException, not Exception. A cancelled child comes
        # back as asyncio.CancelledError, which derives from BaseException
        # (Python 3.8+) and would otherwise be filed under "success".
        if isinstance(result, BaseException):
            results["failed"].append({"task": task, "error": str(result)})
        else:
            results["success"].append({"task": task, "result": result})
    return results
# Usage
async def main():
    """Demo: run tasks 0..9 (multiples of 3 fail) and print the tally."""
    summary = await gather_with_error_handling(list(range(10)))
    print(f"Success: {len(summary['success'])}")
    print(f"Failed: {len(summary['failed'])}")


asyncio.run(main())
Output:
Success: 6
Failed: 4
Timeout and Cancellation
import asyncio
from typing import List
async def slow_task(n: int, delay: float) -> int:
    """Sleep for *delay* seconds without blocking the loop, then echo *n* back."""
    await asyncio.sleep(delay)
    echoed = n
    return echoed
async def fetch_with_timeout(tasks: List[tuple], timeout: float) -> dict:
    """Run ``slow_task`` for each ``(task_id, delay)`` pair under a per-task timeout.

    Args:
        tasks: ``(task_id, delay)`` pairs.
        timeout: Seconds each task may run before ``wait_for`` cancels it.

    Returns:
        Dict with ``completed`` (``{"task", "result"}`` entries in finish
        order), ``timeout`` (ids that exceeded *timeout*), and ``cancelled``
        (never populated by this function).
    """
    outcome = {"completed": [], "timeout": [], "cancelled": []}

    async def guarded(task_id, coro, limit):
        try:
            value = await asyncio.wait_for(coro, timeout=limit)
        except asyncio.TimeoutError:
            outcome["timeout"].append(task_id)
        else:
            outcome["completed"].append({"task": task_id, "result": value})

    runners = [guarded(tid, slow_task(tid, d), timeout) for tid, d in tasks]
    await asyncio.gather(*runners, return_exceptions=True)
    return outcome
# Usage
async def main():
    """Demo: five tasks with assorted delays under a 0.5 s per-task timeout."""
    jobs = [(1, 0.1), (2, 0.2), (3, 1.0), (4, 0.3), (5, 2.0)]
    report = await fetch_with_timeout(jobs, timeout=0.5)
    print(f"Completed: {report['completed']}")
    print(f"Timeout: {report['timeout']}")


asyncio.run(main())
Output:
Completed: [{'task': 1, 'result': 1}, {'task': 2, 'result': 2}, {'task': 4, 'result': 4}]
Timeout: [3, 5]
ℹ️
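Key Feature: asyncio.gather(return_exceptions=True) returns exceptions as ordinary results instead of raising the first one, so you can inspect every task's outcome. (Without it, gather raises the first exception; the other awaitables are not cancelled and keep running.)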
Advanced Patterns
Combining asyncio with ProcessPool
import asyncio
from concurrent.futures import ProcessPoolExecutor
import time
from typing import List
def cpu_intensive_task(data: List[int]) -> int:
    """Return the sum of squares of *data*; intended to run in a worker process."""
    return sum(map(lambda x: x * x, data))
async def async_cpu_task(data: List[int], executor=None) -> int:
    """Run ``cpu_intensive_task`` in a process pool without blocking the event loop.

    Args:
        data: Numbers to square and sum.
        executor: Optional existing ``concurrent.futures.Executor`` to reuse.
            When omitted, a ``ProcessPoolExecutor`` is created for this single
            call and shut down afterwards. Starting worker processes per call
            is expensive, so pass one shared pool when calling this many times.

    Returns:
        The value computed by ``cpu_intensive_task``.
    """
    # Inside a coroutine the asyncio docs recommend get_running_loop() over
    # get_event_loop().
    loop = asyncio.get_running_loop()
    if executor is not None:
        return await loop.run_in_executor(executor, cpu_intensive_task, data)
    with ProcessPoolExecutor() as pool:
        return await loop.run_in_executor(pool, cpu_intensive_task, data)
async def process_concurrent_cpu_tasks(data_chunks: List[List[int]]) -> List[int]:
    """Dispatch every chunk to ``async_cpu_task`` at once; results keep chunk order."""
    return await asyncio.gather(*map(async_cpu_task, data_chunks))
async def main():
    """Demo: split 1,000,000 integers into 4 chunks and sum their squares in parallel."""
    data = list(range(1000000))
    chunk_size = len(data) // 4
    chunks = [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]
    # perf_counter is the monotonic, high-resolution clock meant for intervals.
    start = time.perf_counter()
    results = await process_concurrent_cpu_tasks(chunks)
    elapsed = time.perf_counter() - start
    print(f"Processed {len(chunks)} chunks in {elapsed:.3f}s")
    print(f"Results: {results[:5]}...")


# Guard required: under "spawn"/"forkserver" each pool worker re-imports this
# module, and an unguarded asyncio.run() would re-launch the demo in every child.
if __name__ == "__main__":
    asyncio.run(main())
Combining asyncio with ThreadPool
import asyncio
from concurrent.futures import ThreadPoolExecutor
import time
from typing import List
def blocking_io_operation(url: str) -> dict:
    """Simulate 100 ms of blocking I/O (e.g. a synchronous HTTP client) and return a status dict."""
    time.sleep(0.1)
    status = {"url": url, "status": 200}
    return status
async def async_io_operation(url: str, executor=None) -> dict:
    """Run ``blocking_io_operation`` in a worker thread so the event loop stays responsive.

    Args:
        url: Passed through to ``blocking_io_operation``.
        executor: Optional ``concurrent.futures.Executor`` to run on. When
            omitted (``None``), ``run_in_executor`` uses the event loop's
            shared default thread pool. The previous version created and tore
            down a new ``ThreadPoolExecutor`` for every call, which gives
            neither thread reuse nor a bound on concurrency.

    Returns:
        The dict returned by ``blocking_io_operation``.
    """
    # Inside a coroutine the asyncio docs recommend get_running_loop().
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, blocking_io_operation, url)
async def process_concurrent_io_tasks(urls: List[str]) -> List[dict]:
    """Offload every URL to ``async_io_operation`` concurrently; results keep input order."""
    pending = list(map(async_io_operation, urls))
    return await asyncio.gather(*pending)
async def main():
    """Demo: time 20 blocking fetches that have been offloaded to threads."""
    targets = [f"https://api.example.com/{i}" for i in range(20)]
    t0 = time.time()
    await process_concurrent_io_tasks(targets)
    duration = time.time() - t0
    print(f"Processed {len(targets)} URLs in {duration:.3f}s")


asyncio.run(main())
Task Groups (Python 3.11+)
import asyncio
from typing import List
async def async_task(n: int) -> int:
    """Sleep 100 ms, then return ``n * 2``, or raise ``ValueError`` if *n* is a multiple of 3."""
    await asyncio.sleep(0.1)
    if not n % 3:
        raise ValueError(f"Failed on {n}")
    return n * 2
async def process_with_task_group(tasks: List[int]) -> dict:
"""Process tasks using TaskGroup."""
results = {"success": [], "failed": []}
try:
async with asyncio.TaskGroup() as tg:
# All tasks run concurrently
task_coros = [async_task(task) for task in tasks]
task_objects = [tg.create_task(coro) for coro in task_coros]
# All tasks completed successfully
for task, result in zip(tasks, task_objects):
results["success"].append({"task": task, "result": result.result()})
except* ValueError as eg:
# Handle exceptions
for exc in eg.exceptions:
results["failed"].append({"error": str(exc)})
return results
# Usage (Python 3.11+)
async def main():
    """Demo: TaskGroup over inputs 0..9, some of which fail."""
    summary = await process_with_task_group(list(range(10)))
    print(f"Success: {len(summary['success'])}")
    print(f"Failed: {len(summary['failed'])}")


asyncio.run(main())
💡
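Python 3.11+: Use TaskGroup for structured concurrency. It's cleaner than gather for many use cases, but note that it is fail-fast: the first failure cancels the remaining tasks, and errors are raised together as an ExceptionGroup (handle them with except*).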
Production Patterns
Circuit Breaker
import asyncio
import time
from enum import Enum
from typing import Callable, Any
class CircuitState(Enum):
    """Lifecycle states of a :class:`CircuitBreaker`."""

    CLOSED = "closed"        # normal operation: calls pass through
    OPEN = "open"            # failing fast: calls are rejected
    HALF_OPEN = "half_open"  # probing whether the dependency has recovered


class CircuitBreaker:
    """Circuit breaker for async operations.

    * CLOSED: calls run. ``failure_threshold`` consecutive failures open the circuit.
    * OPEN: calls are rejected until ``recovery_timeout`` seconds have passed
      since the last failure. The next ``can_execute`` then moves to HALF_OPEN.
    * HALF_OPEN: trial calls run. ``success_threshold`` successes close the
      circuit; any failure re-opens it immediately.

    Not synchronized; use it from a single event loop.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 30.0,
        success_threshold: int = 3
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold
        self.failure_count = 0    # consecutive failures (CLOSED) / since last close
        self.success_count = 0    # trial successes counted only while HALF_OPEN
        self.last_failure_time = None  # time.monotonic() of the latest failure
        self.state = CircuitState.CLOSED

    def record_success(self):
        """Record a successful call."""
        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.success_threshold:
                self.state = CircuitState.CLOSED
                self.failure_count = 0
                self.success_count = 0
        else:
            # A success breaks any run of failures. Previously failures piled
            # up forever, so N failures spread over days would trip the breaker.
            self.failure_count = 0

    def record_failure(self):
        """Record a failed call, opening the circuit when warranted."""
        self.failure_count += 1
        self.last_failure_time = time.monotonic()
        if (self.state == CircuitState.HALF_OPEN
                or self.failure_count >= self.failure_threshold):
            self.state = CircuitState.OPEN
            self.success_count = 0

    def can_execute(self) -> bool:
        """Return whether a call may proceed, moving OPEN -> HALF_OPEN after the timeout."""
        if self.state == CircuitState.OPEN:
            if time.monotonic() - self.last_failure_time >= self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                # Count trial successes from zero. Previously successes left
                # over from the CLOSED period could close the circuit on the
                # first trial call.
                self.success_count = 0
                return True
            return False
        return True  # CLOSED or HALF_OPEN
class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit breaker is open.

    Subclasses ``Exception``, so callers that catch ``Exception`` still work.
    """


async def call_with_circuit_breaker(
    func: Callable,
    circuit_breaker: CircuitBreaker,
    *args,
    **kwargs
) -> Any:
    """Await ``func(*args, **kwargs)`` through *circuit_breaker*.

    Raises:
        CircuitOpenError: if the breaker currently rejects calls.
        Exception: anything raised by *func* is recorded as a failure and re-raised.
    """
    if not circuit_breaker.can_execute():
        raise CircuitOpenError("Circuit breaker is OPEN")
    try:
        result = await func(*args, **kwargs)
    except Exception:
        circuit_breaker.record_failure()
        raise
    # Record success outside the try, so a bookkeeping error is never
    # misreported as a failure of the wrapped call.
    circuit_breaker.record_success()
    return result
# Usage
async def unreliable_api_call(url: str) -> dict:
    """Simulated unreliable API call that fails for roughly a third of URLs.

    Failure is decided by ``zlib.crc32`` of the URL, which is the same on
    every run. The built-in ``hash()`` of a ``str`` is salted per process
    (``PYTHONHASHSEED``), so with it the demo's failures changed from run to run.

    Raises:
        ConnectionError: for URLs whose CRC32 is divisible by 3.
    """
    import zlib

    await asyncio.sleep(0.1)
    if zlib.crc32(url.encode("utf-8")) % 3 == 0:
        raise ConnectionError("API failed")
    return {"url": url, "status": 200}
async def main():
    """Demo: push 20 calls through one breaker and report how they fared."""
    breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=5.0)
    outcomes = []
    for i in range(20):
        target = f"https://api.example.com/{i}"
        try:
            outcomes.append(
                await call_with_circuit_breaker(unreliable_api_call, breaker, target)
            )
        except Exception as exc:
            outcomes.append({"url": target, "error": str(exc)})
    print(f"Results: {len(outcomes)}")
    print(f"Circuit state: {breaker.state}")


asyncio.run(main())
Rate Limiting
import asyncio
import time
from collections import deque
class AsyncRateLimiter:
    """Async sliding-window limiter: at most ``max_calls`` per ``time_window`` seconds.

    ``acquire`` holds an ``asyncio.Lock`` while it waits, so waiting callers
    are admitted one at a time.
    """

    def __init__(self, max_calls: int, time_window: float):
        if max_calls < 1:
            # With max_calls <= 0, acquire() would index an empty deque.
            raise ValueError("max_calls must be at least 1")
        self.max_calls = max_calls
        self.time_window = time_window
        self.calls = deque()  # time.monotonic() stamps of admitted calls, oldest first
        self.lock = asyncio.Lock()

    async def acquire(self):
        """Wait until one more call fits in the window, then record it."""
        async with self.lock:
            # monotonic() is immune to wall-clock jumps. With time.time() such
            # a jump could make every recorded call look expired, or make the
            # computed wait huge.
            now = time.monotonic()
            # Drop calls that have slid out of the window.
            while self.calls and now - self.calls[0] >= self.time_window:
                self.calls.popleft()
            if len(self.calls) >= self.max_calls:
                # Wait until the oldest call in the window expires.
                wait_time = self.time_window - (now - self.calls[0])
                await asyncio.sleep(wait_time)
                self.calls.popleft()
            self.calls.append(time.monotonic())
async def rate_limited_api_call(
    limiter: AsyncRateLimiter,
    url: str
) -> dict:
    """Take a slot from *limiter*, then simulate a 100 ms API request.

    Returns:
        ``{"url": url, "status": 200}``.
    """
    await limiter.acquire()
    await asyncio.sleep(0.1)  # stand-in for the network round trip
    response = {"url": url, "status": 200}
    return response
async def main():
    """Demo: 20 calls through a limiter that allows 5 per second."""
    limiter = AsyncRateLimiter(max_calls=5, time_window=1.0)
    pending = [
        rate_limited_api_call(limiter, f"https://api.example.com/{i}")
        for i in range(20)
    ]
    responses = await asyncio.gather(*pending)
    print(f"Processed {len(responses)} URLs")


asyncio.run(main())
ℹ️
Production Pattern: Rate limiting prevents overwhelming APIs and respects usage quotas.
Complexity Analysis
Performance Comparison
| Pattern | I/O-bound | CPU-bound | Memory | Complexity |
|---|---|---|---|---|
| ThreadPool | Good | Poor | Medium | Low |
| ProcessPool | Poor | Excellent | High | Medium |
| asyncio | Excellent | Poor | Low | High |
| Combined | Excellent | Excellent | Medium | Very High |
Time Complexity
import time
import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
def io_task(n):
    """Block for 100 ms (simulated I/O) and return *n* unchanged."""
    time.sleep(0.1)
    echoed = n
    return echoed
def cpu_task(n):
    """Return the sum of ``i * i`` for ``i`` in ``range(n)``."""
    total = 0
    for i in range(n):
        total += i * i
    return total
# Sequential: O(n * t_io)
# ThreadPool: O(n * t_io / max_workers)
# ProcessPool: O(n * t_cpu / num_cores)
# asyncio: O(n * t_io / concurrency)
# Example timing
async def benchmark():
    """Compare 20 awaited 100 ms sleeps run one after another vs. all at once via gather."""
    count = 20

    t0 = time.time()
    for _ in range(count):
        await asyncio.sleep(0.1)
    seq_time = time.time() - t0

    t0 = time.time()
    await asyncio.gather(*(asyncio.sleep(0.1) for _ in range(count)))
    async_time = time.time() - t0

    print(f"Sequential: {seq_time:.2f}s")
    print(f"asyncio: {async_time:.2f}s")
    print(f"Speedup: {seq_time/async_time:.1f}x")


asyncio.run(benchmark())
Output:
Sequential: 2.01s
asyncio: 0.11s
Speedup: 18.3x
Interview Tips
Common Follow-up Questions
-
"When would you use ProcessPoolExecutor over ThreadPoolExecutor?"
- CPU-bound tasks: ProcessPool
- I/O-bound tasks: ThreadPool
- Memory isolation: ProcessPool
- Shared state: ThreadPool
-
"How do you handle exceptions in asyncio.gather?"
- Use return_exceptions=True
- Use try/except within individual tasks
- Use TaskGroup (Python 3.11+)
-
"What are the limits of each approach?"
- ThreadPool: GIL limits CPU parallelism
- ProcessPool: High memory overhead
- asyncio: Single-threaded, async-only
Code Review Tips
# BAD: No error handling
# executor.map() returns an iterator of results, and a task's exception is
# re-raised only when its value is retrieved from that iterator. Nothing here
# iterates `results`, so any failure disappears silently.
with ThreadPoolExecutor() as executor:
    results = executor.map(unreliable_task, tasks)
# GOOD: With error handling
# Each future is inspected as it finishes. A failure in one task is reported
# without stopping the others.
with ThreadPoolExecutor() as executor:
    futures = [executor.submit(unreliable_task, task) for task in tasks]
    for future in as_completed(futures):
        try:
            result = future.result()
        except Exception as e:
            print(f"Error: {e}")
# BAD: Blocking in asyncio
async def bad_example():
    """Anti-pattern: call blocking time.sleep() inside a coroutine.

    time.sleep() does not yield to the event loop, so every other coroutine
    is stalled for the full second.
    """
    import time
    time.sleep(1)  # Blocks event loop!
# GOOD: Non-blocking
async def good_example():
    """Pause for one second while letting other coroutines run in the meantime."""
    pause_seconds = 1
    await asyncio.sleep(pause_seconds)  # Yields control
# BAD: No resource limits
async def bad_unlimited():
    """Anti-pattern: hand 10,000 awaitables to gather at once with no cap on concurrency.

    Harmless for asyncio.sleep, but with real network calls every request
    would be in flight simultaneously.
    """
    tasks = [asyncio.sleep(0.1) for _ in range(10000)]
    await asyncio.gather(*tasks)
# GOOD: With semaphore
async def good_limited():
    """Run 10,000 short sleeps with at most 100 of them in flight at any moment."""
    gate = asyncio.Semaphore(100)

    async def limited_task():
        # Only 100 coroutines can hold the semaphore at a time.
        async with gate:
            await asyncio.sleep(0.1)

    await asyncio.gather(*(limited_task() for _ in range(10000)))
⚠️
Common Mistake: Using time.sleep() in asyncio code blocks the event loop. Always use await asyncio.sleep().
Summary
| Approach | Best For | Limitations | Complexity |
|---|---|---|---|
| ThreadPool | I/O-bound, shared state | GIL, CPU-bound | Low |
| ProcessPool | CPU-bound, isolation | Memory, IPC | Medium |
| asyncio | High concurrency I/O | Async-only, single-thread | High |
| Combined | Mixed workloads | Complex | Very High |
Decision Framework
- I/O-bound (< 100 tasks): ThreadPool
- I/O-bound (> 100 tasks): asyncio
- CPU-bound: ProcessPool
- Mixed: asyncio + ProcessPool/ThreadPool
- High concurrency: asyncio with semaphores
ℹ️
Key Takeaway: Choose the right concurrency model for your workload. In production, combine approaches for optimal performance.
Practice Problems
- Web Scraper: Build a scraper using asyncio for thousands of pages
- Image Processor: Process images in parallel with ProcessPool
- API Gateway: Create a gateway with rate limiting and circuit breaker
- Task Scheduler: Build a scheduler using asyncio and thread pools
- Real-time Dashboard: Create a dashboard with concurrent data fetching
Further Reading
- Python Docs: concurrent.futures, asyncio
- PEP 3156: asyncio API
- Books: "Python Concurrency with asyncio" by Matthew Fowler
- Advanced: trio, anyio libraries
Remember: Mastering concurrency patterns is essential for building scalable Python applications.