This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 9227cf7140 [python] Introduce DataFusion SQL to PyPaimon (#7599)
9227cf7140 is described below

commit 9227cf71404ca22e2b336876f98ce74022aefd0e
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Apr 6 20:10:49 2026 +0800

    [python] Introduce DataFusion SQL to PyPaimon (#7599)
    
    PR has introduced PyPaimon with SQL query capabilities based on
    PyPaimon-rust + DataFusion.
---
 .github/workflows/paimon-python-checks.yml       |  14 +-
 docs/content/pypaimon/cli.md                     | 102 ++++++++
 docs/content/pypaimon/sql.md                     | 168 ++++++++++++
 paimon-python/pypaimon/__init__.py               |   2 +
 paimon-python/pypaimon/cli/cli.py                |   4 +
 paimon-python/pypaimon/cli/cli_sql.py            | 313 +++++++++++++++++++++++
 paimon-python/pypaimon/{ => sql}/__init__.py     |  25 +-
 paimon-python/pypaimon/sql/sql_context.py        |  81 ++++++
 paimon-python/pypaimon/tests/sql_context_test.py | 152 +++++++++++
 paimon-python/setup.py                           |   4 +
 10 files changed, 845 insertions(+), 20 deletions(-)

diff --git a/.github/workflows/paimon-python-checks.yml 
b/.github/workflows/paimon-python-checks.yml
index 0154c637f6..fde69953ee 100755
--- a/.github/workflows/paimon-python-checks.yml
+++ b/.github/workflows/paimon-python-checks.yml
@@ -71,6 +71,8 @@ jobs:
             build-essential \
             git \
             curl \
+            pkg-config \
+            libssl-dev \
             && apt-get clean \
             && rm -rf /var/lib/apt/lists/*
 
@@ -139,12 +141,22 @@ jobs:
         if: matrix.python-version != '3.6.15'
         shell: bash
         run: |
-          pip install maturin
+          pip install maturin[patchelf]
           git clone -b support_directory 
https://github.com/JingsongLi/tantivy-py.git /tmp/tantivy-py
           cd /tmp/tantivy-py
           maturin build --release
           pip install target/wheels/tantivy-*.whl
 
+      - name: Build and install pypaimon-rust from source
+        if: matrix.python-version != '3.6.15'
+        shell: bash
+        run: |
+          git clone https://github.com/apache/paimon-rust.git /tmp/paimon-rust
+          cd /tmp/paimon-rust/bindings/python
+          maturin build --release -o dist
+          pip install dist/pypaimon_rust-*.whl
+          pip install 'datafusion>=52'
+
       - name: Run lint-python.sh
         shell: bash
         run: |
diff --git a/docs/content/pypaimon/cli.md b/docs/content/pypaimon/cli.md
index a328fc4744..6485ca5876 100644
--- a/docs/content/pypaimon/cli.md
+++ b/docs/content/pypaimon/cli.md
@@ -621,3 +621,105 @@ default
 mydb
 analytics
 ```
+
+## SQL Command
+
+Execute SQL queries on Paimon tables directly from the command line. This 
feature is powered by pypaimon-rust and DataFusion.
+
+**Prerequisites:**
+
+```shell
+pip install 'pypaimon[sql]'
+```
+
+### One-Shot Query
+
+Execute a single SQL query and display the result:
+
+```shell
+paimon sql "SELECT * FROM users LIMIT 10"
+```
+
+Output:
+```
+ id    name  age      city
+  1   Alice   25   Beijing
+  2     Bob   30  Shanghai
+  3 Charlie   35 Guangzhou
+```
+
+**Options:**
+
+- `--format, -f`: Output format: `table` (default) or `json`
+
+**Examples:**
+
+```shell
+# Direct table name (uses default catalog and database)
+paimon sql "SELECT * FROM users"
+
+# Two-part: database.table
+paimon sql "SELECT * FROM mydb.users"
+
+# Query with filter and aggregation
+paimon sql "SELECT city, COUNT(*) AS cnt FROM users GROUP BY city ORDER BY cnt DESC"
+
+# Output as JSON
+paimon sql "SELECT * FROM users LIMIT 5" --format json
+```
+
+### Interactive REPL
+
+Start an interactive SQL session by running `paimon sql` without a query 
argument. The REPL supports arrow keys for line editing, and command history is 
persisted across sessions in `~/.paimon_history`.
+
+```shell
+paimon sql
+```
+
+Output:
+```
+    ____        _
+   / __ \____ _(_)___ ___  ____  ____
+  / /_/ / __ `/ / __ `__ \/ __ \/ __ \
+ / ____/ /_/ / / / / / / / /_/ / / / /
+/_/    \__,_/_/_/ /_/ /_/\____/_/ /_/
+
+  Powered by pypaimon-rust + DataFusion
+  Type 'help' for usage, 'exit' to quit.
+
+paimon> SHOW DATABASES;
+default
+mydb
+
+paimon> USE mydb;
+Using database 'mydb'.
+
+paimon> SHOW TABLES;
+orders
+users
+
+paimon> SELECT count(*) AS cnt
+      > FROM users
+      > WHERE age > 18;
+ cnt
+  42
+(1 row in 0.05s)
+
+paimon> exit
+Bye!
+```
+
+SQL statements end with `;` and can span multiple lines. The continuation prompt `>` (aligned under `paimon>`) indicates that more input is expected.
+
+**REPL Commands:**
+
+| Command | Description |
+|---|---|
+| `USE <database>;` | Switch the default database |
+| `SHOW DATABASES;` | List all databases |
+| `SHOW TABLES;` | List tables in the current database |
+| `SELECT ...;` | Execute a SQL query |
+| `help` | Show usage information |
+| `exit` / `quit` | Exit the REPL |
+
+For more details on SQL syntax and the Python API, see [SQL Query]({{< ref "pypaimon/sql" >}}).
diff --git a/docs/content/pypaimon/sql.md b/docs/content/pypaimon/sql.md
new file mode 100644
index 0000000000..88d08a01d0
--- /dev/null
+++ b/docs/content/pypaimon/sql.md
@@ -0,0 +1,168 @@
+---
+title: "SQL Query"
+weight: 8
+type: docs
+aliases:
+  - /pypaimon/sql.html
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# SQL Query
+
+PyPaimon supports executing SQL queries on Paimon tables, powered by [pypaimon-rust](https://github.com/apache/paimon-rust/tree/main/bindings/python) and [DataFusion](https://datafusion.apache.org/python/).
+
+## Installation
+
+SQL query support requires additional dependencies. Install them with:
+
+```shell
+pip install 'pypaimon[sql]'
+```
+
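+This will install `pypaimon-rust` and `datafusion`. Both dependencies are only installed on Python 3.10 and later; on older Python versions the `sql` extra installs nothing and SQL queries are unavailable.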
+
+## Usage
+
+Create a `SQLContext`, register one or more catalogs with their options, and 
run SQL queries.
+
+### Basic Query
+
+```python
+from pypaimon.sql import SQLContext
+
+ctx = SQLContext()
+ctx.register_catalog("paimon", {"warehouse": "/path/to/warehouse"})
+ctx.set_current_catalog("paimon")
+ctx.set_current_database("default")
+
+# Execute SQL and get PyArrow Table
+table = ctx.sql("SELECT * FROM my_table")
+print(table)
+
+# Convert to Pandas DataFrame
+df = table.to_pandas()
+print(df)
+```
+
+### Table Reference Format
+
+The default catalog and default database can be configured via 
`set_current_catalog()` and `set_current_database()`, so you can reference 
tables in two ways:
+
+```python
+# Direct table name (uses default database)
+ctx.sql("SELECT * FROM my_table")
+
+# Two-part: database.table
+ctx.sql("SELECT * FROM mydb.my_table")
+```
+
+### Filtering
+
+```python
+table = ctx.sql("""
+    SELECT id, name, age 
+    FROM users 
+    WHERE age > 18 AND city = 'Beijing'
+""")
+```
+
+### Aggregation
+
+```python
+table = ctx.sql("""
+    SELECT city, COUNT(*) AS cnt, AVG(age) AS avg_age
+    FROM users
+    GROUP BY city
+    ORDER BY cnt DESC
+""")
+```
+
+### Join
+
+```python
+table = ctx.sql("""
+    SELECT u.name, o.order_id, o.amount
+    FROM users u
+    JOIN orders o ON u.id = o.user_id
+    WHERE o.amount > 100
+""")
+```
+
+### Subquery
+
+```python
+table = ctx.sql("""
+    SELECT * FROM users
+    WHERE id IN (
+        SELECT user_id FROM orders
+        WHERE amount > 1000
+    )
+""")
+```
+
+### Cross-Database Query
+
+```python
+# Query a table in another database using two-part syntax
+table = ctx.sql("""
+    SELECT u.name, o.amount
+    FROM default.users u
+    JOIN analytics.orders o ON u.id = o.user_id
+""")
+```
+
+### Multi-Catalog Query
+
+`SQLContext` supports registering multiple catalogs for cross-catalog queries:
+
+```python
+from pypaimon.sql import SQLContext
+
+ctx = SQLContext()
+ctx.register_catalog("a", {"warehouse": "/path/to/warehouse_a"})
+ctx.register_catalog("b", {
+    "metastore": "rest",
+    "uri": "http://localhost:8080",
+    "warehouse": "warehouse_b",
+})
+ctx.set_current_catalog("a")
+ctx.set_current_database("default")
+
+# Cross-catalog join
+table = ctx.sql("""
+    SELECT a_users.name, b_orders.amount
+    FROM a.default.users AS a_users
+    JOIN b.default.orders AS b_orders ON a_users.id = b_orders.user_id
+""")
+```
+
+## Supported SQL Syntax
+
+The SQL engine is powered by Apache DataFusion, which supports a rich set of 
SQL syntax including:
+
+- `SELECT`, `WHERE`, `GROUP BY`, `HAVING`, `ORDER BY`, `LIMIT`
+- `JOIN` (INNER, LEFT, RIGHT, FULL, CROSS)
+- Subqueries and CTEs (`WITH`)
+- Aggregate functions (`COUNT`, `SUM`, `AVG`, `MIN`, `MAX`, etc.)
+- Window functions (`ROW_NUMBER`, `RANK`, `LAG`, `LEAD`, etc.)
+- `UNION`, `INTERSECT`, `EXCEPT`
+
+For the full SQL reference, see the [DataFusion SQL 
documentation](https://datafusion.apache.org/user-guide/sql/index.html).
diff --git a/paimon-python/pypaimon/__init__.py 
b/paimon-python/pypaimon/__init__.py
index 77965c3a14..e07179fb28 100644
--- a/paimon-python/pypaimon/__init__.py
+++ b/paimon-python/pypaimon/__init__.py
@@ -28,6 +28,7 @@ from pypaimon.filesystem.pvfs import PaimonVirtualFileSystem
 from pypaimon.schema.schema import Schema
 from pypaimon.tag.tag import Tag
 from pypaimon.tag.tag_manager import TagManager
+from pypaimon.sql.sql_context import SQLContext
 
 __all__ = [
     "PaimonVirtualFileSystem",
@@ -35,4 +36,5 @@ __all__ = [
     "Schema",
     "Tag",
     "TagManager",
+    "SQLContext",
 ]
diff --git a/paimon-python/pypaimon/cli/cli.py 
b/paimon-python/pypaimon/cli/cli.py
index 2ffdbd206d..37a0d3cbb8 100644
--- a/paimon-python/pypaimon/cli/cli.py
+++ b/paimon-python/pypaimon/cli/cli.py
@@ -121,6 +121,10 @@ def main():
     from pypaimon.cli.cli_catalog import add_catalog_subcommands
     add_catalog_subcommands(catalog_parser)
 
+    # SQL command
+    from pypaimon.cli.cli_sql import add_sql_subcommand
+    add_sql_subcommand(subparsers)
+
     args = parser.parse_args()
     
     if args.command is None:
diff --git a/paimon-python/pypaimon/cli/cli_sql.py 
b/paimon-python/pypaimon/cli/cli_sql.py
new file mode 100644
index 0000000000..170cda8710
--- /dev/null
+++ b/paimon-python/pypaimon/cli/cli_sql.py
@@ -0,0 +1,313 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+
+"""
+SQL commands for Paimon CLI.
+
+This module provides SQL query capability via pypaimon-rust + DataFusion.
+"""
+
+import os
+import re
+import sys
+import time
+
+_PAIMON_BANNER = r"""
+    ____        _
+   / __ \____ _(_)___ ___  ____  ____
+  / /_/ / __ `/ / __ `__ \/ __ \/ __ \
+ / ____/ /_/ / / / / / / / /_/ / / / /
+/_/    \__,_/_/_/ /_/ /_/\____/_/ /_/
+
+  Powered by pypaimon-rust + DataFusion
+  Type 'help' for usage, 'exit' to quit.
+"""
+
+_USE_PATTERN = re.compile(
+    r"^\s*use\s+(\w+)\s*;?\s*$",
+    re.IGNORECASE,
+)
+
+_HISTORY_FILE = os.path.expanduser("~/.paimon_history")
+_HISTORY_MAX_LENGTH = 1000
+
+_PROMPT = "paimon> "
+_CONTINUATION_PROMPT = "      > "
+
+
+def _get_readline():
+    """Get the best available readline module.
+
+    Prefers gnureadline (full GNU readline) over the built-in readline
+    (which is libedit on macOS and may have limited features).
+    """
+    try:
+        import gnureadline as readline
+        return readline
+    except ImportError:
+        pass
+    try:
+        import readline
+        return readline
+    except ImportError:
+        return None
+
+
+def _is_libedit(rl):
+    """Check if the readline module is backed by libedit (macOS default)."""
+    return hasattr(rl, '__doc__') and rl.__doc__ and 'libedit' in rl.__doc__
+
+
+def _setup_readline():
+    """Enable readline for arrow key support and persistent command history."""
+    rl = _get_readline()
+    if rl is None:
+        return
+    rl.set_history_length(_HISTORY_MAX_LENGTH)
+    if not os.path.exists(_HISTORY_FILE):
+        return
+    if _is_libedit(rl):
+        # libedit escapes spaces as \040 in history files, so we load manually.
+        with open(_HISTORY_FILE, 'r', encoding='utf-8') as f:
+            for line in f:
+                line = line.rstrip('\n')
+                if line:
+                    rl.add_history(line)
+    else:
+        rl.read_history_file(_HISTORY_FILE)
+
+
+def _save_history():
+    """Save readline history to file."""
+    rl = _get_readline()
+    if rl is None:
+        return
+    try:
+        if _is_libedit(rl):
+            # Write history manually to avoid libedit's \040 escaping.
+            length = rl.get_current_history_length()
+            lines = []
+            for i in range(1, length + 1):
+                item = rl.get_history_item(i)
+                if item is not None:
+                    lines.append(item)
+            # Keep only the last N entries
+            lines = lines[-_HISTORY_MAX_LENGTH:]
+            with open(_HISTORY_FILE, 'w', encoding='utf-8') as f:
+                for line in lines:
+                    f.write(line + '\n')
+        else:
+            rl.write_history_file(_HISTORY_FILE)
+    except OSError:
+        pass
+
+
+def cmd_sql(args):
+    """
+    Execute the 'sql' command.
+
+    Runs a SQL query against Paimon tables, or starts an interactive SQL REPL.
+
+    Args:
+        args: Parsed command line arguments.
+    """
+    from pypaimon.cli.cli import load_catalog_config
+
+    config_path = args.config
+    config = load_catalog_config(config_path)
+
+    try:
+        from pypaimon.sql.sql_context import SQLContext
+        catalog_options = {str(k): str(v) for k, v in config.items()}
+        ctx = SQLContext()
+        ctx.register_catalog("paimon", catalog_options)
+        ctx.set_current_catalog("paimon")
+        ctx.set_current_database("default")
+    except Exception as e:
+        print(f"Error: {e}", file=sys.stderr)
+        sys.exit(1)
+
+    query = args.query
+    if query:
+        _execute_query(ctx, query, getattr(args, 'format', 'table'))
+    else:
+        _interactive_repl(ctx, getattr(args, 'format', 'table'))
+
+
+def _execute_query(ctx, query, output_format):
+    """Execute a single SQL query and print the result."""
+    try:
+        table = ctx.sql(query)
+    except Exception as e:
+        print(f"Error: {e}", file=sys.stderr)
+        sys.exit(1)
+
+    _print_table(table, output_format)
+
+
+def _print_table(table, output_format, elapsed=None):
+    """Print a PyArrow Table in the requested format."""
+    df = table.to_pandas()
+    if output_format == 'json':
+        import json
+        print(json.dumps(df.to_dict(orient='records'), ensure_ascii=False))
+    else:
+        print(df.to_string(index=False))
+
+    if elapsed is not None:
+        row_count = len(df)
+        print(f"({row_count} {'row' if row_count == 1 else 'rows'} in 
{elapsed:.2f}s)")
+
+
+def _read_multiline_query():
+    """Read a potentially multi-line SQL query, terminated by ';'.
+
+    Returns the complete query string, or None on EOF/interrupt.
+    """
+    lines = []
+    prompt = _PROMPT
+    while True:
+        try:
+            line = input(prompt)
+        except (EOFError, KeyboardInterrupt):
+            if lines:
+                # Cancel current multi-line input
+                print()
+                return ""
+            return None
+
+        lines.append(line)
+        joined = "\n".join(lines).strip()
+
+        if not joined:
+            lines.clear()
+            prompt = _PROMPT
+            continue
+
+        # Single-word commands that don't need ';'
+        lower = joined.lower().rstrip(';').strip()
+        if lower in ('exit', 'quit', 'help'):
+            return joined
+
+        # USE command doesn't strictly need ';'
+        if _USE_PATTERN.match(joined):
+            return joined
+
+        # For SQL statements, wait for ';'
+        if joined.endswith(';'):
+            return joined
+
+        prompt = _CONTINUATION_PROMPT
+
+
+def _handle_use(ctx, match):
+    """Handle USE <database> command."""
+    database = match.group(1)
+    try:
+        ctx.set_current_database(database)
+        print(f"Using database '{database}'.")
+    except Exception as e:
+        print(f"Error: {e}", file=sys.stderr)
+
+
+def _interactive_repl(ctx, output_format):
+    """Run an interactive SQL REPL."""
+    _setup_readline()
+    print(_PAIMON_BANNER)
+
+    try:
+        while True:
+            query = _read_multiline_query()
+            if query is None:
+                print("\nBye!")
+                break
+
+            if not query:
+                continue
+
+            lower = query.lower().rstrip(';').strip()
+            if lower in ('exit', 'quit'):
+                print("Bye!")
+                break
+            if lower == 'help':
+                _print_help()
+                continue
+
+            # Handle USE <database>
+            use_match = _USE_PATTERN.match(query)
+            if use_match:
+                _handle_use(ctx, use_match)
+                continue
+
+            try:
+                start = time.time()
+                table = ctx.sql(query)
+                elapsed = time.time() - start
+                _print_table(table, output_format, elapsed)
+                print()
+            except Exception as e:
+                print(f"Error: {e}\n", file=sys.stderr)
+    finally:
+        _save_history()
+
+
+def _print_help():
+    """Print REPL help information."""
+    print("""
+Commands:
+  USE <database>;              Switch the default database
+  SHOW DATABASES;              List all databases
+  SHOW TABLES;                 List tables in the current database
+  SELECT ... FROM <table>;     Execute a SQL query
+  exit / quit                  Exit the REPL
+
+Table reference:
+  <table>                      Table in the current default database
+  <database>.<table>           Table in a specific database
+
+Tips:
+  - SQL statements end with ';' and can span multiple lines
+  - Arrow keys are supported for line editing and command history
+  - Command history is saved across sessions (~/.paimon_history)
+""")
+
+
+def add_sql_subcommand(subparsers):
+    """
+    Add the sql subcommand to the main parser.
+
+    Args:
+        subparsers: The subparsers object from the main argument parser.
+    """
+    sql_parser = subparsers.add_parser(
+        'sql',
+        help='Execute SQL queries on Paimon tables (requires pypaimon-rust)'
+    )
+    sql_parser.add_argument(
+        'query',
+        nargs='?',
+        default=None,
+        help='SQL query to execute. If omitted, starts interactive REPL.'
+    )
+    sql_parser.add_argument(
+        '--format', '-f',
+        type=str,
+        choices=['table', 'json'],
+        default='table',
+        help='Output format: table (default) or json'
+    )
+    sql_parser.set_defaults(func=cmd_sql)
diff --git a/paimon-python/pypaimon/__init__.py 
b/paimon-python/pypaimon/sql/__init__.py
similarity index 60%
copy from paimon-python/pypaimon/__init__.py
copy to paimon-python/pypaimon/sql/__init__.py
index 77965c3a14..e059c025a6 100644
--- a/paimon-python/pypaimon/__init__.py
+++ b/paimon-python/pypaimon/sql/__init__.py
@@ -15,24 +15,11 @@
 #  specific language governing permissions and limitations
 #  under the License.
 
-import sys
+__all__ = ['SQLContext']
 
-if sys.version_info[:2] == (3, 6):
-    try:
-        from pypaimon.manifest import fastavro_py36_compat  # noqa: F401
-    except ImportError:
-        pass
 
-from pypaimon.catalog.catalog_factory import CatalogFactory
-from pypaimon.filesystem.pvfs import PaimonVirtualFileSystem
-from pypaimon.schema.schema import Schema
-from pypaimon.tag.tag import Tag
-from pypaimon.tag.tag_manager import TagManager
-
-__all__ = [
-    "PaimonVirtualFileSystem",
-    "CatalogFactory",
-    "Schema",
-    "Tag",
-    "TagManager",
-]
+def __getattr__(name):
+    if name == "SQLContext":
+        from pypaimon.sql.sql_context import SQLContext
+        return SQLContext
+    raise AttributeError("module 'pypaimon.sql' has no attribute 
{}".format(name))
diff --git a/paimon-python/pypaimon/sql/sql_context.py 
b/paimon-python/pypaimon/sql/sql_context.py
new file mode 100644
index 0000000000..67afa30f34
--- /dev/null
+++ b/paimon-python/pypaimon/sql/sql_context.py
@@ -0,0 +1,81 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+
+import pyarrow as pa
+
+
+class SQLContext:
+    """SQL query context for Paimon tables.
+
+    Uses pypaimon-rust and DataFusion to execute SQL queries against Paimon 
tables.
+    Supports registering multiple catalogs for cross-catalog queries.
+
+    Example::
+
+        from pypaimon.sql import SQLContext
+
+        ctx = SQLContext()
+        ctx.register_catalog("paimon", {"warehouse": "/path/to/warehouse"})
+        ctx.set_current_catalog("paimon")
+        ctx.set_current_database("default")
+        result = ctx.sql("SELECT * FROM my_table")
+    """
+
+    def __init__(self):
+        try:
+            from datafusion import SessionContext
+        except ImportError:
+            raise ImportError(
+                "datafusion is required for SQL query support. "
+                "Install it with: pip install pypaimon[sql]"
+            )
+
+        self._ctx = SessionContext()
+
+    def register_catalog(self, name, options):
+        """Register a Paimon catalog as a DataFusion catalog provider.
+
+        Args:
+            name: The catalog name to register under.
+            options: A dict of catalog options (e.g. {"warehouse": 
"/path/to/warehouse"}).
+        """
+        try:
+            from pypaimon_rust.datafusion import PaimonCatalog
+        except ImportError:
+            raise ImportError(
+                "pypaimon-rust is required for SQL query support. "
+                "Install it with: pip install pypaimon[sql]"
+            )
+
+        paimon_catalog = PaimonCatalog(options)
+        self._ctx.register_catalog_provider(name, paimon_catalog)
+
+    def set_current_catalog(self, catalog_name: str):
+        """Set the default catalog for SQL queries."""
+        self._ctx.sql(f"SET datafusion.catalog.default_catalog = 
'{catalog_name}'")
+
+    def set_current_database(self, database: str):
+        """Set the default database for SQL queries."""
+        self._ctx.sql(f"SET datafusion.catalog.default_schema = '{database}'")
+
+    def sql(self, query: str) -> pa.Table:
+        """Execute a SQL query and return the result as a PyArrow Table."""
+        df = self._ctx.sql(query)
+        batches = df.collect()
+        if not batches:
+            return pa.Table.from_batches([], schema=df.schema())
+        return pa.Table.from_batches(batches)
diff --git a/paimon-python/pypaimon/tests/sql_context_test.py 
b/paimon-python/pypaimon/tests/sql_context_test.py
new file mode 100644
index 0000000000..931052fae1
--- /dev/null
+++ b/paimon-python/pypaimon/tests/sql_context_test.py
@@ -0,0 +1,152 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+
+import os
+import unittest
+
+import pyarrow as pa
+
+from pypaimon import CatalogFactory
+
+
+WAREHOUSE = os.environ.get("PAIMON_TEST_WAREHOUSE", "/tmp/paimon-warehouse")
+
+
+class SQLContextTest(unittest.TestCase):
+
+    _table_created = False
+
+    def _create_catalog(self):
+        return CatalogFactory.create({"warehouse": WAREHOUSE})
+
+    def _create_sql_context(self):
+        from pypaimon.sql.sql_context import SQLContext
+        ctx = SQLContext()
+        ctx.register_catalog("paimon", {"warehouse": WAREHOUSE})
+        ctx.set_current_catalog("paimon")
+        ctx.set_current_database("default")
+        return ctx
+
+    @classmethod
+    def setUpClass(cls):
+        """Create the test table once before all tests in this class."""
+        from pypaimon import Schema, CatalogFactory
+        from pypaimon.schema.data_types import DataField, AtomicType
+
+        catalog = CatalogFactory.create({"warehouse": WAREHOUSE})
+        try:
+            catalog.create_database("default", ignore_if_exists=True)
+        except Exception:
+            pass
+
+        identifier = "default.sql_test_table"
+
+        # Drop existing table to ensure clean state
+        catalog.drop_table(identifier, ignore_if_not_exists=True)
+
+        schema = Schema(
+            fields=[
+                DataField(0, "id", AtomicType("INT")),
+                DataField(1, "name", AtomicType("STRING")),
+            ],
+            primary_keys=[],
+            partition_keys=[],
+            options={},
+            comment="",
+        )
+        catalog.create_table(identifier, schema, ignore_if_exists=False)
+
+        table = catalog.get_table(identifier)
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        try:
+            pa_table = pa.table({
+                "id": pa.array([1, 2, 3], type=pa.int32()),
+                "name": pa.array(["alice", "bob", "carol"], type=pa.string()),
+            })
+            table_write.write_arrow(pa_table)
+            table_commit.commit(table_write.prepare_commit())
+        finally:
+            table_write.close()
+            table_commit.close()
+
+    @classmethod
+    def tearDownClass(cls):
+        """Clean up the test table after all tests."""
+        catalog = CatalogFactory.create({"warehouse": WAREHOUSE})
+        catalog.drop_table("default.sql_test_table", ignore_if_not_exists=True)
+
+    def test_sql_returns_table(self):
+        ctx = self._create_sql_context()
+        table = ctx.sql("SELECT id, name FROM sql_test_table ORDER BY id")
+        self.assertIsInstance(table, pa.Table)
+        self.assertEqual(table.num_rows, 3)
+        self.assertEqual(table.column("id").to_pylist(), [1, 2, 3])
+        self.assertEqual(table.column("name").to_pylist(), ["alice", "bob", 
"carol"])
+
+    def test_sql_to_pandas(self):
+        ctx = self._create_sql_context()
+        table = ctx.sql("SELECT id, name FROM sql_test_table ORDER BY id")
+        df = table.to_pandas()
+        self.assertEqual(len(df), 3)
+        self.assertListEqual(list(df.columns), ["id", "name"])
+
+    def test_sql_with_filter(self):
+        ctx = self._create_sql_context()
+        table = ctx.sql("SELECT id, name FROM sql_test_table WHERE id > 1 
ORDER BY id")
+        self.assertEqual(table.num_rows, 2)
+        self.assertEqual(table.column("id").to_pylist(), [2, 3])
+
+    def test_sql_with_empty_result(self):
+        ctx = self._create_sql_context()
+        table = ctx.sql("SELECT id, name FROM sql_test_table WHERE id > 4 
ORDER BY id")
+        self.assertIsInstance(table, pa.Table)
+        self.assertEqual(table.num_rows, 0)
+        self.assertEqual(table.schema.names, ["id", "name"])
+
+    def test_sql_with_aggregation(self):
+        ctx = self._create_sql_context()
+        table = ctx.sql("SELECT count(*) AS cnt FROM sql_test_table")
+        self.assertEqual(table.column("cnt").to_pylist(), [3])
+
+    def test_sql_two_part_reference(self):
+        ctx = self._create_sql_context()
+        table = ctx.sql("SELECT count(*) AS cnt FROM default.sql_test_table")
+        self.assertEqual(table.column("cnt").to_pylist(), [3])
+
+    def test_import_error_without_pypaimon_rust(self):
+        """register_catalog should raise ImportError when pypaimon-rust is 
missing."""
+        import unittest.mock as mock
+        import builtins
+        original_import = builtins.__import__
+
+        def mock_import(name, *args, **kwargs):
+            if name == "pypaimon_rust.datafusion" or name == "pypaimon_rust":
+                raise ImportError("No module named 'pypaimon_rust'")
+            return original_import(name, *args, **kwargs)
+
+        from pypaimon.sql.sql_context import SQLContext
+        ctx = SQLContext()
+        with mock.patch("builtins.__import__", side_effect=mock_import):
+            with self.assertRaises(ImportError) as cm:
+                ctx.register_catalog("paimon", {"warehouse": WAREHOUSE})
+            self.assertIn("pypaimon-rust", str(cm.exception))
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/paimon-python/setup.py b/paimon-python/setup.py
index 876d7f7ce9..3b0b6833f4 100644
--- a/paimon-python/setup.py
+++ b/paimon-python/setup.py
@@ -70,6 +70,10 @@ setup(
             'pylance>=0.20,<1; python_version>="3.9"',
             'pylance>=0.10,<1; python_version>="3.8" and python_version<"3.9"'
         ],
+        'sql': [
+            'pypaimon-rust; python_version>="3.10"',
+            'datafusion>=52; python_version>="3.10"',
+        ],
     },
     description="Apache Paimon Python API",
     long_description=long_description,

Reply via email to