jonathanc-n commented on code in PR #16450:
URL: https://github.com/apache/datafusion/pull/16450#discussion_r2155615159


##########
datafusion/core/src/physical_planner.rs:
##########
@@ -1009,95 +1012,99 @@ impl DefaultPhysicalPlanner {
                 let left_df_schema = left.schema();
                 let right_df_schema = right.schema();
                 let execution_props = session_state.execution_props();
-                let join_on = keys
-                    .iter()
-                    .map(|(l, r)| {
-                        let l = create_physical_expr(l, left_df_schema, 
execution_props)?;
-                        let r =
-                            create_physical_expr(r, right_df_schema, 
execution_props)?;
-                        Ok((l, r))
-                    })
-                    .collect::<Result<join_utils::JoinOn>>()?;
-
-                let join_filter = match filter {
-                    Some(expr) => {
-                        // Extract columns from filter expression and saved in 
a HashSet
-                        let cols = expr.column_refs();
 
-                        // Collect left & right field indices, the field 
indices are sorted in ascending order
-                        let left_field_indices = cols
-                            .iter()
-                            .filter_map(|c| 
left_df_schema.index_of_column(c).ok())
-                            .sorted()
-                            .collect::<Vec<_>>();
-                        let right_field_indices = cols
-                            .iter()
-                            .filter_map(|c| 
right_df_schema.index_of_column(c).ok())
-                            .sorted()
-                            .collect::<Vec<_>>();
+                // We declare a threshold here of 5 rows as NestedLoopJoins 
tend to perform better when one
+                // of the tables is small.
+                let threshold = session_state
+                    .config_options()
+                    .optimizer
+                    .nested_loop_equijoin_threshold;
+                let left_rows = *physical_left
+                    // We set the partition to None here to draw the num_rows 
from the plan
+                    .partition_statistics(None)?
+                    .num_rows
+                    .get_value()
+                    .unwrap()
+                    <= threshold;
+                let right_rows = *physical_right
+                    .partition_statistics(None)?
+                    .num_rows
+                    .get_value()
+                    .unwrap()
+                    <= threshold;
+                let use_nested_loop_join_equijoin = left_rows || right_rows;
+
+                // If we can use a nested loop join then `join_on` will be 
empty because
+                // the expressions are moved into the join filter.
+                let join_on: JoinOn = if use_nested_loop_join_equijoin {
+                    Vec::new()
+                } else {
+                    keys.iter()
+                        .map(|(l, r)| {
+                            let l =
+                                create_physical_expr(l, left_df_schema, 
execution_props)?;
+                            let r = create_physical_expr(
+                                r,
+                                right_df_schema,
+                                execution_props,
+                            )?;
+                            Ok((l, r))
+                        })
+                        .collect::<Result<_>>()?
+                };
 
-                        // Collect DFFields and Fields required for 
intermediate schemas
-                        let (filter_df_fields, filter_fields): (Vec<_>, 
Vec<_>) =
-                            left_field_indices
-                                .clone()
-                                .into_iter()
-                                .map(|i| {
-                                    (
-                                        left_df_schema.qualified_field(i),
-                                        
physical_left.schema().field(i).clone(),
-                                    )
+                // If we can use nested loop join then we will combine the 
expressions in `join_on`
+                // and pass it into the join filter; create your join filters 
normally otherwise.
+                let join_filter: Option<JoinFilter> = if 
use_nested_loop_join_equijoin {
+                    let combined_join_on_expression: Expr = filter

Review Comment:
   It seems that combining the `JoinOn` expressions here will cause an error 
when both sides of an expression have the same unqualified name, because that 
leads to duplicate unqualified fields. Is there a function that can qualify the 
names with the schema? I can't seem to find one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to