chia7712 commented on code in PR #15821:
URL: https://github.com/apache/kafka/pull/15821#discussion_r1612965039


##########
tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java:
##########
@@ -68,15 +68,31 @@ static List<ClusterConfig> generator() {
         ClusterConfig consumerGroupCoordinator = ClusterConfig.defaultBuilder()
                 .setTypes(Stream.of(KRAFT, 
CO_KRAFT).collect(Collectors.toSet()))
                 .setServerProperties(serverProperties)
-                .setTags(Collections.singletonList("newGroupCoordinator"))
+                .setTags(Collections.singletonList("consumerGroupCoordinator"))
                 .build();
         return Arrays.asList(classicGroupCoordinator, 
consumerGroupCoordinator);
     }
 
+    static <T> AutoCloseable buildConsumers(int numberOfConsumers,
+                                            boolean syncCommit,

Review Comment:
   In this case the consumers don't use consumer offsets, so `syncCommit` must 
be "false" to avoid error



##########
tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java:
##########
@@ -17,449 +17,503 @@
 package org.apache.kafka.tools.consumer.group;
 
 import joptsimple.OptionException;
+import kafka.test.ClusterConfig;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTemplate;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.ConsumerGroupListing;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.RangeAssignor;
 import org.apache.kafka.common.ConsumerGroupState;
 import org.apache.kafka.common.GroupType;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
 import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 import org.apache.kafka.tools.ToolsTestUtils;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.Properties;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.Stream;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
-import static 
org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES;
+import static java.util.Collections.emptyMap;
 import static org.apache.kafka.common.utils.Utils.mkSet;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
 
-public class ListConsumerGroupTest extends ConsumerGroupCommandTest {
-    @ParameterizedTest(name = 
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
-    @MethodSource("getTestQuorumAndGroupProtocolParametersAll")
-    public void testListConsumerGroupsWithoutFilters(String quorum, String 
groupProtocol) throws Exception {
-        String simpleGroup = "simple-group";
-
-        createOffsetsTopic(listenerName(), new Properties());
-
-        addSimpleGroupExecutor(simpleGroup);
-        addConsumerGroupExecutor(1);
-        addConsumerGroupExecutor(1, PROTOCOL_GROUP, groupProtocol);
+@Tag("integration")
+@ExtendWith(ClusterTestExtensions.class)
+public class ListConsumerGroupTest {
+    private final static String TOPIC = "foo";
+    private final static String SIMPLE_GROUP = "test.simple.group";
+    private final static String DEFAULT_GROUP = "test.default.group";
+    private final static String PROTOCOL_GROUP = "test.protocol.group";
+    private final ClusterInstance clusterInstance;
+
+    ListConsumerGroupTest(ClusterInstance clusterInstance) {
+        this.clusterInstance = clusterInstance;
+    }
 
-        String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list"};
-        ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(cgcArgs);
+    private static List<ClusterConfig> defaultGenerator() {
+        return ConsumerGroupCommandTestUtils.generator();
+    }
 
-        Set<String> expectedGroups = set(Arrays.asList(GROUP, simpleGroup, 
PROTOCOL_GROUP));
-        final AtomicReference<Set> foundGroups = new AtomicReference<>();
+    private static List<ClusterConfig> consumerProtocolOnlyGenerator() {
+        Map<String, String> serverProperties = new HashMap<>();
+        
serverProperties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, 
"1");
+        
serverProperties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG,
 "1");
+        
serverProperties.put(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG,
 "true");
+
+        ClusterConfig consumerGroupCoordinator = ClusterConfig.defaultBuilder()
+                .setTypes(Stream.of(Type.KRAFT, 
Type.CO_KRAFT).collect(Collectors.toSet()))
+                .setTags(Collections.singletonList("consumerGroupCoordinator"))
+                .setServerProperties(serverProperties)
+                .build();
+        return Collections.singletonList(consumerGroupCoordinator);
+    }
 
-        TestUtils.waitForCondition(() -> {
-            foundGroups.set(set(service.listConsumerGroups()));
-            return Objects.equals(expectedGroups, foundGroups.get());
-        }, "Expected --list to show groups " + expectedGroups + ", but found " 
+ foundGroups.get() + ".");
+    @ClusterTemplate("defaultGenerator")
+    public void testListConsumerGroupsWithoutFilters() throws Exception {
+        createTopic(TOPIC);
+
+        try (AutoCloseable simpleConsumerGroupExecutor = 
consumerGroupClosable(GroupProtocol.CLASSIC, SIMPLE_GROUP, 
Collections.singleton(new TopicPartition(TOPIC, 0)));
+             AutoCloseable defaultConsumerGroupExecutor = 
consumerGroupClosable(GroupProtocol.CLASSIC, DEFAULT_GROUP, TOPIC);
+             AutoCloseable protocolConsumerGroupExecutor = 
consumerGroupClosable(
+                     
clusterInstance.config().serverProperties().get("group.coordinator.new.enable") 
== "true" ? GroupProtocol.CONSUMER : GroupProtocol.CLASSIC,
+                     PROTOCOL_GROUP,
+                     TOPIC);
+
+             ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(new String[]{"--bootstrap-server", 
clusterInstance.bootstrapServers(), "--list"});
+        ) {
+            Set<String> expectedGroups = set(Arrays.asList(SIMPLE_GROUP, 
DEFAULT_GROUP, PROTOCOL_GROUP));
+            final AtomicReference<Set> foundGroups = new AtomicReference<>();
+
+            TestUtils.waitForCondition(() -> {
+                foundGroups.set(set(service.listConsumerGroups()));
+                return Objects.equals(expectedGroups, foundGroups.get());
+            }, "Expected --list to show groups " + expectedGroups + ", but 
found " + foundGroups.get() + ".");
+        }
     }
 
-    @Test
+    @ClusterTemplate("defaultGenerator")
     public void testListWithUnrecognizedNewConsumerOption() {
-        String[] cgcArgs = new String[]{"--new-consumer", 
"--bootstrap-server", bootstrapServers(listenerName()), "--list"};
-        assertThrows(OptionException.class, () -> 
getConsumerGroupService(cgcArgs));
+        String[] cgcArgs = new String[]{"--new-consumer", 
"--bootstrap-server", clusterInstance.bootstrapServers(), "--list"};
+        Assertions.assertThrows(OptionException.class, () -> 
getConsumerGroupService(cgcArgs));
     }
 
-    @ParameterizedTest(name = 
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
-    @MethodSource("getTestQuorumAndGroupProtocolParametersAll")
-    public void testListConsumerGroupsWithStates(String quorum, String 
groupProtocol) throws Exception {
-        String simpleGroup = "simple-group";
-
-        createOffsetsTopic(listenerName(), new Properties());
-
-        addSimpleGroupExecutor(simpleGroup);
-        addConsumerGroupExecutor(1, groupProtocol);
-
-        String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state"};
-        ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(cgcArgs);
-
-        Set<ConsumerGroupListing> expectedListing = mkSet(
-            new ConsumerGroupListing(
-                simpleGroup,
-                true,
-                Optional.of(ConsumerGroupState.EMPTY),
-                Optional.of(GroupType.CLASSIC)
-            ),
-            new ConsumerGroupListing(
-                GROUP,
-                false,
-                Optional.of(ConsumerGroupState.STABLE),
-                Optional.of(GroupType.parse(groupProtocol))
-            )
-        );
-
-        assertGroupListing(
-            service,
-            Collections.emptySet(),
-            EnumSet.allOf(ConsumerGroupState.class),
-            expectedListing
-        );
-
-        expectedListing = mkSet(
-            new ConsumerGroupListing(
-                GROUP,
-                false,
-                Optional.of(ConsumerGroupState.STABLE),
-                Optional.of(GroupType.parse(groupProtocol))
-            )
-        );
-
-        assertGroupListing(
-            service,
-            Collections.emptySet(),
-            mkSet(ConsumerGroupState.STABLE),
-            expectedListing
-        );
-
-        assertGroupListing(
-            service,
-            Collections.emptySet(),
-            mkSet(ConsumerGroupState.PREPARING_REBALANCE),
-            Collections.emptySet()
-        );
+    @ClusterTemplate("defaultGenerator")
+    public void testListConsumerGroupsWithStates() throws Exception {
+        createTopic(TOPIC);
+
+        GroupProtocol groupProtocol = 
clusterInstance.config().serverProperties().get("group.coordinator.new.enable") 
== "true" ? GroupProtocol.CONSUMER : GroupProtocol.CLASSIC;
+        try (AutoCloseable simpleConsumerGroupExecutor = 
consumerGroupClosable(GroupProtocol.CLASSIC, SIMPLE_GROUP, 
Collections.singleton(new TopicPartition(TOPIC, 0)));
+             AutoCloseable protocolConsumerGroupExecutor = 
consumerGroupClosable(groupProtocol, PROTOCOL_GROUP, TOPIC);
+             ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(new String[]{"--bootstrap-server", 
clusterInstance.bootstrapServers(), "--list", "--state"});
+        ) {
+            Set<ConsumerGroupListing> expectedListing = mkSet(
+                    new ConsumerGroupListing(
+                            SIMPLE_GROUP,
+                            true,
+                            Optional.of(ConsumerGroupState.EMPTY),
+                            Optional.of(GroupType.CLASSIC)
+                    ),
+                    new ConsumerGroupListing(
+                            PROTOCOL_GROUP,
+                            false,
+                            Optional.of(ConsumerGroupState.STABLE),
+                            Optional.of(GroupType.parse(groupProtocol.name()))
+                    )
+            );
+
+            assertGroupListing(
+                    service,
+                    Collections.emptySet(),
+                    EnumSet.allOf(ConsumerGroupState.class),
+                    expectedListing
+            );
+
+            expectedListing = mkSet(
+                    new ConsumerGroupListing(
+                            PROTOCOL_GROUP,
+                            false,
+                            Optional.of(ConsumerGroupState.STABLE),
+                            Optional.of(GroupType.parse(groupProtocol.name()))
+                    )
+            );
+
+            assertGroupListing(
+                    service,
+                    Collections.emptySet(),
+                    mkSet(ConsumerGroupState.STABLE),
+                    expectedListing
+            );
+
+            assertGroupListing(
+                    service,
+                    Collections.emptySet(),
+                    mkSet(ConsumerGroupState.PREPARING_REBALANCE),
+                    Collections.emptySet()
+            );
+        }
     }
 
-    @ParameterizedTest(name = 
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
-    
@MethodSource("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")
-    public void testListConsumerGroupsWithTypesClassicProtocol(String quorum, 
String groupProtocol) throws Exception {
-        String simpleGroup = "simple-group";
-
-        createOffsetsTopic(listenerName(), new Properties());
-
-        addSimpleGroupExecutor(simpleGroup);
-        addConsumerGroupExecutor(1);
-
-        String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list"};
-        ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(cgcArgs);
-
-        Set<ConsumerGroupListing> expectedListing = mkSet(
-            new ConsumerGroupListing(
-                simpleGroup,
-                true,
-                Optional.of(ConsumerGroupState.EMPTY),
-                Optional.of(GroupType.CLASSIC)
-            ),
-            new ConsumerGroupListing(
-                GROUP,
-                false,
-                Optional.of(ConsumerGroupState.STABLE),
-                Optional.of(GroupType.CLASSIC)
-            )
-        );
-
-        // No filters explicitly mentioned. Expectation is that all groups are 
returned.
-        assertGroupListing(
-            service,
-            Collections.emptySet(),
-            Collections.emptySet(),
-            expectedListing
-        );
+    @ClusterTemplate("defaultGenerator")
+    public void testListConsumerGroupsWithTypesClassicProtocol() throws 
Exception {
+        createTopic(TOPIC);
+
+        try (AutoCloseable simpleConsumerGroupExecutor = 
consumerGroupClosable(GroupProtocol.CLASSIC, SIMPLE_GROUP, 
Collections.singleton(new TopicPartition(TOPIC, 0)));
+             AutoCloseable protocolConsumerGroupExecutor = 
consumerGroupClosable(GroupProtocol.CLASSIC, PROTOCOL_GROUP, TOPIC);
+             ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(new String[]{"--bootstrap-server", 
clusterInstance.bootstrapServers(), "--list"});
+        ) {
+            Set<ConsumerGroupListing> expectedListing = mkSet(
+                    new ConsumerGroupListing(
+                            SIMPLE_GROUP,
+                            true,
+                            Optional.of(ConsumerGroupState.EMPTY),
+                            Optional.of(GroupType.CLASSIC)
+                    ),
+                    new ConsumerGroupListing(
+                            PROTOCOL_GROUP,
+                            false,
+                            Optional.of(ConsumerGroupState.STABLE),
+                            Optional.of(GroupType.CLASSIC)
+                    )
+            );
+
+            // No filters explicitly mentioned. Expectation is that all groups 
are returned.
+            assertGroupListing(
+                    service,
+                    Collections.emptySet(),
+                    Collections.emptySet(),
+                    expectedListing
+            );
+
+            // When group type is mentioned:
+            // Old Group Coordinator returns empty listings if the type is not 
Classic.
+            // New Group Coordinator returns groups according to the filter.
+            assertGroupListing(
+                    service,
+                    mkSet(GroupType.CONSUMER),
+                    Collections.emptySet(),
+                    Collections.emptySet()
+            );
+
+            assertGroupListing(
+                    service,
+                    mkSet(GroupType.CLASSIC),
+                    Collections.emptySet(),
+                    expectedListing
+            );
+        }
+    }
 
-        // When group type is mentioned:
-        // Old Group Coordinator returns empty listings if the type is not 
Classic.
-        // New Group Coordinator returns groups according to the filter.
-        assertGroupListing(
-            service,
-            mkSet(GroupType.CONSUMER),
-            Collections.emptySet(),
-            Collections.emptySet()
-        );
+    @ClusterTemplate("consumerProtocolOnlyGenerator")
+    public void testListConsumerGroupsWithTypesConsumerProtocol() throws 
Exception {
+        createTopic(TOPIC);
+
+        try (AutoCloseable simpleConsumerGroupExecutor = 
consumerGroupClosable(GroupProtocol.CLASSIC, SIMPLE_GROUP, 
Collections.singleton(new TopicPartition(TOPIC, 0)));
+             AutoCloseable defaultConsumerGroupExecutor = 
consumerGroupClosable(GroupProtocol.CLASSIC, DEFAULT_GROUP, TOPIC);
+             AutoCloseable protocolConsumerGroupExecutor = 
consumerGroupClosable(GroupProtocol.CONSUMER, PROTOCOL_GROUP, TOPIC);
+             ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(new String[]{"--bootstrap-server", 
clusterInstance.bootstrapServers(), "--list"});
+        ) {
+
+
+            // No filters explicitly mentioned. Expectation is that all groups 
are returned.
+            Set<ConsumerGroupListing> expectedListing = mkSet(
+                    new ConsumerGroupListing(
+                            SIMPLE_GROUP,
+                            true,
+                            Optional.of(ConsumerGroupState.EMPTY),
+                            Optional.of(GroupType.CLASSIC)
+                    ),
+                    new ConsumerGroupListing(
+                            DEFAULT_GROUP,
+                            false,
+                            Optional.of(ConsumerGroupState.STABLE),
+                            Optional.of(GroupType.CLASSIC)
+                    ),
+                    new ConsumerGroupListing(
+                            PROTOCOL_GROUP,
+                            false,
+                            Optional.of(ConsumerGroupState.STABLE),
+                            Optional.of(GroupType.CONSUMER)
+                    )
+            );
+
+            assertGroupListing(
+                    service,
+                    Collections.emptySet(),
+                    Collections.emptySet(),
+                    expectedListing
+            );
+
+            // When group type is mentioned:
+            // New Group Coordinator returns groups according to the filter.
+            expectedListing = mkSet(
+                    new ConsumerGroupListing(
+                            PROTOCOL_GROUP,
+                            false,
+                            Optional.of(ConsumerGroupState.STABLE),
+                            Optional.of(GroupType.CONSUMER)
+                    )
+            );
+
+            assertGroupListing(
+                    service,
+                    mkSet(GroupType.CONSUMER),
+                    Collections.emptySet(),
+                    expectedListing
+            );
+
+            expectedListing = mkSet(
+                    new ConsumerGroupListing(
+                            SIMPLE_GROUP,
+                            true,
+                            Optional.of(ConsumerGroupState.EMPTY),
+                            Optional.of(GroupType.CLASSIC)
+                    ),
+                    new ConsumerGroupListing(
+                            DEFAULT_GROUP,
+                            false,
+                            Optional.of(ConsumerGroupState.STABLE),
+                            Optional.of(GroupType.CLASSIC)
+                    )
+            );
+
+            assertGroupListing(
+                    service,
+                    mkSet(GroupType.CLASSIC),
+                    Collections.emptySet(),
+                    expectedListing
+            );
+        }
+    }
 
-        assertGroupListing(
-            service,
-            mkSet(GroupType.CLASSIC),
-            Collections.emptySet(),
-            expectedListing
-        );
+    @ClusterTemplate("defaultGenerator")
+    public void testListGroupCommandClassicProtocol() throws Exception {
+        createTopic(TOPIC);
+
+        try (AutoCloseable simpleConsumerGroupExecutor = 
consumerGroupClosable(GroupProtocol.CLASSIC, SIMPLE_GROUP, 
Collections.singleton(new TopicPartition(TOPIC, 0)));
+             AutoCloseable protocolConsumerGroupExecutor = 
consumerGroupClosable(GroupProtocol.CLASSIC, PROTOCOL_GROUP, TOPIC)
+        ) {
+            validateListOutput(
+                    Arrays.asList("--bootstrap-server", 
clusterInstance.bootstrapServers(), "--list"),
+                    Collections.emptyList(),
+                    mkSet(
+                            Collections.singletonList(PROTOCOL_GROUP),
+                            Collections.singletonList(SIMPLE_GROUP)
+                    )
+            );
+
+            validateListOutput(
+                    Arrays.asList("--bootstrap-server", 
clusterInstance.bootstrapServers(), "--list", "--state"),
+                    Arrays.asList("GROUP", "STATE"),
+                    mkSet(
+                            Arrays.asList(PROTOCOL_GROUP, "Stable"),
+                            Arrays.asList(SIMPLE_GROUP, "Empty")
+                    )
+            );
+
+            validateListOutput(
+                    Arrays.asList("--bootstrap-server", 
clusterInstance.bootstrapServers(), "--list", "--type"),
+                    Arrays.asList("GROUP", "TYPE"),
+                    mkSet(
+                            Arrays.asList(PROTOCOL_GROUP, "Classic"),
+                            Arrays.asList(SIMPLE_GROUP, "Classic")
+                    )
+            );
+
+            validateListOutput(
+                    Arrays.asList("--bootstrap-server", 
clusterInstance.bootstrapServers(), "--list", "--type", "--state"),
+                    Arrays.asList("GROUP", "TYPE", "STATE"),
+                    mkSet(
+                            Arrays.asList(PROTOCOL_GROUP, "Classic", "Stable"),
+                            Arrays.asList(SIMPLE_GROUP, "Classic", "Empty")
+                    )
+            );
+
+            validateListOutput(
+                    Arrays.asList("--bootstrap-server", 
clusterInstance.bootstrapServers(), "--list", "--state", "Stable"),
+                    Arrays.asList("GROUP", "STATE"),
+                    mkSet(
+                            Arrays.asList(PROTOCOL_GROUP, "Stable")
+                    )
+            );
+
+            // Check case-insensitivity in state filter.
+            validateListOutput(
+                    Arrays.asList("--bootstrap-server", 
clusterInstance.bootstrapServers(), "--list", "--state", "stable"),
+                    Arrays.asList("GROUP", "STATE"),
+                    mkSet(
+                            Arrays.asList(PROTOCOL_GROUP, "Stable")
+                    )
+            );
+
+            validateListOutput(
+                    Arrays.asList("--bootstrap-server", 
clusterInstance.bootstrapServers(), "--list", "--type", "Classic"),
+                    Arrays.asList("GROUP", "TYPE"),
+                    mkSet(
+                            Arrays.asList(PROTOCOL_GROUP, "Classic"),
+                            Arrays.asList(SIMPLE_GROUP, "Classic")
+                    )
+            );
+
+            // Check case-insensitivity in type filter.
+            validateListOutput(
+                    Arrays.asList("--bootstrap-server", 
clusterInstance.bootstrapServers(), "--list", "--type", "classic"),
+                    Arrays.asList("GROUP", "TYPE"),
+                    mkSet(
+                            Arrays.asList(PROTOCOL_GROUP, "Classic"),
+                            Arrays.asList(SIMPLE_GROUP, "Classic")
+                    )
+            );
+        }
     }
 
-    @ParameterizedTest(name = 
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
-    
@MethodSource("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly")
-    public void testListConsumerGroupsWithTypesConsumerProtocol(String quorum, 
String groupProtocol) throws Exception {
-        String simpleGroup = "simple-group";
-
-        createOffsetsTopic(listenerName(), new Properties());
-
-        addSimpleGroupExecutor(simpleGroup);
-        addConsumerGroupExecutor(1);
-        addConsumerGroupExecutor(1, PROTOCOL_GROUP, groupProtocol);
-
-        String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list"};
-        ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(cgcArgs);
-
-        // No filters explicitly mentioned. Expectation is that all groups are 
returned.
-        Set<ConsumerGroupListing> expectedListing = mkSet(
-            new ConsumerGroupListing(
-                simpleGroup,
-                true,
-                Optional.of(ConsumerGroupState.EMPTY),
-                Optional.of(GroupType.CLASSIC)
-            ),
-            new ConsumerGroupListing(
-                GROUP,
-                false,
-                Optional.of(ConsumerGroupState.STABLE),
-                Optional.of(GroupType.CLASSIC)
-            ),
-            new ConsumerGroupListing(
-                PROTOCOL_GROUP,
-                false,
-                Optional.of(ConsumerGroupState.STABLE),
-                Optional.of(GroupType.CONSUMER)
-            )
-        );
+    @ClusterTemplate("consumerProtocolOnlyGenerator")
+    public void testListGroupCommandConsumerProtocol() throws Exception {
+        createTopic(TOPIC);
+
+        try (AutoCloseable simpleConsumerGroupExecutor = 
consumerGroupClosable(GroupProtocol.CLASSIC, SIMPLE_GROUP, 
Collections.singleton(new TopicPartition(TOPIC, 0)));
+             AutoCloseable protocolConsumerGroupExecutor = 
consumerGroupClosable(GroupProtocol.CONSUMER, PROTOCOL_GROUP, TOPIC)
+        ) {
+            validateListOutput(
+                    Arrays.asList("--bootstrap-server", 
clusterInstance.bootstrapServers(), "--list"),
+                    Collections.emptyList(),
+                    mkSet(
+                            Collections.singletonList(PROTOCOL_GROUP),
+                            Collections.singletonList(SIMPLE_GROUP)
+                    )
+            );
+
+            validateListOutput(
+                    Arrays.asList("--bootstrap-server", 
clusterInstance.bootstrapServers(), "--list", "--state"),
+                    Arrays.asList("GROUP", "STATE"),
+                    mkSet(
+                            Arrays.asList(PROTOCOL_GROUP, "Stable"),
+                            Arrays.asList(SIMPLE_GROUP, "Empty")
+                    )
+            );
+
+            validateListOutput(
+                    Arrays.asList("--bootstrap-server", 
clusterInstance.bootstrapServers(), "--list", "--type"),
+                    Arrays.asList("GROUP", "TYPE"),
+                    mkSet(
+                            Arrays.asList(PROTOCOL_GROUP, "Consumer"),
+                            Arrays.asList(SIMPLE_GROUP, "Classic")
+                    )
+            );
+
+            validateListOutput(
+                    Arrays.asList("--bootstrap-server", 
clusterInstance.bootstrapServers(), "--list", "--type", "--state"),
+                    Arrays.asList("GROUP", "TYPE", "STATE"),
+                    mkSet(
+                            Arrays.asList(PROTOCOL_GROUP, "Consumer", 
"Stable"),
+                            Arrays.asList(SIMPLE_GROUP, "Classic", "Empty")
+                    )
+            );
+
+            validateListOutput(
+                    Arrays.asList("--bootstrap-server", 
clusterInstance.bootstrapServers(), "--list", "--type", "consumer"),
+                    Arrays.asList("GROUP", "TYPE"),
+                    mkSet(
+                            Arrays.asList(PROTOCOL_GROUP, "Consumer")
+                    )
+            );
+
+            validateListOutput(
+                    Arrays.asList("--bootstrap-server", 
clusterInstance.bootstrapServers(), "--list", "--type", "consumer", "--state", 
"Stable"),
+                    Arrays.asList("GROUP", "TYPE", "STATE"),
+                    mkSet(
+                            Arrays.asList(PROTOCOL_GROUP, "Consumer", "Stable")
+                    )
+            );
+        }
+    }
 
-        assertGroupListing(
-            service,
-            Collections.emptySet(),
-            Collections.emptySet(),
-            expectedListing
+    private AutoCloseable consumerGroupClosable(GroupProtocol protocol, String 
groupId, String topicName) {
+        Map<String, Object> configs = composeConfigs(
+                groupId,
+                protocol.name,
+                emptyMap()
         );
 
-        // When group type is mentioned:
-        // New Group Coordinator returns groups according to the filter.
-        expectedListing = mkSet(
-            new ConsumerGroupListing(
-                PROTOCOL_GROUP,
+        return ConsumerGroupCommandTestUtils.buildConsumers(
+                1,
                 false,
-                Optional.of(ConsumerGroupState.STABLE),
-                Optional.of(GroupType.CONSUMER)
-            )
+                topicName,
+                () -> new KafkaConsumer<String, String>(configs)
         );
+    }
 
-        assertGroupListing(
-            service,
-            mkSet(GroupType.CONSUMER),
-            Collections.emptySet(),
-            expectedListing
+    private AutoCloseable consumerGroupClosable(GroupProtocol protocol, String 
groupId, Set<TopicPartition> topicPartitions) {

Review Comment:
   This is a bit weird since we don't use `groupId`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to