[ https://issues.apache.org/jira/browse/KAFKA-14757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17701313#comment-17701313 ]
Siddharth Anand edited comment on KAFKA-14757 at 3/16/23 5:35 PM: ------------------------------------------------------------------ Hi [~pnee] We implemented onPartitionsRevoked - we call consumer commit sync. We still see the over-consumption issue. {code:java} package io.datazoom.connector.mock.config;import java.util.Collection; import java.util.List; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.common.TopicPartition; import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;/** Kafka rebalance listener. */ @Slf4j public class KafkaRebalanceListener implements ConsumerAwareRebalanceListener { @Override public void onPartitionsRevokedBeforeCommit( Consumer<?, ?> consumer, Collection<TopicPartition> partitions) { log.info( "Partitions revoked from consumer_member_id= {} --> {}", consumer.groupMetadata().memberId(), listPartitionIds(partitions)); consumer.commitSync(); // Commit the offsets synchronously ConsumerAwareRebalanceListener.super.onPartitionsRevokedBeforeCommit(consumer, partitions); } @Override public void onPartitionsLost(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) { log.info( "Partitions lost from consumer_member_id= {} --> {}", consumer.groupMetadata().memberId(), listPartitionIds(partitions)); ConsumerAwareRebalanceListener.super.onPartitionsLost(consumer, partitions); } @Override public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) { log.info( "Partitions assigned to consumer_member_id= {} --> {}", consumer.groupMetadata().memberId(), listPartitionIds(partitions)); ConsumerAwareRebalanceListener.super.onPartitionsAssigned(consumer, partitions); } /** * List partition ids. 
* * @param partitions a list of topic partitions * @return {@code List<Integer>} */ private static List<Integer> listPartitionIds(final Collection<TopicPartition> partitions) { return partitions.stream().map(p -> p.partition()).collect(Collectors.toList()); } } {code} !image-2023-03-16-10-34-49-358.png|width=608,height=316! was (Author: sanand): Hi [~pnee] We implemented onPartitionsRevoked - we call consumer commit sync. We still see the over-consumption issue. {code:java} package io.datazoom.connector.mock.config;import java.util.Collection; import java.util.List; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.common.TopicPartition; import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;/** Kafka rebalance listener. */ @Slf4j public class KafkaRebalanceListener implements ConsumerAwareRebalanceListener { @Override public void onPartitionsRevokedBeforeCommit( Consumer<?, ?> consumer, Collection<TopicPartition> partitions) { log.info( "Partitions revoked from consumer_member_id= {} --> {}", consumer.groupMetadata().memberId(), listPartitionIds(partitions)); consumer.commitSync(); // Commit the offsets synchronously ConsumerAwareRebalanceListener.super.onPartitionsRevokedBeforeCommit(consumer, partitions); } @Override public void onPartitionsLost(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) { log.info( "Partitions lost from consumer_member_id= {} --> {}", consumer.groupMetadata().memberId(), listPartitionIds(partitions)); ConsumerAwareRebalanceListener.super.onPartitionsLost(consumer, partitions); } @Override public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) { log.info( "Partitions assigned to consumer_member_id= {} --> {}", consumer.groupMetadata().memberId(), listPartitionIds(partitions)); ConsumerAwareRebalanceListener.super.onPartitionsAssigned(consumer, partitions); } /** * List 
partition ids. * * @param partitions a list of topic partitions * @return {@code List<Integer>} */ private static List<Integer> listPartitionIds(final Collection<TopicPartition> partitions) { return partitions.stream().map(p -> p.partition()).collect(Collectors.toList()); } } {code} > Kafka Cooperative Sticky Assignor results in significant duplicate consumption > ------------------------------------------------------------------------------ > > Key: KAFKA-14757 > URL: https://issues.apache.org/jira/browse/KAFKA-14757 > Project: Kafka > Issue Type: Bug > Components: consumer > Affects Versions: 3.1.1 > Environment: AWS MSK (broker) and Spring Kafka (2.8.7) for use in > Spring Boot consumers. > Reporter: Siddharth Anand > Priority: Critical > Attachments: image-2023-03-16-10-34-49-358.png > > > Details may be found within the linked document: > [Kafka Cooperative Sticky Assignor Issue: Duplicate Consumption| > https://docs.google.com/document/d/1E7qAwGOpF8jo_YhF4NwUx9CXxUGJmT8OhHEqIg7-GfI/edit?usp=sharing] > In a nutshell, we noticed that the Cooperative Sticky Assignor resulted in > significant duplicate message consumption. During last year's F1 Grand Prix > events and World Cup soccer events, our company's Kafka-based platform > received live-traffic. This live traffic, coupled with autoscaled consumers > resulted in as much as 70% duplicate message consumption at the Kafka > consumers. > In December 2022, we ran a synthetic load test to confirm that duplicate > message consumption occurs during consumer scale out/in and Kafka partition > rebalancing when using the Cooperative Sticky Assignor. This issue does not > occur when using the Range Assignor. > -- This message was sent by Atlassian Jira (v8.20.10#820010)