xudong963 commented on code in PR #16139:
URL: https://github.com/apache/datafusion/pull/16139#discussion_r2113890230


##########
datafusion/common/src/pruning.rs:
##########
@@ -122,3 +127,984 @@ pub trait PruningStatistics {
         values: &HashSet<ScalarValue>,
     ) -> Option<BooleanArray>;
 }
+
+/// Prune files based on their partition values.
+/// This is used both at planning time and execution time to prune
+/// files based on their partition values.
+/// This feeds into [`CompositePruningStatistics`] to allow pruning
+/// with filters that depend both on partition columns and data columns
+/// (e.g. `WHERE partition_col = data_col`).
+#[derive(Clone)]
+pub struct PartitionPruningStatistics {
+    /// Values for each column for each container.
+    /// The outer vectors represent the columns while the inner
+    /// vectors represent the containers.
+    /// The order must match the order of the partition columns in
+    /// [`PartitionPruningStatistics::partition_schema`].
+    partition_values: Vec<ArrayRef>,
+    /// The number of containers.
+    /// Stored since the partition values are column-major and if
+    /// there are no columns we wouldn't know the number of containers.
+    num_containers: usize,
+    /// The schema of the partition columns.
+    /// This must **not** be the schema of the entire file or table:
+    /// it must only be the schema of the partition columns,
+    /// in the same order as the values in 
[`PartitionPruningStatistics::partition_values`].
+    partition_schema: SchemaRef,
+}
+
+impl PartitionPruningStatistics {
+    /// Create a new instance of [`PartitionPruningStatistics`].
+    ///
+    /// Args:
+    /// * `partition_values`: A vector of vectors of [`ScalarValue`]s.
+    ///   The outer vector represents the containers while the inner
+    ///   vector represents the partition values for each column.
+    ///   Note that this is the **opposite** of the order of the
+    ///   partition columns in `PartitionPruningStatistics::partition_schema`.
+    /// * `partition_schema`: The schema of the partition columns.
+    ///   This must **not** be the schema of the entire file or table:
+    ///   instead it must only be the schema of the partition columns,
+    ///   in the same order as the values in `partition_values`.
+    pub fn try_new(
+        partition_values: Vec<Vec<ScalarValue>>,
+        partition_fields: Vec<FieldRef>,
+    ) -> Result<Self, DataFusionError> {
+        let num_containers = partition_values.len();
+        let partition_schema = Arc::new(Schema::new(partition_fields));
+        let mut partition_values_by_column =
+            vec![
+                Vec::with_capacity(partition_values.len());
+                partition_schema.fields().len()
+            ];
+        for partition_value in partition_values {
+            for (i, value) in partition_value.into_iter().enumerate() {
+                partition_values_by_column[i].push(value);
+            }
+        }
+        Ok(Self {
+            partition_values: partition_values_by_column
+                .into_iter()
+                .map(|v| {
+                    if v.is_empty() {
+                        Ok(Arc::new(NullArray::new(0)) as ArrayRef)
+                    } else {
+                        ScalarValue::iter_to_array(v)
+                    }
+                })
+                .collect::<Result<Vec<_>, _>>()?,
+            num_containers,
+            partition_schema,
+        })
+    }
+}
+
+impl PruningStatistics for PartitionPruningStatistics {
+    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
+        let index = self.partition_schema.index_of(column.name()).ok()?;
+        self.partition_values.get(index).and_then(|v| {
+            if v.is_empty() || v.null_count() == v.len() {
+                // If the array is empty or all nulls, return None
+                None
+            } else {
+                // Otherwise, return the array as is
+                Some(Arc::clone(v))
+            }
+        })
+    }
+
+    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
+        self.min_values(column)
+    }
+
+    fn num_containers(&self) -> usize {
+        self.num_containers
+    }
+
+    fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
+        None
+    }
+
+    fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
+        None
+    }
+
+    fn contained(
+        &self,
+        column: &Column,
+        values: &HashSet<ScalarValue>,
+    ) -> Option<BooleanArray> {
+        let index = self.partition_schema.index_of(column.name()).ok()?;
+        let array = self.partition_values.get(index)?;
+        let boolean_array = values.iter().try_fold(None, |acc, v| {
+            let arrow_value = v.to_scalar().ok()?;
+            let eq_result = arrow::compute::kernels::cmp::eq(array, 
&arrow_value).ok()?;
+            match acc {
+                None => Some(Some(eq_result)),
+                Some(acc_array) => {
+                    arrow::compute::kernels::boolean::and(&acc_array, 
&eq_result)
+                        .map(Some)
+                        .ok()
+                }
+            }
+        })??;
+        // If the boolean array is empty or all null values, return None
+        if boolean_array.is_empty() || boolean_array.null_count() == 
boolean_array.len() {
+            None
+        } else {
+            Some(boolean_array)
+        }
+    }
+}
+
+/// Prune a set of containers represented by their statistics.
+/// Each [`Statistics`] represents a container (e.g. a file or a partition of 
files).

