Setting up the partitioning, etc., is done automatically by the optimizer. What is missing is a hash-based aggregation operator that the optimizer can choose as an alternative strategy to sort-based aggregation.
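To make the idea concrete, here is a minimal, hedged sketch of what a hash-based combiner could look like. It keeps partial aggregates in a plain java.util.HashMap on the JVM heap purely for illustration; a real operator would keep the table on Flink's managed memory (compare MutableHashTable, mentioned below) and spill or emit partials under memory pressure. The class name HashSumCombiner is made up; GroupCombineFunction, Tuple2, and Collector are existing Flink APIs.

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Hypothetical hash-based combiner: keeps one running aggregate per distinct
// key instead of sorting the input first. Sums Integer values per String key.
public class HashSumCombiner
        implements GroupCombineFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {

    @Override
    public void combine(Iterable<Tuple2<String, Integer>> records,
                        Collector<Tuple2<String, Integer>> out) {
        // Heap-backed table for illustration only; the real operator would
        // use a managed-memory hash table so it can react to memory pressure.
        Map<String, Integer> partials = new HashMap<String, Integer>();
        for (Tuple2<String, Integer> r : records) {
            Integer agg = partials.get(r.f0);
            partials.put(r.f0, agg == null ? r.f1 : agg + r.f1);
        }
        // Emit the partial aggregates; the final reduce merges them per key.
        for (Map.Entry<String, Integer> e : partials.entrySet()) {
            out.collect(new Tuple2<String, Integer>(e.getKey(), e.getValue()));
        }
    }
}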
A good first step would be to have a look at how the hash join works, in order to get an idea of how to implement an algorithm on Flink's managed memory. Have a look at the MutableHashTable class in the "flink-runtime" project.

Also, a hash-combiner is probably even more interesting than a hash-reducer. Every aggregation that fits a running hash-aggregate will need a hash-combiner. And it is possible to hash-combine and sort-reduce afterwards, if ordered output is desired; a rough sketch of that pipeline shape follows below the quoted thread.

On Wed, Jun 17, 2015 at 3:26 PM, Alexander Alexandrov <
alexander.s.alexand...@gmail.com> wrote:

> I added a comment with suggestions on how to proceed in the JIRA issue.
>
> 2015-06-17 22:41 GMT+02:00 <rafi_33...@mailbox.tu-berlin.de>:
>
> > Hello dear Developer,
> >
> > Currently, aggregation functions are implemented based on sorting. We
> > would like to add hash-based aggregation to Flink. We would be thankful
> > if you could tell us how to get started (shall we add it as an operator,
> > in the runtime, or ...). Our current thinking is to first hash-partition
> > the data based on the grouping attribute and then apply the aggregation
> > function to the data that are in the same bucket. Now we need to know
> > the workflow of aggregation in Flink.
> >
> > With Respect,
> > Rafiullah Momand
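The promised sketch of the "hash-combine, then sort-reduce" pipeline shape, reusing the hypothetical HashSumCombiner from above. Note the hedge: today Flink picks its own (sort-based) combine strategy for grouped aggregations, so this only shows where a hash-based combine would slot in, not that combineGroup currently selects one.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class HashCombineSortReduceSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<String, Integer>> input = env.fromElements(
                new Tuple2<String, Integer>("a", 1),
                new Tuple2<String, Integer>("b", 2),
                new Tuple2<String, Integer>("a", 3));

        DataSet<Tuple2<String, Integer>> result = input
                // Phase 1: local pre-aggregation before the shuffle; this is
                // where a hash-based combine would collapse duplicate keys.
                .combineGroup(new HashSumCombiner())
                // Phase 2: full aggregation after partitioning by key; a
                // sort-based reduce strategy here also gives key-ordered
                // output within each partition.
                .groupBy(0)
                .sum(1);

        result.print();
    }
}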