Project 1 — Build Your First Data Pipeline
Data engineers build and maintain the infrastructure that powers data pipelines, warehouses, and analytics systems. This project brings together everything you've learned in the foundations module into a real, working pipeline.
Project Overview
What You Will Build
An end-to-end ETL pipeline that:
- Extracts sales data from a local CSV file
- Validates data quality using Pydantic
- Transforms data with pandas (cleaning, aggregation)
- Loads results to PostgreSQL and S3
- Tests the pipeline with pytest
- Containerizes with Docker
- Automates with a CLI tool (Click)
Project Structure
Directory Layout
sales-pipeline/
+-- src/
| +-- __init__.py
| +-- extract.py
| +-- validate.py
| +-- transform.py
| +-- load.py
| +-- pipeline.py
| +-- cli.py
+-- tests/
| +-- __init__.py
| +-- test_extract.py
| +-- test_validate.py
| +-- test_transform.py
| +-- conftest.py
+-- configs/
| +-- config.yaml
+-- data/
| +-- raw/
| +-- processed/
+-- Dockerfile
+-- docker-compose.yml
+-- requirements.txt
+-- pyproject.toml
+-- README.md
Step 1: Configuration
# configs/config.yaml
source:
  type: csv
  path: data/raw/sales.csv
  encoding: utf-8

target:
  database:
    host: localhost
    port: 5432
    name: analytics
    user: pipeline
    # Placeholder only: load_config fills this from the DB_PASSWORD
    # environment variable at runtime, so the secret never lives in this file.
    password: ${DB_PASSWORD}
  s3:
    bucket: my-data-lake
    prefix: gold/sales

quality:
  # Columns that must exist; the null-percentage check applies to these.
  required_columns:
    - order_id
    - customer_id
    - amount
    - order_date
  min_rows: 1
  # Maximum allowed share of nulls per required column, in percent (0-100).
  max_null_pct: 5.0
Step 2: Extract Module
# src/extract.py
import pandas as pd
import logging
from pathlib import Path
from typing import Optional
logger = logging.getLogger(__name__)


def extract_csv(
    filepath: str,
    encoding: str = "utf-8",
    dtype: Optional[dict] = None
) -> pd.DataFrame:
    """Read a CSV file from local disk into a DataFrame.

    Args:
        filepath: Location of the CSV file to read.
        encoding: Text encoding handed to ``pandas.read_csv`` (default: utf-8).
        dtype: Optional column-to-dtype mapping handed to ``pandas.read_csv``.

    Returns:
        The file contents as a pandas DataFrame, without further processing.

    Raises:
        FileNotFoundError: If nothing exists at ``filepath``.
    """
    # Fail with a clear message before pandas gets involved.
    if not Path(filepath).exists():
        raise FileNotFoundError(f"Source file not found: {filepath}")

    logger.info(f"Extracting data from {filepath}")
    frame = pd.read_csv(filepath, encoding=encoding, dtype=dtype)
    n_rows, n_cols = frame.shape
    logger.info(f"Extracted {n_rows} rows, {n_cols} columns")
    return frame
def extract_from_database(
    connection_string: str,
    query: str,
    params: Optional[dict] = None
) -> pd.DataFrame:
    """Run a SQL query and return the result set as a DataFrame.

    Args:
        connection_string: SQLAlchemy connection string.
        query: SQL text to execute. Pass values through ``params`` using
            ``:name`` bind placeholders rather than formatting them into
            the string.
        params: Optional bind-parameter values for ``query``.

    Returns:
        pandas DataFrame with query results.
    """
    # Imported inside the function so importing this module does not
    # require SQLAlchemy.
    from sqlalchemy import create_engine, text

    engine = create_engine(connection_string)
    try:
        with engine.connect() as conn:
            df = pd.read_sql(text(query), conn, params=params or {})
    finally:
        # A fresh engine (and connection pool) is built on every call; dispose
        # of it so pooled connections are not left open until garbage collection.
        engine.dispose()
    logger.info(f"Extracted {len(df)} rows from database")
    return df
Step 3: Validate Module
# src/validate.py
import pandas as pd
import logging
from pydantic import BaseModel, validator, ValidationError
from typing import List, Optional
from datetime import datetime
logger = logging.getLogger(__name__)


class OrderRecord(BaseModel):
    """Row-level schema for one sales order.

    Pydantic validates/coerces the field types; two extra validators require
    a positive ``amount`` and an ``order_date`` year between 2020 and 2030.
    """

    # Required fields.
    order_id: str
    customer_id: int
    amount: float
    order_date: datetime
    # Optional descriptive fields; None when not supplied.
    product_category: Optional[str] = None
    region: Optional[str] = None

    @validator('amount')
    def amount_must_be_positive(cls, value):
        """Reject zero or negative amounts."""
        if value <= 0:
            raise ValueError(f'Amount must be positive, got {value}')
        return value

    @validator('order_date')
    def date_must_be_reasonable(cls, value):
        """Reject order dates whose year is outside 2020-2030 (inclusive)."""
        year = value.year
        if not 2020 <= year <= 2030:
            raise ValueError(f'Order date year {year} is out of range')
        return value
def validate_dataframe(
    df: pd.DataFrame,
    required_columns: List[str],
    min_rows: int = 1,
    max_null_pct: float = 5.0
) -> pd.DataFrame:
    """Validate a DataFrame against table-level quality rules.

    Checks, in order: every required column is present, the frame has at
    least ``min_rows`` rows, and no required column exceeds ``max_null_pct``
    percent nulls.

    Args:
        df: Input DataFrame.
        required_columns: Columns that must exist. The null check is applied
            to these columns only.
        min_rows: Minimum row count expected.
        max_null_pct: Maximum allowed null percentage (0-100) per required
            column.

    Returns:
        The input DataFrame itself, unchanged, when all checks pass.

    Raises:
        ValueError: If any check fails.
    """
    # Sorted so the error message is deterministic (set iteration order is not).
    missing = sorted(set(required_columns) - set(df.columns))
    if missing:
        raise ValueError(f"Missing required columns: {missing}")

    if len(df) < min_rows:
        raise ValueError(
            f"Expected at least {min_rows} rows, got {len(df)}"
        )

    # An empty frame (only reachable when min_rows <= 0) has no meaningful
    # null ratio; skip the check instead of dividing by zero.
    if len(df) > 0:
        for col in required_columns:
            null_pct = df[col].isna().mean() * 100
            if null_pct > max_null_pct:
                raise ValueError(
                    f"Column '{col}' has {null_pct:.1f}% nulls "
                    f"(max allowed: {max_null_pct}%)"
                )

    logger.info(f"Validation passed: {len(df)} rows, {len(df.columns)} columns")
    return df
def validate_records(df: pd.DataFrame) -> List[dict]:
    """Validate individual rows with the ``OrderRecord`` Pydantic model.

    Invalid rows are skipped rather than raised. Their count is logged at
    WARNING level and each individual error at DEBUG level.

    Args:
        df: DataFrame to validate row-by-row.

    Returns:
        List of valid records as dicts (produced by ``OrderRecord.dict()``).
    """
    valid_records = []
    errors = []
    for idx, row in df.iterrows():
        # pandas marks missing cells as NaN/NaT. Passed straight to Pydantic,
        # NaN is rejected or stringified to 'nan' for Optional[str] fields
        # (depending on the Pydantic version) and slips through the amount
        # check. Map missing scalars to None first.
        data = {
            key: (None if pd.api.types.is_scalar(value) and pd.isna(value) else value)
            for key, value in row.items()
        }
        try:
            record = OrderRecord(**data)
        except ValidationError as e:
            errors.append({"row": idx, "error": str(e)})
            continue
        valid_records.append(record.dict())

    if errors:
        logger.warning(f"Found {len(errors)} invalid records")
        for err in errors:
            logger.debug(f"Invalid record at row {err['row']}: {err['error']}")
    logger.info(f"Validated {len(valid_records)} records successfully")
    return valid_records
Step 4: Transform Module
# src/transform.py
import pandas as pd
import numpy as np
import logging
from datetime import datetime
logger = logging.getLogger(__name__)


def clean_data(df: pd.DataFrame) -> pd.DataFrame:
    """Clean raw data: normalize text, parse dates, drop duplicates and bad amounts.

    Steps, in order:
      1. Strip and lowercase every *string* value in object-dtype columns.
         Non-string values in those columns are left untouched.
      2. Parse ``order_date`` to datetime.
      3. Drop exact duplicate rows. This runs after normalization, so rows
         that differ only in case or surrounding whitespace collapse.
      4. Drop rows with a null ``amount``, cast ``amount`` to float, and keep
         only strictly positive amounts.

    The input DataFrame is not modified.

    Args:
        df: Raw DataFrame; must contain ``order_date`` and ``amount`` columns.

    Returns:
        Cleaned DataFrame. Surviving rows keep their original index labels.
    """
    initial_rows = len(df)
    df = df.copy()

    # Standardize text. The ``.str`` accessor would turn every non-string
    # entry of a mixed object column into NaN, so only str values are touched.
    for col in df.select_dtypes(include=['object']).columns:
        df[col] = df[col].map(
            lambda v: v.strip().lower() if isinstance(v, str) else v
        )

    df['order_date'] = pd.to_datetime(df['order_date'])

    # Deduplicate after normalization so formatting-only differences collapse.
    df = df.drop_duplicates()

    # Drop null amounts and cast in one expression (no assignment onto a
    # filtered frame, which can trigger SettingWithCopyWarning).
    df = df.dropna(subset=['amount']).astype({'amount': float})
    # Keep strictly positive amounts; zero is dropped along with negatives.
    df = df[df['amount'] > 0]

    logger.info(
        f"Cleaned: {initial_rows} -> {len(df)} rows "
        f"({initial_rows - len(df)} removed)"
    )
    return df
def aggregate_sales(df: pd.DataFrame) -> pd.DataFrame:
    """Aggregate sales by region and product category.

    Rows whose ``region`` or ``product_category`` is null form their own
    (NaN-keyed) group instead of being silently dropped. This keeps summed
    revenue reconcilable with the detail data.

    Args:
        df: Cleaned sales DataFrame with ``region``, ``product_category``,
            ``order_id``, ``amount``, ``customer_id`` and ``order_date``
            columns.

    Returns:
        One row per (region, product_category) with:
          - order count;
          - total revenue and average order value, both rounded to 2 decimals;
          - largest order;
          - number of distinct customers;
          - first and last order date;
          - a ``processed_at`` timestamp.
    """
    aggregated = df.groupby(
        ['region', 'product_category'], dropna=False
    ).agg(
        total_orders=('order_id', 'count'),
        total_revenue=('amount', 'sum'),
        avg_order_value=('amount', 'mean'),
        max_order=('amount', 'max'),
        unique_customers=('customer_id', 'nunique'),
        first_order=('order_date', 'min'),
        last_order=('order_date', 'max')
    ).reset_index()

    # Round monetary metrics and stamp the run time.
    aggregated['avg_order_value'] = aggregated['avg_order_value'].round(2)
    aggregated['total_revenue'] = aggregated['total_revenue'].round(2)
    aggregated['processed_at'] = datetime.now()

    logger.info(f"Aggregated to {len(aggregated)} rows")
    return aggregated
def add_time_dimensions(df: pd.DataFrame) -> pd.DataFrame:
    """Derive calendar columns from ``order_date``.

    Adds ``year``, ``month``, ``day_of_week`` (day name), ``quarter`` and
    ``is_weekend`` (Saturday or Sunday). ``order_date`` must already be a
    datetime column.

    Note: the columns are written onto ``df`` itself, so the caller's object
    is modified. That same object is returned.

    Args:
        df: DataFrame with a datetime ``order_date`` column.

    Returns:
        ``df`` with the five time-dimension columns added.
    """
    order_dates = df['order_date'].dt
    df['year'] = order_dates.year
    df['month'] = order_dates.month
    df['day_of_week'] = order_dates.day_name()
    df['quarter'] = order_dates.quarter
    # dayofweek counts Monday=0 ... Sunday=6, so 5 and 6 are the weekend.
    df['is_weekend'] = order_dates.dayofweek >= 5
    return df
Step 5: Load Module
# src/load.py
import pandas as pd
import boto3
import logging
from io import BytesIO
from sqlalchemy import create_engine, text
from typing import Optional
logger = logging.getLogger(__name__)


def load_to_postgres(
    df: pd.DataFrame,
    table_name: str,
    connection_string: str,
    if_exists: str = "append"
) -> int:
    """Write a DataFrame to a PostgreSQL table via SQLAlchemy.

    Rows are inserted in chunks of 1000 using multi-row INSERT statements
    (``method='multi'``). The DataFrame index is not written.

    Args:
        df: DataFrame to load.
        table_name: Target table name.
        connection_string: SQLAlchemy connection string.
        if_exists: How to handle an existing table ('append', 'replace', 'fail').

    Returns:
        Number of rows in ``df`` (the number of rows submitted).
    """
    engine = create_engine(connection_string)
    try:
        df.to_sql(
            table_name,
            engine,
            if_exists=if_exists,
            index=False,
            chunksize=1000,
            method='multi'
        )
    finally:
        # A new engine is created on every call; dispose of its connection
        # pool so connections are not held open until garbage collection.
        engine.dispose()
    logger.info(f"Loaded {len(df)} rows to {table_name}")
    return len(df)
def load_to_s3(
    df: pd.DataFrame,
    bucket: str,
    key: str,
    format: str = "parquet",
    compression: str = "snappy"
) -> str:
    """Serialize a DataFrame in memory and upload it to S3 as one object.

    Args:
        df: DataFrame to upload.
        bucket: S3 bucket name.
        key: S3 object key.
        format: Output format, ``'parquet'`` or ``'csv'``.
        compression: Parquet compression codec. It is not applied to CSV output.

    Returns:
        ``s3://`` URI of the uploaded object.

    Raises:
        ValueError: If ``format`` is not supported.
    """
    # Validate the format first so a typo fails fast, before any
    # serialization work or AWS client creation.
    if format not in ("parquet", "csv"):
        raise ValueError(f"Unsupported format: {format}")

    buffer = BytesIO()
    if format == "parquet":
        df.to_parquet(buffer, index=False, compression=compression)
    else:
        df.to_csv(buffer, index=False)

    s3 = boto3.client('s3')
    s3.put_object(
        Bucket=bucket,
        Key=key,
        Body=buffer.getvalue()
    )
    uri = f"s3://{bucket}/{key}"
    logger.info(f"Uploaded to {uri}")
    return uri
Step 6: Pipeline Orchestrator
# src/pipeline.py
import logging
import yaml
from pathlib import Path
from datetime import datetime
from src.extract import extract_csv
from src.validate import validate_dataframe, validate_records
from src.transform import clean_data, aggregate_sales, add_time_dimensions
from src.load import load_to_postgres, load_to_s3
logger = logging.getLogger(__name__)


def _resolve_env_placeholders(value):
    """Recursively replace ``${VAR}`` placeholders in strings with env values.

    Unset variables resolve to an empty string. Dicts and lists are rebuilt
    with their items resolved; any other value is returned unchanged.
    """
    import os
    import re

    if isinstance(value, dict):
        return {k: _resolve_env_placeholders(v) for k, v in value.items()}
    if isinstance(value, list):
        return [_resolve_env_placeholders(v) for v in value]
    if isinstance(value, str):
        return re.sub(
            r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}",
            lambda match: os.environ.get(match.group(1), ""),
            value,
        )
    return value


def load_config(config_path: str = "configs/config.yaml") -> dict:
    """Load the pipeline configuration from a YAML file.

    Every ``${VAR}`` placeholder inside a string value (for example
    ``password: ${DB_PASSWORD}``) is replaced with that environment
    variable's value, or ``""`` if it is unset. Literal values are kept as
    written. If ``target.database`` exists but has no ``password`` key, the
    password is taken from ``DB_PASSWORD`` (default ``""``).

    Args:
        config_path: Path to the YAML configuration file.

    Returns:
        The parsed and resolved configuration dictionary.

    Raises:
        FileNotFoundError: If ``config_path`` does not exist.
        ValueError: If the file does not contain a YAML mapping.
    """
    import os

    with open(config_path, encoding="utf-8") as f:
        config = yaml.safe_load(f)
    if not isinstance(config, dict):
        raise ValueError(f"Config file {config_path} must contain a YAML mapping")

    config = _resolve_env_placeholders(config)

    # Backward compatibility: a database section without a password still
    # picks it up from the environment, as it always has.
    target = config.get('target')
    db_config = target.get('database') if isinstance(target, dict) else None
    if isinstance(db_config, dict) and 'password' not in db_config:
        db_config['password'] = os.environ.get("DB_PASSWORD", "")
    return config
def run_pipeline(config: dict) -> dict:
    """Run the complete ETL pipeline: extract, validate, transform, load.

    Each completed step appends an entry to ``results["steps"]``. The entry
    carries its row counts (where applicable) and the measured
    ``duration_ms``. ``results["total_duration"]`` is set whether the run
    succeeds or fails.

    Args:
        config: Pipeline configuration dictionary with ``source``,
            ``quality`` and ``target`` sections (as returned by ``load_config``).

    Returns:
        Dictionary with pipeline execution results (``status == "success"``).

    Raises:
        Exception: Any error from a step is logged, recorded in ``results``
            (``status="failed"`` and ``error``) and re-raised to the caller.
    """
    import time
    from urllib.parse import quote

    start_time = datetime.now()
    results = {"steps": [], "status": "success"}

    def _elapsed_ms(step_start: float) -> int:
        # Wall-clock milliseconds since ``step_start`` (a perf_counter value).
        return int((time.perf_counter() - step_start) * 1000)

    try:
        # Step 1: Extract
        logger.info("Step 1: Extracting data")
        step_start = time.perf_counter()
        df_raw = extract_csv(
            filepath=config['source']['path'],
            encoding=config['source'].get('encoding', 'utf-8')
        )
        results["steps"].append({
            "name": "extract",
            "rows": len(df_raw),
            "duration_ms": _elapsed_ms(step_start)
        })

        # Step 2: Validate
        logger.info("Step 2: Validating data")
        step_start = time.perf_counter()
        df_validated = validate_dataframe(
            df_raw,
            required_columns=config['quality']['required_columns'],
            min_rows=config['quality']['min_rows'],
            max_null_pct=config['quality']['max_null_pct']
        )
        # NOTE: only the count of Pydantic-valid records is reported; the
        # transform step below still receives every row of df_validated.
        valid_records = validate_records(df_validated)
        results["steps"].append({
            "name": "validate",
            "valid_rows": len(valid_records),
            "duration_ms": _elapsed_ms(step_start)
        })

        # Step 3: Transform
        logger.info("Step 3: Transforming data")
        step_start = time.perf_counter()
        df_clean = clean_data(df_validated)
        df_with_time = add_time_dimensions(df_clean)
        df_aggregated = aggregate_sales(df_clean)
        results["steps"].append({
            "name": "transform",
            "clean_rows": len(df_clean),
            "aggregated_rows": len(df_aggregated),
            "duration_ms": _elapsed_ms(step_start)
        })

        # Step 4: Load to PostgreSQL
        logger.info("Step 4: Loading to PostgreSQL")
        step_start = time.perf_counter()
        db_config = config['target']['database']
        # Percent-encode credentials so characters such as '@', ':' or '/'
        # in the user name or password cannot break the URL structure.
        user = quote(str(db_config['user']), safe="")
        password = quote(str(db_config['password']), safe="")
        conn_str = (
            f"postgresql://{user}:{password}"
            f"@{db_config['host']}:{db_config['port']}/{db_config['name']}"
        )
        # Detail loads use the frame carrying the time-dimension columns.
        load_to_postgres(df_with_time, "sales_detail", conn_str)
        load_to_postgres(df_aggregated, "sales_summary", conn_str)
        results["steps"].append({
            "name": "load_postgres",
            "duration_ms": _elapsed_ms(step_start)
        })

        # Step 5: Load to S3 (partitioned by run date)
        logger.info("Step 5: Loading to S3")
        step_start = time.perf_counter()
        s3_config = config['target']['s3']
        date_str = datetime.now().strftime("%Y-%m-%d")
        load_to_s3(
            df_with_time,
            bucket=s3_config['bucket'],
            key=f"{s3_config['prefix']}/detail/{date_str}/data.parquet"
        )
        load_to_s3(
            df_aggregated,
            bucket=s3_config['bucket'],
            key=f"{s3_config['prefix']}/summary/{date_str}/data.parquet"
        )
        results["steps"].append({
            "name": "load_s3",
            "duration_ms": _elapsed_ms(step_start)
        })

    except Exception as e:
        logger.error(f"Pipeline failed: {e}")
        results["status"] = "failed"
        results["error"] = str(e)
        raise
    finally:
        results["total_duration"] = str(datetime.now() - start_time)
        logger.info(f"Pipeline completed: {results['status']}")

    # Return after the try statement: a ``return`` inside ``finally`` would
    # discard the exception re-raised above and hide the failure from callers.
    return results
Step 7: CLI Tool
# src/cli.py
import click
import logging
from src.pipeline import run_pipeline, load_config
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)


@click.group()
@click.option('--verbose', is_flag=True, help='Enable debug logging')
def cli(verbose):
    """Sales Data Pipeline CLI."""
    # --verbose lowers the root logger's threshold so DEBUG records from every
    # module are emitted; without it the root level is left unchanged.
    root_logger = logging.getLogger()
    if verbose:
        root_logger.setLevel(logging.DEBUG)
@cli.command()
@click.option('--config', default='configs/config.yaml', help='Config file path')
def run(config):
    """Run the full ETL pipeline."""
    logger.info("Starting pipeline")
    cfg = load_config(config)
    try:
        results = run_pipeline(cfg)
    except Exception:
        # run_pipeline logs the failure itself. Keep the traceback available
        # under --verbose, report the status, and exit non-zero so schedulers
        # and shell scripts see the run as failed.
        logger.debug("Pipeline failure details", exc_info=True)
        click.echo("Pipeline status: failed")
        raise SystemExit(1)
    click.echo(f"Pipeline status: {results['status']}")
    click.echo(f"Total duration: {results['total_duration']}")
    # A returned (rather than raised) failure must also produce a failing exit code.
    if results['status'] != 'success':
        raise SystemExit(1)
@cli.command()
@click.argument('filepath')
@click.option('--config', default='configs/config.yaml',
              help='Config file with quality rules (defaults used if missing)')
def validate(filepath, config):
    """Validate a CSV file against quality rules."""
    from pathlib import Path
    from src.extract import extract_csv
    from src.validate import validate_dataframe

    # Apply the same quality rules as `run` when the config file exists, so
    # both commands agree; otherwise fall back to the built-in defaults.
    required = ['order_id', 'customer_id', 'amount', 'order_date']
    rules = {}
    if Path(config).exists():
        quality = load_config(config).get('quality') or {}
        required = quality.get('required_columns', required)
        rules = {
            name: quality[name]
            for name in ('min_rows', 'max_null_pct')
            if name in quality
        }

    df = extract_csv(filepath)
    try:
        validated = validate_dataframe(df, required_columns=required, **rules)
    except ValueError as exc:
        # Report a clean message and a non-zero exit instead of a traceback.
        raise click.ClickException(f"Validation failed: {exc}") from exc
    click.echo(f"Validation passed: {len(validated)} rows")


if __name__ == '__main__':
    cli()
Step 8: Docker Setup
# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Compiler and libpq headers, presumably so a PostgreSQL driver (e.g.
# psycopg2) can be built from requirements.txt; apt lists are removed to
# keep the layer small.
RUN apt-get update && apt-get install -y --no-install-recommends \
        gcc libpq-dev && rm -rf /var/lib/apt/lists/*

# Install dependencies before copying source so this layer stays cached
# when only the code changes.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY src/ ./src/
COPY configs/ ./configs/

# Flush Python output immediately so logs show up in `docker logs`.
ENV PYTHONUNBUFFERED=1

# Run the pipeline as an unprivileged user rather than root.
RUN useradd --create-home pipeline
USER pipeline

ENTRYPOINT ["python", "-m", "src.cli"]
CMD ["run"]
# docker-compose.yml
# No top-level `version:` key: the Compose Specification treats it as
# obsolete. The `depends_on.condition` form used below is a Compose Spec
# feature that the legacy 3.x file format does not support.
services:
  postgres:
    image: postgres:15
    environment:
      POSTGRES_DB: analytics
      POSTGRES_USER: pipeline
      # Abort with a clear message instead of starting without a password.
      POSTGRES_PASSWORD: ${DB_PASSWORD:?DB_PASSWORD must be set}
    ports:
      - "5432:5432"
    volumes:
      - pgdata:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U pipeline -d analytics"]
      interval: 10s
      timeout: 5s
      retries: 5

  pipeline:
    build: .
    environment:
      DB_PASSWORD: ${DB_PASSWORD:?DB_PASSWORD must be set}
    # configs/config.yaml points the pipeline at host "localhost". Sharing
    # the postgres container's network namespace makes localhost:5432 reach
    # the database from inside this container.
    network_mode: "service:postgres"
    depends_on:
      postgres:
        condition: service_healthy
    volumes:
      - ./data:/app/data

volumes:
  pgdata:
Step 9: Tests
# tests/conftest.py
import pandas as pd
import pytest
from datetime import datetime, timedelta
import random
@pytest.fixture
def sample_orders():
    """Generate 100 valid sample orders for testing.

    Uses a fixed-seed RNG so every test run sees the same data. It draws
    ``product_category`` and ``region`` per row, so aggregations see several
    groups rather than one.

    Properties of the data:
      - order IDs are unique (``ORD-001`` ... ``ORD-100``);
      - amounts lie in [10, 500];
      - order dates are ISO-8601 strings within 2024.
    """
    rng = random.Random(42)
    n = 100
    return pd.DataFrame({
        'order_id': [f'ORD-{i:03d}' for i in range(1, n + 1)],
        'customer_id': [rng.randint(1001, 1010) for _ in range(n)],
        'amount': [round(rng.uniform(10, 500), 2) for _ in range(n)],
        'order_date': [
            (datetime(2024, 1, 1) + timedelta(days=rng.randint(0, 365))).isoformat()
            for _ in range(n)
        ],
        'product_category': [
            rng.choice(['electronics', 'clothing', 'food']) for _ in range(n)
        ],
        'region': [
            rng.choice(['north', 'south', 'east', 'west']) for _ in range(n)
        ],
    })
@pytest.fixture
def dirty_orders(sample_orders):
    """Return a copy of ``sample_orders`` with deliberate data-quality problems.

    Injected issues (rows addressed by index label):
      * row 0: negative amount (-50)
      * row 1: null amount
      * row 2: reuses row 0's order_id (duplicate key)
      * an exact copy of row 0 appended at the end (duplicate row)
    """
    broken = sample_orders.copy()
    broken.loc[0, 'amount'] = -50
    broken.loc[1, 'amount'] = None
    broken.loc[2, 'order_id'] = broken.loc[0, 'order_id']
    return pd.concat([broken, broken.iloc[[0]]])
# tests/test_extract.py
from src.extract import extract_csv
import pytest
import os
def test_extract_csv(tmp_path):
    """extract_csv returns every data row and uses the header as column names."""
    csv_file = tmp_path / "test.csv"
    csv_file.write_text("col1,col2\n1,2\n3,4\n")

    frame = extract_csv(str(csv_file))

    assert frame.shape[0] == 2
    assert frame.columns.tolist() == ['col1', 'col2']
def test_extract_missing_file(tmp_path):
    """extract_csv raises FileNotFoundError when the path does not exist."""
    # Build the path inside a fresh temp dir so the file is guaranteed to be
    # absent, regardless of the directory pytest runs from.
    missing = tmp_path / "nonexistent.csv"
    with pytest.raises(FileNotFoundError, match="Source file not found"):
        extract_csv(str(missing))
# tests/test_validate.py
from src.validate import validate_dataframe
import pandas as pd
import pytest
def test_validate_passes(sample_orders):
    """Clean sample data passes validation and is returned in full."""
    required = ['order_id', 'customer_id', 'amount']
    result = validate_dataframe(sample_orders, required_columns=required)
    assert len(result) == 100
def test_validate_fails_nulls():
    """A required column above the null threshold makes validation raise."""
    # Two of the three 'value' cells are null (~66.7%), far above the 5% limit.
    frame = pd.DataFrame({'id': [1, 2, 3], 'value': [None, None, 100]})
    with pytest.raises(ValueError, match="null"):
        validate_dataframe(frame, required_columns=['id', 'value'], max_null_pct=5.0)
# tests/test_transform.py
from src.transform import clean_data, aggregate_sales
import pandas as pd
def test_clean_data_removes_duplicates(dirty_orders, sample_orders):
    """Cleaning drops exact duplicate rows and leaves order_id unique."""
    cleaned = clean_data(dirty_orders)
    assert cleaned['order_id'].is_unique
    assert not cleaned.duplicated().any()

    # dirty_orders' appended duplicate is also the negative-amount row, so it
    # would be filtered out even without deduplication. Duplicate a valid row
    # to check that deduplication itself does the work.
    doubled = pd.concat([sample_orders, sample_orders.iloc[[5]]])
    assert len(clean_data(doubled)) == len(sample_orders)
def test_clean_data_removes_negatives(dirty_orders):
    """No zero or negative amounts survive cleaning."""
    amounts = clean_data(dirty_orders)['amount']
    assert amounts.gt(0).all()
def test_aggregate_sales(sample_orders):
    """Aggregating cleaned sample data yields summary metrics in at least one row."""
    summary = aggregate_sales(clean_data(sample_orders))
    for column in ('total_orders', 'total_revenue'):
        assert column in summary.columns
    assert not summary.empty
Summary: Key Takeaways
- Extract → validate → transform → load — this pipeline extends the standard ETL pattern with an explicit validation step, and each step has a single, clear responsibility.
- Validate early — catch data quality issues before they propagate through transformations.
- Use Pydantic for row-level validation — type-checked models ensure each record matches the expected schema.
- Clean data systematically — remove duplicates, standardize types, filter invalid values in a defined order.
- Separate concerns — keep extract, transform, and load logic in their own modules so each can be tested and reused independently.
- Configuration outside code — use YAML/JSON configs so the same pipeline works across environments.
- Docker improves reproducibility — the same image, with pinned dependencies, runs in development, testing, and production.
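- Tests catch regressions — this project has unit tests for the extract, validate, and transform modules. Adding tests for the load module and an integration test for the full pipeline is a natural next step.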
See Also
- What is Data Engineering — Introduction to data engineering
- Python for Data Engineers — Python libraries and patterns
- SQL Fundamentals — Essential SQL skills
- Docker for Data Engineers — Containerizing data pipelines
- Version Control with Git — Git for data engineers
- Data Formats — JSON, Parquet, Avro comparison