Data & Analytics · Documented

data-quality

Expert knowledge of data quality dimensions, Great Expectations, dbt tests, anomaly detection, data contracts, schema change management, and pipeline observability. Trigger phrases: when implementing data quality, Great Expectations setup, dbt data t


Installation

npx clawhub@latest install data-quality

View the full skill documentation and source below.

Documentation

Data Quality Engineering

Data quality failures are silent production incidents. Unlike application bugs that throw errors, bad data propagates silently through pipelines, corrupts dashboards, and trains wrong ML models — often discovered by a stakeholder, not a monitor. The engineering discipline is: define expectations explicitly, validate them automatically in CI/CD, alert on violations before users see them, and design pipelines that degrade gracefully rather than fail catastrophically.

Core Mental Model

Data quality has six measurable dimensions: accuracy, completeness, consistency, timeliness, uniqueness, and validity. Each requires different detection techniques. The key architectural principle: quality gates should be in-pipeline (fail fast) for critical dimensions, and monitoring/alerting for statistical anomalies. Never let bad data reach consumers silently — either block it (quarantine) or alert loudly. Data contracts formalize the agreement between producers and consumers so schema changes don't silently break downstream systems.


Data Quality Dimensions

Accuracy:     Values represent reality. Example: age = 150 (impossible).
              Detect: range checks, reference table validation, cross-system reconciliation.

Completeness: Required fields are populated. Example: order with no customer_id.
              Detect: not_null tests, null rate monitoring, row count vs expected.

Consistency:  Same fact expressed consistently across systems/tables.
              Example: order total ≠ sum(line_items). 
              Detect: cross-table assertion tests, referential integrity checks.

Timeliness:   Data is fresh enough for its use case.
              Example: daily orders table not updated in 36 hours.
              Detect: source freshness checks, max(updated_at) lag monitoring.

Uniqueness:   No duplicate records. Example: duplicate order IDs.
              Detect: unique tests on primary keys, COUNT vs COUNT DISTINCT.

Validity:     Values conform to defined formats/rules.
              Example: email missing @, status = "INVALID_VALUE".
              Detect: regex validation, accepted_values tests, FK constraints.
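
These detection techniques map directly onto dbt's generic tests. A minimal schema.yml sketch — model, source, and column names follow the orders example and are illustrative:

```yaml
# models/schema.yml — dbt generic tests covering four of the six dimensions
version: 2

sources:
  - name: raw
    freshness:                        # Timeliness
      warn_after: {count: 12, period: hour}
      error_after: {count: 36, period: hour}
    loaded_at_field: updated_at
    tables:
      - name: orders

models:
  - name: orders
    columns:
      - name: order_id
        tests:
          - not_null                  # Completeness
          - unique                    # Uniqueness
      - name: customer_id
        tests:
          - not_null
          - relationships:            # Consistency (referential integrity)
              to: ref('customers')
              field: customer_id
      - name: status
        tests:
          - accepted_values:          # Validity
              values: [new, processing, shipped, delivered, cancelled, refunded]
```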

Great Expectations

# Great Expectations: define expectations, run checkpoints, generate data docs
# (Fluent API as of GX 0.16–0.18; the 1.x release renames several of these calls)

import great_expectations as gx

# Initialize context
context = gx.get_context()

# Define a datasource
datasource = context.sources.add_or_update_pandas(name="orders_datasource")
asset = datasource.add_dataframe_asset(name="orders")
batch_request = asset.build_batch_request(dataframe=orders_df)  # orders_df: the pandas DataFrame under test

# Create expectation suite
suite = context.add_or_update_expectation_suite("orders_quality_suite")

# Create validator and add expectations
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name="orders_quality_suite"
)

# Completeness
validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_not_be_null("customer_id")
validator.expect_column_values_to_not_be_null("created_at")

# Uniqueness
validator.expect_column_values_to_be_unique("order_id")

# Validity — format and range
validator.expect_column_values_to_be_of_type("amount", "float")
validator.expect_column_values_to_be_between("amount", min_value=0, max_value=100000)
validator.expect_column_values_to_match_regex(
    "email",
    r"^[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}$"
)

# Accepted values
validator.expect_column_values_to_be_in_set(
    "status",
    {"new", "processing", "shipped", "delivered", "cancelled", "refunded"}
)

# Row count (freshness / completeness at table level)
validator.expect_table_row_count_to_be_between(
    min_value=1000,
    max_value=10_000_000
)

# Statistical distribution (anomaly detection)
validator.expect_column_mean_to_be_between(
    "amount",
    min_value=50.0,
    max_value=500.0
)
validator.expect_column_stdev_to_be_between(
    "amount",
    min_value=10.0,
    max_value=1000.0
)

# Column set completeness (no unexpected new columns, no missing columns)
validator.expect_table_columns_to_match_ordered_list(
    ["order_id", "customer_id", "amount", "status", "created_at"]
)

# Save expectations
validator.save_expectation_suite()
# Checkpoint: run suite against data + alert on failure
checkpoint = context.add_or_update_checkpoint(
    name="orders_daily_checkpoint",
    validations=[
        {
            "batch_request": batch_request,
            "expectation_suite_name": "orders_quality_suite"
        }
    ],
    action_list=[
        {
            "name": "store_validation_result",
            "action": {"class_name": "StoreValidationResultAction"}
        },
        {
            "name": "update_data_docs",
            "action": {"class_name": "UpdateDataDocsAction"}
        },
        {
            "name": "send_slack_notification",
            "action": {
                "class_name": "SlackNotificationAction",
                "slack_webhook": "{{ slack_webhook }}",
                "notify_on": "failure"
            }
        }
    ]
)

class DataQualityException(Exception):
    """Checkpoint failure — fail the pipeline loudly rather than continue."""

result = context.run_checkpoint("orders_daily_checkpoint")
if not result.success:
    failed = [
        v for v in result.list_validation_results()
        if not v.success
    ]
    raise DataQualityException(f"{len(failed)} expectations failed")

Anomaly Detection

import pandas as pd
import numpy as np
from scipy import stats

def detect_anomalies_zscore(series: pd.Series, threshold: float = 3.0) -> pd.Series:
    """Z-score method: flag values > N standard deviations from mean."""
    z_scores = np.abs(stats.zscore(series.dropna()))
    return pd.Series(z_scores > threshold, index=series.dropna().index).reindex(series.index, fill_value=False)

def detect_anomalies_iqr(series: pd.Series, multiplier: float = 1.5) -> pd.Series:
    """IQR method: flag values beyond Q1/Q3 ± multiplier*IQR."""
    Q1 = series.quantile(0.25)
    Q3 = series.quantile(0.75)
    IQR = Q3 - Q1
    lower = Q1 - multiplier * IQR
    upper = Q3 + multiplier * IQR
    return (series < lower) | (series > upper)
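
A quick check of the IQR detector on a toy series with one injected spike (values are hypothetical; the function is repeated so the snippet runs standalone):

```python
import pandas as pd

def detect_anomalies_iqr(series: pd.Series, multiplier: float = 1.5) -> pd.Series:
    # Same IQR rule as above, repeated here for a self-contained demo.
    q1, q3 = series.quantile(0.25), series.quantile(0.75)
    iqr = q3 - q1
    return (series < q1 - multiplier * iqr) | (series > q3 + multiplier * iqr)

amounts = pd.Series([100, 102, 98, 101, 99, 103, 97, 5000])  # 5000 is the spike
flags = detect_anomalies_iqr(amounts)
print(flags.sum())          # 1 — only the spike is flagged
print(int(flags.idxmax()))  # 7 — index of the flagged value
```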

# Time series anomaly detection with seasonality
def detect_time_series_anomalies(
    ts: pd.Series,
    window: int = 7,
    seasonal_period: int = 7,
    threshold_sigma: float = 3.0
) -> pd.DataFrame:
    """Detect anomalies accounting for weekly seasonality."""
    df = pd.DataFrame({'value': ts})
    
    # Rolling mean and std (trend component)
    df['rolling_mean'] = ts.rolling(window, center=True).mean()
    df['rolling_std']  = ts.rolling(window, center=True).std()
    
    # Seasonal component: average by day of week (assumes ts has a DatetimeIndex)
    df['day_of_week'] = ts.index.dayofweek
    df['seasonal_factor'] = df.groupby('day_of_week')['value'].transform('mean')
    
    # Residual after trend + seasonal
    df['residual'] = df['value'] - df['rolling_mean'] * (df['seasonal_factor'] / df['seasonal_factor'].mean())
    
    # Anomaly if residual > N sigma
    residual_std = df['residual'].std()
    df['is_anomaly'] = df['residual'].abs() > threshold_sigma * residual_std
    df['anomaly_score'] = df['residual'].abs() / residual_std
    
    return df[df['is_anomaly']][['value', 'rolling_mean', 'anomaly_score']]

# Pipeline metric monitoring
def check_pipeline_metrics(
    current: dict,
    historical: list[dict],
    metrics: list[str],
    sigma_threshold: float = 3.0
) -> list[dict]:
    """Check if current pipeline run metrics are anomalous vs history."""
    alerts = []
    historical_df = pd.DataFrame(historical)
    
    for metric in metrics:
        if metric not in historical_df.columns:
            continue
        
        hist_values = historical_df[metric].dropna()
        current_val = current.get(metric)
        
        if current_val is None or len(hist_values) < 10:
            continue
        
        mean = hist_values.mean()
        std  = hist_values.std()
        z_score = abs(current_val - mean) / (std + 1e-9)
        
        if z_score > sigma_threshold:
            alerts.append({
                'metric':       metric,
                'current':      current_val,
                'expected_mean': round(mean, 2),
                'z_score':       round(z_score, 2),
                'severity':      'critical' if z_score > 5 else 'warning'
            })
    
    return alerts
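
The same z-score logic in miniature, with pure-Python stand-in numbers (all values hypothetical):

```python
import statistics

# 10 historical daily row counts, hovering around 1000
history = [1000, 1020, 980, 1010, 990, 1005, 995, 1015, 985, 1000]
current = 400  # today's run processed far fewer rows

mean = statistics.mean(history)
std = statistics.stdev(history)
z = abs(current - mean) / (std + 1e-9)

print(round(z, 1))  # ≈ 46.5, far above the 3-sigma threshold
assert z > 5        # check_pipeline_metrics would label this 'critical'
```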

Schema Change Detection

from dataclasses import dataclass
from enum import Enum

class ChangeType(Enum):
    COLUMN_ADDED         = "column_added"          # NON-BREAKING
    COLUMN_REMOVED       = "column_removed"         # BREAKING
    COLUMN_RENAMED       = "column_renamed"         # BREAKING
    TYPE_WIDENED         = "type_widened"           # NON-BREAKING (int→bigint)
    TYPE_NARROWED        = "type_narrowed"          # BREAKING (bigint→int)
    NULLABLE_TO_REQUIRED = "nullable_to_required"  # BREAKING
    REQUIRED_TO_NULLABLE = "required_to_nullable"  # NON-BREAKING

@dataclass
class SchemaChange:
    change_type:   ChangeType
    column_name:   str
    old_value:     str | None
    new_value:     str | None
    is_breaking:   bool

def detect_schema_changes(
    old_schema: dict[str, dict],
    new_schema: dict[str, dict]
) -> list[SchemaChange]:
    """Compare two schema dicts {col_name: {type, nullable, ...}}."""
    changes = []
    
    for col in set(old_schema.keys()) | set(new_schema.keys()):
        if col not in new_schema:
            changes.append(SchemaChange(
                ChangeType.COLUMN_REMOVED, col,
                old_schema[col]['type'], None, is_breaking=True
            ))
        elif col not in old_schema:
            changes.append(SchemaChange(
                ChangeType.COLUMN_ADDED, col,
                None, new_schema[col]['type'], is_breaking=False
            ))
        else:
            old, new = old_schema[col], new_schema[col]
            if old['type'] != new['type']:
                breaking = not is_type_widening(old['type'], new['type'])
                changes.append(SchemaChange(
                    ChangeType.TYPE_NARROWED if breaking else ChangeType.TYPE_WIDENED,
                    col, old['type'], new['type'], is_breaking=breaking
                ))
            if old['nullable'] and not new['nullable']:
                changes.append(SchemaChange(
                    ChangeType.NULLABLE_TO_REQUIRED, col, None, None, is_breaking=True
                ))
            elif not old['nullable'] and new['nullable']:
                changes.append(SchemaChange(
                    ChangeType.REQUIRED_TO_NULLABLE, col, None, None, is_breaking=False
                ))
    
    return changes

def is_type_widening(old_type: str, new_type: str) -> bool:
    # Illustrative subset — extend for your warehouse's type system.
    widening_pairs = {
        ('int', 'bigint'), ('float', 'double'),
        ('varchar(100)', 'varchar(255)'), ('date', 'timestamp')
    }
    return (old_type, new_type) in widening_pairs
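
A condensed, standalone version of the same comparison logic, checked against inline schema dicts (column names hypothetical):

```python
# Minimal sketch: track only column add/remove and type widening/narrowing,
# returning (column, change, is_breaking) tuples.
WIDENING = {('int', 'bigint'), ('float', 'double'), ('date', 'timestamp')}

def diff_schemas(old: dict, new: dict) -> list[tuple[str, str, bool]]:
    changes = []
    for col in sorted(set(old) | set(new)):
        if col not in new:
            changes.append((col, 'removed', True))
        elif col not in old:
            changes.append((col, 'added', False))
        elif old[col] != new[col]:
            widened = (old[col], new[col]) in WIDENING
            changes.append((col, 'widened' if widened else 'narrowed', not widened))
    return changes

old = {'order_id': 'bigint', 'amount': 'float', 'notes': 'varchar'}
new = {'order_id': 'bigint', 'amount': 'double', 'metadata': 'json'}
print(diff_schemas(old, new))
# [('amount', 'widened', False), ('metadata', 'added', False), ('notes', 'removed', True)]
```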

Data Contracts

# data-contract.yaml: consumer-driven contract between producer and consumer
# Producer: orders service
# Consumer: analytics team

dataContractSpecification: 0.9.3
id: urn:datacontract:orders:v1.2
info:
  title: Orders Data Contract
  version: 1.2.0
  description: "Orders events published by the orders service"
  owner: orders-team
  contact:
    email: [email protected]

servers:
  production:
    type: kafka
    host: kafka.production.example.com:9092
    topic: orders.v1

models:
  OrderEvent:
    description: "Emitted when an order is created or updated"
    fields:
      order_id:
        type: string
        required: true
        unique: true
        description: "UUID v4"
        pattern: "^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$"
      
      customer_id:
        type: string
        required: true
      
      amount:
        type: number
        required: true
        minimum: 0
        maximum: 100000
        description: "Order total in USD"
      
      status:
        type: string
        required: true
        enum: [new, processing, shipped, delivered, cancelled, refunded]
      
      created_at:
        type: timestamp
        required: true
        description: "ISO 8601 UTC"
      
      metadata:
        type: object
        required: false
        description: "Optional key-value metadata, added in v1.2"

quality:
  type: SodaCL
  specification: |
    checks for OrderEvent:
      - row_count > 0
      - missing_count(order_id) = 0
      - duplicate_count(order_id) = 0
      - invalid_count(status) = 0:
          valid values: [new, processing, shipped, delivered, cancelled, refunded]
      - avg(amount) between 50 and 500

servicelevels:
  freshness:
    description: "Orders appear in stream within 5 seconds of creation"
    threshold: 5s
  completeness:
    description: "Zero orders lost — 100% delivery guarantee"
    threshold: "100%"
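
On the consumer side, the contract's field rules can be enforced directly in code. A minimal dependency-free sketch — `contract_violations` is a hypothetical helper, with field names taken from the contract above:

```python
import re

UUID_V4 = re.compile(
    r"^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$"
)
STATUSES = {"new", "processing", "shipped", "delivered", "cancelled", "refunded"}

def contract_violations(event: dict) -> list[str]:
    """Check an OrderEvent against the v1.2 contract's field rules."""
    errors = []
    if not UUID_V4.match(str(event.get("order_id", ""))):
        errors.append("order_id: not a UUID v4")
    if not event.get("customer_id"):
        errors.append("customer_id: required")
    amount = event.get("amount")
    if not isinstance(amount, (int, float)) or not 0 <= amount <= 100_000:
        errors.append("amount: must be a number in [0, 100000]")
    if event.get("status") not in STATUSES:
        errors.append("status: not in contract enum")
    return errors

event = {"order_id": "a1b2c3d4-0000-4000-8000-000000000000",
         "customer_id": "c-42", "amount": 99.5, "status": "shipped"}
print(contract_violations(event))  # [] — event conforms
```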

Pipeline Observability

from dataclasses import dataclass, field
from datetime import datetime
import time
import uuid

@dataclass
class PipelineRun:
    pipeline_name: str
    run_id:        str
    started_at:    datetime = field(default_factory=datetime.utcnow)
    metrics:       dict = field(default_factory=dict)
    alerts:        list = field(default_factory=list)

class PipelineObservability:
    """Track pipeline run metrics and alert on anomalies."""
    
    def __init__(self, pipeline_name: str, metrics_store, alert_fn):
        self.pipeline_name = pipeline_name
        self.metrics_store = metrics_store
        self.alert_fn = alert_fn
    
    def __call__(self, pipeline_fn):
        def wrapper(*args, **kwargs):
            run = PipelineRun(self.pipeline_name, run_id=str(uuid.uuid4()))
            start = time.time()
            
            try:
                result = pipeline_fn(*args, **kwargs)
                run.metrics.update({
                    'status':           'success',
                    'duration_seconds': time.time() - start,
                    'rows_processed':   getattr(result, 'row_count', None),
                    'rows_written':     getattr(result, 'written_count', None),
                    'null_rate':        getattr(result, 'null_rate', None),
                })
                return result
                
            except Exception as e:
                run.metrics.update({
                    'status':  'failed',
                    'error':   str(e),
                    'duration_seconds': time.time() - start,
                })
                self.alert_fn(f"Pipeline {self.pipeline_name} FAILED: {e}", severity='critical')
                raise
                
            finally:
                self._check_anomalies(run)
                self.metrics_store.save(run)
        
        return wrapper
    
    def _check_anomalies(self, run: PipelineRun):
        history = self.metrics_store.get_history(self.pipeline_name, days=30)
        alerts = check_pipeline_metrics(
            current=run.metrics,
            historical=history,
            metrics=['rows_processed', 'null_rate', 'duration_seconds']
        )
        for alert in alerts:
            self.alert_fn(
                f"Anomaly in {self.pipeline_name}.{alert['metric']}: "
                f"current={alert['current']}, expected≈{alert['expected_mean']} "
                f"(z={alert['z_score']})",
                severity=alert['severity']
            )

# Usage
@PipelineObservability("daily_orders_etl", metrics_store=db, alert_fn=pagerduty_alert)
def run_orders_etl(date: str) -> PipelineResult:
    ...

Quarantine Pattern

from datetime import datetime

def process_with_quarantine(
    records: list[dict],
    validate_fn,
    good_sink,
    quarantine_sink
) -> tuple[int, int]:
    """Route invalid records to quarantine instead of failing the pipeline."""
    good_records = []
    quarantine_records = []
    
    for record in records:
        validation_result = validate_fn(record)
        
        if validation_result.is_valid:
            good_records.append(record)
        else:
            quarantine_records.append({
                **record,
                '_quarantine_reason':    validation_result.errors,
                '_quarantine_timestamp': datetime.utcnow().isoformat(),
                '_quarantine_pipeline':  'orders_etl',
                '_quarantine_version':   '1.0'
            })
    
    good_sink.write(good_records)
    quarantine_sink.write(quarantine_records)
    
    # Alert if quarantine rate is high (alert() = your notification helper)
    quarantine_rate = len(quarantine_records) / len(records) if records else 0.0
    if quarantine_rate > 0.01:  # > 1% quarantined
        alert(f"High quarantine rate: {quarantine_rate:.1%} of records quarantined",
              severity='warning' if quarantine_rate < 0.05 else 'critical')
    
    return len(good_records), len(quarantine_records)
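
A sketch of a validate_fn and an in-memory sink that would plug into process_with_quarantine — `ValidationResult`, `validate_order`, and `ListSink` are illustrative names, and the rules are a minimal example:

```python
from dataclasses import dataclass, field

@dataclass
class ValidationResult:
    is_valid: bool
    errors: list[str] = field(default_factory=list)

def validate_order(record: dict) -> ValidationResult:
    # Collect all violations rather than stopping at the first one,
    # so the quarantine record explains everything that is wrong.
    errors = []
    if not record.get("order_id"):
        errors.append("missing order_id")
    if not isinstance(record.get("amount"), (int, float)) or record["amount"] < 0:
        errors.append("invalid amount")
    return ValidationResult(is_valid=not errors, errors=errors)

class ListSink:
    """In-memory sink for tests; production would write to a table or topic."""
    def __init__(self):
        self.rows = []
    def write(self, records):
        self.rows.extend(records)

good = validate_order({"order_id": "o-1", "amount": 25.0})
bad = validate_order({"order_id": "", "amount": -5})
print(good.is_valid, bad.errors)  # True ['missing order_id', 'invalid amount']
```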

Anti-Patterns

❌ Silent data loss: pipeline continues on bad records without alerting
   ✅ Quarantine + alert on high quarantine rate

❌ Testing only in staging: data quality differs between environments
   ✅ Run quality checks in production with alerting, not just CI/CD

❌ Point-in-time tests only (test on full table, not incremental)
   ✅ Test each batch + monitor trends over time (anomaly detection)

❌ No schema registry: schema changes break consumers silently
   ✅ Confluent Schema Registry (Kafka), dbt column lineage, data contracts

❌ Treating all quality issues as blocking failures
   ✅ Use severity levels: ERROR (block pipeline), WARNING (alert + continue)

❌ Monitoring only row counts (misses structural issues)
   ✅ Monitor: row count, null rates, value distributions, column-level stats

❌ Data quality as afterthought (added after incidents)
   ✅ Define data contract before building the pipeline

Quick Reference

Six Dimensions:
  Accuracy     → range checks, cross-system reconciliation
  Completeness → not_null, row count vs expected
  Consistency  → cross-table assertions, referential integrity
  Timeliness   → source freshness, max(updated_at) lag
  Uniqueness   → unique tests on PKs, COUNT vs COUNT DISTINCT
  Validity     → regex, accepted_values, FK relationships

Anomaly Detection:
  Z-score:           |value - mean| / std > 3 → anomaly
  IQR:               value < Q1 - 1.5*IQR or > Q3 + 1.5*IQR → anomaly
  Seasonal:          compare to same-day-of-week average

Schema Changes:
  Non-breaking:      add column, widen type, required→nullable
  Breaking:          remove column, narrow type, nullable→required

Alert Thresholds:
  Row count:         < 80% or > 150% of 7-day average
  Null rate:         > 5× historical baseline
  Quarantine rate:   > 1% warning, > 5% critical
  Freshness lag:     > 2× expected interval

Quarantine Flow:
  Record → validate → valid: write to good_sink
                    → invalid: write to quarantine_sink + alert if rate high