sririshindra commented on code in PR #50033:
URL: https://github.com/apache/spark/pull/50033#discussion_r1979882434


##########
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala:
##########
@@ -3185,16 +3198,101 @@ class DAGSchedulerSuite extends SparkFunSuite with 
TempLocalSparkContext with Ti
       "Spark can only do this while using the new shuffle block fetching 
protocol"))
   }
 
+  test("SPARK-51272: retry all the succeeding stages when the map stage is 
indeterminate with" +
+    " concurrent tasks completion") {
+    if (scheduler != null) {
+      this.afterEach()
+    }
+    val resubmitFailedStageTriggered = Array.fill[Boolean](1)(false)
+    val monitor = new Object()
+    this.dagSchedulerInterceptor = new DagSchedulerInterceptor {
+      override def beforeAddingDagEventToQueue(event: DAGSchedulerEvent): Unit 
= {
+        event match {
+          case ResubmitFailedStages =>
+              runEvent(makeCompletionEvent(taskSets(2).tasks(1), Success, 11))
+            monitor.synchronized {
+              resubmitFailedStageTriggered(0) = true
+              monitor.notify()
+            }
+
+          case _ =>
+        }
+      }
+
+      override def afterDirectProcessingOfDagEvent(event: DAGSchedulerEvent): 
Unit = {
+        event match {
+          case CompletionEvent(_, reason, _, _, _, _) =>
+            reason match {
+              case FetchFailed(_, _, _, _, _, _) =>
+                monitor.synchronized {
+                  if (!resubmitFailedStageTriggered(0)) {
+                    monitor.wait()
+                  }
+                }
+
+              case _ =>
+            }
+
+          case _ =>
+        }
+      }
+    }
+
+    this.beforeEach()

Review Comment:
   Same. This is automatically executed by the test framework, right? Why do we 
need to explicitly call this?



##########
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala:
##########
@@ -3185,16 +3198,101 @@ class DAGSchedulerSuite extends SparkFunSuite with 
TempLocalSparkContext with Ti
       "Spark can only do this while using the new shuffle block fetching 
protocol"))
   }
 
+  test("SPARK-51272: retry all the succeeding stages when the map stage is 
indeterminate with" +
+    " concurrent tasks completion") {
+    if (scheduler != null) {
+      this.afterEach()
+    }

Review Comment:
   Do we really need this? I don't see this for other tests. Maybe this can be 
skipped?



##########
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala:
##########
@@ -3185,16 +3197,101 @@ class DAGSchedulerSuite extends SparkFunSuite with 
TempLocalSparkContext with Ti
       "Spark can only do this while using the new shuffle block fetching 
protocol"))
   }
 
+  test("SPARK-51272: retry all the succeeding stages when the map stage is 
indeterminate with" +
+    " concurrent tasks completion") {
+    if (scheduler != null) {
+      this.afterEach()
+    }
+    val resubmitFailedStageTriggered = Array.fill[Boolean](1)(false)

Review Comment:
   Why is this not a boolean directly? Why are we using an array with only one 
element? Can't we have a single variable?



##########
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala:
##########
@@ -442,9 +452,11 @@ class DAGSchedulerSuite extends SparkFunSuite with 
TempLocalSparkContext with Ti
       sc.listenerBus,
       mapOutputTracker,
       blockManagerMaster,
-      sc.env))
+      sc.env
+    ))

Review Comment:
   This change is no longer needed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to