apoorvmittal10 commented on code in PR #18959:
URL: https://github.com/apache/kafka/pull/18959#discussion_r1964540273


##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -79,16 +84,29 @@ public DelayedShareFetch(
             ShareFetch shareFetch,
             ReplicaManager replicaManager,
             BiConsumer<SharePartitionKey, Throwable> exceptionHandler,
-            LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions) {
-        this(shareFetch, replicaManager, exceptionHandler, sharePartitions, 
PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM));
+            LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions,
+            ShareGroupMetrics shareGroupMetrics,

Review Comment:
   Done.



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -417,6 +452,19 @@ private void handleFetchException(
         shareFetch.maybeCompleteWithException(topicIdPartitions, throwable);
     }
 
+    /**
+     * The method updates the metric for the time taken to acquire the share 
partition locks. Also,
+     * it resets the acquireStartTimeMs to the current time, so that the 
metric records the time taken
+     * to acquire the locks for the re-try, if the partitions are re-acquired. 
The partitions can be
+     * re-acquired if the fetch request is not completed because of the 
minBytes or some other condition.
+     */
+    private void updateAcquireElapsedTimeMetric() {
+        
shareGroupMetrics.recordTopicPartitionsAcquireTimeMs(shareFetch.groupId(), 
time.hiResClockMs() - acquireStartTimeMs);

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to