jeffkbkim commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1206145989


##########
core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala:
##########
@@ -113,38 +142,113 @@ class ProducerIdManagerTest {
   }
 
   @ParameterizedTest
-  @ValueSource(ints = Array(1, 2, 10))
-  def testContiguousIds(idBlockLen: Int): Unit = {
+  @ValueSource(ints = Array(1, 2, 10, 100))
+  def testConcurrentGeneratePidRequests(idBlockLen: Int): Unit = {
+    // Send concurrent generateProducerId requests. Ensure that the generated producer id is unique.
+    // For each block (total 3 blocks), only "idBlockLen" number of requests should go through.
+    // All other requests should fail immediately.
+
+    val numThreads = 5
+    val latch = new CountDownLatch(idBlockLen * 3)
     val manager = new MockProducerIdManager(0, 0, idBlockLen)
-
-    IntStream.range(0, idBlockLen * 3).forEach { i =>
-      assertEquals(i, manager.generateProducerId())
+    val pidMap = mutable.Map[Long, Int]()
+    val requestHandlerThreadPool = Executors.newFixedThreadPool(numThreads)
+
+    for ( _ <- 0 until numThreads) {
+      requestHandlerThreadPool.submit(() => {
+        while(latch.getCount > 0) {
+          val result = manager.generateProducerId()
+          result match {
+            case Success(pid) =>
+              pidMap synchronized {
+                if (latch.getCount != 0) {
+                  val counter = pidMap.getOrElse(pid, 0)
+                  pidMap += pid -> (counter + 1)
+                  latch.countDown()
+                }
+              }
+
+            case Failure(exception) =>
+              assertEquals(classOf[CoordinatorLoadInProgressException], 
exception.getClass)
+          }
+          Thread.sleep(100)
+        }
+      }, 0)
+    }
+    assertTrue(latch.await(15000, TimeUnit.MILLISECONDS))

Review Comment:
   Roughly 6 seconds. I have lowered this timeout to 10s.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to