
data-pipeline-architect

Modern data pipeline architect. Medallion architecture, Apache Airflow DAGs, Kafka streaming, dbt transformations, Great Expectations data quality, and batch vs streaming decisions. The modern data stack done right.


Installation

npx clawhub@latest install data-pipeline-architect

View the full skill documentation and source below.

Documentation

Data Pipeline Architecture

Modern Data Stack

Sources → Ingestion → Storage → Transformation → Serving → Consumption

Sources: PostgreSQL, APIs, S3, Kafka, Webhooks, SaaS apps
Ingestion: Fivetran, Airbyte, custom pipelines, Kafka Connect
Storage: Data Lake (S3/GCS) + Data Warehouse (Snowflake/BigQuery/Redshift)
Transform: dbt, Spark, Beam
Serving: BI (Looker/Tableau), APIs, ML models, reverse ETL

Medallion Architecture (Delta Lake / Lakehouse)

Bronze (Raw):
  - Exact copy of source data
  - Never modified, append-only
  - Immutable audit trail
  - Schema: source schema + _ingested_at, _source, _batch_id

Silver (Cleaned):
  - Deduplicated, validated, standardized
  - Type-cast, null-handled
  - Business logic applied
  - Schema: clean, normalized, well-typed

Gold (Business):
  - Aggregated for specific use cases
  - Metric definitions embedded
  - Optimized for query patterns
  - Schema: wide, denormalized for analytics
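
To make the bronze contract concrete, here is a minimal PySpark sketch of an append-only bronze write with the metadata columns listed above. It assumes a SparkSession already configured for Delta Lake; the bucket, paths, and batch id are placeholders, not part of this skill.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("users_bronze_ingest").getOrCreate()

# Read the landing files exactly as delivered (placeholder path)
raw = spark.read.json("s3://my-data-lake/landing/users/2024-01-15/")

bronze = (
    raw
    .withColumn("_ingested_at", F.current_timestamp())
    .withColumn("_source", F.lit("postgres_users"))
    .withColumn("_batch_id", F.lit("2024-01-15"))
)

# Bronze is append-only: no updates, no overwrites
bronze.write.format("delta").mode("append").save("s3://my-data-lake/bronze/users/")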

Apache Airflow DAGs

# Modern Airflow 2.x with TaskFlow API
from airflow.decorators import dag, task
import pendulum
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from datetime import timedelta
import pandas as pd

@dag(
    schedule_interval="0 2 * * *",  # Daily at 2 AM UTC
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    catchup=False,
    default_args={
        "retries": 3,
        "retry_delay": timedelta(minutes=5),
        "retry_exponential_backoff": True,
        "max_retry_delay": timedelta(hours=1),
        "email_on_failure": True,
        "email": ["[email protected]"],
    },
    tags=["bronze", "users"],
    doc_md="""
    ## Daily User Data Ingestion
    Extracts user data from PostgreSQL (source), applies bronze transformations,
    and loads to S3 data lake. Part of the users medallion pipeline.
    
    **Schedule**: Daily at 2 AM UTC
    **SLA**: Must complete by 4 AM UTC
    """,
)
def users_bronze_pipeline():

    @task
    def extract_from_postgres(logical_date=None) -> dict:
        """Extract incremental data from source database."""
        hook = PostgresHook(postgres_conn_id="source_db")
        
        # Incremental extraction using watermark
        query = """
            SELECT 
                id, email, name, status, created_at, updated_at
            FROM users
            WHERE updated_at >= %(watermark)s
              AND updated_at < %(end_time)s
            ORDER BY updated_at
        """
        
        watermark = logical_date - timedelta(days=1)
        end_time = logical_date
        
        df = hook.get_pandas_df(
            query,
            parameters={"watermark": watermark, "end_time": end_time}
        )
        
        # Serialize for XCom (use S3 for large datasets)
        return {
            "row_count": len(df),
            "data": df.to_json(orient="records"),
            "watermark": watermark.isoformat(),
        }
    
    @task
    def apply_bronze_transforms(raw_data: dict) -> dict:
        """Apply bronze layer transformations."""
        import io
        from datetime import datetime
        
        df = pd.read_json(io.StringIO(raw_data["data"]))
        
        # Bronze transformations: add metadata, validate schema
        df["_ingested_at"] = datetime.utcnow().isoformat()
        df["_source"] = "postgres_users"
        df["_batch_id"] = raw_data["watermark"]
        df["_raw_hash"] = df.apply(
            lambda row: hash(tuple(row.values())), axis=1
        )
        
        # Validate required fields
        null_counts = df[["id", "email"]].isnull().sum()
        if null_counts.any():
            raise ValueError(f"Null values in required fields: {null_counts.to_dict()}")
        
        return {
            "row_count": len(df),
            "data": df.to_parquet(index=False).hex(),  # Parquet bytes
            "watermark": raw_data["watermark"],
        }
    
    @task
    def load_to_s3(transformed_data: dict, logical_date=None) -> dict:
        """Write bronze data to S3 as partitioned Parquet."""
        
        s3_hook = S3Hook(aws_conn_id="aws_default")
        
        # Hive-style partitioning
        date_str = logical_date.strftime("year=%Y/month=%m/day=%d")
        s3_key = f"bronze/users/{date_str}/data.parquet"
        
        parquet_bytes = bytes.fromhex(transformed_data["data"])
        
        s3_hook.load_bytes(
            bytes_data=parquet_bytes,
            key=s3_key,
            bucket_name="my-data-lake",
            replace=True,
        )
        
        return {
            "s3_path": f"s3://my-data-lake/{s3_key}",
            "row_count": transformed_data["row_count"],
        }
    
    @task
    def update_metadata_catalog(load_result: dict) -> None:
        """Register partition in Glue/Hive metastore."""
        # Update Glue catalog partition for Athena queries
        import boto3
        glue = boto3.client("glue")
        # glue.create_partition(...)
    
    # DAG wiring
    raw = extract_from_postgres()
    transformed = apply_bronze_transforms(raw)
    loaded = load_to_s3(transformed)
    update_metadata_catalog(loaded)

users_bronze = users_bronze_pipeline()
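
For anything beyond small extracts, returning the rows through XCom (as above) does not scale; hand downstream tasks an object-store pointer instead. A minimal sketch of the same extract step rewritten that way, reusing the hooks imported above (the staging key layout is an assumption):

@task
def extract_to_s3(logical_date=None) -> str:
    """Extract to S3 and pass only the object path through XCom."""
    hook = PostgresHook(postgres_conn_id="source_db")
    s3 = S3Hook(aws_conn_id="aws_default")

    df = hook.get_pandas_df(
        "SELECT * FROM users WHERE updated_at >= %(watermark)s",
        parameters={"watermark": logical_date - timedelta(days=1)},
    )

    key = f"bronze/users/staging/{logical_date:%Y-%m-%d}.parquet"
    s3.load_bytes(df.to_parquet(index=False), key=key,
                  bucket_name="my-data-lake", replace=True)
    return f"s3://my-data-lake/{key}"  # XCom now carries a path, not data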

Kafka Streaming Pipeline

from kafka import KafkaConsumer, KafkaProducer
from kafka.errors import KafkaError
import json
import logging
from dataclasses import dataclass
from datetime import datetime

logger = logging.getLogger(__name__)

@dataclass
class ProcessingResult:
    topic: str
    partition: int
    offset: int
    success: bool
    error: str | None = None

class StreamProcessor:
    """Base class for Kafka stream processing."""
    
    def __init__(
        self,
        brokers: list[str],
        input_topic: str,
        output_topic: str,
        consumer_group: str,
        dlq_topic: str = None,  # Dead letter queue
    ):
        self.consumer = KafkaConsumer(
            input_topic,
            bootstrap_servers=brokers,
            group_id=consumer_group,
            auto_offset_reset="earliest",
            enable_auto_commit=False,  # Manual commit after processing (at-least-once)
            value_deserializer=lambda v: json.loads(v.decode("utf-8")),
            max_poll_interval_ms=300000,  # 5 min for slow processing
            session_timeout_ms=10000,
        )
        
        self.producer = KafkaProducer(
            bootstrap_servers=brokers,
            value_serializer=lambda v: json.dumps(v).encode("utf-8"),
            acks="all",  # Wait for all replicas
            retries=3,
            compression_type="snappy",
        )
        
        self.output_topic = output_topic
        self.dlq_topic = dlq_topic or f"{input_topic}-dlq"
    
    def transform(self, message: dict) -> dict:
        """Override this in subclass."""
        raise NotImplementedError
    
    def run(self, batch_size: int = 100):
        """Process messages in batches for efficiency."""
        while True:
            messages = self.consumer.poll(timeout_ms=1000, max_records=batch_size)
            
            if not messages:
                continue
            
            for tp, records in messages.items():
                for record in records:
                    try:
                        transformed = self.transform(record.value)
                        
                        self.producer.send(
                            self.output_topic,
                            value=transformed,
                            key=record.key,  # raw bytes (no key deserializer configured)
                        )
                        
                    except Exception as e:
                        logger.error(
                            f"Failed to process message",
                            extra={
                                "topic": tp.topic,
                                "partition": tp.partition,
                                "offset": record.offset,
                                "error": str(e),
                            }
                        )
                        
                        # Send to dead letter queue
                        self.producer.send(
                            self.dlq_topic,
                            value={
                                "original": record.value,
                                "error": str(e),
                                "source_topic": tp.topic,
                                "source_offset": record.offset,
                            }
                        )
            
            # Flush and commit after each batch
            self.producer.flush()
            self.consumer.commit()


# Example: User event enrichment processor
class UserEventEnricher(StreamProcessor):
    def __init__(self, *args, db, **kwargs):
        super().__init__(*args, **kwargs)
        self.db = db
        self._cache = {}  # Simple in-process cache
    
    def transform(self, event: dict) -> dict:
        user_id = event["user_id"]
        
        # Cache user data to avoid DB hit per event
        if user_id not in self._cache:
            user = self.db.get_user(user_id)
            self._cache[user_id] = {
                "email": user.email,
                "segment": user.segment,
                "plan": user.plan,
            }
        
        return {
            **event,
            "user": self._cache[user_id],
            "_enriched_at": datetime.utcnow().isoformat(),
        }
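
Wiring the enricher up looks roughly like this; broker addresses, topic names, and UserDatabaseClient are placeholders for whatever your environment provides:

if __name__ == "__main__":
    processor = UserEventEnricher(
        brokers=["kafka-1:9092", "kafka-2:9092"],
        input_topic="user-events",
        output_topic="user-events-enriched",
        consumer_group="event-enricher-v1",
        db=UserDatabaseClient(),  # hypothetical client exposing get_user(user_id)
    )
    processor.run(batch_size=500)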

dbt Data Transformation

-- models/silver/users.sql
{{
  config(
    materialized='incremental',
    unique_key='user_id',
    incremental_strategy='merge',
    cluster_by=['created_date'],
    tags=['silver', 'daily'],
    meta={
      'owner': 'data-platform-team',
      'description': 'Cleaned and deduplicated user records'
    }
  )
}}

WITH source AS (
    SELECT * FROM {{ source('bronze', 'users') }}
    {% if is_incremental() %}
    -- Only process new/changed records
    WHERE _ingested_at > (SELECT MAX(_ingested_at) FROM {{ this }})
    {% endif %}
),

deduplicated AS (
    SELECT *,
           ROW_NUMBER() OVER (
               PARTITION BY id
               ORDER BY updated_at DESC, _ingested_at DESC
           ) AS rn
    FROM source
    WHERE id IS NOT NULL
      AND email IS NOT NULL
),

cleaned AS (
    SELECT
        id AS user_id,
        LOWER(TRIM(email)) AS email,
        TRIM(name) AS full_name,
        INITCAP(SPLIT_PART(TRIM(name), ' ', 1)) AS first_name,
        INITCAP(SPLIT_PART(TRIM(name), ' ', -1)) AS last_name,
        CASE
            WHEN status IN ('active', 'enabled', '1') THEN 'active'
            WHEN status IN ('inactive', 'disabled', '0') THEN 'inactive'
            WHEN status = 'pending' THEN 'pending'
            ELSE 'unknown'
        END AS status,
        created_at::TIMESTAMP AS created_at,
        updated_at::TIMESTAMP AS updated_at,
        DATE_TRUNC('day', created_at) AS created_date,
        _ingested_at,
        _source
    FROM deduplicated
    WHERE rn = 1
)

SELECT * FROM cleaned

# models/silver/schema.yml
version: 2

models:
  - name: users
    description: "Cleaned and deduplicated user records from all sources"
    columns:
      - name: user_id
        description: "Unique user identifier"
        tests:
          - unique
          - not_null
      - name: email
        tests:
          - not_null
          - unique
          - dbt_utils.expression_is_true:
              expression: "LOWER(email) = email"  # Must be lowercase
      - name: status
        tests:
          - not_null
          - accepted_values:
              values: ['active', 'inactive', 'pending', 'unknown']
      - name: created_at
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: "created_at <= CURRENT_TIMESTAMP"

# dbt_project.yml
name: my_analytics
version: '1.0.0'

vars:
  start_date: '2023-01-01'
  environment: "{{ env_var('DBT_TARGET', 'dev') }}"

models:
  my_analytics:
    bronze:
      +materialized: table
      +tags: ['bronze']
    silver:
      +materialized: incremental
      +tags: ['silver']
    gold:
      +materialized: table
      +tags: ['gold']
      +grants:
        select: ['ROLE_ANALYSTS', 'ROLE_BI_TOOLS']
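
These models are typically triggered from the same orchestrator as the ingestion DAGs. A minimal sketch using Airflow's BashOperator inside a DAG definition (the project path is an assumption):

from airflow.operators.bash import BashOperator

dbt_run_silver = BashOperator(
    task_id="dbt_run_silver",
    bash_command="cd /opt/airflow/dbt/my_analytics && dbt run --select tag:silver",
)
dbt_test_silver = BashOperator(
    task_id="dbt_test_silver",
    bash_command="cd /opt/airflow/dbt/my_analytics && dbt test --select tag:silver",
)
dbt_run_silver >> dbt_test_silver  # build the silver models, then run their schema tests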

Pipeline Monitoring and Quality

# Great Expectations for data quality
import great_expectations as gx

context = gx.get_context()

# Define expectations
suite = context.create_expectation_suite("users_bronze")

validator = context.get_validator(
    datasource_name="s3_bronze",
    data_connector_name="default_runtime_data_connector",
    data_asset_name="users",
    batch_identifiers={"default_identifier_name": "2024-01-15"},
    expectation_suite=suite,
)

# Add expectations
validator.expect_column_values_to_not_be_null("id")
validator.expect_column_values_to_be_unique("id")
validator.expect_column_values_to_match_regex("email", r"^[^@]+@[^@]+\.[^@]+$")
validator.expect_column_values_to_be_in_set("status", ["active", "inactive", "pending"])
validator.expect_table_row_count_to_be_between(min_value=1000, max_value=None)

# Run validation
results = validator.validate()
if not results.success:
    raise ValueError(f"Data quality check failed: {results}")

Batch vs Streaming Decision Matrix

Factor          | Batch                            | Streaming
----------------|----------------------------------|---------------------------------------------
Latency needed  | Hours / Daily                    | Seconds / Minutes
Data volume     | Any                              | High throughput
Complexity      | Lower                            | Higher
Cost            | Lower                            | Higher
Use cases       | Reports, ML training, DWH loads  | Fraud detection, personalization, monitoring
Tools           | Airflow, Spark, dbt              | Kafka, Flink, Spark Streaming
Error handling  | Retry whole batch                | DLQ, per-message retry

Default to batch unless you actually need low latency.