Database Patterns: SQLAlchemy, Connection Pooling, Transactions

Database patterns and ORM optimization in Python

Interview Question

"Explain database patterns in Python. How does SQLAlchemy work? What is connection pooling and why is it important? How do you handle transactions properly?"

Difficulty: Hard | Frequently asked at Google, Meta, Amazon


Theoretical Foundation

Why Database Patterns Matter

# Without patterns:
# 1. Connection leaks
# 2. SQL injection
# 3. N+1 query problems
# 4. Transaction management issues
# 5. No connection reuse

# With patterns:
# 1. Connection pooling
# 2. ORM for safe queries
# 3. Eager/lazy loading
# 4. ACID transactions
# 5. Session management

ℹ️

Key Concept: Proper database patterns ensure security, performance, and data integrity.
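
To make the SQL injection point concrete, here is a minimal sketch using only the standard library's sqlite3 module. The table, the sample rows, and the `malicious` input are made up for illustration; the contrast between string interpolation and bound parameters is what matters.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany("INSERT INTO users (name) VALUES (?)", [("alice",), ("bob",)])

# Hypothetical attacker-controlled input
malicious = "nobody' OR '1'='1"

# Unsafe: the input is spliced into the SQL text and changes the query's logic
unsafe_sql = f"SELECT name FROM users WHERE name = '{malicious}'"
unsafe_rows = conn.execute(unsafe_sql).fetchall()

# Safe: the value is bound as a parameter, so it is compared as a plain string
safe_rows = conn.execute(
    "SELECT name FROM users WHERE name = ?", (malicious,)
).fetchall()

print(len(unsafe_rows))  # the injected OR clause matches every row
print(len(safe_rows))    # no user has that literal name
```

An ORM such as SQLAlchemy binds values this way whenever you build queries from column expressions, which is why "ORM for safe queries" appears in the list above.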


SQLAlchemy Basics

Model Definition

from sqlalchemy import create_engine, Column, Integer, String, Float, ForeignKey, DateTime
# declarative_base lives in sqlalchemy.orm since SQLAlchemy 1.4 (the ext.declarative path is deprecated)
from sqlalchemy.orm import declarative_base, relationship, sessionmaker
from datetime import datetime

Base = declarative_base()

# Models
class User(Base):
    __tablename__ = 'users'
    
    id = Column(Integer, primary_key=True)
    name = Column(String(100), nullable=False)
    email = Column(String(100), unique=True, nullable=False)
    created_at = Column(DateTime, default=datetime.now)
    
    # Relationships
    posts = relationship("Post", back_populates="author", lazy="select")
    
    def __repr__(self):
        return f"<User(id={self.id}, name='{self.name}')>"

class Post(Base):
    __tablename__ = 'posts'
    
    id = Column(Integer, primary_key=True)
    title = Column(String(200), nullable=False)
    content = Column(String(1000))
    author_id = Column(Integer, ForeignKey('users.id'))
    created_at = Column(DateTime, default=datetime.now)
    
    # Relationships
    author = relationship("User", back_populates="posts")
    tags = relationship("Tag", secondary="post_tags", back_populates="posts")

class Tag(Base):
    __tablename__ = 'tags'
    
    id = Column(Integer, primary_key=True)
    name = Column(String(50), unique=True)
    
    # Many-to-many relationship
    posts = relationship("Post", secondary="post_tags", back_populates="tags")

# Association table
from sqlalchemy import Table
post_tags = Table(
    'post_tags',
    Base.metadata,
    Column('post_id', Integer, ForeignKey('posts.id')),
    Column('tag_id', Integer, ForeignKey('tags.id'))
)

# Create database
engine = create_engine('sqlite:///app.db', echo=True)
Base.metadata.create_all(engine)

Basic CRUD Operations

from sqlalchemy.orm import Session

# Create session
SessionLocal = sessionmaker(bind=engine)

# CRUD Operations
def create_user(name: str, email: str) -> User:
    """Create a new user."""
    with SessionLocal() as session:
        user = User(name=name, email=email)
        session.add(user)
        session.commit()
        session.refresh(user)  # Refresh to get generated ID
        return user

def get_user(user_id: int) -> User:
    """Get user by ID."""
    with SessionLocal() as session:
        user = session.query(User).filter(User.id == user_id).first()
        return user

def update_user(user_id: int, **kwargs) -> User:
    """Update user."""
    with SessionLocal() as session:
        user = session.query(User).filter(User.id == user_id).first()
        if user:
            for key, value in kwargs.items():
                setattr(user, key, value)
            session.commit()
            session.refresh(user)
        return user

def delete_user(user_id: int) -> bool:
    """Delete user."""
    with SessionLocal() as session:
        user = session.query(User).filter(User.id == user_id).first()
        if user:
            session.delete(user)
            session.commit()
            return True
        return False

# Usage
user = create_user("Alice", "alice@example.com")
print(f"Created: {user}")

user = get_user(user.id)
print(f"Got: {user}")

user = update_user(user.id, name="Alice Smith")
print(f"Updated: {user}")

deleted = delete_user(user.id)
print(f"Deleted: {deleted}")

💡

Interview Tip: Always use context managers (with statements) for database sessions to ensure proper cleanup.
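
Building on this tip, one option (available since SQLAlchemy 1.4) is `sessionmaker.begin()`, which combines session cleanup with commit-or-rollback in a single `with` block. The sketch below assumes an in-memory SQLite database and a hypothetical `Account` model that is not part of the models above.

```python
from sqlalchemy import Column, Integer, String, create_engine, select
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

class Account(Base):  # hypothetical model for this sketch
    __tablename__ = "accounts"
    id = Column(Integer, primary_key=True)
    owner = Column(String(50), nullable=False)

engine = create_engine("sqlite://")  # in-memory SQLite
Base.metadata.create_all(engine)
SessionLocal = sessionmaker(bind=engine)

# Commits on normal exit, then closes the session
with SessionLocal.begin() as session:
    session.add(Account(owner="alice"))

# Rolls back when the block raises, then closes the session
try:
    with SessionLocal.begin() as session:
        session.add(Account(owner="bob"))
        raise RuntimeError("simulated failure")
except RuntimeError:
    pass

with SessionLocal() as session:
    owners = session.execute(select(Account.owner)).scalars().all()

print(owners)  # only the committed row is present
```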


Connection Pooling

Why Connection Pooling?

# Without connection pooling:
# 1. Create new connection for each request
# 2. Overhead of TCP handshake, authentication
# 3. Limited by database max connections
# 4. Connection leaks if not properly closed

# With connection pooling:
# 1. Reuse existing connections
# 2. Reduced overhead
# 3. Better resource utilization
# 4. Automatic connection management

SQLAlchemy Connection Pooling

from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

# Engine with connection pooling
engine = create_engine(
    "postgresql://user:pass@localhost/dbname",
    poolclass=QueuePool,
    pool_size=20,          # Number of connections to maintain
    max_overflow=10,       # Additional connections beyond pool_size
    pool_timeout=30,       # Seconds to wait for connection
    pool_recycle=1800,     # Recycle connections after 30 minutes
    pool_pre_ping=True,    # Verify connections before use
    echo=True              # Log SQL queries
)

# Monitor pool
from sqlalchemy import event

@event.listens_for(engine, "checkout")
def on_checkout(dbapi_conn, connection_rec, connection_proxy):
    """Called when connection is retrieved from pool."""
    print(f"Connection checked out: {id(dbapi_conn)}")

@event.listens_for(engine, "checkin")
def on_checkin(dbapi_conn, connection_rec):
    """Called when connection is returned to pool."""
    print(f"Connection returned: {id(dbapi_conn)}")

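# Usage with pool
from sqlalchemy import text

with engine.connect() as conn:
    # Raw SQL strings must be wrapped in text() (required in SQLAlchemy 2.0)
    result = conn.execute(text("SELECT 1"))
    print(result.fetchone())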

# Pool statistics
pool = engine.pool
print(f"Pool size: {pool.size()}")
print(f"Checked out: {pool.checkedout()}")
print(f"Overflow: {pool.overflow()}")

Custom Connection Pool

import queue
import threading
from contextlib import contextmanager
from typing import Any, Generator
import sqlite3

class SimpleConnectionPool:
    """Simple connection pool implementation."""
    
    def __init__(self, db_path: str, max_size: int = 10):
        self.db_path = db_path
        self.max_size = max_size
        self.pool = queue.Queue(maxsize=max_size)
        self.lock = threading.Lock()
        self._initialize_pool()
    
    def _initialize_pool(self):
        """Initialize connection pool."""
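        for _ in range(self.max_size):
            # check_same_thread=False lets worker threads use connections created here;
            # the queue ensures only one thread holds a given connection at a time
            conn = sqlite3.connect(self.db_path, check_same_thread=False)
            self.pool.put(conn)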
    
    @contextmanager
    def get_connection(self) -> Generator[sqlite3.Connection, None, None]:
        """Get connection from pool."""
        conn = self.pool.get(timeout=5)
        try:
            yield conn
        finally:
            self.pool.put(conn)
    
    def close_all(self):
        """Close all connections."""
        while not self.pool.empty():
            conn = self.pool.get()
            conn.close()

# Usage
pool = SimpleConnectionPool("app.db", max_size=5)

def query_user(user_id: int):
    with pool.get_connection() as conn:
        cursor = conn.execute("SELECT * FROM users WHERE id = ?", (user_id,))
        return cursor.fetchone()

# Thread-safe usage
import concurrent.futures

def fetch_users(user_ids):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(query_user, uid) for uid in user_ids]
        return [f.result() for f in futures]

⚠️

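Important: Configure connection pooling explicitly in production. Size each process's pool so that (pool_size + max_overflow) × number of application processes stays below the database's connection limit; otherwise the database, not the pool, runs out of connections.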
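
The sketch below shows what hitting the pool limit looks like. It assumes an in-memory SQLite engine and a deliberately tiny pool (one connection, no overflow, a 0.2 second timeout) so the behaviour is visible immediately; real settings would be far larger.

```python
from sqlalchemy import create_engine
from sqlalchemy.exc import TimeoutError as PoolTimeoutError
from sqlalchemy.pool import QueuePool

# Illustrative values only: at most pool_size + max_overflow = 1 connection
engine = create_engine(
    "sqlite://",
    poolclass=QueuePool,
    pool_size=1,
    max_overflow=0,
    pool_timeout=0.2,  # seconds to wait for a free connection
)

first = engine.connect()  # takes the only slot
try:
    engine.connect()      # no slot free: waits pool_timeout, then raises
    exhausted = False
except PoolTimeoutError:
    exhausted = True
finally:
    first.close()         # returns the connection to the pool

# Once the first connection is back in the pool, checkout works again
with engine.connect() as conn:
    reusable = True

print(exhausted, reusable)
```

A request that leaks a connection (never closes it) has the same effect as `first` above, which is why sessions and connections should always be released through a context manager.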


Transaction Management

Basic Transactions

from sqlalchemy.orm import Session
from contextlib import contextmanager

@contextmanager
def get_session():
    """Get database session with transaction management."""
    session = SessionLocal()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise  # Re-raise with the original traceback
    finally:
        session.close()

def transfer_money(from_user_id: int, to_user_id: int, amount: float):
    """Transfer money between users (atomic operation)."""
    with get_session() as session:
        # Get both users
        from_user = session.query(User).filter(User.id == from_user_id).first()
        to_user = session.query(User).filter(User.id == to_user_id).first()
        
        if not from_user or not to_user:
            raise ValueError("User not found")
        
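        # Update balances (assumes User has a numeric `balance` column,
        # which the model above does not define)
        if from_user.balance < amount:
            raise ValueError("Insufficient funds")
        from_user.balance -= amount
        to_user.balance += amount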
        
        # Both operations succeed or fail together
        # Commit happens automatically when exiting context manager

# Usage
try:
    transfer_money(1, 2, 100.0)
    print("Transfer successful")
except Exception as e:
    print(f"Transfer failed: {e}")
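
The all-or-nothing behaviour can be checked end to end with a small standard-library sketch. The `accounts` table, the CHECK constraint, and integer cents are assumptions made for this example; they are not part of the User model above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE accounts ("
    "id INTEGER PRIMARY KEY, "
    "balance INTEGER NOT NULL CHECK (balance >= 0))"  # balance in cents
)
conn.executemany("INSERT INTO accounts VALUES (?, ?)", [(1, 100), (2, 50)])
conn.commit()

def transfer(conn, from_id, to_id, amount_cents):
    # `with conn` commits if the block succeeds and rolls back if it raises
    with conn:
        # Credit first, so a failed debit leaves a partial write to undo
        conn.execute(
            "UPDATE accounts SET balance = balance + ? WHERE id = ?",
            (amount_cents, to_id),
        )
        conn.execute(
            "UPDATE accounts SET balance = balance - ? WHERE id = ?",
            (amount_cents, from_id),
        )

def balances(conn):
    return dict(conn.execute("SELECT id, balance FROM accounts ORDER BY id"))

transfer(conn, 1, 2, 30)
after_success = balances(conn)

try:
    transfer(conn, 1, 2, 500)  # debit would go negative and violate the CHECK
except sqlite3.IntegrityError:
    pass
after_failure = balances(conn)

print(after_success)
print(after_failure)  # unchanged: the credit was rolled back with the debit
```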

Nested Transactions

from sqlalchemy.orm import Session
from contextlib import contextmanager

class TransactionManager:
    """Manage nested transactions."""
    
    def __init__(self, session: Session):
        self.session = session
        self.savepoints = []
    
    def begin(self):
        """Begin transaction."""
        self.session.begin()
    
    def savepoint(self):
        """Create savepoint."""
        savepoint = self.session.begin_nested()
        self.savepoints.append(savepoint)
        return savepoint
    
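    def release(self):
        """Release the last savepoint, keeping its changes."""
        if self.savepoints:
            self.savepoints.pop().commit()
    
    def commit(self):
        """Commit transaction."""
        self.savepoints.clear()
        self.session.commit()
    
    def rollback(self):
        """Rollback to last savepoint or abort."""
        if self.savepoints:
            savepoint = self.savepoints.pop()
            savepoint.rollback()
        else:
            self.session.rollback()

# Usage
def complex_operation():
    with SessionLocal() as session:
        tx = TransactionManager(session)
        tx.begin()
        
        try:
            # First operation
            user = User(name="Alice", email="alice@example.com")
            session.add(user)
            
            # Create savepoint (pending changes are flushed first)
            tx.savepoint()
            
            try:
                # Second operation (might fail)
                post = Post(title="Hello", author=user)
                session.add(post)
                session.flush()  # Emit the INSERT now so a failure surfaces here
            except Exception:
                # Post creation failed: roll back to the savepoint but keep the user
                tx.rollback()
            else:
                tx.release()
            
            tx.commit()
            
        except Exception:
            # Any other failure aborts the whole transaction
            session.rollback()
            raise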
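
Underneath `begin_nested()` the database sees plain SAVEPOINT statements. The sketch below issues them directly through sqlite3 to show the partial rollback; the table and the savepoint name `before_dup` are illustrative, and `isolation_level=None` is used so the example controls BEGIN and COMMIT itself.

```python
import sqlite3

# isolation_level=None: the driver does not open transactions implicitly
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT UNIQUE)")

conn.execute("BEGIN")
conn.execute("INSERT INTO users (email) VALUES ('alice@example.com')")

conn.execute("SAVEPOINT before_dup")
try:
    # Violates the UNIQUE constraint
    conn.execute("INSERT INTO users (email) VALUES ('alice@example.com')")
except sqlite3.IntegrityError:
    # Undo only the work done since the savepoint
    conn.execute("ROLLBACK TO SAVEPOINT before_dup")
conn.execute("RELEASE SAVEPOINT before_dup")

# The outer transaction is still usable after the partial rollback
conn.execute("INSERT INTO users (email) VALUES ('bob@example.com')")
conn.execute("COMMIT")

emails = [row[0] for row in conn.execute("SELECT email FROM users ORDER BY id")]
print(emails)
```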

Optimistic Locking

from sqlalchemy import Column, Integer, String, DateTime, func
from sqlalchemy.orm import Session
from datetime import datetime

class OptimisticLockMixin:
    """Mixin for optimistic locking."""
    
    version = Column(Integer, default=1, nullable=False)
    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)

class User(OptimisticLockMixin, Base):
    __tablename__ = 'users'
    
    id = Column(Integer, primary_key=True)
    name = Column(String(100))
    
    def update_with_lock(self, session: Session, **kwargs):
        """Update with optimistic lock check."""
        # Version this session originally read
        current_version = self.version
        
        # Conditional UPDATE: matches only if nobody bumped the version since the read.
        # Attributes are not set on self first, because autoflush would write them
        # (without the version check) before this statement runs.
        rows_updated = session.query(User).filter(
            User.id == self.id,
            User.version == current_version
        ).update({
            **kwargs,
            'version': current_version + 1
        }, synchronize_session=False)
        
        if rows_updated == 0:
            session.rollback()
            raise ValueError("Record was modified by another user")
        
        # commit() expires self, so attributes reload with the new values on next access
        session.commit()

# Usage
def update_user_concurrently(user_id: int):
    with SessionLocal() as session:
        user = session.query(User).filter(User.id == user_id).first()
        user.update_with_lock(session, name="Updated Name")

💡

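Interview Tip: Optimistic locking suits workloads where write conflicts are rare (typically read-heavy), since a conflict only costs a retry. Pessimistic locking (e.g. SELECT ... FOR UPDATE) suits high-contention, write-heavy workloads where retries would be frequent.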
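
SQLAlchemy also ships a built-in version counter, configured through the mapper option `version_id_col`, which raises `StaleDataError` when an UPDATE matches no row. The sketch below is a single-connection illustration: the `Document` model is hypothetical, and the concurrent writer is simulated with a raw UPDATE that bypasses the ORM.

```python
from sqlalchemy import Column, Integer, String, create_engine, text
from sqlalchemy.orm import Session, declarative_base
from sqlalchemy.orm.exc import StaleDataError

Base = declarative_base()

class Document(Base):  # hypothetical model for this sketch
    __tablename__ = "documents"

    id = Column(Integer, primary_key=True)
    title = Column(String(100), nullable=False)
    version = Column(Integer, nullable=False)

    # The ORM adds "AND version = <value read>" to UPDATE/DELETE and bumps it
    __mapper_args__ = {"version_id_col": version}

engine = create_engine("sqlite://")  # in-memory SQLite
Base.metadata.create_all(engine)

conflict_detected = False
with Session(engine) as session:
    doc = Document(title="draft")
    session.add(doc)
    session.commit()
    doc_id, seen_version = doc.id, doc.version  # reloads the row after commit

    # Simulated concurrent writer: bumps the version behind the ORM's back
    session.execute(
        text("UPDATE documents SET version = version + 1 WHERE id = :id"),
        {"id": doc_id},
    )

    doc.title = "final"
    try:
        session.flush()  # UPDATE ... WHERE id = ? AND version = 1 matches 0 rows
    except StaleDataError:
        conflict_detected = True
        session.rollback()

print(seen_version, conflict_detected)
```

On a conflict the usual response is to reload the row and retry, or to report the conflict to the user.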


Advanced Patterns

Query Optimization

from sqlalchemy.orm import Session, joinedload, selectinload
from sqlalchemy import select

# N+1 Problem Example
def get_users_with_posts_bad():
    """Bad: N+1 queries."""
    with SessionLocal() as session:
        users = session.query(User).all()  # 1 query
        
        for user in users:
            # N additional queries!
            posts = session.query(Post).filter(Post.author_id == user.id).all()
            print(f"{user.name}: {len(posts)} posts")

# Eager Loading Solutions
def get_users_with_posts_good():
    """Good: Eager loading."""
    with SessionLocal() as session:
        # Option 1: joinedload (single query with JOIN)
        users = session.query(User).options(joinedload(User.posts)).all()
        
        # Option 2: selectinload (separate queries, IN clause)
        users = session.query(User).options(selectinload(User.posts)).all()
        
        for user in users:
            print(f"{user.name}: {len(user.posts)} posts")

# Query Optimization Examples
def optimized_queries():
    with SessionLocal() as session:
        # 1. Select only needed columns
        users = session.query(User.id, User.name).all()
        
        # 2. Filter early
        users = session.query(User).filter(User.name.like('A%')).all()
        
        # 3. Use indexes (ensure proper indexing in DB)
        users = session.query(User).filter(User.email == 'alice@example.com').first()
        
        # 4. Pagination (ORDER BY makes page contents deterministic)
        page = session.query(User).order_by(User.id).offset(0).limit(10).all()
        
        # 5. Use exists instead of count for checks
        from sqlalchemy import exists
        has_users = session.query(exists().select_from(User)).scalar()
        
        # 6. Bulk operations
        session.bulk_update_mappings(User, [
            {'id': 1, 'name': 'New Name'},
            {'id': 2, 'name': 'Another Name'}
        ])
        session.commit()
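
One way to verify that eager loading actually removes the N+1 pattern is to count the SELECT statements the engine emits. The sketch below assumes an in-memory SQLite database, trimmed-down copies of the User and Post models, and three users with one post each; the `counter` dict and `count_post_titles` helper are names invented for this example.

```python
from sqlalchemy import Column, ForeignKey, Integer, String, create_engine, event, select
from sqlalchemy.orm import Session, declarative_base, relationship, selectinload

Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String(100), nullable=False)
    posts = relationship("Post", back_populates="author")  # lazy by default

class Post(Base):
    __tablename__ = "posts"
    id = Column(Integer, primary_key=True)
    title = Column(String(200), nullable=False)
    author_id = Column(Integer, ForeignKey("users.id"))
    author = relationship("User", back_populates="posts")

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    for name in ("alice", "bob", "carol"):
        session.add(User(name=name, posts=[Post(title=f"{name} says hello")]))
    session.commit()

counter = {"selects": 0}

@event.listens_for(engine, "before_cursor_execute")
def count_selects(conn, cursor, statement, parameters, context, executemany):
    # Count only SELECT statements sent to the database
    if statement.lstrip().upper().startswith("SELECT"):
        counter["selects"] += 1

def count_post_titles(eager):
    """Return (number of SELECTs emitted, number of posts seen)."""
    counter["selects"] = 0
    stmt = select(User)
    if eager:
        stmt = stmt.options(selectinload(User.posts))
    with Session(engine) as session:
        users = session.execute(stmt).scalars().all()
        total_posts = sum(len(user.posts) for user in users)
    return counter["selects"], total_posts

lazy_selects, lazy_posts = count_post_titles(eager=False)
eager_selects, eager_posts = count_post_titles(eager=True)
print(lazy_selects, eager_selects)  # 1 + N queries versus 2
```

The same listener can be attached temporarily in a test suite to assert an upper bound on queries per request.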

Repository Pattern

from abc import ABC, abstractmethod
from typing import List, Optional
from sqlalchemy.orm import Session

# Abstract Repository
class UserRepository(ABC):
    @abstractmethod
    def get_by_id(self, user_id: int) -> Optional[User]:
        pass
    
    @abstractmethod
    def get_all(self) -> List[User]:
        pass
    
    @abstractmethod
    def create(self, user: User) -> User:
        pass
    
    @abstractmethod
    def update(self, user: User) -> User:
        pass
    
    @abstractmethod
    def delete(self, user_id: int) -> bool:
        pass

# SQLAlchemy Implementation
class SQLAlchemyUserRepository(UserRepository):
    def __init__(self, session: Session):
        self.session = session
    
    def get_by_id(self, user_id: int) -> Optional[User]:
        return self.session.query(User).filter(User.id == user_id).first()
    
    def get_all(self) -> List[User]:
        return self.session.query(User).all()
    
    def create(self, user: User) -> User:
        self.session.add(user)
        self.session.commit()
        self.session.refresh(user)
        return user
    
    def update(self, user: User) -> User:
        # merge() returns the session-attached copy; return that, not the argument
        merged = self.session.merge(user)
        self.session.commit()
        return merged
    
    def delete(self, user_id: int) -> bool:
        user = self.get_by_id(user_id)
        if user:
            self.session.delete(user)
            self.session.commit()
            return True
        return False

# Usage
with SessionLocal() as session:
    repo = SQLAlchemyUserRepository(session)
    user = repo.get_by_id(1)
    users = repo.get_all()
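
A practical payoff of the repository abstraction is that business logic can be tested without a database. The sketch below is self-contained and uses stand-ins: a plain dataclass `User` instead of the ORM model, a `typing.Protocol` instead of the ABC above, and a hypothetical `register_user` rule.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Protocol

@dataclass
class User:  # plain stand-in for the ORM model
    id: Optional[int]
    name: str
    email: str

class UserRepository(Protocol):
    def get_all(self) -> List[User]: ...
    def create(self, user: User) -> User: ...

class InMemoryUserRepository:
    """Dictionary-backed repository for tests; no database required."""

    def __init__(self) -> None:
        self._rows: Dict[int, User] = {}
        self._next_id = 1

    def get_by_id(self, user_id: int) -> Optional[User]:
        return self._rows.get(user_id)

    def get_all(self) -> List[User]:
        return list(self._rows.values())

    def create(self, user: User) -> User:
        user.id = self._next_id
        self._rows[user.id] = user
        self._next_id += 1
        return user

def register_user(repo: UserRepository, name: str, email: str) -> User:
    """Hypothetical business rule: email addresses must be unique."""
    if any(existing.email == email for existing in repo.get_all()):
        raise ValueError(f"email already registered: {email}")
    return repo.create(User(id=None, name=name, email=email))

repo = InMemoryUserRepository()
alice = register_user(repo, "Alice", "alice@example.com")

try:
    register_user(repo, "Alice Again", "alice@example.com")
    duplicate_rejected = False
except ValueError:
    duplicate_rejected = True

print(alice.id, duplicate_rejected, len(repo.get_all()))
```

In production code `register_user` would receive the SQLAlchemy-backed repository instead; the function itself does not change.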

Unit of Work Pattern

from typing import List
from sqlalchemy.orm import Session

class UnitOfWork:
    """Unit of Work pattern for transaction management."""
    
    def __init__(self, session: Session):
        self.session = session
        self._new_objects: List = []
        self._dirty_objects: List = []
        self._deleted_objects: List = []
    
    def register_new(self, obj):
        self._new_objects.append(obj)
    
    def register_dirty(self, obj):
        if obj not in self._dirty_objects:
            self._dirty_objects.append(obj)
    
    def register_deleted(self, obj):
        self._deleted_objects.append(obj)
    
    def commit(self):
        try:
            # Insert new objects
            for obj in self._new_objects:
                self.session.add(obj)
            
            # Update dirty objects
            for obj in self._dirty_objects:
                self.session.merge(obj)
            
            # Delete objects
            for obj in self._deleted_objects:
                self.session.delete(obj)
            
            self.session.commit()
            
            # Clear tracking
            self._new_objects.clear()
            self._dirty_objects.clear()
            self._deleted_objects.clear()
            
        except Exception:
            self.session.rollback()
            raise  # Re-raise with the original traceback
    
    def rollback(self):
        self.session.rollback()
        self._new_objects.clear()
        self._dirty_objects.clear()
        self._deleted_objects.clear()

# Usage
def complex_business_operation():
    with SessionLocal() as session:
        uow = UnitOfWork(session)
        
        # Track changes
        new_user = User(name="Alice", email="alice@example.com")
        uow.register_new(new_user)
        
        existing_user = session.query(User).filter(User.id == 1).first()
        existing_user.name = "Updated"
        uow.register_dirty(existing_user)
        
        # Commit all changes atomically
        uow.commit()

ℹ️

Pattern: Repository and Unit of Work patterns separate business logic from data access.
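
Worth knowing for interviews: SQLAlchemy's Session already tracks new, changed, and deleted objects itself, so a hand-written Unit of Work is mainly useful as an abstraction boundary. The sketch below inspects that built-in tracking through `session.new`, `session.dirty`, and `session.deleted`, using an in-memory SQLite database and a trimmed-down User model.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String(100), nullable=False)

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    alice = User(name="Alice")
    session.add(alice)
    pending_before_flush = alice in session.new   # queued for INSERT

    session.flush()                               # INSERT is emitted here
    pending_after_flush = alice in session.new    # no longer pending

    alice.name = "Alice Smith"
    is_dirty = alice in session.dirty             # queued for UPDATE

    session.delete(alice)
    is_deleted = alice in session.deleted         # queued for DELETE

    session.rollback()                            # discard everything

print(pending_before_flush, pending_after_flush, is_dirty, is_deleted)
```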


Performance Optimization

Query Performance

import time
from sqlalchemy.orm import Session

def benchmark_queries():
    """Benchmark different query approaches."""
    
    with SessionLocal() as session:
        # Bad: Load all
        start = time.time()
        users = session.query(User).all()
        print(f"Load all: {len(users)} users in {time.time()-start:.3f}s")
        
        # Good: Filter early
        start = time.time()
        users = session.query(User).filter(User.name.like('A%')).all()
        print(f"Filtered: {len(users)} users in {time.time()-start:.3f}s")
        
        # Good: Select specific columns
        start = time.time()
        users = session.query(User.id, User.name).all()
        print(f"Select columns: {len(users)} users in {time.time()-start:.3f}s")
        
        # Good: Use exists
        start = time.time()
        from sqlalchemy import exists
        has_users = session.query(exists().select_from(User)).scalar()
        print(f"Exists check: {has_users} in {time.time()-start:.3f}s")

# Connection pooling performance
def connection_pool_performance():
    """Test connection pool performance."""
    from sqlalchemy import text
    
    start = time.perf_counter()
    for i in range(100):
        with SessionLocal() as session:
            # Raw SQL strings must be wrapped in text() (required in SQLAlchemy 2.0)
            session.execute(text("SELECT 1"))
    print(f"100 queries: {time.perf_counter()-start:.3f}s")
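
Timing alone does not say why a query is slow; the database's query plan does. As a rough illustration using SQLite's EXPLAIN QUERY PLAN through the standard library (the table, the index name `ix_users_email`, and the `query_plan` helper are assumptions for this sketch, and other databases use different EXPLAIN syntax and output):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, email TEXT)")
conn.executemany(
    "INSERT INTO users (name, email) VALUES (?, ?)",
    [(f"user{i}", f"user{i}@example.com") for i in range(1000)],
)

def query_plan(conn, sql, params=()):
    """Return SQLite's plan description for a statement as one string."""
    rows = conn.execute("EXPLAIN QUERY PLAN " + sql, params).fetchall()
    # The last column of each row holds the human-readable detail
    return " | ".join(row[-1] for row in rows)

lookup = "SELECT * FROM users WHERE email = ?"
params = ("user42@example.com",)

plan_without_index = query_plan(conn, lookup, params)   # full table scan
conn.execute("CREATE INDEX ix_users_email ON users (email)")
plan_with_index = query_plan(conn, lookup, params)      # index search

print(plan_without_index)
print(plan_with_index)
```

The exact wording of the plan varies between SQLite versions, so the prints are for inspection rather than for parsing.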

Caching

from typing import Optional
from sqlalchemy.orm import Session

class UserRepositoryWithCache:
    """User repository with caching."""
    
    def __init__(self, session: Session):
        self.session = session
        self.cache = {}
    
    def get_by_id(self, user_id: int) -> Optional[User]:
        # Check cache first
        if user_id in self.cache:
            return self.cache[user_id]
        
        # Query database
        user = self.session.query(User).filter(User.id == user_id).first()
        
        if user:
            self.cache[user_id] = user
        
        return user
    
    def invalidate_cache(self, user_id: int):
        """Invalidate cache for user."""
        self.cache.pop(user_id, None)
    
    def clear_cache(self):
        """Clear all cache."""
        self.cache.clear()

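# Usage (cached ORM objects stay attached to this session, so keep one cache per session)
with SessionLocal() as session:
    repo = UserRepositoryWithCache(session)
    user = repo.get_by_id(1)  # Database query
    user = repo.get_by_id(1)  # Cache hit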
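
A cache that never expires can serve stale rows indefinitely. One common refinement is a time-to-live (TTL). The sketch below is a minimal, dependency-free version: `TTLCache`, `get_user_cached`, and `load_user` are hypothetical names, the cache stores plain dicts rather than ORM objects, and the clock is injectable so expiry can be shown without sleeping.

```python
import time
from typing import Any, Callable, Dict, Hashable, Optional, Tuple

class TTLCache:
    """Tiny time-to-live cache; entries expire ttl_seconds after being set."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries: Dict[Hashable, Tuple[float, Any]] = {}

    def get(self, key: Hashable) -> Optional[Any]:
        entry = self._entries.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self._clock() >= expires_at:
            del self._entries[key]  # expired: drop it and report a miss
            return None
        return value

    def set(self, key: Hashable, value: Any) -> None:
        self._entries[key] = (self._clock() + self._ttl, value)

    def invalidate(self, key: Hashable) -> None:
        self._entries.pop(key, None)

def get_user_cached(cache: TTLCache, user_id: int, load_user: Callable[[int], Optional[dict]]):
    """Cache-aside lookup: try the cache, fall back to the loader, store the result."""
    cached = cache.get(user_id)
    if cached is not None:
        return cached
    row = load_user(user_id)
    if row is not None:
        cache.set(user_id, row)
    return row

# Demo with a fake clock and a loader that records each database hit
now = [0.0]
db_hits = []

def load_user(user_id):
    db_hits.append(user_id)
    return {"id": user_id, "name": "alice"}

cache = TTLCache(ttl_seconds=60, clock=lambda: now[0])
get_user_cached(cache, 1, load_user)   # miss: loads from the "database"
get_user_cached(cache, 1, load_user)   # hit: served from cache
now[0] = 61.0                          # advance past the TTL
get_user_cached(cache, 1, load_user)   # expired: loads again

print(len(db_hits))
```

Writes should still call `invalidate` for the affected key; the TTL only bounds how long a missed invalidation can linger.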

Interview Tips

Common Follow-up Questions

  1. "When would you use raw SQL vs ORM?"

    • ORM: Most cases, type safety, maintainability
    • Raw SQL: Complex queries, performance optimization
    • Hybrid: ORM for CRUD, raw for complex queries
  2. "How do you prevent N+1 queries?"

    • Eager loading (joinedload, selectinload)
    • Subquery loading
    • Batch loading
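  3. "What's the difference between eager and lazy loading?"

    • Eager: Load related objects together with the parent query (joinedload, selectinload)
    • Lazy: Load on first attribute access (can cause N+1)
    • lazy="select": SQLAlchemy's default lazy strategy; emits a SELECT on first access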

Code Review Tips

# BAD: N+1 queries
users = session.query(User).all()
for user in users:
    posts = user.posts  # Triggers query!

# GOOD: Eager loading
users = session.query(User).options(joinedload(User.posts)).all()

# BAD: No session management
user = session.query(User).first()
session.close()  # May not execute if error occurs

# GOOD: Context manager
with SessionLocal() as session:
    user = session.query(User).first()

# BAD: No transaction handling
session.add(user)
session.commit()  # What if commit fails?

# GOOD: Transaction handling
try:
    session.add(user)
    session.commit()
except Exception:
    session.rollback()
    raise

⚠️

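Common Mistake: Iterating over query results and touching a lazily loaded relationship on each row issues one extra query per row (the N+1 problem). Eager load the relationships you know you will access.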


Summary

| Pattern | Purpose | Use Case |
|---|---|---|
| Repository | Data access abstraction | Clean architecture |
| Unit of Work | Transaction management | Complex operations |
| Connection Pool | Resource reuse | High concurrency |
| Eager Loading | Query optimization | Related data |
| Optimistic Locking | Concurrency control | Read-heavy apps |

Best Practices

  1. Use connection pooling in production
  2. Eager load related data
  3. Handle transactions properly
  4. Use context managers for sessions
  5. Monitor query performance
  6. Implement caching for read-heavy data

ℹ️

Key Takeaway: Proper database patterns ensure security, performance, and maintainability.


Practice Problems

  1. Repository Pattern: Implement a complete repository for a domain entity
  2. Connection Pool: Build a custom connection pool
  3. Transaction Manager: Create a transaction manager with savepoints
  4. Query Optimizer: Optimize N+1 queries in existing code
  5. Caching Layer: Implement a caching repository

Further Reading

  • SQLAlchemy Docs: https://docs.sqlalchemy.org/
  • Database Design: Normalization, indexing strategies
  • Performance: Query optimization, EXPLAIN plans
  • Patterns: Repository, Unit of Work, Data Mapper

Remember: Database patterns are essential for building scalable, maintainable applications.
