This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new f996751 feat(datafusion): Add $options system table (#240)
f996751 is described below
commit f99675109055793e88c0156261a5f09d1466839b
Author: Jiajia Li <[email protected]>
AuthorDate: Wed Apr 15 09:44:16 2026 +0800
feat(datafusion): Add $options system table (#240)
---
crates/integrations/datafusion/src/catalog.rs | 23 ++-
crates/integrations/datafusion/src/lib.rs | 1 +
.../datafusion/src/system_tables/mod.rs | 161 +++++++++++++++++++++
.../datafusion/src/system_tables/options.rs | 92 ++++++++++++
.../integrations/datafusion/tests/system_tables.rs | 145 +++++++++++++++++++
5 files changed, 420 insertions(+), 2 deletions(-)
diff --git a/crates/integrations/datafusion/src/catalog.rs b/crates/integrations/datafusion/src/catalog.rs
index 7ac66d4..2698116 100644
--- a/crates/integrations/datafusion/src/catalog.rs
+++ b/crates/integrations/datafusion/src/catalog.rs
@@ -30,6 +30,7 @@ use paimon::catalog::{Catalog, Identifier};
use crate::error::to_datafusion_error;
use crate::runtime::{await_with_runtime, block_on_with_runtime};
+use crate::system_tables;
use crate::table::PaimonTableProvider;
/// Provides an interface to manage and access multiple schemas (databases)
@@ -175,8 +176,19 @@ impl SchemaProvider for PaimonSchemaProvider {
}
async fn table(&self, name: &str) -> DFResult<Option<Arc<dyn
TableProvider>>> {
+ let (base, system_name) = system_tables::split_object_name(name);
+ if let Some(system_name) = system_name {
+ return await_with_runtime(system_tables::load(
+ Arc::clone(&self.catalog),
+ self.database.clone(),
+ base.to_string(),
+ system_name.to_string(),
+ ))
+ .await;
+ }
+
let catalog = Arc::clone(&self.catalog);
- let identifier = Identifier::new(self.database.clone(), name);
+ let identifier = Identifier::new(self.database.clone(), base);
await_with_runtime(async move {
match catalog.get_table(&identifier).await {
Ok(table) => {
@@ -191,8 +203,15 @@ impl SchemaProvider for PaimonSchemaProvider {
}
fn table_exist(&self, name: &str) -> bool {
+ let (base, system_name) = system_tables::split_object_name(name);
+ if let Some(system_name) = system_name {
+ if !system_tables::is_registered(system_name) {
+ return false;
+ }
+ }
+
let catalog = Arc::clone(&self.catalog);
- let identifier = Identifier::new(self.database.clone(), name);
+ let identifier = Identifier::new(self.database.clone(),
base.to_string());
block_on_with_runtime(
async move {
match catalog.get_table(&identifier).await {
diff --git a/crates/integrations/datafusion/src/lib.rs b/crates/integrations/datafusion/src/lib.rs
index e803153..4717854 100644
--- a/crates/integrations/datafusion/src/lib.rs
+++ b/crates/integrations/datafusion/src/lib.rs
@@ -45,6 +45,7 @@ mod full_text_search;
mod physical_plan;
mod relation_planner;
pub mod runtime;
+mod system_tables;
mod table;
pub use catalog::{PaimonCatalogProvider, PaimonSchemaProvider};
diff --git a/crates/integrations/datafusion/src/system_tables/mod.rs b/crates/integrations/datafusion/src/system_tables/mod.rs
new file mode 100644
index 0000000..fddb3b9
--- /dev/null
+++ b/crates/integrations/datafusion/src/system_tables/mod.rs
@@ -0,0 +1,161 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Paimon system tables (`<table>$<name>`) as DataFusion table providers.
+//!
+//! Mirrors Java [SystemTableLoader](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java):
+//! `TABLES` maps each system-table name to its builder function.
+
+use std::sync::Arc;
+
+use datafusion::datasource::TableProvider;
+use datafusion::error::{DataFusionError, Result as DFResult};
+use paimon::catalog::{Catalog, Identifier, SYSTEM_BRANCH_PREFIX,
SYSTEM_TABLE_SPLITTER};
+use paimon::table::Table;
+
+use crate::error::to_datafusion_error;
+
+mod options;
+
+/// Signature shared by every system-table builder: takes the loaded base
+/// [`Table`] and returns the DataFusion provider that exposes it.
+type Builder = fn(Table) -> DFResult<Arc<dyn TableProvider>>;
+
+/// Registry of supported system tables as `(name, builder)` pairs. Lookups in
+/// this module compare names ASCII case-insensitively.
+const TABLES: &[(&str, Builder)] = &[("options", options::build)];
+
+/// Parse a Paimon object name into `(table_name, optional system_table_name)`.
+///
+/// Mirrors Java [Identifier.splitObjectName](https://github.com/apache/paimon/blob/release-1.3/paimon-api/src/main/java/org/apache/paimon/catalog/Identifier.java).
+///
+/// - `t` → `("t", None)`
+/// - `t$options` → `("t", Some("options"))`
+/// - `t$branch_main` → `("t", None)` (branch reference, not a system table)
+/// - `t$branch_main$options` → `("t", Some("options"))` (branch + system table)
+/// - `t$foo$bar` → `("t$foo$bar", None)` (not a system-table reference; the
+///   full name is kept so a lookup can never silently resolve to table `t`)
+pub(crate) fn split_object_name(name: &str) -> (&str, Option<&str>) {
+    let mut parts = name.splitn(3, SYSTEM_TABLE_SPLITTER);
+    // `splitn` always yields at least one item, so the fallback is never taken.
+    let base = parts.next().unwrap_or(name);
+    match (parts.next(), parts.next()) {
+        (None, _) => (base, None),
+        (Some(second), None) if second.starts_with(SYSTEM_BRANCH_PREFIX) => (base, None),
+        (Some(second), None) => (base, Some(second)),
+        (Some(second), Some(third)) if second.starts_with(SYSTEM_BRANCH_PREFIX) => {
+            (base, Some(third))
+        }
+        // `$` is legal in table names, so `t$foo$bar` is handed back unchanged
+        // as a plain table name. Returning only `t` here would make callers
+        // that look up the returned name resolve an unrelated table instead of
+        // reporting "table not found".
+        (Some(_), Some(_)) => (name, None),
+    }
+}
+
+/// Returns true if `name` is a recognised Paimon system table suffix.
+///
+/// The comparison against the registered names is ASCII case-insensitive.
+pub(crate) fn is_registered(name: &str) -> bool {
+    TABLES.iter().any(|(n, _)| name.eq_ignore_ascii_case(n))
+}
+
+/// Wraps an already-loaded base table as the system table `name`.
+///
+/// Returns `None` when no builder is registered under `name`; otherwise the
+/// result of running that builder on `base_table`.
+fn wrap_to_system_table(name: &str, base_table: Table) -> Option<DFResult<Arc<dyn TableProvider>>> {
+    TABLES
+        .iter()
+        .find(|(n, _)| name.eq_ignore_ascii_case(n))
+        .map(|(_, build)| build(base_table))
+}
+
+/// Loads `<base>$<system_name>` from the catalog and wraps it as a system
+/// table provider.
+///
+/// - Unknown `system_name` → `Ok(None)` (DataFusion reports "table not found")
+/// - Base table missing → `Err(Plan)` so users can distinguish it from an
+///   unknown system name
+/// - Any other catalog error is converted with `to_datafusion_error`
+pub(crate) async fn load(
+    catalog: Arc<dyn Catalog>,
+    database: String,
+    base: String,
+    system_name: String,
+) -> DFResult<Option<Arc<dyn TableProvider>>> {
+    // Checked first so an unknown suffix never triggers a catalog lookup.
+    if !is_registered(&system_name) {
+        return Ok(None);
+    }
+    let identifier = Identifier::new(database, base.clone());
+    match catalog.get_table(&identifier).await {
+        Ok(table) => wrap_to_system_table(&system_name, table)
+            .expect("is_registered guarantees a builder")
+            .map(Some),
+        Err(paimon::Error::TableNotExist { .. }) => Err(DataFusionError::Plan(format!(
+            "Cannot read system table `${system_name}`: \
+             base table `{base}` does not exist"
+        ))),
+        Err(e) => Err(to_datafusion_error(e)),
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::{is_registered, split_object_name};
+
+    #[test]
+    fn is_registered_is_case_insensitive() {
+        assert!(is_registered("options"));
+        assert!(is_registered("Options"));
+        assert!(is_registered("OPTIONS"));
+        assert!(!is_registered("nonsense"));
+    }
+
+    #[test]
+    fn plain_table_name() {
+        assert_eq!(split_object_name("orders"), ("orders", None));
+    }
+
+    #[test]
+    fn system_table_only() {
+        assert_eq!(
+            split_object_name("orders$options"),
+            ("orders", Some("options"))
+        );
+    }
+
+    #[test]
+    fn branch_reference_is_not_a_system_table() {
+        assert_eq!(split_object_name("orders$branch_main"), ("orders", None));
+    }
+
+    #[test]
+    fn branch_plus_system_table() {
+        assert_eq!(
+            split_object_name("orders$branch_main$options"),
+            ("orders", Some("options"))
+        );
+    }
+
+    #[test]
+    fn three_parts_without_branch_prefix_keeps_full_name() {
+        // Must not collapse to `orders`, or the lookup would hit the wrong table.
+        assert_eq!(
+            split_object_name("orders$foo$bar"),
+            ("orders$foo$bar", None)
+        );
+    }
+
+    #[test]
+    fn system_table_name_preserves_case() {
+        assert_eq!(
+            split_object_name("orders$Options"),
+            ("orders", Some("Options"))
+        );
+    }
+}
diff --git a/crates/integrations/datafusion/src/system_tables/options.rs b/crates/integrations/datafusion/src/system_tables/options.rs
new file mode 100644
index 0000000..fe86c92
--- /dev/null
+++ b/crates/integrations/datafusion/src/system_tables/options.rs
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Mirrors Java [OptionsTable](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java).
+
+use std::any::Any;
+use std::sync::{Arc, OnceLock};
+
+use async_trait::async_trait;
+use datafusion::arrow::array::{RecordBatch, StringArray};
+use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use datafusion::catalog::Session;
+use datafusion::datasource::memory::MemorySourceConfig;
+use datafusion::datasource::{TableProvider, TableType};
+use datafusion::error::Result as DFResult;
+use datafusion::logical_expr::Expr;
+use datafusion::physical_plan::ExecutionPlan;
+use paimon::table::Table;
+
+/// Builds the `$options` system table provider around an already-loaded base
+/// table. Never fails; the `DFResult` return matches the shared builder
+/// signature.
+pub(super) fn build(table: Table) -> DFResult<Arc<dyn TableProvider>> {
+    let provider: Arc<dyn TableProvider> = Arc::new(OptionsTable { table });
+    Ok(provider)
+}
+
+/// Returns the fixed two-column schema of the options table (`key`, `value`;
+/// both non-nullable `Utf8`). The schema is built on first use and the same
+/// shared instance is handed out afterwards.
+fn options_schema() -> SchemaRef {
+    static SCHEMA: OnceLock<SchemaRef> = OnceLock::new();
+    let shared = SCHEMA.get_or_init(|| {
+        let fields = vec![
+            Field::new("key", DataType::Utf8, false),
+            Field::new("value", DataType::Utf8, false),
+        ];
+        Arc::new(Schema::new(fields))
+    });
+    Arc::clone(shared)
+}
+
+/// DataFusion provider for `<table>$options`: one row per option of the
+/// wrapped table's schema.
+#[derive(Debug)]
+struct OptionsTable {
+    /// Base table whose schema options are exposed.
+    table: Table,
+}
+
+#[async_trait]
+impl TableProvider for OptionsTable {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        options_schema()
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::View
+    }
+
+    /// Materialises all options as a single in-memory batch sorted by key.
+    ///
+    /// `projection` is forwarded to the in-memory source; the session state,
+    /// filters and limit are not used.
+    async fn scan(
+        &self,
+        _state: &dyn Session,
+        projection: Option<&Vec<usize>>,
+        _filters: &[Expr],
+        _limit: Option<usize>,
+    ) -> DFResult<Arc<dyn ExecutionPlan>> {
+        // Java uses LinkedHashMap insertion order; HashMap has none — sort for stable output.
+        let mut rows: Vec<(&str, &str)> = self
+            .table
+            .schema()
+            .options()
+            .iter()
+            .map(|(k, v)| (k.as_str(), v.as_str()))
+            .collect();
+        rows.sort_by(|lhs, rhs| lhs.0.cmp(rhs.0));
+
+        let key_column = StringArray::from_iter_values(rows.iter().map(|row| row.0));
+        let value_column = StringArray::from_iter_values(rows.iter().map(|row| row.1));
+
+        let schema = options_schema();
+        let batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![Arc::new(key_column), Arc::new(value_column)],
+        )?;
+
+        // Single partition holding the single batch.
+        let partitions = [vec![batch]];
+        let plan = MemorySourceConfig::try_new_exec(&partitions, schema, projection.cloned())?;
+        Ok(plan)
+    }
+}
diff --git a/crates/integrations/datafusion/tests/system_tables.rs b/crates/integrations/datafusion/tests/system_tables.rs
new file mode 100644
index 0000000..c3292e3
--- /dev/null
+++ b/crates/integrations/datafusion/tests/system_tables.rs
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Paimon `$options` system table end-to-end via DataFusion SQL.
+
+use std::sync::Arc;
+
+use datafusion::arrow::array::{Array, StringArray};
+use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::prelude::SessionContext;
+use paimon::catalog::Identifier;
+use paimon::{Catalog, CatalogOptions, FileSystemCatalog, Options};
+use paimon_datafusion::PaimonCatalogProvider;
+
+/// Table name queried by these tests; it matches the name of the bundled
+/// `testdata/test_tantivy_fulltext.tar.gz` archive.
+const FIXTURE_TABLE: &str = "test_tantivy_fulltext";
+
+/// Unpacks the bundled fixture archive into `<tmp>/default.db` and returns the
+/// temp-dir guard together with a `file://` warehouse URI rooted at that dir.
+///
+/// Panics if the archive cannot be opened or unpacked.
+fn extract_test_warehouse() -> (tempfile::TempDir, String) {
+    let archive_path = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
+        .join("testdata/test_tantivy_fulltext.tar.gz");
+    let tarball = match std::fs::File::open(&archive_path) {
+        Ok(handle) => handle,
+        Err(e) => panic!("Failed to open {}: {e}", archive_path.display()),
+    };
+    let mut archive = tar::Archive::new(flate2::read::GzDecoder::new(tarball));
+
+    let tmp = tempfile::tempdir().expect("Failed to create temp dir");
+    let root = tmp.path();
+    let db_dir = root.join("default.db");
+    std::fs::create_dir_all(&db_dir).unwrap();
+    archive.unpack(&db_dir).unwrap();
+
+    let warehouse = format!("file://{}", root.display());
+    (tmp, warehouse)
+}
+
+/// Creates a DataFusion session with the fixture warehouse registered as the
+/// `paimon` catalog.
+///
+/// Also returns the catalog handle and the temp-dir guard, which must outlive
+/// the queries that read from the warehouse.
+async fn create_context() -> (SessionContext, Arc<dyn Catalog>, tempfile::TempDir) {
+    let (tmp, warehouse) = extract_test_warehouse();
+
+    let mut catalog_options = Options::new();
+    catalog_options.set(CatalogOptions::WAREHOUSE, warehouse);
+    let catalog: Arc<dyn Catalog> =
+        Arc::new(FileSystemCatalog::new(catalog_options).expect("Failed to create catalog"));
+
+    let provider = PaimonCatalogProvider::new(Arc::clone(&catalog));
+    let ctx = SessionContext::new();
+    ctx.register_catalog("paimon", Arc::new(provider));
+    (ctx, catalog, tmp)
+}
+
+/// Plans and executes `sql`, returning every result batch.
+///
+/// Panics with the offending statement if planning or execution fails.
+async fn run_sql(ctx: &SessionContext, sql: &str) -> Vec<RecordBatch> {
+    let frame = match ctx.sql(sql).await {
+        Ok(frame) => frame,
+        Err(e) => panic!("Failed to plan `{sql}`: {e}"),
+    };
+    match frame.collect().await {
+        Ok(batches) => batches,
+        Err(e) => panic!("Failed to execute `{sql}`: {e}"),
+    }
+}
+
+/// `SELECT key, value FROM <table>$options` must return exactly the options
+/// stored in the base table's schema.
+#[tokio::test]
+async fn test_options_system_table() {
+    /// Downcasts column `index` of `batch` to a string array, panicking with
+    /// `what` if the column has another type.
+    fn utf8_column<'a>(batch: &'a RecordBatch, index: usize, what: &str) -> &'a StringArray {
+        batch
+            .column(index)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .expect(what)
+    }
+
+    let (ctx, catalog, _tmp) = create_context().await;
+    let sql = format!("SELECT key, value FROM paimon.default.{FIXTURE_TABLE}$options");
+    let batches = run_sql(&ctx, &sql).await;
+
+    assert!(!batches.is_empty(), "$options should return ≥1 batch");
+    let schema = batches[0].schema();
+    assert_eq!(schema.field(0).name(), "key");
+    assert_eq!(schema.field(1).name(), "value");
+
+    // Flatten every batch into owned (key, value) pairs.
+    let mut actual: Vec<(String, String)> = batches
+        .iter()
+        .flat_map(|batch| {
+            let keys = utf8_column(batch, 0, "key column is Utf8");
+            let values = utf8_column(batch, 1, "value column is Utf8");
+            (0..batch.num_rows())
+                .map(move |row| (keys.value(row).to_string(), values.value(row).to_string()))
+        })
+        .collect();
+    actual.sort();
+
+    // Ground truth: the options read straight from the catalog.
+    let identifier = Identifier::new("default".to_string(), FIXTURE_TABLE.to_string());
+    let table = catalog
+        .get_table(&identifier)
+        .await
+        .expect("fixture table should load");
+    let mut expected: Vec<(String, String)> = Vec::new();
+    for (key, value) in table.schema().options().iter() {
+        expected.push((key.clone(), value.clone()));
+    }
+    expected.sort();
+
+    assert_eq!(actual, expected, "$options rows should match table options");
+}
+
+#[tokio::test]
+async fn test_unknown_system_table_name_returns_not_found() {
+ let (ctx, _catalog, _tmp) = create_context().await;
+ let sql = format!("SELECT * FROM paimon.default.{FIXTURE_TABLE}$nonsense");
+ let err = ctx
+ .sql(&sql)
+ .await
+ .expect_err("unknown system table should not resolve");
+ let msg = err.to_string();
+ assert!(
+ msg.contains("nonsense") || msg.to_lowercase().contains("not found"),
+ "unexpected error for unknown system table: {msg}"
+ );
+}
+
+#[tokio::test]
+async fn test_missing_base_table_for_system_table_errors() {
+ let (ctx, _catalog, _tmp) = create_context().await;
+ let sql = "SELECT * FROM paimon.default.does_not_exist$options";
+ let err = ctx
+ .sql(sql)
+ .await
+ .expect_err("missing base table should error");
+ let msg = err.to_string();
+ assert!(
+ msg.contains("does_not_exist") && msg.contains("$options"),
+ "expected error to mention both base table and system name, got: {msg}"
+ );
+}