attilapiros commented on code in PR #50946:
URL: https://github.com/apache/spark/pull/50946#discussion_r2096886187


##########
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala:
##########
@@ -3048,6 +3072,90 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     assert(countSubmittedMapStageAttempts() === 2)
   }
 
+  /**
+   * This function creates the following dependency graph:
+   *
+   * (determinate)        (indeterminate)
+   * shuffleMapRdd0       shuffleMapRdd1
+   *              \       /
+   *               \     /
+   *               finalRdd
+   *
+   * Both shuffle map RDDs become ShuffleMapStages with 2 partitions, executed
+   * on hostA_exec and hostB_exec.
+   */
+  def constructMixedDeterminateDependencies():
+    (ShuffleDependency[_, _, _], ShuffleDependency[_, _, _]) = {
+    val numPartitions = 2
+    val shuffleMapRdd0 = new MyRDD(sc, numPartitions, Nil, indeterminate = false)
+    val shuffleDep0 = new ShuffleDependency(shuffleMapRdd0, new HashPartitioner(2))
+
+    val shuffleMapRdd1 =
+      new MyRDD(sc, numPartitions, Nil, tracker = mapOutputTracker, indeterminate = true)
+    val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, new HashPartitioner(2))
+
+    val finalRdd =
+      new MyRDD(sc, numPartitions, List(shuffleDep0, shuffleDep1), tracker = mapOutputTracker)
+
+    submit(finalRdd, Array(0, 1))
+
+    // Finish both shuffle map stages.
+    completeShuffleMapStageSuccessfully(0, 0, numPartitions, Seq("hostA", "hostB"))
+    completeShuffleMapStageSuccessfully(1, 0, numPartitions, Seq("hostA", "hostB"))
+    assert(mapOutputTracker.findMissingPartitions(0) === Some(Seq.empty))
+    assert(mapOutputTracker.findMissingPartitions(1) === Some(Seq.empty))
+
+    (shuffleDep0, shuffleDep1)
+  }
+
+  test("SPARK-51272: re-submit of an indeterminate stage without partial 
result can succeed") {
+    val shuffleDeps = constructMixedDeterminateDependencies()
+    val resultStage = scheduler.stageIdToStage(2).asInstanceOf[ResultStage]
+
+    // the fetch failure is from the determinate shuffle map stage, but it leads to an
+    // executor loss, which removes the shuffle files generated by the indeterminate stage too
+    completeNextStageWithFetchFailure(resultStage.id, 0, shuffleDeps._1, "hostA")
+
+    Thread.sleep(DAGScheduler.RESUBMIT_TIMEOUT * 2)
+    dagEventProcessLoopTester.runEvents()
+    assert(scheduler.runningStages.size === 2)
+    assert(scheduler.runningStages.forall(_.isInstanceOf[ShuffleMapStage]))
+
+    completeShuffleMapStageSuccessfully(0, 1, 2, Seq("hostA", "hostB"))
+    completeShuffleMapStageSuccessfully(1, 1, 2, Seq("hostA", "hostB"))
+    assert(scheduler.runningStages.size === 1)
+    assert(scheduler.runningStages.head === resultStage)
+    assert(resultStage.latestInfo.failureReason.isEmpty)
+
+    completeNextResultStageWithSuccess(resultStage.id, 1)
+  }
+
+  test("SPARK-51272: re-submit of an indeterminate stage with partial result 
will fail") {
+    val shuffleDeps = constructMixedDeterminateDependencies()
+    val resultStage = scheduler.stageIdToStage(2).asInstanceOf[ResultStage]
+
+    runEvent(makeCompletionEvent(taskSets(2).tasks(0), Success, 42))
+    // the fetch failure is from the determinate shuffle map stage, but it leads to an
+    // executor loss, which removes the shuffle files generated by the indeterminate stage too
+    runEvent(makeCompletionEvent(
+      taskSets(2).tasks(1),
+      FetchFailed(makeBlockManagerId("hostA"), shuffleDeps._1.shuffleId, 0L, 0, 0, "ignored"),
+      null))
+
+    dagEventProcessLoopTester.runEvents()
+    // resubmission has not happened yet, so the job is still running
+    assert(scheduler.activeJobs.nonEmpty)
+    Thread.sleep(DAGScheduler.RESUBMIT_TIMEOUT * 2)
+    dagEventProcessLoopTester.runEvents()
+
+    // all dependent jobs have been failed
+    assert(scheduler.runningStages.size === 0)
+    assert(scheduler.activeJobs.isEmpty)
+    assert(resultStage.latestInfo.failureReason.isDefined)
+    assert(resultStage.latestInfo.failureReason.get.contains("ignored"))

Review Comment:
   @cloud-fan I would like to draw your attention to this part, as this assert in the master branch is a bit different:
   ```
   assert(resultStage.latestInfo.failureReason.get.
     contains("A shuffle map stage with indeterminate output was failed and retried. " +
       "However, Spark cannot rollback the ResultStage"))
   ```
    
   This is because an `abortStage()` on the `ResultStage` is missing here, so only the earlier abort triggered by the `FetchFailure` is available.

   The reason is a commit that is missing from 3.5:
   - [SPARK-50648][CORE] Cleanup zombie tasks in non-running stages when the job is cancelled
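
   To make the difference concrete, below is a minimal, self-contained Scala sketch of the rule these two tests exercise (illustrative only, not the actual `DAGScheduler` code; `ResultStageState`, `hasPartialResult`, and `failureReasonOnRetry` are made-up names): when an indeterminate parent stage has to be recomputed, a dependent `ResultStage` can only be re-run from scratch while none of its partitions has produced a result yet; otherwise the job must be aborted with the rollback message.
   ```scala
   // Illustrative sketch, not Spark code: models the rollback rule for a
   // ResultStage whose indeterminate parent stage must be recomputed.
   object IndeterminateRollbackSketch extends App {
     // Abort message matching the one asserted on master (quoted above).
     val rollbackErrorMsg: String =
       "A shuffle map stage with indeterminate output was failed and retried. " +
         "However, Spark cannot rollback the ResultStage"

     // Tracks which result partitions have already completed.
     final case class ResultStageState(finished: Array[Boolean]) {
       def hasPartialResult: Boolean = finished.contains(true)
     }

     // The result stage can be fully re-run only if no partition has produced
     // output yet; otherwise the whole job has to be aborted.
     def failureReasonOnRetry(state: ResultStageState): Option[String] =
       if (state.hasPartialResult) Some(rollbackErrorMsg) else None

     // First test: no partial result, so the resubmitted stages can succeed.
     assert(failureReasonOnRetry(ResultStageState(Array(false, false))).isEmpty)
     // Second test: partition 0 already finished (the `Success, 42` event),
     // so the job must be aborted with the rollback message.
     assert(failureReasonOnRetry(ResultStageState(Array(true, false))).isDefined)
   }
   ```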


