[
https://issues.apache.org/jira/browse/KAFKA-14757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17701313#comment-17701313
]
Siddharth Anand edited comment on KAFKA-14757 at 3/16/23 6:00 PM:
------------------------------------------------------------------
Hi [~pnee]
We implemented a rebalance listener whose onPartitionsRevokedBeforeCommit calls
consumer.commitSync(). We still see the over-consumption issue.
{code:java}
package io.datazoom.connector.mock.config;

import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;

/** Kafka rebalance listener. */
@Slf4j
public class KafkaRebalanceListener implements ConsumerAwareRebalanceListener {

  @Override
  public void onPartitionsRevokedBeforeCommit(
      Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
    log.info(
        "Partitions revoked from consumer_member_id= {} --> {}",
        consumer.groupMetadata().memberId(),
        listPartitionIds(partitions));
    consumer.commitSync(); // Commit the offsets synchronously
    ConsumerAwareRebalanceListener.super.onPartitionsRevokedBeforeCommit(consumer, partitions);
  }

  @Override
  public void onPartitionsLost(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
    log.info(
        "Partitions lost from consumer_member_id= {} --> {}",
        consumer.groupMetadata().memberId(),
        listPartitionIds(partitions));
    ConsumerAwareRebalanceListener.super.onPartitionsLost(consumer, partitions);
  }

  @Override
  public void onPartitionsAssigned(
      Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
    log.info(
        "Partitions assigned to consumer_member_id= {} --> {}",
        consumer.groupMetadata().memberId(),
        listPartitionIds(partitions));
    ConsumerAwareRebalanceListener.super.onPartitionsAssigned(consumer, partitions);
  }

  /**
   * List partition ids.
   *
   * @param partitions a list of topic partitions
   * @return {@code List<Integer>}
   */
  private static List<Integer> listPartitionIds(final Collection<TopicPartition> partitions) {
    return partitions.stream().map(TopicPartition::partition).collect(Collectors.toList());
  }
}
{code}
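For completeness, this is roughly how the listener is registered on the Spring Kafka listener container. A minimal sketch only: the configuration class and consumer-factory wiring below are placeholders, not our actual application config.
{code:java}
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

@Configuration
public class KafkaListenerConfig { // hypothetical config class for illustration

  /** Registers the rebalance listener so it fires during cooperative rebalances. */
  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
      ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // ConsumerAwareRebalanceListener extends ConsumerRebalanceListener,
    // so it can be set directly on the container properties.
    factory.getContainerProperties().setConsumerRebalanceListener(new KafkaRebalanceListener());
    return factory;
  }
}
{code}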
Here is a visual representation of the over-consumption we observed during a
synthetic load test. At the peak (of the blue line), we see:
* {color:#de350b}*1,252,450*{color} events consumed from
{color:#0747a6}*1,190,142*{color} produced (1,252,450 - 1,190,142 = 62,308
duplicates; 62,308 / 1,190,142 ≈ 5.2%)
* This *{color:#de350b}5.2%{color}* over-consumption coincides with a pod
count drop of *{color:#0747a6}6%{color}* (50 pods --> 47 pods)
!image-2023-03-16-10-34-49-358.png|width=608,height=316!
If I overlay the consumer pod count on this graph, I can see that the
over-consumption occurs when the pod count drops as part of natural scale-in
behavior. The CloudWatch graphs lag the pod counts slightly, but we
consistently observed the pod count drop followed by over-consumption.
!image-2023-03-16-10-39-18-510.png|width=606,height=212!
> Kafka Cooperative Sticky Assignor results in significant duplicate consumption
> ------------------------------------------------------------------------------
>
> Key: KAFKA-14757
> URL: https://issues.apache.org/jira/browse/KAFKA-14757
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 3.1.1
> Environment: AWS MSK (broker) and Spring Kafka (2.8.7) for use in
> Spring Boot consumers.
> Reporter: Siddharth Anand
> Priority: Critical
> Attachments: image-2023-03-16-10-34-49-358.png,
> image-2023-03-16-10-39-18-510.png
>
>
> Details may be found within the linked document:
> [Kafka Cooperative Sticky Assignor Issue: Duplicate Consumption|https://docs.google.com/document/d/1E7qAwGOpF8jo_YhF4NwUx9CXxUGJmT8OhHEqIg7-GfI/edit?usp=sharing]
> In a nutshell, we noticed that the Cooperative Sticky Assignor resulted in
> significant duplicate message consumption. During last year's F1 Grand Prix
> and World Cup soccer events, our company's Kafka-based platform received
> live traffic. This live traffic, coupled with autoscaled consumers, resulted
> in as much as 70% duplicate message consumption at the Kafka consumers.
> In December 2022, we ran a synthetic load test to confirm that duplicate
> message consumption occurs during consumer scale out/in and Kafka partition
> rebalancing when using the Cooperative Sticky Assignor. This issue does not
> occur when using the Range Assignor.
>
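For reference, the assignor described above is selected via the consumer's partition.assignment.strategy property. A minimal sketch using plain Java client properties (the broker address and group id below are placeholders):
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;

public class AssignorConfig { // illustrative class, not from the reported setup

  /** Consumer properties selecting the cooperative sticky assignor. */
  public static Properties consumerProps() {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group"); // placeholder
    // The configuration under test; swapping in RangeAssignor.class.getName()
    // is the setup under which the duplication reportedly did not occur.
    props.put(
        ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        CooperativeStickyAssignor.class.getName());
    return props;
  }
}
{code}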