fonsdant commented on code in PR #17038:
URL: https://github.com/apache/kafka/pull/17038#discussion_r1746786740


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java:
##########
@@ -262,12 +264,13 @@ private void createInternalTopics() {
         );
     }
 
-    Map<TopicPartition, OffsetAndMetadata> listConsumerGroupOffsets(String 
group)
+    ListConsumerGroupOffsetsResult listConsumerGroupOffsets(List<String> 
groups)
             throws InterruptedException, ExecutionException {
+        Map<String, ListConsumerGroupOffsetsSpec> groupSpecs = groups.stream()
+                .collect(Collectors.toMap(group -> group, group -> new 
ListConsumerGroupOffsetsSpec()));
         return adminCall(
-                () -> 
sourceAdminClient.listConsumerGroupOffsets(group).partitionsToOffsetAndMetadata().get(),
-                () -> String.format("list offsets for consumer group %s on %s 
cluster", group,
-                        config.sourceClusterAlias())
+                () -> sourceAdminClient.listConsumerGroupOffsets(groupSpecs),
+                () -> String.format("list offsets for consumer groups %s on %s 
cluster", groups, config.sourceClusterAlias())
         );

Review Comment:
   While looping, we need to get topic partitions and metadata by group. For 
this reason, I have chosen `ListConsumerGroupOffsetsResult` as the return of 
listConsumerGroupOffsets as it allows us to do 
`result.partitionsToOffsetAndMetadata(group).get()` after calling 
`listConsumerGroups(filteredGroups)`.
   
   In `.all().get()` inside `listConsumerGroups` approach, once we have all 
topic partitions and metadata for all consumer groups, we will not be able to 
distinguish each of them by group while looping.
   
   An alternative could be to make a copy of 
`ListConsumerGroupOffsetsResult.all()` that groups future results by group. 
This way, we could continue to get them batched and still have how to 
distinguish what group each one belongs to. But I think it would need KIP, 
right? So it does not seem to be the case.
   
   Let me know if I have missed something.
   
   Many thanks for your review, @gharris1727! :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to