This is an automated email from the ASF dual-hosted git repository.

clintropolis pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 9cdedcddc52 feat: partial load for clustered segments (#19620)
9cdedcddc52 is described below

commit 9cdedcddc52f6de1c70a8c52568b4f051743d557
Author: Clint Wylie <[email protected]>
AuthorDate: Tue Jun 23 17:05:12 2026 -0700

    feat: partial load for clustered segments (#19620)
---
 .../org/apache/druid/segment/IndexMergerV10.java   |   5 +-
 .../druid/segment/PartialQueryableIndex.java       | 202 +++++++--
 .../PartialQueryableIndexCursorFactory.java        | 115 ++++-
 .../druid/segment/QueryableIndexCursorFactory.java |  14 +-
 .../druid/segment/projections/Projections.java     |  11 +-
 ...eIndexCursorFactoryClusteredProjectionTest.java | 227 ++++++++++
 ...alQueryableIndexCursorFactoryClusteredTest.java | 466 +++++++++++++++++++++
 .../PartialQueryableIndexCursorFactoryTest.java    |  58 +--
 ...PartialQueryableIndexCursorFactoryTestBase.java | 109 +++++
 .../loading/PartialSegmentMetadataCacheEntry.java  |  31 +-
 .../PartialSegmentMetadataCacheEntryTest.java      |  94 ++++-
 ...SegmentLocalCacheManagerPartialAcquireTest.java | 131 +++++-
 12 files changed, 1331 insertions(+), 132 deletions(-)

diff --git 
a/processing/src/main/java/org/apache/druid/segment/IndexMergerV10.java 
b/processing/src/main/java/org/apache/druid/segment/IndexMergerV10.java
index 68eb4acdc5f..d20f037496e 100644
--- a/processing/src/main/java/org/apache/druid/segment/IndexMergerV10.java
+++ b/processing/src/main/java/org/apache/druid/segment/IndexMergerV10.java
@@ -418,9 +418,8 @@ public class IndexMergerV10 extends IndexMergerBase
 
       for (Map.Entry<List<Integer>, List<AdapterAndGroup>> groupEntry : 
merged.groupSources.entrySet()) {
         final List<Integer> mergedTuple = groupEntry.getKey();
-        final String groupPrefix = 
Projections.getClusterGroupSegmentInternalFilePrefix(mergedTuple);
-        // file-bundle name is the prefix without the trailing slash
-        final String groupName = groupPrefix.substring(0, groupPrefix.length() 
- 1);
+        final String groupName = 
Projections.getClusterGroupBundleName(mergedTuple);
+        final String groupPrefix = groupName + "/";
 
         final List<IndexableAdapter> groupAdapters = 
Lists.newArrayListWithCapacity(groupEntry.getValue().size());
         for (AdapterAndGroup source : groupEntry.getValue()) {
diff --git 
a/processing/src/main/java/org/apache/druid/segment/PartialQueryableIndex.java 
b/processing/src/main/java/org/apache/druid/segment/PartialQueryableIndex.java
index cfa7735300d..cfe5f6f4d68 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/PartialQueryableIndex.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/PartialQueryableIndex.java
@@ -34,6 +34,8 @@ import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
 import org.apache.druid.segment.column.ColumnConfig;
 import org.apache.druid.segment.column.ColumnDescriptor;
 import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.segment.data.Indexed;
 import org.apache.druid.segment.data.ListIndexed;
 import org.apache.druid.segment.file.PartialSegmentFileMapperV10;
@@ -46,6 +48,7 @@ import 
org.apache.druid.segment.projections.ConstantTimeColumn;
 import org.apache.druid.segment.projections.ProjectionMetadata;
 import org.apache.druid.segment.projections.Projections;
 import org.apache.druid.segment.projections.QueryableProjection;
+import org.apache.druid.segment.projections.TableClusterGroupSpec;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
@@ -58,6 +61,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.SortedSet;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 /**
@@ -103,6 +107,16 @@ public class PartialQueryableIndex implements 
QueryableIndex
   private final ConcurrentHashMap<String, Map<String, 
Supplier<BaseColumnHolder>>> projectionColumnsByName =
       new ConcurrentHashMap<>();
 
+  // clustered base summary when this segment is a clustered base table, else 
null. when non-null, the base table has
+  // no top-level columns
+  @Nullable
+  private final ClusteredValueGroupsBaseTableSchema clusteredBaseSummary;
+
+  // per-cluster-group column suppliers, keyed by group index (into the 
summary's group list). built on demand like
+  // projectionColumnsByName; each supplier defers both mapFile() and 
deserialization until the column is read.
+  private final ConcurrentHashMap<Integer, Map<String, 
Supplier<BaseColumnHolder>>> clusterGroupColumnsByIndex =
+      new ConcurrentHashMap<>();
+
   // lazy dimension handlers
   private final Supplier<Map<String, DimensionHandler>> dimensionHandlers;
 
@@ -125,14 +139,14 @@ public class PartialQueryableIndex implements 
QueryableIndex
         baseProjection.getSchema().getName()
     );
     final BaseTableProjectionSchema baseSchema = (BaseTableProjectionSchema) 
baseProjection.getSchema();
-    // Clustered V10 segments keep their data in per-cluster-group bundles, 
matched from metadata much like
-    // projections, but partial loading does not wire that up yet (and the 
write side isn't available to build one for
-    // testing). Fail loudly here rather than mis-treating the clustered base 
summary as a plain base table, whose
-    // top-level columns are empty. Both acquire modes route partial-eligible 
segments through this index, so this
-    // guards the full path too. Remove once partial loading supports 
clustered segments.
-    if (baseSchema instanceof ClusteredValueGroupsBaseTableSchema) {
+    this.clusteredBaseSummary = baseSchema instanceof 
ClusteredValueGroupsBaseTableSchema
+                                ? (ClusteredValueGroupsBaseTableSchema) 
baseSchema
+                                : null;
+    if (clusteredBaseSummary != null && 
!clusteredBaseSummary.getSharedColumns().isEmpty()) {
+      // Shared base columns aren't wired into partial loading yet
       throw DruidException.defensive(
-          "Clustered V10 segments are not yet supported for partial loading 
(interval[%s])",
+          "Clustered V10 segments with shared columns%s are not yet supported 
for partial loading (interval[%s])",
+          clusteredBaseSummary.getSharedColumns(),
           metadata.getInterval()
       );
     }
@@ -141,16 +155,24 @@ public class PartialQueryableIndex implements 
QueryableIndex
     this.baseProjectionPrefix = 
Projections.getProjectionSegmentInternalFilePrefix(baseSchema);
     this.dataInterval = Intervals.of(metadata.getInterval());
     this.bitmapFactory = metadata.getBitmapEncoding().getBitmapFactory();
-    this.availableDimensions = new 
ListIndexed<>(baseSchema.getDimensionNames());
 
-    // build column names (dimensions first, then other columns, excluding 
__time)
-    final LinkedHashSet<String> dimsFirst = new 
LinkedHashSet<>(baseSchema.getDimensionNames());
-    for (String columnName : baseSchema.getColumnNames()) {
-      if (!ColumnHolder.TIME_COLUMN_NAME.equals(columnName)) {
-        dimsFirst.add(columnName);
+    // A clustered base table has no top-level columns (its data lives in 
per-cluster-group bundles), so its available
+    // dimensions / column names are empty; logical columns are resolved via 
getColumnCapabilities + cluster-group
+    // dispatch (matching the eager SimpleQueryableIndex). A regular base 
table lists dimensions first, then the
+    // remaining columns (excluding __time).
+    if (clusteredBaseSummary != null) {
+      this.availableDimensions = new ListIndexed<>(List.of());
+      this.columnNames = List.of();
+    } else {
+      this.availableDimensions = new 
ListIndexed<>(baseSchema.getDimensionNames());
+      final LinkedHashSet<String> dimsFirst = new 
LinkedHashSet<>(baseSchema.getDimensionNames());
+      for (String columnName : baseSchema.getColumnNames()) {
+        if (!ColumnHolder.TIME_COLUMN_NAME.equals(columnName)) {
+          dimsFirst.add(columnName);
+        }
       }
+      this.columnNames = List.copyOf(dimsFirst);
     }
-    this.columnNames = List.copyOf(dimsFirst);
 
     // build aggregate projection metadata for matching
     final List<AggregateProjectionMetadata> aggProjections = new ArrayList<>();
@@ -193,8 +215,11 @@ public class PartialQueryableIndex implements 
QueryableIndex
     }
 
     // build per-column suppliers for the base table. each supplier is 
memoized and defers both mapFile() and
-    // deserialization until the column is accessed.
-    this.baseColumns = buildProjectionColumnSuppliers(baseProjection, 
Map.of());
+    // deserialization until the column is accessed. A clustered base has no 
top-level columns (its data lives in
+    // per-cluster-group bundles), so its base column map is empty.
+    this.baseColumns = clusteredBaseSummary != null
+                       ? Map.of()
+                       : buildProjectionColumnSuppliers(baseProjection, 
Map.of());
 
     this.dimensionHandlers = Suppliers.memoize(this::initDimensionHandlers);
   }
@@ -281,9 +306,42 @@ public class PartialQueryableIndex implements 
QueryableIndex
   @Override
   public ColumnCapabilities getColumnCapabilities(String column)
   {
+    if (clusteredBaseSummary != null) {
+      return getClusteredColumnCapabilities(column);
+    }
     // look up the column in the base table projection's namespace
     final String smooshName = baseProjectionPrefix + column;
-    final ColumnDescriptor descriptor = 
metadata.getColumnDescriptors().get(smooshName);
+    return 
capabilitiesFromDescriptor(metadata.getColumnDescriptors().get(smooshName));
+  }
+
+  /**
+   * Column capabilities for a clustered base table, answered from metadata 
only (no downloads): clustering columns
+   * resolve from the summary's typed clustering signature; data columns (and 
{@code __time}) resolve from the first
+   * cluster group's {@link ColumnDescriptor} (all groups share the same 
per-group shape). Mirrors the eager
+   * {@link SimpleQueryableIndex} clustered branch, but reads the descriptor 
directly rather than routing through a
+   * group sub-index's {@code getColumnHolder} (which would trigger a 
download).
+   */
+  @Nullable
+  private ColumnCapabilities getClusteredColumnCapabilities(String column)
+  {
+    final ColumnType clusteringType = 
clusteredBaseSummary.getClusteringColumns().getColumnType(column).orElse(null);
+    if (clusteringType != null) {
+      return clusteringType.is(ValueType.STRING)
+             ? 
ColumnCapabilitiesImpl.createSimpleSingleValueStringColumnCapabilities()
+             : 
ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(clusteringType);
+    }
+    final List<TableClusterGroupSpec> groups = 
clusteredBaseSummary.getClusterGroups();
+    if (groups.isEmpty()) {
+      return null;
+    }
+    final String smooshName =
+        
Projections.getClusterGroupSegmentInternalFileName(groups.getFirst().getClusteringValueIds(),
 column);
+    return 
capabilitiesFromDescriptor(metadata.getColumnDescriptors().get(smooshName));
+  }
+
+  @Nullable
+  private static ColumnCapabilities capabilitiesFromDescriptor(@Nullable 
ColumnDescriptor descriptor)
+  {
     if (descriptor == null) {
       return null;
     }
@@ -378,6 +436,74 @@ public class PartialQueryableIndex implements 
QueryableIndex
     };
   }
 
+  @Nullable
+  @Override
+  public ClusteredValueGroupsBaseTableSchema getClusteredBaseSummary()
+  {
+    return clusteredBaseSummary;
+  }
+
+  @Override
+  public List<TableClusterGroupSpec> getClusterGroupSchemas()
+  {
+    return clusteredBaseSummary == null ? List.of() : 
clusteredBaseSummary.getClusterGroups();
+  }
+
+  /**
+   * Returns a {@link QueryableIndex} sub-view scoped to a single cluster 
group's column data, The per-group column
+   * suppliers are memoized by group index so a pre-fetch (async path) and the 
later cursor build share the same
+   * already-downloaded columns. Clustering columns are NOT present in the 
returned index; they are injected at the
+   * cursor-factory level via {@code ClusteringColumnSelectorFactory}.
+   */
+  @Override
+  public QueryableIndex getClusterGroupQueryableIndex(TableClusterGroupSpec 
groupSpec)
+  {
+    if (clusteredBaseSummary == null) {
+      throw DruidException.defensive("getClusterGroupQueryableIndex called on 
a non-clustered segment");
+    }
+    final List<TableClusterGroupSpec> groups = 
clusteredBaseSummary.getClusterGroups();
+    final int groupIndex = groups.indexOf(groupSpec);
+    if (groupIndex < 0) {
+      throw DruidException.defensive("Cluster group spec is not part of this 
segment");
+    }
+    final Map<String, Supplier<BaseColumnHolder>> groupColumns = 
clusterGroupColumnsByIndex.computeIfAbsent(
+        groupIndex,
+        i -> buildColumnSuppliers(
+            clusteredBaseSummary.getTimeColumnName(),
+            groupSpec.getNumRows(),
+            clusteredBaseSummary.getGroupColumnNames(),
+            column -> 
Projections.getClusterGroupSegmentInternalFileName(groupSpec.getClusteringValueIds(),
 column),
+            Map.of()
+        )
+    );
+    final Metadata groupMetadata = new Metadata(
+        null,
+        null,
+        null,
+        clusteredBaseSummary.getEffectiveGranularity(),
+        false,
+        clusteredBaseSummary.getGroupOrdering(),
+        null,
+        null
+    );
+    return new SimpleQueryableIndex(
+        dataInterval,
+        new ListIndexed<>(clusteredBaseSummary.getGroupDimensionNames()),
+        bitmapFactory,
+        groupColumns,
+        fileMapper,
+        groupMetadata,
+        null
+    )
+    {
+      @Override
+      public Metadata getMetadata()
+      {
+        return groupMetadata;
+      }
+    };
+  }
+
   @Override
   public void close()
   {
@@ -406,12 +532,34 @@ public class PartialQueryableIndex implements 
QueryableIndex
       Map<String, Supplier<BaseColumnHolder>> parentColumns
   )
   {
-    final String timeColumnName = 
projectionSpec.getSchema().getTimeColumnName();
+    return buildColumnSuppliers(
+        projectionSpec.getSchema().getTimeColumnName(),
+        projectionSpec.getNumRows(),
+        projectionSpec.getSchema().getColumnNames(),
+        column -> 
Projections.getProjectionSegmentInternalFileName(projectionSpec.getSchema(), 
column),
+        parentColumns
+    );
+  }
+
+  /**
+   * Shared builder for lazy per-column suppliers. Each supplier is memoized 
and defers both
+   * {@link SegmentFileMapper#mapFile} and {@link ColumnDescriptor#read} until 
the column is actually accessed, so
+   * queries only trigger downloads for the specific columns they use. {@code 
fileNameFn} maps a logical column name to
+   * its segment-internal (smoosh) file name in the right bundle namespace.
+   */
+  private Map<String, Supplier<BaseColumnHolder>> buildColumnSuppliers(
+      @Nullable String timeColumnName,
+      int numRows,
+      List<String> columnNames,
+      Function<String, String> fileNameFn,
+      Map<String, Supplier<BaseColumnHolder>> parentColumns
+  )
+  {
     final boolean renameTime = 
!ColumnHolder.TIME_COLUMN_NAME.equals(timeColumnName);
-    final Map<String, Supplier<BaseColumnHolder>> projectionColumns = new 
LinkedHashMap<>();
+    final Map<String, Supplier<BaseColumnHolder>> columns = new 
LinkedHashMap<>();
 
-    for (String column : projectionSpec.getSchema().getColumnNames()) {
-      final String smooshName = 
Projections.getProjectionSegmentInternalFileName(projectionSpec.getSchema(), 
column);
+    for (String column : columnNames) {
+      final String smooshName = fileNameFn.apply(column);
       final ColumnDescriptor columnDescriptor = 
metadata.getColumnDescriptors().get(smooshName);
       if (columnDescriptor == null) {
         continue;
@@ -430,21 +578,21 @@ public class PartialQueryableIndex implements 
QueryableIndex
         }
       });
 
-      projectionColumns.put(internedColumnName, columnSupplier);
+      columns.put(internedColumnName, columnSupplier);
 
       if (column.equals(timeColumnName) && renameTime) {
-        projectionColumns.put(ColumnHolder.TIME_COLUMN_NAME, 
projectionColumns.get(column));
-        projectionColumns.remove(column);
+        columns.put(ColumnHolder.TIME_COLUMN_NAME, columns.get(column));
+        columns.remove(column);
       }
     }
 
     if (timeColumnName == null) {
-      projectionColumns.put(
+      columns.put(
           ColumnHolder.TIME_COLUMN_NAME,
-          
ConstantTimeColumn.makeConstantTimeSupplier(projectionSpec.getNumRows(), 
dataInterval.getStartMillis())
+          ConstantTimeColumn.makeConstantTimeSupplier(numRows, 
dataInterval.getStartMillis())
       );
     }
 
-    return projectionColumns;
+    return columns;
   }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/PartialQueryableIndexCursorFactory.java
 
b/processing/src/main/java/org/apache/druid/segment/PartialQueryableIndexCursorFactory.java
index 31de0f9a3fb..3704c055ff2 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/PartialQueryableIndexCursorFactory.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/PartialQueryableIndexCursorFactory.java
@@ -29,8 +29,10 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.segment.column.ColumnCapabilities;
 import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.projections.ClusterGroupQueryPlan;
 import org.apache.druid.segment.projections.Projections;
 import org.apache.druid.segment.projections.QueryableProjection;
+import org.apache.druid.segment.projections.TableClusterGroupSpec;
 import org.apache.druid.segment.vector.VectorCursor;
 import org.apache.druid.utils.CloseableUtils;
 
@@ -42,6 +44,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
 
 /**
  * Partial-aware {@link CursorFactory} for {@link PartialQueryableIndex}.
@@ -102,51 +105,102 @@ public class PartialQueryableIndexCursorFactory 
implements CursorFactory
   public AsyncCursorHolder makeCursorHolderAsync(CursorBuildSpec spec)
   {
     final QueryableProjection<QueryableIndex> matched = 
index.getProjection(spec);
+    if (matched == null && index.getClusteredBaseSummary() != null) {
+      return makeClusteredCursorHolderAsync(spec);
+    }
+
+    // Aggregate-projection match, or the plain base table: a single bundle 
(the projection's, or __base) with the
+    // query's required columns.
     final QueryableIndex rowSelector = matched != null ? 
matched.getRowSelector() : index;
-    final Set<String> requiredColumns = requiredColumns(rowSelector, matched, 
spec);
     final String bundleName = matched != null ? matched.getName() : 
Projections.BASE_TABLE_PROJECTION_NAME;
+    final DownloadBundle bundle = new DownloadBundle(bundleName, rowSelector, 
requiredColumns(rowSelector, matched, spec));
+    return buildAsyncCursorHolder(List.of(bundle), () -> 
delegate.makeCursorHolderForProjection(spec, matched));
+  }
 
-    // Mount the cache-layer bundle before submitting downloads. Released on 
the cancel/failure paths (requestRelease)
-    // or handed to the produced cursor holder on success (releaser), exactly 
once, and never while an in-flight column
-    // download is mid-mapFile(). See BundleHoldRelease.
-    final BundleHoldRelease holdRelease = new 
BundleHoldRelease(bundleAcquirer.acquire(bundleName));
+  /**
+   * Async cursor holder for a clustered base table. Cluster-group resolution 
is metadata-only
+   * ({@link Projections#planClusterGroupQuery}): only the groups whose 
clustering tuples survive the query's filters
+   * are downloaded, one bundle (and its required columns) per surviving 
group. Once every group is resident, the
+   * holder is built via the delegate's clustered path.
+   */
+  private AsyncCursorHolder makeClusteredCursorHolderAsync(CursorBuildSpec 
spec)
+  {
+    final ClusterGroupQueryPlan plan = Projections.planClusterGroupQuery(
+        new ArrayList<>(index.getClusterGroupSchemas()),
+        spec
+    );
+    final List<TableClusterGroupSpec> surviving = plan.survivingGroups();
+    if (surviving.isEmpty()) {
+      // Filter rules out every group: no bundle to acquire, nothing to 
download.
+      return AsyncCursorHolder.completed(EmptyCursorHolder.INSTANCE);
+    }
 
+    final List<DownloadBundle> bundles = new ArrayList<>(surviving.size());
+    for (TableClusterGroupSpec group : surviving) {
+      final QueryableIndex groupIndex = 
index.getClusterGroupQueryableIndex(group);
+      final CursorBuildSpec groupSpec = plan.rebuildCursorBuildSpec(spec, 
group);
+      bundles.add(
+          new DownloadBundle(
+              
Projections.getClusterGroupBundleName(group.getClusteringValueIds()),
+              groupIndex,
+              requiredColumns(groupIndex, null, groupSpec)
+          )
+      );
+    }
+    // Reuse the plan we already computed: delegate.makeClusteredCursorHolder 
skips a second planClusterGroupQuery and
+    // builds over the now-resident surviving groups (their columns memoized 
via getClusterGroupQueryableIndex).
+    return buildAsyncCursorHolder(bundles, () -> 
delegate.makeClusteredCursorHolder(spec, plan));
+  }
+
+  /**
+   * Generalized async-holder build over one or more bundles. Each {@link 
DownloadBundle} mounts its cache-layer bundle
+   * (its own {@link BundleHoldRelease}, since eviction is per-bundle) and 
submits a download per required column; the
+   * holder becomes ready once every column of every bundle is downloaded, at 
which point {@code innerBuilder} builds the
+   * (now fully-resident) cursor holder and the produced holder takes 
ownership of {@code inner} plus every bundle hold.
+   */
+  private AsyncCursorHolder buildAsyncCursorHolder(List<DownloadBundle> 
bundles, Supplier<CursorHolder> innerBuilder)
+  {
+    final List<BundleHoldRelease> holdReleases = new 
ArrayList<>(bundles.size());
     try {
-      // submit one materialization task per column so a multi-threaded 
download executor can fan them out
-      final List<AsyncResource<String>> columnDownloads = new 
ArrayList<>(requiredColumns.size());
-      for (String column : requiredColumns) {
-        columnDownloads.add(submitColumnDownload(rowSelector, column, 
holdRelease));
+      final List<AsyncResource<String>> columnDownloads = new ArrayList<>();
+      for (DownloadBundle bundle : bundles) {
+        final BundleHoldRelease holdRelease = new 
BundleHoldRelease(bundleAcquirer.acquire(bundle.bundleName()));
+        holdReleases.add(holdRelease);
+        // submit one materialization task per column so a multi-threaded 
download executor can fan them out
+        for (String column : bundle.requiredColumns()) {
+          columnDownloads.add(submitColumnDownload(bundle.rowSelector(), 
column, holdRelease));
+        }
       }
       final AsyncResource<List<String>> downloaded = 
AsyncResources.collect(columnDownloads);
 
       // Canceler runs if the awaiter closes this holder before it's ready 
(e.g. query cancel/timeout). Close the
       // collected resource to cancel every column download that hasn't begun 
its deep-storage read yet (queued tasks
       // are skipped; tasks parked on the download executor's permit are 
interrupted out of the interruptible wait
-      // before doing any I/O), then request the hold release through the 
handshake. Once the holder is produced and
-      // handed to set(), ownership transfers to the awaiter, which drains it 
via close() (cancel) or release()
-      // (success); the once-guard keeps the hold release safe across all of 
these paths.
+      // before doing any I/O), then request the hold release on every bundle 
through the handshake. Once the holder is
+      // produced and handed to set(), ownership transfers to the awaiter, 
which drains it via close() (cancel) or
+      // release() (success); each bundle's once-guard keeps its hold release 
safe across all of these paths.
       final AsyncCursorHolder asyncHolder = new AsyncCursorHolder(() -> {
         CloseableUtils.closeAndSuppressExceptions(downloaded, ignored -> {});
-        holdRelease.requestRelease();
+        holdReleases.forEach(BundleHoldRelease::requestRelease);
       });
       downloaded.addReadyCallback(() -> {
         final CursorHolder holder;
         try {
           downloaded.get(); // surfaces any column-download failure (or 
cancellation) as the cause
-          final CursorHolder inner = 
delegate.makeCursorHolderForProjection(spec, matched);
-          // The produced holder takes ownership of both inner and the bundle 
hold; from here, closing it releases both
-          holder = wrapWithBundleRelease(inner, holdRelease.releaser());
+          final CursorHolder inner = innerBuilder.get();
+          // The produced holder takes ownership of inner and every bundle 
hold; closing it releases all of them.
+          holder = wrapWithBundleRelease(inner, releasers(holdReleases));
         }
         catch (Throwable t) {
           // A column download failed or was canceled, this branch can fire 
while a sibling download is still
-          // mid-mapFile(), so the hold release must go through the handshake 
rather than dropping it here directly.
-          holdRelease.requestRelease();
+          // mid-mapFile(), so each hold release must go through the handshake 
rather than being dropped here directly.
+          holdReleases.forEach(BundleHoldRelease::requestRelease);
           asyncHolder.setException(t);
           return;
         }
         if (!asyncHolder.set(holder)) {
           // wrapper was closed (awaiter canceled) while we were producing the 
holder; close it ourselves so the
-          // holder, its inner, and the bundle hold don't leak.
+          // holder, its inner, and the bundle holds don't leak.
           holder.close();
         }
       });
@@ -154,9 +208,9 @@ public class PartialQueryableIndexCursorFactory implements 
CursorFactory
     }
     catch (Throwable t) {
       // Failure while submitting downloads / wiring up the holder (e.g. 
submitDownload shutdown rejection). A column
-      // download submitted before the failure may already be in flight, so 
release the bundle hold through the
+      // download submitted before the failure may already be in flight, so 
release every bundle hold through the
       // handshake (requestRelease defers to the last in-flight body) rather 
than dropping it directly mid-mapFile().
-      throw CloseableUtils.closeAndWrapInCatch(t, holdRelease::requestRelease);
+      throw CloseableUtils.closeAndWrapInCatch(t, () -> 
holdReleases.forEach(BundleHoldRelease::requestRelease));
     }
   }
 
@@ -285,6 +339,25 @@ public class PartialQueryableIndexCursorFactory implements 
CursorFactory
     };
   }
 
+  /**
+   * Compose every bundle's success-path {@link BundleHoldRelease#releaser} 
into one {@link Closeable} that the produced
+   * cursor holder owns: closing the holder releases all the bundle holds 
together.
+   */
+  private static Closeable releasers(List<BundleHoldRelease> holdReleases)
+  {
+    final Closer closer = Closer.create();
+    holdReleases.forEach(holdRelease -> 
closer.register(holdRelease.releaser()));
+    return closer;
+  }
+
+  /**
+   * One bundle's worth of async download work: the cache-layer {@code 
bundleName} to mount, the {@link QueryableIndex}
+   * row selector whose {@code getColumnHolder} triggers the per-column 
downloads, and the columns to pre-fetch.
+   */
+  private record DownloadBundle(String bundleName, QueryableIndex rowSelector, 
Set<String> requiredColumns)
+  {
+  }
+
   /**
    * Owns a bundle hold's release for one {@link #makeCursorHolderAsync} 
build, guarding the file mapper's
    * evict-vs-mapFile contract: releasing the hold makes the bundle entry 
eligible for eviction, which unmaps the
diff --git 
a/processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorFactory.java
 
b/processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorFactory.java
index 1537454e9fc..4acd2d4e28a 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorFactory.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorFactory.java
@@ -163,11 +163,19 @@ public class QueryableIndexCursorFactory implements 
CursorFactory
 
   private CursorHolder makeClusteredCursorHolder(CursorBuildSpec spec)
   {
-    final ClusterGroupQueryPlan plan = Projections.planClusterGroupQuery(
-        new ArrayList<>(index.getClusterGroupSchemas()),
-        spec
+    return makeClusteredCursorHolder(
+        spec,
+        Projections.planClusterGroupQuery(new 
ArrayList<>(index.getClusterGroupSchemas()), spec)
     );
+  }
 
+  /**
+   * Build a clustered-base-table cursor holder from an already-computed 
{@link ClusterGroupQueryPlan}. Exposed so the
+   * partial (on-demand) cursor factory can plan the cluster groups once — to 
decide which group bundles to download —
+   * and reuse the same plan to build the holder, rather than re-running 
{@link Projections#planClusterGroupQuery}.
+   */
+  public CursorHolder makeClusteredCursorHolder(CursorBuildSpec spec, 
ClusterGroupQueryPlan plan)
+  {
     if (plan.survivingGroups().isEmpty()) {
       return EmptyCursorHolder.INSTANCE;
     }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/projections/Projections.java
 
b/processing/src/main/java/org/apache/druid/segment/projections/Projections.java
index 8f03f7d0685..40980d000bb 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/projections/Projections.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/projections/Projections.java
@@ -568,6 +568,16 @@ public class Projections
   }
 
   public static String getClusterGroupSegmentInternalFilePrefix(List<Integer> 
clusteringValueIds)
+  {
+    return getClusterGroupBundleName(clusteringValueIds) + '/';
+  }
+
+  /**
+   * Bundle name for a cluster group's containers in the V10 file, matching 
the tag {@code IndexMergerV10} applies at
+   * write time: {@code __base$<id0>_<id1>...<idK>}. This is the cluster 
group's canonical identity; the per-group file
+   * prefix ({@link #getClusterGroupSegmentInternalFilePrefix}) is just this 
name plus a trailing {@code '/'} separator.
+   */
+  public static String getClusterGroupBundleName(List<Integer> 
clusteringValueIds)
   {
     if (clusteringValueIds == null || clusteringValueIds.isEmpty()) {
       throw DruidException.defensive("clusteringValueIds must not be null or 
empty");
@@ -579,7 +589,6 @@ public class Projections
       }
       sb.append(clusteringValueIds.get(i));
     }
-    sb.append('/');
     return sb.toString();
   }
 
diff --git 
a/processing/src/test/java/org/apache/druid/segment/PartialQueryableIndexCursorFactoryClusteredProjectionTest.java
 
b/processing/src/test/java/org/apache/druid/segment/PartialQueryableIndexCursorFactoryClusteredProjectionTest.java
new file mode 100644
index 00000000000..ea30fd4e6b0
--- /dev/null
+++ 
b/processing/src/test/java/org/apache/druid/segment/PartialQueryableIndexCursorFactoryClusteredProjectionTest.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.MapBasedInputRow;
+import org.apache.druid.data.input.impl.AggregateProjectionSpec;
+import 
org.apache.druid.data.input.impl.ClusteredValueGroupsBaseTableProjectionSpec;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.filter.EqualityFilter;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.file.CountingRangeReader;
+import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.projections.QueryableProjection;
+import 
org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Partial (on-demand) download coverage for a segment that is <em>both</em> 
clustered <em>and</em> carries an aggregate
+ * projection. The partial cursor factory matches the aggregate projection 
first and, if nothing matches, dispatches to
+ * the clustered groups, so this verifies both paths download only their own 
bundle(s): a projection-matching query
+ * fetches the {@code proj} bundle and none of the {@code __base$<ids>} group 
bundles, while a non-matching query
+ * fetches only the surviving group bundle and not {@code proj}. With no 
shared columns there is no {@code __base}
+ * bundle, so the projection is self-contained.
+ */
+class PartialQueryableIndexCursorFactoryClusteredProjectionTest extends 
PartialQueryableIndexCursorFactoryTestBase
+{
+  private static final long T0 = DateTimes.of("2025-01-01").getMillis();
+  private static final String PROJECTION_BUNDLE = "proj";
+  // tenants sort to dictionary ids acme=0, globex=1 → group bundles __base$0 
(2 rows) and __base$1 (1 row)
+  private static final String ACME_BUNDLE = "__base$0";
+  private static final String GLOBEX_BUNDLE = "__base$1";
+
+  @TempDir
+  static File sharedTempDir;
+
+  private static File segmentDir;
+
+  @BeforeAll
+  static void buildSegment()
+  {
+    final ClusteredValueGroupsBaseTableProjectionSpec clusterSpec =
+        ClusteredValueGroupsBaseTableProjectionSpec.builder()
+            .columns(
+                new StringDimensionSchema("tenant"),
+                new StringDimensionSchema("region"),
+                new LongDimensionSchema("x"),
+                new LongDimensionSchema("__time")
+            )
+            .clusteringColumns("tenant")
+            .build();
+    final AggregateProjectionSpec projectionSpec =
+        AggregateProjectionSpec.builder(PROJECTION_BUNDLE)
+                               .groupingColumns(new 
StringDimensionSchema("tenant"))
+                               .aggregators(
+                                   new CountAggregatorFactory("cnt"),
+                                   new LongSumAggregatorFactory("sum_x", "x")
+                               )
+                               .build();
+    final IncrementalIndexSchema schema =
+        IncrementalIndexSchema.builder()
+                              .withMinTimestamp(T0)
+                              .withTimestampSpec(new TimestampSpec("ts", 
"millis", null))
+                              .withQueryGranularity(Granularities.NONE)
+                              
.withDimensionsSpec(clusterSpec.getDimensionsSpec())
+                              .withRollup(false)
+                              .withClusterSpec(clusterSpec)
+                              .withProjections(List.of(projectionSpec))
+                              .build();
+    final File tmpDir = new File(sharedTempDir, "build_" + 
ThreadLocalRandom.current().nextInt());
+    segmentDir = IndexBuilder.create()
+                             .useV10()
+                             .tmpDir(tmpDir)
+                             
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
+                             .schema(schema)
+                             
.indexSpec(IndexSpec.builder().withMetadataCompression(CompressionStrategy.NONE).build())
+                             .rows(List.of(
+                                 row(T0 + 2, "globex", "eu-west-1", 5),
+                                 row(T0, "acme", "us-east-1", 10),
+                                 row(T0 + 1, "acme", "us-west-2", 20)
+                             ))
+                             .buildMMappedIndexFile();
+  }
+
+  @Test
+  void testProjectionMatchDownloadsOnlyProjectionBundle() throws IOException
+  {
+    final CountingRangeReader rangeReader = new 
CountingRangeReader(segmentDir);
+    try (IndexAndMapper opened = openIndex(rangeReader, "combo_projection")) {
+      final PartialQueryableIndexCursorFactory factory = 
factory(opened.index());
+      // group-by tenant + sum(x): matches the aggregate projection, so the 
clustered path is never consulted.
+      final CursorBuildSpec aggSpec = CursorBuildSpec.builder()
+                                                     
.setGroupingColumns(List.of("tenant"))
+                                                     
.setAggregators(List.of(new LongSumAggregatorFactory("sum_x", "x")))
+                                                     
.setPhysicalColumns(Set.of("tenant", "x"))
+                                                     .build();
+
+      // sanity: the index is both clustered and projection-bearing, and this 
spec resolves to the projection
+      Assertions.assertNotNull(opened.index().getClusteredBaseSummary(), 
"segment must be clustered");
+      final QueryableProjection<QueryableIndex> matched = 
opened.index().getProjection(aggSpec);
+      Assertions.assertNotNull(matched, "spec must match the aggregate 
projection");
+      Assertions.assertEquals(PROJECTION_BUNDLE, matched.getName());
+
+      try (AsyncCursorHolder asyncHolder = 
factory.makeCursorHolderAsync(aggSpec);
+           CursorHolder holder = asyncHolder.release()) {
+        Assertions.assertNotNull(holder.asCursor(), "projection-matched cursor 
must build");
+
+        final Set<String> downloaded = opened.mapper().getDownloadedFiles();
+        // The projection bundle's columns were materialized.
+        Assertions.assertTrue(
+            downloaded.stream().anyMatch(f -> f.startsWith(PROJECTION_BUNDLE + 
"/")),
+            "projection bundle must be downloaded; got: " + downloaded
+        );
+        // None of the clustered group bundles were touched: the projection 
match short-circuits clustered dispatch.
+        Assertions.assertTrue(
+            downloaded.stream().noneMatch(f -> f.startsWith(ACME_BUNDLE + "/") 
|| f.startsWith(GLOBEX_BUNDLE + "/")),
+            "no cluster group bundle should be downloaded for a 
projection-matched query; got: " + downloaded
+        );
+      }
+    }
+  }
+
+  @Test
+  void testNonMatchingQueryDispatchesToClusteredGroups() throws IOException
+  {
+    final CountingRangeReader rangeReader = new 
CountingRangeReader(segmentDir);
+    try (IndexAndMapper opened = openIndex(rangeReader, "combo_clustered")) {
+      final PartialQueryableIndexCursorFactory factory = 
factory(opened.index());
+      // a filter on the clustering column with no grouping/aggregators does 
not match the aggregate projection, so the
+      // clustered path runs and prunes to the surviving group only.
+      final CursorBuildSpec spec = CursorBuildSpec.builder()
+                                                  .setFilter(new 
EqualityFilter("tenant", ColumnType.STRING, "acme", null))
+                                                  .build();
+      Assertions.assertNull(opened.index().getProjection(spec), "filter-only 
spec must not match the projection");
+
+      try (AsyncCursorHolder asyncHolder = factory.makeCursorHolderAsync(spec);
+           CursorHolder holder = asyncHolder.release()) {
+        Assertions.assertEquals(List.of("us-east-1", "us-west-2"), 
scanRegion(holder));
+
+        final Set<String> downloaded = opened.mapper().getDownloadedFiles();
+        // Only the surviving (acme) group bundle materialized.
+        Assertions.assertTrue(downloaded.contains(ACME_BUNDLE + "/region"), 
"got: " + downloaded);
+        Assertions.assertTrue(
+            downloaded.stream().noneMatch(f -> f.startsWith(GLOBEX_BUNDLE + 
"/")),
+            "pruned globex group must not download; got: " + downloaded
+        );
+        // The projection bundle is irrelevant to this query and must not be 
downloaded.
+        Assertions.assertTrue(
+            downloaded.stream().noneMatch(f -> f.startsWith(PROJECTION_BUNDLE 
+ "/")),
+            "projection bundle must not download for a clustered-dispatch 
query; got: " + downloaded
+        );
+      }
+    }
+  }
+
+  private PartialQueryableIndexCursorFactory factory(PartialQueryableIndex 
index)
+  {
+    return new PartialQueryableIndexCursorFactory(
+        index,
+        QueryableIndexTimeBoundaryInspector.create(index),
+        noOpAcquirer(directExec())
+    );
+  }
+
+  private static List<String> scanRegion(CursorHolder holder)
+  {
+    final Cursor cursor = holder.asCursor();
+    final List<String> out = new ArrayList<>();
+    if (cursor == null) {
+      return out;
+    }
+    final DimensionSelector regionSel =
+        
cursor.getColumnSelectorFactory().makeDimensionSelector(DefaultDimensionSpec.of("region"));
+    while (!cursor.isDone()) {
+      out.add(regionSel.getRow().size() == 0 ? null : 
regionSel.lookupName(regionSel.getRow().get(0)));
+      cursor.advance();
+    }
+    return out;
+  }
+
+  private static InputRow row(long ts, String tenant, String region, long x)
+  {
+    final Map<String, Object> event = new HashMap<>();
+    event.put("ts", ts);
+    event.put("tenant", tenant);
+    event.put("region", region);
+    event.put("x", x);
+    return new MapBasedInputRow(ts, List.of("tenant", "region", "x"), event);
+  }
+}
diff --git 
a/processing/src/test/java/org/apache/druid/segment/PartialQueryableIndexCursorFactoryClusteredTest.java
 
b/processing/src/test/java/org/apache/druid/segment/PartialQueryableIndexCursorFactoryClusteredTest.java
new file mode 100644
index 00000000000..f0c2693bb9b
--- /dev/null
+++ 
b/processing/src/test/java/org/apache/druid/segment/PartialQueryableIndexCursorFactoryClusteredTest.java
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.druid.common.asyncresource.AsyncResource;
+import org.apache.druid.common.asyncresource.AsyncResources;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.MapBasedInputRow;
+import 
org.apache.druid.data.input.impl.ClusteredValueGroupsBaseTableProjectionSpec;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.filter.EqualityFilter;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.file.CountingRangeReader;
+import org.apache.druid.segment.file.PartialSegmentFileMapperV10;
+import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import 
org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Partial (on-demand) download coverage for a clustered base-table V10 
segment. Cluster groups are resolved from
+ * metadata only ({@code planClusterGroupQuery}); the async cursor factory 
downloads one bundle ({@code __base$<ids>})
+ * per surviving group.
+ */
+class PartialQueryableIndexCursorFactoryClusteredTest extends 
PartialQueryableIndexCursorFactoryTestBase
+{
+  private static final long T0 = DateTimes.of("2025-01-01").getMillis();
+
+  // tenants sort to dictionary ids acme=0, globex=1 → bundles __base$0 (2 
rows) and __base$1 (1 row)
+  private static final String ACME_BUNDLE = "__base$0";
+  private static final String GLOBEX_BUNDLE = "__base$1";
+
+  @TempDir
+  static File sharedTempDir;
+
+  private static File segmentDir;
+
+  @BeforeAll
+  static void buildSegment()
+  {
+    final ClusteredValueGroupsBaseTableProjectionSpec clusterSpec =
+        ClusteredValueGroupsBaseTableProjectionSpec.builder()
+            .columns(
+                new StringDimensionSchema("tenant"),
+                new StringDimensionSchema("region"),
+                new LongDimensionSchema("__time")
+            )
+            .clusteringColumns("tenant")
+            .build();
+    final IncrementalIndexSchema schema =
+        IncrementalIndexSchema.builder()
+                              .withMinTimestamp(T0)
+                              .withTimestampSpec(new TimestampSpec("ts", 
"millis", null))
+                              .withQueryGranularity(Granularities.NONE)
+                              
.withDimensionsSpec(clusterSpec.getDimensionsSpec())
+                              .withRollup(false)
+                              .withClusterSpec(clusterSpec)
+                              .build();
+    final File tmpDir = new File(sharedTempDir, "build_" + 
ThreadLocalRandom.current().nextInt());
+    segmentDir = IndexBuilder.create()
+                             .useV10()
+                             .tmpDir(tmpDir)
+                             
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
+                             .schema(schema)
+                             
.indexSpec(IndexSpec.builder().withMetadataCompression(CompressionStrategy.NONE).build())
+                             // ingest out of clustering order; the writer 
sorts groups by clustering value
+                             .rows(List.of(
+                                 row(T0 + 2, "globex", "eu-west-1"),
+                                 row(T0, "acme", "us-east-1"),
+                                 row(T0 + 1, "acme", "us-west-2")
+                             ))
+                             .buildMMappedIndexFile();
+  }
+
+  @Test
+  void testFilterOnClusteringColumnDownloadsOnlySurvivingGroup() throws 
IOException
+  {
+    final CountingRangeReader rangeReader = new 
CountingRangeReader(segmentDir);
+    try (IndexAndMapper opened = openIndex(rangeReader, "filter_acme")) {
+      final PartialQueryableIndexCursorFactory factory = 
factory(opened.index());
+      final CursorBuildSpec spec = CursorBuildSpec.builder()
+                                                  .setFilter(new 
EqualityFilter("tenant", ColumnType.STRING, "acme", null))
+                                                  .build();
+
+      try (AsyncCursorHolder asyncHolder = factory.makeCursorHolderAsync(spec);
+           CursorHolder holder = asyncHolder.release()) {
+        // Only the acme group survives the clustering-column filter, so only 
its rows are returned.
+        Assertions.assertEquals(
+            List.of(List.of("acme", "us-east-1"), List.of("acme", 
"us-west-2")),
+            scanTenantRegion(holder)
+        );
+
+        final Set<String> downloaded = opened.mapper().getDownloadedFiles();
+        // The acme group's columns were materialized.
+        Assertions.assertTrue(downloaded.contains(ACME_BUNDLE + "/region"), 
"got: " + downloaded);
+        Assertions.assertTrue(downloaded.contains(ACME_BUNDLE + "/__time"), 
"got: " + downloaded);
+        // The globex group bundle was pruned by metadata-only resolution and 
never touched.
+        Assertions.assertTrue(
+            downloaded.stream().noneMatch(f -> f.startsWith(GLOBEX_BUNDLE + 
"/")),
+            "globex group must not be downloaded; got: " + downloaded
+        );
+      }
+    }
+  }
+
+  @Test
+  void testFilterMatchingNoGroupDownloadsNothing() throws IOException
+  {
+    final CountingRangeReader rangeReader = new 
CountingRangeReader(segmentDir);
+    try (IndexAndMapper opened = openIndex(rangeReader, "filter_none")) {
+      final PartialQueryableIndexCursorFactory factory = 
factory(opened.index());
+      final CursorBuildSpec spec = CursorBuildSpec.builder()
+                                                  .setFilter(new 
EqualityFilter("tenant", ColumnType.STRING, "nobody", null))
+                                                  .build();
+
+      try (AsyncCursorHolder asyncHolder = factory.makeCursorHolderAsync(spec);
+           CursorHolder holder = asyncHolder.release()) {
+        Assertions.assertTrue(scanTenantRegion(holder).isEmpty(), "no group 
should survive");
+        Assertions.assertTrue(
+            opened.mapper().getDownloadedFiles().isEmpty(),
+            "no group bundle should be downloaded when nothing survives; got: 
" + opened.mapper().getDownloadedFiles()
+        );
+      }
+    }
+  }
+
+  @Test
+  void testFullScanConcatenatesAllGroups() throws IOException
+  {
+    final CountingRangeReader rangeReader = new 
CountingRangeReader(segmentDir);
+    try (IndexAndMapper opened = openIndex(rangeReader, "full_scan")) {
+      final PartialQueryableIndexCursorFactory factory = 
factory(opened.index());
+
+      try (AsyncCursorHolder asyncHolder = 
factory.makeCursorHolderAsync(CursorBuildSpec.FULL_SCAN);
+           CursorHolder holder = asyncHolder.release()) {
+        // Groups are concatenated in clustering (dictionary id) order: acme 
then globex, clustering constant injected.
+        Assertions.assertEquals(
+            List.of(
+                List.of("acme", "us-east-1"),
+                List.of("acme", "us-west-2"),
+                List.of("globex", "eu-west-1")
+            ),
+            scanTenantRegion(holder)
+        );
+
+        final Set<String> downloaded = opened.mapper().getDownloadedFiles();
+        Assertions.assertTrue(downloaded.contains(ACME_BUNDLE + "/region"), 
"got: " + downloaded);
+        Assertions.assertTrue(downloaded.contains(GLOBEX_BUNDLE + "/region"), 
"got: " + downloaded);
+      }
+    }
+  }
+
+  @Test
+  void testSyncMakeCursorHolderAfterFullDownload() throws IOException
+  {
+    final CountingRangeReader rangeReader = new 
CountingRangeReader(segmentDir);
+    try (IndexAndMapper opened = openIndex(rangeReader, "sync_full")) {
+      final PartialSegmentFileMapperV10 mapper = opened.mapper();
+      final PartialQueryableIndexCursorFactory factory = 
factory(opened.index());
+
+      // Sync path refuses until everything is resident.
+      Assertions.assertThrows(
+          DruidException.class,
+          () -> factory.makeCursorHolder(CursorBuildSpec.FULL_SCAN)
+      );
+
+      // Eager-acquire equivalent: materialize every internal file, then the 
sync clustered path works via the delegate.
+      
mapper.ensureFilesAvailable(mapper.getSegmentFileMetadata().getFiles().keySet());
+      Assertions.assertTrue(mapper.isFullyDownloaded());
+      try (CursorHolder holder = 
factory.makeCursorHolder(CursorBuildSpec.FULL_SCAN)) {
+        Assertions.assertEquals(
+            List.of(
+                List.of("acme", "us-east-1"),
+                List.of("acme", "us-west-2"),
+                List.of("globex", "eu-west-1")
+            ),
+            scanTenantRegion(holder)
+        );
+      }
+    }
+  }
+
+  @Test
+  void testAsyncMultiGroupDefersDownloadUntilExecutorRuns() throws Exception
+  {
+    final CountingRangeReader rangeReader = new 
CountingRangeReader(segmentDir);
+    final CountDownLatch gate = new CountDownLatch(1);
+    final ExecutorService rawExec = 
Execs.singleThreaded("partial-clustered-defer-%d");
+    final ListeningExecutorService gatedExec = 
MoreExecutors.listeningDecorator(rawExec);
+    try (IndexAndMapper opened = openIndex(rangeReader, "async_defer")) {
+      final PartialQueryableIndexCursorFactory factory = new 
PartialQueryableIndexCursorFactory(
+          opened.index(),
+          QueryableIndexTimeBoundaryInspector.create(opened.index()),
+          noOpAcquirer(gatedExec)
+      );
+      rangeReader.resetCount();
+
+      // Block the download executor so no group download can start yet.
+      @SuppressWarnings("unused")
+      ListenableFuture<?> unused = gatedExec.submit(() -> {
+        try {
+          gate.await();
+        }
+        catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      });
+
+      // FULL_SCAN survives both groups → both bundles must download before 
the holder is ready.
+      final AsyncCursorHolder asyncHolder = 
factory.makeCursorHolderAsync(CursorBuildSpec.FULL_SCAN);
+      Assertions.assertFalse(asyncHolder.isReady(), "must wait while the 
download executor is blocked");
+      Assertions.assertEquals(0, rangeReader.getReadCount(), "no group 
download should have started yet");
+
+      final CountDownLatch ready = new CountDownLatch(1);
+      asyncHolder.addReadyCallback(ready::countDown);
+      gate.countDown();
+      Assertions.assertTrue(ready.await(15, TimeUnit.SECONDS), "ready callback 
must fire once downloads run");
+      Assertions.assertTrue(asyncHolder.isReady());
+
+      try (CursorHolder holder = asyncHolder.release()) {
+        Assertions.assertEquals(
+            List.of(
+                List.of("acme", "us-east-1"),
+                List.of("acme", "us-west-2"),
+                List.of("globex", "eu-west-1")
+            ),
+            scanTenantRegion(holder)
+        );
+      }
+    }
+    finally {
+      gatedExec.shutdownNow();
+      rawExec.shutdownNow();
+    }
+  }
+
+  @Test
+  void testMultiGroupSuccessCloseReleasesEveryBundleHold() throws IOException
+  {
+    final CountingRangeReader rangeReader = new 
CountingRangeReader(segmentDir);
+    final CountingBundleAcquirer acquirer = new 
CountingBundleAcquirer(directExec());
+    try (IndexAndMapper opened = openIndex(rangeReader, "release_success")) {
+      final PartialQueryableIndexCursorFactory factory = new 
PartialQueryableIndexCursorFactory(
+          opened.index(),
+          QueryableIndexTimeBoundaryInspector.create(opened.index()),
+          acquirer
+      );
+
+      // FULL_SCAN survives both groups → both bundles acquired during the 
build; with the direct executor the
+      // downloads run inline so the holder is immediately ready.
+      try (AsyncCursorHolder asyncHolder = 
factory.makeCursorHolderAsync(CursorBuildSpec.FULL_SCAN)) {
+        Assertions.assertTrue(asyncHolder.isReady());
+        Assertions.assertEquals(1, acquirer.acquiredCount(ACME_BUNDLE), "acme 
group bundle acquired");
+        Assertions.assertEquals(1, acquirer.acquiredCount(GLOBEX_BUNDLE), 
"globex group bundle acquired");
+
+        final CursorHolder holder = asyncHolder.release();
+        // The produced holder owns every group's hold; nothing is released 
until it closes.
+        Assertions.assertEquals(0, acquirer.releasedCount(ACME_BUNDLE));
+        Assertions.assertEquals(0, acquirer.releasedCount(GLOBEX_BUNDLE));
+
+        holder.close();
+        // Closing the holder fans out the release across all surviving 
groups' bundle holds.
+        Assertions.assertEquals(1, acquirer.releasedCount(ACME_BUNDLE), "acme 
hold released on holder close");
+        Assertions.assertEquals(1, acquirer.releasedCount(GLOBEX_BUNDLE), 
"globex hold released on holder close");
+      }
+    }
+  }
+
+  @Test
+  void testMultiGroupCancelBeforeReadyReleasesEveryBundleHold() throws 
Exception
+  {
+    final CountingRangeReader rangeReader = new 
CountingRangeReader(segmentDir);
+    final CountDownLatch gate = new CountDownLatch(1);
+    final ExecutorService rawExec = 
Execs.singleThreaded("partial-clustered-cancel-%d");
+    final ListeningExecutorService gatedExec = 
MoreExecutors.listeningDecorator(rawExec);
+    final CountingBundleAcquirer acquirer = new 
CountingBundleAcquirer(gatedExec);
+    try (IndexAndMapper opened = openIndex(rangeReader, "release_cancel")) {
+      final PartialQueryableIndexCursorFactory factory = new 
PartialQueryableIndexCursorFactory(
+          opened.index(),
+          QueryableIndexTimeBoundaryInspector.create(opened.index()),
+          acquirer
+      );
+
+      // Block the download executor so the per-column download bodies stay 
queued and the holder never becomes ready.
+      @SuppressWarnings("unused")
+      ListenableFuture<?> unused = gatedExec.submit(() -> {
+        try {
+          gate.await();
+        }
+        catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      });
+
+      final AsyncCursorHolder asyncHolder = 
factory.makeCursorHolderAsync(CursorBuildSpec.FULL_SCAN);
+      // Both surviving groups' bundles are acquired synchronously during the 
build, before any download runs.
+      Assertions.assertEquals(1, acquirer.acquiredCount(ACME_BUNDLE), "acme 
group bundle acquired");
+      Assertions.assertEquals(1, acquirer.acquiredCount(GLOBEX_BUNDLE), 
"globex group bundle acquired");
+      Assertions.assertFalse(asyncHolder.isReady(), "must wait while the 
download executor is blocked");
+      Assertions.assertEquals(0, acquirer.releasedCount(ACME_BUNDLE));
+      Assertions.assertEquals(0, acquirer.releasedCount(GLOBEX_BUNDLE));
+
+      // Cancel before the holder is ready (query cancel/timeout). No download 
body has run, so the per-bundle
+      // handshake releases each hold immediately, and the canceler must fan 
the release out across every group.
+      asyncHolder.close();
+      Assertions.assertEquals(1, acquirer.releasedCount(ACME_BUNDLE), "acme 
hold released on cancel");
+      Assertions.assertEquals(1, acquirer.releasedCount(GLOBEX_BUNDLE), 
"globex hold released on cancel");
+    }
+    finally {
+      gate.countDown();
+      gatedExec.shutdownNow();
+      rawExec.shutdownNow();
+    }
+  }
+
+  @Test
+  void testClusteredColumnCapabilitiesAreMetadataOnly() throws IOException
+  {
+    final CountingRangeReader rangeReader = new 
CountingRangeReader(segmentDir);
+    try (IndexAndMapper opened = openIndex(rangeReader, "capabilities")) {
+      final PartialQueryableIndex index = opened.index();
+      // clustering column: type comes from the clustered summary's typed 
signature
+      Assertions.assertEquals(ValueType.STRING, 
index.getColumnCapabilities("tenant").getType());
+      // per-group data columns: type comes from the first group's 
ColumnDescriptor, never a group sub-index download
+      Assertions.assertEquals(ValueType.STRING, 
index.getColumnCapabilities("region").getType());
+      Assertions.assertEquals(ValueType.LONG, 
index.getColumnCapabilities("__time").getType());
+      // an unknown column resolves to null without touching the file mapper
+      Assertions.assertNull(index.getColumnCapabilities("nonexistent"));
+
+      Assertions.assertTrue(
+          opened.mapper().getDownloadedFiles().isEmpty(),
+          "column capabilities must be answered from metadata with no 
download; got: "
+          + opened.mapper().getDownloadedFiles()
+      );
+    }
+  }
+
+  private PartialQueryableIndexCursorFactory factory(PartialQueryableIndex 
index)
+  {
+    return new PartialQueryableIndexCursorFactory(
+        index,
+        QueryableIndexTimeBoundaryInspector.create(index),
+        noOpAcquirer(directExec())
+    );
+  }
+
+  /**
+   * Walk (tenant, region) pairs out of a clustered cursor holder. {@code 
tenant} is the clustering column, injected as
+   * a constant by the cluster-group cursor path; {@code region} is the 
per-group physical column.
+   */
+  private static List<List<String>> scanTenantRegion(CursorHolder holder)
+  {
+    final Cursor cursor = holder.asCursor();
+    final List<List<String>> out = new ArrayList<>();
+    if (cursor == null) {
+      return out;
+    }
+    final DimensionSelector tenantSel =
+        
cursor.getColumnSelectorFactory().makeDimensionSelector(DefaultDimensionSpec.of("tenant"));
+    final DimensionSelector regionSel =
+        
cursor.getColumnSelectorFactory().makeDimensionSelector(DefaultDimensionSpec.of("region"));
+    while (!cursor.isDone()) {
+      final String tenant = tenantSel.getRow().size() == 0 ? null : 
tenantSel.lookupName(tenantSel.getRow().get(0));
+      final String region = regionSel.getRow().size() == 0 ? null : 
regionSel.lookupName(regionSel.getRow().get(0));
+      out.add(Arrays.asList(tenant, region));
+      cursor.advance();
+    }
+    return out;
+  }
+
+  private static InputRow row(long ts, String tenant, String region)
+  {
+    final Map<String, Object> event = new HashMap<>();
+    event.put("ts", ts);
+    event.put("tenant", tenant);
+    event.put("region", region);
+    return new MapBasedInputRow(ts, List.of("tenant", "region"), event);
+  }
+
+  /**
+   * A {@link PartialBundleAcquirer} that counts how many times each bundle is 
acquired and released, so a test can
+   * assert the cursor factory holds (and ultimately releases) one hold per 
surviving cluster group. Downloads are
+   * submitted to the supplied executor (direct for the success path, gated 
for the cancel path).
+   */
+  private static final class CountingBundleAcquirer implements 
PartialBundleAcquirer
+  {
+    private final ListeningExecutorService downloadExec;
+    private final Map<String, Integer> acquired = new ConcurrentHashMap<>();
+    private final Map<String, Integer> released = new ConcurrentHashMap<>();
+
+    private CountingBundleAcquirer(ListeningExecutorService downloadExec)
+    {
+      this.downloadExec = downloadExec;
+    }
+
+    @Override
+    public Closeable acquire(String bundleName)
+    {
+      acquired.merge(bundleName, 1, Integer::sum);
+      return () -> released.merge(bundleName, 1, Integer::sum);
+    }
+
+    @Override
+    public <T> AsyncResource<T> submitDownload(Callable<T> task)
+    {
+      return AsyncResources.fromFutureUnmanaged(downloadExec.submit(task));
+    }
+
+    private int acquiredCount(String bundleName)
+    {
+      return acquired.getOrDefault(bundleName, 0);
+    }
+
+    private int releasedCount(String bundleName)
+    {
+      return released.getOrDefault(bundleName, 0);
+    }
+  }
+}
diff --git 
a/processing/src/test/java/org/apache/druid/segment/PartialQueryableIndexCursorFactoryTest.java
 
b/processing/src/test/java/org/apache/druid/segment/PartialQueryableIndexCursorFactoryTest.java
index 74c1e4d4aa1..1e2ef260f3f 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/PartialQueryableIndexCursorFactoryTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/PartialQueryableIndexCursorFactoryTest.java
@@ -42,7 +42,6 @@ import org.apache.druid.query.Order;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
 import org.apache.druid.query.filter.EqualityFilter;
-import org.apache.druid.segment.column.ColumnConfig;
 import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.RowSignature;
@@ -52,7 +51,6 @@ import 
org.apache.druid.segment.file.PartialSegmentFileMapperV10;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
 import org.apache.druid.segment.projections.Projections;
 import 
org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
-import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.joda.time.DateTime;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
@@ -78,9 +76,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
 
-class PartialQueryableIndexCursorFactoryTest extends 
InitializedNullHandlingTest
+class PartialQueryableIndexCursorFactoryTest extends 
PartialQueryableIndexCursorFactoryTestBase
 {
-  private static final ColumnConfig COLUMN_CONFIG = ColumnConfig.DEFAULT;
   private static final DateTime TIME = DateTimes.of("2025-01-01");
   private static final String PROJECTION_NAME = "dim1_metric1_sum";
 
@@ -112,9 +109,6 @@ class PartialQueryableIndexCursorFactoryTest extends 
InitializedNullHandlingTest
   private static File segmentDir;
   private static ListeningExecutorService realExec;
 
-  @TempDir
-  File perTestTempDir;
-
   @BeforeAll
   static void buildSegment()
   {
@@ -704,54 +698,4 @@ class PartialQueryableIndexCursorFactoryTest extends 
InitializedNullHandlingTest
                        .rows(ROWS)
                        .buildMMappedIndexFile();
   }
-
-  private IndexAndMapper openIndex(CountingRangeReader rangeReader, String 
cacheName) throws IOException
-  {
-    final File cacheDir = new File(perTestTempDir, cacheName);
-    FileUtils.mkdirp(cacheDir);
-    final PartialSegmentFileMapperV10 mapper = 
PartialSegmentFileMapperV10.create(
-        rangeReader,
-        TestHelper.makeJsonMapper(),
-        cacheDir,
-        IndexIO.V10_FILE_NAME,
-        Collections.emptyList()
-    );
-    return new IndexAndMapper(
-        new PartialQueryableIndex(mapper.getSegmentFileMetadata(), mapper, 
COLUMN_CONFIG),
-        mapper
-    );
-  }
-
-  private static ListeningExecutorService directExec()
-  {
-    return 
MoreExecutors.listeningDecorator(MoreExecutors.newDirectExecutorService());
-  }
-
-  private static PartialBundleAcquirer noOpAcquirer(ListeningExecutorService 
downloadExec)
-  {
-    return new PartialBundleAcquirer()
-    {
-      @Override
-      public Closeable acquire(String bundleName)
-      {
-        return () -> {};
-      }
-
-      @Override
-      public <T> AsyncResource<T> submitDownload(Callable<T> task)
-      {
-        return AsyncResources.fromFutureUnmanaged(downloadExec.submit(task));
-      }
-    };
-  }
-
-  private record IndexAndMapper(PartialQueryableIndex index, 
PartialSegmentFileMapperV10 mapper)
-      implements AutoCloseable
-  {
-    @Override
-    public void close()
-    {
-      mapper.close();
-    }
-  }
 }
diff --git 
a/processing/src/test/java/org/apache/druid/segment/PartialQueryableIndexCursorFactoryTestBase.java
 
b/processing/src/test/java/org/apache/druid/segment/PartialQueryableIndexCursorFactoryTestBase.java
new file mode 100644
index 00000000000..0b8ccf3b11a
--- /dev/null
+++ 
b/processing/src/test/java/org/apache/druid/segment/PartialQueryableIndexCursorFactoryTestBase.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.druid.common.asyncresource.AsyncResource;
+import org.apache.druid.common.asyncresource.AsyncResources;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.segment.column.ColumnConfig;
+import org.apache.druid.segment.file.CountingRangeReader;
+import org.apache.druid.segment.file.PartialSegmentFileMapperV10;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.Callable;
+
+/**
+ * Shared harness for {@link PartialQueryableIndexCursorFactory} tests: 
building the segment is left to each subclass
+ * (a plain aggregate-projection segment vs. a clustered base table), but 
opening a partial index over it from a
+ * {@link CountingRangeReader} and wiring the on-demand download executor is 
identical and lives here.
+ */
+abstract class PartialQueryableIndexCursorFactoryTestBase extends 
InitializedNullHandlingTest
+{
+  protected static final ColumnConfig COLUMN_CONFIG = ColumnConfig.DEFAULT;
+
+  @TempDir
+  protected File perTestTempDir;
+
+  /**
+   * Mount a fresh partial index over the (already built) segment via a {@link 
PartialSegmentFileMapperV10} backed by
+   * {@code rangeReader}, into a per-test cache subdirectory named {@code 
cacheName}. Only the V10 header is read up
+   * front; no internal column files are downloaded until a cursor requires 
them.
+   */
+  protected IndexAndMapper openIndex(CountingRangeReader rangeReader, String 
cacheName) throws IOException
+  {
+    final File cacheDir = new File(perTestTempDir, cacheName);
+    FileUtils.mkdirp(cacheDir);
+    final PartialSegmentFileMapperV10 mapper = 
PartialSegmentFileMapperV10.create(
+        rangeReader,
+        TestHelper.makeJsonMapper(),
+        cacheDir,
+        IndexIO.V10_FILE_NAME,
+        Collections.emptyList()
+    );
+    return new IndexAndMapper(
+        new PartialQueryableIndex(mapper.getSegmentFileMetadata(), mapper, 
COLUMN_CONFIG),
+        mapper
+    );
+  }
+
+  protected static ListeningExecutorService directExec()
+  {
+    return 
MoreExecutors.listeningDecorator(MoreExecutors.newDirectExecutorService());
+  }
+
+  /**
+   * A {@link PartialBundleAcquirer} whose holds are no-ops, submitting 
downloads to {@code downloadExec}. Used by tests
+   * that don't care about the cache-layer hold lifecycle.
+   */
+  protected static PartialBundleAcquirer noOpAcquirer(ListeningExecutorService 
downloadExec)
+  {
+    return new PartialBundleAcquirer()
+    {
+      @Override
+      public Closeable acquire(String bundleName)
+      {
+        return () -> {};
+      }
+
+      @Override
+      public <T> AsyncResource<T> submitDownload(Callable<T> task)
+      {
+        return AsyncResources.fromFutureUnmanaged(downloadExec.submit(task));
+      }
+    };
+  }
+
+  protected record IndexAndMapper(PartialQueryableIndex index, 
PartialSegmentFileMapperV10 mapper)
+      implements AutoCloseable
+  {
+    @Override
+    public void close()
+    {
+      mapper.close();
+    }
+  }
+}
diff --git 
a/server/src/main/java/org/apache/druid/segment/loading/PartialSegmentMetadataCacheEntry.java
 
b/server/src/main/java/org/apache/druid/segment/loading/PartialSegmentMetadataCacheEntry.java
index 244dd680fd3..e5f8aa55fab 100644
--- 
a/server/src/main/java/org/apache/druid/segment/loading/PartialSegmentMetadataCacheEntry.java
+++ 
b/server/src/main/java/org/apache/druid/segment/loading/PartialSegmentMetadataCacheEntry.java
@@ -884,21 +884,21 @@ public class PartialSegmentMetadataCacheEntry implements 
SegmentCacheEntry, Resi
   }
 
   /**
-   * Structural inference of the parent bundles that the given {@code 
bundleName} depends on within this segment.
-   * Single source of truth for both bootstrap (which post-filters by what's 
actually restorable on disk) and the
-   * query-time acquire path (which uses the result directly to seed
-   * {@link PartialSegmentBundleCacheEntry#forBundle}'s {@code 
parentEntryIds}).
+   * Inference of the parent bundles that the given {@code bundleName} depends 
on within this segment.
    * <p>
-   * Today's rule is structural and trivial: any non-base bundle depends on 
the base bundle. The base bundle and the
-   * {@link SegmentFileBuilder#ROOT_BUNDLE_NAME root bundle} have no parents, 
the root bundle owns everything written
-   * without an explicit {@code startFileBundle} call (older fileGroup-less 
segments, or shared internal metadata) and
-   * is structurally a peer of the base. If future writers introduce richer 
dependency graphs, the rule will need to
-   * grow, likely by reading dependency metadata that the writer records 
explicitly rather than by inference here.
+   * The rule is uniform: the base bundle and the {@link 
SegmentFileBuilder#ROOT_BUNDLE_NAME root bundle} have no
+   * parents (the root bundle owns everything written without an explicit 
{@code startFileBundle} call, for older
+   * fileGroup-less segments, or any future shared internal metadata and is 
structurally a peer of the base);
+   * every other bundle depends on the base bundle, but only if this segment 
actually carries one.
+   * <p>
+   * If future writers introduce richer dependency graphs, the rule will need 
to grow, likely by reading dependency
+   * metadata the writer records explicitly.
    */
   public List<PartialSegmentBundleCacheEntryIdentifier> 
inferParentBundles(String bundleName)
   {
     if (Projections.BASE_TABLE_PROJECTION_NAME.equals(bundleName)
-        || SegmentFileBuilder.ROOT_BUNDLE_NAME.equals(bundleName)) {
+        || SegmentFileBuilder.ROOT_BUNDLE_NAME.equals(bundleName)
+        || !hasBaseBundle()) {
       return List.of();
     }
     return List.of(
@@ -909,6 +909,17 @@ public class PartialSegmentMetadataCacheEntry implements 
SegmentCacheEntry, Resi
     );
   }
 
+  /**
+   * Whether this segment carries a {@code __base} bundle (shared base-table 
column data). Probed from the mounted file
+   * mapper's actual bundle set; returns false when the entry is not mounted.
+   */
+  private boolean hasBaseBundle()
+  {
+    final PartialSegmentFileMapperV10 mapper = getFileMapper();
+    return mapper != null
+           && 
PartialSegmentBundleCacheEntry.bundleNames(mapper).contains(Projections.BASE_TABLE_PROJECTION_NAME);
+  }
+
   /**
    * Register a bundle entry as a current dependent of this metadata entry. 
Called by
    * {@link PartialSegmentBundleCacheEntry} after a successful mount; the drop 
path uses {@link #snapshotLinkedBundles}
diff --git 
a/server/src/test/java/org/apache/druid/segment/loading/PartialSegmentMetadataCacheEntryTest.java
 
b/server/src/test/java/org/apache/druid/segment/loading/PartialSegmentMetadataCacheEntryTest.java
index 620234e562b..f09ca7ba1be 100644
--- 
a/server/src/test/java/org/apache/druid/segment/loading/PartialSegmentMetadataCacheEntryTest.java
+++ 
b/server/src/test/java/org/apache/druid/segment/loading/PartialSegmentMetadataCacheEntryTest.java
@@ -32,6 +32,7 @@ import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.data.CompressionStrategy;
 import org.apache.druid.segment.file.CountingRangeReader;
 import org.apache.druid.segment.file.PartialSegmentFileMapperV10;
+import org.apache.druid.segment.file.SegmentFileBuilder;
 import org.apache.druid.segment.file.SegmentFileBuilderV10;
 import org.apache.druid.segment.projections.Projections;
 import org.apache.druid.timeline.SegmentId;
@@ -62,6 +63,7 @@ class PartialSegmentMetadataCacheEntryTest
 
   private File segmentFile;
   private File cacheDir;
+  private int fixtureSeq;
 
   @BeforeEach
   void setup() throws IOException
@@ -393,16 +395,51 @@ class PartialSegmentMetadataCacheEntryTest
   }
 
   @Test
-  void testInferParentBundlesForAggregateReturnsBase()
+  void testInferParentBundlesForRootReturnsEmpty()
   {
     final PartialSegmentMetadataCacheEntry entry = newEntry(ESTIMATE);
-    final List<PartialSegmentBundleCacheEntryIdentifier> parents = 
entry.inferParentBundles("some_aggregate_projection");
-    Assertions.assertEquals(1, parents.size());
-    Assertions.assertEquals(SEGMENT_ID, parents.getFirst().segmentId());
     Assertions.assertEquals(
-        Projections.BASE_TABLE_PROJECTION_NAME,
-        parents.getFirst().bundleName()
+        List.of(),
+        entry.inferParentBundles(SegmentFileBuilder.ROOT_BUNDLE_NAME)
+    );
+  }
+
+  @Test
+  void testInferParentBundlesDependsOnBaseWhenBaseBundlePresent() throws 
IOException
+  {
+    // A segment that carries a __base bundle (the non-clustered 
base+projection shape): every non-base/root bundle
+    // depends on it. Asserted uniformly for an aggregate-projection bundle 
and for a cluster-group bundle name (the
+    // latter standing in for the future clustered+shared-columns layout, 
where groups will share __base).
+    final PartialSegmentMetadataCacheEntry entry = mountedEntryOver(
+        buildSegmentWithBundles(Projections.BASE_TABLE_PROJECTION_NAME, 
"some_projection")
     );
+    for (String dependent : List.of("some_projection", 
Projections.getClusterGroupBundleName(List.of(0, 1)))) {
+      final List<PartialSegmentBundleCacheEntryIdentifier> parents = 
entry.inferParentBundles(dependent);
+      Assertions.assertEquals(1, parents.size(), "expected a __base parent for 
bundle[" + dependent + "]");
+      Assertions.assertEquals(SEGMENT_ID, parents.getFirst().segmentId());
+      Assertions.assertEquals(Projections.BASE_TABLE_PROJECTION_NAME, 
parents.getFirst().bundleName());
+    }
+  }
+
+  @Test
+  void testInferParentBundlesEmptyWhenSegmentHasNoBaseBundle() throws 
IOException
+  {
+    // A clustered + aggregate-projection segment with no shared columns has 
no __base bundle: the base data lives in
+    // per-group __base$<ids> bundles and the aggregate projection is 
self-contained. So neither a cluster group nor
+    // the aggregate projection has a parent to depend on. 
(Pre-shared-columns; the old "aggregate always depends on
+    // __base" rule would have wrongly tried to mount a nonexistent __base for 
the projection bundle.)
+    final PartialSegmentMetadataCacheEntry entry = mountedEntryOver(
+        buildSegmentWithBundles(
+            Projections.getClusterGroupBundleName(List.of(0)),
+            Projections.getClusterGroupBundleName(List.of(1)),
+            "some_projection"
+        )
+    );
+    Assertions.assertEquals(
+        List.of(),
+        
entry.inferParentBundles(Projections.getClusterGroupBundleName(List.of(0)))
+    );
+    Assertions.assertEquals(List.of(), 
entry.inferParentBundles("some_projection"));
   }
 
   private PartialSegmentMetadataCacheEntry newEntry(long estimate)
@@ -433,4 +470,49 @@ class PartialSegmentMetadataCacheEntryTest
     return new File(baseDir, IndexIO.V10_FILE_NAME);
   }
 
+  /**
+   * Build a V10 segment whose containers are tagged with exactly the given 
bundle names (one column file per bundle),
+   * so {@link PartialSegmentMetadataCacheEntry#inferParentBundles} can be 
exercised against a known bundle set without
+   * a full ingestion. Returns the deep-storage directory containing the V10 
file.
+   */
+  private File buildSegmentWithBundles(String... bundleNames) throws 
IOException
+  {
+    final int seq = fixtureSeq++;
+    final File baseDir = new File(tempDir, "deep_" + seq);
+    FileUtils.mkdirp(baseDir);
+    try (SegmentFileBuilderV10 builder = 
SegmentFileBuilderV10.create(JSON_MAPPER, baseDir, CompressionStrategy.NONE)) {
+      for (int i = 0; i < bundleNames.length; ++i) {
+        builder.startFileBundle(bundleNames[i]);
+        final File tmpFile = new File(tempDir, 
StringUtils.format("fixture-%d-%d.bin", seq, i));
+        Files.write(Ints.toByteArray(i), tmpFile);
+        builder.add(bundleNames[i] + "/col", tmpFile);
+      }
+    }
+    return baseDir;
+  }
+
+  /**
+   * Reserve and mount a fresh metadata entry over the segment in {@code 
deepStorageDir}, into a per-call cache
+   * directory. The mounted entry's file mapper is what {@code 
inferParentBundles} probes for the base-bundle existence.
+   */
+  private PartialSegmentMetadataCacheEntry mountedEntryOver(File 
deepStorageDir) throws IOException
+  {
+    final File cache = new File(tempDir, "cache_" + (fixtureSeq++));
+    FileUtils.mkdirp(cache);
+    final StorageLocation location = new StorageLocation(cache, ESTIMATE * 4, 
null);
+    final PartialSegmentMetadataCacheEntry entry = new 
PartialSegmentMetadataCacheEntry(
+        SEGMENT_ID,
+        cache,
+        IndexIO.V10_FILE_NAME,
+        List.of(),
+        new DirectoryBackedRangeReader(deepStorageDir),
+        JSON_MAPPER,
+        null,
+        ESTIMATE
+    );
+    Assertions.assertTrue(location.reserve(entry));
+    entry.mount(location);
+    return entry;
+  }
+
 }
diff --git 
a/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerPartialAcquireTest.java
 
b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerPartialAcquireTest.java
index d8d37381381..61993321047 100644
--- 
a/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerPartialAcquireTest.java
+++ 
b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerPartialAcquireTest.java
@@ -24,15 +24,19 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.jsontype.NamedType;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.ListBasedInputRow;
+import org.apache.druid.data.input.MapBasedInputRow;
 import org.apache.druid.data.input.impl.AggregateProjectionSpec;
+import 
org.apache.druid.data.input.impl.ClusteredValueGroupsBaseTableProjectionSpec;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.LongDimensionSchema;
 import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.guice.LocalDataStorageDruidModule;
 import org.apache.druid.jackson.SegmentizerModule;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
@@ -75,6 +79,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -113,10 +118,17 @@ class SegmentLocalCacheManagerPartialAcquireTest
       new ListBasedInputRow(ROW_SIGNATURE, TIME.plusMinutes(3), 
ROW_SIGNATURE.getColumnNames(), Arrays.asList("b", 4L))
   );
 
+  // A second segment that is both clustered and carries an aggregate 
projection: no shared columns, so it has
+  // per-group __base$<ids> bundles + a self-contained "proj" bundle, but no 
__base bundle.
+  private static final SegmentId CLUSTERED_SEGMENT_ID =
+      SegmentId.of("test_clustered", Intervals.of("2025/2026"), "v1", 0);
+  private static final String CLUSTERED_PROJECTION_BUNDLE = "proj";
+
   @TempDir
   static File SHARED_TEMP_DIR;
 
   private static File DEEP_STORAGE_DIR;
+  private static File CLUSTERED_DEEP_STORAGE_DIR;
 
   @TempDir
   File perTestTempDir;
@@ -156,9 +168,70 @@ class SegmentLocalCacheManagerPartialAcquireTest
                                                        .build())
                                    .rows(ROWS)
                                    .buildMMappedIndexFile();
+    CLUSTERED_DEEP_STORAGE_DIR = buildClusteredProjectionSegment();
     EmittingLogger.registerEmitter(new NoopServiceEmitter());
   }
 
+  /**
+   * Build a clustered base-table segment that also carries an aggregate 
projection (group-by {@code tenant} with
+   * {@code sum(x)}). With no shared columns the layout is per-group {@code 
__base$<ids>} bundles + a self-contained
+   * {@code proj} bundle and no {@code __base} bundle.
+   */
+  private static File buildClusteredProjectionSegment()
+  {
+    final ClusteredValueGroupsBaseTableProjectionSpec clusterSpec =
+        ClusteredValueGroupsBaseTableProjectionSpec.builder()
+            .columns(
+                new StringDimensionSchema("tenant"),
+                new StringDimensionSchema("region"),
+                new LongDimensionSchema("x"),
+                new LongDimensionSchema("__time")
+            )
+            .clusteringColumns("tenant")
+            .build();
+    final AggregateProjectionSpec projectionSpec =
+        AggregateProjectionSpec.builder(CLUSTERED_PROJECTION_BUNDLE)
+                               .groupingColumns(new 
StringDimensionSchema("tenant"))
+                               .aggregators(
+                                   new CountAggregatorFactory("cnt"),
+                                   new LongSumAggregatorFactory("sum_x", "x")
+                               )
+                               .build();
+    final File tmp = new File(SHARED_TEMP_DIR, "build_clustered_" + 
ThreadLocalRandom.current().nextInt());
+    return IndexBuilder.create()
+                       .useV10()
+                       .tmpDir(tmp)
+                       
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
+                       .schema(
+                           IncrementalIndexSchema.builder()
+                                                 
.withMinTimestamp(TIME.getMillis())
+                                                 .withTimestampSpec(new 
TimestampSpec("ts", "millis", null))
+                                                 
.withQueryGranularity(Granularities.NONE)
+                                                 
.withDimensionsSpec(clusterSpec.getDimensionsSpec())
+                                                 .withRollup(false)
+                                                 .withClusterSpec(clusterSpec)
+                                                 
.withProjections(List.of(projectionSpec))
+                                                 .build()
+                       )
+                       
.indexSpec(IndexSpec.builder().withMetadataCompression(CompressionStrategy.NONE).build())
+                       .rows(List.of(
+                           clusteredRow(TIME.getMillis() + 2, "globex", 
"eu-west-1", 5),
+                           clusteredRow(TIME.getMillis(), "acme", "us-east-1", 
10),
+                           clusteredRow(TIME.getMillis() + 1, "acme", 
"us-west-2", 20)
+                       ))
+                       .buildMMappedIndexFile();
+  }
+
+  private static InputRow clusteredRow(long ts, String tenant, String region, 
long x)
+  {
+    final Map<String, Object> event = new HashMap<>();
+    event.put("ts", ts);
+    event.put("tenant", tenant);
+    event.put("region", region);
+    event.put("x", x);
+    return new MapBasedInputRow(ts, List.of("tenant", "region", "x"), event);
+  }
+
   @BeforeEach
   void setup() throws IOException
   {
@@ -206,10 +279,7 @@ class SegmentLocalCacheManagerPartialAcquireTest
     };
 
     // DataSegment with a LocalLoadSpec pointing at the deep storage directory 
(unzipped V10 layout).
-    partialSegment = DataSegment.builder()
-                                .dataSource(SEGMENT_ID.getDataSource())
-                                .interval(SEGMENT_ID.getInterval())
-                                .version(SEGMENT_ID.getVersion())
+    partialSegment = DataSegment.builder(SEGMENT_ID)
                                 .shardSpec(NoneShardSpec.instance())
                                 .loadSpec(Map.of("type", "local", "path", 
DEEP_STORAGE_DIR.getAbsolutePath()))
                                 .size(0)
@@ -405,6 +475,59 @@ class SegmentLocalCacheManagerPartialAcquireTest
     );
   }
 
+  @Test
+  void 
testPartialAcquireClusteredWithProjectionMountsProjectionBundleWithoutBase()
+      throws ExecutionException, InterruptedException, IOException
+  {
+    final DataSegment clusteredSegment =
+        DataSegment.builder(CLUSTERED_SEGMENT_ID)
+                   .shardSpec(NoneShardSpec.instance())
+                   .loadSpec(Map.of("type", "local", "path", 
CLUSTERED_DEEP_STORAGE_DIR.getAbsolutePath()))
+                   .size(0)
+                   .build();
+    try (AcquireSegmentAction action = 
manager.acquireSegment(clusteredSegment, AcquireMode.PARTIAL)) {
+      final AcquireSegmentResult result = action.getSegmentFuture().get();
+      try (Segment segment = 
result.getReferenceProvider().acquireReference().orElseThrow()) {
+        Assertions.assertEquals(CLUSTERED_SEGMENT_ID, segment.getId());
+
+        // group-by tenant + sum(x) matches the aggregate projection. Building 
this cursor drives the 'proj' bundle to
+        // mount through the real acquire path. inferParentBundles must return 
no parent for it (the clustered segment
+        // has no __base bundle); the old "aggregate always depends on __base" 
rule would have tried to mount a
+        // nonexistent __base here and failed.
+        final CursorBuildSpec aggSpec = CursorBuildSpec.builder()
+                                                       
.setGroupingColumns(List.of("tenant"))
+                                                       
.setAggregators(List.of(new LongSumAggregatorFactory("sum_x", "x")))
+                                                       
.setPhysicalColumns(Set.of("tenant", "x"))
+                                                       .build();
+        try (var asyncHolder = 
segment.as(CursorFactory.class).makeCursorHolderAsync(aggSpec)) {
+          final CountDownLatch ready = new CountDownLatch(1);
+          asyncHolder.addReadyCallback(ready::countDown);
+          Assertions.assertTrue(ready.await(15, TimeUnit.SECONDS));
+          try (CursorHolder cursorHolder = asyncHolder.release()) {
+            Assertions.assertNotNull(cursorHolder.asCursor(), 
"projection-matched cursor must build over the combo segment");
+          }
+        }
+      }
+
+      final StorageLocation loc = manager.getLocations().get(0);
+      // the projection bundle mounted through the real acquire path...
+      Assertions.assertTrue(
+          loc.isWeakReserved(new 
PartialSegmentBundleCacheEntryIdentifier(CLUSTERED_SEGMENT_ID, 
CLUSTERED_PROJECTION_BUNDLE)),
+          "projection bundle must be registered after the projection-matched 
cursor build"
+      );
+      // ...and NO phantom __base bundle was created: a clustered segment with 
no shared columns has no base bundle.
+      Assertions.assertFalse(
+          loc.isWeakReserved(
+              new 
PartialSegmentBundleCacheEntryIdentifier(CLUSTERED_SEGMENT_ID, 
Projections.BASE_TABLE_PROJECTION_NAME)
+          ),
+          "no __base bundle should be mounted for a clustered segment with no 
shared columns"
+      );
+    }
+    finally {
+      manager.drop(clusteredSegment);
+    }
+  }
+
   @Test
   void testAcquireSegmentForcesFullDownloadOnPartialEligible()
       throws ExecutionException, InterruptedException, IOException


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to