lianetm commented on code in PR #17199: URL: https://github.com/apache/kafka/pull/17199#discussion_r1847318144
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java: ########## @@ -42,6 +51,8 @@ public BackgroundEventHandler(final Queue<BackgroundEvent> backgroundEventQueue) */ public void add(BackgroundEvent event) { Objects.requireNonNull(event, "BackgroundEvent provided to add must be non-null"); + event.setEnqueuedMs(time.milliseconds()); backgroundEventQueue.add(event); + kafkaConsumerMetrics.ifPresent(metrics -> metrics.recordBackgroundEventQueueSize(backgroundEventQueue.size())); Review Comment: This change makes sense to me, but it makes me wonder if we should push it further. It would be helpful to keep all the updates for this queue size metric in the component that holds the queue, so we can easily maintain/track how "add" and "remove/drain" update that metric. We could then use that drain from `processBackgroundEvents`, instead of manually draining the queue and recording the metric there. What do you think? 
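To make the idea above concrete, here is a minimal, self-contained sketch of a handler that owns the queue and records the size metric on both add and drain. It is not the PR's code: `QueueOwningHandler`, `drainEvents`, and the `IntConsumer` recorder are hypothetical stand-ins for `BackgroundEventHandler`, a possible drain method, and `recordBackgroundEventQueueSize`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.IntConsumer;

// Hypothetical stand-in for BackgroundEventHandler: it owns the queue, so
// every update of the queue size metric happens in this one class.
class QueueOwningHandler<E> {
    private final BlockingQueue<E> queue = new LinkedBlockingQueue<>();
    // Assumed stand-in for metrics.recordBackgroundEventQueueSize(int).
    private final IntConsumer sizeRecorder;

    QueueOwningHandler(IntConsumer sizeRecorder) {
        this.sizeRecorder = Objects.requireNonNull(sizeRecorder);
    }

    // Enqueues the event and records the queue size after the add.
    void add(E event) {
        Objects.requireNonNull(event, "event must be non-null");
        queue.add(event);
        sizeRecorder.accept(queue.size());
    }

    // Drains everything currently queued and records the size left behind.
    List<E> drainEvents() {
        List<E> events = new ArrayList<>();
        queue.drainTo(events);
        sizeRecorder.accept(queue.size());
        return events;
    }
}
```

With something along these lines, the caller would only ask the handler for the drained events and would not touch the queue or the size metric directly.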
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java: ########## @@ -162,15 +171,20 @@ void runOnce() { private void processApplicationEvents() { LinkedList<ApplicationEvent> events = new LinkedList<>(); applicationEventQueue.drainTo(events); + kafkaAsyncConsumerMetrics.ifPresent(metrics -> metrics.recordApplicationEventQueueSize(0)); for (ApplicationEvent event : events) { + long startMs = time.milliseconds(); + kafkaAsyncConsumerMetrics.ifPresent(metrics -> metrics.recordApplicationEventQueueTime(startMs - event.enqueuedMs())); try { if (event instanceof CompletableEvent) applicationEventReaper.add((CompletableEvent<?>) event); applicationEventProcessor.process(event); } catch (Throwable t) { log.warn("Error processing event {}", t.getMessage(), t); + } finally { + kafkaAsyncConsumerMetrics.ifPresent(metrics -> metrics.recordApplicationEventQueueProcessingTime(time.milliseconds() - startMs)); Review Comment: Is this the best place to record this? From the description of the metric I get that we want to measure the time "that the consumer network takes to process **all available application events**". So wouldn't it be simpler to record the metric once per `runOnce` instead of recording it N times on each run? (startTime right before the loop over events, and ending/recording right after the loop). I went to the KIP discussion thread to double check this interpretation, and this was the intention behind what was proposed (by me actually I discovered he he). > LM3. Thinking about the actual usage of "time-between-network-thread-poll-xxx" metric, I imagine it would be helpful to know more about what could be impacting it. As I see it, the network thread cadence could be mainly impacted by: 1- app event processing (generate requests), 2- network client poll (actual send/receive). 
For 2, the new consumer reuses the same component as the legacy one, but 1 is specific to the new consumer, so what about a metric for application-event-processing-time-ms (we could consider avg I would say). It would be the time that the network thread takes to process all available events on each run. What do you think? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java: ########## @@ -162,15 +171,20 @@ void runOnce() { private void processApplicationEvents() { LinkedList<ApplicationEvent> events = new LinkedList<>(); applicationEventQueue.drainTo(events); + kafkaAsyncConsumerMetrics.ifPresent(metrics -> metrics.recordApplicationEventQueueSize(0)); Review Comment: To keep the symmetry with the background events, would it make sense to encapsulate these actions in the `ApplicationEventHandler`, so that this component stays responsible for `add` and `drain` on the queue (including the metric actions related to those ops)? It would mean that this `ConsumerNetworkThread` would keep a ref to the `ApplicationEventHandler` that has the queue (instead of holding the queue directly like it does now), but that is already available, so I guess we just need to pass it in the constructor instead of the queue. What do you think? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaAsyncConsumerMetrics.java: ########## @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.metrics; + +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.Max; +import org.apache.kafka.common.metrics.stats.Value; + +import java.util.Arrays; + +import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRICS_SUFFIX; + +public class KafkaAsyncConsumerMetrics extends KafkaConsumerMetrics implements AutoCloseable { + private final Metrics metrics; + + public static final String TIME_BETWEEN_NETWORK_THREAD_POLL_SENSOR_NAME = "time-between-network-thread-poll"; + public static final String APPLICATION_EVENT_QUEUE_SIZE_SENSOR_NAME = "application-event-queue-size"; + public static final String APPLICATION_EVENT_QUEUE_TIME_SENSOR_NAME = "application-event-queue-time"; + public static final String APPLICATION_EVENT_QUEUE_PROCESSING_TIME_SENSOR_NAME = "application-event-queue-processing-time"; + public static final String BACKGROUND_EVENT_QUEUE_SIZE_SENSOR_NAME = "background-event-queue-size"; + public static final String BACKGROUND_EVENT_QUEUE_TIME_SENSOR_NAME = "background-event-queue-time"; + public static final String BACKGROUND_EVENT_QUEUE_PROCESSING_TIME_SENSOR_NAME = "background-event-queue-processing-time"; + public static final String UNSENT_REQUESTS_QUEUE_SIZE_SENSOR_NAME = "unsent-requests-queue-size"; + public static final String UNSENT_REQUESTS_QUEUE_TIME_SENSOR_NAME = "unsent-requests-queue-time"; + private final Sensor 
timeBetweenNetworkThreadPollSensor; + private final Sensor applicationEventQueueSizeSensor; + private final Sensor applicationEventQueueTimeSensor; + private final Sensor applicationEventQueueProcessingTimeSensor; + private final Sensor backgroundEventQueueSizeSensor; + private final Sensor backgroundEventQueueTimeSensor; + private final Sensor backgroundEventQueueProcessingTimeSensor; + private final Sensor unsentRequestsQueueSizeSensor; + private final Sensor unsentRequestsQueueTimeSensor; + + public KafkaAsyncConsumerMetrics(Metrics metrics, String metricGrpPrefix) { + super(metrics, metricGrpPrefix); + + this.metrics = metrics; + final String metricGroupName = metricGrpPrefix + CONSUMER_METRICS_SUFFIX; + this.timeBetweenNetworkThreadPollSensor = metrics.sensor(TIME_BETWEEN_NETWORK_THREAD_POLL_SENSOR_NAME); + this.timeBetweenNetworkThreadPollSensor.add(metrics.metricName("time-between-network-thread-poll-avg", + metricGroupName, + "The average time taken, in milliseconds, between each poll in the network thread."), + new Avg()); + this.timeBetweenNetworkThreadPollSensor.add(metrics.metricName("time-between-network-thread-poll-max", + metricGroupName, + "The maximum time taken, in milliseconds, between each poll in the network thread."), + new Max()); + + this.applicationEventQueueSizeSensor = metrics.sensor(APPLICATION_EVENT_QUEUE_SIZE_SENSOR_NAME); + this.applicationEventQueueSizeSensor.add(metrics.metricName("application-event-queue-size", + metricGroupName, + "The current number of events in the consumer network application event queue."), + new Value()); + + this.applicationEventQueueTimeSensor = metrics.sensor(APPLICATION_EVENT_QUEUE_TIME_SENSOR_NAME); + this.applicationEventQueueTimeSensor.add(metrics.metricName("application-event-queue-time-avg", + metricGroupName, + "The average time, in milliseconds, that application events are taking to be dequeued."), + new Avg()); + 
this.applicationEventQueueTimeSensor.add(metrics.metricName("application-event-queue-time-max", + metricGroupName, + "The maximum time, in milliseconds, that an application event took to be dequeued."), + new Max()); + + this.applicationEventQueueProcessingTimeSensor = metrics.sensor(APPLICATION_EVENT_QUEUE_PROCESSING_TIME_SENSOR_NAME); + this.applicationEventQueueProcessingTimeSensor.add(metrics.metricName("application-event-queue-processing-time-avg", + metricGroupName, + "The average time, in milliseconds, that the consumer network takes to process all available application events."), + new Avg()); + this.applicationEventQueueProcessingTimeSensor.add(metrics.metricName("application-event-queue-processing-time-max", + metricGroupName, + "The maximum time, in milliseconds, that the consumer network took to process all available application events."), + new Max()); + + this.unsentRequestsQueueSizeSensor = metrics.sensor(UNSENT_REQUESTS_QUEUE_SIZE_SENSOR_NAME); + this.unsentRequestsQueueSizeSensor.add(metrics.metricName("unsent-requests-queue-size", + metricGroupName, + "The current number of unsent requests in the consumer network."), + new Value()); + + this.unsentRequestsQueueTimeSensor = metrics.sensor(UNSENT_REQUESTS_QUEUE_TIME_SENSOR_NAME); + this.unsentRequestsQueueTimeSensor.add(metrics.metricName("unsent-requests-queue-time-avg", + metricGroupName, + "The average time, in milliseconds, that requests are taking to be sent in the consumer network."), + new Avg()); + this.unsentRequestsQueueTimeSensor.add(metrics.metricName("unsent-requests-queue-time-max", + metricGroupName, + "The maximum time, in milliseconds, that a request remained unsent in the consumer network."), + new Max()); + + this.backgroundEventQueueSizeSensor = metrics.sensor(BACKGROUND_EVENT_QUEUE_SIZE_SENSOR_NAME); + this.backgroundEventQueueSizeSensor.add(metrics.metricName("background-event-queue-size", + metricGroupName, + "The current number of events in the consumer background event 
queue."), + new Value()); + + this.backgroundEventQueueTimeSensor = metrics.sensor(BACKGROUND_EVENT_QUEUE_TIME_SENSOR_NAME); + this.backgroundEventQueueTimeSensor.add(metrics.metricName("background-event-queue-time-avg", + metricGroupName, + "The average time, in milliseconds, that background events are taking to be dequeued."), + new Avg()); + this.backgroundEventQueueTimeSensor.add(metrics.metricName("background-event-queue-time-max", + metricGroupName, + "The maximum time, in milliseconds, that background events are taking to be dequeued."), + new Max()); + + this.backgroundEventQueueProcessingTimeSensor = metrics.sensor(BACKGROUND_EVENT_QUEUE_PROCESSING_TIME_SENSOR_NAME); + this.backgroundEventQueueProcessingTimeSensor.add(metrics.metricName("background-event-queue-processing-time-avg", + metricGroupName, + "The average time, in milliseconds, that the consumer took to process all available background events."), + new Avg()); + this.backgroundEventQueueProcessingTimeSensor.add(metrics.metricName("background-event-queue-processing-time-max", + metricGroupName, + "The maximum time, in milliseconds, that the consumer took to process all available background events."), + new Max()); + } + + public void recordTimeBetweenNetworkThreadPoll(long timeBetweenNetworkThreadPoll) { + this.timeBetweenNetworkThreadPollSensor.record(timeBetweenNetworkThreadPoll); + } + + public void recordApplicationEventQueueSize(int size) { + this.applicationEventQueueSizeSensor.record(size); + } + + public void recordApplicationEventQueueTime(long time) { + this.applicationEventQueueTimeSensor.record(time); + } + + public void recordApplicationEventQueueProcessingTime(long processingTime) { + this.applicationEventQueueProcessingTimeSensor.record(processingTime); + } + + public void recordUnsentRequestsQueueSize(int size) { Review Comment: This metric is about the size of the queue at a given time, so I expect we should have another param here timeMs, for the time where we read the metric, 
and we should pass it into `.record`, which has an overload for it. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java: ########## @@ -170,6 +174,7 @@ private void trySend(final long currentTimeMs) { Iterator<UnsentRequest> iterator = unsentRequests.iterator(); while (iterator.hasNext()) { UnsentRequest unsent = iterator.next(); + kafkaConsumerMetrics.ifPresent(metrics -> metrics.recordUnsentRequestsQueueTime(currentTimeMs - unsent.enqueuedMs())); Review Comment: Recording this here means we would consider the request removed from the unsent queue even in the case where it cannot be sent and it actually stays in the unsent queue (!doSend), right? If so, I guess we should probably record this only when we do remove it from the queue with iterator.remove() (either because it's expired, or because we did send it). Also, shouldn't we record this same metric in `checkDisconnects` if the request is removed from the unsent queue because the node is disconnected? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ########## @@ -1911,6 +1922,8 @@ private boolean processBackgroundEvents() { if (!firstError.compareAndSet(null, e)) log.warn("An error occurred when processing the background event: {}", e.getMessage(), e); + } finally { + kafkaAsyncConsumerMetrics.recordBackgroundEventQueueProcessingTime(time.milliseconds() - startMs); Review Comment: Similar to `recordApplicationEventQueueProcessingTime`. The metric description states this is about the time "that the consumer took to process **all available background events**". Shouldn't we simply take the time from right before the loop to right after it ends, and record the metric once per run of `processBackgroundEvents`? 
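As an illustration of the "once per run" timing suggested for both processing-time metrics, here is a minimal sketch that takes the start time right before the loop and records a single value right after it. It is not the PR's code: `BatchTimer`, `processAll`, the `LongSupplier` clock, and the `LongConsumer` recorder are hypothetical stand-ins for the processing method, `time.milliseconds()`, and the `record...ProcessingTime` calls.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

// Hypothetical helper showing the timing pattern only.
class BatchTimer {
    // clock is an assumed stand-in for time.milliseconds(); recorder is an
    // assumed stand-in for a record...ProcessingTime(long) metric call.
    static <E> void processAll(List<E> events,
                               Consumer<? super E> processor,
                               LongSupplier clock,
                               LongConsumer recorder) {
        long startMs = clock.getAsLong();
        try {
            for (E event : events) {
                try {
                    processor.accept(event);
                } catch (RuntimeException e) {
                    // Keep processing the remaining events; a real
                    // implementation would log or remember the error here.
                }
            }
        } finally {
            // One sample per run, covering all available events.
            recorder.accept(clock.getAsLong() - startMs);
        }
    }
}
```

Recorded this way, the avg/max statistics describe whole runs of the loop rather than individual events, which is what the metric descriptions say.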
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ########## @@ -1893,14 +1899,19 @@ private void subscribeInternal(Collection<String> topics, Optional<ConsumerRebal * It is possible that {@link ErrorEvent an error} * could occur when processing the events. In such cases, the processor will take a reference to the first * error, continue to process the remaining events, and then throw the first error that occurred. + * + * Visible for testing. */ - private boolean processBackgroundEvents() { + boolean processBackgroundEvents() { AtomicReference<KafkaException> firstError = new AtomicReference<>(); LinkedList<BackgroundEvent> events = new LinkedList<>(); backgroundEventQueue.drainTo(events); + kafkaAsyncConsumerMetrics.recordBackgroundEventQueueSize(backgroundEventQueue.size()); Review Comment: This is where I was wondering if we could encapsulate the logic in a `BackgroundEventHandler.drain` or similar, which would take care of draining the queue and recording the metric (all metric updates done there consistently) ```suggestion LinkedList<BackgroundEvent> events = backgroundEventHandler.drainBackgroundEvents(); ``` ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaAsyncConsumerMetrics.java: ########## @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.metrics; + +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.Max; +import org.apache.kafka.common.metrics.stats.Value; + +import java.util.Arrays; + +import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRICS_SUFFIX; + +public class KafkaAsyncConsumerMetrics extends KafkaConsumerMetrics implements AutoCloseable { + private final Metrics metrics; + + public static final String TIME_BETWEEN_NETWORK_THREAD_POLL_SENSOR_NAME = "time-between-network-thread-poll"; + public static final String APPLICATION_EVENT_QUEUE_SIZE_SENSOR_NAME = "application-event-queue-size"; + public static final String APPLICATION_EVENT_QUEUE_TIME_SENSOR_NAME = "application-event-queue-time"; + public static final String APPLICATION_EVENT_QUEUE_PROCESSING_TIME_SENSOR_NAME = "application-event-queue-processing-time"; + public static final String BACKGROUND_EVENT_QUEUE_SIZE_SENSOR_NAME = "background-event-queue-size"; + public static final String BACKGROUND_EVENT_QUEUE_TIME_SENSOR_NAME = "background-event-queue-time"; + public static final String BACKGROUND_EVENT_QUEUE_PROCESSING_TIME_SENSOR_NAME = "background-event-queue-processing-time"; + public static final String UNSENT_REQUESTS_QUEUE_SIZE_SENSOR_NAME = "unsent-requests-queue-size"; + public static final String UNSENT_REQUESTS_QUEUE_TIME_SENSOR_NAME = "unsent-requests-queue-time"; + private final Sensor 
timeBetweenNetworkThreadPollSensor; + private final Sensor applicationEventQueueSizeSensor; Review Comment: nit: extra space before Sensor ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java: ########## @@ -267,7 +272,9 @@ public void addAll(final List<UnsentRequest> requests) { public void add(final UnsentRequest r) { Objects.requireNonNull(r); r.setTimer(this.time, this.requestTimeoutMs); + r.setEnqueuedMs(this.time.milliseconds()); unsentRequests.add(r); + kafkaConsumerMetrics.ifPresent(metrics -> metrics.recordUnsentRequestsQueueSize(unsentRequests.size())); Review Comment: I'm still debating whether this is the best place to record this. We want snapshots in time of the queue size. Recording here has the limitation that we won't be recording when the size decreases (i.e. requests sent, or failed due to disconnections). So I wonder if recording this on poll, which is called regularly, would give a better view of the queue size? The way add/poll are used from ConsumerNetworkThread.runOnce, they end up being called sequentially anyway, but I'm thinking about the case where, let's say, managers are not returning any requests (so addAll is called with an empty list and add is never called), but there could still be unsent requests in the queue that could be sent out, cancelled, timed out, etc. Thoughts?