junrao commented on code in PR #14903:
URL: https://github.com/apache/kafka/pull/14903#discussion_r1416090404


##########
core/src/main/scala/kafka/server/BrokerLifecycleManager.scala:
##########
@@ -166,6 +166,19 @@ class BrokerLifecycleManager(
    */
   private var registered = false
 
+  /**
+   * True if a request has been sent and a response or timeout has not yet been processed.
+   * This variable can only be read or written from the event queue thread.
+   */
+  private var communicationInFlight = false

Review Comment:
   The `NetworkClient` in `_channelManager` already tracks in-flight requests 
through `networkClient.ready(node, now)`, so I am wondering whether we need to 
maintain this state here as well. Could we just expose the state in 
`NetworkClient` through `_channelManager`?



##########
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##########
@@ -197,11 +197,14 @@ class BrokerLifecycleManagerTest {
     result
   }
 
-  def poll[T](context: RegistrationTestContext, manager: BrokerLifecycleManager, future: Future[T]): T = {
-    while (!future.isDone || context.mockClient.hasInFlightRequests) {
-      context.poll()
+  def poll[T](ctx: RegistrationTestContext, manager: BrokerLifecycleManager, future: Future[T]): T = {
+    while (ctx.mockChannelManager.unsentQueue.isEmpty) {
+      if (Option(manager.eventQueue.pendingDeferredEvent()).exists(!_.getClass.getSimpleName.endsWith("TimeoutEvent")))
+        ctx.time.sleep(5)
       manager.eventQueue.wakeup()
-      context.time.sleep(5)
+    }
+    while (!future.isDone || ctx.mockClient.hasInFlightRequests) {

Review Comment:
   Do we need `ctx.mockClient.hasInFlightRequests`? It seems that 
`!future.isDone` is enough?



##########
server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java:
##########
@@ -513,4 +513,28 @@ public void close() throws InterruptedException {
         eventHandlerThread.join();
         log.info("closed event queue.");
     }
+
+    /**
+     * Useful for unit tests, where we need to speed the clock up until
+     * deferred events are ready to run.
+     */
+    public Object pendingDeferredEvent() {

Review Comment:
   Could we return `Optional` instead of `null`?
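
A minimal sketch of the `Optional`-returning shape, using a toy class rather than the real `KafkaEventQueue`. The names `DeferredEventsSketch` and `defer`, and the plain `TreeMap<Long, Object>`, are hypothetical stand-ins introduced only for illustration; locking and the `EventContext` wrapper are omitted.

```java
import java.util.Optional;
import java.util.TreeMap;

// Toy stand-in for the deadline-ordered part of an event queue.
// This is NOT the real KafkaEventQueue; all names here are hypothetical.
class DeferredEventsSketch {
    // Maps deadline (in nanoseconds) to the event scheduled for that deadline.
    private final TreeMap<Long, Object> deadlineMap = new TreeMap<>();

    // Schedules an event to run at the given deadline.
    void defer(long deadlineNs, Object event) {
        deadlineMap.put(deadlineNs, event);
    }

    // Returns the event with the earliest deadline, or Optional.empty()
    // when nothing is pending, so callers never have to handle null.
    Optional<Object> pendingDeferredEvent() {
        return Optional.ofNullable(deadlineMap.firstEntry())
                .map(entry -> entry.getValue());
    }
}
```

With a return type like this, a caller would no longer need to wrap the result in `Option(...)` to guard against `null`.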



##########
server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java:
##########
@@ -513,4 +513,28 @@ public void close() throws InterruptedException {
         eventHandlerThread.join();
         log.info("closed event queue.");
     }
+
+    /**
+     * Useful for unit tests, where we need to speed the clock up until
+     * deferred events are ready to run.
+     */
+    public Object pendingDeferredEvent() {
+        lock.lock();
+        try {
+            if (eventHandler.head.next != eventHandler.head) {
+                return null;
+            }
+            Map.Entry<Long, EventContext> entry = eventHandler.deadlineMap.firstEntry();
+            if (entry == null) {
+                return null;
+            }
+            EventContext eventContext = entry.getValue();
+            if (eventContext.insertionType == EventInsertionType.DEFERRED) {

Review Comment:
   Do we need this check? It seems only `DEFERRED` events are stored in 
`deadlineMap`.



##########
server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java:
##########
@@ -513,4 +513,28 @@ public void close() throws InterruptedException {
         eventHandlerThread.join();
         log.info("closed event queue.");
     }
+
+    /**
+     * Useful for unit tests, where we need to speed the clock up until
+     * deferred events are ready to run.
+     */
+    public Object pendingDeferredEvent() {
+        lock.lock();
+        try {
+            if (eventHandler.head.next != eventHandler.head) {

Review Comment:
   Deferred events are not stored under `head`. It seems there is no need to 
check `head`?


