ethan-tyler commented on code in PR #20061:
URL: https://github.com/apache/datafusion/pull/20061#discussion_r2751941932


##########
datafusion/optimizer/src/push_down_filter.rs:
##########
@@ -1126,62 +1124,41 @@ impl OptimizerRule for PushDownFilter {
             }
             LogicalPlan::Join(join) => push_down_join(join, Some(&filter.predicate)),
             LogicalPlan::TableScan(scan) => {
+                // Move filters to the TableScan.
+                // Exclude scalar subqueries from being pushed down since those are essentially a
+                // fork in the logical plan tree that should not be processed by TableScan nodes.
                 let filter_predicates = split_conjunction(&filter.predicate);
 
-                let (volatile_filters, non_volatile_filters): (Vec<&Expr>, Vec<&Expr>) =
-                    filter_predicates
-                        .into_iter()
-                        .partition(|pred| pred.is_volatile());
-
-                // Check which non-volatile filters are supported by source
-                let supported_filters = scan
-                    .source
-                    .supports_filters_pushdown(non_volatile_filters.as_slice())?;
-                assert_eq_or_internal_err!(
-                    non_volatile_filters.len(),
-                    supported_filters.len(),
-                    "Vec returned length: {} from supports_filters_pushdown is 
not the same size as the filters passed, which length is: {}",
-                    supported_filters.len(),
-                    non_volatile_filters.len()
-                );
-
-                // Compose scan filters from non-volatile filters of `Exact` or `Inexact` pushdown type
-                let zip = non_volatile_filters.into_iter().zip(supported_filters);
+                // Partition predicates: scalar subqueries must stay in a Filter node
+                let (scalar_subquery_filters, pushable_filters): (Vec<_>, Vec<_>) =
+                    filter_predicates.iter().partition(|pred| {
+                        pred.exists(|e| Ok(matches!(e, Expr::ScalarSubquery(_))))
+                            .unwrap()
+                    });
 
-                let new_scan_filters = zip
-                    .clone()
-                    .filter(|(_, res)| res != &TableProviderFilterPushDown::Unsupported)
-                    .map(|(pred, _)| pred);
-
-                // Add new scan filters
+                // Combine existing scan filters with pushable filter predicates
                 let new_scan_filters: Vec<Expr> = scan
                     .filters
                     .iter()
-                    .chain(new_scan_filters)
+                    .chain(pushable_filters)
                     .unique()

Review Comment:
   This refactor changes what reaches `.unique()`. Volatile predicates can now land in `TableScan.filters`, and the dedup collapses repeated evaluations: `random() < 0.5 AND random() < 0.5` is not equivalent to `random() < 0.5`. I would either drop the dedup, or restrict it to `!expr.is_volatile()` and keep the volatile predicates in order.
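   
   A minimal sketch of the second option (`dedup_scan_filters` is an illustrative helper, not existing code):
   
   ```rust
   use datafusion_expr::Expr;
   use itertools::Itertools;
   
   // Dedup only the non-volatile predicates; volatile ones are appended
   // unchanged so each occurrence still evaluates independently.
   fn dedup_scan_filters(filters: Vec<Expr>) -> Vec<Expr> {
       let (volatile, stable): (Vec<Expr>, Vec<Expr>) =
           filters.into_iter().partition(|e| e.is_volatile());
       stable.into_iter().unique().chain(volatile).collect()
   }
   ```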



##########
datafusion/core/src/physical_planner.rs:
##########


Review Comment:
    In `UPDATE t1 SET ... FROM t2 WHERE t2.x = 1 AND t1.id = t2.id`, stripping qualifiers turns `t2.x` into `x`, which can accidentally match `t1.x`, and the join predicate degrades into `id = id`. I would only strip qualifiers from predicates that are provably evaluable against the target table schema alone, and keep the rest outside the provider call.
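    
    A hedged sketch of that check, using `expr_to_columns` from `datafusion_expr::utils` (`evaluable_on_target` and `target_schema` are illustrative names):
    
    ```rust
    use std::collections::HashSet;
    
    use datafusion_common::{Column, DFSchema, Result};
    use datafusion_expr::{utils::expr_to_columns, Expr};
    
    // A predicate is safe to unqualify only if every column it references,
    // with its original qualifier intact, resolves in the UPDATE target's schema.
    fn evaluable_on_target(pred: &Expr, target_schema: &DFSchema) -> Result<bool> {
        let mut cols: HashSet<Column> = HashSet::new();
        expr_to_columns(pred, &mut cols)?;
        Ok(cols.iter().all(|c| target_schema.has_column(c)))
    }
    ```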



##########
datafusion/core/src/physical_planner.rs:
##########


Review Comment:
   The same qualifier-stripping problem appears here: `extract_update_assignments` strips qualifiers from the RHS, so `SET b = t2.b` can become `b = b` and get treated as an identity assignment.
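   
   A sketch of the guard (`is_identity_assignment` is an illustrative helper): compare the qualified RHS column against the target column before any stripping.
   
   ```rust
   use datafusion_common::Column;
   use datafusion_expr::Expr;
   
   // Only a no-op when the RHS is the same *qualified* column as the target,
   // not merely a column that shares its unqualified name.
   fn is_identity_assignment(target: &Column, rhs: &Expr) -> bool {
       matches!(rhs, Expr::Column(c) if c == target)
   }
   ```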



##########
datafusion/substrait/src/logical_plan/producer/rel/read_rel.rs:
##########
@@ -97,47 +101,151 @@ pub fn from_table_scan(
     producer: &mut impl SubstraitProducer,
     scan: &TableScan,
 ) -> datafusion::common::Result<Box<Rel>> {
-    let projection = scan.projection.as_ref().map(|p| {
-        p.iter()
-            .map(|i| StructItem {
-                field: *i as i32,
-                child: None,
-            })
-            .collect()
-    });
+    let table_schema = scan.source.schema();
+    let table_schema_qualified = Arc::new(DFSchema::try_from_qualified_schema(
+        scan.table_name.clone(),
+        &table_schema,
+    )?);
 
-    let projection = projection.map(|struct_items| MaskExpression {
-        select: Some(StructSelect { struct_items }),
-        maintain_singular_struct: false,
-    });
-
-    let table_schema = scan.source.schema().to_dfschema_ref()?;
-    let base_schema = to_substrait_named_struct(producer, &table_schema)?;
+    // Check if projection contains only simple column references
+    let simple_projection_indices = scan
+        .projection
+        .as_ref()
+        .and_then(|exprs| projection_indices_from_exprs(exprs, &scan.source.schema()));
 
+    // Build the filter expression if any
     let filter_option = if scan.filters.is_empty() {
         None
     } else {
-        let table_schema_qualified = Arc::new(
-            DFSchema::try_from_qualified_schema(
-                scan.table_name.clone(),
-                &(scan.source.schema()),
-            )
-            .unwrap(),
-        );
-
         let combined_expr = conjunction(scan.filters.clone()).unwrap();
         let filter_expr =
             producer.handle_expr(&combined_expr, &table_schema_qualified)?;
         Some(Box::new(filter_expr))
     };
 
+    let base_schema = to_substrait_named_struct(
+        producer,
+        &Arc::clone(&table_schema).to_dfschema_ref()?,
+    )?;
+
+    // If projection is simple column references, use a mask on the ReadRel
+    if let Some(indices) = simple_projection_indices {
+        let projection = Some(MaskExpression {
+            select: Some(StructSelect {
+                struct_items: indices
+                    .iter()
+                    .map(|&i| StructItem {
+                        field: i as i32,
+                        child: None,
+                    })
+                    .collect(),
+            }),
+            maintain_singular_struct: false,
+        });
+
+        return Ok(Box::new(Rel {
+            rel_type: Some(RelType::Read(Box::new(ReadRel {
+                common: None,
+                base_schema: Some(base_schema),
+                filter: filter_option,
+                best_effort_filter: None,
+                projection,
+                advanced_extension: None,
+                read_type: Some(ReadType::NamedTable(NamedTable {
+                    names: scan.table_name.to_vec(),
+                    advanced_extension: None,
+                })),
+            }))),
+        }));
+    }
+
+    // Complex projection expressions - need to wrap ReadRel in ProjectRel
+    if let Some(proj_exprs) = &scan.projection {
+        // Find all columns needed by the projection expressions
+        let mut required_columns = HashSet::new();
+        for expr in proj_exprs {
+            expr_to_columns(expr, &mut required_columns)?;
+        }
+
+        // Convert column names to indices
+        let mut column_indices: Vec<usize> = required_columns
+            .iter()
+            .filter_map(|col| table_schema.index_of(col.name()).ok())
+            .collect();
+        column_indices.sort();
+        column_indices.dedup();

Review Comment:
   Field references are built against the full table DFSchema, but the ReadRel uses a column-subset mask, so if expression projections stay, the indices won't line up. This goes away if projections are restricted to column references.
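   
   If expression projections do stay, a remap along these lines would be needed (sketch; `column_indices` is the sorted, deduped vector built above):
   
   ```rust
   // Map a field index in the full table schema to its position within the
   // ReadRel's column mask; relies on the mask indices being sorted.
   fn mask_position(field_index: usize, column_indices: &[usize]) -> Option<usize> {
       column_indices.binary_search(&field_index).ok()
   }
   ```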



##########
datafusion/sql/src/unparser/utils.rs:
##########
@@ -385,17 +385,24 @@ pub(crate) fn try_transform_to_simple_table_scan_with_filters(
                     }
                 }
 
-                let mut builder = LogicalPlanBuilder::scan(
+                // Use TableScanBuilder to preserve full projection expressions
+                let mut scan_builder = TableScanBuilder::new(

Review Comment:
   This builder chain doesn't carry over `table_scan.fetch`; it needs a `.with_fetch(table_scan.fetch)`, or plans with a fetch pushed into the scan will lose the LIMIT in the generated SQL.
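   
   Sketch of the fix (the `with_fetch` setter is the one proposed above, hypothetical until the builder exposes it):
   
   ```rust
   // Carry the scan's fetch into the rebuilt scan so the unparser can
   // still emit the LIMIT clause.
   let scan_builder = scan_builder.with_fetch(table_scan.fetch);
   ```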



##########
datafusion/proto/src/logical_plan/mod.rs:
##########
@@ -1023,11 +1023,10 @@ impl AsLogicalPlan for LogicalPlanNode {
 
                 let projection = match projection {

Review Comment:
   Projections are serialized as column-name strings and deserialized by schema index lookup. Non-column expressions won't survive the roundtrip, and even column names with dots or quoting differences are fragile. I think the cleanest path is to enforce column-ref-only projections in the builder and fail fast on anything else. Extending the proto to `Vec<LogicalExprNode>` is the alternative, but a bigger lift.
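   
   A minimal sketch of the fail-fast check, assuming it lives in the builder (`check_column_ref_only` is an illustrative name):
   
   ```rust
   use datafusion_common::{plan_err, Result};
   use datafusion_expr::Expr;
   
   // Reject any projection expression that is not a bare column reference,
   // since the proto roundtrip only preserves column names.
   fn check_column_ref_only(projection: &[Expr]) -> Result<()> {
       for expr in projection {
           if !matches!(expr, Expr::Column(_)) {
               return plan_err!("TableScan projection must be a column reference, got {expr}");
           }
       }
       Ok(())
   }
   ```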



##########
datafusion/core/src/physical_planner.rs:
##########
@@ -1671,6 +1610,214 @@ impl DefaultPhysicalPlanner {
         Ok(exec_node)
     }
 
+    /// Plan a TableScan node, handling filter pushdown classification and
+    /// wrapping with FilterExec/ProjectionExec as needed.
+    async fn plan_table_scan(
+        &self,
+        scan: &TableScan,
+        session_state: &SessionState,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        use datafusion_expr::TableProviderFilterPushDown;
+
+        let provider = source_as_provider(&scan.source)?;
+        let source_schema = scan.source.schema();
+
+        // Remove all qualifiers from filters as the provider doesn't know
+        // (nor should care) how the relation was referred to in the query
+        let filters: Vec<Expr> = unnormalize_cols(scan.filters.iter().cloned());
+
+        // Separate volatile filters (they should never be pushed down)
+        let (volatile_filters, non_volatile_filters): (Vec<Expr>, Vec<Expr>) = filters
+            .into_iter()
+            .partition(|pred: &Expr| pred.is_volatile());
+
+        // Classify filters using supports_filters_pushdown
+        let filter_refs: Vec<&Expr> = non_volatile_filters.iter().collect();
+        let supported = provider.supports_filters_pushdown(&filter_refs)?;
+
+        assert_eq_or_internal_err!(
+            non_volatile_filters.len(),
+            supported.len(),
+            "supports_filters_pushdown returned {} results for {} filters",
+            supported.len(),
+            non_volatile_filters.len()
+        );
+
+        // Separate filters into:
+        // - pushable_filters: Exact or Inexact filters to pass to the provider
+        // - post_scan_filters: Inexact, Unsupported, and volatile filters for FilterExec
+        let mut pushable_filters = Vec::new();
+        let mut post_scan_filters = Vec::new();
+
+        for (filter, support) in non_volatile_filters.into_iter().zip(supported.iter()) {
+            match support {
+                TableProviderFilterPushDown::Exact => {
+                    pushable_filters.push(filter);
+                }
+                TableProviderFilterPushDown::Inexact => {
+                    pushable_filters.push(filter.clone());
+                    post_scan_filters.push(filter);
+                }
+                TableProviderFilterPushDown::Unsupported => {
+                    post_scan_filters.push(filter);
+                }
+            }
+        }
+
+        // Add volatile filters to post_scan_filters
+        post_scan_filters.extend(volatile_filters);
+
+        // Compute required column indices for the scan
+        // We need columns from both projection expressions and post-scan filters
+        let scan_projection = self.compute_scan_projection(
+            &scan.projection,
+            &post_scan_filters,
+            &source_schema,
+        )?;
+
+        // Check if we have inexact filters - if so, we can't push limit
+        let has_inexact = supported.contains(&TableProviderFilterPushDown::Inexact);
+        let scan_limit = if has_inexact || !post_scan_filters.is_empty() {
+            None // Can't push limit when post-filtering is needed
+        } else {
+            scan.fetch
+        };
+
+        // Create the scan
+        let scan_args = ScanArgs::default()
+            .with_projection(scan_projection.as_deref())
+            .with_filters(if pushable_filters.is_empty() {
+                None
+            } else {
+                Some(&pushable_filters)
+            })
+            .with_limit(scan_limit);
+
+        let scan_result = provider.scan_with_args(session_state, scan_args).await?;
+        let mut plan: Arc<dyn ExecutionPlan> = Arc::clone(scan_result.plan());
+
+        // Create a DFSchema from the scan output for filter and projection creation
+        // The scan output schema is the physical plan's schema
+        let scan_output_schema = plan.schema();
+        let scan_df_schema = DFSchema::try_from(scan_output_schema.as_ref().clone())?;
+
+        // Wrap with FilterExec if needed
+        if !post_scan_filters.is_empty()
+            && let Some(filter_expr) = conjunction(post_scan_filters)
+        {
+            let num_scan_columns = scan_output_schema.fields().len();
+            plan = self.create_filter_exec(
+                &filter_expr,
+                plan,
+                &scan_df_schema,
+                session_state,
+                num_scan_columns,
+            )?;
+        }
+
+        // Wrap with ProjectionExec if projection is present and differs from scan output
+        // (either non-identity, or fewer columns due to filter-only columns)
+        if let Some(ref proj_exprs) = scan.projection {
+            let needs_projection = !self
+                .is_identity_column_projection(proj_exprs, &source_schema)
+                || scan_output_schema.fields().len() != proj_exprs.len();
+
+            if needs_projection {
+                // Unnormalize projection expressions to match the scan output schema
+                let unnormalized_proj_exprs =
+                    unnormalize_cols(proj_exprs.iter().cloned());
+                plan = self.create_projection_exec(
+                    &unnormalized_proj_exprs,
+                    plan,
+                    &scan_df_schema,
+                    session_state,
+                )?;
+            }
+        }
+
+        // Apply limit if it wasn't pushed to scan
+        if let Some(fetch) = scan.fetch
+            && scan_limit.is_none()
+        {
+            plan = Arc::new(GlobalLimitExec::new(plan, 0, Some(fetch)));
+        }
+
+        Ok(plan)
+    }
+
+    /// Compute the column indices needed for the scan based on projection
+    /// expressions and post-scan filters.
+    fn compute_scan_projection(
+        &self,
+        projection: &Option<Vec<Expr>>,
+        post_filters: &[Expr],
+        source_schema: &Schema,
+    ) -> Result<Option<Vec<usize>>> {
+        // Collect all columns needed
+        let mut required_columns = HashSet::new();
+
+        // Add columns from projection expressions
+        if let Some(exprs) = projection {
+            for expr in exprs {
+                expr.apply(|e| {
+                    if let Expr::Column(col) = e {
+                        required_columns.insert(col.name().to_string());
+                    }
+                    Ok(TreeNodeRecursion::Continue)
+                })?;
+            }
+        }
+
+        // Add columns from post-scan filters
+        for filter in post_filters {
+            filter.apply(|e| {
+                if let Expr::Column(col) = e {
+                    required_columns.insert(col.name().to_string());
+                }
+                Ok(TreeNodeRecursion::Continue)
+            })?;
+        }
+
+        // If no projection specified and no filters, return None (all columns)
+        if projection.is_none() && post_filters.is_empty() {
+            return Ok(None);
+        }
+
+        // If projection is None but we have filters, we need all columns
+        if projection.is_none() {
+            return Ok(None);
+        }
+
+        // Convert column names to indices
+        let indices: Vec<usize> = required_columns
+            .iter()
+            .filter_map(|name| source_schema.index_of(name).ok())
+            .sorted()
+            .collect();
+
+        if indices.is_empty() {
+            Ok(None)
+        } else {
+            Ok(Some(indices))
+        }
+    }

Review Comment:
   When the required column set is empty (e.g. `SELECT 1 FROM wide_table`), this returns `None`, which makes the scan read all columns. IMO that is the opposite of what we want; I would return `Some(vec![])`, or a single cheap column if empty projections aren't provider-safe.
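   
   Roughly what I have in mind (sketch; `finalize_scan_projection` is an illustrative name):
   
   ```rust
   // Keep an empty-but-present projection as `Some(vec![])` so a query like
   // `SELECT 1 FROM wide_table` does not fall back to scanning every column.
   fn finalize_scan_projection(
       projection_present: bool,
       indices: Vec<usize>,
   ) -> Option<Vec<usize>> {
       if projection_present {
           // May be empty; substitute one cheap column here if the provider
           // cannot handle a zero-column scan.
           Some(indices)
       } else {
           None // no projection at all: scan all columns
       }
   }
   ```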


