cloud-fan commented on code in PR #49386:
URL: https://github.com/apache/spark/pull/49386#discussion_r1909797114


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala:
##########
@@ -712,6 +717,109 @@ case class UnresolvedStarExceptOrReplace(
   }
 }
 
+/**
+ * Represents some of the input attributes to a given relational operator, for 
example in
+ * `df.withColumn`.
+ *
+ * @param target an optional name that should be the target of the expansion. 
If omitted all
+ *              targets' columns are produced. This can only be a table name. 
This
+ *              is a list of identifiers that is the path of the expansion.
+ *
+ * @param colNames a list of column names that should be replaced or produced.
+ *
+ * @param exprs a list of expressions that should be used to replace the 
expressions removed by.
+ *
+ * @param explicitMetadata an optional list of explicit metadata to associate 
with the columns.
+ */
+case class UnresolvedStarWithColumns(
+     target: Option[Seq[String]],
+     colNames: Seq[String],
+     exprs: Seq[Expression],
+     explicitMetadata: Option[Seq[Metadata]] = None)
+  extends UnresolvedStarBase {
+
+  override def children: Seq[Expression] = exprs
+
+  override protected def withNewChildrenInternal(
+      newChildren: IndexedSeq[Expression]): UnresolvedStarWithColumns =
+    copy(exprs = newChildren)
+
+  override def expand(input: LogicalPlan, resolver: Resolver): 
Seq[NamedExpression] = {
+    assert(colNames.size == exprs.size,
+      s"The size of column names: ${colNames.size} isn't equal to " +
+        s"the size of expressions: ${exprs.size}")
+    explicitMetadata.foreach { m =>
+      assert(colNames.size == m.size,
+        s"The size of column names: ${colNames.size} isn't equal to " +
+          s"the size of metadata elements: ${m.size}")
+    }
+
+    SchemaUtils.checkColumnNameDuplication(colNames, resolver)
+
+    val expandedCols = super.expand(input, resolver)
+
+    val columnSeq = explicitMetadata match {
+      case Some(ms) => colNames.zip(exprs).zip(ms.map(Some(_)))
+      case _ => colNames.zip(exprs).map((_, None))
+    }
+
+    val replacedAndExistingColumns = expandedCols.map { field =>
+      columnSeq.find { case ((colName, _), _) =>
+        resolver(field.name, colName)
+      } match {
+        case Some(((colName, expr), m)) => Alias(expr, 
colName)(explicitMetadata = m)
+        case _ => field
+      }
+    }
+
+    val newColumns = columnSeq.filter { case ((colName, _), _) =>
+      !expandedCols.exists(f => resolver(f.name, colName))
+    }.map {
+      case ((colName, expr), m) => Alias(expr, colName)(explicitMetadata = m)
+    }
+
+    replacedAndExistingColumns ++ newColumns
+  }
+}
+
+/**
+ * Represents some of the input attributes to a given relational operator, for 
example in
+ * `df.withColumnRenamed`.
+ *
+ * @param target an optional name that should be the target of the expansion. 
If omitted all
+ *              targets' columns are produced. This can only be a table name. 
This
+ *              is a list of identifiers that is the path of the expansion.
+ *
+ * @param existingNames a list of column names that should be replaced.

Review Comment:
   Shall we mention that if the column does not exist, we don't replace 
anything and just leave it as is?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala:
##########
@@ -712,6 +717,109 @@ case class UnresolvedStarExceptOrReplace(
   }
 }
 
+/**
+ * Represents some of the input attributes to a given relational operator, for 
example in
+ * `df.withColumn`.
+ *
+ * @param target an optional name that should be the target of the expansion. 
If omitted all
+ *              targets' columns are produced. This can only be a table name. 
This
+ *              is a list of identifiers that is the path of the expansion.
+ *
+ * @param colNames a list of column names that should be replaced or produced.
+ *
+ * @param exprs a list of expressions that should be used to replace the 
expressions removed by.
+ *
+ * @param explicitMetadata an optional list of explicit metadata to associate 
with the columns.
+ */
+case class UnresolvedStarWithColumns(
+     target: Option[Seq[String]],
+     colNames: Seq[String],
+     exprs: Seq[Expression],
+     explicitMetadata: Option[Seq[Metadata]] = None)
+  extends UnresolvedStarBase {
+
+  override def children: Seq[Expression] = exprs
+
+  override protected def withNewChildrenInternal(
+      newChildren: IndexedSeq[Expression]): UnresolvedStarWithColumns =
+    copy(exprs = newChildren)
+
+  override def expand(input: LogicalPlan, resolver: Resolver): 
Seq[NamedExpression] = {
+    assert(colNames.size == exprs.size,
+      s"The size of column names: ${colNames.size} isn't equal to " +
+        s"the size of expressions: ${exprs.size}")
+    explicitMetadata.foreach { m =>
+      assert(colNames.size == m.size,
+        s"The size of column names: ${colNames.size} isn't equal to " +
+          s"the size of metadata elements: ${m.size}")
+    }
+
+    SchemaUtils.checkColumnNameDuplication(colNames, resolver)
+
+    val expandedCols = super.expand(input, resolver)
+
+    val columnSeq = explicitMetadata match {
+      case Some(ms) => colNames.zip(exprs).zip(ms.map(Some(_)))
+      case _ => colNames.zip(exprs).map((_, None))
+    }
+
+    val replacedAndExistingColumns = expandedCols.map { field =>
+      columnSeq.find { case ((colName, _), _) =>
+        resolver(field.name, colName)
+      } match {
+        case Some(((colName, expr), m)) => Alias(expr, 
colName)(explicitMetadata = m)
+        case _ => field
+      }
+    }
+
+    val newColumns = columnSeq.filter { case ((colName, _), _) =>
+      !expandedCols.exists(f => resolver(f.name, colName))
+    }.map {
+      case ((colName, expr), m) => Alias(expr, colName)(explicitMetadata = m)
+    }
+
+    replacedAndExistingColumns ++ newColumns
+  }
+}
+
+/**
+ * Represents some of the input attributes to a given relational operator, for 
example in
+ * `df.withColumnRenamed`.
+ *
+ * @param target an optional name that should be the target of the expansion. 
If omitted all
+ *              targets' columns are produced. This can only be a table name. 
This
+ *              is a list of identifiers that is the path of the expansion.
+ *
+ * @param existingNames a list of column names that should be replaced.
+ *
+ * @param newNames a list of new column names that should be used to replace 
the existing columns.
+ */
+case class UnresolvedStarWithColumnsRenames(
+    target: Option[Seq[String]],

Review Comment:
   Ditto: `target` is not used.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to