This is an automated email from the ASF dual-hosted git repository.

clintropolis pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new e02b9af07d7 feat: support aggregate projections on clustered segments 
(#19599)
e02b9af07d7 is described below

commit e02b9af07d79e8295217cf8f55c5fa168a5d5fac
Author: Clint Wylie <[email protected]>
AuthorDate: Mon Jun 22 12:08:06 2026 -0700

    feat: support aggregate projections on clustered segments (#19599)
    
    changes:
    * allow `AggregateProjectionSpec` on a clustered base table; remove the
      build-time and merge-time guards that rejected it
    * persist projections in `IndexMergerV10.makeClusteredIndexFiles` via the 
shared `makeProjections(...)`
    * fix a bug where grouping on a non-clustering, dictionary-encoded column
      of the clustered base table conflated distinct values: dictionary IDs are
      local to each cluster group, but were compared across the whole
      `ConcatenatingCursor`, so different values that shared an ID were grouped
      together. Force value-based grouping by reporting non-clustering columns
      as non-dictionary-encoded, both in the column capabilities and on the
      selector (unknown cardinality, no name lookup in advance, no id lookup)
    * tests: add build and query tests for projections on both incremental and
      persisted clustered segments, and add the first E2E coverage for clustered
      segments (`ClusteredSegmentProjectionQueryTest`: native ingestion plus
      projection-vs-noProjections queries)
    * remove the now-unused dimension vector selectors from
      `ClusteringVectorColumnSelectorFactory`
---
 .../query/ClusteredSegmentProjectionQueryTest.java | 294 +++++++++++++++++++++
 .../org/apache/druid/segment/IndexMergerV10.java   |  35 ++-
 .../incremental/OnheapIncrementalIndex.java        |   6 -
 .../ClusteringColumnSelectorFactory.java           |  29 +-
 .../ClusteringVectorColumnSelectorFactory.java     | 265 ++-----------------
 .../druid/segment/IndexMergerV10ClusteredTest.java | 260 ++++++++++++++++++
 .../IncrementalIndexClusteredProjectionTest.java   | 211 +++++++++++++++
 .../ClusteringVectorColumnSelectorFactoryTest.java | 108 +++++---
 8 files changed, 910 insertions(+), 298 deletions(-)

diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/ClusteredSegmentProjectionQueryTest.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/ClusteredSegmentProjectionQueryTest.java
new file mode 100644
index 00000000000..4bc31b452f9
--- /dev/null
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/ClusteredSegmentProjectionQueryTest.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.query;
+
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.data.input.impl.AggregateProjectionSpec;
+import 
org.apache.druid.data.input.impl.ClusteredValueGroupsBaseTableProjectionSpec;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexer.granularity.SegmentGranularitySpec;
+import org.apache.druid.indexing.common.task.TaskBuilder;
+import 
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.query.QueryContexts;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.query.http.ClientSqlQuery;
+import org.apache.druid.server.metrics.LatchableEmitter;
+import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedCoordinator;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedHistorical;
+import org.apache.druid.testing.embedded.EmbeddedIndexer;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.EmbeddedRouter;
+import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * End-to-end coverage for native (index_parallel) ingestion of a CLUSTERED 
base-table segment that also carries an
+ * aggregate projection, and for querying it both through the projection and 
directly against the clustered base table.
+ * <p>
+ * This exercises:
+ * <ul>
+ *   <li>writing a clustered base-table V10 segment via {@code 
index_parallel}</li>
+ *   <li>an aggregate projection built on top of the clustered base table</li>
+ *   <li>the projection being chosen to satisfy a matching aggregation 
(asserted via the {@code projection} dimension
+ *       on the historical's {@code query/segment/time} metric)</li>
+ *   <li>forcing direct base-table access with {@code noProjections=true} and 
proving identical results</li>
+ *   <li>queries that the projection cannot satisfy (group-by / filter on the 
clustering column), which exercise the
+ *       per-cluster-group cursor path on the clustered base table</li>
+ * </ul>
+ */
+class ClusteredSegmentProjectionQueryTest extends EmbeddedClusterTestBase
+{
+  private static final String PROJECTION_NAME = "country_delta";
+  private static final String SEGMENT_TIME_METRIC = "query/segment/time";
+
+  private final EmbeddedBroker broker = new EmbeddedBroker();
+  private final EmbeddedIndexer indexer = new EmbeddedIndexer();
+  private final EmbeddedOverlord overlord = new EmbeddedOverlord();
+  private final EmbeddedHistorical historical = new EmbeddedHistorical();
+  private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator();
+  private final EmbeddedRouter router = new EmbeddedRouter();
+
+  @Override
+  public EmbeddedDruidCluster createCluster()
+  {
+    historical.setServerMemory(300_000_000);
+
+    broker.setServerMemory(200_000_000)
+          .addProperty("druid.sql.planner.enableSysQueriesTable", "true");
+
+    coordinator.addProperty("druid.manager.segments.useIncrementalCache", 
"always");
+
+    overlord.addProperty("druid.manager.segments.useIncrementalCache", 
"always")
+            .addProperty("druid.manager.segments.pollDuration", "PT0.1s");
+
+    indexer.setServerMemory(300_000_000)
+           .addProperty("druid.worker.capacity", "2")
+           .addProperty("druid.processing.numThreads", "2")
+           .addProperty("druid.segment.handoff.pollDuration", "PT0.1s");
+
+    return EmbeddedDruidCluster
+        .withEmbeddedDerbyAndZookeeper()
+        .useLatchableEmitter()
+        .useDefaultTimeoutForLatchableEmitter(60)
+        // Clustered base-table segments can only be written in the V10 
segment format.
+        .addCommonProperty("druid.indexer.task.buildV10", "true")
+        .addCommonProperty("druid.monitoring.emissionPeriod", "PT1s")
+        .addServer(coordinator)
+        .addServer(overlord)
+        .addServer(indexer)
+        .addServer(historical)
+        .addServer(broker)
+        .addServer(router);
+  }
+
+  @BeforeAll
+  void loadData() throws IOException
+  {
+    dataSource = "clustered-" + IdUtils.getRandomId();
+    ingestClusteredSegmentWithProjection();
+  }
+
+  @Override
+  protected void refreshDatasourceName()
+  {
+    // don't change the datasource name for each run because we set things up 
before all tests
+  }
+
+  @Test
+  void testProjectionServesAggregationQuery()
+  {
+    // The aggregation matches the country_delta projection (group by 
countryName, sum delta).
+    final String sql = "SELECT \"countryName\", SUM(\"delta\") FROM \"%s\" 
GROUP BY 1 ORDER BY 1";
+
+    final LatchableEmitter emitter = historical.latchableEmitter();
+    emitter.flush();
+
+    cluster.callApi().verifySqlQuery(sql, dataSource, "CA,3\nFR,7\nUS,17");
+
+    // When the projection is used, the segment-scan query metrics carry the 
projection name as a dimension.
+    emitter.waitForEvent(
+        event -> event.hasMetricName(SEGMENT_TIME_METRIC)
+                      .hasDimension("projection", PROJECTION_NAME)
+    );
+  }
+
+  /**
+   * Forces the clustered base table to serve the {@code GROUP BY countryName, 
SUM(delta)} aggregation directly
+   * (bypassing the projection via {@code noProjections=true}) and asserts the 
results match the projection-served
+   * results exactly.
+   * <p>
+   * Regression test for a clustered read-path bug: {@code countryName} is a 
non-clustering column whose
+   * per-cluster-group dictionaries assign different values the same local ID 
(group {@code #en}: {@code {US, CA}},
+   * group {@code #fr}: {@code {FR, US}}). A dictionary-id-keyed group-by over 
the concatenated per-group cursors (see
+   * {@link org.apache.druid.segment.ConcatenatingCursor}) would conflate 
those, returning {@code FR,10 / US,17}. The
+   * fix makes the concatenating cursor report non-clustering columns as 
non-dictionary-encoded (see
+   * {@link 
org.apache.druid.segment.projections.ClusteringColumnSelectorFactory#getColumnCapabilities}),
 forcing
+   * value-based grouping, so the correct {@code CA,3 / FR,7 / US,17} is 
returned.
+   */
+  @Test
+  void testNoProjectionsServesFromClusteredBaseTable()
+  {
+    final String sql = "SELECT \"countryName\", SUM(\"delta\") FROM \"" + 
dataSource + "\" GROUP BY 1 ORDER BY 1";
+
+    final LatchableEmitter emitter = historical.latchableEmitter();
+    emitter.flush();
+
+    // Force the clustered base table to serve the query directly, bypassing 
the projection.
+    final String result = cluster.callApi().onAnyBroker(
+        b -> b.submitSqlQuery(
+            new ClientSqlQuery(
+                sql,
+                "CSV",
+                false,
+                false,
+                false,
+                Map.of(QueryContexts.NO_PROJECTIONS, true),
+                null
+            )
+        )
+    ).trim();
+
+    // Identical results, proving the clustered base table serves them.
+    Assertions.assertEquals("CA,3\nFR,7\nUS,17", result);
+
+    // Confirm a segment scan actually happened, and that no segment scan 
reported the projection dimension for this run.
+    emitter.waitForEvent(event -> event.hasMetricName(SEGMENT_TIME_METRIC));
+    for (ServiceMetricEvent event : 
emitter.getMetricEvents(SEGMENT_TIME_METRIC)) {
+      Assertions.assertNull(
+          event.getUserDims().get("projection"),
+          "expected no projection dimension when noProjections=true, but 
found: " + event.getUserDims()
+      );
+    }
+  }
+
+  @Test
+  void testGroupByClusteringColumnUsesBaseTable()
+  {
+    // Grouping on the clustering column cannot be satisfied by the 
country_delta projection, so it dispatches to the
+    // clustered base table and exercises the per-cluster-group cursor path.
+    final String sql = "SELECT \"channel\", SUM(\"delta\") FROM \"%s\" GROUP 
BY 1 ORDER BY 1";
+
+    final LatchableEmitter emitter = historical.latchableEmitter();
+    emitter.flush();
+
+    cluster.callApi().verifySqlQuery(sql, dataSource, "#en,18\n#fr,9");
+
+    // The projection cannot serve this query, so no segment scan should 
report the projection dimension.
+    emitter.waitForEvent(event -> event.hasMetricName(SEGMENT_TIME_METRIC));
+    for (ServiceMetricEvent event : 
emitter.getMetricEvents(SEGMENT_TIME_METRIC)) {
+      Assertions.assertNull(
+          event.getUserDims().get("projection"),
+          "expected no projection dimension for a clustering-column group-by, 
but found: " + event.getUserDims()
+      );
+    }
+  }
+
+  @Test
+  void testFilterOnClusteringColumnUsesBaseTable()
+  {
+    // Filtering on the clustering column exercises a single cluster group of 
the clustered base table.
+    cluster.callApi().verifySqlQuery(
+        "SELECT SUM(\"delta\") FROM \"%s\" WHERE \"channel\" = '#en'",
+        dataSource,
+        "18"
+    );
+    cluster.callApi().verifySqlQuery(
+        "SELECT \"countryName\", SUM(\"delta\") FROM \"%s\" WHERE \"channel\" 
= '#en' GROUP BY 1 ORDER BY 1",
+        dataSource,
+        "CA,3\nUS,15"
+    );
+  }
+
+  /**
+   * Ingests a single clustered base-table segment (clustered by {@code 
channel}) that also defines a
+   * {@code country_delta} aggregate projection (group by {@code countryName}, 
sum {@code delta}) using a native
+   * {@code index_parallel} task.
+   */
+  private void ingestClusteredSegmentWithProjection() throws IOException
+  {
+    final File tmpDir = cluster.getTestFolder().newFolder();
+    final File inputFile = new File(tmpDir, "clustered-input.json");
+    final String inputData =
+        
"{\"time\":\"2024-01-01T00:10:00Z\",\"channel\":\"#en\",\"countryName\":\"US\",\"delta\":10}\n"
+        + 
"{\"time\":\"2024-01-01T00:20:00Z\",\"channel\":\"#en\",\"countryName\":\"US\",\"delta\":5}\n"
+        + 
"{\"time\":\"2024-01-01T00:30:00Z\",\"channel\":\"#en\",\"countryName\":\"CA\",\"delta\":3}\n"
+        + 
"{\"time\":\"2024-01-01T00:40:00Z\",\"channel\":\"#fr\",\"countryName\":\"FR\",\"delta\":7}\n"
+        + 
"{\"time\":\"2024-01-01T00:50:00Z\",\"channel\":\"#fr\",\"countryName\":\"US\",\"delta\":2}\n";
+    Files.write(inputFile.toPath(), 
inputData.getBytes(StandardCharsets.UTF_8));
+
+    final ClusteredValueGroupsBaseTableProjectionSpec clusterSpec =
+        ClusteredValueGroupsBaseTableProjectionSpec.builder()
+            .columns(
+                new StringDimensionSchema("channel"),       // clustering 
prefix
+                new StringDimensionSchema("countryName"),
+                new LongDimensionSchema("delta"),
+                new LongDimensionSchema("__time")           // __time present, 
non-clustering
+            )
+            .clusteringColumns("channel")
+            .build();
+
+    final AggregateProjectionSpec projection =
+        AggregateProjectionSpec.builder(PROJECTION_NAME)
+            .groupingColumns(new StringDimensionSchema("countryName"))
+            .aggregators(new LongSumAggregatorFactory("sumDelta", "delta"))
+            .build();
+
+    final SegmentGranularitySpec segmentGranularitySpec = new 
SegmentGranularitySpec(
+        Granularities.HOUR,
+        List.of(Intervals.of("2024-01-01/2024-01-02"))
+    );
+
+    final String taskId = IdUtils.getRandomId();
+    final ParallelIndexSupervisorTask task = TaskBuilder
+        .ofTypeIndexParallel()
+        .jsonInputFormat()
+        .localInputSourceWithFiles(inputFile)
+        .dataSchema(
+            builder -> builder
+                .withDataSource(dataSource)
+                .withTimestamp(new TimestampSpec("time", "iso", null))
+                .withSegmentGranularity(segmentGranularitySpec)
+                .withBaseTable(clusterSpec)
+                .withProjections(List.of(projection))
+        )
+        .tuningConfig(t -> t.withMaxNumConcurrentSubTasks(1))
+        .withId(taskId);
+
+    cluster.callApi().onLeaderOverlord(o -> o.runTask(taskId, task));
+    cluster.callApi().waitForTaskToSucceed(taskId, overlord);
+    cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, 
broker);
+  }
+}
diff --git 
a/processing/src/main/java/org/apache/druid/segment/IndexMergerV10.java 
b/processing/src/main/java/org/apache/druid/segment/IndexMergerV10.java
index 86be6566f8f..68eb4acdc5f 100644
--- a/processing/src/main/java/org/apache/druid/segment/IndexMergerV10.java
+++ b/processing/src/main/java/org/apache/druid/segment/IndexMergerV10.java
@@ -137,13 +137,10 @@ public class IndexMergerV10 extends IndexMergerBase
           allClustered,
           "Cannot merge clustered and non-clustered base table segments 
together"
       );
