Python Context Managers — Resources Done Right
Context managers ensure proper acquisition and release of resources. The with statement guarantees cleanup even if errors occur.
Learning Objectives
- Understand the `with` statement protocol
- Create context managers with classes and `contextlib`
- Use nested and async context managers
- Apply context managers for locks, transactions, and cleanup
- Build real-world context managers for databases and file locking
The with Statement
# Without context manager — error-prone
f = open('file.txt', 'r')
try:
data = f.read()
finally:
f.close()
# With context manager — safe and clean
with open('file.txt', 'r') as f:
data = f.read()
# File is automatically closed, even if exception occurs
How It Works
# The with statement calls __enter__ and __exit__
class ManagedResource:
    """Minimal context manager that logs when its context is entered and exited."""

    def __enter__(self):
        """Announce entry and return this instance for the ``as`` target."""
        print("Entering context")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Announce exit; the falsy result lets any exception propagate."""
        print("Exiting context")
        suppress_exception = False
        return suppress_exception
with ManagedResource() as r:
print("Inside context")
# Entering context
# Inside context
# Exiting context
Multiple Context Managers
# Python 3.1+ supports multiple context managers in a single with statement
with open('input.txt', 'r') as infile, open('output.txt', 'w') as outfile:
outfile.write(infile.read())
# Python 3.10+ supports parentheses for grouping
with (
open('input.txt', 'r') as infile,
open('output.txt', 'w') as outfile,
open('log.txt', 'a') as log
):
outfile.write(infile.read())
log.write("Copied input to output\n")
Class-Based Context Manager
class ManagedResource:
    """Context manager that reports acquisition and release of a named resource."""

    def __init__(self, name):
        """Remember the label used in the acquire/release messages."""
        self.name = name

    def __enter__(self):
        """Report acquisition and hand back this instance."""
        print(f"Acquiring {self.name}")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Report release, mention any in-flight exception, never suppress it."""
        print(f"Releasing {self.name}")
        failed = exc_type is not None
        if failed:
            print(f"Error occurred: {exc_val}")
        return False
with ManagedResource("database") as resource:
print(f"Using {resource.name}")
# Acquiring database
# Using database
# Releasing database
Exception Handling in __exit__
class SafeFile:
    """Open a file for a ``with`` block, close it on exit, and swallow ValueError.

    The ``as`` target is the open file object. ``ValueError`` (including its
    subclasses) raised inside the block is reported and suppressed; every
    other exception propagates after the file has been closed.
    """

    def __init__(self, filename, mode):
        """Store the path and mode; the file is not opened until ``__enter__``."""
        self.filename = filename
        self.mode = mode
        self.file = None

    def __enter__(self):
        """Open the file and return the file object."""
        self.file = open(self.filename, self.mode)
        return self.file

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the file; return True only when a ValueError should be suppressed."""
        if self.file:
            self.file.close()
        # issubclass (not ==) so that subclasses such as UnicodeDecodeError are
        # treated as ValueError, matching how ``except ValueError`` behaves.
        if exc_type is not None and issubclass(exc_type, ValueError):
            print(f"ValueError handled: {exc_val}")
            return True  # Suppress the exception
        return False  # Let other exceptions propagate
# ValueError is suppressed
with SafeFile('test.txt', 'w') as f:
raise ValueError("Something went wrong")
# Value error handled, execution continues
# Other exceptions propagate
with SafeFile('test.txt', 'w') as f:
raise TypeError("Type error") # Propagates!
__exit__ Parameters Explained
class DetailedContext:
    """Context manager that prints the three arguments passed to ``__exit__``."""

    def __enter__(self):
        """Log entry and return this instance."""
        print("Entering context")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Describe the exception leaving the block, if any; never suppress it."""
        if exc_type is not None:
            print(f"Exception type: {exc_type}")
            print(f"Exception value: {exc_val}")
            print(f"Traceback: {exc_tb}")
        else:
            # All three arguments are None when the block finished normally.
            print("No exception occurred")
        return False
