Hi Richard, thanks for the response. My use case is unusual: I need to process data row by row within a partition and update the rows that need changes; roughly 30% of the rows end up updated. Following the suggestions in the stackoverflow.com answer above, I refactored the code to use mapPartitionsWithIndex:
JavaRDD<Row> indexedRdd = sourceRdd.cache().mapPartitionsWithIndex(
    new Function2<Integer, Iterator<Row>, Iterator<Row>>() {
        @Override
        public Iterator<Row> call(Integer ind, Iterator<Row> rowIterator) throws Exception {
            List<Row> rowList = new ArrayList<>();
            while (rowIterator.hasNext()) {
                Row row = rowIterator.next();
                List<Object> rowAsList = iterate(JavaConversions.seqAsJavaList(row.toSeq()));
                Row updatedRow = RowFactory.create(rowAsList.toArray());
                rowList.add(updatedRow);
            }
            return rowList.iterator();
        }
    }, true)
    .union(remainingrdd)
    .coalesce(200, true);

The above code still hits memory limits, as I have 2 TB of data to
process. I use the resulting RDD to create a DataFrame, which I register
as a temp table using hiveContext, and then I execute a few INSERT INTO
partition queries via hiveContext.sql. Please help me optimize the above
code. Two untested sketches of ideas I am considering are appended after
the quoted thread below.

On Sep 9, 2015 2:55 AM, "Richard Marscher" <rmarsc...@localytics.com> wrote:

> Hi,
>
> What is the reasoning behind the use of `coalesce(1, false)`? This says
> to aggregate all data into a single partition, which must fit in memory
> on one node in the Spark cluster. If the cluster has more than one node,
> it must shuffle to move the data. It doesn't seem like the following map
> or union necessitates the coalesce, but the use case is not clear to me.
>
> On Fri, Sep 4, 2015 at 12:29 PM, unk1102 <umesh.ka...@gmail.com> wrote:
>
>> Hi, I have a Spark job which does some processing on ORC data and
>> stores the ORC data back using the DataFrameWriter save() API
>> introduced in Spark 1.4.0. I have the following piece of code, which
>> is using heavy shuffle memory. How do I optimize it? Is there anything
>> wrong with it? It works as expected, but it is slow because of GC
>> pauses, and it shuffles lots of data, so it hits memory issues. Please
>> guide me, I am new to Spark. Thanks in advance.
>>
>> JavaRDD<Row> updatedDsqlRDD = orderedFrame.toJavaRDD()
>>     .coalesce(1, false)
>>     .map(new Function<Row, Row>() {
>>         @Override
>>         public Row call(Row row) throws Exception {
>>             Row row1 = null;
>>             if (row != null) {
>>                 List<Object> rowAsList = iterate(JavaConversions.seqAsJavaList(row.toSeq()));
>>                 row1 = RowFactory.create(rowAsList.toArray());
>>             }
>>             return row1;
>>         }
>>     }).union(modifiedRDD);
>> DataFrame updatedDataFrame =
>>     hiveContext.createDataFrame(updatedDsqlRDD, renamedSourceFrame.schema());
>> updatedDataFrame.write().mode(SaveMode.Append).format("orc")
>>     .partitionBy("entity", "date").save("baseTable");
>
> --
> *Richard Marscher*
> Software Engineer
> Localytics
> Localytics.com <http://localytics.com/> | Our Blog
> <http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
> Facebook <http://facebook.com/localytics> | LinkedIn
> <http://www.linkedin.com/company/1148792?trk=tyah>
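
P.S. Here is the first of the two sketches mentioned above (untested,
reusing the same iterate() helper and imports as the snippet at the top
of this mail): instead of buffering every updated row of a partition
into an ArrayList, wrap the incoming iterator so rows are transformed
lazily, one at a time, as Spark pulls them. I have also dropped the
cache() here, since the RDD is only traversed once.

JavaRDD<Row> indexedRdd = sourceRdd.mapPartitionsWithIndex(
    new Function2<Integer, Iterator<Row>, Iterator<Row>>() {
        @Override
        public Iterator<Row> call(Integer ind, final Iterator<Row> rowIterator) {
            // Wrap the input iterator instead of draining it into a
            // list: only one Row is materialized per next() call.
            return new Iterator<Row>() {
                @Override
                public boolean hasNext() {
                    return rowIterator.hasNext();
                }

                @Override
                public Row next() {
                    Row row = rowIterator.next();
                    List<Object> rowAsList =
                        iterate(JavaConversions.seqAsJavaList(row.toSeq()));
                    return RowFactory.create(rowAsList.toArray());
                }

                @Override
                public void remove() {
                    throw new UnsupportedOperationException();
                }
            };
        }
    }, true)
    .union(remainingrdd)
    .coalesce(200, true);

Whether the coalesce(200, true) still pays for its shuffle after this
change is something I would have to measure.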
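
P.P.S. The second sketch: I am also wondering whether I can skip the
temp table and the hiveContext.sql INSERT queries entirely and write the
partitioned ORC output directly, the way my original code in the quoted
thread did, so the partition inserts happen in a single pass (again
untested; renamedSourceFrame.schema() is the same schema as before):

DataFrame updatedDataFrame =
    hiveContext.createDataFrame(indexedRdd, renamedSourceFrame.schema());
updatedDataFrame.write()
    .mode(SaveMode.Append)
    .format("orc")
    .partitionBy("entity", "date")
    .save("baseTable");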