Some form of tree aggregation is useful in many cases, and IMO a good addition to the system.
Kostas

On Mon, Apr 27, 2015 at 11:04 AM, Andra Lungu <lungu.an...@gmail.com> wrote:

> Hi Fabian,
>
> After a quick look at the current behaviour of Flink's combinable reduce, I
> saw that it does something like this:
>
> https://docs.google.com/drawings/d/1WfGJq1ZNQ-F0EQZ2TwEYS_861xc3fSdfJL9Z4VdXBQU/edit?usp=sharing
>
> It basically iterates over the key groups two by two and, if it finds two
> keys within the same key group, reduces them.
>
> What we would like to do (as Vasia said) as part of my thesis is to speed up
> the regular reduce by turning it into a treeReduce() where appropriate [in
> GSA, for example, we do a reduce; that could easily be hot-swapped with
> treeReduce() when skewed vertices are detected]. This new operator will take
> the number of levels as an additional parameter (val1, val2, numLevels) and
> will aggregate in levels:
>
> https://docs.google.com/drawings/d/1X_yJBdZykB9oBTbACUy9Bdd7oG5eDPDicNKhPcZ71ik/edit?usp=sharing
>
> The goal is to make computation for highly skewed graphs scale. If we map one
> of the first nodes in the drawing to a vertex with a high in-degree, it will
> slow down computation with the first reduce approach. But I am sure we could
> find many other use cases.
>
> Now, in order to write the treeReduce operator, I did some investigating. It
> does not suffice to make reduce's run() method operate on levels; you also
> need to ensure that the partial reduces in the levels are executed on
> different machines. This is where the tricky and fun part begins. How do you
> know which reduce is executed on which machine? In which class is this
> described?
>
> I sure would hate to do duplicate work, and since this is the first time I
> have had a look at Flink's internals, I could also use some guidance.
>
> On Mon, Apr 27, 2015 at 10:36 AM, Vasiliki Kalavri <
> vasilikikala...@gmail.com> wrote:
>
> > Hi,
> >
> > Andra is working on a modified reduce operator, which would internally
> > create an aggregation tree.
> > This is work related to her thesis and we want to use it in graph
> > computations for skewed inputs.
> > It might or might not be a good idea to add it as a Flink operator and we
> > will need to evaluate that (as part of the thesis), so we don't have a JIRA
> > for this :-)
> >
> > -Vasia.
> >
> > On 27 April 2015 at 10:20, Fabian Hueske <fhue...@gmail.com> wrote:
> >
> > > Hi Andra,
> > >
> > > Is there a JIRA for the new runtime operator?
> > >
> > > Adding a new operator is a lot of work and touches many core parts of
> > > the system.
> > > It would be good to start a discussion about that early in the process
> > > to make sure that the design is aligned with the system.
> > > Otherwise, duplicated work might be necessary before it can be added to
> > > the system.
> > >
> > > Cheers,
> > > Fabian
> > >
> > > 2015-04-26 13:05 GMT+02:00 Andra Lungu <lungu.an...@gmail.com>:
> > >
> > > > Yes Markus,
> > > >
> > > > ds.reduce() -> AllReduceDriver
> > > > ds.groupBy().reduce() -> ReduceDriver
> > > >
> > > > It's very intuitive ;)
> > > >
> > > > On Sun, Apr 26, 2015 at 12:34 PM, Markus Holzemer <
> > > > holzemer.mar...@googlemail.com> wrote:
> > > >
> > > > > Hey Andra,
> > > > > Perhaps you are looking at the wrong ReduceDriver?
> > > > > As you can see in the DriverStrategy enum, there are several
> > > > > different ReduceDrivers, depending on the strategy the optimizer
> > > > > chooses.
> > > > >
> > > > > Best,
> > > > > Markus
> > > > >
> > > > > 2015-04-26 12:26 GMT+02:00 Andra Lungu <lungu.an...@gmail.com>:
> > > > >
> > > > > > Hey guys,
> > > > > >
> > > > > > I am trying to add a new runtime operator.
> > > > > > To this end, I am following the guide here:
> > > > > > http://ci.apache.org/projects/flink/flink-docs-master/internals/add_operator.html
> > > > > > and the code itself.
> > > > > >
> > > > > > From what I understood, the run() method in ReduceDriver, for
> > > > > > instance, should be called every time reduce() is called. However,
> > > > > > I added a breakpoint in ReduceDriver's run() method on the first if
> > > > > > and called reduce() on a DataSet. When debugging, it seems that the
> > > > > > method is not called; I also tried adding a log.info() there. That
> > > > > > doesn't get printed either... Obviously, the same goes for
> > > > > > System.out.println.
> > > > > >
> > > > > > Could someone explain the workflow a bit better? When exactly does
> > > > > > run() get called, and what is ReduceDriver's role?
> > > > > >
> > > > > > Thanks!
> > > > > > Andra
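
To make the driver mapping Andra gives above (ds.reduce() -> AllReduceDriver,
ds.groupBy().reduce() -> ReduceDriver) concrete, here is a minimal probe job.
It assumes the 0.9-era DataSet API; the class name, the sample data, and the
sum function are only placeholders for illustration:

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class ReduceDriverProbe {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<Integer, Long>> data = env.fromElements(
                new Tuple2<>(1, 10L), new Tuple2<>(1, 20L), new Tuple2<>(2, 30L));

        ReduceFunction<Tuple2<Integer, Long>> sum =
                new ReduceFunction<Tuple2<Integer, Long>>() {
                    @Override
                    public Tuple2<Integer, Long> reduce(Tuple2<Integer, Long> a,
                                                        Tuple2<Integer, Long> b) {
                        return new Tuple2<>(a.f0, a.f1 + b.f1);
                    }
                };

        // Ungrouped reduce: per the mapping above, this goes through
        // AllReduceDriver, so a breakpoint in ReduceDriver.run() is never hit.
        data.reduce(sum).print();

        // Grouped reduce: per the mapping above, this is the call that is
        // executed by ReduceDriver, which is where the breakpoint belongs.
        data.groupBy(0).reduce(sum).print();
    }
}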
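
The tree idea itself can also be approximated with the existing operators,
which may help when experimenting before a dedicated runtime operator exists.
Below is a rough two-level sketch for a global sum; the bucket count, names,
and the sum function are illustrative, and this is emphatically not the
proposed treeReduce operator, just an emulation built from map, groupBy, and
reduce:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class TwoLevelSumSketch {

    public static void main(String[] args) throws Exception {
        final int numBuckets = 4;  // illustrative fan-in of the first level

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Long> values = env.generateSequence(1, 1000);

        // Level 1: scatter the values over artificial buckets and
        // pre-aggregate every bucket independently.
        DataSet<Tuple2<Integer, Long>> partialSums = values
                .map(new MapFunction<Long, Tuple2<Integer, Long>>() {
                    @Override
                    public Tuple2<Integer, Long> map(Long value) {
                        return new Tuple2<>((int) (value % numBuckets), value);
                    }
                })
                .groupBy(0)
                .reduce(new ReduceFunction<Tuple2<Integer, Long>>() {
                    @Override
                    public Tuple2<Integer, Long> reduce(Tuple2<Integer, Long> a,
                                                        Tuple2<Integer, Long> b) {
                        return new Tuple2<>(a.f0, a.f1 + b.f1);
                    }
                });

        // Level 2: combine the per-bucket partial sums into the final result.
        partialSums
                .reduce(new ReduceFunction<Tuple2<Integer, Long>>() {
                    @Override
                    public Tuple2<Integer, Long> reduce(Tuple2<Integer, Long> a,
                                                        Tuple2<Integer, Long> b) {
                        return new Tuple2<>(0, a.f1 + b.f1);
                    }
                })
                .print();
    }
}

Each bucket is pre-aggregated by the first, grouped reduce (level 1), and the
partial results are then combined by the second, global reduce (level 2). A
real treeReduce would additionally have to ensure that the partial reduces of
each level run on different machines, which is exactly the part Andra is
asking about.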