AndrewJSchofield commented on code in PR #19587:
URL: https://github.com/apache/kafka/pull/19587#discussion_r2065782916


##########
tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java:
##########
@@ -612,6 +615,189 @@ public void testGroupStatesFromString() {
         assertThrows(IllegalArgumentException.class, () -> 
ShareGroupCommand.groupStatesFromString("   ,   ,"));
     }
 
+    @Test

Review Comment:
   Let's have a test with multiple groups, which of course will fail.



##########
tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java:
##########
@@ -612,6 +615,189 @@ public void testGroupStatesFromString() {
         assertThrows(IllegalArgumentException.class, () -> 
ShareGroupCommand.groupStatesFromString("   ,   ,"));
     }
 
+    @Test
+    public void testDeleteShareGroupOffsetsArgsWithoutTopic() {
+        String bootstrapServer = "localhost:9092";
+        Admin adminClient = mock(KafkaAdminClient.class);
+
+        // no group spec args
+        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, 
"--delete-offsets", "--group", "groupId"};
+        AtomicBoolean exited = new AtomicBoolean(false);
+        Exit.setExitProcedure(((statusCode, message) -> {
+            assertNotEquals(0, statusCode);
+            assertTrue(message.contains("Option [delete-offsets] takes the 
following options: [topic], [group]"));
+            exited.set(true);
+        }));
+        try {
+            getShareGroupService(cgcArgs, adminClient);
+        } finally {
+            assertTrue(exited.get());
+        }
+    }
+
+    @Test
+    public void testDeleteShareGroupOffsetsArgsWithoutGroup() {
+        String bootstrapServer = "localhost:9092";
+        Admin adminClient = mock(KafkaAdminClient.class);
+
+        // no group spec args
+        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, 
"--delete-offsets", "--topic", "t1"};
+        AtomicBoolean exited = new AtomicBoolean(false);
+        Exit.setExitProcedure(((statusCode, message) -> {
+            assertNotEquals(0, statusCode);
+            assertTrue(message.contains("Option [delete-offsets] takes the 
following options: [topic], [group]"));
+            exited.set(true);
+        }));
+        try {
+            getShareGroupService(cgcArgs, adminClient);
+        } finally {
+            assertTrue(exited.get());
+        }
+    }
+
+    @Test
+    public void testDeleteShareGroupOffsets() throws Exception {
+        String firstGroup = "first-group";
+        String firstTopic = "t1";
+        String secondTopic = "t2";
+        String bootstrapServer = "localhost:9092";
+
+        List<String> cgcArgs = new 
ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServer, 
"--delete-offsets", "--group", firstGroup, "--topic", firstTopic, "--topic", 
secondTopic));
+        Admin adminClient = mock(KafkaAdminClient.class);
+        DeleteShareGroupOffsetsResult result = 
mock(DeleteShareGroupOffsetsResult.class);
+
+        when(result.all()).thenReturn(KafkaFuture.completedFuture(null));
+
+        
when(result.topicResult(eq(firstTopic))).thenReturn(KafkaFuture.completedFuture(null));
+        
when(result.topicResult(eq(secondTopic))).thenReturn(KafkaFuture.completedFuture(null));
+
+        when(adminClient.deleteShareGroupOffsets(any(), any(), 
any())).thenReturn(result);
+
+        try (ShareGroupService service = 
getShareGroupService(cgcArgs.toArray(new String[0]), adminClient)) {
+            TestUtils.waitForCondition(() -> {
+                Entry<String, String> res = 
ToolsTestUtils.grabConsoleOutputAndError(deleteOffsets(service));
+                String[] lines = res.getKey().trim().split("\n");
+                if (lines.length != 3 && !res.getValue().isEmpty()) {
+                    return false;
+                }
+
+                List<String> expectedResultHeader = List.of("TOPIC", "STATUS");
+                List<String> expectedResultValues1 = List.of(firstTopic, 
"Successful");
+                List<String> expectedResultValues2 = List.of(secondTopic, 
"Successful");
+
+                return 
Arrays.stream(lines[0].trim().split("\\s+")).toList().equals(expectedResultHeader)
 &&
+                    
Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(expectedResultValues1)
 &&
+                    
Arrays.stream(lines[2].trim().split("\\s+")).toList().equals(expectedResultValues2);
+            }, "Expected a data row and no error in delete offsets result with 
group: " + firstGroup + " and topic: " + firstTopic);
+        }
+    }
+
+    @Test
+    public void testDeleteShareGroupOffsetsTopLevelError() throws Exception {
+        String firstGroup = "first-group";
+        String firstTopic = "t1";
+        String secondTopic = "t2";
+        String bootstrapServer = "localhost:9092";
+
+        List<String> cgcArgs = new 
ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServer, 
"--delete-offsets", "--group", firstGroup, "--topic", firstTopic, "--topic", 
secondTopic));
+        Admin adminClient = mock(KafkaAdminClient.class);
+        DeleteShareGroupOffsetsResult result = 
mock(DeleteShareGroupOffsetsResult.class);
+
+        KafkaFutureImpl<Void> resultFuture = new KafkaFutureImpl<>();
+        String errorMessage = "Group g3 not found.";
+        GroupIdNotFoundException exception = new 
GroupIdNotFoundException(errorMessage);
+
+        resultFuture.completeExceptionally(exception);
+        when(result.all()).thenReturn(resultFuture);
+
+        when(result.topicResult(eq(firstTopic))).thenReturn(resultFuture);
+        when(result.topicResult(eq(secondTopic))).thenReturn(resultFuture);
+
+        when(adminClient.deleteShareGroupOffsets(any(), any(), 
any())).thenReturn(result);
+
+        try (ShareGroupService service = 
getShareGroupService(cgcArgs.toArray(new String[0]), adminClient)) {
+            TestUtils.waitForCondition(() -> {
+                Entry<String, String> res = 
ToolsTestUtils.grabConsoleOutputAndError(deleteOffsets(service));
+                String[] lines = res.getKey().trim().split("\n");
+                if (lines.length != 5 && !res.getValue().isEmpty()) {
+                    return false;
+                }
+
+                List<String> errorLine = Stream.concat(
+                    Stream.of("Error:"),
+                    Arrays.stream(errorMessage.trim().split("\\s+"))
+                    ).toList();
+
+                List<String> firstLine = new ArrayList<>(errorLine);
+
+                List<String> secondLine = List.of("TOPIC", "STATUS");
+
+                List<String> thirdLine =  new ArrayList<>();
+                thirdLine.add(firstTopic);
+                thirdLine.addAll(errorLine);
+
+                List<String> fourthLine =  new ArrayList<>();
+                fourthLine.add(secondTopic);
+                fourthLine.addAll(errorLine);
+
+                return 
Arrays.stream(lines[0].trim().split("\\s+")).toList().equals(firstLine) &&
+                    
Arrays.stream(lines[2].trim().split("\\s+")).toList().equals(secondLine) &&
+                    
Arrays.stream(lines[3].trim().split("\\s+")).toList().equals(thirdLine) &&
+                    
Arrays.stream(lines[4].trim().split("\\s+")).toList().equals(fourthLine);
+            }, "Expected a data row and no error in delete offsets result with 
group: " + firstGroup + " and topic: " + firstTopic);
+        }
+    }
+
+    @Test
+    public void testDeleteShareGroupOffsetsTopicLevelError() throws Exception {
+        String firstGroup = "first-group";
+        String firstTopic = "t1";
+        String secondTopic = "t2";
+        String bootstrapServer = "localhost:9092";
+
+        List<String> cgcArgs = new 
ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServer, 
"--delete-offsets", "--group", firstGroup, "--topic", firstTopic, "--topic", 
secondTopic));
+        Admin adminClient = mock(KafkaAdminClient.class);
+        DeleteShareGroupOffsetsResult result = 
mock(DeleteShareGroupOffsetsResult.class);
+
+        KafkaFutureImpl<Void> resultFuture = new KafkaFutureImpl<>();
+        String errorMessage = Errors.UNKNOWN_TOPIC_OR_PARTITION.message();
+
+        
resultFuture.completeExceptionally(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception());
+        when(result.all()).thenReturn(resultFuture);
+
+        
when(result.topicResult(eq(firstTopic))).thenReturn(KafkaFuture.completedFuture(null));
+        when(result.topicResult(eq(secondTopic))).thenReturn(resultFuture);
+
+        when(adminClient.deleteShareGroupOffsets(any(), any(), 
any())).thenReturn(result);
+
+        try (ShareGroupService service = 
getShareGroupService(cgcArgs.toArray(new String[0]), adminClient)) {
+            TestUtils.waitForCondition(() -> {
+                Entry<String, String> res = 
ToolsTestUtils.grabConsoleOutputAndError(deleteOffsets(service));
+                String[] lines = res.getKey().trim().split("\n");
+                if (lines.length != 5 && !res.getValue().isEmpty()) {
+                    return false;
+                }
+
+                List<String> errorLine = Stream.concat(
+                    Stream.of("Error:"),
+                    Arrays.stream(errorMessage.trim().split("\\s+"))
+                ).toList();
+
+                List<String> firstLine = List.of("TOPIC", "STATUS");
+
+                List<String> secondLine =  List.of(firstTopic, "Successful");

Review Comment:
   nit: Extraneous space :)



##########
tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java:
##########
@@ -612,6 +615,189 @@ public void testGroupStatesFromString() {
         assertThrows(IllegalArgumentException.class, () -> 
ShareGroupCommand.groupStatesFromString("   ,   ,"));
     }
 
+    @Test
+    public void testDeleteShareGroupOffsetsArgsWithoutTopic() {
+        String bootstrapServer = "localhost:9092";
+        Admin adminClient = mock(KafkaAdminClient.class);
+
+        // no group spec args
+        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, 
"--delete-offsets", "--group", "groupId"};
+        AtomicBoolean exited = new AtomicBoolean(false);
+        Exit.setExitProcedure(((statusCode, message) -> {
+            assertNotEquals(0, statusCode);
+            assertTrue(message.contains("Option [delete-offsets] takes the 
following options: [topic], [group]"));
+            exited.set(true);
+        }));
+        try {
+            getShareGroupService(cgcArgs, adminClient);
+        } finally {
+            assertTrue(exited.get());
+        }
+    }
+
+    @Test
+    public void testDeleteShareGroupOffsetsArgsWithoutGroup() {
+        String bootstrapServer = "localhost:9092";
+        Admin adminClient = mock(KafkaAdminClient.class);
+
+        // no group spec args
+        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, 
"--delete-offsets", "--topic", "t1"};
+        AtomicBoolean exited = new AtomicBoolean(false);
+        Exit.setExitProcedure(((statusCode, message) -> {
+            assertNotEquals(0, statusCode);
+            assertTrue(message.contains("Option [delete-offsets] takes the 
following options: [topic], [group]"));
+            exited.set(true);
+        }));
+        try {
+            getShareGroupService(cgcArgs, adminClient);
+        } finally {
+            assertTrue(exited.get());
+        }
+    }
+
+    @Test
+    public void testDeleteShareGroupOffsets() throws Exception {
+        String firstGroup = "first-group";
+        String firstTopic = "t1";
+        String secondTopic = "t2";
+        String bootstrapServer = "localhost:9092";
+
+        List<String> cgcArgs = new 
ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServer, 
"--delete-offsets", "--group", firstGroup, "--topic", firstTopic, "--topic", 
secondTopic));
+        Admin adminClient = mock(KafkaAdminClient.class);
+        DeleteShareGroupOffsetsResult result = 
mock(DeleteShareGroupOffsetsResult.class);
+
+        when(result.all()).thenReturn(KafkaFuture.completedFuture(null));
+
+        
when(result.topicResult(eq(firstTopic))).thenReturn(KafkaFuture.completedFuture(null));
+        
when(result.topicResult(eq(secondTopic))).thenReturn(KafkaFuture.completedFuture(null));
+
+        when(adminClient.deleteShareGroupOffsets(any(), any(), 
any())).thenReturn(result);
+
+        try (ShareGroupService service = 
getShareGroupService(cgcArgs.toArray(new String[0]), adminClient)) {
+            TestUtils.waitForCondition(() -> {
+                Entry<String, String> res = 
ToolsTestUtils.grabConsoleOutputAndError(deleteOffsets(service));
+                String[] lines = res.getKey().trim().split("\n");
+                if (lines.length != 3 && !res.getValue().isEmpty()) {
+                    return false;
+                }
+
+                List<String> expectedResultHeader = List.of("TOPIC", "STATUS");
+                List<String> expectedResultValues1 = List.of(firstTopic, 
"Successful");
+                List<String> expectedResultValues2 = List.of(secondTopic, 
"Successful");
+
+                return 
Arrays.stream(lines[0].trim().split("\\s+")).toList().equals(expectedResultHeader)
 &&
+                    
Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(expectedResultValues1)
 &&
+                    
Arrays.stream(lines[2].trim().split("\\s+")).toList().equals(expectedResultValues2);
+            }, "Expected a data row and no error in delete offsets result with 
group: " + firstGroup + " and topic: " + firstTopic);
+        }
+    }
+
+    @Test
+    public void testDeleteShareGroupOffsetsTopLevelError() throws Exception {
+        String firstGroup = "first-group";
+        String firstTopic = "t1";
+        String secondTopic = "t2";
+        String bootstrapServer = "localhost:9092";
+
+        List<String> cgcArgs = new 
ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServer, 
"--delete-offsets", "--group", firstGroup, "--topic", firstTopic, "--topic", 
secondTopic));
+        Admin adminClient = mock(KafkaAdminClient.class);
+        DeleteShareGroupOffsetsResult result = 
mock(DeleteShareGroupOffsetsResult.class);
+
+        KafkaFutureImpl<Void> resultFuture = new KafkaFutureImpl<>();
+        String errorMessage = "Group g3 not found.";
+        GroupIdNotFoundException exception = new 
GroupIdNotFoundException(errorMessage);
+
+        resultFuture.completeExceptionally(exception);
+        when(result.all()).thenReturn(resultFuture);
+
+        when(result.topicResult(eq(firstTopic))).thenReturn(resultFuture);
+        when(result.topicResult(eq(secondTopic))).thenReturn(resultFuture);
+
+        when(adminClient.deleteShareGroupOffsets(any(), any(), 
any())).thenReturn(result);
+
+        try (ShareGroupService service = 
getShareGroupService(cgcArgs.toArray(new String[0]), adminClient)) {
+            TestUtils.waitForCondition(() -> {
+                Entry<String, String> res = 
ToolsTestUtils.grabConsoleOutputAndError(deleteOffsets(service));
+                String[] lines = res.getKey().trim().split("\n");
+                if (lines.length != 5 && !res.getValue().isEmpty()) {
+                    return false;
+                }
+
+                List<String> errorLine = Stream.concat(
+                    Stream.of("Error:"),
+                    Arrays.stream(errorMessage.trim().split("\\s+"))
+                    ).toList();
+
+                List<String> firstLine = new ArrayList<>(errorLine);

Review Comment:
   This could be a bit cleaner if you gave the lines more descriptive names, 
such as `errorLine`, `expectedResultHeader`, `expectedResultValue1` and 
`expectedResultValue2`. Then the test cases would be more consistent with each 
other.



##########
tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java:
##########
@@ -873,6 +1059,10 @@ private Runnable 
describeGroups(ShareGroupCommand.ShareGroupService service) {
         return () -> Assertions.assertDoesNotThrow(service::describeGroups);
     }
 
+    private Runnable deleteOffsets(ShareGroupCommand.ShareGroupService 
service) {
+        return () -> Assertions.assertDoesNotThrow(() -> 
service.deleteOffsets());

Review Comment:
   I suggest renaming the `deleteOffsets(String groupId, Set<String> topics)` 
method in the service, and then you can use a method reference here: 
`service::deleteOffsets`.



##########
tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java:
##########
@@ -612,6 +615,189 @@ public void testGroupStatesFromString() {
         assertThrows(IllegalArgumentException.class, () -> 
ShareGroupCommand.groupStatesFromString("   ,   ,"));
     }
 
+    @Test
+    public void testDeleteShareGroupOffsetsArgsWithoutTopic() {
+        String bootstrapServer = "localhost:9092";
+        Admin adminClient = mock(KafkaAdminClient.class);
+
+        // no group spec args
+        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, 
"--delete-offsets", "--group", "groupId"};
+        AtomicBoolean exited = new AtomicBoolean(false);
+        Exit.setExitProcedure(((statusCode, message) -> {
+            assertNotEquals(0, statusCode);
+            assertTrue(message.contains("Option [delete-offsets] takes the 
following options: [topic], [group]"));
+            exited.set(true);
+        }));
+        try {
+            getShareGroupService(cgcArgs, adminClient);
+        } finally {
+            assertTrue(exited.get());
+        }
+    }
+
+    @Test
+    public void testDeleteShareGroupOffsetsArgsWithoutGroup() {
+        String bootstrapServer = "localhost:9092";
+        Admin adminClient = mock(KafkaAdminClient.class);
+
+        // no group spec args
+        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, 
"--delete-offsets", "--topic", "t1"};
+        AtomicBoolean exited = new AtomicBoolean(false);
+        Exit.setExitProcedure(((statusCode, message) -> {
+            assertNotEquals(0, statusCode);
+            assertTrue(message.contains("Option [delete-offsets] takes the 
following options: [topic], [group]"));
+            exited.set(true);
+        }));
+        try {
+            getShareGroupService(cgcArgs, adminClient);
+        } finally {
+            assertTrue(exited.get());
+        }
+    }
+
+    @Test
+    public void testDeleteShareGroupOffsets() throws Exception {
+        String firstGroup = "first-group";
+        String firstTopic = "t1";
+        String secondTopic = "t2";
+        String bootstrapServer = "localhost:9092";
+
+        List<String> cgcArgs = new 
ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServer, 
"--delete-offsets", "--group", firstGroup, "--topic", firstTopic, "--topic", 
secondTopic));
+        Admin adminClient = mock(KafkaAdminClient.class);
+        DeleteShareGroupOffsetsResult result = 
mock(DeleteShareGroupOffsetsResult.class);
+
+        when(result.all()).thenReturn(KafkaFuture.completedFuture(null));
+
+        
when(result.topicResult(eq(firstTopic))).thenReturn(KafkaFuture.completedFuture(null));
+        
when(result.topicResult(eq(secondTopic))).thenReturn(KafkaFuture.completedFuture(null));
+
+        when(adminClient.deleteShareGroupOffsets(any(), any(), 
any())).thenReturn(result);
+
+        try (ShareGroupService service = 
getShareGroupService(cgcArgs.toArray(new String[0]), adminClient)) {
+            TestUtils.waitForCondition(() -> {
+                Entry<String, String> res = 
ToolsTestUtils.grabConsoleOutputAndError(deleteOffsets(service));
+                String[] lines = res.getKey().trim().split("\n");
+                if (lines.length != 3 && !res.getValue().isEmpty()) {
+                    return false;
+                }
+
+                List<String> expectedResultHeader = List.of("TOPIC", "STATUS");
+                List<String> expectedResultValues1 = List.of(firstTopic, 
"Successful");
+                List<String> expectedResultValues2 = List.of(secondTopic, 
"Successful");
+
+                return 
Arrays.stream(lines[0].trim().split("\\s+")).toList().equals(expectedResultHeader)
 &&
+                    
Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(expectedResultValues1)
 &&
+                    
Arrays.stream(lines[2].trim().split("\\s+")).toList().equals(expectedResultValues2);
+            }, "Expected a data row and no error in delete offsets result with 
group: " + firstGroup + " and topic: " + firstTopic);
+        }
+    }
+
+    @Test
+    public void testDeleteShareGroupOffsetsTopLevelError() throws Exception {
+        String firstGroup = "first-group";
+        String firstTopic = "t1";
+        String secondTopic = "t2";
+        String bootstrapServer = "localhost:9092";
+
+        List<String> cgcArgs = new 
ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServer, 
"--delete-offsets", "--group", firstGroup, "--topic", firstTopic, "--topic", 
secondTopic));
+        Admin adminClient = mock(KafkaAdminClient.class);
+        DeleteShareGroupOffsetsResult result = 
mock(DeleteShareGroupOffsetsResult.class);
+
+        KafkaFutureImpl<Void> resultFuture = new KafkaFutureImpl<>();
+        String errorMessage = "Group g3 not found.";
+        GroupIdNotFoundException exception = new 
GroupIdNotFoundException(errorMessage);
+
+        resultFuture.completeExceptionally(exception);
+        when(result.all()).thenReturn(resultFuture);
+
+        when(result.topicResult(eq(firstTopic))).thenReturn(resultFuture);
+        when(result.topicResult(eq(secondTopic))).thenReturn(resultFuture);
+
+        when(adminClient.deleteShareGroupOffsets(any(), any(), 
any())).thenReturn(result);
+
+        try (ShareGroupService service = 
getShareGroupService(cgcArgs.toArray(new String[0]), adminClient)) {
+            TestUtils.waitForCondition(() -> {
+                Entry<String, String> res = 
ToolsTestUtils.grabConsoleOutputAndError(deleteOffsets(service));
+                String[] lines = res.getKey().trim().split("\n");
+                if (lines.length != 5 && !res.getValue().isEmpty()) {
+                    return false;
+                }
+
+                List<String> errorLine = Stream.concat(
+                    Stream.of("Error:"),
+                    Arrays.stream(errorMessage.trim().split("\\s+"))
+                    ).toList();
+
+                List<String> firstLine = new ArrayList<>(errorLine);
+
+                List<String> secondLine = List.of("TOPIC", "STATUS");
+
+                List<String> thirdLine =  new ArrayList<>();
+                thirdLine.add(firstTopic);
+                thirdLine.addAll(errorLine);
+
+                List<String> fourthLine =  new ArrayList<>();
+                fourthLine.add(secondTopic);
+                fourthLine.addAll(errorLine);
+
+                return 
Arrays.stream(lines[0].trim().split("\\s+")).toList().equals(firstLine) &&
+                    
Arrays.stream(lines[2].trim().split("\\s+")).toList().equals(secondLine) &&
+                    
Arrays.stream(lines[3].trim().split("\\s+")).toList().equals(thirdLine) &&
+                    
Arrays.stream(lines[4].trim().split("\\s+")).toList().equals(fourthLine);
+            }, "Expected a data row and no error in delete offsets result with 
group: " + firstGroup + " and topic: " + firstTopic);
+        }
+    }
+
+    @Test
+    public void testDeleteShareGroupOffsetsTopicLevelError() throws Exception {
+        String firstGroup = "first-group";
+        String firstTopic = "t1";
+        String secondTopic = "t2";
+        String bootstrapServer = "localhost:9092";
+
+        List<String> cgcArgs = new 
ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServer, 
"--delete-offsets", "--group", firstGroup, "--topic", firstTopic, "--topic", 
secondTopic));
+        Admin adminClient = mock(KafkaAdminClient.class);
+        DeleteShareGroupOffsetsResult result = 
mock(DeleteShareGroupOffsetsResult.class);
+
+        KafkaFutureImpl<Void> resultFuture = new KafkaFutureImpl<>();
+        String errorMessage = Errors.UNKNOWN_TOPIC_OR_PARTITION.message();
+
+        
resultFuture.completeExceptionally(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception());
+        when(result.all()).thenReturn(resultFuture);
+
+        
when(result.topicResult(eq(firstTopic))).thenReturn(KafkaFuture.completedFuture(null));
+        when(result.topicResult(eq(secondTopic))).thenReturn(resultFuture);
+
+        when(adminClient.deleteShareGroupOffsets(any(), any(), 
any())).thenReturn(result);
+
+        try (ShareGroupService service = 
getShareGroupService(cgcArgs.toArray(new String[0]), adminClient)) {
+            TestUtils.waitForCondition(() -> {
+                Entry<String, String> res = 
ToolsTestUtils.grabConsoleOutputAndError(deleteOffsets(service));
+                String[] lines = res.getKey().trim().split("\n");
+                if (lines.length != 5 && !res.getValue().isEmpty()) {
+                    return false;
+                }
+
+                List<String> errorLine = Stream.concat(
+                    Stream.of("Error:"),
+                    Arrays.stream(errorMessage.trim().split("\\s+"))
+                ).toList();
+
+                List<String> firstLine = List.of("TOPIC", "STATUS");
+
+                List<String> secondLine =  List.of(firstTopic, "Successful");
+
+                List<String> thirdLine =  new ArrayList<>();

Review Comment:
   And another.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to