attilapiros commented on code in PR #50946:
URL: https://github.com/apache/spark/pull/50946#discussion_r2096886187
########## core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala: ##########
@@ -3048,6 +3072,90 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     assert(countSubmittedMapStageAttempts() === 2)
   }
 
+  /**
+   * This function creates the following dependency graph:
+   *
+   *  (determinate)      (indeterminate)
+   * shuffleMapRdd0      shuffleMapRdd1
+   *              \        /
+   *               \      /
+   *                finalRdd
+   *
+   * Both shuffle map RDDs become ShuffleMapStages with 2 partitions, executed on
+   * hostA_exec and hostB_exec.
+   */
+  def constructMixedDeterminateDependencies():
+      (ShuffleDependency[_, _, _], ShuffleDependency[_, _, _]) = {
+    val numPartitions = 2
+    val shuffleMapRdd0 = new MyRDD(sc, numPartitions, Nil, indeterminate = false)
+    val shuffleDep0 = new ShuffleDependency(shuffleMapRdd0, new HashPartitioner(2))
+
+    val shuffleMapRdd1 =
+      new MyRDD(sc, numPartitions, Nil, tracker = mapOutputTracker, indeterminate = true)
+    val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, new HashPartitioner(2))
+
+    val finalRdd =
+      new MyRDD(sc, numPartitions, List(shuffleDep0, shuffleDep1), tracker = mapOutputTracker)
+
+    submit(finalRdd, Array(0, 1))
+
+    // Finish both shuffle map stages.
+    completeShuffleMapStageSuccessfully(0, 0, numPartitions, Seq("hostA", "hostB"))
+    completeShuffleMapStageSuccessfully(1, 0, numPartitions, Seq("hostA", "hostB"))
+    assert(mapOutputTracker.findMissingPartitions(0) === Some(Seq.empty))
+    assert(mapOutputTracker.findMissingPartitions(1) === Some(Seq.empty))
+
+    (shuffleDep0, shuffleDep1)
+  }
+
+  test("SPARK-51272: re-submit of an indeterminate stage without partial result can succeed") {
+    val shuffleDeps = constructMixedDeterminateDependencies()
+    val resultStage = scheduler.stageIdToStage(2).asInstanceOf[ResultStage]
+
+    // The fetch failure is from the determinate shuffle map stage, but it leads to an
+    // executor loss that also removes the shuffle files generated by the indeterminate stage.
+    completeNextStageWithFetchFailure(resultStage.id, 0, shuffleDeps._1, "hostA")
+
+    Thread.sleep(DAGScheduler.RESUBMIT_TIMEOUT * 2)
+    dagEventProcessLoopTester.runEvents()
+    assert(scheduler.runningStages.size === 2)
+    assert(scheduler.runningStages.forall(_.isInstanceOf[ShuffleMapStage]))
+
+    completeShuffleMapStageSuccessfully(0, 1, 2, Seq("hostA", "hostB"))
+    completeShuffleMapStageSuccessfully(1, 1, 2, Seq("hostA", "hostB"))
+    assert(scheduler.runningStages.size === 1)
+    assert(scheduler.runningStages.head === resultStage)
+    assert(resultStage.latestInfo.failureReason.isEmpty)
+
+    completeNextResultStageWithSuccess(resultStage.id, 1)
+  }
+
+  test("SPARK-51272: re-submit of an indeterminate stage with partial result will fail") {
+    val shuffleDeps = constructMixedDeterminateDependencies()
+    val resultStage = scheduler.stageIdToStage(2).asInstanceOf[ResultStage]
+
+    runEvent(makeCompletionEvent(taskSets(2).tasks(0), Success, 42))
+    // The fetch failure is from the determinate shuffle map stage, but it leads to an
+    // executor loss that also removes the shuffle files generated by the indeterminate stage.
+    runEvent(makeCompletionEvent(
+      taskSets(2).tasks(1),
+      FetchFailed(makeBlockManagerId("hostA"), shuffleDeps._1.shuffleId, 0L, 0, 0, "ignored"),
+      null))
+
+    dagEventProcessLoopTester.runEvents()
+    // Resubmission has not happened yet, so the job is still running.
+    assert(scheduler.activeJobs.nonEmpty)
+    Thread.sleep(DAGScheduler.RESUBMIT_TIMEOUT * 2)
+    dagEventProcessLoopTester.runEvents()
+
+    // All dependent jobs have been failed.
+    assert(scheduler.runningStages.size === 0)
+    assert(scheduler.activeJobs.isEmpty)
+    assert(resultStage.latestInfo.failureReason.isDefined)
+    assert(resultStage.latestInfo.failureReason.get.contains("ignored"))

Review Comment:
   @cloud-fan I would like to draw your attention to this part, as the corresponding assert on the master branch is a bit different:
   ```
   assert(resultStage.latestInfo.failureReason.get.
     contains("A shuffle map stage with indeterminate output was failed and retried. " +
       "However, Spark cannot rollback the ResultStage"))
   ```
   This is because here an `abortStage()` on the `ResultStage` is missing, and only the earlier abort caused by the `FetchFailure` is available. The reason behind this is a commit that is missing from 3.5:
   - [SPARK-50648][CORE] Cleanup zombie tasks in non-running stages when the job is cancelled
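   For context, the master-branch message comes from the rollback check that runs when an indeterminate shuffle map stage has to be re-executed: a `ResultStage` that has already delivered some results cannot be rolled back, so it gets aborted. The snippet below is a minimal, self-contained sketch of that decision only, mirroring the two tests above; `MapStage`, `ResultStage` and `rollbackDecision` here are hypothetical stand-ins, not the real `DAGScheduler` types or API.
   ```scala
   // Sketch with hypothetical stand-in types (not the real DAGScheduler API):
   // a stage that succeeds an indeterminate map stage is checked for whether it
   // can be rolled back; a result stage with a partial result cannot be.
   object IndeterminateRollbackSketch {

     sealed trait Stage { def numTasks: Int; def missingPartitions: Seq[Int] }
     final case class MapStage(numTasks: Int, missingPartitions: Seq[Int]) extends Stage
     final case class ResultStage(numTasks: Int, missingPartitions: Seq[Int]) extends Stage

     // Returns the abort reason for a stage that cannot be rolled back, None otherwise.
     def rollbackDecision(stage: Stage): Option[String] = stage match {
       case _: MapStage =>
         None // map output can simply be recomputed, so the stage is re-submitted
       case rs: ResultStage if rs.missingPartitions.length < rs.numTasks =>
         // Some result tasks already finished, i.e. a partial result was delivered.
         Some("A shuffle map stage with indeterminate output was failed and retried. " +
           "However, Spark cannot rollback the ResultStage")
       case _: ResultStage =>
         None // no partial result yet, so the whole result stage can be re-run
     }

     def main(args: Array[String]): Unit = {
       // Mirrors the two tests: no partial result vs. one already-finished result task.
       println(rollbackDecision(ResultStage(numTasks = 2, missingPartitions = Seq(0, 1)))) // None
       println(rollbackDecision(ResultStage(numTasks = 2, missingPartitions = Seq(1))))    // Some(...)
     }
   }
   ```
   With these stand-ins, the first call prints `None` (no partial result, so the result stage can simply be re-run, as in the first test), while the second prints `Some(...)` with the rollback message, which is the failure reason master asserts on in the second test.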