dajac commented on code in PR #18513: URL: https://github.com/apache/kafka/pull/18513#discussion_r1938990654
########## core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala: ########## @@ -4004,6 +3922,82 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { ConfigSource.DYNAMIC_TOPIC_CONFIG, false, false, Collections.emptyList(), null, null), topicConfigs.get(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG)) } + + class BackgroundConsumerSet(testGroupId: String, testClientId: String, defaultConsumerConfig: Properties) { + private val consumerSet: scala.collection.mutable.Set[Consumer[Array[Byte], Array[Byte]]] = scala.collection.mutable.Set.empty + private val consumerThreads: scala.collection.mutable.Set[Thread] = scala.collection.mutable.Set.empty + private var startLatch: CountDownLatch = new CountDownLatch(0) + private var stopLatch: CountDownLatch = new CountDownLatch(0) + private var consumerThreadRunning = new AtomicBoolean(false) + + defaultConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, testGroupId) + defaultConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, testClientId) + + def addConsumer(groupInstanceId: String, topic: String, configOverrides: Properties = new Properties()): Unit = { + val newConsumerConfig = defaultConsumerConfig.clone().asInstanceOf[Properties] + if (groupInstanceId != "") { + // static member + newConsumerConfig.setProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId) + } + newConsumerConfig.putAll(configOverrides) + + val consumer = createConsumer(configOverrides = newConsumerConfig) + val consumerThread = createConsumerThread(consumer, topic) + consumerSet.add(consumer) Review Comment: Sounds good. I have one minor thought. I wonder whether we should turn the consumer thread into an explicit class. This would allow use to access the consumer reference and hence avoid the avoid to keep two lists here. What do you think? ########## core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala: ########## @@ -1838,250 +1838,218 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { Utils.closeQuietly(producer, "producer") } - val EMPTY_GROUP_INSTANCE_ID = "" val testGroupId = "test_group_id" val testClientId = "test_client_id" val testInstanceId1 = "test_instance_id_1" val testInstanceId2 = "test_instance_id_2" val fakeGroupId = "fake_group_id" - def createProperties(groupInstanceId: String): Properties = { - val newConsumerConfig = new Properties(consumerConfig) - // We need to disable the auto commit because after the members got removed from group, the offset commit - // will cause the member rejoining and the test will be flaky (check ConsumerCoordinator#OffsetCommitResponseHandler) - newConsumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") - newConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, testGroupId) - newConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, testClientId) - if (groupInstanceId != EMPTY_GROUP_INSTANCE_ID) { - newConsumerConfig.setProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId) - } - newConsumerConfig - } - // contains two static members and one dynamic member - val groupInstanceSet = Set(testInstanceId1, testInstanceId2, EMPTY_GROUP_INSTANCE_ID) - val consumerSet = groupInstanceSet.map { groupInstanceId => createConsumer(configOverrides = createProperties(groupInstanceId))} + val groupInstanceSet = Set(testInstanceId1, testInstanceId2, "") val topicSet = Set(testTopicName, testTopicName1, testTopicName2) - val latch = new CountDownLatch(consumerSet.size) - try { - def createConsumerThread[K,V](consumer: Consumer[K,V], topic: String): Thread = { - new Thread { - override def run : Unit = { - consumer.subscribe(Collections.singleton(topic)) - try { - while (true) { - consumer.poll(JDuration.ofSeconds(5)) - if (!consumer.assignment.isEmpty && latch.getCount > 0L) - latch.countDown() - try { - consumer.commitSync() - } catch { - case _: CommitFailedException => // Ignore and retry on next iteration. - } - } - } catch { - case _: InterruptException => // Suppress the output to stderr - } - } - } + // We need to disable the auto commit because after the members got removed from group, the offset commit + // will cause the member rejoining and the test will be flaky (check ConsumerCoordinator#OffsetCommitResponseHandler) + val defaultConsumerConfig = new Properties(consumerConfig) + defaultConsumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") + defaultConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, testClientId) + + val backgroundConsumerSet = new BackgroundConsumerSet(testGroupId, defaultConsumerConfig) + groupInstanceSet.zip(topicSet).foreach { case (groupInstanceId, topic) => + val configOverrides = new Properties() + if (groupInstanceId != "") { + // static member + configOverrides.setProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId) } + backgroundConsumerSet.addConsumer(topic, configOverrides) + } - // Start consumers in a thread that will subscribe to a new group. - val consumerThreads = consumerSet.zip(topicSet).map(zipped => createConsumerThread(zipped._1, zipped._2)) + try { val groupType = if (groupProtocol.equalsIgnoreCase(GroupProtocol.CONSUMER.name)) GroupType.CONSUMER else GroupType.CLASSIC + // Start consumer polling threads in the background + backgroundConsumerSet.start() - try { - consumerThreads.foreach(_.start()) - assertTrue(latch.await(30000, TimeUnit.MILLISECONDS)) - // Test that we can list the new group. - TestUtils.waitUntilTrue(() => { - val matching = client.listConsumerGroups.all.get.asScala.filter(group => - group.groupId == testGroupId && - group.groupState.get == GroupState.STABLE) - matching.size == 1 - }, s"Expected to be able to list $testGroupId") - - TestUtils.waitUntilTrue(() => { - val options = new ListConsumerGroupsOptions().withTypes(Set(groupType).asJava) - val matching = client.listConsumerGroups(options).all.get.asScala.filter(group => - group.groupId == testGroupId && - group.groupState.get == GroupState.STABLE) - matching.size == 1 - }, s"Expected to be able to list $testGroupId in group type $groupType") - - TestUtils.waitUntilTrue(() => { - val options = new ListConsumerGroupsOptions().withTypes(Set(groupType).asJava) - .inGroupStates(Set(GroupState.STABLE).asJava) - val matching = client.listConsumerGroups(options).all.get.asScala.filter(group => + // Test that we can list the new group. + TestUtils.waitUntilTrue(() => { + val matching = client.listConsumerGroups.all.get.asScala.filter(group => group.groupId == testGroupId && - group.groupState.get == GroupState.STABLE) - matching.size == 1 - }, s"Expected to be able to list $testGroupId in group type $groupType and state Stable") - - TestUtils.waitUntilTrue(() => { - val options = new ListConsumerGroupsOptions().inGroupStates(Set(GroupState.STABLE).asJava) - val matching = client.listConsumerGroups(options).all.get.asScala.filter(group => - group.groupId == testGroupId && - group.groupState.get == GroupState.STABLE) - matching.size == 1 - }, s"Expected to be able to list $testGroupId in state Stable") - - TestUtils.waitUntilTrue(() => { - val options = new ListConsumerGroupsOptions().inGroupStates(Set(GroupState.EMPTY).asJava) - val matching = client.listConsumerGroups(options).all.get.asScala.filter( - _.groupId == testGroupId) - matching.isEmpty - }, s"Expected to find zero groups") - - val describeWithFakeGroupResult = client.describeConsumerGroups(Seq(testGroupId, fakeGroupId).asJava, - new DescribeConsumerGroupsOptions().includeAuthorizedOperations(true)) - assertEquals(2, describeWithFakeGroupResult.describedGroups().size()) - - // Test that we can get information about the test consumer group. - assertTrue(describeWithFakeGroupResult.describedGroups().containsKey(testGroupId)) - var testGroupDescription = describeWithFakeGroupResult.describedGroups().get(testGroupId).get() - if (groupType == GroupType.CLASSIC) { - assertTrue(testGroupDescription.groupEpoch.isEmpty) - assertTrue(testGroupDescription.targetAssignmentEpoch.isEmpty) - } else { - assertEquals(Optional.of(3), testGroupDescription.groupEpoch) - assertEquals(Optional.of(3), testGroupDescription.targetAssignmentEpoch) - } + group.groupState.get == GroupState.STABLE) + matching.size == 1 + }, s"Expected to be able to list $testGroupId") - assertEquals(testGroupId, testGroupDescription.groupId()) - assertFalse(testGroupDescription.isSimpleConsumerGroup) - assertEquals(groupInstanceSet.size, testGroupDescription.members().size()) - val members = testGroupDescription.members() - members.asScala.foreach { member => - assertEquals(testClientId, member.clientId) - assertEquals(if (groupType == GroupType.CLASSIC) Optional.empty else Optional.of(true), member.upgraded) - } - val topicPartitionsByTopic = members.asScala.flatMap(_.assignment().topicPartitions().asScala).groupBy(_.topic()) - topicSet.foreach { topic => - val topicPartitions = topicPartitionsByTopic.getOrElse(topic, List.empty) - assertEquals(testNumPartitions, topicPartitions.size) - } + TestUtils.waitUntilTrue(() => { + val options = new ListConsumerGroupsOptions().withTypes(Set(groupType).asJava) + val matching = client.listConsumerGroups(options).all.get.asScala.filter(group => + group.groupId == testGroupId && + group.groupState.get == GroupState.STABLE) + matching.size == 1 + }, s"Expected to be able to list $testGroupId in group type $groupType") - val expectedOperations = AclEntry.supportedOperations(ResourceType.GROUP) - assertEquals(expectedOperations, testGroupDescription.authorizedOperations()) - - // Test that the fake group throws GroupIdNotFoundException - assertTrue(describeWithFakeGroupResult.describedGroups().containsKey(fakeGroupId)) - assertFutureThrows(describeWithFakeGroupResult.describedGroups().get(fakeGroupId), classOf[GroupIdNotFoundException], - s"Group $fakeGroupId not found.") - - // Test that all() also throws GroupIdNotFoundException - assertFutureThrows(describeWithFakeGroupResult.all(), classOf[GroupIdNotFoundException], - s"Group $fakeGroupId not found.") - - val testTopicPart0 = new TopicPartition(testTopicName, 0) - - // Test listConsumerGroupOffsets - TestUtils.waitUntilTrue(() => { - val parts = client.listConsumerGroupOffsets(testGroupId).partitionsToOffsetAndMetadata().get() - parts.containsKey(testTopicPart0) && (parts.get(testTopicPart0).offset() == 1) - }, s"Expected the offset for partition 0 to eventually become 1.") - - // Test listConsumerGroupOffsets with requireStable true - val options = new ListConsumerGroupOffsetsOptions().requireStable(true) - var parts = client.listConsumerGroupOffsets(testGroupId, options) - .partitionsToOffsetAndMetadata().get() - assertTrue(parts.containsKey(testTopicPart0)) - assertEquals(1, parts.get(testTopicPart0).offset()) - - // Test listConsumerGroupOffsets with listConsumerGroupOffsetsSpec - val groupSpecs = Collections.singletonMap(testGroupId, - new ListConsumerGroupOffsetsSpec().topicPartitions(Collections.singleton(new TopicPartition(testTopicName, 0)))) - parts = client.listConsumerGroupOffsets(groupSpecs).partitionsToOffsetAndMetadata().get() - assertTrue(parts.containsKey(testTopicPart0)) - assertEquals(1, parts.get(testTopicPart0).offset()) - - // Test listConsumerGroupOffsets with listConsumerGroupOffsetsSpec and requireStable option - parts = client.listConsumerGroupOffsets(groupSpecs, options).partitionsToOffsetAndMetadata().get() - assertTrue(parts.containsKey(testTopicPart0)) - assertEquals(1, parts.get(testTopicPart0).offset()) - - // Test delete non-exist consumer instance - val invalidInstanceId = "invalid-instance-id" - var removeMembersResult = client.removeMembersFromConsumerGroup(testGroupId, new RemoveMembersFromConsumerGroupOptions( - Collections.singleton(new MemberToRemove(invalidInstanceId)) - )) - - assertFutureThrows(removeMembersResult.all, classOf[UnknownMemberIdException]) - val firstMemberFuture = removeMembersResult.memberResult(new MemberToRemove(invalidInstanceId)) - assertFutureThrows(firstMemberFuture, classOf[UnknownMemberIdException]) - - // Test consumer group deletion - var deleteResult = client.deleteConsumerGroups(Seq(testGroupId, fakeGroupId).asJava) - assertEquals(2, deleteResult.deletedGroups().size()) - - // Deleting the fake group ID should get GroupIdNotFoundException. - assertTrue(deleteResult.deletedGroups().containsKey(fakeGroupId)) - assertFutureThrows(deleteResult.deletedGroups().get(fakeGroupId), - classOf[GroupIdNotFoundException]) - - // Deleting the real group ID should get GroupNotEmptyException - assertTrue(deleteResult.deletedGroups().containsKey(testGroupId)) - assertFutureThrows(deleteResult.deletedGroups().get(testGroupId), - classOf[GroupNotEmptyException]) + TestUtils.waitUntilTrue(() => { + val options = new ListConsumerGroupsOptions().withTypes(Set(groupType).asJava) + .inGroupStates(Set(GroupState.STABLE).asJava) + val matching = client.listConsumerGroups(options).all.get.asScala.filter(group => + group.groupId == testGroupId && + group.groupState.get == GroupState.STABLE) + matching.size == 1 + }, s"Expected to be able to list $testGroupId in group type $groupType and state Stable") - // Test delete one correct static member - val removeOptions = new RemoveMembersFromConsumerGroupOptions(Collections.singleton(new MemberToRemove(testInstanceId1))) - removeOptions.reason("test remove") - removeMembersResult = client.removeMembersFromConsumerGroup(testGroupId, removeOptions) - - assertNull(removeMembersResult.all().get()) - val validMemberFuture = removeMembersResult.memberResult(new MemberToRemove(testInstanceId1)) - assertNull(validMemberFuture.get()) - - val describeTestGroupResult = client.describeConsumerGroups(Seq(testGroupId).asJava, - new DescribeConsumerGroupsOptions().includeAuthorizedOperations(true)) - assertEquals(1, describeTestGroupResult.describedGroups().size()) - - testGroupDescription = describeTestGroupResult.describedGroups().get(testGroupId).get() - - assertEquals(testGroupId, testGroupDescription.groupId) - assertFalse(testGroupDescription.isSimpleConsumerGroup) - assertEquals(consumerSet.size - 1, testGroupDescription.members().size()) - - // Delete all active members remaining (a static member + a dynamic member) - removeMembersResult = client.removeMembersFromConsumerGroup(testGroupId, new RemoveMembersFromConsumerGroupOptions()) - assertNull(removeMembersResult.all().get()) - - // The group should contain no members now. - testGroupDescription = client.describeConsumerGroups(Seq(testGroupId).asJava, - new DescribeConsumerGroupsOptions().includeAuthorizedOperations(true)) - .describedGroups().get(testGroupId).get() - assertTrue(testGroupDescription.members().isEmpty) - - // Consumer group deletion on empty group should succeed - deleteResult = client.deleteConsumerGroups(Seq(testGroupId).asJava) - assertEquals(1, deleteResult.deletedGroups().size()) - - assertTrue(deleteResult.deletedGroups().containsKey(testGroupId)) - assertNull(deleteResult.deletedGroups().get(testGroupId).get()) - - // Test alterConsumerGroupOffsets - val alterConsumerGroupOffsetsResult = client.alterConsumerGroupOffsets(testGroupId, - Collections.singletonMap(testTopicPart0, new OffsetAndMetadata(0L))) - assertNull(alterConsumerGroupOffsetsResult.all().get()) - assertNull(alterConsumerGroupOffsetsResult.partitionResult(testTopicPart0).get()) - - // Verify alterConsumerGroupOffsets success - TestUtils.waitUntilTrue(() => { - val parts = client.listConsumerGroupOffsets(testGroupId).partitionsToOffsetAndMetadata().get() - parts.containsKey(testTopicPart0) && (parts.get(testTopicPart0).offset() == 0) - }, s"Expected the offset for partition 0 to eventually become 0.") - } finally { - consumerThreads.foreach { - case consumerThread => - consumerThread.interrupt() - consumerThread.join() + TestUtils.waitUntilTrue(() => { + val options = new ListConsumerGroupsOptions().inGroupStates(Set(GroupState.STABLE).asJava) + val matching = client.listConsumerGroups(options).all.get.asScala.filter(group => + group.groupId == testGroupId && + group.groupState.get == GroupState.STABLE) + matching.size == 1 + }, s"Expected to be able to list $testGroupId in state Stable") + + TestUtils.waitUntilTrue(() => { + val options = new ListConsumerGroupsOptions().inGroupStates(Set(GroupState.EMPTY).asJava) + val matching = client.listConsumerGroups(options).all.get.asScala.filter( + _.groupId == testGroupId) + matching.isEmpty + }, s"Expected to find zero groups") + + val describeWithFakeGroupResult = client.describeConsumerGroups(Seq(testGroupId, fakeGroupId).asJava, + new DescribeConsumerGroupsOptions().includeAuthorizedOperations(true)) + assertEquals(2, describeWithFakeGroupResult.describedGroups().size()) + + // Test that we can get information about the test consumer group. + assertTrue(describeWithFakeGroupResult.describedGroups().containsKey(testGroupId)) + var testGroupDescription = describeWithFakeGroupResult.describedGroups().get(testGroupId).get() + if (groupType == GroupType.CLASSIC) { + assertTrue(testGroupDescription.groupEpoch.isEmpty) + assertTrue(testGroupDescription.targetAssignmentEpoch.isEmpty) + } else { + assertEquals(Optional.of(3), testGroupDescription.groupEpoch) + assertEquals(Optional.of(3), testGroupDescription.targetAssignmentEpoch) } - } + + assertEquals(testGroupId, testGroupDescription.groupId()) + assertFalse(testGroupDescription.isSimpleConsumerGroup) + assertEquals(groupInstanceSet.size, testGroupDescription.members().size()) + val members = testGroupDescription.members() + members.asScala.foreach { member => + assertEquals(testClientId, member.clientId) + assertEquals(if (groupType == GroupType.CLASSIC) Optional.empty else Optional.of(true), member.upgraded) + } + val topicPartitionsByTopic = members.asScala.flatMap(_.assignment().topicPartitions().asScala).groupBy(_.topic()) + topicSet.foreach { topic => + val topicPartitions = topicPartitionsByTopic.getOrElse(topic, List.empty) + assertEquals(testNumPartitions, topicPartitions.size) + } + + val expectedOperations = AclEntry.supportedOperations(ResourceType.GROUP) + assertEquals(expectedOperations, testGroupDescription.authorizedOperations()) + + // Test that the fake group throws GroupIdNotFoundException + assertTrue(describeWithFakeGroupResult.describedGroups().containsKey(fakeGroupId)) + assertFutureThrows(describeWithFakeGroupResult.describedGroups().get(fakeGroupId), classOf[GroupIdNotFoundException], + s"Group $fakeGroupId not found.") + + // Test that all() also throws GroupIdNotFoundException + assertFutureThrows(describeWithFakeGroupResult.all(), classOf[GroupIdNotFoundException], + s"Group $fakeGroupId not found.") + + val testTopicPart0 = new TopicPartition(testTopicName, 0) + + // Test listConsumerGroupOffsets + TestUtils.waitUntilTrue(() => { + val parts = client.listConsumerGroupOffsets(testGroupId).partitionsToOffsetAndMetadata().get() + parts.containsKey(testTopicPart0) && (parts.get(testTopicPart0).offset() == 1) + }, s"Expected the offset for partition 0 to eventually become 1.") + + // Test listConsumerGroupOffsets with requireStable true + val options = new ListConsumerGroupOffsetsOptions().requireStable(true) + var parts = client.listConsumerGroupOffsets(testGroupId, options) + .partitionsToOffsetAndMetadata().get() + assertTrue(parts.containsKey(testTopicPart0)) + assertEquals(1, parts.get(testTopicPart0).offset()) + + // Test listConsumerGroupOffsets with listConsumerGroupOffsetsSpec + val groupSpecs = Collections.singletonMap(testGroupId, + new ListConsumerGroupOffsetsSpec().topicPartitions(Collections.singleton(new TopicPartition(testTopicName, 0)))) + parts = client.listConsumerGroupOffsets(groupSpecs).partitionsToOffsetAndMetadata().get() + assertTrue(parts.containsKey(testTopicPart0)) + assertEquals(1, parts.get(testTopicPart0).offset()) + + // Test listConsumerGroupOffsets with listConsumerGroupOffsetsSpec and requireStable option + parts = client.listConsumerGroupOffsets(groupSpecs, options).partitionsToOffsetAndMetadata().get() + assertTrue(parts.containsKey(testTopicPart0)) + assertEquals(1, parts.get(testTopicPart0).offset()) + + // Test delete non-exist consumer instance + val invalidInstanceId = "invalid-instance-id" + var removeMembersResult = client.removeMembersFromConsumerGroup(testGroupId, new RemoveMembersFromConsumerGroupOptions( + Collections.singleton(new MemberToRemove(invalidInstanceId)) + )) + + assertFutureThrows(removeMembersResult.all, classOf[UnknownMemberIdException]) + val firstMemberFuture = removeMembersResult.memberResult(new MemberToRemove(invalidInstanceId)) + assertFutureThrows(firstMemberFuture, classOf[UnknownMemberIdException]) + + // Test consumer group deletion + var deleteResult = client.deleteConsumerGroups(Seq(testGroupId, fakeGroupId).asJava) + assertEquals(2, deleteResult.deletedGroups().size()) + + // Deleting the fake group ID should get GroupIdNotFoundException. + assertTrue(deleteResult.deletedGroups().containsKey(fakeGroupId)) + assertFutureThrows(deleteResult.deletedGroups().get(fakeGroupId), + classOf[GroupIdNotFoundException]) + + // Deleting the real group ID should get GroupNotEmptyException + assertTrue(deleteResult.deletedGroups().containsKey(testGroupId)) + assertFutureThrows(deleteResult.deletedGroups().get(testGroupId), + classOf[GroupNotEmptyException]) + + // Stop the consumer threads and close consumers to prevent rejoining. + // dynamic member will be removed, leaving two static members in the group + backgroundConsumerSet.stop() + + val describeTestGroupResult = client.describeConsumerGroups(Seq(testGroupId).asJava, + new DescribeConsumerGroupsOptions().includeAuthorizedOperations(true)) + assertEquals(1, describeTestGroupResult.describedGroups().size()) + + testGroupDescription = describeTestGroupResult.describedGroups().get(testGroupId).get() + assertEquals(testGroupId, testGroupDescription.groupId) + assertFalse(testGroupDescription.isSimpleConsumerGroup) + assertEquals(groupInstanceSet.size - 1, testGroupDescription.members().size()) + + // Test delete one correct static member + removeMembersResult = client.removeMembersFromConsumerGroup(testGroupId, + new RemoveMembersFromConsumerGroupOptions(Collections.singleton(new MemberToRemove(testInstanceId1)))) + + assertNull(removeMembersResult.all().get()) + assertNull(removeMembersResult.memberResult(new MemberToRemove(testInstanceId1)).get()) + + // Delete all active members remaining (a static member) + removeMembersResult = client.removeMembersFromConsumerGroup(testGroupId, new RemoveMembersFromConsumerGroupOptions()) Review Comment: It is tricky to reliably test this because we need to shutdown the consumers without making them leave the group. Otherwise, there is always a chance of having them rejoin the group before we run the assertions. This actually made me think that we could use `internal.leave.group.on.close` for this purpose so I am +1 for adding a small and separate integration test for this use case. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org