juliuszsompolski commented on code in PR #49370:
URL: https://github.com/apache/spark/pull/49370#discussion_r1913629950


##########
sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala:
##########
@@ -126,6 +126,18 @@ private[connect] class ExecuteHolder(
     runner.start()
   }
 
+  /**
+   * Check if the execution was ended without finalizing the outcome and no further progress will
+   * be made. If the execution was delegated, this method always returns false.
+   */
+  def isOrphan(): Boolean = {
+    // Check runner.completed() before others as the acquire memory fence in the method ensures the
+    // current thread to read the last known state of responseObserver correctly.
+    runner.completed() &&
+    !runner.shouldDelegateCompleteResponse(request) &&
+    !responseObserver.completed()
+  }

Review Comment:
   I need to look at this fresh tomorrow, because with the new version of the code I am confused again :-).
   This basically checks whether the ExecuteThreadRunner exited without sending onCompleted / onError.
   onCompleted is sent at the end of executeInternal.
   execute is wrapped in various try/catch blocks.
   
   How about checking there whether executeInternal exited without closing the stream, and if so, closing the stream with an onError from there? The RPC handler side should then get this error via the usual route.
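
To make the suggestion concrete, here is a rough, self-contained sketch of the idea, not the actual ExecuteThreadRunner code. `SketchObserver`, `SketchRunner`, the `delegated` flag and the error message are all hypothetical stand-ins; only the names onCompleted, onError, completed() and executeInternal come from the discussion above.

```scala
// Hypothetical stand-in for the response observer (assumption, not Spark's class).
final class SketchObserver {
  @volatile private var done = false
  @volatile var error: Option[Throwable] = None

  def onCompleted(): Unit = { done = true }
  def onError(t: Throwable): Unit = { error = Some(t); done = true }
  def completed(): Boolean = done
}

object SketchRunner {
  // Runs `executeInternal` and, on the way out, closes the stream with onError
  // if nothing has closed it yet. `delegated` is a hypothetical flag standing in
  // for the case where sending the final response is someone else's job.
  def execute(observer: SketchObserver, delegated: Boolean)(executeInternal: => Unit): Unit = {
    try {
      executeInternal
    } finally {
      if (!delegated && !observer.completed()) {
        observer.onError(
          new IllegalStateException("execution ended without closing the response stream"))
      }
    }
  }
}
```

With something along these lines, a stream left open by the runner is closed where the problem is detected, so a separate isOrphan() check on the holder might not be needed; whether that fits the real try/catch structure in execute would need checking against the actual code.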



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

