> do you mean inside the processElement() method?

I used a simple mapper with Thread.sleep before ExceptionSimulatorProcess.

> what is 0ms pause? do you mean
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(0); ?

Yes, env.getCheckpointConfig().setMinPauseBetweenCheckpoints(0);

> How do you create a random exception? do you mean not mine SimulatedException?

I mean it should be thrown at random because the checkpoint must
reliably precede it, so that on recovery there is some state.
Checking against 6666... only once assumes that the checkpoint was
triggered before.
Besides, a checkpoint is not guaranteed to be triggered before the end of input.

I tried to run it with Maven and it worked after making the source infinite.

From the code you provided, the parallelism level doesn't seem important
and can be set to 1 (or the restart strategy to full). Then using
getRuntimeContext().getAttemptNumber() would be simpler and more
reliable.

Regards,
Roman

On Fri, Jun 18, 2021 at 6:23 PM Felipe Gutierrez
<felipe.o.gutier...@gmail.com> wrote:
>
> On Fri, Jun 18, 2021 at 5:40 PM Roman Khachatryan <ro...@apache.org> wrote:
>>
>> I tried to run the test that you mentioned
>> (WordCountFilterQEPTest#integrationTestWithPoisonPillRecovery) in rev.
>> 6f08d0a.
>>
>> In IDE, I see that:
>> - checkpoint is never triggered (sentence is too short, checkpoint
>> pause and interval are too large)
>> - exception is never thrown, so the job never restarted
>> (currentTimeMillis is incremented but referenceTimeMillisAhead is not)
>>
>> When I add sleep between each element, set 10ms interval, 0ms pause
>> and introduce some random exception, I do see
>
> do you mean inside the processElement() method?
>
> what is 0ms pause? do you mean
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(0); ?
>
> How do you create a random exception? do you mean not mine SimulatedException?
>
> Using these configurations that I just said it is not working for me. I am
> testing on the terminal "mvn
> -Dtest=WordCountFilterQEPTest#integrationTestWithPoisonPillRecovery test".
> On IntelliJ I have the error "Caused by: java.lang.NoSuchFieldError:
> latencyTrackingConfigBuilder" when I call "env.execute();"
>
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
>     at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:119)
>     at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
>     at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628)
>     at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996)
>     at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:117)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1842)
>     at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:70)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1822)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1804)
>     at org.sense.flink.examples.stream.edgent.WordCountFilterQEPTest.integrationTestWithPoisonPillRecovery(WordCountFilterQEPTest.java:210)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>     at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>     at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>     at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>     at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>     at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>     at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>     at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>     at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
>     at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
>     at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:221)
>     at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
> Caused by: java.lang.NoSuchFieldError: latencyTrackingConfigBuilder
>
>
>>
>> 2021-06-18 17:37:53,241 INFO
>> org.sense.flink.examples.stream.edgent.ExceptionSimulatorProcess -
>> Attempts restart: 1
>> in the logs.
>>
>> These settings probably differ on the cluster and there is some
>> unrelated exception which causes a restart.
>>
>> Regards,
>> Roman
>>
>> On Fri, Jun 18, 2021 at 12:20 PM Felipe Gutierrez
>> <felipe.o.gutier...@gmail.com> wrote:
>> >
>> > I investigated a little bit more. I created the same POC on a Flink
>> > version 1.13.
>> > I have this ProcessFunction where I want to count the times
>> > it recovers. I tested with ListState and ValueState and it seems that
>> > during the integration test (only for integration test) the process is
>> > hanging on the open() or on the initializeState() methods.
>> >
>> > https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/main/java/org/sense/flink/examples/stream/edgent/ExceptionSimulatorProcess.java
>> >
>> > Here is my integration test:
>> > https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/test/java/org/sense/flink/examples/stream/edgent/WordCountFilterQEPTest.java#L154
>> >
>> > If I comment out the state ListState or the ValueState, the integration
>> > test works. It first throws an exception and I see it on the output. Then
>> > after 2 seconds it does not throw exceptions. Then I compare the output
>> > and they match. But I don't have a way to count how many restarts happened
>> > during the integration test. I can only count the restarts when I run the
>> > application on a stand-alone Flink cluster.
>> >
>> > I don't know. Maybe, do I have to configure some parameters to work with
>> > the state on integration tests?
>> >
>> > --
>> > -- Felipe Gutierrez
>> > -- skype: felipe.o.gutierrez
>> >
>> >
>> > On Fri, Jun 18, 2021 at 9:31 AM Felipe Gutierrez
>> > <felipe.o.gutier...@gmail.com> wrote:
>> >>
>> >> No, it didn't work.
>> >>
>> >> The "context.isRestored()" returns true when I run the application on the
>> >> Flink standalone-cluster and it is recovering after a failure. When I do
>> >> the same on an integration test it does not return true after a failure.
>> >> I mean, I can log the exception that is causing the failure, the
>> >> initializeState() is called after a failure, but the context.isRestored()
>> >> is false again. I also tried to update the state on the first time to 0
>> >> "if (!context.isRestored()) { restartsState.add(0L); }" and it does not
>> >> work.
>> >> I think the problem is not on the ListState that I am using and not on
>> >> the context.isRestored() as well. It is on the
>> >> "context.getOperatorStateStore()" that is always null only on integration
>> >> tests. Using the below code I can see on the logs "restarts: 0" twice,
>> >> before and after failure.
>> >>
>> >> @Override
>> >> public void initializeState(FunctionInitializationContext context)
>> >>         throws Exception {
>> >>     // unit tests do not open OperatorStateStore
>> >>     if (context.getOperatorStateStore() != null) {
>> >>         restartsState = context.getOperatorStateStore().getListState(
>> >>                 new ListStateDescriptor<Long>("restarts", Long.class));
>> >>
>> >>         List<Long> restoreList = Lists.newArrayList(restartsState.get());
>> >>         if (restoreList == null || restoreList.isEmpty()) {
>> >>             restartsState.add(0L);
>> >>             LOG.info("restarts: 0");
>> >>         } else {
>> >>             Long max = Collections.max(restoreList);
>> >>             LOG.info("restarts: " + max);
>> >>             restartsState.add(max + 1);
>> >>         }
>> >>     }
>> >> }
>> >>
>> >> --
>> >> -- Felipe Gutierrez
>> >> -- skype: felipe.o.gutierrez
>> >>
>> >>
>> >> On Thu, Jun 17, 2021 at 11:17 PM Roman Khachatryan <ro...@apache.org>
>> >> wrote:
>> >>>
>> >>> Thanks for sharing,
>> >>>
>> >>> I think the problem is that restartsState is never updated:
>> >>> - on the first attempt, context.isRestored() returns false (and "never
>> >>> restored" is logged)
>> >>> - on subsequent attempts, it again returns false, because the state
>> >>> was never updated before
>> >>>
>> >>> Adding
>> >>> if (!context.isRestored()) { restartsState.add(0L); }
>> >>> should solve the problem
>> >>> (it's also better to use state.update instead of state.add if only max
>> >>> is needed).
>> >>>
>> >>> Regards,
>> >>> Roman
>> >>>
>> >>> On Thu, Jun 17, 2021 at 5:35 PM Felipe Gutierrez
>> >>> <felipe.o.gutier...@gmail.com> wrote:
>> >>> >
>> >>> > Sure, here it is. Nothing is mocked. I double-checked.
>> >>> >
>> >>> > UnitTestClass {.....
>> >>> >     protected static LocalFlinkMiniCluster flink;
>> >>> >
>> >>> >     @BeforeClass
>> >>> >     public static void prepare() {
>> >>> >         flink = new LocalFlinkMiniCluster(getFlinkConfiguration(), false);
>> >>> >         flink.start();
>> >>> >
>> >>> >         TestStreamEnvironment.setAsContext(flink, PARALLELISM);
>> >>> >     }
>> >>> >
>> >>> >     private static Configuration getFlinkConfiguration() {
>> >>> >         Configuration flinkConfig = new Configuration();
>> >>> >         flinkConfig.setInteger("local.number-taskmanager", 1);
>> >>> >         flinkConfig.setInteger("taskmanager.numberOfTaskSlots", 8);
>> >>> >         flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
>> >>> >         flinkConfig.setString("restart-strategy.fixed-delay.delay", "0 s");
>> >>> >         try {
>> >>> >             flinkConfig.setString("state.checkpoints.dir", "file://" +
>> >>> >                     tempFolder.newFolder().getAbsolutePath());
>> >>> >         } catch (IOException e) {
>> >>> >             throw new RuntimeException("error in flink cluster config", e);
>> >>> >         }
>> >>> >         return flinkConfig;
>> >>> >     }
>> >>> >
>> >>> >
>> >>> > The class that I check if the job was restarted:
>> >>> >
>> >>> > public class ExceptionSimulatorProcessFunction extends
>> >>> >         ProcessFunction<Object..., Object...>
>> >>> >         implements CheckpointedFunction {
>> >>> >
>> >>> >     final OutputTag<Long> outputTag = new OutputTag<Long>("side-output") {
>> >>> >     };
>> >>> >     private transient ListState<Long> restartsState;
>> >>> >     private Long restartsLocal;
>> >>> >     ...
>> >>> >     @Override
>> >>> >     public void processElement(Object value, Context ctx,
>> >>> >             Collector<Object> out) throws Exception {
>> >>> >         this.currentTimeMillis = System.currentTimeMillis() -
>> >>> >                 currentTimeMillisBehind;
>> >>> >
>> >>> >         // If current time is less than the reference time ahead AND
>> >>> >         // we have the poison auction an exception will throw
>> >>> >         if (this.currentTimeMillis < this.referenceTimeMillisAhead &&
>> >>> >                 POISON__TRANSACTION_ID.equals(value.toString())) {
>> >>> >
>> >>> >             LOG.error(
>> >>> >                     "This exception will trigger until the reference time [{}] reaches the trigger time [{}]",
>> >>> >                     sdfMillis.format(new Date(this.currentTimeMillis)),
>> >>> >                     sdfMillis.format(new Date(this.referenceTimeMillisAhead)));
>> >>> >
>> >>> >             throw new SimulatedException("Transaction ID: " +
>> >>> >                     value.toString() +
>> >>> >                     " not allowed. This is a simple exception for testing purposes.");
>> >>> >         }
>> >>> >         out.collect(value);
>> >>> >
>> >>> >
>> >>> >         // counts the restarts
>> >>> >         if (restartsState != null) {
>> >>> >             List<Long> restoreList = Lists.newArrayList(restartsState.get());
>> >>> >             Long attemptsRestart = 0L;
>> >>> >             if (restoreList != null && !restoreList.isEmpty()) {
>> >>> >                 attemptsRestart = Collections.max(restoreList);
>> >>> >                 if (restartsLocal < attemptsRestart) {
>> >>> >                     restartsLocal = attemptsRestart;
>> >>> >                     ctx.output(outputTag, Long.valueOf(attemptsRestart));
>> >>> >                 }
>> >>> >             }
>> >>> >             LOG.info("Attempts restart: " + attemptsRestart);
>> >>> >         }
>> >>> >     }
>> >>> >
>> >>> >     @Override
>> >>> >     public void snapshotState(FunctionSnapshotContext context) throws
>> >>> >             Exception {}
>> >>> >
>> >>> >     @Override
>> >>> >     public void initializeState(FunctionInitializationContext context)
>> >>> >             throws Exception {
>> >>> >         restartsState = context.getOperatorStateStore().getListState(
>> >>> >                 new ListStateDescriptor<Long>("restarts", Long.class));
>> >>> >
>> >>> >         if (context.isRestored()) {
>> >>> >             List<Long> restoreList = Lists.newArrayList(restartsState.get());
>> >>> >             if (restoreList == null || restoreList.isEmpty()) {
>> >>> >                 restartsState.add(1L);
>> >>> >                 LOG.info("restarts: 1");
>> >>> >             } else {
>> >>> >                 Long max = Collections.max(restoreList);
>> >>> >                 LOG.info("restarts: " + max);
>> >>> >                 restartsState.add(max + 1);
>> >>> >             }
>> >>> >         } else {
>> >>> >             LOG.info("restarts: never restored");
>> >>> >         }
>> >>> >     }
>> >>> > }
>> >>> >
>> >>> >
>> >>> > On Thu, Jun 17, 2021 at 5:20 PM Roman Khachatryan <ro...@apache.org>
>> >>> > wrote:
>> >>> >>
>> >>> >> Hi,
>> >>> >>
>> >>> >> Could you please share the test code?
>> >>> >>
>> >>> >> I think the returned value might depend on the level on which the
>> >>> >> tests are executed. If it's a regular job then it should return the
>> >>> >> correct value (as with cluster). If the environment in which the code
>> >>> >> is executed is mocked then it can be false.
>> >>> >>
>> >>> >> Regards,
>> >>> >> Roman
>> >>> >>
>> >>> >> On Thu, Jun 17, 2021 at 4:15 PM Felipe Gutierrez
>> >>> >> <felipe.o.gutier...@gmail.com> wrote:
>> >>> >> >
>> >>> >> > Yes, I have state on the ProcessFunction. I tested it on a
>> >>> >> > stand-alone cluster and it returns true when the application
>> >>> >> > recovers. However, in integration tests it does not return true. I
>> >>> >> > am using Flink 1.4. Do you know where it is saying at Flink release
>> >>> >> > 1.13 (https://flink.apache.org/news/2021/05/03/release-1.13.0.html)
>> >>> >> > that I cannot see `isRestored()` equals true on integration tests?
>> >>> >> >
>> >>> >> > --
>> >>> >> > -- Felipe Gutierrez
>> >>> >> > -- skype: felipe.o.gutierrez
>> >>> >> >
>> >>> >> >
>> >>> >> > On Thu, Jun 17, 2021 at 4:09 PM Arvid Heise <ar...@apache.org>
>> >>> >> > wrote:
>> >>> >> >>
>> >>> >> >> Does your ProcessFunction have state?
>> >>> >> >> If not it would be in line
>> >>> >> >> with the documentation.
>> >>> >> >>
>> >>> >> >> Also which Flink version are you using? Before Flink 1.13 empty
>> >>> >> >> state was omitted so I could imagine that `isRestored()` would
>> >>> >> >> return false but it should actually now also return true for empty
>> >>> >> >> state.
>> >>> >> >>
>> >>> >> >> On Tue, Jun 15, 2021 at 9:02 AM Felipe Gutierrez
>> >>> >> >> <felipe.o.gutier...@gmail.com> wrote:
>> >>> >> >>>
>> >>> >> >>> So, I was trying to improve by using the CheckpointedFunction as
>> >>> >> >>> it shows here [1]. But the method isRestored() says in its
>> >>> >> >>> documentation [2]:
>> >>> >> >>>
>> >>> >> >>> "Returns true, if state was restored from the snapshot of a
>> >>> >> >>> previous execution. This returns always false for stateless
>> >>> >> >>> tasks."
>> >>> >> >>>
>> >>> >> >>> It is weird because I am extending a ProcessFunction which is a
>> >>> >> >>> RichFunction.
>> >>> >> >>>
>> >>> >> >>> public class AuctionExceptionSimulatorProcessFunction extends
>> >>> >> >>>         ProcessFunction<KeyedReportingData, KeyedReportingData>
>> >>> >> >>>         implements CheckpointedFunction {
>> >>> >> >>> ...
>> >>> >> >>>
>> >>> >> >>> In the end, I cannot rely on the "isRestored()". Do you know what
>> >>> >> >>> could be wrong? I used the same implementation method of [1].
>> >>> >> >>>
>> >>> >> >>> [1]
>> >>> >> >>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#checkpointedfunction
>> >>> >> >>> [2]
>> >>> >> >>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--
>> >>> >> >>>
>> >>> >> >>>
>> >>> >> >>> --
>> >>> >> >>> -- Felipe Gutierrez
>> >>> >> >>> -- skype: felipe.o.gutierrez
>> >>> >> >>>
>> >>> >> >>>
>> >>> >> >>> On Mon, Jun 14, 2021 at 3:39 PM Roman Khachatryan
>> >>> >> >>> <ro...@apache.org> wrote:
>> >>> >> >>>>
>> >>> >> >>>> You can also use accumulators [1] to collect the number of
>> >>> >> >>>> restarts
>> >>> >> >>>> (and then access it via client); but side outputs should work as
>> >>> >> >>>> well.
>> >>> >> >>>>
>> >>> >> >>>> [1]
>> >>> >> >>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html#accumulators--counters
>> >>> >> >>>>
>> >>> >> >>>> Regards,
>> >>> >> >>>> Roman
>> >>> >> >>>>
>> >>> >> >>>> On Sun, Jun 13, 2021 at 10:01 PM Felipe Gutierrez
>> >>> >> >>>> <felipe.o.gutier...@gmail.com> wrote:
>> >>> >> >>>> >
>> >>> >> >>>> > I just realised that only the ProcessFunction is enough. I
>> >>> >> >>>> > don't need the CheckpointedFunction.
>> >>> >> >>>> >
>> >>> >> >>>> >
>> >>> >> >>>> > On Fri, 11 Jun 2021, 18:11 Felipe Gutierrez,
>> >>> >> >>>> > <felipe.o.gutier...@gmail.com> wrote:
>> >>> >> >>>> >>
>> >>> >> >>>> >> Cool!
>> >>> >> >>>> >>
>> >>> >> >>>> >> I did using this example
>> >>> >> >>>> >> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/state.html#using-managed-operator-state
>> >>> >> >>>> >> because I don't have a keyed stream on the specific operator
>> >>> >> >>>> >> that I want to count the number of restarts. (yes I am using
>> >>> >> >>>> >> version 1.4 unfortunately).
>> >>> >> >>>> >>
>> >>> >> >>>> >> Because I need to test it in an integration test I am using a
>> >>> >> >>>> >> side output
>> >>> >> >>>> >> (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html)
>> >>> >> >>>> >> to attach a sink. I am not sure if you have a better idea to
>> >>> >> >>>> >> test the restarts on an integration test. If you have a
>> >>> >> >>>> >> simple idea please tell me :). This was the way that I
>> >>> >> >>>> >> solved....
>> >>> >> >>>> >>
>> >>> >> >>>> >> Thanks
>> >>> >> >>>> >> Felipe
>> >>> >> >>>> >>
>> >>> >> >>>> >> --
>> >>> >> >>>> >> -- Felipe Gutierrez
>> >>> >> >>>> >> -- skype: felipe.o.gutierrez
>> >>> >> >>>> >>
>> >>> >> >>>> >>
>> >>> >> >>>> >> On Thu, Jun 10, 2021 at 5:41 PM Roman Khachatryan
>> >>> >> >>>> >> <ro...@apache.org> wrote:
>> >>> >> >>>> >>>
>> >>> >> >>>> >>> Hi Felipe,
>> >>> >> >>>> >>>
>> >>> >> >>>> >>> You can use getRuntimeContext().getAttemptNumber() [1] (but
>> >>> >> >>>> >>> beware
>> >>> >> >>>> >>> that depending on the configuration only a pipeline region
>> >>> >> >>>> >>> can be
>> >>> >> >>>> >>> restarted, not the whole job).
>> >>> >> >>>> >>>
>> >>> >> >>>> >>> But if all you want is to check whether it's a first attempt
>> >>> >> >>>> >>> or not,
>> >>> >> >>>> >>> you can also call context.isRestored() from
>> >>> >> >>>> >>> initializeState() [2]
>> >>> >> >>>> >>>
>> >>> >> >>>> >>> [1]
>> >>> >> >>>> >>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/api/common/functions/RuntimeContext.html#getAttemptNumber--
>> >>> >> >>>> >>>
>> >>> >> >>>> >>> [2]
>> >>> >> >>>> >>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--
>> >>> >> >>>> >>>
>> >>> >> >>>> >>> Regards,
>> >>> >> >>>> >>> Roman
>> >>> >> >>>> >>>
>> >>> >> >>>> >>>
>> >>> >> >>>> >>> On Thu, Jun 10, 2021 at 5:30 PM Felipe Gutierrez
>> >>> >> >>>> >>> <felipe.o.gutier...@gmail.com> wrote:
>> >>> >> >>>> >>> >
>> >>> >> >>>> >>> > Hello community,
>> >>> >> >>>> >>> >
>> >>> >> >>>> >>> > Is it possible to know programmatically how many times my
>> >>> >> >>>> >>> > Flink stream job restarted since it was running?
>> >>> >> >>>> >>> >
>> >>> >> >>>> >>> > My use case is like this. I have a unit test that uses
>> >>> >> >>>> >>> > checkpoint and I throw one exception in a MapFunction for
>> >>> >> >>>> >>> > a given time, i.e.: for the 2 seconds ahead. Because Flink
>> >>> >> >>>> >>> > restarts the job and I have checkpoint I can recover the
>> >>> >> >>>> >>> > state and after 2 seconds I don't throw any exception
>> >>> >> >>>> >>> > anymore. Then I would like to know how many times the job
>> >>> >> >>>> >>> > was restarted.
>> >>> >> >>>> >>> >
>> >>> >> >>>> >>> > Thanks,
>> >>> >> >>>> >>> > Felipe
>> >>> >> >>>> >>> >
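
For readers of the thread, here is a toy model of the point made in the most recent reply: a restart only finds state to restore if a checkpoint completed before the failure. It is plain Java with no Flink dependency and is not how Flink is implemented. RestartModel, run, and the parameters are hypothetical names for illustration, and the "checkpoint every N elements" rule is an assumption standing in for Flink's time-based checkpoint interval.

```java
/** Toy model, not Flink code: one task that snapshots its position and fails once. */
final class RestartModel {

    /** Outcome of a simulated run. */
    static final class Result {
        final int attempts;          // task attempts, counting the first one
        final boolean everRestored;  // true if a restart found a checkpoint to restore

        Result(int attempts, boolean everRestored) {
            this.attempts = attempts;
            this.everRestored = everRestored;
        }
    }

    /**
     * Processes elements 0..total-1, completing a checkpoint after every
     * checkpointEvery elements (assumption: count-based, unlike Flink).
     * The task fails exactly once, the first time it reaches element failAt.
     */
    static Result run(int total, int checkpointEvery, int failAt) {
        if (checkpointEvery <= 0) {
            throw new IllegalArgumentException("checkpointEvery must be positive");
        }
        int checkpoint = -1;            // next element to process per last checkpoint, -1 = none
        boolean alreadyFailed = false;
        boolean everRestored = false;
        int attempts = 0;
        while (true) {
            attempts++;
            int start = 0;
            if (attempts > 1 && checkpoint >= 0) {
                everRestored = true;    // plays the role of isRestored() == true
                start = checkpoint;
            }
            boolean crashed = false;
            for (int i = start; i < total; i++) {
                if (i == failAt && !alreadyFailed) {
                    alreadyFailed = true;
                    crashed = true;
                    break;
                }
                if ((i + 1) % checkpointEvery == 0) {
                    checkpoint = i + 1;
                }
            }
            if (!crashed) {
                return new Result(attempts, everRestored);
            }
        }
    }

    public static void main(String[] args) {
        Result late = run(10, 3, 7);   // failure after two checkpoints completed
        Result early = run(10, 3, 1);  // failure before the first checkpoint
        System.out.println("late failure:  attempts=" + late.attempts
                + ", restored=" + late.everRestored);
        System.out.println("early failure: attempts=" + early.attempts
                + ", restored=" + early.everRestored);
    }
}
```

In this model both runs restart once, so an attempt counter sees the restart either way, while the "restored" flag depends on whether a checkpoint happened to precede the failure. That is one way to read the suggestion that getRuntimeContext().getAttemptNumber() is the more reliable signal in a short-lived test.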