"Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]]."
Obviously one of your key-value pairs is too large. You can try to increase
spark.shuffle.memoryFraction. Are you sure you can't: partition your data by
user/time-interval => process with mapPartitions => partition by user =>
process with mapPartitions? Not efficient, but if your operation decreases
the amount of data per user it may work.

2015-09-29 0:17 GMT+08:00 Fabien Martin <fabien.marti...@gmail.com>:

> You can try to reduce the number of containers in order to increase their
> memory.
>
> 2015-09-28 9:35 GMT+02:00 Akhil Das <ak...@sigmoidanalytics.com>:
>
>> You can try to increase the number of partitions to get rid of the OOM
>> errors. Also try to use reduceByKey instead of groupByKey.
>>
>> Thanks
>> Best Regards
>>
>> On Sat, Sep 26, 2015 at 1:05 AM, Elango Cheran <elango.che...@gmail.com>
>> wrote:
>>
>>> Hi everyone,
>>> I have an RDD of the format (user: String, timestamp: Long, state:
>>> Boolean). My task involves converting the states, where on/off is
>>> represented as true/false, into intervals of 'on' of the format
>>> (beginTs: Long, endTs: Long). So this task requires me, per user, to
>>> line up all of the on/off states so that I can compute when it is on,
>>> since the calculation is neither associative nor commutative.
>>>
>>> So there are 2 main operations that I'm trying to accomplish together:
>>> 1. group by each user
>>> 2. sort by time -- keep all of the states in sorted order by time
>>>
>>> The main code inside the method that does grouping by user and sorting
>>> by time looks sort of like this:
>>>
>>> // RDD starts off in format (user, ts, state) of type
>>> // RDD[(String, Long, Boolean)]
>>> val grouped = keyedStatesRDD.groupByKey
>>> // after .groupByKey, format for RDD is (user, seq-of-(ts, state)) of
>>> // type RDD[(String, Iterable[(Long, Boolean)])]
>>> // take the sequence of (ts, state) per user, sort, get intervals
>>> val groupedIntervals = grouped.mapValues(
>>>   states => {
>>>     val sortedStates = states.toSeq.sortBy(_._1)
>>>     val intervals = DFUtil.statesToIntervals(sortedStates)
>>>     val intervalsList = intervals.toIndexedSeq.sortBy(_._1)
>>>     intervalsList
>>>   }
>>> )
>>> // after .mapValues, new format for RDD is (user, seq-of-(startTime,
>>> // endTime)) of type RDD[(String, IndexedSeq[(Long, Long)])]
>>>
>>> When I run my Spark job with 1 day's worth of data, the job completes
>>> successfully. When I run with 1 month's or 1 year's worth of data, that
>>> method is where my Spark job consistently crashes with
>>> OutOfMemoryErrors. I need to run on the full year's worth of data.
>>>
>>> My suspicion is that the groupByKey is the problem (it's pulling all of
>>> the matching data values into a single executor's heap as a plain Scala
>>> Iterable). But alternatives of doing sortByKey on the RDD first before
>>> grouping, or sortByKey followed by a fold[ByKey] or aggregate[ByKey],
>>> don't quite apply in my scenario because my operation is not associative
>>> (can't combine per-partition results) and I still need to group by users
>>> before doing a foldLeft.
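One pattern that keeps both the per-user grouping and the time ordering, without asking groupByKey to materialize a whole key's values, is a secondary sort: key by (user, ts), partition by user only, sort within each partition, and then walk each partition once. A rough sketch, where the per-user interval logic (DFUtil.statesToIntervals in the code above) is passed in as a plain function and all names and signatures are illustrative:

import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

// Route records by user only, while the (user, ts) key keeps each
// partition sorted by user and then by time.
class UserPartitioner(override val numPartitions: Int) extends Partitioner {
  override def getPartition(key: Any): Int = key match {
    case (user: String, _) =>
      val h = user.hashCode % numPartitions
      if (h < 0) h + numPartitions else h
    case _ => 0
  }
}

// events: (user, ts, state) triples; statesToIntervals stands in for the
// author's DFUtil.statesToIntervals, assumed to take a time-sorted
// Seq[(Long, Boolean)] and return the 'on' intervals.
def onIntervalsPerUser(
    events: RDD[(String, Long, Boolean)],
    numPartitions: Int,
    statesToIntervals: Seq[(Long, Boolean)] => Seq[(Long, Long)]
  ): RDD[(String, Seq[(Long, Long)])] = {
  events
    .map { case (user, ts, state) => ((user, ts), state) }
    .repartitionAndSortWithinPartitions(new UserPartitioner(numPartitions))
    .mapPartitions { iter =>
      val buffered = iter.buffered
      // Records arrive sorted by (user, ts); walk them one user at a time,
      // so only a single user's states are buffered on the heap at once.
      new Iterator[(String, Seq[(Long, Long)])] {
        def hasNext: Boolean = buffered.hasNext
        def next(): (String, Seq[(Long, Long)]) = {
          val user = buffered.head._1._1
          val states = scala.collection.mutable.ArrayBuffer[(Long, Boolean)]()
          while (buffered.hasNext && buffered.head._1._1 == user) {
            val ((_, ts), state) = buffered.next()
            states += ((ts, state))
          }
          (user, statesToIntervals(states.toSeq))
        }
      }
    }
}

Each user's states are still buffered while that one user is folded, but only one user at a time per task, and the sorted shuffle data itself can spill to disk.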
>>>
>>> I've definitely thought about the issue before and have come across
>>> users with issues that are similar but not exactly the same:
>>>
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Folding-an-RDD-in-order-td16577.html
>>>
>>> http://mail-archives.us.apache.org/mod_mbox/spark-user/201501.mbox/%3ccaae1cqr8rd8ypebcmbjwfhm+lxh6nw4+r+uharx00psk_sh...@mail.gmail.com%3E
>>>
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Partition-sorting-by-Spark-framework-td18213.html
>>>
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Alternatives-to-groupByKey-td20293.html
>>>
>>> And this Jira seems relevant too:
>>> https://issues.apache.org/jira/browse/SPARK-3655
>>>
>>> The amount of memory that I'm using is 2g per executor, and I can't go
>>> higher than that because each executor runs in a YARN container, on
>>> nodes with 16 GB of RAM and at most 5 YARN containers per node.
>>>
>>> So I'd like to know if there's an easy solution to executing my logic
>>> on my full dataset in Spark.
>>>
>>> Thanks!
>>>
>>> -- Elango

-- Alexis GILLAIN
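Coming back to the two-pass idea at the top of the thread, here is a rough sketch under the assumption that the per-user computation can be split into a per-slice step that shrinks the data and a final per-user merge; both steps are hypothetical placeholders, and a composite (user, slice) key with groupByKey stands in for the explicit partition-then-mapPartitions wording of the suggestion:

import org.apache.spark.rdd.RDD

// events: (user, ts, state) triples; sliceOf maps a timestamp to a coarse
// time bucket (e.g. the month); shrinkSlice and mergeUser are hypothetical
// stand-ins for the per-slice and per-user processing steps.
def twoPassIntervals(
    events: RDD[(String, Long, Boolean)],
    sliceOf: Long => Long,
    shrinkSlice: Seq[(Long, Boolean)] => Seq[(Long, Boolean)],
    mergeUser: Seq[(Long, Boolean)] => Seq[(Long, Long)]
  ): RDD[(String, Seq[(Long, Long)])] = {

  // Pass 1: group by (user, time slice), so no single group holds a whole
  // year of one user's data, and shrink each slice.
  val perSlice: RDD[(String, Seq[(Long, Boolean)])] =
    events
      .map { case (user, ts, state) => ((user, sliceOf(ts)), (ts, state)) }
      .groupByKey()
      .map { case ((user, _), states) =>
        (user, shrinkSlice(states.toSeq.sortBy(_._1)))
      }

  // Pass 2: regroup by user alone; if shrinkSlice really reduced the data,
  // the per-user groups are now small enough to hold in memory.
  perSlice
    .groupByKey()
    .mapValues(slices => mergeUser(slices.flatten.toSeq.sortBy(_._1)))
}

As noted in the suggestion, this only helps if the first pass genuinely decreases the amount of data per user.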