This is an automated email from the ASF dual-hosted git repository.
clintropolis pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 9cdedcddc52 feat: partial load for clustered segments (#19620)
9cdedcddc52 is described below
commit 9cdedcddc52f6de1c70a8c52568b4f051743d557
Author: Clint Wylie <[email protected]>
AuthorDate: Tue Jun 23 17:05:12 2026 -0700
feat: partial load for clustered segments (#19620)
---
.../org/apache/druid/segment/IndexMergerV10.java | 5 +-
.../druid/segment/PartialQueryableIndex.java | 202 +++++++--
.../PartialQueryableIndexCursorFactory.java | 115 ++++-
.../druid/segment/QueryableIndexCursorFactory.java | 14 +-
.../druid/segment/projections/Projections.java | 11 +-
...eIndexCursorFactoryClusteredProjectionTest.java | 227 ++++++++++
...alQueryableIndexCursorFactoryClusteredTest.java | 466 +++++++++++++++++++++
.../PartialQueryableIndexCursorFactoryTest.java | 58 +--
...PartialQueryableIndexCursorFactoryTestBase.java | 109 +++++
.../loading/PartialSegmentMetadataCacheEntry.java | 31 +-
.../PartialSegmentMetadataCacheEntryTest.java | 94 ++++-
...SegmentLocalCacheManagerPartialAcquireTest.java | 131 +++++-
12 files changed, 1331 insertions(+), 132 deletions(-)
diff --git
a/processing/src/main/java/org/apache/druid/segment/IndexMergerV10.java
b/processing/src/main/java/org/apache/druid/segment/IndexMergerV10.java
index 68eb4acdc5f..d20f037496e 100644
--- a/processing/src/main/java/org/apache/druid/segment/IndexMergerV10.java
+++ b/processing/src/main/java/org/apache/druid/segment/IndexMergerV10.java
@@ -418,9 +418,8 @@ public class IndexMergerV10 extends IndexMergerBase
for (Map.Entry<List<Integer>, List<AdapterAndGroup>> groupEntry :
merged.groupSources.entrySet()) {
final List<Integer> mergedTuple = groupEntry.getKey();
- final String groupPrefix =
Projections.getClusterGroupSegmentInternalFilePrefix(mergedTuple);
- // file-bundle name is the prefix without the trailing slash
- final String groupName = groupPrefix.substring(0, groupPrefix.length()
- 1);
+ final String groupName =
Projections.getClusterGroupBundleName(mergedTuple);
+ final String groupPrefix = groupName + "/";
final List<IndexableAdapter> groupAdapters =
Lists.newArrayListWithCapacity(groupEntry.getValue().size());
for (AdapterAndGroup source : groupEntry.getValue()) {
diff --git
a/processing/src/main/java/org/apache/druid/segment/PartialQueryableIndex.java
b/processing/src/main/java/org/apache/druid/segment/PartialQueryableIndex.java
index cfa7735300d..cfe5f6f4d68 100644
---
a/processing/src/main/java/org/apache/druid/segment/PartialQueryableIndex.java
+++
b/processing/src/main/java/org/apache/druid/segment/PartialQueryableIndex.java
@@ -34,6 +34,8 @@ import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.column.ColumnDescriptor;
import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.data.Indexed;
import org.apache.druid.segment.data.ListIndexed;
import org.apache.druid.segment.file.PartialSegmentFileMapperV10;
@@ -46,6 +48,7 @@ import
org.apache.druid.segment.projections.ConstantTimeColumn;
import org.apache.druid.segment.projections.ProjectionMetadata;
import org.apache.druid.segment.projections.Projections;
import org.apache.druid.segment.projections.QueryableProjection;
+import org.apache.druid.segment.projections.TableClusterGroupSpec;
import org.joda.time.Interval;
import javax.annotation.Nullable;
@@ -58,6 +61,7 @@ import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
import java.util.stream.Collectors;
/**
@@ -103,6 +107,16 @@ public class PartialQueryableIndex implements
QueryableIndex
private final ConcurrentHashMap<String, Map<String,
Supplier<BaseColumnHolder>>> projectionColumnsByName =
new ConcurrentHashMap<>();
+ // clustered base summary when this segment is a clustered base table, else
null. when non-null, the base table has
+ // no top-level columns
+ @Nullable
+ private final ClusteredValueGroupsBaseTableSchema clusteredBaseSummary;
+
+ // per-cluster-group column suppliers, keyed by group index (into the
summary's group list). built on demand like
+ // projectionColumnsByName; each supplier defers both mapFile() and
deserialization until the column is read.
+ private final ConcurrentHashMap<Integer, Map<String,
Supplier<BaseColumnHolder>>> clusterGroupColumnsByIndex =
+ new ConcurrentHashMap<>();
+
// lazy dimension handlers
private final Supplier<Map<String, DimensionHandler>> dimensionHandlers;
@@ -125,14 +139,14 @@ public class PartialQueryableIndex implements
QueryableIndex
baseProjection.getSchema().getName()
);
final BaseTableProjectionSchema baseSchema = (BaseTableProjectionSchema)
baseProjection.getSchema();
- // Clustered V10 segments keep their data in per-cluster-group bundles,
matched from metadata much like
- // projections, but partial loading does not wire that up yet (and the
write side isn't available to build one for
- // testing). Fail loudly here rather than mis-treating the clustered base
summary as a plain base table, whose
- // top-level columns are empty. Both acquire modes route partial-eligible
segments through this index, so this
- // guards the full path too. Remove once partial loading supports
clustered segments.
- if (baseSchema instanceof ClusteredValueGroupsBaseTableSchema) {
+ this.clusteredBaseSummary = baseSchema instanceof
ClusteredValueGroupsBaseTableSchema
+ ? (ClusteredValueGroupsBaseTableSchema)
baseSchema
+ : null;
+ if (clusteredBaseSummary != null &&
!clusteredBaseSummary.getSharedColumns().isEmpty()) {
+ // Shared base columns aren't wired into partial loading yet
throw DruidException.defensive(
- "Clustered V10 segments are not yet supported for partial loading
(interval[%s])",
+ "Clustered V10 segments with shared columns%s are not yet supported
for partial loading (interval[%s])",
+ clusteredBaseSummary.getSharedColumns(),
metadata.getInterval()
);
}
@@ -141,16 +155,24 @@ public class PartialQueryableIndex implements
QueryableIndex
this.baseProjectionPrefix =
Projections.getProjectionSegmentInternalFilePrefix(baseSchema);
this.dataInterval = Intervals.of(metadata.getInterval());
this.bitmapFactory = metadata.getBitmapEncoding().getBitmapFactory();
- this.availableDimensions = new
ListIndexed<>(baseSchema.getDimensionNames());
- // build column names (dimensions first, then other columns, excluding
__time)
- final LinkedHashSet<String> dimsFirst = new
LinkedHashSet<>(baseSchema.getDimensionNames());
- for (String columnName : baseSchema.getColumnNames()) {
- if (!ColumnHolder.TIME_COLUMN_NAME.equals(columnName)) {
- dimsFirst.add(columnName);
+ // A clustered base table has no top-level columns (its data lives in
per-cluster-group bundles), so its available
+ // dimensions / column names are empty; logical columns are resolved via
getColumnCapabilities + cluster-group
+ // dispatch (matching the eager SimpleQueryableIndex). A regular base
table lists dimensions first, then the
+ // remaining columns (excluding __time).
+ if (clusteredBaseSummary != null) {
+ this.availableDimensions = new ListIndexed<>(List.of());
+ this.columnNames = List.of();
+ } else {
+ this.availableDimensions = new
ListIndexed<>(baseSchema.getDimensionNames());
+ final LinkedHashSet<String> dimsFirst = new
LinkedHashSet<>(baseSchema.getDimensionNames());
+ for (String columnName : baseSchema.getColumnNames()) {
+ if (!ColumnHolder.TIME_COLUMN_NAME.equals(columnName)) {
+ dimsFirst.add(columnName);
+ }
}
+ this.columnNames = List.copyOf(dimsFirst);
}
- this.columnNames = List.copyOf(dimsFirst);
// build aggregate projection metadata for matching
final List<AggregateProjectionMetadata> aggProjections = new ArrayList<>();
@@ -193,8 +215,11 @@ public class PartialQueryableIndex implements
QueryableIndex
}
// build per-column suppliers for the base table. each supplier is
memoized and defers both mapFile() and
- // deserialization until the column is accessed.
- this.baseColumns = buildProjectionColumnSuppliers(baseProjection,
Map.of());
+ // deserialization until the column is accessed. A clustered base has no
top-level columns (its data lives in
+ // per-cluster-group bundles), so its base column map is empty.
+ this.baseColumns = clusteredBaseSummary != null
+ ? Map.of()
+ : buildProjectionColumnSuppliers(baseProjection,
Map.of());
this.dimensionHandlers = Suppliers.memoize(this::initDimensionHandlers);
}
@@ -281,9 +306,42 @@ public class PartialQueryableIndex implements
QueryableIndex
@Override
public ColumnCapabilities getColumnCapabilities(String column)
{
+ if (clusteredBaseSummary != null) {
+ return getClusteredColumnCapabilities(column);
+ }
// look up the column in the base table projection's namespace
final String smooshName = baseProjectionPrefix + column;
- final ColumnDescriptor descriptor =
metadata.getColumnDescriptors().get(smooshName);
+ return
capabilitiesFromDescriptor(metadata.getColumnDescriptors().get(smooshName));
+ }
+
+ /**
+ * Column capabilities for a clustered base table, answered from metadata
only (no downloads): clustering columns
+ * resolve from the summary's typed clustering signature; data columns (and
{@code __time}) resolve from the first
+ * cluster group's {@link ColumnDescriptor} (all groups share the same
per-group shape). Mirrors the eager
+ * {@link SimpleQueryableIndex} clustered branch, but reads the descriptor
directly rather than routing through a
+ * group sub-index's {@code getColumnHolder} (which would trigger a
download).
+ */
+ @Nullable
+ private ColumnCapabilities getClusteredColumnCapabilities(String column)
+ {
+ final ColumnType clusteringType =
clusteredBaseSummary.getClusteringColumns().getColumnType(column).orElse(null);
+ if (clusteringType != null) {
+ return clusteringType.is(ValueType.STRING)
+ ?
ColumnCapabilitiesImpl.createSimpleSingleValueStringColumnCapabilities()
+ :
ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(clusteringType);
+ }
+ final List<TableClusterGroupSpec> groups =
clusteredBaseSummary.getClusterGroups();
+ if (groups.isEmpty()) {
+ return null;
+ }
+ final String smooshName =
+
Projections.getClusterGroupSegmentInternalFileName(groups.getFirst().getClusteringValueIds(),
column);
+ return
capabilitiesFromDescriptor(metadata.getColumnDescriptors().get(smooshName));
+ }
+
+ @Nullable
+ private static ColumnCapabilities capabilitiesFromDescriptor(@Nullable
ColumnDescriptor descriptor)
+ {
if (descriptor == null) {
return null;
}
@@ -378,6 +436,74 @@ public class PartialQueryableIndex implements
QueryableIndex
};
}
+ @Nullable
+ @Override
+ public ClusteredValueGroupsBaseTableSchema getClusteredBaseSummary()
+ {
+ return clusteredBaseSummary;
+ }
+
+ @Override
+ public List<TableClusterGroupSpec> getClusterGroupSchemas()
+ {
+ return clusteredBaseSummary == null ? List.of() :
clusteredBaseSummary.getClusterGroups();
+ }
+
+ /**
+ * Returns a {@link QueryableIndex} sub-view scoped to a single cluster
group's column data. The per-group column
+ * suppliers are memoized by group index so a pre-fetch (async path) and the
later cursor build share the same
+ * already-downloaded columns. Clustering columns are NOT present in the
returned index; they are injected at the
+ * cursor-factory level via {@code ClusteringColumnSelectorFactory}.
+ */
+ @Override
+ public QueryableIndex getClusterGroupQueryableIndex(TableClusterGroupSpec
groupSpec)
+ {
+ if (clusteredBaseSummary == null) {
+ throw DruidException.defensive("getClusterGroupQueryableIndex called on
a non-clustered segment");
+ }
+ final List<TableClusterGroupSpec> groups =
clusteredBaseSummary.getClusterGroups();
+ final int groupIndex = groups.indexOf(groupSpec);
+ if (groupIndex < 0) {
+ throw DruidException.defensive("Cluster group spec is not part of this
segment");
+ }
+ final Map<String, Supplier<BaseColumnHolder>> groupColumns =
clusterGroupColumnsByIndex.computeIfAbsent(
+ groupIndex,
+ i -> buildColumnSuppliers(
+ clusteredBaseSummary.getTimeColumnName(),
+ groupSpec.getNumRows(),
+ clusteredBaseSummary.getGroupColumnNames(),
+ column ->
Projections.getClusterGroupSegmentInternalFileName(groupSpec.getClusteringValueIds(),
column),
+ Map.of()
+ )
+ );
+ final Metadata groupMetadata = new Metadata(
+ null,
+ null,
+ null,
+ clusteredBaseSummary.getEffectiveGranularity(),
+ false,
+ clusteredBaseSummary.getGroupOrdering(),
+ null,
+ null
+ );
+ return new SimpleQueryableIndex(
+ dataInterval,
+ new ListIndexed<>(clusteredBaseSummary.getGroupDimensionNames()),
+ bitmapFactory,
+ groupColumns,
+ fileMapper,
+ groupMetadata,
+ null
+ )
+ {
+ @Override
+ public Metadata getMetadata()
+ {
+ return groupMetadata;
+ }
+ };
+ }
+
@Override
public void close()
{
@@ -406,12 +532,34 @@ public class PartialQueryableIndex implements
QueryableIndex
Map<String, Supplier<BaseColumnHolder>> parentColumns
)
{
- final String timeColumnName =
projectionSpec.getSchema().getTimeColumnName();
+ return buildColumnSuppliers(
+ projectionSpec.getSchema().getTimeColumnName(),
+ projectionSpec.getNumRows(),
+ projectionSpec.getSchema().getColumnNames(),
+ column ->
Projections.getProjectionSegmentInternalFileName(projectionSpec.getSchema(),
column),
+ parentColumns
+ );
+ }
+
+ /**
+ * Shared builder for lazy per-column suppliers. Each supplier is memoized
and defers both
+ * {@link SegmentFileMapper#mapFile} and {@link ColumnDescriptor#read} until
the column is actually accessed, so
+ * queries only trigger downloads for the specific columns they use. {@code
fileNameFn} maps a logical column name to
+ * its segment-internal (smoosh) file name in the right bundle namespace.
+ */
+ private Map<String, Supplier<BaseColumnHolder>> buildColumnSuppliers(
+ @Nullable String timeColumnName,
+ int numRows,
+ List<String> columnNames,
+ Function<String, String> fileNameFn,
+ Map<String, Supplier<BaseColumnHolder>> parentColumns
+ )
+ {
final boolean renameTime =
!ColumnHolder.TIME_COLUMN_NAME.equals(timeColumnName);
- final Map<String, Supplier<BaseColumnHolder>> projectionColumns = new
LinkedHashMap<>();
+ final Map<String, Supplier<BaseColumnHolder>> columns = new
LinkedHashMap<>();
- for (String column : projectionSpec.getSchema().getColumnNames()) {
- final String smooshName =
Projections.getProjectionSegmentInternalFileName(projectionSpec.getSchema(),
column);
+ for (String column : columnNames) {
+ final String smooshName = fileNameFn.apply(column);
final ColumnDescriptor columnDescriptor =
metadata.getColumnDescriptors().get(smooshName);
if (columnDescriptor == null) {
continue;
@@ -430,21 +578,21 @@ public class PartialQueryableIndex implements
QueryableIndex
}
});
- projectionColumns.put(internedColumnName, columnSupplier);
+ columns.put(internedColumnName, columnSupplier);
if (column.equals(timeColumnName) && renameTime) {
- projectionColumns.put(ColumnHolder.TIME_COLUMN_NAME,
projectionColumns.get(column));
- projectionColumns.remove(column);
+ columns.put(ColumnHolder.TIME_COLUMN_NAME, columns.get(column));
+ columns.remove(column);
}
}
if (timeColumnName == null) {
- projectionColumns.put(
+ columns.put(
ColumnHolder.TIME_COLUMN_NAME,
-
ConstantTimeColumn.makeConstantTimeSupplier(projectionSpec.getNumRows(),
dataInterval.getStartMillis())
+ ConstantTimeColumn.makeConstantTimeSupplier(numRows,
dataInterval.getStartMillis())
);
}
- return projectionColumns;
+ return columns;
}
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/PartialQueryableIndexCursorFactory.java
b/processing/src/main/java/org/apache/druid/segment/PartialQueryableIndexCursorFactory.java
index 31de0f9a3fb..3704c055ff2 100644
---
a/processing/src/main/java/org/apache/druid/segment/PartialQueryableIndexCursorFactory.java
+++
b/processing/src/main/java/org/apache/druid/segment/PartialQueryableIndexCursorFactory.java
@@ -29,8 +29,10 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.projections.ClusterGroupQueryPlan;
import org.apache.druid.segment.projections.Projections;
import org.apache.druid.segment.projections.QueryableProjection;
+import org.apache.druid.segment.projections.TableClusterGroupSpec;
import org.apache.druid.segment.vector.VectorCursor;
import org.apache.druid.utils.CloseableUtils;
@@ -42,6 +44,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
/**
* Partial-aware {@link CursorFactory} for {@link PartialQueryableIndex}.
@@ -102,51 +105,102 @@ public class PartialQueryableIndexCursorFactory
implements CursorFactory
public AsyncCursorHolder makeCursorHolderAsync(CursorBuildSpec spec)
{
final QueryableProjection<QueryableIndex> matched =
index.getProjection(spec);
+ if (matched == null && index.getClusteredBaseSummary() != null) {
+ return makeClusteredCursorHolderAsync(spec);
+ }
+
+ // Aggregate-projection match, or the plain base table: a single bundle
(the projection's, or __base) with the
+ // query's required columns.
final QueryableIndex rowSelector = matched != null ?
matched.getRowSelector() : index;
- final Set<String> requiredColumns = requiredColumns(rowSelector, matched,
spec);
final String bundleName = matched != null ? matched.getName() :
Projections.BASE_TABLE_PROJECTION_NAME;
+ final DownloadBundle bundle = new DownloadBundle(bundleName, rowSelector,
requiredColumns(rowSelector, matched, spec));
+ return buildAsyncCursorHolder(List.of(bundle), () ->
delegate.makeCursorHolderForProjection(spec, matched));
+ }
- // Mount the cache-layer bundle before submitting downloads. Released on
the cancel/failure paths (requestRelease)
- // or handed to the produced cursor holder on success (releaser), exactly
once, and never while an in-flight column
- // download is mid-mapFile(). See BundleHoldRelease.
- final BundleHoldRelease holdRelease = new
BundleHoldRelease(bundleAcquirer.acquire(bundleName));
+ /**
+ * Async cursor holder for a clustered base table. Cluster-group resolution
is metadata-only
+ * ({@link Projections#planClusterGroupQuery}): only the groups whose
clustering tuples survive the query's filters
+ * are downloaded, one bundle (and its required columns) per surviving
group. Once every group is resident, the
+ * holder is built via the delegate's clustered path.
+ */
+ private AsyncCursorHolder makeClusteredCursorHolderAsync(CursorBuildSpec
spec)
+ {
+ final ClusterGroupQueryPlan plan = Projections.planClusterGroupQuery(
+ new ArrayList<>(index.getClusterGroupSchemas()),
+ spec
+ );
+ final List<TableClusterGroupSpec> surviving = plan.survivingGroups();
+ if (surviving.isEmpty()) {
+ // Filter rules out every group: no bundle to acquire, nothing to
download.
+ return AsyncCursorHolder.completed(EmptyCursorHolder.INSTANCE);
+ }
+ final List<DownloadBundle> bundles = new ArrayList<>(surviving.size());
+ for (TableClusterGroupSpec group : surviving) {
+ final QueryableIndex groupIndex =
index.getClusterGroupQueryableIndex(group);
+ final CursorBuildSpec groupSpec = plan.rebuildCursorBuildSpec(spec,
group);
+ bundles.add(
+ new DownloadBundle(
+
Projections.getClusterGroupBundleName(group.getClusteringValueIds()),
+ groupIndex,
+ requiredColumns(groupIndex, null, groupSpec)
+ )
+ );
+ }
+ // Reuse the plan we already computed: delegate.makeClusteredCursorHolder
skips a second planClusterGroupQuery and
+ // builds over the now-resident surviving groups (their columns memoized
via getClusterGroupQueryableIndex).
+ return buildAsyncCursorHolder(bundles, () ->
delegate.makeClusteredCursorHolder(spec, plan));
+ }
+
+ /**
+ * Generalized async-holder build over one or more bundles. Each {@link
DownloadBundle} mounts its cache-layer bundle
+ * (its own {@link BundleHoldRelease}, since eviction is per-bundle) and
submits a download per required column; the
+ * holder becomes ready once every column of every bundle is downloaded, at
which point {@code innerBuilder} builds the
+ * (now fully-resident) cursor holder and the produced holder takes
ownership of {@code inner} plus every bundle hold.
+ */
+ private AsyncCursorHolder buildAsyncCursorHolder(List<DownloadBundle>
bundles, Supplier<CursorHolder> innerBuilder)
+ {
+ final List<BundleHoldRelease> holdReleases = new
ArrayList<>(bundles.size());
try {
- // submit one materialization task per column so a multi-threaded
download executor can fan them out
- final List<AsyncResource<String>> columnDownloads = new
ArrayList<>(requiredColumns.size());
- for (String column : requiredColumns) {
- columnDownloads.add(submitColumnDownload(rowSelector, column,
holdRelease));
+ final List<AsyncResource<String>> columnDownloads = new ArrayList<>();
+ for (DownloadBundle bundle : bundles) {
+ final BundleHoldRelease holdRelease = new
BundleHoldRelease(bundleAcquirer.acquire(bundle.bundleName()));
+ holdReleases.add(holdRelease);
+ // submit one materialization task per column so a multi-threaded
download executor can fan them out
+ for (String column : bundle.requiredColumns()) {
+ columnDownloads.add(submitColumnDownload(bundle.rowSelector(),
column, holdRelease));
+ }
}
final AsyncResource<List<String>> downloaded =
AsyncResources.collect(columnDownloads);
// Canceler runs if the awaiter closes this holder before it's ready
(e.g. query cancel/timeout). Close the
// collected resource to cancel every column download that hasn't begun
its deep-storage read yet (queued tasks
// are skipped; tasks parked on the download executor's permit are
interrupted out of the interruptible wait
- // before doing any I/O), then request the hold release through the
handshake. Once the holder is produced and
- // handed to set(), ownership transfers to the awaiter, which drains it
via close() (cancel) or release()
- // (success); the once-guard keeps the hold release safe across all of
these paths.
+ // before doing any I/O), then request the hold release on every bundle
through the handshake. Once the holder is
+ // produced and handed to set(), ownership transfers to the awaiter,
which drains it via close() (cancel) or
+ // release() (success); each bundle's once-guard keeps its hold release
safe across all of these paths.
final AsyncCursorHolder asyncHolder = new AsyncCursorHolder(() -> {
CloseableUtils.closeAndSuppressExceptions(downloaded, ignored -> {});
- holdRelease.requestRelease();
+ holdReleases.forEach(BundleHoldRelease::requestRelease);
});
downloaded.addReadyCallback(() -> {
final CursorHolder holder;
try {
downloaded.get(); // surfaces any column-download failure (or
cancellation) as the cause
- final CursorHolder inner =
delegate.makeCursorHolderForProjection(spec, matched);
- // The produced holder takes ownership of both inner and the bundle
hold; from here, closing it releases both
- holder = wrapWithBundleRelease(inner, holdRelease.releaser());
+ final CursorHolder inner = innerBuilder.get();
+ // The produced holder takes ownership of inner and every bundle
hold; closing it releases all of them.
+ holder = wrapWithBundleRelease(inner, releasers(holdReleases));
}
catch (Throwable t) {
// A column download failed or was canceled, this branch can fire
while a sibling download is still
- // mid-mapFile(), so the hold release must go through the handshake
rather than dropping it here directly.
- holdRelease.requestRelease();
+ // mid-mapFile(), so each hold release must go through the handshake
rather than being dropped here directly.
+ holdReleases.forEach(BundleHoldRelease::requestRelease);
asyncHolder.setException(t);
return;
}
if (!asyncHolder.set(holder)) {
// wrapper was closed (awaiter canceled) while we were producing the
holder; close it ourselves so the
- // holder, its inner, and the bundle hold don't leak.
+ // holder, its inner, and the bundle holds don't leak.
holder.close();
}
});
@@ -154,9 +208,9 @@ public class PartialQueryableIndexCursorFactory implements
CursorFactory
}
catch (Throwable t) {
// Failure while submitting downloads / wiring up the holder (e.g.
submitDownload shutdown rejection). A column
- // download submitted before the failure may already be in flight, so
release the bundle hold through the
+ // download submitted before the failure may already be in flight, so
release every bundle hold through the
// handshake (requestRelease defers to the last in-flight body) rather
than dropping it directly mid-mapFile().
- throw CloseableUtils.closeAndWrapInCatch(t, holdRelease::requestRelease);
+ throw CloseableUtils.closeAndWrapInCatch(t, () ->
holdReleases.forEach(BundleHoldRelease::requestRelease));
}
}
@@ -285,6 +339,25 @@ public class PartialQueryableIndexCursorFactory implements
CursorFactory
};
}
+ /**
+ * Compose every bundle's success-path {@link BundleHoldRelease#releaser}
into one {@link Closeable} that the produced
+ * cursor holder owns: closing the holder releases all the bundle holds
together.
+ */
+ private static Closeable releasers(List<BundleHoldRelease> holdReleases)
+ {
+ final Closer closer = Closer.create();
+ holdReleases.forEach(holdRelease ->
closer.register(holdRelease.releaser()));
+ return closer;
+ }
+
+ /**
+ * One bundle's worth of async download work: the cache-layer {@code
bundleName} to mount, the {@link QueryableIndex}
+ * row selector whose {@code getColumnHolder} triggers the per-column
downloads, and the columns to pre-fetch.
+ */
+ private record DownloadBundle(String bundleName, QueryableIndex rowSelector,
Set<String> requiredColumns)
+ {
+ }
+
/**
* Owns a bundle hold's release for one {@link #makeCursorHolderAsync}
build, guarding the file mapper's
* evict-vs-mapFile contract: releasing the hold makes the bundle entry
eligible for eviction, which unmaps the
diff --git
a/processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorFactory.java
b/processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorFactory.java
index 1537454e9fc..4acd2d4e28a 100644
---
a/processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorFactory.java
+++
b/processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorFactory.java
@@ -163,11 +163,19 @@ public class QueryableIndexCursorFactory implements
CursorFactory
private CursorHolder makeClusteredCursorHolder(CursorBuildSpec spec)
{
- final ClusterGroupQueryPlan plan = Projections.planClusterGroupQuery(
- new ArrayList<>(index.getClusterGroupSchemas()),
- spec
+ return makeClusteredCursorHolder(
+ spec,
+ Projections.planClusterGroupQuery(new
ArrayList<>(index.getClusterGroupSchemas()), spec)
);
+ }
+ /**
+ * Build a clustered-base-table cursor holder from an already-computed
{@link ClusterGroupQueryPlan}. Exposed so the
+ * partial (on-demand) cursor factory can plan the cluster groups once — to
decide which group bundles to download —
+ * and reuse the same plan to build the holder, rather than re-running
{@link Projections#planClusterGroupQuery}.
+ */
+ public CursorHolder makeClusteredCursorHolder(CursorBuildSpec spec,
ClusterGroupQueryPlan plan)
+ {
if (plan.survivingGroups().isEmpty()) {
return EmptyCursorHolder.INSTANCE;
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/projections/Projections.java
b/processing/src/main/java/org/apache/druid/segment/projections/Projections.java
index 8f03f7d0685..40980d000bb 100644
---
a/processing/src/main/java/org/apache/druid/segment/projections/Projections.java
+++
b/processing/src/main/java/org/apache/druid/segment/projections/Projections.java
@@ -568,6 +568,16 @@ public class Projections
}
public static String getClusterGroupSegmentInternalFilePrefix(List<Integer>
clusteringValueIds)
+ {
+ return getClusterGroupBundleName(clusteringValueIds) + '/';
+ }
+
+ /**
+ * Bundle name for a cluster group's containers in the V10 file, matching
the tag {@code IndexMergerV10} applies at
+ * write time: {@code __base$<id0>_<id1>...<idK>}. This is the cluster
group's canonical identity; the per-group file
+ * prefix ({@link #getClusterGroupSegmentInternalFilePrefix}) is just this
name plus a trailing {@code '/'} separator.
+ */
+ public static String getClusterGroupBundleName(List<Integer>
clusteringValueIds)
{
if (clusteringValueIds == null || clusteringValueIds.isEmpty()) {
throw DruidException.defensive("clusteringValueIds must not be null or
empty");
@@ -579,7 +589,6 @@ public class Projections
}
sb.append(clusteringValueIds.get(i));
}
- sb.append('/');
return sb.toString();
}
diff --git
a/processing/src/test/java/org/apache/druid/segment/PartialQueryableIndexCursorFactoryClusteredProjectionTest.java
b/processing/src/test/java/org/apache/druid/segment/PartialQueryableIndexCursorFactoryClusteredProjectionTest.java
new file mode 100644
index 00000000000..ea30fd4e6b0
--- /dev/null
+++
b/processing/src/test/java/org/apache/druid/segment/PartialQueryableIndexCursorFactoryClusteredProjectionTest.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.MapBasedInputRow;
+import org.apache.druid.data.input.impl.AggregateProjectionSpec;
+import
org.apache.druid.data.input.impl.ClusteredValueGroupsBaseTableProjectionSpec;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.filter.EqualityFilter;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.file.CountingRangeReader;
+import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.projections.QueryableProjection;
+import
org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Partial (on-demand) download coverage for a segment that is <em>both</em>
clustered <em>and</em> carries an aggregate
+ * projection. The partial cursor factory matches the aggregate projection
first and, if nothing matches, dispatches to
+ * the clustered groups, so this verifies both paths download only their own
bundle(s): a projection-matching query
+ * fetches the {@code proj} bundle and none of the {@code __base$<ids>} group
bundles, while a non-matching query
+ * fetches only the surviving group bundle and not {@code proj}. With no
shared columns there is no {@code __base}
+ * bundle, so the projection is self-contained.
+ */
+class PartialQueryableIndexCursorFactoryClusteredProjectionTest extends
PartialQueryableIndexCursorFactoryTestBase
+{
+ private static final long T0 = DateTimes.of("2025-01-01").getMillis();
+ private static final String PROJECTION_BUNDLE = "proj";
+ // tenants sort to dictionary ids acme=0, globex=1 → group bundles __base$0
(2 rows) and __base$1 (1 row)
+ private static final String ACME_BUNDLE = "__base$0";
+ private static final String GLOBEX_BUNDLE = "__base$1";
+
+ @TempDir
+ static File sharedTempDir;
+
+ private static File segmentDir;
+
+ @BeforeAll
+ static void buildSegment()
+ {
+ final ClusteredValueGroupsBaseTableProjectionSpec clusterSpec =
+ ClusteredValueGroupsBaseTableProjectionSpec.builder()
+ .columns(
+ new StringDimensionSchema("tenant"),
+ new StringDimensionSchema("region"),
+ new LongDimensionSchema("x"),
+ new LongDimensionSchema("__time")
+ )
+ .clusteringColumns("tenant")
+ .build();
+ final AggregateProjectionSpec projectionSpec =
+ AggregateProjectionSpec.builder(PROJECTION_BUNDLE)
+ .groupingColumns(new
StringDimensionSchema("tenant"))
+ .aggregators(
+ new CountAggregatorFactory("cnt"),
+ new LongSumAggregatorFactory("sum_x", "x")
+ )
+ .build();
+ final IncrementalIndexSchema schema =
+ IncrementalIndexSchema.builder()
+ .withMinTimestamp(T0)
+ .withTimestampSpec(new TimestampSpec("ts",
"millis", null))
+ .withQueryGranularity(Granularities.NONE)
+
.withDimensionsSpec(clusterSpec.getDimensionsSpec())
+ .withRollup(false)
+ .withClusterSpec(clusterSpec)
+ .withProjections(List.of(projectionSpec))
+ .build();
+ final File tmpDir = new File(sharedTempDir, "build_" +
ThreadLocalRandom.current().nextInt());
+ segmentDir = IndexBuilder.create()
+ .useV10()
+ .tmpDir(tmpDir)
+
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
+ .schema(schema)
+
.indexSpec(IndexSpec.builder().withMetadataCompression(CompressionStrategy.NONE).build())
+ .rows(List.of(
+ row(T0 + 2, "globex", "eu-west-1", 5),
+ row(T0, "acme", "us-east-1", 10),
+ row(T0 + 1, "acme", "us-west-2", 20)
+ ))
+ .buildMMappedIndexFile();
+ }
+
+ @Test
+ void testProjectionMatchDownloadsOnlyProjectionBundle() throws IOException
+ {
+ final CountingRangeReader rangeReader = new
CountingRangeReader(segmentDir);
+ try (IndexAndMapper opened = openIndex(rangeReader, "combo_projection")) {
+ final PartialQueryableIndexCursorFactory factory =
factory(opened.index());
+ // group-by tenant + sum(x): matches the aggregate projection, so the
clustered path is never consulted.
+ final CursorBuildSpec aggSpec = CursorBuildSpec.builder()
+
.setGroupingColumns(List.of("tenant"))
+
.setAggregators(List.of(new LongSumAggregatorFactory("sum_x", "x")))
+
.setPhysicalColumns(Set.of("tenant", "x"))
+ .build();
+
+ // sanity: the index is both clustered and projection-bearing, and this
spec resolves to the projection
+ Assertions.assertNotNull(opened.index().getClusteredBaseSummary(),
"segment must be clustered");
+ final QueryableProjection<QueryableIndex> matched =
opened.index().getProjection(aggSpec);
+ Assertions.assertNotNull(matched, "spec must match the aggregate
projection");
+ Assertions.assertEquals(PROJECTION_BUNDLE, matched.getName());
+
+ try (AsyncCursorHolder asyncHolder =
factory.makeCursorHolderAsync(aggSpec);
+ CursorHolder holder = asyncHolder.release()) {
+ Assertions.assertNotNull(holder.asCursor(), "projection-matched cursor
must build");
+
+ final Set<String> downloaded = opened.mapper().getDownloadedFiles();
+ // The projection bundle's columns were materialized.
+ Assertions.assertTrue(
+ downloaded.stream().anyMatch(f -> f.startsWith(PROJECTION_BUNDLE +
"/")),
+ "projection bundle must be downloaded; got: " + downloaded
+ );
+ // None of the clustered group bundles were touched: the projection
match short-circuits clustered dispatch.
+ Assertions.assertTrue(
+ downloaded.stream().noneMatch(f -> f.startsWith(ACME_BUNDLE + "/")
|| f.startsWith(GLOBEX_BUNDLE + "/")),
+ "no cluster group bundle should be downloaded for a
projection-matched query; got: " + downloaded
+ );
+ }
+ }
+ }
+
+ @Test
+ void testNonMatchingQueryDispatchesToClusteredGroups() throws IOException
+ {
+ final CountingRangeReader rangeReader = new
CountingRangeReader(segmentDir);
+ try (IndexAndMapper opened = openIndex(rangeReader, "combo_clustered")) {
+ final PartialQueryableIndexCursorFactory factory =
factory(opened.index());
+ // a filter on the clustering column with no grouping/aggregators does
not match the aggregate projection, so the
+ // clustered path runs and prunes to the surviving group only.
+ final CursorBuildSpec spec = CursorBuildSpec.builder()
+ .setFilter(new
EqualityFilter("tenant", ColumnType.STRING, "acme", null))
+ .build();
+ Assertions.assertNull(opened.index().getProjection(spec), "filter-only
spec must not match the projection");
+
+ try (AsyncCursorHolder asyncHolder = factory.makeCursorHolderAsync(spec);
+ CursorHolder holder = asyncHolder.release()) {
+ Assertions.assertEquals(List.of("us-east-1", "us-west-2"),
scanRegion(holder));
+
+ final Set<String> downloaded = opened.mapper().getDownloadedFiles();
+ // Only the surviving (acme) group bundle materialized.
+ Assertions.assertTrue(downloaded.contains(ACME_BUNDLE + "/region"),
"got: " + downloaded);
+ Assertions.assertTrue(
+ downloaded.stream().noneMatch(f -> f.startsWith(GLOBEX_BUNDLE +
"/")),
+ "pruned globex group must not download; got: " + downloaded
+ );
+ // The projection bundle is irrelevant to this query and must not be
downloaded.
+ Assertions.assertTrue(
+ downloaded.stream().noneMatch(f -> f.startsWith(PROJECTION_BUNDLE
+ "/")),
+ "projection bundle must not download for a clustered-dispatch
query; got: " + downloaded
+ );
+ }
+ }
+ }
+
+ private PartialQueryableIndexCursorFactory factory(PartialQueryableIndex
index)
+ {
+ return new PartialQueryableIndexCursorFactory(
+ index,
+ QueryableIndexTimeBoundaryInspector.create(index),
+ noOpAcquirer(directExec())
+ );
+ }
+
+ private static List<String> scanRegion(CursorHolder holder)
+ {
+ final Cursor cursor = holder.asCursor();
+ final List<String> out = new ArrayList<>();
+ if (cursor == null) {
+ return out;
+ }
+ final DimensionSelector regionSel =
+
cursor.getColumnSelectorFactory().makeDimensionSelector(DefaultDimensionSpec.of("region"));
+ while (!cursor.isDone()) {
+ out.add(regionSel.getRow().size() == 0 ? null :
regionSel.lookupName(regionSel.getRow().get(0)));
+ cursor.advance();
+ }
+ return out;
+ }
+
+ private static InputRow row(long ts, String tenant, String region, long x)
+ {
+ final Map<String, Object> event = new HashMap<>();
+ event.put("ts", ts);
+ event.put("tenant", tenant);
+ event.put("region", region);
+ event.put("x", x);
+ return new MapBasedInputRow(ts, List.of("tenant", "region", "x"), event);
+ }
+}
diff --git
a/processing/src/test/java/org/apache/druid/segment/PartialQueryableIndexCursorFactoryClusteredTest.java
b/processing/src/test/java/org/apache/druid/segment/PartialQueryableIndexCursorFactoryClusteredTest.java
new file mode 100644
index 00000000000..f0c2693bb9b
--- /dev/null
+++
b/processing/src/test/java/org/apache/druid/segment/PartialQueryableIndexCursorFactoryClusteredTest.java
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.druid.common.asyncresource.AsyncResource;
+import org.apache.druid.common.asyncresource.AsyncResources;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.MapBasedInputRow;
+import
org.apache.druid.data.input.impl.ClusteredValueGroupsBaseTableProjectionSpec;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.filter.EqualityFilter;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.file.CountingRangeReader;
+import org.apache.druid.segment.file.PartialSegmentFileMapperV10;
+import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import
org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Partial (on-demand) download coverage for a clustered base-table V10
segment. Cluster groups are resolved from
+ * metadata only ({@code planClusterGroupQuery}); the async cursor factory
downloads one bundle ({@code __base$<ids>})
+ * per surviving group.
+ */
+class PartialQueryableIndexCursorFactoryClusteredTest extends
PartialQueryableIndexCursorFactoryTestBase
+{
+ private static final long T0 = DateTimes.of("2025-01-01").getMillis();
+
+ // tenants sort to dictionary ids acme=0, globex=1 → bundles __base$0 (2
rows) and __base$1 (1 row)
+ private static final String ACME_BUNDLE = "__base$0";
+ private static final String GLOBEX_BUNDLE = "__base$1";
+
+ @TempDir
+ static File sharedTempDir;
+
+ private static File segmentDir;
+
+ @BeforeAll
+ static void buildSegment()
+ {
+ final ClusteredValueGroupsBaseTableProjectionSpec clusterSpec =
+ ClusteredValueGroupsBaseTableProjectionSpec.builder()
+ .columns(
+ new StringDimensionSchema("tenant"),
+ new StringDimensionSchema("region"),
+ new LongDimensionSchema("__time")
+ )
+ .clusteringColumns("tenant")
+ .build();
+ final IncrementalIndexSchema schema =
+ IncrementalIndexSchema.builder()
+ .withMinTimestamp(T0)
+ .withTimestampSpec(new TimestampSpec("ts",
"millis", null))
+ .withQueryGranularity(Granularities.NONE)
+
.withDimensionsSpec(clusterSpec.getDimensionsSpec())
+ .withRollup(false)
+ .withClusterSpec(clusterSpec)
+ .build();
+ final File tmpDir = new File(sharedTempDir, "build_" +
ThreadLocalRandom.current().nextInt());
+ segmentDir = IndexBuilder.create()
+ .useV10()
+ .tmpDir(tmpDir)
+
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
+ .schema(schema)
+
.indexSpec(IndexSpec.builder().withMetadataCompression(CompressionStrategy.NONE).build())
+ // ingest out of clustering order; the writer
sorts groups by clustering value
+ .rows(List.of(
+ row(T0 + 2, "globex", "eu-west-1"),
+ row(T0, "acme", "us-east-1"),
+ row(T0 + 1, "acme", "us-west-2")
+ ))
+ .buildMMappedIndexFile();
+ }
+
+ @Test
+ void testFilterOnClusteringColumnDownloadsOnlySurvivingGroup() throws
IOException
+ {
+ final CountingRangeReader rangeReader = new
CountingRangeReader(segmentDir);
+ try (IndexAndMapper opened = openIndex(rangeReader, "filter_acme")) {
+ final PartialQueryableIndexCursorFactory factory =
factory(opened.index());
+ final CursorBuildSpec spec = CursorBuildSpec.builder()
+ .setFilter(new
EqualityFilter("tenant", ColumnType.STRING, "acme", null))
+ .build();
+
+ try (AsyncCursorHolder asyncHolder = factory.makeCursorHolderAsync(spec);
+ CursorHolder holder = asyncHolder.release()) {
+ // Only the acme group survives the clustering-column filter, so only
its rows are returned.
+ Assertions.assertEquals(
+ List.of(List.of("acme", "us-east-1"), List.of("acme",
"us-west-2")),
+ scanTenantRegion(holder)
+ );
+
+ final Set<String> downloaded = opened.mapper().getDownloadedFiles();
+ // The acme group's columns were materialized.
+ Assertions.assertTrue(downloaded.contains(ACME_BUNDLE + "/region"),
"got: " + downloaded);
+ Assertions.assertTrue(downloaded.contains(ACME_BUNDLE + "/__time"),
"got: " + downloaded);
+ // The globex group bundle was pruned by metadata-only resolution and
never touched.
+ Assertions.assertTrue(
+ downloaded.stream().noneMatch(f -> f.startsWith(GLOBEX_BUNDLE +
"/")),
+ "globex group must not be downloaded; got: " + downloaded
+ );
+ }
+ }
+ }
+
+ @Test
+ void testFilterMatchingNoGroupDownloadsNothing() throws IOException
+ {
+ final CountingRangeReader rangeReader = new
CountingRangeReader(segmentDir);
+ try (IndexAndMapper opened = openIndex(rangeReader, "filter_none")) {
+ final PartialQueryableIndexCursorFactory factory =
factory(opened.index());
+ final CursorBuildSpec spec = CursorBuildSpec.builder()
+ .setFilter(new
EqualityFilter("tenant", ColumnType.STRING, "nobody", null))
+ .build();
+
+ try (AsyncCursorHolder asyncHolder = factory.makeCursorHolderAsync(spec);
+ CursorHolder holder = asyncHolder.release()) {
+ Assertions.assertTrue(scanTenantRegion(holder).isEmpty(), "no group
should survive");
+ Assertions.assertTrue(
+ opened.mapper().getDownloadedFiles().isEmpty(),
+ "no group bundle should be downloaded when nothing survives; got:
" + opened.mapper().getDownloadedFiles()
+ );
+ }
+ }
+ }
+
+ @Test
+ void testFullScanConcatenatesAllGroups() throws IOException
+ {
+ final CountingRangeReader rangeReader = new
CountingRangeReader(segmentDir);
+ try (IndexAndMapper opened = openIndex(rangeReader, "full_scan")) {
+ final PartialQueryableIndexCursorFactory factory =
factory(opened.index());
+
+ try (AsyncCursorHolder asyncHolder =
factory.makeCursorHolderAsync(CursorBuildSpec.FULL_SCAN);
+ CursorHolder holder = asyncHolder.release()) {
+ // Groups are concatenated in clustering (dictionary id) order: acme
then globex, with the clustering column injected as a constant.
+ Assertions.assertEquals(
+ List.of(
+ List.of("acme", "us-east-1"),
+ List.of("acme", "us-west-2"),
+ List.of("globex", "eu-west-1")
+ ),
+ scanTenantRegion(holder)
+ );
+
+ final Set<String> downloaded = opened.mapper().getDownloadedFiles();
+ Assertions.assertTrue(downloaded.contains(ACME_BUNDLE + "/region"),
"got: " + downloaded);
+ Assertions.assertTrue(downloaded.contains(GLOBEX_BUNDLE + "/region"),
"got: " + downloaded);
+ }
+ }
+ }
+
+ @Test
+ void testSyncMakeCursorHolderAfterFullDownload() throws IOException
+ {
+ final CountingRangeReader rangeReader = new
CountingRangeReader(segmentDir);
+ try (IndexAndMapper opened = openIndex(rangeReader, "sync_full")) {
+ final PartialSegmentFileMapperV10 mapper = opened.mapper();
+ final PartialQueryableIndexCursorFactory factory =
factory(opened.index());
+
+ // Sync path refuses until everything is resident.
+ Assertions.assertThrows(
+ DruidException.class,
+ () -> factory.makeCursorHolder(CursorBuildSpec.FULL_SCAN)
+ );
+
+ // Eager-acquire equivalent: materialize every internal file, then the
sync clustered path works via the delegate.
+
mapper.ensureFilesAvailable(mapper.getSegmentFileMetadata().getFiles().keySet());
+ Assertions.assertTrue(mapper.isFullyDownloaded());
+ try (CursorHolder holder =
factory.makeCursorHolder(CursorBuildSpec.FULL_SCAN)) {
+ Assertions.assertEquals(
+ List.of(
+ List.of("acme", "us-east-1"),
+ List.of("acme", "us-west-2"),
+ List.of("globex", "eu-west-1")
+ ),
+ scanTenantRegion(holder)
+ );
+ }
+ }
+ }
+
+ @Test
+ void testAsyncMultiGroupDefersDownloadUntilExecutorRuns() throws Exception
+ {
+ final CountingRangeReader rangeReader = new
CountingRangeReader(segmentDir);
+ final CountDownLatch gate = new CountDownLatch(1);
+ final ExecutorService rawExec =
Execs.singleThreaded("partial-clustered-defer-%d");
+ final ListeningExecutorService gatedExec =
MoreExecutors.listeningDecorator(rawExec);
+ try (IndexAndMapper opened = openIndex(rangeReader, "async_defer")) {
+ final PartialQueryableIndexCursorFactory factory = new
PartialQueryableIndexCursorFactory(
+ opened.index(),
+ QueryableIndexTimeBoundaryInspector.create(opened.index()),
+ noOpAcquirer(gatedExec)
+ );
+ rangeReader.resetCount();
+
+ // Block the download executor so no group download can start yet.
+ @SuppressWarnings("unused")
+ ListenableFuture<?> unused = gatedExec.submit(() -> {
+ try {
+ gate.await();
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ });
+
+ // Both groups survive FULL_SCAN → both bundles must download before
the holder is ready.
+ final AsyncCursorHolder asyncHolder =
factory.makeCursorHolderAsync(CursorBuildSpec.FULL_SCAN);
+ Assertions.assertFalse(asyncHolder.isReady(), "must wait while the
download executor is blocked");
+ Assertions.assertEquals(0, rangeReader.getReadCount(), "no group
download should have started yet");
+
+ final CountDownLatch ready = new CountDownLatch(1);
+ asyncHolder.addReadyCallback(ready::countDown);
+ gate.countDown();
+ Assertions.assertTrue(ready.await(15, TimeUnit.SECONDS), "ready callback
must fire once downloads run");
+ Assertions.assertTrue(asyncHolder.isReady());
+
+ try (CursorHolder holder = asyncHolder.release()) {
+ Assertions.assertEquals(
+ List.of(
+ List.of("acme", "us-east-1"),
+ List.of("acme", "us-west-2"),
+ List.of("globex", "eu-west-1")
+ ),
+ scanTenantRegion(holder)
+ );
+ }
+ }
+ finally {
+ gatedExec.shutdownNow();
+ rawExec.shutdownNow();
+ }
+ }
+
+ @Test
+ void testMultiGroupSuccessCloseReleasesEveryBundleHold() throws IOException
+ {
+ final CountingRangeReader rangeReader = new
CountingRangeReader(segmentDir);
+ final CountingBundleAcquirer acquirer = new
CountingBundleAcquirer(directExec());
+ try (IndexAndMapper opened = openIndex(rangeReader, "release_success")) {
+ final PartialQueryableIndexCursorFactory factory = new
PartialQueryableIndexCursorFactory(
+ opened.index(),
+ QueryableIndexTimeBoundaryInspector.create(opened.index()),
+ acquirer
+ );
+
+ // Both groups survive FULL_SCAN → both bundles are acquired during the
build; with the direct executor the
+ // downloads run inline so the holder is immediately ready.
+ try (AsyncCursorHolder asyncHolder =
factory.makeCursorHolderAsync(CursorBuildSpec.FULL_SCAN)) {
+ Assertions.assertTrue(asyncHolder.isReady());
+ Assertions.assertEquals(1, acquirer.acquiredCount(ACME_BUNDLE), "acme
group bundle acquired");
+ Assertions.assertEquals(1, acquirer.acquiredCount(GLOBEX_BUNDLE),
"globex group bundle acquired");
+
+ final CursorHolder holder = asyncHolder.release();
+ // The produced holder owns every group's hold; nothing is released
until it closes.
+ Assertions.assertEquals(0, acquirer.releasedCount(ACME_BUNDLE));
+ Assertions.assertEquals(0, acquirer.releasedCount(GLOBEX_BUNDLE));
+
+ holder.close();
+ // Closing the holder fans out the release across all surviving
groups' bundle holds.
+ Assertions.assertEquals(1, acquirer.releasedCount(ACME_BUNDLE), "acme
hold released on holder close");
+ Assertions.assertEquals(1, acquirer.releasedCount(GLOBEX_BUNDLE),
"globex hold released on holder close");
+ }
+ }
+ }
+
+ @Test
+ void testMultiGroupCancelBeforeReadyReleasesEveryBundleHold() throws
Exception
+ {
+ final CountingRangeReader rangeReader = new
CountingRangeReader(segmentDir);
+ final CountDownLatch gate = new CountDownLatch(1);
+ final ExecutorService rawExec =
Execs.singleThreaded("partial-clustered-cancel-%d");
+ final ListeningExecutorService gatedExec =
MoreExecutors.listeningDecorator(rawExec);
+ final CountingBundleAcquirer acquirer = new
CountingBundleAcquirer(gatedExec);
+ try (IndexAndMapper opened = openIndex(rangeReader, "release_cancel")) {
+ final PartialQueryableIndexCursorFactory factory = new
PartialQueryableIndexCursorFactory(
+ opened.index(),
+ QueryableIndexTimeBoundaryInspector.create(opened.index()),
+ acquirer
+ );
+
+ // Block the download executor so the per-column download bodies stay
queued and the holder never becomes ready.
+ @SuppressWarnings("unused")
+ ListenableFuture<?> unused = gatedExec.submit(() -> {
+ try {
+ gate.await();
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ });
+
+ final AsyncCursorHolder asyncHolder =
factory.makeCursorHolderAsync(CursorBuildSpec.FULL_SCAN);
+ // Both surviving groups' bundles are acquired synchronously during the
build, before any download runs.
+ Assertions.assertEquals(1, acquirer.acquiredCount(ACME_BUNDLE), "acme
group bundle acquired");
+ Assertions.assertEquals(1, acquirer.acquiredCount(GLOBEX_BUNDLE),
"globex group bundle acquired");
+ Assertions.assertFalse(asyncHolder.isReady(), "must wait while the
download executor is blocked");
+ Assertions.assertEquals(0, acquirer.releasedCount(ACME_BUNDLE));
+ Assertions.assertEquals(0, acquirer.releasedCount(GLOBEX_BUNDLE));
+
+ // Cancel before the holder is ready (query cancel/timeout). No download
body has run, so the per-bundle
+ // handshake releases each hold immediately, and the canceler must fan
the release out across every group.
+ asyncHolder.close();
+ Assertions.assertEquals(1, acquirer.releasedCount(ACME_BUNDLE), "acme
hold released on cancel");
+ Assertions.assertEquals(1, acquirer.releasedCount(GLOBEX_BUNDLE),
"globex hold released on cancel");
+ }
+ finally {
+ gate.countDown();
+ gatedExec.shutdownNow();
+ rawExec.shutdownNow();
+ }
+ }
+
+ @Test
+ void testClusteredColumnCapabilitiesAreMetadataOnly() throws IOException
+ {
+ final CountingRangeReader rangeReader = new
CountingRangeReader(segmentDir);
+ try (IndexAndMapper opened = openIndex(rangeReader, "capabilities")) {
+ final PartialQueryableIndex index = opened.index();
+ // clustering column: type comes from the clustered summary's typed
signature
+ Assertions.assertEquals(ValueType.STRING,
index.getColumnCapabilities("tenant").getType());
+ // per-group data columns: type comes from the first group's
ColumnDescriptor, never a group sub-index download
+ Assertions.assertEquals(ValueType.STRING,
index.getColumnCapabilities("region").getType());
+ Assertions.assertEquals(ValueType.LONG,
index.getColumnCapabilities("__time").getType());
+ // an unknown column resolves to null without touching the file mapper
+ Assertions.assertNull(index.getColumnCapabilities("nonexistent"));
+
+ Assertions.assertTrue(
+ opened.mapper().getDownloadedFiles().isEmpty(),
+ "column capabilities must be answered from metadata with no
download; got: "
+ + opened.mapper().getDownloadedFiles()
+ );
+ }
+ }
+
+ private PartialQueryableIndexCursorFactory factory(PartialQueryableIndex
index)
+ {
+ return new PartialQueryableIndexCursorFactory(
+ index,
+ QueryableIndexTimeBoundaryInspector.create(index),
+ noOpAcquirer(directExec())
+ );
+ }
+
+ /**
+ * Walk (tenant, region) pairs out of a clustered cursor holder. {@code
tenant} is the clustering column, injected as
+ * a constant by the cluster-group cursor path; {@code region} is the
per-group physical column.
+ */
+ private static List<List<String>> scanTenantRegion(CursorHolder holder)
+ {
+ final Cursor cursor = holder.asCursor();
+ final List<List<String>> out = new ArrayList<>();
+ if (cursor == null) {
+ return out;
+ }
+ final DimensionSelector tenantSel =
+
cursor.getColumnSelectorFactory().makeDimensionSelector(DefaultDimensionSpec.of("tenant"));
+ final DimensionSelector regionSel =
+
cursor.getColumnSelectorFactory().makeDimensionSelector(DefaultDimensionSpec.of("region"));
+ while (!cursor.isDone()) {
+ final String tenant = tenantSel.getRow().size() == 0 ? null :
tenantSel.lookupName(tenantSel.getRow().get(0));
+ final String region = regionSel.getRow().size() == 0 ? null :
regionSel.lookupName(regionSel.getRow().get(0));
+ out.add(Arrays.asList(tenant, region));
+ cursor.advance();
+ }
+ return out;
+ }
+
+ private static InputRow row(long ts, String tenant, String region)
+ {
+ final Map<String, Object> event = new HashMap<>();
+ event.put("ts", ts);
+ event.put("tenant", tenant);
+ event.put("region", region);
+ return new MapBasedInputRow(ts, List.of("tenant", "region"), event);
+ }
+
+ /**
+ * A {@link PartialBundleAcquirer} that counts how many times each bundle is
acquired and released, so a test can
+ * assert the cursor factory holds (and ultimately releases) one hold per
surviving cluster group. Downloads are
+ * submitted to the supplied executor (direct for the success path, gated
for the cancel path).
+ */
+ private static final class CountingBundleAcquirer implements
PartialBundleAcquirer
+ {
+ private final ListeningExecutorService downloadExec;
+ private final Map<String, Integer> acquired = new ConcurrentHashMap<>();
+ private final Map<String, Integer> released = new ConcurrentHashMap<>();
+
+ private CountingBundleAcquirer(ListeningExecutorService downloadExec)
+ {
+ this.downloadExec = downloadExec;
+ }
+
+ @Override
+ public Closeable acquire(String bundleName)
+ {
+ acquired.merge(bundleName, 1, Integer::sum);
+ return () -> released.merge(bundleName, 1, Integer::sum);
+ }
+
+ @Override
+ public <T> AsyncResource<T> submitDownload(Callable<T> task)
+ {
+ return AsyncResources.fromFutureUnmanaged(downloadExec.submit(task));
+ }
+
+ private int acquiredCount(String bundleName)
+ {
+ return acquired.getOrDefault(bundleName, 0);
+ }
+
+ private int releasedCount(String bundleName)
+ {
+ return released.getOrDefault(bundleName, 0);
+ }
+ }
+}
diff --git
a/processing/src/test/java/org/apache/druid/segment/PartialQueryableIndexCursorFactoryTest.java
b/processing/src/test/java/org/apache/druid/segment/PartialQueryableIndexCursorFactoryTest.java
index 74c1e4d4aa1..1e2ef260f3f 100644
---
a/processing/src/test/java/org/apache/druid/segment/PartialQueryableIndexCursorFactoryTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/PartialQueryableIndexCursorFactoryTest.java
@@ -42,7 +42,6 @@ import org.apache.druid.query.Order;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.filter.EqualityFilter;
-import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
@@ -52,7 +51,6 @@ import
org.apache.druid.segment.file.PartialSegmentFileMapperV10;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.projections.Projections;
import
org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
-import org.apache.druid.testing.InitializedNullHandlingTest;
import org.joda.time.DateTime;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
@@ -78,9 +76,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
-class PartialQueryableIndexCursorFactoryTest extends
InitializedNullHandlingTest
+class PartialQueryableIndexCursorFactoryTest extends
PartialQueryableIndexCursorFactoryTestBase
{
- private static final ColumnConfig COLUMN_CONFIG = ColumnConfig.DEFAULT;
private static final DateTime TIME = DateTimes.of("2025-01-01");
private static final String PROJECTION_NAME = "dim1_metric1_sum";
@@ -112,9 +109,6 @@ class PartialQueryableIndexCursorFactoryTest extends
InitializedNullHandlingTest
private static File segmentDir;
private static ListeningExecutorService realExec;
- @TempDir
- File perTestTempDir;
-
@BeforeAll
static void buildSegment()
{
@@ -704,54 +698,4 @@ class PartialQueryableIndexCursorFactoryTest extends
InitializedNullHandlingTest
.rows(ROWS)
.buildMMappedIndexFile();
}
-
- private IndexAndMapper openIndex(CountingRangeReader rangeReader, String
cacheName) throws IOException
- {
- final File cacheDir = new File(perTestTempDir, cacheName);
- FileUtils.mkdirp(cacheDir);
- final PartialSegmentFileMapperV10 mapper =
PartialSegmentFileMapperV10.create(
- rangeReader,
- TestHelper.makeJsonMapper(),
- cacheDir,
- IndexIO.V10_FILE_NAME,
- Collections.emptyList()
- );
- return new IndexAndMapper(
- new PartialQueryableIndex(mapper.getSegmentFileMetadata(), mapper,
COLUMN_CONFIG),
- mapper
- );
- }
-
- private static ListeningExecutorService directExec()
- {
- return
MoreExecutors.listeningDecorator(MoreExecutors.newDirectExecutorService());
- }
-
- private static PartialBundleAcquirer noOpAcquirer(ListeningExecutorService
downloadExec)
- {
- return new PartialBundleAcquirer()
- {
- @Override
- public Closeable acquire(String bundleName)
- {
- return () -> {};
- }
-
- @Override
- public <T> AsyncResource<T> submitDownload(Callable<T> task)
- {
- return AsyncResources.fromFutureUnmanaged(downloadExec.submit(task));
- }
- };
- }
-
- private record IndexAndMapper(PartialQueryableIndex index,
PartialSegmentFileMapperV10 mapper)
- implements AutoCloseable
- {
- @Override
- public void close()
- {
- mapper.close();
- }
- }
}
diff --git
a/processing/src/test/java/org/apache/druid/segment/PartialQueryableIndexCursorFactoryTestBase.java
b/processing/src/test/java/org/apache/druid/segment/PartialQueryableIndexCursorFactoryTestBase.java
new file mode 100644
index 00000000000..0b8ccf3b11a
--- /dev/null
+++
b/processing/src/test/java/org/apache/druid/segment/PartialQueryableIndexCursorFactoryTestBase.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.druid.common.asyncresource.AsyncResource;
+import org.apache.druid.common.asyncresource.AsyncResources;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.segment.column.ColumnConfig;
+import org.apache.druid.segment.file.CountingRangeReader;
+import org.apache.druid.segment.file.PartialSegmentFileMapperV10;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.Callable;
+
+/**
+ * Shared harness for {@link PartialQueryableIndexCursorFactory} tests:
building the segment is left to each subclass
+ * (a plain aggregate-projection segment vs. a clustered base table), but
opening a partial index over it from a
+ * {@link CountingRangeReader} and wiring the on-demand download executor are
identical and live here.
+ */
+abstract class PartialQueryableIndexCursorFactoryTestBase extends
InitializedNullHandlingTest
+{
+ protected static final ColumnConfig COLUMN_CONFIG = ColumnConfig.DEFAULT;
+
+ @TempDir
+ protected File perTestTempDir;
+
+ /**
+ * Mount a fresh partial index over the (already built) segment via a {@link
PartialSegmentFileMapperV10} backed by
+ * {@code rangeReader}, into a per-test cache subdirectory named {@code
cacheName}. Only the V10 header is read up
+ * front; no internal column files are downloaded until a cursor requires
them.
+ */
+ protected IndexAndMapper openIndex(CountingRangeReader rangeReader, String
cacheName) throws IOException
+ {
+ final File cacheDir = new File(perTestTempDir, cacheName);
+ FileUtils.mkdirp(cacheDir);
+ final PartialSegmentFileMapperV10 mapper =
PartialSegmentFileMapperV10.create(
+ rangeReader,
+ TestHelper.makeJsonMapper(),
+ cacheDir,
+ IndexIO.V10_FILE_NAME,
+ Collections.emptyList()
+ );
+ return new IndexAndMapper(
+ new PartialQueryableIndex(mapper.getSegmentFileMetadata(), mapper,
COLUMN_CONFIG),
+ mapper
+ );
+ }
+
+ protected static ListeningExecutorService directExec()
+ {
+ return
MoreExecutors.listeningDecorator(MoreExecutors.newDirectExecutorService());
+ }
+
+ /**
+ * A {@link PartialBundleAcquirer} whose holds are no-ops, submitting
downloads to {@code downloadExec}. Used by tests
+ * that don't care about the cache-layer hold lifecycle.
+ */
+ protected static PartialBundleAcquirer noOpAcquirer(ListeningExecutorService
downloadExec)
+ {
+ return new PartialBundleAcquirer()
+ {
+ @Override
+ public Closeable acquire(String bundleName)
+ {
+ return () -> {};
+ }
+
+ @Override
+ public <T> AsyncResource<T> submitDownload(Callable<T> task)
+ {
+ return AsyncResources.fromFutureUnmanaged(downloadExec.submit(task));
+ }
+ };
+ }
+
+ protected record IndexAndMapper(PartialQueryableIndex index,
PartialSegmentFileMapperV10 mapper)
+ implements AutoCloseable
+ {
+ @Override
+ public void close()
+ {
+ mapper.close();
+ }
+ }
+}
diff --git
a/server/src/main/java/org/apache/druid/segment/loading/PartialSegmentMetadataCacheEntry.java
b/server/src/main/java/org/apache/druid/segment/loading/PartialSegmentMetadataCacheEntry.java
index 244dd680fd3..e5f8aa55fab 100644
---
a/server/src/main/java/org/apache/druid/segment/loading/PartialSegmentMetadataCacheEntry.java
+++
b/server/src/main/java/org/apache/druid/segment/loading/PartialSegmentMetadataCacheEntry.java
@@ -884,21 +884,21 @@ public class PartialSegmentMetadataCacheEntry implements
SegmentCacheEntry, Resi
}
/**
- * Structural inference of the parent bundles that the given {@code
bundleName} depends on within this segment.
- * Single source of truth for both bootstrap (which post-filters by what's
actually restorable on disk) and the
- * query-time acquire path (which uses the result directly to seed
- * {@link PartialSegmentBundleCacheEntry#forBundle}'s {@code
parentEntryIds}).
+ * Inference of the parent bundles that the given {@code bundleName} depends
on within this segment.
* <p>
- * Today's rule is structural and trivial: any non-base bundle depends on
the base bundle. The base bundle and the
- * {@link SegmentFileBuilder#ROOT_BUNDLE_NAME root bundle} have no parents,
the root bundle owns everything written
- * without an explicit {@code startFileBundle} call (older fileGroup-less
segments, or shared internal metadata) and
- * is structurally a peer of the base. If future writers introduce richer
dependency graphs, the rule will need to
- * grow, likely by reading dependency metadata that the writer records
explicitly rather than by inference here.
+ * The rule is uniform: the base bundle and the {@link
SegmentFileBuilder#ROOT_BUNDLE_NAME root bundle} have no
+ * parents (the root bundle owns everything written without an explicit
{@code startFileBundle} call, e.g. for older
+ * fileGroup-less segments or any future shared internal metadata, and is
structurally a peer of the base);
+ * every other bundle depends on the base bundle, but only if this segment
actually carries one.
+ * <p>
+ * If future writers introduce richer dependency graphs, the rule will need
to grow, likely by reading dependency
+ * metadata the writer records explicitly.
*/
public List<PartialSegmentBundleCacheEntryIdentifier>
inferParentBundles(String bundleName)
{
if (Projections.BASE_TABLE_PROJECTION_NAME.equals(bundleName)
- || SegmentFileBuilder.ROOT_BUNDLE_NAME.equals(bundleName)) {
+ || SegmentFileBuilder.ROOT_BUNDLE_NAME.equals(bundleName)
+ || !hasBaseBundle()) {
return List.of();
}
return List.of(
@@ -909,6 +909,17 @@ public class PartialSegmentMetadataCacheEntry implements
SegmentCacheEntry, Resi
);
}
+ /**
+ * Whether this segment carries a {@code __base} bundle (shared base-table
column data). Probed from the mounted file
+ * mapper's actual bundle set; returns false when the entry is not mounted.
+ */
+ private boolean hasBaseBundle()
+ {
+ final PartialSegmentFileMapperV10 mapper = getFileMapper();
+ return mapper != null
+ &&
PartialSegmentBundleCacheEntry.bundleNames(mapper).contains(Projections.BASE_TABLE_PROJECTION_NAME);
+ }
+
/**
* Register a bundle entry as a current dependent of this metadata entry.
Called by
* {@link PartialSegmentBundleCacheEntry} after a successful mount; the drop
path uses {@link #snapshotLinkedBundles}
diff --git
a/server/src/test/java/org/apache/druid/segment/loading/PartialSegmentMetadataCacheEntryTest.java
b/server/src/test/java/org/apache/druid/segment/loading/PartialSegmentMetadataCacheEntryTest.java
index 620234e562b..f09ca7ba1be 100644
---
a/server/src/test/java/org/apache/druid/segment/loading/PartialSegmentMetadataCacheEntryTest.java
+++
b/server/src/test/java/org/apache/druid/segment/loading/PartialSegmentMetadataCacheEntryTest.java
@@ -32,6 +32,7 @@ import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.file.CountingRangeReader;
import org.apache.druid.segment.file.PartialSegmentFileMapperV10;
+import org.apache.druid.segment.file.SegmentFileBuilder;
import org.apache.druid.segment.file.SegmentFileBuilderV10;
import org.apache.druid.segment.projections.Projections;
import org.apache.druid.timeline.SegmentId;
@@ -62,6 +63,7 @@ class PartialSegmentMetadataCacheEntryTest
private File segmentFile;
private File cacheDir;
+ private int fixtureSeq;
@BeforeEach
void setup() throws IOException
@@ -393,16 +395,51 @@ class PartialSegmentMetadataCacheEntryTest
}
@Test
- void testInferParentBundlesForAggregateReturnsBase()
+ void testInferParentBundlesForRootReturnsEmpty()
{
final PartialSegmentMetadataCacheEntry entry = newEntry(ESTIMATE);
- final List<PartialSegmentBundleCacheEntryIdentifier> parents =
entry.inferParentBundles("some_aggregate_projection");
- Assertions.assertEquals(1, parents.size());
- Assertions.assertEquals(SEGMENT_ID, parents.getFirst().segmentId());
Assertions.assertEquals(
- Projections.BASE_TABLE_PROJECTION_NAME,
- parents.getFirst().bundleName()
+ List.of(),
+ entry.inferParentBundles(SegmentFileBuilder.ROOT_BUNDLE_NAME)
+ );
+ }
+
+ @Test
+ void testInferParentBundlesDependsOnBaseWhenBaseBundlePresent() throws
IOException
+ {
+ // A segment that carries a __base bundle (the non-clustered
base+projection shape): every non-base/root bundle
+ // depends on it. Asserted uniformly for an aggregate-projection bundle
and for a cluster-group bundle name (the
+ // latter standing in for the future clustered+shared-columns layout,
where groups will share __base).
+ final PartialSegmentMetadataCacheEntry entry = mountedEntryOver(
+ buildSegmentWithBundles(Projections.BASE_TABLE_PROJECTION_NAME,
"some_projection")
);
+ for (String dependent : List.of("some_projection",
Projections.getClusterGroupBundleName(List.of(0, 1)))) {
+ final List<PartialSegmentBundleCacheEntryIdentifier> parents =
entry.inferParentBundles(dependent);
+ Assertions.assertEquals(1, parents.size(), "expected a __base parent for
bundle[" + dependent + "]");
+ Assertions.assertEquals(SEGMENT_ID, parents.getFirst().segmentId());
+ Assertions.assertEquals(Projections.BASE_TABLE_PROJECTION_NAME,
parents.getFirst().bundleName());
+ }
+ }
+
+ @Test
+ void testInferParentBundlesEmptyWhenSegmentHasNoBaseBundle() throws
IOException
+ {
+ // A clustered + aggregate-projection segment with no shared columns has
no __base bundle: the base data lives in
+ // per-group __base$<ids> bundles and the aggregate projection is
self-contained. So neither a cluster group nor
+ // the aggregate projection has a parent to depend on.
(Pre-shared-columns; the old "aggregate always depends on
+ // __base" rule would have wrongly tried to mount a nonexistent __base for
the projection bundle.)
+ final PartialSegmentMetadataCacheEntry entry = mountedEntryOver(
+ buildSegmentWithBundles(
+ Projections.getClusterGroupBundleName(List.of(0)),
+ Projections.getClusterGroupBundleName(List.of(1)),
+ "some_projection"
+ )
+ );
+ Assertions.assertEquals(
+ List.of(),
+
entry.inferParentBundles(Projections.getClusterGroupBundleName(List.of(0)))
+ );
+ Assertions.assertEquals(List.of(),
entry.inferParentBundles("some_projection"));
}
private PartialSegmentMetadataCacheEntry newEntry(long estimate)
@@ -433,4 +470,49 @@ class PartialSegmentMetadataCacheEntryTest
return new File(baseDir, IndexIO.V10_FILE_NAME);
}
+ /**
+ * Build a V10 segment whose containers are tagged with exactly the given
bundle names (one column file per bundle),
+ * so {@link PartialSegmentMetadataCacheEntry#inferParentBundles} can be
exercised against a known bundle set without
+ * a full ingestion. Returns the deep-storage directory containing the V10
file.
+ */
+ private File buildSegmentWithBundles(String... bundleNames) throws
IOException
+ {
+ final int seq = fixtureSeq++;
+ final File baseDir = new File(tempDir, "deep_" + seq);
+ FileUtils.mkdirp(baseDir);
+ try (SegmentFileBuilderV10 builder =
SegmentFileBuilderV10.create(JSON_MAPPER, baseDir, CompressionStrategy.NONE)) {
+ for (int i = 0; i < bundleNames.length; ++i) {
+ builder.startFileBundle(bundleNames[i]);
+ final File tmpFile = new File(tempDir,
StringUtils.format("fixture-%d-%d.bin", seq, i));
+ Files.write(Ints.toByteArray(i), tmpFile);
+ builder.add(bundleNames[i] + "/col", tmpFile);
+ }
+ }
+ return baseDir;
+ }
+
+ /**
+ * Reserve and mount a fresh metadata entry over the segment in {@code
deepStorageDir}, into a per-call cache
+ * directory. The mounted entry's file mapper is what {@code
inferParentBundles} probes to determine whether a base bundle exists.
+ */
+ private PartialSegmentMetadataCacheEntry mountedEntryOver(File
deepStorageDir) throws IOException
+ {
+ final File cache = new File(tempDir, "cache_" + (fixtureSeq++));
+ FileUtils.mkdirp(cache);
+ final StorageLocation location = new StorageLocation(cache, ESTIMATE * 4,
null);
+ final PartialSegmentMetadataCacheEntry entry = new
PartialSegmentMetadataCacheEntry(
+ SEGMENT_ID,
+ cache,
+ IndexIO.V10_FILE_NAME,
+ List.of(),
+ new DirectoryBackedRangeReader(deepStorageDir),
+ JSON_MAPPER,
+ null,
+ ESTIMATE
+ );
+ Assertions.assertTrue(location.reserve(entry));
+ entry.mount(location);
+ return entry;
+ }
+
}
diff --git
a/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerPartialAcquireTest.java
b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerPartialAcquireTest.java
index d8d37381381..61993321047 100644
---
a/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerPartialAcquireTest.java
+++
b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerPartialAcquireTest.java
@@ -24,15 +24,19 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.ListBasedInputRow;
+import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.AggregateProjectionSpec;
+import
org.apache.druid.data.input.impl.ClusteredValueGroupsBaseTableProjectionSpec;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.guice.LocalDataStorageDruidModule;
import org.apache.druid.jackson.SegmentizerModule;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
@@ -75,6 +79,7 @@ import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -113,10 +118,17 @@ class SegmentLocalCacheManagerPartialAcquireTest
new ListBasedInputRow(ROW_SIGNATURE, TIME.plusMinutes(3),
ROW_SIGNATURE.getColumnNames(), Arrays.asList("b", 4L))
);
+ // A second segment that is both clustered and carries an aggregate
projection: no shared columns, so it has
+ // per-group __base$<ids> bundles + a self-contained "proj" bundle, but no
__base bundle.
+ private static final SegmentId CLUSTERED_SEGMENT_ID =
+ SegmentId.of("test_clustered", Intervals.of("2025/2026"), "v1", 0);
+ private static final String CLUSTERED_PROJECTION_BUNDLE = "proj";
+
@TempDir
static File SHARED_TEMP_DIR;
private static File DEEP_STORAGE_DIR;
+ private static File CLUSTERED_DEEP_STORAGE_DIR;
@TempDir
File perTestTempDir;
@@ -156,9 +168,70 @@ class SegmentLocalCacheManagerPartialAcquireTest
.build())
.rows(ROWS)
.buildMMappedIndexFile();
+ CLUSTERED_DEEP_STORAGE_DIR = buildClusteredProjectionSegment();
EmittingLogger.registerEmitter(new NoopServiceEmitter());
}
+ /**
+ * Build a clustered base-table segment that also carries an aggregate
projection (group-by {@code tenant} with
+ * {@code sum(x)}). With no shared columns the layout is per-group {@code
__base$<ids>} bundles + a self-contained
+ * {@code proj} bundle and no {@code __base} bundle.
+ */
+ private static File buildClusteredProjectionSegment()
+ {
+ final ClusteredValueGroupsBaseTableProjectionSpec clusterSpec =
+ ClusteredValueGroupsBaseTableProjectionSpec.builder()
+ .columns(
+ new StringDimensionSchema("tenant"),
+ new StringDimensionSchema("region"),
+ new LongDimensionSchema("x"),
+ new LongDimensionSchema("__time")
+ )
+ .clusteringColumns("tenant")
+ .build();
+ final AggregateProjectionSpec projectionSpec =
+ AggregateProjectionSpec.builder(CLUSTERED_PROJECTION_BUNDLE)
+ .groupingColumns(new
StringDimensionSchema("tenant"))
+ .aggregators(
+ new CountAggregatorFactory("cnt"),
+ new LongSumAggregatorFactory("sum_x", "x")
+ )
+ .build();
+ final File tmp = new File(SHARED_TEMP_DIR, "build_clustered_" +
ThreadLocalRandom.current().nextInt());
+ return IndexBuilder.create()
+ .useV10()
+ .tmpDir(tmp)
+
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
+ .schema(
+ IncrementalIndexSchema.builder()
+
.withMinTimestamp(TIME.getMillis())
+ .withTimestampSpec(new
TimestampSpec("ts", "millis", null))
+
.withQueryGranularity(Granularities.NONE)
+
.withDimensionsSpec(clusterSpec.getDimensionsSpec())
+ .withRollup(false)
+ .withClusterSpec(clusterSpec)
+
.withProjections(List.of(projectionSpec))
+ .build()
+ )
+
.indexSpec(IndexSpec.builder().withMetadataCompression(CompressionStrategy.NONE).build())
+ .rows(List.of(
+ clusteredRow(TIME.getMillis() + 2, "globex",
"eu-west-1", 5),
+ clusteredRow(TIME.getMillis(), "acme", "us-east-1",
10),
+ clusteredRow(TIME.getMillis() + 1, "acme",
"us-west-2", 20)
+ ))
+ .buildMMappedIndexFile();
+ }
+
+ private static InputRow clusteredRow(long ts, String tenant, String region,
long x)
+ {
+ final Map<String, Object> event = new HashMap<>();
+ event.put("ts", ts);
+ event.put("tenant", tenant);
+ event.put("region", region);
+ event.put("x", x);
+ return new MapBasedInputRow(ts, List.of("tenant", "region", "x"), event);
+ }
+
@BeforeEach
void setup() throws IOException
{
@@ -206,10 +279,7 @@ class SegmentLocalCacheManagerPartialAcquireTest
};
// DataSegment with a LocalLoadSpec pointing at the deep storage directory
(unzipped V10 layout).
- partialSegment = DataSegment.builder()
- .dataSource(SEGMENT_ID.getDataSource())
- .interval(SEGMENT_ID.getInterval())
- .version(SEGMENT_ID.getVersion())
+ partialSegment = DataSegment.builder(SEGMENT_ID)
.shardSpec(NoneShardSpec.instance())
.loadSpec(Map.of("type", "local", "path",
DEEP_STORAGE_DIR.getAbsolutePath()))
.size(0)
@@ -405,6 +475,59 @@ class SegmentLocalCacheManagerPartialAcquireTest
);
}
+ @Test
+ void
testPartialAcquireClusteredWithProjectionMountsProjectionBundleWithoutBase()
+ throws ExecutionException, InterruptedException, IOException
+ {
+ final DataSegment clusteredSegment =
+ DataSegment.builder(CLUSTERED_SEGMENT_ID)
+ .shardSpec(NoneShardSpec.instance())
+ .loadSpec(Map.of("type", "local", "path",
CLUSTERED_DEEP_STORAGE_DIR.getAbsolutePath()))
+ .size(0)
+ .build();
+ try (AcquireSegmentAction action =
manager.acquireSegment(clusteredSegment, AcquireMode.PARTIAL)) {
+ final AcquireSegmentResult result = action.getSegmentFuture().get();
+ try (Segment segment =
result.getReferenceProvider().acquireReference().orElseThrow()) {
+ Assertions.assertEquals(CLUSTERED_SEGMENT_ID, segment.getId());
+
+ // group-by tenant + sum(x) matches the aggregate projection. Building
this cursor drives the 'proj' bundle to
+ // mount through the real acquire path. inferParentBundles must return
no parent for it (the clustered segment
+ // has no __base bundle); the old "aggregate always depends on __base"
rule would have tried to mount a
+ // nonexistent __base here and failed.
+ final CursorBuildSpec aggSpec = CursorBuildSpec.builder()
+
.setGroupingColumns(List.of("tenant"))
+
.setAggregators(List.of(new LongSumAggregatorFactory("sum_x", "x")))
+
.setPhysicalColumns(Set.of("tenant", "x"))
+ .build();
+ try (var asyncHolder =
segment.as(CursorFactory.class).makeCursorHolderAsync(aggSpec)) {
+ final CountDownLatch ready = new CountDownLatch(1);
+ asyncHolder.addReadyCallback(ready::countDown);
+ Assertions.assertTrue(ready.await(15, TimeUnit.SECONDS));
+ try (CursorHolder cursorHolder = asyncHolder.release()) {
+ Assertions.assertNotNull(cursorHolder.asCursor(),
"projection-matched cursor must build over the combo segment");
+ }
+ }
+ }
+
+ final StorageLocation loc = manager.getLocations().get(0);
+ // the projection bundle mounted through the real acquire path...
+ Assertions.assertTrue(
+ loc.isWeakReserved(new
PartialSegmentBundleCacheEntryIdentifier(CLUSTERED_SEGMENT_ID,
CLUSTERED_PROJECTION_BUNDLE)),
+ "projection bundle must be registered after the projection-matched
cursor build"
+ );
+ // ...and NO phantom __base bundle was created: a clustered segment with
no shared columns has no base bundle.
+ Assertions.assertFalse(
+ loc.isWeakReserved(
+ new
PartialSegmentBundleCacheEntryIdentifier(CLUSTERED_SEGMENT_ID,
Projections.BASE_TABLE_PROJECTION_NAME)
+ ),
+ "no __base bundle should be mounted for a clustered segment with no
shared columns"
+ );
+ }
+ finally {
+ manager.drop(clusteredSegment);
+ }
+ }
+
@Test
void testAcquireSegmentForcesFullDownloadOnPartialEligible()
throws ExecutionException, InterruptedException, IOException
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]