AndrewJSchofield commented on code in PR #19587: URL: https://github.com/apache/kafka/pull/19587#discussion_r2065782916
########## tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java: ########## @@ -612,6 +615,189 @@ public void testGroupStatesFromString() { assertThrows(IllegalArgumentException.class, () -> ShareGroupCommand.groupStatesFromString(" , ,")); } + @Test Review Comment: Let's have a test with multiple groups, which of course will fail. ########## tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java: ########## @@ -612,6 +615,189 @@ public void testGroupStatesFromString() { assertThrows(IllegalArgumentException.class, () -> ShareGroupCommand.groupStatesFromString(" , ,")); } + @Test + public void testDeleteShareGroupOffsetsArgsWithoutTopic() { + String bootstrapServer = "localhost:9092"; + Admin adminClient = mock(KafkaAdminClient.class); + + // no group spec args + String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, "--delete-offsets", "--group", "groupId"}; + AtomicBoolean exited = new AtomicBoolean(false); + Exit.setExitProcedure(((statusCode, message) -> { + assertNotEquals(0, statusCode); + assertTrue(message.contains("Option [delete-offsets] takes the following options: [topic], [group]")); + exited.set(true); + })); + try { + getShareGroupService(cgcArgs, adminClient); + } finally { + assertTrue(exited.get()); + } + } + + @Test + public void testDeleteShareGroupOffsetsArgsWithoutGroup() { + String bootstrapServer = "localhost:9092"; + Admin adminClient = mock(KafkaAdminClient.class); + + // no group spec args + String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, "--delete-offsets", "--topic", "t1"}; + AtomicBoolean exited = new AtomicBoolean(false); + Exit.setExitProcedure(((statusCode, message) -> { + assertNotEquals(0, statusCode); + assertTrue(message.contains("Option [delete-offsets] takes the following options: [topic], [group]")); + exited.set(true); + })); + try { + getShareGroupService(cgcArgs, adminClient); + } finally { + assertTrue(exited.get()); + } + } + + @Test + public void testDeleteShareGroupOffsets() throws Exception { + String firstGroup = "first-group"; + String firstTopic = "t1"; + String secondTopic = "t2"; + String bootstrapServer = "localhost:9092"; + + List<String> cgcArgs = new ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServer, "--delete-offsets", "--group", firstGroup, "--topic", firstTopic, "--topic", secondTopic)); + Admin adminClient = mock(KafkaAdminClient.class); + DeleteShareGroupOffsetsResult result = mock(DeleteShareGroupOffsetsResult.class); + + when(result.all()).thenReturn(KafkaFuture.completedFuture(null)); + + when(result.topicResult(eq(firstTopic))).thenReturn(KafkaFuture.completedFuture(null)); + when(result.topicResult(eq(secondTopic))).thenReturn(KafkaFuture.completedFuture(null)); + + when(adminClient.deleteShareGroupOffsets(any(), any(), any())).thenReturn(result); + + try (ShareGroupService service = getShareGroupService(cgcArgs.toArray(new String[0]), adminClient)) { + TestUtils.waitForCondition(() -> { + Entry<String, String> res = ToolsTestUtils.grabConsoleOutputAndError(deleteOffsets(service)); + String[] lines = res.getKey().trim().split("\n"); + if (lines.length != 3 && !res.getValue().isEmpty()) { + return false; + } + + List<String> expectedResultHeader = List.of("TOPIC", "STATUS"); + List<String> expectedResultValues1 = List.of(firstTopic, "Successful"); + List<String> expectedResultValues2 = List.of(secondTopic, "Successful"); + + return Arrays.stream(lines[0].trim().split("\\s+")).toList().equals(expectedResultHeader) && + Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(expectedResultValues1) && + Arrays.stream(lines[2].trim().split("\\s+")).toList().equals(expectedResultValues2); + }, "Expected a data row and no error in delete offsets result with group: " + firstGroup + " and topic: " + firstTopic); + } + } + + @Test + public void testDeleteShareGroupOffsetsTopLevelError() throws Exception { + String firstGroup = "first-group"; + String firstTopic = "t1"; + String secondTopic = "t2"; + String bootstrapServer = "localhost:9092"; + + List<String> cgcArgs = new ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServer, "--delete-offsets", "--group", firstGroup, "--topic", firstTopic, "--topic", secondTopic)); + Admin adminClient = mock(KafkaAdminClient.class); + DeleteShareGroupOffsetsResult result = mock(DeleteShareGroupOffsetsResult.class); + + KafkaFutureImpl<Void> resultFuture = new KafkaFutureImpl<>(); + String errorMessage = "Group g3 not found."; + GroupIdNotFoundException exception = new GroupIdNotFoundException(errorMessage); + + resultFuture.completeExceptionally(exception); + when(result.all()).thenReturn(resultFuture); + + when(result.topicResult(eq(firstTopic))).thenReturn(resultFuture); + when(result.topicResult(eq(secondTopic))).thenReturn(resultFuture); + + when(adminClient.deleteShareGroupOffsets(any(), any(), any())).thenReturn(result); + + try (ShareGroupService service = getShareGroupService(cgcArgs.toArray(new String[0]), adminClient)) { + TestUtils.waitForCondition(() -> { + Entry<String, String> res = ToolsTestUtils.grabConsoleOutputAndError(deleteOffsets(service)); + String[] lines = res.getKey().trim().split("\n"); + if (lines.length != 5 && !res.getValue().isEmpty()) { + return false; + } + + List<String> errorLine = Stream.concat( + Stream.of("Error:"), + Arrays.stream(errorMessage.trim().split("\\s+")) + ).toList(); + + List<String> firstLine = new ArrayList<>(errorLine); + + List<String> secondLine = List.of("TOPIC", "STATUS"); + + List<String> thirdLine = new ArrayList<>(); + thirdLine.add(firstTopic); + thirdLine.addAll(errorLine); + + List<String> fourthLine = new ArrayList<>(); + fourthLine.add(secondTopic); + fourthLine.addAll(errorLine); + + return Arrays.stream(lines[0].trim().split("\\s+")).toList().equals(firstLine) && + Arrays.stream(lines[2].trim().split("\\s+")).toList().equals(secondLine) && + Arrays.stream(lines[3].trim().split("\\s+")).toList().equals(thirdLine) && + Arrays.stream(lines[4].trim().split("\\s+")).toList().equals(fourthLine); + }, "Expected a data row and no error in delete offsets result with group: " + firstGroup + " and topic: " + firstTopic); + } + } + + @Test + public void testDeleteShareGroupOffsetsTopicLevelError() throws Exception { + String firstGroup = "first-group"; + String firstTopic = "t1"; + String secondTopic = "t2"; + String bootstrapServer = "localhost:9092"; + + List<String> cgcArgs = new ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServer, "--delete-offsets", "--group", firstGroup, "--topic", firstTopic, "--topic", secondTopic)); + Admin adminClient = mock(KafkaAdminClient.class); + DeleteShareGroupOffsetsResult result = mock(DeleteShareGroupOffsetsResult.class); + + KafkaFutureImpl<Void> resultFuture = new KafkaFutureImpl<>(); + String errorMessage = Errors.UNKNOWN_TOPIC_OR_PARTITION.message(); + + resultFuture.completeExceptionally(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception()); + when(result.all()).thenReturn(resultFuture); + + when(result.topicResult(eq(firstTopic))).thenReturn(KafkaFuture.completedFuture(null)); + when(result.topicResult(eq(secondTopic))).thenReturn(resultFuture); + + when(adminClient.deleteShareGroupOffsets(any(), any(), any())).thenReturn(result); + + try (ShareGroupService service = getShareGroupService(cgcArgs.toArray(new String[0]), adminClient)) { + TestUtils.waitForCondition(() -> { + Entry<String, String> res = ToolsTestUtils.grabConsoleOutputAndError(deleteOffsets(service)); + String[] lines = res.getKey().trim().split("\n"); + if (lines.length != 5 && !res.getValue().isEmpty()) { + return false; + } + + List<String> errorLine = Stream.concat( + Stream.of("Error:"), + Arrays.stream(errorMessage.trim().split("\\s+")) + ).toList(); + + List<String> firstLine = List.of("TOPIC", "STATUS"); + + List<String> secondLine = List.of(firstTopic, "Successful"); Review Comment: nit: Extraneous space :) ########## tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java: ########## @@ -612,6 +615,189 @@ public void testGroupStatesFromString() { assertThrows(IllegalArgumentException.class, () -> ShareGroupCommand.groupStatesFromString(" , ,")); } + @Test + public void testDeleteShareGroupOffsetsArgsWithoutTopic() { + String bootstrapServer = "localhost:9092"; + Admin adminClient = mock(KafkaAdminClient.class); + + // no group spec args + String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, "--delete-offsets", "--group", "groupId"}; + AtomicBoolean exited = new AtomicBoolean(false); + Exit.setExitProcedure(((statusCode, message) -> { + assertNotEquals(0, statusCode); + assertTrue(message.contains("Option [delete-offsets] takes the following options: [topic], [group]")); + exited.set(true); + })); + try { + getShareGroupService(cgcArgs, adminClient); + } finally { + assertTrue(exited.get()); + } + } + + @Test + public void testDeleteShareGroupOffsetsArgsWithoutGroup() { + String bootstrapServer = "localhost:9092"; + Admin adminClient = mock(KafkaAdminClient.class); + + // no group spec args + String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, "--delete-offsets", "--topic", "t1"}; + AtomicBoolean exited = new AtomicBoolean(false); + Exit.setExitProcedure(((statusCode, message) -> { + assertNotEquals(0, statusCode); + assertTrue(message.contains("Option [delete-offsets] takes the following options: [topic], [group]")); + exited.set(true); + })); + try { + getShareGroupService(cgcArgs, adminClient); + } finally { + assertTrue(exited.get()); + } + } + + @Test + public void testDeleteShareGroupOffsets() throws Exception { + String firstGroup = "first-group"; + String firstTopic = "t1"; + String secondTopic = "t2"; + String bootstrapServer = "localhost:9092"; + + List<String> cgcArgs = new ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServer, "--delete-offsets", "--group", firstGroup, "--topic", firstTopic, "--topic", secondTopic)); + Admin adminClient = mock(KafkaAdminClient.class); + DeleteShareGroupOffsetsResult result = mock(DeleteShareGroupOffsetsResult.class); + + when(result.all()).thenReturn(KafkaFuture.completedFuture(null)); + + when(result.topicResult(eq(firstTopic))).thenReturn(KafkaFuture.completedFuture(null)); + when(result.topicResult(eq(secondTopic))).thenReturn(KafkaFuture.completedFuture(null)); + + when(adminClient.deleteShareGroupOffsets(any(), any(), any())).thenReturn(result); + + try (ShareGroupService service = getShareGroupService(cgcArgs.toArray(new String[0]), adminClient)) { + TestUtils.waitForCondition(() -> { + Entry<String, String> res = ToolsTestUtils.grabConsoleOutputAndError(deleteOffsets(service)); + String[] lines = res.getKey().trim().split("\n"); + if (lines.length != 3 && !res.getValue().isEmpty()) { + return false; + } + + List<String> expectedResultHeader = List.of("TOPIC", "STATUS"); + List<String> expectedResultValues1 = List.of(firstTopic, "Successful"); + List<String> expectedResultValues2 = List.of(secondTopic, "Successful"); + + return Arrays.stream(lines[0].trim().split("\\s+")).toList().equals(expectedResultHeader) && + Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(expectedResultValues1) && + Arrays.stream(lines[2].trim().split("\\s+")).toList().equals(expectedResultValues2); + }, "Expected a data row and no error in delete offsets result with group: " + firstGroup + " and topic: " + firstTopic); + } + } + + @Test + public void testDeleteShareGroupOffsetsTopLevelError() throws Exception { + String firstGroup = "first-group"; + String firstTopic = "t1"; + String secondTopic = "t2"; + String bootstrapServer = "localhost:9092"; + + List<String> cgcArgs = new ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServer, "--delete-offsets", "--group", firstGroup, "--topic", firstTopic, "--topic", secondTopic)); + Admin adminClient = mock(KafkaAdminClient.class); + DeleteShareGroupOffsetsResult result = mock(DeleteShareGroupOffsetsResult.class); + + KafkaFutureImpl<Void> resultFuture = new KafkaFutureImpl<>(); + String errorMessage = "Group g3 not found."; + GroupIdNotFoundException exception = new GroupIdNotFoundException(errorMessage); + + resultFuture.completeExceptionally(exception); + when(result.all()).thenReturn(resultFuture); + + when(result.topicResult(eq(firstTopic))).thenReturn(resultFuture); + when(result.topicResult(eq(secondTopic))).thenReturn(resultFuture); + + when(adminClient.deleteShareGroupOffsets(any(), any(), any())).thenReturn(result); + + try (ShareGroupService service = getShareGroupService(cgcArgs.toArray(new String[0]), adminClient)) { + TestUtils.waitForCondition(() -> { + Entry<String, String> res = ToolsTestUtils.grabConsoleOutputAndError(deleteOffsets(service)); + String[] lines = res.getKey().trim().split("\n"); + if (lines.length != 5 && !res.getValue().isEmpty()) { + return false; + } + + List<String> errorLine = Stream.concat( + Stream.of("Error:"), + Arrays.stream(errorMessage.trim().split("\\s+")) + ).toList(); + + List<String> firstLine = new ArrayList<>(errorLine); Review Comment: This could be a bit cleaner if you gave the lines more descriptive names, such as `errorLine`, `expectedResultHeader`, `expectedResultValue1` and `expectedResultValue2`. Then the test cases would be more consistent with each other. ########## tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java: ########## @@ -873,6 +1059,10 @@ private Runnable describeGroups(ShareGroupCommand.ShareGroupService service) { return () -> Assertions.assertDoesNotThrow(service::describeGroups); } + private Runnable deleteOffsets(ShareGroupCommand.ShareGroupService service) { + return () -> Assertions.assertDoesNotThrow(() -> service.deleteOffsets()); Review Comment: I suggest renaming the `deleteOffsets(String groupId, Set<String> topics)` method in the service, and then you can use a method reference here: `service::deleteOffsets`. ########## tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java: ########## @@ -612,6 +615,189 @@ public void testGroupStatesFromString() { assertThrows(IllegalArgumentException.class, () -> ShareGroupCommand.groupStatesFromString(" , ,")); } + @Test + public void testDeleteShareGroupOffsetsArgsWithoutTopic() { + String bootstrapServer = "localhost:9092"; + Admin adminClient = mock(KafkaAdminClient.class); + + // no group spec args + String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, "--delete-offsets", "--group", "groupId"}; + AtomicBoolean exited = new AtomicBoolean(false); + Exit.setExitProcedure(((statusCode, message) -> { + assertNotEquals(0, statusCode); + assertTrue(message.contains("Option [delete-offsets] takes the following options: [topic], [group]")); + exited.set(true); + })); + try { + getShareGroupService(cgcArgs, adminClient); + } finally { + assertTrue(exited.get()); + } + } + + @Test + public void testDeleteShareGroupOffsetsArgsWithoutGroup() { + String bootstrapServer = "localhost:9092"; + Admin adminClient = mock(KafkaAdminClient.class); + + // no group spec args + String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, "--delete-offsets", "--topic", "t1"}; + AtomicBoolean exited = new AtomicBoolean(false); + Exit.setExitProcedure(((statusCode, message) -> { + assertNotEquals(0, statusCode); + assertTrue(message.contains("Option [delete-offsets] takes the following options: [topic], [group]")); + exited.set(true); + })); + try { + getShareGroupService(cgcArgs, adminClient); + } finally { + assertTrue(exited.get()); + } + } + + @Test + public void testDeleteShareGroupOffsets() throws Exception { + String firstGroup = "first-group"; + String firstTopic = "t1"; + String secondTopic = "t2"; + String bootstrapServer = "localhost:9092"; + + List<String> cgcArgs = new ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServer, "--delete-offsets", "--group", firstGroup, "--topic", firstTopic, "--topic", secondTopic)); + Admin adminClient = mock(KafkaAdminClient.class); + DeleteShareGroupOffsetsResult result = mock(DeleteShareGroupOffsetsResult.class); + + when(result.all()).thenReturn(KafkaFuture.completedFuture(null)); + + when(result.topicResult(eq(firstTopic))).thenReturn(KafkaFuture.completedFuture(null)); + when(result.topicResult(eq(secondTopic))).thenReturn(KafkaFuture.completedFuture(null)); + + when(adminClient.deleteShareGroupOffsets(any(), any(), any())).thenReturn(result); + + try (ShareGroupService service = getShareGroupService(cgcArgs.toArray(new String[0]), adminClient)) { + TestUtils.waitForCondition(() -> { + Entry<String, String> res = ToolsTestUtils.grabConsoleOutputAndError(deleteOffsets(service)); + String[] lines = res.getKey().trim().split("\n"); + if (lines.length != 3 && !res.getValue().isEmpty()) { + return false; + } + + List<String> expectedResultHeader = List.of("TOPIC", "STATUS"); + List<String> expectedResultValues1 = List.of(firstTopic, "Successful"); + List<String> expectedResultValues2 = List.of(secondTopic, "Successful"); + + return Arrays.stream(lines[0].trim().split("\\s+")).toList().equals(expectedResultHeader) && + Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(expectedResultValues1) && + Arrays.stream(lines[2].trim().split("\\s+")).toList().equals(expectedResultValues2); + }, "Expected a data row and no error in delete offsets result with group: " + firstGroup + " and topic: " + firstTopic); + } + } + + @Test + public void testDeleteShareGroupOffsetsTopLevelError() throws Exception { + String firstGroup = "first-group"; + String firstTopic = "t1"; + String secondTopic = "t2"; + String bootstrapServer = "localhost:9092"; + + List<String> cgcArgs = new ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServer, "--delete-offsets", "--group", firstGroup, "--topic", firstTopic, "--topic", secondTopic)); + Admin adminClient = mock(KafkaAdminClient.class); + DeleteShareGroupOffsetsResult result = mock(DeleteShareGroupOffsetsResult.class); + + KafkaFutureImpl<Void> resultFuture = new KafkaFutureImpl<>(); + String errorMessage = "Group g3 not found."; + GroupIdNotFoundException exception = new GroupIdNotFoundException(errorMessage); + + resultFuture.completeExceptionally(exception); + when(result.all()).thenReturn(resultFuture); + + when(result.topicResult(eq(firstTopic))).thenReturn(resultFuture); + when(result.topicResult(eq(secondTopic))).thenReturn(resultFuture); + + when(adminClient.deleteShareGroupOffsets(any(), any(), any())).thenReturn(result); + + try (ShareGroupService service = getShareGroupService(cgcArgs.toArray(new String[0]), adminClient)) { + TestUtils.waitForCondition(() -> { + Entry<String, String> res = ToolsTestUtils.grabConsoleOutputAndError(deleteOffsets(service)); + String[] lines = res.getKey().trim().split("\n"); + if (lines.length != 5 && !res.getValue().isEmpty()) { + return false; + } + + List<String> errorLine = Stream.concat( + Stream.of("Error:"), + Arrays.stream(errorMessage.trim().split("\\s+")) + ).toList(); + + List<String> firstLine = new ArrayList<>(errorLine); + + List<String> secondLine = List.of("TOPIC", "STATUS"); + + List<String> thirdLine = new ArrayList<>(); + thirdLine.add(firstTopic); + thirdLine.addAll(errorLine); + + List<String> fourthLine = new ArrayList<>(); + fourthLine.add(secondTopic); + fourthLine.addAll(errorLine); + + return Arrays.stream(lines[0].trim().split("\\s+")).toList().equals(firstLine) && + Arrays.stream(lines[2].trim().split("\\s+")).toList().equals(secondLine) && + Arrays.stream(lines[3].trim().split("\\s+")).toList().equals(thirdLine) && + Arrays.stream(lines[4].trim().split("\\s+")).toList().equals(fourthLine); + }, "Expected a data row and no error in delete offsets result with group: " + firstGroup + " and topic: " + firstTopic); + } + } + + @Test + public void testDeleteShareGroupOffsetsTopicLevelError() throws Exception { + String firstGroup = "first-group"; + String firstTopic = "t1"; + String secondTopic = "t2"; + String bootstrapServer = "localhost:9092"; + + List<String> cgcArgs = new ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServer, "--delete-offsets", "--group", firstGroup, "--topic", firstTopic, "--topic", secondTopic)); + Admin adminClient = mock(KafkaAdminClient.class); + DeleteShareGroupOffsetsResult result = mock(DeleteShareGroupOffsetsResult.class); + + KafkaFutureImpl<Void> resultFuture = new KafkaFutureImpl<>(); + String errorMessage = Errors.UNKNOWN_TOPIC_OR_PARTITION.message(); + + resultFuture.completeExceptionally(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception()); + when(result.all()).thenReturn(resultFuture); + + when(result.topicResult(eq(firstTopic))).thenReturn(KafkaFuture.completedFuture(null)); + when(result.topicResult(eq(secondTopic))).thenReturn(resultFuture); + + when(adminClient.deleteShareGroupOffsets(any(), any(), any())).thenReturn(result); + + try (ShareGroupService service = getShareGroupService(cgcArgs.toArray(new String[0]), adminClient)) { + TestUtils.waitForCondition(() -> { + Entry<String, String> res = ToolsTestUtils.grabConsoleOutputAndError(deleteOffsets(service)); + String[] lines = res.getKey().trim().split("\n"); + if (lines.length != 5 && !res.getValue().isEmpty()) { + return false; + } + + List<String> errorLine = Stream.concat( + Stream.of("Error:"), + Arrays.stream(errorMessage.trim().split("\\s+")) + ).toList(); + + List<String> firstLine = List.of("TOPIC", "STATUS"); + + List<String> secondLine = List.of(firstTopic, "Successful"); + + List<String> thirdLine = new ArrayList<>(); Review Comment: And another. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org