alamb commented on code in PR #16139:
URL: https://github.com/apache/datafusion/pull/16139#discussion_r2115649463


##########
datafusion/common/src/pruning.rs:
##########
@@ -122,3 +127,984 @@ pub trait PruningStatistics {
         values: &HashSet<ScalarValue>,
     ) -> Option<BooleanArray>;
 }
+
+/// Prune files based on their partition values.
+/// This is used both at planning time and execution time to prune
+/// files based on their partition values.
+/// This feeds into [`CompositePruningStatistics`] to allow pruning
+/// with filters that depend both on partition columns and data columns
+/// (e.g. `WHERE partition_col = data_col`).
+#[derive(Clone)]
+pub struct PartitionPruningStatistics {
+    /// Values for each column for each container.
+    /// The outer vectors represent the columns while the inner
+    /// vectors represent the containers.
+    /// The order must match the order of the partition columns in
+    /// [`PartitionPruningStatistics::partition_schema`].
+    partition_values: Vec<ArrayRef>,
+    /// The number of containers.
+    /// Stored since the partition values are column-major and if
+    /// there are no columns we wouldn't know the number of containers.
+    num_containers: usize,
+    /// The schema of the partition columns.
+    /// This must **not** be the schema of the entire file or table:
+    /// it must only be the schema of the partition columns,
+    /// in the same order as the values in 
[`PartitionPruningStatistics::partition_values`].
+    partition_schema: SchemaRef,
+}
+
+impl PartitionPruningStatistics {
+    /// Create a new instance of [`PartitionPruningStatistics`].
+    ///
+    /// Args:
+    /// * `partition_values`: A vector of vectors of [`ScalarValue`]s.
+    ///   The outer vector represents the containers while the inner
+    ///   vector represents the partition values for each column.
+    ///   Note that this is the **opposite** of the order of the
+    ///   partition columns in `PartitionPruningStatistics::partition_schema`.
+    /// * `partition_schema`: The schema of the partition columns.
+    ///   This must **not** be the schema of the entire file or table:
+    ///   instead it must only be the schema of the partition columns,
+    ///   in the same order as the values in `partition_values`.
+    pub fn try_new(
+        partition_values: Vec<Vec<ScalarValue>>,
+        partition_fields: Vec<FieldRef>,
+    ) -> Result<Self, DataFusionError> {
+        let num_containers = partition_values.len();
+        let partition_schema = Arc::new(Schema::new(partition_fields));
+        let mut partition_values_by_column =
+            vec![
+                Vec::with_capacity(partition_values.len());
+                partition_schema.fields().len()
+            ];
+        for partition_value in partition_values {
+            for (i, value) in partition_value.into_iter().enumerate() {
+                partition_values_by_column[i].push(value);
+            }
+        }
+        Ok(Self {
+            partition_values: partition_values_by_column
+                .into_iter()
+                .map(|v| {
+                    if v.is_empty() {
+                        Ok(Arc::new(NullArray::new(0)) as ArrayRef)
+                    } else {
+                        ScalarValue::iter_to_array(v)
+                    }
+                })
+                .collect::<Result<Vec<_>, _>>()?,
+            num_containers,
+            partition_schema,
+        })
+    }
+}
+
+impl PruningStatistics for PartitionPruningStatistics {
+    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
+        let index = self.partition_schema.index_of(column.name()).ok()?;
+        self.partition_values.get(index).and_then(|v| {
+            if v.is_empty() || v.null_count() == v.len() {
+                // If the array is empty or all nulls, return None
+                None
+            } else {
+                // Otherwise, return the array as is
+                Some(Arc::clone(v))
+            }
+        })
+    }
+
+    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
+        self.min_values(column)
+    }
+
+    fn num_containers(&self) -> usize {
+        self.num_containers
+    }
+
+    fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
+        None
+    }
+
+    fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
+        None
+    }
+
+    fn contained(
+        &self,
+        column: &Column,
+        values: &HashSet<ScalarValue>,
+    ) -> Option<BooleanArray> {
+        let index = self.partition_schema.index_of(column.name()).ok()?;
+        let array = self.partition_values.get(index)?;
+        let boolean_array = values.iter().try_fold(None, |acc, v| {
+            let arrow_value = v.to_scalar().ok()?;
+            let eq_result = arrow::compute::kernels::cmp::eq(array, 
&arrow_value).ok()?;
+            match acc {
+                None => Some(Some(eq_result)),
+                Some(acc_array) => {
+                    arrow::compute::kernels::boolean::and(&acc_array, 
&eq_result)
+                        .map(Some)
+                        .ok()
+                }
+            }
+        })??;
+        // If the boolean array is empty or all null values, return None
+        if boolean_array.is_empty() || boolean_array.null_count() == 
boolean_array.len() {
+            None
+        } else {
+            Some(boolean_array)
+        }
+    }
+}
+
+/// Prune a set of containers represented by their statistics.
+/// Each [`Statistics`] represents a container (e.g. a file or a partition of 
files).

Review Comment:
   Follow on PR: https://github.com/apache/datafusion/pull/16213



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to