dajac commented on code in PR #15974:
URL: https://github.com/apache/kafka/pull/15974#discussion_r1607787555


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -516,16 +522,78 @@ public Assignment targetAssignment(String memberId) {
         return targetAssignment.getOrDefault(memberId, Assignment.EMPTY);
     }
 
+    /**
+     * @return An immutable map containing all the topic partitions
+     *         with their current member assignments.
+     */
+    public Map<Uuid, Map<Integer, String>> partitionAssignments() {
+        return Collections.unmodifiableMap(partitionAssignments);
+    }
+
     /**
      * Updates target assignment of a member.
      *
      * @param memberId              The member id.
      * @param newTargetAssignment   The new target assignment.
      */
     public void updateTargetAssignment(String memberId, Assignment newTargetAssignment) {
+        updatePartitionAssignments(
+            memberId,
+            targetAssignment.getOrDefault(memberId, new Assignment(Collections.emptyMap())),
+            newTargetAssignment
+        );
         targetAssignment.put(memberId, newTargetAssignment);
     }
 
+    /**
+     * Updates partition assignments of the topics.
+     *
+     * @param memberId              The member Id.
+     * @param oldTargetAssignment   The old target assignment.
+     * @param newTargetAssignment   The new target assignment.
+     *
+     * Package private for testing.
+     */
+    void updatePartitionAssignments(

Review Comment:
   nit: Could we make it private?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImpl.java:
##########
@@ -56,11 +66,27 @@ public SubscriptionType subscriptionType() {
         return subscriptionType;
     }
 
+    /**

Review Comment:
   nit: Should we use `{@inheritDoc}` instead of copying the javadoc? I would 
change it for all the methods coming from the interface.
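
For illustration, a minimal sketch of what this could look like. The `Spec` interface and `SpecImpl` class below are hypothetical stand-ins, not the actual `GroupSpec`/`GroupSpecImpl` API; the only point is that the implementation inherits the interface's javadoc rather than repeating it:

```java
import java.util.Map;
import java.util.Set;

// Hypothetical interface, standing in for the real one in this PR.
interface Spec {
    /**
     * @return The ids of the members described by this spec.
     */
    Set<String> memberIds();
}

// Hypothetical implementation: the javadoc is inherited, not copied.
class SpecImpl implements Spec {
    private final Map<String, Integer> members;

    SpecImpl(Map<String, Integer> members) {
        this.members = members;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public Set<String> memberIds() {
        return members.keySet();
    }
}
```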



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -516,16 +522,78 @@ public Assignment targetAssignment(String memberId) {
         return targetAssignment.getOrDefault(memberId, Assignment.EMPTY);
     }
 
+    /**
+     * @return An immutable map containing all the topic partitions
+     *         with their current member assignments.
+     */
+    public Map<Uuid, Map<Integer, String>> partitionAssignments() {
+        return Collections.unmodifiableMap(partitionAssignments);
+    }
+
     /**
      * Updates target assignment of a member.
      *
      * @param memberId              The member id.
      * @param newTargetAssignment   The new target assignment.
      */
     public void updateTargetAssignment(String memberId, Assignment newTargetAssignment) {
+        updatePartitionAssignments(
+            memberId,
+            targetAssignment.getOrDefault(memberId, new Assignment(Collections.emptyMap())),
+            newTargetAssignment
+        );
         targetAssignment.put(memberId, newTargetAssignment);
     }
 
+    /**
+     * Updates partition assignments of the topics.
+     *
+     * @param memberId              The member Id.
+     * @param oldTargetAssignment   The old target assignment.
+     * @param newTargetAssignment   The new target assignment.
+     *
+     * Package private for testing.
+     */
+    void updatePartitionAssignments(
+        String memberId,
+        Assignment oldTargetAssignment,
+        Assignment newTargetAssignment
+    ) {
+        // Combine keys from both old and new assignments.
+        Set<Uuid> allTopicIds = new HashSet<>();
+        allTopicIds.addAll(oldTargetAssignment.partitions().keySet());
+        allTopicIds.addAll(newTargetAssignment.partitions().keySet());
+
+        for (Uuid topicId : allTopicIds) {
+            Set<Integer> oldPartitions = oldTargetAssignment.partitions().getOrDefault(topicId, Collections.emptySet());
+            Set<Integer> newPartitions = newTargetAssignment.partitions().getOrDefault(topicId, Collections.emptySet());
+
+            TimelineHashMap<Integer, String> topicPartitionAssignment = partitionAssignments.computeIfAbsent(
+                topicId, k -> new TimelineHashMap<>(snapshotRegistry, Math.max(oldPartitions.size(), newPartitions.size()))
+            );
+
+            // Remove partitions that aren't present in the new assignment.
+            for (Integer partition : oldPartitions) {
+                if (!newPartitions.contains(partition) && memberId.equals(topicPartitionAssignment.get(partition))) {

Review Comment:
   It may be worth putting a comment about this.
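
To make the reason for the member id check concrete, here is a simplified standalone sketch. It is not the PR's code: it tracks a single topic with a plain `HashMap` instead of a `TimelineHashMap`, it assumes (the hunk above is cut off before this point) that new partitions are recorded after the removals, and the class name and `checkOwner` flag are hypothetical, added only to contrast the two behaviours. The scenario is a partition that moves from one member to another, with the new owner's update applied first:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the inverted assignment for one topic.
class InvertedAssignmentSketch {
    // partition -> id of the member that currently owns it
    private final Map<Integer, String> owners = new HashMap<>();

    void update(String memberId, Set<Integer> oldPartitions, Set<Integer> newPartitions, boolean checkOwner) {
        // Remove partitions the member no longer has. With checkOwner enabled,
        // a partition is removed only if it is still mapped to this member.
        for (Integer partition : oldPartitions) {
            boolean ownedByMember = memberId.equals(owners.get(partition));
            if (!newPartitions.contains(partition) && (!checkOwner || ownedByMember)) {
                owners.remove(partition);
            }
        }
        // Assumption: the new partitions are then recorded for this member.
        for (Integer partition : newPartitions) {
            owners.put(partition, memberId);
        }
    }

    static Map<Integer, String> run(boolean checkOwner) {
        InvertedAssignmentSketch sketch = new InvertedAssignmentSketch();
        sketch.update("member1", Set.of(), Set.of(0), checkOwner);
        // Partition 0 moves to member2, and member2's update is applied first.
        sketch.update("member2", Set.of(), Set.of(0), checkOwner);
        // member1's update then drops partition 0 from its own assignment.
        sketch.update("member1", Set.of(0), Set.of(), checkOwner);
        return sketch.owners;
    }

    public static void main(String[] args) {
        System.out.println(run(true));   // prints {0=member2}
        System.out.println(run(false));  // prints {}
    }
}
```

Without the check, member1's later update erases the entry that now belongs to member2, which is the kind of situation a code comment could spell out.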



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##########
@@ -798,6 +798,44 @@ public void testUpdateSubscribedTopicNamesAndSubscriptionType() {
         );
     }
 
+    @Test
+    public void testUpdatePartitionAssignments() {
+        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
+        GroupCoordinatorMetricsShard metricsShard = mock(GroupCoordinatorMetricsShard.class);
+        ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, "test-group", metricsShard);
+        Uuid topicId = Uuid.randomUuid();
+        String memberId1 = "member1";
+        String memberId2 = "member2";
+
+        // Initial assignment for member1
+        Assignment initialAssignment = new Assignment(Collections.singletonMap(
+            topicId,
+            new HashSet<>(Collections.singletonList(0))
+        ));
+        consumerGroup.updateTargetAssignment(memberId1, initialAssignment);
+
+        // New assignment for member1;
+        Assignment newAssignment = new Assignment(Collections.singletonMap(
+            topicId,
+            new HashSet<>(Collections.singletonList(1))
+        ));
+        consumerGroup.updatePartitionAssignments(memberId1, initialAssignment, newAssignment);

Review Comment:
   It would be better to always go via `updateTargetAssignment`.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##########
@@ -798,6 +798,44 @@ public void testUpdateSubscribedTopicNamesAndSubscriptionType() {
         );
     }
 
+    @Test
+    public void testUpdatePartitionAssignments() {
+        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
+        GroupCoordinatorMetricsShard metricsShard = mock(GroupCoordinatorMetricsShard.class);
+        ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, "test-group", metricsShard);
+        Uuid topicId = Uuid.randomUuid();
+        String memberId1 = "member1";
+        String memberId2 = "member2";
+
+        // Initial assignment for member1
+        Assignment initialAssignment = new Assignment(Collections.singletonMap(
+            topicId,
+            new HashSet<>(Collections.singletonList(0))
+        ));
+        consumerGroup.updateTargetAssignment(memberId1, initialAssignment);
+
+        // New assignment for member1;
+        Assignment newAssignment = new Assignment(Collections.singletonMap(
+            topicId,
+            new HashSet<>(Collections.singletonList(1))
+        ));
+        consumerGroup.updatePartitionAssignments(memberId1, initialAssignment, newAssignment);
+
+        // Verify that partition 0 is no longer assigned and partition 1 is assigned to member1
+        assertFalse(consumerGroup.partitionAssignments().get(topicId).containsKey(0));
+        assertEquals(memberId1, consumerGroup.partitionAssignments().get(topicId).get(1));
+
+        // New assignment for member2
+        newAssignment = new Assignment(Collections.singletonMap(
+            topicId,
+            new HashSet<>(Collections.singletonList(2))
+        ));
+        consumerGroup.updatePartitionAssignments(memberId2, Assignment.EMPTY, newAssignment);
+
+        // Verify that partition 2 is assigned to member2
+        assertEquals(memberId2, consumerGroup.partitionAssignments().get(topicId).get(2));
+    }

Review Comment:
   Could we also add test cases to cover the need to verify the member id 
when we remove a partition? We basically need to cover the case that you 
explained to Jeff in your comment.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpec.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+
+import java.util.Map;
+
+/**
+ * The assignment specifications for a consumer group.

Review Comment:
   nit: "The group specifications to compute the assignment" or something along 
these lines?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -170,6 +170,11 @@ public static class DeadlineAndEpoch {
      */
     private final TimelineHashMap<String, Assignment> targetAssignment;
 
+    /**
+     * Partition assignments per topic.
+     */
+    private final TimelineHashMap<Uuid, TimelineHashMap<Integer, String>> partitionAssignments;

Review Comment:
   nit: Should we name it `invertedTargetAssignment`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
