attilapiros commented on code in PR #50630:
URL: https://github.com/apache/spark/pull/50630#discussion_r2049762666


##########
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala:
##########
@@ -56,28 +56,31 @@ class DAGSchedulerEventProcessLoopTester(dagScheduler: DAGScheduler)
 
   dagScheduler.setEventProcessLoop(this)
 
-  private var isProcessing = false
-  private val eventQueue = new ListBuffer[DAGSchedulerEvent]()
-
+  private val eventQueue = new LinkedBlockingQueue[DAGSchedulerEvent]()
 
   override def post(event: DAGSchedulerEvent): Unit = {
-    if (isProcessing) {
-      // `DAGSchedulerEventProcessLoop` is guaranteed to process events sequentially. So we should
-      // buffer events for sequent processing later instead of processing them recursively.
-      eventQueue += event
-    } else {
-      try {
-        isProcessing = true
-        // Forward event to `onReceive` directly to avoid processing event asynchronously.
-        onReceive(event)
-      } catch {
-        case NonFatal(e) => onError(e)
-      } finally {
-        isProcessing = false
-      }
-      if (eventQueue.nonEmpty) {
-        post(eventQueue.remove(0))
-      }
+    // `DAGSchedulerEventProcessLoop` is guaranteed to process events sequentially in the main test

Review Comment:
   This part is modified because, without it, I saw the following in the unit-tests.log (focus on the thread names: `pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite` and `dag-scheduler-message`):
   
   ```
   25/04/17 14:15:05.815 pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1662
   25/04/17 14:15:05.815 pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite INFO DAGSchedulerSuite$MyDAGScheduler: Submitting 2 missing tasks from ResultStage 2 (DAGSchedulerSuiteRDD 2) (first 15 tasks are for partitions Vector(0, 1))
   25/04/17 14:15:05.816 pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite INFO DAGSchedulerSuite$MyDAGScheduler: Marking ResultStage 2 () as failed due to a fetch failure from ShuffleMapStage 1 (RDD at DAGSchedulerSuite.scala:123)
   25/04/17 14:15:05.817 pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite INFO DAGSchedulerSuite$MyDAGScheduler: ResultStage 2 () failed in 3 ms due to ignored
   25/04/17 14:15:05.817 pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite INFO DAGSchedulerSuite$MyDAGScheduler: Resubmitting ShuffleMapStage 1 (RDD at DAGSchedulerSuite.scala:123) and ResultStage 2 () due to fetch failure
   25/04/17 14:15:05.817 pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite INFO DAGSchedulerSuite$MyDAGScheduler: Executor lost: hostA-exec (epoch 3)
   25/04/17 14:15:05.818 pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite INFO DAGSchedulerSuite$MyDAGScheduler: Shuffle files lost for executor: hostA-exec (epoch 3)
   25/04/17 14:15:06.023 dag-scheduler-message INFO DAGSchedulerSuite$MyDAGScheduler: Resubmitting failed stages
   25/04/17 14:15:06.024 dag-scheduler-message INFO DAGSchedulerSuite$MyDAGScheduler: Submitting ShuffleMapStage 1 (DAGSchedulerSuiteRDD 0), which has no missing parents
   25/04/17 14:15:06.025 dag-scheduler-message INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 2.9 KiB, free 2.4 GiB)
   25/04/17 14:15:06.025 dag-scheduler-message INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 1825.0 B, free 2.4 GiB)
   25/04/17 14:15:06.026 dag-scheduler-message INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:1662
   25/04/17 14:15:06.027 dag-scheduler-message INFO DAGSchedulerSuite$MyDAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 1 (DAGSchedulerSuiteRDD 0) (first 15 tasks are for partitions Vector(0))
   25/04/17 14:15:06.028 dag-scheduler-message INFO DAGSchedulerSuite$MyDAGScheduler: Submitting ShuffleMapStage 0 (DAGSchedulerSuiteRDD 1), which has no missing parents
   25/04/17 14:15:06.029 dag-scheduler-message INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 2.9 KiB, free 2.4 GiB)
   25/04/17 14:15:06.029 dag-scheduler-message INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 1826.0 B, free 2.4 GiB)
   25/04/17 14:15:06.030 dag-scheduler-message INFO SparkContext: Created broadcast 4 from broadcast at DAGScheduler.scala:1662
   25/04/17 14:15:06.030 dag-scheduler-message INFO DAGSchedulerSuite$MyDAGScheduler: Submitting 2 missing tasks from ShuffleMapStage 0 (DAGSchedulerSuiteRDD 1) (first 15 tasks are for partitions Vector(0, 1))
   25/04/17 14:15:06.227 pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite INFO DAGSchedulerSuite$MyDAGScheduler: ShuffleMapStage 0 (RDD at DAGSchedulerSuite.scala:123) finished in 198 ms
   25/04/17 14:15:06.228 pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite INFO DAGSchedulerSuite$MyDAGScheduler: looking for newly runnable stages
   25/04/17 14:15:06.228 pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite INFO DAGSchedulerSuite$MyDAGScheduler: running: HashSet(ShuffleMapStage 1)
   25/04/17 14:15:06.229 pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite INFO DAGSchedulerSuite$MyDAGScheduler: waiting: HashSet(ResultStage 2)
   25/04/17 14:15:06.229 pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite INFO DAGSchedulerSuite$MyDAGScheduler: failed: HashSet()
   25/04/17 14:15:06.233 pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite INFO DAGSchedulerSuite$MyDAGScheduler: ShuffleMapStage 1 (RDD at DAGSchedulerSuite.scala:123) finished in 209 ms
   ```
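   
   For context, here is a minimal, self-contained sketch of the queue-plus-single-drainer pattern that a `LinkedBlockingQueue`-based `post` can follow so that events arriving from several threads (here, the ScalaTest thread and `dag-scheduler-message`) are still handled one at a time and in arrival order. This is not the code in this PR; the class and method names are illustrative only:
   
   ```scala
   import java.util.concurrent.LinkedBlockingQueue
   import java.util.concurrent.atomic.AtomicBoolean
   
   import scala.util.control.NonFatal
   
   // Illustrative only: a generic serial event processor, not Spark's
   // DAGSchedulerEventProcessLoopTester.
   class SerialEventProcessor[E](onReceive: E => Unit, onError: Throwable => Unit) {
     private val eventQueue = new LinkedBlockingQueue[E]()
     private val draining = new AtomicBoolean(false)
   
     def post(event: E): Unit = {
       eventQueue.put(event)
       // Re-check the queue after releasing the flag so an event enqueued by
       // another thread during the hand-off window is not left behind.
       while (!eventQueue.isEmpty && draining.compareAndSet(false, true)) {
         try {
           var next = eventQueue.poll()
           while (next != null) {
             try onReceive(next) catch { case NonFatal(e) => onError(e) }
             next = eventQueue.poll()
           }
         } finally {
           draining.set(false)
         }
       }
     }
   }
   ```
   
   Whichever thread wins the compare-and-set drains everything currently queued, so a `post` made from `dag-scheduler-message` while the test thread is already draining only enqueues and returns instead of processing the event concurrently.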



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

