+1. Especially if you don't want to rely on an external metric reporter, this is a nice feature.
On Thu, 2 May 2019 at 10:29, Fabian Hueske <[email protected]> wrote:

> Hi,
>
> Both of you seem to have the same requirement.
> This is a good indication that "fault-tolerant metrics" are a missing feature.
> It might make sense to think about a built-in mechanism to back metrics with state.
>
> Cheers,
> Fabian
>
> On Thu, 2 May 2019 at 10:25, Paul Lam <[email protected]> wrote:
>
>> Hi Wouter,
>>
>> I've met the same issue and finally managed to use operator state to back the accumulators, so they can be restored after restarts.
>> The downside is that we have to update the values in both the accumulators and the state to keep them consistent. FYI.
>>
>> Best,
>> Paul Lam
>>
>> On Thu, 2 May 2019 at 16:17, Fabian Hueske <[email protected]> wrote:
>>
>>> Hi Wouter,
>>>
>>> OK, that explains it :-) Overloaded terms...
>>>
>>> The Table API / SQL documentation refers to the accumulator of an AggregateFunction [1].
>>> The accumulators that are accessible via the RuntimeContext are a rather old part of the API that is mainly intended for batch jobs.
>>>
>>> I would not use them for streaming applications, as they are not checkpointed and recovered (as you noticed).
>>> You should use managed state (keyed or operator) for such use cases.
>>>
>>> Best,
>>> Fabian
>>>
>>> [1] https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunction.java
>>>
>>> On Thu, 2 May 2019 at 10:01, Wouter Zorgdrager <[email protected]> wrote:
>>>
>>>> Hi Fabian,
>>>>
>>>> Maybe I should clarify a bit: I'm actually using a (Long)Counter registered as an Accumulator in the RuntimeContext [1]. So I'm using a KeyedProcessFunction, not an AggregateFunction. This works properly, but the value is not retained after a job restart. I'm not entirely sure if I did this correctly.
>>>>
>>>> Thx,
>>>> Wouter
>>>>
>>>> [1] https://github.com/codefeedr/ghtorrent_mirror/blob/01e5cde837342993c7d287c60e40762e98f8d010/src/main/scala/org/codefeedr/experimental/stats/CommitsStatsProcess.scala#L34
>>>>
>>>> On Thu, 2 May 2019 at 09:36, Fabian Hueske <[email protected]> wrote:
>>>>
>>>>> Hi Wouter,
>>>>>
>>>>> The DataStream API accumulators of the AggregateFunction [1] are stored in state and should be recovered in case of a failure as well.
>>>>> If this does not work, it would be a serious bug.
>>>>>
>>>>> What's the type of your accumulator?
>>>>> Can you maybe share the code?
>>>>> How do you apply the AggregateFunction (window, windowAll, ...)?
>>>>>
>>>>> Thanks,
>>>>> Fabian
>>>>>
>>>>> [1] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
>>>>>
>>>>> On Tue, 30 Apr 2019 at 13:19, Wouter Zorgdrager <[email protected]> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> In the documentation I read about UDF accumulators [1]: "Accumulators are automatically backup-ed by Flink’s checkpointing mechanism and restored in case of a failure to ensure exactly-once semantics." So I assumed this was also the case for accumulators used in the DataStream API, but I noticed that it isn't. So every time my job crashes and restarts, the accumulator is reset. Is there a way to retain this information?
>>>>>>
>>>>>> Thanks,
>>>>>> Wouter
>>>>>>
>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html
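For readers landing on this thread: Fabian's recommendation (managed keyed state instead of a RuntimeContext accumulator) could look roughly like the sketch below inside a KeyedProcessFunction. This is a minimal illustration, not the poster's actual code; the class name `RestartSafeCounter`, the state name `"event-count"`, and the String key/element types are assumptions made for the example.

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

// Hypothetical per-key event counter backed by keyed ValueState, so the
// count is checkpointed and restored by Flink after a failure or restart
// (unlike a RuntimeContext accumulator, which is reset on restart).
class RestartSafeCounter extends KeyedProcessFunction[String, String, (String, Long)] {

  // Initialized in open(); Flink snapshots this state with each checkpoint.
  @transient private var countState: ValueState[java.lang.Long] = _

  override def open(parameters: Configuration): Unit = {
    val descriptor = new ValueStateDescriptor[java.lang.Long](
      "event-count", classOf[java.lang.Long])
    countState = getRuntimeContext.getState(descriptor)
  }

  override def processElement(
      value: String,
      ctx: KeyedProcessFunction[String, String, (String, Long)]#Context,
      out: Collector[(String, Long)]): Unit = {
    // ValueState.value() returns null before the first update for this key.
    val current = Option(countState.value()).map(_.longValue).getOrElse(0L)
    val updated = current + 1
    countState.update(updated)
    out.collect((ctx.getCurrentKey, updated))
  }
}
```

Paul Lam's workaround follows the same pattern, except the function also keeps registering an accumulator and updates both the accumulator and the state on every element, so the (non-checkpointed) accumulator stays consistent with the restorable state.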
