"Note: As currently implemented, groupByKey must be able to hold all the
key-value pairs for any key in memory. If a key has too many values, it can
result in an [[OutOfMemoryError]]."

Obviously one of your key-value pairs is too large. You can try to increase
spark.shuffle.memoryFraction.
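
For example (a minimal sketch, assuming Spark 1.x legacy memory management;
the 0.4/0.4 split is only illustrative):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.shuffle.memoryFraction", "0.4")  // default 0.2; more room for shuffle buffers
  .set("spark.storage.memoryFraction", "0.4")  // default 0.6; shrink the cache share to compensate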

Are you sure you can't do this:
partition your data by user/time-interval => process with mapPartitions =>
partition by user => process with mapPartitions
It's not efficient, but if your operation decreases the amount of data per
user it may work.
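
A rough sketch of that two-stage idea (statesRDD, the hour-sized bucket and
the partition counts below are assumptions, not names from your job):

import org.apache.spark.HashPartitioner

// Stage 1: key by (user, hour bucket) so no single task holds a whole user
val stage1 = statesRDD
  .map { case (user, ts, state) => ((user, ts / 3600000L), (ts, state)) }  // ms -> hour bucket
  .partitionBy(new HashPartitioner(400))
  .mapPartitions { iter =>
    iter  // placeholder pass-through: condense each (user, bucket) slice here
  }

// Stage 2: re-key by user alone and finish the per-user computation
val stage2 = stage1
  .map { case ((user, _), rec) => (user, rec) }
  .partitionBy(new HashPartitioner(400))
  .mapPartitions { iter =>
    iter  // placeholder pass-through: per-user processing goes here
  }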


2015-09-29 0:17 GMT+08:00 Fabien Martin <fabien.marti...@gmail.com>:

> You can try to reduce the number of containers in order to increase their
> memory.
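>
> For instance, in SparkConf form (the figures below are only an
> illustration for a 16 GB node, not a recommendation;
> spark.executor.instances applies in YARN mode):
>
> import org.apache.spark.SparkConf
>
> val conf = new SparkConf()
>   .set("spark.executor.instances", "12")  // fewer executors overall...
>   .set("spark.executor.memory", "4g")     // ...but each one gets a larger heap
>   .set("spark.executor.cores", "4")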
>
> 2015-09-28 9:35 GMT+02:00 Akhil Das <ak...@sigmoidanalytics.com>:
>
>> You can try to increase the number of partitions to get rid of the OOM
>> errors. Also try to use reduceByKey instead of groupByKey.
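>>
>> For example (a minimal sketch; the partition count is a guess, and the
>> reduceByKey line only applies when the per-key combine step is
>> associative, e.g. a count):
>>
>> // spread the shuffle across more, smaller partitions
>> val grouped = keyedStatesRDD.groupByKey(2000)
>> // reduceByKey combines values map-side instead of shipping every record
>> val perKeyCounts = keyedStatesRDD.mapValues(_ => 1L).reduceByKey(_ + _)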
>>
>> Thanks
>> Best Regards
>>
>> On Sat, Sep 26, 2015 at 1:05 AM, Elango Cheran <elango.che...@gmail.com>
>> wrote:
>>
>>> Hi everyone,
>>> I have an RDD of the format (user: String, timestamp: Long, state:
>>> Boolean).  My task involves converting the states, where on/off is
>>> represented as true/false, into intervals of 'on' of the format (beginTs:
>>> Long, endTs: Long).  So this task requires me, per user, to line up all of
>>> the on/off states so that I can compute when it is on, since the
>>> calculation is neither associative nor commutative.
>>>
>>> So there are 2 main operations that I'm trying to accomplish together:
>>> 1. group by each user
>>> 2. sort by time -- keep all of the states in sorted order by time
>>>
>>> The main code inside the method that does grouping by user and sorting
>>> by time looks sort of like this:
>>>
>>>
>>> // RDD starts off in format (user, ts, state) of type RDD[(String, Long, Boolean)]
>>> val grouped = keyedStatesRDD.groupByKey
>>> // after .groupByKey, the RDD format is (user, seq-of-(ts, state)) of type
>>> // RDD[(String, Iterable[(Long, Boolean)])]
>>> // take the sequence of (ts, state) per user, sort, get intervals
>>> val groupedIntervals = grouped.mapValues(
>>>   states => {
>>>     val sortedStates = states.toSeq.sortBy(_._1)
>>>     val intervals = DFUtil.statesToIntervals(sortedStates)
>>>     val intervalsList = intervals.toIndexedSeq.sortBy(_._1)
>>>     intervalsList
>>>   }
>>> )
>>> // after .mapValues, the new RDD format is (user, seq-of-(startTime, endTime)) of type
>>> // RDD[(String, IndexedSeq[(Long, Long)])]
>>>
>>>
>>> When I run my Spark job with 1 day's worth of data, the job completes
>>> successfully.  When I run with 1 month's or 1 year's worth of data, that
>>> method is where my Spark job consistently crashes with
>>> OutOfMemoryErrors.  I need to run on the full year's worth of data.
>>>
>>> My suspicion is that the groupByKey is the problem (it's pulling all of
>>> the matching data values into a single executor's heap as a plain Scala
>>> Iterable).  But alternatives of doing sortByKey on the RDD first before
>>> grouping, or sortByKey followed by a fold[ByKey] or aggregate[ByKey] don't
>>> quite apply in my scenario because my operation is not associative (can't
>>> combine per-partition results) and I still need to group by users before
>>> doing a foldLeft.
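>>>
>>> For concreteness, the sort-based route I mean looks roughly like the
>>> sketch below (statesRDD and the partition count are made up); it keeps
>>> each user's records contiguous and time-ordered within a partition, but
>>> the non-associative fold still has to run per user inside mapPartitions:
>>>
>>> import org.apache.spark.{HashPartitioner, Partitioner}
>>>
>>> // partition on the user only, while sorting within partitions by (user, ts)
>>> class UserPartitioner(n: Int) extends Partitioner {
>>>   private val hash = new HashPartitioner(n)
>>>   def numPartitions: Int = n
>>>   def getPartition(key: Any): Int = key match {
>>>     case (user: String, _) => hash.getPartition(user)
>>>   }
>>> }
>>>
>>> val byUserTime = statesRDD
>>>   .map { case (user, ts, state) => ((user, ts), state) }
>>>   .repartitionAndSortWithinPartitions(new UserPartitioner(200))
>>> // each partition now holds whole users, each user's records ordered by ts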
>>>
>>> I've definitely thought about the issue before and come across users
>>> with issues that are similar but not exactly the same:
>>>
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Folding-an-RDD-in-order-td16577.html
>>>
>>> http://mail-archives.us.apache.org/mod_mbox/spark-user/201501.mbox/%3ccaae1cqr8rd8ypebcmbjwfhm+lxh6nw4+r+uharx00psk_sh...@mail.gmail.com%3E
>>>
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Partition-sorting-by-Spark-framework-td18213.html
>>>
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Alternatives-to-groupByKey-td20293.html
>>>
>>> And this Jira seems relevant too:
>>> https://issues.apache.org/jira/browse/SPARK-3655
>>>
>>> The amount of memory that I'm using is 2g per executor, and I can't go
>>> higher than that because each executor gets a YARN container from nodes
>>> with 16 GB of RAM and 5 YARN containers allowed per node.
>>>
>>> So I'd like to know if there's an easy solution to executing my logic on
>>> my full dataset in Spark.
>>>
>>> Thanks!
>>>
>>> -- Elango
>>>
>>
>>
>


-- 
Alexis GILLAIN
