dajac commented on code in PR #13350: URL: https://github.com/apache/kafka/pull/13350#discussion_r1141731445
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java: ########## @@ -495,8 +615,8 @@ private class ConstrainedAssignmentBuilder extends AbstractAssignmentBuilder { @Override Map<String, List<TopicPartition>> build() { if (log.isDebugEnabled()) { - log.debug("Performing constrained assign with partitionsPerTopic: {}, currentAssignment: {}.", - partitionsPerTopic, currentAssignment); + log.debug("Performing constrained assign with partitionsPerTopic: {}, currentAssignment: {} rackInfo {}.", Review Comment: nit: missing `,` before `rackInfo`. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java: ########## @@ -519,13 +641,14 @@ Map<String, List<TopicPartition>> build() { return assignment; } - // Reassign previously owned partitions, up to the expected number of partitions per consumer private void assignOwnedPartitions() { for (Map.Entry<String, List<TopicPartition>> consumerEntry : currentAssignment.entrySet()) { String consumer = consumerEntry.getKey(); - List<TopicPartition> ownedPartitions = consumerEntry.getValue(); + List<TopicPartition> ownedPartitions = consumerEntry.getValue().stream() + .filter(tp -> !rackInfo.racksMismatch(consumer, tp)) Review Comment: For my understanding, if rack awareness is disabled, this is a no-op, right? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java: ########## @@ -61,35 +72,51 @@ static final class ConsumerGenerationPair { public static final class MemberData { public final List<TopicPartition> partitions; public final Optional<Integer> generation; - public MemberData(List<TopicPartition> partitions, Optional<Integer> generation) { + public final Optional<String> rackId; + public MemberData(List<TopicPartition> partitions, Optional<Integer> generation, Optional<String> rackId) { this.partitions = partitions; this.generation = generation; + this.rackId = rackId; + } + + public MemberData(List<TopicPartition> partitions, Optional<Integer> generation) { + this(partitions, generation, Optional.empty()); } } abstract protected MemberData memberData(Subscription subscription); @Override - public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic, - Map<String, Subscription> subscriptions) { + public Map<String, List<TopicPartition>> assignPartitions(Map<String, List<PartitionInfo>> partitionsPerTopic, + Map<String, Subscription> subscriptions) { Map<String, List<TopicPartition>> consumerToOwnedPartitions = new HashMap<>(); Set<TopicPartition> partitionsWithMultiplePreviousOwners = new HashSet<>(); + + List<PartitionInfo> allPartitions = new ArrayList<>(); + partitionsPerTopic.values().forEach(allPartitions::addAll); + RackInfo rackInfo = new RackInfo(allPartitions, subscriptions); Review Comment: nit: There is an extra space. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java: ########## @@ -574,6 +697,42 @@ private void assignOwnedPartitions() { } } + // Round-Robin filling within racks for remaining members up to the expected numbers of maxQuota, + // otherwise, to minQuota + private void assignRackAwareRoundRobin(List<TopicPartition> unassignedPartitions) { + int nextUnfilledConsumerIndex = 0; + Iterator<TopicPartition> unassignedIter = unassignedPartitions.iterator(); + while (!rackInfo.consumerRacks.isEmpty() && unassignedIter.hasNext()) { Review Comment: nit: Do we ever mutate `ackInfo.consumerRacks`? If not, would it make sense to put this check as first statement in the method and return if is it empty? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org