This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 6e0dde0890 fix(stats): widen sum_value integer arithmetic to 
SUM-compatible types (#20865)
6e0dde0890 is described below

commit 6e0dde0890ef967183f5fa828195a44cbf99b870
Author: Kumar Ujjawal <[email protected]>
AuthorDate: Wed Mar 25 20:49:26 2026 +0530

    fix(stats): widen sum_value integer arithmetic to SUM-compatible types 
(#20865)
    
    ## Which issue does this PR close?
    
    
    
    <!--
    We generally require a GitHub issue to be filed for all bug fixes and
    enhancements and this helps us generate change logs for our releases.
    You can link an issue to this PR using the GitHub syntax. For example
    `Closes #123` indicates that this PR will close issue #123.
    -->
    
    - Closes #20826.
    
    ## Rationale for this change
    
    As discussed in the review thread on #20768 and tracked by #20826,
    `sum_value` should not keep narrow integer column types during stats
    aggregation, because merge/multiply paths can overflow before values are
    widened.
    
    
    <!--
    Why are you proposing this change? If this is already explained clearly
    in the issue then this section is not needed.
    Explaining clearly why changes are proposed helps reviewers understand
    your changes and offer better suggestions for fixes.
    -->
    
    ## What changes are included in this PR?
    
    This PR updates statistics `sum_value` arithmetic to match SUM-style
    widening for small integer types, and applies that behavior consistently
    across merge and multiplication paths.
    
    <!--
    There is no need to duplicate the description in the issue here but it
    is sometimes worth providing a summary of the individual changes in this
    PR.
    -->
    
    ## Are these changes tested?
    
    Yes
    
    <!--
    We typically require tests for all PRs in order to:
    1. Prevent the code from being accidentally broken by subsequent changes
    2. Serve as another way to document the expected behavior of the code
    
    If tests are not included in your PR, please explain why (for example,
    are they covered by existing tests)?
    -->
    
    ## Are there any user-facing changes?
    
    <!--
    If there are user-facing changes then we may require documentation to be
    updated before approving the PR.
    -->
    
    <!--
    If there are any breaking changes to public APIs, please add the `api
    change` label.
    -->
---
 datafusion/common/src/stats.rs                   | 133 +++++++++++++++++++++--
 datafusion/datasource/src/statistics.rs          |  81 +++++++++++++-
 datafusion/physical-expr/src/projection.rs       |  42 ++++++-
 datafusion/physical-plan/src/joins/cross_join.rs |  85 +++++++++++----
 datafusion/physical-plan/src/union.rs            |   2 +-
 5 files changed, 304 insertions(+), 39 deletions(-)

diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs
index 7d3d511ca7..4cf5cc3661 100644
--- a/datafusion/common/src/stats.rs
+++ b/datafusion/common/src/stats.rs
@@ -203,6 +203,24 @@ impl Precision<usize> {
 }
 
 impl Precision<ScalarValue> {
+    fn sum_data_type(data_type: &DataType) -> DataType {
+        match data_type {
+            DataType::Int8 | DataType::Int16 | DataType::Int32 => 
DataType::Int64,
+            DataType::UInt8 | DataType::UInt16 | DataType::UInt32 => 
DataType::UInt64,
+            _ => data_type.clone(),
+        }
+    }
+
+    fn cast_scalar_to_sum_type(value: &ScalarValue) -> Result<ScalarValue> {
+        let source_type = value.data_type();
+        let target_type = Self::sum_data_type(&source_type);
+        if source_type == target_type {
+            Ok(value.clone())
+        } else {
+            value.cast_to(&target_type)
+        }
+    }
+
     /// Calculates the sum of two (possibly inexact) [`ScalarValue`] values,
     /// conservatively propagating exactness information. If one of the input
     /// values is [`Precision::Absent`], the result is `Absent` too.
@@ -228,6 +246,31 @@ impl Precision<ScalarValue> {
         }
     }
 
+    /// Casts integer values to the wider SQL `SUM` return type.
+    ///
+    /// This reduces overflow risk when `sum_value` statistics are merged:
+    /// `Int8/Int16/Int32 -> Int64` and `UInt8/UInt16/UInt32 -> UInt64`.
+    pub fn cast_to_sum_type(&self) -> Precision<ScalarValue> {
+        match (self.is_exact(), self.get_value()) {
+            (Some(true), Some(value)) => Self::cast_scalar_to_sum_type(value)
+                .map(Precision::Exact)
+                .unwrap_or(Precision::Absent),
+            (Some(false), Some(value)) => Self::cast_scalar_to_sum_type(value)
+                .map(Precision::Inexact)
+                .unwrap_or(Precision::Absent),
+            (_, _) => Precision::Absent,
+        }
+    }
+
+    /// SUM-style addition with integer widening to match SQL `SUM` return
+    /// types for smaller integral inputs.
+    pub fn add_for_sum(&self, other: &Precision<ScalarValue>) -> 
Precision<ScalarValue> {
+        let mut lhs = self.cast_to_sum_type();
+        let rhs = other.cast_to_sum_type();
+        precision_add(&mut lhs, &rhs);
+        lhs
+    }
+
     /// Calculates the difference of two (possibly inexact) [`ScalarValue`] 
values,
     /// conservatively propagating exactness information. If one of the input
     /// values is [`Precision::Absent`], the result is `Absent` too.
@@ -620,7 +663,7 @@ impl Statistics {
     /// assert_eq!(merged.column_statistics[0].max_value,
     ///     Precision::Exact(ScalarValue::from(200)));
     /// assert_eq!(merged.column_statistics[0].sum_value,
-    ///     Precision::Exact(ScalarValue::from(1500)));
+    ///     Precision::Exact(ScalarValue::Int64(Some(1500))));
     /// ```
     pub fn try_merge_iter<'a, I>(items: I, schema: &Schema) -> 
Result<Statistics>
     where
@@ -664,7 +707,7 @@ impl Statistics {
                 null_count: cs.null_count,
                 max_value: cs.max_value.clone(),
                 min_value: cs.min_value.clone(),
-                sum_value: cs.sum_value.clone(),
+                sum_value: cs.sum_value.cast_to_sum_type(),
                 distinct_count: cs.distinct_count,
                 byte_size: cs.byte_size,
             })
@@ -693,7 +736,8 @@ impl Statistics {
                 };
                 col_stats.min_value = 
col_stats.min_value.min(&item_cs.min_value);
                 col_stats.max_value = 
col_stats.max_value.max(&item_cs.max_value);
-                precision_add(&mut col_stats.sum_value, &item_cs.sum_value);
+                let item_sum_value = item_cs.sum_value.cast_to_sum_type();
+                precision_add(&mut col_stats.sum_value, &item_sum_value);
                 col_stats.byte_size = 
col_stats.byte_size.add(&item_cs.byte_size);
             }
         }