-      DruidException.conditionalDefensive(
-          CollectionUtils.isNullOrEmpty(segmentMetadata.getProjections()),
-          "Clustered base table segments do not yet support aggregate 
projections"
-      );
       return makeClusteredIndexFiles(
           adapters,
           clusterSchemas,
+          segmentMetadata,
           outDir,
           progress,
           mergedMetrics,
@@ -362,6 +359,7 @@ public class IndexMergerV10 extends IndexMergerBase
   private File makeClusteredIndexFiles(
       final List<IndexableAdapter> adapters,
       final List<ClusteredValueGroupsBaseTableSchema> clusterSchemas,
+      final Metadata segmentMetadata,
       final File outDir,
       final ProgressIndicator progress,
       final List<String> mergedMetrics,
@@ -544,9 +542,32 @@ public class IndexMergerV10 extends IndexMergerBase
           mergedGroupSpecs
       );
 
-      v10Smoosher.addProjections(
-          List.of(new ProjectionMetadata(totalRows, mergedSchema, minTime, 
maxTime))
-      );
+      final List<ProjectionMetadata> projections = new ArrayList<>();
+      projections.add(new ProjectionMetadata(totalRows, mergedSchema, minTime, 
maxTime));
+
+      // Aggregate projections are segment-wide pre-aggregated views over the 
clustered base table. Unlike the
+      // non-clustered path, a clustered segment has no segment-wide base 
dimension mergers to attach to (columns are
+      // written per cluster group), so we pass empty parentMergers and the 
projection mergers build their own
+      // dictionaries.
+      if (!CollectionUtils.isNullOrEmpty(segmentMetadata.getProjections())) {
+        final Metadata updatedMetadata = makeProjections(
+            v10Smoosher,
+            segmentMetadata.getProjections(),
+            adapters,
+            indexSpec,
+            segmentWriteOutMedium,
+            progress,
+            outDir,
+            closer,
+            Map.of(),
+            segmentMetadata
+        );
+        for (AggregateProjectionMetadata aggMeta : 
updatedMetadata.getProjections()) {
+          projections.add(new ProjectionMetadata(aggMeta.getNumRows(), 
aggMeta.getSchema()));
+        }
+      }
+
+      v10Smoosher.addProjections(projections);
 
       progress.progress();
       v10Smoosher.close();
diff --git 
a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
 
b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
index c2d4885d94b..f9d6ff965c3 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
@@ -140,12 +140,6 @@ public class OnheapIncrementalIndex extends 
IncrementalIndex
     initializeProjections(incrementalIndexSchema);
 
     if (incrementalIndexSchema.getClusterSpec() != null) {
-      if (!projections.isEmpty()) {
-        throw DruidException.defensive(
-            "clustered base table mode does not yet support aggregate 
projections — got [%d] projection(s)",
-            projections.size()
-        );
-      }
       this.clusteredBaseTable = new OnHeapClusteredBaseTable(
           incrementalIndexSchema.getClusterSpec(),
           incrementalIndexSchema.getVirtualColumns(),
diff --git 
a/processing/src/main/java/org/apache/druid/segment/projections/ClusteringColumnSelectorFactory.java
 
b/processing/src/main/java/org/apache/druid/segment/projections/ClusteringColumnSelectorFactory.java
index b83f7c5201c..6e9ab3126a7 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/projections/ClusteringColumnSelectorFactory.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/projections/ClusteringColumnSelectorFactory.java
@@ -29,6 +29,7 @@ import 
org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
 import org.apache.druid.segment.ColumnSelectorFactory;
 import org.apache.druid.segment.ColumnValueSelector;
 import org.apache.druid.segment.ConstantExprEvalSelector;
+import org.apache.druid.segment.DimensionDictionarySelector;
 import org.apache.druid.segment.DimensionSelector;
 import org.apache.druid.segment.IdLookup;
 import org.apache.druid.segment.RowIdSupplier;
@@ -150,7 +151,20 @@ public class ClusteringColumnSelectorFactory implements 
ColumnSelectorFactory
   {
     final int idx = clusteringColumns.indexOf(column);
     if (idx < 0) {
-      return delegate.getColumnCapabilities(column);
+      // Non-clustering columns are stored per cluster group, each with its 
own local dictionary. The
+      // ConcatenatingCursor walks those groups behind a single cursor, so a 
column's dictionary IDs are NOT stable
+      // across the whole cursor. We therefore must not advertise dictionary 
encoding here: otherwise the group-by
+      // engine keys on the (per-group-local) IDs and conflates distinct 
values from different groups. Reporting the
+      // column as non-dictionary-encoded forces value-based grouping, which 
is correct across groups.
+      final ColumnCapabilities delegateCapabilities = 
delegate.getColumnCapabilities(column);
+      if (delegateCapabilities == null) {
+        return null;
+      }
+      return ColumnCapabilitiesImpl.copyOf(delegateCapabilities)
+                                   .setDictionaryEncoded(false)
+                                   .setDictionaryValuesSorted(false)
+                                   .setDictionaryValuesUnique(false)
+                                   .setHasBitmapIndexes(false);
     }
     final ColumnType type = clusteringColumns.getColumnType(idx).orElseThrow();
     if (type.is(ValueType.STRING)) {
@@ -477,7 +491,12 @@ public class ClusteringColumnSelectorFactory implements 
ColumnSelectorFactory
     @Override
     public int getValueCardinality()
     {
-      return currentInner().getValueCardinality();
+      // The dictionary is per cluster group and NOT stable across the 
concatenating cursor (the same local id means
+      // different values in different groups). Reporting CARDINALITY_UNKNOWN 
forces query engines onto the
+      // value-based (rather than dictionary-id-keyed) path, which is correct 
across groups; a dictionary-id-keyed
+      // group-by would otherwise conflate distinct values that share an id. 
lookupName() still resolves per-row
+      // against the current group, so value-based grouping reads the right 
value.
+      return DimensionDictionarySelector.CARDINALITY_UNKNOWN;
     }
 
     @Nullable
@@ -490,14 +509,16 @@ public class ClusteringColumnSelectorFactory implements 
ColumnSelectorFactory
     @Override
     public boolean nameLookupPossibleInAdvance()
     {
-      return currentInner().nameLookupPossibleInAdvance();
+      // Per-group dictionaries cannot be enumerated in advance across the 
concatenating cursor.
+      return false;
     }
 
     @Nullable
     @Override
     public IdLookup idLookup()
     {
-      return currentInner().idLookup();
+      // No stable id<->name mapping across groups; callers must resolve by 
value.
+      return null;
     }
 
     @Nullable
diff --git 
a/processing/src/main/java/org/apache/druid/segment/projections/ClusteringVectorColumnSelectorFactory.java
 
b/processing/src/main/java/org/apache/druid/segment/projections/ClusteringVectorColumnSelectorFactory.java
index 450bd859815..722e1017e9a 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/projections/ClusteringVectorColumnSelectorFactory.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/projections/ClusteringVectorColumnSelectorFactory.java
@@ -21,13 +21,11 @@ package org.apache.druid.segment.projections;
 
 import org.apache.druid.error.DruidException;
 import org.apache.druid.query.dimension.DimensionSpec;
-import org.apache.druid.segment.IdLookup;
 import org.apache.druid.segment.column.ColumnCapabilities;
 import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.segment.column.ValueType;
-import org.apache.druid.segment.data.IndexedInts;
 import org.apache.druid.segment.vector.ConstantVectorSelectors;
 import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
 import org.apache.druid.segment.vector.ReadableVectorInspector;
@@ -134,24 +132,23 @@ public class ClusteringVectorColumnSelectorFactory 
implements VectorColumnSelect
   @Override
   public SingleValueDimensionVectorSelector 
makeSingleValueDimensionSelector(DimensionSpec dimensionSpec)
   {
-    final int idx = clusteringColumns.indexOf(dimensionSpec.getDimension());
-    if (idx < 0) {
-      return new DelegatingSingleValueDimensionVectorSelector(this, 
dimensionSpec);
-    }
-    return new ClusteringSingleValueDimensionVectorSelector(this, idx, 
dimensionSpec);
+    // clusteredValueGroups columns are never dictionary-encoded (see 
getColumnCapabilities), so the vector engine
+    // always uses object/value selectors and never requests a dimension 
vector selector here. A request would mean a
+    // column was wrongly advertised as dictionary-encoded.
+    throw DruidException.defensive(
+        "clusteredValueGroups columns are not dictionary-encoded; no 
single-value dimension vector selector for [%s]",
+        dimensionSpec.getDimension()
+    );
   }
 
   @Override
   public MultiValueDimensionVectorSelector 
makeMultiValueDimensionSelector(DimensionSpec dimensionSpec)
   {
-    final int idx = clusteringColumns.indexOf(dimensionSpec.getDimension());
-    if (idx < 0) {
-      return new DelegatingMultiValueDimensionVectorSelector(this, 
dimensionSpec);
-    }
-    // Clustering values are single-typed primitives. Multi-value requests on 
a clustering column shouldn't happen
-    // in practice; throw to surface caller bugs rather than silently 
misbehave.
+    // See makeSingleValueDimensionSelector: clusteredValueGroups columns are 
never dictionary-encoded, so a
+    // multi-value dimension vector selector is never requested either.
     throw DruidException.defensive(
-        "multi-value vector selector not supported for clustering column [" + 
dimensionSpec.getDimension() + "]"
+        "clusteredValueGroups columns are not dictionary-encoded; no 
multi-value dimension vector selector for [%s]",
+        dimensionSpec.getDimension()
     );
   }
 
@@ -181,7 +178,21 @@ public class ClusteringVectorColumnSelectorFactory 
implements VectorColumnSelect
   {
     final int idx = clusteringColumns.indexOf(column);
     if (idx < 0) {
-      return delegate.getColumnCapabilities(column);
+      // Non-clustering columns are stored per cluster group with per-group 
local dictionaries that are NOT stable
+      // across the concatenating vector cursor. We must not advertise 
dictionary encoding: the vectorized group-by
+      // keys on the selector's (per-group-local) IDs when the column reports 
as dictionary-encoded
+      // (GroupByVectorColumnProcessorFactory#useDictionaryEncodedSelector), 
conflating distinct values across groups.
+      // Reporting non-dictionary-encoded routes it to the value-building 
vector selector, which is correct across
+      // groups.
+      final ColumnCapabilities delegateCapabilities = 
delegate.getColumnCapabilities(column);
+      if (delegateCapabilities == null) {
+        return null;
+      }
+      return ColumnCapabilitiesImpl.copyOf(delegateCapabilities)
+                                   .setDictionaryEncoded(false)
+                                   .setDictionaryValuesSorted(false)
+                                   .setDictionaryValuesUnique(false)
+                                   .setHasBitmapIndexes(false);
     }
     final ColumnType type = clusteringColumns.getColumnType(idx).orElseThrow();
     if (type.is(ValueType.STRING)) {
@@ -190,88 +201,6 @@ public class ClusteringVectorColumnSelectorFactory 
implements VectorColumnSelect
     return ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(type);
   }
 
-  private static final class ClusteringSingleValueDimensionVectorSelector 
implements SingleValueDimensionVectorSelector
-  {
-    private final ClusteringVectorColumnSelectorFactory parent;
-    private final int idx;
-    private final DimensionSpec spec;
-    private long cachedGeneration = -1;
-    private SingleValueDimensionVectorSelector cachedInner;
-
-    private ClusteringSingleValueDimensionVectorSelector(
-        ClusteringVectorColumnSelectorFactory parent,
-        int idx,
-        DimensionSpec spec
-    )
-    {
-      this.parent = parent;
-      this.idx = idx;
-      this.spec = spec;
-    }
-
-    private SingleValueDimensionVectorSelector currentInner()
-    {
-      final long currentGeneration = parent.getGeneration();
-      if (cachedGeneration == currentGeneration) {
-        return cachedInner;
-      }
-      final Object raw = parent.currentValue(idx);
-      final String stringValue = raw == null ? null : String.valueOf(raw);
-      final String afterExtraction =
-          spec.getExtractionFn() == null ? stringValue : 
spec.getExtractionFn().apply(stringValue);
-      cachedInner = ConstantVectorSelectors.singleValueDimensionVectorSelector(
-          parent.getReadableVectorInspector(),
-          afterExtraction
-      );
-      cachedGeneration = currentGeneration;
-      return cachedInner;
-    }
-
-    @Override
-    public int[] getRowVector()
-    {
-      return currentInner().getRowVector();
-    }
-
-    @Override
-    public int getValueCardinality()
-    {
-      return currentInner().getValueCardinality();
-    }
-
-    @Nullable
-    @Override
-    public String lookupName(int id)
-    {
-      return currentInner().lookupName(id);
-    }
-
-    @Override
-    public boolean nameLookupPossibleInAdvance()
-    {
-      return currentInner().nameLookupPossibleInAdvance();
-    }
-
-    @Nullable
-    @Override
-    public IdLookup idLookup()
-    {
-      return currentInner().idLookup();
-    }
-
-    @Override
-    public int getMaxVectorSize()
-    {
-      return parent.getMaxVectorSize();
-    }
-
-    @Override
-    public int getCurrentVectorSize()
-    {
-      return parent.getReadableVectorInspector().getCurrentVectorSize();
-    }
-  }
-
   private static final class ClusteringVectorValueSelector implements 
VectorValueSelector
   {
     private final ClusteringVectorColumnSelectorFactory parent;
@@ -384,148 +313,6 @@ public class ClusteringVectorColumnSelectorFactory 
implements VectorColumnSelect
     }
   }
 
-  private static final class DelegatingSingleValueDimensionVectorSelector 
implements SingleValueDimensionVectorSelector
-  {
-    private final ClusteringVectorColumnSelectorFactory parent;
-    private final DimensionSpec spec;
-    private long cachedGeneration = -1;
-    private SingleValueDimensionVectorSelector cachedInner;
-
-    private DelegatingSingleValueDimensionVectorSelector(
-        ClusteringVectorColumnSelectorFactory parent,
-        DimensionSpec spec
-    )
-    {
-      this.parent = parent;
-      this.spec = spec;
-    }
-
-    private SingleValueDimensionVectorSelector currentInner()
-    {
-      final long currentGeneration = parent.getGeneration();
-      if (cachedGeneration != currentGeneration) {
-        cachedInner = 
parent.getDelegate().makeSingleValueDimensionSelector(spec);
-        cachedGeneration = currentGeneration;
-      }
-      return cachedInner;
-    }
-
-    @Override
-    public int[] getRowVector()
-    {
-      return currentInner().getRowVector();
-    }
-
-    @Override
-    public int getValueCardinality()
-    {
-      return currentInner().getValueCardinality();
-    }
-
-    @Nullable
-    @Override
-    public String lookupName(int id)
-    {
-      return currentInner().lookupName(id);
-    }
-
-    @Override
-    public boolean nameLookupPossibleInAdvance()
-    {
-      return currentInner().nameLookupPossibleInAdvance();
-    }
-
-    @Nullable
-    @Override
-    public IdLookup idLookup()
-    {
-      return currentInner().idLookup();
-    }
-
-    @Override
-    public int getMaxVectorSize()
-    {
-      return currentInner().getMaxVectorSize();
-    }
-
-    @Override
-    public int getCurrentVectorSize()
-    {
-      return currentInner().getCurrentVectorSize();
-    }
-  }
-
-  private static final class DelegatingMultiValueDimensionVectorSelector 
implements MultiValueDimensionVectorSelector
-  {
-    private final ClusteringVectorColumnSelectorFactory parent;
-    private final DimensionSpec spec;
-    private long cachedGeneration = -1;
-    private MultiValueDimensionVectorSelector cachedInner;
-
-    private DelegatingMultiValueDimensionVectorSelector(
-        ClusteringVectorColumnSelectorFactory parent,
-        DimensionSpec spec
-    )
-    {
-      this.parent = parent;
-      this.spec = spec;
-    }
-
-    private MultiValueDimensionVectorSelector currentInner()
-    {
-      final long currentGeneration = parent.getGeneration();
-      if (cachedGeneration != currentGeneration) {
-        cachedInner = 
parent.getDelegate().makeMultiValueDimensionSelector(spec);
-        cachedGeneration = currentGeneration;
-      }
-      return cachedInner;
-    }
-
-    @Override
-    public IndexedInts[] getRowVector()
-    {
-      return currentInner().getRowVector();
-    }
-
-    @Override
-    public int getValueCardinality()
-    {
-      return currentInner().getValueCardinality();
-    }
-
-    @Nullable
-    @Override
-    public String lookupName(int id)
-    {
-      return currentInner().lookupName(id);
-    }
-
-    @Override
-    public boolean nameLookupPossibleInAdvance()
-    {
-      return currentInner().nameLookupPossibleInAdvance();
-    }
-
-    @Nullable
-    @Override
-    public IdLookup idLookup()
-    {
-      return currentInner().idLookup();
-    }
-
-    @Override
-    public int getMaxVectorSize()
-    {
-      return currentInner().getMaxVectorSize();
-    }
-
-    @Override
-    public int getCurrentVectorSize()
-    {
-      return currentInner().getCurrentVectorSize();
-    }
-  }
-
   private static final class DelegatingVectorValueSelector implements 
VectorValueSelector
   {
     private final ClusteringVectorColumnSelectorFactory parent;
diff --git 
a/processing/src/test/java/org/apache/druid/segment/IndexMergerV10ClusteredTest.java
 
b/processing/src/test/java/org/apache/druid/segment/IndexMergerV10ClusteredTest.java
index c0d4fc20e0f..3782b61a60f 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/IndexMergerV10ClusteredTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/IndexMergerV10ClusteredTest.java
@@ -21,16 +21,21 @@ package org.apache.druid.segment;
 
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.MapBasedInputRow;
+import org.apache.druid.data.input.impl.AggregateProjectionSpec;
 import 
org.apache.druid.data.input.impl.ClusteredValueGroupsBaseTableProjectionSpec;
 import org.apache.druid.data.input.impl.LongDimensionSchema;
 import org.apache.druid.data.input.impl.StringDimensionSchema;
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
 import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.groupby.GroupByQuery;
+import org.apache.druid.query.groupby.GroupingEngine;
+import org.apache.druid.query.groupby.orderby.OrderByColumnSpec.Direction;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
 import 
org.apache.druid.segment.projections.ClusteredValueGroupsBaseTableSchema;
 import org.apache.druid.segment.projections.TableClusterGroupSpec;
@@ -46,6 +51,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -133,6 +139,260 @@ class IndexMergerV10ClusteredTest extends 
InitializedNullHandlingTest
     return out;
   }
 
+  /**
+   * Builds the clustered base-table spec shared by the projection tests. The columns are {@code tenant} and
+   * {@code region} (strings) plus {@code x} and {@code __time} (longs), and {@code tenant} is the only clustering
+   * column. The numeric {@code x} column is there so an aggregate projection has something to sum.
+   */
+  private static ClusteredValueGroupsBaseTableProjectionSpec 
tenantClusterSpecWithX()
+  {
+    return ClusteredValueGroupsBaseTableProjectionSpec.builder()
+        .columns(
+            new StringDimensionSchema("tenant"),
+            new StringDimensionSchema("region"),
+            new LongDimensionSchema("x"),
+            new LongDimensionSchema("__time")
+        )
+        .clusteringColumns("tenant")
+        .build();
+  }
+
+  /**
+   * Builds the aggregate projection named {@code proj} that the tests persist alongside the clustered base table.
+   * It groups by {@code tenant} and carries two aggregators: {@code cnt} (a count) and {@code sum_x} (a long sum
+   * over {@code x}).
+   */
+  private static AggregateProjectionSpec tenantProjectionSpec()
+  {
+    return AggregateProjectionSpec.builder("proj")
+                                  .groupingColumns(new 
StringDimensionSchema("tenant"))
+                                  .aggregators(
+                                      new CountAggregatorFactory("cnt"),
+                                      new LongSumAggregatorFactory("sum_x", 
"x")
+                                  )
+                                  .build();
+  }
+
+  /**
+   * Builds a non-rollup {@link IncrementalIndexSchema} that combines a clustered base table with one aggregate
+   * projection. The dimensions come from {@code spec}, which is also installed as the cluster spec; the schema uses
+   * {@code T0} as minimum timestamp, {@code TIMESTAMP_SPEC}, and {@link Granularities#NONE} query granularity.
+   *
+   * @param spec           clustered base-table spec supplying the dimensions and the clustering columns
+   * @param projectionSpec the single aggregate projection to register on the schema
+   */
+  private static IncrementalIndexSchema clusteredSchemaWithProjection(
+      ClusteredValueGroupsBaseTableProjectionSpec spec,
+      AggregateProjectionSpec projectionSpec
+  )
+  {
+    return IncrementalIndexSchema.builder()
+                                 .withMinTimestamp(T0)
+                                 .withTimestampSpec(TIMESTAMP_SPEC)
+                                 .withQueryGranularity(Granularities.NONE)
+                                 .withDimensionsSpec(spec.getDimensionsSpec())
+                                 .withRollup(false)
+                                 .withClusterSpec(spec)
+                                 .withProjections(List.of(projectionSpec))
+                                 .build();
+  }
+
+  /**
+   * Creates an input row stamped with {@code ts}. The event map carries {@code ts}, {@code tenant}, {@code region}
+   * and {@code x}; the declared dimensions are {@code tenant}, {@code region} and {@code x}.
+   */
+  private static InputRow row(long ts, String tenant, String region, long x)
+  {
+    final List<String> dimensions = List.of("tenant", "region", "x");
+    final Map<String, Object> values = new HashMap<>();
+    values.put("ts", ts);
+    values.put("tenant", tenant);
+    values.put("region", region);
+    values.put("x", x);
+    return new MapBasedInputRow(ts, dimensions, values);
+  }
+
+  /**
+   * Builds a V10 memory-mapped segment under {@code tempDir/dirName} from {@code rows}, using the clustered
+   * {@code tenant} base table together with the {@code proj} aggregate projection.
+   */
+  private QueryableIndex buildSegmentWithProjection(String dirName, List<InputRow> rows)
+  {
+    final File segmentDir = new File(tempDir, dirName);
+    final IncrementalIndexSchema schema =
+        clusteredSchemaWithProjection(tenantClusterSpecWithX(), tenantProjectionSpec());
+    return IndexBuilder.create()
+                       .useV10()
+                       .tmpDir(segmentDir)
+                       .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
+                       .schema(schema)
+                       .rows(rows)
+                       .buildMMappedIndex();
+  }
+
+  /**
+   * Builds the {@link CursorBuildSpec} for a group-by that lines up with the projection: it groups on {@code tenant},
+   * aggregates {@code cnt} (count) and {@code sum_x} (long sum of {@code x}), uses granularity ALL over ETERNITY,
+   * and orders by {@code tenant} ascending. The spec is derived from a real {@link GroupByQuery} through
+   * {@link GroupingEngine#makeCursorBuildSpec} rather than being assembled by hand.
+   */
+  private static CursorBuildSpec projectionMatchingBuildSpec()
+  {
+    final GroupByQuery query =
+        GroupByQuery.builder()
+                    .setDataSource("test")
+                    .setGranularity(Granularities.ALL)
+                    .setInterval(Intervals.ETERNITY)
+                    .addDimension("tenant")
+                    .addAggregator(new CountAggregatorFactory("cnt"))
+                    .addAggregator(new LongSumAggregatorFactory("sum_x", "x"))
+                    .addOrderByColumn("tenant", Direction.ASCENDING)
+                    .build();
+    return GroupingEngine.makeCursorBuildSpec(query, null);
+  }
+
+  /**
+   * Runs the projection-matching group-by against a loaded or merged clustered {@link QueryableIndex} through the
+   * real {@link QueryableIndexCursorFactory} and collects the pre-aggregated rows.
+   *
+   * <p>Fails the calling test if no projection is selected for the build spec, if the cursor holder is not
+   * pre-aggregated, or if the cursor yields more than one row for the same tenant. The last check matters because
+   * the result is a map: without it a repeated tenant row would silently overwrite the earlier one and a size
+   * assertion on the returned map could not notice.
+   *
+   * @param index clustered segment expected to expose the persisted aggregate projection
+   * @return tenant -> {cnt, sum_x}, in the order the cursor produced the rows
+   */
+  private static Map<String, long[]> queryProjection(QueryableIndex index)
+  {
+    final CursorBuildSpec buildSpec = projectionMatchingBuildSpec();
+
+    // The loader must surface the persisted aggregate projection on the clustered segment, and it must be selected
+    // for this spec. If this is null the IndexIO loader isn't exposing the projection for clustered segments.
+    Assertions.assertNotNull(
+        index.getProjection(buildSpec),
+        "aggregate projection 'proj' was not selected for the group-by build spec on the loaded clustered segment"
+    );
+
+    final QueryableIndexCursorFactory factory = new QueryableIndexCursorFactory(
+        index,
+        QueryableIndexTimeBoundaryInspector.create(index)
+    );
+    final Map<String, long[]> byTenant = new LinkedHashMap<>();
+    try (CursorHolder holder = factory.makeCursorHolder(buildSpec)) {
+      // The chosen holder must be the aggregate-projection cursor serving pre-aggregated rows.
+      Assertions.assertTrue(
+          holder.isPreAggregated(),
+          "expected the projection (pre-aggregated) cursor holder on the loaded clustered segment"
+      );
+      final Cursor cursor = holder.asCursor();
+      final DimensionSelector tenantSel =
+          cursor.getColumnSelectorFactory().makeDimensionSelector(DefaultDimensionSpec.of("tenant"));
+      final ColumnValueSelector<?> cntSel = cursor.getColumnSelectorFactory().makeColumnValueSelector("cnt");
+      final ColumnValueSelector<?> sumSel = cursor.getColumnSelectorFactory().makeColumnValueSelector("sum_x");
+      while (!cursor.isDone()) {
+        final String tenant = tenantSel.lookupName(tenantSel.getRow().get(0));
+        // Map.put returns the previous mapping; a non-null value means the projection emitted this tenant twice.
+        final long[] previous = byTenant.put(tenant, new long[]{cntSel.getLong(), sumSel.getLong()});
+        Assertions.assertNull(
+            previous,
+            "projection cursor produced more than one row for tenant [" + tenant + "]"
+        );
+        cursor.advance();
+      }
+    }
+    return byTenant;
+  }
+
+  @Test
+  void testPersistAndLoadClusteredSegmentWithProjection()
+  {
+    // Rows arrive out of clustering order: the globex row (x=7) first, then the two acme rows (x=10, x=5).
+    final List<InputRow> rows = List.of(
+        row(T0 + 2, "globex", "eu-west-1", 7),
+        row(T0, "acme", "us-east-1", 10),
+        row(T0 + 1, "acme", "us-west-2", 5)
+    );
+    final QueryableIndex index = buildSegmentWithProjection("persist-projection", rows);
+
+    // The clustered base table must still be written, with its cluster groups in clustering order.
+    final ClusteredValueGroupsBaseTableSchema summary = index.getClusteredBaseSummary();
+    Assertions.assertNotNull(summary, "clustered base summary must still be present alongside the projection");
+    Assertions.assertEquals(List.of("acme", "globex"), summary.getClusteringDictionaries().getStringDictionary());
+    final List<TableClusterGroupSpec> groups = summary.getClusterGroups();
+    Assertions.assertEquals(2, groups.size());
+    final TableClusterGroupSpec acmeGroup = groups.get(0);
+    Assertions.assertEquals(List.of(0), acmeGroup.getClusteringValueIds());
+    Assertions.assertEquals(2, acmeGroup.getNumRows());
+    final TableClusterGroupSpec globexGroup = groups.get(1);
+    Assertions.assertEquals(List.of(1), globexGroup.getClusteringValueIds());
+    Assertions.assertEquals(1, globexGroup.getNumRows());
+
+    // The loader must surface the persisted projection, it must be picked for the projection query, and it must
+    // aggregate correctly: acme -> cnt=2,sum_x=15 ; globex -> cnt=1,sum_x=7.
+    final Map<String, long[]> projected = queryProjection(index);
+    Assertions.assertEquals(2, projected.size(), "expected one pre-aggregated row per tenant");
+    Assertions.assertArrayEquals(new long[]{2L, 15L}, projected.get("acme"));
+    Assertions.assertArrayEquals(new long[]{1L, 7L}, projected.get("globex"));
+
+    // Querying the base cluster groups stays correct next to the projection: FULL_SCAN walks the groups in
+    // clustering order with the clustering constants injected.
+    final List<List<String>> expectedScan = List.of(
+        List.of("acme", "us-east-1"),
+        List.of("acme", "us-west-2"),
+        List.of("globex", "eu-west-1")
+    );
+    Assertions.assertEquals(expectedScan, scanTenantRegion(index));
+  }
+
+  @Test
+  void testMergeClusteredSegmentsWithProjectionCombinesAggregates() throws Exception
+  {
+    // Segment 1 holds acme (x=10, x=5) and globex (x=7); segment 2 holds globex (x=3) and initech (x=4). After the
+    // merge the projection has to combine the globex aggregates coming from both inputs.
+    final List<InputRow> firstRows = List.of(
+        row(T0, "acme", "us-east-1", 10),
+        row(T0 + 1, "acme", "us-west-2", 5),
+        row(T0 + 2, "globex", "eu-west-1", 7)
+    );
+    final QueryableIndex segment1 = buildSegmentWithProjection("merge-projection-input-1", firstRows);
+    final List<InputRow> secondRows = List.of(
+        row(T0 + 3, "globex", "eu-central-1", 3),
+        row(T0 + 4, "initech", "us-east-1", 4)
+    );
+    final QueryableIndex segment2 = buildSegmentWithProjection("merge-projection-input-2", secondRows);
+
+    final IndexIO indexIO = IndexBuilder.create().useV10().getIndexIO();
+    final IndexMergerV10 merger = new IndexMergerV10(
+        TestHelper.makeJsonMapper(),
+        indexIO,
+        OffHeapMemorySegmentWriteOutMediumFactory.instance()
+    );
+
+    final File mergedDir = new File(tempDir, "merged-projection");
+    merger.mergeQueryableIndex(
+        List.of(segment1, segment2),
+        false,
+        new AggregatorFactory[0],
+        mergedDir,
+        IndexSpec.getDefault(),
+        OffHeapMemorySegmentWriteOutMediumFactory.instance(),
+        -1
+    );
+    final QueryableIndex mergedIndex = indexIO.loadIndex(mergedDir);
+
+    // The base table still merges correctly: dictionary [acme, globex, initech] and three cluster groups.
+    final ClusteredValueGroupsBaseTableSchema summary = mergedIndex.getClusteredBaseSummary();
+    Assertions.assertNotNull(summary);
+    Assertions.assertEquals(
+        List.of("acme", "globex", "initech"),
+        summary.getClusteringDictionaries().getStringDictionary()
+    );
+    final List<TableClusterGroupSpec> groups = summary.getClusterGroups();
+    Assertions.assertEquals(3, groups.size());
+    // acme: both rows come from segment 1
+    final TableClusterGroupSpec acmeGroup = groups.get(0);
+    Assertions.assertEquals(List.of(0), acmeGroup.getClusteringValueIds());
+    Assertions.assertEquals(2, acmeGroup.getNumRows());
+    // globex: one row from each segment
+    final TableClusterGroupSpec globexGroup = groups.get(1);
+    Assertions.assertEquals(List.of(1), globexGroup.getClusteringValueIds());
+    Assertions.assertEquals(2, globexGroup.getNumRows());
+    // initech: only present in segment 2
+    final TableClusterGroupSpec initechGroup = groups.get(2);
+    Assertions.assertEquals(List.of(2), initechGroup.getClusteringValueIds());
+    Assertions.assertEquals(1, initechGroup.getNumRows());
+    Assertions.assertEquals(5, mergedIndex.getNumRows());
+
+    // The merged projection must combine aggregates across segments: acme -> cnt=2,sum_x=15 ;
+    // globex -> cnt=2,sum_x=10 (7 from segment 1 + 3 from segment 2) ; initech -> cnt=1,sum_x=4.
+    final Map<String, long[]> projected = queryProjection(mergedIndex);
+    Assertions.assertEquals(3, projected.size(), "expected one pre-aggregated row per tenant after merge");
+    Assertions.assertArrayEquals(new long[]{2L, 15L}, projected.get("acme"));
+    Assertions.assertArrayEquals(new long[]{2L, 10L}, projected.get("globex"));
+    Assertions.assertArrayEquals(new long[]{1L, 4L}, projected.get("initech"));
+
+    // Querying the base cluster groups stays correct: all five base rows in clustering order, region-sorted within
+    // each group.
+    final List<List<String>> expectedScan = List.of(
+        List.of("acme", "us-east-1"),
+        List.of("acme", "us-west-2"),
+        List.of("globex", "eu-central-1"),
+        List.of("globex", "eu-west-1"),
+        List.of("initech", "us-east-1")
+    );
+    Assertions.assertEquals(expectedScan, scanTenantRegion(mergedIndex));
+  }
+
   @Test
   void testPersistAndLoadClusteredSegment()
   {
diff --git 
a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexClusteredProjectionTest.java
 
b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexClusteredProjectionTest.java
new file mode 100644
index 00000000000..78fa0107d62
--- /dev/null
+++ 
b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexClusteredProjectionTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.incremental;
+
+import org.apache.druid.data.input.MapBasedInputRow;
+import org.apache.druid.data.input.impl.AggregateProjectionSpec;
+import 
org.apache.druid.data.input.impl.ClusteredValueGroupsBaseTableProjectionSpec;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.groupby.GroupByQuery;
+import org.apache.druid.query.groupby.GroupingEngine;
+import org.apache.druid.query.groupby.orderby.OrderByColumnSpec.Direction;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.CursorBuildSpec;
+import org.apache.druid.segment.CursorHolder;
+import org.apache.druid.segment.DimensionSelector;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Verifies that an {@link OnheapIncrementalIndex} accepts an {@link AggregateProjectionSpec} together with a
+ * clustered base-table spec: the projection is registered, is selected for a matching group-by, aggregates
+ * correctly, and leaves the clustering-ordered scan of the base table untouched.
+ */
+class IncrementalIndexClusteredProjectionTest extends InitializedNullHandlingTest
+{
+  private static final long T0 = DateTimes.of("2026-01-01T00:00:00").getMillis();
+  private static final TimestampSpec TIMESTAMP_SPEC = new TimestampSpec("ts", "millis", null);
+
+  /**
+   * Creates an input row stamped with {@code ts}. The event map carries {@code ts}, {@code tenant}, {@code region}
+   * and {@code x}; the declared dimensions are {@code tenant}, {@code region} and {@code x}.
+   */
+  private static MapBasedInputRow row(long ts, String tenant, String region, long x)
+  {
+    final Map<String, Object> event = new HashMap<>();
+    event.put("ts", ts);
+    event.put("tenant", tenant);
+    event.put("region", region);
+    event.put("x", x);
+    return new MapBasedInputRow(ts, List.of("tenant", "region", "x"), event);
+  }
+
+  /**
+   * Builds an on-heap index clustered on {@code tenant} that also carries the {@code proj} aggregate projection
+   * (group by {@code tenant}; {@code cnt} = count, {@code sum_x} = sum of {@code x}) and loads three rows into it:
+   * one globex row (x=7) followed by two acme rows (x=10, x=5). The caller owns the returned index and closes it.
+   */
+  private static OnheapIncrementalIndex clusteredWithProjection()
+  {
+    final ClusteredValueGroupsBaseTableProjectionSpec spec = ClusteredValueGroupsBaseTableProjectionSpec.builder()
+        .columns(
+            new StringDimensionSchema("tenant"),
+            new StringDimensionSchema("region"),
+            new LongDimensionSchema("x"),
+            new LongDimensionSchema("__time")
+        )
+        .clusteringColumns("tenant")
+        .build();
+    final AggregateProjectionSpec projectionSpec =
+        AggregateProjectionSpec.builder("proj")
+                               .groupingColumns(new StringDimensionSchema("tenant"))
+                               .aggregators(
+                                   new CountAggregatorFactory("cnt"),
+                                   new LongSumAggregatorFactory("sum_x", "x")
+                               )
+                               .build();
+    final IncrementalIndexSchema schema = IncrementalIndexSchema.builder()
+        .withMinTimestamp(T0)
+        .withTimestampSpec(TIMESTAMP_SPEC)
+        .withQueryGranularity(Granularities.NONE)
+        .withDimensionsSpec(spec.getDimensionsSpec())
+        .withRollup(false)
+        .withClusterSpec(spec)
+        .withProjections(List.of(projectionSpec))
+        .build();
+    final OnheapIncrementalIndex index = (OnheapIncrementalIndex) new OnheapIncrementalIndex.Builder()
+        .setIndexSchema(schema)
+        .setMaxRowCount(10_000)
+        .build();
+    // Add tenants out of clustering order to prove the plain scan still walks groups in clustering-sorted order.
+    index.add(row(T0 + 2, "globex", "eu-west-1", 7));
+    index.add(row(T0, "acme", "us-east-1", 10));
+    index.add(row(T0 + 1, "acme", "us-west-2", 5));
+    return index;
+  }
+
+  /**
+   * Builds the {@link CursorBuildSpec} for a group-by that lines up with the projection: it groups on {@code tenant},
+   * aggregates {@code cnt} and {@code sum_x}, uses granularity ALL over ETERNITY, and orders by {@code tenant}
+   * ascending. The spec is derived from a real {@link GroupByQuery} through
+   * {@link GroupingEngine#makeCursorBuildSpec} rather than being assembled by hand.
+   */
+  private static CursorBuildSpec projectionMatchingBuildSpec()
+  {
+    final GroupByQuery query =
+        GroupByQuery.builder()
+                    .setDataSource("test")
+                    .setGranularity(Granularities.ALL)
+                    .setInterval(Intervals.ETERNITY)
+                    .addDimension("tenant")
+                    .addAggregator(new CountAggregatorFactory("cnt"))
+                    .addAggregator(new LongSumAggregatorFactory("sum_x", "x"))
+                    .addOrderByColumn("tenant", Direction.ASCENDING)
+                    .build();
+    return GroupingEngine.makeCursorBuildSpec(query, null);
+  }
+
+  /**
+   * Returns the first value of the selector's current row, or {@code null} when the current row is empty.
+   */
+  private static String firstValueOrNull(DimensionSelector selector)
+  {
+    return selector.getRow().size() == 0 ? null : selector.lookupName(selector.getRow().get(0));
+  }
+
+  @Test
+  void testProjectionMaterializedOnClusteredIndex()
+  {
+    try (OnheapIncrementalIndex index = clusteredWithProjection()) {
+      // Construction and ingest did not throw (the OnheapIncrementalIndex guard against clusterSpec + projections
+      // was removed) and the projection can be looked up by name. Row contents are verified by the next test.
+      Assertions.assertNotNull(index.getProjection("proj"));
+    }
+  }
+
+  @Test
+  void testProjectionSelectedAndAggregatesCorrectlyOnClusteredIndex()
+  {
+    try (OnheapIncrementalIndex index = clusteredWithProjection()) {
+      final CursorBuildSpec buildSpec = projectionMatchingBuildSpec();
+
+      // The projection must actually be selected for this spec on a clustered index -- not merely present. If this
+      // is null, guard removal alone is insufficient and the query path needs more work.
+      Assertions.assertNotNull(
+          index.getProjection(buildSpec),
+          "projection 'proj' was not selected for the group-by build spec on the clustered index"
+      );
+
+      final IncrementalIndexCursorFactory factory = new IncrementalIndexCursorFactory(index);
+      try (CursorHolder holder = factory.makeCursorHolder(buildSpec)) {
+        // The chosen holder is the aggregate-projection cursor: it serves pre-aggregated rows.
+        Assertions.assertTrue(holder.isPreAggregated(), "expected the projection (pre-aggregated) cursor holder");
+
+        final Cursor cursor = holder.asCursor();
+        final DimensionSelector tenantSel =
+            cursor.getColumnSelectorFactory().makeDimensionSelector(DefaultDimensionSpec.of("tenant"));
+        final ColumnValueSelector<?> cntSel = cursor.getColumnSelectorFactory().makeColumnValueSelector("cnt");
+        final ColumnValueSelector<?> sumSel = cursor.getColumnSelectorFactory().makeColumnValueSelector("sum_x");
+
+        final Map<String, long[]> byTenant = new LinkedHashMap<>();
+        while (!cursor.isDone()) {
+          final String tenant = tenantSel.lookupName(tenantSel.getRow().get(0));
+          // Map.put returns the previous mapping; a non-null value means the projection emitted this tenant twice,
+          // which the size check below could not detect on its own.
+          final long[] previous = byTenant.put(tenant, new long[]{cntSel.getLong(), sumSel.getLong()});
+          Assertions.assertNull(
+              previous,
+              "projection cursor produced more than one row for tenant [" + tenant + "]"
+          );
+          cursor.advance();
+        }
+
+        // acme: 2 rows (x=10, x=5) -> cnt=2, sum_x=15 ; globex: 1 row (x=7) -> cnt=1, sum_x=7
+        Assertions.assertEquals(2, byTenant.size(), "expected one pre-aggregated row per tenant");
+        Assertions.assertArrayEquals(new long[]{2L, 15L}, byTenant.get("acme"));
+        Assertions.assertArrayEquals(new long[]{1L, 7L}, byTenant.get("globex"));
+      }
+    }
+  }
+
+  @Test
+  void testBaseClusteredScanStillWalksGroupsInClusteringOrder()
+  {
+    try (OnheapIncrementalIndex index = clusteredWithProjection()) {
+      final IncrementalIndexCursorFactory factory = new IncrementalIndexCursorFactory(index);
+      // FULL_SCAN does not match the projection, so it falls through to the clustered base-table path and walks the
+      // cluster groups in clustering-ascending order (acme rows before globex), exactly as the non-projection
+      // clustered index does.
+      try (CursorHolder holder = factory.makeCursorHolder(CursorBuildSpec.FULL_SCAN)) {
+        Assertions.assertFalse(
+            holder.isPreAggregated(),
+            "FULL_SCAN must not pick the projection; it should use the clustered base-table cursor"
+        );
+        final Cursor cursor = holder.asCursor();
+        final DimensionSelector tenantSel =
+            cursor.getColumnSelectorFactory().makeDimensionSelector(DefaultDimensionSpec.of("tenant"));
+        final DimensionSelector regionSel =
+            cursor.getColumnSelectorFactory().makeDimensionSelector(DefaultDimensionSpec.of("region"));
+        final List<List<String>> out = new ArrayList<>();
+        while (!cursor.isDone()) {
+          final String tenant = firstValueOrNull(tenantSel);
+          final String region = firstValueOrNull(regionSel);
+          out.add(Arrays.asList(tenant, region));
+          cursor.advance();
+        }
+        Assertions.assertEquals(
+            List.of(
+                List.of("acme", "us-east-1"),
+                List.of("acme", "us-west-2"),
+                List.of("globex", "eu-west-1")
+            ),
+            out
+        );
+      }
+    }
+  }
+}
diff --git 
a/processing/src/test/java/org/apache/druid/segment/projections/ClusteringVectorColumnSelectorFactoryTest.java
 
b/processing/src/test/java/org/apache/druid/segment/projections/ClusteringVectorColumnSelectorFactoryTest.java
index 9a76566b3e1..6e1dfbf8bcb 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/projections/ClusteringVectorColumnSelectorFactoryTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/projections/ClusteringVectorColumnSelectorFactoryTest.java
@@ -42,7 +42,7 @@ class ClusteringVectorColumnSelectorFactoryTest
   private static final RowSignature CLUSTER_SIGNATURE = 
RowSignature.builder().add("tenant", ColumnType.STRING).build();
 
   @Test
-  void testStringClusteringSingleValueDimensionSelector()
+  void testSingleValueDimensionSelectorRejected()
   {
     StubDelegate delegate = new StubDelegate(inspectorFor(8));
     ClusteringVectorColumnSelectorFactory f = new 
ClusteringVectorColumnSelectorFactory(
@@ -51,10 +51,54 @@ class ClusteringVectorColumnSelectorFactoryTest
         new Object[]{"acme"}
     );
 
-    SingleValueDimensionVectorSelector sel = 
f.makeSingleValueDimensionSelector(DefaultDimensionSpec.of("tenant"));
-    Assertions.assertNull(delegate.lastSingleValDimRequest, "delegate must not 
be hit for clustering column");
-    Assertions.assertEquals("acme", sel.lookupName(0));
-    Assertions.assertEquals(1, sel.getValueCardinality());
+    // clusteredValueGroups columns are never dictionary-encoded, so a 
single-value dimension vector selector is never
+    // requested. Both a clustering column and a non-clustering column must 
throw.
+    Assertions.assertThrows(
+        DruidException.class,
+        () -> 
f.makeSingleValueDimensionSelector(DefaultDimensionSpec.of("tenant"))
+    );
+    Assertions.assertThrows(
+        DruidException.class,
+        () -> 
f.makeSingleValueDimensionSelector(DefaultDimensionSpec.of("region"))
+    );
+    Assertions.assertNull(delegate.lastSingleValDimRequest, "delegate must not 
be hit");
+  }
+
+  @Test
+  void testStringClusteringObjectSelector()
+  {
+    // A STRING clustering column reports itself as non-dictionary-encoded, so it is read through the object
+    // selector. Every slot of the object vector must hold the per-group clustering value, and producing it must
+    // not touch the delegate.
+    final StubDelegate delegate = new StubDelegate(inspectorFor(8));
+    final ClusteringVectorColumnSelectorFactory factory = new ClusteringVectorColumnSelectorFactory(
+        delegate,
+        CLUSTER_SIGNATURE,
+        new Object[]{"acme"}
+    );
+
+    final VectorObjectSelector selector = factory.makeObjectSelector("tenant");
+    Assertions.assertNull(delegate.lastObjectRequest, "delegate must not be hit for clustering column");
+    final Object[] values = selector.getObjectVector();
+    for (int i = 0; i < values.length; i++) {
+      Assertions.assertEquals("acme", values[i]);
+    }
+  }
+
+  @Test
+  void testNullStringClusteringObjectSelectorIsNull()
+  {
+    // A null clustering value must surface as null in every slot of the object vector.
+    final StubDelegate delegate = new StubDelegate(inspectorFor(4));
+    final ClusteringVectorColumnSelectorFactory factory = new ClusteringVectorColumnSelectorFactory(
+        delegate,
+        CLUSTER_SIGNATURE,
+        new Object[]{null}
+    );
+
+    final VectorObjectSelector selector = factory.makeObjectSelector("tenant");
+    final Object[] values = selector.getObjectVector();
+    for (int i = 0; i < values.length; i++) {
+      Assertions.assertNull(values[i]);
+    }
   }
 
   @Test
@@ -109,21 +153,6 @@ class ClusteringVectorColumnSelectorFactoryTest
     }
   }
 
-  @Test
-  void testNullStringClusteringValueDimensionSelectorIsNil()
-  {
-    StubDelegate delegate = new StubDelegate(inspectorFor(4));
-    ClusteringVectorColumnSelectorFactory f = new ClusteringVectorColumnSelectorFactory(
-        delegate,
-        CLUSTER_SIGNATURE,
-        new Object[]{null}
-    );
-
-    SingleValueDimensionVectorSelector sel = f.makeSingleValueDimensionSelector(DefaultDimensionSpec.of("tenant"));
-    // Nil vector selector returns null on lookupName(0) regardless of id.
-    Assertions.assertNull(sel.lookupName(0));
-  }
-
   @Test
   void testNonClusteringColumnDelegated()
   {
@@ -137,17 +166,16 @@ class ClusteringVectorColumnSelectorFactoryTest
     // Non-clustering selectors are wrapped in lazy delegating wrappers; the delegate is only consulted on first
     // use, so a multi-group ConcatenatingVectorCursor can swap delegates between groups without recreating the
     // selector instance.
-    SingleValueDimensionVectorSelector svdSel =
-        f.makeSingleValueDimensionSelector(DefaultDimensionSpec.of("region"));
-    Assertions.assertNull(delegate.lastSingleValDimRequest, "delegate must not be hit until selector is used");
+    VectorObjectSelector regionObjSel = f.makeObjectSelector("region");
+    Assertions.assertNull(delegate.lastObjectRequest, "delegate must not be hit until selector is used");
     try {
-      svdSel.getRowVector();
+      regionObjSel.getObjectVector();
     }
     catch (NullPointerException expected) {
       // StubDelegate returns null for the inner selector; the wrapper forwards to it. We just want to confirm
-      // the delegate's makeSingleValueDimensionSelector was invoked.
+      // the delegate's makeObjectSelector was invoked.
     }
-    Assertions.assertEquals("region", delegate.lastSingleValDimRequest);
+    Assertions.assertEquals("region", delegate.lastObjectRequest);
 
     VectorValueSelector vvSel = f.makeValueSelector("metric");
     Assertions.assertNull(delegate.lastValueRequest);
@@ -159,15 +187,6 @@ class ClusteringVectorColumnSelectorFactoryTest
     }
     Assertions.assertEquals("metric", delegate.lastValueRequest);
 
-    VectorObjectSelector voSel = f.makeObjectSelector("region");
-    try {
-      voSel.getObjectVector();
-    }
-    catch (NullPointerException expected) {
-      // same
-    }
-    Assertions.assertEquals("region", delegate.lastObjectRequest);
-
     // getColumnCapabilities is NOT lazy; it returns the result directly.
     f.getColumnCapabilities("metric");
     Assertions.assertEquals("metric", delegate.lastCapsRequest);
@@ -209,26 +228,25 @@ class ClusteringVectorColumnSelectorFactoryTest
         new Object[]{"acme"}
     );
 
-    SingleValueDimensionVectorSelector sel =
-        f.makeSingleValueDimensionSelector(DefaultDimensionSpec.of("region"));
+    VectorObjectSelector sel = f.makeObjectSelector("region");
     try {
-      sel.getRowVector();   // warms the cache against the first delegate
+      sel.getObjectVector();   // warms the cache against the first delegate
     }
     catch (NullPointerException expected) {
       // expected; just confirming the route
     }
-    Assertions.assertEquals("region", first.lastSingleValDimRequest);
+    Assertions.assertEquals("region", first.lastObjectRequest);
 
     StubDelegate second = new StubDelegate(inspectorFor(4));
     f.setDelegate(second, new Object[]{"globex"});
 
     try {
-      sel.getRowVector();   // generation bumped → re-fetches against second delegate
+      sel.getObjectVector();   // generation bumped → re-fetches against second delegate
     }
     catch (NullPointerException expected) {
       // expected
     }
-    Assertions.assertEquals("region", second.lastSingleValDimRequest);
+    Assertions.assertEquals("region", second.lastObjectRequest);
   }
 
   @Test
@@ -254,17 +272,23 @@ class ClusteringVectorColumnSelectorFactoryTest
   }
 
   @Test
-  void testMultiValueDimensionSelectorOnClusteringRejected()
+  void testMultiValueDimensionSelectorRejected()
   {
     ClusteringVectorColumnSelectorFactory f = new ClusteringVectorColumnSelectorFactory(
         new StubDelegate(inspectorFor(4)),
         CLUSTER_SIGNATURE,
         new Object[]{"acme"}
     );
+    // clusteredValueGroups columns are never dictionary-encoded, so a multi-value dimension vector selector is never
+    // requested. Both a clustering column and a non-clustering column must throw.
     Assertions.assertThrows(
         DruidException.class,
         () -> f.makeMultiValueDimensionSelector(DefaultDimensionSpec.of("tenant"))
     );
+    Assertions.assertThrows(
+        DruidException.class,
+        () -> f.makeMultiValueDimensionSelector(DefaultDimensionSpec.of("region"))
+    );
   }
 
   private static ReadableVectorInspector inspectorFor(int size)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to