Hi everybody,

the previous approach turned out to have an issue. Since we only write to one subpartition, we have N-1 empty subpartitions per task (where N = degree of parallelism). In the current approach I didn't consume these empty subpartitions, and a subpartition that is never consumed is never released. So we have a memory leak.
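To make the leak concrete, here is a toy model (all names are hypothetical - this is not actual Flink code): the partition holds one subpartition per consumer task, the broadcast writer only fills subpartition 0, and a subpartition frees its state only when it is consumed.

```java
import java.util.ArrayList;
import java.util.List;

class BroadcastLeakSketch {

    static class Subpartition {
        boolean released; // state is freed only when the subpartition is consumed

        void consume() {
            released = true;
        }
    }

    /** Returns how many of the N subpartitions are never released. */
    static long countLeaked(int parallelism) {
        List<Subpartition> partition = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            partition.add(new Subpartition());
        }
        // The broadcast data is written to and consumed from subpartition 0 only;
        // the remaining N-1 subpartitions are never consumed, hence never released.
        partition.get(0).consume();
        return partition.stream().filter(s -> !s.released).count();
    }

    public static void main(String[] args) {
        System.out.println(countLeaked(4) + " of 4 subpartitions never released");
    }
}
```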
One workaround would be to read the empty subpartitions, but that is a really ugly work-around. So I had a chat with Till and we decided to create only one subpartition per task instead of N. I have already implemented this approach.

Now the problem is that we need to know when to release this subpartition. We will create M subpartition views per subpartition (where M is the number of TaskManagers and M <= N). There are several ways to solve this problem:

1. Tell the subpartition how many TaskManagers will consume it (=> propagate M).
2. Have all tasks which don't need to read the subpartition send a message to it. The subpartition will then receive M release requests and N-M "I am done" requests, so as long as the subpartition knows the degree of parallelism N, we are fine (=> propagate N).

Any thoughts on how to tackle this problem?

Best regards,
Felix

2016-08-10 19:14 GMT+02:00 Till Rohrmann <trohrm...@apache.org>:

> Cool first version Felix :-)
>
> On Wed, Aug 10, 2016 at 6:48 PM, Stephan Ewen <se...@apache.org> wrote:
>
> > Cool, nice results!
> >
> > For the iteration unspecialization - we probably should design this hand
> > in hand with the streaming fault tolerance, as they share the notion of
> > "intermediate result versions".
> >
> > On Wed, Aug 10, 2016 at 6:09 PM, Felix Neutatz <neut...@googlemail.com> wrote:
> >
> > > Hi everybody,
> > >
> > > I found a quick and dirty way to make the blocking subpartition readable
> > > by multiple readers. In the JobGraph generation I make all broadcast
> > > partitions blocking (see more details here:
> > > https://github.com/FelixNeutatz/incubator-flink/commits/blockingMultipleReads).
> > > I want to point out that this branch is only experimental!
> > >
> > > This works for the simple Map().withBroadcastSet() use case.
> > >
> > > To test this approach, I ran our peel bundle flink-broadcast
> > > (https://github.com/TU-Berlin-DIMA/flink-broadcast) on the ibm-power cluster.
> > > The ibm-power cluster has 8 nodes, and we scale the number of slots per
> > > node from 1 to 16:
> > >
> > > broadcast.ibm-power-1  broadcast.01  6597.3333333333
> > > broadcast.ibm-power-1  broadcast.02  5997
> > > broadcast.ibm-power-1  broadcast.04  6576.6666666667
> > > broadcast.ibm-power-1  broadcast.08  7024.3333333333
> > > broadcast.ibm-power-1  broadcast.16  6933.3333333333
> > >
> > > The last column is the run time in milliseconds, averaged over 3 runs.
> > > You can clearly see that the run time stays constant :)
> > >
> > > As discussed, this approach doesn't work yet for native iterations (see
> > > FLINK-1713).
> > >
> > > So in the next weeks I will work on the native iterations as Stephan
> > > proposed.
> > >
> > > Best regards,
> > > Felix
> > >
> > > 2016-08-09 21:29 GMT+07:00 Stephan Ewen <se...@apache.org>:
> > >
> > > > I agree with Till. Changing the basic data exchange mechanism would
> > > > screw up many other ongoing efforts, like more incremental recovery.
> > > >
> > > > It seems that to make this properly applicable, we need to first
> > > > un-specialize the iterations.
> > > >
> > > > (1) Allow for "versioned" intermediate results, i.e.,
> > > > result-x-superstep1, result-x-superstep2, result-x-superstep3, ...
> > > > We need something similar for fine-grained recovery in streaming
> > > > (result-x-checkpoint1, result-x-checkpoint2, ...), so it may be worth
> > > > addressing that soon anyway.
> > > >
> > > > (2) Make iterations not dependent on the special local back channel.
> > > > Then we can simply schedule iterations like all other things.
> > > >
> > > > (3) Do the actual FLIP-5 proposal.
> > > >
> > > > That's quite an effort, but I fear all else will break the engine and
> > > > other efforts.
> > > > Best,
> > > > Stephan
> > > >
> > > > On Tue, Aug 9, 2016 at 4:05 PM, Till Rohrmann <trohrm...@apache.org> wrote:
> > > >
> > > > > Hi Felix,
> > > > >
> > > > > if we cannot work around the problem with blocking intermediate
> > > > > results in iterations, then we have to make FLINK-1713 a blocker for
> > > > > this new issue. But maybe you can also keep the current broadcasting
> > > > > mechanism to be used within iterations only. Then we can address the
> > > > > iteration problem later.
> > > > >
> > > > > Cheers,
> > > > > Till
> > > > >
> > > > > On Tue, Aug 9, 2016 at 3:54 PM, Felix Neutatz <neut...@googlemail.com> wrote:
> > > > >
> > > > > > Hi Till,
> > > > > >
> > > > > > thanks for the fast answer. I also think this should be the way to
> > > > > > go. So should I open a new JIRA "Make blocking SpillableSubpartition
> > > > > > able to be read multiple times"? Moreover, should I mark this JIRA
> > > > > > and FLINK-1713 <https://issues.apache.org/jira/browse/FLINK-1713>
> > > > > > as blocking for the broadcast JIRA? What do you think?
> > > > > >
> > > > > > Best regards,
> > > > > > Felix
> > > > > >
> > > > > > 2016-08-09 17:41 GMT+07:00 Till Rohrmann <trohrm...@apache.org>:
> > > > > >
> > > > > > > Hi Felix,
> > > > > > >
> > > > > > > I'm not sure whether PipelinedSubpartition should be readable more
> > > > > > > than once, because then it would effectively mean that we
> > > > > > > materialize the elements of the pipelined subpartition for
> > > > > > > stragglers. Therefore, I think that we should make blocking
> > > > > > > intermediate results readable more than once. This will also be
> > > > > > > beneficial for interactive programs where we continue from the
> > > > > > > results of previous Flink jobs.
> > > > > > > It might also be interesting to have a blocking mode which
> > > > > > > schedules its consumers once the first result is there - thus
> > > > > > > having a mixture of pipelined and blocking mode.
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Till
> > > > > > >
> > > > > > > On Tue, Aug 9, 2016 at 4:40 AM, Felix Neutatz <neut...@googlemail.com> wrote:
> > > > > > >
> > > > > > > > Hi Stephan,
> > > > > > > >
> > > > > > > > I did some research about blocking intermediate results. It
> > > > > > > > turns out that neither PipelinedSubpartition (see line 178) nor
> > > > > > > > blocking intermediate results (see SpillableSubpartition line
> > > > > > > > 189) can be read multiple times. Moreover, blocking intermediate
> > > > > > > > results are currently not supported in native iterations (see
> > > > > > > > https://issues.apache.org/jira/browse/FLINK-1713).
> > > > > > > > So there are three ways to solve this:
> > > > > > > > 1) We extend pipelined subpartitions to make it possible to read
> > > > > > > > them multiple times.
> > > > > > > > 2) We extend blocking subpartitions to make it possible to read
> > > > > > > > them multiple times, but then we also have to fix FLINK-1713 so
> > > > > > > > we can use broadcasts in native iterations.
> > > > > > > > 3) We create one pipelined subpartition for every TaskManager.
> > > > > > > > Problem: the more TaskManagers there are, the more redundant
> > > > > > > > data we store, but the network traffic stays optimal.
> > > > > > > >
> > > > > > > > Thank you for your help,
> > > > > > > > Felix
> > > > > > > >
> > > > > > > > 2016-08-01 22:51 GMT+07:00 Stephan Ewen <se...@apache.org>:
> > > > > > > >
> > > > > > > > > Hi Felix!
> > > > > > > > > Hope this helps!
> > > > > > > > >
> > > > > > > > > Concerning (1.1) - The producer does not think in terms of the
> > > > > > > > > number of target TaskManagers. That number can, after all,
> > > > > > > > > change in the presence of a failure and recovery. The producer
> > > > > > > > > should, for its own result, not care how many consumers
> > > > > > > > > (tasks) it will have, but produce it only once.
> > > > > > > > >
> > > > > > > > > Concerning (1.2) - Only "blocking" intermediate results can be
> > > > > > > > > consumed multiple times. Data sent to broadcast variables must
> > > > > > > > > thus always be a blocking intermediate result.
> > > > > > > > >
> > > > > > > > > Greetings,
> > > > > > > > > Stephan
> > > > > > > > >
> > > > > > > > > On Wed, Jul 27, 2016 at 11:33 AM, Felix Neutatz <neut...@googlemail.com> wrote:
> > > > > > > > >
> > > > > > > > > > Hi Stephan,
> > > > > > > > > >
> > > > > > > > > > thanks for the great ideas. First I have some questions:
> > > > > > > > > >
> > > > > > > > > > 1.1) Does every task generate an intermediate result
> > > > > > > > > > partition for every target task, or is it already
> > > > > > > > > > implemented in a way such that there are only as many
> > > > > > > > > > intermediate result partitions per TaskManager as target
> > > > > > > > > > tasks? (Example: there are 2 TaskManagers with 2 tasks each.
> > > > > > > > > > Do we get 4 intermediate result partitions per TaskManager,
> > > > > > > > > > or do we get 8?)
> > > > > > > > > > 1.2) How can I consume an intermediate result partition
> > > > > > > > > > multiple times?
> > > > > > > > > > When I tried that, I got the following exception:
> > > > > > > > > >
> > > > > > > > > > Caused by: java.lang.IllegalStateException: Subpartition 0 of
> > > > > > > > > > dbe284e3b37c1df1b993a3f0a6020ea6@ce9fc38f08a5cc9e93431a9cbf740dcf
> > > > > > > > > > is being or already has been consumed, but pipelined
> > > > > > > > > > subpartitions can only be consumed once.
> > > > > > > > > >   at org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.createReadView(PipelinedSubpartition.java:179)
> > > > > > > > > >   at org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.createReadView(PipelinedSubpartition.java:36)
> > > > > > > > > >   at org.apache.flink.runtime.io.network.partition.ResultPartition.createSubpartitionView(ResultPartition.java:348)
> > > > > > > > > >   at org.apache.flink.runtime.io.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:81)
> > > > > > > > > >   at org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:98)
> > > > > > > > > >   at org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:41)
> > > > > > > > > >   at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
> > > > > > > > > >
> > > > > > > > > > My status update: since Friday I have been implementing your
> > > > > > > > > > idea described in (2). Locally this approach already works
> > > > > > > > > > (for less than 170 iterations). I will investigate further
> > > > > > > > > > to solve that issue.
> > > > > > > > > >
> > > > > > > > > > But I am still not sure how to implement (1). Maybe we
> > > > > > > > > > introduce a construct similar to the BroadcastVariableManager
> > > > > > > > > > to share the RecordWriter among all tasks of a TaskManager.
> > > > > > > > > > I am interested in your thoughts :)
> > > > > > > > > >
> > > > > > > > > > Best regards,
> > > > > > > > > > Felix
> > > > > > > > > >
> > > > > > > > > > 2016-07-22 17:25 GMT+02:00 Stephan Ewen <se...@apache.org>:
> > > > > > > > > >
> > > > > > > > > > > Hi Felix!
> > > > > > > > > > >
> > > > > > > > > > > Interesting suggestion. Here are some thoughts on the design.
> > > > > > > > > > >
> > > > > > > > > > > The two core changes needed to send data once to the
> > > > > > > > > > > TaskManagers are:
> > > > > > > > > > >
> > > > > > > > > > > (1) Every sender needs to produce its result once (rather
> > > > > > > > > > > than for every target task); there should be no redundancy
> > > > > > > > > > > there.
> > > > > > > > > > > (2) Every TaskManager should request the data once; other
> > > > > > > > > > > tasks in the same TaskManager pick it up from there.
> > > > > > > > > > >
> > > > > > > > > > > The current receiver-initiated pull model is actually a
> > > > > > > > > > > good abstraction for that, I think.
> > > > > > > > > > > Let's look at (1):
> > > > > > > > > > >
> > > > > > > > > > > - Currently, the TaskManagers have a separate intermediate
> > > > > > > > > > > result partition for each target slot. They should rather
> > > > > > > > > > > have one intermediate result partition (which also saves
> > > > > > > > > > > repeated serialization) that is consumed multiple times.
> > > > > > > > > > >
> > > > > > > > > > > - Since the results that are to be broadcast are always
> > > > > > > > > > > "blocking", they can be consumed (pulled) multiple times.
> > > > > > > > > > >
> > > > > > > > > > > Let's look at (2):
> > > > > > > > > > >
> > > > > > > > > > > - The current BroadcastVariableManager has the
> > > > > > > > > > > functionality to let the first accessor of the BC variable
> > > > > > > > > > > materialize the result.
> > > > > > > > > > >
> > > > > > > > > > > - It could be changed such that only the first accessor
> > > > > > > > > > > creates a RecordReader, so the others do not even request
> > > > > > > > > > > the stream. That way, the TaskManager should pull only one
> > > > > > > > > > > stream from each producing task, which means the data is
> > > > > > > > > > > transferred once.
> > > > > > > > > > >
> > > > > > > > > > > That would also work perfectly with the current failure /
> > > > > > > > > > > recovery model.
> > > > > > > > > > >
> > > > > > > > > > > What do you think?
> > > > > > > > > > > Stephan
> > > > > > > > > > >
> > > > > > > > > > > On Fri, Jul 22, 2016 at 2:59 PM, Felix Neutatz <neut...@googlemail.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi everybody,
> > > > > > > > > > > >
> > > > > > > > > > > > I want to improve the performance of broadcasts in
> > > > > > > > > > > > Flink. Therefore Till told me to start a FLIP on this
> > > > > > > > > > > > topic to discuss how to go forward to solve the current
> > > > > > > > > > > > issues for broadcasts.
> > > > > > > > > > > >
> > > > > > > > > > > > The problem in a nutshell: instead of sending data to
> > > > > > > > > > > > each TaskManager only once, at the moment the data is
> > > > > > > > > > > > sent to each task. This means if there are 3 slots on
> > > > > > > > > > > > each TaskManager, we will send the data 3 times instead
> > > > > > > > > > > > of once.
> > > > > > > > > > > >
> > > > > > > > > > > > There are multiple ways to tackle this problem, and I
> > > > > > > > > > > > started to do some research and investigate. You can
> > > > > > > > > > > > follow my thought process here:
> > > > > > > > > > > >
> > > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-5%3A+Only+send+data+to+each+taskmanager+once+for+broadcasts
> > > > > > > > > > > >
> > > > > > > > > > > > This is my first FLIP, so please correct me if I did
> > > > > > > > > > > > something wrong.
> > > > > > > > > > > >
> > > > > > > > > > > > I am interested in your thoughts about how to solve this
> > > > > > > > > > > > issue.
> > > > > > > > > > > > Do you think my approach is heading in the right
> > > > > > > > > > > > direction, or should we follow a totally different one?
> > > > > > > > > > > >
> > > > > > > > > > > > I am happy about any comment :)
> > > > > > > > > > > >
> > > > > > > > > > > > Best regards,
> > > > > > > > > > > > Felix
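The release bookkeeping for option 2 at the top of the thread (propagate N; count M release requests plus N-M "I am done" messages) can be sketched as a plain counter. This is a minimal sketch with hypothetical names, not actual Flink classes:

```java
// Option 2 sketch: the single subpartition is told the degree of
// parallelism N and counts incoming reports. It receives M release
// requests (one per consuming TaskManager) and N - M "I am done"
// messages (from tasks that never read it). Once all N consumers
// have reported, the buffers can safely be freed.
class SubpartitionReleaseTracker {

    private final int parallelism; // N: total number of consumer tasks
    private int reports;           // release requests + "I am done" messages so far
    private boolean released;

    SubpartitionReleaseTracker(int parallelism) {
        this.parallelism = parallelism;
    }

    /** Called when a TaskManager that consumed the subpartition releases its view. */
    synchronized void onReleaseRequest() {
        report();
    }

    /** Called by a task that does not need to read the subpartition. */
    synchronized void onDoneMessage() {
        report();
    }

    private void report() {
        if (++reports == parallelism) {
            released = true; // all N consumers accounted for: free the buffers
        }
    }

    synchronized boolean isReleased() {
        return released;
    }

    public static void main(String[] args) {
        SubpartitionReleaseTracker tracker = new SubpartitionReleaseTracker(4);
        tracker.onReleaseRequest(); // M = 2 TaskManagers actually read the data
        tracker.onReleaseRequest();
        tracker.onDoneMessage();    // N - M = 2 tasks never read it
        tracker.onDoneMessage();
        System.out.println("released: " + tracker.isReleased()); // released: true
    }
}
```

The same counter works for option 1 as well; only the threshold changes from N to M, since then only the M release requests would be counted.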