@@ -877,7 +921,15 @@ pub struct ColumnStatistics {
     pub max_value: Precision<ScalarValue>,
     /// Minimum value of column
     pub min_value: Precision<ScalarValue>,
-    /// Sum value of a column
+    /// Sum value of a column.
+    ///
+    /// For integral columns, values should be kept in SUM-compatible widened
+    /// types (`Int8/Int16/Int32 -> Int64`, `UInt8/UInt16/UInt32 -> UInt64`) to
+    /// reduce overflow risk during statistics propagation.
+    ///
+    /// Callers should prefer [`ColumnStatistics::with_sum_value`] for setting
+    /// this field and [`Precision<ScalarValue>::add_for_sum`] /
+    /// [`Precision<ScalarValue>::cast_to_sum_type`] for sum arithmetic.
     pub sum_value: Precision<ScalarValue>,
     /// Number of distinct values
     pub distinct_count: Precision<usize>,
@@ -942,7 +994,19 @@ impl ColumnStatistics {
 
     /// Set the sum value
     pub fn with_sum_value(mut self, sum_value: Precision<ScalarValue>) -> Self 
{
-        self.sum_value = sum_value;
+        self.sum_value = match sum_value {
+            Precision::Exact(value) => {
+                Precision::<ScalarValue>::cast_scalar_to_sum_type(&value)
+                    .map(Precision::Exact)
+                    .unwrap_or(Precision::Absent)
+            }
+            Precision::Inexact(value) => {
+                Precision::<ScalarValue>::cast_scalar_to_sum_type(&value)
+                    .map(Precision::Inexact)
+                    .unwrap_or(Precision::Absent)
+            }
+            Precision::Absent => Precision::Absent,
+        };
         self
     }
 
@@ -1095,6 +1159,45 @@ mod tests {
         assert_eq!(precision.add(&Precision::Absent), Precision::Absent);
     }
 
+    #[test]
+    fn test_add_for_sum_scalar_integer_widening() {
+        let precision = Precision::Exact(ScalarValue::Int32(Some(42)));
+
+        assert_eq!(
+            
precision.add_for_sum(&Precision::Exact(ScalarValue::Int32(Some(23)))),
+            Precision::Exact(ScalarValue::Int64(Some(65))),
+        );
+        assert_eq!(
+            
precision.add_for_sum(&Precision::Inexact(ScalarValue::Int32(Some(23)))),
+            Precision::Inexact(ScalarValue::Int64(Some(65))),
+        );
+    }
+
+    #[test]
+    fn test_add_for_sum_prevents_int32_overflow() {
+        let lhs = Precision::Exact(ScalarValue::Int32(Some(i32::MAX)));
+        let rhs = Precision::Exact(ScalarValue::Int32(Some(1)));
+
+        assert_eq!(
+            lhs.add_for_sum(&rhs),
+            Precision::Exact(ScalarValue::Int64(Some(i64::from(i32::MAX) + 
1))),
+        );
+    }
+
+    #[test]
+    fn test_add_for_sum_scalar_unsigned_integer_widening() {
+        let precision = Precision::Exact(ScalarValue::UInt32(Some(42)));
+
+        assert_eq!(
+            
precision.add_for_sum(&Precision::Exact(ScalarValue::UInt32(Some(23)))),
+            Precision::Exact(ScalarValue::UInt64(Some(65))),
+        );
+        assert_eq!(
+            
precision.add_for_sum(&Precision::Inexact(ScalarValue::UInt32(Some(23)))),
+            Precision::Inexact(ScalarValue::UInt64(Some(65))),
+        );
+    }
+
     #[test]
     fn test_sub() {
         let precision1 = Precision::Exact(42);
@@ -1340,7 +1443,7 @@ mod tests {
         );
         assert_eq!(
             col1_stats.sum_value,
-            Precision::Exact(ScalarValue::Int32(Some(1100)))
+            Precision::Exact(ScalarValue::Int64(Some(1100)))
         ); // 500 + 600
 
         let col2_stats = &summary_stats.column_statistics[1];
@@ -1355,7 +1458,7 @@ mod tests {
         );
         assert_eq!(
             col2_stats.sum_value,
-            Precision::Exact(ScalarValue::Int32(Some(2200)))
+            Precision::Exact(ScalarValue::Int64(Some(2200)))
         ); // 1000 + 1200
     }
 
@@ -1997,6 +2100,16 @@ mod tests {
         assert_eq!(col_stats.byte_size, Precision::Exact(8192));
     }
 
+    #[test]
+    fn test_with_sum_value_builder_widens_small_integers() {
+        let col_stats = ColumnStatistics::new_unknown()
+            .with_sum_value(Precision::Exact(ScalarValue::UInt32(Some(123))));
+        assert_eq!(
+            col_stats.sum_value,
+            Precision::Exact(ScalarValue::UInt64(Some(123)))
+        );
+    }
+
     #[test]
     fn test_with_fetch_scales_byte_size() {
         // Test that byte_size is scaled by the row ratio in with_fetch
@@ -2144,7 +2257,7 @@ mod tests {
         );
         assert_eq!(
             col1_stats.sum_value,
-            Precision::Exact(ScalarValue::Int32(Some(1100)))
+            Precision::Exact(ScalarValue::Int64(Some(1100)))
         );
 
         let col2_stats = &summary_stats.column_statistics[1];
@@ -2159,7 +2272,7 @@ mod tests {
         );
         assert_eq!(
             col2_stats.sum_value,
-            Precision::Exact(ScalarValue::Int32(Some(2200)))
+            Precision::Exact(ScalarValue::Int64(Some(2200)))
         );
     }
 
@@ -2508,7 +2621,7 @@ mod tests {
         );
         assert_eq!(
             col_stats.sum_value,
-            Precision::Inexact(ScalarValue::Int32(Some(1500)))
+            Precision::Inexact(ScalarValue::Int64(Some(1500)))
         );
     }
 }
