Hi everybody,

Could we use the org.apache.flink.api.common.cache.DistributedCache to work
around this Broadcast issue for the moment, until we fix it?
Or do you think it won't scale either?
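
For reference, here is roughly what I have in mind (an untested sketch; the
file path and names are made up):

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.configuration.Configuration;

    import java.io.BufferedReader;
    import java.io.File;
    import java.io.FileReader;
    import java.util.ArrayList;
    import java.util.List;

    public class DistributedCacheWorkaround {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Registered once; Flink copies the file to each TaskManager's
            // local file system a single time, independent of the slot count.
            env.registerCachedFile("hdfs:///path/to/bc-data.txt", "bcData");

            env.fromElements(1, 2, 3)
               .map(new RichMapFunction<Integer, Integer>() {
                   private List<String> bcData;

                   @Override
                   public void open(Configuration parameters) throws Exception {
                       // Each task reads the TM-local copy instead of
                       // receiving the data once per slot over the network.
                       File f = getRuntimeContext().getDistributedCache().getFile("bcData");
                       bcData = new ArrayList<>();
                       try (BufferedReader r = new BufferedReader(new FileReader(f))) {
                           String line;
                           while ((line = r.readLine()) != null) {
                               bcData.add(line);
                           }
                       }
                   }

                   @Override
                   public Integer map(Integer value) {
                       return value + bcData.size();
                   }
               })
               .print();
        }
    }

As far as I understand, each parallel task would still materialize its own
copy on the heap, so this saves network traffic but not necessarily memory.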

Best regards,
Felix

2016-06-09 10:57 GMT+02:00 Stephan Ewen <se...@apache.org>:

> Till is right. Broadcast joins currently materialize once per slot.
> Originally, the purely push-based runtime was not good enough to handle it
> differently.
>
> By now, we could definitely handle BC Vars differently (only one slot per
> TM requests them).
> For BC Joins, the hash tables currently do not coordinate spilling, which
> means that we cannot do multiple joins through the same hash table.
>
>
> On Thu, Jun 9, 2016 at 10:17 AM, Till Rohrmann <trohrm...@apache.org>
> wrote:
>
> > If I'm not mistaken, then broadcast variables and broadcast inputs of
> > joins follow different code paths. Broadcast variables use additional
> > input channels and are read before the actual driver code runs. In
> > contrast to that, a join operation is a two-input operator where the
> > join driver decides how to handle the inputs (which one to read first
> > as the build input).
> >
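> > To make the two code paths concrete, here is a minimal (untested) sketch
> > of how each path is triggered from the DataSet API:
> >
> >     import org.apache.flink.api.common.functions.RichMapFunction;
> >     import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
> >     import org.apache.flink.api.java.DataSet;
> >     import org.apache.flink.api.java.ExecutionEnvironment;
> >     import org.apache.flink.api.java.tuple.Tuple2;
> >     import org.apache.flink.configuration.Configuration;
> >
> >     import java.util.List;
> >
> >     public class BroadcastPaths {
> >
> >         public static void main(String[] args) throws Exception {
> >             ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> >
> >             DataSet<Tuple2<Integer, String>> small =
> >                     env.fromElements(Tuple2.of(1, "a"), Tuple2.of(2, "b"));
> >             DataSet<Tuple2<Integer, String>> large =
> >                     env.fromElements(Tuple2.of(1, "x"), Tuple2.of(2, "y"));
> >
> >             // Path 1: broadcast variable; shipped over an additional input
> >             // channel and materialized before the map driver runs.
> >             DataSet<String> viaBcVar = large
> >                     .map(new RichMapFunction<Tuple2<Integer, String>, String>() {
> >                         private List<Tuple2<Integer, String>> bc;
> >
> >                         @Override
> >                         public void open(Configuration parameters) {
> >                             bc = getRuntimeContext().getBroadcastVariable("bcSet");
> >                         }
> >
> >                         @Override
> >                         public String map(Tuple2<Integer, String> value) {
> >                             return value.f1 + "/" + bc.size();
> >                         }
> >                     })
> >                     .withBroadcastSet(small, "bcSet");
> >             viaBcVar.print();
> >
> >             // Path 2: broadcast join input; the join driver itself consumes
> >             // the small side as the build input of its hash table.
> >             large.join(small, JoinHint.BROADCAST_HASH_SECOND)
> >                  .where(0).equalTo(0)
> >                  .print();
> >         }
> >     }
> >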
> > This also entails that the broadcast variable optimization, where each
> > task manager holds the data only once and copies of the data are
> > discarded (but they are transmitted multiple times to the TM), does not
> > apply to the broadcast join inputs. Here you should see a slightly
> > worse performance degradation with your initial benchmark if you
> > increase the number of slots.
> >
> > Cheers,
> > Till
> >
> > On Wed, Jun 8, 2016 at 9:14 PM, Alexander Alexandrov <
> > alexander.s.alexand...@gmail.com> wrote:
> >
> > > > As far as I know, the reason why the broadcast variables are
> > > > implemented that way is that the senders would have to know which
> > > > sub-tasks are deployed to which TMs.
> > >
> > > As the broadcast variables are realized as additionally attached
> > > "broadcast channels", I am assuming that the same behavior will apply
> > > for broadcast joins as well.
> > >
> > > Is this the case?
> > >
> > > Regards,
> > > Alexander
> > >
> > >
> > > 2016-06-08 17:13 GMT+02:00 Kunft, Andreas <andreas.ku...@tu-berlin.de>:
> > >
> > > > Hi Till,
> > > >
> > > > thanks for the fast answer.
> > > > I'll think about a concrete way of implementing it and open a JIRA.
> > > >
> > > > Best
> > > > Andreas
> > > > ________________________________________
> > > > From: Till Rohrmann <trohrm...@apache.org>
> > > > Sent: Wednesday, June 8, 2016 15:53
> > > > To: dev@flink.apache.org
> > > > Subject: Re: Broadcast data sent increases with # slots per TM
> > > >
> > > > Hi Andreas,
> > > >
> > > > your observation is correct. The data is sent to each slot and the
> > > > receiving TM only materializes one copy of the data. The rest of
> > > > the data is discarded.
> > > >
> > > > As far as I know, the reason why the broadcast variables are
> > > > implemented that way is that the senders would have to know which
> > > > sub-tasks are deployed to which TMs. Only then could you decide for
> > > > which sub-tasks the data can be sent together. Since the output
> > > > emitters are agnostic to the actual deployment, the necessary
> > > > information would have to be forwarded to them.
> > > >
> > > > Another problem is that if you pick one of the sub-tasks to receive
> > > > the broadcast set, then you have to make sure that this sub-task has
> > > > read and materialized the broadcast set before the other sub-tasks
> > > > start working. One could first send the broadcast set to one
> > > > sub-task and then, once the BC set has been fully sent, send a kind
> > > > of acknowledge record to all other sub-tasks. That way, the other
> > > > sub-tasks would block until the broadcast set has been completely
> > > > transmitted. But here one has to make sure that the sub-task
> > > > receiving the BC set has been deployed and is not queued up for
> > > > scheduling.
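> > > >
> > > > Just to illustrate the blocking part of that idea, a toy sketch
> > > > (this is hypothetical, not existing Flink code; all names are made
> > > > up):
> > > >
> > > >     import java.util.List;
> > > >     import java.util.concurrent.CountDownLatch;
> > > >
> > > >     // Hypothetical per-TM guard: the designated sub-task publishes
> > > >     // the materialized BC set; all other sub-tasks on the same TM
> > > >     // block until it is available.
> > > >     final class BroadcastMaterializationGuard<T> {
> > > >
> > > >         private final CountDownLatch materialized = new CountDownLatch(1);
> > > >         private volatile List<T> data;
> > > >
> > > >         // Called once by the sub-task that received the broadcast set.
> > > >         void publish(List<T> bcSet) {
> > > >             data = bcSet;
> > > >             materialized.countDown();
> > > >         }
> > > >
> > > >         // Called by the other sub-tasks, e.g. upon seeing the
> > > >         // acknowledge record; returns only once the set is complete.
> > > >         List<T> awaitData() throws InterruptedException {
> > > >             materialized.await();
> > > >             return data;
> > > >         }
> > > >     }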
> > > >
> > > > So there are some challenges to solve in order to optimize the BC
> > > > sets. Currently, nobody is working on this. If you want to start
> > > > working on it, then I would recommend opening a JIRA and writing a
> > > > design document for it.
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > > On Wed, Jun 8, 2016 at 1:45 PM, Kunft, Andreas <
> > > > andreas.ku...@tu-berlin.de> wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > >
> > > > > We experience an unexpected increase in the amount of data sent
> > > > > over the network for broadcasts as the number of slots per
> > > > > TaskManager grows.
> > > > >
> > > > > We provided a benchmark [1]. The growing slot count not only
> > > > > increases the size of the data sent over the network, it also hurts
> > > > > performance, as seen in the preliminary results below. In these
> > > > > results, cloud-11 has 25 nodes and ibm-power has 8 nodes, with the
> > > > > number of slots per node scaled from 1 to 16.
> > > > >
> > > > >
> > > > > +-----------------------+--------------+-------------+
> > > > > | suite                 | name         | median_time |
> > > > > +=======================+==============+=============+
> > > > > | broadcast.cloud-11    | broadcast.01 |        8796 |
> > > > > | broadcast.cloud-11    | broadcast.02 |       14802 |
> > > > > | broadcast.cloud-11    | broadcast.04 |       30173 |
> > > > > | broadcast.cloud-11    | broadcast.08 |       56936 |
> > > > > | broadcast.cloud-11    | broadcast.16 |      117507 |
> > > > > | broadcast.ibm-power-1 | broadcast.01 |        6807 |
> > > > > | broadcast.ibm-power-1 | broadcast.02 |        8443 |
> > > > > | broadcast.ibm-power-1 | broadcast.04 |       11823 |
> > > > > | broadcast.ibm-power-1 | broadcast.08 |       21655 |
> > > > > | broadcast.ibm-power-1 | broadcast.16 |       37426 |
> > > > > +-----------------------+--------------+-------------+
> > > > >
> > > > >
> > > > >
> > > > > After looking into the code base, it seems that the data is
> > > > > de-serialized only once per TM, but the actual data is sent once
> > > > > for every slot running the operator with broadcast vars and is
> > > > > simply discarded if it has already been de-serialized.
> > > > >
> > > > >
> > > > > I do not see a reason why the data can't be shared among the slots
> > > > > of a TM and therefore be sent only once, but I guess it would
> > > > > require quite some changes to how BC sets are currently handled.
> > > > >
> > > > >
> > > > > Are there any future plans regarding this, and/or is there
> > > > > interest in this "feature"?
> > > > >
> > > > >
> > > > > Best
> > > > >
> > > > > Andreas
> > > > >
> > > > >
> > > > > [1] https://github.com/TU-Berlin-DIMA/flink-broadcast
> > > > >
> > > > >
> > > > >
> > > >
> > >
> >
>
