Hi Felix!

Hope this helps_

Concerning (1.1) - The producer does not think in term of number of target
TaskManagers. That number can, after all, change in the presence of a
failure and recovery. The producer should, for its own result, not care how
many consumers it will have (Tasks), but produce it only once.

Concerning (1.2)  - Only "blocking" intermediate results can be consumed
multiple times. Data sent to broadcast variables must thus be always a
blocking intermediate result.

Greetings,
Stephan


On Wed, Jul 27, 2016 at 11:33 AM, Felix Neutatz <neut...@googlemail.com>
wrote:

> Hi Stephan,
>
> thanks for the great ideas. First I have some questions:
>
> 1.1) Does every task generate an intermediate result partition for every
> target task or is that already implemented in a way so that there are only
> as many intermediate result partitions per task manager as target tasks?
> (Example: There are 2 task managers with 2 tasks each. Do we get 4
> intermediate result partitions per task manager or do we get 8?)
> 1.2) How can I consume an intermediate result partition multiple times?
> When I tried that I got the following exception:
> Caused by: java.lang.IllegalStateException: Subpartition 0 of
> dbe284e3b37c1df1b993a3f0a6020ea6@ce9fc38f08a5cc9e93431a9cbf740dcf is being
> or already has been consumed, but pipelined subpartitions can only be
> consumed once.
> at
>
> org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.createReadView(PipelinedSubpartition.java:179)
> at
>
> org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.createReadView(PipelinedSubpartition.java:36)
> at
>
> org.apache.flink.runtime.io.network.partition.ResultPartition.createSubpartitionView(ResultPartition.java:348)
> at
>
> org.apache.flink.runtime.io.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:81)
> at
>
> org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:98)
> at
>
> org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:41)
> at
>
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>
> My status update: Since Friday I am implementing your idea described in
> (2). Locally this approach already works (for less than 170 iterations). I
> will investigate further to solve that issue.
>
> But I am still not sure how to implement (1). Maybe we introduce a similar
> construct like the BroadcastVariableManager to share the RecordWriter among
> all tasks of a taskmanager. I am interested in your thoughts :)
>
> Best regards,
> Felix
>
> 2016-07-22 17:25 GMT+02:00 Stephan Ewen <se...@apache.org>:
>
> > Hi Felix!
> >
> > Interesting suggestion. Here are some thoughts on the design.
> >
> > The two core changes needed to send data once to the TaskManagers are:
> >
> >   (1) Every sender needs to produce its stuff once (rather than for every
> > target task), there should not be redundancy there.
> >   (2) Every TaskManager should request the data once, other tasks in the
> > same TaskManager pick it up from there.
> >
> >
> > The current receiver-initialted pull model is actually a good abstraction
> > for that, I think.
> >
> > Lets look at (1):
> >
> >   - Currently, the TaskManagers have a separate intermediate result
> > partition for each target slot. They should rather have one intermediate
> > result partition (saves also repeated serialization) that is consumed
> > multiple times.
> >
> >   - Since the results that are to be broadcasted are always "blocking",
> > they can be consumed (pulled)  multiples times.
> >
> > Lets look at (2):
> >
> >   - The current BroadcastVariableManager has the functionality to let the
> > first accessor of the BC-variable materialize the result.
> >
> >   - It could be changed such that only the first accessor creates a
> > RecordReader, so the others do not even request the stream. That way, the
> > TaskManager should pull only one stream from each producing task, which
> > means the data is transferred once.
> >
> >
> > That would also work perfectly with the current failure / recovery model.
> >
> > What do you think?
> >
> > Stephan
> >
> >
> > On Fri, Jul 22, 2016 at 2:59 PM, Felix Neutatz <neut...@googlemail.com>
> > wrote:
> >
> > > Hi everybody,
> > >
> > > I want to improve the performance of broadcasts in Flink. Therefore
> Till
> > > told me to start a FLIP on this topic to discuss how to go forward to
> > solve
> > > the current issues for broadcasts.
> > >
> > > The problem in a nutshell: Instead of sending data to each taskmanager
> > only
> > > once, at the moment the data is sent to each task. This means if there
> > are
> > > 3 slots on each taskmanager we will send the data 3 times instead of
> > once.
> > >
> > > There are multiple ways to tackle this problem and I started to do some
> > > research and investigate. You can follow my thought process here:
> > >
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-5%3A+Only+send+data+to+each+taskmanager+once+for+broadcasts
> > >
> > > This is my first FLIP. So please correct me, if I did something wrong.
> > >
> > > I am interested in your thoughts about how to solve this issue. Do you
> > > think my approach is heading into the right direction or should we
> > follow a
> > > totally different one.
> > >
> > > I am happy about any comment :)
> > >
> > > Best regards,
> > > Felix
> > >
> >
>

Reply via email to