Hi Fabian,

After a quick look at the current behaviour of Flink's combinable reduce, I saw that it does something like this:
https://docs.google.com/drawings/d/1WfGJq1ZNQ-F0EQZ2TwEYS_861xc3fSdfJL9Z4VdXBQU/edit?usp=sharing

It basically walks through the key groups pairwise: whenever it finds two values within the same key group, it reduces them.

What we would like to do (as Vasia said), as part of my thesis, is to speed up the regular reduce by turning it into a treeReduce() where appropriate. In GSA, for example, we do a reduce; that could easily be hot-swapped with treeReduce() when skewed vertices are detected. This new operator will get the number of levels as an additional parameter (val1, val2, numLevels) and will aggregate in levels:
https://docs.google.com/drawings/d/1X_yJBdZykB9oBTbACUy9Bdd7oG5eDPDicNKhPcZ71ik/edit?usp=sharing

The goal is to make computation for highly skewed graphs scale. If we map one of the first nodes in the drawing to a vertex with high in-degree, it will slow down computation with the first reduce approach. But I am sure we could find many other use cases.

Now, in order to write the treeReduce operator, I did some investigating. It does not suffice to make reduce's run() method operate on levels; you also need to ensure that the partial reduces in the levels are executed on different machines. This is where the tricky and fun part begins. How do you know which reduce is executed on which machine? In which class is this described?

I sure would hate to do duplicate work, and since this is the first time I have looked at Flink's internals, I could also use some guidance.

On Mon, Apr 27, 2015 at 10:36 AM, Vasiliki Kalavri <vasilikikala...@gmail.com> wrote:

> Hi,
>
> Andra is working on a modified reduce operator, which would internally
> create an aggregation tree.
> This is work related to her thesis and we want to use it in graph
> computations for skewed inputs.
> It might or might not be a good idea to add it as a Flink operator and we
> will need to evaluate that (as part of the thesis), so we don't have a JIRA
> for this :-)
>
> -Vasia.
>
> On 27 April 2015 at 10:20, Fabian Hueske <fhue...@gmail.com> wrote:
>
> > Hi Andra,
> >
> > is there a JIRA for the new runtime operator?
> >
> > Adding a new operator is a lot of work and touches many core parts of the
> > system.
> > It would be good to start a discussion about that early in the process to
> > make sure that the design is aligned with the system.
> > Otherwise, duplicated work might be necessary before it can be added to the
> > system.
> >
> > Cheers,
> > Fabian
> >
> > 2015-04-26 13:05 GMT+02:00 Andra Lungu <lungu.an...@gmail.com>:
> >
> > > Yes Markus,
> > >
> > > ds.reduce() -> AllReduceDriver
> > > ds.groupBy().reduce() -> ReduceDriver
> > >
> > > It's very intuitive ;)
> > >
> > > On Sun, Apr 26, 2015 at 12:34 PM, Markus Holzemer <holzemer.mar...@googlemail.com> wrote:
> > >
> > > > Hey Andrea,
> > > > perhaps you are looking at the wrong ReduceDriver?
> > > > As you can see in the DriverStrategy enum there is several different
> > > > ReduceDrivers depending on the strategy the optimizer chooses.
> > > >
> > > > best,
> > > > Markus
> > > >
> > > > 2015-04-26 12:26 GMT+02:00 Andra Lungu <lungu.an...@gmail.com>:
> > > >
> > > > > Hey guys,
> > > > >
> > > > > I am trying to add a new runtime operator;
> > > > > To this end, I am following the guide here:
> > > > >
> > > > > http://ci.apache.org/projects/flink/flink-docs-master/internals/add_operator.html
> > > > > and the code itself.
> > > > >
> > > > > From what I understood, the run() in ReduceDriver, for instance, should be
> > > > > called every time a reduce() is called. However, I added a breakpoint in
> > > > > ReduceDriver's run method on the first if and called reduce() on a DataSet.
> > > > > When debugging, it seems that the method is not called; I also tried adding
> > > > > a log.info() there. That doesn't get printed either... Obviously, the same
> > > > > goes for System.out.println.
> > > > >
> > > > > Could someone explain the workflow a bit better? When exactly does run()
> > > > > get called and what is ReduceDriver's role?
> > > > >
> > > > > Thanks!
> > > > > Andra
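
For illustration only: a minimal sketch of the level-wise aggregation proposed at the top of this message, written in plain Java with no Flink dependencies. The class name TreeReduceSketch, both method signatures, and the rule used to pick the fan-in per level are assumptions made for this example; none of them come from Flink. Each chunk in a level stands in for a partial reduce that would ideally run on a different machine, which is exactly the placement question the message leaves open. The sketch also assumes the reduce function is associative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical illustration; not Flink code and not the actual operator design.
class TreeReduceSketch {

    /** Flat reduce: folds all values of one key group sequentially, as a single task would. */
    static <T> T flatReduce(List<T> values, BinaryOperator<T> fn) {
        T acc = values.get(0);
        for (int i = 1; i < values.size(); i++) {
            acc = fn.apply(acc, values.get(i));
        }
        return acc;
    }

    /**
     * Tree reduce: aggregates in at most numLevels rounds. In each round the
     * current values are split into consecutive chunks, and each chunk is
     * reduced on its own (a stand-in for a partial reduce on another machine).
     * Requires an associative fn so that regrouping does not change the result.
     */
    static <T> T treeReduce(List<T> values, BinaryOperator<T> fn, int numLevels) {
        if (values.isEmpty() || numLevels < 1) {
            throw new IllegalArgumentException("need at least one value and one level");
        }
        List<T> current = values;
        for (int level = numLevels; level > 1 && current.size() > 1; level--) {
            // Assumed heuristic: pick a fan-in so the remaining levels can finish the job.
            int fanIn = Math.max(2, (int) Math.ceil(Math.pow(current.size(), 1.0 / level)));
            List<T> next = new ArrayList<>();
            for (int start = 0; start < current.size(); start += fanIn) {
                int end = Math.min(start + fanIn, current.size());
                next.add(flatReduce(current.subList(start, end), fn));
            }
            current = next;
        }
        // Last level: combine whatever partial results remain.
        return flatReduce(current, fn);
    }

    public static void main(String[] args) {
        List<Integer> values = new ArrayList<>();
        for (int i = 1; i <= 100; i++) {
            values.add(i);
        }
        System.out.println(flatReduce(values, Integer::sum));    // 5050
        System.out.println(treeReduce(values, Integer::sum, 3)); // 5050
    }
}
```

With numLevels set to 1 the sketch degenerates to the flat reduce, so the two variants can be compared on the same input. On a single machine the tree version does the same total work; any speed-up for a skewed key would come only from running the chunks of one level in parallel.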