Hi Michele,

collect on DataSet and collect on a Collector within a Function are two
different things and have the same name by coincidence (actually, this is
the first time I noticed that).

DataSet.collect() fetches a DataSet, which can be distributed across several
TaskManagers (in a cluster), to your local client program.
Collector.collect() adds a value to the result of a function call. The
collector is used in functions that can return an arbitrary number of
results (0 to n).
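
As a rough illustration, the sketch below models both meanings in plain Java
without any Flink dependency. The Collector interface, splitWords, and
runAndFetch are hypothetical stand-ins written for this example, not Flink's
actual classes; they only mimic the roles described above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class CollectSketch {

    // Hypothetical stand-in for a Collector: it only receives the values
    // that a user function emits.
    interface Collector<T> {
        void collect(T value);
    }

    // A flatMap-style function: it emits 0 to n results per input element
    // by calling out.collect() once per result.
    static void splitWords(String line, Collector<String> out) {
        for (String word : line.split("\\s+")) {
            if (!word.isEmpty()) {
                out.collect(word);
            }
        }
    }

    // Stand-in for the role of DataSet.collect(): run the function over all
    // input and hand the complete result back to the caller as a local List.
    static List<String> runAndFetch(List<String> input) {
        List<String> result = new ArrayList<>();
        for (String line : input) {
            splitWords(line, result::add);
        }
        return result;
    }

    public static void main(String[] args) {
        // The empty line produces no output at all (the "0" in "0 to n").
        System.out.println(runAndFetch(Arrays.asList("to be", "", "or not")));
    }
}
```

In this toy model, out.collect() is called many times inside the function,
while runAndFetch() is called once by the client to obtain the whole result.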

Best,
Fabian

2015-09-14 20:58 GMT+02:00 Michele Bertoni <michele1.bert...@mail.polimi.it>
:

> Hi Stephan,
> I have one more question: what happens when I do collect inside a cogroup
> (i.e. doing an outer join) or in a groupreduce?
>
>
>
> On 13 Sep 2015, at 02:13, Stephan Ewen <se...@apache.org> wrote:
>
> Hi!
>
> In most places where you use collect(), you should be able to use a
> broadcast variable to the same extent. This keeps the plan as one DAG,
> executed in one unit, so no re-computation will happen.
>
> Intermediate result caching is actually work that has been in progress
> for a while, but has stalled for a bit due to prioritization of some other
> issues. It will definitely be resumed in the near future. Too many parts
> are already in place to not complete this feature...
>
> Greetings,
> Stephan
>
>
> On Sat, Sep 12, 2015 at 6:44 PM, Michele Bertoni <
> michele1.bert...@mail.polimi.it> wrote:
>
>> ok, I think I got the point: I don't have two execute() calls, but a
>> collect() in some branch.
>> I will look for a way to remove it.
>>
>>
>> What I am doing is keeping all the elements of A that have a value equal
>> to something in B, where B (at this point) is very small.
>> Is it better to use a collect or a cogroup?
>>
>>
>> btw, is this something you expect to solve in future versions?
>>
>>
>> thanks
>> michele
>>
>>
>>
>>
>> On 12 Sep 2015, at 16:27, Stephan Ewen <se...@apache.org> wrote:
>>
>> Fabian has explained it well. All functions are executed lazily as one
>> DAG, when "env.execute()" is called.
>>
>> Beware that there are three exceptions:
>>   - count()
>>   - collect()
>>   - print()
>>
>> These functions trigger an immediate program execution (they are "eager"
>> functions). They will execute all that is needed to produce their result.
>> Summing up:
>>
>> ---------------------------
>>
>> One execution in this case (result "a" is reused by "b" and "c")
>>
>> a = env.createInput() -> map() -> reduce() -> filter()
>>
>> b = a.flatmap()
>> c = a.groupBy() -> reduce()
>>
>> b.writeAsText()
>> c.writeAsCsv()
>>
>> env.execute();
>>
>> ---------------------------
>>
>> Two executions in this case ("a" is computed twice, once for "b" and once
>> for "c")
>>
>> a = env.createInput() -> map() -> reduce() -> filter()
>>
>> b = a -> flatmap() -> count()
>> c = a -> groupBy() -> reduce() -> collect()
>>
>> ---------------------------
>>
>> Greetings,
>> Stephan
>>
>>
>> On Sat, Sep 12, 2015 at 11:31 AM, Fabian Hueske <fhue...@gmail.com>
>> wrote:
>>
>>> Hi Michele,
>>>
>>> Flink programs can have multiple sinks.
>>> In your program, the intermediate result a will be streamed to both
>>> filters (b and c) at the same time and both sinks will be written at the
>>> same time.
>>> So in this case, there is no need to materialize the intermediate result
>>> a.
>>>
>>> If you call execute() after you defined b, the program will compute a
>>> and stream the result only to b.
>>> If you call execute() again after you defined c, the program will
>>> compute a again and stream the result to c.
>>>
>>> Summary:
>>> Flink programs can usually stream intermediate results without
>>> materializing them. There are a few cases where it needs to materialize
>>> intermediate results in order to avoid deadlocks, but these are handled
>>> fully transparently.
>>> It is not possible (yet!) to share results across program executions,
>>> i.e., across separate calls to execute().
>>>
>>> I suppose you call execute() between defining b and c. If you remove
>>> that call, a will be computed once and both b and c will be computed at
>>> the same time.
>>>
>>> Best, Fabian
>>>
>>> 2015-09-12 11:02 GMT+02:00 Michele Bertoni <
>>> michele1.bert...@mail.polimi.it>:
>>>
>>>> Hi everybody, I have a question about internal optimization:
>>>> is Flink able to reuse intermediate results that are used twice in the
>>>> graph?
>>>>
>>>> i.e.
>>>> a = readsource -> filter -> reduce -> something else even more
>>>> complicated
>>>>
>>>> b = a filter(something)
>>>> store b
>>>>
>>>> c = a filter(something else)
>>>> store c
>>>>
>>>> what happens to a? is it computed twice?
>>>>
>>>> in my read function I have some logging commands and I see them
>>>> printed twice, but it sounds strange to me
>>>>
>>>>
>>>>
>>>> thanks
>>>> cheers
>>>> michele
>>>
>>>
>>>
>>
>>
>
>
