AndrewJSchofield commented on code in PR #19758: URL: https://github.com/apache/kafka/pull/19758#discussion_r2112626437
########## tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java: ########## @@ -398,8 +407,201 @@ protected Admin createAdminClient(Map<String, String> configOverrides) throws IO props.putAll(configOverrides); return Admin.create(props); } + + Map<String, Throwable> deleteGroups() { + List<String> groupIds = opts.options.has(opts.allGroupsOpt) + ? new ArrayList<>(listStreamsGroups()) + : new ArrayList<>(opts.options.valuesOf(opts.groupOpt)); + + // pre admin call checks + Map<String, Throwable> failed = preAdminCallChecks(groupIds); + + groupIds.removeAll(failed.keySet()); + Map<String, Throwable> success = new HashMap<>(); + Map<String, List<String>> internalTopics = new HashMap<>(); + Map<String, Throwable> internalTopicsDeletionFailures = new HashMap<>(); + if (!groupIds.isEmpty()) { + // retrieve internal topics before deleting groups + internalTopics = retrieveInternalTopics(groupIds); + + // delete streams groups + Map<String, KafkaFuture<Void>> groupsToDelete = adminClient.deleteStreamsGroups( + groupIds, + withTimeoutMs(new DeleteStreamsGroupsOptions()) + ).deletedGroups(); + + groupsToDelete.forEach((g, f) -> { + try { + f.get(); + success.put(g, null); + } catch (InterruptedException ie) { + failed.put(g, ie); + } catch (ExecutionException e) { + failed.put(g, e.getCause()); + } + }); + + // delete internal topics + if (!success.isEmpty()) { + for (String groupId : success.keySet()) { + List<String> internalTopicsToDelete = internalTopics.get(groupId); + if (internalTopicsToDelete != null && !internalTopicsToDelete.isEmpty()) { + try { + DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(internalTopicsToDelete); + deleteTopicsResult.all().get(); + } catch (InterruptedException | ExecutionException e) { + internalTopicsDeletionFailures.put(groupId, e.getCause()); + } + } + } + } + } + + // display outcome messages based on the results + if (failed.isEmpty()) { + System.out.println("Deletion of requested streams groups (" + "'" + success.keySet().stream().map(Object::toString).collect(Collectors.joining("', '")) + "') was successful."); + } else { + printError("Deletion of some streams groups failed:", Optional.empty()); + failed.forEach((group, error) -> System.out.println("* Group '" + group + "' could not be deleted due to: " + error)); + + if (!success.isEmpty()) { + System.out.println("\nThese streams groups were deleted successfully: " + "'" + success.keySet().stream().map(Object::toString).collect(Collectors.joining("', '")) + "'."); + } + } + if (!internalTopics.keySet().isEmpty()) { + printInternalTopicErrors(internalTopicsDeletionFailures, success.keySet(), internalTopics.keySet()); + } + // for testing purpose: return all failures, including internal topics deletion failures + failed.putAll(success); + failed.putAll(internalTopicsDeletionFailures); + return failed; + } + + private Map<String, Throwable> preAdminCallChecks(List<String> groupIds) { + List<GroupListing> streamsGroupIds = listDetailedStreamsGroups(); + LinkedHashSet<String> groupIdSet = new LinkedHashSet<>(groupIds); + + Map<String, Throwable> failed = new HashMap<>(); + + for (String groupId : groupIdSet) { + Optional<GroupListing> listing = streamsGroupIds.stream().filter(item -> item.groupId().equals(groupId)).findAny(); + if (listing.isEmpty()) { + failed.put(groupId, new IllegalArgumentException("Group '" + groupId + "' does not exist or is not a Kafka Streams group.")); Review Comment: `Kafka Streams group` should be simply `streams group`. ########## tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java: ########## @@ -398,8 +407,201 @@ protected Admin createAdminClient(Map<String, String> configOverrides) throws IO props.putAll(configOverrides); return Admin.create(props); } + + Map<String, Throwable> deleteGroups() { + List<String> groupIds = opts.options.has(opts.allGroupsOpt) + ? new ArrayList<>(listStreamsGroups()) + : new ArrayList<>(opts.options.valuesOf(opts.groupOpt)); + + // pre admin call checks + Map<String, Throwable> failed = preAdminCallChecks(groupIds); + + groupIds.removeAll(failed.keySet()); + Map<String, Throwable> success = new HashMap<>(); + Map<String, List<String>> internalTopics = new HashMap<>(); + Map<String, Throwable> internalTopicsDeletionFailures = new HashMap<>(); + if (!groupIds.isEmpty()) { + // retrieve internal topics before deleting groups + internalTopics = retrieveInternalTopics(groupIds); + + // delete streams groups + Map<String, KafkaFuture<Void>> groupsToDelete = adminClient.deleteStreamsGroups( + groupIds, + withTimeoutMs(new DeleteStreamsGroupsOptions()) + ).deletedGroups(); + + groupsToDelete.forEach((g, f) -> { + try { + f.get(); + success.put(g, null); + } catch (InterruptedException ie) { + failed.put(g, ie); + } catch (ExecutionException e) { + failed.put(g, e.getCause()); + } + }); + + // delete internal topics + if (!success.isEmpty()) { + for (String groupId : success.keySet()) { + List<String> internalTopicsToDelete = internalTopics.get(groupId); + if (internalTopicsToDelete != null && !internalTopicsToDelete.isEmpty()) { + try { + DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(internalTopicsToDelete); + deleteTopicsResult.all().get(); + } catch (InterruptedException | ExecutionException e) { + internalTopicsDeletionFailures.put(groupId, e.getCause()); + } + } + } + } + } + + // display outcome messages based on the results + if (failed.isEmpty()) { + System.out.println("Deletion of requested streams groups (" + "'" + success.keySet().stream().map(Object::toString).collect(Collectors.joining("', '")) + "') was successful."); Review Comment: The quotes around list members is neater in `kafka-consumer-groups.sh`. I suggest copying it. ########## tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java: ########## @@ -398,8 +407,201 @@ protected Admin createAdminClient(Map<String, String> configOverrides) throws IO props.putAll(configOverrides); return Admin.create(props); } + + Map<String, Throwable> deleteGroups() { + List<String> groupIds = opts.options.has(opts.allGroupsOpt) + ? new ArrayList<>(listStreamsGroups()) + : new ArrayList<>(opts.options.valuesOf(opts.groupOpt)); + + // pre admin call checks + Map<String, Throwable> failed = preAdminCallChecks(groupIds); + + groupIds.removeAll(failed.keySet()); + Map<String, Throwable> success = new HashMap<>(); + Map<String, List<String>> internalTopics = new HashMap<>(); + Map<String, Throwable> internalTopicsDeletionFailures = new HashMap<>(); + if (!groupIds.isEmpty()) { + // retrieve internal topics before deleting groups + internalTopics = retrieveInternalTopics(groupIds); + + // delete streams groups + Map<String, KafkaFuture<Void>> groupsToDelete = adminClient.deleteStreamsGroups( + groupIds, + withTimeoutMs(new DeleteStreamsGroupsOptions()) + ).deletedGroups(); + + groupsToDelete.forEach((g, f) -> { + try { + f.get(); + success.put(g, null); + } catch (InterruptedException ie) { + failed.put(g, ie); + } catch (ExecutionException e) { + failed.put(g, e.getCause()); + } + }); + + // delete internal topics + if (!success.isEmpty()) { + for (String groupId : success.keySet()) { + List<String> internalTopicsToDelete = internalTopics.get(groupId); + if (internalTopicsToDelete != null && !internalTopicsToDelete.isEmpty()) { + try { + DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(internalTopicsToDelete); + deleteTopicsResult.all().get(); + } catch (InterruptedException | ExecutionException e) { Review Comment: It is slightly fiddly, but entirely possible, to get the names of the topics which could not be deleted. I think it's probably worth the effort in terms of usability to tell the user which ones failed. ########## tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java: ########## @@ -219,6 +232,109 @@ public void testPrintEmptyGroupState() { assertTrue(StreamsGroupCommand.StreamsGroupService.isGroupStateValid(GroupState.UNKNOWN, 1)); } + @Test + public void testRetrieveInternalTopics() { + Admin adminClient = mock(KafkaAdminClient.class); + String groupId = "foo-group"; + List<String> args = new ArrayList<>(Arrays.asList("--bootstrap-server", "localhost:9092", "--group", groupId, "--delete")); + List<String> sourceTopics = List.of("source-topic1", "source-topic2"); + List<String> repartitionSinkTopics = List.of("rep-sink-topic1", "rep-sink-topic2"); + Map<String, StreamsGroupSubtopologyDescription.TopicInfo> stateChangelogTopics = Map.of( + groupId + "-1-changelog", mock(StreamsGroupSubtopologyDescription.TopicInfo.class), + "some-pre-fix" + "-changelog", mock(StreamsGroupSubtopologyDescription.TopicInfo.class), + groupId + "-2-changelog", mock(StreamsGroupSubtopologyDescription.TopicInfo.class)); + Map<String, StreamsGroupSubtopologyDescription.TopicInfo> repartitionSourceTopics = Map.of( + groupId + "-1-repartition", mock(StreamsGroupSubtopologyDescription.TopicInfo.class), + groupId + "-some-thing", mock(StreamsGroupSubtopologyDescription.TopicInfo.class), + groupId + "-2-repartition", mock(StreamsGroupSubtopologyDescription.TopicInfo.class)); + + + Map<String, StreamsGroupDescription> resultMap = new HashMap<>(); + resultMap.put(groupId, new StreamsGroupDescription( + groupId, + 0, + 0, + 0, + List.of(new StreamsGroupSubtopologyDescription("subtopology1", sourceTopics, repartitionSinkTopics, stateChangelogTopics, repartitionSourceTopics)), + List.of(), + GroupState.DEAD, + new Node(0, "localhost", 9092), + null)); + DescribeStreamsGroupsResult result = mock(DescribeStreamsGroupsResult.class); + when(result.all()).thenReturn(KafkaFuture.completedFuture(resultMap)); + when(adminClient.describeStreamsGroups(ArgumentMatchers.anyCollection())).thenReturn(result); + + StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args.toArray(new String[0]), adminClient); + Map<String, List<String>> internalTopics = service.retrieveInternalTopics(List.of(groupId)); + + assertNotNull(internalTopics.get(groupId)); + assertEquals(4, internalTopics.get(groupId).size()); + assertEquals(new HashSet<>(List.of(groupId + "-1-changelog", groupId + "-2-changelog", groupId + "-1-repartition", groupId + "-2-repartition")), + new HashSet<>(internalTopics.get(groupId))); + assertFalse(internalTopics.get(groupId).stream().anyMatch(List.of("some-pre-fix-changelog", groupId + "-some-thing")::contains)); + assertFalse(internalTopics.get(groupId).stream().anyMatch(sourceTopics::contains)); + assertFalse(internalTopics.get(groupId).stream().anyMatch(repartitionSinkTopics::contains)); + + service.close(); + } + + @Test + public void testDeleteStreamsGroup() { + Admin adminClient = mock(KafkaAdminClient.class); + String groupId = "foo-group"; + List<String> args = new ArrayList<>(Arrays.asList("--bootstrap-server", "localhost:9092", "--group", groupId, "--delete")); + + DeleteStreamsGroupsResult deleteStreamsGroupsResult = mock(DeleteStreamsGroupsResult.class); + when(adminClient.deleteStreamsGroups(eq(List.of(groupId)), any(DeleteStreamsGroupsOptions.class))).thenReturn(deleteStreamsGroupsResult); + when(deleteStreamsGroupsResult.deletedGroups()).thenReturn(Map.of(groupId, KafkaFuture.completedFuture(null))); + DeleteTopicsResult deleteTopicsResult = mock(DeleteTopicsResult.class); + when(deleteTopicsResult.all()).thenReturn(KafkaFuture.completedFuture(null)); + when(adminClient.deleteTopics(ArgumentMatchers.anyCollection())).thenReturn(deleteTopicsResult); + DescribeStreamsGroupsResult describeStreamsGroupsResult = mock(DescribeStreamsGroupsResult.class); + when(describeStreamsGroupsResult.all()).thenReturn(KafkaFuture.completedFuture(Map.of(groupId, mock(StreamsGroupDescription.class)))); + when(adminClient.describeStreamsGroups(any())).thenReturn(describeStreamsGroupsResult); + ListGroupsResult listGroupsResult = mock(ListGroupsResult.class); + when(adminClient.listGroups(any())).thenReturn(listGroupsResult); + when(listGroupsResult.all()).thenReturn(KafkaFuture.completedFuture(List.of(new GroupListing(groupId, Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.EMPTY))))); + + StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args.toArray(new String[0]), adminClient); + service.deleteGroups(); + + verify(adminClient, times(1)).listGroups(any(ListGroupsOptions.class)); + verify(adminClient, times(1)).deleteStreamsGroups(eq(List.of(groupId)), any(DeleteStreamsGroupsOptions.class)); + verify(adminClient, times(1)).describeStreamsGroups(any()); + // because of having 0 internal topics, we do not expect deleteTopics to be called + verify(adminClient, times(0)).deleteTopics(ArgumentMatchers.anyCollection()); + + service.close(); + } + + @Test + public void testDeleteNonStreamsGroup() { + Admin adminClient = mock(KafkaAdminClient.class); + String groupId = "foo-group"; + List<String> args = new ArrayList<>(Arrays.asList("--bootstrap-server", "localhost:9092", "--group", groupId, "--delete")); + + ListGroupsResult listGroupsResult = mock(ListGroupsResult.class); + when(adminClient.listGroups(any())).thenReturn(listGroupsResult); + when(listGroupsResult.all()).thenReturn(KafkaFuture.completedFuture(List.of())); + + StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args.toArray(new String[0]), adminClient); + Map<String, Throwable> result = service.deleteGroups(); + + assertNotNull(result.get(groupId)); + assertEquals(result.get(groupId).getMessage(), + "Group '" + groupId + "' does not exist or is not a Kafka Streams group."); Review Comment: And this will need tweaking too. ########## tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java: ########## @@ -398,8 +407,201 @@ protected Admin createAdminClient(Map<String, String> configOverrides) throws IO props.putAll(configOverrides); return Admin.create(props); } + + Map<String, Throwable> deleteGroups() { + List<String> groupIds = opts.options.has(opts.allGroupsOpt) + ? new ArrayList<>(listStreamsGroups()) + : new ArrayList<>(opts.options.valuesOf(opts.groupOpt)); + + // pre admin call checks + Map<String, Throwable> failed = preAdminCallChecks(groupIds); + + groupIds.removeAll(failed.keySet()); + Map<String, Throwable> success = new HashMap<>(); + Map<String, List<String>> internalTopics = new HashMap<>(); + Map<String, Throwable> internalTopicsDeletionFailures = new HashMap<>(); + if (!groupIds.isEmpty()) { + // retrieve internal topics before deleting groups + internalTopics = retrieveInternalTopics(groupIds); + + // delete streams groups + Map<String, KafkaFuture<Void>> groupsToDelete = adminClient.deleteStreamsGroups( + groupIds, + withTimeoutMs(new DeleteStreamsGroupsOptions()) + ).deletedGroups(); + + groupsToDelete.forEach((g, f) -> { + try { + f.get(); + success.put(g, null); + } catch (InterruptedException ie) { + failed.put(g, ie); + } catch (ExecutionException e) { + failed.put(g, e.getCause()); + } + }); + + // delete internal topics + if (!success.isEmpty()) { + for (String groupId : success.keySet()) { + List<String> internalTopicsToDelete = internalTopics.get(groupId); + if (internalTopicsToDelete != null && !internalTopicsToDelete.isEmpty()) { + try { + DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(internalTopicsToDelete); + deleteTopicsResult.all().get(); + } catch (InterruptedException | ExecutionException e) { + internalTopicsDeletionFailures.put(groupId, e.getCause()); + } + } + } + } + } + + // display outcome messages based on the results + if (failed.isEmpty()) { + System.out.println("Deletion of requested streams groups (" + "'" + success.keySet().stream().map(Object::toString).collect(Collectors.joining("', '")) + "') was successful."); + } else { + printError("Deletion of some streams groups failed:", Optional.empty()); + failed.forEach((group, error) -> System.out.println("* Group '" + group + "' could not be deleted due to: " + error)); + + if (!success.isEmpty()) { + System.out.println("\nThese streams groups were deleted successfully: " + "'" + success.keySet().stream().map(Object::toString).collect(Collectors.joining("', '")) + "'."); Review Comment: The quotes around list members is neater in `kafka-consumer-groups.sh`. I suggest copying it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org