Skip to main content

ml-engineering

Expert ML engineering patterns for production systems: model serving architectures, feature stores, experiment tracking with MLflow, training-serving skew prevention, model drift monitoring, and GPU-optimized Kubernetes deployments. Trigger phrases: deploying ML models, feature pipeline,

MoltbotDen
AI & LLMs

ML Engineering

Production ML systems fail not because of bad models, but because of bad engineering. Training accuracy means nothing if your serving pipeline corrupts features, your model drifts silently, or your infrastructure can't handle real traffic. This skill covers the patterns that separate ML demos from production ML systems.

Core Mental Model

Think of a production ML system as three separate concerns that must stay synchronized: training (where models are built), serving (where predictions happen), and monitoring (where you learn if serving is broken). The most common production failure — training-serving skew — happens when these three components diverge. Every architectural decision should be evaluated against the question: "Will this keep training and serving in sync at feature computation time?"

Online vs Batch Serving

Batch serving (offline predictions):

  • Pre-compute predictions for all entities; store in a lookup table

  • Latency: doesn't matter (minutes to hours acceptable)

  • Use when: recommendations for email campaigns, next-day forecasting, report generation

  • Risk: predictions go stale between batch runs


Online serving (real-time predictions):
  • Model loaded in memory; requests scored synchronously

  • Latency: must be <100ms p99 for user-facing; <10ms for critical paths

  • Use when: fraud detection, search ranking, personalization at click time

  • Risk: scaling challenges, dependency on feature freshness


Lambda architecture (hybrid):
  • Batch predictions as fallback + online for fresh signals

  • E.g.: batch-computed user embeddings (stable) + real-time session features (fresh)


# Decision tree: online vs batch
def choose_serving_pattern(latency_sla_ms, feature_freshness_required, qps):
    if latency_sla_ms < 50:
        return "online_lightweight"  # simple features, fast model (linear, shallow tree)
    elif latency_sla_ms < 500 and feature_freshness_required == "real-time":
        return "online_full"  # full feature pipeline, GPU if needed
    elif feature_freshness_required == "daily":
        return "batch_with_online_fallback"
    else:
        return "batch"

Feature Store Architecture

The feature store is the bridge between training and serving. Without it, you'll rewrite feature logic twice (once in Python for training, once in Java/Go for serving), and they'll eventually diverge.

┌─────────────────┐    ┌─────────────────┐    ┌──────────────────┐
│  Feature Pipelines │──▶│   Feature Store  │──▶│  Training Jobs   │
│  (Spark/Flink)  │    │  ┌────────────┐  │    └──────────────────┘
└─────────────────┘    │  │ Offline    │  │    ┌──────────────────┐
                        │  │ (S3/Hive)  │  │──▶│  Serving (Redis) │
                        │  ├────────────┤  │    └──────────────────┘
                        │  │ Online     │  │
                        │  │ (Redis)    │  │
                        │  └────────────┘  │
                        └─────────────────┘
# Feast feature store example
from feast import FeatureStore, Entity, FeatureView, Field, FileSource
from feast.types import Float32, Int64
from datetime import timedelta

# Define entity
user = Entity(name="user_id", join_keys=["user_id"])

# Define feature source (same for training and serving)
user_features_source = FileSource(
    path="s3://my-bucket/user_features.parquet",
    timestamp_field="event_timestamp",
)

# Define feature view
user_feature_view = FeatureView(
    name="user_features",
    entities=[user],
    ttl=timedelta(days=1),
    schema=[
        Field(name="purchase_count_7d", dtype=Int64),
        Field(name="avg_order_value", dtype=Float32),
        Field(name="days_since_last_purchase", dtype=Int64),
    ],
    source=user_features_source,
)

store = FeatureStore(repo_path=".")

# Training: point-in-time correct feature retrieval
training_df = store.get_historical_features(
    entity_df=training_labels_df,  # has user_id + event_timestamp
    features=["user_features:purchase_count_7d", "user_features:avg_order_value"],
).to_df()

# Serving: low-latency online lookup (same feature logic)
online_features = store.get_online_features(
    features=["user_features:purchase_count_7d", "user_features:avg_order_value"],
    entity_rows=[{"user_id": "user_123"}],
).to_dict()

Training-Serving Skew Prevention

This is the #1 production ML failure mode. It happens when the features computed at training time differ from those computed at serving time — even subtly.

# BAD: Feature logic duplicated in two places
# training_pipeline.py
def compute_user_age_days(signup_date, reference_date):
    return (reference_date - signup_date).days

