capistrant commented on code in PR #19535:
URL: https://github.com/apache/druid/pull/19535#discussion_r3404916556


##########
processing/src/main/java/org/apache/druid/segment/V10TimeBoundaryInspector.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.projections.ProjectionMetadata;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+
+/**
+ * {@link TimeBoundaryInspector} for V10 segments. Reads min/max {@link 
ColumnHolder#TIME_COLUMN_NAME} values from the
+ * base projection's {@link ProjectionMetadata} (persisted at write time by 
{@link IndexMergerBase}) so the inspector
+ * can answer without reading the {@code __time} column. Falls back to the 
segment's data interval (inexact) when the
+ * writer didn't supply min/max, for older V10 segments produced before the 
field existed.
+ * <p>
+ * The exact-bounds path is independent of segment sort order: the writer 
records the actual minimum and maximum
+ * timestamps across all walked rows even when the segment is not time-sorted. 
This is a strict improvement over
+ * the eager-load {@link QueryableIndexTimeBoundaryInspector}, which reports 
inexact bounds for non-time-sorted
+ * segments because its only option is to read positions 0 and N-1 of the 
{@code __time} column.
+ */
+public final class V10TimeBoundaryInspector implements TimeBoundaryInspector
+{
+  /**
+   * Build an inspector that reads from the given base projection metadata, 
falling back to {@code fallbackInterval}
+   * when the writer didn't persist min/max times (e.g. pre-Phase-1.5 
segments).

Review Comment:
   "pre-Phase-1.5" callout feels unnecessary with no added context



##########
processing/src/main/java/org/apache/druid/segment/PartialQueryableIndexCursorFactory.java:
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import org.apache.druid.common.asyncresource.AsyncResource;
+import org.apache.druid.common.asyncresource.AsyncResources;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.query.Order;
+import org.apache.druid.query.OrderBy;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.projections.Projections;
+import org.apache.druid.segment.projections.QueryableProjection;
+import org.apache.druid.segment.vector.VectorCursor;
+import org.apache.druid.utils.CloseableUtils;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Partial-aware {@link CursorFactory} for {@link PartialQueryableIndex}.
+ * <p>
+ * <b>Sync vs async contract.</b> {@link #makeCursorHolder} requires the 
segment to already be fully downloaded,
+ * intended for callers that acquired the segment via the eager 
(download-everything-up-front) path, so by the time
+ * they ask for a cursor every internal file is on disk. If anything is 
missing it throws
+ * {@link DruidException#defensive} so that we never trigger downloads on the 
sync path, since processing threads must
+ * not block on deep-storage I/O. {@link #makeCursorHolderAsync} is the only 
path that performs downloads on demand;
+ * callers acknowledge that by opting into the async variant when they acquire 
a partial segment.
+ * <p>
+ * <b>Async download granularity.</b> Pre-fetch is column-level. {@link 
#makeCursorHolderAsync} calls
+ * {@link QueryableIndex#getColumnHolder} on each required column; the 
memoized supplier on the underlying
+ * {@link PartialQueryableIndex} eagerly invokes
+ * {@link org.apache.druid.segment.file.PartialSegmentFileMapperV10#mapFile} 
inside that call, which is what triggers
+ * the deep-storage range read. The cursor holder constructed afterward sees 
the already-materialized holders via the
+ * same memoized suppliers, so no further downloads happen at cursor-read time.
+ * <p>
+ * If a projection matches, the required columns are looked up against the 
projection's row selector and its rewritten
+ * {@link CursorBuildSpec} (which carries physical columns in the projection's 
namespace). When
+ * {@link CursorBuildSpec#getPhysicalColumns()} is {@code null}, every column 
on the chosen row selector is pre-fetched
+ * as required by the contract of {@link CursorBuildSpec}.
+ * <p>
+ * <b>Parallelism.</b> Each column's materialization is submitted as a 
separate task to the supplied download executor.
+ * The cursor holder is constructed once every column task has completed.
+ */
+public class PartialQueryableIndexCursorFactory implements CursorFactory
+{
+  private final PartialQueryableIndex index;
+  private final QueryableIndexCursorFactory delegate;
+  private final PartialBundleAcquirer bundleAcquirer;
+
+  public PartialQueryableIndexCursorFactory(
+      PartialQueryableIndex index,
+      TimeBoundaryInspector timeBoundaryInspector,
+      PartialBundleAcquirer bundleAcquirer
+  )
+  {
+    this.index = index;
+    this.delegate = new QueryableIndexCursorFactory(index, 
timeBoundaryInspector);
+    this.bundleAcquirer = bundleAcquirer;
+  }
+
+  @Override
+  public CursorHolder makeCursorHolder(CursorBuildSpec spec)
+  {
+    // refuse to download here so we never accidentally block a processing 
thread
+    if (!index.isFullyDownloaded()) {
+      throw DruidException.defensive(
+          "Sync makeCursorHolder requires the segment to be fully downloaded; 
use makeCursorHolderAsync for "
+          + "on-demand loading, or acquire the segment via the eager path so 
all files are loaded up front."
+      );
+    }
+    return delegate.makeCursorHolder(spec);
+  }
+
+  @Override
+  public AsyncCursorHolder makeCursorHolderAsync(CursorBuildSpec spec)
+  {
+    final QueryableProjection<QueryableIndex> matched = 
index.getProjection(spec);
+    final QueryableIndex rowSelector = matched != null ? 
matched.getRowSelector() : index;
+    final Set<String> requiredColumns = requiredColumns(rowSelector, matched, 
spec);
+    final String bundleName = matched != null ? matched.getName() : 
Projections.BASE_TABLE_PROJECTION_NAME;
+
+    // Mount the cache-layer bundle before submitting downloads. Released on 
the cancel/failure paths (requestRelease)
+    // or handed to the produced cursor holder on success (releaser), exactly 
once, and never while an in-flight column
+    // download is mid-mapFile(). See BundleHoldRelease.
+    final BundleHoldRelease holdRelease = new 
BundleHoldRelease(bundleAcquirer.acquire(bundleName));
+
+    try {
+      // submit one materialization task per column so a multi-threaded 
download executor can fan them out
+      final List<AsyncResource<String>> columnDownloads = new 
ArrayList<>(requiredColumns.size());
+      for (String column : requiredColumns) {
+        columnDownloads.add(submitColumnDownload(rowSelector, column, 
holdRelease));
+      }
+      final AsyncResource<List<String>> downloaded = 
AsyncResources.collect(columnDownloads);
+
+      // Canceler runs if the awaiter closes this holder before it's ready 
(e.g. query cancel/timeout). Close the
+      // collected resource to cancel every column download that hasn't begun 
its deep-storage read yet (queued tasks
+      // are skipped; tasks parked on the download executor's permit are 
interrupted out of the interruptible wait
+      // before doing any I/O), then request the hold release through the 
handshake. Once the holder is produced and
+      // handed to set(), ownership transfers to the awaiter, which drains it 
via close() (cancel) or release()
+      // (success); the once-guard keeps the hold release safe across all of 
these paths.
+      final AsyncCursorHolder asyncHolder = new AsyncCursorHolder(() -> {
+        CloseableUtils.closeAndSuppressExceptions(downloaded, ignored -> {});
+        holdRelease.requestRelease();
+      });
+      downloaded.addReadyCallback(() -> {
+        final CursorHolder holder;
+        try {
+          downloaded.get(); // surfaces any column-download failure (or 
cancellation) as the cause
+          final CursorHolder inner = 
delegate.makeCursorHolderForProjection(spec, matched);
+          // The produced holder takes ownership of both inner and the bundle 
hold; from here, closing it releases both
+          holder = wrapWithBundleRelease(inner, holdRelease.releaser());
+        }
+        catch (Throwable t) {
+          // A column download failed or was canceled, this branch can fire 
while a sibling download is still
+          // mid-mapFile(), so the hold release must go through the handshake 
rather than dropping it here directly.
+          holdRelease.requestRelease();
+          asyncHolder.setException(t);
+          return;
+        }
+        if (!asyncHolder.set(holder)) {
+          // wrapper was closed (awaiter canceled) while we were producing the 
holder; close it ourselves so the
+          // holder, its inner, and the bundle hold don't leak.
+          holder.close();
+        }
+      });
+      return asyncHolder;
+    }
+    catch (Throwable t) {
+      // Failure between acquire and wiring up the downloads (submitDownload 
shut-down rejection, etc.). Ownership of
+      // the bundle hold hasn't transferred to the holder yet, so release it 
here.
+      throw CloseableUtils.closeAndWrapInCatch(t, holdRelease.releaser());

Review Comment:
   why do we use `releaser()` here, which is documented as a "success-path" 
releaser, instead of requesting a release? also, do we need to worry about what 
is in `columnDownloads` at all at this point?



##########
server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java:
##########
@@ -536,16 +897,30 @@ private AcquireSegmentAction 
acquireExistingSegment(SegmentCacheEntryIdentifier
             location.addWeakReservationHoldIfExists(identifier)
         );
         if (hold != null) {
-          if (hold.getEntry().isMounted()) {
+          if (!(hold.getEntry() instanceof CompleteSegmentCacheEntry 
complete)) {
+            // The eager (complete) acquire path found a non-complete entry 
under this id. This only arises if
+            // partial-load on-disk state survived a toggle of 
druid.segmentCache.virtualStoragePartialDownloadsEnabled
+            // to false (getCachedSegments reserves an on-disk partial layout 
regardless of the flag). The eager path
+            // cannot serve a partial layout; surface a clear operator error 
rather than a ClassCastException.
+            throw DruidException.forPersona(DruidException.Persona.OPERATOR)

Review Comment:
   I am also interested in these ^



##########
processing/src/test/java/org/apache/druid/segment/PartialQueryableIndexCursorFactoryTest.java:
##########
@@ -0,0 +1,806 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
+import org.apache.druid.common.asyncresource.AsyncResource;
+import org.apache.druid.common.asyncresource.AsyncResources;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.ListBasedInputRow;
+import org.apache.druid.data.input.impl.AggregateProjectionSpec;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.query.Order;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.query.filter.EqualityFilter;
+import org.apache.druid.segment.column.ColumnConfig;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.file.CountingRangeReader;
+import org.apache.druid.segment.file.PartialSegmentFileMapperV10;
+import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.projections.Projections;
+import 
org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.joda.time.DateTime;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+
+class PartialQueryableIndexCursorFactoryTest extends 
InitializedNullHandlingTest
+{
+  private static final ColumnConfig COLUMN_CONFIG = ColumnConfig.DEFAULT;
+  private static final DateTime TIME = DateTimes.of("2025-01-01");
+  private static final String PROJECTION_NAME = "dim1_metric1_sum";
+
+  private static final RowSignature ROW_SIGNATURE = RowSignature.builder()
+                                                                .add("dim1", 
ColumnType.STRING)
+                                                                
.add("metric1", ColumnType.LONG)
+                                                                .build();
+
+  private static final List<AggregateProjectionSpec> PROJECTIONS = 
Collections.singletonList(
+      AggregateProjectionSpec.builder(PROJECTION_NAME)
+                             .groupingColumns(new 
StringDimensionSchema("dim1"))
+                             .aggregators(
+                                 new LongSumAggregatorFactory("_metric1_sum", 
"metric1"),
+                                 new CountAggregatorFactory("_count")
+                             )
+                             .build()
+  );
+
+  private static final List<InputRow> ROWS = Arrays.asList(
+      new ListBasedInputRow(ROW_SIGNATURE, TIME, 
ROW_SIGNATURE.getColumnNames(), Arrays.asList("a", 1L)),
+      new ListBasedInputRow(ROW_SIGNATURE, TIME.plusMinutes(1), 
ROW_SIGNATURE.getColumnNames(), Arrays.asList("a", 2L)),
+      new ListBasedInputRow(ROW_SIGNATURE, TIME.plusMinutes(2), 
ROW_SIGNATURE.getColumnNames(), Arrays.asList("b", 3L)),
+      new ListBasedInputRow(ROW_SIGNATURE, TIME.plusMinutes(3), 
ROW_SIGNATURE.getColumnNames(), Arrays.asList("b", 4L))
+  );
+
+  @TempDir
+  static File sharedTempDir;
+
+  private static File segmentDir;
+  private static ListeningExecutorService realExec;
+
+  @TempDir
+  File perTestTempDir;
+
+  @BeforeAll
+  static void buildSegment()
+  {
+    final File tmpDir = new File(sharedTempDir, "build_" + 
ThreadLocalRandom.current().nextInt());
+    segmentDir = IndexBuilder.create()
+                             .useV10()
+                             .tmpDir(tmpDir)
+                             
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
+                             .schema(
+                                 IncrementalIndexSchema.builder()
+                                                       .withDimensionsSpec(
+                                                           
DimensionsSpec.builder()
+                                                                         
.setDimensions(
+                                                                             
List.of(
+                                                                               
  new StringDimensionSchema("dim1"),
+                                                                               
  new LongDimensionSchema("metric1")
+                                                                             )
+                                                                         )
+                                                                         
.build()
+                                                       )
+                                                       .withRollup(false)
+                                                       
.withMinTimestamp(TIME.getMillis())
+                                                       
.withProjections(PROJECTIONS)
+                                                       .build()
+                             )
+                             
.indexSpec(IndexSpec.builder().withMetadataCompression(CompressionStrategy.NONE).build())
+                             .rows(ROWS)
+                             .buildMMappedIndexFile();
+
+    realExec = 
MoreExecutors.listeningDecorator(Execs.singleThreaded("partial-cursor-test-%d"));
+  }
+
+  @AfterAll
+  static void teardownExec()
+  {
+    realExec.shutdownNow();
+  }
+
+  private record IndexAndMapper(PartialQueryableIndex index, 
PartialSegmentFileMapperV10 mapper)
+      implements AutoCloseable
+  {
+    @Override
+    public void close()
+    {
+      mapper.close();
+    }
+  }
+
+  private IndexAndMapper openIndex(CountingRangeReader rangeReader, String 
cacheName) throws IOException
+  {
+    final File cacheDir = new File(perTestTempDir, cacheName);
+    FileUtils.mkdirp(cacheDir);
+    final PartialSegmentFileMapperV10 mapper = 
PartialSegmentFileMapperV10.create(
+        rangeReader,
+        TestHelper.makeJsonMapper(),
+        cacheDir,
+        IndexIO.V10_FILE_NAME,
+        Collections.emptyList()
+    );
+    return new IndexAndMapper(
+        new PartialQueryableIndex(mapper.getSegmentFileMetadata(), mapper, 
COLUMN_CONFIG),
+        mapper
+    );
+  }
+
+  private static ListeningExecutorService directExec()
+  {
+    return 
MoreExecutors.listeningDecorator(MoreExecutors.newDirectExecutorService());
+  }
+
+  private static PartialBundleAcquirer noOpAcquirer(ListeningExecutorService 
downloadExec)
+  {
+    return new PartialBundleAcquirer()
+    {
+      @Override
+      public Closeable acquire(String bundleName)
+      {
+        return () -> {};
+      }
+
+      @Override
+      public <T> AsyncResource<T> submitDownload(Callable<T> task)
+      {
+        return AsyncResources.fromFutureUnmanaged(downloadExec.submit(task));
+      }
+    };
+  }
+
+  @Test
+  void testSyncMakeCursorHolderThrowsWhenSegmentNotFullyDownloaded() throws 
IOException
+  {
+    final CountingRangeReader rangeReader = new 
CountingRangeReader(segmentDir);
+    try (IndexAndMapper opened = openIndex(rangeReader, "sync_throws")) {
+      final PartialQueryableIndex index = opened.index();
+      Assertions.assertFalse(index.isFullyDownloaded());
+      final PartialQueryableIndexCursorFactory factory = new 
PartialQueryableIndexCursorFactory(
+          index,
+          QueryableIndexTimeBoundaryInspector.create(index),
+          noOpAcquirer(directExec())
+      );
+
+      // Fresh mapper has only the header on disk; no internal files have been 
downloaded. The sync path must
+      // refuse rather than trigger downloads on the calling thread.
+      Assertions.assertThrows(
+          DruidException.class,
+          () -> factory.makeCursorHolder(CursorBuildSpec.FULL_SCAN)
+      );
+    }
+  }
+
+  @Test
+  void testSyncMakeCursorHolderSucceedsAfterFullDownload() throws Exception
+  {
+    final CountingRangeReader rangeReader = new 
CountingRangeReader(segmentDir);
+    try (IndexAndMapper opened = openIndex(rangeReader, "sync_after_async")) {
+      final PartialQueryableIndex index = opened.index();
+      final PartialSegmentFileMapperV10 mapper = opened.mapper();
+      final PartialQueryableIndexCursorFactory factory = new 
PartialQueryableIndexCursorFactory(
+          index,
+          QueryableIndexTimeBoundaryInspector.create(index),
+          noOpAcquirer(directExec())
+      );
+
+      // Eagerly materialize the entire segment via the file mapper to 
simulate the eager-acquire path.
+      
mapper.ensureFilesAvailable(mapper.getSegmentFileMetadata().getFiles().keySet());
+      Assertions.assertTrue(mapper.isFullyDownloaded(), "test precondition: 
every internal file should be on disk");
+
+      try (CursorHolder holder = 
factory.makeCursorHolder(CursorBuildSpec.FULL_SCAN)) {
+        Assertions.assertNotNull(holder);
+      }
+    }
+  }
+
+  @Test
+  void testAsyncMakeCursorHolderDefersDownloadUntilExecutorRuns() throws 
Exception
+  {
+    final CountingRangeReader rangeReader = new 
CountingRangeReader(segmentDir);
+    final CountDownLatch gate = new CountDownLatch(1);
+    final ExecutorService rawExec = 
Execs.singleThreaded("partial-cursor-defer-%d");
+    final ListeningExecutorService gatedExec = 
MoreExecutors.listeningDecorator(rawExec);
+    try (IndexAndMapper opened = openIndex(rangeReader, "async_defer")) {
+      final PartialQueryableIndex index = opened.index();
+      final PartialQueryableIndexCursorFactory factory = new 
PartialQueryableIndexCursorFactory(
+          index,
+          QueryableIndexTimeBoundaryInspector.create(index),
+          noOpAcquirer(gatedExec)
+      );
+      rangeReader.resetCount();
+
+      // Hold the executor with a pre-queued blocker so the actual download 
task can't start.
+      // assign the blocker future to an (intentionally unused) local so 
errorprone's CheckReturnValue is satisfied
+      @SuppressWarnings("unused")
+      ListenableFuture<?> unused = gatedExec.submit(() -> {
+        try {
+          gate.await();
+        }
+        catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      });
+
+      final AsyncCursorHolder asyncHolder = 
factory.makeCursorHolderAsync(CursorBuildSpec.FULL_SCAN);
+      Assertions.assertFalse(
+          asyncHolder.isReady(),
+          "async holder must not be ready while download executor is still 
blocked"
+      );
+      Assertions.assertEquals(0, rangeReader.getReadCount(), "no download 
should have started yet");
+
+      final CountDownLatch ready = new CountDownLatch(1);
+      asyncHolder.addReadyCallback(ready::countDown);
+
+      // Release the gate; download should now run.
+      gate.countDown();
+      Assertions.assertTrue(ready.await(5, TimeUnit.SECONDS), "ready callback 
must fire after executor processes task");
+      Assertions.assertTrue(asyncHolder.isReady());
+      Assertions.assertTrue(rangeReader.getReadCount() > 0, "download must 
have happened by the time holder is ready");
+
+      try (CursorHolder holder = asyncHolder.release()) {
+        Assertions.assertNotNull(holder);
+      }
+    }
+    finally {
+      gatedExec.shutdownNow();
+      rawExec.shutdownNow();
+    }
+  }
+
+  @Test
+  void testMatchedProjectionDownloadsOnlyRequestedColumns() throws IOException
+  {
+    final CountingRangeReader rangeReader = new 
CountingRangeReader(segmentDir);
+    try (IndexAndMapper opened = openIndex(rangeReader, "proj_columns")) {
+      final PartialQueryableIndex index = opened.index();
+      final PartialSegmentFileMapperV10 mapper = opened.mapper();
+      final PartialQueryableIndexCursorFactory factory = new 
PartialQueryableIndexCursorFactory(
+          index,
+          QueryableIndexTimeBoundaryInspector.create(index),
+          noOpAcquirer(directExec())
+      );
+
+      // Query asks for group-by dim1 + sum(metric1), but NOT count. The 
projection has dim1 / _metric1_sum / _count;
+      // the projection match rewrites this to physical columns {dim1, 
_metric1_sum}. We should see those download,
+      // but NOT the _count file; proves column-level pruning, not just 
projection-level.
+      final CursorBuildSpec aggSpec = CursorBuildSpec.builder()
+                                                     
.setGroupingColumns(List.of("dim1"))
+                                                     .setAggregators(List.of(
+                                                         new 
LongSumAggregatorFactory("_metric1_sum", "metric1")
+                                                     ))
+                                                     
.setPhysicalColumns(Set.of("dim1", "metric1"))
+                                                     .build();
+
+      // Async path triggers the column-level pre-download. directExec makes 
the future complete synchronously.
+      try (AsyncCursorHolder asyncHolder = 
factory.makeCursorHolderAsync(aggSpec);
+           CursorHolder holder = asyncHolder.release()) {
+        Assertions.assertNotNull(holder);
+        final String projPrefix = PROJECTION_NAME + "/";
+        final String basePrefix = Projections.BASE_TABLE_PROJECTION_NAME + "/";
+        final Set<String> downloaded = mapper.getDownloadedFiles();
+
+        // Requested projection columns materialized.
+        Assertions.assertTrue(downloaded.contains(projPrefix + "dim1"), 
"expected projection dim1; got: " + downloaded);
+        Assertions.assertTrue(
+            downloaded.contains(projPrefix + "_metric1_sum"),
+            "expected projection _metric1_sum; got: " + downloaded
+        );
+        // The projection's _count file was NOT in the query; must not be 
downloaded
+        Assertions.assertFalse(
+            downloaded.contains(projPrefix + "_count"),
+            "expected projection _count NOT to be downloaded; got: " + 
downloaded
+        );
+        // Base __time and base metric1 are NOT touched: projection dim1 may 
pull base dim1 as its parent column
+        // (legitimate dependency), but unrelated base columns must stay 
untouched.
+        Assertions.assertFalse(
+            downloaded.contains(basePrefix + ColumnHolder.TIME_COLUMN_NAME),
+            "expected base __time NOT to be downloaded; got: " + downloaded
+        );
+        Assertions.assertFalse(
+            downloaded.contains(basePrefix + "metric1"),
+            "expected base metric1 NOT to be downloaded; got: " + downloaded
+        );
+      }
+    }
+  }
+
+  @Test
+  void testOpeningTimeOrderedProjectionCursorTriggersNoDownload() throws 
IOException
+  {
+    // A projection grouped by [__gran(HOUR), dim1] is time-ordered, so 
QueryableIndexCursorHolder reads the
+    // projection's (downloadable) time column when it builds the cursor 
(interval-checking offset + the projection's
+    // own time-boundary inspector). A GROUP BY dim1 query matches the 
projection by re-aggregating over __gran and
+    // does NOT list __time in physicalColumns. requiredColumns must 
nonetheless pre-fetch __time so that read happens
+    // during the async pre-fetch rather than as a lazy download when the 
cursor is opened on a processing thread.
+    final String projectionName = "hourly_dim1_metric1_sum";
+    final File timeOrderedSegmentDir = 
buildTimeOrderedProjectionSegment(projectionName);
+
+    final CountingRangeReader rangeReader = new 
CountingRangeReader(timeOrderedSegmentDir);
+    final File cacheDir = new File(perTestTempDir, "time_ordered_proj");
+    FileUtils.mkdirp(cacheDir);
+    final PartialSegmentFileMapperV10 mapper = 
PartialSegmentFileMapperV10.create(
+        rangeReader,
+        TestHelper.makeJsonMapper(),
+        cacheDir,
+        IndexIO.V10_FILE_NAME,
+        Collections.emptyList()
+    );
+    try {
+      final PartialQueryableIndex index =
+          new PartialQueryableIndex(mapper.getSegmentFileMetadata(), mapper, 
COLUMN_CONFIG);
+
+      // Precondition: the matched projection's row selector really is 
time-ordered, so cursor build WOULD read its
+      // time column. Without this, the assertion below would be vacuously 
satisfied.
+      final QueryableIndex projectionIndex = 
index.getProjectionQueryableIndex(projectionName);
+      Assertions.assertNotNull(projectionIndex, "test setup: projection should 
be present");
+      Assertions.assertEquals(
+          Order.ASCENDING,
+          Cursors.getTimeOrdering(projectionIndex.getOrdering()),
+          "test setup: projection must be time-ordered for this test to guard 
anything"
+      );
+
+      final PartialQueryableIndexCursorFactory factory = new 
PartialQueryableIndexCursorFactory(
+          index,
+          
V10TimeBoundaryInspector.forBaseProjection(index.getBaseProjectionMetadata(), 
index.getDataInterval()),
+          noOpAcquirer(directExec())
+      );
+
+      // ETERNITY interval so the offset is not clipped to a residual interval 
(a genuine reason to read __time);
+      // here the only __time read is the time-ordered offset/inspector during 
cursor build, which the pre-fetch
+      // must have already covered. physicalColumns deliberately omits __time.
+      final CursorBuildSpec aggSpec = CursorBuildSpec.builder()
+                                                     
.setInterval(Intervals.ETERNITY)
+                                                     
.setGroupingColumns(List.of("dim1"))
+                                                     .setAggregators(List.of(
+                                                         new 
LongSumAggregatorFactory("_metric1_sum", "metric1")
+                                                     ))
+                                                     
.setPhysicalColumns(Set.of("dim1", "metric1"))
+                                                     .build();
+
+      try (AsyncCursorHolder asyncHolder = 
factory.makeCursorHolderAsync(aggSpec);
+           CursorHolder holder = asyncHolder.release()) {
+        Assertions.assertNotNull(holder);
+
+        // After the async pre-fetch resolves, the projection's required 
columns AND __time are on disk, even though
+        // __time was not declared in physicalColumns.
+        final String projPrefix = projectionName + "/";
+        final Set<String> beforeOpen = mapper.getDownloadedFiles();
+        Assertions.assertTrue(
+            beforeOpen.contains(projPrefix + "_metric1_sum"),
+            "test setup: projection path should have run and pre-fetched 
_metric1_sum; got: " + beforeOpen
+        );
+        // The projection's time column is stored under its granularity 
grouping name (__gran); it is exposed as
+        // __time but the on-disk smoosh file (and thus the downloaded-files 
entry) is __gran.
+        Assertions.assertTrue(
+            beforeOpen.contains(projPrefix + "__gran"),
+            "requiredColumns must pre-fetch the projection's time column for a 
time-ordered cursor; got: " + beforeOpen
+        );
+
+        // Opening the cursor reads __time (interval-checking offset + the 
projection's time-boundary inspector). It
+        // was pre-fetched above, so this must not trigger any further (lazy) 
download on the processing thread.
+        Assertions.assertNotNull(holder.asCursor());
+
+        final Set<String> afterOpen = mapper.getDownloadedFiles();
+        Assertions.assertEquals(
+            beforeOpen,
+            afterOpen,
+            "opening a projection-matched cursor must not trigger a lazy 
download; newly downloaded: "
+            + Sets.difference(afterOpen, beforeOpen)
+        );
+      }
+    }
+    finally {
+      mapper.close();
+    }
+  }
+
+  @Test
+  void testOpeningTimeOrderedBaseCursorTriggersNoDownload() throws IOException
+  {
+    // The base table is time-ordered, so QueryableIndexCursorHolder reads 
__time when building the cursor (for its
+    // interval-checking offset) even though a raw scan does not list __time 
in physicalColumns. requiredColumns must
+    // pre-fetch __time so that read does not become a lazy download on the 
processing thread at cursor-open time.
+    final CountingRangeReader rangeReader = new 
CountingRangeReader(segmentDir);
+    try (IndexAndMapper opened = openIndex(rangeReader, 
"base_open_no_download")) {
+      final PartialQueryableIndex index = opened.index();
+      final PartialSegmentFileMapperV10 mapper = opened.mapper();
+      Assertions.assertEquals(
+          Order.ASCENDING,
+          Cursors.getTimeOrdering(index.getOrdering()),
+          "test setup: base table must be time-ordered"
+      );
+
+      final PartialQueryableIndexCursorFactory factory = new 
PartialQueryableIndexCursorFactory(
+          index,
+          
V10TimeBoundaryInspector.forBaseProjection(index.getBaseProjectionMetadata(), 
index.getDataInterval()),
+          noOpAcquirer(directExec())
+      );
+
+      // Raw scan (no grouping/aggregators) so the aggregate projection is not 
matched and we exercise the base path.
+      // __time is deliberately omitted from physicalColumns.
+      final CursorBuildSpec scanSpec = CursorBuildSpec.builder()
+                                                      
.setInterval(Intervals.ETERNITY)
+                                                      
.setPhysicalColumns(Set.of("dim1"))
+                                                      .build();
+
+      try (AsyncCursorHolder asyncHolder = 
factory.makeCursorHolderAsync(scanSpec);
+           CursorHolder holder = asyncHolder.release()) {
+        Assertions.assertNotNull(holder);
+
+        final String basePrefix = Projections.BASE_TABLE_PROJECTION_NAME + "/";
+        final Set<String> beforeOpen = mapper.getDownloadedFiles();
+        // base path ran (dim1 pre-fetched) and __time was pre-fetched despite 
not being in physicalColumns
+        Assertions.assertTrue(
+            beforeOpen.contains(basePrefix + "dim1"),
+            "test setup: base path should have pre-fetched dim1; got: " + 
beforeOpen
+        );
+        Assertions.assertTrue(
+            beforeOpen.contains(basePrefix + ColumnHolder.TIME_COLUMN_NAME),
+            "requiredColumns must pre-fetch __time for a time-ordered base 
cursor; got: " + beforeOpen
+        );
+
+        Assertions.assertNotNull(holder.asCursor());
+
+        final Set<String> afterOpen = mapper.getDownloadedFiles();
+        Assertions.assertEquals(
+            beforeOpen,
+            afterOpen,
+            "opening a time-ordered base cursor must not trigger a lazy 
download; newly downloaded: "
+            + Sets.difference(afterOpen, beforeOpen)
+        );
+      }
+    }
+  }
+
+  @Test
+  void testFilteredWrapperOverPartialUsesAsyncPath() throws IOException
+  {
+    // A FilteredCursorFactory wrapping a not-fully-downloaded partial cursor 
factory must route makeCursorHolderAsync
+    // through the delegate's async (download-on-demand) path. If 
FilteredCursorFactory fell back to the CursorFactory
+    // default makeCursorHolderAsync (which calls sync makeCursorHolder), the 
partial delegate would throw because the
+    // segment isn't fully downloaded.
+    final CountingRangeReader rangeReader = new 
CountingRangeReader(segmentDir);
+    try (IndexAndMapper opened = openIndex(rangeReader, 
"filtered_wrapper_async")) {
+      final PartialQueryableIndex index = opened.index();
+      Assertions.assertFalse(index.isFullyDownloaded(), "test setup: segment 
must not be fully downloaded");
+
+      final PartialQueryableIndexCursorFactory partial = new 
PartialQueryableIndexCursorFactory(
+          index,
+          
V10TimeBoundaryInspector.forBaseProjection(index.getBaseProjectionMetadata(), 
index.getDataInterval()),
+          noOpAcquirer(directExec())
+      );
+      final CursorFactory filtered = new FilteredCursorFactory(
+          partial,
+          new EqualityFilter("dim1", ColumnType.STRING, "a", null)
+      );
+
+      // directExec resolves the async download inline; release() returns the 
built holder. asCursor() exercises the
+      // cursor over the on-demand-loaded columns. Would throw here if the 
wrapper used the sync default.
+      try (AsyncCursorHolder asyncHolder = 
filtered.makeCursorHolderAsync(CursorBuildSpec.FULL_SCAN);
+           CursorHolder holder = asyncHolder.release()) {
+        Assertions.assertNotNull(holder);
+        Assertions.assertNotNull(holder.asCursor(), "async path should build a 
usable cursor without a sync download");
+      }
+    }
+  }
+
+  private File buildTimeOrderedProjectionSegment(String projectionName)

Review Comment:
   nit: private helper method interleaved within tests



##########
processing/src/test/java/org/apache/druid/segment/PartialQueryableIndexCursorFactoryTest.java:
##########
@@ -0,0 +1,806 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
+import org.apache.druid.common.asyncresource.AsyncResource;
+import org.apache.druid.common.asyncresource.AsyncResources;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.ListBasedInputRow;
+import org.apache.druid.data.input.impl.AggregateProjectionSpec;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.query.Order;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.query.filter.EqualityFilter;
+import org.apache.druid.segment.column.ColumnConfig;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.file.CountingRangeReader;
+import org.apache.druid.segment.file.PartialSegmentFileMapperV10;
+import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.projections.Projections;
+import 
org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.joda.time.DateTime;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+
+class PartialQueryableIndexCursorFactoryTest extends 
InitializedNullHandlingTest
+{
+  private static final ColumnConfig COLUMN_CONFIG = ColumnConfig.DEFAULT;
+  private static final DateTime TIME = DateTimes.of("2025-01-01");
+  private static final String PROJECTION_NAME = "dim1_metric1_sum";
+
+  private static final RowSignature ROW_SIGNATURE = RowSignature.builder()
+                                                                .add("dim1", 
ColumnType.STRING)
+                                                                
.add("metric1", ColumnType.LONG)
+                                                                .build();
+
+  private static final List<AggregateProjectionSpec> PROJECTIONS = 
Collections.singletonList(
+      AggregateProjectionSpec.builder(PROJECTION_NAME)
+                             .groupingColumns(new 
StringDimensionSchema("dim1"))
+                             .aggregators(
+                                 new LongSumAggregatorFactory("_metric1_sum", 
"metric1"),
+                                 new CountAggregatorFactory("_count")
+                             )
+                             .build()
+  );
+
+  private static final List<InputRow> ROWS = Arrays.asList(
+      new ListBasedInputRow(ROW_SIGNATURE, TIME, 
ROW_SIGNATURE.getColumnNames(), Arrays.asList("a", 1L)),
+      new ListBasedInputRow(ROW_SIGNATURE, TIME.plusMinutes(1), 
ROW_SIGNATURE.getColumnNames(), Arrays.asList("a", 2L)),
+      new ListBasedInputRow(ROW_SIGNATURE, TIME.plusMinutes(2), 
ROW_SIGNATURE.getColumnNames(), Arrays.asList("b", 3L)),
+      new ListBasedInputRow(ROW_SIGNATURE, TIME.plusMinutes(3), 
ROW_SIGNATURE.getColumnNames(), Arrays.asList("b", 4L))
+  );
+
+  @TempDir
+  static File sharedTempDir;
+
+  private static File segmentDir;
+  private static ListeningExecutorService realExec;
+
+  @TempDir
+  File perTestTempDir;
+
+  @BeforeAll
+  static void buildSegment()
+  {
+    final File tmpDir = new File(sharedTempDir, "build_" + 
ThreadLocalRandom.current().nextInt());
+    segmentDir = IndexBuilder.create()
+                             .useV10()
+                             .tmpDir(tmpDir)
+                             
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
+                             .schema(
+                                 IncrementalIndexSchema.builder()
+                                                       .withDimensionsSpec(
+                                                           
DimensionsSpec.builder()
+                                                                         
.setDimensions(
+                                                                             
List.of(
+                                                                               
  new StringDimensionSchema("dim1"),
+                                                                               
  new LongDimensionSchema("metric1")
+                                                                             )
+                                                                         )
+                                                                         
.build()
+                                                       )
+                                                       .withRollup(false)
+                                                       
.withMinTimestamp(TIME.getMillis())
+                                                       
.withProjections(PROJECTIONS)
+                                                       .build()
+                             )
+                             
.indexSpec(IndexSpec.builder().withMetadataCompression(CompressionStrategy.NONE).build())
+                             .rows(ROWS)
+                             .buildMMappedIndexFile();
+
+    realExec = 
MoreExecutors.listeningDecorator(Execs.singleThreaded("partial-cursor-test-%d"));
+  }
+
+  @AfterAll
+  static void teardownExec()
+  {
+    realExec.shutdownNow();
+  }
+
+  private record IndexAndMapper(PartialQueryableIndex index, 
PartialSegmentFileMapperV10 mapper)
+      implements AutoCloseable
+  {
+    @Override
+    public void close()
+    {
+      mapper.close();
+    }
+  }
+
+  private IndexAndMapper openIndex(CountingRangeReader rangeReader, String 
cacheName) throws IOException
+  {
+    final File cacheDir = new File(perTestTempDir, cacheName);
+    FileUtils.mkdirp(cacheDir);
+    final PartialSegmentFileMapperV10 mapper = 
PartialSegmentFileMapperV10.create(
+        rangeReader,
+        TestHelper.makeJsonMapper(),
+        cacheDir,
+        IndexIO.V10_FILE_NAME,
+        Collections.emptyList()
+    );
+    return new IndexAndMapper(
+        new PartialQueryableIndex(mapper.getSegmentFileMetadata(), mapper, 
COLUMN_CONFIG),
+        mapper
+    );
+  }
+
+  private static ListeningExecutorService directExec()
+  {
+    return 
MoreExecutors.listeningDecorator(MoreExecutors.newDirectExecutorService());
+  }
+
+  private static PartialBundleAcquirer noOpAcquirer(ListeningExecutorService 
downloadExec)
+  {
+    return new PartialBundleAcquirer()
+    {
+      @Override
+      public Closeable acquire(String bundleName)
+      {
+        return () -> {};
+      }
+
+      @Override
+      public <T> AsyncResource<T> submitDownload(Callable<T> task)
+      {
+        return AsyncResources.fromFutureUnmanaged(downloadExec.submit(task));
+      }
+    };
+  }
+
+  @Test
+  void testSyncMakeCursorHolderThrowsWhenSegmentNotFullyDownloaded() throws 
IOException
+  {
+    final CountingRangeReader rangeReader = new 
CountingRangeReader(segmentDir);
+    try (IndexAndMapper opened = openIndex(rangeReader, "sync_throws")) {
+      final PartialQueryableIndex index = opened.index();
+      Assertions.assertFalse(index.isFullyDownloaded());
+      final PartialQueryableIndexCursorFactory factory = new 
PartialQueryableIndexCursorFactory(
+          index,
+          QueryableIndexTimeBoundaryInspector.create(index),
+          noOpAcquirer(directExec())
+      );
+
+      // Fresh mapper has only the header on disk; no internal files have been 
downloaded. The sync path must
+      // refuse rather than trigger downloads on the calling thread.
+      Assertions.assertThrows(
+          DruidException.class,
+          () -> factory.makeCursorHolder(CursorBuildSpec.FULL_SCAN)
+      );
+    }
+  }
+
+  @Test
+  void testSyncMakeCursorHolderSucceedsAfterFullDownload() throws Exception
+  {
+    final CountingRangeReader rangeReader = new 
CountingRangeReader(segmentDir);
+    try (IndexAndMapper opened = openIndex(rangeReader, "sync_after_async")) {
+      final PartialQueryableIndex index = opened.index();
+      final PartialSegmentFileMapperV10 mapper = opened.mapper();
+      final PartialQueryableIndexCursorFactory factory = new 
PartialQueryableIndexCursorFactory(
+          index,
+          QueryableIndexTimeBoundaryInspector.create(index),
+          noOpAcquirer(directExec())
+      );
+
+      // Eagerly materialize the entire segment via the file mapper to 
simulate the eager-acquire path.
+      
mapper.ensureFilesAvailable(mapper.getSegmentFileMetadata().getFiles().keySet());
+      Assertions.assertTrue(mapper.isFullyDownloaded(), "test precondition: 
every internal file should be on disk");
+
+      try (CursorHolder holder = 
factory.makeCursorHolder(CursorBuildSpec.FULL_SCAN)) {
+        Assertions.assertNotNull(holder);
+      }
+    }
+  }
+
+  @Test
+  void testAsyncMakeCursorHolderDefersDownloadUntilExecutorRuns() throws 
Exception
+  {
+    final CountingRangeReader rangeReader = new 
CountingRangeReader(segmentDir);
+    final CountDownLatch gate = new CountDownLatch(1);
+    final ExecutorService rawExec = 
Execs.singleThreaded("partial-cursor-defer-%d");
+    final ListeningExecutorService gatedExec = 
MoreExecutors.listeningDecorator(rawExec);
+    try (IndexAndMapper opened = openIndex(rangeReader, "async_defer")) {
+      final PartialQueryableIndex index = opened.index();
+      final PartialQueryableIndexCursorFactory factory = new 
PartialQueryableIndexCursorFactory(
+          index,
+          QueryableIndexTimeBoundaryInspector.create(index),
+          noOpAcquirer(gatedExec)
+      );
+      rangeReader.resetCount();
+
+      // Hold the executor with a pre-queued blocker so the actual download 
task can't start.
+      // assign the blocker future to an (intentionally unused) local so 
errorprone's CheckReturnValue is satisfied
+      @SuppressWarnings("unused")
+      ListenableFuture<?> unused = gatedExec.submit(() -> {
+        try {
+          gate.await();
+        }
+        catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      });
+
+      final AsyncCursorHolder asyncHolder = 
factory.makeCursorHolderAsync(CursorBuildSpec.FULL_SCAN);
+      Assertions.assertFalse(
+          asyncHolder.isReady(),
+          "async holder must not be ready while download executor is still 
blocked"
+      );
+      Assertions.assertEquals(0, rangeReader.getReadCount(), "no download 
should have started yet");
+
+      final CountDownLatch ready = new CountDownLatch(1);
+      asyncHolder.addReadyCallback(ready::countDown);
+
+      // Release the gate; download should now run.
+      gate.countDown();
+      Assertions.assertTrue(ready.await(5, TimeUnit.SECONDS), "ready callback 
must fire after executor processes task");
+      Assertions.assertTrue(asyncHolder.isReady());
+      Assertions.assertTrue(rangeReader.getReadCount() > 0, "download must 
have happened by the time holder is ready");
+
+      try (CursorHolder holder = asyncHolder.release()) {
+        Assertions.assertNotNull(holder);
+      }
+    }
+    finally {
+      gatedExec.shutdownNow();
+      rawExec.shutdownNow();
+    }
+  }
+
+  @Test
+  void testMatchedProjectionDownloadsOnlyRequestedColumns() throws IOException
+  {
+    final CountingRangeReader rangeReader = new 
CountingRangeReader(segmentDir);
+    try (IndexAndMapper opened = openIndex(rangeReader, "proj_columns")) {
+      final PartialQueryableIndex index = opened.index();
+      final PartialSegmentFileMapperV10 mapper = opened.mapper();
+      final PartialQueryableIndexCursorFactory factory = new 
PartialQueryableIndexCursorFactory(
+          index,
+          QueryableIndexTimeBoundaryInspector.create(index),
+          noOpAcquirer(directExec())
+      );
+
+      // Query asks for group-by dim1 + sum(metric1), but NOT count. The 
projection has dim1 / _metric1_sum / _count;
+      // the projection match rewrites this to physical columns {dim1, 
_metric1_sum}. We should see those download,
+      // but NOT the _count file; proves column-level pruning, not just 
projection-level.
+      final CursorBuildSpec aggSpec = CursorBuildSpec.builder()
+                                                     
.setGroupingColumns(List.of("dim1"))
+                                                     .setAggregators(List.of(
+                                                         new 
LongSumAggregatorFactory("_metric1_sum", "metric1")
+                                                     ))
+                                                     
.setPhysicalColumns(Set.of("dim1", "metric1"))
+                                                     .build();
+
+      // Async path triggers the column-level pre-download. directExec makes 
the future complete synchronously.
+      try (AsyncCursorHolder asyncHolder = 
factory.makeCursorHolderAsync(aggSpec);
+           CursorHolder holder = asyncHolder.release()) {
+        Assertions.assertNotNull(holder);
+        final String projPrefix = PROJECTION_NAME + "/";
+        final String basePrefix = Projections.BASE_TABLE_PROJECTION_NAME + "/";
+        final Set<String> downloaded = mapper.getDownloadedFiles();
+
+        // Requested projection columns materialized.
+        Assertions.assertTrue(downloaded.contains(projPrefix + "dim1"), 
"expected projection dim1; got: " + downloaded);
+        Assertions.assertTrue(
+            downloaded.contains(projPrefix + "_metric1_sum"),
+            "expected projection _metric1_sum; got: " + downloaded
+        );
+        // The projection's _count file was NOT in the query; must not be 
downloaded
+        Assertions.assertFalse(
+            downloaded.contains(projPrefix + "_count"),
+            "expected projection _count NOT to be downloaded; got: " + 
downloaded
+        );
+        // Base __time and base metric1 are NOT touched: projection dim1 may 
pull base dim1 as its parent column
+        // (legitimate dependency), but unrelated base columns must stay 
untouched.
+        Assertions.assertFalse(
+            downloaded.contains(basePrefix + ColumnHolder.TIME_COLUMN_NAME),
+            "expected base __time NOT to be downloaded; got: " + downloaded
+        );
+        Assertions.assertFalse(
+            downloaded.contains(basePrefix + "metric1"),
+            "expected base metric1 NOT to be downloaded; got: " + downloaded
+        );
+      }
+    }
+  }
+
+  @Test
+  void testOpeningTimeOrderedProjectionCursorTriggersNoDownload() throws 
IOException
+  {
+    // A projection grouped by [__gran(HOUR), dim1] is time-ordered, so 
QueryableIndexCursorHolder reads the
+    // projection's (downloadable) time column when it builds the cursor 
(interval-checking offset + the projection's
+    // own time-boundary inspector). A GROUP BY dim1 query matches the 
projection by re-aggregating over __gran and
+    // does NOT list __time in physicalColumns. requiredColumns must 
nonetheless pre-fetch __time so that read happens
+    // during the async pre-fetch rather than as a lazy download when the 
cursor is opened on a processing thread.
+    final String projectionName = "hourly_dim1_metric1_sum";
+    final File timeOrderedSegmentDir = 
buildTimeOrderedProjectionSegment(projectionName);
+
+    final CountingRangeReader rangeReader = new 
CountingRangeReader(timeOrderedSegmentDir);
+    final File cacheDir = new File(perTestTempDir, "time_ordered_proj");
+    FileUtils.mkdirp(cacheDir);
+    final PartialSegmentFileMapperV10 mapper = 
PartialSegmentFileMapperV10.create(
+        rangeReader,
+        TestHelper.makeJsonMapper(),
+        cacheDir,
+        IndexIO.V10_FILE_NAME,
+        Collections.emptyList()
+    );
+    try {
+      final PartialQueryableIndex index =
+          new PartialQueryableIndex(mapper.getSegmentFileMetadata(), mapper, 
COLUMN_CONFIG);
+
+      // Precondition: the matched projection's row selector really is 
time-ordered, so cursor build WOULD read its
+      // time column. Without this, the assertion below would be vacuously 
satisfied.
+      final QueryableIndex projectionIndex = 
index.getProjectionQueryableIndex(projectionName);
+      Assertions.assertNotNull(projectionIndex, "test setup: projection should 
be present");
+      Assertions.assertEquals(
+          Order.ASCENDING,
+          Cursors.getTimeOrdering(projectionIndex.getOrdering()),
+          "test setup: projection must be time-ordered for this test to guard 
anything"
+      );
+
+      final PartialQueryableIndexCursorFactory factory = new 
PartialQueryableIndexCursorFactory(
+          index,
+          
V10TimeBoundaryInspector.forBaseProjection(index.getBaseProjectionMetadata(), 
index.getDataInterval()),
+          noOpAcquirer(directExec())
+      );
+
+      // ETERNITY interval so the offset is not clipped to a residual interval 
(a genuine reason to read __time);
+      // here the only __time read is the time-ordered offset/inspector during 
cursor build, which the pre-fetch
+      // must have already covered. physicalColumns deliberately omits __time.
+      final CursorBuildSpec aggSpec = CursorBuildSpec.builder()
+                                                     
.setInterval(Intervals.ETERNITY)
+                                                     
.setGroupingColumns(List.of("dim1"))
+                                                     .setAggregators(List.of(
+                                                         new 
LongSumAggregatorFactory("_metric1_sum", "metric1")
+                                                     ))
+                                                     
.setPhysicalColumns(Set.of("dim1", "metric1"))
+                                                     .build();
+
+      try (AsyncCursorHolder asyncHolder = 
factory.makeCursorHolderAsync(aggSpec);
+           CursorHolder holder = asyncHolder.release()) {
+        Assertions.assertNotNull(holder);
+
+        // After the async pre-fetch resolves, the projection's required 
columns AND __time are on disk, even though
+        // __time was not declared in physicalColumns.
+        final String projPrefix = projectionName + "/";
+        final Set<String> beforeOpen = mapper.getDownloadedFiles();
+        Assertions.assertTrue(
+            beforeOpen.contains(projPrefix + "_metric1_sum"),
+            "test setup: projection path should have run and pre-fetched 
_metric1_sum; got: " + beforeOpen
+        );
+        // The projection's time column is stored under its granularity 
grouping name (__gran); it is exposed as
+        // __time but the on-disk smoosh file (and thus the downloaded-files 
entry) is __gran.
+        Assertions.assertTrue(
+            beforeOpen.contains(projPrefix + "__gran"),
+            "requiredColumns must pre-fetch the projection's time column for a 
time-ordered cursor; got: " + beforeOpen
+        );
+
+        // Opening the cursor reads __time (interval-checking offset + the 
projection's time-boundary inspector). It
+        // was pre-fetched above, so this must not trigger any further (lazy) 
download on the processing thread.
+        Assertions.assertNotNull(holder.asCursor());
+
+        final Set<String> afterOpen = mapper.getDownloadedFiles();
+        Assertions.assertEquals(
+            beforeOpen,
+            afterOpen,
+            "opening a projection-matched cursor must not trigger a lazy 
download; newly downloaded: "
+            + Sets.difference(afterOpen, beforeOpen)
+        );
+      }
+    }
+    finally {
+      mapper.close();
+    }
+  }
+
+  @Test
+  void testOpeningTimeOrderedBaseCursorTriggersNoDownload() throws IOException
+  {
+    // The base table is time-ordered, so QueryableIndexCursorHolder reads 
__time when building the cursor (for its
+    // interval-checking offset) even though a raw scan does not list __time 
in physicalColumns. requiredColumns must
+    // pre-fetch __time so that read does not become a lazy download on the 
processing thread at cursor-open time.
+    final CountingRangeReader rangeReader = new 
CountingRangeReader(segmentDir);
+    try (IndexAndMapper opened = openIndex(rangeReader, 
"base_open_no_download")) {
+      final PartialQueryableIndex index = opened.index();
+      final PartialSegmentFileMapperV10 mapper = opened.mapper();
+      Assertions.assertEquals(
+          Order.ASCENDING,
+          Cursors.getTimeOrdering(index.getOrdering()),
+          "test setup: base table must be time-ordered"
+      );
+
+      final PartialQueryableIndexCursorFactory factory = new 
PartialQueryableIndexCursorFactory(
+          index,
+          
V10TimeBoundaryInspector.forBaseProjection(index.getBaseProjectionMetadata(), 
index.getDataInterval()),
+          noOpAcquirer(directExec())
+      );
+
+      // Raw scan (no grouping/aggregators) so the aggregate projection is not 
matched and we exercise the base path.
+      // __time is deliberately omitted from physicalColumns.
+      final CursorBuildSpec scanSpec = CursorBuildSpec.builder()
+                                                      
.setInterval(Intervals.ETERNITY)
+                                                      
.setPhysicalColumns(Set.of("dim1"))
+                                                      .build();
+
+      try (AsyncCursorHolder asyncHolder = 
factory.makeCursorHolderAsync(scanSpec);
+           CursorHolder holder = asyncHolder.release()) {
+        Assertions.assertNotNull(holder);
+
+        final String basePrefix = Projections.BASE_TABLE_PROJECTION_NAME + "/";
+        final Set<String> beforeOpen = mapper.getDownloadedFiles();
+        // base path ran (dim1 pre-fetched) and __time was pre-fetched despite 
not being in physicalColumns
+        Assertions.assertTrue(
+            beforeOpen.contains(basePrefix + "dim1"),
+            "test setup: base path should have pre-fetched dim1; got: " + 
beforeOpen
+        );
+        Assertions.assertTrue(
+            beforeOpen.contains(basePrefix + ColumnHolder.TIME_COLUMN_NAME),
+            "requiredColumns must pre-fetch __time for a time-ordered base 
cursor; got: " + beforeOpen
+        );
+
+        Assertions.assertNotNull(holder.asCursor());
+
+        final Set<String> afterOpen = mapper.getDownloadedFiles();
+        Assertions.assertEquals(
+            beforeOpen,
+            afterOpen,
+            "opening a time-ordered base cursor must not trigger a lazy 
download; newly downloaded: "
+            + Sets.difference(afterOpen, beforeOpen)
+        );
+      }
+    }
+  }
+
+  @Test
+  void testFilteredWrapperOverPartialUsesAsyncPath() throws IOException
+  {
+    // A FilteredCursorFactory wrapping a not-fully-downloaded partial cursor 
factory must route makeCursorHolderAsync
+    // through the delegate's async (download-on-demand) path. If 
FilteredCursorFactory fell back to the CursorFactory
+    // default makeCursorHolderAsync (which calls sync makeCursorHolder), the 
partial delegate would throw because the
+    // segment isn't fully downloaded.
+    final CountingRangeReader rangeReader = new 
CountingRangeReader(segmentDir);
+    try (IndexAndMapper opened = openIndex(rangeReader, 
"filtered_wrapper_async")) {
+      final PartialQueryableIndex index = opened.index();
+      Assertions.assertFalse(index.isFullyDownloaded(), "test setup: segment 
must not be fully downloaded");
+
+      final PartialQueryableIndexCursorFactory partial = new 
PartialQueryableIndexCursorFactory(
+          index,
+          
V10TimeBoundaryInspector.forBaseProjection(index.getBaseProjectionMetadata(), 
index.getDataInterval()),
+          noOpAcquirer(directExec())
+      );
+      final CursorFactory filtered = new FilteredCursorFactory(
+          partial,
+          new EqualityFilter("dim1", ColumnType.STRING, "a", null)
+      );
+
+      // directExec resolves the async download inline; release() returns the 
built holder. asCursor() exercises the
+      // cursor over the on-demand-loaded columns. Would throw here if the 
wrapper used the sync default.
+      try (AsyncCursorHolder asyncHolder = 
filtered.makeCursorHolderAsync(CursorBuildSpec.FULL_SCAN);
+           CursorHolder holder = asyncHolder.release()) {
+        Assertions.assertNotNull(holder);
+        Assertions.assertNotNull(holder.asCursor(), "async path should build a 
usable cursor without a sync download");
+      }
+    }
+  }
+
+  private File buildTimeOrderedProjectionSegment(String projectionName)
+  {
+    final List<AggregateProjectionSpec> projections = 
Collections.singletonList(
+        AggregateProjectionSpec.builder(projectionName)
+                               
.virtualColumns(Granularities.toVirtualColumn(Granularities.HOUR, "__gran"))
+                               .groupingColumns(
+                                   new LongDimensionSchema("__gran"),
+                                   new StringDimensionSchema("dim1")
+                               )
+                               .aggregators(
+                                   new 
LongSumAggregatorFactory("_metric1_sum", "metric1"),
+                                   new CountAggregatorFactory("_count")
+                               )
+                               .build()
+    );
+    final File tmpDir = new File(perTestTempDir, "build_time_ordered_" + 
ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE));
+    return IndexBuilder.create()
+                       .useV10()
+                       .tmpDir(tmpDir)
+                       
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
+                       .schema(
+                           IncrementalIndexSchema.builder()
+                                                 .withDimensionsSpec(
+                                                     DimensionsSpec.builder()
+                                                                   
.setDimensions(
+                                                                       List.of(
+                                                                           new 
StringDimensionSchema("dim1"),
+                                                                           new 
LongDimensionSchema("metric1")
+                                                                       )
+                                                                   )
+                                                                   .build()
+                                                 )
+                                                 .withRollup(false)
+                                                 
.withMinTimestamp(TIME.getMillis())
+                                                 .withProjections(projections)
+                                                 .build()
+                       )
+                       
.indexSpec(IndexSpec.builder().withMetadataCompression(CompressionStrategy.NONE).build())
+                       .rows(ROWS)
+                       .buildMMappedIndexFile();
+  }
+
+  @Test
+  void testRowSignatureAndCapabilitiesDelegatedToUnderlyingFactory() throws 
IOException
+  {
+    final CountingRangeReader rangeReader = new 
CountingRangeReader(segmentDir);
+    try (IndexAndMapper opened = openIndex(rangeReader, "delegate")) {
+      final PartialQueryableIndex index = opened.index();
+      final PartialQueryableIndexCursorFactory factory = new 
PartialQueryableIndexCursorFactory(
+          index,
+          QueryableIndexTimeBoundaryInspector.create(index),
+          noOpAcquirer(directExec())
+      );
+
+      // delegating to QueryableIndexCursorFactory which returns the index's 
row signature
+      final RowSignature signature = factory.getRowSignature();
+      Assertions.assertNotNull(signature);
+      Assertions.assertTrue(signature.size() > 0);
+
+      Assertions.assertNotNull(factory.getColumnCapabilities("dim1"));
+    }
+  }
+
+  @Test
+  void testAsyncCloseBeforeDownloadCompletesClosesCursorHolderOnce() throws 
Exception
+  {
+    // Producer-doesn't-leak invariant: if the wrapper is closed before the 
producer's set(holder) fires, the producer
+    // is responsible for closing the holder. Use the real (single-threaded) 
executor with a gate that pauses the
+    // download task before it builds the cursor, then close the wrapper, then 
release the gate.
+    final CountingRangeReader rangeReader = new 
CountingRangeReader(segmentDir);
+    final AtomicReference<RuntimeException> bgErr = new AtomicReference<>();

Review Comment:
   bgErr doesn't actually seem to be wired into the test at all. it is just 
created and used in get assertion below.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to