davidm-db commented on code in PR #50026:
URL: https://github.com/apache/spark/pull/50026#discussion_r1964228225


##########
sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionNode.scala:
##########
@@ -823,35 +827,41 @@ class ForStatementExec(
 
       override def hasNext: Boolean = !interrupted && (state match {
           case ForState.VariableAssignment => cachedQueryResult().hasNext
-          case ForState.Body => true
-          case ForState.VariableCleanup => dropVariablesExec.getTreeIterator.hasNext
+          case ForState.Body => bodyWithVariables.getTreeIterator.hasNext
         })
 
       @scala.annotation.tailrec
       override def next(): CompoundStatementExec = state match {
 
         case ForState.VariableAssignment =>
-          variablesMap = createVariablesMapFromRow(cachedQueryResult().next())
-
-          if (!areVariablesDeclared) {
-            // create and execute declare var statements
-            variablesMap.keys.toSeq
-              .map(colName => createDeclareVarExec(colName, variablesMap(colName)))
-              .foreach(declareVarExec => declareVarExec.buildDataFrame(session).collect())
-            areVariablesDeclared = true
-          }
-
-          // create and execute set var statements
-          variablesMap.keys.toSeq
-            .map(colName => createSetVarExec(colName, variablesMap(colName)))
-            .foreach(setVarExec => setVarExec.buildDataFrame(session).collect())
+          val row = cachedQueryResult().next()
+
+          val variableInitStatements = row.schema.names.toSeq
+            .map { colName => (colName, createExpressionFromValue(row.getAs(colName))) }
+            .flatMap { case (colName, expr) => Seq(
+              createDeclareVarExec(colName, expr),
+              createSetVarExec(colName, expr)
+            ) }
+
+          bodyWithVariables = new CompoundBodyExec(
+            // NoOpStatementExec appended to end of body to prevent
+            // dropping variables before last statement is executed.

Review Comment:
   I'm not sure I understand why this is needed. See the comment in `SparkSession.executeSqlScript`:
   ```
       // We must execute returned df before calling sse.getNextResult again because sse.hasNext
       // advances the script execution and executes all statements until the next result. We must
       // collect results immediately to maintain execution order.
       // This ensures we respect the contract of SqlScriptingExecution API.
   ```
   We have already ensured that we don't do anything else before the returned statement is executed. Even if we hadn't, I'm not sure how this would help, i.e. would it just get skipped over?
   Am I missing something?
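
   To make the ordering concern concrete, here is a minimal toy sketch of the contract quoted above. It does not use the real Spark classes: `ToyStatement`, `ToyScriptExecution` and `run` are hypothetical names, and the only assumption is the behaviour the comment describes (asking for the next result executes everything up to that result).

   ```scala
   import scala.collection.mutable.ArrayBuffer

   // Toy model only: these are hypothetical stand-ins, not Spark classes.
   object ContractSketch {
     // A statement either hands a result back to the caller or runs silently.
     final case class ToyStatement(name: String, returnsResult: Boolean)

     final class ToyScriptExecution(statements: Seq[ToyStatement]) {
       val log: ArrayBuffer[String] = ArrayBuffer.empty[String]
       private var position = 0

       // Runs every non-result statement on the way, then stops at the next
       // result statement and returns a thunk that "collects" it.
       def getNextResult: Option[() => Unit] = {
         while (position < statements.length && !statements(position).returnsResult) {
           log += s"ran ${statements(position).name}"
           position += 1
         }
         if (position < statements.length) {
           val stmt = statements(position)
           position += 1
           Some(() => { log += s"collected ${stmt.name}"; () })
         } else None
       }
     }

     // Drives the toy execution, either collecting each result immediately
     // (as the contract requires) or deferring all collection to the end.
     def run(collectImmediately: Boolean): Seq[String] = {
       val exec = new ToyScriptExecution(Seq(
         ToyStatement("SELECT using loop variable", returnsResult = true),
         ToyStatement("DROP loop variable", returnsResult = false)))
       val pending = ArrayBuffer.empty[() => Unit]
       var next = exec.getNextResult
       while (next.isDefined) {
         if (collectImmediately) next.get.apply() else pending += next.get
         next = exec.getNextResult
       }
       pending.foreach(_.apply())
       exec.log.toSeq
     }

     def main(args: Array[String]): Unit = {
       println(run(collectImmediately = true))
       println(run(collectImmediately = false))
     }
   }
   ```

   In this toy model, collecting immediately logs the `SELECT` before the `DROP`, while deferring collection logs the `DROP` first. If the caller always collects before asking for the next result, the variables cannot be dropped early, which is why I don't see what the extra no-op statement buys us.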



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

