Hi Fabian,

I have a question regarding the first approach. Is there a benefit to choosing a RichMapPartitionFunction over a RichMapFunction in this case? I assume that each broadcast dataset is sent only once to each task manager?

If I were to broadcast dataset B, then in a map operation I could, for each element a in A, count the number of elements in B that are smaller than a and output a tuple. This would also save me a step in aggregating the results?

Kind regards,

Pieter
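A minimal sketch of this broadcast-B variant with a plain RichMapFunction, assuming the Scala DataSet API and made-up element types (A as (id, propertyx) tuples, B as plain Doubles), might look like:

    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.api.scala._
    import org.apache.flink.configuration.Configuration

    import scala.collection.JavaConverters._

    // Assumed element types: A is (id, propertyx), B is a plain Double.
    class CountSmallerThan extends RichMapFunction[(Long, Double), (Long, Int)] {

      private var sortedB: Array[Double] = _

      override def open(parameters: Configuration): Unit = {
        // The broadcast set is shipped once per task manager and cached there.
        sortedB = getRuntimeContext
          .getBroadcastVariable[Double]("B")
          .asScala
          .toArray
          .sorted
      }

      override def map(a: (Long, Double)): (Long, Int) = {
        // Binary search for the first element of B that is >= a.propertyx;
        // its index is the number of elements of B smaller than a.propertyx.
        var lo = 0
        var hi = sortedB.length
        while (lo < hi) {
          val mid = (lo + hi) >>> 1
          if (sortedB(mid) < a._2) lo = mid + 1 else hi = mid
        }
        (a._1, lo)
      }
    }

    // Usage: one result tuple per element of A, no extra aggregation step.
    // val counts = dataA.map(new CountSmallerThan).withBroadcastSet(dataB, "B")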
2015-09-30 12:44 GMT+02:00 Pieter Hameete <phame...@gmail.com>:
> Hi Gabor, Fabian,
>
> thank you for your suggestions. I am intending to scale up so that I'm sure that both A and B won't fit in memory. I'll see if I can come up with a nice way to partition the datasets, but if that takes too much time I'll just have to accept that it won't work on large datasets. I'll let you know if I manage to work something out, but I won't work on it until the weekend :-)
>
> Cheers again,
>
> Pieter
>
> 2015-09-30 12:28 GMT+02:00 Gábor Gévay <gga...@gmail.com>:
>> Hello,
>>
>> Alternatively, if dataset B fits in memory but dataset A doesn't, then you can do it by broadcasting B to a RichMapPartitionFunction on A: in the open method of mapPartition, you sort B. Then, for each element of A, you do a binary search in B and look at the index found by the binary search, which will be the count that you are looking for.
>>
>> Best,
>> Gabor
>>
>> 2015-09-30 11:20 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>>> The idea is to partition both datasets by range.
>>> Assume your dataset A is [1,2,3,4,5,6]; you create two partitions, p1: [1,2,3] and p2: [4,5,6]. Each partition is given to a different instance of a MapPartition operator (this is a bit tricky because you cannot use a broadcastSet; you could load the corresponding partition in the open() function from HDFS, for example).
>>>
>>> Dataset B is partitioned in the same way, i.e., all elements <= 3 go to partition 1 and everything > 3 goes to p2. You can partition a dataset by range using the partitionCustom() function. The partitioned dataset is given to the mapPartition operator that loaded a partition of dataset A in each task instance.
>>> You do the counting just like before (sorting the partition of dataset A, binary search, long[]), but add an additional count for the complete partition (basically count all elements that arrive in the task instance).
>>>
>>> If you have a dataset B with 1,2,2,3,3,4,5,5,5,5,6,7, the counts for p1 would be [1:0, 2:1, 3:3, all:5] and for p2 [4:0, 5:1, 6:5, all:7]. Now you need to compute the final count by adding the "all" counts of the lower partitions to the counts of the "higher" partitions, i.e., add all:5 of p1 to all counts of p2.
>>>
>>> This approach requires knowing the value range and distribution of the values, which makes it a bit difficult. I guess you'll get the best performance if you partition in a way that gives you roughly equally sized partitions of dataset B, with the constraint that the corresponding partitions of A fit into memory.
>>>
>>> As I said, it's a bit cumbersome. I hope you could follow my explanation. Please ask if something is not clear ;-)
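A rough sketch of this partitionCustom() approach, assuming the Scala DataSet API and a parallelism of 2; the split point, the element types, and the way each subtask obtains its range of dataset A are made up for the example and would have to come from the real data (e.g. a file per range on HDFS):

    import org.apache.flink.api.common.functions.{Partitioner, RichMapPartitionFunction}
    import org.apache.flink.api.scala._
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.util.Collector

    // Hypothetical split point; in practice it must be derived from the known
    // value range / distribution so that the B partitions are roughly equal.
    class RangePartitioner extends Partitioner[Double] {
      override def partition(key: Double, numPartitions: Int): Int =
        if (key <= 3.0) 0 else 1
    }

    class RangeCounter extends RichMapPartitionFunction[Double, (Double, Long)] {

      private var sortedA: Array[Double] = _

      override def open(parameters: Configuration): Unit = {
        // Stand-in for loading "this" subtask's range of dataset A (e.g. from
        // HDFS); hard-coded here only to keep the sketch self-contained.
        sortedA =
          (if (getRuntimeContext.getIndexOfThisSubtask == 0) Array(1.0, 2.0, 3.0)
           else Array(4.0, 5.0, 6.0)).sorted
      }

      override def mapPartition(values: java.lang.Iterable[Double],
                                out: Collector[(Double, Long)]): Unit = {
        // bucket(i) counts the B elements whose upper-bound position in sortedA
        // is i, i.e. the elements smaller than sortedA(i), sortedA(i+1), ...
        val bucket = new Array[Long](sortedA.length + 1)
        var all = 0L
        val it = values.iterator()
        while (it.hasNext) {
          val b = it.next()
          var lo = 0
          var hi = sortedA.length
          while (lo < hi) {                 // first index with sortedA(idx) > b
            val mid = (lo + hi) >>> 1
            if (sortedA(mid) <= b) lo = mid + 1 else hi = mid
          }
          bucket(lo) += 1
          all += 1
        }
        // Prefix sums turn the buckets into per-element counts for this B range.
        var running = 0L
        for (i <- sortedA.indices) {
          running += bucket(i)
          out.collect((sortedA(i), running))
        }
        // Partition-wide "all" count, needed to correct the higher ranges;
        // PositiveInfinity is just a marker value for this sketch.
        out.collect((Double.PositiveInfinity, all))
      }
    }

    // Usage sketch: route B's elements to the task holding the matching range of A.
    // val partialCounts = dataB
    //   .partitionCustom(new RangePartitioner, b => b)
    //   .mapPartition(new RangeCounter)
    // The "all" counts of lower partitions still have to be added to the counts
    // of the higher partitions, as described above.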
>>> 2015-09-30 10:46 GMT+02:00 Pieter Hameete <phame...@gmail.com>:
>>>> Hi Fabian,
>>>>
>>>> thanks for your tips!
>>>>
>>>> Do you have some pointers for getting started with the 'tricky range partitioning'? I am quite keen to get this working with large datasets ;-)
>>>>
>>>> Cheers,
>>>>
>>>> Pieter
>>>>
>>>> 2015-09-30 10:24 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>>>>> Hi Pieter,
>>>>>
>>>>> cross is indeed too expensive for this task.
>>>>>
>>>>> If dataset A fits into memory, you can do the following: use a RichMapPartitionFunction to process dataset B and add dataset A as a broadcastSet. In the open method of mapPartition, you can load the broadcast set, sort it by a.propertyX, and initialize a long[] for the counts. For each element of dataset B, you do a binary search on the sorted dataset A and increase all counts up to the position in the sorted list. After all elements of dataset B have been processed, return the counts from the long[].
>>>>>
>>>>> If dataset A doesn't fit into memory, things become more cumbersome and we need to play some tricks with range partitioning...
>>>>>
>>>>> Let me know if you have questions,
>>>>> Fabian
>>>>>
>>>>> 2015-09-29 16:59 GMT+02:00 Pieter Hameete <phame...@gmail.com>:
>>>>>> Good day everyone,
>>>>>>
>>>>>> I am looking for a good way to do the following:
>>>>>>
>>>>>> I have dataset A and dataset B, and for each element in dataset A I would like to filter dataset B and obtain the size of the result. In short:
>>>>>>
>>>>>> for each element a in A -> B.filter( _ < a.propertyx).count
>>>>>>
>>>>>> Currently I am doing a cross of datasets A and B, making tuples so I can then filter all the tuples where field2 < field1.propertya, and then group by field1.id and get the sizes of the groups. However, this is not working out in practice: when the datasets get larger, some tasks hang on the CHAIN Cross -> Filter, probably because there is insufficient memory for the cross to be completed?
>>>>>>
>>>>>> Does anyone have a suggestion on how I could make this work, especially with datasets that are larger than the memory available to a single task?
>>>>>>
>>>>>> Thank you in advance for your time :-)
>>>>>>
>>>>>> Kind regards,
>>>>>>
>>>>>> Pieter Hameete
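For reference, the cross-based formulation described in this last message might look roughly like the following (the case class and field names are made up); as noted in the thread, it materializes |A| x |B| pairs, which is why it stops working once the datasets grow:

    import org.apache.flink.api.scala._

    // Hypothetical case class and field names; the real datasets will differ.
    case class ElemA(id: Long, propertyx: Double)

    object CrossCount {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment

        val dataA = env.fromElements(ElemA(1, 2.5), ElemA(2, 5.5))
        val dataB = env.fromElements(1.0, 2.0, 3.0, 4.0, 5.0)

        // cross + filter + groupBy: produces the desired counts, but builds
        // every (a, b) pair first, which does not scale to large datasets.
        // Note: elements of A without any smaller element in B do not appear
        // in the result, since all their pairs are filtered out.
        val counts = dataA.cross(dataB)
          .filter(pair => pair._2 < pair._1.propertyx)
          .map(pair => (pair._1.id, 1L))
          .groupBy(0)
          .sum(1)

        counts.print()
      }
    }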