Hi everybody,

The previous approach turned out to have an issue: since we only write to
one subpartition, we get N-1 empty subpartitions per task (where N =
degree of parallelism). In the current approach I didn't consume these
empty subpartitions, and a subpartition that is never consumed is never
released. So we have a memory leak.

One workaround would be to read the empty subpartitions anyway, but that
is really ugly.

So I had a chat with Till and we decided to create only one subpartition
instead of N subpartitions per task. I have already implemented this
approach.

Now the problem is that we need to know when to release this
subpartition. We will create M subpartition views per subpartition
(where M is the number of task managers, with M <= N).

There are several ways to solve this problem:
1. Tell the subpartition how many taskmanagers will consume it.
(=> propagate M)
2. All tasks that don't need to read the subpartition send a message to
the subpartition. The subpartition will then receive M release requests
and N-M "I am done" requests, so once the subpartition knows the degree
of parallelism N, we are fine. (=> propagate N; see the sketch below)
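
Here is a minimal sketch of option 2 in Java. All names are made up for
illustration - this is not actual Flink API, just the counting logic:

    import java.util.concurrent.atomic.AtomicInteger;

    // Hypothetical release logic: the subpartition frees its buffers once
    // it has heard from all N consumer tasks, either as one of the M
    // "release view" requests (one per consuming taskmanager) or as one of
    // the N-M "I am done" notifications from tasks that never read it.
    class BroadcastSubpartitionReleaseLogic {

        private final int parallelism; // N, must be propagated to the subpartition
        private final AtomicInteger notifications = new AtomicInteger();

        BroadcastSubpartitionReleaseLogic(int parallelism) {
            this.parallelism = parallelism;
        }

        void onViewReleased() { maybeRelease(); } // M of these
        void onConsumerDone() { maybeRelease(); } // N-M of these

        private void maybeRelease() {
            if (notifications.incrementAndGet() == parallelism) {
                release(); // all N consumers have reported
            }
        }

        private void release() { /* free the buffers of this subpartition */ }
    }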

Any thoughts how to tackle this problem?

Best regards,
Felix

2016-08-10 19:14 GMT+02:00 Till Rohrmann <trohrm...@apache.org>:

> Cool first version Felix :-)
>
> On Wed, Aug 10, 2016 at 6:48 PM, Stephan Ewen <se...@apache.org> wrote:
>
> > Cool, nice results!
> >
> > For the iteration unspecialization - we probably should design this hand
> > in hand with the streaming fault tolerance, as they share the notion of
> > "intermediate result versions".
> >
> >
> > On Wed, Aug 10, 2016 at 6:09 PM, Felix Neutatz <neut...@googlemail.com>
> > wrote:
> >
> > > Hi everybody,
> > >
> > > I found a quick and dirty way to make the blocking subpartition readable
> > > by multiple readers. In the JobGraph generation I make all broadcast
> > > partitions blocking (see more details here:
> > > https://github.com/FelixNeutatz/incubator-flink/commits/blockingMultipleReads).
> > > I want to point out that this branch is only experimental!
> > >
> > > This works for the simple Map().withBroadcastSet() use case.
> > >
> > > To test this approach, I ran our peel bundle flink-broadcast
> > > (https://github.com/TU-Berlin-DIMA/flink-broadcast) on the IBM power
> > > cluster. Ibm-power has 8 nodes and we scale the number of slots per node
> > > from 1 to 16:
> > >
> > > broadcast.ibm-power-1 broadcast.01 6597.3333333333
> > > broadcast.ibm-power-1 broadcast.02 5997
> > > broadcast.ibm-power-1 broadcast.04 6576.6666666667
> > > broadcast.ibm-power-1 broadcast.08 7024.3333333333
> > > broadcast.ibm-power-1 broadcast.16 6933.3333333333
> > >
> > > The last column is the run time in milliseconds, averaged over 3 runs.
> > > You can clearly see that the run time stays constant :)
> > >
> > > As discussed, this approach doesn't work yet for native iterations (see
> > > FLINK-1713).
> > >
> > > So in the next weeks I will work on the native iterations as Stephan
> > > proposed.
> > >
> > > Best regards,
> > > Felix
> > >
> > >
> > >
> > > 2016-08-09 21:29 GMT+07:00 Stephan Ewen <se...@apache.org>:
> > >
> > > > I agree with Till. Changing the basic data exchange mechanism would
> > > > screw up many other ongoing efforts, like more incremental recovery.
> > > >
> > > > It seems that to make this properly applicable, we need to first
> > > > un-specialize the iterations.
> > > >
> > > > (1) Allow for "versioned" intermediate results, i.e.,
> > > > result-x-superstep1, result-x-superstep2, result-x-superstep3,
> > > > result-x-superstep4, ... We need something similar for fine-grained
> > > > recovery in streaming (result-x-checkpoint1, result-x-checkpoint2,
> > > > result-x-checkpoint3, result-x-checkpoint4, ...) so it may be worth
> > > > addressing that soon anyways (a sketch follows below the list).
> > > >
> > > > (2) Make iterations not dependent on the special local back channel.
> > > > Then we can simply schedule iterations like all other things.
> > > >
> > > > (3) Do the actual FLIP-5 proposal
> > > >
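> > > > As a rough illustration of (1) (hypothetical names, not actual Flink
> > > > classes): intermediate results would be addressed by a pair of result
> > > > id and version instead of by the result id alone.
> > > >
> > > >     // Hypothetical key: a result id plus a superstep/checkpoint version
> > > >     final class VersionedResultId {
> > > >         final String resultId;
> > > >         final long version; // superstep or checkpoint number
> > > >
> > > >         VersionedResultId(String resultId, long version) {
> > > >             this.resultId = resultId;
> > > >             this.version = version;
> > > >         }
> > > >
> > > >         @Override
> > > >         public boolean equals(Object o) {
> > > >             return o instanceof VersionedResultId
> > > >                     && ((VersionedResultId) o).resultId.equals(resultId)
> > > >                     && ((VersionedResultId) o).version == version;
> > > >         }
> > > >
> > > >         @Override
> > > >         public int hashCode() {
> > > >             return 31 * resultId.hashCode() + Long.hashCode(version);
> > > >         }
> > > >     }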
> > > >
> > > > That's quite an effort, but I fear all else will break the engine and
> > > > other efforts.
> > > >
> > > > Best,
> > > > Stephan
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On Tue, Aug 9, 2016 at 4:05 PM, Till Rohrmann <trohrm...@apache.org>
> > > > wrote:
> > > >
> > > > > Hi Felix,
> > > > >
> > > > > if we cannot work around the problem with blocking intermediate
> > > > > results in iterations, then we have to make FLINK-1713 a blocker for
> > > > > this new issue. But maybe you can also keep the current broadcasting
> > > > > mechanism to be used within iterations only. Then we can address the
> > > > > iteration problem later.
> > > > >
> > > > > Cheers,
> > > > > Till
> > > > >
> > > > > On Tue, Aug 9, 2016 at 3:54 PM, Felix Neutatz <neut...@googlemail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Till,
> > > > > >
> > > > > > thanks for the fast answer. I also think this should be the way to
> > > > > > go. So should I open a new jira "Make blocking SpillableSubpartition
> > > > > > able to be read multiple times"? Moreover, should I mark this jira
> > > > > > and FLINK-1713 <https://issues.apache.org/jira/browse/FLINK-1713>
> > > > > > as blocking for the broadcast jira? What do you think?
> > > > > >
> > > > > > Best regards,
> > > > > > Felix
> > > > > >
> > > > > > 2016-08-09 17:41 GMT+07:00 Till Rohrmann <trohrm...@apache.org>:
> > > > > >
> > > > > > > Hi Felix,
> > > > > > >
> > > > > > > I'm not sure whether PipelinedSubpartition should be readable
> > > > > > > more than once because then it would effectively mean that we
> > > > > > > materialize the elements of the pipelined subpartition for
> > > > > > > stragglers. Therefore, I think that we should make blocking
> > > > > > > intermediate results readable more than once. This will also be
> > > > > > > beneficial for interactive programs where we continue from the
> > > > > > > results of previous Flink jobs.
> > > > > > >
> > > > > > > It might also be interesting to have a blocking mode which
> > > > > > > schedules its consumers once the first result is there. Thus,
> > > > > > > having a mixture of pipelined and blocking mode.
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Till
> > > > > > >
> > > > > > > On Tue, Aug 9, 2016 at 4:40 AM, Felix Neutatz <neut...@googlemail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Stephan,
> > > > > > > >
> > > > > > > > I did some research about blocking intermediate results. It
> > > > > > > > turns out that neither PipelinedSubpartition (see line 178) nor
> > > > > > > > blocking intermediate results (see SpillableSubpartition line
> > > > > > > > 189) can be read multiple times. Moreover, blocking intermediate
> > > > > > > > results are currently not supported in native iterations (see
> > > > > > > > https://issues.apache.org/jira/browse/FLINK-1713).
> > > > > > > > So there are three ways to solve this:
> > > > > > > > 1) We extend Pipelined subpartitions to make it possible to
> > > > > > > > read them multiple times
> > > > > > > > 2) We extend Blocking subpartitions to make it possible to read
> > > > > > > > them multiple times, but then we also have to fix FLINK-1713,
> > > > > > > > so we can use broadcasts in native iterations (see the sketch
> > > > > > > > below)
> > > > > > > > 3) We create one pipelined subpartition for every taskmanager.
> > > > > > > > Problem: the more taskmanagers there are, the more redundant
> > > > > > > > data we store, but the network traffic stays optimal.
> > > > > > > >
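> > > > > > > > A minimal sketch of what option 2 amounts to (made-up names,
> > > > > > > > not the actual SpillableSubpartition code): once a blocking
> > > > > > > > result is finished, it is effectively an immutable list of
> > > > > > > > buffers, so every consumer can get its own read view with an
> > > > > > > > independent position:
> > > > > > > >
> > > > > > > >     import java.util.List;
> > > > > > > >
> > > > > > > >     // Hypothetical finished (blocking) result: written once and
> > > > > > > >     // then sealed, so creating many read views is safe.
> > > > > > > >     class FinishedBlockingResult {
> > > > > > > >
> > > > > > > >         private final List<byte[]> buffers;
> > > > > > > >
> > > > > > > >         FinishedBlockingResult(List<byte[]> buffers) {
> > > > > > > >             this.buffers = buffers;
> > > > > > > >         }
> > > > > > > >
> > > > > > > >         ReadView createReadView() { // callable once per consumer
> > > > > > > >             return new ReadView();
> > > > > > > >         }
> > > > > > > >
> > > > > > > >         class ReadView {
> > > > > > > >             private int next = 0;
> > > > > > > >
> > > > > > > >             byte[] poll() {
> > > > > > > >                 return next < buffers.size() ? buffers.get(next++) : null;
> > > > > > > >             }
> > > > > > > >         }
> > > > > > > >     }
> > > > > > > >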
> > > > > > > > Thank you for your help,
> > > > > > > > Felix
> > > > > > > >
> > > > > > > > 2016-08-01 22:51 GMT+07:00 Stephan Ewen <se...@apache.org>:
> > > > > > > >
> > > > > > > > > Hi Felix!
> > > > > > > > >
> > > > > > > > > Hope this helps!
> > > > > > > > >
> > > > > > > > > Concerning (1.1) - The producer does not think in terms of
> > > > > > > > > the number of target TaskManagers. That number can, after
> > > > > > > > > all, change in the presence of a failure and recovery. The
> > > > > > > > > producer should, for its own result, not care how many
> > > > > > > > > consumers it will have (Tasks), but produce it only once.
> > > > > > > > >
> > > > > > > > > Concerning (1.2) - Only "blocking" intermediate results can
> > > > > > > > > be consumed multiple times. Data sent to broadcast variables
> > > > > > > > > must thus always be a blocking intermediate result.
> > > > > > > > >
> > > > > > > > > Greetings,
> > > > > > > > > Stephan
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Wed, Jul 27, 2016 at 11:33 AM, Felix Neutatz <neut...@googlemail.com>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi Stephan,
> > > > > > > > > >
> > > > > > > > > > thanks for the great ideas. First I have some questions:
> > > > > > > > > >
> > > > > > > > > > 1.1) Does every task generate an intermediate result
> > > > > > > > > > partition for every target task, or is that already
> > > > > > > > > > implemented in a way so that there are only as many
> > > > > > > > > > intermediate result partitions per task manager as target
> > > > > > > > > > tasks? (Example: There are 2 task managers with 2 tasks
> > > > > > > > > > each. Do we get 4 intermediate result partitions per task
> > > > > > > > > > manager or do we get 8?)
> > > > > > > > > > 1.2) How can I consume an intermediate result partition
> > > > > > > > > > multiple times?
> > > > > > > > > > When I tried that I got the following exception:
> > > > > > > > > > Caused by: java.lang.IllegalStateException: Subpartition 0
> > > > > > > > > > of dbe284e3b37c1df1b993a3f0a6020ea6@ce9fc38f08a5cc9e93431a9cbf740dcf
> > > > > > > > > > is being or already has been consumed, but pipelined
> > > > > > > > > > subpartitions can only be consumed once.
> > > > > > > > > > at org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.createReadView(PipelinedSubpartition.java:179)
> > > > > > > > > > at org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.createReadView(PipelinedSubpartition.java:36)
> > > > > > > > > > at org.apache.flink.runtime.io.network.partition.ResultPartition.createSubpartitionView(ResultPartition.java:348)
> > > > > > > > > > at org.apache.flink.runtime.io.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:81)
> > > > > > > > > > at org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:98)
> > > > > > > > > > at org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:41)
> > > > > > > > > > at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
> > > > > > > > > >
> > > > > > > > > > My status update: Since Friday I have been implementing
> > > > > > > > > > your idea described in (2). Locally this approach already
> > > > > > > > > > works (for less than 170 iterations). I will investigate
> > > > > > > > > > further to solve that issue.
> > > > > > > > > >
> > > > > > > > > > But I am still not sure how to implement (1). Maybe we
> > > > > > > > > > introduce a construct similar to the BroadcastVariableManager
> > > > > > > > > > to share the RecordWriter among all tasks of a taskmanager.
> > > > > > > > > > I am interested in your thoughts :)
> > > > > > > > > >
> > > > > > > > > > Best regards,
> > > > > > > > > > Felix
> > > > > > > > > >
> > > > > > > > > > 2016-07-22 17:25 GMT+02:00 Stephan Ewen <se...@apache.org>:
> > > > > > > > > >
> > > > > > > > > > > Hi Felix!
> > > > > > > > > > >
> > > > > > > > > > > Interesting suggestion. Here are some thoughts on the
> > > > > > > > > > > design.
> > > > > > > > > > >
> > > > > > > > > > > The two core changes needed to send data once to the
> > > > > > > > > > > TaskManagers are:
> > > > > > > > > > >
> > > > > > > > > > >   (1) Every sender needs to produce its stuff once
> > > > > > > > > > > (rather than for every target task), there should not be
> > > > > > > > > > > redundancy there.
> > > > > > > > > > >   (2) Every TaskManager should request the data once,
> > > > > > > > > > > other tasks in the same TaskManager pick it up from there.
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > The current receiver-initiated pull model is actually a
> > > > > > > > > > > good abstraction for that, I think.
> > > > > > > > > > >
> > > > > > > > > > > Let's look at (1):
> > > > > > > > > > >
> > > > > > > > > > >   - Currently, the TaskManagers have a separate
> > > > > > > > > > > intermediate result partition for each target slot. They
> > > > > > > > > > > should rather have one intermediate result partition
> > > > > > > > > > > (which also saves repeated serialization) that is consumed
> > > > > > > > > > > multiple times.
> > > > > > > > > > >
> > > > > > > > > > >   - Since the results that are to be broadcasted are
> > > > > > > > > > > always "blocking", they can be consumed (pulled) multiple
> > > > > > > > > > > times.
> > > > > > > > > > >
> > > > > > > > > > > Let's look at (2):
> > > > > > > > > > >
> > > > > > > > > > >   - The current BroadcastVariableManager has the
> > > > > > > > > > > functionality to let the first accessor of the BC-variable
> > > > > > > > > > > materialize the result.
> > > > > > > > > > >
> > > > > > > > > > >   - It could be changed such that only the first accessor
> > > > > > > > > > > creates a RecordReader, so the others do not even request
> > > > > > > > > > > the stream. That way, the TaskManager should pull only one
> > > > > > > > > > > stream from each producing task, which means the data is
> > > > > > > > > > > transferred once (see the sketch below).
> > > > > > > > > > >
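> > > > > > > > > > > A tiny sketch of that first-accessor idea (hypothetical
> > > > > > > > > > > names, not the real BroadcastVariableManager API): the
> > > > > > > > > > > first task on a TaskManager to ask for the variable pulls
> > > > > > > > > > > the stream, all others wait for the materialized value.
> > > > > > > > > > >
> > > > > > > > > > >     import java.util.concurrent.Callable;
> > > > > > > > > > >     import java.util.concurrent.CompletableFuture;
> > > > > > > > > > >     import java.util.concurrent.ConcurrentHashMap;
> > > > > > > > > > >     import java.util.concurrent.ConcurrentMap;
> > > > > > > > > > >
> > > > > > > > > > >     // Hypothetical per-TaskManager cache for broadcast variables.
> > > > > > > > > > >     class PerTaskManagerBroadcastCache<T> {
> > > > > > > > > > >
> > > > > > > > > > >         private final ConcurrentMap<String, CompletableFuture<T>> cache =
> > > > > > > > > > >                 new ConcurrentHashMap<>();
> > > > > > > > > > >
> > > > > > > > > > >         T get(String name, Callable<T> readOnceFromNetwork) throws Exception {
> > > > > > > > > > >             CompletableFuture<T> fresh = new CompletableFuture<>();
> > > > > > > > > > >             CompletableFuture<T> f = cache.putIfAbsent(name, fresh);
> > > > > > > > > > >             if (f == null) {
> > > > > > > > > > >                 // we are the first accessor: request the stream exactly once
> > > > > > > > > > >                 try {
> > > > > > > > > > >                     fresh.complete(readOnceFromNetwork.call());
> > > > > > > > > > >                 } catch (Exception e) {
> > > > > > > > > > >                     fresh.completeExceptionally(e);
> > > > > > > > > > >                 }
> > > > > > > > > > >                 f = fresh;
> > > > > > > > > > >             }
> > > > > > > > > > >             return f.get(); // later accessors just wait here
> > > > > > > > > > >         }
> > > > > > > > > > >     }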
> > > > > > > > > > >
> > > > > > > > > > > That would also work perfectly with the current failure /
> > > > > > > > > > > recovery model.
> > > > > > > > > > >
> > > > > > > > > > > What do you think?
> > > > > > > > > > >
> > > > > > > > > > > Stephan
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > On Fri, Jul 22, 2016 at 2:59 PM, Felix Neutatz <neut...@googlemail.com>
> > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi everybody,
> > > > > > > > > > > >
> > > > > > > > > > > > I want to improve the performance of broadcasts in
> > > > > > > > > > > > Flink. Therefore Till told me to start a FLIP on this
> > > > > > > > > > > > topic to discuss how to go forward to solve the current
> > > > > > > > > > > > issues for broadcasts.
> > > > > > > > > > > >
> > > > > > > > > > > > The problem in a nutshell: Instead of sending data to
> > > > > > > > > > > > each taskmanager only once, at the moment the data is
> > > > > > > > > > > > sent to each task. This means if there are 3 slots on
> > > > > > > > > > > > each taskmanager we will send the data 3 times instead
> > > > > > > > > > > > of once.
> > > > > > > > > > > >
> > > > > > > > > > > > There are multiple ways to tackle this problem and I
> > > > > > > > > > > > started to do some research and investigate. You can
> > > > > > > > > > > > follow my thought process here:
> > > > > > > > > > > >
> > > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-5%3A+Only+send+data+to+each+taskmanager+once+for+broadcasts
> > > > > > > > > > > >
> > > > > > > > > > > > This is my first FLIP, so please correct me if I did
> > > > > > > > > > > > something wrong.
> > > > > > > > > > > >
> > > > > > > > > > > > I am interested in your thoughts about how to solve this
> > > > > > > > > > > > issue. Do you think my approach is heading in the right
> > > > > > > > > > > > direction, or should we follow a totally different one?
> > > > > > > > > > > >
> > > > > > > > > > > > I am happy about any comment :)
> > > > > > > > > > > >
> > > > > > > > > > > > Best regards,
> > > > > > > > > > > > Felix
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
