This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 0933636fd2 [core] Introduce Table.newVectorSearchBuilder to vector
search (#7509)
0933636fd2 is described below
commit 0933636fd25ef57922f30e49689c4da48987eeab
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Mar 24 10:34:49 2026 +0800
[core] Introduce Table.newVectorSearchBuilder to vector search (#7509)
The core design idea of this PR is to decouple vector search from
ReadBuilder and expose it through an independent VectorSearchBuilder API.
This way, the engine can split different index files into separate
concurrent tasks for execution.
---
.../paimon/globalindex/GlobalIndexEvaluator.java | 36 +-
.../paimon/globalindex/GlobalIndexReader.java | 2 +-
.../globalindex/OffsetGlobalIndexReader.java | 6 +-
.../globalindex/ScoredGlobalIndexResult.java | 43 +-
.../paimon/globalindex/UnionGlobalIndexReader.java | 15 +-
.../org/apache/paimon/predicate/VectorSearch.java | 12 +-
.../testvector/TestVectorGlobalIndexReader.java | 295 ++++++++++++
.../testvector/TestVectorGlobalIndexWriter.java | 133 ++++++
.../testvector/TestVectorGlobalIndexer.java | 90 ++++
.../testvector/TestVectorGlobalIndexerFactory.java | 43 ++
....apache.paimon.globalindex.GlobalIndexerFactory | 16 +
.../paimon/globalindex/DataEvolutionBatchScan.java | 20 +-
.../paimon/globalindex/GlobalIndexScanBuilder.java | 7 +-
.../globalindex/GlobalIndexScanBuilderImpl.java | 2 -
.../globalindex/RowRangeGlobalIndexScanner.java | 8 +-
.../paimon/partition/PartitionPredicate.java | 6 +
.../java/org/apache/paimon/table/FormatTable.java | 6 +
.../java/org/apache/paimon/table/InnerTable.java | 7 +
.../main/java/org/apache/paimon/table/Table.java | 4 +
.../paimon/table/format/FormatReadBuilder.java | 6 -
.../apache/paimon/table/source/InnerTableScan.java | 11 +-
.../apache/paimon/table/source/ReadBuilder.java | 8 -
.../paimon/table/source/ReadBuilderImpl.java | 11 +-
.../org/apache/paimon/table/source/TableScan.java | 7 +
.../org/apache/paimon/table/source/VectorRead.java | 27 ++
.../org/apache/paimon/table/source/VectorScan.java | 47 ++
.../paimon/table/source/VectorSearchBuilder.java | 55 +++
.../table/source/VectorSearchBuilderImpl.java | 226 +++++++++
.../paimon/table/BitmapGlobalIndexTableTest.java | 8 +-
.../paimon/table/BtreeGlobalIndexTableTest.java | 2 +-
.../table/source/VectorSearchBuilderTest.java | 529 +++++++++++++++++++++
.../index/LuminaVectorGlobalIndexReader.java | 7 +-
.../index/LuminaVectorGlobalIndexScanTest.java | 27 +-
.../org/apache/paimon/spark/PaimonBaseScan.scala | 24 +
.../org/apache/paimon/spark/read/BaseScan.scala | 1 -
.../spark/sql/BaseVectorSearchPushDownTest.scala | 100 ----
36 files changed, 1627 insertions(+), 220 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java
b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java
index 0c66602730..720cf63fe4 100644
---
a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java
+++
b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java
@@ -24,7 +24,6 @@ import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.Or;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateVisitor;
-import org.apache.paimon.predicate.VectorSearch;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.IOUtils;
@@ -52,39 +51,8 @@ public class GlobalIndexEvaluator
this.readersFunction = readersFunction;
}
- public Optional<GlobalIndexResult> evaluate(
- @Nullable Predicate predicate, @Nullable VectorSearch
vectorSearch) {
- Optional<GlobalIndexResult> compoundResult = Optional.empty();
- if (predicate != null) {
- compoundResult = predicate.visit(this);
- }
- if (vectorSearch != null) {
- int fieldId = rowType.getField(vectorSearch.fieldName()).id();
- Collection<GlobalIndexReader> readers =
- indexReadersCache.computeIfAbsent(fieldId,
readersFunction::apply);
- if (compoundResult.isPresent()) {
- vectorSearch =
vectorSearch.withIncludeRowIds(compoundResult.get().results());
- }
- for (GlobalIndexReader fileIndexReader : readers) {
- Optional<GlobalIndexResult> childResult =
vectorSearch.visit(fileIndexReader);
- if (!childResult.isPresent()) {
- continue;
- }
- GlobalIndexResult result = childResult.get();
- // AND Operation
- if (compoundResult.isPresent()) {
- GlobalIndexResult r1 = compoundResult.get();
- compoundResult = Optional.of(r1.and(result));
- } else {
- compoundResult = Optional.of(result);
- }
-
- if (compoundResult.get().results().isEmpty()) {
- return compoundResult;
- }
- }
- }
- return compoundResult;
+ public Optional<GlobalIndexResult> evaluate(@Nullable Predicate predicate)
{ // a null predicate yields Optional.empty(); otherwise the predicate is visited by this evaluator
+ return predicate == null ? Optional.empty() : predicate.visit(this);
}
@Override
diff --git
a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexReader.java
b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexReader.java
index 4cc4cfbbbb..1b8c17bf66 100644
---
a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexReader.java
+++
b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexReader.java
@@ -44,7 +44,7 @@ public interface GlobalIndexReader extends
FunctionVisitor<Optional<GlobalIndexR
throw new UnsupportedOperationException();
}
- default Optional<GlobalIndexResult> visitVectorSearch(VectorSearch
vectorSearch) {
+ default Optional<ScoredGlobalIndexResult> visitVectorSearch(VectorSearch
vectorSearch) {
throw new UnsupportedOperationException();
}
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/globalindex/OffsetGlobalIndexReader.java
b/paimon-common/src/main/java/org/apache/paimon/globalindex/OffsetGlobalIndexReader.java
index f1c3d154ed..21f06f59f3 100644
---
a/paimon-common/src/main/java/org/apache/paimon/globalindex/OffsetGlobalIndexReader.java
+++
b/paimon-common/src/main/java/org/apache/paimon/globalindex/OffsetGlobalIndexReader.java
@@ -117,9 +117,9 @@ public class OffsetGlobalIndexReader implements
GlobalIndexReader {
}
@Override
- public Optional<GlobalIndexResult> visitVectorSearch(VectorSearch
vectorSearch) {
- return applyOffset(
-
wrapped.visitVectorSearch(vectorSearch.offsetRange(this.offset, this.to)));
+ public Optional<ScoredGlobalIndexResult> visitVectorSearch(VectorSearch
vectorSearch) {
+ return wrapped.visitVectorSearch(vectorSearch.offsetRange(this.offset,
this.to)) // the search handed to the wrapped reader is first passed through offsetRange(offset, to)
+ .map(r -> r.offset(offset)); // a present result gets offset(offset) applied; an empty Optional stays empty
}
private Optional<GlobalIndexResult>
applyOffset(Optional<GlobalIndexResult> result) {
diff --git
a/paimon-common/src/main/java/org/apache/paimon/globalindex/ScoredGlobalIndexResult.java
b/paimon-common/src/main/java/org/apache/paimon/globalindex/ScoredGlobalIndexResult.java
index 6334343710..0219155a13 100644
---
a/paimon-common/src/main/java/org/apache/paimon/globalindex/ScoredGlobalIndexResult.java
+++
b/paimon-common/src/main/java/org/apache/paimon/globalindex/ScoredGlobalIndexResult.java
@@ -21,6 +21,8 @@ package org.apache.paimon.globalindex;
import org.apache.paimon.utils.LazyField;
import org.apache.paimon.utils.RoaringNavigableMap64;
+import java.util.Comparator;
+import java.util.PriorityQueue;
import java.util.function.Supplier;
/** Vector search global index result for scored index. */
@@ -49,9 +51,10 @@ public interface ScoredGlobalIndexResult extends
GlobalIndexResult {
}
@Override
- default GlobalIndexResult or(GlobalIndexResult other) {
+ default ScoredGlobalIndexResult or(GlobalIndexResult other) {
if (!(other instanceof ScoredGlobalIndexResult)) {
- return GlobalIndexResult.super.or(other);
+ throw new UnsupportedOperationException(
+ "Only work for scored global index result, but is: " +
other.getClass());
}
RoaringNavigableMap64 thisRowIds = results();
ScoreGetter thisScoreGetter = scoreGetter();
@@ -78,6 +81,42 @@ public interface ScoredGlobalIndexResult extends
GlobalIndexResult {
};
}
+    /**
+     * Returns a result restricted to the {@code k} highest-scored row ids of this result.
+     *
+     * <p>If this result holds at most {@code k} row ids it is returned unchanged. Otherwise the
+     * score of every row id is read through {@link #scoreGetter()} and the best {@code k} rows
+     * are kept in a bounded min-heap; a row only replaces the current weakest candidate when its
+     * score is strictly greater. The returned result reuses the score getter of this result.
+     *
+     * @param k maximum number of row ids to keep, must not be negative
+     * @return this result, or a new result containing at most {@code k} row ids
+     * @throws IllegalArgumentException if {@code k} is negative
+     */
+    default ScoredGlobalIndexResult topK(int k) {
+        if (k < 0) {
+            throw new IllegalArgumentException("k must not be negative, but is: " + k);
+        }
+
+        RoaringNavigableMap64 rowIds = results();
+        if (rowIds.getIntCardinality() <= k) {
+            return this;
+        }
+
+        ScoreGetter scoreGetter = scoreGetter();
+        if (k == 0) {
+            // Nothing may be kept. Without this guard the loop below would call peek() on an
+            // empty heap and fail with a NullPointerException.
+            return ScoredGlobalIndexResult.create(RoaringNavigableMap64::new, scoreGetter);
+        }
+
+        // Min-heap by score: the head is the smallest score so we can evict it when a
+        // higher-scored row arrives. This gives O(n log k) instead of O(n log n).
+        // Each entry is {rowId, raw float bits of the score}.
+        PriorityQueue<long[]> minHeap =
+                new PriorityQueue<>(
+                        k + 1, Comparator.comparingDouble(a -> Float.intBitsToFloat((int) a[1])));
+        for (long rowId : rowIds) {
+            float score = scoreGetter.score(rowId);
+            long[] entry = new long[] {rowId, Float.floatToRawIntBits(score)};
+            if (minHeap.size() < k) {
+                minHeap.offer(entry);
+            } else if (score > Float.intBitsToFloat((int) minHeap.peek()[1])) {
+                minHeap.poll();
+                minHeap.offer(entry);
+            }
+        }
+
+        // Heap iteration order is arbitrary, which is fine: the bitmap is an unordered set.
+        RoaringNavigableMap64 topKRowIds = new RoaringNavigableMap64();
+        for (long[] entry : minHeap) {
+            topKRowIds.add(entry[0]);
+        }
+
+        return ScoredGlobalIndexResult.create(() -> topKRowIds, scoreGetter);
+    }
+
+ /**
+ * Returns an empty {@link ScoredGlobalIndexResult}: it is created from a supplier of a new
+ * {@link RoaringNavigableMap64} and a score getter that yields {@code 0} for every row id.
+ */
+ static ScoredGlobalIndexResult createEmpty() {
+ return create(RoaringNavigableMap64::new, rowId -> 0);
+ }
+
/** Returns a new {@link ScoredGlobalIndexResult} from supplier. */
static ScoredGlobalIndexResult create(
Supplier<RoaringNavigableMap64> supplier, ScoreGetter scoreGetter)
{
diff --git
a/paimon-common/src/main/java/org/apache/paimon/globalindex/UnionGlobalIndexReader.java
b/paimon-common/src/main/java/org/apache/paimon/globalindex/UnionGlobalIndexReader.java
index a59ec6c6a4..2c02a0c484 100644
---
a/paimon-common/src/main/java/org/apache/paimon/globalindex/UnionGlobalIndexReader.java
+++
b/paimon-common/src/main/java/org/apache/paimon/globalindex/UnionGlobalIndexReader.java
@@ -114,8 +114,19 @@ public class UnionGlobalIndexReader implements
GlobalIndexReader {
}
@Override
- public Optional<GlobalIndexResult> visitVectorSearch(VectorSearch
vectorSearch) {
- return union(reader -> reader.visitVectorSearch(vectorSearch));
+    public Optional<ScoredGlobalIndexResult> visitVectorSearch(VectorSearch vectorSearch) {
+        // Runs the search on every underlying reader and merges all present results with
+        // ScoredGlobalIndexResult#or. Readers returning an empty Optional are skipped; if no
+        // reader produces a result, an empty Optional is returned.
+        Optional<ScoredGlobalIndexResult> result = Optional.empty();
+        for (GlobalIndexReader reader : readers) {
+            Optional<ScoredGlobalIndexResult> current = reader.visitVectorSearch(vectorSearch);
+            if (!current.isPresent()) {
+                continue;
+            }
+            // The first hit seeds the accumulator as-is; only later hits are merged into it.
+            // (Previously the first hit was also OR-ed with itself.)
+            result =
+                    result.isPresent()
+                            ? Optional.of(result.get().or(current.get()))
+                            : current;
+        }
+        return result;
     }
private Optional<GlobalIndexResult> union(
diff --git
a/paimon-common/src/main/java/org/apache/paimon/predicate/VectorSearch.java
b/paimon-common/src/main/java/org/apache/paimon/predicate/VectorSearch.java
index 8ed13e043b..d4bb6ed3e5 100644
--- a/paimon-common/src/main/java/org/apache/paimon/predicate/VectorSearch.java
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/VectorSearch.java
@@ -19,7 +19,7 @@
package org.apache.paimon.predicate;
import org.apache.paimon.globalindex.GlobalIndexReader;
-import org.apache.paimon.globalindex.GlobalIndexResult;
+import org.apache.paimon.globalindex.ScoredGlobalIndexResult;
import org.apache.paimon.utils.Range;
import org.apache.paimon.utils.RoaringNavigableMap64;
@@ -33,14 +33,13 @@ public class VectorSearch implements Serializable {
private static final long serialVersionUID = 1L;
- // float[] or byte[]
- private final Object vector;
+ private final float[] vector;
private final String fieldName;
private final int limit;
@Nullable private RoaringNavigableMap64 includeRowIds;
- public VectorSearch(Object vector, int limit, String fieldName) {
+ public VectorSearch(float[] vector, int limit, String fieldName) {
if (vector == null) {
throw new IllegalArgumentException("Search cannot be null");
}
@@ -55,8 +54,7 @@ public class VectorSearch implements Serializable {
this.fieldName = fieldName;
}
- // float[] or byte[]
- public Object vector() {
+ public float[] vector() {
return vector;
}
@@ -93,7 +91,7 @@ public class VectorSearch implements Serializable {
return this;
}
- public Optional<GlobalIndexResult> visit(GlobalIndexReader visitor) {
+ public Optional<ScoredGlobalIndexResult> visit(GlobalIndexReader visitor) {
return visitor.visitVectorSearch(this);
}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/globalindex/testvector/TestVectorGlobalIndexReader.java
b/paimon-common/src/test/java/org/apache/paimon/globalindex/testvector/TestVectorGlobalIndexReader.java
new file mode 100644
index 0000000000..dc7fb4b6a6
--- /dev/null
+++
b/paimon-common/src/test/java/org/apache/paimon/globalindex/testvector/TestVectorGlobalIndexReader.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex.testvector;
+
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.globalindex.GlobalIndexIOMeta;
+import org.apache.paimon.globalindex.GlobalIndexReader;
+import org.apache.paimon.globalindex.GlobalIndexResult;
+import org.apache.paimon.globalindex.ScoredGlobalIndexResult;
+import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.predicate.VectorSearch;
+import org.apache.paimon.utils.RoaringNavigableMap64;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.PriorityQueue;
+
+/**
+ * Test vector index reader that performs a brute-force linear scan for similarity search. All
+ * vectors of the index file are loaded into memory on first use and the query is scored against
+ * every stored vector.
+ *
+ * <p>Supported distance metrics (a higher score means a better match):
+ *
+ * <ul>
+ *   <li>{@code l2} - score = 1 / (1 + squared_L2_distance)
+ *   <li>{@code cosine} - score = cosine_similarity (i.e. 1 - cosine_distance)
+ *   <li>{@code inner_product} - score = dot_product
+ * </ul>
+ *
+ * <p>Row ids in a returned result are the positions of the vectors inside the index file.
+ */
+public class TestVectorGlobalIndexReader implements GlobalIndexReader {
+
+    private final GlobalIndexFileReader fileReader;
+    private final GlobalIndexIOMeta ioMeta;
+    private final String metric;
+
+    // Populated lazily by ensureLoaded(); vectors == null means "not loaded yet".
+    private float[][] vectors;
+    private int dimension;
+    private int count;
+
+    public TestVectorGlobalIndexReader(
+            GlobalIndexFileReader fileReader, GlobalIndexIOMeta ioMeta, String metric) {
+        this.fileReader = fileReader;
+        this.ioMeta = ioMeta;
+        this.metric = metric;
+    }
+
+    /**
+     * Scores the query vector against every stored vector and returns the best matches.
+     *
+     * <p>At most {@code min(limit, count)} rows are returned. When the search carries include row
+     * ids, only those rows are scored.
+     *
+     * @param vectorSearch the query vector, limit and optional include row ids
+     * @return the top rows with their scores, or an empty Optional when the effective limit is
+     *     not positive
+     * @throws IllegalArgumentException if the query dimension differs from the index dimension,
+     *     or the configured metric is unknown
+     * @throws RuntimeException if the index file cannot be loaded
+     */
+    @Override
+    public Optional<ScoredGlobalIndexResult> visitVectorSearch(VectorSearch vectorSearch) {
+        try {
+            ensureLoaded();
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to load test vector index", e);
+        }
+
+        float[] queryVector = vectorSearch.vector();
+        if (queryVector.length != dimension) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Query vector dimension mismatch: index expects %d, but got %d",
+                            dimension, queryVector.length));
+        }
+
+        int limit = vectorSearch.limit();
+        int effectiveK = Math.min(limit, count);
+        if (effectiveK <= 0) {
+            return Optional.empty();
+        }
+
+        RoaringNavigableMap64 includeRowIds = vectorSearch.includeRowIds();
+
+        // Min-heap: smallest score at head, so we evict the weakest candidate.
+        PriorityQueue<ScoredRow> topK =
+                new PriorityQueue<>(effectiveK + 1, Comparator.comparingDouble(s -> s.score));
+
+        for (int i = 0; i < count; i++) {
+            if (includeRowIds != null && !includeRowIds.contains(i)) {
+                continue;
+            }
+            float score = computeScore(queryVector, vectors[i]);
+            if (topK.size() < effectiveK) {
+                topK.offer(new ScoredRow(i, score));
+            } else if (score > topK.peek().score) {
+                topK.poll();
+                topK.offer(new ScoredRow(i, score));
+            }
+        }
+
+        // Heap iteration order is arbitrary; both the bitmap and the map are order-insensitive.
+        RoaringNavigableMap64 resultBitmap = new RoaringNavigableMap64();
+        Map<Long, Float> scoreMap = new HashMap<>(topK.size());
+        for (ScoredRow row : topK) {
+            resultBitmap.add(row.rowId);
+            scoreMap.put(row.rowId, row.score);
+        }
+
+        return Optional.of(ScoredGlobalIndexResult.create(() -> resultBitmap, scoreMap::get));
+    }
+
+    /** Dispatches to the scoring function selected by the configured metric name. */
+    private float computeScore(float[] query, float[] stored) {
+        switch (metric) {
+            case "l2":
+                return computeL2Score(query, stored);
+            case "cosine":
+                return computeCosineScore(query, stored);
+            case "inner_product":
+                return computeInnerProductScore(query, stored);
+            default:
+                throw new IllegalArgumentException("Unknown metric: " + metric);
+        }
+    }
+
+    /** Returns {@code 1 / (1 + sum of squared differences)}; no square root is taken. */
+    private static float computeL2Score(float[] a, float[] b) {
+        float sumSq = 0;
+        for (int i = 0; i < a.length; i++) {
+            float diff = a[i] - b[i];
+            sumSq += diff * diff;
+        }
+        return 1.0f / (1.0f + sumSq);
+    }
+
+    /** Returns the cosine similarity, or {@code 0} when either vector has zero norm. */
+    private static float computeCosineScore(float[] a, float[] b) {
+        float dot = 0, normA = 0, normB = 0;
+        for (int i = 0; i < a.length; i++) {
+            dot += a[i] * b[i];
+            normA += a[i] * a[i];
+            normB += b[i] * b[i];
+        }
+        float denominator = (float) (Math.sqrt(normA) * Math.sqrt(normB));
+        if (denominator == 0) {
+            return 0;
+        }
+        return dot / denominator;
+    }
+
+    /** Returns the dot product of the two vectors. */
+    private static float computeInnerProductScore(float[] a, float[] b) {
+        float dot = 0;
+        for (int i = 0; i < a.length; i++) {
+            dot += a[i] * b[i];
+        }
+        return dot;
+    }
+
+    /**
+     * Loads the index file into memory if that has not happened yet.
+     *
+     * <p>The file is parsed into local variables first and the fields are only assigned once the
+     * whole file was read, so a failed read never leaves this reader in a state that looks loaded
+     * but holds partially filled vectors.
+     *
+     * @throws IOException if the file cannot be read, ends early, or has a negative dimension or
+     *     count in its header
+     */
+    private void ensureLoaded() throws IOException {
+        if (vectors != null) {
+            return;
+        }
+
+        try (SeekableInputStream in = fileReader.getInputStream(ioMeta)) {
+            // Read header: dimension (4 bytes) + count (4 bytes)
+            byte[] headerBytes = new byte[8];
+            readFully(in, headerBytes);
+            ByteBuffer header = ByteBuffer.wrap(headerBytes);
+            header.order(ByteOrder.LITTLE_ENDIAN);
+            int loadedDimension = header.getInt();
+            int loadedCount = header.getInt();
+            if (loadedDimension < 0 || loadedCount < 0) {
+                throw new IOException(
+                        "Corrupted test vector index header: dimension="
+                                + loadedDimension
+                                + ", count="
+                                + loadedCount);
+            }
+
+            // Read vectors
+            float[][] loadedVectors = new float[loadedCount][loadedDimension];
+            byte[] vectorBytes = new byte[loadedDimension * Float.BYTES];
+            for (int i = 0; i < loadedCount; i++) {
+                readFully(in, vectorBytes);
+                ByteBuffer vectorBuf = ByteBuffer.wrap(vectorBytes);
+                vectorBuf.order(ByteOrder.LITTLE_ENDIAN);
+                for (int j = 0; j < loadedDimension; j++) {
+                    loadedVectors[i][j] = vectorBuf.getFloat();
+                }
+            }
+
+            // Publish only after everything was read successfully.
+            dimension = loadedDimension;
+            count = loadedCount;
+            vectors = loadedVectors;
+        }
+    }
+
+    /** Fills {@code buf} completely from {@code in}, failing if the stream ends first. */
+    private static void readFully(SeekableInputStream in, byte[] buf) throws IOException {
+        int offset = 0;
+        while (offset < buf.length) {
+            int bytesRead = in.read(buf, offset, buf.length - offset);
+            if (bytesRead < 0) {
+                throw new IOException(
+                        "Unexpected end of stream: read "
+                                + offset
+                                + " bytes but expected "
+                                + buf.length);
+            }
+            offset += bytesRead;
+        }
+    }
+
+    /** Drops the in-memory vectors; a later search loads them again. */
+    @Override
+    public void close() throws IOException {
+        vectors = null;
+    }
+
+    // =================== unsupported predicate operations =====================
+    // Every predicate visit below returns Optional.empty(): this index only answers vector search.
+
+    @Override
+    public Optional<GlobalIndexResult> visitIsNotNull(FieldRef fieldRef) {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitIsNull(FieldRef fieldRef) {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitStartsWith(FieldRef fieldRef, Object literal) {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitEndsWith(FieldRef fieldRef, Object literal) {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitContains(FieldRef fieldRef, Object literal) {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitLike(FieldRef fieldRef, Object literal) {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitLessThan(FieldRef fieldRef, Object literal) {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitGreaterOrEqual(FieldRef fieldRef, Object literal) {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitNotEqual(FieldRef fieldRef, Object literal) {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitLessOrEqual(FieldRef fieldRef, Object literal) {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitEqual(FieldRef fieldRef, Object literal) {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitGreaterThan(FieldRef fieldRef, Object literal) {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitIn(FieldRef fieldRef, List<Object> literals) {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitNotIn(FieldRef fieldRef, List<Object> literals) {
+        return Optional.empty();
+    }
+
+    /** A row ID paired with its similarity score, used in the top-k min-heap. */
+    private static class ScoredRow {
+        final long rowId;
+        final float score;
+
+        ScoredRow(long rowId, float score) {
+            this.rowId = rowId;
+            this.score = score;
+        }
+    }
+}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/globalindex/testvector/TestVectorGlobalIndexWriter.java
b/paimon-common/src/test/java/org/apache/paimon/globalindex/testvector/TestVectorGlobalIndexWriter.java
new file mode 100644
index 0000000000..ea8ad346cb
--- /dev/null
+++
b/paimon-common/src/test/java/org/apache/paimon/globalindex/testvector/TestVectorGlobalIndexWriter.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex.testvector;
+
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.globalindex.GlobalIndexSingletonWriter;
+import org.apache.paimon.globalindex.ResultEntry;
+import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Test vector index writer that buffers all written vectors in memory and writes them to a
+ * single binary file on {@link #finish()}.
+ *
+ * <p>{@code write} accepts either a {@code float[]} (stored as a copy) or an
+ * {@code InternalArray} whose elements are read as floats; a null value, a null element or any
+ * other type is rejected with an {@link IllegalArgumentException}. Every vector must have the
+ * configured dimension, or, when the configured dimension is not positive, the length of the
+ * first written vector.
+ *
+ * <p>{@code finish} returns an empty list when nothing was written; otherwise it returns one
+ * {@code ResultEntry} built from the new file name, the vector count and {@code null}. An
+ * {@link IOException} while writing is rethrown wrapped in a {@link RuntimeException}.
+ *
+ * <p>Binary format (all values little-endian):
+ *
+ * <pre>
+ * [4 bytes] dimension (int)
+ * [4 bytes] count (int)
+ * [count * dim * 4 bytes] float vectors (row-major order)
+ * </pre>
+ */
+public class TestVectorGlobalIndexWriter implements GlobalIndexSingletonWriter
{
+
+ private static final String FILE_NAME_PREFIX = "test-vector";
+
+ private final GlobalIndexFileWriter fileWriter;
+ private final int dimension; // a value <= 0 means: take the dimension from the first vector
+ private final List<float[]> vectors; // all vectors written so far, in write order
+
+ public TestVectorGlobalIndexWriter(GlobalIndexFileWriter fileWriter, int
dimension) {
+ this.fileWriter = fileWriter;
+ this.dimension = dimension;
+ this.vectors = new ArrayList<>();
+ }
+
+ @Override
+ public void write(Object fieldData) {
+ if (fieldData == null) {
+ throw new IllegalArgumentException("Vector field data must not be
null");
+ }
+
+ float[] vector;
+ if (fieldData instanceof float[]) {
+ vector = ((float[]) fieldData).clone(); // copy, so later changes by the caller are not seen
+ } else if (fieldData instanceof InternalArray) {
+ InternalArray array = (InternalArray) fieldData;
+ vector = new float[array.size()];
+ for (int i = 0; i < array.size(); i++) {
+ if (array.isNullAt(i)) {
+ throw new IllegalArgumentException("Vector element at
index " + i + " is null");
+ }
+ vector[i] = array.getFloat(i);
+ }
+ } else {
+ throw new IllegalArgumentException(
+ "Unsupported vector type: " +
fieldData.getClass().getName());
+ }
+
+ int expectedDim =
+ dimension > 0
+ ? dimension
+ : (vectors.isEmpty() ? vector.length :
vectors.get(0).length); // no configured dimension: the first vector sets it
+ if (vector.length != expectedDim) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Vector dimension mismatch: expected %d, but got
%d",
+ expectedDim, vector.length));
+ }
+ vectors.add(vector);
+ }
+
+ @Override
+ public List<ResultEntry> finish() {
+ if (vectors.isEmpty()) {
+ return Collections.emptyList(); // nothing written: no file is created
+ }
+
+ int dim = dimension > 0 ? dimension : vectors.get(0).length;
+ int count = vectors.size();
+
+ try {
+ String fileName = fileWriter.newFileName(FILE_NAME_PREFIX);
+ try (PositionOutputStream out =
fileWriter.newOutputStream(fileName)) {
+ // Header: dimension + count, two little-endian ints (8 bytes)
+ ByteBuffer header = ByteBuffer.allocate(8);
+ header.order(ByteOrder.LITTLE_ENDIAN);
+ header.putInt(dim);
+ header.putInt(count);
+ out.write(header.array());
+
+ // Vector data: one reusable buffer of dim floats, written once per vector
+ ByteBuffer vectorBuf = ByteBuffer.allocate(dim * Float.BYTES);
+ vectorBuf.order(ByteOrder.LITTLE_ENDIAN);
+ for (float[] vec : vectors) {
+ vectorBuf.clear(); // rewind so the buffer is overwritten from the start
+ for (int i = 0; i < dim; i++) {
+ vectorBuf.putFloat(vec[i]);
+ }
+ out.write(vectorBuf.array());
+ }
+ out.flush();
+ }
+
+ return Collections.singletonList(new ResultEntry(fileName, count,
null));
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to write test vector index", e);
+ }
+ }
+}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/globalindex/testvector/TestVectorGlobalIndexer.java
b/paimon-common/src/test/java/org/apache/paimon/globalindex/testvector/TestVectorGlobalIndexer.java
new file mode 100644
index 0000000000..d6ee516f12
--- /dev/null
+++
b/paimon-common/src/test/java/org/apache/paimon/globalindex/testvector/TestVectorGlobalIndexer.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex.testvector;
+
+import org.apache.paimon.globalindex.GlobalIndexIOMeta;
+import org.apache.paimon.globalindex.GlobalIndexReader;
+import org.apache.paimon.globalindex.GlobalIndexWriter;
+import org.apache.paimon.globalindex.GlobalIndexer;
+import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
+import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.FloatType;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * A test-only {@link GlobalIndexer} for vector similarity search. Uses brute-force linear scan
+ * for ANN queries. No native library dependency required.
+ *
+ * <p>Only {@code ARRAY<FLOAT>} fields are accepted; any other field type fails the constructor's
+ * argument check. Writers are created with the configured dimension, and readers are created
+ * with the configured metric from exactly one index file.
+ *
+ * <p>Supported distance metrics (configured via option {@code test.vector.metric}), as scored
+ * by {@code TestVectorGlobalIndexReader}:
+ *
+ * <ul>
+ * <li>{@code l2} (default) - score = 1 / (1 + squared Euclidean distance)
+ * <li>{@code cosine} - score = cosine similarity (i.e. 1 - cosine distance)
+ * <li>{@code inner_product} - Inner product similarity (directly used as score)
+ * </ul>
+ */
+public class TestVectorGlobalIndexer implements GlobalIndexer {
+
+ /**
+ * Option key for vector dimension. Defaults to {@code 0} when unset; the value is passed to
+ * the writer created by {@code createWriter}.
+ */
+ public static final String OPT_DIMENSION = "test.vector.dimension";
+
+ /** Option key for distance metric. Defaults to {@code "l2"} when unset. */
+ public static final String OPT_METRIC = "test.vector.metric";
+
+ private final DataType fieldType;
+ private final int dimension;
+ private final String metric;
+
+ public TestVectorGlobalIndexer(DataType fieldType, Options options) {
+ checkArgument(
+ fieldType instanceof ArrayType
+ && ((ArrayType) fieldType).getElementType() instanceof
FloatType,
+ "TestVectorGlobalIndexer only supports ARRAY<FLOAT>, but got:
" + fieldType);
+ this.fieldType = fieldType;
+ this.dimension = options.getInteger(OPT_DIMENSION, 0);
+ this.metric = options.getString(OPT_METRIC, "l2");
+ }
+
+ @Override
+ public GlobalIndexWriter createWriter(GlobalIndexFileWriter fileWriter)
throws IOException {
+ return new TestVectorGlobalIndexWriter(fileWriter, dimension);
+ }
+
+ @Override
+ public GlobalIndexReader createReader(
+ GlobalIndexFileReader fileReader, List<GlobalIndexIOMeta> files)
throws IOException {
+ checkArgument(files.size() == 1, "Expected exactly one index file per
shard");
+ return new TestVectorGlobalIndexReader(fileReader, files.get(0),
metric);
+ }
+
+ public int dimension() {
+ return dimension;
+ }
+
+ public String metric() {
+ return metric;
+ }
+}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/globalindex/testvector/TestVectorGlobalIndexerFactory.java
b/paimon-common/src/test/java/org/apache/paimon/globalindex/testvector/TestVectorGlobalIndexerFactory.java
new file mode 100644
index 0000000000..92c0d8126c
--- /dev/null
+++
b/paimon-common/src/test/java/org/apache/paimon/globalindex/testvector/TestVectorGlobalIndexerFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex.testvector;
+
+import org.apache.paimon.globalindex.GlobalIndexer;
+import org.apache.paimon.globalindex.GlobalIndexerFactory;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataField;
+
+/**
+ * A test-only {@link GlobalIndexerFactory} for vector similarity search. Uses brute-force
+ * linear scan, no native dependencies required.
+ *
+ * <p>Its identifier is {@code "test-vector-ann"}, and {@code create} builds a
+ * {@code TestVectorGlobalIndexer} from the field's type and the given options.
+ */
+public class TestVectorGlobalIndexerFactory implements GlobalIndexerFactory {
+
+ public static final String IDENTIFIER = "test-vector-ann";
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public GlobalIndexer create(DataField field, Options options) {
+ return new TestVectorGlobalIndexer(field.type(), options);
+ }
+}
diff --git
a/paimon-common/src/test/resources/META-INF/services/org.apache.paimon.globalindex.GlobalIndexerFactory
b/paimon-common/src/test/resources/META-INF/services/org.apache.paimon.globalindex.GlobalIndexerFactory
new file mode 100644
index 0000000000..3fa7826081
--- /dev/null
+++
b/paimon-common/src/test/resources/META-INF/services/org.apache.paimon.globalindex.GlobalIndexerFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.paimon.globalindex.testvector.TestVectorGlobalIndexerFactory
diff --git
a/paimon-core/src/main/java/org/apache/paimon/globalindex/DataEvolutionBatchScan.java
b/paimon-core/src/main/java/org/apache/paimon/globalindex/DataEvolutionBatchScan.java
index 708d7d874c..13b4b59345 100644
---
a/paimon-core/src/main/java/org/apache/paimon/globalindex/DataEvolutionBatchScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/globalindex/DataEvolutionBatchScan.java
@@ -30,7 +30,6 @@ import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.RowIdPredicateVisitor;
import org.apache.paimon.predicate.TopN;
-import org.apache.paimon.predicate.VectorSearch;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.DataTableBatchScan;
@@ -64,7 +63,6 @@ public class DataEvolutionBatchScan implements DataTableScan {
private final DataTableBatchScan batchScan;
private Predicate filter;
- private VectorSearch vectorSearch;
private RowRangeIndex pushedRowRangeIndex;
private GlobalIndexResult globalIndexResult;
@@ -117,13 +115,6 @@ public class DataEvolutionBatchScan implements
DataTableScan {
return filter;
}
- @Override
- public InnerTableScan withVectorSearch(VectorSearch vectorSearch) {
- this.vectorSearch = vectorSearch;
- batchScan.withVectorSearch(vectorSearch);
- return this;
- }
-
@Override
public InnerTableScan withReadType(@Nullable RowType readType) {
batchScan.withReadType(readType);
@@ -222,8 +213,12 @@ public class DataEvolutionBatchScan implements
DataTableScan {
return this;
}
- // To enable other system computing index result by their own.
- public InnerTableScan withGlobalIndexResult(GlobalIndexResult
globalIndexResult) {
+ @Override
+ public DataEvolutionBatchScan withGlobalIndexResult(GlobalIndexResult
globalIndexResult) {
+ if (globalIndexResult == null) {
+ return this;
+ }
+
this.globalIndexResult = globalIndexResult;
if (pushedRowRangeIndex != null) {
throw new IllegalStateException(
@@ -265,7 +260,7 @@ public class DataEvolutionBatchScan implements
DataTableScan {
if (this.globalIndexResult != null) {
return Optional.of(globalIndexResult);
}
- if (filter == null && vectorSearch == null) {
+ if (filter == null) {
return Optional.empty();
}
if (!table.coreOptions().globalIndexEnabled()) {
@@ -288,7 +283,6 @@ public class DataEvolutionBatchScan implements
DataTableScan {
indexedRowRanges,
indexScanBuilder,
filter,
- vectorSearch,
table.coreOptions().globalIndexThreadNum());
if (!resultOptional.isPresent()) {
return Optional.empty();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanBuilder.java
b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanBuilder.java
index 2620e0dd40..5c4ec92f81 100644
---
a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanBuilder.java
+++
b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanBuilder.java
@@ -21,12 +21,9 @@ package org.apache.paimon.globalindex;
import org.apache.paimon.Snapshot;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.predicate.VectorSearch;
import org.apache.paimon.utils.IOUtils;
import org.apache.paimon.utils.Range;
-import javax.annotation.Nullable;
-
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
@@ -56,7 +53,6 @@ public interface GlobalIndexScanBuilder {
final List<Range> ranges,
final GlobalIndexScanBuilder globalIndexScanBuilder,
final Predicate filter,
- @Nullable final VectorSearch vectorSearch,
final Integer threadNum) {
List<RowRangeGlobalIndexScanner> scanners =
ranges.stream()
@@ -69,8 +65,7 @@ public interface GlobalIndexScanBuilder {
Iterator<Optional<GlobalIndexResult>> resultIterators =
randomlyExecuteSequentialReturn(
scanner -> {
- Optional<GlobalIndexResult> result =
- scanner.scan(filter, vectorSearch);
+ Optional<GlobalIndexResult> result =
scanner.scan(filter);
return Collections.singletonList(result);
},
scanners,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanBuilderImpl.java
b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanBuilderImpl.java
index 9b6dcf0826..4619bb77d8 100644
---
a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanBuilderImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanBuilderImpl.java
@@ -26,7 +26,6 @@ import org.apache.paimon.index.IndexPathFactory;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.PartitionPredicate;
-import org.apache.paimon.predicate.VectorSearch;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Range;
@@ -53,7 +52,6 @@ public class GlobalIndexScanBuilderImpl implements
GlobalIndexScanBuilder {
private Snapshot snapshot;
private PartitionPredicate partitionPredicate;
private Range rowRange;
- private VectorSearch vectorSearch;
public GlobalIndexScanBuilderImpl(
Options options,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/globalindex/RowRangeGlobalIndexScanner.java
b/paimon-core/src/main/java/org/apache/paimon/globalindex/RowRangeGlobalIndexScanner.java
index ad2ccc3498..044790e7de 100644
---
a/paimon-core/src/main/java/org/apache/paimon/globalindex/RowRangeGlobalIndexScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/globalindex/RowRangeGlobalIndexScanner.java
@@ -27,13 +27,10 @@ import org.apache.paimon.index.IndexPathFactory;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.predicate.VectorSearch;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Range;
-import javax.annotation.Nullable;
-
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
@@ -107,9 +104,8 @@ public class RowRangeGlobalIndexScanner implements
Closeable {
this.globalIndexEvaluator = new GlobalIndexEvaluator(rowType,
readersFunction);
}
- public Optional<GlobalIndexResult> scan(
- Predicate predicate, @Nullable VectorSearch vectorSearch) {
- return globalIndexEvaluator.evaluate(predicate, vectorSearch);
+ public Optional<GlobalIndexResult> scan(Predicate predicate) {
+ return globalIndexEvaluator.evaluate(predicate);
}
private Collection<GlobalIndexReader> createReaders(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
index 7e9dbc5062..2820b52e9b 100644
---
a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
+++
b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
@@ -458,6 +458,12 @@ public interface PartitionPredicate extends Serializable {
partitionType, createBinaryPartitions(values, partitionType,
defaultPartValue));
}
+    /**
+     * Extracts only the partition predicate from {@code dataPredicates}.
+     *
+     * <p>Delegates to {@link #splitPartitionPredicatesAndDataPredicates} and returns the left
+     * side of its result; the remaining data predicates are discarded.
+     */
+    static Optional<PartitionPredicate> splitPartitionPredicate(
+            Predicate dataPredicates, RowType tableType, List<String> partitionKeys) {
+        Pair<Optional<PartitionPredicate>, List<Predicate>> partitionAndData =
+                splitPartitionPredicatesAndDataPredicates(
+                        dataPredicates, tableType, partitionKeys);
+        return partitionAndData.getLeft();
+    }
+
static Pair<Optional<PartitionPredicate>, List<Predicate>>
splitPartitionPredicatesAndDataPredicates(
Predicate dataPredicates, RowType tableType, List<String>
partitionKeys) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
index a2b01fb7f5..8f298ff9bf 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
@@ -32,6 +32,7 @@ import org.apache.paimon.table.format.FormatReadBuilder;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.VectorSearchBuilder;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.SimpleFileReader;
@@ -272,6 +273,11 @@ public interface FormatTable extends Table {
catalogContext);
}
+ @Override
+ public VectorSearchBuilder newVectorSearchBuilder() {
+ // Format tables offer no vector search, so the call is rejected explicitly.
+ throw new UnsupportedOperationException("FormatTable does not
support vector search.");
+ }
+
@Override
public CatalogContext catalogContext() {
return this.catalogContext;
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/InnerTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/InnerTable.java
index e96c333431..c404672e43 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/InnerTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/InnerTable.java
@@ -30,6 +30,8 @@ import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.ReadBuilderImpl;
import org.apache.paimon.table.source.StreamDataTableScan;
+import org.apache.paimon.table.source.VectorSearchBuilder;
+import org.apache.paimon.table.source.VectorSearchBuilderImpl;
import java.util.Optional;
@@ -53,6 +55,11 @@ public interface InnerTable extends Table {
return new ReadBuilderImpl(this);
}
+ /**
+ * Creates the default builder, a {@link VectorSearchBuilderImpl} bound to this table. The
+ * implementation requires this table to be a {@code FileStoreTable}.
+ */
+ @Override
+ default VectorSearchBuilder newVectorSearchBuilder() {
+ return new VectorSearchBuilderImpl(this);
+ }
+
@Override
default BatchWriteBuilder newBatchWriteBuilder() {
return new BatchWriteBuilderImpl(this);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java
b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
index 53e99ea45f..938718db98 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
@@ -29,6 +29,7 @@ import org.apache.paimon.stats.Statistics;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.VectorSearchBuilder;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.SimpleFileReader;
@@ -213,6 +214,9 @@ public interface Table extends Serializable {
// =============== Read & Write Operations ==================
+ /**
+ * Returns a new vector search builder. Tables that do not support vector search may throw
+ * {@link UnsupportedOperationException}.
+ */
+ VectorSearchBuilder newVectorSearchBuilder();
+
/** Returns a new read builder. */
ReadBuilder newReadBuilder();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
index 50aba047c1..6c394f3b8f 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
@@ -30,7 +30,6 @@ import org.apache.paimon.partition.PartitionUtils;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.predicate.TopN;
-import org.apache.paimon.predicate.VectorSearch;
import org.apache.paimon.reader.FileRecordReader;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.FormatTable;
@@ -251,11 +250,6 @@ public class FormatReadBuilder implements ReadBuilder {
throw new UnsupportedOperationException("Format Table does not support
withRowRangeIndex.");
}
- @Override
- public ReadBuilder withVectorSearch(VectorSearch vectorSearch) {
- throw new UnsupportedOperationException("Format Table does not support
withRowRanges.");
- }
-
@Override
public StreamTableScan newStreamScan() {
throw new UnsupportedOperationException("Format Table does not support
stream scan.");
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
index 1f6edd7176..c09809feb9 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
@@ -19,11 +19,11 @@
package org.apache.paimon.table.source;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.globalindex.GlobalIndexResult;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.TopN;
-import org.apache.paimon.predicate.VectorSearch;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Range;
@@ -39,10 +39,6 @@ public interface InnerTableScan extends TableScan {
InnerTableScan withFilter(Predicate predicate);
- default InnerTableScan withVectorSearch(VectorSearch vectorSearch) {
- return this;
- }
-
default InnerTableScan withReadType(@Nullable RowType readType) {
return this;
}
@@ -79,6 +75,11 @@ public interface InnerTableScan extends TableScan {
return this;
}
+ // No-op by default: the given result is ignored unless a scan overrides this method.
+ @Override
+ default InnerTableScan withGlobalIndexResult(GlobalIndexResult
globalIndexResult) {
+ return this;
+ }
+
default InnerTableScan withBucket(int bucket) {
return this;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
index 8c3cf6f681..ee94211a7f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
@@ -24,7 +24,6 @@ import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.predicate.TopN;
-import org.apache.paimon.predicate.VectorSearch;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Range;
@@ -168,13 +167,6 @@ public interface ReadBuilder extends Serializable {
*/
ReadBuilder withRowRangeIndex(RowRangeIndex rowRangeIndex);
- /**
- * Push vector search to the reader.
- *
- * @param vectorSearch
- */
- ReadBuilder withVectorSearch(VectorSearch vectorSearch);
-
/** Delete stats in scan plan result. */
ReadBuilder dropStats();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
index f31c750ec1..553a848749 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
@@ -23,7 +23,6 @@ import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.predicate.TopN;
-import org.apache.paimon.predicate.VectorSearch;
import org.apache.paimon.table.InnerTable;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Filter;
@@ -64,7 +63,6 @@ public class ReadBuilderImpl implements ReadBuilder {
private @Nullable RowType readType;
private @Nullable RowRangeIndex rowRangeIndex;
- private @Nullable VectorSearch vectorSearch;
private boolean dropStats = false;
@@ -161,12 +159,6 @@ public class ReadBuilderImpl implements ReadBuilder {
return this;
}
- @Override
- public ReadBuilder withVectorSearch(VectorSearch vectorSearch) {
- this.vectorSearch = vectorSearch;
- return this;
- }
-
@Override
public ReadBuilder withBucket(int bucket) {
this.specifiedBucket = bucket;
@@ -209,8 +201,7 @@ public class ReadBuilderImpl implements ReadBuilder {
scan.withFilter(filter)
.withReadType(readType)
.withPartitionFilter(partitionFilter)
- .withRowRangeIndex(rowRangeIndex)
- .withVectorSearch(vectorSearch);
+ .withRowRangeIndex(rowRangeIndex);
checkState(
bucketFilter == null || shardIndexOfThisSubtask == null,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/TableScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/TableScan.java
index f3c667f19e..31fa8c0f82 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/TableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/TableScan.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table.source;
import org.apache.paimon.annotation.Public;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.globalindex.GlobalIndexResult;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.table.Table;
@@ -38,6 +39,12 @@ public interface TableScan {
/** Set {@link MetricRegistry} to table scan. */
TableScan withMetricRegistry(MetricRegistry registry);
+ /**
+ * Specify the global index result to be read. This is usually used to search a vector index
+ * in a data-evolution table.
+ */
+ TableScan withGlobalIndexResult(GlobalIndexResult globalIndexResult);
+
/** Plan splits, throws {@link EndOfScanException} if the scan is ended. */
Plan plan();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/VectorRead.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/VectorRead.java
new file mode 100644
index 0000000000..4fc3c7aaf0
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/VectorRead.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source;
+
+import org.apache.paimon.globalindex.GlobalIndexResult;
+
+/** Vector read to read index files. */
+public interface VectorRead {
+
+ /**
+ * Reads the index files listed in the given plan and returns the resulting global index
+ * result.
+ */
+ GlobalIndexResult read(VectorScan.Plan plan);
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/VectorScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/VectorScan.java
new file mode 100644
index 0000000000..57aaf9fd4a
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/VectorScan.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source;
+
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.index.IndexFileMetaSerializer;
+import org.apache.paimon.utils.RoaringNavigableMap64;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/** Vector scan to pre-filter and scan index files. */
+public interface VectorScan {
+
+ /** Produces the {@link Plan} describing which index files a vector read should evaluate. */
+ Plan scan();
+
+ /** Plan of vector scan. */
+ interface Plan {
+
+ /**
+ * Index files for vector index. {@link IndexFileMeta} can be serialized by {@link
+ * IndexFileMetaSerializer}.
+ */
+ List<IndexFileMeta> indexFiles();
+
+ /** Row ids selected by pre-filters; may be {@code null}. */
+ @Nullable
+ RoaringNavigableMap64 includeRowIds();
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/VectorSearchBuilder.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/VectorSearchBuilder.java
new file mode 100644
index 0000000000..ae7e7bf48e
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/VectorSearchBuilder.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source;
+
+import org.apache.paimon.globalindex.GlobalIndexResult;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.predicate.Predicate;
+
+import java.io.Serializable;
+
+/** Builder to build vector search. */
+public interface VectorSearchBuilder extends Serializable {
+
+ /** Push partition filters. */
+ VectorSearchBuilder withPartitionFilter(PartitionPredicate
partitionPredicate);
+
+ /** Push pre-filter for vector search. */
+ VectorSearchBuilder withFilter(Predicate predicate);
+
+ /** Set the number of top results (k) to return. */
+ VectorSearchBuilder withLimit(int limit);
+
+ /** Set the name of the vector column to search. */
+ VectorSearchBuilder withVectorColumn(String name);
+
+ /** Set the query vector to search for. */
+ VectorSearchBuilder withVector(float[] vector);
+
+ /** Create vector scan to scan index files. */
+ VectorScan newVectorScan();
+
+ /** Create vector read to read index files. */
+ VectorRead newVectorRead();
+
+ /**
+ * Execute the vector index search locally: the plan produced by {@link #newVectorScan()} is
+ * passed directly to {@link #newVectorRead()}.
+ */
+ default GlobalIndexResult executeLocal() {
+ return newVectorRead().read(newVectorScan().scan());
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/VectorSearchBuilderImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/VectorSearchBuilderImpl.java
new file mode 100644
index 0000000000..06e6ee775b
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/VectorSearchBuilderImpl.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.globalindex.GlobalIndexIOMeta;
+import org.apache.paimon.globalindex.GlobalIndexReader;
+import org.apache.paimon.globalindex.GlobalIndexResult;
+import org.apache.paimon.globalindex.GlobalIndexer;
+import org.apache.paimon.globalindex.GlobalIndexerFactory;
+import org.apache.paimon.globalindex.GlobalIndexerFactoryUtils;
+import org.apache.paimon.globalindex.OffsetGlobalIndexReader;
+import org.apache.paimon.globalindex.ScoredGlobalIndexResult;
+import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
+import org.apache.paimon.index.GlobalIndexMeta;
+import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.predicate.VectorSearch;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.InnerTable;
+import org.apache.paimon.table.source.snapshot.TimeTravelUtil;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.utils.Filter;
+import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.RoaringNavigableMap64;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.singletonList;
+import static
org.apache.paimon.partition.PartitionPredicate.splitPartitionPredicate;
+import static
org.apache.paimon.utils.ManifestReadThreadPool.randomlyExecuteSequentialReturn;
+
+/**
+ * Implementation for {@link VectorSearchBuilder}.
+ *
+ * <p>The builder only collects the search parameters. {@link #newVectorScan()} lists the global
+ * index files of the vector column and {@link #newVectorRead()} evaluates the search on them.
+ */
+public class VectorSearchBuilderImpl implements VectorSearchBuilder {
+
+    private static final long serialVersionUID = 1L;
+
+    private final FileStoreTable table;
+
+    /** Partition filter pushed explicitly through {@link #withPartitionFilter}. */
+    private PartitionPredicate partitionFilter;
+
+    /**
+     * Partition filter derived from the accumulated {@link #filter}. Kept apart from {@link
+     * #partitionFilter} so that neither silently overwrites the other.
+     */
+    private PartitionPredicate derivedPartitionFilter;
+
+    private Predicate filter;
+    private int limit;
+    private DataField vectorColumn;
+    private float[] vector;
+
+    /**
+     * Creates a builder bound to the given table.
+     *
+     * @param table the table to search; must be a {@link FileStoreTable}
+     * @throws UnsupportedOperationException if the table is not a {@link FileStoreTable}
+     */
+    public VectorSearchBuilderImpl(InnerTable table) {
+        if (!(table instanceof FileStoreTable)) {
+            // Report the unsupported table type instead of a bare ClassCastException.
+            throw new UnsupportedOperationException(
+                    "Vector search is only supported on FileStoreTable, but the table is: "
+                            + (table == null ? "null" : table.getClass().getName()));
+        }
+        this.table = (FileStoreTable) table;
+    }
+
+    @Override
+    public VectorSearchBuilder withPartitionFilter(PartitionPredicate partitionFilter) {
+        this.partitionFilter = partitionFilter;
+        return this;
+    }
+
+    /**
+     * Adds a pre-filter. Successive calls are combined with AND. The partition part of the
+     * combined filter is applied in addition to any filter set by {@link #withPartitionFilter}.
+     * A {@code null} predicate is ignored.
+     */
+    @Override
+    public VectorSearchBuilder withFilter(Predicate predicate) {
+        if (predicate == null) {
+            return this;
+        }
+        this.filter =
+                this.filter == null ? predicate : PredicateBuilder.and(this.filter, predicate);
+        // Re-derive from the combined filter so partition conditions of earlier calls are kept.
+        this.derivedPartitionFilter =
+                splitPartitionPredicate(this.filter, table.rowType(), table.partitionKeys())
+                        .orElse(null);
+        return this;
+    }
+
+    @Override
+    public VectorSearchBuilder withLimit(int limit) {
+        this.limit = limit;
+        return this;
+    }
+
+    @Override
+    public VectorSearchBuilder withVectorColumn(String name) {
+        this.vectorColumn = table.rowType().getField(name);
+        return this;
+    }
+
+    /** Sets the query vector. The array is stored as is, without a defensive copy. */
+    @Override
+    public VectorSearchBuilder withVector(float[] vector) {
+        this.vector = vector;
+        return this;
+    }
+
+    @Override
+    public VectorScan newVectorScan() {
+        return new VectorScanImpl();
+    }
+
+    @Override
+    public VectorRead newVectorRead() {
+        return new VectorReadImpl();
+    }
+
+    /** Verifies that the vector, the vector column and a positive limit have been configured. */
+    private void checkSearchParameters() {
+        Objects.requireNonNull(vector, "Vector must be set");
+        Objects.requireNonNull(vectorColumn, "Vector column must be set");
+        if (limit <= 0) {
+            throw new IllegalArgumentException("Limit must be positive");
+        }
+    }
+
+    /** Lists the global index files that belong to the configured vector column. */
+    private class VectorScanImpl implements VectorScan {
+
+        @Override
+        public Plan scan() {
+            checkSearchParameters();
+
+            Snapshot snapshot = TimeTravelUtil.tryTravelOrLatest(table);
+            IndexFileHandler indexFileHandler = table.store().newIndexFileHandler();
+            // An index file is kept only if its partition passes every partition filter that
+            // was pushed, either explicitly or through withFilter.
+            Filter<IndexManifestEntry> indexFileFilter =
+                    partitionFilter == null && derivedPartitionFilter == null
+                            ? Filter.alwaysTrue()
+                            : entry ->
+                                    (partitionFilter == null
+                                                    || partitionFilter.test(entry.partition()))
+                                            && (derivedPartitionFilter == null
+                                                    || derivedPartitionFilter.test(
+                                                            entry.partition()));
+            List<IndexManifestEntry> indexManifestEntries =
+                    indexFileHandler.scan(snapshot, indexFileFilter);
+
+            // Keep only global index files built on the vector column.
+            List<IndexFileMeta> vectorIndexFiles =
+                    indexManifestEntries.stream()
+                            .map(IndexManifestEntry::indexFile)
+                            .filter(
+                                    indexFile -> {
+                                        GlobalIndexMeta indexMeta = indexFile.globalIndexMeta();
+                                        return indexMeta != null
+                                                && indexMeta.indexFieldId() == vectorColumn.id();
+                                    })
+                            .collect(Collectors.toList());
+
+            return new Plan() {
+                @Override
+                public List<IndexFileMeta> indexFiles() {
+                    return vectorIndexFiles;
+                }
+
+                @Override
+                public @Nullable RoaringNavigableMap64 includeRowIds() {
+                    // TODO pre filter by btree index
+                    return null;
+                }
+            };
+        }
+    }
+
+    /** Evaluates the vector search on every index file of a plan and merges the results. */
+    private class VectorReadImpl implements VectorRead {
+
+        @Override
+        public GlobalIndexResult read(VectorScan.Plan plan) {
+            List<IndexFileMeta> indexFiles = plan.indexFiles();
+            if (indexFiles.isEmpty()) {
+                return GlobalIndexResult.createEmpty();
+            }
+            // Fail with a clear message before touching any file if the builder is incomplete.
+            checkSearchParameters();
+
+            Integer threadNum = table.coreOptions().globalIndexThreadNum();
+            Iterator<Optional<ScoredGlobalIndexResult>> resultIterators =
+                    randomlyExecuteSequentialReturn(
+                            vectorIndex -> singletonList(eval(vectorIndex, plan.includeRowIds())),
+                            indexFiles,
+                            threadNum);
+
+            // Union the per-file results, then cut the union down to the requested top k.
+            ScoredGlobalIndexResult result = ScoredGlobalIndexResult.createEmpty();
+            while (resultIterators.hasNext()) {
+                Optional<ScoredGlobalIndexResult> next = resultIterators.next();
+                if (next.isPresent()) {
+                    result = result.or(next.get());
+                }
+            }
+
+            return result.topK(limit);
+        }
+
+        /**
+         * Runs the vector search on a single index file.
+         *
+         * @param indexFile the index file to search; must carry global index meta
+         * @param includeRowIds row ids selected by pre-filters, or {@code null}
+         * @return the result reported by the index reader for this file
+         */
+        private Optional<ScoredGlobalIndexResult> eval(
+                IndexFileMeta indexFile, @Nullable RoaringNavigableMap64 includeRowIds) {
+            GlobalIndexerFactory globalIndexerFactory =
+                    GlobalIndexerFactoryUtils.load(indexFile.indexType());
+            GlobalIndexer globalIndexer =
+                    globalIndexerFactory.create(
+                            vectorColumn, table.coreOptions().toConfiguration());
+            GlobalIndexMeta meta = indexFile.globalIndexMeta();
+            Preconditions.checkNotNull(meta);
+            Path filePath =
+                    table.store().pathFactory().globalIndexFileFactory().toPath(indexFile);
+            GlobalIndexIOMeta globalIndexIOMeta =
+                    new GlobalIndexIOMeta(filePath, indexFile.fileSize(), meta.indexMeta());
+            @SuppressWarnings("resource")
+            FileIO fileIO = table.fileIO();
+            GlobalIndexFileReader indexFileReader = m -> fileIO.newInputStream(m.filePath());
+            try (GlobalIndexReader reader =
+                    globalIndexer.createReader(
+                            indexFileReader, singletonList(globalIndexIOMeta))) {
+                VectorSearch vectorSearch =
+                        new VectorSearch(vector, limit, vectorColumn.name())
+                                .withIncludeRowIds(includeRowIds);
+                return new OffsetGlobalIndexReader(
+                                reader, meta.rowRangeStart(), meta.rowRangeEnd())
+                        .visitVectorSearch(vectorSearch);
+            } catch (IOException e) {
+                // Keep the cause and name the file so the failing index can be identified.
+                throw new RuntimeException("Failed to read vector index file " + filePath, e);
+            }
+        }
+    }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/BitmapGlobalIndexTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/BitmapGlobalIndexTableTest.java
index 65ec24b9bb..574579965d 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/BitmapGlobalIndexTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/BitmapGlobalIndexTableTest.java
@@ -94,9 +94,11 @@ public class BitmapGlobalIndexTableTest extends
DataEvolutionTestBase {
.containsExactlyInAnyOrder(
new Range(200L, 200L), new Range(300L, 300L), new
Range(400L, 400L));
- DataEvolutionBatchScan scan = (DataEvolutionBatchScan) table.newScan();
RoaringNavigableMap64 finalRowIds = rowIds;
- scan.withGlobalIndexResult(GlobalIndexResult.create(() ->
finalRowIds));
+ DataEvolutionBatchScan scan =
+ (DataEvolutionBatchScan)
+ table.newScan()
+
.withGlobalIndexResult(GlobalIndexResult.create(() -> finalRowIds));
List<String> readF1 = new ArrayList<>();
table.newRead()
@@ -252,7 +254,7 @@ public class BitmapGlobalIndexTableTest extends
DataEvolutionTestBase {
for (Range range : ranges) {
try (RowRangeGlobalIndexScanner scanner =
indexScanBuilder.withRowRange(range).build()) {
- Optional<GlobalIndexResult> globalIndexResult =
scanner.scan(predicate, null);
+ Optional<GlobalIndexResult> globalIndexResult =
scanner.scan(predicate);
if (!globalIndexResult.isPresent()) {
throw new RuntimeException("Can't find index result by
scan");
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/BtreeGlobalIndexTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/BtreeGlobalIndexTableTest.java
index 145997206a..90d4e473c1 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/BtreeGlobalIndexTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/BtreeGlobalIndexTableTest.java
@@ -185,7 +185,7 @@ public class BtreeGlobalIndexTableTest extends
DataEvolutionTestBase {
for (Range range : ranges) {
try (RowRangeGlobalIndexScanner scanner =
indexScanBuilder.withRowRange(range).build()) {
- Optional<GlobalIndexResult> globalIndexResult =
scanner.scan(predicate, null);
+ Optional<GlobalIndexResult> globalIndexResult =
scanner.scan(predicate);
if (!globalIndexResult.isPresent()) {
throw new RuntimeException("Can't find index result by
scan");
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/VectorSearchBuilderTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/VectorSearchBuilderTest.java
new file mode 100644
index 0000000000..30f9c4b107
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/VectorSearchBuilderTest.java
@@ -0,0 +1,529 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.GenericArray;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.globalindex.GlobalIndexBuilderUtils;
+import org.apache.paimon.globalindex.GlobalIndexResult;
+import org.apache.paimon.globalindex.GlobalIndexSingletonWriter;
+import org.apache.paimon.globalindex.ResultEntry;
+import org.apache.paimon.globalindex.ScoredGlobalIndexResult;
+import org.apache.paimon.globalindex.testvector.TestVectorGlobalIndexerFactory;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Range;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link VectorSearchBuilder} using a test-only brute-force vector index. */
+public class VectorSearchBuilderTest extends TableTestBase {
+
+ private static final String VECTOR_FIELD_NAME = "vec";
+ private static final int DIMENSION = 2;
+
+ /**
+  * Builds the default test schema: an INT {@code id} column and a FLOAT array vector column,
+  * with bucket {@code -1}, row tracking and data evolution enabled, and the test vector index
+  * options set to {@link #DIMENSION} dimensions and the {@code l2} metric.
+  */
+ @Override
+ protected Schema schemaDefault() {
+     // Columns first ...
+     Schema.Builder withColumns =
+             Schema.newBuilder()
+                     .column("id", DataTypes.INT())
+                     .column(VECTOR_FIELD_NAME, new ArrayType(DataTypes.FLOAT()));
+     // ... then table options, followed by the options read by the test vector index.
+     Schema.Builder withOptions =
+             withColumns
+                     .option(CoreOptions.BUCKET.key(), "-1")
+                     .option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true")
+                     .option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true")
+                     .option("test.vector.dimension", String.valueOf(DIMENSION))
+                     .option("test.vector.metric", "l2");
+     return withOptions.build();
+ }
+
+ /**
+  * Builds one index over six vectors, runs a top-3 search and reads the matching rows back
+  * through a regular scan fed with the search result.
+  */
+ @Test
+ public void testVectorSearchEndToEnd() throws Exception {
+     createTableDefault();
+     FileStoreTable table = getTableDefault();
+
+     // Rows 0, 1 and 3 lie near (1.0, 0.0); rows 2, 4 and 5 lie near (0.0, 1.0).
+     float[][] vectors = {
+         {1.0f, 0.0f},
+         {0.95f, 0.1f},
+         {0.1f, 0.95f},
+         {0.98f, 0.05f},
+         {0.0f, 1.0f},
+         {0.05f, 0.98f}
+     };
+     writeVectors(table, vectors);
+     buildAndCommitIndex(table, vectors);
+
+     // The query is close to (1.0, 0.0), so rows 0, 1 and 3 are the expected neighbours.
+     VectorSearchBuilder searchBuilder =
+             table.newVectorSearchBuilder()
+                     .withVector(new float[] {0.85f, 0.15f})
+                     .withLimit(3)
+                     .withVectorColumn(VECTOR_FIELD_NAME);
+     GlobalIndexResult searchResult = searchBuilder.executeLocal();
+
+     assertThat(searchResult).isInstanceOf(ScoredGlobalIndexResult.class);
+     assertThat(searchResult.results().isEmpty()).isFalse();
+
+     // Plan a scan restricted by the search result and collect the ids of the rows read.
+     ReadBuilder readBuilder = table.newReadBuilder();
+     TableScan scan = readBuilder.newScan().withGlobalIndexResult(searchResult);
+     TableScan.Plan plan = scan.plan();
+     List<Integer> ids = new ArrayList<>();
+     try (RecordReader<InternalRow> reader = readBuilder.newRead().createReader(plan)) {
+         reader.forEachRemaining(row -> ids.add(row.getInt(0)));
+     }
+
+     assertThat(ids).isNotEmpty();
+     assertThat(ids.size()).isLessThanOrEqualTo(3);
+     // Row 0 (1.0, 0.0) is the vector the test expects to be nearest to (0.85, 0.15).
+     assertThat(ids).contains(0);
+ }
+
+ /** Searches a table whose test vector index uses the cosine metric and checks the top 2 rows. */
+ @Test
+ public void testVectorSearchWithCosineMetric() throws Exception {
+     // Same columns and table options as the default schema, but with the cosine metric.
+     Schema cosineSchema =
+             Schema.newBuilder()
+                     .column("id", DataTypes.INT())
+                     .column(VECTOR_FIELD_NAME, new ArrayType(DataTypes.FLOAT()))
+                     .option(CoreOptions.BUCKET.key(), "-1")
+                     .option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true")
+                     .option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true")
+                     .option("test.vector.dimension", String.valueOf(DIMENSION))
+                     .option("test.vector.metric", "cosine")
+                     .build();
+     catalog.createTable(identifier("cosine_table"), cosineSchema, false);
+     FileStoreTable table = getTable(identifier("cosine_table"));
+
+     float[][] vectors = {
+         {1.0f, 0.0f},
+         {0.707f, 0.707f},
+         {0.0f, 1.0f},
+     };
+     writeVectors(table, vectors);
+     buildAndCommitIndex(table, vectors);
+
+     // Query along the x-axis: expected order is (1,0), then (0.707,0.707), then (0,1).
+     VectorSearchBuilder searchBuilder =
+             table.newVectorSearchBuilder()
+                     .withVector(new float[] {1.0f, 0.0f})
+                     .withLimit(2)
+                     .withVectorColumn(VECTOR_FIELD_NAME);
+     GlobalIndexResult searchResult = searchBuilder.executeLocal();
+
+     assertThat(searchResult).isInstanceOf(ScoredGlobalIndexResult.class);
+
+     ReadBuilder readBuilder = table.newReadBuilder();
+     TableScan scan = readBuilder.newScan().withGlobalIndexResult(searchResult);
+     TableScan.Plan plan = scan.plan();
+     List<Integer> ids = new ArrayList<>();
+     try (RecordReader<InternalRow> reader = readBuilder.newRead().createReader(plan)) {
+         reader.forEachRemaining(row -> ids.add(row.getInt(0)));
+     }
+
+     assertThat(ids).hasSize(2);
+     // Row 0 (1,0) has cosine similarity 1.0 to the query; row 1 (0.707,0.707) has ~0.707.
+     assertThat(ids).contains(0, 1);
+ }
+
+ /** Searching a table that has data but no committed index yields an empty result. */
+ @Test
+ public void testVectorSearchEmptyResult() throws Exception {
+     createTableDefault();
+     FileStoreTable table = getTableDefault();
+
+     // Only rows are written; no index is built or committed for them.
+     writeVectors(table, new float[][] {{1.0f, 0.0f}, {0.0f, 1.0f}});
+
+     VectorSearchBuilder searchBuilder =
+             table.newVectorSearchBuilder()
+                     .withVector(new float[] {1.0f, 0.0f})
+                     .withLimit(1)
+                     .withVectorColumn(VECTOR_FIELD_NAME);
+     GlobalIndexResult searchResult = searchBuilder.executeLocal();
+
+     assertThat(searchResult.results().isEmpty()).isTrue();
+ }
+
+ /**
+  * Indexes 20 vectors spread along the unit circle and checks that a search with limit 5
+  * returns at least one and at most five rows.
+  */
+ @Test
+ public void testVectorSearchTopKLimit() throws Exception {
+     createTableDefault();
+     FileStoreTable table = getTableDefault();
+
+     // Unit vectors at angles 0, 0.3, 0.6, ... radians.
+     float[][] vectors = new float[20][];
+     for (int i = 0; i < vectors.length; i++) {
+         double angle = i * 0.3;
+         vectors[i] = new float[] {(float) Math.cos(angle), (float) Math.sin(angle)};
+     }
+
+     writeVectors(table, vectors);
+     buildAndCommitIndex(table, vectors);
+
+     // Search with limit=5
+     GlobalIndexResult result =
+             table.newVectorSearchBuilder()
+                     .withVector(new float[] {1.0f, 0.0f})
+                     .withLimit(5)
+                     .withVectorColumn(VECTOR_FIELD_NAME)
+                     .executeLocal();
+
+     ReadBuilder readBuilder = table.newReadBuilder();
+     TableScan.Plan plan = readBuilder.newScan().withGlobalIndexResult(result).plan();
+     List<Integer> ids = new ArrayList<>();
+     try (RecordReader<InternalRow> reader = readBuilder.newRead().createReader(plan)) {
+         reader.forEachRemaining(row -> ids.add(row.getInt(0)));
+     }
+
+     // Without this check the upper-bound assertion below would also pass on an empty result.
+     assertThat(ids).isNotEmpty();
+     assertThat(ids.size()).isLessThanOrEqualTo(5);
+ }
+
+ /**
+  * Splits the index for six rows into two index files and checks that a single search returns
+  * rows covered by both files.
+  */
+ @Test
+ public void testVectorSearchWithMultipleIndexFiles() throws Exception {
+     createTableDefault();
+     FileStoreTable table = getTableDefault();
+
+     float[][] allVectors = {
+         {1.0f, 0.0f}, // row 0 - close to (1,0)
+         {0.95f, 0.1f}, // row 1 - close to (1,0)
+         {0.1f, 0.95f}, // row 2 - far from (1,0)
+         {0.98f, 0.05f}, // row 3 - close to (1,0)
+         {0.0f, 1.0f}, // row 4 - far from (1,0)
+         {0.05f, 0.98f} // row 5 - far from (1,0)
+     };
+     writeVectors(table, allVectors);
+
+     // Two separate index files, each covering a different row range.
+     buildAndCommitMultipleIndexFiles(table, allVectors);
+
+     // The query is close to (1.0, 0.0); its nearest rows are spread over both index files.
+     VectorSearchBuilder searchBuilder =
+             table.newVectorSearchBuilder()
+                     .withVector(new float[] {0.85f, 0.15f})
+                     .withLimit(3)
+                     .withVectorColumn(VECTOR_FIELD_NAME);
+     GlobalIndexResult searchResult = searchBuilder.executeLocal();
+
+     assertThat(searchResult).isInstanceOf(ScoredGlobalIndexResult.class);
+     assertThat(searchResult.results().isEmpty()).isFalse();
+
+     ReadBuilder readBuilder = table.newReadBuilder();
+     TableScan scan = readBuilder.newScan().withGlobalIndexResult(searchResult);
+     TableScan.Plan plan = scan.plan();
+     List<Integer> ids = new ArrayList<>();
+     try (RecordReader<InternalRow> reader = readBuilder.newRead().createReader(plan)) {
+         reader.forEachRemaining(row -> ids.add(row.getInt(0)));
+     }
+
+     assertThat(ids).isNotEmpty();
+     assertThat(ids.size()).isLessThanOrEqualTo(3);
+     // Rows 0 (1.0,0.0), 1 (0.95,0.1) and 3 (0.98,0.05) are the closest to the query.
+     // Row 0 is covered by the first index file, row 3 by the second one.
+     assertThat(ids).contains(0);
+     assertThat(ids).containsAnyOf(1, 3);
+ }
+
+ @Test
+ public void testVectorSearchWithPartitionFilter() throws Exception {
+ // Create a partitioned table
+ catalog.createTable(
+ identifier("partitioned_table"),
+ Schema.newBuilder()
+ .column("pt", DataTypes.INT())
+ .column("id", DataTypes.INT())
+ .column(VECTOR_FIELD_NAME, new
ArrayType(DataTypes.FLOAT()))
+ .partitionKeys("pt")
+ .option(CoreOptions.BUCKET.key(), "-1")
+ .option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true")
+ .option(CoreOptions.DATA_EVOLUTION_ENABLED.key(),
"true")
+ .option("test.vector.dimension",
String.valueOf(DIMENSION))
+ .option("test.vector.metric", "l2")
+ .build(),
+ false);
+ FileStoreTable table = getTable(identifier("partitioned_table"));
+
+ // Partition 1: vectors close to (1,0)
+ float[][] pt1Vectors = {{1.0f, 0.0f}, {0.95f, 0.1f}, {0.98f, 0.05f}};
+ // Partition 2: vectors close to (0,1)
+ float[][] pt2Vectors = {{0.0f, 1.0f}, {0.1f, 0.95f}, {0.05f, 0.98f}};
+
+ writePartitionedVectors(table, 1, pt1Vectors);
+ writePartitionedVectors(table, 2, pt2Vectors);
+
+ RowType partitionType = RowType.of(DataTypes.INT());
+ InternalRowSerializer serializer = new
InternalRowSerializer(partitionType);
+ BinaryRow partition1 = serializer.toBinaryRow(GenericRow.of(1)).copy();
+ BinaryRow partition2 = serializer.toBinaryRow(GenericRow.of(2)).copy();
+
+ // Build and commit indexes with non-overlapping row ranges
+ buildAndCommitPartitionedIndex(table, pt1Vectors, partition1, new
Range(0, 2));
+ buildAndCommitPartitionedIndex(table, pt2Vectors, partition2, new
Range(3, 5));
+
+ float[] queryVector = {0.9f, 0.1f};
+
+ // Search with partition filter for partition 1 only
+ PartitionPredicate partFilter1 =
+ PartitionPredicate.fromMultiple(
+ partitionType, Collections.singletonList(partition1));
+ GlobalIndexResult result1 =
+ table.newVectorSearchBuilder()
+ .withPartitionFilter(partFilter1)
+ .withVector(queryVector)
+ .withLimit(3)
+ .withVectorColumn(VECTOR_FIELD_NAME)
+ .executeLocal();
+
+ assertThat(result1).isInstanceOf(ScoredGlobalIndexResult.class);
+ assertThat(result1.results().isEmpty()).isFalse();
+ // Row IDs should be within partition 1's row range [0, 2]
+ for (long rowId : result1.results()) {
+ assertThat(rowId).isBetween(0L, 2L);
+ }
+
+ // Search with partition filter for partition 2 only
+ PartitionPredicate partFilter2 =
+ PartitionPredicate.fromMultiple(
+ partitionType, Collections.singletonList(partition2));
+ GlobalIndexResult result2 =
+ table.newVectorSearchBuilder()
+ .withPartitionFilter(partFilter2)
+ .withVector(queryVector)
+ .withLimit(3)
+ .withVectorColumn(VECTOR_FIELD_NAME)
+ .executeLocal();
+
+ assertThat(result2).isInstanceOf(ScoredGlobalIndexResult.class);
+ assertThat(result2.results().isEmpty()).isFalse();
+ // Row IDs should be within partition 2's row range [3, 5]
+ for (long rowId : result2.results()) {
+ assertThat(rowId).isBetween(3L, 5L);
+ }
+
+ // Search without partition filter - returns results from both
partitions
+ GlobalIndexResult resultAll =
+ table.newVectorSearchBuilder()
+ .withVector(queryVector)
+ .withLimit(6)
+ .withVectorColumn(VECTOR_FIELD_NAME)
+ .executeLocal();
+
+ assertThat(resultAll.results().getIntCardinality())
+ .isEqualTo(
+ result1.results().getIntCardinality()
+ + result2.results().getIntCardinality());
+ }
+
+ // ====================== Helper methods ======================
+
+ /**
+  * Writes one row per vector in a single batch commit; each row's {@code id} column is the
+  * vector's position in {@code vectors}.
+  */
+ private void writeVectors(FileStoreTable table, float[][] vectors) throws Exception {
+     BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+     try (BatchTableWrite write = writeBuilder.newWrite();
+             BatchTableCommit commit = writeBuilder.newCommit()) {
+         int id = 0;
+         for (float[] vector : vectors) {
+             write.write(GenericRow.of(id, new GenericArray(vector)));
+             id++;
+         }
+         commit.commit(write.prepareCommit());
+     }
+ }
+
+ /**
+  * Builds a test vector index over all given vectors, registers it for the row range
+  * {@code [0, vectors.length - 1]} and commits the resulting index files in one commit.
+  */
+ private void buildAndCommitIndex(FileStoreTable table, float[][] vectors) throws Exception {
+     DataField vectorField = table.rowType().getField(VECTOR_FIELD_NAME);
+
+     // Feed the vectors to the index writer in row order.
+     GlobalIndexSingletonWriter indexWriter =
+             (GlobalIndexSingletonWriter)
+                     GlobalIndexBuilderUtils.createIndexWriter(
+                             table,
+                             TestVectorGlobalIndexerFactory.IDENTIFIER,
+                             vectorField,
+                             table.coreOptions().toConfiguration());
+     for (int i = 0; i < vectors.length; i++) {
+         indexWriter.write(vectors[i]);
+     }
+     List<ResultEntry> writtenEntries = indexWriter.finish();
+
+     // Turn the writer output into index file metas covering every written row.
+     List<IndexFileMeta> indexFiles =
+             GlobalIndexBuilderUtils.toIndexFileMetas(
+                     table.fileIO(),
+                     table.store().pathFactory().globalIndexFileFactory(),
+                     table.coreOptions(),
+                     new Range(0, vectors.length - 1),
+                     vectorField.id(),
+                     TestVectorGlobalIndexerFactory.IDENTIFIER,
+                     writtenEntries);
+
+     // Commit an index-only increment for the empty partition, bucket 0.
+     CommitMessage indexCommit =
+             new CommitMessageImpl(
+                     BinaryRow.EMPTY_ROW,
+                     0,
+                     null,
+                     DataIncrement.indexIncrement(indexFiles),
+                     CompactIncrement.emptyIncrement());
+     try (BatchTableCommit commit = table.newBatchWriteBuilder().newCommit()) {
+         commit.commit(Collections.singletonList(indexCommit));
+     }
+ }
+
+ /**
+  * Builds two test vector indexes, one over the first half of {@code vectors} and one over the
+  * rest, and commits the index files of both in a single commit.
+  */
+ private void buildAndCommitMultipleIndexFiles(FileStoreTable table, float[][] vectors)
+         throws Exception {
+     int mid = vectors.length / 2;
+
+     // First index covers rows [0, mid), second index covers rows [mid, vectors.length).
+     List<IndexFileMeta> allIndexFiles = new ArrayList<>();
+     allIndexFiles.addAll(buildIndexFilesForRows(table, vectors, 0, mid));
+     allIndexFiles.addAll(buildIndexFilesForRows(table, vectors, mid, vectors.length));
+
+     // Combine all index files and commit them together.
+     DataIncrement dataIncrement = DataIncrement.indexIncrement(allIndexFiles);
+     CommitMessage message =
+             new CommitMessageImpl(
+                     BinaryRow.EMPTY_ROW,
+                     0,
+                     null,
+                     dataIncrement,
+                     CompactIncrement.emptyIncrement());
+     try (BatchTableCommit commit = table.newBatchWriteBuilder().newCommit()) {
+         commit.commit(Collections.singletonList(message));
+     }
+ }
+
+ /**
+  * Builds a test vector index over {@code vectors[fromInclusive .. toExclusive - 1]} and returns
+  * its index file metas, registered for the row range {@code [fromInclusive, toExclusive - 1]}.
+  */
+ private List<IndexFileMeta> buildIndexFilesForRows(
+         FileStoreTable table, float[][] vectors, int fromInclusive, int toExclusive)
+         throws Exception {
+     Options options = table.coreOptions().toConfiguration();
+     DataField vectorField = table.rowType().getField(VECTOR_FIELD_NAME);
+
+     GlobalIndexSingletonWriter writer =
+             (GlobalIndexSingletonWriter)
+                     GlobalIndexBuilderUtils.createIndexWriter(
+                             table,
+                             TestVectorGlobalIndexerFactory.IDENTIFIER,
+                             vectorField,
+                             options);
+     for (int i = fromInclusive; i < toExclusive; i++) {
+         writer.write(vectors[i]);
+     }
+     List<ResultEntry> entries = writer.finish();
+
+     Range rowRange = new Range(fromInclusive, toExclusive - 1);
+     return GlobalIndexBuilderUtils.toIndexFileMetas(
+             table.fileIO(),
+             table.store().pathFactory().globalIndexFileFactory(),
+             table.coreOptions(),
+             rowRange,
+             vectorField.id(),
+             TestVectorGlobalIndexerFactory.IDENTIFIER,
+             entries);
+ }
+
+ /**
+  * Writes one row per vector into the given partition in a single batch commit; each row is
+  * {@code (partition, position in vectors, vector)}.
+  */
+ private void writePartitionedVectors(FileStoreTable table, int partition, float[][] vectors)
+         throws Exception {
+     BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+     try (BatchTableWrite write = writeBuilder.newWrite();
+             BatchTableCommit commit = writeBuilder.newCommit()) {
+         int id = 0;
+         for (float[] vector : vectors) {
+             write.write(GenericRow.of(partition, id, new GenericArray(vector)));
+             id++;
+         }
+         commit.commit(write.prepareCommit());
+     }
+ }
+
+ /**
+  * Builds a test vector index over the given vectors, registers it for {@code rowRange} and
+  * commits the resulting index files under the given partition.
+  */
+ private void buildAndCommitPartitionedIndex(
+         FileStoreTable table, float[][] vectors, BinaryRow partition, Range rowRange)
+         throws Exception {
+     DataField vectorField = table.rowType().getField(VECTOR_FIELD_NAME);
+
+     // Feed the vectors to the index writer in the order given.
+     GlobalIndexSingletonWriter indexWriter =
+             (GlobalIndexSingletonWriter)
+                     GlobalIndexBuilderUtils.createIndexWriter(
+                             table,
+                             TestVectorGlobalIndexerFactory.IDENTIFIER,
+                             vectorField,
+                             table.coreOptions().toConfiguration());
+     for (int i = 0; i < vectors.length; i++) {
+         indexWriter.write(vectors[i]);
+     }
+     List<ResultEntry> writtenEntries = indexWriter.finish();
+
+     List<IndexFileMeta> indexFiles =
+             GlobalIndexBuilderUtils.toIndexFileMetas(
+                     table.fileIO(),
+                     table.store().pathFactory().globalIndexFileFactory(),
+                     table.coreOptions(),
+                     rowRange,
+                     vectorField.id(),
+                     TestVectorGlobalIndexerFactory.IDENTIFIER,
+                     writtenEntries);
+
+     // Commit an index-only increment for the given partition, bucket 0.
+     CommitMessage indexCommit =
+             new CommitMessageImpl(
+                     partition,
+                     0,
+                     null,
+                     DataIncrement.indexIncrement(indexFiles),
+                     CompactIncrement.emptyIncrement());
+     try (BatchTableCommit commit = table.newBatchWriteBuilder().newCommit()) {
+         commit.commit(Collections.singletonList(indexCommit));
+     }
+ }
+}
diff --git
a/paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexReader.java
b/paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexReader.java
index 75e6b1a25e..d8492f0b13 100644
---
a/paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexReader.java
+++
b/paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexReader.java
@@ -22,6 +22,7 @@ import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.globalindex.GlobalIndexIOMeta;
import org.apache.paimon.globalindex.GlobalIndexReader;
import org.apache.paimon.globalindex.GlobalIndexResult;
+import org.apache.paimon.globalindex.ScoredGlobalIndexResult;
import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
import org.apache.paimon.predicate.FieldRef;
import org.apache.paimon.predicate.VectorSearch;
@@ -82,7 +83,7 @@ public class LuminaVectorGlobalIndexReader implements
GlobalIndexReader {
}
@Override
- public Optional<GlobalIndexResult> visitVectorSearch(VectorSearch
vectorSearch) {
+ public Optional<ScoredGlobalIndexResult> visitVectorSearch(VectorSearch
vectorSearch) {
try {
ensureLoaded();
return Optional.ofNullable(search(vectorSearch));
@@ -95,9 +96,9 @@ public class LuminaVectorGlobalIndexReader implements
GlobalIndexReader {
}
}
- private GlobalIndexResult search(VectorSearch vectorSearch) throws
IOException {
+ private ScoredGlobalIndexResult search(VectorSearch vectorSearch) throws
IOException {
validateSearchVector(vectorSearch.vector());
- float[] queryVector = ((float[]) vectorSearch.vector()).clone();
+ float[] queryVector = vectorSearch.vector().clone();
int limit = vectorSearch.limit();
LuminaVectorMetric indexMetric = indexMeta.metric();
diff --git
a/paimon-lumina/src/test/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexScanTest.java
b/paimon-lumina/src/test/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexScanTest.java
index 2e0696c9ff..52100fb2e1 100644
---
a/paimon-lumina/src/test/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexScanTest.java
+++
b/paimon-lumina/src/test/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexScanTest.java
@@ -26,6 +26,7 @@ import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.globalindex.GlobalIndexResult;
import org.apache.paimon.globalindex.ResultEntry;
import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
import org.apache.paimon.index.GlobalIndexMeta;
@@ -33,7 +34,6 @@ import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.options.Options;
-import org.apache.paimon.predicate.VectorSearch;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
@@ -45,6 +45,7 @@ import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.table.source.VectorSearchBuilder;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
@@ -130,9 +131,15 @@ public class LuminaVectorGlobalIndexScanTest {
commitIndex(indexFiles);
float[] queryVector = new float[] {0.85f, 0.15f};
- VectorSearch vectorSearch = new VectorSearch(queryVector, 3,
vectorFieldName);
- ReadBuilder readBuilder =
table.newReadBuilder().withVectorSearch(vectorSearch);
- TableScan scan = readBuilder.newScan();
+ VectorSearchBuilder vectorBuilder =
+ table.newVectorSearchBuilder()
+ .withVector(queryVector)
+ .withLimit(3)
+ .withVectorColumn(vectorFieldName);
+ GlobalIndexResult indexResult =
+
vectorBuilder.newVectorRead().read(vectorBuilder.newVectorScan().scan());
+ ReadBuilder readBuilder = table.newReadBuilder();
+ TableScan scan =
readBuilder.newScan().withGlobalIndexResult(indexResult);
List<Integer> ids = new ArrayList<>();
readBuilder
.newRead()
@@ -239,9 +246,15 @@ public class LuminaVectorGlobalIndexScanTest {
ipTable.newCommit(ipCommitUser).commit(1,
Collections.singletonList(message));
float[] queryVector = new float[] {1.0f, 0.0f};
- VectorSearch vectorSearch = new VectorSearch(queryVector, 3,
vectorFieldName);
- ReadBuilder readBuilder =
ipTable.newReadBuilder().withVectorSearch(vectorSearch);
- TableScan scan = readBuilder.newScan();
+ VectorSearchBuilder vectorBuilder =
+ ipTable.newVectorSearchBuilder()
+ .withVector(queryVector)
+ .withLimit(3)
+ .withVectorColumn(vectorFieldName);
+ GlobalIndexResult indexResult =
+
vectorBuilder.newVectorRead().read(vectorBuilder.newVectorScan().scan());
+ ReadBuilder readBuilder = ipTable.newReadBuilder();
+ TableScan scan =
readBuilder.newScan().withGlobalIndexResult(indexResult);
List<Integer> ids = new ArrayList<>();
readBuilder
.newRead()
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
index 5ebab7d2fb..bd884d64b1 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
@@ -18,6 +18,9 @@
package org.apache.paimon.spark
+import org.apache.paimon.globalindex.GlobalIndexResult
+import org.apache.paimon.partition.PartitionPredicate
+import org.apache.paimon.predicate.{Predicate, PredicateBuilder}
import org.apache.paimon.spark.metric.SparkMetricRegistry
import org.apache.paimon.spark.read.{BaseScan, PaimonSupportsRuntimeFiltering}
import org.apache.paimon.spark.sources.PaimonMicroBatchStream
@@ -42,6 +45,7 @@ abstract class PaimonBaseScan(table: InnerTable)
protected def getInputSplits: Array[Split] = {
readBuilder
.newScan()
+ .withGlobalIndexResult(evalVectorSearch())
.asInstanceOf[InnerTableScan]
.withMetricRegistry(paimonMetricsRegistry)
.plan()
@@ -50,6 +54,26 @@ abstract class PaimonBaseScan(table: InnerTable)
.toArray
}
+ /**
+  * Evaluates the pushed-down vector search, if there is one, into a global index result.
+  *
+  * Pushed partition filters and pushed data filters are each combined with `and` and handed to
+  * the vector search builder before the search is executed. Returns `null` when no vector
+  * search was pushed down.
+  */
+ private def evalVectorSearch(): GlobalIndexResult = {
+   if (pushedVectorSearch.isEmpty) {
+     null
+   } else {
+     val search = pushedVectorSearch.get
+     val builder = table
+       .newVectorSearchBuilder()
+       .withVector(search.vector())
+       .withVectorColumn(search.fieldName())
+       .withLimit(search.limit())
+     if (pushedPartitionFilters.nonEmpty) {
+       builder.withPartitionFilter(PartitionPredicate.and(pushedPartitionFilters.asJava))
+     }
+     if (pushedDataFilters.nonEmpty) {
+       builder.withFilter(PredicateBuilder.and(pushedDataFilters.asJava))
+     }
+     // Scan for the vector index splits, then read them to get the search result.
+     val vectorRead = builder.newVectorRead()
+     vectorRead.read(builder.newVectorScan().scan())
+   }
+ }
+
override def toBatch: Batch = {
ensureNoFullScan()
super.toBatch
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BaseScan.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BaseScan.scala
index 78cde8752e..5260ef8e09 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BaseScan.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BaseScan.scala
@@ -111,7 +111,6 @@ trait BaseScan extends Scan with SupportsReportStatistics
with Logging {
}
pushedLimit.foreach(_readBuilder.withLimit)
pushedTopN.foreach(_readBuilder.withTopN)
- pushedVectorSearch.foreach(_readBuilder.withVectorSearch)
_readBuilder.dropStats()
}
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BaseVectorSearchPushDownTest.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BaseVectorSearchPushDownTest.scala
deleted file mode 100644
index c283326cf3..0000000000
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BaseVectorSearchPushDownTest.scala
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.spark.sql
-
-import org.apache.paimon.spark.PaimonSparkTestBase
-
-import org.apache.spark.sql.streaming.StreamTest
-
-/** Tests for vector search table-valued function. */
-class BaseVectorSearchPushDownTest extends PaimonSparkTestBase with StreamTest
{
-
- test("vector_search table function basic syntax") {
- withTable("T") {
- spark.sql("""
- |CREATE TABLE T (id INT, v ARRAY<FLOAT>)
- |TBLPROPERTIES (
- | 'bucket' = '-1',
- | 'row-tracking.enabled' = 'true',
- | 'data-evolution.enabled' = 'true')
- |""".stripMargin)
-
- // Insert data with known vectors
- spark.sql("""
- |INSERT INTO T VALUES
- |(1, array(1.0, 0.0, 0.0)),
- |(2, array(0.0, 1.0, 0.0)),
- |(3, array(0.0, 0.0, 1.0)),
- |(4, array(1.0, 1.0, 0.0)),
- |(5, array(1.0, 1.0, 1.0))
- |""".stripMargin)
-
- // Test vector_search table function syntax
- // Note: Without a global vector index, this will scan all rows
- val result = spark
- .sql("""
- |SELECT * FROM vector_search('T', 'v', array(1.0f, 0.0f, 0.0f),
3)
- |""".stripMargin)
- .collect()
-
- // Should return results (actual filtering depends on vector index)
- assert(result.nonEmpty)
-
- // Test invalid limit (negative)
- val ex1 = intercept[Exception] {
- spark
- .sql("""
- |SELECT * FROM vector_search('T', 'v', array(1.0f, 0.0f,
0.0f), -3)
- |""".stripMargin)
- .collect()
- }
- assert(ex1.getMessage.contains("Limit must be a positive integer"))
-
- // Test invalid limit (zero)
- val ex2 = intercept[Exception] {
- spark
- .sql("""
- |SELECT * FROM vector_search('T', 'v', array(1.0f, 0.0f,
0.0f), 0)
- |""".stripMargin)
- .collect()
- }
- assert(ex2.getMessage.contains("Limit must be a positive integer"))
-
- // Test missing parameters
- val ex3 = intercept[Exception] {
- spark
- .sql("""
- |SELECT * FROM vector_search('T', 'v', array(1.0f, 0.0f,
0.0f))
- |""".stripMargin)
- .collect()
- }
- assert(ex3.getMessage.contains("vector_search needs three parameters
after table_name"))
-
- // Test non-existent column
- val ex4 = intercept[Exception] {
- spark
- .sql("""
- |SELECT * FROM vector_search('T', 'non_existent_col',
array(1.0f, 0.0f, 0.0f), 3)
- |""".stripMargin)
- .collect()
- }
- assert(ex4.getMessage.nonEmpty)
- }
- }
-}