Hi Ken,

Thanks for the bug report!

Fabian

2018-05-05 0:46 GMT+02:00 Ken Krugler <kkrugler_li...@transpac.com>:

> Hi Fabian & Stefan,
>
> Thanks, and yes that does work more like what I’d expect.
>
> Regards,
>
> — Ken
>
> PS - Just FYI the Java code examples in the documentation referenced below
> have a number of bugs, see FLINK-9299
> <https://issues.apache.org/jira/browse/FLINK-9299>.
>
>
> On May 4, 2018, at 7:35 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi Ken,
>
> You can also use an additional ProcessWindowFunction [1] that is applied
> on the result of the AggregateFunction to set the key.
> Since the PWF is only applied on the final result, there no overhead
> (actually, it might even be slightly cheaper because the AggregateFunction
> can be simpler).
>
> If you don't want to use a PWF, your approach is the right one.
>
> Best, Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-
> release-1.4/dev/stream/operators/windows.html#processwindowfunction-with-
> incremental-aggregation
>
> 2018-05-03 19:53 GMT+02:00 Ken Krugler <kkrugler_li...@transpac.com>:
>
>> Hi list,
>>
>> I was trying different ways to implement a moving average (count based,
>> not time based).
>>
>> The blunt instrument approach is to create a custom FlatMapFunction that
>> keeps track of the last N values.
>>
>> It seemed like using an AggregateFunction would be most consistent with
>> the Flink API, along the lines of...
>>
>>             .keyBy(new MyKeySelector())
>>             .window(GlobalWindows.create())
>>             .trigger(CountTrigger.of(1))
>>             .aggregate(new MovingAverageAggregator(10))
>>
>> This works, but the API for the AggregateFunction
>> (MovingAverageAggregator) feels a bit odd.
>>
>> Specifically, I want to emit a <key, moving average> result from
>> getResult(), but the key isn’t passed to the createAccumulator() method,
>> nor is it passed to the getResult() method. So in the add() method I check
>> if the accumulator I’ve created has a key set, and if not then I extract
>> the key from the record and set it on the accumulator, so I can use it in
>> the getResult() call.
>>
>> Is this expected, or am I miss-using the functionality?
>>
>> Thanks,
>>
>> — Ken
>>
>
> --------------------------------------------
> http://about.me/kkrugler
> +1 530-210-6378
>
>

Reply via email to