We have the same issue. We are finding that we cannot express the data flow in 
a natural way because of unnecessary spilling. Instead, we're making our own 
operators which combine multiple steps together and essentially hide it from 
flink OR sometimes we even have to read an input dataset once per flow to avoid 
spilling. The performance improvements are dramatic but it's kind of reducing  
flink to a thread scheduler rather than a data flow engine because we basically 
cannot express the flow to flink. This worries us because if we let others 
write flink code using our infra, we'll be spending all our time collapsing 
their flows into much simpler but less intuititve flows to prevent flink from 
spilling. 

This also means higher level APIs such as the table API or Beam are off the 
table because they prevent us optimizing in this manner.

We already have prior implementations of the logic we are implementing in flink 
and as a result, we know it's much less efficient than the prior 
implementations which is giving us pause for rolling it out more broadly, we're 
afraid of the flink tax in effect from a performance point of view as well as 
from a usability point of view given naïve flows are not performant without 
significant collapsing.

For example, we see spilling here:

        Dataset -> Map > Filter -> Map -> Output

We're trying to combine the Map ->Output into the filter operation now to write 
the records which are not passed through to an output file during the Filter.


Or in this case

        Dataset -> Map -> [FilterT -> CoGroup > ;FilterF] > Map -> Output

Rewriting as

        Dataset -> Map -> FilterT -> CoGroup > Map -> Output
        Dataset -> Map -> FilterF -> Map -> Output

That is two separate flows is multiples faster. That is, reading the file twice 
rather than once.

This is all pretty unintuitive and makes using Flink pretty difficult for us 
never mind our users. Writing the flink dataflows in a naïve way is fast but 
getting it to run with acceptable efficiency results in obscure workarounds and 
collapsing and takes the bulk of the time for us which is a shame and the main 
reason, we don't want to push it out for general use yet.

It seems like it badly needs a flow rewriter which is capable of rewriting a 
naïve flow to use operators or restructured flows automatically. We're doing it 
by hand right now but there has to be a better way.

It's a shame really, it's so close.

Billy


-----Original Message-----
From: Urs Schoenenberger [mailto:urs.schoenenber...@tngtech.com] 
Sent: Tuesday, September 05, 2017 6:30 AM
To: user
Subject: DataSet: partitionByHash without materializing/spilling the entire 
partition?

Hi all,

we have a DataSet pipeline which reads CSV input data and then
essentially does a combinable GroupReduce via first(n).

In our first iteration (readCsvFile -> groupBy(0) -> sortGroup(0) ->
first(n)), we got a jobgraph like this:

source --[Forward]--> combine --[Hash Partition on 0, Sort]--> reduce

This works, but we found the combine phase to be inefficient because not
enough combinable elements fit into a sorter. My idea was to
pre-partition the DataSet to increase the chance of combinable elements
(readCsvFile -> partitionBy(0) -> groupBy(0) ->  sortGroup(0) -> first(n)).

To my surprise, I found that this changed the job graph to

source --[Hash Partition on 0]--> partition(noop) --[Forward]--> combine
--[Hash Partition on 0, Sort]--> reduce

while materializing and spilling the entire partitions at the
partition(noop)-Operator!

Is there any way I can partition the data on the way from source to
combine without spilling? That is, can I get a job graph that looks like


source --[Hash Partition on 0]--> combine --[Hash Partition on 0,
Sort]--> reduce

instead?

Thanks,
Urs

-- 
Urs Schönenberger - urs.schoenenber...@tngtech.com

TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Dr. Robert Dahlke, Gerhard Müller
Sitz: Unterföhring * Amtsgericht München * HRB 135082

Reply via email to