ahshahid commented on code in PR #50033:
URL: https://github.com/apache/spark/pull/50033#discussion_r1988111586


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2171,15 +2183,23 @@ private[spark] class DAGScheduler(
                         abortStage(mapStage, reason, None)
                       } else {
                         rollingBackStages += mapStage
+                          mapStage.markAttemptIdForAllPartitionsMissing(
+                            mapStage.latestInfo.attemptNumber())
+                        }
+                      } else {
+                        mapStage.markAttemptIdForAllPartitionsMissing(
+                          mapStage.latestInfo.attemptNumber())
                       }
-                    }
 
                   case resultStage: ResultStage if 
resultStage.activeJob.isDefined =>
                     val numMissingPartitions = 
resultStage.findMissingPartitions().length
                     if (numMissingPartitions < resultStage.numTasks) {
                       // TODO: support to rollback result tasks.
                       abortStage(resultStage, 
generateErrorMessage(resultStage), None)
-                    }
+                      } else {

Review Comment:
   done.



##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -1898,24 +1898,34 @@ private[spark] class DAGScheduler(
     // Make sure the task's accumulators are updated before any other 
processing happens, so that
     // we can post a task end event before any jobs or stages are updated. The 
accumulators are
     // only updated in certain cases.
-    event.reason match {
+    val isIndeterministicZombie = event.reason match {
       case Success =>
-        task match {
-          case rt: ResultTask[_, _] =>
-            val resultStage = stage.asInstanceOf[ResultStage]
-            resultStage.activeJob match {
-              case Some(job) =>
-                // Only update the accumulator once for each result task.
-                if (!job.finished(rt.outputId)) {
-                  updateAccumulators(event)
-                }
-              case None => // Ignore update if task's job has finished.
-            }
-          case _ =>
-            updateAccumulators(event)
+        val isZombieIndeterminate =

Review Comment:
   done.



##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -1898,24 +1898,34 @@ private[spark] class DAGScheduler(
     // Make sure the task's accumulators are updated before any other 
processing happens, so that
     // we can post a task end event before any jobs or stages are updated. The 
accumulators are
     // only updated in certain cases.
-    event.reason match {
+    val isIndeterministicZombie = event.reason match {
       case Success =>
-        task match {
-          case rt: ResultTask[_, _] =>
-            val resultStage = stage.asInstanceOf[ResultStage]
-            resultStage.activeJob match {
-              case Some(job) =>
-                // Only update the accumulator once for each result task.
-                if (!job.finished(rt.outputId)) {
-                  updateAccumulators(event)
-                }
-              case None => // Ignore update if task's job has finished.
-            }
-          case _ =>
-            updateAccumulators(event)
+        val isZombieIndeterminate =
+          (task.stageAttemptId < stage.latestInfo.attemptNumber()
+            && stage.isIndeterminate) ||
+            stage.treatAllPartitionsMissing(task.stageAttemptId)

Review Comment:
   done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org

Reply via email to