attilapiros commented on code in PR #49094:
URL: https://github.com/apache/spark/pull/49094#discussion_r1882923441


##########
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala:
##########
@@ -496,22 +496,27 @@ private[spark] class AppStatusListener(
       val now = System.nanoTime()
 
       // Check if there are any pending stages that match this job; mark those 
as skipped.
-      val it = liveStages.entrySet.iterator()
-      while (it.hasNext()) {
-        val e = it.next()
-        if (job.stageIds.contains(e.getKey()._1)) {
-          val stage = e.getValue()
+      liveStages.forEach { (key, stage) =>

Review Comment:
   This is fine! 
   
   I have double-checked: since `liveStages` is a `ConcurrentHashMap`, a 
`remove` can safely happen during a `forEach`, either from another thread or 
from this one, and the traversal is prepared for any resizing.



##########
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:
##########
@@ -1930,6 +1930,78 @@ abstract class AppStatusListenerSuite extends 
SparkFunSuite with BeforeAndAfter
     assert(job.numActiveStages == 0)
   }
 
+  test("SPARK-50356: skip shared stage for all jobs") {
+    val listener = new AppStatusListener(store, conf, live = true)
+
+    // This will be a shared stage in PENDING state which gets SKIPPED status
+    // when first job is completed
+    val sharedStageId = 1
+    val sharedPendingStage = new StageInfo(sharedStageId, 0, "sharedStage", 10,
+      Seq.empty, Seq.empty, "", resourceProfileId = 0)
+
+    // These dummy stages will belong only to one job
+    val job1Stage = new StageInfo(2, 0, "job1Stage", 2,
+      Seq.empty, Seq.empty, "", resourceProfileId = 0)
+    val job2Stage = new StageInfo(3, 0, "job2Stage", 2,
+      Seq.empty, Seq.empty, "", resourceProfileId = 0)
+
+    val job1Id = 1
+    val job1StartEvent = SparkListenerJobStart(job1Id, 0L, 
Seq(sharedPendingStage, job1Stage))
+    val job2Id = 2
+    val job2StartEvent = SparkListenerJobStart(job2Id, 0L, 
Seq(sharedPendingStage, job2Stage))
+
+    listener.onApplicationStart(SparkListenerApplicationStart("app", 
Some("app"), 0L, "none", None))
+    listener.onJobStart(job1StartEvent)
+    listener.onJobStart(job2StartEvent)
+    listener.onStageSubmitted(SparkListenerStageSubmitted(job1Stage))
+    listener.onStageSubmitted(SparkListenerStageSubmitted(job2Stage))
+
+    check[StageDataWrapper](Array(sharedStageId, 0)) { stage =>
+      assert(stage.info.status === v1.StageStatus.PENDING)
+      assert(stage.info.numActiveTasks === 0)
+      assert(stage.info.numCompleteTasks === 0)
+    }
+
+    listener.onStageCompleted(SparkListenerStageCompleted(job1Stage))
+    listener.onJobEnd(SparkListenerJobEnd(job1Id, 1L, JobSucceeded))
+
+    check[JobDataWrapper](1) { job =>
+      assert(job.info.stageIds == Seq(sharedStageId, 2))

Review Comment:
   ```suggestion
         assert(job.info.stageIds == Seq(sharedStageId, job1Stage.stageId))
   ```



##########
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:
##########
@@ -1930,6 +1930,78 @@ abstract class AppStatusListenerSuite extends 
SparkFunSuite with BeforeAndAfter
     assert(job.numActiveStages == 0)
   }
 
+  test("SPARK-50356: skip shared stage for all jobs") {
+    val listener = new AppStatusListener(store, conf, live = true)
+
+    // This will be a shared stage in PENDING state which gets SKIPPED status
+    // when first job is completed
+    val sharedStageId = 1
+    val sharedPendingStage = new StageInfo(sharedStageId, 0, "sharedStage", 10,
+      Seq.empty, Seq.empty, "", resourceProfileId = 0)
+
+    // These dummy stages will belong only to one job
+    val job1Stage = new StageInfo(2, 0, "job1Stage", 2,
+      Seq.empty, Seq.empty, "", resourceProfileId = 0)
+    val job2Stage = new StageInfo(3, 0, "job2Stage", 2,
+      Seq.empty, Seq.empty, "", resourceProfileId = 0)
+
+    val job1Id = 1
+    val job1StartEvent = SparkListenerJobStart(job1Id, 0L, 
Seq(sharedPendingStage, job1Stage))
+    val job2Id = 2
+    val job2StartEvent = SparkListenerJobStart(job2Id, 0L, 
Seq(sharedPendingStage, job2Stage))
+
+    listener.onApplicationStart(SparkListenerApplicationStart("app", 
Some("app"), 0L, "none", None))
+    listener.onJobStart(job1StartEvent)
+    listener.onJobStart(job2StartEvent)
+    listener.onStageSubmitted(SparkListenerStageSubmitted(job1Stage))
+    listener.onStageSubmitted(SparkListenerStageSubmitted(job2Stage))
+
+    check[StageDataWrapper](Array(sharedStageId, 0)) { stage =>
+      assert(stage.info.status === v1.StageStatus.PENDING)
+      assert(stage.info.numActiveTasks === 0)
+      assert(stage.info.numCompleteTasks === 0)
+    }
+
+    listener.onStageCompleted(SparkListenerStageCompleted(job1Stage))
+    listener.onJobEnd(SparkListenerJobEnd(job1Id, 1L, JobSucceeded))
+
+    check[JobDataWrapper](1) { job =>

Review Comment:
   ```suggestion
       check[JobDataWrapper](job1Id) { job =>
   ```



##########
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:
##########
@@ -1930,6 +1930,78 @@ abstract class AppStatusListenerSuite extends 
SparkFunSuite with BeforeAndAfter
     assert(job.numActiveStages == 0)
   }
 
+  test("SPARK-50356: skip shared stage for all jobs") {
+    val listener = new AppStatusListener(store, conf, live = true)
+
+    // This will be a shared stage in PENDING state which gets SKIPPED status
+    // when first job is completed
+    val sharedStageId = 1
+    val sharedPendingStage = new StageInfo(sharedStageId, 0, "sharedStage", 10,

Review Comment:
   NIT: I would introduce a `val` named `sharedStageAttemptId`, as that 0 
(especially down in the `Array`) is a bit cryptic to me.



##########
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:
##########
@@ -1930,6 +1930,78 @@ abstract class AppStatusListenerSuite extends 
SparkFunSuite with BeforeAndAfter
     assert(job.numActiveStages == 0)
   }
 
+  test("SPARK-50356: skip shared stage for all jobs") {
+    val listener = new AppStatusListener(store, conf, live = true)
+
+    // This will be a shared stage in PENDING state which gets SKIPPED status
+    // when first job is completed
+    val sharedStageId = 1
+    val sharedPendingStage = new StageInfo(sharedStageId, 0, "sharedStage", 10,
+      Seq.empty, Seq.empty, "", resourceProfileId = 0)
+
+    // These dummy stages will belong only to one job
+    val job1Stage = new StageInfo(2, 0, "job1Stage", 2,
+      Seq.empty, Seq.empty, "", resourceProfileId = 0)
+    val job2Stage = new StageInfo(3, 0, "job2Stage", 2,
+      Seq.empty, Seq.empty, "", resourceProfileId = 0)
+
+    val job1Id = 1
+    val job1StartEvent = SparkListenerJobStart(job1Id, 0L, 
Seq(sharedPendingStage, job1Stage))
+    val job2Id = 2
+    val job2StartEvent = SparkListenerJobStart(job2Id, 0L, 
Seq(sharedPendingStage, job2Stage))
+
+    listener.onApplicationStart(SparkListenerApplicationStart("app", 
Some("app"), 0L, "none", None))
+    listener.onJobStart(job1StartEvent)
+    listener.onJobStart(job2StartEvent)
+    listener.onStageSubmitted(SparkListenerStageSubmitted(job1Stage))
+    listener.onStageSubmitted(SparkListenerStageSubmitted(job2Stage))
+
+    check[StageDataWrapper](Array(sharedStageId, 0)) { stage =>
+      assert(stage.info.status === v1.StageStatus.PENDING)
+      assert(stage.info.numActiveTasks === 0)
+      assert(stage.info.numCompleteTasks === 0)
+    }
+
+    listener.onStageCompleted(SparkListenerStageCompleted(job1Stage))
+    listener.onJobEnd(SparkListenerJobEnd(job1Id, 1L, JobSucceeded))
+
+    check[JobDataWrapper](1) { job =>
+      assert(job.info.stageIds == Seq(sharedStageId, 2))
+    }
+
+    check[StageDataWrapper](Array(sharedStageId, 0)) { stage =>
+      assert(stage.info.status === v1.StageStatus.SKIPPED)
+      assert(stage.info.numActiveTasks === 0)
+      assert(stage.info.numCompleteTasks === 0)
+    }
+
+    listener.onStageCompleted(SparkListenerStageCompleted(job2Stage))
+    listener.onJobEnd(SparkListenerJobEnd(job2Id, 1L, JobSucceeded))
+
+    check[StageDataWrapper](Array(sharedStageId, 0)) { stage =>
+      assert(stage.info.status === v1.StageStatus.SKIPPED)
+      assert(stage.info.numActiveTasks === 0)
+      assert(stage.info.numCompleteTasks === 0)
+    }
+
+    check[JobDataWrapper](2) { job =>
+      assert(job.info.stageIds == Seq(sharedStageId, 3))

Review Comment:
   Same here.



##########
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:
##########
@@ -1930,6 +1930,78 @@ abstract class AppStatusListenerSuite extends 
SparkFunSuite with BeforeAndAfter
     assert(job.numActiveStages == 0)
   }
 
+  test("SPARK-50356: skip shared stage for all jobs") {
+    val listener = new AppStatusListener(store, conf, live = true)
+
+    // This will be a shared stage in PENDING state which gets SKIPPED status
+    // when first job is completed
+    val sharedStageId = 1
+    val sharedPendingStage = new StageInfo(sharedStageId, 0, "sharedStage", 10,
+      Seq.empty, Seq.empty, "", resourceProfileId = 0)
+
+    // These dummy stages will belong only to one job
+    val job1Stage = new StageInfo(2, 0, "job1Stage", 2,
+      Seq.empty, Seq.empty, "", resourceProfileId = 0)
+    val job2Stage = new StageInfo(3, 0, "job2Stage", 2,
+      Seq.empty, Seq.empty, "", resourceProfileId = 0)
+
+    val job1Id = 1
+    val job1StartEvent = SparkListenerJobStart(job1Id, 0L, 
Seq(sharedPendingStage, job1Stage))
+    val job2Id = 2
+    val job2StartEvent = SparkListenerJobStart(job2Id, 0L, 
Seq(sharedPendingStage, job2Stage))
+
+    listener.onApplicationStart(SparkListenerApplicationStart("app", 
Some("app"), 0L, "none", None))
+    listener.onJobStart(job1StartEvent)
+    listener.onJobStart(job2StartEvent)
+    listener.onStageSubmitted(SparkListenerStageSubmitted(job1Stage))
+    listener.onStageSubmitted(SparkListenerStageSubmitted(job2Stage))
+
+    check[StageDataWrapper](Array(sharedStageId, 0)) { stage =>
+      assert(stage.info.status === v1.StageStatus.PENDING)
+      assert(stage.info.numActiveTasks === 0)
+      assert(stage.info.numCompleteTasks === 0)
+    }
+
+    listener.onStageCompleted(SparkListenerStageCompleted(job1Stage))
+    listener.onJobEnd(SparkListenerJobEnd(job1Id, 1L, JobSucceeded))
+
+    check[JobDataWrapper](1) { job =>
+      assert(job.info.stageIds == Seq(sharedStageId, 2))
+    }
+
+    check[StageDataWrapper](Array(sharedStageId, 0)) { stage =>
+      assert(stage.info.status === v1.StageStatus.SKIPPED)
+      assert(stage.info.numActiveTasks === 0)
+      assert(stage.info.numCompleteTasks === 0)
+    }
+
+    listener.onStageCompleted(SparkListenerStageCompleted(job2Stage))
+    listener.onJobEnd(SparkListenerJobEnd(job2Id, 1L, JobSucceeded))
+
+    check[StageDataWrapper](Array(sharedStageId, 0)) { stage =>
+      assert(stage.info.status === v1.StageStatus.SKIPPED)
+      assert(stage.info.numActiveTasks === 0)
+      assert(stage.info.numCompleteTasks === 0)
+    }
+
+    check[JobDataWrapper](2) { job =>

Review Comment:
   Same here. Please replace the literal with the corresponding val!



##########
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala:
##########
@@ -496,22 +496,27 @@ private[spark] class AppStatusListener(
       val now = System.nanoTime()
 
       // Check if there are any pending stages that match this job; mark those 
as skipped.
-      val it = liveStages.entrySet.iterator()
-      while (it.hasNext()) {
-        val e = it.next()
-        if (job.stageIds.contains(e.getKey()._1)) {
-          val stage = e.getValue()
+      liveStages.forEach { (key, stage) =>
+        val stageId = key._1
+        if (job.stageIds.contains(stageId)) {
           if (v1.StageStatus.PENDING.equals(stage.status)) {
             stage.status = v1.StageStatus.SKIPPED
             job.skippedStages += stage.info.stageId
             job.skippedTasks += stage.info.numTasks
 
+            // Before removing stage we need to update skipped data
+            // for all jobs referencing this stage
+            liveJobs.values.filter(_.stageIds.contains(stageId)).foreach { 
otherJob =>

Review Comment:
   The name `otherJob` is right, as the current job was already removed from 
`liveJobs` at the beginning of this function!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to