jsancio commented on a change in pull request #9050:
URL: https://github.com/apache/kafka/pull/9050#discussion_r458979257



##########
File path: 
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
##########
@@ -598,6 +603,86 @@ class ControllerIntegrationTest extends 
ZooKeeperTestHarness {
     }, "Broker fail to initialize after restart")
   }
 
+  @Test
+  def testPreemptionOnControllerShutdown(): Unit = {
+    servers = makeServers(1, enableControlledShutdown = false)
+    val controller = getController().kafkaController
+    val count = new AtomicInteger(2)
+    val latch = new CountDownLatch(1)
+    val spyThread = spy(controller.eventManager.thread)
+    controller.eventManager.setControllerEventThread(spyThread)
+    val processedEvent = new MockEvent(ControllerState.TopicChange) {
+      override def process(): Unit = latch.await()
+      override def preempt(): Unit = {}
+    }
+    val preemptedEvent = new MockEvent(ControllerState.TopicChange) {
+      override def process(): Unit = {}
+      override def preempt(): Unit = count.decrementAndGet()
+    }
+
+    controller.eventManager.put(processedEvent)
+    controller.eventManager.put(preemptedEvent)
+    controller.eventManager.put(preemptedEvent)
+
+    doAnswer((_: InvocationOnMock) => {
+      latch.countDown()
+    }).doCallRealMethod().when(spyThread).awaitShutdown()

Review comment:
       I see that you made the thread variable a `var` to be able to do this. 
Would this work instead?
   
   ```scala
   controller.eventManager.put(processedEvent)
   controller.eventManager.put(preemptedEvent)
   controller.eventManager.put(preemptedEvent)
   
   controller.shutdown()
   latch.countDown()
   TestUtils.waitUntilTrue(() => { 
     !controller.eventManager.thread.isAlive
   }, "...")
   
   assertEquals(0, count.get())
   ```

##########
File path: 
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
##########
@@ -598,6 +603,86 @@ class ControllerIntegrationTest extends 
ZooKeeperTestHarness {
     }, "Broker fail to initialize after restart")
   }
 
+  @Test
+  def testPreemptionOnControllerShutdown(): Unit = {
+    servers = makeServers(1, enableControlledShutdown = false)
+    val controller = getController().kafkaController
+    val count = new AtomicInteger(2)
+    val latch = new CountDownLatch(1)
+    val spyThread = spy(controller.eventManager.thread)
+    controller.eventManager.setControllerEventThread(spyThread)
+    val processedEvent = new MockEvent(ControllerState.TopicChange) {
+      override def process(): Unit = latch.await()
+      override def preempt(): Unit = {}
+    }
+    val preemptedEvent = new MockEvent(ControllerState.TopicChange) {
+      override def process(): Unit = {}
+      override def preempt(): Unit = count.decrementAndGet()
+    }
+
+    controller.eventManager.put(processedEvent)
+    controller.eventManager.put(preemptedEvent)
+    controller.eventManager.put(preemptedEvent)
+
+    doAnswer((_: InvocationOnMock) => {
+      latch.countDown()
+    }).doCallRealMethod().when(spyThread).awaitShutdown()
+    controller.shutdown()
+    TestUtils.waitUntilTrue(() => {
+      count.get() == 0
+    }, "preemption was not fully completed before shutdown")
+
+    verify(spyThread).awaitShutdown()
+  }
+
+  @Test
+  def testPreemptionWithCallbacks(): Unit = {
+    servers = makeServers(1, enableControlledShutdown = false)
+    val controller = getController().kafkaController
+    val latch = new CountDownLatch(1)
+    val spyThread = spy(controller.eventManager.thread)
+    controller.eventManager.setControllerEventThread(spyThread)

Review comment:
       See my other comment about possibly removing this method.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to