Hi Team,

We are seeing odd behaviour with CooperativeStickyAssignor. During a
rebalance, a consumer is assigned a new partition and keeps its old ones.
Nevertheless, a CommitFailedException is thrown for one of the old partitions
and the affected offsets are not retried: we receive the next batch directly
and therefore skip some offsets.

The partition in question is receivedPartition: [27]. As the logs show,
offset 21551049 is received, then the exception occurs, and the next received
offset is 21551051. How can we handle this?

Logs (newest entry first):
INFO;com.rewe.digital.mystique.kafka.consumer.listener.StringKafkaListener;Received kafka message for topic: [picking-job-v1], key: [d52e171e-7a53-4e8c-8f4a-220d8e493237], offset: [21551051],  receivedPartition: [27], time: 1725975726868, publishedOn: 1725975726848;
INFO;org.springframework.kafka.listener.KafkaMessageListenerContainer;productrecall-pickingjob-message-consumer: partitions assigned: [picking-job-v1-20];
INFO;com.rewe.digital.mystique.kafka.consumer.listener.StringKafkaListener;onPartitionsAssigned was called for topic: picking-job-v1;
INFO;org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;[Consumer clientId=consumer-productrecall-pickingjob-message-consumer-34, groupId=productrecall-pickingjob-message-consumer] Setting offset for partition picking-job-v1-20 to the committed offset FetchPosition{offset=21471231, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[broker-12.ffp-prd.kafka-private.rewe.cloud:9093 (id: 12 rack: europe-west1-d)], epoch=408}};
INFO;org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;[Consumer clientId=consumer-productrecall-pickingjob-message-consumer-34, groupId=productrecall-pickingjob-message-consumer] Adding newly assigned partitions: picking-job-v1-20;
INFO;org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;[Consumer clientId=consumer-productrecall-pickingjob-message-consumer-34, groupId=productrecall-pickingjob-message-consumer] Notifying assignor about the new Assignment(partitions=[picking-job-v1-27, picking-job-v1-37, picking-job-v1-44, picking-job-v1-51, picking-job-v1-20]);
INFO;org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;"[Consumer clientId=consumer-productrecall-pickingjob-message-consumer-34, groupId=productrecall-pickingjob-message-consumer] Updating assignment with
    Assigned partitions: [picking-job-v1-20, picking-job-v1-27, picking-job-v1-37, picking-job-v1-44, picking-job-v1-51]
    Current owned partitions: [picking-job-v1-27, picking-job-v1-37, picking-job-v1-44, picking-job-v1-51]
    Added partitions (assigned - owned): [picking-job-v1-20]
    Revoked partitions (owned - assigned): []";
ERROR;org.springframework.kafka.listener.KafkaMessageListenerContainer;"Consumer exception: java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.clients.consumer.CommitFailedException's; no record information is available
    at org.springframework.kafka.listener.DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:198)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1966)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1382)
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1351)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:1188)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1450)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:3300)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:3295)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.checkRebalanceCommits(KafkaMessageListenerContainer.java:1710)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1658)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1439)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1330)
    ... 2 common frames omitted";
INFO;org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;[Consumer clientId=consumer-productrecall-pickingjob-message-consumer-34, groupId=productrecall-pickingjob-message-consumer] Failing OffsetCommit request since the consumer is not part of an active group;
INFO;org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;[Consumer clientId=consumer-productrecall-pickingjob-message-consumer-34, groupId=productrecall-pickingjob-message-consumer] Successfully joined group with generation Generation{generationId=2222, memberId='consumer-productrecall-pickingjob-message-consumer-34-50a678c9-ccd6-4610-b606-f31093711e48', protocol='cooperative-sticky'};
INFO;org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;[Consumer clientId=consumer-productrecall-pickingjob-message-consumer-34, groupId=productrecall-pickingjob-message-consumer] Failing OffsetCommit request since the consumer is not part of an active group;
INFO;com.rewe.digital.mystique.kafka.consumer.listener.StringKafkaListener;Received kafka message for topic: [picking-job-v1], key: [a4ef4bf4-0028-4f32-9ffa-c78b44886167], offset: [21551049],  receivedPartition: [27], time: 1725975724934, publishedOn: 1725975724734;
INFO;org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;[Consumer clientId=consumer-productrecall-pickingjob-message-consumer-34, groupId=productrecall-pickingjob-message-consumer] Failing OffsetCommit request since the consumer is not part of an active group;
INFO;org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;[Consumer clientId=consumer-productrecall-pickingjob-message-consumer-34, groupId=productrecall-pickingjob-message-consumer] Failing OffsetCommit request since the consumer is not part of an active group;
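
For reference, the skipped offset on partition 27 (21551049 followed by
21551051) could be detected inside the listener with something like the
sketch below. It is only an illustration: OffsetGapDetector and its record
method are hypothetical names, not part of Kafka or Spring Kafka, and the
class uses plain Java so that it runs without a broker. Note that offsets on a
partition are not guaranteed to be contiguous (transaction markers and log
compaction also leave gaps), so a reported gap is a hint to investigate
rather than proof of a lost record.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Kafka or Spring Kafka: remembers the last
// offset seen per topic-partition and reports how many offsets were skipped.
final class OffsetGapDetector {

    // Last offset seen, keyed by "<topic>-<partition>".
    private final Map<String, Long> lastSeen = new HashMap<>();

    /**
     * Records an offset and returns the number of offsets skipped since the
     * previous record on the same partition. Returns 0 for the first record
     * on a partition, for a contiguous offset, and for a redelivery.
     */
    long record(String topic, int partition, long offset) {
        String key = topic + "-" + partition;
        Long previous = lastSeen.put(key, offset);
        if (previous == null || offset <= previous + 1) {
            return 0L;
        }
        return offset - previous - 1;
    }

    public static void main(String[] args) {
        OffsetGapDetector detector = new OffsetGapDetector();
        // Offsets taken from the logs above, in the order they were received.
        detector.record("picking-job-v1", 27, 21551049L);
        long skipped = detector.record("picking-job-v1", 27, 21551051L);
        System.out.println("skipped offsets on partition 27: " + skipped);
    }
}
```

In a listener, record(...) would be called with the topic, partition and
offset of each received message, and a non-zero result logged together with
the consumer's clientId so it can be correlated with the rebalance entries.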
