srielau commented on code in PR #52173:
URL: https://github.com/apache/spark/pull/52173#discussion_r2344578422


##########
sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveExecuteImmediate.scala:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.SqlScriptingContextManager
+import org.apache.spark.sql.catalyst.expressions.{Alias, EmptyRow, Expression, 
Literal,
+  VariableReference}
+import org.apache.spark.sql.catalyst.plans.logical.{Command, CompoundBody, 
LogicalPlan, SetVariable}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.EXECUTE_IMMEDIATE
+import org.apache.spark.sql.connector.catalog.CatalogManager
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.types.StringType
+
+/**
+ * Analysis rule that resolves and executes EXECUTE IMMEDIATE statements 
during analysis,
+ * replacing them with the results, similar to how CALL statements work.
+ * This rule combines resolution and execution in a single pass.
+ */
+case class ResolveExecuteImmediate(sparkSession: SparkSession, catalogManager: 
CatalogManager)
+  extends Rule[LogicalPlan] {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan.resolveOperatorsWithPruning(_.containsPattern(EXECUTE_IMMEDIATE), 
ruleId) {
+      case node @ UnresolvedExecuteImmediate(sqlStmtStr, args, 
targetVariables) =>
+        if (sqlStmtStr.resolved && targetVariables.forall(_.resolved) && 
args.forall(_.resolved)) {
+          // All resolved - execute immediately and handle INTO clause if 
present
+          if (targetVariables.nonEmpty) {
+            // EXECUTE IMMEDIATE ... INTO should generate SetVariable plan 
with eagerly executed
+            // source
+            val finalTargetVars = resolveTargetVariables(targetVariables)
+            val executedSource = executeImmediateQuery(sqlStmtStr, args, 
hasIntoClause = true)
+            SetVariable(finalTargetVars, executedSource)
+          } else {
+            // Regular EXECUTE IMMEDIATE without INTO - execute and return 
result directly
+            executeImmediateQuery(sqlStmtStr, args, hasIntoClause = false)
+          }
+        } else {
+          // Not all resolved yet - wait for next iteration
+          node
+        }
+    }
+  }
+
+  private def resolveTargetVariables(targetVariables: Seq[Expression]): 
Seq[VariableReference] = {
+    targetVariables.map {
+      case alias: Alias =>
+        // Extract the VariableReference from the alias
+        alias.child match {
+          case varRef: VariableReference =>
+            // Use resolved VariableReference directly with canFold = false
+            varRef.copy(canFold = false)
+          case _ =>
+            throw 
QueryCompilationErrors.unsupportedParameterExpression(alias.child)
+        }
+      case varRef: VariableReference =>
+        // Use resolved VariableReference directly with canFold = false
+        varRef.copy(canFold = false)
+      case other =>
+        throw QueryCompilationErrors.unsupportedParameterExpression(other)
+    }
+  }
+
+  private def executeImmediateQuery(
+      sqlStmtStr: Expression,
+      args: Seq[Expression],
+      hasIntoClause: Boolean): LogicalPlan = {
+    // Extract the query string from the queryParam expression
+    val sqlString = extractQueryString(sqlStmtStr)
+
+    // Parse and validate the query
+    val parsedPlan = sparkSession.sessionState.sqlParser.parsePlan(sqlString)
+    validateQuery(sqlString, parsedPlan)
+
+    // Execute the query recursively with isolated local variable context
+    val result = if (args.isEmpty) {
+      // No parameters - execute directly
+      withIsolatedLocalVariableContext {
+        sparkSession.sql(sqlString)
+      }
+    } else {
+      // For parameterized queries, build parameter arrays
+      val (paramValues, paramNames) = buildUnifiedParameters(args)
+
+      withIsolatedLocalVariableContext {
+        sparkSession.asInstanceOf[org.apache.spark.sql.classic.SparkSession]

Review Comment:
   If I do that, I need to expose it to Spark Connect. Is that a problem?
@davidm-db 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to