The more I think about this, the more I'm tempted to try this instead:

val myChunkedRDD: RDD[List[Event]] = inputRDD.mapPartitions(
  _.grouped(300).map(_.toList))

I wonder if this would work. I'll try it when I get back to work tomorrow.
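For reference, here's a minimal, Spark-free sketch of what `grouped` does to a single partition's iterator, assuming `Int` as a stand-in for the element type. `Iterator.grouped` is lazy: it materializes one chunk of up to n elements at a time, which is why only each group, not the whole partition, needs to fit in memory.

```scala
object GroupedSketch {
  // Chunk one partition's iterator into lists of at most n elements.
  // grouped() pulls elements lazily, so memory use is bounded by n.
  def chunk[T](partition: Iterator[T], n: Int): Iterator[List[T]] =
    partition.grouped(n).map(_.toList)

  def main(args: Array[String]): Unit = {
    val partition = (1 to 7).iterator        // pretend this is one RDD partition
    println(chunk(partition, 3).toList)      // List(List(1, 2, 3), List(4, 5, 6), List(7))
  }
}
```

The last group is simply shorter when the partition size isn't a multiple of n.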


Yuyhao, I tried your approach too, but it seems to be moving all the data
to a single partition (no matter what window I set), and it locks up my
jobs. I waited 15 minutes for a stage that usually takes about 15 seconds
before finally killing the job in YARN.
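As a Spark-free illustration of the "Task not serializable" failure mode above: referring to a method of a non-serializable enclosing class turns the closure into one that captures `this`, while a function val in a standalone object captures nothing. The class and function names below are hypothetical; the check uses plain Java serialization, which is what Spark's closure cleaner ultimately relies on.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

class Driver {                                       // NOT Serializable
  def myFunction(items: Seq[Int]): Seq[Int] = items.map(_ + 1)
  // Eta-expanding a method captures the Driver instance (`this`).
  val capturing: Seq[Int] => Seq[Int] = myFunction
}

object Helpers extends Serializable {
  // Defined in a singleton object; captures nothing.
  val standalone: Seq[Int] => Seq[Int] = _.map(_ + 1)
}

object SerializationSketch {
  // Returns true iff obj survives Java serialization.
  def serializable(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch { case _: NotSerializableException => false }

  def main(args: Array[String]): Unit = {
    println(serializable(new Driver().capturing))    // false: drags in Driver
    println(serializable(Helpers.standalone))        // true
  }
}
```

This is consistent with the symptom in the quoted mail: the exception names the "distant" outer class because that class is what the closure dragged in.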

On Thu, Feb 12, 2015 at 4:40 PM, Corey Nolet <cjno...@gmail.com> wrote:

> So I tried this:
>
> .mapPartitions(itr => {
>     itr.grouped(300).flatMap(items => {
>         myFunction(items)
>     })
> })
>
> and I tried this:
>
> .mapPartitions(itr => {
>     itr.grouped(300).flatMap(myFunction)
> })
>
>  I tried making myFunction a method, a function val, and even moving it
> into a singleton object.
>
> The closure cleaner throws "Task not serializable" exceptions naming a
> distant outer class whenever I do this.
>
> Just to test, I tried this:
>
> .flatMap(it => myFunction(Seq(it)))
>
> And it worked just fine. What am I doing wrong here?
>
> Also, my function is a little more complicated: it takes arguments that
> depend on the class actually manipulating the RDD. But why would it work
> fine with a single flatMap and not with mapPartitions? I'm somewhat new
> to Scala, so maybe I'm missing something here.
>
> On Wed, Feb 11, 2015 at 5:59 PM, Mark Hamstra <m...@clearstorydata.com>
> wrote:
>
>> No, only each group should need to fit.
>>
>> On Wed, Feb 11, 2015 at 2:56 PM, Corey Nolet <cjno...@gmail.com> wrote:
>>
>>> Doesn't iter still need to fit entirely into memory?
>>>
>>> On Wed, Feb 11, 2015 at 5:55 PM, Mark Hamstra <m...@clearstorydata.com>
>>> wrote:
>>>
>>>> rdd.mapPartitions { iter =>
>>>>   val grouped = iter.grouped(batchSize)
>>>>   for (group <- grouped) yield { ... }
>>>> }
>>>>
>>>> On Wed, Feb 11, 2015 at 2:44 PM, Corey Nolet <cjno...@gmail.com> wrote:
>>>>
>>>>> I think the word "partition" here is a tad different than the term
>>>>> "partition" that we use in Spark. Basically, I want something similar to
>>>>> Guava's Iterables.partition [1]. That is, if I have an RDD[People] and I
>>>>> want to run an algorithm that can be optimized by working on 30 people
>>>>> at a time, I'd like to be able to say:
>>>>>
>>>>> val rdd: RDD[People] = .....
>>>>> val partitioned: RDD[Seq[People]] = rdd.partition(30)....
>>>>>
>>>>> I also don't want any shuffling; everything can still be processed
>>>>> locally.
>>>>>
>>>>>
>>>>> [1]
>>>>> http://docs.guava-libraries.googlecode.com/git/javadoc/com/google/common/collect/Iterables.html#partition(java.lang.Iterable,%20int)
>>>>>
>>>>
>>>>
>>>
>>
>
