As I understand things (maybe naively), my input data are stored in equal-sized blocks in HDFS, each block becomes a partition in Spark when read from HDFS, and therefore each partition should hold roughly the same number of records.
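To make that mental model concrete, here is a toy Python sketch - the 16MB block size is from my job, but the file and record sizes are invented, not my real data:

```python
import math

# Toy sketch of my mental model of HDFS block -> Spark partition mapping
# (hypothetical sizes, not my real dataset):
# a file is cut into fixed-size HDFS blocks, one partition per block,
# so the records per partition should come out roughly uniform.

BLOCK_SIZE = 16 * 1024 * 1024    # 16MB blocks, as in my job
RECORD_SIZE = 512                # invented average record size
FILE_SIZE = 512 * 1024 * 1024    # invented 512MB input file

num_partitions = math.ceil(FILE_SIZE / BLOCK_SIZE)
records_per_partition = (FILE_SIZE // RECORD_SIZE) // num_partitions

print(num_partitions)          # → 32
print(records_per_partition)   # → 32768
```

Under this model every partition gets a similar record count - which is exactly what I am *not* seeing.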
So something is missing in my understanding - what can cause some partitions to have zero records while others hold roughly equal-sized chunks (~50k records in this case)? Before writing a custom partitioner, I would like to understand why the default partitioner has failed in my case.

On 8 Sep 2015 3:00 pm, "Akhil Das" <ak...@sigmoidanalytics.com> wrote:

> Try using a custom partitioner for the keys so that they will get evenly
> distributed across tasks
>
> Thanks
> Best Regards
>
> On Fri, Sep 4, 2015 at 7:19 PM, mark <manwoodv...@googlemail.com> wrote:
>
>> I am trying to tune a Spark job and have noticed some strange behavior -
>> tasks in a stage vary in execution time, ranging from 2 seconds to 20
>> seconds. I assume tasks should all run in roughly the same amount of time
>> in a well-tuned job.
>>
>> So I did some investigation - the fast tasks appear to have no records,
>> whilst the slow tasks do. I need help understanding why this is happening.
>>
>> The code in the stage is pretty simple. All it does is:
>>
>> - filter records
>> - map records to a (key, record) tuple
>> - reduce by key
>>
>> The data are Avro objects stored in Parquet files in 16MB blocks in HDFS.
>>
>> To establish how many records are in each partition I added this snippet:
>>
>> val counts = rdd.mapPartitions(iter => {
>>   val ctx = TaskContext.get
>>   val stageId = ctx.stageId
>>   val partId = ctx.partitionId
>>   val attemptId = ctx.taskAttemptId()
>>   // iter.size consumes the iterator, which is fine here - only the count is needed
>>   Array(Array(stageId, partId, attemptId, iter.size)).iterator
>> }, true).collect()
>>
>> Which produces the following (stageId, partId, attemptId, record count):
>>
>> 1  1  0  0
>> 1  2  1  50489
>> 1  3  2  0
>> 1  4  3  0
>> 1  5  4  0
>> 1  6  5  53200
>> 1  7  6  0
>> 1  8  7  0
>> 1  9  8  0
>> 1  10 9  56946
>> 1  11 10 0
>> 1  12 11 0
>> 1  13 12 0
>> 1  14 13 59209
>> 1  15 14 0
>> 1  16 15 0
>> 1  17 16 0
>> 1  18 17 50202
>> 1  19 18 0
>> 1  20 19 0
>> 1  21 20 0
>> 1  22 21 54613
>> 1  23 22 0
>> 1  24 23 0
>> 1  25 24 54157
>> 1  26 25 0
>> 1  27 26 0
>> 1  28 27 0
>> 1  29 28 53595
>> 1  30 29 0
>> 1  31 30 0
>> 1  32 31 10750
>>
>> Looking at the logs, you can see that the tasks that contain records have the
>> longest run times:
>>
>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 25.0 in stage 1.0 (TID 26) in 2782 ms on DG1322 (6/32)
>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 7.0 in stage 1.0 (TID 8) in 2815 ms on DG1322 (7/32)
>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 19.0 in stage 1.0 (TID 20) in 2815 ms on DG1322 (8/32)
>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 23.0 in stage 1.0 (TID 24) in 2840 ms on DG1321 (9/32)
>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 29.0 in stage 1.0 (TID 30) in 2839 ms on DG1321 (10/32)
>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 11.0 in stage 1.0 (TID 12) in 2878 ms on DG1321 (11/32)
>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 30.0 in stage 1.0 (TID 31) in 2870 ms on DG1321 (12/32)
>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 18.0 in stage 1.0 (TID 19) in 2892 ms on DG1321 (13/32)
>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 2930 ms on DG1321 (14/32)
>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 6.0 in stage 1.0 (TID 7) in 2934 ms on DG1321 (15/32)
>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 12.0 in stage 1.0 (TID 13) in 2931 ms on DG1321 (16/32)
>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 3.0 in stage 1.0 (TID 4) in 3246 ms on DG1323 (17/32)
>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 27.0 in stage 1.0 (TID 28) in 3226 ms on DG1323 (18/32)
>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 15.0 in stage 1.0 (TID 16) in 3249 ms on DG1323 (19/32)
>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 10.0 in stage 1.0 (TID 11) in 3669 ms on DG1323 (20/32)
>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 16.0 in stage 1.0 (TID 17) in 3666 ms on DG1323 (21/32)
>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 22.0 in stage 1.0 (TID 23) in 3664 ms on DG1323 (22/32)
>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 4.0 in stage 1.0 (TID 5) in 3692 ms on DG1323 (23/32)
>> *15/09/03 16:26:39 INFO TaskSetManager: Finished task 31.0 in stage 1.0 (TID 32) in 6668 ms on DG1322 (24/32)*
>> *15/09/03 16:26:48 INFO TaskSetManager: Finished task 17.0 in stage 1.0 (TID 18) in 15690 ms on DG1321 (25/32)*
>> *15/09/03 16:26:49 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 2) in 16194 ms on DG1322 (26/32)*
>> *15/09/03 16:26:49 INFO TaskSetManager: Finished task 5.0 in stage 1.0 (TID 6) in 16384 ms on DG1321 (27/32)*
>> *15/09/03 16:26:50 INFO TaskSetManager: Finished task 28.0 in stage 1.0 (TID 29) in 17194 ms on DG1323 (28/32)*
>> *15/09/03 16:26:50 INFO TaskSetManager: Finished task 21.0 in stage 1.0 (TID 22) in 17408 ms on DG1323 (29/32)*
>> *15/09/03 16:26:50 INFO TaskSetManager: Finished task 13.0 in stage 1.0 (TID 14) in 17711 ms on DG1322 (30/32)*
>> *15/09/03 16:26:51 INFO TaskSetManager: Finished task 24.0 in stage 1.0 (TID 25) in 17995 ms on DG1321 (31/32)*
>> *15/09/03 16:26:51 INFO TaskSetManager: Finished task 9.0 in stage 1.0 (TID 10) in 18183 ms on DG1323 (32/32)*
>>
>> The Spark UI for the stage shows this:
>>
>> Summary Metrics for 32 Completed Tasks
>>
>> Metric                        Min        25th percentile  Median     75th percentile  Max
>> Duration                      0.9 s      2 s              2 s        16 s             18 s
>> GC Time                       0 ms       0 ms             0 ms       0.5 s            1 s
>> Input Size / Records          0.0 B / 0  0.0 B / 0        0.0 B / 0  0.0 B / 53437    122.0 MB / 59209
>> Shuffle Write Size / Records  0.0 B / 0  0.0 B / 0        0.0 B / 0  3.7 KB / 1       127.8 KB / 35
>>
>> Aggregated Metrics by Executor
>>
>> Executor ID  Address       Task Time  Total Tasks  Failed Tasks  Succeeded Tasks  Input Size / Records  Shuffle Write Size / Records
>> 0            DG1322:41308  35 s       5            0             5                243.9 MB / 106948     224.3 KB / 63
>> 1            DG1321:33576  48 s       5            0             5                0.0 B / 111750        7.4 KB / 2
>> 2            DG1323:49348  30 s       5            0             5                0.0 B / 53595         3.7 KB / 1
>> 3            DG1321:43093  12 s       5            0             5                0.0 B / 0             0.0 B / 0
>> 4            DG1323:51009  34 s       6            0             6                0.0 B / 54767         98.7 KB / 28
>> 5            DG1322:36787  50 s       6            0             6                0.0 B / 123977        138.9 KB / 38
>>
>> *** Edited for brevity - included an example of a short and a long task ***
>>
>> Tasks
>>
>> Index  ID  Attempt  Status   Locality Level  Executor ID / Host  Duration  GC Time  Accumulators        Input Size / Records    Shuffle Write Size / Records
>> 0      1   0        SUCCESS  NODE_LOCAL      4 / DG1323          2 s                                    0.0 B (hadoop) / 0      0.0 B / 0
>> 1      2   0        SUCCESS  NODE_LOCAL      5 / DG1322          16 s      0.5 s    RecordCount: 54018  0.0 B (hadoop) / 54018  127.8 KB / 35
>>
>> Why are the tasks in the stage not processing an equal number of records?
>> What are the empty tasks doing? How can I even the tasks in this stage out?
>>
>> Any insights much appreciated...
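For what it's worth, my reading of the custom partitioner suggestion is something like the following - a plain-Python sketch of the assignment logic only (in Spark proper this would be a Partitioner subclass implementing numPartitions and getPartition), with invented keys rather than my real Avro record keys:

```python
# Plain-Python sketch of the idea behind a custom partitioner that spreads
# keys evenly across partitions instead of hashing them. The keys below are
# invented for illustration; this is not runnable Spark code.

class RoundRobinPartitioner:
    """Assign each distinct key to a partition in round-robin order,
    so distinct keys spread evenly even if their hashes would collide."""

    def __init__(self, num_partitions):
        self.num_partitions = num_partitions
        self._assignments = {}  # key -> partition, built up as keys are seen

    def get_partition(self, key):
        if key not in self._assignments:
            # Next unseen key goes to the next partition in rotation
            self._assignments[key] = len(self._assignments) % self.num_partitions
        return self._assignments[key]

p = RoundRobinPartitioner(4)
keys = ["a", "b", "c", "d", "e", "a"]
print([p.get_partition(k) for k in keys])  # → [0, 1, 2, 3, 0, 0]
```

Whether that actually helps presumably depends on whether my problem is key skew at all, which is exactly what I am trying to establish first.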