xudong963 commented on code in PR #20003:
URL: https://github.com/apache/datafusion/pull/20003#discussion_r2730956061
##########
datafusion/optimizer/src/push_down_filter.rs:
##########
@@ -1154,34 +1160,109 @@ impl OptimizerRule for PushDownFilter {
.map(|(pred, _)| pred);
// Add new scan filters
- let new_scan_filters: Vec<Expr> = scan
- .filters
- .iter()
- .chain(new_scan_filters)
- .unique()
- .cloned()
- .collect();
+ let new_scan_filters: Vec<Expr> =
+ new_scan_filters.unique().cloned().collect();
+
+ let source_schema = scan.source.schema();
+ let mut additional_projection = HashSet::new();
// Compose predicates to be of `Unsupported` or `Inexact`
pushdown type, and also include volatile filters
let new_predicate: Vec<Expr> = zip
- .filter(|(_, res)| res !=
&TableProviderFilterPushDown::Exact)
+ .filter(|(expr, res)| {
+ if *res == TableProviderFilterPushDown::Exact {
+ return false;
+ }
+ // For each not exactly supported filter we must
ensure that all columns are projected,
+ // so we collect all columns which are not currently
projected.
+ expr.apply(|expr| {
+ if let Expr::Column(column) = expr
+ && let Ok(idx) =
source_schema.index_of(column.name())
+ && scan
+ .projection
+ .as_ref()
+ .is_some_and(|p| !p.contains(&idx))
+ {
+ additional_projection.insert(idx);
+ }
+ Ok(TreeNodeRecursion::Continue)
+ })
+ .unwrap();
+ true
+ })
.map(|(pred, _)| pred)
.chain(volatile_filters)
.cloned()
.collect();
- let new_scan = LogicalPlan::TableScan(TableScan {
- filters: new_scan_filters,
- ..scan
- });
-
- Transformed::yes(new_scan).transform_data(|new_scan| {
- if let Some(predicate) = conjunction(new_predicate) {
- make_filter(predicate,
Arc::new(new_scan)).map(Transformed::yes)
+ // Wraps with a filter if some filters are not supported
exactly.
+ let filtered = move |plan| {
+ if let Some(new_predicate) = conjunction(new_predicate) {
+ Filter::try_new(new_predicate, Arc::new(plan))
+ .map(LogicalPlan::Filter)
} else {
- Ok(Transformed::no(new_scan))
+ Ok(plan)
}
- })
+ };
+
+ if additional_projection.is_empty() {
+ // No additional projection is required.
+ let new_scan = LogicalPlan::TableScan(TableScan {
+ filters: new_scan_filters,
+ ..scan
+ });
+ return filtered(new_scan).map(Transformed::yes);
+ }
+
+ let scan_table_name = &scan.table_name;
+ let new_scan = filtered(
+ LogicalPlanBuilder::scan_with_filters_fetch(
+ scan_table_name.clone(),
+ Arc::clone(&scan.source),
+ scan.projection.clone().map(|mut projection| {
+ // Extend a projection.
+ projection.extend(additional_projection);
Review Comment:
There is a tradeoff to weigh here: the cost of fetching an extra column from
storage vs. the selectivity of the related filter.
##########
datafusion/optimizer/src/push_down_filter.rs:
##########
@@ -1154,34 +1160,109 @@ impl OptimizerRule for PushDownFilter {
.map(|(pred, _)| pred);
// Add new scan filters
- let new_scan_filters: Vec<Expr> = scan
- .filters
- .iter()
- .chain(new_scan_filters)
- .unique()
- .cloned()
- .collect();
+ let new_scan_filters: Vec<Expr> =
+ new_scan_filters.unique().cloned().collect();
+
+ let source_schema = scan.source.schema();
+ let mut additional_projection = HashSet::new();
// Compose predicates to be of `Unsupported` or `Inexact`
pushdown type, and also include volatile filters
let new_predicate: Vec<Expr> = zip
- .filter(|(_, res)| res !=
&TableProviderFilterPushDown::Exact)
+ .filter(|(expr, res)| {
+ if *res == TableProviderFilterPushDown::Exact {
+ return false;
+ }
+ // For each not exactly supported filter we must
ensure that all columns are projected,
+ // so we collect all columns which are not currently
projected.
+ expr.apply(|expr| {
+ if let Expr::Column(column) = expr
+ && let Ok(idx) =
source_schema.index_of(column.name())
+ && scan
+ .projection
+ .as_ref()
+ .is_some_and(|p| !p.contains(&idx))
+ {
+ additional_projection.insert(idx);
+ }
+ Ok(TreeNodeRecursion::Continue)
+ })
+ .unwrap();
+ true
+ })
.map(|(pred, _)| pred)
.chain(volatile_filters)
.cloned()
.collect();
- let new_scan = LogicalPlan::TableScan(TableScan {
- filters: new_scan_filters,
- ..scan
- });
-
- Transformed::yes(new_scan).transform_data(|new_scan| {
- if let Some(predicate) = conjunction(new_predicate) {
- make_filter(predicate,
Arc::new(new_scan)).map(Transformed::yes)
+ // Wraps with a filter if some filters are not supported
exactly.
+ let filtered = move |plan| {
+ if let Some(new_predicate) = conjunction(new_predicate) {
+ Filter::try_new(new_predicate, Arc::new(plan))
+ .map(LogicalPlan::Filter)
} else {
- Ok(Transformed::no(new_scan))
+ Ok(plan)
}
- })
+ };
+
+ if additional_projection.is_empty() {
Review Comment:
The scan branch keeps growing; any thoughts on splitting it up?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]