dajac commented on code in PR #15974:
URL: https://github.com/apache/kafka/pull/15974#discussion_r1603040292

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -147,6 +147,11 @@ public static class DeadlineAndEpoch {
      */
     private final TimelineHashMap<String, Integer> subscribedTopicNames;
 
+    /**
+     * Partition assignments per topic.
+     */
+    private final TimelineHashMap<Uuid, TimelineHashMap<Integer, String>> partitionAssignments;

Review Comment:
   nit: Could we please move it next to `targetAssignment` as they go together? It would also be great if we could add a bit more detail to the javadoc.

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpec.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+
+import java.util.Map;
+
+/**
+ * The assignment specifications for a consumer group.
+ */
+public interface GroupSpec {
+    /**
+     * @return Member metadata keyed by member Id.
+     */
+    Map<String, AssignmentMemberSpec> members();
+
+    /**
+     * @return The group's subscription type.
+     */
+    SubscriptionType subscriptionType();
+
+    /**

Review Comment:
   Should we directly move the assignment from `AssignmentMemberSpec` to here as we discussed?

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -523,9 +537,61 @@ public Assignment targetAssignment(String memberId) {
      * @param newTargetAssignment The new target assignment.
      */
     public void updateTargetAssignment(String memberId, Assignment newTargetAssignment) {
+        updatePartitionAssignments(
+            memberId,
+            targetAssignment.getOrDefault(memberId, new Assignment(Collections.emptyMap())),
+            newTargetAssignment
+        );
         targetAssignment.put(memberId, newTargetAssignment);
     }
 
+    /**
+     * Updates partition assignments of the topics.
+     *
+     * @param memberId The member Id.
+     * @param oldTargetAssignment The old target assignment.
+     * @param newTargetAssignment The new target assignment.
+     */
+    public void updatePartitionAssignments(
+        String memberId,
+        Assignment oldTargetAssignment,
+        Assignment newTargetAssignment
+    ) {
+        // Combine keys from both old and new assignments.
+        Set<Uuid> allTopicIds = new HashSet<>();
+        allTopicIds.addAll(oldTargetAssignment.partitions().keySet());
+        allTopicIds.addAll(newTargetAssignment.partitions().keySet());
+
+        for (Uuid topicId : allTopicIds) {
+            Set<Integer> oldPartitions = oldTargetAssignment.partitions().getOrDefault(topicId, Collections.emptySet());
+            Set<Integer> newPartitions = newTargetAssignment.partitions().getOrDefault(topicId, Collections.emptySet());
+
+            TimelineHashMap<Integer, String> topicPartitionAssignment = partitionAssignments.computeIfAbsent(
+                topicId, k -> new TimelineHashMap<>(snapshotRegistry, Math.max(oldPartitions.size(), newPartitions.size()))
+            );
+
+            // Remove partitions that aren't present in the new assignment.
+            for (Integer partition : oldPartitions) {
+                if (!newPartitions.contains(partition) && memberId.equals(topicPartitionAssignment.get(partition))) {
+                    topicPartitionAssignment.remove(partition);
+                }
+            }
+
+            // Add partitions that are in new assignment but not in old assignment.
+            for (Integer partition : newPartitions) {
+                if (!oldPartitions.contains(partition)) {
+                    topicPartitionAssignment.put(partition, memberId);
+                }
+            }
+
+            if (topicPartitionAssignment.isEmpty()) {
+                partitionAssignments.remove(topicId);
+            } else {
+                partitionAssignments.put(topicId, topicPartitionAssignment);
+            }
+        }
+    }

Review Comment:
   I don't see any unit tests for this change. Could we please add some?

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -523,9 +537,61 @@ public Assignment targetAssignment(String memberId) {
      * @param newTargetAssignment The new target assignment.
      */
     public void updateTargetAssignment(String memberId, Assignment newTargetAssignment) {
+        updatePartitionAssignments(
+            memberId,
+            targetAssignment.getOrDefault(memberId, new Assignment(Collections.emptyMap())),
+            newTargetAssignment
+        );
         targetAssignment.put(memberId, newTargetAssignment);
     }
 
+    /**
+     * Updates partition assignments of the topics.
+     *
+     * @param memberId The member Id.
+     * @param oldTargetAssignment The old target assignment.
+     * @param newTargetAssignment The new target assignment.
+     */
+    public void updatePartitionAssignments(

Review Comment:
   nit: Private?

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImpl.java:
##########
@@ -56,11 +66,25 @@ public SubscriptionType subscriptionType() {
         return subscriptionType;
     }
 
+    /**
+     * @param topicId The topic Id.
+     * @param partitionId The partition Id.
+     * @return True if the partition is currently assigned, false otherwise.
+     */
+    @Override
+    public boolean isPartitionAssigned(Uuid topicId, int partitionId) {

Review Comment:
   Should we add a unit test for this?

##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java:
##########
@@ -18,7 +18,7 @@

Review Comment:
   Should we extend the tests in this class to cover the new method/logic in the builder?

##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilderTest.java:
##########
@@ -569,4 +630,34 @@ public void testReassignmentWhenOneSubscriptionRemovedAfterInitialAssignmentWith
         assertAssignment(expectedAssignment, computedAssignment);
     }
+
+    /**
+     * Generate a map of partition assignments from the given member spec.
+     *
+     * @param memberSpec A map where the key is the member Id and the value is an
+     * AssignmentMemberSpec object containing the member's partition assignments.
+     * @return Map of topic partition to member assignments.
+     */
+    public Map<Uuid, Map<Integer, String>> partitionAssignments(

Review Comment:
   nit: Could we make it private?

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpec.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+
+import java.util.Map;
+
+/**
+ * The assignment specifications for a consumer group.
+ */
+public interface GroupSpec {
+    /**
+     * @return Member metadata keyed by member Id.
+     */
+    Map<String, AssignmentMemberSpec> members();
+
+    /**
+     * @return The group's subscription type.
+     */
+    SubscriptionType subscriptionType();
+
+    /**

Review Comment:
   I take it back. It may be better to do it as a follow-up.

##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java:
##########
@@ -728,4 +772,34 @@ private static Map<Integer, Set<String>> createPartitionRacks(int numPartitions)
         }
         return partitionRacks;
     }
+
+    /**
+     * Generate a map of partition assignments from the given member spec.
+     *
+     * @param memberSpec A map where the key is the member Id and the value is an
+     * AssignmentMemberSpec object containing the member's partition assignments.
+     * @return Map of topic partition to member assignments.
+     */
+    public Map<Uuid, Map<Integer, String>> partitionAssignments(

Review Comment:
   ditto.

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImpl.java:
##########
@@ -56,11 +66,25 @@ public SubscriptionType subscriptionType() {
         return subscriptionType;
     }
 
+    /**
+     * @param topicId The topic Id.

Review Comment:
   nit: We usually have a first line before the params. You can take a look at the other javadocs.
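
   For illustration only, a minimal sketch of the javadoc shape suggested above (a summary line first, then the params) on a lookup like `isPartitionAssigned`. This is not the PR's code: the class name `GroupSpecSketch` is hypothetical, `java.util.UUID` stands in for Kafka's `Uuid`, and a plain `Map` stands in for whatever structure the real `GroupSpecImpl` holds.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical stand-in for GroupSpecImpl. java.util.UUID replaces
// Kafka's Uuid so that the sketch compiles without Kafka on the classpath.
final class GroupSpecSketch {
    // Topic Id -> (partition Id -> member Id).
    private final Map<UUID, Map<Integer, String>> partitionAssignments;

    GroupSpecSketch(Map<UUID, Map<Integer, String>> partitionAssignments) {
        this.partitionAssignments = partitionAssignments;
    }

    /**
     * Checks whether the given partition is currently assigned to a member.
     *
     * @param topicId       The topic Id.
     * @param partitionId   The partition Id.
     * @return True if the partition is currently assigned, false otherwise.
     */
    boolean isPartitionAssigned(UUID topicId, int partitionId) {
        Map<Integer, String> topicAssignments = partitionAssignments.get(topicId);
        return topicAssignments != null && topicAssignments.containsKey(partitionId);
    }

    public static void main(String[] args) {
        UUID topicId = UUID.randomUUID();
        Map<Integer, String> partitions = new HashMap<>();
        partitions.put(0, "member-a");
        Map<UUID, Map<Integer, String>> assignments = new HashMap<>();
        assignments.put(topicId, partitions);

        GroupSpecSketch spec = new GroupSpecSketch(assignments);
        System.out.println("partition 0 assigned: " + spec.isPartitionAssigned(topicId, 0));
        System.out.println("partition 1 assigned: " + spec.isPartitionAssigned(topicId, 1));
    }
}
```

   A unit test for the real method could follow the same three cases: an assigned partition, an unassigned partition of a known topic, and an unknown topic.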
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java:
##########
@@ -557,10 +612,52 @@ public void testReassignmentWhenOneSubscriptionRemovedAfterInitialAssignmentWith
             mkTopicAssignment(topic2Uuid, 1)
         ));
 
+        GroupSpecImpl groupSpec = new GroupSpecImpl(
+            members,
+            HOMOGENEOUS,
+            partitionAssignments(members)
+        );
+        SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
+
+        GroupAssignment computedAssignment = assignor.assign(
+            groupSpec,
+            subscribedTopicMetadata
+        );
+
         assertAssignment(expectedAssignment, computedAssignment);
         checkValidityAndBalance(members, computedAssignment);
     }
 
+    /**
+     * Generate a map of partition assignments from the given member spec.
+     *
+     * @param memberSpec A map where the key is the member Id and the value is an
+     * AssignmentMemberSpec object containing the member's partition assignments.
+     * @return Map of topic partition to member assignments.
+     */
+    public Map<Uuid, Map<Integer, String>> partitionAssignments(

Review Comment:
   This seems to be a copy from the general uniform assignment test. Could we share it? We could perhaps move it to a utility test class (e.g. AssignorTestUtils).

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##########
@@ -198,20 +203,26 @@ private Map<String, Integer> assignStickyPartitions(int minQuota) {
                         topicIdPartition.topicId(),
                         topicIdPartition.partitionId()
                     );
-                    unassignedPartitions.remove(topicIdPartition);
                 });
 
             // The extra partition is located at the last index from the previous step.

Review Comment:
   nit: Should we move this comment to the second if?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##########
@@ -198,20 +203,26 @@ private Map<String, Integer> assignStickyPartitions(int minQuota) {
                         topicIdPartition.topicId(),
                         topicIdPartition.partitionId()
                     );
-                    unassignedPartitions.remove(topicIdPartition);
                 });
 
             // The extra partition is located at the last index from the previous step.
-            if (remaining < 0 && remainingMembersToGetAnExtraPartition > 0) {
-                TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(retainedPartitionsCount);
-                addPartitionToAssignment(
-                    targetAssignment,
-                    memberId,
-                    topicIdPartition.topicId(),
-                    topicIdPartition.partitionId()
-                );
-                unassignedPartitions.remove(topicIdPartition);
-                remainingMembersToGetAnExtraPartition--;
+            if (remaining < 0) {
+                if (remainingMembersToGetAnExtraPartition > 0) {
+                    TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(retainedPartitionsCount++);
+                    addPartitionToAssignment(
+                        targetAssignment,
+                        memberId,
+                        topicIdPartition.topicId(),
+                        topicIdPartition.partitionId()
+                    );
+                    remainingMembersToGetAnExtraPartition--;
+                }
+                if (retainedPartitionsCount < currentAssignmentSize) {

Review Comment:
   Should we add a comment here too?

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -147,6 +147,11 @@ public static class DeadlineAndEpoch {
      */
     private final TimelineHashMap<String, Integer> subscribedTopicNames;
 
+    /**
+     * Partition assignments per topic.
+     */
+    private final TimelineHashMap<Uuid, TimelineHashMap<Integer, String>> partitionAssignments;

Review Comment:
   I also wonder whether we could include `targetAssignment` in the name in order to stress that this is the counterpart of `targetAssignment`.
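
   To make the relationship between the two structures concrete, here is a rough sketch of a per-member target assignment kept in sync with a per-topic reverse index, which is also the kind of behaviour a unit test for the update logic could pin down. It is an illustration under assumptions, not the PR's implementation: plain `HashMap` replaces `TimelineHashMap`, `java.util.UUID` replaces Kafka's `Uuid`, and the names `InvertedAssignmentSketch`, `invertedTargetAssignment`, `owner` and `tracksTopic` are hypothetical.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Illustrative only: HashMap stands in for TimelineHashMap and
// java.util.UUID for Kafka's Uuid. All names here are hypothetical.
final class InvertedAssignmentSketch {
    // Member Id -> (topic Id -> partitions).
    private final Map<String, Map<UUID, Set<Integer>>> targetAssignment = new HashMap<>();
    // Topic Id -> (partition -> member Id); the reverse view of targetAssignment.
    private final Map<UUID, Map<Integer, String>> invertedTargetAssignment = new HashMap<>();

    // Replaces the member's target assignment and updates the reverse view.
    void updateTargetAssignment(String memberId, Map<UUID, Set<Integer>> newAssignment) {
        Map<UUID, Set<Integer>> oldAssignment =
            targetAssignment.getOrDefault(memberId, Collections.emptyMap());
        updateInverted(memberId, oldAssignment, newAssignment);
        targetAssignment.put(memberId, newAssignment);
    }

    private void updateInverted(
        String memberId,
        Map<UUID, Set<Integer>> oldAssignment,
        Map<UUID, Set<Integer>> newAssignment
    ) {
        // Visit every topic that appears in either the old or the new assignment.
        Set<UUID> allTopicIds = new HashSet<>(oldAssignment.keySet());
        allTopicIds.addAll(newAssignment.keySet());

        for (UUID topicId : allTopicIds) {
            Set<Integer> oldPartitions = oldAssignment.getOrDefault(topicId, Collections.emptySet());
            Set<Integer> newPartitions = newAssignment.getOrDefault(topicId, Collections.emptySet());
            Map<Integer, String> owners =
                invertedTargetAssignment.computeIfAbsent(topicId, k -> new HashMap<>());

            // Drop partitions the member gave up, but only if it still owns them.
            for (Integer partition : oldPartitions) {
                if (!newPartitions.contains(partition)) {
                    owners.remove(partition, memberId);
                }
            }
            // Record partitions the member newly received.
            for (Integer partition : newPartitions) {
                if (!oldPartitions.contains(partition)) {
                    owners.put(partition, memberId);
                }
            }
            // Do not keep empty per-topic maps around.
            if (owners.isEmpty()) {
                invertedTargetAssignment.remove(topicId);
            }
        }
    }

    // Returns the member that owns the partition, or null if it is unassigned.
    String owner(UUID topicId, int partition) {
        Map<Integer, String> owners = invertedTargetAssignment.get(topicId);
        return owners == null ? null : owners.get(partition);
    }

    boolean tracksTopic(UUID topicId) {
        return invertedTargetAssignment.containsKey(topicId);
    }

    public static void main(String[] args) {
        UUID topicId = UUID.randomUUID();
        InvertedAssignmentSketch group = new InvertedAssignmentSketch();
        Map<UUID, Set<Integer>> assignment = new HashMap<>();
        assignment.put(topicId, new HashSet<>(Arrays.asList(0, 1)));
        group.updateTargetAssignment("member-a", assignment);
        System.out.println("owner of partition 0: " + group.owner(topicId, 0));
    }
}
```

   The conditional removal matters when a partition moves between members: if the new owner is recorded before the old owner's assignment is shrunk, an unconditional remove would wipe out the new owner's entry.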
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpec.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+
+import java.util.Map;
+
+/**
+ * The assignment specifications for a consumer group.
+ */
+public interface GroupSpec {
+    /**
+     * @return Member metadata keyed by member Id.
+     */
+    Map<String, AssignmentMemberSpec> members();
+
+    /**
+     * @return The group's subscription type.
+     */
+    SubscriptionType subscriptionType();
+
+    /**
+     * @return True, if the partition is currently assigned to a member.
+     * False, otherwise.
+     */
+    boolean isPartitionAssigned(Uuid topicId, int partitionId);
+

Review Comment:
   nit: We could remove this empty line.