This is an automated email from the ASF dual-hosted git repository.

clintropolis pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c634fc05b8 feat: clustered segment benchmark and fix (#19623)
2c634fc05b8 is described below

commit 2c634fc05b89d1ca0611eb36e2d722200a4f8bb2
Author: Clint Wylie <[email protected]>
AuthorDate: Wed Jun 24 12:25:28 2026 -0700

    feat: clustered segment benchmark and fix (#19623)
---
 .../druid/benchmark/query/SqlBaseBenchmark.java    |  40 +++--
 .../benchmark/query/SqlBenchmarkDatasets.java      | 166 +++++++++++++++++++-
 .../query/SqlClusteredSegmentsBenchmark.java       | 127 +++++++++++++++
 .../druid/segment/QueryableIndexCursorFactory.java |   6 +-
 .../druid/segment/generator/DataGenerator.java     |  26 +---
 .../segment/generator/GeneratorBasicSchemas.java   |  30 ++++
 .../ClusteringColumnSelectorFactory.java           |  10 +-
 ...SingleGroupClusteringColumnSelectorFactory.java | 172 +++++++++++++++++++++
 ...GroupClusteringVectorColumnSelectorFactory.java | 143 +++++++++++++++++
 .../QueryableIndexCursorFactoryClusteredTest.java  |  59 +++++++
 .../druid/segment/UnnestCursorFactoryTest.java     |   2 +-
 .../druid/segment/generator/SegmentGenerator.java  | 163 +++++++++++++++++++
 .../ClusteringColumnSelectorFactoryTest.java       |  37 +++++
 ...leGroupClusteringColumnSelectorFactoryTest.java | 143 +++++++++++++++++
 14 files changed, 1087 insertions(+), 37 deletions(-)

diff --git 
a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlBaseBenchmark.java
 
b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlBaseBenchmark.java
index 1cdd572babd..b787244373b 100644
--- 
a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlBaseBenchmark.java
+++ 
b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlBaseBenchmark.java
@@ -289,17 +289,33 @@ public class SqlBaseBenchmark
           );
           realtimeSegments.put(dataSegment, index);
         } else {
-          final QueryableIndex index = segmentGenerator.generate(
-              dataSegment,
-              schema.getGeneratorSchemaInfo(),
-              schema.getDimensionsSpec(),
-              schema.getTransformSpec(),
-              getIndexSpec(),
-              schema.getQueryGranularity(),
-              schema.getProjections(),
-              rowsPerSegment,
-              CalciteTests.getJsonMapper()
-          );
+          final QueryableIndex index;
+          if (schema.isUseV10()) {
+            // clustered (and the unclustered comparison) segments use the V10 
writer
+            index = segmentGenerator.generateV10(
+                dataSegment,
+                schema.getGeneratorSchemaInfo(),
+                schema.getDimensionsSpec(),
+                schema.getTransformSpec(),
+                getIndexSpec(),
+                schema.getQueryGranularity(),
+                schema.getClusterSpec(),
+                rowsPerSegment,
+                CalciteTests.getJsonMapper()
+            );
+          } else {
+            index = segmentGenerator.generate(
+                dataSegment,
+                schema.getGeneratorSchemaInfo(),
+                schema.getDimensionsSpec(),
+                schema.getTransformSpec(),
+                getIndexSpec(),
+                schema.getQueryGranularity(),
+                schema.getProjections(),
+                rowsPerSegment,
+                CalciteTests.getJsonMapper()
+            );
+          }
           log.info(
               "Segment metadata: %s",
               
CalciteTests.getJsonMapper().writerWithDefaultPrettyPrinter().writeValueAsString(index.getMetadata())
@@ -373,7 +389,7 @@ public class SqlBaseBenchmark
     }
   }
 
