This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 0a89f75 feat: Add $schemas system table (#245)
0a89f75 is described below
commit 0a89f75b69cbe25c8fa7957bb9c1b772121850ee
Author: Jiajia Li <[email protected]>
AuthorDate: Thu Apr 16 10:14:02 2026 +0800
feat: Add $schemas system table (#245)
---
crates/integrations/datafusion/Cargo.toml | 4 +-
.../datafusion/src/system_tables/mod.rs | 6 +-
.../datafusion/src/system_tables/schemas.rs | 138 ++++++++++++++++++
.../integrations/datafusion/tests/system_tables.rs | 109 ++++++++++++++-
crates/paimon/src/catalog/filesystem.rs | 34 +----
crates/paimon/src/table/schema_manager.rs | 155 +++++++++++++++++++++
6 files changed, 410 insertions(+), 36 deletions(-)
diff --git a/crates/integrations/datafusion/Cargo.toml b/crates/integrations/datafusion/Cargo.toml
index ddfa2c0..eb8c644 100644
--- a/crates/integrations/datafusion/Cargo.toml
+++ b/crates/integrations/datafusion/Cargo.toml
@@ -36,6 +36,8 @@ chrono = "0.4"
datafusion = { workspace = true }
paimon = { path = "../../paimon" }
futures = "0.3"
+serde = { version = "1", features = ["derive"] }
+serde_json = "1"
tokio = { workspace = true, features = ["rt", "time", "fs"] }
[dev-dependencies]
@@ -43,8 +45,6 @@ arrow-array = { workspace = true }
arrow-schema = { workspace = true }
flate2 = "1"
parquet = { workspace = true }
-serde = "1"
-serde_json = "1"
tar = "0.4"
tempfile = "3"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
diff --git a/crates/integrations/datafusion/src/system_tables/mod.rs b/crates/integrations/datafusion/src/system_tables/mod.rs
index fddb3b9..65a2176 100644
--- a/crates/integrations/datafusion/src/system_tables/mod.rs
+++ b/crates/integrations/datafusion/src/system_tables/mod.rs
@@ -30,10 +30,11 @@ use paimon::table::Table;
use crate::error::to_datafusion_error;
mod options;
+mod schemas;
type Builder = fn(Table) -> DFResult<Arc<dyn TableProvider>>;
-const TABLES: &[(&str, Builder)] = &[("options", options::build)];
+const TABLES: &[(&str, Builder)] = &[("options", options::build), ("schemas", schemas::build)];
/// Parse a Paimon object name into `(base_table, optional system_table_name)`.
///
@@ -117,6 +118,9 @@ mod tests {
assert!(is_registered("options"));
assert!(is_registered("Options"));
assert!(is_registered("OPTIONS"));
+ assert!(is_registered("schemas"));
+ assert!(is_registered("Schemas"));
+ assert!(is_registered("SCHEMAS"));
assert!(!is_registered("nonsense"));
}
diff --git a/crates/integrations/datafusion/src/system_tables/schemas.rs b/crates/integrations/datafusion/src/system_tables/schemas.rs
new file mode 100644
index 0000000..03da793
--- /dev/null
+++ b/crates/integrations/datafusion/src/system_tables/schemas.rs
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Mirrors Java [SchemasTable](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java).
+
+use std::any::Any;
+use std::sync::{Arc, OnceLock};
+
+use async_trait::async_trait;
+use datafusion::arrow::array::{Int64Array, RecordBatch, StringArray, TimestampMillisecondArray};
+use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
+use datafusion::catalog::Session;
+use datafusion::datasource::memory::MemorySourceConfig;
+use datafusion::datasource::{TableProvider, TableType};
+use datafusion::error::{DataFusionError, Result as DFResult};
+use datafusion::logical_expr::Expr;
+use datafusion::physical_plan::ExecutionPlan;
+use paimon::table::Table;
+use serde::Serialize;
+
+use crate::error::to_datafusion_error;
+
+pub(super) fn build(table: Table) -> DFResult<Arc<dyn TableProvider>> {
+ Ok(Arc::new(SchemasTable { table }))
+}
+
+fn schemas_schema() -> SchemaRef {
+ static SCHEMA: OnceLock<SchemaRef> = OnceLock::new();
+ SCHEMA
+ .get_or_init(|| {
+ Arc::new(Schema::new(vec![
+ Field::new("schema_id", DataType::Int64, false),
+ Field::new("fields", DataType::Utf8, false),
+ Field::new("partition_keys", DataType::Utf8, false),
+ Field::new("primary_keys", DataType::Utf8, false),
+ Field::new("options", DataType::Utf8, false),
+ Field::new("comment", DataType::Utf8, true),
+ Field::new(
+ "update_time",
+ DataType::Timestamp(TimeUnit::Millisecond, None),
+ false,
+ ),
+ ]))
+ })
+ .clone()
+}
+
+#[derive(Debug)]
+struct SchemasTable {
+ table: Table,
+}
+
+#[async_trait]
+impl TableProvider for SchemasTable {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn schema(&self) -> SchemaRef {
+ schemas_schema()
+ }
+
+ fn table_type(&self) -> TableType {
+ TableType::View
+ }
+
+ async fn scan(
+ &self,
+ _state: &dyn Session,
+ projection: Option<&Vec<usize>>,
+ _filters: &[Expr],
+ _limit: Option<usize>,
+ ) -> DFResult<Arc<dyn ExecutionPlan>> {
+ let schemas = self
+ .table
+ .schema_manager()
+ .list_all()
+ .await
+ .map_err(to_datafusion_error)?;
+
+ let n = schemas.len();
+ let mut schema_ids: Vec<i64> = Vec::with_capacity(n);
+ let mut fields_json: Vec<String> = Vec::with_capacity(n);
+ let mut partition_keys_json: Vec<String> = Vec::with_capacity(n);
+ let mut primary_keys_json: Vec<String> = Vec::with_capacity(n);
+ let mut options_json: Vec<String> = Vec::with_capacity(n);
+ let mut comments: Vec<Option<String>> = Vec::with_capacity(n);
+ let mut update_times: Vec<i64> = Vec::with_capacity(n);
+
+ for schema in &schemas {
+ schema_ids.push(schema.id());
+ fields_json.push(to_json(schema.fields())?);
+ partition_keys_json.push(to_json(schema.partition_keys())?);
+ primary_keys_json.push(to_json(schema.primary_keys())?);
+ options_json.push(to_json(schema.options())?);
+ comments.push(schema.comment().map(str::to_string));
+ update_times.push(schema.time_millis());
+ }
+
+ let schema = schemas_schema();
+ let batch = RecordBatch::try_new(
+ schema.clone(),
+ vec![
+ Arc::new(Int64Array::from(schema_ids)),
+ Arc::new(StringArray::from(fields_json)),
+ Arc::new(StringArray::from(partition_keys_json)),
+ Arc::new(StringArray::from(primary_keys_json)),
+ Arc::new(StringArray::from(options_json)),
+ Arc::new(StringArray::from(comments)),
+ Arc::new(TimestampMillisecondArray::from(update_times)),
+ ],
+ )?;
+
+ Ok(MemorySourceConfig::try_new_exec(
+ &[vec![batch]],
+ schema,
+ projection.cloned(),
+ )?)
+ }
+}
+
+fn to_json<T: Serialize + ?Sized>(value: &T) -> DFResult<String> {
+ serde_json::to_string(value).map_err(|e| DataFusionError::External(Box::new(e)))
+}
diff --git a/crates/integrations/datafusion/tests/system_tables.rs b/crates/integrations/datafusion/tests/system_tables.rs
index c3292e3..188b000 100644
--- a/crates/integrations/datafusion/tests/system_tables.rs
+++ b/crates/integrations/datafusion/tests/system_tables.rs
@@ -15,11 +15,12 @@
// specific language governing permissions and limitations
// under the License.
-//! Paimon `$options` system table end-to-end via DataFusion SQL.
+//! Paimon system tables end-to-end via DataFusion SQL.
use std::sync::Arc;
-use datafusion::arrow::array::{Array, StringArray};
+use datafusion::arrow::array::{Array, Int64Array, StringArray};
+use datafusion::arrow::datatypes::{DataType, TimeUnit};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::prelude::SessionContext;
use paimon::catalog::Identifier;
@@ -129,6 +130,110 @@ async fn test_unknown_system_table_name_returns_not_found() {
);
}
+#[tokio::test]
+async fn test_schemas_system_table() {
+ let (ctx, catalog, _tmp) = create_context().await;
+ let sql = format!("SELECT * FROM paimon.default.{FIXTURE_TABLE}$schemas");
+ let batches = run_sql(&ctx, &sql).await;
+
+ assert!(!batches.is_empty(), "$schemas should return ≥1 batch");
+
+ let arrow_schema = batches[0].schema();
+ let expected_columns = [
+ ("schema_id", DataType::Int64),
+ ("fields", DataType::Utf8),
+ ("partition_keys", DataType::Utf8),
+ ("primary_keys", DataType::Utf8),
+ ("options", DataType::Utf8),
+ ("comment", DataType::Utf8),
+ (
+ "update_time",
+ DataType::Timestamp(TimeUnit::Millisecond, None),
+ ),
+ ];
+ for (i, (name, dtype)) in expected_columns.iter().enumerate() {
+ let field = arrow_schema.field(i);
+ assert_eq!(field.name(), name, "column {i} name");
+ assert_eq!(field.data_type(), dtype, "column {i} type");
+ }
+
+ let identifier = Identifier::new("default".to_string(), FIXTURE_TABLE.to_string());
+ let table = catalog
+ .get_table(&identifier)
+ .await
+ .expect("fixture table should load");
+ let all_schemas = table
+ .schema_manager()
+ .list_all()
+ .await
+ .expect("list_all should succeed");
+ let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+ assert_eq!(
+ total_rows,
+ all_schemas.len(),
+ "$schemas rows should match list_all() length"
+ );
+
+ let mut ids: Vec<i64> = Vec::new();
+ for batch in &batches {
+ let col = batch
+ .column(0)
+ .as_any()
+ .downcast_ref::<Int64Array>()
+ .expect("schema_id is Int64");
+ for i in 0..batch.num_rows() {
+ ids.push(col.value(i));
+ }
+ }
+ let mut sorted = ids.clone();
+ sorted.sort_unstable();
+ assert_eq!(ids, sorted, "schema_id column should be ascending");
+
+ // The last row's JSON columns must round-trip to the current schema.
+ let last_batch = batches.last().unwrap();
+ let last_idx = last_batch.num_rows() - 1;
+ let latest = table.schema();
+ let json_columns: [(usize, &str, String); 4] = [
+ (
+ 1,
+ "fields",
+ serde_json::to_string(latest.fields()).expect("serialize fields"),
+ ),
+ (
+ 2,
+ "partition_keys",
+ serde_json::to_string(latest.partition_keys()).expect("serialize partition_keys"),
+ ),
+ (
+ 3,
+ "primary_keys",
+ serde_json::to_string(latest.primary_keys()).expect("serialize primary_keys"),
+ ),
+ (
+ 4,
+ "options",
+ serde_json::to_string(latest.options()).expect("serialize options"),
+ ),
+ ];
+ for (col_idx, col_name, expected) in &json_columns {
+ let col = last_batch
+ .column(*col_idx)
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .unwrap_or_else(|| panic!("column {col_name} is not Utf8"));
+ // Parse both sides before comparing: `options` is a HashMap whose
+ // JSON key order is non-deterministic across `HashMap` instances.
+ let actual: serde_json::Value = serde_json::from_str(col.value(last_idx))
+ .unwrap_or_else(|e| panic!("parse actual `{col_name}`: {e}"));
+ let expected: serde_json::Value = serde_json::from_str(expected)
+ .unwrap_or_else(|e| panic!("parse expected `{col_name}`: {e}"));
+ assert_eq!(
+ actual, expected,
+ "latest-row `{col_name}` JSON should round-trip"
+ );
+ }
+}
+
#[tokio::test]
async fn test_missing_base_table_for_system_table_errors() {
let (ctx, _catalog, _tmp) = create_context().await;
diff --git a/crates/paimon/src/catalog/filesystem.rs b/crates/paimon/src/catalog/filesystem.rs
index a061e8a..e61324f 100644
--- a/crates/paimon/src/catalog/filesystem.rs
+++ b/crates/paimon/src/catalog/filesystem.rs
@@ -26,7 +26,7 @@ use crate::common::{CatalogOptions, Options};
use crate::error::{ConfigInvalidSnafu, Error, Result};
use crate::io::FileIO;
use crate::spec::{Schema, TableSchema};
-use crate::table::Table;
+use crate::table::{SchemaManager, Table};
use async_trait::async_trait;
use bytes::Bytes;
use opendal::raw::get_basename;
@@ -167,36 +167,8 @@ impl FileSystemCatalog {
/// Load the latest schema for a table (highest schema-{version} file under table_path/schema).
async fn load_latest_table_schema(&self, table_path: &str) -> Result<Option<TableSchema>> {
- let schema_dir = self.schema_dir_path(table_path);
- if !self.file_io.exists(&schema_dir).await? {
- return Ok(None);
- }
- let statuses = self.file_io.list_status(&schema_dir).await?;
-
- let latest_schema_id = statuses
- .into_iter()
- .filter(|s| !s.is_dir)
- .filter_map(|s| {
- get_basename(s.path.as_str())
- .strip_prefix(SCHEMA_PREFIX)?
- .parse::<i64>()
- .ok()
- })
- .max();
-
- if let Some(schema_id) = latest_schema_id {
- let schema_path = self.schema_file_path(table_path, schema_id);
- let input_file = self.file_io.new_input(&schema_path)?;
- let content = input_file.read().await?;
- let schema: TableSchema =
- serde_json::from_slice(&content).map_err(|e| Error::DataInvalid {
- message: format!("Failed to parse schema file: {schema_path}"),
- source: Some(Box::new(e)),
- })?;
- return Ok(Some(schema));
- }
-
- Ok(None)
+ let manager = SchemaManager::new(self.file_io.clone(), table_path.to_string());
+ Ok(manager.latest().await?.map(|arc| (*arc).clone()))
}
/// Save a table schema to a file.
diff --git a/crates/paimon/src/table/schema_manager.rs b/crates/paimon/src/table/schema_manager.rs
index 057dc3f..61e3b81 100644
--- a/crates/paimon/src/table/schema_manager.rs
+++ b/crates/paimon/src/table/schema_manager.rs
@@ -21,6 +21,8 @@
use crate::io::FileIO;
use crate::spec::TableSchema;
+use futures::future::try_join_all;
+use opendal::raw::get_basename;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
@@ -65,6 +67,48 @@ impl SchemaManager {
format!("{}/{}{}", self.schema_directory(), SCHEMA_PREFIX, schema_id)
}
+ /// List all schema ids sorted ascending. Returns an empty vector if the
+ /// schema directory is missing or empty.
+ ///
+ /// Mirrors Java [SchemaManager.listAllIds()](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java).
+ pub async fn list_all_ids(&self) -> crate::Result<Vec<i64>> {
+ let mut ids: Vec<i64> = self
+ .file_io
+ .list_status(&self.schema_directory())
+ .await?
+ .into_iter()
+ .filter(|s| !s.is_dir)
+ .filter_map(|s| {
+ get_basename(s.path.as_str())
+ .strip_prefix(SCHEMA_PREFIX)?
+ .parse::<i64>()
+ .ok()
+ })
+ .collect();
+ ids.sort_unstable();
+ Ok(ids)
+ }
+
+ /// List all schemas sorted by id ascending.
+ ///
+ /// Mirrors Java [SchemaManager.listAll()](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java).
+ pub async fn list_all(&self) -> crate::Result<Vec<Arc<TableSchema>>> {
+ let ids = self.list_all_ids().await?;
+ try_join_all(ids.into_iter().map(|id| self.schema(id))).await
+ }
+
+ /// Return the schema with the highest id, or `None` when no schema files
+ /// exist under the schema directory.
+ ///
+ /// Mirrors Java [SchemaManager.latest()](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java).
+ pub async fn latest(&self) -> crate::Result<Option<Arc<TableSchema>>> {
+ let ids = self.list_all_ids().await?;
+ match ids.last() {
+ Some(&max_id) => Ok(Some(self.schema(max_id).await?)),
+ None => Ok(None),
+ }
+ }
+
/// Load a schema by ID. Returns cached version if available.
///
/// The cache is shared across all clones of this `SchemaManager`, so loading
@@ -101,3 +145,114 @@ impl SchemaManager {
Ok(schema)
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::io::FileIOBuilder;
+ use crate::spec::Schema;
+ use bytes::Bytes;
+
+ fn memory_file_io() -> FileIO {
+ FileIOBuilder::new("memory").build().unwrap()
+ }
+
+ async fn write_schema_file(file_io: &FileIO, dir: &str, id: i64) {
+ let schema = Schema::builder().build().unwrap();
+ let table_schema = TableSchema::new(id, &schema);
+ let json = serde_json::to_vec(&table_schema).unwrap();
+ let path = format!("{dir}/{SCHEMA_PREFIX}{id}");
+ let out = file_io.new_output(&path).unwrap();
+ out.write(Bytes::from(json)).await.unwrap();
+ }
+
+ #[tokio::test]
+ async fn list_all_ids_returns_empty_for_missing_directory() {
+ let file_io = memory_file_io();
+ let sm = SchemaManager::new(file_io, "memory:/list_missing".to_string());
+ assert!(sm.list_all_ids().await.unwrap().is_empty());
+ }
+
+ #[tokio::test]
+ async fn list_all_ids_returns_empty_for_empty_directory() {
+ let file_io = memory_file_io();
+ let table_path = "memory:/list_empty";
+ let dir = format!("{table_path}/{SCHEMA_DIR}");
+ file_io.mkdirs(&dir).await.unwrap();
+
+ let sm = SchemaManager::new(file_io, table_path.to_string());
+ assert!(sm.list_all_ids().await.unwrap().is_empty());
+ }
+
+ #[tokio::test]
+ async fn list_all_ids_sorts_ascending() {
+ let file_io = memory_file_io();
+ let table_path = "memory:/list_sorted";
+ let dir = format!("{table_path}/{SCHEMA_DIR}");
+ file_io.mkdirs(&dir).await.unwrap();
+ for id in [3, 0, 2, 1] {
+ write_schema_file(&file_io, &dir, id).await;
+ }
+
+ let sm = SchemaManager::new(file_io, table_path.to_string());
+ assert_eq!(sm.list_all_ids().await.unwrap(), vec![0, 1, 2, 3]);
+ }
+
+ #[tokio::test]
+ async fn list_all_ids_ignores_unrelated_files() {
+ let file_io = memory_file_io();
+ let table_path = "memory:/list_filter";
+ let dir = format!("{table_path}/{SCHEMA_DIR}");
+ file_io.mkdirs(&dir).await.unwrap();
+ write_schema_file(&file_io, &dir, 0).await;
+ // `schema-foo` starts with the prefix but is not an i64.
+ let junk = file_io
+ .new_output(&format!("{dir}/{SCHEMA_PREFIX}foo"))
+ .unwrap();
+ junk.write(Bytes::from("{}")).await.unwrap();
+ // A completely unrelated file.
+ let other = file_io.new_output(&format!("{dir}/README")).unwrap();
+ other.write(Bytes::from("hi")).await.unwrap();
+
+ let sm = SchemaManager::new(file_io, table_path.to_string());
+ assert_eq!(sm.list_all_ids().await.unwrap(), vec![0]);
+ }
+
+ #[tokio::test]
+ async fn list_all_loads_schemas_in_order() {
+ let file_io = memory_file_io();
+ let table_path = "memory:/list_all_load";
+ let dir = format!("{table_path}/{SCHEMA_DIR}");
+ file_io.mkdirs(&dir).await.unwrap();
+ for id in [0, 2, 1] {
+ write_schema_file(&file_io, &dir, id).await;
+ }
+
+ let sm = SchemaManager::new(file_io, table_path.to_string());
+ let schemas = sm.list_all().await.unwrap();
+ let ids: Vec<i64> = schemas.iter().map(|s| s.id()).collect();
+ assert_eq!(ids, vec![0, 1, 2]);
+ }
+
+ #[tokio::test]
+ async fn latest_returns_none_when_no_schemas() {
+ let file_io = memory_file_io();
+ let sm = SchemaManager::new(file_io, "memory:/latest_none".to_string());
+ assert!(sm.latest().await.unwrap().is_none());
+ }
+
+ #[tokio::test]
+ async fn latest_returns_max_id_schema() {
+ let file_io = memory_file_io();
+ let table_path = "memory:/latest_max";
+ let dir = format!("{table_path}/{SCHEMA_DIR}");
+ file_io.mkdirs(&dir).await.unwrap();
+ for id in [0, 5, 2] {
+ write_schema_file(&file_io, &dir, id).await;
+ }
+
+ let sm = SchemaManager::new(file_io, table_path.to_string());
+ let latest = sm.latest().await.unwrap().expect("latest");
+ assert_eq!(latest.id(), 5);
+ }
+}