gharris1727 commented on code in PR #17038:
URL: https://github.com/apache/kafka/pull/17038#discussion_r1742462093


##########
clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java:
##########
@@ -40,7 +40,7 @@ public class ListConsumerGroupOffsetsResult {
 
     final Map<String, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> futures;
 
-    ListConsumerGroupOffsetsResult(final Map<CoordinatorKey, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> futures) {
+    public ListConsumerGroupOffsetsResult(final Map<CoordinatorKey, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> futures) {

Review Comment:
   Making this constructor public is a change to the public interface, which
   would require a KIP. Please revert this change and find an alternative. Thanks!



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java:
##########
@@ -262,12 +264,13 @@ private void createInternalTopics() {
         );
     }
 
-    Map<TopicPartition, OffsetAndMetadata> listConsumerGroupOffsets(String group)
+    ListConsumerGroupOffsetsResult listConsumerGroupOffsets(List<String> groups)
             throws InterruptedException, ExecutionException {
+        Map<String, ListConsumerGroupOffsetsSpec> groupSpecs = groups.stream()
+                .collect(Collectors.toMap(group -> group, group -> new ListConsumerGroupOffsetsSpec()));

Review Comment:
   nit: create one of these Spec objects and reuse it for every entry in the 
map.
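
   A minimal sketch of what I mean, not the exact code for this PR. To keep it
   runnable without the Kafka clients jar, the nested `Spec` class is a
   hypothetical stand-in for `ListConsumerGroupOffsetsSpec`, and the class and
   method names are made up for illustration. It assumes the shared spec is
   left at its defaults and not modified afterwards, since every group would
   see the change.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class SharedSpecSketch {

    // Hypothetical stand-in for ListConsumerGroupOffsetsSpec, so the sketch
    // compiles on its own. In the connector, use the real admin-client class.
    static final class Spec { }

    // Builds the group -> spec map with a single Spec instance shared by every
    // entry, instead of allocating a new one per group.
    // Note: Collectors.toMap throws IllegalStateException on duplicate keys,
    // so this assumes the group ids are distinct.
    static Map<String, Spec> groupSpecs(List<String> groups) {
        Spec sharedSpec = new Spec();
        return groups.stream()
                .collect(Collectors.toMap(Function.identity(), group -> sharedSpec));
    }

    public static void main(String[] args) {
        Map<String, Spec> specs = groupSpecs(List.of("g1", "g2"));
        // Both entries point at the same object.
        System.out.println(specs.get("g1") == specs.get("g2")); // prints true
    }
}
```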



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java:
##########
@@ -151,7 +155,14 @@ public void testFindConsumerGroups() throws Exception {
         doReturn(groups).when(connector).listConsumerGroups();
         doReturn(true).when(connector).shouldReplicateByTopicFilter(anyString());
         doReturn(true).when(connector).shouldReplicateByGroupFilter(anyString());
-        doReturn(offsets).when(connector).listConsumerGroupOffsets(anyString());

Review Comment:
   I think this leaves the `offsets` variable unused.



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java:
##########
@@ -151,7 +155,14 @@ public void testFindConsumerGroups() throws Exception {
         doReturn(groups).when(connector).listConsumerGroups();
         doReturn(true).when(connector).shouldReplicateByTopicFilter(anyString());
         doReturn(true).when(connector).shouldReplicateByGroupFilter(anyString());
-        doReturn(offsets).when(connector).listConsumerGroupOffsets(anyString());
+
+        Map<CoordinatorKey, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> futures = new HashMap<>();
+        futures.put(CoordinatorKey.byGroupId("g1"), KafkaFuture.completedFuture(offsets));
+        futures.put(CoordinatorKey.byGroupId("g2"), KafkaFuture.completedFuture(offsets));
+        ListConsumerGroupOffsetsResult offsetsResult = new ListConsumerGroupOffsetsResult(futures);
+        offsetsResult = spy(offsetsResult);

Review Comment:
   I think you can avoid needing to change the constructor visibility if you 
use `mock(ListConsumerGroupOffsetsResult.class)` instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

