On Fri, Jun 18, 2021 at 5:40 PM Roman Khachatryan <ro...@apache.org> wrote:

> I tried to run the test that you mentioned
> (WordCountFilterQEPTest#integrationTestWithPoisonPillRecovery) in rev.
> 6f08d0a.
>
> In IDE, I see that:
> - checkpoint is never triggered (sentence is too short, checkpoint
> pause and interval are too large)
> - exception is never thrown, so the job never restarted
> (currentTimeMillis is incremented but referenceTimeMillisAhead is not)
>
> When I add sleep between each element, set 10ms interval, 0ms pause
> and introduce some random exception, I do see
>

do you mean inside the processElement() method?

what is 0ms pause? do you
mean env.getCheckpointConfig().setMinPauseBetweenCheckpoints(0); ?

How do you create a random exception? do you mean not
mine SimulatedException?

Using these configurations that I just said it is not working for me. I am
testing on the terminal "mvn
-Dtest=WordCountFilterQEPTest#integrationTestWithPoisonPillRecovery test".
On IntelliJ I have the error "Caused by: java.lang.NoSuchFieldError:
latencyTrackingConfigBuilder" when I call "env.execute();"

org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
at
org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:119)
at
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
at
java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628)
at
java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996)
at
org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:117)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1842)
at
org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:70)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1822)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1804)
at
org.sense.flink.examples.stream.edgent.WordCountFilterQEPTest.integrationTestWithPoisonPillRecovery(WordCountFilterQEPTest.java:210)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
at
com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at
com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:221)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Caused by: java.lang.NoSuchFieldError: latencyTrackingConfigBuilder




