chenyulin0719 commented on code in PR #18513: URL: https://github.com/apache/kafka/pull/18513#discussion_r1928839055
########## core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala: ########## @@ -1839,250 +1839,209 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { Utils.closeQuietly(producer, "producer") } - val EMPTY_GROUP_INSTANCE_ID = "" val testGroupId = "test_group_id" val testClientId = "test_client_id" val testInstanceId1 = "test_instance_id_1" val testInstanceId2 = "test_instance_id_2" val fakeGroupId = "fake_group_id" - def createProperties(groupInstanceId: String): Properties = { - val newConsumerConfig = new Properties(consumerConfig) - // We need to disable the auto commit because after the members got removed from group, the offset commit - // will cause the member rejoining and the test will be flaky (check ConsumerCoordinator#OffsetCommitResponseHandler) - newConsumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") - newConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, testGroupId) - newConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, testClientId) - if (groupInstanceId != EMPTY_GROUP_INSTANCE_ID) { - newConsumerConfig.setProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId) - } - newConsumerConfig - } - // contains two static members and one dynamic member - val groupInstanceSet = Set(testInstanceId1, testInstanceId2, EMPTY_GROUP_INSTANCE_ID) - val consumerSet = groupInstanceSet.map { groupInstanceId => createConsumer(configOverrides = createProperties(groupInstanceId))} + val groupInstanceSet = Set(testInstanceId1, testInstanceId2, "") val topicSet = Set(testTopicName, testTopicName1, testTopicName2) + val backgroundConsumerSet = new BackgroundConsumerSet(testGroupId, testClientId, new Properties(consumerConfig)) - val latch = new CountDownLatch(consumerSet.size) - try { - def createConsumerThread[K,V](consumer: Consumer[K,V], topic: String): Thread = { - new Thread { - override def run : Unit = { - consumer.subscribe(Collections.singleton(topic)) - try { - while (true) { - consumer.poll(JDuration.ofSeconds(5)) - if (!consumer.assignment.isEmpty && latch.getCount > 0L) - latch.countDown() - try { - consumer.commitSync() - } catch { - case _: CommitFailedException => // Ignore and retry on next iteration. - } - } - } catch { - case _: InterruptException => // Suppress the output to stderr - } - } - } - } + // We need to disable the auto commit because after the members got removed from group, the offset commit + // will cause the member rejoining and the test will be flaky (check ConsumerCoordinator#OffsetCommitResponseHandler) + val configOverrides = new Properties() + configOverrides.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") + groupInstanceSet.zip(topicSet).foreach(zipped => backgroundConsumerSet.addConsumer(zipped._1, zipped._2, configOverrides)) Review Comment: Done. Move groupInstanceId to addConsumer's configOverrides property. ########## core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala: ########## @@ -1839,250 +1839,209 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { Utils.closeQuietly(producer, "producer") } - val EMPTY_GROUP_INSTANCE_ID = "" val testGroupId = "test_group_id" val testClientId = "test_client_id" val testInstanceId1 = "test_instance_id_1" val testInstanceId2 = "test_instance_id_2" val fakeGroupId = "fake_group_id" - def createProperties(groupInstanceId: String): Properties = { - val newConsumerConfig = new Properties(consumerConfig) - // We need to disable the auto commit because after the members got removed from group, the offset commit - // will cause the member rejoining and the test will be flaky (check ConsumerCoordinator#OffsetCommitResponseHandler) - newConsumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") - newConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, testGroupId) - newConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, testClientId) - if (groupInstanceId != EMPTY_GROUP_INSTANCE_ID) { - newConsumerConfig.setProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId) - } - newConsumerConfig - } - // contains two static members and one dynamic member - val groupInstanceSet = Set(testInstanceId1, testInstanceId2, EMPTY_GROUP_INSTANCE_ID) - val consumerSet = groupInstanceSet.map { groupInstanceId => createConsumer(configOverrides = createProperties(groupInstanceId))} + val groupInstanceSet = Set(testInstanceId1, testInstanceId2, "") val topicSet = Set(testTopicName, testTopicName1, testTopicName2) + val backgroundConsumerSet = new BackgroundConsumerSet(testGroupId, testClientId, new Properties(consumerConfig)) - val latch = new CountDownLatch(consumerSet.size) - try { - def createConsumerThread[K,V](consumer: Consumer[K,V], topic: String): Thread = { - new Thread { - override def run : Unit = { - consumer.subscribe(Collections.singleton(topic)) - try { - while (true) { - consumer.poll(JDuration.ofSeconds(5)) - if (!consumer.assignment.isEmpty && latch.getCount > 0L) - latch.countDown() - try { - consumer.commitSync() - } catch { - case _: CommitFailedException => // Ignore and retry on next iteration. - } - } - } catch { - case _: InterruptException => // Suppress the output to stderr - } - } - } - } + // We need to disable the auto commit because after the members got removed from group, the offset commit + // will cause the member rejoining and the test will be flaky (check ConsumerCoordinator#OffsetCommitResponseHandler) + val configOverrides = new Properties() + configOverrides.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") + groupInstanceSet.zip(topicSet).foreach(zipped => backgroundConsumerSet.addConsumer(zipped._1, zipped._2, configOverrides)) Review Comment: Done. Moved groupInstanceId to addConsumer's configOverrides property. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org