This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d84799280f [python] Introduce vortex file format integration (#7547)
d84799280f is described below
commit d84799280f9226beae886f5d5b4553a21182fe50
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Mar 30 14:31:00 2026 +0800
[python] Introduce vortex file format integration (#7547)
---
.github/workflows/paimon-python-checks.yml | 5 +-
paimon-python/pypaimon/common/file_io.py | 3 +
.../pypaimon/common/options/core_options.py | 1 +
paimon-python/pypaimon/filesystem/local_file_io.py | 21 ++++-
.../pypaimon/filesystem/pyarrow_file_io.py | 18 +++++
.../pypaimon/read/reader/format_vortex_reader.py | 90 ++++++++++++++++++++++
paimon-python/pypaimon/read/reader/vortex_utils.py | 70 +++++++++++++++++
paimon-python/pypaimon/read/split_read.py | 4 +
.../pypaimon/tests/reader_append_only_test.py | 46 +++++++++++
paimon-python/pypaimon/write/writer/data_writer.py | 2 +
10 files changed, 258 insertions(+), 2 deletions(-)
diff --git a/.github/workflows/paimon-python-checks.yml
b/.github/workflows/paimon-python-checks.yml
index 1d3b6864a2..722f37fa05 100755
--- a/.github/workflows/paimon-python-checks.yml
+++ b/.github/workflows/paimon-python-checks.yml
@@ -47,7 +47,7 @@ jobs:
strategy:
fail-fast: false
matrix:
- python-version: [ '3.6.15', '3.10' ]
+ python-version: [ '3.6.15', '3.10', '3.11' ]
steps:
- name: Checkout code
@@ -113,6 +113,9 @@ jobs:
python -m pip install --upgrade pip
pip install torch --index-url https://download.pytorch.org/whl/cpu
python -m pip install pyroaring readerwriterlock==1.0.9
fsspec==2024.3.1 cachetools==5.3.3 ossfs==2023.12.0 ray==2.48.0
fastavro==1.11.1 pyarrow==16.0.0 zstandard==0.24.0 polars==1.32.0 duckdb==1.3.2
numpy==1.24.3 pandas==2.0.3 pylance==0.39.0 cramjam flake8==4.0.1 pytest~=7.0
py4j==0.10.9.9 requests parameterized==0.9.0
+ if python -c "import sys; sys.exit(0 if sys.version_info >= (3,
11) else 1)"; then
+ python -m pip install vortex-data
+ fi
fi
df -h
- name: Run lint-python.sh
diff --git a/paimon-python/pypaimon/common/file_io.py
b/paimon-python/pypaimon/common/file_io.py
index a9dfbbdbd4..4b13f0de39 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/common/file_io.py
@@ -254,6 +254,9 @@ class FileIO(ABC):
"""Write Blob format file."""
raise NotImplementedError("write_blob must be implemented by FileIO
subclasses")
def write_vortex(self, path: str, data, **kwargs):
    """Write Vortex format file.

    The base class has no Vortex support; concrete FileIO subclasses
    override this method.

    Raises:
        NotImplementedError: always, in the base class.
    """
    message = "write_vortex must be implemented by FileIO subclasses"
    raise NotImplementedError(message)
+
def close(self):
    """Release resources held by this FileIO; the base implementation is a no-op."""
    return None
diff --git a/paimon-python/pypaimon/common/options/core_options.py
b/paimon-python/pypaimon/common/options/core_options.py
index 9636a58932..8b84e6d38d 100644
--- a/paimon-python/pypaimon/common/options/core_options.py
+++ b/paimon-python/pypaimon/common/options/core_options.py
@@ -64,6 +64,7 @@ class CoreOptions:
FILE_FORMAT_PARQUET: str = "parquet"
FILE_FORMAT_BLOB: str = "blob"
FILE_FORMAT_LANCE: str = "lance"
+ FILE_FORMAT_VORTEX: str = "vortex"
# Basic options
AUTO_CREATE: ConfigOption[bool] = (
diff --git a/paimon-python/pypaimon/filesystem/local_file_io.py
b/paimon-python/pypaimon/filesystem/local_file_io.py
index cf9399309f..7a2112e7d7 100644
--- a/paimon-python/pypaimon/filesystem/local_file_io.py
+++ b/paimon-python/pypaimon/filesystem/local_file_io.py
@@ -395,7 +395,26 @@ class LocalFileIO(FileIO):
except Exception as e:
self.delete_quietly(path)
raise RuntimeError(f"Failed to write Lance file {path}: {e}") from
e
-
+
def write_vortex(self, path: str, data: pyarrow.Table, **kwargs):
    """Write ``data`` to ``path`` as a Vortex file.

    The path is resolved through ``to_vortex_specified`` (the same helper the
    Vortex reader uses), so the file is written to exactly the location the
    reader will later open. On any failure the partially written file is
    deleted and the error is re-raised as ``RuntimeError``.
    """
    try:
        import vortex

        from pypaimon.read.reader.vortex_utils import to_vortex_specified
        file_path_for_vortex, store_kwargs = to_vortex_specified(self, path)

        if store_kwargs:
            from vortex import store
            vortex_store = store.from_url(file_path_for_vortex, **store_kwargs)
            vortex_store.write(vortex.array(data))
        else:
            from vortex._lib.io import write as vortex_write
            # Create the parent of the *resolved* path: the raw ``path`` may
            # carry a scheme (e.g. ``file://``), for which os.path.dirname()
            # yields a bogus relative directory, or be a bare file name, for
            # which it yields '' and os.makedirs('') raises.
            parent_dir = os.path.dirname(file_path_for_vortex)
            if parent_dir:
                os.makedirs(parent_dir, exist_ok=True)
            vortex_write(vortex.array(data), file_path_for_vortex)
    except Exception as e:
        self.delete_quietly(path)
        raise RuntimeError(f"Failed to write Vortex file {path}: {e}") from e
+
def write_blob(self, path: str, data: pyarrow.Table, **kwargs):
try:
if data.num_columns != 1:
diff --git a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
index 18e54a0b70..8f831389cb 100644
--- a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
+++ b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
@@ -494,6 +494,24 @@ class PyArrowFileIO(FileIO):
self.delete_quietly(path)
raise RuntimeError(f"Failed to write Lance file {path}: {e}") from
e
def write_vortex(self, path: str, data: pyarrow.Table, **kwargs):
    """Write ``data`` as a Vortex file at ``path``.

    ``to_vortex_specified`` picks the target. When it returns store options
    (only produced for ``oss://`` paths), the file is written through a
    configured ``vortex.store``. Otherwise it is written directly to the
    returned path/URI. Any failure deletes ``path`` and is surfaced as
    ``RuntimeError``.
    """
    try:
        import vortex
        from vortex import store

        from pypaimon.read.reader.vortex_utils import to_vortex_specified
        target, options = to_vortex_specified(self, path)

        if not options:
            # NOTE(review): no parent directories are created here, and
            # non-OSS remote URIs reach vortex without any credentials --
            # confirm vortex handles these targets on its own.
            from vortex._lib.io import write as vortex_write
            vortex_write(vortex.array(data), target)
        else:
            configured_store = store.from_url(target, **options)
            configured_store.write(vortex.array(data))
    except Exception as e:
        self.delete_quietly(path)
        raise RuntimeError(f"Failed to write Vortex file {path}: {e}") from e
+
def write_blob(self, path: str, data: pyarrow.Table, **kwargs):
try:
if data.num_columns != 1:
diff --git a/paimon-python/pypaimon/read/reader/format_vortex_reader.py
b/paimon-python/pypaimon/read/reader/format_vortex_reader.py
new file mode 100644
index 0000000000..d0bf709e77
--- /dev/null
+++ b/paimon-python/pypaimon/read/reader/format_vortex_reader.py
@@ -0,0 +1,90 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from typing import List, Optional, Any
+
+import pyarrow as pa
+import pyarrow.dataset as ds
+from pyarrow import RecordBatch
+
+from pypaimon.common.file_io import FileIO
+from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
+
+
class FormatVortexReader(RecordBatchReader):
    """
    A Format Reader that reads record batches from a Vortex file,
    and filters them based on the provided predicate and projection.

    The projected file is materialized as one Arrow table before being
    sliced into batches, so memory use scales with the projected file size.
    """

    def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str],
                 push_down_predicate: Any, batch_size: int = 1024):
        """Open ``file_path`` with Vortex and prepare the batch source.

        Args:
            file_io: FileIO used to resolve the path and any object-store
                options (via ``to_vortex_specified``).
            file_path: Path of the Vortex data file.
            read_fields: Column names to project; empty or None reads all.
            push_down_predicate: Optional ``pyarrow.dataset`` filter
                expression, applied after projection; None disables it.
            batch_size: Maximum number of rows per returned batch.
        """
        import vortex

        from pypaimon.read.reader.vortex_utils import to_vortex_specified
        file_path_for_vortex, store_kwargs = to_vortex_specified(file_io, file_path)

        if store_kwargs:
            from vortex import store
            vortex_store = store.from_url(file_path_for_vortex, **store_kwargs)
            vortex_file = vortex_store.open()
        else:
            vortex_file = vortex.open(file_path_for_vortex)

        columns_for_vortex = read_fields if read_fields else None
        pa_table = vortex_file.to_arrow(columns_for_vortex).read_all()

        # Vortex exports string_view / binary_view (also inside nested
        # types), which some PyArrow kernels don't support yet.
        pa_table = self._cast_string_view_columns(pa_table)

        if push_down_predicate is not None:
            in_memory_dataset = ds.InMemoryDataset(pa_table)
            scanner = in_memory_dataset.scanner(filter=push_down_predicate, batch_size=batch_size)
            self.reader = scanner.to_reader()
        else:
            self.reader = iter(pa_table.to_batches(max_chunksize=batch_size))

    @staticmethod
    def _normalize_view_type(data_type: pa.DataType) -> pa.DataType:
        """Return ``data_type`` with string_view/binary_view replaced by
        utf8/binary, recursing into list, large_list and struct children."""
        if data_type == pa.string_view():
            return pa.utf8()
        if data_type == pa.binary_view():
            return pa.binary()
        if pa.types.is_list(data_type):
            return pa.list_(FormatVortexReader._normalize_view_field(data_type.value_field))
        if pa.types.is_large_list(data_type):
            return pa.large_list(FormatVortexReader._normalize_view_field(data_type.value_field))
        if pa.types.is_struct(data_type):
            return pa.struct([
                FormatVortexReader._normalize_view_field(data_type.field(i))
                for i in range(data_type.num_fields)
            ])
        return data_type

    @staticmethod
    def _normalize_view_field(field: pa.Field) -> pa.Field:
        """Return ``field`` itself when unchanged, else a copy with a normalized type."""
        new_type = FormatVortexReader._normalize_view_type(field.type)
        return field if new_type == field.type else field.with_type(new_type)

    @staticmethod
    def _cast_string_view_columns(table: pa.Table) -> pa.Table:
        """Cast view-typed columns (top-level or nested) to their non-view
        equivalents; returns ``table`` unchanged when none are present."""
        fields = list(table.schema)
        new_fields = [FormatVortexReader._normalize_view_field(f) for f in fields]
        # _normalize_view_field returns the very same object when nothing changed.
        if all(new is old for new, old in zip(new_fields, fields)):
            return table
        # Keep schema-level metadata, which a bare pa.schema(fields) would drop.
        return table.cast(pa.schema(new_fields, metadata=table.schema.metadata))

    def read_arrow_batch(self) -> Optional[RecordBatch]:
        """Return the next batch, or None when exhausted or after close()."""
        if self.reader is None:
            return None
        try:
            if hasattr(self.reader, 'read_next_batch'):
                return self.reader.read_next_batch()
            return next(self.reader)
        except StopIteration:
            return None

    def close(self):
        """Drop the batch source, closing it first when it supports close()."""
        reader, self.reader = self.reader, None
        close_reader = getattr(reader, 'close', None)
        if close_reader is not None:
            close_reader()
