apoorvmittal10 commented on code in PR #18959: URL: https://github.com/apache/kafka/pull/18959#discussion_r1964540273
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -79,16 +84,29 @@ public DelayedShareFetch(
         ShareFetch shareFetch,
         ReplicaManager replicaManager,
         BiConsumer<SharePartitionKey, Throwable> exceptionHandler,
-        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions) {
-        this(shareFetch, replicaManager, exceptionHandler, sharePartitions, PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM));
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions,
+        ShareGroupMetrics shareGroupMetrics,

Review Comment:
   Done.

##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -417,6 +452,19 @@ private void handleFetchException(
         shareFetch.maybeCompleteWithException(topicIdPartitions, throwable);
     }
+    /**
+     * The method updates the metric for the time taken to acquire the share partition locks. Also,
+     * it resets the acquireStartTimeMs to the current time, so that the metric records the time taken
+     * to acquire the locks for the re-try, if the partitions are re-acquired. The partitions can be
+     * re-acquired if the fetch request is not completed because of the minBytes or some other condition.
+     */
+    private void updateAcquireElapsedTimeMetric() {
+        shareGroupMetrics.recordTopicPartitionsAcquireTimeMs(shareFetch.groupId(), time.hiResClockMs() - acquireStartTimeMs);

Review Comment:
   Done.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org