diff --git a/datafusion/datasource/src/statistics.rs 
b/datafusion/datasource/src/statistics.rs
index b1a56e096c..e5a1e4613b 100644
--- a/datafusion/datasource/src/statistics.rs
+++ b/datafusion/datasource/src/statistics.rs
@@ -293,7 +293,7 @@ fn sort_columns_from_physical_sort_exprs(
     since = "47.0.0",
     note = "Please use `get_files_with_limit` and  
`compute_all_files_statistics` instead"
 )]
-#[expect(unused)]
+#[cfg_attr(not(test), expect(unused))]
 pub async fn get_statistics_with_limit(
     all_files: impl Stream<Item = Result<(PartitionedFile, Arc<Statistics>)>>,
     file_schema: SchemaRef,
@@ -329,7 +329,7 @@ pub async fn get_statistics_with_limit(
             col_stats_set[index].null_count = file_column.null_count;
             col_stats_set[index].max_value = file_column.max_value;
             col_stats_set[index].min_value = file_column.min_value;
-            col_stats_set[index].sum_value = file_column.sum_value;
+            col_stats_set[index].sum_value = 
file_column.sum_value.cast_to_sum_type();
         }
 
         // If the number of rows exceeds the limit, we can stop processing
@@ -374,7 +374,7 @@ pub async fn get_statistics_with_limit(
                     col_stats.null_count = col_stats.null_count.add(file_nc);
                     col_stats.max_value = col_stats.max_value.max(file_max);
                     col_stats.min_value = col_stats.min_value.min(file_min);
-                    col_stats.sum_value = col_stats.sum_value.add(file_sum);
+                    col_stats.sum_value = 
col_stats.sum_value.add_for_sum(file_sum);
                     col_stats.byte_size = col_stats.byte_size.add(file_sbs);
                 }
 
@@ -497,3 +497,78 @@ pub fn add_row_stats(
 ) -> Precision<usize> {
     file_num_rows.add(&num_rows)
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::PartitionedFile;
+    use arrow::datatypes::{DataType, Field, Schema};
+    use futures::stream;
+
+    fn file_stats(sum: u32) -> Statistics {
+        Statistics {
+            num_rows: Precision::Exact(1),
+            total_byte_size: Precision::Exact(4),
+            column_statistics: vec![ColumnStatistics {
+                null_count: Precision::Exact(0),
+                max_value: Precision::Exact(ScalarValue::UInt32(Some(sum))),
+                min_value: Precision::Exact(ScalarValue::UInt32(Some(sum))),
+                sum_value: Precision::Exact(ScalarValue::UInt32(Some(sum))),
+                distinct_count: Precision::Exact(1),
+                byte_size: Precision::Exact(4),
+            }],
+        }
+    }
+
+    #[tokio::test]
+    #[expect(deprecated)]
+    async fn test_get_statistics_with_limit_casts_first_file_sum_to_sum_type()
+    -> Result<()> {
+        let schema =
+            Arc::new(Schema::new(vec![Field::new("c1", DataType::UInt32, 
true)]));
+
+        let files = stream::iter(vec![Ok((
+            PartitionedFile::new("f1.parquet", 1),
+            Arc::new(file_stats(100)),
+        ))]);
+
+        let (_group, stats) =
+            get_statistics_with_limit(files, schema, None, false).await?;
+
+        assert_eq!(
+            stats.column_statistics[0].sum_value,
+            Precision::Exact(ScalarValue::UInt64(Some(100)))
+        );
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    #[expect(deprecated)]
+    async fn test_get_statistics_with_limit_merges_sum_with_unsigned_widening()
+    -> Result<()> {
+        let schema =
+            Arc::new(Schema::new(vec![Field::new("c1", DataType::UInt32, 
true)]));
+
+        let files = stream::iter(vec![
+            Ok((
+                PartitionedFile::new("f1.parquet", 1),
+                Arc::new(file_stats(100)),
+            )),
+            Ok((
+                PartitionedFile::new("f2.parquet", 1),
+                Arc::new(file_stats(200)),
+            )),
+        ]);
+
+        let (_group, stats) =
+            get_statistics_with_limit(files, schema, None, true).await?;
+
+        assert_eq!(
+            stats.column_statistics[0].sum_value,
+            Precision::Exact(ScalarValue::UInt64(Some(300)))
+        );
+
+        Ok(())
+    }
+}
diff --git a/datafusion/physical-expr/src/projection.rs 
b/datafusion/physical-expr/src/projection.rs
index b9f98c03da..e133e5a849 100644
--- a/datafusion/physical-expr/src/projection.rs
+++ b/datafusion/physical-expr/src/projection.rs
@@ -693,12 +693,15 @@ impl ProjectionExprs {
                         Precision::Absent
                     };
 
-                    let sum_value = 
Precision::<ScalarValue>::from(stats.num_rows)
-                        .cast_to(&value.data_type())
-                        .ok()
-                        .map(|row_count| {
-                            
Precision::Exact(value.clone()).multiply(&row_count)
+                    let widened_sum = 
Precision::Exact(value.clone()).cast_to_sum_type();
+                    let sum_value = widened_sum
+                        .get_value()
+                        .and_then(|sum| {
+                            Precision::<ScalarValue>::from(stats.num_rows)
+                                .cast_to(&sum.data_type())
+                                .ok()
                         })
+                        .map(|row_count| widened_sum.multiply(&row_count))
                         .unwrap_or(Precision::Absent);
 
                     ColumnStatistics {
@@ -2864,6 +2867,35 @@ pub(crate) mod tests {
         Ok(())
     }
 
+    #[test]
+    fn test_project_statistics_with_i32_literal_sum_widens_to_i64() -> 
Result<()> {
+        let input_stats = get_stats();
+        let input_schema = get_schema();
+
+        let projection = ProjectionExprs::new(vec![
+            ProjectionExpr {
+                expr: Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
+                alias: "constant".to_string(),
+            },
+            ProjectionExpr {
+                expr: Arc::new(Column::new("col0", 0)),
+                alias: "num".to_string(),
+            },
+        ]);
+
+        let output_stats = projection.project_statistics(
+            input_stats,
+            &projection.project_schema(&input_schema)?,
+        )?;
+
+        assert_eq!(
+            output_stats.column_statistics[0].sum_value,
+            Precision::Exact(ScalarValue::Int64(Some(50)))
+        );
+
+        Ok(())
+    }
+
     // Test statistics calculation for NULL literal (constant NULL column)
     #[test]
     fn test_project_statistics_with_null_literal() -> Result<()> {
diff --git a/datafusion/physical-plan/src/joins/cross_join.rs 
b/datafusion/physical-plan/src/joins/cross_join.rs
index a895f69dc5..b64de91d95 100644
--- a/datafusion/physical-plan/src/joins/cross_join.rs
+++ b/datafusion/physical-plan/src/joins/cross_join.rs
@@ -458,32 +458,34 @@ fn stats_cartesian_product(
     // Min, max and distinct_count on the other hand are invariants.
     let cross_join_stats = left_col_stats
         .into_iter()
-        .map(|s| ColumnStatistics {
-            null_count: s.null_count.multiply(&right_row_count),
-            distinct_count: s.distinct_count,
-            min_value: s.min_value,
-            max_value: s.max_value,
-            sum_value: s
-                .sum_value
-                .get_value()
-                // Cast the row count into the same type as any existing sum 
value
-                .and_then(|v| {
-                    Precision::<ScalarValue>::from(right_row_count)
-                        .cast_to(&v.data_type())
-                        .ok()
-                })
-                .map(|row_count| s.sum_value.multiply(&row_count))
-                .unwrap_or(Precision::Absent),
-            byte_size: Precision::Absent,
+        .map(|s| {
+            let widened_sum = s.sum_value.cast_to_sum_type();
+            ColumnStatistics {
+                null_count: s.null_count.multiply(&right_row_count),
+                distinct_count: s.distinct_count,
+                min_value: s.min_value,
+                max_value: s.max_value,
+                sum_value: widened_sum
+                    .get_value()
+                    // Cast the row count into the same type as any existing 
sum value
+                    .and_then(|v| {
+                        Precision::<ScalarValue>::from(right_row_count)
+                            .cast_to(&v.data_type())
+                            .ok()
+                    })
+                    .map(|row_count| widened_sum.multiply(&row_count))
+                    .unwrap_or(Precision::Absent),
+                byte_size: Precision::Absent,
+            }
         })
         .chain(right_col_stats.into_iter().map(|s| {
+            let widened_sum = s.sum_value.cast_to_sum_type();
             ColumnStatistics {
                 null_count: s.null_count.multiply(&left_row_count),
                 distinct_count: s.distinct_count,
                 min_value: s.min_value,
                 max_value: s.max_value,
-                sum_value: s
-                    .sum_value
+                sum_value: widened_sum
                     .get_value()
                     // Cast the row count into the same type as any existing 
sum value
                     .and_then(|v| {
@@ -491,7 +493,7 @@ fn stats_cartesian_product(
                             .cast_to(&v.data_type())
                             .ok()
                     })
-                    .map(|row_count| s.sum_value.multiply(&row_count))
+                    .map(|row_count| widened_sum.multiply(&row_count))
                     .unwrap_or(Precision::Absent),
                 byte_size: Precision::Absent,
             }
@@ -875,6 +877,49 @@ mod tests {
         assert_eq!(result, expected);
     }
 
+    #[tokio::test]
+    async fn test_stats_cartesian_product_unsigned_sum_widens_to_u64() {
+        let left_row_count = 2;
+        let right_row_count = 3;
+
+        let left = Statistics {
+            num_rows: Precision::Exact(left_row_count),
+            total_byte_size: Precision::Exact(10),
+            column_statistics: vec![ColumnStatistics {
+                distinct_count: Precision::Exact(2),
+                max_value: Precision::Exact(ScalarValue::UInt32(Some(10))),
+                min_value: Precision::Exact(ScalarValue::UInt32(Some(1))),
+                sum_value: Precision::Exact(ScalarValue::UInt32(Some(7))),
+                null_count: Precision::Exact(0),
+                byte_size: Precision::Absent,
+            }],
+        };
+
+        let right = Statistics {
+            num_rows: Precision::Exact(right_row_count),
+            total_byte_size: Precision::Exact(10),
+            column_statistics: vec![ColumnStatistics {
+                distinct_count: Precision::Exact(3),
+                max_value: Precision::Exact(ScalarValue::UInt32(Some(12))),
+                min_value: Precision::Exact(ScalarValue::UInt32(Some(0))),
+                sum_value: Precision::Exact(ScalarValue::UInt32(Some(11))),
+                null_count: Precision::Exact(0),
+                byte_size: Precision::Absent,
+            }],
+        };
+
+        let result = stats_cartesian_product(left, right);
+
+        assert_eq!(
+            result.column_statistics[0].sum_value,
+            Precision::Exact(ScalarValue::UInt64(Some(21)))
+        );
+        assert_eq!(
+            result.column_statistics[1].sum_value,
+            Precision::Exact(ScalarValue::UInt64(Some(22)))
+        );
+    }
+
     #[tokio::test]
     async fn test_join() -> Result<()> {
         let task_ctx = Arc::new(TaskContext::default());
diff --git a/datafusion/physical-plan/src/union.rs 
b/datafusion/physical-plan/src/union.rs
index 218eb50015..eb16375a2d 100644
--- a/datafusion/physical-plan/src/union.rs
+++ b/datafusion/physical-plan/src/union.rs
@@ -857,7 +857,7 @@ fn col_stats_union(
     left.distinct_count = union_distinct_count(&left, right);
     left.min_value = left.min_value.min(&right.min_value);
     left.max_value = left.max_value.max(&right.max_value);
-    left.sum_value = left.sum_value.add(&right.sum_value);
+    left.sum_value = left.sum_value.add_for_sum(&right.sum_value);
     left.null_count = left.null_count.add(&right.null_count);
 
     left


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to