This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new b018c04  feat: introduce projection in go binding and datafusion. 
(#162)
b018c04 is described below

commit b018c046841382b4d6217d86f8d5c0bb0c8234fa
Author: yuxia Luo <[email protected]>
AuthorDate: Mon Mar 30 19:49:20 2026 +0800

    feat: introduce projection in go binding and datafusion. (#162)
---
 bindings/c/src/table.rs                            | 133 ++++++++++++++++-----
 bindings/c/src/types.rs                            |  15 +++
 bindings/go/read_builder.go                        |  51 ++++++++
 bindings/go/tests/paimon_test.go                   | 111 +++++++++++++++++
 .../datafusion/src/physical_plan/scan.rs           |  21 +++-
 crates/integrations/datafusion/src/table/mod.rs    |  31 ++---
 .../integrations/datafusion/tests/read_tables.rs   |  38 ++++--
 crates/paimon/src/api/auth/dlf_provider.rs         |   8 +-
 crates/paimon/src/api/auth/dlf_signer.rs           |   8 +-
 crates/paimon/src/api/rest_api.rs                  |   2 +-
 crates/paimon/src/arrow/reader.rs                  |   3 +-
 crates/paimon/src/table/read_builder.rs            |  15 ++-
 crates/paimon/tests/mock_server.rs                 |  31 +++--
 crates/paimon/tests/rest_api_test.rs               |  19 ++-
 docs/src/getting-started.md                        |   2 +-
 15 files changed, 389 insertions(+), 99 deletions(-)

diff --git a/bindings/c/src/table.rs b/bindings/c/src/table.rs
index 7892757..c3f57fe 100644
--- a/bindings/c/src/table.rs
+++ b/bindings/c/src/table.rs
@@ -48,6 +48,18 @@ unsafe fn free_table_wrapper<T>(ptr: *mut T, get_inner: impl 
FnOnce(&T) -> *mut
     }
 }
 
+// Helper to box a ReadBuilderState and return a raw pointer.
+unsafe fn box_read_builder_state(state: ReadBuilderState) -> *mut 
paimon_read_builder {
+    let inner = Box::into_raw(Box::new(state)) as *mut c_void;
+    Box::into_raw(Box::new(paimon_read_builder { inner }))
+}
+
+// Helper to box a TableReadState and return a raw pointer.
+unsafe fn box_table_read_state(state: TableReadState) -> *mut 
paimon_table_read {
+    let inner = Box::into_raw(Box::new(state)) as *mut c_void;
+    Box::into_raw(Box::new(paimon_table_read { inner }))
+}
+
 // ======================= Table ===============================
 
 /// Free a paimon_table.
@@ -74,8 +86,12 @@ pub unsafe extern "C" fn paimon_table_new_read_builder(
         };
     }
     let table_ref = &*((*table).inner as *const Table);
+    let state = ReadBuilderState {
+        table: table_ref.clone(),
+        projected_columns: None,
+    };
     paimon_result_read_builder {
-        read_builder: box_table_wrapper(table_ref, |inner| paimon_read_builder 
{ inner }),
+        read_builder: box_read_builder_state(state),
         error: std::ptr::null_mut(),
     }
 }
@@ -88,7 +104,57 @@ pub unsafe extern "C" fn paimon_table_new_read_builder(
 /// Only call with a read_builder returned from 
`paimon_table_new_read_builder`.
 #[no_mangle]
 pub unsafe extern "C" fn paimon_read_builder_free(rb: *mut 
paimon_read_builder) {
-    free_table_wrapper(rb, |r| r.inner);
+    if !rb.is_null() {
+        let wrapper = Box::from_raw(rb);
+        if !wrapper.inner.is_null() {
+            drop(Box::from_raw(wrapper.inner as *mut ReadBuilderState));
+        }
+    }
+}
+
+/// Set column projection for a ReadBuilder.
+///
+/// The `columns` parameter is a null-terminated array of null-terminated C 
strings.
+/// Output order follows the caller-specified order. Unknown or duplicate names
+/// cause `paimon_read_builder_new_read()` to fail; an empty list is a valid
+/// zero-column projection.
+///
+/// # Safety
+/// `rb` must be a valid pointer from `paimon_table_new_read_builder`, or null 
(returns error).
+/// `columns` must be a null-terminated array of null-terminated C strings, or 
null for no projection.
+#[no_mangle]
+pub unsafe extern "C" fn paimon_read_builder_with_projection(
+    rb: *mut paimon_read_builder,
+    columns: *const *const std::ffi::c_char,
+) -> *mut paimon_error {
+    if let Err(e) = check_non_null(rb, "rb") {
+        return e;
+    }
+
+    let state = &mut *((*rb).inner as *mut ReadBuilderState);
+
+    if columns.is_null() {
+        state.projected_columns = None;
+        return std::ptr::null_mut();
+    }
+
+    let mut col_names = Vec::new();
+    let mut ptr = columns;
+    while !(*ptr).is_null() {
+        let c_str = std::ffi::CStr::from_ptr(*ptr);
+        match c_str.to_str() {
+            Ok(s) => col_names.push(s.to_string()),
+            Err(e) => {
+                return paimon_error::from_paimon(paimon::Error::ConfigInvalid {
+                    message: format!("Invalid UTF-8 in column name: {e}"),
+                });
+            }
+        }
+        ptr = ptr.add(1);
+    }
+
+    state.projected_columns = Some(col_names);
+    std::ptr::null_mut()
 }
 
 /// Create a new TableScan from a ReadBuilder.
@@ -105,9 +171,9 @@ pub unsafe extern "C" fn paimon_read_builder_new_scan(
             error: e,
         };
     }
