ahshahid commented on code in PR #50033: URL: https://github.com/apache/spark/pull/50033#discussion_r1982447456
########## core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala: ########## @@ -3185,16 +3198,101 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti "Spark can only do this while using the new shuffle block fetching protocol")) } + test("SPARK-51272: retry all the succeeding stages when the map stage is indeterminate with" + + " concurrent tasks completion") { + if (scheduler != null) { + this.afterEach() + } + val resubmitFailedStageTriggered = Array.fill[Boolean](1)(false) + val monitor = new Object() + this.dagSchedulerInterceptor = new DagSchedulerInterceptor { + override def beforeAddingDagEventToQueue(event: DAGSchedulerEvent): Unit = { + event match { + case ResubmitFailedStages => + runEvent(makeCompletionEvent(taskSets(2).tasks(1), Success, 11)) + monitor.synchronized { + resubmitFailedStageTriggered(0) = true + monitor.notify() + } + + case _ => + } + } + + override def afterDirectProcessingOfDagEvent(event: DAGSchedulerEvent): Unit = { + event match { + case CompletionEvent(_, reason, _, _, _, _) => + reason match { + case FetchFailed(_, _, _, _, _, _) => + monitor.synchronized { + if (!resubmitFailedStageTriggered(0)) { + monitor.wait() + } + } + + case _ => + } + + case _ => + } + } + } + + this.beforeEach() Review Comment: My reasoning is outlined in my comment on the afterEach() call above. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org