
Fix: Kafka Consumer Not Receiving Messages, Connection Refused, and Rebalancing Errors

FixDevs


Quick Answer

How to fix Apache Kafka issues — consumer not receiving messages, auto.offset.reset, Docker advertised.listeners, max.poll.interval.ms rebalancing, MessageSizeTooLargeException, and KafkaJS errors.

The Error

Your Kafka consumer connects successfully but receives no messages:

[Consumer] INFO Subscribed to topic(s): orders
[Consumer] INFO Kafka Consumer started
# ... silence. No messages.

Or you hit a connection error on startup:

# KafkaJS
KafkaJSConnectionError: Connection error: getaddrinfo ENOTFOUND kafka kafka:9092

# confluent-kafka-python
%3|1672531200.000|FAIL|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]:
localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed:
Connection refused (after 2ms in state CONNECT)

Or the consumer gets kicked from its group repeatedly:

Heartbeat timeout. The coordinator considers the consumer dead.
Group rebalancing triggered.

Or a producer fails to send:

org.apache.kafka.common.errors.TimeoutException: Expiring 150 record(s) for
orders-0: 30025 ms has passed since batch creation plus linger time

Each of these is a separate root cause that needs a different fix.

Why This Happens

Most Kafka issues come down to three concepts:

  1. Offsets: Kafka tracks where each consumer group left off. If the offset is already at the latest message, the consumer sits idle waiting for new ones — it won’t re-read old messages unless you reset.
  2. Advertised listeners: Kafka tells clients which address to connect to. In Docker, this address is often wrong — the broker advertises a hostname that only resolves inside the container.
  3. Poll interval: Consumers must call poll() within max.poll.interval.ms or Kafka removes them from the group and reassigns their partitions.

A useful frame: Kafka is a distributed log, not a message queue. Producers write to a partitioned, append-only log; consumers track their own read position into that log. Almost every confusing behaviour you’ll meet — silent consumers, sudden rebalances, duplicate processing, “where did my message go” — falls out of that single design choice. The broker doesn’t push messages to consumers and doesn’t track per-consumer state in the same way an AMQP broker does. It hands out partitions to a consumer group and trusts each consumer to advance its own offset.

The other concept that catches teams off guard is the split between client-side state and broker-side state. The broker only knows: which topics exist, where each consumer group’s committed offset is, and which member of a group owns each partition. Everything else — the actual poll loop, the in-flight batch, the heartbeat thread, the deserialization — runs in the client library. That’s why client tuning (max.poll.records, max.poll.interval.ms, fetch.min.bytes) often matters more than broker tuning, and why a “Kafka problem” is frequently a problem with the Java/Python/JS client you chose.

Fix 1: Consumer Not Receiving Messages — Check auto.offset.reset

When a consumer joins a group for the first time (no committed offset), or when its last committed offset has expired from the log, Kafka looks at auto.offset.reset:

| Value | Behavior |
|---|---|
| latest | Start from now: only read messages produced after the consumer joined. Old messages are skipped. |
| earliest | Start from the beginning: read all retained messages in the topic. |
| none | Throw an error if no valid committed offset exists. |

The default is latest. This is why a consumer that joins after messages were produced sees nothing — it started after those messages and there’s no new data yet.

Fix: Set auto.offset.reset=earliest when you need to read existing messages:

# confluent-kafka-python
from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest'  # Read from beginning on first join
})
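// KafkaJS
const { Kafka } = require('kafkajs');
const kafka = new Kafka({ clientId: 'my-app', brokers: ['localhost:9092'] });
const consumer = kafka.consumer({ groupId: 'my-group' });

async function start() {
  await consumer.connect();
  // fromBeginning belongs on subscribe(), not consumer(). Like
  // auto.offset.reset=earliest, it only applies when the group has no committed offset.
  await consumer.subscribe({ topic: 'orders', fromBeginning: true });
}
start().catch(console.error);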

Important: auto.offset.reset only applies when the consumer group has no valid committed offset for a partition. That means it either never committed, or its committed offset has aged out of retention. If the group already has a valid committed offset, Kafka uses that instead, regardless of this setting. To force re-reading from the beginning, either use a new group.id or reset the offsets manually.
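The start-position decision can be sketched as a small pure function. This is a simplified model, not Kafka's client code. `resolve_start_offset` and its parameters are hypothetical names. `log_start` and `log_end` stand for the partition's earliest retained offset and the offset the next record will get, and `committed=None` means the group has never committed.

```python
from typing import Optional


def resolve_start_offset(committed: Optional[int], log_start: int,
                         log_end: int, reset: str = "latest") -> int:
    """Simplified model of where a consumer begins reading one partition.

    committed: the group's committed offset, or None if it never committed.
    log_start: earliest offset still retained on the broker.
    log_end:   offset the next produced record will get (the "head").
    reset:     value of auto.offset.reset ("earliest", "latest" or "none").
    """
    # A valid committed offset always wins; auto.offset.reset is ignored.
    if committed is not None and log_start <= committed <= log_end:
        return committed
    # No committed offset, or it fell out of retention: apply the reset policy.
    if reset == "earliest":
        return log_start
    if reset == "latest":
        return log_end
    raise ValueError("no valid committed offset and auto.offset.reset=none")


# New group, 100 records already in the topic (offsets 0..99):
print(resolve_start_offset(None, 0, 100))              # 100: skips all 100 records
print(resolve_start_offset(None, 0, 100, "earliest"))  # 0: reads everything
print(resolve_start_offset(42, 0, 100, "earliest"))    # 42: committed offset wins
```

The last call shows why switching an existing group to `earliest` changes nothing: the committed offset is still valid, so the policy is never consulted.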

Common Mistake: Developers test with auto.offset.reset=earliest, successfully read old messages, then deploy to production with latest (the default) and wonder why the consumer is silent. Always be explicit about auto.offset.reset in your configuration — never rely on the default in production code.

Wrong topic name: Also check for typos. Kafka topic names are case-sensitive. Orders and orders are different topics. If auto-create is enabled, Kafka may silently create a new empty topic instead of throwing an error.

Fix 2: Consumer Not Receiving Messages — Too Many Consumers

You can have at most one consumer per partition within the same consumer group. Extra consumers sit idle with zero partitions assigned.

Topic: orders
  ├── partition 0 → consumer-1
  ├── partition 1 → consumer-2
  └── partition 2 → consumer-3

consumer-4 → (no partition assigned, receives nothing)
consumer-5 → (no partition assigned, receives nothing)

If you have 2 consumers but 1 partition, one consumer handles all messages and the other receives nothing. The fix is to increase the partition count — you need at least as many partitions as the maximum number of consumers you want to run in parallel:

# Check partition count
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic orders

# Increase partitions (cannot be decreased after creation)
kafka-topics.sh --bootstrap-server localhost:9092 --alter \
  --topic orders --partitions 6

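Note: Increasing partitions on an existing topic doesn't move existing data. New messages are distributed across all partitions going forward. The default partitioner maps a key by hashing it over the partition count, so adding partitions also sends some keys to a different partition than before. As a result, per-key ordering is not preserved across the change. Consider partition count at topic creation time. The rule of thumb is: partitions = max expected consumer parallelism.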
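To see why extra consumers sit idle, here is a toy round-robin assignment over one topic. It is an illustration only. Kafka's real assignors (range, round-robin, sticky, cooperative-sticky) differ in detail, but every one of them gives each partition to at most one member of the group. `assign_partitions` is a hypothetical helper.

```python
def assign_partitions(num_partitions: int, consumers: list[str]) -> dict[str, list[int]]:
    """Toy round-robin assignment: each partition goes to exactly one consumer."""
    assignment: dict[str, list[int]] = {c: [] for c in consumers}
    for partition in range(num_partitions):
        owner = consumers[partition % len(consumers)]
        assignment[owner].append(partition)
    return assignment


five = [f"consumer-{i}" for i in range(1, 6)]
for consumer, partitions in assign_partitions(3, five).items():
    print(consumer, partitions or "(idle)")
# consumer-1 [0]
# consumer-2 [1]
# consumer-3 [2]
# consumer-4 (idle)
# consumer-5 (idle)
```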

Fix 3: Connection Refused — Docker advertised.listeners Not Set

This is the most common Docker + Kafka issue. When a Kafka broker starts, it tells clients which address to use for future connections. This is the advertised listener — and by default it’s the broker’s hostname, which is only resolvable inside the Docker network.

Your client connects, gets metadata saying “connect to kafka:9092”, then fails because kafka doesn’t resolve outside the container.

Fix: Configure two listener names — one for Docker-internal connections, one for host connections:

The env var names differ by Docker image — Confluent (confluentinc/cp-kafka) uses KAFKA_*, Bitnami uses KAFKA_CFG_* — but the underlying Kafka property names are the same. Here’s the Bitnami image format which also works well with Kafka 4.x KRaft mode:

# docker-compose.yml
services:
  kafka:
    image: bitnami/kafka:latest
    ports:
      - "9092:9092"
    environment:
      # KRaft mode (no ZooKeeper required in Kafka 4.x)
      KAFKA_CFG_NODE_ID: "1"
      KAFKA_CFG_PROCESS_ROLES: "broker,controller"
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "1@kafka:9093"
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
      # Two listeners: one for Docker network, one for host
      KAFKA_CFG_LISTENERS: PLAINTEXT://0.0.0.0:29092,EXTERNAL://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,EXTERNAL://localhost:9092
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: PLAINTEXT

With this configuration:

  • Applications inside Docker (same docker-compose.yml) connect to kafka:29092
  • Applications on your host machine connect to localhost:9092

# From host machine
consumer = Consumer({'bootstrap.servers': 'localhost:9092', ...})

# From another Docker container in the same network
consumer = Consumer({'bootstrap.servers': 'kafka:29092', ...})

Pro Tip: If you see ENOTFOUND kafka from your host application, you’re using the internal listener address. If you see Connection refused from inside Docker, you’re using the external one. Check which bootstrap server address your application is actually using. For general Docker networking issues between containers, see docker-compose networking not working.
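You can catch the advertised-listener mismatch early by checking whether each advertised host resolves from wherever your client runs. The sketch below parses an advertised-listeners string in the `NAME://host:port` format used above and tries to resolve each host with the standard library. `parse_listeners` and `check_resolvable` are hypothetical helper names. The check only covers name resolution, not whether the port is open.

```python
import socket


def parse_listeners(value: str) -> dict[str, tuple[str, int]]:
    """Parse 'PLAINTEXT://kafka:29092,EXTERNAL://localhost:9092' into a dict."""
    listeners = {}
    for entry in value.split(","):
        name, _, address = entry.strip().partition("://")
        host, _, port = address.rpartition(":")
        listeners[name] = (host, int(port))
    return listeners


def check_resolvable(listeners: dict[str, tuple[str, int]]) -> dict[str, bool]:
    """Report whether each advertised host resolves from this machine."""
    result = {}
    for name, (host, port) in listeners.items():
        try:
            socket.getaddrinfo(host, port)
            result[name] = True
        except OSError:  # socket.gaierror (resolution failure) is a subclass
            result[name] = False
    return result


advertised = parse_listeners("PLAINTEXT://kafka:29092,EXTERNAL://localhost:9092")
print(advertised)
print(check_resolvable(advertised))  # on the host machine, PLAINTEXT usually comes back False
```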

Fix 4: Consumer Group Rebalancing — max.poll.interval.ms Exceeded

Besides sending heartbeats, consumers must call poll() regularly to prove they're still making progress. If processing takes longer than max.poll.interval.ms (default: 5 minutes), the consumer is treated as failed. It leaves the group, which triggers a rebalance. With the classic eager rebalance protocol, all consumers in the group stop processing while partitions are reassigned.

You’ll see something like:

WARN  The coordinator considers the consumer dead. Sending the consumer to rejoin the group.
INFO  Group rebalancing triggered (max.poll.interval.ms exceeded)

Two fixes depending on the root cause:

If your processing is slow but expected:

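# confluent-kafka-python: increase the interval
from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    'max.poll.interval.ms': 600000,  # 10 minutes
})
consumer.subscribe(['orders'])
# librdkafka has no max.poll.records; cap each batch with consume() instead
msgs = consumer.consume(num_messages=50, timeout=1.0)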
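// KafkaJS: heartbeats are sent automatically with eachMessage.
// For slow processing, use eachBatch and call heartbeat manually
await consumer.run({
  eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning, isStale }) => {
    for (const message of batch.messages) {
      if (!isRunning() || isStale()) break;  // Stop on shutdown or after a rebalance
      await processMessage(message);         // Slow operation (your own function)
      resolveOffset(message.offset);         // Mark this message as processed
      await heartbeat();                     // Tell Kafka we're still alive
    }
  }
});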

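If each message is quick but each poll returns too many of them:

Reduce the batch size so each poll cycle finishes quickly. In the Java client, the setting is max.poll.records (default: 500). confluent-kafka-python has no such setting, so pass num_messages to consume() instead:

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
})
consumer.subscribe(['orders'])
msgs = consumer.consume(num_messages=100, timeout=1.0)  # Up to 100 instead of 500 per call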
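A rough budget helps when picking a batch size, because the whole batch must finish well inside max.poll.interval.ms. The helper below is a back-of-the-envelope sketch. The 50% safety margin and the worst-case per-message time are assumptions; replace them with your own measurements.

```python
def max_safe_batch(worst_case_ms_per_message: float,
                   max_poll_interval_ms: int = 300_000,
                   safety: float = 0.5) -> int:
    """Largest batch that finishes within `safety` x max.poll.interval.ms."""
    budget_ms = max_poll_interval_ms * safety
    return max(1, int(budget_ms // worst_case_ms_per_message))


print(max_safe_batch(2_000))           # 75: 2s/message, default 5-minute interval
print(max_safe_batch(2_000, 600_000))  # 150: same workload, 10-minute interval
print(max_safe_batch(10))              # 15000
```

At 2 s per message, a full batch of 500 records would need over 16 minutes. That far exceeds the default interval, which is why lowering the batch size helps.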

The heartbeat / session timeout relationship (Java client defaults, Kafka 3.0+):

heartbeat.interval.ms = 3000     (send heartbeat every 3s)
session.timeout.ms = 45000       (broker waits 45s for heartbeat; 10000 before Kafka 3.0)
max.poll.interval.ms = 300000    (5 min max between poll() calls)

Keep heartbeat.interval.ms below session.timeout.ms, and typically no higher than 1/3 of it. Setting session.timeout.ms too low causes frequent unnecessary rebalances. Setting it too high means failed consumers take longer to be detected.
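The heartbeat rule is easy to check mechanically before deploying. The sketch below encodes the guidance from the Kafka consumer documentation: the heartbeat interval must be lower than the session timeout, and is typically no more than a third of it. `check_heartbeat` is a hypothetical helper, not part of any Kafka client.

```python
def check_heartbeat(heartbeat_ms: int, session_ms: int) -> list[str]:
    """Warn about heartbeat/session settings that invite spurious rebalances."""
    if heartbeat_ms >= session_ms:
        return ["heartbeat.interval.ms must be lower than session.timeout.ms"]
    if heartbeat_ms * 3 > session_ms:
        return ["heartbeat.interval.ms is above 1/3 of session.timeout.ms"]
    return []


print(check_heartbeat(3_000, 45_000))  # [] (current Java defaults)
print(check_heartbeat(3_000, 10_000))  # [] (pre-3.0 defaults)
print(check_heartbeat(5_000, 10_000))  # warns: above 1/3
```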

Fix 5: Offset Management — Consumers Reprocessing After Restart

If your consumer reprocesses the same messages every time it restarts, it’s not committing offsets.

With enable.auto.commit=true (default):

Auto-commit happens every auto.commit.interval.ms (default: 5 seconds). If the consumer crashes between processing a message and the next auto-commit, those messages are reprocessed on restart. This is acceptable for idempotent workloads (e.g., upserting a row by primary key). It is problematic for non-idempotent side effects (e.g., sending emails or incrementing a counter).

With enable.auto.commit=false (manual commit):

You must explicitly commit after processing:

# confluent-kafka-python
from confluent_kafka import Consumer, KafkaException

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    'enable.auto.commit': False,
    'auto.offset.reset': 'earliest'
})
consumer.subscribe(['orders'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            raise KafkaException(msg.error())

        # Process the message (process_order is your own function)
        process_order(msg.value())

        # Commit only after successful processing; commit(message=...) commits offset + 1
        consumer.commit(message=msg, asynchronous=False)
finally:
    consumer.close()  # Leave the group cleanly so partitions are reassigned promptly
// KafkaJS — autoCommit is a run() option, not a consumer() option
const consumer = kafka.consumer({ groupId: 'my-group' });
await consumer.connect();
await consumer.subscribe({ topic: 'orders', fromBeginning: true });

await consumer.run({
  autoCommit: false,  // Disable auto-commit here
  eachMessage: async ({ topic, partition, message }) => {
    await processOrder(message.value);
    // Commit offset + 1 (next message to read)
    await consumer.commitOffsets([{
      topic,
      partition,
      offset: (BigInt(message.offset) + 1n).toString()  // BigInt avoids precision loss on very large offsets
    }]);
  }
});

Common Mistake: When committing manually, commit offset + 1, not the message’s offset. Kafka’s committed offset means “the next offset to read”, not “the last offset read.”
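A toy simulation makes the off-by-one visible. It models one partition as a Python list and a restart that resumes reading at the committed offset. `consume_then_restart` is a hypothetical helper, not a client API. confluent-kafka-python's `commit(message=msg)` already commits `msg.offset() + 1`, so the bug typically appears when you commit a raw offset by hand.

```python
def consume_then_restart(log: list[str], last_processed: int,
                         commit_plus_one: bool) -> list[str]:
    """Commit after processing log[0..last_processed], restart, return duplicates.

    Kafka resumes at the committed offset, so the committed value must be
    "next offset to read", i.e. last processed offset + 1.
    """
    committed = last_processed + 1 if commit_plus_one else last_processed
    # After restart, anything read from `committed` up to last_processed
    # was already handled before the crash: those are duplicates.
    return log[committed:last_processed + 1]


orders = ["order-0", "order-1", "order-2", "order-3"]
print(consume_then_restart(orders, 2, commit_plus_one=True))   # []
print(consume_then_restart(orders, 2, commit_plus_one=False))  # ['order-2'] reprocessed
```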

Fix 6: Producer Send Failures

TimeoutException: Expiring X record(s):

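The producer buffered messages but couldn't deliver them to the broker before the batch expired. In Java clients since Kafka 2.1, the limit is delivery.timeout.ms (default: 2 minutes). The "plus linger time" wording in the error above comes from older clients, where request.timeout.ms governed batch expiry. librdkafka-based clients use message.timeout.ms, which also accepts the alias delivery.timeout.ms. Check that the broker is running and reachable first. Then tune the producer:

from confluent_kafka import Producer

producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'request.timeout.ms': 30000,          # Wait up to 30s for each broker ack (default: 30000)
    'delivery.timeout.ms': 120000,        # Total time allowed per message, retries included
    'linger.ms': 100,                     # Wait 100ms for batch to fill before sending
    'batch.size': 32768,                  # 32KB batches
    'queue.buffering.max.kbytes': 65536,  # 64MB local buffer (Java equivalent: buffer.memory)
    'acks': 'all'                         # Wait for all in-sync replicas to acknowledge
})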

MessageSizeTooLargeException:

Default max message size is 1 MB. You must increase it in three places — they must all be consistent:

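# Broker: message.max.bytes (server.properties or env var; a topic's max.message.bytes overrides it)
KAFKA_MESSAGE_MAX_BYTES=10485760  # 10MB (Confluent image; Bitnami uses KAFKA_CFG_MESSAGE_MAX_BYTES)

# Producer: max.request.size (Java client) / message.max.bytes (librdkafka)
# Consumer: max.partition.fetch.bytes
from confluent_kafka import Producer, Consumer

# Producer
producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'message.max.bytes': 10485760  # 10MB; librdkafka's name for the producer-side cap
})

# Consumer
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    'max.partition.fetch.bytes': 10485760  # 10MB
})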
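The three limits live in different configs, so a small consistency check can catch a mismatch before it surfaces as MessageSizeTooLargeException. This is a sketch of one reasonable reading of "consistent": the producer limit is no larger than the broker's, and the consumer fetch size is no smaller than the broker's. `check_size_limits` is a hypothetical helper. You pass the values in by hand; it does not read them from a live cluster.

```python
def check_size_limits(broker_max: int, producer_max: int, consumer_fetch: int) -> list[str]:
    """Flag inconsistent message-size limits (all values in bytes)."""
    problems = []
    if producer_max > broker_max:
        problems.append("producer allows records the broker will reject")
    if consumer_fetch < broker_max:
        problems.append("consumer fetch size is smaller than the largest accepted record")
    return problems


ten_mb = 10 * 1024 * 1024  # 10485760, the value used above
print(check_size_limits(ten_mb, ten_mb, ten_mb))     # []
print(check_size_limits(1_048_588, ten_mb, ten_mb))  # broker left at its ~1 MB default
```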

BufferExhaustedException:

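The producer's memory buffer is full: you're producing faster than messages are being sent. In the Java client, increase buffer.memory (default: 32 MB) or reduce the production rate. max.block.ms (default: 60s) controls how long send() blocks before throwing. confluent-kafka-python raises BufferError ("Local: Queue full") instead. Its buffer is sized with queue.buffering.max.kbytes, and you free space by serving delivery reports with poll():

producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'queue.buffering.max.kbytes': 131072,  # 128MB local buffer
})
try:
    producer.produce('orders', value=b'payload')
except BufferError:   # Local queue is full
    producer.poll(5)  # Serve delivery reports for up to 5s, then retry once
    producer.produce('orders', value=b'payload')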

Kafka vs RabbitMQ, NATS, Redpanda, Pulsar, Kinesis, Pub/Sub

Picking a message system is mostly about how much durability, throughput, and operational overhead you can absorb. A short comparison frames where Kafka fits.

Kafka. Distributed append-only log with strong durability and replay. Hundreds of MB/s per broker, multi-day retention, exactly-once semantics with transactions. Ops cost is high: brokers, a controller quorum (KRaft, or ZooKeeper before 4.0), JVM tuning, partition planning. Pick Kafka when you need durable, replayable streams and you have ops capacity.

RabbitMQ. Classic AMQP broker. Lower throughput than Kafka but much simpler routing (exchanges, bindings, queues). Optimized for work-queue and pub/sub patterns where messages disappear after they’re acknowledged. Pick RabbitMQ for traditional task queues, RPC-over-broker, fan-out without replay. Connection-level failure patterns are covered in Fix: RabbitMQ Connection Refused.

NATS / NATS JetStream. Lightweight, Go-based broker. Core NATS is fire-and-forget pub/sub at extreme speed; JetStream adds Kafka-like persistence and consumer groups. Operationally far simpler than Kafka — a single self-clustering binary, no ZooKeeper, no JVM. Throughput is competitive for in-memory traffic; JetStream durability is solid but the ecosystem is smaller than Kafka’s. Pick NATS for service-to-service messaging in microservices, especially when you want low operational overhead.

Redpanda. Kafka-API compatible broker written in C++. Designed as a drop-in Kafka replacement with no ZooKeeper, no page cache reliance, and substantially lower tail latency. Same clients, same protocols. Trade-off: smaller community than Kafka, commercial features (tiered storage, schema registry integrations) behind a paid tier. Pick Redpanda when you want Kafka semantics without JVM tuning or want predictable sub-millisecond P99.

WarpStream. Newer entrant: also Kafka-API compatible but uses object storage (S3) as the durability layer. Brokers are stateless and cheap to scale. Latency is higher than Kafka or Redpanda (S3 round trip per write), but cost is dramatically lower for high-throughput retain-forever workloads. Pick WarpStream for analytics ingestion where seconds-of-latency is acceptable and storage cost dominates the bill.

Apache Pulsar. Distributed log + queue hybrid, with separate storage layer (Apache BookKeeper). Multi-tenancy and geo-replication are first-class. Operationally heavier than Kafka. Used at very large enterprises that need tiered storage and tenant isolation built in.

AWS Kinesis. Managed Kafka-like service from AWS. No brokers to run. Pricing is per shard-hour + per PUT; replay is limited to 24h by default, extendable to 365 days. Pick Kinesis when you’re committed to AWS and you don’t want to operate brokers. The Kafka API differences mean you can’t reuse Kafka clients directly. See Fix: AWS SQS Not Working for how AWS’s other queue offering compares — Kinesis is for streams, SQS is for queues, and they answer different questions.

Google Cloud Pub/Sub. Managed, fully serverless. No partition concept; ordering is per ordering key, not per partition. Pull or push delivery, automatic scaling. Pick Pub/Sub for GCP-native event distribution where you don’t care about replay and want zero ops.

Quick decision table:

| Need | Pick |
|---|---|
| Durable replayable log, high throughput | Kafka, Redpanda |
| Traditional task queue, RPC | RabbitMQ |
| Microservice mesh, ultra-low ops | NATS / JetStream |
| Kafka semantics, cheaper storage | WarpStream |
| Multi-tenant + tiered storage | Pulsar |
| Managed AWS, in-AWS only | Kinesis (streams) / SQS (queues) |
| Managed GCP, no replay | Pub/Sub |
| Kafka semantics, no JVM ops | Redpanda |

Throughput vs durability is the main axis. Kafka and Redpanda sit at the high-throughput-with-strong-durability corner. NATS core sits at high-throughput-with-weak-durability. RabbitMQ sits at moderate-throughput-with-strong-routing. WarpStream sits at high-throughput-with-cheap-but-slow-durability. Pick the corner that matches your workload before tuning anything.

If you’ve inherited a Kafka deployment and you’re considering migration, evaluate the move carefully. Kafka clients are widely available (Java, Python, JS, Go, .NET, Rust), the on-disk format is stable, and KRaft mode has matured. The most common reasons to move are ops cost (NATS or Redpanda) or storage cost (WarpStream) — not “Kafka itself is broken.”

Still Not Working?

LEADER_NOT_AVAILABLE on First Connection

This error appears briefly during topic creation or broker startup while partition leaders are being elected. It’s transient and resolves within a few seconds. If it persists:

  1. Check that all brokers are running: kafka-broker-api-versions.sh --bootstrap-server localhost:9092
  2. Check partition leader status: kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic your-topic — a leader of -1 means no leader is elected
  3. Ensure the topic exists and has the correct replication factor for your broker count (can’t have replication factor of 3 with only 1 broker)

Kafka 4.x — ZooKeeper Is Gone (KRaft Mode)

As of Kafka 4.0 (released early 2025), ZooKeeper support is completely removed. If your config still references ZooKeeper, the broker won't start. A ZooKeeper-based 3.x cluster must be migrated to KRaft while it is still on 3.x, before you upgrade to 4.x:

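# Old config (Kafka 3.x with ZooKeeper): no longer works in 4.x
zookeeper.connect=localhost:2181

# New KRaft config (Kafka 4.x): Kafka manages its own metadata
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@localhost:9093
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
controller.listener.names=CONTROLLER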

For local development with Docker on Kafka 4.x, use the official apache/kafka or bitnami/kafka image which defaults to KRaft mode. Remove any ZooKeeper containers from your docker-compose.yml. See docker-compose depends-on not working if your application container starts before Kafka is ready and fails on the first connection attempt.
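When the application container races Kafka on startup, a small readiness loop before creating the client avoids a failed first connection. The sketch below uses only the standard library and waits for the TCP port to accept connections. `wait_for_port` is a hypothetical helper. An open port does not guarantee that leaders are elected, so LEADER_NOT_AVAILABLE can still appear briefly.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout_s: float = 30.0,
                  interval_s: float = 1.0) -> bool:
    """Return True once host:port accepts a TCP connection, False on timeout."""
    deadline = time.monotonic() + timeout_s
    while True:
        try:
            with socket.create_connection((host, port), timeout=interval_s):
                return True
        except OSError:  # refused, unreachable, or name not resolvable yet
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval_s)


# Inside Docker Compose you'd typically wait on the internal listener, e.g.:
# if not wait_for_port("kafka", 29092):
#     raise SystemExit("Kafka did not become reachable in time")
```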

KafkaJS — Graceful Shutdown

Calling consumer.disconnect() from inside an eachMessage or eachBatch handler throws an error. Disconnect must happen outside the message loop:

const { Kafka } = require('kafkajs');
const kafka = new Kafka({ brokers: ['localhost:9092'] });
const consumer = kafka.consumer({ groupId: 'my-group' });

const shutdown = async () => {
  await consumer.disconnect(); // Safe to call here, outside the handler
  process.exit(0);
};

process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);

// CommonJS has no top-level await, so start the consumer inside an async function
const run = async () => {
  await consumer.connect();
  await consumer.subscribe({ topic: 'orders', fromBeginning: true });

  // eachMessage handler: do NOT call consumer.disconnect() here
  await consumer.run({
    eachMessage: async ({ message }) => {
      await processMessage(message); // your own handler
    }
  });
};

run().catch(console.error);

SASL Authentication Errors

If you’re connecting to a managed Kafka service (Confluent Cloud, AWS MSK, Aiven) and getting authentication errors:

# confluent-kafka-python with SASL PLAIN + TLS
consumer = Consumer({
    'bootstrap.servers': 'your-broker.example.com:9092',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': 'your-api-key',
    'sasl.password': 'your-api-secret',
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest'
})
// KafkaJS with SASL PLAIN
const kafka = new Kafka({
  brokers: ['your-broker.example.com:9092'],
  ssl: true,
  sasl: {
    mechanism: 'plain',
    username: 'your-api-key',
    password: 'your-api-secret'
  }
});

Check that the sasl.mechanism matches what the broker requires. PLAIN, SCRAM-SHA-256, and SCRAM-SHA-512 are not interchangeable. If you also run RabbitMQ and hit similar errors there, RabbitMQ connection refused covers the equivalent authentication and network problems on the AMQP side.

Checking Consumer Group Status

When a consumer connects but receives nothing, inspect the consumer group:

# List all consumer groups
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

# Describe a group — shows lag, offsets, and partition assignments
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group my-group

The LAG column tells you how many messages the consumer is behind. If lag is 0 and no messages arrive, the consumer is caught up and waiting for new messages (expected behavior). If lag is large and not decreasing, the consumer is stuck.
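The LAG column is the partition's log-end offset minus the group's committed offset. To compute it in code, confluent-kafka-python exposes both inputs: `Consumer.get_watermark_offsets()` returns the low and high watermarks, and `Consumer.committed()` returns committed offsets. The sketch below assumes you have already collected those values into plain dicts keyed by partition number. `consumer_lag` is a hypothetical helper.

```python
from typing import Optional


def consumer_lag(high_watermarks: dict[int, int],
                 committed: dict[int, Optional[int]]) -> dict[int, Optional[int]]:
    """Per-partition lag; None where the group has never committed (the CLI shows '-')."""
    lag = {}
    for partition, high in high_watermarks.items():
        offset = committed.get(partition)
        lag[partition] = None if offset is None else max(0, high - offset)
    return lag


print(consumer_lag({0: 1500, 1: 1200, 2: 900}, {0: 1500, 1: 700, 2: None}))
# {0: 0, 1: 500, 2: None}
```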

If your KAFKA_CFG_* environment variables are set but not taking effect, double-check the .env file precedence rules in Compose — variable substitution happens at compose-parse time, not at runtime, so values written to a .env after docker compose up don’t propagate into running containers.

To reset offsets to the beginning for a stopped consumer group:

kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group my-group --topic orders \
  --reset-offsets --to-earliest --execute

Consumer Lag Increases Even Though Throughput Looks Fine

Lag grows when consume rate is lower than produce rate, even if the consumer reports “no errors”. Common hidden causes: a per-message DB write that’s slow (the consumer is healthy but blocked on the sink), a partition skew where one key dominates volume so one consumer handles 90% of the traffic, or auto-commit succeeding on records that never finished processing because of an unhandled exception in a worker thread. Add per-consumer Prometheus metrics for records-consumed-rate and broker metrics for messages-in-rate and graph them on the same chart — divergence between the two lines tells you which side is at fault.
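Once both rates are on one chart, the arithmetic is simple. Lag grows by the produce rate minus the consume rate every second, and a backlog only drains when the consumer is faster. The helper below is an illustrative sketch with made-up numbers. `seconds_to_drain` is a hypothetical name.

```python
from typing import Optional


def seconds_to_drain(lag: int, produce_rate: float, consume_rate: float) -> Optional[float]:
    """Time for a backlog to reach zero, or None if the consumer can't catch up.

    Rates are records per second, e.g. the broker's messages-in-rate for the
    topic and the sum of records-consumed-rate across the group's consumers.
    """
    if lag <= 0:
        return 0.0  # already caught up
    surplus = consume_rate - produce_rate
    if surplus <= 0:
        return None  # lag stays flat or grows by -surplus records every second
    return lag / surplus


print(seconds_to_drain(90_000, produce_rate=1_000, consume_rate=1_300))  # 300.0
print(seconds_to_drain(90_000, produce_rate=1_000, consume_rate=950))    # None
```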

Records Disappear Silently — Producer Acks vs Retention

If a producer sends with acks=0 or acks=1 and the leader broker crashes before replicas catch up, the message is gone with no error logged. Set acks=all and min.insync.replicas=2 (with replication factor 3) for any topic where data loss is unacceptable. The other silent loss case is retention. The broker enforces retention.ms and retention.bytes per partition; if a topic is busier than expected, old records are deleted to keep within the byte budget even though the time hasn’t elapsed. Check retention.bytes on the topic before assuming the consumer dropped the message.
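To estimate how long a busy partition actually keeps data, compare the time-based and size-based limits. The sketch below assumes a steady write rate spread evenly across partitions. It also ignores segment granularity: Kafka deletes whole log segments, so real retention is somewhat longer. `effective_retention_hours` is a hypothetical helper.

```python
def effective_retention_hours(retention_ms: int, retention_bytes: int,
                              topic_bytes_per_sec: float, partitions: int) -> float:
    """Approximate how long records survive, given time and size retention."""
    time_limit_h = retention_ms / 3_600_000
    if retention_bytes < 0 or topic_bytes_per_sec <= 0:
        return time_limit_h  # -1 means no size limit; no writes means no size pressure
    per_partition_rate = topic_bytes_per_sec / partitions  # retention.bytes is per partition
    size_limit_h = retention_bytes / per_partition_rate / 3600
    return min(time_limit_h, size_limit_h)


# 7-day retention.ms, 1 GiB retention.bytes, 6 partitions, 5 MiB/s into the topic
hours = effective_retention_hours(7 * 24 * 3_600_000, 1024**3, 5 * 1024**2, 6)
print(round(hours, 2))  # 0.34 (about 20 minutes, not 7 days)
```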

OffsetOutOfRange on Consumer Restart

Happens when the consumer’s committed offset is older than what the broker still has on disk. Retention rolled past it. The consumer then applies auto.offset.reset — and if you set it to latest, it silently jumps to the head and skips all the messages between the old committed offset and now. For long-stopped consumers, set auto.offset.reset=earliest before restart and accept the reprocessing, or use the consumer-groups CLI to seek to the closest available offset explicitly.


FixDevs

Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.
