This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new f0bb382e380 remove storage location lock from segment cache mount in 
favor of duplicate reserved check (#18481)
f0bb382e380 is described below

commit f0bb382e38064fb81a91e6420f089b2206fd5c0a
Author: Clint Wylie <[email protected]>
AuthorDate: Thu Sep 4 03:52:56 2025 -0700

    remove storage location lock from segment cache mount in favor of duplicate 
reserved check (#18481)
---
 .../segment/loading/SegmentLocalCacheManager.java  | 66 +++++++++++-----------
 .../druid/segment/loading/StorageLocation.java     |  2 +-
 .../SegmentLocalCacheManagerConcurrencyTest.java   | 29 +++++++++-
 3 files changed, 60 insertions(+), 37 deletions(-)

diff --git 
a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
 
b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
index 8ed033683aa..aa7e1914ff4 100644
--- 
a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
+++ 
b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
@@ -774,18 +774,19 @@ public class SegmentLocalCacheManager implements 
SegmentCacheManager
     @Override
     public void mount(StorageLocation mountLocation) throws 
SegmentLoadingException
     {
-      final Lock lock = mountLocation.getLock().readLock();
-      lock.lock();
-      try {
+      // check to see if we should still be mounting by making sure we are 
still reserved in the location
+      // this is not done under a lock of the location, and that is ok.. we 
will check again at the end to prevent any
+      // orphaned files
+      if (!mountLocation.isReserved(this.id) && 
!mountLocation.isWeakReserved(this.id)) {
+        log.debug(
+            "aborting mount in location[%s] since entry[%s] is no longer 
reserved",
+            mountLocation.getPath(),
+            this.id
+        );
+        return;
+      }
 
-        if (!mountLocation.isReserved(this.id) && 
!mountLocation.isWeakReserved(this.id)) {
-          log.debug(
-              "aborting mount in location[%s] since entry[%s] is no longer 
reserved",
-              mountLocation.getPath(),
-              this.id
-          );
-          return;
-        }
+      try {
         synchronized (this) {
           if (location != null) {
             log.debug(
@@ -830,6 +831,18 @@ public class SegmentLocalCacheManager implements 
SegmentCacheManager
           lazyLoadCallback = SegmentLazyLoadFailCallback.NOOP;
           referenceProvider = ReferenceCountedSegmentProvider.of(segment);
         }
+
+
+        // since we do not hold a lock on the location while mounting, make 
sure that we actually are reserved and
+        // should have mounted, otherwise unmount so we don't leave any 
orphaned files
+        if (!mountLocation.isReserved(this.id) && 
!mountLocation.isWeakReserved(this.id)) {
+          log.debug(
+              "aborting mount in location[%s] since entry[%s] is no longer 
reserved",
+              mountLocation.getPath(),
+              this.id
+          );
+          unmount();
+        }
       }
       catch (SegmentLoadingException e) {
         try {
@@ -853,9 +866,6 @@ public class SegmentLocalCacheManager implements 
SegmentCacheManager
         unmount();
         throw t;
       }
-      finally {
-        lock.unlock();
-      }
     }
 
     @Override
@@ -903,30 +913,20 @@ public class SegmentLocalCacheManager implements 
SegmentCacheManager
       if (!isMounted()) {
         return;
       }
-      final Lock lock;
       synchronized (this) {
-        lock = location.getLock().readLock();
-      }
-      lock.lock();
-      try {
-        synchronized (this) {
-          final File[] children = storageDir.listFiles();
-          if (children != null) {
-            for (File child : children) {
-              try (InputStream in = Files.newInputStream(child.toPath())) {
-                IOUtils.copy(in, NullOutputStream.NULL_OUTPUT_STREAM);
-                log.info("Loaded [%s] into page cache.", 
child.getAbsolutePath());
-              }
-              catch (Exception e) {
-                log.error(e, "Failed to load [%s] into page cache", 
child.getAbsolutePath());
-              }
+        final File[] children = storageDir.listFiles();
+        if (children != null) {
+          for (File child : children) {
+            try (InputStream in = Files.newInputStream(child.toPath())) {
+              IOUtils.copy(in, NullOutputStream.NULL_OUTPUT_STREAM);
+              log.info("Loaded [%s] into page cache.", 
child.getAbsolutePath());
+            }
+            catch (Exception e) {
+              log.error(e, "Failed to load [%s] into page cache", 
child.getAbsolutePath());
             }
           }
         }
       }
-      finally {
-        lock.unlock();
-      }
     }
 
     public boolean checkExists(final File location)
diff --git 
a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java 
b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java
index f47c10e6cfe..15a9f6eb477 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java
@@ -128,7 +128,7 @@ public class StorageLocation
    * {@link #linkNewWeakEntry(WeakCacheEntry)} or {@link 
#unlinkWeakEntry(WeakCacheEntry)}, including calling
    * {@link #canHandle(CacheEntry)} which can unlink entries
    */
-  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
 
   public StorageLocation(File path, long maxSizeBytes, @Nullable Double 
freeSpacePercent)
   {
diff --git 
a/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerConcurrencyTest.java
 
b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerConcurrencyTest.java
index 4f23d2cd74f..298e9ad0a04 100644
--- 
a/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerConcurrencyTest.java
+++ 
b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerConcurrencyTest.java
@@ -195,15 +195,38 @@ class SegmentLocalCacheManagerConcurrencyTest
     final Interval interval = Intervals.of("2019-01-01/P1D");
     makeSegmentsToLoad(8, localStorageFolder, interval, segmentsToLoad);
 
-    final List<Future<?>> futures = segmentsToLoad
+    final List<Future<?>> loadFutures = segmentsToLoad
         .stream()
         .map(segment -> executorService.submit(new Load(manager, segment)))
         .collect(Collectors.toList());
 
-    for (Future<?> future : futures) {
+    for (Future<?> future : loadFutures) {
       future.get();
     }
-    Assertions.assertTrue(true);
+
+    final List<Future<Integer>> acquireFutures = segmentsToLoad
+        .stream()
+        .map(segment -> executorService.submit(new LoadCached(manager, 
segment, 50, 50)))
+        .collect(Collectors.toList());
+
+
+    int rows = 0;
+    int success = 0;
+    for (Future<Integer> future : acquireFutures) {
+      try {
+        Integer s = future.get();
+        success++;
+        if (s != null) {
+          rows += s;
+        }
+      }
+      catch (Throwable t) {
+        Assertions.fail();
+      }
+    }
+
+    Assertions.assertEquals(8, success);
+    Assertions.assertEquals(8 * 1209, rows);
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to