-  private void checkIncompatibleParameters()
+  protected void checkIncompatibleParameters()
   {
     // we only support NONE object storage encoding for auto column with mmap 
segments
     if (ObjectStorageEncoding.NONE.equals(jsonObjectStorageEncoding)) {
diff --git 
a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlBenchmarkDatasets.java
 
b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlBenchmarkDatasets.java
index aabce633e1c..aac9dca0ac3 100644
--- 
a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlBenchmarkDatasets.java
+++ 
b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlBenchmarkDatasets.java
@@ -22,10 +22,13 @@ package org.apache.druid.benchmark.query;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import org.apache.druid.data.input.impl.AggregateProjectionSpec;
+import 
org.apache.druid.data.input.impl.ClusteredValueGroupsBaseTableProjectionSpec;
 import org.apache.druid.data.input.impl.DimensionSchema;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.LongDimensionSchema;
 import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.query.aggregation.AggregatorFactory;
@@ -36,6 +39,7 @@ import 
org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchAg
 import 
org.apache.druid.query.aggregation.datasketches.theta.SketchMergeAggregatorFactory;
 import org.apache.druid.query.expression.TestExprMacroTable;
 import org.apache.druid.segment.AutoTypeColumnSchema;
+import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.generator.GeneratorBasicSchemas;
 import org.apache.druid.segment.generator.GeneratorSchemaInfo;
 import org.apache.druid.segment.nested.NestedCommonFormatColumnFormatSpec;
@@ -45,6 +49,8 @@ import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.LinearShardSpec;
 import org.joda.time.Interval;
 
+import javax.annotation.Nullable;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -62,6 +68,9 @@ public class SqlBenchmarkDatasets
   public static String DATASKETCHES = "datasketches";
   public static String PROJECTIONS = "projections";
   public static String GROUPER = "grouper";
+  // clustering datasets are not registered statically; they are built on 
demand by getSchema() from a structured
+  // datasource name (see clusteringDatasource()) so benchmarks can sweep 
clustering cardinality and layout.
+  public static String CLUSTERING = "clustering";
 
   // initialize all benchmark dataset schemas to feed the data generators when 
running benchmarks, add any additional
   // datasets to this initializer as needed and they will be available to any 
benchmarks at the table name added here
@@ -319,9 +328,113 @@ public class SqlBenchmarkDatasets
 
   public static BenchmarkSchema getSchema(String dataset)
   {
+    if (dataset.startsWith(CLUSTERING + "_")) {
+      return makeClusteringSchema(dataset);
+    }
     return DATASET_SCHEMAS.get(dataset);
   }
 
+  /**
+   * Builds the structured datasource (table) name for a clustering benchmark 
dataset, encoding the segment layout and
+   * the clustering parameters so {@link #getSchema(String)} can reconstruct 
the {@link BenchmarkSchema} on demand. The
+   * encoded params keep the shared benchmark setup + static-map architecture 
intact while still allowing the
+   * clustering cardinality and number of clustering columns to be swept as 
JMH parameters.
+   * <p>
+   * {@code layout} is one of {@code CLUSTERED}, {@code UNCLUSTERED}, or 
{@code TIME_ORDERED} (see
+   * {@link #makeClusteringSchema}).
+   */
+  public static String clusteringDatasource(String layout, int 
clusteringCardinality, int numClusteringColumns)
+  {
+    return StringUtils.format(
+        "%s_%s_c%d_k%d",
+        CLUSTERING,
+        layout,
+        clusteringCardinality,
+        numClusteringColumns
+    );
+  }
+
+  /**
+   * Builds a clustering {@link BenchmarkSchema} from a datasource name 
produced by
+   * {@link #clusteringDatasource(String, int, int)}. The same generated data 
is laid out three ways for comparison:
+   * <ul>
+   *   <li>{@code CLUSTERED}: V10 clustered base-table segment, partitioned 
into per-clustering-value groups, ordered
+   *       clustering columns &rarr; {@code __time} &rarr; non-clustering 
columns (the intended real-world layout);</li>
+   *   <li>{@code UNCLUSTERED}: V10 regular segment with the <em>same</em> 
clustering-first ordering but no clustered
+   *       grouping &mdash; isolates the cost/benefit of the clustered 
grouping mechanism at a fixed sort order;</li>
+   *   <li>{@code TIME_ORDERED}: regular (V9) segment with the 
production-style {@code __time}-first ordering
+   *       ({@code __time} &rarr; clustering columns &rarr; non-clustering 
columns) &mdash; the "what we run today"
+   *       baseline the clustered layout is being compared against.</li>
+   * </ul>
+   */
+  private static BenchmarkSchema makeClusteringSchema(String dataset)
+  {
+    // Encoded as clustering_<LAYOUT>_c<card>_k<k>. The layout may itself 
contain underscores (e.g. TIME_ORDERED), so
+    // parse the trailing c<card>/k<k> positionally and treat everything 
between the prefix and them as the layout.
+    final String[] parts = dataset.split("_");
+    if (parts.length < 4) {
+      throw new IAE("Malformed clustering datasource[%s]", dataset);
+    }
+    final int numClusteringColumns = Integer.parseInt(parts[parts.length - 
1].substring(1));
+    final int clusteringCardinality = Integer.parseInt(parts[parts.length - 
2].substring(1));
+    final String layout = String.join("_", Arrays.copyOfRange(parts, 1, 
parts.length - 2));
+
+    final GeneratorSchemaInfo schemaInfo = 
GeneratorBasicSchemas.makeClusteringSchemaInfo(clusteringCardinality);
+    // generated data dimensions in schema order: [clusterKey1, clusterKey2, 
dimSecondary, valueLong, valueDouble]
+    final List<DimensionSchema> dataDimensions = 
schemaInfo.getDimensionsSpec().getDimensions();
+
+    if ("TIME_ORDERED".equals(layout)) {
+      // Regular time-ordered segment: __time first, then the columns in 
schema order. forceSegmentSortByTime defaults
+      // to true, so schemaInfo.getDimensionsSpec() already yields the 
__time-first ordering.
+      return new BenchmarkSchema(
+          Collections.singletonList(makeSegment(dataset, 
schemaInfo.getDataInterval())),
+          schemaInfo,
+          TransformSpec.NONE,
+          schemaInfo.getDimensionsSpec(),
+          new AggregatorFactory[0],
+          Collections.emptyList(),
+          Granularities.NONE,
+          null,
+          false
+      );
+    }
+
+    final boolean clustered = "CLUSTERED".equals(layout);
+    if (!clustered && !"UNCLUSTERED".equals(layout)) {
+      throw new IAE("Unknown clustering layout[%s] in datasource[%s]", layout, 
dataset);
+    }
+
+    // Ordered columns: clustering prefix, then __time, then the remaining 
non-clustering columns (clustering columns
+    // -> time -> non-clustering), matching the intended real-world clustered 
schema ordering.
+    final List<DimensionSchema> columns = new 
ArrayList<>(dataDimensions.size() + 1);
+    columns.addAll(dataDimensions.subList(0, numClusteringColumns));
+    columns.add(new LongDimensionSchema(ColumnHolder.TIME_COLUMN_NAME));
+    columns.addAll(dataDimensions.subList(numClusteringColumns, 
dataDimensions.size()));
+    final List<String> clusteringColumns = dataDimensions.subList(0, 
numClusteringColumns)
+                                                         .stream()
+                                                         
.map(DimensionSchema::getName)
+                                                         
.collect(Collectors.toList());
+
+    final ClusteredValueGroupsBaseTableProjectionSpec clusterSpec =
+        ClusteredValueGroupsBaseTableProjectionSpec.builder()
+                                                   .columns(columns)
+                                                   
.clusteringColumns(clusteringColumns)
+                                                   .build();
+
+    return new BenchmarkSchema(
+        Collections.singletonList(makeSegment(dataset, 
schemaInfo.getDataInterval())),
+        schemaInfo,
+        TransformSpec.NONE,
+        // clustering columns -> __time -> non-clustering, with 
forceSegmentSortByTime=false
+        clusterSpec.getDimensionsSpec(),
+        new AggregatorFactory[0],
+        Collections.emptyList(),
+        Granularities.NONE,
+        clustered ? clusterSpec : null,
+        true
+    );
+  }
+
   private static DataSegment makeSegment(String datasource, Interval interval)
   {
     return makeSegment(datasource, interval, 0);
@@ -352,6 +465,17 @@ public class SqlBenchmarkDatasets
     private final AggregatorFactory[] aggregators;
     private final Granularity queryGranularity;
     private final List<AggregateProjectionSpec> projections;
+    /**
+     * Optional clustered base-table spec. When non-null the segment is built 
as clustered value groups. Only used by
+     * the clustering benchmark; null for all classic datasets.
+     */
+    @Nullable
+    private final ClusteredValueGroupsBaseTableProjectionSpec clusterSpec;
+    /**
+     * Whether to build this dataset's segments with the V10 writer (required 
for clustered segments, and used for the
+     * unclustered comparison layout so the two are an apples-to-apples 
comparison). False for all classic datasets.
+     */
+    private final boolean useV10;
 
     public BenchmarkSchema(
         List<DataSegment> dataSegments,
@@ -362,6 +486,31 @@ public class SqlBenchmarkDatasets
         List<AggregateProjectionSpec> projections,
         Granularity queryGranularity
     )
+    {
+      this(
+          dataSegments,
+          generatorSchemaInfo,
+          transformSpec,
+          dimensionSpec,
+          aggregators,
+          projections,
+          queryGranularity,
+          null,
+          false
+      );
+    }
+
+    public BenchmarkSchema(
+        List<DataSegment> dataSegments,
+        GeneratorSchemaInfo generatorSchemaInfo,
+        TransformSpec transformSpec,
+        DimensionsSpec dimensionSpec,
+        AggregatorFactory[] aggregators,
+        List<AggregateProjectionSpec> projections,
+        Granularity queryGranularity,
+        @Nullable ClusteredValueGroupsBaseTableProjectionSpec clusterSpec,
+        boolean useV10
+    )
     {
       this.dataSegments = dataSegments;
       this.generatorSchemaInfo = generatorSchemaInfo;
@@ -370,6 +519,8 @@ public class SqlBenchmarkDatasets
       this.aggregators = aggregators;
       this.queryGranularity = queryGranularity;
       this.projections = projections;
+      this.clusterSpec = clusterSpec;
+      this.useV10 = useV10;
     }
 
     public List<DataSegment> getDataSegments()
@@ -407,6 +558,17 @@ public class SqlBenchmarkDatasets
       return projections;
     }
 
+    @Nullable
+    public ClusteredValueGroupsBaseTableProjectionSpec getClusterSpec()
+    {
+      return clusterSpec;
+    }
+
+    public boolean isUseV10()
+    {
+      return useV10;
+    }
+
     public BenchmarkSchema convertDimensions(boolean convertToAuto, 
NestedCommonFormatColumnFormatSpec columnFormatSpec)
     {
       return new SqlBenchmarkDatasets.BenchmarkSchema(
@@ -434,7 +596,9 @@ public class SqlBenchmarkDatasets
                           )
                           .build()
               ).collect(Collectors.toList()),
-          queryGranularity
+          queryGranularity,
+          clusterSpec,
+          useV10
       );
     }
 
