This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 71932f75c9 [core] Refactor scalar index eval and apply pre filter to
vector search (#7513)
71932f75c9 is described below
commit 71932f75c93a6bffde7276332a558625b2ce6d9f
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Mar 24 18:55:24 2026 +0800
[core] Refactor scalar index eval and apply pre filter to vector search
(#7513)
This PR does two core things:
1. Refactors the scalar index evaluation logic: removes
GlobalIndexScanBuilder/GlobalIndexScanBuilderImpl and inlines their
functionality into GlobalIndexScanner, simplifying the API.
2. Introduces a pre-filter mechanism for vector search: splits scan
and read into independent VectorScanImpl/VectorReadImpl classes,
introduces VectorSearchSplit as an intermediate data structure, and
supports pre-filtering with scalar indexes before vector search.
In addition, this PR no longer queries unindexed files: if a global
index exists, results come from the global index only, and row ranges
not covered by any index file are no longer added to the result.
---
.../paimon/globalindex/UnionGlobalIndexReader.java | 40 ++-
.../java/org/apache/paimon/AbstractFileStore.java | 13 -
.../src/main/java/org/apache/paimon/FileStore.java | 3 -
.../paimon/globalindex/DataEvolutionBatchScan.java | 43 +--
.../paimon/globalindex/GlobalIndexScanBuilder.java | 95 -------
.../globalindex/GlobalIndexScanBuilderImpl.java | 187 ------------
...alIndexScanner.java => GlobalIndexScanner.java} | 85 ++++--
.../org/apache/paimon/index/GlobalIndexMeta.java | 5 +
.../paimon/privilege/PrivilegedFileStore.java | 6 -
.../org/apache/paimon/table/source/VectorRead.java | 8 +-
.../apache/paimon/table/source/VectorReadImpl.java | 163 +++++++++++
.../org/apache/paimon/table/source/VectorScan.java | 17 +-
.../apache/paimon/table/source/VectorScanImpl.java | 125 ++++++++
.../table/source/VectorSearchBuilderImpl.java | 138 +--------
.../paimon/table/source/VectorSearchSplit.java | 130 +++++++++
.../paimon/table/BitmapGlobalIndexTableTest.java | 22 +-
.../paimon/table/BtreeGlobalIndexTableTest.java | 62 +---
.../table/source/VectorSearchBuilderTest.java | 315 +++++++++++++++++++++
paimon-python/pypaimon/globalindex/__init__.py | 8 +-
.../pypaimon/globalindex/global_index_evaluator.py | 36 +--
.../pypaimon/globalindex/global_index_reader.py | 6 +-
.../globalindex/global_index_scan_builder.py | 202 -------------
.../globalindex/global_index_scan_builder_impl.py | 165 -----------
.../pypaimon/globalindex/global_index_scanner.py | 162 +++++++++++
paimon-python/pypaimon/read/read_builder.py | 9 +-
.../pypaimon/read/scanner/file_scanner.py | 68 +----
paimon-python/pypaimon/read/table_scan.py | 15 +-
paimon-python/pypaimon/table/file_store_table.py | 19 --
.../procedure/CreateGlobalIndexProcedureTest.scala | 2 -
29 files changed, 1047 insertions(+), 1102 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/globalindex/UnionGlobalIndexReader.java
b/paimon-common/src/main/java/org/apache/paimon/globalindex/UnionGlobalIndexReader.java
index 2c02a0c484..b585a75570 100644
---
a/paimon-common/src/main/java/org/apache/paimon/globalindex/UnionGlobalIndexReader.java
+++
b/paimon-common/src/main/java/org/apache/paimon/globalindex/UnionGlobalIndexReader.java
@@ -21,10 +21,19 @@ package org.apache.paimon.globalindex;
import org.apache.paimon.predicate.FieldRef;
import org.apache.paimon.predicate.VectorSearch;
+import javax.annotation.Nullable;
+
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.ExecutorService;
import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.singletonList;
+import static
org.apache.paimon.utils.ThreadPoolUtils.randomlyExecuteSequentialReturn;
/**
* A {@link GlobalIndexReader} that combines results from multiple readers by
performing a union
@@ -33,9 +42,16 @@ import java.util.function.Function;
public class UnionGlobalIndexReader implements GlobalIndexReader {
private final List<GlobalIndexReader> readers;
+ private final @Nullable ExecutorService executor;
public UnionGlobalIndexReader(List<GlobalIndexReader> readers) {
+ this(readers, null);
+ }
+
+ public UnionGlobalIndexReader(
+ List<GlobalIndexReader> readers, @Nullable ExecutorService
executor) {
this.readers = readers;
+ this.executor = executor;
}
@Override
@@ -116,8 +132,9 @@ public class UnionGlobalIndexReader implements
GlobalIndexReader {
@Override
public Optional<ScoredGlobalIndexResult> visitVectorSearch(VectorSearch
vectorSearch) {
Optional<ScoredGlobalIndexResult> result = Optional.empty();
- for (GlobalIndexReader reader : readers) {
- Optional<ScoredGlobalIndexResult> current =
reader.visitVectorSearch(vectorSearch);
+ List<Optional<ScoredGlobalIndexResult>> results =
+ executeAllReaders(reader ->
reader.visitVectorSearch(vectorSearch));
+ for (Optional<ScoredGlobalIndexResult> current : results) {
if (!current.isPresent()) {
continue;
}
@@ -132,8 +149,8 @@ public class UnionGlobalIndexReader implements
GlobalIndexReader {
private Optional<GlobalIndexResult> union(
Function<GlobalIndexReader, Optional<GlobalIndexResult>> visitor) {
Optional<GlobalIndexResult> result = Optional.empty();
- for (GlobalIndexReader reader : readers) {
- Optional<GlobalIndexResult> current = visitor.apply(reader);
+ List<Optional<GlobalIndexResult>> results = executeAllReaders(visitor);
+ for (Optional<GlobalIndexResult> current : results) {
if (!current.isPresent()) {
continue;
}
@@ -145,6 +162,21 @@ public class UnionGlobalIndexReader implements
GlobalIndexReader {
return result;
}
+ private <R> List<R> executeAllReaders(Function<GlobalIndexReader, R>
function) {
+ if (executor == null) {
+ return readers.stream().map(function).collect(Collectors.toList());
+ }
+
+ Iterator<R> iterator =
+ randomlyExecuteSequentialReturn(
+ executor, reader ->
singletonList(function.apply(reader)), readers);
+ List<R> result = new ArrayList<>();
+ while (iterator.hasNext()) {
+ result.add(iterator.next());
+ }
+ return result;
+ }
+
@Override
public void close() throws IOException {
for (GlobalIndexReader reader : readers) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 35f7a42a6e..388b7eb9de 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -26,8 +26,6 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
-import org.apache.paimon.globalindex.GlobalIndexScanBuilder;
-import org.apache.paimon.globalindex.GlobalIndexScanBuilderImpl;
import org.apache.paimon.iceberg.IcebergCommitCallback;
import org.apache.paimon.iceberg.IcebergOptions;
import org.apache.paimon.index.IndexFileHandler;
@@ -523,15 +521,4 @@ abstract class AbstractFileStore<T> implements
FileStore<T> {
public void setSnapshotCache(Cache<Path, Snapshot> cache) {
this.snapshotCache = cache;
}
-
- @Override
- public GlobalIndexScanBuilder newGlobalIndexScanBuilder() {
- return new GlobalIndexScanBuilderImpl(
- options.toConfiguration(),
- schema.logicalRowType(),
- fileIO,
- pathFactory().globalIndexFileFactory(),
- snapshotManager(),
- newIndexFileHandler());
- }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
index 1ccd19ca11..98905b47e5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
@@ -19,7 +19,6 @@
package org.apache.paimon;
import org.apache.paimon.fs.Path;
-import org.apache.paimon.globalindex.GlobalIndexScanBuilder;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.manifest.IndexManifestFile;
import org.apache.paimon.manifest.ManifestFile;
@@ -125,6 +124,4 @@ public interface FileStore<T> {
void setManifestCache(SegmentsCache<Path> manifestCache);
void setSnapshotCache(Cache<Path, Snapshot> cache);
-
- GlobalIndexScanBuilder newGlobalIndexScanBuilder();
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/globalindex/DataEvolutionBatchScan.java
b/paimon-core/src/main/java/org/apache/paimon/globalindex/DataEvolutionBatchScan.java
index 84f40243f2..af169f4c6d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/globalindex/DataEvolutionBatchScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/globalindex/DataEvolutionBatchScan.java
@@ -18,7 +18,7 @@
package org.apache.paimon.globalindex;
-import org.apache.paimon.Snapshot;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
@@ -36,7 +36,6 @@ import org.apache.paimon.table.source.DataTableBatchScan;
import org.apache.paimon.table.source.DataTableScan;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.Split;
-import org.apache.paimon.table.source.snapshot.TimeTravelUtil;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Range;
@@ -44,15 +43,14 @@ import org.apache.paimon.utils.RowRangeIndex;
import javax.annotation.Nullable;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
-import static
org.apache.paimon.globalindex.GlobalIndexScanBuilder.parallelScan;
import static org.apache.paimon.table.SpecialFields.ROW_ID;
import static
org.apache.paimon.utils.ManifestReadThreadPool.randomlyExecuteSequentialReturn;
@@ -263,39 +261,18 @@ public class DataEvolutionBatchScan implements
DataTableScan {
if (filter == null) {
return Optional.empty();
}
- if (!table.coreOptions().globalIndexEnabled()) {
+ CoreOptions options = table.coreOptions();
+ if (!options.globalIndexEnabled()) {
return Optional.empty();
}
- PartitionPredicate partitionPredicate =
+ PartitionPredicate partitionFilter =
batchScan.snapshotReader().manifestsReader().partitionFilter();
- GlobalIndexScanBuilder indexScanBuilder =
table.store().newGlobalIndexScanBuilder();
- Snapshot snapshot = TimeTravelUtil.tryTravelOrLatest(table);
-
indexScanBuilder.withPartitionPredicate(partitionPredicate).withSnapshot(snapshot);
- List<Range> indexedRowRanges = indexScanBuilder.shardList();
- if (indexedRowRanges.isEmpty()) {
- return Optional.empty();
- }
-
- Long nextRowId = Objects.requireNonNull(snapshot.nextRowId());
- List<Range> nonIndexedRowRanges = new Range(0, nextRowId -
1).exclude(indexedRowRanges);
- Optional<GlobalIndexResult> resultOptional =
- parallelScan(
- indexedRowRanges,
- indexScanBuilder,
- filter,
- table.coreOptions().globalIndexThreadNum());
- if (!resultOptional.isPresent()) {
- return Optional.empty();
+ try (GlobalIndexScanner scanner =
+ GlobalIndexScanner.create(table, partitionFilter, filter)) {
+ return scanner.scan(filter);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
-
- GlobalIndexResult result = resultOptional.get();
- if (!nonIndexedRowRanges.isEmpty()) {
- for (Range range : nonIndexedRowRanges) {
- result = result.or(GlobalIndexResult.fromRange(range));
- }
- }
-
- return Optional.of(result);
}
@VisibleForTesting
diff --git
a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanBuilder.java
b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanBuilder.java
deleted file mode 100644
index 5c4ec92f81..0000000000
---
a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanBuilder.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.globalindex;
-
-import org.apache.paimon.Snapshot;
-import org.apache.paimon.partition.PartitionPredicate;
-import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.utils.IOUtils;
-import org.apache.paimon.utils.Range;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-import static
org.apache.paimon.utils.ManifestReadThreadPool.randomlyExecuteSequentialReturn;
-
-/** Builder for scanning global indexes. */
-public interface GlobalIndexScanBuilder {
-
- GlobalIndexScanBuilder withSnapshot(long snapshotId);
-
- GlobalIndexScanBuilder withSnapshot(Snapshot snapshot);
-
- GlobalIndexScanBuilder withPartitionPredicate(PartitionPredicate
partitionPredicate);
-
- GlobalIndexScanBuilder withRowRange(Range rowRange);
-
- RowRangeGlobalIndexScanner build();
-
- // Return sorted and no overlap ranges
- List<Range> shardList();
-
- static Optional<GlobalIndexResult> parallelScan(
- final List<Range> ranges,
- final GlobalIndexScanBuilder globalIndexScanBuilder,
- final Predicate filter,
- final Integer threadNum) {
- List<RowRangeGlobalIndexScanner> scanners =
- ranges.stream()
- .map(globalIndexScanBuilder::withRowRange)
- .map(GlobalIndexScanBuilder::build)
- .collect(Collectors.toList());
-
- try {
- List<Optional<GlobalIndexResult>> rowsResults = new ArrayList<>();
- Iterator<Optional<GlobalIndexResult>> resultIterators =
- randomlyExecuteSequentialReturn(
- scanner -> {
- Optional<GlobalIndexResult> result =
scanner.scan(filter);
- return Collections.singletonList(result);
- },
- scanners,
- threadNum);
- while (resultIterators.hasNext()) {
- rowsResults.add(resultIterators.next());
- }
- if (rowsResults.stream().noneMatch(Optional::isPresent)) {
- return Optional.empty();
- }
-
- GlobalIndexResult globalIndexResult =
GlobalIndexResult.createEmpty();
-
- for (int i = 0; i < ranges.size(); i++) {
- if (rowsResults.get(i).isPresent()) {
- globalIndexResult =
globalIndexResult.or(rowsResults.get(i).get());
- } else {
- globalIndexResult =
-
globalIndexResult.or(GlobalIndexResult.fromRange(ranges.get(i)));
- }
- }
- return Optional.of(globalIndexResult);
- } finally {
- IOUtils.closeAllQuietly(scanners);
- }
- }
-}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanBuilderImpl.java
b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanBuilderImpl.java
deleted file mode 100644
index 4619bb77d8..0000000000
---
a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanBuilderImpl.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.globalindex;
-
-import org.apache.paimon.Snapshot;
-import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.index.GlobalIndexMeta;
-import org.apache.paimon.index.IndexFileHandler;
-import org.apache.paimon.index.IndexPathFactory;
-import org.apache.paimon.manifest.IndexManifestEntry;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.partition.PartitionPredicate;
-import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.Filter;
-import org.apache.paimon.utils.Range;
-import org.apache.paimon.utils.SnapshotManager;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.stream.Collectors;
-
-/** Implementation of {@link GlobalIndexScanBuilder}. */
-public class GlobalIndexScanBuilderImpl implements GlobalIndexScanBuilder {
-
- private final Options options;
- private final RowType rowType;
- private final FileIO fileIO;
- private final IndexPathFactory indexPathFactory;
- private final SnapshotManager snapshotManager;
- private final IndexFileHandler indexFileHandler;
-
- private Snapshot snapshot;
- private PartitionPredicate partitionPredicate;
- private Range rowRange;
-
- public GlobalIndexScanBuilderImpl(
- Options options,
- RowType rowType,
- FileIO fileIO,
- IndexPathFactory indexPathFactory,
- SnapshotManager snapshotManager,
- IndexFileHandler indexFileHandler) {
- this.options = options;
- this.rowType = rowType;
- this.fileIO = fileIO;
- this.indexPathFactory = indexPathFactory;
- this.snapshotManager = snapshotManager;
- this.indexFileHandler = indexFileHandler;
- }
-
- @Override
- public GlobalIndexScanBuilder withSnapshot(long snapshotId) {
- this.snapshot = snapshotManager.snapshot(snapshotId);
- return this;
- }
-
- @Override
- public GlobalIndexScanBuilder withSnapshot(Snapshot snapshot) {
- this.snapshot = snapshot;
- return this;
- }
-
- @Override
- public GlobalIndexScanBuilder withPartitionPredicate(PartitionPredicate
partitionPredicate) {
- this.partitionPredicate = partitionPredicate;
- return this;
- }
-
- @Override
- public GlobalIndexScanBuilder withRowRange(Range rowRange) {
- this.rowRange = rowRange;
- return this;
- }
-
- @Override
- public RowRangeGlobalIndexScanner build() {
- Objects.requireNonNull(rowRange, "rowRange must not be null");
- List<IndexManifestEntry> entries = scan();
- return new RowRangeGlobalIndexScanner(
- options, rowType, fileIO, indexPathFactory, rowRange, entries);
- }
-
- @Override
- public List<Range> shardList() {
- Map<String, List<Range>> indexRanges = new HashMap<>();
- for (IndexManifestEntry entry : scan()) {
- GlobalIndexMeta globalIndexMeta =
entry.indexFile().globalIndexMeta();
-
- if (globalIndexMeta == null) {
- continue;
- }
- long start = globalIndexMeta.rowRangeStart();
- long end = globalIndexMeta.rowRangeEnd();
- indexRanges
- .computeIfAbsent(entry.indexFile().indexType(), k -> new
ArrayList<>())
- .add(new Range(start, end));
- }
-
- String checkIndexType = null;
- List<Range> checkRanges = null;
- // check all type index have same shard ranges
- // If index a has [1,10],[20,30] and index b has [1,10],[20,25], it's
inconsistent, because
- // it is hard to handle the [26,30] range.
- for (Map.Entry<String, List<Range>> rangeEntry :
indexRanges.entrySet()) {
- String indexType = rangeEntry.getKey();
- List<Range> ranges = rangeEntry.getValue();
- if (checkRanges == null) {
- checkIndexType = indexType;
- checkRanges = Range.sortAndMergeOverlap(ranges, true);
- } else {
- List<Range> merged = Range.sortAndMergeOverlap(ranges, true);
- if (merged.size() != checkRanges.size()) {
- throw new IllegalStateException(
- "Inconsistent shard ranges among index types: "
- + checkIndexType
- + " vs "
- + indexType);
- }
- for (int i = 0; i < merged.size(); i++) {
- Range r1 = merged.get(i);
- Range r2 = checkRanges.get(i);
- if (r1.from != r2.from || r1.to != r2.to) {
- throw new IllegalStateException(
- "Inconsistent shard ranges among index types:"
- + checkIndexType
- + " vs "
- + indexType);
- }
- }
- }
- }
-
- return Range.sortAndMergeOverlap(
- indexRanges.values().stream()
- .flatMap(Collection::stream)
- .collect(Collectors.toList()));
- }
-
- private List<IndexManifestEntry> scan() {
- Filter<IndexManifestEntry> filter =
- entry -> {
- if (partitionPredicate != null) {
- if (!partitionPredicate.test(entry.partition())) {
- return false;
- }
- }
- if (rowRange != null) {
- GlobalIndexMeta globalIndexMeta =
entry.indexFile().globalIndexMeta();
- if (globalIndexMeta == null) {
- return false;
- }
- long entryStart = globalIndexMeta.rowRangeStart();
- long entryEnd = globalIndexMeta.rowRangeEnd();
-
- if (!Range.intersect(entryStart, entryEnd,
rowRange.from, rowRange.to)) {
- return false;
- }
- }
- return true;
- };
-
- Snapshot snapshot =
- this.snapshot == null ? snapshotManager.latestSnapshot() :
this.snapshot;
-
- return indexFileHandler.scan(snapshot, filter);
- }
-}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/globalindex/RowRangeGlobalIndexScanner.java
b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanner.java
similarity index 67%
rename from
paimon-core/src/main/java/org/apache/paimon/globalindex/RowRangeGlobalIndexScanner.java
rename to
paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanner.java
index 044790e7de..1312f17b43 100644
---
a/paimon-core/src/main/java/org/apache/paimon/globalindex/RowRangeGlobalIndexScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanner.java
@@ -26,9 +26,13 @@ import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.index.IndexPathFactory;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Filter;
+import org.apache.paimon.utils.ManifestReadThreadPool;
import org.apache.paimon.utils.Range;
import java.io.Closeable;
@@ -42,57 +46,46 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
import java.util.function.IntFunction;
import java.util.stream.Collectors;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.CoreOptions.GLOBAL_INDEX_THREAD_NUM;
+import static org.apache.paimon.predicate.PredicateVisitor.collectFieldNames;
+import static
org.apache.paimon.table.source.snapshot.TimeTravelUtil.tryTravelOrLatest;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
/** Scanner for shard-based global indexes. */
-public class RowRangeGlobalIndexScanner implements Closeable {
+public class GlobalIndexScanner implements Closeable {
private final Options options;
+ private final ExecutorService executor;
private final GlobalIndexEvaluator globalIndexEvaluator;
private final IndexPathFactory indexPathFactory;
- public RowRangeGlobalIndexScanner(
+ public GlobalIndexScanner(
Options options,
RowType rowType,
FileIO fileIO,
IndexPathFactory indexPathFactory,
- Range range,
- List<IndexManifestEntry> entries) {
+ Collection<IndexFileMeta> indexFiles) {
this.options = options;
- for (IndexManifestEntry entry : entries) {
- GlobalIndexMeta meta = entry.indexFile().globalIndexMeta();
- checkArgument(
- meta != null
- && Range.intersect(
- range.from, range.to,
meta.rowRangeStart(), meta.rowRangeEnd()),
- "All index files must have an intersection with row range
["
- + range.from
- + ", "
- + range.to
- + ")");
- }
-
+ this.executor =
+
ManifestReadThreadPool.getExecutorService(options.get(GLOBAL_INDEX_THREAD_NUM));
this.indexPathFactory = indexPathFactory;
-
GlobalIndexFileReader indexFileReader = meta ->
fileIO.newInputStream(meta.filePath());
-
Map<Integer, Map<String, Map<Range, List<IndexFileMeta>>>> indexMetas
= new HashMap<>();
- for (IndexManifestEntry entry : entries) {
- GlobalIndexMeta meta = entry.indexFile().globalIndexMeta();
- checkArgument(meta != null, "Global index meta must not be null");
+ for (IndexFileMeta indexFile : indexFiles) {
+ GlobalIndexMeta meta = checkNotNull(indexFile.globalIndexMeta());
int fieldId = meta.indexFieldId();
- String indexType = entry.indexFile().indexType();
+ String indexType = indexFile.indexType();
indexMetas
.computeIfAbsent(fieldId, k -> new HashMap<>())
.computeIfAbsent(indexType, k -> new HashMap<>())
.computeIfAbsent(
- new Range(meta.rowRangeStart(),
meta.rowRangeStart()),
+ new Range(meta.rowRangeStart(),
meta.rowRangeEnd()),
k -> new ArrayList<>())
- .add(entry.indexFile());
+ .add(indexFile);
}
IntFunction<Collection<GlobalIndexReader>> readersFunction =
@@ -104,6 +97,44 @@ public class RowRangeGlobalIndexScanner implements
Closeable {
this.globalIndexEvaluator = new GlobalIndexEvaluator(rowType,
readersFunction);
}
+ public static GlobalIndexScanner create(
+ FileStoreTable table, Collection<IndexFileMeta> indexFiles) {
+ return new GlobalIndexScanner(
+ table.coreOptions().toConfiguration(),
+ table.rowType(),
+ table.fileIO(),
+ table.store().pathFactory().globalIndexFileFactory(),
+ indexFiles);
+ }
+
+ public static GlobalIndexScanner create(
+ FileStoreTable table, PartitionPredicate partitionFilter,
Predicate filter) {
+ Set<Integer> filterFieldIds =
+ collectFieldNames(filter).stream()
+ .filter(name -> table.rowType().containsField(name))
+ .map(name -> table.rowType().getField(name).id())
+ .collect(Collectors.toSet());
+ Filter<IndexManifestEntry> indexFileFilter =
+ entry -> {
+ if (partitionFilter != null &&
!partitionFilter.test(entry.partition())) {
+ return false;
+ }
+ GlobalIndexMeta globalIndex =
entry.indexFile().globalIndexMeta();
+ if (globalIndex == null) {
+ return false;
+ }
+ return filterFieldIds.contains(globalIndex.indexFieldId());
+ };
+
+ List<IndexFileMeta> indexFiles =
+
table.store().newIndexFileHandler().scan(tryTravelOrLatest(table),
indexFileFilter)
+ .stream()
+ .map(IndexManifestEntry::indexFile)
+ .collect(Collectors.toList());
+
+ return create(table, indexFiles);
+ }
+
public Optional<GlobalIndexResult> scan(Predicate predicate) {
return globalIndexEvaluator.evaluate(predicate);
}
@@ -142,7 +173,7 @@ public class RowRangeGlobalIndexScanner implements
Closeable {
unionReader.add(innerReader);
}
- readers.add(new UnionGlobalIndexReader(unionReader));
+ readers.add(new UnionGlobalIndexReader(unionReader, executor));
}
} catch (IOException e) {
throw new RuntimeException("Failed to create global index reader",
e);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/index/GlobalIndexMeta.java
b/paimon-core/src/main/java/org/apache/paimon/index/GlobalIndexMeta.java
index 5efe920832..c468bbffb3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/GlobalIndexMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/GlobalIndexMeta.java
@@ -23,6 +23,7 @@ import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Range;
import javax.annotation.Nullable;
@@ -69,6 +70,10 @@ public class GlobalIndexMeta {
return rowRangeEnd;
}
+ public Range rowRange() {
+ return new Range(rowRangeStart, rowRangeEnd);
+ }
+
public int indexFieldId() {
return indexFieldId;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
index 917301a2e5..6ced52bbd1 100644
---
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
+++
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
@@ -23,7 +23,6 @@ import org.apache.paimon.FileStore;
import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.Path;
-import org.apache.paimon.globalindex.GlobalIndexScanBuilder;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.manifest.IndexManifestFile;
import org.apache.paimon.manifest.ManifestFile;
@@ -239,9 +238,4 @@ public class PrivilegedFileStore<T> implements FileStore<T>
{
public void setSnapshotCache(Cache<Path, Snapshot> cache) {
wrapped.setSnapshotCache(cache);
}
-
- @Override
- public GlobalIndexScanBuilder newGlobalIndexScanBuilder() {
- return wrapped.newGlobalIndexScanBuilder();
- }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/VectorRead.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/VectorRead.java
index 4fc3c7aaf0..74e17e2845 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/VectorRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/VectorRead.java
@@ -20,8 +20,14 @@ package org.apache.paimon.table.source;
import org.apache.paimon.globalindex.GlobalIndexResult;
+import java.util.List;
+
/** Vector read to read index files. */
public interface VectorRead {
- GlobalIndexResult read(VectorScan.Plan plan);
+ default GlobalIndexResult read(VectorScan.Plan plan) {
+ return read(plan.splits());
+ }
+
+ GlobalIndexResult read(List<VectorSearchSplit> splits);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/VectorReadImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/VectorReadImpl.java
new file mode 100644
index 0000000000..49708a0a2a
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/VectorReadImpl.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.globalindex.GlobalIndexIOMeta;
+import org.apache.paimon.globalindex.GlobalIndexReader;
+import org.apache.paimon.globalindex.GlobalIndexResult;
+import org.apache.paimon.globalindex.GlobalIndexScanner;
+import org.apache.paimon.globalindex.GlobalIndexer;
+import org.apache.paimon.globalindex.GlobalIndexerFactoryUtils;
+import org.apache.paimon.globalindex.OffsetGlobalIndexReader;
+import org.apache.paimon.globalindex.ScoredGlobalIndexResult;
+import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
+import org.apache.paimon.index.GlobalIndexMeta;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.index.IndexPathFactory;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.VectorSearch;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.utils.RoaringNavigableMap64;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeSet;
+
+import static java.util.Collections.singletonList;
+import static
org.apache.paimon.utils.ManifestReadThreadPool.randomlyExecuteSequentialReturn;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+
+/**
+ * Implementation for {@link VectorRead}.
+ *
+ * <p>The scalar index files attached to the splits are first evaluated with {@code filter} to
+ * build a row-id pre-filter. Then the vector index files of every split are searched in
+ * parallel, and the per-split results are merged and cut down to the top {@code limit} rows.
+ */
+public class VectorReadImpl implements VectorRead {
+
+    private final FileStoreTable table;
+    private final Predicate filter;
+    private final int limit;
+    private final DataField vectorColumn;
+    private final float[] vector;
+
+    public VectorReadImpl(
+            FileStoreTable table,
+            Predicate filter,
+            int limit,
+            DataField vectorColumn,
+            float[] vector) {
+        this.table = table;
+        this.filter = filter;
+        this.limit = limit;
+        this.vectorColumn = vectorColumn;
+        this.vector = vector;
+    }
+
+    /**
+     * Runs the vector search over the given splits.
+     *
+     * @param splits splits to search, typically produced by {@link VectorScan#scan()}
+     * @return the merged result truncated to the top {@code limit} rows; empty when {@code splits}
+     *     is empty
+     * @throws NullPointerException if no query vector was set
+     * @throws IllegalArgumentException if {@code limit} is not positive
+     */
+    @Override
+    public GlobalIndexResult read(List<VectorSearchSplit> splits) {
+        // VectorScanImpl does not receive the vector or the limit, so they are validated here,
+        // before any index file is opened.
+        checkNotNull(vector, "Vector must be set");
+        if (limit <= 0) {
+            throw new IllegalArgumentException("Limit must be positive");
+        }
+        if (splits.isEmpty()) {
+            return GlobalIndexResult.createEmpty();
+        }
+
+        // Computed once on the calling thread and shared by all split evaluations.
+        RoaringNavigableMap64 preFilter = preFilter(splits).orElse(null);
+        Integer threadNum = table.coreOptions().globalIndexThreadNum();
+        IndexPathFactory indexPathFactory =
+                table.store().pathFactory().globalIndexFileFactory();
+        Iterator<Optional<ScoredGlobalIndexResult>> resultIterators =
+                randomlyExecuteSequentialReturn(
+                        split -> singletonList(evalSplit(split, indexPathFactory, preFilter)),
+                        splits,
+                        threadNum);
+
+        ScoredGlobalIndexResult result = ScoredGlobalIndexResult.createEmpty();
+        while (resultIterators.hasNext()) {
+            Optional<ScoredGlobalIndexResult> next = resultIterators.next();
+            if (next.isPresent()) {
+                result = result.or(next.get());
+            }
+        }
+
+        return result.topK(limit);
+    }
+
+    /**
+     * Evaluates {@code filter} against the scalar index files attached to the splits.
+     *
+     * <p>Files shared by several splits are de-duplicated by file name. Returns empty when no
+     * scalar index file is attached or when the scanner yields no result.
+     *
+     * <p>NOTE(review): the bitmap presumably only contains rows covered by the scalar index
+     * files. If so, rows of a vector range that no scalar index covers are excluded from the
+     * search; confirm this is intended.
+     */
+    private Optional<RoaringNavigableMap64> preFilter(List<VectorSearchSplit> splits) {
+        Set<IndexFileMeta> scalarIndexFiles =
+                new TreeSet<>(Comparator.comparing(IndexFileMeta::fileName));
+        for (VectorSearchSplit split : splits) {
+            scalarIndexFiles.addAll(split.scalarIndexFiles());
+        }
+        if (scalarIndexFiles.isEmpty()) {
+            return Optional.empty();
+        }
+
+        try (GlobalIndexScanner scanner = GlobalIndexScanner.create(table, scalarIndexFiles)) {
+            return scanner.scan(filter).map(GlobalIndexResult::results);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Searches one split with an indexer built for the index type of its own vector index files.
+     *
+     * <p>The indexer is resolved per split instead of once from the first split. A plan whose
+     * ranges use different vector index types therefore reads each range with its own indexer.
+     * Splits without vector index files contribute nothing.
+     */
+    private Optional<ScoredGlobalIndexResult> evalSplit(
+            VectorSearchSplit split,
+            IndexPathFactory indexPathFactory,
+            @Nullable RoaringNavigableMap64 includeRowIds) {
+        List<IndexFileMeta> vectorIndexFiles = split.vectorIndexFiles();
+        if (vectorIndexFiles.isEmpty()) {
+            return Optional.empty();
+        }
+        // NOTE(review): assumes all vector index files of one row range share one index type.
+        String indexType = vectorIndexFiles.get(0).indexType();
+        GlobalIndexer globalIndexer =
+                GlobalIndexerFactoryUtils.load(indexType)
+                        .create(vectorColumn, table.coreOptions().toConfiguration());
+        return eval(
+                globalIndexer,
+                indexPathFactory,
+                split.rowRangeStart(),
+                split.rowRangeEnd(),
+                vectorIndexFiles,
+                includeRowIds);
+    }
+
+    /**
+     * Runs the vector search on the given index files of one row range.
+     *
+     * <p>The reader is wrapped in an {@link OffsetGlobalIndexReader} bounded by {@code
+     * rowRangeStart}/{@code rowRangeEnd}.
+     *
+     * @param includeRowIds row ids allowed by the pre-filter, or {@code null} when there is none
+     */
+    private Optional<ScoredGlobalIndexResult> eval(
+            GlobalIndexer globalIndexer,
+            IndexPathFactory indexPathFactory,
+            long rowRangeStart,
+            long rowRangeEnd,
+            List<IndexFileMeta> vectorIndexFiles,
+            @Nullable RoaringNavigableMap64 includeRowIds) {
+        List<GlobalIndexIOMeta> indexIOMetaList = new ArrayList<>();
+        for (IndexFileMeta indexFile : vectorIndexFiles) {
+            GlobalIndexMeta meta = checkNotNull(indexFile.globalIndexMeta());
+            indexIOMetaList.add(
+                    new GlobalIndexIOMeta(
+                            indexPathFactory.toPath(indexFile),
+                            indexFile.fileSize(),
+                            meta.indexMeta()));
+        }
+        @SuppressWarnings("resource")
+        FileIO fileIO = table.fileIO();
+        GlobalIndexFileReader indexFileReader = m -> fileIO.newInputStream(m.filePath());
+        try (GlobalIndexReader reader =
+                globalIndexer.createReader(indexFileReader, indexIOMetaList)) {
+            VectorSearch vectorSearch =
+                    new VectorSearch(vector, limit, vectorColumn.name())
+                            .withIncludeRowIds(includeRowIds);
+            return new OffsetGlobalIndexReader(reader, rowRangeStart, rowRangeEnd)
+                    .visitVectorSearch(vectorSearch);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/VectorScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/VectorScan.java
index 57aaf9fd4a..80f667cc0b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/VectorScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/VectorScan.java
@@ -18,12 +18,6 @@
package org.apache.paimon.table.source;
-import org.apache.paimon.index.IndexFileMeta;
-import org.apache.paimon.index.IndexFileMetaSerializer;
-import org.apache.paimon.utils.RoaringNavigableMap64;
-
-import javax.annotation.Nullable;
-
import java.util.List;
/** Vector scan to pre-filter and scan index files. */
@@ -33,15 +27,6 @@ public interface VectorScan {
/** Plan of vector scan. */
interface Plan {
-
- /**
- * Index files for vector index. {@link IndexFileMeta} can be
serialized by {@link
- * IndexFileMetaSerializer}.
- */
- List<IndexFileMeta> indexFiles();
-
- /** Include row ids generated by pre-filters. */
- @Nullable
- RoaringNavigableMap64 includeRowIds();
+ List<VectorSearchSplit> splits();
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/VectorScanImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/VectorScanImpl.java
new file mode 100644
index 0000000000..d3db6dd13d
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/VectorScanImpl.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.index.GlobalIndexMeta;
+import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.snapshot.TimeTravelUtil;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.utils.Filter;
+import org.apache.paimon.utils.Range;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.predicate.PredicateVisitor.collectFieldNames;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+
+/**
+ * Implementation for {@link VectorScan}.
+ *
+ * <p>Collects the global index files of the vector column and of the table fields referenced
+ * by {@code filter}. The vector index files are grouped by row range. Each group gets the
+ * scalar index files whose row range intersects it. The resulting splits are ordered by row
+ * range.
+ */
+public class VectorScanImpl implements VectorScan {
+
+    private final FileStoreTable table;
+    private final PartitionPredicate partitionFilter;
+    private final Predicate filter;
+    private final DataField vectorColumn;
+
+    public VectorScanImpl(
+            FileStoreTable table,
+            PartitionPredicate partitionFilter,
+            Predicate filter,
+            DataField vectorColumn) {
+        this.table = table;
+        this.partitionFilter = partitionFilter;
+        this.filter = filter;
+        this.vectorColumn = vectorColumn;
+    }
+
+    /**
+     * Plans the vector search.
+     *
+     * @return a plan with one split per distinct vector index row range, sorted by row range
+     * @throws NullPointerException if no vector column was set
+     */
+    @Override
+    public Plan scan() {
+        Objects.requireNonNull(vectorColumn, "Vector column must be set");
+        int vectorFieldId = vectorColumn.id();
+
+        // Ids of the filter's fields that exist in the table; their index files can pre-filter.
+        Set<Integer> filterFieldIds =
+                collectFieldNames(filter).stream()
+                        .filter(name -> table.rowType().containsField(name))
+                        .map(name -> table.rowType().getField(name).id())
+                        .collect(Collectors.toSet());
+        Snapshot snapshot = TimeTravelUtil.tryTravelOrLatest(table);
+        IndexFileHandler indexFileHandler = table.store().newIndexFileHandler();
+        Filter<IndexManifestEntry> indexFileFilter =
+                entry -> {
+                    if (partitionFilter != null && !partitionFilter.test(entry.partition())) {
+                        return false;
+                    }
+                    GlobalIndexMeta globalIndex = entry.indexFile().globalIndexMeta();
+                    if (globalIndex == null) {
+                        return false;
+                    }
+                    int fieldId = globalIndex.indexFieldId();
+                    return fieldId == vectorFieldId || filterFieldIds.contains(fieldId);
+                };
+
+        // Single pass: group vector index files by (rowRangeStart, rowRangeEnd) and collect the
+        // remaining (scalar) index files in manifest order.
+        Map<Range, List<IndexFileMeta>> vectorByRange = new HashMap<>();
+        List<IndexFileMeta> scalarIndexFiles = new ArrayList<>();
+        for (IndexManifestEntry manifestEntry : indexFileHandler.scan(snapshot, indexFileFilter)) {
+            IndexFileMeta indexFile = manifestEntry.indexFile();
+            GlobalIndexMeta meta = checkNotNull(indexFile.globalIndexMeta());
+            if (meta.indexFieldId() == vectorFieldId) {
+                Range range = new Range(meta.rowRangeStart(), meta.rowRangeEnd());
+                vectorByRange.computeIfAbsent(range, k -> new ArrayList<>()).add(indexFile);
+            } else {
+                scalarIndexFiles.add(indexFile);
+            }
+        }
+
+        // Build splits: for each vector range, attach the scalar index files intersecting it.
+        List<VectorSearchSplit> splits = new ArrayList<>(vectorByRange.size());
+        for (Map.Entry<Range, List<IndexFileMeta>> entry : vectorByRange.entrySet()) {
+            Range range = entry.getKey();
+            List<IndexFileMeta> scalarFiles =
+                    scalarIndexFiles.stream()
+                            .filter(
+                                    f ->
+                                            range.hasIntersection(
+                                                    checkNotNull(f.globalIndexMeta())
+                                                            .rowRange()))
+                            .collect(Collectors.toList());
+            splits.add(new VectorSearchSplit(range.from, range.to, entry.getValue(), scalarFiles));
+        }
+        // HashMap iteration order is unspecified; sort so the plan is deterministic.
+        splits.sort(
+                Comparator.comparingLong(VectorSearchSplit::rowRangeStart)
+                        .thenComparingLong(VectorSearchSplit::rowRangeEnd));
+
+        return () -> splits;
+    }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/VectorSearchBuilderImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/VectorSearchBuilderImpl.java
index 06e6ee775b..beb7844e13 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/VectorSearchBuilderImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/VectorSearchBuilderImpl.java
@@ -18,46 +18,14 @@
package org.apache.paimon.table.source;
-import org.apache.paimon.Snapshot;
-import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.globalindex.GlobalIndexIOMeta;
-import org.apache.paimon.globalindex.GlobalIndexReader;
-import org.apache.paimon.globalindex.GlobalIndexResult;
-import org.apache.paimon.globalindex.GlobalIndexer;
-import org.apache.paimon.globalindex.GlobalIndexerFactory;
-import org.apache.paimon.globalindex.GlobalIndexerFactoryUtils;
-import org.apache.paimon.globalindex.OffsetGlobalIndexReader;
-import org.apache.paimon.globalindex.ScoredGlobalIndexResult;
-import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
-import org.apache.paimon.index.GlobalIndexMeta;
-import org.apache.paimon.index.IndexFileHandler;
-import org.apache.paimon.index.IndexFileMeta;
-import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
-import org.apache.paimon.predicate.VectorSearch;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.InnerTable;
-import org.apache.paimon.table.source.snapshot.TimeTravelUtil;
import org.apache.paimon.types.DataField;
-import org.apache.paimon.utils.Filter;
-import org.apache.paimon.utils.Preconditions;
-import org.apache.paimon.utils.RoaringNavigableMap64;
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-import static java.util.Collections.singletonList;
import static
org.apache.paimon.partition.PartitionPredicate.splitPartitionPredicate;
-import static
org.apache.paimon.utils.ManifestReadThreadPool.randomlyExecuteSequentialReturn;
/** Implementation for {@link VectorSearchBuilder}. */
public class VectorSearchBuilderImpl implements VectorSearchBuilder {
@@ -114,113 +82,11 @@ public class VectorSearchBuilderImpl implements
VectorSearchBuilder {
@Override
public VectorScan newVectorScan() {
- return new VectorScanImpl();
+ return new VectorScanImpl(table, partitionFilter, filter,
vectorColumn);
}
@Override
public VectorRead newVectorRead() {
- return new VectorReadImpl();
- }
-
- private class VectorScanImpl implements VectorScan {
-
- @Override
- public Plan scan() {
- Objects.requireNonNull(vector, "Vector must be set");
- Objects.requireNonNull(vectorColumn, "Vector column must be set");
- if (limit <= 0) {
- throw new IllegalArgumentException("Limit must be positive");
- }
-
- Snapshot snapshot = TimeTravelUtil.tryTravelOrLatest(table);
- IndexFileHandler indexFileHandler =
table.store().newIndexFileHandler();
- Filter<IndexManifestEntry> indexFileFilter =
- partitionFilter == null
- ? Filter.alwaysTrue()
- : entry -> partitionFilter.test(entry.partition());
- List<IndexManifestEntry> indexManifestEntries =
- indexFileHandler.scan(snapshot, indexFileFilter);
-
- List<IndexFileMeta> vectorIndexFiles =
- indexManifestEntries.stream()
- .map(IndexManifestEntry::indexFile)
- .filter(
- indexFile -> {
- GlobalIndexMeta indexMeta =
indexFile.globalIndexMeta();
- if (indexMeta == null) {
- return false;
- }
- return indexMeta.indexFieldId() ==
vectorColumn.id();
- })
- .collect(Collectors.toList());
-
- return new Plan() {
- @Override
- public List<IndexFileMeta> indexFiles() {
- return vectorIndexFiles;
- }
-
- @Override
- public @Nullable RoaringNavigableMap64 includeRowIds() {
- // TODO pre filter by btree index
- return null;
- }
- };
- }
- }
-
- private class VectorReadImpl implements VectorRead {
-
- @Override
- public GlobalIndexResult read(VectorScan.Plan plan) {
- List<IndexFileMeta> indexFiles = plan.indexFiles();
- if (indexFiles.isEmpty()) {
- return GlobalIndexResult.createEmpty();
- }
-
- Integer threadNum = table.coreOptions().globalIndexThreadNum();
- Iterator<Optional<ScoredGlobalIndexResult>> resultIterators =
- randomlyExecuteSequentialReturn(
- vectorIndex -> singletonList(eval(vectorIndex,
plan.includeRowIds())),
- indexFiles,
- threadNum);
-
- ScoredGlobalIndexResult result =
ScoredGlobalIndexResult.createEmpty();
- while (resultIterators.hasNext()) {
- Optional<ScoredGlobalIndexResult> next =
resultIterators.next();
- if (next.isPresent()) {
- result = result.or(next.get());
- }
- }
-
- return result.topK(limit);
- }
-
- private Optional<ScoredGlobalIndexResult> eval(
- IndexFileMeta indexFile, @Nullable RoaringNavigableMap64
includeRowIds) {
- GlobalIndexerFactory globalIndexerFactory =
- GlobalIndexerFactoryUtils.load(indexFile.indexType());
- GlobalIndexer globalIndexer =
- globalIndexerFactory.create(
- vectorColumn,
table.coreOptions().toConfiguration());
- GlobalIndexMeta meta = indexFile.globalIndexMeta();
- Preconditions.checkNotNull(meta);
- Path filePath =
table.store().pathFactory().globalIndexFileFactory().toPath(indexFile);
- GlobalIndexIOMeta globalIndexIOMeta =
- new GlobalIndexIOMeta(filePath, indexFile.fileSize(),
meta.indexMeta());
- @SuppressWarnings("resource")
- FileIO fileIO = table.fileIO();
- GlobalIndexFileReader indexFileReader = m ->
fileIO.newInputStream(m.filePath());
- try (GlobalIndexReader reader =
- globalIndexer.createReader(indexFileReader,
singletonList(globalIndexIOMeta))) {
- VectorSearch vectorSearch =
- new VectorSearch(vector, limit, vectorColumn.name())
- .withIncludeRowIds(includeRowIds);
- return new OffsetGlobalIndexReader(reader,
meta.rowRangeStart(), meta.rowRangeEnd())
- .visitVectorSearch(vectorSearch);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
+ return new VectorReadImpl(table, filter, limit, vectorColumn, vector);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/VectorSearchSplit.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/VectorSearchSplit.java
new file mode 100644
index 0000000000..032c2be301
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/VectorSearchSplit.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source;
+
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.index.IndexFileMetaSerializer;
+import org.apache.paimon.io.DataInputViewStreamWrapper;
+import org.apache.paimon.io.DataOutputViewStreamWrapper;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Split of vector search: a row range with the vector index files to search in it and the
+ * scalar index files that may be used to pre-filter it.
+ *
+ * <p>All state is transient. Java serialization writes a version tag followed by the row range
+ * and both file lists, encoded with {@link IndexFileMetaSerializer}.
+ */
+public class VectorSearchSplit implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Layout version written ahead of the payload and checked when reading it back. */
+    private static final int VERSION = 1;
+
+    private static final ThreadLocal<IndexFileMetaSerializer> INDEX_SERIALIZER =
+            ThreadLocal.withInitial(IndexFileMetaSerializer::new);
+
+    // Written and restored manually by writeObject/readObject.
+    private transient long rowRangeStart;
+    private transient long rowRangeEnd;
+    private transient List<IndexFileMeta> vectorIndexFiles;
+    private transient List<IndexFileMeta> scalarIndexFiles;
+
+    public VectorSearchSplit(
+            long rowRangeStart,
+            long rowRangeEnd,
+            List<IndexFileMeta> vectorIndexFiles,
+            List<IndexFileMeta> scalarIndexFiles) {
+        this.rowRangeStart = rowRangeStart;
+        this.rowRangeEnd = rowRangeEnd;
+        this.vectorIndexFiles = vectorIndexFiles;
+        this.scalarIndexFiles = scalarIndexFiles;
+    }
+
+    /** First row id of this split's range. */
+    public long rowRangeStart() {
+        return rowRangeStart;
+    }
+
+    /** Last row id of this split's range. */
+    public long rowRangeEnd() {
+        return rowRangeEnd;
+    }
+
+    /** Vector index files searched for this range. */
+    public List<IndexFileMeta> vectorIndexFiles() {
+        return vectorIndexFiles;
+    }
+
+    /** Scalar index files available for pre-filtering this range. */
+    public List<IndexFileMeta> scalarIndexFiles() {
+        return scalarIndexFiles;
+    }
+
+    private void writeObject(ObjectOutputStream out) throws IOException {
+        out.defaultWriteObject();
+        out.writeInt(VERSION);
+        // Payload order: start, end, vector files, scalar files. readObject mirrors it.
+        DataOutputViewStreamWrapper target = new DataOutputViewStreamWrapper(out);
+        target.writeLong(rowRangeStart);
+        target.writeLong(rowRangeEnd);
+        IndexFileMetaSerializer codec = INDEX_SERIALIZER.get();
+        codec.serializeList(vectorIndexFiles, target);
+        codec.serializeList(scalarIndexFiles, target);
+    }
+
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+        in.defaultReadObject();
+        int storedVersion = in.readInt();
+        if (storedVersion != VERSION) {
+            throw new IOException("Unsupported VectorSearchSplit version: " + storedVersion);
+        }
+        DataInputViewStreamWrapper source = new DataInputViewStreamWrapper(in);
+        rowRangeStart = source.readLong();
+        rowRangeEnd = source.readLong();
+        IndexFileMetaSerializer codec = INDEX_SERIALIZER.get();
+        vectorIndexFiles = codec.deserializeList(source);
+        scalarIndexFiles = codec.deserializeList(source);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || o.getClass() != getClass()) {
+            return false;
+        }
+        VectorSearchSplit other = (VectorSearchSplit) o;
+        boolean sameRange =
+                rowRangeStart == other.rowRangeStart && rowRangeEnd == other.rowRangeEnd;
+        return sameRange
+                && Objects.equals(vectorIndexFiles, other.vectorIndexFiles)
+                && Objects.equals(scalarIndexFiles, other.scalarIndexFiles);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(rowRangeStart, rowRangeEnd, vectorIndexFiles, scalarIndexFiles);
+    }
+
+    @Override
+    public String toString() {
+        return new StringBuilder("VectorSearchSplit{")
+                .append("rowRangeStart=")
+                .append(rowRangeStart)
+                .append(", rowRangeEnd=")
+                .append(rowRangeEnd)
+                .append(", vectorIndexFiles=")
+                .append(vectorIndexFiles)
+                .append(", scalarIndexFiles=")
+                .append(scalarIndexFiles)
+                .append('}')
+                .toString();
+    }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/BitmapGlobalIndexTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/BitmapGlobalIndexTableTest.java
index 574579965d..cd888e6982 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/BitmapGlobalIndexTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/BitmapGlobalIndexTableTest.java
@@ -25,19 +25,19 @@ import org.apache.paimon.fs.FileIO;
import org.apache.paimon.globalindex.DataEvolutionBatchScan;
import org.apache.paimon.globalindex.GlobalIndexFileReadWrite;
import org.apache.paimon.globalindex.GlobalIndexResult;
-import org.apache.paimon.globalindex.GlobalIndexScanBuilder;
+import org.apache.paimon.globalindex.GlobalIndexScanner;
import org.apache.paimon.globalindex.GlobalIndexSingletonWriter;
import org.apache.paimon.globalindex.GlobalIndexer;
import org.apache.paimon.globalindex.GlobalIndexerFactory;
import org.apache.paimon.globalindex.GlobalIndexerFactoryUtils;
import org.apache.paimon.globalindex.ResultEntry;
-import org.apache.paimon.globalindex.RowRangeGlobalIndexScanner;
import org.apache.paimon.globalindex.bitmap.BitmapGlobalIndexerFactory;
import org.apache.paimon.index.GlobalIndexMeta;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.reader.RecordReader;
@@ -56,7 +56,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -248,20 +247,9 @@ public class BitmapGlobalIndexTableTest extends
DataEvolutionTestBase {
private RoaringNavigableMap64 globalIndexScan(FileStoreTable table,
Predicate predicate)
throws Exception {
- GlobalIndexScanBuilder indexScanBuilder =
table.store().newGlobalIndexScanBuilder();
- List<Range> ranges = indexScanBuilder.shardList();
- GlobalIndexResult globalFileIndexResult =
GlobalIndexResult.createEmpty();
- for (Range range : ranges) {
- try (RowRangeGlobalIndexScanner scanner =
- indexScanBuilder.withRowRange(range).build()) {
- Optional<GlobalIndexResult> globalIndexResult =
scanner.scan(predicate);
- if (!globalIndexResult.isPresent()) {
- throw new RuntimeException("Can't find index result by
scan");
- }
- globalFileIndexResult =
globalFileIndexResult.or(globalIndexResult.get());
- }
+ try (GlobalIndexScanner scanner =
+ GlobalIndexScanner.create(table,
PartitionPredicate.ALWAYS_TRUE, predicate)) {
+ return scanner.scan(predicate).get().results();
}
-
- return globalFileIndexResult.results();
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/BtreeGlobalIndexTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/BtreeGlobalIndexTableTest.java
index f524e07e73..98f730359d 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/BtreeGlobalIndexTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/BtreeGlobalIndexTableTest.java
@@ -22,11 +22,10 @@ import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.globalindex.DataEvolutionBatchScan;
import org.apache.paimon.globalindex.GlobalIndexResult;
-import org.apache.paimon.globalindex.GlobalIndexScanBuilder;
+import org.apache.paimon.globalindex.GlobalIndexScanner;
import org.apache.paimon.globalindex.IndexedSplit;
-import org.apache.paimon.globalindex.RowRangeGlobalIndexScanner;
import org.apache.paimon.globalindex.btree.BTreeGlobalIndexBuilder;
-import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.sink.BatchTableCommit;
@@ -44,9 +43,7 @@ import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
-import java.util.Optional;
import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
@@ -172,44 +169,6 @@ public class BtreeGlobalIndexTableTest extends
DataEvolutionTestBase {
assertThat(result).containsExactly("a200", "a56789");
}
- @Test
- public void testBtreeWithNonIndexedRowRange() throws Exception {
- write(10L);
- append(0, 10);
-
- FileStoreTable table = (FileStoreTable) catalog.getTable(identifier());
- createIndex("f1", Collections.singletonList(new Range(0L, 9L)));
-
- assertThat(table.store().newGlobalIndexScanBuilder().shardList())
- .containsExactly(new Range(0L, 9L));
-
- Predicate predicate =
- new PredicateBuilder(table.rowType()).equal(1,
BinaryString.fromString("a5"));
- ReadBuilder readBuilder = table.newReadBuilder().withFilter(predicate);
-
- List<Split> splits = readBuilder.newScan().plan().splits();
- assertThat(splits).hasSize(1);
-
- IndexedSplit indexedSplit = (IndexedSplit) splits.get(0);
- assertThat(indexedSplit.rowRanges())
- .containsExactly(new Range(5L, 5L), new Range(10L, 19L));
- assertThat(
- indexedSplit.dataSplit().dataFiles().stream()
- .map(DataFileMeta::firstRowId)
- .distinct()
- .sorted()
- .collect(Collectors.toList()))
- .containsExactly(0L, 10L);
-
- List<String> result = new ArrayList<>();
- readBuilder
- .newRead()
- .createReader(splits)
- .forEachRemaining(row ->
result.add(row.getString(1).toString()));
- assertThat(result)
- .containsExactly("a5", "a0", "a1", "a2", "a3", "a4", "a5",
"a6", "a7", "a8", "a9");
- }
-
private void createIndex(String fieldName) throws Exception {
createIndex(fieldName, null);
}
@@ -266,20 +225,9 @@ public class BtreeGlobalIndexTableTest extends
DataEvolutionTestBase {
private RoaringNavigableMap64 globalIndexScan(FileStoreTable table,
Predicate predicate)
throws Exception {
- GlobalIndexScanBuilder indexScanBuilder =
table.store().newGlobalIndexScanBuilder();
- List<Range> ranges = indexScanBuilder.shardList();
- GlobalIndexResult globalFileIndexResult =
GlobalIndexResult.createEmpty();
- for (Range range : ranges) {
- try (RowRangeGlobalIndexScanner scanner =
- indexScanBuilder.withRowRange(range).build()) {
- Optional<GlobalIndexResult> globalIndexResult =
scanner.scan(predicate);
- if (!globalIndexResult.isPresent()) {
- throw new RuntimeException("Can't find index result by
scan");
- }
- globalFileIndexResult =
globalFileIndexResult.or(globalIndexResult.get());
- }
+ try (GlobalIndexScanner scanner =
+ GlobalIndexScanner.create(table,
PartitionPredicate.ALWAYS_TRUE, predicate)) {
+ return scanner.scan(predicate).get().results();
}
-
- return globalFileIndexResult.results();
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/VectorSearchBuilderTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/VectorSearchBuilderTest.java
index 30f9c4b107..79928bacca 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/VectorSearchBuilderTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/VectorSearchBuilderTest.java
@@ -25,16 +25,20 @@ import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.globalindex.GlobalIndexBuilderUtils;
+import org.apache.paimon.globalindex.GlobalIndexParallelWriter;
import org.apache.paimon.globalindex.GlobalIndexResult;
import org.apache.paimon.globalindex.GlobalIndexSingletonWriter;
import org.apache.paimon.globalindex.ResultEntry;
import org.apache.paimon.globalindex.ScoredGlobalIndexResult;
+import org.apache.paimon.globalindex.btree.BTreeGlobalIndexerFactory;
import org.apache.paimon.globalindex.testvector.TestVectorGlobalIndexerFactory;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
@@ -52,6 +56,10 @@ import org.apache.paimon.utils.Range;
import org.junit.jupiter.api.Test;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -355,6 +363,150 @@ public class VectorSearchBuilderTest extends
TableTestBase {
+ result2.results().getIntCardinality());
}
+ @Test
+ public void testScanPartialRangeIntersection() throws Exception {
+ createTableDefault();
+ FileStoreTable table = getTableDefault();
+
+ // Write 10 rows
+ float[][] allVectors = new float[10][];
+ for (int i = 0; i < 10; i++) {
+ allVectors[i] = new float[] {(float) Math.cos(i * 0.3), (float)
Math.sin(i * 0.3)};
+ }
+ writeVectors(table, allVectors);
+
+ // Build ONE vector index covering full range [0,9]
+ buildAndCommitVectorIndex(table, allVectors, new Range(0, 9));
+
+ // Build ONE btree index covering partial range [3,7]
+ buildAndCommitBTreeIndex(table, new int[] {3, 4, 5, 6, 7}, new
Range(3, 7));
+
+ // VectorScanImpl should attach scalar index because [3,7] intersects
[0,9]
+ Predicate idFilter = new
PredicateBuilder(table.rowType()).greaterOrEqual(0, 5);
+ VectorScan.Plan plan =
+ table.newVectorSearchBuilder()
+ .withVector(new float[] {1.0f, 0.0f})
+ .withLimit(5)
+ .withVectorColumn(VECTOR_FIELD_NAME)
+ .withFilter(idFilter)
+ .newVectorScan()
+ .scan();
+
+ assertThat(plan.splits()).hasSize(1);
+ VectorSearchSplit split = plan.splits().get(0);
+ assertThat(split.rowRangeStart()).isEqualTo(0);
+ assertThat(split.rowRangeEnd()).isEqualTo(9);
+ assertThat(split.vectorIndexFiles()).isNotEmpty();
+ // Scalar index [3,7] intersects vector range [0,9] → attached
+ assertThat(split.scalarIndexFiles()).isNotEmpty();
+
+ // Read with pre-filter: id >= 5, btree covers [3,7] so rows 5,6,7
from btree
+ GlobalIndexResult result =
+ table.newVectorSearchBuilder()
+ .withVector(new float[] {1.0f, 0.0f})
+ .withLimit(5)
+ .withVectorColumn(VECTOR_FIELD_NAME)
+ .withFilter(idFilter)
+ .newVectorRead()
+ .read(plan);
+
+ assertThat(result).isInstanceOf(ScoredGlobalIndexResult.class);
+ assertThat(result.results().isEmpty()).isFalse();
+ // Pre-filter restricts to rows matching id >= 5 from btree [3,7]
+ for (long rowId : result.results()) {
+ assertThat(rowId).isBetween(5L, 7L);
+ }
+ }
+
+ @Test
+ public void testPreFilterMatchesZeroRows() throws Exception {
+ createTableDefault();
+ FileStoreTable table = getTableDefault();
+
+ float[][] vectors = {
+ {1.0f, 0.0f},
+ {0.95f, 0.1f},
+ {0.0f, 1.0f},
+ {0.1f, 0.95f}
+ };
+ writeVectors(table, vectors);
+
+ Range range = new Range(0, 3);
+ buildAndCommitVectorIndex(table, vectors, range);
+ buildAndCommitBTreeIndex(table, new int[] {0, 1, 2, 3}, range);
+
+ // Filter id > 100: btree covers ids 0-3, so preFilter matches zero
rows
+ Predicate impossibleFilter = new
PredicateBuilder(table.rowType()).greaterThan(0, 100);
+ VectorSearchBuilder searchBuilder =
+ table.newVectorSearchBuilder()
+ .withVector(new float[] {1.0f, 0.0f})
+ .withLimit(4)
+ .withVectorColumn(VECTOR_FIELD_NAME)
+ .withFilter(impossibleFilter);
+
+ VectorScan.Plan plan = searchBuilder.newVectorScan().scan();
+ assertThat(plan.splits()).hasSize(1);
+ // Scalar index is attached since field matches filter
+ assertThat(plan.splits().get(0).scalarIndexFiles()).isNotEmpty();
+
+ // Read: preFilter returns empty bitmap → vector search returns no
results
+ GlobalIndexResult result = searchBuilder.newVectorRead().read(plan);
+ assertThat(result.results().isEmpty()).isTrue();
+ }
+
+ @Test
+ public void testVectorSearchSplitSerialization() throws Exception {
+ createTableDefault();
+ FileStoreTable table = getTableDefault();
+
+ float[][] vectors = {{1.0f, 0.0f}, {0.0f, 1.0f}};
+ writeVectors(table, vectors);
+
+ Range range = new Range(0, 1);
+ buildAndCommitVectorIndex(table, vectors, range);
+ buildAndCommitBTreeIndex(table, new int[] {0, 1}, range);
+
+ Predicate filter = new
PredicateBuilder(table.rowType()).greaterOrEqual(0, 0);
+ VectorScan.Plan plan =
+ table.newVectorSearchBuilder()
+ .withVector(new float[] {1.0f, 0.0f})
+ .withLimit(2)
+ .withVectorColumn(VECTOR_FIELD_NAME)
+ .withFilter(filter)
+ .newVectorScan()
+ .scan();
+
+ assertThat(plan.splits()).hasSize(1);
+ VectorSearchSplit original = plan.splits().get(0);
+
+ // Serialize
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
+ out.writeObject(original);
+ }
+
+ // Deserialize
+ VectorSearchSplit deserialized;
+ try (ObjectInputStream in =
+ new ObjectInputStream(new
ByteArrayInputStream(bos.toByteArray()))) {
+ deserialized = (VectorSearchSplit) in.readObject();
+ }
+
+ // Verify all fields match
+
assertThat(deserialized.rowRangeStart()).isEqualTo(original.rowRangeStart());
+
assertThat(deserialized.rowRangeEnd()).isEqualTo(original.rowRangeEnd());
+
assertThat(deserialized.vectorIndexFiles()).hasSize(original.vectorIndexFiles().size());
+
assertThat(deserialized.scalarIndexFiles()).hasSize(original.scalarIndexFiles().size());
+ for (int i = 0; i < original.vectorIndexFiles().size(); i++) {
+ assertThat(deserialized.vectorIndexFiles().get(i).fileName())
+ .isEqualTo(original.vectorIndexFiles().get(i).fileName());
+ }
+ for (int i = 0; i < original.scalarIndexFiles().size(); i++) {
+ assertThat(deserialized.scalarIndexFiles().get(i).fileName())
+ .isEqualTo(original.scalarIndexFiles().get(i).fileName());
+ }
+ }
+
// ====================== Helper methods ======================
private void writeVectors(FileStoreTable table, float[][] vectors) throws
Exception {
@@ -490,6 +642,169 @@ public class VectorSearchBuilderTest extends
TableTestBase {
}
}
+ @Test
+ public void testVectorSearchWithBTreePreFilter() throws Exception {
+ createTableDefault();
+ FileStoreTable table = getTableDefault();
+
+ // Write 10 rows: ids 0-9 with vectors
+ // Rows 0-4: vectors near (1,0)
+ // Rows 5-9: vectors near (0,1)
+ float[][] allVectors = {
+ {1.0f, 0.0f}, // row 0
+ {0.95f, 0.1f}, // row 1
+ {0.98f, 0.05f}, // row 2
+ {0.9f, 0.15f}, // row 3
+ {0.85f, 0.2f}, // row 4
+ {0.0f, 1.0f}, // row 5
+ {0.1f, 0.95f}, // row 6
+ {0.05f, 0.98f}, // row 7
+ {0.15f, 0.9f}, // row 8
+ {0.2f, 0.85f} // row 9
+ };
+ writeVectors(table, allVectors);
+
+ Range range1 = new Range(0, 4);
+ Range range2 = new Range(5, 9);
+
+ // Build two vector index files for each range
+ buildAndCommitVectorIndex(
+ table,
+ new float[][] {
+ allVectors[0], allVectors[1], allVectors[2],
allVectors[3], allVectors[4]
+ },
+ range1);
+ buildAndCommitVectorIndex(
+ table,
+ new float[][] {
+ allVectors[5], allVectors[6], allVectors[7],
allVectors[8], allVectors[9]
+ },
+ range2);
+
+ // Build two btree indexes on 'id' field for each range
+ buildAndCommitBTreeIndex(table, new int[] {0, 1, 2, 3, 4}, range1);
+ buildAndCommitBTreeIndex(table, new int[] {5, 6, 7, 8, 9}, range2);
+
+ // --- Test VectorScanImpl: verify splits contain scalar index files
---
+ Predicate idFilter = new
PredicateBuilder(table.rowType()).greaterOrEqual(0, 5);
+ VectorSearchBuilder searchBuilder =
+ table.newVectorSearchBuilder()
+ .withVector(new float[] {0.1f, 0.9f})
+ .withLimit(5)
+ .withVectorColumn(VECTOR_FIELD_NAME)
+ .withFilter(idFilter);
+
+ VectorScan.Plan plan = searchBuilder.newVectorScan().scan();
+ assertThat(plan.splits()).isNotEmpty();
+ // Every split should have vector index files
+ for (VectorSearchSplit split : plan.splits()) {
+ assertThat(split.vectorIndexFiles()).isNotEmpty();
+ }
+ // At least one split should have scalar (btree) index files
+ long scalarCount =
+ plan.splits().stream().filter(s ->
!s.scalarIndexFiles().isEmpty()).count();
+ assertThat(scalarCount).isGreaterThan(0);
+
+ // --- Test VectorReadImpl: pre-filter should narrow results ---
+ // Query vector near (0,1) with filter id >= 5
+ // Without filter: rows 5,6,7,8,9 are closest
+ // With filter id >= 5: btree pre-filter restricts to rows 5-9
+ GlobalIndexResult resultWithFilter =
searchBuilder.newVectorRead().read(plan);
+
assertThat(resultWithFilter).isInstanceOf(ScoredGlobalIndexResult.class);
+ assertThat(resultWithFilter.results().isEmpty()).isFalse();
+ for (long rowId : resultWithFilter.results()) {
+ assertThat(rowId).isBetween(5L, 9L);
+ }
+
+ // Compare with no-filter search (should include results from both
ranges)
+ GlobalIndexResult resultNoFilter =
+ table.newVectorSearchBuilder()
+ .withVector(new float[] {0.1f, 0.9f})
+ .withLimit(10)
+ .withVectorColumn(VECTOR_FIELD_NAME)
+ .executeLocal();
+ assertThat(resultNoFilter.results().getIntCardinality())
+ .isGreaterThan(resultWithFilter.results().getIntCardinality());
+ }
+
+ private void buildAndCommitVectorIndex(FileStoreTable table, float[][]
vectors, Range rowRange)
+ throws Exception {
+ Options options = table.coreOptions().toConfiguration();
+ DataField vectorField = table.rowType().getField(VECTOR_FIELD_NAME);
+
+ GlobalIndexSingletonWriter writer =
+ (GlobalIndexSingletonWriter)
+ GlobalIndexBuilderUtils.createIndexWriter(
+ table,
+ TestVectorGlobalIndexerFactory.IDENTIFIER,
+ vectorField,
+ options);
+ for (float[] vec : vectors) {
+ writer.write(vec);
+ }
+ List<ResultEntry> entries = writer.finish();
+
+ List<IndexFileMeta> indexFiles =
+ GlobalIndexBuilderUtils.toIndexFileMetas(
+ table.fileIO(),
+ table.store().pathFactory().globalIndexFileFactory(),
+ table.coreOptions(),
+ rowRange,
+ vectorField.id(),
+ TestVectorGlobalIndexerFactory.IDENTIFIER,
+ entries);
+
+ DataIncrement dataIncrement = DataIncrement.indexIncrement(indexFiles);
+ CommitMessage message =
+ new CommitMessageImpl(
+ BinaryRow.EMPTY_ROW,
+ 0,
+ null,
+ dataIncrement,
+ CompactIncrement.emptyIncrement());
+ try (BatchTableCommit commit =
table.newBatchWriteBuilder().newCommit()) {
+ commit.commit(Collections.singletonList(message));
+ }
+ }
+
+ private void buildAndCommitBTreeIndex(FileStoreTable table, int[] ids,
Range rowRange)
+ throws Exception {
+ Options options = table.coreOptions().toConfiguration();
+ DataField idField = table.rowType().getField("id");
+
+ GlobalIndexParallelWriter writer =
+ (GlobalIndexParallelWriter)
+ GlobalIndexBuilderUtils.createIndexWriter(
+ table, BTreeGlobalIndexerFactory.IDENTIFIER,
idField, options);
+ for (int id : ids) {
+ long relativeRowId = id - rowRange.from;
+ writer.write(id, relativeRowId);
+ }
+ List<ResultEntry> entries = writer.finish();
+
+ List<IndexFileMeta> indexFiles =
+ GlobalIndexBuilderUtils.toIndexFileMetas(
+ table.fileIO(),
+ table.store().pathFactory().globalIndexFileFactory(),
+ table.coreOptions(),
+ rowRange,
+ idField.id(),
+ BTreeGlobalIndexerFactory.IDENTIFIER,
+ entries);
+
+ DataIncrement dataIncrement = DataIncrement.indexIncrement(indexFiles);
+ CommitMessage message =
+ new CommitMessageImpl(
+ BinaryRow.EMPTY_ROW,
+ 0,
+ null,
+ dataIncrement,
+ CompactIncrement.emptyIncrement());
+ try (BatchTableCommit commit =
table.newBatchWriteBuilder().newCommit()) {
+ commit.commit(Collections.singletonList(message));
+ }
+ }
+
private void buildAndCommitPartitionedIndex(
FileStoreTable table, float[][] vectors, BinaryRow partition,
Range rowRange)
throws Exception {
diff --git a/paimon-python/pypaimon/globalindex/__init__.py
b/paimon-python/pypaimon/globalindex/__init__.py
index c3311655c8..e96d2d6a80 100644
--- a/paimon-python/pypaimon/globalindex/__init__.py
+++ b/paimon-python/pypaimon/globalindex/__init__.py
@@ -26,10 +26,7 @@ from pypaimon.globalindex.vector_search_result import (
)
from pypaimon.globalindex.global_index_meta import GlobalIndexMeta,
GlobalIndexIOMeta
from pypaimon.globalindex.global_index_evaluator import GlobalIndexEvaluator
-from pypaimon.globalindex.global_index_scan_builder import (
- GlobalIndexScanBuilder,
- RowRangeGlobalIndexScanner,
-)
+from pypaimon.globalindex.global_index_scanner import GlobalIndexScanner
from pypaimon.utils.range import Range
__all__ = [
@@ -43,7 +40,6 @@ __all__ = [
'GlobalIndexMeta',
'GlobalIndexIOMeta',
'GlobalIndexEvaluator',
- 'GlobalIndexScanBuilder',
- 'RowRangeGlobalIndexScanner',
+ 'GlobalIndexScanner',
'Range',
]
diff --git a/paimon-python/pypaimon/globalindex/global_index_evaluator.py
b/paimon-python/pypaimon/globalindex/global_index_evaluator.py
index 1692113d64..bbab92e883 100644
--- a/paimon-python/pypaimon/globalindex/global_index_evaluator.py
+++ b/paimon-python/pypaimon/globalindex/global_index_evaluator.py
@@ -23,7 +23,6 @@ from typing import Callable, Collection, Dict, List, Optional
from pypaimon.globalindex.global_index_reader import GlobalIndexReader,
FieldRef
from pypaimon.globalindex.global_index_result import GlobalIndexResult
from pypaimon.common.predicate import Predicate
-from pypaimon.globalindex.vector_search import VectorSearch
from pypaimon.schema.data_types import DataField
@@ -44,8 +43,7 @@ class GlobalIndexEvaluator:
def evaluate(
self,
- predicate: Optional[Predicate],
- vector_search: Optional[VectorSearch]
+ predicate: Optional[Predicate]
) -> Optional[GlobalIndexResult]:
compound_result: Optional[GlobalIndexResult] = None
@@ -53,36 +51,6 @@ class GlobalIndexEvaluator:
if predicate is not None:
compound_result = self._visit_predicate(predicate)
- # Evaluate vector search
- if vector_search is not None:
- field = self._field_by_name.get(vector_search.field_name)
- if field is None:
- raise ValueError(f"Field not found:
{vector_search.field_name}")
-
- field_id = field.id
- readers = self._index_readers_cache.get(field_id)
- if readers is None:
- readers = self._readers_function(field)
- self._index_readers_cache[field_id] = readers
-
- # If we have a compound result from predicates, use it to filter
vector search
- if compound_result is not None:
- vector_search =
vector_search.with_include_row_ids(compound_result.results())
-
- for reader in readers:
- child_result = vector_search.visit(reader)
- if child_result is None:
- continue
-
- # AND operation
- if compound_result is not None:
- compound_result = compound_result.and_(child_result)
- else:
- compound_result = child_result
-
- if compound_result.is_empty():
- return compound_result
-
return compound_result
def _visit_predicate(self, predicate: Predicate) ->
Optional[GlobalIndexResult]:
@@ -141,7 +109,7 @@ class GlobalIndexEvaluator:
continue
if compound_result is not None:
- compound_result = compound_result.and_(child_result)
+ compound_result = compound_result.or_(child_result)
else:
compound_result = child_result
diff --git a/paimon-python/pypaimon/globalindex/global_index_reader.py
b/paimon-python/pypaimon/globalindex/global_index_reader.py
index ce70b5d6c1..cb9760ed2d 100644
--- a/paimon-python/pypaimon/globalindex/global_index_reader.py
+++ b/paimon-python/pypaimon/globalindex/global_index_reader.py
@@ -19,11 +19,7 @@
"""Global index reader interface."""
from abc import ABC, abstractmethod
-from typing import List, Optional, TYPE_CHECKING
-
-if TYPE_CHECKING:
- from pypaimon.globalindex.global_index_result import GlobalIndexResult
- from pypaimon.globalindex.vector_search import VectorSearch
+from typing import List, Optional
class FieldRef:
diff --git a/paimon-python/pypaimon/globalindex/global_index_scan_builder.py
b/paimon-python/pypaimon/globalindex/global_index_scan_builder.py
deleted file mode 100644
index 543e4559fa..0000000000
--- a/paimon-python/pypaimon/globalindex/global_index_scan_builder.py
+++ /dev/null
@@ -1,202 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-"""Builder for scanning global indexes."""
-
-from abc import ABC, abstractmethod
-from typing import List, Optional, Collection
-from concurrent.futures import ThreadPoolExecutor, as_completed
-
-from pypaimon.globalindex import GlobalIndexIOMeta, GlobalIndexReader,
GlobalIndexEvaluator
-from pypaimon.utils.range import Range
-from pypaimon.globalindex.global_index_result import GlobalIndexResult
-from pypaimon.schema.data_types import DataField
-
-
-class GlobalIndexScanBuilder(ABC):
- """Builder for scanning global indexes."""
-
- @abstractmethod
- def with_snapshot(self, snapshot_or_id) -> 'GlobalIndexScanBuilder':
- """Set the snapshot to scan."""
- pass
-
- @abstractmethod
- def with_partition_predicate(self, partition_predicate) ->
'GlobalIndexScanBuilder':
- """Set the partition predicate."""
- pass
-
- @abstractmethod
- def with_row_range(self, row_range: Range) -> 'GlobalIndexScanBuilder':
- """Set the row range to scan."""
- pass
-
- @abstractmethod
- def build(self) -> 'RowRangeGlobalIndexScanner':
- """Build the scanner."""
- pass
-
- @abstractmethod
- def shard_list(self) -> List[Range]:
- """Return sorted and non-overlapping ranges."""
- pass
-
- @staticmethod
- def parallel_scan(
- ranges: List[Range],
- builder: 'GlobalIndexScanBuilder',
- filter_predicate: Optional['Predicate'],
- vector_search: Optional['VectorSearch'],
- thread_num: Optional[int] = None
- ) -> Optional[GlobalIndexResult]:
-
- if not ranges:
- return None
-
- scanners = []
- try:
- # Build scanners for each range
- for row_range in ranges:
- scanner = builder.with_row_range(row_range).build()
- scanners.append((row_range, scanner))
-
- # Execute scans in parallel
- results: List[Optional[GlobalIndexResult]] = [None] * len(ranges)
-
- def scan_range(idx: int, scanner: 'RowRangeGlobalIndexScanner') ->
tuple:
- result = scanner.scan(filter_predicate, vector_search)
- return idx, result
-
- with ThreadPoolExecutor(max_workers=thread_num) as executor:
- futures = {
- executor.submit(scan_range, idx, scanner): idx
- for idx, (_, scanner) in enumerate(scanners)
- }
-
- for future in as_completed(futures):
- idx, result = future.result()
- results[idx] = result
-
- # Combine results
- if all(r is None for r in results):
- return None
-
- # Find the first non-None result to start combining
- combined: Optional[GlobalIndexResult] = None
- for i, row_range in enumerate(ranges):
- if results[i] is not None:
- if combined is None:
- combined = results[i]
- else:
- combined = combined.or_(results[i])
- else:
- # If no result for this range, include the full range
- range_result = GlobalIndexResult.from_range(row_range)
- if combined is None:
- combined = range_result
- else:
- combined = combined.or_(range_result)
-
- return combined
-
- finally:
- # Close all scanners
- for _, scanner in scanners:
- try:
- scanner.close()
- except Exception:
- pass
-
-
-class RowRangeGlobalIndexScanner:
- """Scanner for shard-based global indexes."""
-
- def __init__(
- self,
- options: dict,
- fields: list,
- file_io,
- index_path: str,
- row_range: Range,
- index_entries: list
- ):
- self._options = options
- self._row_range = row_range
- self._evaluator = self._create_evaluator(fields, file_io, index_path,
index_entries)
-
- def _create_evaluator(self, fields, file_io, index_path, index_entries):
- index_metas = {}
- for entry in index_entries:
- global_index_meta = entry.get('global_index_meta')
- if global_index_meta is None:
- continue
-
- field_id = global_index_meta.index_field_id
- index_type = entry.get('index_type', 'unknown')
-
- if field_id not in index_metas:
- index_metas[field_id] = {}
- if index_type not in index_metas[field_id]:
- index_metas[field_id][index_type] = []
-
- io_meta = GlobalIndexIOMeta(
- file_name=entry.get('file_name'),
- file_size=entry.get('file_size'),
- metadata=global_index_meta.index_meta
- )
- index_metas[field_id][index_type].append(io_meta)
-
- def readers_function(field: DataField) ->
Collection[GlobalIndexReader]:
- readers = []
- if field.id not in index_metas:
- return readers
-
- for index_type, io_metas in index_metas[field.id].items():
- if index_type == 'btree':
- from pypaimon.globalindex.btree import BTreeIndexReader
- from pypaimon.globalindex.btree.key_serializer import
create_serializer
- for metadata in io_metas:
- reader = BTreeIndexReader(
- key_serializer=create_serializer(field.type),
- file_io=file_io,
- index_path=index_path,
- io_meta=metadata
- )
- readers.append(reader)
-
- return readers
-
- return GlobalIndexEvaluator(fields, readers_function)
-
- def scan(
- self,
- predicate: Optional['Predicate'],
- vector_search: Optional['VectorSearch']
- ) -> Optional[GlobalIndexResult]:
- """Scan the global index with the given predicate and vector search."""
- return self._evaluator.evaluate(predicate, vector_search)
-
- def close(self) -> None:
- """Close the scanner and release resources."""
- self._evaluator.close()
-
- def __enter__(self) -> 'RowRangeGlobalIndexScanner':
- return self
-
- def __exit__(self, exc_type, exc_val, exc_tb) -> None:
- self.close()
diff --git
a/paimon-python/pypaimon/globalindex/global_index_scan_builder_impl.py
b/paimon-python/pypaimon/globalindex/global_index_scan_builder_impl.py
deleted file mode 100644
index 67e11d9b70..0000000000
--- a/paimon-python/pypaimon/globalindex/global_index_scan_builder_impl.py
+++ /dev/null
@@ -1,165 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-"""
-Implementation of GlobalIndexScanBuilder for FileStoreTable.
-"""
-
-from typing import List
-
-from pypaimon.utils.range import Range
-from pypaimon.globalindex.global_index_scan_builder import (
- GlobalIndexScanBuilder,
- RowRangeGlobalIndexScanner
-)
-
-
-class GlobalIndexScanBuilderImpl(GlobalIndexScanBuilder):
-
- def __init__(
- self,
- options: dict,
- row_type: list,
- file_io: 'FileIO',
- index_path_factory: 'IndexPathFactory',
- snapshot_manager: 'SnapshotManager',
- index_file_handler: 'IndexFileHandler'
- ):
- self._options = options
- self._row_type = row_type
- self._file_io = file_io
- self._index_path_factory = index_path_factory
- self._snapshot_manager = snapshot_manager
- self._index_file_handler = index_file_handler
-
- self._snapshot = None
- self._partition_predicate = None
- self._row_range = None
- self._cached_entries = None
-
- def with_snapshot(self, snapshot_or_id) -> 'GlobalIndexScanBuilderImpl':
- """Set the snapshot to scan."""
- if isinstance(snapshot_or_id, int):
- self._snapshot = self._snapshot_manager.snapshot(snapshot_or_id)
- else:
- self._snapshot = snapshot_or_id
- return self
-
- def with_partition_predicate(self, partition_predicate) ->
'GlobalIndexScanBuilderImpl':
- """Set the partition predicate."""
- self._partition_predicate = partition_predicate
- return self
-
- def with_row_range(self, row_range: Range) -> 'GlobalIndexScanBuilderImpl':
- """Set the row range to scan."""
- self._row_range = row_range
- return self
-
- def shard_list(self) -> List[Range]:
- entries = self._scan()
- if not entries:
- return []
-
- # Build ranges from index entries grouped by index type
- index_ranges = {}
- for entry in entries:
- global_index_meta = entry.index_file.global_index_meta
- if global_index_meta is None:
- continue
-
- index_type = entry.index_file.index_type
- start = global_index_meta.row_range_start
- end = global_index_meta.row_range_end
-
- if index_type not in index_ranges:
- index_ranges[index_type] = []
- index_ranges[index_type].append(Range(start, end))
-
- if not index_ranges:
- return []
-
- # Merge all ranges
- all_ranges = []
- for ranges in index_ranges.values():
- all_ranges.extend(ranges)
-
- return Range.sort_and_merge_overlap(all_ranges)
-
- def build(self) -> RowRangeGlobalIndexScanner:
- """Build the scanner for the current row range."""
- if self._row_range is None:
- raise ValueError("rowRange must not be null")
-
- entries = self._scan()
- index_path = self._index_path_factory.index_path()
-
- # Convert entries to dict format for RowRangeGlobalIndexScanner
- index_entries = []
- for entry in entries:
- index_entries.append({
- 'file_name': entry.index_file.file_name,
- 'file_size': entry.index_file.file_size,
- 'index_type': entry.index_file.index_type,
- 'global_index_meta': entry.index_file.global_index_meta
- })
-
- return RowRangeGlobalIndexScanner(
- options=self._options,
- fields=self._row_type,
- file_io=self._file_io,
- index_path=index_path,
- row_range=self._row_range,
- index_entries=index_entries
- )
-
- def _scan(self) -> List['IndexManifestEntry']:
- """
- Scan index manifest entries.
- """
- if self._cached_entries is not None:
- return self._cached_entries
-
- def entry_filter(entry: 'IndexManifestEntry') -> bool:
- # Filter by partition predicate
- if self._partition_predicate is not None:
- if not self._partition_predicate.test(entry.partition):
- return False
-
- # Filter by row range
- if self._row_range is not None:
- global_index_meta = entry.index_file.global_index_meta
- if global_index_meta is None:
- return False
-
- entry_start = global_index_meta.row_range_start
- entry_end = global_index_meta.row_range_end
-
- if not Range.intersect(
- entry_start, entry_end,
- self._row_range.from_, self._row_range.to
- ):
- return False
-
- return True
-
- snapshot = self._snapshot
- if snapshot is None:
- snapshot = self._snapshot_manager.get_latest_snapshot()
-
- self._cached_entries = self._index_file_handler.scan(snapshot,
entry_filter)
- return self._cached_entries
diff --git a/paimon-python/pypaimon/globalindex/global_index_scanner.py
b/paimon-python/pypaimon/globalindex/global_index_scanner.py
new file mode 100644
index 0000000000..c3d144811c
--- /dev/null
+++ b/paimon-python/pypaimon/globalindex/global_index_scanner.py
@@ -0,0 +1,162 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Scanner for shard-based global indexes."""
+
+from typing import Collection, Optional
+
+from pypaimon.globalindex.global_index_evaluator import GlobalIndexEvaluator
+from pypaimon.globalindex.global_index_meta import GlobalIndexIOMeta
+from pypaimon.globalindex.global_index_reader import GlobalIndexReader
+from pypaimon.globalindex.global_index_result import GlobalIndexResult
+from pypaimon.common.predicate import Predicate
+from pypaimon.read.push_down_utils import _get_all_fields
+from pypaimon.schema.data_types import DataField
+from pypaimon.utils.range import Range
+
+
+class GlobalIndexScanner:
+ """Scanner for shard-based global indexes."""
+
+ def __init__(
+ self,
+ options: dict,
+ fields: list,
+ file_io,
+ index_path: str,
+ index_files: Collection['IndexFileMeta']
+ ):
+ self._options = options
+ self._evaluator = self._create_evaluator(fields, file_io, index_path,
index_files)
+
+ def _create_evaluator(self, fields, file_io, index_path, index_files):
+ index_metas = {}
+ for index_file in index_files:
+ global_index_meta = index_file.global_index_meta
+ if global_index_meta is None:
+ continue
+
+ field_id = global_index_meta.index_field_id
+ index_type = index_file.index_type
+
+ if field_id not in index_metas:
+ index_metas[field_id] = {}
+ if index_type not in index_metas[field_id]:
+ index_metas[field_id][index_type] = {}
+
+ range_key = Range(global_index_meta.row_range_start,
global_index_meta.row_range_end)
+ if range_key not in index_metas[field_id][index_type]:
+ index_metas[field_id][index_type][range_key] = []
+
+ io_meta = GlobalIndexIOMeta(
+ file_name=index_file.file_name,
+ file_size=index_file.file_size,
+ metadata=global_index_meta.index_meta
+ )
+ index_metas[field_id][index_type][range_key].append(io_meta)
+
+ def readers_function(field: DataField) ->
Collection[GlobalIndexReader]:
+ return _create_readers(file_io, index_path,
index_metas.get(field.id), field)
+
+ return GlobalIndexEvaluator(fields, readers_function)
+
+ @staticmethod
+ def create(table, index_files=None, partition_filter=None, predicate=None):
+ """Create a GlobalIndexScanner.
+
+ Can be called in two ways:
+ 1. create(table, index_files) - with explicit index files
+ 2. create(table, partition_filter=..., predicate=...) - scan index
files from snapshot
+ """
+ from pypaimon.index.index_file_handler import IndexFileHandler
+
+ if index_files is not None:
+ return GlobalIndexScanner(
+ options=table.table_schema.options,
+ fields=table.fields,
+ file_io=table.file_io,
+
index_path=table.path_factory().global_index_path_factory().index_path(),
+ index_files=index_files
+ )
+
+ # Scan index files from snapshot using partition_filter and predicate
+ filter_field_names = _get_all_fields(predicate)
+ filter_field_ids = set()
+ if predicate is not None:
+ for field_item in table.fields:
+ if field_item.name in filter_field_names:
+ filter_field_ids.add(field_item.id)
+
+ def index_file_filter(entry):
+ if partition_filter is not None:
+ if not partition_filter.test(entry.partition):
+ return False
+ global_index_meta = entry.index_file.global_index_meta
+ if global_index_meta is None:
+ return False
+ return global_index_meta.index_field_id in filter_field_ids
+
+ from pypaimon.snapshot.snapshot_manager import SnapshotManager
+ snapshot = SnapshotManager(table).get_latest_snapshot()
+ index_file_handler = IndexFileHandler(table=table)
+ entries = index_file_handler.scan(snapshot, index_file_filter)
+ scanned_index_files = [entry.index_file for entry in entries]
+
+ return GlobalIndexScanner(
+ options=table.table_schema.options,
+ fields=table.fields,
+ file_io=table.file_io,
+
index_path=table.path_factory().global_index_path_factory().index_path(),
+ index_files=scanned_index_files
+ )
+
+ def scan(self, predicate: Optional[Predicate]) ->
Optional[GlobalIndexResult]:
+ """Scan the global index with the given predicate."""
+ return self._evaluator.evaluate(predicate)
+
+ def close(self):
+ """Close the scanner and release resources."""
+ self._evaluator.close()
+
+ def __enter__(self) -> 'GlobalIndexScanner':
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb) -> None:
+ self.close()
+
+
+def _create_readers(file_io, index_path, index_type_metas, field):
+ """Create readers for a specific field from the index metadata."""
+ if index_type_metas is None:
+ return []
+
+ readers = []
+ for index_type, range_metas in index_type_metas.items():
+ if index_type == 'btree':
+ from pypaimon.globalindex.btree import BTreeIndexReader
+ from pypaimon.globalindex.btree.key_serializer import
create_serializer
+ for range_key, io_metas in range_metas.items():
+ for metadata in io_metas:
+ reader = BTreeIndexReader(
+ key_serializer=create_serializer(field.type),
+ file_io=file_io,
+ index_path=index_path,
+ io_meta=metadata
+ )
+ readers.append(reader)
+ return readers
diff --git a/paimon-python/pypaimon/read/read_builder.py
b/paimon-python/pypaimon/read/read_builder.py
index be489c9bac..d45a526a69 100644
--- a/paimon-python/pypaimon/read/read_builder.py
+++ b/paimon-python/pypaimon/read/read_builder.py
@@ -20,7 +20,6 @@ from typing import List, Optional
from pypaimon.common.predicate import Predicate
from pypaimon.common.predicate_builder import PredicateBuilder
-from pypaimon.globalindex import VectorSearch
from pypaimon.read.table_read import TableRead
from pypaimon.read.table_scan import TableScan
from pypaimon.schema.data_types import DataField
@@ -37,7 +36,6 @@ class ReadBuilder:
self._predicate: Optional[Predicate] = None
self._projection: Optional[List[str]] = None
self._limit: Optional[int] = None
- self._vector_search: Optional['VectorSearch'] = None
def with_filter(self, predicate: Predicate) -> 'ReadBuilder':
self._predicate = predicate
@@ -51,16 +49,11 @@ class ReadBuilder:
self._limit = limit
return self
- def with_vector_search(self, vector_search: VectorSearch) -> 'ReadBuilder':
- self._vector_search = vector_search
- return self
-
def new_scan(self) -> TableScan:
return TableScan(
table=self.table,
predicate=self._predicate,
- limit=self._limit,
- vector_search=self._vector_search
+ limit=self._limit
)
def new_read(self) -> TableRead:
diff --git a/paimon-python/pypaimon/read/scanner/file_scanner.py b/paimon-python/pypaimon/read/scanner/file_scanner.py
index 1b78ddb0c8..b770b15916 100755
--- a/paimon-python/pypaimon/read/scanner/file_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/file_scanner.py
@@ -167,8 +167,7 @@ class FileScanner:
table,
manifest_scanner: Callable[[], List[ManifestFileMeta]],
predicate: Optional[Predicate] = None,
- limit: Optional[int] = None,
- vector_search: Optional['VectorSearch'] = None
+ limit: Optional[int] = None
):
from pypaimon.table.file_store_table import FileStoreTable
@@ -177,7 +176,6 @@ class FileScanner:
self.predicate = predicate
self.predicate_for_stats = remove_row_id_filter(predicate) if predicate else None
self.limit = limit
- self.vector_search = vector_search
self.snapshot_manager = SnapshotManager(table)
self.manifest_list_manager = ManifestListManager(table)
@@ -299,67 +297,27 @@ class FileScanner:
return self.read_manifest_entries(manifest_files)
def _eval_global_index(self):
- from pypaimon.globalindex.global_index_result import GlobalIndexResult
- from pypaimon.globalindex.global_index_scan_builder import \
- GlobalIndexScanBuilder
- from pypaimon.utils.range import Range
-
- # No filter and no vector search - nothing to evaluate
- if self.predicate is None and self.vector_search is None:
+ # No filter - nothing to evaluate
+ if self.predicate is None:
return None
# Check if global index is enabled
if not self.table.options.global_index_enabled():
return None
- # Get latest snapshot
- snapshot = self.snapshot_manager.get_latest_snapshot()
- if snapshot is None:
- return None
-
- # Check if table has store with global index scan builder
- index_scan_builder = self.table.new_global_index_scan_builder()
- if index_scan_builder is None:
- return None
-
- # Set partition predicate and snapshot
- index_scan_builder.with_partition_predicate(
- self.partition_key_predicate
- ).with_snapshot(snapshot)
-
- # Get indexed row ranges
- indexed_row_ranges = index_scan_builder.shard_list()
- if not indexed_row_ranges:
- return None
-
- # Get next row ID from snapshot
- next_row_id = snapshot.next_row_id
- if next_row_id is None:
- return None
-
- # Calculate non-indexed row ranges
- non_indexed_row_ranges = Range(0, next_row_id - 1).exclude(indexed_row_ranges)
-
- # Get thread number from options (can be None, meaning use default)
- thread_num = self.table.options.global_index_thread_num()
+ from pypaimon.globalindex.global_index_scanner import GlobalIndexScanner
- # Scan global index in parallel
- result = GlobalIndexScanBuilder.parallel_scan(
- indexed_row_ranges,
- index_scan_builder,
- self.predicate,
- self.vector_search,
- thread_num
- )
-
- if result is None:
+ try:
+ scanner = GlobalIndexScanner.create(
+ self.table,
+ partition_filter=self.partition_key_predicate,
+ predicate=self.predicate
+ )
+ with scanner:
+ return scanner.scan(self.predicate)
+ except Exception:
return None
- for row_range in non_indexed_row_ranges:
- result = result.or_(GlobalIndexResult.from_range(row_range))
-
- return result
-
def read_manifest_entries(self, manifest_files: List[ManifestFileMeta]) -> List[ManifestEntry]:
max_workers = self.table.options.scan_manifest_parallelism(os.cpu_count() or 8)
manifest_files = [entry for entry in manifest_files if self._filter_manifest_file(entry)]
diff --git a/paimon-python/pypaimon/read/table_scan.py b/paimon-python/pypaimon/read/table_scan.py
index 17da0692e8..416ef1717d 100755
--- a/paimon-python/pypaimon/read/table_scan.py
+++ b/paimon-python/pypaimon/read/table_scan.py
@@ -16,7 +16,7 @@
# limitations under the License.
################################################################################
-from typing import Optional, TYPE_CHECKING
+from typing import Optional
from pypaimon.common.options.core_options import CoreOptions
from pypaimon.common.predicate import Predicate
@@ -26,9 +26,6 @@ from pypaimon.read.scanner.file_scanner import FileScanner
from pypaimon.snapshot.snapshot_manager import SnapshotManager
from pypaimon.manifest.manifest_list_manager import ManifestListManager
-if TYPE_CHECKING:
- from pypaimon.globalindex.vector_search import VectorSearch
-
class TableScan:
"""Implementation of TableScan for native Python reading."""
@@ -37,15 +34,13 @@ class TableScan:
self,
table,
predicate: Optional[Predicate],
- limit: Optional[int],
- vector_search: Optional['VectorSearch'] = None
+ limit: Optional[int]
):
from pypaimon.table.file_store_table import FileStoreTable
self.table: FileStoreTable = table
self.predicate = predicate
self.limit = limit
- self.vector_search = vector_search
self.file_scanner = self._create_file_scanner()
def plan(self) -> Plan:
@@ -115,8 +110,7 @@ class TableScan:
self.table,
tag_manifest_scanner,
self.predicate,
- self.limit,
- vector_search=self.vector_search
+ self.limit
)
def all_manifests():
@@ -127,8 +121,7 @@ class TableScan:
self.table,
all_manifests,
self.predicate,
- self.limit,
- vector_search=self.vector_search
+ self.limit
)
def with_shard(self, idx_of_this_subtask, number_of_para_subtasks) -> 'TableScan':
diff --git a/paimon-python/pypaimon/table/file_store_table.py b/paimon-python/pypaimon/table/file_store_table.py
index 51a031bd94..9db0fc3a48 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -341,25 +341,6 @@ class FileStoreTable(Table):
def new_read_builder(self) -> 'ReadBuilder':
return ReadBuilder(self)
- def new_global_index_scan_builder(self) -> Optional['GlobalIndexScanBuilder']:
- if not self.options.global_index_enabled():
- return None
-
- from pypaimon.globalindex.global_index_scan_builder_impl import (
- GlobalIndexScanBuilderImpl
- )
-
- from pypaimon.index.index_file_handler import IndexFileHandler
-
- return GlobalIndexScanBuilderImpl(
- options=self.table_schema.options,
- row_type=self.fields,
- file_io=self.file_io,
- index_path_factory=self.path_factory().global_index_path_factory(),
- snapshot_manager=self.snapshot_manager(),
- index_file_handler=IndexFileHandler(table=self)
- )
-
def new_stream_read_builder(self) -> 'StreamReadBuilder':
return StreamReadBuilder(self)
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala
index 029d6cd378..b45475ff0c 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala
@@ -64,7 +64,6 @@ class CreateGlobalIndexProcedureTest extends PaimonSparkTestBase with StreamTest
.scanEntries()
.asScala
.filter(_.indexFile().indexType() == "bitmap")
- table.store().newGlobalIndexScanBuilder().shardList()
assert(bitmapEntries.nonEmpty)
val totalRowCount = bitmapEntries.map(_.indexFile().rowCount()).sum
assert(totalRowCount == 100000L)
@@ -359,7 +358,6 @@ class CreateGlobalIndexProcedureTest extends PaimonSparkTestBase with StreamTest
.scanEntries()
.asScala
.filter(_.indexFile().indexType() == "btree")
- table.store().newGlobalIndexScanBuilder().shardList()
assert(btreeEntries.nonEmpty)
// 1. assert total row count