sririshindra commented on code in PR #50033: URL: https://github.com/apache/spark/pull/50033#discussion_r1979882434
########## core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala: ########## @@ -3185,16 +3198,101 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti "Spark can only do this while using the new shuffle block fetching protocol")) } + test("SPARK-51272: retry all the succeeding stages when the map stage is indeterminate with" + + " concurrent tasks completion") { + if (scheduler != null) { + this.afterEach() + } + val resubmitFailedStageTriggered = Array.fill[Boolean](1)(false) + val monitor = new Object() + this.dagSchedulerInterceptor = new DagSchedulerInterceptor { + override def beforeAddingDagEventToQueue(event: DAGSchedulerEvent): Unit = { + event match { + case ResubmitFailedStages => + runEvent(makeCompletionEvent(taskSets(2).tasks(1), Success, 11)) + monitor.synchronized { + resubmitFailedStageTriggered(0) = true + monitor.notify() + } + + case _ => + } + } + + override def afterDirectProcessingOfDagEvent(event: DAGSchedulerEvent): Unit = { + event match { + case CompletionEvent(_, reason, _, _, _, _) => + reason match { + case FetchFailed(_, _, _, _, _, _) => + monitor.synchronized { + if (!resubmitFailedStageTriggered(0)) { + monitor.wait() + } + } + + case _ => + } + + case _ => + } + } + } + + this.beforeEach() Review Comment: Same. This is automatically executed by the test framework, right? Why do we need to explicitly call this? ########## core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala: ########## @@ -3185,16 +3198,101 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti "Spark can only do this while using the new shuffle block fetching protocol")) } + test("SPARK-51272: retry all the succeeding stages when the map stage is indeterminate with" + + " concurrent tasks completion") { + if (scheduler != null) { + this.afterEach() + } Review Comment: Do we really need this? I don't see this for other tests. Maybe this can be skipped? 
########## core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala: ########## @@ -3185,16 +3197,101 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti "Spark can only do this while using the new shuffle block fetching protocol")) } + test("SPARK-51272: retry all the succeeding stages when the map stage is indeterminate with" + + " concurrent tasks completion") { + if (scheduler != null) { + this.afterEach() + } + val resubmitFailedStageTriggered = Array.fill[Boolean](1)(false) Review Comment: Why is this not a boolean directly? Why are we using an array with only one element? Can't we have a single variable? ########## core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala: ########## @@ -442,9 +452,11 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti sc.listenerBus, mapOutputTracker, blockManagerMaster, - sc.env)) + sc.env + )) Review Comment: This change is no longer needed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org