mikoszilard commented on code in PR #49094:
URL: https://github.com/apache/spark/pull/49094#discussion_r1886976527
##########
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:
##########
@@ -1930,6 +1930,78 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter
     assert(job.numActiveStages == 0)
   }
 
+  test("SPARK-50356: skip shared stage for all jobs") {
+    val listener = new AppStatusListener(store, conf, live = true)
+
+    // This will be a shared stage in PENDING state which gets SKIPPED status
+    // when the first job is completed
+    val sharedStageId = 1
+    val sharedPendingStage = new StageInfo(sharedStageId, 0, "sharedStage", 10,
+      Seq.empty, Seq.empty, "", resourceProfileId = 0)
+
+    // These dummy stages will each belong to only one job
+    val job1Stage = new StageInfo(2, 0, "job1Stage", 2,
+      Seq.empty, Seq.empty, "", resourceProfileId = 0)
+    val job2Stage = new StageInfo(3, 0, "job2Stage", 2,
+      Seq.empty, Seq.empty, "", resourceProfileId = 0)
+
+    val job1Id = 1
+    val job1StartEvent = SparkListenerJobStart(job1Id, 0L, Seq(sharedPendingStage, job1Stage))
+    val job2Id = 2
+    val job2StartEvent = SparkListenerJobStart(job2Id, 0L, Seq(sharedPendingStage, job2Stage))
+
+    listener.onApplicationStart(SparkListenerApplicationStart("app", Some("app"), 0L, "none", None))
+    listener.onJobStart(job1StartEvent)
+    listener.onJobStart(job2StartEvent)
+    listener.onStageSubmitted(SparkListenerStageSubmitted(job1Stage))
+    listener.onStageSubmitted(SparkListenerStageSubmitted(job2Stage))
+
+    check[StageDataWrapper](Array(sharedStageId, 0)) { stage =>
+      assert(stage.info.status === v1.StageStatus.PENDING)
+      assert(stage.info.numActiveTasks === 0)
+      assert(stage.info.numCompleteTasks === 0)
+    }
+
+    listener.onStageCompleted(SparkListenerStageCompleted(job1Stage))
+    listener.onJobEnd(SparkListenerJobEnd(job1Id, 1L, JobSucceeded))
+
+    check[JobDataWrapper](1) { job =>

Review Comment:
   Thanks for pointing that out; now I'm using the value.
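For context on the scenario this test models: a stage shared between jobs is reported as skipped once its output can be reused instead of recomputed. Below is a minimal, hypothetical reproduction (my own sketch, not taken from this PR) of how a reused shuffle stage shows up as skipped in a second job; the app name and data sizes are arbitrary.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object SharedStageSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("shared-stage-sketch"))
    // Both count() jobs below depend on the same shuffle map stage.
    val shuffled = sc.parallelize(1 to 100).map(i => (i % 10, i)).reduceByKey(_ + _)
    shuffled.count() // job 1 computes the shuffle map stage
    shuffled.count() // job 2 reuses the shuffle output; the map stage is reported as skipped
    sc.stop()
  }
}
```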
##########
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala:
##########
@@ -496,22 +496,27 @@ private[spark] class AppStatusListener(
     val now = System.nanoTime()
 
     // Check if there are any pending stages that match this job; mark those as skipped.
-    val it = liveStages.entrySet.iterator()
-    while (it.hasNext()) {
-      val e = it.next()
-      if (job.stageIds.contains(e.getKey()._1)) {
-        val stage = e.getValue()
+    liveStages.forEach { (key, stage) =>

Review Comment:
   Thanks, Attila, for double-checking this.
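A side note on the new iteration style: `liveStages` is a `java.util.concurrent.ConcurrentHashMap`, whose `forEach` takes a `java.util.function.BiConsumer`, so a two-argument Scala lambda works directly via SAM conversion on Scala 2.12+. Here is a minimal, self-contained sketch of the same pattern, with a plain `String` status standing in for Spark's real stage type (an assumption for brevity):

```scala
import java.util.concurrent.ConcurrentHashMap

object ForEachSketch {
  def main(args: Array[String]): Unit = {
    // Keyed like liveStages: (stageId, attemptId)
    val liveStages = new ConcurrentHashMap[(Int, Int), String]()
    liveStages.put((1, 0), "PENDING")
    liveStages.put((2, 0), "ACTIVE")

    val jobStageIds = Set(1)
    // The Scala lambda is SAM-converted to a BiConsumer.
    liveStages.forEach { (key, status) =>
      val stageId = key._1
      if (jobStageIds.contains(stageId) && status == "PENDING") {
        // Updating during traversal is safe: CHM traversal is weakly consistent.
        liveStages.put(key, "SKIPPED")
      }
    }
    println(liveStages) // e.g. {(1,0)=SKIPPED, (2,0)=ACTIVE} (order may vary)
  }
}
```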
##########
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:
##########
@@ -1930,6 +1930,78 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter
+    check[StageDataWrapper](Array(sharedStageId, 0)) { stage =>
+      assert(stage.info.status === v1.StageStatus.SKIPPED)
+      assert(stage.info.numActiveTasks === 0)
+      assert(stage.info.numCompleteTasks === 0)
+    }
+
+    check[JobDataWrapper](2) { job =>
+      assert(job.info.stageIds == Seq(sharedStageId, 3))

Review Comment:
   Thanks for pointing that out; now I'm using the value.

##########
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala:
##########
@@ -496,22 +496,27 @@ private[spark] class AppStatusListener(
     val now = System.nanoTime()
 
     // Check if there are any pending stages that match this job; mark those as skipped.
-    val it = liveStages.entrySet.iterator()
-    while (it.hasNext()) {
-      val e = it.next()
-      if (job.stageIds.contains(e.getKey()._1)) {
-        val stage = e.getValue()
+    liveStages.forEach { (key, stage) =>
+      val stageId = key._1
+      if (job.stageIds.contains(stageId)) {
         if (v1.StageStatus.PENDING.equals(stage.status)) {
           stage.status = v1.StageStatus.SKIPPED
           job.skippedStages += stage.info.stageId
           job.skippedTasks += stage.info.numTasks
+          // Before removing the stage we need to update the skipped data
+          // for all jobs referencing this stage
+          liveJobs.values.filter(_.stageIds.contains(stageId)).foreach { otherJob =>

Review Comment:
   Thanks, Attila. I extended the comment to make sure it is clear what is happening here.
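To spell out what the extended comment describes: when the shared stage flips from PENDING to SKIPPED, every live job that references it needs its skipped counters bumped, not only the job whose end event triggered the scan. A simplified sketch of that propagation, using stand-in classes (these `LiveJob`/`LiveStage` stubs are mine, not Spark's real ones):

```scala
import scala.collection.mutable

final class LiveStage(val stageId: Int, val numTasks: Int, var status: String = "PENDING")

final class LiveJob(val jobId: Int, val stageIds: Seq[Int]) {
  val skippedStages = mutable.Set[Int]()
  var skippedTasks = 0
}

object SkipPropagationSketch {
  // Mirrors the liveJobs.values.filter(...).foreach loop in the diff above.
  def markSkipped(stage: LiveStage, liveJobs: Iterable[LiveJob]): Unit = {
    stage.status = "SKIPPED"
    // Update the skipped data for all jobs referencing this stage before it is removed.
    liveJobs.filter(_.stageIds.contains(stage.stageId)).foreach { job =>
      job.skippedStages += stage.stageId
      job.skippedTasks += stage.numTasks
    }
  }

  def main(args: Array[String]): Unit = {
    val shared = new LiveStage(stageId = 1, numTasks = 10)
    val job1 = new LiveJob(1, Seq(1, 2))
    val job2 = new LiveJob(2, Seq(1, 3))
    markSkipped(shared, Seq(job1, job2))
    // Both jobs now count the shared stage as skipped, not just job1.
    println(s"job2 skippedStages=${job2.skippedStages} skippedTasks=${job2.skippedTasks}")
  }
}
```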
##########
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:
##########
@@ -1930,6 +1930,78 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter
+  test("SPARK-50356: skip shared stage for all jobs") {
+    val listener = new AppStatusListener(store, conf, live = true)
+
+    // This will be a shared stage in PENDING state which gets SKIPPED status
+    // when the first job is completed
+    val sharedStageId = 1
+    val sharedPendingStage = new StageInfo(sharedStageId, 0, "sharedStage", 10,

Review Comment:
   Thanks for pointing that out; now I'm using the new value.

##########
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:
##########
@@ -1930,6 +1930,78 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter
+    listener.onStageCompleted(SparkListenerStageCompleted(job2Stage))
+    listener.onJobEnd(SparkListenerJobEnd(job2Id, 1L, JobSucceeded))
+
+    check[StageDataWrapper](Array(sharedStageId, 0)) { stage =>
+      assert(stage.info.status === v1.StageStatus.SKIPPED)
+      assert(stage.info.numActiveTasks === 0)
+      assert(stage.info.numCompleteTasks === 0)
+    }
+
+    check[JobDataWrapper](2) { job =>

Review Comment:
   Thanks for pointing that out; now I'm using the value.

##########
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:
##########
@@ -1930,6 +1930,78 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter
+    listener.onStageCompleted(SparkListenerStageCompleted(job1Stage))
+    listener.onJobEnd(SparkListenerJobEnd(job1Id, 1L, JobSucceeded))
+
+    check[JobDataWrapper](1) { job =>
+      assert(job.info.stageIds == Seq(sharedStageId, 2))

Review Comment:
   Thanks for pointing that out; now I'm using the value.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org