ableegoldman commented on a change in pull request #8668:
URL: https://github.com/apache/kafka/pull/8668#discussion_r434021704



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -65,9 +72,206 @@ public MemberData(List<TopicPartition> partitions, 
Optional<Integer> generation)
     @Override
     public Map<String, List<TopicPartition>> assign(Map<String, Integer> 
partitionsPerTopic,
                                                     Map<String, Subscription> 
subscriptions) {
+        Map<String, List<TopicPartition>> consumerToOwnedPartitions = new 
HashMap<>();
+        if (allSubscriptionsEqual(partitionsPerTopic.keySet(), subscriptions, 
consumerToOwnedPartitions)) {
+            log.debug("Detected that all consumers were subscribed to same set 
of topics, invoking the "
+                          + "optimized assignment algorithm");
+            partitionsTransferringOwnership = new HashMap<>();
+            return constrainedAssign(partitionsPerTopic, 
consumerToOwnedPartitions);
+        } else {
+            log.debug("Detected that all not consumers were subscribed to same 
set of topics, falling back to the "
+                          + "general case assignment algorithm");
+            partitionsTransferringOwnership = null;
+            return generalAssign(partitionsPerTopic, subscriptions);
+        }
+    }
+
+    /**
+     * Returns true iff all consumers have an identical subscription. Also 
fills out the passed in
+     * {@code consumerToOwnedPartitions} with each consumer's previously owned 
and still-subscribed partitions
+     */
+    private boolean allSubscriptionsEqual(Set<String> allTopics,
+                                          Map<String, Subscription> 
subscriptions,
+                                          Map<String, List<TopicPartition>> 
consumerToOwnedPartitions) {
+        Set<String> membersWithOldGeneration = new HashSet<>();
+        Set<String> membersOfCurrentHighestGeneration = new HashSet<>();
+        int maxGeneration = DEFAULT_GENERATION;
+
+        Set<String> subscribedTopics = new HashSet<>();
+
+        for (Map.Entry<String, Subscription> subscriptionEntry : 
subscriptions.entrySet()) {
+            String consumer = subscriptionEntry.getKey();
+            Subscription subscription = subscriptionEntry.getValue();
+
+            // initialize the subscribed topics set if this is the first 
subscription
+            if (subscribedTopics.isEmpty()) {
+                subscribedTopics.addAll(subscription.topics());
+            } else if (!(subscription.topics().size() == 
subscribedTopics.size()
+                && subscribedTopics.containsAll(subscription.topics()))) {
+                return false;
+            }
+
+            MemberData memberData = memberData(subscription);
+
+            List<TopicPartition> ownedPartitions = new ArrayList<>();
+            consumerToOwnedPartitions.put(consumer, ownedPartitions);
+
+            // Only consider this consumer's owned partitions as valid if it 
is a member of the current highest
+            // generation, or its generation is not present but we have not 
seen any known generation so far
+            if (memberData.generation.isPresent() && 
memberData.generation.get() >= maxGeneration
+                || !memberData.generation.isPresent() && maxGeneration == 
DEFAULT_GENERATION) {
+
+                membersOfCurrentHighestGeneration.add(consumer);
+                for (final TopicPartition tp : memberData.partitions) {
+                    // filter out any topics that no longer exist or aren't 
part of the current subscription
+                    if (allTopics.contains(tp.topic())) {
+                        ownedPartitions.add(tp);
+                    }
+                }
+
+                // If the current member's generation is higher, all the 
previous owned partitions are invalid
+                if (memberData.generation.isPresent() && 
memberData.generation.get() > maxGeneration) {
+                    
membersWithOldGeneration.addAll(membersOfCurrentHighestGeneration);
+                    membersOfCurrentHighestGeneration.clear();

Review comment:
       Just FYI, I introduced this bug right before merging. Luckily the tests 
caught it -- fix is https://github.com/apache/kafka/pull/8777




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to