# Without exception
with DetailedContext() as d:
print("Normal execution")
# Entering context
# Normal execution
# No exception occurred
# With exception
try:
with DetailedContext() as d:
raise ValueError("Test error")
except ValueError:
pass
# Entering context
# Exception type: <class 'ValueError'>
# Exception value: Test error
# Traceback: <traceback object at 0x...>
contextlib Decorator
from contextlib import contextmanager
@contextmanager
def managed_file(filename, mode):
    """Open ``filename`` in ``mode``, yield the file object, and close it on exit.

    Errors from ``open()`` (for example ``FileNotFoundError``) propagate
    unchanged to the caller.
    """
    # Open *before* the try block: if open() fails there is nothing to close,
    # and a ``finally: f.close()`` would raise UnboundLocalError and hide the
    # real error.
    f = open(filename, mode)
    try:
        yield f
    finally:
        f.close()
with managed_file('test.txt', 'w') as f:
f.write("Hello, World!")
# The @contextmanager decorator converts a generator into a context manager
# 1. Code before yield runs in __enter__
# 2. yield provides the value for 'as'
# 3. Code after yield runs in __exit__
# 4. An exception raised inside the with block is re-raised at the yield point
More contextmanager Examples
from contextlib import contextmanager
import time
@contextmanager
def timer(label=""):
    """Print how long the ``with`` block took, prefixed by ``label``.

    The elapsed time is printed whether the block finishes normally or
    raises; an exception from the block still propagates.
    """
    start = time.perf_counter()
    try:
        yield
    finally:
        # In finally so the timing is reported even when the block raises.
        elapsed = time.perf_counter() - start
        print(f"{label} Elapsed: {elapsed:.4f}s")
with timer("Sort:"):
sorted(range(1000000, 0, -1))
@contextmanager
def temporary_directory():
    """Yield the path of a fresh temporary directory and delete it afterwards.

    The directory and everything in it is removed when the ``with`` block
    ends, whether or not the block raised.
    """
    from shutil import rmtree
    from tempfile import mkdtemp

    scratch = mkdtemp()
    try:
        yield scratch
    finally:
        rmtree(scratch)
with temporary_directory() as td:
# Use temporary directory
pass
# Directory automatically cleaned up
Context Manager with Return Values
from contextlib import contextmanager
@contextmanager
def database_connection(host, port):
    """Yield a dict describing a simulated connection to ``host:port``.

    The dict's ``"connected"`` flag is True inside the ``with`` block and is
    set to False on exit, even if the block raised.
    """
    print(f"Connecting to {host}:{port}")
    session = dict(host=host, port=port, connected=True)
    try:
        yield session
    finally:
        session["connected"] = False
        print(f"Disconnected from {host}:{port}")
# Use the context manager
with database_connection("localhost", 5432) as conn:
print(f"Connected: {conn}")
# Connecting to localhost:5432
# Connected: {'host': 'localhost', 'port': 5432, 'connected': True}
# Disconnected from localhost:5432
Common contextlib Utilities
from contextlib import contextmanager, suppress, redirect_stdout, redirect_stderr
import io, os
# suppress — ignore specific exceptions
with suppress(FileNotFoundError):
os.remove('nonexistent.txt')
# redirect_stdout — capture print output
f = io.StringIO()
with redirect_stdout(f):
print("captured output")
captured = f.getvalue()
print(f"Got: {captured!r}") # Got: 'captured output\n'
# redirect_stderr — capture error output
import sys
err_capture = io.StringIO()
with redirect_stderr(err_capture):
print("error message", file=sys.stderr)
# ExitStack — dynamic number of context managers
from contextlib import ExitStack
def process_files(filenames):
    """Open every file in ``filenames`` and print the first 100 characters of each.

    All files are opened before any is read; the ExitStack closes every file
    that was opened when the function returns or raises.
    """
    with ExitStack() as stack:
        handles = []
        for path in filenames:
            handles.append(stack.enter_context(open(path)))
        for handle in handles:
            preview = handle.read()[:100]
            print(preview)
# callback — register cleanup functions
with ExitStack() as stack:
    import tempfile
    tmp = tempfile.NamedTemporaryFile(delete=False)
    # Callbacks run in LIFO order: register the unlink first and the close
    # second, so the handle is closed before the file is removed (removing an
    # open file fails on Windows, and the handle would otherwise leak).
    stack.callback(os.unlink, tmp.name)
    stack.callback(tmp.close)
    # cleanup guaranteed even if exception occurs
