kosiew commented on code in PR #22988:
URL: https://github.com/apache/datafusion/pull/22988#discussion_r3489489677
##########
datafusion/sql/src/statement.rs:
##########
@@ -2407,6 +2411,207 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
Ok(plan)
}
+ fn merge_to_plan(&self, merge: ast::Merge) -> Result<LogicalPlan> {
+ let ast::Merge {
+ table,
+ source,
+ on,
+ clauses,
+ into: _,
+ merge_token: _,
+ optimizer_hints,
+ output,
+ } = merge;
+
+ if !optimizer_hints.is_empty() {
+ plan_err!("Optimizer hints not supported")?;
+ }
+
+ if output.is_some() {
+ return not_impl_err!("MERGE OUTPUT clause is not supported");
+ }
+
+ // 1. Resolve target table
+ let (target_table_name, target_alias) = match &table {
+ TableFactor::Table { name, alias, .. } => (name.clone(),
alias.clone()),
+ _ => plan_err!("Cannot MERGE INTO non-table relation!")?,
+ };
+ let target_table_ref =
self.object_name_to_table_reference(target_table_name)?;
+ let target_table_source = self
+ .context_provider
+ .get_table_source(target_table_ref.clone())?;
+ // Use alias as schema qualifier so `t.col` resolves when user writes
+ // `MERGE INTO target AS t`. Fall back to the table reference itself.
+ let target_qualifier = target_alias
+ .as_ref()
+ .map(|a| TableReference::bare(a.name.value.clone()))
+ .unwrap_or_else(|| target_table_ref.clone());
+ let target_schema = Arc::new(DFSchema::try_from_qualified_schema(
+ target_qualifier,
+ &target_table_source.schema(),
+ )?);
+
+ // 2. Plan the source (USING clause) as a LogicalPlan
+ let mut planner_context = PlannerContext::new();
+ let source_table_with_joins = TableWithJoins {
+ relation: source,
+ joins: vec![],
+ };
+ let source_plan =
+ self.plan_from_tables(vec![source_table_with_joins], &mut
planner_context)?;
+
+ // 3. Build a combined schema for resolving expressions in ON and WHEN
clauses
+ let combined_schema =
+ Arc::new(target_schema.as_ref().join(source_plan.schema())?);
+
+ // 4. Convert the ON condition from sqlparser Expr to datafusion Expr
+ let on_expr = self.sql_to_expr(*on, &combined_schema, &mut
planner_context)?;
+
+ // 5. Convert each WHEN clause
+ let df_clauses = clauses
+ .into_iter()
+ .map(|clause| {
+ self.merge_clause_to_plan(
+ clause,
+ &combined_schema,
+ &target_schema,
+ &target_alias,
+ &mut planner_context,
+ )
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+ // 6. Build the DmlStatement
+ Ok(LogicalPlan::Dml(DmlStatement::new(
+ target_table_ref,
+ target_table_source,
+ WriteOp::MergeInto(Box::new(MergeIntoOp {
+ on: on_expr,
+ clauses: df_clauses,
+ })),
+ Arc::new(source_plan),
+ )))
+ }
+
+ fn ident_from_object_name_last(name: &ObjectName) -> Result<String> {
+ let part = name
+ .0
+ .iter()
+ .last()
+ .ok_or_else(|| plan_datafusion_err!("Empty column name"))?;
+ part.as_ident()
+ .ok_or_else(|| plan_datafusion_err!("Expected simple identifier"))
+ .map(|ident| ident.value.clone())
+ }
+
+ fn merge_clause_to_plan(
+ &self,
+ clause: ast::MergeClause,
+ combined_schema: &DFSchema,
+ target_schema: &DFSchema,
+ _target_alias: &Option<ast::TableAlias>,
+ planner_context: &mut PlannerContext,
+ ) -> Result<MergeIntoClause> {
+ let kind = match clause.clause_kind {
+ ast::MergeClauseKind::Matched => MergeIntoClauseKind::Matched,
+ ast::MergeClauseKind::NotMatched =>
MergeIntoClauseKind::NotMatched,
+ ast::MergeClauseKind::NotMatchedByTarget => {
+ MergeIntoClauseKind::NotMatchedByTarget
+ }
+ ast::MergeClauseKind::NotMatchedBySource => {
+ MergeIntoClauseKind::NotMatchedBySource
+ }
+ };
+
+ let predicate = clause
+ .predicate
+ .map(|p| self.sql_to_expr(p, combined_schema, planner_context))
+ .transpose()?;
+
+ let action = match clause.action {
+ ast::MergeAction::Update(update_expr) => {
+ let assignments = update_expr
+ .assignments
+ .into_iter()
+ .map(|assign| {
+ let col_name = match &assign.target {
+ AssignmentTarget::ColumnName(cols) => {
+ Self::ident_from_object_name_last(cols)?
+ }
+ _ => plan_err!("Tuples are not supported")?,
+ };
+ // Validate column exists in target
+ target_schema.field_with_unqualified_name(&col_name)?;
+ let value = self.sql_to_expr(
+ assign.value,
+ combined_schema,
+ planner_context,
+ )?;
+ Ok((col_name, value))
+ })
+ .collect::<Result<Vec<_>>>()?;
+ MergeIntoAction::Update(assignments)
+ }
+ ast::MergeAction::Insert(insert_expr) => {
+ let columns: Vec<String> = insert_expr
+ .columns
+ .iter()
+ .map(|c| Self::ident_from_object_name_last(c))
Review Comment:
I think the MERGE INSERT column handling still needs one more normalization
step. The extracted column names currently keep the raw `Ident.value` from
`ident_from_object_name_last`, so they are validated without ever going
through SQL identifier normalization.
That means something like `MERGE ... INSERT (ID) VALUES (...)` can be
rejected against a target column named `id`, and duplicate detection may not
match regular INSERT behavior.
Could we normalize the extracted column identifiers before duplicate checks
and before the `field_with_unqualified_name` lookup, matching regular INSERT
planning?
##########
datafusion/sql/src/statement.rs:
##########
@@ -2407,6 +2411,207 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
Ok(plan)
}
+ fn merge_to_plan(&self, merge: ast::Merge) -> Result<LogicalPlan> {
+ let ast::Merge {
+ table,
+ source,
+ on,
+ clauses,
+ into: _,
+ merge_token: _,
+ optimizer_hints,
+ output,
+ } = merge;
+
+ if !optimizer_hints.is_empty() {
+ plan_err!("Optimizer hints not supported")?;
+ }
+
+ if output.is_some() {
+ return not_impl_err!("MERGE OUTPUT clause is not supported");
+ }
+
+ // 1. Resolve target table
+ let (target_table_name, target_alias) = match &table {
+ TableFactor::Table { name, alias, .. } => (name.clone(),
alias.clone()),
+ _ => plan_err!("Cannot MERGE INTO non-table relation!")?,
+ };
+ let target_table_ref =
self.object_name_to_table_reference(target_table_name)?;
+ let target_table_source = self
+ .context_provider
+ .get_table_source(target_table_ref.clone())?;
+ // Use alias as schema qualifier so `t.col` resolves when user writes
+ // `MERGE INTO target AS t`. Fall back to the table reference itself.
+ let target_qualifier = target_alias
+ .as_ref()
+ .map(|a| TableReference::bare(a.name.value.clone()))
Review Comment:
Nice fix to use the target alias as the schema qualifier here. One remaining
issue is that this uses `a.name.value.clone()` directly, so unquoted aliases
are not normalized the same way as the rest of planning.
For example, `MERGE INTO target AS T ... ON T.id = s.id` can store the
target qualifier as `T`, while expression planning normalizes `T.id` to `t.id`.
The normalized `t` then no longer matches the stored qualifier `T`, so
alias-qualified target references can still fail.
Could we build the alias qualifier with
`self.ident_normalizer.normalize(a.name.clone())` before passing it to
`TableReference::bare(...)`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]