Hello,

I have a small dataset that I use as a test ground and which works fine with Spark 2.1.1 for doing CART or RandomForest regression. It takes around 40 seconds to go from training to evaluation on a Spark context started locally via spark-submit.
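
For context, the flow is the standard spark.ml pipeline, roughly like the sketch below (simplified; the dataset path, feature columns, and split are placeholders, not my real code):

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.regression.RandomForestRegressor

// Assumes an existing SparkSession `spark`; path and column names are placeholders.
val data = spark.read.parquet("/path/to/small-test-dataset")

val assembler = new VectorAssembler()
  .setInputCols(Array("f1", "f2", "f3")) // placeholder feature columns
  .setOutputCol("features")

val rf = new RandomForestRegressor()
  .setLabelCol("label")
  .setFeaturesCol("features")

val Array(train, test) = data.randomSplit(Array(0.8, 0.2), seed = 42)

// Train, then evaluate; this whole train-to-eval run is the ~40 second process.
val model = new Pipeline().setStages(Array(assembler, rf)).fit(train)
val rmse = new RegressionEvaluator()
  .setLabelCol("label")
  .setMetricName("rmse")
  .evaluate(model.transform(test))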

I'm now in the process of validating my migration to Spark 3.5.2, and while the base functionality is there, I'm facing a strange situation: the process mentioned above usually takes the same amount of time as with 2.1.1, except on rare occasions where it takes upwards of 3 minutes to complete.

In the log, when the entire process takes 40 seconds, I see these timings from DecisionTree:

INFO RandomForest: Internal timing for DecisionTree:
INFO RandomForest:   init: 0.0094236 total: 4.1664414 findBestSplits: 4.1466146 chooseSplits: 4.1240084

But when it is extremely slow, I see this:

INFO RandomForest: Internal timing for DecisionTree:
INFO RandomForest:   init: 0.004392 total: 138.2320968 findBestSplits: 138.2209867 chooseSplits: 138.2152113

or this:

INFO RandomForest: Internal timing for DecisionTree:
INFO RandomForest:   init: 0.0032708 total: 218.9888818 findBestSplits: 218.9759717 chooseSplits: 218.9569924

In the first slow case, there is an additional error message that is apparently ignored:

ERROR Inbox: Ignoring error
java.lang.NullPointerException
        at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$register(BlockManagerMasterEndpoint.scala:677)
        at org.apache.spark.storage.BlockManagerMasterEndpoint$$anonfun$receiveAndReply$1.applyOrElse(BlockManagerMasterEndpoint.scala:133)
        at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:103)
        at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
        at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
        at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
        at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)

In the second "slow" case, I can't see any exception, but I do see quite a few more blocks of "collectAsMap" jobs from RandomForest, like so:

INFO SparkContext: Created broadcast 20 from broadcast at RandomForest.scala:622
INFO SparkContext: Starting job: collectAsMap at RandomForest.scala:663
INFO DAGScheduler: Registering RDD 31 (mapPartitions at RandomForest.scala:644) as input to shuffle 6
INFO DAGScheduler: Got job 8 (collectAsMap at RandomForest.scala:663) with 1 output partitions
INFO DAGScheduler: Final stage: ResultStage 15 (collectAsMap at RandomForest.scala:663)
INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 14)
INFO DAGScheduler: Missing parents: List(ShuffleMapStage 14)
INFO DAGScheduler: Submitting ShuffleMapStage 14 (MapPartitionsRDD[31] at mapPartitions at RandomForest.scala:644), which has no missing parents
WARN DAGScheduler: Broadcasting large task binary with size 1295.7 KiB
INFO MemoryStore: Block broadcast_21 stored as values in memory (estimated size 1295.7 KiB, free 4.6 GiB)
INFO MemoryStore: Block broadcast_21_piece0 stored as bytes in memory (estimated size 615.3 KiB, free 4.6 GiB)
INFO BlockManagerInfo: Added broadcast_21_piece0 in memory on computer.some.domain:62524 (size: 615.3 KiB, free: 4.6 GiB)
INFO SparkContext: Created broadcast 21 from broadcast at DAGScheduler.scala:1585
INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 14 (MapPartitionsRDD[31] at mapPartitions at RandomForest.scala:644) (first 15 tasks are for partitions Vector(0))
INFO TaskSchedulerImpl: Adding task set 14.0 with 1 tasks resource profile 0
INFO TaskSetManager: Starting task 0.0 in stage 14.0 (TID 14) (computer.some.domain, executor driver, partition 0, PROCESS_LOCAL, 9149 bytes)
INFO Executor: Running task 0.0 in stage 14.0 (TID 14)
INFO BlockManager: Found block rdd_15_0 locally
INFO Executor: Finished task 0.0 in stage 14.0 (TID 14). 1659 bytes result sent to driver
INFO TaskSetManager: Finished task 0.0 in stage 14.0 (TID 14) in 17493 ms on computer.some.domain (executor driver) (1/1)
INFO TaskSchedulerImpl: Removed TaskSet 14.0, whose tasks have all completed, from pool
INFO DAGScheduler: ShuffleMapStage 14 (mapPartitions at RandomForest.scala:644) finished in 17,524 s
INFO DAGScheduler: looking for newly runnable stages
INFO DAGScheduler: running: HashSet()
INFO DAGScheduler: waiting: HashSet(ResultStage 15)
INFO DAGScheduler: failed: HashSet()

Basically, the "Starting job: collectAsMap at RandomForest.scala:663" log line appears 10 times, versus 5 times in the "fast" case.
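
If I read RandomForest.scala correctly, each of those collectAsMap jobs corresponds to one findBestSplits pass over a group of tree nodes, and the number of groups per iteration is bounded by how many nodes fit into the maxMemoryInMB budget. So one thing I plan to try (just a guess on my part, not something I've confirmed) is raising that budget:

// Untested guess: a larger per-iteration memory budget lets RandomForest
// split more nodes per group, which should mean fewer collectAsMap jobs
// (the default maxMemoryInMB is 256).
val rf = new RandomForestRegressor()
  .setMaxMemoryInMB(512)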

This is a bit of a "shot in the dark" message, in the hope that someone will have an "I know this" moment that could put me on the trail of a potential culprit.

Many thanks for any suggestions.

Olivier
