This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 5ed8b7f feat(datafusion): parallelize paimon split execution (#169)
5ed8b7f is described below
commit 5ed8b7fddf9feeb0466795a8bb8cb936d896e8fe
Author: Zach <[email protected]>
AuthorDate: Wed Apr 1 21:46:02 2026 +0800
feat(datafusion): parallelize paimon split execution (#169)
---
crates/integrations/datafusion/src/lib.rs | 2 +-
.../datafusion/src/physical_plan/scan.rs | 89 +++++++++++++++++++---
crates/integrations/datafusion/src/table/mod.rs | 57 +++++++++++++-
.../integrations/datafusion/tests/read_tables.rs | 52 +++++++++++++
4 files changed, 187 insertions(+), 13 deletions(-)
diff --git a/crates/integrations/datafusion/src/lib.rs b/crates/integrations/datafusion/src/lib.rs
index edfe1ed..6dd5272 100644
--- a/crates/integrations/datafusion/src/lib.rs
+++ b/crates/integrations/datafusion/src/lib.rs
@@ -33,7 +33,7 @@
//! let df = ctx.sql("SELECT * FROM my_table").await?;
//! ```
//!
-//! This version does not support write, column projection, or predicate pushdown.
+//! This version does not support write or predicate pushdown.
mod error;
mod physical_plan;
diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs b/crates/integrations/datafusion/src/physical_plan/scan.rs
index fcb1497..dd27612 100644
--- a/crates/integrations/datafusion/src/physical_plan/scan.rs
+++ b/crates/integrations/datafusion/src/physical_plan/scan.rs
@@ -27,15 +27,24 @@ use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning, PlanProperties};
use futures::{StreamExt, TryStreamExt};
use paimon::table::Table;
+use paimon::DataSplit;
use crate::error::to_datafusion_error;
/// Execution plan that scans a Paimon table with optional column projection.
+///
+/// Planning is performed eagerly in [`super::super::table::PaimonTableProvider::scan`],
+/// and the resulting splits are distributed across DataFusion execution partitions
+/// so that DataFusion can schedule them in parallel.
#[derive(Debug)]
pub struct PaimonTableScan {
table: Table,
/// Projected column names (if None, reads all columns).
projected_columns: Option<Vec<String>>,
+    /// Pre-planned partition assignments: `planned_partitions[i]` contains the
+    /// Paimon splits that DataFusion partition `i` will read.
+    /// Wrapped in `Arc` to avoid deep-cloning `DataSplit` metadata in `execute()`.
+    planned_partitions: Vec<Arc<[DataSplit]>>,
plan_properties: PlanProperties,
}
@@ -44,20 +53,18 @@ impl PaimonTableScan {
schema: ArrowSchemaRef,
table: Table,
projected_columns: Option<Vec<String>>,
+ planned_partitions: Vec<Arc<[DataSplit]>>,
) -> Self {
let plan_properties = PlanProperties::new(
EquivalenceProperties::new(schema.clone()),
-            // TODO: Currently all Paimon splits are read in a single DataFusion partition,
-            // which means we lose DataFusion parallelism. A follow-up should expose one
-            // execution partition per Paimon split so that DataFusion can schedule them
-            // across threads.
-            Partitioning::UnknownPartitioning(1),
+            Partitioning::UnknownPartitioning(planned_partitions.len()),
EmissionType::Incremental,
Boundedness::Bounded,
);
Self {
table,
projected_columns,
+ planned_partitions,
plan_properties,
}
}
@@ -93,9 +100,16 @@ impl ExecutionPlan for PaimonTableScan {
fn execute(
&self,
- _partition: usize,
+ partition: usize,
_context: Arc<TaskContext>,
) -> DFResult<SendableRecordBatchStream> {
+        let splits = Arc::clone(self.planned_partitions.get(partition).ok_or_else(|| {
+            datafusion::error::DataFusionError::Internal(format!(
+                "PaimonTableScan: partition index {partition} out of range (total {})",
+                self.planned_partitions.len()
+            ))
+        })?);
+
let table = self.table.clone();
let schema = self.schema();
let projected_columns = self.projected_columns.clone();
@@ -103,16 +117,13 @@ impl ExecutionPlan for PaimonTableScan {
let fut = async move {
let mut read_builder = table.new_read_builder();
- // Apply projection if specified
if let Some(ref columns) = projected_columns {
                let col_refs: Vec<&str> = columns.iter().map(|s| s.as_str()).collect();
read_builder.with_projection(&col_refs);
}
- let scan = read_builder.new_scan();
- let plan = scan.plan().await.map_err(to_datafusion_error)?;
let read = read_builder.new_read().map_err(to_datafusion_error)?;
-            let stream = read.to_arrow(plan.splits()).map_err(to_datafusion_error)?;
+            let stream = read.to_arrow(&splits).map_err(to_datafusion_error)?;
let stream = stream.map(|r| r.map_err(to_datafusion_error));
            Ok::<_, datafusion::error::DataFusionError>(RecordBatchStreamAdapter::new(
@@ -135,6 +146,62 @@ impl DisplayAs for PaimonTableScan {
_t: datafusion::physical_plan::DisplayFormatType,
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
- write!(f, "PaimonTableScan")
+ write!(
+ f,
+ "PaimonTableScan: partitions={}",
+ self.planned_partitions.len()
+ )
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+    use datafusion::arrow::datatypes::{DataType as ArrowDataType, Field, Schema};
+ use datafusion::physical_plan::ExecutionPlan;
+
+ fn test_schema() -> ArrowSchemaRef {
+ Arc::new(Schema::new(vec![Field::new(
+ "id",
+ ArrowDataType::Int32,
+ false,
+ )]))
+ }
+
+ #[test]
+ fn test_partition_count_empty_plan() {
+ let schema = test_schema();
+        let scan = PaimonTableScan::new(schema, dummy_table(), None, vec![Arc::from(Vec::new())]);
+        assert_eq!(scan.properties().output_partitioning().partition_count(), 1);
+ }
+
+ #[test]
+ fn test_partition_count_multiple_partitions() {
+ let schema = test_schema();
+ let planned_partitions = vec![
+ Arc::from(Vec::new()),
+ Arc::from(Vec::new()),
+ Arc::from(Vec::new()),
+ ];
+        let scan = PaimonTableScan::new(schema, dummy_table(), None, planned_partitions);
+        assert_eq!(scan.properties().output_partitioning().partition_count(), 3);
+ }
+
+ /// Constructs a minimal Table for testing (no real files needed since we
+ /// only test PlanProperties, not actual reads).
+ fn dummy_table() -> Table {
+ use paimon::catalog::Identifier;
+ use paimon::io::FileIOBuilder;
+ use paimon::spec::{Schema, TableSchema};
+
+ let file_io = FileIOBuilder::new("file").build().unwrap();
+ let schema = Schema::builder().build().unwrap();
+ let table_schema = TableSchema::new(0, &schema);
+ Table::new(
+ file_io,
+ Identifier::new("test_db", "test_table"),
+ "/tmp/test-table".to_string(),
+ table_schema,
+ )
}
}
diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs
index 1ba06f4..2e0a49e 100644
--- a/crates/integrations/datafusion/src/table/mod.rs
+++ b/crates/integrations/datafusion/src/table/mod.rs
@@ -29,6 +29,7 @@ use datafusion::logical_expr::Expr;
use datafusion::physical_plan::ExecutionPlan;
use paimon::table::Table;
+use crate::error::to_datafusion_error;
use crate::physical_plan::PaimonTableScan;
use crate::schema::paimon_schema_to_arrow;
@@ -57,6 +58,15 @@ impl PaimonTableProvider {
}
}
+/// Distribute `items` into `num_buckets` groups using round-robin assignment.
+fn bucket_round_robin<T>(items: Vec<T>, num_buckets: usize) -> Vec<Vec<T>> {
+    let mut buckets: Vec<Vec<T>> = (0..num_buckets).map(|_| Vec::new()).collect();
+ for (i, item) in items.into_iter().enumerate() {
+ buckets[i % num_buckets].push(item);
+ }
+ buckets
+}
+
#[async_trait]
impl TableProvider for PaimonTableProvider {
fn as_any(&self) -> &dyn Any {
@@ -73,7 +83,7 @@ impl TableProvider for PaimonTableProvider {
async fn scan(
&self,
- _state: &dyn Session,
+ state: &dyn Session,
projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
@@ -90,10 +100,55 @@ impl TableProvider for PaimonTableProvider {
(self.schema.clone(), None)
};
+ // Plan splits eagerly so we know partition count upfront.
+ let read_builder = self.table.new_read_builder();
+ let scan = read_builder.new_scan();
+ let plan = scan.plan().await.map_err(to_datafusion_error)?;
+
+    // Distribute splits across DataFusion partitions, capped by the
+    // session's target_partitions to avoid over-sharding with many small splits.
+    // Each partition's splits are wrapped in Arc to avoid deep-cloning in execute().
+ let splits = plan.splits().to_vec();
+ let planned_partitions: Vec<Arc<[_]>> = if splits.is_empty() {
+        // Empty plans get a single empty partition to avoid 0-partition edge cases.
+        vec![Arc::from(Vec::new())]
+ } else {
+ let target = state.config_options().execution.target_partitions;
+ let num_partitions = splits.len().min(target.max(1));
+ bucket_round_robin(splits, num_partitions)
+ .into_iter()
+ .map(Arc::from)
+ .collect()
+ };
+
Ok(Arc::new(PaimonTableScan::new(
projected_schema,
self.table.clone(),
projected_columns,
+ planned_partitions,
)))
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_bucket_round_robin_distributes_evenly() {
+ let result = bucket_round_robin(vec![0, 1, 2, 3, 4], 3);
+ assert_eq!(result, vec![vec![0, 3], vec![1, 4], vec![2]]);
+ }
+
+ #[test]
+ fn test_bucket_round_robin_fewer_items_than_buckets() {
+ let result = bucket_round_robin(vec![10, 20], 2);
+ assert_eq!(result, vec![vec![10], vec![20]]);
+ }
+
+ #[test]
+ fn test_bucket_round_robin_single_bucket() {
+ let result = bucket_round_robin(vec![1, 2, 3], 1);
+ assert_eq!(result, vec![vec![1, 2, 3]]);
+ }
+}
diff --git a/crates/integrations/datafusion/tests/read_tables.rs b/crates/integrations/datafusion/tests/read_tables.rs
index 97bad76..dd1cad7 100644
--- a/crates/integrations/datafusion/tests/read_tables.rs
+++ b/crates/integrations/datafusion/tests/read_tables.rs
@@ -156,3 +156,55 @@ async fn test_projection_via_datafusion() {
"Projected id values should match"
);
}
+
+/// Verifies that `PaimonTableProvider::scan()` produces more than one
+/// execution partition for a multi-partition table, and that the reported
+/// partition count is still capped by `target_partitions`.
+#[tokio::test]
+async fn test_scan_partition_count_respects_session_config() {
+ use datafusion::datasource::TableProvider;
+ use datafusion::prelude::SessionConfig;
+
+ let warehouse = get_test_warehouse();
+    let catalog = FileSystemCatalog::new(warehouse).expect("Failed to create catalog");
+ let identifier = Identifier::new("default", "partitioned_log_table");
+ let table = catalog
+ .get_table(&identifier)
+ .await
+ .expect("Failed to get table");
+
+    let provider = PaimonTableProvider::try_new(table).expect("Failed to create table provider");
+
+    // With generous target_partitions, the plan should expose more than one partition.
+ let config = SessionConfig::new().with_target_partitions(8);
+ let ctx = SessionContext::new_with_config(config);
+ let state = ctx.state();
+ let plan = provider
+ .scan(&state, None, &[], None)
+ .await
+ .expect("scan() should succeed");
+
+    let partition_count = plan.properties().output_partitioning().partition_count();
+ assert!(
+ partition_count > 1,
+        "partitioned_log_table should produce >1 partitions, got {partition_count}"
+ );
+
+    // With target_partitions=1, all splits must be coalesced into a single partition
+ let config_single = SessionConfig::new().with_target_partitions(1);
+ let ctx_single = SessionContext::new_with_config(config_single);
+ let state_single = ctx_single.state();
+ let plan_single = provider
+ .scan(&state_single, None, &[], None)
+ .await
+ .expect("scan() should succeed with target_partitions=1");
+
+ assert_eq!(
+ plan_single
+ .properties()
+ .output_partitioning()
+ .partition_count(),
+ 1,
+        "target_partitions=1 should coalesce all splits into exactly 1 partition"
+ );
+}