Hi,

Both of you seem to have the same requirement. This is a good indication that "fault-tolerant metrics" are a missing feature.
It might make sense to think about a built-in mechanism to back metrics with state.
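
Until something like that exists, the workaround Paul describes below can be built with keyed state, since Wouter's function is a KeyedProcessFunction. A rough, untested sketch in Scala (the state/accumulator names and the key/element types are just placeholders): the keyed state is the checkpointed source of truth, while the accumulator is only mirrored for monitoring and starts from zero again after a restart.

import org.apache.flink.api.common.accumulators.LongCounter
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

class CountingProcess[T] extends KeyedProcessFunction[String, T, T] {

  // Checkpointed per-key count: restored after a failure.
  @transient private var countState: ValueState[java.lang.Long] = _
  // Accumulator for the web UI / job result: not restored after a failure.
  @transient private var counter: LongCounter = _

  override def open(parameters: Configuration): Unit = {
    countState = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("element-count", classOf[java.lang.Long]))
    counter = new LongCounter()
    getRuntimeContext.addAccumulator("element-count", counter)
  }

  override def processElement(
      value: T,
      ctx: KeyedProcessFunction[String, T, T]#Context,
      out: Collector[T]): Unit = {
    // State is the source of truth and survives restarts.
    val current = Option(countState.value()).map(_.longValue()).getOrElse(0L)
    countState.update(current + 1)
    // Best-effort mirror for monitoring; reset to zero on restart.
    counter.add(1L)
    out.collect(value)
  }
}

A non-keyed variant that backs the count with operator state, as Paul did, is sketched at the very bottom of this mail.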
Cheers,
Fabian

On Thu, 2 May 2019 at 10:25, Paul Lam <paullin3...@gmail.com> wrote:

> Hi Wouter,
>
> I've met the same issue and finally managed to use operator state to back the accumulators, so they can be restored after restarts.
> The downside is that we have to update the values in both the accumulators and the state to keep them consistent. FYI.
>
> Best,
> Paul Lam
>
> On Thu, 2 May 2019 at 16:17, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Wouter,
>>
>> OK, that explains it :-) Overloaded terms...
>>
>> The Table API / SQL documentation refers to the accumulator of an AggregateFunction [1].
>> The accumulators that are accessible via the RuntimeContext are a rather old part of the API that is mainly intended for batch jobs.
>>
>> I would not use them for streaming applications, as they are not checkpointed and recovered (as you noticed).
>> You should use managed state (keyed or operator) for such use cases.
>>
>> Best,
>> Fabian
>>
>> [1] https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunction.java
>>
>> On Thu, 2 May 2019 at 10:01, Wouter Zorgdrager <w.d.zorgdra...@tudelft.nl> wrote:
>>
>>> Hi Fabian,
>>>
>>> Maybe I should clarify a bit: I'm actually using a (Long)Counter registered as an Accumulator in the RuntimeContext [1], so I'm using a KeyedProcessFunction, not an AggregateFunction. This works properly, but it is not retained after a job restart. I'm not entirely sure if I did this correctly.
>>>
>>> Thx,
>>> Wouter
>>>
>>> [1] https://github.com/codefeedr/ghtorrent_mirror/blob/01e5cde837342993c7d287c60e40762e98f8d010/src/main/scala/org/codefeedr/experimental/stats/CommitsStatsProcess.scala#L34
>>>
>>> On Thu, 2 May 2019 at 09:36, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> Hi Wouter,
>>>>
>>>> The DataStream API accumulators of the AggregateFunction [1] are stored in state and should be recovered in case of a failure as well.
>>>> If this does not work, it would be a serious bug.
>>>>
>>>> What's the type of your accumulator?
>>>> Can you maybe share the code?
>>>> How do you apply the AggregateFunction (window, windowAll, ...)?
>>>>
>>>> Thanks,
>>>> Fabian
>>>>
>>>> [1] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
>>>>
>>>> On Tue, 30 Apr 2019 at 13:19, Wouter Zorgdrager <w.d.zorgdra...@tudelft.nl> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> In the documentation on UDF accumulators [1] I read: "Accumulators are automatically backup-ed by Flink’s checkpointing mechanism and restored in case of a failure to ensure exactly-once semantics." So I assumed this was also the case for accumulators used in the DataStream API, but I noticed that it isn't. So every time my job crashes and restarts, the accumulator is reset. Is there a way to retain this information?
>>>>>
>>>>> Thanks,
>>>>> Wouter
>>>>>
>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html
>>>>>
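
PS: For completeness, the operator-state variant that Paul mentions above could look roughly like this (again an untested sketch with placeholder names). Since initializeState() runs before open(), the accumulator can even be re-seeded with the restored count, so the UI value does not fall back to zero after recovery:

import org.apache.flink.api.common.accumulators.LongCounter
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

class CountingOperatorStateProcess[T] extends ProcessFunction[T, T] with CheckpointedFunction {

  // Local running count, snapshotted into operator state on every checkpoint.
  private var localCount: Long = 0L
  @transient private var checkpointedCount: ListState[java.lang.Long] = _
  @transient private var counter: LongCounter = _

  override def initializeState(context: FunctionInitializationContext): Unit = {
    checkpointedCount = context.getOperatorStateStore.getListState(
      new ListStateDescriptor[java.lang.Long]("element-count", classOf[java.lang.Long]))
    if (context.isRestored) {
      // Sum the partial counts of all restored state entries
      // (there can be several after a rescale).
      val it = checkpointedCount.get().iterator()
      while (it.hasNext) localCount += it.next().longValue()
    }
  }

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    checkpointedCount.clear()
    checkpointedCount.add(localCount)
  }

  override def open(parameters: Configuration): Unit = {
    // initializeState() has already run, so localCount holds the restored value.
    counter = new LongCounter(localCount)
    getRuntimeContext.addAccumulator("element-count", counter)
  }

  override def processElement(
      value: T,
      ctx: ProcessFunction[T, T]#Context,
      out: Collector[T]): Unit = {
    localCount += 1
    counter.add(1L)
    out.collect(value)
  }
}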