adixitconfluent commented on code in PR #19598:
URL: https://github.com/apache/kafka/pull/19598#discussion_r2071132002


##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1338,26 +1344,33 @@ public boolean maybeAcquireFetchLock() {
             long currentTime = time.hiResClockMs();
             fetchLockAcquiredTimeMs = currentTime;
             fetchLockIdleDurationMs = fetchLockReleasedTimeMs != 0 ? currentTime - fetchLockReleasedTimeMs : 0;
+            fetchLockAcquiredBy = fetchId;
         }
         return acquired;
     }
 
     /**
-     * Release the fetch lock once the records are fetched from the leader.
+     * Release the fetch lock once the records are fetched from the leader. It is imperative that the DelayedShareFetch instance
+     * that acquired the fetch lock should be the one releasing it.
+     * @param fetchId - The DelayedShareFetch instance uuid that is trying to release the fetch lock.
      */
-    void releaseFetchLock() {
+    void releaseFetchLock(Uuid fetchId) {
         // Register the metric for the duration the fetch lock was held. Do not register the metric
         // if the fetch lock was not acquired.
-        if (fetchLock.get()) {
+        if (fetchLock.get() && fetchId.equals(fetchLockAcquiredBy)) {
             long currentTime = time.hiResClockMs();
             long acquiredDurationMs = currentTime - fetchLockAcquiredTimeMs;
             // Update the metric for the fetch lock time.
             sharePartitionMetrics.recordFetchLockTimeMs(acquiredDurationMs);
             // Update fetch lock ratio metric.
             recordFetchLockRatioMetric(acquiredDurationMs);
             fetchLockReleasedTimeMs = currentTime;
+            fetchLock.set(false);
+        } else {
+            // This code should not be reached unless we are in error-prone scenarios.
+            log.warn("Instance {} does not hold the fetch lock, yet trying to release it for share partition {}-{}",

Review Comment:
   If we hit this situation, we don't necessarily want to force the instance that holds the fetch lock to release it, because the mismatched release call may not come from a code path belonging to that instance at all. It could just as well be that the lock has already been released and the same instance is trying to release it a second time, which would also trigger this warn log. In my latest commit I have added `TRACE` logs wherever the fetch lock is acquired and released, so that when a lock is released incorrectly we can trace the problem back to its source.



-- 
This is an automated message from the Apache Git Service.