gengliangwang commented on code in PR #50943:
URL: https://github.com/apache/spark/pull/50943#discussion_r2098618954
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableConstraints.scala:
##########

@@ -22,38 +22,58 @@ import org.apache.spark.sql.catalyst.expressions.{And, CheckInvariant, Expressio
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, V2WriteCommand}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern.COMMAND
-import org.apache.spark.sql.connector.catalog.CatalogManager
+import org.apache.spark.sql.connector.catalog.{CatalogManager, Table}
 import org.apache.spark.sql.connector.catalog.constraints.Check
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation

 class ResolveTableConstraints(val catalogManager: CatalogManager) extends Rule[LogicalPlan] {
+  private val resolveReferencesInFilter = new ResolveReferencesInFilter(catalogManager)
+
   override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
     _.containsPattern(COMMAND), ruleId) {
     case v2Write: V2WriteCommand
         if v2Write.table.resolved && v2Write.query.resolved &&
           !containsCheckInvariant(v2Write.query) && v2Write.outputResolved =>
       v2Write.table match {
-        case r: DataSourceV2Relation
-            if r.table.constraints != null && r.table.constraints.nonEmpty =>
-          // Check constraint is the only enforced constraint for DSV2 tables.
-          val checkInvariants = r.table.constraints.collect {
-            case c: Check =>
-              val unresolvedExpr = buildCatalystExpression(c)
-              val columnExtractors = mutable.Map[String, Expression]()
-              buildColumnExtractors(unresolvedExpr, columnExtractors)
-              CheckInvariant(unresolvedExpr, columnExtractors.toSeq, c.name, c.predicateSql)
-          }
-          // Combine the check invariants into a single expression using conjunctive AND.
-          checkInvariants.reduceOption(And).fold(v2Write)(
-            condition => v2Write.withNewQuery(Filter(condition, v2Write.query)))
+        case r: DataSourceV2Relation =>
+          buildCheckCondition(r.table).map { condition =>
+            val filter = Filter(condition, v2Write.query)
+            // Resolve attribute references in the filter condition only, not the entire query.

Review Comment:
See https://github.com/gengliangwang/spark/actions/runs/15130563620/job/42530638818 for details.
```
=== Result of Batch DML rewrite ===
!'Sort [(cast(x#26 as bigint) + cast('c.y as bigint)) ASC NULLS FIRST], true   Sort [(cast(x#26 as bigint) + cast(tempresolvedcolumn(c#24.y, c, y, false) as bigint)) ASC NULLS FIRST], true
 +- Aggregate [c#24.x], [c#24.x AS x#26]                                        +- Aggregate [c#24.x], [c#24.x AS x#26]
    +- SubqueryAlias t                                                             +- SubqueryAlias t
       +- LocalRelation [c#24]                                                        +- LocalRelation [c#24]
,{})
```
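For readers following the thread, here is a minimal standalone sketch of the fold-and-wrap shape behind both the old `reduceOption(And)` code and the new `buildCheckCondition` path: conjoin the table's CHECK predicates into a single condition and guard the write query with a `Filter` only when at least one constraint exists. The object name, the sample predicates, and the empty `LocalRelation` are hypothetical stand-ins rather than code from the PR; only the plan-wrapping pattern mirrors the diff.

```scala
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{And, Expression, GreaterThan, Literal}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation, LogicalPlan}

// Hypothetical stand-in for the rule's fold-and-wrap step; not the PR's code.
object CheckConditionSketch {
  // Conjoin CHECK predicates; reduceOption yields None when the table
  // declares no CHECK constraints at all.
  def withCheckCondition(query: LogicalPlan, checks: Seq[Expression]): LogicalPlan =
    checks.reduceOption(And)
      .map(cond => Filter(cond, query)) // inject the combined condition above the query
      .getOrElse(query)                 // no constraints: leave the plan untouched

  def main(args: Array[String]): Unit = {
    val checks = Seq[Expression](
      GreaterThan(UnresolvedAttribute("x"), Literal(0)),
      GreaterThan(UnresolvedAttribute("y"), Literal(1)))
    // Unresolved attributes print with a leading ' until an analyzer pass binds
    // them, which is the binding step the PR scopes to the injected Filter alone.
    println(withCheckCondition(LocalRelation(), checks).treeString)
  }
}
```

Folding with `reduceOption` rather than `reduce` means a table without CHECK constraints never gets a vacuous `Filter` injected, so in the common case the write query is left untouched, matching the `fold(v2Write)(...)` behavior of the code being replaced.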