This is an automated email from the ASF dual-hosted git repository.

hope pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new b33b2d7  feat: Implement statistics for data fusion scan (#217)
b33b2d7 is described below

commit b33b2d75983ffde4cbc4511de470388c563bfa88
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Apr 6 23:19:11 2026 +0800

    feat: Implement statistics for data fusion scan (#217)
    
    * feat: Implement statistics for data fusion scan
---
 .../datafusion/src/physical_plan/scan.rs           | 48 +++++++++++++++++++++-
 1 file changed, 47 insertions(+), 1 deletion(-)

diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs b/crates/integrations/datafusion/src/physical_plan/scan.rs
index d7dfc7a..1a181eb 100644
--- a/crates/integrations/datafusion/src/physical_plan/scan.rs
+++ b/crates/integrations/datafusion/src/physical_plan/scan.rs
@@ -19,6 +19,8 @@ use std::any::Any;
 use std::sync::Arc;
 
 use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
+use datafusion::common::stats::Precision;
+use datafusion::common::Statistics;
 use datafusion::error::Result as DFResult;
 use datafusion::execution::{SendableRecordBatchStream, TaskContext};
 use datafusion::physical_expr::EquivalenceProperties;
@@ -165,6 +167,34 @@ impl ExecutionPlan for PaimonTableScan {
             futures::stream::once(fut).try_flatten(),
         )))
     }
+
+    fn statistics(&self) -> DFResult<Statistics> {
+        self.partition_statistics(None)
+    }
+
+    fn partition_statistics(&self, partition: Option<usize>) -> DFResult<Statistics> {
+        let partitions: &[Arc<[DataSplit]>] = match partition {
+            Some(idx) => std::slice::from_ref(&self.planned_partitions[idx]),
+            None => &self.planned_partitions,
+        };
+
+        let mut total_rows: usize = 0;
+        let mut total_bytes: usize = 0;
+        for splits in partitions {
+            for split in splits.iter() {
+                total_rows += split.merged_row_count().unwrap_or(split.row_count()) as usize;
+                for file in split.data_files() {
+                    total_bytes += file.file_size as usize;
+                }
+            }
+        }
+
+        Ok(Statistics {
+            num_rows: Precision::Inexact(total_rows),
+            total_byte_size: Precision::Inexact(total_bytes),
+            column_statistics: Statistics::unknown_column(&self.schema()),
+        })
+    }
 }
 
 impl DisplayAs for PaimonTableScan {
@@ -173,11 +203,27 @@ impl DisplayAs for PaimonTableScan {
         _t: datafusion::physical_plan::DisplayFormatType,
         f: &mut std::fmt::Formatter,
     ) -> std::fmt::Result {
+        write!(f, "PaimonTableScan: table={}", self.table.identifier())?;
+
+        let total_splits: usize = self.planned_partitions.iter().map(|p| p.len()).sum();
+        let total_files: usize = self
+            .planned_partitions
+            .iter()
+            .flat_map(|p| p.iter())
+            .map(|s| s.data_files().len())
+            .sum();
         write!(
             f,
-            "PaimonTableScan: partitions={}",
+            ", partitions={}, splits={total_splits}, files={total_files}",
             self.planned_partitions.len()
         )?;
+
+        if let Some(ref columns) = self.projected_columns {
+            write!(f, ", projection=[{}]", columns.join(", "))?;
+        }
+        if let Some(ref predicate) = self.pushed_predicate {
+            write!(f, ", predicate={predicate}")?;
+        }
         if let Some(limit) = self.limit {
             write!(f, ", limit={limit}")?;
         }

Reply via email to