This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new f0bb382e380 remove storage location lock from segment cache mount in favor of duplicate reserved check (#18481)
f0bb382e380 is described below
commit f0bb382e38064fb81a91e6420f089b2206fd5c0a
Author: Clint Wylie <[email protected]>
AuthorDate: Thu Sep 4 03:52:56 2025 -0700
remove storage location lock from segment cache mount in favor of duplicate reserved check (#18481)
---
.../segment/loading/SegmentLocalCacheManager.java | 66 +++++++++++-----------
.../druid/segment/loading/StorageLocation.java | 2 +-
.../SegmentLocalCacheManagerConcurrencyTest.java | 29 +++++++++-
3 files changed, 60 insertions(+), 37 deletions(-)
diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
index 8ed033683aa..aa7e1914ff4 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
@@ -774,18 +774,19 @@ public class SegmentLocalCacheManager implements SegmentCacheManager
@Override
public void mount(StorageLocation mountLocation) throws
SegmentLoadingException
{
- final Lock lock = mountLocation.getLock().readLock();
- lock.lock();
- try {
+ // check to see if we should still be mounting by making sure we are
still reserved in the location
+ // this is not done under a lock of the location, and that is ok.. we
will check again at the end to prevent any
+ // orphaned files
+ if (!mountLocation.isReserved(this.id) &&
!mountLocation.isWeakReserved(this.id)) {
+ log.debug(
+ "aborting mount in location[%s] since entry[%s] is no longer
reserved",
+ mountLocation.getPath(),
+ this.id
+ );
+ return;
+ }
- if (!mountLocation.isReserved(this.id) &&
!mountLocation.isWeakReserved(this.id)) {
- log.debug(
- "aborting mount in location[%s] since entry[%s] is no longer
reserved",
- mountLocation.getPath(),
- this.id
- );
- return;
- }
+ try {
synchronized (this) {
if (location != null) {
log.debug(
@@ -830,6 +831,18 @@ public class SegmentLocalCacheManager implements SegmentCacheManager
lazyLoadCallback = SegmentLazyLoadFailCallback.NOOP;
referenceProvider = ReferenceCountedSegmentProvider.of(segment);
}
+
+
+ // since we do not hold a lock on the location while mounting, make
sure that we actually are reserved and
+ // should have mounted, otherwise unmount so we don't leave any
orphaned files
+ if (!mountLocation.isReserved(this.id) &&
!mountLocation.isWeakReserved(this.id)) {
+ log.debug(
+ "aborting mount in location[%s] since entry[%s] is no longer
reserved",
+ mountLocation.getPath(),
+ this.id
+ );
+ unmount();
+ }
}
catch (SegmentLoadingException e) {
try {
@@ -853,9 +866,6 @@ public class SegmentLocalCacheManager implements SegmentCacheManager
unmount();
throw t;
}
- finally {
- lock.unlock();
- }
}
@Override
@@ -903,30 +913,20 @@ public class SegmentLocalCacheManager implements SegmentCacheManager
if (!isMounted()) {
return;
}
- final Lock lock;
synchronized (this) {
- lock = location.getLock().readLock();
- }
- lock.lock();
- try {
- synchronized (this) {
- final File[] children = storageDir.listFiles();
- if (children != null) {
- for (File child : children) {
- try (InputStream in = Files.newInputStream(child.toPath())) {
- IOUtils.copy(in, NullOutputStream.NULL_OUTPUT_STREAM);
- log.info("Loaded [%s] into page cache.",
child.getAbsolutePath());
- }
- catch (Exception e) {
- log.error(e, "Failed to load [%s] into page cache",
child.getAbsolutePath());
- }
+ final File[] children = storageDir.listFiles();
+ if (children != null) {
+ for (File child : children) {
+ try (InputStream in = Files.newInputStream(child.toPath())) {
+ IOUtils.copy(in, NullOutputStream.NULL_OUTPUT_STREAM);
+ log.info("Loaded [%s] into page cache.",
child.getAbsolutePath());
+ }
+ catch (Exception e) {
+ log.error(e, "Failed to load [%s] into page cache",
child.getAbsolutePath());
}
}
}
}
- finally {
- lock.unlock();
- }
}
public boolean checkExists(final File location)
diff --git a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java
index f47c10e6cfe..15a9f6eb477 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java
@@ -128,7 +128,7 @@ public class StorageLocation
* {@link #linkNewWeakEntry(WeakCacheEntry)} or {@link
#unlinkWeakEntry(WeakCacheEntry)}, including calling
* {@link #canHandle(CacheEntry)} which can unlink entries
*/
- private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
public StorageLocation(File path, long maxSizeBytes, @Nullable Double
freeSpacePercent)
{
diff --git a/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerConcurrencyTest.java b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerConcurrencyTest.java
index 4f23d2cd74f..298e9ad0a04 100644
--- a/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerConcurrencyTest.java
+++ b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerConcurrencyTest.java
@@ -195,15 +195,38 @@ class SegmentLocalCacheManagerConcurrencyTest
final Interval interval = Intervals.of("2019-01-01/P1D");
makeSegmentsToLoad(8, localStorageFolder, interval, segmentsToLoad);
- final List<Future<?>> futures = segmentsToLoad
+ final List<Future<?>> loadFutures = segmentsToLoad
.stream()
.map(segment -> executorService.submit(new Load(manager, segment)))
.collect(Collectors.toList());
- for (Future<?> future : futures) {
+ for (Future<?> future : loadFutures) {
future.get();
}
- Assertions.assertTrue(true);
+
+ final List<Future<Integer>> acquireFutures = segmentsToLoad
+ .stream()
+ .map(segment -> executorService.submit(new LoadCached(manager,
segment, 50, 50)))
+ .collect(Collectors.toList());
+
+
+ int rows = 0;
+ int success = 0;
+ for (Future<Integer> future : acquireFutures) {
+ try {
+ Integer s = future.get();
+ success++;
+ if (s != null) {
+ rows += s;
+ }
+ }
+ catch (Throwable t) {
+ Assertions.fail();
+ }
+ }
+
+ Assertions.assertEquals(8, success);
+ Assertions.assertEquals(8 * 1209, rows);
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]