Re: intermediate result reuse

2015-09-14 Thread Stephan Ewen
toSet should be good to use. By default, the Iterators stream data (across memory, network, and disk), which allows you to use very large groups (larger than memory). With toSet, your group naturally has to fit into memory. But in most cases it will ;-) On Mon, Sep 14, 2015 at 11:06 PM, Fabian H

Re: intermediate result reuse

2015-09-14 Thread Fabian Hueske
Ah, sorry :-) toSet, toList, etc. are regular methods of Scala's Iterator API [1] and not part of Flink's API although the concrete iterator is provided by Flink. I am not a Scala expert, but I think it will eagerly fetch the contents of the function's iterator into a set (or list). This call is pa
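
To illustrate the point about toSet, here is a minimal sketch in plain Scala without Flink. It assumes only the standard library Iterator API; the object name ToSetSketch and the helper countMatches are hypothetical and chosen for illustration. It shows that toSet drains a single-pass iterator into an in-memory set, which can then be probed as often as needed.

```scala
// Hypothetical sketch, not Flink code: plain Scala iterators stand in for
// the iterators that a cogroup function would receive.
object ToSetSketch {

  // Counts how many elements of `left` also occur in `right`.
  def countMatches(left: Iterator[Int], right: Iterator[Int]): Int = {
    // toSet consumes the single-pass iterator eagerly, so the whole
    // right-hand group has to fit into memory.
    val rightSet: Set[Int] = right.toSet
    // Unlike the iterator, the set can be probed once per left element.
    left.count(rightSet.contains)
  }

  def main(args: Array[String]): Unit = {
    val it = Iterator(1, 2, 3)
    val materialized = it.toSet
    // After toSet the iterator is exhausted; only the set can be reused.
    println(s"iterator has more elements: ${it.hasNext}")
    println(s"set size: ${materialized.size}")
    println(s"matches: ${countMatches(Iterator(1, 2, 5, 2), Iterator(2, 3, 5))}")
  }
}
```

Materializing only the smaller side keeps the memory cost low, while the larger side can still be consumed as a stream.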

Re: intermediate result reuse

2015-09-14 Thread Michele Bertoni
Sorry, I was not talking about that collect; I know what a collector is. I was talking about the outer join case, where inside a cogroup you should do a toSet on the left or right side and collect it so that it can be traversed multiple times. With a toSet it is transforming (something like) a lazy iterator to a l

Re: intermediate result reuse

2015-09-14 Thread Fabian Hueske
Hi Michele, collect on DataSet and collect on a Collector within a Function are two different things and have the same name by coincidence (actually, this is the first time I noticed that). DataSet.collect() fetches a DataSet which can be distributed across several TaskManagers (in a cluster) to

Re: intermediate result reuse

2015-09-14 Thread Michele Bertoni
Hi Stephan, I have one more question: what happens when I do collect inside a cogroup (i.e. doing an outer join) or in a groupreduce? On 13 Sep 2015, at 02:13, Stephan Ewen <se...@apache.org> wrote: Hi! In most places where you use collect(), you should be able to use

Re: intermediate result reuse

2015-09-12 Thread Stephan Ewen
Hi! In most places where you use collect(), you should be able to use a broadcast variable to the same extent. This keeps the plan as one DAG, executed in one unit, so no re-computation will happen. Intermediate result caching is actually work that has been in progress for a while, but has stal

Re: intermediate result reuse

2015-09-12 Thread Michele Bertoni
OK, I think I got the point: I don’t have two execute calls but a collect in some branch. I will look for a way to remove it. What I am doing is keeping all the elements of A that have a value equal to something in B, where B (at this point) is very small. Is it better to use collect or a cogroup? Btw, is some

Re: intermediate result reuse

2015-09-12 Thread Stephan Ewen
Fabian has explained it well. All functions are executed lazily as one DAG, when "env.execute()" is called. Beware that there are three exceptions: - count() - collect() - print() These functions trigger an immediate program execution (they are "eager" functions). They will execute all that

Re: intermediate result reuse

2015-09-12 Thread Fabian Hueske
Hi Michele, Flink programs can have multiple sinks. In your program, the intermediate result a will be streamed to both filters (b and c) at the same time and both sinks will be written at the same time. So in this case, there is no need to materialize the intermediate result a. If you call execu