aliehsaeedii commented on code in PR #19758: URL: https://github.com/apache/kafka/pull/19758#discussion_r2111820039
########## tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java: ########## @@ -398,6 +403,127 @@ protected Admin createAdminClient(Map<String, String> configOverrides) throws IO props.putAll(configOverrides); return Admin.create(props); } + + Map<String, Throwable> deleteGroups() { + List<String> groupIds = opts.options.has(opts.allGroupsOpt) + ? listStreamsGroups() + : opts.options.valuesOf(opts.groupOpt); + + Map<String, Throwable> success = new HashMap<>(); + Map<String, Throwable> failed = new HashMap<>(); + + // retrieve internal topics before deleting groups + Map<String, List<String>> internalTopics = retrieveInternalTopics(groupIds); + + Map<String, KafkaFuture<Void>> groupsToDelete = adminClient.deleteStreamsGroups( + groupIds + ).deletedGroups(); + + groupsToDelete.forEach((g, f) -> { + try { + f.get(); + success.put(g, null); + } catch (InterruptedException ie) { + failed.put(g, ie); + } catch (ExecutionException e) { + failed.put(g, e.getCause()); + } + }); + + // delete internal topics + Map<String, Throwable> internalTopicsDeletionFailures = new HashMap<>(); + if (!success.isEmpty()) { + for (String groupId : success.keySet()) { + List<String> internalTopicsToDelete = internalTopics.get(groupId); + if (internalTopicsToDelete != null && !internalTopicsToDelete.isEmpty()) { + try { + DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(internalTopicsToDelete); + deleteTopicsResult.all().get(); + } catch (InterruptedException | ExecutionException e) { + internalTopicsDeletionFailures.put(groupId, e.getCause()); + } + } + } + } + + if (failed.isEmpty()) { + System.out.println("Deletion of requested streams groups (" + "'" + success.keySet().stream().map(Object::toString).collect(Collectors.joining(", ")) + "'" + ") was successful."); + } else { + printError("Deletion of some streams groups failed:", Optional.empty()); + failed.forEach((group, error) -> System.out.println("* Group '" + group + "' could not be deleted due to: " + error)); + + if (!success.isEmpty()) { + System.out.println("\nThese streams groups were deleted successfully: " + "'" + success.keySet().stream().map(Object::toString).collect(Collectors.joining("'")) + "', '"); + } + } + + if (!internalTopics.keySet().isEmpty()) { + printInternalTopicErrors(internalTopicsDeletionFailures, success.keySet(), internalTopics.keySet()); + } + + failed.putAll(success); + failed.putAll(internalTopicsDeletionFailures); + return failed; + } + + private void printInternalTopicErrors(Map<String, Throwable> internalTopicsDeletionFailures, + Set<String> deletedGroupIds, + Set<String> groupIdsWithInternalTopics) { + if (!deletedGroupIds.isEmpty()) { + if (internalTopicsDeletionFailures.isEmpty()) { + List<String> successfulGroups = deletedGroupIds.stream() + .filter(groupIdsWithInternalTopics::contains) + .collect(Collectors.toList()); + System.out.println("Deletion of associated internal topics of the streams groups ('" + + String.join(", ", successfulGroups) + "') was successful."); + } else { + System.out.println("Deletion of some associated internal topics failed:"); + internalTopicsDeletionFailures.forEach((group, error) -> + System.out.println("* Group '" + group + "' could not be deleted due to: " + error)); + } + } + } + + // Visibility for testing + Map<String, List<String>> retrieveInternalTopics(List<String> groupIds) { + Map<String, List<String>> groupToInternalTopics = new HashMap<>(); + try { + Map<String, StreamsGroupDescription> descriptionMap = adminClient.describeStreamsGroups(groupIds).all().get(); + for (StreamsGroupDescription description : descriptionMap.values()) { + + List<String> sourceTopics = description.subtopologies().stream() + .flatMap(subtopology -> subtopology.sourceTopics().stream()) + .toList(); + + List<String> internalTopics = description.subtopologies().stream() + .flatMap(subtopology -> Stream.concat( + subtopology.repartitionSourceTopics().keySet().stream(), + subtopology.stateChangelogTopics().keySet().stream())) + .filter(topic -> !sourceTopics.contains(topic)) + .collect(Collectors.toList()); + if (!internalTopics.isEmpty()) { + groupToInternalTopics.put(description.groupId(), internalTopics); + } + } + } catch (InterruptedException | ExecutionException e) { + if (e.getCause() instanceof UnsupportedVersionException) { + printError("Retrieving internal topics is not supported by the broker version. " + + "Please execute --delete --internal-topics <topic names> to delete the group's associated internal topics.", Optional.of(e.getCause())); Review Comment: Listing internal topics requires calling `admin.listTopics()` and then filtering the group's internal topics using the `StreamsResetter`'s inference method. I think the user can both find and delete the topics if he desires to. In other words, I think calling `listTopics()` must be upon request. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org