dajac commented on code in PR #13350:
URL: https://github.com/apache/kafka/pull/13350#discussion_r1154372329


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##########
@@ -61,35 +72,51 @@ static final class ConsumerGenerationPair {
     public static final class MemberData {
         public final List<TopicPartition> partitions;
         public final Optional<Integer> generation;
-        public MemberData(List<TopicPartition> partitions, Optional<Integer> 
generation) {
+        public final Optional<String> rackId;
+        public MemberData(List<TopicPartition> partitions, Optional<Integer> 
generation, Optional<String> rackId) {

Review Comment:
   small nit: Could we add an empty line before this one?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##########
@@ -835,7 +1004,7 @@ private List<TopicPartition> assignOwnedPartitions() {
                             // if this topic partition of this consumer no 
longer exists, remove it from currentAssignment of the consumer
                             partitionIter.remove();
                             currentPartitionConsumer.remove(partition);
-                        } else if 
(!consumerSubscription.topics().contains(partition.topic())) {
+                        } else if 
(!consumerSubscription.topics().contains(partition.topic()) || 
rackInfo.racksMismatch(consumer, partition)) {

Review Comment:
   nit: Should we update the comment below this line as well?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##########
@@ -574,6 +697,44 @@ private void assignOwnedPartitions() {
             }
         }
 
+        // Round-Robin filling within racks for remaining members up to the 
expected numbers of maxQuota,
+        // otherwise, to minQuota
+        private void assignRackAwareRoundRobin(List<TopicPartition> 
unassignedPartitions) {
+            if (rackInfo.consumerRacks.isEmpty())
+                return;
+            int nextUnfilledConsumerIndex = 0;
+            Iterator<TopicPartition> unassignedIter = 
unassignedPartitions.iterator();
+            while (unassignedIter.hasNext()) {
+                TopicPartition unassignedPartition = unassignedIter.next();
+                String consumer = null;
+                int nextIndex = rackInfo.nextRackConsumer(unassignedPartition, 
unfilledMembersWithUnderMinQuotaPartitions, nextUnfilledConsumerIndex);
+                if (nextIndex >= 0) {
+                    consumer = 
unfilledMembersWithUnderMinQuotaPartitions.get(nextIndex);
+                    int assignmentCount = assignment.get(consumer).size() + 1;
+                    if (assignmentCount >= minQuota) {
+                        
unfilledMembersWithUnderMinQuotaPartitions.remove(consumer);
+                        if (assignmentCount < maxQuota)
+                            
unfilledMembersWithExactlyMinQuotaPartitions.add(consumer);
+                    } else {
+                        nextIndex++;
+                    }
+                    nextUnfilledConsumerIndex = 
unfilledMembersWithUnderMinQuotaPartitions.isEmpty() ? 0 : nextIndex % 
unfilledMembersWithUnderMinQuotaPartitions.size();
+                } else if 
(!unfilledMembersWithExactlyMinQuotaPartitions.isEmpty()) {
+                    int firstIndex = 
rackInfo.nextRackConsumer(unassignedPartition, 
unfilledMembersWithExactlyMinQuotaPartitions, 0);
+                    if (firstIndex >= 0) {
+                        consumer = 
unfilledMembersWithExactlyMinQuotaPartitions.get(firstIndex);
+                        if (assignment.get(consumer).size() + 1 == maxQuota)
+                            
unfilledMembersWithExactlyMinQuotaPartitions.remove(firstIndex);
+                    }
+                }
+                if (consumer == null)
+                    continue;

Review Comment:
   small nit: It may be better to invert the condition and bring the above 
lines into the if branch.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to