This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f5b9decbff [index] support lumina index in python (#7579)
f5b9decbff is described below

commit f5b9decbff14751b83fd029f8346928c951166af
Author: jerry <[email protected]>
AuthorDate: Tue Apr 7 21:27:49 2026 +0800

    [index] support lumina index in python (#7579)
    
    Add Lumina vector index read/search support to paimon-python.
    Usage example:
    ```python
      from pypaimon import CatalogFactory
    
      catalog = CatalogFactory.create({'warehouse': '/path/to/warehouse'})
      table = catalog.get_table('default.my_table')
    
      # Step 1: Vector search — find top-5 nearest neighbors
      builder = table.new_vector_search_builder()
      builder.with_vector_column('embedding')
      builder.with_query_vector([0.1, 0.2, 0.3, 0.4])
      builder.with_limit(5)
      result = builder.execute_local()
    
      # Step 2: Read the matching rows
      read_builder = table.new_read_builder()
      scan = read_builder.new_scan().with_global_index_result(result)
      plan = scan.plan()
      arrow_table = read_builder.new_read().to_arrow(plan.splits())
      print(arrow_table.to_pandas())
    ```
---
 .github/workflows/paimon-python-checks.yml         |   5 +
 paimon-lumina/pom.xml                              |  16 ++
 .../paimon/lumina/index/JavaPyLuminaE2ETest.java   | 215 +++++++++++++++++++++
 paimon-python/dev/requirements-dev.txt             |   2 +
 paimon-python/dev/run_mixed_tests.sh               |  40 +++-
 .../pypaimon/globalindex/global_index_scanner.py   |   4 -
 .../globalindex/lumina/__init__.py}                |  18 +-
 .../globalindex/lumina/lumina_index_meta.py        |  75 +++++++
 .../lumina/lumina_vector_global_index_reader.py    | 201 +++++++++++++++++++
 .../lumina/lumina_vector_index_options.py          |  61 ++++++
 paimon-python/pypaimon/table/file_store_table.py   |   4 +
 .../pypaimon/table/format/format_table.py          |   3 +
 .../pypaimon/table/iceberg/iceberg_table.py        |   3 +
 .../pypaimon/table/object/object_table.py          |   5 +
 .../pypaimon/table/source/vector_search_builder.py | 111 +++++++++++
 .../pypaimon/table/source/vector_search_read.py    | 117 +++++++++++
 .../pypaimon/table/source/vector_search_scan.py    |  95 +++++++++
 .../table/source/vector_search_split.py}           |  24 ++-
 paimon-python/pypaimon/table/table.py              |   5 +
 .../pypaimon/tests/e2e/java_py_read_write_test.py  |  31 +++
 .../pypaimon/tests/lumina_vector_index_test.py     | 165 ++++++++++++++++
 paimon-python/setup.py                             |   3 +
 22 files changed, 1180 insertions(+), 23 deletions(-)

diff --git a/.github/workflows/paimon-python-checks.yml 
b/.github/workflows/paimon-python-checks.yml
index fde69953ee..5302d1267d 100755
--- a/.github/workflows/paimon-python-checks.yml
+++ b/.github/workflows/paimon-python-checks.yml
@@ -33,6 +33,7 @@ on:
 env:
   JDK_VERSION: 8
   MAVEN_OPTS: -Dmaven.wagon.httpconnectionManager.ttlSeconds=30 
-Dmaven.wagon.http.retryHandler.requestSentEnabled=true
+  LUMINA_DATA_VERSION: 0.1.0
 
 
 concurrency:
@@ -127,10 +128,12 @@ jobs:
             python -m pip install --upgrade pip==21.3.1
             python --version
             python -m pip install --no-cache-dir pyroaring 
readerwriterlock==1.0.9 'fsspec==2021.10.1' 'cachetools==4.2.4' 
'ossfs==2021.8.0' pyarrow==6.0.1 pandas==1.1.5 'polars==0.9.12' 
'fastavro==1.4.7' zstandard==0.19.0 dataclasses==0.8.0 flake8 pytest 
py4j==0.10.9.9 requests parameterized==0.8.1 2>&1 >/dev/null
+            python -m pip install 'lumina-data>=${{ env.LUMINA_DATA_VERSION 
}}' -i https://pypi.org/simple/
           else
             python -m pip install --upgrade pip
             pip install torch --index-url https://download.pytorch.org/whl/cpu
             python -m pip install pyroaring readerwriterlock==1.0.9 
fsspec==2024.3.1 cachetools==5.3.3 ossfs==2023.12.0 ray==2.48.0 
fastavro==1.11.1 pyarrow==16.0.0 zstandard==0.24.0 polars==1.32.0 duckdb==1.3.2 
numpy==1.24.3 pandas==2.0.3 pylance==0.39.0 cramjam flake8==4.0.1 pytest~=7.0 
py4j==0.10.9.9 requests parameterized==0.9.0
+            python -m pip install 'lumina-data>=${{ env.LUMINA_DATA_VERSION 
}}' -i https://pypi.org/simple/
             if python -c "import sys; sys.exit(0 if sys.version_info >= (3, 
11) else 1)"; then
               python -m pip install vortex-data
             fi
@@ -190,6 +193,7 @@ jobs:
             python -m pip install --upgrade pip
             pip install torch --index-url https://download.pytorch.org/whl/cpu
             python -m pip install pyroaring readerwriterlock==1.0.9 
fsspec==2024.3.1 cachetools==5.3.3 ossfs==2023.12.0 ray==2.48.0 
fastavro==1.11.1 pyarrow==16.0.0 zstandard==0.24.0 polars==1.32.0 duckdb==1.3.2 
numpy==1.24.3 pandas==2.0.3 pylance==0.39.0 flake8==4.0.1 pytest~=7.0 
py4j==0.10.9.9 requests parameterized==0.9.0
+            python -m pip install 'lumina-data>=${{ env.LUMINA_DATA_VERSION 
}}' -i https://pypi.org/simple/
       - name: Run lint-python.sh
         shell: bash
         run: |
@@ -288,6 +292,7 @@ jobs:
             fastavro==1.11.1 pyarrow==16.0.0 zstandard==0.24.0 polars==1.32.0 
duckdb==1.3.2 \
             numpy==1.24.3 pandas==2.0.3 cramjam pytest~=7.0 py4j==0.10.9.9 
requests \
             parameterized==0.9.0 packaging
+          python -m pip install 'lumina-data>=${{ env.LUMINA_DATA_VERSION }}' 
-i https://pypi.org/simple/
       - name: Test Ray version compatibility
         run: |
           cd paimon-python
diff --git a/paimon-lumina/pom.xml b/paimon-lumina/pom.xml
index 4ca2e1cf12..27bebee080 100644
--- a/paimon-lumina/pom.xml
+++ b/paimon-lumina/pom.xml
@@ -77,6 +77,22 @@ under the License.
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-core</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.paimon</groupId>
             <artifactId>paimon-format</artifactId>
diff --git 
a/paimon-lumina/src/test/java/org/apache/paimon/lumina/index/JavaPyLuminaE2ETest.java
 
