Hi Stephan,

thanks for the great ideas. First I have some questions:

1.1) Does every task generate an intermediate result partition for every
target task or is that already implemented in a way so that there are only
as many intermediate result partitions per task manager as target tasks?
(Example: There are 2 task managers with 2 tasks each. Do we get 4
intermediate result partitions per task manager or do we get 8?)
1.2) How can I consume an intermediate result partition multiple times?
When I tried that I got the following exception:
Caused by: java.lang.IllegalStateException: Subpartition 0 of
dbe284e3b37c1df1b993a3f0a6020ea6@ce9fc38f08a5cc9e93431a9cbf740dcf is being
or already has been consumed, but pipelined subpartitions can only be
consumed once.
at
org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.createReadView(PipelinedSubpartition.java:179)
at
org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.createReadView(PipelinedSubpartition.java:36)
at
org.apache.flink.runtime.io.network.partition.ResultPartition.createSubpartitionView(ResultPartition.java:348)
at
org.apache.flink.runtime.io.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:81)
at
org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:98)
at
org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:41)
at
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)

My status update: Since Friday I am implementing your idea described in
(2). Locally this approach already works (for less than 170 iterations). I
will investigate further to solve that issue.

But I am still not sure how to implement (1). Maybe we introduce a similar
construct like the BroadcastVariableManager to share the RecordWriter among
all tasks of a taskmanager. I am interested in your thoughts :)

Best regards,
Felix

2016-07-22 17:25 GMT+02:00 Stephan Ewen <se...@apache.org>:

> Hi Felix!
>
> Interesting suggestion. Here are some thoughts on the design.
>
> The two core changes needed to send data once to the TaskManagers are:
>
>   (1) Every sender needs to produce its stuff once (rather than for every
> target task), there should not be redundancy there.
>   (2) Every TaskManager should request the data once, other tasks in the
> same TaskManager pick it up from there.
>
>
> The current receiver-initialted pull model is actually a good abstraction
> for that, I think.
>
> Lets look at (1):
>
>   - Currently, the TaskManagers have a separate intermediate result
> partition for each target slot. They should rather have one intermediate
> result partition (saves also repeated serialization) that is consumed
> multiple times.
>
>   - Since the results that are to be broadcasted are always "blocking",
> they can be consumed (pulled)  multiples times.
>
> Lets look at (2):
>
>   - The current BroadcastVariableManager has the functionality to let the
> first accessor of the BC-variable materialize the result.
>
>   - It could be changed such that only the first accessor creates a
> RecordReader, so the others do not even request the stream. That way, the
> TaskManager should pull only one stream from each producing task, which
> means the data is transferred once.
>
>
> That would also work perfectly with the current failure / recovery model.
>
> What do you think?
>
> Stephan
>
>
> On Fri, Jul 22, 2016 at 2:59 PM, Felix Neutatz <neut...@googlemail.com>
> wrote:
>
> > Hi everybody,
> >
> > I want to improve the performance of broadcasts in Flink. Therefore Till
> > told me to start a FLIP on this topic to discuss how to go forward to
> solve
> > the current issues for broadcasts.
> >
> > The problem in a nutshell: Instead of sending data to each taskmanager
> only
> > once, at the moment the data is sent to each task. This means if there
> are
> > 3 slots on each taskmanager we will send the data 3 times instead of
> once.
> >
> > There are multiple ways to tackle this problem and I started to do some
> > research and investigate. You can follow my thought process here:
> >
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-5%3A+Only+send+data+to+each+taskmanager+once+for+broadcasts
> >
> > This is my first FLIP. So please correct me, if I did something wrong.
> >
> > I am interested in your thoughts about how to solve this issue. Do you
> > think my approach is heading into the right direction or should we
> follow a
> > totally different one.
> >
> > I am happy about any comment :)
> >
> > Best regards,
> > Felix
> >
>

Reply via email to