Hi,

I have already started working on this issue and created a Jira for it:
https://issues.apache.org/jira/browse/FLINK-4175
I have already implemented a quick version which could solve it. I will run
the experiments on the cluster first and will describe my approach on
Monday :)

Have a nice weekend,
Felix

P.S. for super curious people:
https://github.com/FelixNeutatz/incubator-flink/commit/7d79d4dfe3f18208a73d6b692b3909f9c69a1da7
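
For context on the thread quoted below, here is a back-of-the-envelope sketch of why the broadcast volume grows with the slot count. It is only a simplified model, not Flink code: the function name and its parameters are made up for illustration, the 100 MiB set size is a hypothetical value, and it assumes that every receiving slot gets a full copy of the broadcast set (local transfers are ignored).

```python
def broadcast_bytes_delivered(set_size_bytes, task_managers, slots_per_tm,
                              once_per_tm=False):
    """Total bytes delivered for one broadcast set (illustrative model only)."""
    # Behavior described in the thread: one full copy per slot, of which
    # all but one per TaskManager are discarded on arrival.
    # Behavior asked for in the thread: one copy per TaskManager.
    copies_per_tm = 1 if once_per_tm else slots_per_tm
    return set_size_bytes * task_managers * copies_per_tm


set_size = 100 * 1024 * 1024  # hypothetical 100 MiB broadcast set
for slots in (1, 2, 4, 8, 16):
    per_slot = broadcast_bytes_delivered(set_size, 25, slots)
    per_tm = broadcast_bytes_delivered(set_size, 25, slots, once_per_tm=True)
    # The ratio between the two strategies equals the number of slots per TM.
    print(f"slots={slots:2d} per-slot={per_slot} per-TM={per_tm} "
          f"ratio={per_slot // per_tm}")
```

In this model the delivered volume is linear in the number of slots per TM, while sending once per TM would keep it constant.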

2016-06-09 11:50 GMT+02:00 Felix Neutatz <neut...@googlemail.com>:

> Hi everybody,
>
> could we use the org.apache.flink.api.common.cache.DistributedCache to
> work around this broadcast issue for the moment, until we have fixed it?
> Or do you think it won't scale either?
>
> Best regards,
> Felix
>
> 2016-06-09 10:57 GMT+02:00 Stephan Ewen <se...@apache.org>:
>
>> Till is right. Broadcast joins currently materialize once per slot.
>> Originally, the purely push based runtime was not good enough to handle it
>> differently.
>>
>> By now, we could definitely handle BC Vars differently (only one slot per
>> TM requests).
>> For BC Joins, the hash tables do not coordinate spilling currently, which
>> means that we cannot do multiple joins through the same hash table.
>>
>>
>> On Thu, Jun 9, 2016 at 10:17 AM, Till Rohrmann <trohrm...@apache.org>
>> wrote:
>>
>> > If I'm not mistaken, then broadcast variables and broadcast inputs of
>> joins
>> > follow different code paths. Broadcast variables use additional input
>> > channels and are read before the actual driver code runs. In contrast to
>> > that, a join operation is a two input operator where the join driver
>> > decides how to handle the inputs (which one to read first as build
>> input).
>> >
>> > This also entails that the broadcast variable optimization, where each
>> task
>> > manager holds the data only once and copies of the data are discarded
>> (but
>> > they are transmitted multiple times to the TM), does not apply to the
>> > broadcast join inputs. Here you should see a slightly worse performance
>> > degradation with your initial benchmark if you increase the number of
>> > slots.
>> >
>> > Cheers,
>> > Till
>> >
>> > On Wed, Jun 8, 2016 at 9:14 PM, Alexander Alexandrov <
>> > alexander.s.alexand...@gmail.com> wrote:
>> >
>> > > > As far as I know, the reason why the broadcast variables are
>> > implemented
>> > > that way is that the senders would have to know which sub-tasks are
>> > > deployed to which TMs.
>> > >
>> > > As the broadcast variables are realized as additionally attached
>> > "broadcast
>> > > channels", I am assuming that the same behavior will apply for
>> broadcast
>> > > joins as well.
>> > >
>> > > Is this the case?
>> > >
>> > > Regards,
>> > > Alexander
>> > >
>> > >
>> > > 2016-06-08 17:13 GMT+02:00 Kunft, Andreas <andreas.ku...@tu-berlin.de
>> >:
>> > >
>> > > > Hi Till,
>> > > >
>> > > > thanks for the fast answer.
>> > > > I'll think about a concrete way of implementing this and open a JIRA.
>> > > >
>> > > > Best
>> > > > Andreas
>> > > > ________________________________________
>> > > > From: Till Rohrmann <trohrm...@apache.org>
>> > > > Sent: Wednesday, June 8, 2016 15:53
>> > > > To: dev@flink.apache.org
>> > > > Subject: Re: Broadcast data sent increases with # slots per TM
>> > > >
>> > > > Hi Andreas,
>> > > >
>> > > > your observation is correct. The data is sent to each slot and the
>> > > > receiving TM only materializes one copy of the data. The rest of the
>> > data
>> > > > is discarded.
>> > > >
>> > > > As far as I know, the reason why the broadcast variables are
>> > implemented
>> > > > that way is that the senders would have to know which sub-tasks are
>> > > > deployed to which TMs. Only then, you can decide for which sub-tasks
>> > you
>> > > > can send the data together. Since the output emitters are agnostic
>> to
>> > the
>> > > > actual deployment, the necessary information would have to be
>> forwarded
>> > > to
>> > > > them.
>> > > >
>> > > > Another problem is that if you pick one of the sub-tasks to receive the
>> > > > broadcast set, then you have to make sure that this sub-task has read
>> > > > and materialized the broadcast set before the other sub-tasks start
>> > > > working. One could maybe first send the broadcast set to one sub-task
>> > > > and then, once the BC set has been sent, send a kind of acknowledge
>> > > > record to all other sub-tasks. That way, the other sub-tasks would
>> > > > block until the broadcast set has been completely transmitted. But here
>> > > > one has to make sure that the sub-task receiving the BC set has been
>> > > > deployed and is not queued up for scheduling.
>> > > >
>> > > > So there are some challenges to solve in order to optimize the BC
>> sets.
>> > > > Currently, there is nobody working on it. If you want to start
>> working
>> > on
>> > > > it, then I would recommend opening a JIRA and writing a design
>> > > > document for it.
>> > > >
>> > > > Cheers,
>> > > > Till
>> > > >
>> > > > On Wed, Jun 8, 2016 at 1:45 PM, Kunft, Andreas <
>> > > andreas.ku...@tu-berlin.de
>> > > > >
>> > > > wrote:
>> > > >
>> > > > > Hi,
>> > > > >
>> > > > >
>> > > > > we are seeing an unexpected increase in the data sent over the network
>> > > > > for broadcasts as the number of slots per TaskManager increases.
>> > > > >
>> > > > >
>> > > > > We provided a benchmark [1]. The effect not only increases the size of
>> > > > > the data sent over the network but also hurts performance, as seen in
>> > > > > the preliminary results below. In these results, cloud-11 has 25 nodes
>> > > > > and ibm-power has 8 nodes, with the number of slots per node scaled
>> > > > > from 1 to 16.
>> > > > >
>> > > > >
>> > > > > +-----------------------+--------------+-------------+
>> > > > > | suite                 | name         | median_time |
>> > > > > +=======================+==============+=============+
>> > > > > | broadcast.cloud-11    | broadcast.01 |        8796 |
>> > > > > | broadcast.cloud-11    | broadcast.02 |       14802 |
>> > > > > | broadcast.cloud-11    | broadcast.04 |       30173 |
>> > > > > | broadcast.cloud-11    | broadcast.08 |       56936 |
>> > > > > | broadcast.cloud-11    | broadcast.16 |      117507 |
>> > > > > | broadcast.ibm-power-1 | broadcast.01 |        6807 |
>> > > > > | broadcast.ibm-power-1 | broadcast.02 |        8443 |
>> > > > > | broadcast.ibm-power-1 | broadcast.04 |       11823 |
>> > > > > | broadcast.ibm-power-1 | broadcast.08 |       21655 |
>> > > > > | broadcast.ibm-power-1 | broadcast.16 |       37426 |
>> > > > > +-----------------------+--------------+-------------+
>> > > > >
>> > > > >
>> > > > >
>> > > > > After looking into the code base, it seems that the data is
>> > > > > de-serialized only once per TM, but the actual data is sent to all
>> > > > > slots running the operator with broadcast vars and just gets discarded
>> > > > > in case it's already de-serialized.
>> > > > >
>> > > > >
>> > > > > I do not see a reason the data can't be shared among the slots of a TM
>> > > > > and therefore just sent once, but I guess it would require quite some
>> > > > > changes to how BC sets are handled currently.
>> > > > >
>> > > > >
>> > > > > Are there any future plans regarding this and/or is there
>> interest in
>> > > > this
>> > > > > "feature"?
>> > > > >
>> > > > >
>> > > > > Best
>> > > > >
>> > > > > Andreas
>> > > > >
>> > > > >
>> > > > > [1] https://github.com/TU-Berlin-DIMA/flink-broadcast
>> > > > >
>> > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
>
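
The median times in the benchmark table quoted above can be turned into slowdown factors relative to the one-slot run. The sketch below only reuses the numbers from that table; it assumes that the suffix of each benchmark name (broadcast.01 to broadcast.16) is the number of slots per node, and the helper name is made up for illustration. The table does not state a time unit, so only ratios are computed.

```python
# Median times copied from the benchmark table in the thread,
# keyed by the assumed number of slots per node.
median_time = {
    "broadcast.cloud-11": {1: 8796, 2: 14802, 4: 30173, 8: 56936, 16: 117507},
    "broadcast.ibm-power-1": {1: 6807, 2: 8443, 4: 11823, 8: 21655, 16: 37426},
}


def slowdown(times):
    """Return each median time divided by the one-slot median time."""
    base = times[1]
    return {slots: t / base for slots, t in times.items()}


for suite, times in median_time.items():
    for slots, factor in slowdown(times).items():
        print(f"{suite:22s} slots={slots:2d} slowdown={factor:.2f}x")
```

On cloud-11 the 16-slot run comes out roughly 13 times slower than the one-slot run, and on ibm-power-1 roughly 5.5 times slower.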