diff --git 
a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlClusteredSegmentsBenchmark.java
 
b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlClusteredSegmentsBenchmark.java
new file mode 100644
index 00000000000..cd6b2cc4985
--- /dev/null
+++ 
b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlClusteredSegmentsBenchmark.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.benchmark.query;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.java.util.common.StringUtils;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.List;
+
+/**
+ * Benchmarks queries against clustered base-table segments versus equivalent 
non-clustered segments, over the same
+ * generated data. The {@link #segmentLayout} parameter selects one of three 
layouts:
+ * <ul>
+ *   <li>{@code CLUSTERED}: V10 clustered value-groups segment, ordered 
clustering columns &rarr; {@code __time} &rarr;
+ *       non-clustering columns;</li>
+ *   <li>{@code UNCLUSTERED}: V10 regular segment with the same 
clustering-first ordering but no clustered grouping
+ *       (isolates the clustered grouping mechanism from the sort order);</li>
+ *   <li>{@code TIME_ORDERED}: regular time-ordered segment ({@code __time} 
&rarr; clustering columns &rarr;
+ *       non-clustering columns).</li>
+ * </ul>
+ * The clustering-column cardinality ({@link #clusteringCardinality}) is 
parameterized so the workload can be measured
+ * as the number of value groups grows. {@link #numClusteringColumns} defaults 
to a single value (cluster only on
+ * {@code clusterKey1}); set it to {@code "2"} to additionally cluster on the 
fixed low-cardinality {@code clusterKey2}.
+ */
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+public class SqlClusteredSegmentsBenchmark extends SqlBaseQueryBenchmark
+{
+  // %s is the datasource (table) name, filled in from the current parameters
+  private static final List<String> QUERIES = ImmutableList.of(
+      // 0: equality filter on the clustering column (clustered groups can 
prune non-matching groups)
+      "SELECT SUM(valueLong) FROM %s WHERE clusterKey1 = '3'",
+      // 1: group by the clustering column.
+      "SELECT clusterKey1, SUM(valueLong) FROM %s GROUP BY 1 ORDER BY 2",
+      // 2: filter on the clustering column + group by a secondary (higher 
cardinality) column
+      "SELECT dimSecondary, SUM(valueLong) FROM %s WHERE clusterKey1 = '3' 
GROUP BY 1 ORDER BY 2",
+      // 3: no-filter full aggregate (full scan, no pruning) as an overhead 
baseline
+      "SELECT SUM(valueLong) FROM %s"
+  );
+
+  @Param({
+      "CLUSTERED",
+      "UNCLUSTERED",
+      "TIME_ORDERED"
+  })
+  private String segmentLayout;
+
+  @Param({
+      "10",
+      "40",
+      "160"
+  })
+  private int clusteringCardinality;
+
+  @Param({
+      "1"
+  })
+  private int numClusteringColumns;
+
+  @Param({
+      "0",
+      "1",
+      "2",
+      "3"
+  })
+  private int query;
+
+  private String datasource()
+  {
+    return SqlBenchmarkDatasets.clusteringDatasource(
+        segmentLayout,
+        clusteringCardinality,
+        numClusteringColumns
+    );
+  }
+
+  @Override
+  public String getQuery()
+  {
+    return StringUtils.format(QUERIES.get(query), datasource());
+  }
+
+  @Override
+  public List<String> getDatasources()
+  {
+    return ImmutableList.of(datasource());
+  }
+
+  @Override
+  protected void checkIncompatibleParameters()
+  {
+    // the clustered-vs-unclustered comparison is about persisted (mmap) V10 
segment layout; skip the other
+    // storage / schema / encoding / compression combinations the base 
benchmark sweeps so the matrix stays focused.
+    if (storageType != BenchmarkStorage.MMAP
+        || !"explicit".equals(schemaType)
+        || stringEncoding != BenchmarkStringEncodingStrategy.UTF8
+        || !"none".equals(complexCompression)) {
+      System.exit(0);
+    }
+    super.checkIncompatibleParameters();
+  }
+}
diff --git 
a/processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorFactory.java
 
b/processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorFactory.java
index 4acd2d4e28a..1d752cb6686 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorFactory.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorFactory.java
@@ -38,6 +38,8 @@ import 
org.apache.druid.segment.projections.ClusteringColumnSelectorFactory;
 import 
org.apache.druid.segment.projections.ClusteringVectorColumnSelectorFactory;
 import org.apache.druid.segment.projections.Projections;
 import org.apache.druid.segment.projections.QueryableProjection;
+import 
org.apache.druid.segment.projections.SingleGroupClusteringColumnSelectorFactory;
+import 
org.apache.druid.segment.projections.SingleGroupClusteringVectorColumnSelectorFactory;
 import org.apache.druid.segment.projections.TableClusterGroupSpec;
 import org.apache.druid.segment.vector.ConcatenatingVectorCursor;
 import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
