ahshahid commented on code in PR #50033:
URL: https://github.com/apache/spark/pull/50033#discussion_r2042483321


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2171,21 +2189,41 @@ private[spark] class DAGScheduler(
                         abortStage(mapStage, reason, None)
                       } else {
                         rollingBackStages += mapStage
+                        mapOutputTracker.unregisterAllMapAndMergeOutput(
+                          mapStage.shuffleDep.shuffleId)
                       }
+                    } else {
+                      mapOutputTracker.unregisterAllMapAndMergeOutput(
+                        mapStage.shuffleDep.shuffleId)
                     }
 
                   case resultStage: ResultStage if resultStage.activeJob.isDefined =>
                     val numMissingPartitions = resultStage.findMissingPartitions().length
                     if (numMissingPartitions < resultStage.numTasks) {
                       // TODO: support to rollback result tasks.
                       abortStage(resultStage, generateErrorMessage(resultStage), None)
+                    } else {
+                      resultStage.markAllPartitionsMissing()
                     }
 
                   case _ =>
                 }
                 logInfo(log"The shuffle map stage ${MDC(SHUFFLE_ID, mapStage)} with indeterminate output was failed, " +
                   log"we will roll back and rerun below stages which include itself and all its " +
                   log"indeterminate child stages: ${MDC(STAGES, rollingBackStages)}")
+              } else if (failedStage.isIndeterminate) {
+                failedStage match {
+                  case resultStage: ResultStage if resultStage.activeJob.isDefined =>
+                    val numMissingPartitions = resultStage.findMissingPartitions().length
+                    if (numMissingPartitions < resultStage.numTasks) {
+                      // TODO: support to rollback result tasks.
+                      abortStage(resultStage, generateErrorMessage(resultStage), None)
+                    } else {
+                      resultStage.markAllPartitionsMissing()
+                    }
+
+                  case _ =>

Review Comment:
   This is the case where the ResultStage is failing due to its first task. The
shuffle ID that caused the result stage to fail is determinate, but the
ResultStage (in the case of a join) depends on two shuffle stages, and the other
shuffle stage is indeterminate (which makes the ResultStage indeterminate as
well).
   At this point it is not known whether the other, indeterminate shuffle stage
has also failed.
   If it has also failed, and one of the result tasks is marked successful or
not all partitions are retried, we will get wrong results (the commented HA
test can fail sporadically because of this problem).
   So in this case the behaviour needs to be the same as in the previous one
(where the failing shuffle stage is indeterminate).
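
   To make the reasoning above concrete, here is a minimal toy sketch of the
decision rule. It is not Spark's scheduler code: `ShuffleInput`,
`ToyResultStage`, `Decision` and `RollbackSketch` are hypothetical names made up
for illustration, and the `RetryMissingOnly` branch for a fully determinate
stage is an assumption about the usual retry path, not something taken from
this diff.

```scala
// Toy model only: these types are hypothetical and are NOT Spark's scheduler classes.
sealed trait Decision
case object AbortStage extends Decision
case object RetryAllPartitions extends Decision
case object RetryMissingOnly extends Decision

// One shuffle dependency of the result stage.
final case class ShuffleInput(shuffleId: Int, indeterminate: Boolean)

final case class ToyResultStage(
    inputs: Seq[ShuffleInput],
    numTasks: Int,
    finished: Set[Int]) {
  // The stage is treated as indeterminate if ANY of its shuffle inputs is.
  def isIndeterminate: Boolean = inputs.exists(_.indeterminate)

  // Partitions whose result task has not completed yet.
  def missingPartitions: Seq[Int] =
    (0 until numTasks).filterNot(finished.contains)
}

object RollbackSketch {
  // Decide what to do after a fetch failure in the result stage. The
  // determinism of the shuffle that actually failed is deliberately not
  // consulted: only the stage-level flag matters.
  def onFetchFailure(stage: ToyResultStage): Decision =
    if (!stage.isIndeterminate) RetryMissingOnly // assumption: usual retry path
    else if (stage.missingPartitions.length < stage.numTasks) AbortStage
    else RetryAllPartitions
}
```

   With a join over shuffle 0 (determinate) and shuffle 1 (indeterminate), a
failure before any result task has finished gives `RetryAllPartitions`, while
the same failure after partition 0 has already finished gives `AbortStage`,
because result tasks cannot be rolled back.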



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

