dajac commented on code in PR #12914:
URL: https://github.com/apache/kafka/pull/12914#discussion_r1039767650
##########
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##########
@@ -76,43 +88,145 @@ private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription
         Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
         for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
             String consumerId = subscriptionEntry.getKey();
-            MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId());
-            for (String topic : subscriptionEntry.getValue().topics()) {
+            Subscription subscription = subscriptionEntry.getValue();
+            MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId());
+            for (String topic : subscription.topics()) {
                 put(topicToConsumers, topic, memberInfo);
             }
         }
         return topicToConsumers;
     }

     @Override
-    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
-                                                    Map<String, Subscription> subscriptions) {
+    public Map<String, List<TopicPartition>> assignPartitions(Map<String, List<PartitionInfo>> partitionsPerTopic,
+                                                              Map<String, Subscription> subscriptions) {
         Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);
+        Map<String, TopicAssignmentState> topicAssignmentStates = partitionsPerTopic.entrySet().stream()
+            .collect(Collectors.toMap(Map.Entry::getKey, e -> new TopicAssignmentState(e.getValue(), consumersPerTopic.get(e.getKey()))));

         Map<String, List<TopicPartition>> assignment = new HashMap<>();
         for (String memberId : subscriptions.keySet())
             assignment.put(memberId, new ArrayList<>());

-        for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {
+        for (Map.Entry<String, TopicAssignmentState> topicEntry : topicAssignmentStates.entrySet()) {

Review Comment:
   Yeah, this is what I thought. I think that we should prioritize the order over the locality as co-partitioning is the fundamental contract of the range assignor.
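   To make the co-partitioning point concrete, here is a minimal sketch of what the contract means. It is not the code from this PR: `CoPartitionSketch` and `rangeAssign` are hypothetical names, and the sketch assumes every topic has the same partition count and the same subscribers. When each topic is split into contiguous ranges using the same member order, partition N of every topic ends up on the same consumer. Reordering members per topic for rack locality could break that alignment, which is why order should come first.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not the RangeAssignor implementation from the PR.
class CoPartitionSketch {

    // Splits partitions 0..numPartitions-1 into contiguous ranges, one per member,
    // following the given member order. The first (numPartitions % members) members
    // each receive one extra partition.
    static Map<String, List<Integer>> rangeAssign(List<String> orderedMembers, int numPartitions) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        int perMember = numPartitions / orderedMembers.size();
        int extra = numPartitions % orderedMembers.size();
        int next = 0;
        for (int i = 0; i < orderedMembers.size(); i++) {
            int count = perMember + (i < extra ? 1 : 0);
            List<Integer> range = new ArrayList<>();
            for (int p = 0; p < count; p++) {
                range.add(next++);
            }
            assignment.put(orderedMembers.get(i), range);
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> members = Arrays.asList("c0", "c1");
        // Two topics with the same partition count and the same subscribers.
        Map<String, List<Integer>> topicA = rangeAssign(members, 3);
        Map<String, List<Integer>> topicB = rangeAssign(members, 3);
        System.out.println("topicA: " + topicA);
        System.out.println("topicB: " + topicB);
        // Same member order for both topics, so the partition ranges line up.
        System.out.println("co-partitioned: " + topicA.equals(topicB));
    }
}
```

   With the same member order, both topics give partitions 0 and 1 to `c0` and partition 2 to `c1`. If the order passed for the second topic were `["c1", "c0"]`, the ranges would swap and the two topics would no longer be co-partitioned.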