Hi, and thanks for your answers!

I’m not sure I can define any index to split the workload, since in my case any 
point could be in any zone...
I think what I’m currently trying to do is what you call a “theta-join”:

1. Split one dataset over the cluster and prepare it to be matched against 
   the other one (e.g. parse the shapes), either:

   a. using partitioning, or

   b. using N sources plus hash-based filtering, so that I get 
      complementary datasets.

2. Make my other dataset go “through” all the “splits” of the first one 
   and enrich / filter it.

   a. That dataset would probably have to be read entirely from HDFS 
      multiple times (once per “split”).
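
To make option (b) concrete, here is a minimal plain-Java sketch (not Flink code; the class name `HashSplit` and its methods are hypothetical names used only for illustration). Each of the N "sources" scans the same input and keeps only the records whose hash falls into its own split, so the splits are disjoint and together cover the whole input:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch (no Flink dependency): N readers over the same input,
// each keeping only the records whose hash falls into its own split.
class HashSplit {

    // Returns true when `record` belongs to split `splitIndex` out of `splitCount`.
    static boolean belongsTo(Object record, int splitIndex, int splitCount) {
        // floorMod keeps the result in [0, splitCount) even for negative hash codes.
        return Math.floorMod(record.hashCode(), splitCount) == splitIndex;
    }

    // Simulates one "source + filter": scans the whole input, keeps one split.
    static <T> List<T> readSplit(List<T> input, int splitIndex, int splitCount) {
        List<T> kept = new ArrayList<>();
        for (T record : input) {
            if (belongsTo(record, splitIndex, splitCount)) {
                kept.add(record);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> shapes = Arrays.asList("shapeA", "shapeB", "shapeC", "shapeD", "shapeE");
        int splitCount = 3;
        int total = 0;
        for (int i = 0; i < splitCount; i++) {
            List<String> split = readSplit(shapes, i, splitCount);
            total += split.size();
            System.out.println("split " + i + ": " + split);
        }
        // Every record lands in exactly one split, so the sizes add up to the input size.
        System.out.println("records across all splits: " + total);
    }
}
```

The cost of this approach is that the input is read N times in full, once per split, which matches the concern in point 2.a.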

I have other ideas, but I don’t know if they’re doable in Flink.

Question:

Is there a way for an object (key selector, flatmap) to obtain (and wait for) 
the result of a previous dataset? The only way I can think of is a “cross” 
between my one-record dataset (the result) and the other dataset. But maybe 
that’s very bad in terms of resources?

I’d like to try using a flatmap that clones the dataset into N parts (adding a 
partition key from 0 to N-1 to each record), then use partitioning to “dispatch” 
each clone of the dataset to a matching “shape matcher” partition; then I’d use 
cross to do the work, then group the results back together (in case the N clones 
of a point were inside different shapes). Maybe that would split the workload of 
the cross by dividing the size of one of the two datasets involved in that cross...
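
The idea above can be sketched in plain Java (this is not Flink code, just a sequential simulation; `ReplicatedCross`, `Rect` and `match` are hypothetical names, and shapes are assumed to be axis-aligned rectangles purely to keep the containment test short). The shapes are sharded into N "shape matcher" partitions, every point is cloned once per partition key, each clone is crossed only with its own shard, and the hits are grouped back per point:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

class ReplicatedCross {

    // Hypothetical shape type: an axis-aligned rectangle with an id.
    static final class Rect {
        final String id;
        final double minX, minY, maxX, maxY;

        Rect(String id, double minX, double minY, double maxX, double maxY) {
            this.id = id;
            this.minX = minX; this.minY = minY; this.maxX = maxX; this.maxY = maxY;
        }

        boolean contains(double x, double y) {
            return x >= minX && x <= maxX && y >= minY && y <= maxY;
        }
    }

    // points are double[]{x, y}; returns point index -> ids of the shapes containing it.
    static Map<Integer, Set<String>> match(List<double[]> points, List<Rect> shapes, int n) {
        // Step 1: shard the shapes into n "shape matcher" partitions (round robin).
        List<List<Rect>> shards = new ArrayList<>();
        for (int k = 0; k < n; k++) {
            shards.add(new ArrayList<>());
        }
        for (int s = 0; s < shapes.size(); s++) {
            shards.get(s % n).add(shapes.get(s));
        }

        Map<Integer, Set<String>> result = new TreeMap<>();
        // Step 2: partition k receives clone k of every point (partition key = k).
        for (int k = 0; k < n; k++) {
            for (int p = 0; p < points.size(); p++) {
                double[] pt = points.get(p);
                // Step 3: local cross between the cloned point and the shapes of shard k only.
                for (Rect r : shards.get(k)) {
                    if (r.contains(pt[0], pt[1])) {
                        // Step 4: group the hits of all clones back under the original point.
                        result.computeIfAbsent(p, key -> new TreeSet<>()).add(r.id);
                    }
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<double[]> points = Arrays.asList(
                new double[]{1, 1}, new double[]{5, 5}, new double[]{9, 9});
        List<Rect> shapes = Arrays.asList(
                new Rect("A", 0, 0, 6, 6), new Rect("B", 4, 4, 10, 10), new Rect("C", 20, 20, 30, 30));
        System.out.println(match(points, shapes, 2)); // prints {0=[A], 1=[A, B], 2=[B]}
    }
}
```

Note that the total number of point/shape tests stays at |points| × |shapes|; the replication only spreads those tests over N partitions, each of which handles 1/N of the shapes.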

Sorry for the rambling if I’m not clear.

B.R.


From: Xingcan Cui [mailto:xingc...@gmail.com]
Sent: Thursday, 23 February 2017 06:00
To: user@flink.apache.org
Subject: Re: Cross operation on two huge datasets

Hi all,

@Gwen From the database's point of view, the only way to avoid a Cartesian 
product in a join is to use an index, which shows up as key grouping in Flink. 
However, it only supports many-to-one mapping for now, i.e., a shape or a point 
can only be distributed to a single group. Only points and shapes belonging to 
the same group can be joined, and that can reduce the number of pair comparisons 
(compared with a Cartesian product). It's perfectly suitable for equi-joins.

@Fabian I saw this thread just as I was thinking about theta-joins (which 
will eventually be supported) in Flink. Since it's impossible to group (index) 
a dataset for an arbitrary theta-join, I think we may need some duplication 
mechanism here. For example, split a dataset into n parts and send the other 
dataset to all of these parts. This could be more useful for stream joins. BTW, 
I think I've seen another thread discussing this, but I cannot find it now. 
What do you think?

Best,
Xingcan

On Thu, Feb 23, 2017 at 6:41 AM, Fabian Hueske <fhue...@gmail.com> wrote:
Hi Gwen,
Flink usually performs a block nested loop join to cross two data sets.
This algorithm spills one input to disk and streams the other input. For each 
input it fills a memory buffer and then crosses the two buffers. Then the 
buffer of the spilled input is refilled with spilled records and the records 
are crossed again. This is repeated until one full pass over the spilled 
records is done. Then the buffer of the streamed input is filled with the next 
records.
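
As an illustration only, the loop structure described above can be sketched in plain Java. This is not Flink's actual implementation: `BlockNestedLoop`, `cross` and `blockSize` are hypothetical names, and in-memory lists stand in for the streamed input and the spilled file.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class BlockNestedLoop {

    // Produces every (streamed, spilled) pair while holding at most
    // `blockSize` records of each input "in memory" at a time.
    static List<String> cross(List<String> streamed, List<String> spilled, int blockSize) {
        List<String> pairs = new ArrayList<>();
        // Outer loop: fill the buffer of the streamed input with the next block.
        for (int s = 0; s < streamed.size(); s += blockSize) {
            List<String> streamedBlock =
                    streamed.subList(s, Math.min(s + blockSize, streamed.size()));
            // Inner loop: one full pass over the spilled input, one buffer at a time.
            for (int d = 0; d < spilled.size(); d += blockSize) {
                List<String> spilledBlock =
                        spilled.subList(d, Math.min(d + blockSize, spilled.size()));
                // Cross the two buffers.
                for (String a : streamedBlock) {
                    for (String b : spilledBlock) {
                        pairs.add(a + "-" + b);
                    }
                }
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        List<String> points = Arrays.asList("p1", "p2", "p3");
        List<String> shapes = Arrays.asList("s1", "s2");
        List<String> pairs = cross(points, shapes, 2);
        System.out.println(pairs.size() + " pairs: " + pairs);
    }
}
```

In this sketch the spilled input is re-read once per block of the streamed input, so a larger buffer means fewer passes, but the number of pairs is always the product of the two input sizes.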
You should be aware that cross is a very expensive operation, especially if 
you evaluate a complex condition for each pair of records. So a cross can 
easily become too expensive to compute.
For such use cases it is usually better to apply a coarse-grained spatial 
partitioning and do a key-based join on the partitions. Within each partition 
you'd perform a cross.
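
A rough plain-Java sketch of such a coarse-grained partitioning (not Flink code; `GridJoin`, the fixed cell size and the rectangle representation are all assumptions made for illustration): each point is keyed by the one grid cell it falls into, each shape is registered under every cell its bounding box overlaps, and the exact test runs only on points and shapes that share a cell key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

class GridJoin {
    static final double CELL = 10.0; // hypothetical cell size, to be tuned to the data

    static int cellIndex(double v) {
        return (int) Math.floor(v / CELL);
    }

    // Packs the two cell coordinates into one join key.
    static long cellKey(int cx, int cy) {
        return ((long) cx << 32) | (cy & 0xffffffffL);
    }

    // points: {x, y}; rects: {minX, minY, maxX, maxY}.
    // Returns point index -> indices of the rectangles containing it.
    static Map<Integer, Set<Integer>> join(List<double[]> points, List<double[]> rects) {
        // Key the shapes: a rectangle is registered in every cell it overlaps.
        Map<Long, List<Integer>> rectsByCell = new HashMap<>();
        for (int r = 0; r < rects.size(); r++) {
            double[] b = rects.get(r);
            for (int cx = cellIndex(b[0]); cx <= cellIndex(b[2]); cx++) {
                for (int cy = cellIndex(b[1]); cy <= cellIndex(b[3]); cy++) {
                    rectsByCell.computeIfAbsent(cellKey(cx, cy), k -> new ArrayList<>()).add(r);
                }
            }
        }
        Map<Integer, Set<Integer>> result = new TreeMap<>();
        for (int p = 0; p < points.size(); p++) {
            double[] pt = points.get(p);
            // Key the point: it falls into exactly one cell.
            long key = cellKey(cellIndex(pt[0]), cellIndex(pt[1]));
            List<Integer> candidates = rectsByCell.getOrDefault(key, Collections.emptyList());
            // Local cross within the cell: exact test against the candidates only.
            for (int r : candidates) {
                double[] b = rects.get(r);
                if (pt[0] >= b[0] && pt[0] <= b[2] && pt[1] >= b[1] && pt[1] <= b[3]) {
                    result.computeIfAbsent(p, k -> new TreeSet<>()).add(r);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<double[]> points = Arrays.asList(
                new double[]{5, 5}, new double[]{15, 5}, new double[]{25, 25});
        List<double[]> rects = Arrays.asList(
                new double[]{0, 0, 12, 8}, new double[]{14, 0, 30, 30});
        System.out.println(join(points, rects));
    }
}
```

Registering a shape under several cells is an addition made for this sketch: it lets overlapping or large shapes still meet every point they might contain, at the cost of some duplication of the shape records.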
Best, Fabian


2017-02-21 18:34 GMT+01:00 Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com>:
Hi,

I need (or at least I think I do) to do a cross operation between two huge 
datasets. One dataset is a list of points. The other one is a list of shapes 
(areas).

I want to know, for each point, the areas (they might overlap so a point can be 
in multiple areas) it belongs to so I thought I’d “cross” my points and areas 
since I need to test each point against each area.

I tried it, and my job seems to work for a few seconds; then, at some point, 
it gets stuck.

I’m wondering whether Flink, for cross operations, tries to load one of the two 
datasets into RAM, or whether it’s able to split the job into multiple 
iterations (even if it means reading one of the two datasets multiple times).

Or maybe I’m going at it the wrong way, or missing some parameters, feel free 
to correct me ☺

I’m using Flink 1.0.1.

Thanks in advance

Gwen

