kosiew commented on code in PR #22988:
URL: https://github.com/apache/datafusion/pull/22988#discussion_r3489489677


##########
datafusion/sql/src/statement.rs:
##########
@@ -2407,6 +2411,207 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
         Ok(plan)
     }
 
+    fn merge_to_plan(&self, merge: ast::Merge) -> Result<LogicalPlan> {
+        let ast::Merge {
+            table,
+            source,
+            on,
+            clauses,
+            into: _,
+            merge_token: _,
+            optimizer_hints,
+            output,
+        } = merge;
+
+        if !optimizer_hints.is_empty() {
+            plan_err!("Optimizer hints not supported")?;
+        }
+
+        if output.is_some() {
+            return not_impl_err!("MERGE OUTPUT clause is not supported");
+        }
+
+        // 1. Resolve target table
+        let (target_table_name, target_alias) = match &table {
+            TableFactor::Table { name, alias, .. } => (name.clone(), 
alias.clone()),
+            _ => plan_err!("Cannot MERGE INTO non-table relation!")?,
+        };
+        let target_table_ref = 
self.object_name_to_table_reference(target_table_name)?;
+        let target_table_source = self
+            .context_provider
+            .get_table_source(target_table_ref.clone())?;
+        // Use alias as schema qualifier so `t.col` resolves when user writes
+        // `MERGE INTO target AS t`. Fall back to the table reference itself.
+        let target_qualifier = target_alias
+            .as_ref()
+            .map(|a| TableReference::bare(a.name.value.clone()))
+            .unwrap_or_else(|| target_table_ref.clone());
+        let target_schema = Arc::new(DFSchema::try_from_qualified_schema(
+            target_qualifier,
+            &target_table_source.schema(),
+        )?);
+
+        // 2. Plan the source (USING clause) as a LogicalPlan
+        let mut planner_context = PlannerContext::new();
+        let source_table_with_joins = TableWithJoins {
+            relation: source,
+            joins: vec![],
+        };
+        let source_plan =
+            self.plan_from_tables(vec![source_table_with_joins], &mut 
planner_context)?;
+
+        // 3. Build a combined schema for resolving expressions in ON and WHEN 
clauses
+        let combined_schema =
+            Arc::new(target_schema.as_ref().join(source_plan.schema())?);
+
+        // 4. Convert the ON condition from sqlparser Expr to datafusion Expr
+        let on_expr = self.sql_to_expr(*on, &combined_schema, &mut 
planner_context)?;
+
+        // 5. Convert each WHEN clause
+        let df_clauses = clauses
+            .into_iter()
+            .map(|clause| {
+                self.merge_clause_to_plan(
+                    clause,
+                    &combined_schema,
+                    &target_schema,
+                    &target_alias,
+                    &mut planner_context,
+                )
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        // 6. Build the DmlStatement
+        Ok(LogicalPlan::Dml(DmlStatement::new(
+            target_table_ref,
+            target_table_source,
+            WriteOp::MergeInto(Box::new(MergeIntoOp {
+                on: on_expr,
+                clauses: df_clauses,
+            })),
+            Arc::new(source_plan),
+        )))
+    }
+
+    fn ident_from_object_name_last(name: &ObjectName) -> Result<String> {
+        let part = name
+            .0
+            .iter()
+            .last()
+            .ok_or_else(|| plan_datafusion_err!("Empty column name"))?;
+        part.as_ident()
+            .ok_or_else(|| plan_datafusion_err!("Expected simple identifier"))
+            .map(|ident| ident.value.clone())
+    }
+
+    fn merge_clause_to_plan(
+        &self,
+        clause: ast::MergeClause,
+        combined_schema: &DFSchema,
+        target_schema: &DFSchema,
+        _target_alias: &Option<ast::TableAlias>,
+        planner_context: &mut PlannerContext,
+    ) -> Result<MergeIntoClause> {
+        let kind = match clause.clause_kind {
+            ast::MergeClauseKind::Matched => MergeIntoClauseKind::Matched,
+            ast::MergeClauseKind::NotMatched => 
MergeIntoClauseKind::NotMatched,
+            ast::MergeClauseKind::NotMatchedByTarget => {
+                MergeIntoClauseKind::NotMatchedByTarget
+            }
+            ast::MergeClauseKind::NotMatchedBySource => {
+                MergeIntoClauseKind::NotMatchedBySource
+            }
+        };
+
+        let predicate = clause
+            .predicate
+            .map(|p| self.sql_to_expr(p, combined_schema, planner_context))
+            .transpose()?;
+
+        let action = match clause.action {
+            ast::MergeAction::Update(update_expr) => {
+                let assignments = update_expr
+                    .assignments
+                    .into_iter()
+                    .map(|assign| {
+                        let col_name = match &assign.target {
+                            AssignmentTarget::ColumnName(cols) => {
+                                Self::ident_from_object_name_last(cols)?
+                            }
+                            _ => plan_err!("Tuples are not supported")?,
+                        };
+                        // Validate column exists in target
+                        target_schema.field_with_unqualified_name(&col_name)?;
+                        let value = self.sql_to_expr(
+                            assign.value,
+                            combined_schema,
+                            planner_context,
+                        )?;
+                        Ok((col_name, value))
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+                MergeIntoAction::Update(assignments)
+            }
+            ast::MergeAction::Insert(insert_expr) => {
+                let columns: Vec<String> = insert_expr
+                    .columns
+                    .iter()
+                    .map(|c| Self::ident_from_object_name_last(c))

Review Comment:
   I think the MERGE INSERT column handling still needs one more normalization 
step. The extracted column names currently keep the raw `Ident.value` returned 
by `ident_from_object_name_last`, so validation runs against the raw 
identifier rather than its normalized form.
   
   That means something like `MERGE ... INSERT (ID) VALUES (...)` can be 
rejected against a target column named `id`, and duplicate detection may not 
match regular INSERT behavior.
   
   Could we normalize the extracted column identifiers before duplicate checks 
and before the `field_with_unqualified_name` lookup, matching regular INSERT 
planning?



##########
datafusion/sql/src/statement.rs:
##########
@@ -2407,6 +2411,207 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
         Ok(plan)
     }
 
+    fn merge_to_plan(&self, merge: ast::Merge) -> Result<LogicalPlan> {
+        let ast::Merge {
+            table,
+            source,
+            on,
+            clauses,
+            into: _,
+            merge_token: _,
+            optimizer_hints,
+            output,
+        } = merge;
+
+        if !optimizer_hints.is_empty() {
+            plan_err!("Optimizer hints not supported")?;
+        }
+
+        if output.is_some() {
+            return not_impl_err!("MERGE OUTPUT clause is not supported");
+        }
+
+        // 1. Resolve target table
+        let (target_table_name, target_alias) = match &table {
+            TableFactor::Table { name, alias, .. } => (name.clone(), 
alias.clone()),
+            _ => plan_err!("Cannot MERGE INTO non-table relation!")?,
+        };
+        let target_table_ref = 
self.object_name_to_table_reference(target_table_name)?;
+        let target_table_source = self
+            .context_provider
+            .get_table_source(target_table_ref.clone())?;
+        // Use alias as schema qualifier so `t.col` resolves when user writes
+        // `MERGE INTO target AS t`. Fall back to the table reference itself.
+        let target_qualifier = target_alias
+            .as_ref()
+            .map(|a| TableReference::bare(a.name.value.clone()))

Review Comment:
   Nice fix to use the target alias as the schema qualifier here. One remaining 
issue is that this uses `a.name.value.clone()` directly, so unquoted aliases 
are not normalized the same way as the rest of planning.
   
   For example, `MERGE INTO target AS T ... ON T.id = s.id` can store the 
target qualifier as `T`, while expression planning normalizes `T.id` to `t.id`. 
The normalized reference then no longer matches the stored qualifier, so 
alias-qualified target references can still fail to resolve.
   
   Could we build the alias qualifier with 
`self.ident_normalizer.normalize(a.name.clone())` before passing it to 
`TableReference::bare(...)`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to