aokolnychyi commented on code in PR #50761:
URL: https://github.com/apache/spark/pull/50761#discussion_r2075734567
########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableConstraint.scala: ##########
@@ -0,0 +1,86 @@
+package org.apache.spark.sql.catalyst.analysis
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions.{And, CheckInvariant, Expression, V2ExpressionUtils}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, V2WriteCommand}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.COMMAND
+import org.apache.spark.sql.connector.catalog.CatalogManager
+import org.apache.spark.sql.connector.catalog.constraints.Check
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+
+class ResolveTableConstraint(val catalogManager: CatalogManager) extends Rule[LogicalPlan] {

Review Comment:
   Optional: Shall we call the rule `ResolveTableConstraints`, as it may resolve multiple constraints?
########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableConstraint.scala: ##########
@@ -0,0 +1,86 @@
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
+    _.containsPattern(COMMAND), ruleId) {
+    case v2Write: V2WriteCommand
+        if v2Write.table.resolved && v2Write.query.resolved &&
+          !containsCheckInvariant(v2Write.query) && v2Write.outputResolved =>
+      v2Write.table match {
+        case r: DataSourceV2Relation
+            if r.table.constraints != null && r.table.constraints.nonEmpty =>
+          // Check constraint is the only enforced constraint for DSV2 tables.
+          val checks = r.table.constraints.collect {
+            case c: Check => c
+          }
+          val checkInvariants = checks.map { c =>
+            val unresolvedExpr = buildCatalystExpression(c)
+            val columnExtractors = mutable.Map[String, Expression]()
+            buildColumnExtractors(unresolvedExpr, columnExtractors)
+            CheckInvariant(unresolvedExpr, columnExtractors.toSeq, c.name(), c.predicateSql())
+          }
+          // Combine the check invariants into a single expression using conjunctive AND.
+          val condition = checkInvariants.reduce(And)

Review Comment:
   Can this produce an exception? What if we have only constraints other than CHECK? It seems we would still enter this block and throw an exception here.
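For context: if a table declares only non-CHECK constraints (for example a primary key), `r.table.constraints.nonEmpty` holds but `checks` is empty, and `reduce` on an empty collection throws `UnsupportedOperationException`. A hedged sketch of one possible guard, written against the names in the hunk above and not a final implementation:

```
          // reduceOption returns None when there are no CHECK constraints, so the
          // write plan is left untouched instead of throwing on an empty collection.
          checkInvariants.reduceOption(And) match {
            case Some(condition) => v2Write.withNewQuery(Filter(condition, v2Write.query))
            case None => v2Write
          }
```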
########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableConstraint.scala: ##########
@@ -0,0 +1,86 @@
+          // Check constraint is the only enforced constraint for DSV2 tables.
+          val checks = r.table.constraints.collect {
+            case c: Check => c
+          }

Review Comment:
   What about doing this in one pass?
   ```
   val checkInvariants = r.table.constraints.collect {
     case c: Check =>
       val expr = buildCatalystExpression(c)
       ...
       CheckInvariant(...)
   }
   ```
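A hedged expansion of that suggestion, filling in the elided body with the same calls used in the current two-step version; a sketch rather than the final code:

```
          val checkInvariants = r.table.constraints.collect {
            case c: Check =>
              // Same body as the existing checks.map step, just fused into the collect.
              val unresolvedExpr = buildCatalystExpression(c)
              val columnExtractors = mutable.Map[String, Expression]()
              buildColumnExtractors(unresolvedExpr, columnExtractors)
              CheckInvariant(unresolvedExpr, columnExtractors.toSeq, c.name(), c.predicateSql())
          }
```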
########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableConstraint.scala: ##########
@@ -0,0 +1,70 @@
+          val checks = r.table.constraints().collect {
+            case c: Check => c
+          }
+          val checkInvariants = checks.map { c =>
+            val parsed =
+              catalogManager.v1SessionCatalog.parser.parseExpression(c.predicateSql())
+            val columnExtractors = mutable.Map[String, Expression]()

Review Comment:
   Well, we could build an immutable map using recursion, but it would be a bit costlier. It is unlikely to have an impact given that CHECK expressions are usually small, but it's up to you here. I am OK as is.
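For reference, a hedged sketch of the recursive, immutable-map variant being described. It assumes the extractors are keyed by unresolved attribute name; the real `buildColumnExtractors` helper may collect additional node types (for example nested field access):

```
  import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute

  // Recursively fold over the expression tree, building an immutable map of
  // column name -> extractor expression instead of mutating a mutable.Map.
  private def columnExtractors(expr: Expression): Map[String, Expression] = expr match {
    case a: UnresolvedAttribute => Map(a.name -> a)
    case other =>
      other.children.foldLeft(Map.empty[String, Expression]) { (acc, child) =>
        acc ++ columnExtractors(child)
      }
  }
```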
########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableConstraint.scala: ##########
@@ -0,0 +1,86 @@
+          // Combine the check invariants into a single expression using conjunctive AND.
+          val condition = checkInvariants.reduce(And)
+          v2Write.withNewQuery(Filter(condition, v2Write.query))

Review Comment:
   It is nice to not have a new node, but I am worried about nulls. If any of the constraint expressions evaluates to null, we will replace the entire filter expression with `false` in `ReplaceNullWithFalseInPredicate`. Can we add tests for this?
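Since the comment asks for tests, here is a hypothetical sketch of the NULL case. The table name, catalog, CHECK DDL syntax, and suite helpers (`QueryTest`-style `sql`/`checkAnswer`) are assumptions, and the expected answer follows SQL three-valued logic, where a NULL check result is not a violation:

```
  test("CHECK constraint predicate evaluating to NULL does not reject or drop rows") {
    withTable("testcat.t") {
      sql("CREATE TABLE testcat.t (i INT, CONSTRAINT i_positive CHECK (i > 0))")
      // i IS NULL => (i > 0) evaluates to NULL, which is not a violation, so the row
      // must be written rather than filtered out or rejected.
      sql("INSERT INTO testcat.t VALUES (NULL)")
      checkAnswer(sql("SELECT * FROM testcat.t"), Row(null))
    }
  }
```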
########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableConstraint.scala: ##########
@@ -0,0 +1,70 @@
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
+    _.containsPattern(COMMAND), ruleId) {
+    case v2Write: V2WriteCommand

Review Comment:
   Sounds good. We will have to agree on the behavior for rows that are being copied over there. Technically, those are not new records, but we will throw a violation exception for them. If we only support enforced CHECK constraints, it shouldn't be a problem, as ALTER will already validate existing data.
########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableConstraint.scala: ##########
@@ -0,0 +1,86 @@
+  private def buildCatalystExpression(c: Check): Expression = {

Review Comment:
   This method may produce a suboptimal error message if a connector provides only the expression and that expression can't be converted to Catalyst. It is similar to the problem I faced in PR #50792.
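A hedged sketch of the kind of explicit handling the comment is pointing at, assuming the rule's class context (`catalogManager`, `Check`, `Expression` in scope) and reusing the parser call shown in the other revision of this file; the exception type and message are placeholders, not the API the PR ends up using:

```
  private def buildCatalystExpression(c: Check): Expression = {
    Option(c.predicateSql()) match {
      case Some(sqlText) =>
        // Prefer the SQL text when the connector supplies it.
        catalogManager.v1SessionCatalog.parser.parseExpression(sqlText)
      case None =>
        // Only a V2 predicate was supplied; fail with the constraint name instead of
        // letting a generic conversion error surface. Placeholder exception type.
        throw new UnsupportedOperationException(
          s"Cannot convert CHECK constraint '${c.name()}' into a Catalyst expression")
    }
  }
```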
########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableConstraint.scala: ##########
@@ -0,0 +1,86 @@
+          val checkInvariants = checks.map { c =>
+            val unresolvedExpr = buildCatalystExpression(c)
+            val columnExtractors = mutable.Map[String, Expression]()
+            buildColumnExtractors(unresolvedExpr, columnExtractors)
+            CheckInvariant(unresolvedExpr, columnExtractors.toSeq, c.name(), c.predicateSql())

Review Comment:
   Optional: Just `c.name` and `c.predicateSql` for consistency?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org