Hi list,

I've been trying different ways to implement a moving average (count-based,
not time-based).

The blunt instrument approach is to create a custom FlatMapFunction that keeps 
track of the last N values.
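
As an illustration of the "last N values" bookkeeping such a function would
need, here is a minimal plain-Java sketch. It deliberately has no Flink
dependency; the class name LastNAverage and its add() method are hypothetical,
and in a real FlatMapFunction this state would presumably have to be kept per
key (for example in keyed state), which the sketch does not attempt.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical helper: count-based moving average over the last N values.
final class LastNAverage {
    private final int windowSize;
    private final Deque<Double> values = new ArrayDeque<>();
    private double sum = 0.0;

    LastNAverage(int windowSize) {
        if (windowSize <= 0) {
            throw new IllegalArgumentException("windowSize must be positive");
        }
        this.windowSize = windowSize;
    }

    // Adds a value, evicts the oldest one once more than N are held,
    // and returns the average of the values currently retained.
    double add(double value) {
        values.addLast(value);
        sum += value;
        if (values.size() > windowSize) {
            sum -= values.removeFirst();
        }
        return sum / values.size();
    }
}
```

Until N values have arrived, the average is taken over however many values
have been seen so far; whether that is the desired warm-up behavior is a
design choice.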

It seemed like using an AggregateFunction would be most consistent with the 
Flink API, along the lines of...

            .keyBy(new MyKeySelector())
            .window(GlobalWindows.create())
            .trigger(CountTrigger.of(1))
            .aggregate(new MovingAverageAggregator(10))

This works, but the API for the AggregateFunction (MovingAverageAggregator) 
feels a bit odd.

Specifically, I want to emit a <key, moving average> result from getResult(), 
but the key isn’t passed to the createAccumulator() method, nor is it passed to 
the getResult() method. So in the add() method I check if the accumulator I’ve 
created has a key set, and if not then I extract the key from the record and 
set it on the accumulator, so I can use it in the getResult() call.
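
The workaround described above could look roughly like the following sketch.
To keep it compilable without Flink on the classpath, it declares a local
Aggregator interface that only mirrors the four-method shape of Flink's
AggregateFunction<IN, ACC, OUT>; it is a stand-in, not the real interface.
Reading, KeyedAverage and MovingAverageAccumulator are hypothetical types
invented for the example, and merge() is left unimplemented on the assumption
that it is not called for a non-merging window such as GlobalWindows.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Local stand-in with the same method shape as Flink's AggregateFunction.
interface Aggregator<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// Hypothetical input record: a key plus a numeric value.
final class Reading {
    final String key;
    final double value;
    Reading(String key, double value) { this.key = key; this.value = value; }
}

// Hypothetical result: <key, moving average>.
final class KeyedAverage {
    final String key;
    final double average;
    KeyedAverage(String key, double average) { this.key = key; this.average = average; }
}

// Accumulator that also carries the key, since neither createAccumulator()
// nor getResult() receives it.
final class MovingAverageAccumulator {
    String key; // stays null until the first record is added
    final Deque<Double> values = new ArrayDeque<>();
    double sum = 0.0;
}

final class MovingAverageAggregator
        implements Aggregator<Reading, MovingAverageAccumulator, KeyedAverage> {
    private final int windowSize;

    MovingAverageAggregator(int windowSize) { this.windowSize = windowSize; }

    @Override
    public MovingAverageAccumulator createAccumulator() {
        return new MovingAverageAccumulator(); // no key available here
    }

    @Override
    public MovingAverageAccumulator add(Reading value, MovingAverageAccumulator acc) {
        if (acc.key == null) {
            acc.key = value.key; // capture the key from the first record
        }
        acc.values.addLast(value.value);
        acc.sum += value.value;
        if (acc.values.size() > windowSize) {
            acc.sum -= acc.values.removeFirst(); // drop the oldest value
        }
        return acc;
    }

    @Override
    public KeyedAverage getResult(MovingAverageAccumulator acc) {
        double average = acc.values.isEmpty() ? 0.0 : acc.sum / acc.values.size();
        return new KeyedAverage(acc.key, average);
    }

    @Override
    public MovingAverageAccumulator merge(MovingAverageAccumulator a, MovingAverageAccumulator b) {
        // Assumption: never invoked for this use case, so left unimplemented.
        throw new UnsupportedOperationException("merge is not needed for this sketch");
    }
}
```

The null check in add() is the part that feels like a workaround: the
accumulator learns its key only as a side effect of seeing the first record.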

Is this expected, or am I misusing the functionality?

Thanks,

— Ken

--------------------------
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra
