lianetm commented on code in PR #17199:
URL: https://github.com/apache/kafka/pull/17199#discussion_r1849018100
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -173,6 +181,7 @@ private void processApplicationEvents() {
 log.warn("Error processing event {}", t.getMessage(), t);
 }
 }
+ kafkaAsyncConsumerMetrics.ifPresent(metrics -> metrics.recordApplicationEventQueueProcessingTime(time.milliseconds() - startMs));

Review Comment:
   We probably don't want to record this if there were no events, right? Maybe just an early return after drainEvents if there are none?

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -173,6 +178,7 @@ private void trySend(final long currentTimeMs) {
 unsent.timer.update(currentTimeMs);
 if (unsent.timer.isExpired()) {
 iterator.remove();
+ kafkaConsumerMetrics.ifPresent(metrics -> metrics.recordUnsentRequestsQueueTime(this.time.milliseconds() - unsent.enqueuedMs()));

Review Comment:
   Shouldn't we use `currentTimeMs` instead of `this.time.milliseconds()`?
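A minimal sketch of the two suggestions above, assuming simplified hypothetical types (`RecordingMetrics`, `QueuedItem`, `QueueMetricsSketch`) in place of the real consumer classes and metrics: the processing-time sample is skipped when the drain returns nothing, and the queue time is computed from the `currentTimeMs` passed into the method instead of re-reading the clock.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.LongSupplier;

// Hypothetical stand-in for the consumer metrics sensors: it just keeps every sample.
class RecordingMetrics {
    final List<Long> processingTimes = new ArrayList<>();
    final List<Long> queueTimes = new ArrayList<>();

    void recordProcessingTime(long ms) {
        processingTimes.add(ms);
    }

    void recordQueueTime(long ms) {
        queueTimes.add(ms);
    }
}

// Hypothetical queued item that remembers when it was enqueued (a point in time, not a duration).
class QueuedItem {
    final long enqueueTimeMs;

    QueuedItem(long enqueueTimeMs) {
        this.enqueueTimeMs = enqueueTimeMs;
    }
}

class QueueMetricsSketch {
    private final BlockingQueue<QueuedItem> queue = new LinkedBlockingQueue<>();
    private final RecordingMetrics metrics;
    private final LongSupplier clock;

    QueueMetricsSketch(RecordingMetrics metrics, LongSupplier clock) {
        this.metrics = metrics;
        this.clock = clock;
    }

    void add(QueuedItem item) {
        queue.add(item);
    }

    // Early return: an empty drain produces no processing-time sample.
    void processEvents() {
        long startMs = clock.getAsLong();
        List<QueuedItem> events = new ArrayList<>();
        queue.drainTo(events);
        if (events.isEmpty()) {
            return;
        }
        for (QueuedItem event : events) {
            // Event handling would go here.
        }
        metrics.recordProcessingTime(clock.getAsLong() - startMs);
    }

    // Queue time uses the caller's currentTimeMs rather than a fresh clock read.
    void sendAll(List<QueuedItem> unsent, long currentTimeMs) {
        Iterator<QueuedItem> iterator = unsent.iterator();
        while (iterator.hasNext()) {
            QueuedItem item = iterator.next();
            iterator.remove();
            metrics.recordQueueTime(currentTimeMs - item.enqueueTimeMs);
        }
    }
}
```

Reusing the caller's timestamp also keeps the queue-time samples consistent with the `unsent.timer.update(currentTimeMs)` call in the same loop, which already works from that value.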
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java:
##########
@@ -73,10 +84,19 @@ public ApplicationEventHandler(final LogContext logContext,
 */
 public void add(final ApplicationEvent event) {
 Objects.requireNonNull(event, "ApplicationEvent provided to add must be non-null");
+ event.setEnqueuedMs(time.milliseconds());
 applicationEventQueue.add(event);
+ kafkaAsyncConsumerMetrics.ifPresent(metrics -> metrics.recordApplicationEventQueueSize(applicationEventQueue.size()));
 wakeupNetworkThread();
 }
+ public List<ApplicationEvent> drainEvents() {

Review Comment:
   Could you add a javadoc for this, please?

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -183,6 +189,7 @@ private void trySend(final long currentTimeMs) {
 continue;
 }
 iterator.remove();
+ kafkaConsumerMetrics.ifPresent(metrics -> metrics.recordUnsentRequestsQueueTime(this.time.milliseconds() - unsent.enqueuedMs()));

Review Comment:
   `currentTimeMs`?

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -162,15 +171,20 @@ void runOnce() {
 private void processApplicationEvents() {
 LinkedList<ApplicationEvent> events = new LinkedList<>();
 applicationEventQueue.drainTo(events);
+ kafkaAsyncConsumerMetrics.ifPresent(metrics -> metrics.recordApplicationEventQueueSize(0));

Review Comment:
   Oh, good point, I forgot about that dependency! (It feels kind of unexpected, actually.) Given that structure, we shouldn't reference AppEventHandler in the ConsumerNetworkThread, because we would end up with a circular dependency.
   Sorry for the extra work, but I would suggest we revert this back to your initial change, without a drainEvents, and rethink this class structure in a separate Jira, to see if it would make sense to decouple the AppEventHandler from the network thread (and if so, we could then properly add a drainEvents without circular deps). Makes sense?

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1913,6 +1923,7 @@ private boolean processBackgroundEvents() {
 log.warn("An error occurred when processing the background event: {}", e.getMessage(), e);
 }
 }
+ kafkaAsyncConsumerMetrics.recordBackgroundEventQueueProcessingTime(time.milliseconds() - startMs);

Review Comment:
   We probably don't want to record this if there were no events, right?

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -144,6 +148,7 @@ public void poll(final long timeoutMs, final long currentTimeMs) {
 this.client.poll(pollTimeoutMs, currentTimeMs);
 maybePropagateMetadataError();
 checkDisconnects(currentTimeMs);
+ kafkaConsumerMetrics.ifPresent(metrics -> metrics.recordUnsentRequestsQueueSize(unsentRequests.size(), this.time.milliseconds()));

Review Comment:
   Should we use `currentTimeMs` instead of `this.time.milliseconds()`?

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -317,6 +328,14 @@ Timer timer() {
 return timer;
 }
+ void setEnqueuedMs(final long enqueuedMs) {

Review Comment:
   Could we add a javadoc for this, the getter, and the field? Mainly to clarify that it's the time when the request was added to the queue (not a duration, i.e. how long it's been in the queue, which is another concept we are working with; that's why I think it's worth clarifying). We could also consider `enqueueTimeMs` as the name.
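One way the requested javadoc could read, sketched on a hypothetical `TimestampedRequest` class that uses the suggested `enqueueTimeMs` name. The `queueTimeMs(long)` helper is also hypothetical; it only illustrates the separate "time spent in the queue" concept the comment wants to keep distinct.

```java
// Hypothetical, simplified stand-in for a queued request; not the real UnsentRequest.
class TimestampedRequest {
    /**
     * Wall-clock time, in milliseconds, at which this request was added to the queue.
     * This is a point in time, not a duration; the time spent in the queue is
     * derived from it separately (see {@link #queueTimeMs(long)}).
     */
    private long enqueueTimeMs;

    /**
     * Records when this request was added to the queue.
     *
     * @param enqueueTimeMs wall-clock time in milliseconds at which the request was enqueued
     */
    void setEnqueueTimeMs(final long enqueueTimeMs) {
        this.enqueueTimeMs = enqueueTimeMs;
    }

    /**
     * @return the wall-clock time in milliseconds at which this request was enqueued
     */
    long enqueueTimeMs() {
        return enqueueTimeMs;
    }

    /**
     * Hypothetical helper: how long the request has been waiting in the queue.
     *
     * @param currentTimeMs current wall-clock time in milliseconds
     * @return elapsed time in the queue, in milliseconds
     */
    long queueTimeMs(final long currentTimeMs) {
        return currentTimeMs - enqueueTimeMs;
    }
}
```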
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -267,7 +272,9 @@ public void addAll(final List<UnsentRequest> requests) {
 public void add(final UnsentRequest r) {
 Objects.requireNonNull(r);
 r.setTimer(this.time, this.requestTimeoutMs);
+ r.setEnqueuedMs(this.time.milliseconds());
 unsentRequests.add(r);
+ kafkaConsumerMetrics.ifPresent(metrics -> metrics.recordUnsentRequestsQueueSize(unsentRequests.size()));

Review Comment:
   It makes sense to record the unsent request queue size at the end of a poll iteration, after requests were added and removed/sent; that's what could truly help spot issues (too many requests being left "unsent" on each run). That actually makes me wonder about the value of also recording it on add: would it be useful to see the number of requests added on each run, or is seeing how many were left unsent all that matters?

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -210,6 +217,7 @@ protected void checkDisconnects(final long currentTimeMs) {
 UnsentRequest u = iter.next();
 if (u.node.isPresent() && client.connectionFailed(u.node.get())) {
 iter.remove();
+ kafkaConsumerMetrics.ifPresent(metrics -> metrics.recordUnsentRequestsQueueTime(this.time.milliseconds() - u.enqueuedMs()));

Review Comment:
   `currentTimeMs`?
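A sketch of the "record only at the end of the poll iteration" option, assuming a hypothetical `UnsentQueueSketch` with plain lists standing in for the metrics sensors: `add()` records nothing, `poll()` takes one queue-size sample after disconnected requests are removed, and `checkDisconnects()` reuses the `currentTimeMs` passed down from `poll()`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class UnsentQueueSketch {
    // Hypothetical request: when it was enqueued and whether its node's connection failed.
    static final class Request {
        final long enqueueTimeMs;
        final boolean connectionFailed;

        Request(long enqueueTimeMs, boolean connectionFailed) {
            this.enqueueTimeMs = enqueueTimeMs;
            this.connectionFailed = connectionFailed;
        }
    }

    private final List<Request> unsentRequests = new ArrayList<>();
    // Plain lists standing in for the queue-size and queue-time sensors.
    final List<Integer> recordedQueueSizes = new ArrayList<>();
    final List<Long> recordedQueueTimes = new ArrayList<>();

    void add(Request r) {
        // No size sample here; the size is sampled once at the end of poll().
        unsentRequests.add(r);
    }

    void poll(long currentTimeMs) {
        checkDisconnects(currentTimeMs);
        // One sample per poll iteration, taken after requests were added and removed.
        recordedQueueSizes.add(unsentRequests.size());
    }

    private void checkDisconnects(long currentTimeMs) {
        Iterator<Request> iter = unsentRequests.iterator();
        while (iter.hasNext()) {
            Request u = iter.next();
            if (u.connectionFailed) {
                iter.remove();
                // Reuse the timestamp passed down from poll() instead of reading the clock again.
                recordedQueueTimes.add(currentTimeMs - u.enqueueTimeMs);
            }
        }
    }
}
```

With this shape the size metric only answers "how many requests were left unsent after this run"; a per-run count of added requests would need its own counter.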