rreddy-22 commented on code in PR #15974:
URL: https://github.com/apache/kafka/pull/15974#discussion_r1608524020


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -516,16 +522,78 @@ public Assignment targetAssignment(String memberId) {
         return targetAssignment.getOrDefault(memberId, Assignment.EMPTY);
     }
 
+    /**
+     * @return An immutable map containing all the topic partitions
+     *         with their current member assignments.
+     */
+    public Map<Uuid, Map<Integer, String>> partitionAssignments() {
+        return Collections.unmodifiableMap(partitionAssignments);
+    }
+
     /**
      * Updates target assignment of a member.
      *
      * @param memberId              The member id.
      * @param newTargetAssignment   The new target assignment.
      */
     public void updateTargetAssignment(String memberId, Assignment 
newTargetAssignment) {
+        updatePartitionAssignments(
+            memberId,
+            targetAssignment.getOrDefault(memberId, new 
Assignment(Collections.emptyMap())),
+            newTargetAssignment
+        );
         targetAssignment.put(memberId, newTargetAssignment);
     }
 
+    /**
+     * Updates partition assignments of the topics.
+     *
+     * @param memberId              The member Id.
+     * @param oldTargetAssignment   The old target assignment.
+     * @param newTargetAssignment   The new target assignment.
+     *
+     * Package private for testing.
+     */
+    void updatePartitionAssignments(
+        String memberId,
+        Assignment oldTargetAssignment,
+        Assignment newTargetAssignment
+    ) {
+        // Combine keys from both old and new assignments.
+        Set<Uuid> allTopicIds = new HashSet<>();
+        allTopicIds.addAll(oldTargetAssignment.partitions().keySet());
+        allTopicIds.addAll(newTargetAssignment.partitions().keySet());
+
+        for (Uuid topicId : allTopicIds) {
+            Set<Integer> oldPartitions = 
oldTargetAssignment.partitions().getOrDefault(topicId, Collections.emptySet());
+            Set<Integer> newPartitions = 
newTargetAssignment.partitions().getOrDefault(topicId, Collections.emptySet());
+
+            TimelineHashMap<Integer, String> topicPartitionAssignment = 
partitionAssignments.computeIfAbsent(
+                topicId, k -> new TimelineHashMap<>(snapshotRegistry, 
Math.max(oldPartitions.size(), newPartitions.size()))
+            );
+
+            // Remove partitions that aren't present in the new assignment.
+            for (Integer partition : oldPartitions) {
+                if (!newPartitions.contains(partition) && 
memberId.equals(topicPartitionAssignment.get(partition))) {

Review Comment:
   > what is setting and unsetting the assignment to true/false?
   This is for the byte array case like I explained where each index represents 
a partition and you can set it to 1/true when the partition is assigned and to 
0/false otherwise



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -516,16 +522,78 @@ public Assignment targetAssignment(String memberId) {
         return targetAssignment.getOrDefault(memberId, Assignment.EMPTY);
     }
 
+    /**
+     * @return An immutable map containing all the topic partitions
+     *         with their current member assignments.
+     */
+    public Map<Uuid, Map<Integer, String>> partitionAssignments() {
+        return Collections.unmodifiableMap(partitionAssignments);
+    }
+
     /**
      * Updates target assignment of a member.
      *
      * @param memberId              The member id.
      * @param newTargetAssignment   The new target assignment.
      */
     public void updateTargetAssignment(String memberId, Assignment 
newTargetAssignment) {
+        updatePartitionAssignments(
+            memberId,
+            targetAssignment.getOrDefault(memberId, new 
Assignment(Collections.emptyMap())),
+            newTargetAssignment
+        );
         targetAssignment.put(memberId, newTargetAssignment);
     }
 
+    /**
+     * Updates partition assignments of the topics.
+     *
+     * @param memberId              The member Id.
+     * @param oldTargetAssignment   The old target assignment.
+     * @param newTargetAssignment   The new target assignment.
+     *
+     * Package private for testing.
+     */
+    void updatePartitionAssignments(
+        String memberId,
+        Assignment oldTargetAssignment,
+        Assignment newTargetAssignment
+    ) {
+        // Combine keys from both old and new assignments.
+        Set<Uuid> allTopicIds = new HashSet<>();
+        allTopicIds.addAll(oldTargetAssignment.partitions().keySet());
+        allTopicIds.addAll(newTargetAssignment.partitions().keySet());
+
+        for (Uuid topicId : allTopicIds) {
+            Set<Integer> oldPartitions = 
oldTargetAssignment.partitions().getOrDefault(topicId, Collections.emptySet());
+            Set<Integer> newPartitions = 
newTargetAssignment.partitions().getOrDefault(topicId, Collections.emptySet());
+
+            TimelineHashMap<Integer, String> topicPartitionAssignment = 
partitionAssignments.computeIfAbsent(
+                topicId, k -> new TimelineHashMap<>(snapshotRegistry, 
Math.max(oldPartitions.size(), newPartitions.size()))
+            );
+
+            // Remove partitions that aren't present in the new assignment.
+            for (Integer partition : oldPartitions) {
+                if (!newPartitions.contains(partition) && 
memberId.equals(topicPartitionAssignment.get(partition))) {

Review Comment:
   > what is setting and unsetting the assignment to true/false?
   
   This is for the byte array case like I explained where each index represents 
a partition and you can set it to 1/true when the partition is assigned and to 
0/false otherwise



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to