attilapiros commented on code in PR #50630:
URL: https://github.com/apache/spark/pull/50630#discussion_r2094104754
##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2129,60 +2149,8 @@ private[spark] class DAGScheduler(
             // guaranteed to be determinate, so the input data of the reducers will not change
             // even if the map tasks are re-tried.
             if (mapStage.isIndeterminate) {
-              // It's a little tricky to find all the succeeding stages of `mapStage`, because
-              // each stage only know its parents not children. Here we traverse the stages from
-              // the leaf nodes (the result stages of active jobs), and rollback all the stages
-              // in the stage chains that connect to the `mapStage`. To speed up the stage
-              // traversing, we collect the stages to rollback first. If a stage needs to
-              // rollback, all its succeeding stages need to rollback to.
-              val stagesToRollback = HashSet[Stage](mapStage)
-
-              def collectStagesToRollback(stageChain: List[Stage]): Unit = {
-                if (stagesToRollback.contains(stageChain.head)) {
-                  stageChain.drop(1).foreach(s => stagesToRollback += s)
-                } else {
-                  stageChain.head.parents.foreach { s =>
-                    collectStagesToRollback(s :: stageChain)
-                  }
-                }
-              }
-
-              def generateErrorMessage(stage: Stage): String = {
-                "A shuffle map stage with indeterminate output was failed and retried. " +
-                  s"However, Spark cannot rollback the $stage to re-process the input data, " +
-                  "and has to fail this job. Please eliminate the indeterminacy by " +
-                  "checkpointing the RDD before repartition and try again."
-              }
-
-              activeJobs.foreach(job => collectStagesToRollback(job.finalStage :: Nil))
-
-              // The stages will be rolled back after checking
-              val rollingBackStages = HashSet[Stage](mapStage)
-              stagesToRollback.foreach {
-                case mapStage: ShuffleMapStage =>
-                  val numMissingPartitions = mapStage.findMissingPartitions().length
-                  if (numMissingPartitions < mapStage.numTasks) {
-                    if (sc.conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)) {
-                      val reason = "A shuffle map stage with indeterminate output was failed " +
-                        "and retried. However, Spark can only do this while using the new " +
-                        "shuffle block fetching protocol. Please check the config " +
-                        "'spark.shuffle.useOldFetchProtocol', see more detail in " +
-                        "SPARK-27665 and SPARK-25341."
-                      abortStage(mapStage, reason, None)
-                    } else {
-                      rollingBackStages += mapStage
-                    }
-                  }
-
-                case resultStage: ResultStage if resultStage.activeJob.isDefined =>
-                  val numMissingPartitions = resultStage.findMissingPartitions().length
-                  if (numMissingPartitions < resultStage.numTasks) {
-                    // TODO: support to rollback result tasks.
-                    abortStage(resultStage, generateErrorMessage(resultStage), None)
-                  }
-
-                case _ =>
-              }
+              val stagesToRollback = collectSucceedingStages(mapStage)
+              val rollingBackStages = validateStageRollBacks(stagesToRollback)

Review Comment:
   Yes, this is the fail-fast and fail-early branch.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org