Operating Kafka

Running Kafka in production at scale means going well beyond "start the broker and publish messages." Four decisions govern cluster health day-to-day: how partitions are sized, how long data is retained, whether compaction is on, and whether consumer lag and ISR health are tracked close to real time. Misconfigure any one of these and you will face disk exhaustion, undetectable consumer drift, or silent data loss — often all three simultaneously on the worst day of the year.

Partition Sizing

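Partitions are the unit of parallelism, replication, and file-handle consumption. Getting the count wrong at topic creation is expensive to fix later: the count can be raised but never lowered, and raising it changes which partition each key hashes to, so keyed topics usually need a full data migration to a new topic.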

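Throughput formula. Target partition count = max(Tin / p, Tout / c), where Tin is peak ingestion MB/s, Tout is peak consumption MB/s, p is per-partition write throughput (~10 MB/s on commodity NVMe), and c is per-partition consumer throughput. A topic receiving 500 MB/s of raw events, read back at the same rate by consumers that each sustain 50 MB/s per partition, needs at least 500 / 10 = 50 partitions from the producer side and 500 / 50 = 10 from the consumer side. The larger figure governs, so start at around 60 to leave a growth margin. Note that 5 consumer threads at 50 MB/s can drain only 250 MB/s; keeping up with 500 MB/s takes at least 10.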
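
The formula is easy to misapply by hand, so here is a minimal sketch of it in shell. The function names `ceil_div` and `partition_count` are hypothetical, all rates are whole MB/s, and the result is a lower bound before any growth margin is added.

```shell
#!/usr/bin/env bash
# Sketch: target partitions = max(ceil(Tin / p), ceil(Tout / c)).
# Function names are illustrative; inputs are whole MB/s.

ceil_div() {
  # Integer ceiling of $1 / $2.
  echo $(( ($1 + $2 - 1) / $2 ))
}

partition_count() {
  local t_in=$1 p=$2 t_out=$3 c=$4
  local producer_side consumer_side
  producer_side=$(ceil_div "$t_in" "$p")
  consumer_side=$(ceil_div "$t_out" "$c")
  # The larger of the two requirements governs.
  if (( producer_side > consumer_side )); then
    echo "$producer_side"
  else
    echo "$consumer_side"
  fi
}

# 500 MB/s in and out, 10 MB/s per-partition writes, 50 MB/s per-partition reads.
partition_count 500 10 500 50   # prints 50
```

With these inputs the producer side (50) dominates the consumer side (10), which is why the write-throughput assumption p deserves a real benchmark on your own hardware rather than a rule of thumb.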

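File-handle cost. Each partition replica keeps file handles open for its log segments and their index files, so budget at least two per replica (segment + index) and more for partitions that hold many segments. A broker hosting 50 000 partition replicas therefore needs at least 100 000 file descriptors; set LimitNOFILE=200000 in the systemd unit (the service-level equivalent of ulimit -n 200000) and raise fs.file-max accordingly. A commonly quoted practical ceiling is roughly 4 000–6 000 leader partitions per broker before leader election latency and controller overhead degrade.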
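
The descriptor budget is simple arithmetic, sketched below. `fd_budget` is a hypothetical helper, and both the two-handles-per-replica figure and the 100% headroom are assumptions taken from the numbers in this section, not measured values.

```shell
#!/usr/bin/env bash
# Sketch: file descriptor budget = replicas * handles per replica, plus headroom.
# fd_budget is an illustrative name; the inputs below mirror the text's example.

fd_budget() {
  local replicas=$1 handles_per_replica=$2 headroom_pct=$3
  echo $(( replicas * handles_per_replica * (100 + headroom_pct) / 100 ))
}

# 50 000 replicas, 2 handles each, 100% headroom.
fd_budget 50000 2 100   # prints 200000
```

On Linux, the limit a running broker actually received can be read from /proc/<pid>/limits, which is a useful check after changing the service unit.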

Partition reassignment. When you must move partition replicas between brokers (for example, after adding brokers to spread load), use kafka-reassign-partitions.sh with a replication throttle so the data movement does not saturate the inter-broker network. A 50 MB/s replication throttle is conservative but safe for a cluster that is still serving traffic.

```shell
# Create the topic with a deliberate partition count
kafka-topics.sh --bootstrap-server kafka:9092 \
  --create --topic payments.raw \
  --partitions 24 \
  --replication-factor 3 \
  --config min.insync.replicas=2 \
  --config retention.ms=604800000

# Describe to confirm ISR state immediately after creation
kafka-topics.sh --bootstrap-server kafka:9092 \
  --describe --topic payments.raw

# Execute a reassignment plan (throttle to 50 MiB/s = 52428800 bytes/s)
kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
  --reassignment-json-file expand-plan.json \
  --execute \
  --throttle 52428800

# Verify progress (this also removes the throttle once the reassignment completes)
kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
  --reassignment-json-file expand-plan.json \
  --verify
```
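
A reassignment plan such as expand-plan.json is usually produced by the tool's --generate mode, which reads a small "topics to move" JSON file. The sketch below builds that input; `topics_to_move_json` is a hypothetical helper, and the broker IDs in the commented command are placeholders.

```shell
#!/usr/bin/env bash
# Sketch: emit the topics-to-move JSON read by
# `kafka-reassign-partitions.sh --generate`. Helper name is illustrative.

topics_to_move_json() {
  local first=1 topic
  printf '{"version":1,"topics":['
  for topic in "$@"; do
    # Comma-separate every entry after the first.
    (( first )) || printf ','
    printf '{"topic":"%s"}' "$topic"
    first=0
  done
  printf ']}\n'
}

topics_to_move_json payments.raw
# prints {"version":1,"topics":[{"topic":"payments.raw"}]}

# Typical use (broker IDs 1,2,3,4 are placeholders):
#   topics_to_move_json payments.raw > topics.json
#   kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
#     --topics-to-move-json-file topics.json \
#     --broker-list "1,2,3,4" --generate
```

The --generate output contains both the current assignment and a proposed one. Saving the current assignment before executing gives you a rollback file.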

Retention Policies

Retention is a storage budget, not a backup strategy. Two dimensions control it: time-based (retention.ms) and size-based (retention.bytes). retention.bytes is a per-partition limit, so the effective per-topic ceiling is retention.bytes × partition_count for each replica. When both are set, Kafka deletes segments as soon as either limit is exceeded.

Segment rolling. Retention is applied at segment granularity, not individual message granularity. A segment is only deleted when it is older than retention.ms AND a newer segment exists. The active (open) segment is never deleted. This means a low-volume topic with a very large segment.ms (default 7 days) can silently retain data far longer than retention.ms. For event-sourcing or audit topics, set segment.ms close to retention.ms / 2 to bound overage.
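
To see how far a large segment.ms can stretch retention, the sketch below estimates the worst-case age of a record at deletion time as retention.ms + segment.ms. This is an approximation: it ignores the retention check interval and the fact that a segment only rolls when a new record arrives. The function names are hypothetical.

```shell
#!/usr/bin/env bash
# Sketch: approximate worst-case record age before deletion.
# The oldest record in a segment may be up to segment.ms older than the
# newest one, and the segment only becomes eligible once its newest record
# is older than retention.ms.

worst_case_retention_ms() {
  local retention_ms=$1 segment_ms=$2
  echo $(( retention_ms + segment_ms ))
}

ms_to_days() {
  echo $(( $1 / 86400000 ))
}

# retention.ms = 1 day, segment.ms left at the 7-day default.
worst=$(worst_case_retention_ms 86400000 604800000)
echo "$worst ms, about $(ms_to_days "$worst") days"
# prints 691200000 ms, about 8 days
```

Setting segment.ms to retention.ms / 2 caps the same estimate at 1.5 × retention.ms.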

Tiered storage. Confluent and AWS MSK both offer tiered storage where segments older than a threshold are offloaded to object storage (S3/GCS), keeping only recent hot data on local NVMe. This decouples storage cost from compute scaling and is the recommended pattern for topics with retention measured in weeks or months. In Apache Kafka's own implementation (KIP-405), configure it per topic via remote.storage.enable=true and local.retention.ms (how long to keep segments locally before reads are served from the remote tier); managed platforms may expose their own property names.

```shell
# Per-topic overrides for a high-volume audit log:
# 30 days, 100 GiB per partition, daily segment roll, 512 MiB segments.
# The --add-config value must be a single argument with no whitespace.
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type topics \
  --entity-name audit.events \
  --alter \
  --add-config retention.ms=2592000000,retention.bytes=107374182400,segment.ms=86400000,segment.bytes=536870912

# Verify that the dynamic overrides took effect
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type topics \
  --entity-name audit.events \
  --describe
```

Log Compaction

Compaction retains at least the most recent record for each key and discards older records for that key over time. It is the right choice for changelog topics (database CDC, entity state) where consumers need the latest value, not full history. It is the wrong choice for event streams where every event in a time window must be preserved.

Compaction runs in background threads controlled by log.cleaner.threads (default 1, raise to 4–6 on large clusters). The cleaner targets partitions whose "dirty ratio" (uncompacted bytes / total bytes) exceeds min.cleanable.dirty.ratio (default 0.5). For aggressive compaction on low-latency CDC topics, lower this to 0.1 and also set min.compaction.lag.ms to prevent records newer than a threshold from being compacted (useful when downstream consumers have a known maximum lag SLO).
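
The dirty-ratio test can be sketched with integer arithmetic. `is_cleanable` is a hypothetical helper that takes the threshold as a percentage; it mirrors the rule described above and is not the cleaner's actual code.

```shell
#!/usr/bin/env bash
# Sketch: a partition is a compaction candidate when
# dirty_bytes / total_bytes exceeds min.cleanable.dirty.ratio.
# The threshold is passed as a whole percentage to stay in integer math.

is_cleanable() {
  local dirty_bytes=$1 total_bytes=$2 threshold_pct=$3
  (( total_bytes > 0 )) || return 1
  (( dirty_bytes * 100 > threshold_pct * total_bytes ))
}

# 300 MB dirty out of 1000 MB total (30% dirty).
is_cleanable 300 1000 50 && echo "cleanable at 0.5" || echo "not yet at 0.5"
is_cleanable 300 1000 10 && echo "cleanable at 0.1" || echo "not yet at 0.1"
# prints:
#   not yet at 0.5
#   cleanable at 0.1
```

Lowering the ratio makes the cleaner run more often over the same data, so it trades disk I/O for fresher compacted state.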

Tombstones. A record with a null value is a tombstone — it signals deletion of that key. Tombstones are preserved for at least delete.retention.ms (default 24 hours) so consumers catch up before the key disappears entirely. If a consumer group restarts after a gap longer than delete.retention.ms, it will miss deletions. This is a documented Kafka limitation; mitigate by enforcing consumer SLOs strictly on compacted topics.

Mixed cleanup.policy trap. Setting cleanup.policy=compact,delete (check your platform's defaults, since some managed presets enable it) applies both: compaction runs, and segments are still dropped after retention.ms. This means even the latest value for a key disappears once it ages past the retention window, unless that key is written again in the meantime. For pure changelog semantics, use cleanup.policy=compact only, and accept that disk usage then scales with the number of distinct live keys rather than with time. Tiered storage does not help here, because Apache Kafka's implementation does not support compacted topics.

Monitoring Consumer Lag

Consumer lag is the difference between the log-end offset (LEO) and the committed offset for each partition. Lag growing monotonically is the first sign of a consumer that cannot keep up; lag that spikes and recovers signals bursty processing. Both are actionable, but the response differs: the former demands scaling consumer instances or optimizing processing, the latter demands smoothing the producer side or tuning fetch sizes.

Native tooling. kafka-consumer-groups.sh --describe shows lag per partition and consumer assignment. It is correct but pulls a point-in-time snapshot; it misses lag that spikes and recovers between polls.
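
For quick checks, the per-partition output can be summed into a single number. The sketch below finds the LAG column by its header name rather than by position, since column layout has varied between Kafka versions. The sample rows are made up for illustration, and `total_lag` is a hypothetical helper.

```shell
#!/usr/bin/env bash
# Sketch: sum the LAG column of `kafka-consumer-groups.sh --describe` output.
# Assumes whitespace-separated columns and a header row containing "LAG".

total_lag() {
  awk '
    # Until the header is seen, look for the LAG column and skip the line.
    lag_col == 0 {
      for (i = 1; i <= NF; i++) if ($i == "LAG") lag_col = i
      next
    }
    # Data rows: add numeric lag; "-" (no committed offset) is ignored.
    $lag_col ~ /^[0-9]+$/ { sum += $lag_col }
    END { print sum + 0 }
  '
}

# Illustrative sample only; real output comes from the CLI.
sample='
GROUP        TOPIC        PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG  CONSUMER-ID HOST      CLIENT-ID
payments-svc payments.raw 0         1199000        1200000        1000 c-1         /10.0.0.5 c-1
payments-svc payments.raw 1         979500         980000         500  c-2         /10.0.0.6 c-2
'
printf '%s\n' "$sample" | total_lag   # prints 1500

# Typical use:
#   kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
#     --group payments-svc --describe | total_lag
```

This is still a point-in-time snapshot, so it suits ad hoc checks and deploy gates rather than alerting.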

Prometheus integration. Deploy kafka-exporter (danielqsj/kafka-exporter) for consumer group metrics, alongside the Prometheus JMX exporter on each broker for broker-side metrics. The critical metric is kafka_consumergroup_lag, which kafka-exporter reports per group/topic/partition. Alert on three conditions: total lag exceeding 10× the normal steady-state value, a positive lag rate-of-change sustained for more than 5 minutes, and any partition with lag > 0 and no active consumer assignment (an orphaned partition).
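
The first two alert conditions reduce to simple comparisons, sketched here outside of Prometheus to make the logic explicit. Both function names are hypothetical, and the sampling interval (one lag sample per minute, so six samples span five minutes) is an assumption.

```shell
#!/usr/bin/env bash
# Sketch of two lag alert conditions. Names and sampling cadence are assumptions.

# True when every sample is strictly greater than the one before it.
lag_growing() {
  (( $# >= 2 )) || return 1
  local prev=$1 cur
  shift
  for cur in "$@"; do
    (( cur > prev )) || return 1
    prev=$cur
  done
  return 0
}

# True when total lag exceeds 10x the steady-state baseline.
lag_spiked() {
  local total_lag=$1 baseline=$2
  (( total_lag > 10 * baseline ))
}

# Six one-minute samples covering five minutes.
lag_growing 100 180 260 400 900 1500 && echo "alert: lag growing for 5 min"
lag_spiked 12000 1000 && echo "alert: lag above 10x baseline"
```

In a real deployment these conditions would live in Prometheus alerting rules over kafka_consumergroup_lag; the shell version only illustrates the thresholds.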

[Figure: Kafka Lag and ISR Monitoring Flow. Panels: Kafka Broker Cluster (partition LEOs, ISR state, log segments with retention.ms enforced per segment); Observability Stack (JMX / kafka-exporter scraped on :9308, Prometheus, Alertmanager, Grafana dashboard); Consumer Group payments-svc (Lag = LEO - Committed Offset).]
Kafka lag and ISR monitoring flow: brokers expose metrics via JMX/exporter, Prometheus stores them, Alertmanager fires on lag growth, and Grafana surfaces the full operational picture.

Monitoring ISR (In-Sync Replicas)

The ISR set is the authoritative indicator of replication health. When a replica falls out of ISR — because it is more than replica.lag.time.max.ms (default 30 s) behind the leader — the cluster is operating below its durability guarantee. If your topic has min.insync.replicas=2 and RF=3, losing two replicas from ISR blocks all acks=all producers with NotEnoughReplicasException.
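
The relationship between ISR size and min.insync.replicas can be written as two small checks. This is a sketch with hypothetical function names; it models the rule described above, not broker internals.

```shell
#!/usr/bin/env bash
# Sketch: does an acks=all write succeed, and how much ISR headroom is left?

# True when the current ISR size satisfies min.insync.replicas.
can_produce_acks_all() {
  local isr_size=$1 min_isr=$2
  (( isr_size >= min_isr ))
}

# Number of replicas that can still leave the ISR before acks=all writes fail.
isr_headroom() {
  local isr_size=$1 min_isr=$2
  local headroom=$(( isr_size - min_isr ))
  (( headroom < 0 )) && headroom=0
  echo "$headroom"
}

# RF=3, min.insync.replicas=2, all replicas in sync.
isr_headroom 3 2   # prints 1

# Two replicas have dropped out, leaving ISR size 1.
can_produce_acks_all 1 2 || echo "acks=all producers get NotEnoughReplicasException"
```

A headroom of 1 is what makes a rolling restart safe with RF=3 and min.insync.replicas=2: exactly one broker can be down at a time.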

Key JMX metrics to alert on:

  • kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions — any value above 0 is a P1 alert during normal operations (0 is the only acceptable steady state).
  • kafka.server:type=ReplicaManager,name=IsrShrinksPerSec — a positive rate means replicas are actively falling behind; check network, disk I/O, and GC on the lagging broker immediately.
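  • kafka.controller:type=KafkaController,name=ActiveControllerCount, which must sum to exactly 1 across all brokers; a sum of 0 means the cluster has no active controller (an election is in progress or stuck), and a sum above 1 indicates split-brain and needs immediate investigation.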
  • kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce — baseline this to detect traffic spikes that precede ISR shrinkage.

Pre-flight ISR check on deploy day. Before any broker rolling restart (Kubernetes rolling update, AMI replacement, kernel patch), verify that UnderReplicatedPartitions = 0 and that consumer lag on critical topics is within its normal steady-state range. A broker restart while ISR is already degraded can knock a topic below min.insync.replicas and cause producer outages. Instrument your deployment pipeline to gate on these checks, the same way you gate on pod readiness probes.

```shell
# Check under-replicated partitions across the cluster
kafka-topics.sh --bootstrap-server kafka:9092 \
  --describe \
  --under-replicated-partitions

# Check consumer group lag for a specific group
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --group payments-svc \
  --describe

# Reset consumer offset to latest (break-glass only; skips unprocessed records,
# and the group must have no active members)
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --group payments-svc \
  --topic payments.raw \
  --reset-offsets \
  --to-latest \
  --execute

# Preferred replica election (fix imbalanced partition leadership after rolling restart)
kafka-leader-election.sh --bootstrap-server kafka:9092 \
  --election-type PREFERRED \
  --all-topic-partitions
```
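
A pipeline gate on the under-replicated check could look like the sketch below. It assumes the CLI prints one line containing "Partition:" per under-replicated partition and nothing when the cluster is healthy; the function names and the sample line are illustrative.

```shell
#!/usr/bin/env bash
# Sketch: fail a deploy step when any partition is under-replicated.
# Reads `kafka-topics.sh --describe --under-replicated-partitions` output on stdin.

count_under_replicated() {
  # grep -c exits non-zero on zero matches, so mask that exit status.
  grep -c 'Partition:' || true
}

preflight_gate() {
  local urp
  urp=$(count_under_replicated)
  if (( urp > 0 )); then
    echo "BLOCK: $urp under-replicated partition(s)" >&2
    return 1
  fi
  echo "OK: no under-replicated partitions"
}

# Healthy cluster: the CLI prints nothing.
printf '' | preflight_gate

# Degraded cluster (illustrative line).
degraded=$(printf '\tTopic: payments.raw\tPartition: 1\tLeader: 0\tReplicas: 0,1,2\tIsr: 0')
printf '%s\n' "$degraded" | preflight_gate || echo "restart blocked"

# Typical use:
#   kafka-topics.sh --bootstrap-server kafka:9092 \
#     --describe --under-replicated-partitions | preflight_gate
```

Returning a non-zero status rather than only printing a warning lets the pipeline stop the rollout without extra parsing.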

Operational Runbook Patterns

Senior engineers encode reactions to ISR shrinkage and runaway lag as runbook steps, not intuition:

  1. ISR shrinkage on one broker. Check iostat -xz 1 and GC logs on that broker. If disk I/O wait exceeds 80%, check whether the broker is flushing too eagerly: log.flush.interval.messages and log.flush.interval.ms should both be left unset so that flushing is delegated to the OS page cache. If GC pauses exceed 200 ms, tune the heap or switch collectors (G1GC or ZGC).
  2. Consumer lag growing steadily. Add consumer instances up to the partition count (beyond that they are idle). Check whether processing is CPU-bound or I/O-bound. If I/O-bound (external DB calls), decouple with an async worker pool. Increase max.poll.records only if processing time per record is very low — larger batches increase commit latency and reprocessing on crash.
  3. Disk saturation. Emergency: reduce retention.bytes on the largest topics, or trigger early segment deletion via kafka-delete-records.sh to the current offset minus a safe buffer. Long-term: add brokers and rebalance partitions.
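
For the emergency deletion in step 3, kafka-delete-records.sh reads a JSON file naming the offset before which records are removed. The sketch below computes "current offset minus a safe buffer" and emits that file. `delete_records_json` is a hypothetical helper, and the offsets are illustrative.

```shell
#!/usr/bin/env bash
# Sketch: build the offsets file for `kafka-delete-records.sh --offset-json-file`.
# Records with offsets below the given offset are deleted.

delete_records_json() {
  local topic=$1 partition=$2 log_end_offset=$3 keep=$4
  local offset=$(( log_end_offset - keep ))
  # Never emit a negative offset when the buffer exceeds the log size.
  (( offset < 0 )) && offset=0
  printf '{"version":1,"partitions":[{"topic":"%s","partition":%d,"offset":%d}]}\n' \
    "$topic" "$partition" "$offset"
}

# Keep the newest 200 000 records of partition 0.
delete_records_json audit.events 0 1200000 200000
# prints {"version":1,"partitions":[{"topic":"audit.events","partition":0,"offset":1000000}]}

# Typical use:
#   delete_records_json audit.events 0 1200000 200000 > delete.json
#   kafka-delete-records.sh --bootstrap-server kafka:9092 \
#     --offset-json-file delete.json
```

Deletion is irreversible, so confirm that every consumer group's committed offset is past the chosen offset before running it.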
Kafka operates on trust in the ISR contract. The moment you override unclean.leader.election.enable=true (disabled by default since Kafka 0.11), you trade consistency for availability — a lagging replica can become leader, causing message loss. This is never acceptable for financial or audit topics. For high-availability workloads that can tolerate reprocessing (metrics ingestion, clickstream), you might enable it as a last resort during a full ISR loss event, but document it as an explicit architectural decision and revert immediately after recovery.