Ah, sorry :-)
toSet, toList, etc. are regular methods of Scala's Iterator API [1] and are
not part of Flink's API, although the concrete iterator is provided by Flink.
I am not a Scala expert, but I think toSet (or toList) will eagerly fetch the
contents of the function's iterator into a set (or list). This call is part
of the user function and is executed just like any other call.
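
To illustrate the eager behaviour, here is a minimal sketch in plain Scala
with no Flink involved. The names ToSetDemo and drain and the sample values
are made up for illustration; the iterator passed to drain only stands in
for the one a coGroup function would receive.

```scala
object ToSetDemo {
  // Drains `source` into a Set and reports how many elements were pulled.
  // `drain` is a hypothetical helper, not part of Scala's or Flink's API.
  def drain(source: Iterator[Int]): (Set[Int], Int) = {
    var pulled = 0
    // map on an Iterator is lazy: nothing is pulled until someone consumes it.
    val counted = source.map { x => pulled += 1; x }
    assert(pulled == 0)
    // toSet consumes the whole iterator right here, inside this call.
    val materialized = counted.toSet
    // The single-pass iterator is now exhausted.
    assert(!counted.hasNext)
    (materialized, pulled)
  }

  def main(args: Array[String]): Unit = {
    val (leftSet, pulled) = drain(Iterator(1, 2, 2, 3))
    println(s"pulled $pulled elements, set has ${leftSet.size} entries")

    // Unlike the iterator, the in-memory set can be traversed repeatedly,
    // which is what an outer-join style comparison needs.
    val right = Seq(2, 3, 4)
    val matched = right.filter(leftSet.contains)
    val unmatched = right.filterNot(leftSet.contains)
    println(s"matched=$matched unmatched=$unmatched")
  }
}
```

So the cost of toSet is memory inside the running function: the whole group
has to fit on the heap of the task that processes it.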

[1]
http://www.scala-lang.org/api/2.10.4/index.html#scala.collection.Iterator

2015-09-14 22:26 GMT+02:00 Michele Bertoni <michele1.bert...@mail.polimi.it>
:

> Sorry, I was not talking about that collect; I know what a collector is.
> I was talking about the outer join case where, inside a cogroup, you have
> to call toSet on the left or right side so that it can be traversed
> multiple times.
>
> toSet transforms (something like) a lazy iterator into a collection in
> memory: is it actually collecting something and thus stopping execution,
> or is it something different?
>
>
>
>
>
> On 14 Sep 2015, at 22:18, Fabian Hueske <fhue...@gmail.com>
> wrote:
>
> Hi Michele,
>
> collect on DataSet and collect on a Collector within a Function are two
> different things and have the same name by coincidence (actually, this is
> the first time I noticed that).
>
> DataSet.collect() fetches a DataSet, which can be distributed across
> several TaskManagers (in a cluster), to your local client program.
> Collector.collect() adds a value to the result of a function call. The
> collector is used in functions that can return an arbitrary number of
> results (0 to n).
>
> Best,
> Fabian
>
> 2015-09-14 20:58 GMT+02:00 Michele Bertoni <
> michele1.bert...@mail.polimi.it>:
>
>> Hi Stephan,
>> I have one more question: what happens when I do collect inside a cogroup
>> (i.e. doing an outer join) or in a groupreduce?
>>
>>
>>
>> On 13 Sep 2015, at 02:13, Stephan Ewen <se...@apache.org>
>> wrote:
>>
>> Hi!
>>
>> In most places where you use collect(), you should be able to use a
>> broadcast variable to the same extent. This keeps the plan as one DAG,
>> executed in one unit, so no re-computation will happen.
>>
>> Intermediate result caching is actually work that has been in progress
>> for a while, but has stalled for a bit due to prioritization of some other
>> issues. It will be resumed in the near future, definitely. Too many parts
>> are already in place to not complete this feature...
>>
>> Greetings,
>> Stephan
>>
>>
>> On Sat, Sep 12, 2015 at 6:44 PM, Michele Bertoni <
>> michele1.bert...@mail.polimi.it> wrote:
>>
>>> OK, I think I got the point: I don’t have two execute calls but a
>>> collect in some branch.
>>> I will look for a way to remove it.
>>>
>>>
>>> What I am doing is keeping all the elements of A that have a value equal
>>> to something in B, where B (at this point) is very small.
>>> Is it better to use a collect or a cogroup?
>>>
>>>
>>> By the way, is this something you expect to solve in future versions?
>>>
>>>
>>> thanks
>>> michele
>>>
>>>
>>>
>>>
>>> On 12 Sep 2015, at 16:27, Stephan Ewen <se...@apache.org>
>>> wrote:
>>>
>>> Fabian has explained it well. All functions are executed lazily as one
>>> DAG, when "env.execute()" is called.
>>>
>>> Beware that there are three exceptions:
>>>   - count()
>>>   - collect()
>>>   - print()
>>>
>>> These functions trigger an immediate program execution (they are "eager"
>>> functions). They will execute all that is needed to produce their result.
>>> Summing up:
>>>
>>> ---------------------------
>>>
>>> One execution in this case (result "a" is reused by "b" and "c")
>>>
>>> a = env.createInput() -> map() -> reduce() -> filter()
>>>
>>> b = a.flatmap()
>>> c = a.groupBy() -> reduce()
>>>
>>> b.writeAsText()
>>> c.writeAsCsv()
>>>
>>> env.execute();
>>>
>>> ---------------------------
>>>
>>> Two executions in this case ("a" is computed twice, once for "b" and
>>> once for "c")
>>>
>>> a = env.createInput() -> map() -> reduce() -> filter()
>>>
>>> b = a -> flatmap() -> count()
>>> c = a -> groupBy() -> reduce() -> collect()
>>>
>>> ---------------------------
>>>
>>> Greetings,
>>> Stephan
>>>
>>>
>>> On Sat, Sep 12, 2015 at 11:31 AM, Fabian Hueske <fhue...@gmail.com>
>>> wrote:
>>>
>>>> Hi Michele,
>>>>
>>>> Flink programs can have multiple sinks.
>>>> In your program, the intermediate result a will be streamed to both
>>>> filters (b and c) at the same time and both sinks will be written at the
>>>> same time.
>>>> So in this case, there is no need to materialize the intermediate
>>>> result a.
>>>>
>>>> If you call execute() after you defined b, the program will compute a
>>>> and stream the result only to b.
>>>> If you call execute() again after you defined c, the program will
>>>> compute a again and stream the result to c.
>>>>
>>>> Summary:
>>>> Flink programs can usually stream intermediate results without
>>>> materializing them. There are a few cases where it needs to materialize
>>>> intermediate results in order to avoid deadlocks, but these are handled
>>>> fully transparently.
>>>> It is not possible (yet!) to share results across program executions,
>>>> i.e., whenever you call execute().
>>>>
>>>> I suppose you call execute() between defining b and c. If you remove
>>>> that call, a will be computed once and both b and c are computed at the
>>>> same time.
>>>>
>>>> Best, Fabian
>>>>
>>>> 2015-09-12 11:02 GMT+02:00 Michele Bertoni <
>>>> michele1.bert...@mail.polimi.it>:
>>>>
>>>>> Hi everybody, I have a question about internal optimization:
>>>>> is Flink able to reuse intermediate results that are used twice in the
>>>>> graph?
>>>>>
>>>>> i.e.
>>>>> a = readsource -> filter -> reduce -> something else even more
>>>>> complicated
>>>>>
>>>>> b = a filter(something)
>>>>> store b
>>>>>
>>>>> c = a filter(something else)
>>>>> store c
>>>>>
>>>>> what happens to a? is it computed twice?
>>>>>
>>>>> In my read function I have some logging commands and I see them
>>>>> printed twice, but that sounds strange to me.
>>>>>
>>>>>
>>>>>
>>>>> thanks
>>>>> cheers
>>>>> michele
>>>>
>>>>
>>>>
>>>
>>>
>>
>>
>
>
