lianetm commented on code in PR #17199: URL: https://github.com/apache/kafka/pull/17199#discussion_r1872198214
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java: ########## @@ -317,6 +327,20 @@ Timer timer() { return timer; } + /** + * Set the time when the request was enqueued to {@link NetworkClientDelegate#unsentRequests}. + */ + void setEnqueueTimeMs(final long enqueueTimeMs) { + this.enqueueTimeMs = enqueueTimeMs; + } + + /** + * Return the time when the request was enqueued to {@link NetworkClientDelegate#unsentRequests}. + */ + long enqueueTimeMs() { Review Comment: this one is less sensitive, but if it's only used here as it seems we could consider private too ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java: ########## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler; +import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor; +import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper; +import org.apache.kafka.clients.consumer.internals.events.PollEvent; +import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; + +import org.junit.jupiter.api.Test; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; + +public class ApplicationEventHandlerTest { + private final Time time = new MockTime(); + private final BlockingQueue<ApplicationEvent> applicationEventsQueue = new LinkedBlockingQueue<>(); + private final ApplicationEventProcessor applicationEventProcessor = mock(ApplicationEventProcessor.class); + private final NetworkClientDelegate networkClientDelegate = mock(NetworkClientDelegate.class); + private final RequestManagers requestManagers = mock(RequestManagers.class); + private final CompletableEventReaper applicationEventReaper = mock(CompletableEventReaper.class); + + @Test + public void testRecordApplicationEventQueueSize() { + try (Metrics metrics = new Metrics(); + AsyncConsumerMetrics asyncConsumerMetrics = new AsyncConsumerMetrics(metrics, "consumer"); + ApplicationEventHandler applicationEventHandler = new ApplicationEventHandler( + new LogContext(), + time, + applicationEventsQueue, + applicationEventReaper, + () -> applicationEventProcessor, + () -> networkClientDelegate, + () -> 
requestManagers, + asyncConsumerMetrics + )) { + PollEvent event = new PollEvent(time.milliseconds()); + + // add event + applicationEventHandler.add(event); + assertEquals(1, (double) metrics.metric(metrics.metricName("application-event-queue-size", "consumer-metrics")).metricValue()); Review Comment: could we reuse the metric name constants we already have? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ########## @@ -1893,25 +1901,30 @@ private void subscribeInternal(Collection<String> topics, Optional<ConsumerRebal * It is possible that {@link ErrorEvent an error} * could occur when processing the events. In such cases, the processor will take a reference to the first * error, continue to process the remaining events, and then throw the first error that occurred. + * + * Visible for testing. */ - private boolean processBackgroundEvents() { + boolean processBackgroundEvents() { AtomicReference<KafkaException> firstError = new AtomicReference<>(); - LinkedList<BackgroundEvent> events = new LinkedList<>(); - backgroundEventQueue.drainTo(events); - - for (BackgroundEvent event : events) { - try { - if (event instanceof CompletableEvent) - backgroundEventReaper.add((CompletableEvent<?>) event); - - backgroundEventProcessor.process(event); - } catch (Throwable t) { - KafkaException e = ConsumerUtils.maybeWrapAsKafkaException(t); - - if (!firstError.compareAndSet(null, e)) - log.warn("An error occurred when processing the background event: {}", e.getMessage(), e); + List<BackgroundEvent> events = backgroundEventHandler.drainEvents(); + if (!events.isEmpty()) { + long startMs = time.milliseconds(); + for (BackgroundEvent event : events) { + kafkaConsumerMetrics.recordBackgroundEventQueueTime(time.milliseconds() - event.enqueuedMs()); + try { + if (event instanceof CompletableEvent) + backgroundEventReaper.add((CompletableEvent<?>) event); + + backgroundEventProcessor.process(event); + } catch (Throwable t) { + KafkaException e = ConsumerUtils.maybeWrapAsKafkaException(t); + + if (!firstError.compareAndSet(null, e)) + log.warn("An error occurred when processing the background event: {}", e.getMessage(), e); + } } + kafkaConsumerMetrics.recordBackgroundEventQueueProcessingTime(time.milliseconds() - startMs); } backgroundEventReaper.reap(time.milliseconds()); Review Comment: Interesting, and if we agree on what we want we could just send an update in the KIP email thread to add it to the KIP and here. To align internally first, I guess we would be interested in the num/avg of expired events, but we need to consider how that metric would go crazy and be a false alarm in cases like poll(0) right? Should we consider relevant the expiration only if there was a non-zero timeout? Thoughts? ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java: ########## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler; +import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor; +import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper; +import org.apache.kafka.clients.consumer.internals.events.PollEvent; +import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; + +import org.junit.jupiter.api.Test; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; + +public class ApplicationEventHandlerTest { + private final Time time = new MockTime(); + private final BlockingQueue<ApplicationEvent> applicationEventsQueue = new LinkedBlockingQueue<>(); + private final ApplicationEventProcessor applicationEventProcessor = mock(ApplicationEventProcessor.class); + private final NetworkClientDelegate networkClientDelegate = mock(NetworkClientDelegate.class); + private final RequestManagers requestManagers = mock(RequestManagers.class); + private final CompletableEventReaper applicationEventReaper = mock(CompletableEventReaper.class); + + @Test + public void testRecordApplicationEventQueueSize() { + try (Metrics metrics = new Metrics(); + AsyncConsumerMetrics asyncConsumerMetrics = new AsyncConsumerMetrics(metrics, "consumer"); + ApplicationEventHandler applicationEventHandler = new ApplicationEventHandler( + new LogContext(), + time, + applicationEventsQueue, + applicationEventReaper, + () -> applicationEventProcessor, + () -> networkClientDelegate, + () -> requestManagers, + asyncConsumerMetrics + )) { + PollEvent event = new PollEvent(time.milliseconds()); + + // add event + applicationEventHandler.add(event); Review Comment: ```suggestion // add event applicationEventHandler.add(new PollEvent(time.milliseconds())); ``` ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java: ########## @@ -317,6 +327,20 @@ Timer timer() { return timer; } + /** + * Set the time when the request was enqueued to {@link NetworkClientDelegate#unsentRequests}. + */ + void setEnqueueTimeMs(final long enqueueTimeMs) { Review Comment: this should be private right? ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/BackgroundEventHandlerTest.java: ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; +import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; +import org.apache.kafka.clients.consumer.internals.events.ErrorEvent; +import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.utils.MockTime; + +import org.junit.jupiter.api.Test; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class BackgroundEventHandlerTest { + private final BlockingQueue<BackgroundEvent> backgroundEventsQueue = new LinkedBlockingQueue<>(); + + @Test + public void testRecordBackgroundEventQueueSize() { + try (Metrics metrics = new Metrics(); + AsyncConsumerMetrics asyncConsumerMetrics = new AsyncConsumerMetrics(metrics, "consumer")) { + BackgroundEventHandler backgroundEventHandler = new BackgroundEventHandler( + backgroundEventsQueue, + new MockTime(0), + asyncConsumerMetrics); + BackgroundEvent event = new ErrorEvent(new Throwable()); + + // add event + backgroundEventHandler.add(event); Review Comment: ```suggestion // add event backgroundEventHandler.add(new ErrorEvent(new Throwable())); ``` ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ########## @@ -199,4 +203,57 @@ public void testSendUnsentRequests() { consumerNetworkThread.cleanup(); verify(networkClientDelegate, times(2)).poll(anyLong(), anyLong()); } + + @Test + public void testRunOnceRecordTimeBetweenNetworkThreadPoll() { + try (Metrics metrics = new Metrics(); + AsyncConsumerMetrics asyncConsumerMetrics = new AsyncConsumerMetrics(metrics, "consumer"); + ConsumerNetworkThread consumerNetworkThread = new ConsumerNetworkThread( + new LogContext(), + time, + applicationEventQueue, + applicationEventReaper, + () -> applicationEventProcessor, + () -> networkClientDelegate, + () -> requestManagers, + asyncConsumerMetrics + )) { + consumerNetworkThread.initializeResources(); + + consumerNetworkThread.runOnce(); + time.sleep(10); + consumerNetworkThread.runOnce(); + assertTrue((double) metrics.metric(metrics.metricName("time-between-network-thread-poll-avg", "consumer-metrics")).metricValue() > 0); + assertTrue((double) metrics.metric(metrics.metricName("time-between-network-thread-poll-max", "consumer-metrics")).metricValue() > 0); Review Comment: couldn't we be more precise here (I guess it should be 10 exactly given how this is calculated right? 
not sure if we would need >= in this case) ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ########## @@ -1951,6 +1952,31 @@ public void testSubscribePatternAgainstBrokerNotSupportingRegex() throws Interru }, "Consumer did not throw the expected UnsupportedVersionException on poll"); } + @Test + public void testRecordBackgroundEventQueueSizeAndBackgroundEventQueueTime() { + consumer = newConsumer( + mock(FetchBuffer.class), + mock(ConsumerInterceptors.class), + mock(ConsumerRebalanceListenerInvoker.class), + mock(SubscriptionState.class), + "group-id", + "client-id", + false); + Metrics metrics = consumer.metricsRegistry(); + AsyncConsumerMetrics kafkaConsumerMetrics = consumer.kafkaConsumerMetrics(); + + ConsumerRebalanceListenerCallbackNeededEvent event = new ConsumerRebalanceListenerCallbackNeededEvent(ON_PARTITIONS_REVOKED, Collections.emptySortedSet()); + event.setEnqueuedMs(time.milliseconds()); + backgroundEventQueue.add(event); + kafkaConsumerMetrics.recordBackgroundEventQueueSize(1); + + time.sleep(10); + consumer.processBackgroundEvents(); + assertEquals(0, (double) metrics.metric(metrics.metricName("background-event-queue-size", "consumer-metrics")).metricValue()); + assertTrue((double) metrics.metric(metrics.metricName("background-event-queue-time-avg", "consumer-metrics")).metricValue() > 0); + assertTrue((double) metrics.metric(metrics.metricName("background-event-queue-time-max", "consumer-metrics")).metricValue() > 0); Review Comment: couldn't we be more precise here and expect >= 10? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetrics.java: ########## @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals.metrics; + +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.Max; +import org.apache.kafka.common.metrics.stats.Value; + +import java.util.Arrays; + +import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRICS_SUFFIX; + +public class AsyncConsumerMetrics extends KafkaConsumerMetrics implements AutoCloseable { + private final Metrics metrics; + + public static final String TIME_BETWEEN_NETWORK_THREAD_POLL_SENSOR_NAME = "time-between-network-thread-poll"; + public static final String APPLICATION_EVENT_QUEUE_SIZE_SENSOR_NAME = "application-event-queue-size"; + public static final String APPLICATION_EVENT_QUEUE_TIME_SENSOR_NAME = "application-event-queue-time"; + public static final String APPLICATION_EVENT_QUEUE_PROCESSING_TIME_SENSOR_NAME = "application-event-queue-processing-time"; + public static final String BACKGROUND_EVENT_QUEUE_SIZE_SENSOR_NAME = "background-event-queue-size"; + public static final String BACKGROUND_EVENT_QUEUE_TIME_SENSOR_NAME = "background-event-queue-time"; + public static final String BACKGROUND_EVENT_QUEUE_PROCESSING_TIME_SENSOR_NAME = "background-event-queue-processing-time"; + public static final String UNSENT_REQUESTS_QUEUE_SIZE_SENSOR_NAME = "unsent-requests-queue-size"; + public static final String UNSENT_REQUESTS_QUEUE_TIME_SENSOR_NAME = "unsent-requests-queue-time"; + private final Sensor timeBetweenNetworkThreadPollSensor; + private final Sensor applicationEventQueueSizeSensor; + private final Sensor applicationEventQueueTimeSensor; + private final Sensor applicationEventQueueProcessingTimeSensor; + private final Sensor backgroundEventQueueSizeSensor; + private final Sensor backgroundEventQueueTimeSensor; + private final Sensor backgroundEventQueueProcessingTimeSensor; + private final Sensor unsentRequestsQueueSizeSensor; + private final Sensor unsentRequestsQueueTimeSensor; + + public AsyncConsumerMetrics(Metrics metrics, String metricGrpPrefix) { + super(metrics, metricGrpPrefix); + + this.metrics = metrics; + final String metricGroupName = metricGrpPrefix + CONSUMER_METRICS_SUFFIX; + this.timeBetweenNetworkThreadPollSensor = metrics.sensor(TIME_BETWEEN_NETWORK_THREAD_POLL_SENSOR_NAME); + this.timeBetweenNetworkThreadPollSensor.add(metrics.metricName("time-between-network-thread-poll-avg", + metricGroupName, + "The average time taken, in milliseconds, between each poll in the network thread."), + new Avg()); + this.timeBetweenNetworkThreadPollSensor.add(metrics.metricName("time-between-network-thread-poll-max", + metricGroupName, + "The maximum time taken, in milliseconds, between each poll in the network thread."), + new Max()); + + this.applicationEventQueueSizeSensor = metrics.sensor(APPLICATION_EVENT_QUEUE_SIZE_SENSOR_NAME); + this.applicationEventQueueSizeSensor.add(metrics.metricName("application-event-queue-size", + metricGroupName, + "The current number of events in the consumer network application event queue."), Review Comment: nit: I guess that, in a time from now, even us that know this by heart will get tricked with if this is the outgoing or incoming queue. 
Should we be more explicit with something like ```suggestion "The current number of events in the queue to send from the application thread to the background thread."), ``` (and then we can consistently have the flipped version of the message for the `background-event-queue-size` metric)
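For illustration only, a minimal sketch of what that flipped wording might look like for the background queue metric, using the same `Metrics`/`Sensor` pattern as the `application-event-queue-size` registration in the `AsyncConsumerMetrics` hunk above. The class name `BackgroundQueueMetricSketch` is hypothetical, the `"consumer-metrics"` group name is taken from the tests in this thread, and the `Value` stat is an assumption, since the hunk is truncated before showing which stat the PR actually uses:

```java
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Value;

// Hypothetical sketch, not the PR's code: registers the background event queue size
// metric with the "flipped" description proposed in the review comment above.
public class BackgroundQueueMetricSketch {
    public static void main(String[] args) {
        try (Metrics metrics = new Metrics()) {
            String metricGroupName = "consumer-metrics";

            Sensor backgroundEventQueueSizeSensor = metrics.sensor("background-event-queue-size");
            backgroundEventQueueSizeSensor.add(metrics.metricName("background-event-queue-size",
                metricGroupName,
                // mirrored wording of the application-event-queue-size description
                "The current number of events in the queue to send from the background thread to the application thread."),
                new Value()); // Value is an assumption; the stat used in the PR is cut off in the hunk above

            // Record a sample queue size and read the metric back.
            backgroundEventQueueSizeSensor.record(3);
            System.out.println(metrics.metric(
                metrics.metricName("background-event-queue-size", metricGroupName)).metricValue());
        }
    }
}
```

Keeping the two descriptions as exact mirror images of each other ("application thread to the background thread" vs. "background thread to the application thread") makes the queue direction unambiguous without having to remember which side of the consumer owns which queue.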