hachikuji commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r968965173


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -799,64 +804,92 @@ protected boolean onJoinPrepare(Timer timer, int 
generation, String memberId) {
             }
         }
 
+        Optional<Exception> exception = revokePartitions(partitionsToRevoke, 
generation, memberId);
+
+        isLeader = false;
+        subscriptions.resetGroupSubscription();
+        joinPrepareTimer = null;
+        autoCommitOffsetRequestFuture = null;
+        timer.update();
+
+        if (exception.isPresent()) {
+            throw new KafkaException("User rebalance callback throws an 
error", exception.get());
+        }
+        return true;
+    }
+
+    private SortedSet<TopicPartition> getPartitionsToRevoke(RebalanceProtocol 
protocol, int generation, String memberId) {
+        SortedSet<TopicPartition> partitions = new TreeSet<>(COMPARATOR);
+        if (generation == Generation.NO_GENERATION.generationId ||
+                memberId.equals(Generation.NO_GENERATION.memberId)) {
+            partitions.addAll(subscriptions.assignedPartitions());
+            return partitions;
+        }
+
+        switch (protocol) {
+            case EAGER:
+                partitions.addAll(subscriptions.assignedPartitions());
+                break;
+
+            case COOPERATIVE:
+                // Delay the partition revocation because we don't revoke the 
already owned partitions

Review Comment:
   I was looking into the cooperative code path. We revoke the partition in 
`onJoinComplete`, so that made me wonder why we don't have the same issue 
there. In fact, there is no additional offset commit in the current logic, 
which makes me think that the cooperative logic would already be more prone to 
duplicate consumption. We don't need to fix this here since it seems to be a 
pre-existing issue, but I am wondering if the failing system tests also cover 
cooperative assignment?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to