Hi Pieter,

cross is indeed too expensive for this task.

If dataset A fits into memory, you can do the following: process dataset B
with a RichMapPartitionFunction and add dataset A as a broadcast set. In the
open() method of the RichMapPartitionFunction, load the broadcast set, sort
it by a.propertyX, and initialize a long[] for the counts. For each element
of dataset B, do a binary search on the sorted dataset A and increment the
counts from that position to the end of the sorted list, i.e., for every
element of A whose propertyX is greater than the element of B. After all
elements of dataset B have been processed, emit the counts from the long[].
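The counting step can be sketched in plain Java, leaving out the Flink
wiring (obtaining the broadcast set via getRuntimeContext() in open(), etc.)
and assuming the propertyX values have been extracted into a long[]; the
names countSmaller, sortedA, and bValues are made up for the example:

```java
import java.util.Arrays;

public class CountSmaller {

    /**
     * sortedA: the a.propertyX values of dataset A, sorted ascending.
     * bValues: the elements of dataset B seen by this partition.
     * Returns counts where counts[i] = |{ b in bValues : b < sortedA[i] }|.
     */
    static long[] countSmaller(long[] sortedA, long[] bValues) {
        long[] counts = new long[sortedA.length];
        for (long b : bValues) {
            // Find the first index i with sortedA[i] > b.
            int pos = Arrays.binarySearch(sortedA, b);
            if (pos < 0) {
                pos = -pos - 1;  // not found: binarySearch returns -(insertion point) - 1
            } else {
                // Found an equal value: skip duplicates, since b < a must be strict.
                while (pos < sortedA.length && sortedA[pos] == b) pos++;
            }
            // Every element of A from pos onward has propertyX > b.
            for (int i = pos; i < sortedA.length; i++) counts[i]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] a = {3, 5, 8};
        long[] b = {1, 4, 5, 9};
        System.out.println(Arrays.toString(countSmaller(a, b)));  // prints [1, 2, 3]
    }
}
```

A small refinement: instead of incrementing the whole tail for each element
of B, you can increment only counts[pos] and compute a running (prefix) sum
over the array at the end, which drops the inner loop.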

If dataset A doesn't fit into memory, things become more cumbersome and we
need to play some tricks with range partitioning...

Let me know if you have questions,
Fabian

2015-09-29 16:59 GMT+02:00 Pieter Hameete <phame...@gmail.com>:

> Good day everyone,
>
> I am looking for a good way to do the following:
>
> I have datasets A and B, and for each element in dataset A I would
> like to filter dataset B and obtain the size of the result. In short:
>
> *for each element a in A -> B.filter( _ < a.propertyX).count*
>
> Currently I am doing a cross of datasets A and B, making tuples so I can
> then filter all tuples where field2 < field1.propertyX, group by
> field1.id, and get the sizes of the groups. However, this is not working
> out in practice. When the datasets get larger, some tasks hang on the CHAIN
> Cross -> Filter, probably because there is insufficient memory for the cross
> to complete?
>
> Does anyone have a suggestion on how I could make this work, especially
> with datasets that are larger than the memory available to a single task?
>
> Thank you in advance for your time :-)
>
> Kind regards,
>
> Pieter Hameete
>
