Capstone Project: Building a Production Data Platform
This is the culminating project of the entire data engineering course.
Project Objectives
What You Will Build:
- Multi-source data ingestion system: batch, streaming, CDC, and APIs
- Medallion Architecture data lakehouse: Bronze, Silver, and Gold layers
- Dimensional modeling with dbt: fact and dimension tables
- Real-time streaming for critical metrics
- CI/CD and infrastructure as code for automated deployment
- Monitoring of data quality, performance, and costs
- Governance and security: access control and compliance
Key Insight: You will design, build, test, deploy, and operate a complete data platform that handles real-world data engineering challenges.
Architecture Overview
The end-to-end data platform consists of five layers:
- Ingestion Layer: Batch (S3), CDC (Debezium), Streaming (Kafka), API (REST)
- Storage Layer: Data Lake on S3 with Bronze (Raw) -> Silver (Clean) -> Gold (Business-Ready) layers, using Delta Lake/Apache Iceberg
- Processing Layer: dbt (batch), Spark (batch), Flink (stream), Airflow (orchestration)
- Serving Layer: Snowflake, Redis (cache), FastAPI (API), Looker (BI)
- Governance Layer: Unity Catalog, data lineage, quality monitoring, cost control
Data flows from ingestion through storage and processing to serving, while governance provides cross-cutting oversight.
Medallion Architecture with Technologies
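The Bronze -> Silver -> Gold flow can be illustrated with a small in-memory Python sketch. The sample rows and the `to_silver`/`to_gold` helpers are hypothetical; the field names follow the Shopify columns used in Phase 3, and on the real platform each layer would be a Delta Lake or Iceberg table rather than a Python list.

```python
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime
from decimal import Decimal

# Bronze: raw records exactly as ingested (all strings, duplicates and bad rows kept)
bronze = [
    {"id": "1", "total_price": "19.99", "created_at": "2025-01-01T10:00:00"},
    {"id": "1", "total_price": "19.99", "created_at": "2025-01-01T10:00:00"},  # duplicate delivery
    {"id": None, "total_price": "5.00", "created_at": "2025-01-01T11:00:00"},  # missing key
    {"id": "2", "total_price": "30.01", "created_at": "2025-01-02T09:30:00"},
]


@dataclass(frozen=True)
class Order:
    order_id: int
    total_amount: Decimal
    created_at: datetime


def to_silver(rows):
    """Silver: typed, validated, and deduplicated on order_id."""
    by_id = {}
    for r in rows:
        if r["id"] is None:
            continue  # reject rows without a primary key
        order = Order(
            order_id=int(r["id"]),
            total_amount=Decimal(r["total_price"]),  # exact decimal money, not float
            created_at=datetime.fromisoformat(r["created_at"]),
        )
        by_id[order.order_id] = order  # last write wins
    return list(by_id.values())


def to_gold(orders):
    """Gold: a business-ready aggregate (revenue per day)."""
    revenue = defaultdict(Decimal)
    for o in orders:
        revenue[o.created_at.date().isoformat()] += o.total_amount
    return dict(revenue)


silver = to_silver(bronze)
print(to_gold(silver))  # {'2025-01-01': Decimal('19.99'), '2025-01-02': Decimal('30.01')}
```

Bronze keeps everything, including the duplicate delivery and the row without an `id`; Silver enforces types and keys; Gold answers a business question directly.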
Implementation
Phase 1: Infrastructure Provisioning
```hcl
# terraform/main.tf - Complete infrastructure
terraform {
  required_version = ">= 1.5.0"

  # Remote state so every engineer and CI run share one source of truth
  backend "s3" {
    bucket = "capstone-terraform-state"
    key    = "data-platform/terraform.tfstate"
    region = "us-east-1"
  }
}

# Declared here so the modules below can reference var.environment
variable "environment" {
  description = "Deployment environment, e.g. dev, staging, or prod"
  type        = string
}

module "storage" {
  source         = "./modules/storage"
  environment    = var.environment
  project_name   = "capstone"
  retention_days = 365
}

module "compute" {
  source       = "./modules/compute"
  environment  = var.environment
  project_name = "capstone"
  # One warehouse per workload; auto_suspend is assumed to be in seconds, as in Snowflake
  warehouses = {
    analytics = { size = "medium", auto_suspend = 300 }
    etl       = { size = "large", auto_suspend = 600 }
    adhoc     = { size = "x-small", auto_suspend = 60 }
  }
}

module "governance" {
  source       = "./modules/governance"
  environment  = var.environment
  project_name = "capstone"
  roles = {
    data_analyst  = { grants = ["USAGE", "SELECT"] }
    data_engineer = { grants = ["ALL"] }
    data_admin    = { grants = ["ALL"] }
  }
}
```
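Because the warehouse sizes in the compute module drive most of the platform's spend, it helps to estimate cost before applying. This sketch assumes the module creates Snowflake virtual warehouses (Snowflake is the serving layer) and uses Snowflake's standard-warehouse credit rates per hour for each size. The daily active hours and the $3.00-per-credit price are hypothetical, since real prices depend on edition and region, and the estimate ignores per-second billing and the 60-second minimum charged each time a warehouse resumes.

```python
# Credits consumed per hour by a running Snowflake standard warehouse, by size.
CREDITS_PER_HOUR = {"x-small": 1, "small": 2, "medium": 4, "large": 8, "x-large": 16}


def estimate_monthly_cost(warehouses, active_hours_per_day, price_per_credit, days=30):
    """Rough monthly spend: credits/hour x active hours x price per credit.

    warehouses maps a warehouse name to its size string (as in the Terraform
    `warehouses` map); active_hours_per_day maps the same names to the hours
    each warehouse is expected to run before auto-suspend kicks in.
    """
    total = 0.0
    for name, size in warehouses.items():
        hours = active_hours_per_day.get(name, 0) * days
        total += CREDITS_PER_HOUR[size] * hours * price_per_credit
    return total


warehouses = {"analytics": "medium", "etl": "large", "adhoc": "x-small"}
usage = {"analytics": 2, "etl": 1, "adhoc": 0.5}  # hypothetical active hours per day
print(estimate_monthly_cost(warehouses, usage, price_per_credit=3.00))  # 1485.0
```

At this assumed price, even a couple of active hours a day on the `analytics` and `etl` warehouses lands well above the < $500/month cost budget listed later, which is why short `auto_suspend` timeouts and right-sized warehouses matter.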
Phase 2: Ingestion Pipeline
```python
# src/ingestion/airflow_dags.py
import json
from datetime import datetime, timedelta

import requests
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator

default_args = {
    'owner': 'data-platform',
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}


def extract_shopify_orders(**context):
    """Extract orders updated during the run's data interval from the Shopify API."""
    import boto3  # imported inside the task so DAG parsing stays fast

    url = "https://store.myshopify.com/admin/api/2024-01/orders.json"
    # Secrets are read at run time: Jinja strings passed via `params` are not rendered
    headers = {"X-Shopify-Access-Token": Variable.get("shopify_token")}
    params = {
        "limit": 250,
        "status": "any",
        "updated_at_min": context["data_interval_start"].isoformat(),
        "updated_at_max": context["data_interval_end"].isoformat(),
    }
    all_orders = []
    while url:
        response = requests.get(url, headers=headers, params=params, timeout=30)
        response.raise_for_status()  # fail the task (and retry) on HTTP errors
        all_orders.extend(response.json().get('orders', []))
        # Follow the cursor in the Link header. The "next" URL already carries
        # page_info and limit; Shopify rejects the original filters alongside it.
        url = response.links.get('next', {}).get('url')
        params = None

    # Land the raw payload unchanged in the Bronze bucket, partitioned by logical date
    s3 = boto3.client('s3')
    s3.put_object(
        Bucket='capstone-raw',
        Key=f'shopify/orders/{context["ds"]}/orders.json',
        Body=json.dumps(all_orders),
    )
    return len(all_orders)


def extract_stripe_payments(**context):
    """Extract charges created during the run's data interval from the Stripe API."""
    import boto3
    import stripe

    stripe.api_key = Variable.get("stripe_api_key")
    # Stripe range filters take integer Unix timestamps
    charges = stripe.Charge.list(limit=100, created={
        'gte': int(context['data_interval_start'].timestamp()),
        'lt': int(context['data_interval_end'].timestamp()),
    })
    # auto_paging_iter() keeps fetching pages beyond the first 100 charges
    payments = [c.to_dict() for c in charges.auto_paging_iter()]

    s3 = boto3.client('s3')
    s3.put_object(
        Bucket='capstone-raw',
        Key=f'stripe/payments/{context["ds"]}/payments.json',
        Body=json.dumps(payments),
    )
    return len(payments)


with DAG(
    dag_id='capstone_ingestion',
    default_args=default_args,
    schedule='0 1 * * *',  # `schedule` replaces the deprecated `schedule_interval`
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=['capstone', 'ingestion'],
) as dag:
    extract_shopify = PythonOperator(
        task_id='extract_shopify',
        python_callable=extract_shopify_orders,
    )
    extract_stripe = PythonOperator(
        task_id='extract_stripe',
        python_callable=extract_stripe_payments,
    )
    extract_shopify >> extract_stripe
```
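The Shopify task follows cursor-based pagination through the `Link` response header. That control flow can be isolated and tested without network access, as in the sketch below; `paginate`, the fake `fetch` function, and the `example.com` URLs are hypothetical stand-ins for `requests.get` and the Shopify endpoint. Filters are sent only on the first request because, per Shopify's REST pagination documentation, a request carrying a `page_info` cursor may include only `limit` and `fields`.

```python
from typing import Callable, Iterator, List, Optional, Tuple

# fetch(url, query) -> (records on this page, URL of the next page or None)
Fetcher = Callable[[str, Optional[dict]], Tuple[List[dict], Optional[str]]]


def paginate(fetch: Fetcher, first_url: str, params: dict) -> Iterator[dict]:
    """Yield records from every page until the API stops returning a next link."""
    url, query = first_url, params
    while url:
        records, url = fetch(url, query)
        query = None  # the next URL already carries the cursor and page size
        yield from records


# Hypothetical two-page API used in place of the real HTTP call
PAGES = {
    "https://example.com/orders": ([{"id": 1}, {"id": 2}], "https://example.com/orders?page_info=abc"),
    "https://example.com/orders?page_info=abc": ([{"id": 3}], None),
}
calls = []


def fake_fetch(url, query):
    calls.append((url, query))
    return PAGES[url]


orders = list(paginate(fake_fetch, "https://example.com/orders", {"limit": 250, "status": "any"}))
print([o["id"] for o in orders])  # [1, 2, 3]
```

Separating the loop from the HTTP client also makes it easy to unit-test edge cases such as an empty first page.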
Phase 3: dbt Transformation Layer
```sql
-- models/staging/stg_shopify_orders.sql
{{
    config(
        materialized='view',
        schema='staging',
        tags=['shopify', 'orders']
    )
}}

WITH source AS (
    SELECT * FROM {{ source('raw', 'shopify_orders') }}
),

renamed AS (
    SELECT
        id AS order_id,
        customer_id,
        financial_status AS payment_status,
        fulfillment_status,
        CAST(total_price AS DECIMAL(14,2)) AS total_amount,
        CAST(subtotal_price AS DECIMAL(14,2)) AS subtotal,
        CAST(total_tax AS DECIMAL(14,2)) AS tax_amount,
        CAST(total_discounts AS DECIMAL(14,2)) AS discount_amount,
        currency,
        CAST(created_at AS TIMESTAMP) AS created_at,
        -- Source-side change timestamp: stable across reads, so it can drive
        -- incremental loads (CURRENT_TIMESTAMP() in a view changes on every query)
        CAST(updated_at AS TIMESTAMP) AS updated_at
    FROM source
    WHERE id IS NOT NULL
)

SELECT * FROM renamed
```

```sql
-- models/marts/fact_orders.sql
-- Assumes the Snowflake adapter (the serving warehouse): Snowflake has no
-- partition_by, so the date key leads the clustering keys instead.
{{
    config(
        materialized='incremental',
        unique_key='order_key',
        cluster_by=['order_date_key', 'customer_key', 'order_status'],
        tags=['marts', 'orders', 'daily-critical']
    )
}}

WITH orders AS (
    SELECT * FROM {{ ref('stg_shopify_orders') }}
    {% if is_incremental() %}
    -- Only reprocess orders changed since the last successful run
    WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
    {% endif %}
),

final AS (
    SELECT
        {{ dbt_utils.generate_surrogate_key(['o.order_id']) }} AS order_key,
        o.order_id,
        -- Same key expression as dim_customers, so the fact joins to the dimension
        {{ dbt_utils.generate_surrogate_key(['o.customer_id']) }} AS customer_key,
        -- Integer YYYYMMDD date key (dbt_utils has no date-key macro)
        YEAR(o.created_at) * 10000 + MONTH(o.created_at) * 100 + DAY(o.created_at) AS order_date_key,
        o.payment_status AS order_status,
        o.subtotal,
        o.discount_amount,
        o.tax_amount,
        -- Shopify's total_price is already net of discounts; gross adds them back
        o.total_amount AS net_amount,
        o.total_amount + COALESCE(o.discount_amount, 0) AS gross_amount,
        o.currency,
        o.created_at,
        o.updated_at,
        CURRENT_TIMESTAMP() AS _dbt_loaded_at
    FROM orders o
)

SELECT * FROM final
```
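An incremental dbt model with a `unique_key` behaves roughly like a high-watermark filter followed by a MERGE: each run selects only rows newer than the latest watermark already in the target, and rows whose key already exists are updated rather than duplicated. The plain-Python sketch below models that behavior on dictionaries; `incremental_merge` is hypothetical, and `updated_at` as the watermark column is an assumption. Any source timestamp that advances when a row changes works, whereas a value computed at query time does not.

```python
from datetime import datetime


def incremental_merge(target: dict, source_rows: list, key: str, watermark_col: str) -> dict:
    """Emulate an incremental model with a unique_key on an in-memory table.

    target maps unique key -> row. Only source rows newer than the target's
    high watermark are processed; existing keys are overwritten, new keys inserted.
    """
    watermark = max((r[watermark_col] for r in target.values()), default=None)
    new_rows = [r for r in source_rows if watermark is None or r[watermark_col] > watermark]
    merged = dict(target)
    for row in new_rows:
        merged[row[key]] = row  # MERGE: update if present, insert otherwise
    return merged


t0, t1 = datetime(2025, 1, 1), datetime(2025, 1, 2)
staging = [
    {"order_key": "a", "order_status": "pending", "updated_at": t0},
    {"order_key": "b", "order_status": "paid", "updated_at": t0},
]
fact = incremental_merge({}, staging, "order_key", "updated_at")  # first run: full load

staging.append({"order_key": "a", "order_status": "paid", "updated_at": t1})  # order "a" changes
fact = incremental_merge(fact, staging, "order_key", "updated_at")
print(fact["a"]["order_status"], len(fact))  # paid 2
```

Re-running with no new source changes leaves the target unchanged, which is what makes the model safe to retry.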
```sql
-- models/marts/dim_customers.sql
{{
    config(
        materialized='incremental',
        unique_key='customer_key',
        tags=['marts', 'customers']
    )
}}

WITH customers AS (
    SELECT * FROM {{ ref('stg_shopify_customers') }}
),

orders AS (
    SELECT * FROM {{ ref('stg_shopify_orders') }}
),

customer_stats AS (
    SELECT
        customer_id,
        COUNT(*) AS total_orders,
        SUM(total_amount) AS lifetime_value,
        MIN(created_at) AS first_order_date,
        MAX(created_at) AS last_order_date
    FROM orders
    GROUP BY 1
),

final AS (
    SELECT
        {{ dbt_utils.generate_surrogate_key(['c.customer_id']) }} AS customer_key,
        c.customer_id,
        -- COALESCE so a missing first or last name doesn't null the whole string
        TRIM(COALESCE(c.first_name, '') || ' ' || COALESCE(c.last_name, '')) AS full_name,
        c.email,
        c.city,
        c.state,
        c.country,
        COALESCE(s.total_orders, 0) AS total_orders,
        COALESCE(s.lifetime_value, 0) AS lifetime_value,
        s.first_order_date,
        s.last_order_date,
        CASE
            WHEN s.lifetime_value > 10000 THEN 'enterprise'
            WHEN s.lifetime_value > 1000 THEN 'mid_market'
            ELSE 'smb'
        END AS customer_segment,
        -- Merging on customer_key keeps one row per customer and resets valid_from
        -- on every run (Type 1 behavior); use a dbt snapshot for real SCD Type 2 history.
        CURRENT_TIMESTAMP() AS valid_from,
        CAST('9999-12-31' AS TIMESTAMP) AS valid_to,
        TRUE AS is_current
    FROM customers c
    LEFT JOIN customer_stats s ON c.customer_id = s.customer_id
)

SELECT * FROM final
```
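The `valid_from`, `valid_to`, and `is_current` columns suggest a Type 2 slowly changing dimension, but because this model merges on `customer_key`, each customer keeps a single row and no history accumulates. In dbt, history is normally captured with a snapshot. The sketch below shows the versioning logic such a snapshot performs, in plain Python; `apply_scd2`, the tracked `customer_segment` attribute, and the sample dates are hypothetical.

```python
from datetime import datetime

OPEN_END = datetime(9999, 12, 31)  # same "open" valid_to as the dbt model


def apply_scd2(history, incoming, key, tracked, now):
    """Return a new history list after applying one batch of incoming rows.

    history rows carry valid_from, valid_to, and is_current; a change in any
    `tracked` attribute closes the current version and opens a new one.
    """
    result = [dict(r) for r in history]
    current = {r[key]: r for r in result if r["is_current"]}
    for row in incoming:
        existing = current.get(row[key])
        if existing and all(existing[c] == row[c] for c in tracked):
            continue  # nothing changed: keep the current version
        if existing:
            existing["valid_to"] = now  # close out the old version
            existing["is_current"] = False
        new_version = {**row, "valid_from": now, "valid_to": OPEN_END, "is_current": True}
        result.append(new_version)
        current[row[key]] = new_version
    return result


t0, t1 = datetime(2025, 1, 1), datetime(2025, 2, 1)
hist = apply_scd2([], [{"customer_id": 7, "customer_segment": "smb"}],
                  "customer_id", ["customer_segment"], t0)
hist = apply_scd2(hist, [{"customer_id": 7, "customer_segment": "mid_market"}],
                  "customer_id", ["customer_segment"], t1)
for version in hist:
    print(version["customer_segment"], version["valid_from"].date(), version["is_current"])
```

Each upgrade from `smb` to `mid_market` now leaves a closed-out row behind, so reports can reconstruct a customer's segment as of any date.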
Phase 4: Monitoring and Quality
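```python
# src/monitoring/quality_checks.py
# Written against the Great Expectations 1.x API.
import great_expectations as gx
import pandas as pd

context = gx.get_context()

# Data Quality Suite for fact_orders (registered with the context first)
quality_suite = context.suites.add(gx.ExpectationSuite(name="capstone_quality"))
quality_suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id")
)
quality_suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeUnique(column="order_id")
)
quality_suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="net_amount", min_value=0.01, max_value=1000000
    )
)
quality_suite.add_expectation(
    gx.expectations.ExpectTableRowCountToBeBetween(
        min_value=100, max_value=10000000
    )
)

# Point GX at the data. A pandas DataFrame keeps the example small; in
# production this would be a Snowflake data source over fact_orders.
orders_df = pd.read_parquet("fact_orders.parquet")  # hypothetical extract of fact_orders
data_source = context.data_sources.add_pandas(name="capstone_pandas")
data_asset = data_source.add_dataframe_asset(name="fact_orders")
batch_definition = data_asset.add_batch_definition_whole_dataframe("full_table")
batch = batch_definition.get_batch(batch_parameters={"dataframe": orders_df})

# Run validation
result = batch.validate(quality_suite)
print(f"Quality Score: {result.statistics['success_percent']:.2f}%")
```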
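The `success_percent` statistic is the share of expectations that passed, not the share of rows. The plain-Python sketch below approximates the same four checks to make that concrete; `quality_score` is hypothetical, and it skips nulls in the uniqueness and range checks, as GX's column-map expectations do by default.

```python
def quality_score(rows, min_rows=100, max_rows=10_000_000):
    """Return (percent of checks passed, per-check results) for fact_orders-like rows."""
    ids = [r.get("order_id") for r in rows]
    non_null_ids = [i for i in ids if i is not None]
    amounts = [r["net_amount"] for r in rows if r.get("net_amount") is not None]
    checks = {
        "order_id_not_null": len(non_null_ids) == len(ids),
        "order_id_unique": len(set(non_null_ids)) == len(non_null_ids),
        "net_amount_between": all(0.01 <= a <= 1_000_000 for a in amounts),
        "row_count_between": min_rows <= len(rows) <= max_rows,
    }
    return 100.0 * sum(checks.values()) / len(checks), checks


rows = [{"order_id": i, "net_amount": 10.0} for i in range(150)]
print(quality_score(rows)[0])  # 100.0
duplicated = rows + [{"order_id": 0, "net_amount": 10.0}]
print(quality_score(duplicated)[0])  # 75.0
```

With four expectations, the score can only be 0, 25, 50, 75, or 100 percent, so the > 99% Quality Score target listed below effectively requires every expectation to pass.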
Key Concepts Summary
| Phase | Component | Technology | Deliverable |
|---|---|---|---|
| Provisioning | Infrastructure | Terraform | Cloud resources |
| Ingestion | Data Extraction | Airflow, Python | Raw data in S3 |
| Storage | Data Lake | Delta Lake, S3 | Bronze/Silver/Gold |
| Transformation | ELT | dbt, SQL | Dimensional model |
| Serving | Analytics | Snowflake, Looker | Dashboards |
| Streaming | Real-time | Kafka, Flink | Real-time metrics |
| Quality | Monitoring | Great Expectations | Quality reports |
| Governance | Access Control | Unity Catalog, RBAC | Secure access |
| CI/CD | Automation | GitHub Actions | Automated deployment |
| Documentation | Knowledge | dbt docs, README | Data dictionary |
Performance Metrics
| Metric | Target | Actual | Status |
|---|---|---|---|
| Pipeline SLA | < 2 hours | TBD | |
| Data Freshness | < 1 hour | TBD | |
| Quality Score | > 99% | TBD | |
| Query Latency | < 10 sec | TBD | |
| Test Coverage | 100% | TBD | |
| Documentation | 100% | TBD | |
| Cost Budget | < $500/month | TBD | |
| Uptime | 99.9% | TBD | |
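Some of these targets translate directly into checks. For example, 99.9% uptime over a 30-day month leaves a downtime budget of 0.1% × 43,200 minutes = 43.2 minutes. The helpers below are a hypothetical sketch covering the Uptime and Data Freshness rows; how `last_loaded_at` is obtained (for example, `MAX(_dbt_loaded_at)` from a mart) is left open.

```python
from datetime import datetime, timedelta


def downtime_budget(uptime_target: float, period: timedelta) -> timedelta:
    """Maximum downtime an uptime target allows over a period."""
    return period * (1 - uptime_target)


def is_fresh(last_loaded_at: datetime, now: datetime,
             max_lag: timedelta = timedelta(hours=1)) -> bool:
    """Data Freshness target: the newest data must be less than max_lag old."""
    return now - last_loaded_at < max_lag


print(downtime_budget(0.999, timedelta(days=30)))  # 0:43:12
print(is_fresh(datetime(2025, 1, 1, 10, 0), datetime(2025, 1, 1, 10, 45)))  # True
```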
10 Best Practices
- Start with infrastructure: IaC first, then build on top
- Ingest raw data first: don't transform at ingestion; preserve the original format
- Use the Medallion Architecture: Bronze -> Silver -> Gold for the data lifecycle
- Test everything: data quality, schema, freshness, volume
- Document as you build: don't leave documentation for the end
- Monitor from day one: pipeline health, data quality, costs
- Implement security early: RBAC, encryption, access controls
- Use CI/CD: automate testing and deployment
- Measure costs: track and optimize spending weekly
- Prepare a demo: show the working system, not just code
Key Takeaways
- A complete data platform requires ingestion, storage, transformation, serving, and governance
- Each phase builds on the previous one: infrastructure -> ingestion -> storage -> transformation -> serving
- Testing, monitoring, and documentation are production requirements, not nice-to-haves
- CI/CD and IaC enable reproducible, auditable deployments
- The capstone demonstrates end-to-end data engineering competency for portfolios and interviews
See Also
- Data Warehouse Concepts: dimensional modeling fundamentals
- dbt Fundamentals: SQL transformation layer
- Snowflake Fundamentals: cloud data warehouse
- Data Lake Architecture: Medallion architecture patterns
- Infrastructure as Code: Terraform provisioning
- CI/CD for Data Pipelines: automated testing and deployment
- Data Governance & Catalog: metadata management and lineage
- Portfolio Projects: additional portfolio project ideas