Hi Fabian,

Maybe I should clarify a bit: I'm actually using a (Long)Counter registered
as an Accumulator in the RuntimeContext [1]. So I'm using a
KeyedProcessFunction, not an AggregateFunction. This works properly, but the
value is not retained after a job restart. I'm not entirely sure if I did this
correctly.

Thx,
Wouter


[1].
https://github.com/codefeedr/ghtorrent_mirror/blob/01e5cde837342993c7d287c60e40762e98f8d010/src/main/scala/org/codefeedr/experimental/stats/CommitsStatsProcess.scala#L34



On Thu, 2 May 2019 at 09:36, Fabian Hueske <[email protected]> wrote:

> Hi Wouter,
>
> The DataStream API accumulators of the AggregateFunction [1] are stored in
> state and should be recovered in case of a failure as well.
> If this does not work, it would be a serious bug.
>
> What's the type of your accumulator?
> Can you maybe share the code?
> How do you apply the AggregateFunction (window, windowAll, ...)?
>
> Thanks,
> Fabian
>
> [1]
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
>
> On Tue, 30 Apr 2019 at 13:19, Wouter Zorgdrager <
> [email protected]> wrote:
>
>> Hi all,
>>
>> In the documentation I read about UDF accumulators [1] "Accumulators are
>> automatically backup-ed by Flink’s checkpointing mechanism and restored in
>> case of a failure to ensure exactly-once semantics." So I assumed this
>> was also the case for accumulators used in the DataStream API, but I noticed
>> that it isn't. Every time my job crashes and restarts, the accumulator
>> is reset. Is there a way to retain this information?
>>
>> Thanks,
>> Wouter
>>
>>
>> [1].
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html
>>
>
