Hi,

Both of you seem to have the same requirement.
This is a good indication that "fault-tolerant metrics" are a missing
feature.
It might make sense to think about a built-in mechanism to back metrics
with state.

Cheers,
Fabian



On Thu, 2 May 2019 at 10:25, Paul Lam <paullin3...@gmail.com> wrote:

> Hi Wouter,
>
> I've run into the same issue and finally managed to use operator state to back
> the accumulators, so they can be restored after restarts.
> The downside is that we have to update the values in both the accumulators and
> the state to keep them consistent. FYI.
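>
> To illustrate, here is a rough sketch of the pattern (class and state names
> are made up for the example, not taken from my actual job): the counter value
> lives in operator state and is pushed back into a freshly registered
> accumulator on restore.
>
> import org.apache.flink.api.common.accumulators.LongCounter
> import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
> import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
> import org.apache.flink.streaming.api.functions.ProcessFunction
> import org.apache.flink.util.Collector
>
> // Counter exposed as an accumulator, backed by operator state for recovery.
> class CountingProcess extends ProcessFunction[String, String] with CheckpointedFunction {
>
>   @transient private var counter: LongCounter = _
>   @transient private var counterState: ListState[java.lang.Long] = _
>   private var localCount: Long = 0L
>
>   // Called before open(); restores the count after a failure.
>   override def initializeState(ctx: FunctionInitializationContext): Unit = {
>     counterState = ctx.getOperatorStateStore.getListState(
>       new ListStateDescriptor[java.lang.Long]("count", classOf[java.lang.Long]))
>     if (ctx.isRestored) {
>       val it = counterState.get().iterator()
>       while (it.hasNext) localCount += it.next()
>     }
>   }
>
>   override def open(parameters: Configuration): Unit = {
>     // Re-register the accumulator, seeded with the restored value.
>     counter = new LongCounter(localCount)
>     getRuntimeContext.addAccumulator("count", counter)
>   }
>
>   override def processElement(value: String, ctx: ProcessFunction[String, String]#Context,
>                               out: Collector[String]): Unit = {
>     // Update both the accumulator and the value that backs the state.
>     counter.add(1L)
>     localCount += 1L
>     out.collect(value)
>   }
>
>   override def snapshotState(ctx: FunctionSnapshotContext): Unit = {
>     counterState.clear()
>     counterState.add(localCount)
>   }
> }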
>
> Best,
> Paul Lam
>
> On Thu, 2 May 2019 at 16:17, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Wouter,
>>
>> OK, that explains it :-) Overloaded terms...
>>
>> The Table API / SQL documentation refers to the accumulator of an
>> AggregateFunction [1].
>> The accumulators that are accessible via the RuntimeContext are a rather
>> old part of the API that is mainly intended for batch jobs.
>>
>> I would not use them for streaming applications as they are not
>> checkpointed and recovered (as you noticed).
>> You should use managed state (keyed or operator) for such use cases.
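>>
>> For a KeyedProcessFunction, that could look roughly like the following
>> sketch (names are just illustrative): a per-key counter kept in keyed state,
>> which is checkpointed and restored automatically.
>>
>> import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
>> import org.apache.flink.configuration.Configuration
>> import org.apache.flink.streaming.api.functions.KeyedProcessFunction
>> import org.apache.flink.util.Collector
>>
>> // Per-key counter kept in keyed state; recovered by Flink on restart.
>> class StatefulCounter extends KeyedProcessFunction[String, String, (String, Long)] {
>>
>>   @transient private var count: ValueState[java.lang.Long] = _
>>
>>   override def open(parameters: Configuration): Unit = {
>>     count = getRuntimeContext.getState(
>>       new ValueStateDescriptor[java.lang.Long]("count", classOf[java.lang.Long]))
>>   }
>>
>>   override def processElement(value: String,
>>                               ctx: KeyedProcessFunction[String, String, (String, Long)]#Context,
>>                               out: Collector[(String, Long)]): Unit = {
>>     // state.value() is null for a key that has no state yet.
>>     val current = Option(count.value()).map(_.longValue()).getOrElse(0L) + 1L
>>     count.update(current)
>>     out.collect((ctx.getCurrentKey, current))
>>   }
>> }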
>>
>> Best,
>> Fabian
>>
>> [1]
>> https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunction.java
>>
>> On Thu, 2 May 2019 at 10:01, Wouter Zorgdrager
>> <w.d.zorgdra...@tudelft.nl> wrote:
>>
>>> Hi Fabian,
>>>
>>> Maybe I should clarify a bit: I'm actually using a LongCounter
>>> registered as an accumulator in the RuntimeContext [1]. So I'm using a
>>> KeyedProcessFunction, not an AggregateFunction. This works properly, but the
>>> value is not retained after a job restart. I'm not entirely sure if I did
>>> this correctly.
>>>
>>> Thx,
>>> Wouter
>>>
>>>
>>> [1].
>>> https://github.com/codefeedr/ghtorrent_mirror/blob/01e5cde837342993c7d287c60e40762e98f8d010/src/main/scala/org/codefeedr/experimental/stats/CommitsStatsProcess.scala#L34
>>>
>>>
>>>
>>> On Thu, 2 May 2019 at 09:36, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> Hi Wouter,
>>>>
>>>> The DataStream API accumulators of the AggregateFunction [1] are stored
>>>> in state and should be recovered in case of a failure as well.
>>>> If this does not work, it would be a serious bug.
>>>>
>>>> What's the type of your accumulator?
>>>> Can you maybe share the code?
>>>> How do you apply the AggregateFunction (window, windowAll, ...)?
>>>>
>>>> Thanks,
>>>> Fabian
>>>>
>>>> [1]
>>>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
>>>>
>>>> On Tue, 30 Apr 2019 at 13:19, Wouter Zorgdrager
>>>> <w.d.zorgdra...@tudelft.nl> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> In the documentation on UDF accumulators [1] I read: "Accumulators
>>>>> are automatically backup-ed by Flink’s checkpointing mechanism and restored
>>>>> in case of a failure to ensure exactly-once semantics." So I assumed
>>>>> this was also the case for accumulators used in the DataStream API, but I
>>>>> noticed that it isn't. So every time my job crashes and restarts, the
>>>>> accumulator is reset. Is there a way to retain this information?
>>>>>
>>>>> Thanks,
>>>>> Wouter
>>>>>
>>>>>
>>>>> [1].
>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html
>>>>>
>>>>
