dbt with PostgreSQL
Detailed Explanation
What is PostgreSQL + dbt?
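PostgreSQL is a powerful, open-source object-relational database known for its extensibility and SQL compliance. dbt connects to it through the dbt-postgres adapter, so models compile to plain PostgreSQL SQL and can use the features listed below.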
Core Features
| Feature | Description |
|---|---|
| Extensions | Add functionality (PostGIS, pg_trgm) |
| Indexing | Multiple index types for optimization |
| Partitioning | Table partitioning for large datasets |
| Full-text Search | Built-in text search capabilities |
| JSON Support | Native JSON/JSONB data types |
Index Types
| Index | Best For |
|---|---|
| B-tree | Range and equality queries (default) |
| Hash | Exact equality matches |
| GiST | Geometric and full-text search |
| GIN | Arrays, JSONB, and full-text search |
| BRIN | Large tables with natural ordering |
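As a rough illustration of the table above, the statements below create one index of each of the first four types. The `orders`, `articles`, and `places` tables and their columns are hypothetical and exist only for this sketch.

```sql
-- Hypothetical tables (illustrative names only):
--   orders(order_id, customer_id, order_date, session_token text)
--   articles(id, tags text[])
--   places(id, location point)

-- B-tree (the default): equality and range predicates
create index if not exists idx_orders_order_date
    on orders using btree (order_date);

-- Hash: equality predicates only
create index if not exists idx_orders_session_token
    on orders using hash (session_token);

-- GiST: geometric types such as point, box, and range types
create index if not exists idx_places_location
    on places using gist (location);

-- GIN: values that contain many elements, such as arrays
create index if not exists idx_articles_tags
    on articles using gin (tags);
```

Whether the planner actually uses a given index depends on the query shape and table statistics, so it is worth confirming with `explain` on a representative query.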
Partitioning Strategies
- Range - Partition by value ranges (e.g., dates)
- List - Partition by explicit values (e.g., regions)
- Hash - Partition by hash of column for uniform distribution
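The three strategies map onto PostgreSQL's declarative partitioning syntax roughly as sketched below. All table names, columns, and bounds are made up for illustration; hash partitioning assumes PostgreSQL 11 or later.

```sql
-- Range: one partition per month; the upper bound is exclusive
create table orders_by_date (
    order_id   bigint not null,
    order_date date   not null,
    amount     numeric
) partition by range (order_date);

create table orders_by_date_2024_01 partition of orders_by_date
    for values from ('2024-01-01') to ('2024-02-01');

-- List: one partition per explicit set of values
create table orders_by_region (
    order_id bigint not null,
    region   text   not null
) partition by list (region);

create table orders_by_region_emea partition of orders_by_region
    for values in ('EMEA');

-- Hash: rows are spread across a fixed number of partitions
create table orders_by_customer (
    order_id    bigint not null,
    customer_id bigint not null
) partition by hash (customer_id);

create table orders_by_customer_p0 partition of orders_by_customer
    for values with (modulus 4, remainder 0);
```

A hash-partitioned table needs one partition per remainder (here 0 through 3) before it can accept every row; only the first is shown.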
Key Takeaway: Use B-tree indexes for most queries. For large, append-mostly tables whose rows are stored roughly in column order (such as timestamped events), BRIN indexes keep range scans efficient at a small fraction of a B-tree's storage cost.
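One way to check whether BRIN is a good fit is to look at how closely the physical row order follows the column, then compare index sizes. The sketch below assumes a hypothetical `events` table with a `created_at` column; the index names are likewise invented.

```sql
-- Correlation near 1 or -1 means physical order tracks the column,
-- which is the case BRIN is designed for (run analyze first)
select attname, correlation
from pg_stats
where tablename = 'events'
  and attname = 'created_at';

-- Build both index types on the same column for comparison
create index if not exists idx_events_created_at_btree
    on events using btree (created_at);

create index if not exists idx_events_created_at_brin
    on events using brin (created_at);

-- Compare on-disk size of the two indexes
select
    pg_size_pretty(pg_relation_size('idx_events_created_at_btree')) as btree_size,
    pg_size_pretty(pg_relation_size('idx_events_created_at_brin'))  as brin_size;
```

In practice only one of the two indexes would be kept after the comparison.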
Code Examples
PostgreSQL Profile Configuration
# profiles.yml
my_profile:
  target: dev
  outputs:
    dev:
      type: postgres
      host: localhost
      port: 5432
      user: my_user
      password: "{{ env_var('POSTGRES_PASSWORD') }}"
      dbname: analytics_dev
      schema: dbt_dev
      threads: 4
      connect_timeout: 10
      sslmode: prefer
    prod:
      type: postgres
      host: my-rds-instance.xxxxxxxxxxxx.us-west-2.rds.amazonaws.com
      port: 5432
      user: service_account
      password: "{{ env_var('POSTGRES_PASSWORD') }}"
      dbname: analytics_prod
      schema: public
      threads: 8
      connect_timeout: 30
      sslmode: require
Index Creation
-- macros/postgres/create_indexes.sql
-- Takes a relation (such as `this`) and names each index after the bare table
-- name, because the fully qualified relation is not a valid index name.
{% macro create_indexes(relation, columns, index_type='btree') %}
  {% for column in columns %}
    create index if not exists idx_{{ relation.identifier }}_{{ column }}
      on {{ relation }} using {{ index_type }} ({{ column }});
  {% endfor %}
{% endmacro %}
-- Usage in model
{{
  config(
    materialized='table',
    post_hook=[
      "{{ create_indexes(this, ['customer_id', 'order_date']) }}"
    ]
  )
}}
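As an alternative to a hand-written hook, the dbt-postgres adapter also accepts an `indexes` model config and generates the index names itself. The sketch below assumes a hypothetical model file name and reuses the `stg_orders` columns from the examples in this document.

```sql
-- models/marts/fct_orders_indexed.sql (hypothetical model name)
{{
  config(
    materialized='table',
    indexes=[
      {'columns': ['customer_id'], 'type': 'btree'},
      {'columns': ['order_date'], 'type': 'brin'},
      {'columns': ['order_id'], 'unique': True}
    ]
  )
}}

select
    order_id,
    customer_id,
    order_date
from {{ ref('stg_orders') }}
```

Each entry needs `columns`; `type` and `unique` are optional. Keeping index definitions in the config makes them visible next to the model they belong to.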
Partitioned Table
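-- models/marts/fct_orders.sql
-- Note: dbt-postgres does not act on a partition_by config (that option belongs
-- to adapters such as BigQuery). Declare the partitioned table in PostgreSQL DDL
-- and let this model load it incrementally.
-- The 'merge' strategy requires PostgreSQL 15+; use 'delete+insert' on older servers.
{{
  config(
    materialized='incremental',
    unique_key='order_id',
    incremental_strategy='merge'
  )
}}
with orders as (
  select * from {{ ref('stg_orders') }}
),
final as (
  select
    order_id,
    customer_id,
    order_date,
    order_status,
    amount,
    current_timestamp as updated_at
  from orders
)
select * from final
{% if is_incremental() %}
  -- Reprocess the trailing 7 days to pick up late-arriving changes
  where order_date >= (select max(order_date) from {{ this }}) - interval '7 days'
{% endif %}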
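Native partitioning for a table like `fct_orders` is declared in PostgreSQL DDL. The sketch below shows what that DDL and a pruning check might look like; the column types and partition bounds are assumptions, and how the table gets created (a pre-hook, a run-operation, or a migration outside dbt) is left open here.

```sql
-- Assumed column types; adjust to match the staging model
create table if not exists fct_orders (
    order_id     bigint      not null,
    customer_id  bigint,
    order_date   date        not null,
    order_status text,
    amount       numeric,
    updated_at   timestamptz
) partition by range (order_date);

create table if not exists fct_orders_2024_01 partition of fct_orders
    for values from ('2024-01-01') to ('2024-02-01');

-- Catch-all for rows outside every declared range (PostgreSQL 11+)
create table if not exists fct_orders_default partition of fct_orders default;

-- The plan should touch only fct_orders_2024_01 if pruning works
explain (costs off)
select count(*)
from fct_orders
where order_date >= date '2024-01-01'
  and order_date <  date '2024-02-01';
```

Pruning only applies when the filter is on the partition key, which is one reason to keep the incremental filter on `order_date`.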
PostgreSQL Extensions
-- macros/postgres/enable_extensions.sql
{% macro enable_extension(extension_name) %}
  create extension if not exists {{ extension_name }};
{% endmacro %}
-- Usage, for example as on-run-start hooks in dbt_project.yml
-- (creating an extension requires sufficient database privileges)
-- Enable PostGIS
{{ enable_extension('postgis') }}
-- Enable pg_trgm for fuzzy matching
{{ enable_extension('pg_trgm') }}
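Once `pg_trgm` is enabled, fuzzy matching can be sketched as follows. The `customers` table, its columns, and the search string are hypothetical.

```sql
create extension if not exists pg_trgm;

-- Trigram GIN index so the % operator does not need a full scan
create index if not exists idx_customers_full_name_trgm
    on customers using gin (full_name gin_trgm_ops);

select
    customer_id,
    full_name,
    similarity(full_name, 'Jon Smith') as score
from customers
-- % is true when similarity exceeds pg_trgm.similarity_threshold
where full_name % 'Jon Smith'
order by score desc
limit 10;
```

The threshold is a session setting, so it can be tuned per query if the default returns too many or too few matches.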
Full-Text Search
-- macros/postgres/full_text_search.sql
{% macro full_text_search(column_name, search_term) %}
  to_tsvector('english', {{ column_name }}) @@ plainto_tsquery('english', '{{ search_term }}')
{% endmacro %}
-- Usage in model
select *
from {{ ref('articles') }}
where {{ full_text_search('content', 'dbt tutorial') }}
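Without an index, the predicate above recomputes `to_tsvector` for every row. A minimal sketch of an expression index that matches the macro's expression, plus a ranked query, might look like this; it assumes `articles` is a physical table with `id` and `content` columns.

```sql
-- The indexed expression must match the one used in the where clause,
-- including the explicit 'english' configuration
create index if not exists idx_articles_content_fts
    on articles using gin (to_tsvector('english', content));

select
    id,
    ts_rank(to_tsvector('english', content), query) as rank
from articles,
     plainto_tsquery('english', 'dbt tutorial') as query
where to_tsvector('english', content) @@ query
order by rank desc
limit 20;
```

In a dbt project the index could be created in a post-hook on the model that builds `articles`.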
JSON Operations
-- macros/postgres/json_extract.sql
{% macro json_extract(column_name, path) %}
  {{ column_name }}::json->'{{ path }}'
{% endmacro %}
{% macro json_extract_text(column_name, path) %}
  {{ column_name }}::json->>'{{ path }}'
{% endmacro %}
-- Usage in model
select
  id,
  {{ json_extract_text('metadata', 'author') }} as author,
  {{ json_extract_text('metadata', 'tags') }} as tags
from {{ ref('articles') }}
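When the column is stored as `jsonb`, filters can use containment and a GIN index instead of extracting text first. The sketch below assumes `articles.metadata` is `jsonb` and that `tags` holds a JSON array; the author value is invented.

```sql
-- jsonb_path_ops builds a compact GIN index that supports @>
create index if not exists idx_articles_metadata
    on articles using gin (metadata jsonb_path_ops);

-- Containment filter that can use the index above
select
    id,
    metadata->>'author' as author
from articles
where metadata @> '{"author": "Jane Doe"}';

-- Expand the tags array into one row per tag
-- (raises an error if tags is present but not an array)
select
    a.id,
    t.tag
from articles as a
cross join lateral jsonb_array_elements_text(a.metadata->'tags') as t(tag);
```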
Optimized PostgreSQL Model
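-- models/marts/fct_orders_optimized.sql
-- The 'merge' strategy requires PostgreSQL 15+; use 'delete+insert' on older servers.
-- VACUUM cannot run inside a transaction block, so that hook opts out of
-- dbt's transaction and runs after the indexes exist.
{{
  config(
    materialized='incremental',
    unique_key='order_id',
    incremental_strategy='merge',
    post_hook=[
      "create index if not exists idx_{{ this.name }}_customer_id on {{ this }} (customer_id)",
      "create index if not exists idx_{{ this.name }}_order_date on {{ this }} (order_date)",
      {"sql": "vacuum analyze {{ this }}", "transaction": False}
    ]
  )
}}
with orders as (
  select * from {{ ref('stg_orders') }}
),
final as (
  select
    order_id,
    customer_id,
    order_date,
    order_status,
    amount,
    current_timestamp as updated_at
  from orders
)
select * from final
{% if is_incremental() %}
  -- updated_at is stamped at run time, so it is always newer than the stored
  -- maximum and cannot act as a cursor; filter on order_date instead
  where order_date >= (select max(order_date) from {{ this }}) - interval '7 days'
{% endif %}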
Performance Metrics
| Metric | Description | Target |
|---|---|---|
| Query Time | Average query execution | <5s |
| Index Creation | Time to create indexes | <1min |
| Vacuum Time | Time to vacuum tables | <5min |
| Connection Count | Active connections | <100 |
| Cache Hit Ratio | Buffer cache hits | >99% |
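Two of the metrics above can be read directly from PostgreSQL's statistics views. The queries below are a sketch; the `backend_type` filter assumes PostgreSQL 10 or later.

```sql
-- Buffer cache hit ratio across all databases, as a percentage
select
    round(
        100.0 * sum(blks_hit) / nullif(sum(blks_hit) + sum(blks_read), 0),
        2
    ) as cache_hit_pct
from pg_stat_database;

-- Client connections grouped by state (active, idle, ...)
select
    state,
    count(*) as connections
from pg_stat_activity
where backend_type = 'client backend'
group by state
order by connections desc;
```

The counters are cumulative since the last statistics reset, so the hit ratio reflects long-run behavior and not the last few minutes.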
Best Practices
- Create appropriate indexes - Match query patterns
- Use partitioning - For large tables
- Analyze tables regularly - Update statistics
- Vacuum tables - Reclaim space
- Use connection pooling - Manage connections
- Monitor performance - Use the pg_stat_* statistics views
- Use extensions - Add functionality as needed
- Implement archival - Archive old data
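Several of these practices can be checked from the statistics views. The sketch below lists the tables with the most dead rows alongside their last vacuum and analyze times, then finds indexes that have never been scanned; the row limit is arbitrary.

```sql
-- Tables most in need of vacuum, with maintenance timestamps
select
    relname,
    n_live_tup,
    n_dead_tup,
    last_vacuum,
    last_autovacuum,
    last_analyze,
    last_autoanalyze
from pg_stat_user_tables
order by n_dead_tup desc
limit 20;

-- Indexes never used since statistics were last reset
select
    relname,
    indexrelname,
    pg_size_pretty(pg_relation_size(indexrelid)) as index_size
from pg_stat_user_indexes
where idx_scan = 0
order by pg_relation_size(indexrelid) desc;
```

An index with zero scans may still back a unique or primary key constraint, so it is not automatically safe to drop.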
See Also
- Performance Tuning - General optimization strategies
- dbt BigQuery - BigQuery partitioning and clustering
- dbt Snowflake - Snowflake warehouse optimization
- dbt Best Practices - Project structure and patterns