adriangb commented on code in PR #15301:
URL: https://github.com/apache/datafusion/pull/15301#discussion_r2017257698
##########
datafusion/common/src/config.rs:
##########
@@ -590,6 +590,9 @@ config_namespace! {
/// during aggregations, if possible
pub enable_topk_aggregation: bool, default = true
+ /// When set to true attempts to push down dynamic filters from TopK
+ /// operations into file scans
+ pub enable_dynamic_filter_pushdown: bool, default = true
Review Comment:
Added a basic description here.
##########
datafusion/datasource-parquet/src/mod.rs:
##########
@@ -541,11 +541,13 @@ impl ExecutionPlan for ParquetExec {
fn should_enable_page_index(
enable_page_index: bool,
page_pruning_predicate: &Option<Arc<PagePruningAccessPlanFilter>>,
+ has_dynamic_filters: bool,
) -> bool {
enable_page_index
- && page_pruning_predicate.is_some()
- && page_pruning_predicate
- .as_ref()
- .map(|p| p.filter_number() > 0)
- .unwrap_or(false)
+ && (page_pruning_predicate.is_some()
+ && page_pruning_predicate
+ .as_ref()
+ .map(|p| p.filter_number() > 0)
+ .unwrap_or(false))
+ || has_dynamic_filters
}
Review Comment:
7f0c894a5
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -102,25 +108,52 @@ impl FileOpener for ParquetOpener {
let batch_size = self.batch_size;
+ let dynamic_filters = self
+ .dynamic_filters
+ .iter()
+ .map(|f| f.current_filters())
+ .collect::<Result<Vec<_>>>()?
+ .into_iter()
+ .flatten()
+ .collect::<Vec<_>>();
+ // Collect dynamic_filters into a single predicate by reducing with AND
+ let dynamic_predicate = dynamic_filters.into_iter().reduce(|a, b| {
+ Arc::new(BinaryExpr::new(a, datafusion_expr::Operator::And, b))
Review Comment:
cce16e33a
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -102,25 +110,52 @@ impl FileOpener for ParquetOpener {
let batch_size = self.batch_size;
+ let dynamic_filters = self
+ .dynamic_filters
+ .iter()
+ .map(|f| f.current_filters())
+ .collect::<Result<Vec<_>>>()?
+ .into_iter()
+ .flatten()
+ .collect::<Vec<_>>();
+ // Collect dynamic_filters into a single predicate by reducing with AND
+ let dynamic_predicate = dynamic_filters.into_iter().reduce(|a, b| {
+ Arc::new(BinaryExpr::new(a, datafusion_expr::Operator::And, b))
+ });
+ let enable_page_index = should_enable_page_index(
+ self.enable_page_index,
+ &self.page_pruning_predicate,
+ dynamic_predicate.is_some(),
+ );
+ let predicate = self.predicate.clone();
+ let predicate = match (predicate, dynamic_predicate) {
Review Comment:
I ended up adding this. We can split this off later once I get to doing
cleanup on this PR.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]