πŸŽ‰ 75% of content is free forever β€” Unlock Premium from $10/mo β†’
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

Project 3: End-to-End Data Warehouse Build

Module 3: Data Warehouses & StorageCapstone Project🟒 Free Lesson

Advertisement

Project 3: Building a Production Data Warehouse

This capstone project brings together everything from Module 3. You will design, build, test, and deploy a complete data warehouse using modern tools and best practices.

Project Objectives

  • Design a star schema for an e-commerce platform
  • Implement SCD Type 2 for dimension tracking
  • Build an ELT pipeline with dbt on Snowflake
  • Create orchestration with Apache Airflow
  • Implement data quality testing and monitoring
  • Deploy to production with CI/CD

Architecture Overview


Warehouse Build Pipeline ArchitectureSourcesShopify APIStripe PaymentsSegment EventsPostgreSQL CDC10K orders/dayIngestionAirflow DAGsSnowpipeFivetranCOPY INTODaily scheduleStagingdbt Staging ModelsRename ColumnsType CastingDeduplicationViews (ephemeral)Martsdim_customersdim_productsfact_ordersdim_dateIncremental modelsServingSnowflakeLooker Dashboardsdbt DocsREST APISLA: {'<'} 2 hours

Star Schema ERDfact_ordersPKorder_keyFKcustomer_keyFKorder_date_keyFKproduct_keynet_amountorder_statuscurrencydim_customerFKcustomer_keycustomer_idfull_nameemailsegmentSCD Type 2dim_productFKproduct_keyproduct_idproduct_namecategorybrandSCD Type 1dim_datePKdate_keyfull_dateday_of_weekmonth, quarterfiscal_yearfact_itemsFKorder_keyFKproduct_keyquantity, line_totalcost_of_goodsGrain: one row per order

Step 1: Schema Design

-- Dimension: Customer (SCD Type 2)
CREATE TABLE dim_customer (
    customer_key    BIGINT IDENTITY(1,1) PRIMARY KEY,
    customer_id     VARCHAR(50) UNIQUE NOT NULL,
    email           VARCHAR(300),
    full_name       VARCHAR(200),
    phone           VARCHAR(50),
    address_line1   VARCHAR(200),
    city            VARCHAR(100),
    state           VARCHAR(100),
    country         VARCHAR(100),
    postal_code     VARCHAR(20),
    segment         VARCHAR(50),
    acquisition_channel VARCHAR(50),
    first_order_date DATE,
    valid_from      TIMESTAMP NOT NULL,
    valid_to        TIMESTAMP NOT NULL DEFAULT '9999-12-31',
    is_current      BOOLEAN DEFAULT TRUE,
    row_hash        VARCHAR(64)
);

