capistrant commented on code in PR #19535: URL: https://github.com/apache/druid/pull/19535#discussion_r3404916556
########## processing/src/main/java/org/apache/druid/segment/V10TimeBoundaryInspector.java: ########## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.projections.ProjectionMetadata; +import org.joda.time.DateTime; +import org.joda.time.Interval; + +import javax.annotation.Nullable; + +/** + * {@link TimeBoundaryInspector} for V10 segments. Reads min/max {@link ColumnHolder#TIME_COLUMN_NAME} values from the + * base projection's {@link ProjectionMetadata} (persisted at write time by {@link IndexMergerBase}) so the inspector + * can answer without reading the {@code __time} column. Falls back to the segment's data interval (inexact) when the + * writer didn't supply min/max, for older V10 segments produced before the field existed. + * <p> + * The exact-bounds path is independent of segment sort order: the writer records the actual minimum and maximum + * timestamps across all walked rows even when the segment is not time-sorted. 
This is a strict improvement over + * the eager-load {@link QueryableIndexTimeBoundaryInspector}, which reports inexact bounds for non-time-sorted + * segments because its only option is to read positions 0 and N-1 of the {@code __time} column. + */ +public final class V10TimeBoundaryInspector implements TimeBoundaryInspector +{ + /** + * Build an inspector that reads from the given base projection metadata, falling back to {@code fallbackInterval} + * when the writer didn't persist min/max times (e.g. pre-Phase-1.5 segments). Review Comment: The "pre-Phase-1.5" callout feels unnecessary without any context explaining what "Phase 1.5" refers to. ########## processing/src/main/java/org/apache/druid/segment/PartialQueryableIndexCursorFactory.java: ########## @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.druid.segment; + +import org.apache.druid.common.asyncresource.AsyncResource; +import org.apache.druid.common.asyncresource.AsyncResources; +import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.query.Order; +import org.apache.druid.query.OrderBy; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.projections.Projections; +import org.apache.druid.segment.projections.QueryableProjection; +import org.apache.druid.segment.vector.VectorCursor; +import org.apache.druid.utils.CloseableUtils; + +import javax.annotation.Nullable; +import java.io.Closeable; +import java.util.ArrayList; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Partial-aware {@link CursorFactory} for {@link PartialQueryableIndex}. + * <p> + * <b>Sync vs async contract.</b> {@link #makeCursorHolder} requires the segment to already be fully downloaded, + * intended for callers that acquired the segment via the eager (download-everything-up-front) path, so by the time + * they ask for a cursor every internal file is on disk. If anything is missing it throws + * {@link DruidException#defensive} so that we never trigger downloads on the sync path, since processing threads must + * not block on deep-storage I/O. {@link #makeCursorHolderAsync} is the only path that performs downloads on demand; + * callers acknowledge that by opting into the async variant when they acquire a partial segment. + * <p> + * <b>Async download granularity.</b> Pre-fetch is column-level. 
{@link #makeCursorHolderAsync} calls + * {@link QueryableIndex#getColumnHolder} on each required column; the memoized supplier on the underlying + * {@link PartialQueryableIndex} eagerly invokes + * {@link org.apache.druid.segment.file.PartialSegmentFileMapperV10#mapFile} inside that call, which is what triggers + * the deep-storage range read. The cursor holder constructed afterward sees the already-materialized holders via the + * same memoized suppliers, so no further downloads happen at cursor-read time. + * <p> + * If a projection matches, the required columns are looked up against the projection's row selector and its rewritten + * {@link CursorBuildSpec} (which carries physical columns in the projection's namespace). When + * {@link CursorBuildSpec#getPhysicalColumns()} is {@code null}, every column on the chosen row selector is pre-fetched + * as required by the contract of {@link CursorBuildSpec}. + * <p> + * <b>Parallelism.</b> Each column's materialization is submitted as a separate task to the supplied download executor. + * The cursor holder is constructed once every column task has completed. 
+ */ +public class PartialQueryableIndexCursorFactory implements CursorFactory +{ + private final PartialQueryableIndex index; + private final QueryableIndexCursorFactory delegate; + private final PartialBundleAcquirer bundleAcquirer; + + public PartialQueryableIndexCursorFactory( + PartialQueryableIndex index, + TimeBoundaryInspector timeBoundaryInspector, + PartialBundleAcquirer bundleAcquirer + ) + { + this.index = index; + this.delegate = new QueryableIndexCursorFactory(index, timeBoundaryInspector); + this.bundleAcquirer = bundleAcquirer; + } + + @Override + public CursorHolder makeCursorHolder(CursorBuildSpec spec) + { + // refuse to download here so we never accidentally block a processing thread + if (!index.isFullyDownloaded()) { + throw DruidException.defensive( + "Sync makeCursorHolder requires the segment to be fully downloaded; use makeCursorHolderAsync for " + + "on-demand loading, or acquire the segment via the eager path so all files are loaded up front." + ); + } + return delegate.makeCursorHolder(spec); + } + + @Override + public AsyncCursorHolder makeCursorHolderAsync(CursorBuildSpec spec) + { + final QueryableProjection<QueryableIndex> matched = index.getProjection(spec); + final QueryableIndex rowSelector = matched != null ? matched.getRowSelector() : index; + final Set<String> requiredColumns = requiredColumns(rowSelector, matched, spec); + final String bundleName = matched != null ? matched.getName() : Projections.BASE_TABLE_PROJECTION_NAME; + + // Mount the cache-layer bundle before submitting downloads. Released on the cancel/failure paths (requestRelease) + // or handed to the produced cursor holder on success (releaser), exactly once, and never while an in-flight column + // download is mid-mapFile(). See BundleHoldRelease. 
+ final BundleHoldRelease holdRelease = new BundleHoldRelease(bundleAcquirer.acquire(bundleName)); + + try { + // submit one materialization task per column so a multi-threaded download executor can fan them out + final List<AsyncResource<String>> columnDownloads = new ArrayList<>(requiredColumns.size()); + for (String column : requiredColumns) { + columnDownloads.add(submitColumnDownload(rowSelector, column, holdRelease)); + } + final AsyncResource<List<String>> downloaded = AsyncResources.collect(columnDownloads); + + // Canceler runs if the awaiter closes this holder before it's ready (e.g. query cancel/timeout). Close the + // collected resource to cancel every column download that hasn't begun its deep-storage read yet (queued tasks + // are skipped; tasks parked on the download executor's permit are interrupted out of the interruptible wait + // before doing any I/O), then request the hold release through the handshake. Once the holder is produced and + // handed to set(), ownership transfers to the awaiter, which drains it via close() (cancel) or release() + // (success); the once-guard keeps the hold release safe across all of these paths. 
+ final AsyncCursorHolder asyncHolder = new AsyncCursorHolder(() -> { + CloseableUtils.closeAndSuppressExceptions(downloaded, ignored -> {}); + holdRelease.requestRelease(); + }); + downloaded.addReadyCallback(() -> { + final CursorHolder holder; + try { + downloaded.get(); // surfaces any column-download failure (or cancellation) as the cause + final CursorHolder inner = delegate.makeCursorHolderForProjection(spec, matched); + // The produced holder takes ownership of both inner and the bundle hold; from here, closing it releases both + holder = wrapWithBundleRelease(inner, holdRelease.releaser()); + } + catch (Throwable t) { + // A column download failed or was canceled, this branch can fire while a sibling download is still + // mid-mapFile(), so the hold release must go through the handshake rather than dropping it here directly. + holdRelease.requestRelease(); + asyncHolder.setException(t); + return; + } + if (!asyncHolder.set(holder)) { + // wrapper was closed (awaiter canceled) while we were producing the holder; close it ourselves so the + // holder, its inner, and the bundle hold don't leak. + holder.close(); + } + }); + return asyncHolder; + } + catch (Throwable t) { + // Failure between acquire and wiring up the downloads (submitDownload shut-down rejection, etc.). Ownership of + // the bundle hold hasn't transferred to the holder yet, so release it here. + throw CloseableUtils.closeAndWrapInCatch(t, holdRelease.releaser()); Review Comment: Why do we use `releaser()` here, which is documented as a "success-path" releaser, instead of `requestRelease()`? Also, do we need to worry about the tasks already in `columnDownloads` at this point, i.e. downloads submitted before the failure that may still be queued or running?
########## server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java: ########## @@ -536,16 +897,30 @@ private AcquireSegmentAction acquireExistingSegment(SegmentCacheEntryIdentifier location.addWeakReservationHoldIfExists(identifier) ); if (hold != null) { - if (hold.getEntry().isMounted()) { + if (!(hold.getEntry() instanceof CompleteSegmentCacheEntry complete)) { + // The eager (complete) acquire path found a non-complete entry under this id. This only arises if + // partial-load on-disk state survived a toggle of druid.segmentCache.virtualStoragePartialDownloadsEnabled + // to false (getCachedSegments reserves an on-disk partial layout regardless of the flag). The eager path + // cannot serve a partial layout; surface a clear operator error rather than a ClassCastException. + throw DruidException.forPersona(DruidException.Persona.OPERATOR) Review Comment: I am also interested in these ^ ########## processing/src/test/java/org/apache/druid/segment/PartialQueryableIndexCursorFactoryTest.java: ########## @@ -0,0 +1,806 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment; + +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; +import org.apache.druid.common.asyncresource.AsyncResource; +import org.apache.druid.common.asyncresource.AsyncResources; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.ListBasedInputRow; +import org.apache.druid.data.input.impl.AggregateProjectionSpec; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.LongDimensionSchema; +import org.apache.druid.data.input.impl.StringDimensionSchema; +import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.FileUtils; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.query.Order; +import org.apache.druid.query.aggregation.CountAggregatorFactory; +import org.apache.druid.query.aggregation.LongSumAggregatorFactory; +import org.apache.druid.query.filter.EqualityFilter; +import org.apache.druid.segment.column.ColumnConfig; +import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.data.CompressionStrategy; +import org.apache.druid.segment.file.CountingRangeReader; +import org.apache.druid.segment.file.PartialSegmentFileMapperV10; +import org.apache.druid.segment.incremental.IncrementalIndexSchema; +import org.apache.druid.segment.projections.Projections; +import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; +import 
org.apache.druid.testing.InitializedNullHandlingTest; +import org.joda.time.DateTime; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; + +class PartialQueryableIndexCursorFactoryTest extends InitializedNullHandlingTest +{ + private static final ColumnConfig COLUMN_CONFIG = ColumnConfig.DEFAULT; + private static final DateTime TIME = DateTimes.of("2025-01-01"); + private static final String PROJECTION_NAME = "dim1_metric1_sum"; + + private static final RowSignature ROW_SIGNATURE = RowSignature.builder() + .add("dim1", ColumnType.STRING) + .add("metric1", ColumnType.LONG) + .build(); + + private static final List<AggregateProjectionSpec> PROJECTIONS = Collections.singletonList( + AggregateProjectionSpec.builder(PROJECTION_NAME) + .groupingColumns(new StringDimensionSchema("dim1")) + .aggregators( + new LongSumAggregatorFactory("_metric1_sum", "metric1"), + new CountAggregatorFactory("_count") + ) + .build() + ); + + private static final List<InputRow> ROWS = Arrays.asList( + new ListBasedInputRow(ROW_SIGNATURE, TIME, ROW_SIGNATURE.getColumnNames(), Arrays.asList("a", 1L)), + new ListBasedInputRow(ROW_SIGNATURE, TIME.plusMinutes(1), ROW_SIGNATURE.getColumnNames(), 
Arrays.asList("a", 2L)), + new ListBasedInputRow(ROW_SIGNATURE, TIME.plusMinutes(2), ROW_SIGNATURE.getColumnNames(), Arrays.asList("b", 3L)), + new ListBasedInputRow(ROW_SIGNATURE, TIME.plusMinutes(3), ROW_SIGNATURE.getColumnNames(), Arrays.asList("b", 4L)) + ); + + @TempDir + static File sharedTempDir; + + private static File segmentDir; + private static ListeningExecutorService realExec; + + @TempDir + File perTestTempDir; + + @BeforeAll + static void buildSegment() + { + final File tmpDir = new File(sharedTempDir, "build_" + ThreadLocalRandom.current().nextInt()); + segmentDir = IndexBuilder.create() + .useV10() + .tmpDir(tmpDir) + .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) + .schema( + IncrementalIndexSchema.builder() + .withDimensionsSpec( + DimensionsSpec.builder() + .setDimensions( + List.of( + new StringDimensionSchema("dim1"), + new LongDimensionSchema("metric1") + ) + ) + .build() + ) + .withRollup(false) + .withMinTimestamp(TIME.getMillis()) + .withProjections(PROJECTIONS) + .build() + ) + .indexSpec(IndexSpec.builder().withMetadataCompression(CompressionStrategy.NONE).build()) + .rows(ROWS) + .buildMMappedIndexFile(); + + realExec = MoreExecutors.listeningDecorator(Execs.singleThreaded("partial-cursor-test-%d")); + } + + @AfterAll + static void teardownExec() + { + realExec.shutdownNow(); + } + + private record IndexAndMapper(PartialQueryableIndex index, PartialSegmentFileMapperV10 mapper) + implements AutoCloseable + { + @Override + public void close() + { + mapper.close(); + } + } + + private IndexAndMapper openIndex(CountingRangeReader rangeReader, String cacheName) throws IOException + { + final File cacheDir = new File(perTestTempDir, cacheName); + FileUtils.mkdirp(cacheDir); + final PartialSegmentFileMapperV10 mapper = PartialSegmentFileMapperV10.create( + rangeReader, + TestHelper.makeJsonMapper(), + cacheDir, + IndexIO.V10_FILE_NAME, + Collections.emptyList() + ); + return new IndexAndMapper( + new 
PartialQueryableIndex(mapper.getSegmentFileMetadata(), mapper, COLUMN_CONFIG), + mapper + ); + } + + private static ListeningExecutorService directExec() + { + return MoreExecutors.listeningDecorator(MoreExecutors.newDirectExecutorService()); + } + + private static PartialBundleAcquirer noOpAcquirer(ListeningExecutorService downloadExec) + { + return new PartialBundleAcquirer() + { + @Override + public Closeable acquire(String bundleName) + { + return () -> {}; + } + + @Override + public <T> AsyncResource<T> submitDownload(Callable<T> task) + { + return AsyncResources.fromFutureUnmanaged(downloadExec.submit(task)); + } + }; + } + + @Test + void testSyncMakeCursorHolderThrowsWhenSegmentNotFullyDownloaded() throws IOException + { + final CountingRangeReader rangeReader = new CountingRangeReader(segmentDir); + try (IndexAndMapper opened = openIndex(rangeReader, "sync_throws")) { + final PartialQueryableIndex index = opened.index(); + Assertions.assertFalse(index.isFullyDownloaded()); + final PartialQueryableIndexCursorFactory factory = new PartialQueryableIndexCursorFactory( + index, + QueryableIndexTimeBoundaryInspector.create(index), + noOpAcquirer(directExec()) + ); + + // Fresh mapper has only the header on disk; no internal files have been downloaded. The sync path must + // refuse rather than trigger downloads on the calling thread. 
+ Assertions.assertThrows( + DruidException.class, + () -> factory.makeCursorHolder(CursorBuildSpec.FULL_SCAN) + ); + } + } + + @Test + void testSyncMakeCursorHolderSucceedsAfterFullDownload() throws Exception + { + final CountingRangeReader rangeReader = new CountingRangeReader(segmentDir); + try (IndexAndMapper opened = openIndex(rangeReader, "sync_after_async")) { + final PartialQueryableIndex index = opened.index(); + final PartialSegmentFileMapperV10 mapper = opened.mapper(); + final PartialQueryableIndexCursorFactory factory = new PartialQueryableIndexCursorFactory( + index, + QueryableIndexTimeBoundaryInspector.create(index), + noOpAcquirer(directExec()) + ); + + // Eagerly materialize the entire segment via the file mapper to simulate the eager-acquire path. + mapper.ensureFilesAvailable(mapper.getSegmentFileMetadata().getFiles().keySet()); + Assertions.assertTrue(mapper.isFullyDownloaded(), "test precondition: every internal file should be on disk"); + + try (CursorHolder holder = factory.makeCursorHolder(CursorBuildSpec.FULL_SCAN)) { + Assertions.assertNotNull(holder); + } + } + } + + @Test + void testAsyncMakeCursorHolderDefersDownloadUntilExecutorRuns() throws Exception + { + final CountingRangeReader rangeReader = new CountingRangeReader(segmentDir); + final CountDownLatch gate = new CountDownLatch(1); + final ExecutorService rawExec = Execs.singleThreaded("partial-cursor-defer-%d"); + final ListeningExecutorService gatedExec = MoreExecutors.listeningDecorator(rawExec); + try (IndexAndMapper opened = openIndex(rangeReader, "async_defer")) { + final PartialQueryableIndex index = opened.index(); + final PartialQueryableIndexCursorFactory factory = new PartialQueryableIndexCursorFactory( + index, + QueryableIndexTimeBoundaryInspector.create(index), + noOpAcquirer(gatedExec) + ); + rangeReader.resetCount(); + + // Hold the executor with a pre-queued blocker so the actual download task can't start. 
+ // assign the blocker future to an (intentionally unused) local so errorprone's CheckReturnValue is satisfied + @SuppressWarnings("unused") + ListenableFuture<?> unused = gatedExec.submit(() -> { + try { + gate.await(); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + + final AsyncCursorHolder asyncHolder = factory.makeCursorHolderAsync(CursorBuildSpec.FULL_SCAN); + Assertions.assertFalse( + asyncHolder.isReady(), + "async holder must not be ready while download executor is still blocked" + ); + Assertions.assertEquals(0, rangeReader.getReadCount(), "no download should have started yet"); + + final CountDownLatch ready = new CountDownLatch(1); + asyncHolder.addReadyCallback(ready::countDown); + + // Release the gate; download should now run. + gate.countDown(); + Assertions.assertTrue(ready.await(5, TimeUnit.SECONDS), "ready callback must fire after executor processes task"); + Assertions.assertTrue(asyncHolder.isReady()); + Assertions.assertTrue(rangeReader.getReadCount() > 0, "download must have happened by the time holder is ready"); + + try (CursorHolder holder = asyncHolder.release()) { + Assertions.assertNotNull(holder); + } + } + finally { + gatedExec.shutdownNow(); + rawExec.shutdownNow(); + } + } + + @Test + void testMatchedProjectionDownloadsOnlyRequestedColumns() throws IOException + { + final CountingRangeReader rangeReader = new CountingRangeReader(segmentDir); + try (IndexAndMapper opened = openIndex(rangeReader, "proj_columns")) { + final PartialQueryableIndex index = opened.index(); + final PartialSegmentFileMapperV10 mapper = opened.mapper(); + final PartialQueryableIndexCursorFactory factory = new PartialQueryableIndexCursorFactory( + index, + QueryableIndexTimeBoundaryInspector.create(index), + noOpAcquirer(directExec()) + ); + + // Query asks for group-by dim1 + sum(metric1), but NOT count. 
The projection has dim1 / _metric1_sum / _count; + // the projection match rewrites this to physical columns {dim1, _metric1_sum}. We should see those download, + // but NOT the _count file; proves column-level pruning, not just projection-level. + final CursorBuildSpec aggSpec = CursorBuildSpec.builder() + .setGroupingColumns(List.of("dim1")) + .setAggregators(List.of( + new LongSumAggregatorFactory("_metric1_sum", "metric1") + )) + .setPhysicalColumns(Set.of("dim1", "metric1")) + .build(); + + // Async path triggers the column-level pre-download. directExec makes the future complete synchronously. + try (AsyncCursorHolder asyncHolder = factory.makeCursorHolderAsync(aggSpec); + CursorHolder holder = asyncHolder.release()) { + Assertions.assertNotNull(holder); + final String projPrefix = PROJECTION_NAME + "/"; + final String basePrefix = Projections.BASE_TABLE_PROJECTION_NAME + "/"; + final Set<String> downloaded = mapper.getDownloadedFiles(); + + // Requested projection columns materialized. + Assertions.assertTrue(downloaded.contains(projPrefix + "dim1"), "expected projection dim1; got: " + downloaded); + Assertions.assertTrue( + downloaded.contains(projPrefix + "_metric1_sum"), + "expected projection _metric1_sum; got: " + downloaded + ); + // The projection's _count file was NOT in the query; must not be downloaded + Assertions.assertFalse( + downloaded.contains(projPrefix + "_count"), + "expected projection _count NOT to be downloaded; got: " + downloaded + ); + // Base __time and base metric1 are NOT touched: projection dim1 may pull base dim1 as its parent column + // (legitimate dependency), but unrelated base columns must stay untouched. 
+ Assertions.assertFalse( + downloaded.contains(basePrefix + ColumnHolder.TIME_COLUMN_NAME), + "expected base __time NOT to be downloaded; got: " + downloaded + ); + Assertions.assertFalse( + downloaded.contains(basePrefix + "metric1"), + "expected base metric1 NOT to be downloaded; got: " + downloaded + ); + } + } + } + + @Test + void testOpeningTimeOrderedProjectionCursorTriggersNoDownload() throws IOException + { + // A projection grouped by [__gran(HOUR), dim1] is time-ordered, so QueryableIndexCursorHolder reads the + // projection's (downloadable) time column when it builds the cursor (interval-checking offset + the projection's + // own time-boundary inspector). A GROUP BY dim1 query matches the projection by re-aggregating over __gran and + // does NOT list __time in physicalColumns. requiredColumns must nonetheless pre-fetch __time so that read happens + // during the async pre-fetch rather than as a lazy download when the cursor is opened on a processing thread. + final String projectionName = "hourly_dim1_metric1_sum"; + final File timeOrderedSegmentDir = buildTimeOrderedProjectionSegment(projectionName); + + final CountingRangeReader rangeReader = new CountingRangeReader(timeOrderedSegmentDir); + final File cacheDir = new File(perTestTempDir, "time_ordered_proj"); + FileUtils.mkdirp(cacheDir); + final PartialSegmentFileMapperV10 mapper = PartialSegmentFileMapperV10.create( + rangeReader, + TestHelper.makeJsonMapper(), + cacheDir, + IndexIO.V10_FILE_NAME, + Collections.emptyList() + ); + try { + final PartialQueryableIndex index = + new PartialQueryableIndex(mapper.getSegmentFileMetadata(), mapper, COLUMN_CONFIG); + + // Precondition: the matched projection's row selector really is time-ordered, so cursor build WOULD read its + // time column. Without this, the assertion below would be vacuously satisfied. 
+ final QueryableIndex projectionIndex = index.getProjectionQueryableIndex(projectionName); + Assertions.assertNotNull(projectionIndex, "test setup: projection should be present"); + Assertions.assertEquals( + Order.ASCENDING, + Cursors.getTimeOrdering(projectionIndex.getOrdering()), + "test setup: projection must be time-ordered for this test to guard anything" + ); + + final PartialQueryableIndexCursorFactory factory = new PartialQueryableIndexCursorFactory( + index, + V10TimeBoundaryInspector.forBaseProjection(index.getBaseProjectionMetadata(), index.getDataInterval()), + noOpAcquirer(directExec()) + ); + + // ETERNITY interval so the offset is not clipped to a residual interval (a genuine reason to read __time); + // here the only __time read is the time-ordered offset/inspector during cursor build, which the pre-fetch + // must have already covered. physicalColumns deliberately omits __time. + final CursorBuildSpec aggSpec = CursorBuildSpec.builder() + .setInterval(Intervals.ETERNITY) + .setGroupingColumns(List.of("dim1")) + .setAggregators(List.of( + new LongSumAggregatorFactory("_metric1_sum", "metric1") + )) + .setPhysicalColumns(Set.of("dim1", "metric1")) + .build(); + + try (AsyncCursorHolder asyncHolder = factory.makeCursorHolderAsync(aggSpec); + CursorHolder holder = asyncHolder.release()) { + Assertions.assertNotNull(holder); + + // After the async pre-fetch resolves, the projection's required columns AND __time are on disk, even though + // __time was not declared in physicalColumns. 
+ final String projPrefix = projectionName + "/"; + final Set<String> beforeOpen = mapper.getDownloadedFiles(); + Assertions.assertTrue( + beforeOpen.contains(projPrefix + "_metric1_sum"), + "test setup: projection path should have run and pre-fetched _metric1_sum; got: " + beforeOpen + ); + // The projection's time column is stored under its granularity grouping name (__gran); it is exposed as + // __time but the on-disk smoosh file (and thus the downloaded-files entry) is __gran. + Assertions.assertTrue( + beforeOpen.contains(projPrefix + "__gran"), + "requiredColumns must pre-fetch the projection's time column for a time-ordered cursor; got: " + beforeOpen + ); + + // Opening the cursor reads __time (interval-checking offset + the projection's time-boundary inspector). It + // was pre-fetched above, so this must not trigger any further (lazy) download on the processing thread. + Assertions.assertNotNull(holder.asCursor()); + + final Set<String> afterOpen = mapper.getDownloadedFiles(); + Assertions.assertEquals( + beforeOpen, + afterOpen, + "opening a projection-matched cursor must not trigger a lazy download; newly downloaded: " + + Sets.difference(afterOpen, beforeOpen) + ); + } + } + finally { + mapper.close(); + } + } + + @Test + void testOpeningTimeOrderedBaseCursorTriggersNoDownload() throws IOException + { + // The base table is time-ordered, so QueryableIndexCursorHolder reads __time when building the cursor (for its + // interval-checking offset) even though a raw scan does not list __time in physicalColumns. requiredColumns must + // pre-fetch __time so that read does not become a lazy download on the processing thread at cursor-open time. 
+ final CountingRangeReader rangeReader = new CountingRangeReader(segmentDir); + try (IndexAndMapper opened = openIndex(rangeReader, "base_open_no_download")) { + final PartialQueryableIndex index = opened.index(); + final PartialSegmentFileMapperV10 mapper = opened.mapper(); + Assertions.assertEquals( + Order.ASCENDING, + Cursors.getTimeOrdering(index.getOrdering()), + "test setup: base table must be time-ordered" + ); + + final PartialQueryableIndexCursorFactory factory = new PartialQueryableIndexCursorFactory( + index, + V10TimeBoundaryInspector.forBaseProjection(index.getBaseProjectionMetadata(), index.getDataInterval()), + noOpAcquirer(directExec()) + ); + + // Raw scan (no grouping/aggregators) so the aggregate projection is not matched and we exercise the base path. + // __time is deliberately omitted from physicalColumns. + final CursorBuildSpec scanSpec = CursorBuildSpec.builder() + .setInterval(Intervals.ETERNITY) + .setPhysicalColumns(Set.of("dim1")) + .build(); + + try (AsyncCursorHolder asyncHolder = factory.makeCursorHolderAsync(scanSpec); + CursorHolder holder = asyncHolder.release()) { + Assertions.assertNotNull(holder); + + final String basePrefix = Projections.BASE_TABLE_PROJECTION_NAME + "/"; + final Set<String> beforeOpen = mapper.getDownloadedFiles(); + // base path ran (dim1 pre-fetched) and __time was pre-fetched despite not being in physicalColumns + Assertions.assertTrue( + beforeOpen.contains(basePrefix + "dim1"), + "test setup: base path should have pre-fetched dim1; got: " + beforeOpen + ); + Assertions.assertTrue( + beforeOpen.contains(basePrefix + ColumnHolder.TIME_COLUMN_NAME), + "requiredColumns must pre-fetch __time for a time-ordered base cursor; got: " + beforeOpen + ); + + Assertions.assertNotNull(holder.asCursor()); + + final Set<String> afterOpen = mapper.getDownloadedFiles(); + Assertions.assertEquals( + beforeOpen, + afterOpen, + "opening a time-ordered base cursor must not trigger a lazy download; newly downloaded: " + + 
Sets.difference(afterOpen, beforeOpen) + ); + } + } + } + + @Test + void testFilteredWrapperOverPartialUsesAsyncPath() throws IOException + { + // A FilteredCursorFactory wrapping a not-fully-downloaded partial cursor factory must route makeCursorHolderAsync + // through the delegate's async (download-on-demand) path. If FilteredCursorFactory fell back to the CursorFactory + // default makeCursorHolderAsync (which calls sync makeCursorHolder), the partial delegate would throw because the + // segment isn't fully downloaded. + final CountingRangeReader rangeReader = new CountingRangeReader(segmentDir); + try (IndexAndMapper opened = openIndex(rangeReader, "filtered_wrapper_async")) { + final PartialQueryableIndex index = opened.index(); + Assertions.assertFalse(index.isFullyDownloaded(), "test setup: segment must not be fully downloaded"); + + final PartialQueryableIndexCursorFactory partial = new PartialQueryableIndexCursorFactory( + index, + V10TimeBoundaryInspector.forBaseProjection(index.getBaseProjectionMetadata(), index.getDataInterval()), + noOpAcquirer(directExec()) + ); + final CursorFactory filtered = new FilteredCursorFactory( + partial, + new EqualityFilter("dim1", ColumnType.STRING, "a", null) + ); + + // directExec resolves the async download inline; release() returns the built holder. asCursor() exercises the + // cursor over the on-demand-loaded columns. Would throw here if the wrapper used the sync default. 
+ try (AsyncCursorHolder asyncHolder = filtered.makeCursorHolderAsync(CursorBuildSpec.FULL_SCAN); + CursorHolder holder = asyncHolder.release()) { + Assertions.assertNotNull(holder); + Assertions.assertNotNull(holder.asCursor(), "async path should build a usable cursor without a sync download"); + } + } + } + + private File buildTimeOrderedProjectionSegment(String projectionName) Review Comment: nit: private helper method interleaved within tests ########## processing/src/test/java/org/apache/druid/segment/PartialQueryableIndexCursorFactoryTest.java: ########## @@ -0,0 +1,806 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment; + +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; +import org.apache.druid.common.asyncresource.AsyncResource; +import org.apache.druid.common.asyncresource.AsyncResources; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.ListBasedInputRow; +import org.apache.druid.data.input.impl.AggregateProjectionSpec; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.LongDimensionSchema; +import org.apache.druid.data.input.impl.StringDimensionSchema; +import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.FileUtils; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.query.Order; +import org.apache.druid.query.aggregation.CountAggregatorFactory; +import org.apache.druid.query.aggregation.LongSumAggregatorFactory; +import org.apache.druid.query.filter.EqualityFilter; +import org.apache.druid.segment.column.ColumnConfig; +import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.data.CompressionStrategy; +import org.apache.druid.segment.file.CountingRangeReader; +import org.apache.druid.segment.file.PartialSegmentFileMapperV10; +import org.apache.druid.segment.incremental.IncrementalIndexSchema; +import org.apache.druid.segment.projections.Projections; +import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; +import 
org.apache.druid.testing.InitializedNullHandlingTest; +import org.joda.time.DateTime; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; + +class PartialQueryableIndexCursorFactoryTest extends InitializedNullHandlingTest +{ + private static final ColumnConfig COLUMN_CONFIG = ColumnConfig.DEFAULT; + private static final DateTime TIME = DateTimes.of("2025-01-01"); + private static final String PROJECTION_NAME = "dim1_metric1_sum"; + + private static final RowSignature ROW_SIGNATURE = RowSignature.builder() + .add("dim1", ColumnType.STRING) + .add("metric1", ColumnType.LONG) + .build(); + + private static final List<AggregateProjectionSpec> PROJECTIONS = Collections.singletonList( + AggregateProjectionSpec.builder(PROJECTION_NAME) + .groupingColumns(new StringDimensionSchema("dim1")) + .aggregators( + new LongSumAggregatorFactory("_metric1_sum", "metric1"), + new CountAggregatorFactory("_count") + ) + .build() + ); + + private static final List<InputRow> ROWS = Arrays.asList( + new ListBasedInputRow(ROW_SIGNATURE, TIME, ROW_SIGNATURE.getColumnNames(), Arrays.asList("a", 1L)), + new ListBasedInputRow(ROW_SIGNATURE, TIME.plusMinutes(1), ROW_SIGNATURE.getColumnNames(), 
Arrays.asList("a", 2L)), + new ListBasedInputRow(ROW_SIGNATURE, TIME.plusMinutes(2), ROW_SIGNATURE.getColumnNames(), Arrays.asList("b", 3L)), + new ListBasedInputRow(ROW_SIGNATURE, TIME.plusMinutes(3), ROW_SIGNATURE.getColumnNames(), Arrays.asList("b", 4L)) + ); + + @TempDir + static File sharedTempDir; + + private static File segmentDir; + private static ListeningExecutorService realExec; + + @TempDir + File perTestTempDir; + + @BeforeAll + static void buildSegment() + { + final File tmpDir = new File(sharedTempDir, "build_" + ThreadLocalRandom.current().nextInt()); + segmentDir = IndexBuilder.create() + .useV10() + .tmpDir(tmpDir) + .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) + .schema( + IncrementalIndexSchema.builder() + .withDimensionsSpec( + DimensionsSpec.builder() + .setDimensions( + List.of( + new StringDimensionSchema("dim1"), + new LongDimensionSchema("metric1") + ) + ) + .build() + ) + .withRollup(false) + .withMinTimestamp(TIME.getMillis()) + .withProjections(PROJECTIONS) + .build() + ) + .indexSpec(IndexSpec.builder().withMetadataCompression(CompressionStrategy.NONE).build()) + .rows(ROWS) + .buildMMappedIndexFile(); + + realExec = MoreExecutors.listeningDecorator(Execs.singleThreaded("partial-cursor-test-%d")); + } + + @AfterAll + static void teardownExec() + { + realExec.shutdownNow(); + } + + private record IndexAndMapper(PartialQueryableIndex index, PartialSegmentFileMapperV10 mapper) + implements AutoCloseable + { + @Override + public void close() + { + mapper.close(); + } + } + + private IndexAndMapper openIndex(CountingRangeReader rangeReader, String cacheName) throws IOException + { + final File cacheDir = new File(perTestTempDir, cacheName); + FileUtils.mkdirp(cacheDir); + final PartialSegmentFileMapperV10 mapper = PartialSegmentFileMapperV10.create( + rangeReader, + TestHelper.makeJsonMapper(), + cacheDir, + IndexIO.V10_FILE_NAME, + Collections.emptyList() + ); + return new IndexAndMapper( + new 
PartialQueryableIndex(mapper.getSegmentFileMetadata(), mapper, COLUMN_CONFIG), + mapper + ); + } + + private static ListeningExecutorService directExec() + { + return MoreExecutors.listeningDecorator(MoreExecutors.newDirectExecutorService()); + } + + private static PartialBundleAcquirer noOpAcquirer(ListeningExecutorService downloadExec) + { + return new PartialBundleAcquirer() + { + @Override + public Closeable acquire(String bundleName) + { + return () -> {}; + } + + @Override + public <T> AsyncResource<T> submitDownload(Callable<T> task) + { + return AsyncResources.fromFutureUnmanaged(downloadExec.submit(task)); + } + }; + } + + @Test + void testSyncMakeCursorHolderThrowsWhenSegmentNotFullyDownloaded() throws IOException + { + final CountingRangeReader rangeReader = new CountingRangeReader(segmentDir); + try (IndexAndMapper opened = openIndex(rangeReader, "sync_throws")) { + final PartialQueryableIndex index = opened.index(); + Assertions.assertFalse(index.isFullyDownloaded()); + final PartialQueryableIndexCursorFactory factory = new PartialQueryableIndexCursorFactory( + index, + QueryableIndexTimeBoundaryInspector.create(index), + noOpAcquirer(directExec()) + ); + + // Fresh mapper has only the header on disk; no internal files have been downloaded. The sync path must + // refuse rather than trigger downloads on the calling thread. 
+ Assertions.assertThrows( + DruidException.class, + () -> factory.makeCursorHolder(CursorBuildSpec.FULL_SCAN) + ); + } + } + + @Test + void testSyncMakeCursorHolderSucceedsAfterFullDownload() throws Exception + { + final CountingRangeReader rangeReader = new CountingRangeReader(segmentDir); + try (IndexAndMapper opened = openIndex(rangeReader, "sync_after_async")) { + final PartialQueryableIndex index = opened.index(); + final PartialSegmentFileMapperV10 mapper = opened.mapper(); + final PartialQueryableIndexCursorFactory factory = new PartialQueryableIndexCursorFactory( + index, + QueryableIndexTimeBoundaryInspector.create(index), + noOpAcquirer(directExec()) + ); + + // Eagerly materialize the entire segment via the file mapper to simulate the eager-acquire path. + mapper.ensureFilesAvailable(mapper.getSegmentFileMetadata().getFiles().keySet()); + Assertions.assertTrue(mapper.isFullyDownloaded(), "test precondition: every internal file should be on disk"); + + try (CursorHolder holder = factory.makeCursorHolder(CursorBuildSpec.FULL_SCAN)) { + Assertions.assertNotNull(holder); + } + } + } + + @Test + void testAsyncMakeCursorHolderDefersDownloadUntilExecutorRuns() throws Exception + { + final CountingRangeReader rangeReader = new CountingRangeReader(segmentDir); + final CountDownLatch gate = new CountDownLatch(1); + final ExecutorService rawExec = Execs.singleThreaded("partial-cursor-defer-%d"); + final ListeningExecutorService gatedExec = MoreExecutors.listeningDecorator(rawExec); + try (IndexAndMapper opened = openIndex(rangeReader, "async_defer")) { + final PartialQueryableIndex index = opened.index(); + final PartialQueryableIndexCursorFactory factory = new PartialQueryableIndexCursorFactory( + index, + QueryableIndexTimeBoundaryInspector.create(index), + noOpAcquirer(gatedExec) + ); + rangeReader.resetCount(); + + // Hold the executor with a pre-queued blocker so the actual download task can't start. 
+ // assign the blocker future to an (intentionally unused) local so errorprone's CheckReturnValue is satisfied + @SuppressWarnings("unused") + ListenableFuture<?> unused = gatedExec.submit(() -> { + try { + gate.await(); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + + final AsyncCursorHolder asyncHolder = factory.makeCursorHolderAsync(CursorBuildSpec.FULL_SCAN); + Assertions.assertFalse( + asyncHolder.isReady(), + "async holder must not be ready while download executor is still blocked" + ); + Assertions.assertEquals(0, rangeReader.getReadCount(), "no download should have started yet"); + + final CountDownLatch ready = new CountDownLatch(1); + asyncHolder.addReadyCallback(ready::countDown); + + // Release the gate; download should now run. + gate.countDown(); + Assertions.assertTrue(ready.await(5, TimeUnit.SECONDS), "ready callback must fire after executor processes task"); + Assertions.assertTrue(asyncHolder.isReady()); + Assertions.assertTrue(rangeReader.getReadCount() > 0, "download must have happened by the time holder is ready"); + + try (CursorHolder holder = asyncHolder.release()) { + Assertions.assertNotNull(holder); + } + } + finally { + gatedExec.shutdownNow(); + rawExec.shutdownNow(); + } + } + + @Test + void testMatchedProjectionDownloadsOnlyRequestedColumns() throws IOException + { + final CountingRangeReader rangeReader = new CountingRangeReader(segmentDir); + try (IndexAndMapper opened = openIndex(rangeReader, "proj_columns")) { + final PartialQueryableIndex index = opened.index(); + final PartialSegmentFileMapperV10 mapper = opened.mapper(); + final PartialQueryableIndexCursorFactory factory = new PartialQueryableIndexCursorFactory( + index, + QueryableIndexTimeBoundaryInspector.create(index), + noOpAcquirer(directExec()) + ); + + // Query asks for group-by dim1 + sum(metric1), but NOT count. 
The projection has dim1 / _metric1_sum / _count; + // the projection match rewrites this to physical columns {dim1, _metric1_sum}. We should see those download, + // but NOT the _count file; proves column-level pruning, not just projection-level. + final CursorBuildSpec aggSpec = CursorBuildSpec.builder() + .setGroupingColumns(List.of("dim1")) + .setAggregators(List.of( + new LongSumAggregatorFactory("_metric1_sum", "metric1") + )) + .setPhysicalColumns(Set.of("dim1", "metric1")) + .build(); + + // Async path triggers the column-level pre-download. directExec makes the future complete synchronously. + try (AsyncCursorHolder asyncHolder = factory.makeCursorHolderAsync(aggSpec); + CursorHolder holder = asyncHolder.release()) { + Assertions.assertNotNull(holder); + final String projPrefix = PROJECTION_NAME + "/"; + final String basePrefix = Projections.BASE_TABLE_PROJECTION_NAME + "/"; + final Set<String> downloaded = mapper.getDownloadedFiles(); + + // Requested projection columns materialized. + Assertions.assertTrue(downloaded.contains(projPrefix + "dim1"), "expected projection dim1; got: " + downloaded); + Assertions.assertTrue( + downloaded.contains(projPrefix + "_metric1_sum"), + "expected projection _metric1_sum; got: " + downloaded + ); + // The projection's _count file was NOT in the query; must not be downloaded + Assertions.assertFalse( + downloaded.contains(projPrefix + "_count"), + "expected projection _count NOT to be downloaded; got: " + downloaded + ); + // Base __time and base metric1 are NOT touched: projection dim1 may pull base dim1 as its parent column + // (legitimate dependency), but unrelated base columns must stay untouched. 
+ Assertions.assertFalse( + downloaded.contains(basePrefix + ColumnHolder.TIME_COLUMN_NAME), + "expected base __time NOT to be downloaded; got: " + downloaded + ); + Assertions.assertFalse( + downloaded.contains(basePrefix + "metric1"), + "expected base metric1 NOT to be downloaded; got: " + downloaded + ); + } + } + } + + @Test + void testOpeningTimeOrderedProjectionCursorTriggersNoDownload() throws IOException + { + // A projection grouped by [__gran(HOUR), dim1] is time-ordered, so QueryableIndexCursorHolder reads the + // projection's (downloadable) time column when it builds the cursor (interval-checking offset + the projection's + // own time-boundary inspector). A GROUP BY dim1 query matches the projection by re-aggregating over __gran and + // does NOT list __time in physicalColumns. requiredColumns must nonetheless pre-fetch __time so that read happens + // during the async pre-fetch rather than as a lazy download when the cursor is opened on a processing thread. + final String projectionName = "hourly_dim1_metric1_sum"; + final File timeOrderedSegmentDir = buildTimeOrderedProjectionSegment(projectionName); + + final CountingRangeReader rangeReader = new CountingRangeReader(timeOrderedSegmentDir); + final File cacheDir = new File(perTestTempDir, "time_ordered_proj"); + FileUtils.mkdirp(cacheDir); + final PartialSegmentFileMapperV10 mapper = PartialSegmentFileMapperV10.create( + rangeReader, + TestHelper.makeJsonMapper(), + cacheDir, + IndexIO.V10_FILE_NAME, + Collections.emptyList() + ); + try { + final PartialQueryableIndex index = + new PartialQueryableIndex(mapper.getSegmentFileMetadata(), mapper, COLUMN_CONFIG); + + // Precondition: the matched projection's row selector really is time-ordered, so cursor build WOULD read its + // time column. Without this, the assertion below would be vacuously satisfied. 
+ final QueryableIndex projectionIndex = index.getProjectionQueryableIndex(projectionName); + Assertions.assertNotNull(projectionIndex, "test setup: projection should be present"); + Assertions.assertEquals( + Order.ASCENDING, + Cursors.getTimeOrdering(projectionIndex.getOrdering()), + "test setup: projection must be time-ordered for this test to guard anything" + ); + + final PartialQueryableIndexCursorFactory factory = new PartialQueryableIndexCursorFactory( + index, + V10TimeBoundaryInspector.forBaseProjection(index.getBaseProjectionMetadata(), index.getDataInterval()), + noOpAcquirer(directExec()) + ); + + // ETERNITY interval so the offset is not clipped to a residual interval (a genuine reason to read __time); + // here the only __time read is the time-ordered offset/inspector during cursor build, which the pre-fetch + // must have already covered. physicalColumns deliberately omits __time. + final CursorBuildSpec aggSpec = CursorBuildSpec.builder() + .setInterval(Intervals.ETERNITY) + .setGroupingColumns(List.of("dim1")) + .setAggregators(List.of( + new LongSumAggregatorFactory("_metric1_sum", "metric1") + )) + .setPhysicalColumns(Set.of("dim1", "metric1")) + .build(); + + try (AsyncCursorHolder asyncHolder = factory.makeCursorHolderAsync(aggSpec); + CursorHolder holder = asyncHolder.release()) { + Assertions.assertNotNull(holder); + + // After the async pre-fetch resolves, the projection's required columns AND __time are on disk, even though + // __time was not declared in physicalColumns. 
+ final String projPrefix = projectionName + "/"; + final Set<String> beforeOpen = mapper.getDownloadedFiles(); + Assertions.assertTrue( + beforeOpen.contains(projPrefix + "_metric1_sum"), + "test setup: projection path should have run and pre-fetched _metric1_sum; got: " + beforeOpen + ); + // The projection's time column is stored under its granularity grouping name (__gran); it is exposed as + // __time but the on-disk smoosh file (and thus the downloaded-files entry) is __gran. + Assertions.assertTrue( + beforeOpen.contains(projPrefix + "__gran"), + "requiredColumns must pre-fetch the projection's time column for a time-ordered cursor; got: " + beforeOpen + ); + + // Opening the cursor reads __time (interval-checking offset + the projection's time-boundary inspector). It + // was pre-fetched above, so this must not trigger any further (lazy) download on the processing thread. + Assertions.assertNotNull(holder.asCursor()); + + final Set<String> afterOpen = mapper.getDownloadedFiles(); + Assertions.assertEquals( + beforeOpen, + afterOpen, + "opening a projection-matched cursor must not trigger a lazy download; newly downloaded: " + + Sets.difference(afterOpen, beforeOpen) + ); + } + } + finally { + mapper.close(); + } + } + + @Test + void testOpeningTimeOrderedBaseCursorTriggersNoDownload() throws IOException + { + // The base table is time-ordered, so QueryableIndexCursorHolder reads __time when building the cursor (for its + // interval-checking offset) even though a raw scan does not list __time in physicalColumns. requiredColumns must + // pre-fetch __time so that read does not become a lazy download on the processing thread at cursor-open time. 
+ final CountingRangeReader rangeReader = new CountingRangeReader(segmentDir); + try (IndexAndMapper opened = openIndex(rangeReader, "base_open_no_download")) { + final PartialQueryableIndex index = opened.index(); + final PartialSegmentFileMapperV10 mapper = opened.mapper(); + Assertions.assertEquals( + Order.ASCENDING, + Cursors.getTimeOrdering(index.getOrdering()), + "test setup: base table must be time-ordered" + ); + + final PartialQueryableIndexCursorFactory factory = new PartialQueryableIndexCursorFactory( + index, + V10TimeBoundaryInspector.forBaseProjection(index.getBaseProjectionMetadata(), index.getDataInterval()), + noOpAcquirer(directExec()) + ); + + // Raw scan (no grouping/aggregators) so the aggregate projection is not matched and we exercise the base path. + // __time is deliberately omitted from physicalColumns. + final CursorBuildSpec scanSpec = CursorBuildSpec.builder() + .setInterval(Intervals.ETERNITY) + .setPhysicalColumns(Set.of("dim1")) + .build(); + + try (AsyncCursorHolder asyncHolder = factory.makeCursorHolderAsync(scanSpec); + CursorHolder holder = asyncHolder.release()) { + Assertions.assertNotNull(holder); + + final String basePrefix = Projections.BASE_TABLE_PROJECTION_NAME + "/"; + final Set<String> beforeOpen = mapper.getDownloadedFiles(); + // base path ran (dim1 pre-fetched) and __time was pre-fetched despite not being in physicalColumns + Assertions.assertTrue( + beforeOpen.contains(basePrefix + "dim1"), + "test setup: base path should have pre-fetched dim1; got: " + beforeOpen + ); + Assertions.assertTrue( + beforeOpen.contains(basePrefix + ColumnHolder.TIME_COLUMN_NAME), + "requiredColumns must pre-fetch __time for a time-ordered base cursor; got: " + beforeOpen + ); + + Assertions.assertNotNull(holder.asCursor()); + + final Set<String> afterOpen = mapper.getDownloadedFiles(); + Assertions.assertEquals( + beforeOpen, + afterOpen, + "opening a time-ordered base cursor must not trigger a lazy download; newly downloaded: " + + 
Sets.difference(afterOpen, beforeOpen) + ); + } + } + } + + @Test + void testFilteredWrapperOverPartialUsesAsyncPath() throws IOException + { + // A FilteredCursorFactory wrapping a not-fully-downloaded partial cursor factory must route makeCursorHolderAsync + // through the delegate's async (download-on-demand) path. If FilteredCursorFactory fell back to the CursorFactory + // default makeCursorHolderAsync (which calls sync makeCursorHolder), the partial delegate would throw because the + // segment isn't fully downloaded. + final CountingRangeReader rangeReader = new CountingRangeReader(segmentDir); + try (IndexAndMapper opened = openIndex(rangeReader, "filtered_wrapper_async")) { + final PartialQueryableIndex index = opened.index(); + Assertions.assertFalse(index.isFullyDownloaded(), "test setup: segment must not be fully downloaded"); + + final PartialQueryableIndexCursorFactory partial = new PartialQueryableIndexCursorFactory( + index, + V10TimeBoundaryInspector.forBaseProjection(index.getBaseProjectionMetadata(), index.getDataInterval()), + noOpAcquirer(directExec()) + ); + final CursorFactory filtered = new FilteredCursorFactory( + partial, + new EqualityFilter("dim1", ColumnType.STRING, "a", null) + ); + + // directExec resolves the async download inline; release() returns the built holder. asCursor() exercises the + // cursor over the on-demand-loaded columns. Would throw here if the wrapper used the sync default. 
+ try (AsyncCursorHolder asyncHolder = filtered.makeCursorHolderAsync(CursorBuildSpec.FULL_SCAN); + CursorHolder holder = asyncHolder.release()) { + Assertions.assertNotNull(holder); + Assertions.assertNotNull(holder.asCursor(), "async path should build a usable cursor without a sync download"); + } + } + } + + private File buildTimeOrderedProjectionSegment(String projectionName) + { + final List<AggregateProjectionSpec> projections = Collections.singletonList( + AggregateProjectionSpec.builder(projectionName) + .virtualColumns(Granularities.toVirtualColumn(Granularities.HOUR, "__gran")) + .groupingColumns( + new LongDimensionSchema("__gran"), + new StringDimensionSchema("dim1") + ) + .aggregators( + new LongSumAggregatorFactory("_metric1_sum", "metric1"), + new CountAggregatorFactory("_count") + ) + .build() + ); + final File tmpDir = new File(perTestTempDir, "build_time_ordered_" + ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE)); + return IndexBuilder.create() + .useV10() + .tmpDir(tmpDir) + .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) + .schema( + IncrementalIndexSchema.builder() + .withDimensionsSpec( + DimensionsSpec.builder() + .setDimensions( + List.of( + new StringDimensionSchema("dim1"), + new LongDimensionSchema("metric1") + ) + ) + .build() + ) + .withRollup(false) + .withMinTimestamp(TIME.getMillis()) + .withProjections(projections) + .build() + ) + .indexSpec(IndexSpec.builder().withMetadataCompression(CompressionStrategy.NONE).build()) + .rows(ROWS) + .buildMMappedIndexFile(); + } + + @Test + void testRowSignatureAndCapabilitiesDelegatedToUnderlyingFactory() throws IOException + { + final CountingRangeReader rangeReader = new CountingRangeReader(segmentDir); + try (IndexAndMapper opened = openIndex(rangeReader, "delegate")) { + final PartialQueryableIndex index = opened.index(); + final PartialQueryableIndexCursorFactory factory = new PartialQueryableIndexCursorFactory( + index, + 
QueryableIndexTimeBoundaryInspector.create(index), + noOpAcquirer(directExec()) + ); + + // delegating to QueryableIndexCursorFactory which returns the index's row signature + final RowSignature signature = factory.getRowSignature(); + Assertions.assertNotNull(signature); + Assertions.assertTrue(signature.size() > 0); + + Assertions.assertNotNull(factory.getColumnCapabilities("dim1")); + } + } + + @Test + void testAsyncCloseBeforeDownloadCompletesClosesCursorHolderOnce() throws Exception + { + // Producer-doesn't-leak invariant: if the wrapper is closed before the producer's set(holder) fires, the producer + // is responsible for closing the holder. Use the real (single-threaded) executor with a gate that pauses the + // download task before it builds the cursor, then close the wrapper, then release the gate. + final CountingRangeReader rangeReader = new CountingRangeReader(segmentDir); + final AtomicReference<RuntimeException> bgErr = new AtomicReference<>(); Review Comment: `bgErr` doesn't actually seem to be wired into the test at all. It is only created and then used in the `get` assertion below. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