diff --git a/paimon-python/pypaimon/read/reader/vortex_utils.py
b/paimon-python/pypaimon/read/reader/vortex_utils.py
new file mode 100644
index 0000000000..e380c7f0eb
--- /dev/null
+++ b/paimon-python/pypaimon/read/reader/vortex_utils.py
@@ -0,0 +1,70 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import os
+from typing import Dict, Optional, Tuple
+from urllib.parse import urlparse
+
+from pypaimon.common.file_io import FileIO
+from pypaimon.common.options.config import OssOptions
+
+
def to_vortex_specified(file_io: FileIO, file_path: str) -> Tuple[str, Optional[Dict[str, str]]]:
    """Convert path and extract storage options for Vortex ``store.from_url()``.

    Returns ``(url, store_kwargs)``; ``store_kwargs`` can be passed as keyword
    arguments to ``vortex.store.from_url(url, **store_kwargs)``.

    * Local paths (no scheme, or ``file``) are converted with
      ``to_filesystem_path`` and made absolute; ``store_kwargs`` is None.
    * ``oss://`` paths (when properties are available) are rewritten to
      ``s3://`` and get endpoint/credential options built from the
      OSS properties.
    * Any other URI is returned unchanged with ``store_kwargs`` None.

    Raises:
        ValueError: an ``oss://`` path is given but no OSS endpoint is
            configured in the properties.
    """
    # Some FileIO objects expose an inner implementation via file_io(); use it.
    if hasattr(file_io, 'file_io'):
        file_io = file_io.file_io()

    if hasattr(file_io, 'get_merged_properties'):
        properties = file_io.get_merged_properties()
    else:
        properties = getattr(file_io, 'properties', None) or None

    scheme, _, _ = file_io.parse_location(file_path)

    if not scheme or scheme == 'file':
        local_path = file_io.to_filesystem_path(file_path)
        if not os.path.isabs(local_path):
            local_path = os.path.abspath(local_path)
        return local_path, None

    # Remote schemes keep the original URI so vortex can parse it; only OSS
    # gets explicit store options.
    if scheme != 'oss' or not properties:
        return file_path, None

    endpoint = properties.get(OssOptions.OSS_ENDPOINT)
    if not endpoint:
        # Previously this silently produced "https://<bucket>.None".
        raise ValueError(f"OSS endpoint must be configured to access Vortex file {file_path}")
    # Accept endpoints configured with or without an explicit scheme.
    endpoint_host = str(endpoint).split('://', 1)[-1].rstrip('/')

    parsed = urlparse(file_path)
    store_kwargs = {
        # Virtual-hosted style addressing: https://<bucket>.<endpoint>
        'endpoint': f"https://{parsed.netloc}.{endpoint_host}",
        'access_key_id': properties.get(OssOptions.OSS_ACCESS_KEY_ID),
        'secret_access_key': properties.get(OssOptions.OSS_ACCESS_KEY_SECRET),
        'virtual_hosted_style_request': 'true',
    }
    # get() works for both Options objects and plain mappings, unlike contains().
    security_token = properties.get(OssOptions.OSS_SECURITY_TOKEN)
    if security_token:
        store_kwargs['session_token'] = security_token

    # Swap only the scheme; str.replace('oss://', 's3://') would also rewrite
    # any later occurrence inside the path.
    return parsed._replace(scheme='s3').geturl(), store_kwargs
