
Alternatively, if dataset B fits in memory but dataset A doesn't, you
can broadcast B to a RichMapPartitionFunction on A:
In the open method of the function, you sort B. Then, for each element
of A, you do a binary search in B. The index found by the binary search
is the count that you are looking for (if B contains duplicates, take
the index of the first element that is not smaller than a.propertyX).
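
A minimal sketch of the lookup described above, in plain Java without the
Flink wiring (in a real job, prepare() would run in open() on the values
obtained via getRuntimeContext().getBroadcastVariable(...), and lowerBound()
would be called for each element of A). The class and method names are
hypothetical, and propertyX is assumed to be a long. A hand-written lower
bound is used because Arrays.binarySearch does not guarantee which of
several equal elements it finds.

```java
import java.util.Arrays;

/** Sketch: for a value a, count the elements of B that are strictly smaller. */
final class BroadcastBCount {

    /** What open() would do once per task: copy and sort the values of B. */
    static long[] prepare(long[] b) {
        long[] sorted = b.clone();
        Arrays.sort(sorted);
        return sorted;
    }

    /**
     * Returns the first index i with sortedB[i] >= key. Because the array is
     * sorted, this index equals the number of elements strictly smaller than key.
     */
    static int lowerBound(long[] sortedB, long key) {
        int lo = 0;
        int hi = sortedB.length;
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (sortedB[mid] < key) {
                lo = mid + 1; // everything up to mid is smaller than key
            } else {
                hi = mid;     // mid may be the first element >= key
            }
        }
        return lo;
    }

    public static void main(String[] args) {
        long[] sortedB = prepare(new long[] {5, 3, 1, 2, 5, 2, 3, 5, 4, 5, 6, 7});
        for (long a : new long[] {1, 3, 5, 8}) {
            // prints "1 -> 0", "3 -> 3", "5 -> 6", "8 -> 12"
            System.out.println(a + " -> " + lowerBound(sortedB, a));
        }
    }
}
```

Each lookup costs O(log |B|), so the whole job is one sort of B per task
plus one binary search per element of A.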


2015-09-30 11:20 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
> The idea is to partition both datasets by range.
> Assume your dataset A is [1,2,3,4,5,6]; you then create two partitions: p1:
> [1,2,3] and p2: [4,5,6].
> Each partition is given to a different instance of a MapPartition operator
> (this is a bit tricky, because you cannot use broadcastSet. You could load
> the corresponding partition in the open() function from HDFS, for
> example).
> DataSet B is partitioned in the same way, i.e., all elements <= 3 go to
> partition 1, everything > 3 goes to p2. You can partition a dataset by range
> using the partitionCustom() function. The partitioned dataset is given to
> the mapPartition operator that loaded a partition of dataset A in each task
> instance.
> You do the counting just like before (sorting the partition of dataset A,
> binary search, long[]), but add an additional count for the complete partition
> (basically count all elements that arrive in the task instance).
> If you have a dataset B with 1,2,2,3,3,4,5,5,5,5,6,7 the counts for p1 would
> be [1:0, 2:1, 3:3, all:5] and p2: [4:0, 5:1, 6:5, all:7].
> Now you need to compute the final count by adding the "all" counts of the
> lower partitions to the counts of the "higher" partitions, i.e., add all:5
> of p1 to all counts for p2.
> This approach requires knowing the value range and distribution of the
> values, which makes it a bit difficult. I guess you'll get the best
> performance if you partition in such a way that you have about equally sized
> partitions of dataset B, with the constraint that the corresponding
> partitions of A fit into memory.
> As I said, it's a bit cumbersome. I hope you could follow my explanation.
> Please ask if something is not clear ;-)
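
The combination step described above can be sketched as follows, using the
numbers from the example. This is plain Java, not Flink code; the class,
the PartitionResult type, and the method names are hypothetical. The
per-partition counting uses a naive nested loop only to keep the sketch
short; in a job it would be the sorted-array/binary-search counting, and
the partitions are assumed to arrive ordered by range.

```java
import java.util.Arrays;

/** Sketch: per-partition counts plus an "all" count, combined across ranges. */
final class RangePartitionCounts {

    /** Result of one task instance: counts per A element and the number of B elements seen. */
    static final class PartitionResult {
        final long[] counts;
        final long all;

        PartitionResult(long[] counts, long all) {
            this.counts = counts;
            this.all = all;
        }
    }

    /** Counts, within one partition, the B elements strictly smaller than each A element. */
    static PartitionResult countPartition(long[] aPart, long[] bPart) {
        long[] counts = new long[aPart.length];
        for (int i = 0; i < aPart.length; i++) {
            for (long b : bPart) {
                if (b < aPart[i]) {
                    counts[i]++;
                }
            }
        }
        return new PartitionResult(counts, bPart.length);
    }

    /** Adds the "all" counts of all lower partitions to the counts of each partition. */
    static long[][] combine(PartitionResult[] partsInRangeOrder) {
        long[][] result = new long[partsInRangeOrder.length][];
        long offset = 0; // number of B elements in all lower partitions
        for (int p = 0; p < partsInRangeOrder.length; p++) {
            result[p] = partsInRangeOrder[p].counts.clone();
            for (int i = 0; i < result[p].length; i++) {
                result[p][i] += offset;
            }
            offset += partsInRangeOrder[p].all;
        }
        return result;
    }

    public static void main(String[] args) {
        PartitionResult p1 = countPartition(new long[] {1, 2, 3}, new long[] {1, 2, 2, 3, 3});
        PartitionResult p2 = countPartition(new long[] {4, 5, 6}, new long[] {4, 5, 5, 5, 5, 6, 7});
        long[][] finalCounts = combine(new PartitionResult[] {p1, p2});
        System.out.println(Arrays.toString(finalCounts[0])); // [0, 1, 3]
        System.out.println(Arrays.toString(finalCounts[1])); // [5, 6, 10]
    }
}
```
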
> 2015-09-30 10:46 GMT+02:00 Pieter Hameete <phame...@gmail.com>:
>> Hi Fabian,
>> thanks for your tips!
>> Do you have some pointers for getting started with the 'tricky range
>> partitioning'? I am quite keen to get this working with large datasets ;-)
>> Cheers,
>> Pieter
>> 2015-09-30 10:24 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>>> Hi Pieter,
>>> cross is indeed too expensive for this task.
>>> If dataset A fits into memory, you can do the following: Use a
>>> RichMapPartitionFunction to process dataset B and add dataset A as a
>>> broadcastSet. In the open method of mapPartition, you can load the
>>> broadcast set, sort it by a.propertyX, and initialize a long[] for the
>>> counts. For each element of dataset B, you do a binary search on the sorted
>>> dataset A and increase the counts of all elements of A that are larger
>>> than it, i.e., those after the position found in the sorted list.
>>> After all elements of dataset B have been processed, return the counts from
>>> the long[].
>>> If dataset A doesn't fit into memory, things become more cumbersome and
>>> we need to play some tricks with range partitioning...
>>> Let me know if you have questions,
>>> Fabian
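
A rough sketch of the counting described above (A broadcast and sorted, B
streamed through), again in plain Java with hypothetical names and
propertyX assumed to be a long. It swaps in one variation: instead of
increasing every affected count per element of B, it increments a single
bucket and computes running sums at the end, which yields the same counts.

```java
import java.util.Arrays;

/** Sketch: A is held sorted in memory, B elements are added one by one. */
final class BroadcastACount {
    private final long[] sortedA;
    // buckets[i] = number of added b whose first larger A value sits at index i
    private final long[] buckets;

    /** What open() would do: sort the values of A and allocate the long[]. */
    BroadcastACount(long[] aValues) {
        sortedA = aValues.clone();
        Arrays.sort(sortedA);
        buckets = new long[sortedA.length + 1];
    }

    /** First index i with sortedA[i] > b, or sortedA.length if there is none. */
    private int upperBound(long b) {
        int lo = 0;
        int hi = sortedA.length;
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (sortedA[mid] <= b) {
                lo = mid + 1;
            } else {
                hi = mid;
            }
        }
        return lo;
    }

    /** Called once per element of B. */
    void add(long b) {
        buckets[upperBound(b)]++;
    }

    /** counts[i] = number of added b with b < sortedA[i], in sorted-A order. */
    long[] counts() {
        long[] counts = new long[sortedA.length];
        long running = 0;
        for (int i = 0; i < counts.length; i++) {
            running += buckets[i];
            counts[i] = running;
        }
        return counts;
    }

    public static void main(String[] args) {
        BroadcastACount counter = new BroadcastACount(new long[] {3, 1, 2});
        for (long b : new long[] {1, 2, 2, 3, 3}) {
            counter.add(b);
        }
        System.out.println(Arrays.toString(counter.counts())); // [0, 1, 3]
    }
}
```

With a parallelism above one, each task instance only sees part of B, so
the per-task counts would still have to be summed per element of A.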
>>> 2015-09-29 16:59 GMT+02:00 Pieter Hameete <phame...@gmail.com>:
>>>> Good day everyone,
>>>> I am looking for a good way to do the following:
>>>> I have dataset A and dataset B, and for each element in dataset A I
>>>> would like to filter dataset B and obtain the size of the result. In
>>>> short:
>>>> for each element a in A -> B.filter( _ < a.propertyx).count
>>>> Currently I am doing a cross of dataset A and B, making tuples so I can
>>>> then filter all the tuples where field2 < field1.propertya and then group
>>>> by field1.id and get the sizes of the groups. However, this is not working
>>>> out in practice. When the datasets get larger, some Tasks hang on the
>>>> CHAIN Cross -> Filter, probably because there is insufficient memory for
>>>> the cross to be completed?
>>>> Does anyone have a suggestion on how I could make this work, especially
>>>> with datasets that are larger than memory available to a separate Task?
>>>> Thank you in advance for your time :-)
>>>> Kind regards,
>>>> Pieter Hameete
