allisonwang-db commented on code in PR #49471:
URL: https://github.com/apache/spark/pull/49471#discussion_r1984269314


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -2636,6 +2637,93 @@ class Analyzer(override val catalogManager: 
CatalogManager) extends RuleExecutor
     }
   }
 
+  /*
+   * This rule resolves SQL table functions.
+   */
+  object ResolveSQLTableFunctions extends Rule[LogicalPlan] with AliasHelper {
+
+    /**
+     * Check if a subquery plan is subject to the COUNT bug that can cause 
wrong results.
+     * A lateral correlation can only be removed if the lateral subquery is 
not subject to
+     * the COUNT bug. Currently only lateral correlation can handle it 
correctly.
+     */
+    private def hasCountBug(sub: LogicalPlan): Boolean = sub.find {
+      // The COUNT bug occurs when there is an Aggregate that satisfies all 
the following
+      // conditions:
+      // 1) is on the correlation path
+      // 2) has non-empty group by expressions
+      // 3) has one or more output columns that evaluate to non-null values 
with empty input.
+      //    E.g: COUNT(empty row) = 0.
+      // For simplicity, we use stricter criteria (1 and 2 only) to determine if a query
+      // is subject to the COUNT bug.
+      case a: Aggregate if a.groupingExpressions.nonEmpty => 
hasOuterReferences(a.child)
+      case _ => false
+    }.nonEmpty
+
+    /**
+     * Rewrite a resolved SQL table function plan by removing unnecessary 
lateral joins:
+     * Before:
+     *   LateralJoin lateral-subquery [a], Inner
+     *   :  +- Project [c1, c2]
+     *   :     +- Filter [outer(a) == c1]
+     *   :        +- Relation [c1, c2]
+     *   +- Project [1 AS a]
+     *      +- OneRowRelation
+     * After:
+     *   Project [c1, c2]
+     *   +- Filter [1 == c1]  <---- Replaced outer(a)
+     *      +- Relation [c1, c2]
+     */
+    private def rewrite(plan: LogicalPlan): LogicalPlan = {
+      (plan transformUp {
+        case j @ LateralJoin(Project(aliases, _: OneRowRelation), sub: 
LateralSubquery, Inner, None)
+            if j.resolved && aliases.forall(_.deterministic) =>
+          val attrMap = AttributeMap(aliases.collect { case a: Alias => 
a.toAttribute -> a.child })
+          val newPlan = sub.plan.transformAllExpressionsWithPruning(
+            _.containsPattern(OUTER_REFERENCE)) {
+            // Avoid replacing outer references that do not belong to the 
current outer plan.
+            // This can happen if the child of an alias also contains outer 
references (nested
+            // table function references). E.g:
+            // LateralJoin
+            // :  +- Filter [outer(a) == x]
+            // :     +- Relation [x, y]
+            // +- Project [outer(c) AS a]
+            //    +- OneRowRelation
+            case OuterReference(a: Attribute) if attrMap.contains(a) => 
attrMap(a) match {
+              case ne: NamedExpression => ne
+              case o => Alias(o, a.name)(exprId = a.exprId, qualifier = 
a.qualifier)
+            }
+          }
+          // Keep the original lateral join if the new plan is subject to the COUNT bug.
+          if (hasCountBug(newPlan)) j else newPlan
+      }).transformWithPruning(_.containsPattern(ALIAS)) {
+        // As a result of the above rewriting, we may end up introducing nested Aliases (i.e.,
+        // Aliases defined inside Aliases). This is problematic for the plan canonicalization as
+        // it doesn't expect nested Aliases. Therefore, here we remove non-top-level Aliases.
+        case node => node.mapExpressions(trimNonTopLevelAliases)
+      }
+    }
+
+    def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveOperatorsUpWithPruning(
+      _.containsPattern(SQL_TABLE_FUNCTION)) {
+      case SQLTableFunction(name, function, inputs, output) =>
+        val plan = v1SessionCatalog.makeSQLTableFunctionPlan(name, function, 
inputs, output)
+        // Resolve the SQL table function plan using its function context.
+        val conf = new SQLConf()
+        function.getSQLConfigs.foreach { case (k, v) => conf.settings.put(k, 
v) }
+        val resolved = SQLConf.withExistingConf(conf) {

Review Comment:
   Good point!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to