@@ -213,7 +215,7 @@ public class QueryableIndexCursorFactory implements 
CursorFactory
           Offset baseOffset
       )
       {
-        return new ClusteringColumnSelectorFactory(
+        return new SingleGroupClusteringColumnSelectorFactory(
             super.makeColumnSelectorFactoryForOffset(columnCache, baseOffset),
             valueGroup.getSummary().getClusteringColumns(),
             valueGroup.lookupClusteringValues()
@@ -226,7 +228,7 @@ public class QueryableIndexCursorFactory implements 
CursorFactory
           VectorOffset baseOffset
       )
       {
-        return new ClusteringVectorColumnSelectorFactory(
+        return new SingleGroupClusteringVectorColumnSelectorFactory(
             super.makeVectorColumnSelectorFactoryForOffset(columnCache, 
baseOffset),
             valueGroup.getSummary().getClusteringColumns(),
             valueGroup.lookupClusteringValues()
diff --git 
a/processing/src/main/java/org/apache/druid/segment/generator/DataGenerator.java
 
b/processing/src/main/java/org/apache/druid/segment/generator/DataGenerator.java
index 1f0839ca684..9e8e23e524d 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/generator/DataGenerator.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/generator/DataGenerator.java
@@ -19,9 +19,7 @@
 
 package org.apache.druid.segment.generator;
 
-import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.MapBasedInputRow;
 import org.apache.druid.data.input.impl.TimestampSpec;
@@ -134,22 +132,14 @@ public class DataGenerator
       }
     }
 
-    columnGenerators = new ArrayList<>();
-    columnGenerators.addAll(
-        Lists.transform(
-            columnSchemas,
-            new Function<>()
-            {
-              @Override
-              public ColumnValueGenerator apply(
-                  GeneratorColumnSchema input
-              )
-              {
-                return input.makeGenerator(seed);
-              }
-            }
-        )
-    );
+    // Seed each column with a distinct seed (offset by its position) rather 
than the same seed for every column.
+    // Columns that use the same underlying distribution implementation would 
otherwise draw from identically-seeded
+    // RNGs in lockstep and become almost perfectly correlated, which makes 
cross-column filters + group-bys collapse
+    // to a single group. Offsetting the seed per column keeps the columns 
independent while remaining deterministic.
+    columnGenerators = new ArrayList<>(columnSchemas.size());
+    for (int i = 0; i < columnSchemas.size(); i++) {
+      columnGenerators.add(columnSchemas.get(i).makeGenerator(seed + i));
+    }
 
     return this;
   }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/generator/GeneratorBasicSchemas.java
 
b/processing/src/main/java/org/apache/druid/segment/generator/GeneratorBasicSchemas.java
index ef2bb096cb9..49afe94c667 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/generator/GeneratorBasicSchemas.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/generator/GeneratorBasicSchemas.java
@@ -551,4 +551,34 @@ public class GeneratorBasicSchemas
   }
 
   public static final Map<String, GeneratorSchemaInfo> SCHEMA_MAP = 
SCHEMA_INFO_BUILDER.build();
+
+  /**
+   * Low-cardinality testbench for clustered segments. Every column is a 
single-valued, non-rollup, non-metric
+   * dimension (clustered base tables have no metrics and no rollup). The 
primary clustering column {@code clusterKey1}
+   * has its cardinality controlled by {@code clusteringCardinality} so 
benchmarks can sweep how clustering behaves as
+   * the number of value groups grows; {@code clusterKey2} is a second, fixed 
low-cardinality string available as an
+   * optional second clustering column. {@code dimSecondary} is a 
higher-cardinality dimension for group-by workloads,
+   * and {@code valueLong}/{@code valueDouble} are numeric measures to 
aggregate at query time.
+   */
+  public static GeneratorSchemaInfo makeClusteringSchemaInfo(int 
clusteringCardinality)
+  {
+    final List<GeneratorColumnSchema> columns = ImmutableList.of(
+        // primary clustering key, parameterized cardinality
+        GeneratorColumnSchema.makeDiscreteUniform("clusterKey1", 
ValueType.STRING, false, 1, null, 1, clusteringCardinality),
+        // secondary clustering key, fixed low cardinality
+        GeneratorColumnSchema.makeDiscreteUniform("clusterKey2", 
ValueType.STRING, false, 1, null, 1, 8),
+        // higher cardinality dimension for group-by
+        GeneratorColumnSchema.makeZipf("dimSecondary", ValueType.STRING, 
false, 1, null, 1, 1000, 1.0),
+        // numeric columns (dimensions, since clustered segments have no 
metrics)
+        GeneratorColumnSchema.makeZipf("valueLong", ValueType.LONG, false, 1, 
null, 0, 10000, 1.0),
+        GeneratorColumnSchema.makeZipf("valueDouble", ValueType.DOUBLE, false, 
1, null, 0, 1000, 1.0)
+    );
+
+    return new GeneratorSchemaInfo(
+        columns,
+        Collections.emptyList(),
+        Intervals.of("2000-01-01/P1D"),
+        false
+    );
+  }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/projections/ClusteringColumnSelectorFactory.java
 
b/processing/src/main/java/org/apache/druid/segment/projections/ClusteringColumnSelectorFactory.java
index 6e9ab3126a7..0c5b9eacdf4 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/projections/ClusteringColumnSelectorFactory.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/projections/ClusteringColumnSelectorFactory.java
@@ -280,7 +280,11 @@ public class ClusteringColumnSelectorFactory implements 
ColumnSelectorFactory
     @Override
     public int getValueCardinality()
     {
-      return currentSelector().getValueCardinality();
+      // The per-group constant selector reports cardinality 1 and always 
returns id 0, but that id is NOT stable
+      // across the concatenating cursor: id 0 resolves to a different 
clustering value in each group. Forwarding it
+      // would let the group-by engine take the dictionary-id-keyed (array) 
path and silently conflate every group
+      // into the single id-0 bucket.
+      return DimensionDictionarySelector.CARDINALITY_UNKNOWN;
     }
 
     @Nullable
@@ -293,14 +297,14 @@ public class ClusteringColumnSelectorFactory implements 
ColumnSelectorFactory
     @Override
     public boolean nameLookupPossibleInAdvance()
     {
-      return currentSelector().nameLookupPossibleInAdvance();
+      return false;
     }
 
     @Nullable
     @Override
     public IdLookup idLookup()
     {
-      return currentSelector().idLookup();
+      return null;
     }
 
     @Nullable
diff --git 
a/processing/src/main/java/org/apache/druid/segment/projections/SingleGroupClusteringColumnSelectorFactory.java
 
