showuon commented on code in PR #15908: URL: https://github.com/apache/kafka/pull/15908#discussion_r1609296852
########## tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java: ########## @@ -92,6 +94,7 @@ static void generator(ClusterGenerator clusterGenerator) { static <T> AutoCloseable buildConsumers(int numberOfConsumers, boolean syncCommit, String topic, + Collection<TopicPartition> topicPartitions, Review Comment: nit: Change to `set` to be more specific. ########## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java: ########## @@ -311,6 +311,7 @@ private AutoCloseable consumerGroupClosable(ClusterInstance cluster, GroupProtoc 1, false, topicName, + null, Review Comment: nit: Could we use empty set instead of null here? ########## tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java: ########## @@ -141,699 +162,773 @@ public void testPrintVersion(String quorum) { } } - @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) - @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) - public void testDescribeOffsetsOfNonExistingGroup(String quorum, String groupProtocol) throws Exception { - String group = "missing.group"; - createOffsetsTopic(listenerName(), new Properties()); + @ClusterTemplate("generator") + public void testDescribeOffsetsOfNonExistingGroup() throws Exception { + String missingGroup = "missing.group"; + createTopic(TOPIC); // run one consumer in the group consuming from a single-partition topic - addConsumerGroupExecutor(1, groupProtocol); - // note the group to be queried is a different (non-existing) group - String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", group}; - ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); - - Entry<Optional<ConsumerGroupState>, Optional<Collection<PartitionAssignmentState>>> res = service.collectGroupOffsets(group); - assertTrue(res.getKey().map(s -> 
s.equals(ConsumerGroupState.DEAD)).orElse(false) && res.getValue().map(Collection::isEmpty).orElse(false), - "Expected the state to be 'Dead', with no members in the group '" + group + "'."); + GroupProtocol groupProtocol = clusterInstance.config().serverProperties().get(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG).trim().equals("true") ? GroupProtocol.CONSUMER : GroupProtocol.CLASSIC; + try (AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(groupProtocol, PROTOCOL_GROUP, TOPIC, Collections.emptyMap()); + // note the group to be queried is a different (non-existing) group + ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", missingGroup}); + ) { + Entry<Optional<ConsumerGroupState>, Optional<Collection<PartitionAssignmentState>>> res = service.collectGroupOffsets(missingGroup); + assertTrue(res.getKey().map(s -> s.equals(ConsumerGroupState.DEAD)).orElse(false) && res.getValue().map(Collection::isEmpty).orElse(false), + "Expected the state to be 'Dead', with no members in the group '" + missingGroup + "'."); + } } - @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) - @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) - public void testDescribeMembersOfNonExistingGroup(String quorum, String groupProtocol) throws Exception { + @ClusterTemplate("generator") + public void testDescribeMembersOfNonExistingGroup() throws Exception { String group = "missing.group"; - createOffsetsTopic(listenerName(), new Properties()); + createTopic(TOPIC); // run one consumer in the group consuming from a single-partition topic - addConsumerGroupExecutor(1, groupProtocol); - // note the group to be queried is a different (non-existing) group - String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", group}; - 
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); - - Entry<Optional<ConsumerGroupState>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(group, false); - assertTrue(res.getKey().map(s -> s.equals(ConsumerGroupState.DEAD)).orElse(false) && res.getValue().map(Collection::isEmpty).orElse(false), - "Expected the state to be 'Dead', with no members in the group '" + group + "'."); - - Entry<Optional<ConsumerGroupState>, Optional<Collection<MemberAssignmentState>>> res2 = service.collectGroupMembers(group, true); - assertTrue(res2.getKey().map(s -> s.equals(ConsumerGroupState.DEAD)).orElse(false) && res2.getValue().map(Collection::isEmpty).orElse(false), - "Expected the state to be 'Dead', with no members in the group '" + group + "' (verbose option)."); + GroupProtocol groupProtocol = clusterInstance.config().serverProperties().get(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG).trim().equals("true") ? GroupProtocol.CONSUMER : GroupProtocol.CLASSIC; + try (AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(groupProtocol, PROTOCOL_GROUP, TOPIC, Collections.emptyMap()); + // note the group to be queried is a different (non-existing) group + ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group}); + ) { + Entry<Optional<ConsumerGroupState>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(group, false); + assertTrue(res.getKey().map(s -> s.equals(ConsumerGroupState.DEAD)).orElse(false) && res.getValue().map(Collection::isEmpty).orElse(false), + "Expected the state to be 'Dead', with no members in the group '" + group + "'."); + + Entry<Optional<ConsumerGroupState>, Optional<Collection<MemberAssignmentState>>> res2 = service.collectGroupMembers(group, true); + assertTrue(res2.getKey().map(s -> 
s.equals(ConsumerGroupState.DEAD)).orElse(false) && res2.getValue().map(Collection::isEmpty).orElse(false), + "Expected the state to be 'Dead', with no members in the group '" + group + "' (verbose option)."); + } } - @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) - @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) - public void testDescribeStateOfNonExistingGroup(String quorum, String groupProtocol) throws Exception { + @ClusterTemplate("generator") + public void testDescribeStateOfNonExistingGroup() throws Exception { String group = "missing.group"; - createOffsetsTopic(listenerName(), new Properties()); + createTopic(TOPIC); // run one consumer in the group consuming from a single-partition topic - addConsumerGroupExecutor(1, groupProtocol); - // note the group to be queried is a different (non-existing) group - String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", group}; - ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); - - GroupState state = service.collectGroupState(group); - assertTrue(Objects.equals(state.state, ConsumerGroupState.DEAD) && state.numMembers == 0 && - state.coordinator != null && !brokers().filter(s -> s.config().brokerId() == state.coordinator.id()).isEmpty(), - "Expected the state to be 'Dead', with no members in the group '" + group + "'." - ); + GroupProtocol groupProtocol = clusterInstance.config().serverProperties().get(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG).trim().equals("true") ? GroupProtocol.CONSUMER : GroupProtocol.CLASSIC; + try (AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(groupProtocol, PROTOCOL_GROUP, TOPIC, Collections.emptyMap()); Review Comment: It looks to me that we could put the logic of determining `groupProtocol` in `consumerGroupClosable` method, so that we don't have to do the same thing in each test. 
For example, add a `consumerGroupClosable` overload and pass `clusterInstance.config().serverProperties()` to it instead of the `groupProtocol`, so that the `groupProtocol` determination is done inside the `consumerGroupClosable` method: ``` private AutoCloseable consumerGroupClosable(Map<String, String> serverProperties, GroupProtocol protocol, String groupId, String topicName) { GroupProtocol groupProtocol = serverProperties.get(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG).trim().equals("true") ? GroupProtocol.CONSUMER : GroupProtocol.CLASSIC; return consumerGroupClosable(groupProtocol, ...); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org