Try using a custom partitioner for the keys so that they get evenly distributed across tasks.
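Something along these lines - a minimal sketch, not tested against your job; the key type, partition count, and the `keyOf`/`combine` placeholders are assumptions you would replace with your own:

```scala
import org.apache.spark.Partitioner

// Sketch of a custom partitioner. The partition count is whatever you pass
// in; the modulo guard handles keys whose hashCode is negative.
class EvenPartitioner(numParts: Int) extends Partitioner {
  override def numPartitions: Int = numParts

  override def getPartition(key: Any): Int = {
    val h = key.hashCode % numParts
    if (h < 0) h + numParts else h
  }
}

// reduceByKey has an overload that takes a Partitioner, so you can plug it
// into the stage you describe (keyOf and combine are placeholders):
//
//   rdd.filter(...)
//      .map(r => (keyOf(r), r))
//      .reduceByKey(new EvenPartitioner(32), (a, b) => combine(a, b))
```

If your keys hash badly (e.g. many keys collide into the same bucket), you can also encode domain knowledge about the key distribution directly in `getPartition`.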
Thanks
Best Regards

On Fri, Sep 4, 2015 at 7:19 PM, mark <manwoodv...@googlemail.com> wrote:

> I am trying to tune a Spark job and have noticed some strange behavior -
> tasks in a stage vary in execution time, ranging from 2 seconds to 20
> seconds. I assume tasks should all run in roughly the same amount of time
> in a well-tuned job.
>
> So I did some investigation - the fast tasks appear to have no records,
> whilst the slow tasks do. I need help understanding why this is happening.
>
> The code in the stage is pretty simple. All it does is:
>
> - filters records
> - maps records to a (key, record) tuple
> - reduces by key
>
> The data are Avro objects stored in Parquet files in 16MB blocks in HDFS.
>
> To establish how many records are in each partition I added this snippet:
>
> val counts = rdd.mapPartitions(iter => {
>   val ctx = TaskContext.get
>   val stageId = ctx.stageId
>   val partId = ctx.partitionId
>   val attemptId = ctx.taskAttemptId()
>   Array(Array(stageId, partId, attemptId, iter.size)).iterator
> }, true).collect()
>
> Which produces the following (stageId, partId, attemptId, record count):
>
> 1   1   0   0
> 1   2   1   50489
> 1   3   2   0
> 1   4   3   0
> 1   5   4   0
> 1   6   5   53200
> 1   7   6   0
> 1   8   7   0
> 1   9   8   0
> 1  10   9   56946
> 1  11  10   0
> 1  12  11   0
> 1  13  12   0
> 1  14  13   59209
> 1  15  14   0
> 1  16  15   0
> 1  17  16   0
> 1  18  17   50202
> 1  19  18   0
> 1  20  19   0
> 1  21  20   0
> 1  22  21   54613
> 1  23  22   0
> 1  24  23   0
> 1  25  24   54157
> 1  26  25   0
> 1  27  26   0
> 1  28  27   0
> 1  29  28   53595
> 1  30  29   0
> 1  31  30   0
> 1  32  31   10750
>
> Looking at the logs, you can see the tasks that contain records have the
> longest run times (slow tasks marked with asterisks):
>
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 25.0 in stage 1.0 (TID 26) in 2782 ms on DG1322 (6/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 7.0 in stage 1.0 (TID 8) in 2815 ms on DG1322 (7/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 19.0 in stage 1.0 (TID 20) in 2815 ms on DG1322 (8/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 23.0 in stage 1.0 (TID 24) in 2840 ms on DG1321 (9/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 29.0 in stage 1.0 (TID 30) in 2839 ms on DG1321 (10/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 11.0 in stage 1.0 (TID 12) in 2878 ms on DG1321 (11/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 30.0 in stage 1.0 (TID 31) in 2870 ms on DG1321 (12/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 18.0 in stage 1.0 (TID 19) in 2892 ms on DG1321 (13/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 2930 ms on DG1321 (14/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 6.0 in stage 1.0 (TID 7) in 2934 ms on DG1321 (15/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 12.0 in stage 1.0 (TID 13) in 2931 ms on DG1321 (16/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 3.0 in stage 1.0 (TID 4) in 3246 ms on DG1323 (17/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 27.0 in stage 1.0 (TID 28) in 3226 ms on DG1323 (18/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 15.0 in stage 1.0 (TID 16) in 3249 ms on DG1323 (19/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 10.0 in stage 1.0 (TID 11) in 3669 ms on DG1323 (20/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 16.0 in stage 1.0 (TID 17) in 3666 ms on DG1323 (21/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 22.0 in stage 1.0 (TID 23) in 3664 ms on DG1323 (22/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 4.0 in stage 1.0 (TID 5) in 3692 ms on DG1323 (23/32)
> *15/09/03 16:26:39 INFO TaskSetManager: Finished task 31.0 in stage 1.0 (TID 32) in 6668 ms on DG1322 (24/32)*
> *15/09/03 16:26:48 INFO TaskSetManager: Finished task 17.0 in stage 1.0 (TID 18) in 15690 ms on DG1321 (25/32)*
> *15/09/03 16:26:49 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 2) in 16194 ms on DG1322 (26/32)*
> *15/09/03 16:26:49 INFO TaskSetManager: Finished task 5.0 in stage 1.0 (TID 6) in 16384 ms on DG1321 (27/32)*
> *15/09/03 16:26:50 INFO TaskSetManager: Finished task 28.0 in stage 1.0 (TID 29) in 17194 ms on DG1323 (28/32)*
> *15/09/03 16:26:50 INFO TaskSetManager: Finished task 21.0 in stage 1.0 (TID 22) in 17408 ms on DG1323 (29/32)*
> *15/09/03 16:26:50 INFO TaskSetManager: Finished task 13.0 in stage 1.0 (TID 14) in 17711 ms on DG1322 (30/32)*
> *15/09/03 16:26:51 INFO TaskSetManager: Finished task 24.0 in stage 1.0 (TID 25) in 17995 ms on DG1321 (31/32)*
> *15/09/03 16:26:51 INFO TaskSetManager: Finished task 9.0 in stage 1.0 (TID 10) in 18183 ms on DG1323 (32/32)*
>
> The Spark UI for the stage shows this:
>
> Summary Metrics for 32 Completed Tasks
>
> Metric                        Min        25th pct   Median     75th pct        Max
> Duration                      0.9 s      2 s        2 s        16 s            18 s
> GC Time                       0 ms       0 ms       0 ms       0.5 s           1 s
> Input Size / Records          0.0 B / 0  0.0 B / 0  0.0 B / 0  0.0 B / 53437   122.0 MB / 59209
> Shuffle Write Size / Records  0.0 B / 0  0.0 B / 0  0.0 B / 0  3.7 KB / 1      127.8 KB / 35
>
> Aggregated Metrics by Executor
>
> Executor ID  Address       Task Time  Total Tasks  Failed Tasks  Succeeded Tasks  Input Size / Records  Shuffle Write Size / Records
> 0            DG1322:41308  35 s       5            0             5                243.9 MB / 106948     224.3 KB / 63
> 1            DG1321:33576  48 s       5            0             5                0.0 B / 111750        7.4 KB / 2
> 2            DG1323:49348  30 s       5            0             5                0.0 B / 53595         3.7 KB / 1
> 3            DG1321:43093  12 s       5            0             5                0.0 B / 0             0.0 B / 0
> 4            DG1323:51009  34 s       6            0             6                0.0 B / 54767         98.7 KB / 28
> 5            DG1322:36787  50 s       6            0             6                0.0 B / 123977        138.9 KB / 38
>
> *** Edited for brevity - included an example of a short and long task ***
>
> Tasks
>
> Index  ID  Attempt  Status   Locality Level  Executor ID / Host  Duration  GC Time  Accumulators        Input Size / Records    Shuffle Write Size / Records
> 0      1   0        SUCCESS  NODE_LOCAL      4 / DG1323          2 s                                    0.0 B (hadoop) / 0      0.0 B / 0
> 1      2   0        SUCCESS  NODE_LOCAL      5 / DG1322          16 s      0.5 s    RecordCount: 54018  0.0 B (hadoop) / 54018  127.8 KB / 35
>
> Why are the tasks in the stage not processing an equal number of records?
> What are the empty tasks doing? How can I even out the tasks in this stage?
>
> Any insights much appreciated...
>