AndrewJSchofield commented on code in PR #17199:
URL: https://github.com/apache/kafka/pull/17199#discussion_r1854114243
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java:
##########
@@ -42,26 +44,32 @@ public class ApplicationEventHandler implements Closeable {
     private final Logger log;
+    private final Time time;
     private final BlockingQueue<ApplicationEvent> applicationEventQueue;

Review Comment:
   I wonder if we could actually make a class for the application event queue, rather than using `BlockingQueue` directly. That way, we could do things like incorporate the queue size metric into the queue itself, rather than having to update the metrics in the caller.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaAsyncConsumerMetrics.java:
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.metrics;
+
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Value;
+
+import java.util.Arrays;
+
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRICS_SUFFIX;
+
+public class KafkaAsyncConsumerMetrics extends KafkaConsumerMetrics implements AutoCloseable {

Review Comment:
   These metrics are really to do with measuring the processing of events sent between the application and background threads. `AsyncKafkaConsumer` and `ShareConsumerImpl` both use the same architecture, and the same metrics would apply equally well to both. So, I suggest:
   * Renaming this to `AsyncConsumerMetrics`
   * Not inheriting from `KafkaConsumerMetrics`
   * Supplying an instance of this class when constructing the `NetworkClientDelegate` in `ShareConsumerImpl`. This should not be `Optional.empty()`. In fact, I don't think it should be optional at all.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java:
##########
@@ -61,6 +62,14 @@ public Uuid id() {
         return id;
     }
 
+    public void setEnqueuedMs(long enqueuedMs) {
+        this.enqueuedMs = enqueuedMs;
+    }

Review Comment:
   I take the point about `hashCode()` and `equals()`, but we probably should be able to see the enqueued time in the string representation of the events.
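   For illustration, a minimal sketch of what surfacing the enqueued time in the string representation could look like, assuming the event classes build their `toString()` from a shared base helper; the exact layout here is an assumption, not the PR's implementation:

```java
// Hypothetical sketch inside ApplicationEvent: fold the enqueued timestamp into
// the base string so every event subclass reports it. toStringBase() is assumed
// to be the shared helper the event classes already use.
protected String toStringBase() {
    return "type=" + type + ", id=" + id + ", enqueuedMs=" + enqueuedMs;
}

@Override
public String toString() {
    return getClass().getSimpleName() + "{" + toStringBase() + "}";
}
```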
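   To make the queue-wrapper suggestion from the first comment above concrete, a rough sketch of the idea follows. The class name `ApplicationEventQueue` and the metrics recorder it calls are illustrative only, not the actual implementation in this PR:

```java
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.kafka.common.utils.Time;

// Illustrative only: a queue type that owns its own metrics, so callers such as
// ApplicationEventHandler no longer have to record the queue size themselves.
public class ApplicationEventQueue {
    private final BlockingQueue<ApplicationEvent> queue = new LinkedBlockingQueue<>();
    private final AsyncConsumerMetrics asyncConsumerMetrics; // assumed metrics class, see the sketch below
    private final Time time;

    public ApplicationEventQueue(AsyncConsumerMetrics asyncConsumerMetrics, Time time) {
        this.asyncConsumerMetrics = asyncConsumerMetrics;
        this.time = time;
    }

    public void add(ApplicationEvent event) {
        // Stamp the enqueue time and record the size metric inside the queue itself.
        event.setEnqueuedMs(time.milliseconds());
        queue.add(event);
        asyncConsumerMetrics.recordApplicationEventQueueSize(queue.size());
    }

    public List<ApplicationEvent> drain() {
        List<ApplicationEvent> events = new LinkedList<>();
        queue.drainTo(events);
        asyncConsumerMetrics.recordApplicationEventQueueSize(queue.size());
        return events;
    }
}
```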
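   And to sketch the shape suggested for the metrics class: standalone rather than extending `KafkaConsumerMetrics`, and passed around directly instead of as an `Optional`. The sensor wiring below is a placeholder, not the PR's actual metric definitions:

```java
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Value;

// Placeholder sketch of the suggested shape: no KafkaConsumerMetrics parent, so
// AsyncKafkaConsumer and ShareConsumerImpl can both use it for the shared
// application-thread/background-thread event metrics.
public class AsyncConsumerMetrics implements AutoCloseable {
    private final Metrics metrics;
    private final Sensor applicationEventQueueSizeSensor;

    public AsyncConsumerMetrics(Metrics metrics, String groupName) {
        this.metrics = metrics;
        this.applicationEventQueueSizeSensor = metrics.sensor("application-event-queue-size");
        this.applicationEventQueueSizeSensor.add(
            metrics.metricName("application-event-queue-size", groupName,
                "The current number of events in the queue awaiting the background thread."),
            new Value());
    }

    public void recordApplicationEventQueueSize(int size) {
        applicationEventQueueSizeSensor.record(size);
    }

    @Override
    public void close() {
        metrics.removeSensor(applicationEventQueueSizeSensor.name());
    }
}
```

   Constructors such as `ConsumerNetworkThread` and `NetworkClientDelegate` could then take an `AsyncConsumerMetrics` parameter directly, with no `Optional` wrapping.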
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -60,21 +61,24 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable {
     private final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier;
     private final Supplier<NetworkClientDelegate> networkClientDelegateSupplier;
     private final Supplier<RequestManagers> requestManagersSupplier;
+    private final Optional<KafkaAsyncConsumerMetrics> kafkaAsyncConsumerMetrics;
     private ApplicationEventProcessor applicationEventProcessor;
     private NetworkClientDelegate networkClientDelegate;
     private RequestManagers requestManagers;
     private volatile boolean running;
     private final IdempotentCloser closer = new IdempotentCloser();
     private volatile Duration closeTimeout = Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS);
     private volatile long cachedMaximumTimeToWait = MAX_POLL_TIMEOUT_MS;
+    private long lastPollTimeMs = 0L;
 
     public ConsumerNetworkThread(LogContext logContext,
                                  Time time,
                                  BlockingQueue<ApplicationEvent> applicationEventQueue,
                                  CompletableEventReaper applicationEventReaper,
                                  Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier,
                                  Supplier<NetworkClientDelegate> networkClientDelegateSupplier,
-                                 Supplier<RequestManagers> requestManagersSupplier) {
+                                 Supplier<RequestManagers> requestManagersSupplier,
+                                 Optional<KafkaAsyncConsumerMetrics> kafkaAsyncConsumerMetrics) {

Review Comment:
   As mentioned in another comment, this should not be optional. It's almost general-purpose enough that it can be used for all users of `ConsumerNetworkThread`.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java:
##########
@@ -42,6 +51,8 @@ public BackgroundEventHandler(final Queue<BackgroundEvent> backgroundEventQueue)
      */
     public void add(BackgroundEvent event) {
         Objects.requireNonNull(event, "BackgroundEvent provided to add must be non-null");
+        event.setEnqueuedMs(time.milliseconds());
         backgroundEventQueue.add(event);
+        kafkaConsumerMetrics.ifPresent(metrics -> metrics.recordBackgroundEventQueueSize(backgroundEventQueue.size()));

Review Comment:
   Yes, I agree.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1893,25 +1901,30 @@ private void subscribeInternal(Collection<String> topics, Optional<ConsumerRebal
      * It is possible that {@link ErrorEvent an error}
      * could occur when processing the events. In such cases, the processor will take a reference to the first
      * error, continue to process the remaining events, and then throw the first error that occurred.
+     *
+     * Visible for testing.
      */
-    private boolean processBackgroundEvents() {
+    boolean processBackgroundEvents() {
         AtomicReference<KafkaException> firstError = new AtomicReference<>();
 
-        LinkedList<BackgroundEvent> events = new LinkedList<>();
-        backgroundEventQueue.drainTo(events);
-
-        for (BackgroundEvent event : events) {
-            try {
-                if (event instanceof CompletableEvent)
-                    backgroundEventReaper.add((CompletableEvent<?>) event);
-
-                backgroundEventProcessor.process(event);
-            } catch (Throwable t) {
-                KafkaException e = ConsumerUtils.maybeWrapAsKafkaException(t);
-
-                if (!firstError.compareAndSet(null, e))
-                    log.warn("An error occurred when processing the background event: {}", e.getMessage(), e);
+        List<BackgroundEvent> events = backgroundEventHandler.drainEvents();
+        if (!events.isEmpty()) {
+            long startMs = time.milliseconds();
+            for (BackgroundEvent event : events) {
+                kafkaConsumerMetrics.recordBackgroundEventQueueTime(time.milliseconds() - event.enqueuedMs());
+                try {
+                    if (event instanceof CompletableEvent)
+                        backgroundEventReaper.add((CompletableEvent<?>) event);
+
+                    backgroundEventProcessor.process(event);
+                } catch (Throwable t) {
+                    KafkaException e = ConsumerUtils.maybeWrapAsKafkaException(t);
+
+                    if (!firstError.compareAndSet(null, e))
+                        log.warn("An error occurred when processing the background event: {}", e.getMessage(), e);
+                }
             }
+            kafkaConsumerMetrics.recordBackgroundEventQueueProcessingTime(time.milliseconds() - startMs);
         }
 
         backgroundEventReaper.reap(time.milliseconds());

Review Comment:
   How are we going to account for events which expired and are removed from the queue by the event reaper? They probably ought to be included in the metrics.
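   To illustrate one possible answer: if `reap()` handed back the events it expired (a hypothetical API change, not what the reaper does today), their time in the queue could still be recorded:

```java
// Hypothetical sketch: assume the reaper returns the events it expired so they
// are still counted in the queue-time metric rather than silently dropped.
List<CompletableEvent<?>> expired = backgroundEventReaper.reap(time.milliseconds());
for (CompletableEvent<?> expiredEvent : expired) {
    if (expiredEvent instanceof BackgroundEvent) {
        kafkaConsumerMetrics.recordBackgroundEventQueueTime(
            time.milliseconds() - ((BackgroundEvent) expiredEvent).enqueuedMs());
    }
}
```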