[ https://issues.apache.org/jira/browse/FLINK-6110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15931202#comment-15931202 ]

Chesnay Schepler commented on FLINK-6110:
-----------------------------------------

This is certainly useful; nevertheless, let's talk about your examples. Could you not implement both of them with a mapPartition with parallelism=1? The UDF would collect all values and insert them into a map/array that you can freely pass on to other operations.
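A minimal sketch of that approach, assuming the Flink 1.2 Java DataSet API (the class name and sample data are hypothetical):

{code}
import java.util.HashMap;

import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class MapPartitionLookup {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical stand-in for a precomputed (key, count) DataSet.
        DataSet<Tuple2<String, Integer>> counts =
                env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 2), Tuple2.of("c", 3));

        // With parallelism 1 there is exactly one partition, so a single UDF
        // instance sees every value and can build the whole lookup map inside
        // the pipeline -- no collect(), no second execution.
        DataSet<HashMap<String, Integer>> lookup = counts
                .mapPartition(new MapPartitionFunction<Tuple2<String, Integer>, HashMap<String, Integer>>() {
                    @Override
                    public void mapPartition(Iterable<Tuple2<String, Integer>> values,
                                             Collector<HashMap<String, Integer>> out) {
                        HashMap<String, Integer> map = new HashMap<>();
                        for (Tuple2<String, Integer> v : values) {
                            map.put(v.f0, v.f1);
                        }
                        out.collect(map); // emit one element: the complete map
                    }
                })
                .setParallelism(1);

        lookup.print(); // print() is itself a sink and triggers execution
    }
}
{code}

The resulting single-element DataSet can then be handed to downstream operators, so the lookup structure never has to leave the cluster.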

> Flink unnecessarily repeats shared work triggered by different blocking sinks, leading to massive inefficiency
> --------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-6110
>                 URL: https://issues.apache.org/jira/browse/FLINK-6110
>             Project: Flink
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 1.2.0
>            Reporter: Luke Hutchison
>
> After a blocking sink (collect() or count()) is called, all already-computed intermediate DataSets are thrown away, and any subsequent code that tries to make use of an already-computed DataSet will require the DataSet to be computed from scratch. For example, the following code prints the elements a, b and c twice in succession, even though the DataSet ds should only have to be computed once:
> {code}
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> DataSet<String> ds = env.fromElements("a", "b", "c").map(s -> {
>     System.out.println(s);
>     return s + s;
> });
> List<String> c1 = ds.map(s -> s).collect();
> List<String> c2 = ds.map(s -> s).collect();
> env.execute();
> {code}
> This is problematic because not every operation is easy to express in Flink using joins and filters -- sometimes for smaller datasets (such as marginal counts) it's easier to collect the values into a HashMap, and then pass that HashMap into subsequent operations so they can look up the values they need directly. A more complex example is the need to sort a set of values, then use the sorted array for subsequent binary search operations to identify rank -- this is only really possible using an array of sorted values, as long as that array fits easily in RAM (there's no way to do binary search as a join type) -- so you have to drop out of the Flink pipeline using collect() to produce the sorted binary search lookup array.
> However, any collect() or count() operation causes immediate execution of the Flink pipeline, which throws away *all* intermediate values that could be reused for future executions. As a result, code can be extremely inefficient, recomputing the same values over and over again unnecessarily.
> I believe that intermediate values should not be released or garbage collected until after env.execute() is called.
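On the reporter's HashMap point: the usual way to pass such a map into subsequent operations without leaving the pipeline is a broadcast variable. A hedged sketch (withBroadcastSet and getBroadcastVariable are the actual DataSet API; the helper class and names here are hypothetical), reusing the single-element lookup DataSet from the mapPartition sketch above:

{code}
import java.util.HashMap;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.configuration.Configuration;

public class BroadcastLookup {
    // 'keys' is any DataSet<String>; 'lookup' is the single-element
    // DataSet<HashMap<String, Integer>> built by the mapPartition sketch.
    public static DataSet<String> enrich(DataSet<String> keys,
                                         DataSet<HashMap<String, Integer>> lookup) {
        return keys
                .map(new RichMapFunction<String, String>() {
                    private HashMap<String, Integer> map;

                    @Override
                    public void open(Configuration parameters) {
                        // Broadcast variables arrive as a List; ours holds one element.
                        map = getRuntimeContext()
                                .<HashMap<String, Integer>>getBroadcastVariable("lookup")
                                .get(0);
                    }

                    @Override
                    public String map(String key) {
                        Integer count = map.get(key);
                        return key + " -> " + (count == null ? 0 : count);
                    }
                })
                .withBroadcastSet(lookup, "lookup");
    }
}
{code}

Because the map travels as a broadcast variable inside a single job graph, nothing is collected to the client; the recomputation described in this issue only bites once collect() or count() splits the work across separate executions.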