Hi Tom,

There are a couple of things you can do here to make this more efficient.
First, I think you can replace your self-join with a groupByKey. On your
example data set, this would give you

(1, Iterable(2,3))
(4, Iterable(3))

This reduces the amount of data that needs to be shuffled, and that way you
can produce all of your pairs just from the Iterable(2,3).
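As a rough sketch (assuming your rdd is an RDD[(Int, Int)], as in your
example; names here are just illustrative), that could look like:

    // Group values by key, then emit every ordered pair of values within
    // each key -- keeping duplicates and reflections, as you want -- and
    // count how often each pair occurs across all keys.
    val coOccurrences = rdd.groupByKey().flatMap { case (_, values) =>
      for (a <- values; b <- values) yield ((a, b), 1)
    }.reduceByKey(_ + _)

This shuffles each (key, value) record once, instead of shuffling the full
joined output.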

Second, if you expect the same pairs to appear many times in your dataset,
you might first want to replace them with a count. E.g., if you start with

(1,2)
(1,2)
(1,2)
...
(1,2)
(1,3)
(1,3)
(4,3)
...

you might first want to convert that to a count of each pair

val pairCounts = rdd.map { x => (x, 1) }.reduceByKey(_ + _)

to give you something like:

((1,2), 145)
((1,3), 2)
((4,3), 982)
...

and then with a little more massaging you can group by key and also keep
the counts of each item:

val groupedCounts: RDD[(Int, Iterable[(Int, Int)])] =
  pairCounts.map { case ((key, value), counts) =>
    key -> (value, counts)
  }.groupByKey()

which would give you something like

(1, Iterable((2,145), (3,2)))
(4, Iterable((3,982)))
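From there, a possible final step (again just a sketch, with illustrative
names): within each key, the number of times a pair of values co-occurs is
the product of the two values' counts, so you can emit pairs weighted by
that product and sum globally.

    // For each key, pair up every two (value, count) entries; the pair
    // (a, b) co-occurs under this key count(a) * count(b) times, which
    // matches what the self-join-then-reduceByKey would have produced.
    val coOccurrenceCounts = groupedCounts.flatMap { case (_, valueCounts) =>
      for ((a, ca) <- valueCounts; (b, cb) <- valueCounts)
        yield ((a, b), ca.toLong * cb.toLong)
    }.reduceByKey(_ + _)

On your example this would give ((2,2), 145*145), ((2,3), 145*2), and so
on, but without ever materializing the 460-million-record joined RDD.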


Hope this helps,
Imran

On Wed, Feb 18, 2015 at 1:43 AM, Tom Walwyn <[email protected]> wrote:

> Thanks for the reply, I'll try your suggestions.
>
> Apologies, in my previous post I was mistaken. rdd is actually a PairRDD
> of (Int, Int). I'm doing the self-join so I can count two things. First, I
> can count the number of times a value appears in the data set. Second I can
> count number of times values occur with the same key. For example, if I
> have the following:
>
> (1,2)
> (1,3)
> (4,3)
>
> Then joining with itself I get:
>
> (1,(2,2)) - map - ((2,2),1) - reduceByKey - ((2,2),1)
> (1,(2,3)) - map - ((2,3),1) - reduceByKey - ((2,3),1)
> (1,(3,2)) - map - ((3,2),1) - reduceByKey - ((3,2),1)
> (1,(3,3)) - map - ((3,3),1) - reduceByKey - ((3,3),2)
> (4,(3,3)) - map - ((3,3),1) _|
>
> Note that I want to keep the duplicates (2,2) and reflections.
>
> Rgds
>
> On 18 February 2015 at 09:00, Akhil Das <[email protected]>
> wrote:
>
>> Why are you joining the rdd with itself?
>>
>> You can try these things:
>>
>> - Change the StorageLevel of both rdds to MEMORY_AND_DISK_2 or
>> MEMORY_AND_DISK_SER, so that it doesn't need to keep everything up in memory.
>>
>> - Set your default Serializer to Kryo (.set("spark.serializer",
>> "org.apache.spark.serializer.KryoSerializer"))
>>
>> - Enable rdd compression (.set("spark.rdd.compress","true"))
>>
>>
>> Thanks
>> Best Regards
>>
>> On Wed, Feb 18, 2015 at 12:21 PM, Tom Walwyn <[email protected]> wrote:
>>
>>> Hi All,
>>>
>>> I'm a new Spark (and Hadoop) user and I want to find out if the cluster
>>> resources I am using are feasible for my use-case. The following is a
>>> snippet of code that is causing a OOM exception in the executor after about
>>> 125/1000 tasks during the map stage.
>>>
>>> > val rdd2 = rdd.join(rdd, numPartitions=1000)
>>> > .map(fp=>((fp._2._1, fp._2._2), 1))
>>> > .reduceByKey((x,y)=>x+y)
>>> > rdd2.count()
>>>
>>> Which errors with a stack trace like:
>>>
>>> > 15/02/17 16:30:11 ERROR executor.Executor: Exception in task 98.0 in
>>> stage 2.0 (TID 498)
>>> > java.lang.OutOfMemoryError: GC overhead limit exceeded
>>> >         at
>>> scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:168)
>>> >         at
>>> scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:45)
>>> >         at
>>> scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
>>> >         at
>>> scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
>>> >         at scala.collection.immutable.List.foreach(List.scala:318)
>>>
>>> rdd is a PairRDD of (Int, (Int, Int)). The idea is to get the count of
>>> co-occurring values by key in the dataset, i.e. 'These two numbers occurred
>>> with the same key n times'. I intentionally don't want to filter out
>>> duplicates and reflections. rdd is about 3.6 million records, which has a
>>> size in memory of about 120MB, and results in a 'joined' RDD (before the
>>> reduceByKey stage) of around 460 million records, with a size in memory of
>>> about 35GB.
>>>
>>> My cluster setup is as follows. I have 3 nodes, where each node has 2
>>> cores and about 7.5GB of memory. I'm running Spark on YARN. The driver and
>>> executors are allowed 1280m each and the job has 5 executors and 1 driver.
>>> Additionally, I have set spark.storage.memoryFraction to 0.06, and
>>> spark.shuffle.memoryFraction to 0.65 in the hopes that this would mitigate
>>> the issue. I've also tried increasing the number of partitions after the
>>> join dramatically (up to 15000). Nothing has been effective. Thus, I'm
>>> beginning to suspect I don't have enough resources for the job.
>>>
>>> Does anyone have a feeling about what the resource requirements would be
>>> for a use-case like this? I could scale the cluster up if necessary, but
>>> would like to avoid it. I'm willing to accept longer computation times if
>>> that is an option.
>>>
>>> Warm Regards,
>>> Thomas
>>>
>>>
>>
>