cloud-fan commented on code in PR #52173:
URL: https://github.com/apache/spark/pull/52173#discussion_r2312765430


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/executeImmediate.scala:
##########
@@ -17,190 +17,137 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
-import scala.util.{Either, Left, Right}
-
-import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, 
VariableReference}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Exists, 
Expression, InSubquery, ListQuery, ScalarSubquery, VariableReference}
 import org.apache.spark.sql.catalyst.parser.ParseException
-import org.apache.spark.sql.catalyst.plans.logical.{CompoundBody, LogicalPlan, 
SetVariable}
+import org.apache.spark.sql.catalyst.plans.logical.{ExecutableDuringAnalysis, 
LocalRelation, LogicalPlan, SetVariable, UnaryNode}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern.{EXECUTE_IMMEDIATE, 
TreePattern}
 import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.types.StringType
 
 /**
  * Logical plan representing execute immediate query.
  *
- * @param args parameters of query
- * @param query query string or variable
+ * @param queryParam the query expression (first child)
+ * @param args parameters from USING clause (subsequent children)
  * @param targetVariables variables to store the result of the query
  */
 case class ExecuteImmediateQuery(
+    queryParam: Expression,
     args: Seq[Expression],
-    query: Either[String, UnresolvedAttribute],
-    targetVariables: Seq[UnresolvedAttribute])
+    targetVariables: Seq[Expression])
   extends UnresolvedLeafNode {
+
   final override val nodePatterns: Seq[TreePattern] = Seq(EXECUTE_IMMEDIATE)
 }
 
 /**
- * This rule substitutes execute immediate query node with fully analyzed
- * plan that is passed as string literal or session parameter.
+ * Logical plan representing a resolved execute immediate command that will 
recursively
+ * invoke SQL execution.
+ *
+ * @param queryParam the resolved query expression
+ * @param args parameters from USING clause
  */
