Jefffrey commented on code in PR #16696:
URL: https://github.com/apache/datafusion/pull/16696#discussion_r2387651626


##########
datafusion/optimizer/src/optimize_projections/mod.rs:
##########
@@ -338,12 +338,36 @@ fn optimize_projections(
                 .collect::<Result<Vec<_>>>()?
         }
         LogicalPlan::EmptyRelation(_)
-        | LogicalPlan::RecursiveQuery(_)
         | LogicalPlan::Values(_)
         | LogicalPlan::DescribeTable(_) => {
             // These operators have no inputs, so stop the optimization process.
             return Ok(Transformed::no(plan));
         }
+        LogicalPlan::RecursiveQuery(_) => {
+            // Only allow subqueries that reference the current CTE.
+            // https://github.com/apache/datafusion/pull/16696#discussion_r2246415968
+            if let LogicalPlan::RecursiveQuery(recursive) = &plan {
+                if plan_contains_non_cte_subquery(
+                    recursive.static_term.as_ref(),
+                    &recursive.name,
+                ) || plan_contains_non_cte_subquery(
+                    recursive.recursive_term.as_ref(),
+                    &recursive.name,
+                ) {
+                    return Ok(Transformed::no(plan));
+                }
+            }

Review Comment:
   ```suggestion
           LogicalPlan::RecursiveQuery(recursive) => {
               // Only allow subqueries that reference the current CTE; nested subqueries are not yet
               // supported for projection pushdown for simplicity.
               // TODO: be able to do projection pushdown on recursive CTEs with subqueries
               if plan_contains_non_cte_subquery(
                   recursive.static_term.as_ref(),
                   &recursive.name,
               ) || plan_contains_non_cte_subquery(
                   recursive.recursive_term.as_ref(),
                   &recursive.name,
               ) {
                   return Ok(Transformed::no(plan));
               }
   ```
   
   Simplification; I also updated the comment, assuming I understood the discussion correctly.



##########
datafusion/optimizer/tests/optimizer_integration.rs:
##########
@@ -46,6 +46,42 @@ fn init() {
     let _ = env_logger::try_init();
 }
 
+#[test]
+fn recursive_query_column_pruning() -> Result<()> {

Review Comment:
   What is this test checking? Is it just a generic test, independent of the fix introduced by this PR? It seems to pass even without the fix applied.



##########
datafusion/optimizer/src/optimize_projections/mod.rs:
##########
@@ -819,6 +843,45 @@ pub fn is_projection_unnecessary(
     ))
 }
 
+fn plan_contains_non_cte_subquery(plan: &LogicalPlan, cte_name: &str) -> bool {

Review Comment:
   I think this name is confusing; I believe it still rejects subqueries that aren't CTEs.
   
   Would a name like `plan_contains_other_subqueries()` be more accurate, or am I misunderstanding the purpose?



##########
datafusion/optimizer/src/optimize_projections/mod.rs:
##########
@@ -819,6 +843,45 @@ pub fn is_projection_unnecessary(
     ))
 }
 
+fn plan_contains_non_cte_subquery(plan: &LogicalPlan, cte_name: &str) -> bool {
+    if let LogicalPlan::SubqueryAlias(alias) = plan {
+        if alias.alias.table() != cte_name {
+            return true;
+        }
+    }
+
+    let mut found = false;
+    plan.apply_expressions(|expr| {
+        if expr_contains_subquery(expr) {
+            found = true;
+            Ok(TreeNodeRecursion::Stop)
+        } else {
+            Ok(TreeNodeRecursion::Continue)
+        }
+    })
+    .expect("expression traversal never fails");
+    if found {
+        return true;
+    }
+
+    plan.inputs()
+        .into_iter()
+        .any(|child| plan_contains_non_cte_subquery(child, cte_name))
+}
+
+fn expr_contains_subquery(expr: &Expr) -> bool {
+    let mut contains = false;
+    expr.apply(|e| match e {
+        Expr::ScalarSubquery(_) | Expr::Exists(_) | Expr::InSubquery(_) => {
+            contains = true;
+            Ok(TreeNodeRecursion::Stop)
+        }
+        _ => Ok(TreeNodeRecursion::Continue),
+    })
+    .expect("expression traversal never fails");
+    contains
+}

Review Comment:
   ```suggestion
   fn expr_contains_subquery(expr: &Expr) -> bool {
       expr.exists(|e| match e {
           Expr::ScalarSubquery(_) | Expr::Exists(_) | Expr::InSubquery(_) => Ok(true),
           _ => Ok(false),
       })
       // Safe unwrap since we are doing a simple boolean check
       .unwrap()
   }
   ```
   
   Simplification



