dajac commented on code in PR #13350: URL: https://github.com/apache/kafka/pull/13350#discussion_r1154372329
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java: ########## @@ -61,35 +72,51 @@ static final class ConsumerGenerationPair { public static final class MemberData { public final List<TopicPartition> partitions; public final Optional<Integer> generation; - public MemberData(List<TopicPartition> partitions, Optional<Integer> generation) { + public final Optional<String> rackId; + public MemberData(List<TopicPartition> partitions, Optional<Integer> generation, Optional<String> rackId) { Review Comment: small nit: Could we add an empty line before this one? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java: ########## @@ -835,7 +1004,7 @@ private List<TopicPartition> assignOwnedPartitions() { // if this topic partition of this consumer no longer exists, remove it from currentAssignment of the consumer partitionIter.remove(); currentPartitionConsumer.remove(partition); - } else if (!consumerSubscription.topics().contains(partition.topic())) { + } else if (!consumerSubscription.topics().contains(partition.topic()) || rackInfo.racksMismatch(consumer, partition)) { Review Comment: nit: Should we update the comment below this line as well? 
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java: ########## @@ -574,6 +697,44 @@ private void assignOwnedPartitions() { } } + // Round-Robin filling within racks for remaining members up to the expected numbers of maxQuota, + // otherwise, to minQuota + private void assignRackAwareRoundRobin(List<TopicPartition> unassignedPartitions) { + if (rackInfo.consumerRacks.isEmpty()) + return; + int nextUnfilledConsumerIndex = 0; + Iterator<TopicPartition> unassignedIter = unassignedPartitions.iterator(); + while (unassignedIter.hasNext()) { + TopicPartition unassignedPartition = unassignedIter.next(); + String consumer = null; + int nextIndex = rackInfo.nextRackConsumer(unassignedPartition, unfilledMembersWithUnderMinQuotaPartitions, nextUnfilledConsumerIndex); + if (nextIndex >= 0) { + consumer = unfilledMembersWithUnderMinQuotaPartitions.get(nextIndex); + int assignmentCount = assignment.get(consumer).size() + 1; + if (assignmentCount >= minQuota) { + unfilledMembersWithUnderMinQuotaPartitions.remove(consumer); + if (assignmentCount < maxQuota) + unfilledMembersWithExactlyMinQuotaPartitions.add(consumer); + } else { + nextIndex++; + } + nextUnfilledConsumerIndex = unfilledMembersWithUnderMinQuotaPartitions.isEmpty() ? 0 : nextIndex % unfilledMembersWithUnderMinQuotaPartitions.size(); + } else if (!unfilledMembersWithExactlyMinQuotaPartitions.isEmpty()) { + int firstIndex = rackInfo.nextRackConsumer(unassignedPartition, unfilledMembersWithExactlyMinQuotaPartitions, 0); + if (firstIndex >= 0) { + consumer = unfilledMembersWithExactlyMinQuotaPartitions.get(firstIndex); + if (assignment.get(consumer).size() + 1 == maxQuota) + unfilledMembersWithExactlyMinQuotaPartitions.remove(firstIndex); + } + } + if (consumer == null) + continue; Review Comment: small nit: It may be better to invert the condition and bring the above lines into the if branch. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org