rajinisivaram commented on code in PR #12914:
URL: https://github.com/apache/kafka/pull/12914#discussion_r1039763712

##########
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##########
@@ -76,43 +88,145 @@ private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription
         Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
         for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
             String consumerId = subscriptionEntry.getKey();
-            MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId());
-            for (String topic : subscriptionEntry.getValue().topics()) {
+            Subscription subscription = subscriptionEntry.getValue();
+            MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId());
+            for (String topic : subscription.topics()) {
                 put(topicToConsumers, topic, memberInfo);
             }
         }
         return topicToConsumers;
     }
     @Override
-    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
-                                                    Map<String, Subscription> subscriptions) {
+    public Map<String, List<TopicPartition>> assignPartitions(Map<String, List<PartitionInfo>> partitionsPerTopic,
+                                                              Map<String, Subscription> subscriptions) {
         Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);
+        Map<String, TopicAssignmentState> topicAssignmentStates = partitionsPerTopic.entrySet().stream()
+            .collect(Collectors.toMap(Map.Entry::getKey, e -> new TopicAssignmentState(e.getValue(), consumersPerTopic.get(e.getKey()))));
         Map<String, List<TopicPartition>> assignment = new HashMap<>();
         for (String memberId : subscriptions.keySet())
             assignment.put(memberId, new ArrayList<>());
-        for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {
+        for (Map.Entry<String, TopicAssignmentState> topicEntry : topicAssignmentStates.entrySet()) {

Review Comment:
We maintain the ordering across partitions only when racks are not specified or if all racks have all partitions.
If each rack only has replicas of a subset of partitions, we prioritize locality over preserving the order. I will change this to be similar to the sticky case to reduce re-ordering where possible. We can follow up in the PR for range assignor.
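
To make the condition described in this comment concrete, here is a minimal sketch in plain Java. It is not the code in this PR: `RackOrderingSketch` and `canPreserveOrdering` are hypothetical names, the inputs are simplified stand-ins for the assignor's real types, and it assumes that "all racks have all partitions" means every partition has a replica in every rack reported by a consumer.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical helper for illustration only; not part of the Kafka code base.
final class RackOrderingSketch {

    /**
     * Returns true when partition ordering can be kept as-is:
     * either no consumer specifies a rack, or every partition has a
     * replica in every rack that some consumer specifies.
     *
     * @param consumerRacks            rack of each consumer, empty if unspecified
     * @param replicaRacksByPartition  for each partition id, the racks holding a replica
     */
    static boolean canPreserveOrdering(List<Optional<String>> consumerRacks,
                                       Map<Integer, Set<String>> replicaRacksByPartition) {
        // Collect the distinct racks that consumers actually report.
        Set<String> racks = new HashSet<>();
        for (Optional<String> rack : consumerRacks)
            rack.ifPresent(racks::add);

        // Case 1: racks are not specified, so locality cannot matter.
        if (racks.isEmpty())
            return true;

        // Case 2: every consumer rack holds a replica of every partition,
        // so any assignment is already local and ordering can be kept.
        for (Set<String> replicaRacks : replicaRacksByPartition.values()) {
            if (!replicaRacks.containsAll(racks))
                return false;
        }
        return true;
    }
}
```

When this check returns false, some rack only holds replicas of a subset of partitions, which is the case where the comment says locality is prioritized over preserving the order.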