This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 0a89f75  feat: Add $schemas system table (#245)
0a89f75 is described below

commit 0a89f75b69cbe25c8fa7957bb9c1b772121850ee
Author: Jiajia Li <[email protected]>
AuthorDate: Thu Apr 16 10:14:02 2026 +0800

    feat: Add $schemas system table (#245)
---
 crates/integrations/datafusion/Cargo.toml          |   4 +-
 .../datafusion/src/system_tables/mod.rs            |   6 +-
 .../datafusion/src/system_tables/schemas.rs        | 138 ++++++++++++++++++
 .../integrations/datafusion/tests/system_tables.rs | 109 ++++++++++++++-
 crates/paimon/src/catalog/filesystem.rs            |  34 +----
 crates/paimon/src/table/schema_manager.rs          | 155 +++++++++++++++++++++
 6 files changed, 410 insertions(+), 36 deletions(-)

diff --git a/crates/integrations/datafusion/Cargo.toml b/crates/integrations/datafusion/Cargo.toml
index ddfa2c0..eb8c644 100644
--- a/crates/integrations/datafusion/Cargo.toml
+++ b/crates/integrations/datafusion/Cargo.toml
@@ -36,6 +36,8 @@ chrono = "0.4"
 datafusion = { workspace = true }
 paimon = { path = "../../paimon" }
 futures = "0.3"
+serde = { version = "1", features = ["derive"] }
+serde_json = "1"
 tokio = { workspace = true, features = ["rt", "time", "fs"] }
 
 [dev-dependencies]
@@ -43,8 +45,6 @@ arrow-array = { workspace = true }
 arrow-schema = { workspace = true }
 flate2 = "1"
 parquet = { workspace = true }
-serde = "1"
-serde_json = "1"
 tar = "0.4"
 tempfile = "3"
 tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
diff --git a/crates/integrations/datafusion/src/system_tables/mod.rs b/crates/integrations/datafusion/src/system_tables/mod.rs
index fddb3b9..65a2176 100644
--- a/crates/integrations/datafusion/src/system_tables/mod.rs
+++ b/crates/integrations/datafusion/src/system_tables/mod.rs
@@ -30,10 +30,11 @@ use paimon::table::Table;
 use crate::error::to_datafusion_error;
 
 mod options;
+mod schemas;
 
 type Builder = fn(Table) -> DFResult<Arc<dyn TableProvider>>;
 
-const TABLES: &[(&str, Builder)] = &[("options", options::build)];
+const TABLES: &[(&str, Builder)] = &[("options", options::build), ("schemas", schemas::build)];
 
 /// Parse a Paimon object name into `(base_table, optional system_table_name)`.
 ///
@@ -117,6 +118,9 @@ mod tests {
         assert!(is_registered("options"));
         assert!(is_registered("Options"));
         assert!(is_registered("OPTIONS"));
+        assert!(is_registered("schemas"));
+        assert!(is_registered("Schemas"));
+        assert!(is_registered("SCHEMAS"));
         assert!(!is_registered("nonsense"));
     }
 
diff --git a/crates/integrations/datafusion/src/system_tables/schemas.rs b/crates/integrations/datafusion/src/system_tables/schemas.rs
new file mode 100644
index 0000000..03da793
--- /dev/null
+++ b/crates/integrations/datafusion/src/system_tables/schemas.rs
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Mirrors Java [SchemasTable](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java).
+
+use std::any::Any;
+use std::sync::{Arc, OnceLock};
+
+use async_trait::async_trait;
+use datafusion::arrow::array::{Int64Array, RecordBatch, StringArray, TimestampMillisecondArray};
+use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
+use datafusion::catalog::Session;
+use datafusion::datasource::memory::MemorySourceConfig;
+use datafusion::datasource::{TableProvider, TableType};
+use datafusion::error::{DataFusionError, Result as DFResult};
+use datafusion::logical_expr::Expr;
+use datafusion::physical_plan::ExecutionPlan;
+use paimon::table::Table;
+use serde::Serialize;
+
+use crate::error::to_datafusion_error;
+
+pub(super) fn build(table: Table) -> DFResult<Arc<dyn TableProvider>> {
+    Ok(Arc::new(SchemasTable { table }))
+}
+
+fn schemas_schema() -> SchemaRef {
+    static SCHEMA: OnceLock<SchemaRef> = OnceLock::new();
+    SCHEMA
+        .get_or_init(|| {
+            Arc::new(Schema::new(vec![
+                Field::new("schema_id", DataType::Int64, false),
+                Field::new("fields", DataType::Utf8, false),
+                Field::new("partition_keys", DataType::Utf8, false),
+                Field::new("primary_keys", DataType::Utf8, false),
+                Field::new("options", DataType::Utf8, false),
+                Field::new("comment", DataType::Utf8, true),
+                Field::new(
+                    "update_time",
+                    DataType::Timestamp(TimeUnit::Millisecond, None),
+                    false,
+                ),
+            ]))
+        })
+        .clone()
+}
+
+#[derive(Debug)]
+struct SchemasTable {
+    table: Table,
+}
+
+#[async_trait]
+impl TableProvider for SchemasTable {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        schemas_schema()
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::View
+    }
+
+    async fn scan(
+        &self,
+        _state: &dyn Session,
+        projection: Option<&Vec<usize>>,
+        _filters: &[Expr],
+        _limit: Option<usize>,
+    ) -> DFResult<Arc<dyn ExecutionPlan>> {
+        let schemas = self
+            .table
+            .schema_manager()
+            .list_all()
+            .await
+            .map_err(to_datafusion_error)?;
+
+        let n = schemas.len();
+        let mut schema_ids: Vec<i64> = Vec::with_capacity(n);
+        let mut fields_json: Vec<String> = Vec::with_capacity(n);
+        let mut partition_keys_json: Vec<String> = Vec::with_capacity(n);
+        let mut primary_keys_json: Vec<String> = Vec::with_capacity(n);
+        let mut options_json: Vec<String> = Vec::with_capacity(n);
+        let mut comments: Vec<Option<String>> = Vec::with_capacity(n);
+        let mut update_times: Vec<i64> = Vec::with_capacity(n);
+
+        for schema in &schemas {
+            schema_ids.push(schema.id());
+            fields_json.push(to_json(schema.fields())?);
+            partition_keys_json.push(to_json(schema.partition_keys())?);
+            primary_keys_json.push(to_json(schema.primary_keys())?);
+            options_json.push(to_json(schema.options())?);
+            comments.push(schema.comment().map(str::to_string));
+            update_times.push(schema.time_millis());
+        }
+
+        let schema = schemas_schema();
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(Int64Array::from(schema_ids)),
+                Arc::new(StringArray::from(fields_json)),
+                Arc::new(StringArray::from(partition_keys_json)),
+                Arc::new(StringArray::from(primary_keys_json)),
+                Arc::new(StringArray::from(options_json)),
+                Arc::new(StringArray::from(comments)),
+                Arc::new(TimestampMillisecondArray::from(update_times)),
+            ],
+        )?;
+
+        Ok(MemorySourceConfig::try_new_exec(
+            &[vec![batch]],
+            schema,
+            projection.cloned(),
+        )?)
+    }
+}
+
+fn to_json<T: Serialize + ?Sized>(value: &T) -> DFResult<String> {
+    serde_json::to_string(value).map_err(|e| DataFusionError::External(Box::new(e)))
+}
diff --git a/crates/integrations/datafusion/tests/system_tables.rs b/crates/integrations/datafusion/tests/system_tables.rs
index c3292e3..188b000 100644
--- a/crates/integrations/datafusion/tests/system_tables.rs
+++ b/crates/integrations/datafusion/tests/system_tables.rs
@@ -15,11 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Paimon `$options` system table end-to-end via DataFusion SQL.
+//! Paimon system tables end-to-end via DataFusion SQL.
 
 use std::sync::Arc;
 
