kirktrue commented on code in PR #17199:
URL: https://github.com/apache/kafka/pull/17199#discussion_r1800283720

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1740,14 +1742,19 @@ private void subscribeInternal(Collection<String> topics, Optional<ConsumerRebal
      * It is possible that {@link ErrorEvent an error}
      * could occur when processing the events. In such cases, the processor will take a reference to the first
      * error, continue to process the remaining events, and then throw the first error that occurred.
+     *
+     * Visible for testing.
      */
-    private boolean processBackgroundEvents() {
+    boolean processBackgroundEvents() {
         AtomicReference<KafkaException> firstError = new AtomicReference<>();
         LinkedList<BackgroundEvent> events = new LinkedList<>();
         backgroundEventQueue.drainTo(events);
+        kafkaConsumerMetrics.recordBackgroundEventQueueSize(backgroundEventQueue.size());
         for (BackgroundEvent event : events) {
+            kafkaConsumerMetrics.recordBackgroundEventQueueTime(time.milliseconds() - event.addedToQueueMs());
+            long startMs = time.milliseconds();

Review Comment:
   Does the metric measure how long it takes to process the whole background event queue, or how long it takes to process each background event? If it's the latter, we want to update the metric inside each loop iteration; if it's the former, we should update it once, outside the loop.

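To make the two readings of that question concrete, here is a minimal sketch of both placements. Everything in it is hypothetical and not taken from the PR: `ProcessTimeSketch`, `ManualClock`, `perEvent`, and `perBatch` are illustrative names, and the simulated clock only stands in for whatever time source the consumer uses.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch; none of these names come from the PR. */
final class ProcessTimeSketch {

    /** Hypothetical clock whose time only advances when told to. */
    static final class ManualClock {
        private long nowMs;

        long milliseconds() {
            return nowMs;
        }

        void sleep(long ms) {
            nowMs += ms;
        }
    }

    /** Reading 1: one sample per event, so the timing sits inside the loop. */
    static List<Long> perEvent(ManualClock clock, long[] eventCostsMs) {
        List<Long> samples = new ArrayList<>();
        for (long costMs : eventCostsMs) {
            long startMs = clock.milliseconds();
            clock.sleep(costMs); // simulate processing a single event
            samples.add(clock.milliseconds() - startMs);
        }
        return samples;
    }

    /** Reading 2: one sample for the whole drained batch, so the timing sits outside the loop. */
    static long perBatch(ManualClock clock, long[] eventCostsMs) {
        long startMs = clock.milliseconds();
        for (long costMs : eventCostsMs) {
            clock.sleep(costMs); // simulate processing a single event
        }
        return clock.milliseconds() - startMs;
    }

    public static void main(String[] args) {
        long[] costs = {5L, 10L, 20L};
        System.out.println(perEvent(new ManualClock(), costs)); // prints [5, 10, 20]
        System.out.println(perBatch(new ManualClock(), costs)); // prints 35
    }
}
```

With the per-event placement the metric receives three samples; with the per-batch placement it receives one sample equal to their sum. Which one is right depends on how the metric is defined, which is exactly what the comment asks.
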
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -85,6 +88,17 @@ public NetworkClientDelegate(
         this.unsentRequests = new ArrayDeque<>();
         this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
         this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+        this.kafkaConsumerMetrics = kafkaConsumerMetrics;
+    }
+
+    public NetworkClientDelegate(
+            final Time time,
+            final ConsumerConfig config,
+            final LogContext logContext,
+            final KafkaClient client,
+            final Metadata metadata,
+            final BackgroundEventHandler backgroundEventHandler) {
+        this(time, config, logContext, client, metadata, backgroundEventHandler, Optional.empty());

Review Comment:
   Is it possible to remove this constructor? Can't callers invoke the existing constructor with `Optional.empty()`?

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java:
##########
@@ -73,7 +90,9 @@ public ApplicationEventHandler(final LogContext logContext,
      */
     public void add(final ApplicationEvent event) {
         Objects.requireNonNull(event, "ApplicationEvent provided to add must be non-null");
+        event.setAddedToQueueMs(System.currentTimeMillis());

Review Comment:
   This needs to get the current time in milliseconds from the `Time` object that was passed in to the constructor.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java:
##########
@@ -42,6 +51,8 @@ public BackgroundEventHandler(final Queue<BackgroundEvent> backgroundEventQueue)
      */
     public void add(BackgroundEvent event) {
         Objects.requireNonNull(event, "BackgroundEvent provided to add must be non-null");
+        event.setAddedToQueueMs(System.currentTimeMillis());

Review Comment:
   Same comment here: we need to update the constructor to provide a `Time` object and then use that here.

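As a rough illustration of the `Time` injection requested in the last two comments, the sketch below stamps events from a clock handed to the constructor rather than from `System.currentTimeMillis()`. It is an assumption-laden stand-in, not the PR's code: the nested `Time` interface only mimics the `milliseconds()` method of Kafka's `Time`, and `InjectedTimeSketch`, `Event`, and `Handler` are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Objects;
import java.util.Queue;

/** Hypothetical sketch; the types here only imitate the shape of the classes under review. */
final class InjectedTimeSketch {

    /** Stand-in for Kafka's Time; only the one method this sketch needs. */
    interface Time {
        long milliseconds();
    }

    /** Hypothetical event that records when it was enqueued. */
    static final class Event {
        private long enqueuedMs;

        void setEnqueuedMs(long enqueuedMs) {
            this.enqueuedMs = enqueuedMs;
        }

        long enqueuedMs() {
            return enqueuedMs;
        }
    }

    /** Hypothetical handler that takes its clock from the constructor. */
    static final class Handler {
        private final Time time;
        private final Queue<Event> queue;

        Handler(Time time, Queue<Event> queue) {
            this.time = Objects.requireNonNull(time, "time must be non-null");
            this.queue = Objects.requireNonNull(queue, "queue must be non-null");
        }

        void add(Event event) {
            Objects.requireNonNull(event, "event provided to add must be non-null");
            // Read the injected clock instead of System.currentTimeMillis().
            event.setEnqueuedMs(time.milliseconds());
            queue.add(event);
        }
    }

    public static void main(String[] args) {
        Queue<Event> queue = new ArrayDeque<>();
        Handler handler = new Handler(() -> 1_000L, queue); // fixed clock for the demo
        handler.add(new Event());
        System.out.println(queue.peek().enqueuedMs()); // prints 1000
    }
}
```

The practical benefit is testability: a test can pass a fixed or manually advanced clock and assert exact queue times, which is not possible when the handler reads the system clock directly.
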
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -139,6 +155,9 @@ void runOnce() {
         processApplicationEvents();
         final long currentTimeMs = time.milliseconds();
+        final long timeSinceLastPollMs = lastPollTimeMs != 0L ? currentTimeMs - lastPollTimeMs : currentTimeMs;

Review Comment:
   In the first invocation of `runOnce()`, `lastPollTimeMs` is still `0L`, so `timeSinceLastPollMs` will be the full current timestamp, something like `1728954137284`. Is that correct?

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -317,6 +335,14 @@ Timer timer() {
             return timer;
         }
+        void setAddedToQueueMs(final long addedToQueueMs) {
+            this.addedToQueueMs = addedToQueueMs;
+        }
+
+        long addedToQueueMs() {
+            return addedToQueueMs;
+        }
+

Review Comment:
   I know it's a bit nit-picky, but can we change it to:

   ```suggestion
           void setEnqueuedMs(final long enqueuedMs) {
               this.enqueuedMs = enqueuedMs;
           }

           long enqueuedMs() {
               return enqueuedMs;
           }
   ```

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -84,6 +88,18 @@ public ConsumerNetworkThread(LogContext logContext,
         this.networkClientDelegateSupplier = networkClientDelegateSupplier;
         this.requestManagersSupplier = requestManagersSupplier;
         this.running = true;
+        this.kafkaConsumerMetrics = kafkaConsumerMetrics;
+    }
+
+    public ConsumerNetworkThread(LogContext logContext,
+                                 Time time,
+                                 BlockingQueue<ApplicationEvent> applicationEventQueue,
+                                 CompletableEventReaper applicationEventReaper,
+                                 Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier,
+                                 Supplier<NetworkClientDelegate> networkClientDelegateSupplier,
+                                 Supplier<RequestManagers> requestManagersSupplier) {
+        this(logContext, time, applicationEventQueue, applicationEventReaper, applicationEventProcessorSupplier,
+                networkClientDelegateSupplier, requestManagersSupplier, Optional.empty());

Review Comment:
   Is it possible to remove this constructor?
   Can't we call the existing constructor with `Optional.empty()`?

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java:
##########
@@ -30,9 +32,16 @@ public class BackgroundEventHandler {
     private final Queue<BackgroundEvent> backgroundEventQueue;
+    private final Optional<KafkaConsumerMetrics> kafkaConsumerMetrics;
-    public BackgroundEventHandler(final Queue<BackgroundEvent> backgroundEventQueue) {
+    public BackgroundEventHandler(final Queue<BackgroundEvent> backgroundEventQueue,
+                                  final Optional<KafkaConsumerMetrics> kafkaConsumerMetrics) {
         this.backgroundEventQueue = backgroundEventQueue;
+        this.kafkaConsumerMetrics = kafkaConsumerMetrics;
+    }
+
+    public BackgroundEventHandler(final Queue<BackgroundEvent> backgroundEventQueue) {
+        this(backgroundEventQueue, Optional.empty());

Review Comment:
   Same as with the other changes: I'd rather make the callers pass in `Optional.empty()`.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java:
##########
@@ -51,6 +52,14 @@ public Uuid id() {
         return id;
     }
+    public void setAddedToQueueMs(long addedToQueueMs) {
+        this.addedToQueueMs = addedToQueueMs;
+    }
+
+    public long addedToQueueMs() {
+        return addedToQueueMs;
+    }

Review Comment:
   ```suggestion
       public void setEnqueuedMs(long enqueuedMs) {
           this.enqueuedMs = enqueuedMs;
       }

       public long enqueuedMs() {
           return enqueuedMs;
       }
   ```

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java:
##########
@@ -61,6 +62,14 @@ public Uuid id() {
         return id;
     }
+    public void setAddedToQueueMs(long addedToQueueMs) {
+        this.addedToQueueMs = addedToQueueMs;
+    }
+
+    public long addedToQueueMs() {
+        return addedToQueueMs;
+    }
+

Review Comment:
   ```suggestion
       public void setEnqueuedMs(long enqueuedMs) {
           this.enqueuedMs = enqueuedMs;
       }

       public long enqueuedMs() {
           return enqueuedMs;
       }
   ```

--
This is an automated message from the Apache Git Service.