jjayadeep06 commented on code in PR #50020: URL: https://github.com/apache/spark/pull/50020#discussion_r1973390612
########## core/src/main/scala/org/apache/spark/BarrierCoordinator.scala: ########## @@ -122,23 +124,40 @@ private[spark] class BarrierCoordinator( // Init a TimerTask for a barrier() call. private def initTimerTask(state: ContextBarrierState): Unit = { timerTask = new TimerTask { - override def run(): Unit = state.synchronized { - // Timeout current barrier() call, fail all the sync requests. - requesters.foreach(_.sendFailure(new SparkException("The coordinator didn't get all " + - s"barrier sync requests for barrier epoch $barrierEpoch from $barrierId within " + - s"$timeoutInSecs second(s)."))) - cleanupBarrierStage(barrierId) - } + override def run(): Unit = + try { + state.synchronized { + if (!Thread.currentThread().isInterrupted()) { + // Timeout current barrier() call, fail all the sync requests. + requesters.foreach( + _.sendFailure(new SparkException("The coordinator didn't get all " + + s"barrier sync requests for barrier epoch +" + + s" $barrierEpoch from $barrierId within " + + s"$timeoutInSecs second(s)."))) + cleanupBarrierStage(barrierId) + } + } + } catch { + case _: InterruptedException => + // Handle interruption gracefully + Thread.currentThread().interrupt() + case e: Exception => new SparkException("Error during " + + s"running of barrier tasks for " + + s"$barrierId", e) + } finally { + // Ensure cleanup happens even if interrupted or exception occurs + cleanupBarrierStage(barrierId) + state.clear() + } } } - // Cancel the current active TimerTask and release resources. + /* Cancel the tasks scheduled to run inside the ScheduledExecutor Threadpool + * The original implementation was clearing java.util.Timer and java.util.TimerTasks + * This became a no-op when java.util.Timer was replaced with ScheduledThreadPoolExecutor + */ private def cancelTimerTask(): Unit = { - if (timerTask != null) { - timerTask.cancel() Review Comment: It does not work. 
The explanation is available [here](https://github.com/apache/spark/pull/47956#issuecomment-2328035086). In short, we moved from the `java.util.Timer`/`TimerTask` API to `ScheduledThreadPoolExecutor`, and after that change calling `cancel()` on the `TimerTask` no longer cancels the scheduled run (it became a no-op). `ScheduledThreadPoolExecutor` returns a `ScheduledFuture` when a task is scheduled, and the task has to be cancelled through that future's [cancel](https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/concurrent/Future.html) method (`Future.cancel`) instead. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org