ahshahid commented on code in PR #50033:
URL: https://github.com/apache/spark/pull/50033#discussion_r1984499520
##########
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala:
##########
@@ -3185,16 +3197,106 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
       "Spark can only do this while using the new shuffle block fetching protocol"))
   }
 
+  test("SPARK-51272: retry all the partitions of result stage, if the first result task" +
+    " has failed and ShuffleMap stage is inDeterminate") {
+    var resubmitFailedStageTriggered = false
+    val monitor = new Object()
+    this.dagSchedulerInterceptor = new DagSchedulerInterceptor {
+      override def beforeAddingDagEventToQueue(event: DAGSchedulerEvent): Unit = {
+        event match {
+          case ResubmitFailedStages =>
+            // Before the ResubmitFailedStages event is added to the queue, enqueue a
+            // successful completion of the other result partition's task.
+            runEvent(makeCompletionEvent(taskSets(2).tasks(1), Success, 11))
+            monitor.synchronized {
+              resubmitFailedStageTriggered = true
+              monitor.notify()
+            }
+
+          case _ =>
+        }
+      }
+
+      override def afterDirectProcessingOfDagEvent(event: DAGSchedulerEvent): Unit = {
+        event match {
+          case CompletionEvent(_, reason, _, _, _, _) =>
+            reason match {
+              case FetchFailed(_, _, _, _, _, _) =>
+                // Do not let this thread exit until the ResubmitFailedStages callback
+                // above has run. This ensures that this thread cannot go on to process
+                // the ResubmitFailedStages event before the queue has received the
+                // successful partition task completion.
+                monitor.synchronized {
+                  if (!resubmitFailedStageTriggered) {
+                    monitor.wait()
+                  }
+                }
+
+              case _ =>
+            }
+
+          case _ =>
+        }
+      }
+    }
+
+    val numPartitions = 2
+    val (shuffleId1, shuffleId2) = constructTwoIndeterminateStage()
+    completeShuffleMapStageSuccessfully(shuffleId2, 0, numPartitions)
+    val resultStage = scheduler.stageIdToStage(2).asInstanceOf[ResultStage]
+    val activeJob = resultStage.activeJob
+    assert(activeJob.isDefined)
+    // The result stage is still waiting for its 2 tasks to complete
+    assert(resultStage.findMissingPartitions() == Seq.tabulate(numPartitions)(i => i))
+
+    // The event below initiates the retry of the previous indeterminate stages as well as
+    // the retry of all result tasks. But before the "ResubmitFailedStages" event is added
+    // to the scheduler's queue, a successful completion of a result partition task is added
+    // to the event queue. In this scenario the bug surfaces: instead of retrying all
+    // partitions of the result stage (2 tasks in total), only some (1 task) get retried.
+    runEvent(
+      makeCompletionEvent(
+        taskSets(2).tasks(0),
+        FetchFailed(makeBlockManagerId("hostA"), shuffleId1, 0L, 0, 0, "ignored"),
+        null))
+
+    val shuffleStage1 = this.scheduler.shuffleIdToMapStage(shuffleId1)
+    val shuffleStage2 = this.scheduler.shuffleIdToMapStage(shuffleId2)
+    var keepGoing = true
+    while (keepGoing) {
+      Thread.sleep(500)
+      keepGoing = shuffleStage1.latestInfo.attemptNumber() != 1
+    }

Review Comment:
   done.
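A note on the polling loop that closes the hunk: the same wait can be bounded with ScalaTest's Eventually helper, so a resubmission that never happens fails the test instead of hanging it. Below is a minimal, self-contained sketch of that pattern; the suite name and the @volatile counter are hypothetical stand-ins for shuffleStage1.latestInfo.attemptNumber(), not the actual DAGSchedulerSuite code:

import org.scalatest.concurrent.Eventually._
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.time.SpanSugar._

class BoundedPollingSketch extends AnyFunSuite {
  test("wait for an asynchronous stage retry with a bounded timeout") {
    @volatile var attemptNumber = 0
    // Hypothetical stand-in for the DAGScheduler event loop bumping the
    // stage attempt number after ResubmitFailedStages is processed.
    new Thread(() => { Thread.sleep(200); attemptNumber = 1 }).start()

    // Re-evaluates the block every 500 ms until the assertion passes,
    // failing the test after 10 seconds instead of looping forever.
    eventually(timeout(10.seconds), interval(500.millis)) {
      assert(attemptNumber === 1)
    }
  }
}

ScalaTest's Eventually is already used widely in Spark's test suites, so this keeps the busy-wait bounded without introducing a new dependency.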