Hi Billy,

a program that is defined as
Dataset -> Map -> Filter -> Map -> Output

should not spill at all. There is an unnecessary serialization/deserialization step between the last Map and the sink, but there should not be any spilling to disk.

As I said in my response to Urs, spilling should only happen in a few cases:

- full sort with insufficient memory
- hash tables that need to spill (only in join operators)
- range partitioning to compute a histogram of the partitioning keys
- temp nodes to avoid deadlocks. These can occur in plans that branch and join again later, like the following:

         /--- Map ---\
Input --<             JOIN --- Output
         \--- Map ---/

The first two should not be surprising, but the last one is usually unexpected.

Can you share a bit more information about your optimization of rewriting

Dataset -> Map -> [FilterT -> CoGroup; FilterF] -> Map -> Output

to

Dataset -> Map -> FilterT -> CoGroup -> Map -> Output
Dataset -> Map -> FilterF -> Map -> Output

I did not completely understand the structure of the first job. Is it branching and merging again? Maybe you can share the JSON plan (ExecutionEnvironment.getExecutionPlan())?

Thanks, Fabian

2017-09-06 14:41 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:

> Btw., not sure if you know that you can visualize the JSON plan returned by ExecutionEnvironment.getExecutionPlan() on the website [1].
>
> Best, Fabian
>
> [1] http://flink.apache.org/visualizer/
>
> 2017-09-06 14:39 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>
>> Hi Urs,
>>
>> a hash-partition operator should not spill. In general, DataSet plans aim to be pipelined as much as possible.
>> There are a few cases when spilling happens:
>>
>> - full sort with insufficient memory
>> - hash tables that need to spill (only in join operators)
>> - range partitioning to compute a histogram of the partitioning keys
>> - temp nodes to avoid deadlocks. These can occur in plans that branch and join again later, like the following:
>>
>>          /--- Map ---\
>> Input --<             JOIN --- Output
>>          \--- Map ---/
>>
>> A simple plan without branching, such as the one you posted,
>>
>>     readCsvFile -> partitionBy(0) -> groupBy(0) -> sortGroup(0) -> first(n)
>>
>> has no reason to spill except for the full sort that is required for the final aggregation.
>>
>> Can you share the execution plan that you get for this program (ExecutionEnvironment.getExecutionPlan())?
>>
>> Btw., the sortGroup(0) call is superfluous because it would sort each group on the 0-field, which is the same for all records of a group. I believe Flink's optimizer automatically removes it, so it should not impact performance.
>> Sorting on another field would indeed make sense, because that would determine the order within a group and hence which records are forwarded by first(n).
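>>
>> As a minimal sketch of that (not part of your program; it assumes a DataSet<Tuple2<Long, Long>> named data that is keyed on field 0, and needs org.apache.flink.api.common.operators.Order), sorting the groups on field 1 would look like this:
>>
>>     // pick the 3 records with the largest f1 value per key,
>>     // instead of 3 arbitrary records per key
>>     DataSet<Tuple2<Long, Long>> top3 = data
>>         .groupBy(0)                         // group on the key field
>>         .sortGroup(1, Order.DESCENDING)     // order each group by the value field
>>         .first(3);                          // forward the first 3 records of each sorted group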
>>
>> In order to force a combiner on a partitioned data set, you can do the following:
>>
>> --------
>>
>> public static void main(String[] args) throws Exception {
>>
>>     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>
>>     DataSet<Tuple2<Long, Long>> data = randData(env);
>>
>>     DataSet<Tuple2<Long, Long>> result = data.partitionByHash(0)
>>         .groupBy(0).combineGroup(new First3())
>>             .withForwardedFields("f0")
>>         .groupBy(0).reduceGroup(new First3());
>>
>>     result.print();
>> }
>>
>> public static class First3 implements
>>         GroupCombineFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>,
>>         GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
>>
>>     @Override
>>     public void combine(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) throws Exception {
>>         reduce(values, out);
>>     }
>>
>>     @Override
>>     public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) throws Exception {
>>         int i = 0;
>>         for (Tuple2<Long, Long> v : values) {
>>             out.collect(v);
>>             i++;
>>             if (i == 3) {
>>                 break;
>>             }
>>         }
>>     }
>> }
>>
>> --------
>>
>> The generated plan will
>> - hash partition the input data
>> - partially sort the data in memory on the first field (not going to disk)
>> - invoke the combiner for each in-memory sorted group
>> - locally forward the data (because of the forwarded field information [1])
>> - fully sort the data
>> - invoke the group reducer for each group
>>
>> In this plan, the only spilling should happen in the sort for the final aggregation.
>>
>> Best, Fabian
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/index.html#semantic-annotations
>>
>> 2017-09-05 22:21 GMT+02:00 Newport, Billy <billy.newp...@gs.com>:
>>
>>> We have the same issue. We are finding that we cannot express the data flow in a natural way because of unnecessary spilling. Instead, we are building our own operators which combine multiple steps and essentially hide them from Flink, or sometimes we even have to read an input dataset once per flow to avoid spilling. The performance improvements are dramatic, but it is reducing Flink to a thread scheduler rather than a data flow engine, because we basically cannot express the flow to Flink. This worries us because if we let others write Flink code using our infrastructure, we will be spending all our time collapsing their flows into much simpler but less intuitive flows to prevent Flink from spilling.
>>>
>>> This also means higher-level APIs such as the Table API or Beam are off the table, because they prevent us from optimizing in this manner.
>>>
>>> We already have prior implementations of the logic we are implementing in Flink, and we know the Flink version is much less efficient than those prior implementations, which gives us pause for rolling it out more broadly. We are afraid of the "Flink tax", both from a performance point of view and from a usability point of view, given that naïve flows are not performant without significant collapsing.
>>>
>>> For example, we see spilling here:
>>>
>>> Dataset -> Map -> Filter -> Map -> Output
>>>
>>> We are now trying to fold the Map -> Output into the Filter operation, so that the records which do not pass the filter are written to an output file during the Filter itself, as sketched below.
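>>>
>>> Roughly, a minimal sketch of that idea (the class name, the file path, and the filter condition are only placeholders, not our actual code; it needs RichFilterFunction and Configuration from the Flink API plus java.io.FileWriter/PrintWriter):
>>>
>>> public static class FilterWithSideOutput extends RichFilterFunction<Tuple2<Long, Long>> {
>>>
>>>     private transient PrintWriter rejectedWriter;
>>>
>>>     @Override
>>>     public void open(Configuration parameters) throws Exception {
>>>         // one file per parallel subtask to avoid concurrent writes
>>>         int subtask = getRuntimeContext().getIndexOfThisSubtask();
>>>         rejectedWriter = new PrintWriter(new FileWriter("/tmp/rejected-" + subtask));
>>>     }
>>>
>>>     @Override
>>>     public boolean filter(Tuple2<Long, Long> value) {
>>>         boolean keep = value.f1 > 0;          // placeholder filter condition
>>>         if (!keep) {
>>>             rejectedWriter.println(value);    // write the record that is filtered out
>>>         }
>>>         return keep;
>>>     }
>>>
>>>     @Override
>>>     public void close() throws Exception {
>>>         if (rejectedWriter != null) {
>>>             rejectedWriter.close();
>>>         }
>>>     }
>>> }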
>>>
>>> Or in this case:
>>>
>>> Dataset -> Map -> [FilterT -> CoGroup; FilterF] -> Map -> Output
>>>
>>> rewriting it as two separate flows:
>>>
>>> Dataset -> Map -> FilterT -> CoGroup -> Map -> Output
>>> Dataset -> Map -> FilterF -> Map -> Output
>>>
>>> The two separate flows are several times faster, even though the file is read twice rather than once.
>>>
>>> This is all pretty unintuitive and makes using Flink pretty difficult for us, never mind our users. Writing the Flink dataflows in a naïve way is fast, but getting them to run with acceptable efficiency results in obscure workarounds and collapsing, and that takes the bulk of our time. It is a shame and the main reason we do not want to push it out for general use yet.
>>>
>>> It seems like Flink badly needs a flow rewriter which is capable of rewriting a naïve flow into fused operators or restructured flows automatically. We are doing it by hand right now, but there has to be a better way.
>>>
>>> It's a shame really, it's so close.
>>>
>>> Billy
>>>
>>> -----Original Message-----
>>> From: Urs Schoenenberger [mailto:urs.schoenenber...@tngtech.com]
>>> Sent: Tuesday, September 05, 2017 6:30 AM
>>> To: user
>>> Subject: DataSet: partitionByHash without materializing/spilling the entire partition?
>>>
>>> Hi all,
>>>
>>> we have a DataSet pipeline which reads CSV input data and then essentially does a combinable GroupReduce via first(n).
>>>
>>> In our first iteration (readCsvFile -> groupBy(0) -> sortGroup(0) -> first(n)), we got a job graph like this:
>>>
>>> source --[Forward]--> combine --[Hash Partition on 0, Sort]--> reduce
>>>
>>> This works, but we found the combine phase to be inefficient because not enough combinable elements fit into a sorter. My idea was to pre-partition the DataSet to increase the chance of combinable elements (readCsvFile -> partitionBy(0) -> groupBy(0) -> sortGroup(0) -> first(n)).
>>>
>>> To my surprise, I found that this changed the job graph to
>>>
>>> source --[Hash Partition on 0]--> partition(noop) --[Forward]--> combine --[Hash Partition on 0, Sort]--> reduce
>>>
>>> while materializing and spilling the entire partitions at the partition(noop) operator!
>>>
>>> Is there any way I can partition the data on the way from source to combine without spilling? That is, can I get a job graph that looks like
>>>
>>> source --[Hash Partition on 0]--> combine --[Hash Partition on 0, Sort]--> reduce
>>>
>>> instead?
>>>
>>> Thanks,
>>> Urs
>>>
>>> --
>>> Urs Schönenberger - urs.schoenenber...@tngtech.com
>>>
>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>> Geschäftsführer: Henrik Klagges, Dr. Robert Dahlke, Gerhard Müller
>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
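
For reference, the JSON plan that Fabian asks for in both replies can be obtained without running the job. A minimal sketch, with the input path, field types, and job structure as placeholders modeled on the pipeline discussed above; the printed string can be pasted into http://flink.apache.org/visualizer/:

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;

public class PrintPlan {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // placeholder pipeline; replace with the actual program
        env.readCsvFile("/path/to/input.csv")          // hypothetical input path
            .types(Long.class, Long.class)
            .partitionByHash(0)
            .groupBy(0)
            .first(3)
            .output(new DiscardingOutputFormat<>());   // a sink is needed before the plan can be built

        // print the JSON plan instead of calling env.execute()
        System.out.println(env.getExecutionPlan());
    }
}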