cloud-fan commented on code in PR #52173: URL: https://github.com/apache/spark/pull/52173#discussion_r2312766450
########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/executeImmediate.scala: ########## @@ -17,190 +17,137 @@ package org.apache.spark.sql.catalyst.analysis -import scala.util.{Either, Left, Right} - -import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, VariableReference} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Exists, Expression, InSubquery, ListQuery, ScalarSubquery, VariableReference} import org.apache.spark.sql.catalyst.parser.ParseException -import org.apache.spark.sql.catalyst.plans.logical.{CompoundBody, LogicalPlan, SetVariable} +import org.apache.spark.sql.catalyst.plans.logical.{ExecutableDuringAnalysis, LocalRelation, LogicalPlan, SetVariable, UnaryNode} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.TreePattern.{EXECUTE_IMMEDIATE, TreePattern} import org.apache.spark.sql.connector.catalog.CatalogManager import org.apache.spark.sql.errors.QueryCompilationErrors -import org.apache.spark.sql.types.StringType /** * Logical plan representing execute immediate query. * - * @param args parameters of query - * @param query query string or variable + * @param queryParam the query expression (first child) + * @param args parameters from USING clause (subsequent children) * @param targetVariables variables to store the result of the query */ case class ExecuteImmediateQuery( + queryParam: Expression, args: Seq[Expression], - query: Either[String, UnresolvedAttribute], - targetVariables: Seq[UnresolvedAttribute]) + targetVariables: Seq[Expression]) extends UnresolvedLeafNode { + final override val nodePatterns: Seq[TreePattern] = Seq(EXECUTE_IMMEDIATE) } /** - * This rule substitutes execute immediate query node with fully analyzed - * plan that is passed as string literal or session parameter. + * Logical plan representing a resolved execute immediate command that will recursively + * invoke SQL execution. 
+ * + * @param queryParam the resolved query expression + * @param args parameters from USING clause */ -class SubstituteExecuteImmediate( - val catalogManager: CatalogManager, - resolveChild: LogicalPlan => LogicalPlan, - checkAnalysis: LogicalPlan => Unit) - extends Rule[LogicalPlan] { - private val variableResolution = new VariableResolution(catalogManager.tempVariableManager) - - def resolveVariable(e: Expression): Expression = { - - /** - * We know that the expression is either UnresolvedAttribute, Alias or Parameter, as passed from - * the parser. If it is an UnresolvedAttribute, we look it up in the catalog and return it. If - * it is an Alias, we resolve the child and return an Alias with the same name. If it is - * a Parameter, we leave it as is because the parameter belongs to another parameterized - * query and should be resolved later. - */ - e match { - case u: UnresolvedAttribute => - getVariableReference(u, u.nameParts) - case a: Alias => - Alias(resolveVariable(a.child), a.name)() - case p: Parameter => p - case other => - throw QueryCompilationErrors.unsupportedParameterExpression(other) - } - } +case class ExecuteImmediateCommand( + queryParam: Expression, + args: Seq[Expression]) + extends UnaryNode with ExecutableDuringAnalysis { - def resolveArguments(expressions: Seq[Expression]): Seq[Expression] = { - expressions.map { exp => - if (exp.resolved) { - exp - } else { - resolveVariable(exp) - } - } - } + final override val nodePatterns: Seq[TreePattern] = Seq(EXECUTE_IMMEDIATE) - def extractQueryString(either: Either[String, UnresolvedAttribute]): String = { - either match { - case Left(v) => v - case Right(u) => - val varReference = getVariableReference(u, u.nameParts) + override def child: LogicalPlan = LocalRelation(Nil, Nil) - if (!varReference.dataType.sameType(StringType)) { - throw QueryCompilationErrors.invalidExecuteImmediateVariableType(varReference.dataType) - } + override def output: Seq[Attribute] = child.output - // Call eval with 
null value passed instead of a row. - // This is ok as this is variable and invoking eval should - // be independent of row value. - val varReferenceValue = varReference.eval(null) + override lazy val resolved: Boolean = { + // ExecuteImmediateCommand should not be considered resolved until it has been + // executed and replaced by ExecuteImmediateCommands rule. + // This ensures that SetVariable waits for execution to complete. + false + } - if (varReferenceValue == null) { - throw QueryCompilationErrors.nullSQLStringExecuteImmediate(u.name) - } + override def stageForExplain(): LogicalPlan = { + // For EXPLAIN, just show the command without executing it + copy() + } - varReferenceValue.toString - } + override protected def withNewChildInternal( + newChild: LogicalPlan): ExecuteImmediateCommand = { + copy() } +} + +/** + * This rule resolves execute immediate query node into a command node + * that will handle recursive SQL execution. + */ +class ResolveExecuteImmediate( + val catalogManager: CatalogManager, + resolveChild: LogicalPlan => LogicalPlan, + checkAnalysis: LogicalPlan => Unit) extends Rule[LogicalPlan] { + private val variableResolution = new VariableResolution(catalogManager.tempVariableManager) override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(_.containsPattern(EXECUTE_IMMEDIATE), ruleId) { - case e @ ExecuteImmediateQuery(expressions, _, _) if expressions.exists(!_.resolved) => - e.copy(args = resolveArguments(expressions)) - - case ExecuteImmediateQuery(expressions, query, targetVariables) - if expressions.forall(_.resolved) => - - val queryString = extractQueryString(query) - val plan = parseStatement(queryString, targetVariables) - - val posNodes = plan.collect { case p: LogicalPlan => - p.expressions.flatMap(_.collect { case n: PosParameter => n }) - }.flatten - val namedNodes = plan.collect { case p: LogicalPlan => - p.expressions.flatMap(_.collect { case n: NamedParameter => n }) - }.flatten - - val 
queryPlan = if (expressions.isEmpty || (posNodes.isEmpty && namedNodes.isEmpty)) { - plan - } else if (posNodes.nonEmpty && namedNodes.nonEmpty) { - throw QueryCompilationErrors.invalidQueryMixedQueryParameters() - } else { - if (posNodes.nonEmpty) { - PosParameterizedQuery(plan, expressions) - } else { - val aliases = expressions.collect { - case e: Alias => e - case u: VariableReference => Alias(u, u.identifier.name())() - } - - if (aliases.size != expressions.size) { - val nonAliases = expressions.filter(attr => - !attr.isInstanceOf[Alias] && !attr.isInstanceOf[VariableReference]) + case e @ ExecuteImmediateQuery(queryParam, args, targetVariables) => + // Check if all expressions are resolved (they should be resolved by ResolveReferences now) + val queryParamResolved = queryParam.resolved + val allArgsResolved = args.forall(_.resolved) + val targetVariablesResolved = targetVariables.forall { + case _: UnresolvedAttribute => false // Unresolved attributes are not resolved + case alias: Alias => alias.child.resolved // For aliases, check if child is resolved + case _: VariableReference => true // VariableReference is already resolved + case expr => expr.resolved // For other expressions, use standard resolved check + } - throw QueryCompilationErrors.invalidQueryAllParametersMustBeNamed(nonAliases) + // Validate that USING clause expressions don't contain unsupported constructs + validateUsingClauseExpressions(args) + + // Validate that query parameter is foldable (constant expression) + validateQueryParameter(queryParam) + + if (queryParamResolved && allArgsResolved && targetVariablesResolved) { + // All resolved - transform based on whether we have target variables + if (targetVariables.nonEmpty) { + // EXECUTE IMMEDIATE ... 
 INTO should generate SetVariable plan + // SetVariable expects UnresolvedAttribute objects that ResolveSetVariable will resolve + val finalTargetVars = targetVariables.map { + case attr: UnresolvedAttribute => Review Comment: We will never hit it. This branch only runs when `targetVariablesResolved` is true, and that check returns `false` for any `UnresolvedAttribute` target, so this `case attr: UnresolvedAttribute` arm is dead code. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org