I investigated a bit more and recreated the same POC on Flink 1.13. I have
a ProcessFunction in which I want to count how many times the job recovers.
I tested with both ListState and ValueState, and it seems that during the
integration test (and only there) the process hangs in the open() or
initializeState() method.
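
For reference, the counting rule itself can be written without any Flink
types. This is only a plain-Java sketch: RestartCount and next() are
hypothetical names, and it assumes the restored list holds the counts
written by earlier attempts (empty on the very first run).

```java
import java.util.Arrays;
import java.util.Collections;

// Plain-Java sketch, no Flink types. RestartCount is a hypothetical helper.
final class RestartCount {
    private RestartCount() {}

    // Value to store for the current attempt:
    // 0 when nothing was restored, otherwise the largest restored value + 1.
    static long next(Iterable<Long> restored) {
        long max = -1L;
        if (restored != null) {
            for (Long v : restored) {
                if (v != null && v > max) {
                    max = v;
                }
            }
        }
        return max + 1;
    }

    public static void main(String[] args) {
        // First run: nothing restored yet.
        System.out.println(next(Collections.<Long>emptyList()));
        // After one restore: the list holds the 0 written by the first run.
        System.out.println(next(Arrays.asList(0L)));
        // List state that accumulated several entries: only the max matters.
        System.out.println(next(Arrays.asList(0L, 1L, 2L)));
    }
}
```

Since only the maximum is used, keeping a single value in the state
(replacing it instead of appending) would be enough for this rule.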

https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/main/java/org/sense/flink/examples/stream/edgent/ExceptionSimulatorProcess.java

Here is my integration test:
https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/test/java/org/sense/flink/examples/stream/edgent/WordCountFilterQEPTest.java#L154

If I comment out the ListState or the ValueState, the integration
test works. It first throws an exception, which I see in the output. Then,
after 2 seconds, it stops throwing exceptions. I then compare the actual and
expected output and they match. But I don't have a way to count how many
restarts happened during the integration test. I can only count the restarts
when I run the application on a stand-alone Flink cluster.
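
A possible workaround for the test, sketched under assumptions: a local
test cluster runs inside the same JVM as the test, so a static counter
that is incremented once per attempt can be read by the test afterwards.
The sketch below is plain Java with no Flink types. CountingFunction,
OPEN_CALLS and restarts() are hypothetical names, and it assumes
parallelism 1 and that open() is called exactly once per attempt.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a user function; only the counting idea matters.
final class CountingFunction {
    // Shared by all instances in this JVM, so a test can read it afterwards.
    static final AtomicInteger OPEN_CALLS = new AtomicInteger();

    // In a real job this would be the function's open() hook.
    void open() {
        OPEN_CALLS.incrementAndGet();
    }

    // Restarts = attempts beyond the first one (assumes parallelism 1).
    static int restarts() {
        return Math.max(0, OPEN_CALLS.get() - 1);
    }

    // Call before each test so counts do not leak between tests.
    static void reset() {
        OPEN_CALLS.set(0);
    }

    public static void main(String[] args) {
        reset();
        // Simulate three attempts: each restart creates a fresh instance and opens it.
        for (int attempt = 0; attempt < 3; attempt++) {
            new CountingFunction().open();
        }
        System.out.println("restarts: " + restarts());
    }
}
```

This does not depend on state being restored at all, so it would not
explain the isRestored() behaviour; it only gives the test a number to
assert on.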

I am not sure what is missing. Do I have to configure some parameters for
state to work in integration tests?

*--*
*-- Felipe Gutierrez*
*-- skype: felipe.o.gutierrez*


On Fri, Jun 18, 2021 at 9:31 AM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> No, it didn't work.
>
> The "context.isRestored()" returns true when I run the application on the
> Flink standalone cluster and it is recovering after a failure. When I do
> the same in an integration test, it does not return true after a failure. I
> mean, I can log the exception that is causing the failure, and
> initializeState() is called after the failure, but context.isRestored()
> is false again. I also tried to initialize the state to 0 the first time with
> "if (!context.isRestored()) { restartsState.add(0L); }" and it does not work.
> I think the problem is neither the ListState that I am using nor
> context.isRestored(). It is "context.getOperatorStateStore()", which is
> always null, but only in integration tests. Using the code below I can
> see "restarts: 0" in the logs twice, before and after the failure.
>
>     @Override
>     public void initializeState(FunctionInitializationContext context) throws Exception {
>         // unit tests do not open the OperatorStateStore
>         if (context.getOperatorStateStore() != null) {
>             restartsState = context.getOperatorStateStore().getListState(
>                     new ListStateDescriptor<Long>("restarts", Long.class));
>
>             List<Long> restoreList = Lists.newArrayList(restartsState.get());
>             if (restoreList == null || restoreList.isEmpty()) {
>                 restartsState.add(0L);
>                 LOG.info("restarts: 0");
>             } else {
>                 Long max = Collections.max(restoreList);
>                 LOG.info("restarts: " + max);
>                 restartsState.add(max + 1);
>             }
>         }
>     }
>
> *--*
> *-- Felipe Gutierrez*
> *-- skype: felipe.o.gutierrez*
>
>
> On Thu, Jun 17, 2021 at 11:17 PM Roman Khachatryan <ro...@apache.org>
> wrote:
>
>> Thanks for sharing,
>>
>> I think the problem is that restartsState is never updated:
>> - on the first attempt, context.isRestored() returns false (and "never
>> restored" is logged)
>> - on subsequent attempts, it again returns false, because the state
>> was never updated before
>>
>> Adding
>> if (!context.isRestored()) { restartsState.add(0L); }
>> should solve the problem
>> (it's also better to use state.update instead of state.add if only max
>> is needed).
>>
>> Regards,
>> Roman
>>
>> On Thu, Jun 17, 2021 at 5:35 PM Felipe Gutierrez
>> <felipe.o.gutier...@gmail.com> wrote:
>> >
>> > Sure, here it is. Nothing is mocked. I double-checked.
>> >
>> > UnitTestClass {.....
>> > protected static LocalFlinkMiniCluster flink;
>> >
>> > @BeforeClass
>> > public static void prepare() {
>> >     flink = new LocalFlinkMiniCluster(getFlinkConfiguration(), false);
>> >     flink.start();
>> >
>> >     TestStreamEnvironment.setAsContext(flink, PARALLELISM);
>> > }
>> >
>> > private static Configuration getFlinkConfiguration() {
>> >     Configuration flinkConfig = new Configuration();
>> >     flinkConfig.setInteger("local.number-taskmanager", 1);
>> >     flinkConfig.setInteger("taskmanager.numberOfTaskSlots", 8);
>> >     flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
>> >     flinkConfig.setString("restart-strategy.fixed-delay.delay", "0 s");
>> >     try {
>> >         flinkConfig.setString("state.checkpoints.dir",
>> >                 "file://" + tempFolder.newFolder().getAbsolutePath());
>> >     } catch (IOException e) {
>> >         throw new RuntimeException("error in flink cluster config", e);
>> >     }
>> >     return flinkConfig;
>> > }
>> >
>> >
>> > The class that I check if the job was restarted:
>> >
>> > public class ExceptionSimulatorProcessFunction extends ProcessFunction<Object..., Object...>
>> >         implements CheckpointedFunction {
>> >
>> >     final OutputTag<Long> outputTag = new OutputTag<Long>("side-output") {
>> >     };
>> >     private transient ListState<Long> restartsState;
>> >     private Long restartsLocal;
>> >     ...
>> >     @Override
>> >     public void processElement(Object value, Context ctx, Collector<Object> out) throws Exception {
>> >         this.currentTimeMillis = System.currentTimeMillis() - currentTimeMillisBehind;
>> >
>> >         // If current time is less than the reference time ahead AND we have the poison auction an exception will throw
>> >         if (this.currentTimeMillis < this.referenceTimeMillisAhead &&
>> >                 POISON__TRANSACTION_ID.equals(value.toString())) {
>> >
>> >             LOG.error("This exception will trigger until the reference time [{}] reaches the trigger time [{}]",
>> >                     sdfMillis.format(new Date(this.currentTimeMillis)),
>> >                     sdfMillis.format(new Date(this.referenceTimeMillisAhead)));
>> >
>> >             throw new SimulatedException("Transaction ID: " + value.toString() +
>> >                     " not allowed. This is a simple exception for testing purposes.");
>> >         }
>> >         out.collect(value);
>> >
>> >
>> >         // counts the restarts
>> >         if (restartsState != null) {
>> >             List<Long> restoreList = Lists.newArrayList(restartsState.get());
>> >             Long attemptsRestart = 0L;
>> >             if (restoreList != null && !restoreList.isEmpty()) {
>> >                 attemptsRestart = Collections.max(restoreList);
>> >                 if (restartsLocal < attemptsRestart) {
>> >                     restartsLocal = attemptsRestart;
>> >                     ctx.output(outputTag, Long.valueOf(attemptsRestart));
>> >                 }
>> >             }
>> >             LOG.info("Attempts restart: " + attemptsRestart);
>> >         }
>> >     }
>> >
>> >     @Override
>> >     public void snapshotState(FunctionSnapshotContext context) throws Exception {}
>> >
>> >     @Override
>> >     public void initializeState(FunctionInitializationContext context) throws Exception {
>> >         restartsState = context.getOperatorStateStore().getListState(
>> >                 new ListStateDescriptor<Long>("restarts", Long.class));
>> >
>> >         if (context.isRestored()) {
>> >             List<Long> restoreList = Lists.newArrayList(restartsState.get());
>> >             if (restoreList == null || restoreList.isEmpty()) {
>> >                 restartsState.add(1L);
>> >                 LOG.info("restarts: 1");
>> >             } else {
>> >                 Long max = Collections.max(restoreList);
>> >                 LOG.info("restarts: " + max);
>> >                 restartsState.add(max + 1);
>> >             }
>> >         } else {
>> >             LOG.info("restarts: never restored");
>> >         }
>> >     }
>> > }
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> > On Thu, Jun 17, 2021 at 5:20 PM Roman Khachatryan <ro...@apache.org>
>> wrote:
>> >>
>> >> Hi,
>> >>
>> >> Could you please share the test code?
>> >>
>> >> I think the returned value might depend on the level on which the
>> >> tests are executed. If it's a regular job then it should return the
>> >> correct value (as with cluster). If the environment in which the code
>> >> is executed is mocked then it can be false.
>> >>
>> >> Regards,
>> >> Roman
>> >>
>> >> On Thu, Jun 17, 2021 at 4:15 PM Felipe Gutierrez
>> >> <felipe.o.gutier...@gmail.com> wrote:
>> >> >
>> >> > Yes, I have state on the ProcessFunction. I tested it on a stand-alone
>> >> > cluster and it returns true when the application recovers. However, in
>> >> > integration tests it does not return true. I am using Flink 1.4. Do you
>> >> > know where the Flink 1.13 release notes
>> >> > (https://flink.apache.org/news/2021/05/03/release-1.13.0.html) say that
>> >> > `isRestored()` cannot be true in integration tests?
>> >> >
>> >> > --
>> >> > -- Felipe Gutierrez
>> >> > -- skype: felipe.o.gutierrez
>> >> >
>> >> >
>> >> > On Thu, Jun 17, 2021 at 4:09 PM Arvid Heise <ar...@apache.org>
>> wrote:
>> >> >>
>> >> >> Does your ProcessFunction have state? If not, it would be in line with
>> >> >> the documentation.
>> >> >>
>> >> >> Also, which Flink version are you using? Before Flink 1.13, empty state
>> >> >> was omitted, so I could imagine that `isRestored()` would return false,
>> >> >> but it should now also return true for empty state.
>> >> >>
>> >> >> On Tue, Jun 15, 2021 at 9:02 AM Felipe Gutierrez <
>> felipe.o.gutier...@gmail.com> wrote:
>> >> >>>
>> >> >>> So, I was trying to improve this by using the CheckpointedFunction as
>> >> >>> shown here [1]. But the documentation of the isRestored() method says [2]:
>> >> >>>
>> >> >>> "Returns true, if state was restored from the snapshot of a previous
>> >> >>> execution. This returns always false for stateless tasks."
>> >> >>>
>> >> >>> It is weird because I am extending a ProcessFunction, which is a
>> >> >>> RichFunction.
>> >> >>>
>> >> >>> public class AuctionExceptionSimulatorProcessFunction extends ProcessFunction<KeyedReportingData, KeyedReportingData>
>> >> >>>         implements CheckpointedFunction {
>> >> >>> ...
>> >> >>>
>> >> >>> In the end, I cannot rely on "isRestored()". Do you know what could be
>> >> >>> wrong? I used the same implementation approach as in [1].
>> >> >>>
>> >> >>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#checkpointedfunction
>> >> >>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--
>> >> >>>
>> >> >>>
>> >> >>> --
>> >> >>> -- Felipe Gutierrez
>> >> >>> -- skype: felipe.o.gutierrez
>> >> >>>
>> >> >>>
>> >> >>> On Mon, Jun 14, 2021 at 3:39 PM Roman Khachatryan <
>> ro...@apache.org> wrote:
>> >> >>>>
>> >> >>>> You can also use accumulators [1] to collect the number of restarts
>> >> >>>> (and then access it via client); but side outputs should work as well.
>> >> >>>>
>> >> >>>> [1]
>> >> >>>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html#accumulators--counters
>> >> >>>>
>> >> >>>> Regards,
>> >> >>>> Roman
>> >> >>>>
>> >> >>>> On Sun, Jun 13, 2021 at 10:01 PM Felipe Gutierrez
>> >> >>>> <felipe.o.gutier...@gmail.com> wrote:
>> >> >>>> >
>> >> >>>> > I just realised that the ProcessFunction alone is enough. I don't
>> >> >>>> > need the CheckpointedFunction.
>> >> >>>> >
>> >> >>>> >
>> >> >>>> > On Fri, 11 Jun 2021, 18:11 Felipe Gutierrez, <
>> felipe.o.gutier...@gmail.com> wrote:
>> >> >>>> >>
>> >> >>>> >> Cool!
>> >> >>>> >>
>> >> >>>> >> I did it using this example
>> >> >>>> >> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/state.html#using-managed-operator-state
>> >> >>>> >> because I don't have a keyed stream on the specific operator where I
>> >> >>>> >> want to count the number of restarts. (Yes, I am using version 1.4,
>> >> >>>> >> unfortunately.)
>> >> >>>> >>
>> >> >>>> >> Because I need to test it in an integration test, I am using a side
>> >> >>>> >> output
>> >> >>>> >> (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html)
>> >> >>>> >> to attach a sink. I am not sure whether you have a better idea for
>> >> >>>> >> testing restarts in an integration test. If you have a simple idea,
>> >> >>>> >> please tell me :). This was the way that I solved it.
>> >> >>>> >>
>> >> >>>> >> Thanks
>> >> >>>> >> Felipe
>> >> >>>> >>
>> >> >>>> >> --
>> >> >>>> >> -- Felipe Gutierrez
>> >> >>>> >> -- skype: felipe.o.gutierrez
>> >> >>>> >>
>> >> >>>> >>
>> >> >>>> >> On Thu, Jun 10, 2021 at 5:41 PM Roman Khachatryan <
>> ro...@apache.org> wrote:
>> >> >>>> >>>
>> >> >>>> >>> Hi Felipe,
>> >> >>>> >>>
>> >> >>>> >>> You can use getRuntimeContext().getAttemptNumber() [1] (but beware
>> >> >>>> >>> that depending on the configuration only a pipeline region can be
>> >> >>>> >>> restarted, not the whole job).
>> >> >>>> >>>
>> >> >>>> >>> But if all you want is to check whether it's a first attempt or not,
>> >> >>>> >>> you can also call context.isRestored() from initializeState() [2]
>> >> >>>> >>>
>> >> >>>> >>> [1]
>> >> >>>> >>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/api/common/functions/RuntimeContext.html#getAttemptNumber--
>> >> >>>> >>>
>> >> >>>> >>> [2]
>> >> >>>> >>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--
>> >> >>>> >>>
>> >> >>>> >>> Regards,
>> >> >>>> >>> Roman
>> >> >>>> >>>
>> >> >>>> >>>
>> >> >>>> >>> On Thu, Jun 10, 2021 at 5:30 PM Felipe Gutierrez
>> >> >>>> >>> <felipe.o.gutier...@gmail.com> wrote:
>> >> >>>> >>> >
>> >> >>>> >>> > Hello community,
>> >> >>>> >>> >
>> >> >>>> >>> > Is it possible to know programmatically how many times my Flink
>> >> >>>> >>> > stream job has restarted since it started running?
>> >> >>>> >>> >
>> >> >>>> >>> > My use case is like this. I have a unit test that uses checkpointing,
>> >> >>>> >>> > and I throw an exception in a MapFunction for a given time, e.g. for
>> >> >>>> >>> > the next 2 seconds. Because Flink restarts the job and I have
>> >> >>>> >>> > checkpoints, I can recover the state, and after 2 seconds I no longer
>> >> >>>> >>> > throw any exception. Then I would like to know how many times the job
>> >> >>>> >>> > was restarted.
>> >> >>>> >>> >
>> >> >>>> >>> > Thanks,
>> >> >>>> >>> > Felipe
>> >> >>>> >>> >
>>
>