-class SubstituteExecuteImmediate(
-    val catalogManager: CatalogManager,
-    resolveChild: LogicalPlan => LogicalPlan,
-    checkAnalysis: LogicalPlan => Unit)
-  extends Rule[LogicalPlan] {
-  private val variableResolution = new 
VariableResolution(catalogManager.tempVariableManager)
-
-  def resolveVariable(e: Expression): Expression = {
-
-    /**
-     * We know that the expression is either UnresolvedAttribute, Alias or 
Parameter, as passed from
-     * the parser. If it is an UnresolvedAttribute, we look it up in the 
catalog and return it. If
-     * it is an Alias, we resolve the child and return an Alias with the same 
name. If it is
-     * a Parameter, we leave it as is because the parameter belongs to another 
parameterized
-     * query and should be resolved later.
-     */
-    e match {
-      case u: UnresolvedAttribute =>
-        getVariableReference(u, u.nameParts)
-      case a: Alias =>
-        Alias(resolveVariable(a.child), a.name)()
-      case p: Parameter => p
-      case other =>
-        throw QueryCompilationErrors.unsupportedParameterExpression(other)
-    }
-  }
+case class ExecuteImmediateCommand(
+    queryParam: Expression,
+    args: Seq[Expression])
+  extends UnaryNode with ExecutableDuringAnalysis {
 
-  def resolveArguments(expressions: Seq[Expression]): Seq[Expression] = {
-    expressions.map { exp =>
-      if (exp.resolved) {
-        exp
-      } else {
-        resolveVariable(exp)
-      }
-    }
-  }
+  final override val nodePatterns: Seq[TreePattern] = Seq(EXECUTE_IMMEDIATE)
 
-  def extractQueryString(either: Either[String, UnresolvedAttribute]): String 
= {
-    either match {
-      case Left(v) => v
-      case Right(u) =>
-        val varReference = getVariableReference(u, u.nameParts)
+  override def child: LogicalPlan = LocalRelation(Nil, Nil)
 
-        if (!varReference.dataType.sameType(StringType)) {
-          throw 
QueryCompilationErrors.invalidExecuteImmediateVariableType(varReference.dataType)
-        }
+  override def output: Seq[Attribute] = child.output
 
-        // Call eval with null value passed instead of a row.
-        // This is ok as this is variable and invoking eval should
-        // be independent of row value.
-        val varReferenceValue = varReference.eval(null)
+  override lazy val resolved: Boolean = {
+    // ExecuteImmediateCommand should not be considered resolved until it has 
been
+    // executed and replaced by ExecuteImmediateCommands rule.
+    // This ensures that SetVariable waits for execution to complete.
+    false
+  }
 
-        if (varReferenceValue == null) {
-          throw QueryCompilationErrors.nullSQLStringExecuteImmediate(u.name)
-        }
+  override def stageForExplain(): LogicalPlan = {
+    // For EXPLAIN, just show the command without executing it
+    copy()
+  }
 
-        varReferenceValue.toString
-    }
+  override protected def withNewChildInternal(
+      newChild: LogicalPlan): ExecuteImmediateCommand = {
+    copy()
   }
+}
+
+/**
+ * This rule resolves execute immediate query node into a command node
+ * that will handle recursive SQL execution.
+ */
+class ResolveExecuteImmediate(
+    val catalogManager: CatalogManager,
+    resolveChild: LogicalPlan => LogicalPlan,
+    checkAnalysis: LogicalPlan => Unit) extends Rule[LogicalPlan] {
+  private val variableResolution = new 
VariableResolution(catalogManager.tempVariableManager)
 
   override def apply(plan: LogicalPlan): LogicalPlan =
     plan.resolveOperatorsWithPruning(_.containsPattern(EXECUTE_IMMEDIATE), 
ruleId) {
-      case e @ ExecuteImmediateQuery(expressions, _, _) if 
expressions.exists(!_.resolved) =>
-        e.copy(args = resolveArguments(expressions))
-
-      case ExecuteImmediateQuery(expressions, query, targetVariables)
-        if expressions.forall(_.resolved) =>
-
-        val queryString = extractQueryString(query)
-        val plan = parseStatement(queryString, targetVariables)
-
-        val posNodes = plan.collect { case p: LogicalPlan =>
-          p.expressions.flatMap(_.collect { case n: PosParameter => n })
-        }.flatten
-        val namedNodes = plan.collect { case p: LogicalPlan =>
-          p.expressions.flatMap(_.collect { case n: NamedParameter => n })
-        }.flatten
-
-        val queryPlan = if (expressions.isEmpty || (posNodes.isEmpty && 
namedNodes.isEmpty)) {
-          plan
-        } else if (posNodes.nonEmpty && namedNodes.nonEmpty) {
-          throw QueryCompilationErrors.invalidQueryMixedQueryParameters()
-        } else {
-          if (posNodes.nonEmpty) {
-            PosParameterizedQuery(plan, expressions)
-          } else {
-            val aliases = expressions.collect {
-              case e: Alias => e
-              case u: VariableReference => Alias(u, u.identifier.name())()
-            }
-
-            if (aliases.size != expressions.size) {
-              val nonAliases = expressions.filter(attr =>
-                !attr.isInstanceOf[Alias] && 
!attr.isInstanceOf[VariableReference])
+      case e @ ExecuteImmediateQuery(queryParam, args, targetVariables) =>
+        // Check if all expressions are resolved (they should be resolved by 
ResolveReferences now)
+        val queryParamResolved = queryParam.resolved
+        val allArgsResolved = args.forall(_.resolved)
+        val targetVariablesResolved = targetVariables.forall {
+          case _: UnresolvedAttribute => false // Unresolved attributes are 
not resolved
+          case alias: Alias => alias.child.resolved // For aliases, check if 
child is resolved
+          case _: VariableReference => true // VariableReference is already 
resolved
+          case expr => expr.resolved // For other expressions, use standard 
resolved check
+        }

Review Comment:
   How is it different from `targetVariables.forall(_.resolved)`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to