ahshahid commented on code in PR #50033:
URL: https://github.com/apache/spark/pull/50033#discussion_r1988111586
##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2171,15 +2183,23 @@ private[spark] class DAGScheduler(
           abortStage(mapStage, reason, None)
         } else {
           rollingBackStages += mapStage
+          mapStage.markAttemptIdForAllPartitionsMissing(
+            mapStage.latestInfo.attemptNumber())
+        }
+      } else {
+        mapStage.markAttemptIdForAllPartitionsMissing(
+          mapStage.latestInfo.attemptNumber())
       }
-    }

     case resultStage: ResultStage if resultStage.activeJob.isDefined =>
       val numMissingPartitions = resultStage.findMissingPartitions().length
       if (numMissingPartitions < resultStage.numTasks) {
         // TODO: support to rollback result tasks.
         abortStage(resultStage, generateErrorMessage(resultStage), None)
-      }
+      } else {

Review Comment:
   done.

##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -1898,24 +1898,34 @@ private[spark] class DAGScheduler(
     // Make sure the task's accumulators are updated before any other processing happens, so that
     // we can post a task end event before any jobs or stages are updated. The accumulators are
     // only updated in certain cases.
-    event.reason match {
+    val isIndeterministicZombie = event.reason match {
       case Success =>
-        task match {
-          case rt: ResultTask[_, _] =>
-            val resultStage = stage.asInstanceOf[ResultStage]
-            resultStage.activeJob match {
-              case Some(job) =>
-                // Only update the accumulator once for each result task.
-                if (!job.finished(rt.outputId)) {
-                  updateAccumulators(event)
-                }
-              case None => // Ignore update if task's job has finished.
-            }
-          case _ =>
-            updateAccumulators(event)
+        val isZombieIndeterminate =

Review Comment:
   done.

##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -1898,24 +1898,34 @@ private[spark] class DAGScheduler(
     // Make sure the task's accumulators are updated before any other processing happens, so that
     // we can post a task end event before any jobs or stages are updated. The accumulators are
     // only updated in certain cases.
-    event.reason match {
+    val isIndeterministicZombie = event.reason match {
       case Success =>
-        task match {
-          case rt: ResultTask[_, _] =>
-            val resultStage = stage.asInstanceOf[ResultStage]
-            resultStage.activeJob match {
-              case Some(job) =>
-                // Only update the accumulator once for each result task.
-                if (!job.finished(rt.outputId)) {
-                  updateAccumulators(event)
-                }
-              case None => // Ignore update if task's job has finished.
-            }
-          case _ =>
-            updateAccumulators(event)
+        val isZombieIndeterminate =
+          (task.stageAttemptId < stage.latestInfo.attemptNumber()
+            && stage.isIndeterminate) ||
+            stage.treatAllPartitionsMissing(task.stageAttemptId)

Review Comment:
   done.
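For readers following the thread: the hunks above work together. When an indeterminate
shuffle map stage has to be re-run, the scheduler now records on the stage (via
markAttemptIdForAllPartitionsMissing) that every partition of the current attempt should
be treated as missing; handleTaskCompletion then uses that record, together with the
attempt number, to classify a late successful task as a "zombie" whose output must be
discarded. Below is a minimal, self-contained sketch of that predicate. StageLike and
TaskLike are hypothetical stand-ins for illustration, not the real Spark classes, and
the field names are assumptions:

object ZombieCheckSketch {
  // Hypothetical, simplified stand-in for the scheduler-side state of a stage.
  case class StageLike(
      latestAttemptNumber: Int,
      isIndeterminate: Boolean,
      attemptsWithAllPartitionsMissing: Set[Int]) {
    // Mirrors the treatAllPartitionsMissing check introduced in the PR.
    def treatAllPartitionsMissing(attemptId: Int): Boolean =
      attemptsWithAllPartitionsMissing.contains(attemptId)
  }

  // Hypothetical, simplified stand-in for a completed task.
  case class TaskLike(stageAttemptId: Int)

  // A task is a "zombie" if it belongs to a stale attempt of an indeterminate
  // stage, or to an attempt whose partitions were all marked missing on rollback.
  def isIndeterministicZombie(task: TaskLike, stage: StageLike): Boolean =
    (task.stageAttemptId < stage.latestAttemptNumber && stage.isIndeterminate) ||
      stage.treatAllPartitionsMissing(task.stageAttemptId)

  def main(args: Array[String]): Unit = {
    val stage = StageLike(
      latestAttemptNumber = 2,
      isIndeterminate = true,
      attemptsWithAllPartitionsMissing = Set(1))
    println(isIndeterministicZombie(TaskLike(stageAttemptId = 1), stage)) // true: stale attempt
    println(isIndeterministicZombie(TaskLike(stageAttemptId = 2), stage)) // false: current attempt
  }
}

Under this reading, a success from a zombie attempt skips the accumulator update, which
is why the match on event.reason is rewritten to yield a Boolean instead of Unit.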