b/paimon-lumina/src/test/java/org/apache/paimon/lumina/index/JavaPyLuminaE2ETest.java
new file mode 100644
index 0000000000..dd0212bda9
--- /dev/null
+++ 
b/paimon-lumina/src/test/java/org/apache/paimon/lumina/index/JavaPyLuminaE2ETest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lumina.index;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.GenericArray;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.fs.FileIOFinder;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.globalindex.GlobalIndexBuilderUtils;
+import org.apache.paimon.globalindex.GlobalIndexSingletonWriter;
+import org.apache.paimon.globalindex.ResultEntry;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.AppendOnlyFileStoreTable;
+import org.apache.paimon.table.CatalogEnvironment;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Range;
+
+import org.aliyun.lumina.Lumina;
+import org.aliyun.lumina.LuminaException;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
+
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.paimon.CoreOptions.DATA_EVOLUTION_ENABLED;
+import static org.apache.paimon.CoreOptions.GLOBAL_INDEX_ENABLED;
+import static org.apache.paimon.CoreOptions.PATH;
+import static org.apache.paimon.CoreOptions.ROW_TRACKING_ENABLED;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
+
+/**
+ * Mixed language E2E test for Java Lumina vector index building and Python 
reading.
+ *
+ * <p>Java writes data and builds a Lumina vector index, then Python reads and 
searches it.
+ */
+public class JavaPyLuminaE2ETest {
+
+    @BeforeAll
+    public static void checkNativeLibrary() {
+        if (!Lumina.isLibraryLoaded()) {
+            try {
+                Lumina.loadLibrary();
+            } catch (LuminaException e) {
+                assumeTrue(false, "Lumina native library not available: " + 
e.getMessage());
+            }
+        }
+    }
+
+    java.nio.file.Path tempDir = 
Paths.get("../paimon-python/pypaimon/tests/e2e").toAbsolutePath();
+
+    protected Path warehouse;
+
+    @BeforeEach
+    public void before() throws Exception {
+        if (!Files.exists(tempDir.resolve("warehouse"))) {
+            Files.createDirectories(tempDir.resolve("warehouse"));
+        }
+        warehouse = new Path("file://" + tempDir.resolve("warehouse"));
+    }
+
+    @Test
+    @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
+    public void testLuminaVectorIndexWrite() throws Exception {
+        String tableName = "test_lumina_vector";
+        Path tablePath = new Path(warehouse.toString() + "/default.db/" + 
tableName);
+
+        int dimension = 4;
+
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), new ArrayType(new 
FloatType())},
+                        new String[] {"id", "embedding"});
+
+        Options options = new Options();
+        options.set(PATH, tablePath.toString());
+        options.set(ROW_TRACKING_ENABLED, true);
+        options.set(DATA_EVOLUTION_ENABLED, true);
+        options.set(GLOBAL_INDEX_ENABLED, true);
+        options.setString(LuminaVectorIndexOptions.DIMENSION.key(), 
String.valueOf(dimension));
+        options.setString(LuminaVectorIndexOptions.DISTANCE_METRIC.key(), 
"l2");
+        options.setString(LuminaVectorIndexOptions.ENCODING_TYPE.key(), 
"rawf32");
+
+        TableSchema tableSchema =
+                SchemaUtils.forceCommit(
+                        new SchemaManager(LocalFileIO.create(), tablePath),
+                        new Schema(
+                                rowType.getFields(),
+                                Collections.emptyList(),
+                                Collections.emptyList(),
+                                options.toMap(),
+                                ""));
+
+        AppendOnlyFileStoreTable table =
+                new AppendOnlyFileStoreTable(
+                        FileIOFinder.find(tablePath),
+                        tablePath,
+                        tableSchema,
+                        CatalogEnvironment.empty());
+
+        // Test vectors: 6 vectors of dimension 4
+        float[][] vectors =
+                new float[][] {
+                    new float[] {1.0f, 0.0f, 0.0f, 0.0f},
+                    new float[] {0.9f, 0.1f, 0.0f, 0.0f},
+                    new float[] {0.0f, 1.0f, 0.0f, 0.0f},
+                    new float[] {0.0f, 0.0f, 1.0f, 0.0f},
+                    new float[] {0.0f, 0.0f, 0.0f, 1.0f},
+                    new float[] {0.95f, 0.05f, 0.0f, 0.0f}
+                };
+
+        // Write data rows
+        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+        try (BatchTableWrite write = writeBuilder.newWrite();
+                BatchTableCommit commit = writeBuilder.newCommit()) {
+            for (int i = 0; i < vectors.length; i++) {
+                write.write(GenericRow.of(i, new GenericArray(vectors[i])));
+            }
+            commit.commit(write.prepareCommit());
+        }
+
+        // Build Lumina vector index on "embedding" column
+        DataField embeddingField = table.rowType().getField("embedding");
+        Options indexOptions = table.coreOptions().toConfiguration();
+        LuminaVectorIndexOptions luminaOptions = new 
LuminaVectorIndexOptions(indexOptions);
+
+        GlobalIndexSingletonWriter writer =
+                (GlobalIndexSingletonWriter)
+                        GlobalIndexBuilderUtils.createIndexWriter(
+                                table,
+                                LuminaVectorGlobalIndexerFactory.IDENTIFIER,
+                                embeddingField,
+                                indexOptions);
+
+        // Write vectors to index
+        for (float[] vec : vectors) {
+            writer.write(vec);
+        }
+
+        List<ResultEntry> entries = writer.finish();
+        assertThat(entries).hasSize(1);
+        assertThat(entries.get(0).rowCount()).isEqualTo(vectors.length);
+
+        Range rowRange = new Range(0, vectors.length - 1);
+        List<IndexFileMeta> indexFiles =
+                GlobalIndexBuilderUtils.toIndexFileMetas(
+                        table.fileIO(),
+                        table.store().pathFactory().globalIndexFileFactory(),
+                        table.coreOptions(),
+                        rowRange,
+                        embeddingField.id(),
+                        LuminaVectorGlobalIndexerFactory.IDENTIFIER,
+                        entries);
+
+        // Commit the index
+        DataIncrement dataIncrement = DataIncrement.indexIncrement(indexFiles);
+        CommitMessage message =
+                new CommitMessageImpl(
+                        BinaryRow.EMPTY_ROW,
+                        0,
+                        null,
+                        dataIncrement,
+                        CompactIncrement.emptyIncrement());
+        try (BatchTableCommit commit = writeBuilder.newCommit()) {
+            commit.commit(Collections.singletonList(message));
+        }
+
+        // Verify index was committed
+        List<org.apache.paimon.manifest.IndexManifestEntry> indexEntries =
+                
table.indexManifestFileReader().read(table.latestSnapshot().get().indexManifest());
+        assertThat(indexEntries).hasSize(1);
+        assertThat(indexEntries.get(0).indexFile().indexType())
+                .isEqualTo(LuminaVectorGlobalIndexerFactory.IDENTIFIER);
+    }
+}
diff --git a/paimon-python/dev/requirements-dev.txt 
b/paimon-python/dev/requirements-dev.txt
index 564b85b8c5..4ab78afd67 100644
--- a/paimon-python/dev/requirements-dev.txt
+++ b/paimon-python/dev/requirements-dev.txt
@@ -25,3 +25,5 @@ pytest~=7.0
 ray>=2.10.0
 requests
 parameterized
