berkaysynnada commented on code in PR #14120:
URL: https://github.com/apache/datafusion/pull/14120#discussion_r1916044324


##########
datafusion/core/src/physical_optimizer/projection_pushdown.rs:
##########
@@ -144,7 +144,10 @@ pub fn remove_unnecessary_projections(
         } else if let Some(cross_join) = input.downcast_ref::<CrossJoinExec>() {
             try_swapping_with_cross_join(projection, cross_join)?
         } else if let Some(nl_join) = input.downcast_ref::<NestedLoopJoinExec>() {
-            try_swapping_with_nested_loop_join(projection, nl_join)?
+            try_pushdown_through_nested_loop_join(projection, nl_join)?.map_or_else(

Review Comment:
   I know the same pattern exists in the HashJoin part too, but would it take much effort to unify this "first try pushdown; if that is not possible, embed" approach into "embed and push down, whichever is possible"?
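A minimal sketch of the unified control flow, written against hypothetical closures rather than DataFusion's real functions (which return `Result<Option<...>>`; errors are omitted here). Pushdown is attempted first, embedding is then attempted on whatever plan results, and `None` means neither rewrite applied. Unlike a `map_or_else` fallback, both rewrites can fire in the same pass.

```rust
/// Hypothetical combinator: run `pushdown` first, then try `embed` on the
/// pushed-down plan (or on the original plan if pushdown did not apply).
/// Returns `None` only when neither rewrite changed anything.
fn pushdown_then_embed<P>(
    plan: &P,
    pushdown: impl Fn(&P) -> Option<P>,
    embed: impl Fn(&P) -> Option<P>,
) -> Option<P> {
    // Keep the pushed-down plan (if any) so we can fall back to it.
    let pushed = pushdown(plan);
    let base = pushed.as_ref().unwrap_or(plan);
    // Embedding runs on whatever plan survived the pushdown step;
    // if embedding is not possible, the pushdown result (if any) is kept.
    embed(base).or(pushed)
}
```

With string stand-ins for plans, a successful pushdown followed by a successful embed yields `embedded(pushed(nlj))`, while a failed pushdown still lets the embed apply to the original plan.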



##########
datafusion/core/src/physical_optimizer/projection_pushdown.rs:
##########
@@ -2402,6 +2413,73 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn test_nested_loop_join_after_projection() -> Result<()> {
+        let left_csv = create_simple_csv_exec();
+        let right_csv = create_simple_csv_exec();
+
+        let col_left_a = col("a", &left_csv.schema())?;
+        let col_right_b = col("b", &right_csv.schema())?;
+        let col_left_c = col("c", &left_csv.schema())?;
+        // left_a < right_b
+        let filter_expr =
+            binary(col_left_a, Operator::Lt, col_right_b, &Schema::empty())?;
+        let filter_column_indices = vec![
+            ColumnIndex {
+                index: 0,
+                side: JoinSide::Left,
+            },
+            ColumnIndex {
+                index: 1,
+                side: JoinSide::Right,
+            },
+            ColumnIndex {
+                index: 2,
+                side: JoinSide::Right,
+            },
+        ];
+        let filter_schema = Schema::new(vec![
+            Field::new("a", DataType::Int32, true),
+            Field::new("b", DataType::Int32, true),
+            Field::new("c", DataType::Int32, true),
+        ]);
+
+        let join: Arc<dyn ExecutionPlan> = Arc::new(NestedLoopJoinExec::try_new(
+            left_csv,
+            right_csv,
+            Some(JoinFilter::new(
+                filter_expr,
+                filter_column_indices,
+                filter_schema,
+            )),
+            &JoinType::Inner,
+            None,
+        )?);
+
+        let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
+            vec![(col_left_c, "c".to_string())],
+            Arc::clone(&join),
+        )?);
+        let initial = get_plan_string(&projection);
+        let expected_initial = [
+            "ProjectionExec: expr=[c@2 as c]",
+            "  NestedLoopJoinExec: join_type=Inner, filter=a@0 < b@1",
+            "    CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
+            "    CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
+        ];
+        assert_eq!(initial, expected_initial);
+
+        let after_optimize =
+            ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
+        let expected = [
+            "NestedLoopJoinExec: join_type=Inner, filter=a@0 < b@1, projection=[c@2]",
+            "  CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
+            "  CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",

Review Comment:
   A solid example of what I am looking for: these plans should project only a and c from the first CsvExec, and only b from the second CsvExec. Embedding the projection into CsvExec is already done; we just need to push the projection down below NestedLoopJoinExec.
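The column bookkeeping behind this example can be sketched with plain indices. This is a hypothetical model, not DataFusion's API: it assumes the join output is the left columns followed by the right ones (as for an inner join), and it only counts filter columns that the filter expression actually references, so the unused `(Right, 2)` entry in the test's `filter_column_indices` does not force `c` to be kept on the right side.

```rust
use std::collections::BTreeSet;

/// Which join input a filter column comes from (hypothetical stand-in for `JoinSide`).
#[derive(Clone, Copy, Debug, PartialEq)]
enum Side {
    Left,
    Right,
}

/// Returns the column indices the left and right inputs must still produce.
fn required_child_columns(
    projection: &[usize],                    // indices into the join output
    left_len: usize,                         // number of columns produced by the left input
    filter_refs: &[usize],                   // filter-schema columns read by the filter expression
    filter_column_indices: &[(Side, usize)], // filter-schema column -> (input side, input column)
) -> (BTreeSet<usize>, BTreeSet<usize>) {
    let mut left = BTreeSet::new();
    let mut right = BTreeSet::new();
    // Projected columns: join output is left columns first, then right columns.
    for &i in projection {
        if i < left_len {
            left.insert(i);
        } else {
            right.insert(i - left_len);
        }
    }
    // Filter columns: only those the expression actually reads.
    for &f in filter_refs {
        match filter_column_indices[f] {
            (Side::Left, i) => {
                left.insert(i);
            }
            (Side::Right, i) => {
                right.insert(i);
            }
        }
    }
    (left, right)
}
```

Calling it with `projection = [2]`, `left_len = 5`, `filter_refs = [0, 1]` and the three column indices from the test gives `{0, 2}` (a, c) for the left input and `{1}` (b) for the right input.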



##########
datafusion/core/src/physical_optimizer/projection_pushdown.rs:
##########
@@ -626,6 +635,63 @@ fn collect_column_indices(exprs: &[(Arc<dyn PhysicalExpr>, String)]) -> Vec<usiz
     indices
 }
 
+fn try_pushdown_through_nested_loop_join(

Review Comment:
   What I mean is: can we do whichever pushdown and embedding are possible at the same time, in this function?
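One way to do both steps in a single function, sketched with hypothetical types rather than DataFusion's: given the columns each input must keep, it returns the narrowed child projections (the pushdown) together with the join-level projection and filter column indices re-indexed against the narrowed output (the embedding). With the test above, the left input keeps `[0, 2]`, the right keeps `[1]`, and the embedded projection for `c` moves from index 2 to index 1. Returning `None` when a needed column was dropped is an assumption of this sketch.

```rust
use std::collections::BTreeSet;

#[derive(Clone, Copy, Debug, PartialEq)]
enum Side {
    Left,
    Right,
}

/// Result of a single pushdown-and-embed pass (hypothetical shape).
#[derive(Debug, PartialEq)]
struct Rewrite {
    left_projection: Vec<usize>,  // columns pushed down into the left input
    right_projection: Vec<usize>, // columns pushed down into the right input
    embedded_projection: Vec<usize>, // join-level projection over the narrowed output
    filter_column_indices: Vec<(Side, usize)>, // filter refs over the narrowed inputs
}

/// New position of `old` inside a narrowed input, or `None` if it was dropped.
fn new_index(kept: &[usize], old: usize) -> Option<usize> {
    kept.iter().position(|&c| c == old)
}

fn pushdown_and_embed(
    projection: &[usize], // indices into the original join output
    left_len: usize,      // column count of the original left input
    left_keep: &BTreeSet<usize>,
    right_keep: &BTreeSet<usize>,
    filter_column_indices: &[(Side, usize)], // only the columns the filter reads
) -> Option<Rewrite> {
    let left: Vec<usize> = left_keep.iter().copied().collect();
    let right: Vec<usize> = right_keep.iter().copied().collect();

    // Re-index the embedded projection: narrowed output is left columns, then right.
    let embedded = projection
        .iter()
        .map(|&i| {
            if i < left_len {
                new_index(&left, i)
            } else {
                new_index(&right, i - left_len).map(|j| left.len() + j)
            }
        })
        .collect::<Option<Vec<_>>>()?;

    // Re-index the filter's column references into the narrowed inputs.
    let filter = filter_column_indices
        .iter()
        .map(|&(side, i)| {
            let kept = if side == Side::Left { &left } else { &right };
            new_index(kept, i).map(|j| (side, j))
        })
        .collect::<Option<Vec<_>>>()?;

    Some(Rewrite {
        left_projection: left,
        right_projection: right,
        embedded_projection: embedded,
        filter_column_indices: filter,
    })
}
```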



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org