[ https://issues.apache.org/jira/browse/FLINK-6110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15931202#comment-15931202 ]

Chesnay Schepler commented on FLINK-6110:
-----------------------------------------

This is certainly useful; nevertheless, let's talk about your examples.

Could you not implement both of your examples with a mapPartition with 
parallelism=1? The UDF would collect all values and insert them into the 
map/array, which you could then freely pass on to other operations.
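
For example, something like the following (a minimal, self-contained sketch; 
the Tuple2<String, Long> element type and all names such as "counts" are 
illustrative, not taken from this issue):

{code}
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class MapPartitionSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<String, Long>> counts =
                env.fromElements(Tuple2.of("a", 1L), Tuple2.of("b", 2L));

        // With parallelism=1 a single subtask sees every element, so the UDF
        // can gather all of them into one map.
        DataSet<Map<String, Long>> asMap = counts
                .mapPartition(new MapPartitionFunction<Tuple2<String, Long>, Map<String, Long>>() {
                    @Override
                    public void mapPartition(Iterable<Tuple2<String, Long>> values,
                            Collector<Map<String, Long>> out) {
                        Map<String, Long> m = new HashMap<>();
                        for (Tuple2<String, Long> t : values) {
                            m.put(t.f0, t.f1);
                        }
                        out.collect(m); // emits one element: the whole map
                    }
                })
                .setParallelism(1);

        // asMap stays inside the plan, so downstream operators can consume it
        // without forcing an early execution via collect().
        asMap.print();
    }
}
{code}

Because the map is produced inside the plan, nothing has to be collected to 
the client and then recomputed by a later execution.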

> Flink unnecessarily repeats shared work triggered by different blocking 
> sinks, leading to massive inefficiency
> --------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-6110
>                 URL: https://issues.apache.org/jira/browse/FLINK-6110
>             Project: Flink
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 1.2.0
>            Reporter: Luke Hutchison
>
> After a blocking sink (collect() or count()) is called, all already-computed 
> intermediate DataSets are thrown away, and any subsequent code that tries to 
> make use of an already-computed DataSet will require the DataSet to be 
> computed from scratch. For example, the following code prints the elements a, 
> b and c twice in succession, even though the DataSet ds should only have to 
> be computed once:
> {code}
>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>         DataSet<String> ds = env.fromElements("a", "b", "c")
>                 .map(s -> { System.out.println(s); return s + s; });
>         List<String> c1 = ds.map(s -> s).collect();
>         List<String> c2 = ds.map(s -> s).collect();
>         env.execute();
> {code}
> This is problematic because not every operation is easy to express in Flink 
> using joins and filters. Sometimes, for smaller datasets (such as marginal 
> counts), it's easier to collect the values into a HashMap, and then pass that 
> HashMap into subsequent operations so they can look up the values they need 
> directly. A more complex example is the need to sort a set of values, then 
> use the sorted array for subsequent binary search operations to identify 
> rank. This is only really possible with an array of sorted values, as long as 
> that array fits easily in RAM (there is no way to express binary search as a 
> join), so you have to drop out of the Flink pipeline using collect() to 
> produce the sorted lookup array, as sketched below.
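> For illustration, a minimal sketch of that workaround (values is assumed to 
> be a DataSet<Long> and query a Long; both names are hypothetical):
> {code}
>         // Drop out of the pipeline: collect the (small) value set, sort it once.
>         List<Long> sorted = new ArrayList<>(values.collect());
>         Collections.sort(sorted);
>         // Rank lookups are then plain binary searches in local memory
>         // (java.util.ArrayList, java.util.Collections, java.util.List).
>         int pos = Collections.binarySearch(sorted, query);
>         int rank = pos >= 0 ? pos : -(pos + 1); // rank = insertion point if absent
> {code}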
> However, any collect() or count() operation causes immediate execution of the 
> Flink pipeline, which throws away *all* intermediate values that could be 
> reused for future executions. As a result, code can be extremely inefficient, 
> recomputing the same values over and over again unnecessarily.
> I believe that intermediate values should not be released or garbage 
> collected until after env.execute() is called.



