Capstone: End-to-End Data Platform Build

Capstone Project: Building a Production Data Platform

This is the culminating project of the entire data engineering course.

Project Objectives


What You Will Build:

  1. Multi-source data ingestion system: batch, streaming, CDC, APIs
  2. Medallion Architecture data lakehouse: Bronze, Silver, Gold layers
  3. Dimensional modeling with dbt: fact and dimension tables
  4. Real-time streaming: for critical metrics
  5. CI/CD and infrastructure as code: automated deployment
  6. Monitoring: data quality, performance, and costs
  7. Governance and security: access control and compliance

Key Insight: You will design, build, test, deploy, and operate a complete data platform that handles real-world data engineering challenges.


Architecture Overview

[Figure: Complete Platform Architecture. End-to-end platform with full governance, CI/CD, and monitoring across all layers; infrastructure provisioned with Terraform IaC.]

  • Ingestion (4 modes): Batch (S3 + Airflow, scheduled DAGs), CDC (Debezium on the PostgreSQL WAL), Streaming (Kafka topics, real-time events), API (REST + GraphQL, rate limiting)
  • Storage (Lake): Bronze layer (raw ingestion), Silver layer (cleansed + dedup), Gold layer (business-ready); Delta Lake / Iceberg with ACID transactions, schema enforcement, and time travel; Unity Catalog
  • Processing: dbt (batch), Spark (batch), Flink (stream), Python UDFs, Great Expectations quality gates (schema + data checks), Airflow orchestration
  • Serving (multi-model: OLAP + API + BI, sub-second queries): Snowflake, Redis cache, FastAPI, Looker BI, Grafana
  • Governance: RBAC / ABAC, data lineage, audit logs, PII detection, cost tracking, monitoring, Unity Catalog

The End-to-End Data Platform consists of five layers: Ingestion Layer (Batch S3, CDC Debezium, Streaming Kafka, API REST), Storage Layer (Data Lake S3 with Bronze Raw → Silver Clean → Gold Business-Ready using Delta Lake/Apache Iceberg), Processing Layer (dbt Batch, Spark Batch, Flink Stream, Airflow Orchestration), Serving Layer (Snowflake, Redis Cache, FastAPI API, Looker BI), and Governance Layer (Unity Catalog, Data Lineage, Quality Monitor, Cost Control). Data flows from ingestion through storage to processing to serving, with governance providing cross-cutting oversight.


Medallion Architecture with Technologies

[Figure: Medallion Architecture: Bronze → Silver → Gold]

  • Bronze Layer (raw ingestion; schema: unstructured): S3 raw bucket (Parquet), Kafka topic raw events, CDC Debezium logs, API JSON responses
  • Silver Layer (cleansed and deduplicated; schema: enforced): deduplicated records, null handling + imputation, type casting + validation, standardized naming
  • Gold Layer (business-ready; schema: dimensional): star schema facts + dims, aggregated KPIs, Looker views + BI dashboards, ML feature tables

Bronze: Delta Lake append-only • Silver: dbt + Spark transforms • Gold: Snowflake serving
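
To make the layer responsibilities concrete, here is a minimal in-memory sketch of the Bronze → Silver → Gold flow. It is not how the platform itself runs these steps (the lesson uses Delta Lake, dbt, and Spark); the sample records and the function names `to_silver` and `to_gold` are assumptions chosen for illustration.

```python
from collections import defaultdict
from decimal import Decimal

# Bronze: raw records kept exactly as ingested (strings, duplicates, nulls).
bronze = [
    {"id": "1", "customer_id": "c1", "total_price": "10.50"},
    {"id": "1", "customer_id": "c1", "total_price": "10.50"},  # duplicate delivery
    {"id": "2", "customer_id": "c2", "total_price": "4.25"},
    {"id": None, "customer_id": "c3", "total_price": "9.99"},  # missing key
    {"id": "3", "customer_id": "c1", "total_price": "20.00"},
]


def to_silver(rows):
    """Drop rows without a key, deduplicate on id, and cast types."""
    seen = set()
    silver_rows = []
    for row in rows:
        if row["id"] is None or row["id"] in seen:
            continue
        seen.add(row["id"])
        silver_rows.append({
            "order_id": int(row["id"]),
            "customer_id": row["customer_id"],
            "total_amount": Decimal(row["total_price"]),
        })
    return silver_rows


def to_gold(silver_rows):
    """Aggregate cleansed orders into a per-customer KPI table."""
    totals = defaultdict(lambda: {"total_orders": 0, "lifetime_value": Decimal("0")})
    for row in silver_rows:
        stats = totals[row["customer_id"]]
        stats["total_orders"] += 1
        stats["lifetime_value"] += row["total_amount"]
    return dict(totals)


silver = to_silver(bronze)
gold = to_gold(silver)
```

Bronze stays untouched throughout, so Silver and Gold can always be rebuilt from it if a cleansing rule changes.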

Implementation

Phase 1: Infrastructure Provisioning

# terraform/main.tf - Complete infrastructure
terraform {
  required_version = ">= 1.5.0"
  backend "s3" {
    bucket = "capstone-terraform-state"
    key    = "data-platform/terraform.tfstate"
    region = "us-east-1"
  }
}

module "storage" {
  source = "./modules/storage"

  environment    = var.environment
  project_name   = "capstone"
  retention_days = 365
}

module "compute" {
  source = "./modules/compute"

  environment  = var.environment
  project_name = "capstone"

  warehouses = {
    analytics = { size = "medium", auto_suspend = 300 }
    etl       = { size = "large",  auto_suspend = 600 }
    adhoc     = { size = "x-small", auto_suspend = 60 }
  }
}

module "governance" {
  source = "./modules/governance"

  environment  = var.environment
  project_name = "capstone"

  roles = {
    data_analyst  = { grants = ["USAGE", "SELECT"] }
    data_engineer = { grants = ["ALL"] }
    data_admin    = { grants = ["ALL"] }
  }
}
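
The role map in the governance module can be sanity-checked before it is applied. The sketch below is only an illustration: the lesson does not show how `./modules/governance` turns these grants into warehouse privileges, so the helper `is_allowed` is hypothetical, and treating "ALL" as a wildcard is an assumption.

```python
# Role-to-grant map copied from the governance module above.
ROLES = {
    "data_analyst": {"grants": ["USAGE", "SELECT"]},
    "data_engineer": {"grants": ["ALL"]},
    "data_admin": {"grants": ["ALL"]},
}


def is_allowed(role: str, privilege: str) -> bool:
    """Return True if the role's grants include the privilege (or "ALL")."""
    grants = ROLES.get(role, {}).get("grants", [])
    # Assumption: "ALL" acts as a wildcard covering every privilege.
    return "ALL" in grants or privilege.upper() in grants
```

A check like `is_allowed("data_analyst", "INSERT")` returning False is the kind of assertion that could run in CI next to `terraform plan`.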

Phase 2: Ingestion Pipeline

# src/ingestion/airflow_dags.py
import json
from datetime import datetime, timedelta

import boto3
import requests
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator

default_args = {
    'owner': 'data-platform',
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

def extract_shopify_orders(**context):
    """Extract orders from the Shopify API and land them in S3 as raw JSON."""
    url = "https://store.myshopify.com/admin/api/2024-01/orders.json"
    # Read the secret at run time. `params` is not a templated operator field,
    # so a Jinja string passed through it would arrive unrendered.
    headers = {"X-Shopify-Access-Token": Variable.get("shopify_token")}
    params = {"limit": 250, "status": "any"}

    all_orders = []
    while url:
        response = requests.get(url, headers=headers, params=params, timeout=30)
        response.raise_for_status()  # fail the task (and trigger retries) on HTTP errors
        data = response.json()
        all_orders.extend(data.get('orders', []))
        # The "next" link already encodes the query, so the original
        # filters are only sent with the first request.
        url = response.links.get('next', {}).get('url')
        params = None

    # Write to S3, partitioned by the run's logical date
    s3 = boto3.client('s3')
    s3.put_object(
        Bucket='capstone-raw',
        Key=f'shopify/orders/{context["ds"]}/orders.json',
        Body=json.dumps(all_orders)
    )

    return len(all_orders)

def extract_stripe_payments(**context):
    """Extract one day of charges from the Stripe API and land them in S3."""
    import stripe
    stripe.api_key = Variable.get("stripe_api_key")

    # `logical_date` is the Airflow 2.2+ name for `execution_date`.
    day_start = context['logical_date']
    charges = stripe.Charge.list(limit=100, created={
        # Stripe expects integer Unix timestamps
        'gte': int(day_start.timestamp()),
        'lt': int((day_start + timedelta(days=1)).timestamp())
    })
    # Follow Stripe's cursor so days with more than 100 charges are not truncated
    payments = [c.to_dict() for c in charges.auto_paging_iter()]

    s3 = boto3.client('s3')
    s3.put_object(
        Bucket='capstone-raw',
        Key=f'stripe/payments/{context["ds"]}/payments.json',
        Body=json.dumps(payments)
    )

    return len(payments)

with DAG(
    dag_id='capstone_ingestion',
    default_args=default_args,
    schedule_interval='0 1 * * *',
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=['capstone', 'ingestion']
) as dag:

    extract_shopify = PythonOperator(
        task_id='extract_shopify',
        python_callable=extract_shopify_orders,
    )

    extract_stripe = PythonOperator(
        task_id='extract_stripe',
        python_callable=extract_stripe_payments,
    )

    extract_shopify >> extract_stripe
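
The pagination loop in the Shopify extractor is the part most worth testing without network access. The following is a minimal sketch of the same follow-the-next-link pattern with the HTTP call injected as a function; `fetch_all`, the `Page` alias, `max_pages`, and the fake three-page API are all assumptions made for illustration, not part of the lesson's code.

```python
from typing import Callable, Dict, List, Optional, Tuple

# A page is (records, next_url); next_url is None on the last page.
Page = Tuple[List[dict], Optional[str]]


def fetch_all(
    first_url: str,
    fetch_page: Callable[[str], Page],
    max_pages: int = 1000,
) -> List[dict]:
    """Follow next-page links until the source reports no further page."""
    records: List[dict] = []
    url: Optional[str] = first_url
    pages = 0
    while url is not None:
        # Guard against an API that keeps returning the same "next" link.
        if pages >= max_pages:
            raise RuntimeError(f"stopped after {max_pages} pages; possible link loop")
        page_records, url = fetch_page(url)
        records.extend(page_records)
        pages += 1
    return records


# Fake three-page API used only to exercise the loop.
FAKE_API: Dict[str, Page] = {
    "/orders?page=1": ([{"id": 1}, {"id": 2}], "/orders?page=2"),
    "/orders?page=2": ([{"id": 3}], "/orders?page=3"),
    "/orders?page=3": ([], None),
}

orders = fetch_all("/orders?page=1", FAKE_API.__getitem__)
```

In a real task, `fetch_page` would wrap the HTTP request and return the parsed records together with the URL from the response's next link.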

Phase 3: dbt Transformation Layer

-- models/staging/stg_shopify_orders.sql
{{
    config(
        materialized='view',
        schema='staging',
        tags=['shopify', 'orders']
    )
}}

WITH source AS (
    SELECT * FROM {{ source('raw', 'shopify_orders') }}
),

renamed AS (
    SELECT
        id                                      AS order_id,
        customer_id                             AS customer_id,
        financial_status                        AS payment_status,
        fulfillment_status                      AS fulfillment_status,
        CAST(total_price AS DECIMAL(14,2))      AS total_amount,
        CAST(subtotal_price AS DECIMAL(14,2))   AS subtotal,
        CAST(total_tax AS DECIMAL(14,2))        AS tax_amount,
        CAST(total_discounts AS DECIMAL(14,2))  AS discount_amount,
        currency,
        CAST(created_at AS TIMESTAMP)           AS created_at,
        CURRENT_TIMESTAMP()                     AS _loaded_at
    FROM source
    WHERE id IS NOT NULL
)

SELECT * FROM renamed

-- models/marts/fact_orders.sql
{{
    config(
        materialized='incremental',
        unique_key='order_key',
        partition_by={'field': 'order_date_key', 'data_type': 'integer'},
        cluster_by=['customer_key', 'order_status'],
        tags=['marts', 'orders', 'daily-critical']
    )
}}

WITH orders AS (
    SELECT * FROM {{ ref('stg_shopify_orders') }}

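    {% if is_incremental() %}
        -- Compare against _dbt_loaded_at, the load timestamp this model stores.
        -- Caveat: stg_shopify_orders is a view, so its _loaded_at is evaluated at
        -- query time and this predicate matches every row on each run. Filter on a
        -- load or update timestamp stored with the source data if one is available.
        WHERE _loaded_at > (SELECT MAX(_dbt_loaded_at) FROM {{ this }})
    {% endif %}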
),

customers AS (
    SELECT * FROM {{ ref('dim_customers') }}
),

final AS (
    SELECT
        {{ dbt_utils.generate_surrogate_key(['o.order_id']) }} AS order_key,
        o.order_id,
        {{ dbt_utils.generate_surrogate_key(['o.customer_id']) }} AS customer_key,
        -- YYYYMMDD integer key (Snowflake syntax)
        CAST(TO_CHAR(o.created_at, 'YYYYMMDD') AS INTEGER) AS order_date_key,
        o.payment_status AS order_status,
        o.subtotal,
        o.discount_amount,
        o.tax_amount,
        o.total_amount AS net_amount,
        -- Assumes total_amount is already net of discounts, so gross adds them back
        o.total_amount + COALESCE(o.discount_amount, 0) AS gross_amount,
        o.currency,
        o.created_at,
        CURRENT_TIMESTAMP() AS _dbt_loaded_at
    FROM orders o
)

SELECT * FROM final
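
The fact model combines three ideas: a hashed surrogate key, an integer date key, and an incremental merge on `unique_key`. The sketch below shows each in plain Python. It assumes the surrogate key is an MD5 hash of the string values joined with "-", with a placeholder standing in for NULL; Python's `str()` will not always match a warehouse's string cast, so the hashes are only comparable for simple values. The function names are hypothetical.

```python
import hashlib
from datetime import datetime

# Placeholder substituted for NULL before hashing (assumed convention).
NULL_PLACEHOLDER = "_dbt_utils_surrogate_key_null_"


def surrogate_key(*values) -> str:
    """Hash the '-'-joined string form of the inputs into a 32-char hex key."""
    parts = [NULL_PLACEHOLDER if v is None else str(v) for v in values]
    return hashlib.md5("-".join(parts).encode("utf-8")).hexdigest()


def date_key(ts: datetime) -> int:
    """Turn a timestamp into a YYYYMMDD integer such as 20250115."""
    return int(ts.strftime("%Y%m%d"))


def merge_incremental(target: dict, new_rows: list, unique_key: str) -> dict:
    """Upsert new_rows into target: update rows whose key exists, insert the rest."""
    merged = dict(target)
    for row in new_rows:
        merged[row[unique_key]] = row
    return merged
```

Because the key is a pure function of `order_id`, re-running the model for the same order overwrites the existing row instead of inserting a duplicate.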

-- models/marts/dim_customers.sql
{{
    config(
        materialized='incremental',
        unique_key='customer_key',
        tags=['marts', 'customers']
    )
}}

WITH customers AS (
    SELECT * FROM {{ ref('stg_shopify_customers') }}
),

orders AS (
    SELECT * FROM {{ ref('stg_shopify_orders') }}
),

customer_stats AS (
    SELECT
        customer_id,
        COUNT(*) AS total_orders,
        SUM(total_amount) AS lifetime_value,
        MIN(created_at) AS first_order_date,
        MAX(created_at) AS last_order_date
    FROM orders
    GROUP BY 1
),

final AS (
    SELECT
        {{ dbt_utils.generate_surrogate_key(['c.customer_id']) }} AS customer_key,
        c.customer_id,
        c.first_name || ' ' || c.last_name AS full_name,
        c.email,
        c.city,
        c.state,
        c.country,
        COALESCE(s.total_orders, 0) AS total_orders,
        COALESCE(s.lifetime_value, 0) AS lifetime_value,
        s.first_order_date,
        s.last_order_date,
        CASE
            WHEN s.lifetime_value > 10000 THEN 'enterprise'
            WHEN s.lifetime_value > 1000 THEN 'mid_market'
            ELSE 'smb'
        END AS customer_segment,
        CURRENT_TIMESTAMP() AS valid_from,
        CAST('9999-12-31' AS TIMESTAMP) AS valid_to,
        TRUE AS is_current
    FROM customers c
    LEFT JOIN customer_stats s ON c.customer_id = s.customer_id
)

SELECT * FROM final
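
The segmentation logic in `dim_customers` has two edge cases that are easy to miss: the thresholds are strict (`>`), and customers with no orders fall through to 'smb' because their `lifetime_value` is NULL after the LEFT JOIN. Here is a small sketch that mirrors the SQL so those cases can be unit tested; the function names and sample data are assumptions for illustration.

```python
from collections import defaultdict
from decimal import Decimal


def segment(lifetime_value) -> str:
    """Mirror the CASE expression: strict thresholds, None falls through to 'smb'."""
    if lifetime_value is not None and lifetime_value > 10000:
        return "enterprise"
    if lifetime_value is not None and lifetime_value > 1000:
        return "mid_market"
    return "smb"


def build_dim_customers(customers, orders):
    """Aggregate orders per customer, then left-join the stats onto customers."""
    stats = defaultdict(lambda: {"total_orders": 0, "lifetime_value": Decimal("0")})
    for order in orders:
        entry = stats[order["customer_id"]]
        entry["total_orders"] += 1
        entry["lifetime_value"] += order["total_amount"]

    dim = []
    for customer in customers:
        entry = stats.get(customer["customer_id"])  # None emulates a LEFT JOIN miss
        ltv = entry["lifetime_value"] if entry else None
        dim.append({
            "customer_id": customer["customer_id"],
            "total_orders": entry["total_orders"] if entry else 0,
            "lifetime_value": ltv if ltv is not None else Decimal("0"),
            "customer_segment": segment(ltv),
        })
    return dim


customers = [{"customer_id": "c1"}, {"customer_id": "c2"}, {"customer_id": "c3"}]
orders = [
    {"customer_id": "c1", "total_amount": Decimal("6000")},
    {"customer_id": "c1", "total_amount": Decimal("5000")},
    {"customer_id": "c2", "total_amount": Decimal("1000")},
]
dim_customers = {row["customer_id"]: row for row in build_dim_customers(customers, orders)}
```

Here `c2` sits exactly on the 1000 boundary and stays in 'smb', which matches the strict comparison in the SQL.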

Phase 4: Monitoring and Quality

# src/monitoring/quality_checks.py
# Written against the Great Expectations 1.x API; older releases differ.
import great_expectations as gx

context = gx.get_context()

# Data Quality Suite, registered with the context so checkpoints can use it
quality_suite = context.suites.add(gx.ExpectationSuite(name="capstone_quality"))

quality_suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id")
)
quality_suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeUnique(column="order_id")
)
quality_suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="net_amount", min_value=0.01, max_value=1000000
    )
)
quality_suite.add_expectation(
    gx.expectations.ExpectTableRowCountToBeBetween(
        min_value=100, max_value=10000000
    )
)

# Run validation. Assumes "capstone_checkpoint" already exists in this context
# and that its validation definition binds the suite to the fact_orders data.
checkpoint = context.checkpoints.get("capstone_checkpoint")
result = checkpoint.run()

# A checkpoint can hold several validations; report each one's score.
for validation_result in result.run_results.values():
    print(f"Quality Score: {validation_result.statistics['success_percent']:.2f}%")
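
To show what the four expectations actually assert, here is a dependency-free sketch of the same checks and of a quality score computed as the share of checks that pass. It is not a replacement for Great Expectations; `run_quality_checks` is a hypothetical helper, and unlike a typical expectation library it counts a missing `net_amount` as a range failure.

```python
def run_quality_checks(rows, min_rows=100, max_rows=10_000_000):
    """Evaluate the four capstone checks and return per-check results plus a score."""
    order_ids = [row.get("order_id") for row in rows]
    non_null_ids = [i for i in order_ids if i is not None]

    checks = {
        "order_id_not_null": len(non_null_ids) == len(order_ids),
        "order_id_unique": len(set(non_null_ids)) == len(non_null_ids),
        # Assumption: a missing net_amount counts as out of range.
        "net_amount_in_range": all(
            row.get("net_amount") is not None and 0.01 <= row["net_amount"] <= 1_000_000
            for row in rows
        ),
        "row_count_in_range": min_rows <= len(rows) <= max_rows,
    }
    passed = sum(checks.values())
    return {"checks": checks, "success_percent": 100.0 * passed / len(checks)}


sample = [
    {"order_id": 1, "net_amount": 10.0},
    {"order_id": 1, "net_amount": 5.0},  # duplicate order_id
]
report = run_quality_checks(sample, min_rows=2)
```

A pipeline gate could then compare `report["success_percent"]` against the quality target and fail the run when it falls below.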

Key Concepts Summary

| Phase          | Component       | Technology          | Deliverable          |
|----------------|-----------------|---------------------|----------------------|
| Provisioning   | Infrastructure  | Terraform           | Cloud resources      |
| Ingestion      | Data Extraction | Airflow, Python     | Raw data in S3       |
| Storage        | Data Lake       | Delta Lake, S3      | Bronze/Silver/Gold   |
| Transformation | ELT             | dbt, SQL            | Dimensional model    |
| Serving        | Analytics       | Snowflake, Looker   | Dashboards           |
| Streaming      | Real-time       | Kafka, Flink        | Real-time metrics    |
| Quality        | Monitoring      | Great Expectations  | Quality reports      |
| Governance     | Access Control  | Unity Catalog, RBAC | Secure access        |
| CI/CD          | Automation      | GitHub Actions      | Automated deployment |
| Documentation  | Knowledge       | dbt docs, README    | Data dictionary      |

Performance Metrics

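| Metric         | Target       | Actual | Status |
|----------------|--------------|--------|--------|
| Pipeline SLA   | < 2 hours    | TBD    |        |
| Data Freshness | < 1 hour     | TBD    |        |
| Quality Score  | > 99%        | TBD    |        |
| Query Latency  | < 10 sec     | TBD    |        |
| Test Coverage  | 100%         | TBD    |        |
| Documentation  | 100%         | TBD    |        |
| Cost Budget    | < $500/month | TBD    |        |
| Uptime         | 99.9%        | TBD    |        |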
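
Two of these targets translate directly into numbers you can check. A 99.9% uptime target leaves roughly 43 minutes of downtime in a 30-day month, and the freshness target is a comparison between the last load time and now. The sketch below works through both; the function names and the 30-day period are assumptions for illustration.

```python
from datetime import datetime, timedelta, timezone


def allowed_downtime_minutes(uptime_target_pct: float, period_days: int = 30) -> float:
    """Downtime budget implied by an uptime target over the given period."""
    return (1 - uptime_target_pct / 100) * period_days * 24 * 60


def is_fresh(
    last_loaded_at: datetime,
    now: datetime,
    max_age: timedelta = timedelta(hours=1),
) -> bool:
    """True if the most recent load is younger than the freshness target."""
    return now - last_loaded_at < max_age


now = datetime(2025, 1, 15, 12, 0, tzinfo=timezone.utc)
budget = allowed_downtime_minutes(99.9)           # about 43.2 minutes per 30 days
fresh = is_fresh(now - timedelta(minutes=30), now)
stale = is_fresh(now - timedelta(minutes=90), now)
```

Filling the Actual column then becomes a matter of feeding measured values into checks like these on a schedule.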

10 Best Practices

  1. Start with infrastructure: IaC first, then build on top
  2. Ingest raw data first: don't transform at ingestion; preserve original format
  3. Use Medallion Architecture: Bronze -> Silver -> Gold for data lifecycle
  4. Test everything: data quality, schema, freshness, volume
  5. Document as you build: don't leave documentation for the end
  6. Monitor from day one: pipeline health, data quality, costs
  7. Implement security early: RBAC, encryption, access controls
  8. Use CI/CD: automate testing and deployment
  9. Measure costs: track and optimize spending weekly
  10. Prepare a demo: show the working system, not just code

  • A complete data platform requires ingestion, storage, transformation, serving, and governance
  • Each phase builds on the previous: infrastructure -> ingestion -> storage -> transformation -> serving
  • Testing, monitoring, and documentation are production requirements, not nice-to-haves
  • CI/CD and IaC enable reproducible, auditable deployments
  • The capstone demonstrates end-to-end data engineering competency for portfolio and interviews
