The more fundamental question is why groupByKey doesn't return RDD[(K, RDD[V])] instead of RDD[(K, Iterable[V])].
I wrote something like this (yet to test, and I am not sure it is even correct). I'd appreciate any suggestions/comments. (A rough sketch of the foldByKey and countApproxDistinctByKey suggestions from the thread is appended below the quoted messages.)

def groupByKeyWithRDD(partitioner: Partitioner): RDD[(K, RDD[V])] = {
  def createCombiner(v: V) = self.context.parallelize(Array(v))
  def mergeValue(buf: RDD[V], v: V) = buf ++ self.context.parallelize(Array(v))
  def mergeCombiners(c1: RDD[V], c2: RDD[V]) = c1 ++ c2

  val bufs = combineByKey[RDD[V]](
    createCombiner _, mergeValue _, mergeCombiners _, partitioner, mapSideCombine = false)
  bufs
}

--Vivek

On Mon, Jun 16, 2014 at 6:37 AM, Krishna Sankar <ksanka...@gmail.com> wrote:

> Ian,
> Yep, HLL is an appropriate mechanism. countApproxDistinctByKey is a wrapper
> around com.clearspring.analytics.stream.cardinality.HyperLogLogPlus.
> Cheers
> <k/>
>
> On Sun, Jun 15, 2014 at 4:50 PM, Ian O'Connell <i...@ianoconnell.com> wrote:
>
>> Depending on your requirements when computing distinct cardinality for
>> hourly metrics, a much more scalable method would be to use a HyperLogLog
>> data structure. A Scala implementation people have used with Spark is
>> https://github.com/twitter/algebird/blob/develop/algebird-core/src/main/scala/com/twitter/algebird/HyperLogLog.scala
>>
>> On Sun, Jun 15, 2014 at 6:16 AM, Surendranauth Hiraman <suren.hira...@velos.io> wrote:
>>
>>> Vivek,
>>>
>>> If the foldByKey solution doesn't work for you, my team uses
>>> RDD.persist(DISK_ONLY) to avoid OOM errors.
>>>
>>> It's slower, of course, and requires tuning other config parameters. It
>>> can also be a problem if you do not have enough disk space, meaning that
>>> you have to unpersist at the right points if you are running long flows.
>>>
>>> For us, even though the disk writes are a performance hit, we prefer the
>>> Spark programming model to Hadoop M/R. But we are still working on getting
>>> this to work end to end on 100s of GB of data on our 16-node cluster.
>>>
>>> Suren
>>>
>>> On Sun, Jun 15, 2014 at 12:08 AM, Vivek YS <vivek...@gmail.com> wrote:
>>>
>>>> Thanks for the input. I will give foldByKey a shot.
>>>>
>>>> The way I am doing it: the data is partitioned hourly, so I compute
>>>> distinct values hourly. Then I use union to merge the hourly results and
>>>> compute distinct over the overall data.
>>>>
>>>> > Is there a way to know which (key, value) pair is resulting in the OOM?
>>>> > Is there a way to set parallelism in the map stage so that each worker
>>>> > will process one key at a time?
>>>>
>>>> I didn't realise countApproxDistinctByKey uses HyperLogLogPlus. This
>>>> should be interesting.
>>>>
>>>> --Vivek
>>>>
>>>> On Sat, Jun 14, 2014 at 11:37 PM, Sean Owen <so...@cloudera.com> wrote:
>>>>
>>>>> Grouping by key is always problematic since a key might have a huge
>>>>> number of values. You can do a little better than grouping *all* values
>>>>> and *then* finding distinct values by using foldByKey, putting values
>>>>> into a Set. At least you end up with only distinct values in memory.
>>>>> (You don't need two maps either, right?)
>>>>>
>>>>> If the number of distinct values is still huge for some keys, consider
>>>>> the experimental method countApproxDistinctByKey:
>>>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L285
>>>>>
>>>>> This should be much more performant at the cost of some accuracy.
>>>>>
>>>>> On Sat, Jun 14, 2014 at 1:58 PM, Vivek YS <vivek...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>> For the last couple of days I have been trying hard to get around this
>>>>>> problem. Please share any insights on solving it.
>>>>>>
>>>>>> Problem:
>>>>>> There is a huge list of (key, value) pairs. I want to transform this to
>>>>>> (key, distinct values) and then eventually to (key, distinct values count).
>>>>>>
>>>>>> On a small dataset:
>>>>>>
>>>>>> groupByKey().map(x => (x._1, x._2.distinct)) ... map(x => (x._1, x._2.distinct.count))
>>>>>>
>>>>>> On the large dataset I am getting OOM.
>>>>>>
>>>>>> Is there a way to represent the Seq of values from groupByKey as an RDD
>>>>>> and then perform distinct over it?
>>>>>>
>>>>>> Thanks
>>>>>> Vivek
>>>>>
>>>>
>>>
>>> --
>>> SUREN HIRAMAN, VP TECHNOLOGY
>>> Velos
>>> Accelerating Machine Learning
>>>
>>> 440 NINTH AVENUE, 11TH FLOOR
>>> NEW YORK, NY 10001
>>> O: (917) 525-2466 ext. 105
>>> F: 646.349.4063
>>> E: suren.hira...@velos.io
>>> W: www.velos.io
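Below is a minimal, untested sketch of the two approaches suggested in the thread above (Sean's foldByKey-into-a-Set and the approximate countApproxDistinctByKey). It assumes an existing SparkContext named sc (e.g. from spark-shell); the input path and the tab-separated line format are hypothetical, just for illustration.

import org.apache.spark.rdd.RDD

// Hypothetical input: one tab-separated (key, value) pair per line.
val pairs: RDD[(String, String)] =
  sc.textFile("hdfs:///path/to/pairs").map { line =>
    val Array(k, v) = line.split("\t", 2)   // assumes every line has a tab
    (k, v)
  }

// Exact counts: fold each key's values into a Set, so only the distinct
// values (not every occurrence, as with groupByKey) are kept in memory.
val distinctCounts: RDD[(String, Int)] =
  pairs
    .mapValues(v => Set(v))
    .foldByKey(Set.empty[String])(_ ++ _)
    .mapValues(_.size)

// Approximate counts: backed by HyperLogLog++, so memory per key stays
// bounded; 0.05 is the relative standard deviation of the estimate.
val approxCounts: RDD[(String, Long)] =
  pairs.countApproxDistinctByKey(0.05)

The exact variant still holds one Set per key in memory, so a key with a very large number of distinct values can still OOM; that is where the approximate version, or Suren's persist(DISK_ONLY) suggestion, comes in.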