jayadeep-jayaraman commented on code in PR #50020:
URL: https://github.com/apache/spark/pull/50020#discussion_r1973338339


##########
core/src/main/scala/org/apache/spark/BarrierCoordinator.scala:
##########
@@ -122,23 +124,40 @@ private[spark] class BarrierCoordinator(
     // Init a TimerTask for a barrier() call.
     private def initTimerTask(state: ContextBarrierState): Unit = {
       timerTask = new TimerTask {
-        override def run(): Unit = state.synchronized {
-          // Timeout current barrier() call, fail all the sync requests.
-          requesters.foreach(_.sendFailure(new SparkException("The coordinator didn't get all " +
-            s"barrier sync requests for barrier epoch $barrierEpoch from $barrierId within " +
-            s"$timeoutInSecs second(s).")))
-          cleanupBarrierStage(barrierId)
-        }
+        override def run(): Unit =
+          try {
+            state.synchronized {
+              if (!Thread.currentThread().isInterrupted()) {
+                // Timeout current barrier() call, fail all the sync requests.
+                requesters.foreach(
+                  _.sendFailure(new SparkException("The coordinator didn't get all " +
+                    s"barrier sync requests for barrier epoch +" +
+                    s" $barrierEpoch from $barrierId within " +
+                    s"$timeoutInSecs second(s).")))
+                cleanupBarrierStage(barrierId)
+              }
+            }
+          } catch {
+            case _: InterruptedException =>
+              // Handle interruption gracefully
+              Thread.currentThread().interrupt()
+            case e: Exception => new SparkException("Error during " +
+              s"running of barrier tasks for " +
+              s"$barrierId", e)
+          } finally {
+            // Ensure cleanup happens even if interrupted or exception occurs
+            cleanupBarrierStage(barrierId)
+            state.clear()
+          }
       }
     }
 
-    // Cancel the current active TimerTask and release resources.
+    /* Cancel the tasks scheduled to run inside the ScheduledExecutor Threadpool
+    * The original implementation was clearing java.util.Timer and java.util.TimerTasks
+    * This became a no-op when java.util.Timer was replaced with ScheduledThreadPoolExecutor
+    */
     private def cancelTimerTask(): Unit = {
-      if (timerTask != null) {
-        timerTask.cancel()

Review Comment:
   It does not work. The explanation is available [here](https://github.com/apache/spark/pull/47956#issuecomment-2328035086). Basically, the issue is that we moved from the `TimerTask` API to `ScheduledThreadPoolExecutor`, so the `cancel` method we relied on is no longer effective. `ScheduledThreadPoolExecutor` returns a `ScheduledFuture` when a task is scheduled, and we need to use the [cancel](https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/concurrent/Future.html) method of that `Future` instead.
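The sketch below is a minimal, standalone illustration of this point. It is not Spark's code; `TimerCancelSketch`, `runSketch`, and `countingTask` are hypothetical names. It schedules two `TimerTask`s on a `ScheduledThreadPoolExecutor`. The first is "cancelled" with `TimerTask.cancel()` and still runs. The second is cancelled through the `ScheduledFuture` returned by `schedule` and never runs.

```scala
import java.util.TimerTask
import java.util.concurrent.{ScheduledFuture, ScheduledThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical standalone demo; not code from BarrierCoordinator.
object TimerCancelSketch {

  /** Returns (number of tasks that actually ran, result of Future.cancel). */
  def runSketch(): (Int, Boolean) = {
    val executor = new ScheduledThreadPoolExecutor(1)
    // Remove tasks cancelled via their Future from the work queue right away.
    executor.setRemoveOnCancelPolicy(true)
    val runs = new AtomicInteger(0)

    def countingTask(): TimerTask = new TimerTask {
      override def run(): Unit = { runs.incrementAndGet(); () }
    }

    // Path 1: TimerTask.cancel() only changes the TimerTask's own state.
    // The executor wraps the task and calls run() regardless, so it still runs.
    val task1 = countingTask()
    executor.schedule(task1, 100, TimeUnit.MILLISECONDS)
    task1.cancel()

    // Path 2: keep the ScheduledFuture returned by schedule() and cancel that.
    // false = do not interrupt the task if it has already started.
    val future: ScheduledFuture[_] =
      executor.schedule(countingTask(), 100, TimeUnit.MILLISECONDS)
    val cancelled = future.cancel(false)

    // By default, delayed tasks still queued at shutdown are executed,
    // so task1 fires before awaitTermination returns.
    executor.shutdown()
    executor.awaitTermination(5, TimeUnit.SECONDS)
    (runs.get(), cancelled)
  }

  def main(args: Array[String]): Unit = {
    val (runs, cancelled) = runSketch()
    // Only the task "cancelled" via TimerTask.cancel() is expected to have run.
    println(s"tasks run: $runs, Future.cancel returned: $cancelled")
  }
}
```

Passing `true` to `Future.cancel` would interrupt a task that is already running. That appears to be the case the `InterruptedException` handling in the diff is meant for.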



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
