ahshahid commented on code in PR #50033:
URL: https://github.com/apache/spark/pull/50033#discussion_r2042446623


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2171,21 +2189,41 @@ private[spark] class DAGScheduler(
                         abortStage(mapStage, reason, None)
                       } else {
                         rollingBackStages += mapStage
+                        mapOutputTracker.unregisterAllMapAndMergeOutput(
+                          mapStage.shuffleDep.shuffleId)
                       }
+                    } else {
+                      mapOutputTracker.unregisterAllMapAndMergeOutput(
+                        mapStage.shuffleDep.shuffleId)
                     }
 
                   case resultStage: ResultStage if resultStage.activeJob.isDefined =>
                     val numMissingPartitions = resultStage.findMissingPartitions().length
                     if (numMissingPartitions < resultStage.numTasks) {
                       // TODO: support to rollback result tasks.
                       abortStage(resultStage, generateErrorMessage(resultStage), None)
+                    } else {
+                      resultStage.markAllPartitionsMissing()

Review Comment:
   As per my understanding, we reach this line because the first task of the current stage (the ResultStage) has failed, and the ResultStage depends, directly or indirectly, on an indeterminate stage (i.e., the ResultStage is itself of indeterminate type). The call resultStage.markAllPartitionsMissing() sets a flag on this stage so that, while the stage is waiting to be resubmitted for re-execution (a separate thread adds it back to the event queue), any successful task that arrives for some other partition in that window is rejected. If such a task were accepted, that would be the race: it would otherwise result in a refetch of only some partitions, not all of them.
   This flag is checked at line 1881:
   && stage.isIndeterminate) || stage.shouldDiscardResult(task.stageAttemptId)
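
   To make the intended semantics of the guard concrete, here is a minimal, hypothetical sketch of the flag's behavior. These are not the actual Spark classes (the model class, field, and attempt-id parameter are illustrative assumptions); it only shows why a straggler success from the old attempt must be discarded once all partitions are marked missing.

   ```scala
   // Hypothetical, simplified model of the discard-result guard described
   // above. Not the real DAGScheduler/ResultStage code.
   object DiscardResultSketch {
     class ResultStageModel {
       // Attempt id at (or before) which late task results must be dropped;
       // -1 means nothing is marked for discard yet.
       private var discardAttemptId: Int = -1

       // Called when the whole stage is marked for full re-execution.
       def markAllPartitionsMissing(currentAttemptId: Int): Unit = {
         discardAttemptId = currentAttemptId
       }

       // A late completion from `taskAttemptId` is rejected if it belongs
       // to the marked attempt (or an earlier one).
       def shouldDiscardResult(taskAttemptId: Int): Boolean =
         taskAttemptId <= discardAttemptId
     }

     def main(args: Array[String]): Unit = {
       val stage = new ResultStageModel
       // Attempt 0 fails a task; all partitions are marked missing
       // before the stage is resubmitted.
       stage.markAllPartitionsMissing(currentAttemptId = 0)
       // A straggler success from attempt 0 arriving in the window
       // between the mark and the resubmit is discarded...
       println(stage.shouldDiscardResult(0))
       // ...while tasks from the fresh attempt 1 are accepted.
       println(stage.shouldDiscardResult(1))
     }
   }
   ```

   In this model, the race window is exactly the time between markAllPartitionsMissing() and the new attempt's tasks being launched; the flag closes that window by attempt id rather than by timing.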



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

