attilapiros commented on code in PR #50033:
URL: https://github.com/apache/spark/pull/50033#discussion_r1985391930


##########
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala:
##########
@@ -3185,16 +3197,106 @@ class DAGSchedulerSuite extends SparkFunSuite with 
TempLocalSparkContext with Ti
       "Spark can only do this while using the new shuffle block fetching 
protocol"))
   }
 
+  test("SPARK-51272: retry all the partitions of result stage, if the first 
result task" +
+    " has failed and ShuffleMap stage is inDeterminate") {
+    var resubmitFailedStageTriggered = false
+    val monitor = new Object()
+    this.dagSchedulerInterceptor = new DagSchedulerInterceptor {
+      override def beforeAddingDagEventToQueue(event: DAGSchedulerEvent): Unit 
= {
+        event match {
+          case ResubmitFailedStages =>
+               // Before the ResubmitFailedStages is added to the queue, add 
the successful
+               // partition task completion.
+              runEvent(makeCompletionEvent(taskSets(2).tasks(1), Success, 11))
+            monitor.synchronized {
+              resubmitFailedStageTriggered = true
+              monitor.notify()
+            }
+
+          case _ =>
+        }
+      }
+
+      override def afterDirectProcessingOfDagEvent(event: DAGSchedulerEvent): 
Unit = {
+        event match {
+          case CompletionEvent(_, reason, _, _, _, _) =>
+            reason match {
+              case FetchFailed(_, _, _, _, _, _) =>
+                // Do not allow this thread to exit, till the 
ResubmitFailedStages
+                // in callback is received. This is to ensure that this thread
+                // does not exit and process the ResubmitFailedStage event, 
before
+                // the queue gets successful partition task completion
+                monitor.synchronized {
+                  if (!resubmitFailedStageTriggered) {
+                    monitor.wait()
+                  }
+                }
+
+              case _ =>
+            }
+
+          case _ =>
+        }
+      }

Review Comment:
   I see. I have added some extra logging and now it is clear we have two 
threads:
   - `ScalaTest-main-running-DAGSchedulerSuite` for the posting of the 
`FetchFailed`
   - `dag-scheduler-message` for the posting of `ResubmitFailedStages`
   
   In this case, please replace the monitor with a latch
   ```
   -    var resubmitFailedStageTriggered = false
   -    val monitor = new Object()
   +    val latch = new CountDownLatch(1)
   ```
   
   Use `latch.countDown()` in the `ResubmitFailedStages` case and `latch.await(10,
TimeUnit.SECONDS)` in the `FetchFailed` case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to