AndrewJSchofield commented on code in PR #18959:
URL: https://github.com/apache/kafka/pull/18959#discussion_r1963334197


##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -79,16 +84,29 @@ public DelayedShareFetch(
             ShareFetch shareFetch,
             ReplicaManager replicaManager,
             BiConsumer<SharePartitionKey, Throwable> exceptionHandler,
-            LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions) {
-        this(shareFetch, replicaManager, exceptionHandler, sharePartitions, PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM));
+            LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions,
+            ShareGroupMetrics shareGroupMetrics,

Review Comment:
   Please add these new arguments to the javadoc.



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -62,9 +63,13 @@ public class DelayedShareFetch extends DelayedOperation {
     private final ReplicaManager replicaManager;
     private final BiConsumer<SharePartitionKey, Throwable> exceptionHandler;
     private final PartitionMaxBytesStrategy partitionMaxBytesStrategy;
+    private final ShareGroupMetrics shareGroupMetrics;
+    private final Time time;
     // The topic partitions that need to be completed for the share fetch request are given by sharePartitions.
     // sharePartitions is a subset of shareFetchData. The order of insertion/deletion of entries in sharePartitions is important.
     private final LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions;
+    // Tracks the start time to acquire any share partition for a fetch request.
+    private long acquireStartTimeMs;

Review Comment:
   I'm not convinced by this metric. If we start counting the time when the 
`DelayedShareFetch` is created, that's essentially the time that we started 
waiting for data from the replica manager. The description says "High values 
suggest topic-partitions contention." High values could equally indicate 
topic-partitions with low data rates.



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -120,16 +141,28 @@ public void onComplete() {
         try {
             LinkedHashMap<TopicIdPartition, Long> topicPartitionData;
             // tryComplete did not invoke forceComplete, so we need to check if we have any partitions to fetch.
-            if (partitionsAcquired.isEmpty())
+            if (partitionsAcquired.isEmpty()) {
                 topicPartitionData = acquirablePartitions();
-            // tryComplete invoked forceComplete, so we can use the data from tryComplete.
-            else
+                // The TopicPartitionsAcquireTimeMs metric signifies the tension when acquiring the locks
+                // for the share partition, hence if no partitions are yet acquired by tryComplete,
+                // we record the metric here. Do not check if the request has successfully acquired any
+                // partitions now or not, as then the upper bound of request timeout shall be recorded
+                // for the metric.
+                updateAcquireElapsedTimeMetric();
+            } else {
+                // tryComplete invoked forceComplete, so we can use the data from tryComplete.
                 topicPartitionData = partitionsAcquired;
+            }
 
             if (topicPartitionData.isEmpty()) {
                 // No locks for share partitions could be acquired, so we complete the request with an empty response.
-                shareFetch.maybeComplete(Collections.emptyMap());
+                shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), 0);
+                shareFetch.maybeComplete(Map.of());
                 return;
+            } else {
+                // Update metric to record acquired to requested partitions.
+                double requestTopicToAcquired = (double) topicPartitionData.size() / shareFetch.partitionMaxBytes().size();
+                shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), (int) (requestTopicToAcquired * 100));

Review Comment:
   Given that this is now a percentage not a ratio, I was thinking of asking 
you to rename to a metric name ending in `percent`. However, reviewing the 
existing ratio and percentage metrics, there's no single standard, and using 
"ratio" in the name is as good a way as any. I wouldn't suggest changing it.
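   To make the ratio-versus-percentage distinction concrete, here is a minimal
   sketch of the computation in the hunk above. The class and method names are
   hypothetical and not part of the PR; only the arithmetic mirrors the diff.

```java
// Illustrative only: FetchRatioSketch and acquiredPercent are hypothetical names.
public class FetchRatioSketch {

    // Mirrors the arithmetic in the diff: a fraction in [0, 1] scaled by 100
    // and truncated to an int, so the recorded value is a percentage.
    public static int acquiredPercent(int acquiredPartitions, int requestedPartitions) {
        if (requestedPartitions <= 0) {
            // Assumption for this sketch: nothing requested is reported as 0.
            return 0;
        }
        double fraction = (double) acquiredPartitions / requestedPartitions;
        return (int) (fraction * 100);
    }

    public static void main(String[] args) {
        // 2 of 4 requested partitions acquired: fraction 0.5, recorded value 50.
        System.out.println(acquiredPercent(2, 4));
        // The cast truncates rather than rounds, so 1 of 3 is recorded as 33.
        System.out.println(acquiredPercent(1, 3));
    }
}
```

   The value handed to the metric is therefore an integer between 0 and 100,
   whatever the metric is called.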



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -417,6 +452,19 @@ private void handleFetchException(
         shareFetch.maybeCompleteWithException(topicIdPartitions, throwable);
     }
 
+    /**
+     * The method updates the metric for the time taken to acquire the share partition locks. Also,
+     * it resets the acquireStartTimeMs to the current time, so that the metric records the time taken
+     * to acquire the locks for the re-try, if the partitions are re-acquired. The partitions can be
+     * re-acquired if the fetch request is not completed because of the minBytes or some other condition.
+     */
+    private void updateAcquireElapsedTimeMetric() {
+        shareGroupMetrics.recordTopicPartitionsAcquireTimeMs(shareFetch.groupId(), time.hiResClockMs() - acquireStartTimeMs);

Review Comment:
   I'd just capture the `time.hiResClockMs()` once and use it twice.
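   A minimal sketch of that suggestion follows. It assumes, per the javadoc
   above, that the method both records the elapsed time and resets
   `acquireStartTimeMs`; the rest of the method is not shown in the hunk. The
   class name, the `LongSupplier` clock, and the `recorded` list are
   hypothetical stand-ins for `DelayedShareFetch`, `time.hiResClockMs()`, and
   `ShareGroupMetrics`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Illustrative only: not the real DelayedShareFetch.
public class AcquireTimeSketch {
    private final LongSupplier clockMs;                     // stands in for time.hiResClockMs()
    private final List<Long> recorded = new ArrayList<>();  // stands in for the metrics call
    private long acquireStartTimeMs;

    public AcquireTimeSketch(LongSupplier clockMs) {
        this.clockMs = clockMs;
        this.acquireStartTimeMs = clockMs.getAsLong();
    }

    public void updateAcquireElapsedTimeMetric() {
        // Read the clock once, then use the same value for both the elapsed
        // time and the reset, so the two cannot disagree.
        long nowMs = clockMs.getAsLong();
        recorded.add(nowMs - acquireStartTimeMs);
        acquireStartTimeMs = nowMs;
    }

    public List<Long> recorded() {
        return recorded;
    }

    public long acquireStartTimeMs() {
        return acquireStartTimeMs;
    }
}
```

   Reading the clock once means the next interval starts exactly where the
   recorded one ended, and it saves one clock call per update.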



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
