dajac commented on code in PR #15974: URL: https://github.com/apache/kafka/pull/15974#discussion_r1611240659
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -517,15 +524,78 @@ public Assignment targetAssignment(String memberId) {
     }

     /**
-     * Updates target assignment of a member.
+     * @return An immutable map containing all the topic partitions
+     *         with their current member assignments.
+     */
+    public Map<Uuid, Map<Integer, String>> invertedTargetAssignment() {
+        return Collections.unmodifiableMap(invertedTargetAssignment);
+    }
+
+    /**
+     * Updates the target assignment of a member.
      *
      * @param memberId              The member id.
      * @param newTargetAssignment   The new target assignment.
      */
     public void updateTargetAssignment(String memberId, Assignment newTargetAssignment) {

Review Comment:
   @rreddy-22 I was looking at the failed tests and I found a few which are related to your PR. I was curious so I took a deeper look into them and I found that we forgot to update the inverted target assignment in `removeTargetAssignment` too. Could you please take a look and fix this? We should also add more tests, I suppose.

##########
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java:
##########
@@ -186,9 +190,33 @@ private void createAssignmentSpec() {
                 Collections.emptyMap()
             ));
         }
-        assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
+        groupSpec = new GroupSpecImpl(members, HOMOGENEOUS, Collections.emptyMap());
     }
+    public Map<Uuid, Map<Integer, String>> invertedTargetAssignment(

Review Comment:
   I agree with Jeff. Let's try to reuse as they fundamentally do the same thing.
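To illustrate the `removeTargetAssignment` point above, here is a minimal sketch of keeping an inverted view in sync on both update and removal. It is not the code from the PR: `TargetAssignmentIndex` is a hypothetical class, plain maps stand in for `Assignment`, and `java.util.UUID` stands in for Kafka's `Uuid` so that the example compiles on its own.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical stand-in for the bookkeeping discussed in the review.
// java.util.UUID replaces Kafka's Uuid so the sketch is self-contained.
class TargetAssignmentIndex {
    // memberId -> (topicId -> assigned partitions)
    private final Map<String, Map<UUID, Set<Integer>>> targetAssignment = new HashMap<>();
    // topicId -> (partition -> memberId)
    private final Map<UUID, Map<Integer, String>> invertedTargetAssignment = new HashMap<>();

    void updateTargetAssignment(String memberId, Map<UUID, Set<Integer>> newAssignment) {
        // Drop the member's previous partitions before recording the new ones.
        removeFromInverted(memberId, targetAssignment.get(memberId));
        newAssignment.forEach((topicId, partitions) -> {
            Map<Integer, String> owners =
                invertedTargetAssignment.computeIfAbsent(topicId, id -> new HashMap<>());
            partitions.forEach(partition -> owners.put(partition, memberId));
        });
        targetAssignment.put(memberId, newAssignment);
    }

    void removeTargetAssignment(String memberId) {
        // Removal has to clean the inverted view as well as the forward map.
        removeFromInverted(memberId, targetAssignment.remove(memberId));
    }

    Map<UUID, Map<Integer, String>> invertedTargetAssignment() {
        return Collections.unmodifiableMap(invertedTargetAssignment);
    }

    private void removeFromInverted(String memberId, Map<UUID, Set<Integer>> oldAssignment) {
        if (oldAssignment == null) {
            return;
        }
        oldAssignment.forEach((topicId, partitions) -> {
            Map<Integer, String> owners = invertedTargetAssignment.get(topicId);
            if (owners == null) {
                return;
            }
            // Only remove entries that are still owned by this member.
            partitions.forEach(partition -> owners.remove(partition, memberId));
            if (owners.isEmpty()) {
                invertedTargetAssignment.remove(topicId);
            }
        });
    }
}
```

A test along these lines (update two members, remove one, then check that only the other member's partitions remain in the inverted map) would likely catch the missed update.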
##########
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java:
##########
@@ -186,9 +190,33 @@ private void createAssignmentSpec() {
                 Collections.emptyMap()
             ));
         }
-        assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
+        groupSpec = new GroupSpecImpl(members, HOMOGENEOUS, Collections.emptyMap());
     }
+    public Map<Uuid, Map<Integer, String>> invertedTargetAssignment(

Review Comment:
   nit: The method could be static.

##########
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java:
##########
@@ -263,13 +270,36 @@ private void simulateIncrementalRebalance() {
             Collections.emptyMap()
         ));
-        assignmentSpec = new AssignmentSpec(updatedMembers, subscriptionType);
+        groupSpec = new GroupSpecImpl(updatedMembers, subscriptionType, invertedTargetAssignment);
+    }
+
+    private Map<Uuid, Map<Integer, String>> updateInvertedTargetAssignment(
+        GroupAssignment targetAssignment
+    ) {
+        Map<Uuid, Map<Integer, String>> invertedTargetAssignment = new HashMap<>(topicCount);
+        for (Uuid topicId : allTopicIds) {
+            invertedTargetAssignment.put(topicId, new HashMap<>(subscribedTopicDescriber.numPartitions(topicId)));
+        }

Review Comment:
   This is not needed. I think that we can init it by using `computeIfAbsent` like you did in `invertedTargetAssignment`. With this change, the two methods are identical so we can share the code.

##########
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java:
##########
@@ -263,13 +270,36 @@ private void simulateIncrementalRebalance() {
             Collections.emptyMap()
         ));
-        assignmentSpec = new AssignmentSpec(updatedMembers, subscriptionType);
+        groupSpec = new GroupSpecImpl(updatedMembers, subscriptionType, invertedTargetAssignment);
+    }
+
+    private Map<Uuid, Map<Integer, String>> updateInvertedTargetAssignment(

Review Comment:
   nit: Should we call it `computeInvertedTargetAssignment`?
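The `computeIfAbsent` and "could be static" suggestions in the benchmark comments can be sketched as a single shared helper. This is only an illustration under stated assumptions: `InvertedAssignments` is a hypothetical holder class, a plain `Map<String, Map<UUID, Set<Integer>>>` stands in for `GroupAssignment`, and `java.util.UUID` stands in for Kafka's `Uuid`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical shared helper that both benchmarks could call.
// Plain maps and java.util.UUID are stand-ins for the Kafka types.
final class InvertedAssignments {
    private InvertedAssignments() { }

    // members: memberId -> (topicId -> assigned partitions)
    static Map<UUID, Map<Integer, String>> computeInvertedTargetAssignment(
        Map<String, Map<UUID, Set<Integer>>> members
    ) {
        Map<UUID, Map<Integer, String>> inverted = new HashMap<>();
        members.forEach((memberId, topics) ->
            topics.forEach((topicId, partitions) -> {
                // Create the per-topic map on first use instead of
                // pre-populating an entry for every topic up front.
                Map<Integer, String> owners =
                    inverted.computeIfAbsent(topicId, id -> new HashMap<>());
                partitions.forEach(partition -> owners.put(partition, memberId));
            })
        );
        return inverted;
    }
}
```

Because the helper reads no instance state, it can be static, and the lazy `computeIfAbsent` initialization removes the need for the explicit loop over all topic ids.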