mumrah commented on code in PR #19142: URL: https://github.com/apache/kafka/pull/19142#discussion_r1985507925
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java: ########## @@ -135,56 +148,71 @@ private GroupAssignment newAssignmentHomogeneous( List<TopicIdPartition> targetPartitions, Map<TopicIdPartition, List<String>> currentAssignment ) { - Map<TopicIdPartition, List<String>> newAssignment = new HashMap<>(); - - // Step 1: Hash member IDs to topic partitions. - memberHashAssignment(targetPartitions, groupSpec.memberIds(), newAssignment); - - // Step 2: Round-robin assignment for unassigned partitions which do not have members already assigned in the current assignment. - List<TopicIdPartition> unassignedPartitions = targetPartitions.stream() - .filter(targetPartition -> !newAssignment.containsKey(targetPartition)) - .filter(targetPartition -> !currentAssignment.containsKey(targetPartition)) - .toList(); - - roundRobinAssignment(groupSpec.memberIds(), unassignedPartitions, newAssignment); - - // Step 3: We combine current assignment and new assignment. - Map<String, Set<TopicIdPartition>> finalAssignment = new HashMap<>(); - - // As per the KIP, we should revoke the assignments from current assignment for partitions that were assigned by step 1 - // in the new assignment and have members in current assignment by step 2. But we haven't implemented it to avoid the - // complexity in both the implementation and the run time complexity. This step was mentioned in the KIP to reduce - // the burden of certain members of the share groups. This can be achieved with the help of limiting the max - // no. of partitions assignment for every member(KAFKA-18788). Hence, the potential problem of burdening - // the share consumers will be addressed in a future PR. + // For entirely balanced assignment, we would expect (numTargetPartitions / numGroupMembers) partitions per member, rounded upwards. 
+ // That can be expressed as Math.ceil(numTargetPartitions / (double) numGroupMembers) + // Using integer arithmetic, as (numTargetPartitions + numGroupMembers - 1) / numGroupMembers + int numGroupMembers = groupSpec.memberIds().size(); + int numTargetPartitions = targetPartitions.size(); + int desiredAssignmentCount = (numTargetPartitions + numGroupMembers - 1) / numGroupMembers; + + Map<TopicIdPartition, List<String>> newAssignment = new HashMap<>((int) (((numTargetPartitions + 1) / 0.75f) + 1)); + + // Hash member IDs to topic partitions. Each member will be assigned one partition, but some partitions + // might have been assigned to more than one member. + memberHashAssignment(groupSpec.memberIds(), targetPartitions, newAssignment); + + // Combine current and new hashed assignments, sized to accommodate the expected number of mappings. + Map<String, Set<TopicIdPartition>> finalAssignment = new HashMap<>((int) ((numGroupMembers / 0.75f) + 1)); + Map<TopicIdPartition, Set<String>> finalAssignmentByPartition = new HashMap<>((int) (((numTargetPartitions + 1) / 0.75f) + 1)); + + // First, take the members assigned by hashing. + newAssignment.forEach((targetPartition, members) -> members.forEach(member -> { + finalAssignment.computeIfAbsent(member, k -> new HashSet<>()).add(targetPartition); + finalAssignmentByPartition.computeIfAbsent(targetPartition, k -> new HashSet<>()).add(member); + })); - newAssignment.forEach((targetPartition, members) -> members.forEach(member -> - finalAssignment.computeIfAbsent(member, k -> new HashSet<>()).add(targetPartition))); + // Then, take the members from the current assignment, making sure that no member has too many assigned partitions. // When combining current assignment, we need to only consider the topics in current assignment that are also being // subscribed in the new assignment as well. 
currentAssignment.forEach((targetPartition, members) -> { - if (subscribedTopicIds.contains(targetPartition.topicId())) + if (subscribedTopicIds.contains(targetPartition.topicId())) { members.forEach(member -> { - if (groupSpec.memberIds().contains(member) && !newAssignment.containsKey(targetPartition)) - finalAssignment.computeIfAbsent(member, k -> new HashSet<>()).add(targetPartition); + if (groupSpec.memberIds().contains(member)) { + Set<TopicIdPartition> memberPartitions = finalAssignment.computeIfAbsent(member, k -> new HashSet<>()); + if ((memberPartitions.size() < desiredAssignmentCount) && !newAssignment.containsKey(targetPartition)) { + memberPartitions.add(targetPartition); + finalAssignmentByPartition.computeIfAbsent(targetPartition, k -> new HashSet<>()).add(member); + } + } }); + } }); + // Finally, round-robin assignment for unassigned partitions which do not already have members assigned. + // The order of steps differs slightly from KIP-932 because the desired assignment count has been taken into + // account when copying partitions across from the current assignment, and this is more convenient. + List<TopicIdPartition> unassignedPartitions = targetPartitions.stream() + .filter(targetPartition -> !finalAssignmentByPartition.containsKey(targetPartition)) + .toList(); + + roundRobinAssignmentWithCount(groupSpec.memberIds(), unassignedPartitions, finalAssignment, desiredAssignmentCount); Review Comment: Even though it should never be the case (since we calculate `desiredAssignmentCount` above), should we check that unassignedPartitions is empty after this call? 
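The diff derives `desiredAssignmentCount` by integer ceiling division. Two details affect the post-call check suggested above. First, `unassignedPartitions` is built with `.toList()`, which returns an unmodifiable list. Second, `roundRobinAssignmentWithCount` only walks that list with a `ListIterator` and never removes from it. As a result, a literal `unassignedPartitions.isEmpty()` test would not detect anything, and the check would instead need to compare the target partitions against `finalAssignment`. The sketch below is a minimal illustration under stated assumptions: the class and method names are hypothetical, and partitions are generic so the example runs without Kafka types.

```java
import java.util.*;

// Hypothetical helpers for illustration; neither exists in SimpleAssignor.
final class HomogeneousChecks {
    private HomogeneousChecks() { }

    // Integer ceiling division, equivalent to
    // (int) Math.ceil(numTargetPartitions / (double) numGroupMembers) for non-negative inputs.
    // Assumes numGroupMembers > 0 and that the sum does not overflow.
    static int desiredAssignmentCount(int numTargetPartitions, int numGroupMembers) {
        return (numTargetPartitions + numGroupMembers - 1) / numGroupMembers;
    }

    // Post-condition: returns the target partitions that no member ended up with.
    // A caller could fail fast (for example with PartitionAssignorException) if this is non-empty.
    static <P> List<P> unassignedAfter(List<P> targetPartitions, Map<String, Set<P>> finalAssignment) {
        Set<P> assigned = new HashSet<>();
        for (Set<P> partitions : finalAssignment.values()) {
            assigned.addAll(partitions);
        }
        List<P> missing = new ArrayList<>();
        for (P partition : targetPartitions) {
            if (!assigned.contains(partition)) {
                missing.add(partition);
            }
        }
        return missing;
    }
}
```

For example, 7 target partitions over 3 members gives a desired count of 3, matching `Math.ceil(7 / 3.0)`.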
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java: ########## @@ -135,56 +148,71 @@ private GroupAssignment newAssignmentHomogeneous( List<TopicIdPartition> targetPartitions, Map<TopicIdPartition, List<String>> currentAssignment ) { - Map<TopicIdPartition, List<String>> newAssignment = new HashMap<>(); - - // Step 1: Hash member IDs to topic partitions. - memberHashAssignment(targetPartitions, groupSpec.memberIds(), newAssignment); - - // Step 2: Round-robin assignment for unassigned partitions which do not have members already assigned in the current assignment. - List<TopicIdPartition> unassignedPartitions = targetPartitions.stream() - .filter(targetPartition -> !newAssignment.containsKey(targetPartition)) - .filter(targetPartition -> !currentAssignment.containsKey(targetPartition)) - .toList(); - - roundRobinAssignment(groupSpec.memberIds(), unassignedPartitions, newAssignment); - - // Step 3: We combine current assignment and new assignment. - Map<String, Set<TopicIdPartition>> finalAssignment = new HashMap<>(); - - // As per the KIP, we should revoke the assignments from current assignment for partitions that were assigned by step 1 - // in the new assignment and have members in current assignment by step 2. But we haven't implemented it to avoid the - // complexity in both the implementation and the run time complexity. This step was mentioned in the KIP to reduce - // the burden of certain members of the share groups. This can be achieved with the help of limiting the max - // no. of partitions assignment for every member(KAFKA-18788). Hence, the potential problem of burdening - // the share consumers will be addressed in a future PR. + // For entirely balanced assignment, we would expect (numTargetPartitions / numGroupMembers) partitions per member, rounded upwards. 
+ // That can be expressed as Math.ceil(numTargetPartitions / (double) numGroupMembers) + // Using integer arithmetic, as (numTargetPartitions + numGroupMembers - 1) / numGroupMembers + int numGroupMembers = groupSpec.memberIds().size(); + int numTargetPartitions = targetPartitions.size(); + int desiredAssignmentCount = (numTargetPartitions + numGroupMembers - 1) / numGroupMembers; + + Map<TopicIdPartition, List<String>> newAssignment = new HashMap<>((int) (((numTargetPartitions + 1) / 0.75f) + 1)); Review Comment: Are we doing this to avoid rehashing as we fill up these maps? If so, can we make a little utility method that does this? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java: ########## @@ -318,4 +367,22 @@ private List<TopicIdPartition> computeTargetPartitions( }); return targetPartitions; } + + private GroupAssignment groupAssignment( + Map<String, Set<TopicIdPartition>> assignmentByMember, + Collection<String> allGroupMembers + ) { + Map<String, MemberAssignment> members = new HashMap<>(); + for (Map.Entry<String, Set<TopicIdPartition>> entry : assignmentByMember.entrySet()) { + Map<Uuid, Set<Integer>> targetPartitions = new HashMap<>(); + entry.getValue().forEach(targetPartition -> targetPartitions.computeIfAbsent(targetPartition.topicId(), k -> new HashSet<>()).add(targetPartition.partitionId())); Review Comment: nit: maybe break after the first `->`? 
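The sizing expressions in the diff appear intended to pre-size each `HashMap` so it never resizes while being filled. A small helper like the one below could replace the repeated arithmetic. This is a sketch: the class name `MapSizing` and its methods are hypothetical. The calculation is `ceil(expectedSize / 0.75)` for HashMap's default load factor, which is also what JDK 19's `HashMap.newHashMap(int numMappings)` computes. On JDK 19 and later, that standard method could be used directly instead.

```java
import java.util.HashMap;

// Hypothetical utility; the name and location are illustrative, not from the PR.
final class MapSizing {
    private MapSizing() { }

    // Initial capacity at which a HashMap with the default load factor (0.75)
    // can hold expectedSize mappings without resizing.
    static int capacityFor(int expectedSize) {
        if (expectedSize < 0) {
            throw new IllegalArgumentException("expectedSize must be non-negative: " + expectedSize);
        }
        return (int) Math.ceil(expectedSize / 0.75d);
    }

    // Convenience factory built on capacityFor.
    static <K, V> HashMap<K, V> newHashMapWithExpectedSize(int expectedSize) {
        return new HashMap<>(capacityFor(expectedSize));
    }
}
```

With such a helper, a call site could read `MapSizing.newHashMapWithExpectedSize(numTargetPartitions)`. The diff's `(int) (((n + 1) / 0.75f) + 1)` should also avoid a resize, just with a little extra headroom.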
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java: ########## @@ -234,67 +257,93 @@ private GroupAssignment newAssignmentHeterogeneous( return groupAssignment(finalAssignment, groupSpec.memberIds()); } - private GroupAssignment groupAssignment( - Map<String, Set<TopicIdPartition>> assignmentByMember, - Collection<String> allGroupMembers - ) { - Map<String, MemberAssignment> members = new HashMap<>(); - for (Map.Entry<String, Set<TopicIdPartition>> entry : assignmentByMember.entrySet()) { - Map<Uuid, Set<Integer>> targetPartitions = new HashMap<>(); - entry.getValue().forEach(targetPartition -> targetPartitions.computeIfAbsent(targetPartition.topicId(), k -> new HashSet<>()).add(targetPartition.partitionId())); - members.put(entry.getKey(), new MemberAssignmentImpl(targetPartitions)); - } - allGroupMembers.forEach(member -> { - if (!members.containsKey(member)) - members.put(member, new MemberAssignmentImpl(new HashMap<>())); - }); - - return new GroupAssignment(members); - } - /** * This function updates assignment by hashing the member IDs of the members and maps the partitions assigned to the - * members based on the hash. This gives approximately even balance. - * @param unassignedPartitions - the subscribed topic partitions which needs assignment. - * @param memberIds - the member ids to which the topic partitions need to be assigned. - * @param assignment - the existing assignment by topic partition. We need to pass it as a parameter because this - * function would be called multiple times for heterogeneous assignment. + * members based on the hash, one partition per member. This gives approximately even balance. + * @param memberIds The member ids to which the topic partitions need to be assigned. + * @param partitionsToAssign The subscribed topic partitions which needs assignment. + * @param assignment The existing assignment by topic partition. 
We need to pass it as a parameter because this + * method can be called multiple times for heterogeneous assignment. */ // Visible for testing void memberHashAssignment( - List<TopicIdPartition> unassignedPartitions, Collection<String> memberIds, + List<TopicIdPartition> partitionsToAssign, Map<TopicIdPartition, List<String>> assignment ) { - if (!unassignedPartitions.isEmpty()) + if (!partitionsToAssign.isEmpty()) { for (String memberId : memberIds) { - int topicPartitionIndex = Math.abs(memberId.hashCode() % unassignedPartitions.size()); - TopicIdPartition topicPartition = unassignedPartitions.get(topicPartitionIndex); + int topicPartitionIndex = Math.abs(memberId.hashCode() % partitionsToAssign.size()); + TopicIdPartition topicPartition = partitionsToAssign.get(topicPartitionIndex); assignment.computeIfAbsent(topicPartition, k -> new ArrayList<>()).add(memberId); } + } } /** - * This functions assigns topic partitions to members by round-robin approach and updates the existing assignment. - * @param memberIds - the member ids to which the topic partitions need to be assigned, should be non-empty. - * @param unassignedPartitions - the subscribed topic partitions which needs assignment. - * @param assignment - the existing assignment by topic partition. + * This functions assigns topic partitions to members by a round-robin approach and updates the existing assignment. + * @param memberIds The member ids to which the topic partitions need to be assigned, should be non-empty. + * @param partitionsToAssign The subscribed topic partitions which needs assignment. + * @param assignment The existing assignment by topic partition. We need to pass it as a parameter because this + * method can be called multiple times for heterogeneous assignment. 
*/ // Visible for testing void roundRobinAssignment( Collection<String> memberIds, - List<TopicIdPartition> unassignedPartitions, + List<TopicIdPartition> partitionsToAssign, Map<TopicIdPartition, List<String>> assignment ) { // We iterate through the target partitions and assign a memberId to them. In case we run out of members (members < targetPartitions), // we again start from the starting index of memberIds. Iterator<String> memberIdIterator = memberIds.iterator(); - for (TopicIdPartition targetPartition : unassignedPartitions) { + for (TopicIdPartition topicPartition : partitionsToAssign) { if (!memberIdIterator.hasNext()) { memberIdIterator = memberIds.iterator(); } String memberId = memberIdIterator.next(); - assignment.computeIfAbsent(targetPartition, k -> new ArrayList<>()).add(memberId); + assignment.computeIfAbsent(topicPartition, k -> new ArrayList<>()).add(memberId); + } + } + + /** + * This functions assigns topic partitions to members by a round-robin approach and updates the existing assignment. + * @param memberIds The member ids to which the topic partitions need to be assigned, should be non-empty. + * @param partitionsToAssign The subscribed topic partitions which needs assignment. + * @param assignment The existing assignment by topic partition. We need to pass it as a parameter because this + * method can be called multiple times for heterogeneous assignment. + * @param desiredAssignmentCount The number of partitions which can be assigned to each member to give even balance. + * Note that this number is slightly higher than strictly required to allow for situations + * in which we have hashing collisions. + */ + void roundRobinAssignmentWithCount( Review Comment: Can we add some direct unit tests for this method? 
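`roundRobinAssignmentWithCount` takes and mutates plain collections, so it should be straightforward to test directly. Below is a simplified, self-contained model of a capped round-robin that shows the kind of cases such tests might cover. It is not the PR's code, and it rests on these assumptions:
- Members and partitions are plain strings rather than `TopicIdPartition`.
- `IllegalStateException` stands in for `PartitionAssignorException`.
- The class name `CappedRoundRobin` is hypothetical.
- A member is skipped once it already holds `desiredAssignmentCount` partitions, using a strict `<` check. The loop quoted further down this thread uses `<=`, which appears to let a member reach one more.

Rotating an `ArrayDeque` and dropping full members also suggests one possible shape for a reusable cycling helper with a built-in termination guard. Every inner step either places a partition or permanently removes a member, so the loop cannot spin forever.

```java
import java.util.*;

// Simplified model of a capped round-robin assignment (hypothetical; not the PR's code).
final class CappedRoundRobin {
    private CappedRoundRobin() { }

    static void assign(
        Collection<String> memberIds,
        List<String> partitionsToAssign,
        Map<String, Set<String>> assignment,
        int desiredAssignmentCount
    ) {
        // Members that may still have capacity, in stable (insertion) order.
        Deque<String> eligible = new ArrayDeque<>(new LinkedHashSet<>(memberIds));
        for (String partition : partitionsToAssign) {
            boolean placed = false;
            while (!eligible.isEmpty()) {
                String memberId = eligible.pollFirst();
                Set<String> owned = assignment.computeIfAbsent(memberId, k -> new HashSet<>());
                if (owned.size() < desiredAssignmentCount) {
                    owned.add(partition);
                    // Rotate the member to the back so the next partition goes to the next member.
                    eligible.addLast(memberId);
                    placed = true;
                    break;
                }
                // The member is full: it is not re-added, so the deque shrinks on every such pass.
            }
            if (!placed) {
                // Stand-in for PartitionAssignorException: no member has room left.
                throw new IllegalStateException("No member has capacity for partition " + partition);
            }
        }
    }
}
```

Two cases worth covering in direct tests are shown in this sketch's checks. The first is an uneven split: 7 partitions over 3 members with a cap of 3 yields 3, 2 and 2. The second is running out of capacity before partitions run out, which throws instead of looping.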
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java: ########## @@ -234,67 +257,93 @@ private GroupAssignment newAssignmentHeterogeneous( return groupAssignment(finalAssignment, groupSpec.memberIds()); } - private GroupAssignment groupAssignment( - Map<String, Set<TopicIdPartition>> assignmentByMember, - Collection<String> allGroupMembers - ) { - Map<String, MemberAssignment> members = new HashMap<>(); - for (Map.Entry<String, Set<TopicIdPartition>> entry : assignmentByMember.entrySet()) { - Map<Uuid, Set<Integer>> targetPartitions = new HashMap<>(); - entry.getValue().forEach(targetPartition -> targetPartitions.computeIfAbsent(targetPartition.topicId(), k -> new HashSet<>()).add(targetPartition.partitionId())); - members.put(entry.getKey(), new MemberAssignmentImpl(targetPartitions)); - } - allGroupMembers.forEach(member -> { - if (!members.containsKey(member)) - members.put(member, new MemberAssignmentImpl(new HashMap<>())); - }); - - return new GroupAssignment(members); - } - /** * This function updates assignment by hashing the member IDs of the members and maps the partitions assigned to the - * members based on the hash. This gives approximately even balance. - * @param unassignedPartitions - the subscribed topic partitions which needs assignment. - * @param memberIds - the member ids to which the topic partitions need to be assigned. - * @param assignment - the existing assignment by topic partition. We need to pass it as a parameter because this - * function would be called multiple times for heterogeneous assignment. + * members based on the hash, one partition per member. This gives approximately even balance. + * @param memberIds The member ids to which the topic partitions need to be assigned. + * @param partitionsToAssign The subscribed topic partitions which needs assignment. + * @param assignment The existing assignment by topic partition. 
We need to pass it as a parameter because this + * method can be called multiple times for heterogeneous assignment. */ // Visible for testing void memberHashAssignment( - List<TopicIdPartition> unassignedPartitions, Collection<String> memberIds, + List<TopicIdPartition> partitionsToAssign, Map<TopicIdPartition, List<String>> assignment ) { - if (!unassignedPartitions.isEmpty()) + if (!partitionsToAssign.isEmpty()) { for (String memberId : memberIds) { - int topicPartitionIndex = Math.abs(memberId.hashCode() % unassignedPartitions.size()); - TopicIdPartition topicPartition = unassignedPartitions.get(topicPartitionIndex); + int topicPartitionIndex = Math.abs(memberId.hashCode() % partitionsToAssign.size()); + TopicIdPartition topicPartition = partitionsToAssign.get(topicPartitionIndex); assignment.computeIfAbsent(topicPartition, k -> new ArrayList<>()).add(memberId); } + } } /** - * This functions assigns topic partitions to members by round-robin approach and updates the existing assignment. - * @param memberIds - the member ids to which the topic partitions need to be assigned, should be non-empty. - * @param unassignedPartitions - the subscribed topic partitions which needs assignment. - * @param assignment - the existing assignment by topic partition. + * This functions assigns topic partitions to members by a round-robin approach and updates the existing assignment. + * @param memberIds The member ids to which the topic partitions need to be assigned, should be non-empty. + * @param partitionsToAssign The subscribed topic partitions which needs assignment. + * @param assignment The existing assignment by topic partition. We need to pass it as a parameter because this + * method can be called multiple times for heterogeneous assignment. 
*/ // Visible for testing void roundRobinAssignment( Collection<String> memberIds, - List<TopicIdPartition> unassignedPartitions, + List<TopicIdPartition> partitionsToAssign, Map<TopicIdPartition, List<String>> assignment ) { // We iterate through the target partitions and assign a memberId to them. In case we run out of members (members < targetPartitions), // we again start from the starting index of memberIds. Iterator<String> memberIdIterator = memberIds.iterator(); - for (TopicIdPartition targetPartition : unassignedPartitions) { + for (TopicIdPartition topicPartition : partitionsToAssign) { if (!memberIdIterator.hasNext()) { memberIdIterator = memberIds.iterator(); } String memberId = memberIdIterator.next(); - assignment.computeIfAbsent(targetPartition, k -> new ArrayList<>()).add(memberId); + assignment.computeIfAbsent(topicPartition, k -> new ArrayList<>()).add(memberId); + } + } + + /** + * This functions assigns topic partitions to members by a round-robin approach and updates the existing assignment. + * @param memberIds The member ids to which the topic partitions need to be assigned, should be non-empty. + * @param partitionsToAssign The subscribed topic partitions which needs assignment. + * @param assignment The existing assignment by topic partition. We need to pass it as a parameter because this + * method can be called multiple times for heterogeneous assignment. + * @param desiredAssignmentCount The number of partitions which can be assigned to each member to give even balance. + * Note that this number is slightly higher than strictly required to allow for situations + * in which we have hashing collisions. 
+ */ + void roundRobinAssignmentWithCount( + Collection<String> memberIds, + List<TopicIdPartition> partitionsToAssign, + Map<String, Set<TopicIdPartition>> assignment, + int desiredAssignmentCount + ) { + Collection<String> memberIdsCopy = new LinkedHashSet<>(memberIds); + + // We iterate through the target partitions which are not in the assignment and assign a memberId to them. + // In case we run out of members (memberIds < partitionsToAssign), we again start from the starting index of memberIds. + Iterator<String> memberIdIterator = memberIdsCopy.iterator(); + ListIterator<TopicIdPartition> partitionListIterator = partitionsToAssign.listIterator(); + while (partitionListIterator.hasNext()) { + TopicIdPartition partition = partitionListIterator.next(); + if (!memberIdIterator.hasNext()) { + memberIdIterator = memberIdsCopy.iterator(); Review Comment: I see we have this cycling iterator pattern twice already. I imagine it's a common pattern for things like assignment. Maybe we should create a utility method/class that provides this functionality and adds things like the infinite loop check below. 
Doesn't have to be done in this PR, just an idea ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java: ########## @@ -234,67 +257,93 @@ private GroupAssignment newAssignmentHeterogeneous( return groupAssignment(finalAssignment, groupSpec.memberIds()); } - private GroupAssignment groupAssignment( - Map<String, Set<TopicIdPartition>> assignmentByMember, - Collection<String> allGroupMembers - ) { - Map<String, MemberAssignment> members = new HashMap<>(); - for (Map.Entry<String, Set<TopicIdPartition>> entry : assignmentByMember.entrySet()) { - Map<Uuid, Set<Integer>> targetPartitions = new HashMap<>(); - entry.getValue().forEach(targetPartition -> targetPartitions.computeIfAbsent(targetPartition.topicId(), k -> new HashSet<>()).add(targetPartition.partitionId())); - members.put(entry.getKey(), new MemberAssignmentImpl(targetPartitions)); - } - allGroupMembers.forEach(member -> { - if (!members.containsKey(member)) - members.put(member, new MemberAssignmentImpl(new HashMap<>())); - }); - - return new GroupAssignment(members); - } - /** * This function updates assignment by hashing the member IDs of the members and maps the partitions assigned to the - * members based on the hash. This gives approximately even balance. - * @param unassignedPartitions - the subscribed topic partitions which needs assignment. - * @param memberIds - the member ids to which the topic partitions need to be assigned. - * @param assignment - the existing assignment by topic partition. We need to pass it as a parameter because this - * function would be called multiple times for heterogeneous assignment. + * members based on the hash, one partition per member. This gives approximately even balance. + * @param memberIds The member ids to which the topic partitions need to be assigned. + * @param partitionsToAssign The subscribed topic partitions which needs assignment. + * @param assignment The existing assignment by topic partition. 
We need to pass it as a parameter because this + * method can be called multiple times for heterogeneous assignment. */ // Visible for testing void memberHashAssignment( - List<TopicIdPartition> unassignedPartitions, Collection<String> memberIds, + List<TopicIdPartition> partitionsToAssign, Map<TopicIdPartition, List<String>> assignment ) { - if (!unassignedPartitions.isEmpty()) + if (!partitionsToAssign.isEmpty()) { for (String memberId : memberIds) { - int topicPartitionIndex = Math.abs(memberId.hashCode() % unassignedPartitions.size()); - TopicIdPartition topicPartition = unassignedPartitions.get(topicPartitionIndex); + int topicPartitionIndex = Math.abs(memberId.hashCode() % partitionsToAssign.size()); + TopicIdPartition topicPartition = partitionsToAssign.get(topicPartitionIndex); assignment.computeIfAbsent(topicPartition, k -> new ArrayList<>()).add(memberId); } + } } /** - * This functions assigns topic partitions to members by round-robin approach and updates the existing assignment. - * @param memberIds - the member ids to which the topic partitions need to be assigned, should be non-empty. - * @param unassignedPartitions - the subscribed topic partitions which needs assignment. - * @param assignment - the existing assignment by topic partition. + * This functions assigns topic partitions to members by a round-robin approach and updates the existing assignment. + * @param memberIds The member ids to which the topic partitions need to be assigned, should be non-empty. + * @param partitionsToAssign The subscribed topic partitions which needs assignment. + * @param assignment The existing assignment by topic partition. We need to pass it as a parameter because this + * method can be called multiple times for heterogeneous assignment. 
*/ // Visible for testing void roundRobinAssignment( Collection<String> memberIds, - List<TopicIdPartition> unassignedPartitions, + List<TopicIdPartition> partitionsToAssign, Map<TopicIdPartition, List<String>> assignment ) { // We iterate through the target partitions and assign a memberId to them. In case we run out of members (members < targetPartitions), // we again start from the starting index of memberIds. Iterator<String> memberIdIterator = memberIds.iterator(); - for (TopicIdPartition targetPartition : unassignedPartitions) { + for (TopicIdPartition topicPartition : partitionsToAssign) { if (!memberIdIterator.hasNext()) { memberIdIterator = memberIds.iterator(); } String memberId = memberIdIterator.next(); - assignment.computeIfAbsent(targetPartition, k -> new ArrayList<>()).add(memberId); + assignment.computeIfAbsent(topicPartition, k -> new ArrayList<>()).add(memberId); + } + } + + /** + * This functions assigns topic partitions to members by a round-robin approach and updates the existing assignment. + * @param memberIds The member ids to which the topic partitions need to be assigned, should be non-empty. + * @param partitionsToAssign The subscribed topic partitions which needs assignment. + * @param assignment The existing assignment by topic partition. We need to pass it as a parameter because this + * method can be called multiple times for heterogeneous assignment. + * @param desiredAssignmentCount The number of partitions which can be assigned to each member to give even balance. + * Note that this number is slightly higher than strictly required to allow for situations + * in which we have hashing collisions. 
+ */ + void roundRobinAssignmentWithCount( + Collection<String> memberIds, + List<TopicIdPartition> partitionsToAssign, + Map<String, Set<TopicIdPartition>> assignment, + int desiredAssignmentCount + ) { + Collection<String> memberIdsCopy = new LinkedHashSet<>(memberIds); + + // We iterate through the target partitions which are not in the assignment and assign a memberId to them. + // In case we run out of members (memberIds < partitionsToAssign), we again start from the starting index of memberIds. + Iterator<String> memberIdIterator = memberIdsCopy.iterator(); + ListIterator<TopicIdPartition> partitionListIterator = partitionsToAssign.listIterator(); + while (partitionListIterator.hasNext()) { + TopicIdPartition partition = partitionListIterator.next(); + if (!memberIdIterator.hasNext()) { + memberIdIterator = memberIdsCopy.iterator(); + if (memberIdsCopy.isEmpty()) { + // This should never happen, but guarding against an infinite loop + throw new PartitionAssignorException("Inconsistent number of member IDs"); + } + } + String memberId = memberIdIterator.next(); + Set<TopicIdPartition> memberPartitions = assignment.computeIfAbsent(memberId, k -> new HashSet<>()); + if (memberPartitions.size() <= desiredAssignmentCount) { + memberPartitions.add(partition); + } else { + memberIdIterator.remove(); + partitionListIterator.previous(); Review Comment: What happens if all members have reached the desired count but we still have partitions remaining? I'm not sure, but I think it might end up in an infinite loop? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java: ########## @@ -135,56 +148,71 @@ private GroupAssignment newAssignmentHomogeneous( List<TopicIdPartition> targetPartitions, Map<TopicIdPartition, List<String>> currentAssignment ) { - Map<TopicIdPartition, List<String>> newAssignment = new HashMap<>(); - - // Step 1: Hash member IDs to topic partitions. 
- memberHashAssignment(targetPartitions, groupSpec.memberIds(), newAssignment); - - // Step 2: Round-robin assignment for unassigned partitions which do not have members already assigned in the current assignment. - List<TopicIdPartition> unassignedPartitions = targetPartitions.stream() - .filter(targetPartition -> !newAssignment.containsKey(targetPartition)) - .filter(targetPartition -> !currentAssignment.containsKey(targetPartition)) - .toList(); - - roundRobinAssignment(groupSpec.memberIds(), unassignedPartitions, newAssignment); - - // Step 3: We combine current assignment and new assignment. - Map<String, Set<TopicIdPartition>> finalAssignment = new HashMap<>(); - - // As per the KIP, we should revoke the assignments from current assignment for partitions that were assigned by step 1 - // in the new assignment and have members in current assignment by step 2. But we haven't implemented it to avoid the - // complexity in both the implementation and the run time complexity. This step was mentioned in the KIP to reduce - // the burden of certain members of the share groups. This can be achieved with the help of limiting the max - // no. of partitions assignment for every member(KAFKA-18788). Hence, the potential problem of burdening - // the share consumers will be addressed in a future PR. + // For entirely balanced assignment, we would expect (numTargetPartitions / numGroupMembers) partitions per member, rounded upwards. + // That can be expressed as Math.ceil(numTargetPartitions / (double) numGroupMembers) + // Using integer arithmetic, as (numTargetPartitions + numGroupMembers - 1) / numGroupMembers + int numGroupMembers = groupSpec.memberIds().size(); + int numTargetPartitions = targetPartitions.size(); + int desiredAssignmentCount = (numTargetPartitions + numGroupMembers - 1) / numGroupMembers; + + Map<TopicIdPartition, List<String>> newAssignment = new HashMap<>((int) (((numTargetPartitions + 1) / 0.75f) + 1)); + + // Hash member IDs to topic partitions. 
Each member will be assigned one partition, but some partitions + // might have been assigned to more than one member. + memberHashAssignment(groupSpec.memberIds(), targetPartitions, newAssignment); + + // Combine current and new hashed assignments, sized to accommodate the expected number of mappings. + Map<String, Set<TopicIdPartition>> finalAssignment = new HashMap<>((int) ((numGroupMembers / 0.75f) + 1)); + Map<TopicIdPartition, Set<String>> finalAssignmentByPartition = new HashMap<>((int) (((numTargetPartitions + 1) / 0.75f) + 1)); + + // First, take the members assigned by hashing. + newAssignment.forEach((targetPartition, members) -> members.forEach(member -> { + finalAssignment.computeIfAbsent(member, k -> new HashSet<>()).add(targetPartition); + finalAssignmentByPartition.computeIfAbsent(targetPartition, k -> new HashSet<>()).add(member); + })); - newAssignment.forEach((targetPartition, members) -> members.forEach(member -> - finalAssignment.computeIfAbsent(member, k -> new HashSet<>()).add(targetPartition))); + // Then, take the members from the current assignment, making sure that no member has too many assigned partitions. // When combining current assignment, we need to only consider the topics in current assignment that are also being // subscribed in the new assignment as well. currentAssignment.forEach((targetPartition, members) -> { - if (subscribedTopicIds.contains(targetPartition.topicId())) + if (subscribedTopicIds.contains(targetPartition.topicId())) { Review Comment: I want to double check my understanding. Here we start with the current assignment and remove any topics which are no longer subscribed and remove any members which aren't currently around. We then copy assignments from currentAssignment into finalAssignment up to the desired number of assignments per member. This leaves us with a subset of currentAssignment and possibly a number of unassigned partitions. 
We then round-robin the remaining unassigned partitions into finalAssignment (up to the desired count).

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org