This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d84799280f [python] Introduce vortex file format integration (#7547)
d84799280f is described below

commit d84799280f9226beae886f5d5b4553a21182fe50
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Mar 30 14:31:00 2026 +0800

    [python] Introduce vortex file format integration (#7547)
---
 .github/workflows/paimon-python-checks.yml         |  5 +-
 paimon-python/pypaimon/common/file_io.py           |  3 +
 .../pypaimon/common/options/core_options.py        |  1 +
 paimon-python/pypaimon/filesystem/local_file_io.py | 21 ++++-
 .../pypaimon/filesystem/pyarrow_file_io.py         | 18 +++++
 .../pypaimon/read/reader/format_vortex_reader.py   | 90 ++++++++++++++++++++++
 paimon-python/pypaimon/read/reader/vortex_utils.py | 70 +++++++++++++++++
 paimon-python/pypaimon/read/split_read.py          |  4 +
 .../pypaimon/tests/reader_append_only_test.py      | 46 +++++++++++
 paimon-python/pypaimon/write/writer/data_writer.py |  2 +
 10 files changed, 258 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/paimon-python-checks.yml 
b/.github/workflows/paimon-python-checks.yml
index 1d3b6864a2..722f37fa05 100755
--- a/.github/workflows/paimon-python-checks.yml
+++ b/.github/workflows/paimon-python-checks.yml
@@ -47,7 +47,7 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        python-version: [ '3.6.15', '3.10' ]
+        python-version: [ '3.6.15', '3.10', '3.11' ]
 
     steps:
       - name: Checkout code
@@ -113,6 +113,9 @@ jobs:
             python -m pip install --upgrade pip
             pip install torch --index-url https://download.pytorch.org/whl/cpu
             python -m pip install pyroaring readerwriterlock==1.0.9 
fsspec==2024.3.1 cachetools==5.3.3 ossfs==2023.12.0 ray==2.48.0 
fastavro==1.11.1 pyarrow==16.0.0 zstandard==0.24.0 polars==1.32.0 duckdb==1.3.2 
numpy==1.24.3 pandas==2.0.3 pylance==0.39.0 cramjam flake8==4.0.1 pytest~=7.0 
py4j==0.10.9.9 requests parameterized==0.9.0
+            if python -c "import sys; sys.exit(0 if sys.version_info >= (3, 
11) else 1)"; then
+              python -m pip install vortex-data
+            fi
           fi
           df -h
       - name: Run lint-python.sh
