Python SQLite — Embedded Database Programming
SQLite is a serverless, zero-configuration database built into Python. It's perfect for local storage, testing, prototyping, and small to medium applications. The sqlite3 module provides a DB-API 2.0 compliant interface.
Learning Objectives
- Create and connect to SQLite databases
- Perform CRUD operations safely with parameterized queries
- Use transactions, context managers, and connection pooling
- Design efficient schemas with indexes and constraints
- Handle data types, NULL values, and date/time storage
- Build real applications like todo lists and data storage systems
Connecting and Creating Tables
import sqlite3
# Connect to database (creates file if it doesn't exist)
conn = sqlite3.connect('app.db')
# Or use :memory: for in-memory database
conn = sqlite3.connect(':memory:')
cursor = conn.cursor()
# Create a table with proper schema
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
email TEXT UNIQUE NOT NULL,
age INTEGER CHECK(age >= 0 AND age <= 150),
is_active BOOLEAN DEFAULT 1,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# Create an index for faster queries
cursor.execute('''
CREATE INDEX IF NOT EXISTS idx_users_email ON users(email)
''')
conn.commit()
Schema Design with Foreign Keys
import sqlite3
def create_database():
"""Create a properly normalized database schema."""
conn = sqlite3.connect('ecommerce.db')
cursor = conn.cursor()
# Enable foreign key support
cursor.execute('PRAGMA foreign_keys = ON')
# Categories table
cursor.execute('''
CREATE TABLE IF NOT EXISTS categories (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
description TEXT
)
''')
# Products table with foreign key
cursor.execute('''
CREATE TABLE IF NOT EXISTS products (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
description TEXT,
price REAL NOT NULL CHECK(price >= 0),
category_id INTEGER,
stock INTEGER DEFAULT 0 CHECK(stock >= 0),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (category_id) REFERENCES categories(id)
ON DELETE SET NULL
)
''')
# Orders table
cursor.execute('''
CREATE TABLE IF NOT EXISTS orders (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
total REAL NOT NULL,
status TEXT DEFAULT 'pending' CHECK(status IN ('pending', 'processing', 'shipped', 'delivered', 'cancelled')),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users(id)
ON DELETE CASCADE
)
''')
conn.commit()
return conn
CRUD Operations
INSERT — Create Records
import sqlite3
conn = sqlite3.connect('app.db')
cursor = conn.cursor()
# Single insert
cursor.execute(
"INSERT INTO users (name, email, age) VALUES (?, ?, ?)",
("Alice", "alice@email.com", 30)
)
conn.commit()
print(f"Inserted user with id: {cursor.lastrowid}")
# Insert multiple records
users = [
("Bob", "bob@email.com", 25),
("Charlie", "charlie@email.com", 35),
("Diana", "diana@email.com", 28),
("Eve", "eve@email.com", 32)
]
cursor.executemany(
"INSERT INTO users (name, email, age) VALUES (?, ?, ?)",
users
)
conn.commit()
print(f"Inserted {cursor.rowcount} users")
# Insert with dictionary (requires row_factory setup)
conn.row_factory = sqlite3.Row
cursor.execute(
"INSERT INTO users (name, email, age) VALUES (:name, :email, :age)",
{"name": "Frank", "email": "frank@email.com", "age": 40}
)
conn.commit()
SELECT — Read Records
import sqlite3
conn = sqlite3.connect('app.db')
cursor = conn.cursor()
# Fetch all rows
cursor.execute("SELECT * FROM users")
rows = cursor.fetchall()
for row in rows:
print(row) # Tuple: (1, 'Alice', 'alice@email.com', 30, 1, ...)
# Fetch one row
cursor.execute("SELECT * FROM users WHERE email = ?", ("alice@email.com",))
user = cursor.fetchone()
print(user)
# Fetch with WHERE clause
cursor.execute("SELECT name, email FROM users WHERE age > ?", (25,))
young_users = cursor.fetchall()
for name, email in young_users:
print(f"{name} ({email})")
# Fetch with ORDER BY and LIMIT
cursor.execute("SELECT * FROM users ORDER BY age DESC LIMIT 5")
oldest_users = cursor.fetchall()
# Fetch with JOIN
cursor.execute('''
SELECT users.name, orders.total, orders.status
FROM users
JOIN orders ON users.id = orders.user_id
WHERE orders.status = 'pending'
''')
pending_orders = cursor.fetchall()
Using Row Factory for Named Access
import sqlite3
conn = sqlite3.connect('app.db')
conn.row_factory = sqlite3.Row # Enable named column access
cursor = conn.cursor()
cursor.execute("SELECT * FROM users")
for row in cursor.fetchall():
# Access columns by name — much more readable!
print(f"Name: {row['name']}, Email: {row['email']}, Age: {row['age']}")
# Convert to dictionary
user_dict = dict(row)
print(user_dict)
UPDATE — Modify Records
import sqlite3
conn = sqlite3.connect('app.db')
cursor = conn.cursor()
# Update single record
cursor.execute(
"UPDATE users SET age = ? WHERE name = ?",
(31, "Alice")
)
conn.commit()
print(f"Updated {cursor.rowcount} rows")
# Update multiple columns
cursor.execute(
"UPDATE users SET age = ?, is_active = ? WHERE email = ?",
(26, 0, "bob@email.com")
)
conn.commit()
# Update with conditions
cursor.execute(
"UPDATE users SET is_active = 0 WHERE age < ?",
(18,)
)
conn.commit()
print(f"Deactivated {cursor.rowcount} underage users")
DELETE — Remove Records
import sqlite3
conn = sqlite3.connect('app.db')
cursor = conn.cursor()
# Delete single record
cursor.execute("DELETE FROM users WHERE name = ?", ("Charlie",))
conn.commit()
print(f"Deleted {cursor.rowcount} rows")
# Delete with multiple conditions
cursor.execute(
"DELETE FROM users WHERE is_active = ? AND age < ?",
(0, 18)
)
conn.commit()
# Delete all records (keep schema)
cursor.execute("DELETE FROM users")
conn.commit()
Transactions and Context Managers
Manual Transaction Control
import sqlite3
conn = sqlite3.connect('app.db')
cursor = conn.cursor()
try:
# Start transaction (automatic with execute)
cursor.execute("INSERT INTO users (name, email) VALUES (?, ?)", ("Alice", "a@b.com"))
cursor.execute("INSERT INTO users (name, email) VALUES (?, ?)", ("Bob", "b@b.com"))
# Commit transaction
conn.commit()
print("Transaction committed successfully")
except Exception as e:
# Rollback on error
conn.rollback()
print(f"Transaction rolled back: {e}")
finally:
conn.close()
Context Manager Pattern
import sqlite3
from contextlib import contextmanager
@contextmanager
def get_database(db_path='app.db'):
"""Context manager for database connections."""
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
conn.execute('PRAGMA foreign_keys = ON')
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
conn.close()
# Usage — automatically handles commit/rollback
with get_database() as db:
db.execute(
"INSERT INTO users (name, email) VALUES (?, ?)",
("Alice", "alice@example.com")
)
users = db.execute("SELECT * FROM users").fetchall()
for user in users:
print(dict(user))
Batch Operations with Transactions
import sqlite3
from contextlib import contextmanager
@contextmanager
def batch_insert(db_path, table, records, batch_size=1000):
"""Efficiently insert many records in batches."""
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
try:
for i in range(0, len(records), batch_size):
batch = records[i:i + batch_size]
placeholders = ', '.join(['?' for _ in batch[0]])
cursor.executemany(
f"INSERT INTO {table} VALUES ({placeholders})",
batch
)
conn.commit()
print(f"Inserted batch {i // batch_size + 1}")
except Exception:
conn.rollback()
raise
finally:
conn.close()
Advanced Querying
Aggregate Functions
import sqlite3
with get_database() as db:
# Count, sum, average
stats = db.execute('''
SELECT
COUNT(*) as total_users,
AVG(age) as average_age,
MIN(age) as youngest,
MAX(age) as oldest
FROM users
''').fetchone()
print(f"Total users: {stats['total_users']}")
print(f"Average age: {stats['average_age']:.1f}")
# Group by with having
age_groups = db.execute('''
SELECT
CASE
WHEN age < 18 THEN 'Under 18'
WHEN age BETWEEN 18 AND 30 THEN '18-30'
WHEN age BETWEEN 31 AND 50 THEN '31-50'
ELSE 'Over 50'
END as age_group,
COUNT(*) as count
FROM users
GROUP BY age_group
HAVING count > 0
ORDER BY count DESC
''').fetchall()
for group in age_groups:
print(f"{group['age_group']}: {group['count']} users")
Subqueries
import sqlite3
with get_database() as db:
# Users who have placed orders
users_with_orders = db.execute('''
SELECT * FROM users
WHERE id IN (SELECT DISTINCT user_id FROM orders)
''').fetchall()
# Users with above-average spending
high_spenders = db.execute('''
SELECT users.*, SUM(orders.total) as total_spent
FROM users
JOIN orders ON users.id = orders.user_id
GROUP BY users.id
HAVING total_spent > (SELECT AVG(total) FROM orders)
''').fetchall()
Real-World Examples
Example 1: Todo Application Database
import sqlite3
from datetime import datetime
from contextlib import contextmanager
class TodoDatabase:
"""SQLite database for a todo application."""
def __init__(self, db_path='todos.db'):
self.db_path = db_path
self._init_db()
def _init_db(self):
"""Initialize database schema."""
with self._get_conn() as conn:
conn.execute('''
CREATE TABLE IF NOT EXISTS todos (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
description TEXT,
completed BOOLEAN DEFAULT 0,
priority INTEGER DEFAULT 3 CHECK(priority BETWEEN 1 AND 5),
due_date TEXT,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
completed_at TEXT
)
''')
conn.execute('''
CREATE TABLE IF NOT EXISTS tags (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE
)
''')
conn.execute('''
CREATE TABLE IF NOT EXISTS todo_tags (
todo_id INTEGER,
tag_id INTEGER,
PRIMARY KEY (todo_id, tag_id),
FOREIGN KEY (todo_id) REFERENCES todos(id) ON DELETE CASCADE,
FOREIGN KEY (tag_id) REFERENCES tags(id) ON DELETE CASCADE
)
''')
@contextmanager
def _get_conn(self):
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
conn.execute('PRAGMA foreign_keys = ON')
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
conn.close()
def add_todo(self, title, description=None, priority=3, due_date=None):
with self._get_conn() as conn:
cursor = conn.execute(
"INSERT INTO todos (title, description, priority, due_date) VALUES (?, ?, ?, ?)",
(title, description, priority, due_date)
)
return cursor.lastrowid
def complete_todo(self, todo_id):
with self._get_conn() as conn:
conn.execute(
"UPDATE todos SET completed = 1, completed_at = ? WHERE id = ?",
(datetime.now().isoformat(), todo_id)
)
def get_pending_todos(self):
with self._get_conn() as conn:
return conn.execute(
"SELECT * FROM todos WHERE completed = 0 ORDER BY priority ASC, due_date ASC"
).fetchall()
def add_tag(self, todo_id, tag_name):
with self._get_conn() as conn:
# Create tag if it doesn't exist
conn.execute("INSERT OR IGNORE INTO tags (name) VALUES (?)", (tag_name,))
tag = conn.execute("SELECT id FROM tags WHERE name = ?", (tag_name,)).fetchone()
# Link tag to todo
conn.execute(
"INSERT OR IGNORE INTO todo_tags (todo_id, tag_id) VALUES (?, ?)",
(todo_id, tag['id'])
)
def search_todos(self, query):
with self._get_conn() as conn:
return conn.execute(
"SELECT * FROM todos WHERE title LIKE ? OR description LIKE ?",
(f"%{query}%", f"%{query}%")
).fetchall()
# Usage
db = TodoDatabase()
todo_id = db.add_todo("Learn SQLite", "Master CRUD operations", priority=1)
db.add_tag(todo_id, "learning")
db.add_tag(todo_id, "database")
db.complete_todo(todo_id)
pending = db.get_pending_todos()
Example 2: Data Storage with Versioning
import sqlite3
import json
from datetime import datetime
from contextlib import contextmanager
class VersionedStorage:
"""Store data with version history."""
def __init__(self, db_path='storage.db'):
self.db_path = db_path
self._init_db()
def _init_db(self):
with self._get_conn() as conn:
conn.execute('''
CREATE TABLE IF NOT EXISTS documents (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
current_version INTEGER DEFAULT 1,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT DEFAULT CURRENT_TIMESTAMP
)
''')
conn.execute('''
CREATE TABLE IF NOT EXISTS document_versions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
document_id INTEGER NOT NULL,
version INTEGER NOT NULL,
content TEXT NOT NULL,
change_note TEXT,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (document_id) REFERENCES documents(id) ON DELETE CASCADE,
UNIQUE(document_id, version)
)
''')
@contextmanager
def _get_conn(self):
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
conn.execute('PRAGMA foreign_keys = ON')
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
conn.close()
def save(self, name, content, change_note=None):
"""Save a new version of a document."""
with self._get_conn() as conn:
# Check if document exists
doc = conn.execute(
"SELECT id, current_version FROM documents WHERE name = ?", (name,)
).fetchone()
if doc:
new_version = doc['current_version'] + 1
document_id = doc['id']
else:
cursor = conn.execute(
"INSERT INTO documents (name) VALUES (?)", (name,)
)
document_id = cursor.lastrowid
new_version = 1
conn.execute(
"INSERT INTO document_versions (document_id, version, content, change_note) VALUES (?, ?, ?, ?)",
(document_id, new_version, json.dumps(content), change_note)
)
conn.execute(
"UPDATE documents SET current_version = ?, updated_at = ? WHERE id = ?",
(new_version, datetime.now().isoformat(), document_id)
)
return new_version
def get(self, name, version=None):
"""Get document content, optionally at a specific version."""
with self._get_conn() as conn:
if version:
row = conn.execute('''
SELECT dv.content, dv.version, dv.created_at
FROM document_versions dv
JOIN documents d ON dv.document_id = d.id
WHERE d.name = ? AND dv.version = ?
''', (name, version)).fetchone()
else:
row = conn.execute('''
SELECT dv.content, dv.version, dv.created_at
FROM document_versions dv
JOIN documents d ON dv.document_id = d.id
WHERE d.name = ?
ORDER BY dv.version DESC LIMIT 1
''', (name,)).fetchone()
if row:
return {
'content': json.loads(row['content']),
'version': row['version'],
'created_at': row['created_at']
}
return None
def get_history(self, name):
"""Get version history for a document."""
with self._get_conn() as conn:
return conn.execute('''
SELECT dv.version, dv.change_note, dv.created_at
FROM document_versions dv
JOIN documents d ON dv.document_id = d.id
WHERE d.name = ?
ORDER BY dv.version DESC
''', (name,)).fetchall()
Common Mistakes
| Mistake | Problem | Solution |
|---|---|---|
| String formatting in queries | SQL injection vulnerability | Always use parameterized queries (?) |
Not calling conn.commit() | Changes lost on close | Commit after each transaction |
Not using row_factory | Hard to read results | Set conn.row_factory = sqlite3.Row |
| Global variables in SQLite | Thread safety issues | Use separate connections per thread |
| Not using context managers | Resource leaks | Use with statements or custom context managers |
| Ignoring foreign keys | Data integrity issues | Enable PRAGMA foreign_keys = ON |
Best Practices
# 1. Use WAL mode for better concurrency
conn.execute('PRAGMA journal_mode=WAL')
# 2. Use EXPLAIN QUERY PLAN to optimize slow queries
cursor.execute("EXPLAIN QUERY PLAN SELECT * FROM users WHERE email = 'test'")
for row in cursor.fetchall():
print(row)
# 3. Use parameterized queries always
# BAD: f"SELECT * FROM users WHERE name = '{name}'"
# GOOD: "SELECT * FROM users WHERE name = ?", (name,)
# 4. Use INSERT OR IGNORE/REPLACE for idempotent operations
cursor.execute(
"INSERT OR IGNORE INTO users (name, email) VALUES (?, ?)",
("Alice", "alice@example.com")
)
# 5. Use UPSERT (INSERT ... ON CONFLICT) for PostgreSQL-style upserts
cursor.execute('''
INSERT INTO users (name, email, age) VALUES (?, ?, ?)
ON CONFLICT(email) DO UPDATE SET age = excluded.age
''', ("Alice", "alice@example.com", 31))
Key Takeaways
- SQLite is built into Python — no installation needed, perfect for local storage
- Always use parameterized queries (
?) to prevent SQL injection - Use context managers for safe connection handling with automatic commit/rollback
- Set
conn.row_factory = sqlite3.Rowfor named column access instead of tuple indexing - Enable foreign keys with
PRAGMA foreign_keys = ONfor data integrity - Use WAL journal mode for better concurrent read performance
- Design schemas with proper constraints (PRIMARY KEY, FOREIGN KEY, CHECK, UNIQUE)