smengcl commented on code in PR #7745:
URL: https://github.com/apache/ozone/pull/7745#discussion_r1929633851


##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java:
##########
@@ -119,12 +155,136 @@ public void invalidateAll() {
 
   @Override
   public void close() {
+    closed.set(true);
     invalidateAll();
     if (this.scheduler != null) {
       this.scheduler.close();
     }
   }
 
+  /**
+   * Decreases the lock count. When the count reaches zero all new threads 
would be able to get a handle of snapshot.
+   */
+  private Runnable decrementLockCount() {
+    lockCnt -= 1;
+    if (lockCnt <= 0) {
+      LOG.warn("Invalid negative lock count : {}. Setting it to 0", lockCnt);
+      lockCnt = 0;
+    }
+
+    if (lockCnt == 0) {
+      snapshotRefThreadIds.clear();
+    }
+    return () -> {
+      readLock.lock();
+      try {
+        if (lockCnt == 0) {
+          lockReleased.signalAll();
+        }
+      } finally {
+        readLock.unlock();
+      }
+    };
+  }
+
+  /**
+   * Releases a lock on the cache.
+   */
+  public void releaseLock() {
+    writeLock.lock();
+    Runnable callback = decrementLockCount();
+    try {
+      decrementLockCount();
+    } finally {
+      writeLock.unlock();
+    }
+    callback.run();
+  }
+
+  /**
+   * Acquires lock on the cache within max amount time.
+   * @param timeout Max time to wait to acquire lock.
+   * @return true if lock is acquired otherwise false.
+   * @throws InterruptedException
+   */
+  public boolean tryAcquire(long timeout) throws InterruptedException {
+    long endTime = System.currentTimeMillis() + timeout;
+    if (timeout <= 0) {
+      endTime = Long.MAX_VALUE;
+      timeout = Long.MAX_VALUE;
+    }
+    if (writeLock.tryLock(timeout, TimeUnit.MILLISECONDS)) {
+      Runnable rollbackCallback = null;
+      try {
+        lockCnt += 1;
+        if (lockCnt == 1) {
+          snapshotRefThreadIds.clear();
+          dbMap.values().stream()
+              .flatMap(referenceCounted ->
+                  
referenceCounted.getThreadCntMap().entrySet().stream().map(entry -> 
Pair.of(entry, referenceCounted)))
+              .forEach(entry -> updateThreadCnt(entry.getKey().getKey(), 
entry.getValue().get().getSnapshotID(),
+                  entry.getKey().getValue()));
+        }
+        while (!snapshotRefThreadIds.isEmpty()) {
+          long currentTime = System.currentTimeMillis();
+          if (currentTime >= endTime) {
+            // If and release acquired lock
+            rollbackCallback = decrementLockCount();
+            break;
+          }
+          dbClosed.await(Math.min(endTime - currentTime, lockTimeout), 
TimeUnit.MILLISECONDS);
+        }
+      } finally {
+        writeLock.unlock();
+      }
+      if (rollbackCallback != null) {
+        rollbackCallback.run();
+        return false;
+      }
+      invalidateAll();
+      return true;
+    }
+    return false;
+  }
+
+  private void updateThreadCnt(long threadId, UUID key, long cnt) {
+    snapshotRefThreadIds.compute(threadId, (tid, countMap) -> {
+      if (countMap == null) {
+        if (cnt <= 0) {
+          return null;
+        }
+        countMap = new ConcurrentHashMap<>();
+      }
+      countMap.compute(key, (snapId, count) -> {
+        if (count == null) {
+          count = 0L;
+        }
+        count += cnt;
+        return count > 0 ? count : null;
+      });
+      return countMap.isEmpty() ? null : countMap;
+    });
+  }
+
+  /**
+   * Waits for lock to be released. This function doesn't wait for the lock if 
the thread already has a few snapshots
+   * open. It only waits if the thread is reading it's first snapshot.
+   * @param threadId
+   * @throws InterruptedException
+   */
+  private void waitForLock(long threadId) throws IOException {
+    if (snapshotRefThreadIds.computeIfPresent(threadId, (k, v) -> v) != null) {
+      while (lockCnt > 0) {
+        try {
+          lockReleased.await(lockTimeout, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+          throw new IOException("Error while waiting for locks to be 
released.", e);
+        }

Review Comment:
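   Does Condition `await()` throw `InterruptedException` upon timeout? I don't 
think so: `await(long, TimeUnit)` returns `false` when the waiting time elapses 
and throws `InterruptedException` only if the thread is interrupted while 
waiting. You should check the return value instead if that is the intention.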



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to