Hi Felix! Hope this helps.
Concerning (1.1): The producer does not think in terms of the number of target TaskManagers. That number can, after all, change in the presence of a failure and recovery. The producer should, for its own result, not care how many consumer tasks it will have, but produce the result only once.

Concerning (1.2): Only "blocking" intermediate results can be consumed multiple times. Data sent to broadcast variables must therefore always be a blocking intermediate result.

Greetings,
Stephan

On Wed, Jul 27, 2016 at 11:33 AM, Felix Neutatz <neut...@googlemail.com> wrote:

> Hi Stephan,
>
> thanks for the great ideas. First I have some questions:
>
> 1.1) Does every task generate an intermediate result partition for every
> target task, or is it already implemented so that there are only as many
> intermediate result partitions per TaskManager as there are target tasks?
> (Example: there are 2 TaskManagers with 2 tasks each. Do we get 4
> intermediate result partitions per TaskManager, or do we get 8?)
>
> 1.2) How can I consume an intermediate result partition multiple times?
> When I tried that, I got the following exception:
>
> Caused by: java.lang.IllegalStateException: Subpartition 0 of
> dbe284e3b37c1df1b993a3f0a6020ea6@ce9fc38f08a5cc9e93431a9cbf740dcf is being
> or already has been consumed, but pipelined subpartitions can only be
> consumed once.
>     at org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.createReadView(PipelinedSubpartition.java:179)
>     at org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.createReadView(PipelinedSubpartition.java:36)
>     at org.apache.flink.runtime.io.network.partition.ResultPartition.createSubpartitionView(ResultPartition.java:348)
>     at org.apache.flink.runtime.io.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:81)
>     at org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:98)
>     at org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:41)
>     at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>
> My status update: since Friday I have been implementing your idea described
> in (2). Locally this approach already works (for fewer than 170 iterations).
> I will investigate further to solve that issue.
>
> But I am still not sure how to implement (1). Maybe we introduce a construct
> similar to the BroadcastVariableManager to share the RecordWriter among all
> tasks of a TaskManager. I am interested in your thoughts :)
>
> Best regards,
> Felix
>
> 2016-07-22 17:25 GMT+02:00 Stephan Ewen <se...@apache.org>:
>
> > Hi Felix!
> >
> > Interesting suggestion. Here are some thoughts on the design.
> >
> > The two core changes needed to send data once to the TaskManagers are:
> >
> > (1) Every sender needs to produce its result once (rather than once per
> > target task); there should be no redundancy there.
> >
> > (2) Every TaskManager should request the data once; other tasks in the
> > same TaskManager pick it up from there.
> >
> > The current receiver-initiated pull model is actually a good abstraction
> > for that, I think.
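[Editor's note: the exception in the stack trace above follows from the pipelined-vs-blocking distinction Stephan explains. A minimal sketch of that behavior, assuming illustrative class and method names (this is not Flink's actual implementation):]

```java
// Hypothetical sketch: a pipelined subpartition allows only one read
// view, while a blocking one can be consumed any number of times.
public final class SubpartitionSketch {
    private final boolean blocking;
    private boolean consumed = false;

    public SubpartitionSketch(boolean blocking) {
        this.blocking = blocking;
    }

    /** Returns true if a new read view could be created. */
    public synchronized boolean createReadView() {
        if (!blocking && consumed) {
            // Mirrors the IllegalStateException Felix hit: a pipelined
            // subpartition has already been handed to a consumer.
            return false;
        }
        consumed = true;
        return true;
    }

    public static void main(String[] args) {
        SubpartitionSketch pipelined = new SubpartitionSketch(false);
        SubpartitionSketch blocking = new SubpartitionSketch(true);
        System.out.println(pipelined.createReadView()); // first consumer succeeds
        System.out.println(pipelined.createReadView()); // second consumer is rejected
        System.out.println(blocking.createReadView());  // blocking results
        System.out.println(blocking.createReadView());  // can be replayed
    }
}
```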
> > Let's look at (1):
> >
> > - Currently, the TaskManagers have a separate intermediate result
> > partition for each target slot. They should rather have one intermediate
> > result partition (which also saves repeated serialization) that is
> > consumed multiple times.
> >
> > - Since the results to be broadcast are always "blocking", they can be
> > consumed (pulled) multiple times.
> >
> > Let's look at (2):
> >
> > - The current BroadcastVariableManager has the functionality to let the
> > first accessor of the broadcast variable materialize the result.
> >
> > - It could be changed such that only the first accessor creates a
> > RecordReader, so the others do not even request the stream. That way, the
> > TaskManager should pull only one stream from each producing task, which
> > means the data is transferred once.
> >
> > That would also work perfectly with the current failure/recovery model.
> >
> > What do you think?
> >
> > Stephan
> >
> > On Fri, Jul 22, 2016 at 2:59 PM, Felix Neutatz <neut...@googlemail.com>
> > wrote:
> >
> > > Hi everybody,
> > >
> > > I want to improve the performance of broadcasts in Flink. Till
> > > therefore told me to start a FLIP on this topic, to discuss how to go
> > > forward in solving the current issues for broadcasts.
> > >
> > > The problem in a nutshell: instead of sending the data to each
> > > TaskManager only once, at the moment it is sent to each task. This
> > > means that if there are 3 slots on each TaskManager, we send the data
> > > 3 times instead of once.
> > >
> > > There are multiple ways to tackle this problem, and I have started to
> > > research and investigate. You can follow my thought process here:
> > >
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-5%3A+Only+send+data+to+each+taskmanager+once+for+broadcasts
> > >
> > > This is my first FLIP, so please correct me if I did something wrong.
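[Editor's note: the "first accessor materializes" idea in (2) can be sketched with a per-TaskManager cache. The class name and the integer payload below are hypothetical; the point is that `ConcurrentHashMap.computeIfAbsent` runs its loader at most once per key, which is exactly the at-most-once pull the proposal needs:]

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a per-TaskManager broadcast cache: only the
// first task to ask for a variable materializes it; later tasks on the
// same TaskManager reuse the cached copy instead of pulling the stream.
public final class BroadcastCacheSketch {
    private final ConcurrentMap<String, List<Integer>> cache = new ConcurrentHashMap<>();
    public final AtomicInteger materializations = new AtomicInteger();

    public List<Integer> get(String name) {
        // computeIfAbsent guarantees the loader runs at most once per key,
        // no matter how many tasks ask concurrently.
        return cache.computeIfAbsent(name, k -> {
            materializations.incrementAndGet(); // stands in for pulling the stream
            return List.of(1, 2, 3);            // stands in for the broadcast data
        });
    }

    public static void main(String[] args) {
        BroadcastCacheSketch taskManager = new BroadcastCacheSketch();
        for (int slot = 0; slot < 3; slot++) {
            taskManager.get("model"); // 3 tasks, but the data is pulled only once
        }
        System.out.println(taskManager.materializations.get()); // prints 1
    }
}
```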
> > > I am interested in your thoughts about how to solve this issue. Do you
> > > think my approach is heading in the right direction, or should we
> > > follow a totally different one?
> > >
> > > I am happy about any comment :)
> > >
> > > Best regards,
> > > Felix
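[Editor's note: the redundancy Felix describes can be quantified with a back-of-the-envelope sketch. The payload size and cluster shape below are illustrative, not taken from the thread:]

```java
// Hypothetical cost model: today a broadcast is sent once per receiving
// task; FLIP-5 would send it once per receiving TaskManager.
public final class BroadcastCostSketch {
    public static long megabytesPerTask(long payloadMb, int taskManagers, int slotsPerTm) {
        return payloadMb * taskManagers * slotsPerTm; // one copy per task
    }

    public static long megabytesPerTaskManager(long payloadMb, int taskManagers, int slotsPerTm) {
        return payloadMb * taskManagers; // one copy per TaskManager
    }

    public static void main(String[] args) {
        long payloadMb = 100; // hypothetical broadcast set size
        // With 2 TaskManagers and 3 slots each (Felix's 3-slot example):
        System.out.println(megabytesPerTask(payloadMb, 2, 3));        // 600 MB today
        System.out.println(megabytesPerTaskManager(payloadMb, 2, 3)); // 200 MB with FLIP-5
    }
}
```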