chenyulin0719 commented on code in PR #18513:
URL: https://github.com/apache/kafka/pull/18513#discussion_r1928839828


##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -4004,6 +3922,82 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       ConfigSource.DYNAMIC_TOPIC_CONFIG, false, false, 
Collections.emptyList(), null, null),
       topicConfigs.get(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG))
   }
+
+  class BackgroundConsumerSet(testGroupId: String, testClientId: String, 
defaultConsumerConfig: Properties) {
+    private val consumerSet: 
scala.collection.mutable.Set[Consumer[Array[Byte], Array[Byte]]] = 
scala.collection.mutable.Set.empty
+    private val consumerThreads: scala.collection.mutable.Set[Thread] = 
scala.collection.mutable.Set.empty
+    private var startLatch: CountDownLatch = new CountDownLatch(0)
+    private var stopLatch: CountDownLatch = new CountDownLatch(0)
+    private var consumerThreadRunning = new AtomicBoolean(false)
+
+    defaultConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
testGroupId)
+    defaultConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, 
testClientId)
+
+    def addConsumer(groupInstanceId: String, topic: String, configOverrides: 
Properties = new Properties()): Unit = {
+      val newConsumerConfig = 
defaultConsumerConfig.clone().asInstanceOf[Properties]

Review Comment:
   `new Properties(defaultConsumerConfig)` doesn't work here. It only registers 
`defaultConsumerConfig` as the fallback defaults (a shadow property map) of the 
new object; no entries are copied into it.  
   
   However, in the implementation of `IntegrationTestHarness#createConsumer` [1], 
`props ++= configOverrides` [2] only combines the key/value pairs that are 
explicitly set, so the shadow property map is ignored. As a result, the 
`defaultConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
testGroupId)` call in the test would have no effect.
   
   I think we should keep using `clone()`. @dajac WDYT?
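   
   A minimal sketch of the difference, using only `java.util.Properties`. The 
`merge` helper is a hypothetical stand-in for the `++=` step (it is assumed to 
behave like an upcast followed by `putAll`), and the key and value are 
illustrative rather than taken from the test:

```scala
import java.util.Properties

object PropertiesCopyDemo {
  // Hypothetical stand-in for `props ++= configOverrides`:
  // putAll copies only the entries stored directly in `overrides`,
  // never the ones reachable through its defaults.
  def merge(overrides: Properties): Properties = {
    val merged = new Properties()
    (merged: java.util.Hashtable[AnyRef, AnyRef]).putAll(overrides)
    merged
  }

  def main(args: Array[String]): Unit = {
    val defaults = new Properties()
    defaults.setProperty("group.id", "test-group")

    // Registers `defaults` as a fallback; the new table itself stays empty.
    val layered = new Properties(defaults)
    // Copies the entries of `defaults` into a new table.
    val cloned = defaults.clone().asInstanceOf[Properties]

    println(layered.getProperty("group.id"))        // test-group (fallback lookup)
    println(merge(layered).getProperty("group.id")) // null (entry lost in the merge)
    println(merge(cloned).getProperty("group.id"))  // test-group
  }
}
```

   The second line is the failure mode described above: the group id is 
visible through `getProperty` on the layered object, but it does not survive 
the merge.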
   
   [1] 
https://github.com/apache/kafka/blob/5d81fe20c836670cbb763f6976ab3e59957feb34/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala#L194
   [2] 
https://github.com/apache/kafka/blob/5d81fe20c836670cbb763f6976ab3e59957feb34/core/src/main/scala/kafka/utils/Implicits.scala#L39-L42
   


