This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 15956bb  feat: implement RESTCatalog with database and table CRUD (#160)
15956bb is described below

commit 15956bbd56be7c0d72f04a99fce7abb54a0887eb
Author: umi <[email protected]>
AuthorDate: Wed Apr 1 21:24:00 2026 +0800

    feat: implement RESTCatalog with database and table CRUD (#160)
---
 crates/integration_tests/Cargo.toml                |   4 +
 crates/integration_tests/tests/read_tables.rs      | 164 ++++++--
 crates/paimon/Cargo.toml                           |   4 +-
 crates/paimon/examples/rest_catalog_example.rs     | 302 ++++++++++++++
 crates/paimon/src/api/api_response.rs              |  11 +
 crates/paimon/src/api/auth/base.rs                 |   9 +-
 crates/paimon/src/api/auth/bearer_provider.rs      |   6 +-
 crates/paimon/src/api/auth/dlf_provider.rs         |  16 +-
 crates/paimon/src/api/auth/factory.rs              |   4 +-
 crates/paimon/src/api/mod.rs                       |   2 +-
 crates/paimon/src/api/rest_api.rs                  |  64 +--
 crates/paimon/src/api/rest_client.rs               |  10 +-
 crates/paimon/src/api/rest_util.rs                 |  26 ++
 .../paimon/src/{io/mod.rs => catalog/database.rs}  |  44 +-
 crates/paimon/src/catalog/filesystem.rs            |  12 +-
 crates/paimon/src/catalog/mod.rs                   |  10 +
 crates/paimon/src/{io => catalog/rest}/mod.rs      |  33 +-
 crates/paimon/src/catalog/rest/rest_catalog.rs     | 386 ++++++++++++++++++
 .../src/{io/mod.rs => catalog/rest/rest_token.rs}  |  41 +-
 .../paimon/src/catalog/rest/rest_token_file_io.rs  | 193 +++++++++
 crates/paimon/src/common/options.rs                |   3 +
 crates/paimon/src/io/file_io.rs                    |  42 +-
 crates/paimon/src/io/mod.rs                        |   2 +-
 crates/paimon/src/io/storage_oss.rs                |  19 +-
 crates/paimon/tests/mock_server.rs                 |  46 ++-
 crates/paimon/tests/rest_api_test.rs               |  30 +-
 crates/paimon/tests/rest_catalog_test.rs           | 453 +++++++++++++++++++++
 27 files changed, 1741 insertions(+), 195 deletions(-)

diff --git a/crates/integration_tests/Cargo.toml 
b/crates/integration_tests/Cargo.toml
index a4753bf..092ad94 100644
--- a/crates/integration_tests/Cargo.toml
+++ b/crates/integration_tests/Cargo.toml
@@ -28,3 +28,7 @@ paimon = { path = "../paimon" }
 arrow-array = { workspace = true }
 tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
 futures = "0.3"
+
+[dev-dependencies]
+serde_json = "1"
+axum = { version = "0.7", features = ["macros", "tokio", "http1", "http2"] }
\ No newline at end of file
diff --git a/crates/integration_tests/tests/read_tables.rs 
b/crates/integration_tests/tests/read_tables.rs
index 568400d..1b0032a 100644
--- a/crates/integration_tests/tests/read_tables.rs
+++ b/crates/integration_tests/tests/read_tables.rs
@@ -19,23 +19,27 @@
 
 use arrow_array::{Int32Array, RecordBatch, StringArray};
 use futures::TryStreamExt;
-use paimon::catalog::Identifier;
+use paimon::api::ConfigResponse;
+use paimon::catalog::{Identifier, RESTCatalog};
+use paimon::common::Options;
+use paimon::spec::{DataType, IntType, Schema, VarCharType};
 use paimon::{Catalog, Error, FileSystemCatalog, Plan};
-use std::collections::HashSet;
+use std::collections::{HashMap, HashSet};
+
+#[path = "../../paimon/tests/mock_server.rs"]
+mod mock_server;
+use mock_server::start_mock_server;
 
 fn get_test_warehouse() -> String {
     std::env::var("PAIMON_TEST_WAREHOUSE").unwrap_or_else(|_| 
"/tmp/paimon-warehouse".to_string())
 }
 
-async fn scan_and_read(table_name: &str) -> (Plan, Vec<RecordBatch>) {
-    scan_and_read_with_projection(table_name, None).await
-}
-
-async fn scan_and_read_with_projection(
+async fn scan_and_read<C: Catalog + ?Sized>(
+    catalog: &C,
     table_name: &str,
     projection: Option<&[&str]>,
 ) -> (Plan, Vec<RecordBatch>) {
-    let table = get_test_table(table_name).await;
+    let table = get_table_from_catalog(catalog, table_name).await;
 
     let mut read_builder = table.new_read_builder();
     if let Some(cols) = projection {
@@ -60,6 +64,30 @@ async fn scan_and_read_with_projection(
     (plan, batches)
 }
 
+async fn get_table_from_catalog<C: Catalog + ?Sized>(
+    catalog: &C,
+    table_name: &str,
+) -> paimon::Table {
+    let identifier = Identifier::new("default", table_name);
+    catalog
+        .get_table(&identifier)
+        .await
+        .expect("Failed to get table")
+}
+
+fn create_file_system_catalog() -> FileSystemCatalog {
+    let warehouse = get_test_warehouse();
+    FileSystemCatalog::new(warehouse).expect("Failed to create 
FileSystemCatalog")
+}
+
+async fn scan_and_read_with_fs_catalog(
+    table_name: &str,
+    projection: Option<&[&str]>,
+) -> (Plan, Vec<RecordBatch>) {
+    let catalog = create_file_system_catalog();
+    scan_and_read(&catalog, table_name, projection).await
+}
+
 fn extract_id_name(batches: &[RecordBatch]) -> Vec<(i32, String)> {
     let mut rows = Vec::new();
     for batch in batches {
@@ -81,7 +109,7 @@ fn extract_id_name(batches: &[RecordBatch]) -> Vec<(i32, 
String)> {
 
 #[tokio::test]
 async fn test_read_log_table() {
-    let (plan, batches) = scan_and_read("simple_log_table").await;
+    let (plan, batches) = scan_and_read_with_fs_catalog("simple_log_table", 
None).await;
 
     // Non-partitioned table: partition should be a valid arity=0 BinaryRow
     // deserialized from manifest bytes, not a stub without backing data.
@@ -105,7 +133,7 @@ async fn test_read_log_table() {
 
 #[tokio::test]
 async fn test_read_dv_primary_key_table() {
-    let (_, batches) = scan_and_read("simple_dv_pk_table").await;
+    let (_, batches) = scan_and_read_with_fs_catalog("simple_dv_pk_table", 
None).await;
     let actual = extract_id_name(&batches);
     let expected = vec![
         (1, "alice-v2".to_string()),
@@ -123,7 +151,7 @@ async fn test_read_dv_primary_key_table() {
 
 #[tokio::test]
 async fn test_read_partitioned_log_table() {
-    let (plan, batches) = scan_and_read("partitioned_log_table").await;
+    let (plan, batches) = 
scan_and_read_with_fs_catalog("partitioned_log_table", None).await;
 
     let mut seen_partitions: HashSet<String> = HashSet::new();
     for split in plan.splits() {
@@ -176,7 +204,7 @@ async fn test_read_partitioned_log_table() {
 
 #[tokio::test]
 async fn test_read_multi_partitioned_log_table() {
-    let (plan, batches) = scan_and_read("multi_partitioned_log_table").await;
+    let (plan, batches) = 
scan_and_read_with_fs_catalog("multi_partitioned_log_table", None).await;
 
     let mut seen_partitions: HashSet<(String, i32)> = HashSet::new();
     for split in plan.splits() {
@@ -244,7 +272,7 @@ async fn test_read_multi_partitioned_log_table() {
 
 #[tokio::test]
 async fn test_read_partitioned_dv_pk_table() {
-    let (plan, batches) = scan_and_read("partitioned_dv_pk_table").await;
+    let (plan, batches) = 
scan_and_read_with_fs_catalog("partitioned_dv_pk_table", None).await;
 
     // Verify partition metadata on each split.
     let mut seen_partitions: HashSet<String> = HashSet::new();
@@ -298,20 +326,10 @@ async fn test_read_partitioned_dv_pk_table() {
     );
 }
 
-async fn get_test_table(table_name: &str) -> paimon::Table {
-    let warehouse = get_test_warehouse();
-    let catalog = FileSystemCatalog::new(warehouse).expect("Failed to create 
catalog");
-    let identifier = Identifier::new("default", table_name);
-    catalog
-        .get_table(&identifier)
-        .await
-        .expect("Failed to get table")
-}
-
 #[tokio::test]
 async fn test_read_with_column_projection() {
     let (_, batches) =
-        scan_and_read_with_projection("partitioned_log_table", Some(&["name", 
"id"])).await;
+        scan_and_read_with_fs_catalog("partitioned_log_table", Some(&["name", 
"id"])).await;
 
     // Verify that output schema preserves caller-specified column order.
     for batch in &batches {
@@ -340,7 +358,8 @@ async fn test_read_with_column_projection() {
 
 #[tokio::test]
 async fn test_read_projection_empty() {
-    let table = get_test_table("simple_log_table").await;
+    let catalog = create_file_system_catalog();
+    let table = get_table_from_catalog(&catalog, "simple_log_table").await;
 
     let mut read_builder = table.new_read_builder();
     read_builder.with_projection(&[]);
@@ -378,10 +397,10 @@ async fn test_read_projection_empty() {
         );
     }
 }
-
 #[tokio::test]
 async fn test_read_projection_unknown_column() {
-    let table = get_test_table("simple_log_table").await;
+    let catalog = create_file_system_catalog();
+    let table = get_table_from_catalog(&catalog, "simple_log_table").await;
 
     let mut read_builder = table.new_read_builder();
     read_builder.with_projection(&["id", "nonexistent_column"]);
@@ -403,7 +422,8 @@ async fn test_read_projection_unknown_column() {
 
 #[tokio::test]
 async fn test_read_projection_all_invalid() {
-    let table = get_test_table("simple_log_table").await;
+    let catalog = create_file_system_catalog();
+    let table = get_table_from_catalog(&catalog, "simple_log_table").await;
 
     let mut read_builder = table.new_read_builder();
     read_builder.with_projection(&["nonexistent_a", "nonexistent_b"]);
@@ -425,7 +445,8 @@ async fn test_read_projection_all_invalid() {
 
 #[tokio::test]
 async fn test_read_projection_duplicate_column() {
-    let table = get_test_table("simple_log_table").await;
+    let catalog = create_file_system_catalog();
+    let table = get_table_from_catalog(&catalog, "simple_log_table").await;
 
     let mut read_builder = table.new_read_builder();
     read_builder.with_projection(&["id", "id"]);
@@ -438,3 +459,88 @@ async fn test_read_projection_duplicate_column() {
         "Expected ConfigInvalid for duplicate projection, got: {err:?}"
     );
 }
+
+// ======================= REST Catalog read tests ===============================
+
+/// Build a simple test schema matching the Spark-provisioned tables (id INT, name VARCHAR).
+fn simple_log_schema() -> Schema {
+    Schema::builder()
+        .column("id", DataType::Int(IntType::new()))
+        .column("name", DataType::VarChar(VarCharType::string_type()))
+        .build()
+        .expect("Failed to build schema")
+}
+
+/// Start a mock REST server backed by Spark-provisioned data on disk,
+/// register the given tables, and return a connected `RESTCatalog`.
+async fn setup_rest_catalog_with_tables(
+    table_configs: &[(&str, &str, Schema)],
+) -> (mock_server::RESTServer, RESTCatalog) {
+    let catalog_path = get_test_warehouse();
+    // Use a simple warehouse name (no slashes) to avoid URL-encoding issues
+    let warehouse_name = "test_warehouse";
+    let prefix = "mock-test";
+    let mut defaults = HashMap::new();
+    defaults.insert("prefix".to_string(), prefix.to_string());
+    let config = ConfigResponse::new(defaults);
+
+    let server = start_mock_server(
+        warehouse_name.to_string(),
+        catalog_path.clone(),
+        config,
+        vec!["default".to_string()],
+    )
+    .await;
+
+    // Register each table with its schema and the real on-disk path
+    for (database, table_name, schema) in table_configs {
+        let table_path = format!("{}/{}.db/{}", catalog_path, database, 
table_name);
+        server.add_table_with_schema(database, table_name, schema.clone(), 
&table_path);
+    }
+
+    let url = server.url().expect("Failed to get server URL");
+    let mut options = Options::new();
+    options.set("uri", &url);
+    options.set("warehouse", warehouse_name);
+    options.set("token.provider", "bear");
+    options.set("token", "test_token");
+
+    let catalog = RESTCatalog::new(options, true)
+        .await
+        .expect("Failed to create RESTCatalog");
+
+    (server, catalog)
+}
+
+/// Test reading an append-only (log) table via REST catalog backed by mock server.
+///
+/// The mock server returns table metadata pointing to Spark-provisioned data on disk.
+#[tokio::test]
+async fn test_rest_catalog_read_append_table() {
+    let table_name = "simple_log_table";
+    let (_server, catalog) =
+        setup_rest_catalog_with_tables(&[("default", table_name, 
simple_log_schema())]).await;
+
+    let (plan, batches) = scan_and_read(&catalog, table_name, None).await;
+
+    assert!(
+        !plan.splits().is_empty(),
+        "REST append table should have at least one split"
+    );
+
+    assert!(
+        !batches.is_empty(),
+        "REST append table should produce at least one batch"
+    );
+
+    let actual = extract_id_name(&batches);
+    let expected = vec![
+        (1, "alice".to_string()),
+        (2, "bob".to_string()),
+        (3, "carol".to_string()),
+    ];
+    assert_eq!(
+        actual, expected,
+        "REST catalog append table rows should match expected values"
+    );
+}
diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml
index 8ea6f9e..c78d35b 100644
--- a/crates/paimon/Cargo.toml
+++ b/crates/paimon/Cargo.toml
@@ -27,7 +27,7 @@ license.workspace = true
 version.workspace = true
 
 [features]
-default = ["storage-memory", "storage-fs"]
+default = ["storage-memory", "storage-fs", "storage-oss"]
 storage-all = ["storage-memory", "storage-fs", "storage-oss", "storage-s3"]
 
 storage-memory = ["opendal/services-memory"]
@@ -49,7 +49,7 @@ serde_with = "3.9.0"
 serde_repr = "0.1"
 snafu = "0.8.3"
 typed-builder = "^0.19"
-opendal = { version = "0.49", features = ["services-fs"] }
+opendal = { version = "0.55", features = ["services-fs"] }
 pretty_assertions = "1"
 apache-avro = { version = "0.17", features = ["snappy", "zstandard"] }
 indexmap = "2.5.0"
diff --git a/crates/paimon/examples/rest_catalog_example.rs 
b/crates/paimon/examples/rest_catalog_example.rs
new file mode 100644
index 0000000..1e0d6fa
--- /dev/null
+++ b/crates/paimon/examples/rest_catalog_example.rs
@@ -0,0 +1,302 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Example: REST Catalog Operations (Complete)
+//!
+//! This example demonstrates how to use `RESTCatalog` for:
+//! 1. Database operations (create, list, get, drop)
+//! 2. Table operations (create, list, get, rename, drop)
+//! 3. Data reading from append-only tables
+//!
+//! # Usage
+//! ```bash
+//! # With DLF authentication:
+//! DLF_ACCESS_KEY_ID=xxx DLF_ACCESS_KEY_SECRET=yyy \
+//!   cargo run -p paimon --example rest_catalog_example
+//!
+//! # With Bearer token authentication:
+//! PAIMON_REST_TOKEN=zzz \
+//!   cargo run -p paimon --example rest_catalog_example
+//! ```
+
+use std::collections::HashMap;
+
+use futures::TryStreamExt;
+
+use paimon::catalog::{Catalog, Identifier, RESTCatalog};
+use paimon::common::{CatalogOptions, Options};
+use paimon::spec::{DataType, IntType, Schema, VarCharType};
+
+/// Create a simple test schema with `id` (INT) and `name` (VARCHAR) columns.
+fn create_test_schema() -> Schema {
+    Schema::builder()
+        .column("id", DataType::Int(IntType::new()))
+        .column("name", DataType::VarChar(VarCharType::new(255).unwrap()))
+        .build()
+        .expect("Failed to build schema")
+}
+
+/// Format a single cell value from an Arrow array at the given row index.
+/// Supports INT (Int32), BIGINT (Int64), and VARCHAR (String/LargeString).
+fn array_value_to_string(array: &dyn arrow_array::Array, row: usize) -> String 
{
+    use arrow_array::*;
+
+    if array.is_null(row) {
+        return "null".to_string();
+    }
+
+    if let Some(arr) = array.as_any().downcast_ref::<Int32Array>() {
+        return arr.value(row).to_string();
+    }
+    if let Some(arr) = array.as_any().downcast_ref::<Int64Array>() {
+        return arr.value(row).to_string();
+    }
+    if let Some(arr) = array.as_any().downcast_ref::<StringArray>() {
+        return arr.value(row).to_string();
+    }
+    if let Some(arr) = array.as_any().downcast_ref::<LargeStringArray>() {
+        return arr.value(row).to_string();
+    }
+
+    format!("<unsupported type: {:?}>", array.data_type())
+}
+
+#[tokio::main]
+async fn main() {
+    // ==================== Configuration ====================
+    let mut options = Options::new();
+
+    // Basic configuration — replace with your actual server URL and warehouse
+    options.set(CatalogOptions::METASTORE, "rest");
+    options.set(CatalogOptions::WAREHOUSE, "paimon_catalog");
+    options.set(CatalogOptions::URI, "http://sample.net/");
+
+    // --- Authentication (choose one) ---
+
+    // DLF authentication (Alibaba Cloud)
+    options.set(CatalogOptions::TOKEN_PROVIDER, "dlf");
+    options.set("dlf.region", "cn-hangzhou");
+    options.set(
+        "dlf.access-key-id",
+        std::env::var("DLF_ACCESS_KEY_ID").expect("DLF_ACCESS_KEY_ID env var 
not set"),
+    );
+    options.set(
+        "dlf.access-key-secret",
+        std::env::var("DLF_ACCESS_KEY_SECRET").expect("DLF_ACCESS_KEY_SECRET 
env var not set"),
+    );
+
+    // ==================== Create RESTCatalog ====================
+    println!("Creating RESTCatalog instance...");
+    let catalog = match RESTCatalog::new(options, true).await {
+        Ok(catalog) => catalog,
+        Err(err) => {
+            eprintln!("Failed to create RESTCatalog: {}", err);
+            return;
+        }
+    };
+
+    // ==================== Part 1: Database Operations ====================
+    println!("\n=== Part 1: Database Operations ===\n");
+
+    // List databases
+    println!("Listing databases...");
+    match catalog.list_databases().await {
+        Ok(databases) => {
+            println!("Databases found: {:?}", databases);
+            println!("Total count: {}", databases.len());
+        }
+        Err(err) => {
+            eprintln!("Failed to list databases: {}", err);
+        }
+    }
+
+    // Create database
+    println!("\nCreating database 'example_db'...");
+    match catalog
+        .create_database("example_db", false, HashMap::new())
+        .await
+    {
+        Ok(()) => println!("Database created successfully"),
+        Err(err) => eprintln!("Failed to create database: {}", err),
+    }
+
+    // Get database info
+    println!("\nGetting database info for 'example_db'...");
+    match catalog.get_database("example_db").await {
+        Ok(database) => println!("Database: {:?}", database),
+        Err(err) => eprintln!("Failed to get database: {}", err),
+    }
+
+    // ==================== Part 2: Table Operations ====================
+    println!("\n=== Part 2: Table Operations ===\n");
+
+    // Create table
+    let table_identifier = Identifier::new("example_db", "users");
+    println!("Creating table '{}'...", table_identifier);
+    let schema = create_test_schema();
+    match catalog.create_table(&table_identifier, schema, false).await {
+        Ok(()) => println!("Table created successfully"),
+        Err(err) => eprintln!("Failed to create table: {}", err),
+    }
+
+    // List tables
+    println!("\nListing tables in 'example_db'...");
+    match catalog.list_tables("example_db").await {
+        Ok(tables) => {
+            println!("Tables found: {:?}", tables);
+        }
+        Err(err) => {
+            eprintln!("Failed to list tables: {}", err);
+        }
+    }
+
+    // Get table info
+    println!("\nGetting table info for '{}'...", table_identifier);
+    match catalog.get_table(&table_identifier).await {
+        Ok(table) => {
+            println!("Table location: {}", table.location());
+            println!("Table schema fields: {:?}", table.schema().fields());
+        }
+        Err(err) => eprintln!("Failed to get table: {}", err),
+    }
+
+    // Rename table
+    let renamed_identifier = Identifier::new("example_db", "users_renamed");
+    println!(
+        "\nRenaming table '{}' to '{}'...",
+        table_identifier, renamed_identifier
+    );
+    match catalog
+        .rename_table(&table_identifier, &renamed_identifier, false)
+        .await
+    {
+        Ok(()) => println!("Table renamed successfully"),
+        Err(err) => eprintln!("Failed to rename table: {}", err),
+    }
+
+    // ==================== Part 3: Read Data from Existing Table ====================
+    println!("\n=== Part 3: Read Data from Existing Table ===\n");
+
+    // Try to read from an existing table (example_db.users_renamed)
+    // This table must already exist on the REST catalog server
+    let read_table_identifier = Identifier::new("example_db", "users_renamed");
+    println!(
+        "Attempting to read from table '{}'...",
+        read_table_identifier
+    );
+
+    match catalog.get_table(&read_table_identifier).await {
+        Ok(table) => {
+            println!("Table retrieved successfully");
+            println!("  Location: {}", table.location());
+            println!("  Schema fields: {:?}", table.schema().fields());
+
+            // Scan table
+            println!("\nScanning table...");
+            let read_builder = table.new_read_builder();
+            let scan = read_builder.new_scan();
+
+            match scan.plan().await {
+                Ok(plan) => {
+                    println!("  Number of splits: {}", plan.splits().len());
+
+                    if plan.splits().is_empty() {
+                        println!("No data splits found — the table may be 
empty.");
+                    } else {
+                        // Read table data
+                        println!("\nReading table data...");
+                        match read_builder.new_read() {
+                            Ok(read) => match read.to_arrow(plan.splits()) {
+                                Ok(stream) => {
+                                    let batches: Vec<_> =
+                                        
stream.try_collect().await.unwrap_or_default();
+                                    println!("Collected {} record batch(es)", 
batches.len());
+
+                                    let mut total_rows = 0;
+                                    for (batch_index, batch) in 
batches.iter().enumerate() {
+                                        let num_rows = batch.num_rows();
+                                        total_rows += num_rows;
+                                        println!(
+                                            "\n--- Batch {} ({} rows, {} 
columns) ---",
+                                            batch_index,
+                                            num_rows,
+                                            batch.num_columns()
+                                        );
+                                        println!("Schema: {}", batch.schema());
+
+                                        // Print up to 10 rows per batch
+                                        let display_rows = num_rows.min(10);
+                                        for row in 0..display_rows {
+                                            let mut row_values = Vec::new();
+                                            for col in 0..batch.num_columns() {
+                                                let column = batch.column(col);
+                                                
row_values.push(array_value_to_string(column, row));
+                                            }
+                                            println!("  Row {}: [{}]", row, 
row_values.join(", "));
+                                        }
+                                        if num_rows > display_rows {
+                                            println!(
+                                                "  ... ({} more rows omitted)",
+                                                num_rows - display_rows
+                                            );
+                                        }
+                                    }
+
+                                    println!("\n=== Read Summary ===");
+                                    println!("Total rows read: {}", 
total_rows);
+                                    println!("Total batches: {}", 
batches.len());
+                                }
+                                Err(err) => {
+                                    eprintln!("Failed to create arrow stream: 
{}", err);
+                                }
+                            },
+                            Err(err) => {
+                                eprintln!("Failed to create table read: {}", 
err);
+                            }
+                        }
+                    }
+                }
+                Err(err) => {
+                    eprintln!("Failed to plan scan: {}", err);
+                }
+            }
+        }
+        Err(err) => {
+            eprintln!(
+                "Failed to get table '{}' (this is expected if the table 
doesn't exist): {}",
+                read_table_identifier, err
+            );
+        }
+    }
+
+    // ==================== Cleanup ====================
+    println!("\n=== Cleanup ===\n");
+    // Drop table
+    println!("\nDropping table '{}'...", renamed_identifier);
+    match catalog.drop_table(&renamed_identifier, false).await {
+        Ok(()) => println!("Table dropped successfully"),
+        Err(err) => eprintln!("Failed to drop table: {}", err),
+    }
+    // Drop database (cascade = true to force drop even if not empty)
+    println!("Dropping database 'example_db'...");
+    match catalog.drop_database("example_db", false, true).await {
+        Ok(()) => println!("Database dropped successfully"),
+        Err(err) => eprintln!("Failed to drop database: {}", err),
+    }
+
+    println!("\nExample completed!");
+}
diff --git a/crates/paimon/src/api/api_response.rs 
b/crates/paimon/src/api/api_response.rs
index e282080..c3169a2 100644
--- a/crates/paimon/src/api/api_response.rs
+++ b/crates/paimon/src/api/api_response.rs
@@ -277,6 +277,17 @@ impl<T> PagedList<T> {
         }
     }
 }
+
+/// Response for getting table token.
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct GetTableTokenResponse {
+    /// Token key-value pairs (e.g. access_key_id, access_key_secret, etc.)
+    pub token: HashMap<String, String>,
+    /// Token expiration time in milliseconds since epoch.
+    pub expires_at_millis: Option<i64>,
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/crates/paimon/src/api/auth/base.rs 
b/crates/paimon/src/api/auth/base.rs
index 7b0ae33..444afe1 100644
--- a/crates/paimon/src/api/auth/base.rs
+++ b/crates/paimon/src/api/auth/base.rs
@@ -75,7 +75,7 @@ impl RESTAuthParameter {
 /// Implement this trait to provide custom authentication mechanisms
 /// for REST API requests.
 #[async_trait]
-pub trait AuthProvider: Send {
+pub trait AuthProvider: Send + Sync {
     /// Merge authentication headers into the base headers.
     ///
     /// # Arguments
@@ -84,7 +84,7 @@ pub trait AuthProvider: Send {
     ///
     /// # Returns
     async fn merge_auth_header(
-        &mut self,
+        &self,
         base_header: HashMap<String, String>,
         parameter: &RESTAuthParameter,
     ) -> Result<HashMap<String, String>>;
@@ -120,10 +120,7 @@ impl RESTAuthFunction {
     ///
     /// # Returns
     /// A HashMap containing the authenticated headers.
-    pub async fn apply(
-        &mut self,
-        parameter: &RESTAuthParameter,
-    ) -> Result<HashMap<String, String>> {
+    pub async fn apply(&self, parameter: &RESTAuthParameter) -> 
Result<HashMap<String, String>> {
         self.auth_provider
             .merge_auth_header(self.init_header.clone(), parameter)
             .await
diff --git a/crates/paimon/src/api/auth/bearer_provider.rs 
b/crates/paimon/src/api/auth/bearer_provider.rs
index 2b9f0e1..ba6d4dd 100644
--- a/crates/paimon/src/api/auth/bearer_provider.rs
+++ b/crates/paimon/src/api/auth/bearer_provider.rs
@@ -46,7 +46,7 @@ impl BearerTokenAuthProvider {
 #[async_trait]
 impl AuthProvider for BearerTokenAuthProvider {
     async fn merge_auth_header(
-        &mut self,
+        &self,
         mut base_header: HashMap<String, String>,
         _parameter: &RESTAuthParameter,
     ) -> crate::Result<HashMap<String, String>> {
@@ -64,7 +64,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_bearer_token_auth() {
-        let mut provider = BearerTokenAuthProvider::new("test-token");
+        let provider = BearerTokenAuthProvider::new("test-token");
         let base_header = HashMap::new();
         let parameter = RESTAuthParameter::for_get("/test", HashMap::new());
 
@@ -81,7 +81,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_bearer_token_with_base_headers() {
-        let mut provider = BearerTokenAuthProvider::new("my-token");
+        let provider = BearerTokenAuthProvider::new("my-token");
         let mut base_header = HashMap::new();
         base_header.insert("Content-Type".to_string(), 
"application/json".to_string());
         let parameter = RESTAuthParameter::for_get("/test", HashMap::new());
diff --git a/crates/paimon/src/api/auth/dlf_provider.rs 
b/crates/paimon/src/api/auth/dlf_provider.rs
index 4f264d8..a18a014 100644
--- a/crates/paimon/src/api/auth/dlf_provider.rs
+++ b/crates/paimon/src/api/auth/dlf_provider.rs
@@ -244,7 +244,7 @@ const TOKEN_EXPIRATION_SAFE_TIME_MILLIS: i64 = 3_600_000;
 /// (ROA v2 HMAC-SHA1).
 pub struct DLFAuthProvider {
     uri: String,
-    token: Option<DLFToken>,
+    token: tokio::sync::Mutex<Option<DLFToken>>,
     token_loader: Option<Arc<dyn DLFTokenLoader>>,
     signer: Box<dyn DLFRequestSigner>,
 }
@@ -279,7 +279,7 @@ impl DLFAuthProvider {
 
         Ok(Self {
             uri,
-            token,
+            token: tokio::sync::Mutex::new(token),
             token_loader,
             signer,
         })
@@ -290,9 +290,11 @@ impl DLFAuthProvider {
     /// If token_loader is configured, this method will:
     /// - Load a new token if current token is None
     /// - Refresh the token if it's about to expire (within TOKEN_EXPIRATION_SAFE_TIME_MILLIS)
-    async fn get_or_refresh_token(&mut self) -> Result<DLFToken> {
+    async fn get_or_refresh_token(&self) -> Result<DLFToken> {
+        let mut token_guard = self.token.lock().await;
+
         if let Some(loader) = &self.token_loader {
-            let need_reload = match &self.token {
+            let need_reload = match &*token_guard {
                 None => true,
                 Some(token) => match token.expiration_at_millis {
                     Some(expiration_at_millis) => {
@@ -305,11 +307,11 @@ impl DLFAuthProvider {
 
             if need_reload {
                 let new_token = loader.load_token().await?;
-                self.token = Some(new_token);
+                *token_guard = Some(new_token);
             }
         }
 
-        self.token.clone().ok_or_else(|| Error::DataInvalid {
+        token_guard.clone().ok_or_else(|| Error::DataInvalid {
             message: "Either token or token_loader must be 
provided".to_string(),
             source: None,
         })
@@ -330,7 +332,7 @@ impl DLFAuthProvider {
 #[async_trait]
 impl AuthProvider for DLFAuthProvider {
     async fn merge_auth_header(
-        &mut self,
+        &self,
         mut base_header: HashMap<String, String>,
         rest_auth_parameter: &RESTAuthParameter,
     ) -> crate::Result<HashMap<String, String>> {
diff --git a/crates/paimon/src/api/auth/factory.rs 
b/crates/paimon/src/api/auth/factory.rs
index 68c2881..fc2a1b5 100644
--- a/crates/paimon/src/api/auth/factory.rs
+++ b/crates/paimon/src/api/auth/factory.rs
@@ -162,7 +162,7 @@ mod tests {
         options.set(CatalogOptions::TOKEN_PROVIDER, "bear");
         options.set(CatalogOptions::TOKEN, "test-token");
 
-        let mut provider = 
AuthProviderFactory::create_auth_provider(&options).unwrap();
+        let provider = 
AuthProviderFactory::create_auth_provider(&options).unwrap();
 
         let base_header = HashMap::new();
         let param = RESTAuthParameter::new("GET", "/test", None, 
HashMap::new());
@@ -202,7 +202,7 @@ mod tests {
         options.set(CatalogOptions::DLF_ACCESS_KEY_ID, "test_key_id");
         options.set(CatalogOptions::DLF_ACCESS_KEY_SECRET, "test_key_secret");
 
-        let mut provider = 
AuthProviderFactory::create_auth_provider(&options).unwrap();
+        let provider = 
AuthProviderFactory::create_auth_provider(&options).unwrap();
 
         let base_header = HashMap::new();
         let param = RESTAuthParameter::new("GET", "/test", None, 
HashMap::new());
diff --git a/crates/paimon/src/api/mod.rs b/crates/paimon/src/api/mod.rs
index 958323e..6307cf8 100644
--- a/crates/paimon/src/api/mod.rs
+++ b/crates/paimon/src/api/mod.rs
@@ -37,7 +37,7 @@ pub use api_request::{
 // Re-export response types
 pub use api_response::{
     AuditRESTResponse, ConfigResponse, ErrorResponse, GetDatabaseResponse, 
GetTableResponse,
-    ListDatabasesResponse, ListTablesResponse, PagedList,
+    GetTableTokenResponse, ListDatabasesResponse, ListTablesResponse, 
PagedList,
 };
 
 // Re-export error types
diff --git a/crates/paimon/src/api/rest_api.rs 
b/crates/paimon/src/api/rest_api.rs
index be09e9a..00a32b8 100644
--- a/crates/paimon/src/api/rest_api.rs
+++ b/crates/paimon/src/api/rest_api.rs
@@ -77,7 +77,6 @@ fn validate_non_empty_multi(values: &[(&str, &str)]) -> 
Result<()> {
 pub struct RESTApi {
     client: HttpClient,
     resource_paths: ResourcePaths,
-    #[allow(dead_code)]
     options: Options,
 }
 
@@ -101,7 +100,7 @@ impl RESTApi {
     ///
     /// # Errors
     /// Returns an error if required options are missing or if config fetch 
fails.
-    pub async fn new(mut options: Options, config_required: bool) -> 
Result<Self> {
+    pub async fn new(options: Options, config_required: bool) -> Result<Self> {
         let uri = options
             .get(CatalogOptions::URI)
             .ok_or_else(|| crate::Error::ConfigInvalid {
@@ -144,17 +143,17 @@ impl RESTApi {
                 .await?;
 
             // Merge config response with options (client config takes 
priority)
-            options = config_response.merge_options(&options);
+            let merged = config_response.merge_options(&options);
 
             // Update base headers from merged options and recreate auth 
function
-            base_headers.extend(RESTUtil::extract_prefix_map(&options, 
Self::HEADER_PREFIX));
+            base_headers.extend(RESTUtil::extract_prefix_map(&merged, 
Self::HEADER_PREFIX));
             // Recreate auth function with updated headers if needed
-            let auth_provider = 
AuthProviderFactory::create_auth_provider(&options)?;
+            let auth_provider = 
AuthProviderFactory::create_auth_provider(&merged)?;
             let rest_auth_function = RESTAuthFunction::new(base_headers, 
auth_provider);
 
             client.set_auth_function(rest_auth_function);
 
-            options
+            merged
         } else {
             options
         };
@@ -168,10 +167,15 @@ impl RESTApi {
         })
     }
 
+    /// Get the options held by this client.
+    ///
+    /// When `new` took the config-fetch branch these are the options merged
+    /// with the server's config response; otherwise they are the options that
+    /// were passed to `new`.
+    pub fn options(&self) -> &Options {
+        &self.options
+    }
+
     // ==================== Database Operations ====================
 
     /// List all databases.
-    pub async fn list_databases(&mut self) -> Result<Vec<String>> {
+    pub async fn list_databases(&self) -> Result<Vec<String>> {
         let mut results = Vec::new();
         let mut page_token: Option<String> = None;
 
@@ -192,7 +196,7 @@ impl RESTApi {
 
     /// List databases with pagination.
     pub async fn list_databases_paged(
-        &mut self,
+        &self,
         max_results: Option<u32>,
         page_token: Option<&str>,
         database_name_pattern: Option<&str>,
@@ -223,9 +227,9 @@ impl RESTApi {
 
     /// Create a new database.
     pub async fn create_database(
-        &mut self,
+        &self,
         name: &str,
-        options: Option<std::collections::HashMap<String, String>>,
+        options: Option<HashMap<String, String>>,
     ) -> Result<()> {
         validate_non_empty(name, "database name")?;
         let path = self.resource_paths.databases();
@@ -235,7 +239,7 @@ impl RESTApi {
     }
 
     /// Get database information.
-    pub async fn get_database(&mut self, name: &str) -> 
Result<GetDatabaseResponse> {
+    pub async fn get_database(&self, name: &str) -> 
Result<GetDatabaseResponse> {
         validate_non_empty(name, "database name")?;
         let path = self.resource_paths.database(name);
         self.client.get(&path, None::<&[(&str, &str)]>).await
@@ -243,10 +247,10 @@ impl RESTApi {
 
     /// Alter database configuration.
     pub async fn alter_database(
-        &mut self,
+        &self,
         name: &str,
         removals: Vec<String>,
-        updates: std::collections::HashMap<String, String>,
+        updates: HashMap<String, String>,
     ) -> Result<()> {
         validate_non_empty(name, "database name")?;
         let path = self.resource_paths.database(name);
@@ -256,7 +260,7 @@ impl RESTApi {
     }
 
     /// Drop a database.
-    pub async fn drop_database(&mut self, name: &str) -> Result<()> {
+    pub async fn drop_database(&self, name: &str) -> Result<()> {
         validate_non_empty(name, "database name")?;
         let path = self.resource_paths.database(name);
         let _resp: serde_json::Value = self.client.delete(&path, 
None::<&[(&str, &str)]>).await?;
@@ -266,7 +270,7 @@ impl RESTApi {
     // ==================== Table Operations ====================
 
     /// List all tables in a database.
-    pub async fn list_tables(&mut self, database: &str) -> Result<Vec<String>> 
{
+    pub async fn list_tables(&self, database: &str) -> Result<Vec<String>> {
         validate_non_empty(database, "database name")?;
 
         let mut results = Vec::new();
@@ -289,7 +293,7 @@ impl RESTApi {
 
     /// List tables with pagination.
     pub async fn list_tables_paged(
-        &mut self,
+        &self,
         database: &str,
         max_results: Option<u32>,
         page_token: Option<&str>,
@@ -329,7 +333,7 @@ impl RESTApi {
     }
 
     /// Create a new table.
-    pub async fn create_table(&mut self, identifier: &Identifier, schema: 
Schema) -> Result<()> {
+    pub async fn create_table(&self, identifier: &Identifier, schema: Schema) 
-> Result<()> {
         let database = identifier.database();
         let table = identifier.object();
         validate_non_empty_multi(&[(database, "database name"), (table, "table 
name")])?;
@@ -340,7 +344,7 @@ impl RESTApi {
     }
 
     /// Get table information.
-    pub async fn get_table(&mut self, identifier: &Identifier) -> 
Result<GetTableResponse> {
+    pub async fn get_table(&self, identifier: &Identifier) -> 
Result<GetTableResponse> {
         let database = identifier.database();
         let table = identifier.object();
         validate_non_empty_multi(&[(database, "database name"), (table, "table 
name")])?;
@@ -349,11 +353,7 @@ impl RESTApi {
     }
 
     /// Rename a table.
-    pub async fn rename_table(
-        &mut self,
-        source: &Identifier,
-        destination: &Identifier,
-    ) -> Result<()> {
+    pub async fn rename_table(&self, source: &Identifier, destination: 
&Identifier) -> Result<()> {
         validate_non_empty_multi(&[
             (source.database(), "source database name"),
             (source.object(), "source table name"),
@@ -367,7 +367,7 @@ impl RESTApi {
     }
 
     /// Drop a table.
-    pub async fn drop_table(&mut self, identifier: &Identifier) -> Result<()> {
+    pub async fn drop_table(&self, identifier: &Identifier) -> Result<()> {
         let database = identifier.database();
         let table = identifier.object();
         validate_non_empty_multi(&[(database, "database name"), (table, "table 
name")])?;
@@ -375,4 +375,20 @@ impl RESTApi {
         let _resp: serde_json::Value = self.client.delete(&path, 
None::<&[(&str, &str)]>).await?;
         Ok(())
     }
+
+    // ==================== Token Operations ====================
+
+    /// Load table token for data access.
+    ///
+    /// Corresponds to Python `RESTApi.load_table_token`.
+    ///
+    /// # Errors
+    /// Returns the error produced by `validate_non_empty_multi` for the
+    /// database/table names, or any error from the underlying GET request.
+    pub async fn load_table_token(
+        &self,
+        identifier: &Identifier,
+    ) -> Result<super::api_response::GetTableTokenResponse> {
+        let database = identifier.database();
+        let table = identifier.object();
+        // Validate both name parts before building the request path.
+        validate_non_empty_multi(&[(database, "database name"), (table, "table 
name")])?;
+        let path = self.resource_paths.table_token(database, table);
+        // The token endpoint is queried without any query parameters.
+        self.client.get(&path, None::<&[(&str, &str)]>).await
+    }
 }
diff --git a/crates/paimon/src/api/rest_client.rs 
b/crates/paimon/src/api/rest_client.rs
index 76db48f..1b6a636 100644
--- a/crates/paimon/src/api/rest_client.rs
+++ b/crates/paimon/src/api/rest_client.rs
@@ -94,7 +94,7 @@ impl HttpClient {
     /// # Returns
     /// The parsed JSON response.
     pub async fn get<T: DeserializeOwned>(
-        &mut self,
+        &self,
         path: &str,
         params: Option<&[(impl AsRef<str>, impl AsRef<str>)]>,
     ) -> Result<T> {
@@ -136,7 +136,7 @@ impl HttpClient {
     /// # Returns
     /// The parsed JSON response.
     pub async fn post<T: DeserializeOwned, B: serde::Serialize>(
-        &mut self,
+        &self,
         path: &str,
         body: &B,
     ) -> Result<T> {
@@ -163,7 +163,7 @@ impl HttpClient {
     /// # Returns
     /// The parsed JSON response.
     pub async fn delete<T: DeserializeOwned>(
-        &mut self,
+        &self,
         path: &str,
         params: Option<&[(impl AsRef<str>, impl AsRef<str>)]>,
     ) -> Result<T> {
@@ -203,13 +203,13 @@ impl HttpClient {
 
     /// Build auth headers for a request.
     async fn build_auth_headers(
-        &mut self,
+        &self,
         method: &str,
         path: &str,
         data: Option<&str>,
         params: HashMap<String, String>,
     ) -> Result<HashMap<String, String>> {
-        if let Some(ref mut auth_fn) = self.auth_function {
+        if let Some(ref auth_fn) = self.auth_function {
             let parameter =
                 RESTAuthParameter::new(method, path, data.map(|s| 
s.to_string()), params);
             auth_fn.apply(&parameter).await
diff --git a/crates/paimon/src/api/rest_util.rs 
b/crates/paimon/src/api/rest_util.rs
index ce8627c..5c259fe 100644
--- a/crates/paimon/src/api/rest_util.rs
+++ b/crates/paimon/src/api/rest_util.rs
@@ -41,6 +41,32 @@ impl RESTUtil {
     pub fn extract_prefix_map(options: &Options, prefix: &str) -> 
HashMap<String, String> {
         options.extract_prefix_map(prefix)
     }
+
+    /// Merge two property maps, with `override_properties` taking precedence.
+    ///
+    /// For keys present in both maps, the value from `override_properties` 
wins.
+    /// Either argument may be `None`, in which case that map contributes no
+    /// entries; if both are `None` the result is an empty map. Keys and values
+    /// are cloned, so neither input is modified.
+    pub fn merge(
+        base_properties: Option<&HashMap<String, String>>,
+        override_properties: Option<&HashMap<String, String>>,
+    ) -> HashMap<String, String> {
+        let mut result = HashMap::new();
+
+        // Copy the base entries first ...
+        if let Some(base) = base_properties {
+            for (key, value) in base {
+                result.insert(key.clone(), value.clone());
+            }
+        }
+
+        // ... then the overrides, whose inserts replace any base value
+        // stored under the same key.
+        if let Some(overrides) = override_properties {
+            for (key, value) in overrides {
+                result.insert(key.clone(), value.clone());
+            }
+        }
+
+        result
+    }
 }
 
 #[cfg(test)]
diff --git a/crates/paimon/src/io/mod.rs b/crates/paimon/src/catalog/database.rs
similarity index 55%
copy from crates/paimon/src/io/mod.rs
copy to crates/paimon/src/catalog/database.rs
index 226aa72..b291ec7 100644
--- a/crates/paimon/src/io/mod.rs
+++ b/crates/paimon/src/catalog/database.rs
@@ -15,28 +15,28 @@
 // specific language governing permissions and limitations
 // under the License.
 
-mod file_io;
-pub use file_io::*;
+//! Database structure for Apache Paimon catalogs.
 
-mod storage;
-pub use storage::*;
+use std::collections::HashMap;
 
-#[cfg(feature = "storage-fs")]
-mod storage_fs;
-#[cfg(feature = "storage-fs")]
-use storage_fs::*;
+/// Structure representing a database in a Paimon catalog.
+///
+/// A plain data holder: all fields are public and the type derives
+/// `Debug`, `Clone` and `PartialEq`.
+#[derive(Debug, Clone, PartialEq)]
+pub struct Database {
+    /// Database name.
+    pub name: String,
+    /// Database options/properties as string key-value pairs.
+    pub options: HashMap<String, String>,
+    /// Optional comment describing the database; `None` when there is none.
+    pub comment: Option<String>,
+}
 
-#[cfg(feature = "storage-memory")]
-mod storage_memory;
-#[cfg(feature = "storage-memory")]
-use storage_memory::*;
-
-#[cfg(feature = "storage-oss")]
-mod storage_oss;
-#[cfg(feature = "storage-oss")]
-use storage_oss::*;
-
-#[cfg(feature = "storage-s3")]
-mod storage_s3;
-#[cfg(feature = "storage-s3")]
-use storage_s3::*;
+impl Database {
+    /// Create a new Database from its name, options and optional comment.
+    ///
+    /// The arguments are stored as given; no validation is performed.
+    pub fn new(name: String, options: HashMap<String, String>, comment: 
Option<String>) -> Self {
+        Self {
+            name,
+            options,
+            comment,
+        }
+    }
+}
diff --git a/crates/paimon/src/catalog/filesystem.rs 
b/crates/paimon/src/catalog/filesystem.rs
index 2685c66..61d6e8d 100644
--- a/crates/paimon/src/catalog/filesystem.rs
+++ b/crates/paimon/src/catalog/filesystem.rs
@@ -21,7 +21,7 @@
 
 use std::collections::HashMap;
 
-use crate::catalog::{Catalog, Identifier, DB_LOCATION_PROP, DB_SUFFIX};
+use crate::catalog::{Catalog, Database, Identifier, DB_LOCATION_PROP, 
DB_SUFFIX};
 use crate::error::{Error, Result};
 use crate::io::FileIO;
 use crate::spec::{Schema, TableSchema};
@@ -237,6 +237,16 @@ impl Catalog for FileSystemCatalog {
         Ok(())
     }
 
+    /// Look up a database by name.
+    ///
+    /// Returns `Error::DatabaseNotExist` when `database_exists` reports that
+    /// the database is missing. On success the returned `Database` carries
+    /// only the name: its options map is empty and its comment is `None`.
+    async fn get_database(&self, name: &str) -> Result<Database> {
+        if !self.database_exists(name).await? {
+            return Err(Error::DatabaseNotExist {
+                database: name.to_string(),
+            });
+        }
+
+        // No properties or comment are loaded here; only existence is checked.
+        Ok(Database::new(name.to_string(), HashMap::new(), None))
+    }
+
     async fn drop_database(
         &self,
         name: &str,
diff --git a/crates/paimon/src/catalog/mod.rs b/crates/paimon/src/catalog/mod.rs
index 4b43ffa..1b4bf57 100644
--- a/crates/paimon/src/catalog/mod.rs
+++ b/crates/paimon/src/catalog/mod.rs
@@ -20,12 +20,16 @@
 //! Design aligns with [Paimon Java 
Catalog](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java)
 //! and follows API patterns from Apache Iceberg Rust.
 
+mod database;
 mod filesystem;
+mod rest;
 
 use std::collections::HashMap;
 use std::fmt;
 
+pub use database::*;
 pub use filesystem::*;
+pub use rest::*;
 use serde::{Deserialize, Serialize};
 
 /// Splitter for system table names (e.g. `table$snapshots`).
@@ -146,6 +150,12 @@ pub trait Catalog: Send + Sync {
         properties: HashMap<String, String>,
     ) -> Result<()>;
 
+    /// Get a database by name.
+    ///
+    /// # Errors
+    /// * [`crate::Error::DatabaseNotExist`] - database does not exist.
+    async fn get_database(&self, name: &str) -> Result<Database>;
+
     /// Drop a database.
     ///
     /// * `ignore_if_not_exists` - if true, do nothing when the database does 
not exist.
diff --git a/crates/paimon/src/io/mod.rs b/crates/paimon/src/catalog/rest/mod.rs
similarity index 62%
copy from crates/paimon/src/io/mod.rs
copy to crates/paimon/src/catalog/rest/mod.rs
index 226aa72..921e629 100644
--- a/crates/paimon/src/io/mod.rs
+++ b/crates/paimon/src/catalog/rest/mod.rs
@@ -15,28 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.
 
-mod file_io;
-pub use file_io::*;
+//! REST catalog implementation for Apache Paimon.
+//!
+//! This module provides a REST-based catalog that communicates with
+//! a Paimon REST catalog server for metadata operations.
 
-mod storage;
-pub use storage::*;
+mod rest_catalog;
+mod rest_token;
+mod rest_token_file_io;
 
-#[cfg(feature = "storage-fs")]
-mod storage_fs;
-#[cfg(feature = "storage-fs")]
-use storage_fs::*;
-
-#[cfg(feature = "storage-memory")]
-mod storage_memory;
-#[cfg(feature = "storage-memory")]
-use storage_memory::*;
-
-#[cfg(feature = "storage-oss")]
-mod storage_oss;
-#[cfg(feature = "storage-oss")]
-use storage_oss::*;
-
-#[cfg(feature = "storage-s3")]
-mod storage_s3;
-#[cfg(feature = "storage-s3")]
-use storage_s3::*;
+pub use rest_catalog::*;
+pub use rest_token::*;
+pub use rest_token_file_io::*;
diff --git a/crates/paimon/src/catalog/rest/rest_catalog.rs 
b/crates/paimon/src/catalog/rest/rest_catalog.rs
new file mode 100644
index 0000000..e5d023f
--- /dev/null
+++ b/crates/paimon/src/catalog/rest/rest_catalog.rs
@@ -0,0 +1,386 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! REST catalog implementation for Apache Paimon.
+//!
+//! This module provides a REST-based catalog that communicates with
+//! a Paimon REST catalog server for database and table CRUD operations.
+
+use std::collections::HashMap;
+
+use async_trait::async_trait;
+
+use crate::api::rest_api::RESTApi;
+use crate::api::rest_error::RestError;
+use crate::api::PagedList;
+use crate::catalog::{Catalog, Database, Identifier, DB_LOCATION_PROP};
+use crate::common::{CatalogOptions, Options};
+use crate::error::Error;
+use crate::io::FileIO;
+use crate::spec::{Schema, SchemaChange, TableSchema};
+use crate::table::Table;
+use crate::Result;
+
+use super::rest_token_file_io::RESTTokenFileIO;
+
+/// REST catalog implementation.
+///
+/// This catalog communicates with a Paimon REST catalog server
+/// for all metadata operations (database and table CRUD).
+///
+/// Corresponds to Python `RESTCatalog` in 
`pypaimon/catalog/rest/rest_catalog.py`.
+pub struct RESTCatalog {
+    /// The REST API client.
+    api: RESTApi,
+    /// Catalog configuration options, cloned from the REST API client's
+    /// options once the client has been constructed.
+    options: Options,
+    /// Warehouse path, taken from the options passed to `new`.
+    warehouse: String,
+    /// Whether data token is enabled for FileIO construction.
+    data_token_enabled: bool,
+}
+
+impl RESTCatalog {
+    /// Create a new REST catalog.
+    ///
+    /// # Arguments
+    /// * `options` - Configuration options containing URI, warehouse, etc.
+    /// * `config_required` - Whether to fetch config from server and merge 
with options.
+    ///
+    /// # Errors
+    /// Returns an error if required options are missing or if initialization 
fails.
+    /// A missing warehouse option is reported as `RestError::BadRequest`
+    /// (converted by `?`); errors from `RESTApi::new` are propagated as-is.
+    pub async fn new(options: Options, config_required: bool) -> Result<Self> {
+        // The warehouse is read from the caller-supplied options, before the
+        // REST API client is created.
+        let warehouse = options
+            .get(CatalogOptions::WAREHOUSE)
+            .cloned()
+            .ok_or_else(|| RestError::BadRequest {
+                message: format!("Missing required option: {}", 
CatalogOptions::WAREHOUSE),
+            })?;
+
+        let api = RESTApi::new(options.clone(), config_required).await?;
+
+        // The flag is read from the API client's options. Only a
+        // case-insensitive "true" enables it; any other value, or a missing
+        // key, leaves it disabled.
+        let data_token_enabled = api
+            .options()
+            .get(CatalogOptions::DATA_TOKEN_ENABLED)
+            .map(|v| v.eq_ignore_ascii_case("true"))
+            .unwrap_or(false);
+
+        // Keep a copy of the client's options as the catalog's own options.
+        let api_options = api.options().clone();
+
+        Ok(Self {
+            api,
+            options: api_options,
+            warehouse,
+            data_token_enabled,
+        })
+    }
+
+    /// Get the warehouse path.
+    pub fn warehouse(&self) -> &str {
+        &self.warehouse
+    }
+
+    /// Get the catalog options (a copy of the REST API client's options
+    /// taken in `new`).
+    pub fn options(&self) -> &Options {
+        &self.options
+    }
+
+    /// Whether data token is enabled.
+    pub fn data_token_enabled(&self) -> bool {
+        self.data_token_enabled
+    }
+
+    /// List databases with pagination.
+    ///
+    /// Forwards all three arguments unchanged to
+    /// `RESTApi::list_databases_paged` and returns its result.
+    pub async fn list_databases_paged(
+        &self,
+        max_results: Option<u32>,
+        page_token: Option<&str>,
+        database_name_pattern: Option<&str>,
+    ) -> Result<PagedList<String>> {
+        self.api
+            .list_databases_paged(max_results, page_token, 
database_name_pattern)
+            .await
+    }
+}
+
+// ============================================================================
+// Catalog trait implementation
+// ============================================================================
+
+#[async_trait]
+impl Catalog for RESTCatalog {
+    // ======================= database methods ===============================
+
+    /// List all database names by delegating to `RESTApi::list_databases`.
+    ///
+    /// Errors from the API client are returned without translation.
+    async fn list_databases(&self) -> Result<Vec<String>> {
+        self.api.list_databases().await
+    }
+
+    /// Create a database through the REST API.
+    ///
+    /// REST errors are first translated by `map_rest_error_for_database`. A
+    /// resulting `Error::DatabaseAlreadyExist` is turned into `Ok(())` only
+    /// when `ignore_if_exists` is true; every other error is returned.
+    async fn create_database(
+        &self,
+        name: &str,
+        ignore_if_exists: bool,
+        properties: HashMap<String, String>,
+    ) -> Result<()> {
+        let result = self
+            .api
+            .create_database(name, Some(properties))
+            .await
+            .map_err(|e| map_rest_error_for_database(e, name));
+        ignore_error_if(result, |e| {
+            ignore_if_exists && matches!(e, Error::DatabaseAlreadyExist { .. })
+        })
+    }
+
+    /// Fetch a database and convert the REST response into a `Database`.
+    ///
+    /// The response's options become the database options. When the response
+    /// carries a location it is stored under `DB_LOCATION_PROP`, replacing any
+    /// value already present under that key. The comment is always `None`.
+    ///
+    /// REST errors are translated by `map_rest_error_for_database`.
+    async fn get_database(&self, name: &str) -> Result<Database> {
+        let response = self
+            .api
+            .get_database(name)
+            .await
+            .map_err(|e| map_rest_error_for_database(e, name))?;
+
+        let mut options = response.options;
+        if let Some(location) = response.location {
+            options.insert(DB_LOCATION_PROP.to_string(), location);
+        }
+
+        Ok(Database::new(name.to_string(), options, None))
+    }
+
+    /// Drop a database.
+    ///
+    /// With `cascade == false` the database's tables are listed first and the
+    /// call fails with `Error::DatabaseNotEmpty` if any exist. With
+    /// `cascade == true` that check is skipped and the drop request is sent
+    /// directly. In both the listing step and the drop step an
+    /// `Error::DatabaseNotExist` is turned into `Ok(())` when
+    /// `ignore_if_not_exists` is true.
+    async fn drop_database(
+        &self,
+        name: &str,
+        ignore_if_not_exists: bool,
+        cascade: bool,
+    ) -> Result<()> {
+        // If not cascade, check if database is empty first.
+        // The check and the drop below are two separate requests.
+        if !cascade {
+            let tables = match self.api.list_tables(name).await {
+                Ok(tables) => tables,
+                Err(err) => {
+                    // Listing failed: translate the error and, if allowed,
+                    // treat a missing database as success.
+                    return 
ignore_error_if(Err(map_rest_error_for_database(err, name)), |e| {
+                        ignore_if_not_exists && matches!(e, 
Error::DatabaseNotExist { .. })
+                    });
+                }
+            };
+            if !tables.is_empty() {
+                return Err(Error::DatabaseNotEmpty {
+                    database: name.to_string(),
+                });
+            }
+        }
+
+        let result = self
+            .api
+            .drop_database(name)
+            .await
+            .map_err(|e| map_rest_error_for_database(e, name));
+        ignore_error_if(result, |e| {
+            ignore_if_not_exists && matches!(e, Error::DatabaseNotExist { .. })
+        })
+    }
+
+    // ======================= table methods ===============================
+
+    /// Load a table's metadata from the REST API and build a `Table`.
+    ///
+    /// The response must carry `schema`, `schema_id`, `path` and
+    /// `is_external`; a missing field yields `Error::DataInvalid` naming the
+    /// table and the field. REST errors are translated by
+    /// `map_rest_error_for_table`.
+    ///
+    /// The `FileIO` is built through `RESTTokenFileIO` when the data token is
+    /// enabled and the table is not external; otherwise it is built directly
+    /// from the table path.
+    async fn get_table(&self, identifier: &Identifier) -> Result<Table> {
+        let response = self
+            .api
+            .get_table(identifier)
+            .await
+            .map_err(|e| map_rest_error_for_table(e, identifier))?;
+
+        // Extract schema from response
+        let schema = response.schema.ok_or_else(|| Error::DataInvalid {
+            message: format!("Table {} response missing schema", 
identifier.full_name()),
+            source: None,
+        })?;
+
+        // Extract the schema id and combine it with the schema
+        let schema_id = response.schema_id.ok_or_else(|| Error::DataInvalid {
+            message: format!(
+                "Table {} response missing schema_id",
+                identifier.full_name()
+            ),
+            source: None,
+        })?;
+        let table_schema = TableSchema::new(schema_id, &schema);
+
+        // Extract table path from response
+        let table_path = response.path.ok_or_else(|| Error::DataInvalid {
+            message: format!("Table {} response missing path", 
identifier.full_name()),
+            source: None,
+        })?;
+
+        // Check if the table is external
+        let is_external = response.is_external.ok_or_else(|| 
Error::DataInvalid {
+            message: format!(
+                "Table {} response missing is_external",
+                identifier.full_name()
+            ),
+            source: None,
+        })?;
+
+        // Build FileIO based on data_token_enabled and is_external
+        // TODO Support token cache and direct oss access
+        let file_io = if self.data_token_enabled && !is_external {
+            // Use RESTTokenFileIO to get token-based FileIO
+            let token_file_io =
+                RESTTokenFileIO::new(identifier.clone(), table_path.clone(), 
self.options.clone());
+            token_file_io.build_file_io().await?
+        } else {
+            // Use standard FileIO from path
+            FileIO::from_path(&table_path)?.build()?
+        };
+
+        Ok(Table::new(
+            file_io,
+            identifier.clone(),
+            table_path,
+            table_schema,
+        ))
+    }
+
+    /// List the table names of a database.
+    ///
+    /// Errors are translated by `map_rest_error_for_database`, so a
+    /// `RestError::NoSuchResource` surfaces as `Error::DatabaseNotExist`.
+    async fn list_tables(&self, database_name: &str) -> Result<Vec<String>> {
+        self.api
+            .list_tables(database_name)
+            .await
+            .map_err(|e| map_rest_error_for_database(e, database_name))
+    }
+
+    /// Create a table through the REST API.
+    ///
+    /// REST errors are translated by `map_rest_error_for_table`. A resulting
+    /// `Error::TableAlreadyExist` is turned into `Ok(())` only when
+    /// `ignore_if_exists` is true; every other error is returned.
+    async fn create_table(
+        &self,
+        identifier: &Identifier,
+        creation: Schema,
+        ignore_if_exists: bool,
+    ) -> Result<()> {
+        let result = self
+            .api
+            .create_table(identifier, creation)
+            .await
+            .map_err(|e| map_rest_error_for_table(e, identifier));
+        ignore_error_if(result, |e| {
+            ignore_if_exists && matches!(e, Error::TableAlreadyExist { .. })
+        })
+    }
+
+    /// Drop a table through the REST API.
+    ///
+    /// REST errors are translated by `map_rest_error_for_table`. A resulting
+    /// `Error::TableNotExist` is turned into `Ok(())` only when
+    /// `ignore_if_not_exists` is true; every other error is returned.
+    async fn drop_table(&self, identifier: &Identifier, ignore_if_not_exists: 
bool) -> Result<()> {
+        let result = self
+            .api
+            .drop_table(identifier)
+            .await
+            .map_err(|e| map_rest_error_for_table(e, identifier));
+        ignore_error_if(result, |e| {
+            ignore_if_not_exists && matches!(e, Error::TableNotExist { .. })
+        })
+    }
+
+    /// Rename table `from` to `to` through the REST API.
+    ///
+    /// REST errors are first translated against `from`, so a translated
+    /// `Error::TableNotExist` names the source table. A translated
+    /// `Error::TableAlreadyExist` is then rewritten to name `to`. Finally an
+    /// `Error::TableNotExist` is turned into `Ok(())` when
+    /// `ignore_if_not_exists` is true.
+    async fn rename_table(
+        &self,
+        from: &Identifier,
+        to: &Identifier,
+        ignore_if_not_exists: bool,
+    ) -> Result<()> {
+        let result = self
+            .api
+            .rename_table(from, to)
+            .await
+            .map_err(|e| map_rest_error_for_table(e, from))
+            // Remap TableAlreadyExist to use destination identifier
+            .map_err(|e| match e {
+                Error::TableAlreadyExist { .. } => Error::TableAlreadyExist {
+                    full_name: to.full_name(),
+                },
+                other => other,
+            });
+        ignore_error_if(result, |e| {
+            ignore_if_not_exists && matches!(e, Error::TableNotExist { .. })
+        })
+    }
+
+    /// Not implemented: always returns `Error::Unsupported`.
+    ///
+    /// All arguments are ignored and no request is sent.
+    async fn alter_table(
+        &self,
+        _identifier: &Identifier,
+        _changes: Vec<SchemaChange>,
+        _ignore_if_not_exists: bool,
+    ) -> Result<()> {
+        // TODO: Implement alter_table when RESTApi supports it
+        Err(Error::Unsupported {
+            message: "Alter table is not yet implemented for REST 
catalog".to_string(),
+        })
+    }
+}
+// ============================================================================
+// Error mapping helpers
+// ============================================================================
+
+/// Map a REST API error to a catalog-level database error.
+///
+/// Converts `RestError::NoSuchResource` -> `Error::DatabaseNotExist` and
+/// `RestError::AlreadyExists` -> `Error::DatabaseAlreadyExist`, both carrying
+/// `database_name`. Every other error is returned unchanged.
+fn map_rest_error_for_database(err: Error, database_name: &str) -> Error {
+    match err {
+        Error::RestApi {
+            source: RestError::NoSuchResource { .. },
+        } => Error::DatabaseNotExist {
+            database: database_name.to_string(),
+        },
+        Error::RestApi {
+            source: RestError::AlreadyExists { .. },
+        } => Error::DatabaseAlreadyExist {
+            database: database_name.to_string(),
+        },
+        other => other,
+    }
+}
+
+/// Map a REST API error to a catalog-level table error.
+///
+/// Converts `RestError::NoSuchResource` -> `Error::TableNotExist` and
+/// `RestError::AlreadyExists` -> `Error::TableAlreadyExist`, both carrying
+/// the full name of `identifier`. Every other error is returned unchanged.
+fn map_rest_error_for_table(err: Error, identifier: &Identifier) -> Error {
+    match err {
+        Error::RestApi {
+            source: RestError::NoSuchResource { .. },
+        } => Error::TableNotExist {
+            full_name: identifier.full_name(),
+        },
+        Error::RestApi {
+            source: RestError::AlreadyExists { .. },
+        } => Error::TableAlreadyExist {
+            full_name: identifier.full_name(),
+        },
+        other => other,
+    }
+}
+
+/// Turn selected errors of an already-computed `Result<()>` into success.
+///
+/// If the operation succeeds, returns `Ok(())`.
+/// If it fails with an error that `should_ignore` returns `true` for, returns 
`Ok(())`.
+/// Otherwise, returns the error.
+fn ignore_error_if<F>(result: Result<()>, should_ignore: F) -> Result<()>
+where
+    F: Fn(&Error) -> bool,
+{
+    // The predicate only borrows the error, so an error that is not ignored
+    // is handed back to the caller as-is.
+    result.or_else(|err| {
+        if should_ignore(&err) {
+            Ok(())
+        } else {
+            Err(err)
+        }
+    })
+}
diff --git a/crates/paimon/src/io/mod.rs 
b/crates/paimon/src/catalog/rest/rest_token.rs
similarity index 56%
copy from crates/paimon/src/io/mod.rs
copy to crates/paimon/src/catalog/rest/rest_token.rs
index 226aa72..83d9dc9 100644
--- a/crates/paimon/src/io/mod.rs
+++ b/crates/paimon/src/catalog/rest/rest_token.rs
@@ -15,28 +15,25 @@
 // specific language governing permissions and limitations
 // under the License.
 
-mod file_io;
-pub use file_io::*;
+//! REST token for data access in Apache Paimon.
 
-mod storage;
-pub use storage::*;
+use std::collections::HashMap;
 
-#[cfg(feature = "storage-fs")]
-mod storage_fs;
-#[cfg(feature = "storage-fs")]
-use storage_fs::*;
+/// Data-access token handed out by the REST server: credential properties plus their expiration.
+#[derive(Debug, Clone)]
+pub struct RESTToken {
+    /// Credential key-value pairs (e.g. access_key_id, access_key_secret); `RESTTokenFileIO` overlays them on the catalog options.
+    pub token: HashMap<String, String>,
+    /// Expiration time in milliseconds since the Unix epoch; `RESTTokenFileIO` compares it with the system clock.
+    pub expire_at_millis: i64,
+}
 
-#[cfg(feature = "storage-memory")]
-mod storage_memory;
-#[cfg(feature = "storage-memory")]
-use storage_memory::*;
-
-#[cfg(feature = "storage-oss")]
-mod storage_oss;
-#[cfg(feature = "storage-oss")]
-use storage_oss::*;
-
-#[cfg(feature = "storage-s3")]
-mod storage_s3;
-#[cfg(feature = "storage-s3")]
-use storage_s3::*;
+impl RESTToken {
+    /// Bundle a credential map with its expiration (milliseconds since epoch).
+    pub fn new(token: HashMap<String, String>, expire_at_millis: i64) -> Self {
+        RESTToken {
+            token,
+            expire_at_millis,
+        }
+    }
+}
diff --git a/crates/paimon/src/catalog/rest/rest_token_file_io.rs 
b/crates/paimon/src/catalog/rest/rest_token_file_io.rs
new file mode 100644
index 0000000..6233eb1
--- /dev/null
+++ b/crates/paimon/src/catalog/rest/rest_token_file_io.rs
@@ -0,0 +1,193 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! REST token-based FileIO for Apache Paimon.
+//!
+//! This module provides a FileIO wrapper that supports getting data access
+//! tokens from a REST Server. It handles token caching, expiration detection,
+//! and automatic refresh.
+
+use std::collections::HashMap;
+
+use tokio::sync::{OnceCell, RwLock};
+
+use crate::api::rest_api::RESTApi;
+use crate::api::rest_util::RESTUtil;
+use crate::catalog::Identifier;
+use crate::common::{CatalogOptions, Options};
+use crate::io::storage_oss::OSS_ENDPOINT;
+use crate::io::FileIO;
+use crate::Result;
+
+use super::rest_token::RESTToken;
+
+/// Safe time margin (in milliseconds) before token expiration to trigger refresh.
+const TOKEN_EXPIRATION_SAFE_TIME_MILLIS: i64 = 3_600_000;
+
+/// FileIO factory for tables whose storage credentials come from a REST server.
+///
+/// Responsibilities:
+/// - caches the data-access token and detects when it is about to expire,
+/// - refreshes the token through `RESTApi::load_table_token`,
+/// - overlays the token credentials on the catalog options when building
+///   the underlying `FileIO`.
+pub struct RESTTokenFileIO {
+    /// Table the token is requested for.
+    identifier: Identifier,
+    /// Location of the table (e.g. "oss://bucket/warehouse/db.db/table").
+    path: String,
+    /// Catalog options; base properties for `FileIO` and input to `RESTApi::new`.
+    catalog_options: Options,
+    /// REST client used for token refresh. Initialized on the first refresh
+    /// and reused afterwards.
+    api: OnceCell<RESTApi>,
+    /// Most recently fetched token (`None` until the first refresh), guarded
+    /// by an async `RwLock`.
+    token: RwLock<Option<RESTToken>>,
+}
+
+impl RESTTokenFileIO {
+    /// Create a new RESTTokenFileIO.
+    ///
+    /// Nothing is fetched here: the token cache starts empty and the
+    /// `RESTApi` client is created on the first token refresh.
+    ///
+    /// # Arguments
+    /// * `identifier` - Table identifier for token requests.
+    /// * `path` - Table path for FileIO construction.
+    /// * `catalog_options` - Catalog options for RESTApi and FileIO.
+    pub fn new(identifier: Identifier, path: String, catalog_options: Options) -> Self {
+        Self {
+            identifier,
+            path,
+            catalog_options,
+            api: OnceCell::new(),
+            token: RwLock::new(None),
+        }
+    }
+
+    /// Build a `FileIO` instance with the current token merged into options.
+    ///
+    /// This method:
+    /// 1. Refreshes the token if expired or not yet obtained.
+    /// 2. Merges token credentials into catalog options.
+    /// 3. Builds a `FileIO` from the merged options.
+    ///
+    /// The returned `FileIO` carries the credentials that were current when
+    /// it was built and can be passed to `Table::new`. If the token expires,
+    /// a new `get_table` call is needed.
+    ///
+    /// # Errors
+    /// Returns the error of the token refresh, of `FileIO::from_path`, or of
+    /// the builder's `build`.
+    pub async fn build_file_io(&self) -> Result<FileIO> {
+        // Ensure token is fresh
+        self.try_to_refresh_token().await?;
+
+        let token_guard = self.token.read().await;
+        match token_guard.as_ref() {
+            Some(token) => {
+                // Merge catalog options (base) with token credentials (override)
+                let merged_props =
+                    RESTUtil::merge(Some(self.catalog_options.to_map()), Some(&token.token));
+                // Build FileIO with merged properties
+                let mut builder = FileIO::from_path(&self.path)?;
+                builder = builder.with_props(merged_props);
+                builder.build()
+            }
+            None => {
+                // No token available, build FileIO from path only
+                FileIO::from_path(&self.path)?.build()
+            }
+        }
+    }
+
+    /// Try to refresh the token if it is expired or not yet obtained.
+    ///
+    /// Callers that find a valid token return after taking only the read
+    /// lock. Otherwise the write lock is taken and kept for the whole
+    /// refresh, so that tasks arriving while a refresh is in flight wait for
+    /// it and then see the fresh token in the re-check instead of each
+    /// issuing their own request to the REST server.
+    ///
+    /// # Errors
+    /// Returns the error of `refresh_token`; the cached token is left
+    /// untouched in that case.
+    async fn try_to_refresh_token(&self) -> Result<()> {
+        // Fast path: check if token is still valid under read lock
+        {
+            let token_guard = self.token.read().await;
+            if let Some(token) = token_guard.as_ref() {
+                if !Self::is_token_expired(token) {
+                    return Ok(());
+                }
+            }
+        }
+
+        // Slow path: acquire write lock and check again, another task may
+        // have refreshed the token while this one was waiting for the lock.
+        let mut token_guard = self.token.write().await;
+        if let Some(token) = token_guard.as_ref() {
+            if !Self::is_token_expired(token) {
+                return Ok(());
+            }
+        }
+
+        // The guard is a tokio lock guard, which may be held across `.await`.
+        // `refresh_token` never touches `self.token`, so it cannot deadlock
+        // against the guard held here.
+        let new_token = self.refresh_token().await?;
+        *token_guard = Some(new_token);
+        Ok(())
+    }
+
+    /// Refresh the token by calling `RESTApi::load_table_token`.
+    ///
+    /// Lazily creates a `RESTApi` instance on first call and reuses it
+    /// for subsequent refreshes.
+    ///
+    /// # Errors
+    /// Returns the error of `RESTApi::new` or `load_table_token`, or
+    /// `Error::DataInvalid` when the response has no `expires_at_millis`.
+    async fn refresh_token(&self) -> Result<RESTToken> {
+        let api = self
+            .api
+            .get_or_try_init(|| async { RESTApi::new(self.catalog_options.clone(), false).await })
+            .await?;
+
+        let response = api.load_table_token(&self.identifier).await?;
+
+        let expires_at_millis =
+            response
+                .expires_at_millis
+                .ok_or_else(|| crate::Error::DataInvalid {
+                    message: format!(
+                        "Token response for table '{}' missing expires_at_millis",
+                        self.identifier.full_name()
+                    ),
+                    source: None,
+                })?;
+
+        // Merge token with catalog options (e.g. DLF OSS endpoint override)
+        let merged_token = self.merge_token_with_catalog_options(response.token);
+        Ok(RESTToken::new(merged_token, expires_at_millis))
+    }
+
+    /// Check if a token is expired (within the safe time margin).
+    ///
+    /// A token counts as expired once less than
+    /// `TOKEN_EXPIRATION_SAFE_TIME_MILLIS` remain before `expire_at_millis`.
+    /// A system clock set before the Unix epoch is treated as time zero.
+    fn is_token_expired(token: &RESTToken) -> bool {
+        let current_time = std::time::SystemTime::now()
+            .duration_since(std::time::UNIX_EPOCH)
+            // Checked conversion instead of a truncating `as` cast.
+            .map(|elapsed| i64::try_from(elapsed.as_millis()).unwrap_or(i64::MAX))
+            .unwrap_or_default();
+        // Saturating so that an extreme `expire_at_millis` cannot overflow.
+        token.expire_at_millis.saturating_sub(current_time) < TOKEN_EXPIRATION_SAFE_TIME_MILLIS
+    }
+
+    /// Merge token credentials with catalog options for DLF OSS endpoint override.
+    ///
+    /// Returns `token` unchanged unless the catalog options hold a non-blank
+    /// `CatalogOptions::DLF_OSS_ENDPOINT`, in which case that value replaces
+    /// the `OSS_ENDPOINT` entry of the token.
+    fn merge_token_with_catalog_options(
+        &self,
+        token: HashMap<String, String>,
+    ) -> HashMap<String, String> {
+        let mut merged = token;
+        // If catalog options contain a DLF OSS endpoint, override the standard OSS endpoint
+        if let Some(dlf_oss_endpoint) = self.catalog_options.get(CatalogOptions::DLF_OSS_ENDPOINT) {
+            if !dlf_oss_endpoint.trim().is_empty() {
+                merged.insert(OSS_ENDPOINT.to_string(), dlf_oss_endpoint.clone());
+            }
+        }
+        merged
+    }
+}
diff --git a/crates/paimon/src/common/options.rs 
b/crates/paimon/src/common/options.rs
index a469a07..792c601 100644
--- a/crates/paimon/src/common/options.rs
+++ b/crates/paimon/src/common/options.rs
@@ -69,6 +69,9 @@ impl CatalogOptions {
 
     /// DLF ECS role name.
     pub const DLF_TOKEN_ECS_ROLE_NAME: &'static str = 
"dlf.token-ecs-role-name";
+
+    /// DLF OSS endpoint override.
+    pub const DLF_OSS_ENDPOINT: &'static str = "dlf.oss-endpoint";
 }
 
 /// Configuration options container.
diff --git a/crates/paimon/src/io/file_io.rs b/crates/paimon/src/io/file_io.rs
index 3b0a4d6..84c134a 100644
--- a/crates/paimon/src/io/file_io.rs
+++ b/crates/paimon/src/io/file_io.rs
@@ -21,9 +21,9 @@ use std::ops::Range;
 use std::sync::Arc;
 
 use bytes::Bytes;
-use chrono::{DateTime, Utc};
 use opendal::raw::normalize_root;
-use opendal::{Metakey, Operator};
+use opendal::raw::Timestamp;
+use opendal::Operator;
 use snafu::ResultExt;
 use url::Url;
 
@@ -122,18 +122,21 @@ impl FileIO {
         // use normalize_root to make sure it end with `/`.
         let list_path = normalize_root(relative_path);
 
-        // Request ContentLength and LastModified so accessing 
meta.content_length() / last_modified()
-        let entries = op
-            .list_with(&list_path)
-            .metakey(Metakey::ContentLength | Metakey::LastModified)
-            .await
-            .context(IoUnexpectedSnafu {
-                message: format!("Failed to list files in '{path}'"),
-            })?;
+        let entries = op.list_with(&list_path).await.context(IoUnexpectedSnafu 
{
+            message: format!("Failed to list files in '{path}'"),
+        })?;
 
         let mut statuses = Vec::new();
-
+        let list_path_normalized = list_path.trim_start_matches('/');
         for entry in entries {
+            // opendal list_with includes the root directory itself as the 
first entry.
+            // The root entry's path equals list_path (with or without leading 
slash).
+            // Skip it so callers only see the direct children.
+            let entry_path = entry.path();
+            let entry_path_normalized = entry_path.trim_start_matches('/');
+            if entry_path_normalized == list_path_normalized {
+                continue;
+            }
             let meta = entry.metadata();
             statuses.push(FileStatus {
                 size: meta.content_length(),
@@ -152,7 +155,7 @@ impl FileIO {
     pub async fn exists(&self, path: &str) -> Result<bool> {
         let (op, relative_path) = self.storage.create(path)?;
 
-        op.is_exist(relative_path).await.context(IoUnexpectedSnafu {
+        op.exists(relative_path).await.context(IoUnexpectedSnafu {
             message: format!("Failed to check existence of '{path}'"),
         })
     }
@@ -285,7 +288,8 @@ impl FileWrite for opendal::Writer {
     }
 
     async fn close(&mut self) -> crate::Result<()> {
-        Ok(opendal::Writer::close(self).await?)
+        opendal::Writer::close(self).await?;
+        Ok(())
     }
 }
 
@@ -294,7 +298,7 @@ pub struct FileStatus {
     pub size: u64,
     pub is_dir: bool,
     pub path: String,
-    pub last_modified: Option<DateTime<Utc>>,
+    pub last_modified: Option<Timestamp>,
 }
 
 #[derive(Debug)]
@@ -310,10 +314,7 @@ impl InputFile {
     }
 
     pub async fn exists(&self) -> crate::Result<bool> {
-        Ok(self
-            .op
-            .is_exist(&self.path[self.relative_path_pos..])
-            .await?)
+        Ok(self.op.exists(&self.path[self.relative_path_pos..]).await?)
     }
 
     pub async fn metadata(&self) -> crate::Result<FileStatus> {
@@ -353,10 +354,7 @@ impl OutputFile {
     }
 
     pub async fn exists(&self) -> crate::Result<bool> {
-        Ok(self
-            .op
-            .is_exist(&self.path[self.relative_path_pos..])
-            .await?)
+        Ok(self.op.exists(&self.path[self.relative_path_pos..]).await?)
     }
 
     pub fn to_input_file(self) -> InputFile {
diff --git a/crates/paimon/src/io/mod.rs b/crates/paimon/src/io/mod.rs
index 226aa72..92a909a 100644
--- a/crates/paimon/src/io/mod.rs
+++ b/crates/paimon/src/io/mod.rs
@@ -32,7 +32,7 @@ mod storage_memory;
 use storage_memory::*;
 
 #[cfg(feature = "storage-oss")]
-mod storage_oss;
+pub(crate) mod storage_oss;
 #[cfg(feature = "storage-oss")]
 use storage_oss::*;
 
diff --git a/crates/paimon/src/io/storage_oss.rs 
b/crates/paimon/src/io/storage_oss.rs
index 884b0a7..7884894 100644
--- a/crates/paimon/src/io/storage_oss.rs
+++ b/crates/paimon/src/io/storage_oss.rs
@@ -27,23 +27,29 @@ use crate::Result;
 /// Configuration key for OSS endpoint.
 ///
 /// Compatible with paimon-java's `fs.oss.endpoint`.
-const OSS_ENDPOINT: &str = "fs.oss.endpoint";
+pub(crate) const OSS_ENDPOINT: &str = "fs.oss.endpoint";
 
 /// Configuration key for OSS access key ID.
 ///
 /// Compatible with paimon-java's `fs.oss.accessKeyId`.
-const OSS_ACCESS_KEY_ID: &str = "fs.oss.accessKeyId";
+pub(crate) const OSS_ACCESS_KEY_ID: &str = "fs.oss.accessKeyId";
 
 /// Configuration key for OSS access key secret.
 ///
 /// Compatible with paimon-java's `fs.oss.accessKeySecret`.
-const OSS_ACCESS_KEY_SECRET: &str = "fs.oss.accessKeySecret";
+pub(crate) const OSS_ACCESS_KEY_SECRET: &str = "fs.oss.accessKeySecret";
+
+/// Configuration key for OSS STS security token (optional).
+///
+/// Compatible with paimon-java's `fs.oss.securityToken`.
+/// Required when using STS temporary credentials (e.g. from REST data tokens).
+pub(crate) const OSS_SECURITY_TOKEN: &str = "fs.oss.securityToken";
 
 /// Parse paimon catalog options into an [`OssConfig`].
 ///
-/// Extracts OSS-related configuration keys (endpoint, access key, secret key)
-/// from the provided properties map and maps them to the corresponding
-/// [`OssConfig`] fields.
+/// Extracts OSS-related configuration keys (endpoint, access key, secret key,
+/// and optional security token) from the provided properties map and maps them
+/// to the corresponding [`OssConfig`] fields.
 ///
 /// Returns an error if any required configuration key is missing.
 pub(crate) fn oss_config_parse(mut props: HashMap<String, String>) -> 
Result<OssConfig> {
@@ -73,6 +79,7 @@ pub(crate) fn oss_config_parse(mut props: HashMap<String, 
String>) -> Result<Oss
                 })?,
         );
 
+    cfg.security_token = props.remove(OSS_SECURITY_TOKEN);
     Ok(cfg)
 }
 
diff --git a/crates/paimon/tests/mock_server.rs 
b/crates/paimon/tests/mock_server.rs
index def0ada..e5e9750 100644
--- a/crates/paimon/tests/mock_server.rs
+++ b/crates/paimon/tests/mock_server.rs
@@ -24,8 +24,8 @@ use axum::{
     extract::{Extension, Json, Path, Query},
     http::StatusCode,
     response::IntoResponse,
-    routing::get,
-    Router,
+    routing::{get, post},
+    serve, Router,
 };
 use serde_json::json;
 use std::collections::{HashMap, HashSet};
@@ -570,6 +570,44 @@ impl RESTServer {
         });
     }
 
+    /// Register a table that carries a schema and a storage path.
+    ///
+    /// The database entry is created first when it is not present yet.
+    /// `RESTCatalog::get_table` needs this variant because it requires the
+    /// response to contain `schema` and `path`.
+    #[allow(dead_code)]
+    pub fn add_table_with_schema(
+        &self,
+        database: &str,
+        table: &str,
+        schema: paimon::spec::Schema,
+        path: &str,
+    ) {
+        // Audit block with every field left unset, used for both responses.
+        let empty_audit = || AuditRESTResponse::new(None, None, None, None, None);
+
+        let mut state = self.inner.lock().unwrap();
+        state
+            .databases
+            .entry(database.to_string())
+            .or_insert_with(|| {
+                GetDatabaseResponse::new(
+                    Some(database.to_string()),
+                    Some(database.to_string()),
+                    None,
+                    HashMap::new(),
+                    empty_audit(),
+                )
+            });
+
+        let table_response = GetTableResponse::new(
+            Some(table.to_string()),
+            Some(table.to_string()),
+            Some(path.to_string()),
+            Some(true),
+            Some(0),
+            Some(schema),
+            empty_audit(),
+        );
+        state
+            .tables
+            .insert(format!("{}.{}", database, table), table_response);
+    }
+
     /// Add a no-permission table to the server state.
     #[allow(dead_code)]
     pub fn add_no_permission_table(&self, database: &str, table: &str) {
@@ -696,7 +734,7 @@ pub async fn start_mock_server(
         )
         .route(
             &format!("{prefix}/tables/rename"),
-            axum::routing::post(RESTServer::rename_table),
+            post(RESTServer::rename_table),
         )
         // ECS metadata endpoints (for token loader testing)
         .route(
@@ -715,7 +753,7 @@ pub async fn start_mock_server(
     let addr = listener.local_addr().unwrap();
 
     let server_handle = tokio::spawn(async move {
-        if let Err(e) = axum::serve(listener, app.into_make_service()).await {
+        if let Err(e) = serve(listener, app.into_make_service()).await {
             eprintln!("mock server error: {e}");
         }
     });
diff --git a/crates/paimon/tests/rest_api_test.rs 
b/crates/paimon/tests/rest_api_test.rs
index 753f5d1..74784b3 100644
--- a/crates/paimon/tests/rest_api_test.rs
+++ b/crates/paimon/tests/rest_api_test.rs
@@ -73,7 +73,7 @@ async fn setup_test_server(initial_dbs: Vec<&str>) -> 
TestContext {
 // ==================== Database Tests ====================
 #[tokio::test]
 async fn test_list_databases() {
-    let mut ctx = setup_test_server(vec!["default", "test_db1", 
"prod_db"]).await;
+    let ctx = setup_test_server(vec!["default", "test_db1", "prod_db"]).await;
 
     let dbs = ctx.api.list_databases().await.unwrap();
 
@@ -84,7 +84,7 @@ async fn test_list_databases() {
 
 #[tokio::test]
 async fn test_create_database() {
-    let mut ctx = setup_test_server(vec!["default"]).await;
+    let ctx = setup_test_server(vec!["default"]).await;
 
     // Create new database
     let result = ctx.api.create_database("new_db", None).await;
@@ -101,7 +101,7 @@ async fn test_create_database() {
 
 #[tokio::test]
 async fn test_get_database() {
-    let mut ctx = setup_test_server(vec!["default"]).await;
+    let ctx = setup_test_server(vec!["default"]).await;
 
     let db_resp = ctx.api.get_database("default").await.unwrap();
     assert_eq!(db_resp.name, Some("default".to_string()));
@@ -160,7 +160,7 @@ async fn test_error_responses_status_mapping() {
 
 #[tokio::test]
 async fn test_alter_database() {
-    let mut ctx = setup_test_server(vec!["default"]).await;
+    let ctx = setup_test_server(vec!["default"]).await;
 
     // Alter database with updates
     let mut updates = HashMap::new();
@@ -189,7 +189,7 @@ async fn test_alter_database() {
 
 #[tokio::test]
 async fn test_alter_database_not_found() {
-    let mut ctx = setup_test_server(vec!["default"]).await;
+    let ctx = setup_test_server(vec!["default"]).await;
 
     let result = ctx
         .api
@@ -203,7 +203,7 @@ async fn test_alter_database_not_found() {
 
 #[tokio::test]
 async fn test_drop_database() {
-    let mut ctx = setup_test_server(vec!["default", "to_drop"]).await;
+    let ctx = setup_test_server(vec!["default", "to_drop"]).await;
 
     // Verify database exists
     let dbs = ctx.api.list_databases().await.unwrap();
@@ -227,7 +227,7 @@ async fn test_drop_database() {
 
 #[tokio::test]
 async fn test_drop_database_no_permission() {
-    let mut ctx = setup_test_server(vec!["default"]).await;
+    let ctx = setup_test_server(vec!["default"]).await;
     ctx.server.add_no_permission_database("secret");
 
     let result = ctx.api.drop_database("secret").await;
@@ -240,7 +240,7 @@ async fn test_drop_database_no_permission() {
 
 #[tokio::test]
 async fn test_list_tables_and_get_table() {
-    let mut ctx = setup_test_server(vec!["default"]).await;
+    let ctx = setup_test_server(vec!["default"]).await;
 
     // Add tables
     ctx.server.add_table("default", "table1");
@@ -262,7 +262,7 @@ async fn test_list_tables_and_get_table() {
 
 #[tokio::test]
 async fn test_get_table_not_found() {
-    let mut ctx = setup_test_server(vec!["default"]).await;
+    let ctx = setup_test_server(vec!["default"]).await;
 
     let result = ctx
         .api
@@ -273,7 +273,7 @@ async fn test_get_table_not_found() {
 
 #[tokio::test]
 async fn test_list_tables_empty_database() {
-    let mut ctx = setup_test_server(vec!["default"]).await;
+    let ctx = setup_test_server(vec!["default"]).await;
 
     let tables = ctx.api.list_tables("default").await.unwrap();
     assert!(
@@ -284,7 +284,7 @@ async fn test_list_tables_empty_database() {
 
 #[tokio::test]
 async fn test_multiple_databases_with_tables() {
-    let mut ctx = setup_test_server(vec!["db1", "db2"]).await;
+    let ctx = setup_test_server(vec!["db1", "db2"]).await;
 
     // Add tables to different databases
     ctx.server.add_table("db1", "table1_db1");
@@ -305,7 +305,7 @@ async fn test_multiple_databases_with_tables() {
 
 #[tokio::test]
 async fn test_create_table() {
-    let mut ctx = setup_test_server(vec!["default"]).await;
+    let ctx = setup_test_server(vec!["default"]).await;
 
     // Create a simple schema using builder
     use paimon::spec::{DataType, Schema};
@@ -339,7 +339,7 @@ async fn test_create_table() {
 
 #[tokio::test]
 async fn test_drop_table() {
-    let mut ctx = setup_test_server(vec!["default"]).await;
+    let ctx = setup_test_server(vec!["default"]).await;
 
     // Add a table
     ctx.server.add_table("default", "table_to_drop");
@@ -369,7 +369,7 @@ async fn test_drop_table() {
 
 #[tokio::test]
 async fn test_drop_table_no_permission() {
-    let mut ctx = setup_test_server(vec!["default"]).await;
+    let ctx = setup_test_server(vec!["default"]).await;
     ctx.server
         .add_no_permission_table("default", "secret_table");
 
@@ -384,7 +384,7 @@ async fn test_drop_table_no_permission() {
 
 #[tokio::test]
 async fn test_rename_table() {
-    let mut ctx = setup_test_server(vec!["default"]).await;
+    let ctx = setup_test_server(vec!["default"]).await;
 
     // Add a table
     ctx.server.add_table("default", "old_table");
diff --git a/crates/paimon/tests/rest_catalog_test.rs 
b/crates/paimon/tests/rest_catalog_test.rs
new file mode 100644
index 0000000..7ac8536
--- /dev/null
+++ b/crates/paimon/tests/rest_catalog_test.rs
@@ -0,0 +1,453 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Integration tests for RESTCatalog.
+//!
+//! These tests use a mock server to verify the RESTCatalog behavior
+//! through the Catalog trait interface.
+
+use std::collections::HashMap;
+
+use paimon::api::ConfigResponse;
+use paimon::catalog::{Catalog, Identifier, RESTCatalog};
+use paimon::common::Options;
+use paimon::spec::{BigIntType, DataType, Schema, VarCharType};
+
+mod mock_server;
+use mock_server::{start_mock_server, RESTServer};
+
+/// Helper struct to hold test resources.
+struct TestContext {
+    server: RESTServer, // mock REST server; tests seed tables through it
+    catalog: RESTCatalog, // catalog under test, created with the server's URL as `uri`
+}
+
+/// Start a mock REST server seeded with `initial_dbs` and connect a
+/// `RESTCatalog` to it.
+async fn setup_catalog(initial_dbs: Vec<&str>) -> TestContext {
+    let config = ConfigResponse::new(HashMap::from([(
+        "prefix".to_string(),
+        "mock-test".to_string(),
+    )]));
+    let seeded_dbs: Vec<String> = initial_dbs.into_iter().map(String::from).collect();
+
+    let server = start_mock_server(
+        "test_warehouse".to_string(),
+        "/tmp/test_warehouse".to_string(),
+        config,
+        seeded_dbs,
+    )
+    .await;
+
+    // Point the catalog at the mock server and authenticate with a fixed token.
+    let url = server.url().expect("Failed to get server URL");
+    let mut options = Options::new();
+    options.set("uri", &url);
+    for (key, value) in [
+        ("warehouse", "test_warehouse"),
+        ("token.provider", "bear"),
+        ("token", "test_token"),
+    ] {
+        options.set(key, value);
+    }
+
+    let catalog = RESTCatalog::new(options, true)
+        .await
+        .expect("Failed to create RESTCatalog");
+
+    TestContext { server, catalog }
+}
+
+/// Two-column schema (`id` BIGINT, `name` VARCHAR(255)) shared by the table tests.
+fn test_schema() -> Schema {
+    let id_type = DataType::BigInt(BigIntType::new());
+    let name_type = DataType::VarChar(VarCharType::new(255).unwrap());
+
+    Schema::builder()
+        .column("id", id_type)
+        .column("name", name_type)
+        .build()
+        .expect("Failed to build schema")
+}
+
+// ==================== Database Tests ====================
+
+#[tokio::test]
+async fn test_catalog_list_databases() {
+    let seeded = ["default", "test_db1", "prod_db"];
+    let ctx = setup_catalog(seeded.to_vec()).await;
+
+    let databases = ctx.catalog.list_databases().await.unwrap();
+
+    // Every database the server was seeded with must be listed.
+    for name in seeded {
+        assert!(databases.contains(&name.to_string()));
+    }
+}
+
+#[tokio::test]
+async fn test_catalog_create_database() {
+    let ctx = setup_catalog(vec!["default"]).await;
+    let catalog = &ctx.catalog;
+
+    // A database that does not exist yet can be created.
+    let outcome = catalog
+        .create_database("new_db", false, HashMap::new())
+        .await;
+    assert!(outcome.is_ok(), "failed to create database: {:?}", outcome);
+
+    // It must then show up in the listing.
+    let databases = catalog.list_databases().await.unwrap();
+    assert!(databases.iter().any(|db| db == "new_db"));
+}
+
+#[tokio::test]
+async fn test_catalog_create_database_already_exists() {
+    let ctx = setup_catalog(vec!["default"]).await;
+    let ignore_if_exists = false;
+
+    // "default" is already present, so a strict create has to be rejected.
+    let outcome = ctx
+        .catalog
+        .create_database("default", ignore_if_exists, HashMap::new())
+        .await;
+
+    assert!(
+        outcome.is_err(),
+        "creating duplicate database should fail when ignore_if_exists=false"
+    );
+}
+
+#[tokio::test]
+async fn test_catalog_create_database_ignore_if_exists() {
+    let ctx = setup_catalog(vec!["default"]).await;
+    let ignore_if_exists = true;
+
+    // "default" is already present, but the lenient create tolerates that.
+    let outcome = ctx
+        .catalog
+        .create_database("default", ignore_if_exists, HashMap::new())
+        .await;
+
+    assert!(
+        outcome.is_ok(),
+        "creating duplicate database should succeed when ignore_if_exists=true"
+    );
+}
+
+#[tokio::test]
+async fn test_catalog_drop_database() {
+    let ctx = setup_catalog(vec!["default", "to_drop"]).await;
+    let catalog = &ctx.catalog;
+
+    // Precondition: the database is listed.
+    let before = catalog.list_databases().await.unwrap();
+    assert!(before.iter().any(|db| db == "to_drop"));
+
+    // cascade=true, so the emptiness check is skipped.
+    let outcome = catalog.drop_database("to_drop", false, true).await;
+    assert!(outcome.is_ok(), "failed to drop database: {:?}", outcome);
+
+    // Postcondition: the database is no longer listed.
+    let after = catalog.list_databases().await.unwrap();
+    assert!(!after.iter().any(|db| db == "to_drop"));
+}
+
+#[tokio::test]
+async fn test_catalog_drop_database_not_exists() {
+    let ctx = setup_catalog(vec!["default"]).await;
+    let ignore_if_not_exists = false;
+
+    // A strict drop of an unknown database has to be rejected.
+    let outcome = ctx
+        .catalog
+        .drop_database("non_existent", ignore_if_not_exists, true)
+        .await;
+
+    assert!(
+        outcome.is_err(),
+        "dropping non-existent database should fail when 
ignore_if_not_exists=false"
+    );
+}
+
+#[tokio::test]
+async fn test_catalog_drop_database_ignore_if_not_exists() {
+    let ctx = setup_catalog(vec!["default"]).await;
+    let ignore_if_not_exists = true;
+
+    // A lenient drop of an unknown database is a no-op that succeeds.
+    let outcome = ctx
+        .catalog
+        .drop_database("non_existent", ignore_if_not_exists, true)
+        .await;
+
+    assert!(
+        outcome.is_ok(),
+        "dropping non-existent database should succeed when 
ignore_if_not_exists=true"
+    );
+}
+
+#[tokio::test]
+async fn test_catalog_drop_database_not_empty_no_cascade() {
+    let ctx = setup_catalog(vec!["default"]).await;
+    let cascade = false;
+
+    // Make the database non-empty.
+    ctx.server.add_table("default", "some_table");
+
+    // Without cascade the drop has to be refused.
+    let outcome = ctx.catalog.drop_database("default", false, cascade).await;
+    assert!(
+        outcome.is_err(),
+        "dropping non-empty database should fail when cascade=false"
+    );
+}
+
+#[tokio::test]
+async fn test_catalog_drop_database_not_empty_cascade() {
+    let ctx = setup_catalog(vec!["default"]).await;
+    let cascade = true;
+
+    // Make the database non-empty.
+    ctx.server.add_table("default", "some_table");
+
+    // With cascade the drop goes through despite the table.
+    let outcome = ctx.catalog.drop_database("default", false, cascade).await;
+    assert!(
+        outcome.is_ok(),
+        "dropping non-empty database should succeed when cascade=true"
+    );
+
+    // The database must have disappeared from the listing.
+    let remaining = ctx.catalog.list_databases().await.unwrap();
+    assert!(!remaining.iter().any(|db| db == "default"));
+}
+
+// ==================== Table Tests ====================
+
+#[tokio::test]
+async fn test_catalog_list_tables() {
+    let ctx = setup_catalog(vec!["default"]).await;
+    let table_names = ["table1", "table2"];
+
+    // Seed the server with the tables.
+    for name in table_names {
+        ctx.server.add_table("default", name);
+    }
+
+    // Both must be reported by the catalog.
+    let listed = ctx.catalog.list_tables("default").await.unwrap();
+    for name in table_names {
+        assert!(listed.contains(&name.to_string()));
+    }
+}
+
+#[tokio::test]
+async fn test_catalog_list_tables_empty() {
+    let ctx = setup_catalog(vec!["default"]).await;
+
+    // No table was added, so the listing has to be empty.
+    let listed = ctx.catalog.list_tables("default").await.unwrap();
+    assert!(
+        listed.is_empty(),
+        "expected empty tables list, got: {:?}",
+        listed
+    );
+}
+
+#[tokio::test]
+async fn test_catalog_get_table() {
+    let ctx = setup_catalog(vec!["default"]).await;
+    let table_id = Identifier::new("default", "my_table");
+
+    // get_table builds a Table object, so the response needs schema and path.
+    ctx.server.add_table_with_schema(
+        "default",
+        "my_table",
+        test_schema(),
+        "file:///tmp/test_warehouse/default.db/my_table",
+    );
+
+    let loaded = ctx.catalog.get_table(&table_id).await;
+    assert!(loaded.is_ok(), "failed to get table: {:?}", loaded);
+}
+
+#[tokio::test]
+async fn test_catalog_get_table_not_found() {
+    let ctx = setup_catalog(vec!["default"]).await;
+
+    // Nothing was registered under this name.
+    let missing_id = Identifier::new("default", "non_existent");
+    let outcome = ctx.catalog.get_table(&missing_id).await;
+
+    assert!(outcome.is_err(), "getting non-existent table should fail");
+}
+
+#[tokio::test]
+async fn test_catalog_create_table() {
+    let ctx = setup_catalog(vec!["default"]).await;
+
+    let schema = test_schema();
+    let identifier = Identifier::new("default", "new_table");
+
+    let result = ctx.catalog.create_table(&identifier, schema, false).await;
+    assert!(result.is_ok(), "failed to create table: {:?}", result);
+
+    // Verify table exists
+    let tables = ctx.catalog.list_tables("default").await.unwrap();
+    assert!(tables.contains(&"new_table".to_string()));
+}
+
+#[tokio::test]
+async fn test_catalog_create_table_already_exists() {
+    let ctx = setup_catalog(vec!["default"]).await;
+
+    // Add a table first
+    ctx.server.add_table("default", "existing_table");
+
+    let schema = test_schema();
+    let identifier = Identifier::new("default", "existing_table");
+
+    // Create with ignore_if_exists=false should fail
+    let result = ctx.catalog.create_table(&identifier, schema, false).await;
+    assert!(
+        result.is_err(),
+        "creating duplicate table should fail when ignore_if_exists=false"
+    );
+}
+
+#[tokio::test]
+async fn test_catalog_create_table_ignore_if_exists() {
+    let ctx = setup_catalog(vec!["default"]).await;
+
+    // Add a table first
+    ctx.server.add_table("default", "existing_table");
+
+    let schema = test_schema();
+    let identifier = Identifier::new("default", "existing_table");
+
+    // Create with ignore_if_exists=true should succeed
+    let result = ctx.catalog.create_table(&identifier, schema, true).await;
+    assert!(
+        result.is_ok(),
+        "creating duplicate table should succeed when ignore_if_exists=true"
+    );
+}
+
+#[tokio::test]
+async fn test_catalog_drop_table() {
+    let ctx = setup_catalog(vec!["default"]).await;
+
+    // Add a table
+    ctx.server.add_table("default", "table_to_drop");
+
+    let identifier = Identifier::new("default", "table_to_drop");
+
+    // Drop table
+    let result = ctx.catalog.drop_table(&identifier, false).await;
+    assert!(result.is_ok(), "failed to drop table: {:?}", result);
+
+    // Verify table is gone
+    let tables = ctx.catalog.list_tables("default").await.unwrap();
+    assert!(!tables.contains(&"table_to_drop".to_string()));
+}
+
+#[tokio::test]
+async fn test_catalog_drop_table_not_found() {
+    let ctx = setup_catalog(vec!["default"]).await;
+
+    let identifier = Identifier::new("default", "non_existent");
+
+    // Drop with ignore_if_not_exists=false should fail
+    let result = ctx.catalog.drop_table(&identifier, false).await;
+    assert!(
+        result.is_err(),
+        "dropping non-existent table should fail when ignore_if_not_exists=false"
+    );
+}
+
+#[tokio::test]
+async fn test_catalog_drop_table_ignore_if_not_exists() {
+    let ctx = setup_catalog(vec!["default"]).await;
+
+    let identifier = Identifier::new("default", "non_existent");
+
+    // Drop with ignore_if_not_exists=true should succeed
+    let result = ctx.catalog.drop_table(&identifier, true).await;
+    assert!(
+        result.is_ok(),
+        "dropping non-existent table should succeed when ignore_if_not_exists=true"
+    );
+}
+
+// ==================== Rename Table Tests ====================
+
+#[tokio::test]
+async fn test_catalog_rename_table() {
+    let ctx = setup_catalog(vec!["default"]).await;
+
+    // Add a table
+    ctx.server.add_table("default", "old_table");
+
+    let from = Identifier::new("default", "old_table");
+    let to = Identifier::new("default", "new_table");
+
+    // Rename table
+    let result = ctx.catalog.rename_table(&from, &to, false).await;
+    assert!(result.is_ok(), "failed to rename table: {:?}", result);
+
+    // Verify old table is gone and new table exists
+    let tables = ctx.catalog.list_tables("default").await.unwrap();
+    assert!(!tables.contains(&"old_table".to_string()));
+    assert!(tables.contains(&"new_table".to_string()));
+}
+
+#[tokio::test]
+async fn test_catalog_rename_table_not_found() {
+    let ctx = setup_catalog(vec!["default"]).await;
+
+    let from = Identifier::new("default", "non_existent");
+    let to = Identifier::new("default", "new_name");
+
+    // Rename with ignore_if_not_exists=false should fail
+    let result = ctx.catalog.rename_table(&from, &to, false).await;
+    assert!(
+        result.is_err(),
+        "renaming non-existent table should fail when ignore_if_not_exists=false"
+    );
+}
+
+#[tokio::test]
+async fn test_catalog_rename_table_ignore_if_not_exists() {
+    let ctx = setup_catalog(vec!["default"]).await;
+
+    let from = Identifier::new("default", "non_existent");
+    let to = Identifier::new("default", "new_name");
+
+    // Rename with ignore_if_not_exists=true should succeed
+    let result = ctx.catalog.rename_table(&from, &to, true).await;
+    assert!(
+        result.is_ok(),
+        "renaming non-existent table should succeed when ignore_if_not_exists=true"
+    );
+}
+
+// ==================== Alter Table Tests ====================
+
+#[tokio::test]
+async fn test_catalog_alter_table_unsupported() {
+    let ctx = setup_catalog(vec!["default"]).await;
+
+    let identifier = Identifier::new("default", "some_table");
+
+    // alter_table should return Unsupported error
+    let result = ctx.catalog.alter_table(&identifier, vec![], false).await;
+    assert!(
+        result.is_err(),
+        "alter_table should return Unsupported error"
+    );
+}
+
+// ==================== Multiple Databases Tests ====================
+
+#[tokio::test]
+async fn test_catalog_multiple_databases_with_tables() {
+    let ctx = setup_catalog(vec!["db1", "db2"]).await;
+
+    // Add tables to different databases
+    ctx.server.add_table("db1", "table1_db1");
+    ctx.server.add_table("db1", "table2_db1");
+    ctx.server.add_table("db2", "table1_db2");
+
+    // Verify db1 tables
+    let tables_db1 = ctx.catalog.list_tables("db1").await.unwrap();
+    assert_eq!(tables_db1.len(), 2);
+    assert!(tables_db1.contains(&"table1_db1".to_string()));
+    assert!(tables_db1.contains(&"table2_db1".to_string()));
+
+    // Verify db2 tables
+    let tables_db2 = ctx.catalog.list_tables("db2").await.unwrap();
+    assert_eq!(tables_db2.len(), 1);
+    assert!(tables_db2.contains(&"table1_db2".to_string()));
+}

Reply via email to