chia7712 commented on code in PR #15821: URL: https://github.com/apache/kafka/pull/15821#discussion_r1612965039
########## tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java: ########## @@ -68,15 +68,31 @@ static List<ClusterConfig> generator() { ClusterConfig consumerGroupCoordinator = ClusterConfig.defaultBuilder() .setTypes(Stream.of(KRAFT, CO_KRAFT).collect(Collectors.toSet())) .setServerProperties(serverProperties) - .setTags(Collections.singletonList("newGroupCoordinator")) + .setTags(Collections.singletonList("consumerGroupCoordinator")) .build(); return Arrays.asList(classicGroupCoordinator, consumerGroupCoordinator); } + static <T> AutoCloseable buildConsumers(int numberOfConsumers, + boolean syncCommit, Review Comment: In this case the consumers don't use consumer offsets, so `syncCommit` must be "false" to avoid error ########## tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java: ########## @@ -17,449 +17,503 @@ package org.apache.kafka.tools.consumer.group; import joptsimple.OptionException; +import kafka.test.ClusterConfig; +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterTemplate; +import kafka.test.annotation.Type; +import kafka.test.junit.ClusterTestExtensions; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.ConsumerGroupListing; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.GroupProtocol; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.RangeAssignor; import org.apache.kafka.common.ConsumerGroupState; import org.apache.kafka.common.GroupType; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.StringDeserializer; +import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; + +import org.apache.kafka.coordinator.group.GroupCoordinatorConfig; import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.apache.kafka.tools.ToolsTestUtils; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.api.extension.ExtendWith; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; +import java.util.HashMap; import java.util.HashSet; +import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.Properties; import java.util.List; import java.util.Set; +import java.util.stream.Stream; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; -import static org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES; +import static java.util.Collections.emptyMap; import static org.apache.kafka.common.utils.Utils.mkSet; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; -public class ListConsumerGroupTest extends ConsumerGroupCommandTest { - @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) - @MethodSource("getTestQuorumAndGroupProtocolParametersAll") - public void testListConsumerGroupsWithoutFilters(String quorum, String groupProtocol) throws Exception { - String simpleGroup = "simple-group"; - - createOffsetsTopic(listenerName(), new Properties()); - - addSimpleGroupExecutor(simpleGroup); - addConsumerGroupExecutor(1); - addConsumerGroupExecutor(1, PROTOCOL_GROUP, groupProtocol); +@Tag("integration") +@ExtendWith(ClusterTestExtensions.class) +public class ListConsumerGroupTest { + private final static String TOPIC = "foo"; + private final static String SIMPLE_GROUP = "test.simple.group"; + private final static String DEFAULT_GROUP = "test.default.group"; + private final static String PROTOCOL_GROUP = "test.protocol.group"; + private final ClusterInstance clusterInstance; + + ListConsumerGroupTest(ClusterInstance clusterInstance) { + this.clusterInstance = clusterInstance; + } - String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list"}; - ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); + private static List<ClusterConfig> defaultGenerator() { + return ConsumerGroupCommandTestUtils.generator(); + } - Set<String> expectedGroups = set(Arrays.asList(GROUP, simpleGroup, PROTOCOL_GROUP)); - final AtomicReference<Set> foundGroups = new AtomicReference<>(); + private static List<ClusterConfig> consumerProtocolOnlyGenerator() { + Map<String, String> serverProperties = new HashMap<>(); + serverProperties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, "1"); + serverProperties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1"); + serverProperties.put(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "true"); + + ClusterConfig consumerGroupCoordinator = ClusterConfig.defaultBuilder() + .setTypes(Stream.of(Type.KRAFT, Type.CO_KRAFT).collect(Collectors.toSet())) + .setTags(Collections.singletonList("consumerGroupCoordinator")) + .setServerProperties(serverProperties) + .build(); + return Collections.singletonList(consumerGroupCoordinator); + } - TestUtils.waitForCondition(() -> { - foundGroups.set(set(service.listConsumerGroups())); - return Objects.equals(expectedGroups, foundGroups.get()); - }, "Expected --list to show groups " + expectedGroups + ", but found " + foundGroups.get() + "."); + @ClusterTemplate("defaultGenerator") + public void testListConsumerGroupsWithoutFilters() throws Exception { + createTopic(TOPIC); + + try (AutoCloseable simpleConsumerGroupExecutor = consumerGroupClosable(GroupProtocol.CLASSIC, SIMPLE_GROUP, Collections.singleton(new TopicPartition(TOPIC, 0))); + AutoCloseable defaultConsumerGroupExecutor = consumerGroupClosable(GroupProtocol.CLASSIC, DEFAULT_GROUP, TOPIC); + AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable( + clusterInstance.config().serverProperties().get("group.coordinator.new.enable") == "true" ? GroupProtocol.CONSUMER : GroupProtocol.CLASSIC, + PROTOCOL_GROUP, + TOPIC); + + ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--list"}); + ) { + Set<String> expectedGroups = set(Arrays.asList(SIMPLE_GROUP, DEFAULT_GROUP, PROTOCOL_GROUP)); + final AtomicReference<Set> foundGroups = new AtomicReference<>(); + + TestUtils.waitForCondition(() -> { + foundGroups.set(set(service.listConsumerGroups())); + return Objects.equals(expectedGroups, foundGroups.get()); + }, "Expected --list to show groups " + expectedGroups + ", but found " + foundGroups.get() + "."); + } } - @Test + @ClusterTemplate("defaultGenerator") public void testListWithUnrecognizedNewConsumerOption() { - String[] cgcArgs = new String[]{"--new-consumer", "--bootstrap-server", bootstrapServers(listenerName()), "--list"}; - assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs)); + String[] cgcArgs = new String[]{"--new-consumer", "--bootstrap-server", clusterInstance.bootstrapServers(), "--list"}; + Assertions.assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs)); } - @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) - @MethodSource("getTestQuorumAndGroupProtocolParametersAll") - public void testListConsumerGroupsWithStates(String quorum, String groupProtocol) throws Exception { - String simpleGroup = "simple-group"; - - createOffsetsTopic(listenerName(), new Properties()); - - addSimpleGroupExecutor(simpleGroup); - addConsumerGroupExecutor(1, groupProtocol); - - String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state"}; - ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); - - Set<ConsumerGroupListing> expectedListing = mkSet( - new ConsumerGroupListing( - simpleGroup, - true, - Optional.of(ConsumerGroupState.EMPTY), - Optional.of(GroupType.CLASSIC) - ), - new ConsumerGroupListing( - GROUP, - false, - Optional.of(ConsumerGroupState.STABLE), - Optional.of(GroupType.parse(groupProtocol)) - ) - ); - - assertGroupListing( - service, - Collections.emptySet(), - EnumSet.allOf(ConsumerGroupState.class), - expectedListing - ); - - expectedListing = mkSet( - new ConsumerGroupListing( - GROUP, - false, - Optional.of(ConsumerGroupState.STABLE), - Optional.of(GroupType.parse(groupProtocol)) - ) - ); - - assertGroupListing( - service, - Collections.emptySet(), - mkSet(ConsumerGroupState.STABLE), - expectedListing - ); - - assertGroupListing( - service, - Collections.emptySet(), - mkSet(ConsumerGroupState.PREPARING_REBALANCE), - Collections.emptySet() - ); + @ClusterTemplate("defaultGenerator") + public void testListConsumerGroupsWithStates() throws Exception { + createTopic(TOPIC); + + GroupProtocol groupProtocol = clusterInstance.config().serverProperties().get("group.coordinator.new.enable") == "true" ? GroupProtocol.CONSUMER : GroupProtocol.CLASSIC; + try (AutoCloseable simpleConsumerGroupExecutor = consumerGroupClosable(GroupProtocol.CLASSIC, SIMPLE_GROUP, Collections.singleton(new TopicPartition(TOPIC, 0))); + AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(groupProtocol, PROTOCOL_GROUP, TOPIC); + ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--list", "--state"}); + ) { + Set<ConsumerGroupListing> expectedListing = mkSet( + new ConsumerGroupListing( + SIMPLE_GROUP, + true, + Optional.of(ConsumerGroupState.EMPTY), + Optional.of(GroupType.CLASSIC) + ), + new ConsumerGroupListing( + PROTOCOL_GROUP, + false, + Optional.of(ConsumerGroupState.STABLE), + Optional.of(GroupType.parse(groupProtocol.name())) + ) + ); + + assertGroupListing( + service, + Collections.emptySet(), + EnumSet.allOf(ConsumerGroupState.class), + expectedListing + ); + + expectedListing = mkSet( + new ConsumerGroupListing( + PROTOCOL_GROUP, + false, + Optional.of(ConsumerGroupState.STABLE), + Optional.of(GroupType.parse(groupProtocol.name())) + ) + ); + + assertGroupListing( + service, + Collections.emptySet(), + mkSet(ConsumerGroupState.STABLE), + expectedListing + ); + + assertGroupListing( + service, + Collections.emptySet(), + mkSet(ConsumerGroupState.PREPARING_REBALANCE), + Collections.emptySet() + ); + } } - @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) - @MethodSource("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly") - public void testListConsumerGroupsWithTypesClassicProtocol(String quorum, String groupProtocol) throws Exception { - String simpleGroup = "simple-group"; - - createOffsetsTopic(listenerName(), new Properties()); - - addSimpleGroupExecutor(simpleGroup); - addConsumerGroupExecutor(1); - - String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list"}; - ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); - - Set<ConsumerGroupListing> expectedListing = mkSet( - new ConsumerGroupListing( - simpleGroup, - true, - Optional.of(ConsumerGroupState.EMPTY), - Optional.of(GroupType.CLASSIC) - ), - new ConsumerGroupListing( - GROUP, - false, - Optional.of(ConsumerGroupState.STABLE), - Optional.of(GroupType.CLASSIC) - ) - ); - - // No filters explicitly mentioned. Expectation is that all groups are returned. - assertGroupListing( - service, - Collections.emptySet(), - Collections.emptySet(), - expectedListing - ); + @ClusterTemplate("defaultGenerator") + public void testListConsumerGroupsWithTypesClassicProtocol() throws Exception { + createTopic(TOPIC); + + try (AutoCloseable simpleConsumerGroupExecutor = consumerGroupClosable(GroupProtocol.CLASSIC, SIMPLE_GROUP, Collections.singleton(new TopicPartition(TOPIC, 0))); + AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(GroupProtocol.CLASSIC, PROTOCOL_GROUP, TOPIC); + ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--list"}); + ) { + Set<ConsumerGroupListing> expectedListing = mkSet( + new ConsumerGroupListing( + SIMPLE_GROUP, + true, + Optional.of(ConsumerGroupState.EMPTY), + Optional.of(GroupType.CLASSIC) + ), + new ConsumerGroupListing( + PROTOCOL_GROUP, + false, + Optional.of(ConsumerGroupState.STABLE), + Optional.of(GroupType.CLASSIC) + ) + ); + + // No filters explicitly mentioned. Expectation is that all groups are returned. + assertGroupListing( + service, + Collections.emptySet(), + Collections.emptySet(), + expectedListing + ); + + // When group type is mentioned: + // Old Group Coordinator returns empty listings if the type is not Classic. + // New Group Coordinator returns groups according to the filter. + assertGroupListing( + service, + mkSet(GroupType.CONSUMER), + Collections.emptySet(), + Collections.emptySet() + ); + + assertGroupListing( + service, + mkSet(GroupType.CLASSIC), + Collections.emptySet(), + expectedListing + ); + } + } - // When group type is mentioned: - // Old Group Coordinator returns empty listings if the type is not Classic. - // New Group Coordinator returns groups according to the filter. - assertGroupListing( - service, - mkSet(GroupType.CONSUMER), - Collections.emptySet(), - Collections.emptySet() - ); + @ClusterTemplate("consumerProtocolOnlyGenerator") + public void testListConsumerGroupsWithTypesConsumerProtocol() throws Exception { + createTopic(TOPIC); + + try (AutoCloseable simpleConsumerGroupExecutor = consumerGroupClosable(GroupProtocol.CLASSIC, SIMPLE_GROUP, Collections.singleton(new TopicPartition(TOPIC, 0))); + AutoCloseable defaultConsumerGroupExecutor = consumerGroupClosable(GroupProtocol.CLASSIC, DEFAULT_GROUP, TOPIC); + AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(GroupProtocol.CONSUMER, PROTOCOL_GROUP, TOPIC); + ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--list"}); + ) { + + + // No filters explicitly mentioned. Expectation is that all groups are returned. + Set<ConsumerGroupListing> expectedListing = mkSet( + new ConsumerGroupListing( + SIMPLE_GROUP, + true, + Optional.of(ConsumerGroupState.EMPTY), + Optional.of(GroupType.CLASSIC) + ), + new ConsumerGroupListing( + DEFAULT_GROUP, + false, + Optional.of(ConsumerGroupState.STABLE), + Optional.of(GroupType.CLASSIC) + ), + new ConsumerGroupListing( + PROTOCOL_GROUP, + false, + Optional.of(ConsumerGroupState.STABLE), + Optional.of(GroupType.CONSUMER) + ) + ); + + assertGroupListing( + service, + Collections.emptySet(), + Collections.emptySet(), + expectedListing + ); + + // When group type is mentioned: + // New Group Coordinator returns groups according to the filter. + expectedListing = mkSet( + new ConsumerGroupListing( + PROTOCOL_GROUP, + false, + Optional.of(ConsumerGroupState.STABLE), + Optional.of(GroupType.CONSUMER) + ) + ); + + assertGroupListing( + service, + mkSet(GroupType.CONSUMER), + Collections.emptySet(), + expectedListing + ); + + expectedListing = mkSet( + new ConsumerGroupListing( + SIMPLE_GROUP, + true, + Optional.of(ConsumerGroupState.EMPTY), + Optional.of(GroupType.CLASSIC) + ), + new ConsumerGroupListing( + DEFAULT_GROUP, + false, + Optional.of(ConsumerGroupState.STABLE), + Optional.of(GroupType.CLASSIC) + ) + ); + + assertGroupListing( + service, + mkSet(GroupType.CLASSIC), + Collections.emptySet(), + expectedListing + ); + } + } - assertGroupListing( - service, - mkSet(GroupType.CLASSIC), - Collections.emptySet(), - expectedListing - ); + @ClusterTemplate("defaultGenerator") + public void testListGroupCommandClassicProtocol() throws Exception { + createTopic(TOPIC); + + try (AutoCloseable simpleConsumerGroupExecutor = consumerGroupClosable(GroupProtocol.CLASSIC, SIMPLE_GROUP, Collections.singleton(new TopicPartition(TOPIC, 0))); + AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(GroupProtocol.CLASSIC, PROTOCOL_GROUP, TOPIC) + ) { + validateListOutput( + Arrays.asList("--bootstrap-server", clusterInstance.bootstrapServers(), "--list"), + Collections.emptyList(), + mkSet( + Collections.singletonList(PROTOCOL_GROUP), + Collections.singletonList(SIMPLE_GROUP) + ) + ); + + validateListOutput( + Arrays.asList("--bootstrap-server", clusterInstance.bootstrapServers(), "--list", "--state"), + Arrays.asList("GROUP", "STATE"), + mkSet( + Arrays.asList(PROTOCOL_GROUP, "Stable"), + Arrays.asList(SIMPLE_GROUP, "Empty") + ) + ); + + validateListOutput( + Arrays.asList("--bootstrap-server", clusterInstance.bootstrapServers(), "--list", "--type"), + Arrays.asList("GROUP", "TYPE"), + mkSet( + Arrays.asList(PROTOCOL_GROUP, "Classic"), + Arrays.asList(SIMPLE_GROUP, "Classic") + ) + ); + + validateListOutput( + Arrays.asList("--bootstrap-server", clusterInstance.bootstrapServers(), "--list", "--type", "--state"), + Arrays.asList("GROUP", "TYPE", "STATE"), + mkSet( + Arrays.asList(PROTOCOL_GROUP, "Classic", "Stable"), + Arrays.asList(SIMPLE_GROUP, "Classic", "Empty") + ) + ); + + validateListOutput( + Arrays.asList("--bootstrap-server", clusterInstance.bootstrapServers(), "--list", "--state", "Stable"), + Arrays.asList("GROUP", "STATE"), + mkSet( + Arrays.asList(PROTOCOL_GROUP, "Stable") + ) + ); + + // Check case-insensitivity in state filter. + validateListOutput( + Arrays.asList("--bootstrap-server", clusterInstance.bootstrapServers(), "--list", "--state", "stable"), + Arrays.asList("GROUP", "STATE"), + mkSet( + Arrays.asList(PROTOCOL_GROUP, "Stable") + ) + ); + + validateListOutput( + Arrays.asList("--bootstrap-server", clusterInstance.bootstrapServers(), "--list", "--type", "Classic"), + Arrays.asList("GROUP", "TYPE"), + mkSet( + Arrays.asList(PROTOCOL_GROUP, "Classic"), + Arrays.asList(SIMPLE_GROUP, "Classic") + ) + ); + + // Check case-insensitivity in type filter. + validateListOutput( + Arrays.asList("--bootstrap-server", clusterInstance.bootstrapServers(), "--list", "--type", "classic"), + Arrays.asList("GROUP", "TYPE"), + mkSet( + Arrays.asList(PROTOCOL_GROUP, "Classic"), + Arrays.asList(SIMPLE_GROUP, "Classic") + ) + ); + } } - @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) - @MethodSource("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly") - public void testListConsumerGroupsWithTypesConsumerProtocol(String quorum, String groupProtocol) throws Exception { - String simpleGroup = "simple-group"; - - createOffsetsTopic(listenerName(), new Properties()); - - addSimpleGroupExecutor(simpleGroup); - addConsumerGroupExecutor(1); - addConsumerGroupExecutor(1, PROTOCOL_GROUP, groupProtocol); - - String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list"}; - ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); - - // No filters explicitly mentioned. Expectation is that all groups are returned. - Set<ConsumerGroupListing> expectedListing = mkSet( - new ConsumerGroupListing( - simpleGroup, - true, - Optional.of(ConsumerGroupState.EMPTY), - Optional.of(GroupType.CLASSIC) - ), - new ConsumerGroupListing( - GROUP, - false, - Optional.of(ConsumerGroupState.STABLE), - Optional.of(GroupType.CLASSIC) - ), - new ConsumerGroupListing( - PROTOCOL_GROUP, - false, - Optional.of(ConsumerGroupState.STABLE), - Optional.of(GroupType.CONSUMER) - ) - ); + @ClusterTemplate("consumerProtocolOnlyGenerator") + public void testListGroupCommandConsumerProtocol() throws Exception { + createTopic(TOPIC); + + try (AutoCloseable simpleConsumerGroupExecutor = consumerGroupClosable(GroupProtocol.CLASSIC, SIMPLE_GROUP, Collections.singleton(new TopicPartition(TOPIC, 0))); + AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(GroupProtocol.CONSUMER, PROTOCOL_GROUP, TOPIC) + ) { + validateListOutput( + Arrays.asList("--bootstrap-server", clusterInstance.bootstrapServers(), "--list"), + Collections.emptyList(), + mkSet( + Collections.singletonList(PROTOCOL_GROUP), + Collections.singletonList(SIMPLE_GROUP) + ) + ); + + validateListOutput( + Arrays.asList("--bootstrap-server", clusterInstance.bootstrapServers(), "--list", "--state"), + Arrays.asList("GROUP", "STATE"), + mkSet( + Arrays.asList(PROTOCOL_GROUP, "Stable"), + Arrays.asList(SIMPLE_GROUP, "Empty") + ) + ); + + validateListOutput( + Arrays.asList("--bootstrap-server", clusterInstance.bootstrapServers(), "--list", "--type"), + Arrays.asList("GROUP", "TYPE"), + mkSet( + Arrays.asList(PROTOCOL_GROUP, "Consumer"), + Arrays.asList(SIMPLE_GROUP, "Classic") + ) + ); + + validateListOutput( + Arrays.asList("--bootstrap-server", clusterInstance.bootstrapServers(), "--list", "--type", "--state"), + Arrays.asList("GROUP", "TYPE", "STATE"), + mkSet( + Arrays.asList(PROTOCOL_GROUP, "Consumer", "Stable"), + Arrays.asList(SIMPLE_GROUP, "Classic", "Empty") + ) + ); + + validateListOutput( + Arrays.asList("--bootstrap-server", clusterInstance.bootstrapServers(), "--list", "--type", "consumer"), + Arrays.asList("GROUP", "TYPE"), + mkSet( + Arrays.asList(PROTOCOL_GROUP, "Consumer") + ) + ); + + validateListOutput( + Arrays.asList("--bootstrap-server", clusterInstance.bootstrapServers(), "--list", "--type", "consumer", "--state", "Stable"), + Arrays.asList("GROUP", "TYPE", "STATE"), + mkSet( + Arrays.asList(PROTOCOL_GROUP, "Consumer", "Stable") + ) + ); + } + } - assertGroupListing( - service, - Collections.emptySet(), - Collections.emptySet(), - expectedListing + private AutoCloseable consumerGroupClosable(GroupProtocol protocol, String groupId, String topicName) { + Map<String, Object> configs = composeConfigs( + groupId, + protocol.name, + emptyMap() ); - // When group type is mentioned: - // New Group Coordinator returns groups according to the filter. - expectedListing = mkSet( - new ConsumerGroupListing( - PROTOCOL_GROUP, + return ConsumerGroupCommandTestUtils.buildConsumers( + 1, false, - Optional.of(ConsumerGroupState.STABLE), - Optional.of(GroupType.CONSUMER) - ) + topicName, + () -> new KafkaConsumer<String, String>(configs) ); + } - assertGroupListing( - service, - mkSet(GroupType.CONSUMER), - Collections.emptySet(), - expectedListing + private AutoCloseable consumerGroupClosable(GroupProtocol protocol, String groupId, Set<TopicPartition> topicPartitions) { Review Comment: This is a bit weird since we don't use `groupId`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org