[ https://issues.apache.org/jira/browse/FLINK-6110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luke Hutchison updated FLINK-6110:
----------------------------------
    Description: 
After a blocking sink (collect() or count()) is called, all already-computed 
intermediate DataSets are thrown away, and any subsequent code that uses an 
already-computed DataSet forces it to be recomputed from scratch. For example, 
the following code prints the elements a, b, and c twice in succession, even 
though the DataSet ds should only need to be computed once:

{code}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> ds = env.fromElements("a", "b", "c")
        .map(s -> { System.out.println(s); return s + s; });
List<String> c1 = ds.map(s -> s).collect();  // triggers job #1: ds is computed
List<String> c2 = ds.map(s -> s).collect();  // triggers job #2: ds is recomputed
env.execute();
{code}

This is problematic because not every operation is easy to express in Flink 
using joins and filters. Sometimes, for smaller datasets (such as marginal 
counts), it is easier to collect the values into a HashMap and then pass that 
HashMap into subsequent operations so they can look up the values they need 
directly. A more complex example is needing to sort a set of values and then 
run binary searches against the sorted array to identify each value's rank: 
since there is no way to express binary search as a join, this is only really 
possible with an in-memory array of sorted values (assuming the array fits 
easily in RAM), so you have to drop out of the Flink pipeline using collect() 
to produce the sorted lookup array. A sketch of this pattern follows.
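
For concreteness, here is a minimal sketch of that sorted-array rank pattern. 
The data and variable names are placeholders, and the sketch assumes the 
collected values fit in client memory:

{code}
import java.util.Arrays;
import java.util.List;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Double> values = env.fromElements(3.0, 1.0, 2.0);  // placeholder data

// Drop out of the pipeline: collect the values on the client and sort them.
List<Double> collected = values.collect();   // triggers job #1
double[] sorted = new double[collected.size()];
for (int i = 0; i < sorted.length; i++) {
    sorted[i] = collected.get(i);
}
Arrays.sort(sorted);

// Use the sorted array in a later operation: Arrays.binarySearch gives
// each value's rank in the sorted order.
List<Integer> ranks = values
        .map(v -> Arrays.binarySearch(sorted, v))
        .returns(Integer.class)   // type hint for the erased lambda type
        .collect();               // triggers job #2, recomputing values

// The second collect() recomputes values (and everything upstream of it)
// from scratch -- exactly the waste described in this report.
{code}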

However, any collect() or count() operation causes immediate execution of the 
Flink pipeline, and each such execution throws away *all* intermediate results 
that could have been reused by later executions. As a result, code can be 
extremely inefficient, recomputing the same values over and over again 
unnecessarily.
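
One partial workaround today is to materialize the shared result once on the 
client and re-feed it as a new source. This is only a sketch, and it only 
helps when the materialized result fits in client memory:

{code}
// Run the upstream pipeline once, then wrap the result as a new source.
List<String> materialized = ds.collect();
DataSet<String> reusable = env.fromCollection(materialized);

// Later jobs still execute separately, but they start from the in-memory
// copy instead of recomputing ds from scratch.
List<String> c1 = reusable.map(s -> s).collect();
List<String> c2 = reusable.map(s -> s).collect();
{code}

This is boilerplate that the framework could make unnecessary.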

I believe that intermediate values should not be released or garbage collected 
until after env.execute() is called.



> Flink unnecessarily repeats shared work triggered by different blocking sinks, leading to massive inefficiency
> --------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-6110
>                 URL: https://issues.apache.org/jira/browse/FLINK-6110
>             Project: Flink
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 1.2.0
>            Reporter: Luke Hutchison
>



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
