Yes, I have state on the ProcessFunction. I tested it on a stand-alone
cluster and `isRestored()` returns true when the application recovers.
However, in integration tests it does not return true. I am using Flink 1.4.
Do you know where the Flink 1.13 release notes (
https://flink.apache.org/news/2021/05/03/release-1.13.0.html) say that
`isRestored()` will not return true in integration tests?
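
For context, here is a minimal plain-Java model of the restart-counting idea
discussed in this thread. It is only a sketch and does not use any Flink
classes: `RestartCounterModel`, `simulate`, and the `List<Long>` checkpoint
are hypothetical stand-ins for a CheckpointedFunction, the test harness, and
operator list state. It assumes that a restarted attempt only sees restored
state if a checkpoint completed before the failure.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model only: no Flink classes are used. All names here are
// hypothetical stand-ins for CheckpointedFunction.initializeState() and
// snapshotState().
final class RestartCounterModel {
    private long restarts;     // restarts observed so far
    private boolean restored;  // stand-in for context.isRestored()

    // Called once per attempt. 'checkpoint' is null when no checkpoint
    // completed before the previous attempt failed.
    void initializeState(List<Long> checkpoint) {
        restored = checkpoint != null;
        restarts = 0;
        if (restored) {
            for (long previous : checkpoint) {
                restarts += previous;
            }
            restarts++; // this attempt is itself a restart
        }
    }

    // Returns the state that would be written into a checkpoint.
    List<Long> snapshotState() {
        List<Long> state = new ArrayList<>();
        state.add(restarts);
        return state;
    }

    boolean isRestored() {
        return restored;
    }

    long getRestarts() {
        return restarts;
    }

    // Runs 'attempts' attempts in a row. A checkpoint completes during each
    // attempt only if 'checkpointBeforeFailure' is true.
    static long simulate(int attempts, boolean checkpointBeforeFailure) {
        List<Long> lastCheckpoint = null;
        long observed = 0;
        for (int i = 0; i < attempts; i++) {
            // A restarted task is a fresh instance; only the checkpoint survives.
            RestartCounterModel task = new RestartCounterModel();
            task.initializeState(lastCheckpoint);
            observed = task.getRestarts();
            if (checkpointBeforeFailure) {
                lastCheckpoint = task.snapshotState();
            }
        }
        return observed;
    }
}
```

In this model, `simulate(3, true)` returns 2, while `simulate(3, false)`
returns 0 because there is never a checkpoint to restore from.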

*--*
*-- Felipe Gutierrez*
*-- skype: felipe.o.gutierrez*


On Thu, Jun 17, 2021 at 4:09 PM Arvid Heise <[email protected]> wrote:

> Does your ProcessFunction have state? If not, it would be in line with the
> documentation.
>
> Also, which Flink version are you using? Before Flink 1.13, empty state was
> omitted, so I could imagine that `isRestored()` would return false, but it
> should now also return true for empty state.
>
> On Tue, Jun 15, 2021 at 9:02 AM Felipe Gutierrez <
> [email protected]> wrote:
>
>> So, I was trying to improve this by using CheckpointedFunction as shown
>> here [1]. But the documentation of the method isRestored() says [2]:
>>
>> "Returns true, if state was restored from the snapshot of a previous
>> execution. This returns always false for stateless tasks."
>>
>> This is odd, because I am extending a ProcessFunction, which is a
>> RichFunction.
>>
>> public class AuctionExceptionSimulatorProcessFunction extends
>> ProcessFunction<KeyedReportingData, KeyedReportingData>
>>         implements CheckpointedFunction {
>> ...
>>
>> In the end, I cannot rely on `isRestored()`. Do you know what could
>> be wrong? I followed the same implementation approach as in [1].
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#checkpointedfunction
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--
>>
>>
>> *--*
>> *-- Felipe Gutierrez*
>> *-- skype: felipe.o.gutierrez*
>>
>>
>> On Mon, Jun 14, 2021 at 3:39 PM Roman Khachatryan <[email protected]>
>> wrote:
>>
>>> You can also use accumulators [1] to collect the number of restarts
>>> (and then access it via client); but side outputs should work as well.
>>>
>>> [1]
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html#accumulators--counters
>>>
>>> Regards,
>>> Roman
>>>
>>> On Sun, Jun 13, 2021 at 10:01 PM Felipe Gutierrez
>>> <[email protected]> wrote:
>>> >
>>> > I just realised that the ProcessFunction alone is enough. I don't need
>>> the CheckpointedFunction.
>>> >
>>> >
>>> > On Fri, 11 Jun 2021, 18:11 Felipe Gutierrez, <
>>> [email protected]> wrote:
>>> >>
>>> >> Cool!
>>> >>
>>> >> I did it using this example
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/state.html#using-managed-operator-state
>>> because I don't have a keyed stream on the specific operator where I want to
>>> count the number of restarts. (Yes, I am using version 1.4, unfortunately.)
>>> >>
>>> >> Because I need to test it in an integration test, I am using a side
>>> output (
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html)
>>> to attach a sink. I am not sure if you have a better idea for testing the
>>> restarts in an integration test. If you have a simple idea, please tell me
>>> :). This is how I solved it.
>>> >>
>>> >> Thanks
>>> >> Felipe
>>> >>
>>> >> --
>>> >> -- Felipe Gutierrez
>>> >> -- skype: felipe.o.gutierrez
>>> >>
>>> >>
>>> >> On Thu, Jun 10, 2021 at 5:41 PM Roman Khachatryan <[email protected]>
>>> wrote:
>>> >>>
>>> >>> Hi Felipe,
>>> >>>
>>> >>> You can use getRuntimeContext().getAttemptNumber() [1] (but beware
>>> >>> that depending on the configuration only a pipeline region can be
>>> >>> restarted, not the whole job).
>>> >>>
>>> >>> But if all you want is to check whether it's a first attempt or not,
>>> >>> you can also call context.isRestored() from initializeState() [2]
>>> >>>
>>> >>> [1]
>>> >>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/api/common/functions/RuntimeContext.html#getAttemptNumber--
>>> >>>
>>> >>> [2]
>>> >>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--
>>> >>>
>>> >>> Regards,
>>> >>> Roman
>>> >>>
>>> >>>
>>> >>> On Thu, Jun 10, 2021 at 5:30 PM Felipe Gutierrez
>>> >>> <[email protected]> wrote:
>>> >>> >
>>> >>> > Hello community,
>>> >>> >
>>> >>> > Is it possible to know programmatically how many times my Flink
>>> streaming job has restarted since it started running?
>>> >>> >
>>> >>> > My use case is this: I have a unit test that uses checkpointing,
>>> and I throw an exception in a MapFunction for a given period, e.g. for the
>>> first 2 seconds. Because Flink restarts the job and checkpointing is enabled,
>>> I can recover the state, and after 2 seconds I no longer throw any exception.
>>> Then I would like to know how many times the job was restarted.
>>> >>> >
>>> >>> > Thanks,
>>> >>> > Felipe
>>> >>> >
>>>
>>
