dajac commented on code in PR #13350:
URL: https://github.com/apache/kafka/pull/13350#discussion_r1141731445


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##########
@@ -495,8 +615,8 @@ private class ConstrainedAssignmentBuilder extends 
AbstractAssignmentBuilder {
         @Override
         Map<String, List<TopicPartition>> build() {
             if (log.isDebugEnabled()) {
-                log.debug("Performing constrained assign with 
partitionsPerTopic: {}, currentAssignment: {}.",
-                        partitionsPerTopic, currentAssignment);
+                log.debug("Performing constrained assign with 
partitionsPerTopic: {}, currentAssignment: {} rackInfo {}.",

Review Comment:
   nit: missing `,` before `rackInfo`.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##########
@@ -519,13 +641,14 @@ Map<String, List<TopicPartition>> build() {
             return assignment;
         }
 
-
         // Reassign previously owned partitions, up to the expected number of 
partitions per consumer
         private void assignOwnedPartitions() {
 
             for (Map.Entry<String, List<TopicPartition>> consumerEntry : 
currentAssignment.entrySet()) {
                 String consumer = consumerEntry.getKey();
-                List<TopicPartition> ownedPartitions = 
consumerEntry.getValue();
+                List<TopicPartition> ownedPartitions = 
consumerEntry.getValue().stream()
+                        .filter(tp -> !rackInfo.racksMismatch(consumer, tp))

Review Comment:
   For my understanding, if rack awareness is disabled, this is a no-op, right?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##########
@@ -61,35 +72,51 @@ static final class ConsumerGenerationPair {
     public static final class MemberData {
         public final List<TopicPartition> partitions;
         public final Optional<Integer> generation;
-        public MemberData(List<TopicPartition> partitions, Optional<Integer> 
generation) {
+        public final Optional<String> rackId;
+        public MemberData(List<TopicPartition> partitions, Optional<Integer> 
generation, Optional<String> rackId) {
             this.partitions = partitions;
             this.generation = generation;
+            this.rackId = rackId;
+        }
+
+        public MemberData(List<TopicPartition> partitions, Optional<Integer> 
generation) {
+            this(partitions, generation, Optional.empty());
         }
     }
 
     abstract protected MemberData memberData(Subscription subscription);
 
     @Override
-    public Map<String, List<TopicPartition>> assign(Map<String, Integer> 
partitionsPerTopic,
-                                                    Map<String, Subscription> 
subscriptions) {
+    public Map<String, List<TopicPartition>> assignPartitions(Map<String, 
List<PartitionInfo>> partitionsPerTopic,
+                                                              Map<String, 
Subscription> subscriptions) {
         Map<String, List<TopicPartition>> consumerToOwnedPartitions = new 
HashMap<>();
         Set<TopicPartition> partitionsWithMultiplePreviousOwners = new 
HashSet<>();
+
+        List<PartitionInfo> allPartitions = new ArrayList<>();
+        partitionsPerTopic.values().forEach(allPartitions::addAll);
+        RackInfo rackInfo =  new RackInfo(allPartitions, subscriptions);

Review Comment:
   nit: There is an extra space.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##########
@@ -574,6 +697,42 @@ private void assignOwnedPartitions() {
             }
         }
 
+        // Round-Robin filling within racks for remaining members up to the 
expected numbers of maxQuota,
+        // otherwise, to minQuota
+        private void assignRackAwareRoundRobin(List<TopicPartition> 
unassignedPartitions) {
+            int nextUnfilledConsumerIndex = 0;
+            Iterator<TopicPartition> unassignedIter = 
unassignedPartitions.iterator();
+            while (!rackInfo.consumerRacks.isEmpty() && 
unassignedIter.hasNext()) {

Review Comment:
   nit: Do we ever mutate `ackInfo.consumerRacks`? If not, would it make sense 
to put this check as first statement in the method and return if is it empty?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to