Modern ELT Stack
Fivetran + dbt + Snowflake + Looker for Analytics Engineering
ℹ️
Project Difficulty: Advanced | Duration: 3–4 weeks | Cloud: Snowflake/GCP
Build a modern data stack with automated ingestion, transformation, and analytics using industry-standard tools.
Project Overview
Problem Statement
Organizations struggle with fragmented data, slow analytics, and inability to make data-driven decisions. The modern data stack solves this by separating ingestion, transformation, and presentation into specialized, best-of-breed tools.
Objectives
- Automate data ingestion from 50+ sources with Fivetran
- Implement dbt for version-controlled transformations
- Build a scalable Snowflake data warehouse
- Create self-service analytics with Looker
- Establish data governance and quality checks
Tech Stack
| Component | Technology | Purpose |
|---|---|---|
| Ingestion | Fivetran | Automated data loading |
| Transformation | dbt | Analytics engineering |
| Warehouse | Snowflake | Cloud data warehouse |
| BI & Analytics | Looker | Business intelligence |
| Orchestration | Airflow / dbt Cloud | Pipeline scheduling |
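Architecture Diagram
Data flows from the sources through Fivetran into Snowflake (RAW), is transformed by dbt (STAGING → ANALYTICS), and is served to users through Looker, with Airflow or dbt Cloud scheduling the runs.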
Step-by-Step Implementation Guide
Step 1: Fivetran Connector Setup
# fivetran/connectors.py
import requests
import hashlib
import hmac
import base64
import json
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import logging
# Configure root logging at import time so connector activity is visible.
# `level` must be passed as a keyword; a bare `level.INFO` is a NameError.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class FivetranManager:
    """Client for provisioning and monitoring Fivetran connectors over REST.

    Every call authenticates with HTTP Basic auth built from ``api_key`` and
    ``api_secret``. When Fivetran answers with an unexpected status code a
    plain ``Exception`` is raised whose message is ``"<action>: <body>"``.

    NOTE(review): the per-service ``config`` keys used by the ``setup_*``
    helpers should be checked against Fivetran's connector config reference.
    """

    def __init__(self, api_key: str, api_secret: str,
                 base_url: str = "https://api.fivetran.com/v1",
                 timeout: float = 30.0):
        """Store credentials and pre-compute the request headers.

        Args:
            api_key: Fivetran API key.
            api_secret: Fivetran API secret.
            base_url: API root; endpoint paths are appended to it.
            timeout: Seconds to wait for each HTTP request. Without a timeout
                ``requests`` can block forever on a stalled connection.
        """
        self.api_key = api_key
        self.api_secret = api_secret
        self.base_url = base_url
        self.timeout = timeout
        self.headers = self._generate_headers()

    def _generate_headers(self) -> Dict:
        """Return the Basic-auth and JSON content-type headers for every call."""
        # Only Basic auth is sent. The HMAC signature previously computed here
        # was never placed in any header, so it has been dropped.
        credentials = f"{self.api_key}:{self.api_secret}".encode()
        token = base64.b64encode(credentials).decode()
        return {
            "Authorization": f"Basic {token}",
            "Content-Type": "application/json",
        }

    def _request(self, method: str, path: str, expected_status: int,
                 error_prefix: str, **kwargs) -> Dict:
        """Send one API call and return the ``data`` member of the JSON body.

        Args:
            method: HTTP verb, e.g. ``"GET"``.
            path: Endpoint path appended to ``base_url``.
            expected_status: The only status code treated as success.
            error_prefix: Start of the exception message on failure.
            **kwargs: Forwarded to ``requests.request`` (``json``, ``params``).

        Raises:
            Exception: ``"<error_prefix>: <response text>"`` on any other status.
        """
        response = requests.request(
            method,
            f"{self.base_url}{path}",
            headers=self.headers,
            timeout=self.timeout,
            **kwargs,
        )
        if response.status_code != expected_status:
            raise Exception(f"{error_prefix}: {response.text}")
        return response.json()["data"]

    @staticmethod
    def _days_ago(days: int) -> str:
        """Return the local date ``days`` days before now as ``YYYY-MM-DD``."""
        return (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")

    def create_connector(self, service: str, group_id: str,
                         config: Optional[Dict] = None) -> Dict:
        """Create an unpaused connector and return Fivetran's payload for it.

        Args:
            service: Fivetran service name, e.g. ``"postgres"``.
            group_id: Group (destination) the connector is created in.
            config: Service-specific settings; ``None`` sends ``{}``.

        Raises:
            Exception: if the API does not answer 201 Created.
        """
        payload = {
            "service": service,
            "group_id": group_id,
            "paused": False,
            "pause_after_trial": False,
            "config": config or {},
        }
        connector_data = self._request(
            "POST", "/connectors", 201, "Failed to create connector", json=payload
        )
        logger.info("Created connector: %s", connector_data["id"])
        return connector_data

    def setup_postgres_connector(self, group_id: str, host: str,
                                 database: str, user: str, password: str,
                                 schema: str, port: int = 5432) -> Dict:
        """Create a Postgres connector configured for CDC replication."""
        config = {
            "host": host,
            "port": port,
            "database": database,
            "user": user,
            "password": password,
            "schema": schema,
            "sync_method": "cdc",
            "replication_method": "primary_key",
            "connection_method": "directly",
        }
        return self.create_connector("postgres", group_id, config)

    def setup_mysql_connector(self, group_id: str, host: str,
                              database: str, user: str, password: str,
                              port: int = 3306) -> Dict:
        """Create a MySQL connector configured for CDC with row-based binlogs."""
        config = {
            "host": host,
            "port": port,
            "database": database,
            "user": user,
            "password": password,
            "sync_method": "cdc",
            "binary_format": "ROW",
        }
        return self.create_connector("mysql", group_id, config)

    def setup_s3_connector(self, group_id: str, bucket: str,
                           prefix: str = "", role_arn: Optional[str] = None) -> Dict:
        """Create an S3 connector; a missing ``role_arn`` is sent as ``""``."""
        config = {
            "bucket": bucket,
            "prefix": prefix,
            "role_arn": role_arn or "",
            "file_type": "auto",
            "delimiter": ",",
        }
        return self.create_connector("s3", group_id, config)

    def setup_salesforce_connector(self, group_id: str,
                                   credentials: Dict) -> Dict:
        """Create a Salesforce connector that backfills the last 30 days."""
        config = {
            "credentials": credentials,
            "schemas": ["standard", "custom"],
            "start_date": self._days_ago(30),
        }
        return self.create_connector("salesforce", group_id, config)

    def setup_hubspot_connector(self, group_id: str, api_key: str) -> Dict:
        """Create a HubSpot connector that backfills the last 90 days."""
        config = {
            "api_key": api_key,
            "enable_historical": True,
            "start_date": self._days_ago(90),
        }
        return self.create_connector("hubspot", group_id, config)

    def setup_stripe_connector(self, group_id: str, secret_key: str) -> Dict:
        """Create a Stripe connector that backfills the last 90 days."""
        config = {
            "secret_key": secret_key,
            "start_date": self._days_ago(90),
        }
        return self.create_connector("stripe", group_id, config)

    def get_connector_sync_status(self, connector_id: str) -> Dict:
        """Return the status payload for one connector.

        NOTE(review): confirm the ``/connectors/{id}/status`` endpoint against
        the current Fivetran REST API reference.
        """
        return self._request(
            "GET", f"/connectors/{connector_id}/status", 200,
            "Failed to get sync status",
        )

    def trigger_connector_sync(self, connector_id: str) -> Dict:
        """Ask Fivetran to start a sync for ``connector_id`` now.

        NOTE(review): confirm the ``/connectors/{id}/force`` endpoint against
        the current Fivetran REST API reference.
        """
        data = self._request(
            "POST", f"/connectors/{connector_id}/force", 200,
            "Failed to trigger sync",
        )
        logger.info("Triggered sync for connector: %s", connector_id)
        return data

    def get_connector_log(self, connector_id: str, limit: int = 100) -> List[Dict]:
        """Return up to ``limit`` log entries for ``connector_id``."""
        return self._request(
            "GET", f"/connectors/{connector_id}/logs", 200,
            "Failed to get logs", params={"limit": limit},
        )

    def monitor_all_connectors(self, page_size: int = 100) -> Dict:
        """Fetch every connector (following pagination) and summarize health.

        Args:
            page_size: Items requested per page.

        Returns:
            Counts of total/active/paused/failing connectors plus a per-connector
            list of ``id``, ``service``, ``status`` and ``last_sync``.

        Raises:
            Exception: if any page request does not answer 200.
        """
        connectors: List[Dict] = []
        cursor: Optional[str] = None
        while True:
            params: Dict = {"limit": page_size}
            if cursor:
                params["cursor"] = cursor
            page = self._request(
                "GET", "/connectors", 200, "Failed to get connectors", params=params
            )
            connectors.extend(page.get("items") or [])
            # List endpoints are cursor-paginated. Reading only the first page
            # would silently undercount large accounts.
            cursor = page.get("next_cursor")
            if not cursor:
                break
        return self._summarize_connectors(connectors)

    @staticmethod
    def _summarize_connectors(connectors: List[Dict]) -> Dict:
        """Aggregate raw connector objects into the monitoring summary dict."""
        def status_of(connector: Dict) -> Dict:
            # The API may send ``"status": null``; treat that like a missing status.
            return connector.get("status") or {}

        return {
            "total_connectors": len(connectors),
            "active": sum(
                1 for c in connectors if status_of(c).get("setup_state") == "connected"
            ),
            "paused": sum(1 for c in connectors if c.get("paused")),
            # A "broken" setup_state marks a connector that cannot sync. The
            # sync_state == "error" check is retained alongside it.
            "failing": sum(
                1 for c in connectors
                if status_of(c).get("setup_state") == "broken"
                or status_of(c).get("sync_state") == "error"
            ),
            "connectors": [
                {
                    # Connector objects carry their id under "id", as
                    # create_connector also assumes.
                    "id": c.get("id"),
                    "service": c.get("service"),
                    "status": status_of(c).get("sync_state"),
                    "last_sync": status_of(c).get("last_sync"),
                }
                for c in connectors
            ],
        }
Step 2: dbt Transformations
# dbt/dbt_project.yml
name: 'analytics_engineering'
version: '1.0.0'
config-version: 2

# Must match a profile name in profiles.yml.
profile: 'snowflake'

model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["data"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]
target-path: "target"
clean-targets:
  - "target"
  - "dbt_packages"

models:
  analytics_engineering:
    # NOTE: with dbt's default generate_schema_name macro, a custom +schema is
    # appended to the target schema (e.g. <target_schema>_staging) rather than
    # used verbatim. Override generate_schema_name if models must land in the
    # STAGING / ANALYTICS schemas created by Terraform.
    staging:
      +materialized: view
      +schema: staging
    intermediate:
      # Ephemeral models are inlined as CTEs into the models that ref them.
      +materialized: ephemeral
    marts:
      +materialized: table
      +schema: analytics

seeds:
  analytics_engineering:
    +schema: seeds

vars:
  # Not referenced by the models defined in this project write-up.
  start_date: '2024-01-01'

on-run-start:
  - "{{ log('Starting dbt run for analytics_engineering', info=True) }}"
on-run-end:
  - "{{ log('Completed dbt run for analytics_engineering', info=True) }}"
-- dbt/models/staging/stg_customers.sql
-- Staging model for customer accounts: renames raw Salesforce account columns
-- to project naming conventions and drops rows without a primary key.
-- NOTE(review): no soft-delete flag is filtered here; confirm whether deleted
-- accounts should be excluded at this layer.
WITH raw_accounts AS (
    SELECT *
    FROM {{ source('fivetran_raw', 'salesforce__accounts') }}
),

standardized AS (
    SELECT
        -- identity
        acct.id AS customer_id,
        acct.name AS company_name,
        -- firmographics
        acct.industry,
        acct.annual_revenue,
        acct.number_of_employees,
        -- billing address
        acct.billing_street AS billing_address,
        acct.billing_city,
        acct.billing_state,
        acct.billing_country,
        -- ownership and audit columns
        acct.owner_id AS sales_rep_id,
        acct.created_date AS created_at,
        acct.last_modified_date AS updated_at,
        acct._fivetran_synced AS _loaded_at
    FROM raw_accounts AS acct
    WHERE acct.id IS NOT NULL
)

SELECT * FROM standardized
-- dbt/models/staging/stg_orders.sql
-- Staging model for orders built from Stripe payments: converts Stripe's
-- smallest-unit amounts into major currency units and drops failed payments.
-- NOTE(review): total_amount stays in each payment's own currency; downstream
-- sums across customers mix currencies unless converted first.
WITH source AS (
    SELECT * FROM {{ source('fivetran_raw', 'stripe__payments') }}
),

renamed AS (
    SELECT
        id AS order_id,
        customer_id,
        -- Raw amount in the currency's smallest unit (cents for USD, whole
        -- yen for JPY). The column name is kept for downstream compatibility.
        amount AS total_amount_cents,
        -- Stripe's zero-decimal currencies are already expressed in major
        -- units. Dividing them by 100 would understate revenue 100x.
        CASE
            WHEN LOWER(currency) IN (
                'bif', 'clp', 'djf', 'gnf', 'jpy', 'kmf', 'krw', 'mga',
                'pyg', 'rwf', 'ugx', 'vnd', 'vuv', 'xaf', 'xof', 'xpf'
            ) THEN amount
            ELSE amount / 100.0
        END AS total_amount,
        currency,
        status AS payment_status,
        created AS created_at,
        _fivetran_synced AS _loaded_at
    FROM source
    WHERE id IS NOT NULL
      -- Note: rows whose status is NULL are also excluded by this comparison.
      AND status != 'failed'
)

SELECT * FROM renamed
-- dbt/models/intermediate/int_customer_orders.sql
-- One row per customer with order count, revenue and first/last-order metrics.
-- The LEFT JOIN keeps customers with no orders (total_orders = 0).
-- NOTE(review): customer_id comes from Salesforce account ids in stg_customers
-- but from Stripe's customer_id in stg_orders. Unless those share an id space
-- (or a mapping is applied upstream), this join will likely match nothing.
-- Confirm the key mapping.
WITH customers AS (
    SELECT * FROM {{ ref('stg_customers') }}
),

orders AS (
    SELECT * FROM {{ ref('stg_orders') }}
),

customer_orders AS (
    SELECT
        c.customer_id,
        c.company_name,
        c.industry,
        c.annual_revenue,
        COUNT(DISTINCT o.order_id) AS total_orders,
        -- SUM over zero joined rows is NULL. Report 0 so customers without
        -- orders satisfy the not_null test on dim_customers.total_revenue.
        COALESCE(SUM(o.total_amount), 0) AS total_revenue,
        -- Stays NULL for customers without orders: no average exists.
        AVG(o.total_amount) AS avg_order_value,
        MIN(o.created_at) AS first_order_date,
        MAX(o.created_at) AS last_order_date,
        -- 0 for single-order customers, NULL for customers without orders.
        DATEDIFF('day', MIN(o.created_at), MAX(o.created_at)) AS customer_tenure_days
    FROM customers AS c
    LEFT JOIN orders AS o
        ON c.customer_id = o.customer_id
    GROUP BY 1, 2, 3, 4
)

SELECT * FROM customer_orders
-- dbt/models/marts/fct_customer_lifetime_value.sql
-- Customer-level CLV mart: segments customers by order count and recency and
-- projects an annualized revenue run-rate.
WITH customer_orders AS (
    SELECT * FROM {{ ref('int_customer_orders') }}
),

clv_calculations AS (
    SELECT
        *,
        CASE
            WHEN total_orders = 0 THEN 'Prospect'
            WHEN total_orders = 1 THEN 'One-Time'
            WHEN total_orders BETWEEN 2 AND 5 THEN 'Returning'
            WHEN total_orders BETWEEN 6 AND 10 THEN 'Loyal'
            ELSE 'Champion'
        END AS customer_segment,
        CASE
            -- Customers with no orders have a NULL last_order_date. Without
            -- this branch the NULL DATEDIFF would fall through to 'Churned'.
            WHEN last_order_date IS NULL THEN 'Prospect'
            WHEN DATEDIFF('day', last_order_date, CURRENT_DATE()) <= 30 THEN 'Active'
            WHEN DATEDIFF('day', last_order_date, CURRENT_DATE()) <= 90 THEN 'At Risk'
            WHEN DATEDIFF('day', last_order_date, CURRENT_DATE()) <= 180 THEN 'Lapsing'
            ELSE 'Churned'
        END AS activity_status,
        -- Annualized run-rate = revenue per day of tenure x 365. (Multiplying
        -- total_revenue by orders-per-year instead would scale value by order
        -- frequency and grossly overstate frequent buyers.) Falls back to
        -- total_revenue when tenure is 0 (single order) or NULL.
        COALESCE(
            total_revenue / NULLIF(customer_tenure_days, 0) * 365,
            total_revenue
        ) AS projected_annual_value
    FROM customer_orders
)

SELECT
    {{ dbt_utils.generate_surrogate_key(['customer_id']) }} AS customer_key,
    *,
    CURRENT_DATE() AS calculated_at
FROM clv_calculations
-- dbt/models/marts/dim_customers.sql
-- Customer dimension: one row per customer with lifetime order metrics,
-- a revenue-based size band and SCD-style validity columns.
WITH customer_orders AS (
    SELECT * FROM {{ ref('int_customer_orders') }}
),

final AS (
    SELECT
        {{ dbt_utils.generate_surrogate_key(['customer_id']) }} AS customer_key,
        customer_id,
        company_name,
        industry,
        annual_revenue,
        total_orders,
        total_revenue,
        avg_order_value,
        first_order_date,
        last_order_date,
        customer_tenure_days,
        CASE
            -- Missing revenue is reported as unknown rather than defaulted to SMB.
            WHEN annual_revenue IS NULL THEN 'Unknown'
            WHEN annual_revenue > 1000000 THEN 'Enterprise'
            WHEN annual_revenue > 100000 THEN 'Mid-Market'
            ELSE 'SMB'
        END AS company_size,
        {{ dbt_utils.generate_surrogate_key(['industry']) }} AS industry_key,
        -- This table is fully rebuilt on each run, so valid_from is the build
        -- date, not the true change date. Use a dbt snapshot for real history.
        CURRENT_DATE() AS valid_from,
        -- Typed NULL so the column is a DATE like valid_from.
        CAST(NULL AS DATE) AS valid_to,
        TRUE AS is_current
    FROM customer_orders
)

SELECT * FROM final
-- dbt/models/marts/fct_orders.sql
-- Order-grain fact table: one row per staged order, enriched with the
-- customer's surrogate key and descriptive attributes from dim_customers.
WITH staged_orders AS (
    SELECT * FROM {{ ref('stg_orders') }}
),

customer_dim AS (
    SELECT * FROM {{ ref('dim_customers') }}
),

enriched AS (
    SELECT
        {{ dbt_utils.generate_surrogate_key(['ord.order_id']) }} AS order_key,
        ord.order_id,
        ord.customer_id,
        cust.customer_key,
        cust.company_name,
        cust.industry,
        ord.total_amount,
        ord.currency,
        ord.payment_status,
        ord.created_at AS order_date,
        DATEDIFF('day', ord.created_at, CURRENT_DATE()) AS order_age_days,
        -- Thresholds compare total_amount as-is; it is not currency-normalized
        -- upstream. A NULL amount lands in 'Low Value'.
        CASE
            WHEN ord.total_amount > 10000 THEN 'High Value'
            WHEN ord.total_amount > 1000 THEN 'Medium Value'
            ELSE 'Low Value'
        END AS order_value_tier,
        -- Hashes the same industry value as dim_customers.industry_key.
        {{ dbt_utils.generate_surrogate_key(['cust.industry']) }} AS industry_key,
        CURRENT_DATE() AS calculated_at
    FROM staged_orders AS ord
    -- LEFT JOIN keeps orders with no matching customer; their customer
    -- columns come back NULL.
    LEFT JOIN customer_dim AS cust
        ON ord.customer_id = cust.customer_id
)

SELECT * FROM enriched
# dbt/models/marts/schema.yml
# Column documentation and data tests for the mart models.
# The dbt_utils.* tests require the dbt-utils package (install it via
# packages.yml and `dbt deps`).
version: 2

models:
  - name: dim_customers
    description: "Dimension table for customers with lifetime metrics"
    columns:
      - name: customer_key
        description: "Surrogate key"
        tests:
          - unique
          - not_null
      - name: customer_id
        description: "Source customer ID"
        tests:
          - unique
          - not_null
      - name: company_name
        description: "Company name"
        tests:
          - not_null
      - name: total_orders
        description: "Total number of orders"
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
      - name: total_revenue
        description: "Total revenue from customer"
        # int_customer_orders LEFT JOINs orders, so customers without orders
        # only pass not_null if total_revenue is coalesced to 0 upstream.
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0

  - name: fct_orders
    description: "Fact table for orders with customer dimensions"
    columns:
      - name: order_key
        description: "Surrogate key"
        tests:
          - unique
          - not_null
      - name: order_id
        description: "Source order ID"
        tests:
          - unique
          - not_null
      - name: total_amount
        description: "Order total amount"
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
Infrastructure Setup (Terraform)
# infrastructure/modern_data_stack.tf
terraform {
  required_version = ">= 1.5.0"
  required_providers {
    snowflake = {
      source  = "Snowflake-Labs/snowflake"
      version = "~> 0.89"
    }
  }
}

# --- Inputs ------------------------------------------------------------------
# The provider block reads these three; they must be declared or
# `terraform validate` fails on the undeclared references.
variable "snowflake_account" {
  type        = string
  description = "Snowflake account identifier"
}

variable "snowflake_user" {
  type        = string
  description = "Snowflake user Terraform authenticates as"
}

variable "snowflake_password" {
  type      = string
  sensitive = true
}

# Declared for Fivetran automation; not referenced by any resource in this file.
variable "fivetran_api_key" {
  type      = string
  sensitive = true
}

variable "fivetran_api_secret" {
  type      = string
  sensitive = true
}

provider "snowflake" {
  account  = var.snowflake_account
  user     = var.snowflake_user
  password = var.snowflake_password
  # ACCOUNTADMIN can create the database, warehouses and roles in one apply.
  # NOTE(review): consider SYSADMIN / SECURITYADMIN provider aliases for least privilege.
  role = "ACCOUNTADMIN"
}

# --- Database, warehouses, schemas ---------------------------------------------
resource "snowflake_database" "analytics" {
  name    = "ANALYTICS"
  comment = "Main analytics database"
}

resource "snowflake_warehouse" "dbt_transforms" {
  name           = "DBT_TRANSFORMS"
  comment        = "Warehouse for dbt transformations"
  warehouse_size = "medium"
  auto_suspend   = 300
  auto_resume    = true
  scaling_policy = "ECONOMY"
}

resource "snowflake_warehouse" "looker_queries" {
  name           = "LOOKER_QUERIES"
  comment        = "Warehouse for Looker analytics"
  warehouse_size = "large"
  auto_suspend   = 60
  auto_resume    = true
}

resource "snowflake_schema" "raw" {
  database = snowflake_database.analytics.name
  name     = "RAW"
  comment  = "Raw data from Fivetran"
}

resource "snowflake_schema" "staging" {
  database = snowflake_database.analytics.name
  name     = "STAGING"
  comment  = "Staged and cleaned data"
}

resource "snowflake_schema" "analytics_schema" {
  database = snowflake_database.analytics.name
  name     = "ANALYTICS"
  comment  = "Analytics-ready data marts"
}

locals {
  # Quoted, fully qualified schema identifiers as expected by the grant resources.
  raw_schema       = "\"${snowflake_database.analytics.name}\".\"${snowflake_schema.raw.name}\""
  staging_schema   = "\"${snowflake_database.analytics.name}\".\"${snowflake_schema.staging.name}\""
  analytics_schema = "\"${snowflake_database.analytics.name}\".\"${snowflake_schema.analytics_schema.name}\""
}

# --- Roles ---------------------------------------------------------------------
resource "snowflake_role" "dbt_role" {
  name    = "DBT_ROLE"
  comment = "Role for dbt transformations"
}

resource "snowflake_role" "looker_role" {
  name    = "LOOKER_ROLE"
  comment = "Role for Looker BI"
}

# --- Grants: database and warehouse usage ---------------------------------------
# Without USAGE on the database and a warehouse, neither role can resolve an
# object in ANALYTICS or run a query at all.
resource "snowflake_grant_privileges_to_account_role" "dbt_database" {
  account_role_name = snowflake_role.dbt_role.name
  privileges        = ["USAGE"]
  on_account_object {
    object_type = "DATABASE"
    object_name = snowflake_database.analytics.name
  }
}

resource "snowflake_grant_privileges_to_account_role" "dbt_warehouse" {
  account_role_name = snowflake_role.dbt_role.name
  privileges        = ["USAGE"]
  on_account_object {
    object_type = "WAREHOUSE"
    object_name = snowflake_warehouse.dbt_transforms.name
  }
}

resource "snowflake_grant_privileges_to_account_role" "looker_database" {
  account_role_name = snowflake_role.looker_role.name
  privileges        = ["USAGE"]
  on_account_object {
    object_type = "DATABASE"
    object_name = snowflake_database.analytics.name
  }
}

resource "snowflake_grant_privileges_to_account_role" "looker_warehouse" {
  account_role_name = snowflake_role.looker_role.name
  privileges        = ["USAGE"]
  on_account_object {
    object_type = "WAREHOUSE"
    object_name = snowflake_warehouse.looker_queries.name
  }
}

# --- Grants: dbt -----------------------------------------------------------------
# RAW is loaded by Fivetran; dbt only needs to read it.
resource "snowflake_grant_privileges_to_account_role" "dbt_raw" {
  account_role_name = snowflake_role.dbt_role.name
  privileges        = ["USAGE"]
  on_schema {
    schema_name = local.raw_schema
  }
}

resource "snowflake_grant_privileges_to_account_role" "dbt_raw_tables" {
  account_role_name = snowflake_role.dbt_role.name
  privileges        = ["SELECT"]
  on_schema_object {
    all {
      object_type_plural = "TABLES"
      in_schema          = local.raw_schema
    }
  }
}

resource "snowflake_grant_privileges_to_account_role" "dbt_raw_future_tables" {
  account_role_name = snowflake_role.dbt_role.name
  privileges        = ["SELECT"]
  on_schema_object {
    future {
      object_type_plural = "TABLES"
      in_schema          = local.raw_schema
    }
  }
}

# dbt owns the tables and views it creates, so schema-level CREATE privileges
# suffice. INSERT/UPDATE/DELETE/SELECT are table-level privileges and are not
# grantable on a schema.
resource "snowflake_grant_privileges_to_account_role" "dbt_staging" {
  account_role_name = snowflake_role.dbt_role.name
  privileges        = ["USAGE", "CREATE TABLE", "CREATE VIEW"]
  on_schema {
    schema_name = local.staging_schema
  }
}

resource "snowflake_grant_privileges_to_account_role" "dbt_analytics" {
  account_role_name = snowflake_role.dbt_role.name
  privileges        = ["USAGE", "CREATE TABLE", "CREATE VIEW"]
  on_schema {
    schema_name = local.analytics_schema
  }
}

# --- Grants: Looker (read-only on the marts) ---------------------------------
resource "snowflake_grant_privileges_to_account_role" "looker_analytics" {
  account_role_name = snowflake_role.looker_role.name
  privileges        = ["USAGE"]
  on_schema {
    schema_name = local.analytics_schema
  }
}

resource "snowflake_grant_privileges_to_account_role" "looker_analytics_tables" {
  account_role_name = snowflake_role.looker_role.name
  privileges        = ["SELECT"]
  on_schema_object {
    all {
      object_type_plural = "TABLES"
      in_schema          = local.analytics_schema
    }
  }
}

# Mart tables are rebuilt by dbt, which creates new table objects. Future
# grants keep Looker's SELECT access on the replacements.
resource "snowflake_grant_privileges_to_account_role" "looker_analytics_future_tables" {
  account_role_name = snowflake_role.looker_role.name
  privileges        = ["SELECT"]
  on_schema_object {
    future {
      object_type_plural = "TABLES"
      in_schema          = local.analytics_schema
    }
  }
}

# These roll the custom roles up to SYSADMIN (not to a user), despite the
# resource names.
resource "snowflake_grant_account_role" "dbt_to_user" {
  role_name        = snowflake_role.dbt_role.name
  parent_role_name = "SYSADMIN"
}

resource "snowflake_grant_account_role" "looker_to_user" {
  role_name        = snowflake_role.looker_role.name
  parent_role_name = "SYSADMIN"
}

# --- Outputs ---------------------------------------------------------------------
output "snowflake_database" {
  value = snowflake_database.analytics.name
}

output "snowflake_warehouse_dbt" {
  value = snowflake_warehouse.dbt_transforms.name
}

output "snowflake_warehouse_looker" {
  value = snowflake_warehouse.looker_queries.name
}
Testing and Validation
# tests/test_modern_stack.py
import pytest
from datetime import datetime
class TestModernDataStack:
    """Convention checks for the modern data stack project.

    Each test validates an inline snapshot of a project convention (connector
    config, dbt naming, schema layering, test coverage, explores). None of
    them import the Fivetran client, the dbt project or the Terraform config.
    """

    # dbt layer prefixes: staging, intermediate, fact and dimension models.
    MODEL_PREFIXES = ("stg_", "int_", "fct_", "dim_")

    @pytest.fixture
    def fivetran_config(self):
        """Representative Postgres connector configuration."""
        return {
            "service": "postgres",
            "host": "source-db.internal",
            "database": "production",
            "schema": "public",
            "sync_method": "cdc"
        }

    def test_fivetran_connector_config(self, fivetran_config):
        required_fields = ["service", "host", "database", "schema"]
        missing = [field for field in required_fields if field not in fivetran_config]
        assert not missing, f"connector config is missing {missing}"

    def test_dbt_model_naming(self):
        valid_names = [
            "stg_customers", "stg_orders",
            "int_customer_orders",
            "fct_orders", "dim_customers"
        ]
        for name in valid_names:
            # str.startswith accepts a tuple of prefixes.
            assert name.startswith(self.MODEL_PREFIXES), name

    def test_snowflake_schema_layering(self):
        layers = ["raw", "staging", "analytics"]
        assert len(layers) == 3
        assert "raw" in layers
        assert "staging" in layers
        assert "analytics" in layers
        # Each layer is built from the previous one, so order matters too.
        assert layers.index("raw") < layers.index("staging") < layers.index("analytics")

    def test_dbt_test_coverage(self):
        tests = {
            "dim_customers": {"unique": ["customer_key", "customer_id"], "not_null": ["customer_key", "customer_id", "company_name"]},
            "fct_orders": {"unique": ["order_key", "order_id"], "not_null": ["order_key", "order_id", "total_amount"]}
        }
        for model, model_tests in tests.items():
            assert model_tests, f"{model} has no tests"
            for test_type, columns in model_tests.items():
                assert columns, f"{model}.{test_type} covers no columns"
            # dbt's `unique` test ignores NULLs, so a column is only a real key
            # when it is also tested not_null.
            assert set(model_tests["unique"]) <= set(model_tests["not_null"]), model

    def test_warehouse_auto_suspend(self):
        warehouses = [
            {"name": "DBT_TRANSFORMS", "auto_suspend": 300},
            {"name": "LOOKER_QUERIES", "auto_suspend": 60}
        ]
        assert all(wh["auto_suspend"] > 0 for wh in warehouses)

    def test_incremental_model_strategy(self):
        strategies = ["append", "merge", "delete+insert"]
        assert "merge" in strategies

    def test_dbt_surrogate_key_generation(self):
        import hashlib

        def surrogate_key(value):
            # Approximates dbt_utils.generate_surrogate_key for one column:
            # md5 of the value cast to text, with NULL replaced by a sentinel.
            text = "_dbt_utils_surrogate_key_null_" if value is None else str(value)
            return hashlib.md5(text.encode()).hexdigest()

        key = surrogate_key("cust_001")
        assert key == surrogate_key("cust_001")
        assert len(key) == 32
        assert key != surrogate_key("cust_002")
        assert surrogate_key(None) != surrogate_key("")

    def test_looker_explores_mapping(self):
        known_marts = {"fct_orders", "dim_customers"}
        explores = {
            "orders": {"base_table": "fct_orders", "joins": ["dim_customers"]},
            "customers": {"base_table": "dim_customers", "joins": ["fct_orders"]}
        }
        for explore, config in explores.items():
            assert "base_table" in config, explore
            assert "joins" in config, explore
            # Explores may only reference models that actually exist.
            assert config["base_table"] in known_marts, explore
            assert set(config["joins"]) <= known_marts, explore
Cost Analysis
Monthly Cost Breakdown (Production)
| Component | Specification | Monthly Cost |
|---|---|---|
| Fivetran | 100K MAR, Standard plan | $500 |
| Snowflake | Medium warehouse, 100hrs | $800 |
| dbt Cloud | Team plan | $100 |
| Looker | 10 users | $3,000 |
| Git (GitHub) | Team plan | $20 |
| Total | | $4,420 |
Cost Optimization Strategies
💡
Tip: Optimize modern data stack costs:
- Fivetran MAR: Monitor monthly active rows, use incremental syncs
- Snowflake Credits: Use auto-suspend, right-size warehouses
- dbt Models: Use ephemeral models where possible
- Looker Caching: Enable persistent derived tables
- Selective Sync: Only sync needed columns and tables
ROI Analysis
| Metric | Before Modern Stack | After Modern Stack | Improvement |
|---|---|---|---|
| Time to Insight | 2 weeks | Same day | 10x faster |
| Data Freshness | 24 hours | 1-5 hours | 5-24x faster |
| Analyst Productivity | Manual SQL | Self-service BI | 3x more productive |
| Data Team Velocity | 1 model/week | 10 models/week | 10x faster |
Interview Talking Points
ℹ️
Best Practice: Focus on these modern data stack concepts in interviews:
- Why ELT over ETL?
- Leverage warehouse compute power
- Raw data preserved for reprocessing
- Faster development cycle
- Better data governance
- Why dbt for transformations?
- Version control for SQL
- Testing built-in
- Documentation auto-generated
- Modular, reusable code
- Why separate ingestion and transformation tools?
- Best-of-breed for each concern
- Independent scaling
- Easier maintenance
- Vendor flexibility
This project demonstrates modern data stack skills and is highly relevant for analytics engineering interviews.