This is an automated email from the ASF dual-hosted git repository.
clintropolis pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new f575a25849b feat: add PartialQueryableIndex for loading columns on
demand (#19383)
f575a25849b is described below
commit f575a25849b7e2cca576e8cab9cb069f3ddf6b3a
Author: Clint Wylie <[email protected]>
AuthorDate: Fri May 1 04:28:50 2026 -0700
feat: add PartialQueryableIndex for loading columns on demand (#19383)
changes:
* adds `PartialQueryableIndex` which uses a `PartialSegmentFileMapperV10`
to expose a `QueryableIndex` that supports loading columns on demand
* adds `ColumnDescriptor.toColumnType` to provide a way to collect
`ColumnType` information from `ColumnPartSerde` so we can expose schema type
information without downloading files
* adds `PartialQueryableIndexTest` to cover a variety of scenarios of
partial load
---
.../druid/segment/PartialQueryableIndex.java | 416 ++++++++++++++++
.../org/apache/druid/segment/QueryableIndex.java | 9 +
.../druid/segment/column/ColumnDescriptor.java | 19 +
.../segment/file/PartialSegmentFileMapperV10.java | 10 +
.../druid/segment/projections/Projections.java | 2 +-
.../druid/segment/serde/ColumnPartSerde.java | 13 +
.../segment/serde/ComplexColumnPartSerde.java | 8 +
.../serde/NestedCommonFormatColumnPartSerde.java | 6 +
.../input/impl/AggregateProjectionSpecTest.java | 17 +
.../druid/segment/PartialQueryableIndexTest.java | 531 +++++++++++++++++++++
.../druid/segment/column/ColumnDescriptorTest.java | 85 ++++
11 files changed, 1115 insertions(+), 1 deletion(-)
diff --git
a/processing/src/main/java/org/apache/druid/segment/PartialQueryableIndex.java
b/processing/src/main/java/org/apache/druid/segment/PartialQueryableIndex.java
new file mode 100644
index 00000000000..93c7e1fe2b5
--- /dev/null
+++
b/processing/src/main/java/org/apache/druid/segment/PartialQueryableIndex.java
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.Maps;
+import it.unimi.dsi.fastutil.objects.ObjectAVLTreeSet;
+import org.apache.druid.collections.bitmap.BitmapFactory;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
+import org.apache.druid.query.OrderBy;
+import org.apache.druid.segment.column.BaseColumnHolder;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
+import org.apache.druid.segment.column.ColumnConfig;
+import org.apache.druid.segment.column.ColumnDescriptor;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.data.Indexed;
+import org.apache.druid.segment.data.ListIndexed;
+import org.apache.druid.segment.file.PartialSegmentFileMapperV10;
+import org.apache.druid.segment.file.SegmentFileMapper;
+import org.apache.druid.segment.file.SegmentFileMetadata;
+import org.apache.druid.segment.projections.AggregateProjectionSchema;
+import org.apache.druid.segment.projections.BaseTableProjectionSchema;
+import org.apache.druid.segment.projections.ConstantTimeColumn;
+import org.apache.druid.segment.projections.ProjectionMetadata;
+import org.apache.druid.segment.projections.Projections;
+import org.apache.druid.segment.projections.QueryableProjection;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/**
+ * A {@link QueryableIndex} that loads projection and base table columns on
demand from a
+ * {@link PartialSegmentFileMapperV10}. Schema queries (column names, types,
intervals, metadata) are answered from
+ * the {@link SegmentFileMetadata} alone without triggering any downloads.
Column data is only downloaded when a column
+ * is accessed via {@link #getColumnHolder(String)} or {@link
#getProjection(CursorBuildSpec)}.
+ * <p>
+ * Projection matching uses only metadata ({@link
SegmentFileMetadata#getColumnDescriptors()} keys) to determine if
+ * a projection can satisfy a query, avoiding downloads of projection data
that won't be used.
+ *
+ * @see PartialSegmentFileMapperV10
+ */
+public class PartialQueryableIndex implements QueryableIndex
+{
+ private final Interval dataInterval;
+ private final int baseNumRows;
+ private final Indexed<String> availableDimensions;
+ private final List<String> columnNames;
+ private final BitmapFactory bitmapFactory;
+ private final PartialSegmentFileMapperV10 fileMapper;
+ private final SegmentFileMetadata metadata;
+ private final ColumnConfig columnConfig;
+ private final Metadata reconstructedMetadata;
+ private final List<OrderBy> ordering;
+
+ // projection metadata for matching
+ private final SortedSet<AggregateProjectionMetadata> projections;
+ private final Map<String, AggregateProjectionMetadata> projectionsMap;
+ private final Map<String, ProjectionMetadata> projectionSpecs;
+
+ // segment-internal file prefix for the base table projection, used to
translate column names to descriptor keys
+ private final String baseProjectionPrefix;
+
+ // base table columns, built at construction time. each entry's supplier
defers both mapFile() and column
+ // deserialization until the column is actually accessed, so queries only
trigger downloads for the specific
+ // columns they use.
+ private final Map<String, Supplier<BaseColumnHolder>> baseColumns;
+
+ // projection columns, keyed by projection name. built on demand
(per-projection) when the projection is matched.
+ // within each projection, per-column suppliers defer both mapFile() and
deserialization.
+ private final ConcurrentHashMap<String, Map<String,
Supplier<BaseColumnHolder>>> projectionColumnsByName =
+ new ConcurrentHashMap<>();
+
+ // lazy dimension handlers
+ private final Supplier<Map<String, DimensionHandler>> dimensionHandlers;
+
+  /**
+   * Builds the index from segment file metadata alone. Construction only reads {@code metadata}; the per-column
+   * suppliers created here are memoized and do not call the file mapper until a column is first accessed.
+   *
+   * @param metadata     segment file metadata; its first projection must be the base table projection and every
+   *                     other projection must have an {@link AggregateProjectionSchema}
+   * @param fileMapper   mapper the per-column suppliers use to map column files on first access
+   * @param columnConfig passed to {@link ColumnDescriptor#read} when a column is deserialized
+   * @throws DruidException if the metadata has no projections, the first projection is not the base table
+   *                        projection, or a later projection is not an aggregate projection
+   */
+  public PartialQueryableIndex(
+      SegmentFileMetadata metadata,
+      PartialSegmentFileMapperV10 fileMapper,
+      ColumnConfig columnConfig
+  )
+  {
+    this.metadata = metadata;
+    this.fileMapper = fileMapper;
+    this.columnConfig = columnConfig;
+
+    // fail with a descriptive error instead of a bare NPE/IndexOutOfBoundsException when there is nothing to read
+    final List<ProjectionMetadata> allProjections = metadata.getProjections();
+    DruidException.conditionalDefensive(
+        allProjections != null && !allProjections.isEmpty(),
+        "Expected segment file metadata to contain base table projection with name[%s], but got no projections",
+        Projections.BASE_TABLE_PROJECTION_NAME
+    );
+
+    // base table projection is always first
+    final ProjectionMetadata baseProjection = allProjections.get(0);
+    DruidException.conditionalDefensive(
+        Projections.BASE_TABLE_PROJECTION_NAME.equals(baseProjection.getSchema().getName()),
+        "Expected base table projection with name[%s], but got projection with name[%s] instead",
+        Projections.BASE_TABLE_PROJECTION_NAME,
+        baseProjection.getSchema().getName()
+    );
+    final BaseTableProjectionSchema baseSchema = (BaseTableProjectionSchema) baseProjection.getSchema();
+    this.baseNumRows = baseProjection.getNumRows();
+    this.baseProjectionPrefix = Projections.getProjectionSegmentInternalFilePrefix(baseSchema);
+    this.dataInterval = Intervals.of(metadata.getInterval());
+    this.bitmapFactory = metadata.getBitmapEncoding().getBitmapFactory();
+    this.availableDimensions = new ListIndexed<>(baseSchema.getDimensionNames());
+
+    // build column names (dimensions first, then other columns, excluding __time)
+    final LinkedHashSet<String> dimsFirst = new LinkedHashSet<>(baseSchema.getDimensionNames());
+    for (String columnName : baseSchema.getColumnNames()) {
+      if (!ColumnHolder.TIME_COLUMN_NAME.equals(columnName)) {
+        dimsFirst.add(columnName);
+      }
+    }
+    this.columnNames = List.copyOf(dimsFirst);
+
+    // build aggregate projection metadata for matching; everything after the base table must be an aggregate
+    // projection
+    final List<AggregateProjectionMetadata> aggProjections = new ArrayList<>();
+    this.projectionSpecs = new ConcurrentHashMap<>();
+    for (ProjectionMetadata projectionSpec : allProjections.subList(1, allProjections.size())) {
+      if (!(projectionSpec.getSchema() instanceof AggregateProjectionSchema)) {
+        throw DruidException.defensive(
+            "Unexpected projection[%s] with type[%s]",
+            projectionSpec.getSchema().getName(),
+            projectionSpec.getSchema().getClass()
+        );
+      }
+      aggProjections.add(
+          new AggregateProjectionMetadata(
+              (AggregateProjectionSchema) projectionSpec.getSchema(),
+              projectionSpec.getNumRows()
+          )
+      );
+      projectionSpecs.put(projectionSpec.getSchema().getName(), projectionSpec);
+    }
+
+    this.reconstructedMetadata = baseSchema.asMetadata(aggProjections);
+    if (reconstructedMetadata.getOrdering() != null) {
+      this.ordering = SimpleQueryableIndex.ORDERING_INTERNER.intern(reconstructedMetadata.getOrdering());
+    } else {
+      this.ordering = Cursors.ascendingTimeOrder();
+    }
+
+    this.projectionsMap = Maps.newHashMapWithExpectedSize(aggProjections.size());
+    this.projections = new ObjectAVLTreeSet<>(AggregateProjectionMetadata.COMPARATOR);
+    for (AggregateProjectionMetadata projection : aggProjections) {
+      projections.add(projection);
+      projectionsMap.put(projection.getSchema().getName(), projection);
+    }
+
+    // build per-column suppliers for the base table. each supplier is memoized and defers both mapFile() and
+    // deserialization until the column is accessed. this must run after metadata, fileMapper, columnConfig and
+    // dataInterval are assigned, because the helper reads those fields.
+    this.baseColumns = buildProjectionColumnSuppliers(baseProjection, Map.of());
+
+    this.dimensionHandlers = Suppliers.memoize(this::initDimensionHandlers);
+  }
+
+ @Override
+ public Interval getDataInterval()
+ {
+ return dataInterval;
+ }
+
+ @Override
+ public int getNumRows()
+ {
+ return baseNumRows;
+ }
+
+ @Override
+ public Indexed<String> getAvailableDimensions()
+ {
+ return availableDimensions;
+ }
+
+ @Override
+ public BitmapFactory getBitmapFactoryForDimensions()
+ {
+ return bitmapFactory;
+ }
+
+ @Override
+ public Metadata getMetadata()
+ {
+ return reconstructedMetadata;
+ }
+
+  /**
+   * Returns the handlers for the base table dimensions, keyed by dimension name.
+   * <p>
+   * The map is built once, on the first call, by {@code initDimensionHandlers()}, which calls
+   * {@link #getColumnHolder(String)} for every available dimension. Unlike the other schema accessors on this class,
+   * the first call therefore loads every dimension column.
+   */
+  @Override
+  public Map<String, DimensionHandler> getDimensionHandlers()
+  {
+    return dimensionHandlers.get();
+  }
+
+ @Override
+ public List<String> getColumnNames()
+ {
+ return columnNames;
+ }
+
+ @Override
+ public List<OrderBy> getOrdering()
+ {
+ return ordering;
+ }
+
+  /**
+   * Returns the holder for a base table column, or {@code null} when the base table has no column with that name.
+   * <p>
+   * The holder comes from a memoized per-column supplier, so the first call for a column maps its file and
+   * deserializes it; later calls for the same column reuse the result.
+   */
+  @Nullable
+  @Override
+  public BaseColumnHolder getColumnHolder(String columnName)
+  {
+    final Supplier<BaseColumnHolder> holderSupplier = baseColumns.get(columnName);
+    if (holderSupplier == null) {
+      return null;
+    }
+    return holderSupplier.get();
+  }
+
+  /**
+   * Describes a base table column using only the {@link ColumnDescriptor} stored in the segment metadata, so no
+   * column data is loaded. This overrides the default in {@link QueryableIndex}, which goes through
+   * {@link #getColumnHolder(String)} and would force the column to load.
+   * <p>
+   * Only the type and the multi-value flag are taken from the descriptor; every other capability keeps the value set
+   * by {@link ColumnCapabilitiesImpl#createDefault()}.
+   *
+   * @return capabilities for the column, or {@code null} if the base table has no descriptor for it
+   */
+  @Nullable
+  @Override
+  public ColumnCapabilities getColumnCapabilities(String column)
+  {
+    // descriptor keys are namespaced per projection, so qualify the name with the base table prefix
+    final ColumnDescriptor baseDescriptor = metadata.getColumnDescriptors().get(baseProjectionPrefix + column);
+    if (baseDescriptor != null) {
+      final ColumnCapabilitiesImpl capabilities = ColumnCapabilitiesImpl.createDefault();
+      capabilities.setType(baseDescriptor.toColumnType());
+      capabilities.setHasMultipleValues(baseDescriptor.isHasMultipleValues());
+      return capabilities;
+    }
+    return null;
+  }
+
+  /**
+   * Finds a projection that can serve the given {@link CursorBuildSpec}, delegating the matching to
+   * {@link Projections#findMatchingProjection}. Column availability is decided from metadata only, see
+   * {@link #canProjectionServeColumn(String, String)}.
+   */
+  @Nullable
+  @Override
+  public QueryableProjection<QueryableIndex> getProjection(CursorBuildSpec cursorBuildSpec)
+  {
+    return Projections.findMatchingProjection(
+        cursorBuildSpec,
+        projections,
+        dataInterval,
+        this::canProjectionServeColumn,
+        this::getProjectionQueryableIndex
+    );
+  }
+
+  /**
+   * Metadata-only column check used during projection matching. An unknown projection never matches. Otherwise the
+   * column is accepted when the projection has a column descriptor for it, or when the base table has no such column
+   * either.
+   */
+  private boolean canProjectionServeColumn(String projectionName, String columnName)
+  {
+    final ProjectionMetadata projectionSpec = projectionSpecs.get(projectionName);
+    if (projectionSpec == null) {
+      return false;
+    }
+    final String descriptorKey = Projections.getProjectionSegmentInternalFileName(
+        projectionSpec.getSchema(),
+        columnName
+    );
+    if (metadata.getColumnDescriptors().containsKey(descriptorKey)) {
+      return true;
+    }
+    return getColumnCapabilities(columnName) == null;
+  }
+
+  /**
+   * Returns a {@link QueryableIndex} over the named aggregate projection, or {@code null} if this segment has no
+   * aggregate projection with that name.
+   * <p>
+   * The projection's column supplier map is built on the first request and cached per projection name. The suppliers
+   * in it are lazy, so building the map does not load any column. Each call returns a new index object backed by the
+   * cached suppliers and by this index's file mapper.
+   */
+  @Nullable
+  @Override
+  public QueryableIndex getProjectionQueryableIndex(String name)
+  {
+    final AggregateProjectionMetadata projection = projectionsMap.get(name);
+    if (projection == null) {
+      return null;
+    }
+
+    // created once per projection; individual columns are still only downloaded and deserialized when read
+    final Map<String, Supplier<BaseColumnHolder>> columnSuppliers = projectionColumnsByName.computeIfAbsent(
+        name,
+        key -> buildProjectionColumnSuppliers(projectionSpecs.get(key), baseColumns)
+    );
+
+    final Metadata projectionMetadata = new Metadata(
+        null,
+        projection.getSchema().getAggregators(),
+        null,
+        null,
+        true,
+        projection.getSchema().getOrderingWithTimeColumnSubstitution(),
+        null
+    );
+
+    // the projection's dimensions are its grouping columns without its time column
+    final String projectionTimeColumn = projection.getSchema().getTimeColumnName();
+    final List<String> projectionDimensions = new ArrayList<>();
+    for (String groupingColumn : projection.getSchema().getGroupingColumns()) {
+      if (!groupingColumn.equals(projectionTimeColumn)) {
+        projectionDimensions.add(groupingColumn);
+      }
+    }
+
+    return new SimpleQueryableIndex(
+        dataInterval,
+        new ListIndexed<>(projectionDimensions),
+        bitmapFactory,
+        columnSuppliers,
+        fileMapper,
+        projectionMetadata,
+        null
+    )
+    {
+      @Override
+      public Metadata getMetadata()
+      {
+        return projectionMetadata;
+      }
+
+      @Override
+      public int getNumRows()
+      {
+        return projection.getNumRows();
+      }
+
+      @Override
+      public List<OrderBy> getOrdering()
+      {
+        return projection.getSchema().getOrderingWithTimeColumnSubstitution();
+      }
+    };
+  }
+
+  /**
+   * Closes the {@link PartialSegmentFileMapperV10} that this index reads columns from.
+   */
+  @Override
+  public void close()
+  {
+    fileMapper.close();
+  }
+
+  /**
+   * Creates the dimension handler map, in the order of the available dimensions. Each dimension is resolved through
+   * {@link #getColumnHolder(String)}, which loads the column; dimensions with no holder are left out.
+   */
+  private Map<String, DimensionHandler> initDimensionHandlers()
+  {
+    final Map<String, DimensionHandler> handlersByDimension = new LinkedHashMap<>();
+    for (String dimension : availableDimensions) {
+      final ColumnHolder holder = getColumnHolder(dimension);
+      if (holder == null) {
+        continue;
+      }
+      handlersByDimension.put(dimension, holder.getColumnFormat().getColumnHandler(dimension));
+    }
+    return handlersByDimension;
+  }
+
+  /**
+   * Build a map of column name to per-column supplier for the given projection. Each supplier is memoized and defers
+   * both {@link SegmentFileMapper#mapFile} and {@link ColumnDescriptor#read} until the column is actually accessed,
+   * so queries only trigger downloads for the specific columns they use.
+   * <p>
+   * Columns of the projection schema that have no descriptor in the segment metadata are skipped. If the projection
+   * names a time column other than {@link ColumnHolder#TIME_COLUMN_NAME}, that column is registered under
+   * {@link ColumnHolder#TIME_COLUMN_NAME} instead of its own name. If the projection has no time column at all, a
+   * constant time column at the start of the data interval is registered.
+   *
+   * @param projectionSpec the projection whose columns to expose
+   * @param parentColumns  suppliers of the parent (base table) columns; when a same-named entry exists, its holder is
+   *                       passed to {@link ColumnDescriptor#read} as the parent column. Empty for the base table
+   *                       itself.
+   * @return column suppliers in the order of the projection schema's column names
+   */
+  private Map<String, Supplier<BaseColumnHolder>> buildProjectionColumnSuppliers(
+      ProjectionMetadata projectionSpec,
+      Map<String, Supplier<BaseColumnHolder>> parentColumns
+  )
+  {
+    final String timeColumnName = projectionSpec.getSchema().getTimeColumnName();
+    final boolean renameTime = !ColumnHolder.TIME_COLUMN_NAME.equals(timeColumnName);
+    final Map<String, Supplier<BaseColumnHolder>> projectionColumns = new LinkedHashMap<>();
+
+    for (String column : projectionSpec.getSchema().getColumnNames()) {
+      final String smooshName = Projections.getProjectionSegmentInternalFileName(projectionSpec.getSchema(), column);
+      final ColumnDescriptor columnDescriptor = metadata.getColumnDescriptors().get(smooshName);
+      if (columnDescriptor == null) {
+        continue;
+      }
+
+      final Supplier<BaseColumnHolder> columnSupplier = Suppliers.memoize(() -> {
+        try {
+          final ByteBuffer colBuffer = fileMapper.mapFile(smooshName);
+          // single lookup; the parent column is only resolved when this column is actually read
+          final Supplier<BaseColumnHolder> parentSupplier = parentColumns.get(column);
+          final BaseColumnHolder parentColumn = parentSupplier == null ? null : parentSupplier.get();
+          return columnDescriptor.read(colBuffer, columnConfig, fileMapper, parentColumn);
+        }
+        catch (IOException e) {
+          throw DruidException.defensive(e, "Failed to load column[%s]", smooshName);
+        }
+      });
+
+      // register a renamed time column directly under __time instead of inserting it under its own name and then
+      // moving it
+      final String registeredName = renameTime && column.equals(timeColumnName)
+                                    ? ColumnHolder.TIME_COLUMN_NAME
+                                    : SmooshedFileMapper.STRING_INTERNER.intern(column);
+      projectionColumns.put(registeredName, columnSupplier);
+    }
+
+    if (timeColumnName == null) {
+      projectionColumns.put(
+          ColumnHolder.TIME_COLUMN_NAME,
+          ConstantTimeColumn.makeConstantTimeSupplier(projectionSpec.getNumRows(), dataInterval.getStartMillis())
+      );
+    }
+
+    return projectionColumns;
+  }
+}
diff --git
a/processing/src/main/java/org/apache/druid/segment/QueryableIndex.java
b/processing/src/main/java/org/apache/druid/segment/QueryableIndex.java
index 6c413a1ca86..ce9bee38ac1 100644
--- a/processing/src/main/java/org/apache/druid/segment/QueryableIndex.java
+++ b/processing/src/main/java/org/apache/druid/segment/QueryableIndex.java
@@ -64,6 +64,15 @@ public interface QueryableIndex extends Closeable,
ColumnInspector
@Nullable
BaseColumnHolder getColumnHolder(String columnName);
+
+  /**
+   * Provides information about a column. Most callers should prefer to call {@link #getColumnHolder(String)} and
+   * then {@link ColumnHolder#getCapabilities()} instead of this method, to get fully accurate column details. The
+   * default implementation of this method does exactly that. However, callers can only count on
+   * {@link ColumnCapabilities#getType()} and {@link ColumnCapabilities#hasMultipleValues()} being reliably set by
+   * this method; in some implementations the richer fields ({@code isDictionaryEncoded}, {@code hasBitmapIndexes},
+   * {@code hasNulls}, etc.) might keep their default/UNKNOWN values.
+   */
@Override
@Nullable
default ColumnCapabilities getColumnCapabilities(String column)
diff --git
a/processing/src/main/java/org/apache/druid/segment/column/ColumnDescriptor.java
b/processing/src/main/java/org/apache/druid/segment/column/ColumnDescriptor.java
index c988d26440a..47f13ca019e 100644
---
a/processing/src/main/java/org/apache/druid/segment/column/ColumnDescriptor.java
+++
b/processing/src/main/java/org/apache/druid/segment/column/ColumnDescriptor.java
@@ -78,6 +78,25 @@ public class ColumnDescriptor implements Serializer
return parts;
}
+  /**
+   * Resolves the {@link ColumnType} of this column from the descriptor alone, without deserializing column data, so
+   * schema queries can be answered from metadata.
+   * <p>
+   * The part serdes are consulted in order and the first non-null {@link ColumnPartSerde#getColumnType()} wins; this
+   * is how extra type detail such as a complex type name is picked up. If no part reports a type, the result is a
+   * simple {@link ColumnType} built from {@link #getValueType()}.
+   */
+  public ColumnType toColumnType()
+  {
+    return parts.stream()
+                .map(ColumnPartSerde::getColumnType)
+                .filter(partType -> partType != null)
+                .findFirst()
+                .orElseGet(() -> new ColumnType(valueType, null, null));
+  }
+
@Override
public long getSerializedSize() throws IOException
{
diff --git
a/processing/src/main/java/org/apache/druid/segment/file/PartialSegmentFileMapperV10.java
b/processing/src/main/java/org/apache/druid/segment/file/PartialSegmentFileMapperV10.java
index 79d52ab384f..c622ec75641 100644
---
a/processing/src/main/java/org/apache/druid/segment/file/PartialSegmentFileMapperV10.java
+++
b/processing/src/main/java/org/apache/druid/segment/file/PartialSegmentFileMapperV10.java
@@ -315,6 +315,16 @@ public class PartialSegmentFileMapperV10 implements
SegmentFileMapper
return total;
}
+  /**
+   * Returns an immutable snapshot of the internal file names that this mapper has downloaded so far. Files downloaded
+   * by external mappers are not included; call this method on each external mapper directly if those are needed.
+   * Primarily intended for tests and diagnostics.
+   */
+  public Set<String> getDownloadedFiles()
+  {
+    return Set.copyOf(downloadedFiles);
+  }
+
@Override
public void close()
{
diff --git
a/processing/src/main/java/org/apache/druid/segment/projections/Projections.java
b/processing/src/main/java/org/apache/druid/segment/projections/Projections.java
index c81549f41c2..cc037d730da 100644
---
a/processing/src/main/java/org/apache/druid/segment/projections/Projections.java
+++
b/processing/src/main/java/org/apache/druid/segment/projections/Projections.java
@@ -63,7 +63,7 @@ public class Projections
throw InvalidInput.exception("projection name cannot be null or empty");
}
if (name.startsWith("__")) {
- throw InvalidInput.exception("projection cannot use reserved name[%s]",
BASE_TABLE_PROJECTION_NAME);
+ throw InvalidInput.exception("projection cannot use reserved name[%s],
names cannot start with '__'", name);
}
return name;
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/serde/ColumnPartSerde.java
b/processing/src/main/java/org/apache/druid/segment/serde/ColumnPartSerde.java
index f48b46fbbdd..dd387c8d7e7 100644
---
a/processing/src/main/java/org/apache/druid/segment/serde/ColumnPartSerde.java
+++
b/processing/src/main/java/org/apache/druid/segment/serde/ColumnPartSerde.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.ColumnType;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
@@ -53,6 +54,18 @@ public interface ColumnPartSerde
*/
Deserializer getDeserializer();
+  /**
+   * Returns the {@link ColumnType} for this part serde, if known. This is used to determine the column type from
+   * metadata without deserializing the column data. Returns {@code null} by default; implementations that carry
+   * type information beyond what {@link org.apache.druid.segment.column.ColumnDescriptor#getValueType()} provides
+   * (such as complex type names or array element types) should override this.
+   *
+   * @return the column type carried by this part serde, or {@code null} if it carries none
+   */
+  @Nullable
+  default ColumnType getColumnType()
+  {
+    return null;
+  }
+
interface Deserializer
{
void read(
diff --git
a/processing/src/main/java/org/apache/druid/segment/serde/ComplexColumnPartSerde.java
b/processing/src/main/java/org/apache/druid/segment/serde/ComplexColumnPartSerde.java
index f8220d40046..0ec98414ff3 100644
---
a/processing/src/main/java/org/apache/druid/segment/serde/ComplexColumnPartSerde.java
+++
b/processing/src/main/java/org/apache/druid/segment/serde/ComplexColumnPartSerde.java
@@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.ValueType;
import javax.annotation.Nullable;
@@ -68,6 +70,12 @@ public class ComplexColumnPartSerde implements
ColumnPartSerde
return typeName;
}
+  /**
+   * Reports a {@link ValueType#COMPLEX} column type that carries this serde's {@code typeName}.
+   */
+  @Override
+  public ColumnType getColumnType()
+  {
+    return new ColumnType(ValueType.COMPLEX, typeName, null);
+  }
+
@Override
public Serializer getSerializer()
{
diff --git
a/processing/src/main/java/org/apache/druid/segment/serde/NestedCommonFormatColumnPartSerde.java
b/processing/src/main/java/org/apache/druid/segment/serde/NestedCommonFormatColumnPartSerde.java
index 5bf50769b55..aa2070e7a00 100644
---
a/processing/src/main/java/org/apache/druid/segment/serde/NestedCommonFormatColumnPartSerde.java
+++
b/processing/src/main/java/org/apache/druid/segment/serde/NestedCommonFormatColumnPartSerde.java
@@ -168,6 +168,12 @@ public class NestedCommonFormatColumnPartSerde implements
ColumnPartSerde
return new NestedColumnDeserializer();
}
+  /**
+   * Reports this serde's {@code logicalType} as the column type.
+   */
+  @Override
+  public ColumnType getColumnType()
+  {
+    return logicalType;
+  }
+
@JsonProperty("logicalType")
public ColumnType getLogicalType()
{
diff --git
a/processing/src/test/java/org/apache/druid/data/input/impl/AggregateProjectionSpecTest.java
b/processing/src/test/java/org/apache/druid/data/input/impl/AggregateProjectionSpecTest.java
index 6c33ec57a11..638b7029ce9 100644
---
a/processing/src/test/java/org/apache/druid/data/input/impl/AggregateProjectionSpecTest.java
+++
b/processing/src/test/java/org/apache/druid/data/input/impl/AggregateProjectionSpecTest.java
@@ -213,6 +213,23 @@ class AggregateProjectionSpecTest extends
InitializedNullHandlingTest
Assertions.assertEquals("projection name cannot be null or empty",
t.getMessage());
}
+ @Test
+ void testReservedName()
+ {
+ Throwable t = Assertions.assertThrows(
+ DruidException.class,
+ () -> new AggregateProjectionSpec(
+ "__foo",
+ null,
+ VirtualColumns.EMPTY,
+ List.of(new StringDimensionSchema("string")),
+ null
+ )
+ );
+
+ Assertions.assertEquals("projection cannot use reserved name[__foo], names
cannot start with '__'", t.getMessage());
+ }
+
@Test
void testInvalidGrouping()
{
diff --git
a/processing/src/test/java/org/apache/druid/segment/PartialQueryableIndexTest.java
b/processing/src/test/java/org/apache/druid/segment/PartialQueryableIndexTest.java
new file mode 100644
index 00000000000..bb3e865fa11
--- /dev/null
+++
b/processing/src/test/java/org/apache/druid/segment/PartialQueryableIndexTest.java
@@ -0,0 +1,531 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.ListBasedInputRow;
+import org.apache.druid.data.input.impl.AggregateProjectionSpec;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnConfig;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.file.PartialSegmentFileMapperV10;
+import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.loading.SegmentRangeReader;
+import org.apache.druid.segment.projections.QueryableProjection;
+import
org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.joda.time.DateTime;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.RandomAccessFile;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+
+class PartialQueryableIndexTest extends InitializedNullHandlingTest
+{
+ private static final ColumnConfig COLUMN_CONFIG = ColumnConfig.DEFAULT;
+ private static final DateTime TIME = DateTimes.of("2025-01-01");
+
+ private static final RowSignature ROW_SIGNATURE = RowSignature.builder()
+ .add("dim1",
ColumnType.STRING)
+ .add("dim2",
ColumnType.STRING)
+
.add("metric1", ColumnType.LONG)
+ .build();
+
+ private static final List<AggregateProjectionSpec> PROJECTIONS =
Collections.singletonList(
+ AggregateProjectionSpec.builder("dim1_hourly_metric1_sum")
+ .virtualColumns(
+ Granularities.toVirtualColumn(
+ Granularities.HOUR,
+
Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME
+ )
+ )
+ .groupingColumns(
+ new StringDimensionSchema("dim1"),
+ new
LongDimensionSchema(Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME)
+ )
+ .aggregators(
+ new LongSumAggregatorFactory("_metric1_sum",
"metric1"),
+ new CountAggregatorFactory("_count")
+ )
+ .build()
+ );
+
+ private static final List<InputRow> ROWS = Arrays.asList(
+ new ListBasedInputRow(ROW_SIGNATURE, TIME,
ROW_SIGNATURE.getColumnNames(), Arrays.asList("a", "x", 1L)),
+ new ListBasedInputRow(ROW_SIGNATURE, TIME.plusMinutes(1),
ROW_SIGNATURE.getColumnNames(), Arrays.asList("a", "y", 2L)),
+ new ListBasedInputRow(ROW_SIGNATURE, TIME.plusMinutes(2),
ROW_SIGNATURE.getColumnNames(), Arrays.asList("b", "x", 3L)),
+ new ListBasedInputRow(ROW_SIGNATURE, TIME.plusMinutes(3),
ROW_SIGNATURE.getColumnNames(), Arrays.asList("b", "y", 4L))
+ );
+
+ @TempDir
+ static File sharedTempDir;
+
+ // the built V10 segment directory, shared across tests since it's read-only
+ private static File segmentDir;
+
+ @BeforeAll
+ static void buildSegment()
+ {
+ final File tmpDir = new File(sharedTempDir, "build_" +
ThreadLocalRandom.current().nextInt());
+ segmentDir = IndexBuilder.create()
+ .useV10()
+ .tmpDir(tmpDir)
+
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
+ .schema(
+ IncrementalIndexSchema.builder()
+ .withDimensionsSpec(
+
DimensionsSpec.builder()
+
.setDimensions(
+
List.of(
+
new StringDimensionSchema("dim1"),
+
new StringDimensionSchema("dim2"),
+
new LongDimensionSchema("metric1")
+ )
+ )
+
.build()
+ )
+ .withRollup(false)
+
.withMinTimestamp(TIME.getMillis())
+
.withProjections(PROJECTIONS)
+ .build()
+ )
+
.indexSpec(IndexSpec.builder().withMetadataCompression(CompressionStrategy.ZSTD).build())
+ .rows(ROWS)
+ .buildMMappedIndexFile();
+ }
+
+ @Test
+ void testSchemaWithoutDownloads() throws IOException
+ {
+ final CountingRangeReader rangeReader = new
CountingRangeReader(segmentDir);
+ final File cacheDir = newCacheDir("schema");
+
+ try (PartialSegmentFileMapperV10 mapper =
PartialSegmentFileMapperV10.create(
+ rangeReader,
+ TestHelper.makeJsonMapper(),
+ cacheDir,
+ IndexIO.V10_FILE_NAME,
+ Collections.emptyList()
+ )) {
+ rangeReader.resetCount();
+
+ final PartialQueryableIndex index = new PartialQueryableIndex(
+ mapper.getSegmentFileMetadata(),
+ mapper,
+ COLUMN_CONFIG
+ );
+
+ // all these should work without triggering any range reads (downloads)
+ Assertions.assertNotNull(index.getDataInterval());
+ Assertions.assertEquals(4, index.getNumRows());
+ Assertions.assertNotNull(index.getAvailableDimensions());
+ Assertions.assertNotNull(index.getMetadata());
+ Assertions.assertNotNull(index.getOrdering());
+ Assertions.assertFalse(index.getColumnNames().isEmpty());
+ Assertions.assertNotNull(index.getBitmapFactoryForDimensions());
+
+ // no downloads triggered
+ Assertions.assertEquals(0, rangeReader.getReadCount());
+ Assertions.assertEquals(Set.of(), rangeReader.getReadFilenames());
+ }
+ }
+
+  /**
+   * Capabilities for base table columns must come from the segment metadata: types are checked for every column of
+   * the test segment, including the time column, and no range reads may happen while doing so.
+   */
+  @Test
+  void testGetColumnCapabilitiesFromMetadata() throws IOException
+  {
+    final CountingRangeReader rangeReader = new CountingRangeReader(segmentDir);
+    final File cacheDir = newCacheDir("caps");
+
+    try (PartialSegmentFileMapperV10 mapper = PartialSegmentFileMapperV10.create(
+        rangeReader,
+        TestHelper.makeJsonMapper(),
+        cacheDir,
+        IndexIO.V10_FILE_NAME,
+        Collections.emptyList()
+    )) {
+      rangeReader.resetCount();
+
+      final PartialQueryableIndex index = new PartialQueryableIndex(
+          mapper.getSegmentFileMetadata(),
+          mapper,
+          COLUMN_CONFIG
+      );
+
+      // string dimensions
+      ColumnCapabilities dim1Caps = index.getColumnCapabilities("dim1");
+      Assertions.assertNotNull(dim1Caps);
+      Assertions.assertEquals(ValueType.STRING, dim1Caps.getType());
+
+      ColumnCapabilities dim2Caps = index.getColumnCapabilities("dim2");
+      Assertions.assertNotNull(dim2Caps);
+      Assertions.assertEquals(ValueType.STRING, dim2Caps.getType());
+
+      // long column (declared as a long dimension in the test schema)
+      ColumnCapabilities metric1Caps = index.getColumnCapabilities("metric1");
+      Assertions.assertNotNull(metric1Caps);
+      Assertions.assertEquals(ValueType.LONG, metric1Caps.getType());
+
+      // time column
+      ColumnCapabilities timeCaps = index.getColumnCapabilities(ColumnHolder.TIME_COLUMN_NAME);
+      Assertions.assertNotNull(timeCaps);
+      Assertions.assertEquals(ValueType.LONG, timeCaps.getType());
+
+      // non-existent column
+      Assertions.assertNull(index.getColumnCapabilities("nonexistent"));
+
+      // no downloads triggered
+      Assertions.assertEquals(0, rangeReader.getReadCount());
+      Assertions.assertEquals(Set.of(), rangeReader.getReadFilenames());
+    }
+  }
+
+  /**
+   * Nothing is read when the index is constructed; requesting a column holder is what triggers range reads, and
+   * those reads are expected to go only to the V10 segment file.
+   */
+  @Test
+  void testGetColumnHolderTriggersBaseTableLoad() throws IOException
+  {
+    final CountingRangeReader countingReader = new CountingRangeReader(segmentDir);
+    final File holderCacheDir = newCacheDir("colholder");
+
+    try (
+        PartialSegmentFileMapperV10 fileMapper = PartialSegmentFileMapperV10.create(
+            countingReader,
+            TestHelper.makeJsonMapper(),
+            holderCacheDir,
+            IndexIO.V10_FILE_NAME,
+            Collections.emptyList()
+        )
+    ) {
+      // start counting from zero so that only reads made through the index are observed below
+      countingReader.resetCount();
+
+      final PartialQueryableIndex partialIndex =
+          new PartialQueryableIndex(fileMapper.getSegmentFileMetadata(), fileMapper, COLUMN_CONFIG);
+
+      // constructing the index alone reads nothing
+      Assertions.assertEquals(0, countingReader.getReadCount());
+
+      // asking for a column holder is what kicks off downloads
+      Assertions.assertNotNull(partialIndex.getColumnHolder(ColumnHolder.TIME_COLUMN_NAME));
+      Assertions.assertTrue(countingReader.getReadCount() > 0);
+
+      Assertions.assertNotNull(partialIndex.getColumnHolder("dim1"));
+      Assertions.assertNull(partialIndex.getColumnHolder("nonexistent"));
+
+      // only the V10 main segment file was read (no externals queried)
+      Assertions.assertEquals(Set.of(IndexIO.V10_FILE_NAME), countingReader.getReadFilenames());
+    }
+  }
+
+  /**
+   * Projection matching is expected to be metadata-only. Range reads should begin only once a column of the matched
+   * projection is requested, and base table files should appear only once a projection column that has a base table
+   * parent is requested.
+   */
+  @Test
+  void testGetProjectionMatchesFromMetadataAndLoadsLazily() throws IOException
+  {
+    final CountingRangeReader countingReader = new CountingRangeReader(segmentDir);
+    final File projectionCacheDir = newCacheDir("projection");
+
+    try (
+        PartialSegmentFileMapperV10 fileMapper = PartialSegmentFileMapperV10.create(
+            countingReader,
+            TestHelper.makeJsonMapper(),
+            projectionCacheDir,
+            IndexIO.V10_FILE_NAME,
+            Collections.emptyList()
+        )
+    ) {
+      final PartialQueryableIndex partialIndex =
+          new PartialQueryableIndex(fileMapper.getSegmentFileMetadata(), fileMapper, COLUMN_CONFIG);
+
+      // hourly grouping on dim1 with a sum over metric1 and a count; this spec should match the projection
+      final VirtualColumns hourlyGranularity = VirtualColumns.create(
+          Granularities.toVirtualColumn(Granularities.HOUR, Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME)
+      );
+      final CursorBuildSpec projectionSpec =
+          CursorBuildSpec.builder()
+                         .setInterval(partialIndex.getDataInterval())
+                         .setPhysicalColumns(Set.of("dim1", "metric1"))
+                         .setGroupingColumns(Collections.singletonList("dim1"))
+                         .setVirtualColumns(hourlyGranularity)
+                         .setAggregators(
+                             List.of(
+                                 new LongSumAggregatorFactory("_metric1_sum", "metric1"),
+                                 new CountAggregatorFactory("_count")
+                             )
+                         )
+                         .build();
+
+      // only count reads made from here on
+      countingReader.resetCount();
+
+      final QueryableProjection<QueryableIndex> matched = partialIndex.getProjection(projectionSpec);
+      Assertions.assertNotNull(matched, "projection should match");
+
+      // finding the match is metadata-based and must not download anything
+      Assertions.assertEquals(0, countingReader.getReadCount(), "matching should not download files");
+      Assertions.assertEquals(Set.of(), countingReader.getReadFilenames(), "matching should not download files");
+      Assertions.assertEquals(Set.of(), fileMapper.getDownloadedFiles(), "matching should not download files");
+
+      final QueryableIndex projectionIndex = matched.getRowSelector();
+      Assertions.assertNotNull(projectionIndex);
+      Assertions.assertEquals(0, countingReader.getReadCount(), "this should not download files either");
+
+      // touching a column of the projection is what triggers that column's download
+      Assertions.assertNotNull(projectionIndex.getColumnHolder(ColumnHolder.TIME_COLUMN_NAME));
+      Assertions.assertTrue(countingReader.getReadCount() > 0, "accessing a projection column should download");
+      Assertions.assertEquals(Set.of(IndexIO.V10_FILE_NAME), countingReader.getReadFilenames());
+
+      final String projectionPrefix = "dim1_hourly_metric1_sum/";
+      final String baseTablePrefix = "__base/";
+
+      // so far the downloads belong to the projection's namespace only, not the base table (if no shared parts)
+      final Set<String> filesAfterTime = fileMapper.getDownloadedFiles();
+      Assertions.assertTrue(
+          filesAfterTime.stream().anyMatch(file -> file.startsWith(projectionPrefix)),
+          "expected at least one file from the matched projection's namespace, got " + filesAfterTime
+      );
+      Assertions.assertTrue(
+          filesAfterTime.stream().noneMatch(file -> file.startsWith(baseTablePrefix)),
+          "no base table files should be downloaded when only the projection was accessed, got " + filesAfterTime
+      );
+
+      // a projection column with a base table parent pulls in base table files as well
+      Assertions.assertNotNull(projectionIndex.getColumnHolder("dim1"));
+      final Set<String> filesAfterDim1 = fileMapper.getDownloadedFiles();
+      Assertions.assertTrue(
+          filesAfterDim1.stream().anyMatch(file -> file.startsWith(projectionPrefix)),
+          "expected at least one file from the matched projection's namespace, got " + filesAfterDim1
+      );
+      Assertions.assertTrue(
+          filesAfterDim1.stream().anyMatch(file -> file.startsWith(baseTablePrefix)),
+          "base table files should be downloaded when a projection column shares data with a base table parent, got "
+          + filesAfterDim1
+      );
+    }
+  }
+
+  /**
+   * Loading one base table column must not download another column's files, and loading the same column a second
+   * time must not cause any further reads.
+   */
+  @Test
+  void testPerColumnLaziness() throws IOException
+  {
+    final CountingRangeReader countingReader = new CountingRangeReader(segmentDir);
+    final File perColumnCacheDir = newCacheDir("per_col");
+
+    try (
+        PartialSegmentFileMapperV10 fileMapper = PartialSegmentFileMapperV10.create(
+            countingReader,
+            TestHelper.makeJsonMapper(),
+            perColumnCacheDir,
+            IndexIO.V10_FILE_NAME,
+            Collections.emptyList()
+        )
+    ) {
+      final PartialQueryableIndex partialIndex =
+          new PartialQueryableIndex(fileMapper.getSegmentFileMetadata(), fileMapper, COLUMN_CONFIG);
+
+      // only count reads made from here on
+      countingReader.resetCount();
+
+      // first access of a single base table column
+      Assertions.assertNotNull(partialIndex.getColumnHolder("dim1"));
+      final int readsAfterDim1 = countingReader.getReadCount();
+      Assertions.assertTrue(readsAfterDim1 > 0, "accessing dim1 should trigger downloads");
+
+      // dim1's entry has been fetched, metric1's has not
+      final Set<String> downloadedAfterDim1 = fileMapper.getDownloadedFiles();
+      Assertions.assertTrue(
+          downloadedAfterDim1.contains("__base/dim1"),
+          "expected __base/dim1 in " + downloadedAfterDim1
+      );
+      Assertions.assertFalse(downloadedAfterDim1.contains("__base/metric1"), "metric1 should not be downloaded yet");
+
+      // a repeat access of dim1 must leave both the read count and the downloaded set unchanged
+      Assertions.assertNotNull(partialIndex.getColumnHolder("dim1"));
+      Assertions.assertEquals(readsAfterDim1, countingReader.getReadCount(), "re-access should be cached");
+      Assertions.assertEquals(
+          downloadedAfterDim1,
+          fileMapper.getDownloadedFiles(),
+          "re-access should not download new files"
+      );
+
+      // a different column needs its own files, so more reads are expected
+      Assertions.assertNotNull(partialIndex.getColumnHolder("metric1"));
+      Assertions.assertTrue(
+          countingReader.getReadCount() > readsAfterDim1,
+          "accessing metric1 should trigger additional downloads"
+      );
+
+      // both columns' entries are present now
+      final Set<String> downloadedAfterMetric1 = fileMapper.getDownloadedFiles();
+      Assertions.assertTrue(downloadedAfterMetric1.contains("__base/dim1"));
+      Assertions.assertTrue(
+          downloadedAfterMetric1.contains("__base/metric1"),
+          "expected __base/metric1 in " + downloadedAfterMetric1
+      );
+
+      // only the V10 main segment file was read (no externals queried)
+      Assertions.assertEquals(Set.of(IndexIO.V10_FILE_NAME), countingReader.getReadFilenames());
+    }
+  }
+
+  /**
+   * A build spec with neither grouping nor aggregation (scan-like) must not match any projection.
+   */
+  @Test
+  void testGetProjectionReturnsNullForNonAggregateQuery() throws IOException
+  {
+    final CountingRangeReader countingReader = new CountingRangeReader(segmentDir);
+    final File noProjectionCacheDir = newCacheDir("no_proj");
+
+    try (
+        PartialSegmentFileMapperV10 fileMapper = PartialSegmentFileMapperV10.create(
+            countingReader,
+            TestHelper.makeJsonMapper(),
+            noProjectionCacheDir,
+            IndexIO.V10_FILE_NAME,
+            Collections.emptyList()
+        )
+    ) {
+      final PartialQueryableIndex partialIndex =
+          new PartialQueryableIndex(fileMapper.getSegmentFileMetadata(), fileMapper, COLUMN_CONFIG);
+
+      // only an interval is set: no grouping columns and no aggregators
+      final CursorBuildSpec intervalOnlySpec =
+          CursorBuildSpec.builder()
+                         .setInterval(partialIndex.getDataInterval())
+                         .build();
+
+      Assertions.assertNull(partialIndex.getProjection(intervalOnlySpec));
+    }
+  }
+
+  /**
+   * The partial index must report the same interval, row count, dimensions, column names, ordering and per-column
+   * types as the index loaded eagerly from the same segment directory.
+   */
+  @Test
+  void testMatchesEagerQueryableIndex() throws IOException
+  {
+    final IndexIO indexIO = TestHelper.getTestIndexIO();
+    final File eagerMatchCacheDir = newCacheDir("match_eager");
+    final DirectoryRangeReader directoryReader = new DirectoryRangeReader(segmentDir);
+
+    try (
+        QueryableIndex fullIndex = indexIO.loadIndex(segmentDir);
+        PartialSegmentFileMapperV10 fileMapper = PartialSegmentFileMapperV10.create(
+            directoryReader,
+            TestHelper.makeJsonMapper(),
+            eagerMatchCacheDir,
+            IndexIO.V10_FILE_NAME,
+            Collections.emptyList()
+        )
+    ) {
+      final PartialQueryableIndex lazyIndex =
+          new PartialQueryableIndex(fileMapper.getSegmentFileMetadata(), fileMapper, COLUMN_CONFIG);
+
+      Assertions.assertEquals(fullIndex.getDataInterval(), lazyIndex.getDataInterval());
+      Assertions.assertEquals(fullIndex.getNumRows(), lazyIndex.getNumRows());
+
+      // copy the dimension names into plain lists so they can be compared element by element
+      final List<String> fullDimensions = new ArrayList<>();
+      for (String dimension : fullIndex.getAvailableDimensions()) {
+        fullDimensions.add(dimension);
+      }
+      final List<String> lazyDimensions = new ArrayList<>();
+      for (String dimension : lazyIndex.getAvailableDimensions()) {
+        lazyDimensions.add(dimension);
+      }
+      Assertions.assertEquals(fullDimensions, lazyDimensions);
+
+      Assertions.assertEquals(fullIndex.getColumnNames(), lazyIndex.getColumnNames());
+      Assertions.assertEquals(fullIndex.getOrdering(), lazyIndex.getOrdering());
+
+      // every column known to the eager index must have the same type in the partial index
+      for (String column : fullIndex.getColumnNames()) {
+        final ColumnCapabilities fullCaps = fullIndex.getColumnCapabilities(column);
+        final ColumnCapabilities lazyCaps = lazyIndex.getColumnCapabilities(column);
+        Assertions.assertNotNull(fullCaps, "eager caps for " + column);
+        Assertions.assertNotNull(lazyCaps, "partial caps for " + column);
+        Assertions.assertEquals(fullCaps.toColumnType(), lazyCaps.toColumnType(), "type mismatch for " + column);
+      }
+    }
+  }
+
+  /**
+   * Creates a directory under {@code sharedTempDir} named {@code name} plus an underscore and a random integer.
+   *
+   * @param name prefix of the directory name
+   * @return the created directory
+   */
+  private File newCacheDir(String name) throws IOException
+  {
+    final String suffixedName = name + "_" + ThreadLocalRandom.current().nextInt();
+    final File cacheDir = new File(sharedTempDir, suffixedName);
+    FileUtils.mkdirp(cacheDir);
+    return cacheDir;
+  }
+
+  /**
+   * {@link SegmentRangeReader} test fixture that serves byte ranges from files in a local directory.
+   */
+  static class DirectoryRangeReader implements SegmentRangeReader
+  {
+    private final File directory;
+
+    DirectoryRangeReader(File directory)
+    {
+      this.directory = directory;
+    }
+
+    /**
+     * Reads up to {@code length} bytes of {@code filename} starting at {@code offset}, buffering them fully in
+     * memory. The range is clamped to the end of the file, so a request that runs past the end yields fewer bytes
+     * than asked for (possibly none).
+     *
+     * @param filename name of the file, resolved against the directory given to the constructor
+     * @param offset   position of the first byte to read
+     * @param length   maximum number of bytes to read
+     * @return a stream over the bytes that were read
+     * @throws IOException         if the file cannot be opened, positioned or read
+     * @throws ArithmeticException if the clamped range is too large to hold in a single byte array
+     */
+    @Override
+    public InputStream readRange(String filename, long offset, long length) throws IOException
+    {
+      final File target = new File(directory, filename);
+      try (RandomAccessFile raf = new RandomAccessFile(target, "r")) {
+        final long remaining = Math.max(0L, raf.length() - offset);
+        // toIntExact rather than a bare (int) cast: the cast silently wraps for ranges of 2GiB or more, which could
+        // hand back a truncated buffer instead of failing
+        final int available = Math.toIntExact(Math.min(length, remaining));
+        final byte[] data = new byte[available];
+        raf.seek(offset);
+        raf.readFully(data);
+        return new ByteArrayInputStream(data);
+      }
+    }
+  }
+
+  /**
+   * {@link DirectoryRangeReader} that additionally records how many range reads were requested and for which
+   * filenames, so tests can assert on what was (or was not) read.
+   */
+  static class CountingRangeReader extends DirectoryRangeReader
+  {
+    private final AtomicInteger reads = new AtomicInteger();
+    private final Set<String> filesRead = ConcurrentHashMap.newKeySet();
+
+    CountingRangeReader(File directory)
+    {
+      super(directory);
+    }
+
+    /**
+     * Records the request, then delegates the actual read to the parent class.
+     */
+    @Override
+    public InputStream readRange(String filename, long offset, long length) throws IOException
+    {
+      reads.incrementAndGet();
+      filesRead.add(filename);
+      return super.readRange(filename, offset, length);
+    }
+
+    /**
+     * Number of {@link #readRange} calls since construction or the last {@link #resetCount()}.
+     */
+    int getReadCount()
+    {
+      return reads.get();
+    }
+
+    /**
+     * Immutable snapshot of the filenames passed to {@link #readRange} since construction or the last
+     * {@link #resetCount()}.
+     */
+    Set<String> getReadFilenames()
+    {
+      return Set.copyOf(filesRead);
+    }
+
+    /**
+     * Clears both the read counter and the recorded filenames.
+     */
+    void resetCount()
+    {
+      reads.set(0);
+      filesRead.clear();
+    }
+  }
+}
diff --git
a/processing/src/test/java/org/apache/druid/segment/column/ColumnDescriptorTest.java
b/processing/src/test/java/org/apache/druid/segment/column/ColumnDescriptorTest.java
new file mode 100644
index 00000000000..0008c9640ff
--- /dev/null
+++
b/processing/src/test/java/org/apache/druid/segment/column/ColumnDescriptorTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.column;
+
+import org.apache.druid.segment.nested.NestedCommonFormatColumnFormatSpec;
+import org.apache.druid.segment.serde.ColumnPartSerde;
+import org.apache.druid.segment.serde.ComplexColumnPartSerde;
+import org.apache.druid.segment.serde.LongNumericColumnPartSerde;
+import org.apache.druid.segment.serde.NestedCommonFormatColumnPartSerde;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteOrder;
+import java.util.List;
+
+/**
+ * Tests for {@link ColumnDescriptor#toColumnType()}, covering descriptors built from simple, complex, nested common
+ * format, and empty part serde lists.
+ */
+class ColumnDescriptorTest
+{
+  @Test
+  void testToColumnTypeFallsBackToValueTypeForSimpleSerdes()
+  {
+    // a plain long serde: the expected column type is just the LONG derived from the descriptor's value type
+    final ColumnPartSerde numericPart = LongNumericColumnPartSerde.serializerBuilder()
+                                                                  .withByteOrder(ByteOrder.nativeOrder())
+                                                                  .build();
+    final ColumnDescriptor longDescriptor = new ColumnDescriptor(ValueType.LONG, false, List.of(numericPart));
+
+    Assertions.assertEquals(ColumnType.LONG, longDescriptor.toColumnType());
+  }
+
+  @Test
+  void testToColumnTypeUsesComplexTypeNameFromPartSerde()
+  {
+    // the complex type name given to the part serde must show up on the resulting column type
+    final ColumnPartSerde complexPart = ComplexColumnPartSerde.serializerBuilder()
+                                                              .withTypeName("hyperUnique")
+                                                              .build();
+    final ColumnDescriptor complexDescriptor = new ColumnDescriptor(ValueType.COMPLEX, false, List.of(complexPart));
+
+    final ColumnType resolvedType = complexDescriptor.toColumnType();
+    Assertions.assertEquals(ValueType.COMPLEX, resolvedType.getType());
+    Assertions.assertEquals("hyperUnique", resolvedType.getComplexTypeName());
+  }
+
+  @Test
+  void testToColumnTypeUsesLogicalTypeFromNestedPartSerde()
+  {
+    // the logical type given to the nested part serde (here an array of strings) must be returned in full
+    final ColumnPartSerde nestedPart =
+        NestedCommonFormatColumnPartSerde.serializerBuilder()
+                                         .withLogicalType(ColumnType.STRING_ARRAY)
+                                         .withHasNulls(false)
+                                         .withColumnFormatSpec(NestedCommonFormatColumnFormatSpec.builder().build())
+                                         .withByteOrder(ByteOrder.nativeOrder())
+                                         .build();
+    final ColumnDescriptor arrayDescriptor = new ColumnDescriptor(ValueType.ARRAY, false, List.of(nestedPart));
+
+    Assertions.assertEquals(ColumnType.STRING_ARRAY, arrayDescriptor.toColumnType());
+  }
+
+  @Test
+  void testToColumnTypeEmptyPartsFallsBackToValueType()
+  {
+    // with no parts at all, only the descriptor's value type is available
+    final ColumnDescriptor emptyDescriptor = new ColumnDescriptor(ValueType.STRING, false, List.of());
+
+    final ColumnType resolvedType = emptyDescriptor.toColumnType();
+    Assertions.assertEquals(ValueType.STRING, resolvedType.getType());
+    Assertions.assertNull(resolvedType.getComplexTypeName());
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]