aokolnychyi commented on code in PR #50761:
URL: https://github.com/apache/spark/pull/50761#discussion_r2075734567


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableConstraint.scala:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.analysis
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions.{And, CheckInvariant, Expression, V2ExpressionUtils}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, V2WriteCommand}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.COMMAND
+import org.apache.spark.sql.connector.catalog.CatalogManager
+import org.apache.spark.sql.connector.catalog.constraints.Check
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+
+class ResolveTableConstraint(val catalogManager: CatalogManager) extends Rule[LogicalPlan] {

Review Comment:
   Optional: Shall we call the rule `ResolveTableConstraints`, as it may resolve multiple constraints?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableConstraint.scala:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.analysis
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions.{And, CheckInvariant, Expression, V2ExpressionUtils}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, V2WriteCommand}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.COMMAND
+import org.apache.spark.sql.connector.catalog.CatalogManager
+import org.apache.spark.sql.connector.catalog.constraints.Check
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+
+class ResolveTableConstraint(val catalogManager: CatalogManager) extends Rule[LogicalPlan] {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
+    _.containsPattern(COMMAND), ruleId) {
+    case v2Write: V2WriteCommand
+      if v2Write.table.resolved && v2Write.query.resolved &&
+        !containsCheckInvariant(v2Write.query) && v2Write.outputResolved =>
+      v2Write.table match {
+        case r: DataSourceV2Relation
+          if r.table.constraints != null && r.table.constraints.nonEmpty =>
+          // Check constraint is the only enforced constraint for DSV2 tables.
+          val checks = r.table.constraints.collect {
+            case c: Check => c
+          }
+          val checkInvariants = checks.map { c =>
+            val unresolvedExpr = buildCatalystExpression(c)
+            val columnExtractors = mutable.Map[String, Expression]()
+            buildColumnExtractors(unresolvedExpr, columnExtractors)
+            CheckInvariant(unresolvedExpr, columnExtractors.toSeq, c.name(), c.predicateSql())
+          }
+          // Combine the check invariants into a single expression using conjunctive AND.
+          val condition = checkInvariants.reduce(And)

Review Comment:
   Can this throw? If the table has constraints other than CHECK, `constraints` is non-empty, so we enter this block, but `checks` can be empty and `reduce` will then throw on the empty collection.
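   
   A minimal sketch of one way to guard against the empty case, assuming we want to leave the write untouched when there are no CHECK constraints:
   
   ```
   // reduceOption avoids the UnsupportedOperationException that reduce
   // throws when the table has no CHECK constraints at all.
   checkInvariants.reduceOption[Expression](And) match {
     case Some(condition) => v2Write.withNewQuery(Filter(condition, v2Write.query))
     case None => v2Write // no CHECK constraints: keep the original write
   }
   ```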



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableConstraint.scala:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.analysis
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions.{And, CheckInvariant, Expression, V2ExpressionUtils}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, V2WriteCommand}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.COMMAND
+import org.apache.spark.sql.connector.catalog.CatalogManager
+import org.apache.spark.sql.connector.catalog.constraints.Check
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+
+class ResolveTableConstraint(val catalogManager: CatalogManager) extends Rule[LogicalPlan] {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
+    _.containsPattern(COMMAND), ruleId) {
+    _.containsPattern(COMMAND), ruleId) {
+    case v2Write: V2WriteCommand
+      if v2Write.table.resolved && v2Write.query.resolved &&
+        !containsCheckInvariant(v2Write.query) && v2Write.outputResolved =>
+      v2Write.table match {
+        case r: DataSourceV2Relation
+          if r.table.constraints != null && r.table.constraints.nonEmpty =>
+          // Check constraint is the only enforced constraint for DSV2 tables.
+          val checks = r.table.constraints.collect {

Review Comment:
   What about doing this in one pass?
   
   ```
   val checkInvariants = r.table.constraints.collect {
     case c: Check =>
       val expr = buildCatalystExpression(c)
       ...
       CheckInvariant(...)
   }
   ```
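   
   For concreteness, a sketch of the single-pass version, reusing the bodies from the diff above:
   
   ```
   // Collect and build in one traversal of the constraints array.
   val checkInvariants = r.table.constraints.collect {
     case c: Check =>
       val unresolvedExpr = buildCatalystExpression(c)
       val columnExtractors = mutable.Map[String, Expression]()
       buildColumnExtractors(unresolvedExpr, columnExtractors)
       CheckInvariant(unresolvedExpr, columnExtractors.toSeq, c.name(), c.predicateSql())
   }
   ```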



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableConstraint.scala:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.analysis
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions.{CheckInvariant, Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, V2WriteCommand, Validate}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.COMMAND
+import org.apache.spark.sql.connector.catalog.CatalogManager
+import org.apache.spark.sql.connector.catalog.constraints.Check
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+
+class ResolveTableConstraint(val catalogManager: CatalogManager) extends Rule[LogicalPlan] {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
+    _.containsPattern(COMMAND), ruleId) {
+    _.containsPattern(COMMAND), ruleId) {
+    case v2Write: V2WriteCommand
+      if v2Write.table.resolved && v2Write.query.resolved &&
+        !v2Write.query.isInstanceOf[Validate] && v2Write.outputResolved =>
+      v2Write.table match {
+        case r: DataSourceV2Relation
+          if r.table.constraints() != null && r.table.constraints().nonEmpty =>
+          val checks = r.table.constraints().collect {
+            case c: Check => c
+          }
+          val checkInvariants = checks.map { c =>
+            val parsed =
+              catalogManager.v1SessionCatalog.parser.parseExpression(c.predicateSql())
+            val columnExtractors = mutable.Map[String, Expression]()

Review Comment:
   Well, we could build an immutable map using recursion, but it would be a bit costlier. It is unlikely to have an impact given that CHECK expressions are usually small, but up to you here. I am OK as is.
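   
   For illustration, a rough sketch of the recursive alternative, assuming extractors are keyed by the unresolved attribute's name (the exact contract of `buildColumnExtractors` may differ):
   
   ```
   // Hypothetical recursive variant: fold the expression tree into an
   // immutable map, keyed by each unresolved attribute's name.
   def columnExtractors(expr: Expression): Map[String, Expression] = expr match {
     case attr: UnresolvedAttribute => Map(attr.name -> attr)
     case other =>
       other.children.foldLeft(Map.empty[String, Expression]) { (acc, child) =>
         acc ++ columnExtractors(child)
       }
   }
   ```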



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableConstraint.scala:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.analysis
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions.{And, CheckInvariant, Expression, V2ExpressionUtils}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, V2WriteCommand}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.COMMAND
+import org.apache.spark.sql.connector.catalog.CatalogManager
+import org.apache.spark.sql.connector.catalog.constraints.Check
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+
+class ResolveTableConstraint(val catalogManager: CatalogManager) extends Rule[LogicalPlan] {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
+    _.containsPattern(COMMAND), ruleId) {
+    case v2Write: V2WriteCommand
+      if v2Write.table.resolved && v2Write.query.resolved &&
+        !containsCheckInvariant(v2Write.query) && v2Write.outputResolved =>
+      v2Write.table match {
+        case r: DataSourceV2Relation
+          if r.table.constraints != null && r.table.constraints.nonEmpty =>
+          // Check constraint is the only enforced constraint for DSV2 tables.
+          val checks = r.table.constraints.collect {
+            case c: Check => c
+          }
+          val checkInvariants = checks.map { c =>
+            val unresolvedExpr = buildCatalystExpression(c)
+            val columnExtractors = mutable.Map[String, Expression]()
+            buildColumnExtractors(unresolvedExpr, columnExtractors)
+            CheckInvariant(unresolvedExpr, columnExtractors.toSeq, c.name(), c.predicateSql())
+          }
+          // Combine the check invariants into a single expression using conjunctive AND.
+          val condition = checkInvariants.reduce(And)
+          v2Write.withNewQuery(Filter(condition, v2Write.query))

Review Comment:
   It is nice to not have a new node, but I am worried about nulls. If any of the constraint expressions evaluates to null, `ReplaceNullWithFalseInPredicate` will replace the entire filter condition with `false`. Can we add tests for this?
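   
   One direction, visible in the revised file elsewhere in this thread, is a dedicated `Validate` node instead of `Filter`, so `Filter`-specific rewrites such as `ReplaceNullWithFalseInPredicate` cannot touch the CHECK condition. A sketch, assuming `Validate` takes `(condition, child)` like `Filter` does:
   
   ```
   // Sketch only: wrap the query in a Validate node rather than a Filter;
   // the (condition, child) constructor shape is an assumption here.
   v2Write.withNewQuery(Validate(condition, v2Write.query))
   ```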



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableConstraint.scala:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.analysis
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions.{CheckInvariant, Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, V2WriteCommand, Validate}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.COMMAND
+import org.apache.spark.sql.connector.catalog.CatalogManager
+import org.apache.spark.sql.connector.catalog.constraints.Check
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+
+class ResolveTableConstraint(val catalogManager: CatalogManager) extends Rule[LogicalPlan] {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
+    _.containsPattern(COMMAND), ruleId) {
+    _.containsPattern(COMMAND), ruleId) {
+    case v2Write: V2WriteCommand

Review Comment:
   Sounds good. We will have to agree on the behavior for rows that are copied over. Technically, those are not new records, but we would throw a violation exception for them. If we only support enforced CHECK constraints, it shouldn't be a problem, as ALTER will already have validated existing data.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableConstraint.scala:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.analysis
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions.{And, CheckInvariant, Expression, V2ExpressionUtils}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, V2WriteCommand}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.COMMAND
+import org.apache.spark.sql.connector.catalog.CatalogManager
+import org.apache.spark.sql.connector.catalog.constraints.Check
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+
+class ResolveTableConstraint(val catalogManager: CatalogManager) extends Rule[LogicalPlan] {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
+    _.containsPattern(COMMAND), ruleId) {
+    _.containsPattern(COMMAND), ruleId) {
+    case v2Write: V2WriteCommand
+      if v2Write.table.resolved && v2Write.query.resolved &&
+        !containsCheckInvariant(v2Write.query) && v2Write.outputResolved =>
+      v2Write.table match {
+        case r: DataSourceV2Relation
+          if r.table.constraints != null && r.table.constraints.nonEmpty =>
+          // Check constraint is the only enforced constraint for DSV2 tables.
+          val checks = r.table.constraints.collect {
+            case c: Check => c
+          }
+          val checkInvariants = checks.map { c =>
+            val unresolvedExpr = buildCatalystExpression(c)
+            val columnExtractors = mutable.Map[String, Expression]()
+            buildColumnExtractors(unresolvedExpr, columnExtractors)
+            CheckInvariant(unresolvedExpr, columnExtractors.toSeq, c.name(), c.predicateSql())
+          }
+          // Combine the check invariants into a single expression using conjunctive AND.
+          val condition = checkInvariants.reduce(And)
+          v2Write.withNewQuery(Filter(condition, v2Write.query))
+        case _ =>
+          v2Write
+      }
+  }
+
+  private def containsCheckInvariant(plan: LogicalPlan): Boolean = {
+    plan match {
+      case Filter(condition, _) =>
+        condition.exists(_.isInstanceOf[CheckInvariant])
+
+      case _ => false
+    }
+  }
+
+  private def buildCatalystExpression(c: Check): Expression = {

Review Comment:
   This method may produce a suboptimal error message if a connector provides 
only the expression and that expression can't be converted to Catalyst. It is 
similar to the problem I faced in PR #50792.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableConstraint.scala:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.analysis
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions.{And, CheckInvariant, Expression, V2ExpressionUtils}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, V2WriteCommand}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.COMMAND
+import org.apache.spark.sql.connector.catalog.CatalogManager
+import org.apache.spark.sql.connector.catalog.constraints.Check
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+
+class ResolveTableConstraint(val catalogManager: CatalogManager) extends Rule[LogicalPlan] {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
+    _.containsPattern(COMMAND), ruleId) {
+    _.containsPattern(COMMAND), ruleId) {
+    case v2Write: V2WriteCommand
+      if v2Write.table.resolved && v2Write.query.resolved &&
+        !containsCheckInvariant(v2Write.query) && v2Write.outputResolved =>
+      v2Write.table match {
+        case r: DataSourceV2Relation
+          if r.table.constraints != null && r.table.constraints.nonEmpty =>
+          // Check constraint is the only enforced constraint for DSV2 tables.
+          val checks = r.table.constraints.collect {
+            case c: Check => c
+          }
+          val checkInvariants = checks.map { c =>
+            val unresolvedExpr = buildCatalystExpression(c)
+            val columnExtractors = mutable.Map[String, Expression]()
+            buildColumnExtractors(unresolvedExpr, columnExtractors)
+            CheckInvariant(unresolvedExpr, columnExtractors.toSeq, c.name(), c.predicateSql())

Review Comment:
   Optional: Just `c.name` and `c.predicateSql` for consistency?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

