dbt: Models, Tests, Incremental Loading
Building reliable data transformations with dbt
Interview Question
"Design a dbt project for an e-commerce company that: (1) ingests data from 5 sources, (2) has staging, intermediate, and mart layers, (3) implements incremental models for large tables, (4) includes data quality tests, (5) generates documentation. Include project structure, configuration, and example models."
Difficulty: Medium-Hard | Frequently asked at dbt Labs (formerly Fishtown Analytics), Snowflake
Theoretical Foundation
What is dbt?
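dbt (data build tool) is a SQL-based transformation tool and the core of analytics engineering. It handles the T in ELT: it compiles templated SELECT statements and runs them inside the warehouse, adding dependency ordering, testing, and documentation on top.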
┌─────────────────────────────────────────────────────────────┐
│ dbt Architecture                                            │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│ Source Data ──▶ dbt Models ──▶ Transformed Data             │
│                                                             │
│ dbt Components:                                             │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Models: SQL SELECT statements                           │ │
│ │ Tests: Data quality assertions                          │ │
│ │ Snapshots: Historical data (SCD Type 2)                 │ │
│ │ Macros: Reusable SQL logic                              │ │
│ │ Seeds: CSV files for reference data                     │ │
│ └─────────────────────────────────────────────────────────┘ │
│                                                             │
│ dbt does NOT:                                               │
│ - Extract data (use Fivetran, Airbyte)                      │
│ - Load data (use COPY INTO, Snowpipe)                       │
│ - Orchestrate (use Airflow, Dagster)                        │
│                                                             │
└─────────────────────────────────────────────────────────────┘
dbt Models
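A model is a .sql file containing one SELECT statement. dbt wraps it in the DDL needed to build the chosen materialization and resolves {{ ref() }} and {{ source() }} calls into fully qualified table names.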
-- models/staging/stg_orders.sql
-- Materialized as a view (default)
SELECT
id AS order_id,
customer_id,
amount,
status,
created_at,
updated_at
FROM {{ source('production', 'orders') }}
WHERE deleted_at IS NULL
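Materializations:
- view: Default; stores no data, so queries always see the latest upstream data
- table: Stored; fully rebuilt on each run
- incremental: Stored; after the first build, only new or changed rows are processed
- ephemeral: Never built in the warehouse; inlined as a CTE into downstream models
- materialized_view: A warehouse-managed materialized view (only on adapters that support it)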
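To make the list concrete, here is a minimal Python sketch of roughly what three of these materializations turn a model's SELECT into. It is a simplification under stated assumptions: real dbt delegates to adapter-specific materialization macros (Snowflake tables, for example, are created as transient by default), incremental is left out, and the function names wrap_model and inline_ephemeral are hypothetical. The __dbt__cte__ prefix is how dbt names inlined ephemeral CTEs in compiled SQL.

```python
def wrap_model(name: str, select_sql: str, materialization: str,
               schema: str = "analytics") -> str:
    """Return a simplified version of the SQL dbt runs to build a model."""
    relation = f"{schema}.{name}"
    if materialization == "view":
        # Nothing is stored; the SELECT runs whenever the view is queried.
        return f"CREATE OR REPLACE VIEW {relation} AS (\n{select_sql}\n)"
    if materialization == "table":
        # Data is stored and fully rebuilt from the SELECT on every run.
        return f"CREATE OR REPLACE TABLE {relation} AS (\n{select_sql}\n)"
    if materialization == "ephemeral":
        # Nothing is built; the SELECT becomes a CTE in downstream models.
        return f"__dbt__cte__{name} AS (\n{select_sql}\n)"
    raise ValueError(f"not covered by this sketch: {materialization}")


def inline_ephemeral(parent: str, parent_sql: str, child_sql: str) -> str:
    """Compile a child model that refs an ephemeral parent into one query."""
    ref_call = f"{{{{ ref('{parent}') }}}}"  # the literal text {{ ref('parent') }}
    body = child_sql.replace(ref_call, f"__dbt__cte__{parent}")
    return f"WITH {wrap_model(parent, parent_sql, 'ephemeral')}\n{body}"


print(wrap_model("stg_orders", "SELECT id AS order_id FROM raw.orders", "view"))
compiled = inline_ephemeral(
    "int_orders_enriched",
    "SELECT * FROM analytics.stg_orders",
    "SELECT order_id FROM {{ ref('int_orders_enriched') }}",
)
print(compiled)
```

The ephemeral case is why int_ models in this project never appear as warehouse objects: their SQL only exists inside the compiled SQL of the marts that reference them.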
dbt Tests
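Tests are assertions about models, sources, seeds, and snapshots. Each test compiles to a query that selects failing rows; the test passes when that query returns zero rows.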
# models/staging/_staging__models.yml
version: 2
models:
  - name: stg_orders
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: customer_id
        tests:
          - not_null
          - relationships:
              to: ref('stg_customers')
              field: customer_id
      - name: amount
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
              max_value: 1000000
      - name: status
        tests:
          - accepted_values:
              values: ['pending', 'completed', 'cancelled']
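Each generic test above becomes a query that selects failing rows. The sketch below hand-writes approximate equivalents of four built-in tests and runs them against an in-memory SQLite stand-in for stg_orders. The helper names and sample rows are hypothetical, and dbt's actual generated SQL differs in detail; the point is the pass/fail rule (zero rows means pass).

```python
import sqlite3

# Approximate, hand-written versions of the SQL behind dbt's generic tests.
# Every query selects the *failing* rows.
def unique_sql(model, col):
    return (f"SELECT {col}, COUNT(*) FROM {model} WHERE {col} IS NOT NULL "
            f"GROUP BY {col} HAVING COUNT(*) > 1")

def not_null_sql(model, col):
    return f"SELECT * FROM {model} WHERE {col} IS NULL"

def accepted_values_sql(model, col, values):
    allowed = ", ".join(f"'{v}'" for v in values)
    return f"SELECT * FROM {model} WHERE {col} NOT IN ({allowed})"

def relationships_sql(model, col, parent, parent_col):
    return (f"SELECT c.{col} FROM {model} AS c "
            f"LEFT JOIN {parent} AS p ON c.{col} = p.{parent_col} "
            f"WHERE c.{col} IS NOT NULL AND p.{parent_col} IS NULL")

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE stg_customers (customer_id INTEGER)")
conn.executemany("INSERT INTO stg_customers VALUES (?)", [(1,), (2,)])
conn.execute("CREATE TABLE stg_orders "
             "(order_id INTEGER, customer_id INTEGER, amount REAL, status TEXT)")
conn.executemany("INSERT INTO stg_orders VALUES (?, ?, ?, ?)", [
    (1, 1, 50.0, "completed"),
    (2, 2, 20.0, "pending"),
    (2, 3, 10.0, "refunded"),     # duplicate order_id, unknown customer, bad status
    (4, None, 5.0, "cancelled"),  # missing customer_id
    (5, None, 7.0, "completed"),  # missing customer_id
])

tests = {
    "not_null_order_id": not_null_sql("stg_orders", "order_id"),
    "unique_order_id": unique_sql("stg_orders", "order_id"),
    "not_null_customer_id": not_null_sql("stg_orders", "customer_id"),
    "relationships_customer_id": relationships_sql(
        "stg_orders", "customer_id", "stg_customers", "customer_id"),
    "accepted_values_status": accepted_values_sql(
        "stg_orders", "status", ["pending", "completed", "cancelled"]),
}
failures = {name: len(conn.execute(sql).fetchall()) for name, sql in tests.items()}
for name, n in failures.items():
    print(f"{'PASS' if n == 0 else 'FAIL'} {name} ({n} failing rows)")
```

Note that relationships and accepted_values skip NULLs, which is why not_null is usually paired with them.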
Incremental Models
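On its first run, an incremental model builds the whole table. On later runs, the filter inside {% if is_incremental() %} limits the SELECT to new or changed rows, and dbt merges or appends them into the existing table, referenced as {{ this }}.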
-- models/marts/fct_orders.sql
-- partition_by below uses dbt-bigquery syntax; on Snowflake, use cluster_by instead.
{{
config(
materialized='incremental',
unique_key='order_id',
incremental_strategy='merge',
partition_by={
"field": "order_date",
"data_type": "date",
"granularity": "day"
}
)
}}
WITH orders AS (
SELECT
order_id,
customer_id,
amount,
status,
DATE(created_at) AS order_date,
updated_at
FROM {{ ref('stg_orders') }}
{% if is_incremental() %}
WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}
)
SELECT * FROM orders
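Incremental strategies:
- merge: Upsert on unique_key (default on Snowflake and BigQuery)
- append: Insert new rows only, with no deduplication
- delete+insert: Delete target rows whose unique_key appears in the new batch, then insert the batch
- insert_overwrite: Replace whole partitions (BigQuery, Spark)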
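The strategies differ only in how the filtered batch lands in the existing table. This is a minimal in-memory Python sketch, assuming tables are lists of dicts and using small integers for updated_at; the function names are hypothetical and insert_overwrite is not modelled.

```python
def select_new_rows(source, target):
    """is_incremental() filter: keep rows newer than the target's watermark."""
    if not target:  # first run: full build
        return list(source)
    watermark = max(row["updated_at"] for row in target)
    return [row for row in source if row["updated_at"] > watermark]

def merge(target, batch, key):
    """Upsert: rows with a matching key are replaced, others are inserted."""
    rows = {row[key]: row for row in target}
    rows.update({row[key]: row for row in batch})
    return list(rows.values())

def append(target, batch):
    """Insert every batch row; existing rows are never touched."""
    return target + batch

def delete_insert(target, batch, key):
    """Delete target rows whose key is in the batch, then insert the batch."""
    keys = {row[key] for row in batch}
    return [row for row in target if row[key] not in keys] + batch

run1 = [
    {"order_id": 1, "status": "completed", "updated_at": 1},
    {"order_id": 2, "status": "pending", "updated_at": 2},
]
target = select_new_rows(run1, [])

run2 = [
    {"order_id": 1, "status": "completed", "updated_at": 1},
    {"order_id": 2, "status": "completed", "updated_at": 3},  # changed
    {"order_id": 3, "status": "pending", "updated_at": 4},    # new
]
batch = select_new_rows(run2, target)
merged = merge(target, batch, "order_id")
appended = append(target, batch)
replaced = delete_insert(target, batch, "order_id")
print(len(batch), len(merged), len(appended), len(replaced))  # 2 3 4 3
```

Merge and delete+insert end with the same three orders here, while append keeps both versions of order 2, which is why append suits immutable event data rather than mutable rows like orders.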
dbt Project Structure
my_dbt_project/
├── dbt_project.yml              # Project configuration
├── packages.yml                 # Package dependencies
├── profiles.yml                 # Connection profiles
├── models/
│   ├── staging/                 # Raw data cleaning
│   │   ├── _staging__models.yml # Schema documentation
│   │   ├── _sources.yml         # Source definitions
│   │   ├── stg_orders.sql
│   │   ├── stg_customers.sql
│   │   └── stg_products.sql
│   ├── intermediate/            # Business logic
│   │   ├── int_orders_enriched.sql
│   │   └── int_customer_lifetime.sql
│   └── marts/                   # Business-level aggregates
│       ├── _marts__models.yml
│       ├── fct_orders.sql
│       ├── dim_customers.sql
│       └── rpt_daily_revenue.sql
├── tests/                       # Custom data tests
│   ├── test_order_amount_positive.sql
│   └── test_customer_unique.sql
├── macros/                      # Reusable SQL logic
│   ├── generate_schema_name.sql
│   └── materializations.sql
├── seeds/                       # CSV reference data
│   ├── country_codes.csv
│   └── product_categories.csv
└── snapshots/                   # SCD Type 2 snapshots
    └── customers_snapshot.sql
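The stg_, int_, fct_, dim_ and rpt_ prefixes in this tree are a team convention; dbt itself does not enforce them. A CI job can check them. The sketch below is one hypothetical way to do it in Python, assuming paths relative to the project root; LAYER_PREFIXES and naming_violations are illustrative names, not a dbt feature.

```python
from pathlib import PurePosixPath

# Hypothetical convention: each layer folder under models/ expects these prefixes.
LAYER_PREFIXES = {
    "staging": ("stg_",),
    "intermediate": ("int_",),
    "marts": ("fct_", "dim_", "rpt_"),
}

def naming_violations(paths):
    """Return .sql model files whose name prefix doesn't match their layer folder."""
    bad = []
    for path in map(PurePosixPath, paths):
        if path.suffix != ".sql" or len(path.parts) < 3 or path.parts[0] != "models":
            continue  # not a model file (YAML, macros, seeds, ...)
        prefixes = LAYER_PREFIXES.get(path.parts[1])
        if prefixes and not path.name.startswith(prefixes):
            bad.append(str(path))
    return bad

print(naming_violations([
    "models/staging/stg_orders.sql",
    "models/marts/orders_report.sql",
    "models/staging/_sources.yml",
]))
```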
dbt Configuration
# dbt_project.yml
name: 'ecommerce'
version: '1.0.0'
config-version: 2
profile: 'snowflake_prod'
model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]
clean-targets:
  - "target"
  - "dbt_packages"
models:
  ecommerce:
    staging:
      +materialized: view
      +schema: staging
      +tags: ["staging"]
    intermediate:
      +materialized: ephemeral
      +tags: ["intermediate"]
    marts:
      +materialized: table
      +schema: analytics
      +tags: ["mart", "daily"]
vars:
  start_date: '2024-01-01'
  end_date: '2024-12-31'
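# packages.yml
# Consider adding upper bounds, e.g. version: [">=1.0.0", "<2.0.0"],
# so dbt deps cannot pull in a breaking major release.
packages:
  - package: dbt-labs/dbt_utils
    version: ">=1.0.0"
  - package: dbt-labs/codegen
    version: ">=0.11.0"
  - package: calogica/dbt_expectations
    version: ">=0.10.0"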
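The folder-level "+" settings in dbt_project.yml are defaults; a config() block inside a model file takes precedence, and tags accumulate across levels instead of overriding. This Python sketch models only those two levels (dbt also considers properties YAML and nested folder paths), and resolve_config is a hypothetical helper, not a dbt API.

```python
# Folder-level "+" configs from dbt_project.yml, keyed by models/ subfolder.
PROJECT_CONFIG = {
    "staging": {"materialized": "view", "schema": "staging", "tags": ["staging"]},
    "intermediate": {"materialized": "ephemeral", "tags": ["intermediate"]},
    "marts": {"materialized": "table", "schema": "analytics", "tags": ["mart", "daily"]},
}


def resolve_config(folder: str, model_config: dict) -> dict:
    """Merge folder defaults with a model's own config(); the model wins.

    Scalar settings are overridden, while tags accumulate across levels.
    """
    resolved = {"materialized": "view", "tags": []}  # dbt's default materialization
    for level in (PROJECT_CONFIG.get(folder, {}), model_config):
        for key, value in level.items():
            if key == "tags":
                resolved["tags"] += [t for t in value if t not in resolved["tags"]]
            else:
                resolved[key] = value
    return resolved


# fct_orders lives in marts/ (default: table) but its config() asks for incremental.
fct_orders = resolve_config("marts", {
    "materialized": "incremental",
    "unique_key": "order_id",
    "tags": ["mart", "orders", "daily"],
})
print(fct_orders)
```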
dbt Source Definitions
# models/staging/_sources.yml
version: 2
sources:
  - name: production
    database: production
    schema: public
    loader: fivetran
    loaded_at_field: _fivetran_synced
    freshness:
      warn_after: {count: 6, period: hour}
      error_after: {count: 24, period: hour}
    tables:
      - name: orders
        description: "Customer orders from e-commerce platform"
        columns:
          - name: id
            description: "Primary key"
            tests:
              - unique
          - name: customer_id
            description: "Foreign key to customers"
          - name: amount
            description: "Order total amount"
          - name: status
            description: "Order status"
          - name: created_at
            description: "Order creation timestamp"
          - name: updated_at
            description: "Last update timestamp"
      - name: customers
        description: "Customer information"
        columns:
          - name: id
            description: "Primary key"
          - name: email
            description: "Customer email"
          - name: name
            description: "Customer name"
      # Referenced by stg_products via source('production', 'products')
      - name: products
        description: "Product catalog"
        columns:
          - name: id
            description: "Primary key"
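dbt source freshness takes MAX(loaded_at_field), here _fivetran_synced, for each source table and compares its age with warn_after and error_after. A minimal Python sketch of that classification, assuming the 6-hour and 24-hour thresholds above; the strict ">" at the boundaries is an assumption of this sketch.

```python
from datetime import datetime, timedelta, timezone

def freshness_status(max_loaded_at, now,
                     warn_after=timedelta(hours=6),
                     error_after=timedelta(hours=24)):
    """Classify a source by how long ago its newest row was loaded."""
    age = now - max_loaded_at
    if age > error_after:
        return "error"
    if age > warn_after:
        return "warn"
    return "pass"

now = datetime(2024, 6, 1, 12, 0, tzinfo=timezone.utc)
for hours in (2, 7, 30):
    print(hours, freshness_status(now - timedelta(hours=hours), now))
```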
dbt Macros
-- macros/generate_schema_name.sql
{% macro generate_schema_name(custom_schema_name, node) -%}
{%- set default_schema = target.schema -%}
{%- if custom_schema_name is none -%}
{{ default_schema }}
{%- else -%}
{{ default_schema }}_{{ custom_schema_name }}
{%- endif -%}
{%- endmacro %}
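As written, this macro reproduces dbt's built-in default: a model with +schema: staging lands in <target.schema>_staging, so each developer gets isolated schemas. A common override uses the custom schema verbatim in production only; dbt ships that variant as generate_schema_name_for_env. The Python functions below are hedged mirrors of both macros for illustration, with target.schema and target.name passed in as plain arguments.

```python
from typing import Optional

def generate_schema_name(custom_schema_name: Optional[str], target_schema: str) -> str:
    """Mirror of the macro above: prefix custom schemas with the target schema."""
    if custom_schema_name is None:
        return target_schema
    return f"{target_schema}_{custom_schema_name.strip()}"

def generate_schema_name_for_env(custom_schema_name: Optional[str],
                                 target_schema: str, target_name: str) -> str:
    """Variant: use the custom schema as-is only when the target is 'prod'."""
    if target_name == "prod" and custom_schema_name is not None:
        return custom_schema_name.strip()
    return target_schema

print(generate_schema_name("staging", "dbt_alice"))                 # dbt_alice_staging
print(generate_schema_name_for_env("analytics", "public", "prod"))  # analytics
```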
-- macros/insert_audit_columns.sql
{% macro insert_audit_columns() %}
, CURRENT_TIMESTAMP AS _loaded_at
, '{{ invocation_id }}' AS _dbt_invocation_id
, '{{ run_started_at }}' AS _dbt_run_started_at
{% endmacro %}
-- macros/validate_email.sql
{% macro validate_email(column_name) %}
{{ column_name }} REGEXP '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,}$'
{% endmacro %}
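Regex macros are easy to get subtly wrong, so it helps to try the pattern outside the warehouse first. This Python check uses the same pattern as validate_email; the only difference is escaping, since the macro doubles the backslash ('\\.') for the SQL string literal while a Python raw string needs just one. is_valid_email is a hypothetical helper.

```python
import re

# Same pattern as the validate_email macro, written as a Python raw string.
EMAIL_PATTERN = re.compile(r"^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$")

def is_valid_email(value: str) -> bool:
    """True when the whole string matches the pattern."""
    return EMAIL_PATTERN.fullmatch(value) is not None

for candidate in ("alice@example.com", "bob@localhost", "carol@example.c"):
    print(candidate, is_valid_email(candidate))
```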
dbt Snapshots
-- snapshots/customers_snapshot.sql
{% snapshot customers_snapshot %}
{{
config(
target_schema='snapshots',
unique_key='id',
strategy='timestamp',
updated_at='updated_at',
)
}}
SELECT * FROM {{ source('production', 'customers') }}
{% endsnapshot %}
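With strategy='timestamp', each snapshot run compares a row's updated_at with the current version: if it is newer, the old version gets a dbt_valid_to and a new version is inserted with dbt_valid_to NULL. A minimal Python sketch of that SCD Type 2 logic, assuming integer timestamps; real snapshots also add dbt_scd_id and dbt_updated_at, run as a MERGE, and hard deletes are ignored here.

```python
def apply_snapshot(history, source, key="id", updated_at="updated_at"):
    """Timestamp strategy: close changed versions and append new ones."""
    result = [dict(row) for row in history]
    current = {row[key]: row for row in result if row["dbt_valid_to"] is None}
    for row in source:
        live = current.get(row[key])
        if live is not None and row[updated_at] <= live[updated_at]:
            continue  # unchanged since the last snapshot
        if live is not None:
            live["dbt_valid_to"] = row[updated_at]  # close the old version
        result.append({**row, "dbt_valid_from": row[updated_at], "dbt_valid_to": None})
    return result

day1 = [{"id": 1, "email": "a@old.com", "updated_at": 1},
        {"id": 2, "email": "b@x.com", "updated_at": 1}]
day2 = [{"id": 1, "email": "a@new.com", "updated_at": 5},
        {"id": 2, "email": "b@x.com", "updated_at": 1}]
history = apply_snapshot(apply_snapshot([], day1), day2)
for row in history:
    print(row["id"], row["email"], row["dbt_valid_from"], row["dbt_valid_to"])
```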
dbt Tests: Singular and Model-Level Tests
-- tests/test_order_amount_positive.sql
-- Custom data test
SELECT
order_id,
amount
FROM {{ ref('fct_orders') }}
WHERE amount < 0
-- Should return 0 rows to pass
# models/marts/_marts__models.yml
version: 2
models:
  - name: fct_orders
    tests:
      - dbt_utils.expression_is_true:
          expression: "amount >= 0"
      - dbt_utils.unique_combination_of_columns:
          combination_of_columns:
            - order_id
            - order_date
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: order_date
        tests:
          - not_null
          # Bounds are pasted into SQL verbatim, so date literals need inner quotes.
          - dbt_utils.accepted_range:
              min_value: "'2020-01-01'"
              max_value: "'2030-12-31'"
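dbt_utils.accepted_range pastes min_value and max_value into the generated SQL as-is, so a date bound written as "2020-01-01" in YAML reaches the warehouse as integer arithmetic. The Python sketch below renders a simplified version of the condition both ways and uses SQLite to show what the unquoted bound evaluates to; accepted_range_condition is a hypothetical helper, not the package's macro.

```python
import sqlite3

def accepted_range_condition(column, min_value, max_value):
    """Render bounds into SQL verbatim (simplified accepted_range logic)."""
    return f"NOT ({column} >= {min_value} AND {column} <= {max_value})"

unquoted = accepted_range_condition("order_date", "2020-01-01", "2030-12-31")
quoted = accepted_range_condition("order_date", "'2020-01-01'", "'2030-12-31'")
print(unquoted)  # NOT (order_date >= 2020-01-01 AND order_date <= 2030-12-31)
print(quoted)    # NOT (order_date >= '2020-01-01' AND order_date <= '2030-12-31')

# Unquoted, the "date" is just subtraction: 2020 - 1 - 1.
conn = sqlite3.connect(":memory:")
print(conn.execute("SELECT 2020-01-01").fetchone()[0])  # 2018
```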
Code Implementation
Complete dbt Project Example
-- ============================================================
-- STAGING LAYER
-- ============================================================
-- models/staging/stg_orders.sql
{{
config(
materialized='view',
tags=['staging', 'orders']
)
}}
SELECT
id AS order_id,
customer_id,
amount,
status,
created_at AS order_created_at,
updated_at AS order_updated_at,
_fivetran_synced AS _loaded_at
FROM {{ source('production', 'orders') }}
WHERE deleted_at IS NULL
-- models/staging/stg_customers.sql
{{
config(
materialized='view',
tags=['staging', 'customers']
)
}}
SELECT
id AS customer_id,
email,
name AS customer_name,
created_at AS customer_created_at,
updated_at AS customer_updated_at,
_fivetran_synced AS _loaded_at
FROM {{ source('production', 'customers') }}
WHERE deleted_at IS NULL
-- models/staging/stg_products.sql
{{
config(
materialized='view',
tags=['staging', 'products']
)
}}
SELECT
id AS product_id,
name AS product_name,
category,
price,
created_at AS product_created_at,
_fivetran_synced AS _loaded_at
FROM {{ source('production', 'products') }}
WHERE deleted_at IS NULL
-- ============================================================
-- INTERMEDIATE LAYER
-- ============================================================
-- models/intermediate/int_orders_enriched.sql
{{
config(
materialized='ephemeral',
tags=['intermediate']
)
}}
WITH orders AS (
SELECT * FROM {{ ref('stg_orders') }}
),
customers AS (
SELECT * FROM {{ ref('stg_customers') }}
),
enriched_orders AS (
SELECT
o.order_id,
o.customer_id,
c.customer_name,
c.email AS customer_email,
o.amount,
o.status,
o.order_created_at,
o.order_updated_at,
-- dbt_utils 1.x moved cross-database macros such as date_trunc into the dbt namespace
CAST({{ dbt.date_trunc('day', 'o.order_created_at') }} AS DATE) AS order_date
FROM orders o
LEFT JOIN customers c ON o.customer_id = c.customer_id
)
SELECT * FROM enriched_orders
-- models/intermediate/int_customer_lifetime.sql
{{
config(
materialized='ephemeral',
tags=['intermediate']
)
}}
WITH orders AS (
SELECT * FROM {{ ref('int_orders_enriched') }}
),
customer_lifetime AS (
SELECT
customer_id,
customer_name,
MIN(order_created_at) AS first_order_date,
MAX(order_created_at) AS last_order_date,
COUNT(DISTINCT order_id) AS total_orders,
SUM(amount) AS lifetime_value,
AVG(amount) AS avg_order_value
FROM orders
WHERE status = 'completed'
GROUP BY customer_id, customer_name
)
SELECT * FROM customer_lifetime
-- ============================================================
-- MARTS LAYER
-- ============================================================
-- models/marts/fct_orders.sql
{{
config(
materialized='incremental',
unique_key='order_id',
incremental_strategy='merge',
partition_by={
"field": "order_date",
"data_type": "date",
"granularity": "day"
},
cluster_by=['customer_id', 'status'],
tags=['mart', 'orders', 'daily']
)
}}
WITH orders AS (
SELECT * FROM {{ ref('int_orders_enriched') }}
),
final AS (
SELECT
order_id,
customer_id,
customer_name,
customer_email,
amount,
status,
order_date,
order_created_at,
order_updated_at,
{{ current_timestamp() }} AS _dbt_loaded_at
FROM orders
{% if is_incremental() %}
WHERE order_updated_at > (SELECT MAX(order_updated_at) FROM {{ this }})
{% endif %}
)
SELECT * FROM final
-- models/marts/dim_customers.sql
-- Built as a table rather than incrementally: recent_orders and recent_value
-- shift as the 3-month window moves, and an older order changing status can
-- alter lifetime_value without moving last_order_date, so a last_order_date
-- watermark would leave stale rows.
{{
config(
materialized='table',
tags=['mart', 'customers']
)
}}
WITH customer_lifetime AS (
SELECT * FROM {{ ref('int_customer_lifetime') }}
),
customer_orders AS (
SELECT
customer_id,
COUNT(DISTINCT order_id) AS recent_orders,
SUM(amount) AS recent_value
FROM {{ ref('fct_orders') }}
WHERE order_date >= DATEADD(month, -3, CURRENT_DATE())
GROUP BY customer_id
),
final AS (
SELECT
cl.customer_id,
cl.customer_name,
cl.first_order_date,
cl.last_order_date,
cl.total_orders,
cl.lifetime_value,
cl.avg_order_value,
COALESCE(co.recent_orders, 0) AS recent_orders,
COALESCE(co.recent_value, 0) AS recent_value,
CASE
WHEN cl.lifetime_value > 10000 THEN 'VIP'
WHEN cl.lifetime_value > 1000 THEN 'Gold'
WHEN cl.lifetime_value > 100 THEN 'Silver'
ELSE 'Bronze'
END AS customer_tier,
{{ current_timestamp() }} AS _dbt_loaded_at
FROM customer_lifetime cl
LEFT JOIN customer_orders co ON cl.customer_id = co.customer_id
)
SELECT * FROM final
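-- models/marts/rpt_daily_revenue.sql
-- Grain: one row per order_date and customer_tier. The day-level columns
-- (order_count, total_revenue, ...) repeat on every tier row for that date,
-- so aggregate them per order_date, not across all rows.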
{{
config(
materialized='table',
partition_by={
"field": "order_date",
"data_type": "date",
"granularity": "day"
},
cluster_by=['customer_tier'],
tags=['mart', 'report', 'daily']
)
}}
WITH orders AS (
SELECT * FROM {{ ref('fct_orders') }}
),
daily_summary AS (
SELECT
order_date,
COUNT(DISTINCT order_id) AS order_count,
COUNT(DISTINCT customer_id) AS customer_count,
SUM(amount) AS total_revenue,
AVG(amount) AS avg_order_value,
SUM(CASE WHEN status = 'completed' THEN amount ELSE 0 END) AS completed_revenue,
SUM(CASE WHEN status = 'cancelled' THEN amount ELSE 0 END) AS cancelled_revenue
FROM orders
GROUP BY order_date
),
customer_segments AS (
SELECT
order_date,
customer_tier,
COUNT(DISTINCT customer_id) AS customers,
SUM(amount) AS revenue
FROM orders
JOIN {{ ref('dim_customers') }} USING (customer_id)
GROUP BY order_date, customer_tier
),
final AS (
SELECT
ds.order_date,
ds.order_count,
ds.customer_count,
ds.total_revenue,
ds.avg_order_value,
ds.completed_revenue,
ds.cancelled_revenue,
cs.customer_tier,
cs.customers AS segment_customers,
cs.revenue AS segment_revenue,
{{ current_timestamp() }} AS _dbt_loaded_at
FROM daily_summary ds
LEFT JOIN customer_segments cs ON ds.order_date = cs.order_date
)
SELECT * FROM final
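Because the final CTE joins each daily_summary row to every customer_segments row for the same order_date, day-level metrics are repeated once per tier. This Python sketch reproduces that join shape in SQLite with two hypothetical tiers for a single day, and shows that summing total_revenue across rows double-counts while segment revenue sums correctly.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE daily_summary (order_date TEXT, total_revenue REAL);
INSERT INTO daily_summary VALUES ('2024-06-01', 300.0);
CREATE TABLE customer_segments (order_date TEXT, customer_tier TEXT, revenue REAL);
INSERT INTO customer_segments VALUES
  ('2024-06-01', 'Gold', 200.0),
  ('2024-06-01', 'Silver', 100.0);
""")

# Same join shape as the final CTE: one output row per (order_date, customer_tier).
rows = conn.execute("""
SELECT ds.order_date, ds.total_revenue, cs.customer_tier, cs.revenue
FROM daily_summary ds
LEFT JOIN customer_segments cs ON ds.order_date = cs.order_date
""").fetchall()

naive_total = sum(r[1] for r in rows)                   # daily total repeated per tier
segment_total = sum(r[3] for r in rows)                 # one segment value per row
daily_total = sum({r[0]: r[1] for r in rows}.values())  # one total per date
print(len(rows), naive_total, segment_total, daily_total)  # 2 600.0 300.0 300.0
```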
dbt Commands
# Run all models
dbt run
# Run specific model
dbt run --select stg_orders
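# Run a model and everything upstream of it (its parents)
dbt run --select +fct_orders
# Run a model and everything downstream of it (its children)
dbt run --select fct_orders+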
# Run tests
dbt test
# Run the tests defined on one model
dbt test --select stg_orders
# Generate documentation
dbt docs generate
# Serve documentation
dbt docs serve
# Run snapshot
dbt snapshot
# Seed CSV files
dbt seed
# Clean project
dbt clean
# Compile models (without running)
dbt compile
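The + graph operator is easy to misread: a leading + adds ancestors, a trailing + adds descendants. This Python sketch resolves the simple forms over a hand-written parent map for this project's models; it ignores sources, tests, and dbt's other selector methods, and select() is a hypothetical name.

```python
# Parent lists for this project's models (model -> models it refs).
PARENTS = {
    "stg_orders": [],
    "stg_customers": [],
    "int_orders_enriched": ["stg_orders", "stg_customers"],
    "int_customer_lifetime": ["int_orders_enriched"],
    "fct_orders": ["int_orders_enriched"],
    "dim_customers": ["int_customer_lifetime", "fct_orders"],
    "rpt_daily_revenue": ["fct_orders", "dim_customers"],
}

def ancestors(node):
    """All models reachable by following parent edges."""
    seen, stack = set(), list(PARENTS[node])
    while stack:
        current = stack.pop()
        if current not in seen:
            seen.add(current)
            stack.extend(PARENTS[current])
    return seen

def descendants(node):
    """All models that have `node` as an ancestor."""
    return {n for n in PARENTS if node in ancestors(n)}

def select(selector):
    """Resolve 'model', '+model', 'model+', or '+model+' (simplified)."""
    name = selector.strip("+")
    nodes = {name}
    if selector.startswith("+"):
        nodes |= ancestors(name)
    if selector.endswith("+"):
        nodes |= descendants(name)
    return sorted(nodes)

print(select("+fct_orders"))
print(select("fct_orders+"))
```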
💡 Production Tip: Use --select for targeted runs. During development, build only what changed, for example with state comparison: dbt run --select state:modified+ --state <path-to-prod-artifacts>. In production, run the full DAG with --fail-fast so the run stops at the first error.
Common Follow-Up Questions
Q1: How do you handle late-arriving data in dbt?
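-- Reprocess a lookback window so rows that land late (with an updated_at older
-- than the current watermark) are still picked up; the merge on unique_key
-- replaces re-selected rows instead of duplicating them.
-- lookback_days is a project var introduced for this example.
-- Alternatively, filter on the ingestion timestamp (_loaded_at) instead of updated_at.
{% if is_incremental() %}
WHERE updated_at > (
SELECT DATEADD(day, -{{ var('lookback_days', 3) }}, MAX(updated_at)) FROM {{ this }}
)
{% endif %}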
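Why a lookback helps: a strict updated_at > MAX(updated_at) watermark permanently skips a row whose timestamp is older than the watermark by the time it lands. This in-memory Python sketch, assuming integer timestamps and a merge keyed on order_id, compares a strict watermark with a 3-unit lookback; the helper names are hypothetical.

```python
def select_batch(source, target, lookback=0):
    """Incremental filter: updated_at > MAX(updated_at) - lookback."""
    cutoff = max(row["updated_at"] for row in target.values()) - lookback
    return [row for row in source if row["updated_at"] > cutoff]

def merge(target, batch):
    """Upsert on order_id, so re-selected rows replace instead of duplicating."""
    merged = dict(target)
    merged.update({row["order_id"]: row for row in batch})
    return merged

target = {1: {"order_id": 1, "updated_at": 9},
          2: {"order_id": 2, "updated_at": 10}}
# Order 3 changed at t=8 but only landed in the warehouse after the last run.
source = [{"order_id": 1, "updated_at": 9},
          {"order_id": 2, "updated_at": 10},
          {"order_id": 3, "updated_at": 8}]

strict = merge(target, select_batch(source, target))
with_lookback = merge(target, select_batch(source, target, lookback=3))
print(sorted(strict), sorted(with_lookback))  # [1, 2] [1, 2, 3]
```

The lookback re-reads orders 1 and 2 as well; the keyed merge absorbs that overlap, which is the cost traded for catching late rows.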
Q2: How do you test data quality in dbt?
# Use dbt_expectations for advanced tests (column-level, in a models .yml file)
models:
  - name: fct_orders
    columns:
      - name: order_id
        tests:
          - dbt_expectations.expect_column_values_to_not_be_null
          - dbt_expectations.expect_column_values_to_be_unique
      - name: customer_id
        tests:
          - dbt_expectations.expect_column_values_to_not_be_null
      - name: amount
        tests:
          - dbt_expectations.expect_column_values_to_not_be_null
          - dbt_expectations.expect_column_values_to_be_between:
              min_value: 0
              max_value: 1000000
Q3: How do you handle schema evolution in dbt?
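-- Views and tables are rebuilt on every run, so they pick up column changes.
-- Incremental models default to on_schema_change='ignore': new columns in the
-- model are NOT added to the existing table. Opt in per model:
{{ config(materialized='incremental', unique_key='order_id', on_schema_change='append_new_columns') }}
-- 'sync_all_columns' also drops columns removed from the model; 'fail' stops the run.
-- No option backfills existing rows; to backfill, rebuild with --full-refresh:
dbt run --select fct_orders --full-refresh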
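The on_schema_change options differ in what happens to the target table's column list when the model's columns change. This Python sketch compares them on column names only, assuming a model that added discount and dropped status; it does not model data types or the data itself (old rows get NULL in new columns), and resulting_columns is a hypothetical helper.

```python
def resulting_columns(target_cols, model_cols, on_schema_change="ignore"):
    """Sketch of how each on_schema_change option treats the target's columns."""
    added = [c for c in model_cols if c not in target_cols]
    removed = [c for c in target_cols if c not in model_cols]
    if on_schema_change == "ignore":
        return list(target_cols)  # new model columns are not added
    if on_schema_change == "fail":
        if added or removed:
            raise RuntimeError(f"schema changed: added={added} removed={removed}")
        return list(target_cols)
    if on_schema_change == "append_new_columns":
        return list(target_cols) + added
    if on_schema_change == "sync_all_columns":
        return [c for c in target_cols if c not in removed] + added
    raise ValueError(f"unknown option: {on_schema_change}")

target = ["order_id", "amount", "status"]
model = ["order_id", "amount", "discount"]
for option in ("ignore", "append_new_columns", "sync_all_columns"):
    print(option, resulting_columns(target, model, option))
```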
Q4: How do you schedule dbt in production?
# Airflow DAG for dbt (Airflow 2.4+ syntax)
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
with DAG(
    dag_id="dbt_pipeline",
    schedule="0 6 * * *",             # daily at 06:00
    start_date=datetime(2024, 1, 1),
    catchup=False,                    # don't backfill missed intervals
) as dag:
    dbt_run = BashOperator(
        task_id="dbt_run",
        bash_command="cd /opt/dbt && dbt run --fail-fast",
    )
    dbt_test = BashOperator(
        task_id="dbt_test",
        bash_command="cd /opt/dbt && dbt test --fail-fast",
    )
    # Tests run only after every model has built successfully.
    dbt_run >> dbt_test
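After each invocation, dbt writes run_results.json to the project's target/ directory (the default target path), with one entry per executed node and a status such as success, error, or skipped for models and pass, warn, fail, or error for tests. An orchestrator task can read it to report what broke. The sketch below assumes only the unique_id and status fields; the helper names and the trimmed sample are hypothetical.

```python
import json
from pathlib import Path

FAILING = {"error", "fail"}

def failed_nodes(run_results: dict) -> list:
    """Return unique_ids of nodes that errored (models) or failed (tests)."""
    return [r["unique_id"] for r in run_results.get("results", [])
            if r.get("status") in FAILING]

def load_run_results(project_dir: str) -> dict:
    """Read the artifact dbt writes after run, test, build, and similar commands."""
    return json.loads((Path(project_dir) / "target" / "run_results.json").read_text())

# Hand-made, trimmed example of the file's shape (real files have more fields).
sample = {"results": [
    {"unique_id": "model.ecommerce.fct_orders", "status": "success"},
    {"unique_id": "test.ecommerce.unique_fct_orders_order_id", "status": "pass"},
    {"unique_id": "test.ecommerce.not_null_fct_orders_customer_id", "status": "fail"},
]}
print(failed_nodes(sample))
```

In the DAG above, a final task could call failed_nodes(load_run_results("/opt/dbt")) and send the list to an alerting channel.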
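⚠️ Critical Consideration: Design dbt models to be idempotent: running them several times on the same input should produce the same result. For incremental models that track mutable rows, set unique_key so reprocessed rows replace existing ones instead of duplicating them, and avoid non-deterministic logic such as ROW_NUMBER() without a unique ORDER BY.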
Company-Specific Tips
dbt Labs Interview Tips
- Discuss dbt architecture and design principles
- Explain materializations and when to use each
- Mention testing strategies
- Talk about packages and macros
Snowflake Interview Tips
- Focus on dbt + Snowflake integration
- Discuss incremental models with Snowflake
- Mention time travel for testing
- Talk about clustering in dbt models
Data Analytics Interview Tips
- Explain analytics engineering role
- Discuss data modeling with dbt
- Mention documentation as code
- Talk about CI/CD for dbt
ℹ️ Final Takeaway: dbt is a widely adopted standard for SQL-based transformations in the warehouse. Use staging for cleaning, intermediate for business logic, and marts for analytics. Test your models, document them, and use incremental models for large tables.