philipnee commented on code in PR #12603: URL: https://github.com/apache/kafka/pull/12603#discussion_r966582411
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ########## @@ -799,64 +804,95 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) { } } + Optional<Exception> exception = revokePartitions(partitionsToRevoke, generation, memberId); + + isLeader = false; + subscriptions.resetGroupSubscription(); + joinPrepareTimer = null; + autoCommitOffsetRequestFuture = null; + timer.update(); + + if (exception.isPresent()) { + throw new KafkaException("User rebalance callback throws an error", exception.get()); + } + return true; + } + + private SortedSet<TopicPartition> getPartitionsToRevoke(RebalanceProtocol protocol, int generation, String memberId) { + SortedSet<TopicPartition> partitions = new TreeSet<>(COMPARATOR); + if (generation == Generation.NO_GENERATION.generationId || + memberId.equals(Generation.NO_GENERATION.memberId)) { + partitions.addAll(subscriptions.assignedPartitions()); + return partitions; + } + + // Revoke all partitions + if (protocol == RebalanceProtocol.EAGER) { + partitions.addAll(subscriptions.assignedPartitions()); + return partitions; + } + + // only revoke those partitions that are not in the subscription any more. + if (protocol == RebalanceProtocol.COOPERATIVE) { + Set<TopicPartition> ownedPartitions = new HashSet<>(subscriptions.assignedPartitions()); + partitions.addAll(ownedPartitions.stream() + .filter(tp -> !subscriptions.subscription().contains(tp.topic())) + .collect(Collectors.toSet())); + return partitions; + } + + log.debug("Invalid protocol: {}. No partition will be revoked.", protocol); + return partitions; + } + + private void pausePartitions(Set<TopicPartition> partitions) { + // KAFKA-14196 for more detail, we pause the partition from consumption to prevent duplicated + // data returned by the consumer poll loop. Without pausing the partitions, the consumer will move forward + // returning the data w/o committing them. And the progress will be lost once the partition is revoked. + // This only applies to autocommits, as we expect user to handle the offsets menually during the partition + // revocation. + + log.debug("Pausing partitions {} before onJoinPrepare", partitions); + partitions.forEach(tp -> subscriptionState().pause(tp)); Review Comment: Good call there, I think @guozhangwang originally propose marking these partitions using a different flag, to achieve what pause does. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org