This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new e8c98d9 feat(python): introduce pypaimon core and DataFusion catalog
integration (#204)
e8c98d9 is described below
commit e8c98d9ec0bb82dccf6234909970fa92cb0b396d
Author: yuxia Luo <[email protected]>
AuthorDate: Mon Apr 6 08:40:32 2026 +0800
feat(python): introduce pypaimon core and DataFusion catalog integration
(#204)
---
.github/workflows/ci.yml | 21 ++++
Cargo.toml => .licenserc.yaml | 29 +++---
Cargo.toml | 5 +-
LICENSE | 4 +-
bindings/go/go.mod | 17 +++
bindings/go/tests/go.mod | 17 +++
Cargo.toml => bindings/python/.gitignore | 24 ++---
.../datafusion => bindings/python}/Cargo.toml | 27 +++--
LICENSE => bindings/python/LICENSE | 4 +-
Cargo.toml => bindings/python/Makefile | 23 ++---
bindings/python/NOTICE | 5 +
bindings/python/README.md | 72 +++++++++++++
bindings/python/project-description.md | 69 +++++++++++++
bindings/python/pyproject.toml | 57 +++++++++++
.../python/python/pypaimon_rust/__init__.py | 19 +---
.../python/python/pypaimon_rust/datafusion.pyi | 21 +---
bindings/python/python/pypaimon_rust/py.typed | 0
bindings/python/src/context.rs | 114 +++++++++++++++++++++
bindings/python/src/error.rs | 23 +++++
bindings/python/src/lib.rs | 27 +++++
.../python/tests/test_datafusion.py | 45 ++++----
crates/integrations/datafusion/Cargo.toml | 3 +-
crates/integrations/datafusion/src/catalog.rs | 84 ++++++++-------
crates/integrations/datafusion/src/lib.rs | 1 +
crates/integrations/datafusion/src/runtime.rs | 74 +++++++++++++
crates/integrations/datafusion/src/table/mod.rs | 35 ++++---
crates/paimon/src/io/storage.rs | 2 +-
dev/spark/README.md | 19 ++++
28 files changed, 669 insertions(+), 172 deletions(-)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index bd04db0..ad308be 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -90,6 +90,17 @@ jobs:
steps:
- uses: actions/checkout@v6
+ - name: Rust Cache
+ uses: actions/cache@v4
+ with:
+ path: |
+ ~/.cargo/registry
+ ~/.cargo/git
+ target
+ key: integration-${{ runner.os }}-${{ hashFiles('**/Cargo.lock') }}
+ restore-keys: |
+ integration-${{ runner.os }}-
+
- name: Start Docker containers
run: make docker-up
@@ -105,6 +116,16 @@ jobs:
RUST_LOG: DEBUG
RUST_BACKTRACE: full
+ - name: Install uv
+ uses: astral-sh/setup-uv@37802adc94f370d6bfd71619e3f0bf239e1f3b78
+ with:
+ version: "0.9.3"
+ enable-cache: true
+
+ - name: Python Binding Integration Test
+ working-directory: bindings/python
+ run: make install && make test
+
- name: Go Integration Test
working-directory: bindings/go
run: make test
diff --git a/Cargo.toml b/.licenserc.yaml
similarity index 61%
copy from Cargo.toml
copy to .licenserc.yaml
index 2fea052..c9e9755 100644
--- a/Cargo.toml
+++ b/.licenserc.yaml
@@ -15,21 +15,18 @@
# specific language governing permissions and limitations
# under the License.
-[workspace]
-resolver = "2"
-members = ["crates/paimon", "crates/integration_tests", "bindings/c",
"crates/integrations/datafusion"]
+header:
+ license:
+ spdx-id: Apache-2.0
+ copyright-owner: Apache Software Foundation
-[workspace.package]
-version = "0.0.0"
-edition = "2021"
-homepage = "https://paimon.apache.org/docs/rust/"
-repository = "https://github.com/apache/paimon-rust"
-license = "Apache-2.0"
-rust-version = "1.86.0"
+ paths-ignore:
+ - "LICENSE"
+ - "NOTICE"
+ - ".github/PULL_REQUEST_TEMPLATE.md"
+ - "crates/paimon/tests/**/*.json"
+ - "**/go.sum"
+ - ".devcontainer/devcontainer.json"
+ - "bindings/python/python/pypaimon_rust/py.typed"
-[workspace.dependencies]
-arrow-array = { version = "57.0", features = ["ffi"] }
-arrow-schema = "57.0"
-arrow-cast = "57.0"
-parquet = "57.0"
-tokio = "1.39.2"
\ No newline at end of file
+ comment: on-failure
diff --git a/Cargo.toml b/Cargo.toml
index 2fea052..004fb9d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -17,7 +17,7 @@
[workspace]
resolver = "2"
-members = ["crates/paimon", "crates/integration_tests", "bindings/c",
"crates/integrations/datafusion"]
+members = ["crates/paimon", "crates/integration_tests", "bindings/c",
"bindings/python", "crates/integrations/datafusion"]
[workspace.package]
version = "0.0.0"
@@ -28,8 +28,11 @@ license = "Apache-2.0"
rust-version = "1.86.0"
[workspace.dependencies]
+arrow = "57.0"
arrow-array = { version = "57.0", features = ["ffi"] }
arrow-schema = "57.0"
arrow-cast = "57.0"
+datafusion = "52.3.0"
+datafusion-ffi = "52.3.0"
parquet = "57.0"
tokio = "1.39.2"
\ No newline at end of file
diff --git a/LICENSE b/LICENSE
index c0ea729..261eeb9 100644
--- a/LICENSE
+++ b/LICENSE
@@ -186,7 +186,7 @@
same "printed page" as the copyright notice for easier
identification within third-party archives.
- Copyright 2021 Datafuse Labs
+ Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -198,4 +198,4 @@
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
- limitations under the License.
\ No newline at end of file
+ limitations under the License.
diff --git a/bindings/go/go.mod b/bindings/go/go.mod
index 31cc85a..55eff75 100644
--- a/bindings/go/go.mod
+++ b/bindings/go/go.mod
@@ -1,3 +1,20 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
module github.com/apache/paimon-rust/bindings/go
go 1.22.4
diff --git a/bindings/go/tests/go.mod b/bindings/go/tests/go.mod
index d2d340a..f8f2457 100644
--- a/bindings/go/tests/go.mod
+++ b/bindings/go/tests/go.mod
@@ -1,3 +1,20 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
module paimon_test
go 1.22.4
diff --git a/Cargo.toml b/bindings/python/.gitignore
similarity index 61%
copy from Cargo.toml
copy to bindings/python/.gitignore
index 2fea052..ffafbc8 100644
--- a/Cargo.toml
+++ b/bindings/python/.gitignore
@@ -15,21 +15,11 @@
# specific language governing permissions and limitations
# under the License.
-[workspace]
-resolver = "2"
-members = ["crates/paimon", "crates/integration_tests", "bindings/c",
"crates/integrations/datafusion"]
+.venv/
+.idea/
+__pycache__/
+.pytest_cache/
-[workspace.package]
-version = "0.0.0"
-edition = "2021"
-homepage = "https://paimon.apache.org/docs/rust/"
-repository = "https://github.com/apache/paimon-rust"
-license = "Apache-2.0"
-rust-version = "1.86.0"
-
-[workspace.dependencies]
-arrow-array = { version = "57.0", features = ["ffi"] }
-arrow-schema = "57.0"
-arrow-cast = "57.0"
-parquet = "57.0"
-tokio = "1.39.2"
\ No newline at end of file
+*.dSYM/
+*.so
+uv.lock
diff --git a/crates/integrations/datafusion/Cargo.toml
b/bindings/python/Cargo.toml
similarity index 64%
copy from crates/integrations/datafusion/Cargo.toml
copy to bindings/python/Cargo.toml
index 6dbd34f..617c7f6 100644
--- a/crates/integrations/datafusion/Cargo.toml
+++ b/bindings/python/Cargo.toml
@@ -16,22 +16,21 @@
# under the License.
[package]
-name = "paimon-datafusion"
+name = "pypaimon_rust"
+publish = false
edition.workspace = true
version.workspace = true
license.workspace = true
-homepage = "https://paimon.apache.org/docs/rust/datafusion/"
-documentation = "https://docs.rs/paimon-datafusion"
-description = "Apache Paimon DataFusion Integration"
-categories = ["database"]
-keywords = ["paimon", "datafusion", "integrations"]
-[dependencies]
-async-trait = "0.1"
-chrono = "0.4"
-datafusion = { version = "52.3.0"}
-paimon = { path = "../../paimon" }
-futures = "0.3"
+[lib]
+crate-type = ["cdylib"]
+doc = false
-[dev-dependencies]
-tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
+[dependencies]
+arrow = { workspace = true, features = ["pyarrow"] }
+datafusion = { workspace = true }
+datafusion-ffi = { workspace = true }
+paimon = { path = "../../crates/paimon", features = ["storage-all"] }
+paimon-datafusion = { path = "../../crates/integrations/datafusion" }
+pyo3 = { version = "0.26", features = ["abi3-py310"] }
+tokio = { workspace = true }
diff --git a/LICENSE b/bindings/python/LICENSE
similarity index 99%
copy from LICENSE
copy to bindings/python/LICENSE
index c0ea729..261eeb9 100644
--- a/LICENSE
+++ b/bindings/python/LICENSE
@@ -186,7 +186,7 @@
same "printed page" as the copyright notice for easier
identification within third-party archives.
- Copyright 2021 Datafuse Labs
+ Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -198,4 +198,4 @@
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
- limitations under the License.
\ No newline at end of file
+ limitations under the License.
diff --git a/Cargo.toml b/bindings/python/Makefile
similarity index 61%
copy from Cargo.toml
copy to bindings/python/Makefile
index 2fea052..e341632 100644
--- a/Cargo.toml
+++ b/bindings/python/Makefile
@@ -15,21 +15,12 @@
# specific language governing permissions and limitations
# under the License.
-[workspace]
-resolver = "2"
-members = ["crates/paimon", "crates/integration_tests", "bindings/c",
"crates/integrations/datafusion"]
+build:
+ uv run --no-project maturin develop
-[workspace.package]
-version = "0.0.0"
-edition = "2021"
-homepage = "https://paimon.apache.org/docs/rust/"
-repository = "https://github.com/apache/paimon-rust"
-license = "Apache-2.0"
-rust-version = "1.86.0"
+install:
+ uv sync --group dev --no-install-project
+ $(MAKE) build
-[workspace.dependencies]
-arrow-array = { version = "57.0", features = ["ffi"] }
-arrow-schema = "57.0"
-arrow-cast = "57.0"
-parquet = "57.0"
-tokio = "1.39.2"
\ No newline at end of file
+test:
+ uv run --no-sync pytest
diff --git a/bindings/python/NOTICE b/bindings/python/NOTICE
new file mode 100644
index 0000000..f5a777b
--- /dev/null
+++ b/bindings/python/NOTICE
@@ -0,0 +1,5 @@
+Apache Paimon Rust
+Copyright 2024 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
diff --git a/bindings/python/README.md b/bindings/python/README.md
new file mode 100644
index 0000000..90e184b
--- /dev/null
+++ b/bindings/python/README.md
@@ -0,0 +1,72 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+-->
+
+# PyPaimon Rust
+
+This project builds the Rust-powered core for
[PyPaimon](https://paimon.apache.org/docs/master/pypaimon/overview/) while also
providing DataFusion integration for querying Paimon tables.
+
+## Usage
+
+For DataFusion queries, use the native `SessionContext` and register a
`PaimonCatalog`:
+
+```python
+from datafusion import SessionContext
+from pypaimon_rust.datafusion import PaimonCatalog
+
+catalog = PaimonCatalog({"warehouse": "/path/to/warehouse"})
+ctx = SessionContext()
+ctx.register_catalog_provider("paimon", catalog)
+
+df = ctx.sql("SELECT * FROM paimon.default.my_table")
+df.show()
+```
+
+## Setup
+
+Install [uv](https://docs.astral.sh/uv/getting-started/installation/):
+
+```shell
+pip install uv
+```
+
+Set up the development environment:
+
+```shell
+make install
+```
+
+## Build
+
+```shell
+make build
+```
+
+## Test
+
+Python integration tests expect the shared Paimon test warehouse to be prepared
+first from the repository root:
+
+```shell
+make docker-up
+cd bindings/python
+```
+
+```shell
+make test
+```````
diff --git a/bindings/python/project-description.md
b/bindings/python/project-description.md
new file mode 100644
index 0000000..56e3ad2
--- /dev/null
+++ b/bindings/python/project-description.md
@@ -0,0 +1,69 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+-->
+
+# PyPaimon Rust
+
+This project builds the Rust-powered core for
[PyPaimon](https://paimon.apache.org/docs/master/pypaimon/overview/) while also
providing DataFusion integration for querying Paimon tables.
+
+Install via PyPI:
+
+```
+pip install pypaimon-rust
+```
+
+If you want to use the native Python DataFusion `SessionContext`, install
`datafusion` as well.
+
+## Query Paimon Tables with DataFusion
+
+`pypaimon-rust` provides a `PaimonCatalog` that can be registered into the
native DataFusion `SessionContext`.
+This keeps the standard DataFusion Python API available for regular queries.
+
+```python
+from datafusion import SessionContext
+from pypaimon_rust.datafusion import PaimonCatalog
+
+catalog = PaimonCatalog({
+ "warehouse": "/path/to/warehouse",
+})
+
+ctx = SessionContext()
+ctx.register_catalog_provider("paimon", catalog)
+
+# Query tables via SQL (catalog.database.table)
+df = ctx.sql("SELECT * FROM paimon.default.my_table LIMIT 10")
+df.show()
+```
+
+### REST Catalog
+
+```python
+from datafusion import SessionContext
+from pypaimon_rust.datafusion import PaimonCatalog
+
+catalog = PaimonCatalog({
+ "metastore": "rest",
+ "uri": "http://localhost:8080",
+ "warehouse": "my_warehouse",
+})
+
+ctx = SessionContext()
+ctx.register_catalog_provider("paimon", catalog)
+```
+
+Time travel queries are not supported in the Python binding at this time.
diff --git a/bindings/python/pyproject.toml b/bindings/python/pyproject.toml
new file mode 100644
index 0000000..dd5b153
--- /dev/null
+++ b/bindings/python/pyproject.toml
@@ -0,0 +1,57 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[build-system]
+build-backend = "maturin"
+requires = ["maturin>=1.9.4,<2.0"]
+
+[project]
+name = "pypaimon-rust"
+readme = "project-description.md"
+requires-python = ">=3.10,<4"
+dynamic = ["version"]
+license = { file = "LICENSE" }
+classifiers = [
+ "Development Status :: 3 - Alpha",
+ "Intended Audience :: Developers",
+ "License :: OSI Approved :: Apache Software License",
+ "Operating System :: OS Independent",
+ "Programming Language :: Python :: 3",
+ "Programming Language :: Python :: 3 :: Only",
+ "Programming Language :: Python :: 3.10",
+ "Programming Language :: Python :: 3.11",
+ "Programming Language :: Python :: 3.12",
+ "Programming Language :: Python :: 3.13",
+ "Programming Language :: Rust",
+]
+
+[tool.maturin]
+module-name = "pypaimon_rust.pypaimon_rust"
+python-source = "python"
+include = [
+ { path = "LICENSE", format = ["sdist", "wheel"] },
+ { path = "NOTICE", format = ["sdist", "wheel"] },
+ { path = "python/pypaimon_rust/py.typed", format = ["sdist", "wheel"] },
+]
+
+[dependency-groups]
+dev = [
+ "maturin>=1.9.4,<2.0",
+ "pytest>=8.0",
+ "pyarrow>=17.0",
+ "datafusion==52.3.0",
+]
diff --git a/Cargo.toml b/bindings/python/python/pypaimon_rust/__init__.py
similarity index 61%
copy from Cargo.toml
copy to bindings/python/python/pypaimon_rust/__init__.py
index 2fea052..b36002b 100644
--- a/Cargo.toml
+++ b/bindings/python/python/pypaimon_rust/__init__.py
@@ -15,21 +15,4 @@
# specific language governing permissions and limitations
# under the License.
-[workspace]
-resolver = "2"
-members = ["crates/paimon", "crates/integration_tests", "bindings/c",
"crates/integrations/datafusion"]
-
-[workspace.package]
-version = "0.0.0"
-edition = "2021"
-homepage = "https://paimon.apache.org/docs/rust/"
-repository = "https://github.com/apache/paimon-rust"
-license = "Apache-2.0"
-rust-version = "1.86.0"
-
-[workspace.dependencies]
-arrow-array = { version = "57.0", features = ["ffi"] }
-arrow-schema = "57.0"
-arrow-cast = "57.0"
-parquet = "57.0"
-tokio = "1.39.2"
\ No newline at end of file
+from .pypaimon_rust import *
diff --git a/Cargo.toml b/bindings/python/python/pypaimon_rust/datafusion.pyi
similarity index 61%
copy from Cargo.toml
copy to bindings/python/python/pypaimon_rust/datafusion.pyi
index 2fea052..3ca0617 100644
--- a/Cargo.toml
+++ b/bindings/python/python/pypaimon_rust/datafusion.pyi
@@ -15,21 +15,8 @@
# specific language governing permissions and limitations
# under the License.
-[workspace]
-resolver = "2"
-members = ["crates/paimon", "crates/integration_tests", "bindings/c",
"crates/integrations/datafusion"]
+from typing import Any, Dict
-[workspace.package]
-version = "0.0.0"
-edition = "2021"
-homepage = "https://paimon.apache.org/docs/rust/"
-repository = "https://github.com/apache/paimon-rust"
-license = "Apache-2.0"
-rust-version = "1.86.0"
-
-[workspace.dependencies]
-arrow-array = { version = "57.0", features = ["ffi"] }
-arrow-schema = "57.0"
-arrow-cast = "57.0"
-parquet = "57.0"
-tokio = "1.39.2"
\ No newline at end of file
+class PaimonCatalog:
+ """Paimon catalog that can be registered into a DataFusion ``SessionContext``."""
+
+ # Builds the catalog from string options, e.g. {"warehouse": "/path/to/warehouse"}.
+ def __init__(self, catalog_options: Dict[str, str]) -> None: ...
+ # Returns a PyCapsule named "datafusion_catalog_provider"; ``session`` must
+ # provide a DataFusion logical extension codec.
+ def __datafusion_catalog_provider__(self, session: Any) -> object: ...
diff --git a/bindings/python/python/pypaimon_rust/py.typed
b/bindings/python/python/pypaimon_rust/py.typed
new file mode 100644
index 0000000..e69de29
diff --git a/bindings/python/src/context.rs b/bindings/python/src/context.rs
new file mode 100644
index 0000000..6631ff6
--- /dev/null
+++ b/bindings/python/src/context.rs
@@ -0,0 +1,114 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::ptr::NonNull;
+use std::sync::Arc;
+
+use datafusion::catalog::CatalogProvider;
+use datafusion_ffi::catalog_provider::FFI_CatalogProvider;
+use datafusion_ffi::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
+use paimon::{CatalogFactory, Options};
+use paimon_datafusion::PaimonCatalogProvider;
+use pyo3::exceptions::PyValueError;
+use pyo3::prelude::*;
+use pyo3::types::PyCapsule;
+
+use crate::error::to_py_err;
+use paimon_datafusion::runtime::runtime;
+
+/// Builds a `PaimonCatalogProvider` from user-supplied catalog options.
+///
+/// `CatalogFactory::create` is async, so it is driven to completion with
+/// `block_on` on the shared runtime returned by
+/// `paimon_datafusion::runtime::runtime()`. Any `paimon::Error` is converted
+/// into a Python exception through `to_py_err`.
+///
+/// NOTE(review): the calling thread is blocked for the whole catalog
+/// creation (which may involve remote I/O, e.g. a REST metastore) and the
+/// GIL is not released here; consider detaching from the interpreter if this
+/// becomes a contention point.
+fn build_paimon_catalog_provider(
+ catalog_options: HashMap<String, String>,
+) -> PyResult<Arc<PaimonCatalogProvider>> {
+ let rt = runtime();
+ rt.block_on(async {
+ let options = Options::from_map(catalog_options);
+ let catalog =
CatalogFactory::create(options).await.map_err(to_py_err)?;
+ Ok::<_, PyErr>(Arc::new(PaimonCatalogProvider::new(catalog)))
+ })
+}
+
+/// Extracts an `FFI_LogicalExtensionCodec` from a Python DataFusion session.
+///
+/// `obj` may either expose a `__datafusion_logical_extension_codec__()`
+/// method, which is called to obtain the capsule, or be the capsule itself.
+/// The capsule must be named `datafusion_logical_extension_codec`. A
+/// differently named capsule, an unnamed capsule, or a null capsule pointer
+/// yields a `ValueError`. A non-capsule object propagates the error raised by
+/// `cast`.
+///
+/// The codec is cloned out of the capsule so the returned value does not
+/// borrow memory owned by the Python object.
+fn ffi_logical_codec_from_pycapsule(obj: Bound<'_, PyAny>) ->
PyResult<FFI_LogicalExtensionCodec> {
+ let attr_name = "__datafusion_logical_extension_codec__";
+ // Accept either a session-like object exposing the codec hook or a bare capsule.
+ let capsule = if obj.hasattr(attr_name)? {
+ obj.getattr(attr_name)?.call0()?
+ } else {
+ obj
+ };
+
+ let capsule = capsule.cast::<PyCapsule>()?;
+ // Reject capsules from other producers before reinterpreting the pointer.
+ let expected_name = c"datafusion_logical_extension_codec";
+ match capsule.name()? {
+ Some(name) if name == expected_name => {}
+ Some(name) => {
+ return Err(PyValueError::new_err(format!(
+ "Expected capsule named {expected_name:?}, got {name:?}"
+ )));
+ }
+ None => {
+ return Err(PyValueError::new_err(format!(
+ "Expected capsule named {expected_name:?}, got unnamed capsule"
+ )));
+ }
+ }
+
+ let data =
NonNull::new(capsule.pointer().cast::<FFI_LogicalExtensionCodec>())
+ .ok_or_else(|| PyValueError::new_err("Null logical extension codec
capsule pointer"))?;
+ // SAFETY: the capsule name was checked above against the DataFusion
+ // logical-extension-codec name, so (assuming the producer honors that
+ // naming convention) the non-null pointer refers to a valid
+ // `FFI_LogicalExtensionCodec`. `capsule` stays alive for the duration of
+ // this borrow, and the codec is cloned before returning so no reference
+ // to the capsule's memory escapes this function.
+ let codec = unsafe { data.as_ref() };
+
+ Ok(codec.clone())
+}
+
+/// A Paimon catalog exportable to Python DataFusion `SessionContext`.
+///
+/// Exposed to Python as `pypaimon_rust.datafusion.PaimonCatalog`. Pass an
+/// instance to `SessionContext.register_catalog_provider`. DataFusion's Python
+/// bindings are expected to call `__datafusion_catalog_provider__` to obtain
+/// the underlying provider.
+#[pyclass(name = "PaimonCatalog")]
+pub struct PaimonCatalog {
+ /// Provider shared (via `Arc::clone`) with every exported FFI capsule.
+ provider: Arc<PaimonCatalogProvider>,
+}
+
+#[pymethods]
+impl PaimonCatalog {
+ /// Create a Paimon catalog that can be registered into a DataFusion
session.
+ ///
+ /// `catalog_options` is converted with `Options::from_map` and passed to
+ /// `CatalogFactory::create`, e.g. `{"warehouse": "/path/to/warehouse"}`.
+ /// Errors from catalog creation are raised as `ValueError`.
+ #[new]
+ fn new(catalog_options: HashMap<String, String>) -> PyResult<Self> {
+ Ok(Self {
+ provider: build_paimon_catalog_provider(catalog_options)?,
+ })
+ }
+
+ /// Export this catalog as a DataFusion catalog provider PyCapsule.
+ ///
+ /// `session` must supply a logical extension codec. It can be an object
+ /// with a `__datafusion_logical_extension_codec__()` method, or the codec
+ /// capsule itself. The returned capsule is named
+ /// `datafusion_catalog_provider` and wraps an `FFI_CatalogProvider`. That
+ /// provider is handed the shared runtime from `runtime()`.
+ fn __datafusion_catalog_provider__<'py>(
+ &self,
+ py: Python<'py>,
+ session: Bound<'py, PyAny>,
+ ) -> PyResult<Bound<'py, PyCapsule>> {
+ let name = cr"datafusion_catalog_provider".into();
+ let provider = Arc::clone(&self.provider) as Arc<dyn CatalogProvider +
Send>;
+ let codec = ffi_logical_codec_from_pycapsule(session)?;
+ let provider = FFI_CatalogProvider::new_with_ffi_codec(provider,
Some(runtime()), codec);
+ PyCapsule::new(py, provider, Some(name))
+ }
+}
+
+/// Registers the `datafusion` submodule (containing `PaimonCatalog`) on `m`.
+///
+/// The submodule is also inserted into `sys.modules` under
+/// `pypaimon_rust.datafusion`. `add_submodule` alone does not make the
+/// submodule importable via `from pypaimon_rust.datafusion import ...`.
+pub fn register_module(py: Python<'_>, m: &Bound<'_, PyModule>) ->
PyResult<()> {
+ let this = PyModule::new(py, "datafusion")?;
+ this.add_class::<PaimonCatalog>()?;
+ m.add_submodule(&this)?;
+ // Make the dotted import path resolvable by Python's import system.
+ py.import("sys")?
+ .getattr("modules")?
+ .set_item("pypaimon_rust.datafusion", this)?;
+ Ok(())
+}
diff --git a/bindings/python/src/error.rs b/bindings/python/src/error.rs
new file mode 100644
index 0000000..05dbe2e
--- /dev/null
+++ b/bindings/python/src/error.rs
@@ -0,0 +1,23 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use pyo3::exceptions::PyValueError;
+use pyo3::PyErr;
+
+/// Converts a `paimon::Error` into a Python `ValueError` carrying the error's
+/// `Display` text.
+///
+/// NOTE(review): every error variant (e.g. `TableNotExist`) maps to
+/// `ValueError`, so Python callers cannot distinguish them by exception type.
+pub fn to_py_err(err: paimon::Error) -> PyErr {
+ PyValueError::new_err(err.to_string())
+}
diff --git a/bindings/python/src/lib.rs b/bindings/python/src/lib.rs
new file mode 100644
index 0000000..326796d
--- /dev/null
+++ b/bindings/python/src/lib.rs
@@ -0,0 +1,27 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use pyo3::prelude::*;
+
+mod context;
+mod error;
+
+/// Entry point of the native extension module.
+///
+/// The function name must match the last component of maturin's
+/// `module-name` (`pypaimon_rust.pypaimon_rust` in pyproject.toml). The
+/// package's `__init__.py` re-exports this module's contents.
+#[pymodule]
+fn pypaimon_rust(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> {
+ context::register_module(py, m)?;
+ Ok(())
+}
diff --git a/Cargo.toml b/bindings/python/tests/test_datafusion.py
similarity index 52%
copy from Cargo.toml
copy to bindings/python/tests/test_datafusion.py
index 2fea052..919ef55 100644
--- a/Cargo.toml
+++ b/bindings/python/tests/test_datafusion.py
@@ -15,21 +15,30 @@
# specific language governing permissions and limitations
# under the License.
-[workspace]
-resolver = "2"
-members = ["crates/paimon", "crates/integration_tests", "bindings/c",
"crates/integrations/datafusion"]
-
-[workspace.package]
-version = "0.0.0"
-edition = "2021"
-homepage = "https://paimon.apache.org/docs/rust/"
-repository = "https://github.com/apache/paimon-rust"
-license = "Apache-2.0"
-rust-version = "1.86.0"
-
-[workspace.dependencies]
-arrow-array = { version = "57.0", features = ["ffi"] }
-arrow-schema = "57.0"
-arrow-cast = "57.0"
-parquet = "57.0"
-tokio = "1.39.2"
\ No newline at end of file
+import os
+
+import pyarrow as pa
+from datafusion import SessionContext
+
+from pypaimon_rust.datafusion import PaimonCatalog
+
+WAREHOUSE = os.environ.get("PAIMON_TEST_WAREHOUSE", "/tmp/paimon-warehouse")
+
+
+def extract_rows(batches):
+ """Return the ``(id, name)`` pairs from Arrow record batches, sorted."""
+ table = pa.Table.from_batches(batches)
+ return sorted(zip(table["id"].to_pylist(), table["name"].to_pylist()))
+
+
+def test_query_simple_table_via_catalog_provider():
+ """Query ``default.simple_log_table`` through a registered PaimonCatalog.
+
+ Requires the shared test warehouse to be prepared first (``make docker-up``
+ from the repository root). Its location can be overridden with the
+ ``PAIMON_TEST_WAREHOUSE`` environment variable.
+ """
+ catalog = PaimonCatalog({"warehouse": WAREHOUSE})
+ ctx = SessionContext()
+ ctx.register_catalog_provider("paimon", catalog)
+
+ df = ctx.sql("SELECT id, name FROM paimon.default.simple_log_table")
+
+ # Rows are sorted by extract_rows, so result order from the scan does not matter.
+ assert extract_rows(df.collect()) == [
+ (1, "alice"),
+ (2, "bob"),
+ (3, "carol"),
+ ]
diff --git a/crates/integrations/datafusion/Cargo.toml
b/crates/integrations/datafusion/Cargo.toml
index 6dbd34f..4fdff40 100644
--- a/crates/integrations/datafusion/Cargo.toml
+++ b/crates/integrations/datafusion/Cargo.toml
@@ -29,9 +29,10 @@ keywords = ["paimon", "datafusion", "integrations"]
[dependencies]
async-trait = "0.1"
chrono = "0.4"
-datafusion = { version = "52.3.0"}
+datafusion = { workspace = true }
paimon = { path = "../../paimon" }
futures = "0.3"
+tokio = { workspace = true, features = ["rt", "time", "fs"] }
[dev-dependencies]
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
diff --git a/crates/integrations/datafusion/src/catalog.rs
b/crates/integrations/datafusion/src/catalog.rs
index af5aa3d..626a47f 100644
--- a/crates/integrations/datafusion/src/catalog.rs
+++ b/crates/integrations/datafusion/src/catalog.rs
@@ -28,6 +28,7 @@ use datafusion::error::Result as DFResult;
use paimon::catalog::{Catalog, Identifier};
use crate::error::to_datafusion_error;
+use crate::runtime::{await_with_runtime, block_on_with_runtime};
use crate::table::PaimonTableProvider;
/// Provides an interface to manage and access multiple schemas (databases)
@@ -61,22 +62,29 @@ impl CatalogProvider for PaimonCatalogProvider {
}
fn schema_names(&self) -> Vec<String> {
- futures::executor::block_on(async {
- self.catalog.list_databases().await.unwrap_or_default()
- })
+ let catalog = Arc::clone(&self.catalog);
+ block_on_with_runtime(
+ async move { catalog.list_databases().await.unwrap_or_default() },
+ "paimon catalog access thread panicked",
+ )
}
fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
- futures::executor::block_on(async {
- match self.catalog.get_database(name).await {
- Ok(_) => Some(Arc::new(PaimonSchemaProvider::new(
- self.catalog.clone(),
- name.to_string(),
- )) as Arc<dyn SchemaProvider>),
- Err(paimon::Error::DatabaseNotExist { .. }) => None,
- Err(_) => None,
- }
- })
+ let catalog = Arc::clone(&self.catalog);
+ let name = name.to_string();
+ block_on_with_runtime(
+ async move {
+ match catalog.get_database(&name).await {
+ Ok(_) => Some(
+
Arc::new(PaimonSchemaProvider::new(Arc::clone(&catalog), name))
+ as Arc<dyn SchemaProvider>,
+ ),
+ Err(paimon::Error::DatabaseNotExist { .. }) => None,
+ Err(_) => None,
+ }
+ },
+ "paimon catalog access thread panicked",
+ )
}
}
@@ -113,36 +121,42 @@ impl SchemaProvider for PaimonSchemaProvider {
}
fn table_names(&self) -> Vec<String> {
- // Use blocking call to fetch table names synchronously.
- // This is acceptable as table name listing is lightweight.
- futures::executor::block_on(async {
- self.catalog
- .list_tables(&self.database)
- .await
- .unwrap_or_default()
- })
+ let catalog = Arc::clone(&self.catalog);
+ let database = self.database.clone();
+ block_on_with_runtime(
+ async move {
catalog.list_tables(&database).await.unwrap_or_default() },
+ "paimon catalog access thread panicked",
+ )
}
+    /// Resolves `name` in this schema's database; a missing table yields `Ok(None)`.
async fn table(&self, name: &str) -> DFResult<Option<Arc<dyn TableProvider>>> {
+        let catalog = Arc::clone(&self.catalog);
let identifier = Identifier::new(self.database.clone(), name);
-        match self.catalog.get_table(&identifier).await {
-            Ok(table) => {
-                let provider = PaimonTableProvider::try_new(table)?;
-                Ok(Some(Arc::new(provider) as Arc<dyn TableProvider>))
+        // `await_with_runtime` awaits directly when a Tokio runtime is entered and
+        // otherwise drives the lookup on the global fallback runtime.
+        await_with_runtime(async move {
+            match catalog.get_table(&identifier).await {
+                Ok(table) => {
+                    let provider = PaimonTableProvider::try_new(table)?;
+                    Ok(Some(Arc::new(provider) as Arc<dyn TableProvider>))
+                }
+                Err(paimon::Error::TableNotExist { .. }) => Ok(None),
+                Err(e) => Err(to_datafusion_error(e)),
}
-            Err(paimon::Error::TableNotExist { .. }) => Ok(None),
-            Err(e) => Err(to_datafusion_error(e)),
-        }
+        })
+        .await
}
+    /// Returns whether `name` resolves to a table in this schema's database.
fn table_exist(&self, name: &str) -> bool {
+        let catalog = Arc::clone(&self.catalog);
let identifier = Identifier::new(self.database.clone(), name);
-        futures::executor::block_on(async {
-            match self.catalog.get_table(&identifier).await {
-                Ok(_) => true,
-                Err(paimon::Error::TableNotExist { .. }) => false,
-                Err(_) => false,
-            }
-        })
+        // `table_exist` cannot surface errors, so any lookup failure (not only
+        // `TableNotExist`) is reported as "absent".
+        block_on_with_runtime(
+            async move { catalog.get_table(&identifier).await.is_ok() },
+            "paimon catalog access thread panicked",
+        )
}
}
diff --git a/crates/integrations/datafusion/src/lib.rs b/crates/integrations/datafusion/src/lib.rs
index 7aab461..8454bf7 100644
--- a/crates/integrations/datafusion/src/lib.rs
+++ b/crates/integrations/datafusion/src/lib.rs
@@ -41,6 +41,7 @@ mod error;
mod filter_pushdown;
mod physical_plan;
mod relation_planner;
+pub mod runtime;
mod table;
pub use catalog::{PaimonCatalogProvider, PaimonSchemaProvider};
diff --git a/crates/integrations/datafusion/src/runtime.rs b/crates/integrations/datafusion/src/runtime.rs
new file mode 100644
index 0000000..4da502a
--- /dev/null
+++ b/crates/integrations/datafusion/src/runtime.rs
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::future::Future;
+use std::sync::OnceLock;
+
+use tokio::runtime::{Handle, Runtime};
+
+/// Process-wide fallback runtime, built lazily on first use.
+static RUNTIME: OnceLock<Runtime> = OnceLock::new();
+
+/// Returns the process-wide fallback Tokio runtime, building it on first call.
+///
+/// # Panics
+///
+/// Panics if the Tokio runtime cannot be constructed.
+fn global_runtime() -> &'static Runtime {
+    RUNTIME.get_or_init(|| {
+        Runtime::new()
+            .expect("failed to build global tokio runtime for paimon datafusion integration")
+    })
+}
+
+/// Returns a [`Handle`] to a Tokio runtime usable from the current thread.
+///
+/// If a Tokio runtime is already entered on the current thread, its handle is
+/// returned directly. Otherwise the handle of a lazily-initialised global
+/// runtime is returned.
+pub fn runtime() -> Handle {
+    Handle::try_current().unwrap_or_else(|_| global_runtime().handle().clone())
+}
+
+// These helpers work around DataFusion FFI callbacks that may run without an
+// entered Tokio runtime. See https://github.com/apache/datafusion/issues/16312.
+// A global OnceLock<Runtime> avoids creating a new runtime on every call;
+// if DataFusion fixes runtime propagation end-to-end, we should be able to
+// remove these manual fallback runtimes.
+
+/// Awaits `future`, supplying a Tokio runtime if the caller has none.
+///
+/// When a Tokio runtime is entered on the current thread the future is simply
+/// awaited. Otherwise it is driven to completion with the global runtime's
+/// `block_on`, which blocks the thread polling this future until it finishes.
+pub(crate) async fn await_with_runtime<F>(future: F) -> F::Output
+where
+    F: Future,
+{
+    if Handle::try_current().is_ok() {
+        future.await
+    } else {
+        global_runtime().block_on(future)
+    }
+}
+
+// The blocking variant is for synchronous DataFusion FFI callbacks such as
+// CatalogProvider::schema(), where we cannot `.await` directly.
+
+/// Synchronously runs `future` to completion on the global runtime.
+///
+/// If the calling thread already has a Tokio runtime entered, the future is
+/// driven from a freshly spawned OS thread, because Tokio panics when
+/// `block_on` is called from within an asynchronous execution context.
+///
+/// # Panics
+///
+/// Panics with `panic_error` if that helper thread panics (for example because
+/// `future` itself panicked).
+pub(crate) fn block_on_with_runtime<F>(future: F, panic_error: &'static str) -> F::Output
+where
+    F: Future + Send + 'static,
+    F::Output: Send + 'static,
+{
+    if Handle::try_current().is_ok() {
+        let handle = global_runtime().handle().clone();
+        std::thread::spawn(move || handle.block_on(future))
+            .join()
+            .expect(panic_error)
+    } else {
+        global_runtime().block_on(future)
+    }
+}
diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs
index 26a50c9..3c4a98c 100644
--- a/crates/integrations/datafusion/src/table/mod.rs
+++ b/crates/integrations/datafusion/src/table/mod.rs
@@ -32,6 +32,7 @@ use paimon::table::Table;
use crate::error::to_datafusion_error;
use crate::filter_pushdown::{build_pushed_predicate, classify_filter_pushdown};
use crate::physical_plan::PaimonTableScan;
+use crate::runtime::await_with_runtime;
/// Read-only table provider for a Paimon table.
///
@@ -82,19 +83,6 @@ impl TableProvider for PaimonTableProvider {
TableType::Base
}
-    fn supports_filters_pushdown(
-        &self,
-        filters: &[&Expr],
-    ) -> DFResult<Vec<TableProviderFilterPushDown>> {
-        let fields = self.table.schema().fields();
-        let partition_keys = self.table.schema().partition_keys();
-
-        Ok(filters
-            .iter()
-            .map(|filter| classify_filter_pushdown(filter, fields, partition_keys))
-            .collect())
-    }
-
async fn scan(
&self,
state: &dyn Session,
@@ -125,7 +113,13 @@ impl TableProvider for PaimonTableProvider {
read_builder.with_limit(limit);
}
let scan = read_builder.new_scan();
- let plan = scan.plan().await.map_err(to_datafusion_error)?;
+        // DataFusion's Python FFI may poll `TableProvider::scan()` without an active
+        // Tokio runtime. `scan.plan()` can reach OpenDAL/Tokio filesystem calls while
+        // reading Paimon metadata, so we must provide a runtime here instead of
+        // assuming the caller already entered one.
+ let plan = await_with_runtime(scan.plan())
+ .await
+ .map_err(to_datafusion_error)?;
// Distribute splits across DataFusion partitions, capped by the
// session's target_partitions to avoid over-sharding with many small splits.
@@ -151,6 +145,19 @@ impl TableProvider for PaimonTableProvider {
limit,
)))
}
+
+    /// Reports, for each filter, how far it can be pushed into the Paimon scan,
+    /// as decided by `classify_filter_pushdown` from the table's fields and
+    /// partition keys.
+    fn supports_filters_pushdown(
+        &self,
+        filters: &[&Expr],
+    ) -> DFResult<Vec<TableProviderFilterPushDown>> {
+        let fields = self.table.schema().fields();
+        let partition_keys = self.table.schema().partition_keys();
+
+        Ok(filters
+            .iter()
+            .map(|filter| classify_filter_pushdown(filter, fields, partition_keys))
+            .collect())
+    }
}
#[cfg(test)]
diff --git a/crates/paimon/src/io/storage.rs b/crates/paimon/src/io/storage.rs
index 31eab86..ccef982 100644
--- a/crates/paimon/src/io/storage.rs
+++ b/crates/paimon/src/io/storage.rs
@@ -150,7 +150,7 @@ impl Storage {
}
#[cfg(feature = "storage-s3")]
- fn s3_bucket_and_relative_path<'a>(path: &'a str) ->
crate::Result<(String, &'a str)> {
+ fn s3_bucket_and_relative_path(path: &str) -> crate::Result<(String,
&str)> {
let url = Url::parse(path).map_err(|_| error::Error::ConfigInvalid {
message: format!("Invalid S3 url: {path}"),
})?;
diff --git a/dev/spark/README.md b/dev/spark/README.md
index f125c1f..519a526 100644
--- a/dev/spark/README.md
+++ b/dev/spark/README.md
@@ -1,3 +1,22 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
# Spark Provisioning for Integration Tests
This directory contains the Spark + Paimon setup that provisions test tables
into `/tmp/paimon-warehouse`.