FrankChen021 commented on code in PR #19539:
URL: https://github.com/apache/druid/pull/19539#discussion_r3341105641


##########
server/src/main/java/org/apache/druid/segment/loading/external/StorageLocationVirtualStorageManager.java:
##########
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.loading.external;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.inject.Inject;
+import org.apache.druid.collections.ResourceHolder;
+import org.apache.druid.common.asyncresource.AsyncResource;
+import org.apache.druid.common.asyncresource.AsyncResources;
+import org.apache.druid.common.asyncresource.SettableAsyncResource;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.io.FilePopulator;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.segment.loading.CacheEntry;
+import org.apache.druid.segment.loading.StorageLoadingThreadPool;
+import org.apache.druid.segment.loading.StorageLocation;
+import org.apache.druid.segment.loading.StorageLocationSelectorStrategy;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.LongSupplier;
+
+/**
+ * Default implementation of {@link VirtualStorageManager} that delegates to {@link StorageLocation}.
+ * All cached files are held through weak reservations, which keeps them eligible for eviction.
+ */
+public class StorageLocationVirtualStorageManager implements 
VirtualStorageManager
+{
+  private static final EmittingLogger log = new EmittingLogger(StorageLocationVirtualStorageManager.class);
+
+  private final StorageLocationSelectorStrategy strategy;
+  private final StorageLoadingThreadPool loadingThreadPool;
+
+  /**
+   * Lock per cache identifier, used so that only one thread populates a given identifier at a time.
+   * Once a storage location has been chosen for an identifier, it is recorded on that lock so that later
+   * callers can look it up instead of reserving again.
+   */
+  private final ConcurrentHashMap<String, PopulationLock> populationLocks = new ConcurrentHashMap<>();
+
+  /**
+   * Creates the manager.
+   *
+   * @param locations         configured storage locations; only their count is used here, for the startup log
+   * @param strategy          selector strategy whose {@code getLocations()} order is tried when reserving space
+   * @param loadingThreadPool loading thread pool retained by this manager
+   */
+  @Inject
+  public StorageLocationVirtualStorageManager(
+      List<StorageLocation> locations,
+      StorageLocationSelectorStrategy strategy,
+      StorageLoadingThreadPool loadingThreadPool
+  )
+  {
+    final int locationCount = locations.size();
+    this.loadingThreadPool = loadingThreadPool;
+    this.strategy = strategy;
+    log.info("Initialized VirtualStorageManager with [%d] storage locations", locationCount);
+  }
+
+  /**
+   * Looks up an already-populated file for {@code identifier} and, if found, takes a weak reservation hold on it.
+   * This never blocks on an in-flight population.
+   *
+   * @param identifier cache identifier to look up
+   * @return the cached file, or {@code null} in any of these cases: no population lock exists for the identifier;
+   *     population has not resolved a location yet; or the resolved location did not return a hold
+   */
+  @Nullable
+  @Override
+  public CachedFile get(String identifier)
+  {
+    final PopulationLock populationLock = populationLocks.get(identifier);
+    final StorageLocation location = populationLock == null ? null : populationLock.getResolvedLocation();
+    if (location == null) {
+      // Either nothing was ever reserved for this identifier, or population is still in flight and the lock has
+      // no location yet. Either way, report the file as absent rather than waiting for it.
+      return null;
+    }
+
+    final StorageLocation.ReservationHold<CacheEntry> reservationHold =
+        location.addWeakReservationHoldIfExists(new StringCacheEntryIdentifier(identifier));
+    if (reservationHold == null) {
+      return null;
+    }
+    return new CachedFile(reservationHold, identifier);
+  }
+
+  @Override
+  public CachedFile reserveAndPopulate(
+      String identifier,
+      LongSupplier sizeSupplier,
+      FilePopulator populator
+  )
+  {
+    // Get or create lock for this identifier
+    final PopulationLock lock = populationLocks.computeIfAbsent(identifier, 
ignored -> new PopulationLock());
+
+    synchronized (lock) {
+      StringCacheEntryIdentifier cacheId = new 
StringCacheEntryIdentifier(identifier);
+
+      // If multiple threads are trying to reserve the same location, the 
first one will update the state on the lock
+      // so check that first.
+      StorageLocation resolvedLocation = lock.getResolvedLocation();
+      if (resolvedLocation == null) {
+        // Determining the size often requires an external system call, so we 
use this supplier to defer it until
+        // we absolutely need it
+        long sizeBytes = sizeSupplier.getAsLong();
+
+        // Try to reserve in each location according to strategy
+        Iterator<StorageLocation> locationIter = strategy.getLocations();
+        Throwable lastException = null;
+
+        while (locationIter.hasNext()) {
+          StorageLocation location = locationIter.next();
+
+          File locationFile = location.getPath();
+          try {
+            // Create cache entry that will call populator on mount
+            DownloadableCacheEntry entry = new DownloadableCacheEntry(cacheId, 
sizeBytes, populator, locationFile)
+            {
+              final AtomicBoolean mounted = new AtomicBoolean(false);
+
+              @Override
+              public void mount(StorageLocation location)
+              {
+                super.mount(location);
+                mounted.set(true);
+              }
+
+              @Override
+              public void unmount()
+              {
+                if (mounted.get()) {
+                  populationLocks.remove(identifier, lock);
+                }
+                super.unmount();
+              }
+            };
+
+            // Try to reserve weak space
+            if (!location.reserveWeak(entry)) {

Review Comment:
   [P1] Hold weak entries before populating them
   
   This calls reserveWeak(entry) and then populates the file before acquiring 
any ReservationHold. reserveWeak creates an unheld weak cache entry, so another 
concurrent reservation under storage pressure can reclaim/unmount it while the 
populator is still writing. If unmount runs before mountedFile is set, it also 
will not delete the just-written file, leaving untracked bytes or causing the 
first fetch to fail/fallback despite having reserved space. Please create the 
weak entry with a hold, or acquire a hold before population, and release it on 
failure.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to