hachikuji commented on a change in pull request #8238:
URL: https://github.com/apache/kafka/pull/8238#discussion_r432178706



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java
##########
@@ -31,8 +37,21 @@
      * @param isSimpleConsumerGroup If consumer group is simple or not.
      */
     public ConsumerGroupListing(String groupId, boolean isSimpleConsumerGroup) 
{
+        this(groupId, isSimpleConsumerGroup, Optional.empty());
+    }
+
+    /**
+     * Create an instance with the specified parameters.
+     *
+     * @param groupId Group Id
+     * @param isSimpleConsumerGroup If consumer group is simple or not.
+     * @param state The state of the consumer group
+     */
+    public ConsumerGroupListing(String groupId, boolean isSimpleConsumerGroup, 
Optional<ConsumerGroupState> state) {
+        Objects.requireNonNull(state);
         this.groupId = groupId;
         this.isSimpleConsumerGroup = isSimpleConsumerGroup;
+        this.state = state;

Review comment:
       nit: usually we would write this as `this.state = requireNonNull(state);`

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java
##########
@@ -26,4 +31,34 @@
  */
 @InterfaceStability.Evolving
 public class ListConsumerGroupsOptions extends 
AbstractOptions<ListConsumerGroupsOptions> {
+
+    private Optional<Set<ConsumerGroupState>> states = Optional.empty();
+
+    /**
+     * Only groups in these states will be returned by listConsumerGroups()

Review comment:
       Can you address this comment?

##########
File path: 
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
##########
@@ -1061,10 +1061,27 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
           assertTrue(latch.await(30000, TimeUnit.MILLISECONDS))
           // Test that we can list the new group.
           TestUtils.waitUntilTrue(() => {
-            val matching = 
client.listConsumerGroups.all.get().asScala.filter(_.groupId == testGroupId)
-            matching.nonEmpty
+            val matching = 
client.listConsumerGroups.all.get.asScala.filter(group =>
+                group.groupId == testGroupId &&
+                group.state.get == ConsumerGroupState.STABLE)
+            matching.size == 1
           }, s"Expected to be able to list $testGroupId")
 
+          TestUtils.waitUntilTrue(() => {
+            val options = new ListConsumerGroupsOptions()

Review comment:
       This seems the same as the previous case. It might be more interesting 
if we request only groups that are stable? Then we have covered both successful 
and unsuccessful matching.

##########
File path: core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
##########
@@ -44,4 +48,100 @@ class ListConsumerGroupTest extends 
ConsumerGroupCommandTest {
     val cgcArgs = Array("--new-consumer", "--bootstrap-server", brokerList, 
"--list")
     getConsumerGroupService(cgcArgs)
   }
+
+  @Test
+  def testListConsumerGroupsWithStates(): Unit = {
+    val simpleGroup = "simple-group"
+    addSimpleGroupExecutor(group = simpleGroup)
+    addConsumerGroupExecutor(numConsumers = 1)
+
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--list", "--state")
+    val service = getConsumerGroupService(cgcArgs)
+
+    val expectedListing = Set(
+        new ConsumerGroupListing(simpleGroup, true, 
Optional.of(ConsumerGroupState.EMPTY)),

Review comment:
       nit: looks misaligned

##########
File path: core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
##########
@@ -44,4 +48,100 @@ class ListConsumerGroupTest extends 
ConsumerGroupCommandTest {
     val cgcArgs = Array("--new-consumer", "--bootstrap-server", brokerList, 
"--list")
     getConsumerGroupService(cgcArgs)
   }
+
+  @Test
+  def testListConsumerGroupsWithStates(): Unit = {
+    val simpleGroup = "simple-group"
+    addSimpleGroupExecutor(group = simpleGroup)
+    addConsumerGroupExecutor(numConsumers = 1)
+
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--list", "--state")
+    val service = getConsumerGroupService(cgcArgs)
+
+    val expectedListing = Set(
+        new ConsumerGroupListing(simpleGroup, true, 
Optional.of(ConsumerGroupState.EMPTY)),
+        new ConsumerGroupListing(group, false, 
Optional.of(ConsumerGroupState.STABLE)))
+
+    var foundListing = Set.empty[ConsumerGroupListing]
+    TestUtils.waitUntilTrue(() => {
+      foundListing = 
service.listConsumerGroupsWithState(ConsumerGroupState.values.toSet).toSet
+      expectedListing == foundListing
+    }, s"Expected to show groups $expectedListing, but found $foundListing")
+
+    val expectedListingStable = Set(
+        new ConsumerGroupListing(group, false, 
Optional.of(ConsumerGroupState.STABLE)))
+
+    foundListing = Set.empty[ConsumerGroupListing]
+    TestUtils.waitUntilTrue(() => {
+      foundListing = 
service.listConsumerGroupsWithState(Set(ConsumerGroupState.STABLE)).toSet
+      expectedListingStable == foundListing
+    }, s"Expected to show groups $expectedListingStable, but found 
$foundListing")
+  }
+
+  @Test
+  def testConsumerGroupStatesFromString(): Unit = {
+    var result = ConsumerGroupCommand.consumerGroupStatesFromString("Stable")
+    assertEquals(Set(ConsumerGroupState.STABLE), result)
+
+    result = ConsumerGroupCommand.consumerGroupStatesFromString("Stable, 
PreparingRebalance")
+    assertEquals(Set(ConsumerGroupState.STABLE, 
ConsumerGroupState.PREPARING_REBALANCE), result)
+
+    result = 
ConsumerGroupCommand.consumerGroupStatesFromString("Dead,CompletingRebalance,")
+    assertEquals(Set(ConsumerGroupState.DEAD, 
ConsumerGroupState.COMPLETING_REBALANCE), result)
+
+    try {
+      ConsumerGroupCommand.consumerGroupStatesFromString("bad, wrong")

Review comment:
       nit: we could use `assertThrows` or `intercept` for all of these

##########
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##########
@@ -41,26 +41,37 @@ import org.apache.kafka.common.protocol.Errors
 import scala.collection.immutable.TreeMap
 import scala.reflect.ClassTag
 import org.apache.kafka.common.requests.ListOffsetResponse
+import org.apache.kafka.common.ConsumerGroupState
+import joptsimple.OptionException
 
 object ConsumerGroupCommand extends Logging {
 
+  val allStates = ConsumerGroupState.values.toList

Review comment:
       Is this used?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to