If I'm not mistaken, then broadcast variables and broadcast inputs of joins
follow different code paths. Broadcast variables use additional input
channels and are read before the actual driver code runs. In contrast to
that, a join operation is a two input operator where the join driver
decides how to handle the inputs (which one to read first as build input).

This also entails that the broadcast variable optimization, where each task
manager holds the data only once and copies of the data are discarded (but
they are transmitted multiple times to the TM), does not apply to the
broadcast join inputs. Here you should see an slightly worse performance
degradation with your initial benchmark if you increase the number of slots.

Cheers,
Till

On Wed, Jun 8, 2016 at 9:14 PM, Alexander Alexandrov <
alexander.s.alexand...@gmail.com> wrote:

> > As far as I know, the reason why the broadcast variables are implemented
> that way is that the senders would have to know which sub-tasks are
> deployed to which TMs.
>
> As the broadcast variables are realized as additionally attached "broadcast
> channels", I am assuming that the same behavior will apply for broadcast
> joins as well.
>
> Is this the case?
>
> Regards,
> Alexander
>
>
> 2016-06-08 17:13 GMT+02:00 Kunft, Andreas <andreas.ku...@tu-berlin.de>:
>
> > Hi Till,
> >
> > thanks for the fast answer.
> > I'll think about a concrete way of implementing and open an JIRA.
> >
> > Best
> > Andreas
> > ________________________________________
> > Von: Till Rohrmann <trohrm...@apache.org>
> > Gesendet: Mittwoch, 8. Juni 2016 15:53
> > An: dev@flink.apache.org
> > Betreff: Re: Broadcast data sent increases with # slots per TM
> >
> > Hi Andreas,
> >
> > your observation is correct. The data is sent to each slot and the
> > receiving TM only materializes one copy of the data. The rest of the data
> > is discarded.
> >
> > As far as I know, the reason why the broadcast variables are implemented
> > that way is that the senders would have to know which sub-tasks are
> > deployed to which TMs. Only then, you can decide for which sub-tasks you
> > can send the data together. Since the output emitters are agnostic to the
> > actual deployment, the necessary information would have to be forwarded
> to
> > them.
> >
> > Another problem is that if you pick one of the sub-tasks to receive the
> > broadcast set, then you have to make sure, that this sub-task has read
> and
> > materialized the broadcast set before the other sub-tasks start working.
> > One could maybe send to one sub-task first the broadcast set and then to
> > all other sub-tasks, after one has sent the BC set, a kind of acknowledge
> > record. That way, the other sub-tasks would block until the broadcast set
> > has been completely transmitted. But here one has to make sure that the
> > sub-task receiving the BC set has been deployed and is not queued up for
> > scheduling.
> >
> > So there are some challenges to solve in order to optimize the BC sets.
> > Currently, there is nobody working on it. If you want to start working on
> > it, then I would recommend to open a JIRA and start writing a design
> > document for it.
> >
> > Cheers,
> > Till
> >
> > On Wed, Jun 8, 2016 at 1:45 PM, Kunft, Andreas <
> andreas.ku...@tu-berlin.de
> > >
> > wrote:
> >
> > > Hi,
> > >
> > >
> > > we experience some unexpected increase of data sent over the network
> for
> > > broadcasts with increasing number of slots per Taskmanager.
> > >
> > >
> > > We provided a benchmark [1]. It not only increases the size of data
> sent
> > > over the network but also hurts performance as seen in the preliminary
> > > results below. In this results cloud-11 has 25 nodes and ibm-power has
> 8
> > > nodes with scaling the number of slots per node from 1 - 16.
> > >
> > >
> > > +-----------------------+--------------+-------------+
> > > | suite                 | name         | median_time |
> > > +=======================+==============+=============+
> > > | broadcast.cloud-11    | broadcast.01 |        8796 |
> > > | broadcast.cloud-11    | broadcast.02 |       14802 |
> > > | broadcast.cloud-11    | broadcast.04 |       30173 |
> > > | broadcast.cloud-11    | broadcast.08 |       56936 |
> > > | broadcast.cloud-11    | broadcast.16 |      117507 |
> > > | broadcast.ibm-power-1 | broadcast.01 |        6807 |
> > > | broadcast.ibm-power-1 | broadcast.02 |        8443 |
> > > | broadcast.ibm-power-1 | broadcast.04 |       11823 |
> > > | broadcast.ibm-power-1 | broadcast.08 |       21655 |
> > > | broadcast.ibm-power-1 | broadcast.16 |       37426 |
> > > +-----------------------+--------------+-------------+
> > >
> > >
> > >
> > > After looking into the code base it, it seems that the data is
> > > de-serialized only once per TM, but the actual data is sent for all
> slots
> > > running the operator with broadcast vars and just gets discarded in
> case
> > > its already de-serialized.
> > >
> > >
> > > I do not see a reason the data can't be shared among the slots of a TM
> > and
> > > therefore just sent once, but I guess it would require quite some
> changes
> > > bc sets are handled currently.
> > >
> > >
> > > Are there any future plans regarding this and/or is there interest in
> > this
> > > "feature"?
> > >
> > >
> > > Best
> > >
> > > Andreas?
> > >
> > >
> > > [1] https://github.com/TU-Berlin-DIMA/flink-broadcast?
> > >
> > >
> > >
> >
>

Reply via email to