dtenedor commented on code in PR #48649: URL: https://github.com/apache/spark/pull/48649#discussion_r1826211578
########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala: ########## @@ -5874,53 +5886,72 @@ class AstBuilder extends DataTypeAstBuilder if (!SQLConf.get.getConf(SQLConf.OPERATOR_PIPE_SYNTAX_ENABLED)) { operationNotAllowed("Operator pipe SQL syntax using |>", ctx) } - // This helper function adds a table subquery boundary between the new operator to be added - // (such as a filter or sort) and the input plan if one does not already exist. This helps the - // analyzer behave as if we had added the corresponding SQL clause after a table subquery - // containing the input plan. - def withSubqueryAlias(): LogicalPlan = left match { - case s: SubqueryAlias => - s - case u: UnresolvedRelation => - u - case _ => - SubqueryAlias(SubqueryAlias.generateSubqueryName(), left) - } - Option(ctx.selectClause).map { c => - withSelectQuerySpecification( - ctx = ctx, - selectClause = c, - lateralView = new java.util.ArrayList[LateralViewContext](), - whereClause = null, - aggregationClause = null, - havingClause = null, - windowClause = null, - relation = left, - isPipeOperatorSelect = true) - }.getOrElse(Option(ctx.whereClause).map { c => - withWhereClause(c, withSubqueryAlias()) - }.getOrElse(Option(ctx.pivotClause()).map { c => - if (ctx.unpivotClause() != null) { - throw QueryParsingErrors.unpivotWithPivotInFromClauseNotAllowedError(ctx) - } - withPivot(c, left) - }.getOrElse(Option(ctx.unpivotClause()).map { c => - if (ctx.pivotClause() != null) { - throw QueryParsingErrors.unpivotWithPivotInFromClauseNotAllowedError(ctx) + + // Extract the base child and a list of WithWindowDefinition nodes from the plan + var baseChild = left + var withWindowDefinitions = List.empty[WithWindowDefinition] + while (baseChild.isInstanceOf[WithWindowDefinition]) { + val wwd = baseChild.asInstanceOf[WithWindowDefinition] + withWindowDefinitions = withWindowDefinitions :+ wwd + baseChild = wwd.child + } + + // Process the base child + val newChild = { + // This 
helper function adds a table subquery boundary between the new operator to be added + // (such as a filter or sort) and the input plan if one does not already exist. This helps the + // analyzer behave as if we had added the corresponding SQL clause after a table subquery + // containing the input plan. + def withSubqueryAlias(): LogicalPlan = baseChild match { + case s: SubqueryAlias => + s + case u: UnresolvedRelation => + u + case _ => + SubqueryAlias(SubqueryAlias.generateSubqueryName(), baseChild) } - withUnpivot(c, left) - }.getOrElse(Option(ctx.sample).map { c => - withSample(c, left) - }.getOrElse(Option(ctx.joinRelation()).map { c => - withJoinRelation(c, left) - }.getOrElse(Option(ctx.operator).map { c => - val all = Option(ctx.setQuantifier()).exists(_.ALL != null) - visitSetOperationImpl(left, plan(ctx.right), all, c.getType) - }.getOrElse(Option(ctx.queryOrganization).map { c => - withQueryResultClauses(c, withSubqueryAlias(), forPipeOperators = true) - }.getOrElse( - visitOperatorPipeAggregate(ctx, left) - )))))))) + + Option(ctx.selectClause).map { c => + withSelectQuerySpecification( + ctx = ctx, + selectClause = c, + lateralView = new java.util.ArrayList[LateralViewContext](), + whereClause = null, + aggregationClause = null, + havingClause = null, + windowClause = null, + relation = baseChild, + isPipeOperatorSelect = true) + }.getOrElse(Option(ctx.whereClause).map { c => + withWhereClause(c, withSubqueryAlias()) + }.getOrElse(Option(ctx.pivotClause()).map { c => + if (ctx.unpivotClause() != null) { + throw QueryParsingErrors.unpivotWithPivotInFromClauseNotAllowedError(ctx) + } + withPivot(c, baseChild) + }.getOrElse(Option(ctx.unpivotClause()).map { c => + if (ctx.pivotClause() != null) { + throw QueryParsingErrors.unpivotWithPivotInFromClauseNotAllowedError(ctx) + } + withUnpivot(c, baseChild) + }.getOrElse(Option(ctx.sample).map { c => + withSample(c, baseChild) + }.getOrElse(Option(ctx.joinRelation()).map { c => + withJoinRelation(c, 
baseChild) + }.getOrElse(Option(ctx.operator).map { c => + val all = Option(ctx.setQuantifier()).exists(_.ALL != null) + visitSetOperationImpl(baseChild, plan(ctx.right), all, c.getType) + }.getOrElse(Option(ctx.queryOrganization).map { c => + withQueryResultClauses(c, withSubqueryAlias(), forPipeOperators = true) + }.getOrElse( + visitOperatorPipeAggregate(ctx, left) + )))))))) + } + + // Reconstruct the WithWindowDefinition nodes on top of the new child Review Comment: @Angryrou Yes, this is correct; this is my exact suggestion. Speaking with the pipe SQL syntax authors offline, they point out that in general, the pipe operators should be independent and each one performs the same step on a relation no matter how they are composed together. Making `|> WINDOW` a separate pipe operator violates this principle. Adding an optional `WithWindowDefinition?` to the end of `|> SELECT` and `|> WHERE` fixes it, because then the window definition only applies directly to the same pipe operator to which it is attached. Thanks again for your patience in dealing with the syntax change :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org