dajac commented on code in PR #12914:
URL: https://github.com/apache/kafka/pull/12914#discussion_r1039767650


##########
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##########
@@ -76,43 +88,145 @@ private Map<String, List<MemberInfo>> 
consumersPerTopic(Map<String, Subscription
         Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
         for (Map.Entry<String, Subscription> subscriptionEntry : 
consumerMetadata.entrySet()) {
             String consumerId = subscriptionEntry.getKey();
-            MemberInfo memberInfo = new MemberInfo(consumerId, 
subscriptionEntry.getValue().groupInstanceId());
-            for (String topic : subscriptionEntry.getValue().topics()) {
+            Subscription subscription = subscriptionEntry.getValue();
+            MemberInfo memberInfo = new MemberInfo(consumerId, 
subscription.groupInstanceId(), subscription.rackId());
+            for (String topic : subscription.topics()) {
                 put(topicToConsumers, topic, memberInfo);
             }
         }
         return topicToConsumers;
     }
 
     @Override
-    public Map<String, List<TopicPartition>> assign(Map<String, Integer> 
partitionsPerTopic,
-                                                    Map<String, Subscription> 
subscriptions) {
+    public Map<String, List<TopicPartition>> assignPartitions(Map<String, 
List<PartitionInfo>> partitionsPerTopic,
+                                                              Map<String, 
Subscription> subscriptions) {
         Map<String, List<MemberInfo>> consumersPerTopic = 
consumersPerTopic(subscriptions);
+        Map<String, TopicAssignmentState> topicAssignmentStates = 
partitionsPerTopic.entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> new 
TopicAssignmentState(e.getValue(), consumersPerTopic.get(e.getKey()))));
 
         Map<String, List<TopicPartition>> assignment = new HashMap<>();
         for (String memberId : subscriptions.keySet())
             assignment.put(memberId, new ArrayList<>());
 
-        for (Map.Entry<String, List<MemberInfo>> topicEntry : 
consumersPerTopic.entrySet()) {
+        for (Map.Entry<String, TopicAssignmentState> topicEntry : 
topicAssignmentStates.entrySet()) {

Review Comment:
   Yeah, this is what I thought. I think that we should prioritize the order 
over the locality as co-partitioning is the fundamental contract of the range 
assignor.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to