[ 
https://issues.apache.org/jira/browse/KAFKA-14757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17701313#comment-17701313
 ] 

Siddharth Anand edited comment on KAFKA-14757 at 3/16/23 5:35 PM:
------------------------------------------------------------------

Hi [~pnee] 
We implemented onPartitionsRevoked — inside it we call the consumer's commitSync(). We still
see the over-consumption issue.
{code:java}
package io.datazoom.connector.mock.config;import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;/** 
Kafka rebalance listener. */
@Slf4j
public class KafkaRebalanceListener implements ConsumerAwareRebalanceListener { 
 @Override
  public void onPartitionsRevokedBeforeCommit(
      Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
    log.info(
        "Partitions revoked from consumer_member_id= {} --> {}",
        consumer.groupMetadata().memberId(),
        listPartitionIds(partitions));
    consumer.commitSync(); // Commit the offsets synchronously
    
ConsumerAwareRebalanceListener.super.onPartitionsRevokedBeforeCommit(consumer, 
partitions);
  }  @Override
  public void onPartitionsLost(Consumer<?, ?> consumer, 
Collection<TopicPartition> partitions) {
    log.info(
        "Partitions lost from consumer_member_id= {} --> {}",
        consumer.groupMetadata().memberId(),
        listPartitionIds(partitions));
    ConsumerAwareRebalanceListener.super.onPartitionsLost(consumer, partitions);
  }  @Override
  public void onPartitionsAssigned(Consumer<?, ?> consumer, 
Collection<TopicPartition> partitions) {
    log.info(
        "Partitions assigned to consumer_member_id= {} --> {}",
        consumer.groupMetadata().memberId(),
        listPartitionIds(partitions));
    ConsumerAwareRebalanceListener.super.onPartitionsAssigned(consumer, 
partitions);
  }  /**
   * List partition ids.
   *
   * @param partitions a list of topic partitions
   * @return {@code List<Integer>}
   */
  private static List<Integer> listPartitionIds(final 
Collection<TopicPartition> partitions) {
    return partitions.stream().map(p -> 
p.partition()).collect(Collectors.toList());
  }
}
 {code}
 

!image-2023-03-16-10-34-49-358.png|width=608,height=316!


was (Author: sanand):
Hi [~pnee] 
We implemented onPartitionsRevoked — inside it we call the consumer's commitSync(). We still
see the over-consumption issue.


{code:java}
package io.datazoom.connector.mock.config;import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;/** 
Kafka rebalance listener. */
@Slf4j
public class KafkaRebalanceListener implements ConsumerAwareRebalanceListener { 
  // Commits current offsets synchronously when partitions are revoked (before the
  // container-managed commit), logs the affected partitions, then delegates to the
  // default interface implementation.
 @Override
  public void onPartitionsRevokedBeforeCommit(
      Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
    log.info(
        "Partitions revoked from consumer_member_id= {} --> {}",
        consumer.groupMetadata().memberId(),
        listPartitionIds(partitions));
    consumer.commitSync(); // Commit the offsets synchronously
    
ConsumerAwareRebalanceListener.super.onPartitionsRevokedBeforeCommit(consumer, 
partitions);
  }  @Override
  // Logs partitions reported as lost, then delegates to the default implementation.
  public void onPartitionsLost(Consumer<?, ?> consumer, 
Collection<TopicPartition> partitions) {
    log.info(
        "Partitions lost from consumer_member_id= {} --> {}",
        consumer.groupMetadata().memberId(),
        listPartitionIds(partitions));
    ConsumerAwareRebalanceListener.super.onPartitionsLost(consumer, partitions);
  }  @Override
  // Logs newly assigned partitions, then delegates to the default implementation.
  public void onPartitionsAssigned(Consumer<?, ?> consumer, 
Collection<TopicPartition> partitions) {
    log.info(
        "Partitions assigned to consumer_member_id= {} --> {}",
        consumer.groupMetadata().memberId(),
        listPartitionIds(partitions));
    ConsumerAwareRebalanceListener.super.onPartitionsAssigned(consumer, 
partitions);
  }  /**
   * List partition ids.
   *
   * @param partitions a list of topic partitions
   * @return {@code List<Integer>}
   */
  private static List<Integer> listPartitionIds(final 
Collection<TopicPartition> partitions) {
    return partitions.stream().map(p -> 
p.partition()).collect(Collectors.toList());
  }
}
 {code}

> Kafka Cooperative Sticky Assignor results in significant duplicate consumption
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-14757
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14757
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 3.1.1
>         Environment: AWS MSK (broker) and Spring Kafka (2.8.7) for use in 
> Spring Boot consumers.
>            Reporter: Siddharth Anand
>            Priority: Critical
>         Attachments: image-2023-03-16-10-34-49-358.png
>
>
> Details may be found within the linked document:
> [Kafka Cooperative Sticky Assignor Issue : Duplicate Consumption|https://docs.google.com/document/d/1E7qAwGOpF8jo_YhF4NwUx9CXxUGJmT8OhHEqIg7-GfI/edit?usp=sharing]
> In a nutshell, we noticed that the Cooperative Sticky Assignor resulted in 
> significant duplicate message consumption. During last year's F1 Grand Prix 
> events and World Cup soccer events, our company's Kafka-based platform 
> received live-traffic. This live traffic, coupled with autoscaled consumers 
> resulted in as much as 70% duplicate message consumption at the Kafka 
> consumers. 
> In December 2022, we ran a synthetic load test to confirm that duplicate 
> message consumption occurs during consumer scale out/in and Kafka partition 
> rebalancing when using the Cooperative Sticky Assignor. This issue does not 
> occur when using the Range Assignor.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to