This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new b018c04 feat: introduce projection in go binding and datafusion. (#162)
b018c04 is described below
commit b018c046841382b4d6217d86f8d5c0bb0c8234fa
Author: yuxia Luo <[email protected]>
AuthorDate: Mon Mar 30 19:49:20 2026 +0800
feat: introduce projection in go binding and datafusion. (#162)
---
bindings/c/src/table.rs | 133 ++++++++++++++++-----
bindings/c/src/types.rs | 15 +++
bindings/go/read_builder.go | 51 ++++++++
bindings/go/tests/paimon_test.go | 111 +++++++++++++++++
.../datafusion/src/physical_plan/scan.rs | 21 +++-
crates/integrations/datafusion/src/table/mod.rs | 31 ++---
.../integrations/datafusion/tests/read_tables.rs | 38 ++++--
crates/paimon/src/api/auth/dlf_provider.rs | 8 +-
crates/paimon/src/api/auth/dlf_signer.rs | 8 +-
crates/paimon/src/api/rest_api.rs | 2 +-
crates/paimon/src/arrow/reader.rs | 3 +-
crates/paimon/src/table/read_builder.rs | 15 ++-
crates/paimon/tests/mock_server.rs | 31 +++--
crates/paimon/tests/rest_api_test.rs | 19 ++-
docs/src/getting-started.md | 2 +-
15 files changed, 389 insertions(+), 99 deletions(-)
diff --git a/bindings/c/src/table.rs b/bindings/c/src/table.rs
index 7892757..c3f57fe 100644
--- a/bindings/c/src/table.rs
+++ b/bindings/c/src/table.rs
@@ -48,6 +48,18 @@ unsafe fn free_table_wrapper<T>(ptr: *mut T, get_inner: impl FnOnce(&T) -> *mut
}
}
+// Helper to box a ReadBuilderState and return a raw pointer.
+//
+// Two heap allocations are made: the state itself, stored type-erased in
+// `inner`, and the `paimon_read_builder` wrapper handed to C. Both are
+// reclaimed by `paimon_read_builder_free`.
+unsafe fn box_read_builder_state(state: ReadBuilderState) -> *mut paimon_read_builder {
+ let inner = Box::into_raw(Box::new(state)) as *mut c_void;
+ Box::into_raw(Box::new(paimon_read_builder { inner }))
+}
+
+// Helper to box a TableReadState and return a raw pointer.
+//
+// Two heap allocations are made: the state itself, stored type-erased in
+// `inner`, and the `paimon_table_read` wrapper handed to C. Both are
+// reclaimed by `paimon_table_read_free`.
+unsafe fn box_table_read_state(state: TableReadState) -> *mut paimon_table_read {
+ let inner = Box::into_raw(Box::new(state)) as *mut c_void;
+ Box::into_raw(Box::new(paimon_table_read { inner }))
+}
+
// ======================= Table ===============================
/// Free a paimon_table.
@@ -74,8 +86,12 @@ pub unsafe extern "C" fn paimon_table_new_read_builder(
};
}
let table_ref = &*((*table).inner as *const Table);
+ let state = ReadBuilderState {
+ table: table_ref.clone(),
+ projected_columns: None,
+ };
paimon_result_read_builder {
- read_builder: box_table_wrapper(table_ref, |inner| paimon_read_builder
{ inner }),
+ read_builder: box_read_builder_state(state),
error: std::ptr::null_mut(),
}
}
@@ -88,7 +104,57 @@ pub unsafe extern "C" fn paimon_table_new_read_builder(
/// Only call with a read_builder returned from `paimon_table_new_read_builder`.
#[no_mangle]
pub unsafe extern "C" fn paimon_read_builder_free(rb: *mut paimon_read_builder) {
- free_table_wrapper(rb, |r| r.inner);
+ // `inner` now owns a ReadBuilderState rather than a Table, so both boxes
+ // are reclaimed here explicitly; null pointers are ignored.
+ if !rb.is_null() {
+ let wrapper = Box::from_raw(rb);
+ if !wrapper.inner.is_null() {
+ drop(Box::from_raw(wrapper.inner as *mut ReadBuilderState));
+ }
+ }
+}
+
+/// Set column projection for a ReadBuilder.
+///
+/// The `columns` parameter is a null-terminated array of null-terminated C strings.
+/// Output order follows the caller-specified order. Unknown or duplicate names
+/// cause `paimon_read_builder_new_read()` to fail; an empty list is a valid
+/// zero-column projection. Calling this again replaces any previous projection.
+///
+/// # Safety
+/// `rb` must be a valid pointer from `paimon_table_new_read_builder`, or null (returns error).
+/// `columns` must be a null-terminated array of null-terminated C strings, or null for no projection.
+#[no_mangle]
+pub unsafe extern "C" fn paimon_read_builder_with_projection(
+ rb: *mut paimon_read_builder,
+ columns: *const *const std::ffi::c_char,
+) -> *mut paimon_error {
+ if let Err(e) = check_non_null(rb, "rb") {
+ return e;
+ }
+
+ // NOTE(review): only `rb` itself is null-checked; `(*rb).inner` is assumed
+ // to point at a live ReadBuilderState.
+ let state = &mut *((*rb).inner as *mut ReadBuilderState);
+
+ // A null array clears any previous projection (all columns are read).
+ if columns.is_null() {
+ state.projected_columns = None;
+ return std::ptr::null_mut();
+ }
+
+ // Collect into a local Vec first so `state` is left untouched when any
+ // name is not valid UTF-8.
+ let mut col_names = Vec::new();
+ let mut ptr = columns;
+ while !(*ptr).is_null() {
+ let c_str = std::ffi::CStr::from_ptr(*ptr);
+ match c_str.to_str() {
+ Ok(s) => col_names.push(s.to_string()),
+ Err(e) => {
+ return paimon_error::from_paimon(paimon::Error::ConfigInvalid {
+ message: format!("Invalid UTF-8 in column name: {e}"),
+ });
+ }
+ }
+ ptr = ptr.add(1);
+ }
+
+ state.projected_columns = Some(col_names);
+ std::ptr::null_mut()
}
/// Create a new TableScan from a ReadBuilder.
@@ -105,9 +171,9 @@ pub unsafe extern "C" fn paimon_read_builder_new_scan(
error: e,
};
}
- let table = &*((*rb).inner as *const Table);
+ let state = &*((*rb).inner as *const ReadBuilderState);
paimon_result_table_scan {
- scan: box_table_wrapper(table, |inner| paimon_table_scan { inner }),
+ scan: box_table_wrapper(&state.table, |inner| paimon_table_scan {
inner }),
error: std::ptr::null_mut(),
}
}
@@ -126,13 +192,23 @@ pub unsafe extern "C" fn paimon_read_builder_new_read(
error: e,
};
}
- let table = &*((*rb).inner as *const Table);
- let rb_rust = table.new_read_builder();
+ let state = &*((*rb).inner as *const ReadBuilderState);
+ let mut rb_rust = state.table.new_read_builder();
+
+ // Apply projection if set
+ if let Some(ref columns) = state.projected_columns {
+ let col_refs: Vec<&str> = columns.iter().map(|s| s.as_str()).collect();
+ rb_rust.with_projection(&col_refs);
+ }
+
match rb_rust.new_read() {
- Ok(_) => {
- let wrapper = box_table_wrapper(table, |inner| paimon_table_read {
inner });
+ Ok(table_read) => {
+ let read_state = TableReadState {
+ table: state.table.clone(),
+ read_type: table_read.read_type().to_vec(),
+ };
paimon_result_new_read {
- read: wrapper,
+ read: box_table_read_state(read_state),
error: std::ptr::null_mut(),
}
}
@@ -226,7 +302,12 @@ pub unsafe extern "C" fn paimon_plan_num_splits(plan: *const paimon_plan) -> usi
/// Only call with a read returned from `paimon_read_builder_new_read`.
#[no_mangle]
pub unsafe extern "C" fn paimon_table_read_free(read: *mut paimon_table_read) {
- free_table_wrapper(read, |r| r.inner);
+ // Reclaim the wrapper box and the TableReadState it owns; null pointers
+ // are ignored.
+ if !read.is_null() {
+ let wrapper = Box::from_raw(read);
+ if !wrapper.inner.is_null() {
+ drop(Box::from_raw(wrapper.inner as *mut TableReadState));
+ }
+ }
}
/// Read table data as Arrow record batches via a streaming reader.
@@ -261,31 +342,27 @@ pub unsafe extern "C" fn paimon_table_read_to_arrow(
};
}
- let table = &*((*read).inner as *const Table);
+ let state = &*((*read).inner as *const TableReadState);
let plan_ref = &*((*plan).inner as *const Plan);
let all_splits = plan_ref.splits();
let start = offset.min(all_splits.len());
let end = (offset.saturating_add(length)).min(all_splits.len());
let selected = &all_splits[start..end];
- let rb = table.new_read_builder();
- match rb.new_read() {
- Ok(table_read) => match table_read.to_arrow(selected) {
- Ok(stream) => {
- let reader = Box::new(stream);
- let wrapper = Box::new(paimon_record_batch_reader {
- inner: Box::into_raw(reader) as *mut c_void,
- });
- paimon_result_record_batch_reader {
- reader: Box::into_raw(wrapper),
- error: std::ptr::null_mut(),
- }
+ // Create TableRead with the stored read_type (projection)
+ let table_read = paimon::table::TableRead::new(&state.table,
state.read_type.clone());
+
+ match table_read.to_arrow(selected) {
+ Ok(stream) => {
+ let reader = Box::new(stream);
+ let wrapper = Box::new(paimon_record_batch_reader {
+ inner: Box::into_raw(reader) as *mut c_void,
+ });
+ paimon_result_record_batch_reader {
+ reader: Box::into_raw(wrapper),
+ error: std::ptr::null_mut(),
}
- Err(e) => paimon_result_record_batch_reader {
- reader: std::ptr::null_mut(),
- error: paimon_error::from_paimon(e),
- },
- },
+ }
Err(e) => paimon_result_record_batch_reader {
reader: std::ptr::null_mut(),
error: paimon_error::from_paimon(e),
diff --git a/bindings/c/src/types.rs b/bindings/c/src/types.rs
index 1cb2be7..a95f965 100644
--- a/bindings/c/src/types.rs
+++ b/bindings/c/src/types.rs
@@ -17,6 +17,9 @@
use std::ffi::c_void;
+use paimon::spec::DataField;
+use paimon::table::Table;
+
/// C-compatible byte buffer.
#[repr(C)]
#[derive(Clone, Copy)]
@@ -68,6 +71,12 @@ pub struct paimon_read_builder {
pub inner: *mut c_void,
}
+/// Internal state for ReadBuilder that stores table and projection columns.
+pub(crate) struct ReadBuilderState {
+ /// Table the builder was created from.
+ pub table: Table,
+ /// Column names set by `paimon_read_builder_with_projection`; `None` means
+ /// no projection (all columns are read).
+ pub projected_columns: Option<Vec<String>>,
+}
+
#[repr(C)]
pub struct paimon_table_scan {
pub inner: *mut c_void,
@@ -78,6 +87,12 @@ pub struct paimon_table_read {
pub inner: *mut c_void,
}
+/// Internal state for TableRead that stores table and projected read type.
+pub(crate) struct TableReadState {
+ /// Table the read was created from.
+ pub table: Table,
+ /// Fields resolved by `ReadBuilder::new_read`; reused to rebuild a
+ /// `TableRead` on every `paimon_table_read_to_arrow` call.
+ pub read_type: Vec<DataField>,
+}
+
#[repr(C)]
pub struct paimon_plan {
pub inner: *mut c_void,
diff --git a/bindings/go/read_builder.go b/bindings/go/read_builder.go
index 20b7a0d..fde6bf5 100644
--- a/bindings/go/read_builder.go
+++ b/bindings/go/read_builder.go
@@ -21,6 +21,7 @@ package paimon
import (
"context"
+ "runtime"
"sync"
"unsafe"
@@ -44,6 +45,17 @@ func (rb *ReadBuilder) Close() {
})
}
+// WithProjection sets column projection by name. Output order follows the
+// caller-specified order. Unknown or duplicate names cause NewRead() to fail;
+// an empty list is a valid zero-column projection.
+//
+// The projection affects readers created by NewRead(); calling it again
+// replaces the previous projection. A nil slice is treated like an empty one.
+func (rb *ReadBuilder) WithProjection(columns []string) error {
+ if rb.inner == nil {
+ return ErrClosed
+ }
+ projFn := ffiReadBuilderWithProjection.symbol(rb.ctx)
+ return projFn(rb.inner, columns)
+}
+
// NewScan creates a TableScan for planning which data files to read.
func (rb *ReadBuilder) NewScan() (*TableScan, error) {
if rb.inner == nil {
@@ -85,6 +97,45 @@ var ffiReadBuilderFree = newFFI(ffiOpts{
}
})
+var ffiReadBuilderWithProjection = newFFI(ffiOpts{
+ sym: "paimon_read_builder_with_projection",
+ rType: &ffi.TypePointer,
+ aTypes: []*ffi.Type{&ffi.TypePointer, &ffi.TypePointer},
+}, func(ctx context.Context, ffiCall ffiCall) func(rb *paimonReadBuilder, columns []string) error {
+ return func(rb *paimonReadBuilder, columns []string) error {
+ // Build a NULL-terminated array of NUL-terminated C strings. The array
+ // always holds at least the terminator, so a nil or empty slice is sent
+ // as a zero-column projection, never as a NULL array.
+ // NOTE(review): a name containing an interior NUL byte is truncated at
+ // that byte on the native side.
+ cStrings := make([][]byte, 0, len(columns))
+ colPtrs := make([]*byte, 0, len(columns)+1)
+ for _, col := range columns {
+ cStr := append([]byte(col), 0)
+ cStrings = append(cStrings, cStr)
+ colPtrs = append(colPtrs, &cStr[0])
+ }
+ colPtrs = append(colPtrs, nil)
+ colsPtr := unsafe.Pointer(&colPtrs[0])
+
+ var errPtr *paimonError
+ ffiCall(
+ unsafe.Pointer(&errPtr),
+ unsafe.Pointer(&rb),
+ unsafe.Pointer(&colsPtr),
+ )
+ // Ensure Go-managed buffers stay alive for the full native call.
+ runtime.KeepAlive(cStrings)
+ runtime.KeepAlive(colPtrs)
+ if errPtr != nil {
+ return parseError(ctx, errPtr)
+ }
+ return nil
+ }
+})
+
var ffiReadBuilderNewScan = newFFI(ffiOpts{
sym: "paimon_read_builder_new_scan",
rType: &typeResultTableScan,
diff --git a/bindings/go/tests/paimon_test.go b/bindings/go/tests/paimon_test.go
index 28e3fc2..ef38c2b 100644
--- a/bindings/go/tests/paimon_test.go
+++ b/bindings/go/tests/paimon_test.go
@@ -154,3 +154,114 @@ func TestReadLogTable(t *testing.T) {
}
}
}
+
+// TestReadWithProjection reads only the "id" column via WithProjection and
+// verifies that only the projected column is returned with correct values.
+func TestReadWithProjection(t *testing.T) {
+ warehouse := os.Getenv("PAIMON_TEST_WAREHOUSE")
+ if warehouse == "" {
+ warehouse = "/tmp/paimon-warehouse"
+ }
+
+ if _, err := os.Stat(warehouse); os.IsNotExist(err) {
+ t.Skipf("Skipping: warehouse %s does not exist (run 'make docker-up' first)", warehouse)
+ }
+
+ catalog, err := paimon.NewFileSystemCatalog(warehouse)
+ if err != nil {
+ t.Fatalf("Failed to create catalog: %v", err)
+ }
+ defer catalog.Close()
+
+ table, err := catalog.GetTable(paimon.NewIdentifier("default", "simple_log_table"))
+ if err != nil {
+ t.Fatalf("Failed to get table: %v", err)
+ }
+ defer table.Close()
+
+ rb, err := table.NewReadBuilder()
+ if err != nil {
+ t.Fatalf("Failed to create read builder: %v", err)
+ }
+ defer rb.Close()
+
+ // Set projection to only read "id" column
+ if err := rb.WithProjection([]string{"id"}); err != nil {
+ t.Fatalf("Failed to set projection: %v", err)
+ }
+
+ scan, err := rb.NewScan()
+ if err != nil {
+ t.Fatalf("Failed to create scan: %v", err)
+ }
+ defer scan.Close()
+
+ plan, err := scan.Plan()
+ if err != nil {
+ t.Fatalf("Failed to plan: %v", err)
+ }
+ defer plan.Close()
+
+ splits := plan.Splits()
+ if len(splits) == 0 {
+ t.Fatal("Expected at least one split")
+ }
+
+ read, err := rb.NewRead()
+ if err != nil {
+ t.Fatalf("Failed to create table read: %v", err)
+ }
+ defer read.Close()
+
+ reader, err := read.NewRecordBatchReader(splits)
+ if err != nil {
+ t.Fatalf("Failed to create record batch reader: %v", err)
+ }
+ defer reader.Close()
+
+ var ids []int32
+ batchIdx := 0
+ for {
+ record, err := reader.NextRecord()
+ if errors.Is(err, io.EOF) {
+ break
+ }
+ if err != nil {
+ t.Fatalf("Batch %d: failed to read next record: %v", batchIdx, err)
+ }
+
+ // Verify schema only contains the projected column
+ schema := record.Schema()
+ if schema.NumFields() != 1 {
+ record.Release()
+ t.Fatalf("Batch %d: expected 1 field, got %d: %s", batchIdx, schema.NumFields(), schema)
+ }
+ if schema.Field(0).Name != "id" {
+ record.Release()
+ t.Fatalf("Batch %d: expected field 'id', got '%s'", batchIdx, schema.Field(0).Name)
+ }
+
+ // Use the comma-ok form so a type mismatch fails the test instead of
+ // panicking the whole test binary.
+ col := record.Column(0)
+ idCol, ok := col.(*array.Int32)
+ if !ok {
+ record.Release()
+ t.Fatalf("Batch %d: expected *array.Int32 for 'id', got %T", batchIdx, col)
+ }
+ for j := 0; j < int(record.NumRows()); j++ {
+ ids = append(ids, idCol.Value(j))
+ }
+ record.Release()
+ batchIdx++
+ }
+
+ if len(ids) == 0 {
+ t.Fatal("Expected at least one row, got 0")
+ }
+
+ sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
+
+ expected := []int32{1, 2, 3}
+ if len(ids) != len(expected) {
+ t.Fatalf("Expected %d rows, got %d: %v", len(expected), len(ids), ids)
+ }
+ for i, exp := range expected {
+ if ids[i] != exp {
+ t.Errorf("Row %d: expected id=%d, got id=%d", i, exp, ids[i])
+ }
+ }
+}
diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs b/crates/integrations/datafusion/src/physical_plan/scan.rs
index e567d15..fcb1497 100644
--- a/crates/integrations/datafusion/src/physical_plan/scan.rs
+++ b/crates/integrations/datafusion/src/physical_plan/scan.rs
@@ -30,15 +30,21 @@ use paimon::table::Table;
use crate::error::to_datafusion_error;
-/// Execution plan that scans a Paimon table (read-only, no projection, no
predicate, no limit).
+/// Execution plan that scans a Paimon table with optional column projection.
#[derive(Debug)]
pub struct PaimonTableScan {
+ /// Table whose splits are planned and read when the plan executes.
table: Table,
+ /// Projected column names (if None, reads all columns).
+ /// Forwarded to `ReadBuilder::with_projection` at execution time.
+ projected_columns: Option<Vec<String>>,
+ /// Plan properties built from the output (possibly projected) schema.
plan_properties: PlanProperties,
}
impl PaimonTableScan {
- pub(crate) fn new(schema: ArrowSchemaRef, table: Table) -> Self {
+ pub(crate) fn new(
+ schema: ArrowSchemaRef,
+ table: Table,
+ projected_columns: Option<Vec<String>>,
+ ) -> Self {
let plan_properties = PlanProperties::new(
EquivalenceProperties::new(schema.clone()),
// TODO: Currently all Paimon splits are read in a single
DataFusion partition,
@@ -51,6 +57,7 @@ impl PaimonTableScan {
);
Self {
table,
+ projected_columns,
plan_properties,
}
}
@@ -91,9 +98,17 @@ impl ExecutionPlan for PaimonTableScan {
) -> DFResult<SendableRecordBatchStream> {
let table = self.table.clone();
let schema = self.schema();
+ let projected_columns = self.projected_columns.clone();
let fut = async move {
- let read_builder = table.new_read_builder();
+ let mut read_builder = table.new_read_builder();
+
+ // Apply projection if specified
+ if let Some(ref columns) = projected_columns {
+ let col_refs: Vec<&str> = columns.iter().map(|s|
s.as_str()).collect();
+ read_builder.with_projection(&col_refs);
+ }
+
let scan = read_builder.new_scan();
let plan = scan.plan().await.map_err(to_datafusion_error)?;
let read = read_builder.new_read().map_err(to_datafusion_error)?;
diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs
index 04ccab6..1ba06f4 100644
--- a/crates/integrations/datafusion/src/table/mod.rs
+++ b/crates/integrations/datafusion/src/table/mod.rs
@@ -21,9 +21,8 @@ use std::any::Any;
use std::sync::Arc;
use async_trait::async_trait;
-use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
+use datafusion::arrow::datatypes::{Field, Schema, SchemaRef as ArrowSchemaRef};
use datafusion::catalog::Session;
-use datafusion::common::DataFusionError;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::error::Result as DFResult;
use datafusion::logical_expr::Expr;
@@ -35,8 +34,8 @@ use crate::schema::paimon_schema_to_arrow;
/// Read-only table provider for a Paimon table.
///
-/// Supports full table scan only (no write, no subset/reordered projection,
no predicate
-/// pushdown).
+/// Supports full table scan and column projection. Predicate pushdown and
writes
+/// are not yet supported.
#[derive(Debug, Clone)]
pub struct PaimonTableProvider {
table: Table,
@@ -79,20 +78,22 @@ impl TableProvider for PaimonTableProvider {
_filters: &[Expr],
_limit: Option<usize>,
) -> DFResult<Arc<dyn ExecutionPlan>> {
- if let Some(projection) = projection {
- let is_full_schema_projection = projection.len() ==
self.schema.fields().len()
- &&
projection.iter().copied().eq(0..self.schema.fields().len());
-
- if !is_full_schema_projection {
- return Err(DataFusionError::NotImplemented(
- "Paimon DataFusion integration does not yet support subset
or reordered projections; use SELECT * until apache/paimon-rust#146 is
implemented".to_string(),
- ));
- }
- }
+ // Convert projection indices to column names and compute projected
schema
+ let (projected_schema, projected_columns) = if let Some(indices) =
projection {
+ let fields: Vec<Field> = indices
+ .iter()
+ .map(|&i| self.schema.field(i).clone())
+ .collect();
+ let column_names: Vec<String> = fields.iter().map(|f|
f.name().clone()).collect();
+ (Arc::new(Schema::new(fields)), Some(column_names))
+ } else {
+ (self.schema.clone(), None)
+ };
Ok(Arc::new(PaimonTableScan::new(
- self.schema.clone(),
+ projected_schema,
self.table.clone(),
+ projected_columns,
)))
}
}
diff --git a/crates/integrations/datafusion/tests/read_tables.rs b/crates/integrations/datafusion/tests/read_tables.rs
index f813d78..97bad76 100644
--- a/crates/integrations/datafusion/tests/read_tables.rs
+++ b/crates/integrations/datafusion/tests/read_tables.rs
@@ -120,15 +120,39 @@ async fn test_read_primary_key_table_via_datafusion() {
}
#[tokio::test]
-async fn test_subset_projection_returns_not_implemented() {
- let error = collect_query("simple_log_table", "SELECT id FROM simple_log_table")
+async fn test_projection_via_datafusion() {
+ let batches = collect_query("simple_log_table", "SELECT id FROM simple_log_table")
.await
- .expect_err("Subset projection should be rejected until projection support lands");
+ .expect("Subset projection should succeed");
assert!(
- error
- .to_string()
- .contains("does not yet support subset or reordered projections"),
- "Expected explicit unsupported projection error, got: {error}"
+ !batches.is_empty(),
+ "Expected at least one batch from projected query"
+ );
+
+ let mut actual_ids = Vec::new();
+ for batch in &batches {
+ let schema = batch.schema();
+ let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
+ assert_eq!(
+ field_names,
+ vec!["id"],
+ "Projected query should only return 'id' column"
+ );
+
+ let id_array = batch
+ .column_by_name("id")
+ .and_then(|col| col.as_any().downcast_ref::<Int32Array>())
+ .expect("Expected Int32Array for id column");
+ for i in 0..id_array.len() {
+ actual_ids.push(id_array.value(i));
+ }
+ }
+
+ // Sort so the assertion does not depend on the order in which splits and
+ // batches are read.
+ actual_ids.sort();
+ assert_eq!(
+ actual_ids,
+ vec![1, 2, 3],
+ "Projected id values should match"
);
}
diff --git a/crates/paimon/src/api/auth/dlf_provider.rs b/crates/paimon/src/api/auth/dlf_provider.rs
index e5a7ea0..4f264d8 100644
--- a/crates/paimon/src/api/auth/dlf_provider.rs
+++ b/crates/paimon/src/api/auth/dlf_provider.rs
@@ -168,7 +168,7 @@ impl DLFECSTokenLoader {
async fn get_token(&self, url: &str) -> Result<DLFToken> {
let token_json = self.http_client.get(url).await?;
+ // A body that does not deserialize as DLFToken JSON is reported as DataInvalid.
serde_json::from_str(&token_json).map_err(|e| Error::DataInvalid {
- message: format!("Failed to parse token JSON: {}", e),
+ message: format!("Failed to parse token JSON: {e}"),
source: None,
})
}
@@ -176,7 +176,7 @@ impl DLFECSTokenLoader {
/// Build the token URL from base URL and role name.
fn build_token_url(&self, role_name: &str) -> String {
+ // Trailing '/' characters are stripped from the base before joining.
let base_url = self.ecs_metadata_url.trim_end_matches('/');
- format!("{}/{}", base_url, role_name)
+ format!("{base_url}/{role_name}")
}
}
@@ -396,7 +396,7 @@ impl TokenHTTPClient {
match self.client.get(url).send().await {
Ok(response) if response.status().is_success() => {
return response.text().await.map_err(|e|
Error::DataInvalid {
- message: format!("Failed to read response: {}", e),
+ message: format!("Failed to read response: {e}"),
source: None,
});
}
@@ -404,7 +404,7 @@ impl TokenHTTPClient {
last_error = format!("HTTP error: {}", response.status());
}
Err(e) => {
- last_error = format!("Request failed: {}", e);
+ last_error = format!("Request failed: {e}");
}
}
diff --git a/crates/paimon/src/api/auth/dlf_signer.rs b/crates/paimon/src/api/auth/dlf_signer.rs
index 133970b..ca49a28 100644
--- a/crates/paimon/src/api/auth/dlf_signer.rs
+++ b/crates/paimon/src/api/auth/dlf_signer.rs
@@ -236,7 +236,7 @@ impl DLFDefaultSigner {
let sorted_headers = self.build_sorted_signed_headers_map(headers);
for (key, value) in sorted_headers {
- parts.push(format!("{}:{}", key, value));
+ parts.push(format!("{key}:{value}"));
}
let content_sha256 = headers
@@ -262,7 +262,7 @@ impl DLFDefaultSigner {
let key = Self::trim(key);
if !value.is_empty() {
let value = Self::trim(value);
- format!("{}={}", key, value)
+ format!("{key}={value}")
} else {
key.to_string()
}
@@ -480,7 +480,7 @@ impl DLFOpenApiSigner {
let mut result = String::new();
for (key, value) in sorted_headers {
- result.push_str(&format!("{}:{}\n", key, value));
+ result.push_str(&format!("{key}:{value}\n"));
}
result
}
@@ -500,7 +500,7 @@ impl DLFOpenApiSigner {
.map(|(key, value)| {
let decoded_value =
urlencoding::decode(value).unwrap_or_default();
if !decoded_value.is_empty() {
- format!("{}={}", key, decoded_value)
+ format!("{key}={decoded_value}")
} else {
key.to_string()
}
diff --git a/crates/paimon/src/api/rest_api.rs b/crates/paimon/src/api/rest_api.rs
index f785111..be09e9a 100644
--- a/crates/paimon/src/api/rest_api.rs
+++ b/crates/paimon/src/api/rest_api.rs
@@ -50,7 +50,7 @@ use super::rest_util::RESTUtil;
fn validate_non_empty(value: &str, field_name: &str) -> Result<()> {
if value.trim().is_empty() {
return Err(crate::Error::ConfigInvalid {
- message: format!("{} cannot be empty", field_name),
+ message: format!("{field_name} cannot be empty"),
});
}
Ok(())
diff --git a/crates/paimon/src/arrow/reader.rs b/crates/paimon/src/arrow/reader.rs
index d7b07a7..693aaf8 100644
--- a/crates/paimon/src/arrow/reader.rs
+++ b/crates/paimon/src/arrow/reader.rs
@@ -165,8 +165,7 @@ impl ArrowReader {
batch.schema().index_of(name).map_err(|_| {
Error::UnexpectedError {
message: format!(
- "Projected column '{}' not found
in Parquet batch schema of file {}",
- name, path_to_read
+ "Projected column '{name}' not
found in Parquet batch schema of file {path_to_read}"
),
source: None,
}
diff --git a/crates/paimon/src/table/read_builder.rs b/crates/paimon/src/table/read_builder.rs
index 454807e..69e1674 100644
--- a/crates/paimon/src/table/read_builder.rs
+++ b/crates/paimon/src/table/read_builder.rs
@@ -65,10 +65,7 @@ impl<'a> ReadBuilder<'a> {
Some(projected) => self.resolve_projected_fields(projected)?,
};
- Ok(TableRead {
- table: self.table,
- read_type,
- })
+ Ok(TableRead::new(self.table, read_type))
}
fn resolve_projected_fields(&self, projected_fields: &[String]) ->
Result<Vec<DataField>> {
@@ -91,10 +88,7 @@ impl<'a> ReadBuilder<'a> {
for name in projected_fields {
if !seen.insert(name.as_str()) {
return Err(Error::ConfigInvalid {
- message: format!(
- "Duplicate projection column '{}' for table {}",
- name, full_name
- ),
+ message: format!("Duplicate projection column '{name}' for
table {full_name}"),
});
}
@@ -121,6 +115,11 @@ pub struct TableRead<'a> {
}
impl<'a> TableRead<'a> {
+ /// Create a new TableRead with a specific read type (projected fields).
+ ///
+ /// `read_type` is stored as given: none of the validation performed by
+ /// `ReadBuilder::new_read` (e.g. duplicate-column detection) is applied here.
+ pub fn new(table: &'a Table, read_type: Vec<DataField>) -> Self {
+ Self { table, read_type }
+ }
+
/// Schema (fields) that this read will produce.
pub fn read_type(&self) -> &[DataField] {
&self.read_type
diff --git a/crates/paimon/tests/mock_server.rs b/crates/paimon/tests/mock_server.rs
index fee16c7..def0ada 100644
--- a/crates/paimon/tests/mock_server.rs
+++ b/crates/paimon/tests/mock_server.rs
@@ -115,7 +115,7 @@ impl RESTServer {
let err = ErrorResponse::new(
None,
None,
- Some(format!("Warehouse {} not found", warehouse)),
+ Some(format!("Warehouse {warehouse} not found")),
Some(404),
);
return (StatusCode::NOT_FOUND, Json(err)).into_response();
@@ -253,7 +253,7 @@ impl RESTServer {
if s.databases.remove(&name).is_some() {
// Also remove all tables in this database
- let prefix = format!("{}.", name);
+ let prefix = format!("{name}.");
s.tables.retain(|key, _| !key.starts_with(&prefix));
s.no_permission_tables
.retain(|key| !key.starts_with(&prefix));
@@ -296,7 +296,7 @@ impl RESTServer {
return (StatusCode::NOT_FOUND, Json(err)).into_response();
}
- let prefix = format!("{}.", db);
+ let prefix = format!("{db}.");
let mut tables: Vec<String> = s
.tables
.keys()
@@ -353,7 +353,7 @@ impl RESTServer {
return (StatusCode::NOT_FOUND, Json(err)).into_response();
}
- let key = format!("{}.{}", db, table_name);
+ let key = format!("{db}.{table_name}");
if s.tables.contains_key(&key) {
let err = ErrorResponse::new(
Some("table".to_string()),
@@ -385,7 +385,7 @@ impl RESTServer {
) -> impl IntoResponse {
let s = state.inner.lock().unwrap();
- let key = format!("{}.{}", db, table);
+ let key = format!("{db}.{table}");
if s.no_permission_tables.contains(&key) {
let err = ErrorResponse::new(
Some("table".to_string()),
@@ -426,7 +426,7 @@ impl RESTServer {
) -> impl IntoResponse {
let mut s = state.inner.lock().unwrap();
- let key = format!("{}.{}", db, table);
+ let key = format!("{db}.{table}");
if s.no_permission_tables.contains(&key) {
let err = ErrorResponse::new(
Some("table".to_string()),
@@ -556,7 +556,7 @@ impl RESTServer {
)
});
- let key = format!("{}.{}", database, table);
+ let key = format!("{database}.{table}");
s.tables.entry(key).or_insert_with(|| {
GetTableResponse::new(
Some(table.to_string()),
@@ -574,12 +574,11 @@ impl RESTServer {
#[allow(dead_code)]
pub fn add_no_permission_table(&self, database: &str, table: &str) {
let mut s = self.inner.lock().unwrap();
- s.no_permission_tables
- .insert(format!("{}.{}", database, table));
+ // Stored as "{database}.{table}", the same key form the table handlers check.
+ s.no_permission_tables.insert(format!("{database}.{table}"));
}
/// Get the server URL.
+ ///
+ /// Returns `None` when no address has been recorded in `addr`.
pub fn url(&self) -> Option<String> {
- self.addr.map(|a| format!("http://{}", a))
+ self.addr.map(|a| format!("http://{a}"))
}
/// Get the warehouse path.
#[allow(dead_code)]
@@ -678,25 +677,25 @@ pub async fn start_mock_server(
.route("/v1/config", get(RESTServer::get_config))
// Database routes
.route(
- &format!("{}/databases", prefix),
+ &format!("{prefix}/databases"),
get(RESTServer::list_databases).post(RESTServer::create_database),
)
.route(
- &format!("{}/databases/:name", prefix),
+ &format!("{prefix}/databases/:name"),
get(RESTServer::get_database)
.post(RESTServer::alter_database)
.delete(RESTServer::drop_database),
)
.route(
- &format!("{}/databases/:db/tables", prefix),
+ &format!("{prefix}/databases/:db/tables"),
get(RESTServer::list_tables).post(RESTServer::create_table),
)
.route(
- &format!("{}/databases/:db/tables/:table", prefix),
+ &format!("{prefix}/databases/:db/tables/:table"),
get(RESTServer::get_table).delete(RESTServer::drop_table),
)
.route(
- &format!("{}/tables/rename", prefix),
+ &format!("{prefix}/tables/rename"),
axum::routing::post(RESTServer::rename_table),
)
// ECS metadata endpoints (for token loader testing)
@@ -717,7 +716,7 @@ pub async fn start_mock_server(
let server_handle = tokio::spawn(async move {
if let Err(e) = axum::serve(listener, app.into_make_service()).await {
- eprintln!("mock server error: {}", e);
+ eprintln!("mock server error: {e}");
}
});
diff --git a/crates/paimon/tests/rest_api_test.rs b/crates/paimon/tests/rest_api_test.rs
index 9bd31fc..753f5d1 100644
--- a/crates/paimon/tests/rest_api_test.rs
+++ b/crates/paimon/tests/rest_api_test.rs
@@ -88,7 +88,7 @@ async fn test_create_database() {
// Create new database
let result = ctx.api.create_database("new_db", None).await;
- assert!(result.is_ok(), "failed to create database: {:?}", result);
+ assert!(result.is_ok(), "failed to create database: {result:?}");
// Verify creation
let dbs = ctx.api.list_databases().await.unwrap();
@@ -132,7 +132,7 @@ async fn test_error_responses_status_mapping() {
);
assert_eq!(j.get("code").and_then(|v| v.as_u64()), Some(403));
}
- Err(e) => panic!("Expected 403 response, got error: {:?}", e),
+ Err(e) => panic!("Expected 403 response, got error: {e:?}"),
}
// POST create existing database -> 409
@@ -168,7 +168,7 @@ async fn test_alter_database() {
updates.insert("key2".to_string(), "value2".to_string());
let result = ctx.api.alter_database("default", vec![], updates).await;
- assert!(result.is_ok(), "failed to alter database: {:?}", result);
+ assert!(result.is_ok(), "failed to alter database: {result:?}");
// Verify the updates by getting the database
let db_resp = ctx.api.get_database("default").await.unwrap();
@@ -180,7 +180,7 @@ async fn test_alter_database() {
.api
.alter_database("default", vec!["key1".to_string()], HashMap::new())
.await;
- assert!(result.is_ok(), "failed to remove key: {:?}", result);
+ assert!(result.is_ok(), "failed to remove key: {result:?}");
let db_resp = ctx.api.get_database("default").await.unwrap();
assert!(!db_resp.options.contains_key("key1"));
@@ -211,7 +211,7 @@ async fn test_drop_database() {
// Drop database
let result = ctx.api.drop_database("to_drop").await;
- assert!(result.is_ok(), "failed to drop database: {:?}", result);
+ assert!(result.is_ok(), "failed to drop database: {result:?}");
// Verify database is gone
let dbs = ctx.api.list_databases().await.unwrap();
@@ -278,8 +278,7 @@ async fn test_list_tables_empty_database() {
let tables = ctx.api.list_tables("default").await.unwrap();
assert!(
tables.is_empty(),
- "expected empty tables list, got: {:?}",
- tables
+ "expected empty tables list, got: {tables:?}"
);
}
@@ -323,7 +322,7 @@ async fn test_create_table() {
.api
.create_table(&Identifier::new("default", "new_table"), schema)
.await;
- assert!(result.is_ok(), "failed to create table: {:?}", result);
+ assert!(result.is_ok(), "failed to create table: {result:?}");
// Verify table exists
let tables = ctx.api.list_tables("default").await.unwrap();
@@ -354,7 +353,7 @@ async fn test_drop_table() {
.api
.drop_table(&Identifier::new("default", "table_to_drop"))
.await;
- assert!(result.is_ok(), "failed to drop table: {:?}", result);
+ assert!(result.is_ok(), "failed to drop table: {result:?}");
// Verify table is gone
let tables = ctx.api.list_tables("default").await.unwrap();
@@ -398,7 +397,7 @@ async fn test_rename_table() {
&Identifier::new("default", "new_table"),
)
.await;
- assert!(result.is_ok(), "failed to rename table: {:?}", result);
+ assert!(result.is_ok(), "failed to rename table: {result:?}");
// Verify old table is gone
let tables = ctx.api.list_tables("default").await.unwrap();
diff --git a/docs/src/getting-started.md b/docs/src/getting-started.md
index c1dc287..6e42ae3 100644
--- a/docs/src/getting-started.md
+++ b/docs/src/getting-started.md
@@ -157,7 +157,7 @@ let df = ctx.sql("SELECT * FROM my_table").await?;
df.show().await?;
```
-> **Note:** The DataFusion integration currently supports full table scans only. Column projection and predicate pushdown are not yet implemented.
+> **Note:** The DataFusion integration supports full table scans and column projection. Predicate pushdown is not yet implemented.
## Building from Source