Review Comment:
   What does `a partition of files` mean? A `FileGroup` or just a collection of 
some files?



##########
datafusion/common/src/pruning.rs:
##########
@@ -122,3 +127,984 @@ pub trait PruningStatistics {
         values: &HashSet<ScalarValue>,
     ) -> Option<BooleanArray>;
 }
+
+/// Prune files based on their partition values.
+/// This is used both at planning time and execution time to prune
+/// files based on their partition values.
+/// This feeds into [`CompositePruningStatistics`] to allow pruning
+/// with filters that depend both on partition columns and data columns
+/// (e.g. `WHERE partition_col = data_col`).
+#[derive(Clone)]
+pub struct PartitionPruningStatistics {
+    /// Values for each column for each container.
+    /// The outer vectors represent the columns while the inner
+    /// vectors represent the containers.
+    /// The order must match the order of the partition columns in
+    /// [`PartitionPruningStatistics::partition_schema`].
+    partition_values: Vec<ArrayRef>,
+    /// The number of containers.
+    /// Stored since the partition values are column-major and if
+    /// there are no columns we wouldn't know the number of containers.
+    num_containers: usize,
+    /// The schema of the partition columns.
+    /// This must **not** be the schema of the entire file or table:
+    /// it must only be the schema of the partition columns,
+    /// in the same order as the values in 
[`PartitionPruningStatistics::partition_values`].
+    partition_schema: SchemaRef,
+}
+
+impl PartitionPruningStatistics {
+    /// Create a new instance of [`PartitionPruningStatistics`].
+    ///
+    /// Args:
+    /// * `partition_values`: A vector of vectors of [`ScalarValue`]s.
+    ///   The outer vector represents the containers while the inner
+    ///   vector represents the partition values for each column.
+    ///   Note that this is the **opposite** of the order of the
+    ///   partition columns in `PartitionPruningStatistics::partition_schema`.
+    /// * `partition_schema`: The schema of the partition columns.
+    ///   This must **not** be the schema of the entire file or table:
+    ///   instead it must only be the schema of the partition columns,
+    ///   in the same order as the values in `partition_values`.
+    pub fn try_new(
+        partition_values: Vec<Vec<ScalarValue>>,
+        partition_fields: Vec<FieldRef>,
+    ) -> Result<Self, DataFusionError> {
+        let num_containers = partition_values.len();
+        let partition_schema = Arc::new(Schema::new(partition_fields));
+        let mut partition_values_by_column =
+            vec![
+                Vec::with_capacity(partition_values.len());
+                partition_schema.fields().len()
+            ];
+        for partition_value in partition_values {
+            for (i, value) in partition_value.into_iter().enumerate() {
+                partition_values_by_column[i].push(value);
+            }
+        }
+        Ok(Self {
+            partition_values: partition_values_by_column
+                .into_iter()
+                .map(|v| {
+                    if v.is_empty() {
+                        Ok(Arc::new(NullArray::new(0)) as ArrayRef)
+                    } else {
+                        ScalarValue::iter_to_array(v)
+                    }
+                })
+                .collect::<Result<Vec<_>, _>>()?,
+            num_containers,
+            partition_schema,
+        })
+    }
+}
+
+impl PruningStatistics for PartitionPruningStatistics {
+    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
+        let index = self.partition_schema.index_of(column.name()).ok()?;
+        self.partition_values.get(index).and_then(|v| {
+            if v.is_empty() || v.null_count() == v.len() {
+                // If the array is empty or all nulls, return None
+                None
+            } else {
+                // Otherwise, return the array as is
+                Some(Arc::clone(v))
+            }
+        })
+    }
+
+    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
+        self.min_values(column)
+    }
+
+    fn num_containers(&self) -> usize {
+        self.num_containers
+    }
+
+    fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
+        None
+    }
+
+    fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
+        None
+    }
+
+    fn contained(
+        &self,
+        column: &Column,
+        values: &HashSet<ScalarValue>,
+    ) -> Option<BooleanArray> {
+        let index = self.partition_schema.index_of(column.name()).ok()?;
+        let array = self.partition_values.get(index)?;
+        let boolean_array = values.iter().try_fold(None, |acc, v| {
+            let arrow_value = v.to_scalar().ok()?;
+            let eq_result = arrow::compute::kernels::cmp::eq(array, 
&arrow_value).ok()?;
+            match acc {
+                None => Some(Some(eq_result)),
+                Some(acc_array) => {
+                    arrow::compute::kernels::boolean::and(&acc_array, 
&eq_result)
+                        .map(Some)
+                        .ok()
+                }
+            }
+        })??;
+        // If the boolean array is empty or all null values, return None
+        if boolean_array.is_empty() || boolean_array.null_count() == 
boolean_array.len() {
+            None
+        } else {
+            Some(boolean_array)
+        }
+    }
+}
+
+/// Prune a set of containers represented by their statistics.
+/// Each [`Statistics`] represents a container (e.g. a file or a partition of 
files).
+#[derive(Clone)]
+pub struct PrunableStatistics {
+    /// Statistics for each container.
+    /// These are taken as a reference since they may be rather large / 
expensive to clone
+    /// and we often won't return all of them as ArrayRefs (we only return the 
columns the predicate requests).
+    statistics: Vec<Arc<Statistics>>,
+    /// The schema of the file these statistics are for.
+    schema: SchemaRef,
+}
+
+impl PrunableStatistics {
+    /// Create a new instance of [`PrunableStatistics`].
+    /// Each [`Statistics`] represents a container (e.g. a file or a partition 
of files).
+    /// The `schema` is the schema of the data in the containers and should 
apply to all files.
+    pub fn new(statistics: Vec<Arc<Statistics>>, schema: SchemaRef) -> Self {
+        Self { statistics, schema }
+    }
+
+    fn get_exact_column_statistics(
+        &self,
+        column: &Column,
+        get_stat: impl Fn(&ColumnStatistics) -> &Precision<ScalarValue>,
+    ) -> Option<ArrayRef> {
+        let index = self.schema.index_of(column.name()).ok()?;
+        let mut has_value = false;
+        match ScalarValue::iter_to_array(self.statistics.iter().map(|s| {
+            s.column_statistics
+                .get(index)
+                .and_then(|stat| {
+                    if let Precision::Exact(min) = get_stat(stat) {
+                        has_value = true;
+                        Some(min.clone())
+                    } else {
+                        None
+                    }
+                })
+                .unwrap_or(ScalarValue::Null)
+        })) {
+            // If there is any non-null value and no errors, return the array
+            Ok(array) => has_value.then_some(array),
+            Err(_) => {
+                log::warn!(
+                    "Failed to convert min values to array for column {}",
+                    column.name()
+                );
+                None
+            }
+        }
+    }
+}
+
+impl PruningStatistics for PrunableStatistics {
+    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
+        self.get_exact_column_statistics(column, |stat| &stat.min_value)
+    }
+
+    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
+        self.get_exact_column_statistics(column, |stat| &stat.max_value)
+    }
+
+    fn num_containers(&self) -> usize {
+        self.statistics.len()
+    }
+
+    fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
+        let index = self.schema.index_of(column.name()).ok()?;
+        if self.statistics.iter().any(|s| {
+            s.column_statistics
+                .get(index)
+                .is_some_and(|stat| 
stat.null_count.is_exact().unwrap_or(false))
+        }) {
+            Some(Arc::new(
+                self.statistics
+                    .iter()
+                    .map(|s| {
+                        s.column_statistics.get(index).and_then(|stat| {
+                            if let Precision::Exact(null_count) = 
&stat.null_count {
+                                u64::try_from(*null_count).ok()
+                            } else {
+                                None
+                            }
+                        })
+                    })
+                    .collect::<UInt64Array>(),
+            ))
+        } else {
+            None
+        }
+    }
+
+    fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
+        // If the column does not exist in the schema, return None
+        if self.schema.index_of(column.name()).is_err() {
+            return None;
+        }
+        if self
+            .statistics
+            .iter()
+            .any(|s| s.num_rows.is_exact().unwrap_or(false))
+        {
+            Some(Arc::new(
+                self.statistics
+                    .iter()
+                    .map(|s| {
+                        if let Precision::Exact(row_count) = &s.num_rows {
+                            u64::try_from(*row_count).ok()
+                        } else {
+                            None
+                        }
+                    })
+                    .collect::<UInt64Array>(),
+            ))
+        } else {
+            None
+        }
+    }
+
+    fn contained(
+        &self,
+        _column: &Column,
+        _values: &HashSet<ScalarValue>,
+    ) -> Option<BooleanArray> {
+        None
+    }
+}
+
+/// Combine multiple [`PruningStatistics`] into a single
+/// [`CompositePruningStatistics`].
+/// This can be used to combine statistics from different sources,
+/// for example partition values and file statistics.
+/// This allows pruning with filters that depend on multiple sources of 
statistics,
+/// such as `WHERE partition_col = data_col`.
+/// This is done by iterating over the statistics and returning the first
+/// one that has information for the requested column.
+/// If multiple statistics have information for the same column,
+/// the first one is returned without any regard for completeness or accuracy.
+/// That is: if the first statistics has information for a column, even if it 
is incomplete,
+/// that is returned even if a later statistics has more complete information.

Review Comment:
   I'm curious about why not prune based on all the statistics that have 
information, one by one



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to