b/processing/src/main/java/org/apache/druid/segment/projections/SingleGroupClusteringColumnSelectorFactory.java
new file mode 100644
index 00000000000..aada6b78816
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/druid/segment/projections/SingleGroupClusteringColumnSelectorFactory.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.projections;
+
+import org.apache.druid.error.DruidException;
+import org.apache.druid.math.expr.Evals;
+import org.apache.druid.math.expr.ExprEval;
+import org.apache.druid.math.expr.ExpressionType;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.ConstantExprEvalSelector;
+import org.apache.druid.segment.DimensionSelector;
+import org.apache.druid.segment.RowIdSupplier;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.column.ValueType;
+
+import javax.annotation.Nullable;
+
+/**
+ * Single-cluster-group counterpart of {@link 
ClusteringColumnSelectorFactory}, used when a query prunes a clustered
+ * segment down to a single group (see {@link 
org.apache.druid.segment.QueryableIndexCursorFactory}). Non-clustering
+ * columns are delegated directly to the per-group factory; clustering 
columns, which are not physically stored in the
+ * per-group data, are surfaced as constants from the group's clustering tuple.
+ */
+public class SingleGroupClusteringColumnSelectorFactory implements 
ColumnSelectorFactory
+{
+  private final ColumnSelectorFactory delegate;
+  private final RowSignature clusteringColumns;
+  private final Object[] clusteringValues;
+
+  public SingleGroupClusteringColumnSelectorFactory(
+      ColumnSelectorFactory delegate,
+      RowSignature clusteringColumns,
+      Object[] clusteringValues
+  )
+  {
+    if (clusteringValues == null || clusteringValues.length != 
clusteringColumns.size()) {
+      throw DruidException.defensive(
+          "clusteringValues length [%s] must match clusteringColumns size 
[%s]",
+          clusteringValues == null ? "null" : clusteringValues.length,
+          clusteringColumns.size()
+      );
+    }
+    this.delegate = delegate;
+    this.clusteringColumns = clusteringColumns;
+    this.clusteringValues = clusteringValues;
+  }
+
+  @Override
+  public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec)
+  {
+    final int idx = clusteringColumns.indexOf(dimensionSpec.getDimension());
+    if (idx < 0) {
+      return delegate.makeDimensionSelector(dimensionSpec);
+    }
+    final Object raw = clusteringValues[idx];
+    return DimensionSelector.constant(Evals.asString(raw), 
dimensionSpec.getExtractionFn());
+  }
+
+  @Override
+  public ColumnValueSelector makeColumnValueSelector(String columnName)
+  {
+    final int idx = clusteringColumns.indexOf(columnName);
+    if (idx < 0) {
+      return delegate.makeColumnValueSelector(columnName);
+    }
+    return new 
ConstantClusteringValueSelector(clusteringColumns.getColumnType(idx).orElseThrow(),
 clusteringValues[idx]);
+  }
+
+  @Nullable
+  @Override
+  public ColumnCapabilities getColumnCapabilities(String column)
+  {
+    final int idx = clusteringColumns.indexOf(column);
+    if (idx < 0) {
+      return delegate.getColumnCapabilities(column);
+    }
+    final ColumnType type = clusteringColumns.getColumnType(idx).orElseThrow();
+    if (type.is(ValueType.STRING)) {
+      return 
ColumnCapabilitiesImpl.createSimpleSingleValueStringColumnCapabilities()
+                                   .setDictionaryEncoded(true)
+                                   .setDictionaryValuesSorted(true)
+                                   .setDictionaryValuesUnique(true);
+    }
+    return ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(type);
+  }
+
+  @Nullable
+  @Override
+  public RowIdSupplier getRowIdSupplier()
+  {
+    return delegate.getRowIdSupplier();
+  }
+
+  /**
+   * Constant value selector for a clustering column's group-constant typed 
value.
+   */
+  private static final class ConstantClusteringValueSelector implements 
ColumnValueSelector<Object>
+  {
+    private final ConstantExprEvalSelector inner;
+
+    private ConstantClusteringValueSelector(ColumnType columnType, Object 
value)
+    {
+      this.inner = new 
ConstantExprEvalSelector(ExprEval.ofType(ExpressionType.fromColumnTypeStrict(columnType),
 value));
+    }
+
+    @Override
+    public double getDouble()
+    {
+      return inner.getDouble();
+    }
+
+    @Override
+    public float getFloat()
+    {
+      return inner.getFloat();
+    }
+
+    @Override
+    public long getLong()
+    {
+      return inner.getLong();
+    }
+
+    @Override
+    public boolean isNull()
+    {
+      return inner.isNull();
+    }
+
+    @Nullable
+    @Override
+    public Object getObject()
+    {
+      return inner.getObject().value();
+    }
+
+    @Override
+    public Class<Object> classOfObject()
+    {
+      return Object.class;
+    }
+
+    @Override
+    public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+    {
+      inner.inspectRuntimeShape(inspector);
+    }
+  }
+}
diff --git 
a/processing/src/main/java/org/apache/druid/segment/projections/SingleGroupClusteringVectorColumnSelectorFactory.java
 
b/processing/src/main/java/org/apache/druid/segment/projections/SingleGroupClusteringVectorColumnSelectorFactory.java
new file mode 100644
index 00000000000..8fd8dbd41ed
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/druid/segment/projections/SingleGroupClusteringVectorColumnSelectorFactory.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.projections;
+
+import org.apache.druid.error.DruidException;
+import org.apache.druid.math.expr.Evals;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.vector.ConstantVectorSelectors;
+import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
+import org.apache.druid.segment.vector.ReadableVectorInspector;
+import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
+import org.apache.druid.segment.vector.VectorObjectSelector;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+
+/**
+ * Vectorized counterpart of {@link 
SingleGroupClusteringColumnSelectorFactory}: the single-cluster-group fast path 
for
+ * a clustered base-table segment. Non-clustering columns delegate directly to 
the per-group
+ * {@link VectorColumnSelectorFactory}; clustering columns are surfaced as 
constant vector selectors.
+ */
+public class SingleGroupClusteringVectorColumnSelectorFactory implements 
VectorColumnSelectorFactory
+{
+  private final VectorColumnSelectorFactory delegate;
+  private final RowSignature clusteringColumns;
+  private final Object[] clusteringValues;
+
+  public SingleGroupClusteringVectorColumnSelectorFactory(
+      VectorColumnSelectorFactory delegate,
+      RowSignature clusteringColumns,
+      Object[] clusteringValues
+  )
+  {
+    if (clusteringValues == null || clusteringValues.length != 
clusteringColumns.size()) {
+      throw DruidException.defensive(
+          "clusteringValues length [%s] must match clusteringColumns size 
[%s]",
+          clusteringValues == null ? "null" : clusteringValues.length,
+          clusteringColumns.size()
+      );
+    }
+    this.delegate = delegate;
+    this.clusteringColumns = clusteringColumns;
+    this.clusteringValues = clusteringValues;
+  }
+
+  @Override
+  public ReadableVectorInspector getReadableVectorInspector()
+  {
+    return delegate.getReadableVectorInspector();
+  }
+
+  @Override
+  public int getMaxVectorSize()
+  {
+    return delegate.getMaxVectorSize();
+  }
+
+  @Override
+  public SingleValueDimensionVectorSelector 
makeSingleValueDimensionSelector(DimensionSpec dimensionSpec)
+  {
+    final int idx = clusteringColumns.indexOf(dimensionSpec.getDimension());
+    if (idx < 0) {
+      return delegate.makeSingleValueDimensionSelector(dimensionSpec);
+    }
+    final Object raw = clusteringValues[idx];
+    return 
ConstantVectorSelectors.singleValueDimensionVectorSelector(delegate.getReadableVectorInspector(),
 Evals.asString(raw));
+  }
+
+  @Override
+  public MultiValueDimensionVectorSelector 
makeMultiValueDimensionSelector(DimensionSpec dimensionSpec)
+  {
+    if (clusteringColumns.indexOf(dimensionSpec.getDimension()) < 0) {
+      return delegate.makeMultiValueDimensionSelector(dimensionSpec);
+    }
+    throw DruidException.defensive(
+        "clustering column [%s] is not dictionary-encoded; no multi-value 
dimension vector selector",
+        dimensionSpec.getDimension()
+    );
+  }
+
+  @Override
+  public VectorValueSelector makeValueSelector(String column)
+  {
+    final int idx = clusteringColumns.indexOf(column);
+    if (idx < 0) {
+      return delegate.makeValueSelector(column);
+    }
+    final Object raw = clusteringValues[idx];
+    final Number number = (raw instanceof Number) ? (Number) raw : null;
+    return 
ConstantVectorSelectors.vectorValueSelector(delegate.getReadableVectorInspector(),
 number);
+  }
+
+  @Override
+  public VectorObjectSelector makeObjectSelector(String column)
+  {
+    final int idx = clusteringColumns.indexOf(column);
+    if (idx < 0) {
+      return delegate.makeObjectSelector(column);
+    }
+    return 
ConstantVectorSelectors.vectorObjectSelector(delegate.getReadableVectorInspector(),
 clusteringValues[idx]);
+  }
+
+  @Nullable
+  @Override
+  public ColumnCapabilities getColumnCapabilities(String column)
+  {
+    final int idx = clusteringColumns.indexOf(column);
+    if (idx < 0) {
+      return delegate.getColumnCapabilities(column);
+    }
+    final ColumnType type = clusteringColumns.getColumnType(idx).orElseThrow();
+    if (type.is(ValueType.STRING)) {
+      return 
ColumnCapabilitiesImpl.createSimpleSingleValueStringColumnCapabilities()
+                                   .setDictionaryEncoded(true)
+                                   .setDictionaryValuesSorted(true)
+                                   .setDictionaryValuesUnique(true);
+    }
+    return ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(type);
+  }
+}
diff --git 
a/processing/src/test/java/org/apache/druid/segment/QueryableIndexCursorFactoryClusteredTest.java
 
