mridulm commented on code in PR #50033:
URL: https://github.com/apache/spark/pull/50033#discussion_r2040749867


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -1554,6 +1554,7 @@ private[spark] class DAGScheduler(
       case sms: ShuffleMapStage if stage.isIndeterminate && !sms.isAvailable =>
         
mapOutputTracker.unregisterAllMapAndMergeOutput(sms.shuffleDep.shuffleId)
         sms.shuffleDep.newShuffleMergeState()
+

Review Comment:
   revert



##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2171,21 +2189,41 @@ private[spark] class DAGScheduler(
                         abortStage(mapStage, reason, None)
                       } else {
                         rollingBackStages += mapStage
+                        mapOutputTracker.unregisterAllMapAndMergeOutput(
+                          mapStage.shuffleDep.shuffleId)
                       }
+                    } else {
+                      mapOutputTracker.unregisterAllMapAndMergeOutput(
+                        mapStage.shuffleDep.shuffleId)
                     }
 
                   case resultStage: ResultStage if 
resultStage.activeJob.isDefined =>
                     val numMissingPartitions = 
resultStage.findMissingPartitions().length
                     if (numMissingPartitions < resultStage.numTasks) {
                       // TODO: support to rollback result tasks.
                       abortStage(resultStage, 
generateErrorMessage(resultStage), None)
+                    } else {
+                      resultStage.markAllPartitionsMissing()
                     }
 
                   case _ =>
                 }
                 logInfo(log"The shuffle map stage ${MDC(SHUFFLE_ID, mapStage)} 
with indeterminate output was failed, " +
                   log"we will roll back and rerun below stages which include 
itself and all its " +
                   log"indeterminate child stages: ${MDC(STAGES, 
rollingBackStages)}")
+              } else if (failedStage.isIndeterminate) {
+                failedStage match {
+                  case resultStage: ResultStage if 
resultStage.activeJob.isDefined =>
+                    val numMissingPartitions = 
resultStage.findMissingPartitions().length
+                    if (numMissingPartitions < resultStage.numTasks) {
+                      // TODO: support to rollback result tasks.
+                      abortStage(resultStage, 
generateErrorMessage(resultStage), None)
+                    } else {
+                      resultStage.markAllPartitionsMissing()
+                    }
+
+                  case _ =>

Review Comment:
   `failedStage` has already been marked as failed.
   
   Given mapStage is not indeterminate, but only `failedStage` (the 'reducer' 
stage which is a `ResultStage`) is - we only need to recompute the failed 
partitions for it, not all partitions.



##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2171,21 +2189,41 @@ private[spark] class DAGScheduler(
                         abortStage(mapStage, reason, None)
                       } else {
                         rollingBackStages += mapStage
+                        mapOutputTracker.unregisterAllMapAndMergeOutput(
+                          mapStage.shuffleDep.shuffleId)
                       }
+                    } else {
+                      mapOutputTracker.unregisterAllMapAndMergeOutput(
+                        mapStage.shuffleDep.shuffleId)
                     }
 
                   case resultStage: ResultStage if 
resultStage.activeJob.isDefined =>
                     val numMissingPartitions = 
resultStage.findMissingPartitions().length
                     if (numMissingPartitions < resultStage.numTasks) {
                       // TODO: support to rollback result tasks.
                       abortStage(resultStage, 
generateErrorMessage(resultStage), None)
+                    } else {
+                      resultStage.markAllPartitionsMissing()

Review Comment:
   (Here and in other places) If the ResultStage does not have any missing tasks,
why are we failing it?



##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2171,21 +2189,41 @@ private[spark] class DAGScheduler(
                         abortStage(mapStage, reason, None)
                       } else {
                         rollingBackStages += mapStage
+                        mapOutputTracker.unregisterAllMapAndMergeOutput(
+                          mapStage.shuffleDep.shuffleId)
                       }
+                    } else {
+                      mapOutputTracker.unregisterAllMapAndMergeOutput(
+                        mapStage.shuffleDep.shuffleId)

Review Comment:
   Add if/else for `SHUFFLE_USE_OLD_FETCH_PROTOCOL`.
   Orthogonally, given we are in 4.0, perhaps it is time to drop
`SHUFFLE_USE_OLD_FETCH_PROTOCOL` @attilapiros  ?



##########
core/src/main/scala/org/apache/spark/scheduler/ResultStage.scala:
##########
@@ -64,5 +75,16 @@ private[spark] class ResultStage(
     (0 until job.numPartitions).filter(id => !job.finished(id))
   }
 
+  def markAllPartitionsMissing(): Unit = {
+    this.discardResultsForAttemptId = this.latestInfo.attemptNumber()
+    val job = activeJob.get
+    for (id <- 0 until job.numPartitions) {
+      job.finished(id) = false
+    }
+  }
+
+  override def shouldDiscardResult(attemptId: Int): Boolean =
+    this.discardResultsForAttemptId >= attemptId

Review Comment:
   Assuming both use cases for `markAllPartitionsMissing` are not required
(based on the discussion above), we can remove these changes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to