Hello,
I have a small dataset that I use as a test ground, and it works fine
with Spark 2.1.1 for CART or RandomForest regression. It takes around
40 seconds to go from training to evaluation on a Spark context started
locally via spark-submit.
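For context, the test is essentially the following (the data path,
feature columns and column names here are placeholders, not the real
ones):

    import org.apache.spark.ml.evaluation.RegressionEvaluator
    import org.apache.spark.ml.feature.VectorAssembler
    import org.apache.spark.ml.regression.RandomForestRegressor
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("rf-test").getOrCreate()

    // Small test dataset; the real one is loaded from disk.
    val data = spark.read.parquet("data/test.parquet")
    val Array(train, test) = data.randomSplit(Array(0.8, 0.2), seed = 42)

    val assembler = new VectorAssembler()
      .setInputCols(Array("f1", "f2", "f3")) // placeholder feature columns
      .setOutputCol("features")

    val rf = new RandomForestRegressor()
      .setLabelCol("label")
      .setFeaturesCol("features")

    val evaluator = new RegressionEvaluator()
      .setLabelCol("label")
      .setMetricName("rmse")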
I'm now in the process of validating my migration to Spark 3.5.2. While
the base functionality is there, I'm facing a strange situation: the
process mentioned above usually takes the same amount of time as with
2.1.1, except on rare occasions where it takes upwards of 3 minutes to
complete.
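To be clear about what those timings cover: I measure the whole
train-then-evaluate sequence, continuing the sketch above:

    // Wall-clock time from the start of training to the end of evaluation.
    val t0 = System.nanoTime()
    val model = rf.fit(assembler.transform(train))
    val rmse = evaluator.evaluate(model.transform(assembler.transform(test)))
    println(f"train+eval: ${(System.nanoTime() - t0) / 1e9}%.1f s, rmse: $rmse%.4f")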
In the log, when the entire process takes 40 seconds, I see these
timings from DecisionTree:
INFO RandomForest: Internal timing for DecisionTree:
INFO RandomForest:   init: 0.0094236  total: 4.1664414  findBestSplits: 4.1466146  chooseSplits: 4.1240084
But when it is extremely slow, I see this:
INFO RandomForest: Internal timing for DecisionTree:
INFO RandomForest:   init: 0.004392  total: 138.2320968  findBestSplits: 138.2209867  chooseSplits: 138.2152113
or this:
INFO RandomForest: Internal timing for DecisionTree:
INFO RandomForest:   init: 0.0032708  total: 218.9888818  findBestSplits: 218.9759717  chooseSplits: 218.9569924
In the first case, there is an additional error message that is
apparently ignored:
ERROR Inbox: Ignoring error
java.lang.NullPointerException
    at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$register(BlockManagerMasterEndpoint.scala:677)
    at org.apache.spark.storage.BlockManagerMasterEndpoint$$anonfun$receiveAndReply$1.applyOrElse(BlockManagerMasterEndpoint.scala:133)
    at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:103)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
    at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
    at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
In the second slow case, however, I can't see any exception, but there
are quite a few more blocks of "collectAsMap" jobs from RandomForest,
like so:
INFO SparkContext: Created broadcast 20 from broadcast at RandomForest.scala:622
INFO SparkContext: Starting job: collectAsMap at RandomForest.scala:663
INFO DAGScheduler: Registering RDD 31 (mapPartitions at RandomForest.scala:644) as input to shuffle 6
INFO DAGScheduler: Got job 8 (collectAsMap at RandomForest.scala:663) with 1 output partitions
INFO DAGScheduler: Final stage: ResultStage 15 (collectAsMap at RandomForest.scala:663)
INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 14)
INFO DAGScheduler: Missing parents: List(ShuffleMapStage 14)
INFO DAGScheduler: Submitting ShuffleMapStage 14 (MapPartitionsRDD[31] at mapPartitions at RandomForest.scala:644), which has no missing parents
WARN DAGScheduler: Broadcasting large task binary with size 1295.7 KiB
INFO MemoryStore: Block broadcast_21 stored as values in memory (estimated size 1295.7 KiB, free 4.6 GiB)
INFO MemoryStore: Block broadcast_21_piece0 stored as bytes in memory (estimated size 615.3 KiB, free 4.6 GiB)
INFO BlockManagerInfo: Added broadcast_21_piece0 in memory on computer.some.domain:62524 (size: 615.3 KiB, free: 4.6 GiB)
INFO SparkContext: Created broadcast 21 from broadcast at DAGScheduler.scala:1585
INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 14 (MapPartitionsRDD[31] at mapPartitions at RandomForest.scala:644) (first 15 tasks are for partitions Vector(0))
INFO TaskSchedulerImpl: Adding task set 14.0 with 1 tasks resource profile 0
INFO TaskSetManager: Starting task 0.0 in stage 14.0 (TID 14) (computer.some.domain, executor driver, partition 0, PROCESS_LOCAL, 9149 bytes)
INFO Executor: Running task 0.0 in stage 14.0 (TID 14)
INFO BlockManager: Found block rdd_15_0 locally
INFO Executor: Finished task 0.0 in stage 14.0 (TID 14). 1659 bytes result sent to driver
INFO TaskSetManager: Finished task 0.0 in stage 14.0 (TID 14) in 17493 ms on computer.some.domain (executor driver) (1/1)
INFO TaskSchedulerImpl: Removed TaskSet 14.0, whose tasks have all completed, from pool
INFO DAGScheduler: ShuffleMapStage 14 (mapPartitions at RandomForest.scala:644) finished in 17,524 s
INFO DAGScheduler: looking for newly runnable stages
INFO DAGScheduler: running: HashSet()
INFO DAGScheduler: waiting: HashSet(ResultStage 15)
INFO DAGScheduler: failed: HashSet()
Basically, the "Starting job: collectAsMap at RandomForest.scala:663"
log line is repeated 10 times, instead of the 5 times I see in the fast
case.
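If I read the RandomForest implementation correctly, each of those
collectAsMap jobs is one pass of findBestSplits over a group of tree
nodes, and the number of nodes that fit in a group is capped by the
maxMemoryInMB parameter (default 256). So one experiment I plan to run
is raising that cap, to see whether the slow runs then do fewer, larger
passes:

    // Experiment only: give node-statistics aggregation a larger memory
    // budget so findBestSplits can handle more nodes per collectAsMap pass.
    val rf = new RandomForestRegressor()
      .setLabelCol("label")        // same placeholder columns as above
      .setFeaturesCol("features")
      .setMaxMemoryInMB(512)       // default is 256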
This is a bit of a shot-in-the-dark message, in the hope that someone
will have an "I know this" moment that puts me on the track of a
potential culprit.
Many thanks for any suggestion.
Olivier