Setting up the partitioning etc. is done automatically by the optimizer.
What is missing is a hash-based aggregation operator that the optimizer can
choose as an alternative strategy to sort-based aggregation.
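The core idea behind such an operator can be sketched as follows. This is a
minimal illustration, not Flink code (Flink would keep the table on managed
memory and work on serialized records); the class and method names here are
made up for the example:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of hash-based aggregation: instead of sorting records by key first,
// keep one running aggregate per key in a hash table and update it in place
// as records stream in. A single pass, no sort required.
public class HashAggregateSketch {

    // Sum the values per grouping key.
    public static Map<String, Long> sumByKey(Iterable<Map.Entry<String, Long>> records) {
        Map<String, Long> aggregates = new HashMap<>();
        for (Map.Entry<String, Long> record : records) {
            // merge() inserts the value for a new key, or combines it with
            // the existing running aggregate for a known key.
            aggregates.merge(record.getKey(), record.getValue(), Long::sum);
        }
        return aggregates;
    }
}
```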

A good first step would be to look at how the hash join works, to get an
idea of how to implement an algorithm on Flink's managed memory. See the
MutableHashTable class in the "flink-runtime" project.

Also, a hash-combiner is probably even more interesting than a
hash-reducer. Every aggregation that fits a running hash-aggregate will
need a hash-combiner. And it is possible to hash-combine and sort-reduce
afterwards, if ordered output is desired.
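The combine-then-reduce pipeline can be sketched like this. Again a toy
illustration with made-up names, not Flink's actual operator interfaces: each
partition is pre-aggregated in a hash table (the combiner), and the partial
results are then merged in key order (the sort-reduce), giving ordered output:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch: hash-combine per partition, then sort-reduce the partials.
public class CombineThenReduceSketch {

    // Combiner: per-partition pre-aggregation in a hash table. Shrinks the
    // data before it is shipped to the reducer.
    static Map<String, Long> hashCombine(List<Map.Entry<String, Long>> partition) {
        Map<String, Long> partial = new HashMap<>();
        for (Map.Entry<String, Long> record : partition) {
            partial.merge(record.getKey(), record.getValue(), Long::sum);
        }
        return partial;
    }

    // Reducer: merge all partials in key order (TreeMap keeps keys sorted),
    // so the final output is ordered by grouping key.
    static List<Map.Entry<String, Long>> sortReduce(List<Map<String, Long>> partials) {
        Map<String, Long> merged = new TreeMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((k, v) -> merged.merge(k, v, Long::sum));
        }
        return new ArrayList<>(merged.entrySet());
    }
}
```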


On Wed, Jun 17, 2015 at 3:26 PM, Alexander Alexandrov <
alexander.s.alexand...@gmail.com> wrote:

> I added a comment with suggestions on how to proceed in the JIRA issue.
>
> 2015-06-17 22:41 GMT+02:00 <rafi_33...@mailbox.tu-berlin.de>:
>
> >
> > Hello dear Developer,
> > Currently, aggregation functions are implemented based on sorting. We
> > would like to add hash-based aggregation to Flink. We would be thankful
> > if you could tell us how to get started (shall we add it as an operator,
> > in the runtime, or ...). Our current thinking is to first hash-partition
> > the data on the grouping attribute and then apply the aggregation
> > function to the data in the same bucket. Now we need to understand the
> > workflow of aggregation in Flink.
> >
> > With Respect,
> > Rafiullah Momand
> >
> >
>
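The hash-partitioning step proposed in the question above (which, per the
reply, Flink's optimizer sets up automatically) can be sketched in one line.
The class and method names are invented for the example:

```java
// Sketch of hash partitioning: records with the same grouping key always
// land in the same partition, so each partition can be aggregated
// independently and in parallel.
public class HashPartitionSketch {

    static int partitionFor(Object groupingKey, int numPartitions) {
        // Mask the sign bit so the partition index is non-negative even
        // for keys with a negative hashCode().
        return (groupingKey.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
```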