diff --git a/paimon-python/pypaimon/read/split_read.py
b/paimon-python/pypaimon/read/split_read.py
index f9adee4068..41904267f7 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -46,6 +46,7 @@ from pypaimon.read.reader.row_range_filter_record_reader
import RowIdFilterRecor
from pypaimon.read.reader.format_blob_reader import FormatBlobReader
from pypaimon.read.reader.format_lance_reader import FormatLanceReader
from pypaimon.read.reader.format_pyarrow_reader import FormatPyArrowReader
+from pypaimon.read.reader.format_vortex_reader import FormatVortexReader
from pypaimon.read.reader.iface.record_batch_reader import (RecordBatchReader,
RowPositionReader,
EmptyRecordBatchReader)
from pypaimon.read.reader.iface.record_reader import RecordReader
@@ -149,6 +150,9 @@ class SplitRead(ABC):
elif file_format == CoreOptions.FILE_FORMAT_LANCE:
format_reader = FormatLanceReader(self.table.file_io, file_path,
read_file_fields,
read_arrow_predicate,
batch_size=batch_size)
+ elif file_format == CoreOptions.FILE_FORMAT_VORTEX:
+ format_reader = FormatVortexReader(self.table.file_io, file_path,
read_file_fields,
+ read_arrow_predicate,
batch_size=batch_size)
elif file_format == CoreOptions.FILE_FORMAT_PARQUET or file_format ==
CoreOptions.FILE_FORMAT_ORC:
name_to_field = {f.name: f for f in self.read_fields}
ordered_read_fields = [name_to_field[n] for n in read_file_fields
if n in name_to_field]
diff --git a/paimon-python/pypaimon/tests/reader_append_only_test.py
b/paimon-python/pypaimon/tests/reader_append_only_test.py
index adb0ff4f25..54749a9dd8 100644
--- a/paimon-python/pypaimon/tests/reader_append_only_test.py
+++ b/paimon-python/pypaimon/tests/reader_append_only_test.py
@@ -18,6 +18,7 @@
import os
import shutil
+import sys
import tempfile
import time
import unittest
@@ -102,6 +103,51 @@ class AoReaderTest(unittest.TestCase):
actual = self._read_test_table(read_builder).sort_by('user_id')
self.assertEqual(actual, self.expected)
def _new_vortex_table(self, identifier):
    """Create a 'dt'-partitioned table stored as Vortex under ``identifier``,
    write the shared test rows into it and return the table."""
    vortex_schema = Schema.from_pyarrow_schema(
        self.pa_schema, partition_keys=['dt'], options={'file.format': 'vortex'})
    self.catalog.create_table(identifier, vortex_schema, False)
    created = self.catalog.get_table(identifier)
    self._write_test_table(created)
    return created


