Hello,

Alternatively, if dataset B fits in memory but dataset A doesn't, you can do it by broadcasting B to a RichMapPartitionFunction on A: in the open() method of the mapPartition, you sort B. Then, for each element of A, you do a binary search in B; the index found by the binary search (the insertion point) is the count you are looking for.

Best,
Gabor
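A minimal sketch of this broadcast-B variant with Flink's Scala DataSet API. The case class ElemA, the field names, and the sample data are made up for illustration; B is assumed to fit in memory as a DataSet[Long]:

import org.apache.flink.api.common.functions.RichMapPartitionFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

case class ElemA(id: Long, propertyX: Long)

// For each element of A, emit (id, number of elements of B < propertyX).
class CountSmaller extends RichMapPartitionFunction[ElemA, (Long, Long)] {
  private var sortedB: Array[Long] = _

  override def open(parameters: Configuration): Unit = {
    // The broadcast set is visible in full to every parallel instance.
    sortedB = getRuntimeContext.getBroadcastVariable[Long]("B")
      .asScala.toArray.sorted
  }

  override def mapPartition(as: java.lang.Iterable[ElemA],
                            out: Collector[(Long, Long)]): Unit = {
    for (a <- as.asScala) {
      // Lower-bound binary search: the insertion point equals the number
      // of elements of B strictly smaller than a.propertyX (this also
      // handles duplicates in B correctly).
      var lo = 0
      var hi = sortedB.length
      while (lo < hi) {
        val mid = (lo + hi) >>> 1
        if (sortedB(mid) < a.propertyX) lo = mid + 1 else hi = mid
      }
      out.collect((a.id, lo.toLong))
    }
  }
}

object CountSmallerJob {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val a = env.fromElements(ElemA(1, 3), ElemA(2, 6))
    val b = env.fromElements(1L, 2L, 2L, 5L)
    val counts = a.mapPartition(new CountSmaller).withBroadcastSet(b, "B")
    println(counts.collect()) // expect (1,3) and (2,4)
  }
}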
2015-09-30 11:20 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
> The idea is to partition both datasets by range.
> Assume your dataset A is [1,2,3,4,5,6]; you create two partitions, p1:
> [1,2,3] and p2: [4,5,6]. Each partition is given to a different instance
> of a MapPartition operator (this is a bit tricky, because you cannot use
> a broadcastSet. You could load the corresponding partition in the open()
> function from HDFS, for example).
>
> Dataset B is partitioned in the same way, i.e., all elements <= 3 go to
> partition 1, everything > 3 goes to p2. You can partition a dataset by
> range using the partitionCustom() function (see the sketch at the bottom
> of this thread). The partitioned dataset is given to the mapPartition
> operator that loaded a partition of dataset A in each task instance.
> You do the counting just like before (sorting the partition of dataset A,
> binary search, long[]), but add an additional count for the complete
> partition (basically count all elements that arrive in the task instance).
>
> If you have a dataset B with 1,2,2,3,3,4,5,5,5,5,6,7, the counts for p1
> would be [1:0, 2:1, 3:3, all:5] and for p2: [4:0, 5:1, 6:5, all:7].
> Now you need to compute the final count by adding the "all" counts of the
> lower partitions to the counts of the higher partitions, i.e., add the
> all:5 of p1 to all counts of p2.
>
> This approach requires knowing the value range and distribution of the
> values, which makes it a bit difficult. I guess you'll get the best
> performance if you partition in a way that gives you about equally sized
> partitions of dataset B, with the constraint that the corresponding
> partitions of A fit into memory.
>
> As I said, it's a bit cumbersome. I hope you could follow my explanation.
> Please ask if something is not clear ;-)
>
> 2015-09-30 10:46 GMT+02:00 Pieter Hameete <phame...@gmail.com>:
>>
>> Hi Fabian,
>>
>> thanks for your tips!
>>
>> Do you have some pointers for getting started with the 'tricky range
>> partitioning'? I am quite keen to get this working with large datasets ;-)
>>
>> Cheers,
>>
>> Pieter
>>
>> 2015-09-30 10:24 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>>>
>>> Hi Pieter,
>>>
>>> cross is indeed too expensive for this task.
>>>
>>> If dataset A fits into memory, you can do the following: use a
>>> RichMapPartitionFunction to process dataset B and add dataset A as a
>>> broadcastSet. In the open() method of mapPartition, you can load the
>>> broadcast set, sort it by a.propertyX, and initialize a long[] for the
>>> counts. For each element of dataset B, you do a binary search on the
>>> sorted dataset A and increase all counts up to the position in the
>>> sorted list. After all elements of dataset B have been processed,
>>> return the counts from the long[].
>>>
>>> If dataset A doesn't fit into memory, things become more cumbersome and
>>> we need to play some tricks with range partitioning...
>>>
>>> Let me know if you have questions,
>>> Fabian
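A sketch of this broadcast-A variant, reusing the hypothetical ElemA case class from the sketch above. One deviation from the literal description: instead of increasing all counts up to the found position for every element of B (linear work per element), it marks only the insertion point and converts the marks into counts with a single prefix-sum pass; since B is processed by parallel instances, the partial counts are summed by id afterwards:

import org.apache.flink.api.common.functions.RichMapPartitionFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

// Emits (a.id, count of b in this partition of B with b < a.propertyX).
class CountPerA extends RichMapPartitionFunction[Long, (Long, Long)] {
  private var sortedA: Array[ElemA] = _ // sorted by propertyX

  override def open(parameters: Configuration): Unit = {
    sortedA = getRuntimeContext.getBroadcastVariable[ElemA]("A")
      .asScala.toArray.sortBy(_.propertyX)
  }

  override def mapPartition(bs: java.lang.Iterable[Long],
                            out: Collector[(Long, Long)]): Unit = {
    // marks(i) = number of elements of B whose upper bound in sortedA is i
    val marks = new Array[Long](sortedA.length + 1)
    for (b <- bs.asScala) {
      // Upper-bound binary search: first index with propertyX > b.
      var lo = 0
      var hi = sortedA.length
      while (lo < hi) {
        val mid = (lo + hi) >>> 1
        if (sortedA(mid).propertyX <= b) lo = mid + 1 else hi = mid
      }
      marks(lo) += 1 // b is smaller than every propertyX from index lo on
    }
    // Prefix sums turn the marks into per-element partial counts.
    var running = 0L
    for (i <- sortedA.indices) {
      running += marks(i)
      out.collect((sortedA(i).id, running))
    }
  }
}

// Usage (A must fit in memory, B can be arbitrarily large):
//   val counts = b.mapPartition(new CountPerA)
//     .withBroadcastSet(a, "A")
//     .groupBy(0).sum(1)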
>>> 2015-09-29 16:59 GMT+02:00 Pieter Hameete <phame...@gmail.com>:
>>>>
>>>> Good day everyone,
>>>>
>>>> I am looking for a good way to do the following:
>>>>
>>>> I have dataset A and dataset B, and for each element in dataset A I
>>>> would like to filter dataset B and obtain the size of the result. To
>>>> say it short:
>>>>
>>>> for each element a in A -> B.filter( _ < a.propertyX ).count
>>>>
>>>> Currently I am doing a cross of datasets A and B, making tuples so I
>>>> can then filter all the tuples where field2 < field1.propertyX, and
>>>> then group by field1.id to get the sizes of the groups. However, this
>>>> is not working out in practice: when the datasets get larger, some
>>>> tasks hang on the CHAIN Cross -> Filter, probably because there is
>>>> insufficient memory for the cross to be completed.
>>>>
>>>> Does anyone have a suggestion on how I could make this work, especially
>>>> with datasets that are larger than the memory available to a single
>>>> task?
>>>>
>>>> Thank you in advance for your time :-)
>>>>
>>>> Kind regards,
>>>>
>>>> Pieter Hameete
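For the "tricky range partitioning", a minimal sketch of the partitionCustom() step from Fabian's second mail. The split points are hypothetical; in practice they have to be derived from the known value distribution so that each partition of A fits into the memory of one task:

import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.api.scala._

// Routes values to range partitions: p0 gets values <= 3, p1 gets values > 3,
// matching the [1,2,3] / [4,5,6] example above.
class RangePartitioner extends Partitioner[Long] {
  private val splitPoints = Array(3L) // hypothetical, from the value distribution

  override def partition(key: Long, numPartitions: Int): Int = {
    var p = 0
    while (p < splitPoints.length && key > splitPoints(p)) p += 1
    p
  }
}

// Send each element of B to the task that holds the matching slice of A:
//   val partitionedB = b.partitionCustom(new RangePartitioner, x => x)
//
// Each mapPartition task can then identify its slice of A via
// getRuntimeContext.getIndexOfThisSubtask, load it in open() (e.g. from HDFS),
// count as in the broadcast variant, and additionally emit the "all" count of
// its partition for the final cross-partition correction.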