[ https://issues.apache.org/jira/browse/KAFKA-14220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Pritam Kumar resolved KAFKA-14220. ---------------------------------- Reviewer: (was: Chris Egerton) Resolution: Abandoned > "partition-count" not getting updated after revocation of partitions in case > of Incremental Co-operative rebalancing. > --------------------------------------------------------------------------------------------------------------------- > > Key: KAFKA-14220 > URL: https://issues.apache.org/jira/browse/KAFKA-14220 > Project: Kafka > Issue Type: Bug > Affects Versions: 3.0.1 > Reporter: Pritam Kumar > Priority: Major > > Issue: > In case of the revocation of partitions, the updation of "partition count" > metrics is being done before updating the new set of assignments. > "invokePartitionsRevoked" method of "onJoinComplete" function of > "ConsumerCoordinator" class is being called before the " > subscriptions.assignFromSubscribed(assignedPartitions)" of the same class. As > a result of which the old assigned partition count is getting updated again > and again even after future rebalances. > Demo: > Suppose the current assignment is like: > Assigned partitions: [partition-1, partition-2] > Current owned partitions: [] > Added partitions (assigned - owned): [partition-1, partition-2] > Revoked partitions (owned - assigned): [] > After that some other worker joined and part of that, as a result of which > “partition-2” has to be revoked. > Assigned partitions: [partition-1] > Current owned partitions: [partition-1, partition-2] > Added partitions (assigned - owned): [] > Revoked partitions (owned - assigned): [partition-2] > But as the "assignment" need to be updated with these new assignment via the > following logic: > [kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L463] > Line 463 in > [3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498] > ||subscriptions.assignFromSubscribed(assignedPartitions);| > > But before this only "{*}updatePartitionCount{*}()" is getting called via > "{*}invokePartitionsRevoked{*}": > > [kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L443] > Line 443 in > [3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498] > ||firstException.compareAndSet(null, > invokePartitionsRevoked(revokedPartitions));| > > Due to this when it is going to call for the "{*}assignedPartitions{*}" of > "{*}consumer{*}" via the following: > [kafka/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L892] > Line 892 in > [3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498] > ||public Set<TopicPartition> assignment() {| > > the "{*}assignedPartitions{*}" is not yet updated. > Solution: As part of the bug fix to KAFKA-12622 introducing code changes to > update the partition count metrics after the the newly assigned partition are > registered. -- This message was sent by Atlassian Jira (v8.20.10#820010)