dajac commented on code in PR #15974:
URL: https://github.com/apache/kafka/pull/15974#discussion_r1611240659


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -517,15 +524,78 @@ public Assignment targetAssignment(String memberId) {
     }
 
     /**
-     * Updates target assignment of a member.
+     * @return An immutable map containing all the topic partitions
+     *         with their current member assignments.
+     */
+    public Map<Uuid, Map<Integer, String>> invertedTargetAssignment() {
+        return Collections.unmodifiableMap(invertedTargetAssignment);
+    }
+
+    /**
+     * Updates the target assignment of a member.
      *
      * @param memberId              The member id.
      * @param newTargetAssignment   The new target assignment.
      */
     public void updateTargetAssignment(String memberId, Assignment 
newTargetAssignment) {

Review Comment:
   @rreddy-22 I was looking at the failed tests and I found a few which are 
related to your PR. I was curious so I took a deeper look into them and I found 
that we forgot to update the inverted target assignment in 
`removeTargetAssignment` too. Could you please take a look and fix this? We 
should also add more tests, I suppose.



##########
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java:
##########
@@ -186,9 +190,33 @@ private void createAssignmentSpec() {
                 Collections.emptyMap()
             ));
         }
-        assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
+        groupSpec = new GroupSpecImpl(members, HOMOGENEOUS, 
Collections.emptyMap());
     }
 
+    public Map<Uuid, Map<Integer, String>> invertedTargetAssignment(

Review Comment:
   I agree with Jeff. Let's try to reuse as they fundamentally do the same 
thing.



##########
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java:
##########
@@ -186,9 +190,33 @@ private void createAssignmentSpec() {
                 Collections.emptyMap()
             ));
         }
-        assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
+        groupSpec = new GroupSpecImpl(members, HOMOGENEOUS, 
Collections.emptyMap());
     }
 
+    public Map<Uuid, Map<Integer, String>> invertedTargetAssignment(

Review Comment:
   nit: The method could be static.



##########
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java:
##########
@@ -263,13 +270,36 @@ private void simulateIncrementalRebalance() {
             Collections.emptyMap()
         ));
 
-        assignmentSpec = new AssignmentSpec(updatedMembers, subscriptionType);
+        groupSpec = new GroupSpecImpl(updatedMembers, subscriptionType, 
invertedTargetAssignment);
+    }
+
+    private Map<Uuid, Map<Integer, String>> updateInvertedTargetAssignment(
+        GroupAssignment targetAssignment
+    ) {
+        Map<Uuid, Map<Integer, String>> invertedTargetAssignment = new 
HashMap<>(topicCount);
+        for (Uuid topicId : allTopicIds) {
+            invertedTargetAssignment.put(topicId, new 
HashMap<>(subscribedTopicDescriber.numPartitions(topicId)));
+        }

Review Comment:
   This is not needed. I think that we can init it by using `computeIfAbsent` 
like you did in `invertedTargetAssignment`. With this change, the two methods 
are identical so we can share the code.



##########
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java:
##########
@@ -263,13 +270,36 @@ private void simulateIncrementalRebalance() {
             Collections.emptyMap()
         ));
 
-        assignmentSpec = new AssignmentSpec(updatedMembers, subscriptionType);
+        groupSpec = new GroupSpecImpl(updatedMembers, subscriptionType, 
invertedTargetAssignment);
+    }
+
+    private Map<Uuid, Map<Integer, String>> updateInvertedTargetAssignment(

Review Comment:
   nit: Should we call it `computeInvertedTargetAssignment`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to