Ah, sorry :-) toSet, toList, etc. are regular methods of Scala's Iterator API [1] and not part of Flink's API, although the concrete iterator is provided by Flink. I am not a Scala expert, but I think they eagerly fetch the contents of the function's iterator into a set (or list). This call is part of the user function and is executed just like any other call.

[1] http://www.scala-lang.org/api/2.10.4/index.html#scala.collection.Iterator
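For illustration, a minimal sketch of the pattern discussed below (the inputs left and right, the sentinel -1, and the object name LeftOuterJoinSketch are made up for this example): the small side of a coGroup is materialized with toSet so it can be traversed once per element of the other side, which is what an outer join inside a coGroup needs.

import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

object LeftOuterJoinSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Hypothetical inputs; "right" plays the role of the small side.
    val left  = env.fromElements(("a", 1), ("b", 2), ("c", 3))
    val right = env.fromElements(("a", 10), ("a", 11), ("c", 30))

    val joined = left.coGroup(right).where(0).equalTo(0) {
      (lefts, rights, out: Collector[(String, Int, Int)]) =>
        // Each iterator can be traversed only once, so the small side is
        // materialized eagerly with toSet and reused for every left element.
        val rightValues = rights.map(_._2).toSet
        for ((key, leftValue) <- lefts) {
          if (rightValues.isEmpty) {
            out.collect((key, leftValue, -1))   // no match: emit a sentinel
          } else {
            rightValues.foreach(r => out.collect((key, leftValue, r)))
          }
        }
    }

    joined.print()   // print() is eager and triggers execution
  }
}

Since toSet is eager, this only works if the materialized side of each group comfortably fits in memory; the larger side can stay a lazy iterator.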
2015-09-14 22:26 GMT+02:00 Michele Bertoni <michele1.bert...@mail.polimi.it>:

> Sorry, I was not talking about that collect, I know what a collector is.
> I was talking about the outer join case where, inside a cogroup, you should
> do a toSet on the left or right side and collect it to be traversable
> multiple times.
>
> With a toSet it is transforming (something like) a lazy iterator into a list
> in memory: is it actually collecting something, thus stopping execution, or
> is it something different?
>
>
> On 14 Sep 2015, at 22:18, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi Michele,
>
> collect on a DataSet and collect on a Collector within a function are two
> different things and have the same name by coincidence (actually, this is
> the first time I noticed that).
>
> DataSet.collect() fetches a DataSet, which can be distributed across
> several TaskManagers (in a cluster), to your local client program.
> Collector.collect() adds a value to the result of a function call. The
> collector is used in functions that can return an arbitrary number of
> results (0 to n).
>
> Best,
> Fabian
>
> 2015-09-14 20:58 GMT+02:00 Michele Bertoni <michele1.bert...@mail.polimi.it>:
>
>> Hi Stephan,
>> I have one more question: what happens when I do collect inside a cogroup
>> (i.e. doing an outer join) or in a groupReduce?
>>
>>
>> On 13 Sep 2015, at 02:13, Stephan Ewen <se...@apache.org> wrote:
>>
>> Hi!
>>
>> In most places where you use collect(), you should be able to use a
>> broadcast variable to the same extent. This keeps the plan as one DAG,
>> executed in one unit, so no re-computation will happen.
>>
>> Intermediate result caching is actually work that has been in progress
>> for a while, but has stalled for a bit due to prioritization of some other
>> issues. It will definitely be resumed in the near future. Too many parts
>> are already in place not to complete this feature...
>>
>> Greetings,
>> Stephan
>>
>>
>> On Sat, Sep 12, 2015 at 6:44 PM, Michele Bertoni <
>> michele1.bert...@mail.polimi.it> wrote:
>>
>>> OK, I think I got the point: I don't have two executes, but a collect in
>>> some branch. I will look for a way to remove it.
>>>
>>> What I am doing is keeping all the elements of A that have a value equal to
>>> something in B, where B (at this point) is very small.
>>> Is it better to collect or to cogroup?
>>>
>>> By the way, is this something you expect to solve in future versions?
>>>
>>> Thanks,
>>> Michele
>>>
>>>
>>> On 12 Sep 2015, at 16:27, Stephan Ewen <se...@apache.org> wrote:
>>>
>>> Fabian has explained it well. All functions are executed lazily as one
>>> DAG, when "env.execute()" is called.
>>>
>>> Beware that there are three exceptions:
>>> - count()
>>> - collect()
>>> - print()
>>>
>>> These functions trigger an immediate program execution (they are "eager"
>>> functions). They will execute all that is needed to produce their result.
>>> Summing up:
>>>
>>> ---------------------------
>>>
>>> One execution in this case (result "a" is reused by "b" and "c"):
>>>
>>> a = env.createInput() -> map() -> reduce() -> filter()
>>>
>>> b = a.flatmap()
>>> c = a.groupBy() -> reduce()
>>>
>>> b.writeAsText()
>>> c.writeAsCsv()
>>>
>>> env.execute();
>>>
>>> ---------------------------
>>>
>>> Two executions in this case ("a" is computed twice, once for "b" and
>>> once for "c"):
>>>
>>> a = env.createInput() -> map() -> reduce() -> filter()
>>>
>>> b = a -> flatmap() -> count()
>>> c = a -> groupBy() -> reduce().collect()
>>>
>>> ---------------------------
>>>
>>> Greetings,
>>> Stephan
>>>
>>>
>>> On Sat, Sep 12, 2015 at 11:31 AM, Fabian Hueske <fhue...@gmail.com>
>>> wrote:
>>>
>>>> Hi Michele,
>>>>
>>>> Flink programs can have multiple sinks.
>>>> In your program, the intermediate result a will be streamed to both
>>>> filters (b and c) at the same time, and both sinks will be written at
>>>> the same time.
>>>> So in this case, there is no need to materialize the intermediate
>>>> result a.
>>>>
>>>> If you call execute() after you defined b, the program will compute a
>>>> and stream the result only to b.
>>>> If you call execute() again after you defined c, the program will
>>>> compute a again and stream the result to c.
>>>>
>>>> Summary:
>>>> Flink programs can usually stream intermediate results without
>>>> materializing them. There are a few cases where Flink needs to
>>>> materialize intermediate results in order to avoid deadlocks, but these
>>>> are handled fully transparently.
>>>> It is not possible (yet!) to share results across program executions,
>>>> i.e., across calls to execute().
>>>>
>>>> I suppose you call execute() between defining b and c. If you remove
>>>> that call, a will be computed once and both b and c will be computed at
>>>> the same time.
>>>>
>>>> Best, Fabian
>>>>
>>>> 2015-09-12 11:02 GMT+02:00 Michele Bertoni <
>>>> michele1.bert...@mail.polimi.it>:
>>>>
>>>>> Hi everybody, I have a question about internal optimization:
>>>>> is Flink able to reuse intermediate results that are used twice in the
>>>>> graph?
>>>>>
>>>>> i.e.
>>>>> a = read source -> filter -> reduce -> something else even more
>>>>> complicated
>>>>>
>>>>> b = a filter(something)
>>>>> store b
>>>>>
>>>>> c = a filter(something else)
>>>>> store c
>>>>>
>>>>> What happens to a? Is it computed twice?
>>>>>
>>>>> In my read function I have some logging commands and I see them
>>>>> printed twice, but that seems strange to me.
>>>>>
>>>>> Thanks,
>>>>> cheers,
>>>>> Michele
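As a footnote to Stephan's suggestion above, a minimal sketch of the broadcast-variable alternative to collect() for Michele's use case (the data sets a and b, the broadcast name "smallSet", and the class names are made up for this example): the small data set is shipped to every parallel instance of the filter, so the plan stays one DAG, is executed in one unit, and nothing is re-computed.

import org.apache.flink.api.common.functions.RichFilterFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

import scala.collection.JavaConverters._

// Keeps only the elements of the large input whose value appears in the
// broadcast (small) input.
class InSmallSetFilter extends RichFilterFunction[(String, Int)] {
  private var allowed: Set[Int] = _

  override def open(parameters: Configuration): Unit = {
    // The broadcast set is available to every parallel instance of this filter.
    allowed = getRuntimeContext
      .getBroadcastVariable[Int]("smallSet").asScala.toSet
  }

  override def filter(value: (String, Int)): Boolean = allowed(value._2)
}

object BroadcastFilterSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val a = env.fromElements(("x", 1), ("y", 2), ("z", 3))   // large side
    val b = env.fromElements(1, 3)                           // small side

    // No collect() on b: the plan stays a single DAG with one execution.
    val filtered = a.filter(new InSmallSetFilter).withBroadcastSet(b, "smallSet")

    filtered.print()
  }
}

Like the toSet pattern inside the cogroup, this assumes the broadcast side is small enough to be held in memory on every task.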