On Thu, Aug 28, 2014 at 7:00 AM, Rok Roskar <[email protected]> wrote:
> I've got an RDD where each element is a long string (a whole document). I'm 
> using pyspark so some of the handy partition-handling functions aren't 
> available, and I count the number of elements in each partition with:
>
> def count_partitions(id, iterator):
>     c = sum(1 for _ in iterator)
>     yield (id,c)
>
>> rdd.mapPartitionsWithSplit(count_partitions).collectAsMap()
>
> This returns the following:
>
> {0: 866, 1: 1158, 2: 828, 3: 876}
>
> But if I do:
>
>> rdd.repartition(8).mapPartitionsWithSplit(count_partitions).collectAsMap()
>
> I get
>
> {0: 0, 1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 3594, 7: 134}
>
> Why this strange redistribution of elements? I'm obviously misunderstanding 
> how spark does the partitioning -- is it a problem with having a list of 
> strings as an RDD?

This imbalance is introduced by BatchedSerializer. By default, Python
elements in an RDD are pickled in batches (1024 elements per batch),
so on the Scala side each partition contains only a handful of
Array[Byte] objects. repartition() shuffles those opaque batches, not
the individual rows, which is why the result looks so skewed.
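For intuition, here is a plain-Python sketch (no Spark required; the
partition sizes are taken from your output, and the round-robin
placement is only an illustration of the shuffle) of why moving whole
batches instead of rows produces the skew:

```python
# Simulate PySpark's batched pickling: with the default batchSize of
# 1024, each batch becomes a single opaque Array[Byte] on the Scala
# side, so repartition() can only move whole batches, never rows.
partition_sizes = [866, 1158, 828, 876]  # counts from the original RDD
BATCH = 1024

batches = []
for n in partition_sizes:
    while n > 0:
        batches.append(min(n, BATCH))
        n -= BATCH

# Only 5 movable "elements" remain after batching:
print(batches)   # [866, 1024, 134, 828, 876]

# However evenly the shuffle spreads 5 batches over 8 partitions
# (round-robin here, for illustration), at least 3 end up empty.
targets = [0] * 8
for i, size in enumerate(batches):
    targets[i % len(targets)] += size
print(targets)   # [866, 1024, 134, 828, 876, 0, 0, 0]
```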

To fix this, you can reduce the default batchSize to 10 (or less), or
reserialize your RDD in unbatched mode. For example:

sc = SparkContext(batchSize=10)
rdd = sc.textFile(path).repartition(8)  # path: your input file

OR

from pyspark.serializers import PickleSerializer  # unbatched serializer

rdd = rdd._reserialize(PickleSerializer()).repartition(8)

PS: _reserialize() is not a public API, so it may change in the future.

Davies

> Help very much appreciated!
>
> Thanks,
>
> Rok
>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: [email protected]
> For additional commands, e-mail: [email protected]
>
