aliehsaeedii commented on code in PR #19758:
URL: https://github.com/apache/kafka/pull/19758#discussion_r2111820039


##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -398,6 +403,127 @@ protected Admin createAdminClient(Map<String, String> 
configOverrides) throws IO
             props.putAll(configOverrides);
             return Admin.create(props);
         }
+
+        Map<String, Throwable> deleteGroups() {
+            List<String> groupIds = opts.options.has(opts.allGroupsOpt)
+                ? listStreamsGroups()
+                : opts.options.valuesOf(opts.groupOpt);
+
+            Map<String, Throwable> success = new HashMap<>();
+            Map<String, Throwable> failed = new HashMap<>();
+
+            // retrieve internal topics before deleting groups
+            Map<String, List<String>> internalTopics = 
retrieveInternalTopics(groupIds);
+
+            Map<String, KafkaFuture<Void>> groupsToDelete = 
adminClient.deleteStreamsGroups(
+                groupIds
+            ).deletedGroups();
+
+            groupsToDelete.forEach((g, f) -> {
+                try {
+                    f.get();
+                    success.put(g, null);
+                } catch (InterruptedException ie) {
+                    failed.put(g, ie);
+                } catch (ExecutionException e) {
+                    failed.put(g, e.getCause());
+                }
+            });
+
+            // delete internal topics
+            Map<String, Throwable> internalTopicsDeletionFailures = new 
HashMap<>();
+            if (!success.isEmpty()) {
+                for (String groupId : success.keySet()) {
+                    List<String> internalTopicsToDelete = 
internalTopics.get(groupId);
+                    if (internalTopicsToDelete != null && 
!internalTopicsToDelete.isEmpty()) {
+                        try {
+                            DeleteTopicsResult deleteTopicsResult = 
adminClient.deleteTopics(internalTopicsToDelete);
+                            deleteTopicsResult.all().get();
+                        } catch (InterruptedException | ExecutionException e) {
+                            internalTopicsDeletionFailures.put(groupId, 
e.getCause());
+                        }
+                    }
+                }
+            }
+
+            if (failed.isEmpty()) {
+                System.out.println("Deletion of requested streams groups (" + 
"'" + 
success.keySet().stream().map(Object::toString).collect(Collectors.joining(", 
")) + "'" + ") was successful.");
+            } else {
+                printError("Deletion of some streams groups failed:", 
Optional.empty());
+                failed.forEach((group, error) -> System.out.println("* Group 
'" + group + "' could not be deleted due to: " + error));
+
+                if (!success.isEmpty()) {
+                    System.out.println("\nThese streams groups were deleted 
successfully: " + "'" + 
success.keySet().stream().map(Object::toString).collect(Collectors.joining("'"))
 + "', '");
+                }
+            }
+
+            if (!internalTopics.keySet().isEmpty()) {
+                printInternalTopicErrors(internalTopicsDeletionFailures, 
success.keySet(), internalTopics.keySet());
+            }
+
+            failed.putAll(success);
+            failed.putAll(internalTopicsDeletionFailures);
+            return failed;
+        }
+
+        /**
+         * Prints to stdout the outcome of deleting the internal topics of deleted streams groups.
+         * Nothing is printed when no group was deleted, or when no topic deletion failed and none
+         * of the deleted groups had internal topics.
+         *
+         * @param internalTopicsDeletionFailures errors keyed by the id of the group whose internal
+         *                                       topics could not be deleted
+         * @param deletedGroupIds                ids of the groups that were deleted
+         * @param groupIdsWithInternalTopics     ids of the groups for which internal topics were found
+         */
+        private void printInternalTopicErrors(Map<String, Throwable> internalTopicsDeletionFailures,
+                                              Set<String> deletedGroupIds,
+                                              Set<String> groupIdsWithInternalTopics) {
+            if (deletedGroupIds.isEmpty()) {
+                return;
+            }
+
+            if (!internalTopicsDeletionFailures.isEmpty()) {
+                System.out.println("Deletion of some associated internal topics failed:");
+                // The group itself was deleted at this point; only its internal topics were not.
+                internalTopicsDeletionFailures.forEach((group, error) ->
+                    System.out.println("* Internal topics of group '" + group + "' could not be deleted due to: " + error));
+                return;
+            }
+
+            List<String> successfulGroups = deletedGroupIds.stream()
+                .filter(groupIdsWithInternalTopics::contains)
+                .collect(Collectors.toList());
+            // Avoid reporting success for an empty list when only non-deleted groups had internal topics.
+            if (successfulGroups.isEmpty()) {
+                return;
+            }
+            System.out.println("Deletion of associated internal topics of the streams groups ('" +
+                String.join(", ", successfulGroups) + "') was successful.");
+        }
+
+        // Visibility for testing
+        Map<String, List<String>> retrieveInternalTopics(List<String> 
groupIds) {
+            Map<String, List<String>> groupToInternalTopics = new HashMap<>();
+            try {
+                Map<String, StreamsGroupDescription> descriptionMap = 
adminClient.describeStreamsGroups(groupIds).all().get();
+                for (StreamsGroupDescription description : 
descriptionMap.values()) {
+
+                    List<String> sourceTopics = 
description.subtopologies().stream()
+                        .flatMap(subtopology -> 
subtopology.sourceTopics().stream())
+                        .toList();
+
+                    List<String> internalTopics = 
description.subtopologies().stream()
+                        .flatMap(subtopology -> Stream.concat(
+                            
subtopology.repartitionSourceTopics().keySet().stream(),
+                            
subtopology.stateChangelogTopics().keySet().stream()))
+                        .filter(topic -> !sourceTopics.contains(topic))
+                        .collect(Collectors.toList());
+                    if (!internalTopics.isEmpty()) {
+                        groupToInternalTopics.put(description.groupId(), 
internalTopics);
+                    }
+                }
+            } catch (InterruptedException | ExecutionException e) {
+                if (e.getCause() instanceof UnsupportedVersionException) {
+                    printError("Retrieving internal topics is not supported by 
the broker version. " +
+                        "Please execute --delete --internal-topics <topic 
names> to delete the group's associated internal topics.", 
Optional.of(e.getCause()));

Review Comment:
   Listing internal topics requires calling `admin.listTopics()` and then 
filtering for the group's internal topics using the `StreamsResetter`'s 
inference method. I think users can both find and delete the topics themselves 
if they wish to. In other words, I think `listTopics()` should be called only 
upon request.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to