mridulm commented on code in PR #50033: URL: https://github.com/apache/spark/pull/50033#discussion_r2040749867
########## core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala: ########## @@ -1554,6 +1554,7 @@ private[spark] class DAGScheduler( case sms: ShuffleMapStage if stage.isIndeterminate && !sms.isAvailable => mapOutputTracker.unregisterAllMapAndMergeOutput(sms.shuffleDep.shuffleId) sms.shuffleDep.newShuffleMergeState() + Review Comment: revert ########## core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala: ########## @@ -2171,21 +2189,41 @@ private[spark] class DAGScheduler( abortStage(mapStage, reason, None) } else { rollingBackStages += mapStage + mapOutputTracker.unregisterAllMapAndMergeOutput( + mapStage.shuffleDep.shuffleId) } + } else { + mapOutputTracker.unregisterAllMapAndMergeOutput( + mapStage.shuffleDep.shuffleId) } case resultStage: ResultStage if resultStage.activeJob.isDefined => val numMissingPartitions = resultStage.findMissingPartitions().length if (numMissingPartitions < resultStage.numTasks) { // TODO: support to rollback result tasks. abortStage(resultStage, generateErrorMessage(resultStage), None) + } else { + resultStage.markAllPartitionsMissing() } case _ => } logInfo(log"The shuffle map stage ${MDC(SHUFFLE_ID, mapStage)} with indeterminate output was failed, " + log"we will roll back and rerun below stages which include itself and all its " + log"indeterminate child stages: ${MDC(STAGES, rollingBackStages)}") + } else if (failedStage.isIndeterminate) { + failedStage match { + case resultStage: ResultStage if resultStage.activeJob.isDefined => + val numMissingPartitions = resultStage.findMissingPartitions().length + if (numMissingPartitions < resultStage.numTasks) { + // TODO: support to rollback result tasks. + abortStage(resultStage, generateErrorMessage(resultStage), None) + } else { + resultStage.markAllPartitionsMissing() + } + + case _ => Review Comment: `failedStage` has already been marked as failed. 
Given mapStage is not indeterminate, but only `failedStage` (the 'reducer' stage which is a `ResultStage`) is - we only need to recompute the failed partitions for it, not all partitions. ########## core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala: ########## @@ -2171,21 +2189,41 @@ private[spark] class DAGScheduler( abortStage(mapStage, reason, None) } else { rollingBackStages += mapStage + mapOutputTracker.unregisterAllMapAndMergeOutput( + mapStage.shuffleDep.shuffleId) } + } else { + mapOutputTracker.unregisterAllMapAndMergeOutput( + mapStage.shuffleDep.shuffleId) } case resultStage: ResultStage if resultStage.activeJob.isDefined => val numMissingPartitions = resultStage.findMissingPartitions().length if (numMissingPartitions < resultStage.numTasks) { // TODO: support to rollback result tasks. abortStage(resultStage, generateErrorMessage(resultStage), None) + } else { + resultStage.markAllPartitionsMissing() Review Comment: (Here and in other places) If ResultStage does not have any missing tasks - why are we failing it ? ########## core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala: ########## @@ -2171,21 +2189,41 @@ private[spark] class DAGScheduler( abortStage(mapStage, reason, None) } else { rollingBackStages += mapStage + mapOutputTracker.unregisterAllMapAndMergeOutput( + mapStage.shuffleDep.shuffleId) } + } else { + mapOutputTracker.unregisterAllMapAndMergeOutput( + mapStage.shuffleDep.shuffleId) Review Comment: Add if/else for `SHUFFLE_USE_OLD_FETCH_PROTOCOL`. Orthogonally, given we are in 4.0, perhaps it is time to drop `SHUFFLE_USE_OLD_FETCH_PROTOCOL` @attilapiros ?
########## core/src/main/scala/org/apache/spark/scheduler/ResultStage.scala: ########## @@ -64,5 +75,16 @@ private[spark] class ResultStage( (0 until job.numPartitions).filter(id => !job.finished(id)) } + def markAllPartitionsMissing(): Unit = { + this.discardResultsForAttemptId = this.latestInfo.attemptNumber() + val job = activeJob.get + for (id <- 0 until job.numPartitions) { + job.finished(id) = false + } + } + + override def shouldDiscardResult(attemptId: Int): Boolean = + this.discardResultsForAttemptId >= attemptId Review Comment: Assuming both usecases for `markAllPartitionsMissing` are not required (based on discussion above), we can remove these changes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org