Yes, I have state on the ProcessFunction. I tested it on a stand-alone cluster and `isRestored()` returns true when the application recovers. However, in integration tests it does not return true. I am using Flink 1.4. Do you know where the Flink 1.13 release notes (https://flink.apache.org/news/2021/05/03/release-1.13.0.html) say that I cannot see `isRestored()` return true in integration tests?
*--*
*-- Felipe Gutierrez*
*-- skype: felipe.o.gutierrez*

On Thu, Jun 17, 2021 at 4:09 PM Arvid Heise <[email protected]> wrote:

> Does your ProcessFunction has state? If not it would be in line with the
> documentation.
>
> Also which Flink version are you using? Before Flink 1.13 empty state was
> omitted so I could imagine that `isRestored()` would return false but it
> should actually now also return true for empty state.
>
> On Tue, Jun 15, 2021 at 9:02 AM Felipe Gutierrez <[email protected]> wrote:
>
>> So, I was trying to improve by using the CheckpointedFunction as it shows
>> here [1]. But the method isRestored() says in its documentation [2]:
>>
>> "Returns true, if state was restored from the snapshot of a previous
>> execution. This returns always false for stateless tasks."
>>
>> It is weird because I am extending a ProcessFunction which is a
>> RichFunction.
>>
>> public class AuctionExceptionSimulatorProcessFunction extends
>> ProcessFunction<KeyedReportingData, KeyedReportingData>
>> implements CheckpointedFunction {
>> ...
>>
>> In the end, I cannot rely on the "isRestored()". Do you know what could
>> be wrong? I used the same implementation method of [1].
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#checkpointedfunction
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--
>>
>>
>> *--*
>> *-- Felipe Gutierrez*
>> *-- skype: felipe.o.gutierrez*
>>
>>
>> On Mon, Jun 14, 2021 at 3:39 PM Roman Khachatryan <[email protected]> wrote:
>>
>>> You can also use accumulators [1] to collect the number of restarts
>>> (and then access it via client); but side outputs should work as well.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html#accumulators--counters
>>>
>>> Regards,
>>> Roman
>>>
>>> On Sun, Jun 13, 2021 at 10:01 PM Felipe Gutierrez
>>> <[email protected]> wrote:
>>> >
>>> > I just realised that only the ProcessFunction is enough. I don't need the CheckpointFunction.
>>> >
>>> >
>>> > On Fri, 11 Jun 2021, 18:11 Felipe Gutierrez, <[email protected]> wrote:
>>> >>
>>> >> Cool!
>>> >>
>>> >> I did using this example https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/state.html#using-managed-operator-state because I don't have a keyed stream on the specific operator that I want to count the number of restarts. (yes I am using version 1.4 unfortunately).
>>> >>
>>> >> Because I need to test it in an integration test I am using a side output (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html) to attach a sink. I am not sure if you have a better idea to test the restarts on an integration test. If you have a simple idea please tell me :). This was the way that I solved....
>>> >>
>>> >> Thanks
>>> >> Felipe
>>> >>
>>> >> --
>>> >> -- Felipe Gutierrez
>>> >> -- skype: felipe.o.gutierrez
>>> >>
>>> >>
>>> >> On Thu, Jun 10, 2021 at 5:41 PM Roman Khachatryan <[email protected]> wrote:
>>> >>>
>>> >>> Hi Felipe,
>>> >>>
>>> >>> You can use getRuntimeContext().getAttemptNumber() [1] (but beware
>>> >>> that depending on the configuration only a pipeline region can be
>>> >>> restarted, not the whole job).
>>> >>>
>>> >>> But if all you want is to check whether it's a first attempt or not,
>>> >>> you can also call context.isRestored() from initializeState() [2]
>>> >>>
>>> >>> [1]
>>> >>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/api/common/functions/RuntimeContext.html#getAttemptNumber--
>>> >>>
>>> >>> [2]
>>> >>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--
>>> >>>
>>> >>> Regards,
>>> >>> Roman
>>> >>>
>>> >>>
>>> >>> On Thu, Jun 10, 2021 at 5:30 PM Felipe Gutierrez
>>> >>> <[email protected]> wrote:
>>> >>> >
>>> >>> > Hello community,
>>> >>> >
>>> >>> > Is it possible to know programmatically how many times my Flink stream job restarted since it was running?
>>> >>> >
>>> >>> > My use case is like this. I have an Unit test that uses checkpoint and I throw one exception in a MapFunction for a given time, i.e.: for the 2 seconds ahead. Because Flink restarts the job and I have checkpoint I can recover the state and after 2 seconds I don't throw any exception anymore. Then I would like to know how many times the job was restarted.
>>> >>> >
>>> >>> > Thanks,
>>> >>> > Felipe
>>> >>> >
>>> >>
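For anyone following the thread, here is a minimal, dependency-free Java sketch of the restart-counting idea discussed above: keep a counter in checkpointed state and increment it whenever initialization reports restored state. It does not use Flink. `InitContext`, `CountingOperator` and `restartsAfterFailures` are hypothetical stand-ins invented for illustration, and the sketch assumes that a restart with no completed checkpoint starts from empty state, so `isRestored()` is false in that case.

```java
import java.util.Optional;

public class RestartCounterSketch {

    /** Hypothetical stand-in for the context given to initializeState(); not a Flink class. */
    static final class InitContext {
        private final Optional<Integer> snapshot;

        InitContext(Optional<Integer> snapshot) {
            this.snapshot = snapshot;
        }

        /** Assumption: state counts as restored only if a snapshot exists. */
        boolean isRestored() {
            return snapshot.isPresent();
        }

        int restoredCount() {
            return snapshot.orElse(0);
        }
    }

    /** Hypothetical operator that keeps its restart counter in checkpointed state. */
    static final class CountingOperator {
        private int restarts;

        void initializeState(InitContext context) {
            // A restored snapshot means this is not the first attempt.
            restarts = context.isRestored() ? context.restoredCount() + 1 : 0;
        }

        int snapshotState() {
            return restarts;
        }

        int restarts() {
            return restarts;
        }
    }

    /**
     * Simulates a job that fails the given number of times. If
     * checkpointBeforeFailure is false, no snapshot is ever taken.
     */
    static int restartsAfterFailures(int failures, boolean checkpointBeforeFailure) {
        Optional<Integer> lastSnapshot = Optional.empty();
        CountingOperator operator = new CountingOperator();
        operator.initializeState(new InitContext(lastSnapshot));
        for (int i = 0; i < failures; i++) {
            if (checkpointBeforeFailure) {
                lastSnapshot = Optional.of(operator.snapshotState());
            }
            // The failure discards the operator; a new instance is initialized.
            operator = new CountingOperator();
            operator.initializeState(new InitContext(lastSnapshot));
        }
        return operator.restarts();
    }

    public static void main(String[] args) {
        System.out.println(restartsAfterFailures(3, true));
        System.out.println(restartsAfterFailures(3, false));
    }
}
```

In this sketch the counter only advances when a snapshot was taken before each failure. If a test throws its exception before any checkpoint has completed, the simulated `isRestored()` stays false, which may be worth checking when the integration test and the stand-alone cluster behave differently.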
