attilapiros commented on code in PR #50630:
URL: https://github.com/apache/spark/pull/50630#discussion_r2049762666
##########
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala:
##########
@@ -56,28 +56,31 @@ class DAGSchedulerEventProcessLoopTester(dagScheduler: DAGScheduler)
dagScheduler.setEventProcessLoop(this)
- private var isProcessing = false
- private val eventQueue = new ListBuffer[DAGSchedulerEvent]()
-
+ private val eventQueue = new LinkedBlockingQueue[DAGSchedulerEvent]()
override def post(event: DAGSchedulerEvent): Unit = {
- if (isProcessing) {
- // `DAGSchedulerEventProcessLoop` is guaranteed to process events sequentially. So we should
- // buffer events for sequent processing later instead of processing them recursively.
- eventQueue += event
- } else {
- try {
- isProcessing = true
- // Forward event to `onReceive` directly to avoid processing event asynchronously.
- onReceive(event)
- } catch {
- case NonFatal(e) => onError(e)
- } finally {
- isProcessing = false
- }
- if (eventQueue.nonEmpty) {
- post(eventQueue.remove(0))
- }
+ // `DAGSchedulerEventProcessLoop` is guaranteed to process events sequentially in the main test
Review Comment:
I changed this part because, without the change, I saw the following in
unit-tests.log. Note the thread names: events are handled on both
`pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite` and
`dag-scheduler-message`:
```
25/04/17 14:15:05.815 pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1662
25/04/17 14:15:05.815 pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite INFO DAGSchedulerSuite$MyDAGScheduler: Submitting 2 missing tasks from ResultStage 2 (DAGSchedulerSuiteRDD 2) (first 15 tasks are for partitions Vector(0, 1))
25/04/17 14:15:05.816 pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite INFO DAGSchedulerSuite$MyDAGScheduler: Marking ResultStage 2 () as failed due to a fetch failure from ShuffleMapStage 1 (RDD at DAGSchedulerSuite.scala:123)
25/04/17 14:15:05.817 pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite INFO DAGSchedulerSuite$MyDAGScheduler: ResultStage 2 () failed in 3 ms due to ignored
25/04/17 14:15:05.817 pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite INFO DAGSchedulerSuite$MyDAGScheduler: Resubmitting ShuffleMapStage 1 (RDD at DAGSchedulerSuite.scala:123) and ResultStage 2 () due to fetch failure
25/04/17 14:15:05.817 pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite INFO DAGSchedulerSuite$MyDAGScheduler: Executor lost: hostA-exec (epoch 3)
25/04/17 14:15:05.818 pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite INFO DAGSchedulerSuite$MyDAGScheduler: Shuffle files lost for executor: hostA-exec (epoch 3)
25/04/17 14:15:06.023 dag-scheduler-message INFO DAGSchedulerSuite$MyDAGScheduler: Resubmitting failed stages
25/04/17 14:15:06.024 dag-scheduler-message INFO DAGSchedulerSuite$MyDAGScheduler: Submitting ShuffleMapStage 1 (DAGSchedulerSuiteRDD 0), which has no missing parents
25/04/17 14:15:06.025 dag-scheduler-message INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 2.9 KiB, free 2.4 GiB)
25/04/17 14:15:06.025 dag-scheduler-message INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 1825.0 B, free 2.4 GiB)
25/04/17 14:15:06.026 dag-scheduler-message INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:1662
25/04/17 14:15:06.027 dag-scheduler-message INFO DAGSchedulerSuite$MyDAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 1 (DAGSchedulerSuiteRDD 0) (first 15 tasks are for partitions Vector(0))
25/04/17 14:15:06.028 dag-scheduler-message INFO DAGSchedulerSuite$MyDAGScheduler: Submitting ShuffleMapStage 0 (DAGSchedulerSuiteRDD 1), which has no missing parents
25/04/17 14:15:06.029 dag-scheduler-message INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 2.9 KiB, free 2.4 GiB)
25/04/17 14:15:06.029 dag-scheduler-message INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 1826.0 B, free 2.4 GiB)
25/04/17 14:15:06.030 dag-scheduler-message INFO SparkContext: Created broadcast 4 from broadcast at DAGScheduler.scala:1662
25/04/17 14:15:06.030 dag-scheduler-message INFO DAGSchedulerSuite$MyDAGScheduler: Submitting 2 missing tasks from ShuffleMapStage 0 (DAGSchedulerSuiteRDD 1) (first 15 tasks are for partitions Vector(0, 1))
25/04/17 14:15:06.227 pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite INFO DAGSchedulerSuite$MyDAGScheduler: ShuffleMapStage 0 (RDD at DAGSchedulerSuite.scala:123) finished in 198 ms
25/04/17 14:15:06.228 pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite INFO DAGSchedulerSuite$MyDAGScheduler: looking for newly runnable stages
25/04/17 14:15:06.228 pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite INFO DAGSchedulerSuite$MyDAGScheduler: running: HashSet(ShuffleMapStage 1)
25/04/17 14:15:06.229 pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite INFO DAGSchedulerSuite$MyDAGScheduler: waiting: HashSet(ResultStage 2)
25/04/17 14:15:06.229 pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite INFO DAGSchedulerSuite$MyDAGScheduler: failed: HashSet()
25/04/17 14:15:06.233 pool-1-thread-1-ScalaTest-running-DAGSchedulerSuite INFO DAGSchedulerSuite$MyDAGScheduler: ShuffleMapStage 1 (RDD at DAGSchedulerSuite.scala:123) finished in 209 ms
```
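
In the log above, the `dag-scheduler-message` entries begin roughly 200 ms
after the fetch failure. That suggests a delayed event posted from that thread
was handled right there, because the old `post` forwarded straight to
`onReceive` whenever `isProcessing` was false. Below is a minimal,
self-contained sketch of the queue-based alternative, not the code in this PR:
`QueueingLoop`, `runEvents`, `handled`, the `String` events and the 50 ms delay
are all hypothetical stand-ins chosen for illustration. The point it shows is
that `post` only enqueues, so handling always happens on the thread that
drains the queue.

```scala
import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for the test event loop; not Spark code.
final class QueueingLoop(handle: (QueueingLoop, String) => Unit) {
  private val eventQueue = new LinkedBlockingQueue[String]()

  // (event, name of the thread that handled it); touched only by the draining thread.
  val handled = ListBuffer.empty[(String, String)]

  // Safe to call from any thread: it only enqueues and never handles the event.
  def post(event: String): Unit = eventQueue.put(event)

  // Called by the owning (test) thread: handles queued events in FIFO order.
  def runEvents(): Unit = {
    var event = eventQueue.poll()
    while (event != null) {
      handled += ((event, Thread.currentThread().getName))
      handle(this, event)
      event = eventQueue.poll()
    }
  }
}

object QueueingLoopDemo {
  def demo(): Seq[(String, String)] = {
    // Plays the role of a separate scheduler thread that posts a delayed event.
    val timer = Executors.newSingleThreadScheduledExecutor()
    val loop = new QueueingLoop((self, event) =>
      if (event == "fetch-failed") {
        timer.schedule(
          new Runnable { def run(): Unit = self.post("resubmit") },
          50, TimeUnit.MILLISECONDS)
      }
    )
    loop.post("fetch-failed")
    loop.runEvents()                            // handles "fetch-failed" on the caller's thread
    timer.shutdown()                            // already-scheduled delayed task still runs
    timer.awaitTermination(5, TimeUnit.SECONDS) // wait until "resubmit" has been enqueued
    loop.runEvents()                            // handles "resubmit" on the caller's thread too
    loop.handled.toList
  }

  def main(args: Array[String]): Unit =
    demo().foreach { case (event, thread) => println(s"$event handled on $thread") }
}
```

With this shape, both events are recorded with the same handling thread even
though `resubmit` was posted from the timer thread. The trade-off is that the
test has to drain the queue explicitly after the delayed post has happened.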
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]