If one of your inputs fits into memory, using side inputs is
definitely the way to go. If neither side fits into memory, the cross
product may be prohibitively large to compute even on a distributed
computing platform (a billion times a billion is big, though I suppose
one may hit memory limits with fewer elements if the elements
themselves are large), but one can still do the partitioning hack,
choosing N so that each partition fits into memory as a side input. E.g.

partitions = pcoll_B | beam.Partition(lambda b, n: hash(b) % n, N)
# Each application of the FlatMap below needs its own label.
cross_product = tuple(
    pcoll_A | f'CrossWithPartition{i}' >> beam.FlatMap(
        lambda a, bs: [(a, b) for b in bs], beam.pvalue.AsList(part))
    for i, part in enumerate(partitions)
) | beam.Flatten()

One may need to break fusion before the FlatMaps to avoid trying to
pull all of the parts into memory at once, e.g.

class FusionBreak(beam.PTransform):
  def expand(self, pcoll):
    # The Map below consumes an (empty) side input derived from pcoll, which
    # typically forces the runner to materialize pcoll rather than fusing its
    # producer with downstream transforms.
    empty = pcoll | beam.FlatMap(lambda x: ())
    return pcoll | beam.Map(lambda x, _: x, beam.pvalue.AsList(empty))
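
One way to wire this in (just a sketch; exactly where the break is
needed may depend on the runner) would be

broken_A = pcoll_A | FusionBreak()

and then use broken_A in place of pcoll_A in the FlatMaps above.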

On Mon, Sep 19, 2022 at 8:34 AM Brian Hulette via dev
<dev@beam.apache.org> wrote:
>
> In SQL we just don't support cross joins currently [1]. I'm not aware of an 
> existing implementation of a cross join/cartesian product.
>
> > My team has an internal implementation of a CartesianProduct transform, 
> > based on using hashing to split a pcollection into a finite number of 
> > groups and CoGroupByKey.
>
> Could this be contributed to Beam?
>
> > On the other hand, if any of the input pcollections are small, using side 
> > inputs would probably be the way to go to avoid the need for a shuffle.
>
> We run into this problem frequently in Beam SQL. Our optimizer could be much 
> more effective with accurate size estimates, but we rarely have them, and 
> they may never be good enough for us to select a side input implementation 
> over CoGroupByKey. I've had some offline discussions in this space; the best
> solution we've come up with is to allow hints in SQL (or just arguments in
> join transforms) that let users select a side input implementation. We could
> also add some logging when a pipeline uses a CoGroupByKey but the PCollection
> sizes are small enough for a side input implementation, to nudge users that
> way for future runs.
>
> Brian
>
> [1] https://beam.apache.org/documentation/dsls/sql/extensions/joins/
>
> On Mon, Sep 19, 2022 at 8:01 AM Stephan Hoyer via dev <dev@beam.apache.org> 
> wrote:
>>
>> I'm wondering if it would make sense to have a built-in Beam transformation 
>> for calculating the Cartesian product of PCollections.
>>
>> Just this past week, I've encountered two separate cases where calculating a 
>> Cartesian product was a bottleneck. The in-memory option of using something 
>> like Python's itertools.product() is convenient, but it only scales to a 
>> single node.
>>
>> Unfortunately, implementing a scalable Cartesian product seems to be 
>> somewhat non-trivial. I found two versions of this question on StackOverflow, 
>> but neither contains a code solution:
>> https://stackoverflow.com/questions/35008721/how-to-get-the-cartesian-product-of-two-pcollections
>> https://stackoverflow.com/questions/41050477/how-to-do-a-cartesian-product-of-two-pcollections-in-dataflow/
>>
>> There's a fair amount of nuance in an efficient and scalable implementation. 
>> My team has an internal implementation of a CartesianProduct transform, 
>> based on using hashing to split a pcollection into a finite number of groups 
>> and CoGroupByKey. On the other hand, if any of the input pcollections are 
>> small, using side inputs would probably be the way to go to avoid the need 
>> for a shuffle.
>>
>> Any thoughts?
>>
>> Cheers,
>> Stephan
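
P.S. I don't know what your internal CartesianProduct looks like, but a rough
sketch of the hashing + CoGroupByKey idea might be something like the
following (the bucket counts m and n here are arbitrary; each co-grouped
bucket holds roughly |A|/m + |B|/n elements in memory on a single worker):

import apache_beam as beam

def cartesian_product(pcoll_a, pcoll_b, m=10, n=10):
  # Send each element of A to every key (i, j) for its bucket i, and each
  # element of B to every key (i, j) for its bucket j, so each (a, b) pair
  # meets in exactly one group.
  keyed_a = pcoll_a | 'KeyA' >> beam.FlatMap(
      lambda a: [((hash(a) % m, j), a) for j in range(n)])
  keyed_b = pcoll_b | 'KeyB' >> beam.FlatMap(
      lambda b: [((i, hash(b) % n), b) for i in range(m)])
  return ({'a': keyed_a, 'b': keyed_b}
          | 'CoGroup' >> beam.CoGroupByKey()
          | 'Product' >> beam.FlatMap(
              lambda kv: [(a, b) for a in kv[1]['a'] for b in kv[1]['b']]))

The price is that A gets shuffled n times and B m times, which is why the
side input route is attractive whenever one side is small.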
