mumrah commented on code in PR #19142:
URL: https://github.com/apache/kafka/pull/19142#discussion_r1985507925


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java:
##########
@@ -135,56 +148,71 @@ private GroupAssignment newAssignmentHomogeneous(
         List<TopicIdPartition> targetPartitions,
         Map<TopicIdPartition, List<String>> currentAssignment
     ) {
-        Map<TopicIdPartition, List<String>> newAssignment = new HashMap<>();
-
-        // Step 1: Hash member IDs to topic partitions.
-        memberHashAssignment(targetPartitions, groupSpec.memberIds(), 
newAssignment);
-
-        // Step 2: Round-robin assignment for unassigned partitions which do 
not have members already assigned in the current assignment.
-        List<TopicIdPartition> unassignedPartitions = targetPartitions.stream()
-            .filter(targetPartition -> 
!newAssignment.containsKey(targetPartition))
-            .filter(targetPartition -> 
!currentAssignment.containsKey(targetPartition))
-            .toList();
-
-        roundRobinAssignment(groupSpec.memberIds(), unassignedPartitions, 
newAssignment);
-
-        // Step 3: We combine current assignment and new assignment.
-        Map<String, Set<TopicIdPartition>> finalAssignment = new HashMap<>();
-
-        // As per the KIP, we should revoke the assignments from current 
assignment for partitions that were assigned by step 1
-        // in the new assignment and have members in current assignment by 
step 2. But we haven't implemented it to avoid the
-        // complexity in both the implementation and the run time complexity. 
This step was mentioned in the KIP to reduce
-        // the burden of certain members of the share groups. This can be 
achieved with the help of limiting the max
-        // no. of partitions assignment for every member(KAFKA-18788). Hence, 
the potential problem of burdening
-        // the share consumers will be addressed in a future PR.
+        // For entirely balanced assignment, we would expect 
(numTargetPartitions / numGroupMembers) partitions per member, rounded upwards.
+        // That can be expressed as         Math.ceil(numTargetPartitions / 
(double) numGroupMembers)
+        // Using integer arithmetic, as     (numTargetPartitions + 
numGroupMembers - 1) / numGroupMembers
+        int numGroupMembers = groupSpec.memberIds().size();
+        int numTargetPartitions = targetPartitions.size();
+        int desiredAssignmentCount = (numTargetPartitions + numGroupMembers - 
1) / numGroupMembers;
+
+        Map<TopicIdPartition, List<String>> newAssignment = new 
HashMap<>((int) (((numTargetPartitions + 1) / 0.75f) + 1));
+
+        // Hash member IDs to topic partitions. Each member will be assigned 
one partition, but some partitions
+        // might have been assigned to more than one member.
+        memberHashAssignment(groupSpec.memberIds(), targetPartitions, 
newAssignment);
+
+        // Combine current and new hashed assignments, sized to accommodate 
the expected number of mappings.
+        Map<String, Set<TopicIdPartition>> finalAssignment = new 
HashMap<>((int) ((numGroupMembers / 0.75f) + 1));
+        Map<TopicIdPartition, Set<String>> finalAssignmentByPartition = new 
HashMap<>((int) (((numTargetPartitions + 1) / 0.75f) + 1));
+
+        // First, take the members assigned by hashing.
+        newAssignment.forEach((targetPartition, members) -> 
members.forEach(member -> {
+            finalAssignment.computeIfAbsent(member, k -> new 
HashSet<>()).add(targetPartition);
+            finalAssignmentByPartition.computeIfAbsent(targetPartition, k -> 
new HashSet<>()).add(member);
+        }));
 
-        newAssignment.forEach((targetPartition, members) -> 
members.forEach(member ->
-            finalAssignment.computeIfAbsent(member, k -> new 
HashSet<>()).add(targetPartition)));
+        // Then, take the members from the current assignment, making sure 
that no member has too many assigned partitions.
         // When combining current assignment, we need to only consider the 
topics in current assignment that are also being
         // subscribed in the new assignment as well.
         currentAssignment.forEach((targetPartition, members) -> {
-            if (subscribedTopicIds.contains(targetPartition.topicId()))
+            if (subscribedTopicIds.contains(targetPartition.topicId())) {
                 members.forEach(member -> {
-                    if (groupSpec.memberIds().contains(member) && 
!newAssignment.containsKey(targetPartition))
-                        finalAssignment.computeIfAbsent(member, k -> new 
HashSet<>()).add(targetPartition);
+                    if (groupSpec.memberIds().contains(member)) {
+                        Set<TopicIdPartition> memberPartitions = 
finalAssignment.computeIfAbsent(member, k -> new HashSet<>());
+                        if ((memberPartitions.size() < desiredAssignmentCount) 
&& !newAssignment.containsKey(targetPartition)) {
+                            memberPartitions.add(targetPartition);
+                            
finalAssignmentByPartition.computeIfAbsent(targetPartition, k -> new 
HashSet<>()).add(member);
+                        }
+                    }
                 });
+            }
         });
 
+        // Finally, round-robin assignment for unassigned partitions which do 
not already have members assigned.
+        // The order of steps differs slightly from KIP-932 because the 
desired assignment count has been taken into
+        // account when copying partitions across from the current assignment, 
and this is more convenient.
+        List<TopicIdPartition> unassignedPartitions = targetPartitions.stream()
+            .filter(targetPartition -> 
!finalAssignmentByPartition.containsKey(targetPartition))
+            .toList();
+
+        roundRobinAssignmentWithCount(groupSpec.memberIds(), unassignedPartitions, finalAssignment, desiredAssignmentCount);

Review Comment:
   This should never happen, since we calculate `desiredAssignmentCount` above,
   but should we check after this call that no partition in
   `unassignedPartitions` is still without a member?
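
One possible shape for such a check, as a sketch only: the class `AssignmentChecks` and both method names are hypothetical, partitions are modelled with a generic type `P` rather than `TopicIdPartition`, and `IllegalStateException` stands in for whatever exception the assignor would actually throw.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of the PR.
final class AssignmentChecks {
    private AssignmentChecks() { }

    // Returns the partitions that no member owns, in the order they were given.
    static <P> List<P> findUnassigned(Collection<P> partitions, Map<String, Set<P>> assignmentByMember) {
        Set<P> owned = new HashSet<>();
        assignmentByMember.values().forEach(owned::addAll);
        return partitions.stream().filter(p -> !owned.contains(p)).toList();
    }

    // Fails fast if any partition was left without a member.
    static <P> void requireAllAssigned(Collection<P> partitions, Map<String, Set<P>> assignmentByMember) {
        List<P> missing = findUnassigned(partitions, assignmentByMember);
        if (!missing.isEmpty()) {
            throw new IllegalStateException("Partitions left unassigned: " + missing);
        }
    }
}
```

Called right after the round-robin step with `unassignedPartitions` and `finalAssignment`, a check like this would surface a miscalculated count immediately instead of silently dropping partitions.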



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java:
##########
@@ -135,56 +148,71 @@ private GroupAssignment newAssignmentHomogeneous(
         List<TopicIdPartition> targetPartitions,
         Map<TopicIdPartition, List<String>> currentAssignment
     ) {
-        Map<TopicIdPartition, List<String>> newAssignment = new HashMap<>();
-
-        // Step 1: Hash member IDs to topic partitions.
-        memberHashAssignment(targetPartitions, groupSpec.memberIds(), 
newAssignment);
-
-        // Step 2: Round-robin assignment for unassigned partitions which do 
not have members already assigned in the current assignment.
-        List<TopicIdPartition> unassignedPartitions = targetPartitions.stream()
-            .filter(targetPartition -> 
!newAssignment.containsKey(targetPartition))
-            .filter(targetPartition -> 
!currentAssignment.containsKey(targetPartition))
-            .toList();
-
-        roundRobinAssignment(groupSpec.memberIds(), unassignedPartitions, 
newAssignment);
-
-        // Step 3: We combine current assignment and new assignment.
-        Map<String, Set<TopicIdPartition>> finalAssignment = new HashMap<>();
-
-        // As per the KIP, we should revoke the assignments from current 
assignment for partitions that were assigned by step 1
-        // in the new assignment and have members in current assignment by 
step 2. But we haven't implemented it to avoid the
-        // complexity in both the implementation and the run time complexity. 
This step was mentioned in the KIP to reduce
-        // the burden of certain members of the share groups. This can be 
achieved with the help of limiting the max
-        // no. of partitions assignment for every member(KAFKA-18788). Hence, 
the potential problem of burdening
-        // the share consumers will be addressed in a future PR.
+        // For entirely balanced assignment, we would expect 
(numTargetPartitions / numGroupMembers) partitions per member, rounded upwards.
+        // That can be expressed as         Math.ceil(numTargetPartitions / 
(double) numGroupMembers)
+        // Using integer arithmetic, as     (numTargetPartitions + 
numGroupMembers - 1) / numGroupMembers
+        int numGroupMembers = groupSpec.memberIds().size();
+        int numTargetPartitions = targetPartitions.size();
+        int desiredAssignmentCount = (numTargetPartitions + numGroupMembers - 
1) / numGroupMembers;
+
+        Map<TopicIdPartition, List<String>> newAssignment = new HashMap<>((int) (((numTargetPartitions + 1) / 0.75f) + 1));

Review Comment:
   Is this sizing meant to avoid rehashing as we fill these maps? If so, can we
   extract the capacity calculation into a small utility method?
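
A minimal sketch of such a utility, assuming the intent is to pre-size for the default load factor of 0.75. The class name `MapSizing` and its methods are hypothetical; the arithmetic mirrors the expression used in the diff. On Java 19 and later, `HashMap.newHashMap(int numMappings)` covers the same need, but that may not be available at the Java level this module targets.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the PR.
final class MapSizing {
    private static final float DEFAULT_LOAD_FACTOR = 0.75f;

    private MapSizing() { }

    // Initial capacity large enough that inserting expectedEntries mappings
    // should not trigger a resize under the default load factor.
    static int capacityFor(int expectedEntries) {
        if (expectedEntries < 0) {
            throw new IllegalArgumentException("expectedEntries must be non-negative");
        }
        return (int) ((expectedEntries / DEFAULT_LOAD_FACTOR) + 1);
    }

    // Convenience factory so call sites do not repeat the arithmetic.
    static <K, V> Map<K, V> newHashMap(int expectedEntries) {
        return new HashMap<>(capacityFor(expectedEntries));
    }
}
```

With this in place, a call site could read `MapSizing.newHashMap(numTargetPartitions + 1)` instead of spelling out the division each time.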



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java:
##########
@@ -318,4 +367,22 @@ private List<TopicIdPartition> computeTargetPartitions(
         });
         return targetPartitions;
     }
+
+    private GroupAssignment groupAssignment(
+        Map<String, Set<TopicIdPartition>> assignmentByMember,
+        Collection<String> allGroupMembers
+    ) {
+        Map<String, MemberAssignment> members = new HashMap<>();
+        for (Map.Entry<String, Set<TopicIdPartition>> entry : 
assignmentByMember.entrySet()) {
+            Map<Uuid, Set<Integer>> targetPartitions = new HashMap<>();
+            entry.getValue().forEach(targetPartition -> targetPartitions.computeIfAbsent(targetPartition.topicId(), k -> new HashSet<>()).add(targetPartition.partitionId()));

Review Comment:
   nit: maybe break after the first `->`?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java:
##########
@@ -234,67 +257,93 @@ private GroupAssignment newAssignmentHeterogeneous(
         return groupAssignment(finalAssignment, groupSpec.memberIds());
     }
 
-    private GroupAssignment groupAssignment(
-        Map<String, Set<TopicIdPartition>> assignmentByMember,
-        Collection<String> allGroupMembers
-    ) {
-        Map<String, MemberAssignment> members = new HashMap<>();
-        for (Map.Entry<String, Set<TopicIdPartition>> entry : 
assignmentByMember.entrySet()) {
-            Map<Uuid, Set<Integer>> targetPartitions = new HashMap<>();
-            entry.getValue().forEach(targetPartition -> 
targetPartitions.computeIfAbsent(targetPartition.topicId(), k -> new 
HashSet<>()).add(targetPartition.partitionId()));
-            members.put(entry.getKey(), new 
MemberAssignmentImpl(targetPartitions));
-        }
-        allGroupMembers.forEach(member -> {
-            if (!members.containsKey(member))
-                members.put(member, new MemberAssignmentImpl(new HashMap<>()));
-        });
-
-        return new GroupAssignment(members);
-    }
-
     /**
      * This function updates assignment by hashing the member IDs of the 
members and maps the partitions assigned to the
-     * members based on the hash. This gives approximately even balance.
-     * @param unassignedPartitions - the subscribed topic partitions which 
needs assignment.
-     * @param memberIds - the member ids to which the topic partitions need to 
be assigned.
-     * @param assignment - the existing assignment by topic partition. We need 
to pass it as a parameter because this
-     *                   function would be called multiple times for 
heterogeneous assignment.
+     * members based on the hash, one partition per member. This gives 
approximately even balance.
+     * @param memberIds           The member ids to which the topic partitions 
need to be assigned.
+     * @param partitionsToAssign  The subscribed topic partitions which needs 
assignment.
+     * @param assignment          The existing assignment by topic partition. 
We need to pass it as a parameter because this
+     *                            method can be called multiple times for 
heterogeneous assignment.
      */
     // Visible for testing
     void memberHashAssignment(
-        List<TopicIdPartition> unassignedPartitions,
         Collection<String> memberIds,
+        List<TopicIdPartition> partitionsToAssign,
         Map<TopicIdPartition, List<String>> assignment
     ) {
-        if (!unassignedPartitions.isEmpty())
+        if (!partitionsToAssign.isEmpty()) {
             for (String memberId : memberIds) {
-                int topicPartitionIndex = Math.abs(memberId.hashCode() % 
unassignedPartitions.size());
-                TopicIdPartition topicPartition = 
unassignedPartitions.get(topicPartitionIndex);
+                int topicPartitionIndex = Math.abs(memberId.hashCode() % 
partitionsToAssign.size());
+                TopicIdPartition topicPartition = 
partitionsToAssign.get(topicPartitionIndex);
                 assignment.computeIfAbsent(topicPartition, k -> new 
ArrayList<>()).add(memberId);
             }
+        }
     }
 
     /**
-     * This functions assigns topic partitions to members by round-robin 
approach and updates the existing assignment.
-     * @param memberIds - the member ids to which the topic partitions need to 
be assigned, should be non-empty.
-     * @param unassignedPartitions - the subscribed topic partitions which 
needs assignment.
-     * @param assignment - the existing assignment by topic partition.
+     * This functions assigns topic partitions to members by a round-robin 
approach and updates the existing assignment.
+     * @param memberIds              The member ids to which the topic 
partitions need to be assigned, should be non-empty.
+     * @param partitionsToAssign     The subscribed topic partitions which 
needs assignment.
+     * @param assignment             The existing assignment by topic 
partition. We need to pass it as a parameter because this
+     *                               method can be called multiple times for 
heterogeneous assignment.
      */
     // Visible for testing
     void roundRobinAssignment(
         Collection<String> memberIds,
-        List<TopicIdPartition> unassignedPartitions,
+        List<TopicIdPartition> partitionsToAssign,
         Map<TopicIdPartition, List<String>> assignment
     ) {
         // We iterate through the target partitions and assign a memberId to 
them. In case we run out of members (members < targetPartitions),
         // we again start from the starting index of memberIds.
         Iterator<String> memberIdIterator = memberIds.iterator();
-        for (TopicIdPartition targetPartition : unassignedPartitions) {
+        for (TopicIdPartition topicPartition : partitionsToAssign) {
             if (!memberIdIterator.hasNext()) {
                 memberIdIterator = memberIds.iterator();
             }
             String memberId = memberIdIterator.next();
-            assignment.computeIfAbsent(targetPartition, k -> new 
ArrayList<>()).add(memberId);
+            assignment.computeIfAbsent(topicPartition, k -> new 
ArrayList<>()).add(memberId);
+        }
+    }
+
+    /**
+     * This functions assigns topic partitions to members by a round-robin 
approach and updates the existing assignment.
+     * @param memberIds              The member ids to which the topic 
partitions need to be assigned, should be non-empty.
+     * @param partitionsToAssign     The subscribed topic partitions which 
needs assignment.
+     * @param assignment             The existing assignment by topic 
partition. We need to pass it as a parameter because this
+     *                               method can be called multiple times for 
heterogeneous assignment.
+     * @param desiredAssignmentCount The number of partitions which can be 
assigned to each member to give even balance.
+     *                               Note that this number is slightly higher 
than strictly required to allow for situations
+     *                               in which we have hashing collisions.
+     */
+    void roundRobinAssignmentWithCount(

Review Comment:
   Can we add some direct unit tests for this method?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java:
##########
@@ -234,67 +257,93 @@ private GroupAssignment newAssignmentHeterogeneous(
         return groupAssignment(finalAssignment, groupSpec.memberIds());
     }
 
-    private GroupAssignment groupAssignment(
-        Map<String, Set<TopicIdPartition>> assignmentByMember,
-        Collection<String> allGroupMembers
-    ) {
-        Map<String, MemberAssignment> members = new HashMap<>();
-        for (Map.Entry<String, Set<TopicIdPartition>> entry : 
assignmentByMember.entrySet()) {
-            Map<Uuid, Set<Integer>> targetPartitions = new HashMap<>();
-            entry.getValue().forEach(targetPartition -> 
targetPartitions.computeIfAbsent(targetPartition.topicId(), k -> new 
HashSet<>()).add(targetPartition.partitionId()));
-            members.put(entry.getKey(), new 
MemberAssignmentImpl(targetPartitions));
-        }
-        allGroupMembers.forEach(member -> {
-            if (!members.containsKey(member))
-                members.put(member, new MemberAssignmentImpl(new HashMap<>()));
-        });
-
-        return new GroupAssignment(members);
-    }
-
     /**
      * This function updates assignment by hashing the member IDs of the 
members and maps the partitions assigned to the
-     * members based on the hash. This gives approximately even balance.
-     * @param unassignedPartitions - the subscribed topic partitions which 
needs assignment.
-     * @param memberIds - the member ids to which the topic partitions need to 
be assigned.
-     * @param assignment - the existing assignment by topic partition. We need 
to pass it as a parameter because this
-     *                   function would be called multiple times for 
heterogeneous assignment.
+     * members based on the hash, one partition per member. This gives 
approximately even balance.
+     * @param memberIds           The member ids to which the topic partitions 
need to be assigned.
+     * @param partitionsToAssign  The subscribed topic partitions which needs 
assignment.
+     * @param assignment          The existing assignment by topic partition. 
We need to pass it as a parameter because this
+     *                            method can be called multiple times for 
heterogeneous assignment.
      */
     // Visible for testing
     void memberHashAssignment(
-        List<TopicIdPartition> unassignedPartitions,
         Collection<String> memberIds,
+        List<TopicIdPartition> partitionsToAssign,
         Map<TopicIdPartition, List<String>> assignment
     ) {
-        if (!unassignedPartitions.isEmpty())
+        if (!partitionsToAssign.isEmpty()) {
             for (String memberId : memberIds) {
-                int topicPartitionIndex = Math.abs(memberId.hashCode() % 
unassignedPartitions.size());
-                TopicIdPartition topicPartition = 
unassignedPartitions.get(topicPartitionIndex);
+                int topicPartitionIndex = Math.abs(memberId.hashCode() % 
partitionsToAssign.size());
+                TopicIdPartition topicPartition = 
partitionsToAssign.get(topicPartitionIndex);
                 assignment.computeIfAbsent(topicPartition, k -> new 
ArrayList<>()).add(memberId);
             }
+        }
     }
 
     /**
-     * This functions assigns topic partitions to members by round-robin 
approach and updates the existing assignment.
-     * @param memberIds - the member ids to which the topic partitions need to 
be assigned, should be non-empty.
-     * @param unassignedPartitions - the subscribed topic partitions which 
needs assignment.
-     * @param assignment - the existing assignment by topic partition.
+     * This functions assigns topic partitions to members by a round-robin 
approach and updates the existing assignment.
+     * @param memberIds              The member ids to which the topic 
partitions need to be assigned, should be non-empty.
+     * @param partitionsToAssign     The subscribed topic partitions which 
needs assignment.
+     * @param assignment             The existing assignment by topic 
partition. We need to pass it as a parameter because this
+     *                               method can be called multiple times for 
heterogeneous assignment.
      */
     // Visible for testing
     void roundRobinAssignment(
         Collection<String> memberIds,
-        List<TopicIdPartition> unassignedPartitions,
+        List<TopicIdPartition> partitionsToAssign,
         Map<TopicIdPartition, List<String>> assignment
     ) {
         // We iterate through the target partitions and assign a memberId to 
them. In case we run out of members (members < targetPartitions),
         // we again start from the starting index of memberIds.
         Iterator<String> memberIdIterator = memberIds.iterator();
-        for (TopicIdPartition targetPartition : unassignedPartitions) {
+        for (TopicIdPartition topicPartition : partitionsToAssign) {
             if (!memberIdIterator.hasNext()) {
                 memberIdIterator = memberIds.iterator();
             }
             String memberId = memberIdIterator.next();
-            assignment.computeIfAbsent(targetPartition, k -> new 
ArrayList<>()).add(memberId);
+            assignment.computeIfAbsent(topicPartition, k -> new 
ArrayList<>()).add(memberId);
+        }
+    }
+
+    /**
+     * This functions assigns topic partitions to members by a round-robin 
approach and updates the existing assignment.
+     * @param memberIds              The member ids to which the topic 
partitions need to be assigned, should be non-empty.
+     * @param partitionsToAssign     The subscribed topic partitions which 
needs assignment.
+     * @param assignment             The existing assignment by topic 
partition. We need to pass it as a parameter because this
+     *                               method can be called multiple times for 
heterogeneous assignment.
+     * @param desiredAssignmentCount The number of partitions which can be 
assigned to each member to give even balance.
+     *                               Note that this number is slightly higher 
than strictly required to allow for situations
+     *                               in which we have hashing collisions.
+     */
+    void roundRobinAssignmentWithCount(
+        Collection<String> memberIds,
+        List<TopicIdPartition> partitionsToAssign,
+        Map<String, Set<TopicIdPartition>> assignment,
+        int desiredAssignmentCount
+    ) {
+        Collection<String> memberIdsCopy = new LinkedHashSet<>(memberIds);
+
+        // We iterate through the target partitions which are not in the 
assignment and assign a memberId to them.
+        // In case we run out of members (memberIds < partitionsToAssign), we 
again start from the starting index of memberIds.
+        Iterator<String> memberIdIterator = memberIdsCopy.iterator();
+        ListIterator<TopicIdPartition> partitionListIterator = 
partitionsToAssign.listIterator();
+        while (partitionListIterator.hasNext()) {
+            TopicIdPartition partition = partitionListIterator.next();
+            if (!memberIdIterator.hasNext()) {
+                memberIdIterator = memberIdsCopy.iterator();

Review Comment:
   We already have this cycling iterator pattern twice. I imagine it is common
   in assignment code, so maybe we should create a utility method or class that
   provides it and includes safeguards such as the infinite loop check below.
   It doesn't have to be done in this PR; just an idea.
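
As a rough illustration of the idea, the sketch below wraps a collection in an iterator that restarts from the beginning when it reaches the end and refuses to spin on an empty collection. `CyclingIterator` is a hypothetical name, and the behaviour of `remove()` depends on the backing collection's own iterator supporting removal.

```java
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical helper, not part of the PR.
final class CyclingIterator<T> implements Iterator<T> {
    private final Collection<T> source;
    private Iterator<T> delegate;

    CyclingIterator(Collection<T> source) {
        this.source = source;
        this.delegate = source.iterator();
    }

    // There is always a next element unless the collection has been emptied.
    @Override
    public boolean hasNext() {
        return !source.isEmpty();
    }

    @Override
    public T next() {
        if (!delegate.hasNext()) {
            // Guard against cycling forever over an empty collection.
            if (source.isEmpty()) {
                throw new NoSuchElementException("No elements left to cycle over");
            }
            delegate = source.iterator();
        }
        return delegate.next();
    }

    // Removes the element most recently returned by next() from the source.
    @Override
    public void remove() {
        delegate.remove();
    }
}
```

Both round-robin methods could then iterate over partitions and call `next()` on a single `CyclingIterator`, with the empty-collection guard living in one place.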



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java:
##########
@@ -234,67 +257,93 @@ private GroupAssignment newAssignmentHeterogeneous(
         return groupAssignment(finalAssignment, groupSpec.memberIds());
     }
 
-    private GroupAssignment groupAssignment(
-        Map<String, Set<TopicIdPartition>> assignmentByMember,
-        Collection<String> allGroupMembers
-    ) {
-        Map<String, MemberAssignment> members = new HashMap<>();
-        for (Map.Entry<String, Set<TopicIdPartition>> entry : 
assignmentByMember.entrySet()) {
-            Map<Uuid, Set<Integer>> targetPartitions = new HashMap<>();
-            entry.getValue().forEach(targetPartition -> 
targetPartitions.computeIfAbsent(targetPartition.topicId(), k -> new 
HashSet<>()).add(targetPartition.partitionId()));
-            members.put(entry.getKey(), new 
MemberAssignmentImpl(targetPartitions));
-        }
-        allGroupMembers.forEach(member -> {
-            if (!members.containsKey(member))
-                members.put(member, new MemberAssignmentImpl(new HashMap<>()));
-        });
-
-        return new GroupAssignment(members);
-    }
-
     /**
      * This function updates assignment by hashing the member IDs of the 
members and maps the partitions assigned to the
-     * members based on the hash. This gives approximately even balance.
-     * @param unassignedPartitions - the subscribed topic partitions which 
needs assignment.
-     * @param memberIds - the member ids to which the topic partitions need to 
be assigned.
-     * @param assignment - the existing assignment by topic partition. We need 
to pass it as a parameter because this
-     *                   function would be called multiple times for 
heterogeneous assignment.
+     * members based on the hash, one partition per member. This gives 
approximately even balance.
+     * @param memberIds           The member ids to which the topic partitions 
need to be assigned.
+     * @param partitionsToAssign  The subscribed topic partitions which needs 
assignment.
+     * @param assignment          The existing assignment by topic partition. 
We need to pass it as a parameter because this
+     *                            method can be called multiple times for 
heterogeneous assignment.
      */
     // Visible for testing
     void memberHashAssignment(
-        List<TopicIdPartition> unassignedPartitions,
         Collection<String> memberIds,
+        List<TopicIdPartition> partitionsToAssign,
         Map<TopicIdPartition, List<String>> assignment
     ) {
-        if (!unassignedPartitions.isEmpty())
+        if (!partitionsToAssign.isEmpty()) {
             for (String memberId : memberIds) {
-                int topicPartitionIndex = Math.abs(memberId.hashCode() % 
unassignedPartitions.size());
-                TopicIdPartition topicPartition = 
unassignedPartitions.get(topicPartitionIndex);
+                int topicPartitionIndex = Math.abs(memberId.hashCode() % 
partitionsToAssign.size());
+                TopicIdPartition topicPartition = 
partitionsToAssign.get(topicPartitionIndex);
                 assignment.computeIfAbsent(topicPartition, k -> new 
ArrayList<>()).add(memberId);
             }
+        }
     }
 
     /**
-     * This functions assigns topic partitions to members by round-robin 
approach and updates the existing assignment.
-     * @param memberIds - the member ids to which the topic partitions need to 
be assigned, should be non-empty.
-     * @param unassignedPartitions - the subscribed topic partitions which 
needs assignment.
-     * @param assignment - the existing assignment by topic partition.
+     * This functions assigns topic partitions to members by a round-robin 
approach and updates the existing assignment.
+     * @param memberIds              The member ids to which the topic 
partitions need to be assigned, should be non-empty.
+     * @param partitionsToAssign     The subscribed topic partitions which 
needs assignment.
+     * @param assignment             The existing assignment by topic 
partition. We need to pass it as a parameter because this
+     *                               method can be called multiple times for 
heterogeneous assignment.
      */
     // Visible for testing
     void roundRobinAssignment(
         Collection<String> memberIds,
-        List<TopicIdPartition> unassignedPartitions,
+        List<TopicIdPartition> partitionsToAssign,
         Map<TopicIdPartition, List<String>> assignment
     ) {
         // We iterate through the target partitions and assign a memberId to 
them. In case we run out of members (members < targetPartitions),
         // we again start from the starting index of memberIds.
         Iterator<String> memberIdIterator = memberIds.iterator();
-        for (TopicIdPartition targetPartition : unassignedPartitions) {
+        for (TopicIdPartition topicPartition : partitionsToAssign) {
             if (!memberIdIterator.hasNext()) {
                 memberIdIterator = memberIds.iterator();
             }
             String memberId = memberIdIterator.next();
-            assignment.computeIfAbsent(targetPartition, k -> new 
ArrayList<>()).add(memberId);
+            assignment.computeIfAbsent(topicPartition, k -> new 
ArrayList<>()).add(memberId);
+        }
+    }
+
+    /**
+     * This functions assigns topic partitions to members by a round-robin 
approach and updates the existing assignment.
+     * @param memberIds              The member ids to which the topic 
partitions need to be assigned, should be non-empty.
+     * @param partitionsToAssign     The subscribed topic partitions which 
needs assignment.
+     * @param assignment             The existing assignment by topic 
partition. We need to pass it as a parameter because this
+     *                               method can be called multiple times for 
heterogeneous assignment.
+     * @param desiredAssignmentCount The number of partitions which can be 
assigned to each member to give even balance.
+     *                               Note that this number is slightly higher 
than strictly required to allow for situations
+     *                               in which we have hashing collisions.
+     */
+    void roundRobinAssignmentWithCount(
+        Collection<String> memberIds,
+        List<TopicIdPartition> partitionsToAssign,
+        Map<String, Set<TopicIdPartition>> assignment,
+        int desiredAssignmentCount
+    ) {
+        Collection<String> memberIdsCopy = new LinkedHashSet<>(memberIds);
+
+        // We iterate through the target partitions which are not in the 
assignment and assign a memberId to them.
+        // In case we run out of members (memberIds < partitionsToAssign), we 
again start from the starting index of memberIds.
+        Iterator<String> memberIdIterator = memberIdsCopy.iterator();
+        ListIterator<TopicIdPartition> partitionListIterator = 
partitionsToAssign.listIterator();
+        while (partitionListIterator.hasNext()) {
+            TopicIdPartition partition = partitionListIterator.next();
+            if (!memberIdIterator.hasNext()) {
+                memberIdIterator = memberIdsCopy.iterator();
+                if (memberIdsCopy.isEmpty()) {
+                    // This should never happen, but guarding against an 
infinite loop
+                    throw new PartitionAssignorException("Inconsistent number 
of member IDs");
+                }
+            }
+            String memberId = memberIdIterator.next();
+            Set<TopicIdPartition> memberPartitions = assignment.computeIfAbsent(memberId, k -> new HashSet<>());
+            if (memberPartitions.size() <= desiredAssignmentCount) {
+                memberPartitions.add(partition);
+            } else {
+                memberIdIterator.remove();
+                partitionListIterator.previous();

Review Comment:
   What happens if every member has reached the desired count but partitions
   remain? I'm not sure, but I think this could end up in an infinite loop.
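
To explore the question, here is a simplified, hypothetical model of the loop as it appears in the hunk. It uses plain strings for partitions and `IllegalStateException` in place of `PartitionAssignorException`, and it assumes the `else` branch ends after `partitionListIterator.previous()`, which the quoted diff does not show. Under those assumptions, the emptiness guard appears to turn the "all members full" case into an exception rather than an endless loop.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the method under review, for experimentation only.
final class RoundRobinWithCountModel {
    private RoundRobinWithCountModel() { }

    static Map<String, Set<String>> assign(
        Collection<String> memberIds,
        List<String> partitions,
        int desiredAssignmentCount
    ) {
        Map<String, Set<String>> assignment = new LinkedHashMap<>();
        Collection<String> remaining = new LinkedHashSet<>(memberIds);
        Iterator<String> memberIt = remaining.iterator();
        ListIterator<String> partitionIt = partitions.listIterator();
        while (partitionIt.hasNext()) {
            String partition = partitionIt.next();
            if (!memberIt.hasNext()) {
                memberIt = remaining.iterator();
                if (remaining.isEmpty()) {
                    // Every member was removed for being full, so stop here.
                    throw new IllegalStateException("No member has spare capacity for " + partition);
                }
            }
            String memberId = memberIt.next();
            Set<String> owned = assignment.computeIfAbsent(memberId, k -> new HashSet<>());
            if (owned.size() <= desiredAssignmentCount) {
                owned.add(partition);
            } else {
                // Member is full: drop it and retry the same partition.
                memberIt.remove();
                partitionIt.previous();
            }
        }
        return assignment;
    }
}
```

For example, one member, three partitions and a count of 1 would assign two partitions (the comparison is `<=`) and then throw on the third. A direct unit test for that boundary might be worth adding to confirm the real method behaves the same way.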



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java:
##########
@@ -135,56 +148,71 @@ private GroupAssignment newAssignmentHomogeneous(
         List<TopicIdPartition> targetPartitions,
         Map<TopicIdPartition, List<String>> currentAssignment
     ) {
-        Map<TopicIdPartition, List<String>> newAssignment = new HashMap<>();
-
-        // Step 1: Hash member IDs to topic partitions.
-        memberHashAssignment(targetPartitions, groupSpec.memberIds(), newAssignment);
-
-        // Step 2: Round-robin assignment for unassigned partitions which do not have members already assigned in the current assignment.
-        List<TopicIdPartition> unassignedPartitions = targetPartitions.stream()
-            .filter(targetPartition -> !newAssignment.containsKey(targetPartition))
-            .filter(targetPartition -> !currentAssignment.containsKey(targetPartition))
-            .toList();
-
-        roundRobinAssignment(groupSpec.memberIds(), unassignedPartitions, newAssignment);
-
-        // Step 3: We combine current assignment and new assignment.
-        Map<String, Set<TopicIdPartition>> finalAssignment = new HashMap<>();
-
-        // As per the KIP, we should revoke the assignments from current assignment for partitions that were assigned by step 1
-        // in the new assignment and have members in current assignment by step 2. But we haven't implemented it to avoid the
-        // complexity in both the implementation and the run time complexity. This step was mentioned in the KIP to reduce
-        // the burden of certain members of the share groups. This can be achieved with the help of limiting the max
-        // no. of partitions assignment for every member(KAFKA-18788). Hence, the potential problem of burdening
-        // the share consumers will be addressed in a future PR.
+        // For entirely balanced assignment, we would expect (numTargetPartitions / numGroupMembers) partitions per member, rounded upwards.
+        // That can be expressed as         Math.ceil(numTargetPartitions / (double) numGroupMembers)
+        // Using integer arithmetic, as     (numTargetPartitions + numGroupMembers - 1) / numGroupMembers
+        int numGroupMembers = groupSpec.memberIds().size();
+        int numTargetPartitions = targetPartitions.size();
+        int desiredAssignmentCount = (numTargetPartitions + numGroupMembers - 1) / numGroupMembers;
+
+        Map<TopicIdPartition, List<String>> newAssignment = new HashMap<>((int) (((numTargetPartitions + 1) / 0.75f) + 1));
+
+        // Hash member IDs to topic partitions. Each member will be assigned one partition, but some partitions
+        // might have been assigned to more than one member.
+        memberHashAssignment(groupSpec.memberIds(), targetPartitions, newAssignment);
+
+        // Combine current and new hashed assignments, sized to accommodate the expected number of mappings.
+        Map<String, Set<TopicIdPartition>> finalAssignment = new HashMap<>((int) ((numGroupMembers / 0.75f) + 1));
+        Map<TopicIdPartition, Set<String>> finalAssignmentByPartition = new HashMap<>((int) (((numTargetPartitions + 1) / 0.75f) + 1));
+
+        // First, take the members assigned by hashing.
+        newAssignment.forEach((targetPartition, members) -> members.forEach(member -> {
+            finalAssignment.computeIfAbsent(member, k -> new HashSet<>()).add(targetPartition);
+            finalAssignmentByPartition.computeIfAbsent(targetPartition, k -> new HashSet<>()).add(member);
+        }));
 
-        newAssignment.forEach((targetPartition, members) -> members.forEach(member ->
-            finalAssignment.computeIfAbsent(member, k -> new HashSet<>()).add(targetPartition)));
+        // Then, take the members from the current assignment, making sure that no member has too many assigned partitions.
         // When combining current assignment, we need to only consider the topics in current assignment that are also being
         // subscribed in the new assignment as well.
         currentAssignment.forEach((targetPartition, members) -> {
-            if (subscribedTopicIds.contains(targetPartition.topicId()))
+            if (subscribedTopicIds.contains(targetPartition.topicId())) {

Review Comment:
   I want to double check my understanding.
   
   Here we start with the current assignment and remove any topics which are no 
longer subscribed and remove any members which aren't currently around. We then 
copy assignments from currentAssignment into finalAssignment up to the desired 
number of assignments per member.
   
   This leaves us with a subset of currentAssignment and possibly a number of 
unassigned partitions. 
   
   We then round-robin the remaining unassigned partitions into 
currentAssignment (up to the desired count). 
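
For concreteness, the flow described in this comment can be sketched roughly as follows. This is an illustration of the reviewer's reading only, not the PR's implementation: the class, `desiredCount`, and `copyUpToCap` are hypothetical names, partitions are plain strings rather than `TopicIdPartition`, and the subscription check is done per partition here whereas the diff checks `targetPartition.topicId()`. The ceiling division is the integer form given in the diff comment.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class CappedCopySketch {

    // Ceiling of numPartitions / numMembers using integer arithmetic only.
    static int desiredCount(int numPartitions, int numMembers) {
        return (numPartitions + numMembers - 1) / numMembers;
    }

    // Copies entries from the current assignment, skipping unsubscribed
    // partitions and departed members, and stopping at `cap` per member.
    static Map<String, Set<String>> copyUpToCap(
        Map<String, List<String>> currentByPartition,
        Set<String> liveMembers,
        Set<String> subscribedPartitions,
        int cap
    ) {
        Map<String, Set<String>> result = new LinkedHashMap<>();
        currentByPartition.forEach((partition, members) -> {
            if (!subscribedPartitions.contains(partition)) {
                return; // no longer subscribed
            }
            for (String member : members) {
                if (!liveMembers.contains(member)) {
                    continue; // member has left the group
                }
                Set<String> owned = result.computeIfAbsent(member, k -> new LinkedHashSet<>());
                if (owned.size() < cap) {
                    owned.add(partition);
                }
            }
        });
        return result;
    }

    public static void main(String[] args) {
        // Five partitions across two members gives a cap of three each.
        System.out.println(desiredCount(5, 2));
    }
}
```

Any partition left without an owner after this step would then be handed to the round-robin pass.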
   




