juliuszsompolski commented on code in PR #49370: URL: https://github.com/apache/spark/pull/49370#discussion_r1913629950
##########
sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala:
##########
@@ -126,6 +126,18 @@ private[connect] class ExecuteHolder(
     runner.start()
   }
 
+  /**
+   * Check if the execution was ended without finalizing the outcome and no further progress will
+   * be made. If the execution was delegated, this method always returns false.
+   */
+  def isOrphan(): Boolean = {
+    // Check runner.completed() before others as the acquire memory fence in the method ensures the
+    // current thread to read the last known state of responseObserver correctly.
+    runner.completed() &&
+    !runner.shouldDelegateCompleteResponse(request) &&
+    !responseObserver.completed()
+  }

Review Comment:
I need to look at this fresh tomorrow, because with the new version of the code I am confused again :-).

This basically checks whether the ExecuteThreadRunner exited without sending onCompleted / onError. onCompleted is sent at the end of executeInternal, and execute is wrapped in various try/catch blocks. What about checking there that executeInternal exited without closing the stream, and closing the stream with an onError from there? Then the RPC handler side should get this error via the usual route.
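
The suggestion in the review comment could be sketched roughly as follows. This is only an illustration under stated assumptions: `SketchObserver` and `SketchRunner` are hypothetical stand-ins, not the real `ExecuteResponseObserver` or `ExecuteThreadRunner`, and the `delegated` flag merely mimics what `shouldDelegateCompleteResponse(request)` reports. The idea is that `execute` itself notices when `executeInternal` returned without closing the stream and closes it with an `onError`.

```scala
import scala.util.control.NonFatal

// Hypothetical stand-in for the response observer; not the real Spark Connect class.
final class SketchObserver {
  @volatile private var done = false
  @volatile private var error: Option[Throwable] = None

  // The first terminal call wins; later calls are ignored.
  def onCompleted(): Unit = synchronized { if (!done) done = true }
  def onError(t: Throwable): Unit = synchronized {
    if (!done) { error = Some(t); done = true }
  }

  def completed(): Boolean = done
  def failure: Option[Throwable] = error
}

// Hypothetical stand-in for the runner. `body` plays the role of executeInternal,
// which is expected to call onCompleted at its end unless completion is delegated.
final class SketchRunner(
    observer: SketchObserver,
    delegated: Boolean,
    body: SketchObserver => Unit) {

  private def executeInternal(): Unit = body(observer)

  def execute(): Unit = {
    try {
      executeInternal()
    } catch {
      // Ordinary failures are reported on the stream.
      case NonFatal(e) => observer.onError(e)
    } finally {
      // The check proposed in the review: if the stream is still open and
      // completion was not delegated, close it with an error here.
      if (!delegated && !observer.completed()) {
        observer.onError(
          new IllegalStateException("execution ended without completing the response stream"))
      }
    }
  }
}
```

With this shape, a caller on the RPC handler side would not need a separate orphan check: a run whose body forgets to complete the stream ends with `completed()` returning true and `failure` holding the `IllegalStateException`, while a delegated run is left open for whoever owns the completion.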