Hi,

I have already started to work on this issue and created a JIRA for it:
https://issues.apache.org/jira/browse/FLINK-4175

I have implemented a quick version which could solve it. I will run the
experiments on the cluster first and describe my approach on Monday :)
Have a nice weekend,
Felix

P.S. for super curious people:
https://github.com/FelixNeutatz/incubator-flink/commit/7d79d4dfe3f18208a73d6b692b3909f9c69a1da7

2016-06-09 11:50 GMT+02:00 Felix Neutatz <neut...@googlemail.com>:

> Hi everybody,
>
> could we use the org.apache.flink.api.common.cache.DistributedCache to
> work around this broadcast issue for the moment, until we have fixed it?
> Or do you think it won't scale either?
>
> Best regards,
> Felix
>
> 2016-06-09 10:57 GMT+02:00 Stephan Ewen <se...@apache.org>:
>
>> Till is right. Broadcast joins currently materialize once per slot.
>> Originally, the purely push-based runtime was not good enough to handle
>> it differently.
>>
>> By now, we could definitely handle BC vars differently (only one slot
>> per TM requests the data).
>> For BC joins, the hash tables do not coordinate spilling currently,
>> which means that we cannot do multiple joins through the same hash table.
>>
>> On Thu, Jun 9, 2016 at 10:17 AM, Till Rohrmann <trohrm...@apache.org>
>> wrote:
>>
>> > If I'm not mistaken, broadcast variables and broadcast inputs of joins
>> > follow different code paths. Broadcast variables use additional input
>> > channels and are read before the actual driver code runs. In contrast,
>> > a join is a two-input operator where the join driver decides how to
>> > handle the inputs (which one to read first as the build input).
>> >
>> > This also entails that the broadcast variable optimization, where each
>> > task manager holds the data only once and copies of the data are
>> > discarded (but they are transmitted multiple times to the TM), does
>> > not apply to broadcast join inputs. Here you should see a slightly
>> > worse performance degradation with your initial benchmark if you
>> > increase the number of slots.
>> >
>> > Cheers,
>> > Till
>> >
>> > On Wed, Jun 8, 2016 at 9:14 PM, Alexander Alexandrov <
>> > alexander.s.alexand...@gmail.com> wrote:
>> >
>> > > > As far as I know, the reason why the broadcast variables are
>> > > > implemented that way is that the senders would have to know which
>> > > > sub-tasks are deployed to which TMs.
>> > >
>> > > As the broadcast variables are realized as additionally attached
>> > > "broadcast channels", I am assuming that the same behavior will
>> > > apply to broadcast joins as well.
>> > >
>> > > Is this the case?
>> > >
>> > > Regards,
>> > > Alexander
>> > >
>> > > 2016-06-08 17:13 GMT+02:00 Kunft, Andreas <andreas.ku...@tu-berlin.de>:
>> > >
>> > > > Hi Till,
>> > > >
>> > > > thanks for the fast answer.
>> > > > I'll think about a concrete way of implementing it and open a JIRA.
>> > > >
>> > > > Best
>> > > > Andreas
>> > > > ________________________________________
>> > > > From: Till Rohrmann <trohrm...@apache.org>
>> > > > Sent: Wednesday, June 8, 2016 15:53
>> > > > To: dev@flink.apache.org
>> > > > Subject: Re: Broadcast data sent increases with # slots per TM
>> > > >
>> > > > Hi Andreas,
>> > > >
>> > > > your observation is correct. The data is sent to each slot and the
>> > > > receiving TM only materializes one copy of the data. The rest of
>> > > > the data is discarded.
>> > > >
>> > > > As far as I know, the reason why the broadcast variables are
>> > > > implemented that way is that the senders would have to know which
>> > > > sub-tasks are deployed to which TMs. Only then can you decide for
>> > > > which sub-tasks the data can be sent together.
>> > > > Since the output emitters are agnostic to the actual deployment,
>> > > > the necessary information would have to be forwarded to them.
>> > > >
>> > > > Another problem is that if you pick one of the sub-tasks to receive
>> > > > the broadcast set, then you have to make sure that this sub-task
>> > > > has read and materialized the broadcast set before the other
>> > > > sub-tasks start working. One could maybe first send the broadcast
>> > > > set to one sub-task and then, once the BC set has been sent, send a
>> > > > kind of acknowledge record to all other sub-tasks. That way, the
>> > > > other sub-tasks would block until the broadcast set has been
>> > > > completely transmitted. But here one has to make sure that the
>> > > > sub-task receiving the BC set has been deployed and is not queued
>> > > > up for scheduling.
>> > > >
>> > > > So there are some challenges to solve in order to optimize the BC
>> > > > sets. Currently, nobody is working on it. If you want to start
>> > > > working on it, then I would recommend opening a JIRA and writing a
>> > > > design document for it.
>> > > >
>> > > > Cheers,
>> > > > Till
>> > > >
>> > > > On Wed, Jun 8, 2016 at 1:45 PM, Kunft, Andreas <
>> > > > andreas.ku...@tu-berlin.de> wrote:
>> > > >
>> > > > > Hi,
>> > > > >
>> > > > > we experience an unexpected increase of the data sent over the
>> > > > > network for broadcasts as the number of slots per TaskManager
>> > > > > grows.
>> > > > >
>> > > > > We provided a benchmark [1]. Increasing the number of slots not
>> > > > > only increases the amount of data sent over the network but also
>> > > > > hurts performance, as seen in the preliminary results below. In
>> > > > > these results, cloud-11 has 25 nodes and ibm-power has 8 nodes,
>> > > > > with the number of slots per node scaled from 1 to 16.
>> > > > >
>> > > > > +-----------------------+--------------+-------------+
>> > > > > | suite                 | name         | median_time |
>> > > > > +=======================+==============+=============+
>> > > > > | broadcast.cloud-11    | broadcast.01 |        8796 |
>> > > > > | broadcast.cloud-11    | broadcast.02 |       14802 |
>> > > > > | broadcast.cloud-11    | broadcast.04 |       30173 |
>> > > > > | broadcast.cloud-11    | broadcast.08 |       56936 |
>> > > > > | broadcast.cloud-11    | broadcast.16 |      117507 |
>> > > > > | broadcast.ibm-power-1 | broadcast.01 |        6807 |
>> > > > > | broadcast.ibm-power-1 | broadcast.02 |        8443 |
>> > > > > | broadcast.ibm-power-1 | broadcast.04 |       11823 |
>> > > > > | broadcast.ibm-power-1 | broadcast.08 |       21655 |
>> > > > > | broadcast.ibm-power-1 | broadcast.16 |       37426 |
>> > > > > +-----------------------+--------------+-------------+
>> > > > >
>> > > > > After looking into the code base, it seems that the data is
>> > > > > de-serialized only once per TM, but the actual data is sent for
>> > > > > all slots running the operator with broadcast vars and just gets
>> > > > > discarded in case it has already been de-serialized.
>> > > > >
>> > > > > I do not see a reason why the data can't be shared among the
>> > > > > slots of a TM and therefore sent just once, but I guess it would
>> > > > > require quite some changes to how BC sets are handled currently.
>> > > > >
>> > > > > Are there any future plans regarding this and/or is there
>> > > > > interest in this "feature"?
>> > > > >
>> > > > > Best
>> > > > > Andreas
>> > > > >
>> > > > > [1] https://github.com/TU-Berlin-DIMA/flink-broadcast
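
For readers skimming the archive, below is a minimal sketch (Java DataSet API) of the two user-facing constructs the thread distinguishes: a broadcast variable attached with withBroadcastSet and read via getBroadcastVariable, and a broadcast join input requested via a join hint. The data sets and the name "smallSet" are purely illustrative; the comments restate the behavior described by Till and Stephan above rather than any guaranteed contract.

```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

import java.util.List;

public class BroadcastPathsSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<Integer, String>> small = env.fromElements(
                Tuple2.of(1, "a"), Tuple2.of(2, "b"));
        DataSet<Tuple2<Integer, Double>> large = env.fromElements(
                Tuple2.of(1, 1.0), Tuple2.of(2, 2.0), Tuple2.of(1, 3.0));

        // (1) Broadcast variable: attached as an extra input channel and read
        // in open() before the driver runs; per this thread, it is currently
        // shipped once per slot and de-duplicated on the receiving TM.
        large.map(new RichMapFunction<Tuple2<Integer, Double>, String>() {
                    private List<Tuple2<Integer, String>> bcSet;

                    @Override
                    public void open(Configuration parameters) {
                        bcSet = getRuntimeContext().getBroadcastVariable("smallSet");
                    }

                    @Override
                    public String map(Tuple2<Integer, Double> value) {
                        return value.f0 + " -> " + bcSet.size();
                    }
                })
                .withBroadcastSet(small, "smallSet")
                .print();

        // (2) Broadcast join input: the join driver builds a hash table from
        // the broadcast (build) side in every parallel task, so the TM-level
        // de-duplication of broadcast variables does not apply here.
        large.join(small, JoinHint.BROADCAST_HASH_SECOND)
                .where(0)
                .equalTo(0)
                .print();
    }
}
```

The first path is the one exercised by the benchmark in [1]; the second is the one Stephan notes cannot currently share a hash table across slots because spilling is not coordinated.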
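As a companion to Felix's question about org.apache.flink.api.common.cache.DistributedCache as a stop-gap, here is a rough sketch of how such a workaround could look. The HDFS path and the name "bcFile" are placeholders, and whether the cached file really ends up materialized only once per TaskManager (and therefore scales better than the current per-slot broadcast) is exactly the open question raised in the thread.

```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

import java.io.File;
import java.nio.file.Files;
import java.util.List;

public class DistributedCacheWorkaroundSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // The "broadcast" data is written to a shared location beforehand;
        // the path below is only a placeholder.
        env.registerCachedFile("hdfs:///tmp/broadcast-data.txt", "bcFile");

        env.fromElements("a", "b", "c")
                .map(new RichMapFunction<String, String>() {
                    private List<String> bcLines;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        // The registered file is made available in the task's
                        // local file system and can be read once in open().
                        File bcFile = getRuntimeContext()
                                .getDistributedCache()
                                .getFile("bcFile");
                        bcLines = Files.readAllLines(bcFile.toPath());
                    }

                    @Override
                    public String map(String value) {
                        return value + ":" + bcLines.size();
                    }
                })
                .print();
    }
}
```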