srielau commented on code in PR #52173: URL: https://github.com/apache/spark/pull/52173#discussion_r2344578422
########## sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveExecuteImmediate.scala: ########## @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.SqlScriptingContextManager +import org.apache.spark.sql.catalyst.expressions.{Alias, EmptyRow, Expression, Literal, + VariableReference} +import org.apache.spark.sql.catalyst.plans.logical.{Command, CompoundBody, LogicalPlan, SetVariable} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.catalyst.trees.TreePattern.EXECUTE_IMMEDIATE +import org.apache.spark.sql.connector.catalog.CatalogManager +import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.types.StringType + +/** + * Analysis rule that resolves and executes EXECUTE IMMEDIATE statements during analysis, + * replacing them with the results, similar to how CALL statements work. + * This rule combines resolution and execution in a single pass. 
+ */ +case class ResolveExecuteImmediate(sparkSession: SparkSession, catalogManager: CatalogManager) + extends Rule[LogicalPlan] { + + override def apply(plan: LogicalPlan): LogicalPlan = { + plan.resolveOperatorsWithPruning(_.containsPattern(EXECUTE_IMMEDIATE), ruleId) { + case node @ UnresolvedExecuteImmediate(sqlStmtStr, args, targetVariables) => + if (sqlStmtStr.resolved && targetVariables.forall(_.resolved) && args.forall(_.resolved)) { + // All resolved - execute immediately and handle INTO clause if present + if (targetVariables.nonEmpty) { + // EXECUTE IMMEDIATE ... INTO should generate SetVariable plan with eagerly executed + // source + val finalTargetVars = resolveTargetVariables(targetVariables) + val executedSource = executeImmediateQuery(sqlStmtStr, args, hasIntoClause = true) + SetVariable(finalTargetVars, executedSource) + } else { + // Regular EXECUTE IMMEDIATE without INTO - execute and return result directly + executeImmediateQuery(sqlStmtStr, args, hasIntoClause = false) + } + } else { + // Not all resolved yet - wait for next iteration + node + } + } + } + + private def resolveTargetVariables(targetVariables: Seq[Expression]): Seq[VariableReference] = { + targetVariables.map { + case alias: Alias => + // Extract the VariableReference from the alias + alias.child match { + case varRef: VariableReference => + // Use resolved VariableReference directly with canFold = false + varRef.copy(canFold = false) + case _ => + throw QueryCompilationErrors.unsupportedParameterExpression(alias.child) + } + case varRef: VariableReference => + // Use resolved VariableReference directly with canFold = false + varRef.copy(canFold = false) + case other => + throw QueryCompilationErrors.unsupportedParameterExpression(other) + } + } + + private def executeImmediateQuery( + sqlStmtStr: Expression, + args: Seq[Expression], + hasIntoClause: Boolean): LogicalPlan = { + // Extract the query string from the queryParam expression + val sqlString = 
extractQueryString(sqlStmtStr) + + // Parse and validate the query + val parsedPlan = sparkSession.sessionState.sqlParser.parsePlan(sqlString) + validateQuery(sqlString, parsedPlan) + + // Execute the query recursively with isolated local variable context + val result = if (args.isEmpty) { + // No parameters - execute directly + withIsolatedLocalVariableContext { + sparkSession.sql(sqlString) + } + } else { + // For parameterized queries, build parameter arrays + val (paramValues, paramNames) = buildUnifiedParameters(args) + + withIsolatedLocalVariableContext { + sparkSession.asInstanceOf[org.apache.spark.sql.classic.SparkSession] Review Comment: If I do that, I need to expose it to Spark Connect. Is that a problem? @davidm-db -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org