ahshahid commented on code in PR #50033:
URL: https://github.com/apache/spark/pull/50033#discussion_r1966860945


##########
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala:
##########
@@ -3185,16 +3217,103 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
       "Spark can only do this while using the new shuffle block fetching 
protocol"))
   }
 
+  test("SPARK-51272: retry all the succeeding stages when the map stage is 
indeterminate with" +
+    " concurrent tasks completion") {
+    if (scheduler != null) {
+      this.afterEach()
+    }
+
+    val monitor = new Object()
+    val resubmitFailedStageReached = Array.fill[Boolean](1)(false)
+    this.dagSchedulerInterceptor = new DagSchedulerInterceptor {
+      override def interceptHandleTaskCompletion(event: CompletionEvent): Unit 
= {
+        event.reason match {
+          case Success if event.task.isInstanceOf[ResultTask[_, _]] =>
+            assert(resubmitFailedStageReached(0))
+            monitor.synchronized {
+              monitor.notify()
+            }
+
+          case _ =>
+        }
+      }
+
+      override def interceptResubmitFailedStages(): Unit = {
+        monitor.synchronized {
+          resubmitFailedStageReached(0) = true
+          monitor.notify()
+          monitor.wait()
+        }
+      }
+    }
+
+    this.beforeEach()
+
+    val numPartitions = 2
+    val (shuffleId1, shuffleId2) = constructTwoIndeterminateStage()
+    completeShuffleMapStageSuccessfully(shuffleId2, 0, numPartitions)
+    val resultStage = scheduler.stageIdToStage(2).asInstanceOf[ResultStage]
+    val activeJob = resultStage.activeJob
+    assert(activeJob.isDefined)
+    // The result stage is still waiting for its 2 tasks to complete
+    assert(resultStage.findMissingPartitions() == 
Seq.tabulate(numPartitions)(i => i))
+    new Thread(() => {
+      runEventInCurrentThread(
+        makeCompletionEvent(
+          taskSets(2).tasks(0),
+          FetchFailed(makeBlockManagerId("hostA"), shuffleId1, 0L, 0, 0, 
"ignored"),
+          null))
+    }).start()
+
+    monitor.synchronized {
+      if (!resubmitFailedStageReached(0)) {
+        monitor.wait()
+      }
+    }
+    assert(resubmitFailedStageReached(0))
+    new Thread(() => {
+      runEventInCurrentThread(makeCompletionEvent(taskSets(2).tasks(1), 
Success, 11))
+    }).start()
+
+    val shuffleStage1 = this.scheduler.shuffleIdToMapStage(shuffleId1)
+    val shuffleStage2 = this.scheduler.shuffleIdToMapStage(shuffleId2)
+    var keepGoing = true
+    while (keepGoing) {
+      Thread.sleep(500)
+      keepGoing = shuffleStage1.latestInfo.attemptNumber() != 1
+    }
+    completeShuffleMapStageSuccessfully(0, 1, numPartitions)
+    keepGoing = true
+    while (keepGoing) {
+      Thread.sleep(500)
+      keepGoing = shuffleStage2.latestInfo.attemptNumber() != 1
+    }
+
+    completeShuffleMapStageSuccessfully(1, 1, numPartitions)
+    keepGoing = true
+    while (keepGoing) {
+      Thread.sleep(500)
+      keepGoing = resultStage.latestInfo.attemptNumber() != 1
+    }
+
+    assert(resultStage.latestInfo.numTasks == 2)
+  }
+
   test("SPARK-25341: retry all the succeeding stages when the map stage is 
indeterminate") {
     val (shuffleId1, shuffleId2) = constructIndeterminateStageFetchFailed()
 
     // Check status for all failedStages
     val failedStages = scheduler.failedStages.toSeq
     assert(failedStages.map(_.id) == Seq(1, 2))
     // Shuffle blocks of "hostC" is lost, so first task of the 
`shuffleMapRdd2` needs to retry.
+    // TODO: Asif THIS ASSERTION APPEARS TO BE WRONG. As the ShuffleMapStage 
is inDeterminate all

Review Comment:
   done.



##########
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala:
##########
@@ -4163,9 +4282,16 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     val failedStages = scheduler.failedStages.toSeq
     assert(failedStages.map(_.id) == Seq(1, 2))
     // Shuffle blocks of "hostC" is lost, so first task of the 
`shuffleMapRdd2` needs to retry.
+    // TODO: Asif THIS ASSERTION APPEARS TO BE WRONG. As the ShuffleMapStage 
is inDeterminate all

Review Comment:
   done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to