dajac commented on code in PR #18513:
URL: https://github.com/apache/kafka/pull/18513#discussion_r1926723634


##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -4004,6 +3910,79 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       ConfigSource.DYNAMIC_TOPIC_CONFIG, false, false, 
Collections.emptyList(), null, null),
       topicConfigs.get(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG))
   }
+
+  class BackgroundConsumerThreadsManager(testGroupId: String, testClientId: 
String, groupInstanceSet: Set[String], topicSet: Set[String]){
+    private val consumerSet: Set[Consumer[Array[Byte], Array[Byte]]] = 
groupInstanceSet.map { groupInstanceId =>
+      createConsumer(configOverrides = createProperties(groupInstanceId))
+    }
+    private val consumerThreads = consumerSet.zip(topicSet).map(zipped => 
createConsumerThread(zipped._1, zipped._2))
+    private val startLatch: CountDownLatch = new 
CountDownLatch(consumerSet.size)
+    private val stopLatch: CountDownLatch = new 
CountDownLatch(consumerSet.size)
+    private val consumerThreadRunning = new AtomicBoolean(true)
+
+    def startConsumerThreads(): Unit = {
+      consumerThreads.foreach(_.start())
+      assertTrue(startLatch.await(30000, TimeUnit.MILLISECONDS), "Failed to 
start consumer threads in time")
+    }
+
+    def stopConsumerThreads(): Unit = {
+      consumerThreadRunning.set(false)
+      assertTrue(stopLatch.await(30000, TimeUnit.MILLISECONDS), "Failed to 
stop consumer threads in time")
+    }
+
+    def stopQuietly(): Unit = {
+      try {
+        consumerThreads.foreach {
+          consumerThread =>
+            consumerThread.interrupt()
+            consumerThread.join()
+        }
+      }
+      finally{
+        consumerSet.zip(groupInstanceSet).foreach(zipped => 
Utils.closeQuietly(zipped._1, zipped._2))
+      }
+    }

Review Comment:
   nit: How about renaming `stopQuietly()` to `close(): Unit`?



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -4004,6 +3910,79 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       ConfigSource.DYNAMIC_TOPIC_CONFIG, false, false, 
Collections.emptyList(), null, null),
       topicConfigs.get(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG))
   }
+
+  class BackgroundConsumerThreadsManager(testGroupId: String, testClientId: 
String, groupInstanceSet: Set[String], topicSet: Set[String]){

Review Comment:
   I wonder if we could make it a bit more generic. For instance, the 
constructor could take the group id and the default config for all consumers. 
Then we could have a method `addConsumer` to add a consumer with more 
specialized params. What do you think?



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -4004,6 +3910,79 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       ConfigSource.DYNAMIC_TOPIC_CONFIG, false, false, 
Collections.emptyList(), null, null),
       topicConfigs.get(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG))
   }
+
+  class BackgroundConsumerThreadsManager(testGroupId: String, testClientId: 
String, groupInstanceSet: Set[String], topicSet: Set[String]){
+    private val consumerSet: Set[Consumer[Array[Byte], Array[Byte]]] = 
groupInstanceSet.map { groupInstanceId =>
+      createConsumer(configOverrides = createProperties(groupInstanceId))
+    }
+    private val consumerThreads = consumerSet.zip(topicSet).map(zipped => 
createConsumerThread(zipped._1, zipped._2))
+    private val startLatch: CountDownLatch = new 
CountDownLatch(consumerSet.size)
+    private val stopLatch: CountDownLatch = new 
CountDownLatch(consumerSet.size)
+    private val consumerThreadRunning = new AtomicBoolean(true)
+
+    def startConsumerThreads(): Unit = {
+      consumerThreads.foreach(_.start())
+      assertTrue(startLatch.await(30000, TimeUnit.MILLISECONDS), "Failed to 
start consumer threads in time")
+    }
+
+    def stopConsumerThreads(): Unit = {
+      consumerThreadRunning.set(false)
+      assertTrue(stopLatch.await(30000, TimeUnit.MILLISECONDS), "Failed to 
stop consumer threads in time")
+    }

Review Comment:
   nit: How about renaming `stopConsumerThreads()` to `stop(): Unit`?



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -4004,6 +3910,79 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       ConfigSource.DYNAMIC_TOPIC_CONFIG, false, false, 
Collections.emptyList(), null, null),
       topicConfigs.get(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG))
   }
