This is an automated email from the ASF dual-hosted git repository.
clintropolis pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new e02b9af07d7 feat: support aggregate projections on clustered segments
(#19599)
e02b9af07d7 is described below
commit e02b9af07d79e8295217cf8f55c5fa168a5d5fac
Author: Clint Wylie <[email protected]>
AuthorDate: Mon Jun 22 12:08:06 2026 -0700
feat: support aggregate projections on clustered segments (#19599)
changes:
* allow `AggregateProjectionSpec` on a clustered base table; remove the
build and merge time guards that rejected it
* persist projections in `IndexMergerV10.makeClusteredIndexFiles` via the
shared `makeProjections(...)`
* fix a bug where grouping on a non-clustering dictionary-encoded column over
the clustered base table conflated distinct values: dictionary IDs are local to
each cluster group, but were reused across the `ConcatenatingCursor`, so
different values sharing an ID were grouped together. The fix forces
value-based grouping by reporting non-clustering columns as
non-dictionary-encoded, both in the column capabilities and on the selector
(cardinality/name-lookup/id-lookup)
* tests: add build and query tests for projections on both incremental and
persisted segments, plus the first E2E coverage for clustered segments
(`ClusteredSegmentProjectionQueryTest`: native ingestion and
projection-vs-noProjections queries)
* remove the now-unused dimension vector selectors from
`ClusteringVectorColumnSelectorFactory`
---
.../query/ClusteredSegmentProjectionQueryTest.java | 294 +++++++++++++++++++++
.../org/apache/druid/segment/IndexMergerV10.java | 35 ++-
.../incremental/OnheapIncrementalIndex.java | 6 -
.../ClusteringColumnSelectorFactory.java | 29 +-
.../ClusteringVectorColumnSelectorFactory.java | 265 ++-----------------
.../druid/segment/IndexMergerV10ClusteredTest.java | 260 ++++++++++++++++++
.../IncrementalIndexClusteredProjectionTest.java | 211 +++++++++++++++
.../ClusteringVectorColumnSelectorFactoryTest.java | 108 +++++---
8 files changed, 910 insertions(+), 298 deletions(-)
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/ClusteredSegmentProjectionQueryTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/ClusteredSegmentProjectionQueryTest.java
new file mode 100644
index 00000000000..4bc31b452f9
--- /dev/null
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/ClusteredSegmentProjectionQueryTest.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.query;
+
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.data.input.impl.AggregateProjectionSpec;
+import
org.apache.druid.data.input.impl.ClusteredValueGroupsBaseTableProjectionSpec;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexer.granularity.SegmentGranularitySpec;
+import org.apache.druid.indexing.common.task.TaskBuilder;
+import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.query.QueryContexts;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.query.http.ClientSqlQuery;
+import org.apache.druid.server.metrics.LatchableEmitter;
+import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedCoordinator;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedHistorical;
+import org.apache.druid.testing.embedded.EmbeddedIndexer;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.EmbeddedRouter;
+import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * End-to-end coverage for native (index_parallel) ingestion of a CLUSTERED
base-table segment that also carries an
+ * aggregate projection, and for querying it both through the projection and
directly against the clustered base table.
+ * <p>
+ * This exercises:
+ * <ul>
+ * <li>writing a clustered base-table V10 segment via {@code
index_parallel}</li>
+ * <li>an aggregate projection built on top of the clustered base table</li>
+ * <li>the projection being chosen to satisfy a matching aggregation
(asserted via the {@code projection} dimension
+ * on the historical's {@code query/segment/time} metric)</li>
+ * <li>forcing direct base-table access with {@code noProjections=true} and
proving identical results</li>
+ * <li>queries that the projection cannot satisfy (group-by / filter on the
clustering column), which exercise the
+ * per-cluster-group cursor path on the clustered base table</li>
+ * </ul>
+ */
+class ClusteredSegmentProjectionQueryTest extends EmbeddedClusterTestBase
+{
+ private static final String PROJECTION_NAME = "country_delta";
+ private static final String SEGMENT_TIME_METRIC = "query/segment/time";
+
+ private final EmbeddedBroker broker = new EmbeddedBroker();
+ private final EmbeddedIndexer indexer = new EmbeddedIndexer();
+ private final EmbeddedOverlord overlord = new EmbeddedOverlord();
+ private final EmbeddedHistorical historical = new EmbeddedHistorical();
+ private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator();
+ private final EmbeddedRouter router = new EmbeddedRouter();
+
+ @Override
+ public EmbeddedDruidCluster createCluster()
+ {
+ historical.setServerMemory(300_000_000);
+
+ broker.setServerMemory(200_000_000)
+ .addProperty("druid.sql.planner.enableSysQueriesTable", "true");
+
+ coordinator.addProperty("druid.manager.segments.useIncrementalCache",
"always");
+
+ overlord.addProperty("druid.manager.segments.useIncrementalCache",
"always")
+ .addProperty("druid.manager.segments.pollDuration", "PT0.1s");
+
+ indexer.setServerMemory(300_000_000)
+ .addProperty("druid.worker.capacity", "2")
+ .addProperty("druid.processing.numThreads", "2")
+ .addProperty("druid.segment.handoff.pollDuration", "PT0.1s");
+
+ return EmbeddedDruidCluster
+ .withEmbeddedDerbyAndZookeeper()
+ .useLatchableEmitter()
+ .useDefaultTimeoutForLatchableEmitter(60)
+ // Clustered base-table segments can only be written in the V10
segment format.
+ .addCommonProperty("druid.indexer.task.buildV10", "true")
+ .addCommonProperty("druid.monitoring.emissionPeriod", "PT1s")
+ .addServer(coordinator)
+ .addServer(overlord)
+ .addServer(indexer)
+ .addServer(historical)
+ .addServer(broker)
+ .addServer(router);
+ }
+
+ @BeforeAll
+ void loadData() throws IOException
+ {
+ dataSource = "clustered-" + IdUtils.getRandomId();
+ ingestClusteredSegmentWithProjection();
+ }
+
+ @Override
+ protected void refreshDatasourceName()
+ {
+ // don't change the datasource name for each run because we set things up
before all tests
+ }
+
+ @Test
+ void testProjectionServesAggregationQuery()
+ {
+ // The aggregation matches the country_delta projection (group by
countryName, sum delta).
+ final String sql = "SELECT \"countryName\", SUM(\"delta\") FROM \"%s\"
GROUP BY 1 ORDER BY 1";
+
+ final LatchableEmitter emitter = historical.latchableEmitter();
+ emitter.flush();
+
+ cluster.callApi().verifySqlQuery(sql, dataSource, "CA,3\nFR,7\nUS,17");
+
+ // When the projection is used, the segment-scan query metrics carry the
projection name as a dimension.
+ emitter.waitForEvent(
+ event -> event.hasMetricName(SEGMENT_TIME_METRIC)
+ .hasDimension("projection", PROJECTION_NAME)
+ );
+ }
+
+ /**
+ * Forces the clustered base table to serve the {@code GROUP BY countryName,
SUM(delta)} aggregation directly
+ * (bypassing the projection via {@code noProjections=true}) and asserts the
results match the projection-served
+ * results exactly.
+ * <p>
+ * Regression test for a clustered read-path bug: {@code countryName} is a
non-clustering column whose
+ * per-cluster-group dictionaries assign different values the same local ID
(group {@code #en}: {@code {US, CA}},
+ * group {@code #fr}: {@code {FR, US}}). A dictionary-id-keyed group-by over
the concatenated per-group cursors (see
+ * {@link org.apache.druid.segment.ConcatenatingCursor}) would conflate
those, returning {@code FR,10 / US,17}. The
+ * fix makes the concatenating cursor report non-clustering columns as
non-dictionary-encoded (see
+ * {@link
org.apache.druid.segment.projections.ClusteringColumnSelectorFactory#getColumnCapabilities}),
forcing
+ * value-based grouping, so the correct {@code CA,3 / FR,7 / US,17} is
returned.
+ */
+ @Test
+ void testNoProjectionsServesFromClusteredBaseTable()
+ {
+ final String sql = "SELECT \"countryName\", SUM(\"delta\") FROM \"" +
dataSource + "\" GROUP BY 1 ORDER BY 1";
+
+ final LatchableEmitter emitter = historical.latchableEmitter();
+ emitter.flush();
+
+ // Force the clustered base table to serve the query directly, bypassing
the projection.
+ final String result = cluster.callApi().onAnyBroker(
+ b -> b.submitSqlQuery(
+ new ClientSqlQuery(
+ sql,
+ "CSV",
+ false,
+ false,
+ false,
+ Map.of(QueryContexts.NO_PROJECTIONS, true),
+ null
+ )
+ )
+ ).trim();
+
+ // Identical results, proving the clustered base table serves them.
+ Assertions.assertEquals("CA,3\nFR,7\nUS,17", result);
+
+ // Confirm a segment scan actually happened, and that no segment scan
reported the projection dimension for this run.
+ emitter.waitForEvent(event -> event.hasMetricName(SEGMENT_TIME_METRIC));
+ for (ServiceMetricEvent event :
emitter.getMetricEvents(SEGMENT_TIME_METRIC)) {
+ Assertions.assertNull(
+ event.getUserDims().get("projection"),
+ "expected no projection dimension when noProjections=true, but
found: " + event.getUserDims()
+ );
+ }
+ }
+
+ @Test
+ void testGroupByClusteringColumnUsesBaseTable()
+ {
+ // Grouping on the clustering column cannot be satisfied by the
country_delta projection, so it dispatches to the
+ // clustered base table and exercises the per-cluster-group cursor path.
+ final String sql = "SELECT \"channel\", SUM(\"delta\") FROM \"%s\" GROUP
BY 1 ORDER BY 1";
+
+ final LatchableEmitter emitter = historical.latchableEmitter();
+ emitter.flush();
+
+ cluster.callApi().verifySqlQuery(sql, dataSource, "#en,18\n#fr,9");
+
+ // The projection cannot serve this query, so no segment scan should
report the projection dimension.
+ emitter.waitForEvent(event -> event.hasMetricName(SEGMENT_TIME_METRIC));
+ for (ServiceMetricEvent event :
emitter.getMetricEvents(SEGMENT_TIME_METRIC)) {
+ Assertions.assertNull(
+ event.getUserDims().get("projection"),
+ "expected no projection dimension for a clustering-column group-by,
but found: " + event.getUserDims()
+ );
+ }
+ }
+
+ @Test
+ void testFilterOnClusteringColumnUsesBaseTable()
+ {
+ // Filtering on the clustering column exercises a single cluster group of
the clustered base table.
+ cluster.callApi().verifySqlQuery(
+ "SELECT SUM(\"delta\") FROM \"%s\" WHERE \"channel\" = '#en'",
+ dataSource,
+ "18"
+ );
+ cluster.callApi().verifySqlQuery(
+ "SELECT \"countryName\", SUM(\"delta\") FROM \"%s\" WHERE \"channel\"
= '#en' GROUP BY 1 ORDER BY 1",
+ dataSource,
+ "CA,3\nUS,15"
+ );
+ }
+
+ /**
+ * Ingests a single clustered base-table segment (clustered by {@code
channel}) that also defines a
+ * {@code country_delta} aggregate projection (group by {@code countryName},
sum {@code delta}) using a native
+ * {@code index_parallel} task.
+ */
+ private void ingestClusteredSegmentWithProjection() throws IOException
+ {
+ final File tmpDir = cluster.getTestFolder().newFolder();
+ final File inputFile = new File(tmpDir, "clustered-input.json");
+ final String inputData =
+
"{\"time\":\"2024-01-01T00:10:00Z\",\"channel\":\"#en\",\"countryName\":\"US\",\"delta\":10}\n"
+ +
"{\"time\":\"2024-01-01T00:20:00Z\",\"channel\":\"#en\",\"countryName\":\"US\",\"delta\":5}\n"
+ +
"{\"time\":\"2024-01-01T00:30:00Z\",\"channel\":\"#en\",\"countryName\":\"CA\",\"delta\":3}\n"
+ +
"{\"time\":\"2024-01-01T00:40:00Z\",\"channel\":\"#fr\",\"countryName\":\"FR\",\"delta\":7}\n"
+ +
"{\"time\":\"2024-01-01T00:50:00Z\",\"channel\":\"#fr\",\"countryName\":\"US\",\"delta\":2}\n";
+ Files.write(inputFile.toPath(),
inputData.getBytes(StandardCharsets.UTF_8));
+
+ final ClusteredValueGroupsBaseTableProjectionSpec clusterSpec =
+ ClusteredValueGroupsBaseTableProjectionSpec.builder()
+ .columns(
+ new StringDimensionSchema("channel"), // clustering
prefix
+ new StringDimensionSchema("countryName"),
+ new LongDimensionSchema("delta"),
+ new LongDimensionSchema("__time") // __time present,
non-clustering
+ )
+ .clusteringColumns("channel")
+ .build();
+
+ final AggregateProjectionSpec projection =
+ AggregateProjectionSpec.builder(PROJECTION_NAME)
+ .groupingColumns(new StringDimensionSchema("countryName"))
+ .aggregators(new LongSumAggregatorFactory("sumDelta", "delta"))
+ .build();
+
+ final SegmentGranularitySpec segmentGranularitySpec = new
SegmentGranularitySpec(
+ Granularities.HOUR,
+ List.of(Intervals.of("2024-01-01/2024-01-02"))
+ );
+
+ final String taskId = IdUtils.getRandomId();
+ final ParallelIndexSupervisorTask task = TaskBuilder
+ .ofTypeIndexParallel()
+ .jsonInputFormat()
+ .localInputSourceWithFiles(inputFile)
+ .dataSchema(
+ builder -> builder
+ .withDataSource(dataSource)
+ .withTimestamp(new TimestampSpec("time", "iso", null))
+ .withSegmentGranularity(segmentGranularitySpec)
+ .withBaseTable(clusterSpec)
+ .withProjections(List.of(projection))
+ )
+ .tuningConfig(t -> t.withMaxNumConcurrentSubTasks(1))
+ .withId(taskId);
+
+ cluster.callApi().onLeaderOverlord(o -> o.runTask(taskId, task));
+ cluster.callApi().waitForTaskToSucceed(taskId, overlord);
+ cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator,
broker);
+ }
+}
diff --git
a/processing/src/main/java/org/apache/druid/segment/IndexMergerV10.java
b/processing/src/main/java/org/apache/druid/segment/IndexMergerV10.java
index 86be6566f8f..68eb4acdc5f 100644
--- a/processing/src/main/java/org/apache/druid/segment/IndexMergerV10.java
+++ b/processing/src/main/java/org/apache/druid/segment/IndexMergerV10.java
@@ -137,13 +137,10 @@ public class IndexMergerV10 extends IndexMergerBase
allClustered,
"Cannot merge clustered and non-clustered base table segments
together"
);
- DruidException.conditionalDefensive(
- CollectionUtils.isNullOrEmpty(segmentMetadata.getProjections()),
- "Clustered base table segments do not yet support aggregate
projections"
- );
return makeClusteredIndexFiles(
adapters,
clusterSchemas,
+ segmentMetadata,
outDir,
progress,
mergedMetrics,
@@ -362,6 +359,7 @@ public class IndexMergerV10 extends IndexMergerBase
private File makeClusteredIndexFiles(
final List<IndexableAdapter> adapters,
final List<ClusteredValueGroupsBaseTableSchema> clusterSchemas,
+ final Metadata segmentMetadata,
final File outDir,
final ProgressIndicator progress,
final List<String> mergedMetrics,
@@ -544,9 +542,32 @@ public class IndexMergerV10 extends IndexMergerBase
mergedGroupSpecs
);
- v10Smoosher.addProjections(
- List.of(new ProjectionMetadata(totalRows, mergedSchema, minTime,
maxTime))
- );
+ final List<ProjectionMetadata> projections = new ArrayList<>();
+ projections.add(new ProjectionMetadata(totalRows, mergedSchema, minTime,
maxTime));
+
+ // Aggregate projections are segment-wide pre-aggregated views over the
clustered base table. Unlike the
+ // non-clustered path, a clustered segment has no segment-wide base
dimension mergers to attach to (columns are
+ // written per cluster group), so we pass empty parentMergers and the
projection mergers build their own
+ // dictionaries.
+ if (!CollectionUtils.isNullOrEmpty(segmentMetadata.getProjections())) {
+ final Metadata updatedMetadata = makeProjections(
+ v10Smoosher,
+ segmentMetadata.getProjections(),
+ adapters,
+ indexSpec,
+ segmentWriteOutMedium,
+ progress,
+ outDir,
+ closer,
+ Map.of(),
+ segmentMetadata
+ );
+ for (AggregateProjectionMetadata aggMeta :
updatedMetadata.getProjections()) {
+ projections.add(new ProjectionMetadata(aggMeta.getNumRows(),
aggMeta.getSchema()));
+ }
+ }
+
+ v10Smoosher.addProjections(projections);
progress.progress();
v10Smoosher.close();
diff --git
a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
index c2d4885d94b..f9d6ff965c3 100644
---
a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
+++
b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
@@ -140,12 +140,6 @@ public class OnheapIncrementalIndex extends
IncrementalIndex
initializeProjections(incrementalIndexSchema);
if (incrementalIndexSchema.getClusterSpec() != null) {
- if (!projections.isEmpty()) {
- throw DruidException.defensive(
- "clustered base table mode does not yet support aggregate
projections — got [%d] projection(s)",
- projections.size()
- );
- }
this.clusteredBaseTable = new OnHeapClusteredBaseTable(
incrementalIndexSchema.getClusterSpec(),
incrementalIndexSchema.getVirtualColumns(),
diff --git
a/processing/src/main/java/org/apache/druid/segment/projections/ClusteringColumnSelectorFactory.java
b/processing/src/main/java/org/apache/druid/segment/projections/ClusteringColumnSelectorFactory.java
index b83f7c5201c..6e9ab3126a7 100644
---
a/processing/src/main/java/org/apache/druid/segment/projections/ClusteringColumnSelectorFactory.java
+++
b/processing/src/main/java/org/apache/druid/segment/projections/ClusteringColumnSelectorFactory.java
@@ -29,6 +29,7 @@ import
org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.ConstantExprEvalSelector;
+import org.apache.druid.segment.DimensionDictionarySelector;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.IdLookup;
import org.apache.druid.segment.RowIdSupplier;
@@ -150,7 +151,20 @@ public class ClusteringColumnSelectorFactory implements
ColumnSelectorFactory
{
final int idx = clusteringColumns.indexOf(column);
if (idx < 0) {
- return delegate.getColumnCapabilities(column);
+ // Non-clustering columns are stored per cluster group, each with its
own local dictionary. The
+ // ConcatenatingCursor walks those groups behind a single cursor, so a
column's dictionary IDs are NOT stable
+ // across the whole cursor. We therefore must not advertise dictionary
encoding here: otherwise the group-by
+ // engine keys on the (per-group-local) IDs and conflates distinct
values from different groups. Reporting the
+ // column as non-dictionary-encoded forces value-based grouping, which
is correct across groups.
+ final ColumnCapabilities delegateCapabilities =
delegate.getColumnCapabilities(column);
+ if (delegateCapabilities == null) {
+ return null;
+ }
+ return ColumnCapabilitiesImpl.copyOf(delegateCapabilities)
+ .setDictionaryEncoded(false)
+ .setDictionaryValuesSorted(false)
+ .setDictionaryValuesUnique(false)
+ .setHasBitmapIndexes(false);
}
final ColumnType type = clusteringColumns.getColumnType(idx).orElseThrow();
if (type.is(ValueType.STRING)) {
@@ -477,7 +491,12 @@ public class ClusteringColumnSelectorFactory implements
ColumnSelectorFactory
@Override
public int getValueCardinality()
{
- return currentInner().getValueCardinality();
+ // The dictionary is per cluster group and NOT stable across the
concatenating cursor (the same local id means
+ // different values in different groups). Reporting CARDINALITY_UNKNOWN
forces query engines onto the
+ // value-based (rather than dictionary-id-keyed) path, which is correct
across groups; a dictionary-id-keyed
+ // group-by would otherwise conflate distinct values that share an id.
lookupName() still resolves per-row
+ // against the current group, so value-based grouping reads the right
value.
+ return DimensionDictionarySelector.CARDINALITY_UNKNOWN;
}
@Nullable
@@ -490,14 +509,16 @@ public class ClusteringColumnSelectorFactory implements
ColumnSelectorFactory
@Override
public boolean nameLookupPossibleInAdvance()
{
- return currentInner().nameLookupPossibleInAdvance();
+ // Per-group dictionaries cannot be enumerated in advance across the
concatenating cursor.
+ return false;
}
@Nullable
@Override
public IdLookup idLookup()
{
- return currentInner().idLookup();
+ // No stable id<->name mapping across groups; callers must resolve by
value.
+ return null;
}
@Nullable
diff --git
a/processing/src/main/java/org/apache/druid/segment/projections/ClusteringVectorColumnSelectorFactory.java
b/processing/src/main/java/org/apache/druid/segment/projections/ClusteringVectorColumnSelectorFactory.java
index 450bd859815..722e1017e9a 100644
---
a/processing/src/main/java/org/apache/druid/segment/projections/ClusteringVectorColumnSelectorFactory.java
+++
b/processing/src/main/java/org/apache/druid/segment/projections/ClusteringVectorColumnSelectorFactory.java
@@ -21,13 +21,11 @@ package org.apache.druid.segment.projections;
import org.apache.druid.error.DruidException;
import org.apache.druid.query.dimension.DimensionSpec;
-import org.apache.druid.segment.IdLookup;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
-import org.apache.druid.segment.data.IndexedInts;
import org.apache.druid.segment.vector.ConstantVectorSelectors;
import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
import org.apache.druid.segment.vector.ReadableVectorInspector;
@@ -134,24 +132,23 @@ public class ClusteringVectorColumnSelectorFactory
implements VectorColumnSelect
@Override
public SingleValueDimensionVectorSelector
makeSingleValueDimensionSelector(DimensionSpec dimensionSpec)
{
- final int idx = clusteringColumns.indexOf(dimensionSpec.getDimension());
- if (idx < 0) {
- return new DelegatingSingleValueDimensionVectorSelector(this,
dimensionSpec);
- }
- return new ClusteringSingleValueDimensionVectorSelector(this, idx,
dimensionSpec);
+ // clusteredValueGroups columns are never dictionary-encoded (see
getColumnCapabilities), so the vector engine
+ // always uses object/value selectors and never requests a dimension
vector selector here. A request would mean a
+ // column was wrongly advertised as dictionary-encoded.
+ throw DruidException.defensive(
+ "clusteredValueGroups columns are not dictionary-encoded; no
single-value dimension vector selector for [%s]",
+ dimensionSpec.getDimension()
+ );
}
@Override
public MultiValueDimensionVectorSelector
makeMultiValueDimensionSelector(DimensionSpec dimensionSpec)
{
- final int idx = clusteringColumns.indexOf(dimensionSpec.getDimension());
- if (idx < 0) {
- return new DelegatingMultiValueDimensionVectorSelector(this,
dimensionSpec);
- }
- // Clustering values are single-typed primitives. Multi-value requests on
a clustering column shouldn't happen
- // in practice; throw to surface caller bugs rather than silently
misbehave.
+ // See makeSingleValueDimensionSelector: clusteredValueGroups columns are
never dictionary-encoded, so a
+ // multi-value dimension vector selector is never requested either.
throw DruidException.defensive(
- "multi-value vector selector not supported for clustering column [" +
dimensionSpec.getDimension() + "]"
+ "clusteredValueGroups columns are not dictionary-encoded; no
multi-value dimension vector selector for [%s]",
+ dimensionSpec.getDimension()
);
}
@@ -181,7 +178,21 @@ public class ClusteringVectorColumnSelectorFactory
implements VectorColumnSelect
{
final int idx = clusteringColumns.indexOf(column);
if (idx < 0) {
- return delegate.getColumnCapabilities(column);
+ // Non-clustering columns are stored per cluster group with per-group
local dictionaries that are NOT stable
+ // across the concatenating vector cursor. We must not advertise
dictionary encoding: the vectorized group-by
+ // keys on the selector's (per-group-local) IDs when the column reports
as dictionary-encoded
+ // (GroupByVectorColumnProcessorFactory#useDictionaryEncodedSelector),
conflating distinct values across groups.
+ // Reporting non-dictionary-encoded routes it to the value-building
vector selector, which is correct across
+ // groups.
+ final ColumnCapabilities delegateCapabilities =
delegate.getColumnCapabilities(column);
+ if (delegateCapabilities == null) {
+ return null;
+ }
+ return ColumnCapabilitiesImpl.copyOf(delegateCapabilities)
+ .setDictionaryEncoded(false)
+ .setDictionaryValuesSorted(false)
+ .setDictionaryValuesUnique(false)
+ .setHasBitmapIndexes(false);
}
final ColumnType type = clusteringColumns.getColumnType(idx).orElseThrow();
if (type.is(ValueType.STRING)) {
@@ -190,88 +201,6 @@ public class ClusteringVectorColumnSelectorFactory
implements VectorColumnSelect
return ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(type);
}
- private static final class ClusteringSingleValueDimensionVectorSelector
implements SingleValueDimensionVectorSelector
- {
- private final ClusteringVectorColumnSelectorFactory parent;
- private final int idx;
- private final DimensionSpec spec;
- private long cachedGeneration = -1;
- private SingleValueDimensionVectorSelector cachedInner;
-
- private ClusteringSingleValueDimensionVectorSelector(
- ClusteringVectorColumnSelectorFactory parent,
- int idx,
- DimensionSpec spec
- )
- {
- this.parent = parent;
- this.idx = idx;
- this.spec = spec;
- }
-
- private SingleValueDimensionVectorSelector currentInner()
- {
- final long currentGeneration = parent.getGeneration();
- if (cachedGeneration == currentGeneration) {
- return cachedInner;
- }
- final Object raw = parent.currentValue(idx);
- final String stringValue = raw == null ? null : String.valueOf(raw);
- final String afterExtraction =
- spec.getExtractionFn() == null ? stringValue :
spec.getExtractionFn().apply(stringValue);
- cachedInner = ConstantVectorSelectors.singleValueDimensionVectorSelector(
- parent.getReadableVectorInspector(),
- afterExtraction
- );
- cachedGeneration = currentGeneration;
- return cachedInner;
- }
-
- @Override
- public int[] getRowVector()
- {
- return currentInner().getRowVector();
- }
-
- @Override
- public int getValueCardinality()
- {
- return currentInner().getValueCardinality();
- }
-
- @Nullable
- @Override
- public String lookupName(int id)
- {
- return currentInner().lookupName(id);
- }
-
- @Override
- public boolean nameLookupPossibleInAdvance()
- {
- return currentInner().nameLookupPossibleInAdvance();
- }
-
- @Nullable
- @Override
- public IdLookup idLookup()
- {
- return currentInner().idLookup();
- }
-
- @Override
- public int getMaxVectorSize()
- {
- return parent.getMaxVectorSize();
- }
-
- @Override
- public int getCurrentVectorSize()
- {
- return parent.getReadableVectorInspector().getCurrentVectorSize();
- }
- }
-
private static final class ClusteringVectorValueSelector implements
VectorValueSelector
{
private final ClusteringVectorColumnSelectorFactory parent;
@@ -384,148 +313,6 @@ public class ClusteringVectorColumnSelectorFactory
implements VectorColumnSelect
}
}
- private static final class DelegatingSingleValueDimensionVectorSelector
implements SingleValueDimensionVectorSelector
- {
- private final ClusteringVectorColumnSelectorFactory parent;
- private final DimensionSpec spec;
- private long cachedGeneration = -1;
- private SingleValueDimensionVectorSelector cachedInner;
-
- private DelegatingSingleValueDimensionVectorSelector(
- ClusteringVectorColumnSelectorFactory parent,
- DimensionSpec spec
- )
- {
- this.parent = parent;
- this.spec = spec;
- }
-
- private SingleValueDimensionVectorSelector currentInner()
- {
- final long currentGeneration = parent.getGeneration();
- if (cachedGeneration != currentGeneration) {
- cachedInner =
parent.getDelegate().makeSingleValueDimensionSelector(spec);
- cachedGeneration = currentGeneration;
- }
- return cachedInner;
- }
-
- @Override
- public int[] getRowVector()
- {
- return currentInner().getRowVector();
- }
-
- @Override
- public int getValueCardinality()
- {
- return currentInner().getValueCardinality();
- }
-
- @Nullable
- @Override
- public String lookupName(int id)
- {
- return currentInner().lookupName(id);
- }
-
- @Override
- public boolean nameLookupPossibleInAdvance()
- {
- return currentInner().nameLookupPossibleInAdvance();
- }
-
- @Nullable
- @Override
- public IdLookup idLookup()
- {
- return currentInner().idLookup();
- }
-
- @Override
- public int getMaxVectorSize()
- {
- return currentInner().getMaxVectorSize();
- }
-
- @Override
- public int getCurrentVectorSize()
- {
- return currentInner().getCurrentVectorSize();
- }
- }
-
- private static final class DelegatingMultiValueDimensionVectorSelector
implements MultiValueDimensionVectorSelector
- {
- private final ClusteringVectorColumnSelectorFactory parent;
- private final DimensionSpec spec;
- private long cachedGeneration = -1;
- private MultiValueDimensionVectorSelector cachedInner;
-
- private DelegatingMultiValueDimensionVectorSelector(
- ClusteringVectorColumnSelectorFactory parent,
- DimensionSpec spec
- )
- {
- this.parent = parent;
- this.spec = spec;
- }
-
- private MultiValueDimensionVectorSelector currentInner()
- {
- final long currentGeneration = parent.getGeneration();
- if (cachedGeneration != currentGeneration) {
- cachedInner =
parent.getDelegate().makeMultiValueDimensionSelector(spec);
- cachedGeneration = currentGeneration;
- }
- return cachedInner;
- }
-
- @Override
- public IndexedInts[] getRowVector()
- {
- return currentInner().getRowVector();
- }
-
- @Override
- public int getValueCardinality()
- {
- return currentInner().getValueCardinality();
- }
-
- @Nullable
- @Override
- public String lookupName(int id)
- {
- return currentInner().lookupName(id);
- }
-
- @Override
- public boolean nameLookupPossibleInAdvance()
- {
- return currentInner().nameLookupPossibleInAdvance();
- }
-
- @Nullable
- @Override
- public IdLookup idLookup()
- {
- return currentInner().idLookup();
- }
-
- @Override
- public int getMaxVectorSize()
- {
- return currentInner().getMaxVectorSize();
- }
-
- @Override
- public int getCurrentVectorSize()
- {
- return currentInner().getCurrentVectorSize();
- }
- }
-
private static final class DelegatingVectorValueSelector implements
VectorValueSelector
{
private final ClusteringVectorColumnSelectorFactory parent;
diff --git
a/processing/src/test/java/org/apache/druid/segment/IndexMergerV10ClusteredTest.java
b/processing/src/test/java/org/apache/druid/segment/IndexMergerV10ClusteredTest.java
index c0d4fc20e0f..3782b61a60f 100644
---
a/processing/src/test/java/org/apache/druid/segment/IndexMergerV10ClusteredTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/IndexMergerV10ClusteredTest.java
@@ -21,16 +21,21 @@ package org.apache.druid.segment;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedInputRow;
+import org.apache.druid.data.input.impl.AggregateProjectionSpec;
import
org.apache.druid.data.input.impl.ClusteredValueGroupsBaseTableProjectionSpec;
import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.groupby.GroupByQuery;
+import org.apache.druid.query.groupby.GroupingEngine;
+import org.apache.druid.query.groupby.orderby.OrderByColumnSpec.Direction;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import
org.apache.druid.segment.projections.ClusteredValueGroupsBaseTableSchema;
import org.apache.druid.segment.projections.TableClusterGroupSpec;
@@ -46,6 +51,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -133,6 +139,260 @@ class IndexMergerV10ClusteredTest extends
InitializedNullHandlingTest
return out;
}
+ /**
+ * Cluster spec with a numeric column {@code x} (in addition to {@code tenant}/{@code region}) so an aggregate
+ * projection can sum it. Clustering is still on {@code tenant}.
+ */
+ private static ClusteredValueGroupsBaseTableProjectionSpec
tenantClusterSpecWithX()
+ {
+ return ClusteredValueGroupsBaseTableProjectionSpec.builder()
+ .columns(
+ new StringDimensionSchema("tenant"),
+ new StringDimensionSchema("region"),
+ new LongDimensionSchema("x"),
+ new LongDimensionSchema("__time")
+ )
+ .clusteringColumns("tenant")
+ .build();
+ }
+
+ /**
+ * The aggregate projection persisted alongside the clustered base table: group-by {@code tenant} with
+ * {@code cnt}=count and {@code sum_x}=sum(x).
+ */
+ private static AggregateProjectionSpec tenantProjectionSpec()
+ {
+ return AggregateProjectionSpec.builder("proj")
+ .groupingColumns(new
StringDimensionSchema("tenant"))
+ .aggregators(
+ new CountAggregatorFactory("cnt"),
+ new LongSumAggregatorFactory("sum_x",
"x")
+ )
+ .build();
+ }
+
+ private static IncrementalIndexSchema clusteredSchemaWithProjection(
+ ClusteredValueGroupsBaseTableProjectionSpec spec,
+ AggregateProjectionSpec projectionSpec
+ )
+ {
+ return IncrementalIndexSchema.builder()
+ .withMinTimestamp(T0)
+ .withTimestampSpec(TIMESTAMP_SPEC)
+ .withQueryGranularity(Granularities.NONE)
+ .withDimensionsSpec(spec.getDimensionsSpec())
+ .withRollup(false)
+ .withClusterSpec(spec)
+ .withProjections(List.of(projectionSpec))
+ .build();
+ }
+
+ private static InputRow row(long ts, String tenant, String region, long x)
+ {
+ final Map<String, Object> event = new HashMap<>();
+ event.put("ts", ts);
+ event.put("tenant", tenant);
+ event.put("region", region);
+ event.put("x", x);
+ return new MapBasedInputRow(ts, List.of("tenant", "region", "x"), event);
+ }
+
+ private QueryableIndex buildSegmentWithProjection(String dirName,
List<InputRow> rows)
+ {
+ return IndexBuilder.create()
+ .useV10()
+ .tmpDir(new File(tempDir, dirName))
+
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
+
.schema(clusteredSchemaWithProjection(tenantClusterSpecWithX(),
tenantProjectionSpec()))
+ .rows(rows)
+ .buildMMappedIndex();
+ }
+
+ /**
+ * A group-by on the projection's grouping column + aggregators, granularity ALL over ETERNITY, built through the
+ * real {@link GroupingEngine#makeCursorBuildSpec} path so the spec matches what production planning produces.
+ */
+ private static CursorBuildSpec projectionMatchingBuildSpec()
+ {
+ final GroupByQuery query =
+ GroupByQuery.builder()
+ .setDataSource("test")
+ .setGranularity(Granularities.ALL)
+ .setInterval(Intervals.ETERNITY)
+ .addDimension("tenant")
+ .addAggregator(new CountAggregatorFactory("cnt"))
+ .addAggregator(new LongSumAggregatorFactory("sum_x", "x"))
+ .addOrderByColumn("tenant", Direction.ASCENDING)
+ .build();
+ return GroupingEngine.makeCursorBuildSpec(query, null);
+ }
+
+ /**
+ * Run the projection-matching group-by against a loaded/merged clustered {@link QueryableIndex} via the real
+ * {@link QueryableIndexCursorFactory}, asserting the projection was actually selected (the holder is
+ * pre-aggregated), and return tenant -> {cnt, sum_x}.
+ */
+ private static Map<String, long[]> queryProjection(QueryableIndex index)
+ {
+ final CursorBuildSpec buildSpec = projectionMatchingBuildSpec();
+
+ // The loader must surface the persisted aggregate projection on the clustered segment, and it must be selected for
+ // this spec. If this is null the IndexIO loader isn't exposing the projection for clustered segments.
+ Assertions.assertNotNull(
+ index.getProjection(buildSpec),
+ "aggregate projection 'proj' was not selected for the group-by build
spec on the loaded clustered segment"
+ );
+
+ final QueryableIndexCursorFactory factory = new
QueryableIndexCursorFactory(
+ index,
+ QueryableIndexTimeBoundaryInspector.create(index)
+ );
+ final Map<String, long[]> byTenant = new LinkedHashMap<>();
+ try (CursorHolder holder = factory.makeCursorHolder(buildSpec)) {
+ // The chosen holder must be the aggregate-projection cursor serving pre-aggregated rows.
+ Assertions.assertTrue(
+ holder.isPreAggregated(),
+ "expected the projection (pre-aggregated) cursor holder on the
loaded clustered segment"
+ );
+ final Cursor cursor = holder.asCursor();
+ final DimensionSelector tenantSel =
+
cursor.getColumnSelectorFactory().makeDimensionSelector(DefaultDimensionSpec.of("tenant"));
+ final ColumnValueSelector<?> cntSel =
cursor.getColumnSelectorFactory().makeColumnValueSelector("cnt");
+ final ColumnValueSelector<?> sumSel =
cursor.getColumnSelectorFactory().makeColumnValueSelector("sum_x");
+ while (!cursor.isDone()) {
+ final String tenant = tenantSel.lookupName(tenantSel.getRow().get(0));
+ byTenant.put(tenant, new long[]{cntSel.getLong(), sumSel.getLong()});
+ cursor.advance();
+ }
+ }
+ return byTenant;
+ }
+
+ @Test
+ void testPersistAndLoadClusteredSegmentWithProjection()
+ {
+ // Ingest out of clustering order; two acme rows (x=10, x=5) and one globex row (x=7).
+ final QueryableIndex index = buildSegmentWithProjection(
+ "persist-projection",
+ List.of(
+ row(T0 + 2, "globex", "eu-west-1", 7),
+ row(T0, "acme", "us-east-1", 10),
+ row(T0 + 1, "acme", "us-west-2", 5)
+ )
+ );
+
+ // Base clustered table is still written: cluster groups present in clustering order.
+ final ClusteredValueGroupsBaseTableSchema summary =
index.getClusteredBaseSummary();
+ Assertions.assertNotNull(summary, "clustered base summary must still be
present alongside the projection");
+ Assertions.assertEquals(
+ List.of("acme", "globex"),
+ summary.getClusteringDictionaries().getStringDictionary()
+ );
+ final List<TableClusterGroupSpec> groups = summary.getClusterGroups();
+ Assertions.assertEquals(2, groups.size());
+ Assertions.assertEquals(List.of(0), groups.get(0).getClusteringValueIds());
+ Assertions.assertEquals(2, groups.get(0).getNumRows());
+ Assertions.assertEquals(List.of(1), groups.get(1).getClusteringValueIds());
+ Assertions.assertEquals(1, groups.get(1).getNumRows());
+
+ // The persisted projection must be surfaced by the loader, selected for the projection query, and aggregate
+ // correctly: acme -> cnt=2,sum_x=15 ; globex -> cnt=1,sum_x=7.
+ final Map<String, long[]> projected = queryProjection(index);
+ Assertions.assertEquals(2, projected.size(), "expected one pre-aggregated
row per tenant");
+ Assertions.assertArrayEquals(new long[]{2L, 15L}, projected.get("acme"));
+ Assertions.assertArrayEquals(new long[]{1L, 7L}, projected.get("globex"));
+
+ // Base cluster-group querying remains correct alongside the projection (FULL_SCAN walks groups in clustering
+ // order, constants injected).
+ Assertions.assertEquals(
+ List.of(
+ List.of("acme", "us-east-1"),
+ List.of("acme", "us-west-2"),
+ List.of("globex", "eu-west-1")
+ ),
+ scanTenantRegion(index)
+ );
+ }
+
+ @Test
+ void testMergeClusteredSegmentsWithProjectionCombinesAggregates() throws
Exception
+ {
+ // Segment 1: acme (x=10, x=5), globex (x=7). Segment 2: globex (x=3), initech (x=4). The merged projection must
+ // combine the globex aggregates across both segments.
+ final QueryableIndex segment1 = buildSegmentWithProjection(
+ "merge-projection-input-1",
+ List.of(
+ row(T0, "acme", "us-east-1", 10),
+ row(T0 + 1, "acme", "us-west-2", 5),
+ row(T0 + 2, "globex", "eu-west-1", 7)
+ )
+ );
+ final QueryableIndex segment2 = buildSegmentWithProjection(
+ "merge-projection-input-2",
+ List.of(
+ row(T0 + 3, "globex", "eu-central-1", 3),
+ row(T0 + 4, "initech", "us-east-1", 4)
+ )
+ );
+
+ final IndexBuilder builderForMerger = IndexBuilder.create().useV10();
+ final IndexIO indexIO = builderForMerger.getIndexIO();
+ final IndexMergerV10 merger = new IndexMergerV10(
+ TestHelper.makeJsonMapper(),
+ indexIO,
+ OffHeapMemorySegmentWriteOutMediumFactory.instance()
+ );
+
+ final File mergedDir = new File(tempDir, "merged-projection");
+ merger.mergeQueryableIndex(
+ List.of(segment1, segment2),
+ false,
+ new AggregatorFactory[0],
+ mergedDir,
+ IndexSpec.getDefault(),
+ OffHeapMemorySegmentWriteOutMediumFactory.instance(),
+ -1
+ );
+ final QueryableIndex mergedIndex = indexIO.loadIndex(mergedDir);
+
+ // Base table still merges correctly: dictionary [acme, globex, initech] and 3 groups.
+ final ClusteredValueGroupsBaseTableSchema summary =
mergedIndex.getClusteredBaseSummary();
+ Assertions.assertNotNull(summary);
+ Assertions.assertEquals(
+ List.of("acme", "globex", "initech"),
+ summary.getClusteringDictionaries().getStringDictionary()
+ );
+ final List<TableClusterGroupSpec> groups = summary.getClusterGroups();
+ Assertions.assertEquals(3, groups.size());
+ Assertions.assertEquals(List.of(0), groups.get(0).getClusteringValueIds());
+ Assertions.assertEquals(2, groups.get(0).getNumRows()); // acme: both
rows from segment 1
+ Assertions.assertEquals(List.of(1), groups.get(1).getClusteringValueIds());
+ Assertions.assertEquals(2, groups.get(1).getNumRows()); // globex: one
row from each segment
+ Assertions.assertEquals(List.of(2), groups.get(2).getClusteringValueIds());
+ Assertions.assertEquals(1, groups.get(2).getNumRows()); // initech: from
segment 2
+ Assertions.assertEquals(5, mergedIndex.getNumRows());
+
+ // The merged projection must combine aggregates across segments: acme -> cnt=2,sum_x=15 ;
+ // globex -> cnt=2,sum_x=10 (7 from seg1 + 3 from seg2) ; initech -> cnt=1,sum_x=4.
+ final Map<String, long[]> projected = queryProjection(mergedIndex);
+ Assertions.assertEquals(3, projected.size(), "expected one pre-aggregated
row per tenant after merge");
+ Assertions.assertArrayEquals(new long[]{2L, 15L}, projected.get("acme"));
+ Assertions.assertArrayEquals(new long[]{2L, 10L}, projected.get("globex"));
+ Assertions.assertArrayEquals(new long[]{1L, 4L}, projected.get("initech"));
+
+ // Base cluster-group querying remains correct: all 5 base rows in clustering order (region-sorted within group).
+ Assertions.assertEquals(
+ List.of(
+ List.of("acme", "us-east-1"),
+ List.of("acme", "us-west-2"),
+ List.of("globex", "eu-central-1"),
+ List.of("globex", "eu-west-1"),
+ List.of("initech", "us-east-1")
+ ),
+ scanTenantRegion(mergedIndex)
+ );
+ }
+
@Test
void testPersistAndLoadClusteredSegment()
{
diff --git
a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexClusteredProjectionTest.java
b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexClusteredProjectionTest.java
new file mode 100644
index 00000000000..78fa0107d62
--- /dev/null
+++
b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexClusteredProjectionTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.incremental;
+
+import org.apache.druid.data.input.MapBasedInputRow;
+import org.apache.druid.data.input.impl.AggregateProjectionSpec;
+import
org.apache.druid.data.input.impl.ClusteredValueGroupsBaseTableProjectionSpec;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.groupby.GroupByQuery;
+import org.apache.druid.query.groupby.GroupingEngine;
+import org.apache.druid.query.groupby.orderby.OrderByColumnSpec.Direction;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.CursorBuildSpec;
+import org.apache.druid.segment.CursorHolder;
+import org.apache.druid.segment.DimensionSelector;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+class IncrementalIndexClusteredProjectionTest extends
InitializedNullHandlingTest
+{
+ private static final long T0 =
DateTimes.of("2026-01-01T00:00:00").getMillis();
+ private static final TimestampSpec TIMESTAMP_SPEC = new TimestampSpec("ts",
"millis", null);
+
+ private static MapBasedInputRow row(long ts, String tenant, String region,
long x)
+ {
+ final Map<String, Object> event = new HashMap<>();
+ event.put("ts", ts);
+ event.put("tenant", tenant);
+ event.put("region", region);
+ event.put("x", x);
+ return new MapBasedInputRow(ts, List.of("tenant", "region", "x"), event);
+ }
+
+ private static OnheapIncrementalIndex clusteredWithProjection()
+ {
+ final ClusteredValueGroupsBaseTableProjectionSpec spec =
ClusteredValueGroupsBaseTableProjectionSpec.builder()
+ .columns(
+ new StringDimensionSchema("tenant"),
+ new StringDimensionSchema("region"),
+ new LongDimensionSchema("x"),
+ new LongDimensionSchema("__time")
+ )
+ .clusteringColumns("tenant")
+ .build();
+ final AggregateProjectionSpec projectionSpec =
+ AggregateProjectionSpec.builder("proj")
+ .groupingColumns(new
StringDimensionSchema("tenant"))
+ .aggregators(
+ new CountAggregatorFactory("cnt"),
+ new LongSumAggregatorFactory("sum_x", "x")
+ )
+ .build();
+ final IncrementalIndexSchema schema = IncrementalIndexSchema.builder()
+ .withMinTimestamp(T0)
+ .withTimestampSpec(TIMESTAMP_SPEC)
+ .withQueryGranularity(Granularities.NONE)
+ .withDimensionsSpec(spec.getDimensionsSpec())
+ .withRollup(false)
+ .withClusterSpec(spec)
+ .withProjections(List.of(projectionSpec))
+ .build();
+ final OnheapIncrementalIndex index = (OnheapIncrementalIndex) new
OnheapIncrementalIndex.Builder()
+ .setIndexSchema(schema)
+ .setMaxRowCount(10_000)
+ .build();
+ // Add tenants out of clustering order to prove the plain scan still walks groups in clustering-sorted order.
+ index.add(row(T0 + 2, "globex", "eu-west-1", 7));
+ index.add(row(T0, "acme", "us-east-1", 10));
+ index.add(row(T0 + 1, "acme", "us-west-2", 5));
+ return index;
+ }
+
+ /**
+ * A group-by on the projection's grouping column + aggregators, granularity ALL over ETERNITY. Built through the
+ * real {@link GroupingEngine#makeCursorBuildSpec} path so the resulting spec is what production planning produces.
+ */
+ private static CursorBuildSpec projectionMatchingBuildSpec()
+ {
+ final GroupByQuery query =
+ GroupByQuery.builder()
+ .setDataSource("test")
+ .setGranularity(Granularities.ALL)
+ .setInterval(Intervals.ETERNITY)
+ .addDimension("tenant")
+ .addAggregator(new CountAggregatorFactory("cnt"))
+ .addAggregator(new LongSumAggregatorFactory("sum_x", "x"))
+ .addOrderByColumn("tenant", Direction.ASCENDING)
+ .build();
+ return GroupingEngine.makeCursorBuildSpec(query, null);
+ }
+
+ @Test
+ void testProjectionMaterializedOnClusteredIndex()
+ {
+ try (OnheapIncrementalIndex index = clusteredWithProjection()) {
+ // Construction did not throw (the OnheapIncrementalIndex guard against clusterSpec + projections was removed)
+ // and the projection accumulated rows during ingest.
+ Assertions.assertNotNull(index.getProjection("proj"));
+ }
+ }
+
+ @Test
+ void testProjectionSelectedAndAggregatesCorrectlyOnClusteredIndex()
+ {
+ try (OnheapIncrementalIndex index = clusteredWithProjection()) {
+ final CursorBuildSpec buildSpec = projectionMatchingBuildSpec();
+
+ // The projection must actually be selected for this spec on a clustered index -- not merely present. If this is
+ // null, guard removal alone is insufficient and the query path needs more work.
+ Assertions.assertNotNull(
+ index.getProjection(buildSpec),
+ "projection 'proj' was not selected for the group-by build spec on
the clustered index"
+ );
+
+ final IncrementalIndexCursorFactory factory = new
IncrementalIndexCursorFactory(index);
+ try (CursorHolder holder = factory.makeCursorHolder(buildSpec)) {
+ // The chosen holder is the aggregate-projection cursor: it serves pre-aggregated rows.
+ Assertions.assertTrue(holder.isPreAggregated(), "expected the
projection (pre-aggregated) cursor holder");
+
+ final Cursor cursor = holder.asCursor();
+ final DimensionSelector tenantSel =
+
cursor.getColumnSelectorFactory().makeDimensionSelector(DefaultDimensionSpec.of("tenant"));
+ final ColumnValueSelector<?> cntSel =
cursor.getColumnSelectorFactory().makeColumnValueSelector("cnt");
+ final ColumnValueSelector<?> sumSel =
cursor.getColumnSelectorFactory().makeColumnValueSelector("sum_x");
+
+ final Map<String, long[]> byTenant = new LinkedHashMap<>();
+ while (!cursor.isDone()) {
+ final String tenant =
tenantSel.lookupName(tenantSel.getRow().get(0));
+ byTenant.put(tenant, new long[]{cntSel.getLong(), sumSel.getLong()});
+ cursor.advance();
+ }
+
+ // acme: 2 rows (x=10, x=5) -> cnt=2, sum_x=15 ; globex: 1 row (x=7) -> cnt=1, sum_x=7
+ Assertions.assertEquals(2, byTenant.size(), "expected one
pre-aggregated row per tenant");
+ Assertions.assertArrayEquals(new long[]{2L, 15L},
byTenant.get("acme"));
+ Assertions.assertArrayEquals(new long[]{1L, 7L},
byTenant.get("globex"));
+ }
+ }
+ }
+
+ @Test
+ void testBaseClusteredScanStillWalksGroupsInClusteringOrder()
+ {
+ try (OnheapIncrementalIndex index = clusteredWithProjection()) {
+ final IncrementalIndexCursorFactory factory = new
IncrementalIndexCursorFactory(index);
+ // FULL_SCAN does not match the projection, so it falls through to the clustered base-table path and walks the
+ // cluster groups in clustering-ascending order (acme rows before globex), exactly as the non-projection
+ // clustered index does.
+ try (CursorHolder holder =
factory.makeCursorHolder(CursorBuildSpec.FULL_SCAN)) {
+ Assertions.assertFalse(
+ holder.isPreAggregated(),
+ "FULL_SCAN must not pick the projection; it should use the
clustered base-table cursor"
+ );
+ final Cursor cursor = holder.asCursor();
+ final DimensionSelector tenantSel =
+
cursor.getColumnSelectorFactory().makeDimensionSelector(DefaultDimensionSpec.of("tenant"));
+ final DimensionSelector regionSel =
+
cursor.getColumnSelectorFactory().makeDimensionSelector(DefaultDimensionSpec.of("region"));
+ final List<List<String>> out = new ArrayList<>();
+ while (!cursor.isDone()) {
+ final String tenant = tenantSel.getRow().size() == 0 ? null :
tenantSel.lookupName(tenantSel.getRow().get(0));
+ final String region = regionSel.getRow().size() == 0 ? null :
regionSel.lookupName(regionSel.getRow().get(0));
+ out.add(Arrays.asList(tenant, region));
+ cursor.advance();
+ }
+ Assertions.assertEquals(
+ List.of(
+ List.of("acme", "us-east-1"),
+ List.of("acme", "us-west-2"),
+ List.of("globex", "eu-west-1")
+ ),
+ out
+ );
+ }
+ }
+ }
+}
diff --git
a/processing/src/test/java/org/apache/druid/segment/projections/ClusteringVectorColumnSelectorFactoryTest.java
b/processing/src/test/java/org/apache/druid/segment/projections/ClusteringVectorColumnSelectorFactoryTest.java
index 9a76566b3e1..6e1dfbf8bcb 100644
---
a/processing/src/test/java/org/apache/druid/segment/projections/ClusteringVectorColumnSelectorFactoryTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/projections/ClusteringVectorColumnSelectorFactoryTest.java
@@ -42,7 +42,7 @@ class ClusteringVectorColumnSelectorFactoryTest
private static final RowSignature CLUSTER_SIGNATURE =
RowSignature.builder().add("tenant", ColumnType.STRING).build();
@Test
- void testStringClusteringSingleValueDimensionSelector()
+ void testSingleValueDimensionSelectorRejected()
{
StubDelegate delegate = new StubDelegate(inspectorFor(8));
ClusteringVectorColumnSelectorFactory f = new
ClusteringVectorColumnSelectorFactory(
@@ -51,10 +51,54 @@ class ClusteringVectorColumnSelectorFactoryTest
new Object[]{"acme"}
);
- SingleValueDimensionVectorSelector sel =
f.makeSingleValueDimensionSelector(DefaultDimensionSpec.of("tenant"));
- Assertions.assertNull(delegate.lastSingleValDimRequest, "delegate must not
be hit for clustering column");
- Assertions.assertEquals("acme", sel.lookupName(0));
- Assertions.assertEquals(1, sel.getValueCardinality());
+ // clusteredValueGroups columns are never dictionary-encoded, so a single-value dimension vector selector is never
+ // requested. Both a clustering column and a non-clustering column must throw.
+ Assertions.assertThrows(
+ DruidException.class,
+ () ->
f.makeSingleValueDimensionSelector(DefaultDimensionSpec.of("tenant"))
+ );
+ Assertions.assertThrows(
+ DruidException.class,
+ () ->
f.makeSingleValueDimensionSelector(DefaultDimensionSpec.of("region"))
+ );
+ Assertions.assertNull(delegate.lastSingleValDimRequest, "delegate must not
be hit");
+ }
+
+ @Test
+ void testStringClusteringObjectSelector()
+ {
+ // STRING clustering columns are read through the object selector (they report non-dictionary-encoded). Verify the
+ // object selector returns the per-group clustering value without consulting the delegate.
+ StubDelegate delegate = new StubDelegate(inspectorFor(8));
+ ClusteringVectorColumnSelectorFactory f = new
ClusteringVectorColumnSelectorFactory(
+ delegate,
+ CLUSTER_SIGNATURE,
+ new Object[]{"acme"}
+ );
+
+ VectorObjectSelector sel = f.makeObjectSelector("tenant");
+ Assertions.assertNull(delegate.lastObjectRequest, "delegate must not be
hit for clustering column");
+ Object[] vec = sel.getObjectVector();
+ for (Object v : vec) {
+ Assertions.assertEquals("acme", v);
+ }
+ }
+
+ @Test
+ void testNullStringClusteringObjectSelectorIsNull()
+ {
+ StubDelegate delegate = new StubDelegate(inspectorFor(4));
+ ClusteringVectorColumnSelectorFactory f = new
ClusteringVectorColumnSelectorFactory(
+ delegate,
+ CLUSTER_SIGNATURE,
+ new Object[]{null}
+ );
+
+ VectorObjectSelector sel = f.makeObjectSelector("tenant");
+ Object[] vec = sel.getObjectVector();
+ for (Object v : vec) {
+ Assertions.assertNull(v);
+ }
}
@Test
@@ -109,21 +153,6 @@ class ClusteringVectorColumnSelectorFactoryTest
}
}
- @Test
- void testNullStringClusteringValueDimensionSelectorIsNil()
- {
- StubDelegate delegate = new StubDelegate(inspectorFor(4));
- ClusteringVectorColumnSelectorFactory f = new
ClusteringVectorColumnSelectorFactory(
- delegate,
- CLUSTER_SIGNATURE,
- new Object[]{null}
- );
-
- SingleValueDimensionVectorSelector sel =
f.makeSingleValueDimensionSelector(DefaultDimensionSpec.of("tenant"));
- // Nil vector selector returns null on lookupName(0) regardless of id.
- Assertions.assertNull(sel.lookupName(0));
- }
-
@Test
void testNonClusteringColumnDelegated()
{
@@ -137,17 +166,16 @@ class ClusteringVectorColumnSelectorFactoryTest
// Non-clustering selectors are wrapped in lazy delegating wrappers; the delegate is only consulted on first
// use, so a multi-group ConcatenatingVectorCursor can swap delegates between groups without recreating the
// selector instance.
- SingleValueDimensionVectorSelector svdSel =
- f.makeSingleValueDimensionSelector(DefaultDimensionSpec.of("region"));
- Assertions.assertNull(delegate.lastSingleValDimRequest, "delegate must not
be hit until selector is used");
+ VectorObjectSelector regionObjSel = f.makeObjectSelector("region");
+ Assertions.assertNull(delegate.lastObjectRequest, "delegate must not be
hit until selector is used");
try {
- svdSel.getRowVector();
+ regionObjSel.getObjectVector();
}
catch (NullPointerException expected) {
// StubDelegate returns null for the inner selector; the wrapper forwards to it. We just want to confirm
- // the delegate's makeSingleValueDimensionSelector was invoked.
+ // the delegate's makeObjectSelector was invoked.
}
- Assertions.assertEquals("region", delegate.lastSingleValDimRequest);
+ Assertions.assertEquals("region", delegate.lastObjectRequest);
VectorValueSelector vvSel = f.makeValueSelector("metric");
Assertions.assertNull(delegate.lastValueRequest);
@@ -159,15 +187,6 @@ class ClusteringVectorColumnSelectorFactoryTest
}
Assertions.assertEquals("metric", delegate.lastValueRequest);
- VectorObjectSelector voSel = f.makeObjectSelector("region");
- try {
- voSel.getObjectVector();
- }
- catch (NullPointerException expected) {
- // same
- }
- Assertions.assertEquals("region", delegate.lastObjectRequest);
-
// getColumnCapabilities is NOT lazy; it returns the result directly.
f.getColumnCapabilities("metric");
Assertions.assertEquals("metric", delegate.lastCapsRequest);
@@ -209,26 +228,25 @@ class ClusteringVectorColumnSelectorFactoryTest
new Object[]{"acme"}
);
- SingleValueDimensionVectorSelector sel =
- f.makeSingleValueDimensionSelector(DefaultDimensionSpec.of("region"));
+ VectorObjectSelector sel = f.makeObjectSelector("region");
try {
- sel.getRowVector(); // warms the cache against the first delegate
+ sel.getObjectVector(); // warms the cache against the first delegate
}
catch (NullPointerException expected) {
// expected; just confirming the route
}
- Assertions.assertEquals("region", first.lastSingleValDimRequest);
+ Assertions.assertEquals("region", first.lastObjectRequest);
StubDelegate second = new StubDelegate(inspectorFor(4));
f.setDelegate(second, new Object[]{"globex"});
try {
- sel.getRowVector(); // generation bumped → re-fetches against second
delegate
+ sel.getObjectVector(); // generation bumped → re-fetches against
second delegate
}
catch (NullPointerException expected) {
// expected
}
- Assertions.assertEquals("region", second.lastSingleValDimRequest);
+ Assertions.assertEquals("region", second.lastObjectRequest);
}
@Test
@@ -254,17 +272,23 @@ class ClusteringVectorColumnSelectorFactoryTest
}
@Test
- void testMultiValueDimensionSelectorOnClusteringRejected()
+ void testMultiValueDimensionSelectorRejected()
{
ClusteringVectorColumnSelectorFactory f = new
ClusteringVectorColumnSelectorFactory(
new StubDelegate(inspectorFor(4)),
CLUSTER_SIGNATURE,
new Object[]{"acme"}
);
+ // clusteredValueGroups columns are never dictionary-encoded, so a multi-value dimension vector selector is never
+ // requested. Both a clustering column and a non-clustering column must throw.
Assertions.assertThrows(
DruidException.class,
() ->
f.makeMultiValueDimensionSelector(DefaultDimensionSpec.of("tenant"))
);
+ Assertions.assertThrows(
+ DruidException.class,
+ () ->
f.makeMultiValueDimensionSelector(DefaultDimensionSpec.of("region"))
+ );
}
private static ReadableVectorInspector inspectorFor(int size)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]