+1, especially if you don't want to rely on external metric reporter this
is a nice feature.

Op do 2 mei 2019 om 10:29 schreef Fabian Hueske <[email protected]>:

> Hi,
>
> Both of you seem to have the same requirement.
> This is a good indication that "fault-tolerant metrics" are a missing
> feature.
> It might make sense to think about a built-in mechanism to back metrics
> with state.
>
> Cheers,
> Fabian
>
>
>
> Am Do., 2. Mai 2019 um 10:25 Uhr schrieb Paul Lam <[email protected]>:
>
>> Hi Wouter,
>>
>> I've met the same issue and finally managed to use operator states to
>> back the accumulators, so they can be restored after restarts.
>> The downside is that we have to update the values in both accumulators
>> and states to make them consistent. FYI.
>>
>> Best,
>> Paul Lam
>>
>> Fabian Hueske <[email protected]> 于2019年5月2日周四 下午4:17写道:
>>
>>> Hi Wouter,
>>>
>>> OK, that explains it :-) Overloaded terms...
>>>
>>> The Table API / SQL documentation refers to the accumulator of an
>>> AggregateFunction [1].
>>> The accumulators that are accessible via the RuntimeContext are a rather
>>> old part of the API that is mainly intended for batch jobs.
>>>
>>> I would not use them for streaming applications as they are not
>>> checkpointed and recovered (as you noticed).
>>> You should use managed state (keyed or operator) for such use cases.
>>>
>>> Best,
>>> Fabian
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunction.java
>>>
>>> Am Do., 2. Mai 2019 um 10:01 Uhr schrieb Wouter Zorgdrager <
>>> [email protected]>:
>>>
>>>> Hi Fabian,
>>>>
>>>> Maybe I should clarify a bit, actually I'm using a (Long)Counter
>>>> registered as Accumulator in the RuntimeContext [1]. So I'm using a
>>>> KeyedProcessFunction, not an AggregateFunction. This works property, but is
>>>> not retained after a job restart. I'm not entirely sure if I did this
>>>> correct.
>>>>
>>>> Thx,
>>>> Wouter
>>>>
>>>>
>>>> [1].
>>>> https://github.com/codefeedr/ghtorrent_mirror/blob/01e5cde837342993c7d287c60e40762e98f8d010/src/main/scala/org/codefeedr/experimental/stats/CommitsStatsProcess.scala#L34
>>>>
>>>>
>>>>
>>>> Op do 2 mei 2019 om 09:36 schreef Fabian Hueske <[email protected]>:
>>>>
>>>>> Hi Wouter,
>>>>>
>>>>> The DataStream API accumulators of the AggregateFunction [1] are
>>>>> stored in state and should be recovered in case of a failure as well.
>>>>> If this does not work, it would be a serious bug.
>>>>>
>>>>> What's the type of your accumulator?
>>>>> Can you maybe share the code?
>>>>> How to you apply the AggregateFunction (window, windowAll, ...)?
>>>>>
>>>>> Thanks,
>>>>> Fabian
>>>>>
>>>>> [1]
>>>>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
>>>>>
>>>>> Am Di., 30. Apr. 2019 um 13:19 Uhr schrieb Wouter Zorgdrager <
>>>>> [email protected]>:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> In the documentation I read about UDF accumulators [1] "Accumulators
>>>>>> are automatically backup-ed by Flink’s checkpointing mechanism and 
>>>>>> restored
>>>>>> in case of a failure to ensure exactly-once semantics." So I assumed
>>>>>> this also was the case of accumulators used in the DataStream API, but I
>>>>>> noticed that it isn't. So every time my jobs crashes and restarts, the
>>>>>> accumulator is reset. Is there a way to retain this information?
>>>>>>
>>>>>> Thanks,
>>>>>> Wouter
>>>>>>
>>>>>>
>>>>>> [1].
>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html
>>>>>>
>>>>>

Reply via email to