contextlib.suppress Deep Dive
from contextlib import suppress
# Suppress multiple exceptions
with suppress(FileNotFoundError, PermissionError, OSError):
os.remove('nonexistent.txt')
# Suppress in loop
import random
for i in range(10):
with suppress(StopIteration):
value = next(iter([]))
# Practical: safe dictionary access
def safe_dict_access(d, key, default=None):
    """Return ``d[key]``, or ``default`` when the lookup raises ``KeyError``."""
    result = default
    with suppress(KeyError):
        # Only reached past this line when the key exists.
        result = d[key]
    return result
result = safe_dict_access({"a": 1}, "b", "not found")
print(result) # "not found"
redirect_stdout and redirect_stderr
from contextlib import redirect_stdout, redirect_stderr
import io
import sys
# Capture all output
output_buffer = io.StringIO()
error_buffer = io.StringIO()
with redirect_stdout(output_buffer), redirect_stderr(error_buffer):
print("This goes to stdout")
print("This goes to stderr", file=sys.stderr)
stdout_content = output_buffer.getvalue()
stderr_content = error_buffer.getvalue()
print(f"Captured stdout: {stdout_content!r}")
print(f"Captured stderr: {stderr_content!r}")
# Redirect to file
with open('output.txt', 'w') as f:
with redirect_stdout(f):
print("This goes to file")
Exit Stack — Dynamic Context Managers
from contextlib import ExitStack
class ConnectionPool:
    """Toy pool that hands out dict "connections" and can deactivate them all."""

    def __init__(self):
        """Start with no tracked connections."""
        self.connections = []

    def get_connection(self, host):
        """Create, track, and return an active connection dict for ``host``."""
        conn = {"host": host, "active": True}
        self.connections.append(conn)
        return conn

    def release_all(self):
        """Mark every tracked connection inactive and forget them."""
        for conn in self.connections:
            conn["active"] = False
        self.connections.clear()


def dynamic_context_managers(hosts):
    """Acquire one pooled connection per host, print each, then release them all.

    ``pool.release_all`` is registered with the ExitStack exactly once, before
    any connection is acquired, so cleanup runs a single time and also covers
    a failure part-way through acquisition.
    """
    pool = ConnectionPool()
    with ExitStack() as stack:
        # One callback releases every connection; registering it per host
        # would run release_all once for each host.
        stack.callback(pool.release_all)
        # Dynamically acquire as many connections as there are hosts
        connections = [pool.get_connection(host) for host in hosts]
        # Use all connections
        for conn in connections:
            print(f"Processing {conn['host']}")
# Nested context managers
with ExitStack() as stack:
files = [
stack.enter_context(open(f, 'r'))
for f in ['file1.txt', 'file2.txt', 'file3.txt']
]
# All files opened, all guaranteed to close
ExitStack Advanced Patterns
from contextlib import ExitStack
# Pattern 1: Dynamic resource management
def process_dynamic_resources(resource_list):
    """Enter every context manager in ``resource_list``, then process each result.

    Each item must support the context manager protocol, and the object its
    ``__enter__`` returns must provide a ``process()`` method. All items are
    entered before any is processed; the ExitStack exits them in reverse order.
    """
    with ExitStack() as stack:
        entered = [stack.enter_context(manager) for manager in resource_list]
        # Process all resources
        for item in entered:
            item.process()
# Pattern 2: Cleanup on error
def safe_operation():
    """Register two cleanup callbacks, then fail with ``ValueError``.

    The callbacks still run, last-registered first, before the exception
    leaves the function.
    """
    with ExitStack() as stack:
        # Register cleanup callbacks
        for message in ("Cleanup 1", "Cleanup 2"):
            stack.callback(print, message)
        # Simulate work
        raise ValueError("Something went wrong")
# Cleanup 2 (LIFO order)
# Cleanup 1
# Pattern 3: Conditional context managers
def conditional_context(condition):
    """Run the operation, entering an extra context manager only if ``condition`` is truthy.

    NOTE(review): ``some_context_manager`` is not defined in this snippet; it
    stands in for whatever context manager the caller wants to apply.
    """
    with ExitStack() as stack:
        if condition:
            # Entered lazily; the stack exits it when the block ends.
            optional_manager = some_context_manager()
            stack.enter_context(optional_manager)
        # Rest of the code
        print("Operation completed")