+
+  class BackgroundConsumerThreadsManager(testGroupId: String, testClientId: 
String, groupInstanceSet: Set[String], topicSet: Set[String]){
+    private val consumerSet: Set[Consumer[Array[Byte], Array[Byte]]] = 
groupInstanceSet.map { groupInstanceId =>
+      createConsumer(configOverrides = createProperties(groupInstanceId))
+    }
+    private val consumerThreads = consumerSet.zip(topicSet).map(zipped => 
createConsumerThread(zipped._1, zipped._2))
+    private val startLatch: CountDownLatch = new 
CountDownLatch(consumerSet.size)
+    private val stopLatch: CountDownLatch = new 
CountDownLatch(consumerSet.size)
+    private val consumerThreadRunning = new AtomicBoolean(true)
+
+    def startConsumerThreads(): Unit = {
+      consumerThreads.foreach(_.start())
+      assertTrue(startLatch.await(30000, TimeUnit.MILLISECONDS), "Failed to 
start consumer threads in time")
+    }

Review Comment:
   nit: How about renaming `startConsumerThreads()` to `start(): Unit`?



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -4004,6 +3910,79 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       ConfigSource.DYNAMIC_TOPIC_CONFIG, false, false, 
Collections.emptyList(), null, null),
       topicConfigs.get(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG))
   }
+
+  class BackgroundConsumerThreadsManager(testGroupId: String, testClientId: 
String, groupInstanceSet: Set[String], topicSet: Set[String]){

Review Comment:
   nit: How about renaming the class to `BackgroundConsumerSet`?



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -4004,6 +3910,79 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       ConfigSource.DYNAMIC_TOPIC_CONFIG, false, false, 
Collections.emptyList(), null, null),
       topicConfigs.get(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG))
   }
+
+  class BackgroundConsumerThreadsManager(testGroupId: String, testClientId: 
String, groupInstanceSet: Set[String], topicSet: Set[String]){
+    private val consumerSet: Set[Consumer[Array[Byte], Array[Byte]]] = 
groupInstanceSet.map { groupInstanceId =>
+      createConsumer(configOverrides = createProperties(groupInstanceId))
+    }
+    private val consumerThreads = consumerSet.zip(topicSet).map(zipped => 
createConsumerThread(zipped._1, zipped._2))
+    private val startLatch: CountDownLatch = new 
CountDownLatch(consumerSet.size)
+    private val stopLatch: CountDownLatch = new 
CountDownLatch(consumerSet.size)
+    private val consumerThreadRunning = new AtomicBoolean(true)
+
+    def startConsumerThreads(): Unit = {
+      consumerThreads.foreach(_.start())
+      assertTrue(startLatch.await(30000, TimeUnit.MILLISECONDS), "Failed to 
start consumer threads in time")
+    }
+
+    def stopConsumerThreads(): Unit = {
+      consumerThreadRunning.set(false)
+      assertTrue(stopLatch.await(30000, TimeUnit.MILLISECONDS), "Failed to 
stop consumer threads in time")
+    }
+
+    def stopQuietly(): Unit = {
+      try {
+        consumerThreads.foreach {
+          consumerThread =>
+            consumerThread.interrupt()
+            consumerThread.join()
+        }
+      }
+      finally{
+        consumerSet.zip(groupInstanceSet).foreach(zipped => 
Utils.closeQuietly(zipped._1, zipped._2))
+      }
+    }
+
+    private def createProperties(groupInstanceId: String): Properties = {
+      val newConsumerConfig = new Properties(consumerConfig)
+      // We need to disable the auto commit because after the members got 
removed from group, the offset commit
+      // will cause the member rejoining and the test will be flaky (check 
ConsumerCoordinator#OffsetCommitResponseHandler)

Review Comment:
   This looks a bit weird here, hence my comment about passing the default 
config in the constructor.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to