This is an automated email from the ASF dual-hosted git repository.
hope pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new b33b2d7 feat: Implement statistics for data fusion scan (#217)
b33b2d7 is described below
commit b33b2d75983ffde4cbc4511de470388c563bfa88
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Apr 6 23:19:11 2026 +0800
feat: Implement statistics for data fusion scan (#217)
* feat: Implement statistics for data fusion scan
---
.../datafusion/src/physical_plan/scan.rs | 48 +++++++++++++++++++++-
1 file changed, 47 insertions(+), 1 deletion(-)
diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs b/crates/integrations/datafusion/src/physical_plan/scan.rs
index d7dfc7a..1a181eb 100644
--- a/crates/integrations/datafusion/src/physical_plan/scan.rs
+++ b/crates/integrations/datafusion/src/physical_plan/scan.rs
@@ -19,6 +19,8 @@ use std::any::Any;
use std::sync::Arc;
use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
+use datafusion::common::stats::Precision;
+use datafusion::common::Statistics;
use datafusion::error::Result as DFResult;
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::physical_expr::EquivalenceProperties;
@@ -165,6 +167,34 @@ impl ExecutionPlan for PaimonTableScan {
futures::stream::once(fut).try_flatten(),
)))
}
+
+ fn statistics(&self) -> DFResult<Statistics> {
+ self.partition_statistics(None)
+ }
+
+ fn partition_statistics(&self, partition: Option<usize>) -> DFResult<Statistics> {
+ let partitions: &[Arc<[DataSplit]>] = match partition {
+ Some(idx) => std::slice::from_ref(&self.planned_partitions[idx]),
+ None => &self.planned_partitions,
+ };
+
+ let mut total_rows: usize = 0;
+ let mut total_bytes: usize = 0;
+ for splits in partitions {
+ for split in splits.iter() {
+ total_rows += split.merged_row_count().unwrap_or(split.row_count()) as usize;
+ for file in split.data_files() {
+ total_bytes += file.file_size as usize;
+ }
+ }
+ }
+
+ Ok(Statistics {
+ num_rows: Precision::Inexact(total_rows),
+ total_byte_size: Precision::Inexact(total_bytes),
+ column_statistics: Statistics::unknown_column(&self.schema()),
+ })
+ }
}
impl DisplayAs for PaimonTableScan {
@@ -173,11 +203,27 @@ impl DisplayAs for PaimonTableScan {
_t: datafusion::physical_plan::DisplayFormatType,
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
+ write!(f, "PaimonTableScan: table={}", self.table.identifier())?;
+
+ let total_splits: usize = self.planned_partitions.iter().map(|p| p.len()).sum();
+ let total_files: usize = self
+ .planned_partitions
+ .iter()
+ .flat_map(|p| p.iter())
+ .map(|s| s.data_files().len())
+ .sum();
write!(
f,
- "PaimonTableScan: partitions={}",
+ ", partitions={}, splits={total_splits}, files={total_files}",
self.planned_partitions.len()
)?;
+
+ if let Some(ref columns) = self.projected_columns {
+ write!(f, ", projection=[{}]", columns.join(", "))?;
+ }
+ if let Some(ref predicate) = self.pushed_predicate {
+ write!(f, ", predicate={predicate}")?;
+ }
if let Some(limit) = self.limit {
write!(f, ", limit={limit}")?;
}