Btw, not sure if you know that you can visualize the JSON plan returned by ExecutionEnvironment.getExecutionPlan() on the website [1].

Best, Fabian

[1] http://flink.apache.org/visualizer/
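For readers who want to try the visualizer, below is a minimal, self-contained sketch of dumping the JSON plan it accepts. The pipeline and values are placeholders, not the job discussed in this thread.

--------

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;

public class PlanDump {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Placeholder pipeline; replace with the real job.
        DataSet<Tuple2<Long, Long>> data = env.fromElements(
                new Tuple2<>(1L, 10L), new Tuple2<>(1L, 20L), new Tuple2<>(2L, 30L));

        data.partitionByHash(0)
            .groupBy(0)
            .first(3)
            .output(new DiscardingOutputFormat<Tuple2<Long, Long>>());

        // getExecutionPlan() returns the plan as a JSON string without executing
        // the job; paste the output into http://flink.apache.org/visualizer/.
        System.out.println(env.getExecutionPlan());
    }
}

--------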
2017-09-06 14:39 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:

> Hi Urs,
>
> a hash-partition operator should not spill. In general, DataSet plans aim
> to be pipelined as much as possible.
> There are a few cases when spilling happens:
>
> - full sort with insufficient memory
> - hash tables that need to spill (only in join operators)
> - range partitioning, to compute a histogram of the partitioning keys
> - temp nodes to avoid deadlocks. These can occur in plans that branch and
>   join later, like the following:
>
>            /--- Map ---\
>   Input --<             JOIN --- Output
>            \--- Map ---/
>
> A simple plan without branching, such as the one you posted
>
>   readCsvFile -> partitionBy(0) -> groupBy(0) -> sortGroup(0) -> first(n)
>
> has no reason to spill except for the full sort that is required for the
> final aggregation.
>
> Can you share the execution plan that you get from
> ExecutionEnvironment.getExecutionPlan()?
>
> Btw, the sortGroup(0) call is superfluous because it would sort each group
> on field 0, which is the same for all records of a group.
> I believe Flink's optimizer automatically removes it, so it does not
> impact the performance.
> Sorting on another field would indeed make sense, because this would
> determine the order within a group and hence which records are forwarded
> by first(n).
>
> In order to force a combiner on a partitioned data set, you can do the
> following:
>
> --------
>
> public static void main(String[] args) throws Exception {
>
>     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>
>     DataSet<Tuple2<Long, Long>> data = randData(env);
>
>     DataSet<Tuple2<Long, Long>> result = data.partitionByHash(0)
>         .groupBy(0).combineGroup(new First3())
>         .withForwardedFields("f0")
>         .groupBy(0).reduceGroup(new First3());
>
>     result.print();
> }
>
> public static class First3 implements
>         GroupCombineFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>,
>         GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
>
>     @Override
>     public void combine(Iterable<Tuple2<Long, Long>> values,
>             Collector<Tuple2<Long, Long>> out) throws Exception {
>         reduce(values, out);
>     }
>
>     @Override
>     public void reduce(Iterable<Tuple2<Long, Long>> values,
>             Collector<Tuple2<Long, Long>> out) throws Exception {
>         int i = 0;
>         for (Tuple2<Long, Long> v : values) {
>             out.collect(v);
>             i++;
>             if (i == 3) {
>                 break;
>             }
>         }
>     }
> }
>
> --------
>
> The generated plan will
> - hash partition the input data
> - partially sort the data in memory on the first field (not going to disk)
> - invoke the combiner for each in-memory sorted group
> - locally forward the data (because of the forwarded field information [1])
> - fully sort the data
> - invoke the group reducer for each group
>
> In this plan, the only spilling should happen in the sort for the final
> aggregation.
>
> Best, Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/index.html#semantic-annotations
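To illustrate the sortGroup remark in Fabian's message above, here is a small, self-contained sketch with placeholder values: grouping on field 0 and sorting each group on the second field, so that first(3) forwards a well-defined set of records.

--------

import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class SortGroupExample {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Placeholder data.
        DataSet<Tuple2<Long, Long>> data = env.fromElements(
                new Tuple2<>(1L, 30L), new Tuple2<>(1L, 10L),
                new Tuple2<>(1L, 20L), new Tuple2<>(2L, 40L));

        // Group on field 0 and sort each group on field 1 (not on the grouping
        // field), so first(3) forwards the three records with the smallest f1
        // per key instead of an arbitrary selection.
        data.groupBy(0)
            .sortGroup(1, Order.ASCENDING)
            .first(3)
            .print();
    }
}

--------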
> 2017-09-05 22:21 GMT+02:00 Newport, Billy <billy.newp...@gs.com>:
>
>> We have the same issue. We are finding that we cannot express the data
>> flow in a natural way because of unnecessary spilling. Instead, we're
>> making our own operators which combine multiple steps together and
>> essentially hide it from Flink, or sometimes we even have to read an
>> input dataset once per flow to avoid spilling.
>>
>> The performance improvements are dramatic, but it's kind of reducing
>> Flink to a thread scheduler rather than a data flow engine, because we
>> basically cannot express the flow to Flink. This worries us because if
>> we let others write Flink code using our infra, we'll be spending all
>> our time collapsing their flows into much simpler but less intuitive
>> flows to prevent Flink from spilling.
>>
>> This also means higher level APIs such as the Table API or Beam are off
>> the table because they prevent us from optimizing in this manner.
>>
>> We already have prior implementations of the logic we are implementing
>> in Flink, and as a result we know the Flink version is much less
>> efficient, which is giving us pause for rolling it out more broadly.
>> We're afraid of the Flink tax, from a performance point of view as well
>> as from a usability point of view, given that naïve flows are not
>> performant without significant collapsing.
>>
>> For example, we see spilling here:
>>
>>   Dataset -> Map -> Filter -> Map -> Output
>>
>> We're now trying to combine the Map -> Output into the Filter operation,
>> so that the records which do not pass the filter are written to an
>> output file during the Filter.
>>
>> Or in this case:
>>
>>   Dataset -> Map -> [FilterT -> CoGroup; FilterF] -> Map -> Output
>>
>> Rewriting it as two separate flows,
>>
>>   Dataset -> Map -> FilterT -> CoGroup -> Map -> Output
>>   Dataset -> Map -> FilterF -> Map -> Output
>>
>> is multiple times faster, even though the file is read twice rather
>> than once.
>>
>> This is all pretty unintuitive and makes using Flink pretty difficult
>> for us, never mind our users. Writing the Flink dataflows in a naïve way
>> is fast, but getting them to run with acceptable efficiency results in
>> obscure workarounds and collapsing, and that takes the bulk of the time
>> for us, which is a shame and the main reason we don't want to push it
>> out for general use yet.
>>
>> It seems like Flink badly needs a flow rewriter which is capable of
>> rewriting a naïve flow to use combined operators or restructured flows
>> automatically. We're doing it by hand right now, but there has to be a
>> better way.
>>
>> It's a shame really, it's so close.
>>
>> Billy
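To illustrate the "write the rejected records during the Filter" workaround Billy describes above, here is a sketch under placeholder assumptions (record type String, a local /tmp path, and a made-up predicate). It is one possible way to fuse the steps, not an official Flink pattern.

--------

import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.configuration.Configuration;

import java.io.FileWriter;
import java.io.PrintWriter;

// A filter that passes matching records through and, as a side effect,
// appends rejected records to a per-subtask file.
public class FilterWithRejectFile extends RichFilterFunction<String> {

    private transient PrintWriter rejected;

    @Override
    public void open(Configuration parameters) throws Exception {
        // One side file per parallel subtask to avoid concurrent writers.
        int subtask = getRuntimeContext().getIndexOfThisSubtask();
        rejected = new PrintWriter(new FileWriter("/tmp/rejected-" + subtask + ".txt"));
    }

    @Override
    public boolean filter(String value) {
        boolean keep = value.startsWith("OK");  // placeholder predicate
        if (!keep) {
            rejected.println(value);
        }
        return keep;
    }

    @Override
    public void close() {
        if (rejected != null) {
            rejected.close();
        }
    }
}

--------

Used as dataset.filter(new FilterWithRejectFile()), this removes the separate Map -> Output branch at the cost of writing side files outside of Flink's sink mechanism.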
>> -----Original Message-----
>> From: Urs Schoenenberger [mailto:urs.schoenenber...@tngtech.com]
>> Sent: Tuesday, September 05, 2017 6:30 AM
>> To: user
>> Subject: DataSet: partitionByHash without materializing/spilling the entire partition?
>>
>> Hi all,
>>
>> we have a DataSet pipeline which reads CSV input data and then
>> essentially does a combinable GroupReduce via first(n).
>>
>> In our first iteration (readCsvFile -> groupBy(0) -> sortGroup(0) ->
>> first(n)), we got a job graph like this:
>>
>>   source --[Forward]--> combine --[Hash Partition on 0, Sort]--> reduce
>>
>> This works, but we found the combine phase to be inefficient because not
>> enough combinable elements fit into a sorter. My idea was to
>> pre-partition the DataSet to increase the chance of combinable elements
>> (readCsvFile -> partitionBy(0) -> groupBy(0) -> sortGroup(0) -> first(n)).
>>
>> To my surprise, I found that this changed the job graph to
>>
>>   source --[Hash Partition on 0]--> partition(noop) --[Forward]-->
>>   combine --[Hash Partition on 0, Sort]--> reduce
>>
>> while materializing and spilling the entire partitions at the
>> partition(noop) operator!
>>
>> Is there any way I can partition the data on the way from source to
>> combine without spilling? That is, can I get a job graph that looks like
>>
>>   source --[Hash Partition on 0]--> combine --[Hash Partition on 0, Sort]--> reduce
>>
>> instead?
>>
>> Thanks,
>> Urs
>>
>> --
>> Urs Schönenberger - urs.schoenenber...@tngtech.com
>>
>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>> Geschäftsführer: Henrik Klagges, Dr. Robert Dahlke, Gerhard Müller
>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
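For completeness, a minimal sketch of the two pipeline variants Urs describes; paths, field types, and n are placeholders.

--------

import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class FirstNPerKey {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Placeholder input; the real job reads different CSV columns.
        DataSet<Tuple2<String, String>> input = env
                .readCsvFile("hdfs:///path/to/input.csv")
                .types(String.class, String.class);

        // First iteration: combinable first(n) without pre-partitioning.
        DataSet<Tuple2<String, String>> v1 = input
                .groupBy(0)
                .sortGroup(0, Order.ASCENDING)
                .first(10);

        // Second iteration: pre-partitioning by key, which introduced the
        // spilling partition(noop) node in the job graph.
        DataSet<Tuple2<String, String>> v2 = input
                .partitionByHash(0)
                .groupBy(0)
                .sortGroup(0, Order.ASCENDING)
                .first(10);

        v1.writeAsCsv("hdfs:///path/to/out-v1");
        v2.writeAsCsv("hdfs:///path/to/out-v2");
        env.execute("first-n per key");
    }
}

--------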