AD created KAFKA-10489: -------------------------- Summary: Committed consumer offsets not sent to consumer on rebalance Key: KAFKA-10489 URL: https://issues.apache.org/jira/browse/KAFKA-10489 Project: Kafka Issue Type: Bug Components: consumer, offset manager Affects Versions: 2.1.0 Reporter: AD
Committed consumer offsets not sent to consumer on rebalance Hi, I recently ran into an issue that my kafka cluster did not report committed consumer offsets to a client even though they are available in the __consumer_offset topic. h2. Preparation h3. Setup * Kafka 2.10 (2.12) * Three node cluster (one kafka & one zookeeper each) * Consumer Application (springboot 2.2.6 (w/ spring-kafka-2.3.7 & kafka-clients-2.3.1)) * all timestamps are in UTC h3. Scenario: * Topic: topic-0 (one partition, timebased retention: -1) * Group: group-0 (one group member only) h2. Issue After one node of the cluster was rebooted a rebalance lead to the consumer not getting the offsets stored in the {{__consumer_offsets}} topic but got the default (which in our case is {{latest}}). This lead to the consumer missing some data that got produced in the meantime. h2. Expectation I would have expected that the consumer receives the last committed offset, in this case {{334389}}. h2. Logs h3. Kafka h4. Server Log {noformat} 2020-09-10 10:12:11,349 INFO [KafkaServer id=0] Starting controlled shutdown (kafka.server.KafkaServer) ... 2020-09-10 10:12:12,299 INFO [ProducerStateManager partition=consumer-topic-0] Writing producer snapshot at offset 334408 (kafka.log.ProducerStateManager) ... 2020-09-10 10:12:12,601 INFO Shutdown complete. (kafka.log.LogManager) ... 2020-09-10 10:13:31,095 INFO starting (kafka.server.KafkaServer) {noformat} h4. Group coordinator {noformat} [consumer,topic,0]::OffsetAndMetadata(offset=334389, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1599732710804, expireTimestamp=None) // timestamp: 2020-09-10 10:11:50.804 [consumer,topic,0]::OffsetAndMetadata(offset=334408, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1599732737661, expireTimestamp=None) // timestamp: 2020-09-10 10:12:17.661 {noformat} h3. Consumer h4. Log {noformat} Sep 10, 2020 @ 10:12:17.541 [Consumer clientId=consumer-topic-0, groupId=group-0] Successfully joined group with generation 1 Sep 10, 2020 @ 10:12:17.549 [Consumer clientId=consumer-topic-0, groupId=group-0] Setting newly assigned partitions: topic-0 Sep 10, 2020 @ 10:12:17.553 [Consumer clientId=consumer-topic-0, groupId=group-0] Found no committed offset for partition topic-0 Sep 10, 2020 @ 10:12:17.661 [Consumer clientId=consumer-topic-0, groupId=group-0] Resetting offset for partition topic-0 to offset 334408. {noformat} h4. Stacktrace {noformat} Consumer exception java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.clients.consumer.CommitFailedException's; no record information is available at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:193) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1173) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:955) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.lang.Thread.run(Thread.java:834) Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records. at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:820) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:692) at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1454) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:2039) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:1862) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:983) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:929) ... 3 common frames omitted {noformat} -- This message was sent by Atlassian Jira (v8.3.4#803005)