apoorvmittal10 commented on code in PR #19598:
URL: https://github.com/apache/kafka/pull/19598#discussion_r2074262088


##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1343,21 +1346,31 @@ public boolean maybeAcquireFetchLock() {
     }
 
     /**
-     * Release the fetch lock once the records are fetched from the leader.
+     * Release the fetch lock once the records are fetched from the leader. It 
is imperative that the DelayedShareFetch instance
+     * that acquired the fetch lock should be the one releasing it.
+     * @param fetchId - The DelayedShareFetch instance uuid that is trying to 
release the fetch lock.
      */
-    void releaseFetchLock() {
+    void releaseFetchLock(Uuid fetchId) {
         // Register the metric for the duration the fetch lock was held. Do 
not register the metric
         // if the fetch lock was not acquired.
-        if (fetchLock.get()) {
-            long currentTime = time.hiResClockMs();
+        long currentTime = time.hiResClockMs();
+        if (fetchLock.compareAndSet(fetchId, null)) {

Review Comment:
   Also add a non-null check in the method for `fetchId`, 
`Objects.requireNonNull(fetchId)`.



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1327,13 +1329,14 @@ boolean canAcquireRecords() {
      * share partition is not fetched concurrently by multiple clients. The 
fetch lock is released once
      * the records are fetched and acquired.
      *
+     * @param fetchId - the DelayedShareFetch instance uuid that is trying to 
acquire the fetch lock.
      * @return A boolean which indicates whether the fetch lock is acquired.
      */
-    public boolean maybeAcquireFetchLock() {
+    public boolean maybeAcquireFetchLock(Uuid fetchId) {
         if (stateNotActive()) {
             return false;
         }
-        boolean acquired = fetchLock.compareAndSet(false, true);
+        boolean acquired = fetchLock.compareAndSet(null, fetchId);

Review Comment:
   ```suggestion
           boolean acquired = fetchLock.compareAndSet(null, 
Objects.requireNonNull(fetchId));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to