##########
datafusion/optimizer/tests/optimizer_integration.rs:
##########
@@ -478,6 +514,90 @@ fn select_correlated_predicate_subquery_with_uppercase_ident() {
     );
 }
 
+#[test]
+fn recursive_cte_projection_pushdown() -> Result<()> {
+    // Test that projection pushdown works with recursive CTEs by ensuring
+    // only the required columns are projected from the base table, even when
+    // the CTE definition includes unused columns
+    let sql = "WITH RECURSIVE nodes AS (\
+        SELECT col_int32 AS id, col_utf8 AS name, col_uint32 AS extra FROM test \
+        UNION ALL \
+        SELECT id + 1, name, extra FROM nodes WHERE id < 3\
+    ) SELECT id FROM nodes";
+    let plan = test_sql(sql)?;
+
+    // The optimizer successfully performs projection pushdown by only selecting the needed
+    // columns from the base table and recursive table, eliminating unused columns
+    assert_snapshot!(
+        format!("{plan}"),
+        @r#"SubqueryAlias: nodes
+  RecursiveQuery: is_distinct=false
+    Projection: test.col_int32 AS id
+      TableScan: test projection=[col_int32]
+    Projection: CAST(CAST(nodes.id AS Int64) + Int64(1) AS Int32)
+      Filter: nodes.id < Int32(3)
+        TableScan: nodes projection=[id]
+"#
+    );
+    Ok(())
+}
+
+#[test]
+fn recursive_cte_with_unused_columns() -> Result<()> {

Review Comment:
   Could we add a test with a recursive CTE containing a nested subquery, which disables the projection pushdown logic (if we don't already have one)?



##########
datafusion/optimizer/tests/optimizer_integration.rs:
##########
@@ -478,6 +514,90 @@ fn select_correlated_predicate_subquery_with_uppercase_ident() {
     );
 }
 
+#[test]
+fn recursive_cte_projection_pushdown() -> Result<()> {
+    // Test that projection pushdown works with recursive CTEs by ensuring
+    // only the required columns are projected from the base table, even when
+    // the CTE definition includes unused columns
+    let sql = "WITH RECURSIVE nodes AS (\
+        SELECT col_int32 AS id, col_utf8 AS name, col_uint32 AS extra FROM test \
+        UNION ALL \
+        SELECT id + 1, name, extra FROM nodes WHERE id < 3\
+    ) SELECT id FROM nodes";
+    let plan = test_sql(sql)?;
+
+    // The optimizer successfully performs projection pushdown by only selecting the needed
+    // columns from the base table and recursive table, eliminating unused columns
+    assert_snapshot!(
+        format!("{plan}"),
+        @r#"SubqueryAlias: nodes
+  RecursiveQuery: is_distinct=false
+    Projection: test.col_int32 AS id
+      TableScan: test projection=[col_int32]
+    Projection: CAST(CAST(nodes.id AS Int64) + Int64(1) AS Int32)
+      Filter: nodes.id < Int32(3)
+        TableScan: nodes projection=[id]
+"#
+    );
+    Ok(())
+}
+
+#[test]
+fn recursive_cte_with_unused_columns() -> Result<()> {
+    // Test projection pushdown with a recursive CTE where the base case
+    // includes columns that are never used in the recursive part or final result
+    let sql = "WITH RECURSIVE series AS (\
+        SELECT 1 AS n, col_utf8, col_uint32, col_date32 FROM test WHERE col_int32 = 1 \
+        UNION ALL \
+        SELECT n + 1, col_utf8, col_uint32, col_date32 FROM series WHERE n < 3\
+    ) SELECT n FROM series";
+    let plan = test_sql(sql)?;
+
+    // The optimizer successfully performs projection pushdown by eliminating unused columns
+    // even when they're defined in the CTE but not actually needed
+    assert_snapshot!(
+        format!("{plan}"),
+        @r#"SubqueryAlias: series
+  RecursiveQuery: is_distinct=false
+    Projection: Int64(1) AS n
+      Filter: test.col_int32 = Int32(1)
+        TableScan: test projection=[col_int32]
+    Projection: series.n + Int64(1)
+      Filter: series.n < Int64(3)
+        TableScan: series projection=[n]
+"#
+    );
+    Ok(())
+}
+
+#[test]
+fn recursive_cte_true_projection_pushdown() -> Result<()> {
+    // Test case that truly demonstrates projection pushdown working:
+    // The base case only selects needed columns

Review Comment:
   Could you help me understand what makes this test case special compared to the two above it, such that it is described as demonstrating "true" projection pushdown?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to