The more I think about this, the more I want to try this instead:

    val myChunkedRDD: RDD[List[Event]] =
      inputRDD.mapPartitions(_.grouped(300).map(_.toList))

I wonder if this would work. I'll try it when I get back to work tomorrow.
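[Editor's note: a minimal, self-contained sketch of that line. The original ended in .toList on the whole grouped iterator, which would not typecheck against mapPartitions (it expects an Iterator back) and would buffer the entire partition; mapping each group to a List keeps the declared element type while staying lazy. The Event case class and local-mode setup are made up for illustration:]

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}

    // Hypothetical event type; stands in for whatever the real job processes.
    case class Event(id: Long, payload: String)

    object ChunkedRDDExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("chunked-rdd").setMaster("local[2]"))

        val inputRDD: RDD[Event] =
          sc.parallelize((1L to 1000L).map(i => Event(i, s"event-$i")))

        // grouped(300) turns each partition's Iterator[Event] into a lazy
        // Iterator[Seq[Event]], so only one 300-element group is buffered
        // at a time; map(_.toList) just pins the element type to List.
        val myChunkedRDD: RDD[List[Event]] =
          inputRDD.mapPartitions(_.grouped(300).map(_.toList))

        // With 2 local partitions of 500 events each, expect group sizes
        // like 300, 200, 300, 200.
        println(myChunkedRDD.map(_.size).collect().mkString(", "))
        sc.stop()
      }
    }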
Yuyhao, I tried your approach too, but it seems to move all of the data to a
single partition (no matter what window I set), and it seems to lock up my
jobs. I waited 15 minutes for a stage that usually takes about 15 seconds,
then finally killed the job in YARN.

On Thu, Feb 12, 2015 at 4:40 PM, Corey Nolet <cjno...@gmail.com> wrote:

> So I tried this:
>
>     .mapPartitions(itr => {
>       itr.grouped(300).flatMap(items => {
>         myFunction(items)
>       })
>     })
>
> and I tried this:
>
>     .mapPartitions(itr => {
>       itr.grouped(300).flatMap(myFunction)
>     })
>
> I tried making myFunction a method, a function val, and even moving it into
> a singleton object.
>
> The closure cleaner throws "Task not serializable" exceptions about a
> distant outer class whenever I do this.
>
> Just to test, I tried this:
>
>     .flatMap(it => myFunction(Seq(it)))
>
> and it worked just fine. What am I doing wrong here?
>
> Also, my function is a little more complicated, and it does take arguments
> that depend on the class actually manipulating the RDD. But why would it
> work fine with a single flatMap and not with mapPartitions? I am somewhat
> new to Scala, and maybe I'm missing something here.
>
> On Wed, Feb 11, 2015 at 5:59 PM, Mark Hamstra <m...@clearstorydata.com>
> wrote:
>
>> No, only each group should need to fit.
>>
>> On Wed, Feb 11, 2015 at 2:56 PM, Corey Nolet <cjno...@gmail.com> wrote:
>>
>>> Doesn't iter still need to fit entirely into memory?
>>>
>>> On Wed, Feb 11, 2015 at 5:55 PM, Mark Hamstra <m...@clearstorydata.com>
>>> wrote:
>>>
>>>>     rdd.mapPartitions { iter =>
>>>>       val grouped = iter.grouped(batchSize)
>>>>       for (group <- grouped) { ... }
>>>>     }
>>>>
>>>> On Wed, Feb 11, 2015 at 2:44 PM, Corey Nolet <cjno...@gmail.com> wrote:
>>>>
>>>>> I think the word "partition" here is a tad different from the term
>>>>> "partition" that we use in Spark. Basically, I want something similar
>>>>> to Guava's Iterables.partition [1]. That is, if I have an RDD[People]
>>>>> and I want to run an algorithm that can be optimized by working on 30
>>>>> people at a time, I'd like to be able to say:
>>>>>
>>>>>     val rdd: RDD[People] = .....
>>>>>     val partitioned: RDD[Seq[People]] = rdd.partition(30)....
>>>>>
>>>>> I also don't want any shuffling; everything can still be processed
>>>>> locally.
>>>>>
>>>>> [1]
>>>>> http://docs.guava-libraries.googlecode.com/git/javadoc/com/google/common/collect/Iterables.html#partition(java.lang.Iterable,%20int)
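[Editor's note: a common way out of the "Task not serializable" trap described
above is to copy whatever the function needs into local vals before the
mapPartitions call, so the closure captures only those values rather than the
enclosing, non-serializable class. The sketch below illustrates that pattern;
EventJob, threshold, and this version of myFunction are hypothetical
stand-ins for the real job's class and arguments, and Event is the same
made-up type as in the first sketch:]

    import org.apache.spark.rdd.RDD

    // Hypothetical event type, as in the first sketch.
    case class Event(id: Long, payload: String)

    // Hypothetical driver-side class; assume it is NOT serializable (e.g.
    // it holds a SparkContext or other non-serializable state).
    class EventJob(threshold: Int) {

      // As a method, myFunction belongs to EventJob, so passing it to an
      // RDD operation (e.g. rdd.flatMap(myFunction)) captures `this` via
      // eta-expansion and trips the closure cleaner.
      def myFunction(events: Seq[Event]): Seq[Event] =
        events.filter(_.id > threshold)

      def run(inputRDD: RDD[Event]): RDD[Event] = {
        // Copy the dependency into a local val. The function value below
        // then captures only an Int and is itself serializable; the
        // EventJob instance never has to be shipped to the executors.
        val localThreshold = threshold
        val f: Seq[Event] => Seq[Event] = _.filter(_.id > localThreshold)

        inputRDD.mapPartitions(_.grouped(300).flatMap(f))
      }
    }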