dajac commented on code in PR #15974:
URL: https://github.com/apache/kafka/pull/15974#discussion_r1603040292


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -147,6 +147,11 @@ public static class DeadlineAndEpoch {
      */
     private final TimelineHashMap<String, Integer> subscribedTopicNames;
 
+    /**
+     * Partition assignments per topic.
+     */
+    private final TimelineHashMap<Uuid, TimelineHashMap<Integer, String>> partitionAssignments;

Review Comment:
   nit: Could we please move it next to `targetAssignment`, as they go together?
It would also be great if we could add a bit more detail to the javadoc.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpec.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+
+import java.util.Map;
+
+/**
+ * The assignment specifications for a consumer group.
+ */
+public interface GroupSpec {
+    /**
+     * @return Member metadata keyed by member Id.
+     */
+    Map<String, AssignmentMemberSpec> members();
+
+    /**
+     * @return The group's subscription type.
+     */
+    SubscriptionType subscriptionType();
+
+    /**

Review Comment:
   Should we directly move the assignment from `AssignmentMemberSpec` to here 
as we discussed?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -523,9 +537,61 @@ public Assignment targetAssignment(String memberId) {
      * @param newTargetAssignment   The new target assignment.
      */
     public void updateTargetAssignment(String memberId, Assignment newTargetAssignment) {
+        updatePartitionAssignments(
+            memberId,
+            targetAssignment.getOrDefault(memberId, new Assignment(Collections.emptyMap())),
+            newTargetAssignment
+        );
         targetAssignment.put(memberId, newTargetAssignment);
     }
 
+    /**
+     * Updates partition assignments of the topics.
+     *
+     * @param memberId              The member Id.
+     * @param oldTargetAssignment   The old target assignment.
+     * @param newTargetAssignment   The new target assignment.
+     */
+    public void updatePartitionAssignments(
+        String memberId,
+        Assignment oldTargetAssignment,
+        Assignment newTargetAssignment
+    ) {
+        // Combine keys from both old and new assignments.
+        Set<Uuid> allTopicIds = new HashSet<>();
+        allTopicIds.addAll(oldTargetAssignment.partitions().keySet());
+        allTopicIds.addAll(newTargetAssignment.partitions().keySet());
+
+        for (Uuid topicId : allTopicIds) {
+            Set<Integer> oldPartitions = oldTargetAssignment.partitions().getOrDefault(topicId, Collections.emptySet());
+            Set<Integer> newPartitions = newTargetAssignment.partitions().getOrDefault(topicId, Collections.emptySet());
+
+            TimelineHashMap<Integer, String> topicPartitionAssignment = partitionAssignments.computeIfAbsent(
+                topicId, k -> new TimelineHashMap<>(snapshotRegistry, Math.max(oldPartitions.size(), newPartitions.size()))
+            );
+
+            // Remove partitions that aren't present in the new assignment.
+            for (Integer partition : oldPartitions) {
+                if (!newPartitions.contains(partition) && memberId.equals(topicPartitionAssignment.get(partition))) {
+                    topicPartitionAssignment.remove(partition);
+                }
+            }
+
+            // Add partitions that are in new assignment but not in old assignment.
+            for (Integer partition : newPartitions) {
+                if (!oldPartitions.contains(partition)) {
+                    topicPartitionAssignment.put(partition, memberId);
+                }
+            }
+
+            if (topicPartitionAssignment.isEmpty()) {
+                partitionAssignments.remove(topicId);
+            } else {
+                partitionAssignments.put(topicId, topicPartitionAssignment);
+            }
+        }
+    }

Review Comment:
   I don't see any unit tests for this change. Could we please add some?
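
   As a rough idea of what such a test could exercise, here is a minimal sketch
   that mirrors the update logic in the hunk above. It is not the PR's code:
   `PartitionAssignmentsSketch` and `update` are hypothetical names, plain
   `HashMap`s stand in for `TimelineHashMap`, and `String` topic ids stand in
   for `Uuid`, so that it compiles without the Kafka classes.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the ConsumerGroup bookkeeping. Plain HashMaps replace
// TimelineHashMap and String topic ids replace Uuid (both are assumptions).
final class PartitionAssignmentsSketch {
    // topicId -> (partition -> memberId)
    final Map<String, Map<Integer, String>> partitionAssignments = new HashMap<>();

    void update(
        String memberId,
        Map<String, Set<Integer>> oldAssignment,
        Map<String, Set<Integer>> newAssignment
    ) {
        // Visit every topic that appears in either assignment.
        Set<String> allTopicIds = new HashSet<>(oldAssignment.keySet());
        allTopicIds.addAll(newAssignment.keySet());

        for (String topicId : allTopicIds) {
            Set<Integer> oldPartitions = oldAssignment.getOrDefault(topicId, Collections.emptySet());
            Set<Integer> newPartitions = newAssignment.getOrDefault(topicId, Collections.emptySet());
            Map<Integer, String> owners = partitionAssignments.computeIfAbsent(topicId, k -> new HashMap<>());

            // Drop partitions the member gave up, unless another member already owns them.
            for (Integer partition : oldPartitions) {
                if (!newPartitions.contains(partition) && memberId.equals(owners.get(partition))) {
                    owners.remove(partition);
                }
            }
            // Record partitions the member gained.
            for (Integer partition : newPartitions) {
                if (!oldPartitions.contains(partition)) {
                    owners.put(partition, memberId);
                }
            }
            // Do not keep empty per-topic maps around.
            if (owners.isEmpty()) {
                partitionAssignments.remove(topicId);
            }
        }
    }

    public static void main(String[] args) {
        PartitionAssignmentsSketch group = new PartitionAssignmentsSketch();
        Map<String, Set<Integer>> none = Collections.emptyMap();
        Map<String, Set<Integer>> both =
            Collections.singletonMap("topic-a", new HashSet<Integer>(Arrays.asList(0, 1)));
        Map<String, Set<Integer>> onlyOne =
            Collections.singletonMap("topic-a", new HashSet<Integer>(Arrays.asList(1)));

        group.update("member-1", none, both);     // member-1 gains partitions 0 and 1
        group.update("member-1", both, onlyOne);  // member-1 gives up partition 0
        System.out.println(group.partitionAssignments);
        group.update("member-1", onlyOne, none);  // member-1 gives up everything
        System.out.println(group.partitionAssignments.isEmpty());
    }
}
```

   The cases worth covering in the real test are probably the same three:
   a member gaining partitions, a member losing some of them, and the
   per-topic entry disappearing once its last partition is removed.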



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -523,9 +537,61 @@ public Assignment targetAssignment(String memberId) {
      * @param newTargetAssignment   The new target assignment.
      */
     public void updateTargetAssignment(String memberId, Assignment newTargetAssignment) {
+        updatePartitionAssignments(
+            memberId,
+            targetAssignment.getOrDefault(memberId, new Assignment(Collections.emptyMap())),
+            newTargetAssignment
+        );
         targetAssignment.put(memberId, newTargetAssignment);
     }
 
+    /**
+     * Updates partition assignments of the topics.
+     *
+     * @param memberId              The member Id.
+     * @param oldTargetAssignment   The old target assignment.
+     * @param newTargetAssignment   The new target assignment.
+     */
+    public void updatePartitionAssignments(

Review Comment:
   nit: Private?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImpl.java:
##########
@@ -56,11 +66,25 @@ public SubscriptionType subscriptionType() {
         return subscriptionType;
     }
 
+    /**
+     * @param topicId           The topic Id.
+     * @param partitionId       The partition Id.
+     * @return True if the partition is currently assigned, false otherwise.
+     */
+    @Override
+    public boolean isPartitionAssigned(Uuid topicId, int partitionId) {

Review Comment:
   Should we add a unit test for this?
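
   To make the expected behaviour concrete, here is a minimal sketch of the
   lookup such a test would pin down. The body of `isPartitionAssigned` is not
   visible in this hunk, so the implementation below is an assumption:
   `GroupSpecSketch` is a hypothetical stand-in for `GroupSpecImpl`, and
   `java.util.UUID` replaces Kafka's `Uuid` to keep it self-contained.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical stand-in for GroupSpecImpl; java.util.UUID replaces Kafka's Uuid.
final class GroupSpecSketch {
    // topicId -> (partition -> memberId)
    private final Map<UUID, Map<Integer, String>> partitionAssignments;

    GroupSpecSketch(Map<UUID, Map<Integer, String>> partitionAssignments) {
        this.partitionAssignments = partitionAssignments;
    }

    /**
     * Checks whether the given partition is currently assigned to a member.
     *
     * @param topicId           The topic Id.
     * @param partitionId       The partition Id.
     * @return True if the partition is currently assigned, false otherwise.
     */
    boolean isPartitionAssigned(UUID topicId, int partitionId) {
        // Assumed behaviour: an unknown topic means the partition is unassigned.
        Map<Integer, String> owners = partitionAssignments.get(topicId);
        return owners != null && owners.containsKey(partitionId);
    }

    public static void main(String[] args) {
        UUID topic = UUID.randomUUID();
        Map<UUID, Map<Integer, String>> assignments = new HashMap<>();
        assignments.put(topic, Collections.singletonMap(0, "member-1"));

        GroupSpecSketch spec = new GroupSpecSketch(assignments);
        System.out.println(spec.isPartitionAssigned(topic, 0));
        System.out.println(spec.isPartitionAssigned(topic, 1));
        System.out.println(spec.isPartitionAssigned(UUID.randomUUID(), 0));
    }
}
```

   A unit test would likely want the same three cases: an assigned partition,
   an unassigned partition of a known topic, and an unknown topic.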



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java:
##########
@@ -18,7 +18,7 @@
 

Review Comment:
   Should we extend the tests in this class to cover the new method/logic in 
the builder?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilderTest.java:
##########
@@ -569,4 +630,34 @@ public void testReassignmentWhenOneSubscriptionRemovedAfterInitialAssignmentWith
 
         assertAssignment(expectedAssignment, computedAssignment);
     }
+
+    /**
+     * Generate a map of partition assignments from the given member spec.
+     *
+     * @param memberSpec        A map where the key is the member Id and the value is an
+     *                          AssignmentMemberSpec object containing the member's partition assignments.
+     * @return Map of topic partition to member assignments.
+     */
+    public Map<Uuid, Map<Integer, String>> partitionAssignments(

Review Comment:
   nit: Could we make it private?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpec.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+
+import java.util.Map;
+
+/**
+ * The assignment specifications for a consumer group.
+ */
+public interface GroupSpec {
+    /**
+     * @return Member metadata keyed by member Id.
+     */
+    Map<String, AssignmentMemberSpec> members();
+
+    /**
+     * @return The group's subscription type.
+     */
+    SubscriptionType subscriptionType();
+
+    /**

Review Comment:
   I take it back. It may be better to do it as a follow-up.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java:
##########
@@ -728,4 +772,34 @@ private static Map<Integer, Set<String>> createPartitionRacks(int numPartitions)
         }
         return partitionRacks;
     }
+
+    /**
+     * Generate a map of partition assignments from the given member spec.
+     *
+     * @param memberSpec        A map where the key is the member Id and the value is an
+     *                          AssignmentMemberSpec object containing the member's partition assignments.
+     * @return Map of topic partition to member assignments.
+     */
+    public Map<Uuid, Map<Integer, String>> partitionAssignments(

Review Comment:
   ditto.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImpl.java:
##########
@@ -56,11 +66,25 @@ public SubscriptionType subscriptionType() {
         return subscriptionType;
     }
 
+    /**
+     * @param topicId           The topic Id.

Review Comment:
   nit: We usually have a first line before the params. You can take a look at 
the other javadocs.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java:
##########
@@ -557,10 +612,52 @@ public void testReassignmentWhenOneSubscriptionRemovedAfterInitialAssignmentWith
             mkTopicAssignment(topic2Uuid, 1)
         ));
 
+        GroupSpecImpl groupSpec = new GroupSpecImpl(
+            members,
+            HOMOGENEOUS,
+            partitionAssignments(members)
+        );
+        SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
+
+        GroupAssignment computedAssignment = assignor.assign(
+            groupSpec,
+            subscribedTopicMetadata
+        );
+
         assertAssignment(expectedAssignment, computedAssignment);
         checkValidityAndBalance(members, computedAssignment);
     }
 
+    /**
+     * Generate a map of partition assignments from the given member spec.
+     *
+     * @param memberSpec        A map where the key is the member Id and the value is an
+     *                          AssignmentMemberSpec object containing the member's partition assignments.
+     * @return Map of topic partition to member assignments.
+     */
+    public Map<Uuid, Map<Integer, String>> partitionAssignments(

Review Comment:
   This seems to be a copy from the general uniform assignment test. Could we 
share it? We could perhaps move it to a utility test class (e.g. 
AssignorTestUtils).
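
   A shared helper might look roughly like the sketch below. The names and
   types are assumptions: `AssignorTestUtilsSketch` is hypothetical, the input
   is a plain member -> topic -> partitions map rather than
   `Map<String, AssignmentMemberSpec>`, and `java.util.UUID` replaces Kafka's
   `Uuid` so that the example compiles on its own.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical shared test utility; not the class from the PR.
final class AssignorTestUtilsSketch {
    private AssignorTestUtilsSketch() { }

    /**
     * Generates a map of partition assignments from the given member assignments.
     *
     * @param assignedPartitionsByMember    Assigned partitions per topic, keyed by member Id.
     * @return Map of topic partition to member assignments.
     */
    static Map<UUID, Map<Integer, String>> partitionAssignments(
        Map<String, Map<UUID, Set<Integer>>> assignedPartitionsByMember
    ) {
        Map<UUID, Map<Integer, String>> result = new HashMap<>();
        assignedPartitionsByMember.forEach((memberId, topics) ->
            topics.forEach((topicId, partitions) -> {
                Map<Integer, String> owners = result.computeIfAbsent(topicId, k -> new HashMap<>());
                for (Integer partition : partitions) {
                    owners.put(partition, memberId);
                }
            })
        );
        return result;
    }

    public static void main(String[] args) {
        UUID topic = UUID.randomUUID();
        Map<String, Map<UUID, Set<Integer>>> members = new HashMap<>();
        members.put("member-1", Collections.singletonMap(topic, new HashSet<Integer>(Arrays.asList(0, 1))));
        members.put("member-2", Collections.singletonMap(topic, new HashSet<Integer>(Arrays.asList(2))));
        System.out.println(partitionAssignments(members));
    }
}
```

   Each assignor test could then call the one static helper instead of
   carrying its own copy.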



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##########
@@ -198,20 +203,26 @@ private Map<String, Integer> assignStickyPartitions(int minQuota) {
                         topicIdPartition.topicId(),
                         topicIdPartition.partitionId()
                     );
-                    unassignedPartitions.remove(topicIdPartition);
                 });
 
                 // The extra partition is located at the last index from the previous step.

Review Comment:
   nit: Should we move this comment to the second if?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##########
@@ -198,20 +203,26 @@ private Map<String, Integer> assignStickyPartitions(int minQuota) {
                         topicIdPartition.topicId(),
                         topicIdPartition.partitionId()
                     );
-                    unassignedPartitions.remove(topicIdPartition);
                 });
 
                 // The extra partition is located at the last index from the previous step.
-                if (remaining < 0 && remainingMembersToGetAnExtraPartition > 0) {
-                    TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(retainedPartitionsCount);
-                    addPartitionToAssignment(
-                        targetAssignment,
-                        memberId,
-                        topicIdPartition.topicId(),
-                        topicIdPartition.partitionId()
-                    );
-                    unassignedPartitions.remove(topicIdPartition);
-                    remainingMembersToGetAnExtraPartition--;
+                if (remaining < 0) {
+                    if (remainingMembersToGetAnExtraPartition > 0) {
+                        TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(retainedPartitionsCount++);
+                        addPartitionToAssignment(
+                            targetAssignment,
+                            memberId,
+                            topicIdPartition.topicId(),
+                            topicIdPartition.partitionId()
+                        );
+                        remainingMembersToGetAnExtraPartition--;
+                    }
+                    if (retainedPartitionsCount < currentAssignmentSize) {

Review Comment:
   Should we add a comment here too?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -147,6 +147,11 @@ public static class DeadlineAndEpoch {
      */
     private final TimelineHashMap<String, Integer> subscribedTopicNames;
 
+    /**
+     * Partition assignments per topic.
+     */
+    private final TimelineHashMap<Uuid, TimelineHashMap<Integer, String>> partitionAssignments;

Review Comment:
   I also wonder whether we could include `targetAssignment` in the name in
order to stress that this is the counterpart of `targetAssignment`.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpec.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+
+import java.util.Map;
+
+/**
+ * The assignment specifications for a consumer group.
+ */
+public interface GroupSpec {
+    /**
+     * @return Member metadata keyed by member Id.
+     */
+    Map<String, AssignmentMemberSpec> members();
+
+    /**
+     * @return The group's subscription type.
+     */
+    SubscriptionType subscriptionType();
+
+    /**
+     * @return True, if the partition is currently assigned to a member.
+     *         False, otherwise.
+     */
+    boolean isPartitionAssigned(Uuid topicId, int partitionId);
+

Review Comment:
   nit: We could remove this empty line.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
