Project 1 — Build Your First Data Pipeline

Data engineers build and maintain the infrastructure that powers data pipelines, warehouses, and analytics systems. This project brings together everything you've learned in the foundations module into a real, working pipeline.

Figure: ETL pipeline architecture. CSV source (raw data files, sales.csv) -> Bronze layer (raw ingestion, validate schema) -> Silver layer (clean and validate, deduplicate) -> Gold layer (aggregate, business metrics) -> PostgreSQL (data warehouse) and S3 / GCS (data lake).

Project Overview

What You Will Build

An end-to-end ETL pipeline that:

  1. Extracts CSV sales data from a local directory
  2. Validates data quality using Pydantic
  3. Transforms data with pandas (cleaning, aggregation)
  4. Loads results to PostgreSQL and S3
  5. Tests the pipeline with pytest
  6. Containerizes with Docker
  7. Automates with a CLI tool (Click)

Project Structure

sales-pipeline/
+-- src/
|   +-- __init__.py
|   +-- extract.py
|   +-- validate.py
|   +-- transform.py
|   +-- load.py
|   +-- pipeline.py
|   +-- cli.py
+-- tests/
|   +-- __init__.py
|   +-- test_extract.py
|   +-- test_validate.py
|   +-- test_transform.py
|   +-- conftest.py
+-- configs/
|   +-- config.yaml
+-- data/
|   +-- raw/
|   +-- processed/
+-- Dockerfile
+-- docker-compose.yml
+-- requirements.txt
+-- pyproject.toml
+-- README.md

Step 1: Configuration

# configs/config.yaml
source:
  type: csv
  path: data/raw/sales.csv
  encoding: utf-8

target:
  database:
    host: localhost
    port: 5432
    name: analytics
    user: pipeline
    password: ${DB_PASSWORD}
  s3:
    bucket: my-data-lake
    prefix: gold/sales

quality:
  required_columns:
    - order_id
    - customer_id
    - amount
    - order_date
  min_rows: 1
  max_null_pct: 5.0
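
YAML has no built-in environment variable expansion, so a loader such as yaml.safe_load returns the literal string ${DB_PASSWORD}. The following is a minimal sketch of one way to resolve such placeholders after loading. The helper name resolve_env_placeholders is hypothetical (it is not part of this project's code), and the sketch assumes the config is a plain nesting of dicts, lists, and scalars.

```python
import os
import re

# Matches ${VAR_NAME} placeholders such as ${DB_PASSWORD}
_PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


def resolve_env_placeholders(value, env=None):
    """Recursively replace ${VAR} placeholders in a loaded config.

    Hypothetical helper for illustration. Raises KeyError when a
    referenced variable is not set, instead of silently using "".
    """
    env = os.environ if env is None else env
    if isinstance(value, dict):
        return {k: resolve_env_placeholders(v, env) for k, v in value.items()}
    if isinstance(value, list):
        return [resolve_env_placeholders(v, env) for v in value]
    if isinstance(value, str):
        return _PLACEHOLDER.sub(lambda m: env[m.group(1)], value)
    return value  # ints, floats, and None pass through unchanged


config = {"target": {"database": {"port": 5432, "password": "${DB_PASSWORD}"}}}
resolved = resolve_env_placeholders(config, env={"DB_PASSWORD": "s3cret"})
print(resolved["target"]["database"]["password"])
```

Failing loudly on a missing variable is a design choice of this sketch; it surfaces a misconfigured environment before the pipeline tries to connect.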

Step 2: Extract Module

# src/extract.py
import pandas as pd
import logging
from pathlib import Path
from typing import Optional

logger = logging.getLogger(__name__)


def extract_csv(
    filepath: str,
    encoding: str = "utf-8",
    dtype: Optional[dict] = None
) -> pd.DataFrame:
    """Extract data from a CSV file.

    Args:
        filepath: Path to the CSV file.
        encoding: File encoding (default: utf-8).
        dtype: Optional column type mapping.

    Returns:
        pandas DataFrame with raw data.
    """
    path = Path(filepath)
    if not path.exists():
        raise FileNotFoundError(f"Source file not found: {filepath}")

    logger.info(f"Extracting data from {filepath}")
    df = pd.read_csv(filepath, encoding=encoding, dtype=dtype)
    logger.info(f"Extracted {len(df)} rows, {len(df.columns)} columns")
    return df


def extract_from_database(
    connection_string: str,
    query: str,
    params: Optional[dict] = None
) -> pd.DataFrame:
    """Extract data from a SQL database.

    Args:
        connection_string: SQLAlchemy connection string.
        query: SQL query to execute.
        params: Optional query parameters.

    Returns:
        pandas DataFrame with query results.
    """
    from sqlalchemy import create_engine, text

    engine = create_engine(connection_string)
    with engine.connect() as conn:
        df = pd.read_sql(text(query), conn, params=params or {})

    logger.info(f"Extracted {len(df)} rows from database")
    return df
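
The dtype argument matters more than it may look. As a small illustration (the sample data below is made up, not the project's sales.csv), pandas infers integers for an ID column whose values happen to be all digits, which drops leading zeros. Passing a dtype mapping keeps the IDs as text.

```python
import io

import pandas as pd

# Hypothetical sample data: order IDs with leading zeros
csv_text = "order_id,amount\n00123,19.99\n00456,5.00\n"

# Without a dtype mapping, pandas infers an integer column
inferred = pd.read_csv(io.StringIO(csv_text))

# With a dtype mapping, the IDs stay as the strings written in the file
typed = pd.read_csv(io.StringIO(csv_text), dtype={"order_id": str})

print(inferred["order_id"].tolist())
print(typed["order_id"].tolist())
```

With extract_csv, the same mapping would be passed through its dtype parameter.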

Step 3: Validate Module

# src/validate.py
import pandas as pd
import logging
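# Note: `validator` and `.dict()` are the Pydantic v1 API; Pydantic v2 keeps them
# only as deprecated aliases (v2 names: `field_validator`, `.model_dump()`)
from pydantic import BaseModel, validator, ValidationError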
from typing import List, Optional
from datetime import datetime

logger = logging.getLogger(__name__)


class OrderRecord(BaseModel):
    order_id: str
    customer_id: int
    amount: float
    order_date: datetime
    product_category: Optional[str] = None
    region: Optional[str] = None

    @validator('amount')
    def amount_must_be_positive(cls, v):
        if v <= 0:
            raise ValueError(f'Amount must be positive, got {v}')
        return v

    @validator('order_date')
    def date_must_be_reasonable(cls, v):
        if v.year < 2020 or v.year > 2030:
            raise ValueError(f'Order date year {v.year} is out of range')
        return v


def validate_dataframe(
    df: pd.DataFrame,
    required_columns: List[str],
    min_rows: int = 1,
    max_null_pct: float = 5.0
) -> pd.DataFrame:
    """Validate a DataFrame against quality rules.

    Args:
        df: Input DataFrame.
        required_columns: Columns that must exist.
        min_rows: Minimum row count expected.
        max_null_pct: Maximum null percentage per column.

    Returns:
        Validated DataFrame.

    Raises:
        ValueError: If validation fails.
    """
    # Check required columns
    missing = set(required_columns) - set(df.columns)
    if missing:
        raise ValueError(f"Missing required columns: {missing}")

    # Check minimum rows
    if len(df) < min_rows:
        raise ValueError(
            f"Expected at least {min_rows} rows, got {len(df)}"
        )

    # Check null percentages
    for col in required_columns:
        null_pct = (df[col].isnull().sum() / len(df)) * 100
        if null_pct > max_null_pct:
            raise ValueError(
                f"Column '{col}' has {null_pct:.1f}% nulls "
                f"(max allowed: {max_null_pct}%)"
            )

    logger.info(f"Validation passed: {len(df)} rows, {len(df.columns)} columns")
    return df


def validate_records(df: pd.DataFrame) -> List[dict]:
    """Validate individual records using Pydantic.

    Args:
        df: DataFrame to validate row-by-row.

    Returns:
        List of valid records as dicts.
    """
    valid_records = []
    errors = []

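    for idx, row in df.iterrows():
        # pandas represents missing values as NaN; convert them to None so
        # Optional fields are treated as absent rather than as a float
        data = {k: (None if pd.isna(v) else v) for k, v in row.items()}
        try:
            record = OrderRecord(**data)
            valid_records.append(record.dict())
        except ValidationError as e:
            errors.append({"row": idx, "error": str(e)})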

    if errors:
        logger.warning(f"Found {len(errors)} invalid records")
    logger.info(f"Validated {len(valid_records)} records successfully")
    return valid_records
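
The null check in validate_dataframe compares a percentage against max_null_pct with a strict "greater than", so a column sitting exactly at the threshold passes. The sketch below works through that arithmetic on plain Python lists. The function names are hypothetical stand-ins, and the empty-column guard is an addition of this sketch rather than something the module above does.

```python
def null_percentage(values):
    """Percentage of None entries in a list (stand-in for one column)."""
    if not values:
        raise ValueError("cannot compute a null percentage for an empty column")
    nulls = sum(1 for v in values if v is None)
    return 100.0 * nulls / len(values)


def exceeds_threshold(values, max_null_pct=5.0):
    """True when the column would fail the null check (strictly greater)."""
    return null_percentage(values) > max_null_pct


column_at_limit = [10.0] * 95 + [None] * 5   # exactly 5% nulls
column_over = [10.0] * 94 + [None] * 6       # 6% nulls

print(exceeds_threshold(column_at_limit), exceeds_threshold(column_over))
```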

Step 4: Transform Module

# src/transform.py
import pandas as pd
import numpy as np
import logging
from datetime import datetime

logger = logging.getLogger(__name__)


def clean_data(df: pd.DataFrame) -> pd.DataFrame:
    """Clean raw data — remove duplicates, fix types, handle nulls.

    Args:
        df: Raw DataFrame.

    Returns:
        Cleaned DataFrame.
    """
    initial_rows = len(df)

    # Remove exact duplicates; copy so the column assignments below
    # work on an independent frame
    df = df.drop_duplicates().copy()

    # Parse dates first, so the string standardization below
    # cannot alter the date text
    df['order_date'] = pd.to_datetime(df['order_date'])

    # Standardize the remaining string columns
    for col in df.select_dtypes(include=['object']).columns:
        df[col] = df[col].str.strip().str.lower()

    # Remove rows with null amounts
    df = df.dropna(subset=['amount']).copy()

    # Ensure amount is float
    df['amount'] = df['amount'].astype(float)

    # Keep only positive amounts (drops zero and negative values)
    df = df[df['amount'] > 0].copy()

    logger.info(
        f"Cleaned: {initial_rows} -> {len(df)} rows "
        f"({initial_rows - len(df)} removed)"
    )
    return df


def aggregate_sales(df: pd.DataFrame) -> pd.DataFrame:
    """Aggregate sales by region and product category.

    Args:
        df: Cleaned sales DataFrame.

    Returns:
        Aggregated DataFrame with summary metrics.
    """
    aggregated = df.groupby(['region', 'product_category']).agg(
        total_orders=('order_id', 'count'),
        total_revenue=('amount', 'sum'),
        avg_order_value=('amount', 'mean'),
        max_order=('amount', 'max'),
        unique_customers=('customer_id', 'nunique'),
        first_order=('order_date', 'min'),
        last_order=('order_date', 'max')
    ).reset_index()

    # Add derived columns
    aggregated['avg_order_value'] = aggregated['avg_order_value'].round(2)
    aggregated['total_revenue'] = aggregated['total_revenue'].round(2)
    aggregated['processed_at'] = datetime.now()

    logger.info(f"Aggregated to {len(aggregated)} rows")
    return aggregated


def add_time_dimensions(df: pd.DataFrame) -> pd.DataFrame:
    """Add time-based dimension columns.

    Args:
        df: DataFrame with order_date column.

    Returns:
        DataFrame with added time dimensions.
    """
    df['year'] = df['order_date'].dt.year
    df['month'] = df['order_date'].dt.month
    df['day_of_week'] = df['order_date'].dt.day_name()
    df['quarter'] = df['order_date'].dt.quarter
    df['is_weekend'] = df['order_date'].dt.dayofweek >= 5

    return df
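
To make the time dimensions concrete, here is a minimal sketch that derives the same five values for a single date using only the standard library. The function name time_dimensions is hypothetical. It relies on datetime's weekday() using the same Monday=0 to Sunday=6 numbering as the pandas dayofweek accessor, and on the default C locale for English day names.

```python
from datetime import date


def time_dimensions(d):
    """Derive year, month, day name, quarter, and weekend flag for one date."""
    return {
        "year": d.year,
        "month": d.month,
        "day_of_week": d.strftime("%A"),
        "quarter": (d.month - 1) // 3 + 1,   # months 1-3 -> 1, 4-6 -> 2, ...
        "is_weekend": d.weekday() >= 5,      # Monday=0 ... Sunday=6
    }


saturday = time_dimensions(date(2024, 1, 6))
monday = time_dimensions(date(2024, 1, 8))
print(saturday)
print(monday)
```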

Step 5: Load Module

# src/load.py
import pandas as pd
import boto3
import logging
from io import BytesIO
from sqlalchemy import create_engine, text
from typing import Optional

logger = logging.getLogger(__name__)


def load_to_postgres(
    df: pd.DataFrame,
    table_name: str,
    connection_string: str,
    if_exists: str = "append"
) -> int:
    """Load DataFrame to PostgreSQL.

    Args:
        df: DataFrame to load.
        table_name: Target table name.
        connection_string: SQLAlchemy connection string.
        if_exists: How to handle existing table ('append', 'replace', 'fail').

    Returns:
        Number of rows loaded.
    """
    engine = create_engine(connection_string)

    df.to_sql(
        table_name,
        engine,
        if_exists=if_exists,
        index=False,
        chunksize=1000,
        method='multi'
    )

    logger.info(f"Loaded {len(df)} rows to {table_name}")
    return len(df)


def load_to_s3(
    df: pd.DataFrame,
    bucket: str,
    key: str,
    format: str = "parquet",
    compression: str = "snappy"
) -> str:
    """Upload DataFrame to S3.

    Args:
        df: DataFrame to upload.
        bucket: S3 bucket name.
        key: S3 object key.
        format: Output format ('parquet' or 'csv').
        compression: Compression codec.

    Returns:
        S3 URI of uploaded file.
    """
    s3 = boto3.client('s3')
    buffer = BytesIO()

    if format == "parquet":
        df.to_parquet(buffer, index=False, compression=compression)
    elif format == "csv":
        df.to_csv(buffer, index=False)
    else:
        raise ValueError(f"Unsupported format: {format}")

    buffer.seek(0)
    s3.put_object(
        Bucket=bucket,
        Key=key,
        Body=buffer.getvalue()
    )

    uri = f"s3://{bucket}/{key}"
    logger.info(f"Uploaded to {uri}")
    return uri
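
load_to_postgres expects a ready-made SQLAlchemy connection string. If the password contains characters such as @ or /, inserting it into the URL unescaped produces a string that is parsed incorrectly. A minimal sketch of one way to build the URL safely with the standard library follows; build_postgres_url is a hypothetical helper, and the credentials are made up.

```python
from urllib.parse import quote_plus


def build_postgres_url(user, password, host, port, name):
    """Build a postgresql:// URL with percent-encoded credentials."""
    return (
        f"postgresql://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/{name}"
    )


url = build_postgres_url("pipeline", "p@ss/word", "localhost", 5432, "analytics")
print(url)
```

SQLAlchemy 1.4 and later also provide sqlalchemy.engine.URL.create, which takes the parts as separate arguments and avoids manual escaping.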

Step 6: Pipeline Orchestrator

# src/pipeline.py
import logging
import yaml
from pathlib import Path
from datetime import datetime

from src.extract import extract_csv
from src.validate import validate_dataframe, validate_records
from src.transform import clean_data, aggregate_sales, add_time_dimensions
from src.load import load_to_postgres, load_to_s3

logger = logging.getLogger(__name__)


def load_config(config_path: str = "configs/config.yaml") -> dict:
    """Load pipeline configuration."""
    with open(config_path) as f:
        config = yaml.safe_load(f)

    # Resolve environment variables
    import os
    db_password = os.environ.get("DB_PASSWORD", "")
    config['target']['database']['password'] = db_password

    return config


def run_pipeline(config: dict) -> dict:
    """Run the complete ETL pipeline.

    Args:
        config: Pipeline configuration dictionary.

    Returns:
        Dictionary with pipeline execution results.
    """
    start_time = datetime.now()
    results = {"steps": [], "status": "success"}

    try:
        # Step 1: Extract
        logger.info("Step 1: Extracting data")
        df_raw = extract_csv(
            filepath=config['source']['path'],
            encoding=config['source'].get('encoding', 'utf-8')
        )
        results["steps"].append({
            "name": "extract",
            "rows": len(df_raw),
            "duration_ms": 0  # placeholder: per-step timing is not measured
        })

        # Step 2: Validate
        logger.info("Step 2: Validating data")
        df_validated = validate_dataframe(
            df_raw,
            required_columns=config['quality']['required_columns'],
            min_rows=config['quality']['min_rows'],
            max_null_pct=config['quality']['max_null_pct']
        )
        valid_records = validate_records(df_validated)
        results["steps"].append({
            "name": "validate",
            "valid_rows": len(valid_records),
            "duration_ms": 0
        })

        # Step 3: Transform
        logger.info("Step 3: Transforming data")
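        df_clean = clean_data(df_validated)
        # add_time_dimensions adds its columns to the frame it is given and
        # returns it, so rebind df_clean to make the data flow explicit
        df_clean = add_time_dimensions(df_clean)
        df_aggregated = aggregate_sales(df_clean)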
        results["steps"].append({
            "name": "transform",
            "clean_rows": len(df_clean),
            "aggregated_rows": len(df_aggregated),
            "duration_ms": 0
        })

        # Step 4: Load to PostgreSQL
        logger.info("Step 4: Loading to PostgreSQL")
        db_config = config['target']['database']
        conn_str = (
            f"postgresql://{db_config['user']}:{db_config['password']}"
            f"@{db_config['host']}:{db_config['port']}/{db_config['name']}"
        )
        load_to_postgres(df_clean, "sales_detail", conn_str)
        load_to_postgres(df_aggregated, "sales_summary", conn_str)
        results["steps"].append({"name": "load_postgres", "duration_ms": 0})

        # Step 5: Load to S3
        logger.info("Step 5: Loading to S3")
        s3_config = config['target']['s3']
        date_str = datetime.now().strftime("%Y-%m-%d")
        load_to_s3(
            df_clean,
            bucket=s3_config['bucket'],
            key=f"{s3_config['prefix']}/detail/{date_str}/data.parquet"
        )
        load_to_s3(
            df_aggregated,
            bucket=s3_config['bucket'],
            key=f"{s3_config['prefix']}/summary/{date_str}/data.parquet"
        )
        results["steps"].append({"name": "load_s3", "duration_ms": 0})

    except Exception as e:
        logger.error(f"Pipeline failed: {e}")
        results["status"] = "failed"
        results["error"] = str(e)
        raise

    finally:
        results["total_duration"] = str(datetime.now() - start_time)
        logger.info(f"Pipeline completed: {results['status']}")

    return results
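
The step entries in run_pipeline all record a duration_ms of 0. One possible way to fill in real values is a small context manager around each step. This is a sketch only: timed_step is a hypothetical helper, and the list built inside the with block stands in for a real extract call.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed_step(results, name):
    """Append a step entry to results["steps"] with its measured duration."""
    start = time.perf_counter()
    step = {"name": name}
    try:
        yield step  # the caller can add fields such as row counts
    finally:
        elapsed_ms = (time.perf_counter() - start) * 1000
        step["duration_ms"] = round(elapsed_ms, 2)
        results["steps"].append(step)


results = {"steps": [], "status": "success"}
with timed_step(results, "extract") as step:
    rows = list(range(1000))  # stand-in for the real extract call
    step["rows"] = len(rows)

print(results["steps"])
```

Because the entry is appended in the finally clause, a step that raises is still recorded with the time it took before failing.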

Step 7: CLI Tool

# src/cli.py
import click
import logging
from src.pipeline import run_pipeline, load_config

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s"
)
logger = logging.getLogger(__name__)


@click.group()
@click.option('--verbose', is_flag=True, help='Enable debug logging')
def cli(verbose):
    """Sales Data Pipeline CLI."""
    if verbose:
        logging.getLogger().setLevel(logging.DEBUG)


@cli.command()
@click.option('--config', default='configs/config.yaml', help='Config file path')
def run(config):
    """Run the full ETL pipeline."""
    logger.info("Starting pipeline")
    cfg = load_config(config)
    results = run_pipeline(cfg)
    click.echo(f"Pipeline status: {results['status']}")
    click.echo(f"Total duration: {results['total_duration']}")


@cli.command()
@click.argument('filepath')
def validate(filepath):
    """Validate a CSV file against quality rules."""
    from src.extract import extract_csv
    from src.validate import validate_dataframe

    df = extract_csv(filepath)
    required = ['order_id', 'customer_id', 'amount', 'order_date']
    validated = validate_dataframe(df, required_columns=required)
    click.echo(f"Validation passed: {len(validated)} rows")


if __name__ == '__main__':
    cli()
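
Because --verbose is declared on the group, it has to appear before the subcommand name on the command line. The sketch below shows this with Click's test runner. The group and command are simplified stand-ins for the ones in src/cli.py (the validate command here only echoes its argument), so nothing in it touches the real pipeline.

```python
import click
from click.testing import CliRunner


@click.group()
@click.option("--verbose", is_flag=True, help="Enable debug logging")
def cli(verbose):
    """Stand-in for the group in src/cli.py."""


@cli.command()
@click.argument("filepath")
def validate(filepath):
    """Stand-in subcommand that only echoes its argument."""
    click.echo(f"validating {filepath}")


runner = CliRunner()

# Group option before the subcommand: accepted
ok = runner.invoke(cli, ["--verbose", "validate", "sales.csv"])

# Group option after the subcommand: rejected as an unknown option
misplaced = runner.invoke(cli, ["validate", "sales.csv", "--verbose"])

print(ok.exit_code, misplaced.exit_code)
```

The same runner can be used in pytest to test the CLI without spawning a subprocess.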

Step 8: Docker Setup

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

RUN apt-get update && apt-get install -y --no-install-recommends \
    gcc libpq-dev && rm -rf /var/lib/apt/lists/*

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY src/ ./src/
COPY configs/ ./configs/

ENV PYTHONUNBUFFERED=1

ENTRYPOINT ["python", "-m", "src.cli"]
CMD ["run"]

# docker-compose.yml
version: '3.8'
services:
  postgres:
    image: postgres:15
    environment:
      POSTGRES_DB: analytics
      POSTGRES_USER: pipeline
      POSTGRES_PASSWORD: ${DB_PASSWORD}
    ports:
      - "5432:5432"
    volumes:
      - pgdata:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U pipeline -d analytics"]
      interval: 10s
      timeout: 5s
      retries: 5

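  pipeline:
    build: .
    # Inside the Compose network the database is reachable at host "postgres"
    # (the service name), not "localhost"; configs/config.yaml must use that
    # host when the pipeline runs in this container
    environment:
      DB_PASSWORD: ${DB_PASSWORD}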
    depends_on:
      postgres:
        condition: service_healthy
    volumes:
      - ./data:/app/data

volumes:
  pgdata:
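
The config file points the pipeline at localhost, while a container started by Compose reaches the database under its service name. One way to keep a single config file for both cases is to let environment variables override the database settings. The sketch below assumes hypothetical DB_HOST and DB_PORT variables (only DB_PASSWORD exists in the project as written), and apply_env_overrides is an illustrative helper, not part of load_config.

```python
import copy
import os


def apply_env_overrides(config, env=None):
    """Return a copy of config with database settings overridden from env.

    DB_HOST and DB_PORT are hypothetical variable names for this sketch;
    DB_PASSWORD is the one the project already uses.
    """
    env = os.environ if env is None else env
    result = copy.deepcopy(config)
    db = result["target"]["database"]
    db["host"] = env.get("DB_HOST", db["host"])
    db["port"] = int(env.get("DB_PORT", db["port"]))
    db["password"] = env.get("DB_PASSWORD", "")
    return result


base = {"target": {"database": {"host": "localhost", "port": 5432, "password": ""}}}
in_compose = apply_env_overrides(base, env={"DB_HOST": "postgres", "DB_PASSWORD": "x"})
print(in_compose["target"]["database"]["host"])
```

With this approach, the pipeline service in docker-compose.yml would set DB_HOST to postgres in its environment block, and local runs would fall back to the value in the YAML file.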

Step 9: Tests

# tests/conftest.py
import pandas as pd
import pytest
from datetime import datetime, timedelta
import random

@pytest.fixture
def sample_orders():
    """Generate sample order data for testing."""
    return pd.DataFrame({
        'order_id': [f'ORD-{i:03d}' for i in range(1, 101)],
        'customer_id': [random.randint(1001, 1010) for _ in range(100)],
        'amount': [round(random.uniform(10, 500), 2) for _ in range(100)],
        'order_date': [
            (datetime(2024, 1, 1) + timedelta(days=random.randint(0, 365))).isoformat()
            for _ in range(100)
        ],
        # Draw one value per row; a bare random.choice(...) would give
        # every row the same category and region
        'product_category': [
            random.choice(['electronics', 'clothing', 'food']) for _ in range(100)
        ],
        'region': [
            random.choice(['north', 'south', 'east', 'west']) for _ in range(100)
        ]
    })

@pytest.fixture
def dirty_orders(sample_orders):
    """Orders with data quality issues."""
    df = sample_orders.copy()
    df.loc[0, 'amount'] = -50        # Negative amount
    df.loc[1, 'amount'] = None       # Null amount
    df.loc[2, 'order_id'] = df.loc[0, 'order_id']  # Duplicate
    df = pd.concat([df, df.iloc[[0]]])  # Add exact duplicate
    return df

# tests/test_extract.py
from src.extract import extract_csv
import pytest
import os

def test_extract_csv(tmp_path):
    """Test CSV extraction."""
    # Create test file
    filepath = tmp_path / "test.csv"
    filepath.write_text("col1,col2\n1,2\n3,4\n")

    df = extract_csv(str(filepath))
    assert len(df) == 2
    assert list(df.columns) == ['col1', 'col2']

def test_extract_missing_file():
    """Test extraction raises error for missing file."""
    with pytest.raises(FileNotFoundError):
        extract_csv("nonexistent.csv")

# tests/test_validate.py
from src.validate import validate_dataframe
import pandas as pd
import pytest

def test_validate_passes(sample_orders):
    """Test validation passes for valid data."""
    result = validate_dataframe(
        sample_orders,
        required_columns=['order_id', 'customer_id', 'amount']
    )
    assert len(result) == 100

def test_validate_fails_nulls():
    """Test validation fails when nulls exceed threshold."""
    df = pd.DataFrame({
        'id': [1, 2, 3],
        'value': [None, None, 100]  # 66% null
    })
    with pytest.raises(ValueError, match="null"):
        validate_dataframe(df, required_columns=['id', 'value'], max_null_pct=5.0)

# tests/test_transform.py
from src.transform import clean_data, aggregate_sales
import pandas as pd

def test_clean_data_removes_duplicates(dirty_orders):
    """Test that cleaning removes duplicate rows."""
    cleaned = clean_data(dirty_orders)
    assert cleaned['order_id'].is_unique

def test_clean_data_removes_negatives(dirty_orders):
    """Test that cleaning removes negative amounts."""
    cleaned = clean_data(dirty_orders)
    assert (cleaned['amount'] > 0).all()

def test_aggregate_sales(sample_orders):
    """Test aggregation produces expected groups."""
    cleaned = clean_data(sample_orders)
    agg = aggregate_sales(cleaned)
    assert 'total_orders' in agg.columns
    assert 'total_revenue' in agg.columns
    assert len(agg) > 0
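
Test data built from the global random module differs on every run, which can make a failure hard to reproduce. A minimal sketch of a reproducible alternative follows, using a local seeded generator. make_orders is a hypothetical helper that returns plain dicts, and the seed value is arbitrary.

```python
import random


def make_orders(n, seed=42):
    """Build n order dicts from a local, seeded generator."""
    rng = random.Random(seed)  # local RNG: leaves the global random state alone
    return [
        {
            "order_id": f"ORD-{i:03d}",
            "customer_id": rng.randint(1001, 1010),
            "amount": round(rng.uniform(10, 500), 2),
            "region": rng.choice(["north", "south", "east", "west"]),
        }
        for i in range(1, n + 1)
    ]


first = make_orders(100)
second = make_orders(100)
print(first == second)  # same seed, same data
```

A fixture could wrap the result in pd.DataFrame(...) to get the same shape as sample_orders.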

Summary Takeaways

  1. Extract, validate, transform, load: the pipeline follows the standard ETL pattern with an explicit validation step, and each step has a single, clear responsibility.
  2. Validate early: catch data quality issues before they propagate through transformations.
  3. Use Pydantic for row-level validation: typed models check that each record matches the expected schema.
  4. Clean data systematically: remove duplicates, standardize types, and filter invalid values in a defined order.
  5. Separate concerns: keep extract, transform, and load in separate modules for testability and reuse.
  6. Keep configuration outside code: use YAML/JSON configs so the same pipeline works across environments.
  7. Docker improves reproducibility: the pipeline runs from the same container image in development, testing, and production.
  8. Tests catch regressions: write unit tests for each module and integration tests for the full pipeline.