diff --git a/paimon-python/pypaimon/common/file_io.py 
b/paimon-python/pypaimon/common/file_io.py
index a9dfbbdbd4..4b13f0de39 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/common/file_io.py
@@ -254,6 +254,9 @@ class FileIO(ABC):
         """Write Blob format file."""
         raise NotImplementedError("write_blob must be implemented by FileIO 
subclasses")
 
+    def write_vortex(self, path: str, data, **kwargs):
+        """Write Vortex format file.
+
+        The base class provides no Vortex support; concrete FileIO
+        implementations are expected to override this method.
+        """
+        raise NotImplementedError("write_vortex must be implemented by FileIO subclasses")
+
     def close(self):
         pass
 
diff --git a/paimon-python/pypaimon/common/options/core_options.py 
b/paimon-python/pypaimon/common/options/core_options.py
index 9636a58932..8b84e6d38d 100644
--- a/paimon-python/pypaimon/common/options/core_options.py
+++ b/paimon-python/pypaimon/common/options/core_options.py
@@ -64,6 +64,7 @@ class CoreOptions:
     FILE_FORMAT_PARQUET: str = "parquet"
     FILE_FORMAT_BLOB: str = "blob"
     FILE_FORMAT_LANCE: str = "lance"
+    FILE_FORMAT_VORTEX: str = "vortex"
 
     # Basic options
     AUTO_CREATE: ConfigOption[bool] = (
diff --git a/paimon-python/pypaimon/filesystem/local_file_io.py 
b/paimon-python/pypaimon/filesystem/local_file_io.py
index cf9399309f..7a2112e7d7 100644
--- a/paimon-python/pypaimon/filesystem/local_file_io.py
+++ b/paimon-python/pypaimon/filesystem/local_file_io.py
@@ -395,7 +395,26 @@ class LocalFileIO(FileIO):
         except Exception as e:
             self.delete_quietly(path)
             raise RuntimeError(f"Failed to write Lance file {path}: {e}") from 
e
-    
+
+    def write_vortex(self, path: str, data: pyarrow.Table, **kwargs):
+        """Write ``data`` as a single Vortex file at ``path``.
+
+        The location is resolved through ``to_vortex_specified`` first, and the
+        resolved location is used both for creating the parent directory and
+        for the write itself, so the file ends up exactly where
+        ``FormatVortexReader`` (which resolves paths the same way) looks for it.
+        On any failure the target is removed and a ``RuntimeError`` chained to
+        the original error is raised.
+        """
+        try:
+            import vortex
+            from vortex._lib.io import write as vortex_write
+
+            from pypaimon.read.reader.vortex_utils import to_vortex_specified
+            file_path_for_vortex, store_kwargs = to_vortex_specified(self, path)
+            vortex_array = vortex.array(data)
+
+            if store_kwargs:
+                from vortex import store
+                vortex_store = store.from_url(file_path_for_vortex, **store_kwargs)
+                vortex_store.write(vortex_array)
+            else:
+                # Path-based write: make sure the parent directory of the
+                # resolved (absolute, scheme-less) path exists first.
+                os.makedirs(os.path.dirname(file_path_for_vortex), exist_ok=True)
+                vortex_write(vortex_array, file_path_for_vortex)
+        except Exception as e:
+            self.delete_quietly(path)
+            raise RuntimeError(f"Failed to write Vortex file {path}: {e}") from e
+
     def write_blob(self, path: str, data: pyarrow.Table, **kwargs):
         try:
             if data.num_columns != 1:
diff --git a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py 
b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
index 18e54a0b70..8f831389cb 100644
--- a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
+++ b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
@@ -494,6 +494,24 @@ class PyArrowFileIO(FileIO):
             self.delete_quietly(path)
             raise RuntimeError(f"Failed to write Lance file {path}: {e}") from 
e
 
+    def write_vortex(self, path: str, data: pyarrow.Table, **kwargs):
+        """Write ``data`` as a Vortex file at ``path``.
+
+        ``to_vortex_specified`` decides the route: when it returns store
+        options the file goes through ``vortex.store``; otherwise Vortex's
+        plain path-based writer is used. Any failure removes the target and
+        is re-raised as ``RuntimeError`` chained to the original error.
+        """
+        try:
+            import vortex
+            from vortex import store
+            from pypaimon.read.reader.vortex_utils import to_vortex_specified
+
+            target, options = to_vortex_specified(self, path)
+            if not options:
+                from vortex._lib.io import write as vortex_write
+                vortex_write(vortex.array(data), target)
+                return
+            store.from_url(target, **options).write(vortex.array(data))
+        except Exception as e:
+            self.delete_quietly(path)
+            raise RuntimeError(f"Failed to write Vortex file {path}: {e}") from e
+
     def write_blob(self, path: str, data: pyarrow.Table, **kwargs):
         try:
             if data.num_columns != 1:
diff --git a/paimon-python/pypaimon/read/reader/format_vortex_reader.py 
b/paimon-python/pypaimon/read/reader/format_vortex_reader.py
new file mode 100644
index 0000000000..d0bf709e77
--- /dev/null
+++ b/paimon-python/pypaimon/read/reader/format_vortex_reader.py
@@ -0,0 +1,90 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from typing import List, Optional, Any
+
+import pyarrow as pa
+import pyarrow.dataset as ds
+from pyarrow import RecordBatch
+
+from pypaimon.common.file_io import FileIO
+from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
+
+
+class FormatVortexReader(RecordBatchReader):
+    """
+    A Format Reader that reads record batches from a Vortex file,
+    and filters them based on the provided predicate and projection.
+
+    The projected file content is materialized as a ``pyarrow.Table`` when the
+    reader is constructed; batches are then served from memory, optionally
+    through a ``pyarrow.dataset`` scanner that applies the pushed-down filter.
+    """
+
+    def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str],
+                 push_down_predicate: Any, batch_size: int = 1024):
+        """
+        :param file_io: FileIO used to resolve the path and storage options.
+        :param file_path: Path of the Vortex data file.
+        :param read_fields: Columns to project; an empty list or None reads all columns.
+        :param push_down_predicate: Optional filter passed to ``InMemoryDataset.scanner``.
+        :param batch_size: Maximum number of rows per returned batch.
+        """
+        import vortex
+
+        from pypaimon.read.reader.vortex_utils import to_vortex_specified
+        file_path_for_vortex, store_kwargs = to_vortex_specified(file_io, file_path)
+
+        if store_kwargs:
+            from vortex import store
+            vortex_store = store.from_url(file_path_for_vortex, **store_kwargs)
+            vortex_file = vortex_store.open()
+        else:
+            vortex_file = vortex.open(file_path_for_vortex)
+
+        columns_for_vortex = read_fields if read_fields else None
+        pa_table = vortex_file.to_arrow(columns_for_vortex).read_all()
+
+        # Vortex exports view types (string_view / binary_view), possibly nested
+        # inside lists or structs, which some PyArrow kernels don't support yet.
+        pa_table = self._cast_string_view_columns(pa_table)
+
+        if push_down_predicate is not None:
+            in_memory_dataset = ds.InMemoryDataset(pa_table)
+            scanner = in_memory_dataset.scanner(filter=push_down_predicate, batch_size=batch_size)
+            self.reader = scanner.to_reader()
+        else:
+            self.reader = iter(pa_table.to_batches(max_chunksize=batch_size))
+
+    @staticmethod
+    def _normalize_view_type(data_type: pa.DataType) -> pa.DataType:
+        """Return ``data_type`` with string_view/binary_view replaced by utf8/binary.
+
+        Recurses into list, large_list and struct children; field names,
+        nullability and metadata of nested fields are preserved.
+        """
+        if data_type == pa.string_view():
+            return pa.utf8()
+        if data_type == pa.binary_view():
+            return pa.binary()
+        if pa.types.is_list(data_type) or pa.types.is_large_list(data_type):
+            value_field = data_type.value_field
+            new_value_field = value_field.with_type(
+                FormatVortexReader._normalize_view_type(value_field.type))
+            if pa.types.is_list(data_type):
+                return pa.list_(new_value_field)
+            return pa.large_list(new_value_field)
+        if pa.types.is_struct(data_type):
+            children = [data_type.field(i) for i in range(data_type.num_fields)]
+            return pa.struct([
+                child.with_type(FormatVortexReader._normalize_view_type(child.type))
+                for child in children
+            ])
+        return data_type
+
+    @staticmethod
+    def _cast_string_view_columns(table: pa.Table) -> pa.Table:
+        """Cast view-typed columns (top-level or nested) to their plain Arrow types.
+
+        Returns ``table`` itself when no column contains a view type.
+        """
+        normalize = FormatVortexReader._normalize_view_type
+        new_fields = [field.with_type(normalize(field.type)) for field in table.schema]
+        if all(new.type == old.type for new, old in zip(new_fields, table.schema)):
+            return table
+        return table.cast(pa.schema(new_fields))
+
+    def read_arrow_batch(self) -> Optional[RecordBatch]:
+        """Return the next batch, or None once exhausted or after ``close()``."""
+        if self.reader is None:
+            return None
+        try:
+            # A pyarrow RecordBatchReader (filtered path) vs. a plain iterator.
+            if hasattr(self.reader, 'read_next_batch'):
+                return self.reader.read_next_batch()
+            return next(self.reader)
+        except StopIteration:
+            return None
+
+    def close(self):
+        """Release the underlying batch source; safe to call more than once."""
+        reader, self.reader = self.reader, None
+        if reader is not None and hasattr(reader, 'close'):
+            reader.close()
diff --git a/paimon-python/pypaimon/read/reader/vortex_utils.py 
b/paimon-python/pypaimon/read/reader/vortex_utils.py
new file mode 100644
index 0000000000..e380c7f0eb
--- /dev/null
+++ b/paimon-python/pypaimon/read/reader/vortex_utils.py
@@ -0,0 +1,70 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import os
+from typing import Dict, Optional, Tuple
+from urllib.parse import urlparse
+
+from pypaimon.common.file_io import FileIO
+from pypaimon.common.options.config import OssOptions
+
+
+def to_vortex_specified(file_io: FileIO, file_path: str) -> Tuple[str, Optional[Dict[str, str]]]:
+    """Convert path and extract storage options for Vortex store.from_url().
+
+    Returns (url, store_kwargs) where store_kwargs can be passed as
+    keyword arguments to ``vortex.store.from_url(url, **store_kwargs)``.
+
+    - Local paths (no scheme or ``file``): url is an absolute filesystem
+      path and store_kwargs is None.
+    - ``oss://`` paths with properties available: the scheme prefix is
+      rewritten to ``s3://`` and store_kwargs carries a virtual-hosted
+      endpoint plus credentials.
+    - Any other remote path: the original URI is returned with None.
+
+    Raises:
+        ValueError: if an ``oss://`` path is resolved without a configured
+            OSS endpoint.
+    """
+    if hasattr(file_io, 'file_io'):
+        file_io = file_io.file_io()
+
+    if hasattr(file_io, 'get_merged_properties'):
+        properties = file_io.get_merged_properties()
+    else:
+        properties = file_io.properties if hasattr(file_io, 'properties') and file_io.properties else None
+
+    scheme, _, _ = file_io.parse_location(file_path)
+    file_path_for_vortex = file_io.to_filesystem_path(file_path)
+
+    if not scheme or scheme == 'file':
+        if not os.path.isabs(file_path_for_vortex):
+            file_path_for_vortex = os.path.abspath(file_path_for_vortex)
+        return file_path_for_vortex, None
+
+    if scheme != 'oss' or not properties:
+        # For remote schemes, keep the original URI so vortex can parse it.
+        return file_path, None
+
+    endpoint = properties.get(OssOptions.OSS_ENDPOINT)
+    if not endpoint:
+        raise ValueError(f"An OSS endpoint must be configured to access Vortex file {file_path}")
+    # The endpoint may be configured with or without a protocol prefix;
+    # reuse the given protocol and default to https.
+    protocol, sep, host = endpoint.partition('://')
+    if not sep:
+        protocol, host = 'https', endpoint
+    bucket = urlparse(file_path).netloc
+
+    store_kwargs = {
+        'endpoint': f"{protocol}://{bucket}.{host}",
+        'access_key_id': properties.get(OssOptions.OSS_ACCESS_KEY_ID),
+        'secret_access_key': properties.get(OssOptions.OSS_ACCESS_KEY_SECRET),
+        'virtual_hosted_style_request': 'true',
+    }
+    # `.get` works whether `properties` is an Options object or a plain mapping.
+    security_token = properties.get(OssOptions.OSS_SECURITY_TOKEN)
+    if security_token:
+        store_kwargs['session_token'] = security_token
+
+    # OSS is accessed through its S3-compatible API: rewrite only the scheme
+    # prefix, never other occurrences of "oss://" inside the path.
+    _, sep, rest = file_path.partition('://')
+    return ('s3://' + rest if sep else file_path), store_kwargs
diff --git a/paimon-python/pypaimon/read/split_read.py 
b/paimon-python/pypaimon/read/split_read.py
index f9adee4068..41904267f7 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -46,6 +46,7 @@ from pypaimon.read.reader.row_range_filter_record_reader 
import RowIdFilterRecor
 from pypaimon.read.reader.format_blob_reader import FormatBlobReader
 from pypaimon.read.reader.format_lance_reader import FormatLanceReader
 from pypaimon.read.reader.format_pyarrow_reader import FormatPyArrowReader