-- Dimension: Product (SCD Type 1 - overwrite)
CREATE TABLE dim_product (
    product_key     INT IDENTITY(1,1) PRIMARY KEY,
    product_id      VARCHAR(50) UNIQUE NOT NULL,
    product_name    VARCHAR(500),
    category        VARCHAR(100),
    subcategory     VARCHAR(100),
    brand           VARCHAR(100),
    unit_cost       DECIMAL(12,2),
    unit_price      DECIMAL(12,2),
    supplier_id     VARCHAR(50),
    is_active       BOOLEAN DEFAULT TRUE,
    last_updated    TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Dimension: Date
CREATE TABLE dim_date (
    date_key        INT PRIMARY KEY,
    full_date       DATE UNIQUE NOT NULL,
    day_of_week     SMALLINT,
    day_name        VARCHAR(10),
    is_weekend      BOOLEAN,
    month_number    SMALLINT,
    month_name      VARCHAR(10),
    quarter         SMALLINT,
    year            INT,
    fiscal_year     INT,
    fiscal_quarter  SMALLINT,
    week_of_year    SMALLINT
);

-- Fact: Orders (Transaction grain)
CREATE TABLE fact_orders (
    order_key        BIGINT IDENTITY(1,1) PRIMARY KEY,
    order_id         VARCHAR(50) NOT NULL,
    customer_key     INT NOT NULL,
    order_date_key   INT NOT NULL,
    ship_date_key    INT,
    order_status     VARCHAR(20),
    line_item_count  INT,
    subtotal         DECIMAL(14,2),
    discount_amount  DECIMAL(14,2),
    tax_amount       DECIMAL(14,2),
    shipping_amount  DECIMAL(14,2),
    total_amount     DECIMAL(14,2),
    currency         VARCHAR(3),
    created_at       TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Fact: Order Line Items (Atomic grain)
CREATE TABLE fact_order_items (
    item_key         BIGINT IDENTITY(1,1) PRIMARY KEY,
    order_key        BIGINT NOT NULL REFERENCES fact_orders(order_key),
    product_key      INT NOT NULL,
    quantity         INT,
    unit_price       DECIMAL(12,2),
    discount_pct     DECIMAL(5,4),
    line_total       DECIMAL(14,2),
    cost_of_goods    DECIMAL(14,2),
    profit           DECIMAL(14,2)
);

-- Fact: Daily Aggregates (Periodic Snapshot)
CREATE TABLE fact_daily_sales (
    daily_key        BIGINT IDENTITY(1,1) PRIMARY KEY,
    date_key         INT NOT NULL,
    product_key      INT,
    customer_key     INT,
    channel          VARCHAR(50),
    order_count      INT,
    revenue          DECIMAL(14,2),
    cost             DECIMAL(14,2),
    profit           DECIMAL(14,2),
    unique_customers INT,
    avg_order_value  DECIMAL(14,2)
);

Step 2: dbt Models

-- models/staging/stg_shopify_orders.sql
{{
    config(
        materialized='view',
        schema='staging'
    )
}}

WITH source AS (
    SELECT * FROM {{ source('shopify', 'orders') }}
),

renamed AS (
    SELECT
        id                                      AS order_id,
        CAST(order_number AS VARCHAR(50))       AS order_number,
        customer_id                             AS customer_id,
        email                                   AS customer_email,
        financial_status                        AS payment_status,
        fulfillment_status                      AS fulfillment_status,
        CAST(total_price AS DECIMAL(14,2))      AS total_amount,
        CAST(subtotal_price AS DECIMAL(14,2))   AS subtotal,
        CAST(total_tax AS DECIMAL(14,2))        AS tax_amount,
        CAST(total_discounts AS DECIMAL(14,2))  AS discount_amount,
        currency,
        CAST(created_at AS TIMESTAMP)           AS created_at,
        CAST.updated_at AS TIMESTAMP)           AS updated_at
    FROM source
    WHERE id IS NOT NULL
)

SELECT * FROM renamed

-- models/marts/dim_customers.sql
{{
    config(
        materialized='incremental',
        unique_key='customer_key',
        incremental_strategy='merge'
    )
}}

WITH customers AS (
    SELECT * FROM {{ ref('stg_shopify_customers') }}
),

orders AS (
    SELECT * FROM {{ ref('stg_shopify_orders') }}
),

customer_stats AS (
    SELECT
        customer_id,
        COUNT(*)                    AS total_orders,
        SUM(total_amount)           AS lifetime_value,
        MIN(created_at)             AS first_order_date,
        MAX(created_at)             AS last_order_date
    FROM orders
    GROUP BY 1
),

final AS (
    SELECT
        {{ dbt_utils.generate_surrogate_key(['c.customer_id']) }} AS customer_key,
        c.customer_id,
        c.email,
        c.first_name || ' ' || c.last_name AS full_name,
        c.phone,
        c.city,
        c.state,
        c.country,
        COALESCE(s.total_orders, 0) AS total_orders,
        COALESCE(s.lifetime_value, 0) AS lifetime_value,
        s.first_order_date,
        s.last_order_date,
        CASE
            WHEN s.lifetime_value > 10000 THEN 'enterprise'
            WHEN s.lifetime_value > 1000 THEN 'mid_market'
            ELSE 'smb'
        END AS customer_segment,
        CURRENT_TIMESTAMP() AS valid_from,
        CAST('9999-12-31' AS TIMESTAMP) AS valid_to,
        TRUE AS is_current
    FROM customers c
    LEFT JOIN customer_stats s ON c.customer_id = s.customer_id
)

SELECT * FROM final

-- models/marts/fact_orders.sql
{{
    config(
        materialized='incremental',
        unique_key='order_key',
        partition_by={'field': 'order_date_key', 'data_type': 'integer'},
        cluster_by=['customer_key', 'order_status']
    )
}}

WITH orders AS (
    SELECT * FROM {{ ref('stg_shopify_orders') }}
),

order_items AS (
    SELECT * FROM {{ ref('stg_shopify_order_items') }}
),

item_aggregates AS (
    SELECT
        order_id,
        COUNT(*)                    AS line_item_count,
        SUM(line_total)             AS items_subtotal,
        SUM(cost_of_goods)          AS items_cost
    FROM order_items
    GROUP BY 1
),

final AS (
    SELECT
        {{ dbt_utils.generate_surrogate_key(['o.order_id']) }} AS order_key,
        o.order_id,
        {{ dbt_utils.generate_surrogate_key(['o.customer_id']) }} AS customer_key,
        {{ dbt_utils.date_key('o.created_at') }} AS order_date_key,
        o.payment_status AS order_status,
        COALESCE(ia.line_item_count, 0) AS line_item_count,
        o.subtotal,
        o.discount_amount,
        o.tax_amount,
        o.total_amount AS net_amount,
        COALESCE(ia.items_cost, 0) AS cost_of_goods,
        o.total_amount - COALESCE(ia.items_cost, 0) AS gross_profit,
        o.currency,
        o.created_at,
        CURRENT_TIMESTAMP() AS dbt_loaded_at
    FROM orders o
    LEFT JOIN item_aggregates ia ON o.order_id = ia.order_id
)

SELECT * FROM final

Step 3: Airflow DAG

# dags/warehouse_etl.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.sensors.external_task import ExternalTaskSensor
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-engineering',
    'depends_on_past': False,
    'email_on_failure': True,
    'email': ['data-team@company.com'],
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='warehouse_etl',
    default_args=default_args,
    description='End-to-end data warehouse ETL pipeline',
    schedule_interval='0 6 * * *',  # 6 AM UTC daily
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=['warehouse', 'production'],
) as dag:

    # Step 1: Check source freshness
    check_freshness = DbtCloudRunJobOperator(
        job_id=12345,
        check_interval=60,
        timeout=300,
        dbt_kwargs={
            'select': 'source:freshness',
            'full_refresh': False,
        },
    )

    # Step 2: Run staging models
    run_staging = DbtCloudRunJobOperator(
        job_id=12345,
        check_interval=60,
        timeout=300,
        dbt_kwargs={
            'select': 'tag:staging',
            'full_refresh': True,
        },
    )

    # Step 3: Run intermediate models
    run_intermediate = DbtCloudRunJobOperator(
        job_id=12345,
        check_interval=60,
        timeout=300,
        dbt_kwargs={
            'select': 'tag:intermediate',
        },
    )

    # Step 4: Run mart models (incremental)
    run_marts = DbtCloudRunJobOperator(
        job_id=12345,
        check_interval=60,
        timeout=600,
        dbt_kwargs={
            'select': 'tag:marts',
        },
    )

    # Step 5: Run data quality tests
    run_tests = DbtCloudRunJobOperator(
        job_id=12345,
        check_interval=60,
        timeout=300,
        dbt_kwargs={
            'select': 'tag:marts',
            'run_tests': True,
        },
    )

    # Step 6: Update statistics
    update_stats = SnowflakeOperator(
        snowflake_conn_id='snowflake_prod',
        sql="""
            ANALYZE TABLE {{ params.table }} COMPUTE STATISTICS;
        """,
        params={'table': 'fact_orders'},
    )

    # Step 7: Notify completion
    def notify_completion(**context):
        print("ETL pipeline completed successfully")
        # Send notification to Slack/email

    notify = PythonOperator(
        task_id='notify_completion',
        python_callable=notify_completion,
    )

    # Define task dependencies
    check_freshness >> run_staging >> run_intermediate >> run_marts >> run_tests >> update_stats >> notify

