[ https://issues.apache.org/jira/browse/FLINK-6110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15931214#comment-15931214 ]
Luke Hutchison commented on FLINK-6110:
---------------------------------------

Chesnay: Sorry, I couldn't find that dup. And yes, thanks -- having a single mapPartition is more or less what I ended up doing in these two cases.

A more complex situation arises when a join is much harder to write than something like Map#get(), but the dataset won't fit in RAM as a Map. I would like to be able to look up values in one DataSet from within another DataSet's mapper function. Once FLINK-2250 is fixed/implemented, it should be possible to do something like:

{code}
dataSet1.flatMap((xIter, out) -> {
    for (T x : xIter) {
        // Hypothetical: assumes a filtered DataSet can be iterated
        // from inside a UDF once FLINK-2250 lands
        for (T y : dataSet2.filter(y2 -> y2.id == x.id)) {
            out.collect(new Tuple2<>(x.val, y.val));
        }
    }
});
{code}

except that performing the equivalent of a Map#get() with a full filter scan is horribly inefficient. Flink's where() syntax exists only on joins. I assume that groupBy() builds an index, so maybe groupBy() could get a query-like where() syntax too, for doing fast indexed lookups within group keys? (For the case where the lookup side does fit in memory, sketches of the current workarounds appear at the end of this message.)

> Flink unnecessarily repeats shared work triggered by different blocking sinks, leading to massive inefficiency
> ---------------------------------------------------------------------------------------------------------------
>
>                  Key: FLINK-6110
>                  URL: https://issues.apache.org/jira/browse/FLINK-6110
>              Project: Flink
>           Issue Type: Bug
>           Components: Core
>     Affects Versions: 1.2.0
>             Reporter: Luke Hutchison
>
> After a blocking sink (collect() or count()) is called, all already-computed intermediate DataSets are thrown away, and any subsequent code that tries to make use of an already-computed DataSet requires that DataSet to be computed from scratch. For example, the following code prints the elements a, b and c twice in succession, even though the DataSet ds should only have to be computed once:
> {code}
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> DataSet<String> ds = env.fromElements("a", "b", "c").map(s -> {
>     System.out.println(s);
>     return s + s;
> });
> List<String> c1 = ds.map(s -> s).collect();
> List<String> c2 = ds.map(s -> s).collect();
> env.execute();
> {code}
> This is problematic because not every operation is easy to express in Flink using joins and filters -- sometimes, for smaller datasets (such as marginal counts), it's easier to collect the values into a HashMap and then pass that HashMap into subsequent operations so they can look up the values they need directly. A more complex example is the need to sort a set of values, then use the sorted array for subsequent binary-search operations to identify rank. This is only really possible with an array of sorted values that fits easily in RAM (there is no join type that performs a binary search), so you have to drop out of the Flink pipeline using collect() to produce the sorted lookup array.
> However, any collect() or count() operation causes immediate execution of the Flink pipeline, which throws away *all* intermediate values that could be reused by future executions. As a result, code can be extremely inefficient, recomputing the same values over and over again unnecessarily.
> I believe that intermediate values should not be released or garbage collected until after env.execute() is called.
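For reference -- and only for the case where the lookup side does fit in memory -- here is a minimal sketch of the collect-into-a-HashMap workaround using the DataSet API's broadcast variables, so the small side is materialized once per task instead of being re-filtered per element. The class name BroadcastLookup, the dataset contents, and the variable names are illustrative, not from the issue:

{code}
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class BroadcastLookup {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<Integer, String>> big =
                env.fromElements(Tuple2.of(1, "a1"), Tuple2.of(2, "b1"), Tuple2.of(3, "c1"));
        DataSet<Tuple2<Integer, String>> small =
                env.fromElements(Tuple2.of(1, "a2"), Tuple2.of(3, "c2"));

        big.flatMap(new RichFlatMapFunction<Tuple2<Integer, String>, Tuple2<String, String>>() {
                private final Map<Integer, String> lookup = new HashMap<>();

                @Override
                public void open(Configuration parameters) {
                    // Materialize the broadcast (small) side once per task
                    for (Tuple2<Integer, String> t : getRuntimeContext()
                            .<Tuple2<Integer, String>>getBroadcastVariable("small")) {
                        lookup.put(t.f0, t.f1);
                    }
                }

                @Override
                public void flatMap(Tuple2<Integer, String> x,
                        Collector<Tuple2<String, String>> out) {
                    // O(1) Map#get() instead of filtering the other DataSet per element
                    String match = lookup.get(x.f0);
                    if (match != null) {
                        out.collect(Tuple2.of(x.f1, match));
                    }
                }
            })
            .withBroadcastSet(small, "small")
            .print();  // prints (a1,a2) and (c1,c2)
    }
}
{code}

withBroadcastSet() ships the entire small DataSet to every task, so this only helps when that side comfortably fits in memory -- the won't-fit-in-RAM case above still has no good answer short of a join.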
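And a sketch of the sort-then-binary-search rank pattern from the description, under the same fits-in-RAM assumption (names and data are illustrative; the .returns() hint works around lambda type erasure, and values are assumed distinct so rank = number of strictly smaller elements):

{code}
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class RankLookup {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Double> values = env.fromElements(5.0, 1.0, 9.0, 3.0);
        DataSet<Double> queries = env.fromElements(4.0, 9.0);

        // Drop out of the pipeline: collect() triggers execution here, which
        // is exactly the recomputation hazard this issue describes.
        List<Double> collected = values.collect();
        double[] sorted = collected.stream().mapToDouble(Double::doubleValue).toArray();
        Arrays.sort(sorted);

        // The sorted array is captured by the lambda and shipped with it.
        queries.map(x -> {
                int pos = Arrays.binarySearch(sorted, x);
                int rank = pos >= 0 ? pos : -(pos + 1);  // # of elements < x
                return Tuple2.of(x, rank);
            })
            .returns(new TypeHint<Tuple2<Double, Integer>>() {})
            .print();  // prints (4.0,2) and (9.0,3)
    }
}
{code}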