
stream-processing

Expert knowledge of stream processing architectures, event time vs processing time, windowing, stateful operations, exactly-once semantics, Kafka Streams, Flink, Spark Structured Streaming, CQRS with event streams, and Lambda vs Kappa architecture.

MoltbotDen
Data & Analytics

Stream Processing Expert

Stream processing is fundamentally about answering the question "What has happened recently?", but "recently" is surprisingly hard to define when data arrives out of order, systems fail, and results must be correct for both on-time and late events. The two hardest problems in streaming are handling time correctly (event time vs processing time) and maintaining correct state across failures. Everything else is engineering tradeoffs.

Core Mental Model

The central insight is the distinction between event time (when something actually happened) and processing time (when the system processed it). Using processing time is simple but produces incorrect results when events arrive late. Using event time is correct but requires you to define how long you'll wait for late data; that is what watermarks do. State in streaming is essentially a fault-tolerant, key-partitioned hash table that must be checkpointed regularly. Exactly-once semantics cost throughput and latency (2-5x is a rough figure; the real overhead depends on the framework, commit interval, and sink), so evaluate whether your use case needs it.


Batch vs Micro-batch vs Real-time

Batch processing:
  How it works:    Collect data for a period, process all at once
  Latency:         Minutes to hours
  Throughput:      Excellent (I/O amortized over large batches)
  State:           Simple (start fresh each run, or use checkpoints)
  Complexity:      Low
  When to use:
    ✅ End-of-day reporting
    ✅ ML model training
    ✅ Data warehouse ETL
    ✅ Large-scale transformations (TB+ scale)
    ✅ When latency > 5 minutes is acceptable

Micro-batch (Spark Structured Streaming):
  How it works:    Process small batches every N seconds
  Latency:         Seconds to low minutes (typically 1-60s)
  Throughput:      Good
  State:           Checkpoint-based
  When to use:
    ✅ Near-real-time dashboards
    ✅ Fraud detection with latency tolerance
    ✅ You're already using Spark
    ✅ Complex joins and aggregations

Real-time streaming (Apache Flink, Apache Kafka Streams):
  How it works:    Process each event individually as it arrives
  Latency:         Milliseconds to low seconds
  State:           Continuous incremental checkpoints
  Complexity:      High (time, state, fault tolerance all explicit)
  When to use:
    ✅ Real-time personalization / recommendations
    ✅ Fraud detection (latency must be < 100ms)
    ✅ IoT sensor processing
    ✅ Live scoreboards, trading, alerting
    ✅ Event-driven microservices

Event Time vs Processing Time

Processing time: when the system receives and processes the event
  Simple:   no watermarks needed
  Problem:  late-arriving events cause incorrect results
  Use for:  monitoring, alerting on current system behavior

Event time: when the event actually occurred (embedded in the event)
  Correct:  accurate even with out-of-order, delayed events
  Complex:  requires watermarks to know when to close a window
  Use for:  business analytics, billing, reporting

Ingestion time: when the event was ingested into the stream (e.g., the Kafka broker timestamp when the topic uses LogAppendTime)
  Compromise between the two
  Use for:  when event time isn't available in the payload

Illustration:
  Event happens at 10:00:00
  Mobile device offline, sends at 10:05:00
  Kafka receives at 10:05:01 (ingestion time; processing time is when the job reads it shortly after)
  Event timestamp in payload: 10:00:00 (event time)
  
  If you aggregate by minute using processing time: counted in 10:05 bucket (wrong)
  If you aggregate by minute using event time: counted in 10:00 bucket (correct)
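
The difference can be reproduced in a few lines of plain Python. This is a minimal sketch, not tied to any framework; the event dictionaries and the `minute_bucket` helper are hypothetical names chosen for illustration, and the dates are arbitrary.

```python
from collections import Counter
from datetime import datetime


def minute_bucket(ts: datetime) -> str:
    """Truncate a timestamp to its minute, e.g. 10:00:00 -> '10:00'."""
    return ts.strftime("%H:%M")


# Each event carries its own event time; processing time is when the job saw it.
# The first event mirrors the illustration: it happened at 10:00:00 but arrived late.
events = [
    {"event_time": datetime(2024, 1, 1, 10, 0, 0),
     "processing_time": datetime(2024, 1, 1, 10, 5, 1)},
    {"event_time": datetime(2024, 1, 1, 10, 5, 0),
     "processing_time": datetime(2024, 1, 1, 10, 5, 2)},
]

by_event_time = Counter(minute_bucket(e["event_time"]) for e in events)
by_processing_time = Counter(minute_bucket(e["processing_time"]) for e in events)

print(dict(by_event_time))
print(dict(by_processing_time))
```

Grouping by event time puts one event in each minute, while grouping by processing time piles both into 10:05.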

Watermarks for Late Data

# Flink: explicit watermark strategy
from pyflink.common.time import Duration, Time
from pyflink.common.watermark_strategy import TimestampAssigner, WatermarkStrategy
from pyflink.datastream import OutputTag, StreamExecutionEnvironment
from pyflink.datastream.window import TumblingEventTimeWindows

# Assumed to be defined elsewhere: kafka_source, late_events_kafka_sink,
# OrderCountAggregator (an AggregateFunction), RevenueWindowFunction (a ProcessWindowFunction)

class EventTimestampAssigner(TimestampAssigner):
    # PyFlink expects a TimestampAssigner object here, not a lambda
    def extract_timestamp(self, event, record_timestamp):
        return event.timestamp_ms  # event time in epoch milliseconds

env = StreamExecutionEnvironment.get_execution_environment()

# Source with watermark: tolerate 30 seconds of out-of-order events
stream = env.add_source(kafka_source) \
    .assign_timestamps_and_watermarks(
        WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(30))
        .with_timestamp_assigner(EventTimestampAssigner())
    )

# Tumbling 1-hour window on event time
# A window fires when the watermark passes window_end_time
# The watermark trails the highest event time seen by 30 seconds, so an event that
# arrives after its window has fired is dropped (or sent to a side output)
result = stream \
    .key_by(lambda event: event.user_id) \
    .window(TumblingEventTimeWindows.of(Time.hours(1))) \
    .aggregate(
        OrderCountAggregator(),
        RevenueWindowFunction()
    )

# Side output for late events (optional: collect what was dropped)
late_events_tag = OutputTag("late-events")
result_with_late = (
    stream
    .key_by(lambda event: event.user_id)
    .window(TumblingEventTimeWindows.of(Time.hours(1)))
    # allowed_lateness takes milliseconds: keep window state 5 extra minutes past the watermark
    .allowed_lateness(5 * 60 * 1000)
    .side_output_late_data(late_events_tag)
    .aggregate(OrderCountAggregator())
)

late_stream = result_with_late.get_side_output(late_events_tag)
late_stream.add_sink(late_events_kafka_sink)  # process late events separately
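
To see how a bounded-out-of-orderness watermark decides which events are late, here is a small framework-free simulation. It is a simplified sketch, not Flink's implementation: the watermark is updated after every event rather than periodically, the 1-minute window size is an assumption made to keep the numbers small, and `run` is a hypothetical helper.

```python
from collections import defaultdict

WINDOW_MS = 60_000            # 1-minute tumbling windows (assumed size)
MAX_OUT_OF_ORDER_MS = 30_000  # same 30-second bound as the watermark strategy above


def run(timestamps):
    """Feed event-time timestamps (ms) in arrival order.

    Returns (fired, late, still_open):
      fired      - window_start -> count for windows the watermark has passed
      late       - timestamps dropped because their window had already fired
      still_open - window_start -> count for windows not yet fired
    """
    open_windows = defaultdict(int)
    fired, late = {}, []
    watermark = float("-inf")
    for ts in timestamps:
        start = ts - ts % WINDOW_MS
        if start + WINDOW_MS <= watermark:
            late.append(ts)  # window already fired: late event
        else:
            open_windows[start] += 1
        # The watermark trails the highest event time seen so far.
        watermark = max(watermark, ts - MAX_OUT_OF_ORDER_MS)
        ready = sorted(s for s in open_windows if s + WINDOW_MS <= watermark)
        for s in ready:
            fired[s] = open_windows.pop(s)
    return fired, late, dict(open_windows)


fired, late, still_open = run([5_000, 50_000, 70_000, 40_000, 95_000, 10_000])
print(fired, late, still_open)
```

The event at 40 000 ms arrives out of order but is still counted, because the watermark has not yet reached the end of its window. The event at 10 000 ms arrives after the watermark passed 60 000 ms, so it is treated as late.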

Windowing Patterns

# Tumbling window: non-overlapping, fixed size
# Use for: per-minute/hourly counts, non-overlapping time buckets
# "How many orders per hour?"
# [00:00-01:00] [01:00-02:00] [02:00-03:00] ...

.window(TumblingEventTimeWindows.of(Time.hours(1)))

# Sliding window: overlapping, fixed size, configurable slide
# Use for: moving averages, "recent N minutes" metrics
# "What's the 5-minute rolling average, updated every minute?"
# [00:00-00:05], [00:01-00:06], [00:02-00:07] ...

.window(SlidingEventTimeWindows.of(Time.minutes(5), Time.minutes(1)))
# Output: one result per minute, each representing the last 5 minutes

# Session window: activity-based, gaps trigger window close
# Use for: user sessions, device activity bursts
# "Group user activity with < 30 minute inactivity gaps"
# User active 10:00-10:20 (gap) 11:30-11:45 → two sessions

.window(EventTimeSessionWindows.with_gap(Time.minutes(30)))
# Each session closes when no events for 30 minutes

# Global window: one window per key, never closes
# Use for: stateful counters, aggregations without time bounds
# Must provide a custom trigger or it never fires
# Note: CountTrigger fires without purging, so window contents are retained across firings
.window(GlobalWindows.create()) \
.trigger(CountTrigger.of(1000))  # fire every 1000 events per key

# Window comparison:
# Event pattern:  |--A--B-C---|D|----E---F-G-H--|I|
# Tumbling(5min): [-----][-----][-----]
# Sliding(5,1):   each minute has a 5-minute-looking-back window
# Session(2min):  [A-B-C][D][E-F-G-H][I]
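
The assignment rules behind the three time-based window types can be sketched without any framework. The functions below are hypothetical helpers, not Flink APIs. They use plain integers as timestamps (minutes since midnight in the usage lines) and half-open `[start, end)` windows; `sessions` returns the first and last event time of each session rather than the window bounds.

```python
def tumbling(ts, size):
    """The single fixed-size window that contains ts."""
    start = ts - ts % size
    return [(start, start + size)]


def sliding(ts, size, slide):
    """Every window of length `size`, started every `slide`, that contains ts."""
    windows = []
    start = ts - ts % slide  # latest window start at or before ts
    while start > ts - size:
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


def sessions(timestamps, gap):
    """Group timestamps into sessions; a pause of `gap` or more starts a new one."""
    result = []
    for ts in sorted(timestamps):
        if result and ts - result[-1][1] < gap:
            result[-1] = (result[-1][0], ts)  # extend the current session
        else:
            result.append((ts, ts))           # start a new session
    return result


print(tumbling(7, 5))
print(sliding(7, 5, 1))
# 10:00, 10:10, 10:20, then 11:30 and 11:45, as minutes since midnight
print(sessions([600, 610, 620, 690, 705], gap=30))
```

One event falls into exactly one tumbling window but into `size / slide` sliding windows, which is why sliding windows with a small slide multiply state and output volume.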

Stateful Stream Processing

// Flink: keyed state (per-key state, distributed across operators)
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class FraudDetector extends KeyedProcessFunction<String, Transaction, Alert> {

    private static final long IDLE_TIMEOUT_MS = TimeUnit.HOURS.toMillis(1);

    // State: maintained per user_id, checkpointed to fault-tolerant storage
    private ValueState<Double> lastTransactionAmount;
    private ValueState<Long>   lastTransactionTime;
    private ListState<Transaction> recentTransactions;

    @Override
    public void open(Configuration parameters) {
        lastTransactionAmount = getRuntimeContext().getState(
            new ValueStateDescriptor<>("last-amount", Double.class)
        );
        lastTransactionTime = getRuntimeContext().getState(
            new ValueStateDescriptor<>("last-time", Long.class)
        );
        recentTransactions = getRuntimeContext().getListState(
            new ListStateDescriptor<>("recent-txns", Transaction.class)
        );
    }

    @Override
    public void processElement(Transaction txn, Context ctx, Collector<Alert> out) throws Exception {
        Double lastAmount = lastTransactionAmount.value();
        Long   lastTime   = lastTransactionTime.value();

        if (lastAmount != null && lastTime != null) {
            boolean largeAmount  = txn.amount > 10 * lastAmount;
            boolean tooFast      = txn.timestamp - lastTime < 60_000;  // 1 minute

            if (largeAmount && tooFast) {
                out.collect(new Alert(txn.userId, "Suspicious: large rapid transaction", txn));
            }
        }

        lastTransactionAmount.update(txn.amount);
        lastTransactionTime.update(txn.timestamp);

        // Register a cleanup timer on every event (not only on alerts) so idle keys are always cleared
        ctx.timerService().registerEventTimeTimer(txn.timestamp + IDLE_TIMEOUT_MS);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) throws Exception {
        // Clear state only if no newer transaction arrived after this timer was registered
        // (prevents unbounded state growth without wiping active keys)
        Long lastTime = lastTransactionTime.value();
        if (lastTime != null && timestamp >= lastTime + IDLE_TIMEOUT_MS) {
            lastTransactionAmount.clear();
            lastTransactionTime.clear();
            recentTransactions.clear();
        }
    }
}

// Checkpointing configuration (critical for exactly-once state recovery)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.enableCheckpointing(60_000);  // checkpoint every 60 seconds
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000);  // min 30s between checkpoints
env.getCheckpointConfig().setCheckpointTimeout(120_000);  // fail checkpoint after 2 minutes
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// Checkpoint storage (state persisted to fault-tolerant storage)
env.getCheckpointConfig().setCheckpointStorage("s3://my-bucket/flink-checkpoints/");
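
Why a checkpoint must capture state and source position together can be shown with a toy model. This is a hedged sketch, not how Flink stores checkpoints: `KeyedCounter`, the in-memory `log`, and the dictionary snapshot are all illustrative assumptions.

```python
import copy


class KeyedCounter:
    """Per-key running count plus the source offset that the counts reflect."""

    def __init__(self):
        self.counts = {}
        self.offset = 0  # index of the next record to read

    def process(self, log, upto):
        """Consume records from the current offset up to (not including) `upto`."""
        while self.offset < upto:
            key = log[self.offset]
            self.counts[key] = self.counts.get(key, 0) + 1
            self.offset += 1

    def snapshot(self):
        """State and offset are captured together so they stay consistent."""
        return copy.deepcopy({"counts": self.counts, "offset": self.offset})

    def restore(self, snap):
        self.counts = dict(snap["counts"])
        self.offset = snap["offset"]


log = ["alice", "bob", "alice", "carol", "alice"]

job = KeyedCounter()
job.process(log, upto=3)
checkpoint = job.snapshot()      # would go to durable storage in a real system
job.process(log, upto=4)         # progress made after the checkpoint...

job = KeyedCounter()             # ...is lost when the operator crashes
job.restore(checkpoint)
job.process(log, upto=len(log))  # replay from the checkpointed offset
print(job.counts)
```

Because the restored offset matches the restored counts, the replayed record for "carol" is counted once, not twice and not zero times. Restoring state without rewinding the source (or the reverse) would break that.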

Exactly-Once vs At-Least-Once

At-most-once:     Message processed 0 or 1 times. Possible loss.
                  Use when: logging, metrics, some monitoring
                  
At-least-once:    Message processed 1+ times. Possible duplicates.
                  Your application must be idempotent to handle duplicates.
                  Use when: most analytics, aggregations, materialized views
                  Implementation: retry + idempotent consumer

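Exactly-once:     Each message's effect is applied exactly 1 time, even if the message is re-read after a failure. No loss, no duplicates.
                  Cost: reduced throughput and added latency (roughly 2-5x; measure for your workload)
                  Implementation: transactions (or idempotent sinks) + checkpointing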

When exactly-once is worth it:
  ✅ Financial transactions, billing, payment ledger
  ✅ Inventory deduction
  ✅ Writes that can't be deduplicated downstream

When at-least-once + idempotent is sufficient:
  ✅ Analytics aggregations (usually idempotent with UPSERT)
  ✅ Search index updates
  ✅ Notification counts (approximate is OK)
  ✅ Most event-driven microservices

EOS in practice:
  Kafka → Flink → Kafka:   end-to-end EOS with Flink checkpoints + Kafka transactions (downstream consumers must use isolation.level=read_committed)
  Kafka → Spark → Delta:   EOS with Delta Lake ACID writes
  Kafka → Kafka Streams:   built-in EOS with idempotent producer + transactions
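
The "at-least-once + idempotent consumer" option can be sketched as deduplication by event ID. This is a minimal in-memory illustration; `IdempotentLedger` and the event IDs are hypothetical. In a real system the seen-ID set and the balance update would have to be committed atomically, and the set would need a retention bound.

```python
class IdempotentLedger:
    """Applies each event at most once, so redelivery cannot double-count."""

    def __init__(self):
        self.balance = 0
        self.seen_ids = set()

    def apply(self, event_id, amount):
        """Apply the event once; return False for a redelivered duplicate."""
        if event_id in self.seen_ids:
            return False
        self.seen_ids.add(event_id)
        self.balance += amount
        return True


# At-least-once delivery: e1 and e2 are each delivered twice after a retry.
deliveries = [("e1", 100), ("e2", -30), ("e1", 100), ("e3", 5), ("e2", -30)]

ledger = IdempotentLedger()
applied = [ledger.apply(event_id, amount) for event_id, amount in deliveries]
print(applied, ledger.balance)
```

The duplicates are detected and skipped, so the final balance matches what exactly-once delivery would have produced.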

Kafka Streams vs Apache Flink vs Spark Structured Streaming

Kafka Streams:
  Deployment:   Library (runs in your application)
  State:        RocksDB (local, replicated to Kafka changelog topic)
  Latency:      Low (milliseconds)
  Throughput:   Good
  Exactly-once: Yes
  Windowing:    Yes (tumbling, hopping, session)
  Complex ops:  Limited (joins require co-partitioning)
  Best for:
    ✅ Microservice-style stream processing
    ✅ Already using Kafka, don't want separate cluster
    ✅ Simple-medium complexity transformations
    ✅ Team doesn't want to manage Flink/Spark cluster

Apache Flink:
  Deployment:   Cluster (Kubernetes, YARN, AWS managed)
  State:        RocksDB or heap (checkpointed to S3/HDFS)
  Latency:      Very low (milliseconds, native event-time)
  Throughput:   Excellent
  Exactly-once: Yes (end-to-end with supported sinks)
  Windowing:    Full (all types + custom)
  Complex ops:  Full (complex joins, CEP, ML)
  Best for:
    ✅ Mission-critical low-latency processing
    ✅ Complex stateful logic (fraud, CEP)
    ✅ High throughput (billions of events/day)
    ✅ You need native event-time processing

Spark Structured Streaming:
  Deployment:   Spark cluster (same as batch)
  State:        Spark state store (checkpointed to S3/HDFS)
  Latency:      Higher (seconds, micro-batch model)
  Throughput:   Excellent
  Exactly-once: Yes (with idempotent sinks)
  Windowing:    Yes (with watermarks)
  Best for:
    ✅ Already using Spark for batch (code reuse)
    ✅ Unified batch + streaming (Lambda architecture in one system)
    ✅ Latency tolerance of 1-60 seconds
    ✅ Delta Lake integration (ACID streaming writes)

Backpressure Handling

// Flink: automatic backpressure — operators slow down producers when downstream is slow
// Monitor: Flink UI shows "backpressure" indicator per operator (high/low/OK)

// Kafka Streams: backpressure via consumer poll rate
// If processing is slow, consumer reads less frequently → Kafka accumulates lag

// Patterns to handle slow consumers:
// 1. Increase parallelism
env.setParallelism(8);  // 8 parallel subtasks per operator

// 2. Async I/O for external calls (don't block the operator thread)
AsyncDataStream.unorderedWait(
    stream,
    new DatabaseAsyncLookup(),    // async lookup function
    1000,                         // timeout ms
    TimeUnit.MILLISECONDS,
    50                            // max concurrent async requests
);

// 3. Buffer configuration for bursty sources
env.setBufferTimeout(100);  // flush partially filled network buffers every 100 ms (higher = more throughput, more latency)

// 4. Checkpoint tuning: slow checkpoints (long barrier alignment, large state) stall processing
// Enable: -Dstate.backend.incremental=true  (incremental checkpoints)
// Tune:   checkpoint interval (longer = less overhead, but more reprocessing after a failure)
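
The core idea of backpressure, a bounded buffer that makes the producer wait instead of dropping data, can be shown with a deterministic tick-based simulation. This is a toy model under stated assumptions, not how Flink's network stack works; `simulate` and its parameters are hypothetical.

```python
from collections import deque


def simulate(ticks, produce_per_tick, consume_per_tick, capacity):
    """Run a producer and a slower consumer joined by a bounded buffer.

    Returns (produced, consumed, max_buffered). When the buffer is full the
    producer stalls for the rest of the tick instead of dropping records.
    """
    buffer = deque()
    produced = consumed = max_buffered = 0
    for _ in range(ticks):
        for _ in range(produce_per_tick):
            if len(buffer) >= capacity:
                break  # backpressure: wait for the consumer to free space
            buffer.append(produced)
            produced += 1
        max_buffered = max(max_buffered, len(buffer))
        for _ in range(min(consume_per_tick, len(buffer))):
            buffer.popleft()
            consumed += 1
    return produced, consumed, max_buffered


print(simulate(ticks=10, produce_per_tick=5, consume_per_tick=2, capacity=8))
```

After the buffer fills, the producer's effective rate drops to the consumer's rate (2 per tick) and memory use stays capped at the buffer capacity. With an unbounded buffer the backlog would grow by 3 records every tick.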

Lambda vs Kappa Architecture

Lambda Architecture:
  Speed layer:  Stream processing for near-real-time views
  Batch layer:  Recompute complete views from raw data periodically
  Serving layer: Merge speed + batch views at query time
  
  Pros:
    ✅ Batch layer provides "source of truth" correctness
    ✅ Speed layer tolerates approximate results
    ✅ Well-understood, battle-tested
  
  Cons:
    ❌ Two codebases to maintain (batch + stream, often different languages)
    ❌ Divergence between batch and stream logic (bugs)
    ❌ Complex serving layer merge logic

Kappa Architecture:
  Single stream processing layer for everything
  "Replay" the event stream from the beginning for historical recomputes
  Same code handles real-time and historical
  
  Pros:
    ✅ One codebase, one paradigm
    ✅ Simpler architecture
    ✅ Replay enables "what-if" reprocessing
  
  Cons:
    ❌ Replaying large history is slow and expensive
    ❌ Streaming SQL is less expressive than batch SQL for complex analytics
    ❌ Requires long event retention (Kafka retention/storage costs; log compaction keeps only the latest record per key, so it cannot replace full retention)

Modern recommendation:
  Use Kappa with Delta Lake / Apache Iceberg:
    - Stream to Delta Lake (micro-batch or Flink → Delta)
    - Delta Lake acts as both the streaming sink and batch source
    - Time travel for replay
    - One storage layer, no serving merge needed

CQRS with Event Streams

// CQRS: Command Query Responsibility Segregation
// Commands mutate state (write side), emitting events to Kafka
// Queries read materialized views updated by event consumers

// Write side: command handler
@CommandHandler
public void handle(PlaceOrderCommand cmd) {
    // Validate
    if (!inventoryService.hasStock(cmd.productId, cmd.quantity)) {
        throw new InsufficientStockException();
    }
    // Emit event (don't mutate query model directly)
    // Java has no named arguments; pass values positionally in constructor order
    eventBus.publish(new OrderPlacedEvent(
        UUID.randomUUID().toString(),  // orderId
        cmd.customerId,
        cmd.productId,
        cmd.quantity,
        cmd.price,
        Instant.now()                  // timestamp
    ));
}

// Read side: projection (Kafka consumer rebuilding query model)
@KafkaListener(topics = "order-events")
public void project(OrderPlacedEvent event) {
    // Update denormalized read model optimized for queries
    orderRepository.upsert(OrderSummary.builder()
        .orderId(event.orderId)
        .customerId(event.customerId)
        .total(event.price * event.quantity)
        .status("placed")
        .createdAt(event.timestamp)
        .build());
    
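    // Update customer aggregate stats
    // Caution: these increments are not idempotent; a redelivered or replayed event is counted
    // twice unless the consumer deduplicates by orderId or recomputes stats from the order table
    customerStatsRepository.incrementOrderCount(event.customerId);
    customerStatsRepository.addRevenue(event.customerId, event.price * event.quantity);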
}

// Query side: reads optimized projections (not the event log)
public OrderSummaryDTO getOrder(String orderId) {
    return orderRepository.findById(orderId);  // fast read from projection
}

// Replay: rebuild projection from scratch (Kafka consumer from offset 0)
// This is the superpower of event sourcing:
// Change the projection logic → replay events → correct view
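
Rebuilding a projection by replay can be sketched as a pure function over the event log. This is an illustrative Python sketch, not the Java code above: the dictionary event shape, the `project` function, and prices in integer cents are assumptions. Skipping already-seen order IDs keeps the aggregate stats correct under duplicate delivery.

```python
def project(events):
    """Rebuild the read model from the full event log (replay from offset 0)."""
    orders, customer_stats = {}, {}
    for e in events:
        if e["order_id"] in orders:
            continue  # duplicate delivery: skip so stats are not double counted
        total = e["price_cents"] * e["quantity"]
        orders[e["order_id"]] = {
            "customer_id": e["customer_id"],
            "total_cents": total,
            "status": "placed",
        }
        stats = customer_stats.setdefault(
            e["customer_id"], {"orders": 0, "revenue_cents": 0}
        )
        stats["orders"] += 1
        stats["revenue_cents"] += total
    return orders, customer_stats


event_log = [
    {"order_id": "o1", "customer_id": "c1", "price_cents": 500, "quantity": 2},
    {"order_id": "o2", "customer_id": "c1", "price_cents": 250, "quantity": 1},
    {"order_id": "o1", "customer_id": "c1", "price_cents": 500, "quantity": 2},  # redelivered
    {"order_id": "o3", "customer_id": "c2", "price_cents": 1000, "quantity": 3},
]

orders, customer_stats = project(event_log)
print(customer_stats)
```

Because the projection is derived only from the log, changing the logic in `project` and running it again over the same events yields a corrected view without touching the write side.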

Anti-Patterns

❌ Using processing time for business logic
   "Orders per day" computed on processing time = wrong when events arrive late
   ✅ Always use event time for business metrics, set appropriate watermarks

❌ Unbounded state accumulation
   Keyed state growing forever → OOM in Flink/Streams
   ✅ Set state TTL, use timers to clear idle state

❌ Watermarks that are too tight (drop too much late data)
   Watermark delay = 0 → any out-of-order event whose window has already fired is dropped, even if it is only 1ms late
   ✅ Set watermark delay based on measured 95th-99th percentile event lateness

❌ Watermarks that are too loose (delay output unnecessarily)
   Watermark delay = 1 hour → output delayed by 1 hour
   ✅ Measure actual lateness distribution, set watermark at 99th percentile

❌ Not checkpointing stateful jobs
   Operator fails → state lost → wrong aggregate results
   ✅ Checkpoint to durable storage (S3/HDFS) every 30-60 seconds

❌ Exactly-once everywhere for throughput-sensitive workloads
   At-least-once + idempotent consumer is often sufficient and avoids the transaction overhead (roughly 2-5x; measure for your workload)
   ✅ Use EOS only for genuinely non-idempotent operations

❌ Lambda architecture maintained as two separate codebases
   Drift between batch and streaming logic creates subtle bugs
   ✅ Kappa with Delta Lake, or unified batch/streaming framework
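
Choosing a watermark delay from measured lateness, as the two watermark anti-patterns above recommend, might look like the following. The sample values are invented for illustration, and `percentile` is a hypothetical nearest-rank helper that takes an integer percentage.

```python
def percentile(values, pct):
    """Nearest-rank percentile: smallest sample with at least pct% at or below it."""
    ordered = sorted(values)
    rank = max(1, (pct * len(ordered) + 99) // 100)  # integer ceil(pct * n / 100)
    return ordered[rank - 1]


# Hypothetical lateness samples in seconds (arrival time minus event time).
lateness_s = [1, 1, 2, 2, 2, 3, 3, 4, 5, 5, 6, 7, 8, 9, 12, 15, 20, 28, 45, 300]

delay_p95 = percentile(lateness_s, 95)
delay_p99 = percentile(lateness_s, 99)
dropped_at_p95 = sum(1 for x in lateness_s if x > delay_p95)

print(delay_p95, delay_p99, dropped_at_p95)
```

In this invented sample a single 300-second outlier sets the 99th percentile, so a p99 watermark would delay every result by five minutes to save one event. A p95 delay with a side output for the stragglers may be the better tradeoff.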

Quick Reference

Processing Model:
  Latency > 5 minutes OK    → batch (simplest, most efficient)
  Latency 1-60 seconds      → micro-batch (Spark Structured Streaming)
  Latency < 1 second        → real-time streaming (Flink, Kafka Streams)

Time Semantics:
  Business metrics           → always event time
  System monitoring          → processing time (fine for current system state)
  Event time requires        → watermarks to bound late data

Window Selection:
  Non-overlapping buckets    → Tumbling
  Rolling averages           → Sliding
  User session grouping      → Session
  Custom triggers            → Global + custom trigger

State Management:
  Per-key counters           → ValueState
  Per-key lists              → ListState
  Per-key maps               → MapState
  State TTL                  → ALWAYS set to prevent unbounded growth

Delivery Semantics:
  Worth exactly-once:        financial, billing, inventory
  At-least-once + idempotent: analytics, search, notifications

Framework Selection:
  Simple, microservice-style → Kafka Streams
  Low-latency, complex state → Apache Flink
  Spark ecosystem, unified   → Spark Structured Streaming

Architecture:
  Real-time + historical     → Kappa + Delta Lake (unified)
  Legacy/complex analytics   → Lambda (two codebases, avoid if possible)
