ahshahid commented on code in PR #50033: URL: https://github.com/apache/spark/pull/50033#discussion_r1966860945
########## core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala: ########## @@ -3185,16 +3217,103 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti "Spark can only do this while using the new shuffle block fetching protocol")) } + test("SPARK-51272: retry all the succeeding stages when the map stage is indeterminate with" + + " concurrent tasks completion") { + if (scheduler != null) { + this.afterEach() + } + + val monitor = new Object() + val resubmitFailedStageReached = Array.fill[Boolean](1)(false) + this.dagSchedulerInterceptor = new DagSchedulerInterceptor { + override def interceptHandleTaskCompletion(event: CompletionEvent): Unit = { + event.reason match { + case Success if event.task.isInstanceOf[ResultTask[_, _]] => + assert(resubmitFailedStageReached(0)) + monitor.synchronized { + monitor.notify() + } + + case _ => + } + } + + override def interceptResubmitFailedStages(): Unit = { + monitor.synchronized { + resubmitFailedStageReached(0) = true + monitor.notify() + monitor.wait() + } + } + } + + this.beforeEach() + + val numPartitions = 2 + val (shuffleId1, shuffleId2) = constructTwoIndeterminateStage() + completeShuffleMapStageSuccessfully(shuffleId2, 0, numPartitions) + val resultStage = scheduler.stageIdToStage(2).asInstanceOf[ResultStage] + val activeJob = resultStage.activeJob + assert(activeJob.isDefined) + // The result stage is still waiting for its 2 tasks to complete + assert(resultStage.findMissingPartitions() == Seq.tabulate(numPartitions)(i => i)) + new Thread(() => { + runEventInCurrentThread( + makeCompletionEvent( + taskSets(2).tasks(0), + FetchFailed(makeBlockManagerId("hostA"), shuffleId1, 0L, 0, 0, "ignored"), + null)) + }).start() + + monitor.synchronized { + if (!resubmitFailedStageReached(0)) { + monitor.wait() + } + } + assert(resubmitFailedStageReached(0)) + new Thread(() => { + runEventInCurrentThread(makeCompletionEvent(taskSets(2).tasks(1), Success, 11)) + }).start() + + val 
shuffleStage1 = this.scheduler.shuffleIdToMapStage(shuffleId1) + val shuffleStage2 = this.scheduler.shuffleIdToMapStage(shuffleId2) + var keepGoing = true + while (keepGoing) { + Thread.sleep(500) + keepGoing = shuffleStage1.latestInfo.attemptNumber() != 1 + } + completeShuffleMapStageSuccessfully(0, 1, numPartitions) + keepGoing = true + while (keepGoing) { + Thread.sleep(500) + keepGoing = shuffleStage2.latestInfo.attemptNumber() != 1 + } + + completeShuffleMapStageSuccessfully(1, 1, numPartitions) + keepGoing = true + while (keepGoing) { + Thread.sleep(500) + keepGoing = resultStage.latestInfo.attemptNumber() != 1 + } + + assert(resultStage.latestInfo.numTasks == 2) + } + test("SPARK-25341: retry all the succeeding stages when the map stage is indeterminate") { val (shuffleId1, shuffleId2) = constructIndeterminateStageFetchFailed() // Check status for all failedStages val failedStages = scheduler.failedStages.toSeq assert(failedStages.map(_.id) == Seq(1, 2)) // Shuffle blocks of "hostC" is lost, so first task of the `shuffleMapRdd2` needs to retry. + // TODO: Asif THIS ASSERTION APPEARS TO BE WRONG. As the ShuffleMapStage is inDeterminate all Review Comment: done. ########## core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala: ########## @@ -4163,9 +4282,16 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti val failedStages = scheduler.failedStages.toSeq assert(failedStages.map(_.id) == Seq(1, 2)) // Shuffle blocks of "hostC" is lost, so first task of the `shuffleMapRdd2` needs to retry. + // TODO: Asif THIS ASSERTION APPEARS TO BE WRONG. As the ShuffleMapStage is inDeterminate all Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org