AndrewJSchofield commented on code in PR #19758:
URL: https://github.com/apache/kafka/pull/19758#discussion_r2112626437


##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -398,8 +407,201 @@ protected Admin createAdminClient(Map<String, String> 
configOverrides) throws IO
             props.putAll(configOverrides);
             return Admin.create(props);
         }
+
+        Map<String, Throwable> deleteGroups() {
+            List<String> groupIds = opts.options.has(opts.allGroupsOpt)
+                ? new ArrayList<>(listStreamsGroups())
+                : new ArrayList<>(opts.options.valuesOf(opts.groupOpt));
+
+            // pre admin call checks
+            Map<String, Throwable> failed = preAdminCallChecks(groupIds);
+
+            groupIds.removeAll(failed.keySet());
+            Map<String, Throwable> success = new HashMap<>();
+            Map<String, List<String>> internalTopics = new HashMap<>();
+            Map<String, Throwable> internalTopicsDeletionFailures = new 
HashMap<>();
+            if (!groupIds.isEmpty()) {
+                // retrieve internal topics before deleting groups
+                internalTopics = retrieveInternalTopics(groupIds);
+
+                // delete streams groups
+                Map<String, KafkaFuture<Void>> groupsToDelete = 
adminClient.deleteStreamsGroups(
+                    groupIds,
+                    withTimeoutMs(new DeleteStreamsGroupsOptions())
+                ).deletedGroups();
+
+                groupsToDelete.forEach((g, f) -> {
+                    try {
+                        f.get();
+                        success.put(g, null);
+                    } catch (InterruptedException ie) {
+                        failed.put(g, ie);
+                    } catch (ExecutionException e) {
+                        failed.put(g, e.getCause());
+                    }
+                });
+
+                // delete internal topics
+                if (!success.isEmpty()) {
+                    for (String groupId : success.keySet()) {
+                        List<String> internalTopicsToDelete = 
internalTopics.get(groupId);
+                        if (internalTopicsToDelete != null && 
!internalTopicsToDelete.isEmpty()) {
+                            try {
+                                DeleteTopicsResult deleteTopicsResult = 
adminClient.deleteTopics(internalTopicsToDelete);
+                                deleteTopicsResult.all().get();
+                            } catch (InterruptedException | ExecutionException 
e) {
+                                internalTopicsDeletionFailures.put(groupId, 
e.getCause());
+                            }
+                        }
+                    }
+                }
+            }
+
+            // display outcome messages based on the results
+            if (failed.isEmpty()) {
+                System.out.println("Deletion of requested streams groups (" + 
"'" + 
success.keySet().stream().map(Object::toString).collect(Collectors.joining("', 
'")) + "') was successful.");
+            } else {
+                printError("Deletion of some streams groups failed:", 
Optional.empty());
+                failed.forEach((group, error) -> System.out.println("* Group 
'" + group + "' could not be deleted due to: " + error));
+
+                if (!success.isEmpty()) {
+                    System.out.println("\nThese streams groups were deleted 
successfully: " + "'" + 
success.keySet().stream().map(Object::toString).collect(Collectors.joining("', 
'")) + "'.");
+                }
+            }
+            if (!internalTopics.keySet().isEmpty()) {
+                printInternalTopicErrors(internalTopicsDeletionFailures, 
success.keySet(), internalTopics.keySet());
+            }
+            // for testing purpose: return all failures, including internal 
topics deletion failures
+            failed.putAll(success);
+            failed.putAll(internalTopicsDeletionFailures);
+            return failed;
+        }
+
+        private Map<String, Throwable> preAdminCallChecks(List<String> 
groupIds) {
+            List<GroupListing> streamsGroupIds = listDetailedStreamsGroups();
+            LinkedHashSet<String> groupIdSet = new LinkedHashSet<>(groupIds);
+
+            Map<String, Throwable> failed = new HashMap<>();
+
+            for (String groupId : groupIdSet) {
+                Optional<GroupListing> listing = 
streamsGroupIds.stream().filter(item -> 
item.groupId().equals(groupId)).findAny();
+                if (listing.isEmpty()) {
+                    failed.put(groupId, new IllegalArgumentException("Group '" 
+ groupId + "' does not exist or is not a Kafka Streams group."));

Review Comment:
   `Kafka Streams group` should be simply `streams group`.



##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -398,8 +407,201 @@ protected Admin createAdminClient(Map<String, String> 
configOverrides) throws IO
             props.putAll(configOverrides);
             return Admin.create(props);
         }
+
+        Map<String, Throwable> deleteGroups() {
+            List<String> groupIds = opts.options.has(opts.allGroupsOpt)
+                ? new ArrayList<>(listStreamsGroups())
+                : new ArrayList<>(opts.options.valuesOf(opts.groupOpt));
+
+            // pre admin call checks
+            Map<String, Throwable> failed = preAdminCallChecks(groupIds);
+
+            groupIds.removeAll(failed.keySet());
+            Map<String, Throwable> success = new HashMap<>();
+            Map<String, List<String>> internalTopics = new HashMap<>();
+            Map<String, Throwable> internalTopicsDeletionFailures = new 
HashMap<>();
+            if (!groupIds.isEmpty()) {
+                // retrieve internal topics before deleting groups
+                internalTopics = retrieveInternalTopics(groupIds);
+
+                // delete streams groups
+                Map<String, KafkaFuture<Void>> groupsToDelete = 
adminClient.deleteStreamsGroups(
+                    groupIds,
+                    withTimeoutMs(new DeleteStreamsGroupsOptions())
+                ).deletedGroups();
+
+                groupsToDelete.forEach((g, f) -> {
+                    try {
+                        f.get();
+                        success.put(g, null);
+                    } catch (InterruptedException ie) {
+                        failed.put(g, ie);
+                    } catch (ExecutionException e) {
+                        failed.put(g, e.getCause());
+                    }
+                });
+
+                // delete internal topics
+                if (!success.isEmpty()) {
+                    for (String groupId : success.keySet()) {
+                        List<String> internalTopicsToDelete = 
internalTopics.get(groupId);
+                        if (internalTopicsToDelete != null && 
!internalTopicsToDelete.isEmpty()) {
+                            try {
+                                DeleteTopicsResult deleteTopicsResult = 
adminClient.deleteTopics(internalTopicsToDelete);
+                                deleteTopicsResult.all().get();
+                            } catch (InterruptedException | ExecutionException 
e) {
+                                internalTopicsDeletionFailures.put(groupId, 
e.getCause());
+                            }
+                        }
+                    }
+                }
+            }
+
+            // display outcome messages based on the results
+            if (failed.isEmpty()) {
+                System.out.println("Deletion of requested streams groups (" + 
"'" + 
success.keySet().stream().map(Object::toString).collect(Collectors.joining("', 
'")) + "') was successful.");

Review Comment:
   The quotes around list members is neater in `kafka-consumer-groups.sh`. I 
suggest copying it.



##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -398,8 +407,201 @@ protected Admin createAdminClient(Map<String, String> 
configOverrides) throws IO
             props.putAll(configOverrides);
             return Admin.create(props);
         }
+
+        Map<String, Throwable> deleteGroups() {
+            List<String> groupIds = opts.options.has(opts.allGroupsOpt)
+                ? new ArrayList<>(listStreamsGroups())
+                : new ArrayList<>(opts.options.valuesOf(opts.groupOpt));
+
+            // pre admin call checks
+            Map<String, Throwable> failed = preAdminCallChecks(groupIds);
+
+            groupIds.removeAll(failed.keySet());
+            Map<String, Throwable> success = new HashMap<>();
+            Map<String, List<String>> internalTopics = new HashMap<>();
+            Map<String, Throwable> internalTopicsDeletionFailures = new 
HashMap<>();
+            if (!groupIds.isEmpty()) {
+                // retrieve internal topics before deleting groups
+                internalTopics = retrieveInternalTopics(groupIds);
+
+                // delete streams groups
+                Map<String, KafkaFuture<Void>> groupsToDelete = 
adminClient.deleteStreamsGroups(
+                    groupIds,
+                    withTimeoutMs(new DeleteStreamsGroupsOptions())
+                ).deletedGroups();
+
+                groupsToDelete.forEach((g, f) -> {
+                    try {
+                        f.get();
+                        success.put(g, null);
+                    } catch (InterruptedException ie) {
+                        failed.put(g, ie);
+                    } catch (ExecutionException e) {
+                        failed.put(g, e.getCause());
+                    }
+                });
+
+                // delete internal topics
+                if (!success.isEmpty()) {
+                    for (String groupId : success.keySet()) {
+                        List<String> internalTopicsToDelete = 
internalTopics.get(groupId);
+                        if (internalTopicsToDelete != null && 
!internalTopicsToDelete.isEmpty()) {
+                            try {
+                                DeleteTopicsResult deleteTopicsResult = 
adminClient.deleteTopics(internalTopicsToDelete);
+                                deleteTopicsResult.all().get();
+                            } catch (InterruptedException | ExecutionException 
e) {

Review Comment:
   It is slightly fiddly, but entirely possible, to get the names of the topics 
which could not be deleted. I think it's probably worth the effort in terms of 
usability to tell the user which ones failed.



##########
tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java:
##########
@@ -219,6 +232,109 @@ public void testPrintEmptyGroupState() {
         
assertTrue(StreamsGroupCommand.StreamsGroupService.isGroupStateValid(GroupState.UNKNOWN,
 1));
     }
 
+    @Test
+    public void testRetrieveInternalTopics() {
+        Admin adminClient = mock(KafkaAdminClient.class);
+        String groupId = "foo-group";
+        List<String> args = new 
ArrayList<>(Arrays.asList("--bootstrap-server", "localhost:9092", "--group", 
groupId, "--delete"));
+        List<String> sourceTopics = List.of("source-topic1", "source-topic2");
+        List<String> repartitionSinkTopics = List.of("rep-sink-topic1", 
"rep-sink-topic2");
+        Map<String, StreamsGroupSubtopologyDescription.TopicInfo> 
stateChangelogTopics = Map.of(
+            groupId + "-1-changelog", 
mock(StreamsGroupSubtopologyDescription.TopicInfo.class),
+            "some-pre-fix" + "-changelog", 
mock(StreamsGroupSubtopologyDescription.TopicInfo.class),
+            groupId + "-2-changelog", 
mock(StreamsGroupSubtopologyDescription.TopicInfo.class));
+        Map<String, StreamsGroupSubtopologyDescription.TopicInfo> 
repartitionSourceTopics = Map.of(
+            groupId + "-1-repartition", 
mock(StreamsGroupSubtopologyDescription.TopicInfo.class),
+            groupId + "-some-thing", 
mock(StreamsGroupSubtopologyDescription.TopicInfo.class),
+            groupId + "-2-repartition", 
mock(StreamsGroupSubtopologyDescription.TopicInfo.class));
+
+
+        Map<String, StreamsGroupDescription> resultMap = new HashMap<>();
+        resultMap.put(groupId, new StreamsGroupDescription(
+            groupId,
+            0,
+            0,
+            0,
+            List.of(new StreamsGroupSubtopologyDescription("subtopology1", 
sourceTopics, repartitionSinkTopics, stateChangelogTopics, 
repartitionSourceTopics)),
+            List.of(),
+            GroupState.DEAD,
+            new Node(0, "localhost", 9092),
+            null));
+        DescribeStreamsGroupsResult result = 
mock(DescribeStreamsGroupsResult.class);
+        when(result.all()).thenReturn(KafkaFuture.completedFuture(resultMap));
+        
when(adminClient.describeStreamsGroups(ArgumentMatchers.anyCollection())).thenReturn(result);
+
+        StreamsGroupCommand.StreamsGroupService service = 
getStreamsGroupService(args.toArray(new String[0]), adminClient);
+        Map<String, List<String>> internalTopics = 
service.retrieveInternalTopics(List.of(groupId));
+
+        assertNotNull(internalTopics.get(groupId));
+        assertEquals(4, internalTopics.get(groupId).size());
+        assertEquals(new HashSet<>(List.of(groupId + "-1-changelog", groupId + 
"-2-changelog", groupId + "-1-repartition", groupId + "-2-repartition")),
+            new HashSet<>(internalTopics.get(groupId)));
+        
assertFalse(internalTopics.get(groupId).stream().anyMatch(List.of("some-pre-fix-changelog",
 groupId + "-some-thing")::contains));
+        
assertFalse(internalTopics.get(groupId).stream().anyMatch(sourceTopics::contains));
+        
assertFalse(internalTopics.get(groupId).stream().anyMatch(repartitionSinkTopics::contains));
+
+        service.close();
+    }
+
+    @Test
+    public void testDeleteStreamsGroup() {
+        Admin adminClient = mock(KafkaAdminClient.class);
+        String groupId = "foo-group";
+        List<String> args = new 
ArrayList<>(Arrays.asList("--bootstrap-server", "localhost:9092", "--group", 
groupId, "--delete"));
+
+        DeleteStreamsGroupsResult deleteStreamsGroupsResult = 
mock(DeleteStreamsGroupsResult.class);
+        when(adminClient.deleteStreamsGroups(eq(List.of(groupId)), 
any(DeleteStreamsGroupsOptions.class))).thenReturn(deleteStreamsGroupsResult);
+        
when(deleteStreamsGroupsResult.deletedGroups()).thenReturn(Map.of(groupId, 
KafkaFuture.completedFuture(null)));
+        DeleteTopicsResult deleteTopicsResult = mock(DeleteTopicsResult.class);
+        
when(deleteTopicsResult.all()).thenReturn(KafkaFuture.completedFuture(null));
+        
when(adminClient.deleteTopics(ArgumentMatchers.anyCollection())).thenReturn(deleteTopicsResult);
+        DescribeStreamsGroupsResult describeStreamsGroupsResult = 
mock(DescribeStreamsGroupsResult.class);
+        
when(describeStreamsGroupsResult.all()).thenReturn(KafkaFuture.completedFuture(Map.of(groupId,
 mock(StreamsGroupDescription.class))));
+        
when(adminClient.describeStreamsGroups(any())).thenReturn(describeStreamsGroupsResult);
+        ListGroupsResult listGroupsResult = mock(ListGroupsResult.class);
+        when(adminClient.listGroups(any())).thenReturn(listGroupsResult);
+        
when(listGroupsResult.all()).thenReturn(KafkaFuture.completedFuture(List.of(new 
GroupListing(groupId, Optional.of(GroupType.STREAMS), "streams", 
Optional.of(GroupState.EMPTY)))));
+
+        StreamsGroupCommand.StreamsGroupService service = 
getStreamsGroupService(args.toArray(new String[0]), adminClient);
+        service.deleteGroups();
+
+        verify(adminClient, times(1)).listGroups(any(ListGroupsOptions.class));
+        verify(adminClient, 
times(1)).deleteStreamsGroups(eq(List.of(groupId)), 
any(DeleteStreamsGroupsOptions.class));
+        verify(adminClient, times(1)).describeStreamsGroups(any());
+        // because of having 0 internal topics, we do not expect deleteTopics 
to be called
+        verify(adminClient, 
times(0)).deleteTopics(ArgumentMatchers.anyCollection());
+
+        service.close();
+    }
+
+    @Test
+    public void testDeleteNonStreamsGroup() {
+        Admin adminClient = mock(KafkaAdminClient.class);
+        String groupId = "foo-group";
+        List<String> args = new 
ArrayList<>(Arrays.asList("--bootstrap-server", "localhost:9092", "--group", 
groupId, "--delete"));
+
+        ListGroupsResult listGroupsResult = mock(ListGroupsResult.class);
+        when(adminClient.listGroups(any())).thenReturn(listGroupsResult);
+        
when(listGroupsResult.all()).thenReturn(KafkaFuture.completedFuture(List.of()));
+
+        StreamsGroupCommand.StreamsGroupService service = 
getStreamsGroupService(args.toArray(new String[0]), adminClient);
+        Map<String, Throwable> result = service.deleteGroups();
+
+        assertNotNull(result.get(groupId));
+        assertEquals(result.get(groupId).getMessage(),
+            "Group '" + groupId + "' does not exist or is not a Kafka Streams 
group.");

Review Comment:
   And this will need tweaking too.



##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -398,8 +407,201 @@ protected Admin createAdminClient(Map<String, String> 
configOverrides) throws IO
             props.putAll(configOverrides);
             return Admin.create(props);
         }
+
+        Map<String, Throwable> deleteGroups() {
+            List<String> groupIds = opts.options.has(opts.allGroupsOpt)
+                ? new ArrayList<>(listStreamsGroups())
+                : new ArrayList<>(opts.options.valuesOf(opts.groupOpt));
+
+            // pre admin call checks
+            Map<String, Throwable> failed = preAdminCallChecks(groupIds);
+
+            groupIds.removeAll(failed.keySet());
+            Map<String, Throwable> success = new HashMap<>();
+            Map<String, List<String>> internalTopics = new HashMap<>();
+            Map<String, Throwable> internalTopicsDeletionFailures = new 
HashMap<>();
+            if (!groupIds.isEmpty()) {
+                // retrieve internal topics before deleting groups
+                internalTopics = retrieveInternalTopics(groupIds);
+
+                // delete streams groups
+                Map<String, KafkaFuture<Void>> groupsToDelete = 
adminClient.deleteStreamsGroups(
+                    groupIds,
+                    withTimeoutMs(new DeleteStreamsGroupsOptions())
+                ).deletedGroups();
+
+                groupsToDelete.forEach((g, f) -> {
+                    try {
+                        f.get();
+                        success.put(g, null);
+                    } catch (InterruptedException ie) {
+                        failed.put(g, ie);
+                    } catch (ExecutionException e) {
+                        failed.put(g, e.getCause());
+                    }
+                });
+
+                // delete internal topics
+                if (!success.isEmpty()) {
+                    for (String groupId : success.keySet()) {
+                        List<String> internalTopicsToDelete = 
internalTopics.get(groupId);
+                        if (internalTopicsToDelete != null && 
!internalTopicsToDelete.isEmpty()) {
+                            try {
+                                DeleteTopicsResult deleteTopicsResult = 
adminClient.deleteTopics(internalTopicsToDelete);
+                                deleteTopicsResult.all().get();
+                            } catch (InterruptedException | ExecutionException 
e) {
+                                internalTopicsDeletionFailures.put(groupId, 
e.getCause());
+                            }
+                        }
+                    }
+                }
+            }
+
+            // display outcome messages based on the results
+            if (failed.isEmpty()) {
+                System.out.println("Deletion of requested streams groups (" + 
"'" + 
success.keySet().stream().map(Object::toString).collect(Collectors.joining("', 
'")) + "') was successful.");
+            } else {
+                printError("Deletion of some streams groups failed:", 
Optional.empty());
+                failed.forEach((group, error) -> System.out.println("* Group 
'" + group + "' could not be deleted due to: " + error));
+
+                if (!success.isEmpty()) {
+                    System.out.println("\nThese streams groups were deleted 
successfully: " + "'" + 
success.keySet().stream().map(Object::toString).collect(Collectors.joining("', 
'")) + "'.");

Review Comment:
   The quotes around list members is neater in `kafka-consumer-groups.sh`. I 
suggest copying it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to