This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 5ed8b7f  feat(datafusion): parallelize paimon split execution (#169)
5ed8b7f is described below

commit 5ed8b7fddf9feeb0466795a8bb8cb936d896e8fe
Author: Zach <[email protected]>
AuthorDate: Wed Apr 1 21:46:02 2026 +0800

    feat(datafusion): parallelize paimon split execution (#169)
---
 crates/integrations/datafusion/src/lib.rs          |  2 +-
 .../datafusion/src/physical_plan/scan.rs           | 89 +++++++++++++++++++---
 crates/integrations/datafusion/src/table/mod.rs    | 57 +++++++++++++-
 .../integrations/datafusion/tests/read_tables.rs   | 52 +++++++++++++
 4 files changed, 187 insertions(+), 13 deletions(-)

diff --git a/crates/integrations/datafusion/src/lib.rs 
b/crates/integrations/datafusion/src/lib.rs
index edfe1ed..6dd5272 100644
--- a/crates/integrations/datafusion/src/lib.rs
+++ b/crates/integrations/datafusion/src/lib.rs
@@ -33,7 +33,7 @@
 //! let df = ctx.sql("SELECT * FROM my_table").await?;
 //! ```
 //!
-//! This version does not support write, column projection, or predicate 
pushdown.
+//! This version does not support write or predicate pushdown.
 
 mod error;
 mod physical_plan;
diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs 
b/crates/integrations/datafusion/src/physical_plan/scan.rs
index fcb1497..dd27612 100644
--- a/crates/integrations/datafusion/src/physical_plan/scan.rs
+++ b/crates/integrations/datafusion/src/physical_plan/scan.rs
@@ -27,15 +27,24 @@ use 
datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning, 
PlanProperties};
 use futures::{StreamExt, TryStreamExt};
 use paimon::table::Table;
+use paimon::DataSplit;
 
 use crate::error::to_datafusion_error;
 
 /// Execution plan that scans a Paimon table with optional column projection.
+///
+/// Planning is performed eagerly in 
[`super::super::table::PaimonTableProvider::scan`],
+/// and the resulting splits are distributed across DataFusion execution 
partitions
+/// so that DataFusion can schedule them in parallel.
 #[derive(Debug)]
 pub struct PaimonTableScan {
     table: Table,
     /// Projected column names (if None, reads all columns).
     projected_columns: Option<Vec<String>>,
+    /// Pre-planned partition assignments: `planned_partitions[i]` contains the
+    /// Paimon splits that DataFusion partition `i` will read.
+    /// Wrapped in `Arc` to avoid deep-cloning `DataSplit` metadata in 
`execute()`.
+    planned_partitions: Vec<Arc<[DataSplit]>>,
     plan_properties: PlanProperties,
 }
 
@@ -44,20 +53,18 @@ impl PaimonTableScan {
         schema: ArrowSchemaRef,
         table: Table,
         projected_columns: Option<Vec<String>>,
+        planned_partitions: Vec<Arc<[DataSplit]>>,
     ) -> Self {
         let plan_properties = PlanProperties::new(
             EquivalenceProperties::new(schema.clone()),
-            // TODO: Currently all Paimon splits are read in a single 
DataFusion partition,
-            // which means we lose DataFusion parallelism. A follow-up should 
expose one
-            // execution partition per Paimon split so that DataFusion can 
schedule them
-            // across threads.
-            Partitioning::UnknownPartitioning(1),
+            Partitioning::UnknownPartitioning(planned_partitions.len()),
             EmissionType::Incremental,
             Boundedness::Bounded,
         );
         Self {
             table,
             projected_columns,
+            planned_partitions,
             plan_properties,
         }
     }
@@ -93,9 +100,16 @@ impl ExecutionPlan for PaimonTableScan {
 
     fn execute(
         &self,
-        _partition: usize,
+        partition: usize,
         _context: Arc<TaskContext>,
     ) -> DFResult<SendableRecordBatchStream> {
+        let splits = 
Arc::clone(self.planned_partitions.get(partition).ok_or_else(|| {
+            datafusion::error::DataFusionError::Internal(format!(
+                "PaimonTableScan: partition index {partition} out of range 
(total {})",
+                self.planned_partitions.len()
+            ))
+        })?);
+
         let table = self.table.clone();
         let schema = self.schema();
         let projected_columns = self.projected_columns.clone();
@@ -103,16 +117,13 @@ impl ExecutionPlan for PaimonTableScan {
         let fut = async move {
             let mut read_builder = table.new_read_builder();
 
-            // Apply projection if specified
             if let Some(ref columns) = projected_columns {
                 let col_refs: Vec<&str> = columns.iter().map(|s| 
s.as_str()).collect();
                 read_builder.with_projection(&col_refs);
             }
 
-            let scan = read_builder.new_scan();
-            let plan = scan.plan().await.map_err(to_datafusion_error)?;
             let read = read_builder.new_read().map_err(to_datafusion_error)?;
-            let stream = 
read.to_arrow(plan.splits()).map_err(to_datafusion_error)?;
+            let stream = read.to_arrow(&splits).map_err(to_datafusion_error)?;
             let stream = stream.map(|r| r.map_err(to_datafusion_error));
 
             Ok::<_, 
datafusion::error::DataFusionError>(RecordBatchStreamAdapter::new(
@@ -135,6 +146,62 @@ impl DisplayAs for PaimonTableScan {
         _t: datafusion::physical_plan::DisplayFormatType,
         f: &mut std::fmt::Formatter,
     ) -> std::fmt::Result {
-        write!(f, "PaimonTableScan")
+        write!(
+            f,
+            "PaimonTableScan: partitions={}",
+            self.planned_partitions.len()
+        )
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use datafusion::arrow::datatypes::{DataType as ArrowDataType, Field, 
Schema};
+    use datafusion::physical_plan::ExecutionPlan;
+
+    fn test_schema() -> ArrowSchemaRef {
+        Arc::new(Schema::new(vec![Field::new(
+            "id",
+            ArrowDataType::Int32,
+            false,
+        )]))
+    }
+
+    #[test]
+    fn test_partition_count_empty_plan() {
+        let schema = test_schema();
+        let scan = PaimonTableScan::new(schema, dummy_table(), None, 
vec![Arc::from(Vec::new())]);
+        assert_eq!(scan.properties().output_partitioning().partition_count(), 
1);
+    }
+
+    #[test]
+    fn test_partition_count_multiple_partitions() {
+        let schema = test_schema();
+        let planned_partitions = vec![
+            Arc::from(Vec::new()),
+            Arc::from(Vec::new()),
+            Arc::from(Vec::new()),
+        ];
+        let scan = PaimonTableScan::new(schema, dummy_table(), None, 
planned_partitions);
+        assert_eq!(scan.properties().output_partitioning().partition_count(), 
3);
+    }
+
+    /// Constructs a minimal Table for testing (no real files needed since we
+    /// only test PlanProperties, not actual reads).
+    fn dummy_table() -> Table {
+        use paimon::catalog::Identifier;
+        use paimon::io::FileIOBuilder;
+        use paimon::spec::{Schema, TableSchema};
+
+        let file_io = FileIOBuilder::new("file").build().unwrap();
+        let schema = Schema::builder().build().unwrap();
+        let table_schema = TableSchema::new(0, &schema);
+        Table::new(
+            file_io,
+            Identifier::new("test_db", "test_table"),
+            "/tmp/test-table".to_string(),
+            table_schema,
+        )
     }
 }
diff --git a/crates/integrations/datafusion/src/table/mod.rs 
b/crates/integrations/datafusion/src/table/mod.rs
index 1ba06f4..2e0a49e 100644
--- a/crates/integrations/datafusion/src/table/mod.rs
+++ b/crates/integrations/datafusion/src/table/mod.rs
@@ -29,6 +29,7 @@ use datafusion::logical_expr::Expr;
 use datafusion::physical_plan::ExecutionPlan;
 use paimon::table::Table;
 
+use crate::error::to_datafusion_error;
 use crate::physical_plan::PaimonTableScan;
 use crate::schema::paimon_schema_to_arrow;
 
@@ -57,6 +58,15 @@ impl PaimonTableProvider {
     }
 }
 
+/// Distribute `items` into `num_buckets` groups using round-robin assignment.
+fn bucket_round_robin<T>(items: Vec<T>, num_buckets: usize) -> Vec<Vec<T>> {
+    let mut buckets: Vec<Vec<T>> = (0..num_buckets).map(|_| 
Vec::new()).collect();
+    for (i, item) in items.into_iter().enumerate() {
+        buckets[i % num_buckets].push(item);
+    }
+    buckets
+}
+
 #[async_trait]
 impl TableProvider for PaimonTableProvider {
     fn as_any(&self) -> &dyn Any {
@@ -73,7 +83,7 @@ impl TableProvider for PaimonTableProvider {
 
     async fn scan(
         &self,
-        _state: &dyn Session,
+        state: &dyn Session,
         projection: Option<&Vec<usize>>,
         _filters: &[Expr],
         _limit: Option<usize>,
@@ -90,10 +100,55 @@ impl TableProvider for PaimonTableProvider {
             (self.schema.clone(), None)
         };
 
+        // Plan splits eagerly so we know partition count upfront.
+        let read_builder = self.table.new_read_builder();
+        let scan = read_builder.new_scan();
+        let plan = scan.plan().await.map_err(to_datafusion_error)?;
+
+        // Distribute splits across DataFusion partitions, capped by the
+        // session's target_partitions to avoid over-sharding with many small 
splits.
+        // Each partition's splits are wrapped in Arc to avoid deep-cloning in 
execute().
+        let splits = plan.splits().to_vec();
+        let planned_partitions: Vec<Arc<[_]>> = if splits.is_empty() {
+            // Empty plans get a single empty partition to avoid 0-partition 
edge cases.
+            vec![Arc::from(Vec::new())]
+        } else {
+            let target = state.config_options().execution.target_partitions;
+            let num_partitions = splits.len().min(target.max(1));
+            bucket_round_robin(splits, num_partitions)
+                .into_iter()
+                .map(Arc::from)
+                .collect()
+        };
+
         Ok(Arc::new(PaimonTableScan::new(
             projected_schema,
             self.table.clone(),
             projected_columns,
+            planned_partitions,
         )))
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_bucket_round_robin_distributes_evenly() {
+        let result = bucket_round_robin(vec![0, 1, 2, 3, 4], 3);
+        assert_eq!(result, vec![vec![0, 3], vec![1, 4], vec![2]]);
+    }
+
+    #[test]
+    fn test_bucket_round_robin_fewer_items_than_buckets() {
+        let result = bucket_round_robin(vec![10, 20], 2);
+        assert_eq!(result, vec![vec![10], vec![20]]);
+    }
+
+    #[test]
+    fn test_bucket_round_robin_single_bucket() {
+        let result = bucket_round_robin(vec![1, 2, 3], 1);
+        assert_eq!(result, vec![vec![1, 2, 3]]);
+    }
+}
diff --git a/crates/integrations/datafusion/tests/read_tables.rs 
b/crates/integrations/datafusion/tests/read_tables.rs
index 97bad76..dd1cad7 100644
--- a/crates/integrations/datafusion/tests/read_tables.rs
+++ b/crates/integrations/datafusion/tests/read_tables.rs
@@ -156,3 +156,55 @@ async fn test_projection_via_datafusion() {
         "Projected id values should match"
     );
 }
+
+/// Verifies that `PaimonTableProvider::scan()` produces more than one
+/// execution partition for a multi-partition table, and that the reported
+/// partition count is still capped by `target_partitions`.
+#[tokio::test]
+async fn test_scan_partition_count_respects_session_config() {
+    use datafusion::datasource::TableProvider;
+    use datafusion::prelude::SessionConfig;
+
+    let warehouse = get_test_warehouse();
+    let catalog = FileSystemCatalog::new(warehouse).expect("Failed to create 
catalog");
+    let identifier = Identifier::new("default", "partitioned_log_table");
+    let table = catalog
+        .get_table(&identifier)
+        .await
+        .expect("Failed to get table");
+
+    let provider = PaimonTableProvider::try_new(table).expect("Failed to 
create table provider");
+
+    // With generous target_partitions, the plan should expose more than one 
partition.
+    let config = SessionConfig::new().with_target_partitions(8);
+    let ctx = SessionContext::new_with_config(config);
+    let state = ctx.state();
+    let plan = provider
+        .scan(&state, None, &[], None)
+        .await
+        .expect("scan() should succeed");
+
+    let partition_count = 
plan.properties().output_partitioning().partition_count();
+    assert!(
+        partition_count > 1,
+        "partitioned_log_table should produce >1 partitions, got 
{partition_count}"
+    );
+
+    // With target_partitions=1, all splits must be coalesced into a single 
partition
+    let config_single = SessionConfig::new().with_target_partitions(1);
+    let ctx_single = SessionContext::new_with_config(config_single);
+    let state_single = ctx_single.state();
+    let plan_single = provider
+        .scan(&state_single, None, &[], None)
+        .await
+        .expect("scan() should succeed with target_partitions=1");
+
+    assert_eq!(
+        plan_single
+            .properties()
+            .output_partitioning()
+            .partition_count(),
+        1,
+        "target_partitions=1 should coalesce all splits into exactly 1 
partition"
+    );
+}

Reply via email to