This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 7d0a80a feat: support datafusion integration (#150)
7d0a80a is described below
commit 7d0a80a8b453166c9bfde69ad2c8b055db352b87
Author: yuxia Luo <[email protected]>
AuthorDate: Tue Mar 24 20:11:53 2026 +0800
feat: support datafusion integration (#150)
---
.github/workflows/ci.yml | 6 +
Cargo.toml | 2 +-
.../integrations/datafusion/Cargo.toml | 29 ++---
crates/integrations/datafusion/src/error.rs | 23 ++++
crates/integrations/datafusion/src/lib.rs | 46 +++++++
.../datafusion/src/physical_plan/mod.rs | 20 +++
.../datafusion/src/physical_plan/scan.rs | 125 +++++++++++++++++++
crates/integrations/datafusion/src/schema.rs | 116 ++++++++++++++++++
crates/integrations/datafusion/src/table/mod.rs | 98 +++++++++++++++
.../integrations/datafusion/tests/read_tables.rs | 134 +++++++++++++++++++++
.../paimon/examples/rest_list_databases_example.rs | 6 +-
crates/paimon/src/api/auth/factory.rs | 2 +-
crates/paimon/src/api/rest_client.rs | 4 +-
crates/paimon/src/spec/partition_utils.rs | 16 +--
crates/paimon/tests/mock_server.rs | 8 +-
deny.toml | 1 +
16 files changed, 603 insertions(+), 33 deletions(-)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 73f47c9..df0aebd 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -96,6 +96,12 @@ jobs:
RUST_LOG: DEBUG
RUST_BACKTRACE: full
+ - name: DataFusion Integration Test
+ run: cargo test -p paimon-datafusion --test read_tables
+ env:
+ RUST_LOG: DEBUG
+ RUST_BACKTRACE: full
+
- name: Go Integration Test
working-directory: bindings/go
run: make test
diff --git a/Cargo.toml b/Cargo.toml
index 7384d66..6282c09 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -17,7 +17,7 @@
[workspace]
resolver = "2"
-members = ["crates/paimon", "crates/integration_tests", "bindings/c"]
+members = ["crates/paimon", "crates/integration_tests", "bindings/c",
"crates/integrations/datafusion"]
[workspace.package]
version = "0.0.0"
diff --git a/Cargo.toml b/crates/integrations/datafusion/Cargo.toml
similarity index 62%
copy from Cargo.toml
copy to crates/integrations/datafusion/Cargo.toml
index 7384d66..c9673ec 100644
--- a/Cargo.toml
+++ b/crates/integrations/datafusion/Cargo.toml
@@ -15,19 +15,20 @@
# specific language governing permissions and limitations
# under the License.
-[workspace]
-resolver = "2"
-members = ["crates/paimon", "crates/integration_tests", "bindings/c"]
+[package]
+name = "paimon-datafusion"
+edition.workspace = true
+version.workspace = true
+license.workspace = true
+description = "Apache Paimon DataFusion Integration (read-only)"
+categories = ["database"]
+keywords = ["paimon", "datafusion", "integrations"]
-[workspace.package]
-version = "0.0.0"
-edition = "2021"
-homepage = "https://paimon.apache.org/"
-repository = "https://github.com/apache/paimon-rust"
-license = "Apache-2.0"
-rust-version = "1.86.0"
+[dependencies]
+async-trait = "0.1"
+datafusion = { version = "52.3.0"}
+paimon = { path = "../../paimon" }
+futures = "0.3"
-[workspace.dependencies]
-arrow-array = { version = "57.0", features = ["ffi"] }
-parquet = "57.0"
-tokio = "1.39.2"
\ No newline at end of file
+[dev-dependencies]
+tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
diff --git a/crates/integrations/datafusion/src/error.rs
b/crates/integrations/datafusion/src/error.rs
new file mode 100644
index 0000000..92b2728
--- /dev/null
+++ b/crates/integrations/datafusion/src/error.rs
@@ -0,0 +1,23 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion::common::error::GenericError;
+
+/// Converts a Paimon error into a DataFusion error.
+pub fn to_datafusion_error(error: paimon::Error) ->
datafusion::error::DataFusionError {
+ datafusion::error::DataFusionError::External(GenericError::from(error))
+}
diff --git a/crates/integrations/datafusion/src/lib.rs
b/crates/integrations/datafusion/src/lib.rs
new file mode 100644
index 0000000..edfe1ed
--- /dev/null
+++ b/crates/integrations/datafusion/src/lib.rs
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Apache Paimon DataFusion Integration (read-only).
+//!
+//! Register a Paimon table as a DataFusion table provider to query it with SQL or DataFrame API.
+//!
+//! # Example
+//!
+//! ```ignore
+//! use std::sync::Arc;
+//! use datafusion::prelude::SessionContext;
+//! use paimon_datafusion::PaimonTableProvider;
+//!
+//! // Obtain a Paimon Table (e.g. from your catalog), then:
+//! let provider = PaimonTableProvider::try_new(table)?;
+//! let ctx = SessionContext::new();
+//! ctx.register_table("my_table", Arc::new(provider))?;
+//! let df = ctx.sql("SELECT * FROM my_table").await?;
+//! ```
+//!
+//! This version does not support write, column projection, or predicate pushdown.
+//! A scan must read every column in table order (for example `SELECT *`); any
+//! other projection is rejected with a `NotImplemented` error.
+
+mod error;
+mod physical_plan;
+mod schema;
+mod table;
+
+// Public API: the table provider, its scan node, and the conversion helpers it uses.
+pub use error::to_datafusion_error;
+pub use physical_plan::PaimonTableScan;
+pub use schema::paimon_schema_to_arrow;
+pub use table::PaimonTableProvider;
diff --git a/crates/integrations/datafusion/src/physical_plan/mod.rs
b/crates/integrations/datafusion/src/physical_plan/mod.rs
new file mode 100644
index 0000000..48aa546
--- /dev/null
+++ b/crates/integrations/datafusion/src/physical_plan/mod.rs
@@ -0,0 +1,20 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! DataFusion physical plan used to scan Paimon tables (see [`PaimonTableScan`]).
+
+pub(crate) mod scan;
+
+pub use scan::PaimonTableScan;
diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs
b/crates/integrations/datafusion/src/physical_plan/scan.rs
new file mode 100644
index 0000000..e567d15
--- /dev/null
+++ b/crates/integrations/datafusion/src/physical_plan/scan.rs
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::sync::Arc;
+
+use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
+use datafusion::error::Result as DFResult;
+use datafusion::execution::{SendableRecordBatchStream, TaskContext};
+use datafusion::physical_expr::EquivalenceProperties;
+use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
+use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
+use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning,
PlanProperties};
+use futures::{StreamExt, TryStreamExt};
+use paimon::table::Table;
+
+use crate::error::to_datafusion_error;
+
+/// Execution plan that scans a Paimon table (read-only, no projection, no predicate, no limit).
+///
+/// Exposes a single output partition that reads every split returned by the
+/// table's scan plan; that plan is computed when the node is executed.
+#[derive(Debug)]
+pub struct PaimonTableScan {
+    /// Paimon table to read; cloned into the stream future on each `execute`.
+    table: Table,
+    /// Cached DataFusion plan properties (schema, partitioning, emission, boundedness).
+    plan_properties: PlanProperties,
+}
+
+impl PaimonTableScan {
+    /// Builds a scan node over `table` whose output batches use `schema`.
+    ///
+    /// The node is bounded, emits batches incrementally, and declares exactly one
+    /// output partition.
+    pub(crate) fn new(schema: ArrowSchemaRef, table: Table) -> Self {
+        let equivalences = EquivalenceProperties::new(schema);
+        // TODO: Currently all Paimon splits are read in a single DataFusion partition,
+        // which means we lose DataFusion parallelism. A follow-up should expose one
+        // execution partition per Paimon split so that DataFusion can schedule them
+        // across threads.
+        let partitioning = Partitioning::UnknownPartitioning(1);
+        let plan_properties = PlanProperties::new(
+            equivalences,
+            partitioning,
+            EmissionType::Incremental,
+            Boundedness::Bounded,
+        );
+        Self {
+            table,
+            plan_properties,
+        }
+    }
+
+    /// Returns the Paimon table this plan reads from.
+    pub fn table(&self) -> &Table {
+        &self.table
+    }
+}
+
+impl ExecutionPlan for PaimonTableScan {
+    fn name(&self) -> &str {
+        "PaimonTableScan"
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        &self.plan_properties
+    }
+
+    /// Leaf node: there are no input plans.
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan + 'static>> {
+        vec![]
+    }
+
+    /// A leaf has no children to replace, so the plan is returned unchanged.
+    fn with_new_children(
+        self: Arc<Self>,
+        _children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> DFResult<Arc<dyn ExecutionPlan>> {
+        Ok(self)
+    }
+
+    /// Streams every split of the table as Arrow record batches.
+    ///
+    /// Scan planning and reader construction happen inside a future that is only
+    /// driven when the returned stream is polled; Paimon errors are converted with
+    /// [`to_datafusion_error`] and surface as stream items.
+    ///
+    /// # Errors
+    ///
+    /// Returns an internal error if `partition` is outside the declared output
+    /// partitioning (this plan declares a single partition).
+    fn execute(
+        &self,
+        partition: usize,
+        _context: Arc<TaskContext>,
+    ) -> DFResult<SendableRecordBatchStream> {
+        // Partition 0 reads every split. Executing any other index would read (and so
+        // duplicate) the whole table, so reject it instead of returning wrong results.
+        let partition_count = self.plan_properties.output_partitioning().partition_count();
+        if partition >= partition_count {
+            return Err(datafusion::error::DataFusionError::Internal(format!(
+                "PaimonTableScan: invalid partition {partition}, plan has {partition_count} partition(s)"
+            )));
+        }
+
+        let table = self.table.clone();
+        let schema = self.schema();
+
+        // `execute` is synchronous but planning/reading is async: build the inner
+        // stream inside a future and flatten it into the outer batch stream.
+        let fut = async move {
+            let read_builder = table.new_read_builder();
+            let scan = read_builder.new_scan();
+            let plan = scan.plan().await.map_err(to_datafusion_error)?;
+            let read = read_builder.new_read().map_err(to_datafusion_error)?;
+            let stream = read.to_arrow(plan.splits()).map_err(to_datafusion_error)?;
+            let stream = stream.map(|r| r.map_err(to_datafusion_error));
+
+            Ok::<_, datafusion::error::DataFusionError>(RecordBatchStreamAdapter::new(
+                schema,
+                Box::pin(stream),
+            ))
+        };
+
+        let stream = futures::stream::once(fut).try_flatten();
+        Ok(Box::pin(RecordBatchStreamAdapter::new(
+            self.schema(),
+            stream,
+        )))
+    }
+}
+
+impl DisplayAs for PaimonTableScan {
+    /// Formats this node for plan display; the requested format type is ignored
+    /// and only the node name is written.
+    fn fmt_as(
+        &self,
+        _format: datafusion::physical_plan::DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        f.write_str("PaimonTableScan")
+    }
+}
diff --git a/crates/integrations/datafusion/src/schema.rs
b/crates/integrations/datafusion/src/schema.rs
new file mode 100644
index 0000000..231431b
--- /dev/null
+++ b/crates/integrations/datafusion/src/schema.rs
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion::arrow::datatypes::{DataType, Field, Schema};
+use datafusion::common::DataFusionError;
+use datafusion::common::Result as DFResult;
+use std::sync::Arc;
+
+use paimon::spec::{DataField, DataType as PaimonDataType};
+
+/// Builds the Arrow schema DataFusion sees for a Paimon table.
+///
+/// Each Paimon [`DataField`] becomes an Arrow [`Field`] with the same name, the
+/// Arrow equivalent of its type, and the nullability declared by that type.
+///
+/// # Errors
+///
+/// Fails on the first field whose type cannot be mapped to Arrow by this
+/// integration (for example nested types or unsupported precisions).
+pub fn paimon_schema_to_arrow(fields: &[DataField]) -> DFResult<Arc<Schema>> {
+    let mut arrow_fields = Vec::with_capacity(fields.len());
+    for field in fields {
+        let paimon_type = field.data_type();
+        let arrow_type = paimon_data_type_to_arrow(paimon_type)?;
+        arrow_fields.push(Field::new(field.name(), arrow_type, paimon_type.is_nullable()));
+    }
+    Ok(Arc::new(Schema::new(arrow_fields)))
+}
+
+/// Maps a single Paimon data type to the Arrow type DataFusion should expect.
+///
+/// The chosen Arrow types are meant to mirror what `read.to_arrow(...)` yields at
+/// runtime, so that `TableProvider::schema()` and the batches produced by
+/// `execute()` agree.
+///
+/// # Errors
+///
+/// - `NotImplemented` for nested types (ARRAY, MAP, MULTISET, ROW).
+/// - `Internal` for TIME/TIMESTAMP precisions outside `0..=9`, DECIMAL precisions
+///   outside `1..=38`, and DECIMAL scales that do not fit in `i8`.
+fn paimon_data_type_to_arrow(dt: &PaimonDataType) -> DFResult<DataType> {
+    use datafusion::arrow::datatypes::TimeUnit;
+
+    Ok(match dt {
+        PaimonDataType::Boolean(_) => DataType::Boolean,
+        PaimonDataType::TinyInt(_) => DataType::Int8,
+        PaimonDataType::SmallInt(_) => DataType::Int16,
+        PaimonDataType::Int(_) => DataType::Int32,
+        PaimonDataType::BigInt(_) => DataType::Int64,
+        PaimonDataType::Float(_) => DataType::Float32,
+        PaimonDataType::Double(_) => DataType::Float64,
+        PaimonDataType::VarChar(_) | PaimonDataType::Char(_) => DataType::Utf8,
+        PaimonDataType::Binary(_) | PaimonDataType::VarBinary(_) => DataType::Binary,
+        PaimonDataType::Date(_) => DataType::Date32,
+        PaimonDataType::Time(t) => match t.precision() {
+            // `read.to_arrow(...)` goes through the Parquet Arrow reader, which exposes INT32
+            // TIME values as millisecond precision only. Mirror that here so provider schema and
+            // runtime RecordBatch schema stay identical.
+            0..=3 => DataType::Time32(TimeUnit::Millisecond),
+            // NOTE(review): if higher-precision TIME columns are also stored as INT32
+            // millis-of-day, the reader would still yield Time32(Millisecond) and the two
+            // arms below would not match the runtime batches — confirm against the reader.
+            4..=6 => DataType::Time64(TimeUnit::Microsecond),
+            7..=9 => DataType::Time64(TimeUnit::Nanosecond),
+            precision => {
+                return Err(DataFusionError::Internal(format!(
+                    "Unsupported TIME precision {precision}"
+                )));
+            }
+        },
+        PaimonDataType::Timestamp(t) => {
+            DataType::Timestamp(timestamp_time_unit(t.precision())?, None)
+        }
+        PaimonDataType::LocalZonedTimestamp(t) => {
+            DataType::Timestamp(timestamp_time_unit(t.precision())?, Some("UTC".into()))
+        }
+        PaimonDataType::Decimal(d) => {
+            // The Parquet Arrow reader normalizes DECIMAL columns to Decimal128 regardless of
+            // Parquet physical storage width. Mirror that here to avoid DataFusion schema
+            // mismatch between `TableProvider::schema()` and `execute()` output.
+            //
+            // Validate the supported range before narrowing, so an out-of-range precision
+            // is always reported as such rather than as a u8 conversion failure.
+            let precision = d.precision();
+            if !(1..=38).contains(&precision) {
+                return Err(DataFusionError::Internal(format!(
+                    "Unsupported DECIMAL precision {precision}"
+                )));
+            }
+            let p = u8::try_from(precision).map_err(|_| {
+                DataFusionError::Internal("Decimal precision exceeds u8".to_string())
+            })?;
+            // Checked conversion: an `as` cast could silently wrap an oversized scale.
+            let s = i8::try_from(d.scale()).map_err(|_| {
+                DataFusionError::Internal("Decimal scale out of i8 range".to_string())
+            })?;
+            DataType::Decimal128(p, s)
+        }
+        PaimonDataType::Array(_)
+        | PaimonDataType::Map(_)
+        | PaimonDataType::Multiset(_)
+        | PaimonDataType::Row(_) => {
+            return Err(DataFusionError::NotImplemented(
+                "Paimon DataFusion integration does not yet support nested types (Array/Map/Multiset/Row)"
+                    .to_string(),
+            ));
+        }
+    })
+}
+
+/// Picks the Arrow time unit for a Paimon TIMESTAMP precision.
+///
+/// Precisions 0-3 map to milliseconds, 4-6 to microseconds and 7-9 to
+/// nanoseconds; anything larger is rejected with an internal error.
+fn timestamp_time_unit(precision: u32) -> DFResult<datafusion::arrow::datatypes::TimeUnit> {
+    use datafusion::arrow::datatypes::TimeUnit;
+
+    let unit = if precision <= 3 {
+        TimeUnit::Millisecond
+    } else if precision <= 6 {
+        TimeUnit::Microsecond
+    } else if precision <= 9 {
+        TimeUnit::Nanosecond
+    } else {
+        return Err(DataFusionError::Internal(format!(
+            "Unsupported TIMESTAMP precision {precision}"
+        )));
+    };
+    Ok(unit)
+}
diff --git a/crates/integrations/datafusion/src/table/mod.rs
b/crates/integrations/datafusion/src/table/mod.rs
new file mode 100644
index 0000000..04ccab6
--- /dev/null
+++ b/crates/integrations/datafusion/src/table/mod.rs
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Paimon table provider for DataFusion (read-only).
+
+use std::any::Any;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
+use datafusion::catalog::Session;
+use datafusion::common::DataFusionError;
+use datafusion::datasource::{TableProvider, TableType};
+use datafusion::error::Result as DFResult;
+use datafusion::logical_expr::Expr;
+use datafusion::physical_plan::ExecutionPlan;
+use paimon::table::Table;
+
+use crate::physical_plan::PaimonTableScan;
+use crate::schema::paimon_schema_to_arrow;
+
+/// Read-only table provider for a Paimon table.
+///
+/// Supports full table scan only (no write, no subset/reordered projection, no predicate
+/// pushdown).
+#[derive(Debug, Clone)]
+pub struct PaimonTableProvider {
+    /// The Paimon table handed to every scan this provider plans.
+    table: Table,
+    /// Arrow schema converted from the Paimon schema once, in `try_new`.
+    schema: ArrowSchemaRef,
+}
+
+impl PaimonTableProvider {
+    /// Create a table provider from a Paimon table.
+    ///
+    /// The table's Paimon schema is converted to Arrow once, up front, and cached
+    /// for DataFusion planning.
+    ///
+    /// # Errors
+    ///
+    /// Fails if any column has a type this integration cannot represent in Arrow.
+    pub fn try_new(table: Table) -> DFResult<Self> {
+        let schema = paimon_schema_to_arrow(table.schema().fields())?;
+        Ok(Self { table, schema })
+    }
+
+    /// Returns the wrapped Paimon table.
+    pub fn table(&self) -> &Table {
+        &self.table
+    }
+}
+
+#[async_trait]
+impl TableProvider for PaimonTableProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> ArrowSchemaRef {
+        Arc::clone(&self.schema)
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::Base
+    }
+
+    /// Plans a full-table scan.
+    ///
+    /// Filters and limit are not used here. Only no projection or the identity
+    /// projection (every column, in table order) is accepted; anything else yields
+    /// a `NotImplemented` error.
+    async fn scan(
+        &self,
+        _state: &dyn Session,
+        projection: Option<&Vec<usize>>,
+        _filters: &[Expr],
+        _limit: Option<usize>,
+    ) -> DFResult<Arc<dyn ExecutionPlan>> {
+        let column_count = self.schema.fields().len();
+        // `Iterator::eq` also compares lengths, so this rejects subsets and reorders alike.
+        let reads_all_columns_in_order = projection
+            .is_none_or(|indices| indices.iter().copied().eq(0..column_count));
+        if !reads_all_columns_in_order {
+            return Err(DataFusionError::NotImplemented(
+                "Paimon DataFusion integration does not yet support subset or reordered projections; use SELECT * until apache/paimon-rust#146 is implemented".to_string(),
+            ));
+        }
+
+        let scan = PaimonTableScan::new(Arc::clone(&self.schema), self.table.clone());
+        Ok(Arc::new(scan))
+    }
+}
diff --git a/crates/integrations/datafusion/tests/read_tables.rs
b/crates/integrations/datafusion/tests/read_tables.rs
new file mode 100644
index 0000000..f813d78
--- /dev/null
+++ b/crates/integrations/datafusion/tests/read_tables.rs
@@ -0,0 +1,134 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use datafusion::arrow::array::{Int32Array, StringArray};
+use datafusion::prelude::SessionContext;
+use paimon::catalog::Identifier;
+use paimon::{Catalog, FileSystemCatalog};
+use paimon_datafusion::PaimonTableProvider;
+
+/// Warehouse root for the integration tests: `PAIMON_TEST_WAREHOUSE` if set to a
+/// valid Unicode value, otherwise `/tmp/paimon-warehouse`.
+fn get_test_warehouse() -> String {
+    match std::env::var("PAIMON_TEST_WAREHOUSE") {
+        Ok(path) => path,
+        Err(_) => "/tmp/paimon-warehouse".to_string(),
+    }
+}
+
+/// Loads `default.<table_name>` from the test warehouse and returns a fresh
+/// session in which it is registered under `table_name`. Panics on any failure.
+async fn create_context(table_name: &str) -> SessionContext {
+    let catalog =
+        FileSystemCatalog::new(get_test_warehouse()).expect("Failed to create catalog");
+    let table = catalog
+        .get_table(&Identifier::new("default", table_name))
+        .await
+        .expect("Failed to get table");
+    let provider =
+        PaimonTableProvider::try_new(table).expect("Failed to create table provider");
+
+    let session = SessionContext::new();
+    session
+        .register_table(table_name, Arc::new(provider))
+        .expect("Failed to register table");
+    session
+}
+
+/// Runs `SELECT id, name` against `table_name` and returns the rows sorted by `id`.
+///
+/// Panics if the query fails, yields no batches, or the columns are not
+/// `Int32Array` / `StringArray`.
+async fn read_rows(table_name: &str) -> Vec<(i32, String)> {
+    let sql = format!("SELECT id, name FROM {table_name}");
+    let batches = collect_query(table_name, &sql)
+        .await
+        .expect("Failed to collect query result");
+
+    assert!(
+        !batches.is_empty(),
+        "Expected at least one batch from table {table_name}"
+    );
+
+    let mut rows: Vec<(i32, String)> = batches
+        .iter()
+        .flat_map(|batch| {
+            let ids = batch
+                .column_by_name("id")
+                .and_then(|column| column.as_any().downcast_ref::<Int32Array>())
+                .expect("Expected Int32Array for id column");
+            let names = batch
+                .column_by_name("name")
+                .and_then(|column| column.as_any().downcast_ref::<StringArray>())
+                .expect("Expected StringArray for name column");
+            (0..batch.num_rows()).map(move |i| (ids.value(i), names.value(i).to_string()))
+        })
+        .collect();
+
+    rows.sort_by_key(|(id, _)| *id);
+    rows
+}
+
+/// Executes `sql` in a fresh session that has `table_name` registered and collects
+/// every resulting batch.
+async fn collect_query(
+    table_name: &str,
+    sql: &str,
+) -> datafusion::error::Result<Vec<datafusion::arrow::record_batch::RecordBatch>> {
+    let session = create_context(table_name).await;
+    let dataframe = session.sql(sql).await?;
+    dataframe.collect().await
+}
+
+#[tokio::test]
+async fn test_read_log_table_via_datafusion() {
+    let expected_rows: Vec<(i32, String)> = [(1, "alice"), (2, "bob"), (3, "carol")]
+        .into_iter()
+        .map(|(id, name)| (id, name.to_string()))
+        .collect();
+
+    let actual_rows = read_rows("simple_log_table").await;
+    assert_eq!(
+        actual_rows, expected_rows,
+        "Rows should match expected values"
+    );
+}
+
+#[tokio::test]
+async fn test_read_primary_key_table_via_datafusion() {
+    // One row per key is expected, carrying the version suffix listed below.
+    let expected_rows: Vec<(i32, String)> = [
+        (1, "alice-v2"),
+        (2, "bob-v2"),
+        (3, "carol-v2"),
+        (4, "dave-v2"),
+        (5, "eve-v2"),
+        (6, "frank-v1"),
+    ]
+    .into_iter()
+    .map(|(id, name)| (id, name.to_string()))
+    .collect();
+
+    let actual_rows = read_rows("simple_dv_pk_table").await;
+    assert_eq!(
+        actual_rows, expected_rows,
+        "Primary key table rows should match expected values"
+    );
+}
+
+#[tokio::test]
+async fn test_subset_projection_returns_not_implemented() {
+    let result = collect_query("simple_log_table", "SELECT id FROM simple_log_table").await;
+    let error = result
+        .expect_err("Subset projection should be rejected until projection support lands");
+
+    let message = error.to_string();
+    assert!(
+        message.contains("does not yet support subset or reordered projections"),
+        "Expected explicit unsupported projection error, got: {error}"
+    );
+}
diff --git a/crates/paimon/examples/rest_list_databases_example.rs
b/crates/paimon/examples/rest_list_databases_example.rs
index 5140c17..2a5e435 100644
--- a/crates/paimon/examples/rest_list_databases_example.rs
+++ b/crates/paimon/examples/rest_list_databases_example.rs
@@ -48,7 +48,7 @@ async fn main() {
let api = match RESTApi::new(options, true).await {
Ok(api) => api,
Err(e) => {
- eprintln!("Failed to create RESTApi: {}", e);
+ eprintln!("Failed to create RESTApi: {e}");
return;
}
};
@@ -57,11 +57,11 @@ async fn main() {
println!("Calling list_databases()...");
match api.list_databases().await {
Ok(databases) => {
- println!("Databases found: {:?}", databases);
+ println!("Databases found: {databases:?}");
println!("Total count: {}", databases.len());
}
Err(e) => {
- eprintln!("Failed to list databases: {}", e);
+ eprintln!("Failed to list databases: {e}");
}
}
}
diff --git a/crates/paimon/src/api/auth/factory.rs
b/crates/paimon/src/api/auth/factory.rs
index d073640..58234b2 100644
--- a/crates/paimon/src/api/auth/factory.rs
+++ b/crates/paimon/src/api/auth/factory.rs
@@ -53,7 +53,7 @@ impl AuthProviderFactory {
message: "auth provider is required".to_string(),
}),
Some(unknown) => Err(Error::ConfigInvalid {
- message: format!("Unknown auth provider: {}", unknown),
+ message: format!("Unknown auth provider: {unknown}"),
}),
}
}
diff --git a/crates/paimon/src/api/rest_client.rs
b/crates/paimon/src/api/rest_client.rs
index 13f59be..b348918 100644
--- a/crates/paimon/src/api/rest_client.rs
+++ b/crates/paimon/src/api/rest_client.rs
@@ -48,7 +48,7 @@ impl HttpClient {
.timeout(Duration::from_secs(30))
.build()
.map_err(|e| Error::ConfigInvalid {
- message: format!("Failed to create HTTP client: {}", e),
+ message: format!("Failed to create HTTP client: {e}"),
})?;
Ok(HttpClient {
@@ -78,7 +78,7 @@ impl HttpClient {
let normalized_url = if uri.starts_with("http://") ||
uri.starts_with("https://") {
uri.to_string()
} else {
- format!("http://{}", uri)
+ format!("http://{uri}")
};
// Remove trailing slash
diff --git a/crates/paimon/src/spec/partition_utils.rs
b/crates/paimon/src/spec/partition_utils.rs
index 409e79d..418ecfc 100644
--- a/crates/paimon/src/spec/partition_utils.rs
+++ b/crates/paimon/src/spec/partition_utils.rs
@@ -189,7 +189,7 @@ fn resolve_partition_fields<'a>(
.iter()
.find(|f| f.name() == key)
.ok_or_else(|| Error::UnexpectedError {
- message: format!("Partition key '{}' not found in schema
fields", key),
+ message: format!("Partition key '{key}' not found in
schema fields"),
source: None,
})
})
@@ -298,7 +298,7 @@ fn format_partition_value(
| DataType::Multiset(_)
| DataType::Row(_) => {
return Err(Error::Unsupported {
- message: format!("{:?} type is not supported as partition
key", data_type),
+ message: format!("{data_type:?} type is not supported as
partition key"),
});
}
};
@@ -330,7 +330,7 @@ fn format_time(millis_of_day: i32, precision: u32) ->
String {
let s = (ms % 60_000) / 1_000;
let mut frac_ms = ms % 1_000;
- let hms = format!("{:02}:{:02}:{:02}", h, m, s);
+ let hms = format!("{h:02}:{m:02}:{s:02}");
if precision == 0 || frac_ms == 0 {
return hms;
}
@@ -348,7 +348,7 @@ fn format_time(millis_of_day: i32, precision: u32) ->
String {
remaining -= 1;
}
- format!("{}.{}", hms, frac)
+ format!("{hms}.{frac}")
}
/// Format an unscaled i128 value with the given scale to a plain decimal
string.
@@ -448,9 +448,9 @@ fn format_timestamp_legacy(dt: NaiveDateTime) -> String {
return date_hour_min;
}
- let mut result = format!("{}:{:02}", date_hour_min, sec);
+ let mut result = format!("{date_hour_min}:{sec:02}");
if nano > 0 {
- let frac = format!("{:09}", nano);
+ let frac = format!("{nano:09}");
let trimmed = frac.trim_end_matches('0');
result.push('.');
result.push_str(trimmed);
@@ -471,7 +471,7 @@ fn format_timestamp_non_legacy(dt: NaiveDateTime,
precision: u32) -> String {
}
// Pad nano to 9 digits, then strip trailing zeros but keep at least up to
`precision` digits.
- let nano_str = format!("{:09}", nano);
+ let nano_str = format!("{nano:09}");
let mut fraction = &nano_str[..];
// Strip trailing zeros, but don't go below the precision boundary.
@@ -482,7 +482,7 @@ fn format_timestamp_non_legacy(dt: NaiveDateTime,
precision: u32) -> String {
if fraction.is_empty() {
ymdhms
} else {
- format!("{}.{}", ymdhms, fraction)
+ format!("{ymdhms}.{fraction}")
}
}
diff --git a/crates/paimon/tests/mock_server.rs
b/crates/paimon/tests/mock_server.rs
index bedb46f..32ced4d 100644
--- a/crates/paimon/tests/mock_server.rs
+++ b/crates/paimon/tests/mock_server.rs
@@ -90,7 +90,7 @@ impl RESTServer {
let err = ErrorResponse::new(
None,
None,
- Some(format!("Warehouse {} not found", warehouse)),
+ Some(format!("Warehouse {warehouse} not found")),
Some(404),
);
return (StatusCode::NOT_FOUND, Json(err)).into_response();
@@ -130,7 +130,7 @@ impl RESTServer {
/// Get the server URL.
pub fn url(&self) -> Option<String> {
- self.addr.map(|a| format!("http://{}", a))
+ self.addr.map(|a| format!("http://{a}"))
}
/// Get the server address.
@@ -174,7 +174,7 @@ pub async fn start_mock_server(
.route("/v1/config", get(RESTServer::get_config))
// Database routes
.route(
- &format!("{}/databases", prefix),
+ &format!("{prefix}/databases"),
get(RESTServer::list_databases),
)
.layer(Extension(state));
@@ -186,7 +186,7 @@ pub async fn start_mock_server(
let server_handle = tokio::spawn(async move {
if let Err(e) = axum::serve(listener, app.into_make_service()).await {
- eprintln!("mock server error: {}", e);
+ eprintln!("mock server error: {e}");
}
});
diff --git a/deny.toml b/deny.toml
index c11cec6..02dd81b 100644
--- a/deny.toml
+++ b/deny.toml
@@ -26,6 +26,7 @@ allow = [
"MIT",
"Unicode-3.0",
"Zlib",
+ "bzip2-1.0.6",
]
exceptions = [