Real-World: Database Connection Manager
from contextlib import contextmanager
import time
class DatabaseConnection:
    """Simulated database connection that tracks connection and transaction state."""

    def __init__(self, host, port):
        """Record the endpoint; start disconnected with no open transaction."""
        self.host = host
        self.port = port
        self.connected = False
        self.in_transaction = False

    def connect(self):
        """Log the connection attempt and mark the connection as open."""
        print(f"Connecting to {self.host}:{self.port}")
        self.connected = True

    def disconnect(self):
        """Log the disconnect and mark the connection as closed."""
        print(f"Disconnecting from {self.host}:{self.port}")
        self.connected = False

    def begin_transaction(self):
        """Flag that a transaction is in progress."""
        self.in_transaction = True

    def commit(self):
        """Clear the transaction flag and log the commit."""
        self.in_transaction = False
        print("Transaction committed")

    def rollback(self):
        """Clear the transaction flag and log the rollback."""
        self.in_transaction = False
        print("Transaction rolled back")

    def execute(self, query):
        """Log ``query`` and return a fixed result dict.

        Raises:
            RuntimeError: if the connection is not open.
        """
        if self.connected:
            print(f"Executing: {query}")
            return {"rows_affected": 1}
        raise RuntimeError("Not connected")
@contextmanager
def database_session(host="localhost", port=5432):
    """Yield a connected ``DatabaseConnection`` wrapped in a transaction.

    The transaction is committed when the ``with`` block finishes normally.
    If the block (or the commit) raises an ``Exception``, the transaction is
    rolled back and the exception re-raised. The connection is disconnected
    in every case.
    """
    session = DatabaseConnection(host, port)
    session.connect()
    try:
        session.begin_transaction()
        yield session
        # Only reached when the with block did not raise.
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.disconnect()
# Usage
with database_session() as db:
db.execute("INSERT INTO users (name) VALUES ('Alice')")
db.execute("INSERT INTO users (name) VALUES ('Bob')")
# Transaction committed, connection closed
# Error handling
try:
with database_session() as db:
db.execute("INSERT INTO users (name) VALUES ('Charlie')")
raise ValueError("Simulated error")
except ValueError:
pass
# Transaction rolled back, connection closed
Connection Pool Context Manager
from contextlib import contextmanager
import threading
class ConnectionPool:
    """Pool that limits concurrent connections with a semaphore."""

    def __init__(self, max_connections=10):
        """Allow at most ``max_connections`` connections to be held at once."""
        self.connections = []
        self.semaphore = threading.Semaphore(max_connections)
        self.lock = threading.Lock()

    @contextmanager
    def get_connection(self):
        """Yield a connection, blocking while the pool is at capacity.

        The semaphore slot is always returned, and the connection is released
        only if it was actually created. An error from
        ``_create_connection`` therefore propagates unchanged.
        """
        self.semaphore.acquire()
        try:
            conn = self._create_connection()
            # Inner try starts only after conn exists, so the cleanup below
            # can never touch an unbound name.
            try:
                yield conn
            finally:
                self._release_connection(conn)
        finally:
            self.semaphore.release()

    def _create_connection(self):
        """Return a new connection dict."""
        # Create actual database connection
        # NOTE(review): nothing in this class appends to self.connections, so
        # the id is 0 for every connection — confirm whether ids should be unique.
        return {"id": len(self.connections), "active": True}

    def _release_connection(self, conn):
        """Mark ``conn`` inactive while holding the pool lock."""
        with self.lock:
            conn["active"] = False
