philipnee commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r966582411


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -799,64 +804,95 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
             }
         }
 
+        Optional<Exception> exception = revokePartitions(partitionsToRevoke, generation, memberId);
+
+        isLeader = false;
+        subscriptions.resetGroupSubscription();
+        joinPrepareTimer = null;
+        autoCommitOffsetRequestFuture = null;
+        timer.update();
+
+        if (exception.isPresent()) {
+            throw new KafkaException("User rebalance callback throws an error", exception.get());
+        }
+        return true;
+    }
+
+    private SortedSet<TopicPartition> getPartitionsToRevoke(RebalanceProtocol protocol, int generation, String memberId) {
+        SortedSet<TopicPartition> partitions = new TreeSet<>(COMPARATOR);
+        if (generation == Generation.NO_GENERATION.generationId ||
+                memberId.equals(Generation.NO_GENERATION.memberId)) {
+            partitions.addAll(subscriptions.assignedPartitions());
+            return partitions;
+        }
+
+        // Revoke all partitions
+        if (protocol == RebalanceProtocol.EAGER) {
+            partitions.addAll(subscriptions.assignedPartitions());
+            return partitions;
+        }
+
+        // only revoke those partitions that are not in the subscription any more.
+        if (protocol == RebalanceProtocol.COOPERATIVE) {
+            Set<TopicPartition> ownedPartitions = new HashSet<>(subscriptions.assignedPartitions());
+            partitions.addAll(ownedPartitions.stream()
+                    .filter(tp -> !subscriptions.subscription().contains(tp.topic()))
+                    .collect(Collectors.toSet()));
+            return partitions;
+        }
+
+        log.debug("Invalid protocol: {}. No partition will be revoked.", protocol);
+        return partitions;
+    }
+
+    private void pausePartitions(Set<TopicPartition> partitions) {
+        // KAFKA-14196 for more detail, we pause the partition from consumption to prevent duplicated
+        //  data returned by the consumer poll loop.  Without pausing the partitions, the consumer will move forward
+        //  returning the data w/o committing them.  And the progress will be lost once the partition is revoked.
+        //  This only applies to autocommits, as we expect user to handle the offsets manually during the partition
+        //  revocation.
+
+        log.debug("Pausing partitions {} before onJoinPrepare", partitions);
+        partitions.forEach(tp -> subscriptionState().pause(tp));

Review Comment:
   Good call there. I think @guozhangwang originally proposed marking these partitions with a different flag to achieve what pause does.
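To make the dedicated-flag alternative concrete, here is a minimal sketch under stated assumptions. `RevocationFlagSketch`, `PartitionState`, `markForRevocation` and the plain string partition keys are all hypothetical, illustrative names, not the actual `SubscriptionState` API. In this model a partition is fetchable only when it is neither paused by the application nor marked for revocation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: a dedicated per-partition flag instead of reusing pause().
// Class and method names are illustrative, not Kafka's SubscriptionState API.
public class RevocationFlagSketch {

    static final class PartitionState {
        boolean paused;            // toggled only by the application's pause()/resume()
        boolean pendingRevocation; // set only by the coordinator before revoking

        boolean isFetchable() {
            return !paused && !pendingRevocation;
        }
    }

    // Partitions are keyed by a plain "topic-partition" string for brevity (assumption).
    private final Map<String, PartitionState> states = new HashMap<>();

    void assign(Set<String> partitions) {
        partitions.forEach(p -> states.put(p, new PartitionState()));
    }

    void pause(String partition) {
        states.get(partition).paused = true;
    }

    void resume(String partition) {
        states.get(partition).paused = false;
    }

    // Marks partitions ahead of the revocation callback, so a fetcher consulting
    // isFetchable() would skip them, without touching the user's pause state.
    void markForRevocation(Set<String> partitions) {
        partitions.forEach(p -> states.get(p).pendingRevocation = true);
    }

    boolean isFetchable(String partition) {
        return states.get(partition).isFetchable();
    }

    public static void main(String[] args) {
        RevocationFlagSketch subscriptions = new RevocationFlagSketch();
        subscriptions.assign(Set.of("orders-0", "orders-1"));
        subscriptions.markForRevocation(Set.of("orders-0"));
        // A user resume() does not clear the internal revocation flag.
        subscriptions.resume("orders-0");
        System.out.println(subscriptions.isFetchable("orders-0")); // false
        System.out.println(subscriptions.isFetchable("orders-1")); // true
    }
}
```

One possible argument for a separate flag, visible in `main`: an application calling `resume()` during the revocation window cannot make the partition fetchable again, whereas if the coordinator reused `pause()`, that same call would undo the internal pause.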

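The selection rules in `getPartitionsToRevoke` from the diff can be modelled in isolation. This is a simplified sketch, not Kafka's code: `TopicPartition` here is a local record standing in for Kafka's class, and the `NO_GENERATION` checks on generation id and member id are collapsed into a hypothetical `hasValidGeneration` boolean. With no valid generation or with the eager protocol, every owned partition is revoked. With the cooperative protocol, only partitions whose topic has dropped out of the subscription are revoked.

```java
import java.util.Comparator;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

// Simplified model of getPartitionsToRevoke; not Kafka's real classes.
public class RevocationSketch {

    enum RebalanceProtocol { EAGER, COOPERATIVE }

    // Hypothetical stand-in for Kafka's TopicPartition.
    record TopicPartition(String topic, int partition) { }

    // Sort by topic, then partition number, so revocation order is deterministic.
    static final Comparator<TopicPartition> COMPARATOR =
            Comparator.comparing(TopicPartition::topic)
                      .thenComparingInt(TopicPartition::partition);

    static SortedSet<TopicPartition> partitionsToRevoke(RebalanceProtocol protocol,
                                                        boolean hasValidGeneration,
                                                        Set<TopicPartition> owned,
                                                        Set<String> subscribedTopics) {
        SortedSet<TopicPartition> partitions = new TreeSet<>(COMPARATOR);
        if (!hasValidGeneration || protocol == RebalanceProtocol.EAGER) {
            // No valid generation, or eager protocol: revoke everything owned.
            partitions.addAll(owned);
        } else if (protocol == RebalanceProtocol.COOPERATIVE) {
            // Cooperative: revoke only partitions of topics no longer subscribed.
            for (TopicPartition tp : owned) {
                if (!subscribedTopics.contains(tp.topic())) {
                    partitions.add(tp);
                }
            }
        }
        // Any other protocol value (e.g. null) revokes nothing, as in the diff.
        return partitions;
    }

    public static void main(String[] args) {
        Set<TopicPartition> owned = Set.of(new TopicPartition("a", 0), new TopicPartition("b", 1));
        // Only topic "a" is still subscribed, so cooperative mode revokes b-1 only.
        System.out.println(partitionsToRevoke(RebalanceProtocol.COOPERATIVE, true, owned, Set.of("a")));
        // Eager mode revokes both partitions regardless of the subscription.
        System.out.println(partitionsToRevoke(RebalanceProtocol.EAGER, true, owned, Set.of("a")));
    }
}
```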

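The tail of `onJoinPrepare` in the diff appears to follow a capture-then-rethrow shape: the failure from `revokePartitions` is held in an `Optional<Exception>`, the coordinator state is reset regardless, and only then is the error rethrown. A minimal sketch of that shape, assuming a `Runnable` stands in for the rebalance listener callback and a plain `RuntimeException` stands in for `KafkaException` (both are simplifications, not the real types):

```java
import java.util.Optional;

// Sketch of the capture-then-rethrow shape of onJoinPrepare's tail.
// Assumptions: Runnable stands in for the rebalance listener callback and
// RuntimeException stands in for KafkaException.
public class DeferredFailureSketch {

    // Hypothetical coordinator state that must be reset on every join.
    boolean isLeader = true;

    // Runs the callback and returns its failure instead of throwing it.
    static Optional<Exception> runCallback(Runnable callback) {
        try {
            callback.run();
            return Optional.empty();
        } catch (Exception e) {
            return Optional.of(e);
        }
    }

    boolean onJoinPrepare(Runnable revocationCallback) {
        Optional<Exception> exception = runCallback(revocationCallback);

        // Reset state unconditionally, so a failing callback cannot leave it stale.
        isLeader = false;

        // Only after cleanup is the original failure surfaced, wrapped with context.
        if (exception.isPresent()) {
            throw new RuntimeException("User rebalance callback threw an error", exception.get());
        }
        return true;
    }

    public static void main(String[] args) {
        DeferredFailureSketch coordinator = new DeferredFailureSketch();
        try {
            coordinator.onJoinPrepare(() -> { throw new IllegalArgumentException("boom"); });
        } catch (RuntimeException e) {
            // Prints the wrapped cause and shows that cleanup still ran.
            System.out.println(e.getCause().getMessage() + ", isLeader=" + coordinator.isLeader);
        }
    }
}
```

Compared with letting the callback's exception propagate immediately, deferring it this way keeps the reset of `isLeader` (and, in the diff, the group subscription, `joinPrepareTimer` and `autoCommitOffsetRequestFuture`) from being skipped.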
