attilapiros commented on code in PR #50033:
URL: https://github.com/apache/spark/pull/50033#discussion_r1985859145


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -1898,24 +1898,34 @@ private[spark] class DAGScheduler(
     // Make sure the task's accumulators are updated before any other 
processing happens, so that
     // we can post a task end event before any jobs or stages are updated. The 
accumulators are
     // only updated in certain cases.
-    event.reason match {
+    val isIndeterministicZombie = event.reason match {
       case Success =>
-        task match {
-          case rt: ResultTask[_, _] =>
-            val resultStage = stage.asInstanceOf[ResultStage]
-            resultStage.activeJob match {
-              case Some(job) =>
-                // Only update the accumulator once for each result task.
-                if (!job.finished(rt.outputId)) {
-                  updateAccumulators(event)
-                }
-              case None => // Ignore update if task's job has finished.
-            }
-          case _ =>
-            updateAccumulators(event)
+        val isZombieIndeterminate =

Review Comment:
   Because of the `|| stage.treatAllPartitionsMissing(task.stageAttemptId)` clause, I 
would use a different name, like `shouldTreatAllPartitionsAsMissing`.



##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -1898,24 +1898,34 @@ private[spark] class DAGScheduler(
     // Make sure the task's accumulators are updated before any other 
processing happens, so that
     // we can post a task end event before any jobs or stages are updated. The 
accumulators are
     // only updated in certain cases.
-    event.reason match {
+    val isIndeterministicZombie = event.reason match {
       case Success =>
-        task match {
-          case rt: ResultTask[_, _] =>
-            val resultStage = stage.asInstanceOf[ResultStage]
-            resultStage.activeJob match {
-              case Some(job) =>
-                // Only update the accumulator once for each result task.
-                if (!job.finished(rt.outputId)) {
-                  updateAccumulators(event)
-                }
-              case None => // Ignore update if task's job has finished.
-            }
-          case _ =>
-            updateAccumulators(event)
+        val isZombieIndeterminate =
+          (task.stageAttemptId < stage.latestInfo.attemptNumber()
+            && stage.isIndeterminate) ||
+            stage.treatAllPartitionsMissing(task.stageAttemptId)

Review Comment:
   Nit: I would rename this method too, e.g. `treatAllPartitionsMissing` 
-> `shouldTreatAllPartitionsAsMissing`.



##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2171,15 +2183,23 @@ private[spark] class DAGScheduler(
                         abortStage(mapStage, reason, None)
                       } else {
                         rollingBackStages += mapStage
+                          mapStage.markAttemptIdForAllPartitionsMissing(
+                            mapStage.latestInfo.attemptNumber())
+                        }
+                      } else {
+                        mapStage.markAttemptIdForAllPartitionsMissing(
+                          mapStage.latestInfo.attemptNumber())
                       }
-                    }
 
                   case resultStage: ResultStage if 
resultStage.activeJob.isDefined =>
                     val numMissingPartitions = 
resultStage.findMissingPartitions().length
                     if (numMissingPartitions < resultStage.numTasks) {
                       // TODO: support to rollback result tasks.
                       abortStage(resultStage, 
generateErrorMessage(resultStage), None)
-                    }
+                      } else {

Review Comment:
   Nit: the indentation is off for the whole block.



##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2171,15 +2183,23 @@ private[spark] class DAGScheduler(
                         abortStage(mapStage, reason, None)
                       } else {
                         rollingBackStages += mapStage
+                          mapStage.markAttemptIdForAllPartitionsMissing(

Review Comment:
   Nit: indentation error (for the next 6 lines too)



##########
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala:
##########
@@ -3185,16 +3197,94 @@ class DAGSchedulerSuite extends SparkFunSuite with 
TempLocalSparkContext with Ti
       "Spark can only do this while using the new shuffle block fetching 
protocol"))
   }
 
+  test("SPARK-51272: retry all the partitions of result stage, if the first 
result task" +
+    " has failed and ShuffleMap stage is inDeterminate") {
+    val latch = new CountDownLatch(1)
+    this.dagSchedulerInterceptor = new DagSchedulerInterceptor {
+      override def beforeAddingDagEventToQueue(event: DAGSchedulerEvent): Unit 
= {
+        event match {
+          case ResubmitFailedStages =>
+              // Before the ResubmitFailedStages is added to the queue, add 
the successful
+              // partition task completion.
+              runEvent(makeCompletionEvent(taskSets(2).tasks(1), Success, 11))
+              latch.countDown()
+
+          case _ =>
+        }
+      }
+
+      override def afterDirectProcessingOfDagEvent(event: DAGSchedulerEvent): 
Unit = {
+        event match {
+          case CompletionEvent(_, reason, _, _, _, _) =>
+            reason match {
+              case FetchFailed(_, _, _, _, _, _) =>
+                // Do not allow this thread to exit, till the 
ResubmitFailedStages
+                // in callback is received. This is to ensure that this thread
+                // does not exit and process the ResubmitFailedStage event, 
before
+                // the queue gets successful partition task completion
+                latch.await(50, TimeUnit.SECONDS)
+
+              case _ =>
+            }
+
+          case _ =>
+        }
+      }
+    }
+
+    val numPartitions = 2
+    val (shuffleId1, shuffleId2) = constructTwoIndeterminateStage()
+    completeShuffleMapStageSuccessfully(shuffleId2, 0, numPartitions)
+    val resultStage = scheduler.stageIdToStage(2).asInstanceOf[ResultStage]
+    val activeJob = resultStage.activeJob
+    assert(activeJob.isDefined)
+    // The result stage is still waiting for its 2 tasks to complete
+    assert(resultStage.findMissingPartitions() == 
Seq.tabulate(numPartitions)(i => i))
+
+    // The below event is going to initiate the retry of previous 
indeterminate stages, and also
+    // the retry of all result tasks. But before the "ResubmitFailedStages" 
event is added to the
+    // queue of Scheduler, a successful completion of the result partition 
task is added to the
+    // event queue.  Due to scenario, the bug surfaces where instead of retry 
of all partitions
+    // of result tasks (2 tasks in total), only some (1 task) get retried
+    runEvent(
+      makeCompletionEvent(
+        taskSets(2).tasks(0),
+        FetchFailed(makeBlockManagerId("hostA"), shuffleId1, 0L, 0, 0, 
"ignored"),
+        null))
+    val shuffleStage1 = this.scheduler.shuffleIdToMapStage(shuffleId1)
+    val shuffleStage2 = this.scheduler.shuffleIdToMapStage(shuffleId2)
+    completeShuffleMapStageSuccessfully(0, 1, numPartitions)
+    import org.scalatest.concurrent.Eventually._
+    import org.scalatest.matchers.should.Matchers._
+    import org.scalatest.time.SpanSugar._
+    eventually(timeout(3.minutes), interval(500.milliseconds)) {
+      shuffleStage1.latestInfo.attemptNumber() should equal(1)
+    }
+    eventually(timeout(3.minutes), interval(500.milliseconds)) {
+      shuffleStage2.latestInfo.attemptNumber() should equal(1)
+    }
+    completeShuffleMapStageSuccessfully(1, 1, numPartitions)
+    eventually(timeout(3.minutes), interval(500.milliseconds)) {
+      resultStage.latestInfo.attemptNumber() should equal(1)
+    }
+    org.scalatest.Assertions.assert(resultStage.latestInfo.numTasks == 2)
+  }
+
   test("SPARK-25341: retry all the succeeding stages when the map stage is 
indeterminate") {
     val (shuffleId1, shuffleId2) = constructIndeterminateStageFetchFailed()
 
     // Check status for all failedStages
     val failedStages = scheduler.failedStages.toSeq
     assert(failedStages.map(_.id) == Seq(1, 2))
     // Shuffle blocks of "hostC" is lost, so first task of the 
`shuffleMapRdd2` needs to retry.
+    // TODO: THIS ASSERTION APPEARS TO BE WRONG. As the ShuffleMapStage is 
inDeterminate all

Review Comment:
   Please remove the TODOs! You can provide an explanation in the PR 
description or as a comment on these lines, but this should not be merged 
with these TODOs in place.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to