# Usage
pool = ConnectionPool(max_connections=5)
with pool.get_connection() as conn:
# Use connection
print(f"Using connection {conn['id']}")
# Connection automatically released
Real-World: File Locking
from contextlib import contextmanager
import os
import time
@contextmanager
def file_lock(filepath, timeout=10):
    """Simple file-based locking mechanism.

    Creates ``filepath + ".lock"`` atomically (retrying every 0.1 s while it
    already exists), yields ``filepath``, and removes the lock file on exit.

    Raises:
        TimeoutError: if the lock file still exists after ``timeout`` seconds.
    """
    lock_file = filepath + ".lock"
    # Monotonic clock: the timeout must not be affected by wall-clock changes.
    start_time = time.monotonic()
    while True:
        try:
            # Atomic creation (fails if exists)
            fd = os.open(lock_file, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
        except FileExistsError:
            if time.monotonic() - start_time > timeout:
                raise TimeoutError(f"Could not acquire lock on {filepath}") from None
            time.sleep(0.1)
            continue
        try:
            try:
                os.write(fd, str(os.getpid()).encode())
            finally:
                # Always give the descriptor back, even if the write failed.
                os.close(fd)
        except OSError:
            # We created the lock file but could not finish; remove it so a
            # stale lock does not block everyone else.
            os.remove(lock_file)
            raise
        break
    try:
        yield filepath
    finally:
        # Release lock; tolerate the file having been removed already rather
        # than racing between an existence check and the removal.
        try:
            os.remove(lock_file)
        except FileNotFoundError:
            pass
# Usage
with file_lock("data.json") as f:
# Only one process can access this at a time
with open(f, 'w') as file:
file.write('{"data": "protected"}')
Cross-Process File Locking
import msvcrt
import os
import contextlib
@contextlib.contextmanager
def windows_file_lock(filepath):
    """Windows-specific file locking using msvcrt.

    Opens ``filepath + ".lock"``, takes a non-blocking one-byte lock on it for
    the duration of the ``with`` block, and yields ``filepath``. On exit the
    byte is unlocked, the handle closed, and the lock file removed.

    Raises:
        OSError: if the lock file cannot be opened or the lock cannot be
            acquired; in that case nothing is unlocked or removed.
    """
    lock_path = filepath + ".lock"
    lock_fd = open(lock_path, 'w')
    try:
        msvcrt.locking(lock_fd.fileno(), msvcrt.LK_NBLCK, 1)
    except OSError:
        # Lock not acquired: close our handle only. Unlocking or deleting here
        # would interfere with whoever currently holds the lock.
        lock_fd.close()
        raise
    try:
        yield filepath
    finally:
        # Reached only when the lock is held, so unlock/remove is safe.
        msvcrt.locking(lock_fd.fileno(), msvcrt.LK_UNLCK, 1)
        lock_fd.close()
        os.remove(lock_path)
# Usage
with windows_file_lock("shared_data.txt") as f:
# Exclusive access to file
pass
Async Context Managers
import asyncio
class AsyncDatabase:
    """Simulated async database usable with ``async with``."""

    async def connect(self):
        """Log, pause briefly to simulate I/O, and return this instance."""
        print("Async connecting...")
        await asyncio.sleep(0.1)
        return self

    async def disconnect(self):
        """Log and pause briefly to simulate I/O."""
        print("Async disconnecting...")
        await asyncio.sleep(0.1)

    async def __aenter__(self):
        """Connect and return this instance as the ``as`` target."""
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Disconnect; the False result lets any exception propagate."""
        await self.disconnect()
        suppress_exception = False
        return suppress_exception
async def main():
async with AsyncDatabase() as db:
print("Using async database")
asyncio.run(main())
# Using contextlib for async
from contextlib import asynccontextmanager
@asynccontextmanager
async def async_timer(label=""):
    """Print how long the ``async with`` block took, prefixed by ``label``.

    The elapsed time is printed whether the block finishes normally or
    raises; an exception from the block still propagates.
    """
    start = time.perf_counter()
    try:
        yield
    finally:
        # In finally so the timing is reported even when the block raises.
        elapsed = time.perf_counter() - start
        print(f"{label} Async elapsed: {elapsed:.4f}s")
Async Context Manager Patterns
import asyncio
from contextlib import asynccontextmanager
# Pattern 1: Async resource cleanup
@asynccontextmanager
async def async_file_handler(filename, mode='r'):
    """Open ``filename`` in a worker thread, yield the file, and close it on exit.

    Both ``open`` and ``close`` run via ``asyncio.to_thread``. Reads and
    writes on the yielded file object are ordinary calls made by the caller.
    """
    handle = await asyncio.to_thread(open, filename, mode)
    try:
        yield handle
    finally:
        closer = handle.close
        await asyncio.to_thread(closer)
# Pattern 2: Async connection pool
@asynccontextmanager
async def async_connection_pool(max_size=10):
    """Async context manager for connection pool.

    Yields a ``get_connection`` factory. Use ``async with get_connection() as
    conn`` to hold one of at most ``max_size`` concurrent connections; the
    connection dict is marked inactive when that inner block ends, even if
    it raised.
    """
    semaphore = asyncio.Semaphore(max_size)

    # Decorated so the factory's result supports ``async with``; a bare async
    # generator does not implement __aenter__/__aexit__.
    @asynccontextmanager
    async def get_connection():
        async with semaphore:
            # Create async connection
            connection = {"id": id(semaphore), "active": True}
            try:
                yield connection
            finally:
                connection["active"] = False

    yield get_connection
# Pattern 3: Async timeout
@asynccontextmanager
async def async_timeout(seconds):
    """Report and suppress ``asyncio.TimeoutError`` raised inside the block.

    NOTE(review): this does not itself enforce a time limit. ``seconds`` is
    used only in the printed message, so the block must raise the timeout
    (for example via ``asyncio.wait_for``) for anything to happen.
    """
    timeout_error = asyncio.TimeoutError
    try:
        yield
    except timeout_error:
        # Returning normally after catching suppresses the exception.
        print(f"Operation timed out after {seconds} seconds")
Common Mistakes
# Mistake 1: Forgetting to yield in contextmanager
@contextmanager
def bad_context():
resource = acquire_resource()
# Missing yield! # This will fail
@contextmanager
def good_context():
resource = acquire_resource()
try:
yield resource
finally:
release_resource(resource)
# Mistake 2: Not using try/finally in contextmanager
@contextmanager
def risky_context():
resource = acquire_resource()
yield resource
release_resource(resource) # Not reached if exception in yield!
@contextmanager
def safe_context():
resource = acquire_resource()
try:
yield resource
finally:
release_resource(resource) # Always runs
# Mistake 3: Suppressing all exceptions in __exit__
class BadContext:
def __exit__(self, exc_type, exc_val, exc_tb):
return True # Suppresses ALL exceptions!
class GoodContext:
    """Suppresses only ValueError (and its subclasses) raised in the block."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Return True for ValueError so it is suppressed; False otherwise."""
        # issubclass rather than ==, so subclasses of ValueError are matched
        # the same way ``except ValueError`` would match them.
        if exc_type is not None and issubclass(exc_type, ValueError):
            return True  # Only suppress specific exceptions
        return False
# Mistake 4: Not cleaning up on error
def bad_cleanup():
f = open('file.txt', 'w')
f.write("data") # If this fails, f is never closed!
def good_cleanup():
with open('file.txt', 'w') as f:
f.write("data") # File closed even if write fails
Additional Common Mistakes
# Mistake 5: Leaving the __exit__ return value implicit
class IncompleteContext:
    def __exit__(self, exc_type, exc_val, exc_tb):
        # No return statement: __exit__ returns None, which is falsy, so
        # exceptions propagate normally. This does NOT raise TypeError; it
        # just leaves the intent unstated.
        pass


# Clearer:
class CompleteContext:
    def __exit__(self, exc_type, exc_val, exc_tb):
        return False  # Explicit: do not suppress exceptions
# Mistake 6: Modifying __enter__ return value
@contextmanager
def bad_return():
resource = {"status": "active"}
yield resource
resource["status"] = "inactive" # This modifies the yielded object!
@contextmanager
def good_return():
resource = {"status": "active"}
try:
yield resource.copy() # Return a copy
finally:
pass # Original resource cleanup
# Mistake 7: Not handling exceptions in contextmanager
@contextmanager
def fragile_context():
resource = acquire_resource()
yield resource
# If yield raises, this code won't run!
release_resource(resource)
@contextmanager
def robust_context():
resource = acquire_resource()
try:
yield resource
finally:
release_resource(resource) # Always runs
Key Takeaways
- Use `with` for any resource that needs cleanup
- `__enter__` acquires, `__exit__` releases
- `contextlib.contextmanager` simplifies creation
- ExitStack handles dynamic numbers of context managers
- Context managers guarantee cleanup even on exceptions
- Return `True` from `__exit__` to suppress exceptions
- Always use try/finally in `@contextmanager` functions
- Use `suppress()` to ignore specific exceptions cleanly
- Async context managers use `__aenter__` and `__aexit__`
- Multiple context managers can be combined with ExitStack