MaxGekk commented on code in PR #49658: URL: https://github.com/apache/spark/pull/49658#discussion_r1940855014
########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ExpressionResolver.scala: ########## @@ -199,16 +279,97 @@ class ExpressionResolver( * from the [[Resolver]] during [[Project]] resolution. * * The output sequence can be larger than the input sequence due to [[UnresolvedStar]] expansion. + * + * @returns The list of resolved expressions along with flags indicating whether the resolved Review Comment: ```suggestion * @return The list of resolved expressions along with flags indicating whether the resolved ``` ########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ExpressionResolutionContext.scala: ########## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis.resolver + +/** + * The [[ExpressionResolutionContext]] is a state that is propagated between the nodes of the + * expression tree during the bottom-up expression resolution process. This way we pass the results + * of [[ExpressionResolver.resolve]] call, which are not the resolved child itself, from children + * to parents. 
+ * + * @hasAggregateExpressionsInASubtree A flag that highlights that a specific node corresponding to + * [[ExpressionResolutionContext]] has aggregate expressions in + * its subtree. + * @hasAttributeInASubtree A flag that highlights that a specific node corresponding to + * [[ExpressionResolutionContext]] has attributes in its subtree. + * @hasLateralColumnAlias A flag that highlights that a specific node corresponding to Review Comment: ```suggestion * @param hasLateralColumnAlias A flag that highlights that a specific node corresponding to ``` ########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ExpressionResolutionContext.scala: ########## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis.resolver + +/** + * The [[ExpressionResolutionContext]] is a state that is propagated between the nodes of the + * expression tree during the bottom-up expression resolution process. This way we pass the results + * of [[ExpressionResolver.resolve]] call, which are not the resolved child itself, from children + * to parents. 
+ * + * @hasAggregateExpressionsInASubtree A flag that highlights that a specific node corresponding to + * [[ExpressionResolutionContext]] has aggregate expressions in + * its subtree. + * @hasAttributeInASubtree A flag that highlights that a specific node corresponding to Review Comment: ```suggestion * @param hasAttributeInASubtree A flag that highlights that a specific node corresponding to ``` ########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ExpressionResolutionContext.scala: ########## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis.resolver + +/** + * The [[ExpressionResolutionContext]] is a state that is propagated between the nodes of the + * expression tree during the bottom-up expression resolution process. This way we pass the results + * of [[ExpressionResolver.resolve]] call, which are not the resolved child itself, from children + * to parents. 
+ * + * @hasAggregateExpressionsInASubtree A flag that highlights that a specific node corresponding to Review Comment: ```suggestion * @param hasAggregateExpressionsInASubtree A flag that highlights that a specific node corresponding to ``` ########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ExpressionIdAssigner.scala: ########## @@ -0,0 +1,374 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis.resolver + +import java.util.{ArrayDeque, HashMap, HashSet} + +import org.apache.spark.SparkException +import org.apache.spark.sql.catalyst.expressions.{ + Alias, + Attribute, + AttributeReference, + ExprId, + NamedExpression +} +import org.apache.spark.sql.errors.QueryCompilationErrors + +/** + * [[ExpressionIdAssigner]] is used by the [[ExpressionResolver]] to assign unique expression IDs to + * [[NamedExpression]]s ([[AttributeReference]]s and [[Alias]]es). This is necessary to ensure + * that Optimizer performs its work correctly and does not produce correctness issues. + * + * The framework works the following way: + * - Each leaf operator must have unique output IDs (even if it's the same table, view, or CTE). 
+ * - The [[AttributeReference]]s get propagated "upwards" through the operator tree with their IDs + * preserved. + * - Each [[Alias]] gets assigned a new unique ID and it sticks with it after it gets converted to + * an [[AttributeReference]] when it is outputted from the operator that produced it. + * - Any operator may have [[AttributeReference]]s with the same IDs in its output given it is the + * same attribute. + * Thus, **no multi-child operator may have children with conflicting [[AttributeReference]] IDs**. + * In other words, two subtrees must not output the [[AttributeReference]]s with the same IDs, since + * relations, views and CTEs all output unique attributes, and [[Alias]]es get assigned new IDs as + * well. [[ExpressionIdAssigner.assertOutputsHaveNoConflictingExpressionIds]] is used to assert this + * invariant. + * + * For SQL queries, this framework provides correctness just by reallocating relation outputs and + * by validating the invariants mentioned above. Reallocation is done in + * [[Resolver.handleLeafOperator]]. If all the relations (even if it's the same table) have unique + * output IDs, the expression ID assignment will be correct, because there are no duplicate IDs in + * a pure unresolved tree. The old ID -> new ID mapping is not needed in this case. 
+ * For example, consider this query: + * + * {{{ + * SELECT * FROM t AS t1 CROSS JOIN t AS t2 ON t1.col1 = t2.col1 + * }}} + * + * The analyzed plan should be: + * {{{ + * Project [col1#0, col2#1, col1#2, col2#3] + * +- Join Cross, (col1#0 = col1#2) + * :- SubqueryAlias t1 + * : +- Relation t[col1#0,col2#1] parquet + * +- SubqueryAlias t2 + * +- Relation t[col1#2,col2#3] parquet + * }}} + * + * and not: + * {{{ + * Project [col1#0, col2#1, col1#0, col2#1] + * +- Join Cross, (col1#0 = col1#0) + * :- SubqueryAlias t1 + * : +- Relation t[col1#0,col2#1] parquet + * +- SubqueryAlias t2 + * +- Relation t[col1#0,col2#1] parquet + * }}} + * + * Because in the latter case the join condition is always true. + * + * For DataFrame programs we need the full power of [[ExpressionIdAssigner]], and old ID -> new ID + * mapping comes in handy, because DataFrame programs pass _partially_ resolved plans to the + * [[Resolver]], which may consist of duplicate subtrees, and thus will have already assigned + * expression IDs. These already resolved dupliciate subtrees with assigned IDs will conflict. + * Hence, we need to reallocate all the leaf node outputs _and_ remap old IDs to the new ones. + * Also, DataFrame programs may introduce the same [[Alias]]es in different parts of the query plan, + * so we just reallocate all the [[Alias]]es. 
+ * + * For example, consider this DataFrame program: + * + * {{{ + * spark.range(0, 10).select($"id").write.format("parquet").saveAsTable("t") + * val alias = ($"id" + 1).as("id") + * spark.table("t").select(alias).select(alias) + * }}} + * + * The analyzed plan should be: + * {{{ + * Project [(id#6L + cast(1 as bigint)) AS id#13L] + * +- Project [(id#4L + cast(1 as bigint)) AS id#6L] + * +- SubqueryAlias spark_catalog.default.t + * +- Relation spark_catalog.default.t[id#4L] parquet + * }}} + * + * and not: + * {{{ + * Project [(id#6L + cast(1 as bigint)) AS id#6L] + * +- Project [(id#4L + cast(1 as bigint)) AS id#6L] + * +- SubqueryAlias spark_catalog.default.t + * +- Relation spark_catalog.default.t[id#4L] parquet + * }}} + * + * Because the latter case will confuse the Optimizer and the top [[Project]] will be eliminated + * leading to incorrect result. + * + * There's an important caveat here: the leftmost branch of a logical plan tree. In this branch we + * need to preserve the expression IDs wherever possible because DataFrames may reference each other + * using their attributes. This also makes sense for performance reasons. + * + * Consider this example: + * + * {{{ + * val df1 = spark.range(0, 10).select($"id") + * val df2 = spark.range(5, 15).select($"id") + * df1.union(df2).filter(df1("id") === 5) + * }}} + * + * In this example `df("id")` references lower `id` attribute by expression ID, so `union` must not + * reassign expression IDs in `df1` (left child). Referencing `df2` (right child) is not supported + * in Spark. + * + * The [[ExpressionIdAssigner]] covers both SQL and DataFrame scenarios with single approach and is + * integrated in the single-pass analysis framework. + * + * The [[ExpressionIdAssigner]] is used in the following way: + * - When the [[Resolver]] traverses the tree downwards prior to starting bottom-up analysis, + * we build the [[mappingStack]] by calling [[withNewMapping]] (i.e. 
[[mappingStack.push]]) + * for every child of a multi-child operator, so we have a separate stack entry (separate + * mapping) for each branch. This way sibling branches' mappings are isolated from each other and + * attribute IDs are reused only within the same branch. Initially we push `None`, because + * the mapping needs to be initialized later with the correct output of a resolved operator. + * - When the bottom-up analysis starts, we assign IDs to all the [[NamedExpression]]s which are + * present in operators starting from the [[LeafNode]]s using [[mapExpression]]. + * [[createMapping]] is called right after each [[LeafNode]] is resolved, and first remapped + * attributes come from that [[LeafNode]]. This is done in [[Resolver.handleLeafOperator]] for + * each logical plan tree branch except the leftmost. + * - Once the child branch is resolved, [[withNewMapping]] ends by calling [[mappingStack.pop]]. + * - After the multi-child operator is resolved, we call [[createMapping]] to + * initialize the mapping with attributes _chosen_ (e.g. [[Union.mergeChildOutputs]]) by that + * operator's resolution algorithm and remap _old_ expression IDs to those chosen attributes. + * - Continue remapping expressions until we reach the root of the operator tree. + */ +class ExpressionIdAssigner { + private val mappingStack = new ExpressionIdAssigner.Stack + mappingStack.push(ExpressionIdAssigner.StackEntry(isLeftmostBranch = true)) + + /** + * Returns `true` if the current logical plan branch is the leftmost branch. This is important + * in the context of preserving expression IDs in DataFrames. See class doc for more details. + */ + def isLeftmostBranch: Boolean = mappingStack.peek().isLeftmostBranch + + /** + * A RAII-wrapper for [[mappingStack.push]] and [[mappingStack.pop]]. [[Resolver]] uses this for + * every child of a multi-child operator to ensure that each operator branch uses an isolated + * expression ID mapping. 
+ * + * @param isLeftmostChild whether the current child is the leftmost child of the operator that is + * being resolved. This is used to determine whether the new stack entry is gonna be in the + * leftmost logical plan branch. It's `false` by default, because it's safer to remap attributes + * than to leave duplicates (to prevent correctness issues). + */ + def withNewMapping[R](isLeftmostChild: Boolean = false)(body: => R): R = { + mappingStack.push( + ExpressionIdAssigner.StackEntry( + isLeftmostBranch = isLeftmostChild && isLeftmostBranch + ) + ) + try { + body + } finally { + mappingStack.pop() + } + } + + /** + * Create mapping with the given `newOutput` that rewrites the `oldOutput`. This + * is used by the [[Resolver]] after the multi-child operator is resolved to fill the current + * mapping with the attributes _chosen_ by that operator's resolution algorithm and remap _old_ + * expression IDs to those chosen attributes. It's also used by the [[ExpressionResolver]] right + * before remapping the attributes of a [[LeafNode]]. + * + * `oldOutput` is present for already resolved subtrees (e.g. DataFrames), but for SQL queries + * is will be `None`, because that logical plan is analyzed for the first time. + */ + def createMapping( + newOutput: Seq[Attribute] = Seq.empty, + oldOutput: Option[Seq[Attribute]] = None): Unit = { + if (mappingStack.peek().mapping.isDefined) { + throw SparkException.internalError( + s"Attempt to overwrite existing mapping. New output: $newOutput, old output: $oldOutput" + ) + } + + val newMapping = new ExpressionIdAssigner.Mapping + oldOutput match { + case Some(oldOutput) => + if (newOutput.length != oldOutput.length) { + throw SparkException.internalError( + s"Outputs have different lengths. 
New output: $newOutput, old output: $oldOutput" + ) + } + + newOutput.zip(oldOutput).foreach { + case (newAttribute, oldAttribute) => + newMapping.put(oldAttribute.exprId, newAttribute.exprId) + newMapping.put(newAttribute.exprId, newAttribute.exprId) + } + case None => + newOutput.foreach { newAttribute => + newMapping.put(newAttribute.exprId, newAttribute.exprId) + } + } + + mappingStack.push(mappingStack.pop().copy(mapping = Some(newMapping))) + } + + /** + * Assign a correct ID to the given [[originalExpression]] and return a new instance of that + * expression, or return a corresponding new instance of the same attribute, that was previously + * reallocated and is present in the current [[mappingStack]] entry. + * + * For [[Alias]]es: Try to preserve them if we are in the leftmost logical plan tree branch and + * unless they conflict. Conflicting [[Alias]] IDs are never acceptable. Otherwise, reallocate + * with a new ID and return that instance. + * + * For [[AttributeReference]]s: If the attribute is present in the current [[mappingStack]] entry, + * return that instance, otherwise reallocate with a new ID and return that instance. The mapping + * is done both from the original expression ID _and_ from the new expression ID - this way we are + * able to replace old references to that attribute in the current operator branch, and preserve + * already reallocated attributes to make this call idempotent. + * + * When remapping the provided expressions, we don't replace them with the previously seen + * attributes, but replace their IDs ([[NamedExpression.withExprId]]). This is done to preserve + * the properties of attributes at a certain point in the query plan. Examples where it's + * important: + * + * 1) Preserve the name case. 
In Spark the "requested" name takes precedence over the "original" + * name: + * + * {{{ + * -- The output schema is [col1, COL1] + * SELECT col1, COL1 FROM VALUES (1); + * }}} + * + * 2) Preserve the metadata: + * + * {{{ + * // Metadata "m1" remains, "m2" gets overwritten by the specified schema, "m3" is newly added. + * val metadata1 = new MetadataBuilder().putString("m1", "1").putString("m2", "2").build() + * val metadata2 = new MetadataBuilder().putString("m2", "3").putString("m3", "4").build() + * val schema = new StructType().add("a", IntegerType, nullable = true, metadata = metadata2) + * val df = + * spark.sql("SELECT col1 FROM VALUES (1)").select(col("col1").as("a", metadata1)).to(schema) + * }}} + */ + def mapExpression(originalExpression: NamedExpression): NamedExpression = { + if (!mappingStack.peek().mapping.isDefined) { Review Comment: nit: ```suggestion if (mappingStack.peek().mapping.isEmpty) { ``` ########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/CteScope.scala: ########## @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.analysis.resolver + +import java.util.{ArrayDeque, ArrayList} + +import scala.jdk.CollectionConverters._ + +import org.apache.spark.sql.catalyst.plans.logical.CTERelationDef + +/** + * The [[CteScope]] is responsible for keeping track of visible and known CTE definitions at a given + * stage of a SQL query/DataFrame program resolution. These scopes are stacked and the stack is + * managed by the [[CteRegistry]]. The scope is created per single WITH clause. + * + * The CTE operators are: + * - [[UnresolvedWith]]. This is a `host` operator that contains a list of unresolved CTE + * definitions from the WITH clause and a single child operator, which is the actual unresolved + * SELECT query. + * - [[UnresolvedRelation]]. This is a generic unresolved relation operator that will sometimes + * be resolved to a CTE definition and later replaced with a [[CTERelationRef]]. The CTE takes + * precedence over a regular table or a view when resolving this identifier. + * - [[CTERelationDef]]. This is a reusable logical plan, which will later be referenced by the + * lower CTE definitions and [[UnresolvedWith]] child. + * - [[CTERelationRef]]. This is a leaf node similar to a relation operator that references a + * certain [[CTERelationDef]] by its ID. It has a name (unique locally for a WITH clause list) + * and an ID (unique for all the CTEs in a query). + * - [[WithCTE]]. This is a `host` operator that contains a list of resolved CTE definitions from + * the WITH clause and a single child operator, which is the actual resolved SELECT query. + * + * The task of the [[Resolver]] is to correctly place [[WithCTE]] with [[CTERelationDef]]s inside + * and make sure that [[CTERelationRef]]s correctly reference [[CTERelationDef]]s with their IDs. + * The decision whether to inline those CTE subtrees or not is made by the [[Optimizer]], unlike + * what Spark does for the [[View]]s (always inline during the analysis). 
+ * + * There are some caveats in how Spark places those operators and resolves their names: + * - Ambiguous CTE definition names are disallowed only within a single WITH clause, and this is + * validated by the Parser in [[AstBuilder]] + * using [[QueryParsingErrors.duplicateCteDefinitionNamesError]]: + * + * {{{ + * -- This is disallowed. + * WITH cte AS (SELECT 1), + * cte AS (SELECT 2) + * SELECT * FROM cte; + * }}} + * + * - When [[UnresolvedRelation]] identifier is resolved to a [[CTERelationDef]] and there is a + * name conflict on several layers of CTE definitions, the lower definitions take precedence: + * + * {{{ + * -- The result is `3`, lower [[CTERelationDef]] takes precedence. + * WITH cte AS ( + * SELECT 1 + * ) + * SELECT * FROM ( + * WITH cte AS ( + * SELECT 2 + * ) + * SELECT * FROM ( + * WITH cte AS ( + * SELECT 3 + * ) + * SELECT * FROM cte + * ) + * ) + * }}} + * + * - Any subquery can contain [[UnresolvedWith]] on top of it, but [[WithCTE]] is not gonna be + * 1 to 1 to its unresolved counterpart. For example, if we are dealing with simple subqueries, + * [[CTERelationDef]]s will be merged together under a single [[WithCTE]]. The previous example + * would produce the following resolved plan: + * + * {{{ + * WithCTE + * :- CTERelationDef 18, false + * : +- ... + * :- CTERelationDef 19, false + * : +- ... + * :- CTERelationDef 20, false + * : +- ... + * +- Project [3#1203] + * : +- ... + * }}} + * + * - However, if we have any expression subquery (scalar/IN/EXISTS...), the top + * [[CTERelationDef]]s and subquery's [[CTERelationDef]] won't be merged together (as they are + * separated by an expression tree): + * + * {{{ + * WITH cte AS ( + * SELECT 1 AS col1 + * ) + * SELECT * FROM cte WHERE col1 IN ( + * WITH cte AS ( + * SELECT 2 + * ) + * SELECT * FROM cte + * ) + * }}} + * + * -> + * + * {{{ + * WithCTE + * :- CTERelationDef 21, false + * : +- ... 
+ * +- Project [col1#1223] + * +- Filter col1#1223 IN (list#1222 []) + * : +- WithCTE + * : :- CTERelationDef 22, false + * : : +- ... + * : +- Project [2#1241] + * : +- ... + * +- ... + * }}} + * + * - Upper CTEs are visible through subqueries and can be referenced by lower operators, but not + * through the [[View]] boyndary: Review Comment: ```suggestion * through the [[View]] boundary: ``` ########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ExpressionIdAssigner.scala: ########## @@ -0,0 +1,374 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis.resolver + +import java.util.{ArrayDeque, HashMap, HashSet} + +import org.apache.spark.SparkException +import org.apache.spark.sql.catalyst.expressions.{ + Alias, + Attribute, + AttributeReference, + ExprId, + NamedExpression +} +import org.apache.spark.sql.errors.QueryCompilationErrors + +/** + * [[ExpressionIdAssigner]] is used by the [[ExpressionResolver]] to assign unique expression IDs to + * [[NamedExpression]]s ([[AttributeReference]]s and [[Alias]]es). This is necessary to ensure + * that Optimizer performs its work correctly and does not produce correctness issues. 
+ * + * The framework works the following way: + * - Each leaf operator must have unique output IDs (even if it's the same table, view, or CTE). + * - The [[AttributeReference]]s get propagated "upwards" through the operator tree with their IDs + * preserved. + * - Each [[Alias]] gets assigned a new unique ID and it sticks with it after it gets converted to + * an [[AttributeReference]] when it is outputted from the operator that produced it. + * - Any operator may have [[AttributeReference]]s with the same IDs in its output given it is the + * same attribute. + * Thus, **no multi-child operator may have children with conflicting [[AttributeReference]] IDs**. + * In other words, two subtrees must not output the [[AttributeReference]]s with the same IDs, since + * relations, views and CTEs all output unique attributes, and [[Alias]]es get assigned new IDs as + * well. [[ExpressionIdAssigner.assertOutputsHaveNoConflictingExpressionIds]] is used to assert this + * invariant. + * + * For SQL queries, this framework provides correctness just by reallocating relation outputs and + * by validating the invariants mentioned above. Reallocation is done in + * [[Resolver.handleLeafOperator]]. If all the relations (even if it's the same table) have unique + * output IDs, the expression ID assignment will be correct, because there are no duplicate IDs in + * a pure unresolved tree. The old ID -> new ID mapping is not needed in this case. 
+ * For example, consider this query: + * + * {{{ + * SELECT * FROM t AS t1 CROSS JOIN t AS t2 ON t1.col1 = t2.col1 + * }}} + * + * The analyzed plan should be: + * {{{ + * Project [col1#0, col2#1, col1#2, col2#3] + * +- Join Cross, (col1#0 = col1#2) + * :- SubqueryAlias t1 + * : +- Relation t[col1#0,col2#1] parquet + * +- SubqueryAlias t2 + * +- Relation t[col1#2,col2#3] parquet + * }}} + * + * and not: + * {{{ + * Project [col1#0, col2#1, col1#0, col2#1] + * +- Join Cross, (col1#0 = col1#0) + * :- SubqueryAlias t1 + * : +- Relation t[col1#0,col2#1] parquet + * +- SubqueryAlias t2 + * +- Relation t[col1#0,col2#1] parquet + * }}} + * + * Because in the latter case the join condition is always true. + * + * For DataFrame programs we need the full power of [[ExpressionIdAssigner]], and old ID -> new ID + * mapping comes in handy, because DataFrame programs pass _partially_ resolved plans to the + * [[Resolver]], which may consist of duplicate subtrees, and thus will have already assigned + * expression IDs. These already resolved dupliciate subtrees with assigned IDs will conflict. Review Comment: ```suggestion * expression IDs. These already resolved duplicate subtrees with assigned IDs will conflict. 
``` ########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/BridgedRelationsProvider.scala: ########## @@ -32,19 +33,28 @@ class BridgedRelationMetadataProvider( override val relationResolution: RelationResolution, analyzerBridgeState: AnalyzerBridgeState ) extends RelationMetadataProvider { - override val relationsWithResolvedMetadata = getRelationsFromBridgeState(analyzerBridgeState) + override val relationsWithResolvedMetadata = new RelationsWithResolvedMetadata + updateRelationsWithResolvedMetadata() - private def getRelationsFromBridgeState( - analyzerBridgeState: AnalyzerBridgeState): RelationsWithResolvedMetadata = { - val result = new RelationsWithResolvedMetadata + /** + * We update relations on each [[resolve]] call, because relation IDs might have changed. + * This can happen for the nested views, since catalog name may differ, and expanded table name + * will differ for the same [[UnresolvedRelation]]. + * + * See [[ViewResolver.resolve]] for more info on how SQL configs are propagated to nested views). + */ + override def resolve(unresolvedPlan: LogicalPlan): Unit = { + updateRelationsWithResolvedMetadata() + } + + private def updateRelationsWithResolvedMetadata(): Unit = { analyzerBridgeState.relationsWithResolvedMetadata.forEach( (unresolvedRelation, relationWithResolvedMetadata) => { - result.put( + relationsWithResolvedMetadata.put( relationIdFromUnresolvedRelation(unresolvedRelation), relationWithResolvedMetadata ) } ) - result } } Review Comment: Could you rename the file to match the class name (`BridgedRelationMetadataProvider`)? The two should be the same. ########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/CteScope.scala: ########## @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis.resolver + +import java.util.{ArrayDeque, ArrayList} + +import scala.jdk.CollectionConverters._ + +import org.apache.spark.sql.catalyst.plans.logical.CTERelationDef + +/** + * The [[CteScope]] is responsible for keeping track of visible and known CTE definitions at a given + * stage of a SQL query/DataFrame program resolution. These scopes are stacked and the stack is + * managed by the [[CteRegistry]]. The scope is created per single WITH clause. + * + * The CTE operators are: + * - [[UnresolvedWith]]. This is a `host` operator that contains a list of unresolved CTE + * definitions from the WITH clause and a single child operator, which is the actual unresolved + * SELECT query. + * - [[UnresolvedRelation]]. This is a generic unresolved relation operator that will sometimes + * be resolved to a CTE definition and later replaced with a [[CTERelationRef]]. The CTE takes + * precedence over a regular table or a view when resolving this identifier. + * - [[CTERelationDef]]. This is a reusable logical plan, which will later be referenced by the + * lower CTE definitions and [[UnresolvedWith]] child. + * - [[CTERelationRef]]. This is a leaf node similar to a relation operator that references a + * certain [[CTERelationDef]] by its ID. It has a name (unique locally for a WITH clause list) + * and an ID (unique for all the CTEs in a query). 
+ * - [[WithCTE]]. This is a `host` operator that contains a list of resolved CTE definitions from + * the WITH clause and a single child operator, which is the actual resolved SELECT query. + * + * The task of the [[Resolver]] is to correctly place [[WithCTE]] with [[CTERelationDef]]s inside + * and make sure that [[CTERelationRef]]s correctly reference [[CTERelationDef]]s with their IDs. + * The decision whether to inline those CTE subtrees or not is made by the [[Optimizer]], unlike + * what Spark does for the [[View]]s (always inline during the analysis). + * + * There are some caveats in how Spark places those operators and resolves their names: + * - Ambiguous CTE definition names are disallowed only within a single WITH clause, and this is + * validated by the Parser in [[AstBuilder]] + * using [[QueryParsingErrors.duplicateCteDefinitionNamesError]]: + * + * {{{ + * -- This is disallowed. + * WITH cte AS (SELECT 1), + * cte AS (SELECT 2) + * SELECT * FROM cte; + * }}} + * + * - When [[UnresolvedRelation]] identifier is resolved to a [[CTERelationDef]] and there is a + * name conflict on several layers of CTE definitions, the lower definitions take precedence: + * + * {{{ + * -- The result is `3`, lower [[CTERelationDef]] takes precedence. + * WITH cte AS ( + * SELECT 1 + * ) + * SELECT * FROM ( + * WITH cte AS ( + * SELECT 2 + * ) + * SELECT * FROM ( + * WITH cte AS ( + * SELECT 3 + * ) + * SELECT * FROM cte + * ) + * ) + * }}} + * + * - Any subquery can contain [[UnresolvedWith]] on top of it, but [[WithCTE]] is not gonna be + * 1 to 1 to its unresolved counterpart. For example, if we are dealing with simple subqueries, + * [[CTERelationDef]]s will be merged together under a single [[WithCTE]]. The previous example + * would produce the following resolved plan: + * + * {{{ + * WithCTE + * :- CTERelationDef 18, false + * : +- ... + * :- CTERelationDef 19, false + * : +- ... + * :- CTERelationDef 20, false + * : +- ... + * +- Project [3#1203] + * : +- ... 
+ * }}} + * + * - However, if we have any expression subquery (scalar/IN/EXISTS...), the top + * [[CTERelationDef]]s and subquery's [[CTERelationDef]] won't be merged together (as they are + * separated by an expression tree): + * + * {{{ + * WITH cte AS ( + * SELECT 1 AS col1 + * ) + * SELECT * FROM cte WHERE col1 IN ( + * WITH cte AS ( + * SELECT 2 + * ) + * SELECT * FROM cte + * ) + * }}} + * + * -> + * + * {{{ + * WithCTE + * :- CTERelationDef 21, false + * : +- ... + * +- Project [col1#1223] + * +- Filter col1#1223 IN (list#1222 []) + * : +- WithCTE + * : :- CTERelationDef 22, false + * : : +- ... + * : +- Project [2#1241] + * : +- ... + * +- ... + * }}} + * + * - Upper CTEs are visible through subqueries and can be referenced by lower operators, but not + * through the [[View]] boyndary: + * + * {{{ + * CREATE VIEW v1 AS SELECT 1; + * CREATE VIEW v2 AS SELECT * FROM v1; + * + * -- The result is 1. + * -- The `v2` body will be inlined in the main query tree during the analysis, but upper `v1` + * -- CTE definition _won't_ take precedence over the lower `v1` view. + * WITH v1 AS ( + * SELECT 2 + * ) + * SELECT * FROM v2; + * }}} + * + * @param isRoot This marks the place where [[WithCTE]] has to be placed with all the merged + * [[CTERelationDef]] that were collected under it. It will be true for root query, [[View]]s + * and expression subqueries. + * @param isOpaque This flag makes this [[CteScope]] opaque for [[CTERelationDef]] lookups. It will + * be true for root query and [[View]]s. + */ +class CteScope(val isRoot: Boolean, val isOpaque: Boolean) { + + /** + * Known [[CTERelationDef]]s that were already resolved in this scope or in child scopes. This is + * used to merge CTE definitions together in a single [[WithCTE]]. + */ + private val knownCtes = new ArrayList[CTERelationDef] + + /** + * Visible [[CTERelationDef]]s that were already resolved in this scope. Child scope definitions + * are _not_ visible. 
Upper definitions _are_ visible, but this is handled by + * [[CteRegistry.resolveCteName]] to avoid cascadingly growing [[IdentifierMap]]s. Review Comment: ```suggestion * [[CteRegistry.resolveCteName]] to avoid calculatingly growing [[IdentifierMap]]s. ```