+# Lumina vector search (optional, for lumina index tests)
+lumina-data>=0.1.0
diff --git a/paimon-python/dev/run_mixed_tests.sh 
b/paimon-python/dev/run_mixed_tests.sh
index 839a750780..f98bf32747 100755
--- a/paimon-python/dev/run_mixed_tests.sh
+++ b/paimon-python/dev/run_mixed_tests.sh
@@ -266,6 +266,30 @@ run_tantivy_fulltext_test() {
     fi
 }
 
+# Function to run Lumina vector index test (Java write index, Python read and 
search)
+run_lumina_vector_test() {
+    echo -e "${YELLOW}=== Step 9: Running Lumina Vector Index Test (Java 
Write, Python Read) ===${NC}"
+
+    cd "$PROJECT_ROOT"
+
+    echo "Running Maven test for 
JavaPyLuminaE2ETest.testLuminaVectorIndexWrite..."
+    if mvn test 
-Dtest=org.apache.paimon.lumina.index.JavaPyLuminaE2ETest#testLuminaVectorIndexWrite
 -pl paimon-lumina -q -Drun.e2e.tests=true; then
+        echo -e "${GREEN}✓ Java test completed successfully${NC}"
+    else
+        echo -e "${RED}✗ Java test failed${NC}"
+        return 1
+    fi
+    cd "$PAIMON_PYTHON_DIR"
+    echo "Running Python test for 
JavaPyReadWriteTest.test_read_lumina_vector_index..."
+    if python -m pytest 
java_py_read_write_test.py::JavaPyReadWriteTest::test_read_lumina_vector_index 
-v; then
+        echo -e "${GREEN}✓ Python test completed successfully${NC}"
+        return 0
+    else
+        echo -e "${RED}✗ Python test failed${NC}"
+        return 1
+    fi
+}
+
 # Main execution
 main() {
     local java_write_result=0
@@ -276,6 +300,7 @@ main() {
     local btree_index_result=0
     local compressed_text_result=0
     local tantivy_fulltext_result=0
+    local lumina_vector_result=0
 
     # Detect Python version
     PYTHON_VERSION=$(python -c "import sys; 
print(f'{sys.version_info.major}.{sys.version_info.minor}')" 2>/dev/null || 
echo "unknown")
@@ -351,6 +376,13 @@ main() {
 
     echo ""
 
+    # Run Lumina vector index test (Java write, Python read)
+    if ! run_lumina_vector_test; then
+        lumina_vector_result=1
+    fi
+
+    echo ""
+
     echo -e "${YELLOW}=== Test Results Summary ===${NC}"
 
     if [[ $java_write_result -eq 0 ]]; then
@@ -401,12 +433,18 @@ main() {
         echo -e "${RED}✗ Tantivy Full-Text Index Test (Java Write, Python 
Read): FAILED${NC}"
     fi
 
+    if [[ $lumina_vector_result -eq 0 ]]; then
+        echo -e "${GREEN}✓ Lumina Vector Index Test (Java Write, Python Read): 
PASSED${NC}"
+    else
+        echo -e "${RED}✗ Lumina Vector Index Test (Java Write, Python Read): 
FAILED${NC}"
+    fi
+
     echo ""
 
     # Clean up warehouse directory after all tests
     cleanup_warehouse
 
-    if [[ $java_write_result -eq 0 && $python_read_result -eq 0 && 
$python_write_result -eq 0 && $java_read_result -eq 0 && $pk_dv_result -eq 0 && 
$btree_index_result -eq 0 && $compressed_text_result -eq 0 && 
$tantivy_fulltext_result -eq 0 ]]; then
+    if [[ $java_write_result -eq 0 && $python_read_result -eq 0 && 
$python_write_result -eq 0 && $java_read_result -eq 0 && $pk_dv_result -eq 0 && 
$btree_index_result -eq 0 && $compressed_text_result -eq 0 && 
$tantivy_fulltext_result -eq 0 && $lumina_vector_result -eq 0 ]]; then
         echo -e "${GREEN}🎉 All tests passed! Java-Python interoperability 
verified.${NC}"
         return 0
     else
diff --git a/paimon-python/pypaimon/globalindex/global_index_scanner.py 
b/paimon-python/pypaimon/globalindex/global_index_scanner.py
index 515600d77f..19fba0cdeb 100644
--- a/paimon-python/pypaimon/globalindex/global_index_scanner.py
+++ b/paimon-python/pypaimon/globalindex/global_index_scanner.py
@@ -35,13 +35,11 @@ class GlobalIndexScanner:
 
     def __init__(
         self,
-        options: dict,
         fields: list,
         file_io,
         index_path: str,
         index_files: Collection['IndexFileMeta']
     ):
-        self._options = options
         self._evaluator = self._create_evaluator(fields, file_io, index_path, 
index_files)
 
     def _create_evaluator(self, fields, file_io, index_path, index_files):
@@ -89,7 +87,6 @@ class GlobalIndexScanner:
             if len(index_files) == 0:
                 return None
             return GlobalIndexScanner(
-                options=table.table_schema.options,
                 fields=table.fields,
                 file_io=table.file_io,
                 
index_path=table.path_factory().global_index_path_factory().index_path(),
@@ -122,7 +119,6 @@ class GlobalIndexScanner:
         if len(scanned_index_files) == 0:
             return None
         return GlobalIndexScanner(
-            options=table.table_schema.options,
             fields=table.fields,
             file_io=table.file_io,
             
index_path=table.path_factory().global_index_path_factory().index_path(),
diff --git a/paimon-python/dev/requirements-dev.txt 
b/paimon-python/pypaimon/globalindex/lumina/__init__.py
similarity index 78%
copy from paimon-python/dev/requirements-dev.txt
copy to paimon-python/pypaimon/globalindex/lumina/__init__.py
index 564b85b8c5..26cd4367bc 100644
--- a/paimon-python/dev/requirements-dev.txt
+++ b/paimon-python/pypaimon/globalindex/lumina/__init__.py
@@ -15,13 +15,13 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-# Core dependencies for pypaimon are in requirements.txt
 
-# Test dependencies for pypaimon are as follows
-duckdb==1.3.2
-flake8==4.0.1
-pytest~=7.0
-# Ray: 2.48+ has no wheel for Python 3.8; use 2.10.0 on 3.8, 2.48.0 on 3.9+
-ray>=2.10.0
-requests
-parameterized
+from pypaimon.globalindex.lumina.lumina_vector_global_index_reader import (
+    LuminaVectorGlobalIndexReader,
+    LUMINA_VECTOR_ANN_IDENTIFIER,
+)
+
+__all__ = [
+    'LuminaVectorGlobalIndexReader',
+    'LUMINA_VECTOR_ANN_IDENTIFIER',
+]
diff --git a/paimon-python/pypaimon/globalindex/lumina/lumina_index_meta.py 
b/paimon-python/pypaimon/globalindex/lumina/lumina_index_meta.py
new file mode 100644
index 0000000000..323925613e
--- /dev/null
+++ b/paimon-python/pypaimon/globalindex/lumina/lumina_index_meta.py
@@ -0,0 +1,75 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Lumina index metadata serialization.
+
+Serialized as a flat JSON ``Map<String, String>`` whose keys are native Lumina
+option keys (with the ``lumina.`` prefix stripped).  This matches the format
+used by both paimon-lumina (Java) and paimon-cpp, so indexes built by any
+implementation can be read by all of them.
+"""
+
+import json
+
+
+class LuminaIndexMeta:
+    """Metadata for a Lumina vector index file."""
+
+    KEY_DIMENSION = "index.dimension"
+    KEY_DISTANCE_METRIC = "distance.metric"
+    KEY_INDEX_TYPE = "index.type"
+
+    def __init__(self, options):
+        self._options = dict(options)
+
+    @property
+    def options(self):
+        return self._options
+
+    @property
+    def dim(self):
+        return int(self._options[self.KEY_DIMENSION])
+
+    @property
+    def distance_metric(self):
+        return self._options[self.KEY_DISTANCE_METRIC]
+
+    @property
+    def metric(self):
+        from lumina_data import MetricType
+        return MetricType.from_lumina_name(self.distance_metric)
+
+    @property
+    def index_type(self):
+        return self._options.get(self.KEY_INDEX_TYPE, "diskann")
+
+    def serialize(self):
+        """Serialize to UTF-8 JSON bytes."""
+        return json.dumps(self._options).encode("utf-8")
+
+    @staticmethod
+    def deserialize(data):
+        """Deserialize from UTF-8 JSON bytes."""
+        options = json.loads(data.decode("utf-8"))
+        if LuminaIndexMeta.KEY_DIMENSION not in options:
+            raise ValueError(
+                "Missing required key: %s" % LuminaIndexMeta.KEY_DIMENSION)
+        if LuminaIndexMeta.KEY_DISTANCE_METRIC not in options:
+            raise ValueError(
+                "Missing required key: %s" % 
LuminaIndexMeta.KEY_DISTANCE_METRIC)
+        return LuminaIndexMeta(options)
diff --git 
a/paimon-python/pypaimon/globalindex/lumina/lumina_vector_global_index_reader.py
 
b/paimon-python/pypaimon/globalindex/lumina/lumina_vector_global_index_reader.py
new file mode 100644
index 0000000000..3e65f910fc
--- /dev/null
+++ 
b/paimon-python/pypaimon/globalindex/lumina/lumina_vector_global_index_reader.py
@@ -0,0 +1,201 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Vector global index reader using Lumina (via lumina-data SDK).
+
+Each shard has exactly one Lumina index file. This reader lazy-loads the
+index and performs vector similarity search using the lumina-data SDK.
+"""
+
+import os
+
+from pypaimon.globalindex.global_index_reader import GlobalIndexReader
+from pypaimon.globalindex.vector_search_result import 
DictBasedScoredIndexResult
+
+LUMINA_VECTOR_ANN_IDENTIFIER = "lumina-vector-ann"
+
+MIN_SEARCH_LIST_SIZE = 16
+
+
+def _ensure_search_list_size(search_options, top_k):
+    """Set diskann.search.list_size when not explicitly configured."""
+    if "diskann.search.list_size" not in search_options:
+        list_size = max(int(top_k * 1.5), MIN_SEARCH_LIST_SIZE)
+        search_options["diskann.search.list_size"] = str(list_size)
+
+
+class LuminaVectorGlobalIndexReader(GlobalIndexReader):
+    """Vector global index reader using Lumina."""
+
+    def __init__(self, file_io, index_path, io_metas, options=None):
+        assert len(io_metas) == 1, "Expected exactly one index file per shard"
+        self._file_io = file_io
+        self._index_path = index_path
+        self._io_meta = io_metas[0]
+        self._options = options or {}
+        self._searcher = None
+        self._index_meta = None
+        self._search_options = None
+        self._stream = None
+
+    def visit_vector_search(self, vector_search):
+        self._ensure_loaded()
+
+        from lumina_data import MetricType
+
+        query = vector_search.vector
+        # Flatten to a plain list of floats for search_list API
+        if hasattr(query, 'tolist'):
+            query_flat = list(query.flatten()) if hasattr(query, 'flatten') 
else list(query)
+        else:
+            query_flat = list(query)
+        query_flat = [float(v) for v in query_flat]
+
+        expected_dim = self._index_meta.dim
+        if len(query_flat) != expected_dim:
+            raise ValueError(
+                "Query vector dimension mismatch: expected %d, got %d"
+                % (expected_dim, len(query_flat)))
+
+        limit = vector_search.limit
+        index_metric = self._index_meta.metric
+
+        count = self._searcher.get_count()
+        effective_k = min(limit, count)
+        if effective_k <= 0:
+            return None
+
+        include_row_ids = vector_search.include_row_ids
+
+        if include_row_ids is not None:
+            filter_id_list = list(include_row_ids)
+            if len(filter_id_list) == 0:
+                return None
+            effective_k = min(effective_k, len(filter_id_list))
+            search_opts = dict(self._search_options)
+            search_opts["search.thread_safe_filter"] = "true"
+            _ensure_search_list_size(search_opts, effective_k)
+            distances, labels = self._searcher.search_with_filter_list(
+                query_flat, 1, effective_k, filter_id_list, search_opts)
+        else:
+            search_opts = dict(self._search_options)
+            _ensure_search_list_size(search_opts, effective_k)
+            distances, labels = self._searcher.search_list(
+                query_flat, 1, effective_k, search_opts)
+
+        # Collect results with score conversion (same as Java collectResults)
+        SENTINEL = 0xFFFFFFFFFFFFFFFF
+        id_to_scores = {}
+        for i in range(effective_k):
+            row_id = labels[i]
+            if row_id == SENTINEL:
+                continue
+            score = MetricType.convert_distance_to_score(
+                float(distances[i]), index_metric)
+            id_to_scores[int(row_id)] = score
+
+        return DictBasedScoredIndexResult(id_to_scores)
+
+    def _ensure_loaded(self):
+        if self._searcher is not None:
+            return
+
+        from lumina_data import LuminaSearcher
+        from pypaimon.globalindex.lumina.lumina_index_meta import 
LuminaIndexMeta
+        from pypaimon.globalindex.lumina.lumina_vector_index_options import (
+            strip_lumina_options,
+        )
+
+        self._index_meta = LuminaIndexMeta.deserialize(self._io_meta.metadata)
+        # Merge paimon table options (prefix-stripped) with index metadata 
options;
+        # index metadata takes precedence as it reflects the actual built 
index.
+        searcher_options = strip_lumina_options(self._options)
+        searcher_options.update(self._index_meta.options)
+        self._search_options = searcher_options
+
+        file_path = os.path.join(self._index_path, self._io_meta.file_name)
+        stream = self._file_io.new_input_stream(file_path)
+        try:
+            self._searcher = LuminaSearcher(searcher_options)
+            self._searcher.open_stream(stream, self._io_meta.file_size)
+            self._stream = stream
+        except Exception:
+            stream.close()
+            raise
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.close()
+        return False
+
+    def close(self):
+        if self._searcher is not None:
+            self._searcher.close()
+            self._searcher = None
+        if self._stream is not None:
+            self._stream.close()
+            self._stream = None
+
+    # =================== unsupported =====================
+
+    def visit_equal(self, field_ref, literal):
+        return None
+
+    def visit_not_equal(self, field_ref, literal):
+        return None
+
+    def visit_less_than(self, field_ref, literal):
+        return None
+
+    def visit_less_or_equal(self, field_ref, literal):
+        return None
+
+    def visit_greater_than(self, field_ref, literal):
+        return None
+
+    def visit_greater_or_equal(self, field_ref, literal):
+        return None
+
+    def visit_is_null(self, field_ref):
+        return None
+
+    def visit_is_not_null(self, field_ref):
+        return None
+
+    def visit_in(self, field_ref, literals):
+        return None
+
+    def visit_not_in(self, field_ref, literals):
+        return None
+
+    def visit_starts_with(self, field_ref, literal):
+        return None
+
+    def visit_ends_with(self, field_ref, literal):
+        return None
+
+    def visit_contains(self, field_ref, literal):
+        return None
+
+    def visit_like(self, field_ref, literal):
+        return None
+
+    def visit_between(self, field_ref, min_v, max_v):
+        return None
diff --git 
a/paimon-python/pypaimon/globalindex/lumina/lumina_vector_index_options.py 
b/paimon-python/pypaimon/globalindex/lumina/lumina_vector_index_options.py
new file mode 100644
index 0000000000..4e233e90c1
--- /dev/null
+++ b/paimon-python/pypaimon/globalindex/lumina/lumina_vector_index_options.py
@@ -0,0 +1,61 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Lumina vector index options for Paimon tables (read/search only).
+
+Strips the ``lumina.`` prefix from Paimon table properties, populates
+search-related defaults, and passes them to the lumina-data SDK searcher.
+"""
+
LUMINA_PREFIX = "lumina."

# Search-related options that always get a value: (paimon_key, default_value).
_SEARCH_OPTIONS = [
    ("lumina.diskann.search.beam_width", "4"),
    ("lumina.search.parallel_number", "5"),
]


def strip_lumina_options(paimon_options):
    """Translate Paimon table properties into native lumina SDK options.

    Every ``lumina.``-prefixed key is kept with the prefix removed and its
    value coerced to ``str``; all other keys are dropped. Search-related
    defaults are filled in first, so user-specified values win.

    Args:
        paimon_options: dict of table properties, e.g.
            ``{"lumina.index.dimension": "128", "lumina.distance.metric": "l2"}``.

    Returns:
        dict of native options, e.g.
            ``{"index.dimension": "128", "distance.metric": "l2",
            "diskann.search.beam_width": "4", "search.parallel_number": "5"}``.
    """
    # Seed with defaults (prefix stripped) ...
    native = {
        paimon_key[len(LUMINA_PREFIX):]: default
        for paimon_key, default in _SEARCH_OPTIONS
    }
    # ... then overlay user-specified lumina.* options, which override them.
    for key, value in paimon_options.items():
        if key.startswith(LUMINA_PREFIX):
            native[key[len(LUMINA_PREFIX):]] = str(value)
    return native
diff --git a/paimon-python/pypaimon/table/file_store_table.py 
b/paimon-python/pypaimon/table/file_store_table.py
index 61a1455f36..4dadb234db 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -373,6 +373,10 @@ class FileStoreTable(Table):
         from pypaimon.table.source.full_text_search_builder import 
FullTextSearchBuilderImpl
         return FullTextSearchBuilderImpl(self)
 
    def new_vector_search_builder(self) -> 'VectorSearchBuilder':
        """Return a builder for top-k vector search against this table.

        Import is local to keep the vector-search machinery optional.
        """
        from pypaimon.table.source.vector_search_builder import VectorSearchBuilderImpl
        return VectorSearchBuilderImpl(self)
+
     def create_row_key_extractor(self) -> RowKeyExtractor:
         bucket_mode = self.bucket_mode()
         if bucket_mode == BucketMode.HASH_FIXED:
diff --git a/paimon-python/pypaimon/table/format/format_table.py 
b/paimon-python/pypaimon/table/format/format_table.py
index 41767e1da8..649aee4256 100644
--- a/paimon-python/pypaimon/table/format/format_table.py
+++ b/paimon-python/pypaimon/table/format/format_table.py
@@ -105,3 +105,6 @@ class FormatTable(Table):
 
     def new_full_text_search_builder(self):
         raise NotImplementedError("Format table does not support full text 
search.")
+
    def new_vector_search_builder(self):
        """Unsupported for format tables; always raises NotImplementedError."""
        raise NotImplementedError("Format table does not support vector search.")
diff --git a/paimon-python/pypaimon/table/iceberg/iceberg_table.py 
b/paimon-python/pypaimon/table/iceberg/iceberg_table.py
index 8e0ddfa07b..abc178bdb0 100644
--- a/paimon-python/pypaimon/table/iceberg/iceberg_table.py
+++ b/paimon-python/pypaimon/table/iceberg/iceberg_table.py
@@ -110,3 +110,6 @@ class IcebergTable(Table):
 
     def new_full_text_search_builder(self):
         raise NotImplementedError("IcebergTable does not support full text 
search.")
+
    def new_vector_search_builder(self):
        """Unsupported for Iceberg tables; always raises NotImplementedError."""
        raise NotImplementedError("IcebergTable does not support vector search.")
diff --git a/paimon-python/pypaimon/table/object/object_table.py 
b/paimon-python/pypaimon/table/object/object_table.py
index df6d6f2f84..935d68adea 100644
--- a/paimon-python/pypaimon/table/object/object_table.py
+++ b/paimon-python/pypaimon/table/object/object_table.py
@@ -106,3 +106,8 @@ class ObjectTable(Table):
         raise NotImplementedError(
             "ObjectTable is read-only and does not support full text search."
         )
+
    def new_vector_search_builder(self):
        """Unsupported for object tables; always raises NotImplementedError."""
        raise NotImplementedError(
            "ObjectTable is read-only and does not support vector search."
        )
diff --git a/paimon-python/pypaimon/table/source/vector_search_builder.py 
b/paimon-python/pypaimon/table/source/vector_search_builder.py
new file mode 100644
index 0000000000..95f3cdfc67
--- /dev/null
+++ b/paimon-python/pypaimon/table/source/vector_search_builder.py
@@ -0,0 +1,111 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Builder to build vector search."""
+
+from abc import ABC, abstractmethod
+
+from pypaimon.table.source.vector_search_read import VectorSearchReadImpl
+from pypaimon.table.source.vector_search_scan import VectorSearchScanImpl
+
+
class VectorSearchBuilder(ABC):
    """Fluent builder that configures and executes an ANN vector search.

    Typical usage::

        result = (table.new_vector_search_builder()
                       .with_vector_column('embedding')
                       .with_query_vector([0.1, 0.2, 0.3, 0.4])
                       .with_limit(5)
                       .execute_local())
    """

    @abstractmethod
    def with_limit(self, limit):
        # type: (int) -> VectorSearchBuilder
        """Set the number of nearest neighbours (top-k) to return."""

    @abstractmethod
    def with_vector_column(self, name):
        # type: (str) -> VectorSearchBuilder
        """Set the table column whose vectors are searched."""

    @abstractmethod
    def with_query_vector(self, vector):
        # type: (list) -> VectorSearchBuilder
        """Set the query vector as a list of floats."""

    @abstractmethod
    def new_vector_search_scan(self):
        # type: () -> VectorSearchScan
        """Create the scan that discovers the relevant index files."""

    @abstractmethod
    def new_vector_search_read(self):
        # type: () -> VectorSearchRead
        """Create the read that evaluates the search over index files."""

    def execute_local(self):
        # type: () -> GlobalIndexResult
        """Run the search in-process: build the read, scan, then evaluate.

        The read is created before the scan so validation errors from
        new_vector_search_read() surface first.
        """
        read = self.new_vector_search_read()
        plan = self.new_vector_search_scan().scan()
        return read.read_plan(plan)
+
+
class VectorSearchBuilderImpl(VectorSearchBuilder):
    """Default VectorSearchBuilder backed by a FileStoreTable."""

    def __init__(self, table):
        self._table = table
        self._limit = 0               # top-k; must become > 0 before reading
        self._vector_column = None    # resolved field of the vector column
        self._query_vector = None     # query vector, list of floats

    def with_limit(self, limit):
        # type: (int) -> VectorSearchBuilder
        self._limit = limit
        return self

    def with_vector_column(self, name):
        # type: (str) -> VectorSearchBuilder
        """Resolve *name* against the table schema; raises if absent."""
        matches = [f for f in self._table.fields if f.name == name]
        if not matches:
            raise ValueError("Vector column '%s' not found in table schema" % name)
        self._vector_column = matches[-1]
        return self

    def with_query_vector(self, vector):
        # type: (list) -> VectorSearchBuilder
        self._query_vector = vector
        return self

    def new_vector_search_scan(self):
        # type: () -> VectorSearchScan
        if self._vector_column is None:
            raise ValueError("Vector column must be set via with_vector_column()")
        return VectorSearchScanImpl(self._table, self._vector_column)

    def new_vector_search_read(self):
        # type: () -> VectorSearchRead
        # Validate the full configuration before constructing the read.
        if self._limit <= 0:
            raise ValueError("Limit must be positive, set via with_limit()")
        if self._vector_column is None:
            raise ValueError("Vector column must be set via with_vector_column()")
        if self._query_vector is None:
            raise ValueError("Query vector must be set via with_query_vector()")
        return VectorSearchReadImpl(
            self._table, self._limit, self._vector_column, self._query_vector
        )
diff --git a/paimon-python/pypaimon/table/source/vector_search_read.py 
b/paimon-python/pypaimon/table/source/vector_search_read.py
new file mode 100644
index 0000000000..4c8aa111da
--- /dev/null
+++ b/paimon-python/pypaimon/table/source/vector_search_read.py
@@ -0,0 +1,117 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Vector search read to read index files."""
+
+from abc import ABC, abstractmethod
+
+from pypaimon.globalindex.global_index_meta import GlobalIndexIOMeta
+from pypaimon.globalindex.global_index_result import GlobalIndexResult
+from pypaimon.globalindex.offset_global_index_reader import 
OffsetGlobalIndexReader
+from pypaimon.globalindex.vector_search import VectorSearch
+from pypaimon.globalindex.vector_search_result import ScoredGlobalIndexResult
+
+
class VectorSearchRead(ABC):
    """Evaluates a vector search against the index files of a plan."""

    def read_plan(self, plan):
        # type: (VectorSearchScanPlan) -> GlobalIndexResult
        """Convenience wrapper: evaluate every split of *plan*."""
        return self.read(plan.splits())

    @abstractmethod
    def read(self, splits):
        # type: (List[VectorSearchSplit]) -> GlobalIndexResult
        """Search the given splits and return the merged result."""
+
+
class VectorSearchReadImpl(VectorSearchRead):
    """Default VectorSearchRead: evaluates per split and merges by score."""

    def __init__(self, table, limit, vector_column, query_vector):
        self._table = table
        self._limit = limit                  # top-k to return
        self._vector_column = vector_column  # field being searched
        self._query_vector = query_vector    # list of floats

    def read(self, splits):
        # type: (List[VectorSearchSplit]) -> GlobalIndexResult
        """Search every split, union the scored results, keep the top-k."""
        if not splits:
            return GlobalIndexResult.create_empty()

        merged = ScoredGlobalIndexResult.create_empty()
        for split in splits:
            partial = self._eval(
                split.row_range_start,
                split.row_range_end,
                split.vector_index_files,
            )
            if partial is not None:
                merged = merged.or_(partial)
        return merged.top_k(self._limit)

    def _eval(self, row_range_start, row_range_end, vector_index_files):
        # type: (int, int, list) -> Optional[ScoredGlobalIndexResult]
        """Run the ANN search over one split's index files.

        Returns None when the split carries no index files.
        """
        if not vector_index_files:
            return None

        io_metas = []
        for index_file in vector_index_files:
            meta = index_file.global_index_meta
            # The scan only keeps files whose global_index_meta is set.
            assert meta is not None
            io_metas.append(
                GlobalIndexIOMeta(
                    file_name=index_file.file_name,
                    file_size=index_file.file_size,
                    metadata=meta.index_meta,
                )
            )

        reader = _create_vector_reader(
            vector_index_files[0].index_type,
            self._table.file_io,
            self._table.path_factory().global_index_path_factory().index_path(),
            io_metas,
            self._table.table_schema.options,
        )

        search = VectorSearch(
            vector=self._query_vector,
            limit=self._limit,
            field_name=self._vector_column.name,
        )

        try:
            # OffsetGlobalIndexReader applies the split's row range
            # (presumably mapping split-local ids to global row ids —
            # see that class for the exact contract).
            offset_reader = OffsetGlobalIndexReader(reader, row_range_start, row_range_end)
            return offset_reader.visit_vector_search(search)
        finally:
            reader.close()
+
+
def _create_vector_reader(index_type, file_io, index_path, index_io_meta_list, options=None):
    """Create a global index reader for vector search.

    Only the Lumina ANN index is supported; the import is local so the
    optional lumina dependency is only loaded when actually used.
    Raises ValueError for any other *index_type*.
    """
    from pypaimon.globalindex.lumina.lumina_vector_global_index_reader import (
        LUMINA_VECTOR_ANN_IDENTIFIER,
        LuminaVectorGlobalIndexReader,
    )
    if index_type != LUMINA_VECTOR_ANN_IDENTIFIER:
        raise ValueError("Unsupported vector index type: '%s'" % index_type)
    return LuminaVectorGlobalIndexReader(
        file_io, index_path, index_io_meta_list, options
    )
diff --git a/paimon-python/pypaimon/table/source/vector_search_scan.py 
b/paimon-python/pypaimon/table/source/vector_search_scan.py
new file mode 100644
index 0000000000..450b334b13
--- /dev/null
+++ b/paimon-python/pypaimon/table/source/vector_search_scan.py
@@ -0,0 +1,95 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Vector search scan to scan index files."""
+
+from abc import ABC, abstractmethod
+from collections import defaultdict
+
+from pypaimon.table.source.vector_search_split import VectorSearchSplit
+from pypaimon.utils.range import Range
+
+
class VectorSearchScanPlan:
    """Immutable result of a vector search scan: the splits to evaluate."""

    def __init__(self, splits):
        # type: (List[VectorSearchSplit]) -> None
        self._splits = splits

    def splits(self):
        # type: () -> List[VectorSearchSplit]
        """Return the splits discovered by the scan."""
        return self._splits
+
+
class VectorSearchScan(ABC):
    """Vector search scan to scan index files."""

    @abstractmethod
    def scan(self):
        # type: () -> VectorSearchScanPlan
        """Discover the index files to search and return them as a plan."""
        pass
+
+
class VectorSearchScanImpl(VectorSearchScan):
    """Scans a snapshot for the vector index files of one column."""

    def __init__(self, table, vector_column):
        self._table = table
        self._vector_column = vector_column

    def scan(self):
        # type: () -> VectorSearchScanPlan
        """Collect this column's index files and group them into splits."""
        from pypaimon.index.index_file_handler import IndexFileHandler
        from pypaimon.snapshot.snapshot_manager import SnapshotManager
        from pypaimon.snapshot.time_travel_util import TimeTravelUtil
        from pypaimon.common.options.options import Options

        # Resolve the snapshot: honour time-travel options first, fall
        # back to the latest snapshot otherwise.
        snapshot = TimeTravelUtil.try_travel_to_snapshot(
            Options(self._table.table_schema.options),
            self._table.tag_manager()
        )
        if snapshot is None:
            snapshot = SnapshotManager(self._table).get_latest_snapshot()

        column_id = self._vector_column.id

        def matches_column(entry):
            # Keep only global-index files built for our vector column.
            meta = entry.index_file.global_index_meta
            return meta is not None and column_id == meta.index_field_id

        handler = IndexFileHandler(table=self._table)
        index_files = [e.index_file for e in handler.scan(snapshot, matches_column)]

        # One split per distinct (row_range_start, row_range_end).
        grouped = defaultdict(list)
        for index_file in index_files:
            meta = index_file.global_index_meta
            assert meta is not None
            grouped[Range(meta.row_range_start, meta.row_range_end)].append(index_file)

        splits = [
            VectorSearchSplit(row_range.from_, row_range.to, files)
            for row_range, files in grouped.items()
        ]
        return VectorSearchScanPlan(splits)
diff --git a/paimon-python/dev/requirements-dev.txt 
b/paimon-python/pypaimon/table/source/vector_search_split.py
similarity index 75%
copy from paimon-python/dev/requirements-dev.txt
copy to paimon-python/pypaimon/table/source/vector_search_split.py
index 564b85b8c5..80ea8d5e90 100644
--- a/paimon-python/dev/requirements-dev.txt
+++ b/paimon-python/pypaimon/table/source/vector_search_split.py
@@ -15,13 +15,19 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-# Core dependencies for pypaimon are in requirements.txt
 
-# Test dependencies for pypaimon are as follows
-duckdb==1.3.2
-flake8==4.0.1
-pytest~=7.0
-# Ray: 2.48+ has no wheel for Python 3.8; use 2.10.0 on 3.8, 2.48.0 on 3.9+
-ray>=2.10.0
-requests
-parameterized
+"""Split of vector search."""
+
+from dataclasses import dataclass
+from typing import List
+
+from pypaimon.index.index_file_meta import IndexFileMeta
+
+
@dataclass
class VectorSearchSplit:
    """Split of vector search: one row range plus its index files."""

    # Row-id range covered by this split, taken from the index files'
    # global_index_meta (end-bound exclusivity not shown here — see
    # GlobalIndexMeta for the exact contract).
    row_range_start: int
    row_range_end: int
    # Vector index files whose metadata carries this exact row range.
    vector_index_files: List[IndexFileMeta]
diff --git a/paimon-python/pypaimon/table/table.py 
b/paimon-python/pypaimon/table/table.py
index 68ad3a9d13..89dde94454 100644
--- a/paimon-python/pypaimon/table/table.py
+++ b/paimon-python/pypaimon/table/table.py
@@ -21,6 +21,7 @@ from abc import ABC, abstractmethod
 from pypaimon.read.read_builder import ReadBuilder
 from pypaimon.read.stream_read_builder import StreamReadBuilder
 from pypaimon.table.source.full_text_search_builder import 
FullTextSearchBuilder
+from pypaimon.table.source.vector_search_builder import VectorSearchBuilder
 from pypaimon.write.write_builder import BatchWriteBuilder, StreamWriteBuilder
 
 
@@ -46,3 +47,7 @@ class Table(ABC):
     @abstractmethod
     def new_full_text_search_builder(self) -> FullTextSearchBuilder:
         """Returns a new full-text search builder."""
+
    @abstractmethod
    def new_vector_search_builder(self) -> VectorSearchBuilder:
        """Returns a new vector search builder for ANN top-k search."""
diff --git a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py 
b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
index 58805983c5..5cc7d54925 100644
--- a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
+++ b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
@@ -571,3 +571,34 @@ class JavaPyReadWriteTest(unittest.TestCase):
         ids3 = pa_table3.column('id').to_pylist()
         self.assertIn(1, ids3)
         self.assertIn(3, ids3)
+
    def test_read_lumina_vector_index(self):
        """Test reading a Lumina vector index built by Java.

        End-to-end: search via VectorSearchBuilder, then feed the result
        into a regular read through with_global_index_result().
        """
        table = self.catalog.get_table('default.test_lumina_vector')

        # Use VectorSearchBuilder to search
        # Java wrote 6 vectors: [1,0,0,0], [0.9,0.1,0,0], [0,1,0,0],
        #                        [0,0,1,0], [0,0,0,1], [0.95,0.05,0,0]
        # Query with [1,0,0,0] - nearest by L2 should be row 0, 5, 1
        builder = table.new_vector_search_builder()
        builder.with_vector_column('embedding')
        builder.with_query_vector([1.0, 0.0, 0.0, 0.0])
        builder.with_limit(3)

        result = builder.execute_local()
        row_ids = sorted(list(result.results()))
        print(f"Lumina vector search for [1,0,0,0]: row_ids={row_ids}")
        self.assertIn(0, row_ids)  # exact match
        self.assertEqual(len(row_ids), 3)

        # Read matching rows using withGlobalIndexResult
        read_builder = table.new_read_builder()
        scan = read_builder.new_scan().with_global_index_result(result)
        plan = scan.plan()
        table_read = read_builder.new_read()
        pa_table = table_read.to_arrow(plan.splits())
        pa_table = table_sort_by(pa_table, 'id')
        self.assertEqual(pa_table.num_rows, 3)
        ids = pa_table.column('id').to_pylist()
        print(f"Lumina vector search matched rows: ids={ids}")
        self.assertIn(0, ids)
diff --git a/paimon-python/pypaimon/tests/lumina_vector_index_test.py 
b/paimon-python/pypaimon/tests/lumina_vector_index_test.py
new file mode 100644
index 0000000000..89e60e5d9f
--- /dev/null
+++ b/paimon-python/pypaimon/tests/lumina_vector_index_test.py
@@ -0,0 +1,165 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import ctypes
+import os
+import random
+import shutil
+import tempfile
+import unittest
+
+from lumina_data import LuminaBuilder
+
+from pypaimon.globalindex.global_index_meta import GlobalIndexIOMeta
+from pypaimon.globalindex.lumina.lumina_index_meta import LuminaIndexMeta
+from pypaimon.globalindex.lumina.lumina_vector_global_index_reader import (
+    LuminaVectorGlobalIndexReader,
+)
+from pypaimon.globalindex.lumina.lumina_vector_index_options import (
+    strip_lumina_options,
+)
+from pypaimon.globalindex.vector_search import VectorSearch
+from pypaimon.utils.roaring_bitmap import RoaringBitmap64
+
+
+def _make_vectors(n, dim, seed=42):
+    random.seed(seed)
+    data = [random.gauss(0, 1) for _ in range(n * dim)]
+    vectors = (ctypes.c_float * (n * dim))(*data)
+    ids = (ctypes.c_uint64 * n)(*range(n))
+    return vectors, ids, data
+
+
+class _SimpleFileIO(object):
+    def new_input_stream(self, path):
+        return open(path, 'rb')
+
+
class LuminaVectorIndexTest(unittest.TestCase):
    """Builds a Lumina DiskANN index with the lumina-data SDK and reads it
    back through LuminaVectorGlobalIndexReader (plain and filtered search)."""

    def test_build_and_read(self):
        """Build a DiskANN index and read via LuminaVectorGlobalIndexReader."""
        dim, n = 4, 100

        # Paimon table options (with lumina. prefix)
        paimon_options = {
            "lumina.index.dimension": str(dim),
            "lumina.index.type": "diskann",
            "lumina.distance.metric": "l2",
            "lumina.encoding.type": "rawf32",
            "lumina.diskann.build.ef_construction": "64",
            "lumina.diskann.build.neighbor_count": "32",
            "lumina.diskann.build.thread_count": "2",
        }

        build_options = strip_lumina_options(paimon_options)
        vectors, ids, raw = _make_vectors(n, dim, seed=777)

        tmp_dir = tempfile.mkdtemp(prefix="paimon_lumina_test_")
        file_name = "lumina-0.index"
        index_file = os.path.join(tmp_dir, file_name)

        try:
            with LuminaBuilder(build_options) as builder:
                builder.pretrain(vectors, n, dim)
                builder.insert(vectors, ids, n, dim)
                builder.dump(index_file)

            # Serialize metadata (same as Java LuminaIndexMeta)
            meta = LuminaIndexMeta(build_options)
            io_meta = GlobalIndexIOMeta(
                file_name=file_name,
                file_size=os.path.getsize(index_file),
                metadata=meta.serialize(),
            )

            # Reader receives paimon_options (with lumina. prefix)
            with LuminaVectorGlobalIndexReader(
                file_io=_SimpleFileIO(),
                index_path=tmp_dir,
                io_metas=[io_meta],
                options=paimon_options,
            ) as reader:
                # Query with the first stored vector: row 0 must match itself.
                vs = VectorSearch(vector=raw[:dim], limit=5, field_name="embedding")
                result = reader.visit_vector_search(vs)

                self.assertIsNotNone(result)
                row_ids = result.results()
                self.assertGreater(row_ids.cardinality(), 0)
                self.assertIn(0, row_ids)
                self.assertIsNotNone(result.score_getter()(0))
        finally:
            shutil.rmtree(tmp_dir, ignore_errors=True)

    def test_filtered_search(self):
        """Test filtered vector search with include_row_ids."""
        dim, n = 4, 100

        paimon_options = {
            "lumina.index.dimension": str(dim),
            "lumina.index.type": "diskann",
            "lumina.distance.metric": "l2",
            "lumina.encoding.type": "rawf32",
            "lumina.diskann.build.ef_construction": "64",
            "lumina.diskann.build.neighbor_count": "32",
            "lumina.diskann.build.thread_count": "2",
        }

        build_options = strip_lumina_options(paimon_options)
        vectors, ids, raw = _make_vectors(n, dim, seed=99)

        tmp_dir = tempfile.mkdtemp(prefix="paimon_lumina_test_")
        file_name = "lumina-filter-0.index"
        index_file = os.path.join(tmp_dir, file_name)

        try:
            with LuminaBuilder(build_options) as builder:
                builder.pretrain(vectors, n, dim)
                builder.insert(vectors, ids, n, dim)
                builder.dump(index_file)

            meta = LuminaIndexMeta(build_options)
            io_meta = GlobalIndexIOMeta(
                file_name=file_name,
                file_size=os.path.getsize(index_file),
                metadata=meta.serialize(),
            )
            # NOTE(review): reader is not closed if an assertion below fails;
            # a with-block (as in test_build_and_read) would be safer.
            reader = LuminaVectorGlobalIndexReader(
                file_io=_SimpleFileIO(),
                index_path=tmp_dir,
                io_metas=[io_meta],
                options=paimon_options,
            )

            # Only search even IDs
            include_ids = RoaringBitmap64()
            for i in range(0, n, 2):
                include_ids.add(i)

            vs = VectorSearch(
                vector=raw[:dim], limit=3, field_name="embedding",
                include_row_ids=include_ids,
            )
            result = reader.visit_vector_search(vs)

            self.assertIsNotNone(result)
            for row_id in result.results():
                self.assertEqual(row_id % 2, 0)
            reader.close()
        finally:
            shutil.rmtree(tmp_dir, ignore_errors=True)
diff --git a/paimon-python/setup.py b/paimon-python/setup.py
index 3b0b6833f4..6e0726958f 100644
--- a/paimon-python/setup.py
+++ b/paimon-python/setup.py
@@ -70,6 +70,9 @@ setup(
             'pylance>=0.20,<1; python_version>="3.9"',
             'pylance>=0.10,<1; python_version>="3.8" and python_version<"3.9"'
         ],
+        'lumina': [
+            'lumina-data>=0.1.0'
+        ],
         'sql': [
             'pypaimon-rust; python_version>="3.10"',
             'datafusion>=52; python_version>="3.10"',

Reply via email to