02. Producer, consumer, keys — the developer's daily surface
~11 min read. Writing producers and consumers. Choosing keys. Managing offsets. Schema evolution. The day-to-day Kafka surface.
Builds on: 01-partitions-log-replicas-internals.md.
The previous chapter explained the model. This chapter is what you actually write.
1) The producer — basics
from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8') if k else None,
    acks='all',
    retries=10,
    max_in_flight_requests_per_connection=5,
    enable_idempotence=True,
    compression_type='lz4',
    linger_ms=10,
    batch_size=64 * 1024,
)

# send() is asynchronous: it queues the record and returns a future.
future = producer.send(
    'orders',
    key='customer-42',
    value={'order_id': 1234, 'amount': 5000},
)
# Block until the broker acknowledges the record, or raise after 10 s.
record_metadata = future.get(timeout=10)
print(record_metadata.partition, record_metadata.offset)
Production-grade settings explained:
- bootstrap_servers — initial broker list. The producer fetches the full cluster metadata after first contact.
- acks='all' — wait for all in-sync replicas before acking.
- retries=10 — retry transient send failures.
- enable_idempotence=True — the broker deduplicates retried sends. Pairs with acks='all'. Always enable. Older kafka-python releases reject this option as unrecognized, so check your client version.
- compression_type='lz4' — compress batches before send. LZ4 is the speed/compression sweet spot; snappy is older; zstd compresses more but uses more CPU. kafka-python needs the lz4 package installed for this codec.
- linger_ms=10 — wait up to 10 ms to batch more messages. Higher = better throughput, slightly higher latency.
- batch_size=64KB — maximum size in bytes of a per-partition batch before it is sent.
The defaults are surprisingly close to right for most workloads. The two non-defaults worth setting are acks='all' and enable_idempotence=True.
2) Choosing the key
The key determines:
- Which partition the message lands on (hash(key) % num_partitions).
- Whether log compaction keeps it (compaction key).
- Ordering — same key → same partition → in-order.
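A minimal sketch of the key-to-partition mapping described in the list above. The name partition_for is hypothetical, and CRC32 is an assumption standing in for the client's real hash (Kafka's default partitioner uses murmur2), so the partition numbers printed here will not match a real cluster. The property it illustrates is the same: equal keys always land on the same partition.

```python
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    """Map a key to a partition using a stable hash.

    Assumption: CRC32 replaces the real partitioner hash for illustration only.
    """
    return zlib.crc32(key.encode("utf-8")) % num_partitions


if __name__ == "__main__":
    for key in ["customer-42", "customer-42", "customer-7"]:
        # The two "customer-42" lines always print the same partition.
        print(key, "->", partition_for(key, 6))
```

Because num_partitions is part of the formula, adding partitions to a topic can move an existing key to a different partition, which breaks per-key ordering across the change.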
Choices:
Customer/user ID. Order events for one customer are in order. Different customers parallelise. Standard for most workloads.
Order/transaction ID. Same-order events together. Useful if events update over time (created, paid, shipped).
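No key (key=None). Messages are spread across partitions; the exact strategy (round-robin, random, or sticky per batch) depends on the client and its version. No ordering. Maximum parallelism.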
Composite key. f"{tenant_id}:{customer_id}". Useful for multi-tenant ordering.
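The trap: skewed keys. If 80% of traffic is one customer, that customer's partition is hot. Mitigations: append a random suffix to the dominant key (loses strict order for that key but balances load), or accept the skew and give the consumer that owns the hot partition more capacity. Adding consumers to the group does not help on its own, because within a group a partition is read by at most one consumer.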
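The random-suffix mitigation can be sketched as follows. All names here (salted_key, partition_load, demo_traffic) are hypothetical, the traffic is synthetic, and CRC32 again stands in for the real partitioner hash, so treat the numbers as illustrative rather than as a measurement.

```python
import random
import zlib
from collections import Counter


def partition_for(key: str, num_partitions: int) -> int:
    # Assumption: CRC32 stands in for the client's real partitioner hash.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def salted_key(key: str, hot_keys: set, buckets: int, rng: random.Random) -> str:
    """Append a random bucket suffix to known hot keys; leave other keys alone."""
    if key in hot_keys:
        return f"{key}#{rng.randrange(buckets)}"
    return key


def partition_load(keys, num_partitions: int) -> Counter:
    """Count how many messages each partition would receive."""
    return Counter(partition_for(k, num_partitions) for k in keys)


def demo_traffic(n: int = 1000) -> list:
    """Synthetic traffic: 80% from one customer, the rest from a few others."""
    return ["customer-1" if i % 5 else f"customer-{100 + i % 50}" for i in range(n)]


if __name__ == "__main__":
    rng = random.Random(0)
    traffic = demo_traffic()
    plain = partition_load(traffic, 12)
    salted = partition_load([salted_key(k, {"customer-1"}, 8, rng) for k in traffic], 12)
    print("hottest partition, plain keys: ", max(plain.values()))
    print("hottest partition, salted keys:", max(salted.values()))
```

The cost is the one named above: events for the salted customer now spread over several partitions, so their relative order is no longer guaranteed.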
3) The consumer — basics
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
    group_id='order-processor',
    enable_auto_commit=False,
    auto_offset_reset='earliest',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    key_deserializer=lambda k: k.decode('utf-8') if k else None,
    max_poll_records=500,
    max_poll_interval_ms=300000,  # 5 min — must exceed batch processing time
    session_timeout_ms=10000,
    heartbeat_interval_ms=3000,
)

# process() is the application's own handler.
for message in consumer:
    process(message.value, key=message.key)
    consumer.commit()  # synchronous commit after every message: simple but slow
The settings that matter:
- group_id — consumer group identifier. Same group = cooperate; different group = independent reads.
- enable_auto_commit=False — the recommended production setting (the library default is True); commit manually after processing.
- auto_offset_reset — what to do when no committed offset exists: 'earliest' (replay from start) or 'latest' (skip to new messages).
- max_poll_records — max messages per poll. Default 500; tune to processing speed.
- max_poll_interval_ms — max time between polls before the consumer is considered dead. Default 5 min. If your batch takes longer, raise this; otherwise rebalances fire.
- session_timeout_ms — how long without a heartbeat before the consumer is considered dead. Default 10s.
- heartbeat_interval_ms — how often to heartbeat. Default 3s.
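A common bug: a long-running batch exceeds max_poll_interval_ms, so the consumer is removed from the group and a rebalance fires. The consumer still thinks it owns the partition and keeps processing, but the partition has been reassigned: its late commit is typically rejected, and the new owner reprocesses the same messages. Always size max_poll_interval_ms larger than your worst-case batch time.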
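The sizing rule is simple arithmetic, sketched below. The function names and the headroom factor of 2 are assumptions for illustration; the worst-case per-record time has to come from your own measurements.

```python
def worst_case_batch_ms(max_poll_records: int, worst_case_ms_per_record: float) -> float:
    """Upper bound on the time between two polls if every record is slow."""
    return max_poll_records * worst_case_ms_per_record


def suggested_max_poll_interval_ms(
    max_poll_records: int, worst_case_ms_per_record: float, headroom: float = 2.0
) -> int:
    """Interval that covers the worst-case batch with some headroom."""
    return int(worst_case_batch_ms(max_poll_records, worst_case_ms_per_record) * headroom)


def max_records_for_interval(
    max_poll_interval_ms: int, worst_case_ms_per_record: float, headroom: float = 2.0
) -> int:
    """Alternative fix: keep the interval and shrink the batch instead."""
    return int(max_poll_interval_ms / (worst_case_ms_per_record * headroom))


if __name__ == "__main__":
    # 500 records at up to 800 ms each can take 400 s, more than the 300 s default.
    print(worst_case_batch_ms(500, 800))             # 400000
    print(suggested_max_poll_interval_ms(500, 800))  # 800000
    print(max_records_for_interval(300000, 800))     # 187
```

Either knob works: raise max_poll_interval_ms, or lower max_poll_records so that a full batch fits inside the existing interval.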
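4) The poll-process-commit loop
import logging

from kafka.structs import OffsetAndMetadata

log = logging.getLogger(__name__)

def consume(consumer):
    while True:
        # poll() returns {TopicPartition: [messages]}; the dict is empty on timeout.
        records = consumer.poll(timeout_ms=1000, max_records=500)
        for tp, messages in records.items():
            for message in messages:
                try:
                    process(message.value)
                except Exception:
                    log.exception("Processing failed for offset %d", message.offset)
                    raise  # re-raise to skip commit; message will redeliver
            # Commit the offset of the LAST processed message + 1.
            # Newer kafka-python releases add a third leader_epoch field to
            # OffsetAndMetadata; pass -1 there if the two-argument form raises TypeError.
            consumer.commit({tp: OffsetAndMetadata(messages[-1].offset + 1, None)})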
The commit semantics: you commit "the next offset to process," not the last processed. If the last processed was offset 1042, commit 1043.
For at-least-once: process, then commit. On crash, you re-process.
For at-most-once: commit, then process. On crash, you skip.
The standard is at-least-once with idempotent processing.
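The difference between the two orderings shows up only when a crash lands between the two steps. Below is a small in-memory simulation, not Kafka client code: the log is a Python list, the committed offset is an integer, and consume_until_crash and simulate are hypothetical names.

```python
def consume_until_crash(log, committed, processed, mode, crash_at=None):
    """Consume `log` from the committed offset; return the new committed offset.

    mode: "at-least-once" (process, then commit) or "at-most-once" (commit, then process).
    crash_at: offset at which a crash is simulated between the two steps.
    """
    offset = committed
    while offset < len(log):
        if mode == "at-least-once":
            processed.append(log[offset])  # process first
            if offset == crash_at:
                return committed           # crashed before the commit
            committed = offset + 1         # commit the NEXT offset to read
        elif mode == "at-most-once":
            committed = offset + 1         # commit first
            if offset == crash_at:
                return committed           # crashed before processing
            processed.append(log[offset])
        else:
            raise ValueError(f"unknown mode: {mode}")
        offset += 1
    return committed


def simulate(mode):
    """Crash once at offset 1, then restart from the committed offset."""
    log = ["a", "b", "c"]
    processed = []
    committed = consume_until_crash(log, 0, processed, mode, crash_at=1)
    committed = consume_until_crash(log, committed, processed, mode)
    return processed, committed


if __name__ == "__main__":
    print(simulate("at-least-once"))  # (['a', 'b', 'b', 'c'], 3): "b" is processed twice
    print(simulate("at-most-once"))   # (['a', 'c'], 3): "b" is lost
```

The duplicate "b" in the first run is why at-least-once is paired with idempotent processing.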
5) Manual partition assignment vs. group assignment
Two ways to consume:
Subscribe (group assignment). consumer.subscribe(['orders']). The group coordinator assigns partitions; consumers in the group share partitions. Use for scale-out.
Assign (manual assignment). consumer.assign([TopicPartition('orders', 0), TopicPartition('orders', 1)]). The consumer manually picks partitions. No group coordination. Use for special cases — replay from specific offsets, batch jobs reading specific partitions, debugging.
For most workloads, subscribe is right. assign is for ops tools and one-off scripts.
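To make "consumers in the group share partitions" concrete, here is a toy round-robin model of group assignment. It is an illustration only: assign_partitions is a hypothetical name, and Kafka's real assignors (range, round-robin, sticky, cooperative-sticky) differ in detail.

```python
def assign_partitions(partitions, consumers):
    """Deal partitions out to group members in round-robin order.

    Each partition goes to exactly one member; members beyond the
    partition count receive nothing.
    """
    members = sorted(consumers)
    if not members:
        raise ValueError("a group needs at least one consumer")
    assignment = {member: [] for member in members}
    for i, partition in enumerate(sorted(partitions)):
        assignment[members[i % len(members)]].append(partition)
    return assignment


if __name__ == "__main__":
    print(assign_partitions(range(6), ["c1", "c2"]))
    # {'c1': [0, 2, 4], 'c2': [1, 3, 5]}
    print(assign_partitions(range(3), ["c1", "c2", "c3", "c4"]))
    # {'c1': [0], 'c2': [1], 'c3': [2], 'c4': []}
```

The second call shows the scale-out limit: with more consumers than partitions, the extra consumers sit idle.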
6) Seeking — replay from arbitrary positions
import time

from kafka import TopicPartition

tp = TopicPartition('orders', 0)
consumer.assign([tp])
consumer.seek_to_beginning(tp)  # replay from start
consumer.seek(tp, 5000)  # specific offset
consumer.seek_to_end(tp)  # skip to current

# Time-based seek
timestamps = {tp: int(time.time() * 1000) - 86400000}  # 24h ago
offsets = consumer.offsets_for_times(timestamps)
if offsets[tp] is not None:  # None when no message exists at or after the timestamp
    consumer.seek(tp, offsets[tp].offset)
Replay is a Kafka superpower. New analytics service can replay 30 days. Bug in consumer caused bad writes? Reset to before the bug and reprocess. The retention setting is your replay window.
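The time-based lookup resolves a timestamp to the earliest offset whose timestamp is at or after it. The sketch below models that rule on a plain list; offset_for_time and base_offset are hypothetical names, and the model assumes timestamps never decrease within a partition.

```python
import bisect


def offset_for_time(timestamps_ms, target_ms, base_offset=0):
    """Return the earliest offset with timestamp >= target_ms, or None.

    timestamps_ms: timestamps of the retained messages, oldest first.
    base_offset: offset of the oldest retained message (retention has
    already deleted everything before it).
    """
    index = bisect.bisect_left(timestamps_ms, target_ms)
    if index == len(timestamps_ms):
        return None  # nothing at or after the requested time
    return base_offset + index


if __name__ == "__main__":
    retained = [1000, 2000, 2000, 3000]  # four messages, offsets 500..503
    print(offset_for_time(retained, 2000, base_offset=500))  # 501
    print(offset_for_time(retained, 0, base_offset=500))     # 500: clamped to oldest retained
    print(offset_for_time(retained, 4000, base_offset=500))  # None
```

The second call is the retention limit in action: asking for a time before the oldest retained message can only return the start of what is left.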
7) Schema management — Avro, Protobuf, or JSON
Messages on Kafka are bytes. The application chooses serialisation:
JSON. Simple, human-readable, larger on the wire. Schema enforcement at the application layer. Standard for early-stage teams.
Avro. Compact binary, schema-on-write, schema registry for evolution. Standard for mature pipelines.
Protobuf. Compact binary, statically-typed schemas, language-agnostic. Standard for cross-team systems.
Schema Registry. Confluent's Schema Registry (or Apicurio, AWS Glue Schema Registry) stores schemas, versions them, and enforces compatibility. Producer registers schema; consumer fetches schema by ID; evolution happens via compatibility rules (backward, forward, full).
Schema evolution rules:
- Backward compatible. New schema can read old data. Default for production — consumers can deploy before producers.
- Forward compatible. Old schema can read new data. Useful if consumers lag.
- Full compatible. Both. Strictest.
Without a schema registry, evolution is brittle — every producer/consumer must agree on the JSON shape; renaming a field is a coordinated deploy.
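The three compatibility modes can be illustrated with a deliberately simplified checker. Assumptions: a schema is a dict of field name to a spec with a "type" and an optional "default", and the rules are a reduced version of Avro-style resolution. A real schema registry also handles type promotion, unions, aliases, and nested records, none of which appear here.

```python
def can_read(reader: dict, writer: dict) -> bool:
    """True if data written with `writer` can be decoded with `reader`.

    Simplified rule: every reader field must either exist in the writer
    with the same type, or carry a default. Extra writer fields are ignored.
    """
    for name, spec in reader.items():
        if name in writer:
            if writer[name]["type"] != spec["type"]:
                return False
        elif "default" not in spec:
            return False
    return True


def is_backward_compatible(old: dict, new: dict) -> bool:
    return can_read(reader=new, writer=old)  # new schema reads old data


def is_forward_compatible(old: dict, new: dict) -> bool:
    return can_read(reader=old, writer=new)  # old schema reads new data


def is_full_compatible(old: dict, new: dict) -> bool:
    return is_backward_compatible(old, new) and is_forward_compatible(old, new)


if __name__ == "__main__":
    v1 = {"order_id": {"type": "long"}, "amount": {"type": "long"}}
    v2 = {**v1, "currency": {"type": "string", "default": "INR"}}  # optional field
    v3 = {**v1, "currency": {"type": "string"}}                    # required field
    print(is_full_compatible(v1, v2))      # True
    print(is_backward_compatible(v1, v3))  # False: old data has no currency
    print(is_forward_compatible(v1, v3))   # True: old readers ignore the extra field
```

The v2 case is the safe pattern: a new field with a default passes every mode, so producers and consumers can deploy in either order.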
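8) Producer error handling
from kafka.errors import KafkaError, KafkaTimeoutError

try:
    future = producer.send('orders', key=key, value=value)
    record_metadata = future.get(timeout=10)
except KafkaTimeoutError:
    # No result within the timeout: send() could not get metadata or buffer
    # space, or no acknowledgement arrived within 10 s.
    log.error("Send timeout")
    # Decide: retry, escalate, or DLQ-style handling
except KafkaError as e:
    # KafkaTimeoutError is a subclass of KafkaError, so it must be caught first.
    log.exception("Kafka error: %s", e)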
For high-throughput producers, don't block on future.get() per message. Instead, use callbacks:
def on_send_success(record_metadata):
    log.debug("Sent to %s offset %d", record_metadata.topic, record_metadata.offset)

def on_send_error(exc):
    log.error("Send failed: %s", exc)

producer.send('orders', key=key, value=value).add_callback(on_send_success).add_errback(on_send_error)
The producer maintains in-flight batches; callbacks fire on broker ack or error.
For shutdown: producer.flush() waits for all in-flight batches to complete. Call before closing.
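One way to combine callbacks with a clean shutdown is to count outcomes and check them after the flush. This is a sketch under assumptions: SendTracker, send_tracked, and shutdown are hypothetical helpers, and the producer is expected to behave like kafka-python's KafkaProducer (send() returning a future with add_callback/add_errback, plus flush(timeout=...) and close(timeout=...)).

```python
import threading


class SendTracker:
    """Thread-safe counters for asynchronous send outcomes.

    Callbacks run on the producer's background I/O thread, hence the lock.
    """

    def __init__(self):
        self._lock = threading.Lock()
        self.sent = 0
        self.failed = 0
        self.last_error = None

    def on_success(self, record_metadata):
        with self._lock:
            self.sent += 1

    def on_error(self, exc):
        with self._lock:
            self.failed += 1
            self.last_error = exc


def send_tracked(producer, tracker, topic, key, value):
    """Send without blocking; outcomes are recorded on the tracker."""
    future = producer.send(topic, key=key, value=value)
    future.add_callback(tracker.on_success)
    future.add_errback(tracker.on_error)
    return future


def shutdown(producer, tracker, timeout_s=30):
    """Flush in-flight batches, close the producer, report whether all sends succeeded."""
    try:
        producer.flush(timeout=timeout_s)
    finally:
        producer.close(timeout=timeout_s)
    return tracker.failed == 0
```

A service would call shutdown() from its termination hook and log tracker.last_error when it returns False.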
9) Multi-region producers
For workloads spanning regions:
Local producer, async replication. App writes to local Kafka cluster; MirrorMaker 2 (or Confluent Replicator) replicates to the other region. Low write latency; mirroring adds RPO (recovery point objective) seconds-to-minutes.
Stretched cluster. One Kafka cluster across regions with rack-aware replicas (replicas placed on different racks/regions). Synchronous replication; higher latency; stronger durability.
Active-active. Each region has its own cluster; events flow both ways via MirrorMaker. Application handles deduplication and conflict resolution.
Choice depends on the workload's latency vs. durability vs. operational tolerance.
10) The threaded example — a Django producer
# Inside a Django service
import json

from django.conf import settings
from django.db import transaction
from django.http import JsonResponse
from kafka import KafkaProducer

# Order is the application's own model, imported from its models module.

producer = KafkaProducer(
    bootstrap_servers=settings.KAFKA_BROKERS,
    value_serializer=lambda v: json.dumps(v).encode(),
    key_serializer=lambda k: k.encode() if k else None,
    acks='all',
    enable_idempotence=True,
    retries=10,
    compression_type='lz4',
)

def submit_order(request):
    with transaction.atomic():
        order = Order.objects.create(...)
        # Runs only after the surrounding transaction commits successfully.
        transaction.on_commit(lambda: producer.send(
            'orders',
            key=str(order.customer_id),
            value={
                'event_type': 'order.created',
                'order_id': order.id,
                'customer_id': order.customer_id,
                'amount': float(order.amount),
                'timestamp': order.created_at.isoformat(),
            },
        ))
    return JsonResponse({'order_id': order.id})
transaction.on_commit ensures the Kafka send happens only if the database commit succeeds. The order ID becomes the natural idempotency token: consumers can deduplicate on it, so receiving the same ID more than once is safe.
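On the consumer side, using the order ID as an idempotency token can look like the sketch below. IdempotentHandler is a hypothetical name, and the in-memory set is an assumption made to keep the example short; a real service would keep the seen IDs in a durable store, for example behind a unique constraint.

```python
class IdempotentHandler:
    """Apply each (event_type, order_id) at most once, even if redelivered."""

    def __init__(self, apply):
        self._apply = apply
        # Assumption: in-memory only, so the dedup state is lost on restart.
        self._seen = set()

    def handle(self, event: dict) -> bool:
        """Return True if the event was applied, False if it was a duplicate."""
        token = (event["event_type"], event["order_id"])
        if token in self._seen:
            return False
        self._apply(event)
        self._seen.add(token)  # record only after the side effect succeeded
        return True


if __name__ == "__main__":
    applied = []
    handler = IdempotentHandler(applied.append)
    event = {"event_type": "order.created", "order_id": 1234, "amount": 5000.0}
    print(handler.handle(event))  # True
    print(handler.handle(event))  # False: redelivery is ignored
    print(len(applied))           # 1
```

Recording the token after the side effect keeps the at-least-once guarantee: if apply raises, the event is retried rather than silently skipped.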
For must-deliver semantics (where a failed send must not silently drop), use a transactional outbox (chapter 02 of SQS) instead — write to an outbox table; a separate worker drains the outbox to Kafka.
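A minimal outbox sketch, using sqlite3 from the standard library so it runs on its own. The table layout, the function names, and the send callable are all assumptions for illustration; in the Django service the two inserts would go through the ORM inside transaction.atomic(), and send would wrap producer.send(...).get(...).

```python
import json
import sqlite3


def init_db(conn):
    conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, customer_id TEXT, amount INTEGER)")
    conn.execute(
        "CREATE TABLE outbox (id INTEGER PRIMARY KEY AUTOINCREMENT, "
        "topic TEXT, key TEXT, payload TEXT, sent INTEGER NOT NULL DEFAULT 0)"
    )


def create_order(conn, customer_id, amount):
    """Insert the order and its event in ONE transaction: both commit or neither does."""
    with conn:  # commits on success, rolls back on exception
        cur = conn.execute(
            "INSERT INTO orders (customer_id, amount) VALUES (?, ?)", (customer_id, amount)
        )
        order_id = cur.lastrowid
        event = {"event_type": "order.created", "order_id": order_id, "amount": amount}
        conn.execute(
            "INSERT INTO outbox (topic, key, payload) VALUES (?, ?, ?)",
            ("orders", customer_id, json.dumps(event)),
        )
    return order_id


def drain_outbox(conn, send):
    """Deliver unsent rows in order; `send` must raise if delivery fails."""
    rows = conn.execute(
        "SELECT id, topic, key, payload FROM outbox WHERE sent = 0 ORDER BY id"
    ).fetchall()
    delivered = 0
    for row_id, topic, key, payload in rows:
        send(topic, key, json.loads(payload))
        with conn:  # mark as sent only after delivery succeeded
            conn.execute("UPDATE outbox SET sent = 1 WHERE id = ?", (row_id,))
        delivered += 1
    return delivered
```

A crash between send and the UPDATE causes a resend on the next drain, so the outbox is at-least-once and still relies on consumers deduplicating by order ID.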
Operational signals
Healthy. Producer send latency p99 within target; consumer lag stable; per-partition lag balanced (no hot partition); error rate near zero.
First degrading metric. Per-partition consumer lag — one partition climbing means a hot key or a slow consumer for that partition.
Misleading metric. Aggregate lag across partitions — masks per-partition skew.
Expert graph. Per-partition lag, per-consumer-group lag distribution, producer error rate by error class.
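Why aggregate lag misleads can be shown with a few numbers. In this sketch partition_lag and lag_summary are hypothetical helpers and the offsets are made up; in a real consumer they would come from calls such as consumer.end_offsets(...) and consumer.committed(...).

```python
def partition_lag(end_offsets: dict, committed: dict) -> dict:
    """Lag per partition: log end offset minus committed offset."""
    return {p: end_offsets[p] - committed.get(p, 0) for p in end_offsets}


def lag_summary(lags: dict) -> dict:
    """Total lag plus the worst partition and how far it is from the mean."""
    total = sum(lags.values())
    worst = max(lags, key=lags.get)
    mean = total / len(lags)
    return {
        "total": total,
        "worst_partition": worst,
        "worst_lag": lags[worst],
        "skew": lags[worst] / mean if mean else 0.0,  # 1.0 means perfectly even
    }


if __name__ == "__main__":
    end = {0: 1000, 1: 1000, 2: 1000, 3: 10000}
    committed = {0: 990, 1: 995, 2: 985, 3: 1000}
    lags = partition_lag(end, committed)
    print(lags)               # {0: 10, 1: 5, 2: 15, 3: 9000}
    print(lag_summary(lags))  # total 9030, almost all of it on partition 3
```

The total alone cannot distinguish "every partition slightly behind" from "one partition badly behind"; the per-partition view and the skew ratio can.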
Where this appears in production
- Netflix — JSON producers transitioning to Avro for schema-rigour at scale.
- Uber — Avro schema registry for cross-team event contracts.
- LinkedIn — the reference; everything Kafka.
- Pinterest — Protobuf for stricter typing across services.
- A Bengaluru fintech — transaction.on_commit for every payment event producer.
- A Mumbai SaaS — Avro + Schema Registry; backward compatibility enforced in CI.
- A Goa-based startup — manual partition assignment for replay scripts; subscribe for production.
- A Pune analytics platform — multi-region MirrorMaker 2; per-region active producers.
Recall / checkpoint
- What is enable_idempotence and what does it solve?
- How do you pick a message key?
- What does max_poll_interval_ms control?
- What is the difference between subscribe and assign?
- What is the schema registry's role?
- What is the seek-by-time pattern?
- Why is transaction.on_commit the standard for producing from a web handler?
Interview Q&A
Q1. The team's producer is dropping messages under load. Walk through diagnosis.
Check producer-side error logs and metrics. Common causes: (1) acks=0 or acks=1 with broker failure — messages acked before durability; (2) retries too low; (3) producer queue full (max.block.ms reached). Fix: acks='all', retries=10 (or more), enable_idempotence=True. Without idempotence, retries can duplicate; with it, retries are safe. Watch for producer-side queue fullness — the producer's internal buffer is finite. Common wrong answer to avoid: "scale brokers" — the loss is producer config.
Q2. A consumer rebalances every minute. Walk through the diagnosis.
Frequent rebalances mean group membership is unstable. Causes: (1) max_poll_interval_ms too small — batches take longer than the interval; consumer is marked dead. (2) session_timeout_ms too small — heartbeats miss due to GC or network. (3) Consumers crashing and restarting. Diagnosis: check consumer logs for "max poll interval" or "session timeout" warnings. Fix: raise max_poll_interval_ms to comfortably exceed worst-case batch time; raise session_timeout_ms if heartbeats are missed; stabilise the consumer process. Common wrong answer to avoid: "shorten the timeout" — a shorter timeout detects real crashes faster but marks more healthy consumers as dead, which is the problem being diagnosed.
Q3. A single partition has 10× the lag of others. Walk through the diagnosis.
Hot-key skew. The key chosen has unequal distribution; one partition gets disproportionate traffic. Solutions: (1) revise the key — compound key, hash-prefix, or random suffix where order isn't strictly required; (2) add more partitions — only works if keys are diverse; one hot key still maps to one partition; (3) scale the slow consumer for that partition specifically (one consumer can handle multiple partitions; reverse not true). The structural fix is key choice. Common wrong answer to avoid: "rebalance the partitions" — Kafka's hash assignment is deterministic by key.
Q4. The schema for an event needs a new optional field. Walk through the deploy.
With a schema registry and backward compatibility: add the field as optional with a default value in the new schema; publish to registry. Producers continue sending old format; consumers that update to the new schema read old data correctly (default fills in for missing field). Consumers can deploy before producers. After all producers are updated, the field becomes required if needed (with a forward-compatible step). Without a schema registry: every consumer must update its parsing code; deploy order matters; coordination is fragile. Common wrong answer to avoid: "version the topic" — adds operational complexity; schema evolution is the structural answer.
Q5. The team uses Kafka for cross-team event distribution. A team wants to add a new consumer to a topic without affecting the existing pipeline. Walk through the design.
A new consumer group. The new group reads the same topic independently; offsets are tracked separately; rebalance is contained to the new group. Set auto_offset_reset='earliest' to replay from the start (limited by retention) or 'latest' to start from current. The new consumer's success or failure doesn't affect the existing groups. This is the structural reason teams choose Kafka for event distribution. Common wrong answer to avoid: "fan out at the producer" — Kafka makes this free.
Q6. Walk through a Django web handler that needs to produce a Kafka event after a database write.
Use transaction.on_commit(lambda: producer.send(...)). The send is queued in the database transaction's on_commit hook; it runs only after the commit succeeds. If the transaction rolls back, the send never queues. Pair with idempotence at the consumer (use the row's primary key as the natural idempotency token). For workloads where send failure must not silently drop (regulated), use a transactional outbox: write the event to an outbox table within the transaction; a separate worker drains the outbox to Kafka. Common wrong answer to avoid: "send before save" — risks emitting events for transactions that never commit.
Operational memory
This chapter explained the day-to-day Kafka surface: producer config (acks, idempotence, batching), consumer config (group, offset reset, max poll), key choice, schema management, seek and replay, multi-region patterns, and Django integration. The important idea is that defaults are reasonable but production-grade settings (acks='all', idempotence, manual commit) are the standard.
You learned to configure producers and consumers, choose keys to balance partitions, evolve schemas safely, seek to specific positions, and integrate Kafka with web handlers via on-commit. That solves the day-to-day; production gotchas come next.
Carry this diagnostic forward: when Kafka behaves unexpectedly in your application, ask which configuration is at fault — acks, key, max-poll-interval, schema, or commit timing. Each has a known structural fix.
Remember:
- acks='all' + enable_idempotence=True is the production default.
- The key determines partition; choose for ordering and balance.
- max_poll_interval_ms must exceed worst-case batch processing time.
- Commit after processing; idempotent processing handles re-delivery.
- Schema registry with backward compatibility enables independent deploys.
Bridge. The developer surface is set. Production has its own surface: rebalance storms, retention surprises, broker failures, monitoring. The next chapter is that catalogue. → 03-rebalance-retention-prod-gotchas.md