chenyulin0719 commented on code in PR #18513: URL: https://github.com/apache/kafka/pull/18513#discussion_r1928839828
########## core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala: ########## @@ -4004,6 +3922,82 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { ConfigSource.DYNAMIC_TOPIC_CONFIG, false, false, Collections.emptyList(), null, null), topicConfigs.get(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG)) } + + class BackgroundConsumerSet(testGroupId: String, testClientId: String, defaultConsumerConfig: Properties) { + private val consumerSet: scala.collection.mutable.Set[Consumer[Array[Byte], Array[Byte]]] = scala.collection.mutable.Set.empty + private val consumerThreads: scala.collection.mutable.Set[Thread] = scala.collection.mutable.Set.empty + private var startLatch: CountDownLatch = new CountDownLatch(0) + private var stopLatch: CountDownLatch = new CountDownLatch(0) + private var consumerThreadRunning = new AtomicBoolean(false) + + defaultConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, testGroupId) + defaultConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, testClientId) + + def addConsumer(groupInstanceId: String, topic: String, configOverrides: Properties = new Properties()): Unit = { + val newConsumerConfig = defaultConsumerConfig.clone().asInstanceOf[Properties] Review Comment: `new Properties(defaultConsumerConfig)` doesn't work here. It simply assigns `defaultConsumerConfig` as the shadow (defaults) property map of the new `Properties` object. However, in the implementation of `IntegrationTestHarness#createConsumer`, `props ++= configOverrides` only combines the key/value pairs that are explicitly set, so the shadow property map will be ignored. As a result, the `defaultConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, testGroupId)` in the test will be ignored. I think we should keep using `clone()`. WDYT? 
[1] https://github.com/apache/kafka/blob/5d81fe20c836670cbb763f6976ab3e59957feb34/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala#L194 [2] https://github.com/apache/kafka/blob/5d81fe20c836670cbb763f6976ab3e59957feb34/core/src/main/scala/kafka/utils/Implicits.scala#L39-L42 ########## core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala: ########## @@ -4004,6 +3922,82 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { ConfigSource.DYNAMIC_TOPIC_CONFIG, false, false, Collections.emptyList(), null, null), topicConfigs.get(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG)) } + + class BackgroundConsumerSet(testGroupId: String, testClientId: String, defaultConsumerConfig: Properties) { + private val consumerSet: scala.collection.mutable.Set[Consumer[Array[Byte], Array[Byte]]] = scala.collection.mutable.Set.empty + private val consumerThreads: scala.collection.mutable.Set[Thread] = scala.collection.mutable.Set.empty + private var startLatch: CountDownLatch = new CountDownLatch(0) + private var stopLatch: CountDownLatch = new CountDownLatch(0) + private var consumerThreadRunning = new AtomicBoolean(false) + + defaultConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, testGroupId) + defaultConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, testClientId) + + def addConsumer(groupInstanceId: String, topic: String, configOverrides: Properties = new Properties()): Unit = { + val newConsumerConfig = defaultConsumerConfig.clone().asInstanceOf[Properties] Review Comment: `new Properties(defaultConsumerConfig)` doesn't work here. It simply assigns `defaultConsumerConfig` as the shadow (defaults) property map of the new `Properties` object. However, in the implementation of `IntegrationTestHarness#createConsumer`, `props ++= configOverrides` only combines the key/value pairs that are explicitly set, so the shadow property map will be ignored. 
As a result, the `defaultConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, testGroupId)` in the test will be ignored. I think we should keep using `clone()`. @dajac WDYT? [1] https://github.com/apache/kafka/blob/5d81fe20c836670cbb763f6976ab3e59957feb34/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala#L194 [2] https://github.com/apache/kafka/blob/5d81fe20c836670cbb763f6976ab3e59957feb34/core/src/main/scala/kafka/utils/Implicits.scala#L39-L42 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org