AndrewJSchofield commented on code in PR #19598: URL: https://github.com/apache/kafka/pull/19598#discussion_r2069993467
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1338,26 +1344,33 @@ public boolean maybeAcquireFetchLock() {
             long currentTime = time.hiResClockMs();
             fetchLockAcquiredTimeMs = currentTime;
             fetchLockIdleDurationMs = fetchLockReleasedTimeMs != 0 ? currentTime - fetchLockReleasedTimeMs : 0;
+            fetchLockAcquiredBy = fetchId;

Review Comment:
   I wonder if you've considered changing the `fetchLock` from an `AtomicBoolean` to an `AtomicReference`, and thus not needing a separate `fetchLockAcquiredBy`.

##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1338,26 +1344,33 @@ public boolean maybeAcquireFetchLock() {
             long currentTime = time.hiResClockMs();
             fetchLockAcquiredTimeMs = currentTime;
             fetchLockIdleDurationMs = fetchLockReleasedTimeMs != 0 ? currentTime - fetchLockReleasedTimeMs : 0;
+            fetchLockAcquiredBy = fetchId;
         }
         return acquired;
     }
 
     /**
-     * Release the fetch lock once the records are fetched from the leader.
+     * Release the fetch lock once the records are fetched from the leader. It is imperative that the DelayedShareFetch instance
+     * that acquired the fetch lock should be the one releasing it.
+     * @param fetchId - The DelayedShareFetch instance uuid that is trying to release the fetch lock.
      */
-    void releaseFetchLock() {
+    void releaseFetchLock(Uuid fetchId) {
         // Register the metric for the duration the fetch lock was held. Do not register the metric
         // if the fetch lock was not acquired.
-        if (fetchLock.get()) {
+        if (fetchLock.get() && fetchId.equals(fetchLockAcquiredBy)) {
             long currentTime = time.hiResClockMs();
             long acquiredDurationMs = currentTime - fetchLockAcquiredTimeMs;
             // Update the metric for the fetch lock time.
             sharePartitionMetrics.recordFetchLockTimeMs(acquiredDurationMs);
             // Update fetch lock ratio metric.
             recordFetchLockRatioMetric(acquiredDurationMs);
             fetchLockReleasedTimeMs = currentTime;
+            fetchLock.set(false);
+        } else {
+            // This code should not be reached unless we are in error-prone scenarios.
+            log.warn("Instance {} does not hold the fetch lock, yet trying to release it for share partition {}-{}",

Review Comment:
   Realistically, what happens now? Do we expect the instance which holds the fetch lock to release it? Maybe that code path could also log a matching warning so we can see the other end of the problem.
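
   To make the `AtomicReference` suggestion concrete, here is a minimal standalone sketch. It is not the PR's code, only an illustration under stated assumptions: `FetchLockSketch` and `currentOwner()` are hypothetical names, `java.util.UUID` stands in for Kafka's `Uuid`, and `System.err` stands in for the class logger. A `null` reference means the lock is free, so the owner id and the locked state cannot drift apart, and the mismatch warning can name the current holder as well as the caller.

```java
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: a single AtomicReference carries both "is locked" and "who holds it".
final class FetchLockSketch {
    // null means the fetch lock is free; otherwise this is the id of the holder.
    private final AtomicReference<UUID> fetchLock = new AtomicReference<>(null);

    // Acquire succeeds only if nobody currently holds the lock.
    boolean maybeAcquireFetchLock(UUID fetchId) {
        return fetchLock.compareAndSet(null, fetchId);
    }

    // Release succeeds only if the caller is the current holder.
    boolean releaseFetchLock(UUID fetchId) {
        UUID current = fetchLock.get();
        // compareAndSet compares by reference, so check equals() first and then
        // swap out the exact instance that was read.
        if (fetchId.equals(current) && fetchLock.compareAndSet(current, null)) {
            return true;
        }
        // Stand-in for log.warn(...): report both the caller and the holder that was observed.
        System.err.printf("Instance %s does not hold the fetch lock (observed holder: %s)%n",
            fetchId, current);
        return false;
    }

    // Exposed for inspection in this sketch only.
    UUID currentOwner() {
        return fetchLock.get();
    }
}
```

   In this sketch a release attempt by a non-owner leaves the lock with its holder, which is the situation the second comment asks about: the holder is still expected to release it, and the warning shows which instance that is.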