-use datafusion::arrow::array::{Array, StringArray};
+use datafusion::arrow::array::{Array, Int64Array, StringArray};
+use datafusion::arrow::datatypes::{DataType, TimeUnit};
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::prelude::SessionContext;
 use paimon::catalog::Identifier;
@@ -129,6 +130,110 @@ async fn test_unknown_system_table_name_returns_not_found() {
     );
 }
 
+#[tokio::test]
+async fn test_schemas_system_table() {
+    let (ctx, catalog, _tmp) = create_context().await;
+    let sql = format!("SELECT * FROM paimon.default.{FIXTURE_TABLE}$schemas");
+    let batches = run_sql(&ctx, &sql).await;
+
+    assert!(!batches.is_empty(), "$schemas should return ≥1 batch");
+
+    let arrow_schema = batches[0].schema();
+    let expected_columns = [
+        ("schema_id", DataType::Int64),
+        ("fields", DataType::Utf8),
+        ("partition_keys", DataType::Utf8),
+        ("primary_keys", DataType::Utf8),
+        ("options", DataType::Utf8),
+        ("comment", DataType::Utf8),
+        (
+            "update_time",
+            DataType::Timestamp(TimeUnit::Millisecond, None),
+        ),
+    ];
+    for (i, (name, dtype)) in expected_columns.iter().enumerate() {
+        let field = arrow_schema.field(i);
+        assert_eq!(field.name(), name, "column {i} name");
+        assert_eq!(field.data_type(), dtype, "column {i} type");
+    }
+
+    let identifier = Identifier::new("default".to_string(), FIXTURE_TABLE.to_string());
+    let table = catalog
+        .get_table(&identifier)
+        .await
+        .expect("fixture table should load");
+    let all_schemas = table
+        .schema_manager()
+        .list_all()
+        .await
+        .expect("list_all should succeed");
+    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+    assert_eq!(
+        total_rows,
+        all_schemas.len(),
+        "$schemas rows should match list_all() length"
+    );
+
+    let mut ids: Vec<i64> = Vec::new();
+    for batch in &batches {
+        let col = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .expect("schema_id is Int64");
+        for i in 0..batch.num_rows() {
+            ids.push(col.value(i));
+        }
+    }
+    let mut sorted = ids.clone();
+    sorted.sort_unstable();
+    assert_eq!(ids, sorted, "schema_id column should be ascending");
+
+    // The last row's JSON columns must round-trip to the current schema.
+    let last_batch = batches.last().unwrap();
+    let last_idx = last_batch.num_rows() - 1;
+    let latest = table.schema();
+    let json_columns: [(usize, &str, String); 4] = [
+        (
+            1,
+            "fields",
+            serde_json::to_string(latest.fields()).expect("serialize fields"),
+        ),
+        (
+            2,
+            "partition_keys",
+            serde_json::to_string(latest.partition_keys()).expect("serialize partition_keys"),
+        ),
+        (
+            3,
+            "primary_keys",
+            serde_json::to_string(latest.primary_keys()).expect("serialize primary_keys"),
+        ),
+        (
+            4,
+            "options",
+            serde_json::to_string(latest.options()).expect("serialize options"),
+        ),
+    ];
+    for (col_idx, col_name, expected) in &json_columns {
+        let col = last_batch
+            .column(*col_idx)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap_or_else(|| panic!("column {col_name} is not Utf8"));
+        // Parse both sides before comparing: `options` is a HashMap whose
+        // JSON key order is non-deterministic across `HashMap` instances.
+        let actual: serde_json::Value = serde_json::from_str(col.value(last_idx))
+            .unwrap_or_else(|e| panic!("parse actual `{col_name}`: {e}"));
+        let expected: serde_json::Value = serde_json::from_str(expected)
+            .unwrap_or_else(|e| panic!("parse expected `{col_name}`: {e}"));
+        assert_eq!(
+            actual, expected,
+            "latest-row `{col_name}` JSON should round-trip"
+        );
+    }
+}
+
 #[tokio::test]
 async fn test_missing_base_table_for_system_table_errors() {
     let (ctx, _catalog, _tmp) = create_context().await;
diff --git a/crates/paimon/src/catalog/filesystem.rs b/crates/paimon/src/catalog/filesystem.rs
index a061e8a..e61324f 100644
--- a/crates/paimon/src/catalog/filesystem.rs
+++ b/crates/paimon/src/catalog/filesystem.rs
@@ -26,7 +26,7 @@ use crate::common::{CatalogOptions, Options};
 use crate::error::{ConfigInvalidSnafu, Error, Result};
 use crate::io::FileIO;
 use crate::spec::{Schema, TableSchema};
-use crate::table::Table;
+use crate::table::{SchemaManager, Table};
 use async_trait::async_trait;
 use bytes::Bytes;
 use opendal::raw::get_basename;
@@ -167,36 +167,8 @@ impl FileSystemCatalog {
 
     /// Load the latest schema for a table (highest schema-{version} file under table_path/schema).
     async fn load_latest_table_schema(&self, table_path: &str) -> Result<Option<TableSchema>> {
-        let schema_dir = self.schema_dir_path(table_path);
-        if !self.file_io.exists(&schema_dir).await? {
-            return Ok(None);
-        }
-        let statuses = self.file_io.list_status(&schema_dir).await?;
-
-        let latest_schema_id = statuses
-            .into_iter()
-            .filter(|s| !s.is_dir)
-            .filter_map(|s| {
-                get_basename(s.path.as_str())
-                    .strip_prefix(SCHEMA_PREFIX)?
-                    .parse::<i64>()
-                    .ok()
-            })
-            .max();
-
-        if let Some(schema_id) = latest_schema_id {
-            let schema_path = self.schema_file_path(table_path, schema_id);
-            let input_file = self.file_io.new_input(&schema_path)?;
-            let content = input_file.read().await?;
-            let schema: TableSchema =
-                serde_json::from_slice(&content).map_err(|e| Error::DataInvalid {
-                    message: format!("Failed to parse schema file: {schema_path}"),
-                    source: Some(Box::new(e)),
-                })?;
-            return Ok(Some(schema));
-        }
-
-        Ok(None)
+        let manager = SchemaManager::new(self.file_io.clone(), table_path.to_string());
+        Ok(manager.latest().await?.map(|arc| (*arc).clone()))
     }
 
     /// Save a table schema to a file.
diff --git a/crates/paimon/src/table/schema_manager.rs b/crates/paimon/src/table/schema_manager.rs
index 057dc3f..61e3b81 100644
--- a/crates/paimon/src/table/schema_manager.rs
+++ b/crates/paimon/src/table/schema_manager.rs
@@ -21,6 +21,8 @@
 
 use crate::io::FileIO;
 use crate::spec::TableSchema;
+use futures::future::try_join_all;
+use opendal::raw::get_basename;
 use std::collections::HashMap;
 use std::sync::{Arc, Mutex};
 
@@ -65,6 +67,48 @@ impl SchemaManager {
         format!("{}/{}{}", self.schema_directory(), SCHEMA_PREFIX, schema_id)
     }
 
+    /// List all schema ids sorted ascending. Returns an empty vector if the
+    /// schema directory is missing or empty.
+    ///
+    /// Mirrors Java [SchemaManager.listAllIds()](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java).
+    pub async fn list_all_ids(&self) -> crate::Result<Vec<i64>> {
+        let mut ids: Vec<i64> = self
+            .file_io
+            .list_status(&self.schema_directory())
+            .await?
+            .into_iter()
+            .filter(|s| !s.is_dir)
+            .filter_map(|s| {
+                get_basename(s.path.as_str())
+                    .strip_prefix(SCHEMA_PREFIX)?
+                    .parse::<i64>()
+                    .ok()
+            })
+            .collect();
+        ids.sort_unstable();
+        Ok(ids)
+    }
+
+    /// List all schemas sorted by id ascending.
+    ///
+    /// Mirrors Java [SchemaManager.listAll()](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java).
+    pub async fn list_all(&self) -> crate::Result<Vec<Arc<TableSchema>>> {
+        let ids = self.list_all_ids().await?;
+        try_join_all(ids.into_iter().map(|id| self.schema(id))).await
+    }
+
+    /// Return the schema with the highest id, or `None` when no schema files
+    /// exist under the schema directory.
+    ///
+    /// Mirrors Java [SchemaManager.latest()](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java).
+    pub async fn latest(&self) -> crate::Result<Option<Arc<TableSchema>>> {
+        let ids = self.list_all_ids().await?;
+        match ids.last() {
+            Some(&max_id) => Ok(Some(self.schema(max_id).await?)),
+            None => Ok(None),
+        }
+    }
+
     /// Load a schema by ID. Returns cached version if available.
     ///
     /// The cache is shared across all clones of this `SchemaManager`, so 
loading
@@ -101,3 +145,114 @@ impl SchemaManager {
         Ok(schema)
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::io::FileIOBuilder;
+    use crate::spec::Schema;
+    use bytes::Bytes;
+
+    fn memory_file_io() -> FileIO {
+        FileIOBuilder::new("memory").build().unwrap()
+    }
+
+    async fn write_schema_file(file_io: &FileIO, dir: &str, id: i64) {
+        let schema = Schema::builder().build().unwrap();
+        let table_schema = TableSchema::new(id, &schema);
+        let json = serde_json::to_vec(&table_schema).unwrap();
+        let path = format!("{dir}/{SCHEMA_PREFIX}{id}");
+        let out = file_io.new_output(&path).unwrap();
+        out.write(Bytes::from(json)).await.unwrap();
+    }
+
+    #[tokio::test]
+    async fn list_all_ids_returns_empty_for_missing_directory() {
+        let file_io = memory_file_io();
+        let sm = SchemaManager::new(file_io, "memory:/list_missing".to_string());
+        assert!(sm.list_all_ids().await.unwrap().is_empty());
+    }
+
+    #[tokio::test]
+    async fn list_all_ids_returns_empty_for_empty_directory() {
+        let file_io = memory_file_io();
+        let table_path = "memory:/list_empty";
+        let dir = format!("{table_path}/{SCHEMA_DIR}");
+        file_io.mkdirs(&dir).await.unwrap();
+
+        let sm = SchemaManager::new(file_io, table_path.to_string());
+        assert!(sm.list_all_ids().await.unwrap().is_empty());
+    }
+
+    #[tokio::test]
+    async fn list_all_ids_sorts_ascending() {
+        let file_io = memory_file_io();
+        let table_path = "memory:/list_sorted";
+        let dir = format!("{table_path}/{SCHEMA_DIR}");
+        file_io.mkdirs(&dir).await.unwrap();
+        for id in [3, 0, 2, 1] {
+            write_schema_file(&file_io, &dir, id).await;
+        }
+
+        let sm = SchemaManager::new(file_io, table_path.to_string());
+        assert_eq!(sm.list_all_ids().await.unwrap(), vec![0, 1, 2, 3]);
+    }
+
+    #[tokio::test]
+    async fn list_all_ids_ignores_unrelated_files() {
+        let file_io = memory_file_io();
+        let table_path = "memory:/list_filter";
+        let dir = format!("{table_path}/{SCHEMA_DIR}");
+        file_io.mkdirs(&dir).await.unwrap();
+        write_schema_file(&file_io, &dir, 0).await;
+        // `schema-foo` starts with the prefix but is not an i64.
+        let junk = file_io
+            .new_output(&format!("{dir}/{SCHEMA_PREFIX}foo"))
+            .unwrap();
+        junk.write(Bytes::from("{}")).await.unwrap();
+        // A completely unrelated file.
+        let other = file_io.new_output(&format!("{dir}/README")).unwrap();
+        other.write(Bytes::from("hi")).await.unwrap();
+
+        let sm = SchemaManager::new(file_io, table_path.to_string());
+        assert_eq!(sm.list_all_ids().await.unwrap(), vec![0]);
+    }
+
+    #[tokio::test]
+    async fn list_all_loads_schemas_in_order() {
+        let file_io = memory_file_io();
+        let table_path = "memory:/list_all_load";
+        let dir = format!("{table_path}/{SCHEMA_DIR}");
+        file_io.mkdirs(&dir).await.unwrap();
+        for id in [0, 2, 1] {
+            write_schema_file(&file_io, &dir, id).await;
+        }
+
+        let sm = SchemaManager::new(file_io, table_path.to_string());
+        let schemas = sm.list_all().await.unwrap();
+        let ids: Vec<i64> = schemas.iter().map(|s| s.id()).collect();
+        assert_eq!(ids, vec![0, 1, 2]);
+    }
+
+    #[tokio::test]
+    async fn latest_returns_none_when_no_schemas() {
+        let file_io = memory_file_io();
+        let sm = SchemaManager::new(file_io, "memory:/latest_none".to_string());
+        assert!(sm.latest().await.unwrap().is_none());
+    }
+
+    #[tokio::test]
+    async fn latest_returns_max_id_schema() {
+        let file_io = memory_file_io();
+        let table_path = "memory:/latest_max";
+        let dir = format!("{table_path}/{SCHEMA_DIR}");
+        file_io.mkdirs(&dir).await.unwrap();
+        for id in [0, 5, 2] {
+            write_schema_file(&file_io, &dir, id).await;
+        }
+
+        let sm = SchemaManager::new(file_io, table_path.to_string());
+        let latest = sm.latest().await.unwrap().expect("latest");
+        assert_eq!(latest.id(), 5);
+    }
+}

Reply via email to