Hi Richard, thanks for the response. My use case is weird I need to process
data row by row for one partition and update required rows. Updated rows
percentage would be 30%. As per above stackoverflow.com answer suggestions
I refactored code to use mappartitionswithindex

JavaRDD<Row> indexedRdd = sourceRdd.cache().mapPartitionsWithIndex(new
Function2<Integer, Iterator<Row>, Iterator<Row>>() { @Override public
Iterator<Row> call(Integer ind, Iterator<Row> rowIterator) throws Exception
{ List<Row> rowList = new ArrayList<>(); while (rowIterator.hasNext()) {
Row row = rowIterator.next(); List<Object> rowAsList =
iterate(JavaConversions.seqAsJavaList(row.toSeq())); Row updatedRow =
RowFactory.create(rowAsList.toArray()); rowList.add(updatedRow); } return
rowList.iterator(); } }, true). union(remainingrdd).coalesce(200,true);

Above code still hits memory limits as I have 2 tb data to process and
above resulted rdd I use to create DataFrame which again I use it to
register as temp table using hiveContext and execute few insert into
partitions query using hiveContext.sql

Please help me optimize above code.
On Sep 9, 2015 2:55 AM, "Richard Marscher" <rmarsc...@localytics.com> wrote:

> Hi,
>
> what is the reasoning behind the use of `coalesce(1,false)`? This is
> saying to aggregate all data into a single partition, which must fit in
> memory on one node in the Spark cluster. If the cluster has more than one
> node it must shuffle to move the data. It doesn't seem like the following
> map or union necessitate coalesce, but the use case is not clear to me.
>
> On Fri, Sep 4, 2015 at 12:29 PM, unk1102 <umesh.ka...@gmail.com> wrote:
>
>> Hi I have Spark job which does some processing on ORC data and stores back
>> ORC data using DataFrameWriter save() API introduced in Spark 1.4.0. I
>> have
>> the following piece of code which is using heavy shuffle memory. How do I
>> optimize below code? Is there anything wrong with it? It is working fine
>> as
>> expected only causing slowness because of GC pause and shuffles lots of
>> data
>> so hitting memory issues. Please guide I am new to Spark. Thanks in
>> advance.
>>
>> JavaRDD<Row> updatedDsqlRDD = orderedFrame.toJavaRDD().coalesce(1,
>> false).map(new Function<Row, Row>() {
>>    @Override
>>    public Row call(Row row) throws Exception {
>>         List rowAsList;
>>         Row row1 = null;
>>         if (row != null) {
>>           rowAsList = iterate(JavaConversions.seqAsJavaList(row.toSeq()));
>>           row1 = RowFactory.create(rowAsList.toArray());
>>         }
>>         return row1;
>>    }
>> }).union(modifiedRDD);
>> DataFrame updatedDataFrame =
>> hiveContext.createDataFrame(updatedDsqlRDD,renamedSourceFrame.schema());
>>
>> updatedDataFrame.write().mode(SaveMode.Append).format("orc").partitionBy("entity",
>> "date").save("baseTable");
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Why-is-huge-data-shuffling-in-Spark-when-using-union-coalesce-1-false-on-DataFrame-tp24581.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>
> --
> *Richard Marscher*
> Software Engineer
> Localytics
> Localytics.com <http://localytics.com/> | Our Blog
> <http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
> Facebook <http://facebook.com/localytics> | LinkedIn
> <http://www.linkedin.com/company/1148792?trk=tyah>
>

Reply via email to