Just to clarify, I was using isRestored() but I think
getAttemptNumber() should be simpler.

Regards,
Roman
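A minimal plain-Java sketch of the getAttemptNumber() approach suggested above. It is not code from this thread, and it has no Flink dependency so it can run on its own. The failure is gated on the attempt number instead of on restored state, so the job fails exactly once, and the attempt number of the run that finishes equals the number of restarts. The class `AttemptGatedFailure` and its `failAfterElements` parameter are hypothetical. In a real function, the attempt number would come from `getRuntimeContext().getAttemptNumber()` (0 for the first attempt). This only counts restarts reliably when the whole job is restarted (parallelism 1 or a full restart strategy, as noted further down in the thread).

```java
/**
 * Hypothetical failure-injection policy, modelled in plain Java (no Flink).
 * In a Flink function, attemptNumber would be getRuntimeContext().getAttemptNumber(),
 * which is 0 for the first execution attempt of a subtask.
 */
final class AttemptGatedFailure {
    private final long failAfterElements;

    AttemptGatedFailure(long failAfterElements) {
        this.failAfterElements = failAfterElements;
    }

    /** Fail only during the first attempt, once enough elements have been processed. */
    boolean shouldFail(int attemptNumber, long elementsSeen) {
        return attemptNumber == 0 && elementsSeen >= failAfterElements;
    }

    /**
     * Simulates a job that is fully restarted after every failure and returns the
     * attempt number of the run that finished, i.e. the number of restarts.
     */
    int runUntilSuccess(long totalElements) {
        int attempt = 0;
        while (true) {
            boolean failed = false;
            for (long seen = 1; seen <= totalElements; seen++) {
                if (shouldFail(attempt, seen)) {
                    failed = true; // stands in for throwing SimulatedException
                    break;
                }
            }
            if (!failed) {
                return attempt;
            }
            attempt++; // the restarted attempt sees a higher attempt number
        }
    }
}
```

With `new AttemptGatedFailure(100)`, `runUntilSuccess(1000)` returns 1. `runUntilSuccess(50)` returns 0 because the input ends before the failure point, which mirrors how a finite input can end before the injected exception is ever thrown.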


On Mon, Jun 21, 2021 at 10:30 AM Felipe Gutierrez
<felipe.o.gutier...@gmail.com> wrote:
>
> Ummm, OK. You are using "getRuntimeContext().getAttemptNumber()"; I was
> using "isRestored()". Now it is counting.
> Thanks!
> Felipe
>
>
> On Fri, Jun 18, 2021 at 10:17 PM Roman Khachatryan <ro...@apache.org> wrote:
>>
>> > do you mean inside the processElement() method?
>> I used a simple mapper with Thread.sleep before ExceptionSimulatorProcess.
>>
>> > what is 0ms pause? do you mean 
>> > env.getCheckpointConfig().setMinPauseBetweenCheckpoints(0); ?
>> Yes, env.getCheckpointConfig().setMinPauseBetweenCheckpoints(0);
>>
>> > How do you create a random exception? Do you mean not my
>> > SimulatedException?
>> I mean it should be thrown at random, because the checkpoint must
>> reliably precede it, so that there is some state on recovery. Checking
>> against 6666... only once assumes that a checkpoint was triggered
>> before. Besides, a checkpoint is not guaranteed to be triggered before
>> the end of input.
>> I tried running it with Maven, and it worked after making the source
>> infinite.
>>
>> From the code you provided, the parallelism level doesn't seem
>> important and can be set to 1 (or the restart strategy set to full).
>> Then using getRuntimeContext().getAttemptNumber() would be simpler and
>> more reliable.
>>
>> Regards,
>> Roman
>>
>> On Fri, Jun 18, 2021 at 6:23 PM Felipe Gutierrez
>> <felipe.o.gutier...@gmail.com> wrote:
>> >
>> >
>> >
>> > On Fri, Jun 18, 2021 at 5:40 PM Roman Khachatryan <ro...@apache.org> wrote:
>> >>
>> >> I tried to run the test that you mentioned
>> >> (WordCountFilterQEPTest#integrationTestWithPoisonPillRecovery) in rev.
>> >> 6f08d0a.
>> >>
>> >> In the IDE, I see that:
>> >> - a checkpoint is never triggered (the sentence is too short, and the
>> >> checkpoint pause and interval are too large)
>> >> - the exception is never thrown, so the job is never restarted
>> >> (currentTimeMillis is incremented but referenceTimeMillisAhead is not)
>> >>
>> >> When I add a sleep between elements, set a 10 ms interval and a 0 ms
>> >> pause, and introduce a random exception, I do see
>> >
>> >
>> > do you mean inside the processElement() method?
>> >
>> > what is 0ms pause? do you mean 
>> > env.getCheckpointConfig().setMinPauseBetweenCheckpoints(0); ?
>> >
>> > How do you create a random exception? Do you mean not my
>> > SimulatedException?
>> >
>> > With the settings I just mentioned, it is still not working for me. I am
>> > testing from the terminal with
>> > "mvn -Dtest=WordCountFilterQEPTest#integrationTestWithPoisonPillRecovery test".
>> > In IntelliJ I get the error "Caused by: java.lang.NoSuchFieldError:
>> > latencyTrackingConfigBuilder" when I call "env.execute();":
>> >
>> > org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>> > at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
>> > at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:119)
>> > at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
>> > at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628)
>> > at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996)
>> > at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:117)
>> > at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1842)
>> > at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:70)
>> > at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1822)
>> > at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1804)
>> > at org.sense.flink.examples.stream.edgent.WordCountFilterQEPTest.integrationTestWithPoisonPillRecovery(WordCountFilterQEPTest.java:210)
>> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> > at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> > at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> > at java.lang.reflect.Method.invoke(Method.java:498)
>> > at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>> > at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>> > at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>> > at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>> > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>> > at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>> > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>> > at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>> > at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>> > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>> > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>> > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>> > at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>> > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>> > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>> > at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>> > at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>> > at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
>> > at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
>> > at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:221)
>> > at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
>> > Caused by: java.lang.NoSuchFieldError: latencyTrackingConfigBuilder
>> >
>> >
>> >
>> >>
>> >> 2021-06-18 17:37:53,241 INFO  org.sense.flink.examples.stream.edgent.ExceptionSimulatorProcess  - Attempts restart: 1
>> >> in the logs.
>> >>
>> >> These settings probably differ on the cluster and there is some
>> >> unrelated exception which causes a restart.
>> >>
>> >> Regards,
>> >> Roman
>> >>
>> >> On Fri, Jun 18, 2021 at 12:20 PM Felipe Gutierrez
>> >> <felipe.o.gutier...@gmail.com> wrote:
>> >> >
>> >> > I investigated a little bit more. I created the same POC on Flink
>> >> > 1.13. I have this ProcessFunction where I want to count the
>> >> > times it recovers. I tested with ListState and ValueState, and it seems
>> >> > that during the integration test (only in the integration test) the
>> >> > process hangs in the open() or initializeState() methods.
>> >> >
>> >> > https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/main/java/org/sense/flink/examples/stream/edgent/ExceptionSimulatorProcess.java
>> >> >
>> >> > Here is my integration test: 
>> >> > https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/test/java/org/sense/flink/examples/stream/edgent/WordCountFilterQEPTest.java#L154
>> >> >
>> >> > If I comment out the ListState or the ValueState, the integration
>> >> > test works. It first throws an exception and I see it in the output.
>> >> > Then, after 2 seconds, it no longer throws exceptions. Then I compare the
>> >> > output and it matches. But I have no way to count how many
>> >> > restarts happened during the integration test. I can only count the
>> >> > restarts when I run the application on a standalone Flink cluster.
>> >> >
>> >> > I don't know. Maybe I have to configure some parameters to work
>> >> > with state in integration tests?
>> >> >
>> >> > --
>> >> > -- Felipe Gutierrez
>> >> > -- skype: felipe.o.gutierrez
>> >> >
>> >> >
>> >> > On Fri, Jun 18, 2021 at 9:31 AM Felipe Gutierrez 
>> >> > <felipe.o.gutier...@gmail.com> wrote:
>> >> >>
>> >> >> No, it didn't work.
>> >> >>
>> >> >> "context.isRestored()" returns true when I run the application on
>> >> >> the Flink standalone cluster and it is recovering after a failure.
>> >> >> When I do the same in an integration test, it does not return true
>> >> >> after a failure. I mean, I can log the exception that is causing the
>> >> >> failure, and initializeState() is called after the failure, but
>> >> >> context.isRestored() is false again. I also tried initializing the state
>> >> >> to 0 on the first attempt with "if (!context.isRestored()) { restartsState.add(0L); }",
>> >> >> and it does not work.
>> >> >> I think the problem is neither the ListState that I am using nor
>> >> >> context.isRestored(). It is "context.getOperatorStateStore()", which
>> >> >> is always null, but only in integration tests. Using the code below I
>> >> >> see "restarts: 0" in the logs twice, before and after the failure.
>> >> >>
>> >> >>     @Override
>> >> >>     public void initializeState(FunctionInitializationContext context) throws Exception {
>> >> >>         // unit tests do not open the OperatorStateStore
>> >> >>         if (context.getOperatorStateStore() != null) {
>> >> >>             restartsState = context.getOperatorStateStore().getListState(
>> >> >>                     new ListStateDescriptor<Long>("restarts", Long.class));
>> >> >>
>> >> >>             List<Long> restoreList = Lists.newArrayList(restartsState.get());
>> >> >>             if (restoreList == null || restoreList.isEmpty()) {
>> >> >>                 restartsState.add(0L);
>> >> >>                 LOG.info("restarts: 0");
>> >> >>             } else {
>> >> >>                 Long max = Collections.max(restoreList);
>> >> >>                 LOG.info("restarts: " + max);
>> >> >>                 restartsState.add(max + 1);
>> >> >>             }
>> >> >>         }
>> >> >>     }
>> >> >>
>> >> >> --
>> >> >> -- Felipe Gutierrez
>> >> >> -- skype: felipe.o.gutierrez
>> >> >>
>> >> >>
>> >> >> On Thu, Jun 17, 2021 at 11:17 PM Roman Khachatryan <ro...@apache.org> 
>> >> >> wrote:
>> >> >>>
>> >> >>> Thanks for sharing,
>> >> >>>
>> >> >>> I think the problem is that restartsState is never updated:
>> >> >>> - on the first attempt, context.isRestored() returns false (and "never
>> >> >>> restored" is logged)
>> >> >>> - on subsequent attempts, it again returns false, because the state
>> >> >>> was never updated before
>> >> >>>
>> >> >>> Adding
>> >> >>> if (!context.isRestored()) { restartsState.add(0L); }
>> >> >>> should solve the problem
>> >> >>> (it's also better to use state.update instead of state.add if only max
>> >> >>> is needed).
>> >> >>>
>> >> >>> Regards,
>> >> >>> Roman
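The diagnosis above can be modelled in plain Java. This is a sketch under stated assumptions, not Flink code: a `List<Long>` stands in for the operator `ListState`, `checkpoint()` stands in for a completed checkpoint, and `initialize()` stands in for `initializeState()`, with the simplification that a restart counts as "restored" only if a checkpoint completed before the failure. It applies the suggested fix (write 0 on the first attempt) and keeps a single element, which is the effect `update` would have instead of repeated `add`. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Plain-Java model (no Flink) of counting restarts in operator state.
 * liveState stands in for ListState<Long>; lastCheckpoint for the latest completed checkpoint.
 * Simplification: a restart counts as "restored" only if a checkpoint completed earlier.
 */
final class RestartCounterModel {
    private List<Long> liveState = new ArrayList<>();
    private List<Long> lastCheckpoint = null; // null: no checkpoint has completed yet

    /** Mirrors initializeState(): returns the restart count for the new attempt. */
    long initialize() {
        boolean restored = lastCheckpoint != null; // plays the role of context.isRestored()
        liveState = restored ? new ArrayList<>(lastCheckpoint) : new ArrayList<>();

        long restarts;
        if (!restored) {
            restarts = 0L; // the suggested fix: always write an initial value
        } else if (liveState.isEmpty()) {
            restarts = 1L;
        } else {
            restarts = Collections.max(liveState) + 1;
        }

        // Replace the contents instead of appending, like ListState.update(...)
        liveState.clear();
        liveState.add(restarts);
        return restarts;
    }

    /** Mirrors a completed checkpoint: snapshot the current state. */
    void checkpoint() {
        lastCheckpoint = new ArrayList<>(liveState);
    }
}
```

Calling `initialize()` twice without a `checkpoint()` in between returns 0 both times, which matches seeing "restarts: 0" logged before and after a failure when no checkpoint completed first. With a `checkpoint()` before each failure, the count goes 0, 1, 2.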
>> >> >>>
>> >> >>> On Thu, Jun 17, 2021 at 5:35 PM Felipe Gutierrez
>> >> >>> <felipe.o.gutier...@gmail.com> wrote:
>> >> >>> >
>> >> >>> > Sure, here it is. Nothing is mocked. I double-checked.
>> >> >>> >
>> >> >>> > UnitTestClass {.....
>> >> >>> > protected static LocalFlinkMiniCluster flink;
>> >> >>> >
>> >> >>> > @BeforeClass
>> >> >>> > public static void prepare() {
>> >> >>> >     flink = new LocalFlinkMiniCluster(getFlinkConfiguration(), false);
>> >> >>> >     flink.start();
>> >> >>> >
>> >> >>> >     TestStreamEnvironment.setAsContext(flink, PARALLELISM);
>> >> >>> > }
>> >> >>> >
>> >> >>> > private static Configuration getFlinkConfiguration() {
>> >> >>> >     Configuration flinkConfig = new Configuration();
>> >> >>> >     flinkConfig.setInteger("local.number-taskmanager", 1);
>> >> >>> >     flinkConfig.setInteger("taskmanager.numberOfTaskSlots", 8);
>> >> >>> >     flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
>> >> >>> >     flinkConfig.setString("restart-strategy.fixed-delay.delay", "0 s");
>> >> >>> >     try {
>> >> >>> >         flinkConfig.setString("state.checkpoints.dir",
>> >> >>> >                 "file://" + tempFolder.newFolder().getAbsolutePath());
>> >> >>> >     } catch (IOException e) {
>> >> >>> >         throw new RuntimeException("error in flink cluster config", e);
>> >> >>> >     }
>> >> >>> >     return flinkConfig;
>> >> >>> > }
>> >> >>> >
>> >> >>> >
>> >> >>> > The class that I check if the job was restarted:
>> >> >>> >
>> >> >>> > public class ExceptionSimulatorProcessFunction extends ProcessFunction<Object..., Object...>
>> >> >>> >         implements CheckpointedFunction {
>> >> >>> >
>> >> >>> >     final OutputTag<Long> outputTag = new OutputTag<Long>("side-output") {
>> >> >>> >     };
>> >> >>> >     private transient ListState<Long> restartsState;
>> >> >>> >     private Long restartsLocal;
>> >> >>> >     ...
>> >> >>> >     @Override
>> >> >>> >     public void processElement(Object value, Context ctx, Collector<Object> out) throws Exception {
>> >> >>> >         this.currentTimeMillis = System.currentTimeMillis() - currentTimeMillisBehind;
>> >> >>> >
>> >> >>> >         // If the current time is less than the reference time ahead AND the value is
>> >> >>> >         // the poison transaction ID, an exception is thrown
>> >> >>> >         if (this.currentTimeMillis < this.referenceTimeMillisAhead
>> >> >>> >                 && POISON__TRANSACTION_ID.equals(value.toString())) {
>> >> >>> >
>> >> >>> >             LOG.error("This exception will trigger until the reference time [{}] reaches the trigger time [{}]",
>> >> >>> >                     sdfMillis.format(new Date(this.currentTimeMillis)),
>> >> >>> >                     sdfMillis.format(new Date(this.referenceTimeMillisAhead)));
>> >> >>> >
>> >> >>> >             throw new SimulatedException("Transaction ID: " + value.toString() +
>> >> >>> >                     " not allowed. This is a simple exception for testing purposes.");
>> >> >>> >         }
>> >> >>> >         out.collect(value);
>> >> >>> >
>> >> >>> >
>> >> >>> >         // counts the restarts
>> >> >>> >         if (restartsState != null) {
>> >> >>> >             List<Long> restoreList = Lists.newArrayList(restartsState.get());
>> >> >>> >             Long attemptsRestart = 0L;
>> >> >>> >             if (restoreList != null && !restoreList.isEmpty()) {
>> >> >>> >                 attemptsRestart = Collections.max(restoreList);
>> >> >>> >                 if (restartsLocal < attemptsRestart) {
>> >> >>> >                     restartsLocal = attemptsRestart;
>> >> >>> >                     ctx.output(outputTag, Long.valueOf(attemptsRestart));
>> >> >>> >                 }
>> >> >>> >             }
>> >> >>> >             LOG.info("Attempts restart: " + attemptsRestart);
>> >> >>> >         }
>> >> >>> >     }
>> >> >>> >
>> >> >>> >     @Override
>> >> >>> >     public void snapshotState(FunctionSnapshotContext context) throws Exception {}
>> >> >>> >
>> >> >>> >     @Override
>> >> >>> >     public void initializeState(FunctionInitializationContext context) throws Exception {
>> >> >>> >         restartsState = context.getOperatorStateStore().getListState(
>> >> >>> >                 new ListStateDescriptor<Long>("restarts", Long.class));
>> >> >>> >
>> >> >>> >         if (context.isRestored()) {
>> >> >>> >             List<Long> restoreList = Lists.newArrayList(restartsState.get());
>> >> >>> >             if (restoreList == null || restoreList.isEmpty()) {
>> >> >>> >                 restartsState.add(1L);
>> >> >>> >                 LOG.info("restarts: 1");
>> >> >>> >             } else {
>> >> >>> >                 Long max = Collections.max(restoreList);
>> >> >>> >                 LOG.info("restarts: " + max);
>> >> >>> >                 restartsState.add(max + 1);
>> >> >>> >             }
>> >> >>> >         } else {
>> >> >>> >             LOG.info("restarts: never restored");
>> >> >>> >         }
>> >> >>> >     }
>> >> >>> > }
>> >> >>> >
>> >> >>> >
>> >> >>> >
>> >> >>> >
>> >> >>> >
>> >> >>> >
>> >> >>> >
>> >> >>> >
>> >> >>> > On Thu, Jun 17, 2021 at 5:20 PM Roman Khachatryan 
>> >> >>> > <ro...@apache.org> wrote:
>> >> >>> >>
>> >> >>> >> Hi,
>> >> >>> >>
>> >> >>> >> Could you please share the test code?
>> >> >>> >>
>> >> >>> >> I think the returned value might depend on the level at which the
>> >> >>> >> tests are executed. If it's a regular job, it should return the
>> >> >>> >> correct value (as on a cluster). If the environment in which the code
>> >> >>> >> is executed is mocked, it can be false.
>> >> >>> >>
>> >> >>> >> Regards,
>> >> >>> >> Roman
>> >> >>> >>
>> >> >>> >> On Thu, Jun 17, 2021 at 4:15 PM Felipe Gutierrez
>> >> >>> >> <felipe.o.gutier...@gmail.com> wrote:
>> >> >>> >> >
>> >> >>> >> > Yes, I have state in the ProcessFunction. I tested it on a
>> >> >>> >> > standalone cluster and it returns true when the application
>> >> >>> >> > recovers. However, in integration tests it does not return
>> >> >>> >> > true. I am using Flink 1.4. Do you know where the Flink 1.13
>> >> >>> >> > release announcement
>> >> >>> >> > (https://flink.apache.org/news/2021/05/03/release-1.13.0.html)
>> >> >>> >> > says that `isRestored()` cannot be true in integration
>> >> >>> >> > tests?
>> >> >>> >> >
>> >> >>> >> > --
>> >> >>> >> > -- Felipe Gutierrez
>> >> >>> >> > -- skype: felipe.o.gutierrez
>> >> >>> >> >
>> >> >>> >> >
>> >> >>> >> > On Thu, Jun 17, 2021 at 4:09 PM Arvid Heise <ar...@apache.org> 
>> >> >>> >> > wrote:
>> >> >>> >> >>
>> >> >>> >> >> Does your ProcessFunction have state? If not, that would be in line
>> >> >>> >> >> with the documentation.
>> >> >>> >> >>
>> >> >>> >> >> Also, which Flink version are you using? Before Flink 1.13, empty
>> >> >>> >> >> state was omitted, so I could imagine that `isRestored()` would
>> >> >>> >> >> return false; but it should now also return true for
>> >> >>> >> >> empty state.
>> >> >>> >> >>
>> >> >>> >> >> On Tue, Jun 15, 2021 at 9:02 AM Felipe Gutierrez 
>> >> >>> >> >> <felipe.o.gutier...@gmail.com> wrote:
>> >> >>> >> >>>
>> >> >>> >> >>> So, I was trying to improve this by using CheckpointedFunction
>> >> >>> >> >>> as shown in [1]. But the documentation of isRestored() [2]
>> >> >>> >> >>> says:
>> >> >>> >> >>>
>> >> >>> >> >>> "Returns true, if state was restored from the snapshot of a 
>> >> >>> >> >>> previous execution. This returns always false for stateless 
>> >> >>> >> >>> tasks."
>> >> >>> >> >>>
>> >> >>> >> >>> It is weird because I am extending a ProcessFunction which is 
>> >> >>> >> >>> a RichFunction.
>> >> >>> >> >>>
>> >> >>> >> >>> public class AuctionExceptionSimulatorProcessFunction extends 
>> >> >>> >> >>> ProcessFunction<KeyedReportingData, KeyedReportingData>
>> >> >>> >> >>>         implements CheckpointedFunction {
>> >> >>> >> >>> ...
>> >> >>> >> >>>
>> >> >>> >> >>> In the end, I cannot rely on isRestored(). Do you know
>> >> >>> >> >>> what could be wrong? I used the same implementation as in
>> >> >>> >> >>> [1].
>> >> >>> >> >>>
>> >> >>> >> >>> [1] 
>> >> >>> >> >>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#checkpointedfunction
>> >> >>> >> >>> [2] 
>> >> >>> >> >>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--
>> >> >>> >> >>>
>> >> >>> >> >>>
>> >> >>> >> >>> --
>> >> >>> >> >>> -- Felipe Gutierrez
>> >> >>> >> >>> -- skype: felipe.o.gutierrez
>> >> >>> >> >>>
>> >> >>> >> >>>
>> >> >>> >> >>> On Mon, Jun 14, 2021 at 3:39 PM Roman Khachatryan 
>> >> >>> >> >>> <ro...@apache.org> wrote:
>> >> >>> >> >>>>
>> >> >>> >> >>>> You can also use accumulators [1] to collect the number of
>> >> >>> >> >>>> restarts (and then access it via the client), but side outputs
>> >> >>> >> >>>> should work as well.
>> >> >>> >> >>>>
>> >> >>> >> >>>> [1]
>> >> >>> >> >>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html#accumulators--counters
>> >> >>> >> >>>>
>> >> >>> >> >>>> Regards,
>> >> >>> >> >>>> Roman
>> >> >>> >> >>>>
>> >> >>> >> >>>> On Sun, Jun 13, 2021 at 10:01 PM Felipe Gutierrez
>> >> >>> >> >>>> <felipe.o.gutier...@gmail.com> wrote:
>> >> >>> >> >>>> >
>> >> >>> >> >>>> > I just realised that the ProcessFunction alone is enough. I
>> >> >>> >> >>>> > don't need the CheckpointedFunction.
>> >> >>> >> >>>> >
>> >> >>> >> >>>> >
>> >> >>> >> >>>> > On Fri, 11 Jun 2021, 18:11 Felipe Gutierrez, 
>> >> >>> >> >>>> > <felipe.o.gutier...@gmail.com> wrote:
>> >> >>> >> >>>> >>
>> >> >>> >> >>>> >> Cool!
>> >> >>> >> >>>> >>
>> >> >>> >> >>>> >> I did it using this example
>> >> >>> >> >>>> >> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/state.html#using-managed-operator-state
>> >> >>> >> >>>> >> because I don't have a keyed stream on the specific operator
>> >> >>> >> >>>> >> where I want to count the number of restarts (yes, I am using
>> >> >>> >> >>>> >> version 1.4, unfortunately).
>> >> >>> >> >>>> >>
>> >> >>> >> >>>> >> Because I need to test it in an integration test, I am using a
>> >> >>> >> >>>> >> side output
>> >> >>> >> >>>> >> (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html)
>> >> >>> >> >>>> >> to attach a sink. If you have a better, simpler idea for testing
>> >> >>> >> >>>> >> restarts in an integration test, please tell me :). This is how
>> >> >>> >> >>>> >> I solved it....
>> >> >>> >> >>>> >>
>> >> >>> >> >>>> >> Thanks
>> >> >>> >> >>>> >> Felipe
>> >> >>> >> >>>> >>
>> >> >>> >> >>>> >> --
>> >> >>> >> >>>> >> -- Felipe Gutierrez
>> >> >>> >> >>>> >> -- skype: felipe.o.gutierrez
>> >> >>> >> >>>> >>
>> >> >>> >> >>>> >>
>> >> >>> >> >>>> >> On Thu, Jun 10, 2021 at 5:41 PM Roman Khachatryan 
>> >> >>> >> >>>> >> <ro...@apache.org> wrote:
>> >> >>> >> >>>> >>>
>> >> >>> >> >>>> >>> Hi Felipe,
>> >> >>> >> >>>> >>>
>> >> >>> >> >>>> >>> You can use getRuntimeContext().getAttemptNumber() [1] (but beware
>> >> >>> >> >>>> >>> that, depending on the configuration, only a pipeline region can be
>> >> >>> >> >>>> >>> restarted, not the whole job).
>> >> >>> >> >>>> >>>
>> >> >>> >> >>>> >>> But if all you want is to check whether it's the first attempt or
>> >> >>> >> >>>> >>> not, you can also call context.isRestored() from initializeState() [2].
>> >> >>> >> >>>> >>>
>> >> >>> >> >>>> >>> [1]
>> >> >>> >> >>>> >>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/api/common/functions/RuntimeContext.html#getAttemptNumber--
>> >> >>> >> >>>> >>>
>> >> >>> >> >>>> >>> [2]
>> >> >>> >> >>>> >>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--
>> >> >>> >> >>>> >>>
>> >> >>> >> >>>> >>> Regards,
>> >> >>> >> >>>> >>> Roman
>> >> >>> >> >>>> >>>
>> >> >>> >> >>>> >>>
>> >> >>> >> >>>> >>> On Thu, Jun 10, 2021 at 5:30 PM Felipe Gutierrez
>> >> >>> >> >>>> >>> <felipe.o.gutier...@gmail.com> wrote:
>> >> >>> >> >>>> >>> >
>> >> >>> >> >>>> >>> > Hello community,
>> >> >>> >> >>>> >>> >
>> >> >>> >> >>>> >>> > Is it possible to know programmatically how many times
>> >> >>> >> >>>> >>> > my Flink streaming job has restarted since it started running?
>> >> >>> >> >>>> >>> >
>> >> >>> >> >>>> >>> > My use case is like this: I have a unit test that uses
>> >> >>> >> >>>> >>> > checkpointing, and I throw an exception in a MapFunction
>> >> >>> >> >>>> >>> > for a given time, e.g. for the next 2 seconds.
>> >> >>> >> >>>> >>> > Because Flink restarts the job and I have checkpointing enabled,
>> >> >>> >> >>>> >>> > I can recover the state, and after 2 seconds I no longer throw
>> >> >>> >> >>>> >>> > any exception. Then I would like to know how
>> >> >>> >> >>>> >>> > many times the job was restarted.
>> >> >>> >> >>>> >>> >
>> >> >>> >> >>>> >>> > Thanks,
>> >> >>> >> >>>> >>> > Felipe
>> >> >>> >> >>>> >>> >
