Please ignore this lengthy email. It turns out the issue was caused by the stage evicting some of the cached blocks from memory, which is what made those 22 tasks slow. Is there a way to see in the logs when evictions happen? I've looked at the Spark executor metrics and it doesn't seem possible to report storage evictions at the moment. That would be a really nice feature: being able to set up alerts on such an event.
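
One workaround I'm considering, in case it helps anyone else: a SparkListener that flags RDD block updates whose storage level is no longer valid, which is only a rough proxy for an eviction (as far as I can tell there is no dedicated eviction event). A minimal sketch, class name is illustrative:

import org.apache.spark.scheduler.{SparkListener, SparkListenerBlockUpdated}

// Rough eviction detector: when a cached RDD block is dropped from storage,
// the block update it reports no longer has a valid storage level
// (neither memory nor disk).
class EvictionLogger extends SparkListener {
  override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {
    val info = event.blockUpdatedInfo
    if (info.blockId.isRDD && !info.storageLevel.isValid) {
      // Hook an alerting call in here instead of just logging.
      println(s"Possible eviction of ${info.blockId} on executor ${info.blockManagerId.executorId}")
    }
  }
}

// Registered on the driver with:
// spark.sparkContext.addSparkListener(new EvictionLogger())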
On Wed, Jun 16, 2021 at 3:07 PM Zilvinas Saltys <zilvinas.sal...@verizonmedia.com> wrote:

> Hi,
>
> I'm running Spark 3.0.1 on AWS. Dynamic allocation is disabled. I'm
> caching a large dataset 100% in memory. Before caching it I coalesce the
> dataset to 1792 partitions. There are 112 executors and 896 cores on the
> cluster.
>
> The next stage reads those 1792 partitions as input. The query plan is:
>
> 1. InMemoryScan (MapPartitionsRDD1 -> MapPartitionsRDD2)
> 2. WholeStageCodegen (MapPartitionsRDD)
> 3. ShuffleQueryStage (MapPartitionsRDD)
>
> I've set *spark.locality.wait* to 120 seconds because I want to make sure
> that, no matter what, all tasks run PROCESS_LOCAL to utilize the local
> executor cache. I expect each executor to run a total of *16 tasks exactly*.
>
> The *problem* I see is that this stage runs 1770 tasks, then shows
> 1770/1792 tasks completed with no other tasks running. It then waits
> exactly 120 seconds and schedules the remaining 22 tasks as RACK_LOCAL.
> Those 22 tasks run very slowly compared to the rest. In the end it shows
> that some executors ran 17 tasks and some ran 15.
>
> The stage summary UI shows:
>
> - *Total Time Across All Tasks:* 1.7 h
> - *Locality Level Summary:* Process local: 1770; Rack local: 22
> - *Input Size / Records:* 1418.9 GiB / 9928798
> - *Shuffle Write Size / Records:* 2.5 GiB / 35494236
>
> The slowest PROCESS_LOCAL task:
>
> Index: 477 | Task ID: 4590 | Attempt: 0 | Status: SUCCESS
> Locality Level: PROCESS_LOCAL | Executor ID: 9 | Host: ip-xxxx.ec2.internal
> Logs: stdout
> <http://ip-10-99-88-233.ec2.internal:8042/node/containerlogs/container_1623664711892_2541_01_000022/ec2-user/stdout?start=-4096>
> stderr
> <http://ip-10-99-88-233.ec2.internal:8042/node/containerlogs/container_1623664711892_2541_01_000022/ec2-user/stderr?start=-4096>
> Launch Time: 2021-06-16 12:59:25 | Duration: 6 s | GC Time: 2 s
> Input Size / Records: 1.2 GiB / 311 | Write Time: 7.0 ms
> Shuffle Write Size / Records: 1.8 MiB / 25474
>
> and the slowest RACK_LOCAL task:
>
> Index: 792 | Task ID: 5876 | Attempt: 0 | Status: SUCCESS
> Locality Level: RACK_LOCAL | Executor ID: 28 | Host: ip-xxxx.ec2.internal
> Logs: stdout
> <http://ip-10-99-89-160.ec2.internal:8042/node/containerlogs/container_1623664711892_2541_01_000061/ec2-user/stdout?start=-4096>
> stderr
> <http://ip-10-99-89-160.ec2.internal:8042/node/containerlogs/container_1623664711892_2541_01_000061/ec2-user/stderr?start=-4096>
> Launch Time: 2021-06-16 13:01:31 | Duration: 1.1 min | GC Time: 0.2 s
> Input Size / Records: 384.6 MiB / 642897 | Write Time: 3.0 ms
> Shuffle Write Size / Records: 1.6 MiB / 21786
>
> I don't understand what is causing this. Why is Spark refusing to run
> these tasks PROCESS_LOCAL? What is happening with those 22 tasks and why
> do they become RACK_LOCAL? One interesting thing to note is that these 22
> tasks show a much higher input record count. I understand that these tasks
> could be skewed, but why do they not start running PROCESS_LOCAL instead
> of waiting out the locality timer before being scheduled?
>
> Any insight greatly appreciated,
> Thanks
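
P.S. For context, the setup described above is roughly the following (an illustrative sketch only, with a made-up input path and app name, not the actual job code):

import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

val spark = SparkSession.builder()
  .appName("cache-locality-example")
  // Wait up to 120s for a PROCESS_LOCAL slot before falling back to a worse locality level.
  .config("spark.locality.wait", "120s")
  .config("spark.dynamicAllocation.enabled", "false")
  .getOrCreate()

// 112 executors * 8 cores = 896 cores; 1792 partitions = two waves of 896 tasks,
// i.e. exactly 16 tasks per executor if every task runs PROCESS_LOCAL.
val df = spark.read.parquet("s3://some-bucket/large-dataset")  // hypothetical path
  .coalesce(1792)
  .persist(StorageLevel.MEMORY_ONLY)

df.count()  // materialize the cache before the next stage reads it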