hachikuji commented on code in PR #12862: URL: https://github.com/apache/kafka/pull/12862#discussion_r1047786178
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java: ########## @@ -109,101 +132,89 @@ public void run() { try { runOnce(); } catch (final WakeupException e) { - log.debug( - "Exception thrown, background thread won't terminate", - e - ); - // swallow the wakeup exception to prevent killing the - // background thread. + log.debug("WakeupException caught, background thread won't be interrupted"); + // swallow the wakeup exception to prevent killing the background thread. } } } catch (final Throwable t) { - log.error( - "The background thread failed due to unexpected error", - t - ); - if (t instanceof RuntimeException) - this.exception.set(Optional.of((RuntimeException) t)); - else - this.exception.set(Optional.of(new RuntimeException(t))); + log.error("The background thread failed due to unexpected error", t); + throw new RuntimeException(t); } finally { close(); log.debug("{} closed", getClass()); } } /** - * Process event from a single poll + * Poll and process an {@link ApplicationEvent}. It performs the following tasks: + * 1. Try to poll and event from the queue, and try to process it using the coorsponding {@link ApplicationEventProcessor}. + * 2. Try to find Coordinator if needed + * 3. Poll the networkClient for outstanding requests. */ void runOnce() { - this.inflightEvent = maybePollEvent(); - if (this.inflightEvent.isPresent()) { - log.debug("processing application event: {}", this.inflightEvent); + Optional<ApplicationEvent> event = maybePollEvent(); + + if (event.isPresent()) { + log.debug("processing application event: {}", event); + consumeApplicationEvent(event.get()); } - if (this.inflightEvent.isPresent() && maybeConsumeInflightEvent(this.inflightEvent.get())) { - // clear inflight event upon successful consumption - this.inflightEvent = Optional.empty(); + + final long currentTimeMs = time.milliseconds(); + // TODO: This is just a place holder value. + long pollWaitTimeMs = 100; + + // TODO: Add a condition here, like shouldFindCoordinator in the future. Since we don't always need to find + // the coordinator. + if (coordinatorManager.isPresent()) { + pollWaitTimeMs = Math.min(pollWaitTimeMs, handlePollResult(coordinatorManager.get().poll(currentTimeMs))); } // if there are pending events to process, poll then continue without // blocking. - if (!applicationEventQueue.isEmpty() || inflightEvent.isPresent()) { - networkClient.poll(time.timer(0)); + if (!applicationEventQueue.isEmpty()) { + networkClientDelegate.poll(0); return; } - // if there are no events to process, poll until timeout. The timeout + // if there are no pending application event, poll until timeout. The timeout // will be the minimum of the requestTimeoutMs, nextHeartBeatMs, and // nextMetadataUpdate. See NetworkClient.poll impl. - networkClient.poll(time.timer(timeToNextHeartbeatMs(time.milliseconds()))); + networkClientDelegate.poll(pollWaitTimeMs); } - private long timeToNextHeartbeatMs(final long nowMs) { - // TODO: implemented when heartbeat is added to the impl - return 100; + long handlePollResult(NetworkClientDelegate.PollResult res) { + Objects.requireNonNull(res); + if (!res.unsentRequests.isEmpty()) { + networkClientDelegate.addAll(res.unsentRequests); + } + return res.timeMsTillNextPoll; } private Optional<ApplicationEvent> maybePollEvent() { - if (this.inflightEvent.isPresent() || this.applicationEventQueue.isEmpty()) { - return this.inflightEvent; + if (this.applicationEventQueue.isEmpty()) { Review Comment: nit: call this `pollApplicationEvent`? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java: ########## @@ -109,101 +132,89 @@ public void run() { try { runOnce(); } catch (final WakeupException e) { - log.debug( - "Exception thrown, background thread won't terminate", - e - ); - // swallow the wakeup exception to prevent killing the - // background thread. + log.debug("WakeupException caught, background thread won't be interrupted"); + // swallow the wakeup exception to prevent killing the background thread. } } } catch (final Throwable t) { - log.error( - "The background thread failed due to unexpected error", - t - ); - if (t instanceof RuntimeException) - this.exception.set(Optional.of((RuntimeException) t)); - else - this.exception.set(Optional.of(new RuntimeException(t))); + log.error("The background thread failed due to unexpected error", t); + throw new RuntimeException(t); } finally { close(); log.debug("{} closed", getClass()); } } /** - * Process event from a single poll + * Poll and process an {@link ApplicationEvent}. It performs the following tasks: + * 1. Try to poll and event from the queue, and try to process it using the coorsponding {@link ApplicationEventProcessor}. + * 2. Try to find Coordinator if needed + * 3. Poll the networkClient for outstanding requests. */ void runOnce() { - this.inflightEvent = maybePollEvent(); - if (this.inflightEvent.isPresent()) { - log.debug("processing application event: {}", this.inflightEvent); + Optional<ApplicationEvent> event = maybePollEvent(); + + if (event.isPresent()) { + log.debug("processing application event: {}", event); + consumeApplicationEvent(event.get()); } - if (this.inflightEvent.isPresent() && maybeConsumeInflightEvent(this.inflightEvent.get())) { - // clear inflight event upon successful consumption - this.inflightEvent = Optional.empty(); + + final long currentTimeMs = time.milliseconds(); + // TODO: This is just a place holder value. + long pollWaitTimeMs = 100; + + // TODO: Add a condition here, like shouldFindCoordinator in the future. Since we don't always need to find + // the coordinator. + if (coordinatorManager.isPresent()) { + pollWaitTimeMs = Math.min(pollWaitTimeMs, handlePollResult(coordinatorManager.get().poll(currentTimeMs))); } // if there are pending events to process, poll then continue without // blocking. - if (!applicationEventQueue.isEmpty() || inflightEvent.isPresent()) { - networkClient.poll(time.timer(0)); + if (!applicationEventQueue.isEmpty()) { + networkClientDelegate.poll(0); Review Comment: I had the context a little wrong here. I wonder if we need this at all. Instead, we should try to drain all application events before we poll the network client. And when we post a new event, we should use `wakeup()`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org