showuon commented on code in PR #15908:
URL: https://github.com/apache/kafka/pull/15908#discussion_r1609296852


##########
tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java:
##########
@@ -92,6 +94,7 @@ static void generator(ClusterGenerator clusterGenerator) {
     static <T> AutoCloseable buildConsumers(int numberOfConsumers,
                                             boolean syncCommit,
                                             String topic,
+                                            Collection<TopicPartition> 
topicPartitions,

Review Comment:
   nit: Change the parameter type to `Set` to be more specific.



##########
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java:
##########
@@ -311,6 +311,7 @@ private AutoCloseable consumerGroupClosable(ClusterInstance 
cluster, GroupProtoc
                 1,
                 false,
                 topicName,
+                null,

Review Comment:
   nit: Could we use an empty set instead of `null` here?



##########
tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java:
##########
@@ -141,699 +162,773 @@ public void testPrintVersion(String quorum) {
         }
     }
 
-    @ParameterizedTest(name = 
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
-    @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
-    public void testDescribeOffsetsOfNonExistingGroup(String quorum, String 
groupProtocol) throws Exception {
-        String group = "missing.group";
-        createOffsetsTopic(listenerName(), new Properties());
+    @ClusterTemplate("generator")
+    public void testDescribeOffsetsOfNonExistingGroup() throws Exception {
+        String missingGroup = "missing.group";
+        createTopic(TOPIC);
 
         // run one consumer in the group consuming from a single-partition 
topic
-        addConsumerGroupExecutor(1, groupProtocol);
-        // note the group to be queried is a different (non-existing) group
-        String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--describe", "--group", group};
-        ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(cgcArgs);
-
-        Entry<Optional<ConsumerGroupState>, 
Optional<Collection<PartitionAssignmentState>>> res = 
service.collectGroupOffsets(group);
-        assertTrue(res.getKey().map(s -> 
s.equals(ConsumerGroupState.DEAD)).orElse(false) && 
res.getValue().map(Collection::isEmpty).orElse(false),
-            "Expected the state to be 'Dead', with no members in the group '" 
+ group + "'.");
+        GroupProtocol groupProtocol = 
clusterInstance.config().serverProperties().get(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG).trim().equals("true")
 ? GroupProtocol.CONSUMER : GroupProtocol.CLASSIC;
+        try (AutoCloseable protocolConsumerGroupExecutor = 
consumerGroupClosable(groupProtocol, PROTOCOL_GROUP, TOPIC, 
Collections.emptyMap());
+             // note the group to be queried is a different (non-existing) 
group
+             ConsumerGroupCommand.ConsumerGroupService service = 
consumerGroupService(new String[]{"--bootstrap-server", 
clusterInstance.bootstrapServers(), "--describe", "--group", missingGroup});
+        ) {
+            Entry<Optional<ConsumerGroupState>, 
Optional<Collection<PartitionAssignmentState>>> res = 
service.collectGroupOffsets(missingGroup);
+            assertTrue(res.getKey().map(s -> 
s.equals(ConsumerGroupState.DEAD)).orElse(false) && 
res.getValue().map(Collection::isEmpty).orElse(false),
+                    "Expected the state to be 'Dead', with no members in the 
group '" + missingGroup + "'.");
+        }
     }
 
-    @ParameterizedTest(name = 
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
-    @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
-    public void testDescribeMembersOfNonExistingGroup(String quorum, String 
groupProtocol) throws Exception {
+    @ClusterTemplate("generator")
+    public void testDescribeMembersOfNonExistingGroup() throws Exception {
         String group = "missing.group";
-        createOffsetsTopic(listenerName(), new Properties());
+        createTopic(TOPIC);
 
         // run one consumer in the group consuming from a single-partition 
topic
-        addConsumerGroupExecutor(1, groupProtocol);
-        // note the group to be queried is a different (non-existing) group
-        String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--describe", "--group", group};
-        ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(cgcArgs);
-
-        Entry<Optional<ConsumerGroupState>, 
Optional<Collection<MemberAssignmentState>>> res = 
service.collectGroupMembers(group, false);
-        assertTrue(res.getKey().map(s -> 
s.equals(ConsumerGroupState.DEAD)).orElse(false) && 
res.getValue().map(Collection::isEmpty).orElse(false),
-            "Expected the state to be 'Dead', with no members in the group '" 
+ group + "'.");
-
-        Entry<Optional<ConsumerGroupState>, 
Optional<Collection<MemberAssignmentState>>> res2 = 
service.collectGroupMembers(group, true);
-        assertTrue(res2.getKey().map(s -> 
s.equals(ConsumerGroupState.DEAD)).orElse(false) && 
res2.getValue().map(Collection::isEmpty).orElse(false),
-            "Expected the state to be 'Dead', with no members in the group '" 
+ group + "' (verbose option).");
+        GroupProtocol groupProtocol = 
clusterInstance.config().serverProperties().get(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG).trim().equals("true")
 ? GroupProtocol.CONSUMER : GroupProtocol.CLASSIC;
+        try (AutoCloseable protocolConsumerGroupExecutor = 
consumerGroupClosable(groupProtocol, PROTOCOL_GROUP, TOPIC, 
Collections.emptyMap());
+             // note the group to be queried is a different (non-existing) 
group
+             ConsumerGroupCommand.ConsumerGroupService service = 
consumerGroupService(new String[]{"--bootstrap-server", 
clusterInstance.bootstrapServers(), "--describe", "--group", group});
+        ) {
+            Entry<Optional<ConsumerGroupState>, 
Optional<Collection<MemberAssignmentState>>> res = 
service.collectGroupMembers(group, false);
+            assertTrue(res.getKey().map(s -> 
s.equals(ConsumerGroupState.DEAD)).orElse(false) && 
res.getValue().map(Collection::isEmpty).orElse(false),
+                    "Expected the state to be 'Dead', with no members in the 
group '" + group + "'.");
+
+            Entry<Optional<ConsumerGroupState>, 
Optional<Collection<MemberAssignmentState>>> res2 = 
service.collectGroupMembers(group, true);
+            assertTrue(res2.getKey().map(s -> 
s.equals(ConsumerGroupState.DEAD)).orElse(false) && 
res2.getValue().map(Collection::isEmpty).orElse(false),
+                    "Expected the state to be 'Dead', with no members in the 
group '" + group + "' (verbose option).");
+        }
     }
 
-    @ParameterizedTest(name = 
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
-    @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
-    public void testDescribeStateOfNonExistingGroup(String quorum, String 
groupProtocol) throws Exception {
+    @ClusterTemplate("generator")
+    public void testDescribeStateOfNonExistingGroup() throws Exception {
         String group = "missing.group";
-        createOffsetsTopic(listenerName(), new Properties());
+        createTopic(TOPIC);
 
         // run one consumer in the group consuming from a single-partition 
topic
-        addConsumerGroupExecutor(1, groupProtocol);
-        // note the group to be queried is a different (non-existing) group
-        String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--describe", "--group", group};
-        ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(cgcArgs);
-
-        GroupState state = service.collectGroupState(group);
-        assertTrue(Objects.equals(state.state, ConsumerGroupState.DEAD) && 
state.numMembers == 0 &&
-                state.coordinator != null && !brokers().filter(s -> 
s.config().brokerId() == state.coordinator.id()).isEmpty(),
-            "Expected the state to be 'Dead', with no members in the group '" 
+ group + "'."
-        );
+        GroupProtocol groupProtocol = 
clusterInstance.config().serverProperties().get(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG).trim().equals("true")
 ? GroupProtocol.CONSUMER : GroupProtocol.CLASSIC;
+        try (AutoCloseable protocolConsumerGroupExecutor = 
consumerGroupClosable(groupProtocol, PROTOCOL_GROUP, TOPIC, 
Collections.emptyMap());

Review Comment:
   It looks to me that we could move the logic that determines `groupProtocol` 
into the `consumerGroupClosable` method, so that we don't have to repeat the 
same thing in each test. E.g., add a `consumerGroupClosable` overload that takes 
`clusterInstance.config().serverProperties()` in place of `groupProtocol`, so 
that the groupProtocol determination happens inside `consumerGroupClosable`:
   
   ```
   private AutoCloseable consumerGroupClosable(Map<String, String> 
serverProperties, GroupProtocol protocol, String groupId, String topicName) {
       GroupProtocol groupProtocol = 
serverProperties.get(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG).trim().equals("true")
 ? GroupProtocol.CONSUMER : GroupProtocol.CLASSIC;
       return consumerGroupClosable(groupProtocol, ...);
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to