data-quality
Expert knowledge of data quality dimensions, Great Expectations, dbt tests, anomaly detection, data contracts, schema change management, and pipeline observability. Trigger phrases: when implementing data quality, Great Expectations setup, dbt data tests.
Installation
npx clawhub@latest install data-quality
View the full skill documentation and source below.
Documentation
Data Quality Engineering
Data quality failures are silent production incidents. Unlike application bugs that throw errors, bad data propagates silently through pipelines, corrupts dashboards, and trains wrong ML models — often discovered by a stakeholder, not a monitor. The engineering discipline is: define expectations explicitly, validate them automatically in CI/CD, alert on violations before users see them, and design pipelines that degrade gracefully rather than fail catastrophically.
Core Mental Model
Data quality has six measurable dimensions: accuracy, completeness, consistency, timeliness, uniqueness, and validity. Each requires different detection techniques. The key architectural principle: quality gates should be in-pipeline (fail fast) for critical dimensions, and monitoring/alerting for statistical anomalies. Never let bad data reach consumers silently — either block it (quarantine) or alert loudly. Data contracts formalize the agreement between producers and consumers so schema changes don't silently break downstream systems.
Data Quality Dimensions
Accuracy: Values represent reality. Example: age = 150 (impossible).
Detect: range checks, reference table validation, cross-system reconciliation.
Completeness: Required fields are populated. Example: order with no customer_id.
Detect: not_null tests, null rate monitoring, row count vs expected.
Consistency: Same fact expressed consistently across systems/tables.
Example: order total ≠ sum(line_items).
Detect: cross-table assertion tests, referential integrity checks.
Timeliness: Data is fresh enough for its use case.
Example: daily orders table not updated in 36 hours.
Detect: source freshness checks, max(updated_at) lag monitoring.
Uniqueness: No duplicate records. Example: duplicate order IDs.
Detect: unique tests on primary keys, COUNT vs COUNT DISTINCT.
Validity: Values conform to defined formats/rules.
Example: email missing @, status = "INVALID_VALUE".
Detect: regex validation, accepted_values tests, FK constraints.
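The summary above names dbt tests as a detection tool; here is a sketch of how the six dimensions map onto dbt's built-in generic tests and source freshness checks. Model, source, and column names are illustrative:

```yaml
# models/schema.yml — illustrative dbt tests mapping to the dimensions above
version: 2

models:
  - name: orders
    columns:
      - name: order_id
        tests:
          - not_null            # completeness
          - unique              # uniqueness
      - name: customer_id
        tests:
          - not_null
          - relationships:      # consistency / referential integrity
              to: ref('customers')
              field: customer_id
      - name: status
        tests:
          - accepted_values:    # validity
              values: ['new', 'processing', 'shipped', 'delivered', 'cancelled', 'refunded']

sources:
  - name: raw
    loaded_at_field: updated_at
    freshness:                  # timeliness
      warn_after: {count: 12, period: hour}
      error_after: {count: 24, period: hour}
    tables:
      - name: orders
```

Accuracy (range checks, reconciliation) has no built-in generic test; it is typically covered with a custom singular test or a package such as dbt-utils.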
Great Expectations
# Great Expectations: define expectations, run checkpoints, generate data docs
import great_expectations as gx

class DataQualityException(Exception):
    """Raised when a checkpoint run fails."""

# Initialize context
context = gx.get_context()

# Define a datasource (orders_df is a pandas DataFrame loaded upstream)
datasource = context.sources.add_or_update_pandas(name="orders_datasource")
asset = datasource.add_dataframe_asset(name="orders")
batch_request = asset.build_batch_request(dataframe=orders_df)

# Create expectation suite
suite = context.add_or_update_expectation_suite("orders_quality_suite")

# Create validator and add expectations
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name="orders_quality_suite",
)

# Completeness
validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_not_be_null("customer_id")
validator.expect_column_values_to_not_be_null("created_at")

# Uniqueness
validator.expect_column_values_to_be_unique("order_id")

# Validity — format and range
validator.expect_column_values_to_be_of_type("amount", "float")
validator.expect_column_values_to_be_between("amount", min_value=0, max_value=100000)
validator.expect_column_values_to_match_regex(
    "email",
    r"^[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}$"
)

# Accepted values
validator.expect_column_values_to_be_in_set(
    "status",
    {"new", "processing", "shipped", "delivered", "cancelled", "refunded"}
)

# Row count (freshness / completeness at table level)
validator.expect_table_row_count_to_be_between(min_value=1000, max_value=10_000_000)

# Statistical distribution (anomaly detection)
validator.expect_column_mean_to_be_between("amount", min_value=50.0, max_value=500.0)
validator.expect_column_stdev_to_be_between("amount", min_value=10.0, max_value=1000.0)

# Column set completeness (no unexpected new columns, no missing columns)
validator.expect_table_columns_to_match_ordered_list(
    ["order_id", "customer_id", "amount", "status", "created_at"]
)

# Save expectations
validator.save_expectation_suite()

# Checkpoint: run suite against data + alert on failure
checkpoint = context.add_or_update_checkpoint(
    name="orders_daily_checkpoint",
    validations=[
        {
            "batch_request": batch_request,
            "expectation_suite_name": "orders_quality_suite",
        }
    ],
    action_list=[
        {
            "name": "store_validation_result",
            "action": {"class_name": "StoreValidationResultAction"},
        },
        {
            "name": "update_data_docs",
            "action": {"class_name": "UpdateDataDocsAction"},
        },
        {
            "name": "send_slack_notification",
            "action": {
                "class_name": "SlackNotificationAction",
                "slack_webhook": "{{ slack_webhook }}",
                "notify_on": "failure",
            },
        },
    ],
)

result = context.run_checkpoint(checkpoint_name="orders_daily_checkpoint")
if not result.success:
    failed = [v for v in result.list_validation_results() if not v.success]
    raise DataQualityException(f"{len(failed)} expectations failed")
Anomaly Detection
import pandas as pd
import numpy as np
from scipy import stats

def detect_anomalies_zscore(series: pd.Series, threshold: float = 3.0) -> pd.Series:
    """Z-score method: flag values > N standard deviations from mean."""
    clean = series.dropna()
    z_scores = np.abs(stats.zscore(clean))
    flags = pd.Series(z_scores > threshold, index=clean.index)
    return flags.reindex(series.index, fill_value=False)

def detect_anomalies_iqr(series: pd.Series, multiplier: float = 1.5) -> pd.Series:
    """IQR method: flag values beyond Q1/Q3 ± multiplier*IQR."""
    Q1 = series.quantile(0.25)
    Q3 = series.quantile(0.75)
    IQR = Q3 - Q1
    lower = Q1 - multiplier * IQR
    upper = Q3 + multiplier * IQR
    return (series < lower) | (series > upper)

# Time series anomaly detection with seasonality
def detect_time_series_anomalies(
    ts: pd.Series,
    window: int = 7,
    seasonal_period: int = 7,
    threshold_sigma: float = 3.0
) -> pd.DataFrame:
    """Detect anomalies accounting for weekly seasonality (ts has a DatetimeIndex)."""
    df = pd.DataFrame({'value': ts})
    # Rolling mean and std (trend component)
    df['rolling_mean'] = ts.rolling(window, center=True).mean()
    df['rolling_std'] = ts.rolling(window, center=True).std()
    # Seasonal component: average by day of week
    df['day_of_week'] = ts.index.dayofweek
    df['seasonal_factor'] = df.groupby('day_of_week')['value'].transform('mean')
    # Residual after removing trend scaled by the seasonal factor
    df['residual'] = df['value'] - df['rolling_mean'] * (
        df['seasonal_factor'] / df['seasonal_factor'].mean()
    )
    # Anomaly if |residual| > N sigma
    residual_std = df['residual'].std()
    df['is_anomaly'] = df['residual'].abs() > threshold_sigma * residual_std
    df['anomaly_score'] = df['residual'].abs() / residual_std
    return df[df['is_anomaly']][['value', 'rolling_mean', 'anomaly_score']]

# Pipeline metric monitoring
def check_pipeline_metrics(
    current: dict,
    historical: list[dict],
    metrics: list[str],
    sigma_threshold: float = 3.0
) -> list[dict]:
    """Check if current pipeline run metrics are anomalous vs history."""
    alerts = []
    historical_df = pd.DataFrame(historical)
    for metric in metrics:
        if metric not in historical_df.columns:
            continue
        hist_values = historical_df[metric].dropna()
        current_val = current.get(metric)
        if current_val is None or len(hist_values) < 10:
            continue
        mean = hist_values.mean()
        std = hist_values.std()
        z_score = abs(current_val - mean) / (std + 1e-9)
        if z_score > sigma_threshold:
            alerts.append({
                'metric': metric,
                'current': current_val,
                'expected_mean': round(mean, 2),
                'z_score': round(z_score, 2),
                'severity': 'critical' if z_score > 5 else 'warning'
            })
    return alerts
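The IQR detector can be exercised like this. The function body is repeated compactly so the snippet runs on its own; the sample values are illustrative:

```python
import pandas as pd

def detect_anomalies_iqr(series: pd.Series, multiplier: float = 1.5) -> pd.Series:
    """IQR method: flag values beyond Q1/Q3 ± multiplier*IQR."""
    q1, q3 = series.quantile(0.25), series.quantile(0.75)
    iqr = q3 - q1
    return (series < q1 - multiplier * iqr) | (series > q3 + multiplier * iqr)

# Seven normal-looking values plus one obvious spike
series = pd.Series([100.0, 102.0, 98.0, 101.0, 99.0, 500.0, 103.0, 97.0])
flags = detect_anomalies_iqr(series)
print(series[flags].tolist())  # [500.0]
```

Note that unlike the z-score method, the IQR bounds are computed from quartiles, so a single extreme spike cannot inflate the threshold enough to hide itself.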
Schema Change Detection
from dataclasses import dataclass
from enum import Enum

class ChangeType(Enum):
    COLUMN_ADDED = "column_added"                  # NON-BREAKING
    COLUMN_REMOVED = "column_removed"              # BREAKING
    COLUMN_RENAMED = "column_renamed"              # BREAKING
    TYPE_WIDENED = "type_widened"                  # NON-BREAKING (int→bigint)
    TYPE_NARROWED = "type_narrowed"                # BREAKING (bigint→int)
    NULLABLE_TO_REQUIRED = "nullable_to_required"  # BREAKING
    REQUIRED_TO_NULLABLE = "required_to_nullable"  # NON-BREAKING

@dataclass
class SchemaChange:
    change_type: ChangeType
    column_name: str
    old_value: str | None
    new_value: str | None
    is_breaking: bool

def detect_schema_changes(
    old_schema: dict[str, dict],
    new_schema: dict[str, dict]
) -> list[SchemaChange]:
    """Compare two schema dicts {col_name: {type, nullable, ...}}."""
    changes = []
    for col in set(old_schema.keys()) | set(new_schema.keys()):
        if col not in new_schema:
            changes.append(SchemaChange(
                ChangeType.COLUMN_REMOVED, col,
                old_schema[col]['type'], None, is_breaking=True
            ))
        elif col not in old_schema:
            changes.append(SchemaChange(
                ChangeType.COLUMN_ADDED, col,
                None, new_schema[col]['type'], is_breaking=False
            ))
        else:
            old, new = old_schema[col], new_schema[col]
            if old['type'] != new['type']:
                breaking = not is_type_widening(old['type'], new['type'])
                changes.append(SchemaChange(
                    ChangeType.TYPE_NARROWED if breaking else ChangeType.TYPE_WIDENED,
                    col, old['type'], new['type'], is_breaking=breaking
                ))
            if old['nullable'] and not new['nullable']:
                # nullable → required tightens the constraint: BREAKING
                changes.append(SchemaChange(
                    ChangeType.NULLABLE_TO_REQUIRED, col, None, None, is_breaking=True
                ))
            elif not old['nullable'] and new['nullable']:
                # required → nullable loosens the constraint: NON-BREAKING
                changes.append(SchemaChange(
                    ChangeType.REQUIRED_TO_NULLABLE, col, None, None, is_breaking=False
                ))
    return changes

def is_type_widening(old_type: str, new_type: str) -> bool:
    widening_pairs = {
        ('int', 'bigint'), ('float', 'double'),
        ('varchar(100)', 'varchar(255)'), ('date', 'timestamp')
    }
    return (old_type, new_type) in widening_pairs
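The diff logic can be condensed into a self-contained demo. This is a simplified re-statement of the comparison above (tuples instead of dataclasses); the schemas and column names are made up for illustration:

```python
# Minimal demo of schema-diff logic: compare old vs new column dicts.
old_schema = {
    "order_id": {"type": "int", "nullable": False},
    "amount": {"type": "float", "nullable": False},
    "legacy_flag": {"type": "int", "nullable": True},
}
new_schema = {
    "order_id": {"type": "bigint", "nullable": False},       # widened: non-breaking
    "amount": {"type": "float", "nullable": False},
    "metadata": {"type": "varchar(255)", "nullable": True},  # added: non-breaking
}

WIDENING = {("int", "bigint"), ("float", "double")}

changes = []  # (column, change_type, is_breaking)
for col in sorted(set(old_schema) | set(new_schema)):
    if col not in new_schema:
        changes.append((col, "column_removed", True))
    elif col not in old_schema:
        changes.append((col, "column_added", False))
    elif old_schema[col]["type"] != new_schema[col]["type"]:
        breaking = (old_schema[col]["type"], new_schema[col]["type"]) not in WIDENING
        changes.append((col, "type_narrowed" if breaking else "type_widened", breaking))

breaking_changes = [c for c in changes if c[2]]
print(changes)
print(breaking_changes)  # only the removed column is breaking
```

In CI, a non-empty `breaking_changes` list is what should fail the build or require an explicit contract version bump.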
Data Contracts
# data-contract.yaml: consumer-driven contract between producer and consumer
# Producer: orders service
# Consumer: analytics team
dataContractSpecification: 0.9.3
id: urn:datacontract:orders:v1.2
info:
  title: Orders Data Contract
  version: 1.2.0
  description: "Orders events published by the orders service"
  owner: orders-team
  contact:
    email: [email protected]
servers:
  production:
    type: kafka
    host: kafka.production.example.com:9092
    topic: orders.v1
models:
  OrderEvent:
    description: "Emitted when an order is created or updated"
    fields:
      order_id:
        type: string
        required: true
        unique: true
        description: "UUID v4"
        pattern: "^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$"
      customer_id:
        type: string
        required: true
      amount:
        type: number
        required: true
        minimum: 0
        maximum: 100000
        description: "Order total in USD"
      status:
        type: string
        required: true
        enum: [new, processing, shipped, delivered, cancelled, refunded]
      created_at:
        type: timestamp
        required: true
        description: "ISO 8601 UTC"
      metadata:
        type: object
        required: false
        description: "Optional key-value metadata, added in v1.2"
quality:
  type: SodaCL
  specification: |
    checks for OrderEvent:
      - row_count > 0
      - missing_count(order_id) = 0
      - duplicate_count(order_id) = 0
      - invalid_count(status) = 0:
          valid values: [new, processing, shipped, delivered, cancelled, refunded]
      - avg(amount) between 50 and 500
servicelevels:
  freshness:
    description: "Orders appear in stream within 5 seconds of creation"
    threshold: 5s
  completeness:
    description: "Zero orders lost — 100% delivery guarantee"
    threshold: "100%"
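To show how a contract becomes an enforcement point, here is a hand-rolled validator over a simplified copy of the field rules above. This is a sketch, not a dataContractSpecification implementation; in practice you would use tooling such as the datacontract CLI or a JSON Schema validator:

```python
import re

# Simplified field rules lifted from the contract above (illustrative subset).
RULES = {
    "order_id": {
        "required": True,
        "pattern": r"^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$",
    },
    "customer_id": {"required": True},
    "amount": {"required": True, "minimum": 0, "maximum": 100000},
    "status": {
        "required": True,
        "enum": {"new", "processing", "shipped", "delivered", "cancelled", "refunded"},
    },
}

def validate_record(record: dict) -> list[str]:
    """Return a list of contract violations; empty list means valid."""
    errors = []
    for field, rule in RULES.items():
        value = record.get(field)
        if value is None:
            if rule.get("required"):
                errors.append(f"{field}: missing required field")
            continue
        if "pattern" in rule and not re.match(rule["pattern"], str(value)):
            errors.append(f"{field}: does not match pattern")
        if "enum" in rule and value not in rule["enum"]:
            errors.append(f"{field}: not in allowed values")
        if "minimum" in rule and value < rule["minimum"]:
            errors.append(f"{field}: below minimum")
        if "maximum" in rule and value > rule["maximum"]:
            errors.append(f"{field}: above maximum")
    return errors

good = {"order_id": "a3bb189e-8bf9-4888-9912-ace4e6543002",
        "customer_id": "c1", "amount": 49.99, "status": "new"}
bad = {"order_id": "not-a-uuid", "amount": -5, "status": "INVALID"}
print(validate_record(good))  # []
print(validate_record(bad))   # four violations
```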
Pipeline Observability
import time
import uuid
from dataclasses import dataclass, field
from datetime import datetime

@dataclass
class PipelineRun:
    pipeline_name: str
    run_id: str
    started_at: datetime = field(default_factory=datetime.utcnow)
    metrics: dict = field(default_factory=dict)
    alerts: list = field(default_factory=list)

class PipelineObservability:
    """Track pipeline run metrics and alert on anomalies."""

    def __init__(self, pipeline_name: str, metrics_store, alert_fn):
        self.pipeline_name = pipeline_name
        self.metrics_store = metrics_store
        self.alert_fn = alert_fn

    def __call__(self, pipeline_fn):
        def wrapper(*args, **kwargs):
            run = PipelineRun(self.pipeline_name, run_id=str(uuid.uuid4()))
            start = time.time()
            try:
                result = pipeline_fn(*args, **kwargs)
                run.metrics.update({
                    'status': 'success',
                    'duration_seconds': time.time() - start,
                    'rows_processed': getattr(result, 'row_count', None),
                    'rows_written': getattr(result, 'written_count', None),
                    'null_rate': getattr(result, 'null_rate', None),
                })
                return result
            except Exception as e:
                run.metrics.update({
                    'status': 'failed',
                    'error': str(e),
                    'duration_seconds': time.time() - start,
                })
                self.alert_fn(f"Pipeline {self.pipeline_name} FAILED: {e}", severity='critical')
                raise
            finally:
                self._check_anomalies(run)
                self.metrics_store.save(run)
        return wrapper

    def _check_anomalies(self, run: PipelineRun):
        history = self.metrics_store.get_history(self.pipeline_name, days=30)
        alerts = check_pipeline_metrics(
            current=run.metrics,
            historical=history,
            metrics=['rows_processed', 'null_rate', 'duration_seconds']
        )
        for alert in alerts:
            self.alert_fn(
                f"Anomaly in {self.pipeline_name}.{alert['metric']}: "
                f"current={alert['current']}, expected≈{alert['expected_mean']} "
                f"(z={alert['z_score']})",
                severity=alert['severity']
            )

# Usage (db, pagerduty_alert, and PipelineResult are supplied by your stack)
@PipelineObservability("daily_orders_etl", metrics_store=db, alert_fn=pagerduty_alert)
def run_orders_etl(date: str) -> PipelineResult:
    ...
Quarantine Pattern
from datetime import datetime

def process_with_quarantine(
    records: list[dict],
    validate_fn,
    good_sink,
    quarantine_sink
) -> tuple[int, int]:
    """Route invalid records to quarantine instead of failing the pipeline."""
    good_records = []
    quarantine_records = []
    for record in records:
        validation_result = validate_fn(record)
        if validation_result.is_valid:
            good_records.append(record)
        else:
            quarantine_records.append({
                **record,
                '_quarantine_reason': validation_result.errors,
                '_quarantine_timestamp': datetime.utcnow().isoformat(),
                '_quarantine_pipeline': 'orders_etl',
                '_quarantine_version': '1.0'
            })
    good_sink.write(good_records)
    quarantine_sink.write(quarantine_records)
    # Alert if quarantine rate is high (guard against empty input)
    quarantine_rate = len(quarantine_records) / len(records) if records else 0.0
    if quarantine_rate > 0.01:  # > 1% quarantined
        alert(f"High quarantine rate: {quarantine_rate:.1%} of records quarantined",
              severity='warning' if quarantine_rate < 0.05 else 'critical')
    return len(good_records), len(quarantine_records)
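A minimal end-to-end run of the pattern, with toy stand-ins for the sinks and validator (the `ListSink` class and `validate` rules are invented for this sketch):

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone

@dataclass
class ListSink:
    """Toy sink that accumulates records in memory."""
    rows: list = field(default_factory=list)

    def write(self, records):
        self.rows.extend(records)

def validate(record: dict) -> list[str]:
    """Return a list of errors; empty list means valid."""
    errors = []
    if not record.get("order_id"):
        errors.append("order_id is required")
    if record.get("amount", 0) < 0:
        errors.append("amount must be >= 0")
    return errors

good_sink, quarantine_sink = ListSink(), ListSink()
records = [
    {"order_id": "o1", "amount": 10.0},
    {"order_id": "", "amount": 5.0},     # invalid: missing order_id
    {"order_id": "o3", "amount": -1.0},  # invalid: negative amount
]
for record in records:
    errors = validate(record)
    if not errors:
        good_sink.write([record])
    else:
        quarantine_sink.write([{
            **record,
            "_quarantine_reason": errors,
            "_quarantine_timestamp": datetime.now(timezone.utc).isoformat(),
        }])

print(len(good_sink.rows), len(quarantine_sink.rows))  # 1 2
```

The key property is that the pipeline completes and emits the valid record even though two records failed validation; the failures are preserved with their reasons for later replay.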
Anti-Patterns
❌ Silent data loss: pipeline continues on bad records without alerting
✅ Quarantine + alert on high quarantine rate
❌ Testing only in staging: data quality differs between environments
✅ Run quality checks in production with alerting, not just CI/CD
❌ Point-in-time tests only: validating the full table once instead of each incremental batch
✅ Test each batch + monitor trends over time (anomaly detection)
❌ No schema registry: schema changes break consumers silently
✅ Confluent Schema Registry (Kafka), dbt column lineage, data contracts
❌ Treating all quality issues as blocking failures
✅ Use severity levels: ERROR (block pipeline), WARNING (alert + continue)
❌ Monitoring only row counts (misses structural issues)
✅ Monitor: row count, null rates, value distributions, column-level stats
❌ Data quality as afterthought (added after incidents)
✅ Define data contract before building the pipeline
Quick Reference
Six Dimensions:
Accuracy → range checks, cross-system reconciliation
Completeness → not_null, row count vs expected
Consistency → cross-table assertions, referential integrity
Timeliness → source freshness, max(updated_at) lag
Uniqueness → unique tests on PKs, COUNT vs COUNT DISTINCT
Validity → regex, accepted_values, FK relationships
Anomaly Detection:
Z-score: |value - mean| / std > 3 → anomaly
IQR: value < Q1 - 1.5*IQR or > Q3 + 1.5*IQR → anomaly
Seasonal: compare to same-day-of-week average
Schema Changes:
Non-breaking: add column, widen type, required→nullable
Breaking: remove column, narrow type, nullable→required
Alert Thresholds:
Row count: < 80% or > 150% of 7-day average
Null rate: > 5× historical baseline
Quarantine rate: > 1% warning, > 5% critical
Freshness lag: > 2× expected interval
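The threshold rules above can be sketched as a single check function. The metric names and dict shapes are illustrative:

```python
def evaluate_thresholds(current: dict, baseline: dict) -> list[str]:
    """Apply the alert thresholds above to one pipeline run.

    current:  {'row_count', 'null_rate', 'freshness_lag_s'}
    baseline: {'row_count_7d_avg', 'null_rate', 'expected_interval_s'}
    """
    alerts = []
    avg = baseline["row_count_7d_avg"]
    # Row count: < 80% or > 150% of the 7-day average
    if current["row_count"] < 0.8 * avg or current["row_count"] > 1.5 * avg:
        alerts.append("row_count outside 80%-150% of 7-day average")
    # Null rate: > 5x historical baseline
    if current["null_rate"] > 5 * baseline["null_rate"]:
        alerts.append("null_rate more than 5x historical baseline")
    # Freshness: lag > 2x the expected update interval
    if current["freshness_lag_s"] > 2 * baseline["expected_interval_s"]:
        alerts.append("freshness lag more than 2x expected interval")
    return alerts

alerts = evaluate_thresholds(
    current={"row_count": 700, "null_rate": 0.002, "freshness_lag_s": 3600},
    baseline={"row_count_7d_avg": 1000, "null_rate": 0.001, "expected_interval_s": 86400},
)
print(alerts)  # only the row-count rule fires: 700 < 80% of 1000
```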
Quarantine Flow:
Record → validate → valid: write to good_sink
→ invalid: write to quarantine_sink + alert if rate high