b/processing/src/test/java/org/apache/druid/segment/QueryableIndexCursorFactoryClusteredTest.java
index be9dfc28dab..8f58fac9044 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/QueryableIndexCursorFactoryClusteredTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/QueryableIndexCursorFactoryClusteredTest.java
@@ -649,6 +649,65 @@ class QueryableIndexCursorFactoryClusteredTest
     Assertions.assertEquals(3L, ((Number) results.get(0).get(1)).longValue());
   }
 
+  @Test
+  void testGroupByOnNonClusteringColumnWithinSingleGroup()
+  {
+    // Filter to a single cluster group (tenant=acme), then GROUP BY a 
non-clustering column. Exercises the
+    // single-group cursor-holder path + 
SingleGroupClusteringColumnSelectorFactory end-to-end: `region` groups
+    // correctly via the group's own (stable) dictionary, and the pruned-out 
globex group contributes nothing.
+    segmentIndex = buildSegment(List.of(
+        row("acme", "2025-01-01T00:00:00", "us-east-1"),
+        row("acme", "2025-01-01T00:10:00", "us-east-1"),
+        row("acme", "2025-01-01T01:00:00", "us-west-2"),
+        row("globex", "2025-01-01T00:30:00", "eu-west-1")
+    ));
+    final QueryableIndexCursorFactory factory = new 
QueryableIndexCursorFactory(
+        segmentIndex,
+        QueryableIndexTimeBoundaryInspector.create(segmentIndex)
+    );
+
+    final GroupByQuery query = GroupByQuery.builder()
+                                           .setDataSource("test")
+                                           .setGranularity(Granularities.ALL)
+                                           .setInterval(Intervals.ETERNITY)
+                                           .setDimFilter(new 
EqualityFilter("tenant", ColumnType.STRING, "acme", null))
+                                           .addDimension("region")
+                                           .addOrderByColumn("region")
+                                           .setAggregatorSpecs(new 
CountAggregatorFactory("count"))
+                                           .build();
+    final List<ResultRow> results = groupingEngine.process(query, factory, 
null, nonBlockingPool, null).toList();
+    Assertions.assertEquals(2, results.size());
+    Assertions.assertEquals("us-east-1", results.get(0).get(0));
+    Assertions.assertEquals(2L, ((Number) results.get(0).get(1)).longValue());
+    Assertions.assertEquals("us-west-2", results.get(1).get(0));
+    Assertions.assertEquals(1L, ((Number) results.get(1).get(1)).longValue());
+  }
+
+  @Test
+  void testGroupByClusteringColumnWithinSingleGroup()
+  {
+    // Filter to a single group, then GROUP BY the clustering column itself. 
The single-group factory advertises the
+    // clustering column as a one-entry dictionary, so this rides the 
dictionary-id grouping path over the constant
+    // clustering selector: one bucket, the constant value, full count.
+    segmentIndex = standardTwoGroup();
+    final QueryableIndexCursorFactory factory = new 
QueryableIndexCursorFactory(
+        segmentIndex,
+        QueryableIndexTimeBoundaryInspector.create(segmentIndex)
+    );
+    final GroupByQuery query = GroupByQuery.builder()
+                                           .setDataSource("test")
+                                           .setGranularity(Granularities.ALL)
+                                           .setInterval(Intervals.ETERNITY)
+                                           .setDimFilter(new 
EqualityFilter("tenant", ColumnType.STRING, "acme", null))
+                                           .addDimension("tenant")
+                                           .setAggregatorSpecs(new 
CountAggregatorFactory("count"))
+                                           .build();
+    final List<ResultRow> results = groupingEngine.process(query, factory, 
null, nonBlockingPool, null).toList();
+    Assertions.assertEquals(1, results.size());
+    Assertions.assertEquals("acme", results.get(0).get(0));
+    Assertions.assertEquals(2L, ((Number) results.get(0).get(1)).longValue());
+  }
+
   private static Druids.TimeseriesQueryBuilder newTimeseries()
   {
     return Druids.newTimeseriesQueryBuilder()
diff --git 
a/processing/src/test/java/org/apache/druid/segment/UnnestCursorFactoryTest.java
 
b/processing/src/test/java/org/apache/druid/segment/UnnestCursorFactoryTest.java
index 6a4cadcc79e..8ede1ac203c 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/UnnestCursorFactoryTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/UnnestCursorFactoryTest.java
@@ -836,7 +836,7 @@ public class UnnestCursorFactoryTest extends 
InitializedNullHandlingTest
         cursor.advance();
         count++;
       }
-      Assert.assertEquals(count, 618);
+      Assert.assertEquals(count, 569);
     }
   }
 
diff --git 
a/processing/src/test/java/org/apache/druid/segment/generator/SegmentGenerator.java
 
b/processing/src/test/java/org/apache/druid/segment/generator/SegmentGenerator.java
index 97a9d25afd2..968457c185e 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/generator/SegmentGenerator.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/generator/SegmentGenerator.java
@@ -24,6 +24,7 @@ import com.google.common.hash.Hashing;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.data.input.impl.AggregateProjectionSpec;
+import 
org.apache.druid.data.input.impl.ClusteredValueGroupsBaseTableProjectionSpec;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.MapInputRowParser;
 import org.apache.druid.data.input.impl.TimestampSpec;
@@ -37,6 +38,8 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
 import org.apache.druid.segment.BaseProgressIndicator;
 import org.apache.druid.segment.IndexBuilder;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.IndexMergerV10;
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.TestHelper;
@@ -321,6 +324,145 @@ public class SegmentGenerator implements Closeable
     return retVal;
   }
 