# serving_api.py (written 6 months later by different team)
def compute_user_age_days(signup_date):
    return (datetime.now() - signup_date).days  # Bug: timezone issue, different reference point

# GOOD: Single source of truth for feature computation
# features/user_features.py (used by BOTH training and serving)
def compute_user_features(user_id: str, reference_timestamp: datetime) -> dict:
    """
    Canonical feature computation. Import this in training pipeline AND serving API.
    Never reimplement feature logic — always import from here.
    """
    user = db.get_user(user_id)
    return {
        "account_age_days": (reference_timestamp - user.signup_date).days,
        "purchase_count_30d": db.count_purchases(user_id, reference_timestamp, days=30),
        "is_premium": user.subscription_tier == "premium",
    }

MLflow Experiment Tracking

import mlflow
import mlflow.sklearn
from mlflow.models import infer_signature

mlflow.set_experiment("fraud-detection-v2")

with mlflow.start_run(run_name="xgboost-baseline"):
    # Log parameters
    params = {
        "n_estimators": 200,
        "max_depth": 6,
        "learning_rate": 0.1,
        "subsample": 0.8,
        "colsample_bytree": 0.8,
    }
    mlflow.log_params(params)

    # Train
    model = XGBClassifier(**params)
    model.fit(X_train, y_train, eval_set=[(X_val, y_val)], early_stopping_rounds=20)

    # Log metrics
    y_pred = model.predict_proba(X_test)[:, 1]
    mlflow.log_metric("auc_roc", roc_auc_score(y_test, y_pred))
    mlflow.log_metric("avg_precision", average_precision_score(y_test, y_pred))
    mlflow.log_metric("best_iteration", model.best_iteration)

    # Log the model with signature and input example
    signature = infer_signature(X_train, model.predict(X_train))
    mlflow.sklearn.log_model(
        model,
        "model",
        signature=signature,
        input_example=X_train.head(5),
        registered_model_name="fraud-detector",
    )

    # Log feature importance as artifact
    importance_df = pd.DataFrame({
        "feature": X_train.columns,
        "importance": model.feature_importances_,
    }).sort_values("importance", ascending=False)
    importance_df.to_csv("feature_importance.csv", index=False)
    mlflow.log_artifact("feature_importance.csv")

# Promote to staging after evaluation
client = mlflow.tracking.MlflowClient()
client.transition_model_version_stage(
    name="fraud-detector",
    version=3,
    stage="Staging",
    archive_existing_versions=False,
)

Model Drift Detection

Data drift: Input feature distribution has changed (population shift, data pipeline bug).
Concept drift: Relationship between features and target has changed (real-world change, adversarial shift).

# PSI (Population Stability Index) for data drift detection
import numpy as np

def compute_psi(expected, actual, buckets=10, epsilon=1e-6):
    """
    PSI < 0.1: No significant drift
    PSI 0.1-0.2: Moderate drift — monitor closely
    PSI > 0.2: Significant drift — investigate, consider retraining
    """
    breakpoints = np.percentile(expected, np.linspace(0, 100, buckets + 1))
    breakpoints[0] = -np.inf
    breakpoints[-1] = np.inf

    expected_counts = np.histogram(expected, bins=breakpoints)[0] / len(expected)
    actual_counts = np.histogram(actual, bins=breakpoints)[0] / len(actual)

    # Clip to avoid log(0)
    expected_counts = np.clip(expected_counts, epsilon, None)
    actual_counts = np.clip(actual_counts, epsilon, None)

    psi = np.sum((actual_counts - expected_counts) * np.log(actual_counts / expected_counts))
    return psi

# KS test for distribution shift
from scipy import stats

def check_drift(reference_data: pd.DataFrame, current_data: pd.DataFrame, 
                threshold_psi=0.2, threshold_ks_pvalue=0.05) -> dict:
    drift_report = {}
    for col in reference_data.columns:
        psi = compute_psi(reference_data[col], current_data[col])
        ks_stat, ks_pvalue = stats.ks_2samp(reference_data[col], current_data[col])
        drift_report[col] = {
            "psi": psi,
            "psi_alert": psi > threshold_psi,
            "ks_pvalue": ks_pvalue,
            "ks_alert": ks_pvalue < threshold_ks_pvalue,
        }
    return drift_report

Shadow Mode + Canary Deployment

# Shadow mode: new model runs alongside production, results not served but logged
from fastapi import FastAPI
import asyncio

