AndrewJSchofield commented on code in PR #19977: URL: https://github.com/apache/kafka/pull/19977#discussion_r2158858679
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleHomogeneousAssignmentBuilder.java: ########## @@ -0,0 +1,388 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.api.assignor.GroupSpec; +import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.modern.MemberAssignmentImpl; +import org.apache.kafka.server.common.TopicIdPartition; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.kafka.coordinator.group.assignor.SimpleAssignor.computeTargetPartitions; +import static org.apache.kafka.coordinator.group.assignor.SimpleAssignor.newHashMap; +import static org.apache.kafka.coordinator.group.assignor.SimpleAssignor.newHashSet; + +/** + * The homogeneous simple assignment builder is used to generate the 
target assignment for a share group with + * all its members subscribed to the same set of topics. + * <p> + * Assignments are done according to the following principles: + * <ol> + * <li>Balance: Ensure partitions are distributed equally among all members. + * The difference in assignment sizes between any two members + * should not exceed one partition.</li> + * <li>Stickiness: Minimize partition movements among members by retaining + * as much of the existing assignment as possible.</li> + * </ol> + * <p> + * Balance is prioritized above stickiness. + */ +public class SimpleHomogeneousAssignmentBuilder { + + /** + * The set of all the topic IDs that the share group is subscribed to. + */ + private final Set<Uuid> subscribedTopicIds; + + /** + * The list of members in the share group. + */ + private final List<String> memberIds; + + /** + * Maps member ids to their indices in the memberIds list. + */ + private final Map<String, Integer> memberIndices; + + /** + * The list of all the topic-partitions assignable for the share group. + */ + private final List<TopicIdPartition> targetPartitions; + + /** + * The number of members in the share group. + */ + private final int numGroupMembers; + + /** + * The desired sharing for each target partition. + * For an entirely balanced assignment, we would expect (numTargetPartitions / numGroupMembers) partitions per member, rounded upwards. + * That can be expressed as: Math.ceil(numTargetPartitions / (double) numGroupMembers) + */ + private final int desiredSharing; + + /** + * The desired number of assignments for each share group member. + * <p> + * Members are stored as integer indices into the memberIds array. + */ + private final int[] desiredAssignmentCount; + + /** + * The share group assignment from the group metadata specification at the start of the assignment operation. + * <p> + * Members are stored as integer indices into the memberIds array. 
+ */ + private final Map<Integer, Map<Uuid, Set<Integer>>> oldGroupAssignment; + + /** + * The share group assignment calculated iteratively by the assignment operation. Entries in this map override those + * in the old group assignment map. + * <p> + * Members are stored as integer indices into the memberIds array. + */ + private final Map<Integer, Map<Uuid, Set<Integer>>> newGroupAssignment; + + /** + * The final assignment keyed by topic-partition mapping to member. + * <p> + * Members are stored as integer indices into the memberIds array. + */ + private final Map<TopicIdPartition, Set<Integer>> finalAssignmentByPartition; + + /** + * The final assignment keyed by member ID mapping to topic-partitions. + * <p> + * Members are stored as integer indices into the memberIds array. + */ + private final Map<Integer, Set<TopicIdPartition>> finalAssignmentByMember; + + /** + * The set of members which have too few assigned partitions. + * <p> + * Members are stored as integer indices into the memberIds array. + */ + private final Set<Integer> unfilledMembers; + + /** + * The set of members which have too many assigned partitions. + * <p> + * Members are stored as integer indices into the memberIds array. + */ + private final Set<Integer> overfilledMembers; + + public SimpleHomogeneousAssignmentBuilder(GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber) { + this.subscribedTopicIds = groupSpec.memberSubscription(groupSpec.memberIds().iterator().next()).subscribedTopicIds(); + Review Comment: The check is at the start of the `build()` method. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org