Modern ELT Stack (Fivetran + dbt + Snowflake + Looker)

Fivetran + dbt + Snowflake + Looker for Analytics Engineering

ℹ️

Project Difficulty: Advanced | Duration: 3-4 weeks | Cloud: Snowflake/GCP

Build a modern data stack with automated ingestion, transformation, and analytics using industry-standard tools.

Project Overview

Problem Statement

Organizations often struggle with fragmented data, slow analytics, and an inability to make data-driven decisions. The modern data stack addresses this by separating ingestion, transformation, and presentation into specialized, best-of-breed tools.

Objectives

  1. Automate data ingestion from 50+ sources with Fivetran
  2. Implement dbt for version-controlled transformations
  3. Build a scalable Snowflake data warehouse
  4. Create self-service analytics with Looker
  5. Establish data governance and quality checks

Tech Stack

Component | Technology | Purpose
Ingestion | Fivetran | Automated data loading
Transformation | dbt | Analytics engineering
Warehouse | Snowflake | Cloud data warehouse
BI & Analytics | Looker | Business intelligence
Orchestration | Airflow / dbt Cloud | Pipeline scheduling

Architecture Diagram

DATA SOURCES (50+): SaaS (Salesforce, HubSpot), Databases (PG, MySQL), Files (CSV, JSON, Parquet), APIs (REST, GraphQL)
        |
        v
FIVETRAN: Connectors (200+), Sync Engine (CDC + Full), Schema Management (Auto)
        |
        v
SNOWFLAKE RAW LAYER: FIVETRAN_RAW (Source Tables), FIVETRAN_META (Metadata), FIVETRAN_LOG (Audit Trail)
        |
        v
DBT TRANSFORMATIONS: Staging (Clean + Name), Intermediate (Business Logic), Mart (Analytics-Ready)
        |
        v
SNOWFLAKE TRANSFORMED LAYER: ANALYTICS Schema, MARKETING Schema, FINANCE Schema
        |
        v
LOOKER: Dashboards (Self-Serve), Looks (Ad-hoc), Alerts (Proactive), Embedded Analytics

Step-by-Step Implementation Guide

Step 1: Fivetran Connector Setup

# fivetran/connectors.py
import requests
import base64
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class FivetranManager:
    def __init__(self, api_key: str, api_secret: str, base_url: str = "https://api.fivetran.com/v1"):
        self.api_key = api_key
        self.api_secret = api_secret
        self.base_url = base_url
        self.headers = self._generate_headers()

    def _generate_headers(self) -> Dict:
        # The Fivetran REST API uses HTTP Basic auth: base64("api_key:api_secret").
        # No request signing is needed, so no HMAC signature is computed here.
        credentials = f"{self.api_key}:{self.api_secret}"
        token = base64.b64encode(credentials.encode()).decode()

        return {
            "Authorization": f"Basic {token}",
            "Content-Type": "application/json"
        }

    def create_connector(self, service: str, group_id: str,
                        config: Optional[Dict] = None) -> Dict:
        payload = {
            "service": service,
            "group_id": group_id,
            "paused": False,
            "pause_after_trial": False,
            "config": config or {}
        }

        response = requests.post(
            f"{self.base_url}/connectors",
            headers=self.headers,
            json=payload
        )

        if response.status_code == 201:
            connector_data = response.json()["data"]
            logger.info(f"Created connector: {connector_data['id']}")
            return connector_data
        else:
            raise Exception(f"Failed to create connector: {response.text}")

    def setup_postgres_connector(self, group_id: str, host: str,
                                database: str, user: str, password: str,
                                schema: str, port: int = 5432) -> Dict:
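        # NOTE: config keys differ per connector type and API version;
        # check these against the Fivetran connector config reference before use.
        config = {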
            "host": host,
            "port": port,
            "database": database,
            "user": user,
            "password": password,
            "schema": schema,
            "sync_method": "cdc",
            "replication_method": "primary_key",
            "connection_method": "directly"
        }

        return self.create_connector("postgres", group_id, config)

    def setup_mysql_connector(self, group_id: str, host: str,
                             database: str, user: str, password: str,
                             port: int = 3306) -> Dict:
        config = {
            "host": host,
            "port": port,
            "database": database,
            "user": user,
            "password": password,
            "sync_method": "cdc",
            "binary_format": "ROW"
        }

        return self.create_connector("mysql", group_id, config)

    def setup_s3_connector(self, group_id: str, bucket: str,
                          prefix: str = "", role_arn: Optional[str] = None) -> Dict:
        config = {
            "bucket": bucket,
            "prefix": prefix,
            "role_arn": role_arn or "",
            "file_type": "auto",
            "delimiter": ","
        }

        return self.create_connector("s3", group_id, config)

    def setup_salesforce_connector(self, group_id: str,
                                  credentials: Dict) -> Dict:
        config = {
            "credentials": credentials,
            "schemas": ["standard", "custom"],
            "start_date": (datetime.now() - timedelta(days=30)).strftime("%Y-%m-%d")
        }

        return self.create_connector("salesforce", group_id, config)

    def setup_hubspot_connector(self, group_id: str, api_key: str) -> Dict:
        config = {
            "api_key": api_key,
            "enable_historical": True,
            "start_date": (datetime.now() - timedelta(days=90)).strftime("%Y-%m-%d")
        }

        return self.create_connector("hubspot", group_id, config)

    def setup_stripe_connector(self, group_id: str, secret_key: str) -> Dict:
        config = {
            "secret_key": secret_key,
            "start_date": (datetime.now() - timedelta(days=90)).strftime("%Y-%m-%d")
        }

        return self.create_connector("stripe", group_id, config)

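    def get_connector_sync_status(self, connector_id: str) -> Dict:
        # The connector details response carries a "status" object
        # (setup_state, sync_state, ...), so read it from there.
        response = requests.get(
            f"{self.base_url}/connectors/{connector_id}",
            headers=self.headers
        )

        if response.status_code == 200:
            return response.json()["data"]["status"]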
        else:
            raise Exception(f"Failed to get sync status: {response.text}")

    def trigger_connector_sync(self, connector_id: str) -> Dict:
        response = requests.post(
            f"{self.base_url}/connectors/{connector_id}/force",
            headers=self.headers
        )

        if response.status_code == 200:
            logger.info(f"Triggered sync for connector: {connector_id}")
            return response.json()["data"]
        else:
            raise Exception(f"Failed to trigger sync: {response.text}")

    def get_connector_log(self, connector_id: str, limit: int = 100) -> List[Dict]:
        response = requests.get(
            f"{self.base_url}/connectors/{connector_id}/logs",
            headers=self.headers,
            params={"limit": limit}
        )

        if response.status_code == 200:
            return response.json()["data"]
        else:
            raise Exception(f"Failed to get logs: {response.text}")

    def monitor_all_connectors(self) -> Dict:
        response = requests.get(
            f"{self.base_url}/connectors",
            headers=self.headers
        )

        if response.status_code == 200:
            connectors = response.json()["data"]["items"]
            return {
                "total_connectors": len(connectors),
                "active": sum(1 for c in connectors if c.get("status", {}).get("setup_state") == "connected"),
                "paused": sum(1 for c in connectors if c.get("paused")),
                "failing": sum(1 for c in connectors if c.get("status", {}).get("sync_state") == "error"),
                "connectors": [
                    {
                        "id": c["id"],
                        "service": c["service"],
                        "status": c.get("status", {}).get("sync_state"),
                        "last_sync": c.get("status", {}).get("last_sync")
                    }
                    for c in connectors
                ]
            }
        else:
            raise Exception(f"Failed to get connectors: {response.text}")
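
The manager above authenticates with HTTP Basic auth and exposes calls for triggering a sync and reading connector status. A caller usually needs to wait for a sync to finish before starting dbt. The following is a minimal sketch of that polling step, not part of the original class: `basic_auth_header`, `wait_for_sync`, the injected `fetch_status` callable, and the `"syncing"` state value are all assumptions made for illustration. The status lookup is passed in as a function so the sketch runs without network access.

```python
import base64
import time
from typing import Callable, Dict


def basic_auth_header(api_key: str, api_secret: str) -> Dict[str, str]:
    """Build an HTTP Basic Authorization header from a key/secret pair."""
    token = base64.b64encode(f"{api_key}:{api_secret}".encode()).decode()
    return {"Authorization": f"Basic {token}"}


def wait_for_sync(
    fetch_status: Callable[[], Dict],
    poll_seconds: float = 30.0,
    max_polls: int = 20,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict:
    """Poll until the connector is no longer syncing, or give up after max_polls.

    fetch_status is a hypothetical callable that returns a status dict with a
    "sync_state" key; "syncing" is assumed to mean a sync is still in progress.
    """
    for _ in range(max_polls):
        status = fetch_status()
        if status.get("sync_state") != "syncing":
            return status
        sleep(poll_seconds)
    raise TimeoutError("connector still syncing after max_polls checks")
```

In practice `fetch_status` could be something like `lambda: manager.get_connector_sync_status(connector_id)`. Bounding the loop with `max_polls` keeps an orchestrator task from hanging forever on a stuck connector.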

Step 2: dbt Transformations

# dbt/dbt_project.yml
name: 'analytics_engineering'
version: '1.0.0'
config-version: 2

profile: 'snowflake'

model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["data"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

target-path: "target"
clean-targets:
  - "target"
  - "dbt_packages"

models:
  analytics_engineering:
    staging:
      +materialized: view
      +schema: staging
    intermediate:
      +materialized: ephemeral
    marts:
      +materialized: table
      +schema: analytics

seeds:
  analytics_engineering:
    +schema: seeds

vars:
  start_date: '2024-01-01'

on-run-start:
  - "{{ log('Starting dbt run for analytics_engineering', info=True) }}"

on-run-end:
  - "{{ log('Completed dbt run for analytics_engineering', info=True) }}"
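
One detail of the `+schema` settings above is easy to miss: with dbt's default `generate_schema_name` macro, a custom schema is appended to the target schema rather than replacing it. The sketch below mirrors that default logic in Python purely for illustration. The function name `resolve_schema` and the target schema `dbt_prod` are hypothetical.

```python
from typing import Optional


def resolve_schema(target_schema: str, custom_schema: Optional[str]) -> str:
    """Mirror dbt's default schema naming: <target_schema>_<custom_schema>."""
    if custom_schema is None:
        # Models without a +schema config land in the target schema itself.
        return target_schema
    return f"{target_schema}_{custom_schema.strip()}"


# Hypothetical target schema taken from a profiles.yml entry.
for layer, custom in [("staging", "staging"), ("intermediate", None), ("marts", "analytics")]:
    print(layer, "->", resolve_schema("dbt_prod", custom))
```

Under these assumptions the staging models build into `dbt_prod_staging` and the marts into `dbt_prod_analytics`. If the schema names must match `STAGING` and `ANALYTICS` exactly, the usual approach is to override the `generate_schema_name` macro in the project.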

-- dbt/models/staging/stg_customers.sql
WITH source AS (
    SELECT * FROM {{ source('fivetran_raw', 'salesforce__accounts') }}
),

renamed AS (
    SELECT
        id AS customer_id,
        name AS company_name,
        industry,
        annual_revenue,
        number_of_employees,
        billing_street AS billing_address,
        billing_city,
        billing_state,
        billing_country,
        owner_id AS sales_rep_id,
        created_date AS created_at,
        last_modified_date AS updated_at,
        _fivetran_synced AS _loaded_at
    FROM source
    WHERE id IS NOT NULL
)

SELECT * FROM renamed

-- dbt/models/staging/stg_orders.sql
WITH source AS (
    SELECT * FROM {{ source('fivetran_raw', 'stripe__payments') }}
),

renamed AS (
    SELECT
        id AS order_id,
        customer_id,
        amount AS total_amount_cents,
        amount / 100.0 AS total_amount,
        currency,
        status AS payment_status,
        created AS created_at,
        _fivetran_synced AS _loaded_at
    FROM source
    WHERE id IS NOT NULL
      AND status != 'failed'
)

SELECT * FROM renamed

-- dbt/models/intermediate/int_customer_orders.sql
WITH customers AS (
    SELECT * FROM {{ ref('stg_customers') }}
),

orders AS (
    SELECT * FROM {{ ref('stg_orders') }}
),

customer_orders AS (
    SELECT
        c.customer_id,
        c.company_name,
        c.industry,
        c.annual_revenue,
        COUNT(DISTINCT o.order_id) AS total_orders,
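        -- SUM over zero matched rows is NULL; default to 0 so the not_null test on total_revenue holds
        COALESCE(SUM(o.total_amount), 0) AS total_revenue,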
        AVG(o.total_amount) AS avg_order_value,
        MIN(o.created_at) AS first_order_date,
        MAX(o.created_at) AS last_order_date,
        DATEDIFF('day', MIN(o.created_at), MAX(o.created_at)) AS customer_tenure_days
    FROM customers c
    LEFT JOIN orders o ON c.customer_id = o.customer_id
    GROUP BY 1, 2, 3, 4
)

SELECT * FROM customer_orders
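
Because this model uses a LEFT JOIN, customers with no orders still produce a row, and aggregates over their missing orders behave differently: `COUNT` returns 0 while `SUM` returns NULL. The sketch below reproduces that behavior with Python's built-in `sqlite3` as a stand-in for Snowflake. The table contents and the function name `aggregate_customer_orders` are made up for illustration.

```python
import sqlite3


def aggregate_customer_orders():
    """Run a LEFT JOIN aggregate and return one row per customer."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE customers (customer_id TEXT, company_name TEXT);
        CREATE TABLE orders (order_id TEXT, customer_id TEXT, total_amount REAL);
        INSERT INTO customers VALUES ('c1', 'Acme'), ('c2', 'Globex');
        INSERT INTO orders VALUES ('o1', 'c1', 100.0), ('o2', 'c1', 50.0);
    """)
    rows = conn.execute("""
        SELECT c.customer_id,
               COUNT(DISTINCT o.order_id)       AS total_orders,
               SUM(o.total_amount)              AS raw_revenue,
               COALESCE(SUM(o.total_amount), 0) AS safe_revenue
        FROM customers c
        LEFT JOIN orders o ON c.customer_id = o.customer_id
        GROUP BY c.customer_id
        ORDER BY c.customer_id
    """).fetchall()
    conn.close()
    return rows


for row in aggregate_customer_orders():
    print(row)
```

The customer without orders comes back with `total_orders = 0`, a NULL `raw_revenue`, and `safe_revenue = 0`. This matters downstream wherever a `not_null` or range test is applied to a revenue column.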

-- dbt/models/marts/fct_customer_lifetime_value.sql
WITH customer_orders AS (
    SELECT * FROM {{ ref('int_customer_orders') }}
),

clv_calculations AS (
    SELECT
        *,
        CASE
            WHEN total_orders = 0 THEN 'Prospect'
            WHEN total_orders = 1 THEN 'One-Time'
            WHEN total_orders BETWEEN 2 AND 5 THEN 'Returning'
            WHEN total_orders BETWEEN 6 AND 10 THEN 'Loyal'
            ELSE 'Champion'
        END AS customer_segment,
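        CASE
            -- Customers with no orders have a NULL last_order_date; label them explicitly
            -- instead of letting them fall through to 'Churned'.
            WHEN last_order_date IS NULL THEN 'No Orders'
            WHEN DATEDIFF('day', last_order_date, CURRENT_DATE()) <= 30 THEN 'Active'
            WHEN DATEDIFF('day', last_order_date, CURRENT_DATE()) <= 90 THEN 'At Risk'
            WHEN DATEDIFF('day', last_order_date, CURRENT_DATE()) <= 180 THEN 'Lapsing'
            ELSE 'Churned'
        END AS activity_status,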
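        -- Annualize observed revenue (revenue per day * 365); fall back to total revenue
        -- when tenure is 0 days or unknown.
        COALESCE(
            total_revenue / NULLIF(customer_tenure_days, 0) * 365,
            total_revenue
        ) AS projected_annual_value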
    FROM customer_orders
)

SELECT
    {{ dbt_utils.generate_surrogate_key(['customer_id']) }} AS customer_key,
    *,
    CURRENT_DATE() AS calculated_at
FROM clv_calculations
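
The segment and activity rules in this model are plain threshold ladders, which makes them easy to unit test outside the warehouse. Below is a small Python mirror of the two `CASE` expressions, intended only as a sketch for checking boundary values. The function names are hypothetical, and `days_since_last_order` is assumed to be already computed and non-null.

```python
def customer_segment(total_orders: int) -> str:
    """Mirror the customer_segment CASE expression."""
    if total_orders == 0:
        return "Prospect"
    if total_orders == 1:
        return "One-Time"
    if 2 <= total_orders <= 5:
        return "Returning"
    if 6 <= total_orders <= 10:
        return "Loyal"
    return "Champion"


def activity_status(days_since_last_order: int) -> str:
    """Mirror the activity_status CASE expression for a known last order date."""
    if days_since_last_order <= 30:
        return "Active"
    if days_since_last_order <= 90:
        return "At Risk"
    if days_since_last_order <= 180:
        return "Lapsing"
    return "Churned"


# Boundary checks: each threshold value belongs to the lower bucket.
for orders in (0, 1, 5, 6, 10, 11):
    print(orders, customer_segment(orders))
for days in (30, 31, 90, 91, 180, 181):
    print(days, activity_status(days))
```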

-- dbt/models/marts/dim_customers.sql
WITH customer_orders AS (
    SELECT * FROM {{ ref('int_customer_orders') }}
),

final AS (
    SELECT
        {{ dbt_utils.generate_surrogate_key(['customer_id']) }} AS customer_key,
        customer_id,
        company_name,
        industry,
        annual_revenue,
        total_orders,
        total_revenue,
        avg_order_value,
        first_order_date,
        last_order_date,
        customer_tenure_days,
        CASE
            WHEN annual_revenue > 1000000 THEN 'Enterprise'
            WHEN annual_revenue > 100000 THEN 'Mid-Market'
            ELSE 'SMB'
        END AS company_size,
        {{ dbt_utils.generate_surrogate_key(['industry']) }} AS industry_key,
        CURRENT_DATE() AS valid_from,
        CAST(NULL AS DATE) AS valid_to,
        TRUE AS is_current
    FROM customer_orders
)

SELECT * FROM final

-- dbt/models/marts/fct_orders.sql
WITH orders AS (
    SELECT * FROM {{ ref('stg_orders') }}
),

customers AS (
    SELECT * FROM {{ ref('dim_customers') }}
),

order_facts AS (
    SELECT
        {{ dbt_utils.generate_surrogate_key(['o.order_id']) }} AS order_key,
        o.order_id,
        o.customer_id,
        c.customer_key,
        c.company_name,
        c.industry,
        o.total_amount,
        o.currency,
        o.payment_status,
        o.created_at AS order_date,
        DATEDIFF('day', o.created_at, CURRENT_DATE()) AS order_age_days,
        CASE
            WHEN o.total_amount > 10000 THEN 'High Value'
            WHEN o.total_amount > 1000 THEN 'Medium Value'
            ELSE 'Low Value'
        END AS order_value_tier,
        {{ dbt_utils.generate_surrogate_key(['c.industry']) }} AS industry_key,
        CURRENT_DATE() AS calculated_at
    FROM orders o
    LEFT JOIN customers c ON o.customer_id = c.customer_id
)

SELECT * FROM order_facts
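
The mart models rely on `dbt_utils.generate_surrogate_key` for their keys. As far as I understand the macro, it casts each field to a string, replaces NULLs with a placeholder, joins the parts with `-`, and hashes the result with MD5. The sketch below follows that description in Python. The placeholder constant and the function name are assumptions for illustration, and Python's `str()` will not always match a warehouse's string cast (for example for floats or timestamps), so treat it as a mental model rather than a parity check.

```python
import hashlib
from typing import Any

# Assumed NULL placeholder; check the dbt_utils version in use before relying on it.
NULL_PLACEHOLDER = "_dbt_utils_surrogate_key_null_"


def surrogate_key(*values: Any) -> str:
    """Hash the string form of each value, joined by '-', with MD5."""
    parts = [NULL_PLACEHOLDER if v is None else str(v) for v in values]
    return hashlib.md5("-".join(parts).encode("utf-8")).hexdigest()


print(surrogate_key("0015g00000ABCDE"))   # single-column key
print(surrogate_key("acme", None))        # NULLs still produce a stable key
```

The key is deterministic, so the same `customer_id` yields the same `customer_key` in `dim_customers` and `fct_orders`, which is what lets the two tables join on it.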

# dbt/models/marts/schema.yml
version: 2

models:
  - name: dim_customers
    description: "Dimension table for customers with lifetime metrics"
    columns:
      - name: customer_key
        description: "Surrogate key"
        tests:
          - unique
          - not_null
      - name: customer_id
        description: "Source customer ID"
        tests:
          - unique
          - not_null
      - name: company_name
        description: "Company name"
        tests:
          - not_null
      - name: total_orders
        description: "Total number of orders"
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
      - name: total_revenue
        description: "Total revenue from customer"
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0

  - name: fct_orders
    description: "Fact table for orders with customer dimensions"
    columns:
      - name: order_key
        description: "Surrogate key"
        tests:
          - unique
          - not_null
      - name: order_id
        description: "Source order ID"
        tests:
          - unique
          - not_null
      - name: total_amount
        description: "Order total amount"
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0

Infrastructure Setup (Terraform)

# infrastructure/modern_data_stack.tf
terraform {
  required_version = ">= 1.5.0"
  required_providers {
    snowflake = {
      source  = "Snowflake-Labs/snowflake"
      version = "~> 0.89"
    }
  }
}

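variable "fivetran_api_key" {
  type      = string
  sensitive = true
}

variable "fivetran_api_secret" {
  type      = string
  sensitive = true
}

# The provider block below references these, so they must be declared.
variable "snowflake_account" {
  type = string
}

variable "snowflake_user" {
  type = string
}

variable "snowflake_password" {
  type      = string
  sensitive = true
}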

provider "snowflake" {
  account  = var.snowflake_account
  user     = var.snowflake_user
  password = var.snowflake_password
  role     = "ACCOUNTADMIN"
}

resource "snowflake_database" "analytics" {
  name    = "ANALYTICS"
  comment = "Main analytics database"
}

resource "snowflake_warehouse" "dbt_transforms" {
  name           = "DBT_TRANSFORMS"
  comment        = "Warehouse for dbt transformations"
  warehouse_size = "medium"
  auto_suspend   = 300
  auto_resume    = true
  scaling_policy = "ECONOMY"
}

resource "snowflake_warehouse" "looker_queries" {
  name           = "LOOKER_QUERIES"
  comment        = "Warehouse for Looker analytics"
  warehouse_size = "large"
  auto_suspend   = 60
  auto_resume    = true
}

resource "snowflake_schema" "raw" {
  database = snowflake_database.analytics.name
  name     = "RAW"
  comment  = "Raw data from Fivetran"
}

resource "snowflake_schema" "staging" {
  database = snowflake_database.analytics.name
  name     = "STAGING"
  comment  = "Staged and cleaned data"
}

resource "snowflake_schema" "analytics_schema" {
  database = snowflake_database.analytics.name
  name     = "ANALYTICS"
  comment  = "Analytics-ready data marts"
}

resource "snowflake_role" "dbt_role" {
  name    = "DBT_ROLE"
  comment = "Role for dbt transformations"
}

resource "snowflake_role" "looker_role" {
  name    = "LOOKER_ROLE"
  comment = "Role for Looker BI"
}

# Schema-level privileges are granted with on_schema, using a fully qualified schema name.
# INSERT, UPDATE, and DELETE are table-level privileges, so they are not listed here.
# Verify block and attribute names against the provider version you pin.
resource "snowflake_grant_privileges_to_role" "dbt_raw" {
  role_name  = snowflake_role.dbt_role.name
  privileges = ["USAGE", "CREATE TABLE", "CREATE VIEW"]
  on_schema {
    schema_name = "\"${snowflake_database.analytics.name}\".\"${snowflake_schema.raw.name}\""
  }
}

resource "snowflake_grant_privileges_to_role" "dbt_staging" {
  role_name  = snowflake_role.dbt_role.name
  privileges = ["USAGE", "CREATE TABLE", "CREATE VIEW"]
  on_schema {
    schema_name = "\"${snowflake_database.analytics.name}\".\"${snowflake_schema.staging.name}\""
  }
}

resource "snowflake_grant_privileges_to_role" "dbt_analytics" {
  role_name  = snowflake_role.dbt_role.name
  privileges = ["USAGE", "CREATE TABLE", "CREATE VIEW"]
  on_schema {
    schema_name = "\"${snowflake_database.analytics.name}\".\"${snowflake_schema.analytics_schema.name}\""
  }
}

resource "snowflake_grant_privileges_to_role" "looker_analytics" {
  role_name  = snowflake_role.looker_role.name
  privileges = ["USAGE"]
  on_schema {
    schema_name = "\"${snowflake_database.analytics.name}\".\"${snowflake_schema.analytics_schema.name}\""
  }
}

# SELECT is a table-level privilege, so it is granted on the tables inside the schema.
resource "snowflake_grant_privileges_to_role" "looker_select" {
  role_name  = snowflake_role.looker_role.name
  privileges = ["SELECT"]
  on_schema_object {
    all {
      object_type_plural = "TABLES"
      in_schema          = "\"${snowflake_database.analytics.name}\".\"${snowflake_schema.analytics_schema.name}\""
    }
  }
}

resource "snowflake_grant_account_role" "dbt_to_user" {
  role_name        = snowflake_role.dbt_role.name
  parent_role_name = "SYSADMIN"
}

resource "snowflake_grant_account_role" "looker_to_user" {
  role_name        = snowflake_role.looker_role.name
  parent_role_name = "SYSADMIN"
}

output "snowflake_database" {
  value = snowflake_database.analytics.name
}

output "snowflake_warehouse_dbt" {
  value = snowflake_warehouse.dbt_transforms.name
}

output "snowflake_warehouse_looker" {
  value = snowflake_warehouse.looker_queries.name
}

Testing and Validation

# tests/test_modern_stack.py
import pytest
from datetime import datetime

class TestModernDataStack:
    @pytest.fixture
    def fivetran_config(self):
        return {
            "service": "postgres",
            "host": "source-db.internal",
            "database": "production",
            "schema": "public",
            "sync_method": "cdc"
        }

    def test_fivetran_connector_config(self, fivetran_config):
        required_fields = ["service", "host", "database", "schema"]
        for field in required_fields:
            assert field in fivetran_config

    def test_dbt_model_naming(self):
        valid_names = [
            "stg_customers", "stg_orders",
            "int_customer_orders",
            "fct_orders", "dim_customers"
        ]
        for name in valid_names:
            # str.startswith accepts a tuple of allowed prefixes
            assert name.startswith(("stg_", "int_", "fct_", "dim_"))

    def test_snowflake_schema_layering(self):
        layers = ["raw", "staging", "analytics"]
        assert len(layers) == 3
        assert "raw" in layers
        assert "staging" in layers
        assert "analytics" in layers

    def test_dbt_test_coverage(self):
        tests = {
            "dim_customers": {"unique": ["customer_key", "customer_id"], "not_null": ["customer_key", "customer_id", "company_name"]},
            "fct_orders": {"unique": ["order_key", "order_id"], "not_null": ["order_key", "order_id", "total_amount"]}
        }
        for model, model_tests in tests.items():
            assert len(model_tests) > 0
            for test_type, columns in model_tests.items():
                assert len(columns) > 0

    def test_warehouse_auto_suspend(self):
        warehouses = [
            {"name": "DBT_TRANSFORMS", "auto_suspend": 300},
            {"name": "LOOKER_QUERIES", "auto_suspend": 60}
        ]
        for wh in warehouses:
            assert wh["auto_suspend"] > 0

    def test_incremental_model_strategy(self):
        strategies = ["append", "merge", "delete+insert"]
        assert "merge" in strategies

    def test_dbt_surrogate_key_generation(self):
        # Surrogate key columns are named after the source ID column they hash
        fields = ["customer_id"]
        key = fields[0].replace("_id", "_key")
        assert key == "customer_key"

    def test_looker_explores_mapping(self):
        explores = {
            "orders": {"base_table": "fct_orders", "joins": ["dim_customers"]},
            "customers": {"base_table": "dim_customers", "joins": ["fct_orders"]}
        }
        for explore, config in explores.items():
            assert "base_table" in config
            assert "joins" in config

Cost Analysis

Monthly Cost Breakdown (Production)

Component | Specification | Monthly Cost
Fivetran | 100K MAR, Standard plan | $500
Snowflake | Medium warehouse, 100 hrs | $800
dbt Cloud | Team plan | $100
Looker | 10 users | $3,000
Git (GitHub) | Team plan | $20
Total | | $4,420
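
The Snowflake line can be sanity-checked from credit consumption. Snowflake warehouses consume credits per hour according to size (1 for X-Small, doubling with each step up), so a Medium warehouse uses 4 credits per hour. The sketch below reproduces the table's arithmetic. The price of $2.00 per credit is an assumption chosen because it matches the $800 figure; actual pricing depends on edition, region, and contract.

```python
# Credits consumed per hour by warehouse size (doubles with each size step).
CREDITS_PER_HOUR = {"XSMALL": 1, "SMALL": 2, "MEDIUM": 4, "LARGE": 8, "XLARGE": 16}


def snowflake_monthly_cost(size: str, hours: float, price_per_credit: float) -> float:
    """Estimate compute cost as credits/hour * hours * price per credit."""
    return CREDITS_PER_HOUR[size.upper()] * hours * price_per_credit


# Monthly figures from the table above, in USD.
MONTHLY_COSTS = {
    "Fivetran": 500,
    "Snowflake": snowflake_monthly_cost("medium", hours=100, price_per_credit=2.00),
    "dbt Cloud": 100,
    "Looker": 3000,
    "GitHub": 20,
}

total = sum(MONTHLY_COSTS.values())
print(f"Snowflake: ${MONTHLY_COSTS['Snowflake']:,.0f}  Total: ${total:,.0f}")
```

Note that the Terraform configuration also defines a Large warehouse for Looker at 8 credits per hour, which this estimate does not include.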

Cost Optimization Strategies

Tip: Optimize modern data stack costs:

  1. Fivetran MAR: Monitor monthly active rows, use incremental syncs
  2. Snowflake Credits: Use auto-suspend, right-size warehouses
  3. dbt Models: Use ephemeral models where possible
  4. Looker Caching: Enable persistent derived tables
  5. Selective Sync: Only sync needed columns and tables
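
To make the auto-suspend tip concrete: Snowflake bills warehouse compute per second, with a 60-second minimum each time a warehouse starts or resumes. A warehouse keeps running (and billing) for `auto_suspend` seconds after its last query finishes. The sketch below estimates billed seconds for a set of query intervals under that simplified model. The function name and the sample workload are hypothetical, and the model ignores multi-cluster scaling and resume latency.

```python
from typing import List, Tuple


def billed_seconds(queries: List[Tuple[int, int]], auto_suspend: int) -> int:
    """Estimate billed seconds for (start, end) query intervals, in seconds.

    The warehouse resumes at a query start and suspends auto_suspend seconds
    after the last query ends, unless another query arrives first. Each
    running period is billed for at least 60 seconds.
    """
    total = 0
    run_start = run_end = None
    for start, end in sorted(queries):
        if run_start is None:
            run_start, run_end = start, end + auto_suspend
        elif start <= run_end:
            # Query arrives while the warehouse is still up: extend the run.
            run_end = max(run_end, end + auto_suspend)
        else:
            total += max(run_end - run_start, 60)
            run_start, run_end = start, end + auto_suspend
    if run_start is not None:
        total += max(run_end - run_start, 60)
    return total


# Two 10-second queries, 1000 seconds apart.
workload = [(0, 10), (1000, 1010)]
for setting in (60, 300, 1200):
    print(f"auto_suspend={setting}: {billed_seconds(workload, setting)} billed seconds")
```

For this sparse workload, a shorter auto-suspend bills far fewer seconds. For bursty BI traffic the trade-off differs, since frequent suspends also drop the warehouse's local cache.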

ROI Analysis

Metric | Before Modern Stack | After Modern Stack | Improvement
Time to Insight | 2 weeks | Same day | 10x faster
Data Freshness | 24 hours | 1-5 hours | 5-24x faster
Analyst Productivity | Manual SQL | Self-service BI | 3x more productive
Data Team Velocity | 1 model/week | 10 models/week | 10x faster

Interview Talking Points

ℹ️

Best Practice: Focus on these modern data stack concepts in interviews:

  1. Why ELT over ETL?

    • Leverage warehouse compute power
    • Raw data preserved for reprocessing
    • Faster development cycle
    • Better data governance
  2. Why dbt for transformations?

    • Version control for SQL
    • Testing built-in
    • Documentation auto-generated
    • Modular, reusable code
  3. Why separate ingestion and transformation tools?

    • Best-of-breed for each concern
    • Independent scaling
    • Easier maintenance
    • Vendor flexibility
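
The "raw data preserved for reprocessing" point is easiest to explain with a small demonstration. The sketch below uses Python's built-in `sqlite3` as a stand-in warehouse: rows are loaded untouched into a raw table, the transformation is a view on top, and changing the business logic only requires redefining the view, not re-extracting from the source. Table names, sample rows, and the function name `run_elt` are made up for illustration.

```python
import sqlite3
from typing import Tuple


def run_elt() -> Tuple[int, int]:
    """Load raw rows once, then apply two versions of a transformation."""
    conn = sqlite3.connect(":memory:")

    # Extract + Load: land source rows as-is, including the failed payment.
    conn.execute("CREATE TABLE raw_payments (id TEXT, amount INTEGER, status TEXT)")
    conn.executemany(
        "INSERT INTO raw_payments VALUES (?, ?, ?)",
        [("p1", 1250, "succeeded"), ("p2", 900, "failed"), ("p3", 4000, "succeeded")],
    )

    # Transform v1: exclude failed payments and convert cents to currency units.
    conn.execute("""
        CREATE VIEW stg_orders AS
        SELECT id AS order_id, amount / 100.0 AS total_amount
        FROM raw_payments
        WHERE status != 'failed'
    """)
    v1_count = conn.execute("SELECT COUNT(*) FROM stg_orders").fetchone()[0]

    # Transform v2: the business now wants failed payments kept and flagged.
    # Only the view changes; the raw table is never reloaded.
    conn.execute("DROP VIEW stg_orders")
    conn.execute("""
        CREATE VIEW stg_orders AS
        SELECT id AS order_id, amount / 100.0 AS total_amount, status AS payment_status
        FROM raw_payments
    """)
    v2_count = conn.execute("SELECT COUNT(*) FROM stg_orders").fetchone()[0]

    conn.close()
    return v1_count, v2_count


print(run_elt())
```

In a classic ETL pipeline the failed payment would have been dropped before loading, so the second requirement would mean going back to the source system.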

This project demonstrates modern data stack skills and is highly relevant for analytics engineering interviews.
