Hi @Gwen, sorry that I missed the cross function and pointed you the wrong way. @Fabian's answers are what I meant.
Considering that the cross function is so expensive, can we find a way to restrict the broadcast? That is, if the groupBy function is a many-to-one mapping and the cross function is an all-to-all mapping, is it possible to define a many-to-many mapping function that broadcasts shapes to more than one (but not all) index area?

Best,
Xingcan

On Thu, Feb 23, 2017 at 7:07 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Gwen,
>
> sorry, I didn't read your answer; I was still writing mine when you sent
> yours ;-)
>
> Regarding your strategy, this is basically what Cross does:
> it keeps one input partitioned and broadcasts (replicates) the other one.
> On each partition, it combines the records of the partition of the first
> input with all records of the replicated second input.
> I think this is what you describe as well, right?
>
> As I wrote before, this approach is quadratic and does not scale to large
> data sizes.
> I would recommend looking into spatial partitioning. Otherwise, I do not
> see how the problem can be solved for large data sets.
>
> Best, Fabian
>
> 2017-02-23 12:00 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:
>
>> Hi,
>>
>> Flink's batch DataSet API already supports (manual) theta-joins via
>> the CrossFunction. It combines each pair of records of two input data sets.
>> This is done by broadcasting (and hence replicating) one of the inputs.
>> @Xingcan, so I think what you describe is already there.
>> However, as I said before, it is often prohibitively expensive to
>> compute. When you are at a point where a MapFunction with a broadcast set
>> is no longer sufficient (the smaller data set does not fit into memory),
>> your problem is often too big to compute.
>> The complexity of a Cartesian product (Cross) is simply quadratic.
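To make the many-to-many "restricted broadcast" idea concrete, here is a minimal, self-contained Java sketch of a fragment-and-replicate theta-join. All class and method names are illustrative assumptions, not Flink API: one input is hash-split into n fragments and the other is replicated to every fragment, so each fragment evaluates the join condition locally instead of one node doing a global all-to-all.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

public class FragmentReplicateJoin {

    // Hash-partition `left` into n fragments, replicate `right` to every
    // fragment, and evaluate an arbitrary theta-condition per fragment.
    public static <A, B> List<Map.Entry<A, B>> join(
            List<A> left, List<B> right, int n, BiPredicate<A, B> theta) {
        List<List<A>> fragments = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            fragments.add(new ArrayList<>());
        }
        for (A a : left) {
            fragments.get(Math.floorMod(a.hashCode(), n)).add(a);
        }

        // Every fragment sees all of `right` (the replicated side), so each
        // (a, b) pair is tested exactly once, but no single fragment ever
        // holds the whole left input.
        List<Map.Entry<A, B>> result = new ArrayList<>();
        for (List<A> fragment : fragments) {
            for (A a : fragment) {
                for (B b : right) {
                    if (theta.test(a, b)) {
                        result.add(new AbstractMap.SimpleEntry<>(a, b));
                    }
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> divisors = Arrays.asList(1, 2, 3, 4);
        List<Integer> values = Arrays.asList(10, 20, 30);
        // Theta-join: keep (d, v) when v is an exact multiple of d.
        System.out.println(join(divisors, values, 3, (d, v) -> v % d == 0).size()); // prints 8
    }
}
```

Note that this only distributes the quadratic work across fragments; it does not reduce it, which is exactly Fabian's objection to Cross for large inputs.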
>>
>> Regarding the specific problem of joining spatial shapes and points, I
>> would go with spatial partitioning as follows:
>> - Partition the space and compute for each shape the partitions it
>>   belongs to (could be more than one).
>> - Do the same for each point (it will fall into exactly one partition).
>> - Do a 1-n join on the partition IDs, plus an additional check whether
>>   the point is actually inside the shape.
>>
>> The challenge here is to obtain partitions of similar size.
>>
>> Cheers, Fabian
>>
>> 2017-02-23 5:59 GMT+01:00 Xingcan Cui <xingc...@gmail.com>:
>>
>>> Hi all,
>>>
>>> @Gwen From the database point of view, the only way to avoid a
>>> Cartesian product in a join is to use an index, which manifests as key
>>> grouping in Flink. However, key grouping only supports many-to-one
>>> mappings now, i.e., a shape or a point can only be distributed to a
>>> single group. Only points and shapes belonging to the same group can be
>>> joined, which reduces the inherent pairwise comparisons (compared with a
>>> Cartesian product). That is perfectly suitable for equi-joins.
>>>
>>> @Fabian I saw this thread when I was thinking about theta-joins (which
>>> will eventually be supported) in Flink. Since it's impossible to group
>>> (index) a dataset for an arbitrary theta-join, I think we need some
>>> duplication mechanism here. For example, split one dataset into n parts
>>> and send the other dataset to all of these parts. This could be even
>>> more useful in stream joins. BTW, it seems that I've seen another thread
>>> discussing this, but I cannot find it now. What do you think?
>>>
>>> Best,
>>> Xingcan
>>>
>>> On Thu, Feb 23, 2017 at 6:41 AM, Fabian Hueske <fhue...@gmail.com>
>>> wrote:
>>>
>>>> Hi Gwen,
>>>>
>>>> Flink usually performs a block nested loop join to cross two data sets.
>>>> This algorithm spills one input to disk and streams the other input.
>>>> For each input it fills a memory buffer to perform the cross.
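Fabian's three-step recipe can be sketched with a simple uniform grid. This is a hypothetical helper under assumed names, not Flink API: a shape, approximated by its axis-aligned bounding box, is assigned to every grid cell the box overlaps, while a point lands in exactly one cell; a 1-n join on the cell id plus an exact containment test then replaces the global cross.

```java
import java.util.ArrayList;
import java.util.List;

public class GridPartitioner {

    private final double cellSize;

    public GridPartitioner(double cellSize) {
        this.cellSize = cellSize;
    }

    // A point maps to exactly one cell id.
    public long cellOf(double x, double y) {
        long cx = (long) Math.floor(x / cellSize);
        long cy = (long) Math.floor(y / cellSize);
        return cx * 1_000_000L + cy; // crude pairing, good enough for a sketch
    }

    // A shape, approximated by its bounding box, maps to every cell the
    // box overlaps -- this is the "could be more than one" case.
    public List<Long> cellsOf(double minX, double minY, double maxX, double maxY) {
        List<Long> cells = new ArrayList<>();
        for (long cx = (long) Math.floor(minX / cellSize);
                cx <= (long) Math.floor(maxX / cellSize); cx++) {
            for (long cy = (long) Math.floor(minY / cellSize);
                    cy <= (long) Math.floor(maxY / cellSize); cy++) {
                cells.add(cx * 1_000_000L + cy);
            }
        }
        return cells;
    }

    public static void main(String[] args) {
        GridPartitioner grid = new GridPartitioner(1.0);
        // A 2x1 bounding box straddles six unit cells ...
        System.out.println(grid.cellsOf(0.5, 0.5, 2.5, 1.5).size()); // prints 6
        // ... and point (1.2, 0.7) lands in one of them, so the pair
        // survives the 1-n join and reaches the exact containment test.
        System.out.println(grid.cellsOf(0.5, 0.5, 2.5, 1.5)
                .contains(grid.cellOf(1.2, 0.7))); // prints true
    }
}
```

Since each point has exactly one cell, a point/shape pair can meet in at most one partition, so no duplicate elimination is needed. The open problem, as Fabian notes, is picking the cell size so the partitions stay balanced under skewed data.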
>>>> Then the buffer of the spilled input is refilled with spilled
>>>> records, and the records are crossed again. This is repeated until one
>>>> full pass over the spilled records is complete. Then the buffer of the
>>>> streamed input is filled with the next records.
>>>>
>>>> You should be aware that cross is a very expensive operation,
>>>> especially if you evaluate a complex condition for each pair of
>>>> records, so cross can easily be too expensive to compute.
>>>> For such use cases it is usually better to apply a coarse-grained
>>>> spatial partitioning and do a key-based join on the partitions. Within
>>>> each partition you'd perform a cross.
>>>>
>>>> Best, Fabian
>>>>
>>>> 2017-02-21 18:34 GMT+01:00 Gwenhael Pasquiers <
>>>> gwenhael.pasqui...@ericsson.com>:
>>>>
>>>>> Hi,
>>>>>
>>>>> I need (or at least I think I do) to do a cross operation between two
>>>>> huge datasets. One dataset is a list of points. The other one is a
>>>>> list of shapes (areas).
>>>>>
>>>>> I want to know, for each point, the areas it belongs to (they might
>>>>> overlap, so a point can be in multiple areas), so I thought I'd
>>>>> "cross" my points and areas, since I need to test each point against
>>>>> each area.
>>>>>
>>>>> I tried it: my job seems to work for some seconds and then, at some
>>>>> point, it gets stuck.
>>>>>
>>>>> I'm wondering if Flink, for cross operations, tries to load one of
>>>>> the two datasets into RAM, or if it's able to split the job into
>>>>> multiple iterations (even if it means reading one of the two datasets
>>>>> multiple times).
>>>>>
>>>>> Or maybe I'm going at it the wrong way, or missing some parameters;
>>>>> feel free to correct me :)
>>>>>
>>>>> I'm using Flink 1.0.1.
>>>>>
>>>>> Thanks in advance,
>>>>>
>>>>> Gwen'
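For reference, the block nested loop strategy Fabian describes can be illustrated with a small in-memory sketch. The real operator spills one input to disk; here the "spilled" side is simply re-scanned block by block, and all names are illustrative, not Flink internals.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BlockNestedLoopCross {

    // Cross `spilled` with `streamed` using fixed-size memory buffers:
    // for each block of the streamed input, perform one full pass over the
    // spilled input, block by block, crossing the two buffers.
    public static <A, B> List<String> cross(List<A> spilled, List<B> streamed, int blockSize) {
        List<String> pairs = new ArrayList<>();
        for (int s = 0; s < streamed.size(); s += blockSize) {
            List<B> streamBuffer =
                    streamed.subList(s, Math.min(s + blockSize, streamed.size()));
            // One full pass over the "spilled" records per streamed block --
            // this repeated re-reading is why cross gets so expensive.
            for (int p = 0; p < spilled.size(); p += blockSize) {
                List<A> spillBuffer =
                        spilled.subList(p, Math.min(p + blockSize, spilled.size()));
                for (A a : spillBuffer) {
                    for (B b : streamBuffer) {
                        pairs.add(a + "x" + b);
                    }
                }
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        List<Integer> points = Arrays.asList(1, 2, 3);
        List<String> areas = Arrays.asList("A", "B");
        System.out.println(cross(points, areas, 2).size()); // prints 6 (= 3 * 2)
    }
}
```

This also explains the behavior Gwen observed: the job does not need to hold a whole dataset in RAM, but it re-reads the spilled side once per streamed block, so runtime grows with the product of the two input sizes.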