aokolnychyi commented on code in PR #50839:
URL: https://github.com/apache/spark/pull/50839#discussion_r2087219426


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AlterTableExec.scala:
##########
@@ -44,3 +45,34 @@ case class AlterTableExec(
     Seq.empty
   }
 }
+
+/**
+ * Physical plan node for adding a check constraint with validation.
+ */
+case class AddCheckConstraintExec(
+    catalog: TableCatalog,
+    ident: Identifier,
+    changes: Seq[TableChange],
+    condition: String,
+    child: SparkPlan) extends V2CommandExec with UnaryExecNode {
+
+  override def output: Seq[Attribute] = Seq.empty
+
+  override protected def run(): Seq[InternalRow] = {
+    try {
+      if (child.executeTake(1).nonEmpty) {

Review Comment:
   Optional: I wonder whether we should move this block out of the try/catch. Technically, only `alterTable` below can throw `IllegalArgumentException`.
   
   ```
   if (child.executeTake(1).nonEmpty) {
     throw QueryExecutionErrors.newCheckViolation(condition, ident.name())
   }
   ```
   
   Up to you.
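
   For illustration, a minimal sketch of `run()` with the validation hoisted out, assuming the catch block mirrors `AlterTableExec` above (a sketch, not the final shape):

   ```
   override protected def run(): Seq[InternalRow] = {
     // Validate first: executeTake cannot throw the IllegalArgumentException
     // that the catch block below is meant to translate.
     if (child.executeTake(1).nonEmpty) {
       throw QueryExecutionErrors.newCheckViolation(condition, ident.name())
     }
     try {
       catalog.alterTable(ident, changes.toArray: _*)
     } catch {
       case e: IllegalArgumentException =>
         throw QueryExecutionErrors.unsupportedTableChangeError(e)
     }
     Seq.empty
   }
   ```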



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala:
##########
@@ -290,23 +291,45 @@ case class AlterTableCollation(
 }
 
 /**
- * The logical plan of the ALTER TABLE ... ADD CONSTRAINT command.
+ * The logical plan of the ALTER TABLE ... ADD CONSTRAINT command for Primary Key, Foreign Key,
+ * and Unique constraints.
  */
 case class AddConstraint(
     table: LogicalPlan,
     tableConstraint: TableConstraint) extends AlterTableCommand {
+
   override def changes: Seq[TableChange] = {
     val constraint = tableConstraint.toV2Constraint
-    val validatedTableVersion = table match {
-      case t: ResolvedTable if constraint.enforced() =>
-        t.table.currentVersion()
+    // The table version is null because the constraint is not enforced.
+    Seq(TableChange.addConstraint(constraint, null))
+  }
+
+  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
+    copy(table = newChild)
+}
+
+/**
+ * The logical plan of the ALTER TABLE ... ADD CONSTRAINT command for Check constraints.
+ * It doesn't extend [[AlterTableCommand]] because its child is a filtered table scan rather than
+ * a table reference.
+ */
+case class AddCheckConstraint(
+    child: LogicalPlan,
+    checkConstraint: CheckConstraint) extends UnaryCommand {
+
+  def changes: Seq[TableChange] = {
+    val constraint = checkConstraint.toV2Constraint
+    val validatedTableVersion = child.find(_.isInstanceOf[DataSourceV2ScanRelation]) match {
+      case Some(d: DataSourceV2ScanRelation) if constraint.enforced() =>
+        d.relation.table.currentVersion()

Review Comment:
   Now that I think about it, I wonder whether we should expose the version on the scan, rather than on the table. It is not something to worry about in this PR, but we should settle it prior to 4.1. Let's discuss separately.
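
   Purely for discussion, one hypothetical shape for that (`SupportsReportVersion` and `scanVersion` are made-up names, not an existing DSv2 API):

   ```
   // Hypothetical: a scan that reports the table version it was planned against.
   trait SupportsReportVersion extends Scan {
     def scanVersion(): String
   }
   ```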



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala:
##########
@@ -290,23 +291,45 @@ case class AlterTableCollation(
 }
 
 /**
- * The logical plan of the ALTER TABLE ... ADD CONSTRAINT command.
+ * The logical plan of the ALTER TABLE ... ADD CONSTRAINT command for Primary Key, Foreign Key,
+ * and Unique constraints.
  */
 case class AddConstraint(
     table: LogicalPlan,
     tableConstraint: TableConstraint) extends AlterTableCommand {
+
   override def changes: Seq[TableChange] = {
     val constraint = tableConstraint.toV2Constraint
-    val validatedTableVersion = table match {
-      case t: ResolvedTable if constraint.enforced() =>
-        t.table.currentVersion()
+    // The table version is null because the constraint is not enforced.
+    Seq(TableChange.addConstraint(constraint, null))
+  }
+
+  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
+    copy(table = newChild)
+}
+
+/**
+ * The logical plan of the ALTER TABLE ... ADD CONSTRAINT command for Check constraints.
+ * It doesn't extend [[AlterTableCommand]] because its child is a filtered table scan rather than
+ * a table reference.
+ */
+case class AddCheckConstraint(
+    child: LogicalPlan,
+    checkConstraint: CheckConstraint) extends UnaryCommand {
+
+  def changes: Seq[TableChange] = {

Review Comment:
   Will this ever be a sequence or just one table change?
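
   For illustration, a single-change shape could look like this (a sketch; `collectFirst` stands in for the `find` + pattern match in the diff):

   ```
   def change: TableChange = {
     val constraint = checkConstraint.toV2Constraint
     // Pick up the version from the scan relation if the constraint is enforced.
     val version = child.collectFirst {
       case d: DataSourceV2ScanRelation if constraint.enforced() =>
         d.relation.table.currentVersion()
     }.orNull
     TableChange.addConstraint(constraint, version)
   }
   ```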



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala:
##########
@@ -5490,9 +5490,16 @@ class AstBuilder extends DataTypeAstBuilder
     withOrigin(ctx) {
      val tableConstraint = visitTableConstraintDefinition(ctx.tableConstraintDefinition())
       withIdentClause(ctx.identifierReference, identifiers => {
-        val table = UnresolvedTable(identifiers, "ALTER TABLE ... ADD CONSTRAINT")
         val namedConstraint = tableConstraint.withTableName(identifiers.last)
-        AddConstraint(table, namedConstraint)
+        namedConstraint match {
+          case c: CheckConstraint =>
+            val relation = createUnresolvedRelation(ctx.identifierReference())
+            val child = Filter(Not(c.child), relation)

Review Comment:
   What should happen if we pass a condition that gets optimized into true? Does it even make sense to pass such constraints to the connectors? What do other systems do?
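
   For a concrete case (hypothetical table `t`), the validation scan `Filter(Not(condition), relation)` folds away entirely:

   ```
   // The optimizer folds NOT(1 = 1) to FALSE and prunes the scan, so no rows
   // are ever checked and only the metadata change remains.
   spark.sql("ALTER TABLE t ADD CONSTRAINT trivially_true CHECK (1 = 1)")
   ```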



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala:
##########
@@ -5490,9 +5490,16 @@ class AstBuilder extends DataTypeAstBuilder
     withOrigin(ctx) {
      val tableConstraint = visitTableConstraintDefinition(ctx.tableConstraintDefinition())
       withIdentClause(ctx.identifierReference, identifiers => {
-        val table = UnresolvedTable(identifiers, "ALTER TABLE ... ADD CONSTRAINT")
         val namedConstraint = tableConstraint.withTableName(identifiers.last)
-        AddConstraint(table, namedConstraint)
+        namedConstraint match {
+          case c: CheckConstraint =>
+            val relation = createUnresolvedRelation(ctx.identifierReference())

Review Comment:
   Minor: Omit `()` like above for consistency?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala:
##########
@@ -290,23 +291,45 @@ case class AlterTableCollation(
 }
 
 /**
- * The logical plan of the ALTER TABLE ... ADD CONSTRAINT command.
+ * The logical plan of the ALTER TABLE ... ADD CONSTRAINT command for Primary Key, Foreign Key,
+ * and Unique constraints.
  */
 case class AddConstraint(
     table: LogicalPlan,
     tableConstraint: TableConstraint) extends AlterTableCommand {
+
   override def changes: Seq[TableChange] = {
     val constraint = tableConstraint.toV2Constraint
-    val validatedTableVersion = table match {
-      case t: ResolvedTable if constraint.enforced() =>
-        t.table.currentVersion()
+    // The table version is null because the constraint is not enforced.
+    Seq(TableChange.addConstraint(constraint, null))
+  }
+
+  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
+    copy(table = newChild)
+}
+
+/**
+ * The logical plan of the ALTER TABLE ... ADD CONSTRAINT command for Check constraints.
+ * It doesn't extend [[AlterTableCommand]] because its child is a filtered table scan rather than
+ * a table reference.
+ */
+case class AddCheckConstraint(
+    child: LogicalPlan,
+    checkConstraint: CheckConstraint) extends UnaryCommand {
+
+  def changes: Seq[TableChange] = {

Review Comment:
   Having `changes` seems very fragile, as the output depends on whether planning has already happened.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala:
##########
@@ -290,23 +291,45 @@ case class AlterTableCollation(
 }
 
 /**
- * The logical plan of the ALTER TABLE ... ADD CONSTRAINT command.
+ * The logical plan of the ALTER TABLE ... ADD CONSTRAINT command for Primary Key, Foreign Key,
+ * and Unique constraints.
  */
 case class AddConstraint(
     table: LogicalPlan,
     tableConstraint: TableConstraint) extends AlterTableCommand {
+
   override def changes: Seq[TableChange] = {
     val constraint = tableConstraint.toV2Constraint
-    val validatedTableVersion = table match {
-      case t: ResolvedTable if constraint.enforced() =>
-        t.table.currentVersion()
+    // The table version is null because the constraint is not enforced.
+    Seq(TableChange.addConstraint(constraint, null))
+  }
+
+  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
+    copy(table = newChild)
+}
+
+/**
+ * The logical plan of the ALTER TABLE ... ADD CONSTRAINT command for Check constraints.
+ * It doesn't extend [[AlterTableCommand]] because its child is a filtered table scan rather than
+ * a table reference.
+ */
+case class AddCheckConstraint(
+    child: LogicalPlan,
+    checkConstraint: CheckConstraint) extends UnaryCommand {
+
+  def changes: Seq[TableChange] = {

Review Comment:
   Do we actually need `changes` here? We will have access to `DataSourceV2ScanRelation` in the strategy. So I wonder whether we can have something like below.
   
   ```
   case a @ AddCheckConstraint(PhysicalOperation(_, _, d: DataSourceV2ScanRelation), check) =>
     assert(d.relation.catalog.isDefined, "Catalog should be defined after analysis")
     assert(d.relation.identifier.isDefined, "Identifier should be defined after analysis")
     val catalog = d.relation.catalog.get.asTableCatalog
     val ident = d.relation.identifier.get
     val condition = a.checkConstraint.condition
     val change = TableChange.addConstraint(
       check.toV2Constraint,
       d.relation.table.currentVersion)
     ResolveTableConstraints.validateCatalogForTableChange(Seq(change), catalog, ident)
     AddCheckConstraintExec(catalog, ident, change, condition, planLater(a.child)) :: Nil
   ```
   
   Then `AddCheckConstraint` becomes trivial.
   
   ```
   case class AddCheckConstraint(
       child: LogicalPlan,
       checkConstraint: CheckConstraint) extends UnaryCommand {
   
     override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
       copy(child = newChild)
   }
   ```
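
   For context, `PhysicalOperation` destructures a plan into projections, filters, and the underlying leaf, so the match above should still fire after the optimizer rewrites the validation query around the scan.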



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala:
##########
@@ -290,23 +291,45 @@ case class AlterTableCollation(
 }
 
 /**
- * The logical plan of the ALTER TABLE ... ADD CONSTRAINT command.
+ * The logical plan of the ALTER TABLE ... ADD CONSTRAINT command for Primary Key, Foreign Key,
+ * and Unique constraints.
  */
 case class AddConstraint(
     table: LogicalPlan,
     tableConstraint: TableConstraint) extends AlterTableCommand {
+
   override def changes: Seq[TableChange] = {
     val constraint = tableConstraint.toV2Constraint
-    val validatedTableVersion = table match {
-      case t: ResolvedTable if constraint.enforced() =>
-        t.table.currentVersion()
+    // The table version is null because the constraint is not enforced.
+    Seq(TableChange.addConstraint(constraint, null))
+  }
+
+  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
+    copy(table = newChild)
+}
+
+/**
+ * The logical plan of the ALTER TABLE ... ADD CONSTRAINT command for Check constraints.
+ * It doesn't extend [[AlterTableCommand]] because its child is a filtered table scan rather than
+ * a table reference.
+ */
+case class AddCheckConstraint(
+    child: LogicalPlan,
+    checkConstraint: CheckConstraint) extends UnaryCommand {
+
+  def changes: Seq[TableChange] = {
+    val constraint = checkConstraint.toV2Constraint
+    val validatedTableVersion = child.find(_.isInstanceOf[DataSourceV2ScanRelation]) match {

Review Comment:
   What should happen if we pass a condition that gets optimized into true? Does it even make sense to pass such constraints to the connectors? What do other systems do?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

