Hi everybody,

could we use the org.apache.flink.api.common.cache.DistributedCache to work around this broadcast issue for the moment, until we fix it? Or do you think it won't scale either?
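Concretely, I am thinking of something along these lines (an untested sketch; the HDFS path, the file contents, and all names are placeholders):

import java.io.File;
import java.nio.file.Files;
import java.util.List;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class DistributedCacheWorkaround {

  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Ship the data as a cached file instead of a broadcast variable
    // (the path is a placeholder).
    env.registerCachedFile("hdfs:///tmp/bc-data.txt", "bcData");

    DataSet<String> input = env.fromElements("a", "b", "c");

    input.map(new RichMapFunction<String, String>() {

      private transient List<String> bcLines;

      @Override
      public void open(Configuration parameters) throws Exception {
        // The cached file has been materialized in the TM's local file system.
        File bcFile = getRuntimeContext().getDistributedCache().getFile("bcData");
        bcLines = Files.readAllLines(bcFile.toPath());
      }

      @Override
      public String map(String value) {
        return value + "/" + bcLines.size();
      }
    }).print();
  }
}

If the cached file is fetched once per TM rather than once per slot, this should sidestep the per-slot network transfer, but I have not verified that.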
Best regards,
Felix

2016-06-09 10:57 GMT+02:00 Stephan Ewen <se...@apache.org>:

Till is right. Broadcast joins currently materialize once per slot. Originally, the purely push-based runtime was not good enough to handle it differently.

By now, we could definitely handle BC vars differently (only one slot per TM requests the data).

For BC joins, the hash tables do not coordinate spilling currently, which means that we cannot do multiple joins through the same hash table.

On Thu, Jun 9, 2016 at 10:17 AM, Till Rohrmann <trohrm...@apache.org> wrote:

If I'm not mistaken, broadcast variables and broadcast inputs of joins follow different code paths. Broadcast variables use additional input channels and are read before the actual driver code runs. In contrast to that, a join operation is a two-input operator where the join driver decides how to handle the inputs (which one to read first as build input).

This also entails that the broadcast variable optimization, where each task manager holds the data only once and copies of the data are discarded (but they are transmitted multiple times to the TM), does not apply to broadcast join inputs. Here you should see a slightly worse performance degradation with your initial benchmark if you increase the number of slots.
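For reference, the two patterns look roughly like this in the DataSet API (an untested sketch with made-up data sets):

import java.util.List;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

public class TwoBroadcastPaths {

  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Tuple2<Integer, String>> large =
        env.fromElements(Tuple2.of(1, "a"), Tuple2.of(2, "b"));
    DataSet<Tuple2<Integer, Double>> small =
        env.fromElements(Tuple2.of(1, 0.5), Tuple2.of(2, 0.7));

    // Path 1: broadcast variable. Shipped over additional input channels
    // and materialized in open(), before the driver code runs.
    large.map(new RichMapFunction<Tuple2<Integer, String>, String>() {

      private transient List<Tuple2<Integer, Double>> bc;

      @Override
      public void open(Configuration parameters) {
        bc = getRuntimeContext().getBroadcastVariable("smallSet");
      }

      @Override
      public String map(Tuple2<Integer, String> v) {
        return v.f1 + "/" + bc.size();
      }
    }).withBroadcastSet(small, "smallSet").print();

    // Path 2: broadcast join input. An ordinary two-input join where the
    // join driver replicates the hinted build side to every slot.
    large.join(small, JoinHint.BROADCAST_HASH_SECOND)
        .where(0).equalTo(0)
        .print();
  }
}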
Cheers,
Till

On Wed, Jun 8, 2016 at 9:14 PM, Alexander Alexandrov <alexander.s.alexand...@gmail.com> wrote:

> As far as I know, the reason why the broadcast variables are implemented that way is that the senders would have to know which sub-tasks are deployed to which TMs.

As the broadcast variables are realized as additionally attached "broadcast channels", I am assuming that the same behavior applies to broadcast joins as well.

Is this the case?

Regards,
Alexander

2016-06-08 17:13 GMT+02:00 Kunft, Andreas <andreas.ku...@tu-berlin.de>:

Hi Till,

thanks for the fast answer. I'll think about a concrete way of implementing it and open a JIRA.

Best
Andreas

________________________________________
From: Till Rohrmann <trohrm...@apache.org>
Sent: Wednesday, June 8, 2016 15:53
To: dev@flink.apache.org
Subject: Re: Broadcast data sent increases with # slots per TM

Hi Andreas,

your observation is correct. The data is sent to each slot, and the receiving TM only materializes one copy of the data. The rest of the data is discarded.

As far as I know, the reason why the broadcast variables are implemented that way is that the senders would have to know which sub-tasks are deployed to which TMs. Only then can you decide for which sub-tasks you can send the data together. Since the output emitters are agnostic to the actual deployment, the necessary information would have to be forwarded to them.

Another problem is that if you pick one of the sub-tasks to receive the broadcast set, then you have to make sure that this sub-task has read and materialized the broadcast set before the other sub-tasks start working. One could maybe first send the broadcast set to one sub-task, and then send a kind of acknowledge record to all other sub-tasks once the BC set has been transmitted. That way, the other sub-tasks would block until the broadcast set has been completely transmitted. But here one has to make sure that the sub-task receiving the BC set has been deployed and is not queued up for scheduling.

So there are some challenges to solve in order to optimize the BC sets. Currently, there is nobody working on it. If you want to start working on it, then I would recommend opening a JIRA and writing a design document for it.

Cheers,
Till

On Wed, Jun 8, 2016 at 1:45 PM, Kunft, Andreas <andreas.ku...@tu-berlin.de> wrote:

Hi,

we experience an unexpected increase in the amount of data sent over the network for broadcasts as the number of slots per TaskManager grows.

We provide a benchmark [1]. Increasing the slot count not only increases the amount of data sent over the network but also hurts performance, as seen in the preliminary results below. In these results, cloud-11 has 25 nodes and ibm-power has 8 nodes, with the number of slots per node scaled from 1 to 16.

+-----------------------+--------------+-------------+
| suite                 | name         | median_time |
+=======================+==============+=============+
| broadcast.cloud-11    | broadcast.01 |        8796 |
| broadcast.cloud-11    | broadcast.02 |       14802 |
| broadcast.cloud-11    | broadcast.04 |       30173 |
| broadcast.cloud-11    | broadcast.08 |       56936 |
| broadcast.cloud-11    | broadcast.16 |      117507 |
| broadcast.ibm-power-1 | broadcast.01 |        6807 |
| broadcast.ibm-power-1 | broadcast.02 |        8443 |
| broadcast.ibm-power-1 | broadcast.04 |       11823 |
| broadcast.ibm-power-1 | broadcast.08 |       21655 |
| broadcast.ibm-power-1 | broadcast.16 |       37426 |
+-----------------------+--------------+-------------+

After looking into the code base, it seems that the data is de-serialized only once per TM, but the actual data is sent to every slot running the operator with broadcast vars and is simply discarded if it has already been de-serialized.

I do not see a reason why the data can't be shared among the slots of a TM and therefore sent just once, but I guess it would require quite some changes to how BC sets are handled currently.

Are there any future plans regarding this, and/or is there interest in this "feature"?

Best
Andreas

[1] https://github.com/TU-Berlin-DIMA/flink-broadcast
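As a back-of-the-envelope check that the numbers above are consistent with per-slot shipping (all sizes are made up; only the linear-in-slots growth matters, which matches the roughly 2x step per slot doubling in the cloud-11 column):

public class BroadcastVolumeEstimate {
  public static void main(String[] args) {
    long bcBytes = 1L << 30; // hypothetical 1 GiB broadcast set
    int taskManagers = 25;   // cloud-11 setup from the benchmark
    for (int slots : new int[] {1, 2, 4, 8, 16}) {
      long perSlot = bcBytes * taskManagers * slots; // current behavior
      long perTm = bcBytes * taskManagers;           // shared once per TM
      System.out.printf("slots=%2d  current=%4d GiB  shared=%3d GiB%n",
          slots, perSlot >> 30, perTm >> 30);
    }
  }
}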