This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f5b9decbff [index] support lumina index in python (#7579)
f5b9decbff is described below
commit f5b9decbff14751b83fd029f8346928c951166af
Author: jerry <[email protected]>
AuthorDate: Tue Apr 7 21:27:49 2026 +0800
[index] support lumina index in python (#7579)
Add Lumina vector index read/search support to paimon-python.
Usage example:
```python
from pypaimon import CatalogFactory
catalog = CatalogFactory.create({'warehouse': '/path/to/warehouse'})
table = catalog.get_table('default.my_table')
# Step 1: Vector search — find top-5 nearest neighbors
builder = table.new_vector_search_builder()
builder.with_vector_column('embedding')
builder.with_query_vector([0.1, 0.2, 0.3, 0.4])
builder.with_limit(5)
result = builder.execute_local()
# Step 2: Read the matching rows
read_builder = table.new_read_builder()
scan = read_builder.new_scan().with_global_index_result(result)
plan = scan.plan()
arrow_table = read_builder.new_read().to_arrow(plan.splits())
print(arrow_table.to_pandas())
```
---
.github/workflows/paimon-python-checks.yml | 5 +
paimon-lumina/pom.xml | 16 ++
.../paimon/lumina/index/JavaPyLuminaE2ETest.java | 215 +++++++++++++++++++++
paimon-python/dev/requirements-dev.txt | 2 +
paimon-python/dev/run_mixed_tests.sh | 40 +++-
.../pypaimon/globalindex/global_index_scanner.py | 4 -
.../globalindex/lumina/__init__.py} | 18 +-
.../globalindex/lumina/lumina_index_meta.py | 75 +++++++
.../lumina/lumina_vector_global_index_reader.py | 201 +++++++++++++++++++
.../lumina/lumina_vector_index_options.py | 61 ++++++
paimon-python/pypaimon/table/file_store_table.py | 4 +
.../pypaimon/table/format/format_table.py | 3 +
.../pypaimon/table/iceberg/iceberg_table.py | 3 +
.../pypaimon/table/object/object_table.py | 5 +
.../pypaimon/table/source/vector_search_builder.py | 111 +++++++++++
.../pypaimon/table/source/vector_search_read.py | 117 +++++++++++
.../pypaimon/table/source/vector_search_scan.py | 95 +++++++++
.../table/source/vector_search_split.py} | 24 ++-
paimon-python/pypaimon/table/table.py | 5 +
.../pypaimon/tests/e2e/java_py_read_write_test.py | 31 +++
.../pypaimon/tests/lumina_vector_index_test.py | 165 ++++++++++++++++
paimon-python/setup.py | 3 +
22 files changed, 1180 insertions(+), 23 deletions(-)
diff --git a/.github/workflows/paimon-python-checks.yml
b/.github/workflows/paimon-python-checks.yml
index fde69953ee..5302d1267d 100755
--- a/.github/workflows/paimon-python-checks.yml
+++ b/.github/workflows/paimon-python-checks.yml
@@ -33,6 +33,7 @@ on:
env:
JDK_VERSION: 8
MAVEN_OPTS: -Dmaven.wagon.httpconnectionManager.ttlSeconds=30
-Dmaven.wagon.http.retryHandler.requestSentEnabled=true
+ LUMINA_DATA_VERSION: 0.1.0
concurrency:
@@ -127,10 +128,12 @@ jobs:
python -m pip install --upgrade pip==21.3.1
python --version
python -m pip install --no-cache-dir pyroaring
readerwriterlock==1.0.9 'fsspec==2021.10.1' 'cachetools==4.2.4'
'ossfs==2021.8.0' pyarrow==6.0.1 pandas==1.1.5 'polars==0.9.12'
'fastavro==1.4.7' zstandard==0.19.0 dataclasses==0.8.0 flake8 pytest
py4j==0.10.9.9 requests parameterized==0.8.1 2>&1 >/dev/null
+ python -m pip install 'lumina-data>=${{ env.LUMINA_DATA_VERSION
}}' -i https://pypi.org/simple/
else
python -m pip install --upgrade pip
pip install torch --index-url https://download.pytorch.org/whl/cpu
python -m pip install pyroaring readerwriterlock==1.0.9
fsspec==2024.3.1 cachetools==5.3.3 ossfs==2023.12.0 ray==2.48.0
fastavro==1.11.1 pyarrow==16.0.0 zstandard==0.24.0 polars==1.32.0 duckdb==1.3.2
numpy==1.24.3 pandas==2.0.3 pylance==0.39.0 cramjam flake8==4.0.1 pytest~=7.0
py4j==0.10.9.9 requests parameterized==0.9.0
+ python -m pip install 'lumina-data>=${{ env.LUMINA_DATA_VERSION
}}' -i https://pypi.org/simple/
if python -c "import sys; sys.exit(0 if sys.version_info >= (3,
11) else 1)"; then
python -m pip install vortex-data
fi
@@ -190,6 +193,7 @@ jobs:
python -m pip install --upgrade pip
pip install torch --index-url https://download.pytorch.org/whl/cpu
python -m pip install pyroaring readerwriterlock==1.0.9
fsspec==2024.3.1 cachetools==5.3.3 ossfs==2023.12.0 ray==2.48.0
fastavro==1.11.1 pyarrow==16.0.0 zstandard==0.24.0 polars==1.32.0 duckdb==1.3.2
numpy==1.24.3 pandas==2.0.3 pylance==0.39.0 flake8==4.0.1 pytest~=7.0
py4j==0.10.9.9 requests parameterized==0.9.0
+ python -m pip install 'lumina-data>=${{ env.LUMINA_DATA_VERSION
}}' -i https://pypi.org/simple/
- name: Run lint-python.sh
shell: bash
run: |
@@ -288,6 +292,7 @@ jobs:
fastavro==1.11.1 pyarrow==16.0.0 zstandard==0.24.0 polars==1.32.0
duckdb==1.3.2 \
numpy==1.24.3 pandas==2.0.3 cramjam pytest~=7.0 py4j==0.10.9.9
requests \
parameterized==0.9.0 packaging
+ python -m pip install 'lumina-data>=${{ env.LUMINA_DATA_VERSION }}'
-i https://pypi.org/simple/
- name: Test Ray version compatibility
run: |
cd paimon-python
diff --git a/paimon-lumina/pom.xml b/paimon-lumina/pom.xml
index 4ca2e1cf12..27bebee080 100644
--- a/paimon-lumina/pom.xml
+++ b/paimon-lumina/pom.xml
@@ -77,6 +77,22 @@ under the License.
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-core</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-format</artifactId>
diff --git
a/paimon-lumina/src/test/java/org/apache/paimon/lumina/index/JavaPyLuminaE2ETest.java
b/paimon-lumina/src/test/java/org/apache/paimon/lumina/index/JavaPyLuminaE2ETest.java
new file mode 100644
index 0000000000..dd0212bda9
--- /dev/null
+++
b/paimon-lumina/src/test/java/org/apache/paimon/lumina/index/JavaPyLuminaE2ETest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lumina.index;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.GenericArray;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.fs.FileIOFinder;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.globalindex.GlobalIndexBuilderUtils;
+import org.apache.paimon.globalindex.GlobalIndexSingletonWriter;
+import org.apache.paimon.globalindex.ResultEntry;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.AppendOnlyFileStoreTable;
+import org.apache.paimon.table.CatalogEnvironment;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Range;
+
+import org.aliyun.lumina.Lumina;
+import org.aliyun.lumina.LuminaException;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
+
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.paimon.CoreOptions.DATA_EVOLUTION_ENABLED;
+import static org.apache.paimon.CoreOptions.GLOBAL_INDEX_ENABLED;
+import static org.apache.paimon.CoreOptions.PATH;
+import static org.apache.paimon.CoreOptions.ROW_TRACKING_ENABLED;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
+
+/**
+ * Mixed language E2E test for Java Lumina vector index building and Python
reading.
+ *
+ * <p>Java writes data and builds a Lumina vector index, then Python reads and
searches it.
+ */
+public class JavaPyLuminaE2ETest {
+
+ @BeforeAll
+ public static void checkNativeLibrary() {
+ if (!Lumina.isLibraryLoaded()) {
+ try {
+ Lumina.loadLibrary();
+ } catch (LuminaException e) {
+ assumeTrue(false, "Lumina native library not available: " +
e.getMessage());
+ }
+ }
+ }
+
+ java.nio.file.Path tempDir =
Paths.get("../paimon-python/pypaimon/tests/e2e").toAbsolutePath();
+
+ protected Path warehouse;
+
+ @BeforeEach
+ public void before() throws Exception {
+ if (!Files.exists(tempDir.resolve("warehouse"))) {
+ Files.createDirectories(tempDir.resolve("warehouse"));
+ }
+ warehouse = new Path("file://" + tempDir.resolve("warehouse"));
+ }
+
+ @Test
+ @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
+ public void testLuminaVectorIndexWrite() throws Exception {
+ String tableName = "test_lumina_vector";
+ Path tablePath = new Path(warehouse.toString() + "/default.db/" +
tableName);
+
+ int dimension = 4;
+
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.INT(), new ArrayType(new
FloatType())},
+ new String[] {"id", "embedding"});
+
+ Options options = new Options();
+ options.set(PATH, tablePath.toString());
+ options.set(ROW_TRACKING_ENABLED, true);
+ options.set(DATA_EVOLUTION_ENABLED, true);
+ options.set(GLOBAL_INDEX_ENABLED, true);
+ options.setString(LuminaVectorIndexOptions.DIMENSION.key(),
String.valueOf(dimension));
+ options.setString(LuminaVectorIndexOptions.DISTANCE_METRIC.key(),
"l2");
+ options.setString(LuminaVectorIndexOptions.ENCODING_TYPE.key(),
"rawf32");
+
+ TableSchema tableSchema =
+ SchemaUtils.forceCommit(
+ new SchemaManager(LocalFileIO.create(), tablePath),
+ new Schema(
+ rowType.getFields(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ options.toMap(),
+ ""));
+
+ AppendOnlyFileStoreTable table =
+ new AppendOnlyFileStoreTable(
+ FileIOFinder.find(tablePath),
+ tablePath,
+ tableSchema,
+ CatalogEnvironment.empty());
+
+ // Test vectors: 6 vectors of dimension 4
+ float[][] vectors =
+ new float[][] {
+ new float[] {1.0f, 0.0f, 0.0f, 0.0f},
+ new float[] {0.9f, 0.1f, 0.0f, 0.0f},
+ new float[] {0.0f, 1.0f, 0.0f, 0.0f},
+ new float[] {0.0f, 0.0f, 1.0f, 0.0f},
+ new float[] {0.0f, 0.0f, 0.0f, 1.0f},
+ new float[] {0.95f, 0.05f, 0.0f, 0.0f}
+ };
+
+ // Write data rows
+ BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+ try (BatchTableWrite write = writeBuilder.newWrite();
+ BatchTableCommit commit = writeBuilder.newCommit()) {
+ for (int i = 0; i < vectors.length; i++) {
+ write.write(GenericRow.of(i, new GenericArray(vectors[i])));
+ }
+ commit.commit(write.prepareCommit());
+ }
+
+ // Build Lumina vector index on "embedding" column
+ DataField embeddingField = table.rowType().getField("embedding");
+ Options indexOptions = table.coreOptions().toConfiguration();
+ LuminaVectorIndexOptions luminaOptions = new
LuminaVectorIndexOptions(indexOptions);
+
+ GlobalIndexSingletonWriter writer =
+ (GlobalIndexSingletonWriter)
+ GlobalIndexBuilderUtils.createIndexWriter(
+ table,
+ LuminaVectorGlobalIndexerFactory.IDENTIFIER,
+ embeddingField,
+ indexOptions);
+
+ // Write vectors to index
+ for (float[] vec : vectors) {
+ writer.write(vec);
+ }
+
+ List<ResultEntry> entries = writer.finish();
+ assertThat(entries).hasSize(1);
+ assertThat(entries.get(0).rowCount()).isEqualTo(vectors.length);
+
+ Range rowRange = new Range(0, vectors.length - 1);
+ List<IndexFileMeta> indexFiles =
+ GlobalIndexBuilderUtils.toIndexFileMetas(
+ table.fileIO(),
+ table.store().pathFactory().globalIndexFileFactory(),
+ table.coreOptions(),
+ rowRange,
+ embeddingField.id(),
+ LuminaVectorGlobalIndexerFactory.IDENTIFIER,
+ entries);
+
+ // Commit the index
+ DataIncrement dataIncrement = DataIncrement.indexIncrement(indexFiles);
+ CommitMessage message =
+ new CommitMessageImpl(
+ BinaryRow.EMPTY_ROW,
+ 0,
+ null,
+ dataIncrement,
+ CompactIncrement.emptyIncrement());
+ try (BatchTableCommit commit = writeBuilder.newCommit()) {
+ commit.commit(Collections.singletonList(message));
+ }
+
+ // Verify index was committed
+ List<org.apache.paimon.manifest.IndexManifestEntry> indexEntries =
+
table.indexManifestFileReader().read(table.latestSnapshot().get().indexManifest());
+ assertThat(indexEntries).hasSize(1);
+ assertThat(indexEntries.get(0).indexFile().indexType())
+ .isEqualTo(LuminaVectorGlobalIndexerFactory.IDENTIFIER);
+ }
+}
diff --git a/paimon-python/dev/requirements-dev.txt
b/paimon-python/dev/requirements-dev.txt
index 564b85b8c5..4ab78afd67 100644
--- a/paimon-python/dev/requirements-dev.txt
+++ b/paimon-python/dev/requirements-dev.txt
@@ -25,3 +25,5 @@ pytest~=7.0
ray>=2.10.0
requests
parameterized
+# Lumina vector search (optional, for lumina index tests)
+lumina-data>=0.1.0
diff --git a/paimon-python/dev/run_mixed_tests.sh
b/paimon-python/dev/run_mixed_tests.sh
index 839a750780..f98bf32747 100755
--- a/paimon-python/dev/run_mixed_tests.sh
+++ b/paimon-python/dev/run_mixed_tests.sh
@@ -266,6 +266,30 @@ run_tantivy_fulltext_test() {
fi
}
+# Function to run Lumina vector index test (Java write index, Python read and
search)
+run_lumina_vector_test() {
+ echo -e "${YELLOW}=== Step 9: Running Lumina Vector Index Test (Java
Write, Python Read) ===${NC}"
+
+ cd "$PROJECT_ROOT"
+
+ echo "Running Maven test for
JavaPyLuminaE2ETest.testLuminaVectorIndexWrite..."
+ if mvn test
-Dtest=org.apache.paimon.lumina.index.JavaPyLuminaE2ETest#testLuminaVectorIndexWrite
-pl paimon-lumina -q -Drun.e2e.tests=true; then
+ echo -e "${GREEN}✓ Java test completed successfully${NC}"
+ else
+ echo -e "${RED}✗ Java test failed${NC}"
+ return 1
+ fi
+ cd "$PAIMON_PYTHON_DIR"
+ echo "Running Python test for
JavaPyReadWriteTest.test_read_lumina_vector_index..."
+ if python -m pytest
java_py_read_write_test.py::JavaPyReadWriteTest::test_read_lumina_vector_index
-v; then
+ echo -e "${GREEN}✓ Python test completed successfully${NC}"
+ return 0
+ else
+ echo -e "${RED}✗ Python test failed${NC}"
+ return 1
+ fi
+}
+
# Main execution
main() {
local java_write_result=0
@@ -276,6 +300,7 @@ main() {
local btree_index_result=0
local compressed_text_result=0
local tantivy_fulltext_result=0
+ local lumina_vector_result=0
# Detect Python version
PYTHON_VERSION=$(python -c "import sys;
print(f'{sys.version_info.major}.{sys.version_info.minor}')" 2>/dev/null ||
echo "unknown")
@@ -351,6 +376,13 @@ main() {
echo ""
+ # Run Lumina vector index test (Java write, Python read)
+ if ! run_lumina_vector_test; then
+ lumina_vector_result=1
+ fi
+
+ echo ""
+
echo -e "${YELLOW}=== Test Results Summary ===${NC}"
if [[ $java_write_result -eq 0 ]]; then
@@ -401,12 +433,18 @@ main() {
echo -e "${RED}✗ Tantivy Full-Text Index Test (Java Write, Python
Read): FAILED${NC}"
fi
+ if [[ $lumina_vector_result -eq 0 ]]; then
+ echo -e "${GREEN}✓ Lumina Vector Index Test (Java Write, Python Read):
PASSED${NC}"
+ else
+ echo -e "${RED}✗ Lumina Vector Index Test (Java Write, Python Read):
FAILED${NC}"
+ fi
+
echo ""
# Clean up warehouse directory after all tests
cleanup_warehouse
- if [[ $java_write_result -eq 0 && $python_read_result -eq 0 &&
$python_write_result -eq 0 && $java_read_result -eq 0 && $pk_dv_result -eq 0 &&
$btree_index_result -eq 0 && $compressed_text_result -eq 0 &&
$tantivy_fulltext_result -eq 0 ]]; then
+ if [[ $java_write_result -eq 0 && $python_read_result -eq 0 &&
$python_write_result -eq 0 && $java_read_result -eq 0 && $pk_dv_result -eq 0 &&
$btree_index_result -eq 0 && $compressed_text_result -eq 0 &&
$tantivy_fulltext_result -eq 0 && $lumina_vector_result -eq 0 ]]; then
echo -e "${GREEN}🎉 All tests passed! Java-Python interoperability
verified.${NC}"
return 0
else
diff --git a/paimon-python/pypaimon/globalindex/global_index_scanner.py
b/paimon-python/pypaimon/globalindex/global_index_scanner.py
index 515600d77f..19fba0cdeb 100644
--- a/paimon-python/pypaimon/globalindex/global_index_scanner.py
+++ b/paimon-python/pypaimon/globalindex/global_index_scanner.py
@@ -35,13 +35,11 @@ class GlobalIndexScanner:
def __init__(
self,
- options: dict,
fields: list,
file_io,
index_path: str,
index_files: Collection['IndexFileMeta']
):
- self._options = options
self._evaluator = self._create_evaluator(fields, file_io, index_path,
index_files)
def _create_evaluator(self, fields, file_io, index_path, index_files):
@@ -89,7 +87,6 @@ class GlobalIndexScanner:
if len(index_files) == 0:
return None
return GlobalIndexScanner(
- options=table.table_schema.options,
fields=table.fields,
file_io=table.file_io,
index_path=table.path_factory().global_index_path_factory().index_path(),
@@ -122,7 +119,6 @@ class GlobalIndexScanner:
if len(scanned_index_files) == 0:
return None
return GlobalIndexScanner(
- options=table.table_schema.options,
fields=table.fields,
file_io=table.file_io,
index_path=table.path_factory().global_index_path_factory().index_path(),
diff --git a/paimon-python/dev/requirements-dev.txt
b/paimon-python/pypaimon/globalindex/lumina/__init__.py
similarity index 78%
copy from paimon-python/dev/requirements-dev.txt
copy to paimon-python/pypaimon/globalindex/lumina/__init__.py
index 564b85b8c5..26cd4367bc 100644
--- a/paimon-python/dev/requirements-dev.txt
+++ b/paimon-python/pypaimon/globalindex/lumina/__init__.py
@@ -15,13 +15,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-# Core dependencies for pypaimon are in requirements.txt
-# Test dependencies for pypaimon are as follows
-duckdb==1.3.2
-flake8==4.0.1
-pytest~=7.0
-# Ray: 2.48+ has no wheel for Python 3.8; use 2.10.0 on 3.8, 2.48.0 on 3.9+
-ray>=2.10.0
-requests
-parameterized
+from pypaimon.globalindex.lumina.lumina_vector_global_index_reader import (
+ LuminaVectorGlobalIndexReader,
+ LUMINA_VECTOR_ANN_IDENTIFIER,
+)
+
+__all__ = [
+ 'LuminaVectorGlobalIndexReader',
+ 'LUMINA_VECTOR_ANN_IDENTIFIER',
+]
diff --git a/paimon-python/pypaimon/globalindex/lumina/lumina_index_meta.py
b/paimon-python/pypaimon/globalindex/lumina/lumina_index_meta.py
new file mode 100644
index 0000000000..323925613e
--- /dev/null
+++ b/paimon-python/pypaimon/globalindex/lumina/lumina_index_meta.py
@@ -0,0 +1,75 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Lumina index metadata serialization.
+
+Serialized as a flat JSON ``Map<String, String>`` whose keys are native Lumina
+option keys (with the ``lumina.`` prefix stripped). This matches the format
+used by both paimon-lumina (Java) and paimon-cpp, so indexes built by any
+implementation can be read by all of them.
+"""
+
+import json
+
+
+class LuminaIndexMeta:
+ """Metadata for a Lumina vector index file."""
+
+ KEY_DIMENSION = "index.dimension"
+ KEY_DISTANCE_METRIC = "distance.metric"
+ KEY_INDEX_TYPE = "index.type"
+
+ def __init__(self, options):
+ self._options = dict(options)
+
+ @property
+ def options(self):
+ return self._options
+
+ @property
+ def dim(self):
+ return int(self._options[self.KEY_DIMENSION])
+
+ @property
+ def distance_metric(self):
+ return self._options[self.KEY_DISTANCE_METRIC]
+
+ @property
+ def metric(self):
+ from lumina_data import MetricType
+ return MetricType.from_lumina_name(self.distance_metric)
+
+ @property
+ def index_type(self):
+ return self._options.get(self.KEY_INDEX_TYPE, "diskann")
+
+ def serialize(self):
+ """Serialize to UTF-8 JSON bytes."""
+ return json.dumps(self._options).encode("utf-8")
+
+ @staticmethod
+ def deserialize(data):
+ """Deserialize from UTF-8 JSON bytes."""
+ options = json.loads(data.decode("utf-8"))
+ if LuminaIndexMeta.KEY_DIMENSION not in options:
+ raise ValueError(
+ "Missing required key: %s" % LuminaIndexMeta.KEY_DIMENSION)
+ if LuminaIndexMeta.KEY_DISTANCE_METRIC not in options:
+ raise ValueError(
+ "Missing required key: %s" %
LuminaIndexMeta.KEY_DISTANCE_METRIC)
+ return LuminaIndexMeta(options)
diff --git
a/paimon-python/pypaimon/globalindex/lumina/lumina_vector_global_index_reader.py
b/paimon-python/pypaimon/globalindex/lumina/lumina_vector_global_index_reader.py
new file mode 100644
index 0000000000..3e65f910fc
--- /dev/null
+++
b/paimon-python/pypaimon/globalindex/lumina/lumina_vector_global_index_reader.py
@@ -0,0 +1,201 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Vector global index reader using Lumina (via lumina-data SDK).
+
+Each shard has exactly one Lumina index file. This reader lazy-loads the
+index and performs vector similarity search using the lumina-data SDK.
+"""
+
+import os
+
+from pypaimon.globalindex.global_index_reader import GlobalIndexReader
+from pypaimon.globalindex.vector_search_result import
DictBasedScoredIndexResult
+
+LUMINA_VECTOR_ANN_IDENTIFIER = "lumina-vector-ann"
+
+MIN_SEARCH_LIST_SIZE = 16
+
+
+def _ensure_search_list_size(search_options, top_k):
+ """Set diskann.search.list_size when not explicitly configured."""
+ if "diskann.search.list_size" not in search_options:
+ list_size = max(int(top_k * 1.5), MIN_SEARCH_LIST_SIZE)
+ search_options["diskann.search.list_size"] = str(list_size)
+
+
+class LuminaVectorGlobalIndexReader(GlobalIndexReader):
+ """Vector global index reader using Lumina."""
+
+ def __init__(self, file_io, index_path, io_metas, options=None):
+ assert len(io_metas) == 1, "Expected exactly one index file per shard"
+ self._file_io = file_io
+ self._index_path = index_path
+ self._io_meta = io_metas[0]
+ self._options = options or {}
+ self._searcher = None
+ self._index_meta = None
+ self._search_options = None
+ self._stream = None
+
+ def visit_vector_search(self, vector_search):
+ self._ensure_loaded()
+
+ from lumina_data import MetricType
+
+ query = vector_search.vector
+ # Flatten to a plain list of floats for search_list API
+ if hasattr(query, 'tolist'):
+ query_flat = list(query.flatten()) if hasattr(query, 'flatten')
else list(query)
+ else:
+ query_flat = list(query)
+ query_flat = [float(v) for v in query_flat]
+
+ expected_dim = self._index_meta.dim
+ if len(query_flat) != expected_dim:
+ raise ValueError(
+ "Query vector dimension mismatch: expected %d, got %d"
+ % (expected_dim, len(query_flat)))
+
+ limit = vector_search.limit
+ index_metric = self._index_meta.metric
+
+ count = self._searcher.get_count()
+ effective_k = min(limit, count)
+ if effective_k <= 0:
+ return None
+
+ include_row_ids = vector_search.include_row_ids
+
+ if include_row_ids is not None:
+ filter_id_list = list(include_row_ids)
+ if len(filter_id_list) == 0:
+ return None
+ effective_k = min(effective_k, len(filter_id_list))
+ search_opts = dict(self._search_options)
+ search_opts["search.thread_safe_filter"] = "true"
+ _ensure_search_list_size(search_opts, effective_k)
+ distances, labels = self._searcher.search_with_filter_list(
+ query_flat, 1, effective_k, filter_id_list, search_opts)
+ else:
+ search_opts = dict(self._search_options)
+ _ensure_search_list_size(search_opts, effective_k)
+ distances, labels = self._searcher.search_list(
+ query_flat, 1, effective_k, search_opts)
+
+ # Collect results with score conversion (same as Java collectResults)
+ SENTINEL = 0xFFFFFFFFFFFFFFFF
+ id_to_scores = {}
+ for i in range(effective_k):
+ row_id = labels[i]
+ if row_id == SENTINEL:
+ continue
+ score = MetricType.convert_distance_to_score(
+ float(distances[i]), index_metric)
+ id_to_scores[int(row_id)] = score
+
+ return DictBasedScoredIndexResult(id_to_scores)
+
+ def _ensure_loaded(self):
+ if self._searcher is not None:
+ return
+
+ from lumina_data import LuminaSearcher
+ from pypaimon.globalindex.lumina.lumina_index_meta import
LuminaIndexMeta
+ from pypaimon.globalindex.lumina.lumina_vector_index_options import (
+ strip_lumina_options,
+ )
+
+ self._index_meta = LuminaIndexMeta.deserialize(self._io_meta.metadata)
+        # Merge paimon table options (prefix-stripped) with index metadata
options;
+        # index metadata takes precedence as it reflects the actually-built
index.
+ searcher_options = strip_lumina_options(self._options)
+ searcher_options.update(self._index_meta.options)
+ self._search_options = searcher_options
+
+ file_path = os.path.join(self._index_path, self._io_meta.file_name)
+ stream = self._file_io.new_input_stream(file_path)
+ try:
+ self._searcher = LuminaSearcher(searcher_options)
+ self._searcher.open_stream(stream, self._io_meta.file_size)
+ self._stream = stream
+ except Exception:
+ stream.close()
+ raise
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.close()
+ return False
+
+ def close(self):
+ if self._searcher is not None:
+ self._searcher.close()
+ self._searcher = None
+ if self._stream is not None:
+ self._stream.close()
+ self._stream = None
+
+ # =================== unsupported =====================
+
+ def visit_equal(self, field_ref, literal):
+ return None
+
+ def visit_not_equal(self, field_ref, literal):
+ return None
+
+ def visit_less_than(self, field_ref, literal):
+ return None
+
+ def visit_less_or_equal(self, field_ref, literal):
+ return None
+
+ def visit_greater_than(self, field_ref, literal):
+ return None
+
+ def visit_greater_or_equal(self, field_ref, literal):
+ return None
+
+ def visit_is_null(self, field_ref):
+ return None
+
+ def visit_is_not_null(self, field_ref):
+ return None
+
+ def visit_in(self, field_ref, literals):
+ return None
+
+ def visit_not_in(self, field_ref, literals):
+ return None
+
+ def visit_starts_with(self, field_ref, literal):
+ return None
+
+ def visit_ends_with(self, field_ref, literal):
+ return None
+
+ def visit_contains(self, field_ref, literal):
+ return None
+
+ def visit_like(self, field_ref, literal):
+ return None
+
+ def visit_between(self, field_ref, min_v, max_v):
+ return None
diff --git
a/paimon-python/pypaimon/globalindex/lumina/lumina_vector_index_options.py
b/paimon-python/pypaimon/globalindex/lumina/lumina_vector_index_options.py
new file mode 100644
index 0000000000..4e233e90c1
--- /dev/null
+++ b/paimon-python/pypaimon/globalindex/lumina/lumina_vector_index_options.py
@@ -0,0 +1,61 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Lumina vector index options for Paimon tables (read/search only).
+
+Strips the ``lumina.`` prefix from Paimon table properties, populates
+search-related defaults, and passes them to the lumina-data SDK searcher.
+"""
+
+LUMINA_PREFIX = "lumina."
+
+# Search-related options with defaults: (paimon_key, default_value)
+_SEARCH_OPTIONS = [
+ ("lumina.diskann.search.beam_width", "4"),
+ ("lumina.search.parallel_number", "5"),
+]
+
+
+def strip_lumina_options(paimon_options):
+ """Extract lumina.* keys from Paimon table options, strip the prefix,
+ and populate search-related defaults.
+
+ Args:
+ paimon_options: dict of table properties, e.g.
+ {"lumina.index.dimension": "128", "lumina.distance.metric": "l2",
...}
+ Non-lumina keys are ignored.
+
+ Returns:
+ dict with prefix stripped and search defaults populated, e.g.
+ {"index.dimension": "128", "distance.metric": "l2",
+ "diskann.search.beam_width": "4", "search.parallel_number": "5",
...}
+ """
+ result = {}
+
+ # 1. Populate search-related defaults
+ for paimon_key, default_value in _SEARCH_OPTIONS:
+ native_key = paimon_key[len(LUMINA_PREFIX):]
+ result[native_key] = default_value
+
+ # 2. Overlay user-specified lumina.* options (overrides defaults)
+ for key, value in paimon_options.items():
+ if key.startswith(LUMINA_PREFIX):
+ native_key = key[len(LUMINA_PREFIX):]
+ result[native_key] = str(value)
+
+ return result
diff --git a/paimon-python/pypaimon/table/file_store_table.py
b/paimon-python/pypaimon/table/file_store_table.py
index 61a1455f36..4dadb234db 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -373,6 +373,10 @@ class FileStoreTable(Table):
from pypaimon.table.source.full_text_search_builder import
FullTextSearchBuilderImpl
return FullTextSearchBuilderImpl(self)
+ def new_vector_search_builder(self) -> 'VectorSearchBuilder':
+ from pypaimon.table.source.vector_search_builder import
VectorSearchBuilderImpl
+ return VectorSearchBuilderImpl(self)
+
def create_row_key_extractor(self) -> RowKeyExtractor:
bucket_mode = self.bucket_mode()
if bucket_mode == BucketMode.HASH_FIXED:
diff --git a/paimon-python/pypaimon/table/format/format_table.py
b/paimon-python/pypaimon/table/format/format_table.py
index 41767e1da8..649aee4256 100644
--- a/paimon-python/pypaimon/table/format/format_table.py
+++ b/paimon-python/pypaimon/table/format/format_table.py
@@ -105,3 +105,6 @@ class FormatTable(Table):
def new_full_text_search_builder(self):
raise NotImplementedError("Format table does not support full text
search.")
+
+ def new_vector_search_builder(self):
+ raise NotImplementedError("Format table does not support vector
search.")
diff --git a/paimon-python/pypaimon/table/iceberg/iceberg_table.py
b/paimon-python/pypaimon/table/iceberg/iceberg_table.py
index 8e0ddfa07b..abc178bdb0 100644
--- a/paimon-python/pypaimon/table/iceberg/iceberg_table.py
+++ b/paimon-python/pypaimon/table/iceberg/iceberg_table.py
@@ -110,3 +110,6 @@ class IcebergTable(Table):
def new_full_text_search_builder(self):
raise NotImplementedError("IcebergTable does not support full text
search.")
+
+ def new_vector_search_builder(self):
+ raise NotImplementedError("IcebergTable does not support vector
search.")
diff --git a/paimon-python/pypaimon/table/object/object_table.py
b/paimon-python/pypaimon/table/object/object_table.py
index df6d6f2f84..935d68adea 100644
--- a/paimon-python/pypaimon/table/object/object_table.py
+++ b/paimon-python/pypaimon/table/object/object_table.py
@@ -106,3 +106,8 @@ class ObjectTable(Table):
raise NotImplementedError(
"ObjectTable is read-only and does not support full text search."
)
+
+ def new_vector_search_builder(self):
+ raise NotImplementedError(
+ "ObjectTable is read-only and does not support vector search."
+ )
diff --git a/paimon-python/pypaimon/table/source/vector_search_builder.py
b/paimon-python/pypaimon/table/source/vector_search_builder.py
new file mode 100644
index 0000000000..95f3cdfc67
--- /dev/null
+++ b/paimon-python/pypaimon/table/source/vector_search_builder.py
@@ -0,0 +1,111 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Builder to build vector search."""
+
+from abc import ABC, abstractmethod
+
+from pypaimon.table.source.vector_search_read import VectorSearchReadImpl
+from pypaimon.table.source.vector_search_scan import VectorSearchScanImpl
+
+
class VectorSearchBuilder(ABC):
    """Fluent builder for running an ANN (vector) search against a table.

    Configure the vector column, the query vector and the top-k limit, then
    either call :meth:`execute_local` or drive the scan/read objects
    yourself.
    """

    @abstractmethod
    def with_limit(self, limit):
        # type: (int) -> VectorSearchBuilder
        """Set the top-k number of results to return."""

    @abstractmethod
    def with_vector_column(self, name):
        # type: (str) -> VectorSearchBuilder
        """Set the name of the vector column to search."""

    @abstractmethod
    def with_query_vector(self, vector):
        # type: (list) -> VectorSearchBuilder
        """Set the query vector as a list of floats."""

    @abstractmethod
    def new_vector_search_scan(self):
        # type: () -> VectorSearchScan
        """Create the scan that locates vector index files."""

    @abstractmethod
    def new_vector_search_read(self):
        # type: () -> VectorSearchRead
        """Create the read that searches the scanned index files."""

    def execute_local(self):
        # type: () -> GlobalIndexResult
        """Run the search in this process: scan index files, then read them."""
        # Build the read first so configuration errors (missing limit,
        # column or query vector) surface before any scanning work starts.
        read = self.new_vector_search_read()
        plan = self.new_vector_search_scan().scan()
        return read.read_plan(plan)
+
+
class VectorSearchBuilderImpl(VectorSearchBuilder):
    """Default :class:`VectorSearchBuilder` backed by a file store table.

    Collects the vector column, query vector and result limit, then creates
    the scan/read pair that evaluates the table's global vector index.
    """

    def __init__(self, table):
        self._table = table
        # 0 means "not configured yet"; validated in new_vector_search_read.
        self._limit = 0
        self._vector_column = None
        self._query_vector = None

    def with_limit(self, limit):
        # type: (int) -> VectorSearchBuilder
        """Set the number of nearest neighbors (top-k) to return."""
        self._limit = limit
        return self

    def with_vector_column(self, name):
        # type: (str) -> VectorSearchBuilder
        """Select the schema field holding the vectors; must exist."""
        fields_by_name = {field.name: field for field in self._table.fields}
        if name not in fields_by_name:
            raise ValueError("Vector column '%s' not found in table schema" % name)
        self._vector_column = fields_by_name[name]
        return self

    def with_query_vector(self, vector):
        # type: (list) -> VectorSearchBuilder
        """Set the query vector (list of floats)."""
        self._query_vector = vector
        return self

    def new_vector_search_scan(self):
        # type: () -> VectorSearchScan
        """Create a scan over index files of the configured vector column."""
        if self._vector_column is None:
            raise ValueError("Vector column must be set via with_vector_column()")
        return VectorSearchScanImpl(self._table, self._vector_column)

    def new_vector_search_read(self):
        # type: () -> VectorSearchRead
        """Create a read that searches the scanned index files."""
        if self._limit <= 0:
            raise ValueError("Limit must be positive, set via with_limit()")
        if self._vector_column is None:
            raise ValueError("Vector column must be set via with_vector_column()")
        if self._query_vector is None:
            raise ValueError("Query vector must be set via with_query_vector()")
        return VectorSearchReadImpl(
            self._table, self._limit, self._vector_column, self._query_vector
        )
diff --git a/paimon-python/pypaimon/table/source/vector_search_read.py
b/paimon-python/pypaimon/table/source/vector_search_read.py
new file mode 100644
index 0000000000..4c8aa111da
--- /dev/null
+++ b/paimon-python/pypaimon/table/source/vector_search_read.py
@@ -0,0 +1,117 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Vector search read to read index files."""
+
+from abc import ABC, abstractmethod
+
+from pypaimon.globalindex.global_index_meta import GlobalIndexIOMeta
+from pypaimon.globalindex.global_index_result import GlobalIndexResult
+from pypaimon.globalindex.offset_global_index_reader import
OffsetGlobalIndexReader
+from pypaimon.globalindex.vector_search import VectorSearch
+from pypaimon.globalindex.vector_search_result import ScoredGlobalIndexResult
+
+
class VectorSearchRead(ABC):
    """Reads vector index files and produces a global index result."""

    def read_plan(self, plan):
        # type: (VectorSearchScanPlan) -> GlobalIndexResult
        """Convenience wrapper: read every split of ``plan``."""
        return self.read(plan.splits())

    @abstractmethod
    def read(self, splits):
        # type: (List[VectorSearchSplit]) -> GlobalIndexResult
        """Search the given splits and return the combined result."""
+
+
class VectorSearchReadImpl(VectorSearchRead):
    """Default :class:`VectorSearchRead` over global vector index files.

    Evaluates each split independently, unions the per-split scored
    results, and keeps only the global top-k.
    """

    def __init__(self, table, limit, vector_column, query_vector):
        self._table = table
        self._limit = limit
        self._vector_column = vector_column
        self._query_vector = query_vector

    def read(self, splits):
        # type: (List[VectorSearchSplit]) -> GlobalIndexResult
        """Search every split and merge into a single top-k result."""
        if not splits:
            return GlobalIndexResult.create_empty()

        merged = ScoredGlobalIndexResult.create_empty()
        for split in splits:
            partial = self._eval(
                split.row_range_start, split.row_range_end,
                split.vector_index_files
            )
            if partial is not None:
                merged = merged.or_(partial)

        return merged.top_k(self._limit)

    def _eval(self, row_range_start, row_range_end, vector_index_files):
        # type: (int, int, list) -> Optional[ScoredGlobalIndexResult]
        """Search one split's index files; None when the split has none."""
        if not vector_index_files:
            return None

        io_metas = []
        for index_file in vector_index_files:
            meta = index_file.global_index_meta
            assert meta is not None
            io_metas.append(GlobalIndexIOMeta(
                file_name=index_file.file_name,
                file_size=index_file.file_size,
                metadata=meta.index_meta
            ))

        reader = _create_vector_reader(
            vector_index_files[0].index_type,
            self._table.file_io,
            self._table.path_factory().global_index_path_factory().index_path(),
            io_metas,
            self._table.table_schema.options
        )

        search = VectorSearch(
            vector=self._query_vector,
            limit=self._limit,
            field_name=self._vector_column.name
        )

        try:
            # NOTE(review): OffsetGlobalIndexReader presumably maps the
            # reader's local row ids into [row_range_start, row_range_end]
            # — confirm against its implementation.
            offset_reader = OffsetGlobalIndexReader(reader, row_range_start, row_range_end)
            return offset_reader.visit_vector_search(search)
        finally:
            reader.close()
+
+
def _create_vector_reader(index_type, file_io, index_path, index_io_meta_list, options=None):
    """Create the global index reader matching ``index_type``.

    Only the Lumina ANN index is currently supported; any other type
    raises ``ValueError``.
    """
    from pypaimon.globalindex.lumina.lumina_vector_global_index_reader import (
        LUMINA_VECTOR_ANN_IDENTIFIER,
        LuminaVectorGlobalIndexReader,
    )
    if index_type != LUMINA_VECTOR_ANN_IDENTIFIER:
        raise ValueError("Unsupported vector index type: '%s'" % index_type)
    return LuminaVectorGlobalIndexReader(
        file_io, index_path, index_io_meta_list, options
    )
diff --git a/paimon-python/pypaimon/table/source/vector_search_scan.py
b/paimon-python/pypaimon/table/source/vector_search_scan.py
new file mode 100644
index 0000000000..450b334b13
--- /dev/null
+++ b/paimon-python/pypaimon/table/source/vector_search_scan.py
@@ -0,0 +1,95 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Vector search scan to scan index files."""
+
+from abc import ABC, abstractmethod
+from collections import defaultdict
+
+from pypaimon.table.source.vector_search_split import VectorSearchSplit
+from pypaimon.utils.range import Range
+
+
class VectorSearchScanPlan:
    """Outcome of a vector search scan: the list of splits to evaluate."""

    def __init__(self, splits):
        # type: (List[VectorSearchSplit]) -> None
        self._split_list = splits

    def splits(self):
        # type: () -> List[VectorSearchSplit]
        """Return the scanned splits."""
        return self._split_list
+
+
class VectorSearchScan(ABC):
    """Scans a table snapshot for vector index files to search."""

    @abstractmethod
    def scan(self):
        # type: () -> VectorSearchScanPlan
        """Produce the plan of splits to search."""
+
+
class VectorSearchScanImpl(VectorSearchScan):
    """Default :class:`VectorSearchScan` for a file store table.

    Locates, in the resolved snapshot, all global index files built on the
    target vector column and groups them into splits by row range.
    """

    def __init__(self, table, vector_column):
        self._table = table
        self._vector_column = vector_column

    def scan(self):
        # type: () -> VectorSearchScanPlan
        # Imported inside the method — NOTE(review): presumably to avoid
        # circular imports between table and index modules; confirm.
        from pypaimon.common.options.options import Options
        from pypaimon.index.index_file_handler import IndexFileHandler
        from pypaimon.snapshot.snapshot_manager import SnapshotManager
        from pypaimon.snapshot.time_travel_util import TimeTravelUtil

        # Honor configured time-travel options; otherwise use the latest
        # snapshot of the table.
        snapshot = TimeTravelUtil.try_travel_to_snapshot(
            Options(self._table.table_schema.options),
            self._table.tag_manager()
        )
        if snapshot is None:
            snapshot = SnapshotManager(self._table).get_latest_snapshot()

        column_id = self._vector_column.id

        def matches_column(entry):
            # Keep only global index files built on the requested column.
            meta = entry.index_file.global_index_meta
            return meta is not None and column_id == meta.index_field_id

        handler = IndexFileHandler(table=self._table)
        entries = handler.scan(snapshot, matches_column)

        # Group index files by their (rowRangeStart, rowRangeEnd) interval.
        files_by_range = defaultdict(list)
        for entry in entries:
            meta = entry.index_file.global_index_meta
            assert meta is not None
            range_key = Range(meta.row_range_start, meta.row_range_end)
            files_by_range[range_key].append(entry.index_file)

        splits = [
            VectorSearchSplit(range_key.from_, range_key.to, files)
            for range_key, files in files_by_range.items()
        ]
        return VectorSearchScanPlan(splits)
diff --git a/paimon-python/dev/requirements-dev.txt
b/paimon-python/pypaimon/table/source/vector_search_split.py
similarity index 75%
copy from paimon-python/dev/requirements-dev.txt
copy to paimon-python/pypaimon/table/source/vector_search_split.py
index 564b85b8c5..80ea8d5e90 100644
--- a/paimon-python/dev/requirements-dev.txt
+++ b/paimon-python/pypaimon/table/source/vector_search_split.py
@@ -15,13 +15,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-# Core dependencies for pypaimon are in requirements.txt
-# Test dependencies for pypaimon are as follows
-duckdb==1.3.2
-flake8==4.0.1
-pytest~=7.0
-# Ray: 2.48+ has no wheel for Python 3.8; use 2.10.0 on 3.8, 2.48.0 on 3.9+
-ray>=2.10.0
-requests
-parameterized
+"""Split of vector search."""
+
+from dataclasses import dataclass
+from typing import List
+
+from pypaimon.index.index_file_meta import IndexFileMeta
+
+
@dataclass
class VectorSearchSplit:
    """One unit of vector-search work: a contiguous row range together with
    the vector index files covering it."""

    # Start of the global row-id range covered by this split.
    row_range_start: int
    # End of the global row-id range (inclusive vs. exclusive follows
    # GlobalIndexMeta.row_range_end — TODO confirm).
    row_range_end: int
    # Metas of the index files whose global_index_meta matches this range.
    vector_index_files: List[IndexFileMeta]
diff --git a/paimon-python/pypaimon/table/table.py
b/paimon-python/pypaimon/table/table.py
index 68ad3a9d13..89dde94454 100644
--- a/paimon-python/pypaimon/table/table.py
+++ b/paimon-python/pypaimon/table/table.py
@@ -21,6 +21,7 @@ from abc import ABC, abstractmethod
from pypaimon.read.read_builder import ReadBuilder
from pypaimon.read.stream_read_builder import StreamReadBuilder
from pypaimon.table.source.full_text_search_builder import
FullTextSearchBuilder
+from pypaimon.table.source.vector_search_builder import VectorSearchBuilder
from pypaimon.write.write_builder import BatchWriteBuilder, StreamWriteBuilder
@@ -46,3 +47,7 @@ class Table(ABC):
@abstractmethod
def new_full_text_search_builder(self) -> FullTextSearchBuilder:
"""Returns a new full-text search builder."""
+
    @abstractmethod
    def new_vector_search_builder(self) -> VectorSearchBuilder:
        """Returns a new vector search builder.

        Table types without vector-index support (format, Iceberg, object
        tables) raise NotImplementedError instead.
        """
diff --git a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
index 58805983c5..5cc7d54925 100644
--- a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
+++ b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
@@ -571,3 +571,34 @@ class JavaPyReadWriteTest(unittest.TestCase):
ids3 = pa_table3.column('id').to_pylist()
self.assertIn(1, ids3)
self.assertIn(3, ids3)
+
    def test_read_lumina_vector_index(self):
        """Test reading a Lumina vector index built by Java.

        The Java half of the mixed E2E suite writes
        'default.test_lumina_vector' with an 'embedding' vector column and
        its Lumina index; here Python searches that index and reads back
        the matching rows.
        """
        table = self.catalog.get_table('default.test_lumina_vector')

        # Use VectorSearchBuilder to search.
        # Java wrote 6 vectors: [1,0,0,0], [0.9,0.1,0,0], [0,1,0,0],
        # [0,0,1,0], [0,0,0,1], [0.95,0.05,0,0]
        # Query with [1,0,0,0] - nearest by L2 should be rows 0, 5, 1.
        builder = table.new_vector_search_builder()
        builder.with_vector_column('embedding')
        builder.with_query_vector([1.0, 0.0, 0.0, 0.0])
        builder.with_limit(3)

        result = builder.execute_local()
        row_ids = sorted(list(result.results()))
        print(f"Lumina vector search for [1,0,0,0]: row_ids={row_ids}")
        self.assertIn(0, row_ids)  # exact match must always be returned
        self.assertEqual(len(row_ids), 3)

        # Read the matching rows by pushing the index result into the scan.
        read_builder = table.new_read_builder()
        scan = read_builder.new_scan().with_global_index_result(result)
        plan = scan.plan()
        table_read = read_builder.new_read()
        pa_table = table_read.to_arrow(plan.splits())
        pa_table = table_sort_by(pa_table, 'id')
        self.assertEqual(pa_table.num_rows, 3)
        ids = pa_table.column('id').to_pylist()
        print(f"Lumina vector search matched rows: ids={ids}")
        self.assertIn(0, ids)
diff --git a/paimon-python/pypaimon/tests/lumina_vector_index_test.py
b/paimon-python/pypaimon/tests/lumina_vector_index_test.py
new file mode 100644
index 0000000000..89e60e5d9f
--- /dev/null
+++ b/paimon-python/pypaimon/tests/lumina_vector_index_test.py
@@ -0,0 +1,165 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import ctypes
+import os
+import random
+import shutil
+import tempfile
+import unittest
+
+from lumina_data import LuminaBuilder
+
+from pypaimon.globalindex.global_index_meta import GlobalIndexIOMeta
+from pypaimon.globalindex.lumina.lumina_index_meta import LuminaIndexMeta
+from pypaimon.globalindex.lumina.lumina_vector_global_index_reader import (
+ LuminaVectorGlobalIndexReader,
+)
+from pypaimon.globalindex.lumina.lumina_vector_index_options import (
+ strip_lumina_options,
+)
+from pypaimon.globalindex.vector_search import VectorSearch
+from pypaimon.utils.roaring_bitmap import RoaringBitmap64
+
+
+def _make_vectors(n, dim, seed=42):
+ random.seed(seed)
+ data = [random.gauss(0, 1) for _ in range(n * dim)]
+ vectors = (ctypes.c_float * (n * dim))(*data)
+ ids = (ctypes.c_uint64 * n)(*range(n))
+ return vectors, ids, data
+
+
+class _SimpleFileIO(object):
+ def new_input_stream(self, path):
+ return open(path, 'rb')
+
+
class LuminaVectorIndexTest(unittest.TestCase):
    """Pure-Python round trip: build a Lumina index with the lumina-data
    SDK, then read it back through LuminaVectorGlobalIndexReader."""

    def test_build_and_read(self):
        """Build a DiskANN index and read via LuminaVectorGlobalIndexReader."""
        dim, n = 4, 100

        # Paimon table options (with lumina. prefix)
        paimon_options = {
            "lumina.index.dimension": str(dim),
            "lumina.index.type": "diskann",
            "lumina.distance.metric": "l2",
            "lumina.encoding.type": "rawf32",
            "lumina.diskann.build.ef_construction": "64",
            "lumina.diskann.build.neighbor_count": "32",
            "lumina.diskann.build.thread_count": "2",
        }

        # The SDK builder takes native (prefix-stripped) options.
        build_options = strip_lumina_options(paimon_options)
        vectors, ids, raw = _make_vectors(n, dim, seed=777)

        tmp_dir = tempfile.mkdtemp(prefix="paimon_lumina_test_")
        file_name = "lumina-0.index"
        index_file = os.path.join(tmp_dir, file_name)

        try:
            # Build the index file on local disk.
            with LuminaBuilder(build_options) as builder:
                builder.pretrain(vectors, n, dim)
                builder.insert(vectors, ids, n, dim)
                builder.dump(index_file)

            # Serialize metadata (same as Java LuminaIndexMeta)
            meta = LuminaIndexMeta(build_options)
            io_meta = GlobalIndexIOMeta(
                file_name=file_name,
                file_size=os.path.getsize(index_file),
                metadata=meta.serialize(),
            )

            # Reader receives paimon_options (with lumina. prefix)
            with LuminaVectorGlobalIndexReader(
                file_io=_SimpleFileIO(),
                index_path=tmp_dir,
                io_metas=[io_meta],
                options=paimon_options,
            ) as reader:
                vs = VectorSearch(vector=raw[:dim], limit=5, field_name="embedding")
                result = reader.visit_vector_search(vs)

            self.assertIsNotNone(result)
            row_ids = result.results()
            self.assertGreater(row_ids.cardinality(), 0)
            # Row 0 is the query vector itself, so it must be a hit.
            self.assertIn(0, row_ids)
            self.assertIsNotNone(result.score_getter()(0))
        finally:
            shutil.rmtree(tmp_dir, ignore_errors=True)

    def test_filtered_search(self):
        """Test filtered vector search with include_row_ids."""
        dim, n = 4, 100

        paimon_options = {
            "lumina.index.dimension": str(dim),
            "lumina.index.type": "diskann",
            "lumina.distance.metric": "l2",
            "lumina.encoding.type": "rawf32",
            "lumina.diskann.build.ef_construction": "64",
            "lumina.diskann.build.neighbor_count": "32",
            "lumina.diskann.build.thread_count": "2",
        }

        build_options = strip_lumina_options(paimon_options)
        vectors, ids, raw = _make_vectors(n, dim, seed=99)

        tmp_dir = tempfile.mkdtemp(prefix="paimon_lumina_test_")
        file_name = "lumina-filter-0.index"
        index_file = os.path.join(tmp_dir, file_name)

        try:
            with LuminaBuilder(build_options) as builder:
                builder.pretrain(vectors, n, dim)
                builder.insert(vectors, ids, n, dim)
                builder.dump(index_file)

            meta = LuminaIndexMeta(build_options)
            io_meta = GlobalIndexIOMeta(
                file_name=file_name,
                file_size=os.path.getsize(index_file),
                metadata=meta.serialize(),
            )
            reader = LuminaVectorGlobalIndexReader(
                file_io=_SimpleFileIO(),
                index_path=tmp_dir,
                io_metas=[io_meta],
                options=paimon_options,
            )

            # Only search even IDs
            include_ids = RoaringBitmap64()
            for i in range(0, n, 2):
                include_ids.add(i)

            vs = VectorSearch(
                vector=raw[:dim], limit=3, field_name="embedding",
                include_row_ids=include_ids,
            )
            result = reader.visit_vector_search(vs)

            self.assertIsNotNone(result)
            # Every hit must come from the include set (even row ids only).
            for row_id in result.results():
                self.assertEqual(row_id % 2, 0)
            reader.close()
        finally:
            shutil.rmtree(tmp_dir, ignore_errors=True)
diff --git a/paimon-python/setup.py b/paimon-python/setup.py
index 3b0b6833f4..6e0726958f 100644
--- a/paimon-python/setup.py
+++ b/paimon-python/setup.py
@@ -70,6 +70,9 @@ setup(
'pylance>=0.20,<1; python_version>="3.9"',
'pylance>=0.10,<1; python_version>="3.8" and python_version<"3.9"'
],
+ 'lumina': [
+ 'lumina-data>=0.1.0'
+ ],
'sql': [
'pypaimon-rust; python_version>="3.10"',
'datafusion>=52; python_version>="3.10"',