FrankYang0529 commented on code in PR #15908: URL: https://github.com/apache/kafka/pull/15908#discussion_r1614679946
########## tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java: ########## @@ -141,699 +162,773 @@ public void testPrintVersion(String quorum) { } } - @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) - @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) - public void testDescribeOffsetsOfNonExistingGroup(String quorum, String groupProtocol) throws Exception { - String group = "missing.group"; - createOffsetsTopic(listenerName(), new Properties()); + @ClusterTemplate("generator") + public void testDescribeOffsetsOfNonExistingGroup() throws Exception { + String missingGroup = "missing.group"; + createTopic(TOPIC); // run one consumer in the group consuming from a single-partition topic - addConsumerGroupExecutor(1, groupProtocol); - // note the group to be queried is a different (non-existing) group - String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", group}; - ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); - - Entry<Optional<ConsumerGroupState>, Optional<Collection<PartitionAssignmentState>>> res = service.collectGroupOffsets(group); - assertTrue(res.getKey().map(s -> s.equals(ConsumerGroupState.DEAD)).orElse(false) && res.getValue().map(Collection::isEmpty).orElse(false), - "Expected the state to be 'Dead', with no members in the group '" + group + "'."); + GroupProtocol groupProtocol = clusterInstance.config().serverProperties().get(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG).trim().equals("true") ? 
GroupProtocol.CONSUMER : GroupProtocol.CLASSIC; + try (AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(groupProtocol, PROTOCOL_GROUP, TOPIC, Collections.emptyMap()); + // note the group to be queried is a different (non-existing) group + ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", missingGroup}); + ) { + Entry<Optional<ConsumerGroupState>, Optional<Collection<PartitionAssignmentState>>> res = service.collectGroupOffsets(missingGroup); + assertTrue(res.getKey().map(s -> s.equals(ConsumerGroupState.DEAD)).orElse(false) && res.getValue().map(Collection::isEmpty).orElse(false), + "Expected the state to be 'Dead', with no members in the group '" + missingGroup + "'."); + } } - @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) - @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) - public void testDescribeMembersOfNonExistingGroup(String quorum, String groupProtocol) throws Exception { + @ClusterTemplate("generator") + public void testDescribeMembersOfNonExistingGroup() throws Exception { String group = "missing.group"; - createOffsetsTopic(listenerName(), new Properties()); + createTopic(TOPIC); // run one consumer in the group consuming from a single-partition topic - addConsumerGroupExecutor(1, groupProtocol); - // note the group to be queried is a different (non-existing) group - String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", group}; - ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); - - Entry<Optional<ConsumerGroupState>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(group, false); - assertTrue(res.getKey().map(s -> s.equals(ConsumerGroupState.DEAD)).orElse(false) && res.getValue().map(Collection::isEmpty).orElse(false), - "Expected the state to be 
'Dead', with no members in the group '" + group + "'."); - - Entry<Optional<ConsumerGroupState>, Optional<Collection<MemberAssignmentState>>> res2 = service.collectGroupMembers(group, true); - assertTrue(res2.getKey().map(s -> s.equals(ConsumerGroupState.DEAD)).orElse(false) && res2.getValue().map(Collection::isEmpty).orElse(false), - "Expected the state to be 'Dead', with no members in the group '" + group + "' (verbose option)."); + GroupProtocol groupProtocol = clusterInstance.config().serverProperties().get(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG).trim().equals("true") ? GroupProtocol.CONSUMER : GroupProtocol.CLASSIC; + try (AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(groupProtocol, PROTOCOL_GROUP, TOPIC, Collections.emptyMap()); + // note the group to be queried is a different (non-existing) group + ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group}); + ) { + Entry<Optional<ConsumerGroupState>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(group, false); + assertTrue(res.getKey().map(s -> s.equals(ConsumerGroupState.DEAD)).orElse(false) && res.getValue().map(Collection::isEmpty).orElse(false), + "Expected the state to be 'Dead', with no members in the group '" + group + "'."); + + Entry<Optional<ConsumerGroupState>, Optional<Collection<MemberAssignmentState>>> res2 = service.collectGroupMembers(group, true); + assertTrue(res2.getKey().map(s -> s.equals(ConsumerGroupState.DEAD)).orElse(false) && res2.getValue().map(Collection::isEmpty).orElse(false), + "Expected the state to be 'Dead', with no members in the group '" + group + "' (verbose option)."); + } } - @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) - @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) - public void testDescribeStateOfNonExistingGroup(String 
quorum, String groupProtocol) throws Exception { + @ClusterTemplate("generator") + public void testDescribeStateOfNonExistingGroup() throws Exception { String group = "missing.group"; - createOffsetsTopic(listenerName(), new Properties()); + createTopic(TOPIC); // run one consumer in the group consuming from a single-partition topic - addConsumerGroupExecutor(1, groupProtocol); - // note the group to be queried is a different (non-existing) group - String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", group}; - ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); - - GroupState state = service.collectGroupState(group); - assertTrue(Objects.equals(state.state, ConsumerGroupState.DEAD) && state.numMembers == 0 && - state.coordinator != null && !brokers().filter(s -> s.config().brokerId() == state.coordinator.id()).isEmpty(), - "Expected the state to be 'Dead', with no members in the group '" + group + "'." - ); + GroupProtocol groupProtocol = clusterInstance.config().serverProperties().get(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG).trim().equals("true") ? GroupProtocol.CONSUMER : GroupProtocol.CLASSIC; + try (AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(groupProtocol, PROTOCOL_GROUP, TOPIC, Collections.emptyMap()); Review Comment: Thanks for the suggestion. I found that I was only running the consumer group protocol when `group.coordinator.new.enable` is true. I have changed the code to use `ClusterInstance#supportedGroupProtocols` to check the supported protocols, so we can run all cases like before. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org