mikoszilard commented on code in PR #49094:
URL: https://github.com/apache/spark/pull/49094#discussion_r1886976527


##########
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:
##########
@@ -1930,6 +1930,78 @@ abstract class AppStatusListenerSuite extends 
SparkFunSuite with BeforeAndAfter
     assert(job.numActiveStages == 0)
   }
 
+  test("SPARK-50356: skip shared stage for all jobs") {
+    val listener = new AppStatusListener(store, conf, live = true)
+
+    // This will be a shared stage in PENDING state which gets SKIPPED status
+    // when first job is completed
+    val sharedStageId = 1
+    val sharedPendingStage = new StageInfo(sharedStageId, 0, "sharedStage", 10,
+      Seq.empty, Seq.empty, "", resourceProfileId = 0)
+
+    // These dummy stages will belong only to one job
+    val job1Stage = new StageInfo(2, 0, "job1Stage", 2,
+      Seq.empty, Seq.empty, "", resourceProfileId = 0)
+    val job2Stage = new StageInfo(3, 0, "job2Stage", 2,
+      Seq.empty, Seq.empty, "", resourceProfileId = 0)
+
+    val job1Id = 1
+    val job1StartEvent = SparkListenerJobStart(job1Id, 0L, 
Seq(sharedPendingStage, job1Stage))
+    val job2Id = 2
+    val job2StartEvent = SparkListenerJobStart(job2Id, 0L, 
Seq(sharedPendingStage, job2Stage))
+
+    listener.onApplicationStart(SparkListenerApplicationStart("app", 
Some("app"), 0L, "none", None))
+    listener.onJobStart(job1StartEvent)
+    listener.onJobStart(job2StartEvent)
+    listener.onStageSubmitted(SparkListenerStageSubmitted(job1Stage))
+    listener.onStageSubmitted(SparkListenerStageSubmitted(job2Stage))
+
+    check[StageDataWrapper](Array(sharedStageId, 0)) { stage =>
+      assert(stage.info.status === v1.StageStatus.PENDING)
+      assert(stage.info.numActiveTasks === 0)
+      assert(stage.info.numCompleteTasks === 0)
+    }
+
+    listener.onStageCompleted(SparkListenerStageCompleted(job1Stage))
+    listener.onJobEnd(SparkListenerJobEnd(job1Id, 1L, JobSucceeded))
+
+    check[JobDataWrapper](1) { job =>

Review Comment:
   Thanks for pointing that out; I'm now using the value.



##########
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala:
##########
@@ -496,22 +496,27 @@ private[spark] class AppStatusListener(
       val now = System.nanoTime()
 
       // Check if there are any pending stages that match this job; mark those 
as skipped.
-      val it = liveStages.entrySet.iterator()
-      while (it.hasNext()) {
-        val e = it.next()
-        if (job.stageIds.contains(e.getKey()._1)) {
-          val stage = e.getValue()
+      liveStages.forEach { (key, stage) =>

Review Comment:
   Thanks, Attila, for double-checking this.



##########
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:
##########
@@ -1930,6 +1930,78 @@ abstract class AppStatusListenerSuite extends 
SparkFunSuite with BeforeAndAfter
     assert(job.numActiveStages == 0)
   }
 
+  test("SPARK-50356: skip shared stage for all jobs") {
+    val listener = new AppStatusListener(store, conf, live = true)
+
+    // This will be a shared stage in PENDING state which gets SKIPPED status
+    // when first job is completed
+    val sharedStageId = 1
+    val sharedPendingStage = new StageInfo(sharedStageId, 0, "sharedStage", 10,
+      Seq.empty, Seq.empty, "", resourceProfileId = 0)
+
+    // These dummy stages will belong only to one job
+    val job1Stage = new StageInfo(2, 0, "job1Stage", 2,
+      Seq.empty, Seq.empty, "", resourceProfileId = 0)
+    val job2Stage = new StageInfo(3, 0, "job2Stage", 2,
+      Seq.empty, Seq.empty, "", resourceProfileId = 0)
+
+    val job1Id = 1
+    val job1StartEvent = SparkListenerJobStart(job1Id, 0L, 
Seq(sharedPendingStage, job1Stage))
+    val job2Id = 2
+    val job2StartEvent = SparkListenerJobStart(job2Id, 0L, 
Seq(sharedPendingStage, job2Stage))
+
+    listener.onApplicationStart(SparkListenerApplicationStart("app", 
Some("app"), 0L, "none", None))
+    listener.onJobStart(job1StartEvent)
+    listener.onJobStart(job2StartEvent)
+    listener.onStageSubmitted(SparkListenerStageSubmitted(job1Stage))
+    listener.onStageSubmitted(SparkListenerStageSubmitted(job2Stage))
+
+    check[StageDataWrapper](Array(sharedStageId, 0)) { stage =>
+      assert(stage.info.status === v1.StageStatus.PENDING)
+      assert(stage.info.numActiveTasks === 0)
+      assert(stage.info.numCompleteTasks === 0)
+    }
+
+    listener.onStageCompleted(SparkListenerStageCompleted(job1Stage))
+    listener.onJobEnd(SparkListenerJobEnd(job1Id, 1L, JobSucceeded))
+
+    check[JobDataWrapper](1) { job =>
+      assert(job.info.stageIds == Seq(sharedStageId, 2))
+    }
+
+    check[StageDataWrapper](Array(sharedStageId, 0)) { stage =>
+      assert(stage.info.status === v1.StageStatus.SKIPPED)
+      assert(stage.info.numActiveTasks === 0)
+      assert(stage.info.numCompleteTasks === 0)
+    }
+
+    listener.onStageCompleted(SparkListenerStageCompleted(job2Stage))
+    listener.onJobEnd(SparkListenerJobEnd(job2Id, 1L, JobSucceeded))
+
+    check[StageDataWrapper](Array(sharedStageId, 0)) { stage =>
+      assert(stage.info.status === v1.StageStatus.SKIPPED)
+      assert(stage.info.numActiveTasks === 0)
+      assert(stage.info.numCompleteTasks === 0)
+    }
+
+    check[JobDataWrapper](2) { job =>
+      assert(job.info.stageIds == Seq(sharedStageId, 3))

Review Comment:
   Thanks for pointing that out; I'm now using the value.



##########
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala:
##########
@@ -496,22 +496,27 @@ private[spark] class AppStatusListener(
       val now = System.nanoTime()
 
       // Check if there are any pending stages that match this job; mark those 
as skipped.
-      val it = liveStages.entrySet.iterator()
-      while (it.hasNext()) {
-        val e = it.next()
-        if (job.stageIds.contains(e.getKey()._1)) {
-          val stage = e.getValue()
+      liveStages.forEach { (key, stage) =>
+        val stageId = key._1
+        if (job.stageIds.contains(stageId)) {
           if (v1.StageStatus.PENDING.equals(stage.status)) {
             stage.status = v1.StageStatus.SKIPPED
             job.skippedStages += stage.info.stageId
             job.skippedTasks += stage.info.numTasks
 
+            // Before removing stage we need to update skipped data
+            // for all jobs referencing this stage
+            liveJobs.values.filter(_.stageIds.contains(stageId)).foreach { 
otherJob =>

Review Comment:
   Thanks, Attila. I extended the comment to make sure it is clear what is 
happening here.



##########
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:
##########
@@ -1930,6 +1930,78 @@ abstract class AppStatusListenerSuite extends 
SparkFunSuite with BeforeAndAfter
     assert(job.numActiveStages == 0)
   }
 
+  test("SPARK-50356: skip shared stage for all jobs") {
+    val listener = new AppStatusListener(store, conf, live = true)
+
+    // This will be a shared stage in PENDING state which gets SKIPPED status
+    // when first job is completed
+    val sharedStageId = 1
+    val sharedPendingStage = new StageInfo(sharedStageId, 0, "sharedStage", 10,

Review Comment:
   Thanks for pointing that out; I'm now using the new value.



##########
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:
##########
@@ -1930,6 +1930,78 @@ abstract class AppStatusListenerSuite extends 
SparkFunSuite with BeforeAndAfter
     assert(job.numActiveStages == 0)
   }
 
+  test("SPARK-50356: skip shared stage for all jobs") {
+    val listener = new AppStatusListener(store, conf, live = true)
+
+    // This will be a shared stage in PENDING state which gets SKIPPED status
+    // when first job is completed
+    val sharedStageId = 1
+    val sharedPendingStage = new StageInfo(sharedStageId, 0, "sharedStage", 10,
+      Seq.empty, Seq.empty, "", resourceProfileId = 0)
+
+    // These dummy stages will belong only to one job
+    val job1Stage = new StageInfo(2, 0, "job1Stage", 2,
+      Seq.empty, Seq.empty, "", resourceProfileId = 0)
+    val job2Stage = new StageInfo(3, 0, "job2Stage", 2,
+      Seq.empty, Seq.empty, "", resourceProfileId = 0)
+
+    val job1Id = 1
+    val job1StartEvent = SparkListenerJobStart(job1Id, 0L, 
Seq(sharedPendingStage, job1Stage))
+    val job2Id = 2
+    val job2StartEvent = SparkListenerJobStart(job2Id, 0L, 
Seq(sharedPendingStage, job2Stage))
+
+    listener.onApplicationStart(SparkListenerApplicationStart("app", 
Some("app"), 0L, "none", None))
+    listener.onJobStart(job1StartEvent)
+    listener.onJobStart(job2StartEvent)
+    listener.onStageSubmitted(SparkListenerStageSubmitted(job1Stage))
+    listener.onStageSubmitted(SparkListenerStageSubmitted(job2Stage))
+
+    check[StageDataWrapper](Array(sharedStageId, 0)) { stage =>
+      assert(stage.info.status === v1.StageStatus.PENDING)
+      assert(stage.info.numActiveTasks === 0)
+      assert(stage.info.numCompleteTasks === 0)
+    }
+
+    listener.onStageCompleted(SparkListenerStageCompleted(job1Stage))
+    listener.onJobEnd(SparkListenerJobEnd(job1Id, 1L, JobSucceeded))
+
+    check[JobDataWrapper](1) { job =>
+      assert(job.info.stageIds == Seq(sharedStageId, 2))
+    }
+
+    check[StageDataWrapper](Array(sharedStageId, 0)) { stage =>
+      assert(stage.info.status === v1.StageStatus.SKIPPED)
+      assert(stage.info.numActiveTasks === 0)
+      assert(stage.info.numCompleteTasks === 0)
+    }
+
+    listener.onStageCompleted(SparkListenerStageCompleted(job2Stage))
+    listener.onJobEnd(SparkListenerJobEnd(job2Id, 1L, JobSucceeded))
+
+    check[StageDataWrapper](Array(sharedStageId, 0)) { stage =>
+      assert(stage.info.status === v1.StageStatus.SKIPPED)
+      assert(stage.info.numActiveTasks === 0)
+      assert(stage.info.numCompleteTasks === 0)
+    }
+
+    check[JobDataWrapper](2) { job =>

Review Comment:
   Thanks for pointing that out; I'm now using the value.



##########
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:
##########
@@ -1930,6 +1930,78 @@ abstract class AppStatusListenerSuite extends 
SparkFunSuite with BeforeAndAfter
     assert(job.numActiveStages == 0)
   }
 
+  test("SPARK-50356: skip shared stage for all jobs") {
+    val listener = new AppStatusListener(store, conf, live = true)
+
+    // This will be a shared stage in PENDING state which gets SKIPPED status
+    // when first job is completed
+    val sharedStageId = 1
+    val sharedPendingStage = new StageInfo(sharedStageId, 0, "sharedStage", 10,
+      Seq.empty, Seq.empty, "", resourceProfileId = 0)
+
+    // These dummy stages will belong only to one job
+    val job1Stage = new StageInfo(2, 0, "job1Stage", 2,
+      Seq.empty, Seq.empty, "", resourceProfileId = 0)
+    val job2Stage = new StageInfo(3, 0, "job2Stage", 2,
+      Seq.empty, Seq.empty, "", resourceProfileId = 0)
+
+    val job1Id = 1
+    val job1StartEvent = SparkListenerJobStart(job1Id, 0L, 
Seq(sharedPendingStage, job1Stage))
+    val job2Id = 2
+    val job2StartEvent = SparkListenerJobStart(job2Id, 0L, 
Seq(sharedPendingStage, job2Stage))
+
+    listener.onApplicationStart(SparkListenerApplicationStart("app", 
Some("app"), 0L, "none", None))
+    listener.onJobStart(job1StartEvent)
+    listener.onJobStart(job2StartEvent)
+    listener.onStageSubmitted(SparkListenerStageSubmitted(job1Stage))
+    listener.onStageSubmitted(SparkListenerStageSubmitted(job2Stage))
+
+    check[StageDataWrapper](Array(sharedStageId, 0)) { stage =>
+      assert(stage.info.status === v1.StageStatus.PENDING)
+      assert(stage.info.numActiveTasks === 0)
+      assert(stage.info.numCompleteTasks === 0)
+    }
+
+    listener.onStageCompleted(SparkListenerStageCompleted(job1Stage))
+    listener.onJobEnd(SparkListenerJobEnd(job1Id, 1L, JobSucceeded))
+
+    check[JobDataWrapper](1) { job =>
+      assert(job.info.stageIds == Seq(sharedStageId, 2))

Review Comment:
   Thanks for pointing that out; I'm now using the value.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to