lucasbru commented on code in PR #18911:

+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.GroupListing;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.GroupState;
+import org.apache.kafka.common.GroupType;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.GroupProtocol;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import joptsimple.OptionException;
+import static 
+public class StreamsGroupCommandTest {
+    public static EmbeddedKafkaCluster cluster = null;
+    static KafkaStreams streams;
+    private static final String APP_ID = "streams-group-command-test";
+    private static final String INPUT_TOPIC = "customInputTopic";
+    private static final String OUTPUT_TOPIC = "customOutputTopic";
+    @BeforeAll
+    public static void setup() throws Exception {
+        // start the cluster and create the input topic
+        final Properties props = new Properties();
+        cluster = new EmbeddedKafkaCluster(1, props);
+        cluster.start();
+        cluster.createTopic(INPUT_TOPIC, 2, 1);
+        // start kafka streams
+        Properties streamsProp = new Properties();
+        streamsProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsProp.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
+        streamsProp.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
+        streamsProp.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
+        streamsProp.put(StreamsConfig.STATE_DIR_CONFIG, 
+        streamsProp.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
+        streamsProp.put(StreamsConfig.GROUP_PROTOCOL_CONFIG,;
+        streams = new KafkaStreams(topology(), streamsProp);
+        startApplicationAndWaitUntilRunning(streams);
+    }
+    @AfterAll
+    public static void closeCluster() {
+        streams.close();
+        cluster.stop();
+        cluster = null;
+    }
+    @Test
+    public void testListStreamsGroupWithoutFilters() throws Exception {
+        try (StreamsGroupCommand.StreamsGroupService service = 
getStreamsGroupService(new String[]{"--bootstrap-server", 
cluster.bootstrapServers(), "--list"})) {
+            Set<String> expectedGroups = new 
+            final AtomicReference<Set> foundGroups = new AtomicReference<>();
+            TestUtils.waitForCondition(() -> {
+                foundGroups.set(new HashSet<>(service.listStreamsGroups()));
+                return Objects.equals(expectedGroups, foundGroups.get());
+            }, "Expected --list to show streams groups " + expectedGroups + ", 
but found " + foundGroups.get() + ".");
+        }
+    }
+    @Test
+    public void testListWithUnrecognizedNewOption() throws Exception {
+        String[] cgcArgs = new String[]{"--new-option", "--bootstrap-server", 
cluster.bootstrapServers(), "--list"};
+        Assertions.assertThrows(OptionException.class, () -> 
+    }
+    @Test
+    public void testListStreamsGroupWithStates() throws Exception {
+        try (StreamsGroupCommand.StreamsGroupService service = 
getStreamsGroupService(new String[]{"--bootstrap-server", 
cluster.bootstrapServers(), "--list", "--state"})) {
+            Set<GroupListing> expectedListing = Set.of(
+                new GroupListing(
+                    APP_ID,
+                    Optional.of(GroupType.STREAMS),
+                    "streams",
+                    Optional.of(GroupState.STABLE))
+            );
+            final AtomicReference<Set<GroupListing>> foundListing = new 
+            TestUtils.waitForCondition(() -> {
+                foundListing.set(new 
+                return Objects.equals(expectedListing, foundListing.get());
+            }, "Expected --list to show streams groups " + expectedListing + 
", but found " + foundListing.get() + ".");
+        }
+    }
+    @Test
+    public void testListStreamsGroupWithSpecifiedStates() throws Exception {
+        try (StreamsGroupCommand.StreamsGroupService service = 
getStreamsGroupService(new String[]{"--bootstrap-server", 
cluster.bootstrapServers(), "--list", "--state", "stable"})) {
+            Set<GroupListing> expectedListing = Set.of(
+                new GroupListing(
+                    APP_ID,
+                    Optional.of(GroupType.STREAMS),
+                    "streams",
+                    Optional.of(GroupState.STABLE))
+            );
+            final AtomicReference<Set<GroupListing>> foundListing = new 
+            TestUtils.waitForCondition(() -> {
+                foundListing.set(new 
+                return Objects.equals(expectedListing, foundListing.get());
+            }, "Expected --list to show streams groups " + expectedListing + 
", but found " + foundListing.get() + ".");
+        }
+        try (StreamsGroupCommand.StreamsGroupService service = 
getStreamsGroupService(new String[]{"--bootstrap-server", 
cluster.bootstrapServers(), "--list", "--state", "PreparingRebalance"})) {
+            Set<GroupListing> expectedListing = Collections.emptySet();
+            final AtomicReference<Set<GroupListing>> foundListing = new 
+            TestUtils.waitForCondition(() -> {
+                foundListing.set(new 
+                return Objects.equals(expectedListing, foundListing.get());
+            }, "Expected --list to show streams groups " + expectedListing + 
", but found " + foundListing.get() + ".");
+        }
+    }
+    @Test
+    public void testListStreamsGroupOutput() throws Exception {
+        validateListOutput(
+            Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), 
+            Collections.emptyList(),
+            Set.of(Collections.singletonList(APP_ID))
+        );
+        validateListOutput(
+            Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), 
"--list", "--state"),
+            Arrays.asList("GROUP", "STATE"),
+            Set.of(Arrays.asList(APP_ID, "Stable"))
+        );
+        validateListOutput(
+            Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), 
"--list", "--state", "Stable"),
+            Arrays.asList("GROUP", "STATE"),
+            Set.of(Arrays.asList(APP_ID, "Stable"))
+        );
+        // Check case-insensitivity in state filter.
+        validateListOutput(
+            Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), 
"--list", "--state", "stable"),
+            Arrays.asList("GROUP", "STATE"),
+            Set.of(Arrays.asList(APP_ID, "Stable"))
+        );
+    }
+    @Test
+    public void testDescribeStreamsGroup() throws Exception {
+        final List<String> expectedHeaders = List.of("GROUP", "TOPIC", 
+        final Set<List<String>> expectedRows = Set.of(List.of(APP_ID, "", "0", 
"0"), List.of(APP_ID, "", "1", "0"));
+        final List<Integer> dontCares = List.of(1);
+        validateDescribeOutput(
+            Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), 
"--describe"), expectedHeaders, expectedRows, 5, dontCares);
+        // --describe --offsets has the same output as --describe
+        validateDescribeOutput(
+            Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), 
"--describe", "--offsets"), expectedHeaders, expectedRows, 5, dontCares);
+    }
+    @Test
+    public void testDescribeStreamsGroupWithVerboseOption() throws Exception {
+        final List<String> expectedHeaders = List.of("GROUP", "TOPIC", 
+        final Set<List<String>> expectedRows = Set.of(List.of(APP_ID, "", "0", 
"2", "0"), List.of(APP_ID, "", "1", "2", "0"));
+        final List<Integer> dontCares = List.of(1);
+        validateDescribeOutput(
+            Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), 
"--describe", "--verbose"), expectedHeaders, expectedRows, 5, dontCares);
+        // --describe --offsets has the same output as --describe
+        validateDescribeOutput(
+            Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), 
"--describe", "--offsets", "--verbose"), expectedHeaders, expectedRows, 5, 
+        validateDescribeOutput(
+            Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), 
"--describe", "--verbose", "--offsets"), expectedHeaders, expectedRows, 5, 
+    }
+    @Test
+    public void testDescribeStreamsGroupWithStateOption() throws Exception {
+        final List<String> expectedHeaders = Arrays.asList("GROUP", 
+        final Set<List<String>> expectedRows = Set.of(Arrays.asList(APP_ID, 
"", "", "Stable", "1"));
+        final List<Integer> dontCares = List.of(1, 2);
+        validateDescribeOutput(
+            Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), 
"--describe", "--state"), expectedHeaders, expectedRows, 2, dontCares);
+    }
+    @Test
+    public void testDescribeStreamsGroupWithStateAndVerboseOptions() throws 
Exception {
+        final List<String> expectedHeaders = Arrays.asList("GROUP", 
+        final Set<List<String>> expectedRows = Set.of(Arrays.asList(APP_ID, 
"", "", "Stable", "2", "2", "1"));
+        final List<Integer> dontCares = List.of(1, 2);
+        validateDescribeOutput(
+            Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), 
"--describe", "--state", "--verbose"), expectedHeaders, expectedRows, 2, 
+        validateDescribeOutput(
+            Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), 
"--describe", "--verbose", "--state"), expectedHeaders, expectedRows, 2, 
+    }
+    @Test
+    public void testDescribeStreamsGroupWithMembersOption() throws Exception {
+        final Set<MemberRows> expectedRows = Set.of(
+            new MemberRows(
+                List.of("GROUP", "MEMBER", "PROCESS", "CLIENT-ID"),
+                List.of(APP_ID, "", "", ""),
+      "ACTIVE-TASKS: 0:[0,1] 
+                List.of("STANDBY-TASKS:"),
+                List.of("WARMUP-TASKS:")
+            ));
+        final List<Integer> dontCares = List.of(1, 2, 3);
+        validateDescribeOutput(
+            Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), 
"--describe", "--members"), expectedRows, dontCares, false);
+    }
+    @Test
+    public void testDescribeStreamsGroupWithMembersAndVerboseOptions() throws 
Exception {
+        final Set<MemberRows> expectedRows = Set.of(new MemberRows(
+            List.of(APP_ID, "2", "0", "", "streams", "2", "", ""),
+  "ACTIVE-TASKS: 0:[0,1] 
1:[0,1]".split("\\s+")).toList(), List.of("STANDBY-TASKS:"), 

Review Comment:
   `parseLines` and the related helpers are quite convoluted, so I don't think 
I fully understand what the raw output is, but it seems to me that the tasks 
are printed on a separate line from the rest of the member. Does this make 
sense in a tabular representation? Shouldn't we have one member per line in a table?

@@ -270,7 +285,7 @@ private void printOffsets(StreamsGroupDescription 
description, boolean verbose)
                     String fmt =  "%" + (-groupLen) + "s %" + (-maxTopicLen) + 
"s %-10s %-15s %s\n";
                     System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION", 
                     for (Map.Entry<TopicPartition, Long> offset : 
offsets.entrySet()) {
-                        System.out.printf(fmt, description.groupId(), 
offset.getKey().topic(), offset.getKey().partition(), "", offset.getValue());
+                        System.out.printf(fmt, description.groupId(), 
offset.getKey().topic(), offset.getKey().partition(), description.groupEpoch(), 

Review Comment:
   No, that's the group epoch.
   I wonder if you could take a look at `ConsumerGroupCommand` and check how we 
fetch the leader epoch there. We need to fetch the committed offsets from the 
consumer offset topic. For now, I believe we have to call 
`adminClient.listConsumerGroupOffsets` - unfortunately it's named this way, but 
it will work for streams groups as well.
   We need a ticket to introduce copies of `listConsumerGroupOffsets` and its 
related methods, named `listStreamGroupOffsets` and so on, that do the same 
thing - they use the same RPC under the hood.
   I would personally just use the same columns as in the consumergroup tool - 

