Btw, not sure if you know, but you can visualize the JSON plan returned by
ExecutionEnvironment.getExecutionPlan() on the website [1].
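
If it helps, dumping the plan is simple; a minimal sketch (the plan can only be
generated once the program, including its data sinks, has been defined):

   ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

   // ... build the DataSet program, including its data sinks, on env ...

   // JSON representation of the program's execution plan
   String jsonPlan = env.getExecutionPlan();
   System.out.println(jsonPlan);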

Best, Fabian

[1] http://flink.apache.org/visualizer/


2017-09-06 14:39 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:

> Hi Urs,
>
> a hash-partition operator should not spill. In general, DataSet plans aim
> to be pipelined as much as possible.
> There are a few cases when spilling happens:
>
> - a full sort with insufficient memory
> - hash-tables that need to spill (only in join operators)
> - range partitioning to compute a histogram of the partitioning keys.
> - temp nodes to avoid deadlocks. These can occur in plans that branch and
> join later like the following:
>
>               /--- Map ---\
> Input --<                   JOIN --- Output
>               \--- Map ---/
>
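> In code, such a branching-and-joining plan could look roughly like this
> (just a sketch; MapA and MapB stand for arbitrary map functions):
>
>    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>    DataSet<Tuple2<Long, Long>> input = env.fromElements(Tuple2.of(1L, 1L), Tuple2.of(2L, 2L));
>
>    // the source branches into two map operators ...
>    DataSet<Tuple2<Long, Long>> left = input.map(new MapA());
>    DataSet<Tuple2<Long, Long>> right = input.map(new MapB());
>
>    // ... and both branches are joined again; to avoid a pipeline deadlock,
>    // the optimizer materializes one of the inputs at a temp barrier
>    left.join(right).where(0).equalTo(0).print();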
>
> A simple plan without branching, such as the one you posted
>    readCsvFile -> partitionBy(0) -> groupBy(0) -> sortGroup(0) -> first(n)
> has no reason to spill, except for the full sort that is required for the
> final aggregation.
>
> Can you share the execution plan that you get for this program
> (ExecutionEnvironment.getExecutionPlan())?
>
> Btw, the sortGroup(0) call is superfluous because it would sort each group
> on the 0-field, even though all 0-fields within a group are the same.
> I believe Flink's optimizer automatically removes that sort, so it does not
> impact performance.
> Sorting on another field would indeed make sense, because this would
> determine the order within a group and hence which records are forwarded by
> first(n).
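>
> For example, to keep the three records with the largest second field per
> key, something like this should work (a sketch; the tuple type is assumed
> and Order is org.apache.flink.api.common.operators.Order):
>
>    DataSet<Tuple2<Long, Long>> top3 = data
>       .groupBy(0)                          // group on the key field
>       .sortGroup(1, Order.DESCENDING)      // order each group by the second field
>       .first(3);                           // emit the first three records per group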
>
> In order to force a combiner on a partitioned data set, you can do the
> following:
>
> --------
>
> public static void main(String[] args) throws Exception {
>
>    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>
>    DataSet<Tuple2<Long, Long>> data = randData(env);
>
>    DataSet<Tuple2<Long, Long>> result = data.partitionByHash(0)
>       .groupBy(0).combineGroup(new First3())
>          .withForwardedFields("f0")
>       .groupBy(0).reduceGroup(new First3());
>
>    result.print();
> }
>
> public static class First3 implements
>    GroupCombineFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>,
>    GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
>
>    @Override
>    public void combine(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) throws Exception {
>       // the combiner applies the same logic as the final reduce
>       reduce(values, out);
>    }
>
>    @Override
>    public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) throws Exception {
>       // emit at most the first three records of each group
>       int i = 0;
>       for (Tuple2<Long, Long> v : values) {
>          out.collect(v);
>          i++;
>          if (i == 3) {
>             break;
>          }
>       }
>    }
> }
>
> --------
>
> The generated plan will
> - hash partition the input data
> - partially sort the data in memory on the first field (not going to disk)
> - invoke the combiner for each in-memory sorted group
> - locally forward the data (because of the forwarded field information [1])
> - fully sort the data
> - invoke group reducer for each group
>
> In this plan, the only spilling should happen in the sort for the final
> aggregation.
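>
> As a side note, instead of calling withForwardedFields("f0") on the
> operators, the same hint can be given with a semantic annotation on the
> function class (see [1]); roughly:
>
>    // declares that First3 forwards the key field f0 unchanged
>    @FunctionAnnotation.ForwardedFields("f0")
>    public static class First3 implements
>       GroupCombineFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>,
>       GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
>       // combine() and reduce() as above
>    }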
>
> Best, Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/index.html#semantic-annotations
>
>
>
>
> 2017-09-05 22:21 GMT+02:00 Newport, Billy <billy.newp...@gs.com>:
>
>> We have the same issue. We are finding that we cannot express the data
>> flow in a natural way because of unnecessary spilling. Instead, we're
>> making our own operators which combine multiple steps together and
>> essentially hide them from Flink, or sometimes we even have to read an input
>> dataset once per flow to avoid spilling. The performance improvements are
>> dramatic, but it kind of reduces Flink to a thread scheduler rather than
>> a data flow engine because we basically cannot express the flow to Flink.
>> This worries us because if we let others write Flink code using our infra,
>> we'll be spending all our time collapsing their flows into much simpler but
>> less intuitive flows to prevent Flink from spilling.
>>
>> This also means higher-level APIs such as the Table API or Beam are off
>> the table because they prevent us from optimizing in this manner.
>>
>> We already have prior implementations of the logic we are porting to
>> Flink, so we know the Flink version is much less efficient than those
>> prior implementations, which gives us pause about rolling it out more
>> broadly. We're afraid of a "Flink tax", both from a performance point of
>> view and from a usability point of view, given that naïve flows are not
>> performant without significant collapsing.
>>
>> For example, we see spilling here:
>>
>>         Dataset -> Map -> Filter -> Map -> Output
>>
>> We're now trying to fold the final Map -> Output into the Filter operation,
>> so that the records which do not pass the filter are written to an output
>> file during the Filter itself.
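>>
>> Such a combined operator could look roughly like this (just a sketch; the
>> predicate and the side-file path are stand-ins for our real logic):
>>
>>    public static class FilterAndWriteRejected extends RichFilterFunction<String> {
>>
>>       private transient BufferedWriter rejectedOut;
>>
>>       @Override
>>       public void open(Configuration parameters) throws Exception {
>>          // one side file per parallel subtask
>>          int subtask = getRuntimeContext().getIndexOfThisSubtask();
>>          rejectedOut = new BufferedWriter(new FileWriter("/tmp/rejected-" + subtask + ".txt"));
>>       }
>>
>>       @Override
>>       public boolean filter(String record) throws Exception {
>>          if (!record.isEmpty()) {        // stand-in for the real predicate
>>             return true;                 // passing records continue downstream
>>          }
>>          rejectedOut.write(record);      // rejected records go straight to the side file
>>          rejectedOut.newLine();
>>          return false;
>>       }
>>
>>       @Override
>>       public void close() throws Exception {
>>          if (rejectedOut != null) {
>>             rejectedOut.close();
>>          }
>>       }
>>    }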
>>
>>
>> Or in this case
>>
>>         Dataset -> Map -> [FilterT -> CoGroup ; FilterF] -> Map -> Output
>>
>> Rewriting as
>>
>>         Dataset -> Map -> FilterT -> CoGroup -> Map -> Output
>>         Dataset -> Map -> FilterF -> Map -> Output
>>
>> That is, as two separate flows it is multiple times faster, even though
>> the file is read twice rather than once.
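>>
>> For illustration, the rewritten version looks roughly like this (a sketch;
>> Enrich, IsMatch, IsNoMatch, MergeFn, ToOutput, `other` and the paths stand
>> in for our real functions, datasets and locations):
>>
>>         // flow 1: matching records go through the CoGroup
>>         env.readCsvFile(inputPath).types(Long.class, String.class)
>>            .map(new Enrich())
>>            .filter(new IsMatch())
>>            .coGroup(other).where(0).equalTo(0).with(new MergeFn())
>>            .map(new ToOutput())
>>            .writeAsText(matchedPath);
>>
>>         // flow 2: a second, independent read of the same file for the rest
>>         env.readCsvFile(inputPath).types(Long.class, String.class)
>>            .map(new Enrich())
>>            .filter(new IsNoMatch())
>>            .map(new ToOutput())
>>            .writeAsText(restPath);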
>>
>> This is all pretty unintuitive and makes using Flink pretty difficult for
>> us, never mind our users. Writing the Flink dataflows in a naïve way is
>> fast, but getting them to run with acceptable efficiency requires obscure
>> workarounds and collapsing, and that takes the bulk of our time. This is a
>> shame and the main reason we don't want to push it out for general use yet.
>>
>> It seems like it badly needs a flow rewriter which is capable of
>> automatically rewriting a naïve flow into custom operators or restructured
>> flows. We're doing it by hand right now, but there has to be a better way.
>>
>> It's a shame really, it's so close.
>>
>> Billy
>>
>>
>> -----Original Message-----
>> From: Urs Schoenenberger [mailto:urs.schoenenber...@tngtech.com]
>> Sent: Tuesday, September 05, 2017 6:30 AM
>> To: user
>> Subject: DataSet: partitionByHash without materializing/spilling the
>> entire partition?
>>
>> Hi all,
>>
>> we have a DataSet pipeline which reads CSV input data and then
>> essentially does a combinable GroupReduce via first(n).
>>
>> In our first iteration (readCsvFile -> groupBy(0) -> sortGroup(0) ->
>> first(n)), we got a jobgraph like this:
>>
>> source --[Forward]--> combine --[Hash Partition on 0, Sort]--> reduce
>>
>> This works, but we found the combine phase to be inefficient because not
>> enough combinable elements fit into a sorter. My idea was to
>> pre-partition the DataSet to increase the chance of combinable elements
>> (readCsvFile -> partitionBy(0) -> groupBy(0) -> sortGroup(0) -> first(n)).
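>>
>> For reference, the two variants look like this in code (a sketch; the CSV
>> field types, n and the paths are placeholders):
>>
>> // first iteration
>> env.readCsvFile(path).types(Long.class, String.class)
>>    .groupBy(0)
>>    .sortGroup(0, Order.ASCENDING)
>>    .first(n)
>>    .writeAsCsv(outPath);
>>
>> // second iteration, with explicit pre-partitioning before the groupBy
>> env.readCsvFile(path).types(Long.class, String.class)
>>    .partitionByHash(0)
>>    .groupBy(0)
>>    .sortGroup(0, Order.ASCENDING)
>>    .first(n)
>>    .writeAsCsv(outPath);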
>>
>> To my surprise, I found that this changed the job graph to
>>
>> source --[Hash Partition on 0]--> partition(noop) --[Forward]--> combine
>> --[Hash Partition on 0, Sort]--> reduce
>>
>> while materializing and spilling the entire partitions at the
>> partition(noop) operator!
>>
>> Is there any way I can partition the data on the way from source to
>> combine without spilling? That is, can I get a job graph that looks like
>>
>>
>> source --[Hash Partition on 0]--> combine --[Hash Partition on 0,
>> Sort]--> reduce
>>
>> instead?
>>
>> Thanks,
>> Urs
>>
>> --
>> Urs Schönenberger - urs.schoenenber...@tngtech.com
>>
>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>> Geschäftsführer: Henrik Klagges, Dr. Robert Dahlke, Gerhard Müller
>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>
>
>
