Hi Fabian,

I have a question regarding the first approach. Is there a benefit to
choosing a RichMapPartitionFunction over a RichMapFunction in this case? I
assume that the broadcast dataset is sent only once to each task manager
either way?

If I were to broadcast dataset B instead, then for each element a in A I
could count the number of elements in B that are smaller than a and output a
tuple directly in a map operation over A. This would also save me the step
of aggregating partial results?
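
Roughly what I have in mind (untested sketch; ElemA, ElemB, id, value and
propertyX are placeholders for my actual types, and dsA/dsB are the two
DataSets):

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import scala.collection.JavaConverters._

case class ElemA(id: Long, propertyX: Double)   // placeholder types
case class ElemB(value: Double)

// For each a in A, emit (a.id, number of elements of B smaller than
// a.propertyX), with B available as a broadcast set in every parallel task.
val counts = dsA.map(new RichMapFunction[ElemA, (Long, Int)] {
  private var sortedB: Array[Double] = _

  override def open(conf: Configuration): Unit = {
    // Sort the broadcast copy of B once per task instance.
    sortedB = getRuntimeContext.getBroadcastVariable[ElemB]("B")
      .asScala.map(_.value).toArray.sorted
  }

  override def map(a: ElemA): (Long, Int) = (a.id, countSmaller(a.propertyX))

  // Number of elements in sortedB strictly smaller than x (lower bound).
  private def countSmaller(x: Double): Int = {
    var lo = 0
    var hi = sortedB.length
    while (lo < hi) {
      val mid = (lo + hi) >>> 1
      if (sortedB(mid) < x) lo = mid + 1 else hi = mid
    }
    lo
  }
}).withBroadcastSet(dsB, "B")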

Kind regards,

Pieter

2015-09-30 12:44 GMT+02:00 Pieter Hameete <phame...@gmail.com>:

> Hi Gabor, Fabian,
>
> thank you for your suggestions. I am intending to scale up so that I'm
> sure that both A and B won't fit in memory. I'll see if I can come up with
> a nice way to partition the datasets, but if that takes too much time I'll
> just have to accept that it won't work on large datasets. I'll let you
> know if I manage to work something out, but I won't work on it until the
> weekend :-)
>
> Cheers again,
>
> Pieter
>
> 2015-09-30 12:28 GMT+02:00 Gábor Gévay <gga...@gmail.com>:
>
>> Hello,
>>
>> Alternatively, if dataset B fits in memory but dataset A doesn't,
>> then you can do it by broadcasting B to a RichMapPartitionFunction
>> on A:
>> In the open method of mapPartition, you sort B. Then, for each element
>> of A, you do a binary search in B, and look at the index found by the
>> binary search, which will be the count that you are looking for.
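>>
>> A rough sketch of what I mean (untested; ElemA, ElemB and their fields
>> are placeholders for your types, dsA/dsB are the two DataSets):
>>
>> import org.apache.flink.api.common.functions.RichMapPartitionFunction
>> import org.apache.flink.api.scala._
>> import org.apache.flink.configuration.Configuration
>> import org.apache.flink.util.Collector
>> import scala.collection.JavaConverters._
>>
>> case class ElemA(id: Long, propertyX: Double)   // placeholders
>> case class ElemB(value: Double)
>>
>> class CountSmallerInB extends RichMapPartitionFunction[ElemA, (Long, Int)] {
>>   private var sortedB: Array[Double] = _
>>
>>   override def open(conf: Configuration): Unit = {
>>     // Sort the broadcast copy of B once, in the open() method.
>>     sortedB = getRuntimeContext.getBroadcastVariable[ElemB]("B")
>>       .asScala.map(_.value).toArray.sorted
>>   }
>>
>>   override def mapPartition(as: java.lang.Iterable[ElemA],
>>                             out: Collector[(Long, Int)]): Unit = {
>>     val it = as.iterator()
>>     while (it.hasNext) {
>>       val a = it.next()
>>       // Index of the first element >= a.propertyX
>>       // == number of elements of B that are smaller than a.propertyX.
>>       var lo = 0
>>       var hi = sortedB.length
>>       while (lo < hi) {
>>         val mid = (lo + hi) >>> 1
>>         if (sortedB(mid) < a.propertyX) lo = mid + 1 else hi = mid
>>       }
>>       out.collect((a.id, lo))
>>     }
>>   }
>> }
>>
>> val counts = dsA.mapPartition(new CountSmallerInB).withBroadcastSet(dsB, "B")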
>>
>> Best,
>> Gabor
>>
>>
>>
>> 2015-09-30 11:20 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>> > The idea is to partition both datasets by range.
>> > Assume your dataset A is [1,2,3,4,5,6]; you create two partitions, p1:
>> > [1,2,3] and p2: [4,5,6].
>> > Each partition is given to a different instance of a MapPartition
>> > operator (this is a bit tricky, because you cannot use a broadcastSet
>> > here. You could load the corresponding partition in the open() function,
>> > from HDFS for example).
>> >
>> > Dataset B is partitioned in the same way, i.e., all elements <= 3 go to
>> > partition 1 and everything > 3 goes to p2. You can partition a dataset
>> > by range using the partitionCustom() function. The partitioned dataset
>> > is given to the MapPartition operator that loaded a partition of dataset
>> > A in each task instance.
>> > You do the counting just like before (sorting the partition of dataset
>> > A, binary search, long[]), but add an additional count for the complete
>> > partition (basically count all elements that arrive in the task
>> > instance).
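>> >
>> > A sketch of the partitioning part (assuming the split points are known
>> > up front; ElemB and its value field are placeholders):
>> >
>> > import org.apache.flink.api.common.functions.Partitioner
>> > import org.apache.flink.api.scala._
>> >
>> > case class ElemB(value: Double)   // placeholder
>> >
>> > // Values <= 3 go to partition 0, values > 3 to partition 1. In practice
>> > // the boundaries come from the known value distribution.
>> > class RangePartitioner(boundaries: Array[Double])
>> >     extends Partitioner[Double] {
>> >   override def partition(key: Double, numPartitions: Int): Int = {
>> >     var p = 0
>> >     while (p < boundaries.length && key > boundaries(p)) p += 1
>> >     p
>> >   }
>> > }
>> >
>> > val partitionedB = dsB.partitionCustom(new RangePartitioner(Array(3.0)),
>> >                                        _.value)
>> > // partitionedB is then fed into the MapPartition operator that loaded
>> > // the corresponding partition of A in its open() method.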
>> >
>> > If you have a dataset B with 1,2,2,3,3,4,5,5,5,5,6,7 the counts for p1
>> > would be [1:0, 2:1, 3:3, all:5] and for p2 [4:0, 5:1, 6:5, all:7].
>> > Now you need to compute the final count by adding the "all" counts of
>> > the lower partitions to the counts of the "higher" partitions, i.e., add
>> > all:5 of p1 to all counts for p2.
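>> >
>> > Sketched locally on the numbers of the example (the offsets are just
>> > prefix sums of the "all" counts, so this generalizes to any number of
>> > partitions):
>> >
>> > // per partition: (counts per element of A, "all" count of its B part)
>> > val p1 = (Map(1 -> 0L, 2 -> 1L, 3 -> 3L), 5L)
>> > val p2 = (Map(4 -> 0L, 5 -> 1L, 6 -> 5L), 7L)
>> > val partitions = Seq(p1, p2)
>> >
>> > // offset of a partition = sum of the "all" counts of all lower partitions
>> > val offsets = partitions.map(_._2).scanLeft(0L)(_ + _)   // 0, 5, 12
>> >
>> > val finalCounts = partitions.zip(offsets).flatMap {
>> >   case ((counts, _), offset) => counts.map { case (a, c) => a -> (c + offset) }
>> > }
>> > // -> 1:0, 2:1, 3:3, 4:5, 5:6, 6:10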
>> >
>> > This approach requires knowing the value range and distribution of the
>> > values, which makes it a bit difficult. I guess you'll get the best
>> > performance if you partition in a way that gives you roughly equally
>> > sized partitions of dataset B, with the constraint that the
>> > corresponding partitions of A fit into memory.
>> >
>> > As I said, it's a bit cumbersome. I hope you could follow my explanation.
>> > Please ask if something is not clear ;-)
>> >
>> > 2015-09-30 10:46 GMT+02:00 Pieter Hameete <phame...@gmail.com>:
>> >>
>> >> Hi Fabian,
>> >>
>> >> thanks for your tips!
>> >>
>> >> Do you have some pointers for getting started with the 'tricky range
>> >> partitioning'? I am quite keen to get this working with large datasets
>> >> ;-)
>> >>
>> >> Cheers,
>> >>
>> >> Pieter
>> >>
>> >> 2015-09-30 10:24 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>> >>>
>> >>> Hi Pieter,
>> >>>
>> >>> cross is indeed too expensive for this task.
>> >>>
>> >>> If dataset A fits into memory, you can do the following: Use a
>> >>> RichMapPartitionFunction to process dataset B and add dataset A as a
>> >>> broadcastSet. In the open method of mapPartition, you can load the
>> >>> broadcast set, sort it by a.propertyX, and initialize a long[] for the
>> >>> counts. For each element of dataset B, you do a binary search on the
>> >>> sorted dataset A and increase all counts from that position in the
>> >>> sorted list onwards (i.e., the counts of all elements of A that are
>> >>> larger than the current element of B). After all elements of dataset B
>> >>> have been processed, return the counts from the long[].
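>> >>>
>> >>> A rough sketch of that approach (untested; ElemA, ElemB and their
>> >>> fields are placeholders for your types; note that the emitted counts
>> >>> are partial and still need to be summed per element of A when the
>> >>> operator runs with parallelism > 1):
>> >>>
>> >>> import org.apache.flink.api.common.functions.RichMapPartitionFunction
>> >>> import org.apache.flink.api.scala._
>> >>> import org.apache.flink.configuration.Configuration
>> >>> import org.apache.flink.util.Collector
>> >>> import scala.collection.JavaConverters._
>> >>>
>> >>> case class ElemA(id: Long, propertyX: Double)   // placeholders
>> >>> case class ElemB(value: Double)
>> >>>
>> >>> class CountSmallerThanA
>> >>>     extends RichMapPartitionFunction[ElemB, (Long, Long)] {
>> >>>   private var sortedA: Array[ElemA] = _
>> >>>   private var counts: Array[Long] = _
>> >>>
>> >>>   override def open(conf: Configuration): Unit = {
>> >>>     sortedA = getRuntimeContext.getBroadcastVariable[ElemA]("A")
>> >>>       .asScala.toArray.sortBy(_.propertyX)
>> >>>     counts = new Array[Long](sortedA.length)
>> >>>   }
>> >>>
>> >>>   override def mapPartition(bs: java.lang.Iterable[ElemB],
>> >>>                             out: Collector[(Long, Long)]): Unit = {
>> >>>     val it = bs.iterator()
>> >>>     while (it.hasNext) {
>> >>>       val b = it.next()
>> >>>       // binary search: first position of A whose propertyX > b.value...
>> >>>       var lo = 0
>> >>>       var hi = sortedA.length
>> >>>       while (lo < hi) {
>> >>>         val mid = (lo + hi) >>> 1
>> >>>         if (sortedA(mid).propertyX <= b.value) lo = mid + 1 else hi = mid
>> >>>       }
>> >>>       // ...and increase the counts of all those larger elements of A
>> >>>       var j = lo
>> >>>       while (j < sortedA.length) { counts(j) += 1; j += 1 }
>> >>>     }
>> >>>     // emit the counts collected by this parallel task
>> >>>     var i = 0
>> >>>     while (i < sortedA.length) {
>> >>>       out.collect((sortedA(i).id, counts(i)))
>> >>>       i += 1
>> >>>     }
>> >>>   }
>> >>> }
>> >>>
>> >>> val counts = dsB.mapPartition(new CountSmallerThanA)
>> >>>   .withBroadcastSet(dsA, "A")
>> >>>   .groupBy(0).sum(1)   // sum the partial counts per element of A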
>> >>>
>> >>> If dataset A doesn't fit into memory, things become more cumbersome
>> >>> and we need to play some tricks with range partitioning...
>> >>>
>> >>> Let me know if you have questions,
>> >>> Fabian
>> >>>
>> >>> 2015-09-29 16:59 GMT+02:00 Pieter Hameete <phame...@gmail.com>:
>> >>>>
>> >>>> Good day everyone,
>> >>>>
>> >>>> I am looking for a good way to do the following:
>> >>>>
>> >>>> I have dataset A and dataset B, and for each element in dataset A I
>> >>>> would like to filter dataset B and obtain the size of the result. In
>> >>>> short:
>> >>>>
>> >>>> for each element a in A -> B.filter( _ < a.propertyx).count
>> >>>>
>> >>>> Currently I am doing a cross of dataset A and B, making tuples so I
>> >>>> can then filter all the tuples where field2 < field1.propertya, and
>> >>>> then group by field1.id and get the sizes of the groups. However, this
>> >>>> is not working out in practice. When the datasets get larger, some
>> >>>> tasks hang on the CHAIN Cross -> Filter, probably because there is
>> >>>> insufficient memory for the cross to be completed?
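>> >>>>
>> >>>> Simplified, what I am doing now looks roughly like this (names are
>> >>>> placeholders; dsA and dsB are the two DataSets):
>> >>>>
>> >>>> import org.apache.flink.api.scala._
>> >>>>
>> >>>> case class ElemA(id: Long, propertyX: Double)   // placeholders
>> >>>> case class ElemB(value: Double)
>> >>>>
>> >>>> // materializes every (a, b) pair, which is what blows up on large input
>> >>>> val counts = dsA.cross(dsB)
>> >>>>   .filter { case (a, b) => b.value < a.propertyX }
>> >>>>   .map { case (a, _) => (a.id, 1L) }
>> >>>>   .groupBy(0)
>> >>>>   .sum(1)
>> >>>> // note: elements of A without any smaller element of B do not appear
>> >>>> // in the result at all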
>> >>>>
>> >>>> Does anyone have a suggestion on how I could make this work,
>> >>>> especially with datasets that are larger than the memory available to
>> >>>> a single task?
>> >>>>
>> >>>> Thank you in advance for your time :-)
>> >>>>
>> >>>> Kind regards,
>> >>>>
>> >>>> Pieter Hameete
>> >>>
>> >>>
>> >>
>> >
>>
>
>
