berkaysynnada commented on code in PR #14120:
URL: https://github.com/apache/datafusion/pull/14120#discussion_r1916044324
##########
datafusion/core/src/physical_optimizer/projection_pushdown.rs:
##########
@@ -144,7 +144,10 @@ pub fn remove_unnecessary_projections(
             } else if let Some(cross_join) = input.downcast_ref::<CrossJoinExec>() {
                 try_swapping_with_cross_join(projection, cross_join)?
             } else if let Some(nl_join) = input.downcast_ref::<NestedLoopJoinExec>() {
-                try_swapping_with_nested_loop_join(projection, nl_join)?
+                try_pushdown_through_nested_loop_join(projection, nl_join)?.map_or_else(

Review Comment:
   I know the same pattern exists in the HashJoin part too, but would it take much effort to unify this "first try pushdown; if not possible, then embed" approach? Something like "embed and push down", whichever is possible.

##########
datafusion/core/src/physical_optimizer/projection_pushdown.rs:
##########
@@ -2402,6 +2413,73 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn test_nested_loop_join_after_projection() -> Result<()> {
+        let left_csv = create_simple_csv_exec();
+        let right_csv = create_simple_csv_exec();
+
+        let col_left_a = col("a", &left_csv.schema())?;
+        let col_right_b = col("b", &right_csv.schema())?;
+        let col_left_c = col("c", &left_csv.schema())?;
+        // left_a < right_b
+        let filter_expr =
+            binary(col_left_a, Operator::Lt, col_right_b, &Schema::empty())?;
+        let filter_column_indices = vec![
+            ColumnIndex {
+                index: 0,
+                side: JoinSide::Left,
+            },
+            ColumnIndex {
+                index: 1,
+                side: JoinSide::Right,
+            },
+            ColumnIndex {
+                index: 2,
+                side: JoinSide::Right,
+            },
+        ];
+        let filter_schema = Schema::new(vec![
+            Field::new("a", DataType::Int32, true),
+            Field::new("b", DataType::Int32, true),
+            Field::new("c", DataType::Int32, true),
+        ]);
+
+        let join: Arc<dyn ExecutionPlan> = Arc::new(NestedLoopJoinExec::try_new(
+            left_csv,
+            right_csv,
+            Some(JoinFilter::new(
+                filter_expr,
+                filter_column_indices,
+                filter_schema,
+            )),
+            &JoinType::Inner,
+            None,
+        )?);
+
+        let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
+            vec![(col_left_c, "c".to_string())],
+            Arc::clone(&join),
+        )?);
+        let initial = get_plan_string(&projection);
+        let expected_initial = [
+            "ProjectionExec: expr=[c@2 as c]",
+            "  NestedLoopJoinExec: join_type=Inner, filter=a@0 < b@1",
+            "    CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
+            "    CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
+        ];
+        assert_eq!(initial, expected_initial);
+
+        let after_optimize =
+            ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
+        let expected = [
+            "NestedLoopJoinExec: join_type=Inner, filter=a@0 < b@1, projection=[c@2]",
+            "  CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
+            "  CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",

Review Comment:
   A concrete example of what I am looking for: in these plans, the first CsvExec should project only a and c, and the second CsvExec only b. Embedding a projection into CsvExec is already handled; we just need to push the projection down below NestedLoopJoinExec.

##########
datafusion/core/src/physical_optimizer/projection_pushdown.rs:
##########
@@ -626,6 +635,63 @@ fn collect_column_indices(exprs: &[(Arc<dyn PhysicalExpr>, String)]) -> Vec<usiz
     indices
 }
 
+fn try_pushdown_through_nested_loop_join(

Review Comment:
   What I mean is: can we do the possible pushdown and the embedding at the same time in this function?
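To make the "push down and embed in one step" idea concrete, here is a minimal, self-contained sketch of the index bookkeeping such a step would need. It deliberately does not use DataFusion's real types: `Side`, `FilterCol`, `PushdownPlan`, `plan_pushdown`, and `new_index` are hypothetical stand-ins, and the sketch assumes an Inner join whose output schema is all left columns followed by all right columns, with columns referenced by plain indices. It collects the columns each child must keep (those the projection needs plus those the filter expression actually references), so narrowed projections can be pushed below NestedLoopJoinExec, and it remaps the join's own projection and filter columns onto the narrowed children, which is the part that would stay embedded in the join.

```rust
use std::collections::BTreeSet;

/// Hypothetical stand-in for a join side (not DataFusion's `JoinSide`).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Side {
    Left,
    Right,
}

/// Hypothetical stand-in for a filter column reference; `index` is the
/// column position within the child on `side`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct FilterCol {
    pub index: usize,
    pub side: Side,
}

/// Result of the combined step (hypothetical).
#[derive(Debug, PartialEq, Eq)]
pub struct PushdownPlan {
    /// Columns kept from the left child (the pushed-down projection).
    pub left_keep: Vec<usize>,
    /// Columns kept from the right child (the pushed-down projection).
    pub right_keep: Vec<usize>,
    /// Projection embedded in the join, as indices into left_keep ++ right_keep.
    pub join_projection: Vec<usize>,
    /// Used filter columns remapped onto the narrowed children.
    pub filter_cols: Vec<FilterCol>,
}

/// New position of child column `i` among the kept columns of that child.
fn new_index(keep: &[usize], i: usize) -> usize {
    keep.iter()
        .position(|&k| k == i)
        .expect("every referenced column was added to the keep list")
}

/// Assumes an Inner join: output = all left columns, then all right columns.
pub fn plan_pushdown(
    left_width: usize,
    projection: &[usize],
    filter_cols: &[FilterCol],
    used_filter_cols: &[usize],
) -> PushdownPlan {
    // BTreeSet keeps each side's required columns sorted and deduplicated.
    let mut left = BTreeSet::new();
    let mut right = BTreeSet::new();
    for &p in projection {
        if p < left_width {
            left.insert(p);
        } else {
            right.insert(p - left_width);
        }
    }
    // Only filter columns referenced by the filter expression are required.
    for &i in used_filter_cols {
        let c = filter_cols[i];
        match c.side {
            Side::Left => left.insert(c.index),
            Side::Right => right.insert(c.index),
        };
    }
    let left_keep: Vec<usize> = left.into_iter().collect();
    let right_keep: Vec<usize> = right.into_iter().collect();

    // Remap the projection onto the narrowed join output.
    let join_projection: Vec<usize> = projection
        .iter()
        .map(|&p| {
            if p < left_width {
                new_index(&left_keep, p)
            } else {
                left_keep.len() + new_index(&right_keep, p - left_width)
            }
        })
        .collect();

    // Remap the used filter columns; unused ones are dropped.
    let new_filter_cols: Vec<FilterCol> = used_filter_cols
        .iter()
        .map(|&i| {
            let c = filter_cols[i];
            let keep = match c.side {
                Side::Left => &left_keep,
                Side::Right => &right_keep,
            };
            FilterCol { index: new_index(keep, c.index), side: c.side }
        })
        .collect();

    PushdownPlan { left_keep, right_keep, join_projection, filter_cols: new_filter_cols }
}
```

Fed the values from the test above (left width 5, projection `c@2`, filter columns `[Left 0, Right 1, Right 2]`, of which only the first two are referenced by `a@0 < b@1`), the sketch keeps `[0, 2]` (a, c) on the left and `[1]` (b) on the right, and the projection embedded in the join becomes index `1`, matching the plans the review asks for. Rewriting the filter expression itself when unused filter columns are dropped, and handling non-Inner join types, are outside this sketch.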