+  /**
+   * Generates a V10-format segment, used to benchmark clustered base-table 
segments against an equivalent unclustered
+   * layout. Mirrors {@link #generate}, but always uses {@link IndexMergerV10} 
and threads an optional
+   * {@link ClusteredValueGroupsBaseTableProjectionSpec}:
+   * <ul>
+   *   <li>when {@code clusterSpec} is non-null the segment is written as 
clustered value groups;</li>
+   *   <li>when {@code clusterSpec} is null the segment is a regular V10 
segment, sorted according to
+   *       {@code dimensionsSpec}.</li>
+   * </ul>
+   * Clustered base tables have no rollup and no metrics, so this path always 
builds with rollup off and an empty
+   * aggregator set; {@code dimensionsSpec} is expected to include {@code 
__time} as the explicit (non-clustering) time
+   * position.
+   */
+  public QueryableIndex generateV10(
+      final DataSegment dataSegment,
+      final GeneratorSchemaInfo schemaInfo,
+      final DimensionsSpec dimensionsSpec,
+      final TransformSpec transformSpec,
+      final IndexSpec indexSpec,
+      final Granularity queryGranularity,
+      @Nullable final ClusteredValueGroupsBaseTableProjectionSpec clusterSpec,
+      final int numRows,
+      final ObjectMapper jsonMapper
+  )
+  {
+    BuiltInTypesModule.registerHandlersAndSerde();
+
+    final String dataHash = Hashing.sha256()
+                                   .newHasher()
+                                   .putString(dataSegment.getId().toString(), 
StandardCharsets.UTF_8)
+                                   .putString(schemaInfo.toString(), 
StandardCharsets.UTF_8)
+                                   .putString(dimensionsSpec.toString(), 
StandardCharsets.UTF_8)
+                                   .putString(queryGranularity.toString(), 
StandardCharsets.UTF_8)
+                                   .putString(indexSpec.toString(), 
StandardCharsets.UTF_8)
+                                   .putString(transformSpec.toString(), 
StandardCharsets.UTF_8)
+                                   .putString(String.valueOf(clusterSpec), 
StandardCharsets.UTF_8)
+                                   .putString("v10", StandardCharsets.UTF_8)
+                                   .putInt(numRows)
+                                   .hash()
+                                   .toString();
+
+    final IndexIO indexIO = TestHelper.getTestIndexIO(jsonMapper, 
ColumnConfig.DEFAULT);
+    final File outDir = new File(getSegmentDir(dataSegment.getId(), dataHash), 
"merged");
+
+    if (outDir.exists()) {
+      try {
+        log.info("Found segment with hash[%s] cached in directory[%s].", 
dataHash, outDir);
+        return indexIO.loadIndex(outDir);
+      }
+      catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    log.info("Writing segment with hash[%s] to directory[%s].", dataHash, 
outDir);
+
+    final DataGenerator dataGenerator = new DataGenerator(
+        schemaInfo.getColumnSchemas(),
+        dataSegment.getId().hashCode(),
+        schemaInfo.getDataInterval(),
+        numRows
+    );
+
+    final IncrementalIndexSchema indexSchema = new 
IncrementalIndexSchema.Builder()
+        .withDimensionsSpec(dimensionsSpec)
+        .withRollup(false)
+        .withQueryGranularity(queryGranularity)
+        .withClusterSpec(clusterSpec)
+        .build();
+
+    final List<InputRow> rows = new ArrayList<>();
+    final List<QueryableIndex> indexes = new ArrayList<>();
+
+    final Transformer transformer = transformSpec.toTransformer();
+    final InputRowSchema rowSchema = new InputRowSchema(
+        TimestampSpec.DEFAULT,
+        dimensionsSpec,
+        null
+    );
+
+    for (int i = 0; i < numRows; i++) {
+      final Map<String, Object> raw = dataGenerator.nextRaw();
+      final InputRow inputRow = MapInputRowParser.parse(rowSchema, raw);
+      final InputRow transformedRow = transformer.transform(inputRow);
+      rows.add(transformedRow);
+
+      if ((i + 1) % 20000 == 0) {
+        log.info("%,d/%,d rows generated for[%s].", i + 1, numRows, 
dataSegment);
+      }
+
+      if (rows.size() % MAX_ROWS_IN_MEMORY == 0) {
+        indexes.add(makeIndexV10(dataSegment.getId(), dataHash, 
indexes.size(), rows, indexSchema, indexSpec, jsonMapper));
+        rows.clear();
+      }
+    }
+
+    log.info("%,d/%,d rows generated for[%s].", numRows, numRows, dataSegment);
+
+    if (rows.size() > 0) {
+      indexes.add(makeIndexV10(dataSegment.getId(), dataHash, indexes.size(), 
rows, indexSchema, indexSpec, jsonMapper));
+      rows.clear();
+    }
+
+    final QueryableIndex retVal;
+
+    if (indexes.isEmpty()) {
+      throw new ISE("No rows to index?");
+    } else {
+      try {
+        retVal = indexIO.loadIndex(
+            new IndexMergerV10(jsonMapper, indexIO, 
OffHeapMemorySegmentWriteOutMediumFactory.instance())
+                .mergeQueryableIndex(
+                    indexes,
+                    false,
+                    new AggregatorFactory[0],
+                    null,
+                    outDir,
+                    indexSpec,
+                    indexSpec,
+                    new BaseProgressIndicator(),
+                    null,
+                    -1
+                )
+        );
+
+        for (QueryableIndex index : indexes) {
+          index.close();
+        }
+      }
+      catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    log.info("Finished writing segment[%s] to[%s]", dataSegment, outDir);
+
+    return retVal;
+  }
+
   public IncrementalIndex generateIncrementalIndex(
       final DataSegment dataSegment,
       final GeneratorSchemaInfo schemaInfo,
@@ -475,6 +617,27 @@ public class SegmentGenerator implements Closeable
         .buildMMappedIndex();
   }
 
+  private QueryableIndex makeIndexV10(
+      final SegmentId identifier,
+      final String dataHash,
+      final int indexNumber,
+      final List<InputRow> rows,
+      final IncrementalIndexSchema indexSchema,
+      final IndexSpec indexSpec,
+      final ObjectMapper jsonMapper
+  )
+  {
+    return IndexBuilder
+        .create(jsonMapper)
+        .useV10()
+        .schema(indexSchema)
+        .indexSpec(indexSpec)
+        .tmpDir(new File(getSegmentDir(identifier, dataHash), 
String.valueOf(indexNumber)))
+        
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
+        .rows(rows)
+        .buildMMappedIndex();
+  }
+
   private IncrementalIndex makeIncrementalIndex(
       final SegmentId identifier,
       final String dataHash,
diff --git 
a/processing/src/test/java/org/apache/druid/segment/projections/ClusteringColumnSelectorFactoryTest.java
 
b/processing/src/test/java/org/apache/druid/segment/projections/ClusteringColumnSelectorFactoryTest.java
index 03f0b9fc395..e5181c970bb 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/projections/ClusteringColumnSelectorFactoryTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/projections/ClusteringColumnSelectorFactoryTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.segment.projections;
 import org.apache.druid.query.dimension.DefaultDimensionSpec;
 import org.apache.druid.segment.ColumnSelectorFactory;
 import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.DimensionDictionarySelector;
 import org.apache.druid.segment.DimensionSelector;
 import org.apache.druid.segment.NilColumnValueSelector;
 import org.apache.druid.segment.RowIdSupplier;
@@ -321,6 +322,42 @@ class ClusteringColumnSelectorFactoryTest
     Assertions.assertEquals("region", second.lastDimSelectorName);
   }
 
+  @Test
+  void testClusteringColumnDimensionSelectorForcesValueBasedGrouping()
+  {
+    // Across a concatenating multi-group cursor the clustering column's 
per-group constant id (always 0) is not
+    // stable: id 0 means a different clustering value in each group. The 
selector must therefore NOT advertise
+    // dictionary-encoded grouping, otherwise the group-by engine keys on the 
per-group id and silently collapses
+    // every group into one bucket.
+    ClusteringColumnSelectorFactory f = new ClusteringColumnSelectorFactory(
+        new RecordingDelegate(),
+        SIGNATURE,
+        new Object[]{"acme"}
+    );
+    DimensionSelector sel = 
f.makeDimensionSelector(DefaultDimensionSpec.of("tenant"));
+    Assertions.assertEquals(DimensionDictionarySelector.CARDINALITY_UNKNOWN, 
sel.getValueCardinality());
+    Assertions.assertFalse(sel.nameLookupPossibleInAdvance());
+    Assertions.assertNull(sel.idLookup());
+    // Value resolution still works per row against the current group's 
constant.
+    Assertions.assertEquals("acme", sel.lookupName(sel.getRow().get(0)));
+  }
+
+  @Test
+  void testNonClusteringDelegatingDimensionSelectorForcesValueBasedGrouping()
+  {
+    // The non-clustering (delegating) path has the same cross-group 
id-instability and must also force value-based
+    // grouping, even though its delegate here (a constant selector) would 
otherwise report a stable dictionary.
+    ClusteringColumnSelectorFactory f = new ClusteringColumnSelectorFactory(
+        new RecordingDelegate(),
+        SIGNATURE,
+        new Object[]{"acme"}
+    );
+    DimensionSelector sel = 
f.makeDimensionSelector(DefaultDimensionSpec.of("region"));
+    Assertions.assertEquals(DimensionDictionarySelector.CARDINALITY_UNKNOWN, 
sel.getValueCardinality());
+    Assertions.assertFalse(sel.nameLookupPossibleInAdvance());
+    Assertions.assertNull(sel.idLookup());
+  }
+
   @Test
   void testStringDimensionCapabilitiesFlavorIsSingleValue()
   {
diff --git 
a/processing/src/test/java/org/apache/druid/segment/projections/SingleGroupClusteringColumnSelectorFactoryTest.java
 
b/processing/src/test/java/org/apache/druid/segment/projections/SingleGroupClusteringColumnSelectorFactoryTest.java
new file mode 100644
index 00000000000..f7fc6ba1f54
--- /dev/null
+++ 
b/processing/src/test/java/org/apache/druid/segment/projections/SingleGroupClusteringColumnSelectorFactoryTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.projections;
+
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.DimensionSelector;
+import org.apache.druid.segment.NilColumnValueSelector;
+import org.apache.druid.segment.RowIdSupplier;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.column.ValueType;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nullable;
+
+class SingleGroupClusteringColumnSelectorFactoryTest
+{
+  private static final RowSignature CLUSTERING = RowSignature.builder().add("tenant", ColumnType.STRING).build();
+
+  @Test
+  void testNonClusteringColumnPreservesDictionaryEncodedCapabilities()
+  {
+    final ColumnCapabilities dictEncoded =
+        ColumnCapabilitiesImpl.createSimpleSingleValueStringColumnCapabilities()
+                              .setDictionaryEncoded(true)
+                              .setDictionaryValuesUnique(true)
+                              .setDictionaryValuesSorted(true);
+    final RecordingDelegate delegate = new RecordingDelegate(dictEncoded);
+    final SingleGroupClusteringColumnSelectorFactory f =
+        new SingleGroupClusteringColumnSelectorFactory(delegate, CLUSTERING, new Object[]{"acme"});
+
+    final ColumnCapabilities caps = f.getColumnCapabilities("region");
+    Assertions.assertNotNull(caps);
+    Assertions.assertTrue(caps.isDictionaryEncoded().isTrue());
+    Assertions.assertEquals("region", delegate.lastCapabilitiesColumn); // lookup was forwarded to the delegate
+  }
+
+  @Test
+  void testNonClusteringColumnSelectorsDelegatedDirectly()
+  {
+    final RecordingDelegate delegate = new RecordingDelegate(null);
+    final SingleGroupClusteringColumnSelectorFactory f =
+        new SingleGroupClusteringColumnSelectorFactory(delegate, CLUSTERING, new Object[]{"acme"});
+
+    // No wrapper / generation indirection: the delegate is consulted immediately on selector creation.
+    final DimensionSelector dim = f.makeDimensionSelector(DefaultDimensionSpec.of("region"));
+    Assertions.assertEquals("region", delegate.lastDimSelectorName);
+    Assertions.assertEquals("delegated:region", dim.lookupName(0)); // value produced by the stub's selector
+
+    f.makeColumnValueSelector("metric");
+    Assertions.assertEquals("metric", delegate.lastValueSelectorName);
+  }
+
+  @Test
+  void testClusteringColumnReturnsConstantWithoutHittingDelegate()
+  {
+    final RecordingDelegate delegate = new RecordingDelegate(null);
+    final SingleGroupClusteringColumnSelectorFactory f =
+        new SingleGroupClusteringColumnSelectorFactory(delegate, CLUSTERING, new Object[]{"acme"});
+
+    final DimensionSelector dim = f.makeDimensionSelector(DefaultDimensionSpec.of("tenant"));
+    Assertions.assertEquals("acme", dim.lookupName(dim.getRow().get(0)));
+    Assertions.assertNull(delegate.lastDimSelectorName, "delegate must not be hit for clustering columns");
+
+    final ColumnValueSelector val = f.makeColumnValueSelector("tenant");
+    Assertions.assertEquals("acme", val.getObject());
+
+    // A single group's clustering value is a constant 1-entry dictionary, so it is reported as dictionary-encoded /
+    // sorted / unique to route grouping through the dictionary-id path.
+    final ColumnCapabilities caps = f.getColumnCapabilities("tenant");
+    Assertions.assertNotNull(caps);
+    Assertions.assertTrue(caps.is(ValueType.STRING));
+    Assertions.assertTrue(caps.isDictionaryEncoded().isTrue());
+    Assertions.assertTrue(caps.areDictionaryValuesSorted().isTrue());
+    Assertions.assertTrue(caps.areDictionaryValuesUnique().isTrue());
+  }
+
+  private static class RecordingDelegate implements ColumnSelectorFactory
+  {
+    @Nullable
+    private final ColumnCapabilities capabilities; // returned by getColumnCapabilities for every column name
+    String lastDimSelectorName; // dimension of the latest makeDimensionSelector call; null until one is made
+    String lastValueSelectorName; // column of the latest makeColumnValueSelector call; null until one is made
+    String lastCapabilitiesColumn; // column of the latest getColumnCapabilities call; null until one is made
+
+    RecordingDelegate(@Nullable ColumnCapabilities capabilities)
+    {
+      this.capabilities = capabilities;
+    }
+
+    @Override
+    public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec)
+    {
+      lastDimSelectorName = dimensionSpec.getDimension();
+      return DimensionSelector.constant("delegated:" + dimensionSpec.getDimension());
+    }
+
+    @Override
+    public ColumnValueSelector makeColumnValueSelector(String columnName)
+    {
+      lastValueSelectorName = columnName;
+      return NilColumnValueSelector.instance();
+    }
+
+    @Nullable
+    @Override
+    public ColumnCapabilities getColumnCapabilities(String column)
+    {
+      lastCapabilitiesColumn = column;
+      return capabilities;
+    }
+
+    @Nullable
+    @Override
+    public RowIdSupplier getRowIdSupplier()
+    {
+      return null; // this stub supplies no row ids
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to