-    let table = &*((*rb).inner as *const Table);
+    let state = &*((*rb).inner as *const ReadBuilderState);
     paimon_result_table_scan {
-        scan: box_table_wrapper(table, |inner| paimon_table_scan { inner }),
+        scan: box_table_wrapper(&state.table, |inner| paimon_table_scan { 
inner }),
         error: std::ptr::null_mut(),
     }
 }
@@ -126,13 +192,23 @@ pub unsafe extern "C" fn paimon_read_builder_new_read(
             error: e,
         };
     }
-    let table = &*((*rb).inner as *const Table);
-    let rb_rust = table.new_read_builder();
+    let state = &*((*rb).inner as *const ReadBuilderState);
+    let mut rb_rust = state.table.new_read_builder();
+
+    // Apply projection if set
+    if let Some(ref columns) = state.projected_columns {
+        let col_refs: Vec<&str> = columns.iter().map(|s| s.as_str()).collect();
+        rb_rust.with_projection(&col_refs);
+    }
+
     match rb_rust.new_read() {
-        Ok(_) => {
-            let wrapper = box_table_wrapper(table, |inner| paimon_table_read { 
inner });
+        Ok(table_read) => {
+            let read_state = TableReadState {
+                table: state.table.clone(),
+                read_type: table_read.read_type().to_vec(),
+            };
             paimon_result_new_read {
-                read: wrapper,
+                read: box_table_read_state(read_state),
                 error: std::ptr::null_mut(),
             }
         }
@@ -226,7 +302,12 @@ pub unsafe extern "C" fn paimon_plan_num_splits(plan: 
*const paimon_plan) -> usi
 /// Only call with a read returned from `paimon_read_builder_new_read`.
 #[no_mangle]
 pub unsafe extern "C" fn paimon_table_read_free(read: *mut paimon_table_read) {
-    free_table_wrapper(read, |r| r.inner);
+    if !read.is_null() {
+        let wrapper = Box::from_raw(read);
+        if !wrapper.inner.is_null() {
+            drop(Box::from_raw(wrapper.inner as *mut TableReadState));
+        }
+    }
 }
 
 /// Read table data as Arrow record batches via a streaming reader.
@@ -261,31 +342,27 @@ pub unsafe extern "C" fn paimon_table_read_to_arrow(
         };
     }
 
-    let table = &*((*read).inner as *const Table);
+    let state = &*((*read).inner as *const TableReadState);
     let plan_ref = &*((*plan).inner as *const Plan);
     let all_splits = plan_ref.splits();
     let start = offset.min(all_splits.len());
     let end = (offset.saturating_add(length)).min(all_splits.len());
     let selected = &all_splits[start..end];
 
-    let rb = table.new_read_builder();
-    match rb.new_read() {
-        Ok(table_read) => match table_read.to_arrow(selected) {
-            Ok(stream) => {
-                let reader = Box::new(stream);
-                let wrapper = Box::new(paimon_record_batch_reader {
-                    inner: Box::into_raw(reader) as *mut c_void,
-                });
-                paimon_result_record_batch_reader {
-                    reader: Box::into_raw(wrapper),
-                    error: std::ptr::null_mut(),
-                }
+    // Create TableRead with the stored read_type (projection)
+    let table_read = paimon::table::TableRead::new(&state.table, 
state.read_type.clone());
+
+    match table_read.to_arrow(selected) {
+        Ok(stream) => {
+            let reader = Box::new(stream);
+            let wrapper = Box::new(paimon_record_batch_reader {
+                inner: Box::into_raw(reader) as *mut c_void,
+            });
+            paimon_result_record_batch_reader {
+                reader: Box::into_raw(wrapper),
+                error: std::ptr::null_mut(),
             }
-            Err(e) => paimon_result_record_batch_reader {
-                reader: std::ptr::null_mut(),
-                error: paimon_error::from_paimon(e),
-            },
-        },
+        }
         Err(e) => paimon_result_record_batch_reader {
             reader: std::ptr::null_mut(),
             error: paimon_error::from_paimon(e),
diff --git a/bindings/c/src/types.rs b/bindings/c/src/types.rs
index 1cb2be7..a95f965 100644
--- a/bindings/c/src/types.rs
+++ b/bindings/c/src/types.rs
@@ -17,6 +17,9 @@
 
 use std::ffi::c_void;
 
+use paimon::spec::DataField;
+use paimon::table::Table;
+
 /// C-compatible byte buffer.
 #[repr(C)]
 #[derive(Clone, Copy)]
@@ -68,6 +71,12 @@ pub struct paimon_read_builder {
     pub inner: *mut c_void,
 }
 
+/// Internal state for ReadBuilder that stores table and projection columns.
+pub(crate) struct ReadBuilderState {
+    pub table: Table,
+    pub projected_columns: Option<Vec<String>>,
+}
+
 #[repr(C)]
 pub struct paimon_table_scan {
     pub inner: *mut c_void,
@@ -78,6 +87,12 @@ pub struct paimon_table_read {
     pub inner: *mut c_void,
 }
 
+/// Internal state for TableRead that stores table and projected read type.
+pub(crate) struct TableReadState {
+    pub table: Table,
+    pub read_type: Vec<DataField>,
+}
+
 #[repr(C)]
 pub struct paimon_plan {
     pub inner: *mut c_void,
diff --git a/bindings/go/read_builder.go b/bindings/go/read_builder.go
index 20b7a0d..fde6bf5 100644
--- a/bindings/go/read_builder.go
+++ b/bindings/go/read_builder.go
@@ -21,6 +21,7 @@ package paimon
 
 import (
        "context"
+       "runtime"
        "sync"
        "unsafe"
 
@@ -44,6 +45,17 @@ func (rb *ReadBuilder) Close() {
        })
 }
 
+// WithProjection sets column projection by name. Output order follows the
+// caller-specified order. Unknown or duplicate names cause NewRead() to fail;
+// an empty list is a valid zero-column projection.
+func (rb *ReadBuilder) WithProjection(columns []string) error {
+       if rb.inner == nil {
+               return ErrClosed
+       }
+       projFn := ffiReadBuilderWithProjection.symbol(rb.ctx)
+       return projFn(rb.inner, columns)
+}
+
 // NewScan creates a TableScan for planning which data files to read.
 func (rb *ReadBuilder) NewScan() (*TableScan, error) {
        if rb.inner == nil {
@@ -85,6 +97,45 @@ var ffiReadBuilderFree = newFFI(ffiOpts{
        }
 })
 
+var ffiReadBuilderWithProjection = newFFI(ffiOpts{
+       sym:    "paimon_read_builder_with_projection",
+       rType:  &ffi.TypePointer,
+       aTypes: []*ffi.Type{&ffi.TypePointer, &ffi.TypePointer},
+}, func(ctx context.Context, ffiCall ffiCall) func(rb *paimonReadBuilder, 
columns []string) error {
+       return func(rb *paimonReadBuilder, columns []string) error {
+               var colPtrs []*byte
+               var cStrings [][]byte
+
+               // Convert Go strings to null-terminated C strings
+               for _, col := range columns {
+                       cStr := append([]byte(col), 0)
+                       cStrings = append(cStrings, cStr)
+                       colPtrs = append(colPtrs, &cStr[0])
+               }
+               // Null-terminate the array
+               colPtrs = append(colPtrs, nil)
+
+               var colsPtr unsafe.Pointer
+               if len(colPtrs) > 0 {
+                       colsPtr = unsafe.Pointer(&colPtrs[0])
+               }
+
+               var errPtr *paimonError
+               ffiCall(
+                       unsafe.Pointer(&errPtr),
+                       unsafe.Pointer(&rb),
+                       unsafe.Pointer(&colsPtr),
+               )
+               // Ensure Go-managed buffers stay alive for the full native 
call.
+               runtime.KeepAlive(cStrings)
+               runtime.KeepAlive(colPtrs)
+               if errPtr != nil {
+                       return parseError(ctx, errPtr)
+               }
+               return nil
+       }
+})
+
 var ffiReadBuilderNewScan = newFFI(ffiOpts{
        sym:    "paimon_read_builder_new_scan",
        rType:  &typeResultTableScan,
diff --git a/bindings/go/tests/paimon_test.go b/bindings/go/tests/paimon_test.go
index 28e3fc2..ef38c2b 100644
--- a/bindings/go/tests/paimon_test.go
+++ b/bindings/go/tests/paimon_test.go
@@ -154,3 +154,114 @@ func TestReadLogTable(t *testing.T) {
                }
        }
 }
+
+// TestReadWithProjection reads only the "id" column via WithProjection and
+// verifies that only the projected column is returned with correct values.
+func TestReadWithProjection(t *testing.T) {
+       warehouse := os.Getenv("PAIMON_TEST_WAREHOUSE")
+       if warehouse == "" {
+               warehouse = "/tmp/paimon-warehouse"
+       }
+
+       if _, err := os.Stat(warehouse); os.IsNotExist(err) {
+               t.Skipf("Skipping: warehouse %s does not exist (run 'make 
docker-up' first)", warehouse)
+       }
+
+       catalog, err := paimon.NewFileSystemCatalog(warehouse)
+       if err != nil {
+               t.Fatalf("Failed to create catalog: %v", err)
+       }
+       defer catalog.Close()
+
+       table, err := catalog.GetTable(paimon.NewIdentifier("default", 
"simple_log_table"))
+       if err != nil {
+               t.Fatalf("Failed to get table: %v", err)
+       }
+       defer table.Close()
+
+       rb, err := table.NewReadBuilder()
+       if err != nil {
+               t.Fatalf("Failed to create read builder: %v", err)
+       }
+       defer rb.Close()
+
+       // Set projection to only read "id" column
+       if err := rb.WithProjection([]string{"id"}); err != nil {
+               t.Fatalf("Failed to set projection: %v", err)
+       }
+
+       scan, err := rb.NewScan()
+       if err != nil {
+               t.Fatalf("Failed to create scan: %v", err)
+       }
+       defer scan.Close()
+
+       plan, err := scan.Plan()
+       if err != nil {
+               t.Fatalf("Failed to plan: %v", err)
+       }
+       defer plan.Close()
+
+       splits := plan.Splits()
+       if len(splits) == 0 {
+               t.Fatal("Expected at least one split")
+       }
+
+       read, err := rb.NewRead()
+       if err != nil {
+               t.Fatalf("Failed to create table read: %v", err)
+       }
+       defer read.Close()
+
+       reader, err := read.NewRecordBatchReader(splits)
+       if err != nil {
+               t.Fatalf("Failed to create record batch reader: %v", err)
+       }
+       defer reader.Close()
+
+       var ids []int32
+       batchIdx := 0
+       for {
+               record, err := reader.NextRecord()
+               if errors.Is(err, io.EOF) {
+                       break
+               }
+               if err != nil {
+                       t.Fatalf("Batch %d: failed to read next record: %v", 
batchIdx, err)
+               }
+
+               // Verify schema only contains the projected column
+               schema := record.Schema()
+               if schema.NumFields() != 1 {
+                       record.Release()
+                       t.Fatalf("Batch %d: expected 1 field, got %d: %s", 
batchIdx, schema.NumFields(), schema)
+               }
+               if schema.Field(0).Name != "id" {
+                       record.Release()
+                       t.Fatalf("Batch %d: expected field 'id', got '%s'", 
batchIdx, schema.Field(0).Name)
+               }
+
+               idCol := record.Column(0).(*array.Int32)
+               for j := 0; j < int(record.NumRows()); j++ {
+                       ids = append(ids, idCol.Value(j))
+               }
+               record.Release()
+               batchIdx++
+       }
+
+       if len(ids) == 0 {
+               t.Fatal("Expected at least one row, got 0")
+       }
+
+       sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
+
+       expected := []int32{1, 2, 3}
+       if len(ids) != len(expected) {
+               t.Fatalf("Expected %d rows, got %d: %v", len(expected), 
len(ids), ids)
+       }
+       for i, exp := range expected {
+               if ids[i] != exp {
+                       t.Errorf("Row %d: expected id=%d, got id=%d", i, exp, 
ids[i])
+               }
+       }
+}
diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs 
b/crates/integrations/datafusion/src/physical_plan/scan.rs
index e567d15..fcb1497 100644
--- a/crates/integrations/datafusion/src/physical_plan/scan.rs
+++ b/crates/integrations/datafusion/src/physical_plan/scan.rs
@@ -30,15 +30,21 @@ use paimon::table::Table;
 
 use crate::error::to_datafusion_error;
 
-/// Execution plan that scans a Paimon table (read-only, no projection, no 
predicate, no limit).
+/// Execution plan that scans a Paimon table with optional column projection.
 #[derive(Debug)]
 pub struct PaimonTableScan {
     table: Table,
+    /// Projected column names (if None, reads all columns).
+    projected_columns: Option<Vec<String>>,
     plan_properties: PlanProperties,
 }
 
 impl PaimonTableScan {
-    pub(crate) fn new(schema: ArrowSchemaRef, table: Table) -> Self {
+    pub(crate) fn new(
+        schema: ArrowSchemaRef,
+        table: Table,
+        projected_columns: Option<Vec<String>>,
+    ) -> Self {
         let plan_properties = PlanProperties::new(
             EquivalenceProperties::new(schema.clone()),
             // TODO: Currently all Paimon splits are read in a single 
DataFusion partition,
@@ -51,6 +57,7 @@ impl PaimonTableScan {
         );
         Self {
             table,
+            projected_columns,
             plan_properties,
         }
     }
@@ -91,9 +98,17 @@ impl ExecutionPlan for PaimonTableScan {
     ) -> DFResult<SendableRecordBatchStream> {
         let table = self.table.clone();
         let schema = self.schema();
+        let projected_columns = self.projected_columns.clone();
 
         let fut = async move {
-            let read_builder = table.new_read_builder();
+            let mut read_builder = table.new_read_builder();
+
+            // Apply projection if specified
+            if let Some(ref columns) = projected_columns {
+                let col_refs: Vec<&str> = columns.iter().map(|s| 
s.as_str()).collect();
+                read_builder.with_projection(&col_refs);
+            }
+
             let scan = read_builder.new_scan();
             let plan = scan.plan().await.map_err(to_datafusion_error)?;
             let read = read_builder.new_read().map_err(to_datafusion_error)?;
diff --git a/crates/integrations/datafusion/src/table/mod.rs 
b/crates/integrations/datafusion/src/table/mod.rs
index 04ccab6..1ba06f4 100644
--- a/crates/integrations/datafusion/src/table/mod.rs
+++ b/crates/integrations/datafusion/src/table/mod.rs
@@ -21,9 +21,8 @@ use std::any::Any;
 use std::sync::Arc;
 
 use async_trait::async_trait;
-use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
+use datafusion::arrow::datatypes::{Field, Schema, SchemaRef as ArrowSchemaRef};
 use datafusion::catalog::Session;
-use datafusion::common::DataFusionError;
 use datafusion::datasource::{TableProvider, TableType};
 use datafusion::error::Result as DFResult;
 use datafusion::logical_expr::Expr;
@@ -35,8 +34,8 @@ use crate::schema::paimon_schema_to_arrow;
 
 /// Read-only table provider for a Paimon table.
 ///
-/// Supports full table scan only (no write, no subset/reordered projection, 
no predicate
-/// pushdown).
+/// Supports full table scan and column projection. Predicate pushdown and 
writes
+/// are not yet supported.
 #[derive(Debug, Clone)]
 pub struct PaimonTableProvider {
     table: Table,
@@ -79,20 +78,22 @@ impl TableProvider for PaimonTableProvider {
         _filters: &[Expr],
         _limit: Option<usize>,
     ) -> DFResult<Arc<dyn ExecutionPlan>> {
-        if let Some(projection) = projection {
-            let is_full_schema_projection = projection.len() == 
self.schema.fields().len()
-                && 
projection.iter().copied().eq(0..self.schema.fields().len());
-
-            if !is_full_schema_projection {
-                return Err(DataFusionError::NotImplemented(
-                    "Paimon DataFusion integration does not yet support subset 
or reordered projections; use SELECT * until apache/paimon-rust#146 is 
implemented".to_string(),
-                ));
-            }
-        }
+        // Convert projection indices to column names and compute projected 
schema
+        let (projected_schema, projected_columns) = if let Some(indices) = 
projection {
+            let fields: Vec<Field> = indices
+                .iter()
+                .map(|&i| self.schema.field(i).clone())
+                .collect();
+            let column_names: Vec<String> = fields.iter().map(|f| 
f.name().clone()).collect();
+            (Arc::new(Schema::new(fields)), Some(column_names))
+        } else {
+            (self.schema.clone(), None)
+        };
 
         Ok(Arc::new(PaimonTableScan::new(
-            self.schema.clone(),
+            projected_schema,
             self.table.clone(),
+            projected_columns,
         )))
     }
 }
diff --git a/crates/integrations/datafusion/tests/read_tables.rs 
b/crates/integrations/datafusion/tests/read_tables.rs
index f813d78..97bad76 100644
--- a/crates/integrations/datafusion/tests/read_tables.rs
+++ b/crates/integrations/datafusion/tests/read_tables.rs
@@ -120,15 +120,39 @@ async fn test_read_primary_key_table_via_datafusion() {
 }
 
 #[tokio::test]
-async fn test_subset_projection_returns_not_implemented() {
-    let error = collect_query("simple_log_table", "SELECT id FROM 
simple_log_table")
+async fn test_projection_via_datafusion() {
+    let batches = collect_query("simple_log_table", "SELECT id FROM 
simple_log_table")
         .await
-        .expect_err("Subset projection should be rejected until projection 
support lands");
+        .expect("Subset projection should succeed");
 
     assert!(
-        error
-            .to_string()
-            .contains("does not yet support subset or reordered projections"),
-        "Expected explicit unsupported projection error, got: {error}"
+        !batches.is_empty(),
+        "Expected at least one batch from projected query"
+    );
+
+    let mut actual_ids = Vec::new();
+    for batch in &batches {
+        let schema = batch.schema();
+        let field_names: Vec<&str> = schema.fields().iter().map(|f| 
f.name().as_str()).collect();
+        assert_eq!(
+            field_names,
+            vec!["id"],
+            "Projected query should only return 'id' column"
+        );
+
+        let id_array = batch
+            .column_by_name("id")
+            .and_then(|col| col.as_any().downcast_ref::<Int32Array>())
+            .expect("Expected Int32Array for id column");
+        for i in 0..id_array.len() {
+            actual_ids.push(id_array.value(i));
+        }
+    }
+
+    actual_ids.sort();
+    assert_eq!(
+        actual_ids,
+        vec![1, 2, 3],
+        "Projected id values should match"
     );
 }
diff --git a/crates/paimon/src/api/auth/dlf_provider.rs 
b/crates/paimon/src/api/auth/dlf_provider.rs
index e5a7ea0..4f264d8 100644
--- a/crates/paimon/src/api/auth/dlf_provider.rs
+++ b/crates/paimon/src/api/auth/dlf_provider.rs
@@ -168,7 +168,7 @@ impl DLFECSTokenLoader {
     async fn get_token(&self, url: &str) -> Result<DLFToken> {
         let token_json = self.http_client.get(url).await?;
         serde_json::from_str(&token_json).map_err(|e| Error::DataInvalid {
-            message: format!("Failed to parse token JSON: {}", e),
+            message: format!("Failed to parse token JSON: {e}"),
             source: None,
         })
     }
@@ -176,7 +176,7 @@ impl DLFECSTokenLoader {
     /// Build the token URL from base URL and role name.
     fn build_token_url(&self, role_name: &str) -> String {
         let base_url = self.ecs_metadata_url.trim_end_matches('/');
-        format!("{}/{}", base_url, role_name)
+        format!("{base_url}/{role_name}")
     }
 }
 
@@ -396,7 +396,7 @@ impl TokenHTTPClient {
             match self.client.get(url).send().await {
                 Ok(response) if response.status().is_success() => {
                     return response.text().await.map_err(|e| 
Error::DataInvalid {
-                        message: format!("Failed to read response: {}", e),
+                        message: format!("Failed to read response: {e}"),
                         source: None,
                     });
                 }
@@ -404,7 +404,7 @@ impl TokenHTTPClient {
                     last_error = format!("HTTP error: {}", response.status());
                 }
                 Err(e) => {
-                    last_error = format!("Request failed: {}", e);
+                    last_error = format!("Request failed: {e}");
                 }
             }
 
diff --git a/crates/paimon/src/api/auth/dlf_signer.rs 
b/crates/paimon/src/api/auth/dlf_signer.rs
index 133970b..ca49a28 100644
--- a/crates/paimon/src/api/auth/dlf_signer.rs
+++ b/crates/paimon/src/api/auth/dlf_signer.rs
@@ -236,7 +236,7 @@ impl DLFDefaultSigner {
 
         let sorted_headers = self.build_sorted_signed_headers_map(headers);
         for (key, value) in sorted_headers {
-            parts.push(format!("{}:{}", key, value));
+            parts.push(format!("{key}:{value}"));
         }
 
         let content_sha256 = headers
@@ -262,7 +262,7 @@ impl DLFDefaultSigner {
                 let key = Self::trim(key);
                 if !value.is_empty() {
                     let value = Self::trim(value);
-                    format!("{}={}", key, value)
+                    format!("{key}={value}")
                 } else {
                     key.to_string()
                 }
@@ -480,7 +480,7 @@ impl DLFOpenApiSigner {
 
         let mut result = String::new();
         for (key, value) in sorted_headers {
-            result.push_str(&format!("{}:{}\n", key, value));
+            result.push_str(&format!("{key}:{value}\n"));
         }
         result
     }
@@ -500,7 +500,7 @@ impl DLFOpenApiSigner {
             .map(|(key, value)| {
                 let decoded_value = 
urlencoding::decode(value).unwrap_or_default();
                 if !decoded_value.is_empty() {
-                    format!("{}={}", key, decoded_value)
+                    format!("{key}={decoded_value}")
                 } else {
                     key.to_string()
                 }
diff --git a/crates/paimon/src/api/rest_api.rs 
b/crates/paimon/src/api/rest_api.rs
index f785111..be09e9a 100644
--- a/crates/paimon/src/api/rest_api.rs
+++ b/crates/paimon/src/api/rest_api.rs
@@ -50,7 +50,7 @@ use super::rest_util::RESTUtil;
 fn validate_non_empty(value: &str, field_name: &str) -> Result<()> {
     if value.trim().is_empty() {
         return Err(crate::Error::ConfigInvalid {
-            message: format!("{} cannot be empty", field_name),
+            message: format!("{field_name} cannot be empty"),
         });
     }
     Ok(())
diff --git a/crates/paimon/src/arrow/reader.rs 
b/crates/paimon/src/arrow/reader.rs
index d7b07a7..693aaf8 100644
--- a/crates/paimon/src/arrow/reader.rs
+++ b/crates/paimon/src/arrow/reader.rs
@@ -165,8 +165,7 @@ impl ArrowReader {
                                 batch.schema().index_of(name).map_err(|_| {
                                     Error::UnexpectedError {
                                         message: format!(
-                                            "Projected column '{}' not found 
in Parquet batch schema of file {}",
-                                            name, path_to_read
+                                            "Projected column '{name}' not 
found in Parquet batch schema of file {path_to_read}"
                                         ),
                                         source: None,
                                     }
diff --git a/crates/paimon/src/table/read_builder.rs 
b/crates/paimon/src/table/read_builder.rs
index 454807e..69e1674 100644
--- a/crates/paimon/src/table/read_builder.rs
+++ b/crates/paimon/src/table/read_builder.rs
@@ -65,10 +65,7 @@ impl<'a> ReadBuilder<'a> {
             Some(projected) => self.resolve_projected_fields(projected)?,
         };
 
-        Ok(TableRead {
-            table: self.table,
-            read_type,
-        })
+        Ok(TableRead::new(self.table, read_type))
     }
 
     fn resolve_projected_fields(&self, projected_fields: &[String]) -> 
Result<Vec<DataField>> {
@@ -91,10 +88,7 @@ impl<'a> ReadBuilder<'a> {
         for name in projected_fields {
             if !seen.insert(name.as_str()) {
                 return Err(Error::ConfigInvalid {
-                    message: format!(
-                        "Duplicate projection column '{}' for table {}",
-                        name, full_name
-                    ),
+                    message: format!("Duplicate projection column '{name}' for 
table {full_name}"),
                 });
             }
 
@@ -121,6 +115,11 @@ pub struct TableRead<'a> {
 }
 
 impl<'a> TableRead<'a> {
+    /// Create a new TableRead with a specific read type (projected fields).
+    pub fn new(table: &'a Table, read_type: Vec<DataField>) -> Self {
+        Self { table, read_type }
+    }
+
     /// Schema (fields) that this read will produce.
     pub fn read_type(&self) -> &[DataField] {
         &self.read_type
diff --git a/crates/paimon/tests/mock_server.rs 
b/crates/paimon/tests/mock_server.rs
index fee16c7..def0ada 100644
--- a/crates/paimon/tests/mock_server.rs
+++ b/crates/paimon/tests/mock_server.rs
@@ -115,7 +115,7 @@ impl RESTServer {
                 let err = ErrorResponse::new(
                     None,
                     None,
-                    Some(format!("Warehouse {} not found", warehouse)),
+                    Some(format!("Warehouse {warehouse} not found")),
                     Some(404),
                 );
                 return (StatusCode::NOT_FOUND, Json(err)).into_response();
@@ -253,7 +253,7 @@ impl RESTServer {
 
         if s.databases.remove(&name).is_some() {
             // Also remove all tables in this database
-            let prefix = format!("{}.", name);
+            let prefix = format!("{name}.");
             s.tables.retain(|key, _| !key.starts_with(&prefix));
             s.no_permission_tables
                 .retain(|key| !key.starts_with(&prefix));
@@ -296,7 +296,7 @@ impl RESTServer {
             return (StatusCode::NOT_FOUND, Json(err)).into_response();
         }
 
-        let prefix = format!("{}.", db);
+        let prefix = format!("{db}.");
         let mut tables: Vec<String> = s
             .tables
             .keys()
@@ -353,7 +353,7 @@ impl RESTServer {
             return (StatusCode::NOT_FOUND, Json(err)).into_response();
         }
 
-        let key = format!("{}.{}", db, table_name);
+        let key = format!("{db}.{table_name}");
         if s.tables.contains_key(&key) {
             let err = ErrorResponse::new(
                 Some("table".to_string()),
@@ -385,7 +385,7 @@ impl RESTServer {
     ) -> impl IntoResponse {
         let s = state.inner.lock().unwrap();
 
-        let key = format!("{}.{}", db, table);
+        let key = format!("{db}.{table}");
         if s.no_permission_tables.contains(&key) {
             let err = ErrorResponse::new(
                 Some("table".to_string()),
@@ -426,7 +426,7 @@ impl RESTServer {
     ) -> impl IntoResponse {
         let mut s = state.inner.lock().unwrap();
 
-        let key = format!("{}.{}", db, table);
+        let key = format!("{db}.{table}");
         if s.no_permission_tables.contains(&key) {
             let err = ErrorResponse::new(
                 Some("table".to_string()),
@@ -556,7 +556,7 @@ impl RESTServer {
             )
         });
 
-        let key = format!("{}.{}", database, table);
+        let key = format!("{database}.{table}");
         s.tables.entry(key).or_insert_with(|| {
             GetTableResponse::new(
                 Some(table.to_string()),
@@ -574,12 +574,11 @@ impl RESTServer {
     #[allow(dead_code)]
     pub fn add_no_permission_table(&self, database: &str, table: &str) {
         let mut s = self.inner.lock().unwrap();
-        s.no_permission_tables
-            .insert(format!("{}.{}", database, table));
+        s.no_permission_tables.insert(format!("{database}.{table}"));
     }
     /// Get the server URL.
     pub fn url(&self) -> Option<String> {
-        self.addr.map(|a| format!("http://{}", a))
+        self.addr.map(|a| format!("http://{a}"))
     }
     /// Get the warehouse path.
     #[allow(dead_code)]
@@ -678,25 +677,25 @@ pub async fn start_mock_server(
         .route("/v1/config", get(RESTServer::get_config))
         // Database routes
         .route(
-            &format!("{}/databases", prefix),
+            &format!("{prefix}/databases"),
             get(RESTServer::list_databases).post(RESTServer::create_database),
         )
         .route(
-            &format!("{}/databases/:name", prefix),
+            &format!("{prefix}/databases/:name"),
             get(RESTServer::get_database)
                 .post(RESTServer::alter_database)
                 .delete(RESTServer::drop_database),
         )
         .route(
-            &format!("{}/databases/:db/tables", prefix),
+            &format!("{prefix}/databases/:db/tables"),
             get(RESTServer::list_tables).post(RESTServer::create_table),
         )
         .route(
-            &format!("{}/databases/:db/tables/:table", prefix),
+            &format!("{prefix}/databases/:db/tables/:table"),
             get(RESTServer::get_table).delete(RESTServer::drop_table),
         )
         .route(
-            &format!("{}/tables/rename", prefix),
+            &format!("{prefix}/tables/rename"),
             axum::routing::post(RESTServer::rename_table),
         )
         // ECS metadata endpoints (for token loader testing)
@@ -717,7 +716,7 @@ pub async fn start_mock_server(
 
     let server_handle = tokio::spawn(async move {
         if let Err(e) = axum::serve(listener, app.into_make_service()).await {
-            eprintln!("mock server error: {}", e);
+            eprintln!("mock server error: {e}");
         }
     });
 
diff --git a/crates/paimon/tests/rest_api_test.rs b/crates/paimon/tests/rest_api_test.rs
index 9bd31fc..753f5d1 100644
--- a/crates/paimon/tests/rest_api_test.rs
+++ b/crates/paimon/tests/rest_api_test.rs
@@ -88,7 +88,7 @@ async fn test_create_database() {
 
     // Create new database
     let result = ctx.api.create_database("new_db", None).await;
-    assert!(result.is_ok(), "failed to create database: {:?}", result);
+    assert!(result.is_ok(), "failed to create database: {result:?}");
 
     // Verify creation
     let dbs = ctx.api.list_databases().await.unwrap();
@@ -132,7 +132,7 @@ async fn test_error_responses_status_mapping() {
             );
             assert_eq!(j.get("code").and_then(|v| v.as_u64()), Some(403));
         }
-        Err(e) => panic!("Expected 403 response, got error: {:?}", e),
+        Err(e) => panic!("Expected 403 response, got error: {e:?}"),
     }
 
     // POST create existing database -> 409
@@ -168,7 +168,7 @@ async fn test_alter_database() {
     updates.insert("key2".to_string(), "value2".to_string());
 
     let result = ctx.api.alter_database("default", vec![], updates).await;
-    assert!(result.is_ok(), "failed to alter database: {:?}", result);
+    assert!(result.is_ok(), "failed to alter database: {result:?}");
 
     // Verify the updates by getting the database
     let db_resp = ctx.api.get_database("default").await.unwrap();
@@ -180,7 +180,7 @@ async fn test_alter_database() {
         .api
         .alter_database("default", vec!["key1".to_string()], HashMap::new())
         .await;
-    assert!(result.is_ok(), "failed to remove key: {:?}", result);
+    assert!(result.is_ok(), "failed to remove key: {result:?}");
 
     let db_resp = ctx.api.get_database("default").await.unwrap();
     assert!(!db_resp.options.contains_key("key1"));
@@ -211,7 +211,7 @@ async fn test_drop_database() {
 
     // Drop database
     let result = ctx.api.drop_database("to_drop").await;
-    assert!(result.is_ok(), "failed to drop database: {:?}", result);
+    assert!(result.is_ok(), "failed to drop database: {result:?}");
 
     // Verify database is gone
     let dbs = ctx.api.list_databases().await.unwrap();
@@ -278,8 +278,7 @@ async fn test_list_tables_empty_database() {
     let tables = ctx.api.list_tables("default").await.unwrap();
     assert!(
         tables.is_empty(),
-        "expected empty tables list, got: {:?}",
-        tables
+        "expected empty tables list, got: {tables:?}"
     );
 }
 
@@ -323,7 +322,7 @@ async fn test_create_table() {
         .api
         .create_table(&Identifier::new("default", "new_table"), schema)
         .await;
-    assert!(result.is_ok(), "failed to create table: {:?}", result);
+    assert!(result.is_ok(), "failed to create table: {result:?}");
 
     // Verify table exists
     let tables = ctx.api.list_tables("default").await.unwrap();
@@ -354,7 +353,7 @@ async fn test_drop_table() {
         .api
         .drop_table(&Identifier::new("default", "table_to_drop"))
         .await;
-    assert!(result.is_ok(), "failed to drop table: {:?}", result);
+    assert!(result.is_ok(), "failed to drop table: {result:?}");
 
     // Verify table is gone
     let tables = ctx.api.list_tables("default").await.unwrap();
@@ -398,7 +397,7 @@ async fn test_rename_table() {
             &Identifier::new("default", "new_table"),
         )
         .await;
-    assert!(result.is_ok(), "failed to rename table: {:?}", result);
+    assert!(result.is_ok(), "failed to rename table: {result:?}");
 
     // Verify old table is gone
     let tables = ctx.api.list_tables("default").await.unwrap();
diff --git a/docs/src/getting-started.md b/docs/src/getting-started.md
index c1dc287..6e42ae3 100644
--- a/docs/src/getting-started.md
+++ b/docs/src/getting-started.md
@@ -157,7 +157,7 @@ let df = ctx.sql("SELECT * FROM my_table").await?;
 df.show().await?;
 ```
 
-> **Note:** The DataFusion integration currently supports full table scans only. Column projection and predicate pushdown are not yet implemented.
+> **Note:** The DataFusion integration supports full table scans and column projection. Predicate pushdown is not yet implemented.
 
 ## Building from Source
 


Reply via email to