Hi Ken, Thanks for the bug report!
Fabian 2018-05-05 0:46 GMT+02:00 Ken Krugler <kkrugler_li...@transpac.com>: > Hi Fabian & Stefan, > > Thanks, and yes that does work more like what I’d expect. > > Regards, > > — Ken > > PS - Just FYI the Java code examples in the documentation referenced below > have a number of bugs, see FLINK-9299 > <https://issues.apache.org/jira/browse/FLINK-9299>. > > > On May 4, 2018, at 7:35 AM, Fabian Hueske <fhue...@gmail.com> wrote: > > Hi Ken, > > You can also use an additional ProcessWindowFunction [1] that is applied > on the result of the AggregateFunction to set the key. > Since the PWF is only applied on the final result, there no overhead > (actually, it might even be slightly cheaper because the AggregateFunction > can be simpler). > > If you don't want to use a PWF, your approach is the right one. > > Best, Fabian > > [1] https://ci.apache.org/projects/flink/flink-docs- > release-1.4/dev/stream/operators/windows.html#processwindowfunction-with- > incremental-aggregation > > 2018-05-03 19:53 GMT+02:00 Ken Krugler <kkrugler_li...@transpac.com>: > >> Hi list, >> >> I was trying different ways to implement a moving average (count based, >> not time based). >> >> The blunt instrument approach is to create a custom FlatMapFunction that >> keeps track of the last N values. >> >> It seemed like using an AggregateFunction would be most consistent with >> the Flink API, along the lines of... >> >> .keyBy(new MyKeySelector()) >> .window(GlobalWindows.create()) >> .trigger(CountTrigger.of(1)) >> .aggregate(new MovingAverageAggregator(10)) >> >> This works, but the API for the AggregateFunction >> (MovingAverageAggregator) feels a bit odd. >> >> Specifically, I want to emit a <key, moving average> result from >> getResult(), but the key isn’t passed to the createAccumulator() method, >> nor is it passed to the getResult() method. So in the add() method I check >> if the accumulator I’ve created has a key set, and if not then I extract >> the key from the record and set it on the accumulator, so I can use it in >> the getResult() call. >> >> Is this expected, or am I miss-using the functionality? >> >> Thanks, >> >> — Ken >> > > -------------------------------------------- > http://about.me/kkrugler > +1 530-210-6378 > >