MaxGekk commented on code in PR #49658:
URL: https://github.com/apache/spark/pull/49658#discussion_r1940855014


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ExpressionResolver.scala:
##########
@@ -199,16 +279,97 @@ class ExpressionResolver(
    * from the [[Resolver]] during [[Project]] resolution.
    *
    * The output sequence can be larger than the input sequence due to 
[[UnresolvedStar]] expansion.
+   *
+   * @returns The list of resolved expressions along with flags indicating whether the resolved

Review Comment:
   ```suggestion
      * @return The list of resolved expressions along with flags indicating whether the resolved
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ExpressionResolutionContext.scala:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis.resolver
+
+/**
+ * The [[ExpressionResolutionContext]] is a state that is propagated between 
the nodes of the
+ * expression tree during the bottom-up expression resolution process. This 
way we pass the results
+ * of [[ExpressionResolver.resolve]] call, which are not the resolved child 
itself, from children
+ * to parents.
+ *
+ * @hasAggregateExpressionsInASubtree A flag that highlights that a specific 
node corresponding to
+ *                                    [[ExpressionResolutionContext]] has 
aggregate expressions in
+ *                                    its subtree.
+ * @hasAttributeInASubtree A flag that highlights that a specific node 
corresponding to
+ *                         [[ExpressionResolutionContext]] has attributes in 
its subtree.
+ * @hasLateralColumnAlias A flag that highlights that a specific node corresponding to

Review Comment:
   ```suggestion
    * @param hasLateralColumnAlias A flag that highlights that a specific node corresponding to
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ExpressionResolutionContext.scala:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis.resolver
+
+/**
+ * The [[ExpressionResolutionContext]] is a state that is propagated between 
the nodes of the
+ * expression tree during the bottom-up expression resolution process. This 
way we pass the results
+ * of [[ExpressionResolver.resolve]] call, which are not the resolved child 
itself, from children
+ * to parents.
+ *
+ * @hasAggregateExpressionsInASubtree A flag that highlights that a specific 
node corresponding to
+ *                                    [[ExpressionResolutionContext]] has 
aggregate expressions in
+ *                                    its subtree.
+ * @hasAttributeInASubtree A flag that highlights that a specific node corresponding to

Review Comment:
   ```suggestion
    * @param hasAttributeInASubtree A flag that highlights that a specific node corresponding to
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ExpressionResolutionContext.scala:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis.resolver
+
+/**
+ * The [[ExpressionResolutionContext]] is a state that is propagated between 
the nodes of the
+ * expression tree during the bottom-up expression resolution process. This 
way we pass the results
+ * of [[ExpressionResolver.resolve]] call, which are not the resolved child 
itself, from children
+ * to parents.
+ *
+ * @hasAggregateExpressionsInASubtree A flag that highlights that a specific node corresponding to

Review Comment:
   ```suggestion
    * @param hasAggregateExpressionsInASubtree A flag that highlights that a specific node corresponding to
   ```
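
For reference, the Scaladoc tag suggestions in this review (`@param name ...` for constructor parameters, `@return` rather than `@returns`) could look roughly like the sketch below once applied. `ExampleResolutionContext` and its `merge` method are hypothetical stand-ins, not the class from the PR, and the `Boolean` field types are an assumption based on the doc calling them flags.

```scala
/**
 * Hypothetical stand-in for the context class under review (illustration only).
 * The Boolean field types are assumed, because the original doc calls them flags.
 *
 * @param hasAggregateExpressionsInASubtree whether the subtree has aggregate expressions
 * @param hasAttributeInASubtree whether the subtree has attributes
 * @param hasLateralColumnAlias whether the subtree has a lateral column alias
 */
final case class ExampleResolutionContext(
    hasAggregateExpressionsInASubtree: Boolean = false,
    hasAttributeInASubtree: Boolean = false,
    hasLateralColumnAlias: Boolean = false) {

  /**
   * Combines this context with the context of a sibling subtree.
   * This method is hypothetical and exists only to show the `@return` tag.
   *
   * @param other the context produced for a sibling subtree
   * @return a context whose flags are the logical OR of both inputs
   */
  def merge(other: ExampleResolutionContext): ExampleResolutionContext =
    ExampleResolutionContext(
      hasAggregateExpressionsInASubtree =
        hasAggregateExpressionsInASubtree || other.hasAggregateExpressionsInASubtree,
      hasAttributeInASubtree = hasAttributeInASubtree || other.hasAttributeInASubtree,
      hasLateralColumnAlias = hasLateralColumnAlias || other.hasLateralColumnAlias
    )
}
```

Scaladoc only recognizes `@param` followed by the parameter name; a tag such as `@hasAttributeInASubtree` is not a known tag and the description would not be attached to the parameter.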



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ExpressionIdAssigner.scala:
##########
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis.resolver
+
+import java.util.{ArrayDeque, HashMap, HashSet}
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.expressions.{
+  Alias,
+  Attribute,
+  AttributeReference,
+  ExprId,
+  NamedExpression
+}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+
+/**
+ * [[ExpressionIdAssigner]] is used by the [[ExpressionResolver]] to assign 
unique expression IDs to
+ * [[NamedExpression]]s ([[AttributeReference]]s and [[Alias]]es). This is 
necessary to ensure
+ * that Optimizer performs its work correctly and does not produce correctness 
issues.
+ *
+ * The framework works the following way:
+ *  - Each leaf operator must have unique output IDs (even if it's the same 
table, view, or CTE).
+ *  - The [[AttributeReference]]s get propagated "upwards" through the 
operator tree with their IDs
+ *    preserved.
+ *  - Each [[Alias]] gets assigned a new unique ID and it sticks with it after 
it gets converted to
+ *    an [[AttributeReference]] when it is outputted from the operator that 
produced it.
+ *  - Any operator may have [[AttributeReference]]s with the same IDs in its 
output given it is the
+ *    same attribute.
+ * Thus, **no multi-child operator may have children with conflicting 
[[AttributeReference]] IDs**.
+ * In other words, two subtrees must not output the [[AttributeReference]]s 
with the same IDs, since
+ * relations, views and CTEs all output unique attributes, and [[Alias]]es get 
assigned new IDs as
+ * well. [[ExpressionIdAssigner.assertOutputsHaveNoConflictingExpressionIds]] 
is used to assert this
+ * invariant.
+ *
+ * For SQL queries, this framework provides correctness just by reallocating 
relation outputs and
+ * by validating the invariants mentioned above. Reallocation is done in
+ * [[Resolver.handleLeafOperator]]. If all the relations (even if it's the 
same table) have unique
+ * output IDs, the expression ID assignment will be correct, because there are 
no duplicate IDs in
+ * a pure unresolved tree. The old ID -> new ID mapping is not needed in this 
case.
+ * For example, consider this query:
+ *
+ * {{{
+ * SELECT * FROM t AS t1 CROSS JOIN t AS t2 ON t1.col1 = t2.col1
+ * }}}
+ *
+ * The analyzed plan should be:
+ * {{{
+ * Project [col1#0, col2#1, col1#2, col2#3]
+ * +- Join Cross, (col1#0 = col1#2)
+ *    :- SubqueryAlias t1
+ *    :   +- Relation t[col1#0,col2#1] parquet
+ *    +- SubqueryAlias t2
+ *        +- Relation t[col1#2,col2#3] parquet
+ * }}}
+ *
+ * and not:
+ * {{{
+ * Project [col1#0, col2#1, col1#0, col2#1]
+ * +- Join Cross, (col1#0 = col1#0)
+ *    :- SubqueryAlias t1
+ *    :   +- Relation t[col1#0,col2#1] parquet
+ *    +- SubqueryAlias t2
+ *        +- Relation t[col1#0,col2#1] parquet
+ * }}}
+ *
+ * Because in the latter case the join condition is always true.
+ *
+ * For DataFrame programs we need the full power of [[ExpressionIdAssigner]], 
and old ID -> new ID
+ * mapping comes in handy, because DataFrame programs pass _partially_ 
resolved plans to the
+ * [[Resolver]], which may consist of duplicate subtrees, and thus will have 
already assigned
+ * expression IDs. These already resolved dupliciate subtrees with assigned 
IDs will conflict.
+ * Hence, we need to reallocate all the leaf node outputs _and_ remap old IDs 
to the new ones.
+ * Also, DataFrame programs may introduce the same [[Alias]]es in different 
parts of the query plan,
+ * so we just reallocate all the [[Alias]]es.
+ *
+ * For example, consider this DataFrame program:
+ *
+ * {{{
+ * spark.range(0, 10).select($"id").write.format("parquet").saveAsTable("t")
+ * val alias = ($"id" + 1).as("id")
+ * spark.table("t").select(alias).select(alias)
+ * }}}
+ *
+ * The analyzed plan should be:
+ * {{{
+ * Project [(id#6L + cast(1 as bigint)) AS id#13L]
+ * +- Project [(id#4L + cast(1 as bigint)) AS id#6L]
+ *    +- SubqueryAlias spark_catalog.default.t
+ *       +- Relation spark_catalog.default.t[id#4L] parquet
+ * }}}
+ *
+ * and not:
+ * {{{
+ * Project [(id#6L + cast(1 as bigint)) AS id#6L]
+ * +- Project [(id#4L + cast(1 as bigint)) AS id#6L]
+ *    +- SubqueryAlias spark_catalog.default.t
+ *       +- Relation spark_catalog.default.t[id#4L] parquet
+ * }}}
+ *
+ * Because the latter case will confuse the Optimizer and the top [[Project]] 
will be eliminated
+ * leading to incorrect result.
+ *
+ * There's an important caveat here: the leftmost branch of a logical plan 
tree. In this branch we
+ * need to preserve the expression IDs wherever possible because DataFrames 
may reference each other
+ * using their attributes. This also makes sense for performance reasons.
+ *
+ * Consider this example:
+ *
+ * {{{
+ * val df1 = spark.range(0, 10).select($"id")
+ * val df2 = spark.range(5, 15).select($"id")
+ * df1.union(df2).filter(df1("id") === 5)
+ * }}}
+ *
+ * In this example `df("id")` references lower `id` attribute by expression 
ID, so `union` must not
+ * reassign expression IDs in `df1` (left child). Referencing `df2` (right 
child) is not supported
+ * in Spark.
+ *
+ * The [[ExpressionIdAssigner]] covers both SQL and DataFrame scenarios with 
single approach and is
+ * integrated in the single-pass analysis framework.
+ *
+ * The [[ExpressionIdAssigner]] is used in the following way:
+ *  - When the [[Resolver]] traverses the tree downwards prior to starting 
bottom-up analysis,
+ *    we build the [[mappingStack]] by calling [[withNewMapping]] (i.e. 
[[mappingStack.push]])
+ *    for every child of a multi-child operator, so we have a separate stack 
entry (separate
+ *    mapping) for each branch. This way sibling branches' mappings are 
isolated from each other and
+ *    attribute IDs are reused only within the same branch. Initially we push 
`None`, because
+ *    the mapping needs to be initialized later with the correct output of a 
resolved operator.
+ *  - When the bottom-up analysis starts, we assign IDs to all the 
[[NamedExpression]]s which are
+ *    present in operators starting from the [[LeafNode]]s using 
[[mapExpression]].
+ *    [[createMapping]] is called right after each [[LeafNode]] is resolved, 
and first remapped
+ *    attributes come from that [[LeafNode]]. This is done in 
[[Resolver.handleLeafOperator]] for
+ *    each logical plan tree branch except the leftmost.
+ *  - Once the child branch is resolved, [[withNewMapping]] ends by calling 
[[mappingStack.pop]].
+ *  - After the multi-child operator is resolved, we call [[createMapping]] to
+ *    initialize the mapping with attributes _chosen_ (e.g. 
[[Union.mergeChildOutputs]]) by that
+ *    operator's resolution algorithm and remap _old_ expression IDs to those 
chosen attributes.
+ *  - Continue remapping expressions until we reach the root of the operator 
tree.
+ */
+class ExpressionIdAssigner {
+  private val mappingStack = new ExpressionIdAssigner.Stack
+  mappingStack.push(ExpressionIdAssigner.StackEntry(isLeftmostBranch = true))
+
+  /**
+   * Returns `true` if the current logical plan branch is the leftmost branch. 
This is important
+   * in the context of preserving expression IDs in DataFrames. See class doc 
for more details.
+   */
+  def isLeftmostBranch: Boolean = mappingStack.peek().isLeftmostBranch
+
+  /**
+   * A RAII-wrapper for [[mappingStack.push]] and [[mappingStack.pop]]. 
[[Resolver]] uses this for
+   * every child of a multi-child operator to ensure that each operator branch 
uses an isolated
+   * expression ID mapping.
+   *
+   * @param isLeftmostChild whether the current child is the leftmost child of 
the operator that is
+   *   being resolved. This is used to determine whether the new stack entry 
is gonna be in the
+   *   leftmost logical plan branch. It's `false` by default, because it's 
safer to remap attributes
+   *   than to leave duplicates (to prevent correctness issues).
+   */
+  def withNewMapping[R](isLeftmostChild: Boolean = false)(body: => R): R = {
+    mappingStack.push(
+      ExpressionIdAssigner.StackEntry(
+        isLeftmostBranch = isLeftmostChild && isLeftmostBranch
+      )
+    )
+    try {
+      body
+    } finally {
+      mappingStack.pop()
+    }
+  }
+
+  /**
+   * Create mapping with the given `newOutput` that rewrites the `oldOutput`. 
This
+   * is used by the [[Resolver]] after the multi-child operator is resolved to 
fill the current
+   * mapping with the attributes _chosen_ by that operator's resolution 
algorithm and remap _old_
+   * expression IDs to those chosen attributes. It's also used by the 
[[ExpressionResolver]] right
+   * before remapping the attributes of a [[LeafNode]].
+   *
+   * `oldOutput` is present for already resolved subtrees (e.g. DataFrames), 
but for SQL queries
+   * is will be `None`, because that logical plan is analyzed for the first 
time.
+   */
+  def createMapping(
+      newOutput: Seq[Attribute] = Seq.empty,
+      oldOutput: Option[Seq[Attribute]] = None): Unit = {
+    if (mappingStack.peek().mapping.isDefined) {
+      throw SparkException.internalError(
+        s"Attempt to overwrite existing mapping. New output: $newOutput, old 
output: $oldOutput"
+      )
+    }
+
+    val newMapping = new ExpressionIdAssigner.Mapping
+    oldOutput match {
+      case Some(oldOutput) =>
+        if (newOutput.length != oldOutput.length) {
+          throw SparkException.internalError(
+            s"Outputs have different lengths. New output: $newOutput, old 
output: $oldOutput"
+          )
+        }
+
+        newOutput.zip(oldOutput).foreach {
+          case (newAttribute, oldAttribute) =>
+            newMapping.put(oldAttribute.exprId, newAttribute.exprId)
+            newMapping.put(newAttribute.exprId, newAttribute.exprId)
+        }
+      case None =>
+        newOutput.foreach { newAttribute =>
+          newMapping.put(newAttribute.exprId, newAttribute.exprId)
+        }
+    }
+
+    mappingStack.push(mappingStack.pop().copy(mapping = Some(newMapping)))
+  }
+
+  /**
+   * Assign a correct ID to the given [[originalExpression]] and return a new 
instance of that
+   * expression, or return a corresponding new instance of the same attribute, 
that was previously
+   * reallocated and is present in the current [[mappingStack]] entry.
+   *
+   * For [[Alias]]es: Try to preserve them if we are in the leftmost logical 
plan tree branch and
+   * unless they conflict. Conflicting [[Alias]] IDs are never acceptable. 
Otherwise, reallocate
+   * with a new ID and return that instance.
+   *
+   * For [[AttributeReference]]s: If the attribute is present in the current 
[[mappingStack]] entry,
+   * return that instance, otherwise reallocate with a new ID and return that 
instance. The mapping
+   * is done both from the original expression ID _and_ from the new 
expression ID - this way we are
+   * able to replace old references to that attribute in the current operator 
branch, and preserve
+   * already reallocated attributes to make this call idempotent.
+   *
+   * When remapping the provided expressions, we don't replace them with the 
previously seen
+   * attributes, but replace their IDs ([[NamedExpression.withExprId]]). This 
is done to preserve
+   * the properties of attributes at a certain point in the query plan. 
Examples where it's
+   * important:
+   *
+   * 1) Preserve the name case. In Spark the "requested" name takes precedence 
over the "original"
+   * name:
+   *
+   * {{{
+   * -- The output schema is [col1, COL1]
+   * SELECT col1, COL1 FROM VALUES (1);
+   * }}}
+   *
+   * 2) Preserve the metadata:
+   *
+   * {{{
+   * // Metadata "m1" remains, "m2" gets overwritten by the specified schema, 
"m3" is newly added.
+   * val metadata1 = new MetadataBuilder().putString("m1", 
"1").putString("m2", "2").build()
+   * val metadata2 = new MetadataBuilder().putString("m2", 
"3").putString("m3", "4").build()
+   * val schema = new StructType().add("a", IntegerType, nullable = true, 
metadata = metadata2)
+   * val df =
+   *   spark.sql("SELECT col1 FROM VALUES (1)").select(col("col1").as("a", 
metadata1)).to(schema)
+   * }}}
+   */
+  def mapExpression(originalExpression: NamedExpression): NamedExpression = {
+    if (!mappingStack.peek().mapping.isDefined) {

Review Comment:
   nit:
   ```suggestion
       if (mappingStack.peek().mapping.isEmpty) {
   ```
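
A minimal sketch of why the two spellings are interchangeable: on a Scala `Option`, `isEmpty` is the negation of `isDefined`, so the suggestion changes readability only. The `describe` helper and the `Map[Long, Long]` payload are hypothetical and stand in for the real mapping type.

```scala
object OptionEmptinessSketch {
  // Hypothetical helper: the Map[Long, Long] payload stands in for the real mapping type.
  def describe(mapping: Option[Map[Long, Long]]): String =
    if (mapping.isEmpty) {
      "uninitialized"
    } else {
      "initialized"
    }

  // For any Option, `!opt.isDefined` and `opt.isEmpty` evaluate to the same Boolean.
  def sameResult(opt: Option[Map[Long, Long]]): Boolean =
    (!opt.isDefined) == opt.isEmpty
}
```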



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/CteScope.scala:
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis.resolver
+
+import java.util.{ArrayDeque, ArrayList}
+
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.sql.catalyst.plans.logical.CTERelationDef
+
+/**
+ * The [[CteScope]] is responsible for keeping track of visible and known CTE 
definitions at a given
+ * stage of a SQL query/DataFrame program resolution. These scopes are stacked 
and the stack is
+ * managed by the [[CteRegistry]]. The scope is created per single WITH clause.
+ *
+ * The CTE operators are:
+ *  - [[UnresolvedWith]]. This is a `host` operator that contains a list of 
unresolved CTE
+ *    definitions from the WITH clause and a single child operator, which is 
the actual unresolved
+ *    SELECT query.
+ *  - [[UnresolvedRelation]]. This is a generic unresolved relation operator 
that will sometimes
+ *    be resolved to a CTE definition and later replaced with a 
[[CTERelationRef]]. The CTE takes
+ *    precedence over a regular table or a view when resolving this identifier.
+ *  - [[CTERelationDef]]. This is a reusable logical plan, which will later be 
referenced by the
+ *    lower CTE definitions and [[UnresolvedWith]] child.
+ *  - [[CTERelationRef]]. This is a leaf node similar to a relation operator 
that references a
+ *    certain [[CTERelationDef]] by its ID. It has a name (unique locally for 
a WITH clause list)
+ *    and an ID (unique for all the CTEs in a query).
+ *  - [[WithCTE]]. This is a `host` operator that contains a list of resolved 
CTE definitions from
+ *    the WITH clause and a single child operator, which is the actual 
resolved SELECT query.
+ *
+ * The task of the [[Resolver]] is to correctly place [[WithCTE]] with 
[[CTERelationDef]]s inside
+ * and make sure that [[CTERelationRef]]s correctly reference 
[[CTERelationDef]]s with their IDs.
+ * The decision whether to inline those CTE subtrees or not is made by the 
[[Optimizer]], unlike
+ * what Spark does for the [[View]]s (always inline during the analysis).
+ *
+ * There are some caveats in how Spark places those operators and resolves 
their names:
+ *  - Ambiguous CTE definition names are disallowed only within a single WITH 
clause, and this is
+ *    validated by the Parser in [[AstBuilder]]
+ *    using [[QueryParsingErrors.duplicateCteDefinitionNamesError]]:
+ *
+ *    {{{
+ *    -- This is disallowed.
+ *    WITH cte AS (SELECT 1),
+ *    cte AS (SELECT 2)
+ *    SELECT * FROM cte;
+ *    }}}
+ *
+ *  - When [[UnresolvedRelation]] identifier is resolved to a 
[[CTERelationDef]] and there is a
+ *    name conflict on several layers of CTE definitions, the lower 
definitions take precedence:
+ *
+ *    {{{
+ *    -- The result is `3`, lower [[CTERelationDef]] takes precedence.
+ *    WITH cte AS (
+ *      SELECT 1
+ *    )
+ *    SELECT * FROM (
+ *      WITH cte AS (
+ *        SELECT 2
+ *      )
+ *      SELECT * FROM (
+ *        WITH cte AS (
+ *          SELECT 3
+ *        )
+ *        SELECT * FROM cte
+ *      )
+ *    )
+ *    }}}
+ *
+ *  - Any subquery can contain [[UnresolvedWith]] on top of it, but 
[[WithCTE]] is not gonna be
+ *    1 to 1 to its unresolved counterpart. For example, if we are dealing 
with simple subqueries,
+ *    [[CTERelationDef]]s will be merged together under a single [[WithCTE]]. 
The previous example
+ *    would produce the following resolved plan:
+ *
+ *    {{{
+ *    WithCTE
+ *    :- CTERelationDef 18, false
+ *    :  +- ...
+ *    :- CTERelationDef 19, false
+ *    :  +- ...
+ *    :- CTERelationDef 20, false
+ *    :  +- ...
+ *    +- Project [3#1203]
+ *    :  +- ...
+ *    }}}
+ *
+ *  - However, if we have any expression subquery (scalar/IN/EXISTS...), the 
top
+ *    [[CTERelationDef]]s and subquery's [[CTERelationDef]] won't be merged 
together (as they are
+ *    separated by an expression tree):
+ *
+ *    {{{
+ *    WITH cte AS (
+ *      SELECT 1 AS col1
+ *    )
+ *    SELECT * FROM cte WHERE col1 IN (
+ *      WITH cte AS (
+ *        SELECT 2
+ *      )
+ *      SELECT * FROM cte
+ *    )
+ *    }}}
+ *
+ *    ->
+ *
+ *    {{{
+ *    WithCTE
+ *    :- CTERelationDef 21, false
+ *    :  +- ...
+ *    +- Project [col1#1223]
+ *       +- Filter col1#1223 IN (list#1222 [])
+ *          :  +- WithCTE
+ *          :     :- CTERelationDef 22, false
+ *          :     :  +- ...
+ *          :     +- Project [2#1241]
+ *          :        +- ...
+ *          +- ...
+ *    }}}
+ *
+ *  - Upper CTEs are visible through subqueries and can be referenced by lower operators, but not
+ *    through the [[View]] boyndary:

Review Comment:
   ```suggestion
    *    through the [[View]] boundary:
   ```
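
The name-precedence rule described in the quoted doc ("the lower definitions take precedence") can be illustrated with a small sketch. This is not the `CteScope` or `CteRegistry` implementation from the PR: scopes are modeled as plain maps from a CTE name to a label, the head of the list is assumed to be the innermost WITH clause, and the view boundary is not modeled at all.

```scala
object CteScopeSketch {
  // Hypothetical model: each scope maps a CTE name to a label describing its definition.
  // The head of the list is the innermost (lowest) WITH clause.
  type Scope = Map[String, String]

  // Walk the scopes from innermost to outermost and return the first matching definition.
  def resolve(scopes: List[Scope], name: String): Option[String] =
    scopes.collectFirst { case scope if scope.contains(name) => scope(name) }
}
```

With three nested WITH clauses that all define `cte`, as in the quoted example, the lookup returns the innermost definition; a name defined only in an outer scope is still found, which matches the statement that upper CTEs are visible through subqueries.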



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ExpressionIdAssigner.scala:
##########
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis.resolver
+
+import java.util.{ArrayDeque, HashMap, HashSet}
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.expressions.{
+  Alias,
+  Attribute,
+  AttributeReference,
+  ExprId,
+  NamedExpression
+}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+
+/**
+ * [[ExpressionIdAssigner]] is used by the [[ExpressionResolver]] to assign 
unique expression IDs to
+ * [[NamedExpression]]s ([[AttributeReference]]s and [[Alias]]es). This is 
necessary to ensure
+ * that Optimizer performs its work correctly and does not produce correctness 
issues.
+ *
+ * The framework works the following way:
+ *  - Each leaf operator must have unique output IDs (even if it's the same 
table, view, or CTE).
+ *  - The [[AttributeReference]]s get propagated "upwards" through the 
operator tree with their IDs
+ *    preserved.
+ *  - Each [[Alias]] gets assigned a new unique ID and it sticks with it after 
it gets converted to
+ *    an [[AttributeReference]] when it is outputted from the operator that 
produced it.
+ *  - Any operator may have [[AttributeReference]]s with the same IDs in its 
output given it is the
+ *    same attribute.
+ * Thus, **no multi-child operator may have children with conflicting 
[[AttributeReference]] IDs**.
+ * In other words, two subtrees must not output the [[AttributeReference]]s 
with the same IDs, since
+ * relations, views and CTEs all output unique attributes, and [[Alias]]es get 
assigned new IDs as
+ * well. [[ExpressionIdAssigner.assertOutputsHaveNoConflictingExpressionIds]] 
is used to assert this
+ * invariant.
+ *
+ * For SQL queries, this framework provides correctness just by reallocating 
relation outputs and
+ * by validating the invariants mentioned above. Reallocation is done in
+ * [[Resolver.handleLeafOperator]]. If all the relations (even if it's the 
same table) have unique
+ * output IDs, the expression ID assignment will be correct, because there are 
no duplicate IDs in
+ * a pure unresolved tree. The old ID -> new ID mapping is not needed in this 
case.
+ * For example, consider this query:
+ *
+ * {{{
+ * SELECT * FROM t AS t1 CROSS JOIN t AS t2 ON t1.col1 = t2.col1
+ * }}}
+ *
+ * The analyzed plan should be:
+ * {{{
+ * Project [col1#0, col2#1, col1#2, col2#3]
+ * +- Join Cross, (col1#0 = col1#2)
+ *    :- SubqueryAlias t1
+ *    :   +- Relation t[col1#0,col2#1] parquet
+ *    +- SubqueryAlias t2
+ *        +- Relation t[col1#2,col2#3] parquet
+ * }}}
+ *
+ * and not:
+ * {{{
+ * Project [col1#0, col2#1, col1#0, col2#1]
+ * +- Join Cross, (col1#0 = col1#0)
+ *    :- SubqueryAlias t1
+ *    :   +- Relation t[col1#0,col2#1] parquet
+ *    +- SubqueryAlias t2
+ *        +- Relation t[col1#0,col2#1] parquet
+ * }}}
+ *
+ * Because in the latter case the join condition is always true.
+ *
+ * For DataFrame programs we need the full power of [[ExpressionIdAssigner]], 
and old ID -> new ID
+ * mapping comes in handy, because DataFrame programs pass _partially_ 
resolved plans to the
+ * [[Resolver]], which may consist of duplicate subtrees, and thus will have 
already assigned
+ * expression IDs. These already resolved dupliciate subtrees with assigned IDs will conflict.

Review Comment:
   ```suggestion
    * expression IDs. These already resolved duplicate subtrees with assigned IDs will conflict.
   ```
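
As context for the paragraph quoted above, the "old ID -> new ID" remapping that `createMapping` performs in the diff can be sketched with plain values. This is a simplified illustration, not the PR's code: `Long` values stand in for Spark's `ExprId`, an immutable `Map` replaces the Java `HashMap`, and `buildMapping` is a hypothetical name.

```scala
object IdRemappingSketch {
  // Assumption: Long values stand in for Spark's ExprId.
  // When old IDs are known (already resolved subtrees, e.g. DataFrames), map
  // old -> new and new -> new. Otherwise (first-time analysis) map new -> new only.
  def buildMapping(newIds: Seq[Long], oldIds: Option[Seq[Long]]): Map[Long, Long] =
    oldIds match {
      case Some(old) =>
        require(newIds.length == old.length, "Outputs have different lengths")
        newIds.zip(old).flatMap { case (newId, oldId) =>
          Seq(oldId -> newId, newId -> newId)
        }.toMap
      case None =>
        newIds.map(id => id -> id).toMap
    }
}
```

Mapping each new ID to itself is what lets a repeated lookup of an already remapped attribute return the same ID, which the quoted doc describes as making the call idempotent.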



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/BridgedRelationsProvider.scala:
##########
@@ -32,19 +33,28 @@ class BridgedRelationMetadataProvider(
     override val relationResolution: RelationResolution,
     analyzerBridgeState: AnalyzerBridgeState
 ) extends RelationMetadataProvider {
-  override val relationsWithResolvedMetadata = 
getRelationsFromBridgeState(analyzerBridgeState)
+  override val relationsWithResolvedMetadata = new 
RelationsWithResolvedMetadata
+  updateRelationsWithResolvedMetadata()
 
-  private def getRelationsFromBridgeState(
-      analyzerBridgeState: AnalyzerBridgeState): RelationsWithResolvedMetadata 
= {
-    val result = new RelationsWithResolvedMetadata
+  /**
+   * We update relations on each [[resolve]] call, because relation IDs might 
have changed.
+   * This can happen for the nested views, since catalog name may differ, and 
expanded table name
+   * will differ for the same [[UnresolvedRelation]].
+   *
+   * See [[ViewResolver.resolve]] for more info on how SQL configs are 
propagated to nested views).
+   */
+  override def resolve(unresolvedPlan: LogicalPlan): Unit = {
+    updateRelationsWithResolvedMetadata()
+  }
+
+  private def updateRelationsWithResolvedMetadata(): Unit = {
     analyzerBridgeState.relationsWithResolvedMetadata.forEach(
       (unresolvedRelation, relationWithResolvedMetadata) => {
-        result.put(
+        relationsWithResolvedMetadata.put(
           relationIdFromUnresolvedRelation(unresolvedRelation),
           relationWithResolvedMetadata
         )
       }
     )
-    result
   }
 }

Review Comment:
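   Could you rename the file to match the class name? The file is `BridgedRelationsProvider.scala`, but the class is `BridgedRelationMetadataProvider`; the two should be the same.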



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/CteScope.scala:
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis.resolver
+
+import java.util.{ArrayDeque, ArrayList}
+
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.sql.catalyst.plans.logical.CTERelationDef
+
+/**
+ * The [[CteScope]] is responsible for keeping track of visible and known CTE definitions at a given
+ * stage of a SQL query/DataFrame program resolution. These scopes are stacked and the stack is
+ * managed by the [[CteRegistry]]. The scope is created per single WITH clause.
+ *
+ * The CTE operators are:
+ *  - [[UnresolvedWith]]. This is a `host` operator that contains a list of unresolved CTE
+ *    definitions from the WITH clause and a single child operator, which is the actual unresolved
+ *    SELECT query.
+ *  - [[UnresolvedRelation]]. This is a generic unresolved relation operator that will sometimes
+ *    be resolved to a CTE definition and later replaced with a [[CTERelationRef]]. The CTE takes
+ *    precedence over a regular table or a view when resolving this identifier.
+ *  - [[CTERelationDef]]. This is a reusable logical plan, which will later be referenced by the
+ *    lower CTE definitions and [[UnresolvedWith]] child.
+ *  - [[CTERelationRef]]. This is a leaf node similar to a relation operator that references a
+ *    certain [[CTERelationDef]] by its ID. It has a name (unique locally for a WITH clause list)
+ *    and an ID (unique for all the CTEs in a query).
+ *  - [[WithCTE]]. This is a `host` operator that contains a list of resolved CTE definitions from
+ *    the WITH clause and a single child operator, which is the actual resolved SELECT query.
+ *
+ * The task of the [[Resolver]] is to correctly place [[WithCTE]] with [[CTERelationDef]]s inside
+ * and make sure that [[CTERelationRef]]s correctly reference [[CTERelationDef]]s with their IDs.
+ * The decision whether to inline those CTE subtrees or not is made by the [[Optimizer]], unlike
+ * what Spark does for the [[View]]s (always inline during the analysis).
+ *
+ * There are some caveats in how Spark places those operators and resolves their names:
+ *  - Ambiguous CTE definition names are disallowed only within a single WITH clause, and this is
+ *    validated by the Parser in [[AstBuilder]]
+ *    using [[QueryParsingErrors.duplicateCteDefinitionNamesError]]:
+ *
+ *    {{{
+ *    -- This is disallowed.
+ *    WITH cte AS (SELECT 1),
+ *    cte AS (SELECT 2)
+ *    SELECT * FROM cte;
+ *    }}}
+ *
+ *  - When [[UnresolvedRelation]] identifier is resolved to a [[CTERelationDef]] and there is a
+ *    name conflict on several layers of CTE definitions, the lower definitions take precedence:
+ *
+ *    {{{
+ *    -- The result is `3`, lower [[CTERelationDef]] takes precedence.
+ *    WITH cte AS (
+ *      SELECT 1
+ *    )
+ *    SELECT * FROM (
+ *      WITH cte AS (
+ *        SELECT 2
+ *      )
+ *      SELECT * FROM (
+ *        WITH cte AS (
+ *          SELECT 3
+ *        )
+ *        SELECT * FROM cte
+ *      )
+ *    )
+ *    }}}
+ *
+ *  - Any subquery can contain [[UnresolvedWith]] on top of it, but [[WithCTE]] is not gonna be
+ *    1 to 1 to its unresolved counterpart. For example, if we are dealing with simple subqueries,
+ *    [[CTERelationDef]]s will be merged together under a single [[WithCTE]]. The previous example
+ *    would produce the following resolved plan:
+ *
+ *    {{{
+ *    WithCTE
+ *    :- CTERelationDef 18, false
+ *    :  +- ...
+ *    :- CTERelationDef 19, false
+ *    :  +- ...
+ *    :- CTERelationDef 20, false
+ *    :  +- ...
+ *    +- Project [3#1203]
+ *    :  +- ...
+ *    }}}
+ *
+ *  - However, if we have any expression subquery (scalar/IN/EXISTS...), the top
+ *    [[CTERelationDef]]s and subquery's [[CTERelationDef]] won't be merged together (as they are
+ *    separated by an expression tree):
+ *
+ *    {{{
+ *    WITH cte AS (
+ *      SELECT 1 AS col1
+ *    )
+ *    SELECT * FROM cte WHERE col1 IN (
+ *      WITH cte AS (
+ *        SELECT 2
+ *      )
+ *      SELECT * FROM cte
+ *    )
+ *    }}}
+ *
+ *    ->
+ *
+ *    {{{
+ *    WithCTE
+ *    :- CTERelationDef 21, false
+ *    :  +- ...
+ *    +- Project [col1#1223]
+ *       +- Filter col1#1223 IN (list#1222 [])
+ *          :  +- WithCTE
+ *          :     :- CTERelationDef 22, false
+ *          :     :  +- ...
+ *          :     +- Project [2#1241]
+ *          :        +- ...
+ *          +- ...
+ *    }}}
+ *
+ *  - Upper CTEs are visible through subqueries and can be referenced by lower operators, but not
+ *    through the [[View]] boundary:
+ *
+ *    {{{
+ *    CREATE VIEW v1 AS SELECT 1;
+ *    CREATE VIEW v2 AS SELECT * FROM v1;
+ *
+ *    -- The result is 1.
+ *    -- The `v2` body will be inlined in the main query tree during the analysis, but upper `v1`
+ *    -- CTE definition _won't_ take precedence over the lower `v1` view.
+ *    WITH v1 AS (
+ *      SELECT 2
+ *    )
+ *    SELECT * FROM v2;
+ *    }}}
+ *
+ * @param isRoot This marks the place where [[WithCTE]] has to be placed with all the merged
+ *   [[CTERelationDef]] that were collected under it. It will be true for root query, [[View]]s
+ *   and expression subqueries.
+ * @param isOpaque This flag makes this [[CteScope]] opaque for [[CTERelationDef]] lookups. It will
+ *   be true for root query and [[View]]s.
+ */
+class CteScope(val isRoot: Boolean, val isOpaque: Boolean) {
+
+  /**
+   * Known [[CTERelationDef]]s that were already resolved in this scope or in child scopes. This is
+   * used to merge CTE definitions together in a single [[WithCTE]].
+   */
+  private val knownCtes = new ArrayList[CTERelationDef]
+
+  /**
+   * Visible [[CTERelationDef]]s that were already resolved in this scope. Child scope definitions
+   * are _not_ visible. Upper definitions _are_ visible, but this is handled by
+   * [[CteRegistry.resolveCteName]] to avoid cascadingly growing [[IdentifierMap]]s.

Review Comment:
   ```suggestion
      * [[CteRegistry.resolveCteName]] to avoid calculatingly growing [[IdentifierMap]]s.
   ```
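
   For readers following the scaladoc above, here is a minimal sketch of the lookup rules it describes: lower definitions shadow upper ones, and an opaque scope stops the upward search. `ScopeSketch` and `RegistrySketch` are hypothetical names invented for illustration; this is not the PR's `CteScope`/`CteRegistry` code, and it assumes the opaque scope itself is still searched before the lookup stops.

   ```scala
   import scala.collection.mutable

   // Hypothetical stand-in for a scope: maps a CTE name to a definition ID.
   final class ScopeSketch(val isOpaque: Boolean) {
     private val visible = mutable.HashMap.empty[String, Long]
     def register(name: String, id: Long): Unit = visible.put(name, id)
     def lookupLocal(name: String): Option[Long] = visible.get(name)
   }

   // Hypothetical stand-in for a registry: a stack of scopes, innermost first.
   final class RegistrySketch {
     // The root scope is opaque, as the scaladoc states for the root query.
     private var scopes: List[ScopeSketch] = List(new ScopeSketch(isOpaque = true))

     def pushScope(isOpaque: Boolean = false): Unit =
       scopes = new ScopeSketch(isOpaque) :: scopes

     def popScope(): Unit = scopes = scopes.tail

     def register(name: String, id: Long): Unit = scopes.head.register(name, id)

     // Walk from the innermost scope outwards. The search includes the first
     // opaque scope it meets (assumption) and never goes past it, so upper
     // definitions are hidden behind a view-like boundary.
     def resolve(name: String): Option[Long] = {
       val (transparent, rest) = scopes.span(!_.isOpaque)
       val searchable = transparent ++ rest.take(1)
       searchable.flatMap(_.lookupLocal(name)).headOption
     }
   }
   ```

   Looking names up by walking the stack at resolution time means each scope only stores its own definitions, which seems to be the point of the comment under discussion: upper definitions are not copied into every child scope's map.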



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

