ethan-tyler commented on code in PR #20061:
URL: https://github.com/apache/datafusion/pull/20061#discussion_r2751941932
##########
datafusion/optimizer/src/push_down_filter.rs:
##########
@@ -1126,62 +1124,41 @@ impl OptimizerRule for PushDownFilter {
}
LogicalPlan::Join(join) => push_down_join(join, Some(&filter.predicate)),
LogicalPlan::TableScan(scan) => {
+ // Move filters to the TableScan.
+ // Exclude scalar subqueries from being pushed down since those are essentially a
+ // fork in the logical plan tree that should not be processed by TableScan nodes.
let filter_predicates = split_conjunction(&filter.predicate);
- let (volatile_filters, non_volatile_filters): (Vec<&Expr>, Vec<&Expr>) =
- filter_predicates
- .into_iter()
- .partition(|pred| pred.is_volatile());
-
- // Check which non-volatile filters are supported by source
- let supported_filters = scan
- .source
- .supports_filters_pushdown(non_volatile_filters.as_slice())?;
- assert_eq_or_internal_err!(
- non_volatile_filters.len(),
- supported_filters.len(),
- "Vec returned length: {} from supports_filters_pushdown is not the same size as the filters passed, which length is: {}",
- supported_filters.len(),
- non_volatile_filters.len()
- );
-
- // Compose scan filters from non-volatile filters of `Exact` or `Inexact` pushdown type
- let zip = non_volatile_filters.into_iter().zip(supported_filters);
+ // Partition predicates: scalar subqueries must stay in a Filter node
+ let (scalar_subquery_filters, pushable_filters): (Vec<_>, Vec<_>) =
+ filter_predicates.iter().partition(|pred| {
+ pred.exists(|e| Ok(matches!(e, Expr::ScalarSubquery(_))))
+ .unwrap()
+ });
- let new_scan_filters = zip
- .clone()
- .filter(|(_, res)| res != &TableProviderFilterPushDown::Unsupported)
- .map(|(pred, _)| pred);
-
- // Add new scan filters
+ // Combine existing scan filters with pushable filter predicates
let new_scan_filters: Vec<Expr> = scan
.filters
.iter()
- .chain(new_scan_filters)
+ .chain(pushable_filters)
.unique()
Review Comment:
This refactor changes what reaches `.unique()`. Volatile predicates can now land in `TableScan.filters`, and the dedup collapses their repeated evaluation: `random() < 0.5 AND random() < 0.5` is not equivalent to `random() < 0.5`. I would either drop the dedup, or restrict it to predicates where `!expr.is_volatile()` and keep the volatile ones in their original order.
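A sketch of the restricted version, reusing names from this hunk (the helper and its signature are mine, not part of the PR):

```rust
use datafusion_expr::Expr;
use itertools::Itertools;

/// Hypothetical helper: dedup only deterministic predicates. Volatile
/// predicates keep their multiplicity and original order, so two
/// separate `random() < 0.5` terms are still evaluated independently.
fn merge_scan_filters(existing: &[Expr], pushable: Vec<Expr>) -> Vec<Expr> {
    let (volatile, non_volatile): (Vec<Expr>, Vec<Expr>) = existing
        .iter()
        .cloned()
        .chain(pushable)
        .partition(|e| e.is_volatile());

    non_volatile
        .into_iter()
        .unique() // safe: deterministic predicates only
        .chain(volatile) // appended unchanged, in order
        .collect()
}
```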
##########
datafusion/core/src/physical_planner.rs:
##########
Review Comment:
In `UPDATE t1 SET ... FROM t2 WHERE t2.x = 1 AND t1.id = t2.id`, `t2.x` becomes `x` after qualifier stripping and can accidentally match `t1.x`; join predicates degrade into `id = id`. I would strip qualifiers only from predicates that are provably evaluable against the target table schema alone, and keep the rest outside the provider call.
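Roughly what I mean, as a sketch (the helper name and exact schema-lookup calls are mine):

```rust
use datafusion_common::{DFSchema, Result, TableReference};
use datafusion_expr::Expr;

/// Hypothetical check: a predicate is safe to unqualify and hand to the
/// target table's provider only if every column it references belongs to
/// the target table (unqualified, or qualified with the target itself).
fn evaluable_against_target(
    pred: &Expr,
    target: &TableReference,
    target_schema: &DFSchema,
) -> Result<bool> {
    let has_foreign_column = pred.exists(|e| {
        Ok(matches!(e, Expr::Column(c)
            if c.relation.as_ref().is_some_and(|r| r != target)
                || target_schema.field_with_unqualified_name(&c.name).is_err()))
    })?;
    Ok(!has_foreign_column)
}
```

In the example above this keeps `t2.x = 1` and `t1.id = t2.id` out of the provider call while still letting a plain `t1.y > 0` through.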
##########
datafusion/core/src/physical_planner.rs:
##########
Review Comment:
The same qualifier-stripping problem appears here: `extract_update_assignments` strips qualifiers from the RHS, so `SET b = t2.b` can become `b = b` and get treated as an identity assignment.
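Comparing before the qualifiers are stripped avoids the false positive; a sketch (names are mine):

```rust
use datafusion_common::Column;
use datafusion_expr::Expr;

/// Hypothetical guard: treat `SET col = rhs` as an identity assignment
/// only when the RHS is the same column *including* its qualifier, so
/// `SET b = t2.b` against target `t1` stays a real assignment.
fn is_identity_assignment(target_col: &Column, rhs: &Expr) -> bool {
    matches!(rhs, Expr::Column(c)
        if c.name == target_col.name
            && (c.relation.is_none() || c.relation == target_col.relation))
}
```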
##########
datafusion/substrait/src/logical_plan/producer/rel/read_rel.rs:
##########
@@ -97,47 +101,151 @@ pub fn from_table_scan(
producer: &mut impl SubstraitProducer,
scan: &TableScan,
) -> datafusion::common::Result<Box<Rel>> {
- let projection = scan.projection.as_ref().map(|p| {
- p.iter()
- .map(|i| StructItem {
- field: *i as i32,
- child: None,
- })
- .collect()
- });
+ let table_schema = scan.source.schema();
+ let table_schema_qualified = Arc::new(DFSchema::try_from_qualified_schema(
+ scan.table_name.clone(),
+ &table_schema,
+ )?);
- let projection = projection.map(|struct_items| MaskExpression {
- select: Some(StructSelect { struct_items }),
- maintain_singular_struct: false,
- });
-
- let table_schema = scan.source.schema().to_dfschema_ref()?;
- let base_schema = to_substrait_named_struct(producer, &table_schema)?;
+ // Check if projection contains only simple column references
+ let simple_projection_indices = scan
+ .projection
+ .as_ref()
+ .and_then(|exprs| projection_indices_from_exprs(exprs, &scan.source.schema()));
+ // Build the filter expression if any
let filter_option = if scan.filters.is_empty() {
None
} else {
- let table_schema_qualified = Arc::new(
- DFSchema::try_from_qualified_schema(
- scan.table_name.clone(),
- &(scan.source.schema()),
- )
- .unwrap(),
- );
-
let combined_expr = conjunction(scan.filters.clone()).unwrap();
let filter_expr =
producer.handle_expr(&combined_expr, &table_schema_qualified)?;
Some(Box::new(filter_expr))
};
+ let base_schema = to_substrait_named_struct(
+ producer,
+ &Arc::clone(&table_schema).to_dfschema_ref()?,
+ )?;
+
+ // If projection is simple column references, use a mask on the ReadRel
+ if let Some(indices) = simple_projection_indices {
+ let projection = Some(MaskExpression {
+ select: Some(StructSelect {
+ struct_items: indices
+ .iter()
+ .map(|&i| StructItem {
+ field: i as i32,
+ child: None,
+ })
+ .collect(),
+ }),
+ maintain_singular_struct: false,
+ });
+
+ return Ok(Box::new(Rel {
+ rel_type: Some(RelType::Read(Box::new(ReadRel {
+ common: None,
+ base_schema: Some(base_schema),
+ filter: filter_option,
+ best_effort_filter: None,
+ projection,
+ advanced_extension: None,
+ read_type: Some(ReadType::NamedTable(NamedTable {
+ names: scan.table_name.to_vec(),
+ advanced_extension: None,
+ })),
+ }))),
+ }));
+ }
+
+ // Complex projection expressions - need to wrap ReadRel in ProjectRel
+ if let Some(proj_exprs) = &scan.projection {
+ // Find all columns needed by the projection expressions
+ let mut required_columns = HashSet::new();
+ for expr in proj_exprs {
+ expr_to_columns(expr, &mut required_columns)?;
+ }
+
+ // Convert column names to indices
+ let mut column_indices: Vec<usize> = required_columns
+ .iter()
+ .filter_map(|col| table_schema.index_of(col.name()).ok())
+ .collect();
+ column_indices.sort();
+ column_indices.dedup();
Review Comment:
Field references are built against the full table DFSchema, but the ReadRel uses a column-subset mask. If expression projections stay, the indices won't line up. This goes away if projections are restricted to column references.
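If expression projections do stay, the ProjectRel's field references would need remapping from full-table indices to positions in the masked output; a sketch:

```rust
/// Hypothetical remap: `mask` is the sorted list of column indices the
/// masked ReadRel emits; a field reference built against the full table
/// schema must be rewritten to its position within that list.
/// e.g. with mask = [2, 5], full-schema field 5 becomes output field 1.
fn remap_to_mask(full_index: usize, mask: &[usize]) -> Option<usize> {
    mask.iter().position(|&i| i == full_index)
}
```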
##########
datafusion/sql/src/unparser/utils.rs:
##########
@@ -385,17 +385,24 @@ pub(crate) fn try_transform_to_simple_table_scan_with_filters(
}
}
- let mut builder = LogicalPlanBuilder::scan(
+ // Use TableScanBuilder to preserve full projection expressions
+ let mut scan_builder = TableScanBuilder::new(
Review Comment:
This builder chain doesn't carry over `table_scan.fetch`; it needs a `.with_fetch(table_scan.fetch)`, or plans with a fetch pushed into the scan will lose their LIMIT in the generated SQL.
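Something like this, purely as a sketch (constructor args elided; only the fetch forwarding is the point):

```rust
// Hypothetical shape: forward the scan's fetch so a LIMIT that was
// pushed into the TableScan survives unparsing back to SQL.
let mut scan_builder = TableScanBuilder::new(/* table name, source, ... */)
    .with_fetch(table_scan.fetch);
```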
##########
datafusion/proto/src/logical_plan/mod.rs:
##########
@@ -1023,11 +1023,10 @@ impl AsLogicalPlan for LogicalPlanNode {
let projection = match projection {
Review Comment:
Projections are serialized as column name strings and deserialized by schema index lookup. Non-column expressions won't survive the roundtrip, and even plain column names are fragile when they contain dots or differ in quoting. I think the cleanest path is enforcing column-ref-only projections in the builder and failing fast on anything else. Extending the proto to `Vec<LogicalExprNode>` is the alternative, but a bigger lift.
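The fail-fast check could be as small as this (sketch; where exactly the builder validates is up to you):

```rust
use datafusion_common::{plan_err, Result};
use datafusion_expr::Expr;

/// Sketch: reject anything other than a bare column reference before the
/// projection reaches proto serialization, instead of corrupting it on
/// the roundtrip.
fn validate_column_only_projection(exprs: &[Expr]) -> Result<()> {
    for expr in exprs {
        if !matches!(expr, Expr::Column(_)) {
            return plan_err!(
                "TableScan projection must be plain column references, got: {expr}"
            );
        }
    }
    Ok(())
}
```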
##########
datafusion/core/src/physical_planner.rs:
##########
@@ -1671,6 +1610,214 @@ impl DefaultPhysicalPlanner {
Ok(exec_node)
}
+ /// Plan a TableScan node, handling filter pushdown classification and
+ /// wrapping with FilterExec/ProjectionExec as needed.
+ async fn plan_table_scan(
+ &self,
+ scan: &TableScan,
+ session_state: &SessionState,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ use datafusion_expr::TableProviderFilterPushDown;
+
+ let provider = source_as_provider(&scan.source)?;
+ let source_schema = scan.source.schema();
+
+ // Remove all qualifiers from filters as the provider doesn't know
+ // (nor should care) how the relation was referred to in the query
+ let filters: Vec<Expr> = unnormalize_cols(scan.filters.iter().cloned());
+
+ // Separate volatile filters (they should never be pushed down)
+ let (volatile_filters, non_volatile_filters): (Vec<Expr>, Vec<Expr>) = filters
+ .into_iter()
+ .partition(|pred: &Expr| pred.is_volatile());
+
+ // Classify filters using supports_filters_pushdown
+ let filter_refs: Vec<&Expr> = non_volatile_filters.iter().collect();
+ let supported = provider.supports_filters_pushdown(&filter_refs)?;
+
+ assert_eq_or_internal_err!(
+ non_volatile_filters.len(),
+ supported.len(),
+ "supports_filters_pushdown returned {} results for {} filters",
+ supported.len(),
+ non_volatile_filters.len()
+ );
+
+ // Separate filters into:
+ // - pushable_filters: Exact or Inexact filters to pass to the provider
+ // - post_scan_filters: Inexact, Unsupported, and volatile filters for FilterExec
+ let mut pushable_filters = Vec::new();
+ let mut post_scan_filters = Vec::new();
+
+ for (filter, support) in non_volatile_filters.into_iter().zip(supported.iter()) {
+ match support {
+ TableProviderFilterPushDown::Exact => {
+ pushable_filters.push(filter);
+ }
+ TableProviderFilterPushDown::Inexact => {
+ pushable_filters.push(filter.clone());
+ post_scan_filters.push(filter);
+ }
+ TableProviderFilterPushDown::Unsupported => {
+ post_scan_filters.push(filter);
+ }
+ }
+ }
+
+ // Add volatile filters to post_scan_filters
+ post_scan_filters.extend(volatile_filters);
+
+ // Compute required column indices for the scan
+ // We need columns from both projection expressions and post-scan filters
+ let scan_projection = self.compute_scan_projection(
+ &scan.projection,
+ &post_scan_filters,
+ &source_schema,
+ )?;
+
+ // Check if we have inexact filters - if so, we can't push limit
+ let has_inexact = supported.contains(&TableProviderFilterPushDown::Inexact);
+ let scan_limit = if has_inexact || !post_scan_filters.is_empty() {
+ None // Can't push limit when post-filtering is needed
+ } else {
+ scan.fetch
+ };
+
+ // Create the scan
+ let scan_args = ScanArgs::default()
+ .with_projection(scan_projection.as_deref())
+ .with_filters(if pushable_filters.is_empty() {
+ None
+ } else {
+ Some(&pushable_filters)
+ })
+ .with_limit(scan_limit);
+
+ let scan_result = provider.scan_with_args(session_state, scan_args).await?;
+ let mut plan: Arc<dyn ExecutionPlan> = Arc::clone(scan_result.plan());
+
+ // Create a DFSchema from the scan output for filter and projection creation
+ // The scan output schema is the physical plan's schema
+ let scan_output_schema = plan.schema();
+ let scan_df_schema = DFSchema::try_from(scan_output_schema.as_ref().clone())?;
+
+ // Wrap with FilterExec if needed
+ if !post_scan_filters.is_empty()
+ && let Some(filter_expr) = conjunction(post_scan_filters)
+ {
+ let num_scan_columns = scan_output_schema.fields().len();
+ plan = self.create_filter_exec(
+ &filter_expr,
+ plan,
+ &scan_df_schema,
+ session_state,
+ num_scan_columns,
+ )?;
+ }
+
+ // Wrap with ProjectionExec if projection is present and differs from scan output
+ // (either non-identity, or fewer columns due to filter-only columns)
+ if let Some(ref proj_exprs) = scan.projection {
+ let needs_projection = !self
+ .is_identity_column_projection(proj_exprs, &source_schema)
+ || scan_output_schema.fields().len() != proj_exprs.len();
+
+ if needs_projection {
+ // Unnormalize projection expressions to match the scan output schema
+ let unnormalized_proj_exprs =
+ unnormalize_cols(proj_exprs.iter().cloned());
+ plan = self.create_projection_exec(
+ &unnormalized_proj_exprs,
+ plan,
+ &scan_df_schema,
+ session_state,
+ )?;
+ }
+ }
+
+ // Apply limit if it wasn't pushed to scan
+ if let Some(fetch) = scan.fetch
+ && scan_limit.is_none()
+ {
+ plan = Arc::new(GlobalLimitExec::new(plan, 0, Some(fetch)));
+ }
+
+ Ok(plan)
+ }
+
+ /// Compute the column indices needed for the scan based on projection
+ /// expressions and post-scan filters.
+ fn compute_scan_projection(
+ &self,
+ projection: &Option<Vec<Expr>>,
+ post_filters: &[Expr],
+ source_schema: &Schema,
+ ) -> Result<Option<Vec<usize>>> {
+ // Collect all columns needed
+ let mut required_columns = HashSet::new();
+
+ // Add columns from projection expressions
+ if let Some(exprs) = projection {
+ for expr in exprs {
+ expr.apply(|e| {
+ if let Expr::Column(col) = e {
+ required_columns.insert(col.name().to_string());
+ }
+ Ok(TreeNodeRecursion::Continue)
+ })?;
+ }
+ }
+
+ // Add columns from post-scan filters
+ for filter in post_filters {
+ filter.apply(|e| {
+ if let Expr::Column(col) = e {
+ required_columns.insert(col.name().to_string());
+ }
+ Ok(TreeNodeRecursion::Continue)
+ })?;
+ }
+
+ // If no projection specified and no filters, return None (all columns)
+ if projection.is_none() && post_filters.is_empty() {
+ return Ok(None);
+ }
+
+ // If projection is None but we have filters, we need all columns
+ if projection.is_none() {
+ return Ok(None);
+ }
+
+ // Convert column names to indices
+ let indices: Vec<usize> = required_columns
+ .iter()
+ .filter_map(|name| source_schema.index_of(name).ok())
+ .sorted()
+ .collect();
+
+ if indices.is_empty() {
+ Ok(None)
+ } else {
+ Ok(Some(indices))
+ }
+ }
Review Comment:
When the required column set is empty (e.g. `SELECT 1 FROM wide_table`), this returns `None`, which makes the provider scan all columns. IMO that is the opposite of what we want; I would return `Some(vec![])`, or a single cheap column if empty projections aren't safe for every provider.
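Concretely, the ending could look like this (sketch against the code above, reusing `required_columns` and `source_schema`):

```rust
// Sketch: keep `Some(indices)` even when empty, so `SELECT 1 FROM
// wide_table` asks the provider for zero columns (row count only)
// rather than falling back to a full scan via `None`.
let mut indices: Vec<usize> = required_columns
    .iter()
    .filter_map(|name| source_schema.index_of(name).ok())
    .collect();
indices.sort_unstable();

// If a provider can't handle an empty projection, substitute a single
// cheap column here instead of returning `None`.
Ok(Some(indices))
```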
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]