FrankYang0529 commented on code in PR #15908:
URL: https://github.com/apache/kafka/pull/15908#discussion_r1614679946


##########
tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java:
##########
@@ -141,699 +162,773 @@ public void testPrintVersion(String quorum) {
         }
     }
 
-    @ParameterizedTest(name = 
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
-    @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
-    public void testDescribeOffsetsOfNonExistingGroup(String quorum, String 
groupProtocol) throws Exception {
-        String group = "missing.group";
-        createOffsetsTopic(listenerName(), new Properties());
+    @ClusterTemplate("generator")
+    public void testDescribeOffsetsOfNonExistingGroup() throws Exception {
+        String missingGroup = "missing.group";
+        createTopic(TOPIC);
 
         // run one consumer in the group consuming from a single-partition 
topic
-        addConsumerGroupExecutor(1, groupProtocol);
-        // note the group to be queried is a different (non-existing) group
-        String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--describe", "--group", group};
-        ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(cgcArgs);
-
-        Entry<Optional<ConsumerGroupState>, 
Optional<Collection<PartitionAssignmentState>>> res = 
service.collectGroupOffsets(group);
-        assertTrue(res.getKey().map(s -> 
s.equals(ConsumerGroupState.DEAD)).orElse(false) && 
res.getValue().map(Collection::isEmpty).orElse(false),
-            "Expected the state to be 'Dead', with no members in the group '" 
+ group + "'.");
+        GroupProtocol groupProtocol = 
clusterInstance.config().serverProperties().get(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG).trim().equals("true")
 ? GroupProtocol.CONSUMER : GroupProtocol.CLASSIC;
+        try (AutoCloseable protocolConsumerGroupExecutor = 
consumerGroupClosable(groupProtocol, PROTOCOL_GROUP, TOPIC, 
Collections.emptyMap());
+             // note the group to be queried is a different (non-existing) 
group
+             ConsumerGroupCommand.ConsumerGroupService service = 
consumerGroupService(new String[]{"--bootstrap-server", 
clusterInstance.bootstrapServers(), "--describe", "--group", missingGroup});
+        ) {
+            Entry<Optional<ConsumerGroupState>, 
Optional<Collection<PartitionAssignmentState>>> res = 
service.collectGroupOffsets(missingGroup);
+            assertTrue(res.getKey().map(s -> 
s.equals(ConsumerGroupState.DEAD)).orElse(false) && 
res.getValue().map(Collection::isEmpty).orElse(false),
+                    "Expected the state to be 'Dead', with no members in the 
group '" + missingGroup + "'.");
+        }
     }
 
-    @ParameterizedTest(name = 
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
-    @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
-    public void testDescribeMembersOfNonExistingGroup(String quorum, String 
groupProtocol) throws Exception {
+    @ClusterTemplate("generator")
+    public void testDescribeMembersOfNonExistingGroup() throws Exception {
         String group = "missing.group";
-        createOffsetsTopic(listenerName(), new Properties());
+        createTopic(TOPIC);
 
         // run one consumer in the group consuming from a single-partition 
topic
-        addConsumerGroupExecutor(1, groupProtocol);
-        // note the group to be queried is a different (non-existing) group
-        String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--describe", "--group", group};
-        ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(cgcArgs);
-
-        Entry<Optional<ConsumerGroupState>, 
Optional<Collection<MemberAssignmentState>>> res = 
service.collectGroupMembers(group, false);
-        assertTrue(res.getKey().map(s -> 
s.equals(ConsumerGroupState.DEAD)).orElse(false) && 
res.getValue().map(Collection::isEmpty).orElse(false),
-            "Expected the state to be 'Dead', with no members in the group '" 
+ group + "'.");
-
-        Entry<Optional<ConsumerGroupState>, 
Optional<Collection<MemberAssignmentState>>> res2 = 
service.collectGroupMembers(group, true);
-        assertTrue(res2.getKey().map(s -> 
s.equals(ConsumerGroupState.DEAD)).orElse(false) && 
res2.getValue().map(Collection::isEmpty).orElse(false),
-            "Expected the state to be 'Dead', with no members in the group '" 
+ group + "' (verbose option).");
+        GroupProtocol groupProtocol = 
clusterInstance.config().serverProperties().get(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG).trim().equals("true")
 ? GroupProtocol.CONSUMER : GroupProtocol.CLASSIC;
+        try (AutoCloseable protocolConsumerGroupExecutor = 
consumerGroupClosable(groupProtocol, PROTOCOL_GROUP, TOPIC, 
Collections.emptyMap());
+             // note the group to be queried is a different (non-existing) 
group
+             ConsumerGroupCommand.ConsumerGroupService service = 
consumerGroupService(new String[]{"--bootstrap-server", 
clusterInstance.bootstrapServers(), "--describe", "--group", group});
+        ) {
+            Entry<Optional<ConsumerGroupState>, 
Optional<Collection<MemberAssignmentState>>> res = 
service.collectGroupMembers(group, false);
+            assertTrue(res.getKey().map(s -> 
s.equals(ConsumerGroupState.DEAD)).orElse(false) && 
res.getValue().map(Collection::isEmpty).orElse(false),
+                    "Expected the state to be 'Dead', with no members in the 
group '" + group + "'.");
+
+            Entry<Optional<ConsumerGroupState>, 
Optional<Collection<MemberAssignmentState>>> res2 = 
service.collectGroupMembers(group, true);
+            assertTrue(res2.getKey().map(s -> 
s.equals(ConsumerGroupState.DEAD)).orElse(false) && 
res2.getValue().map(Collection::isEmpty).orElse(false),
+                    "Expected the state to be 'Dead', with no members in the 
group '" + group + "' (verbose option).");
+        }
     }
 
-    @ParameterizedTest(name = 
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
-    @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
-    public void testDescribeStateOfNonExistingGroup(String quorum, String 
groupProtocol) throws Exception {
+    @ClusterTemplate("generator")
+    public void testDescribeStateOfNonExistingGroup() throws Exception {
         String group = "missing.group";
-        createOffsetsTopic(listenerName(), new Properties());
+        createTopic(TOPIC);
 
         // run one consumer in the group consuming from a single-partition 
topic
-        addConsumerGroupExecutor(1, groupProtocol);
-        // note the group to be queried is a different (non-existing) group
-        String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--describe", "--group", group};
-        ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(cgcArgs);
-
-        GroupState state = service.collectGroupState(group);
-        assertTrue(Objects.equals(state.state, ConsumerGroupState.DEAD) && 
state.numMembers == 0 &&
-                state.coordinator != null && !brokers().filter(s -> 
s.config().brokerId() == state.coordinator.id()).isEmpty(),
-            "Expected the state to be 'Dead', with no members in the group '" 
+ group + "'."
-        );
+        GroupProtocol groupProtocol = 
clusterInstance.config().serverProperties().get(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG).trim().equals("true")
 ? GroupProtocol.CONSUMER : GroupProtocol.CLASSIC;
+        try (AutoCloseable protocolConsumerGroupExecutor = 
consumerGroupClosable(groupProtocol, PROTOCOL_GROUP, TOPIC, 
Collections.emptyMap());

Review Comment:
   Thanks for the suggestion. I found that I only run consumer group protocol 
when `group.coordinator.new.enable` is true. I change to use 
`ClusterInstance#supportedGroupProtocols` to check supported protocols, so we 
can run all cases like before. Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to