Thank you for your detailed reply.

First, the purpose of the MyKey class is to wrap a byte[] so that it provides 
equals() and the Comparable interface.

groupByKey() is for performance.
I have to merge the byte[]s that have the same key.
If the merging is done with reduceByKey(), a lot of intermediate byte[] 
allocations and System.arraycopy() calls are executed, and it is too slow. So I 
had to resort to groupByKey(), and in the callback allocate one byte[] with the 
total size of the grouped byte[]s and arraycopy() each of them into it.
groupByKey() works for this, since the size of each group is manageable in my 
application.
In fact, it actually worked well when I implemented the same process with the 
HBase Put class.
So I assume that groupByKey() itself is not the problem.
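
For reference, the merge step looks roughly like the sketch below (simplified; 
`rdd` is the JavaPairRDD<MyKey, byte[]> shown in the quoted code further down, 
and it uses org.apache.spark.api.java.function.Function):

    // Merge all byte[]s of a key into a single array, allocated once.
    JavaPairRDD<MyKey, byte[]> merged = rdd
        .groupByKey()
        .mapValues(new Function<Iterable<byte[]>, byte[]>() {
            @Override
            public byte[] call(Iterable<byte[]> values) {
                // First pass: compute the total size so only one byte[] is allocated.
                int total = 0;
                for (byte[] v : values) {
                    total += v.length;
                }
                // Second pass: arraycopy() each value into the single result buffer.
                byte[] result = new byte[total];
                int offset = 0;
                for (byte[] v : values) {
                    System.arraycopy(v, 0, result, offset, v.length);
                    offset += v.length;
                }
                return result;
            }
        });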

WithIndex is for excluding the record that comes from the first partition.
I could remove that record after collect() and sort(), but using the index was easier.

I think the problem is that when mapPartitionsWithIndex() executes, the size of 
each partition is too big (several GB - it matches the size of the HBase 
regions, so it has to be several GB).
I could allocate more memory to each executor, but then I cannot spawn enough 
executors for the previous RDD operations.

It would be nice if:
- mapPartitionsWithIndex() loaded the partition in small chunks as the 
iterator sweeps through it, or
- there were a function such as firstRecordsOfPartitions() (a sketch of what I 
mean follows below).
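
For illustration, the shape I have in mind for firstRecordsOfPartitions() is 
roughly the following, written with the existing mapPartitionsWithIndex() 
(`merged` is the merged pair RDD from the sketch above; the function name is 
hypothetical, and this only helps if the iterator is not forced to materialize 
the whole partition):

    // Hypothetical firstRecordsOfPartitions(): emit only the first key of each
    // partition, skipping partition 0.
    // (uses org.apache.spark.api.java.function.Function2 and scala.Tuple2)
    JavaRDD<MyKey> firstKeys = merged.mapPartitionsWithIndex(
        new Function2<Integer, Iterator<Tuple2<MyKey, byte[]>>, Iterator<MyKey>>() {
            @Override
            public Iterator<MyKey> call(Integer index, Iterator<Tuple2<MyKey, byte[]>> it) {
                List<MyKey> out = new ArrayList<MyKey>();
                if (index > 0 && it.hasNext()) {
                    out.add(it.next()._1());  // only the first record of this partition
                }
                return out.iterator();
            }
        }, false);

    List<MyKey> splitKeys = firstKeys.collect();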

About Parallelism lost:
I thought that it was a possible alternative to mapPartitionsWithIndex() that 
could run with a smaller memory footprint.

About strange Spark behavior:
I don't think that Spark is malfunctioning.
I just want to know more detailed information about the execution flow - how can I check it?

Thanks.


-----Original Message-----
From: Sean Owen [mailto:so...@cloudera.com] 
Sent: Monday, September 22, 2014 5:46 PM
To: innowireless TaeYun Kim
Cc: user
Subject: Re: Bulk-load to HBase

I see a number of potential issues:

On Mon, Sep 22, 2014 at 8:42 AM, innowireless TaeYun Kim 
<taeyun....@innowireless.co.kr> wrote:
>   JavaPairRDD<MyKey, byte[]> rdd =
>           // MyKey has a byte[] member for rowkey

Two byte[] with the same contents are not equals(), so a raw byte[] will not 
work as a key the way you intend. Is there more to it? I assume so, given your comment later.
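
If MyKey is essentially a wrapper around the rowkey bytes, I would expect 
something along these lines (just a guess at its shape, not your actual class):

    // Guessed shape of a byte[] key wrapper: value equality, a matching
    // hashCode, and an unsigned lexicographic ordering.
    public class MyKey implements Comparable<MyKey>, java.io.Serializable {
        private final byte[] rowkey;

        public MyKey(byte[] rowkey) { this.rowkey = rowkey; }

        @Override
        public boolean equals(Object o) {
            return o instanceof MyKey && java.util.Arrays.equals(rowkey, ((MyKey) o).rowkey);
        }

        @Override
        public int hashCode() {
            return java.util.Arrays.hashCode(rowkey);
        }

        @Override
        public int compareTo(MyKey other) {
            // Unsigned byte-by-byte comparison, like HBase's Bytes.compareTo().
            int n = Math.min(rowkey.length, other.rowkey.length);
            for (int i = 0; i < n; i++) {
                int a = rowkey[i] & 0xff;
                int b = other.rowkey[i] & 0xff;
                if (a != b) {
                    return a - b;
                }
            }
            return rowkey.length - other.rowkey.length;
        }
    }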

>     .groupByKey()

This forces all values for each key to be held in memory when the key is 
processed. Are you sure you can't do something comparable with reduceByKey?
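
Roughly, what I have in mind is something like this (just a sketch; `rdd` is 
the pair RDD from your snippet above, using org.apache.spark.api.java.function.Function2):

    // Pairwise merge with reduceByKey(); note that each call allocates a new
    // byte[] and copies both inputs.
    JavaPairRDD<MyKey, byte[]> reduced = rdd.reduceByKey(
        new Function2<byte[], byte[], byte[]>() {
            @Override
            public byte[] call(byte[] a, byte[] b) {
                byte[] out = new byte[a.length + b.length];
                System.arraycopy(a, 0, out, 0, a.length);
                System.arraycopy(b, 0, out, a.length, b.length);
                return out;
            }
        });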


>      rdd.mapPartitionsWithIndex(...)
>           // Gets the first record of each partitions.
>           // First partition's first record is excluded, since it's not 
> needed.

You won't need "WithIndex" for that, though I doubt it matters.


> 1. OutOfMemory exception on mapPartitionsWithIndex() for splitKeys.
>
> In my case, the number of regions is fairly small for the RDD, and the size 
> of a region is big.
> This is intentional since the reasonable size of a HBase region is several GB.
> But for Spark, it is too big as a single partition to be handled by one 
> executor.
> I thought mapPartitionsWithIndex would not load the entire partition, but I 
> was wrong.
> Maybe it loaded the whole partition while I only wanted to fetch the first 
> record of the iterator.

You can give executors more memory but I think groupByKey is your problem.


> I could save all the partitions with save...() API and then load each 
> partition separately and call first().
> But it does not feel right. Parallelism is lost.

Why is that necessary?


> 2. Strange Spark behavior
>
> It is not fatal as 1, but it's strange.
> In my code, the flow is as follows:
> flatMapToPair -> groupByKey -> mapValues -> sortByKey
> But when I watch the Spark UI, it is executed as follows:
> flatMapToPair -> sortByKey -> sortByKey (again!) -> mapValues
> Since in my case the number of records is very large between flatMapToPair 
> and mapValues, it seems that Spark executes sortByKey at the worst possible time.
> I tried to trick Spark by replacing mapValues with mapToPair, but the 
> execution order did not change.
> Why?

The final operation called by a top-level method X may not be X.
Double-check that these operations are from your current run and not from an earlier one, 
and that the code you're executing is what you think it is. It is not going to 
somehow execute things in an order that is semantically different, no.
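
One concrete way to cross-check the planned flow is the lineage dump (a minimal 
sketch; `finalRdd` here stands for whatever RDD you eventually act on):

    // Print the recorded lineage before triggering the action; the output lists
    // the chain of transformations and shuffle boundaries, which can be compared
    // against what the UI reports.
    System.out.println(finalRdd.toDebugString());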