+from pypaimon.read.reader.format_vortex_reader import FormatVortexReader
 from pypaimon.read.reader.iface.record_batch_reader import (RecordBatchReader,
                                                             RowPositionReader, 
EmptyRecordBatchReader)
 from pypaimon.read.reader.iface.record_reader import RecordReader
@@ -149,6 +150,9 @@ class SplitRead(ABC):
         elif file_format == CoreOptions.FILE_FORMAT_LANCE:
             format_reader = FormatLanceReader(self.table.file_io, file_path, 
read_file_fields,
                                               read_arrow_predicate, 
batch_size=batch_size)
+        elif file_format == CoreOptions.FILE_FORMAT_VORTEX:
+            format_reader = FormatVortexReader(self.table.file_io, file_path, 
read_file_fields,
+                                               read_arrow_predicate, 
batch_size=batch_size)
         elif file_format == CoreOptions.FILE_FORMAT_PARQUET or file_format == 
CoreOptions.FILE_FORMAT_ORC:
             name_to_field = {f.name: f for f in self.read_fields}
             ordered_read_fields = [name_to_field[n] for n in read_file_fields 
if n in name_to_field]
diff --git a/paimon-python/pypaimon/tests/reader_append_only_test.py 
b/paimon-python/pypaimon/tests/reader_append_only_test.py
index adb0ff4f25..54749a9dd8 100644
--- a/paimon-python/pypaimon/tests/reader_append_only_test.py
+++ b/paimon-python/pypaimon/tests/reader_append_only_test.py
@@ -18,6 +18,7 @@
 
 import os
 import shutil
