dbt-expert

Expert knowledge of dbt model materialization, incremental strategies, testing, macros, snapshots, documentation, slim CI, and data modeling best practices. Trigger phrases: when working with dbt, dbt model materialization, dbt incremental models,

MoltbotDen
Data & Analytics

dbt Expert

dbt (data build tool) transforms raw data into analytics-ready tables using SQL. Its genius is treating SQL as code: version-controlled, tested, documented, and composable. The key mental model is the DAG — every model is a node, ref() creates edges, and dbt resolves execution order automatically. Production dbt projects are characterized by clear materialization choices, comprehensive tests, and documented models that can be understood without tribal knowledge.

Core Mental Model

dbt separates transformation from loading (ETL becomes ELT: Extract, Load, Transform). Your warehouse already has the raw data; dbt's job is to transform it. Every .sql file under models/ is a model; by default it is built as a view or table, and an ephemeral model is inlined as a CTE instead. ref('model_name') creates a dependency, so dbt builds the DAG and runs models in order. source('source_name', 'table_name') points to raw source tables outside your dbt project. Test everything: data quality issues caught in CI are far cheaper to fix than issues found by stakeholders.
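To make the DAG idea concrete, here is a minimal sketch of how ref() calls can be turned into an execution order. It is not dbt's actual implementation: the model bodies, the regex, and the helper name build_order are assumptions made for illustration, and dbt itself parses Jinja rather than using a regex.

```python
import re
from graphlib import TopologicalSorter

# Hypothetical, simplified model bodies keyed by model name.
MODELS = {
    "stg_stripe__charges": "select * from {{ source('stripe', 'charges') }}",
    "stg_stripe__customers": "select * from {{ source('stripe', 'customers') }}",
    "int_payments_enriched": (
        "select * from {{ ref('stg_stripe__charges') }} c "
        "join {{ ref('stg_stripe__customers') }} u on c.customer_id = u.customer_id"
    ),
    "fct_revenue": "select * from {{ ref('int_payments_enriched') }}",
}

# Matches ref('name') or ref("name"); source() calls are ignored because
# sources live outside the project and are never built by dbt.
REF_PATTERN = re.compile(r"ref\(\s*['\"]([^'\"]+)['\"]\s*\)")


def build_order(models):
    """Return model names so that every ref() target precedes its dependents."""
    graph = {name: set(REF_PATTERN.findall(sql)) for name, sql in models.items()}
    return list(TopologicalSorter(graph).static_order())


order = build_order(MODELS)
print(order)
```

Both staging models come out before int_payments_enriched, which in turn comes before fct_revenue; the relative order of the two staging models is not significant.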


Model Organization Pattern

models/
  staging/           # 1:1 with source tables, light transformation, usually views
    _stg_stripe__sources.yml
    stg_stripe__charges.sql
    stg_stripe__customers.sql
  intermediate/      # business logic combinations, typically ephemeral or views
    int_orders_with_customer.sql
    int_payments_enriched.sql
  marts/             # final analytics models, tables (incremental when very large)
    finance/
      fct_revenue.sql
      dim_customers.sql
    marketing/
      fct_campaigns.sql

Naming conventions:
  stg_*              = staging (source-conformed)
  int_*              = intermediate (complex joins, dedupe, enrichment)
  fct_*              = fact table (events, transactions, grain = one row per event)
  dim_*              = dimension table (entities: customers, products, users)
  rpt_*              = report-specific, often Explore-facing

Materialization Strategy

view:         Runs query on every access. No storage cost. Use for staging models.
              When: accessed rarely or data freshness is critical
              
table:        Fully rebuild on every dbt run. Simple. Use for marts.
              When: query is complex, downstream uses it heavily
              
incremental:  Append/merge only new/changed rows. Scales to huge tables.
              When: source table > ~1M rows, daily runs acceptable
              
ephemeral:    CTE injected into downstream models. No table/view created.
              When: intermediate transformation only used by one downstream model

Decision tree:
  Is it a staging model?          → view
  Is it accessed by many models?  → table or incremental
  Is it huge (> 1M rows)?         → incremental
  Is it used by only one model?   → ephemeral
  Is it a marts model?            → table (dim) or incremental (large fact)

-- Incremental model: the most important materialization to get right
-- incremental_strategy: merge, append, or insert_overwrite (adapter-dependent)
-- merge_update_columns: only these columns are updated when unique_key matches
-- on_schema_change: 'sync_all_columns', 'append_new_columns', 'fail', or 'ignore'
-- partition_by: BigQuery partitioning config
-- Note: keep comments outside the config() call; "--" is not a comment inside Jinja
{{ config(
    materialized='incremental',
    unique_key='charge_id',
    incremental_strategy='merge',
    merge_update_columns=['status', 'amount_usd', 'updated_at'],
    on_schema_change='sync_all_columns',
    partition_by={
        'field': 'created_date',
        'data_type': 'date',
        'granularity': 'day'
    }
) }}

WITH source AS (
    SELECT * FROM {{ source('stripe', 'charges') }}
),

transformed AS (
    SELECT
        id               AS charge_id,
        customer_id,
        amount / 100.0   AS amount_usd,   -- Stripe stores in cents
        currency,
        status,
        created          AS created_at,
        DATE(created)    AS created_date,
        _fivetran_synced AS updated_at    -- assumption: Fivetran sync time serves as the change timestamp
    FROM source
)

SELECT * FROM transformed

-- On incremental runs, only process rows that are new or changed since the last run
{% if is_incremental() %}
WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}
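The difference between the append and merge strategies is easiest to see on a tiny dataset. The sketch below simulates both in plain Python; the function names and sample rows are hypothetical, and the merge here replaces the whole row rather than modelling merge_update_columns.

```python
def append_rows(existing, incoming):
    """'append' strategy: new rows are added with no deduplication."""
    return existing + incoming


def merge_rows(existing, incoming, unique_key):
    """'merge' strategy: incoming rows replace existing rows with the same key."""
    merged = {row[unique_key]: row for row in existing}
    for row in incoming:
        merged[row[unique_key]] = row
    return list(merged.values())


# Rows already in the target table ({{ this }} in dbt terms).
target = [
    {"charge_id": "ch_1", "status": "pending"},
    {"charge_id": "ch_2", "status": "succeeded"},
]
# Rows selected by the incremental filter: one changed, one new.
batch = [
    {"charge_id": "ch_1", "status": "succeeded"},
    {"charge_id": "ch_3", "status": "pending"},
]

appended = append_rows(target, batch)
merged = merge_rows(target, batch, "charge_id")
print(len(appended), len(merged))  # 4 3
```

With append, ch_1 appears twice (once per status); with merge, the single ch_1 row carries the latest status. This is why a unique key matters whenever source rows can change after they are first loaded.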

Source Freshness and Source Tests

# models/staging/_stg_stripe__sources.yml
version: 2

sources:
  - name: stripe
    database: raw_data          # optional override
    schema: stripe_raw
    description: "Stripe payments data loaded by Fivetran"
    
    freshness:                  # source freshness check
      warn_after: { count: 12, period: hour }
      error_after: { count: 24, period: hour }
    loaded_at_field: _fivetran_synced   # timestamp column to check
    
    tables:
      - name: charges
        description: "All Stripe charges"
        loaded_at_field: created  # override per table
        
        columns:
          - name: id
            description: "Stripe charge ID"
            tests:
              - not_null
              - unique
          
          - name: customer_id
            tests:
              - not_null
              - relationships:
                  to: source('stripe', 'customers')
                  field: id
          
          - name: status
            tests:
              - not_null
              - accepted_values:
                  values: ['succeeded', 'pending', 'failed', 'refunded']
          
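          - name: amount
            tests:
              - not_null
              - dbt_utils.accepted_range:
                  min_value: 0
                  max_value: 10000000   # in cents: $100k max

      - name: customers           # must be declared for the relationships test above
        description: "All Stripe customers"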

Generic and Custom Tests

# Generic tests in model YAML
models:
  - name: fct_orders
    description: "One row per order"
    tests:
      - dbt_utils.unique_combination_of_columns:
          combination_of_columns: ['order_id', 'version']
      - dbt_expectations.expect_table_row_count_to_be_between:
          min_value: 1000
          max_value: 10000000
    
    columns:
      - name: order_id
        tests: [not_null, unique]
      
      - name: amount_usd
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
              inclusive: true
      
      - name: status
        tests:
          - accepted_values:
              values: [new, processing, shipped, delivered, cancelled]
              config:
                severity: warn  # warn, not error, for an unexpected new status

-- Custom generic test: test that a column has no future dates
-- tests/generic/no_future_dates.sql
-- Uses dbt's cross-database macros so the date arithmetic is not tied to one SQL dialect
{% test no_future_dates(model, column_name, buffer_days=0) %}
SELECT *
FROM {{ model }}
WHERE {{ column_name }} > {{ dbt.dateadd('day', buffer_days, dbt.current_timestamp()) }}
{% endtest %}

-- Custom singular test: specific business rule
-- tests/assert_refund_amount_not_exceed_charge.sql
SELECT
    r.refund_id,
    r.charge_id,
    r.amount    AS refund_amount,
    c.amount    AS charge_amount
FROM {{ ref('stg_stripe__refunds') }} r
JOIN {{ ref('stg_stripe__charges') }} c ON r.charge_id = c.charge_id
WHERE r.amount > c.amount  -- refund exceeds original charge: data quality issue
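Every dbt test, generic or singular, follows the same contract: it is a query that returns the failing rows, and the test passes when that query returns nothing. The sketch below mimics three built-in generic tests in plain Python to show the contract; the function names and sample rows are hypothetical and this is not how dbt executes tests internally.

```python
from collections import Counter


def not_null_failures(rows, column):
    """Rows where the column is NULL (None)."""
    return [r for r in rows if r.get(column) is None]


def unique_failures(rows, column):
    """Non-null values that occur more than once."""
    counts = Counter(r[column] for r in rows if r.get(column) is not None)
    return sorted(value for value, n in counts.items() if n > 1)


def accepted_values_failures(rows, column, values):
    """Rows whose non-null value is outside the accepted set."""
    allowed = set(values)
    return [r for r in rows if r.get(column) is not None and r[column] not in allowed]


def passes(failures):
    """A test passes when its query returns zero failing rows."""
    return len(failures) == 0


orders = [
    {"order_id": 1, "status": "new"},
    {"order_id": 2, "status": "shipped"},
    {"order_id": 2, "status": "returned"},   # duplicate key, unexpected status
    {"order_id": None, "status": "new"},     # missing key
]
statuses = ["new", "processing", "shipped", "delivered", "cancelled"]

print(passes(not_null_failures(orders, "order_id")))
print(unique_failures(orders, "order_id"))
print(accepted_values_failures(orders, "status", statuses))
```

All three checks fail on this sample: one row has a null order_id, the value 2 is duplicated, and "returned" is not an accepted status.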

Macros

-- macros/cents_to_dollars.sql
{% macro cents_to_dollars(column_name, precision=2) %}
    ROUND({{ column_name }} / 100.0, {{ precision }})
{% endmacro %}

-- Usage:
SELECT {{ cents_to_dollars('amount') }} AS amount_usd FROM {{ source('stripe', 'charges') }}

-- macros/generate_surrogate_key.sql (dbt_utils already has this)
{% macro generate_surrogate_key(field_list) %}
    {{ dbt_utils.generate_surrogate_key(field_list) }}
{% endmacro %}

-- macros/date_spine.sql — generate date series
{% macro date_spine(start_date, end_date) %}
    {{ dbt_utils.date_spine(
        datepart="day",
        start_date="cast('" ~ start_date ~ "' as date)",
        end_date="cast('" ~ end_date ~ "' as date)"
    ) }}
{% endmacro %}

-- macros/pivot.sql — dynamic pivot
{% macro pivot(column, values, alias=true, agg='sum', cmp='=', prefix='', suffix='', then_value=1, else_value=0, quote_identifiers=false, distinct=false) %}
  {% for value in values %}
    {{ agg }}(
      {% if distinct %} DISTINCT {% endif %}
      CASE WHEN {{ column }} {{ cmp }} '{{ value }}'
           THEN {{ then_value }}
           ELSE {{ else_value }}
      END
    )
    {% if alias %}
      AS {{ adapter.quote(prefix ~ value ~ suffix) if quote_identifiers else ((prefix ~ value ~ suffix) | replace(' ', '_') | lower) }}
    {% endif %}
    {% if not loop.last %},{% endif %}
  {% endfor %}
{% endmacro %}

Snapshots (SCD Type 2)

-- snapshots/scd_customers.sql
{% snapshot scd_customers %}

-- strategy: 'check' compares the columns listed in check_cols
--   alternative: strategy='timestamp' with updated_at='updated_at'
-- invalidate_hard_deletes: close out records that were deleted from the source
-- Note: keep comments outside the config() call; "--" is not a comment inside Jinja
{{ config(
    target_schema='snapshots',
    unique_key='customer_id',
    strategy='check',
    check_cols=['email', 'name', 'tier'],
    invalidate_hard_deletes=True
) }}

SELECT
    id          AS customer_id,
    email,
    name,
    tier,
    created_at,
    updated_at
FROM {{ source('crm', 'customers') }}

{% endsnapshot %}

-- dbt adds these columns automatically:
-- dbt_scd_id:     unique row identifier
-- dbt_updated_at: when this snapshot record was created/updated
-- dbt_valid_from: when this version became current
-- dbt_valid_to:   when this version was superseded (NULL = current)

-- Query: customer tier at a specific point in time
SELECT customer_id, tier
FROM {{ ref('scd_customers') }}
WHERE customer_id = 'cust_123'
  AND dbt_valid_from <= '2024-06-01'
  AND (dbt_valid_to IS NULL OR dbt_valid_to > '2024-06-01')
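How dbt_valid_from and dbt_valid_to evolve across runs can be sketched in a few lines. The simulation below is a simplified take on the 'check' strategy under stated assumptions: it uses dates instead of timestamps, ignores hard deletes and the other dbt_ metadata columns, and the function names are hypothetical.

```python
from datetime import date


def apply_check_snapshot(history, source_rows, check_cols, run_date, key="customer_id"):
    """Apply one snapshot run; history is mutated in place and returned."""
    current = {row[key]: row for row in history if row["dbt_valid_to"] is None}
    for src in source_rows:
        cur = current.get(src[key])
        if cur is not None and all(cur[c] == src[c] for c in check_cols):
            continue  # tracked columns unchanged: keep the current version
        if cur is not None:
            cur["dbt_valid_to"] = run_date  # close the superseded version
        history.append({**src, "dbt_valid_from": run_date, "dbt_valid_to": None})
    return history


def tier_as_of(history, customer_id, as_of):
    """Point-in-time lookup, mirroring the WHERE clause of the query above."""
    for row in history:
        if row["customer_id"] != customer_id:
            continue
        if row["dbt_valid_from"] <= as_of and (
            row["dbt_valid_to"] is None or row["dbt_valid_to"] > as_of
        ):
            return row["tier"]
    return None


history = []
apply_check_snapshot(history, [{"customer_id": "cust_123", "tier": "free"}], ["tier"], date(2024, 1, 1))
apply_check_snapshot(history, [{"customer_id": "cust_123", "tier": "pro"}], ["tier"], date(2024, 5, 15))
apply_check_snapshot(history, [{"customer_id": "cust_123", "tier": "pro"}], ["tier"], date(2024, 6, 10))

print(len(history))
print(tier_as_of(history, "cust_123", date(2024, 6, 1)))
```

Three runs produce only two versions because the third run sees no change in the tracked column. The lookup for 2024-06-01 returns the "pro" version, while a date before the first run returns nothing.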

Slim CI with State Comparison

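# In CI: only run models and tests modified in this PR
# Fetch production manifest for comparison
#   state:modified+   modified models + all downstream dependents
#   --defer           use prod results for unmodified upstream models
#   --state           directory containing the production manifest.json
# (A comment after a trailing backslash breaks the line continuation, so keep comments up here.)
dbt run \
  --select state:modified+ \
  --defer \
  --state ./prod-manifest/

# Test only affected models; state: selectors require --state here too
dbt test \
  --select state:modified+ \
  --defer \
  --state ./prod-manifest/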

# Full CI pipeline script
#!/bin/bash
set -e

# Download production manifest
aws s3 cp s3://my-dbt-artifacts/manifest.json ./prod-manifest/manifest.json

# Install dependencies
dbt deps

# Check source freshness
dbt source freshness

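# Run modified models (use prod for unmodified upstream)
dbt run --select state:modified+ --defer --state ./prod-manifest/

# Test the modified models and their downstream dependents
dbt test --select state:modified+ --defer --state ./prod-manifest/

# Generate and upload new manifest for next run.
# Do this only from the production deployment (e.g. on merge to main), not from
# PR builds; otherwise later runs compare against unmerged code.
dbt docs generate
aws s3 cp ./target/manifest.json s3://my-dbt-artifacts/manifest.json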
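The selection that state:modified+ performs can be pictured as "diff against production, then walk downstream". The sketch below is a simplified illustration, assuming each model is reduced to a single checksum and a list of parents; real dbt compares more than file contents (configs, for example), and all names here are hypothetical.

```python
def select_modified_plus(prod_checksums, current_checksums, parents):
    """Return models that are new or changed, plus everything downstream of them."""
    modified = {
        name
        for name, checksum in current_checksums.items()
        if prod_checksums.get(name) != checksum
    }

    # Invert the parent map so we can walk from a model to its children.
    children = {}
    for child, ups in parents.items():
        for up in ups:
            children.setdefault(up, set()).add(child)

    selected, stack = set(modified), list(modified)
    while stack:
        node = stack.pop()
        for child in children.get(node, ()):
            if child not in selected:
                selected.add(child)
                stack.append(child)
    return selected


prod = {"stg_charges": "a1", "stg_customers": "b1", "int_payments": "c1",
        "fct_revenue": "d1", "dim_customers": "e1"}
current = dict(prod, int_payments="c2")  # only int_payments changed in this PR
parents = {
    "int_payments": ["stg_charges", "stg_customers"],
    "fct_revenue": ["int_payments"],
    "dim_customers": ["stg_customers"],
}

selected = select_modified_plus(prod, current, parents)
print(sorted(selected))  # ['fct_revenue', 'int_payments']
```

The unmodified staging models are not selected, which is where --defer comes in: references to them resolve to the production relations instead of being rebuilt in the CI schema.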

ref() vs source() vs var()

-- ref(): reference another dbt model (creates DAG dependency)
SELECT * FROM {{ ref('stg_stripe__charges') }}

-- source(): reference raw source table (not transformed by dbt)
SELECT * FROM {{ source('stripe', 'charges') }}
-- Maps to: raw_data.stripe_raw.charges (per sources.yml config)

-- var(): project-level variable (can be overridden at runtime)
-- dbt_project.yml:
-- vars:
--   start_date: '2024-01-01'
--   payment_methods: ['card', 'bank_transfer']

SELECT *
FROM {{ ref('fct_orders') }}
WHERE created_at >= '{{ var("start_date") }}'
  AND payment_method IN ('{{ var("payment_methods") | join("', '") }}')

-- Override at runtime:
-- dbt run --vars '{"start_date": "2023-01-01"}'

-- env_var(): pull from environment (for secrets/environment-specific config)
{{ config(
    database=env_var('DBT_TARGET_DATABASE', 'analytics')
) }}

Exposures

# models/exposures.yml — document who consumes your data
exposures:
  - name: monthly_revenue_dashboard
    type: dashboard
    maturity: high
    url: https://looker.example.com/dashboards/123
    description: >
      Monthly revenue dashboard for the Finance team.
      Refreshes every 6 hours.
    
    depends_on:
      - ref('fct_revenue')
      - ref('dim_customers')
    
    owner:
      name: Finance Analytics Team
      email: [email protected]
    
    tags: ['finance', 'executive']

  - name: churn_prediction_model
    type: ml
    maturity: medium
    description: "Churn prediction model trained on customer behavior"
    
    depends_on:
      - ref('fct_orders')
      - ref('dim_customers')
    
    owner:
      name: Data Science Team

Anti-Patterns

-- ❌ Hardcoding database/schema names (breaks across environments)
SELECT * FROM prod_db.analytics.orders
-- ✅ Always use ref() or source()
SELECT * FROM {{ ref('orders') }}

-- ❌ No unique_key on incremental model (rows are only appended, so reprocessed records become duplicates)
{{ config(materialized='incremental') }}
-- ✅ Always specify unique_key for merge strategy
{{ config(materialized='incremental', unique_key='order_id', incremental_strategy='merge') }}

-- ❌ All models as tables (expensive for large marts)
{{ config(materialized='table') }}  -- on a 1-billion-row model
-- ✅ incremental for large tables

-- ❌ Logic in downstream models that should be in staging
-- fct_orders.sql: SELECT LOWER(TRIM(email))...
-- ✅ Clean in staging, use cleaned field everywhere downstream

-- ❌ No tests at all (data quality issues discovered by stakeholders)
-- ✅ Minimum: not_null + unique on primary keys, accepted_values on enums

-- ❌ select * in production models (breaks on source schema changes)
SELECT * FROM {{ ref('stg_customers') }}
-- ✅ Explicit column selection
SELECT id, email, name, created_at FROM {{ ref('stg_customers') }}

Quick Reference

Materialization Decision:
  Raw staging model              → view
  Staging used everywhere        → view (cheap to rebuild)
  Complex intermediate           → ephemeral (if used by 1)
  Analytics mart table           → table (if < 100M rows)
  Analytics mart (huge)          → incremental + merge
  SCD Type 2                     → snapshot

Test Coverage Minimum:
  Primary keys:  not_null + unique
  Foreign keys:  relationships test
  Status fields: accepted_values
  Amounts:       not_null + dbt_utils.accepted_range(min=0)

Incremental Strategy:
  append:          just adds new rows (no dedup)
  merge:           upserts by unique_key
  insert_overwrite: replaces entire partitions

Slim CI:
  --select state:modified+       affected models + downstream
  --defer                        use prod results for unmodified upstream
  --state ./prod-manifest/       where prod manifest lives

Naming:
  stg_*    → staging (source-conformed, 1:1 with source)
  int_*    → intermediate (business logic)
  fct_*    → facts (events, transactions)
  dim_*    → dimensions (entities)
