chenyulin0719 commented on code in PR #18513: URL: https://github.com/apache/kafka/pull/18513#discussion_r1926582730
########## core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala: ########## @@ -1864,17 +1864,20 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val consumerSet = groupInstanceSet.map { groupInstanceId => createConsumer(configOverrides = createProperties(groupInstanceId))} val topicSet = Set(testTopicName, testTopicName1, testTopicName2) - val latch = new CountDownLatch(consumerSet.size) + val startLatch = new CountDownLatch(consumerSet.size) Review Comment: Hi @dajac, That make sense to me, the duplicated code isn't elegant. I updated this PR and encapsulate the background polling threads to class BackgroundConsumerThreadsManager, and expose below functions: - startConsumerThreads - stopConsumerThreads - stopQuietly : thread/consumer cleanup . The stopQuietly() replaced the 2 level try-catch in test. The test case is unchanged. Please kindly let me know if I missed something. ########## core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala: ########## @@ -2027,26 +2033,30 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertFutureThrows(deleteResult.deletedGroups().get(testGroupId), classOf[GroupNotEmptyException]) - // Test delete one correct static member - val removeOptions = new RemoveMembersFromConsumerGroupOptions(Collections.singleton(new MemberToRemove(testInstanceId1))) - removeOptions.reason("test remove") - removeMembersResult = client.removeMembersFromConsumerGroup(testGroupId, removeOptions) - - assertNull(removeMembersResult.all().get()) - val validMemberFuture = removeMembersResult.memberResult(new MemberToRemove(testInstanceId1)) - assertNull(validMemberFuture.get()) + // Stop the consumer threads and close consumers + // dynamic member will be removed, leaving two static members in the group + consumerThreadRunning.set(false) + assertTrue(stopLatch.await(30000, TimeUnit.MILLISECONDS), "Failed to stop consumer threads in time") val describeTestGroupResult = client.describeConsumerGroups(Seq(testGroupId).asJava, new DescribeConsumerGroupsOptions().includeAuthorizedOperations(true)) assertEquals(1, describeTestGroupResult.describedGroups().size()) testGroupDescription = describeTestGroupResult.describedGroups().get(testGroupId).get() - assertEquals(testGroupId, testGroupDescription.groupId) assertFalse(testGroupDescription.isSimpleConsumerGroup) assertEquals(consumerSet.size - 1, testGroupDescription.members().size()) - // Delete all active members remaining (a static member + a dynamic member) + // Test delete one correct static member + val removeOptions = new RemoveMembersFromConsumerGroupOptions(Collections.singleton(new MemberToRemove(testInstanceId1))) + removeOptions.reason("test remove") Review Comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org