dbt Expert
dbt (data build tool) transforms raw data into analytics-ready tables using SQL. Its genius is treating SQL as code: version-controlled, tested, documented, and composable. The key mental model is the DAG — every model is a node, ref() creates edges, and dbt resolves execution order automatically. Production dbt projects are characterized by clear materialization choices, comprehensive tests, and documented models that can be understood without tribal knowledge.
Core Mental Model
dbt separates transformation from loading (ETL becomes ELT — Extract, Load, Transform). Your warehouse already has the raw data; dbt's job is to transform it. Every .sql file is a model that produces a table or view. ref('model_name') creates a dependency — dbt builds the DAG and runs models in order. source('source_name', 'table_name') points to raw source tables outside your dbt project. Test everything: data quality issues found in CI are infinitely cheaper than issues found by stakeholders.
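For instance, a staging model and its downstream consumer might look like this (a sketch; table and column names are illustrative):

```sql
-- models/staging/stg_stripe__customers.sql
-- 1:1 with the raw table: rename and cast, nothing else
SELECT
    id AS customer_id,
    email,
    created AS created_at
FROM {{ source('stripe', 'customers') }}

-- models/marts/dim_customers.sql (a separate file)
-- ref() creates the edge: dbt builds stg_stripe__customers first
SELECT customer_id, email, created_at
FROM {{ ref('stg_stripe__customers') }}
```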
Model Organization Pattern
models/
  staging/          # 1:1 with source tables, light transformation, always views
    _stg_stripe__sources.yml
    stg_stripe__charges.sql
    stg_stripe__customers.sql
  intermediate/     # business logic combinations, typically ephemeral or views
    int_orders_with_customer.sql
    int_payments_enriched.sql
  marts/            # final analytics models, always tables
    finance/
      fct_revenue.sql
      dim_customers.sql
    marketing/
      fct_campaigns.sql
Naming conventions:
stg_* = staging (source-conformed)
int_* = intermediate (complex joins, dedupe, enrichment)
fct_* = fact table (events, transactions, grain = one row per event)
dim_* = dimension table (entities: customers, products, users)
rpt_* = report-specific (often BI-facing, e.g. Looker Explores)
Materialization Strategy
view: runs the query on every access. No storage cost. Use for staging models.
    When: accessed rarely, or data freshness is critical
table: fully rebuilt on every dbt run. Simple. Use for marts.
    When: the query is complex or downstream models use it heavily
incremental: appends/merges only new or changed rows. Scales to huge tables.
    When: source table > ~1M rows and periodic (e.g. daily) runs are acceptable
ephemeral: injected as a CTE into downstream models. No table or view is created.
    When: an intermediate transformation is used by only one downstream model
Decision tree:
Is it a staging model? → view
Is it accessed by many models? → table or incremental
Is it huge (> 1M rows)? → incremental
Is it used by only one model? → ephemeral
Is it a marts model? → table (facts and dims alike)
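The decision tree can be encoded as folder-level defaults in dbt_project.yml (the project name here is a placeholder), so individual models only need a config() block when they deviate:

```yaml
# dbt_project.yml (excerpt; 'my_project' is a placeholder)
models:
  my_project:
    staging:
      +materialized: view
    intermediate:
      +materialized: ephemeral
    marts:
      +materialized: table
```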
-- Incremental model: the most important materialization to get right
-- incremental_strategy options: merge, append, insert_overwrite
-- merge_update_columns: only these columns are updated on a key match
-- on_schema_change options: 'ignore' (default), 'fail', 'append_new_columns', 'sync_all_columns'
-- partition_by as shown uses BigQuery syntax
{{ config(
    materialized='incremental',
    unique_key='order_id',
    incremental_strategy='merge',
    merge_update_columns=['status', 'updated_at', 'amount'],
    on_schema_change='sync_all_columns',
    partition_by={
        'field': 'created_date',
        'data_type': 'date',
        'granularity': 'day'
    }
) }}
WITH source AS (
    SELECT * FROM {{ source('stripe', 'charges') }}
),

transformed AS (
    SELECT
        id AS charge_id,
        customer_id,
        amount / 100.0 AS amount_usd,  -- Stripe stores amounts in cents
        currency,
        status,
        created AS created_at,
        updated_at,                    -- must be selected: the incremental filter below uses it
        DATE(created) AS created_date
    FROM source
)

SELECT * FROM transformed

-- The magic: on incremental runs, only process new/changed rows
{% if is_incremental() %}
WHERE created_at > (SELECT MAX(created_at) FROM {{ this }})
   OR updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}
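When the incremental filter misses late-arriving data, or a schema change goes beyond what on_schema_change can reconcile, the escape hatch is a full rebuild (the model name here is hypothetical):

```shell
# Drops and recreates the incremental table, reprocessing all history
dbt run --select fct_charges --full-refresh
```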
Source Freshness and Source Tests
# models/staging/_stg_stripe__sources.yml
version: 2

sources:
  - name: stripe
    database: raw_data         # optional override
    schema: stripe_raw
    description: "Stripe payments data loaded by Fivetran"
    freshness:                 # source freshness check
      warn_after: {count: 12, period: hour}
      error_after: {count: 24, period: hour}
    loaded_at_field: _fivetran_synced   # timestamp column to check
    tables:
      - name: charges
        description: "All Stripe charges"
        loaded_at_field: created        # override per table
        columns:
          - name: id
            description: "Stripe charge ID"
            tests:
              - not_null
              - unique
          - name: customer_id
            tests:
              - not_null
              - relationships:
                  to: source('stripe', 'customers')
                  field: id
          - name: status
            tests:
              - not_null
              - accepted_values:
                  values: ['succeeded', 'pending', 'failed', 'refunded']
          - name: amount
            tests:
              - not_null
              - dbt_utils.accepted_range:
                  min_value: 0
                  max_value: 10000000   # in cents: $100k max
Generic and Custom Tests
# Generic tests in model YAML
version: 2

models:
  - name: fct_orders
    description: "One row per order"
    tests:
      - dbt_utils.unique_combination_of_columns:
          combination_of_columns: ['order_id', 'version']
      - dbt_expectations.expect_table_row_count_to_be_between:
          min_value: 1000
          max_value: 10000000
    columns:
      - name: order_id
        tests: [not_null, unique]
      - name: amount_usd
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
              inclusive: true
      - name: status
        tests:
          - accepted_values:
              values: [new, processing, shipped, delivered, cancelled]
              config:
                severity: warn   # warn, not error: an unexpected new status shouldn't fail the build
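These tests can be run selectively; dbt's node selection syntax works for tests just as it does for models:

```shell
# Only the tests attached to fct_orders (and its columns)
dbt test --select fct_orders

# Only generic tests, project-wide (test_type:singular for the rest)
dbt test --select test_type:generic
```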
-- Custom generic test: test that a column has no future dates
-- tests/generic/no_future_dates.sql
{% test no_future_dates(model, column_name, buffer_days=0) %}
SELECT *
FROM {{ model }}
WHERE {{ column_name }} > CURRENT_TIMESTAMP + INTERVAL '{{ buffer_days }}' DAY
{% endtest %}
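Once the test macro is defined, it is applied from model YAML like any built-in generic test (model and column names here are illustrative):

```yaml
models:
  - name: fct_orders
    columns:
      - name: created_at
        tests:
          - no_future_dates:
              buffer_days: 1
```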
-- Custom singular test: specific business rule
-- tests/assert_refund_amount_not_exceed_charge.sql
SELECT
    r.refund_id,
    r.charge_id,
    r.amount AS refund_amount,
    c.amount AS charge_amount
FROM {{ ref('stg_stripe__refunds') }} r
JOIN {{ ref('stg_stripe__charges') }} c
    ON r.charge_id = c.charge_id
WHERE r.amount > c.amount  -- a refund exceeding its original charge is a data quality issue
Macros
-- macros/cents_to_dollars.sql
{% macro cents_to_dollars(column_name, precision=2) %}
ROUND({{ column_name }} / 100.0, {{ precision }})
{% endmacro %}
-- Usage:
SELECT {{ cents_to_dollars('amount') }} AS amount_usd FROM charges
-- macros/generate_surrogate_key.sql (dbt_utils already has this)
{% macro generate_surrogate_key(field_list) %}
{{ dbt_utils.generate_surrogate_key(field_list) }}
{% endmacro %}
-- macros/date_spine.sql — generate date series
{% macro date_spine(start_date, end_date) %}
{{ dbt_utils.date_spine(
datepart="day",
start_date="cast('" ~ start_date ~ "' as date)",
end_date="cast('" ~ end_date ~ "' as date)"
) }}
{% endmacro %}
-- macros/pivot.sql — dynamic pivot
{% macro pivot(column, values, alias=true, agg='sum', cmp='=', prefix='', suffix='', then_value=1, else_value=0, quote_identifiers=false, distinct=false) %}
    {% for value in values %}
        {{ agg }}(
            {% if distinct %} DISTINCT {% endif %}
            CASE WHEN {{ column }} {{ cmp }} '{{ value }}'
                THEN {{ then_value }}
                ELSE {{ else_value }}
            END
        )
        {% if alias %}
            {# parentheses matter: without them the filters would apply to suffix only #}
            AS {{ adapter.quote(prefix ~ value ~ suffix) if quote_identifiers else ((prefix ~ value ~ suffix) | replace(' ', '_') | lower) }}
        {% endif %}
        {% if not loop.last %},{% endif %}
    {% endfor %}
{% endmacro %}
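A hypothetical call site, producing one aggregated column per status value:

```sql
-- Compiles to one SUM(CASE WHEN status = '...' THEN 1 ELSE 0 END) per value,
-- aliased n_succeeded and n_failed
SELECT
    customer_id,
    {{ pivot('status', ['succeeded', 'failed'], prefix='n_') }}
FROM {{ ref('stg_stripe__charges') }}
GROUP BY customer_id
```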
Snapshots (SCD Type 2)
-- snapshots/scd_customers.sql
-- strategy='check' diffs check_cols between runs; the alternative is
-- strategy='timestamp' with updated_at='updated_at'
{% snapshot scd_customers %}

{{ config(
    target_schema='snapshots',
    unique_key='customer_id',
    strategy='check',
    check_cols=['email', 'name', 'tier'],
    invalidate_hard_deletes=True   -- mark records deleted from source
) }}

SELECT
    id AS customer_id,
    email,
    name,
    tier,
    created_at,
    updated_at
FROM {{ source('crm', 'customers') }}

{% endsnapshot %}
-- dbt adds these columns automatically:
-- dbt_scd_id: unique row identifier
-- dbt_updated_at: when this snapshot record was created/updated
-- dbt_valid_from: when this version became current
-- dbt_valid_to: when this version was superseded (NULL = current)
-- Query: customer tier at a specific point in time
SELECT customer_id, tier
FROM {{ ref('scd_customers') }}
WHERE customer_id = 'cust_123'
AND dbt_valid_from <= '2024-06-01'
AND (dbt_valid_to IS NULL OR dbt_valid_to > '2024-06-01')
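Note that snapshots are not built by dbt run; they have a dedicated command, typically scheduled on its own cadence:

```shell
# Capture the current state of all snapshots, or a single one with --select
dbt snapshot
dbt snapshot --select scd_customers
```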
Slim CI with State Comparison
# In CI: only run the models and tests modified in this PR.
# Fetch the production manifest first for state comparison.
# --select state:modified+  → modified models plus all downstream dependents
# --defer                   → resolve unmodified upstream refs to prod relations
# --state ./prod-manifest/  → path to the production manifest.json
dbt run --select state:modified+ --defer --state ./prod-manifest/
dbt test --select state:modified+ --defer --state ./prod-manifest/
# Full CI pipeline script
#!/bin/bash
set -e
# Download production manifest
aws s3 cp s3://my-dbt-artifacts/manifest.json ./prod-manifest/manifest.json
# Install dependencies
dbt deps
# Check source freshness
dbt source freshness
# Run modified models (use prod for unmodified upstream)
dbt run --select state:modified+ --defer --state ./prod-manifest/
# Test the modified models and everything downstream of them
dbt test --select state:modified+ --defer --state ./prod-manifest/
# Generate and upload the new manifest for the next comparison
# (do this only in the production job, not on PR builds, or PR state poisons prod)
dbt docs generate
aws s3 cp ./target/manifest.json s3://my-dbt-artifacts/manifest.json
ref() vs source() vs var()
-- ref(): reference another dbt model (creates DAG dependency)
SELECT * FROM {{ ref('stg_stripe__charges') }}
-- source(): reference raw source table (not transformed by dbt)
SELECT * FROM {{ source('stripe', 'charges') }}
-- Maps to: raw_data.stripe_raw.charges (per sources.yml config)
-- var(): project-level variable (can be overridden at runtime)
-- dbt_project.yml:
-- vars:
-- start_date: '2024-01-01'
-- payment_methods: ['card', 'bank_transfer']
SELECT *
FROM {{ ref('fct_orders') }}
WHERE created_at >= '{{ var("start_date") }}'
  AND payment_method IN ('{{ var("payment_methods") | join("', '") }}')
-- Override at runtime:
-- dbt run --vars '{"start_date": "2023-01-01"}'
-- env_var(): pull from environment (for secrets/environment-specific config)
{{ config(
database=env_var('DBT_TARGET_DATABASE', 'analytics')
) }}
Exposures
# models/exposures.yml — document who consumes your data
version: 2

exposures:
  - name: monthly_revenue_dashboard
    type: dashboard
    maturity: high
    url: https://looker.example.com/dashboards/123
    description: >
      Monthly revenue dashboard for the Finance team.
      Refreshes every 6 hours.
    depends_on:
      - ref('fct_revenue')
      - ref('dim_customers')
    owner:
      name: Finance Analytics Team
      email: [email protected]
    tags: ['finance', 'executive']

  - name: revenue_ml_model
    type: ml
    maturity: medium
    description: "Churn prediction model trained on customer behavior"
    depends_on:
      - ref('fct_orders')
      - ref('dim_customers')
    owner:
      name: Data Science Team
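Exposures also participate in node selection, so CI or a scheduler can rebuild exactly what a dashboard needs:

```shell
# Build every model upstream of the dashboard exposure
dbt build --select +exposure:monthly_revenue_dashboard
```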
Anti-Patterns
-- ❌ Hardcoding database/schema names (breaks across environments)
SELECT * FROM prod_db.analytics.orders
-- ✅ Always use ref() or source()
SELECT * FROM {{ ref('orders') }}
-- ❌ No unique_key on incremental model (full table scan for dedup)
{{ config(materialized='incremental') }}
-- ✅ Always specify unique_key for merge strategy
{{ config(materialized='incremental', unique_key='order_id', incremental_strategy='merge') }}
-- ❌ All models as tables (expensive for large marts)
{{ config(materialized='table') }} -- on a 1-billion-row model
-- ✅ incremental for large tables
-- ❌ Logic in downstream models that should be in staging
-- fct_orders.sql: SELECT LOWER(TRIM(email))...
-- ✅ Clean in staging, use cleaned field everywhere downstream
-- ❌ No tests at all (data quality issues discovered by stakeholders)
-- ✅ Minimum: not_null + unique on primary keys, accepted_values on enums
-- ❌ select * in production models (breaks on source schema changes)
SELECT * FROM {{ ref('stg_customers') }}
-- ✅ Explicit column selection
SELECT id, email, name, created_at FROM {{ ref('stg_customers') }}
Quick Reference
Materialization Decision:
Raw staging model → view
Staging used everywhere → view (no storage cost, always fresh)
Complex intermediate → ephemeral (if used by 1)
Analytics mart table → table (if < 100M rows)
Analytics mart (huge) → incremental + merge
SCD Type 2 → snapshot
Test Coverage Minimum:
Primary keys: not_null + unique
Foreign keys: relationships test
Status fields: accepted_values
Amounts: not_null + dbt_utils.accepted_range(min=0)
Incremental Strategy:
append: just adds new rows (no dedup)
merge: upserts by unique_key
insert_overwrite: replaces entire partitions
Slim CI:
--select state:modified+  → affected models + downstream
--defer                   → use prod results for unmodified upstream
--state ./prod-manifest/  → where the prod manifest lives
Naming:
stg_* → staging (source-conformed, 1:1 with source)
int_* → intermediate (business logic)
fct_* → facts (events, transactions)
dim_* → dimensions (entities)