Don't use groupBy; use reduceByKey instead. groupBy should be avoided where possible, as it leads to a lot of shuffle reads/writes.
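For the RDD side of it, here's a minimal sketch of the substitution (spark-shell style; the record shape and values are made up for illustration, not taken from your job):

    // Hypothetical records: (id_1, id_2, day_hour, attr1, attr2).
    val rows = sc.parallelize(Seq(
      (1, 2, "2015102218", 10L, 3L),
      (1, 2, "2015102218", 5L, 7L)))

    // reduceByKey merges values map-side before the shuffle, so only the
    // partial sums cross the network; groupByKey would ship every raw record.
    val summed = rows
      .map { case (id1, id2, hour, a1, a2) => ((id1, id2, hour), (a1, a2)) }
      .reduceByKey { case ((a1, a2), (b1, b2)) => (a1 + b1, a2 + b2) }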
On Fri, Oct 23, 2015 at 11:39 AM, pratik khadloya <tispra...@gmail.com> wrote:

> Sorry, I sent the wrong join code snippet; the actual snippet is
>
>     aggImpsDf.join(
>       aggRevenueDf,
>       aggImpsDf("id_1") <=> aggRevenueDf("id_1")
>         && aggImpsDf("id_2") <=> aggRevenueDf("id_2")
>         && aggImpsDf("day_hour") <=> aggRevenueDf("day_hour")
>         && aggImpsDf("day_hour_2") <=> aggRevenueDf("day_hour_2"),
>       "inner")
>       .select(
>         aggImpsDf("id_1"), aggImpsDf("id_2"), aggImpsDf("day_hour"),
>         aggImpsDf("day_hour_2"), aggImpsDf("metric1"),
>         aggRevenueDf("metric2"))
>       .coalesce(200)
>
> On Fri, Oct 23, 2015 at 11:16 AM pratik khadloya <tispra...@gmail.com> wrote:
>
>> Hello,
>>
>> Data about my Spark job is below. My source data is only 916 MB (stage 0)
>> and 231 MB (stage 1), but when I join the two data sets (stage 2) it takes
>> a very long time, and the shuffled data is 614 GB. Is this expected? Both
>> data sets produce 200 partitions.
>>
>> Stage Id | Description                     | Submitted           | Duration | Tasks: Succeeded/Total | Input    | Output | Shuffle Read | Shuffle Write
>> 2        | saveAsTable at Driver.scala:269 | 2015/10/22 18:48:12 | 2.3 h    | 200/200                |          |        | 614.6 GB     |
>> 1        | saveAsTable at Driver.scala:269 | 2015/10/22 18:46:02 | 2.1 min  | 8/8                    | 916.2 MB |        |              | 3.9 MB
>> 0        | saveAsTable at Driver.scala:269 | 2015/10/22 18:46:02 | 35 s     | 3/3                    | 231.2 MB |        |              | 4.8 MB
>>
>> Am running Spark 1.4.1, and my code snippet which joins the two data sets
>> is:
>>
>>     hc.sql(query).
>>       mapPartitions(iter => {
>>         iter.map {
>>           case Row(
>>             ...
>>             ...
>>             ...
>>           )
>>         }
>>       }).toDF()
>>       .groupBy("id_1", "id_2", "day_hour", "day_hour_2")
>>       .agg($"id_1", $"id_2", $"day_hour", $"day_hour_2",
>>         sum("attr1").alias("attr1"), sum("attr2").alias("attr2"))
>>
>> Please advise on how to reduce the shuffle and speed this up.
>>
>> ~Pratik
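Also, one thing worth checking on the join itself: in some 1.x releases, null-safe equality (<=>) was not recognized as an equi-join key, so a join written this way could be planned as a Cartesian product plus filter, which would explain a shuffle that large from such small inputs. You can check without running the job (the sketch below reuses the DataFrame names from your snippet):

    // Inspect the physical plan before triggering the action. If the plan
    // shows CartesianProduct instead of a hash/sort-merge join, the <=>
    // conditions are the likely source of the huge shuffle.
    aggImpsDf.join(
      aggRevenueDf,
      aggImpsDf("id_1") <=> aggRevenueDf("id_1")
        && aggImpsDf("id_2") <=> aggRevenueDf("id_2")
        && aggImpsDf("day_hour") <=> aggRevenueDf("day_hour")
        && aggImpsDf("day_hour_2") <=> aggRevenueDf("day_hour_2"),
      "inner")
      .explain()

If the join keys can never actually be null, rewriting the conditions with plain === should let the planner pick a proper equi-join.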