AndrewJSchofield commented on code in PR #18959:
URL: https://github.com/apache/kafka/pull/18959#discussion_r1963334197
########## core/src/main/java/kafka/server/share/DelayedShareFetch.java: ##########

```diff
@@ -79,16 +84,29 @@ public DelayedShareFetch(
         ShareFetch shareFetch,
         ReplicaManager replicaManager,
         BiConsumer<SharePartitionKey, Throwable> exceptionHandler,
-        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions) {
-        this(shareFetch, replicaManager, exceptionHandler, sharePartitions, PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM));
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions,
+        ShareGroupMetrics shareGroupMetrics,
```

Review Comment:
   Please add these new arguments to the javadoc.

########## core/src/main/java/kafka/server/share/DelayedShareFetch.java: ##########

```diff
@@ -62,9 +63,13 @@ public class DelayedShareFetch extends DelayedOperation {
     private final ReplicaManager replicaManager;
     private final BiConsumer<SharePartitionKey, Throwable> exceptionHandler;
     private final PartitionMaxBytesStrategy partitionMaxBytesStrategy;
+    private final ShareGroupMetrics shareGroupMetrics;
+    private final Time time;
     // The topic partitions that need to be completed for the share fetch request are given by sharePartitions.
     // sharePartitions is a subset of shareFetchData. The order of insertion/deletion of entries in sharePartitions is important.
     private final LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions;
+    // Tracks the start time to acquire any share partition for a fetch request.
+    private long acquireStartTimeMs;
```

Review Comment:
   I'm not convinced by this metric. If we start counting the time when the `DelayedShareFetch` is created, that's essentially the time that we started waiting for data from the replica manager. The description says "High values suggest topic-partitions contention." High values could equally indicate topic-partitions with low data rates.
########## core/src/main/java/kafka/server/share/DelayedShareFetch.java: ##########

```diff
@@ -120,16 +141,28 @@ public void onComplete() {
         try {
             LinkedHashMap<TopicIdPartition, Long> topicPartitionData;
             // tryComplete did not invoke forceComplete, so we need to check if we have any partitions to fetch.
-            if (partitionsAcquired.isEmpty())
+            if (partitionsAcquired.isEmpty()) {
                 topicPartitionData = acquirablePartitions();
-            // tryComplete invoked forceComplete, so we can use the data from tryComplete.
-            else
+                // The TopicPartitionsAcquireTimeMs metric signifies the tension when acquiring the locks
+                // for the share partition, hence if no partitions are yet acquired by tryComplete,
+                // we record the metric here. Do not check if the request has successfully acquired any
+                // partitions now or not, as then the upper bound of request timeout shall be recorded
+                // for the metric.
+                updateAcquireElapsedTimeMetric();
+            } else {
+                // tryComplete invoked forceComplete, so we can use the data from tryComplete.
                 topicPartitionData = partitionsAcquired;
+            }

             if (topicPartitionData.isEmpty()) {
                 // No locks for share partitions could be acquired, so we complete the request with an empty response.
-                shareFetch.maybeComplete(Collections.emptyMap());
+                shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), 0);
+                shareFetch.maybeComplete(Map.of());
                 return;
+            } else {
+                // Update metric to record acquired to requested partitions.
+                double requestTopicToAcquired = (double) topicPartitionData.size() / shareFetch.partitionMaxBytes().size();
+                shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), (int) (requestTopicToAcquired * 100));
```

Review Comment:
   Given that this is now a percentage not a ratio, I was thinking of asking you to rename to a metric name ending in `percent`. However, reviewing the existing ratio and percentage metrics, there's no single standard, and using "ratio" in the name is as good a way as any. I wouldn't suggest changing it.
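For illustration, the recorded value in the hunk above is an integer percentage rather than a raw ratio. A minimal standalone sketch of that computation (the class and method names here are hypothetical, not from the PR):

```java
public class FetchRatioSketch {
    // Hypothetical helper mirroring the PR's computation:
    // (double) acquired / requested, scaled to an int in [0, 100].
    static int topicPartitionsFetchRatio(int acquiredPartitions, int requestedPartitions) {
        double requestTopicToAcquired = (double) acquiredPartitions / requestedPartitions;
        return (int) (requestTopicToAcquired * 100);
    }

    public static void main(String[] args) {
        // 3 of 4 requested partitions acquired -> 75, a percentage despite "ratio" in the metric name
        System.out.println(topicPartitionsFetchRatio(3, 4));
        // No partitions acquired -> 0, matching the hard-coded 0 in the empty-response branch
        System.out.println(topicPartitionsFetchRatio(0, 4));
    }
}
```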
########## core/src/main/java/kafka/server/share/DelayedShareFetch.java: ##########

```diff
@@ -417,6 +452,19 @@ private void handleFetchException(
         shareFetch.maybeCompleteWithException(topicIdPartitions, throwable);
     }

+    /**
+     * The method updates the metric for the time taken to acquire the share partition locks. Also,
+     * it resets the acquireStartTimeMs to the current time, so that the metric records the time taken
+     * to acquire the locks for the re-try, if the partitions are re-acquired. The partitions can be
+     * re-acquired if the fetch request is not completed because of the minBytes or some other condition.
+     */
+    private void updateAcquireElapsedTimeMetric() {
+        shareGroupMetrics.recordTopicPartitionsAcquireTimeMs(shareFetch.groupId(), time.hiResClockMs() - acquireStartTimeMs);
```

Review Comment:
   I'd just capture the `time.hiResClockMs()` once and use it twice.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
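The capture-once suggestion in the last comment could look roughly like the sketch below. This is an illustrative stand-in, not the PR's code: the clock and the metrics recorder are replaced with plain fields so the example is self-contained.

```java
public class AcquireTimeSketch {
    long acquireStartTimeMs = 100L;   // pretend the acquire attempt started at t=100ms
    long lastRecordedMs = -1L;        // stand-in for shareGroupMetrics.recordTopicPartitionsAcquireTimeMs(...)
    long nowMs = 250L;                // stand-in for time.hiResClockMs()

    void updateAcquireElapsedTimeMetric() {
        long currentTimeMs = nowMs;                           // capture the clock reading once...
        lastRecordedMs = currentTimeMs - acquireStartTimeMs;  // ...use it for the elapsed-time metric
        acquireStartTimeMs = currentTimeMs;                   // ...and reuse the same value for the reset
    }

    public static void main(String[] args) {
        AcquireTimeSketch sketch = new AcquireTimeSketch();
        sketch.updateAcquireElapsedTimeMetric();
        System.out.println(sketch.lastRecordedMs);
        System.out.println(sketch.acquireStartTimeMs);
    }
}
```

Reading the clock twice would work too, but a single capture guarantees the recorded elapsed time and the reset baseline agree exactly, with no gap between the two reads.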