Step 4: Data Quality Tests

# models/marts/_marts__tests.yml
version: 2

models:
  - name: fact_orders
    description: "Fact table for all customer orders"
    tests:
      - dbt_utils.expression_is_true:
          expression: "net_amount > 0"
      - dbt_utils.expression_is_true:
          expression: "order_date_key > 20200101"
    columns:
      - name: order_key
        tests:
          - unique
          - not_null
      - name: customer_key
        tests:
          - not_null
          - relationships:
              to: ref('dim_customers')
              field: customer_key
      - name: order_date_key
        tests:
          - not_null
          - relationships:
              to: ref('dim_date')
              field: date_key
      - name: net_amount
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
              inclusive: false
              max_value: 1000000

  - name: dim_customers
    description: "Customer dimension with SCD Type 2"
    tests:
      - unique:
          column_name: "customer_key"
      - dbt_utils.expression_is_true:
          expression: "is_current IN (TRUE, FALSE)"
    columns:
      - name: customer_key
        tests:
          - unique
          - not_null
      - name: customer_id
        tests:
          - unique:
              config:
                where: "is_current = TRUE"

Key Concepts Summary

PhaseComponentToolDeliverable
DesignStar SchemaSQL DDLSchema diagrams, DDL scripts
IngestionExtractAirflow / FivetranRaw data in staging
LoadLoadSnowpipe / COPYStaged data
TransformELTdbtDimensional model
TestQualitydbt testsQuality assertions
DocumentDocsdbt docsData dictionary
OrchestrateDAGAirflowAutomated pipeline
MonitorObservabilityMonte Carlo / customAlerts, dashboards
DeployCI/CDGitHub ActionsProduction deployment
OptimizePerformanceClustering, partitioningFast queries

