This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 15956bb feat: implement RESTCatalog with database and table CRUD
(#160)
15956bb is described below
commit 15956bbd56be7c0d72f04a99fce7abb54a0887eb
Author: umi <[email protected]>
AuthorDate: Wed Apr 1 21:24:00 2026 +0800
feat: implement RESTCatalog with database and table CRUD (#160)
---
crates/integration_tests/Cargo.toml | 4 +
crates/integration_tests/tests/read_tables.rs | 164 ++++++--
crates/paimon/Cargo.toml | 4 +-
crates/paimon/examples/rest_catalog_example.rs | 302 ++++++++++++++
crates/paimon/src/api/api_response.rs | 11 +
crates/paimon/src/api/auth/base.rs | 9 +-
crates/paimon/src/api/auth/bearer_provider.rs | 6 +-
crates/paimon/src/api/auth/dlf_provider.rs | 16 +-
crates/paimon/src/api/auth/factory.rs | 4 +-
crates/paimon/src/api/mod.rs | 2 +-
crates/paimon/src/api/rest_api.rs | 64 +--
crates/paimon/src/api/rest_client.rs | 10 +-
crates/paimon/src/api/rest_util.rs | 26 ++
.../paimon/src/{io/mod.rs => catalog/database.rs} | 44 +-
crates/paimon/src/catalog/filesystem.rs | 12 +-
crates/paimon/src/catalog/mod.rs | 10 +
crates/paimon/src/{io => catalog/rest}/mod.rs | 33 +-
crates/paimon/src/catalog/rest/rest_catalog.rs | 386 ++++++++++++++++++
.../src/{io/mod.rs => catalog/rest/rest_token.rs} | 41 +-
.../paimon/src/catalog/rest/rest_token_file_io.rs | 193 +++++++++
crates/paimon/src/common/options.rs | 3 +
crates/paimon/src/io/file_io.rs | 42 +-
crates/paimon/src/io/mod.rs | 2 +-
crates/paimon/src/io/storage_oss.rs | 19 +-
crates/paimon/tests/mock_server.rs | 46 ++-
crates/paimon/tests/rest_api_test.rs | 30 +-
crates/paimon/tests/rest_catalog_test.rs | 453 +++++++++++++++++++++
27 files changed, 1741 insertions(+), 195 deletions(-)
diff --git a/crates/integration_tests/Cargo.toml
b/crates/integration_tests/Cargo.toml
index a4753bf..092ad94 100644
--- a/crates/integration_tests/Cargo.toml
+++ b/crates/integration_tests/Cargo.toml
@@ -28,3 +28,7 @@ paimon = { path = "../paimon" }
arrow-array = { workspace = true }
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
futures = "0.3"
+
+[dev-dependencies]
+serde_json = "1"
+axum = { version = "0.7", features = ["macros", "tokio", "http1", "http2"] }
\ No newline at end of file
diff --git a/crates/integration_tests/tests/read_tables.rs
b/crates/integration_tests/tests/read_tables.rs
index 568400d..1b0032a 100644
--- a/crates/integration_tests/tests/read_tables.rs
+++ b/crates/integration_tests/tests/read_tables.rs
@@ -19,23 +19,27 @@
use arrow_array::{Int32Array, RecordBatch, StringArray};
use futures::TryStreamExt;
-use paimon::catalog::Identifier;
+use paimon::api::ConfigResponse;
+use paimon::catalog::{Identifier, RESTCatalog};
+use paimon::common::Options;
+use paimon::spec::{DataType, IntType, Schema, VarCharType};
use paimon::{Catalog, Error, FileSystemCatalog, Plan};
-use std::collections::HashSet;
+use std::collections::{HashMap, HashSet};
+
+#[path = "../../paimon/tests/mock_server.rs"]
+mod mock_server;
+use mock_server::start_mock_server;
fn get_test_warehouse() -> String {
std::env::var("PAIMON_TEST_WAREHOUSE").unwrap_or_else(|_|
"/tmp/paimon-warehouse".to_string())
}
-async fn scan_and_read(table_name: &str) -> (Plan, Vec<RecordBatch>) {
- scan_and_read_with_projection(table_name, None).await
-}
-
-async fn scan_and_read_with_projection(
+async fn scan_and_read<C: Catalog + ?Sized>(
+ catalog: &C,
table_name: &str,
projection: Option<&[&str]>,
) -> (Plan, Vec<RecordBatch>) {
- let table = get_test_table(table_name).await;
+ let table = get_table_from_catalog(catalog, table_name).await;
let mut read_builder = table.new_read_builder();
if let Some(cols) = projection {
@@ -60,6 +64,30 @@ async fn scan_and_read_with_projection(
(plan, batches)
}
+async fn get_table_from_catalog<C: Catalog + ?Sized>(
+ catalog: &C,
+ table_name: &str,
+) -> paimon::Table {
+ let identifier = Identifier::new("default", table_name);
+ catalog
+ .get_table(&identifier)
+ .await
+ .expect("Failed to get table")
+}
+
+fn create_file_system_catalog() -> FileSystemCatalog {
+ let warehouse = get_test_warehouse();
+ FileSystemCatalog::new(warehouse).expect("Failed to create
FileSystemCatalog")
+}
+
+async fn scan_and_read_with_fs_catalog(
+ table_name: &str,
+ projection: Option<&[&str]>,
+) -> (Plan, Vec<RecordBatch>) {
+ let catalog = create_file_system_catalog();
+ scan_and_read(&catalog, table_name, projection).await
+}
+
fn extract_id_name(batches: &[RecordBatch]) -> Vec<(i32, String)> {
let mut rows = Vec::new();
for batch in batches {
@@ -81,7 +109,7 @@ fn extract_id_name(batches: &[RecordBatch]) -> Vec<(i32,
String)> {
#[tokio::test]
async fn test_read_log_table() {
- let (plan, batches) = scan_and_read("simple_log_table").await;
+ let (plan, batches) = scan_and_read_with_fs_catalog("simple_log_table",
None).await;
// Non-partitioned table: partition should be a valid arity=0 BinaryRow
// deserialized from manifest bytes, not a stub without backing data.
@@ -105,7 +133,7 @@ async fn test_read_log_table() {
#[tokio::test]
async fn test_read_dv_primary_key_table() {
- let (_, batches) = scan_and_read("simple_dv_pk_table").await;
+ let (_, batches) = scan_and_read_with_fs_catalog("simple_dv_pk_table",
None).await;
let actual = extract_id_name(&batches);
let expected = vec![
(1, "alice-v2".to_string()),
@@ -123,7 +151,7 @@ async fn test_read_dv_primary_key_table() {
#[tokio::test]
async fn test_read_partitioned_log_table() {
- let (plan, batches) = scan_and_read("partitioned_log_table").await;
+ let (plan, batches) =
scan_and_read_with_fs_catalog("partitioned_log_table", None).await;
let mut seen_partitions: HashSet<String> = HashSet::new();
for split in plan.splits() {
@@ -176,7 +204,7 @@ async fn test_read_partitioned_log_table() {
#[tokio::test]
async fn test_read_multi_partitioned_log_table() {
- let (plan, batches) = scan_and_read("multi_partitioned_log_table").await;
+ let (plan, batches) =
scan_and_read_with_fs_catalog("multi_partitioned_log_table", None).await;
let mut seen_partitions: HashSet<(String, i32)> = HashSet::new();
for split in plan.splits() {
@@ -244,7 +272,7 @@ async fn test_read_multi_partitioned_log_table() {
#[tokio::test]
async fn test_read_partitioned_dv_pk_table() {
- let (plan, batches) = scan_and_read("partitioned_dv_pk_table").await;
+ let (plan, batches) =
scan_and_read_with_fs_catalog("partitioned_dv_pk_table", None).await;
// Verify partition metadata on each split.
let mut seen_partitions: HashSet<String> = HashSet::new();
@@ -298,20 +326,10 @@ async fn test_read_partitioned_dv_pk_table() {
);
}
-async fn get_test_table(table_name: &str) -> paimon::Table {
- let warehouse = get_test_warehouse();
- let catalog = FileSystemCatalog::new(warehouse).expect("Failed to create
catalog");
- let identifier = Identifier::new("default", table_name);
- catalog
- .get_table(&identifier)
- .await
- .expect("Failed to get table")
-}
-
#[tokio::test]
async fn test_read_with_column_projection() {
let (_, batches) =
- scan_and_read_with_projection("partitioned_log_table", Some(&["name",
"id"])).await;
+ scan_and_read_with_fs_catalog("partitioned_log_table", Some(&["name",
"id"])).await;
// Verify that output schema preserves caller-specified column order.
for batch in &batches {
@@ -340,7 +358,8 @@ async fn test_read_with_column_projection() {
#[tokio::test]
async fn test_read_projection_empty() {
- let table = get_test_table("simple_log_table").await;
+ let catalog = create_file_system_catalog();
+ let table = get_table_from_catalog(&catalog, "simple_log_table").await;
let mut read_builder = table.new_read_builder();
read_builder.with_projection(&[]);
@@ -378,10 +397,10 @@ async fn test_read_projection_empty() {
);
}
}
-
#[tokio::test]
async fn test_read_projection_unknown_column() {
- let table = get_test_table("simple_log_table").await;
+ let catalog = create_file_system_catalog();
+ let table = get_table_from_catalog(&catalog, "simple_log_table").await;
let mut read_builder = table.new_read_builder();
read_builder.with_projection(&["id", "nonexistent_column"]);
@@ -403,7 +422,8 @@ async fn test_read_projection_unknown_column() {
#[tokio::test]
async fn test_read_projection_all_invalid() {
- let table = get_test_table("simple_log_table").await;
+ let catalog = create_file_system_catalog();
+ let table = get_table_from_catalog(&catalog, "simple_log_table").await;
let mut read_builder = table.new_read_builder();
read_builder.with_projection(&["nonexistent_a", "nonexistent_b"]);
@@ -425,7 +445,8 @@ async fn test_read_projection_all_invalid() {
#[tokio::test]
async fn test_read_projection_duplicate_column() {
- let table = get_test_table("simple_log_table").await;
+ let catalog = create_file_system_catalog();
+ let table = get_table_from_catalog(&catalog, "simple_log_table").await;
let mut read_builder = table.new_read_builder();
read_builder.with_projection(&["id", "id"]);
@@ -438,3 +459,88 @@ async fn test_read_projection_duplicate_column() {
"Expected ConfigInvalid for duplicate projection, got: {err:?}"
);
}
+
+// ======================= REST Catalog read tests
===============================
+
+/// Build a simple test schema matching the Spark-provisioned tables (id INT,
name VARCHAR).
+fn simple_log_schema() -> Schema {
+ Schema::builder()
+ .column("id", DataType::Int(IntType::new()))
+ .column("name", DataType::VarChar(VarCharType::string_type()))
+ .build()
+ .expect("Failed to build schema")
+}
+
+/// Start a mock REST server backed by Spark-provisioned data on disk,
+/// register the given tables, and return a connected `RESTCatalog`.
+async fn setup_rest_catalog_with_tables(
+ table_configs: &[(&str, &str, Schema)],
+) -> (mock_server::RESTServer, RESTCatalog) {
+ let catalog_path = get_test_warehouse();
+ // Use a simple warehouse name (no slashes) to avoid URL-encoding issues
+ let warehouse_name = "test_warehouse";
+ let prefix = "mock-test";
+ let mut defaults = HashMap::new();
+ defaults.insert("prefix".to_string(), prefix.to_string());
+ let config = ConfigResponse::new(defaults);
+
+ let server = start_mock_server(
+ warehouse_name.to_string(),
+ catalog_path.clone(),
+ config,
+ vec!["default".to_string()],
+ )
+ .await;
+
+ // Register each table with its schema and the real on-disk path
+ for (database, table_name, schema) in table_configs {
+ let table_path = format!("{}/{}.db/{}", catalog_path, database,
table_name);
+ server.add_table_with_schema(database, table_name, schema.clone(),
&table_path);
+ }
+
+ let url = server.url().expect("Failed to get server URL");
+ let mut options = Options::new();
+ options.set("uri", &url);
+ options.set("warehouse", warehouse_name);
+ options.set("token.provider", "bear");
+ options.set("token", "test_token");
+
+ let catalog = RESTCatalog::new(options, true)
+ .await
+ .expect("Failed to create RESTCatalog");
+
+ (server, catalog)
+}
+
+/// Test reading an append-only (log) table via REST catalog backed by mock
server.
+///
+/// The mock server returns table metadata pointing to Spark-provisioned data
on disk.
+#[tokio::test]
+async fn test_rest_catalog_read_append_table() {
+ let table_name = "simple_log_table";
+ let (_server, catalog) =
+ setup_rest_catalog_with_tables(&[("default", table_name,
simple_log_schema())]).await;
+
+ let (plan, batches) = scan_and_read(&catalog, table_name, None).await;
+
+ assert!(
+ !plan.splits().is_empty(),
+ "REST append table should have at least one split"
+ );
+
+ assert!(
+ !batches.is_empty(),
+ "REST append table should produce at least one batch"
+ );
+
+ let actual = extract_id_name(&batches);
+ let expected = vec![
+ (1, "alice".to_string()),
+ (2, "bob".to_string()),
+ (3, "carol".to_string()),
+ ];
+ assert_eq!(
+ actual, expected,
+ "REST catalog append table rows should match expected values"
+ );
+}
diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml
index 8ea6f9e..c78d35b 100644
--- a/crates/paimon/Cargo.toml
+++ b/crates/paimon/Cargo.toml
@@ -27,7 +27,7 @@ license.workspace = true
version.workspace = true
[features]
-default = ["storage-memory", "storage-fs"]
+default = ["storage-memory", "storage-fs", "storage-oss"]
storage-all = ["storage-memory", "storage-fs", "storage-oss", "storage-s3"]
storage-memory = ["opendal/services-memory"]
@@ -49,7 +49,7 @@ serde_with = "3.9.0"
serde_repr = "0.1"
snafu = "0.8.3"
typed-builder = "^0.19"
-opendal = { version = "0.49", features = ["services-fs"] }
+opendal = { version = "0.55", features = ["services-fs"] }
pretty_assertions = "1"
apache-avro = { version = "0.17", features = ["snappy", "zstandard"] }
indexmap = "2.5.0"
diff --git a/crates/paimon/examples/rest_catalog_example.rs
b/crates/paimon/examples/rest_catalog_example.rs
new file mode 100644
index 0000000..1e0d6fa
--- /dev/null
+++ b/crates/paimon/examples/rest_catalog_example.rs
@@ -0,0 +1,302 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Example: REST Catalog Operations (Complete)
+//!
+//! This example demonstrates how to use `RESTCatalog` for:
+//! 1. Database operations (create, list, get, drop)
+//! 2. Table operations (create, list, get, rename, drop)
+//! 3. Data reading from append-only tables
+//!
+//! # Usage
+//! ```bash
+//! # With DLF authentication:
+//! DLF_ACCESS_KEY_ID=xxx DLF_ACCESS_KEY_SECRET=yyy \
+//! cargo run -p paimon --example rest_catalog_example
+//!
+//! # With Bearer token authentication:
+//! PAIMON_REST_TOKEN=zzz \
+//! cargo run -p paimon --example rest_catalog_example
+//! ```
+
+use std::collections::HashMap;
+
+use futures::TryStreamExt;
+
+use paimon::catalog::{Catalog, Identifier, RESTCatalog};
+use paimon::common::{CatalogOptions, Options};
+use paimon::spec::{DataType, IntType, Schema, VarCharType};
+
+/// Create a simple test schema with `id` (INT) and `name` (VARCHAR) columns.
+fn create_test_schema() -> Schema {
+ Schema::builder()
+ .column("id", DataType::Int(IntType::new()))
+ .column("name", DataType::VarChar(VarCharType::new(255).unwrap()))
+ .build()
+ .expect("Failed to build schema")
+}
+
+/// Format a single cell value from an Arrow array at the given row index.
+/// Supports INT (Int32), BIGINT (Int64), and VARCHAR (String/LargeString).
+fn array_value_to_string(array: &dyn arrow_array::Array, row: usize) -> String
{
+ use arrow_array::*;
+
+ if array.is_null(row) {
+ return "null".to_string();
+ }
+
+ if let Some(arr) = array.as_any().downcast_ref::<Int32Array>() {
+ return arr.value(row).to_string();
+ }
+ if let Some(arr) = array.as_any().downcast_ref::<Int64Array>() {
+ return arr.value(row).to_string();
+ }
+ if let Some(arr) = array.as_any().downcast_ref::<StringArray>() {
+ return arr.value(row).to_string();
+ }
+ if let Some(arr) = array.as_any().downcast_ref::<LargeStringArray>() {
+ return arr.value(row).to_string();
+ }
+
+ format!("<unsupported type: {:?}>", array.data_type())
+}
+
+#[tokio::main]
+async fn main() {
+ // ==================== Configuration ====================
+ let mut options = Options::new();
+
+ // Basic configuration — replace with your actual server URL and warehouse
+ options.set(CatalogOptions::METASTORE, "rest");
+ options.set(CatalogOptions::WAREHOUSE, "paimon_catalog");
+ options.set(CatalogOptions::URI, "http://sample.net/");
+
+ // --- Authentication (choose one) ---
+
+ // DLF authentication (Alibaba Cloud)
+ options.set(CatalogOptions::TOKEN_PROVIDER, "dlf");
+ options.set("dlf.region", "cn-hangzhou");
+ options.set(
+ "dlf.access-key-id",
+ std::env::var("DLF_ACCESS_KEY_ID").expect("DLF_ACCESS_KEY_ID env var
not set"),
+ );
+ options.set(
+ "dlf.access-key-secret",
+ std::env::var("DLF_ACCESS_KEY_SECRET").expect("DLF_ACCESS_KEY_SECRET
env var not set"),
+ );
+
+ // ==================== Create RESTCatalog ====================
+ println!("Creating RESTCatalog instance...");
+ let catalog = match RESTCatalog::new(options, true).await {
+ Ok(catalog) => catalog,
+ Err(err) => {
+ eprintln!("Failed to create RESTCatalog: {}", err);
+ return;
+ }
+ };
+
+ // ==================== Part 1: Database Operations ====================
+ println!("\n=== Part 1: Database Operations ===\n");
+
+ // List databases
+ println!("Listing databases...");
+ match catalog.list_databases().await {
+ Ok(databases) => {
+ println!("Databases found: {:?}", databases);
+ println!("Total count: {}", databases.len());
+ }
+ Err(err) => {
+ eprintln!("Failed to list databases: {}", err);
+ }
+ }
+
+ // Create database
+ println!("\nCreating database 'example_db'...");
+ match catalog
+ .create_database("example_db", false, HashMap::new())
+ .await
+ {
+ Ok(()) => println!("Database created successfully"),
+ Err(err) => eprintln!("Failed to create database: {}", err),
+ }
+
+ // Get database info
+ println!("\nGetting database info for 'example_db'...");
+ match catalog.get_database("example_db").await {
+ Ok(database) => println!("Database: {:?}", database),
+ Err(err) => eprintln!("Failed to get database: {}", err),
+ }
+
+ // ==================== Part 2: Table Operations ====================
+ println!("\n=== Part 2: Table Operations ===\n");
+
+ // Create table
+ let table_identifier = Identifier::new("example_db", "users");
+ println!("Creating table '{}'...", table_identifier);
+ let schema = create_test_schema();
+ match catalog.create_table(&table_identifier, schema, false).await {
+ Ok(()) => println!("Table created successfully"),
+ Err(err) => eprintln!("Failed to create table: {}", err),
+ }
+
+ // List tables
+ println!("\nListing tables in 'example_db'...");
+ match catalog.list_tables("example_db").await {
+ Ok(tables) => {
+ println!("Tables found: {:?}", tables);
+ }
+ Err(err) => {
+ eprintln!("Failed to list tables: {}", err);
+ }
+ }
+
+ // Get table info
+ println!("\nGetting table info for '{}'...", table_identifier);
+ match catalog.get_table(&table_identifier).await {
+ Ok(table) => {
+ println!("Table location: {}", table.location());
+ println!("Table schema fields: {:?}", table.schema().fields());
+ }
+ Err(err) => eprintln!("Failed to get table: {}", err),
+ }
+
+ // Rename table
+ let renamed_identifier = Identifier::new("example_db", "users_renamed");
+ println!(
+ "\nRenaming table '{}' to '{}'...",
+ table_identifier, renamed_identifier
+ );
+ match catalog
+ .rename_table(&table_identifier, &renamed_identifier, false)
+ .await
+ {
+ Ok(()) => println!("Table renamed successfully"),
+ Err(err) => eprintln!("Failed to rename table: {}", err),
+ }
+
+ // ==================== Part 3: Read Data from Existing Table
====================
+ println!("\n=== Part 3: Read Data from Existing Table ===\n");
+
+ // Try to read from an existing table (example_db.users_renamed)
+ // This table must already exist on the REST catalog server
+ let read_table_identifier = Identifier::new("example_db", "users_renamed");
+ println!(
+ "Attempting to read from table '{}'...",
+ read_table_identifier
+ );
+
+ match catalog.get_table(&read_table_identifier).await {
+ Ok(table) => {
+ println!("Table retrieved successfully");
+ println!(" Location: {}", table.location());
+ println!(" Schema fields: {:?}", table.schema().fields());
+
+ // Scan table
+ println!("\nScanning table...");
+ let read_builder = table.new_read_builder();
+ let scan = read_builder.new_scan();
+
+ match scan.plan().await {
+ Ok(plan) => {
+ println!(" Number of splits: {}", plan.splits().len());
+
+ if plan.splits().is_empty() {
+ println!("No data splits found — the table may be
empty.");
+ } else {
+ // Read table data
+ println!("\nReading table data...");
+ match read_builder.new_read() {
+ Ok(read) => match read.to_arrow(plan.splits()) {
+ Ok(stream) => {
+ let batches: Vec<_> =
+
stream.try_collect().await.unwrap_or_default();
+ println!("Collected {} record batch(es)",
batches.len());
+
+ let mut total_rows = 0;
+ for (batch_index, batch) in
batches.iter().enumerate() {
+ let num_rows = batch.num_rows();
+ total_rows += num_rows;
+ println!(
+ "\n--- Batch {} ({} rows, {}
columns) ---",
+ batch_index,
+ num_rows,
+ batch.num_columns()
+ );
+ println!("Schema: {}", batch.schema());
+
+ // Print up to 10 rows per batch
+ let display_rows = num_rows.min(10);
+ for row in 0..display_rows {
+ let mut row_values = Vec::new();
+ for col in 0..batch.num_columns() {
+ let column = batch.column(col);
+
row_values.push(array_value_to_string(column, row));
+ }
+ println!(" Row {}: [{}]", row,
row_values.join(", "));
+ }
+ if num_rows > display_rows {
+ println!(
+ " ... ({} more rows omitted)",
+ num_rows - display_rows
+ );
+ }
+ }
+
+ println!("\n=== Read Summary ===");
+ println!("Total rows read: {}",
total_rows);
+ println!("Total batches: {}",
batches.len());
+ }
+ Err(err) => {
+ eprintln!("Failed to create arrow stream:
{}", err);
+ }
+ },
+ Err(err) => {
+ eprintln!("Failed to create table read: {}",
err);
+ }
+ }
+ }
+ }
+ Err(err) => {
+ eprintln!("Failed to plan scan: {}", err);
+ }
+ }
+ }
+ Err(err) => {
+ eprintln!(
+ "Failed to get table '{}' (this is expected if the table
doesn't exist): {}",
+ read_table_identifier, err
+ );
+ }
+ }
+
+ // ==================== Cleanup ====================
+ println!("\n=== Cleanup ===\n");
+ // Drop table
+ println!("\nDropping table '{}'...", renamed_identifier);
+ match catalog.drop_table(&renamed_identifier, false).await {
+ Ok(()) => println!("Table dropped successfully"),
+ Err(err) => eprintln!("Failed to drop table: {}", err),
+ }
+ // Drop database (cascade = true to force drop even if not empty)
+ println!("Dropping database 'example_db'...");
+ match catalog.drop_database("example_db", false, true).await {
+ Ok(()) => println!("Database dropped successfully"),
+ Err(err) => eprintln!("Failed to drop database: {}", err),
+ }
+
+ println!("\nExample completed!");
+}
diff --git a/crates/paimon/src/api/api_response.rs
b/crates/paimon/src/api/api_response.rs
index e282080..c3169a2 100644
--- a/crates/paimon/src/api/api_response.rs
+++ b/crates/paimon/src/api/api_response.rs
@@ -277,6 +277,17 @@ impl<T> PagedList<T> {
}
}
}
+
+/// Response for getting table token.
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct GetTableTokenResponse {
+ /// Token key-value pairs (e.g. access_key_id, access_key_secret, etc.)
+ pub token: HashMap<String, String>,
+ /// Token expiration time in milliseconds since epoch.
+ pub expires_at_millis: Option<i64>,
+}
+
#[cfg(test)]
mod tests {
use super::*;
diff --git a/crates/paimon/src/api/auth/base.rs
b/crates/paimon/src/api/auth/base.rs
index 7b0ae33..444afe1 100644
--- a/crates/paimon/src/api/auth/base.rs
+++ b/crates/paimon/src/api/auth/base.rs
@@ -75,7 +75,7 @@ impl RESTAuthParameter {
/// Implement this trait to provide custom authentication mechanisms
/// for REST API requests.
#[async_trait]
-pub trait AuthProvider: Send {
+pub trait AuthProvider: Send + Sync {
/// Merge authentication headers into the base headers.
///
/// # Arguments
@@ -84,7 +84,7 @@ pub trait AuthProvider: Send {
///
/// # Returns
async fn merge_auth_header(
- &mut self,
+ &self,
base_header: HashMap<String, String>,
parameter: &RESTAuthParameter,
) -> Result<HashMap<String, String>>;
@@ -120,10 +120,7 @@ impl RESTAuthFunction {
///
/// # Returns
/// A HashMap containing the authenticated headers.
- pub async fn apply(
- &mut self,
- parameter: &RESTAuthParameter,
- ) -> Result<HashMap<String, String>> {
+ pub async fn apply(&self, parameter: &RESTAuthParameter) ->
Result<HashMap<String, String>> {
self.auth_provider
.merge_auth_header(self.init_header.clone(), parameter)
.await
diff --git a/crates/paimon/src/api/auth/bearer_provider.rs
b/crates/paimon/src/api/auth/bearer_provider.rs
index 2b9f0e1..ba6d4dd 100644
--- a/crates/paimon/src/api/auth/bearer_provider.rs
+++ b/crates/paimon/src/api/auth/bearer_provider.rs
@@ -46,7 +46,7 @@ impl BearerTokenAuthProvider {
#[async_trait]
impl AuthProvider for BearerTokenAuthProvider {
async fn merge_auth_header(
- &mut self,
+ &self,
mut base_header: HashMap<String, String>,
_parameter: &RESTAuthParameter,
) -> crate::Result<HashMap<String, String>> {
@@ -64,7 +64,7 @@ mod tests {
#[tokio::test]
async fn test_bearer_token_auth() {
- let mut provider = BearerTokenAuthProvider::new("test-token");
+ let provider = BearerTokenAuthProvider::new("test-token");
let base_header = HashMap::new();
let parameter = RESTAuthParameter::for_get("/test", HashMap::new());
@@ -81,7 +81,7 @@ mod tests {
#[tokio::test]
async fn test_bearer_token_with_base_headers() {
- let mut provider = BearerTokenAuthProvider::new("my-token");
+ let provider = BearerTokenAuthProvider::new("my-token");
let mut base_header = HashMap::new();
base_header.insert("Content-Type".to_string(),
"application/json".to_string());
let parameter = RESTAuthParameter::for_get("/test", HashMap::new());
diff --git a/crates/paimon/src/api/auth/dlf_provider.rs
b/crates/paimon/src/api/auth/dlf_provider.rs
index 4f264d8..a18a014 100644
--- a/crates/paimon/src/api/auth/dlf_provider.rs
+++ b/crates/paimon/src/api/auth/dlf_provider.rs
@@ -244,7 +244,7 @@ const TOKEN_EXPIRATION_SAFE_TIME_MILLIS: i64 = 3_600_000;
/// (ROA v2 HMAC-SHA1).
pub struct DLFAuthProvider {
uri: String,
- token: Option<DLFToken>,
+ token: tokio::sync::Mutex<Option<DLFToken>>,
token_loader: Option<Arc<dyn DLFTokenLoader>>,
signer: Box<dyn DLFRequestSigner>,
}
@@ -279,7 +279,7 @@ impl DLFAuthProvider {
Ok(Self {
uri,
- token,
+ token: tokio::sync::Mutex::new(token),
token_loader,
signer,
})
@@ -290,9 +290,11 @@ impl DLFAuthProvider {
/// If token_loader is configured, this method will:
/// - Load a new token if current token is None
/// - Refresh the token if it's about to expire (within
TOKEN_EXPIRATION_SAFE_TIME_MILLIS)
- async fn get_or_refresh_token(&mut self) -> Result<DLFToken> {
+ async fn get_or_refresh_token(&self) -> Result<DLFToken> {
+ let mut token_guard = self.token.lock().await;
+
if let Some(loader) = &self.token_loader {
- let need_reload = match &self.token {
+ let need_reload = match &*token_guard {
None => true,
Some(token) => match token.expiration_at_millis {
Some(expiration_at_millis) => {
@@ -305,11 +307,11 @@ impl DLFAuthProvider {
if need_reload {
let new_token = loader.load_token().await?;
- self.token = Some(new_token);
+ *token_guard = Some(new_token);
}
}
- self.token.clone().ok_or_else(|| Error::DataInvalid {
+ token_guard.clone().ok_or_else(|| Error::DataInvalid {
message: "Either token or token_loader must be
provided".to_string(),
source: None,
})
@@ -330,7 +332,7 @@ impl DLFAuthProvider {
#[async_trait]
impl AuthProvider for DLFAuthProvider {
async fn merge_auth_header(
- &mut self,
+ &self,
mut base_header: HashMap<String, String>,
rest_auth_parameter: &RESTAuthParameter,
) -> crate::Result<HashMap<String, String>> {
diff --git a/crates/paimon/src/api/auth/factory.rs
b/crates/paimon/src/api/auth/factory.rs
index 68c2881..fc2a1b5 100644
--- a/crates/paimon/src/api/auth/factory.rs
+++ b/crates/paimon/src/api/auth/factory.rs
@@ -162,7 +162,7 @@ mod tests {
options.set(CatalogOptions::TOKEN_PROVIDER, "bear");
options.set(CatalogOptions::TOKEN, "test-token");
- let mut provider =
AuthProviderFactory::create_auth_provider(&options).unwrap();
+ let provider =
AuthProviderFactory::create_auth_provider(&options).unwrap();
let base_header = HashMap::new();
let param = RESTAuthParameter::new("GET", "/test", None,
HashMap::new());
@@ -202,7 +202,7 @@ mod tests {
options.set(CatalogOptions::DLF_ACCESS_KEY_ID, "test_key_id");
options.set(CatalogOptions::DLF_ACCESS_KEY_SECRET, "test_key_secret");
- let mut provider =
AuthProviderFactory::create_auth_provider(&options).unwrap();
+ let provider =
AuthProviderFactory::create_auth_provider(&options).unwrap();
let base_header = HashMap::new();
let param = RESTAuthParameter::new("GET", "/test", None,
HashMap::new());
diff --git a/crates/paimon/src/api/mod.rs b/crates/paimon/src/api/mod.rs
index 958323e..6307cf8 100644
--- a/crates/paimon/src/api/mod.rs
+++ b/crates/paimon/src/api/mod.rs
@@ -37,7 +37,7 @@ pub use api_request::{
// Re-export response types
pub use api_response::{
AuditRESTResponse, ConfigResponse, ErrorResponse, GetDatabaseResponse,
GetTableResponse,
- ListDatabasesResponse, ListTablesResponse, PagedList,
+ GetTableTokenResponse, ListDatabasesResponse, ListTablesResponse,
PagedList,
};
// Re-export error types
diff --git a/crates/paimon/src/api/rest_api.rs
b/crates/paimon/src/api/rest_api.rs
index be09e9a..00a32b8 100644
--- a/crates/paimon/src/api/rest_api.rs
+++ b/crates/paimon/src/api/rest_api.rs
@@ -77,7 +77,6 @@ fn validate_non_empty_multi(values: &[(&str, &str)]) ->
Result<()> {
pub struct RESTApi {
client: HttpClient,
resource_paths: ResourcePaths,
- #[allow(dead_code)]
options: Options,
}
@@ -101,7 +100,7 @@ impl RESTApi {
///
/// # Errors
/// Returns an error if required options are missing or if config fetch
fails.
- pub async fn new(mut options: Options, config_required: bool) ->
Result<Self> {
+ pub async fn new(options: Options, config_required: bool) -> Result<Self> {
let uri = options
.get(CatalogOptions::URI)
.ok_or_else(|| crate::Error::ConfigInvalid {
@@ -144,17 +143,17 @@ impl RESTApi {
.await?;
// Merge config response with options (client config takes
priority)
- options = config_response.merge_options(&options);
+ let merged = config_response.merge_options(&options);
// Update base headers from merged options and recreate auth
function
- base_headers.extend(RESTUtil::extract_prefix_map(&options,
Self::HEADER_PREFIX));
+ base_headers.extend(RESTUtil::extract_prefix_map(&merged,
Self::HEADER_PREFIX));
// Recreate auth function with updated headers if needed
- let auth_provider =
AuthProviderFactory::create_auth_provider(&options)?;
+ let auth_provider =
AuthProviderFactory::create_auth_provider(&merged)?;
let rest_auth_function = RESTAuthFunction::new(base_headers,
auth_provider);
client.set_auth_function(rest_auth_function);
- options
+ merged
} else {
options
};
@@ -168,10 +167,15 @@ impl RESTApi {
})
}
+ /// Get the options (potentially merged with server config).
+ pub fn options(&self) -> &Options {
+ &self.options
+ }
+
// ==================== Database Operations ====================
/// List all databases.
- pub async fn list_databases(&mut self) -> Result<Vec<String>> {
+ pub async fn list_databases(&self) -> Result<Vec<String>> {
let mut results = Vec::new();
let mut page_token: Option<String> = None;
@@ -192,7 +196,7 @@ impl RESTApi {
/// List databases with pagination.
pub async fn list_databases_paged(
- &mut self,
+ &self,
max_results: Option<u32>,
page_token: Option<&str>,
database_name_pattern: Option<&str>,
@@ -223,9 +227,9 @@ impl RESTApi {
/// Create a new database.
pub async fn create_database(
- &mut self,
+ &self,
name: &str,
- options: Option<std::collections::HashMap<String, String>>,
+ options: Option<HashMap<String, String>>,
) -> Result<()> {
validate_non_empty(name, "database name")?;
let path = self.resource_paths.databases();
@@ -235,7 +239,7 @@ impl RESTApi {
}
/// Get database information.
- pub async fn get_database(&mut self, name: &str) ->
Result<GetDatabaseResponse> {
+ pub async fn get_database(&self, name: &str) ->
Result<GetDatabaseResponse> {
validate_non_empty(name, "database name")?;
let path = self.resource_paths.database(name);
self.client.get(&path, None::<&[(&str, &str)]>).await
@@ -243,10 +247,10 @@ impl RESTApi {
/// Alter database configuration.
pub async fn alter_database(
- &mut self,
+ &self,
name: &str,
removals: Vec<String>,
- updates: std::collections::HashMap<String, String>,
+ updates: HashMap<String, String>,
) -> Result<()> {
validate_non_empty(name, "database name")?;
let path = self.resource_paths.database(name);
@@ -256,7 +260,7 @@ impl RESTApi {
}
/// Drop a database.
- pub async fn drop_database(&mut self, name: &str) -> Result<()> {
+ pub async fn drop_database(&self, name: &str) -> Result<()> {
validate_non_empty(name, "database name")?;
let path = self.resource_paths.database(name);
let _resp: serde_json::Value = self.client.delete(&path,
None::<&[(&str, &str)]>).await?;
@@ -266,7 +270,7 @@ impl RESTApi {
// ==================== Table Operations ====================
/// List all tables in a database.
- pub async fn list_tables(&mut self, database: &str) -> Result<Vec<String>>
{
+ pub async fn list_tables(&self, database: &str) -> Result<Vec<String>> {
validate_non_empty(database, "database name")?;
let mut results = Vec::new();
@@ -289,7 +293,7 @@ impl RESTApi {
/// List tables with pagination.
pub async fn list_tables_paged(
- &mut self,
+ &self,
database: &str,
max_results: Option<u32>,
page_token: Option<&str>,
@@ -329,7 +333,7 @@ impl RESTApi {
}
/// Create a new table.
- pub async fn create_table(&mut self, identifier: &Identifier, schema:
Schema) -> Result<()> {
+ pub async fn create_table(&self, identifier: &Identifier, schema: Schema)
-> Result<()> {
let database = identifier.database();
let table = identifier.object();
validate_non_empty_multi(&[(database, "database name"), (table, "table
name")])?;
@@ -340,7 +344,7 @@ impl RESTApi {
}
/// Get table information.
- pub async fn get_table(&mut self, identifier: &Identifier) ->
Result<GetTableResponse> {
+ pub async fn get_table(&self, identifier: &Identifier) ->
Result<GetTableResponse> {
let database = identifier.database();
let table = identifier.object();
validate_non_empty_multi(&[(database, "database name"), (table, "table
name")])?;
@@ -349,11 +353,7 @@ impl RESTApi {
}
/// Rename a table.
- pub async fn rename_table(
- &mut self,
- source: &Identifier,
- destination: &Identifier,
- ) -> Result<()> {
+ pub async fn rename_table(&self, source: &Identifier, destination:
&Identifier) -> Result<()> {
validate_non_empty_multi(&[
(source.database(), "source database name"),
(source.object(), "source table name"),
@@ -367,7 +367,7 @@ impl RESTApi {
}
/// Drop a table.
- pub async fn drop_table(&mut self, identifier: &Identifier) -> Result<()> {
+ pub async fn drop_table(&self, identifier: &Identifier) -> Result<()> {
let database = identifier.database();
let table = identifier.object();
validate_non_empty_multi(&[(database, "database name"), (table, "table
name")])?;
@@ -375,4 +375,20 @@ impl RESTApi {
let _resp: serde_json::Value = self.client.delete(&path,
None::<&[(&str, &str)]>).await?;
Ok(())
}
+
+ // ==================== Token Operations ====================
+
+ /// Load table token for data access.
+ ///
+ /// Corresponds to Python `RESTApi.load_table_token`.
+ pub async fn load_table_token(
+ &self,
+ identifier: &Identifier,
+ ) -> Result<super::api_response::GetTableTokenResponse> {
+ let database = identifier.database();
+ let table = identifier.object();
+ validate_non_empty_multi(&[(database, "database name"), (table, "table
name")])?;
+ let path = self.resource_paths.table_token(database, table);
+ self.client.get(&path, None::<&[(&str, &str)]>).await
+ }
}
diff --git a/crates/paimon/src/api/rest_client.rs
b/crates/paimon/src/api/rest_client.rs
index 76db48f..1b6a636 100644
--- a/crates/paimon/src/api/rest_client.rs
+++ b/crates/paimon/src/api/rest_client.rs
@@ -94,7 +94,7 @@ impl HttpClient {
/// # Returns
/// The parsed JSON response.
pub async fn get<T: DeserializeOwned>(
- &mut self,
+ &self,
path: &str,
params: Option<&[(impl AsRef<str>, impl AsRef<str>)]>,
) -> Result<T> {
@@ -136,7 +136,7 @@ impl HttpClient {
/// # Returns
/// The parsed JSON response.
pub async fn post<T: DeserializeOwned, B: serde::Serialize>(
- &mut self,
+ &self,
path: &str,
body: &B,
) -> Result<T> {
@@ -163,7 +163,7 @@ impl HttpClient {
/// # Returns
/// The parsed JSON response.
pub async fn delete<T: DeserializeOwned>(
- &mut self,
+ &self,
path: &str,
params: Option<&[(impl AsRef<str>, impl AsRef<str>)]>,
) -> Result<T> {
@@ -203,13 +203,13 @@ impl HttpClient {
/// Build auth headers for a request.
async fn build_auth_headers(
- &mut self,
+ &self,
method: &str,
path: &str,
data: Option<&str>,
params: HashMap<String, String>,
) -> Result<HashMap<String, String>> {
- if let Some(ref mut auth_fn) = self.auth_function {
+ if let Some(ref auth_fn) = self.auth_function {
let parameter =
RESTAuthParameter::new(method, path, data.map(|s|
s.to_string()), params);
auth_fn.apply(¶meter).await
diff --git a/crates/paimon/src/api/rest_util.rs
b/crates/paimon/src/api/rest_util.rs
index ce8627c..5c259fe 100644
--- a/crates/paimon/src/api/rest_util.rs
+++ b/crates/paimon/src/api/rest_util.rs
@@ -41,6 +41,32 @@ impl RESTUtil {
pub fn extract_prefix_map(options: &Options, prefix: &str) ->
HashMap<String, String> {
options.extract_prefix_map(prefix)
}
+
+ /// Merge two property maps, with `override_properties` taking precedence.
+ ///
+ /// For keys present in both maps, the value from `override_properties`
wins.
+ /// Passing `None` for either argument is treated as an empty map; the
+ /// result contains owned clones of the surviving entries.
+ pub fn merge(
+ base_properties: Option<&HashMap<String, String>>,
+ override_properties: Option<&HashMap<String, String>>,
+ ) -> HashMap<String, String> {
+ let mut result = HashMap::new();
+
+ if let Some(base) = base_properties {
+ for (key, value) in base {
+ result.insert(key.clone(), value.clone());
+ }
+ }
+
+ if let Some(overrides) = override_properties {
+ for (key, value) in overrides {
+ result.insert(key.clone(), value.clone());
+ }
+ }
+
+ result
+ }
}
#[cfg(test)]
diff --git a/crates/paimon/src/io/mod.rs b/crates/paimon/src/catalog/database.rs
similarity index 55%
copy from crates/paimon/src/io/mod.rs
copy to crates/paimon/src/catalog/database.rs
index 226aa72..b291ec7 100644
--- a/crates/paimon/src/io/mod.rs
+++ b/crates/paimon/src/catalog/database.rs
@@ -15,28 +15,28 @@
// specific language governing permissions and limitations
// under the License.
-mod file_io;
-pub use file_io::*;
+//! Database structure for Apache Paimon catalogs.
-mod storage;
-pub use storage::*;
+use std::collections::HashMap;
-#[cfg(feature = "storage-fs")]
-mod storage_fs;
-#[cfg(feature = "storage-fs")]
-use storage_fs::*;
+/// Structure representing a database in a Paimon catalog.
+#[derive(Debug, Clone, PartialEq)]
+pub struct Database {
+ /// Database name.
+ pub name: String,
+ /// Database options/properties.
+ pub options: HashMap<String, String>,
+ /// Optional comment describing the database.
+ pub comment: Option<String>,
+}
-#[cfg(feature = "storage-memory")]
-mod storage_memory;
-#[cfg(feature = "storage-memory")]
-use storage_memory::*;
-
-#[cfg(feature = "storage-oss")]
-mod storage_oss;
-#[cfg(feature = "storage-oss")]
-use storage_oss::*;
-
-#[cfg(feature = "storage-s3")]
-mod storage_s3;
-#[cfg(feature = "storage-s3")]
-use storage_s3::*;
+impl Database {
+ /// Create a new Database.
+ pub fn new(name: String, options: HashMap<String, String>, comment:
Option<String>) -> Self {
+ Self {
+ name,
+ options,
+ comment,
+ }
+ }
+}
diff --git a/crates/paimon/src/catalog/filesystem.rs
b/crates/paimon/src/catalog/filesystem.rs
index 2685c66..61d6e8d 100644
--- a/crates/paimon/src/catalog/filesystem.rs
+++ b/crates/paimon/src/catalog/filesystem.rs
@@ -21,7 +21,7 @@
use std::collections::HashMap;
-use crate::catalog::{Catalog, Identifier, DB_LOCATION_PROP, DB_SUFFIX};
+use crate::catalog::{Catalog, Database, Identifier, DB_LOCATION_PROP,
DB_SUFFIX};
use crate::error::{Error, Result};
use crate::io::FileIO;
use crate::spec::{Schema, TableSchema};
@@ -237,6 +237,16 @@ impl Catalog for FileSystemCatalog {
Ok(())
}
+ async fn get_database(&self, name: &str) -> Result<Database> {
+ if !self.database_exists(name).await? {
+ return Err(Error::DatabaseNotExist {
+ database: name.to_string(),
+ });
+ }
+
+ Ok(Database::new(name.to_string(), HashMap::new(), None))
+ }
+
async fn drop_database(
&self,
name: &str,
diff --git a/crates/paimon/src/catalog/mod.rs b/crates/paimon/src/catalog/mod.rs
index 4b43ffa..1b4bf57 100644
--- a/crates/paimon/src/catalog/mod.rs
+++ b/crates/paimon/src/catalog/mod.rs
@@ -20,12 +20,16 @@
//! Design aligns with [Paimon Java
Catalog](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java)
//! and follows API patterns from Apache Iceberg Rust.
+mod database;
mod filesystem;
+mod rest;
use std::collections::HashMap;
use std::fmt;
+pub use database::*;
pub use filesystem::*;
+pub use rest::*;
use serde::{Deserialize, Serialize};
/// Splitter for system table names (e.g. `table$snapshots`).
@@ -146,6 +150,12 @@ pub trait Catalog: Send + Sync {
properties: HashMap<String, String>,
) -> Result<()>;
+ /// Get a database by name.
+ ///
+ /// # Errors
+ /// * [`crate::Error::DatabaseNotExist`] - database does not exist.
+ async fn get_database(&self, name: &str) -> Result<Database>;
+
/// Drop a database.
///
/// * `ignore_if_not_exists` - if true, do nothing when the database does
not exist.
diff --git a/crates/paimon/src/io/mod.rs b/crates/paimon/src/catalog/rest/mod.rs
similarity index 62%
copy from crates/paimon/src/io/mod.rs
copy to crates/paimon/src/catalog/rest/mod.rs
index 226aa72..921e629 100644
--- a/crates/paimon/src/io/mod.rs
+++ b/crates/paimon/src/catalog/rest/mod.rs
@@ -15,28 +15,15 @@
// specific language governing permissions and limitations
// under the License.
-mod file_io;
-pub use file_io::*;
+//! REST catalog implementation for Apache Paimon.
+//!
+//! This module provides a REST-based catalog that communicates with
+//! a Paimon REST catalog server for metadata operations.
-mod storage;
-pub use storage::*;
+mod rest_catalog;
+mod rest_token;
+mod rest_token_file_io;
-#[cfg(feature = "storage-fs")]
-mod storage_fs;
-#[cfg(feature = "storage-fs")]
-use storage_fs::*;
-
-#[cfg(feature = "storage-memory")]
-mod storage_memory;
-#[cfg(feature = "storage-memory")]
-use storage_memory::*;
-
-#[cfg(feature = "storage-oss")]
-mod storage_oss;
-#[cfg(feature = "storage-oss")]
-use storage_oss::*;
-
-#[cfg(feature = "storage-s3")]
-mod storage_s3;
-#[cfg(feature = "storage-s3")]
-use storage_s3::*;
+pub use rest_catalog::*;
+pub use rest_token::*;
+pub use rest_token_file_io::*;
diff --git a/crates/paimon/src/catalog/rest/rest_catalog.rs
b/crates/paimon/src/catalog/rest/rest_catalog.rs
new file mode 100644
index 0000000..e5d023f
--- /dev/null
+++ b/crates/paimon/src/catalog/rest/rest_catalog.rs
@@ -0,0 +1,386 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! REST catalog implementation for Apache Paimon.
+//!
+//! This module provides a REST-based catalog that communicates with
+//! a Paimon REST catalog server for database and table CRUD operations.
+
+use std::collections::HashMap;
+
+use async_trait::async_trait;
+
+use crate::api::rest_api::RESTApi;
+use crate::api::rest_error::RestError;
+use crate::api::PagedList;
+use crate::catalog::{Catalog, Database, Identifier, DB_LOCATION_PROP};
+use crate::common::{CatalogOptions, Options};
+use crate::error::Error;
+use crate::io::FileIO;
+use crate::spec::{Schema, SchemaChange, TableSchema};
+use crate::table::Table;
+use crate::Result;
+
+use super::rest_token_file_io::RESTTokenFileIO;
+
+/// REST catalog implementation.
+///
+/// This catalog communicates with a Paimon REST catalog server
+/// for all metadata operations (database and table CRUD).
+///
+/// Corresponds to Python `RESTCatalog` in
`pypaimon/catalog/rest/rest_catalog.py`.
+pub struct RESTCatalog {
+ /// The REST API client.
+ api: RESTApi,
+ /// Catalog configuration options.
+ options: Options,
+ /// Warehouse path.
+ warehouse: String,
+ /// Whether data token is enabled for FileIO construction.
+ data_token_enabled: bool,
+}
+
+impl RESTCatalog {
+ /// Create a new REST catalog.
+ ///
+ /// # Arguments
+ /// * `options` - Configuration options containing URI, warehouse, etc.
+ /// * `config_required` - Whether to fetch config from server and merge
with options.
+ ///
+ /// # Errors
+ /// Returns an error if required options are missing or if initialization
fails.
+ pub async fn new(options: Options, config_required: bool) -> Result<Self> {
+ let warehouse = options
+ .get(CatalogOptions::WAREHOUSE)
+ .cloned()
+ .ok_or_else(|| RestError::BadRequest {
+ message: format!("Missing required option: {}",
CatalogOptions::WAREHOUSE),
+ })?;
+
+ let api = RESTApi::new(options.clone(), config_required).await?;
+
+ let data_token_enabled = api
+ .options()
+ .get(CatalogOptions::DATA_TOKEN_ENABLED)
+ .map(|v| v.eq_ignore_ascii_case("true"))
+ .unwrap_or(false);
+
+ let api_options = api.options().clone();
+
+ Ok(Self {
+ api,
+ options: api_options,
+ warehouse,
+ data_token_enabled,
+ })
+ }
+
+ /// Get the warehouse path.
+ pub fn warehouse(&self) -> &str {
+ &self.warehouse
+ }
+
+ /// Get the catalog options.
+ pub fn options(&self) -> &Options {
+ &self.options
+ }
+
+ /// Whether data token is enabled.
+ pub fn data_token_enabled(&self) -> bool {
+ self.data_token_enabled
+ }
+
+ /// List databases with pagination.
+ pub async fn list_databases_paged(
+ &self,
+ max_results: Option<u32>,
+ page_token: Option<&str>,
+ database_name_pattern: Option<&str>,
+ ) -> Result<PagedList<String>> {
+ self.api
+ .list_databases_paged(max_results, page_token,
database_name_pattern)
+ .await
+ }
+}
+
+// ============================================================================
+// Catalog trait implementation
+// ============================================================================
+
+#[async_trait]
+impl Catalog for RESTCatalog {
+ // ======================= database methods ===============================
+
+ async fn list_databases(&self) -> Result<Vec<String>> {
+ self.api.list_databases().await
+ }
+
+ async fn create_database(
+ &self,
+ name: &str,
+ ignore_if_exists: bool,
+ properties: HashMap<String, String>,
+ ) -> Result<()> {
+ let result = self
+ .api
+ .create_database(name, Some(properties))
+ .await
+ .map_err(|e| map_rest_error_for_database(e, name));
+ ignore_error_if(result, |e| {
+ ignore_if_exists && matches!(e, Error::DatabaseAlreadyExist { .. })
+ })
+ }
+
+ async fn get_database(&self, name: &str) -> Result<Database> {
+ let response = self
+ .api
+ .get_database(name)
+ .await
+ .map_err(|e| map_rest_error_for_database(e, name))?;
+
+ let mut options = response.options;
+ if let Some(location) = response.location {
+ options.insert(DB_LOCATION_PROP.to_string(), location);
+ }
+
+ Ok(Database::new(name.to_string(), options, None))
+ }
+
+ async fn drop_database(
+ &self,
+ name: &str,
+ ignore_if_not_exists: bool,
+ cascade: bool,
+ ) -> Result<()> {
+ // If not cascade, check if database is empty first
+ if !cascade {
+ let tables = match self.api.list_tables(name).await {
+ Ok(tables) => tables,
+ Err(err) => {
+ return
ignore_error_if(Err(map_rest_error_for_database(err, name)), |e| {
+ ignore_if_not_exists && matches!(e,
Error::DatabaseNotExist { .. })
+ });
+ }
+ };
+ if !tables.is_empty() {
+ return Err(Error::DatabaseNotEmpty {
+ database: name.to_string(),
+ });
+ }
+ }
+
+ let result = self
+ .api
+ .drop_database(name)
+ .await
+ .map_err(|e| map_rest_error_for_database(e, name));
+ ignore_error_if(result, |e| {
+ ignore_if_not_exists && matches!(e, Error::DatabaseNotExist { .. })
+ })
+ }
+
+ // ======================= table methods ===============================
+
+ async fn get_table(&self, identifier: &Identifier) -> Result<Table> {
+ let response = self
+ .api
+ .get_table(identifier)
+ .await
+ .map_err(|e| map_rest_error_for_table(e, identifier))?;
+
+ // Extract schema from response
+ let schema = response.schema.ok_or_else(|| Error::DataInvalid {
+ message: format!("Table {} response missing schema",
identifier.full_name()),
+ source: None,
+ })?;
+
+ let schema_id = response.schema_id.ok_or_else(|| Error::DataInvalid {
+ message: format!(
+ "Table {} response missing schema_id",
+ identifier.full_name()
+ ),
+ source: None,
+ })?;
+ let table_schema = TableSchema::new(schema_id, &schema);
+
+ // Extract table path from response
+ let table_path = response.path.ok_or_else(|| Error::DataInvalid {
+ message: format!("Table {} response missing path",
identifier.full_name()),
+ source: None,
+ })?;
+
+ // Check if the table is external
+ let is_external = response.is_external.ok_or_else(||
Error::DataInvalid {
+ message: format!(
+ "Table {} response missing is_external",
+ identifier.full_name()
+ ),
+ source: None,
+ })?;
+
+ // Build FileIO based on data_token_enabled and is_external
+ // TODO Support token cache and direct oss access
+ let file_io = if self.data_token_enabled && !is_external {
+ // Use RESTTokenFileIO to get token-based FileIO
+ let token_file_io =
+ RESTTokenFileIO::new(identifier.clone(), table_path.clone(),
self.options.clone());
+ token_file_io.build_file_io().await?
+ } else {
+ // Use standard FileIO from path
+ FileIO::from_path(&table_path)?.build()?
+ };
+
+ Ok(Table::new(
+ file_io,
+ identifier.clone(),
+ table_path,
+ table_schema,
+ ))
+ }
+
+ async fn list_tables(&self, database_name: &str) -> Result<Vec<String>> {
+ self.api
+ .list_tables(database_name)
+ .await
+ .map_err(|e| map_rest_error_for_database(e, database_name))
+ }
+
+ async fn create_table(
+ &self,
+ identifier: &Identifier,
+ creation: Schema,
+ ignore_if_exists: bool,
+ ) -> Result<()> {
+ let result = self
+ .api
+ .create_table(identifier, creation)
+ .await
+ .map_err(|e| map_rest_error_for_table(e, identifier));
+ ignore_error_if(result, |e| {
+ ignore_if_exists && matches!(e, Error::TableAlreadyExist { .. })
+ })
+ }
+
+ async fn drop_table(&self, identifier: &Identifier, ignore_if_not_exists:
bool) -> Result<()> {
+ let result = self
+ .api
+ .drop_table(identifier)
+ .await
+ .map_err(|e| map_rest_error_for_table(e, identifier));
+ ignore_error_if(result, |e| {
+ ignore_if_not_exists && matches!(e, Error::TableNotExist { .. })
+ })
+ }
+
+ async fn rename_table(
+ &self,
+ from: &Identifier,
+ to: &Identifier,
+ ignore_if_not_exists: bool,
+ ) -> Result<()> {
+ let result = self
+ .api
+ .rename_table(from, to)
+ .await
+ .map_err(|e| map_rest_error_for_table(e, from))
+ // Remap TableAlreadyExist to use destination identifier
+ .map_err(|e| match e {
+ Error::TableAlreadyExist { .. } => Error::TableAlreadyExist {
+ full_name: to.full_name(),
+ },
+ other => other,
+ });
+ ignore_error_if(result, |e| {
+ ignore_if_not_exists && matches!(e, Error::TableNotExist { .. })
+ })
+ }
+
+ async fn alter_table(
+ &self,
+ _identifier: &Identifier,
+ _changes: Vec<SchemaChange>,
+ _ignore_if_not_exists: bool,
+ ) -> Result<()> {
+ // TODO: Implement alter_table when RESTApi supports it
+ Err(Error::Unsupported {
+ message: "Alter table is not yet implemented for REST
catalog".to_string(),
+ })
+ }
+}
+// ============================================================================
+// Error mapping helpers
+// ============================================================================
+
+/// Map a REST API error to a catalog-level database error.
+///
+/// Converts `RestError::NoSuchResource` -> `Error::DatabaseNotExist`,
+/// `RestError::AlreadyExists` -> `Error::DatabaseAlreadyExist`,
+/// and passes through other errors via `Error::RestApi`.
+fn map_rest_error_for_database(err: Error, database_name: &str) -> Error {
+ match err {
+ Error::RestApi {
+ source: RestError::NoSuchResource { .. },
+ } => Error::DatabaseNotExist {
+ database: database_name.to_string(),
+ },
+ Error::RestApi {
+ source: RestError::AlreadyExists { .. },
+ } => Error::DatabaseAlreadyExist {
+ database: database_name.to_string(),
+ },
+ other => other,
+ }
+}
+
+/// Map a REST API error to a catalog-level table error.
+///
+/// Converts `RestError::NoSuchResource` -> `Error::TableNotExist`,
+/// `RestError::AlreadyExists` -> `Error::TableAlreadyExist`,
+/// and passes through other errors via `Error::RestApi`.
+fn map_rest_error_for_table(err: Error, identifier: &Identifier) -> Error {
+ match err {
+ Error::RestApi {
+ source: RestError::NoSuchResource { .. },
+ } => Error::TableNotExist {
+ full_name: identifier.full_name(),
+ },
+ Error::RestApi {
+ source: RestError::AlreadyExists { .. },
+ } => Error::TableAlreadyExist {
+ full_name: identifier.full_name(),
+ },
+ other => other,
+ }
+}
+
+/// Execute a fallible operation and ignore a specific error variant.
+///
+/// If the operation succeeds, returns `Ok(())`.
+/// If it fails with an error that `should_ignore` returns `true` for, returns
`Ok(())`.
+/// Otherwise, returns the error.
+fn ignore_error_if<F>(result: Result<()>, should_ignore: F) -> Result<()>
+where
+ F: Fn(&Error) -> bool,
+{
+ result.or_else(|err| {
+ if should_ignore(&err) {
+ Ok(())
+ } else {
+ Err(err)
+ }
+ })
+}
diff --git a/crates/paimon/src/io/mod.rs
b/crates/paimon/src/catalog/rest/rest_token.rs
similarity index 56%
copy from crates/paimon/src/io/mod.rs
copy to crates/paimon/src/catalog/rest/rest_token.rs
index 226aa72..83d9dc9 100644
--- a/crates/paimon/src/io/mod.rs
+++ b/crates/paimon/src/catalog/rest/rest_token.rs
@@ -15,28 +15,25 @@
// specific language governing permissions and limitations
// under the License.
-mod file_io;
-pub use file_io::*;
+//! REST token for data access in Apache Paimon.
-mod storage;
-pub use storage::*;
+use std::collections::HashMap;
-#[cfg(feature = "storage-fs")]
-mod storage_fs;
-#[cfg(feature = "storage-fs")]
-use storage_fs::*;
+/// Token for REST data access, containing credentials and expiration.
+#[derive(Debug, Clone)]
+pub struct RESTToken {
+ /// Token key-value pairs (e.g. access_key_id, access_key_secret).
+ pub token: HashMap<String, String>,
+ /// Token expiration time in milliseconds since epoch.
+ pub expire_at_millis: i64,
+}
-#[cfg(feature = "storage-memory")]
-mod storage_memory;
-#[cfg(feature = "storage-memory")]
-use storage_memory::*;
-
-#[cfg(feature = "storage-oss")]
-mod storage_oss;
-#[cfg(feature = "storage-oss")]
-use storage_oss::*;
-
-#[cfg(feature = "storage-s3")]
-mod storage_s3;
-#[cfg(feature = "storage-s3")]
-use storage_s3::*;
+impl RESTToken {
+ /// Create a new RESTToken.
+ pub fn new(token: HashMap<String, String>, expire_at_millis: i64) -> Self {
+ Self {
+ token,
+ expire_at_millis,
+ }
+ }
+}
diff --git a/crates/paimon/src/catalog/rest/rest_token_file_io.rs
b/crates/paimon/src/catalog/rest/rest_token_file_io.rs
new file mode 100644
index 0000000..6233eb1
--- /dev/null
+++ b/crates/paimon/src/catalog/rest/rest_token_file_io.rs
@@ -0,0 +1,193 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! REST token-based FileIO for Apache Paimon.
+//!
+//! This module provides a FileIO wrapper that supports getting data access
+//! tokens from a REST Server. It handles token caching, expiration detection,
+//! and automatic refresh.
+
+use std::collections::HashMap;
+
+use tokio::sync::{OnceCell, RwLock};
+
+use crate::api::rest_api::RESTApi;
+use crate::api::rest_util::RESTUtil;
+use crate::catalog::Identifier;
+use crate::common::{CatalogOptions, Options};
+use crate::io::storage_oss::OSS_ENDPOINT;
+use crate::io::FileIO;
+use crate::Result;
+
+use super::rest_token::RESTToken;
+
+/// Safe time margin (in milliseconds) before token expiration to trigger
refresh.
+const TOKEN_EXPIRATION_SAFE_TIME_MILLIS: i64 = 3_600_000;
+
+/// A FileIO wrapper that supports getting data access tokens from a REST
Server.
+///
+/// This struct handles:
+/// - Token caching with expiration detection
+/// - Automatic token refresh via `RESTApi::load_table_token`
+/// - Merging token credentials into catalog options to build the underlying
`FileIO`
+pub struct RESTTokenFileIO {
+ /// Table identifier for token requests.
+ identifier: Identifier,
+ /// Table path (e.g. "oss://bucket/warehouse/db.db/table").
+ path: String,
+ /// Catalog options used to build FileIO and create RESTApi.
+ catalog_options: Options,
+ /// Lazily-initialized REST API client for token refresh.
+ /// Created on first token refresh and reused for subsequent refreshes.
+ api: OnceCell<RESTApi>,
+ /// Cached token with RwLock for concurrent access.
+ token: RwLock<Option<RESTToken>>,
+}
+
+impl RESTTokenFileIO {
+ /// Create a new RESTTokenFileIO.
+ ///
+ /// # Arguments
+ /// * `identifier` - Table identifier for token requests.
+ /// * `path` - Table path for FileIO construction.
+ /// * `catalog_options` - Catalog options for RESTApi and FileIO.
+ pub fn new(identifier: Identifier, path: String, catalog_options: Options)
-> Self {
+ Self {
+ identifier,
+ path,
+ catalog_options,
+ api: OnceCell::new(),
+ token: RwLock::new(None),
+ }
+ }
+
+ /// Build a `FileIO` instance with the current token merged into options.
+ ///
+ /// This method:
+ /// 1. Refreshes the token if expired or not yet obtained.
+ /// 2. Merges token credentials into catalog options.
+ /// 3. Builds a `FileIO` from the merged options.
+ ///
+ /// The returned `FileIO` embeds the token captured at build time and is
+ /// not refreshed automatically; after the token expires, callers must
+ /// obtain a fresh one via a new `get_table` call.
+ pub async fn build_file_io(&self) -> Result<FileIO> {
+ // Ensure token is fresh
+ self.try_to_refresh_token().await?;
+
+ let token_guard = self.token.read().await;
+ match token_guard.as_ref() {
+ Some(token) => {
+ // Merge catalog options (base) with token credentials
(override)
+ let merged_props =
+ RESTUtil::merge(Some(self.catalog_options.to_map()),
Some(&token.token));
+ // Build FileIO with merged properties
+ let mut builder = FileIO::from_path(&self.path)?;
+ builder = builder.with_props(merged_props);
+ builder.build()
+ }
+ None => {
+ // Defensive fallback: after a successful refresh the token is always Some, so this branch is normally unreachable; build FileIO from path only
+ FileIO::from_path(&self.path)?.build()
+ }
+ }
+ }
+
+ /// Try to refresh the token if it is expired or not yet obtained.
+ async fn try_to_refresh_token(&self) -> Result<()> {
+ // Fast path: check if token is still valid under read lock
+ {
+ let token_guard = self.token.read().await;
+ if let Some(token) = token_guard.as_ref() {
+ if !Self::is_token_expired(token) {
+ return Ok(());
+ }
+ }
+ }
+
+ // Slow path: acquire write lock and check again
+ {
+ let token_guard = self.token.write().await;
+ if let Some(token) = token_guard.as_ref() {
+ if !Self::is_token_expired(token) {
+ return Ok(());
+ }
+ }
+ }
+ // Write lock is dropped before the refresh .await to avoid holding it across I/O; as a trade-off, concurrent callers may refresh redundantly (last write wins)
+
+ // Refresh the token WITHOUT holding the lock
+ let new_token = self.refresh_token().await?;
+
+ // Acquire write lock again to update
+ let mut token_guard = self.token.write().await;
+ *token_guard = Some(new_token);
+ Ok(())
+ }
+
+ /// Refresh the token by calling `RESTApi::load_table_token`.
+ ///
+ /// Lazily creates a `RESTApi` instance on first call and reuses it
+ /// for subsequent refreshes.
+ async fn refresh_token(&self) -> Result<RESTToken> {
+ let api = self
+ .api
+ .get_or_try_init(|| async {
RESTApi::new(self.catalog_options.clone(), false).await })
+ .await?;
+
+ let response = api.load_table_token(&self.identifier).await?;
+
+ let expires_at_millis =
+ response
+ .expires_at_millis
+ .ok_or_else(|| crate::Error::DataInvalid {
+ message: format!(
+ "Token response for table '{}' missing
expires_at_millis",
+ self.identifier.full_name()
+ ),
+ source: None,
+ })?;
+
+ // Merge token with catalog options (e.g. DLF OSS endpoint override)
+ let merged_token =
self.merge_token_with_catalog_options(response.token);
+ Ok(RESTToken::new(merged_token, expires_at_millis))
+ }
+
+ /// Check if a token is expired (within the safe time margin).
+ fn is_token_expired(token: &RESTToken) -> bool {
+ let current_time = std::time::SystemTime::now()
+ .duration_since(std::time::UNIX_EPOCH)
+ .unwrap_or_default()
+ .as_millis() as i64;
+ (token.expire_at_millis - current_time) <
TOKEN_EXPIRATION_SAFE_TIME_MILLIS
+ }
+
+ /// Merge token credentials with catalog options for DLF OSS endpoint
override.
+ fn merge_token_with_catalog_options(
+ &self,
+ token: HashMap<String, String>,
+ ) -> HashMap<String, String> {
+ let mut merged = token;
+ // If catalog options contain a DLF OSS endpoint, override the
standard OSS endpoint
+ if let Some(dlf_oss_endpoint) =
self.catalog_options.get(CatalogOptions::DLF_OSS_ENDPOINT) {
+ if !dlf_oss_endpoint.trim().is_empty() {
+ merged.insert(OSS_ENDPOINT.to_string(),
dlf_oss_endpoint.clone());
+ }
+ }
+ merged
+ }
+}
diff --git a/crates/paimon/src/common/options.rs
b/crates/paimon/src/common/options.rs
index a469a07..792c601 100644
--- a/crates/paimon/src/common/options.rs
+++ b/crates/paimon/src/common/options.rs
@@ -69,6 +69,9 @@ impl CatalogOptions {
/// DLF ECS role name.
pub const DLF_TOKEN_ECS_ROLE_NAME: &'static str =
"dlf.token-ecs-role-name";
+
+ /// DLF OSS endpoint override.
+ pub const DLF_OSS_ENDPOINT: &'static str = "dlf.oss-endpoint";
}
/// Configuration options container.
diff --git a/crates/paimon/src/io/file_io.rs b/crates/paimon/src/io/file_io.rs
index 3b0a4d6..84c134a 100644
--- a/crates/paimon/src/io/file_io.rs
+++ b/crates/paimon/src/io/file_io.rs
@@ -21,9 +21,9 @@ use std::ops::Range;
use std::sync::Arc;
use bytes::Bytes;
-use chrono::{DateTime, Utc};
use opendal::raw::normalize_root;
-use opendal::{Metakey, Operator};
+use opendal::raw::Timestamp;
+use opendal::Operator;
use snafu::ResultExt;
use url::Url;
@@ -122,18 +122,21 @@ impl FileIO {
// use normalize_root to make sure it end with `/`.
let list_path = normalize_root(relative_path);
- // Request ContentLength and LastModified so accessing
meta.content_length() / last_modified()
- let entries = op
- .list_with(&list_path)
- .metakey(Metakey::ContentLength | Metakey::LastModified)
- .await
- .context(IoUnexpectedSnafu {
- message: format!("Failed to list files in '{path}'"),
- })?;
+ let entries = op.list_with(&list_path).await.context(IoUnexpectedSnafu
{
+ message: format!("Failed to list files in '{path}'"),
+ })?;
let mut statuses = Vec::new();
-
+ let list_path_normalized = list_path.trim_start_matches('/');
for entry in entries {
+ // opendal list_with includes the root directory itself as the first entry.
+ // The root entry's path equals list_path (with or without leading slash).
+ // Skip it so callers only see the direct children.
+ let entry_path = entry.path();
+ let entry_path_normalized = entry_path.trim_start_matches('/');
+ if entry_path_normalized == list_path_normalized {
+ continue;
+ }
let meta = entry.metadata();
statuses.push(FileStatus {
size: meta.content_length(),
@@ -152,7 +155,7 @@ impl FileIO {
pub async fn exists(&self, path: &str) -> Result<bool> {
let (op, relative_path) = self.storage.create(path)?;
- op.is_exist(relative_path).await.context(IoUnexpectedSnafu {
+ op.exists(relative_path).await.context(IoUnexpectedSnafu {
message: format!("Failed to check existence of '{path}'"),
})
}
@@ -285,7 +288,8 @@ impl FileWrite for opendal::Writer {
}
async fn close(&mut self) -> crate::Result<()> {
- Ok(opendal::Writer::close(self).await?)
+ opendal::Writer::close(self).await?;
+ Ok(())
}
}
@@ -294,7 +298,7 @@ pub struct FileStatus {
pub size: u64,
pub is_dir: bool,
pub path: String,
- pub last_modified: Option<DateTime<Utc>>,
+ pub last_modified: Option<Timestamp>,
}
#[derive(Debug)]
@@ -310,10 +314,7 @@ impl InputFile {
}
pub async fn exists(&self) -> crate::Result<bool> {
- Ok(self
- .op
- .is_exist(&self.path[self.relative_path_pos..])
- .await?)
+ Ok(self.op.exists(&self.path[self.relative_path_pos..]).await?)
}
pub async fn metadata(&self) -> crate::Result<FileStatus> {
@@ -353,10 +354,7 @@ impl OutputFile {
}
pub async fn exists(&self) -> crate::Result<bool> {
- Ok(self
- .op
- .is_exist(&self.path[self.relative_path_pos..])
- .await?)
+ Ok(self.op.exists(&self.path[self.relative_path_pos..]).await?)
}
pub fn to_input_file(self) -> InputFile {
diff --git a/crates/paimon/src/io/mod.rs b/crates/paimon/src/io/mod.rs
index 226aa72..92a909a 100644
--- a/crates/paimon/src/io/mod.rs
+++ b/crates/paimon/src/io/mod.rs
@@ -32,7 +32,7 @@ mod storage_memory;
use storage_memory::*;
#[cfg(feature = "storage-oss")]
-mod storage_oss;
+pub(crate) mod storage_oss;
#[cfg(feature = "storage-oss")]
use storage_oss::*;
diff --git a/crates/paimon/src/io/storage_oss.rs b/crates/paimon/src/io/storage_oss.rs
index 884b0a7..7884894 100644
--- a/crates/paimon/src/io/storage_oss.rs
+++ b/crates/paimon/src/io/storage_oss.rs
@@ -27,23 +27,29 @@ use crate::Result;
/// Configuration key for OSS endpoint.
///
/// Compatible with paimon-java's `fs.oss.endpoint`.
-const OSS_ENDPOINT: &str = "fs.oss.endpoint";
+pub(crate) const OSS_ENDPOINT: &str = "fs.oss.endpoint";
/// Configuration key for OSS access key ID.
///
/// Compatible with paimon-java's `fs.oss.accessKeyId`.
-const OSS_ACCESS_KEY_ID: &str = "fs.oss.accessKeyId";
+pub(crate) const OSS_ACCESS_KEY_ID: &str = "fs.oss.accessKeyId";
/// Configuration key for OSS access key secret.
///
/// Compatible with paimon-java's `fs.oss.accessKeySecret`.
-const OSS_ACCESS_KEY_SECRET: &str = "fs.oss.accessKeySecret";
+pub(crate) const OSS_ACCESS_KEY_SECRET: &str = "fs.oss.accessKeySecret";
+
+/// Configuration key for OSS STS security token (optional).
+///
+/// Compatible with paimon-java's `fs.oss.securityToken`.
+/// Required when using STS temporary credentials (e.g. from REST data tokens).
+pub(crate) const OSS_SECURITY_TOKEN: &str = "fs.oss.securityToken";
/// Parse paimon catalog options into an [`OssConfig`].
///
-/// Extracts OSS-related configuration keys (endpoint, access key, secret key)
-/// from the provided properties map and maps them to the corresponding
-/// [`OssConfig`] fields.
+/// Extracts OSS-related configuration keys (endpoint, access key, secret key,
+/// and optional security token) from the provided properties map and maps them
+/// to the corresponding [`OssConfig`] fields.
///
/// Returns an error if any required configuration key is missing.
pub(crate) fn oss_config_parse(mut props: HashMap<String, String>) -> Result<OssConfig> {
@@ -73,6 +79,7 @@ pub(crate) fn oss_config_parse(mut props: HashMap<String, String>) -> Result<Oss
})?,
);
+ cfg.security_token = props.remove(OSS_SECURITY_TOKEN);
Ok(cfg)
}
diff --git a/crates/paimon/tests/mock_server.rs b/crates/paimon/tests/mock_server.rs
index def0ada..e5e9750 100644
--- a/crates/paimon/tests/mock_server.rs
+++ b/crates/paimon/tests/mock_server.rs
@@ -24,8 +24,8 @@ use axum::{
extract::{Extension, Json, Path, Query},
http::StatusCode,
response::IntoResponse,
- routing::get,
- Router,
+ routing::{get, post},
+ serve, Router,
};
use serde_json::json;
use std::collections::{HashMap, HashSet};
@@ -570,6 +570,44 @@ impl RESTServer {
});
}
+ /// Add a table with schema and path to the server state.
+ ///
+ /// This is needed for `RESTCatalog::get_table` which requires
+ /// the response to contain `schema` and `path`.
+ #[allow(dead_code)]
+ pub fn add_table_with_schema(
+ &self,
+ database: &str,
+ table: &str,
+ schema: paimon::spec::Schema,
+ path: &str,
+ ) {
+ let mut s = self.inner.lock().unwrap();
+ s.databases.entry(database.to_string()).or_insert_with(|| {
+ GetDatabaseResponse::new(
+ Some(database.to_string()),
+ Some(database.to_string()),
+ None,
+ HashMap::new(),
+ AuditRESTResponse::new(None, None, None, None, None),
+ )
+ });
+
+ let key = format!("{}.{}", database, table);
+ s.tables.insert(
+ key,
+ GetTableResponse::new(
+ Some(table.to_string()),
+ Some(table.to_string()),
+ Some(path.to_string()),
+ Some(true),
+ Some(0),
+ Some(schema),
+ AuditRESTResponse::new(None, None, None, None, None),
+ ),
+ );
+ }
+
/// Add a no-permission table to the server state.
#[allow(dead_code)]
pub fn add_no_permission_table(&self, database: &str, table: &str) {
@@ -696,7 +734,7 @@ pub async fn start_mock_server(
)
.route(
&format!("{prefix}/tables/rename"),
- axum::routing::post(RESTServer::rename_table),
+ post(RESTServer::rename_table),
)
// ECS metadata endpoints (for token loader testing)
.route(
@@ -715,7 +753,7 @@ pub async fn start_mock_server(
let addr = listener.local_addr().unwrap();
let server_handle = tokio::spawn(async move {
- if let Err(e) = axum::serve(listener, app.into_make_service()).await {
+ if let Err(e) = serve(listener, app.into_make_service()).await {
eprintln!("mock server error: {e}");
}
});
diff --git a/crates/paimon/tests/rest_api_test.rs b/crates/paimon/tests/rest_api_test.rs
index 753f5d1..74784b3 100644
--- a/crates/paimon/tests/rest_api_test.rs
+++ b/crates/paimon/tests/rest_api_test.rs
@@ -73,7 +73,7 @@ async fn setup_test_server(initial_dbs: Vec<&str>) -> TestContext {
// ==================== Database Tests ====================
#[tokio::test]
async fn test_list_databases() {
- let mut ctx = setup_test_server(vec!["default", "test_db1", "prod_db"]).await;
+ let ctx = setup_test_server(vec!["default", "test_db1", "prod_db"]).await;
let dbs = ctx.api.list_databases().await.unwrap();
@@ -84,7 +84,7 @@ async fn test_list_databases() {
#[tokio::test]
async fn test_create_database() {
- let mut ctx = setup_test_server(vec!["default"]).await;
+ let ctx = setup_test_server(vec!["default"]).await;
// Create new database
let result = ctx.api.create_database("new_db", None).await;
@@ -101,7 +101,7 @@ async fn test_create_database() {
#[tokio::test]
async fn test_get_database() {
- let mut ctx = setup_test_server(vec!["default"]).await;
+ let ctx = setup_test_server(vec!["default"]).await;
let db_resp = ctx.api.get_database("default").await.unwrap();
assert_eq!(db_resp.name, Some("default".to_string()));
@@ -160,7 +160,7 @@ async fn test_error_responses_status_mapping() {
#[tokio::test]
async fn test_alter_database() {
- let mut ctx = setup_test_server(vec!["default"]).await;
+ let ctx = setup_test_server(vec!["default"]).await;
// Alter database with updates
let mut updates = HashMap::new();
@@ -189,7 +189,7 @@ async fn test_alter_database() {
#[tokio::test]
async fn test_alter_database_not_found() {
- let mut ctx = setup_test_server(vec!["default"]).await;
+ let ctx = setup_test_server(vec!["default"]).await;
let result = ctx
.api
@@ -203,7 +203,7 @@ async fn test_alter_database_not_found() {
#[tokio::test]
async fn test_drop_database() {
- let mut ctx = setup_test_server(vec!["default", "to_drop"]).await;
+ let ctx = setup_test_server(vec!["default", "to_drop"]).await;
// Verify database exists
let dbs = ctx.api.list_databases().await.unwrap();
@@ -227,7 +227,7 @@ async fn test_drop_database() {
#[tokio::test]
async fn test_drop_database_no_permission() {
- let mut ctx = setup_test_server(vec!["default"]).await;
+ let ctx = setup_test_server(vec!["default"]).await;
ctx.server.add_no_permission_database("secret");
let result = ctx.api.drop_database("secret").await;
@@ -240,7 +240,7 @@ async fn test_drop_database_no_permission() {
#[tokio::test]
async fn test_list_tables_and_get_table() {
- let mut ctx = setup_test_server(vec!["default"]).await;
+ let ctx = setup_test_server(vec!["default"]).await;
// Add tables
ctx.server.add_table("default", "table1");
@@ -262,7 +262,7 @@ async fn test_list_tables_and_get_table() {
#[tokio::test]
async fn test_get_table_not_found() {
- let mut ctx = setup_test_server(vec!["default"]).await;
+ let ctx = setup_test_server(vec!["default"]).await;
let result = ctx
.api
@@ -273,7 +273,7 @@ async fn test_get_table_not_found() {
#[tokio::test]
async fn test_list_tables_empty_database() {
- let mut ctx = setup_test_server(vec!["default"]).await;
+ let ctx = setup_test_server(vec!["default"]).await;
let tables = ctx.api.list_tables("default").await.unwrap();
assert!(
@@ -284,7 +284,7 @@ async fn test_list_tables_empty_database() {
#[tokio::test]
async fn test_multiple_databases_with_tables() {
- let mut ctx = setup_test_server(vec!["db1", "db2"]).await;
+ let ctx = setup_test_server(vec!["db1", "db2"]).await;
// Add tables to different databases
ctx.server.add_table("db1", "table1_db1");
@@ -305,7 +305,7 @@ async fn test_multiple_databases_with_tables() {
#[tokio::test]
async fn test_create_table() {
- let mut ctx = setup_test_server(vec!["default"]).await;
+ let ctx = setup_test_server(vec!["default"]).await;
// Create a simple schema using builder
use paimon::spec::{DataType, Schema};
@@ -339,7 +339,7 @@ async fn test_create_table() {
#[tokio::test]
async fn test_drop_table() {
- let mut ctx = setup_test_server(vec!["default"]).await;
+ let ctx = setup_test_server(vec!["default"]).await;
// Add a table
ctx.server.add_table("default", "table_to_drop");
@@ -369,7 +369,7 @@ async fn test_drop_table() {
#[tokio::test]
async fn test_drop_table_no_permission() {
- let mut ctx = setup_test_server(vec!["default"]).await;
+ let ctx = setup_test_server(vec!["default"]).await;
ctx.server
.add_no_permission_table("default", "secret_table");
@@ -384,7 +384,7 @@ async fn test_drop_table_no_permission() {
#[tokio::test]
async fn test_rename_table() {
- let mut ctx = setup_test_server(vec!["default"]).await;
+ let ctx = setup_test_server(vec!["default"]).await;
// Add a table
ctx.server.add_table("default", "old_table");
diff --git a/crates/paimon/tests/rest_catalog_test.rs b/crates/paimon/tests/rest_catalog_test.rs
new file mode 100644
index 0000000..7ac8536
--- /dev/null
+++ b/crates/paimon/tests/rest_catalog_test.rs
@@ -0,0 +1,453 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Integration tests for RESTCatalog.
+//!
+//! These tests use a mock server to verify the RESTCatalog behavior
+//! through the Catalog trait interface.
+
+use std::collections::HashMap;
+
+use paimon::api::ConfigResponse;
+use paimon::catalog::{Catalog, Identifier, RESTCatalog};
+use paimon::common::Options;
+use paimon::spec::{BigIntType, DataType, Schema, VarCharType};
+
+mod mock_server;
+use mock_server::{start_mock_server, RESTServer};
+
+/// Helper struct to hold test resources.
+struct TestContext {
+ server: RESTServer,
+ catalog: RESTCatalog,
+}
+
+/// Helper function to set up a test environment with RESTCatalog.
+async fn setup_catalog(initial_dbs: Vec<&str>) -> TestContext {
+ let prefix = "mock-test";
+ let mut defaults = HashMap::new();
+ defaults.insert("prefix".to_string(), prefix.to_string());
+ let config = ConfigResponse::new(defaults);
+
+ let initial: Vec<String> = initial_dbs.iter().map(|s| s.to_string()).collect();
+ let server = start_mock_server(
+ "test_warehouse".to_string(),
+ "/tmp/test_warehouse".to_string(),
+ config,
+ initial,
+ )
+ .await;
+
+ let url = server.url().expect("Failed to get server URL");
+ let mut options = Options::new();
+ options.set("uri", &url);
+ options.set("warehouse", "test_warehouse");
+ options.set("token.provider", "bear");
+ options.set("token", "test_token");
+
+ let catalog = RESTCatalog::new(options, true)
+ .await
+ .expect("Failed to create RESTCatalog");
+
+ TestContext { server, catalog }
+}
+
+/// Helper to build a simple test schema.
+fn test_schema() -> Schema {
+ Schema::builder()
+ .column("id", DataType::BigInt(BigIntType::new()))
+ .column("name", DataType::VarChar(VarCharType::new(255).unwrap()))
+ .build()
+ .expect("Failed to build schema")
+}
+
+// ==================== Database Tests ====================
+
+#[tokio::test]
+async fn test_catalog_list_databases() {
+ let ctx = setup_catalog(vec!["default", "test_db1", "prod_db"]).await;
+
+ let dbs = ctx.catalog.list_databases().await.unwrap();
+
+ assert!(dbs.contains(&"default".to_string()));
+ assert!(dbs.contains(&"test_db1".to_string()));
+ assert!(dbs.contains(&"prod_db".to_string()));
+}
+
+#[tokio::test]
+async fn test_catalog_create_database() {
+ let ctx = setup_catalog(vec!["default"]).await;
+
+ // Create new database
+ let result = ctx
+ .catalog
+ .create_database("new_db", false, HashMap::new())
+ .await;
+ assert!(result.is_ok(), "failed to create database: {:?}", result);
+
+ // Verify creation
+ let dbs = ctx.catalog.list_databases().await.unwrap();
+ assert!(dbs.contains(&"new_db".to_string()));
+}
+
+#[tokio::test]
+async fn test_catalog_create_database_already_exists() {
+ let ctx = setup_catalog(vec!["default"]).await;
+
+ // Duplicate creation with ignore_if_exists=false should fail
+ let result = ctx
+ .catalog
+ .create_database("default", false, HashMap::new())
+ .await;
+ assert!(
+ result.is_err(),
+ "creating duplicate database should fail when ignore_if_exists=false"
+ );
+}
+
+#[tokio::test]
+async fn test_catalog_create_database_ignore_if_exists() {
+ let ctx = setup_catalog(vec!["default"]).await;
+
+ // Duplicate creation with ignore_if_exists=true should succeed
+ let result = ctx
+ .catalog
+ .create_database("default", true, HashMap::new())
+ .await;
+ assert!(
+ result.is_ok(),
+ "creating duplicate database should succeed when ignore_if_exists=true"
+ );
+}
+
+#[tokio::test]
+async fn test_catalog_drop_database() {
+ let ctx = setup_catalog(vec!["default", "to_drop"]).await;
+
+ // Verify database exists
+ let dbs = ctx.catalog.list_databases().await.unwrap();
+ assert!(dbs.contains(&"to_drop".to_string()));
+
+ // Drop database (cascade=true to skip empty check)
+ let result = ctx.catalog.drop_database("to_drop", false, true).await;
+ assert!(result.is_ok(), "failed to drop database: {:?}", result);
+
+ // Verify database is gone
+ let dbs = ctx.catalog.list_databases().await.unwrap();
+ assert!(!dbs.contains(&"to_drop".to_string()));
+}
+
+#[tokio::test]
+async fn test_catalog_drop_database_not_exists() {
+ let ctx = setup_catalog(vec!["default"]).await;
+
+ // Dropping non-existent database with ignore_if_not_exists=false should fail
+ let result = ctx.catalog.drop_database("non_existent", false, true).await;
+ assert!(
+ result.is_err(),
+ "dropping non-existent database should fail when ignore_if_not_exists=false"
+ );
+}
+
+#[tokio::test]
+async fn test_catalog_drop_database_ignore_if_not_exists() {
+ let ctx = setup_catalog(vec!["default"]).await;
+
+ // Dropping non-existent database with ignore_if_not_exists=true should succeed
+ let result = ctx.catalog.drop_database("non_existent", true, true).await;
+ assert!(
+ result.is_ok(),
+ "dropping non-existent database should succeed when ignore_if_not_exists=true"
+ );
+}
+
+#[tokio::test]
+async fn test_catalog_drop_database_not_empty_no_cascade() {
+ let ctx = setup_catalog(vec!["default"]).await;
+
+ // Add a table to the database
+ ctx.server.add_table("default", "some_table");
+
+ // Drop database with cascade=false should fail because it's not empty
+ let result = ctx.catalog.drop_database("default", false, false).await;
+ assert!(
+ result.is_err(),
+ "dropping non-empty database should fail when cascade=false"
+ );
+}
+
+#[tokio::test]
+async fn test_catalog_drop_database_not_empty_cascade() {
+ let ctx = setup_catalog(vec!["default"]).await;
+
+ // Add a table to the database
+ ctx.server.add_table("default", "some_table");
+
+ // Drop database with cascade=true should succeed
+ let result = ctx.catalog.drop_database("default", false, true).await;
+ assert!(
+ result.is_ok(),
+ "dropping non-empty database should succeed when cascade=true"
+ );
+
+ // Verify database is gone
+ let dbs = ctx.catalog.list_databases().await.unwrap();
+ assert!(!dbs.contains(&"default".to_string()));
+}
+
+// ==================== Table Tests ====================
+
+#[tokio::test]
+async fn test_catalog_list_tables() {
+ let ctx = setup_catalog(vec!["default"]).await;
+
+ // Add tables
+ ctx.server.add_table("default", "table1");
+ ctx.server.add_table("default", "table2");
+
+ // List tables
+ let tables = ctx.catalog.list_tables("default").await.unwrap();
+ assert!(tables.contains(&"table1".to_string()));
+ assert!(tables.contains(&"table2".to_string()));
+}
+
+#[tokio::test]
+async fn test_catalog_list_tables_empty() {
+ let ctx = setup_catalog(vec!["default"]).await;
+
+ let tables = ctx.catalog.list_tables("default").await.unwrap();
+ assert!(
+ tables.is_empty(),
+ "expected empty tables list, got: {:?}",
+ tables
+ );
+}
+
+#[tokio::test]
+async fn test_catalog_get_table() {
+ let ctx = setup_catalog(vec!["default"]).await;
+
+ // Add a table with schema and path so get_table can build a Table object
+ let schema = test_schema();
+ ctx.server.add_table_with_schema(
+ "default",
+ "my_table",
+ schema,
+ "file:///tmp/test_warehouse/default.db/my_table",
+ );
+
+ let identifier = Identifier::new("default", "my_table");
+ let table = ctx.catalog.get_table(&identifier).await;
+ assert!(table.is_ok(), "failed to get table: {:?}", table);
+}
+
+#[tokio::test]
+async fn test_catalog_get_table_not_found() {
+ let ctx = setup_catalog(vec!["default"]).await;
+
+ let identifier = Identifier::new("default", "non_existent");
+ let result = ctx.catalog.get_table(&identifier).await;
+ assert!(result.is_err(), "getting non-existent table should fail");
+}
+
+#[tokio::test]
+async fn test_catalog_create_table() {
+ let ctx = setup_catalog(vec!["default"]).await;
+
+ let schema = test_schema();
+ let identifier = Identifier::new("default", "new_table");
+
+ let result = ctx.catalog.create_table(&identifier, schema, false).await;
+ assert!(result.is_ok(), "failed to create table: {:?}", result);
+
+ // Verify table exists
+ let tables = ctx.catalog.list_tables("default").await.unwrap();
+ assert!(tables.contains(&"new_table".to_string()));
+}
+
+#[tokio::test]
+async fn test_catalog_create_table_already_exists() {
+ let ctx = setup_catalog(vec!["default"]).await;
+
+ // Add a table first
+ ctx.server.add_table("default", "existing_table");
+
+ let schema = test_schema();
+ let identifier = Identifier::new("default", "existing_table");
+
+ // Create with ignore_if_exists=false should fail
+ let result = ctx.catalog.create_table(&identifier, schema, false).await;
+ assert!(
+ result.is_err(),
+ "creating duplicate table should fail when ignore_if_exists=false"
+ );
+}
+
+#[tokio::test]
+async fn test_catalog_create_table_ignore_if_exists() {
+ let ctx = setup_catalog(vec!["default"]).await;
+
+ // Add a table first
+ ctx.server.add_table("default", "existing_table");
+
+ let schema = test_schema();
+ let identifier = Identifier::new("default", "existing_table");
+
+ // Create with ignore_if_exists=true should succeed
+ let result = ctx.catalog.create_table(&identifier, schema, true).await;
+ assert!(
+ result.is_ok(),
+ "creating duplicate table should succeed when ignore_if_exists=true"
+ );
+}
+
+#[tokio::test]
+async fn test_catalog_drop_table() {
+ let ctx = setup_catalog(vec!["default"]).await;
+
+ // Add a table
+ ctx.server.add_table("default", "table_to_drop");
+
+ let identifier = Identifier::new("default", "table_to_drop");
+
+ // Drop table
+ let result = ctx.catalog.drop_table(&identifier, false).await;
+ assert!(result.is_ok(), "failed to drop table: {:?}", result);
+
+ // Verify table is gone
+ let tables = ctx.catalog.list_tables("default").await.unwrap();
+ assert!(!tables.contains(&"table_to_drop".to_string()));
+}
+
+#[tokio::test]
+async fn test_catalog_drop_table_not_found() {
+ let ctx = setup_catalog(vec!["default"]).await;
+
+ let identifier = Identifier::new("default", "non_existent");
+
+ // Drop with ignore_if_not_exists=false should fail
+ let result = ctx.catalog.drop_table(&identifier, false).await;
+ assert!(
+ result.is_err(),
+ "dropping non-existent table should fail when ignore_if_not_exists=false"
+ );
+}
+
+#[tokio::test]
+async fn test_catalog_drop_table_ignore_if_not_exists() {
+ let ctx = setup_catalog(vec!["default"]).await;
+
+ let identifier = Identifier::new("default", "non_existent");
+
+ // Drop with ignore_if_not_exists=true should succeed
+ let result = ctx.catalog.drop_table(&identifier, true).await;
+ assert!(
+ result.is_ok(),
+ "dropping non-existent table should succeed when ignore_if_not_exists=true"
+ );
+}
+
+// ==================== Rename Table Tests ====================
+
+#[tokio::test]
+async fn test_catalog_rename_table() {
+ let ctx = setup_catalog(vec!["default"]).await;
+
+ // Add a table
+ ctx.server.add_table("default", "old_table");
+
+ let from = Identifier::new("default", "old_table");
+ let to = Identifier::new("default", "new_table");
+
+ // Rename table
+ let result = ctx.catalog.rename_table(&from, &to, false).await;
+ assert!(result.is_ok(), "failed to rename table: {:?}", result);
+
+ // Verify old table is gone and new table exists
+ let tables = ctx.catalog.list_tables("default").await.unwrap();
+ assert!(!tables.contains(&"old_table".to_string()));
+ assert!(tables.contains(&"new_table".to_string()));
+}
+
+#[tokio::test]
+async fn test_catalog_rename_table_not_found() {
+ let ctx = setup_catalog(vec!["default"]).await;
+
+ let from = Identifier::new("default", "non_existent");
+ let to = Identifier::new("default", "new_name");
+
+ // Rename with ignore_if_not_exists=false should fail
+ let result = ctx.catalog.rename_table(&from, &to, false).await;
+ assert!(
+ result.is_err(),
+ "renaming non-existent table should fail when ignore_if_not_exists=false"
+ );
+}
+
+#[tokio::test]
+async fn test_catalog_rename_table_ignore_if_not_exists() {
+ let ctx = setup_catalog(vec!["default"]).await;
+
+ let from = Identifier::new("default", "non_existent");
+ let to = Identifier::new("default", "new_name");
+
+ // Rename with ignore_if_not_exists=true should succeed
+ let result = ctx.catalog.rename_table(&from, &to, true).await;
+ assert!(
+ result.is_ok(),
+ "renaming non-existent table should succeed when ignore_if_not_exists=true"
+ );
+}
+
+// ==================== Alter Table Tests ====================
+
+#[tokio::test]
+async fn test_catalog_alter_table_unsupported() {
+ let ctx = setup_catalog(vec!["default"]).await;
+
+ let identifier = Identifier::new("default", "some_table");
+
+ // alter_table should return Unsupported error
+ let result = ctx.catalog.alter_table(&identifier, vec![], false).await;
+ assert!(
+ result.is_err(),
+ "alter_table should return Unsupported error"
+ );
+}
+
+// ==================== Multiple Databases Tests ====================
+
+#[tokio::test]
+async fn test_catalog_multiple_databases_with_tables() {
+ let ctx = setup_catalog(vec!["db1", "db2"]).await;
+
+ // Add tables to different databases
+ ctx.server.add_table("db1", "table1_db1");
+ ctx.server.add_table("db1", "table2_db1");
+ ctx.server.add_table("db2", "table1_db2");
+
+ // Verify db1 tables
+ let tables_db1 = ctx.catalog.list_tables("db1").await.unwrap();
+ assert_eq!(tables_db1.len(), 2);
+ assert!(tables_db1.contains(&"table1_db1".to_string()));
+ assert!(tables_db1.contains(&"table2_db1".to_string()));
+
+ // Verify db2 tables
+ let tables_db2 = ctx.catalog.list_tables("db2").await.unwrap();
+ assert_eq!(tables_db2.len(), 1);
+ assert!(tables_db2.contains(&"table1_db2".to_string()));
+}