junrao commented on code in PR #14903:
URL: https://github.com/apache/kafka/pull/14903#discussion_r1416090404

##########
core/src/main/scala/kafka/server/BrokerLifecycleManager.scala:
##########
@@ -166,6 +166,19 @@ class BrokerLifecycleManager(
    */
   private var registered = false
+  /**
+   * True if a request has been sent and a response or timeout has not yet been processed.
+   * This variable can only be read or written from the event queue thread.
+   */
+  private var communicationInFlight = false

Review Comment:
   The `NetworkClient` in `_channelManager` already maintains the state for in flight requests through `networkClient.ready(node, now)`. So, I am wondering if we need to maintain the state here. Could we just expose the state in `NetworkClient` through `_channelManager`?

##########
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##########
@@ -197,11 +197,14 @@ class BrokerLifecycleManagerTest {
     result
   }
-  def poll[T](context: RegistrationTestContext, manager: BrokerLifecycleManager, future: Future[T]): T = {
-    while (!future.isDone || context.mockClient.hasInFlightRequests) {
-      context.poll()
+  def poll[T](ctx: RegistrationTestContext, manager: BrokerLifecycleManager, future: Future[T]): T = {
+    while (ctx.mockChannelManager.unsentQueue.isEmpty) {
+      if (Option(manager.eventQueue.pendingDeferredEvent()).exists(!_.getClass.getSimpleName.endsWith("TimeoutEvent")))
+        ctx.time.sleep(5)
       manager.eventQueue.wakeup()
-      context.time.sleep(5)
+    }
+    while (!future.isDone || ctx.mockClient.hasInFlightRequests) {

Review Comment:
   Do we need `ctx.mockClient.hasInFlightRequests`? It seems that `!future.isDone` is enough?

##########
server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java:
##########
@@ -513,4 +513,28 @@ public void close() throws InterruptedException {
         eventHandlerThread.join();
         log.info("closed event queue.");
     }
+
+    /**
+     * Useful for unit tests, where we need to speed the clock up until
+     * deferred events are ready to run.
+     */
+    public Object pendingDeferredEvent() {

Review Comment:
   Could we return Optional instead of null?
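
   To illustrate the `Optional` suggestion, here is a minimal Java sketch of what such a signature could look like. It is not the real `KafkaEventQueue`: the class `DeferredQueueSketch`, its `defer` method, and the plain `Object` event type are hypothetical stand-ins, and only the lock-then-`firstEntry()` shape is taken from the diff above.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Hypothetical stand-in for an event queue with deferred events.
 * This is an illustration only, not the actual KafkaEventQueue.
 */
public class DeferredQueueSketch {
    private final ReentrantLock lock = new ReentrantLock();

    // Deadline in nanoseconds -> event. TreeMap keeps keys sorted,
    // so firstEntry() is the event with the earliest deadline.
    private final TreeMap<Long, Object> deadlineMap = new TreeMap<>();

    /** Hypothetical helper: schedule an event for the given deadline. */
    public void defer(long deadlineNs, Object event) {
        lock.lock();
        try {
            deadlineMap.put(deadlineNs, event);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the event with the earliest deadline, or Optional.empty()
     * when nothing is scheduled, instead of returning null.
     */
    public Optional<Object> pendingDeferredEvent() {
        lock.lock();
        try {
            Map.Entry<Long, Object> entry = deadlineMap.firstEntry();
            if (entry == null) {
                return Optional.empty();
            }
            return Optional.of(entry.getValue());
        } finally {
            lock.unlock();
        }
    }
}
```

   With a return type like this, the Scala test could call `.isPresent` or convert the result to a Scala `Option` rather than wrapping a possibly-null value in `Option(...)`.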

##########
server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java:
##########
@@ -513,4 +513,28 @@ public void close() throws InterruptedException {
         eventHandlerThread.join();
         log.info("closed event queue.");
     }
+
+    /**
+     * Useful for unit tests, where we need to speed the clock up until
+     * deferred events are ready to run.
+     */
+    public Object pendingDeferredEvent() {
+        lock.lock();
+        try {
+            if (eventHandler.head.next != eventHandler.head) {
+                return null;
+            }
+            Map.Entry<Long, EventContext> entry = eventHandler.deadlineMap.firstEntry();
+            if (entry == null) {
+                return null;
+            }
+            EventContext eventContext = entry.getValue();
+            if (eventContext.insertionType == EventInsertionType.DEFERRED) {

Review Comment:
   Do we need this check? It seems only `DEFERRED` events are stored in `deadlineMap`.

##########
server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java:
##########
@@ -513,4 +513,28 @@ public void close() throws InterruptedException {
         eventHandlerThread.join();
         log.info("closed event queue.");
     }
+
+    /**
+     * Useful for unit tests, where we need to speed the clock up until
+     * deferred events are ready to run.
+     */
+    public Object pendingDeferredEvent() {
+        lock.lock();
+        try {
+            if (eventHandler.head.next != eventHandler.head) {

Review Comment:
   Deferred events are not stored under `head`. It seems there is no need to check `head`?