Module 3: Data Warehouses & Storage — Capstone Project (Free Lesson)
Advertisement
Project 3: Building a Production Data Warehouse
This capstone project brings together everything from Module 3. You will design, build, test, and deploy a complete data warehouse using modern tools and best practices.
Project Objectives
Design a star schema for an e-commerce platform
Implement SCD Type 2 for dimension tracking
Build an ELT pipeline with dbt on Snowflake
Create orchestration with Apache Airflow
Implement data quality testing and monitoring
Deploy to production with CI/CD
Architecture Overview
Step 1: Schema Design
-- Dimension: Customer (SCD Type 2)
-- Holds one row per *version* of a customer, bounded by valid_from / valid_to,
-- so the same customer_id is expected to appear on several rows over time.
CREATE TABLE dim_customer (
    -- Surrogate key; identifies a single version row.
    customer_key BIGINT IDENTITY(1,1) PRIMARY KEY,
    -- Source-system customer identifier. Deliberately NOT unique on its own:
    -- a single-column UNIQUE here would reject the second version of a customer.
    customer_id VARCHAR(50) NOT NULL,
    email VARCHAR(300),
    full_name VARCHAR(200),
    phone VARCHAR(50),
    address_line1 VARCHAR(200),
    city VARCHAR(100),
    state VARCHAR(100),
    country VARCHAR(100),
    postal_code VARCHAR(20),
    segment VARCHAR(50),
    acquisition_channel VARCHAR(50),
    first_order_date DATE,
    -- Validity window of this version; open-ended versions keep the
    -- far-future default in valid_to.
    valid_from TIMESTAMP NOT NULL,
    valid_to TIMESTAMP NOT NULL DEFAULT '9999-12-31',
    is_current BOOLEAN DEFAULT TRUE,
    -- NOTE(review): presumably a hash of the tracked attributes used for
    -- change detection -- confirm against the load process.
    row_hash VARCHAR(64),
    -- Uniqueness is per version: a customer may have many rows, but only one
    -- starting at any given instant.
    UNIQUE (customer_id, valid_from)
);
-- Dimension: Product (SCD Type 1 - overwrite)
-- A single row per product_id; no validity-window columns are kept here.
CREATE TABLE dim_product (
    product_key   INT IDENTITY(1,1),             -- auto-generated surrogate key
    product_id    VARCHAR(50) NOT NULL,          -- identifier carried from the source
    product_name  VARCHAR(500),
    category      VARCHAR(100),
    subcategory   VARCHAR(100),
    brand         VARCHAR(100),
    unit_cost     DECIMAL(12,2),
    unit_price    DECIMAL(12,2),
    supplier_id   VARCHAR(50),
    is_active     BOOLEAN DEFAULT TRUE,
    last_updated  TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (product_key),
    UNIQUE (product_id)
);
-- Dimension: Date
-- Calendar lookup with one row per full_date.
CREATE TABLE dim_date (
    -- Not auto-generated: the loader supplies the key.
    -- NOTE(review): presumably a YYYYMMDD integer -- confirm with the load job.
    date_key        INT,
    full_date       DATE NOT NULL,
    day_of_week     SMALLINT,
    day_name        VARCHAR(10),
    is_weekend      BOOLEAN,
    month_number    SMALLINT,
    month_name      VARCHAR(10),
    quarter         SMALLINT,
    year            INT,
    fiscal_year     INT,
    fiscal_quarter  SMALLINT,
    week_of_year    SMALLINT,
    PRIMARY KEY (date_key),
    UNIQUE (full_date)
);
-- Fact: Orders (Transaction grain)
-- One row per order, keyed to the customer and date dimensions.
CREATE TABLE fact_orders (
    order_key BIGINT IDENTITY(1,1) PRIMARY KEY,
    order_id VARCHAR(50) NOT NULL,
    -- BIGINT to match dim_customer.customer_key; an INT column here could not
    -- hold every key the dimension is able to generate.
    customer_key BIGINT NOT NULL REFERENCES dim_customer(customer_key),
    order_date_key INT NOT NULL REFERENCES dim_date(date_key),
    -- Nullable: the column carries no NOT NULL, so an order may have no ship date.
    ship_date_key INT REFERENCES dim_date(date_key),
    order_status VARCHAR(20),
    line_item_count INT,
    subtotal DECIMAL(14,2),
    discount_amount DECIMAL(14,2),
    tax_amount DECIMAL(14,2),
    shipping_amount DECIMAL(14,2),
    total_amount DECIMAL(14,2),
    currency VARCHAR(3),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Fact: Order Line Items (Atomic grain)
-- One row per line item; each row points at its parent order and its product.
CREATE TABLE fact_order_items (
    item_key BIGINT IDENTITY(1,1) PRIMARY KEY,
    order_key BIGINT NOT NULL REFERENCES fact_orders(order_key),
    -- Declared as a foreign key for parity with order_key above.
    product_key INT NOT NULL REFERENCES dim_product(product_key),
    quantity INT,
    unit_price DECIMAL(12,2),
    -- DECIMAL(5,4) stores four decimal places.
    -- NOTE(review): looks like a fraction (0.1500 = 15%) rather than a
    -- percentage number -- confirm with the loader.
    discount_pct DECIMAL(5,4),
    line_total DECIMAL(14,2),
    cost_of_goods DECIMAL(14,2),
    profit DECIMAL(14,2)
);
-- Fact: Daily Aggregates (Periodic Snapshot)
-- Pre-aggregated sales measures per day, optionally broken down by product,
-- customer and channel (those breakdown columns are nullable).
CREATE TABLE fact_daily_sales (
    daily_key BIGINT IDENTITY(1,1) PRIMARY KEY,
    date_key INT NOT NULL REFERENCES dim_date(date_key),
    product_key INT REFERENCES dim_product(product_key),
    -- BIGINT to match dim_customer.customer_key (the original INT was narrower
    -- than the key it refers to).
    customer_key BIGINT REFERENCES dim_customer(customer_key),
    channel VARCHAR(50),
    order_count INT,
    revenue DECIMAL(14,2),
    cost DECIMAL(14,2),
    profit DECIMAL(14,2),
    unique_customers INT,
    avg_order_value DECIMAL(14,2)
);
Step 2: dbt Models
-- models/staging/stg_shopify_orders.sql
-- Staging view over the raw Shopify orders source: renames columns to the
-- warehouse vocabulary and casts amounts and timestamps to explicit types.
-- Rows without an id are dropped.
{{
    config(
        materialized='view',
        schema='staging'
    )
}}

WITH source AS (
    SELECT * FROM {{ source('shopify', 'orders') }}
),

renamed AS (
    SELECT
        id AS order_id,
        CAST(order_number AS VARCHAR(50)) AS order_number,
        customer_id,
        email AS customer_email,
        financial_status AS payment_status,
        fulfillment_status,
        CAST(total_price AS DECIMAL(14,2)) AS total_amount,
        CAST(subtotal_price AS DECIMAL(14,2)) AS subtotal,
        CAST(total_tax AS DECIMAL(14,2)) AS tax_amount,
        CAST(total_discounts AS DECIMAL(14,2)) AS discount_amount,
        currency,
        CAST(created_at AS TIMESTAMP) AS created_at,
        -- Was written as "CAST.updated_at AS TIMESTAMP)", which does not parse.
        CAST(updated_at AS TIMESTAMP) AS updated_at
    FROM source
    WHERE id IS NOT NULL
)

SELECT * FROM renamed
-- models/marts/dim_customers.sql
-- Customer dimension: one row per staged customer, enriched with order counts,
-- lifetime value and a value-based segment derived from staged orders.
--
-- NOTE(review): customer_key is a hash of customer_id only and is also the
-- merge key, and there is no is_incremental() filter. Every run therefore
-- re-selects all customers and overwrites the existing row, resetting
-- valid_from to the run time. That does not retain SCD Type 2 history; if
-- history is required, confirm whether a dbt snapshot should feed this model.
{{
    config(
        materialized='incremental',
        unique_key='customer_key',
        incremental_strategy='merge'
    )
}}

WITH customers AS (
    SELECT * FROM {{ ref('stg_shopify_customers') }}
),

orders AS (
    SELECT * FROM {{ ref('stg_shopify_orders') }}
),

-- Per-customer order aggregates.
customer_stats AS (
    SELECT
        customer_id,
        COUNT(*) AS total_orders,
        SUM(total_amount) AS lifetime_value,
        MIN(created_at) AS first_order_date,
        MAX(created_at) AS last_order_date
    FROM orders
    GROUP BY customer_id
),

final AS (
    SELECT
        {{ dbt_utils.generate_surrogate_key(['c.customer_id']) }} AS customer_key,
        c.customer_id,
        c.email,
        -- "||" yields NULL if either operand is NULL, which would blank the
        -- whole name when only one part is missing. Coalesce each part, trim
        -- the stray space, and fall back to NULL only when both are absent.
        NULLIF(
            TRIM(COALESCE(c.first_name, '') || ' ' || COALESCE(c.last_name, '')),
            ''
        ) AS full_name,
        c.phone,
        c.city,
        c.state,
        c.country,
        COALESCE(s.total_orders, 0) AS total_orders,
        COALESCE(s.lifetime_value, 0) AS lifetime_value,
        s.first_order_date,
        s.last_order_date,
        -- Customers with no orders have a NULL lifetime_value here and fall
        -- through to 'smb'.
        CASE
            WHEN s.lifetime_value > 10000 THEN 'enterprise'
            WHEN s.lifetime_value > 1000 THEN 'mid_market'
            ELSE 'smb'
        END AS customer_segment,
        CURRENT_TIMESTAMP() AS valid_from,
        CAST('9999-12-31' AS TIMESTAMP) AS valid_to,
        TRUE AS is_current
    FROM customers c
    LEFT JOIN customer_stats s ON c.customer_id = s.customer_id
)

SELECT * FROM final
-- models/marts/fact_orders.sql
-- Order-grain fact: one row per staged order, with line-item counts and cost
-- rolled up from the staged order items.
--
-- NOTE(review): partition_by with a field/data_type dict looks like the
-- BigQuery adapter's config; confirm it is meaningful for the target adapter.
{{
    config(
        materialized='incremental',
        unique_key='order_key',
        partition_by={'field': 'order_date_key', 'data_type': 'integer'},
        cluster_by=['customer_key', 'order_status']
    )
}}

WITH orders AS (
    SELECT * FROM {{ ref('stg_shopify_orders') }}
),

order_items AS (
    SELECT * FROM {{ ref('stg_shopify_order_items') }}
),

-- Roll line items up to one row per order.
item_aggregates AS (
    SELECT
        order_id,
        COUNT(*) AS line_item_count,
        SUM(line_total) AS items_subtotal,
        SUM(cost_of_goods) AS items_cost
    FROM order_items
    GROUP BY order_id
),

final AS (
    SELECT
        {{ dbt_utils.generate_surrogate_key(['o.order_id']) }} AS order_key,
        o.order_id,
        -- Same hashing as dim_customers.customer_key so the two join.
        {{ dbt_utils.generate_surrogate_key(['o.customer_id']) }} AS customer_key,
        -- dbt_utils ships no date_key macro, so the key is built explicitly.
        -- NOTE(review): assumes dim_date.date_key is a YYYYMMDD integer -- confirm.
        CAST(TO_CHAR(CAST(o.created_at AS DATE), 'YYYYMMDD') AS INTEGER) AS order_date_key,
        o.payment_status AS order_status,
        COALESCE(ia.line_item_count, 0) AS line_item_count,
        o.subtotal,
        o.discount_amount,
        o.tax_amount,
        o.total_amount AS net_amount,
        COALESCE(ia.items_cost, 0) AS cost_of_goods,
        -- NOTE(review): total_amount is the staged total_price and may include
        -- tax -- confirm this is the intended basis for gross profit.
        o.total_amount - COALESCE(ia.items_cost, 0) AS gross_profit,
        o.currency,
        o.created_at,
        CURRENT_TIMESTAMP() AS dbt_loaded_at
    FROM orders o
    LEFT JOIN item_aggregates ia ON o.order_id = ia.order_id
)

SELECT * FROM final