cloud-fan commented on code in PR #49715: URL: https://github.com/apache/spark/pull/49715#discussion_r1950213620
########## sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala: ########## @@ -303,3 +308,43 @@ case class TableCacheQueryStageExec( override def getRuntimeStatistics: Statistics = inMemoryTableScan.runtimeStatistics } + +case class ResultQueryStageExec( + override val id: Int, + override val plan: SparkPlan, + resultHandler: SparkPlan => Any) extends QueryStageExec { + + override def resetMetrics(): Unit = { + plan.resetMetrics() + } + + override protected def doMaterialize(): Future[Any] = { + val javaFuture = SQLExecution.withThreadLocalCaptured( + session, + ResultQueryStageExec.executionContext) { + resultHandler(plan) + } + val scalaPromise: Promise[Any] = Promise() + javaFuture.whenComplete { (result: Any, exception: Throwable) => + if (exception != null) { + scalaPromise.failure(exception match { + case completionException: java.util.concurrent.CompletionException => + completionException.getCause + case ex => ex + }) + } else { + scalaPromise.success(result) + } + } + scalaPromise.future + } + + // Result stage could be any SparkPlan, so we don't have a specific runtime statistics for it. + override def getRuntimeStatistics: Statistics = Statistics(sizeInBytes = 0, rowCount = None) Review Comment: +1 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org