gabotechs commented on code in PR #16454:
URL: https://github.com/apache/datafusion/pull/16454#discussion_r2182236680
##########
datafusion/expr/src/logical_plan/builder.rs:
##########
@@ -1667,6 +1667,34 @@ pub fn build_join_schema(
     dfschema.with_functional_dependencies(func_dependencies)
 }
+/// (Re)qualify the sides of a join if needed, i.e. if the columns from one side would otherwise
+/// conflict with the columns from the other.
+/// This is especially useful for queries that come as Substrait, since Substrait doesn't currently allow specifying
+/// aliases, neither for columns nor for tables. DataFusion requires columns to be uniquely identifiable, in some
+/// places (see e.g. DFSchema::check_names).
+pub fn requalify_sides_if_needed(

Review Comment:
   How about extending the doc comment with something like:
   ```
   The function returns:
   - The requalified or original left logical plan
   - The requalified or original right logical plan
   - If a requalification was needed or not
   ```

##########
datafusion/core/src/physical_planner.rs:
##########
@@ -1502,6 +1521,64 @@ fn get_null_physical_expr_pair(
     Ok((Arc::new(null_value), physical_name))
 }
+/// Qualifies the fields in a join schema with "left" and "right" qualifiers
+/// without mutating the original schema. This function should only be used when
+/// the join inputs have already been requalified earlier in `try_new_with_project_input`.
+///
+/// The purpose is to avoid ambiguity errors later in planning (e.g., in nullability or data type resolution)
+/// when converting expressions to fields.
+fn qualify_join_schema_sides(
+    join_schema: &DFSchema,
+    left: &LogicalPlan,
+    right: &LogicalPlan,
+) -> Result<DFSchema> {
+    let left_fields = left.schema().fields();
+    let right_fields = right.schema().fields();
+    let join_fields = join_schema.fields();
+
+    // Validate lengths
+    if join_fields.len() != left_fields.len() + right_fields.len() {
+        return internal_err!(
+            "Join schema field count must match left and right field count."
+        );
+    }
+    // Validate field names match
+    for (i, (field, expected)) in join_fields
+        .iter()
+        .zip(left_fields.iter().chain(right_fields.iter()))
+        .enumerate()
+    {
+        if field.name() != expected.name() {
+            return internal_err!(
+                "Field name mismatch at index {}: expected '{}', found '{}'",
+                i,
+                expected.name(),
+                field.name()
+            );
+        }
+    }

Review Comment:
   Do you think this validation is necessary here? IIUC, these are the original schema and sides of the join in DF's logical plan, so I'd expect them to be correct.

   If you think it's better to still do this validation here, then let's keep it 👍

##########
datafusion/common/src/dfschema.rs:
##########
@@ -206,6 +206,24 @@ impl DFSchema {
         Ok(dfschema)
     }
+    /// Return the same schema, where all fields have a given qualifier.
+    pub fn with_field_specific_qualified_schema(
+        &self,
+        qualifiers: Vec<Option<TableReference>>,
+    ) -> Result<Self> {
+        if qualifiers.len() != self.fields().len() {
+            return _plan_err!(
+                "{}",
+                "Number of qualifiers must match number of fields".to_string()
+            );

Review Comment:
   A bit weird how this is getting formatted, isn't it? How about:
   ```rust
   return _plan_err!(
       "Number of qualifiers must match number of fields. Expected {}, got {}",
       self.fields().len(),
       qualifiers.len()
   );
   ```

##########
datafusion/core/src/physical_planner.rs:
##########
@@ -1502,6 +1521,64 @@ fn get_null_physical_expr_pair(
     Ok((Arc::new(null_value), physical_name))
 }
+/// Qualifies the fields in a join schema with "left" and "right" qualifiers
+/// without mutating the original schema. This function should only be used when
+/// the join inputs have already been requalified earlier in `try_new_with_project_input`.
+///
+/// The purpose is to avoid ambiguity errors later in planning (e.g., in nullability or data type resolution)
+/// when converting expressions to fields.
+fn qualify_join_schema_sides(
+    join_schema: &DFSchema,
+    left: &LogicalPlan,
+    right: &LogicalPlan,
+) -> Result<DFSchema> {
+    let left_fields = left.schema().fields();
+    let right_fields = right.schema().fields();
+    let join_fields = join_schema.fields();
+
+    // Validate lengths
+    if join_fields.len() != left_fields.len() + right_fields.len() {
+        return internal_err!(
+            "Join schema field count must match left and right field count."
+        );
+    }
+
+    // Validate field names match
+    for (i, (field, expected)) in join_fields
+        .iter()
+        .zip(left_fields.iter().chain(right_fields.iter()))
+        .enumerate()
+    {
+        if field.name() != expected.name() {
+            return internal_err!(
+                "Field name mismatch at index {}: expected '{}', found '{}'",
+                i,
+                expected.name(),
+                field.name()
+            );
+        }
+    }
+
+    // qualify sides
+    let qualifiers = join_fields
+        .iter()
+        .enumerate()
+        .map(|(i, _)| {
+            if i < left_fields.len() {
+                Some(TableReference::Bare {
+                    table: Arc::from("left"),
+                })
+            } else {
+                Some(TableReference::Bare {
+                    table: Arc::from("right"),
+                })

Review Comment:
   🤔 Having the hardcoded "left" and "right" strings here introduces a "stringly-typed dependency" with `requalify_sides_if_needed`, where the "left" and "right" qualifiers are also hardcoded. Not a very big deal, though.

##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -3795,37 +3796,58 @@ impl Join {
         })
     }
-    /// Create Join with input which wrapped with projection, this method is used to help create physical join.
+    /// Create Join with input which wrapped with projection, this method is used in physcial planning only to help

Review Comment:
   Now that this returns a tuple, how about adding a comment about what each entry in the tuple is supposed to represent?

##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -3795,37 +3796,58 @@ impl Join {
         })
     }
-    /// Create Join with input which wrapped with projection, this method is used to help create physical join.
+    /// Create Join with input which wrapped with projection, this method is used in physcial planning only to help
+    /// create the physical join.
     pub fn try_new_with_project_input(
         original: &LogicalPlan,
         left: Arc<LogicalPlan>,
         right: Arc<LogicalPlan>,
         column_on: (Vec<Column>, Vec<Column>),
-    ) -> Result<Self> {
+    ) -> Result<(Self, bool)> {
         let original_join = match original {
             LogicalPlan::Join(join) => join,
             _ => return plan_err!("Could not create join with project input"),
         };
+        let mut left_sch = LogicalPlanBuilder::from(Arc::clone(&left));
+        let mut right_sch = LogicalPlanBuilder::from(Arc::clone(&right));
+
+        let mut requalified = false;
+
+        // By definition, the resulting schema of an inner join will have first the left side fields and then the right,
+        // potentially having duplicate field names. Note this will only qualify fields if they have not been qualified before.
+        // TODO: handle left and right joins as well.
+        if original_join.join_type == JoinType::Inner {

Review Comment:
   🤔 I imagine that for the same reason that we might need to re-qualify the sides of an INNER join, we might also need to do the same for any other join that returns columns from both sides, right?

   For example, I can just go to the test Substrait file you committed, replace `JOIN_TYPE_INNER` by `JOIN_TYPE_LEFT`, and the following error appears while running the test:
   ```
   Error: SchemaError(DuplicateUnqualifiedField { name: "upper(host)" }, Some(""))
   ```

##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -3795,37 +3796,58 @@ impl Join {
         })
     }
-    /// Create Join with input which wrapped with projection, this method is used to help create physical join.
+    /// Create Join with input which wrapped with projection, this method is used in physcial planning only to help

Review Comment:
   ```suggestion
       /// Create Join with input which wrapped with projection, this method is used in physical planning only to help
   ```

-- 
This is an automated message from the Apache Git Service.
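This is a minimal sketch of the idea behind `requalify_sides_if_needed`. It is written with the return-value documentation suggested in the first review comment above. The sketch is self-contained and hypothetical:

- A schema is modeled as a list of `(qualifier, name)` pairs rather than a `LogicalPlan`.
- `requalify_field_lists`, `conflicts` and `with_qualifier` are illustrative names, not DataFusion APIs.
- The conflict rule is an assumption. Two columns conflict if they share a name and either have the same qualifier or at least one of them is unqualified. This is meant to approximate the duplicate and ambiguity cases that `DFSchema::check_names` rejects.

```rust
/// Simplified schema column: (optional table qualifier, column name).
/// Hypothetical model, not DataFusion's field type.
type Field = (Option<String>, String);

/// Assumed conflict rule: two columns clash if they share a name and either
/// carry the same qualifier or at least one of them is unqualified.
fn conflicts(a: &Field, b: &Field) -> bool {
    a.1 == b.1 && (a.0 == b.0 || a.0.is_none() || b.0.is_none())
}

/// Replaces the qualifier of every column on one side of the join.
fn with_qualifier(side: Vec<Field>, qualifier: &str) -> Vec<Field> {
    side.into_iter()
        .map(|(_, name)| (Some(qualifier.to_string()), name))
        .collect()
}

/// (Re)qualify the sides of a join if needed, i.e. if the columns from one
/// side would otherwise conflict with the columns from the other.
///
/// The function returns:
/// - The requalified or original left side
/// - The requalified or original right side
/// - If a requalification was needed or not
fn requalify_field_lists(left: Vec<Field>, right: Vec<Field>) -> (Vec<Field>, Vec<Field>, bool) {
    // Any clash between a left column and a right column triggers requalification.
    let needs_requalify = left
        .iter()
        .any(|l| right.iter().any(|r| conflicts(l, r)));
    if !needs_requalify {
        return (left, right, false);
    }
    (with_qualifier(left, "left"), with_qualifier(right, "right"), true)
}
```

The same list style could also answer the later comment about `try_new_with_project_input` returning `(Self, bool)`. There, the `bool` would presumably document whether the inputs were requalified. That is the precondition stated in the `qualify_join_schema_sides` doc comment.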
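This relates to the `_plan_err!` formatting comment above. The format arguments can go straight into the macro, instead of going through a `"{}"` placeholder plus `.to_string()`.

To stay self-contained, the sketch below uses std's `format!` and a `String` error in place of DataFusion's `_plan_err!` and its error type. `check_qualifier_count` is a hypothetical helper that mirrors only the length check in `with_field_specific_qualified_schema`.

```rust
/// Stand-in for the length check in `with_field_specific_qualified_schema`.
/// The message follows the wording suggested in the review comment.
fn check_qualifier_count(num_fields: usize, num_qualifiers: usize) -> Result<(), String> {
    if num_qualifiers != num_fields {
        // Both counts are interpolated directly, so the message says what was expected.
        return Err(format!(
            "Number of qualifiers must match number of fields. Expected {}, got {}",
            num_fields, num_qualifiers
        ));
    }
    Ok(())
}
```

Including both counts makes a mismatch diagnosable from the message alone.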
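This relates to the "stringly-typed dependency" comment above. One way to remove it is to define the two qualifiers once and have both `requalify_sides_if_needed` and `qualify_join_schema_sides` read them.

This is a sketch under assumptions:

- `LEFT_QUALIFIER`, `RIGHT_QUALIFIER` and `side_qualifiers` are hypothetical names.
- A plain `&str` stands in for DataFusion's `TableReference`.

So it only shows the shape of the idea.

```rust
use std::iter;

/// Hypothetical shared constants: both the logical-plan side and the
/// physical planner would reference these instead of string literals.
pub const LEFT_QUALIFIER: &str = "left";
pub const RIGHT_QUALIFIER: &str = "right";

/// Builds one qualifier per join output field: the first `left_len` fields
/// get the left qualifier and the next `right_len` fields the right one,
/// matching the `if i < left_fields.len()` logic in the diff.
pub fn side_qualifiers(left_len: usize, right_len: usize) -> Vec<Option<&'static str>> {
    iter::repeat(Some(LEFT_QUALIFIER))
        .take(left_len)
        .chain(iter::repeat(Some(RIGHT_QUALIFIER)).take(right_len))
        .collect()
}
```

This version builds the list from the two side lengths instead of enumerating the join fields. A field-count mismatch would then surface later, in the length check of `with_field_specific_qualified_schema`, rather than at this point.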
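This relates to the comment above about requalifying only INNER joins. A minimal sketch of a predicate that could replace the `original_join.join_type == JoinType::Inner` check is shown below.

It rests on two assumptions:

- The local `JoinKind` enum is a hypothetical, simplified stand-in for DataFusion's `JoinType`. Its variant list is not copied from the crate.
- Semi and anti joins output columns from one side only, so they cannot produce a cross-side name clash.

```rust
/// Hypothetical, simplified stand-in for DataFusion's `JoinType`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum JoinKind {
    Inner,
    Left,
    Right,
    Full,
    LeftSemi,
    RightSemi,
    LeftAnti,
    RightAnti,
}

/// Returns true when the join output carries columns from both inputs,
/// which is when a duplicate name (e.g. the `DuplicateUnqualifiedField`
/// error from the `JOIN_TYPE_LEFT` experiment) can appear.
fn outputs_both_sides(kind: JoinKind) -> bool {
    matches!(
        kind,
        JoinKind::Inner | JoinKind::Left | JoinKind::Right | JoinKind::Full
    )
}
```

With such a predicate, the guard in `try_new_with_project_input` could become `if outputs_both_sides(...)` on the join type. This assumes `qualify_join_schema_sides` still holds for outer joins, since it expects the join schema to list the left fields followed by the right fields.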
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org