apoorvmittal10 commented on code in PR #19598:
URL: https://github.com/apache/kafka/pull/19598#discussion_r2074249531


##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -344,20 +349,22 @@ LinkedHashMap<TopicIdPartition, Long> acquirablePartitions(
         sharePartitionsForAcquire.forEach((topicIdPartition, sharePartition) -> {
             // Add the share partition to the list of partitions to be fetched only if we can
             // acquire the fetch lock on it.
-            if (sharePartition.maybeAcquireFetchLock()) {
+            if (sharePartition.maybeAcquireFetchLock(fetchId)) {
                 try {
+                    log.trace("Fetch lock for share partition {} has been acquired by {}", sharePartition, fetchId);

Review Comment:
   Is there any benefit to logging `sharePartition` here? The class has no
`toString()` method, so the log line would only show the default object
representation.
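
To illustrate the concern: a minimal sketch of a `toString()` override that would make the trace line readable. `SharePartitionSketch` and its two fields are hypothetical stand-ins chosen for illustration. They are not the real `SharePartition` class or its actual state.

```java
// Hypothetical stand-in for illustration only; not the real SharePartition class.
final class SharePartitionSketch {
    // Assumed identifying fields; the real class may identify itself differently.
    private final String groupId;
    private final String topicPartition;

    SharePartitionSketch(String groupId, String topicPartition) {
        this.groupId = groupId;
        this.topicPartition = topicPartition;
    }

    // Without this override, a "{}" placeholder in a log statement falls back to
    // Object.toString(), which prints the class name and an identity hash code.
    @Override
    public String toString() {
        return "SharePartition(groupId=" + groupId
            + ", topicPartition=" + topicPartition + ")";
    }
}
```

An alternative that avoids touching the class is to log only the identifying values (for example the group id and the topic partition) instead of the object itself.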



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1343,21 +1346,31 @@ public boolean maybeAcquireFetchLock() {
     }
 
     /**
-     * Release the fetch lock once the records are fetched from the leader.
+     * Release the fetch lock once the records are fetched from the leader. It is imperative that the DelayedShareFetch instance
+     * that acquired the fetch lock should be the one releasing it.
+     * @param fetchId - The DelayedShareFetch instance uuid that is trying to release the fetch lock.
      */
-    void releaseFetchLock() {
+    void releaseFetchLock(Uuid fetchId) {
         // Register the metric for the duration the fetch lock was held. Do not register the metric
         // if the fetch lock was not acquired.
-        if (fetchLock.get()) {
-            long currentTime = time.hiResClockMs();
+        long currentTime = time.hiResClockMs();

Review Comment:
   The metric is not updated in the `else` branch, even though the lock is released there anyway.
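
One possible shape for this, as a sketch only: record the hold duration whenever the lock is actually released, whether or not the caller is the owner, and track mismatched releases separately. `OwnedFetchLockSketch`, its fields, and the injected clock are hypothetical and use only JDK types. They are not the PR's implementation or Kafka's `Uuid`/`Time` classes.

```java
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;

// Hypothetical sketch of an owner-tagged fetch lock; not the code in this PR.
final class OwnedFetchLockSketch {
    // Owner id and acquisition time are stored together so they change atomically.
    private static final class Holder {
        final UUID fetchId;
        final long acquiredAtMs;

        Holder(UUID fetchId, long acquiredAtMs) {
            this.fetchId = fetchId;
            this.acquiredAtMs = acquiredAtMs;
        }
    }

    private final AtomicReference<Holder> holder = new AtomicReference<>();
    private final LongSupplier clockMs;   // injected clock, assumed for testability
    volatile long lastHeldMs = -1;        // stand-in for the hold-duration metric
    volatile int mismatchedReleases = 0;  // releases attempted by a non-owner

    OwnedFetchLockSketch(LongSupplier clockMs) {
        this.clockMs = clockMs;
    }

    // Succeeds only if nobody currently holds the lock.
    boolean maybeAcquire(UUID fetchId) {
        return holder.compareAndSet(null, new Holder(fetchId, clockMs.getAsLong()));
    }

    // Releases unconditionally, and records the duration on every path that
    // actually released a held lock, including the owner-mismatch path.
    void release(UUID fetchId) {
        Holder previous = holder.getAndSet(null);
        if (previous == null) {
            return; // lock was not held, so there is no duration to record
        }
        lastHeldMs = clockMs.getAsLong() - previous.acquiredAtMs;
        if (!previous.fetchId.equals(fetchId)) {
            mismatchedReleases++; // sketch only: not atomic under concurrent releases
        }
    }
}
```

Whether a non-owner should be allowed to release the lock at all is a separate design decision. The sketch only shows that the metric update does not need to depend on the ownership check.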



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -94,6 +95,7 @@ public class DelayedShareFetch extends DelayedOperation {
      * Metric for the rate of expired delayed fetch requests.
      */
     private final Meter expiredRequestMeter;
+    private final Uuid fetchId;

Review Comment:
   Can you please add a comment explaining how `fetchId` is used?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.