Just to clarify, I was using isRestored() but I think getAttemptNumber() should be simpler.
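This sketch shows why getAttemptNumber() fits this use case. The Flink Javadoc for RuntimeContext#getAttemptNumber() numbers the first attempt 0. So if a job fails for a fixed period and is restarted until it succeeds, the attempt number of the successful attempt equals the number of restarts.

The block is a plain-Java model, not Flink code. The restart loop is a hypothetical stand-in for a fixed-delay restart strategy, and time is simulated so the result is deterministic. All names are made up for the example.

```java
/**
 * Plain-Java model (no Flink dependency) of counting restarts via the attempt number.
 * Assumptions: attempts are numbered from 0, as RuntimeContext#getAttemptNumber() documents;
 * the loop below is a hypothetical stand-in for a fixed-delay restart strategy;
 * time is simulated so the result is deterministic.
 */
public class AttemptNumberModel {

    /** Stands in for the SimulatedException thrown by the job in this thread. */
    static class SimulatedException extends RuntimeException {
        SimulatedException(String message) {
            super(message);
        }
    }

    /** One execution attempt: fails while the simulated clock is still inside the failure window. */
    private static void runAttempt(long nowMs, long failUntilMs) {
        if (nowMs < failUntilMs) {
            throw new SimulatedException("poison element rejected at t=" + nowMs + " ms");
        }
    }

    /**
     * Re-runs the task until it succeeds and returns the attempt number of the successful
     * attempt, which equals the number of restarts. Each attempt advances the simulated
     * clock by attemptMs, and each restart by restartDelayMs.
     */
    public static int restartsUntilSuccess(long failUntilMs, long attemptMs,
                                           long restartDelayMs, int maxRestarts) {
        long nowMs = 0;
        for (int attempt = 0; attempt <= maxRestarts; attempt++) {
            nowMs += attemptMs;
            try {
                runAttempt(nowMs, failUntilMs);
                return attempt;
            } catch (SimulatedException e) {
                nowMs += restartDelayMs; // wait before the next attempt
            }
        }
        throw new IllegalStateException("gave up after " + maxRestarts + " restarts");
    }

    public static void main(String[] args) {
        // Fail for the first 2000 ms; each attempt runs 300 ms; restart delay 200 ms.
        System.out.println("restarts: " + restartsUntilSuccess(2000, 300, 200, 10));
    }
}
```

With the numbers in main(), the model prints `restarts: 4`. In a real job, the same value would be read from getRuntimeContext().getAttemptNumber() inside a rich function. The caveat from this thread still applies: only the subtasks that are actually restarted see it increase.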
Regards,
Roman

On Mon, Jun 21, 2021 at 10:30 AM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:

ummm, ok. You are using "getRuntimeContext().getAttemptNumber()". I was using "isRestored()". Now it is counting.
Thanks!
Felipe

On Fri, Jun 18, 2021 at 10:17 PM Roman Khachatryan <ro...@apache.org> wrote:

> Do you mean inside the processElement() method?

I used a simple mapper with Thread.sleep before ExceptionSimulatorProcess.

> What is 0ms pause? Do you mean env.getCheckpointConfig().setMinPauseBetweenCheckpoints(0); ?

Yes, env.getCheckpointConfig().setMinPauseBetweenCheckpoints(0);

> How do you create a random exception? Do you mean an exception other than my SimulatedException?

I mean it should be thrown at random, because a checkpoint must reliably precede it so that there is some state on recovery. Checking against 6666... only once assumes that the checkpoint was triggered before. Besides, a checkpoint is not guaranteed to be triggered before the end of input.
I tried to run it with Maven and it worked after making the source infinite.

From the code you provided, the parallelism level doesn't seem important and can be set to 1 (or the restart strategy to full). Then using getRuntimeContext().getAttemptNumber() would be simpler and more reliable.

Regards,
Roman

On Fri, Jun 18, 2021 at 6:23 PM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:

On Fri, Jun 18, 2021 at 5:40 PM Roman Khachatryan <ro...@apache.org> wrote:

> I tried to run the test that you mentioned
> (WordCountFilterQEPTest#integrationTestWithPoisonPillRecovery) in rev. 6f08d0a.
>
> In the IDE, I see that:
> - the checkpoint is never triggered (the sentence is too short; the checkpoint pause and interval are too large)
> - the exception is never thrown, so the job is never restarted (currentTimeMillis is incremented but referenceTimeMillisAhead is not)
>
> When I add a sleep between elements, set a 10ms interval and a 0ms pause, and introduce some random exception, I do see

Do you mean inside the processElement() method?

What is 0ms pause? Do you mean env.getCheckpointConfig().setMinPauseBetweenCheckpoints(0); ?

How do you create a random exception? Do you mean an exception other than my SimulatedException?

With the configurations I just mentioned, it is not working for me. I am testing in the terminal with "mvn -Dtest=WordCountFilterQEPTest#integrationTestWithPoisonPillRecovery test". In IntelliJ I get the error "Caused by: java.lang.NoSuchFieldError: latencyTrackingConfigBuilder" when I call "env.execute();":

org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
    at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:119)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
    at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628)
    at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996)
    at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:117)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1842)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:70)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1822)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1804)
    at org.sense.flink.examples.stream.edgent.WordCountFilterQEPTest.integrationTestWithPoisonPillRecovery(WordCountFilterQEPTest.java:210)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
    at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
    at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
    at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
    at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
    at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:221)
    at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Caused by: java.lang.NoSuchFieldError: latencyTrackingConfigBuilder

> 2021-06-18 17:37:53,241 INFO  org.sense.flink.examples.stream.edgent.ExceptionSimulatorProcess - Attempts restart: 1
> in the logs.
>
> These settings probably differ on the cluster, and there is some unrelated exception which causes a restart.
>
> Regards,
> Roman

On Fri, Jun 18, 2021 at 12:20 PM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:

I investigated a little bit more. I created the same POC on Flink version 1.13.
I have this ProcessFunction in which I want to count the number of times it recovers. I tested with ListState and ValueState, and it seems that during the integration test (only in the integration test) the process hangs in the open() or initializeState() methods.

https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/main/java/org/sense/flink/examples/stream/edgent/ExceptionSimulatorProcess.java

Here is my integration test:
https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/test/java/org/sense/flink/examples/stream/edgent/WordCountFilterQEPTest.java#L154

If I comment out the ListState or the ValueState, the integration test works. It first throws an exception and I see it in the output. After 2 seconds it no longer throws exceptions. Then I compare the output and it matches. But I have no way to count how many restarts happened during the integration test. I can only count the restarts when I run the application on a standalone Flink cluster.

I don't know. Maybe I have to configure some parameters to work with state in integration tests?

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez

On Fri, Jun 18, 2021 at 9:31 AM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:

No, it didn't work.

"context.isRestored()" returns true when I run the application on the Flink standalone cluster and it recovers after a failure. When I do the same in an integration test, it does not return true after a failure.
I mean, I can log the exception that causes the failure, and initializeState() is called after the failure, but context.isRestored() is false again. I also tried to initialize the state to 0 on the first run, "if (!context.isRestored()) { restartsState.add(0L); }", and it does not work.
I think the problem is neither the ListState that I am using nor context.isRestored(). It is "context.getOperatorStateStore()", which is always null, but only in integration tests. Using the code below, I see "restarts: 0" twice in the logs, before and after the failure.

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // unit tests do not open the OperatorStateStore
        if (context.getOperatorStateStore() != null) {
            restartsState = context.getOperatorStateStore().getListState(
                    new ListStateDescriptor<Long>("restarts", Long.class));

            List<Long> restoreList = Lists.newArrayList(restartsState.get());
            if (restoreList == null || restoreList.isEmpty()) {
                restartsState.add(0L);
                LOG.info("restarts: 0");
            } else {
                Long max = Collections.max(restoreList);
                LOG.info("restarts: " + max);
                restartsState.add(max + 1);
            }
        }
    }

On Thu, Jun 17, 2021 at 11:17 PM Roman Khachatryan <ro...@apache.org> wrote:

Thanks for sharing,

I think the problem is that restartsState is never updated:
- on the first attempt, context.isRestored() returns false (and "never restored" is logged)
- on subsequent attempts, it again returns false, because the state was never updated before
Adding

    if (!context.isRestored()) { restartsState.add(0L); }

should solve the problem (it's also better to use state.update instead of state.add if only the max is needed).

Regards,
Roman

On Thu, Jun 17, 2021 at 5:35 PM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:

Sure, here it is. Nothing is mocked. I double-checked.

    UnitTestClass {.....
        protected static LocalFlinkMiniCluster flink;

        @BeforeClass
        public static void prepare() {
            flink = new LocalFlinkMiniCluster(getFlinkConfiguration(), false);
            flink.start();

            TestStreamEnvironment.setAsContext(flink, PARALLELISM);
        }

        private static Configuration getFlinkConfiguration() {
            Configuration flinkConfig = new Configuration();
            flinkConfig.setInteger("local.number-taskmanager", 1);
            flinkConfig.setInteger("taskmanager.numberOfTaskSlots", 8);
            flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
            flinkConfig.setString("restart-strategy.fixed-delay.delay", "0 s");
            try {
                flinkConfig.setString("state.checkpoints.dir",
                        "file://" + tempFolder.newFolder().getAbsolutePath());
            } catch (IOException e) {
                throw new RuntimeException("error in flink cluster config", e);
            }
            return flinkConfig;
        }

The class in which I check whether the job was restarted:

    public class ExceptionSimulatorProcessFunction extends ProcessFunction<Object..., Object...>
            implements CheckpointedFunction {

        final OutputTag<Long> outputTag = new OutputTag<Long>("side-output") {
        };
        private transient ListState<Long> restartsState;
        private Long restartsLocal;
        ...
        @Override
        public void processElement(Object value, Context ctx, Collector<Object> out) throws Exception {
            this.currentTimeMillis = System.currentTimeMillis() - currentTimeMillisBehind;

            // If the current time is less than the reference time ahead AND we have the
            // poison auction, an exception will be thrown
            if (this.currentTimeMillis < this.referenceTimeMillisAhead
                    && POISON__TRANSACTION_ID.equals(value.toString())) {

                LOG.error("This exception will trigger until the reference time [{}] reaches the trigger time [{}]",
                        sdfMillis.format(new Date(this.currentTimeMillis)),
                        sdfMillis.format(new Date(this.referenceTimeMillisAhead)));

                throw new SimulatedException("Transaction ID: " + value.toString()
                        + " not allowed. This is a simple exception for testing purposes.");
            }
            out.collect(value);

            // counts the restarts
            if (restartsState != null) {
                List<Long> restoreList = Lists.newArrayList(restartsState.get());
                Long attemptsRestart = 0L;
                if (restoreList != null && !restoreList.isEmpty()) {
                    attemptsRestart = Collections.max(restoreList);
                    if (restartsLocal < attemptsRestart) {
                        restartsLocal = attemptsRestart;
                        ctx.output(outputTag, Long.valueOf(attemptsRestart));
                    }
                }
                LOG.info("Attempts restart: " + attemptsRestart);
            }
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {}

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            restartsState = context.getOperatorStateStore().getListState(
                    new ListStateDescriptor<Long>("restarts", Long.class));

            if (context.isRestored()) {
                List<Long> restoreList = Lists.newArrayList(restartsState.get());
                if (restoreList == null || restoreList.isEmpty()) {
                    restartsState.add(1L);
                    LOG.info("restarts: 1");
                } else {
                    Long max = Collections.max(restoreList);
                    LOG.info("restarts: " + max);
                    restartsState.add(max + 1);
                }
            } else {
                LOG.info("restarts: never restored");
            }
        }
    }

On Thu, Jun 17, 2021 at 5:20 PM Roman Khachatryan <ro...@apache.org> wrote:

Hi,
Could you please share the test code?

I think the returned value might depend on the level at which the tests are executed. If it's a regular job, it should return the correct value (as with the cluster). If the environment in which the code is executed is mocked, it can be false.

Regards,
Roman

On Thu, Jun 17, 2021 at 4:15 PM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:

Yes, I have state in the ProcessFunction. I tested it on a standalone cluster and it returns true when the application recovers. However, in integration tests it does not return true. I am using Flink 1.4. Do you know where the Flink 1.13 release announcement (https://flink.apache.org/news/2021/05/03/release-1.13.0.html) says that `isRestored()` will not be true in integration tests?

On Thu, Jun 17, 2021 at 4:09 PM Arvid Heise <ar...@apache.org> wrote:

Does your ProcessFunction have state? If not, that would be in line with the documentation.

Also, which Flink version are you using? Before Flink 1.13, empty state was omitted, so I could imagine that `isRestored()` would return false, but it should now actually return true for empty state as well.
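The diagnosis in this thread can be modeled in plain Java. The state is never written on the first attempt, so a later attempt has nothing to restore. Arvid's note about empty state explains the rest.

This is a simplified sketch, not the Flink API. The "checkpoint" is a copy of a List<Long> taken before each failure. It assumes the pre-1.13 behavior described above, where empty state is omitted and the next attempt therefore sees isRestored() as false. Class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Plain-Java model (no Flink dependency) of a restart counter kept in operator state.
 * Assumptions: a "checkpoint" is a copy of the state list taken before each failure,
 * and, as for Flink versions before 1.13, an empty state is omitted from the snapshot,
 * so the next attempt sees isRestored == false.
 */
public class RestartCounterModel {

    /** Stand-in for the operator's ListState<Long> named "restarts". */
    private final List<Long> restartsState = new ArrayList<>();
    private final boolean writeZeroOnFirstAttempt;

    public RestartCounterModel(boolean writeZeroOnFirstAttempt) {
        this.writeZeroOnFirstAttempt = writeZeroOnFirstAttempt;
    }

    /** Mirrors initializeState(); a null snapshot means "not restored". Returns the restart count. */
    public long initializeState(List<Long> restoredSnapshot) {
        if (restoredSnapshot == null) {             // context.isRestored() == false
            if (writeZeroOnFirstAttempt) {
                restartsState.add(0L);               // the fix: make the state non-empty
            }
            return 0L;
        }
        restartsState.addAll(restoredSnapshot);     // the restored contents of the list state
        long next = Collections.max(restartsState) + 1;
        restartsState.clear();                      // keep only the latest count, like update()
        restartsState.add(next);
        return next;
    }

    /** Mirrors a checkpoint; empty state is omitted and therefore yields null. */
    public List<Long> snapshot() {
        return restartsState.isEmpty() ? null : new ArrayList<>(restartsState);
    }

    /** Runs a first attempt plus `failures` restarts; returns the count seen by the last attempt. */
    public static long simulate(boolean writeZeroOnFirstAttempt, int failures) {
        List<Long> checkpoint = null;
        long count = 0;
        for (int attempt = 0; attempt <= failures; attempt++) {
            RestartCounterModel task = new RestartCounterModel(writeZeroOnFirstAttempt);
            count = task.initializeState(checkpoint);
            checkpoint = task.snapshot();           // assumes a checkpoint completes before each failure
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println("without initial 0: " + simulate(false, 2));
        System.out.println("with initial 0:    " + simulate(true, 2));
    }
}
```

Without the initial 0, every attempt reports 0 restarts. Writing 0 on the first attempt gives 2 after two failures. Keeping a single element and replacing it mirrors the suggestion to use update instead of add when only the maximum is needed.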
On Tue, Jun 15, 2021 at 9:02 AM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:

So, I was trying to improve this by using the CheckpointedFunction, as shown here [1]. But the documentation of the method isRestored() says [2]:

"Returns true, if state was restored from the snapshot of a previous execution. This returns always false for stateless tasks."

It is weird, because I am extending a ProcessFunction, which is a RichFunction.

    public class AuctionExceptionSimulatorProcessFunction
            extends ProcessFunction<KeyedReportingData, KeyedReportingData>
            implements CheckpointedFunction {
        ...

In the end, I cannot rely on "isRestored()". Do you know what could be wrong? I used the same implementation approach as in [1].

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#checkpointedfunction
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--

On Mon, Jun 14, 2021 at 3:39 PM Roman Khachatryan <ro...@apache.org> wrote:

You can also use accumulators [1] to collect the number of restarts (and then access them via the client), but side outputs should work as well.
>> >> >>> >> >>>> >> >> >>> >> >>>> [1] >> >> >>> >> >>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html#accumulators--counters >> >> >>> >> >>>> >> >> >>> >> >>>> Regards, >> >> >>> >> >>>> Roman >> >> >>> >> >>>> >> >> >>> >> >>>> On Sun, Jun 13, 2021 at 10:01 PM Felipe Gutierrez >> >> >>> >> >>>> <felipe.o.gutier...@gmail.com> wrote: >> >> >>> >> >>>> > >> >> >>> >> >>>> > I just realised that only the ProcessFunction is enough. I >> >> >>> >> >>>> > don't need the CheckpointFunction. >> >> >>> >> >>>> > >> >> >>> >> >>>> > >> >> >>> >> >>>> > On Fri, 11 Jun 2021, 18:11 Felipe Gutierrez, >> >> >>> >> >>>> > <felipe.o.gutier...@gmail.com> wrote: >> >> >>> >> >>>> >> >> >> >>> >> >>>> >> Cool! >> >> >>> >> >>>> >> >> >> >>> >> >>>> >> I did using this example >> >> >>> >> >>>> >> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/state.html#using-managed-operator-state >> >> >>> >> >>>> >> because I don't have a keyed stream on the specific >> >> >>> >> >>>> >> operator that I want to count the number of restarts. (yes >> >> >>> >> >>>> >> I am using version 1.4 unfortunately). >> >> >>> >> >>>> >> >> >> >>> >> >>>> >> Because I need to test it in an integration test I am >> >> >>> >> >>>> >> using a side output >> >> >>> >> >>>> >> (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html) >> >> >>> >> >>>> >> to attach a sink. I am not sure if you have a better idea >> >> >>> >> >>>> >> to test the restarts on an integration test. If you have a >> >> >>> >> >>>> >> simple idea please tell me :). This was the way that I >> >> >>> >> >>>> >> solved.... 
Thanks
Felipe

On Thu, Jun 10, 2021 at 5:41 PM Roman Khachatryan <ro...@apache.org> wrote:

Hi Felipe,

You can use getRuntimeContext().getAttemptNumber() [1] (but beware that, depending on the configuration, only a pipeline region may be restarted, not the whole job).

But if all you want is to check whether it's the first attempt or not, you can also call context.isRestored() from initializeState() [2].

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/api/common/functions/RuntimeContext.html#getAttemptNumber--
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--

Regards,
Roman

On Thu, Jun 10, 2021 at 5:30 PM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:

Hello community,

Is it possible to know programmatically how many times my Flink stream job has restarted since it started running?

My use case is like this. I have a unit test that uses checkpointing, and I throw an exception in a MapFunction for a given time, i.e., for the next 2 seconds. Because Flink restarts the job and checkpointing is enabled, I can recover the state, and after 2 seconds I don't throw any exception anymore. Then I would like to know how many times the job was restarted.

Thanks,
Felipe
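This sketch concerns the caveat that, depending on the configuration, only a pipeline region may be restarted. In that case the attempt number read in one subtask counts restarts of that subtask's region, not of the whole job.

The plain-Java model below illustrates this under hypothetical assumptions: three subtasks, each forming its own region, with the poison element always reaching subtask 2. It does not use Flink classes.

```java
import java.util.Arrays;

/**
 * Plain-Java model of restart scope. Each array slot is one parallel subtask's attempt
 * number (0 on the first execution, like RuntimeContext#getAttemptNumber()).
 * Hypothetical assumption for illustration: with region failover only the subtasks in
 * the failed region are restarted; with a full restart every subtask is.
 */
public class RestartScopeModel {

    /** Returns new attempt numbers after a failure in failedRegion. */
    public static int[] fail(int[] attempts, int[] regionOf, int failedRegion, boolean fullRestart) {
        int[] next = attempts.clone();
        for (int subtask = 0; subtask < next.length; subtask++) {
            if (fullRestart || regionOf[subtask] == failedRegion) {
                next[subtask]++;
            }
        }
        return next;
    }

    public static void main(String[] args) {
        int[] regionOf = {0, 1, 2};   // hypothetical: each subtask is its own region
        int[] regional = {0, 0, 0};
        int[] full = {0, 0, 0};
        // The poison element always reaches subtask 2, which fails twice.
        for (int i = 0; i < 2; i++) {
            regional = fail(regional, regionOf, 2, false);
            full = fail(full, regionOf, 2, true);
        }
        System.out.println("region failover: " + Arrays.toString(regional)); // [0, 0, 2]
        System.out.println("full restart:    " + Arrays.toString(full));     // [2, 2, 2]
    }
}
```

Under a full restart, every subtask's attempt number equals the number of job restarts. This is why the thread suggests setting the parallelism to 1 or using a full restart to make getAttemptNumber() reliable for this test. In recent Flink versions, the restart scope is, as far as I know, controlled by the `jobmanager.execution.failover-strategy` option (`full` or `region`).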