attilapiros commented on code in PR #50630: URL: https://github.com/apache/spark/pull/50630#discussion_r2049762666
########## core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala: ########## @@ -56,28 +56,31 @@ class DAGSchedulerEventProcessLoopTester(dagScheduler: DAGScheduler) dagScheduler.setEventProcessLoop(this) - private var isProcessing = false - private val eventQueue = new ListBuffer[DAGSchedulerEvent]() - + private val eventQueue = new LinkedBlockingQueue[DAGSchedulerEvent]() override def post(event: DAGSchedulerEvent): Unit = { - if (isProcessing) { - // `DAGSchedulerEventProcessLoop` is guaranteed to process events sequentially. So we should - // buffer events for sequent processing later instead of processing them recursively. - eventQueue += event - } else { - try { - isProcessing = true - // Forward event to `onReceive` directly to avoid processing event asynchronously. - onReceive(event) - } catch { - case NonFatal(e) => onError(e) - } finally { - isProcessing = false - } - if (eventQueue.nonEmpty) { - post(eventQueue.remove(0)) - } + // `DAGSchedulerEventProcessLoop` is guaranteed to process events sequentially in the main test Review Comment: I modified this part because, without this change, I saw the following in `unit-tests.log` (note the two different thread names, `pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite` and `dag-scheduler-message`): ``` 25/04/17 14:15:05.815 pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1662 25/04/17 14:15:05.815 pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite INFO DAGSchedulerSuite$MyDAGScheduler: Submitting 2 missing tasks from ResultStage 2 (DAGSchedulerSuiteRDD 2) (first 15 tasks are for partitions Vector(0, 1)) 25/04/17 14:15:05.816 pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite INFO DAGSchedulerSuite$MyDAGScheduler: Marking ResultStage 2 () as failed due to a fetch failure from ShuffleMapStage 1 (RDD at DAGSchedulerSuite.scala:123) 25/04/17 14:15:05.817 pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite INFO 
DAGSchedulerSuite$MyDAGScheduler: ResultStage 2 () failed in 3 ms due to ignored 25/04/17 14:15:05.817 pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite INFO DAGSchedulerSuite$MyDAGScheduler: Resubmitting ShuffleMapStage 1 (RDD at DAGSchedulerSuite.scala:123) and ResultStage 2 () due to fetch failure 25/04/17 14:15:05.817 pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite INFO DAGSchedulerSuite$MyDAGScheduler: Executor lost: hostA-exec (epoch 3) 25/04/17 14:15:05.818 pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite INFO DAGSchedulerSuite$MyDAGScheduler: Shuffle files lost for executor: hostA-exec (epoch 3) 25/04/17 14:15:06.023 dag-scheduler-message INFO DAGSchedulerSuite$MyDAGScheduler: Resubmitting failed stages 25/04/17 14:15:06.024 dag-scheduler-message INFO DAGSchedulerSuite$MyDAGScheduler: Submitting ShuffleMapStage 1 (DAGSchedulerSuiteRDD 0), which has no missing parents 25/04/17 14:15:06.025 dag-scheduler-message INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 2.9 KiB, free 2.4 GiB) 25/04/17 14:15:06.025 dag-scheduler-message INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 1825.0 B, free 2.4 GiB) 25/04/17 14:15:06.026 dag-scheduler-message INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:1662 25/04/17 14:15:06.027 dag-scheduler-message INFO DAGSchedulerSuite$MyDAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 1 (DAGSchedulerSuiteRDD 0) (first 15 tasks are for partitions Vector(0)) 25/04/17 14:15:06.028 dag-scheduler-message INFO DAGSchedulerSuite$MyDAGScheduler: Submitting ShuffleMapStage 0 (DAGSchedulerSuiteRDD 1), which has no missing parents 25/04/17 14:15:06.029 dag-scheduler-message INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 2.9 KiB, free 2.4 GiB) 25/04/17 14:15:06.029 dag-scheduler-message INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 1826.0 B, free 2.4 GiB) 25/04/17 
14:15:06.030 dag-scheduler-message INFO SparkContext: Created broadcast 4 from broadcast at DAGScheduler.scala:1662 25/04/17 14:15:06.030 dag-scheduler-message INFO DAGSchedulerSuite$MyDAGScheduler: Submitting 2 missing tasks from ShuffleMapStage 0 (DAGSchedulerSuiteRDD 1) (first 15 tasks are for partitions Vector(0, 1)) 25/04/17 14:15:06.227 pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite INFO DAGSchedulerSuite$MyDAGScheduler: ShuffleMapStage 0 (RDD at DAGSchedulerSuite.scala:123) finished in 198 ms 25/04/17 14:15:06.228 pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite INFO DAGSchedulerSuite$MyDAGScheduler: looking for newly runnable stages 25/04/17 14:15:06.228 pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite INFO DAGSchedulerSuite$MyDAGScheduler: running: HashSet(ShuffleMapStage 1) 25/04/17 14:15:06.229 pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite INFO DAGSchedulerSuite$MyDAGScheduler: waiting: HashSet(ResultStage 2) 25/04/17 14:15:06.229 pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite INFO DAGSchedulerSuite$MyDAGScheduler: failed: HashSet() 25/04/17 14:15:06.233 pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite INFO DAGSchedulerSuite$MyDAGScheduler: ShuffleMapStage 1 (RDD at DAGSchedulerSuite.scala:123) finished in 209 ms ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org