Hi Stephan, thanks for the great ideas. First I have some questions:
1.1) Does every task generate an intermediate result partition for every target task, or is it already implemented so that there are only as many intermediate result partitions per task manager as there are target tasks? (Example: There are 2 task managers with 2 tasks each. Do we get 4 intermediate result partitions per task manager, or do we get 8?)

1.2) How can I consume an intermediate result partition multiple times? When I tried that, I got the following exception:

Caused by: java.lang.IllegalStateException: Subpartition 0 of dbe284e3b37c1df1b993a3f0a6020ea6@ce9fc38f08a5cc9e93431a9cbf740dcf is being or already has been consumed, but pipelined subpartitions can only be consumed once.
    at org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.createReadView(PipelinedSubpartition.java:179)
    at org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.createReadView(PipelinedSubpartition.java:36)
    at org.apache.flink.runtime.io.network.partition.ResultPartition.createSubpartitionView(ResultPartition.java:348)
    at org.apache.flink.runtime.io.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:81)
    at org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:98)
    at org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:41)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)

My status update: Since Friday I have been implementing your idea described in (2). Locally this approach already works, but only for fewer than 170 iterations. I will investigate further to solve that issue.

But I am still not sure how to implement (1). Maybe we could introduce a construct similar to the BroadcastVariableManager to share the RecordWriter among all tasks of a taskmanager.
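
To make the idea of a shared construct a bit more concrete, here is a rough sketch of what I have in mind: a reference-counted registry per taskmanager, where the first task to ask for a writer creates it and all later tasks reuse the same instance. Everything in it is an assumption for discussion only: the class name SharedWriterRegistry and the methods acquire/release are hypothetical and not part of Flink, and the writer type is kept generic instead of using the real RecordWriter API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/**
 * Hypothetical per-taskmanager registry (NOT a Flink class).
 * K identifies the broadcast result, W stands in for the shared writer.
 */
final class SharedWriterRegistry<K, W> {

    /** A shared instance plus the number of tasks currently holding it. */
    private static final class Entry<W> {
        final W writer;
        int refCount;

        Entry(W writer) {
            this.writer = writer;
        }
    }

    private final Map<K, Entry<W>> entries = new HashMap<>();

    /** The first caller for a key creates the writer; later callers reuse it. */
    synchronized W acquire(K key, Supplier<W> factory) {
        Entry<W> entry = entries.get(key);
        if (entry == null) {
            entry = new Entry<>(factory.get());
            entries.put(key, entry);
        }
        entry.refCount++;
        return entry.writer;
    }

    /** Returns true if the caller was the last holder and the entry was dropped. */
    synchronized boolean release(K key) {
        Entry<W> entry = entries.get(key);
        if (entry == null) {
            throw new IllegalStateException("No shared writer registered for " + key);
        }
        entry.refCount--;
        if (entry.refCount == 0) {
            entries.remove(key);
            return true;
        }
        return false;
    }

    /** Number of keys that currently have a live shared writer. */
    synchronized int size() {
        return entries.size();
    }
}
```

Each task would call acquire(...) with the same key when it starts and release(...) when it finishes; the task for which release(...) returns true would be the one to close the underlying writer. Whether such a registry fits into the existing task lifecycle is exactly the part I am unsure about.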

I am interested in your thoughts :)

Best regards,
Felix

2016-07-22 17:25 GMT+02:00 Stephan Ewen <se...@apache.org>:

> Hi Felix!
>
> Interesting suggestion. Here are some thoughts on the design.
>
> The two core changes needed to send data once to the TaskManagers are:
>
> (1) Every sender needs to produce its stuff once (rather than for every
> target task), there should not be redundancy there.
> (2) Every TaskManager should request the data once, other tasks in the
> same TaskManager pick it up from there.
>
>
> The current receiver-initiated pull model is actually a good abstraction
> for that, I think.
>
> Let's look at (1):
>
> - Currently, the TaskManagers have a separate intermediate result
> partition for each target slot. They should rather have one intermediate
> result partition (saves also repeated serialization) that is consumed
> multiple times.
>
> - Since the results that are to be broadcasted are always "blocking",
> they can be consumed (pulled) multiple times.
>
> Let's look at (2):
>
> - The current BroadcastVariableManager has the functionality to let the
> first accessor of the BC-variable materialize the result.
>
> - It could be changed such that only the first accessor creates a
> RecordReader, so the others do not even request the stream. That way, the
> TaskManager should pull only one stream from each producing task, which
> means the data is transferred once.
>
>
> That would also work perfectly with the current failure / recovery model.
>
> What do you think?
>
> Stephan
>
>
> On Fri, Jul 22, 2016 at 2:59 PM, Felix Neutatz <neut...@googlemail.com>
> wrote:
>
> > Hi everybody,
> >
> > I want to improve the performance of broadcasts in Flink. Therefore Till
> > told me to start a FLIP on this topic to discuss how to go forward to solve
> > the current issues for broadcasts.
> >
> > The problem in a nutshell: Instead of sending data to each taskmanager only
> > once, at the moment the data is sent to each task. This means if there are
> > 3 slots on each taskmanager we will send the data 3 times instead of once.
> >
> > There are multiple ways to tackle this problem and I started to do some
> > research and investigate. You can follow my thought process here:
> >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-5%3A+Only+send+data+to+each+taskmanager+once+for+broadcasts
> >
> > This is my first FLIP. So please correct me, if I did something wrong.
> >
> > I am interested in your thoughts about how to solve this issue. Do you
> > think my approach is heading in the right direction or should we follow a
> > totally different one?
> >
> > I am happy about any comment :)
> >
> > Best regards,
> > Felix