allisonwang-db commented on code in PR #49414:
URL: https://github.com/apache/spark/pull/49414#discussion_r1908337016


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala:
##########
@@ -1561,6 +1561,121 @@ class SessionCatalog(
     }
   }
 
+  /**
+   * Constructs a scalar SQL function logical plan. The logical plan will be 
used to
+   * construct actual expression from the function inputs and body.
+   *
+   * The body of a scalar SQL function can either be an expression or a query 
returns
+   * one single column.
+   *
+   * Example scalar SQL function with an expression:
+   *
+   *   CREATE FUNCTION area(width DOUBLE, height DOUBLE) RETURNS DOUBLE
+   *   RETURN width * height;
+   *
+   * Query:
+   *
+   *   SELECT area(a, b) FROM t;
+   *
+   * Analyzed SQL function plan:
+   *
+   *   Project [CAST(width * height AS DOUBLE) AS area]
+   *   +- Project [CAST(outer(a) AS DOUBLE) AS width, CAST(outer(b AS DOUBLE) 
AS height]
+   *      +- OneRowRelation
+   *
+   * Analyzed plan:
+   *
+   *   Project [area(width, height) AS area]
+   *   +- Project [a, b, CAST(a AS DOUBLE) AS width, CAST(b AS DOUBLE) AS 
height]
+   *      +- Relation [a, b]
+   *
+   * Example scalar SQL function with a subquery:
+   *
+   *   CREATE FUNCTION foo(x INT) RETURNS INT
+   *   RETURN SELECT SUM(a) FROM t WHERE x = a;
+   *
+   *   SELECT foo(a) FROM t;
+   *
+   * Analyzed SQL function plan:
+   *
+   *   Project [scalar-subquery AS foo]
+   *   :  +- Aggregate [] [sum(a)]
+   *   :     +- Filter [outer(x) = a]
+   *   :        +- Relation [a, b]
+   *   +- Project [CAST(outer(a) AS INT) AS x]
+   *      +- OneRowRelation
+   *
+   * Analyzed plan:
+   *
+   *   Project [foo(x) AS foo]
+   *   +- Project [a, b, CAST(a AS INT) AS x]
+   *      +- Relation [a, b]
+   */
+  def makeSQLFunctionPlan(
+      name: String,
+      function: SQLFunction,
+      input: Seq[Expression]): LogicalPlan = {
+    def metaForFuncInputAlias = {
+      new MetadataBuilder()
+        .putString("__funcInputAlias", "true")
+        .build()
+    }
+    assert(!function.isTableFunc)
+    val funcName = function.name.funcName
+
+    // Use captured SQL configs when parsing a SQL function.
+    val conf = new SQLConf()
+    function.getSQLConfigs.foreach { case (k, v) => conf.settings.put(k, v) }
+    SQLConf.withExistingConf(conf) {
+      val inputParam = function.inputParam
+      val returnType = function.getScalarFuncReturnType
+      val (expression, query) = function.getExpressionAndQuery(parser, 
isTableFunc = false)
+      assert(expression.isDefined || query.isDefined)
+
+      // Check function arguments
+      val paramSize = inputParam.map(_.size).getOrElse(0)
+      if (input.size > paramSize) {
+        throw QueryCompilationErrors.wrongNumArgsError(
+          name, paramSize.toString, input.size)
+      }
+
+      val inputs = inputParam.map { param =>
+        // Attributes referencing the input parameters inside the function can 
use the
+        // function name as a qualifier. E.G.:
+        // `create function foo(a int) returns int return foo.a`
+        val qualifier = Seq(funcName)
+        val paddedInput = input ++
+          param.takeRight(paramSize - input.size).map { p =>
+            val defaultExpr = p.getDefault()
+            if (defaultExpr.isDefined) {
+              Cast(parseDefault(defaultExpr.get, parser), p.dataType)
+            } else {
+              throw QueryCompilationErrors.wrongNumArgsError(
+                name, paramSize.toString, input.size)
+            }
+          }
+
+        paddedInput.zip(param.fields).map {
+          case (expr, param) =>
+            // Add outer references to all attributes and outer references in 
the function input.
+            // Outer references also need to be wrapped because the function 
input may already
+            // contain outer references.
+            val outer = expr.transform {
+              case a: Attribute => OuterReference(a)

Review Comment:
   The first step in resolving a SQL UDF is to verify that the function body 
(expression or subquery) can be resolved correctly using the captured SQL 
config. We wrap the function inputs with outer references so that we can run 
a simple analyzer on top:
   ```
   Project [CAST(width * height AS DOUBLE) AS area]
     +- Project [CAST(outer(a) AS DOUBLE) AS width, CAST(outer(b) AS DOUBLE) AS 
height]
       +- OneRowRelation
   ```
   Once analyzed, the next step is to inline the SQL UDF body into the original 
query plan tree (`rewriteSQLFunctions`).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to