This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 9227cf7140 [python] Introduce DataFusion SQL to PyPaimon (#7599)
9227cf7140 is described below
commit 9227cf71404ca22e2b336876f98ce74022aefd0e
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Apr 6 20:10:49 2026 +0800
[python] Introduce DataFusion SQL to PyPaimon (#7599)
This PR adds SQL query capabilities to PyPaimon, based on
pypaimon-rust + DataFusion.
---
.github/workflows/paimon-python-checks.yml | 14 +-
docs/content/pypaimon/cli.md | 102 ++++++++
docs/content/pypaimon/sql.md | 168 ++++++++++++
paimon-python/pypaimon/__init__.py | 2 +
paimon-python/pypaimon/cli/cli.py | 4 +
paimon-python/pypaimon/cli/cli_sql.py | 313 +++++++++++++++++++++++
paimon-python/pypaimon/{ => sql}/__init__.py | 25 +-
paimon-python/pypaimon/sql/sql_context.py | 81 ++++++
paimon-python/pypaimon/tests/sql_context_test.py | 152 +++++++++++
paimon-python/setup.py | 4 +
10 files changed, 845 insertions(+), 20 deletions(-)
diff --git a/.github/workflows/paimon-python-checks.yml
b/.github/workflows/paimon-python-checks.yml
index 0154c637f6..fde69953ee 100755
--- a/.github/workflows/paimon-python-checks.yml
+++ b/.github/workflows/paimon-python-checks.yml
@@ -71,6 +71,8 @@ jobs:
build-essential \
git \
curl \
+ pkg-config \
+ libssl-dev \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*
@@ -139,12 +141,22 @@ jobs:
if: matrix.python-version != '3.6.15'
shell: bash
run: |
- pip install maturin
+ pip install maturin[patchelf]
git clone -b support_directory
https://github.com/JingsongLi/tantivy-py.git /tmp/tantivy-py
cd /tmp/tantivy-py
maturin build --release
pip install target/wheels/tantivy-*.whl
+ - name: Build and install pypaimon-rust from source
+ if: matrix.python-version != '3.6.15'
+ shell: bash
+ run: |
+ git clone https://github.com/apache/paimon-rust.git /tmp/paimon-rust
+ cd /tmp/paimon-rust/bindings/python
+ maturin build --release -o dist
+ pip install dist/pypaimon_rust-*.whl
+ pip install 'datafusion>=52'
+
- name: Run lint-python.sh
shell: bash
run: |
diff --git a/docs/content/pypaimon/cli.md b/docs/content/pypaimon/cli.md
index a328fc4744..6485ca5876 100644
--- a/docs/content/pypaimon/cli.md
+++ b/docs/content/pypaimon/cli.md
@@ -621,3 +621,105 @@ default
mydb
analytics
```
+
+## SQL Command
+
+Execute SQL queries on Paimon tables directly from the command line. This feature is powered by pypaimon-rust and DataFusion.
+
+**Prerequisites:**
+
+```shell
+pip install pypaimon[sql]
+```
+
+### One-Shot Query
+
+Execute a single SQL query and display the result:
+
+```shell
+paimon sql "SELECT * FROM users LIMIT 10"
+```
+
+Output:
+```
+ id name age city
+ 1 Alice 25 Beijing
+ 2 Bob 30 Shanghai
+ 3 Charlie 35 Guangzhou
+```
+
+**Options:**
+
+- `--format, -f`: Output format: `table` (default) or `json`
+
+**Examples:**
+
+```shell
+# Direct table name (uses default catalog and database)
+paimon sql "SELECT * FROM users"
+
+# Two-part: database.table
+paimon sql "SELECT * FROM mydb.users"
+
+# Query with filter and aggregation
+paimon sql "SELECT city, COUNT(*) AS cnt FROM users GROUP BY city ORDER BY cnt DESC"
+
+# Output as JSON
+paimon sql "SELECT * FROM users LIMIT 5" --format json
+```
+
+### Interactive REPL
+
+Start an interactive SQL session by running `paimon sql` without a query argument. The REPL supports arrow keys for line editing, and command history is persisted across sessions in `~/.paimon_history`.
+
+```shell
+paimon sql
+```
+
+Output:
+```
+ ____ _
+ / __ \____ _(_)___ ___ ____ ____
+ / /_/ / __ `/ / __ `__ \/ __ \/ __ \
+ / ____/ /_/ / / / / / / / /_/ / / / /
+/_/ \__,_/_/_/ /_/ /_/\____/_/ /_/
+
+ Powered by pypaimon-rust + DataFusion
+ Type 'help' for usage, 'exit' to quit.
+
+paimon> SHOW DATABASES;
+default
+mydb
+
+paimon> USE mydb;
+Using database 'mydb'.
+
+paimon> SHOW TABLES;
+orders
+users
+
+paimon> SELECT count(*) AS cnt
+ > FROM users
+ > WHERE age > 18;
+ cnt
+ 42
+(1 row in 0.05s)
+
+paimon> exit
+Bye!
+```
+
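+SQL statements end with `;` and can span multiple lines. The continuation prompt ` >` indicates that more input is expected. The REPL commands `help`, `exit`, `quit` and `USE <database>` are accepted with or without a trailing `;`.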
+
+**REPL Commands:**
+
+| Command | Description |
+|---|---|
+| `USE <database>;` | Switch the default database |
+| `SHOW DATABASES;` | List all databases |
+| `SHOW TABLES;` | List tables in the current database |
+| `SELECT ...;` | Execute a SQL query |
+| `help` | Show usage information |
+| `exit` / `quit` | Exit the REPL |
+
+For more details on SQL syntax and the Python API, see [SQL Query]({{< ref "pypaimon/sql" >}}).
diff --git a/docs/content/pypaimon/sql.md b/docs/content/pypaimon/sql.md
new file mode 100644
index 0000000000..88d08a01d0
--- /dev/null
+++ b/docs/content/pypaimon/sql.md
@@ -0,0 +1,168 @@
+---
+title: "SQL Query"
+weight: 8
+type: docs
+aliases:
+ - /pypaimon/sql.html
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# SQL Query
+
+PyPaimon supports executing SQL queries on Paimon tables, powered by [pypaimon-rust](https://github.com/apache/paimon-rust/tree/main/bindings/python) and [DataFusion](https://datafusion.apache.org/python/).
+
+## Installation
+
+SQL query support requires additional dependencies. Install them with:
+
+```shell
+pip install pypaimon[sql]
+```
+
+This installs `pypaimon-rust` and `datafusion`. Both extras are only installed on Python 3.10 or later, so SQL query support requires Python 3.10+.
+
+## Usage
+
+Create a `SQLContext`, register one or more catalogs with their options, and then run SQL queries.
+
+### Basic Query
+
+```python
+from pypaimon.sql import SQLContext
+
+ctx = SQLContext()
+ctx.register_catalog("paimon", {"warehouse": "/path/to/warehouse"})
+ctx.set_current_catalog("paimon")
+ctx.set_current_database("default")
+
+# Execute SQL and get PyArrow Table
+table = ctx.sql("SELECT * FROM my_table")
+print(table)
+
+# Convert to Pandas DataFrame
+df = table.to_pandas()
+print(df)
+```
+
+### Table Reference Format
+
+The default catalog and default database are configured with `set_current_catalog()` and `set_current_database()`. Once they are set, tables can be referenced in two ways:
+
+```python
+# Direct table name (uses default database)
+ctx.sql("SELECT * FROM my_table")
+
+# Two-part: database.table
+ctx.sql("SELECT * FROM mydb.my_table")
+```
+
+### Filtering
+
+```python
+table = ctx.sql("""
+ SELECT id, name, age
+ FROM users
+ WHERE age > 18 AND city = 'Beijing'
+""")
+```
+
+### Aggregation
+
+```python
+table = ctx.sql("""
+ SELECT city, COUNT(*) AS cnt, AVG(age) AS avg_age
+ FROM users
+ GROUP BY city
+ ORDER BY cnt DESC
+""")
+```
+
+### Join
+
+```python
+table = ctx.sql("""
+ SELECT u.name, o.order_id, o.amount
+ FROM users u
+ JOIN orders o ON u.id = o.user_id
+ WHERE o.amount > 100
+""")
+```
+
+### Subquery
+
+```python
+table = ctx.sql("""
+ SELECT * FROM users
+ WHERE id IN (
+ SELECT user_id FROM orders
+ WHERE amount > 1000
+ )
+""")
+```
+
+### Cross-Database Query
+
+```python
+# Query a table in another database using two-part syntax
+table = ctx.sql("""
+ SELECT u.name, o.amount
+ FROM default.users u
+ JOIN analytics.orders o ON u.id = o.user_id
+""")
+```
+
+### Multi-Catalog Query
+
+`SQLContext` supports registering multiple catalogs for cross-catalog queries:
+
+```python
+from pypaimon.sql import SQLContext
+
+ctx = SQLContext()
+ctx.register_catalog("a", {"warehouse": "/path/to/warehouse_a"})
+ctx.register_catalog("b", {
+ "metastore": "rest",
+ "uri": "http://localhost:8080",
+ "warehouse": "warehouse_b",
+})
+ctx.set_current_catalog("a")
+ctx.set_current_database("default")
+
+# Cross-catalog join
+table = ctx.sql("""
+ SELECT a_users.name, b_orders.amount
+ FROM a.default.users AS a_users
+ JOIN b.default.orders AS b_orders ON a_users.id = b_orders.user_id
+""")
+```
+
+## Supported SQL Syntax
+
+The SQL engine is powered by Apache DataFusion, which supports a rich set of SQL syntax, including:
+
+- `SELECT`, `WHERE`, `GROUP BY`, `HAVING`, `ORDER BY`, `LIMIT`
+- `JOIN` (INNER, LEFT, RIGHT, FULL, CROSS)
+- Subqueries and CTEs (`WITH`)
+- Aggregate functions (`COUNT`, `SUM`, `AVG`, `MIN`, `MAX`, etc.)
+- Window functions (`ROW_NUMBER`, `RANK`, `LAG`, `LEAD`, etc.)
+- `UNION`, `INTERSECT`, `EXCEPT`
+
+For the full SQL reference, see the [DataFusion SQL documentation](https://datafusion.apache.org/user-guide/sql/index.html).
diff --git a/paimon-python/pypaimon/__init__.py
b/paimon-python/pypaimon/__init__.py
index 77965c3a14..e07179fb28 100644
--- a/paimon-python/pypaimon/__init__.py
+++ b/paimon-python/pypaimon/__init__.py
@@ -28,6 +28,7 @@ from pypaimon.filesystem.pvfs import PaimonVirtualFileSystem
from pypaimon.schema.schema import Schema
from pypaimon.tag.tag import Tag
from pypaimon.tag.tag_manager import TagManager
+from pypaimon.sql.sql_context import SQLContext
__all__ = [
"PaimonVirtualFileSystem",
@@ -35,4 +36,5 @@ __all__ = [
"Schema",
"Tag",
"TagManager",
+ "SQLContext",
]
diff --git a/paimon-python/pypaimon/cli/cli.py
b/paimon-python/pypaimon/cli/cli.py
index 2ffdbd206d..37a0d3cbb8 100644
--- a/paimon-python/pypaimon/cli/cli.py
+++ b/paimon-python/pypaimon/cli/cli.py
@@ -121,6 +121,10 @@ def main():
from pypaimon.cli.cli_catalog import add_catalog_subcommands
add_catalog_subcommands(catalog_parser)
+ # SQL command
+ from pypaimon.cli.cli_sql import add_sql_subcommand
+ add_sql_subcommand(subparsers)
+
args = parser.parse_args()
if args.command is None:
diff --git a/paimon-python/pypaimon/cli/cli_sql.py
b/paimon-python/pypaimon/cli/cli_sql.py
new file mode 100644
index 0000000000..170cda8710
--- /dev/null
+++ b/paimon-python/pypaimon/cli/cli_sql.py
@@ -0,0 +1,313 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+SQL commands for Paimon CLI.
+
+This module provides SQL query capability via pypaimon-rust + DataFusion.
+"""
+
+import os
+import re
+import sys
+import time
+
# Prompts used by the interactive REPL: the first one starts a new
# statement, the second one is shown while a statement is still open.
_PROMPT = "paimon> "
_CONTINUATION_PROMPT = " > "

# Location and maximum size of the readline history kept across sessions.
_HISTORY_FILE = os.path.expanduser("~/.paimon_history")
_HISTORY_MAX_LENGTH = 1000

# Recognises "USE <database>" (any case, optional trailing ';') and
# captures the database name in group 1.
_USE_PATTERN = re.compile(r"^\s*use\s+(\w+)\s*;?\s*$", flags=re.IGNORECASE)

# Banner printed once when the interactive REPL starts.
_PAIMON_BANNER = r"""
 ____ _
 / __ \____ _(_)___ ___ ____ ____
 / /_/ / __ `/ / __ `__ \/ __ \/ __ \
 / ____/ /_/ / / / / / / / /_/ / / / /
/_/ \__,_/_/_/ /_/ /_/\____/_/ /_/

 Powered by pypaimon-rust + DataFusion
 Type 'help' for usage, 'exit' to quit.
"""
+
+
def _get_readline():
    """Return the most capable importable readline module, or ``None``.

    ``gnureadline`` (real GNU readline) is tried first, because the standard
    ``readline`` module may be backed by libedit (as on macOS) and then has
    fewer features. The standard ``readline`` module is the fallback; when
    neither can be imported, ``None`` is returned.
    """
    for candidate in ("gnureadline", "readline"):
        try:
            return __import__(candidate)
        except ImportError:
            continue
    return None
+
+
def _is_libedit(rl):
    """Return True if the readline module *rl* is backed by libedit/editline.

    Python 3.13+ exposes ``readline.backend`` (``"readline"`` or
    ``"editline"``), which is authoritative when present. Older versions
    only reveal libedit through the module docstring, which mentions
    "libedit" when that backend is in use.

    Args:
        rl: A readline-compatible module (``readline`` or ``gnureadline``).

    Returns:
        bool: ``True`` for libedit/editline, ``False`` otherwise (always a
        real bool, never ``None`` or ``''``).
    """
    backend = getattr(rl, 'backend', None)
    if backend is not None:
        return backend == 'editline'
    doc = getattr(rl, '__doc__', None)
    return bool(doc) and 'libedit' in doc
+
+
def _setup_readline():
    """Enable readline line editing and load the persisted command history.

    Does nothing when no readline implementation is available. Loading the
    history is best effort: an unreadable or undecodable history file is
    ignored, so a bad ``~/.paimon_history`` never prevents the REPL from
    starting. This matches how saving the history ignores ``OSError``.
    """
    rl = _get_readline()
    if rl is None:
        return
    rl.set_history_length(_HISTORY_MAX_LENGTH)
    if not os.path.exists(_HISTORY_FILE):
        return
    try:
        if _is_libedit(rl):
            # libedit escapes spaces as \040 in history files, so we load manually.
            with open(_HISTORY_FILE, 'r', encoding='utf-8') as f:
                for line in f:
                    line = line.rstrip('\n')
                    if line:
                        rl.add_history(line)
        else:
            rl.read_history_file(_HISTORY_FILE)
    except (OSError, UnicodeDecodeError):
        # Keep whatever history was loaded so far and start the REPL anyway.
        pass
+
+
def _save_history():
    """Write the readline history to ``_HISTORY_FILE`` (best effort).

    GNU readline writes the file itself. Under libedit the newest
    ``_HISTORY_MAX_LENGTH`` entries are written by hand, one per line.
    Any ``OSError`` raised while saving is silently ignored.
    """
    rl = _get_readline()
    if rl is None:
        return
    try:
        if not _is_libedit(rl):
            rl.write_history_file(_HISTORY_FILE)
            return
        # Manual write avoids libedit's \040 escaping of spaces.
        total = rl.get_current_history_length()
        entries = [
            entry
            for entry in (rl.get_history_item(pos) for pos in range(1, total + 1))
            if entry is not None
        ]
        with open(_HISTORY_FILE, 'w', encoding='utf-8') as out:
            out.writelines(entry + '\n' for entry in entries[-_HISTORY_MAX_LENGTH:])
    except OSError:
        pass
+
+
def cmd_sql(args):
    """
    Execute the 'sql' command.

    Builds a SQLContext in which the CLI's catalog configuration is registered
    as catalog "paimon", with "default" as the current database. A non-empty
    ``args.query`` is executed once; otherwise an interactive REPL starts.
    Any failure while building the context is printed to stderr and the
    process exits with status 1.

    Args:
        args: Parsed command line arguments.
    """
    from pypaimon.cli.cli import load_catalog_config

    config = load_catalog_config(args.config)

    try:
        from pypaimon.sql.sql_context import SQLContext
        options = {str(key): str(value) for key, value in config.items()}
        ctx = SQLContext()
        ctx.register_catalog("paimon", options)
        ctx.set_current_catalog("paimon")
        ctx.set_current_database("default")
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)

    output_format = getattr(args, 'format', 'table')
    if not args.query:
        _interactive_repl(ctx, output_format)
    else:
        _execute_query(ctx, args.query, output_format)
+
+
def _execute_query(ctx, query, output_format):
    """Run *query* once through *ctx* and print the result.

    Query failures are reported on stderr and terminate the process with
    exit status 1.
    """
    try:
        result = ctx.sql(query)
    except Exception as err:
        print(f"Error: {err}", file=sys.stderr)
        sys.exit(1)
    else:
        _print_table(result, output_format)
+
+
def _print_table(table, output_format, elapsed=None):
    """Print a PyArrow Table in the requested format.

    Args:
        table: The ``pyarrow.Table`` to print.
        output_format: ``'json'`` prints a single JSON array of row objects;
            any other value prints a plain-text table without an index column.
        elapsed: Optional query duration in seconds. When given, a
            ``(N rows in X.XXs)`` footer is printed after the result.
    """
    if output_format == 'json':
        import json
        # Convert straight from Arrow rather than via pandas:
        # - SQL NULLs become None (JSON null). With pandas, NULLs in numeric
        #   columns become NaN, which json.dumps emits as invalid `NaN`.
        # - Values with no JSON type (dates, timestamps, decimals, bytes) are
        #   rendered with str() instead of raising TypeError.
        rows = table.to_pylist()
        print(json.dumps(rows, ensure_ascii=False, default=str))
    else:
        print(table.to_pandas().to_string(index=False))

    if elapsed is not None:
        row_count = table.num_rows
        noun = 'row' if row_count == 1 else 'rows'
        print(f"({row_count} {noun} in {elapsed:.2f}s)")
+
+
def _read_multiline_query():
    """Collect one REPL statement from the user, possibly over several lines.

    Input accumulates until it forms a complete statement. A statement is
    complete when it is:
    - one of the bare words ``exit``, ``quit`` or ``help`` (a trailing ';'
      is allowed);
    - a ``USE <database>`` command (';' optional); or
    - any text ending in ';'.
    Whitespace-only input is discarded, and the primary prompt is shown again.

    Returns:
        The stripped statement text. Returns ``""`` when Ctrl-C/Ctrl-D
        abandons a partially typed statement, and ``None`` when it is pressed
        at an empty prompt.
    """
    buffered = []
    while True:
        # The continuation prompt is used exactly while a statement is open.
        current_prompt = _CONTINUATION_PROMPT if buffered else _PROMPT
        try:
            line = input(current_prompt)
        except (EOFError, KeyboardInterrupt):
            if not buffered:
                return None
            # Abandon the partially typed statement.
            print()
            return ""

        buffered.append(line)
        statement = "\n".join(buffered).strip()
        if not statement:
            buffered.clear()
            continue

        bare_word = statement.lower().rstrip(';').strip()
        is_complete = (
            bare_word in ('exit', 'quit', 'help')
            or _USE_PATTERN.match(statement) is not None
            or statement.endswith(';')
        )
        if is_complete:
            return statement
+
+
def _handle_use(ctx, match):
    """Switch *ctx* to the database captured in group 1 of a USE match.

    Prints a confirmation on success. Any exception is reported on stderr
    instead of propagating, so the REPL keeps running.
    """
    target_db = match.group(1)
    try:
        ctx.set_current_database(target_db)
        confirmation = f"Using database '{target_db}'."
        print(confirmation)
    except Exception as exc:
        print(f"Error: {exc}", file=sys.stderr)
+
+
def _interactive_repl(ctx, output_format):
    """Run an interactive SQL REPL until the user exits.

    The REPL-only commands (exit/quit, help, USE <database>) are handled
    locally, and everything else goes to ``ctx.sql``. A failing or
    interrupted query is reported and the session continues. The readline
    history is saved when the loop ends, even if it is left through an
    unexpected exception.

    Args:
        ctx: The SQLContext used to execute queries.
        output_format: ``'table'`` or ``'json'``, forwarded to ``_print_table``.
    """
    _setup_readline()
    print(_PAIMON_BANNER)

    try:
        while True:
            query = _read_multiline_query()
            if query is None:
                print("\nBye!")
                break

            if not query:
                continue

            lower = query.lower().rstrip(';').strip()
            if lower in ('exit', 'quit'):
                print("Bye!")
                break
            if lower == 'help':
                _print_help()
                continue

            # Handle USE <database>
            use_match = _USE_PATTERN.match(query)
            if use_match:
                _handle_use(ctx, use_match)
                continue

            try:
                # perf_counter is monotonic: the reported duration cannot be
                # skewed (or go negative) by wall-clock adjustments.
                start = time.perf_counter()
                table = ctx.sql(query)
                elapsed = time.perf_counter() - start
                _print_table(table, output_format, elapsed)
                print()
            except KeyboardInterrupt:
                # Ctrl-C while a query runs abandons that query, not the session.
                print("\nQuery interrupted.\n", file=sys.stderr)
            except Exception as e:
                print(f"Error: {e}\n", file=sys.stderr)
    finally:
        _save_history()
+
+
def _print_help():
    """Print the REPL command summary, table-reference forms and usage tips."""
    help_lines = [
        "",
        "Commands:",
        " USE <database>; Switch the default database",
        " SHOW DATABASES; List all databases",
        " SHOW TABLES; List tables in the current database",
        " SELECT ... FROM <table>; Execute a SQL query",
        " exit / quit Exit the REPL",
        "",
        "Table reference:",
        " <table> Table in the current default database",
        " <database>.<table> Table in a specific database",
        "",
        "Tips:",
        " - SQL statements end with ';' and can span multiple lines",
        " - Arrow keys are supported for line editing and command history",
        " - Command history is saved across sessions (~/.paimon_history)",
        "",
    ]
    print("\n".join(help_lines))
+
+
def add_sql_subcommand(subparsers):
    """
    Register the ``sql`` subcommand on the CLI's subparsers.

    The subcommand takes an optional positional ``query`` (omit it to start
    the interactive REPL) and ``--format``/``-f`` (``table`` or ``json``).
    It dispatches to ``cmd_sql``.

    Args:
        subparsers: The subparsers object from the main argument parser.
    """
    parser = subparsers.add_parser(
        'sql', help='Execute SQL queries on Paimon tables (requires pypaimon-rust)')

    query_help = 'SQL query to execute. If omitted, starts interactive REPL.'
    parser.add_argument('query', nargs='?', default=None, help=query_help)

    format_help = 'Output format: table (default) or json'
    parser.add_argument('--format', '-f', type=str, choices=['table', 'json'],
                        default='table', help=format_help)

    parser.set_defaults(func=cmd_sql)
diff --git a/paimon-python/pypaimon/__init__.py
b/paimon-python/pypaimon/sql/__init__.py
similarity index 60%
copy from paimon-python/pypaimon/__init__.py
copy to paimon-python/pypaimon/sql/__init__.py
index 77965c3a14..e059c025a6 100644
--- a/paimon-python/pypaimon/__init__.py
+++ b/paimon-python/pypaimon/sql/__init__.py
@@ -15,24 +15,11 @@
# specific language governing permissions and limitations
# under the License.
-import sys
# Public API of pypaimon.sql; SQLContext itself is resolved lazily.
__all__ = ["SQLContext"]
-if sys.version_info[:2] == (3, 6):
- try:
- from pypaimon.manifest import fastavro_py36_compat # noqa: F401
- except ImportError:
- pass
-from pypaimon.catalog.catalog_factory import CatalogFactory
-from pypaimon.filesystem.pvfs import PaimonVirtualFileSystem
-from pypaimon.schema.schema import Schema
-from pypaimon.tag.tag import Tag
-from pypaimon.tag.tag_manager import TagManager
-
-__all__ = [
- "PaimonVirtualFileSystem",
- "CatalogFactory",
- "Schema",
- "Tag",
- "TagManager",
-]
def __getattr__(name):
    """Resolve ``SQLContext`` lazily on attribute access (PEP 562 module hook).

    ``pypaimon.sql.sql_context`` is only imported here when ``SQLContext`` is
    requested from this package. Module-level ``__getattr__`` requires
    Python 3.7+.

    Raises:
        AttributeError: For any other name. The message uses Python's
            standard format: ``module 'pkg' has no attribute 'name'``.
    """
    if name == "SQLContext":
        from pypaimon.sql.sql_context import SQLContext
        return SQLContext
    raise AttributeError("module {!r} has no attribute {!r}".format(__name__, name))
diff --git a/paimon-python/pypaimon/sql/sql_context.py
b/paimon-python/pypaimon/sql/sql_context.py
new file mode 100644
index 0000000000..67afa30f34
--- /dev/null
+++ b/paimon-python/pypaimon/sql/sql_context.py
@@ -0,0 +1,81 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pyarrow as pa
+
+
class SQLContext:
    """SQL query context for Paimon tables.

    Uses pypaimon-rust and DataFusion to execute SQL queries against Paimon tables.
    Supports registering multiple catalogs for cross-catalog queries.

    Example::

        from pypaimon.sql import SQLContext

        ctx = SQLContext()
        ctx.register_catalog("paimon", {"warehouse": "/path/to/warehouse"})
        ctx.set_current_catalog("paimon")
        ctx.set_current_database("default")
        result = ctx.sql("SELECT * FROM my_table")
    """

    def __init__(self):
        """Create a new DataFusion session.

        Raises:
            ImportError: If the optional ``datafusion`` package is not installed.
        """
        try:
            from datafusion import SessionContext
        except ImportError as e:
            raise ImportError(
                "datafusion is required for SQL query support. "
                "Install it with: pip install pypaimon[sql]"
            ) from e

        self._ctx = SessionContext()

    def register_catalog(self, name, options):
        """Register a Paimon catalog as a DataFusion catalog provider.

        Args:
            name: The catalog name to register under.
            options: A dict of catalog options (e.g. {"warehouse": "/path/to/warehouse"}).

        Raises:
            ImportError: If the optional ``pypaimon-rust`` package is not installed.
        """
        try:
            from pypaimon_rust.datafusion import PaimonCatalog
        except ImportError as e:
            raise ImportError(
                "pypaimon-rust is required for SQL query support. "
                "Install it with: pip install pypaimon[sql]"
            ) from e

        paimon_catalog = PaimonCatalog(options)
        self._ctx.register_catalog_provider(name, paimon_catalog)

    @staticmethod
    def _quote_literal(value):
        """Render *value* as a single-quoted SQL string literal.

        Embedded single quotes are doubled (standard SQL escaping). A name
        such as ``o'brien`` therefore cannot end the literal early and break
        or alter the generated ``SET`` statement.
        """
        return "'{}'".format(str(value).replace("'", "''"))

    def set_current_catalog(self, catalog_name: str):
        """Set the default catalog for SQL queries."""
        self._ctx.sql(
            "SET datafusion.catalog.default_catalog = "
            + self._quote_literal(catalog_name)
        )

    def set_current_database(self, database: str):
        """Set the default database for SQL queries."""
        self._ctx.sql(
            "SET datafusion.catalog.default_schema = "
            + self._quote_literal(database)
        )

    def sql(self, query: str) -> pa.Table:
        """Execute a SQL query and return the result as a PyArrow Table.

        An empty result still carries the query's output schema.
        """
        df = self._ctx.sql(query)
        batches = df.collect()
        if not batches:
            return pa.Table.from_batches([], schema=df.schema())
        return pa.Table.from_batches(batches)
diff --git a/paimon-python/pypaimon/tests/sql_context_test.py
b/paimon-python/pypaimon/tests/sql_context_test.py
new file mode 100644
index 0000000000..931052fae1
--- /dev/null
+++ b/paimon-python/pypaimon/tests/sql_context_test.py
@@ -0,0 +1,152 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import unittest
+
+import pyarrow as pa
+
+from pypaimon import CatalogFactory
+
+
# Warehouse directory used by these tests; override via PAIMON_TEST_WAREHOUSE.
WAREHOUSE = os.getenv("PAIMON_TEST_WAREHOUSE", "/tmp/paimon-warehouse")
+
+
class SQLContextTest(unittest.TestCase):
    """End-to-end tests for SQLContext against a filesystem warehouse.

    setUpClass (re)creates ``default.sql_test_table`` with the rows
    (1, 'alice'), (2, 'bob') and (3, 'carol'); tearDownClass drops it again.
    The whole class is skipped, rather than erroring, when the optional SQL
    dependencies (``datafusion`` and ``pypaimon_rust``) cannot be imported.
    """

    def _create_catalog(self):
        return CatalogFactory.create({"warehouse": WAREHOUSE})

    def _create_sql_context(self):
        """Return a SQLContext with the test warehouse as current catalog 'paimon'."""
        from pypaimon.sql.sql_context import SQLContext
        ctx = SQLContext()
        ctx.register_catalog("paimon", {"warehouse": WAREHOUSE})
        ctx.set_current_catalog("paimon")
        ctx.set_current_database("default")
        return ctx

    @classmethod
    def setUpClass(cls):
        """Create the test table once before all tests in this class."""
        # The [sql] extras are optional (setup.py installs them only on
        # Python >= 3.10), so skip instead of failing when they are absent.
        try:
            import datafusion  # noqa: F401
            import pypaimon_rust.datafusion  # noqa: F401
        except ImportError as e:
            raise unittest.SkipTest(
                "SQL dependencies are not installed: {}".format(e))

        from pypaimon import Schema
        from pypaimon.schema.data_types import DataField, AtomicType

        catalog = CatalogFactory.create({"warehouse": WAREHOUSE})
        try:
            catalog.create_database("default", ignore_if_exists=True)
        except Exception:
            pass

        identifier = "default.sql_test_table"

        # Drop existing table to ensure clean state
        catalog.drop_table(identifier, ignore_if_not_exists=True)

        schema = Schema(
            fields=[
                DataField(0, "id", AtomicType("INT")),
                DataField(1, "name", AtomicType("STRING")),
            ],
            primary_keys=[],
            partition_keys=[],
            options={},
            comment="",
        )
        catalog.create_table(identifier, schema, ignore_if_exists=False)

        table = catalog.get_table(identifier)
        write_builder = table.new_batch_write_builder()
        table_write = write_builder.new_write()
        table_commit = write_builder.new_commit()
        try:
            pa_table = pa.table({
                "id": pa.array([1, 2, 3], type=pa.int32()),
                "name": pa.array(["alice", "bob", "carol"], type=pa.string()),
            })
            table_write.write_arrow(pa_table)
            table_commit.commit(table_write.prepare_commit())
        finally:
            table_write.close()
            table_commit.close()

    @classmethod
    def tearDownClass(cls):
        """Clean up the test table after all tests."""
        catalog = CatalogFactory.create({"warehouse": WAREHOUSE})
        catalog.drop_table("default.sql_test_table", ignore_if_not_exists=True)

    def test_sql_returns_table(self):
        ctx = self._create_sql_context()
        table = ctx.sql("SELECT id, name FROM sql_test_table ORDER BY id")
        self.assertIsInstance(table, pa.Table)
        self.assertEqual(table.num_rows, 3)
        self.assertEqual(table.column("id").to_pylist(), [1, 2, 3])
        self.assertEqual(table.column("name").to_pylist(), ["alice", "bob", "carol"])

    def test_sql_to_pandas(self):
        ctx = self._create_sql_context()
        table = ctx.sql("SELECT id, name FROM sql_test_table ORDER BY id")
        df = table.to_pandas()
        self.assertEqual(len(df), 3)
        self.assertListEqual(list(df.columns), ["id", "name"])

    def test_sql_with_filter(self):
        ctx = self._create_sql_context()
        table = ctx.sql("SELECT id, name FROM sql_test_table WHERE id > 1 ORDER BY id")
        self.assertEqual(table.num_rows, 2)
        self.assertEqual(table.column("id").to_pylist(), [2, 3])

    def test_sql_with_empty_result(self):
        ctx = self._create_sql_context()
        table = ctx.sql("SELECT id, name FROM sql_test_table WHERE id > 4 ORDER BY id")
        self.assertIsInstance(table, pa.Table)
        self.assertEqual(table.num_rows, 0)
        self.assertEqual(table.schema.names, ["id", "name"])

    def test_sql_with_aggregation(self):
        ctx = self._create_sql_context()
        table = ctx.sql("SELECT count(*) AS cnt FROM sql_test_table")
        self.assertEqual(table.column("cnt").to_pylist(), [3])

    def test_sql_two_part_reference(self):
        ctx = self._create_sql_context()
        table = ctx.sql("SELECT count(*) AS cnt FROM default.sql_test_table")
        self.assertEqual(table.column("cnt").to_pylist(), [3])

    def test_import_error_without_pypaimon_rust(self):
        """register_catalog should raise ImportError when pypaimon-rust is missing."""
        import unittest.mock as mock
        import builtins
        original_import = builtins.__import__

        def mock_import(name, *args, **kwargs):
            if name == "pypaimon_rust.datafusion" or name == "pypaimon_rust":
                raise ImportError("No module named 'pypaimon_rust'")
            return original_import(name, *args, **kwargs)

        from pypaimon.sql.sql_context import SQLContext
        ctx = SQLContext()
        with mock.patch("builtins.__import__", side_effect=mock_import):
            with self.assertRaises(ImportError) as cm:
                ctx.register_catalog("paimon", {"warehouse": WAREHOUSE})
        self.assertIn("pypaimon-rust", str(cm.exception))
+
# Allow running this test module directly: python sql_context_test.py
if __name__ == '__main__':
    unittest.main()
diff --git a/paimon-python/setup.py b/paimon-python/setup.py
index 876d7f7ce9..3b0b6833f4 100644
--- a/paimon-python/setup.py
+++ b/paimon-python/setup.py
@@ -70,6 +70,10 @@ setup(
'pylance>=0.20,<1; python_version>="3.9"',
'pylance>=0.10,<1; python_version>="3.8" and python_version<"3.9"'
],
+ 'sql': [
+ 'pypaimon-rust; python_version>="3.10"',
+ 'datafusion>=52; python_version>="3.10"',
+ ],
},
description="Apache Paimon Python API",
long_description=long_description,