This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new f996751  feat(datafusion): Add $options system table (#240)
f996751 is described below

commit f99675109055793e88c0156261a5f09d1466839b
Author: Jiajia Li <[email protected]>
AuthorDate: Wed Apr 15 09:44:16 2026 +0800

    feat(datafusion): Add $options system table (#240)
---
 crates/integrations/datafusion/src/catalog.rs      |  23 ++-
 crates/integrations/datafusion/src/lib.rs          |   1 +
 .../datafusion/src/system_tables/mod.rs            | 161 +++++++++++++++++++++
 .../datafusion/src/system_tables/options.rs        |  92 ++++++++++++
 .../integrations/datafusion/tests/system_tables.rs | 145 +++++++++++++++++++
 5 files changed, 420 insertions(+), 2 deletions(-)

diff --git a/crates/integrations/datafusion/src/catalog.rs 
b/crates/integrations/datafusion/src/catalog.rs
index 7ac66d4..2698116 100644
--- a/crates/integrations/datafusion/src/catalog.rs
+++ b/crates/integrations/datafusion/src/catalog.rs
@@ -30,6 +30,7 @@ use paimon::catalog::{Catalog, Identifier};
 
 use crate::error::to_datafusion_error;
 use crate::runtime::{await_with_runtime, block_on_with_runtime};
+use crate::system_tables;
 use crate::table::PaimonTableProvider;
 
 /// Provides an interface to manage and access multiple schemas (databases)
@@ -175,8 +176,19 @@ impl SchemaProvider for PaimonSchemaProvider {
     }
 
     async fn table(&self, name: &str) -> DFResult<Option<Arc<dyn 
TableProvider>>> {
+        let (base, system_name) = system_tables::split_object_name(name);
+        if let Some(system_name) = system_name {
+            return await_with_runtime(system_tables::load(
+                Arc::clone(&self.catalog),
+                self.database.clone(),
+                base.to_string(),
+                system_name.to_string(),
+            ))
+            .await;
+        }
+
         let catalog = Arc::clone(&self.catalog);
-        let identifier = Identifier::new(self.database.clone(), name);
+        let identifier = Identifier::new(self.database.clone(), base);
         await_with_runtime(async move {
             match catalog.get_table(&identifier).await {
                 Ok(table) => {
@@ -191,8 +203,15 @@ impl SchemaProvider for PaimonSchemaProvider {
     }
 
     fn table_exist(&self, name: &str) -> bool {
+        let (base, system_name) = system_tables::split_object_name(name);
+        if let Some(system_name) = system_name {
+            if !system_tables::is_registered(system_name) {
+                return false;
+            }
+        }
+
         let catalog = Arc::clone(&self.catalog);
-        let identifier = Identifier::new(self.database.clone(), name);
+        let identifier = Identifier::new(self.database.clone(), 
base.to_string());
         block_on_with_runtime(
             async move {
                 match catalog.get_table(&identifier).await {
diff --git a/crates/integrations/datafusion/src/lib.rs 
b/crates/integrations/datafusion/src/lib.rs
index e803153..4717854 100644
--- a/crates/integrations/datafusion/src/lib.rs
+++ b/crates/integrations/datafusion/src/lib.rs
@@ -45,6 +45,7 @@ mod full_text_search;
 mod physical_plan;
 mod relation_planner;
 pub mod runtime;
+mod system_tables;
 mod table;
 
 pub use catalog::{PaimonCatalogProvider, PaimonSchemaProvider};
diff --git a/crates/integrations/datafusion/src/system_tables/mod.rs 
b/crates/integrations/datafusion/src/system_tables/mod.rs
new file mode 100644
index 0000000..fddb3b9
--- /dev/null
+++ b/crates/integrations/datafusion/src/system_tables/mod.rs
@@ -0,0 +1,161 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Paimon system tables (`<table>$<name>`) as DataFusion table providers.
+//!
+//! Mirrors Java [SystemTableLoader](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java):
+//! `TABLES` maps each system-table name to its builder function.
+
+use std::sync::Arc;
+
+use datafusion::datasource::TableProvider;
+use datafusion::error::{DataFusionError, Result as DFResult};
+use paimon::catalog::{Catalog, Identifier, SYSTEM_BRANCH_PREFIX, 
SYSTEM_TABLE_SPLITTER};
+use paimon::table::Table;
+
+use crate::error::to_datafusion_error;
+
+mod options;
+
+type Builder = fn(Table) -> DFResult<Arc<dyn TableProvider>>;
+
+const TABLES: &[(&str, Builder)] = &[("options", options::build)];
+
+/// Parse a Paimon object name into `(base_table, optional system_table_name)`.
+///
+/// Mirrors Java 
[Identifier.splitObjectName](https://github.com/apache/paimon/blob/release-1.3/paimon-api/src/main/java/org/apache/paimon/catalog/Identifier.java).
+///
+/// - `t` → `("t", None)`
+/// - `t$options` → `("t", Some("options"))`
+/// - `t$branch_main` → `("t", None)` (branch reference, not a system table)
+/// - `t$branch_main$options` → `("t", Some("options"))` (branch + system 
table)
+pub(crate) fn split_object_name(name: &str) -> (&str, Option<&str>) {
+    let mut parts = name.splitn(3, SYSTEM_TABLE_SPLITTER);
+    let base = parts.next().unwrap_or(name);
+    match (parts.next(), parts.next()) {
+        (None, _) => (base, None),
+        (Some(second), None) => {
+            if second.starts_with(SYSTEM_BRANCH_PREFIX) {
+                (base, None)
+            } else {
+                (base, Some(second))
+            }
+        }
+        (Some(second), Some(third)) => {
+            if second.starts_with(SYSTEM_BRANCH_PREFIX) {
+                (base, Some(third))
+            } else {
+                // `$` is legal in table names, so `t$foo$bar` falls through as
+                // plain `t` and errors later as "table not found".
+                (base, None)
+            }
+        }
+    }
+}
+
+/// Returns true if `name` is a recognised Paimon system table suffix.
+pub(crate) fn is_registered(name: &str) -> bool {
+    TABLES.iter().any(|(n, _)| name.eq_ignore_ascii_case(n))
+}
+
+/// Wraps an already-loaded base table as the system table `name`.
+fn wrap_to_system_table(name: &str, base_table: Table) -> 
Option<DFResult<Arc<dyn TableProvider>>> {
+    TABLES
+        .iter()
+        .find(|(n, _)| name.eq_ignore_ascii_case(n))
+        .map(|(_, build)| build(base_table))
+}
+
+/// Loads `<base>$<system_name>` from the catalog and wraps it as a system
+/// table provider.
+///
+/// - Unknown `system_name` → `Ok(None)` (DataFusion reports "table not found")
+/// - Base table missing    → `Err(Plan)` so users can distinguish it from an
+///   unknown system name
+pub(crate) async fn load(
+    catalog: Arc<dyn Catalog>,
+    database: String,
+    base: String,
+    system_name: String,
+) -> DFResult<Option<Arc<dyn TableProvider>>> {
+    if !is_registered(&system_name) {
+        return Ok(None);
+    }
+    let identifier = Identifier::new(database, base.clone());
+    match catalog.get_table(&identifier).await {
+        Ok(table) => wrap_to_system_table(&system_name, table)
+            .expect("is_registered guarantees a builder")
+            .map(Some),
+        Err(paimon::Error::TableNotExist { .. }) => 
Err(DataFusionError::Plan(format!(
+            "Cannot read system table `${system_name}`: \
+             base table `{base}` does not exist"
+        ))),
+        Err(e) => Err(to_datafusion_error(e)),
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::{is_registered, split_object_name};
+
+    #[test]
+    fn is_registered_is_case_insensitive() {
+        assert!(is_registered("options"));
+        assert!(is_registered("Options"));
+        assert!(is_registered("OPTIONS"));
+        assert!(!is_registered("nonsense"));
+    }
+
+    #[test]
+    fn plain_table_name() {
+        assert_eq!(split_object_name("orders"), ("orders", None));
+    }
+
+    #[test]
+    fn system_table_only() {
+        assert_eq!(
+            split_object_name("orders$options"),
+            ("orders", Some("options"))
+        );
+    }
+
+    #[test]
+    fn branch_reference_is_not_a_system_table() {
+        assert_eq!(split_object_name("orders$branch_main"), ("orders", None));
+    }
+
+    #[test]
+    fn branch_plus_system_table() {
+        assert_eq!(
+            split_object_name("orders$branch_main$options"),
+            ("orders", Some("options"))
+        );
+    }
+
+    #[test]
+    fn three_parts_without_branch_prefix_is_not_a_system_table() {
+        assert_eq!(split_object_name("orders$foo$bar"), ("orders", None));
+    }
+
+    #[test]
+    fn system_table_name_preserves_case() {
+        assert_eq!(
+            split_object_name("orders$Options"),
+            ("orders", Some("Options"))
+        );
+    }
+}
diff --git a/crates/integrations/datafusion/src/system_tables/options.rs 
b/crates/integrations/datafusion/src/system_tables/options.rs
new file mode 100644
index 0000000..fe86c92
--- /dev/null
+++ b/crates/integrations/datafusion/src/system_tables/options.rs
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Mirrors Java [OptionsTable](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java).
+
+use std::any::Any;
+use std::sync::{Arc, OnceLock};
+
+use async_trait::async_trait;
+use datafusion::arrow::array::{RecordBatch, StringArray};
+use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use datafusion::catalog::Session;
+use datafusion::datasource::memory::MemorySourceConfig;
+use datafusion::datasource::{TableProvider, TableType};
+use datafusion::error::Result as DFResult;
+use datafusion::logical_expr::Expr;
+use datafusion::physical_plan::ExecutionPlan;
+use paimon::table::Table;
+
+pub(super) fn build(table: Table) -> DFResult<Arc<dyn TableProvider>> {
+    Ok(Arc::new(OptionsTable { table }))
+}
+
+fn options_schema() -> SchemaRef {
+    static SCHEMA: OnceLock<SchemaRef> = OnceLock::new();
+    SCHEMA
+        .get_or_init(|| {
+            Arc::new(Schema::new(vec![
+                Field::new("key", DataType::Utf8, false),
+                Field::new("value", DataType::Utf8, false),
+            ]))
+        })
+        .clone()
+}
+
+#[derive(Debug)]
+struct OptionsTable {
+    table: Table,
+}
+
+#[async_trait]
+impl TableProvider for OptionsTable {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        options_schema()
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::View
+    }
+
+    async fn scan(
+        &self,
+        _state: &dyn Session,
+        projection: Option<&Vec<usize>>,
+        _filters: &[Expr],
+        _limit: Option<usize>,
+    ) -> DFResult<Arc<dyn ExecutionPlan>> {
+        // Java uses LinkedHashMap insertion order; HashMap has none — sort 
for stable output.
+        let mut entries: Vec<(&String, &String)> = 
self.table.schema().options().iter().collect();
+        entries.sort_by(|a, b| a.0.cmp(b.0));
+
+        let keys = StringArray::from_iter_values(entries.iter().map(|(k, _)| 
k.as_str()));
+        let values = StringArray::from_iter_values(entries.iter().map(|(_, v)| 
v.as_str()));
+
+        let schema = options_schema();
+        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(keys), 
Arc::new(values)])?;
+
+        Ok(MemorySourceConfig::try_new_exec(
+            &[vec![batch]],
+            schema,
+            projection.cloned(),
+        )?)
+    }
+}
diff --git a/crates/integrations/datafusion/tests/system_tables.rs 
b/crates/integrations/datafusion/tests/system_tables.rs
new file mode 100644
index 0000000..c3292e3
--- /dev/null
+++ b/crates/integrations/datafusion/tests/system_tables.rs
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Paimon `$options` system table end-to-end via DataFusion SQL.
+
+use std::sync::Arc;
+
+use datafusion::arrow::array::{Array, StringArray};
+use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::prelude::SessionContext;
+use paimon::catalog::Identifier;
+use paimon::{Catalog, CatalogOptions, FileSystemCatalog, Options};
+use paimon_datafusion::PaimonCatalogProvider;
+
+const FIXTURE_TABLE: &str = "test_tantivy_fulltext";
+
+fn extract_test_warehouse() -> (tempfile::TempDir, String) {
+    let archive_path = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
+        .join("testdata/test_tantivy_fulltext.tar.gz");
+    let file = std::fs::File::open(&archive_path)
+        .unwrap_or_else(|e| panic!("Failed to open {}: {e}", 
archive_path.display()));
+    let decoder = flate2::read::GzDecoder::new(file);
+    let mut archive = tar::Archive::new(decoder);
+
+    let tmp = tempfile::tempdir().expect("Failed to create temp dir");
+    let db_dir = tmp.path().join("default.db");
+    std::fs::create_dir_all(&db_dir).unwrap();
+    archive.unpack(&db_dir).unwrap();
+
+    let warehouse = format!("file://{}", tmp.path().display());
+    (tmp, warehouse)
+}
+
+async fn create_context() -> (SessionContext, Arc<dyn Catalog>, 
tempfile::TempDir) {
+    let (tmp, warehouse) = extract_test_warehouse();
+    let mut options = Options::new();
+    options.set(CatalogOptions::WAREHOUSE, warehouse);
+    let catalog = FileSystemCatalog::new(options).expect("Failed to create 
catalog");
+    let catalog: Arc<dyn Catalog> = Arc::new(catalog);
+
+    let ctx = SessionContext::new();
+    ctx.register_catalog(
+        "paimon",
+        Arc::new(PaimonCatalogProvider::new(Arc::clone(&catalog))),
+    );
+    (ctx, catalog, tmp)
+}
+
+async fn run_sql(ctx: &SessionContext, sql: &str) -> Vec<RecordBatch> {
+    ctx.sql(sql)
+        .await
+        .unwrap_or_else(|e| panic!("Failed to plan `{sql}`: {e}"))
+        .collect()
+        .await
+        .unwrap_or_else(|e| panic!("Failed to execute `{sql}`: {e}"))
+}
+
+#[tokio::test]
+async fn test_options_system_table() {
+    let (ctx, catalog, _tmp) = create_context().await;
+    let sql = format!("SELECT key, value FROM 
paimon.default.{FIXTURE_TABLE}$options");
+    let batches = run_sql(&ctx, &sql).await;
+
+    assert!(!batches.is_empty(), "$options should return ≥1 batch");
+    let schema = batches[0].schema();
+    assert_eq!(schema.field(0).name(), "key");
+    assert_eq!(schema.field(1).name(), "value");
+
+    let mut actual: Vec<(String, String)> = Vec::new();
+    for batch in &batches {
+        let keys = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .expect("key column is Utf8");
+        let values = batch
+            .column(1)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .expect("value column is Utf8");
+        for i in 0..batch.num_rows() {
+            actual.push((keys.value(i).to_string(), 
values.value(i).to_string()));
+        }
+    }
+    actual.sort();
+
+    let identifier = Identifier::new("default".to_string(), 
FIXTURE_TABLE.to_string());
+    let table = catalog
+        .get_table(&identifier)
+        .await
+        .expect("fixture table should load");
+    let mut expected: Vec<(String, String)> = table
+        .schema()
+        .options()
+        .iter()
+        .map(|(k, v)| (k.clone(), v.clone()))
+        .collect();
+    expected.sort();
+
+    assert_eq!(actual, expected, "$options rows should match table options");
+}
+
+#[tokio::test]
+async fn test_unknown_system_table_name_returns_not_found() {
+    let (ctx, _catalog, _tmp) = create_context().await;
+    let sql = format!("SELECT * FROM paimon.default.{FIXTURE_TABLE}$nonsense");
+    let err = ctx
+        .sql(&sql)
+        .await
+        .expect_err("unknown system table should not resolve");
+    let msg = err.to_string();
+    assert!(
+        msg.contains("nonsense") || msg.to_lowercase().contains("not found"),
+        "unexpected error for unknown system table: {msg}"
+    );
+}
+
+#[tokio::test]
+async fn test_missing_base_table_for_system_table_errors() {
+    let (ctx, _catalog, _tmp) = create_context().await;
+    let sql = "SELECT * FROM paimon.default.does_not_exist$options";
+    let err = ctx
+        .sql(sql)
+        .await
+        .expect_err("missing base table should error");
+    let msg = err.to_string();
+    assert!(
+        msg.contains("does_not_exist") && msg.contains("$options"),
+        "expected error to mention both base table and system name, got: {msg}"
+    );
+}

Reply via email to