hachikuji commented on a change in pull request #8238: URL: https://github.com/apache/kafka/pull/8238#discussion_r432178706
########## File path: clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java ########## @@ -31,8 +37,21 @@ * @param isSimpleConsumerGroup If consumer group is simple or not. */ public ConsumerGroupListing(String groupId, boolean isSimpleConsumerGroup) { + this(groupId, isSimpleConsumerGroup, Optional.empty()); + } + + /** + * Create an instance with the specified parameters. + * + * @param groupId Group Id + * @param isSimpleConsumerGroup If consumer group is simple or not. + * @param state The state of the consumer group + */ + public ConsumerGroupListing(String groupId, boolean isSimpleConsumerGroup, Optional<ConsumerGroupState> state) { + Objects.requireNonNull(state); this.groupId = groupId; this.isSimpleConsumerGroup = isSimpleConsumerGroup; + this.state = state; Review comment: nit: usually we would write this as `this.state = requireNonNull(state);` ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java ########## @@ -26,4 +31,34 @@ */ @InterfaceStability.Evolving public class ListConsumerGroupsOptions extends AbstractOptions<ListConsumerGroupsOptions> { + + private Optional<Set<ConsumerGroupState>> states = Optional.empty(); + + /** + * Only groups in these states will be returned by listConsumerGroups() Review comment: Can you address this comment? ########## File path: core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala ########## @@ -1061,10 +1061,27 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertTrue(latch.await(30000, TimeUnit.MILLISECONDS)) // Test that we can list the new group. 
TestUtils.waitUntilTrue(() => { - val matching = client.listConsumerGroups.all.get().asScala.filter(_.groupId == testGroupId) - matching.nonEmpty + val matching = client.listConsumerGroups.all.get.asScala.filter(group => + group.groupId == testGroupId && + group.state.get == ConsumerGroupState.STABLE) + matching.size == 1 }, s"Expected to be able to list $testGroupId") + TestUtils.waitUntilTrue(() => { + val options = new ListConsumerGroupsOptions() Review comment: This seems the same as the previous case. It might be more interesting if we request only groups that are stable? Then we have covered both successful and unsuccessful matching. ########## File path: core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala ########## @@ -44,4 +48,100 @@ class ListConsumerGroupTest extends ConsumerGroupCommandTest { val cgcArgs = Array("--new-consumer", "--bootstrap-server", brokerList, "--list") getConsumerGroupService(cgcArgs) } + + @Test + def testListConsumerGroupsWithStates(): Unit = { + val simpleGroup = "simple-group" + addSimpleGroupExecutor(group = simpleGroup) + addConsumerGroupExecutor(numConsumers = 1) + + val cgcArgs = Array("--bootstrap-server", brokerList, "--list", "--state") + val service = getConsumerGroupService(cgcArgs) + + val expectedListing = Set( + new ConsumerGroupListing(simpleGroup, true, Optional.of(ConsumerGroupState.EMPTY)), Review comment: nit: looks misaligned ########## File path: core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala ########## @@ -44,4 +48,100 @@ class ListConsumerGroupTest extends ConsumerGroupCommandTest { val cgcArgs = Array("--new-consumer", "--bootstrap-server", brokerList, "--list") getConsumerGroupService(cgcArgs) } + + @Test + def testListConsumerGroupsWithStates(): Unit = { + val simpleGroup = "simple-group" + addSimpleGroupExecutor(group = simpleGroup) + addConsumerGroupExecutor(numConsumers = 1) + + val cgcArgs = Array("--bootstrap-server", brokerList, "--list", "--state") + val service = 
getConsumerGroupService(cgcArgs) + + val expectedListing = Set( + new ConsumerGroupListing(simpleGroup, true, Optional.of(ConsumerGroupState.EMPTY)), + new ConsumerGroupListing(group, false, Optional.of(ConsumerGroupState.STABLE))) + + var foundListing = Set.empty[ConsumerGroupListing] + TestUtils.waitUntilTrue(() => { + foundListing = service.listConsumerGroupsWithState(ConsumerGroupState.values.toSet).toSet + expectedListing == foundListing + }, s"Expected to show groups $expectedListing, but found $foundListing") + + val expectedListingStable = Set( + new ConsumerGroupListing(group, false, Optional.of(ConsumerGroupState.STABLE))) + + foundListing = Set.empty[ConsumerGroupListing] + TestUtils.waitUntilTrue(() => { + foundListing = service.listConsumerGroupsWithState(Set(ConsumerGroupState.STABLE)).toSet + expectedListingStable == foundListing + }, s"Expected to show groups $expectedListingStable, but found $foundListing") + } + + @Test + def testConsumerGroupStatesFromString(): Unit = { + var result = ConsumerGroupCommand.consumerGroupStatesFromString("Stable") + assertEquals(Set(ConsumerGroupState.STABLE), result) + + result = ConsumerGroupCommand.consumerGroupStatesFromString("Stable, PreparingRebalance") + assertEquals(Set(ConsumerGroupState.STABLE, ConsumerGroupState.PREPARING_REBALANCE), result) + + result = ConsumerGroupCommand.consumerGroupStatesFromString("Dead,CompletingRebalance,") + assertEquals(Set(ConsumerGroupState.DEAD, ConsumerGroupState.COMPLETING_REBALANCE), result) + + try { + ConsumerGroupCommand.consumerGroupStatesFromString("bad, wrong") Review comment: nit: we could use `assertThrows` or `intercept` for all of these ########## File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala ########## @@ -41,26 +41,37 @@ import org.apache.kafka.common.protocol.Errors import scala.collection.immutable.TreeMap import scala.reflect.ClassTag import org.apache.kafka.common.requests.ListOffsetResponse +import 
org.apache.kafka.common.ConsumerGroupState +import joptsimple.OptionException object ConsumerGroupCommand extends Logging { + val allStates = ConsumerGroupState.values.toList Review comment: Is this used? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org