ahshahid commented on code in PR #50033:
URL: https://github.com/apache/spark/pull/50033#discussion_r2026114245
##########
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala:
##########
@@ -3185,16 +3202,164 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
"Spark can only do this while using the new shuffle block fetching protocol"))
}
+
+
+ test("SPARK-51272: retry all the partitions of result stage, if the first result task" +
+ " has failed and failing ShuffleMap stage is inDeterminate") {
+ this.dagSchedulerInterceptor = createDagInterceptorForSpark51272(
+ () => taskSets(2).tasks(1), "RELEASE_LATCH")
+
+ val numPartitions = 2
+ // The first shuffle stage is completed by the below function itself, which creates two
+ // indeterminate stages.
+ val (shuffleId1, shuffleId2) = constructTwoStages(
+ stage1InDeterminate = false, stage2InDeterminate = true)
+ completeShuffleMapStageSuccessfully(shuffleId2, 0, numPartitions)
+ val resultStage = scheduler.stageIdToStage(2).asInstanceOf[ResultStage]
+ val activeJob = resultStage.activeJob
+ assert(activeJob.isDefined)
+ // The result stage is still waiting for its 2 tasks to complete
+ assert(resultStage.findMissingPartitions() == Seq.tabulate(numPartitions)(i => i))
+
+ // The below event is going to initiate the retry of the previous indeterminate stages, and
+ // also the retry of all result tasks. But before the "ResubmitFailedStages" event is added to
+ // the queue of the Scheduler, a successful completion of the result partition task is added to
+ // the event queue. In this scenario the bug surfaces: instead of a retry of all partitions
+ // of the result tasks (2 tasks in total), only some (1 task) get retried
+ runEvent(
+ makeCompletionEvent(
+ taskSets(2).tasks(0),
FetchFailed(makeBlockManagerId("hostA"), shuffleId1, 0L, 0, 0, "ignored"),
Review Comment:
I understand that my tests should not have a transitive dependency, and I have fixed the tests accordingly. What I am trying to say is that no further source-code change will be needed: since the tests failed without the fix and pass with it for the transitive-dependency case, they will also pass for a result stage that depends on 2 independent shuffle stages.
Please note that in all 3 tests, the test exception is introduced in the result stage, not during the individual shuffle stages.
Also note that although other tests implicitly assume that shuffleId 0 maps to stage Id 0 and shuffleId 1 maps to stage Id 1, that assumption may only hold for a transitive dependency. For a direct dependency it no longer holds: the shuffleIds are the fundamental numbers, and the stageId needs to be obtained by looking up the stage in the shuffleId-to-stageId map.
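To illustrate the point, here is a minimal, self-contained Scala sketch (a hypothetical map and numbering, not the actual `DAGScheduler` internals) of resolving a stage id through a lookup rather than assuming it equals the shuffle id:

```scala
object ShuffleStageLookup {
  final case class Stage(id: Int)

  // Hypothetical numbering: with a direct dependency on two independent
  // shuffle stages, stage ids need not line up with shuffle ids.
  val shuffleIdToStage: Map[Int, Stage] = Map(0 -> Stage(1), 1 -> Stage(0))

  // Correct: resolve the stage id via the map keyed by shuffle id,
  // instead of assuming stageId == shuffleId.
  def stageIdFor(shuffleId: Int): Int = shuffleIdToStage(shuffleId).id
}
```

A test that hard-coded `stageId == shuffleId` would pass only when the numbering happens to coincide, which is exactly the transitive-dependency coincidence described above.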
Finally, in the commit, none of the final assertions in the tests, which determine pass or fail, have been changed.
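For reference, the retry semantics the test exercises can be modeled in a small standalone sketch (my own simplified model, not the actual DAGScheduler resubmission logic): when the failing parent shuffle stage is indeterminate, every result partition must be recomputed, even partitions that already reported success before the resubmit.

```scala
object IndeterminateRetry {
  // Returns the result partitions that must run again after a fetch failure.
  def partitionsToRetry(
      numPartitions: Int,
      completed: Set[Int],
      parentIndeterminate: Boolean): Seq[Int] = {
    if (parentIndeterminate) {
      // The parent's output may change on recompute, so results produced
      // against the old output are unusable: retry all partitions.
      (0 until numPartitions).toSeq
    } else {
      // Deterministic parent: only the still-missing partitions need to rerun.
      (0 until numPartitions).filterNot(completed.contains)
    }
  }
}
```

In the test above, partition 0 completes just before the resubmit, yet both partitions (2 tasks in total) must land in the retry set because the failing ShuffleMap stage is indeterminate; retrying only the missing partition is the bug.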
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]