The idea is to partition both datasets by range. Assume your dataset A is [1,2,3,4,5,6]; you create two partitions: p1: [1,2,3] and p2: [4,5,6]. Each partition is given to a different instance of a MapPartition operator (this is a bit tricky, because you cannot use a broadcastSet. You could load the corresponding partition in the open() function, from HDFS for example).
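Very roughly, the range partitioning and the loading in open() could look like the sketch below (Scala, Flink DataSet API). Mind that this is only a sketch with a few assumptions: the split points and aPartitionPaths are things you have to provide yourself, loadSortedAPartition() is a made-up helper that would read one pre-written, range-partitioned file of A (e.g. from HDFS), and it assumes that the partition index returned by the Partitioner matches the subtask index of the mapPartition tasks.

import org.apache.flink.api.common.functions.{Partitioner, RichMapPartitionFunction}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

// Range partitioner for the toy example: keys <= 3 go to partition 0, keys > 3 to partition 1.
// The split points must come from your knowledge of the value range.
class RangePartitioner(splitPoints: Array[Int]) extends Partitioner[Int] {
  override def partition(key: Int, numPartitions: Int): Int = {
    var p = 0
    while (p < splitPoints.length && key > splitPoints(p)) p += 1
    p
  }
}

// MapPartition function that loads "its" range of dataset A in open().
// aPartitionPaths(i) is assumed to point to the pre-partitioned, sorted file
// for subtask i; loadSortedAPartition() is a hypothetical helper.
class CountingMapPartition(aPartitionPaths: Array[String])
    extends RichMapPartitionFunction[Int, (Int, Long)] {

  private var sortedA: Array[Int] = _

  override def open(parameters: Configuration): Unit = {
    val subtask = getRuntimeContext.getIndexOfThisSubtask
    sortedA = loadSortedAPartition(aPartitionPaths(subtask))
  }

  override def mapPartition(values: java.lang.Iterable[Int],
                            out: Collector[(Int, Long)]): Unit = {
    // per-element counting against sortedA (binary search + long[]) goes here;
    // a standalone sketch of that step is at the bottom of this mail
  }

  // hypothetical: read one pre-written partition file of A and return it sorted
  private def loadSortedAPartition(path: String): Array[Int] = ???
}

Wiring it up would then be something like b.partitionCustom(new RangePartitioner(Array(3)), (x: Int) => x).mapPartition(new CountingMapPartition(paths)).setParallelism(2) (untested; you also need the usual import org.apache.flink.api.scala._).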
DataSet B is partitioned in the same way, i.e., all elements <= 3 go to p1, everything > 3 goes to p2. You can partition a dataset by range using the partitionCustom() function. The partitioned dataset is given to the mapPartition operator that loaded a partition of dataset A in each task instance. You do the counting just like before (sorting the partition of dataset A, binary search, long[]), but add an additional count for the complete partition (basically count all elements that arrive in the task instance).

If you have a dataset B with 1,2,2,3,3,4,5,5,5,5,6,7, the counts for p1 would be [1:0, 2:1, 3:3, all:5] and for p2: [4:0, 5:1, 6:5, all:7]. Now you need to compute the final count by adding the "all" counts of the lower partitions to the counts of the "higher" partitions, i.e., add all:5 of p1 to all counts of p2 (a small standalone sketch of this counting and merging step is at the bottom of this mail, below the quoted messages).

This approach requires knowing the value range and the distribution of the values, which makes it a bit difficult. I guess you'll get the best performance if you partition such that you get about equally sized partitions of dataset B, with the constraint that the corresponding partitions of A fit into memory. As I said, it's a bit cumbersome. I hope you could follow my explanation. Please ask if something is not clear ;-)

2015-09-30 10:46 GMT+02:00 Pieter Hameete <phame...@gmail.com>:

> Hi Fabian,
>
> thanks for your tips!
>
> Do you have some pointers for getting started with the 'tricky range
> partitioning'? I am quite keen to get this working with large datasets ;-)
>
> Cheers,
>
> Pieter
>
> 2015-09-30 10:24 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>
>> Hi Pieter,
>>
>> cross is indeed too expensive for this task.
>>
>> If dataset A fits into memory, you can do the following: Use a
>> RichMapPartitionFunction to process dataset B and add dataset A as a
>> broadcastSet. In the open method of mapPartition, you can load the
>> broadcasted set, sort it by a.propertyX, and initialize a long[] for
>> the counts. For each element of dataset B, you do a binary search on
>> the sorted dataset A and increase all counts up to the position in the
>> sorted list. After all elements of dataset B have been processed,
>> return the counts from the long[].
>>
>> If dataset A doesn't fit into memory, things become more cumbersome
>> and we need to play some tricks with range partitioning...
>>
>> Let me know if you have questions,
>> Fabian
>>
>> 2015-09-29 16:59 GMT+02:00 Pieter Hameete <phame...@gmail.com>:
>>
>>> Good day everyone,
>>>
>>> I am looking for a good way to do the following:
>>>
>>> I have dataset A and dataset B, and for each element in dataset A I
>>> would like to filter dataset B and obtain the size of the result. In
>>> short:
>>>
>>> *for each element a in A -> B.filter( _ < a.propertyx).count*
>>>
>>> Currently I am doing a cross of datasets A and B, making tuples so I
>>> can then filter all the tuples where field2 < field1.propertya and
>>> then group by field1.id and get the sizes of the groups. However,
>>> this is not working out in practice. When the datasets get larger,
>>> some Tasks hang on the CHAIN Cross -> Filter, probably because there
>>> is insufficient memory for the cross to be completed?
>>>
>>> Does anyone have a suggestion on how I could make this work,
>>> especially with datasets that are larger than the memory available to
>>> a single Task?
>>>
>>> Thank you in advance for your time :-)
>>>
>>> Kind regards,
>>>
>>> Pieter Hameete
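To make the counting and merging concrete, here is a small self-contained sketch in plain Scala (no Flink involved) that just reproduces the arithmetic with the example numbers from above; the object and method names are made up for illustration.

object PartitionedCountSketch {

  // For each a in the sorted A-partition, count how many elements of the local
  // B-partition are < a; also return the total size of the B-partition ("all").
  def countPartition(sortedA: Array[Int], bPartition: Seq[Int]): (Array[Long], Long) = {
    val counts = new Array[Long](sortedA.length)
    var all = 0L
    for (b <- bPartition) {
      // binary search for the first a that is > b;
      // b contributes to the count of every a from that position onwards
      var lo = 0
      var hi = sortedA.length
      while (lo < hi) {
        val mid = (lo + hi) / 2
        if (sortedA(mid) <= b) lo = mid + 1 else hi = mid
      }
      var i = lo
      while (i < sortedA.length) { counts(i) += 1; i += 1 }
      all += 1
    }
    (counts, all)
  }

  def main(args: Array[String]): Unit = {
    val aP1 = Array(1, 2, 3)
    val aP2 = Array(4, 5, 6)
    val bP1 = Seq(1, 2, 2, 3, 3)         // B elements <= 3
    val bP2 = Seq(4, 5, 5, 5, 5, 6, 7)   // B elements > 3

    val (c1, all1) = countPartition(aP1, bP1) // c1 = [0, 1, 3], all1 = 5
    val (c2, _)    = countPartition(aP2, bP2) // c2 = [0, 1, 5], all  = 7

    // final counts for the higher partition: add the "all" counts of all lower partitions
    val finalC2 = c2.map(_ + all1)            // [5, 6, 10]
    println(s"p1: ${c1.mkString(",")}  p2: ${finalC2.mkString(",")}")
  }
}

One remark on the inner loop: incrementing every count above the binary-search position costs O(size of the A-partition) per element of B. If that becomes a problem, you could increment only counts(pos) and compute a running (prefix) sum over the array at the end instead.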