rajinisivaram commented on code in PR #12914:
URL: https://github.com/apache/kafka/pull/12914#discussion_r1039763712


##########
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##########
@@ -76,43 +88,145 @@ private Map<String, List<MemberInfo>> 
consumersPerTopic(Map<String, Subscription
         Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
         for (Map.Entry<String, Subscription> subscriptionEntry : 
consumerMetadata.entrySet()) {
             String consumerId = subscriptionEntry.getKey();
-            MemberInfo memberInfo = new MemberInfo(consumerId, 
subscriptionEntry.getValue().groupInstanceId());
-            for (String topic : subscriptionEntry.getValue().topics()) {
+            Subscription subscription = subscriptionEntry.getValue();
+            MemberInfo memberInfo = new MemberInfo(consumerId, 
subscription.groupInstanceId(), subscription.rackId());
+            for (String topic : subscription.topics()) {
                 put(topicToConsumers, topic, memberInfo);
             }
         }
         return topicToConsumers;
     }
 
     @Override
-    public Map<String, List<TopicPartition>> assign(Map<String, Integer> 
partitionsPerTopic,
-                                                    Map<String, Subscription> 
subscriptions) {
+    public Map<String, List<TopicPartition>> assignPartitions(Map<String, 
List<PartitionInfo>> partitionsPerTopic,
+                                                              Map<String, 
Subscription> subscriptions) {
         Map<String, List<MemberInfo>> consumersPerTopic = 
consumersPerTopic(subscriptions);
+        Map<String, TopicAssignmentState> topicAssignmentStates = 
partitionsPerTopic.entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> new 
TopicAssignmentState(e.getValue(), consumersPerTopic.get(e.getKey()))));
 
         Map<String, List<TopicPartition>> assignment = new HashMap<>();
         for (String memberId : subscriptions.keySet())
             assignment.put(memberId, new ArrayList<>());
 
-        for (Map.Entry<String, List<MemberInfo>> topicEntry : 
consumersPerTopic.entrySet()) {
+        for (Map.Entry<String, TopicAssignmentState> topicEntry : 
topicAssignmentStates.entrySet()) {

Review Comment:
   We maintain the ordering across partitions only when racks are not specified 
or if all racks have all partitions. If each rack only has replicas of a subset 
of partitions, we prioritize locality over preserving the order. I will change 
this to be similar to the sticky case to reduce re-ordering where possible. We 
can follow up in the PR for range assignor.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to