@unittest.skipIf(sys.version_info < (3, 11), "vortex-data requires Python >= 3.11")
def test_vortex_ao_reader(self):
    table = self._new_vortex_table('default.test_append_only_vortex')

    builder = table.new_read_builder()
    result = self._read_test_table(builder).sort_by('user_id')
    self.assertEqual(result, self.expected)


@unittest.skipIf(sys.version_info < (3, 11), "vortex-data requires Python >= 3.11")
def test_vortex_ao_reader_with_filter(self):
    table = self._new_vortex_table('default.test_append_only_vortex_filter')

    pb = table.new_read_builder().new_predicate_builder()
    combined = pb.and_predicates([
        pb.less_than('user_id', 7),
        pb.greater_or_equal('user_id', 2),
        pb.between('user_id', 0, 6),  # [2/b, 3/c, 4/d, 5/e, 6/f] left
        pb.is_not_in('behavior', ['b', 'e']),  # [3/c, 4/d, 6/f] left
        pb.is_in('dt', ['p1']),  # exclude 3/c
        pb.is_not_null('behavior'),  # exclude 4/d
    ])
    filtered_builder = table.new_read_builder().with_filter(combined)
    result = self._read_test_table(filtered_builder)

    only_six = pa.concat_tables([self.expected.slice(5, 1)])  # 6/f
    self.assertEqual(result.sort_by('user_id'), only_six)


@unittest.skipIf(sys.version_info < (3, 11), "vortex-data requires Python >= 3.11")
def test_vortex_ao_reader_with_projection(self):
    table = self._new_vortex_table('default.test_vortex_append_only_projection')

    projected_builder = table.new_read_builder().with_projection(['dt', 'user_id'])
    result = self._read_test_table(projected_builder).sort_by('user_id')
    self.assertEqual(result, self.expected.select(['dt', 'user_id']))
+
def test_lance_ao_reader_with_filter(self):
schema = Schema.from_pyarrow_schema(self.pa_schema,
partition_keys=['dt'], options={'file.format': 'lance'})
self.catalog.create_table('default.test_append_only_lance_filter',
schema, False)
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py
b/paimon-python/pypaimon/write/writer/data_writer.py
index e0cbe607ca..a58e7b36af 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -179,6 +179,8 @@ class DataWriter(ABC):
self.file_io.write_blob(file_path, data)
elif self.file_format == CoreOptions.FILE_FORMAT_LANCE:
self.file_io.write_lance(file_path, data)
+ elif self.file_format == CoreOptions.FILE_FORMAT_VORTEX:
+ self.file_io.write_vortex(file_path, data)
else:
raise ValueError(f"Unsupported file format: {self.file_format}")