Some form of tree aggregation is useful in many cases, and IMO a good
addition to the system.

Kostas

On Mon, Apr 27, 2015 at 11:04 AM, Andra Lungu <lungu.an...@gmail.com> wrote:

> Hi Fabian,
>
> After a quick look at the current behaviour of Flink's combinable reduce, I
> saw that it does something like this:
>
> https://docs.google.com/drawings/d/1WfGJq1ZNQ-F0EQZ2TwEYS_861xc3fSdfJL9Z4VdXBQU/edit?usp=sharing
>
> It basically iterates over all the key groups two by two and if it finds
> two keys within the same key group, it reduces them.
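>
> Just to illustrate how I read that behaviour, here is a rough plain-Java
> sketch of pairwise combining within a key group (my own illustration, not
> Flink's actual code):
>
>     import java.util.ArrayList;
>     import java.util.Arrays;
>     import java.util.List;
>
>     public class PairwiseCombineSketch {
>       public static void main(String[] args) {
>         // (key, value) records, assumed to be already sorted/grouped by key
>         List<long[]> records = new ArrayList<>(Arrays.asList(
>             new long[]{1, 10}, new long[]{1, 20}, new long[]{2, 5}, new long[]{2, 7}));
>
>         List<long[]> combined = new ArrayList<>();
>         int i = 0;
>         while (i < records.size()) {
>           long[] current = records.get(i++);
>           // while the next record falls into the same key group, reduce the two values
>           while (i < records.size() && records.get(i)[0] == current[0]) {
>             current = new long[]{current[0], current[1] + records.get(i++)[1]};
>           }
>           combined.add(current);
>         }
>
>         for (long[] r : combined) {
>           System.out.println(r[0] + " -> " + r[1]);  // prints 1 -> 30 and 2 -> 12
>         }
>       }
>     }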
>
> What we would like to do (as Vasia said) as part of my thesis is to
> speed up the regular reduce by turning it into a treeReduce() where
> appropriate [in GSA, for example, we do a reduce; that could easily be
> hot-swapped with treeReduce() when skewed vertices are detected]. This new
> operator will get the number of levels as an additional parameter (val1,
> val2, numLevels) and will aggregate in levels:
>
> https://docs.google.com/drawings/d/1X_yJBdZykB9oBTbACUy9Bdd7oG5eDPDicNKhPcZ71ik/edit?usp=sharing
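>
> To make the idea more concrete, here is a rough sketch of a two-level
> aggregation built from the existing operators (my own illustration, not the
> proposed treeReduce operator; class and variable names are mine):
> mapPartition acts as a local pre-aggregation level, and a final reduce
> combines the per-partition partials.
>
>     import org.apache.flink.api.common.functions.MapPartitionFunction;
>     import org.apache.flink.api.common.functions.ReduceFunction;
>     import org.apache.flink.api.java.DataSet;
>     import org.apache.flink.api.java.ExecutionEnvironment;
>     import org.apache.flink.util.Collector;
>
>     public class TwoLevelSumSketch {
>       public static void main(String[] args) throws Exception {
>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>         DataSet<Long> values = env.generateSequence(1, 1000);
>
>         // level 1: pre-aggregate locally, one partial sum per partition
>         DataSet<Long> partials = values.mapPartition(
>             new MapPartitionFunction<Long, Long>() {
>               @Override
>               public void mapPartition(Iterable<Long> in, Collector<Long> out) {
>                 long sum = 0;
>                 for (Long v : in) {
>                   sum += v;
>                 }
>                 out.collect(sum);
>               }
>             });
>
>         // level 2: combine the partial sums into the final result
>         partials.reduce(new ReduceFunction<Long>() {
>           @Override
>           public Long reduce(Long a, Long b) {
>             return a + b;
>           }
>         }).print();
>       }
>     }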
>
> The goal is to make computation on highly skewed graphs scale. If we map
> one of the first nodes in the drawing to a vertex with a high in-degree,
> it will slow down the computation with the first reduce approach. But I am
> sure we could find many other use cases.
>
> Now, in order to write the treeReduce operator, I did some investigating.
> It does not suffice to make reduce's run() method operate on levels; you
> also need to ensure that the partial reduces in the levels are executed on
> different machines. This is where the tricky and fun part begins. How do
> you know which reduce is executed on which machine? In which class is this
> described?
>
> I would hate to do duplicate work, and since this is the first time I have
> had a look at Flink's internals, I could also use some guidance.
>
>
>
>
> On Mon, Apr 27, 2015 at 10:36 AM, Vasiliki Kalavri <
> vasilikikala...@gmail.com> wrote:
>
> > Hi,
> >
> > Andra is working on a modified reduce operator, which would internally
> > create an aggregation tree.
> > This is work related to her thesis and we want to use it in graph
> > computations for skewed inputs.
> > It might or might not be a good idea to add it as a Flink operator and we
> > will need to evaluate that (as part of the thesis), so we don't have a
> > JIRA for this :-)
> >
> > -Vasia.
> >
> > On 27 April 2015 at 10:20, Fabian Hueske <fhue...@gmail.com> wrote:
> >
> > > Hi Andra,
> > >
> > > is there a JIRA for the new runtime operator?
> > >
> > > Adding a new operator is a lot of work and touches many core parts of
> > > the system.
> > > It would be good to start a discussion about that early in the process
> > > to make sure that the design is aligned with the system.
> > > Otherwise, duplicated work might be necessary before it can be added
> > > to the system.
> > >
> > > Cheers,
> > > Fabian
> > >
> > > 2015-04-26 13:05 GMT+02:00 Andra Lungu <lungu.an...@gmail.com>:
> > >
> > > > Yes Markus,
> > > >
> > > > ds.reduce() -> AllReduceDriver
> > > > ds.groupBy().reduce() -> ReduceDriver
> > > >
> > > > It's very intuitive ;)
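> > > >
> > > > For reference, a minimal toy example I used to hit both paths (my own
> > > > sketch; the mapping in the comments is what I observed):
> > > >
> > > >     import org.apache.flink.api.common.functions.ReduceFunction;
> > > >     import org.apache.flink.api.java.DataSet;
> > > >     import org.apache.flink.api.java.ExecutionEnvironment;
> > > >     import org.apache.flink.api.java.tuple.Tuple2;
> > > >
> > > >     public class ReduceDriverPaths {
> > > >       public static void main(String[] args) throws Exception {
> > > >         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> > > >
> > > >         DataSet<Tuple2<Long, Long>> ds = env.fromElements(
> > > >             new Tuple2<>(1L, 10L), new Tuple2<>(1L, 20L), new Tuple2<>(2L, 5L));
> > > >
> > > >         ReduceFunction<Tuple2<Long, Long>> sum = new ReduceFunction<Tuple2<Long, Long>>() {
> > > >           @Override
> > > >           public Tuple2<Long, Long> reduce(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
> > > >             return new Tuple2<>(a.f0, a.f1 + b.f1);
> > > >           }
> > > >         };
> > > >
> > > >         // no grouping key: the whole DataSet is reduced to one value (AllReduceDriver)
> > > >         ds.reduce(sum).print();
> > > >
> > > >         // grouped reduce: one value per key (ReduceDriver / its combine variant)
> > > >         ds.groupBy(0).reduce(sum).print();
> > > >       }
> > > >     }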
> > > >
> > > > On Sun, Apr 26, 2015 at 12:34 PM, Markus Holzemer <
> > > > holzemer.mar...@googlemail.com> wrote:
> > > >
> > > > > Hey Andra,
> > > > > perhaps you are looking at the wrong ReduceDriver?
> > > > > As you can see in the DriverStrategy enum, there are several
> > > > > different ReduceDrivers, depending on the strategy the optimizer
> > > > > chooses.
> > > > >
> > > > > best,
> > > > > Markus
> > > > >
> > > > > 2015-04-26 12:26 GMT+02:00 Andra Lungu <lungu.an...@gmail.com>:
> > > > >
> > > > > > Hey guys,
> > > > > >
> > > > > > I am trying to add a new runtime operator.
> > > > > > To this end, I am following the guide here:
> > > > > >
> > > > > > http://ci.apache.org/projects/flink/flink-docs-master/internals/add_operator.html
> > > > > > and the code itself.
> > > > > >
> > > > > >
> > > > > > From what I understood, the run() method in ReduceDriver, for
> > > > > > instance, should be called every time a reduce() is called.
> > > > > > However, I added a breakpoint on the first if in ReduceDriver's
> > > > > > run() method and called reduce() on a DataSet. When debugging, it
> > > > > > seems that the method is not called; I also tried adding a
> > > > > > log.info() there. That doesn't get printed either... Obviously,
> > > > > > the same goes for System.out.println.
> > > > > >
> > > > > > Could someone explain the workflow a bit better? When exactly
> > > > > > does run() get called, and what is ReduceDriver's role?
> > > > > >
> > > > > > Thanks!
> > > > > > Andra
> > > > > >
> > > > >
> > > >
> > >
> >
>
