adriangb commented on code in PR #15301:
URL: https://github.com/apache/datafusion/pull/15301#discussion_r2015101716
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -295,3 +350,104 @@ fn create_initial_plan(
// default to scanning all row groups
Ok(ParquetAccessPlan::new_all(row_group_count))
}
+
+/// Build a pruning predicate from an optional predicate expression.
+/// If the predicate is None or the predicate cannot be converted to a pruning
+/// predicate, return None.
+/// If there is an error creating the pruning predicate it is recorded by
+/// incrementing the `predicate_creation_errors` counter.
+pub(crate) fn build_pruning_predicate(
+ predicate: Arc<dyn PhysicalExpr>,
+ file_schema: &SchemaRef,
+ predicate_creation_errors: &Count,
+) -> Option<Arc<PruningPredicate>> {
+ match PruningPredicate::try_new(predicate, Arc::clone(file_schema)) {
+ Ok(pruning_predicate) => {
+ if !pruning_predicate.always_true() {
+ return Some(Arc::new(pruning_predicate));
+ }
+ }
+ Err(e) => {
+ debug!("Could not create pruning predicate for: {e}");
+ predicate_creation_errors.add(1);
+ }
+ }
+ None
+}
+
+/// Build a page pruning predicate from an optional predicate expression.
+/// If the predicate is None or the predicate cannot be converted to a page
+/// pruning predicate, return None.
+pub(crate) fn build_page_pruning_predicate(
+ predicate: &Arc<dyn PhysicalExpr>,
+ file_schema: &SchemaRef,
+) -> Arc<PagePruningAccessPlanFilter> {
+ Arc::new(PagePruningAccessPlanFilter::new(
+ predicate,
+ Arc::clone(file_schema),
+ ))
+}
+
+/// A visitor for a PhysicalExpr that collects all column references to
+/// determine which columns the expression needs in order to be evaluated.
+struct FilterSchemaBuilder<'schema> {
+ filter_schema_fields: BTreeSet<Arc<Field>>,
+ file_schema: &'schema Schema,
+ table_schema: &'schema Schema,
+}
+
+impl<'schema> FilterSchemaBuilder<'schema> {
+ fn new(file_schema: &'schema Schema, table_schema: &'schema Schema) -> Self {
+ Self {
+ filter_schema_fields: BTreeSet::new(),
+ file_schema,
+ table_schema,
+ }
+ }
+
+ fn sort_fields(
+ fields: &mut Vec<Arc<Field>>,
+ table_schema: &Schema,
+ file_schema: &Schema,
+ ) {
+ fields.sort_by_key(|f| f.name().to_string());
+ fields.dedup_by_key(|f| f.name().to_string());
+ fields.sort_by_key(|f| {
+ let table_schema_index =
+ table_schema.index_of(f.name()).unwrap_or(usize::MAX);
+ let file_schema_index =
+ file_schema.index_of(f.name()).unwrap_or(usize::MAX);
+ (table_schema_index, file_schema_index)
+ });
+ }
+
+ fn build(self) -> SchemaRef {
+ let mut fields = self.filter_schema_fields.into_iter().collect::<Vec<_>>();
+ FilterSchemaBuilder::sort_fields(
+ &mut fields,
+ self.table_schema,
+ self.file_schema,
+ );
+ Arc::new(Schema::new(fields))
+ }
+}
+
+impl TreeNodeRewriter for FilterSchemaBuilder<'_> {
+ type Node = Arc<dyn PhysicalExpr>;
+
+ fn f_down(&mut self, node: Arc<dyn PhysicalExpr>) -> Result<Transformed<Self::Node>> {
+ if let Some(column) = node.as_any().downcast_ref::<Column>() {
+ if let Ok(field) = self.table_schema.field_with_name(column.name()) {
+ self.filter_schema_fields.insert(Arc::new(field.clone()));
+ } else if let Ok(field) = self.file_schema.field_with_name(column.name()) {
+ self.filter_schema_fields.insert(Arc::new(field.clone()));
+ } else {
+ // If it's not in either schema it must be a partition column
Review Comment:
Hmm not sure I understand. This doesn't push down filters into partitioning
columns. I think the issue is that as far as TopK (or other operators above the
scan level) are concerned there is no such thing as a partitioning column, just
a column. So they can't filter out what filters to push down. The filtering has
to happen within the opener/scan... and this seems like as good of a place as
any?
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -102,25 +110,52 @@ impl FileOpener for ParquetOpener {
let batch_size = self.batch_size;
+ let dynamic_filters = self
+ .dynamic_filters
+ .iter()
+ .map(|f| f.current_filters())
+ .collect::<Result<Vec<_>>>()?
+ .into_iter()
+ .flatten()
+ .collect::<Vec<_>>();
+ // Collect dynamic_filters into a single predicate by reducing with AND
+ let dynamic_predicate = dynamic_filters.into_iter().reduce(|a, b| {
+ Arc::new(BinaryExpr::new(a, datafusion_expr::Operator::And, b))
+ });
+ let enable_page_index = should_enable_page_index(
+ self.enable_page_index,
+ &self.page_pruning_predicate,
+ dynamic_predicate.is_some(),
+ );
+ let predicate = self.predicate.clone();
+ let predicate = match (predicate, dynamic_predicate) {
Review Comment:
agreed 😄
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]