ableegoldman commented on a change in pull request #8668: URL: https://github.com/apache/kafka/pull/8668#discussion_r434021704
########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java ########## @@ -65,9 +72,206 @@ public MemberData(List<TopicPartition> partitions, Optional<Integer> generation) @Override public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic, Map<String, Subscription> subscriptions) { + Map<String, List<TopicPartition>> consumerToOwnedPartitions = new HashMap<>(); + if (allSubscriptionsEqual(partitionsPerTopic.keySet(), subscriptions, consumerToOwnedPartitions)) { + log.debug("Detected that all consumers were subscribed to same set of topics, invoking the " + + "optimized assignment algorithm"); + partitionsTransferringOwnership = new HashMap<>(); + return constrainedAssign(partitionsPerTopic, consumerToOwnedPartitions); + } else { + log.debug("Detected that all not consumers were subscribed to same set of topics, falling back to the " + + "general case assignment algorithm"); + partitionsTransferringOwnership = null; + return generalAssign(partitionsPerTopic, subscriptions); + } + } + + /** + * Returns true iff all consumers have an identical subscription. 
Also fills out the passed in + * {@code consumerToOwnedPartitions} with each consumer's previously owned and still-subscribed partitions + */ + private boolean allSubscriptionsEqual(Set<String> allTopics, + Map<String, Subscription> subscriptions, + Map<String, List<TopicPartition>> consumerToOwnedPartitions) { + Set<String> membersWithOldGeneration = new HashSet<>(); + Set<String> membersOfCurrentHighestGeneration = new HashSet<>(); + int maxGeneration = DEFAULT_GENERATION; + + Set<String> subscribedTopics = new HashSet<>(); + + for (Map.Entry<String, Subscription> subscriptionEntry : subscriptions.entrySet()) { + String consumer = subscriptionEntry.getKey(); + Subscription subscription = subscriptionEntry.getValue(); + + // initialize the subscribed topics set if this is the first subscription + if (subscribedTopics.isEmpty()) { + subscribedTopics.addAll(subscription.topics()); + } else if (!(subscription.topics().size() == subscribedTopics.size() + && subscribedTopics.containsAll(subscription.topics()))) { + return false; + } + + MemberData memberData = memberData(subscription); + + List<TopicPartition> ownedPartitions = new ArrayList<>(); + consumerToOwnedPartitions.put(consumer, ownedPartitions); + + // Only consider this consumer's owned partitions as valid if it is a member of the current highest + // generation, or it's generation is not present but we have not seen any known generation so far + if (memberData.generation.isPresent() && memberData.generation.get() >= maxGeneration + || !memberData.generation.isPresent() && maxGeneration == DEFAULT_GENERATION) { + + membersOfCurrentHighestGeneration.add(consumer); + for (final TopicPartition tp : memberData.partitions) { + // filter out any topics that no longer exist or aren't part of the current subscription + if (allTopics.contains(tp.topic())) { + ownedPartitions.add(tp); + } + } + + // If the current member's generation is higher, all the previous owned partitions are invalid + if 
(memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) { + membersWithOldGeneration.addAll(membersOfCurrentHighestGeneration); + membersOfCurrentHighestGeneration.clear(); Review comment: Just FYI, I introduced this bug right before merging: the current consumer has already been added to membersOfCurrentHighestGeneration above, so when its generation is higher this block moves it into membersWithOldGeneration along with the stale members and then clears it from the current set. Luckily, the tests caught it; the fix is in https://github.com/apache/kafka/pull/8777 ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org