
kafka-expert

Deep expertise in Apache Kafka architecture, partitioning, consumer groups, exactly-once semantics, Schema Registry, Kafka Streams, retention, and production operations. Trigger phrases: when working with Kafka, Kafka partitioning strategy, consumer group rebalancing,

MoltbotDen
Data & Analytics

Kafka Expert

Kafka is a distributed, replicated, ordered log — not a message queue in the traditional sense. Understanding that it stores every event durably (not just until consumed) and that consumers move their own pointers through the log unlocks why it scales so differently from RabbitMQ or SQS. Most Kafka production problems trace to: under-partitioned topics, auto-commit hiding data loss bugs, and not understanding consumer group rebalancing.

Core Mental Model

A Kafka topic is a partitioned, replicated log. Producers append to the end; consumers read at their own pace using offsets. A consumer group is a set of consumers that collectively process all partitions of a topic — each partition is assigned to exactly one consumer in the group at any time. Throughput scales by adding partitions (more parallelism) and consumers (more processing power). Retention is time/size based, not consumption based — messages stay until the retention window expires, regardless of whether they've been consumed.
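To make the offset model concrete, here is a minimal in-memory sketch of a single partition. It is not Kafka client code: `ToyPartitionLog` and its methods are hypothetical names invented for illustration, and replication, retention, and real commit semantics are left out. It only shows that reading does not remove records and that each consumer group advances its own pointer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy single-partition log: records stay after being read; each group tracks its own offset. */
final class ToyPartitionLog {
    private final List<String> records = new ArrayList<>();
    private final Map<String, Integer> groupOffsets = new HashMap<>();

    /** Appends to the end of the log and returns the new record's offset. */
    int append(String value) {
        records.add(value);
        return records.size() - 1;
    }

    /** Returns up to maxRecords starting at the group's offset, then advances that offset. */
    List<String> poll(String groupId, int maxRecords) {
        int from = groupOffsets.getOrDefault(groupId, 0);
        int to = Math.min(records.size(), from + maxRecords);
        groupOffsets.put(groupId, to);
        return new ArrayList<>(records.subList(from, to));
    }

    /** Number of records retained; unchanged by any amount of reading. */
    int size() {
        return records.size();
    }
}
```

After appending three records, a "billing" group that polls two and an "analytics" group that polls ten each see the log from their own position, and `size()` still reports three.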


Topic Partitioning Strategy

Choosing partition count:
  Start with: max(desired_consumer_parallelism, desired_producer_throughput / per_partition_throughput)
  Per partition: ~10 MB/s write throughput (rule of thumb)
  
  Example:
    100 MB/s target throughput → 100 / 10 = 10 partitions minimum
    Need 20 consumers → 20 partitions minimum
    → Choose 20 partitions

  Kafka adds overhead per partition:
    Each partition replica = open file handle + memory for replication
    > 4000 partitions per broker → leadership election slowness
    > 200,000 partitions per cluster → controller/metadata problems on ZooKeeper-based clusters (KRaft raises this ceiling)
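The sizing rule above can be written as a small helper. This is a sketch under the stated rule of thumb; `PartitionSizing` and its parameter names are hypothetical, and the per-partition throughput should come from your own benchmarks rather than the 10 MB/s placeholder.

```java
/** Sketch of the sizing rule: max(consumer parallelism, target throughput / per-partition throughput). */
final class PartitionSizing {
    static int partitionCount(double targetMBps, double perPartitionMBps, int consumers) {
        if (perPartitionMBps <= 0 || targetMBps < 0 || consumers < 0) {
            throw new IllegalArgumentException("throughput and consumer values must be non-negative, per-partition > 0");
        }
        // Round up: 25.1 partitions' worth of throughput needs 26 partitions.
        int forThroughput = (int) Math.ceil(targetMBps / perPartitionMBps);
        // A topic always has at least one partition.
        return Math.max(1, Math.max(forThroughput, consumers));
    }
}
```

With the numbers from the example, `PartitionSizing.partitionCount(100, 10, 20)` returns 20, because the consumer requirement dominates the throughput requirement.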

Partition key selection:
  Goal: even distribution + related events to same partition (ordering)
  
  ✅ Good keys:
    user_id         → events per user stay ordered, even distribution if UUIDs
    tenant_id + entity_id → multi-tenant isolation
    
  ❌ Bad keys:
    country (low cardinality → hot partitions)
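    timestamp (no grouping of related events; coarse values such as the current minute send all live traffic to one partition)
    null key  → spread across partitions (round-robin, or sticky batching in newer clients); no per-key ordering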
// Producer with explicit partition key
ProducerRecord<String, String> record = new ProducerRecord<>(
    "orders",           // topic
    userId.toString(),  // key (determines partition)
    orderJson           // value
);
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        // On failure the metadata has no usable partition/offset, so log the cause instead
        log.error("Failed to send to topic={} key={}", record.topic(), record.key(), exception);
    }
});
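The effect of key cardinality on partition usage can be seen with a small simulation. This is an illustrative sketch only: `KeySkewDemo` is a hypothetical class, and it uses `String.hashCode()` as a stand-in for the client's real key hash (the Java client's default partitioner uses murmur2), so exact partition numbers will differ from a real cluster.

```java
import java.util.List;

final class KeySkewDemo {
    // Stand-in for the producer's key hash; an assumption for illustration, not Kafka's algorithm.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    /** Counts how many of numPartitions receive at least one record for the given keys. */
    static int partitionsUsed(List<String> keys, int numPartitions) {
        boolean[] used = new boolean[numPartitions];
        for (String key : keys) {
            used[partitionFor(key, numPartitions)] = true;
        }
        int count = 0;
        for (boolean u : used) {
            if (u) {
                count++;
            }
        }
        return count;
    }
}
```

Three country codes can never occupy more than three of twelve partitions, no matter how much traffic they carry, while a thousand distinct user IDs spread across all of them.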

Producer Configuration

Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");
props.put("key.serializer",   "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");

// Reliability settings
props.put("acks", "all");                  // wait for all ISR replicas
props.put("enable.idempotence", "true");   // broker deduplicates producer retries (no duplicates per partition; not end-to-end exactly-once)
props.put("max.in.flight.requests.per.connection", "5");  // must be ≤ 5 with idempotence; ordering is still preserved

// Performance settings
props.put("compression.type", "lz4");     // lz4 or zstd for throughput
props.put("linger.ms", "5");              // batch for 5ms before sending
props.put("batch.size", "65536");         // 64KB batch size

// Retry settings (idempotence makes retries safe)
props.put("retries", Integer.MAX_VALUE);
props.put("delivery.timeout.ms", "120000"); // 2 minutes total
props.put("request.timeout.ms",  "30000");

// Schema Registry (Confluent)
props.put("schema.registry.url", "http://schema-registry:8081");

Consumer Group Mechanics

// Consumer with manual offset commit (production-safe)
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092");
props.put("group.id",          "order-processors");
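// key.deserializer and value.deserializer must also be set (omitted here; the Order deserializer is application-specific)
props.put("auto.offset.reset", "earliest");     // applies only when the group has no committed offset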
props.put("enable.auto.commit", "false");       // NEVER true in production
props.put("max.poll.records",   "500");
props.put("max.poll.interval.ms", "300000");    // 5 min — time between poll() calls

KafkaConsumer<String, Order> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("orders"));

try {
    while (true) {
        ConsumerRecords<String, Order> records = consumer.poll(Duration.ofMillis(100));

        for (ConsumerRecord<String, Order> record : records) {
            try {
                processOrder(record.value());
            } catch (RetryableException e) {
                // Send to retry topic
                producer.send(new ProducerRecord<>("orders.retry", record.key(), record.value()));
            } catch (NonRetryableException e) {
                // Send to DLQ
                producer.send(new ProducerRecord<>("orders.dlq", record.key(), record.value()));
            }
        }

        // Wait for in-flight retry/DLQ sends to complete, then commit the batch's offsets
        producer.flush();
        consumer.commitSync();
    }
} finally {
    consumer.close();
}

// Static membership: prevents rebalancing when consumer restarts quickly (set these before constructing the consumer)
props.put("group.instance.id", "worker-" + hostname);  // unique stable ID
props.put("session.timeout.ms", "60000");  // longer timeout for static members

Cooperative Incremental Rebalancing

// Default rebalancing (eager): ALL partitions revoked from ALL consumers, then reassigned
// Problem: processing stops entirely during rebalance (seconds to minutes in large groups)

// Solution: Cooperative Incremental Rebalancing (Kafka 2.4+)
props.put("partition.assignment.strategy",
    "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
// Only partitions that NEED to move are revoked — others keep processing

Exactly-Once Semantics (EOS)

Three levels of delivery:
  At-most-once:  Producer sends, doesn't retry. Consumer commits before processing.
                 → Possible message loss, never duplicates
  At-least-once: Producer retries. Consumer commits after processing.
                 → No message loss, possible duplicates (most common setup)
  Exactly-once:  Idempotent producer + transactional API + read_committed consumer
                 → No loss, no duplicates within Kafka. Adds latency and throughput overhead that depends on transaction size and commit frequency.

When exactly-once is worth it:
  ✅ Financial transactions, billing, inventory deduction
  ❌ Analytics, logging (at-least-once + idempotent consumer is fine)
// Exactly-once producer (Kafka Streams or transactional API)
props.put("enable.idempotence",  "true");
props.put("transactional.id",    "payment-processor-" + instanceId);  // unique per instance

producer.initTransactions();

try {
    producer.beginTransaction();
    
    producer.send(new ProducerRecord<>("ledger", payment.userId, payment));
    producer.send(new ProducerRecord<>("notifications", payment.userId, notif));
    
    // Include consumer offsets in transaction (consume-transform-produce).
    // Use the live consumer's group metadata so stale ("zombie") instances can be fenced.
    producer.sendOffsetsToTransaction(
        getOffsetMap(records),
        consumer.groupMetadata()
    );
    
    producer.commitTransaction();
} catch (ProducerFencedException e) {
    // Another instance took over this transactional.id
    producer.close();
} catch (KafkaException e) {
    producer.abortTransaction();
    throw e;
}

// Consumer reading transactionally (read_committed: skips aborted transactions)
props.put("isolation.level", "read_committed");
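The difference between at-most-once and at-least-once comes down to whether the offset commit happens before or after processing. The simulation below is a sketch with no Kafka dependency; `DeliverySemanticsDemo` is a hypothetical class that models one consumer crashing once, in the gap between the two steps, and then restarting from its committed offset.

```java
final class DeliverySemanticsDemo {
    /**
     * Processes offsets 0..total-1 one at a time, crashing once at offset crashAt
     * between the commit step and the processing step (in whichever order they run).
     * Returns how many times each record was fully processed.
     */
    static int[] run(int total, int crashAt, boolean commitBeforeProcessing) {
        int[] processed = new int[total];
        int committed = 0;
        boolean crashed = false;
        while (committed < total) {
            int offset = committed;              // a restart resumes from the committed offset
            boolean crashNow = !crashed && offset == crashAt;
            if (commitBeforeProcessing) {
                committed = offset + 1;          // at-most-once: commit first
                if (crashNow) { crashed = true; continue; }   // record is never processed
                processed[offset]++;
            } else {
                processed[offset]++;             // at-least-once: process first
                if (crashNow) { crashed = true; continue; }   // record is read again
                committed = offset + 1;
            }
        }
        return processed;
    }
}
```

With five records and a crash at offset 2, committing first yields counts `[1, 1, 0, 1, 1]` (one record lost), while committing after processing yields `[1, 1, 2, 1, 1]` (one record duplicated). The duplicate is why at-least-once setups pair well with idempotent consumers.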

Dead Letter Queue Pattern

// DLQ routing in consumer
public void processWithDLQ(ConsumerRecord<String, Event> record) {
    int attempt = getAttemptCount(record);

    try {
        processEvent(record.value());
    } catch (RetryableException e) {
        if (attempt < 3) {
            // Send to retry topic with delay metadata
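            // Encode header values as UTF-8 explicitly; getMessage() may be null, hence String.valueOf
            Headers headers = new RecordHeaders()
                .add("retry.attempt",  String.valueOf(attempt + 1).getBytes(StandardCharsets.UTF_8))
                .add("retry.reason",   String.valueOf(e.getMessage()).getBytes(StandardCharsets.UTF_8))
                .add("original.topic", record.topic().getBytes(StandardCharsets.UTF_8));

            producer.send(new ProducerRecord<>(
                "events.retry." + (attempt + 1),  // events.retry.1 .. events.retry.3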
                null, record.key(), record.value(), headers
            ));
        } else {
            sendToDLQ(record, e);
        }
    } catch (NonRetryableException e) {
        sendToDLQ(record, e);
    }
}

private void sendToDLQ(ConsumerRecord<String, Event> record, Exception e) {
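    // Encode header values as UTF-8 explicitly; getMessage() may be null, hence String.valueOf
    Headers headers = new RecordHeaders()
        .add("dlq.reason",         String.valueOf(e.getMessage()).getBytes(StandardCharsets.UTF_8))
        .add("dlq.timestamp",      Instant.now().toString().getBytes(StandardCharsets.UTF_8))
        .add("original.topic",     record.topic().getBytes(StandardCharsets.UTF_8))
        .add("original.partition", String.valueOf(record.partition()).getBytes(StandardCharsets.UTF_8))
        .add("original.offset",    String.valueOf(record.offset()).getBytes(StandardCharsets.UTF_8));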

    producer.send(new ProducerRecord<>(
        "events.dlq", null, record.key(), record.value(), headers
    ));
}

Schema Registry and Avro

Schema evolution compatibility types:
  BACKWARD:  New schema can read old data. Add optional fields with defaults.
             Consumers upgrade first, then producers.
  FORWARD:   Old schema can read new data. Add fields; remove only fields that have defaults.
             Producers upgrade first, then consumers.
  FULL:      Both backward and forward compatible.
             Most restrictive — safest for production.
  NONE:      No compatibility check.

Rules for BACKWARD compatible changes:
  ✅ Add optional field with default
  ✅ Remove a field (new readers simply ignore it in old data)
  ❌ Add a field without a default (old data lacks it, so new readers can't resolve it)
  ❌ Change field type (except Avro's built-in promotions, e.g. int → long)
  ❌ Rename field (use aliases instead)
// Avro schema example
{
  "type": "record",
  "name": "OrderEvent",
  "namespace": "com.moltbotden.events",
  "doc": "Event fired when an order is placed or updated",
  "fields": [
    { "name": "order_id",    "type": "string" },
    { "name": "customer_id", "type": "string" },
    { "name": "status",      "type": { "type": "enum", "name": "OrderStatus",
                                       "symbols": ["NEW","PAID","SHIPPED","CANCELLED"] } },
    { "name": "total",       "type": "double" },
    { "name": "created_at",  "type": { "type": "long", "logicalType": "timestamp-millis" } },
    { "name": "metadata",    "type": ["null", { "type": "map", "values": "string" }],
                             "default": null,
                             "doc": "Added in v2 — backward compatible" }
  ]
}
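The reason the `metadata` field above needs a default can be expressed as a simple check: a reader using the new schema must be able to fill every field it expects when the old data does not contain it. The sketch below is a deliberately reduced model, not the Schema Registry's checker. `BackwardCompat` is a hypothetical class, a schema is represented only as a map from field name to "has a default", and type changes and aliases are ignored.

```java
import java.util.Map;

final class BackwardCompat {
    /**
     * Returns true if a reader using newFields can read data written with oldFields.
     * Each map goes from field name to whether that field declares a default.
     */
    static boolean isBackwardCompatible(Map<String, Boolean> oldFields, Map<String, Boolean> newFields) {
        for (Map.Entry<String, Boolean> field : newFields.entrySet()) {
            boolean missingInOldData = !oldFields.containsKey(field.getKey());
            boolean hasDefault = field.getValue();
            // Only fields the new (reader) schema expects can break reading old data.
            if (missingInOldData && !hasDefault) {
                return false;
            }
        }
        return true;
    }
}
```

Adding `metadata` with a default passes this check; adding the same field without a default fails it, because old records give the reader nothing to put there.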

Kafka Streams

// Stateful stream processing with Kafka Streams
StreamsBuilder builder = new StreamsBuilder();

// Source stream
KStream<String, OrderEvent> orders = builder.stream(
    "orders",
    Consumed.with(Serdes.String(), orderEventSerde)
);

// Stateful aggregation: order count + revenue per customer per hour
KTable<Windowed<String>, CustomerStats> customerStats = orders
    .filter((key, order) -> order.getStatus() == OrderStatus.PAID)
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
    .aggregate(
        CustomerStats::new,
        (key, order, stats) -> stats.add(order),
        Materialized.<String, CustomerStats, WindowStore<Bytes, byte[]>>as("customer-hourly-stats")
            .withValueSerde(customerStatsSerde)
    );

// Stream-Table join: enrich orders with customer profile
KTable<String, Customer> customers = builder.table(
    "customers",
    Consumed.with(Serdes.String(), customerSerde)
);

KStream<String, EnrichedOrder> enrichedOrders = orders
    .join(customers,
          (order, customer) -> new EnrichedOrder(order, customer),
          Joined.with(Serdes.String(), orderEventSerde, customerSerde));

enrichedOrders.to("enriched-orders");

// Build and start
KafkaStreams streams = new KafkaStreams(builder.build(), config);
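// Kafka 2.8+: return a shutdown response instead of calling close() from inside a stream thread
streams.setUncaughtExceptionHandler(e -> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT);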
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

Compacted Topics

Compacted topic: Kafka retains the LATEST value per key indefinitely.
Old values are deleted during compaction (log cleaner runs in background).

Use cases:
  ✅ Changelog / CDC (latest state per entity)
  ✅ Configuration store (consumer always gets current config on startup)
  ✅ Reference data (product catalog, user profile)
  ✅ Materialized view source

Setting up a compacted topic:
# min.compaction.lag.ms: 1h minimum before a record is eligible for compaction
# delete.retention.ms:   tombstones kept 24h
# segment.bytes:         10MB segments (smaller = more frequent compaction)
kafka-topics.sh --create \
  --bootstrap-server kafka1:9092 \
  --topic user-profiles \
  --partitions 12 \
  --replication-factor 3 \
  --config cleanup.policy=compact \
  --config min.compaction.lag.ms=3600000 \
  --config delete.retention.ms=86400000 \
  --config segment.bytes=10485760

// Tombstone (Java producer): publish a null value to delete a key
producer.send(new ProducerRecord<>("user-profiles", userId, null));
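The end state that compaction converges to can be modeled in a few lines. This is a sketch of the outcome only, not of the log cleaner: `CompactionDemo` is a hypothetical class, and it drops tombstones immediately, whereas a real broker keeps them for `delete.retention.ms` and never compacts the active segment.

```java
import java.util.AbstractMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class CompactionDemo {
    /** Convenience factory; unlike Map.entry, SimpleEntry allows a null value (a tombstone). */
    static Map.Entry<String, String> record(String key, String value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    /** Replays the log in order and keeps only the latest value per key. */
    static Map<String, String> compact(List<Map.Entry<String, String>> log) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (Map.Entry<String, String> r : log) {
            if (r.getValue() == null) {
                latest.remove(r.getKey());            // tombstone deletes the key
            } else {
                latest.put(r.getKey(), r.getValue()); // newer value replaces older
            }
        }
        return latest;
    }
}
```

Replaying `u1=a, u2=b, u1=c, u2=null` leaves a single entry, `u1=c`: the older value for `u1` is superseded and `u2` is removed by its tombstone.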

Consumer Lag Monitoring

# Check consumer group lag
kafka-consumer-groups.sh \
  --bootstrap-server kafka1:9092 \
  --describe \
  --group order-processors

# Output:
# TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID
# orders  0          1234567         1234890          323  worker-1
# orders  1          987654          987700           46   worker-2

# Lag alert: if any partition lag > threshold for > 5 minutes → page on-call
# Typical thresholds:
#   Warning:  lag > 10,000 messages
#   Critical: lag > 100,000 messages, or lag that keeps growing (ingest rate exceeds consumption rate)
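The lag column is simply the log-end offset minus the group's current offset, and the thresholds above map onto a small classifier. This is a sketch: `LagCheck` and its `Severity` enum are hypothetical names, and the threshold values are passed in because suitable numbers depend on message rate and processing time.

```java
final class LagCheck {
    enum Severity { OK, WARNING, CRITICAL }

    /** Lag for one partition: records written but not yet consumed by the group. */
    static long lag(long currentOffset, long logEndOffset) {
        return Math.max(0, logEndOffset - currentOffset);
    }

    /** Classifies a lag value against warning and critical thresholds (both exclusive). */
    static Severity classify(long lag, long warnThreshold, long criticalThreshold) {
        if (lag > criticalThreshold) {
            return Severity.CRITICAL;
        }
        if (lag > warnThreshold) {
            return Severity.WARNING;
        }
        return Severity.OK;
    }
}
```

For partition 0 in the sample output, `LagCheck.lag(1234567, 1234890)` gives 323, which classifies as `OK` against thresholds of 10,000 and 100,000. A growth-rate check would need lag samples over time and is not modeled here.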

Anti-Patterns

❌ enable.auto.commit = true in consumer
   Auto-commit happens on a timer, not after processing
   If consumer crashes between commit and processing: data loss
   ✅ Always disable auto-commit; commit after successful processing

❌ One partition for entire topic
   Limits parallelism to 1 consumer in the group
   ✅ Partition count ≥ max expected consumer parallelism

❌ Consumer doing heavy synchronous I/O inside poll loop
   max.poll.interval.ms exceeded → consumer kicked from group → rebalance
   ✅ Offload heavy work to thread pool; use pause()/resume() or async processing

❌ Storing large messages in Kafka (> 1 MB)
   Increases broker memory pressure, slows replication
   ✅ Store payload in S3/GCS, put URL/reference in Kafka message (claim-check pattern)

❌ Using Kafka as a database / for request-response
   No random access by key, no low-latency point lookup
   ✅ Use compacted topics for changelog only; query state from Kafka Streams state stores

❌ Not monitoring consumer lag
   Silent data loss or growing backlog goes undetected
   ✅ Alert on lag > threshold and lag growth rate

Quick Reference

Partition Count:
  max( desired_consumers, throughput_MB/s / 10 )

Producer Reliability:
  acks=all + enable.idempotence=true + retries=MAX_INT

Consumer Safety:
  enable.auto.commit=false + commitSync() after batch

Rebalancing:
  Eager (default):       all partitions stop during rebalance
  Cooperative Sticky:    only moving partitions pause

EOS Cost/Benefit:
  Worth it:              financial, inventory, billing
  Skip (use idempotent consumers): analytics, logs, notifications

Compacted Topic:
  Use for:               latest state per key (changelog, CDC, config)
  Tombstone:             publish null value to delete a key

Lag Thresholds:
  Warning:               lag > 10K or growing
  Critical:              lag > 100K or ingest rate > consumption rate

Skill Information

Source
MoltbotDen
Category
Data & Analytics