cloud-fan commented on code in PR #49386: URL: https://github.com/apache/spark/pull/49386#discussion_r1909797114
########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala: ########## @@ -712,6 +717,109 @@ case class UnresolvedStarExceptOrReplace( } } +/** + * Represents some of the input attributes to a given relational operator, for example in + * `df.withColumn`. + * + * @param target an optional name that should be the target of the expansion. If omitted all + * targets' columns are produced. This can only be a table name. This + * is a list of identifiers that is the path of the expansion. + * + * @param colNames a list of column names that should be replaced or produced. + * + * @param exprs a list of expressions that should be used to replace the expressions removed by. + * + * @param explicitMetadata an optional list of explicit metadata to associate with the columns. + */ +case class UnresolvedStarWithColumns( + target: Option[Seq[String]], + colNames: Seq[String], + exprs: Seq[Expression], + explicitMetadata: Option[Seq[Metadata]] = None) + extends UnresolvedStarBase { + + override def children: Seq[Expression] = exprs + + override protected def withNewChildrenInternal( + newChildren: IndexedSeq[Expression]): UnresolvedStarWithColumns = + copy(exprs = newChildren) + + override def expand(input: LogicalPlan, resolver: Resolver): Seq[NamedExpression] = { + assert(colNames.size == exprs.size, + s"The size of column names: ${colNames.size} isn't equal to " + + s"the size of expressions: ${exprs.size}") + explicitMetadata.foreach { m => + assert(colNames.size == m.size, + s"The size of column names: ${colNames.size} isn't equal to " + + s"the size of metadata elements: ${m.size}") + } + + SchemaUtils.checkColumnNameDuplication(colNames, resolver) + + val expandedCols = super.expand(input, resolver) + + val columnSeq = explicitMetadata match { + case Some(ms) => colNames.zip(exprs).zip(ms.map(Some(_))) + case _ => colNames.zip(exprs).map((_, None)) + } + + val replacedAndExistingColumns = expandedCols.map { field => + columnSeq.find { 
case ((colName, _), _) => + resolver(field.name, colName) + } match { + case Some(((colName, expr), m)) => Alias(expr, colName)(explicitMetadata = m) + case _ => field + } + } + + val newColumns = columnSeq.filter { case ((colName, _), _) => + !expandedCols.exists(f => resolver(f.name, colName)) + }.map { + case ((colName, expr), m) => Alias(expr, colName)(explicitMetadata = m) + } + + replacedAndExistingColumns ++ newColumns + } +} + +/** + * Represents some of the input attributes to a given relational operator, for example in + * `df.withColumnRenamed`. + * + * @param target an optional name that should be the target of the expansion. If omitted all + * targets' columns are produced. This can only be a table name. This + * is a list of identifiers that is the path of the expansion. + * + * @param existingNames a list of column names that should be replaced. Review Comment: shall we mention that if the column does not exist, we don't replace and just leave it? ########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala: ########## @@ -712,6 +717,109 @@ case class UnresolvedStarExceptOrReplace( } } +/** + * Represents some of the input attributes to a given relational operator, for example in + * `df.withColumn`. + * + * @param target an optional name that should be the target of the expansion. If omitted all + * targets' columns are produced. This can only be a table name. This + * is a list of identifiers that is the path of the expansion. + * + * @param colNames a list of column names that should be replaced or produced. + * + * @param exprs a list of expressions that should be used to replace the expressions removed by. + * + * @param explicitMetadata an optional list of explicit metadata to associate with the columns. 
+ */ +case class UnresolvedStarWithColumns( + target: Option[Seq[String]], + colNames: Seq[String], + exprs: Seq[Expression], + explicitMetadata: Option[Seq[Metadata]] = None) + extends UnresolvedStarBase { + + override def children: Seq[Expression] = exprs + + override protected def withNewChildrenInternal( + newChildren: IndexedSeq[Expression]): UnresolvedStarWithColumns = + copy(exprs = newChildren) + + override def expand(input: LogicalPlan, resolver: Resolver): Seq[NamedExpression] = { + assert(colNames.size == exprs.size, + s"The size of column names: ${colNames.size} isn't equal to " + + s"the size of expressions: ${exprs.size}") + explicitMetadata.foreach { m => + assert(colNames.size == m.size, + s"The size of column names: ${colNames.size} isn't equal to " + + s"the size of metadata elements: ${m.size}") + } + + SchemaUtils.checkColumnNameDuplication(colNames, resolver) + + val expandedCols = super.expand(input, resolver) + + val columnSeq = explicitMetadata match { + case Some(ms) => colNames.zip(exprs).zip(ms.map(Some(_))) + case _ => colNames.zip(exprs).map((_, None)) + } + + val replacedAndExistingColumns = expandedCols.map { field => + columnSeq.find { case ((colName, _), _) => + resolver(field.name, colName) + } match { + case Some(((colName, expr), m)) => Alias(expr, colName)(explicitMetadata = m) + case _ => field + } + } + + val newColumns = columnSeq.filter { case ((colName, _), _) => + !expandedCols.exists(f => resolver(f.name, colName)) + }.map { + case ((colName, expr), m) => Alias(expr, colName)(explicitMetadata = m) + } + + replacedAndExistingColumns ++ newColumns + } +} + +/** + * Represents some of the input attributes to a given relational operator, for example in + * `df.withColumnRenamed`. + * + * @param target an optional name that should be the target of the expansion. If omitted all + * targets' columns are produced. This can only be a table name. This + * is a list of identifiers that is the path of the expansion. 
+ * + * @param existingNames a list of column names that should be replaced. + * + * @param newNames a list of new column names that should be used to replace the existing columns. + */ +case class UnresolvedStarWithColumnsRenames( + target: Option[Seq[String]], Review Comment: Ditto: the `target` parameter is not used. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org