jonathanc-n commented on code in PR #16450: URL: https://github.com/apache/datafusion/pull/16450#discussion_r2156969096
########## datafusion/core/src/physical_planner.rs: ########## @@ -1009,95 +1012,99 @@ impl DefaultPhysicalPlanner { let left_df_schema = left.schema(); let right_df_schema = right.schema(); let execution_props = session_state.execution_props(); - let join_on = keys - .iter() - .map(|(l, r)| { - let l = create_physical_expr(l, left_df_schema, execution_props)?; - let r = - create_physical_expr(r, right_df_schema, execution_props)?; - Ok((l, r)) - }) - .collect::<Result<join_utils::JoinOn>>()?; - - let join_filter = match filter { - Some(expr) => { - // Extract columns from filter expression and saved in a HashSet - let cols = expr.column_refs(); - // Collect left & right field indices, the field indices are sorted in ascending order - let left_field_indices = cols - .iter() - .filter_map(|c| left_df_schema.index_of_column(c).ok()) - .sorted() - .collect::<Vec<_>>(); - let right_field_indices = cols - .iter() - .filter_map(|c| right_df_schema.index_of_column(c).ok()) - .sorted() - .collect::<Vec<_>>(); + // We declare a threshold here of 5 rows as NestedLoopJoins tend to better when one + // of the tables are small. + let threshold = session_state + .config_options() + .optimizer + .nested_loop_equijoin_threshold; + let left_rows = *physical_left + // We set the partition to None here to draw the num_rows from the plan + .partition_statistics(None)? + .num_rows + .get_value() + .unwrap() + <= threshold; + let right_rows = *physical_right + .partition_statistics(None)? + .num_rows + .get_value() + .unwrap() + <= threshold; + let use_nested_loop_join_equijoin = left_rows || right_rows; + + // If we can use a nested loop join then `join_on` will be empty because + // the expressions are moved into the join filter. 
+ let join_on: JoinOn = if use_nested_loop_join_equijoin { + Vec::new() + } else { + keys.iter() + .map(|(l, r)| { + let l = + create_physical_expr(l, left_df_schema, execution_props)?; + let r = create_physical_expr( + r, + right_df_schema, + execution_props, + )?; + Ok((l, r)) + }) + .collect::<Result<_>>()? + }; - // Collect DFFields and Fields required for intermediate schemas - let (filter_df_fields, filter_fields): (Vec<_>, Vec<_>) = - left_field_indices - .clone() - .into_iter() - .map(|i| { - ( - left_df_schema.qualified_field(i), - physical_left.schema().field(i).clone(), - ) + // If we can use nested loop join then we will combine the expressions in `join_on` + // and pass it into the join filter; create your join filters normally otherwise. + let join_filter: Option<JoinFilter> = if use_nested_loop_join_equijoin { + let combined_join_on_expression: Expr = filter Review Comment: To clarify, this was not originally a problem when the expression was in `JoinOn`, because each unqualified column refers to its own table until the expressions are combined into one. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org