+import sys
 import tempfile
 import time
 import unittest
@@ -102,6 +103,51 @@ class AoReaderTest(unittest.TestCase):
         actual = self._read_test_table(read_builder).sort_by('user_id')
         self.assertEqual(actual, self.expected)
 
+    def _create_vortex_table(self, table_name):
+        # Create a 'dt'-partitioned append-only Vortex table and fill it with the shared test data.
+        schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'], options={'file.format': 'vortex'})
+        identifier = 'default.' + table_name
+        self.catalog.create_table(identifier, schema, False)
+        table = self.catalog.get_table(identifier)
+        self._write_test_table(table)
+        return table
+
+    @unittest.skipIf(sys.version_info < (3, 11), "vortex-data requires Python >= 3.11")
+    def test_vortex_ao_reader(self):
+        table = self._create_vortex_table('test_append_only_vortex')
+        actual = self._read_test_table(table.new_read_builder()).sort_by('user_id')
+        self.assertEqual(actual, self.expected)
+
+    @unittest.skipIf(sys.version_info < (3, 11), "vortex-data requires Python >= 3.11")
+    def test_vortex_ao_reader_with_filter(self):
+        table = self._create_vortex_table('test_append_only_vortex_filter')
+
+        pb = table.new_read_builder().new_predicate_builder()
+        predicate = pb.and_predicates([
+            pb.less_than('user_id', 7),
+            pb.greater_or_equal('user_id', 2),
+            pb.between('user_id', 0, 6),  # [2/b, 3/c, 4/d, 5/e, 6/f] left
+            pb.is_not_in('behavior', ['b', 'e']),  # [3/c, 4/d, 6/f] left
+            pb.is_in('dt', ['p1']),  # exclude 3/c
+            pb.is_not_null('behavior'),  # exclude 4/d
+        ])
+        actual = self._read_test_table(table.new_read_builder().with_filter(predicate))
+        expected = pa.concat_tables([self.expected.slice(5, 1)])  # 6/f
+        self.assertEqual(actual.sort_by('user_id'), expected)
+
+    @unittest.skipIf(sys.version_info < (3, 11), "vortex-data requires Python >= 3.11")
+    def test_vortex_ao_reader_with_projection(self):
+        table = self._create_vortex_table('test_vortex_append_only_projection')
+        projected_builder = table.new_read_builder().with_projection(['dt', 'user_id'])
+        actual = self._read_test_table(projected_builder).sort_by('user_id')
+        self.assertEqual(actual, self.expected.select(['dt', 'user_id']))
+
     def test_lance_ao_reader_with_filter(self):
         schema = Schema.from_pyarrow_schema(self.pa_schema, 
partition_keys=['dt'], options={'file.format': 'lance'})
         self.catalog.create_table('default.test_append_only_lance_filter', 
schema, False)
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py 
b/paimon-python/pypaimon/write/writer/data_writer.py
index e0cbe607ca..a58e7b36af 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -179,6 +179,8 @@ class DataWriter(ABC):
             self.file_io.write_blob(file_path, data)
         elif self.file_format == CoreOptions.FILE_FORMAT_LANCE:
             self.file_io.write_lance(file_path, data)
+        elif self.file_format == CoreOptions.FILE_FORMAT_VORTEX:
+            self.file_io.write_vortex(file_path, data)
         else:
             raise ValueError(f"Unsupported file format: {self.file_format}")
 

Reply via email to