MaxGekk commented on code in PR #49658:
URL: https://github.com/apache/spark/pull/49658#discussion_r1939251554
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/NameScope.scala:
##########
@@ -272,122 +394,74 @@ class NameScopeStack extends SQLConfHelper {
   }
 
   /**
-   * Completely overwrite the top scope state with a named plan output.
+   * Completely overwrite the top scope state with operator `output`.
    *
-   * See [[NameScope.update]] for more details.
-   */
-  def overwriteTop(name: String, attributes: Seq[Attribute]): Unit = {
-    val newScope = new NameScope
-    newScope.update(name, attributes)
-
-    stack.pop()
-    stack.push(newScope)
-  }
-
-  /**
-   * Completely overwrite the top scope state with an unnamed plan output.
+   * This method is called by the [[Resolver]] when we've calculated the output of an operator that
+   * is being resolved. The new output is calculated based on the outputs of operator's children.
+   *
+   * Example for [[SubqueryAlias]], here we rewrite the top [[NameScope]]'s attributes to prepend
+   * subquery qualifier to their names:
    *
-   * See [[NameScope.+=]] for more details.
+   * {{{
+   * val qualifier = sa.identifier.qualifier :+ sa.alias
+   * scope.overwriteTop(scope.output.map(attribute => attribute.withQualifier(qualifier)))
+   * }}}
+   *
+   * Trivially, we would call this method for every operator in the query plan,
+   * however some operators just propagate the output of their children without any changes, so
+   * we can omit this call for them (e.g. [[Filter]]).
+   *
+   * This method should be preferred over [[withNewScope]].
    */
-  def overwriteTop(attributes: Seq[Attribute]): Unit = {
-    val newScope = new NameScope
-    newScope += attributes
+  def overwriteTop(output: Seq[Attribute]): Unit = {
+    val newScope = new NameScope(output)
 
     stack.pop()
     stack.push(newScope)
   }
 
   /**
-   * Execute `body` in a context of a fresh scope. It's used during the [[Project]] or the
-   * [[Aggregate]] resolution to avoid calling [[push]] and [[pop]] explicitly.
+   * Execute `body` in a context of a fresh scope.
+   *
+   * This method is called by the [[Resolver]] before recursing into the operator's child
+   * resolution _only_ in cases where a fresh scope is required.
+   *
+   * For esample, [[Project]] or [[Aggregate]] introduce their own scopes semantically, so that a
+   * lower resolution can lookup correlated names:
+   *
+   * {{{
+   * CREATE TABLE IF NOT EXISTS t1 (col1 INT, col2 STRING);
+   * CREATE TABLE IF NOT EXISTS t2 (col1 INT, col2 STRING);
+   *
+   * -- Here we need a scope for the upper [[Project]], and a separate scope for the correlated
+   * -- subquery, because its [[Filter]] need to lookup `t1.col1` from the upper scope.
+   * -- Those scopes have to be indenepdent to avoid polluting each other's attributes.
+   * SELECT col1, (SELECT col2 FROM t2 WHERE t2.col1 == t1.col1 LIMIT 1) FROM t1;
+   * }}}
+   *
+   * Also, we need separate scopes for the operators with multiple children, so that the next
+   * child's resolution woudn't try to work with the data from it's sibling's scope, to avoid

Review Comment:
   ```suggestion
   * child's resolution wouldn't try to work with the data from its sibling's scope, to avoid
   ```
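To make the scope mechanics discussed above concrete, here is a minimal, self-contained sketch of the stack behavior (assuming a simplified `NameScope` that tracks only output names; the real class tracks `Attribute`s and much more, so this is an illustration, not the Spark implementation):

```scala
import scala.collection.mutable

class NameScope(val output: Seq[String] = Seq.empty)

class NameScopeStack {
  private val stack = mutable.Stack(new NameScope)

  // Replace the top scope with the freshly computed operator output,
  // mirroring `overwriteTop` from the diff above.
  def overwriteTop(output: Seq[String]): Unit = {
    stack.pop()
    stack.push(new NameScope(output))
  }

  // Run `body` in a fresh scope and always pop it afterwards, so sibling
  // subtrees never see each other's names.
  def withNewScope[R](body: => R): R = {
    stack.push(new NameScope)
    try body
    finally stack.pop()
  }

  def top: NameScope = stack.top
}
```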
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/NameTarget.scala:
##########
@@ -23,59 +23,76 @@ import org.apache.spark.sql.catalyst.util.StringUtils.orderSuggestedIdentifiersB
 import org.apache.spark.sql.errors.QueryCompilationErrors
 
 /**
- * Class that represents results of name resolution or star expansion. It encapsulates:
- * - `candidates` - A list of candidates that are possible matches for a given name.
- * - `aliasName` - If the candidates size is 1 and it's type is `ExtractValue` (which means that
- *   it's a recursive type), then the `aliasName` should be the name with which the candidate is
- *   aliased. Otherwise, `aliasName` should be `None`.
- * - `allAttributes` - A list of all attributes which is used to generate suggestions for
- *   unresolved column error.
+ * [[NameTarget]] is a result of a multipart name resolution of the
+ * [[NameScope.resolveMultipartName]].
  *
- * Example:
+ * Attribute resolution:
  *
- * - Attribute resolution:
- * {{{ SELECT col1 FROM VALUES (1); }}} will have a [[NameTarget]] with a single candidate `col1`.
- * `aliasName` would be `None` in this case because the column is not of recursive type.
+ * {{{
+ * -- [[NameTarget]] with a single candidate `col1`. `aliasName` is be `None` in this case because
+ * -- the name is not a field/value/item of some recursive type.
+ * SELECT col1 FROM VALUES (1);
+ * }}}
  *
- * - Recursive attribute resolution:
- * {{{ SELECT col1.col1 FROM VALUES(STRUCT(1,2), 3) }}} will have a [[NameTarget]] with a
- *   single candidate `col1` and an `aliasName` of `Some("col1")`.
+ * Attribute resolution ambiguity:
+ *
+ * {{{
+ * -- [[NameTarget]] with candidates `col1`, `col1`. [[pickCandidate]] will throw
+ * -- `AMBIGUOUS_REFERENCE`.
+ * SELECT col1 FROM VALUES (1) t1, VALUES (2) t2;
+ * }}}
+ *
+ * Struct field resolution:
+ *
+ * {{{
+ * -- [[NameTarget]] with a single candidate `GetStructField(col1, "field1")`. `aliasName` is
+ * -- `Some("col1")`, since here we extract a field of a struct.
+ * SELECT col1.field1 FROM VALUES (named_struct('field1', 1), 3);
+ * }}}
+ *
+ * @param candidates A list of candidates that are possible matches for a given name.
+ * @param aliasName If the candidates size is 1 and it's type is [[ExtractValue]] (which means that
+ *   it's a field/value/item from a recursive type), then the `aliasName` should be the name with
+ *   which the candidate needs to be aliased. Otherwise, `aliasName` is `None`.
+ * @param lateralAttributeReference If the candidate is laterally referencing another column this
+ *   field is populated with that column's attribute.
+ * @param output [[output]] of a [[NameSope]] that produced this [[NameTarget]]. Used to provide

Review Comment:
   Probably, `NameScope`:
   ```suggestion
 * @param output [[output]] of a [[NameScope]] that produced this [[NameTarget]]. Used to provide
   ```
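The candidate-picking contract described in this scaladoc can be sketched in a few lines (a hedged illustration; the candidate type and the `pickCandidate` signature are simplified relative to the real class):

```scala
case class NameTarget[A](
    candidates: Seq[A],
    aliasName: Option[String] = None) {

  // One candidate resolves cleanly; zero or several candidates for the same
  // name produce the errors mentioned in the scaladoc above.
  def pickCandidate(name: String): A = candidates match {
    case Seq(candidate) => candidate
    case Seq() => throw new IllegalStateException(s"UNRESOLVED_COLUMN: $name")
    case _ => throw new IllegalStateException(s"AMBIGUOUS_REFERENCE: $name")
  }
}

// `SELECT col1 FROM VALUES (1) t1, VALUES (2) t2` yields two candidates, so
// NameTarget(Seq("t1.col1", "t2.col1")).pickCandidate("col1") throws here.
```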
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/Resolver.scala:
##########
@@ -139,63 +168,112 @@ class Resolver(
   override def resolve(unresolvedPlan: LogicalPlan): LogicalPlan = {
     planLogger.logPlanResolutionEvent(unresolvedPlan, "Unresolved plan")
 
-    throwIfNodeWasResolvedEarlier(unresolvedPlan)
-
     val resolvedPlan = unresolvedPlan match {
+      case unresolvedWith: UnresolvedWith =>
+        resolveWith(unresolvedWith)
       case unresolvedProject: Project =>
-        resolveProject(unresolvedProject)
+        projectResolver.resolve(unresolvedProject)
       case unresolvedFilter: Filter =>
         resolveFilter(unresolvedFilter)
+      case unresolvedSubqueryColumnAliases: UnresolvedSubqueryColumnAliases =>
+        resolveSubqueryColumnAliases(unresolvedSubqueryColumnAliases)
       case unresolvedSubqueryAlias: SubqueryAlias =>
         resolveSubqueryAlias(unresolvedSubqueryAlias)
+      case unresolvedView: View =>
+        viewResolver.resolve(unresolvedView)
       case unresolvedGlobalLimit: GlobalLimit =>
         resolveGlobalLimit(unresolvedGlobalLimit)
       case unresolvedLocalLimit: LocalLimit =>
         resolveLocalLimit(unresolvedLocalLimit)
+      case unresolvedDistinct: Distinct =>
+        resolveDistinct(unresolvedDistinct)
       case unresolvedRelation: UnresolvedRelation =>
        resolveRelation(unresolvedRelation)
+      case unresolvedCteRelationDef: CTERelationDef =>
+        resolveCteRelationDef(unresolvedCteRelationDef)
       case unresolvedInlineTable: UnresolvedInlineTable =>
         resolveInlineTable(unresolvedInlineTable)
+      case unresolvedUnion: Union =>
+        unionResolver.resolve(unresolvedUnion)
       // See the reason why we have to match both [[LocalRelation]] and [[ResolvedInlineTable]]
       // in the [[resolveInlineTable]] scaladoc
       case resolvedInlineTable: ResolvedInlineTable =>
-        updateNameScopeWithPlanOutput(resolvedInlineTable)
+        handleLeafOperator(resolvedInlineTable)
       case localRelation: LocalRelation =>
-        updateNameScopeWithPlanOutput(localRelation)
+        handleLeafOperator(localRelation)
       case unresolvedOneRowRelation: OneRowRelation =>
-        updateNameScopeWithPlanOutput(unresolvedOneRowRelation)
+        handleLeafOperator(unresolvedOneRowRelation)
       case _ =>
         tryDelegateResolutionToExtension(unresolvedPlan).getOrElse {
          handleUnmatchedOperator(unresolvedPlan)
        }
     }
 
-    markNodeAsResolved(resolvedPlan)
+    if (resolvedPlan.children.nonEmpty) {
+      val missingInput = resolvedPlan.missingInput
+      if (missingInput.nonEmpty) {
+        withPosition(unresolvedPlan) {
+          throwMissingAttributesError(resolvedPlan, missingInput)
+        }
+      }
+    }
+
+    if (!resolvedPlan.resolved) {
+      throwSinglePassFailedToResolveOperator(resolvedPlan)
+    }
 
     planLogger.logPlanResolution(unresolvedPlan, resolvedPlan)
 
-    resolvedPlan
+    preservePlanIdTag(unresolvedPlan, resolvedPlan)
   }
 
   /**
-   * [[Project]] introduces a new scope to resolve its subtree and project list expressions. After
-   * those are resolved in the child scope we overwrite current scope with resolved [[Project]]'s
-   * output to expose new names to the parent operators.
+   * [[UnresolvedWith]] contains a list of unresolved CTE definitions, which are represented by
+   * (name, subquery) pairs, and an actual child query. First we resolve the CTE definitions
+   * strictly in their declaration order, so they become available for other lower definitions
+   * (lower both in this WITH clause list and in the plan tree) and for the [[UnresolvedWith]] child
+   * query. After that, we resolve the child query. Optionally, if this is a root [[CteScope]],
+   * we return a [[WithCTE]] operator with all the resolved [[CTERelationDef]]s merged together
+   * from this scope and child scopes. Otherwise, we return the resolved child query so that
+   * the resolved [[CTERelationDefs]] propagate up and will be merged together later.
+   *
+   * See [[CteScope]] scaladoc for all the details on how CTEs are resolved.
    */
-  private def resolveProject(unresolvedProject: Project): LogicalPlan = {
-    val resolvedProject = scopes.withNewScope {
-      val resolvedChild = resolve(unresolvedProject.child)
-      val resolvedProjectList =
-        expressionResolver.resolveProjectList(unresolvedProject.projectList)
-      Project(resolvedProjectList, resolvedChild)
+  private def resolveWith(unresolvedWith: UnresolvedWith): LogicalPlan = {
+    val childOutputs = new ArrayBuffer[Seq[Attribute]]
+
+    unresolvedWith.cteRelations.map { cteRelation =>

Review Comment:
   Since the lambda doesn't return any result, it is better to use `foreach`:
   ```suggestion
    unresolvedWith.cteRelations.foreach { cteRelation =>
   ```

##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/Resolver.scala:
##########
@@ -252,36 +371,74 @@ class Resolver(
     val resolvedChild = resolve(unresolvedLocalLimit.child)
 
     val resolvedLimitExpr = withPosition(unresolvedLocalLimit) {
-      limitExpressionResolver.resolve(unresolvedLocalLimit.limitExpr)
+      expressionResolver.resolveLimitExpression(
+        unresolvedLocalLimit.limitExpr,
+        unresolvedLocalLimit
+      )
     }
 
     LocalLimit(resolvedLimitExpr, resolvedChild)
   }
 
+  /**
+   * [[Distinct]] operator doesn't require any speciial resolution.

Review Comment:
   ```suggestion
   * [[Distinct]] operator doesn't require any special resolution.
   ```

##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ExpressionResolver.scala:
##########
@@ -288,30 +473,148 @@
   /**
    * [[Literal]] resolution doesn't require any specific resolution logic at this point.
+   */
+  private def resolveLiteral(literal: Literal): Expression = literal
+
+  /**
+   * The [[GetViewColumnByNameAndOrdinal]] is a special internal expression that is placed by the
+   * [[SessionCatalog]] in the top [[Project]] operator of the freshly reconstructed unresolved
+   * view plan. Since the view schema is fixed and persisted in the catalog, we have to extract
+   * the right attributes from the view plan regardless of the underlying table schema changes.
+   * [[GetViewColumnByNameAndOrdinal]] contains attribute name and it's ordinal to perform the
+   * necessary matching. If the matching was not successful, or the number of matched candidates
+   * differs from the recorded one, we throw an error.
+   *
+   * Example of the correct name matching:
+   *
+   * {{{
+   * CREATE TABLE underlying (col1 INT, col2 STRING);
+   * CREATE VIEW all_columns AS SELECT * FROM underlying;
+   *
+   * -- View plan for the SELECT below will contain a Project node on top with the following
+   * -- expressions:
+   * -- getviewcolumnbynameandordinal(`spark_catalog`.`default`.`all_columns`, col1, 0, 1)
+   * -- getviewcolumnbynameandordinal(`spark_catalog`.`default`.`all_columns`, col2, 0, 1)

Review Comment:
   Could you restore the camel case?
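A rough sketch of the name-and-ordinal matching that the scaladoc above describes: pick the `ordinal`-th attribute among those matching `name`, and fail when the number of matches differs from the candidate count recorded with the view. The names, signature, and error handling here are illustrative assumptions, not the actual Spark internals:

```scala
case class Attr(name: String)

def matchViewColumn(
    viewPlanOutput: Seq[Attr],
    name: String,
    ordinal: Int,
    expectedNumCandidates: Int): Attr = {
  val matched = viewPlanOutput.filter(_.name.equalsIgnoreCase(name))
  if (matched.size != expectedNumCandidates) {
    // The underlying table changed incompatibly since the view was persisted.
    throw new IllegalStateException(
      s"View column `$name`: expected $expectedNumCandidates matches, found ${matched.size}")
  }
  matched(ordinal)
}
```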
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/NameScope.scala:
##########
@@ -272,122 +394,74 @@ class NameScopeStack extends SQLConfHelper {
   }
 
   /**
-   * Completely overwrite the top scope state with a named plan output.
+   * Completely overwrite the top scope state with operator `output`.
    *
-   * See [[NameScope.update]] for more details.
-   */
-  def overwriteTop(name: String, attributes: Seq[Attribute]): Unit = {
-    val newScope = new NameScope
-    newScope.update(name, attributes)
-
-    stack.pop()
-    stack.push(newScope)
-  }
-
-  /**
-   * Completely overwrite the top scope state with an unnamed plan output.
+   * This method is called by the [[Resolver]] when we've calculated the output of an operator that
+   * is being resolved. The new output is calculated based on the outputs of operator's children.
+   *
+   * Example for [[SubqueryAlias]], here we rewrite the top [[NameScope]]'s attributes to prepend
+   * subquery qualifier to their names:
    *
-   * See [[NameScope.+=]] for more details.
+   * {{{
+   * val qualifier = sa.identifier.qualifier :+ sa.alias
+   * scope.overwriteTop(scope.output.map(attribute => attribute.withQualifier(qualifier)))
+   * }}}
+   *
+   * Trivially, we would call this method for every operator in the query plan,
+   * however some operators just propagate the output of their children without any changes, so
+   * we can omit this call for them (e.g. [[Filter]]).
+   *
+   * This method should be preferred over [[withNewScope]].
    */
-  def overwriteTop(attributes: Seq[Attribute]): Unit = {
-    val newScope = new NameScope
-    newScope += attributes
+  def overwriteTop(output: Seq[Attribute]): Unit = {
+    val newScope = new NameScope(output)
 
     stack.pop()
     stack.push(newScope)
   }
 
   /**
-   * Execute `body` in a context of a fresh scope. It's used during the [[Project]] or the
-   * [[Aggregate]] resolution to avoid calling [[push]] and [[pop]] explicitly.
+   * Execute `body` in a context of a fresh scope.
+   *
+   * This method is called by the [[Resolver]] before recursing into the operator's child
+   * resolution _only_ in cases where a fresh scope is required.
+   *
+   * For esample, [[Project]] or [[Aggregate]] introduce their own scopes semantically, so that a
+   * lower resolution can lookup correlated names:
+   *
+   * {{{
+   * CREATE TABLE IF NOT EXISTS t1 (col1 INT, col2 STRING);
+   * CREATE TABLE IF NOT EXISTS t2 (col1 INT, col2 STRING);
+   *
+   * -- Here we need a scope for the upper [[Project]], and a separate scope for the correlated
+   * -- subquery, because its [[Filter]] need to lookup `t1.col1` from the upper scope.
+   * -- Those scopes have to be indenepdent to avoid polluting each other's attributes.

Review Comment:
   ```suggestion
   * -- Those scopes have to be independent to avoid polluting each other's attributes.
   ```
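Reusing the simplified `NameScopeStack` sketch from earlier, the sibling isolation this scaladoc argues for looks like the following: the second child's resolution never observes names produced while resolving the first (table names are hypothetical).

```scala
val scopes = new NameScopeStack

// Resolve a two-child operator (e.g. a UNION) in fresh sibling scopes.
val leftOutput = scopes.withNewScope {
  scopes.overwriteTop(Seq("t1.col1", "t1.col2")) // the left child's names
  scopes.top.output
}
val rightOutput = scopes.withNewScope {
  scopes.overwriteTop(Seq("t2.col1", "t2.col2")) // t1's names are not visible here
  scopes.top.output
}
```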
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ExpressionResolver.scala:
##########
@@ -199,16 +279,97 @@ class ExpressionResolver(
    * from the [[Resolver]] during [[Project]] resolution.
    *
    * The output sequence can be larger than the input sequence due to [[UnresolvedStar]] expansion.
+   *
+   * @returns The list of resolved expressions along with flags indicating whether the resolved
+   * project list contains aggregate expressions or attributes (encapsulated in
+   * [[ResolvedProjectList]]) which are used during the further resolution of the tree.
+   *
+   * The following query:
+   *
+   * {{{ SELECT COUNT(col1), 2 FROM VALUES(1); }}}
+   *
+   * would have a project list with two expressions: `COUNT(col1)` and `2`. After the resolution it
+   * would return the following result:
+   * ResolvedProjectList(
+   *   expressions = [count(col1) as count(col1), 2 AS 2],
+   *   hasAggregateExpressions = true, // because it contains `count(col1)` in the project list
+   *   hasAttributes = false // because it doesn't contain any [[AttributeReference]]s in the
+   *                         // project list (only under the aggregate expression, please check
+   *                         // [[AggregateExpressionResolver]] for more details).
    */
-  def resolveProjectList(unresolvedProjectList: Seq[NamedExpression]): Seq[NamedExpression] = {
-    unresolvedProjectList.flatMap {
+  def resolveProjectList(
+      unresolvedProjectList: Seq[NamedExpression],
+      operator: LogicalPlan): ResolvedProjectList = {
+    val projectListResolutionContext = new ExpressionResolutionContext
+    val resolvedProjectList = unresolvedProjectList.flatMap {
       case unresolvedStar: UnresolvedStar =>
         resolveStar(unresolvedStar)
       case other =>
-        Seq(resolveNamedExpression(other, isTopOfProjectList = true).asInstanceOf[NamedExpression])
+        val (resolvedElement, resolvedElementContext) =
+          resolveExpressionTreeInOperatorImpl(other, operator)
+        projectListResolutionContext.merge(resolvedElementContext)
+        Seq(resolvedElement.asInstanceOf[NamedExpression])
     }
+    ResolvedProjectList(
+      expressions = resolvedProjectList,
+      hasAggregateExpressions = projectListResolutionContext.hasAggregateExpressionsInASubtree,
+      hasAttributes = projectListResolutionContext.hasAttributeInASubtree,
+      hasLateralColumnAlias = projectListResolutionContext.hasLateralColumnAlias
+    )
   }
 
+  /**
+   * Resolves [[Expression]] only by resolving its children. This resolution method is used for
+   * nodes that don't require any special resolution other than resolving its children.
+   */
+  def resolveExpressionGenerically(expression: Expression): Expression =

Review Comment:
   Can this be private?
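The flag propagation that `resolveProjectList` performs above can be pictured with a stripped-down context class. This is a hedged sketch: the field and method names are taken from the diff, but the real `ExpressionResolutionContext` carries more state.

```scala
class ExpressionResolutionContext {
  var hasAggregateExpressionsInASubtree: Boolean = false
  var hasAttributeInASubtree: Boolean = false
  var hasLateralColumnAlias: Boolean = false

  // OR the child's flags into this context, as resolveProjectList does for
  // every resolved project-list element.
  def merge(other: ExpressionResolutionContext): Unit = {
    hasAggregateExpressionsInASubtree |= other.hasAggregateExpressionsInASubtree
    hasAttributeInASubtree |= other.hasAttributeInASubtree
    hasLateralColumnAlias |= other.hasLateralColumnAlias
  }
}
```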
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/Resolver.scala:
##########
@@ -17,29 +17,50 @@
 package org.apache.spark.sql.catalyst.analysis.resolver
 
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.SparkException
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.EvaluateUnresolvedInlineTable
 import org.apache.spark.sql.catalyst.analysis.{
   withPosition,
+  AnalysisErrorAt,
   FunctionResolution,
-  NamedRelation,
+  MultiInstanceRelation,
   RelationResolution,
   ResolvedInlineTable,
   UnresolvedInlineTable,
-  UnresolvedRelation
+  UnresolvedRelation,
+  UnresolvedSubqueryColumnAliases
+}
+import org.apache.spark.sql.catalyst.expressions.{
+  Alias,
+  Attribute,
+  AttributeSet,
+  Expression,
+  NamedExpression
 }
 import org.apache.spark.sql.catalyst.plans.logical.{
+  AnalysisHelper,
+  CTERelationDef,
+  CTERelationRef,
+  Distinct,
   Filter,
   GlobalLimit,
+  LeafNode,
   LocalLimit,
   LocalRelation,
   LogicalPlan,
   OneRowRelation,
   Project,
-  SubqueryAlias
+  SubqueryAlias,
+  Union,
+  UnresolvedWith,
+  View,
+  WithCTE
 }
-import org.apache.spark.sql.connector.catalog.CatalogManager
-import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase}
+import org.apache.spark.sql.connector.catalog.{CatalogManager}

Review Comment:
   ```suggestion
   import org.apache.spark.sql.connector.catalog.CatalogManager
   ```

##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ExpressionResolver.scala:
##########
@@ -123,18 +182,18 @@ class ExpressionResolver(
   override def resolve(unresolvedExpression: Expression): Expression = {
     planLogger.logExpressionTreeResolutionEvent(unresolvedExpression, "Unresolved expression tree")
 
-    if (unresolvedExpression
-          .getTagValue(ExpressionResolver.SINGLE_PASS_SUBTREE_BOUNDARY)
-          .nonEmpty) {
+    if (tryPopSinglePassSubtreeBoundary(unresolvedExpression)) {
       unresolvedExpression
     } else {
-      throwIfNodeWasResolvedEarlier(unresolvedExpression)
+      pushResolutionContext()
 
-      val resolvedExpression = unresolvedExpression match {
+      var resolvedExpression = unresolvedExpression match {

Review Comment:
   Is it really needed?

##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/MetadataResolver.scala:
##########
@@ -52,7 +58,7 @@ class MetadataResolver(
    * [[RelationResolution]] wasn't successful, we resort to using [[extensions]].
    * Otherwise, we fail with an exception.
    */
-  def resolve(unresolvedPlan: LogicalPlan): Unit = {
+  override def resolve(unresolvedPlan: LogicalPlan): Unit = {
     traverseLogicalPlanTree(unresolvedPlan) { unresolvedOperator =>
       unresolvedOperator match {

Review Comment:
   ```suggestion
     traverseLogicalPlanTree(unresolvedPlan) {
       case unresolvedRelation: UnresolvedRelation =>
   ```
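The Scala idiom behind the suggestion above, in isolation: a braced block of `case` clauses is itself a function literal, so the intermediate `unresolvedOperator => unresolvedOperator match { ... }` wrapper can be dropped. A minimal standalone example (the data is hypothetical):

```scala
val operators = Seq[Any]("UnresolvedRelation", 42)

// Verbose form:
operators.foreach { op =>
  op match {
    case s: String => println(s"relation: $s")
    case _ => // ignore other operators
  }
}

// Equivalent, as the review suggests:
operators.foreach {
  case s: String => println(s"relation: $s")
  case _ => // ignore other operators
}
```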
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/NameScope.scala:
##########
@@ -272,122 +394,74 @@ class NameScopeStack extends SQLConfHelper {
   }
 
   /**
-   * Completely overwrite the top scope state with a named plan output.
+   * Completely overwrite the top scope state with operator `output`.
    *
-   * See [[NameScope.update]] for more details.
-   */
-  def overwriteTop(name: String, attributes: Seq[Attribute]): Unit = {
-    val newScope = new NameScope
-    newScope.update(name, attributes)
-
-    stack.pop()
-    stack.push(newScope)
-  }
-
-  /**
-   * Completely overwrite the top scope state with an unnamed plan output.
+   * This method is called by the [[Resolver]] when we've calculated the output of an operator that
+   * is being resolved. The new output is calculated based on the outputs of operator's children.
+   *
+   * Example for [[SubqueryAlias]], here we rewrite the top [[NameScope]]'s attributes to prepend
+   * subquery qualifier to their names:
    *
-   * See [[NameScope.+=]] for more details.
+   * {{{
+   * val qualifier = sa.identifier.qualifier :+ sa.alias
+   * scope.overwriteTop(scope.output.map(attribute => attribute.withQualifier(qualifier)))
+   * }}}
+   *
+   * Trivially, we would call this method for every operator in the query plan,
+   * however some operators just propagate the output of their children without any changes, so
+   * we can omit this call for them (e.g. [[Filter]]).
+   *
+   * This method should be preferred over [[withNewScope]].
    */
-  def overwriteTop(attributes: Seq[Attribute]): Unit = {
-    val newScope = new NameScope
-    newScope += attributes
+  def overwriteTop(output: Seq[Attribute]): Unit = {
+    val newScope = new NameScope(output)
 
     stack.pop()
     stack.push(newScope)
   }
 
   /**
-   * Execute `body` in a context of a fresh scope. It's used during the [[Project]] or the
-   * [[Aggregate]] resolution to avoid calling [[push]] and [[pop]] explicitly.
+   * Execute `body` in a context of a fresh scope.
+   *
+   * This method is called by the [[Resolver]] before recursing into the operator's child
+   * resolution _only_ in cases where a fresh scope is required.
+   *
+   * For esample, [[Project]] or [[Aggregate]] introduce their own scopes semantically, so that a

Review Comment:
   ```suggestion
   * For example, [[Project]] or [[Aggregate]] introduce their own scopes semantically, so that a
   ```


--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org