ahshahid commented on code in PR #50033:
URL: https://github.com/apache/spark/pull/50033#discussion_r1984505354


##########
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala:
##########
@@ -3185,16 +3197,106 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
       "Spark can only do this while using the new shuffle block fetching protocol"))
   }
 
+  test("SPARK-51272: retry all the partitions of result stage, if the first result task" +
+    " has failed and ShuffleMap stage is inDeterminate") {
+    var resubmitFailedStageTriggered = false
+    val monitor = new Object()
+    this.dagSchedulerInterceptor = new DagSchedulerInterceptor {
+      override def beforeAddingDagEventToQueue(event: DAGSchedulerEvent): Unit = {
+        event match {
+          case ResubmitFailedStages =>
+            // Before the ResubmitFailedStages is added to the queue, add the successful
+            // partition task completion.
+            runEvent(makeCompletionEvent(taskSets(2).tasks(1), Success, 11))
+            monitor.synchronized {
+              resubmitFailedStageTriggered = true
+              monitor.notify()
+            }
+
+          case _ =>
+        }
+      }
+
+      override def afterDirectProcessingOfDagEvent(event: DAGSchedulerEvent): Unit = {
+        event match {
+          case CompletionEvent(_, reason, _, _, _, _) =>
+            reason match {
+              case FetchFailed(_, _, _, _, _, _) =>
+                // Do not allow this thread to exit until the ResubmitFailedStages
+                // callback is received. This ensures that this thread does not
+                // exit and process the ResubmitFailedStages event before the
+                // queue gets the successful partition task completion.
+                monitor.synchronized {
+                  if (!resubmitFailedStageTriggered) {
+                    monitor.wait()
+                  }
+                }
+
+              case _ =>
+            }
+
+          case _ =>
+        }
+      }

Review Comment:
   Not sure why you feel that way.
   What I found is that the MyDagScheduler's EventLoop can have two threads operating on it at a time.
   If the first event is put in the queue, it is executed on the current thread, and if another event is put in the queue during that window, it may be picked up either by the current thread or by the event loop thread.
   The test will not fail reliably if the second event is picked up by the current thread; it fails reliably only if the subsequent events are picked up by the event loop thread.
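   To illustrate the ordering the interceptor enforces, here is a minimal Java sketch (not part of the PR; the class and string labels are made up for illustration) of the monitor handoff: the thread that processed the FetchFailed completion blocks on the monitor until the interceptor thread has enqueued the successful completion and raised the flag, so the enqueue always happens first regardless of which thread the event loop hands the work to.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the two threads in the test: an "interceptor"
// thread that enqueues the successful completion, and the thread that saw
// the FetchFailed event, which must not proceed until the enqueue is done.
public class MonitorHandoff {
    private final Object monitor = new Object();
    private boolean resubmitTriggered = false;
    final List<String> order = new ArrayList<>();

    // Runs on the interceptor thread: enqueue first, then release the waiter.
    void onResubmitFailedStages() {
        synchronized (monitor) {
            order.add("successful-completion-enqueued");
            resubmitTriggered = true;
            monitor.notify();
        }
    }

    // Runs on the FetchFailed-processing thread: block until released.
    void afterFetchFailed() throws InterruptedException {
        synchronized (monitor) {
            while (!resubmitTriggered) { // loop guards against spurious wakeups
                monitor.wait();
            }
            order.add("fetch-failed-thread-released");
        }
    }

    public static void main(String[] args) throws Exception {
        MonitorHandoff h = new MonitorHandoff();
        Thread interceptor = new Thread(h::onResubmitFailedStages);
        interceptor.start();
        h.afterFetchFailed();
        interceptor.join();
        // The enqueue is always recorded before the release, whichever
        // thread wins the race for the monitor.
        System.out.println(String.join(",", h.order));
    }
}
```

   Whichever thread acquires the monitor first, the recorded order is deterministic: the enqueue always precedes the release, which is exactly the guarantee the test needs.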



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

