You can try increasing the number of partitions to get rid of the OOM errors. Also try using reduceByKey instead of groupByKey.
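
For example, a rough, untested sketch (the 2000 partition count is only a placeholder to tune, and keyedStatesRDD is assumed to be the keyed RDD[(String, (Long, Boolean))] from your snippet):

// Ask for more, smaller partitions so each task holds fewer groups at once.
val grouped = keyedStatesRDD.groupByKey(2000)

// reduceByKey merges values map-side before the shuffle, but it only applies
// when the per-user combine function is associative -- e.g. counting events:
val eventCounts = keyedStatesRDD.mapValues(_ => 1L).reduceByKey(_ + _, 2000)

You can also raise spark.default.parallelism instead of hard-coding a partition count everywhere.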
Thanks
Best Regards

On Sat, Sep 26, 2015 at 1:05 AM, Elango Cheran <elango.che...@gmail.com> wrote:
> Hi everyone,
> I have an RDD of the format (user: String, timestamp: Long, state: Boolean).
> My task involves converting the states, where on/off is represented as
> true/false, into intervals of 'on' of the format (beginTs: Long, endTs: Long).
> So this task requires me, per user, to line up all of the on/off states so
> that I can compute when it is on, since the calculation is neither
> associative nor commutative.
>
> So there are 2 main operations that I'm trying to accomplish together:
> 1. group by each user
> 2. sort by time -- keep all of the states in sorted order by time
>
> The main code inside the method that does grouping by user and sorting by
> time looks sort of like this:
>
> // RDD starts off in format (user, ts, state) of type RDD[(String, Long, Boolean)]
> val grouped = keyedStatesRDD.groupByKey
> // after .groupByKey, format for RDD is (user, seq-of-(ts, state)) of type
> // RDD[(String, Iterable(Long, Boolean))]
> // take the sequence of (ts, state) per user, sort, get intervals
> val groupedIntervals = grouped.mapValues(
>   states => {
>     val sortedStates = states.toSeq.sortBy(_._1)
>     val intervals = DFUtil.statesToIntervals(sortedStates)
>     val intervalsList = bucketDurations.map{ case (k, v) =>
>       (k, v)
>     }(collection.breakOut).sortBy(_._1)
>     intervalsList
>   }
> )
> // after .mapValues, new format for RDD is (user, seq-of-(startTime, endTime))
> // of type RDD[(String, IndexedSeq(Long, Long))]
>
> When I run my Spark job with 1 day's worth of data, the job completes
> successfully. When I run with 1 month's or 1 year's worth of data, that
> method is where my Spark job consistently crashes with OutOfMemoryErrors.
> I need to run on the full year's worth of data.
>
> My suspicion is that the groupByKey is the problem (it's pulling all of the
> matching data values into a single executor's heap as a plain Scala
> Iterable). But alternatives of doing sortByKey on the RDD first before
> grouping, or sortByKey followed by a fold[ByKey] or aggregate[ByKey], don't
> quite apply in my scenario because my operation is not associative (I can't
> combine per-partition results) and I still need to group by user before
> doing a foldLeft.
>
> I've definitely thought about the issue before and come across users with
> issues that are similar but not exactly the same:
> http://apache-spark-user-list.1001560.n3.nabble.com/Folding-an-RDD-in-order-td16577.html
> http://mail-archives.us.apache.org/mod_mbox/spark-user/201501.mbox/%3ccaae1cqr8rd8ypebcmbjwfhm+lxh6nw4+r+uharx00psk_sh...@mail.gmail.com%3E
> http://apache-spark-user-list.1001560.n3.nabble.com/Partition-sorting-by-Spark-framework-td18213.html
> http://apache-spark-user-list.1001560.n3.nabble.com/Alternatives-to-groupByKey-td20293.html
>
> And this Jira seems relevant too:
> https://issues.apache.org/jira/browse/SPARK-3655
>
> The amount of memory that I'm using is 2g per executor, and I can't go
> higher than that because each executor gets a YARN container from nodes
> with 16 GB of RAM and 5 YARN containers allowed per node.
>
> So I'd like to know if there's an easy solution to executing my logic on
> my full dataset in Spark.
>
> Thanks!
>
> -- Elango
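
P.S. Since your combine isn't associative, reduceByKey may not fit directly; another option worth a look is a secondary sort in the spirit of SPARK-3655. Below is a rough, untested sketch (UserPartitioner and toSortedRuns are just illustrative names I made up): partition on the user part of a composite (user, ts) key and let Spark sort within each partition, so each user's states arrive contiguously and in time order without groupByKey pulling them into one Iterable.

import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

// Route records by the user part of the (user, ts) key so all of a user's
// records land in the same partition.
class UserPartitioner(override val numPartitions: Int) extends Partitioner {
  def getPartition(key: Any): Int = key match {
    case (user: String, _) =>
      val h = user.hashCode % numPartitions
      if (h < 0) h + numPartitions else h
  }
}

def toSortedRuns(events: RDD[(String, Long, Boolean)],
                 numPartitions: Int): RDD[((String, Long), Boolean)] = {
  // Composite key (user, ts); the implicit tuple Ordering sorts by user, then ts.
  val keyed = events.map { case (user, ts, state) => ((user, ts), state) }
  keyed.repartitionAndSortWithinPartitions(new UserPartitioner(numPartitions))
}

From there, a mapPartitions can stream through each sorted partition, detect user boundaries, and emit (beginTs, endTs) intervals with your existing statesToIntervals-style logic, without materializing a whole user's history as a single in-memory collection.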