allisonwang-db commented on code in PR #49471: URL: https://github.com/apache/spark/pull/49471#discussion_r1984269314
########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala: ########## @@ -2636,6 +2637,93 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor } } + /* + * This rule resolves SQL table functions. + */ + object ResolveSQLTableFunctions extends Rule[LogicalPlan] with AliasHelper { + + /** + * Check if a subquery plan is subject to the COUNT bug that can cause wrong results. + * A lateral correlation can only be removed if the lateral subquery is not subject to + * the COUNT bug. Currently only lateral correlation can handle it correctly. + */ + private def hasCountBug(sub: LogicalPlan): Boolean = sub.find { + // The COUNT bug occurs when there is an Aggregate that satisfies all the following + // conditions: + // 1) is on the correlation path + // 2) has non-empty group by expressions + // 3) has one or more output columns that evaluate to non-null values with empty input. + // E.g: COUNT(empty row) = 0. + // For simplicity, we use a stricter criteria (1 and 2 only) to determine if a query + // is subject to the COUNT bug. + case a: Aggregate if a.groupingExpressions.nonEmpty => hasOuterReferences(a.child) + case _ => false + }.nonEmpty + + /** + * Rewrite a resolved SQL table function plan by removing unnecessary lateral joins: + * Before: + * LateralJoin lateral-subquery [a], Inner + * : +- Project [c1, c2] + * : +- Filter [outer(a) == c1] + * : +- Relation [c1, c2] + * +- Project [1 AS a] + * +- OneRowRelation + * After: + * Project [c1, c2] + * +- Filter [1 == c1] <---- Replaced outer(a) + * +- Relation [c1, c2] + */ + private def rewrite(plan: LogicalPlan): LogicalPlan = { + (plan transformUp { + case j @ LateralJoin(Project(aliases, _: OneRowRelation), sub: LateralSubquery, Inner, None) + if j.resolved && aliases.forall(_.deterministic) => + val attrMap = AttributeMap(aliases.collect { case a: Alias => a.toAttribute -> a.child }) + val newPlan = sub.plan.transformAllExpressionsWithPruning( + _.containsPattern(OUTER_REFERENCE)) { + // Avoid replacing outer references that do not belong to the current outer plan. + // This can happen if the child of an alias also contains outer references (nested + // table function references). E.g: + // LateralJoin + // : +- Filter [outer(a) == x] + // : +- Relation [x, y] + // +- Project [outer(c) AS a] + // +- OneRowRelation + case OuterReference(a: Attribute) if attrMap.contains(a) => attrMap(a) match { + case ne: NamedExpression => ne + case o => Alias(o, a.name)(exprId = a.exprId, qualifier = a.qualifier) + } + } + // Keep the original lateral join if the new plan is subject to the count bug. + if (hasCountBug(newPlan)) j else newPlan + }).transformWithPruning(_.containsPattern(ALIAS)) { + // As a result of the above rewriting, we may end-up introducing nested Aliases (i.e., + // Aliases defined inside Aliases). This is problematic for the plan canonicalization as + // it doesn't expect nested Aliases. Therefore, here we remove non-top level Aliases. + case node => node.mapExpressions(trimNonTopLevelAliases) + } + } + + def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning( + _.containsPattern(SQL_TABLE_FUNCTION)) { + case SQLTableFunction(name, function, inputs, output) => + val plan = v1SessionCatalog.makeSQLTableFunctionPlan(name, function, inputs, output) + // Resolve the SQL table function plan using its function context. + val conf = new SQLConf() + function.getSQLConfigs.foreach { case (k, v) => conf.settings.put(k, v) } + val resolved = SQLConf.withExistingConf(conf) { Review Comment: Good point! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org