allisonwang-db commented on code in PR #49414: URL: https://github.com/apache/spark/pull/49414#discussion_r1908337016
########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala: ########## @@ -1561,6 +1561,121 @@ class SessionCatalog( } } + /** + * Constructs a scalar SQL function logical plan. The logical plan will be used to + * construct actual expression from the function inputs and body. + * + * The body of a scalar SQL function can either be an expression or a query returns + * one single column. + * + * Example scalar SQL function with an expression: + * + * CREATE FUNCTION area(width DOUBLE, height DOUBLE) RETURNS DOUBLE + * RETURN width * height; + * + * Query: + * + * SELECT area(a, b) FROM t; + * + * Analyzed SQL function plan: + * + * Project [CAST(width * height AS DOUBLE) AS area] + * +- Project [CAST(outer(a) AS DOUBLE) AS width, CAST(outer(b AS DOUBLE) AS height] + * +- OneRowRelation + * + * Analyzed plan: + * + * Project [area(width, height) AS area] + * +- Project [a, b, CAST(a AS DOUBLE) AS width, CAST(b AS DOUBLE) AS height] + * +- Relation [a, b] + * + * Example scalar SQL function with a subquery: + * + * CREATE FUNCTION foo(x INT) RETURNS INT + * RETURN SELECT SUM(a) FROM t WHERE x = a; + * + * SELECT foo(a) FROM t; + * + * Analyzed SQL function plan: + * + * Project [scalar-subquery AS foo] + * : +- Aggregate [] [sum(a)] + * : +- Filter [outer(x) = a] + * : +- Relation [a, b] + * +- Project [CAST(outer(a) AS INT) AS x] + * +- OneRowRelation + * + * Analyzed plan: + * + * Project [foo(x) AS foo] + * +- Project [a, b, CAST(a AS INT) AS x] + * +- Relation [a, b] + */ + def makeSQLFunctionPlan( + name: String, + function: SQLFunction, + input: Seq[Expression]): LogicalPlan = { + def metaForFuncInputAlias = { + new MetadataBuilder() + .putString("__funcInputAlias", "true") + .build() + } + assert(!function.isTableFunc) + val funcName = function.name.funcName + + // Use captured SQL configs when parsing a SQL function. 
+ val conf = new SQLConf() + function.getSQLConfigs.foreach { case (k, v) => conf.settings.put(k, v) } + SQLConf.withExistingConf(conf) { + val inputParam = function.inputParam + val returnType = function.getScalarFuncReturnType + val (expression, query) = function.getExpressionAndQuery(parser, isTableFunc = false) + assert(expression.isDefined || query.isDefined) + + // Check function arguments + val paramSize = inputParam.map(_.size).getOrElse(0) + if (input.size > paramSize) { + throw QueryCompilationErrors.wrongNumArgsError( + name, paramSize.toString, input.size) + } + + val inputs = inputParam.map { param => + // Attributes referencing the input parameters inside the function can use the + // function name as a qualifier. E.G.: + // `create function foo(a int) returns int return foo.a` + val qualifier = Seq(funcName) + val paddedInput = input ++ + param.takeRight(paramSize - input.size).map { p => + val defaultExpr = p.getDefault() + if (defaultExpr.isDefined) { + Cast(parseDefault(defaultExpr.get, parser), p.dataType) + } else { + throw QueryCompilationErrors.wrongNumArgsError( + name, paramSize.toString, input.size) + } + } + + paddedInput.zip(param.fields).map { + case (expr, param) => + // Add outer references to all attributes and outer references in the function input. + // Outer references also need to be wrapped because the function input may already + // contain outer references. + val outer = expr.transform { + case a: Attribute => OuterReference(a) Review Comment: The first step to resolve a SQL UDF is to verify the function body (expression or subquery) can be resolved correctly using the captured SQL config. 
We wrap the function inputs with outer references so that we can run a simple analyzer on top: ``` Project [CAST(width * height AS DOUBLE) AS area] +- Project [CAST(outer(a) AS DOUBLE) AS width, CAST(outer(b) AS DOUBLE) AS height] +- OneRowRelation ``` Once analyzed, the next step is to inline the SQL UDF body into the original query plan tree (rewriteSQLFunctions). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org