Interesting... Could you provide the code and the dataset you're using to
reproduce the issue?
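
In the meantime, it may help to compare a "fast" and a "slow" driver log side by side. Below is a minimal sketch, assuming the logs are saved as plain text files with one log entry per line. The function name `summarize` and the regular expressions are my own, derived only from the log lines quoted in your message; they are not part of Spark. It counts the "Starting job: collectAsMap" entries and extracts the DecisionTree internal timings.

```python
import re
import sys

# Patterns derived from the log lines quoted in the original message
# (assumption: the real log has the same wording; \s+ tolerates wrapped lines).
JOB_RE = re.compile(r"Starting job:\s+collectAsMap\s+at\s+RandomForest\.scala:\d+")
TIMING_RE = re.compile(
    r"init:\s*([\d.]+)\s+total:\s*([\d.]+)\s+"
    r"findBestSplits:\s*([\d.]+)\s+chooseSplits:\s*([\d.]+)"
)
TIMING_KEYS = ("init", "total", "findBestSplits", "chooseSplits")


def summarize(log_text):
    """Count collectAsMap jobs and collect DecisionTree timings (in seconds)."""
    jobs = len(JOB_RE.findall(log_text))
    timings = [
        dict(zip(TIMING_KEYS, map(float, match.groups())))
        for match in TIMING_RE.finditer(log_text)
    ]
    return {"collectAsMap_jobs": jobs, "timings": timings}


if __name__ == "__main__":
    # Usage (hypothetical file names): python summarize_log.py fast.log slow.log
    for path in sys.argv[1:]:
        with open(path, encoding="utf-8", errors="replace") as handle:
            print(path, summarize(handle.read()))
```

Running it on one log from each case should make the difference in job count (5 versus 10 in your description) and in `chooseSplits` time easy to see at a glance.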

On Mon, 14 Oct 2024, 16:52, Olivier Sannier <obo...@free.fr> wrote:

> Hello,
>
> I have a small dataset that I use as a test ground and which works fine
> with Spark 2.1.1 to do some CART or RandomForest regression.
> It takes around 40 seconds to go from training to evaluation on a Spark
> context started locally via spark-submit.
>
> I'm now in the process of validating my migration to Spark 3.5.2 and while
> the base functionality is there, I'm facing a strange situation where the
> process mentioned above takes the same amount of time as with 2.1.1 except
> for rare occasions where it would take upwards of 3 minutes to complete.
>
> In the log, when it takes 40 seconds for the entire process, I see these
> timings from DecisionTree:
>
> INFO RandomForest: Internal timing for DecisionTree:
> INFO RandomForest:   init: 0.0094236 total: 4.1664414 findBestSplits:
> 4.1466146 chooseSplits: 4.1240084
>
> But when it is extremely slow, I see this:
>
> INFO RandomForest: Internal timing for DecisionTree:
> INFO RandomForest:   init: 0.004392 total: 138.2320968 findBestSplits:
> 138.2209867 chooseSplits: 138.2152113
>
> or this:
>
> INFO RandomForest: Internal timing for DecisionTree:
> INFO RandomForest:   init: 0.0032708 total: 218.9888818 findBestSplits:
> 218.9759717 chooseSplits: 218.9569924
>
> In the first "slow" case (138 seconds), there is an additional error
> message that is apparently ignored:
>
> ERROR Inbox: Ignoring error
> java.lang.NullPointerException
>         at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$register(BlockManagerMasterEndpoint.scala:677)
>         at org.apache.spark.storage.BlockManagerMasterEndpoint$$anonfun$receiveAndReply$1.applyOrElse(BlockManagerMasterEndpoint.scala:133)
>         at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:103)
>         at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
>         at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
>         at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
>         at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
>         at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>         at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>         at java.base/java.lang.Thread.run(Thread.java:829)
>
> In the second "slow" case (218 seconds), I can't see any exception, but I
> do see quite a few more blocks of "collectAsMap" jobs from RandomForest,
> like so:
>
> INFO SparkContext: Created broadcast 20 from broadcast at
> RandomForest.scala:622
> INFO SparkContext: Starting job: collectAsMap at RandomForest.scala:663
> INFO DAGScheduler: Registering RDD 31 (mapPartitions at
> RandomForest.scala:644) as input to shuffle 6
> INFO DAGScheduler: Got job 8 (collectAsMap at RandomForest.scala:663) with
> 1 output partitions
> INFO DAGScheduler: Final stage: ResultStage 15 (collectAsMap at
> RandomForest.scala:663)
> INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 14)
> INFO DAGScheduler: Missing parents: List(ShuffleMapStage 14)
> INFO DAGScheduler: Submitting ShuffleMapStage 14 (MapPartitionsRDD[31] at
> mapPartitions at RandomForest.scala:644), which has no missing parents
> WARN DAGScheduler: Broadcasting large task binary with size 1295.7 KiB
> INFO MemoryStore: Block broadcast_21 stored as values in memory (estimated
> size 1295.7 KiB, free 4.6 GiB)
> INFO MemoryStore: Block broadcast_21_piece0 stored as bytes in memory
> (estimated size 615.3 KiB, free 4.6 GiB)
> INFO BlockManagerInfo: Added broadcast_21_piece0 in memory on
> computer.some.domain:62524 (size: 615.3 KiB, free: 4.6 GiB)
> INFO SparkContext: Created broadcast 21 from broadcast at
> DAGScheduler.scala:1585
> INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 14
> (MapPartitionsRDD[31] at mapPartitions at RandomForest.scala:644) (first 15
> tasks are for partitions Vector(0))
> INFO TaskSchedulerImpl: Adding task set 14.0 with 1 tasks resource profile
> 0
> INFO TaskSetManager: Starting task 0.0 in stage 14.0 (TID 14)
> (computer.some.domain, executor driver, partition 0, PROCESS_LOCAL, 9149
> bytes)
> INFO Executor: Running task 0.0 in stage 14.0 (TID 14)
> INFO BlockManager: Found block rdd_15_0 locally
> INFO Executor: Finished task 0.0 in stage 14.0 (TID 14). 1659 bytes result
> sent to driver
> INFO TaskSetManager: Finished task 0.0 in stage 14.0 (TID 14) in 17493 ms
> on computer.some.domain (executor driver) (1/1)
> INFO TaskSchedulerImpl: Removed TaskSet 14.0, whose tasks have all
> completed, from pool
> INFO DAGScheduler: ShuffleMapStage 14 (mapPartitions at
> RandomForest.scala:644) finished in 17,524 s
> INFO DAGScheduler: looking for newly runnable stages
> INFO DAGScheduler: running: HashSet()
> INFO DAGScheduler: waiting: HashSet(ResultStage 15)
> INFO DAGScheduler: failed: HashSet()
>
> Basically, the "Starting job: collectAsMap at RandomForest.scala:663" log
> line is repeated 10 times, instead of 5 times in the "fast" case.
>
> This is a bit of a "shot in the dark" message, in the hope that someone
> will have an "I know this" moment that could put me on the track of a
> potential culprit.
>
> Many thanks for any suggestions.
>
> Olivier
>
>
