AndrewJSchofield commented on code in PR #19598:
URL: https://github.com/apache/kafka/pull/19598#discussion_r2072332551


##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1343,21 +1346,30 @@ public boolean maybeAcquireFetchLock() {
     }
 
     /**
-     * Release the fetch lock once the records are fetched from the leader.
+     * Release the fetch lock once the records are fetched from the leader. It 
is imperative that the DelayedShareFetch instance
+     * that acquired the fetch lock should be the one releasing it.
+     * @param fetchId - The DelayedShareFetch instance uuid that is trying to 
release the fetch lock.
      */
-    void releaseFetchLock() {
+    void releaseFetchLock(Uuid fetchId) {
         // Register the metric for the duration the fetch lock was held. Do 
not register the metric
         // if the fetch lock was not acquired.
-        if (fetchLock.get()) {
+        if (fetchLock.get().equals(fetchId)) {

Review Comment:
   Personally, I would prefer to see `fetchLock.compareAndSet(fetchId, null)`. 
You could calculate `acquiredDurationMs` before the lock is released, although 
you'd have to update the metrics and `fetchLockReleasedTimeMs`. Given you are 
using atomic operations, CAS seems a sensible precaution to me.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to