This is an automated email from the ASF dual-hosted git repository.
clintropolis pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 2c634fc05b8 feat: clustered segment benchmark and fix (#19623)
2c634fc05b8 is described below
commit 2c634fc05b89d1ca0611eb36e2d722200a4f8bb2
Author: Clint Wylie <[email protected]>
AuthorDate: Wed Jun 24 12:25:28 2026 -0700
feat: clustered segment benchmark and fix (#19623)
---
.../druid/benchmark/query/SqlBaseBenchmark.java | 40 +++--
.../benchmark/query/SqlBenchmarkDatasets.java | 166 +++++++++++++++++++-
.../query/SqlClusteredSegmentsBenchmark.java | 127 +++++++++++++++
.../druid/segment/QueryableIndexCursorFactory.java | 6 +-
.../druid/segment/generator/DataGenerator.java | 26 +---
.../segment/generator/GeneratorBasicSchemas.java | 30 ++++
.../ClusteringColumnSelectorFactory.java | 10 +-
...SingleGroupClusteringColumnSelectorFactory.java | 172 +++++++++++++++++++++
...GroupClusteringVectorColumnSelectorFactory.java | 143 +++++++++++++++++
.../QueryableIndexCursorFactoryClusteredTest.java | 59 +++++++
.../druid/segment/UnnestCursorFactoryTest.java | 2 +-
.../druid/segment/generator/SegmentGenerator.java | 163 +++++++++++++++++++
.../ClusteringColumnSelectorFactoryTest.java | 37 +++++
...leGroupClusteringColumnSelectorFactoryTest.java | 143 +++++++++++++++++
14 files changed, 1087 insertions(+), 37 deletions(-)
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlBaseBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlBaseBenchmark.java
index 1cdd572babd..b787244373b 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlBaseBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlBaseBenchmark.java
@@ -289,17 +289,33 @@ public class SqlBaseBenchmark
);
realtimeSegments.put(dataSegment, index);
} else {
- final QueryableIndex index = segmentGenerator.generate(
- dataSegment,
- schema.getGeneratorSchemaInfo(),
- schema.getDimensionsSpec(),
- schema.getTransformSpec(),
- getIndexSpec(),
- schema.getQueryGranularity(),
- schema.getProjections(),
- rowsPerSegment,
- CalciteTests.getJsonMapper()
- );
+ final QueryableIndex index;
+ if (schema.isUseV10()) {
+ // clustered (and the unclustered comparison) segments use the V10 writer
+ index = segmentGenerator.generateV10(
+ dataSegment,
+ schema.getGeneratorSchemaInfo(),
+ schema.getDimensionsSpec(),
+ schema.getTransformSpec(),
+ getIndexSpec(),
+ schema.getQueryGranularity(),
+ schema.getClusterSpec(),
+ rowsPerSegment,
+ CalciteTests.getJsonMapper()
+ );
+ } else {
+ index = segmentGenerator.generate(
+ dataSegment,
+ schema.getGeneratorSchemaInfo(),
+ schema.getDimensionsSpec(),
+ schema.getTransformSpec(),
+ getIndexSpec(),
+ schema.getQueryGranularity(),
+ schema.getProjections(),
+ rowsPerSegment,
+ CalciteTests.getJsonMapper()
+ );
+ }
log.info(
"Segment metadata: %s",
CalciteTests.getJsonMapper().writerWithDefaultPrettyPrinter().writeValueAsString(index.getMetadata())
@@ -373,7 +389,7 @@ public class SqlBaseBenchmark
}
}
- private void checkIncompatibleParameters()
+ protected void checkIncompatibleParameters()
{
// we only support NONE object storage encoding for auto column with mmap segments
if (ObjectStorageEncoding.NONE.equals(jsonObjectStorageEncoding)) {
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlBenchmarkDatasets.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlBenchmarkDatasets.java
index aabce633e1c..aac9dca0ac3 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlBenchmarkDatasets.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlBenchmarkDatasets.java
@@ -22,10 +22,13 @@ package org.apache.druid.benchmark.query;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import org.apache.druid.data.input.impl.AggregateProjectionSpec;
+import org.apache.druid.data.input.impl.ClusteredValueGroupsBaseTableProjectionSpec;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.query.aggregation.AggregatorFactory;
@@ -36,6 +39,7 @@ import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchAg
import org.apache.druid.query.aggregation.datasketches.theta.SketchMergeAggregatorFactory;
import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.segment.AutoTypeColumnSchema;
+import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.generator.GeneratorBasicSchemas;
import org.apache.druid.segment.generator.GeneratorSchemaInfo;
import org.apache.druid.segment.nested.NestedCommonFormatColumnFormatSpec;
@@ -45,6 +49,8 @@ import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.joda.time.Interval;
+import javax.annotation.Nullable;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -62,6 +68,9 @@ public class SqlBenchmarkDatasets
public static String DATASKETCHES = "datasketches";
public static String PROJECTIONS = "projections";
public static String GROUPER = "grouper";
+ // clustering datasets are not registered statically; they are built on demand by getSchema() from a structured
+ // datasource name (see clusteringDatasource()) so benchmarks can sweep clustering cardinality and layout.
+ public static String CLUSTERING = "clustering";
// initialize all benchmark dataset schemas to feed the data generators when running benchmarks, add any additional
// datasets to this initializer as needed and they will be available to any benchmarks at the table name added here
@@ -319,9 +328,113 @@ public class SqlBenchmarkDatasets
public static BenchmarkSchema getSchema(String dataset)
{
+ if (dataset.startsWith(CLUSTERING + "_")) {
+ return makeClusteringSchema(dataset);
+ }
return DATASET_SCHEMAS.get(dataset);
}
+ /**
+ * Builds the structured datasource (table) name for a clustering benchmark dataset, encoding the segment layout and
+ * the clustering parameters so {@link #getSchema(String)} can reconstruct the {@link BenchmarkSchema} on demand. The
+ * encoded params keep the shared benchmark setup + static-map architecture intact while still allowing the
+ * clustering cardinality and number of clustering columns to be swept as JMH parameters.
+ * <p>
+ * {@code layout} is one of {@code CLUSTERED}, {@code UNCLUSTERED}, or {@code TIME_ORDERED} (see
+ * {@link #makeClusteringSchema}).
+ */
+ public static String clusteringDatasource(String layout, int clusteringCardinality, int numClusteringColumns)
+ {
+ return StringUtils.format(
+ "%s_%s_c%d_k%d",
+ CLUSTERING,
+ layout,
+ clusteringCardinality,
+ numClusteringColumns
+ );
+ }
+
+ /**
+ * Builds a clustering {@link BenchmarkSchema} from a datasource name produced by
+ * {@link #clusteringDatasource(String, int, int)}. The same generated data is laid out three ways for comparison:
+ * <ul>
+ * <li>{@code CLUSTERED}: V10 clustered base-table segment, partitioned into per-clustering-value groups, ordered
+ * clustering columns → {@code __time} → non-clustering columns (the intended real-world layout);</li>
+ * <li>{@code UNCLUSTERED}: V10 regular segment with the <em>same</em> clustering-first ordering but no clustered
+ * grouping — isolates the cost/benefit of the clustered grouping mechanism at a fixed sort order;</li>
+ * <li>{@code TIME_ORDERED}: regular (V9) segment with the production-style {@code __time}-first ordering
+ * ({@code __time} → clustering columns → non-clustering columns) — the "what we run today"
+ * baseline the clustered layout is being compared against.</li>
+ * </ul>
+ */
+ private static BenchmarkSchema makeClusteringSchema(String dataset)
+ {
+ // Encoded as clustering_<LAYOUT>_c<card>_k<k>. The layout may itself contain underscores (e.g. TIME_ORDERED), so
+ // parse the trailing c<card>/k<k> positionally and treat everything between the prefix and them as the layout.
+ final String[] parts = dataset.split("_");
+ if (parts.length < 4) {
+ throw new IAE("Malformed clustering datasource[%s]", dataset);
+ }
+ final int numClusteringColumns = Integer.parseInt(parts[parts.length - 1].substring(1));
+ final int clusteringCardinality = Integer.parseInt(parts[parts.length - 2].substring(1));
+ final String layout = String.join("_", Arrays.copyOfRange(parts, 1, parts.length - 2));
+
+ final GeneratorSchemaInfo schemaInfo = GeneratorBasicSchemas.makeClusteringSchemaInfo(clusteringCardinality);
+ // generated data dimensions in schema order: [clusterKey1, clusterKey2, dimSecondary, valueLong, valueDouble]
+ final List<DimensionSchema> dataDimensions = schemaInfo.getDimensionsSpec().getDimensions();
+
+ if ("TIME_ORDERED".equals(layout)) {
+ // Regular time-ordered segment: __time first, then the columns in schema order. forceSegmentSortByTime defaults
+ // to true, so schemaInfo.getDimensionsSpec() already yields the __time-first ordering.
+ return new BenchmarkSchema(
+ Collections.singletonList(makeSegment(dataset, schemaInfo.getDataInterval())),
+ schemaInfo,
+ TransformSpec.NONE,
+ schemaInfo.getDimensionsSpec(),
+ new AggregatorFactory[0],
+ Collections.emptyList(),
+ Granularities.NONE,
+ null,
+ false
+ );
+ }
+
+ final boolean clustered = "CLUSTERED".equals(layout);
+ if (!clustered && !"UNCLUSTERED".equals(layout)) {
+ throw new IAE("Unknown clustering layout[%s] in datasource[%s]", layout, dataset);
+ }
+
+ // Ordered columns: clustering prefix, then __time, then the remaining non-clustering columns (clustering columns
+ // -> time -> non-clustering), matching the intended real-world clustered schema ordering.
+ final List<DimensionSchema> columns = new ArrayList<>(dataDimensions.size() + 1);
+ columns.addAll(dataDimensions.subList(0, numClusteringColumns));
+ columns.add(new LongDimensionSchema(ColumnHolder.TIME_COLUMN_NAME));
+ columns.addAll(dataDimensions.subList(numClusteringColumns, dataDimensions.size()));
+ final List<String> clusteringColumns = dataDimensions.subList(0, numClusteringColumns)
+ .stream()
+ .map(DimensionSchema::getName)
+ .collect(Collectors.toList());
+
+ final ClusteredValueGroupsBaseTableProjectionSpec clusterSpec =
+ ClusteredValueGroupsBaseTableProjectionSpec.builder()
+ .columns(columns)
+ .clusteringColumns(clusteringColumns)
+ .build();
+
+ return new BenchmarkSchema(
+ Collections.singletonList(makeSegment(dataset, schemaInfo.getDataInterval())),
+ schemaInfo,
+ TransformSpec.NONE,
+ // clustering columns -> __time -> non-clustering, with forceSegmentSortByTime=false
+ clusterSpec.getDimensionsSpec(),
+ new AggregatorFactory[0],
+ Collections.emptyList(),
+ Granularities.NONE,
+ clustered ? clusterSpec : null,
+ true
+ );
+ }
+
private static DataSegment makeSegment(String datasource, Interval interval)
{
return makeSegment(datasource, interval, 0);
@@ -352,6 +465,17 @@ public class SqlBenchmarkDatasets
private final AggregatorFactory[] aggregators;
private final Granularity queryGranularity;
private final List<AggregateProjectionSpec> projections;
+ /**
+ * Optional clustered base-table spec. When non-null the segment is built as clustered value groups. Only used by
+ * the clustering benchmark; null for all classic datasets.
+ */
+ @Nullable
+ private final ClusteredValueGroupsBaseTableProjectionSpec clusterSpec;
+ /**
+ * Whether to build this dataset's segments with the V10 writer (required for clustered segments, and used for the
+ * unclustered comparison layout so the two are an apples-to-apples comparison). False for all classic datasets.
+ */
+ private final boolean useV10;
public BenchmarkSchema(
List<DataSegment> dataSegments,
@@ -362,6 +486,31 @@ public class SqlBenchmarkDatasets
List<AggregateProjectionSpec> projections,
Granularity queryGranularity
)
+ {
+ this(
+ dataSegments,
+ generatorSchemaInfo,
+ transformSpec,
+ dimensionSpec,
+ aggregators,
+ projections,
+ queryGranularity,
+ null,
+ false
+ );
+ }
+
+ public BenchmarkSchema(
+ List<DataSegment> dataSegments,
+ GeneratorSchemaInfo generatorSchemaInfo,
+ TransformSpec transformSpec,
+ DimensionsSpec dimensionSpec,
+ AggregatorFactory[] aggregators,
+ List<AggregateProjectionSpec> projections,
+ Granularity queryGranularity,
+ @Nullable ClusteredValueGroupsBaseTableProjectionSpec clusterSpec,
+ boolean useV10
+ )
{
this.dataSegments = dataSegments;
this.generatorSchemaInfo = generatorSchemaInfo;
@@ -370,6 +519,8 @@ public class SqlBenchmarkDatasets
this.aggregators = aggregators;
this.queryGranularity = queryGranularity;
this.projections = projections;
+ this.clusterSpec = clusterSpec;
+ this.useV10 = useV10;
}
public List<DataSegment> getDataSegments()
@@ -407,6 +558,17 @@ public class SqlBenchmarkDatasets
return projections;
}
+ @Nullable
+ public ClusteredValueGroupsBaseTableProjectionSpec getClusterSpec()
+ {
+ return clusterSpec;
+ }
+
+ public boolean isUseV10()
+ {
+ return useV10;
+ }
+
public BenchmarkSchema convertDimensions(boolean convertToAuto, NestedCommonFormatColumnFormatSpec columnFormatSpec)
{
return new SqlBenchmarkDatasets.BenchmarkSchema(
@@ -434,7 +596,9 @@ public class SqlBenchmarkDatasets
)
.build()
).collect(Collectors.toList()),
- queryGranularity
+ queryGranularity,
+ clusterSpec,
+ useV10
);
}
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlClusteredSegmentsBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlClusteredSegmentsBenchmark.java
new file mode 100644
index 00000000000..cd6b2cc4985
--- /dev/null
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlClusteredSegmentsBenchmark.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.benchmark.query;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.java.util.common.StringUtils;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.List;
+
+/**
+ * Benchmarks queries against clustered base-table segments versus equivalent non-clustered segments, over the same
+ * generated data. The {@link #segmentLayout} parameter selects one of three layouts:
+ * <ul>
+ * <li>{@code CLUSTERED}: V10 clustered value-groups segment, ordered clustering columns → {@code __time} →
+ * non-clustering columns;</li>
+ * <li>{@code UNCLUSTERED}: V10 regular segment with the same clustering-first ordering but no clustered grouping
+ * (isolates the clustered grouping mechanism from the sort order);</li>
+ * <li>{@code TIME_ORDERED}: regular time-ordered segment ({@code __time} → clustering columns →
+ * non-clustering columns)</li>
+ * </ul>
+ * The clustering-column cardinality ({@link #clusteringCardinality}) is parameterized so the workload can be measured
+ * as the number of value groups grows. {@link #numClusteringColumns} defaults to a single value (cluster only on
+ * {@code clusterKey1}); set it to {@code "2"} to additionally cluster on the fixed low-cardinality {@code clusterKey2}.
+ */
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+public class SqlClusteredSegmentsBenchmark extends SqlBaseQueryBenchmark
+{
+ // %s is the datasource (table) name, filled in from the current parameters
+ private static final List<String> QUERIES = ImmutableList.of(
+ // 0: equality filter on the clustering column (clustered groups can prune non-matching groups)
+ "SELECT SUM(valueLong) FROM %s WHERE clusterKey1 = '3'",
+ // 1: group by the clustering column.
+ "SELECT clusterKey1, SUM(valueLong) FROM %s GROUP BY 1 ORDER BY 2",
+ // 2: filter on the clustering column + group by a secondary (higher cardinality) column
+ "SELECT dimSecondary, SUM(valueLong) FROM %s WHERE clusterKey1 = '3' GROUP BY 1 ORDER BY 2",
+ // 3: no-filter full aggregate (full scan, no pruning) as an overhead baseline
+ "SELECT SUM(valueLong) FROM %s"
+ );
+
+ @Param({
+ "CLUSTERED",
+ "UNCLUSTERED",
+ "TIME_ORDERED"
+ })
+ private String segmentLayout;
+
+ @Param({
+ "10",
+ "40",
+ "160"
+ })
+ private int clusteringCardinality;
+
+ @Param({
+ "1"
+ })
+ private int numClusteringColumns;
+
+ @Param({
+ "0",
+ "1",
+ "2",
+ "3"
+ })
+ private int query;
+
+ private String datasource()
+ {
+ return SqlBenchmarkDatasets.clusteringDatasource(
+ segmentLayout,
+ clusteringCardinality,
+ numClusteringColumns
+ );
+ }
+
+ @Override
+ public String getQuery()
+ {
+ return StringUtils.format(QUERIES.get(query), datasource());
+ }
+
+ @Override
+ public List<String> getDatasources()
+ {
+ return ImmutableList.of(datasource());
+ }
+
+ @Override
+ protected void checkIncompatibleParameters()
+ {
+ // the clustered-vs-unclustered comparison is about persisted (mmap) V10 segment layout; skip the other
+ // storage / schema / encoding / compression combinations the base benchmark sweeps so the matrix stays focused.
+ if (storageType != BenchmarkStorage.MMAP
+ || !"explicit".equals(schemaType)
+ || stringEncoding != BenchmarkStringEncodingStrategy.UTF8
+ || !"none".equals(complexCompression)) {
+ System.exit(0);
+ }
+ super.checkIncompatibleParameters();
+ }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorFactory.java b/processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorFactory.java
index 4acd2d4e28a..1d752cb6686 100644
--- a/processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorFactory.java
+++ b/processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorFactory.java
@@ -38,6 +38,8 @@ import org.apache.druid.segment.projections.ClusteringColumnSelectorFactory;
import org.apache.druid.segment.projections.ClusteringVectorColumnSelectorFactory;
import org.apache.druid.segment.projections.Projections;
import org.apache.druid.segment.projections.QueryableProjection;
+import org.apache.druid.segment.projections.SingleGroupClusteringColumnSelectorFactory;
+import org.apache.druid.segment.projections.SingleGroupClusteringVectorColumnSelectorFactory;
import org.apache.druid.segment.projections.TableClusterGroupSpec;
import org.apache.druid.segment.vector.ConcatenatingVectorCursor;
import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
@@ -213,7 +215,7 @@ public class QueryableIndexCursorFactory implements CursorFactory
Offset baseOffset
)
{
- return new ClusteringColumnSelectorFactory(
+ return new SingleGroupClusteringColumnSelectorFactory(
super.makeColumnSelectorFactoryForOffset(columnCache, baseOffset),
valueGroup.getSummary().getClusteringColumns(),
valueGroup.lookupClusteringValues()
@@ -226,7 +228,7 @@ public class QueryableIndexCursorFactory implements CursorFactory
VectorOffset baseOffset
)
{
- return new ClusteringVectorColumnSelectorFactory(
+ return new SingleGroupClusteringVectorColumnSelectorFactory(
super.makeVectorColumnSelectorFactoryForOffset(columnCache, baseOffset),
valueGroup.getSummary().getClusteringColumns(),
valueGroup.lookupClusteringValues()
diff --git a/processing/src/main/java/org/apache/druid/segment/generator/DataGenerator.java b/processing/src/main/java/org/apache/druid/segment/generator/DataGenerator.java
index 1f0839ca684..9e8e23e524d 100644
--- a/processing/src/main/java/org/apache/druid/segment/generator/DataGenerator.java
+++ b/processing/src/main/java/org/apache/druid/segment/generator/DataGenerator.java
@@ -19,9 +19,7 @@
package org.apache.druid.segment.generator;
-import com.google.common.base.Function;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.TimestampSpec;
@@ -134,22 +132,14 @@ public class DataGenerator
}
}
- columnGenerators = new ArrayList<>();
- columnGenerators.addAll(
- Lists.transform(
- columnSchemas,
- new Function<>()
- {
- @Override
- public ColumnValueGenerator apply(
- GeneratorColumnSchema input
- )
- {
- return input.makeGenerator(seed);
- }
- }
- )
- );
+ // Seed each column with a distinct seed (offset by its position) rather than the same seed for every column.
+ // Columns that use the same underlying distribution implementation would otherwise draw from identically-seeded
+ // RNGs in lockstep and become almost perfectly correlated, which makes cross-column filters + group-bys collapse
+ // to a single group. Offsetting the seed per column keeps the columns independent while remaining deterministic.
+ columnGenerators = new ArrayList<>(columnSchemas.size());
+ for (int i = 0; i < columnSchemas.size(); i++) {
+ columnGenerators.add(columnSchemas.get(i).makeGenerator(seed + i));
+ }
return this;
}
diff --git a/processing/src/main/java/org/apache/druid/segment/generator/GeneratorBasicSchemas.java b/processing/src/main/java/org/apache/druid/segment/generator/GeneratorBasicSchemas.java
index ef2bb096cb9..49afe94c667 100644
--- a/processing/src/main/java/org/apache/druid/segment/generator/GeneratorBasicSchemas.java
+++ b/processing/src/main/java/org/apache/druid/segment/generator/GeneratorBasicSchemas.java
@@ -551,4 +551,34 @@ public class GeneratorBasicSchemas
}
public static final Map<String, GeneratorSchemaInfo> SCHEMA_MAP = SCHEMA_INFO_BUILDER.build();
+
+ /**
+ * Low-cardinality testbench for clustered segments. Every column is a single-valued, non-rollup, non-metric
+ * dimension (clustered base tables have no metrics and no rollup). The primary clustering column {@code clusterKey1}
+ * has its cardinality controlled by {@code clusteringCardinality} so benchmarks can sweep how clustering behaves as
+ * the number of value groups grows; {@code clusterKey2} is a second, fixed low-cardinality string available as an
+ * optional second clustering column. {@code dimSecondary} is a higher-cardinality dimension for group-by workloads,
+ * and {@code valueLong}/{@code valueDouble} are numeric measures to aggregate at query time.
+ */
+ public static GeneratorSchemaInfo makeClusteringSchemaInfo(int clusteringCardinality)
+ {
+ final List<GeneratorColumnSchema> columns = ImmutableList.of(
+ // primary clustering key, parameterized cardinality
+ GeneratorColumnSchema.makeDiscreteUniform("clusterKey1", ValueType.STRING, false, 1, null, 1, clusteringCardinality),
+ // secondary clustering key, fixed low cardinality
+ GeneratorColumnSchema.makeDiscreteUniform("clusterKey2", ValueType.STRING, false, 1, null, 1, 8),
+ // higher cardinality dimension for group-by
+ GeneratorColumnSchema.makeZipf("dimSecondary", ValueType.STRING, false, 1, null, 1, 1000, 1.0),
+ // numeric columns (dimensions, since clustered segments have no metrics)
+ GeneratorColumnSchema.makeZipf("valueLong", ValueType.LONG, false, 1, null, 0, 10000, 1.0),
+ GeneratorColumnSchema.makeZipf("valueDouble", ValueType.DOUBLE, false, 1, null, 0, 1000, 1.0)
+ );
+
+ return new GeneratorSchemaInfo(
+ columns,
+ Collections.emptyList(),
+ Intervals.of("2000-01-01/P1D"),
+ false
+ );
+ }
}
diff --git a/processing/src/main/java/org/apache/druid/segment/projections/ClusteringColumnSelectorFactory.java b/processing/src/main/java/org/apache/druid/segment/projections/ClusteringColumnSelectorFactory.java
index 6e9ab3126a7..0c5b9eacdf4 100644
--- a/processing/src/main/java/org/apache/druid/segment/projections/ClusteringColumnSelectorFactory.java
+++ b/processing/src/main/java/org/apache/druid/segment/projections/ClusteringColumnSelectorFactory.java
@@ -280,7 +280,11 @@ public class ClusteringColumnSelectorFactory implements ColumnSelectorFactory
@Override
public int getValueCardinality()
{
- return currentSelector().getValueCardinality();
+ // The per-group constant selector reports cardinality 1 and always returns id 0, but that id is NOT stable
+ // across the concatenating cursor: id 0 resolves to a different clustering value in each group. Forwarding it
+ // would let the group-by engine take the dictionary-id-keyed (array) path and silently conflate every group
+ // into the single id-0 bucket.
+ return DimensionDictionarySelector.CARDINALITY_UNKNOWN;
}
@Nullable
@@ -293,14 +297,14 @@ public class ClusteringColumnSelectorFactory implements ColumnSelectorFactory
@Override
public boolean nameLookupPossibleInAdvance()
{
- return currentSelector().nameLookupPossibleInAdvance();
+ return false;
}
@Nullable
@Override
public IdLookup idLookup()
{
- return currentSelector().idLookup();
+ return null;
}
@Nullable
diff --git a/processing/src/main/java/org/apache/druid/segment/projections/SingleGroupClusteringColumnSelectorFactory.java b/processing/src/main/java/org/apache/druid/segment/projections/SingleGroupClusteringColumnSelectorFactory.java
new file mode 100644
index 00000000000..aada6b78816
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/projections/SingleGroupClusteringColumnSelectorFactory.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.projections;
+
+import org.apache.druid.error.DruidException;
+import org.apache.druid.math.expr.Evals;
+import org.apache.druid.math.expr.ExprEval;
+import org.apache.druid.math.expr.ExpressionType;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.ConstantExprEvalSelector;
+import org.apache.druid.segment.DimensionSelector;
+import org.apache.druid.segment.RowIdSupplier;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.column.ValueType;
+
+import javax.annotation.Nullable;
+
+/**
+ * Single-cluster-group counterpart of {@link ClusteringColumnSelectorFactory}, used when a query prunes a clustered
+ * base-table segment to a single cluster group (see {@link org.apache.druid.segment.QueryableIndexCursorFactory}). Non-clustering columns are delegated directly to
+ * the per-group factory; clustering columns, which are not physically stored in the per-group data, are surfaced as
+ * constants from the group's clustering tuple.
+ */
+public class SingleGroupClusteringColumnSelectorFactory implements ColumnSelectorFactory
+{
+ private final ColumnSelectorFactory delegate;
+ private final RowSignature clusteringColumns;
+ private final Object[] clusteringValues;
+
+ public SingleGroupClusteringColumnSelectorFactory(
+ ColumnSelectorFactory delegate,
+ RowSignature clusteringColumns,
+ Object[] clusteringValues
+ )
+ {
+ if (clusteringValues == null || clusteringValues.length != clusteringColumns.size()) {
+ throw DruidException.defensive(
+ "clusteringValues length [%s] must match clusteringColumns size [%s]",
+ clusteringValues == null ? "null" : clusteringValues.length,
+ clusteringColumns.size()
+ );
+ }
+ this.delegate = delegate;
+ this.clusteringColumns = clusteringColumns;
+ this.clusteringValues = clusteringValues;
+ }
+
+ @Override
+ public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec)
+ {
+ final int idx = clusteringColumns.indexOf(dimensionSpec.getDimension());
+ if (idx < 0) {
+ return delegate.makeDimensionSelector(dimensionSpec);
+ }
+ final Object raw = clusteringValues[idx];
+ return DimensionSelector.constant(Evals.asString(raw), dimensionSpec.getExtractionFn());
+ }
+
+ @Override
+ public ColumnValueSelector makeColumnValueSelector(String columnName)
+ {
+ final int idx = clusteringColumns.indexOf(columnName);
+ if (idx < 0) {
+ return delegate.makeColumnValueSelector(columnName);
+ }
+ return new ConstantClusteringValueSelector(clusteringColumns.getColumnType(idx).orElseThrow(), clusteringValues[idx]);
+ }
+
+ @Nullable
+ @Override
+ public ColumnCapabilities getColumnCapabilities(String column)
+ {
+ final int idx = clusteringColumns.indexOf(column);
+ if (idx < 0) {
+ return delegate.getColumnCapabilities(column);
+ }
+ final ColumnType type = clusteringColumns.getColumnType(idx).orElseThrow();
+ if (type.is(ValueType.STRING)) {
+ return ColumnCapabilitiesImpl.createSimpleSingleValueStringColumnCapabilities()
+ .setDictionaryEncoded(true)
+ .setDictionaryValuesSorted(true)
+ .setDictionaryValuesUnique(true);
+ }
+ return ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(type);
+ }
+
+ @Nullable
+ @Override
+ public RowIdSupplier getRowIdSupplier()
+ {
+ return delegate.getRowIdSupplier();
+ }
+
+ /**
+ * Constant value selector for a clustering column's group-constant typed value.
+ */
+ private static final class ConstantClusteringValueSelector implements ColumnValueSelector<Object>
+ {
+ private final ConstantExprEvalSelector inner;
+
+ private ConstantClusteringValueSelector(ColumnType columnType, Object value)
+ {
+ this.inner = new ConstantExprEvalSelector(ExprEval.ofType(ExpressionType.fromColumnTypeStrict(columnType), value));
+ }
+
+ @Override
+ public double getDouble()
+ {
+ return inner.getDouble();
+ }
+
+ @Override
+ public float getFloat()
+ {
+ return inner.getFloat();
+ }
+
+ @Override
+ public long getLong()
+ {
+ return inner.getLong();
+ }
+
+ @Override
+ public boolean isNull()
+ {
+ return inner.isNull();
+ }
+
+ @Nullable
+ @Override
+ public Object getObject()
+ {
+ return inner.getObject().value();
+ }
+
+ @Override
+ public Class<Object> classOfObject()
+ {
+ return Object.class;
+ }
+
+ @Override
+ public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+ {
+ inner.inspectRuntimeShape(inspector);
+ }
+ }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/projections/SingleGroupClusteringVectorColumnSelectorFactory.java b/processing/src/main/java/org/apache/druid/segment/projections/SingleGroupClusteringVectorColumnSelectorFactory.java
new file mode 100644
index 00000000000..8fd8dbd41ed
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/projections/SingleGroupClusteringVectorColumnSelectorFactory.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.projections;
+
+import org.apache.druid.error.DruidException;
+import org.apache.druid.math.expr.Evals;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.vector.ConstantVectorSelectors;
+import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
+import org.apache.druid.segment.vector.ReadableVectorInspector;
+import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
+import org.apache.druid.segment.vector.VectorObjectSelector;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+
+/**
+ * Vectorized counterpart of {@link SingleGroupClusteringColumnSelectorFactory}: the single-cluster-group fast path for
+ * a clustered base-table segment.
+ * <p>
+ * Non-clustering columns delegate directly to the per-group {@link VectorColumnSelectorFactory}. Clustering columns
+ * are surfaced as constant vector selectors.
+ * <p>
+ * Each clustering column holds one value for the whole group. Accordingly, {@link #getColumnCapabilities} advertises:
+ * <ul>
+ *   <li>string clustering columns as single-valued, dictionary-encoded columns with a sorted, unique dictionary;</li>
+ *   <li>every other clustering column as a simple numeric column of its declared type.</li>
+ * </ul>
+ */
+public class SingleGroupClusteringVectorColumnSelectorFactory implements VectorColumnSelectorFactory
+{
+  private final VectorColumnSelectorFactory delegate;
+  private final RowSignature clusteringColumns;
+  private final Object[] clusteringValues;
+
+  /**
+   * @param delegate          selector factory for the group's rows; serves every non-clustering column
+   * @param clusteringColumns names and types of the clustering columns
+   * @param clusteringValues  the group's constant value for each clustering column, positionally aligned with
+   *                          {@code clusteringColumns}
+   *
+   * @throws DruidException if {@code clusteringValues} is null or its length differs from the size of
+   *                        {@code clusteringColumns}
+   */
+  public SingleGroupClusteringVectorColumnSelectorFactory(
+      VectorColumnSelectorFactory delegate,
+      RowSignature clusteringColumns,
+      Object[] clusteringValues
+  )
+  {
+    if (clusteringValues == null || clusteringValues.length != clusteringColumns.size()) {
+      throw DruidException.defensive(
+          "clusteringValues length [%s] must match clusteringColumns size [%s]",
+          clusteringValues == null ? "null" : clusteringValues.length,
+          clusteringColumns.size()
+      );
+    }
+    this.delegate = delegate;
+    this.clusteringColumns = clusteringColumns;
+    this.clusteringValues = clusteringValues;
+  }
+
+  @Override
+  public ReadableVectorInspector getReadableVectorInspector()
+  {
+    return delegate.getReadableVectorInspector();
+  }
+
+  @Override
+  public int getMaxVectorSize()
+  {
+    return delegate.getMaxVectorSize();
+  }
+
+  /**
+   * Non-clustering columns are delegated. Clustering columns get a constant selector whose value is the group's
+   * clustering value converted with {@link Evals#asString}.
+   */
+  @Override
+  public SingleValueDimensionVectorSelector makeSingleValueDimensionSelector(DimensionSpec dimensionSpec)
+  {
+    final int idx = clusteringColumns.indexOf(dimensionSpec.getDimension());
+    if (idx < 0) {
+      return delegate.makeSingleValueDimensionSelector(dimensionSpec);
+    }
+    final Object raw = clusteringValues[idx];
+    return ConstantVectorSelectors.singleValueDimensionVectorSelector(
+        delegate.getReadableVectorInspector(),
+        Evals.asString(raw)
+    );
+  }
+
+  /**
+   * Non-clustering columns are delegated. Clustering columns are rejected: within a single group each holds exactly
+   * one value, and string clustering columns are advertised as single-valued by {@link #getColumnCapabilities}.
+   * Callers should use {@link #makeSingleValueDimensionSelector} for them instead.
+   *
+   * @throws DruidException if {@code dimensionSpec} names a clustering column
+   */
+  @Override
+  public MultiValueDimensionVectorSelector makeMultiValueDimensionSelector(DimensionSpec dimensionSpec)
+  {
+    if (clusteringColumns.indexOf(dimensionSpec.getDimension()) < 0) {
+      return delegate.makeMultiValueDimensionSelector(dimensionSpec);
+    }
+    // The old message claimed the column was "not dictionary-encoded", which contradicts getColumnCapabilities; the
+    // actual reason no multi-value selector is offered is that the clustering column is single-valued.
+    throw DruidException.defensive(
+        "clustering column [%s] is single-valued; no multi-value dimension vector selector",
+        dimensionSpec.getDimension()
+    );
+  }
+
+  /**
+   * Non-clustering columns are delegated. Clustering columns get a constant numeric selector.
+   */
+  @Override
+  public VectorValueSelector makeValueSelector(String column)
+  {
+    final int idx = clusteringColumns.indexOf(column);
+    if (idx < 0) {
+      return delegate.makeValueSelector(column);
+    }
+    final Object raw = clusteringValues[idx];
+    // Non-numeric clustering values (e.g. strings) are handed to the constant selector as a null number.
+    final Number number = (raw instanceof Number) ? (Number) raw : null;
+    return ConstantVectorSelectors.vectorValueSelector(delegate.getReadableVectorInspector(), number);
+  }
+
+  /**
+   * Non-clustering columns are delegated. Clustering columns get a constant object selector over the raw clustering
+   * value.
+   */
+  @Override
+  public VectorObjectSelector makeObjectSelector(String column)
+  {
+    final int idx = clusteringColumns.indexOf(column);
+    if (idx < 0) {
+      return delegate.makeObjectSelector(column);
+    }
+    return ConstantVectorSelectors.vectorObjectSelector(delegate.getReadableVectorInspector(), clusteringValues[idx]);
+  }
+
+  /**
+   * Non-clustering columns report the delegate's capabilities.
+   * <p>
+   * A string clustering column is reported as single-valued and dictionary-encoded, with sorted, unique values. A
+   * single group's value forms a one-entry dictionary, so this lets grouping use the dictionary-id path. Any other
+   * clustering column is reported as a simple numeric column of its declared type.
+   */
+  @Nullable
+  @Override
+  public ColumnCapabilities getColumnCapabilities(String column)
+  {
+    final int idx = clusteringColumns.indexOf(column);
+    if (idx < 0) {
+      return delegate.getColumnCapabilities(column);
+    }
+    final ColumnType type = clusteringColumns.getColumnType(idx).orElseThrow();
+    if (type.is(ValueType.STRING)) {
+      return ColumnCapabilitiesImpl.createSimpleSingleValueStringColumnCapabilities()
+                                   .setDictionaryEncoded(true)
+                                   .setDictionaryValuesSorted(true)
+                                   .setDictionaryValuesUnique(true);
+    }
+    return ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(type);
+  }
+}
diff --git
a/processing/src/test/java/org/apache/druid/segment/QueryableIndexCursorFactoryClusteredTest.java
b/processing/src/test/java/org/apache/druid/segment/QueryableIndexCursorFactoryClusteredTest.java
index be9dfc28dab..8f58fac9044 100644
---
a/processing/src/test/java/org/apache/druid/segment/QueryableIndexCursorFactoryClusteredTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/QueryableIndexCursorFactoryClusteredTest.java
@@ -649,6 +649,65 @@ class QueryableIndexCursorFactoryClusteredTest
Assertions.assertEquals(3L, ((Number) results.get(0).get(1)).longValue());
}
+  @Test
+  void testGroupByOnNonClusteringColumnWithinSingleGroup()
+  {
+    // Restrict to one cluster group (tenant=acme) and group on `region`, which is not a clustering column.
+    // This drives the single-group cursor holder together with SingleGroupClusteringColumnSelectorFactory end-to-end.
+    // Expectations:
+    //  - regions are grouped correctly using the group's own (stable) dictionary;
+    //  - the filtered-out globex group contributes to no bucket.
+    segmentIndex = buildSegment(
+        List.of(
+            row("acme", "2025-01-01T00:00:00", "us-east-1"),
+            row("acme", "2025-01-01T00:10:00", "us-east-1"),
+            row("acme", "2025-01-01T01:00:00", "us-west-2"),
+            row("globex", "2025-01-01T00:30:00", "eu-west-1")
+        )
+    );
+    final QueryableIndexTimeBoundaryInspector timeBoundaryInspector =
+        QueryableIndexTimeBoundaryInspector.create(segmentIndex);
+    final QueryableIndexCursorFactory factory = new QueryableIndexCursorFactory(segmentIndex, timeBoundaryInspector);
+
+    final EqualityFilter acmeOnly = new EqualityFilter("tenant", ColumnType.STRING, "acme", null);
+    final GroupByQuery query = GroupByQuery.builder()
+                                           .setDataSource("test")
+                                           .setGranularity(Granularities.ALL)
+                                           .setInterval(Intervals.ETERNITY)
+                                           .setDimFilter(acmeOnly)
+                                           .addDimension("region")
+                                           .addOrderByColumn("region")
+                                           .setAggregatorSpecs(new CountAggregatorFactory("count"))
+                                           .build();
+    final List<ResultRow> results =
+        groupingEngine.process(query, factory, null, nonBlockingPool, null).toList();
+
+    // Each entry is {region, expected count}, in the query's order-by order.
+    final Object[][] expected = {{"us-east-1", 2L}, {"us-west-2", 1L}};
+    Assertions.assertEquals(expected.length, results.size());
+    for (int i = 0; i < expected.length; i++) {
+      final ResultRow resultRow = results.get(i);
+      Assertions.assertEquals(expected[i][0], resultRow.get(0));
+      Assertions.assertEquals(((Long) expected[i][1]).longValue(), ((Number) resultRow.get(1)).longValue());
+    }
+  }
+
+  @Test
+  void testGroupByClusteringColumnWithinSingleGroup()
+  {
+    // Restrict to one cluster group, then group on the clustering column itself. The single-group factory reports
+    // the clustering column as a one-entry dictionary, so grouping takes the dictionary-id path over the constant
+    // clustering selector. Expect a single bucket holding the constant value and the group's full count.
+    segmentIndex = standardTwoGroup();
+    final QueryableIndexCursorFactory factory = new QueryableIndexCursorFactory(
+        segmentIndex,
+        QueryableIndexTimeBoundaryInspector.create(segmentIndex)
+    );
+    final EqualityFilter acmeOnly = new EqualityFilter("tenant", ColumnType.STRING, "acme", null);
+    final CountAggregatorFactory countAggregator = new CountAggregatorFactory("count");
+    final GroupByQuery query = GroupByQuery.builder()
+                                           .setDataSource("test")
+                                           .setGranularity(Granularities.ALL)
+                                           .setInterval(Intervals.ETERNITY)
+                                           .setDimFilter(acmeOnly)
+                                           .addDimension("tenant")
+                                           .setAggregatorSpecs(countAggregator)
+                                           .build();
+
+    final List<ResultRow> results =
+        groupingEngine.process(query, factory, null, nonBlockingPool, null).toList();
+    Assertions.assertEquals(1, results.size());
+
+    final ResultRow onlyRow = results.get(0);
+    Assertions.assertEquals("acme", onlyRow.get(0));
+    Assertions.assertEquals(2L, ((Number) onlyRow.get(1)).longValue());
+  }
+
private static Druids.TimeseriesQueryBuilder newTimeseries()
{
return Druids.newTimeseriesQueryBuilder()
diff --git
a/processing/src/test/java/org/apache/druid/segment/UnnestCursorFactoryTest.java
b/processing/src/test/java/org/apache/druid/segment/UnnestCursorFactoryTest.java
index 6a4cadcc79e..8ede1ac203c 100644
---
a/processing/src/test/java/org/apache/druid/segment/UnnestCursorFactoryTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/UnnestCursorFactoryTest.java
@@ -836,7 +836,7 @@ public class UnnestCursorFactoryTest extends
InitializedNullHandlingTest
cursor.advance();
count++;
}
- Assert.assertEquals(count, 618);
+ Assert.assertEquals(count, 569);
}
}
diff --git
a/processing/src/test/java/org/apache/druid/segment/generator/SegmentGenerator.java
b/processing/src/test/java/org/apache/druid/segment/generator/SegmentGenerator.java
index 97a9d25afd2..968457c185e 100644
---
a/processing/src/test/java/org/apache/druid/segment/generator/SegmentGenerator.java
+++
b/processing/src/test/java/org/apache/druid/segment/generator/SegmentGenerator.java
@@ -24,6 +24,7 @@ import com.google.common.hash.Hashing;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.AggregateProjectionSpec;
+import
org.apache.druid.data.input.impl.ClusteredValueGroupsBaseTableProjectionSpec;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec;
@@ -37,6 +38,8 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import org.apache.druid.segment.BaseProgressIndicator;
import org.apache.druid.segment.IndexBuilder;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.IndexMergerV10;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.TestHelper;
@@ -321,6 +324,145 @@ public class SegmentGenerator implements Closeable
return retVal;
}
+  /**
+   * Generates a V10-format segment, used to benchmark clustered base-table segments against an equivalent unclustered
+   * layout. Mirrors {@link #generate}, but always uses {@link IndexMergerV10} and threads an optional
+   * {@link ClusteredValueGroupsBaseTableProjectionSpec}:
+   * <ul>
+   *   <li>when {@code clusterSpec} is non-null the segment is written as clustered value groups;</li>
+   *   <li>when {@code clusterSpec} is null the segment is a regular V10 segment, sorted according to
+   *   {@code dimensionsSpec}.</li>
+   * </ul>
+   * Clustered base tables have no rollup and no metrics, so this path always builds with rollup off and an empty
+   * aggregator set. {@code dimensionsSpec} is expected to include {@code __time} as the explicit (non-clustering)
+   * time position.
+   * <p>
+   * Caching:
+   * <ul>
+   *   <li>the merged segment is written to a directory keyed by a SHA-256 hash of the generation inputs;</li>
+   *   <li>if that directory already exists, the cached segment is loaded instead of being regenerated.</li>
+   * </ul>
+   * Rows are built into intermediate indexes of at most {@code MAX_ROWS_IN_MEMORY} rows, which are then merged.
+   * The intermediate indexes are closed after the merge whether or not it succeeds.
+   *
+   * @return the loaded merged segment
+   *
+   * @throws ISE             if no rows were generated (e.g. {@code numRows <= 0})
+   * @throws RuntimeException wrapping any {@link IOException} raised while loading or merging
+   */
+  public QueryableIndex generateV10(
+      final DataSegment dataSegment,
+      final GeneratorSchemaInfo schemaInfo,
+      final DimensionsSpec dimensionsSpec,
+      final TransformSpec transformSpec,
+      final IndexSpec indexSpec,
+      final Granularity queryGranularity,
+      @Nullable final ClusteredValueGroupsBaseTableProjectionSpec clusterSpec,
+      final int numRows,
+      final ObjectMapper jsonMapper
+  )
+  {
+    BuiltInTypesModule.registerHandlersAndSerde();
+
+    final String dataHash = Hashing.sha256()
+                                   .newHasher()
+                                   .putString(dataSegment.getId().toString(), StandardCharsets.UTF_8)
+                                   .putString(schemaInfo.toString(), StandardCharsets.UTF_8)
+                                   .putString(dimensionsSpec.toString(), StandardCharsets.UTF_8)
+                                   .putString(queryGranularity.toString(), StandardCharsets.UTF_8)
+                                   .putString(indexSpec.toString(), StandardCharsets.UTF_8)
+                                   .putString(transformSpec.toString(), StandardCharsets.UTF_8)
+                                   .putString(String.valueOf(clusterSpec), StandardCharsets.UTF_8)
+                                   .putString("v10", StandardCharsets.UTF_8)
+                                   .putInt(numRows)
+                                   .hash()
+                                   .toString();
+
+    final IndexIO indexIO = TestHelper.getTestIndexIO(jsonMapper, ColumnConfig.DEFAULT);
+    final File outDir = new File(getSegmentDir(dataSegment.getId(), dataHash), "merged");
+
+    if (outDir.exists()) {
+      try {
+        log.info("Found segment with hash[%s] cached in directory[%s].", dataHash, outDir);
+        return indexIO.loadIndex(outDir);
+      }
+      catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    log.info("Writing segment with hash[%s] to directory[%s].", dataHash, outDir);
+
+    final DataGenerator dataGenerator = new DataGenerator(
+        schemaInfo.getColumnSchemas(),
+        dataSegment.getId().hashCode(),
+        schemaInfo.getDataInterval(),
+        numRows
+    );
+
+    final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder()
+        .withDimensionsSpec(dimensionsSpec)
+        .withRollup(false)
+        .withQueryGranularity(queryGranularity)
+        .withClusterSpec(clusterSpec)
+        .build();
+
+    final List<InputRow> rows = new ArrayList<>();
+    final List<QueryableIndex> indexes = new ArrayList<>();
+
+    final Transformer transformer = transformSpec.toTransformer();
+    final InputRowSchema rowSchema = new InputRowSchema(
+        TimestampSpec.DEFAULT,
+        dimensionsSpec,
+        null
+    );
+
+    for (int i = 0; i < numRows; i++) {
+      final Map<String, Object> raw = dataGenerator.nextRaw();
+      final InputRow inputRow = MapInputRowParser.parse(rowSchema, raw);
+      final InputRow transformedRow = transformer.transform(inputRow);
+      rows.add(transformedRow);
+
+      if ((i + 1) % 20000 == 0) {
+        log.info("%,d/%,d rows generated for[%s].", i + 1, numRows, dataSegment);
+      }
+
+      // Flush a full batch of rows into its own intermediate index to bound memory use.
+      if (rows.size() % MAX_ROWS_IN_MEMORY == 0) {
+        indexes.add(makeIndexV10(dataSegment.getId(), dataHash, indexes.size(), rows, indexSchema, indexSpec, jsonMapper));
+        rows.clear();
+      }
+    }
+
+    log.info("%,d/%,d rows generated for[%s].", numRows, numRows, dataSegment);
+
+    if (!rows.isEmpty()) {
+      indexes.add(makeIndexV10(dataSegment.getId(), dataHash, indexes.size(), rows, indexSchema, indexSpec, jsonMapper));
+      rows.clear();
+    }
+
+    if (indexes.isEmpty()) {
+      throw new ISE("No rows to index?");
+    }
+
+    final QueryableIndex retVal;
+    try {
+      try {
+        retVal = indexIO.loadIndex(
+            new IndexMergerV10(jsonMapper, indexIO, OffHeapMemorySegmentWriteOutMediumFactory.instance())
+                .mergeQueryableIndex(
+                    indexes,
+                    false,
+                    new AggregatorFactory[0],
+                    null,
+                    outDir,
+                    indexSpec,
+                    indexSpec,
+                    new BaseProgressIndicator(),
+                    null,
+                    -1
+                )
+        );
+      }
+      finally {
+        // Release the intermediate indexes even when the merge or load fails, so a failure does not leak them.
+        for (QueryableIndex index : indexes) {
+          index.close();
+        }
+      }
+    }
+    catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    log.info("Finished writing segment[%s] to[%s]", dataSegment, outDir);
+
+    return retVal;
+  }
+
public IncrementalIndex generateIncrementalIndex(
final DataSegment dataSegment,
final GeneratorSchemaInfo schemaInfo,
@@ -475,6 +617,27 @@ public class SegmentGenerator implements Closeable
.buildMMappedIndex();
}
+  /**
+   * Builds one intermediate V10 index from {@code rows} and returns it memory-mapped.
+   * Its temporary directory is named after {@code indexNumber} and lives under the segment directory for
+   * {@code identifier} and {@code dataHash}.
+   */
+  private QueryableIndex makeIndexV10(
+      final SegmentId identifier,
+      final String dataHash,
+      final int indexNumber,
+      final List<InputRow> rows,
+      final IncrementalIndexSchema indexSchema,
+      final IndexSpec indexSpec,
+      final ObjectMapper jsonMapper
+  )
+  {
+    final File intermediateDir = new File(getSegmentDir(identifier, dataHash), String.valueOf(indexNumber));
+    final IndexBuilder builder = IndexBuilder.create(jsonMapper)
+                                             .useV10()
+                                             .schema(indexSchema)
+                                             .indexSpec(indexSpec)
+                                             .tmpDir(intermediateDir)
+                                             .segmentWriteOutMediumFactory(
+                                                 OffHeapMemorySegmentWriteOutMediumFactory.instance()
+                                             )
+                                             .rows(rows);
+    return builder.buildMMappedIndex();
+  }
+
private IncrementalIndex makeIncrementalIndex(
final SegmentId identifier,
final String dataHash,
diff --git
a/processing/src/test/java/org/apache/druid/segment/projections/ClusteringColumnSelectorFactoryTest.java
b/processing/src/test/java/org/apache/druid/segment/projections/ClusteringColumnSelectorFactoryTest.java
index 03f0b9fc395..e5181c970bb 100644
---
a/processing/src/test/java/org/apache/druid/segment/projections/ClusteringColumnSelectorFactoryTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/projections/ClusteringColumnSelectorFactoryTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.segment.projections;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.DimensionDictionarySelector;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.NilColumnValueSelector;
import org.apache.druid.segment.RowIdSupplier;
@@ -321,6 +322,42 @@ class ClusteringColumnSelectorFactoryTest
Assertions.assertEquals("region", second.lastDimSelectorName);
}
+  @Test
+  void testClusteringColumnDimensionSelectorForcesValueBasedGrouping()
+  {
+    // When a cursor concatenates several cluster groups, the constant id a clustering column exposes (always 0) means
+    // a different value in every group, so ids are not comparable across groups. The selector must therefore not
+    // advertise dictionary-encoded grouping. Otherwise the group-by engine would key on the per-group id and silently
+    // merge every group into one bucket.
+    final ClusteringColumnSelectorFactory factory =
+        new ClusteringColumnSelectorFactory(new RecordingDelegate(), SIGNATURE, new Object[]{"acme"});
+    final DimensionSelector tenantSelector = factory.makeDimensionSelector(DefaultDimensionSpec.of("tenant"));
+
+    Assertions.assertEquals(DimensionDictionarySelector.CARDINALITY_UNKNOWN, tenantSelector.getValueCardinality());
+    Assertions.assertFalse(tenantSelector.nameLookupPossibleInAdvance());
+    Assertions.assertNull(tenantSelector.idLookup());
+
+    // Values are still resolved row by row against the current group's constant.
+    final int firstId = tenantSelector.getRow().get(0);
+    Assertions.assertEquals("acme", tenantSelector.lookupName(firstId));
+  }
+
+  @Test
+  void testNonClusteringDelegatingDimensionSelectorForcesValueBasedGrouping()
+  {
+    // The non-clustering (delegating) path suffers the same cross-group id instability, so it must force value-based
+    // grouping as well. This must hold even though the delegate used here (a constant selector) would otherwise report
+    // a stable dictionary.
+    final ClusteringColumnSelectorFactory factory =
+        new ClusteringColumnSelectorFactory(new RecordingDelegate(), SIGNATURE, new Object[]{"acme"});
+    final DimensionSelector regionSelector = factory.makeDimensionSelector(DefaultDimensionSpec.of("region"));
+
+    Assertions.assertEquals(DimensionDictionarySelector.CARDINALITY_UNKNOWN, regionSelector.getValueCardinality());
+    Assertions.assertFalse(regionSelector.nameLookupPossibleInAdvance());
+    Assertions.assertNull(regionSelector.idLookup());
+  }
+
@Test
void testStringDimensionCapabilitiesFlavorIsSingleValue()
{
diff --git
a/processing/src/test/java/org/apache/druid/segment/projections/SingleGroupClusteringColumnSelectorFactoryTest.java
b/processing/src/test/java/org/apache/druid/segment/projections/SingleGroupClusteringColumnSelectorFactoryTest.java
new file mode 100644
index 00000000000..f7fc6ba1f54
--- /dev/null
+++
b/processing/src/test/java/org/apache/druid/segment/projections/SingleGroupClusteringColumnSelectorFactoryTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.projections;
+
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.DimensionSelector;
+import org.apache.druid.segment.NilColumnValueSelector;
+import org.apache.druid.segment.RowIdSupplier;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.column.ValueType;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nullable;
+
+/**
+ * Tests for {@link SingleGroupClusteringColumnSelectorFactory}:
+ * <ul>
+ *   <li>non-clustering columns must be served straight from the delegate, including its capabilities;</li>
+ *   <li>clustering columns must be answered as constants without consulting the delegate.</li>
+ * </ul>
+ */
+class SingleGroupClusteringColumnSelectorFactoryTest
+{
+  private static final RowSignature CLUSTERING = RowSignature.builder().add("tenant", ColumnType.STRING).build();
+
+  /**
+   * Creates a factory over {@code delegate} whose single clustering column {@code tenant} is fixed to "acme".
+   */
+  private static SingleGroupClusteringColumnSelectorFactory acmeFactory(RecordingDelegate delegate)
+  {
+    return new SingleGroupClusteringColumnSelectorFactory(delegate, CLUSTERING, new Object[]{"acme"});
+  }
+
+  @Test
+  void testNonClusteringColumnPreservesDictionaryEncodedCapabilities()
+  {
+    final RecordingDelegate delegate = new RecordingDelegate(
+        ColumnCapabilitiesImpl.createSimpleSingleValueStringColumnCapabilities()
+                              .setDictionaryEncoded(true)
+                              .setDictionaryValuesUnique(true)
+                              .setDictionaryValuesSorted(true)
+    );
+
+    final ColumnCapabilities regionCapabilities = acmeFactory(delegate).getColumnCapabilities("region");
+
+    Assertions.assertNotNull(regionCapabilities);
+    Assertions.assertTrue(regionCapabilities.isDictionaryEncoded().isTrue());
+    Assertions.assertEquals("region", delegate.lastCapabilitiesColumn);
+  }
+
+  @Test
+  void testNonClusteringColumnSelectorsDelegatedDirectly()
+  {
+    final RecordingDelegate delegate = new RecordingDelegate(null);
+    final SingleGroupClusteringColumnSelectorFactory factory = acmeFactory(delegate);
+
+    // The delegate is consulted as soon as a selector is created; there is no wrapper or generation indirection.
+    final DimensionSelector regionSelector = factory.makeDimensionSelector(DefaultDimensionSpec.of("region"));
+    Assertions.assertEquals("region", delegate.lastDimSelectorName);
+    Assertions.assertEquals("delegated:region", regionSelector.lookupName(0));
+
+    factory.makeColumnValueSelector("metric");
+    Assertions.assertEquals("metric", delegate.lastValueSelectorName);
+  }
+
+  @Test
+  void testClusteringColumnReturnsConstantWithoutHittingDelegate()
+  {
+    final RecordingDelegate delegate = new RecordingDelegate(null);
+    final SingleGroupClusteringColumnSelectorFactory factory = acmeFactory(delegate);
+
+    final DimensionSelector tenantSelector = factory.makeDimensionSelector(DefaultDimensionSpec.of("tenant"));
+    final int firstId = tenantSelector.getRow().get(0);
+    Assertions.assertEquals("acme", tenantSelector.lookupName(firstId));
+    Assertions.assertNull(delegate.lastDimSelectorName, "delegate must not be hit for clustering columns");
+
+    final ColumnValueSelector<?> tenantValues = factory.makeColumnValueSelector("tenant");
+    Assertions.assertEquals("acme", tenantValues.getObject());
+
+    // A single group's clustering value forms a constant one-entry dictionary. It is therefore reported as
+    // dictionary-encoded, sorted and unique, so that grouping is routed through the dictionary-id path.
+    final ColumnCapabilities tenantCapabilities = factory.getColumnCapabilities("tenant");
+    Assertions.assertNotNull(tenantCapabilities);
+    Assertions.assertTrue(tenantCapabilities.is(ValueType.STRING));
+    Assertions.assertTrue(tenantCapabilities.isDictionaryEncoded().isTrue());
+    Assertions.assertTrue(tenantCapabilities.areDictionaryValuesSorted().isTrue());
+    Assertions.assertTrue(tenantCapabilities.areDictionaryValuesUnique().isTrue());
+  }
+
+  /**
+   * Minimal {@link ColumnSelectorFactory} that records the column each call asked for.
+   * It returns canned selectors and whatever capabilities were supplied at construction.
+   */
+  private static class RecordingDelegate implements ColumnSelectorFactory
+  {
+    @Nullable
+    private final ColumnCapabilities capabilities;
+    String lastDimSelectorName;
+    String lastValueSelectorName;
+    String lastCapabilitiesColumn;
+
+    RecordingDelegate(@Nullable ColumnCapabilities capabilities)
+    {
+      this.capabilities = capabilities;
+    }
+
+    @Override
+    public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec)
+    {
+      final String name = dimensionSpec.getDimension();
+      lastDimSelectorName = name;
+      return DimensionSelector.constant("delegated:" + name);
+    }
+
+    @Override
+    public ColumnValueSelector makeColumnValueSelector(String columnName)
+    {
+      lastValueSelectorName = columnName;
+      return NilColumnValueSelector.instance();
+    }
+
+    @Nullable
+    @Override
+    public ColumnCapabilities getColumnCapabilities(String column)
+    {
+      lastCapabilitiesColumn = column;
+      return capabilities;
+    }
+
+    @Nullable
+    @Override
+    public RowIdSupplier getRowIdSupplier()
+    {
+      return null;
+    }
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]