cloud-fan commented on code in PR #49471:
URL: https://github.com/apache/spark/pull/49471#discussion_r1982858588


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala:
##########
@@ -1675,6 +1676,91 @@ class SessionCatalog(
     }
   }
 
+  /**
+   * Constructs a SQL table function plan.
+   *
+   * Example SQL table function:
+   *
+   *   CREATE FUNCTION foo(x INT) RETURNS TABLE(a INT) RETURN SELECT x + 1 AS x1
+   *
+   * Query:
+   *
+   *   SELECT * FROM foo(1);
+   *
+   * Plan:
+   *
+   *   Project [CAST(x1 AS INT) AS a]
+   *   +- LateralJoin lateral-subquery [x]
+   *      :  +- Project [(outer(x) + 1) AS x1]
+   *      :     +- OneRowRelation
+   *      +- Project [CAST(1 AS INT) AS x]
+   *         +- OneRowRelation
+   */
+  def makeSQLTableFunctionPlan(
+      name: String,
+      function: SQLFunction,
+      input: Seq[Expression],
+      outputAttrs: Seq[Attribute]): LogicalPlan = {
+    assert(function.isTableFunc)
+    val funcName = function.name.funcName
+
+    // Use captured SQL configs when parsing a SQL function.
+    val conf = new SQLConf()
+    function.getSQLConfigs.foreach { case (k, v) => conf.settings.put(k, v) }
+    SQLConf.withExistingConf(conf) {
+      val inputParam = function.inputParam
+      val returnParam = function.getTableFuncReturnCols
+      val (_, query) = function.getExpressionAndQuery(parser, isTableFunc = 
true)
+      assert(query.isDefined)
+
+      // Check function arguments
+      val paramSize = inputParam.map(_.size).getOrElse(0)
+      if (input.size > paramSize) {
+        throw QueryCompilationErrors.wrongNumArgsError(
+          name, paramSize.toString, input.size)
+      }
+
+      val body = if (inputParam.isDefined) {
+        val param = inputParam.get
+        // Attributes referencing the input parameters inside the function can use the
+        // function name as a qualifier.
+        val qualifier = Seq(funcName)
+        val paddedInput = input ++
+          param.takeRight(paramSize - input.size).map { p =>
+            val defaultExpr = p.getDefault()
+            if (defaultExpr.isDefined) {
+              Cast(parseDefault(defaultExpr.get, parser), p.dataType)
+            } else {
+              throw QueryCompilationErrors.wrongNumArgsError(
+                name, paramSize.toString, input.size)
+            }
+          }
+
+        val inputCast = paddedInput.zip(param.fields).map {
+          case (expr, param) =>
+            // Add outer references to all attributes in the function input.
+            val outer = expr.transform {
+              case a: Attribute => OuterReference(a)
+            }
+            Alias(Cast(outer, param.dataType), param.name)(qualifier = 
qualifier)

Review Comment:
   Is this `Cast` duplicated? A cast is already added here:
https://github.com/apache/spark/pull/49471/files#diff-9dd0899e5406230aeff96654432da54f35255f6dc60eecb87264a5c508a8c826R1732



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to