attilapiros commented on code in PR #49094: URL: https://github.com/apache/spark/pull/49094#discussion_r1882923441
########## core/src/main/scala/org/apache/spark/status/AppStatusListener.scala: ########## @@ -496,22 +496,27 @@ private[spark] class AppStatusListener( val now = System.nanoTime() // Check if there are any pending stages that match this job; mark those as skipped. - val it = liveStages.entrySet.iterator() - while (it.hasNext()) { - val e = it.next() - if (job.stageIds.contains(e.getKey()._1)) { - val stage = e.getValue() + liveStages.forEach { (key, stage) => Review Comment: This is fine! I have double-checked: since `liveStages` is a `ConcurrentHashMap`, a `remove` can happen during a `forEach`, either from another thread or from this one, and the traversal is prepared for any resizing. ########## core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala: ########## @@ -1930,6 +1930,78 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter assert(job.numActiveStages == 0) } + test("SPARK-50356: skip shared stage for all jobs") { + val listener = new AppStatusListener(store, conf, live = true) + + // This will be a shared stage in PENDING state which gets SKIPPED status + // when first job is completed + val sharedStageId = 1 + val sharedPendingStage = new StageInfo(sharedStageId, 0, "sharedStage", 10, + Seq.empty, Seq.empty, "", resourceProfileId = 0) + + // These dummy stages will belong only to one job + val job1Stage = new StageInfo(2, 0, "job1Stage", 2, + Seq.empty, Seq.empty, "", resourceProfileId = 0) + val job2Stage = new StageInfo(3, 0, "job2Stage", 2, + Seq.empty, Seq.empty, "", resourceProfileId = 0) + + val job1Id = 1 + val job1StartEvent = SparkListenerJobStart(job1Id, 0L, Seq(sharedPendingStage, job1Stage)) + val job2Id = 2 + val job2StartEvent = SparkListenerJobStart(job2Id, 0L, Seq(sharedPendingStage, job2Stage)) + + listener.onApplicationStart(SparkListenerApplicationStart("app", Some("app"), 0L, "none", None)) + listener.onJobStart(job1StartEvent) + listener.onJobStart(job2StartEvent) + 
listener.onStageSubmitted(SparkListenerStageSubmitted(job1Stage)) + listener.onStageSubmitted(SparkListenerStageSubmitted(job2Stage)) + + check[StageDataWrapper](Array(sharedStageId, 0)) { stage => + assert(stage.info.status === v1.StageStatus.PENDING) + assert(stage.info.numActiveTasks === 0) + assert(stage.info.numCompleteTasks === 0) + } + + listener.onStageCompleted(SparkListenerStageCompleted(job1Stage)) + listener.onJobEnd(SparkListenerJobEnd(job1Id, 1L, JobSucceeded)) + + check[JobDataWrapper](1) { job => + assert(job.info.stageIds == Seq(sharedStageId, 2)) Review Comment: ```suggestion assert(job.info.stageIds == Seq(sharedStageId, job1Stage.stageId)) ``` ########## core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala: ########## @@ -1930,6 +1930,78 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter assert(job.numActiveStages == 0) } + test("SPARK-50356: skip shared stage for all jobs") { + val listener = new AppStatusListener(store, conf, live = true) + + // This will be a shared stage in PENDING state which gets SKIPPED status + // when first job is completed + val sharedStageId = 1 + val sharedPendingStage = new StageInfo(sharedStageId, 0, "sharedStage", 10, + Seq.empty, Seq.empty, "", resourceProfileId = 0) + + // These dummy stages will belong only to one job + val job1Stage = new StageInfo(2, 0, "job1Stage", 2, + Seq.empty, Seq.empty, "", resourceProfileId = 0) + val job2Stage = new StageInfo(3, 0, "job2Stage", 2, + Seq.empty, Seq.empty, "", resourceProfileId = 0) + + val job1Id = 1 + val job1StartEvent = SparkListenerJobStart(job1Id, 0L, Seq(sharedPendingStage, job1Stage)) + val job2Id = 2 + val job2StartEvent = SparkListenerJobStart(job2Id, 0L, Seq(sharedPendingStage, job2Stage)) + + listener.onApplicationStart(SparkListenerApplicationStart("app", Some("app"), 0L, "none", None)) + listener.onJobStart(job1StartEvent) + listener.onJobStart(job2StartEvent) + 
listener.onStageSubmitted(SparkListenerStageSubmitted(job1Stage)) + listener.onStageSubmitted(SparkListenerStageSubmitted(job2Stage)) + + check[StageDataWrapper](Array(sharedStageId, 0)) { stage => + assert(stage.info.status === v1.StageStatus.PENDING) + assert(stage.info.numActiveTasks === 0) + assert(stage.info.numCompleteTasks === 0) + } + + listener.onStageCompleted(SparkListenerStageCompleted(job1Stage)) + listener.onJobEnd(SparkListenerJobEnd(job1Id, 1L, JobSucceeded)) + + check[JobDataWrapper](1) { job => Review Comment: ```suggestion check[JobDataWrapper](job1Id) { job => ``` ########## core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala: ########## @@ -1930,6 +1930,78 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter assert(job.numActiveStages == 0) } + test("SPARK-50356: skip shared stage for all jobs") { + val listener = new AppStatusListener(store, conf, live = true) + + // This will be a shared stage in PENDING state which gets SKIPPED status + // when first job is completed + val sharedStageId = 1 + val sharedPendingStage = new StageInfo(sharedStageId, 0, "sharedStage", 10, Review Comment: NIT: I would introduce a `val` named `sharedStageAttemptId`, as that 0 (especially down in the Array) is a bit cryptic for me. 
########## core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala: ########## @@ -1930,6 +1930,78 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter assert(job.numActiveStages == 0) } + test("SPARK-50356: skip shared stage for all jobs") { + val listener = new AppStatusListener(store, conf, live = true) + + // This will be a shared stage in PENDING state which gets SKIPPED status + // when first job is completed + val sharedStageId = 1 + val sharedPendingStage = new StageInfo(sharedStageId, 0, "sharedStage", 10, + Seq.empty, Seq.empty, "", resourceProfileId = 0) + + // These dummy stages will belong only to one job + val job1Stage = new StageInfo(2, 0, "job1Stage", 2, + Seq.empty, Seq.empty, "", resourceProfileId = 0) + val job2Stage = new StageInfo(3, 0, "job2Stage", 2, + Seq.empty, Seq.empty, "", resourceProfileId = 0) + + val job1Id = 1 + val job1StartEvent = SparkListenerJobStart(job1Id, 0L, Seq(sharedPendingStage, job1Stage)) + val job2Id = 2 + val job2StartEvent = SparkListenerJobStart(job2Id, 0L, Seq(sharedPendingStage, job2Stage)) + + listener.onApplicationStart(SparkListenerApplicationStart("app", Some("app"), 0L, "none", None)) + listener.onJobStart(job1StartEvent) + listener.onJobStart(job2StartEvent) + listener.onStageSubmitted(SparkListenerStageSubmitted(job1Stage)) + listener.onStageSubmitted(SparkListenerStageSubmitted(job2Stage)) + + check[StageDataWrapper](Array(sharedStageId, 0)) { stage => + assert(stage.info.status === v1.StageStatus.PENDING) + assert(stage.info.numActiveTasks === 0) + assert(stage.info.numCompleteTasks === 0) + } + + listener.onStageCompleted(SparkListenerStageCompleted(job1Stage)) + listener.onJobEnd(SparkListenerJobEnd(job1Id, 1L, JobSucceeded)) + + check[JobDataWrapper](1) { job => + assert(job.info.stageIds == Seq(sharedStageId, 2)) + } + + check[StageDataWrapper](Array(sharedStageId, 0)) { stage => + assert(stage.info.status === v1.StageStatus.SKIPPED) + 
assert(stage.info.numActiveTasks === 0) + assert(stage.info.numCompleteTasks === 0) + } + + listener.onStageCompleted(SparkListenerStageCompleted(job2Stage)) + listener.onJobEnd(SparkListenerJobEnd(job2Id, 1L, JobSucceeded)) + + check[StageDataWrapper](Array(sharedStageId, 0)) { stage => + assert(stage.info.status === v1.StageStatus.SKIPPED) + assert(stage.info.numActiveTasks === 0) + assert(stage.info.numCompleteTasks === 0) + } + + check[JobDataWrapper](2) { job => + assert(job.info.stageIds == Seq(sharedStageId, 3)) Review Comment: Same here. ########## core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala: ########## @@ -1930,6 +1930,78 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter assert(job.numActiveStages == 0) } + test("SPARK-50356: skip shared stage for all jobs") { + val listener = new AppStatusListener(store, conf, live = true) + + // This will be a shared stage in PENDING state which gets SKIPPED status + // when first job is completed + val sharedStageId = 1 + val sharedPendingStage = new StageInfo(sharedStageId, 0, "sharedStage", 10, + Seq.empty, Seq.empty, "", resourceProfileId = 0) + + // These dummy stages will belong only to one job + val job1Stage = new StageInfo(2, 0, "job1Stage", 2, + Seq.empty, Seq.empty, "", resourceProfileId = 0) + val job2Stage = new StageInfo(3, 0, "job2Stage", 2, + Seq.empty, Seq.empty, "", resourceProfileId = 0) + + val job1Id = 1 + val job1StartEvent = SparkListenerJobStart(job1Id, 0L, Seq(sharedPendingStage, job1Stage)) + val job2Id = 2 + val job2StartEvent = SparkListenerJobStart(job2Id, 0L, Seq(sharedPendingStage, job2Stage)) + + listener.onApplicationStart(SparkListenerApplicationStart("app", Some("app"), 0L, "none", None)) + listener.onJobStart(job1StartEvent) + listener.onJobStart(job2StartEvent) + listener.onStageSubmitted(SparkListenerStageSubmitted(job1Stage)) + listener.onStageSubmitted(SparkListenerStageSubmitted(job2Stage)) + + 
check[StageDataWrapper](Array(sharedStageId, 0)) { stage => + assert(stage.info.status === v1.StageStatus.PENDING) + assert(stage.info.numActiveTasks === 0) + assert(stage.info.numCompleteTasks === 0) + } + + listener.onStageCompleted(SparkListenerStageCompleted(job1Stage)) + listener.onJobEnd(SparkListenerJobEnd(job1Id, 1L, JobSucceeded)) + + check[JobDataWrapper](1) { job => + assert(job.info.stageIds == Seq(sharedStageId, 2)) + } + + check[StageDataWrapper](Array(sharedStageId, 0)) { stage => + assert(stage.info.status === v1.StageStatus.SKIPPED) + assert(stage.info.numActiveTasks === 0) + assert(stage.info.numCompleteTasks === 0) + } + + listener.onStageCompleted(SparkListenerStageCompleted(job2Stage)) + listener.onJobEnd(SparkListenerJobEnd(job2Id, 1L, JobSucceeded)) + + check[StageDataWrapper](Array(sharedStageId, 0)) { stage => + assert(stage.info.status === v1.StageStatus.SKIPPED) + assert(stage.info.numActiveTasks === 0) + assert(stage.info.numCompleteTasks === 0) + } + + check[JobDataWrapper](2) { job => Review Comment: Same here. Please replace the literal with the corresponding val! ########## core/src/main/scala/org/apache/spark/status/AppStatusListener.scala: ########## @@ -496,22 +496,27 @@ private[spark] class AppStatusListener( val now = System.nanoTime() // Check if there are any pending stages that match this job; mark those as skipped. 
- val it = liveStages.entrySet.iterator() - while (it.hasNext()) { - val e = it.next() - if (job.stageIds.contains(e.getKey()._1)) { - val stage = e.getValue() + liveStages.forEach { (key, stage) => + val stageId = key._1 + if (job.stageIds.contains(stageId)) { if (v1.StageStatus.PENDING.equals(stage.status)) { stage.status = v1.StageStatus.SKIPPED job.skippedStages += stage.info.stageId job.skippedTasks += stage.info.numTasks + // Before removing stage we need to update skipped data + // for all jobs referencing this stage + liveJobs.values.filter(_.stageIds.contains(stageId)).foreach { otherJob => Review Comment: The naming `otherJob` is right as the current job was already removed from the `liveJobs` in the beginning of this function! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org