attilapiros commented on code in PR #50630:
URL: https://github.com/apache/spark/pull/50630#discussion_r2094104754


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2129,60 +2149,8 @@ private[spark] class DAGScheduler(
               // guaranteed to be determinate, so the input data of the 
reducers will not change
               // even if the map tasks are re-tried.
               if (mapStage.isIndeterminate) {
-                // It's a little tricky to find all the succeeding stages of 
`mapStage`, because
-                // each stage only knows its parents, not children. Here we 
traverse the stages from
-                // the leaf nodes (the result stages of active jobs), and 
rollback all the stages
-                // in the stage chains that connect to the `mapStage`. To 
speed up the stage
-                // traversing, we collect the stages to rollback first. If a 
stage needs to
-                // rollback, all its succeeding stages need to roll back too.
-                val stagesToRollback = HashSet[Stage](mapStage)
-
-                def collectStagesToRollback(stageChain: List[Stage]): Unit = {
-                  if (stagesToRollback.contains(stageChain.head)) {
-                    stageChain.drop(1).foreach(s => stagesToRollback += s)
-                  } else {
-                    stageChain.head.parents.foreach { s =>
-                      collectStagesToRollback(s :: stageChain)
-                    }
-                  }
-                }
-
-                def generateErrorMessage(stage: Stage): String = {
-                  "A shuffle map stage with indeterminate output was failed 
and retried. " +
-                    s"However, Spark cannot rollback the $stage to re-process 
the input data, " +
-                    "and has to fail this job. Please eliminate the 
indeterminacy by " +
-                    "checkpointing the RDD before repartition and try again."
-                }
-
-                activeJobs.foreach(job => 
collectStagesToRollback(job.finalStage :: Nil))
-
-                // The stages will be rolled back after checking
-                val rollingBackStages = HashSet[Stage](mapStage)
-                stagesToRollback.foreach {
-                  case mapStage: ShuffleMapStage =>
-                    val numMissingPartitions = 
mapStage.findMissingPartitions().length
-                    if (numMissingPartitions < mapStage.numTasks) {
-                      if (sc.conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)) {
-                        val reason = "A shuffle map stage with indeterminate 
output was failed " +
-                          "and retried. However, Spark can only do this while 
using the new " +
-                          "shuffle block fetching protocol. Please check the 
config " +
-                          "'spark.shuffle.useOldFetchProtocol', see more 
detail in " +
-                          "SPARK-27665 and SPARK-25341."
-                        abortStage(mapStage, reason, None)
-                      } else {
-                        rollingBackStages += mapStage
-                      }
-                    }
-
-                  case resultStage: ResultStage if 
resultStage.activeJob.isDefined =>
-                    val numMissingPartitions = 
resultStage.findMissingPartitions().length
-                    if (numMissingPartitions < resultStage.numTasks) {
-                      // TODO: support to rollback result tasks.
-                      abortStage(resultStage, 
generateErrorMessage(resultStage), None)
-                    }
-
-                  case _ =>
-                }
+                val stagesToRollback = collectSucceedingStages(mapStage)
+                val rollingBackStages = 
validateStageRollBacks(stagesToRollback)

Review Comment:
   Yes, this is the fail-fast and fail-early branch.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to