Hi Pieter,

a cross is indeed too expensive for this task.
If dataset A fits into memory, you can do the following: use a RichMapPartitionFunction to process dataset B and add dataset A as a broadcast set. In the open() method, load the broadcast set, sort it by a.propertyX, and initialize a long[] for the counts. For each element of dataset B, do a binary search on the sorted dataset A and increase the counts from the resulting position onward, i.e., the counts of all elements of A whose propertyX is larger than the element of B. After all elements of dataset B have been processed, emit the counts from the long[].

If dataset A doesn't fit into memory, things become more cumbersome and we need to play some tricks with range partitioning...

Let me know if you have any questions,
Fabian

2015-09-29 16:59 GMT+02:00 Pieter Hameete <phame...@gmail.com>:

> Good day everyone,
>
> I am looking for a good way to do the following:
>
> I have dataset A and dataset B, and for each element in dataset A I would
> like to filter dataset B and obtain the size of the result. In short:
>
> *for each element a in A -> B.filter( _ < a.propertyx).count*
>
> Currently I am doing a cross of dataset A and B, making tuples so I can
> then filter all the tuples where field2 < field1.propertya, and then group
> by field1.id and get the sizes of the groups. However, this is not working
> out in practice. When the datasets get larger, some tasks hang on the CHAIN
> Cross -> Filter, probably because there is insufficient memory for the cross
> to complete.
>
> Does anyone have a suggestion on how I could make this work, especially
> with datasets that are larger than the memory available to a single task?
>
> Thank you in advance for your time :-)
>
> Kind regards,
>
> Pieter Hameete
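The counting step Fabian describes could be sketched in plain Java as follows (the Flink wiring — RichMapPartitionFunction, withBroadcastSet, getBroadcastVariable — is omitted here; the class and method names FilterCountSketch, countLess, and upperBound are illustrative, not from any library). One small tweak over a literal range update: instead of increasing a whole range of counts for every element of B, the sketch records a single increment at the binary-search position and takes a running sum at the end, which yields the same counts more cheaply.

```java
import java.util.Arrays;

public class FilterCountSketch {

    // First index i in sortedA with sortedA[i] > value (upper bound).
    static int upperBound(double[] sortedA, double value) {
        int lo = 0, hi = sortedA.length;
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (sortedA[mid] <= value) lo = mid + 1; else hi = mid;
        }
        return lo;
    }

    // counts[i] = number of elements b in B with b < sortedA[i],
    // where sortedA holds the propertyX values of dataset A in sorted order.
    public static long[] countLess(double[] aPropertyX, double[] b) {
        double[] sortedA = aPropertyX.clone();
        Arrays.sort(sortedA);
        long[] counts = new long[sortedA.length];
        // For each b, record one increment at the first position whose
        // propertyX is strictly larger than b.
        for (double v : b) {
            int pos = upperBound(sortedA, v);
            if (pos < counts.length) counts[pos]++;
        }
        // The running sum turns the point increments into the final counts.
        for (int i = 1; i < counts.length; i++) counts[i] += counts[i - 1];
        return counts;
    }

    public static void main(String[] args) {
        double[] a = {1.0, 3.0, 5.0};
        double[] b = {0.0, 2.0, 2.0, 4.0, 6.0};
        System.out.println(Arrays.toString(countLess(a, b))); // [1, 3, 4]
    }
}
```

Note that the counts come back in sorted order of propertyX; in the actual Flink job you would keep the sorted A elements (or their ids) alongside the array so each count can be emitted with the element of A it belongs to.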