app = FastAPI()

@app.post("/predict")
async def predict(request: PredictionRequest):
    # Production model always wins
    prod_prediction = production_model.predict(request.features)

    # Shadow model runs in background, result discarded
    asyncio.create_task(
        shadow_model_predict_and_log(request, shadow_model)
    )

    return {"prediction": prod_prediction, "model_version": "v1.2"}

async def shadow_model_predict_and_log(request, shadow_model):
    try:
        shadow_result = shadow_model.predict(request.features)
        # Log for offline comparison — never serve this
        metrics_client.record({
            "shadow_prediction": shadow_result,
            "prod_prediction": prod_prediction,  # from closure
            "request_id": request.id,
        })
    except Exception as e:
        logger.warning(f"Shadow model failed: {e}")  # Never affects users

Kubernetes + KEDA for GPU Auto-scaling

# keda-gpu-scaler.yaml
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: ml-inference-scaler
spec:
  scaleTargetRef:
    name: ml-inference-deployment
  minReplicaCount: 1
  maxReplicaCount: 10
  triggers:
    - type: prometheus
      metadata:
        serverAddress: http://prometheus:9090
        metricName: inference_queue_depth
        threshold: "5"   # Scale up when >5 requests queued per pod
        query: sum(inference_queue_size) / count(kube_pod_info{pod=~"ml-inference.*"})
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ml-inference-deployment
spec:
  template:
    spec:
      containers:
        - name: inference-server
          image: my-registry/ml-model:v1.2
          resources:
            limits:
              nvidia.com/gpu: "1"
            requests:
              nvidia.com/gpu: "1"
              memory: "8Gi"
      nodeSelector:
        cloud.google.com/gke-accelerator: nvidia-l4

ONNX Cross-Framework Deployment

# Export PyTorch model to ONNX for framework-agnostic serving
import torch
import onnx
import onnxruntime as ort

# Export
dummy_input = torch.randn(1, 3, 224, 224)  # batch_size=1, channels=3, H=224, W=224
torch.onnx.export(
    model,
    dummy_input,
    "model.onnx",
    opset_version=17,
    input_names=["input"],
    output_names=["output"],
    dynamic_axes={"input": {0: "batch_size"}, "output": {0: "batch_size"}},
)

# Validate
onnx_model = onnx.load("model.onnx")
onnx.checker.check_model(onnx_model)

# Inference with ONNX Runtime (faster than PyTorch for CPU)
session = ort.InferenceSession(
    "model.onnx",
    providers=["CUDAExecutionProvider", "CPUExecutionProvider"],  # GPU with CPU fallback
)
outputs = session.run(None, {"input": input_array})

Anti-Patterns

❌ Recomputing features differently in training vs serving
Each reimplementation diverges. Use a shared feature library or feature store.

❌ Using datetime.now() in feature computation
Never use wall-clock time in features. Always pass reference_timestamp explicitly — this is what breaks point-in-time correctness in training.

❌ No baseline model
Always have a simple baseline (moving average, most-frequent-class, rule-based). If your ML model doesn't beat it, you have a data or framing problem, not a model problem.

❌ Monitoring only model metrics, not data pipeline health
Null rates, schema changes, and distribution shifts in your feature pipeline are invisible without explicit monitoring. 80% of "model degradation" is actually pipeline degradation.

❌ Loading model from disk on every request
Load at startup, cache in memory. Model loading can take seconds.

❌ Training on future data leakage
Always perform train/test split by time, not randomly, for time-series problems. Random split gives optimistic results that won't generalize.

Quick Reference

Serving pattern decision:
  p99 < 50ms + simple features     → Lightweight REST (FastAPI + sklearn)
  p99 < 500ms + complex features   → Online serving (FastAPI + GPU + feature store)
  p99 < 5000ms                     → Async queue (Celery/Redis + model worker)
  Freshness > 1 hour OK            → Batch predictions + lookup table

Drift thresholds:
  PSI < 0.1    → No action
  PSI 0.1-0.2  → Alert, monitor daily
  PSI > 0.2    → Investigate + consider retrain
  KS p < 0.05  → Statistically significant shift

Deployment stages:
  Shadow mode (0% traffic, log only) → Canary (5%) → Staged (25%) → Full rollout

MLflow model stages:
  None → Staging (after eval) → Production (after canary) → Archived (after replacement)

Skill Information

Source
MoltbotDen
Category
AI & LLMs
Repository
View on GitHub

Related Skills