Hi Michele,

collect() on a DataSet and collect() on a Collector within a function are two different things that just happen to share the same name (actually, this is the first time I noticed that).

DataSet.collect() fetches a DataSet, which can be distributed across several TaskManagers (in a cluster), to your local client program. Collector.collect() adds a value to the result of a function call. The Collector is used in functions that can return an arbitrary number of results (0 to n).
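A minimal sketch of the difference, using the Java DataSet API (the input data and the names used here are only for illustration):

import java.util.List;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.util.Collector;

public class CollectExample {
  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<String> lines = env.fromElements("to be or", "not to be");

    // Collector.collect(): called inside a function to emit 0..n records per input element
    DataSet<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public void flatMap(String line, Collector<String> out) {
        for (String word : line.split(" ")) {
          out.collect(word);
        }
      }
    });

    // DataSet.collect(): triggers execution and fetches the (possibly distributed)
    // result back to the local client program as a java.util.List
    List<String> localWords = words.collect();
    System.out.println(localWords);
  }
}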
Best,
Fabian

2015-09-14 20:58 GMT+02:00 Michele Bertoni <michele1.bert...@mail.polimi.it>:

> Hi Stephan,
> I have one more question: what happens when I do collect inside a coGroup
> (i.e. doing an outer join) or in a groupReduce?
>
> On 13 Sep 2015, at 02:13, Stephan Ewen <se...@apache.org> wrote:
>
> Hi!
>
> In most places where you use collect(), you should be able to use a
> broadcast variable to the same extent. This keeps the plan as one DAG,
> executed in one unit, so no re-computation will happen.
>
> Intermediate result caching is actually work that has been in progress
> for a while, but has stalled for a bit due to the prioritization of some
> other issues. It will be resumed in the near future, definitely. Too many
> parts are already in place to not complete this feature...
>
> Greetings,
> Stephan
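A rough sketch of the broadcast-variable approach Stephan describes above, filtering a large DataSet a by the values of a small DataSet b instead of collecting b to the client (assuming the same env and imports as in the earlier example, plus RichFilterFunction, Configuration, HashSet and Set; the broadcast name "small-b" is made up):

// b is small, so broadcast it to every parallel instance of the filter over a
DataSet<Long> a = env.fromElements(1L, 2L, 3L, 4L, 5L);   // stands in for the large side
DataSet<Long> b = env.fromElements(2L, 4L);                // stands in for the small side

DataSet<Long> aFilteredByB = a
    .filter(new RichFilterFunction<Long>() {
      private Set<Long> allowed;

      @Override
      public void open(Configuration parameters) {
        // materialize the broadcast set once per parallel task instance
        allowed = new HashSet<>(getRuntimeContext().<Long>getBroadcastVariable("small-b"));
      }

      @Override
      public boolean filter(Long value) {
        return allowed.contains(value);
      }
    })
    .withBroadcastSet(b, "small-b");

This keeps everything in a single DAG, so the program upstream of a is not recomputed.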
> On Sat, Sep 12, 2015 at 6:44 PM, Michele Bertoni <michele1.bert...@mail.polimi.it> wrote:
>
>> ok, I think I got the point: I don't have two execute() calls, but a
>> collect() in some branch
>> I will look for a way to remove it
>>
>> What I am doing is to keep all the elements of A that have a value equal
>> to something in B, where B (at this point) is very small
>> Is it better to collect or to coGroup?
>>
>> btw, is this something you expect to solve in future versions?
>>
>> thanks
>> michele
>>
>> On 12 Sep 2015, at 16:27, Stephan Ewen <se...@apache.org> wrote:
>>
>> Fabian has explained it well. All functions are executed lazily as one
>> DAG, when "env.execute()" is called.
>>
>> Beware that there are three exceptions:
>> - count()
>> - collect()
>> - print()
>>
>> These functions trigger an immediate program execution (they are "eager"
>> functions). They will execute all that is needed to produce their result.
>> Summing up:
>>
>> ---------------------------
>>
>> One execution in this case (result "a" is reused by "b" and "c"):
>>
>> a = env.createInput() -> map() -> reduce() -> filter()
>>
>> b = a.flatMap()
>> c = a.groupBy() -> reduce()
>>
>> b.writeAsText()
>> c.writeAsCsv()
>>
>> env.execute();
>>
>> ---------------------------
>>
>> Two executions in this case ("a" is computed twice, once for "b" and once
>> for "c"):
>>
>> a = env.createInput() -> map() -> reduce() -> filter()
>>
>> b = a -> flatMap() -> count()
>> c = a -> groupBy() -> reduce().collect()
>>
>> ---------------------------
>>
>> Greetings,
>> Stephan
>>
>> On Sat, Sep 12, 2015 at 11:31 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Michele,
>>>
>>> Flink programs can have multiple sinks.
>>> In your program, the intermediate result a will be streamed to both
>>> filters (b and c) at the same time, and both sinks will be written at
>>> the same time.
>>> So in this case, there is no need to materialize the intermediate
>>> result a.
>>>
>>> If you call execute() after you defined b, the program will compute a
>>> and stream the result only to b.
>>> If you call execute() again after you defined c, the program will
>>> compute a again and stream the result to c.
>>>
>>> Summary:
>>> Flink programs can usually stream intermediate results without
>>> materializing them. There are a few cases where Flink needs to
>>> materialize intermediate results in order to avoid deadlocks, but these
>>> are handled fully transparently.
>>> It is not possible (yet!) to share results across program executions,
>>> i.e., across separate calls to execute().
>>>
>>> I suppose you call execute() between defining b and c. If you remove
>>> that call, a will be computed once and both b and c will be computed at
>>> the same time.
>>>
>>> Best, Fabian
>>>
>>> 2015-09-12 11:02 GMT+02:00 Michele Bertoni <michele1.bert...@mail.polimi.it>:
>>>
>>>> Hi everybody, I have a question about internal optimization:
>>>> is Flink able to reuse an intermediate result that is used twice in
>>>> the graph?
>>>>
>>>> i.e.
>>>> a = readsource -> filter -> reduce -> something else even more
>>>> complicated
>>>>
>>>> b = a filter(something)
>>>> store b
>>>>
>>>> c = a filter(something else)
>>>> store c
>>>>
>>>> what happens to a? is it computed twice?
>>>>
>>>> in my read function I have some logging commands and I see them
>>>> printed twice, but it sounds strange to me
>>>>
>>>> thanks
>>>> cheers
>>>> michele
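For reference, the single-execution case discussed in the thread written out against the Java DataSet API (assuming env and main as in the first example; the paths and filter conditions are placeholders): a is computed once and streamed to both sinks in the same run.

DataSet<Long> a = env.fromElements(1L, 2L, 3L, 4L, 5L, 6L)
    .filter(x -> x > 1);                               // stands in for a longer pipeline

a.filter(x -> x % 2 == 0).writeAsText("/tmp/out-b");   // sink for b
a.filter(x -> x % 3 == 0).writeAsText("/tmp/out-c");   // sink for c

env.execute("shared upstream, two sinks");             // one DAG, one execution

// Calling an eager operation such as count(), collect(), or print() on b or c
// instead would trigger a separate execution and recompute a each time.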