> 2021-06-18 17:37:53,241 INFO
> org.sense.flink.examples.stream.edgent.ExceptionSimulatorProcess  -
> Attempts restart: 1
> in the logs.
>
> These settings probably differ on the cluster and there is some
> unrelated exception which causes a restart.
>
> Regards,
> Roman
>
> On Fri, Jun 18, 2021 at 12:20 PM Felipe Gutierrez
> <felipe.o.gutier...@gmail.com> wrote:
> >
> > I investigated a little bit more. I created the same POC on a Flink
> version 1.13. I have this ProcessFunction where I want to count the times
> it recovers. I tested with ListState and ValueState and it seems that
> during the integration test (only for integration test) the process is
> hanging on the open() or on the initializeState() methods.
> >
> >
> https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/main/java/org/sense/flink/examples/stream/edgent/ExceptionSimulatorProcess.java
> >
> > Here is my integration test:
> https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/test/java/org/sense/flink/examples/stream/edgent/WordCountFilterQEPTest.java#L154
> >
> > If I comment out the state ListState or the ValueState, the integration
> test works. It first throws an exception and I see it on the output. Then
> after 2 seconds it does not throw exceptions. Then I compare the output and
> they match. But I don't have a way to count how many restarts happened
> during the integration test. I con only count the restarts when I run the
> application on a stand-alone flink cluster.
> >
> > I don't know. Maybe, do I have to configure some parameters to work with
> the state on integration tests?
> >
> > --
> > -- Felipe Gutierrez
> > -- skype: felipe.o.gutierrez
> >
> >
> > On Fri, Jun 18, 2021 at 9:31 AM Felipe Gutierrez <
> felipe.o.gutier...@gmail.com> wrote:
> >>
> >> No, it didn't work.
> >>
> >> The "context.isRestored()" returns true when I run the application on
> the Flink standalone-cluster and it is recovering after a failure. When I
> do the same on a integration test it does not returns true after a failure.
> I mean, I can log the exception that is causing the failure, the
> initializeState() is called after a failure, but the context.isRestored()
> is false again. I also tried to update the state on the first time to 0 "if
> (!context.isRestored()) { restartsState.add(0L); }" and it does not work.
> >> I think the problem is not on the ListState that I am using and not on
> the context.isRestore() as well. It is on the
> "context.getOperatorStateStore()" that is always null only on integration
> tests. Using the below code I can see on the logs "restarts: 0" twice,
> before and after failure.
> >>
> >>     @Override
> >>     public void initializeState(FunctionInitializationContext context)
> throws Exception {
> >>         // unit tests does not open OperatorStateStore
> >>         if (context.getOperatorStateStore() != null) {
> >>             restartsState =
> context.getOperatorStateStore().getListState(new
> ListStateDescriptor<Long>("restarts", Long.class));
> >>
> >>             List<Long> restoreList =
> Lists.newArrayList(restartsState.get());
> >>             if (restoreList == null || restoreList.isEmpty()) {
> >>                 restartsState.add(0L);
> >>                 LOG.info("restarts: 0");
> >>             } else {
> >>                 Long max = Collections.max(restoreList);
> >>                 LOG.info("restarts: " + max);
> >>                 restartsState.add(max + 1);
> >>             }
> >>         }
> >>     }
> >>
> >> --
> >> -- Felipe Gutierrez
> >> -- skype: felipe.o.gutierrez
> >>
> >>
> >> On Thu, Jun 17, 2021 at 11:17 PM Roman Khachatryan <ro...@apache.org>
> wrote:
> >>>
> >>> Thanks for sharing,
> >>>
> >>> I think the problem is that restartsState is never updated:
> >>> - on the first attempt, context.isRestored() returns false (and "never
> >>> restored" is logged)
> >>> - on subsequent attempts, it again returns false, because the state
> >>> was never updated before
> >>>
> >>> Adding
> >>> if (!context.isRestored()) { restartsState.add(0L); }
> >>> should solve the problem
> >>> (it's also better to use state.update instead of state.add if only max
> >>> is needed).
> >>>
> >>> Regards,
> >>> Roman
> >>>
> >>> On Thu, Jun 17, 2021 at 5:35 PM Felipe Gutierrez
> >>> <felipe.o.gutier...@gmail.com> wrote:
> >>> >
> >>> > Sure, here it is. Nothing is mocked. I double-checked.
> >>> >
> >>> > UnitTestClass {.....
> >>> > protected static LocalFlinkMiniCluster flink;
> >>> >
> >>> > @BeforeClass
> >>> > public static void prepare() {
> >>> >     flink = new LocalFlinkMiniCluster(getFlinkConfiguration(),
> false);
> >>> >     flink.start();
> >>> >
> >>> >     TestStreamEnvironment.setAsContext(flink, PARALLELISM);
> >>> > }
> >>> >
> >>> > private static Configuration getFlinkConfiguration() {
> >>> >     Configuration flinkConfig = new Configuration();
> >>> >     flinkConfig.setInteger("local.number-taskmanager", 1);
> >>> >     flinkConfig.setInteger("taskmanager.numberOfTaskSlots", 8);
> >>> >     flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
> >>> >     flinkConfig.setString("restart-strategy.fixed-delay.delay", "0
> s");
> >>> >     try {
> >>> >         flinkConfig.setString("state.checkpoints.dir", "file://" +
> tempFolder.newFolder().getAbsolutePath());
> >>> >     } catch (IOException e) {
> >>> >         throw new RuntimeException("error in flink cluster config",
> e);
> >>> >     }
> >>> >     return flinkConfig;
> >>> > }
> >>> >
> >>> >
> >>> > The class that I check if the job was restarted:
> >>> >
> >>> > public class ExceptionSimulatorProcessFunction extends
> ProcessFunction<Object..., Object...>
> >>> >         implements CheckpointedFunction {
> >>> >
> >>> >     final OutputTag<Long> outputTag = new
> OutputTag<Long>("side-output") {
> >>> >     };
> >>> >     private transient ListState<Long> restartsState;
> >>> >     private Long restartsLocal;
> >>> >     ...
> >>> >     @Override
> >>> >     public void processElement(Object value, Context ctx,
> Collector<Object> out) throws Exception {
> >>> >         this.currentTimeMillis = System.currentTimeMillis() -
> currentTimeMillisBehind;
> >>> >
> >>> >         // If current time is less than the reference time ahead AND
> we have the poison auction an exception will throw
> >>> >         if (this.currentTimeMillis < this.referenceTimeMillisAhead
> && POISON__TRANSACTION_ID.equals(value.toString())) {
> >>> >
> >>> >             LOG.error("This exception will trigger until the
> reference time [{}] reaches the trigger time [{}]",
> >>> >                     sdfMillis.format(new
> Date(this.currentTimeMillis)),
> >>> >                     sdfMillis.format(new
> Date(this.referenceTimeMillisAhead)));
> >>> >
> >>> >             throw new SimulatedException("Transaction ID: " +
> value.toString() +
> >>> >                     " not allowed. This is a simple exception for
> testing purposes.");
> >>> >         }
> >>> >         out.collect(value);
> >>> >
> >>> >
> >>> >         // counts the restarts
> >>> >         if (restartsState != null) {
> >>> >             List<Long> restoreList =
> Lists.newArrayList(restartsState.get());
> >>> >             Long attemptsRestart = 0L;
> >>> >             if (restoreList != null && !restoreList.isEmpty()) {
> >>> >                 attemptsRestart = Collections.max(restoreList);
> >>> >                 if (restartsLocal < attemptsRestart) {
> >>> >                     restartsLocal = attemptsRestart;
> >>> >                     ctx.output(outputTag,
> Long.valueOf(attemptsRestart));
> >>> >                 }
> >>> >             }
> >>> >             LOG.info("Attempts restart: " + attemptsRestart);
> >>> >         }
> >>> >     }
> >>> >
> >>> >     @Override
> >>> >     public void snapshotState(FunctionSnapshotContext context)
> throws Exception {}
> >>> >
> >>> >     @Override
> >>> >     public void initializeState(FunctionInitializationContext
> context) throws Exception {
> >>> >         restartsState =
> context.getOperatorStateStore().getListState(new
> ListStateDescriptor<Long>("restarts", Long.class));
> >>> >
> >>> >         if (context.isRestored()) {
> >>> >             List<Long> restoreList =
> Lists.newArrayList(restartsState.get());
> >>> >             if (restoreList == null || restoreList.isEmpty()) {
> >>> >                 restartsState.add(1L);
> >>> >                 LOG.info("restarts: 1");
> >>> >             } else {
> >>> >                 Long max = Collections.max(restoreList);
> >>> >                 LOG.info("restarts: " + max);
> >>> >                 restartsState.add(max + 1);
> >>> >             }
> >>> >         } else {
> >>> >             LOG.info("restarts: never restored");
> >>> >         }
> >>> >     }
> >>> > }
> >>> >
> >>> >
> >>> >
> >>> >
> >>> >
> >>> >
> >>> >
> >>> >
> >>> > On Thu, Jun 17, 2021 at 5:20 PM Roman Khachatryan <ro...@apache.org>
> wrote:
> >>> >>
> >>> >> Hi,
> >>> >>
> >>> >> Could you please share the test code?
> >>> >>
> >>> >> I think the returned value might depend on the level on which the
> >>> >> tests are executed. If it's a regular job then it should return the
> >>> >> correct value (as with cluster). If the environment in which the
> code
> >>> >> is executed is mocked then it can be false.
> >>> >>
> >>> >> Regards,
> >>> >> Roman
> >>> >>
> >>> >> On Thu, Jun 17, 2021 at 4:15 PM Felipe Gutierrez
> >>> >> <felipe.o.gutier...@gmail.com> wrote:
> >>> >> >
> >>> >> > Yes, I have state on the ProcessFunction. I tested it on a
> stand-alone cluster and it returns true when the application recovers.
> However, in integration tests it does not returns true. I am using Flink
> 1.4. Do you know where it is saying at Flink release 1.13 (
> https://flink.apache.org/news/2021/05/03/release-1.13.0.html) that I
> cannot see `isRestored()` equals true on integration tests?
> >>> >> >
> >>> >> > --
> >>> >> > -- Felipe Gutierrez
> >>> >> > -- skype: felipe.o.gutierrez
> >>> >> >
> >>> >> >
> >>> >> > On Thu, Jun 17, 2021 at 4:09 PM Arvid Heise <ar...@apache.org>
> wrote:
> >>> >> >>
> >>> >> >> Does your ProcessFunction has state? If not it would be in line
> with the documentation.
> >>> >> >>
> >>> >> >> Also which Flink version are you using? Before Flink 1.13 empty
> state was omitted so I could imagine that `isRestored()` would return false
> but it should actually now also return true for empty state.
> >>> >> >>
> >>> >> >> On Tue, Jun 15, 2021 at 9:02 AM Felipe Gutierrez <
> felipe.o.gutier...@gmail.com> wrote:
> >>> >> >>>
> >>> >> >>> So, I was trying to improve by using the CheckpointedFunction
> as it shows here [1]. But the method isRestored() says in its documentation
> [2]:
> >>> >> >>>
> >>> >> >>> "Returns true, if state was restored from the snapshot of a
> previous execution. This returns always false for stateless tasks."
> >>> >> >>>
> >>> >> >>> It is weird because I am extending a ProcessFunction which is a
> RichFunction.
> >>> >> >>>
> >>> >> >>> public class AuctionExceptionSimulatorProcessFunction extends
> ProcessFunction<KeyedReportingData, KeyedReportingData>
> >>> >> >>>         implements CheckpointedFunction {
> >>> >> >>> ...
> >>> >> >>>
> >>> >> >>> In the end, I cannot rely on the "isRestored()". Do you know
> what could be wrong? I used the same implementation method of [1].
> >>> >> >>>
> >>> >> >>> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#checkpointedfunction
> >>> >> >>> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--
> >>> >> >>>
> >>> >> >>>
> >>> >> >>> --
> >>> >> >>> -- Felipe Gutierrez
> >>> >> >>> -- skype: felipe.o.gutierrez
> >>> >> >>>
> >>> >> >>>
> >>> >> >>> On Mon, Jun 14, 2021 at 3:39 PM Roman Khachatryan <
> ro...@apache.org> wrote:
> >>> >> >>>>
> >>> >> >>>> You can also use accumulators [1] to collect the number of
> restarts
> >>> >> >>>> (and then access it via client); but side outputs should work
> as well.
> >>> >> >>>>
> >>> >> >>>> [1]
> >>> >> >>>>
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html#accumulators--counters
> >>> >> >>>>
> >>> >> >>>> Regards,
> >>> >> >>>> Roman
> >>> >> >>>>
> >>> >> >>>> On Sun, Jun 13, 2021 at 10:01 PM Felipe Gutierrez
> >>> >> >>>> <felipe.o.gutier...@gmail.com> wrote:
> >>> >> >>>> >
> >>> >> >>>> > I just realised that only the ProcessFunction is enough. I
> don't need the CheckpointFunction.
> >>> >> >>>> >
> >>> >> >>>> >
> >>> >> >>>> > On Fri, 11 Jun 2021, 18:11 Felipe Gutierrez, <
> felipe.o.gutier...@gmail.com> wrote:
> >>> >> >>>> >>
> >>> >> >>>> >> Cool!
> >>> >> >>>> >>
> >>> >> >>>> >> I did using this example
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/state.html#using-managed-operator-state
> because I don't have a keyed stream on the specific operator that I want to
> count the number of restarts. (yes I am using version 1.4 unfortunately).
> >>> >> >>>> >>
> >>> >> >>>> >> Because I need to test it in an integration test I am using
> a side output (
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html)
> to attach a sink. I am not sure if you have a better idea to test the
> restarts on an integration test. If you have a simple idea please tell me
> :). This was the way that I solved....
> >>> >> >>>> >>
> >>> >> >>>> >> Thanks
> >>> >> >>>> >> Felipe
> >>> >> >>>> >>
> >>> >> >>>> >> --
> >>> >> >>>> >> -- Felipe Gutierrez
> >>> >> >>>> >> -- skype: felipe.o.gutierrez
> >>> >> >>>> >>
> >>> >> >>>> >>
> >>> >> >>>> >> On Thu, Jun 10, 2021 at 5:41 PM Roman Khachatryan <
> ro...@apache.org> wrote:
> >>> >> >>>> >>>
> >>> >> >>>> >>> Hi Felipe,
> >>> >> >>>> >>>
> >>> >> >>>> >>> You can use getRuntimeContext().getAttemptNumber() [1]
> (but beware
> >>> >> >>>> >>> that depending on the configuration only a pipeline region
> can be
> >>> >> >>>> >>> restarted, not the whole job).
> >>> >> >>>> >>>
> >>> >> >>>> >>> But if all you want is to check whether it's a first
> attempt or not,
> >>> >> >>>> >>> you can also call context.isRestored() from
> initializeState() [2]
> >>> >> >>>> >>>
> >>> >> >>>> >>> [1]
> >>> >> >>>> >>>
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/api/common/functions/RuntimeContext.html#getAttemptNumber--
> >>> >> >>>> >>>
> >>> >> >>>> >>> [2]
> >>> >> >>>> >>>
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--
> >>> >> >>>> >>>
> >>> >> >>>> >>> Regards,
> >>> >> >>>> >>> Roman
> >>> >> >>>> >>>
> >>> >> >>>> >>>
> >>> >> >>>> >>> On Thu, Jun 10, 2021 at 5:30 PM Felipe Gutierrez
> >>> >> >>>> >>> <felipe.o.gutier...@gmail.com> wrote:
> >>> >> >>>> >>> >
> >>> >> >>>> >>> > Hello community,
> >>> >> >>>> >>> >
> >>> >> >>>> >>> > Is it possible to know programmatically how many times
> my Flink stream job restarted since it was running?
> >>> >> >>>> >>> >
> >>> >> >>>> >>> > My use case is like this. I have an Unit test that uses
> checkpoint and I throw one exception in a MapFunction for a given time,
> i.e.: for the 2 seconds ahead. Because Flink restarts the job and I have
> checkpoint I can recover the state and after 2 seconds I don't throw any
> exception anymore. Then I would like to know how many times the job was
> restarted.
> >>> >> >>>> >>> >
> >>> >> >>>> >>> > Thanks,
> >>> >> >>>> >>> > Felipe
> >>> >> >>>> >>> >
>

Reply via email to