Ummm, OK. You are using "getRuntimeContext().getAttemptNumber()"; I was using "isRestored()". Now it is counting. Thanks! Felipe
On Fri, Jun 18, 2021 at 10:17 PM Roman Khachatryan <ro...@apache.org> wrote: > > Do you mean inside the processElement() method? > I used a simple mapper with Thread.sleep before ExceptionSimulatorProcess. > > > What is the 0ms pause? Do you mean > env.getCheckpointConfig().setMinPauseBetweenCheckpoints(0); ? > Yes, env.getCheckpointConfig().setMinPauseBetweenCheckpoints(0); > > > How do you create a random exception? Do you mean not my > SimulatedException? > I mean it should be thrown at random, because the checkpoint must > reliably precede it, so that on recovery there is some state. Checking > against 6666... only once assumes that the checkpoint was triggered > before. Besides, a checkpoint is not guaranteed to be triggered before > the end of input. > I tried to run it with Maven and it worked after making the source > infinite. > > From the code you provided, the parallelism level doesn't seem > important and can be set to 1 (or the restart strategy to full). Then using > getRuntimeContext().getAttemptNumber() would be simpler and more > reliable. > > Regards, > Roman > > On Fri, Jun 18, 2021 at 6:23 PM Felipe Gutierrez > <felipe.o.gutier...@gmail.com> wrote: > > > > > > > > On Fri, Jun 18, 2021 at 5:40 PM Roman Khachatryan <ro...@apache.org> > wrote: > >> > >> I tried to run the test that you mentioned > >> (WordCountFilterQEPTest#integrationTestWithPoisonPillRecovery) in rev. > >> 6f08d0a. > >> > >> In the IDE, I see that: > >> - a checkpoint is never triggered (the sentence is too short; the checkpoint > >> pause and interval are too large) > >> - the exception is never thrown, so the job never restarts > >> (currentTimeMillis is incremented but referenceTimeMillisAhead is not) > >> > >> When I add a sleep between elements, set a 10ms interval and a 0ms pause, > >> and introduce some random exception, I do see > > > > > > Do you mean inside the processElement() method? > > > > What is the 0ms pause? Do you mean > env.getCheckpointConfig().setMinPauseBetweenCheckpoints(0); ? 
> > > > How do you create a random exception? Do you mean not my > SimulatedException? > > > > With the configuration I just described, it still does not work for me. I > am testing in the terminal with "mvn > -Dtest=WordCountFilterQEPTest#integrationTestWithPoisonPillRecovery test". > In IntelliJ I get the error "Caused by: java.lang.NoSuchFieldError: > latencyTrackingConfigBuilder" when I call "env.execute();" > > > > org.apache.flink.runtime.client.JobExecutionException: Job execution > failed. > > at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147) > > at > org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:119) > > at > java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) > > at > java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628) > > at > java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996) > > at > org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:117) > > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1842) > > at > org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:70) > > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1822) > > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1804) > > at > org.sense.flink.examples.stream.edgent.WordCountFilterQEPTest.integrationTestWithPoisonPillRecovery(WordCountFilterQEPTest.java:210) > > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > > at 
java.lang.reflect.Method.invoke(Method.java:498) > > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > > at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > > at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > > at org.junit.runners.ParentRunner.run(ParentRunner.java:413) > > at org.junit.runner.JUnitCore.run(JUnitCore.java:137) > > at > com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69) > > at > com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33) > > at > com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:221) > > at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54) > > Caused by: java.lang.NoSuchFieldError: latencyTrackingConfigBuilder > > > > > > > >> > >> 2021-06-18 17:37:53,241 INFO > >> org.sense.flink.examples.stream.edgent.ExceptionSimulatorProcess - > >> Attempts restart: 1 > >> in the logs. 
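Roman's diagnosis is that, with a short input and a large checkpoint interval and pause, no checkpoint completes before the exception, so a restarted attempt has nothing to restore. The sketch below is a plain-Java timing model of that situation, not Flink code; the class and method names are hypothetical, and it assumes that checkpoints fire every `intervalMillis` starting at `t = intervalMillis`, complete instantly, and that each element takes `perElementMillis` to process. Minimum pause, checkpoint duration and end of input are deliberately ignored.

```java
/**
 * Hypothetical plain-Java model (not Flink API). Assumptions: each element takes
 * perElementMillis to process, the failure happens when element failAtElement
 * arrives, checkpoints fire every intervalMillis starting at t = intervalMillis
 * and complete instantly. Min pause, checkpoint duration and end of input are ignored.
 */
final class CheckpointTimingModel {

    private CheckpointTimingModel() {}

    /** Number of checkpoints that completed strictly before the failure. */
    static long checkpointsBeforeFailure(long perElementMillis, long failAtElement, long intervalMillis) {
        if (perElementMillis < 0 || failAtElement < 0 || intervalMillis <= 0) {
            throw new IllegalArgumentException("invalid timing parameters");
        }
        long failureTime = perElementMillis * failAtElement;
        if (failureTime == 0) {
            return 0; // failed immediately: no checkpoint could have completed
        }
        // Checkpoints fire at k * intervalMillis (k >= 1); count those with k * intervalMillis < failureTime.
        return (failureTime - 1) / intervalMillis;
    }

    /** A restarted attempt only finds state to restore if some checkpoint completed first. */
    static boolean hasStateToRestore(long perElementMillis, long failAtElement, long intervalMillis) {
        return checkpointsBeforeFailure(perElementMillis, failAtElement, intervalMillis) > 0;
    }
}
```

With no delay between elements the model reports no completed checkpoint regardless of the interval, which matches what Roman saw in the IDE; adding a per-element sleep and a 10ms interval makes a restorable checkpoint before the failure very likely.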
> >> > >> These settings probably differ on the cluster, and there is some > >> unrelated exception that causes a restart. > >> > >> Regards, > >> Roman > >> > >> On Fri, Jun 18, 2021 at 12:20 PM Felipe Gutierrez > >> <felipe.o.gutier...@gmail.com> wrote: > >> > > >> > I investigated a little bit more. I created the same POC on Flink > version 1.13. I have this ProcessFunction in which I want to count how many times > it recovers. I tested with ListState and ValueState, and it seems that > during the integration test (only in the integration test) the process > hangs in the open() or initializeState() method. > >> > > >> > > https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/main/java/org/sense/flink/examples/stream/edgent/ExceptionSimulatorProcess.java > >> > > >> > Here is my integration test: > https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/test/java/org/sense/flink/examples/stream/edgent/WordCountFilterQEPTest.java#L154 > >> > > >> > If I comment out the ListState or the ValueState, the > integration test works. It first throws an exception, and I see it in the > output. Then, after 2 seconds, it no longer throws exceptions. Then I compare > the outputs and they match. But I have no way to count how many > restarts happened during the integration test. I can only count the > restarts when I run the application on a stand-alone Flink cluster. > >> > > >> > I don't know. Maybe I have to configure some parameters to work > with state in integration tests? > >> > > >> > -- > >> > -- Felipe Gutierrez > >> > -- skype: felipe.o.gutierrez > >> > > >> > > >> > On Fri, Jun 18, 2021 at 9:31 AM Felipe Gutierrez < > felipe.o.gutier...@gmail.com> wrote: > >> >> > >> >> No, it didn't work.
> >> >> > >> >> "context.isRestored()" returns true when I run the application > on the Flink standalone cluster and it is recovering after a failure. When > I do the same in an integration test, it does not return true after a > failure. I mean, I can log the exception that causes the failure, and > initializeState() is called after the failure, but context.isRestored() > is false again. I also tried to initialize the state to 0 on the first attempt, "if > (!context.isRestored()) { restartsState.add(0L); }", and it does not work. > >> >> I think the problem is neither the ListState that I am using nor > context.isRestored(). It is > "context.getOperatorStateStore()", which is always null, but only in integration > tests. Using the code below, I see "restarts: 0" twice in the logs, > before and after the failure. > >> >> > >> >> @Override > >> >> public void initializeState(FunctionInitializationContext > context) throws Exception { > >> >> // unit tests do not open the OperatorStateStore > >> >> if (context.getOperatorStateStore() != null) { > >> >> restartsState = > context.getOperatorStateStore().getListState(new > ListStateDescriptor<Long>("restarts", Long.class)); > >> >> > >> >> List<Long> restoreList = > Lists.newArrayList(restartsState.get()); > >> >> if (restoreList == null || restoreList.isEmpty()) { > >> >> restartsState.add(0L); > >> >> LOG.info("restarts: 0"); > >> >> } else { > >> >> Long max = Collections.max(restoreList); > >> >> LOG.info("restarts: " + max); > >> >> restartsState.add(max + 1); > >> >> } > >> >> } > >> >> } > >> >> > >> >> -- > >> >> -- Felipe Gutierrez > >> >> -- skype: felipe.o.gutierrez > >> >> > >> >> > >> >> On Thu, Jun 17, 2021 at 11:17 PM Roman Khachatryan <ro...@apache.org> > wrote: > >> >>> > >> >>> Thanks for sharing, > >> >>> > >> >>> I think the problem is that restartsState is never updated: > >> >>> - on the first attempt, context.isRestored() returns false (and > "never > >> >>> restored" is logged) > >> 
>>> - on subsequent attempts, it again returns false, because the state > >> >>> was never updated before > >> >>> > >> >>> Adding > >> >>> if (!context.isRestored()) { restartsState.add(0L); } > >> >>> should solve the problem > >> >>> (it's also better to use state.update instead of state.add if only > max > >> >>> is needed). > >> >>> > >> >>> Regards, > >> >>> Roman > >> >>> > >> >>> On Thu, Jun 17, 2021 at 5:35 PM Felipe Gutierrez > >> >>> <felipe.o.gutier...@gmail.com> wrote: > >> >>> > > >> >>> > Sure, here it is. Nothing is mocked. I double-checked. > >> >>> > > >> >>> > UnitTestClass {..... > >> >>> > protected static LocalFlinkMiniCluster flink; > >> >>> > > >> >>> > @BeforeClass > >> >>> > public static void prepare() { > >> >>> > flink = new LocalFlinkMiniCluster(getFlinkConfiguration(), > false); > >> >>> > flink.start(); > >> >>> > > >> >>> > TestStreamEnvironment.setAsContext(flink, PARALLELISM); > >> >>> > } > >> >>> > > >> >>> > private static Configuration getFlinkConfiguration() { > >> >>> > Configuration flinkConfig = new Configuration(); > >> >>> > flinkConfig.setInteger("local.number-taskmanager", 1); > >> >>> > flinkConfig.setInteger("taskmanager.numberOfTaskSlots", 8); > >> >>> > flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, > 16L); > >> >>> > flinkConfig.setString("restart-strategy.fixed-delay.delay", > "0 s"); > >> >>> > try { > >> >>> > flinkConfig.setString("state.checkpoints.dir", "file://" > + tempFolder.newFolder().getAbsolutePath()); > >> >>> > } catch (IOException e) { > >> >>> > throw new RuntimeException("error in flink cluster > config", e); > >> >>> > } > >> >>> > return flinkConfig; > >> >>> > } > >> >>> > > >> >>> > > >> >>> > The class that I check if the job was restarted: > >> >>> > > >> >>> > public class ExceptionSimulatorProcessFunction extends > ProcessFunction<Object..., Object...> > >> >>> > implements CheckpointedFunction { > >> >>> > > >> >>> > final OutputTag<Long> outputTag = new > 
OutputTag<Long>("side-output") { > >> >>> > }; > >> >>> > private transient ListState<Long> restartsState; > >> >>> > private Long restartsLocal; > >> >>> > ... > >> >>> > @Override > >> >>> > public void processElement(Object value, Context ctx, > Collector<Object> out) throws Exception { > >> >>> > this.currentTimeMillis = System.currentTimeMillis() - > currentTimeMillisBehind; > >> >>> > > >> >>> > // If the current time is before the reference time ahead > AND we have the poison auction, an exception is thrown > >> >>> > if (this.currentTimeMillis < > this.referenceTimeMillisAhead && > POISON__TRANSACTION_ID.equals(value.toString())) { > >> >>> > > >> >>> > LOG.error("This exception will trigger until the > reference time [{}] reaches the trigger time [{}]", > >> >>> > sdfMillis.format(new > Date(this.currentTimeMillis)), > >> >>> > sdfMillis.format(new > Date(this.referenceTimeMillisAhead))); > >> >>> > > >> >>> > throw new SimulatedException("Transaction ID: " + > value.toString() + > >> >>> > " not allowed. 
This is a simple exception for > testing purposes."); > >> >>> > } > >> >>> > out.collect(value); > >> >>> > > >> >>> > > >> >>> > // counts the restarts > >> >>> > if (restartsState != null) { > >> >>> > List<Long> restoreList = > Lists.newArrayList(restartsState.get()); > >> >>> > Long attemptsRestart = 0L; > >> >>> > if (restoreList != null && !restoreList.isEmpty()) { > >> >>> > attemptsRestart = Collections.max(restoreList); > >> >>> > if (restartsLocal < attemptsRestart) { > >> >>> > restartsLocal = attemptsRestart; > >> >>> > ctx.output(outputTag, > Long.valueOf(attemptsRestart)); > >> >>> > } > >> >>> > } > >> >>> > LOG.info("Attempts restart: " + attemptsRestart); > >> >>> > } > >> >>> > } > >> >>> > > >> >>> > @Override > >> >>> > public void snapshotState(FunctionSnapshotContext context) > throws Exception {} > >> >>> > > >> >>> > @Override > >> >>> > public void initializeState(FunctionInitializationContext > context) throws Exception { > >> >>> > restartsState = > context.getOperatorStateStore().getListState(new > ListStateDescriptor<Long>("restarts", Long.class)); > >> >>> > > >> >>> > if (context.isRestored()) { > >> >>> > List<Long> restoreList = > Lists.newArrayList(restartsState.get()); > >> >>> > if (restoreList == null || restoreList.isEmpty()) { > >> >>> > restartsState.add(1L); > >> >>> > LOG.info("restarts: 1"); > >> >>> > } else { > >> >>> > Long max = Collections.max(restoreList); > >> >>> > LOG.info("restarts: " + max); > >> >>> > restartsState.add(max + 1); > >> >>> > } > >> >>> > } else { > >> >>> > LOG.info("restarts: never restored"); > >> >>> > } > >> >>> > } > >> >>> > } > >> >>> > > >> >>> > > >> >>> > > >> >>> > > >> >>> > > >> >>> > > >> >>> > > >> >>> > > >> >>> > On Thu, Jun 17, 2021 at 5:20 PM Roman Khachatryan < > ro...@apache.org> wrote: > >> >>> >> > >> >>> >> Hi, > >> >>> >> > >> >>> >> Could you please share the test code? 
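Roman's explanation of why the counter above never advances (nothing is written on the first attempt, so the next attempt again sees no restored state) can be reproduced with a small plain-Java model. This is a sketch, not Flink code: the class and method names are hypothetical, a `List<Long>` stands in for the `ListState`, and it assumes that "restored" is only observed once a non-empty snapshot exists and that every attempt's state is checkpointed before the next failure. It also applies Roman's remark that keeping only the maximum (update semantics) is enough.

```java
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical plain-Java model (not Flink API) of a restart counter kept in
 * operator ListState. A List<Long> stands in for ListState and "restored" stands
 * in for context.isRestored(). Assumptions: restored is only true once a non-empty
 * snapshot exists, and each attempt's state is checkpointed before it fails.
 */
final class RestartCounterModel {

    private RestartCounterModel() {}

    /** One initializeState() call: takes the restored snapshot, returns the state to checkpoint next. */
    static List<Long> initializeState(List<Long> snapshot, boolean writeZeroOnFirstAttempt) {
        boolean restored = !snapshot.isEmpty();
        if (restored) {
            long max = Collections.max(snapshot);
            // update() semantics: keep only the new maximum instead of appending to the list.
            return Collections.singletonList(max + 1);
        }
        // The original code only logged "never restored" here, so the state stayed empty forever.
        return writeZeroOnFirstAttempt
                ? Collections.singletonList(0L)
                : Collections.<Long>emptyList();
    }

    /** Runs `attempts` attempts (first run plus restarts); returns the counter, or -1 if never written. */
    static long runAttempts(int attempts, boolean writeZeroOnFirstAttempt) {
        List<Long> snapshot = Collections.emptyList();
        for (int i = 0; i < attempts; i++) {
            snapshot = initializeState(snapshot, writeZeroOnFirstAttempt);
        }
        return snapshot.isEmpty() ? -1L : snapshot.get(0);
    }
}
```

Without the first-attempt write, three attempts leave the state empty; with it, three attempts (one run plus two restarts) end with a counter of 2.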
> >> >>> >> > >> >>> >> I think the returned value might depend on the level at which the > >> >>> >> tests are executed. If it's a regular job, then it should return > the > >> >>> >> correct value (as on the cluster). If the environment in which the > code > >> >>> >> is executed is mocked, then it can be false. > >> >>> >> > >> >>> >> Regards, > >> >>> >> Roman > >> >>> >> > >> >>> >> On Thu, Jun 17, 2021 at 4:15 PM Felipe Gutierrez > >> >>> >> <felipe.o.gutier...@gmail.com> wrote: > >> >>> >> > > >> >>> >> > Yes, I have state in the ProcessFunction. I tested it on a > stand-alone cluster and it returns true when the application recovers. > However, in integration tests it does not return true. I am using Flink > 1.4. Do you know where the Flink 1.13 release notes ( > https://flink.apache.org/news/2021/05/03/release-1.13.0.html) say that > `isRestored()` cannot be true in integration tests? > >> >>> >> > > >> >>> >> > -- > >> >>> >> > -- Felipe Gutierrez > >> >>> >> > -- skype: felipe.o.gutierrez > >> >>> >> > > >> >>> >> > > >> >>> >> > On Thu, Jun 17, 2021 at 4:09 PM Arvid Heise <ar...@apache.org> > wrote: > >> >>> >> >> > >> >>> >> >> Does your ProcessFunction have state? If not, it would be in > line with the documentation. > >> >>> >> >> > >> >>> >> >> Also, which Flink version are you using? Before Flink 1.13, > empty state was omitted, so I could imagine that `isRestored()` would return > false, but it should now actually also return true for empty state. > >> >>> >> >> > >> >>> >> >> On Tue, Jun 15, 2021 at 9:02 AM Felipe Gutierrez < > felipe.o.gutier...@gmail.com> wrote: > >> >>> >> >>> > >> >>> >> >>> So, I was trying to improve it by using the > CheckpointedFunction as shown here [1]. But the documentation of the method isRestored() says > [2]: > >> >>> >> >>> > >> >>> >> >>> "Returns true, if state was restored from the snapshot of a > previous execution. This returns always false for stateless tasks."
> >> >>> >> >>> > >> >>> >> >>> It is weird, because I am extending a ProcessFunction, which > is a RichFunction. > >> >>> >> >>> > >> >>> >> >>> public class AuctionExceptionSimulatorProcessFunction > extends ProcessFunction<KeyedReportingData, KeyedReportingData> > >> >>> >> >>> implements CheckpointedFunction { > >> >>> >> >>> ... > >> >>> >> >>> > >> >>> >> >>> In the end, I cannot rely on "isRestored()". Do you know > what could be wrong? I used the same implementation approach as in [1]. > >> >>> >> >>> > >> >>> >> >>> [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#checkpointedfunction > >> >>> >> >>> [2] > https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored-- > >> >>> >> >>> > >> >>> >> >>> > >> >>> >> >>> -- > >> >>> >> >>> -- Felipe Gutierrez > >> >>> >> >>> -- skype: felipe.o.gutierrez > >> >>> >> >>> > >> >>> >> >>> > >> >>> >> >>> On Mon, Jun 14, 2021 at 3:39 PM Roman Khachatryan < > ro...@apache.org> wrote: > >> >>> >> >>>> > >> >>> >> >>>> You can also use accumulators [1] to collect the number of > restarts > >> >>> >> >>>> (and then access it via the client); but side outputs should > work as well. > >> >>> >> >>>> > >> >>> >> >>>> [1] > >> >>> >> >>>> > https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html#accumulators--counters > >> >>> >> >>>> > >> >>> >> >>>> Regards, > >> >>> >> >>>> Roman > >> >>> >> >>>> > >> >>> >> >>>> On Sun, Jun 13, 2021 at 10:01 PM Felipe Gutierrez > >> >>> >> >>>> <felipe.o.gutier...@gmail.com> wrote: > >> >>> >> >>>> > > >> >>> >> >>>> > I just realised that the ProcessFunction alone is enough. > I don't need the CheckpointedFunction. > >> >>> >> >>>> > > >> >>> >> >>>> > > >> >>> >> >>>> > On Fri, 11 Jun 2021, 18:11 Felipe Gutierrez, < > felipe.o.gutier...@gmail.com> wrote: > >> >>> >> >>>> >> > >> >>> >> >>>> >> Cool!
> >> >>> >> >>>> >> > >> >>> >> >>>> >> I did it using this example > https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/state.html#using-managed-operator-state > because I don't have a keyed stream at the specific operator where I want to > count the number of restarts (yes, I am using version 1.4, unfortunately). > >> >>> >> >>>> >> > >> >>> >> >>>> >> Because I need to test it in an integration test, I am > using a side output ( > https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html) > to attach a sink. I am not sure whether you have a better idea for testing the > restarts in an integration test. If you have a simple idea, please tell me > :). This is how I solved it... > >> >>> >> >>>> >> > >> >>> >> >>>> >> Thanks > >> >>> >> >>>> >> Felipe > >> >>> >> >>>> >> > >> >>> >> >>>> >> -- > >> >>> >> >>>> >> -- Felipe Gutierrez > >> >>> >> >>>> >> -- skype: felipe.o.gutierrez > >> >>> >> >>>> >> > >> >>> >> >>>> >> > >> >>> >> >>>> >> On Thu, Jun 10, 2021 at 5:41 PM Roman Khachatryan < > ro...@apache.org> wrote: > >> >>> >> >>>> >>> > >> >>> >> >>>> >>> Hi Felipe, > >> >>> >> >>>> >>> > >> >>> >> >>>> >>> You can use getRuntimeContext().getAttemptNumber() [1] > (but beware > >> >>> >> >>>> >>> that, depending on the configuration, only a pipeline > region may be > >> >>> >> >>>> >>> restarted, not the whole job).
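Roman's caveat that only a pipeline region may be restarted can be pictured with a small plain-Java model (no Flink dependency; the class and method names are hypothetical). It assumes two parallel pipelines that form separate regions and that, as with `getAttemptNumber()`, the first attempt is numbered 0. A failure handled by region failover bumps only the failed region's attempt numbers, so an unaffected subtask would still report 0; a full restart bumps every subtask. This is the reason Roman later suggests parallelism 1 or a full restart strategy.

```java
/**
 * Hypothetical plain-Java model (not Flink API) of per-subtask attempt numbers.
 * Each subtask belongs to a pipeline region; with region failover only the
 * subtasks of the failed region get a new attempt number, while a full restart
 * redeploys every subtask. The first attempt is numbered 0.
 */
final class AttemptNumberModel {

    private final int[] regionOfSubtask;  // regionOfSubtask[i] = region id of subtask i
    private final int[] attemptOfSubtask; // stands in for getRuntimeContext().getAttemptNumber()

    AttemptNumberModel(int... regionOfSubtask) {
        this.regionOfSubtask = regionOfSubtask.clone();
        this.attemptOfSubtask = new int[regionOfSubtask.length]; // all subtasks start at attempt 0
    }

    /** Simulates a failure in failedRegion; fullRestart = true restarts all subtasks. */
    void fail(int failedRegion, boolean fullRestart) {
        for (int i = 0; i < regionOfSubtask.length; i++) {
            if (fullRestart || regionOfSubtask[i] == failedRegion) {
                attemptOfSubtask[i]++;
            }
        }
    }

    int attemptNumber(int subtask) {
        return attemptOfSubtask[subtask];
    }
}
```

For example, `new AttemptNumberModel(0, 1)` followed by `fail(1, false)` leaves subtask 0 at attempt 0 and moves subtask 1 to attempt 1.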
> >> >>> >> >>>> >>> > >> >>> >> >>>> >>> But if all you want is to check whether it's the first > attempt or not, > >> >>> >> >>>> >>> you can also call context.isRestored() from > initializeState() [2]. > >> >>> >> >>>> >>> > >> >>> >> >>>> >>> [1] > >> >>> >> >>>> >>> > https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/api/common/functions/RuntimeContext.html#getAttemptNumber-- > >> >>> >> >>>> >>> > >> >>> >> >>>> >>> [2] > >> >>> >> >>>> >>> > https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored-- > >> >>> >> >>>> >>> > >> >>> >> >>>> >>> Regards, > >> >>> >> >>>> >>> Roman > >> >>> >> >>>> >>> > >> >>> >> >>>> >>> > >> >>> >> >>>> >>> On Thu, Jun 10, 2021 at 5:30 PM Felipe Gutierrez > >> >>> >> >>>> >>> <felipe.o.gutier...@gmail.com> wrote: > >> >>> >> >>>> >>> > > >> >>> >> >>>> >>> > Hello community, > >> >>> >> >>>> >>> > > >> >>> >> >>>> >>> > Is it possible to find out programmatically how many > times my Flink streaming job has restarted since it started running? > >> >>> >> >>>> >>> > > >> >>> >> >>>> >>> > My use case is as follows. I have a unit test that > uses checkpointing, and I throw an exception in a MapFunction for a given > time, i.e., for the next 2 seconds. Because Flink restarts the job and I > have checkpointing enabled, I can recover the state, and after 2 seconds I don't throw > any exceptions anymore. Then I would like to know how many times the job was > restarted. > >> >>> >> >>>> >>> > > >> >>> >> >>>> >>> > Thanks, > >> >>> >> >>>> >>> > Felipe > >> >>> >> >>>> >>> > >
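The original setup (throw for the next 2 seconds, then stop) can be sketched as a plain-Java timing model to show how the number of restarts depends on timing rather than on the logic under test. This is not Flink code; the class and method names are hypothetical, and it assumes that each attempt reaches the poison element `processingMillis` after it starts, that a fixed delay of `restartDelayMillis` separates attempts (the thread's test configuration uses `restart-strategy.fixed-delay.delay` of "0 s"), and that the deadline is measured from the first start and not reset on restart.

```java
/**
 * Hypothetical plain-Java model (not Flink API) of the "throw for the next N ms"
 * test. Assumptions: each attempt reaches the poison element processingMillis
 * after it starts, a fixed restart delay separates attempts, and the deadline is
 * measured from the first start (it is not reset on restart).
 */
final class TimedFailureModel {

    private TimedFailureModel() {}

    /** Number of restarts until an attempt reaches the poison element at or after the deadline. */
    static int restartsUntilDeadline(long deadlineMillis, long processingMillis, long restartDelayMillis) {
        if (processingMillis < 0 || restartDelayMillis < 0 || processingMillis + restartDelayMillis == 0) {
            throw new IllegalArgumentException("time must advance between attempts");
        }
        long attemptStart = 0;
        int restarts = 0;
        // An attempt fails if it hits the poison element strictly before the deadline.
        while (attemptStart + processingMillis < deadlineMillis) {
            attemptStart += processingMillis + restartDelayMillis;
            restarts++;
        }
        return restarts;
    }
}
```

With a 2000 ms window and 100 ms per attempt, the model gives 19 restarts with no delay but only 4 with a 400 ms delay, so asserting an exact count against wall-clock time is fragile. A fixed-delay restart strategy also caps the number of attempts (`restart-strategy.fixed-delay.attempts`), so a long enough window could fail the job outright.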