Performance Metrics

MetricTargetMeasurementOptimization
Pipeline SLA< 2 hoursEnd-to-end runtimeParallelize dbt models
Data Freshness< 1 hourSource-to-serve latencySnowpipe, incremental
Test Pass Rate100%All tests passingBlock deployment on failure
Query Performance< 10 secDashboard load timePartitioning, clustering
Documentation100%Models with descriptionsEnforce in PR reviews
Cost Budget< $500/monthSnowflake computeAuto-suspend, right-sizing

10 Best Practices

  1. Design schema before writing code β€” document grain, measures, and dimensions first
  2. Use dbt for all transformations β€” no ad-hoc SQL outside of version control
  3. Implement SCD Type 2 for all dimensions requiring historical analysis
  4. Partition fact tables by date and cluster by high-cardinality filter columns
  5. Run dbt tests on every PR β€” block merges on test failures
  6. Use incremental models for all tables > 10 GB
  7. Separate staging and production schemas β€” staging is ephemeral, production is governed
  8. Implement Airflow DAG retries and alerting for failed tasks
  9. Monitor Snowflake credit usage weekly and right-size warehouses
  10. Document every metric in dbt docs β€” analysts should never guess what a column means

  • Schema design (grain, dimensions, measures) is the foundation β€” get it right first
  • dbt + Snowflake provides a production-grade ELT platform with minimal infrastructure
  • Automated testing catches data quality issues before they reach analysts
  • Orchestration (Airflow) ties ingestion, transformation, and notification into a reliable DAG
  • Monitoring, documentation, and cost optimization are ongoing production responsibilities

See Also

⭐

Premium Content

Project 3: End-to-End Data Warehouse Build

Unlock this lesson and 900+ advanced tutorials with a Premium plan.

🎯End-to-end Projects
πŸ’ΌInterview Prep
πŸ“œCertificates
🀝Community Access

Already a member? Log in

Need Expert Data Engineering Help?

Get personalized tutoring, project support, or professional consulting.

Advertisement