This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 7d0a80a  feat: support datafusion integration (#150)
7d0a80a is described below

commit 7d0a80a8b453166c9bfde69ad2c8b055db352b87
Author: yuxia Luo <[email protected]>
AuthorDate: Tue Mar 24 20:11:53 2026 +0800

    feat: support datafusion integration (#150)
---
 .github/workflows/ci.yml                           |   6 +
 Cargo.toml                                         |   2 +-
 .../integrations/datafusion/Cargo.toml             |  29 ++---
 crates/integrations/datafusion/src/error.rs        |  23 ++++
 crates/integrations/datafusion/src/lib.rs          |  46 +++++++
 .../datafusion/src/physical_plan/mod.rs            |  20 +++
 .../datafusion/src/physical_plan/scan.rs           | 125 +++++++++++++++++++
 crates/integrations/datafusion/src/schema.rs       | 116 ++++++++++++++++++
 crates/integrations/datafusion/src/table/mod.rs    |  98 +++++++++++++++
 .../integrations/datafusion/tests/read_tables.rs   | 134 +++++++++++++++++++++
 .../paimon/examples/rest_list_databases_example.rs |   6 +-
 crates/paimon/src/api/auth/factory.rs              |   2 +-
 crates/paimon/src/api/rest_client.rs               |   4 +-
 crates/paimon/src/spec/partition_utils.rs          |  16 +--
 crates/paimon/tests/mock_server.rs                 |   8 +-
 deny.toml                                          |   1 +
 16 files changed, 603 insertions(+), 33 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 73f47c9..df0aebd 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -96,6 +96,12 @@ jobs:
           RUST_LOG: DEBUG
           RUST_BACKTRACE: full
 
+      - name: DataFusion Integration Test
+        run: cargo test -p paimon-datafusion --test read_tables
+        env:
+          RUST_LOG: DEBUG
+          RUST_BACKTRACE: full
+
       - name: Go Integration Test
         working-directory: bindings/go
         run: make test
diff --git a/Cargo.toml b/Cargo.toml
index 7384d66..6282c09 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -17,7 +17,7 @@
 
 [workspace]
 resolver = "2"
-members = ["crates/paimon", "crates/integration_tests", "bindings/c"]
+members = ["crates/paimon", "crates/integration_tests", "bindings/c", "crates/integrations/datafusion"]
 
 [workspace.package]
 version = "0.0.0"
diff --git a/Cargo.toml b/crates/integrations/datafusion/Cargo.toml
similarity index 62%
copy from Cargo.toml
copy to crates/integrations/datafusion/Cargo.toml
index 7384d66..c9673ec 100644
--- a/Cargo.toml
+++ b/crates/integrations/datafusion/Cargo.toml
@@ -15,19 +15,20 @@
 # specific language governing permissions and limitations
 # under the License.
 
-[workspace]
-resolver = "2"
-members = ["crates/paimon", "crates/integration_tests", "bindings/c"]
+[package]
+name = "paimon-datafusion"
+edition.workspace = true
+version.workspace = true
+license.workspace = true
+description = "Apache Paimon DataFusion Integration (read-only)"
+categories = ["database"]
+keywords = ["paimon", "datafusion", "integrations"]
 
-[workspace.package]
-version = "0.0.0"
-edition = "2021"
-homepage = "https://paimon.apache.org/"
-repository = "https://github.com/apache/paimon-rust"
-license = "Apache-2.0"
-rust-version = "1.86.0"
+[dependencies]
+async-trait = "0.1"
+datafusion = { version = "52.3.0"}
+paimon = { path = "../../paimon" }
+futures = "0.3"
 
-[workspace.dependencies]
-arrow-array = { version = "57.0", features = ["ffi"] }
-parquet = "57.0"
-tokio = "1.39.2"
\ No newline at end of file
+[dev-dependencies]
+tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
diff --git a/crates/integrations/datafusion/src/error.rs 
b/crates/integrations/datafusion/src/error.rs
new file mode 100644
index 0000000..92b2728
--- /dev/null
+++ b/crates/integrations/datafusion/src/error.rs
@@ -0,0 +1,23 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion::common::error::GenericError;
+
+/// Converts a Paimon error into a DataFusion error.
+pub fn to_datafusion_error(error: paimon::Error) -> 
datafusion::error::DataFusionError {
+    datafusion::error::DataFusionError::External(GenericError::from(error))
+}
diff --git a/crates/integrations/datafusion/src/lib.rs 
b/crates/integrations/datafusion/src/lib.rs
new file mode 100644
index 0000000..edfe1ed
--- /dev/null
+++ b/crates/integrations/datafusion/src/lib.rs
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Apache Paimon DataFusion Integration (read-only).
+//!
+//! Register a Paimon table as a DataFusion table provider to query it with 
SQL or DataFrame API.
+//!
+//! # Example
+//!
+//! ```ignore
+//! use std::sync::Arc;
+//! use datafusion::prelude::SessionContext;
+//! use paimon_datafusion::PaimonTableProvider;
+//!
+//! // Obtain a Paimon Table (e.g. from your catalog), then:
+//! let provider = PaimonTableProvider::try_new(table)?;
+//! let ctx = SessionContext::new();
+//! ctx.register_table("my_table", Arc::new(provider))?;
+//! let df = ctx.sql("SELECT * FROM my_table").await?;
+//! ```
+//!
+//! This version does not support write, column projection, or predicate 
pushdown.
+
+mod error;
+mod physical_plan;
+mod schema;
+mod table;
+
+pub use error::to_datafusion_error;
+pub use physical_plan::PaimonTableScan;
+pub use schema::paimon_schema_to_arrow;
+pub use table::PaimonTableProvider;
diff --git a/crates/integrations/datafusion/src/physical_plan/mod.rs 
b/crates/integrations/datafusion/src/physical_plan/mod.rs
new file mode 100644
index 0000000..48aa546
--- /dev/null
+++ b/crates/integrations/datafusion/src/physical_plan/mod.rs
@@ -0,0 +1,20 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+pub(crate) mod scan;
+
+pub use scan::PaimonTableScan;
diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs 
b/crates/integrations/datafusion/src/physical_plan/scan.rs
new file mode 100644
index 0000000..e567d15
--- /dev/null
+++ b/crates/integrations/datafusion/src/physical_plan/scan.rs
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::sync::Arc;
+
+use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
+use datafusion::error::Result as DFResult;
+use datafusion::execution::{SendableRecordBatchStream, TaskContext};
+use datafusion::physical_expr::EquivalenceProperties;
+use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
+use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
+use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning, 
PlanProperties};
+use futures::{StreamExt, TryStreamExt};
+use paimon::table::Table;
+
+use crate::error::to_datafusion_error;
+
+/// Execution plan that scans a Paimon table (read-only, no projection, no 
predicate, no limit).
+#[derive(Debug)]
+pub struct PaimonTableScan {
+    table: Table,
+    plan_properties: PlanProperties,
+}
+
+impl PaimonTableScan {
+    pub(crate) fn new(schema: ArrowSchemaRef, table: Table) -> Self {
+        let plan_properties = PlanProperties::new(
+            EquivalenceProperties::new(schema.clone()),
+            // TODO: Currently all Paimon splits are read in a single 
DataFusion partition,
+            // which means we lose DataFusion parallelism. A follow-up should 
expose one
+            // execution partition per Paimon split so that DataFusion can 
schedule them
+            // across threads.
+            Partitioning::UnknownPartitioning(1),
+            EmissionType::Incremental,
+            Boundedness::Bounded,
+        );
+        Self {
+            table,
+            plan_properties,
+        }
+    }
+
+    pub fn table(&self) -> &Table {
+        &self.table
+    }
+}
+
+impl ExecutionPlan for PaimonTableScan {
+    fn name(&self) -> &str {
+        "PaimonTableScan"
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        &self.plan_properties
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan + 'static>> {
+        vec![]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        _children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> DFResult<Arc<dyn ExecutionPlan>> {
+        Ok(self)
+    }
+
+    fn execute(
+        &self,
+        _partition: usize,
+        _context: Arc<TaskContext>,
+    ) -> DFResult<SendableRecordBatchStream> {
+        let table = self.table.clone();
+        let schema = self.schema();
+
+        let fut = async move {
+            let read_builder = table.new_read_builder();
+            let scan = read_builder.new_scan();
+            let plan = scan.plan().await.map_err(to_datafusion_error)?;
+            let read = read_builder.new_read().map_err(to_datafusion_error)?;
+            let stream = 
read.to_arrow(plan.splits()).map_err(to_datafusion_error)?;
+            let stream = stream.map(|r| r.map_err(to_datafusion_error));
+
+            Ok::<_, 
datafusion::error::DataFusionError>(RecordBatchStreamAdapter::new(
+                schema,
+                Box::pin(stream),
+            ))
+        };
+
+        let stream = futures::stream::once(fut).try_flatten();
+        Ok(Box::pin(RecordBatchStreamAdapter::new(
+            self.schema(),
+            stream,
+        )))
+    }
+}
+
+impl DisplayAs for PaimonTableScan {
+    fn fmt_as(
+        &self,
+        _t: datafusion::physical_plan::DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        write!(f, "PaimonTableScan")
+    }
+}
diff --git a/crates/integrations/datafusion/src/schema.rs 
b/crates/integrations/datafusion/src/schema.rs
new file mode 100644
index 0000000..231431b
--- /dev/null
+++ b/crates/integrations/datafusion/src/schema.rs
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion::arrow::datatypes::{DataType, Field, Schema};
+use datafusion::common::DataFusionError;
+use datafusion::common::Result as DFResult;
+use std::sync::Arc;
+
+use paimon::spec::{DataField, DataType as PaimonDataType};
+
+/// Converts Paimon table schema (logical row type fields) to DataFusion Arrow 
schema.
+pub fn paimon_schema_to_arrow(fields: &[DataField]) -> DFResult<Arc<Schema>> {
+    let arrow_fields: Vec<Field> = fields
+        .iter()
+        .map(|f| {
+            let arrow_type = paimon_data_type_to_arrow(f.data_type())?;
+            Ok(Field::new(
+                f.name(),
+                arrow_type,
+                f.data_type().is_nullable(),
+            ))
+        })
+        .collect::<DFResult<Vec<_>>>()?;
+    Ok(Arc::new(Schema::new(arrow_fields)))
+}
+
+fn paimon_data_type_to_arrow(dt: &PaimonDataType) -> DFResult<DataType> {
+    use datafusion::arrow::datatypes::TimeUnit;
+
+    Ok(match dt {
+        PaimonDataType::Boolean(_) => DataType::Boolean,
+        PaimonDataType::TinyInt(_) => DataType::Int8,
+        PaimonDataType::SmallInt(_) => DataType::Int16,
+        PaimonDataType::Int(_) => DataType::Int32,
+        PaimonDataType::BigInt(_) => DataType::Int64,
+        PaimonDataType::Float(_) => DataType::Float32,
+        PaimonDataType::Double(_) => DataType::Float64,
+        PaimonDataType::VarChar(_) | PaimonDataType::Char(_) => DataType::Utf8,
+        PaimonDataType::Binary(_) | PaimonDataType::VarBinary(_) => 
DataType::Binary,
+        PaimonDataType::Date(_) => DataType::Date32,
+        PaimonDataType::Time(t) => match t.precision() {
+            // `read.to_arrow(...)` goes through the Parquet Arrow reader, 
which exposes INT32
+            // TIME values as millisecond precision only. Mirror that here so 
provider schema and
+            // runtime RecordBatch schema stay identical.
+            0..=3 => DataType::Time32(TimeUnit::Millisecond),
+            4..=6 => DataType::Time64(TimeUnit::Microsecond),
+            7..=9 => DataType::Time64(TimeUnit::Nanosecond),
+            precision => {
+                return Err(DataFusionError::Internal(format!(
+                    "Unsupported TIME precision {precision}"
+                )));
+            }
+        },
+        PaimonDataType::Timestamp(t) => {
+            DataType::Timestamp(timestamp_time_unit(t.precision())?, None)
+        }
+        PaimonDataType::LocalZonedTimestamp(t) => {
+            DataType::Timestamp(timestamp_time_unit(t.precision())?, 
Some("UTC".into()))
+        }
+        PaimonDataType::Decimal(d) => {
+            let p = u8::try_from(d.precision()).map_err(|_| {
+                DataFusionError::Internal("Decimal precision exceeds 
u8".to_string())
+            })?;
+            let s = i8::try_from(d.scale() as i32).map_err(|_| {
+                DataFusionError::Internal("Decimal scale out of i8 
range".to_string())
+            })?;
+            match d.precision() {
+                // The Parquet Arrow reader normalizes DECIMAL columns to 
Decimal128 regardless of
+                // Parquet physical storage width. Mirror that here to avoid 
DataFusion schema
+                // mismatch between `TableProvider::schema()` and `execute()` 
output.
+                1..=38 => DataType::Decimal128(p, s),
+                precision => {
+                    return Err(DataFusionError::Internal(format!(
+                        "Unsupported DECIMAL precision {precision}"
+                    )));
+                }
+            }
+        }
+        PaimonDataType::Array(_)
+        | PaimonDataType::Map(_)
+        | PaimonDataType::Multiset(_)
+        | PaimonDataType::Row(_) => {
+            return Err(DataFusionError::NotImplemented(
+                "Paimon DataFusion integration does not yet support nested 
types (Array/Map/Row)"
+                    .to_string(),
+            ));
+        }
+    })
+}
+
+fn timestamp_time_unit(precision: u32) -> 
DFResult<datafusion::arrow::datatypes::TimeUnit> {
+    use datafusion::arrow::datatypes::TimeUnit;
+
+    match precision {
+        0..=3 => Ok(TimeUnit::Millisecond),
+        4..=6 => Ok(TimeUnit::Microsecond),
+        7..=9 => Ok(TimeUnit::Nanosecond),
+        _ => Err(DataFusionError::Internal(format!(
+            "Unsupported TIMESTAMP precision {precision}"
+        ))),
+    }
+}
diff --git a/crates/integrations/datafusion/src/table/mod.rs 
b/crates/integrations/datafusion/src/table/mod.rs
new file mode 100644
index 0000000..04ccab6
--- /dev/null
+++ b/crates/integrations/datafusion/src/table/mod.rs
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Paimon table provider for DataFusion (read-only).
+
+use std::any::Any;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
+use datafusion::catalog::Session;
+use datafusion::common::DataFusionError;
+use datafusion::datasource::{TableProvider, TableType};
+use datafusion::error::Result as DFResult;
+use datafusion::logical_expr::Expr;
+use datafusion::physical_plan::ExecutionPlan;
+use paimon::table::Table;
+
+use crate::physical_plan::PaimonTableScan;
+use crate::schema::paimon_schema_to_arrow;
+
+/// Read-only table provider for a Paimon table.
+///
+/// Supports full table scan only (no write, no subset/reordered projection, 
no predicate
+/// pushdown).
+#[derive(Debug, Clone)]
+pub struct PaimonTableProvider {
+    table: Table,
+    schema: ArrowSchemaRef,
+}
+
+impl PaimonTableProvider {
+    /// Create a table provider from a Paimon table.
+    ///
+    /// Loads the table schema and converts it to Arrow for DataFusion.
+    pub fn try_new(table: Table) -> DFResult<Self> {
+        let fields = table.schema().fields();
+        let schema = paimon_schema_to_arrow(fields)?;
+        Ok(Self { table, schema })
+    }
+
+    pub fn table(&self) -> &Table {
+        &self.table
+    }
+}
+
+#[async_trait]
+impl TableProvider for PaimonTableProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> ArrowSchemaRef {
+        self.schema.clone()
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::Base
+    }
+
+    async fn scan(
+        &self,
+        _state: &dyn Session,
+        projection: Option<&Vec<usize>>,
+        _filters: &[Expr],
+        _limit: Option<usize>,
+    ) -> DFResult<Arc<dyn ExecutionPlan>> {
+        if let Some(projection) = projection {
+            let is_full_schema_projection = projection.len() == 
self.schema.fields().len()
+                && 
projection.iter().copied().eq(0..self.schema.fields().len());
+
+            if !is_full_schema_projection {
+                return Err(DataFusionError::NotImplemented(
+                    "Paimon DataFusion integration does not yet support subset 
or reordered projections; use SELECT * until apache/paimon-rust#146 is 
implemented".to_string(),
+                ));
+            }
+        }
+
+        Ok(Arc::new(PaimonTableScan::new(
+            self.schema.clone(),
+            self.table.clone(),
+        )))
+    }
+}
diff --git a/crates/integrations/datafusion/tests/read_tables.rs 
b/crates/integrations/datafusion/tests/read_tables.rs
new file mode 100644
index 0000000..f813d78
--- /dev/null
+++ b/crates/integrations/datafusion/tests/read_tables.rs
@@ -0,0 +1,134 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use datafusion::arrow::array::{Int32Array, StringArray};
+use datafusion::prelude::SessionContext;
+use paimon::catalog::Identifier;
+use paimon::{Catalog, FileSystemCatalog};
+use paimon_datafusion::PaimonTableProvider;
+
+fn get_test_warehouse() -> String {
+    std::env::var("PAIMON_TEST_WAREHOUSE").unwrap_or_else(|_| 
"/tmp/paimon-warehouse".to_string())
+}
+
+async fn create_context(table_name: &str) -> SessionContext {
+    let warehouse = get_test_warehouse();
+    let catalog = FileSystemCatalog::new(warehouse).expect("Failed to create 
catalog");
+    let identifier = Identifier::new("default", table_name);
+    let table = catalog
+        .get_table(&identifier)
+        .await
+        .expect("Failed to get table");
+
+    let provider = PaimonTableProvider::try_new(table).expect("Failed to 
create table provider");
+    let ctx = SessionContext::new();
+    ctx.register_table(table_name, Arc::new(provider))
+        .expect("Failed to register table");
+
+    ctx
+}
+
+async fn read_rows(table_name: &str) -> Vec<(i32, String)> {
+    let batches = collect_query(table_name, &format!("SELECT id, name FROM 
{table_name}"))
+        .await
+        .expect("Failed to collect query result");
+
+    assert!(
+        !batches.is_empty(),
+        "Expected at least one batch from table {table_name}"
+    );
+
+    let mut actual_rows = Vec::new();
+    for batch in &batches {
+        let id_array = batch
+            .column_by_name("id")
+            .and_then(|column| column.as_any().downcast_ref::<Int32Array>())
+            .expect("Expected Int32Array for id column");
+        let name_array = batch
+            .column_by_name("name")
+            .and_then(|column| column.as_any().downcast_ref::<StringArray>())
+            .expect("Expected StringArray for name column");
+
+        for row_index in 0..batch.num_rows() {
+            actual_rows.push((
+                id_array.value(row_index),
+                name_array.value(row_index).to_string(),
+            ));
+        }
+    }
+
+    actual_rows.sort_by_key(|(id, _)| *id);
+    actual_rows
+}
+
+async fn collect_query(
+    table_name: &str,
+    sql: &str,
+) -> 
datafusion::error::Result<Vec<datafusion::arrow::record_batch::RecordBatch>> {
+    let ctx = create_context(table_name).await;
+
+    ctx.sql(sql).await?.collect().await
+}
+
+#[tokio::test]
+async fn test_read_log_table_via_datafusion() {
+    let actual_rows = read_rows("simple_log_table").await;
+    let expected_rows = vec![
+        (1, "alice".to_string()),
+        (2, "bob".to_string()),
+        (3, "carol".to_string()),
+    ];
+
+    assert_eq!(
+        actual_rows, expected_rows,
+        "Rows should match expected values"
+    );
+}
+
+#[tokio::test]
+async fn test_read_primary_key_table_via_datafusion() {
+    let actual_rows = read_rows("simple_dv_pk_table").await;
+    let expected_rows = vec![
+        (1, "alice-v2".to_string()),
+        (2, "bob-v2".to_string()),
+        (3, "carol-v2".to_string()),
+        (4, "dave-v2".to_string()),
+        (5, "eve-v2".to_string()),
+        (6, "frank-v1".to_string()),
+    ];
+
+    assert_eq!(
+        actual_rows, expected_rows,
+        "Primary key table rows should match expected values"
+    );
+}
+
+#[tokio::test]
+async fn test_subset_projection_returns_not_implemented() {
+    let error = collect_query("simple_log_table", "SELECT id FROM 
simple_log_table")
+        .await
+        .expect_err("Subset projection should be rejected until projection 
support lands");
+
+    assert!(
+        error
+            .to_string()
+            .contains("does not yet support subset or reordered projections"),
+        "Expected explicit unsupported projection error, got: {error}"
+    );
+}
diff --git a/crates/paimon/examples/rest_list_databases_example.rs 
b/crates/paimon/examples/rest_list_databases_example.rs
index 5140c17..2a5e435 100644
--- a/crates/paimon/examples/rest_list_databases_example.rs
+++ b/crates/paimon/examples/rest_list_databases_example.rs
@@ -48,7 +48,7 @@ async fn main() {
     let api = match RESTApi::new(options, true).await {
         Ok(api) => api,
         Err(e) => {
-            eprintln!("Failed to create RESTApi: {}", e);
+            eprintln!("Failed to create RESTApi: {e}");
             return;
         }
     };
@@ -57,11 +57,11 @@ async fn main() {
     println!("Calling list_databases()...");
     match api.list_databases().await {
         Ok(databases) => {
-            println!("Databases found: {:?}", databases);
+            println!("Databases found: {databases:?}");
             println!("Total count: {}", databases.len());
         }
         Err(e) => {
-            eprintln!("Failed to list databases: {}", e);
+            eprintln!("Failed to list databases: {e}");
         }
     }
 }
diff --git a/crates/paimon/src/api/auth/factory.rs 
b/crates/paimon/src/api/auth/factory.rs
index d073640..58234b2 100644
--- a/crates/paimon/src/api/auth/factory.rs
+++ b/crates/paimon/src/api/auth/factory.rs
@@ -53,7 +53,7 @@ impl AuthProviderFactory {
                 message: "auth provider is required".to_string(),
             }),
             Some(unknown) => Err(Error::ConfigInvalid {
-                message: format!("Unknown auth provider: {}", unknown),
+                message: format!("Unknown auth provider: {unknown}"),
             }),
         }
     }
diff --git a/crates/paimon/src/api/rest_client.rs 
b/crates/paimon/src/api/rest_client.rs
index 13f59be..b348918 100644
--- a/crates/paimon/src/api/rest_client.rs
+++ b/crates/paimon/src/api/rest_client.rs
@@ -48,7 +48,7 @@ impl HttpClient {
             .timeout(Duration::from_secs(30))
             .build()
             .map_err(|e| Error::ConfigInvalid {
-                message: format!("Failed to create HTTP client: {}", e),
+                message: format!("Failed to create HTTP client: {e}"),
             })?;
 
         Ok(HttpClient {
@@ -78,7 +78,7 @@ impl HttpClient {
         let normalized_url = if uri.starts_with("http://") || uri.starts_with("https://") {
             uri.to_string()
         } else {
-            format!("http://{}", uri)
+            format!("http://{uri}")
         };
 
         // Remove trailing slash
diff --git a/crates/paimon/src/spec/partition_utils.rs 
b/crates/paimon/src/spec/partition_utils.rs
index 409e79d..418ecfc 100644
--- a/crates/paimon/src/spec/partition_utils.rs
+++ b/crates/paimon/src/spec/partition_utils.rs
@@ -189,7 +189,7 @@ fn resolve_partition_fields<'a>(
                 .iter()
                 .find(|f| f.name() == key)
                 .ok_or_else(|| Error::UnexpectedError {
-                    message: format!("Partition key '{}' not found in schema 
fields", key),
+                    message: format!("Partition key '{key}' not found in 
schema fields"),
                     source: None,
                 })
         })
@@ -298,7 +298,7 @@ fn format_partition_value(
         | DataType::Multiset(_)
         | DataType::Row(_) => {
             return Err(Error::Unsupported {
-                message: format!("{:?} type is not supported as partition 
key", data_type),
+                message: format!("{data_type:?} type is not supported as 
partition key"),
             });
         }
     };
@@ -330,7 +330,7 @@ fn format_time(millis_of_day: i32, precision: u32) -> 
String {
     let s = (ms % 60_000) / 1_000;
     let mut frac_ms = ms % 1_000;
 
-    let hms = format!("{:02}:{:02}:{:02}", h, m, s);
+    let hms = format!("{h:02}:{m:02}:{s:02}");
     if precision == 0 || frac_ms == 0 {
         return hms;
     }
@@ -348,7 +348,7 @@ fn format_time(millis_of_day: i32, precision: u32) -> 
String {
         remaining -= 1;
     }
 
-    format!("{}.{}", hms, frac)
+    format!("{hms}.{frac}")
 }
 
 /// Format an unscaled i128 value with the given scale to a plain decimal 
string.
@@ -448,9 +448,9 @@ fn format_timestamp_legacy(dt: NaiveDateTime) -> String {
         return date_hour_min;
     }
 
-    let mut result = format!("{}:{:02}", date_hour_min, sec);
+    let mut result = format!("{date_hour_min}:{sec:02}");
     if nano > 0 {
-        let frac = format!("{:09}", nano);
+        let frac = format!("{nano:09}");
         let trimmed = frac.trim_end_matches('0');
         result.push('.');
         result.push_str(trimmed);
@@ -471,7 +471,7 @@ fn format_timestamp_non_legacy(dt: NaiveDateTime, 
precision: u32) -> String {
     }
 
     // Pad nano to 9 digits, then strip trailing zeros but keep at least up to 
`precision` digits.
-    let nano_str = format!("{:09}", nano);
+    let nano_str = format!("{nano:09}");
     let mut fraction = &nano_str[..];
 
     // Strip trailing zeros, but don't go below the precision boundary.
@@ -482,7 +482,7 @@ fn format_timestamp_non_legacy(dt: NaiveDateTime, 
precision: u32) -> String {
     if fraction.is_empty() {
         ymdhms
     } else {
-        format!("{}.{}", ymdhms, fraction)
+        format!("{ymdhms}.{fraction}")
     }
 }
 
diff --git a/crates/paimon/tests/mock_server.rs 
b/crates/paimon/tests/mock_server.rs
index bedb46f..32ced4d 100644
--- a/crates/paimon/tests/mock_server.rs
+++ b/crates/paimon/tests/mock_server.rs
@@ -90,7 +90,7 @@ impl RESTServer {
                 let err = ErrorResponse::new(
                     None,
                     None,
-                    Some(format!("Warehouse {} not found", warehouse)),
+                    Some(format!("Warehouse {warehouse} not found")),
                     Some(404),
                 );
                 return (StatusCode::NOT_FOUND, Json(err)).into_response();
@@ -130,7 +130,7 @@ impl RESTServer {
 
     /// Get the server URL.
     pub fn url(&self) -> Option<String> {
-        self.addr.map(|a| format!("http://{}", a))
+        self.addr.map(|a| format!("http://{a}"))
     }
 
     /// Get the server address.
@@ -174,7 +174,7 @@ pub async fn start_mock_server(
         .route("/v1/config", get(RESTServer::get_config))
         // Database routes
         .route(
-            &format!("{}/databases", prefix),
+            &format!("{prefix}/databases"),
             get(RESTServer::list_databases),
         )
         .layer(Extension(state));
@@ -186,7 +186,7 @@ pub async fn start_mock_server(
 
     let server_handle = tokio::spawn(async move {
         if let Err(e) = axum::serve(listener, app.into_make_service()).await {
-            eprintln!("mock server error: {}", e);
+            eprintln!("mock server error: {e}");
         }
     });
 
diff --git a/deny.toml b/deny.toml
index c11cec6..02dd81b 100644
--- a/deny.toml
+++ b/deny.toml
@@ -26,6 +26,7 @@ allow = [
     "MIT",
     "Unicode-3.0",
     "Zlib",
+    "bzip2-1.0.6",
 ]
 
 exceptions = [

Reply via email to