No, it didn't work.

"context.isRestored()" returns true when I run the application on the
Flink standalone cluster and it recovers after a failure. When I do the
same in an integration test, it does not return true after a failure. I
can log the exception that causes the failure, and initializeState() is
called after the failure, but context.isRestored() is false again. I also
tried initializing the state to 0 on the first attempt with
"if (!context.isRestored()) { restartsState.add(0L); }", and that does not
work either. I think the problem is neither the ListState that I am using
nor context.isRestored(). It is "context.getOperatorStateStore()", which
is always null, but only in integration tests. Using the code below, I can
see "restarts: 0" in the logs twice, before and after the failure.

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // unit tests do not open the OperatorStateStore
        if (context.getOperatorStateStore() != null) {
            restartsState = context.getOperatorStateStore().getListState(
                    new ListStateDescriptor<Long>("restarts", Long.class));

            List<Long> restoreList = Lists.newArrayList(restartsState.get());
            if (restoreList == null || restoreList.isEmpty()) {
                // nothing was restored: start counting at 0
                restartsState.add(0L);
                LOG.info("restarts: 0");
            } else {
                // state was restored: log the highest count seen and store the next one
                Long max = Collections.max(restoreList);
                LOG.info("restarts: " + max);
                restartsState.add(max + 1);
            }
        }
    }
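
To rule out the counting logic itself, it can be pulled out of
initializeState() into a plain method and checked without any cluster or
state store. The following is only a minimal sketch: RestartCounter and
nextRestartCount are hypothetical names, not part of Flink, and the method
returns the value to store for the current attempt (0 when nothing was
restored, otherwise the highest restored value plus one).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of Flink: the restart-count logic
// kept free of Flink types so it can be tested in isolation.
final class RestartCounter {

    private RestartCounter() {}

    // Returns 0 when nothing was restored (null or empty input),
    // otherwise the largest restored value plus one.
    static long nextRestartCount(Iterable<Long> restored) {
        List<Long> values = new ArrayList<>();
        if (restored != null) {
            for (Long v : restored) {
                if (v != null) {
                    values.add(v);
                }
            }
        }
        return values.isEmpty() ? 0L : Collections.max(values) + 1;
    }

    public static void main(String[] args) {
        // First attempt: nothing restored yet.
        System.out.println(nextRestartCount(Collections.<Long>emptyList())); // prints 0
        // Later attempt: values 0 and 1 were restored.
        System.out.println(nextRestartCount(Arrays.asList(0L, 1L)));         // prints 2
    }
}
```

If this method behaves as expected but the log still shows "restarts: 0"
after the failure, the list handed to it was empty, which points at the
state not being restored rather than at the counting.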

*--*
*-- Felipe Gutierrez*
*-- skype: felipe.o.gutierrez*


On Thu, Jun 17, 2021 at 11:17 PM Roman Khachatryan <ro...@apache.org> wrote:

> Thanks for sharing,
>
> I think the problem is that restartsState is never updated:
> - on the first attempt, context.isRestored() returns false (and "never
> restored" is logged)
> - on subsequent attempts, it again returns false, because the state
> was never updated before
>
> Adding
> if (!context.isRestored()) { restartsState.add(0L); }
> should solve the problem
> (it's also better to use state.update instead of state.add if only max
> is needed).
>
> Regards,
> Roman
>
> On Thu, Jun 17, 2021 at 5:35 PM Felipe Gutierrez
> <felipe.o.gutier...@gmail.com> wrote:
> >
> > Sure, here it is. Nothing is mocked. I double-checked.
> >
> > UnitTestClass {.....
> > protected static LocalFlinkMiniCluster flink;
> >
> > @BeforeClass
> > public static void prepare() {
> >     flink = new LocalFlinkMiniCluster(getFlinkConfiguration(), false);
> >     flink.start();
> >
> >     TestStreamEnvironment.setAsContext(flink, PARALLELISM);
> > }
> >
> > private static Configuration getFlinkConfiguration() {
> >     Configuration flinkConfig = new Configuration();
> >     flinkConfig.setInteger("local.number-taskmanager", 1);
> >     flinkConfig.setInteger("taskmanager.numberOfTaskSlots", 8);
> >     flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
> >     flinkConfig.setString("restart-strategy.fixed-delay.delay", "0 s");
> >     try {
> >         flinkConfig.setString("state.checkpoints.dir", "file://" +
> >                 tempFolder.newFolder().getAbsolutePath());
> >     } catch (IOException e) {
> >         throw new RuntimeException("error in flink cluster config", e);
> >     }
> >     return flinkConfig;
> > }
> >
> >
> > The class that I check if the job was restarted:
> >
> > public class ExceptionSimulatorProcessFunction extends ProcessFunction<Object..., Object...>
> >         implements CheckpointedFunction {
> >
> >     final OutputTag<Long> outputTag = new OutputTag<Long>("side-output") {
> >     };
> >     private transient ListState<Long> restartsState;
> >     private Long restartsLocal;
> >     ...
> >     @Override
> >     public void processElement(Object value, Context ctx, Collector<Object> out) throws Exception {
> >         this.currentTimeMillis = System.currentTimeMillis() - currentTimeMillisBehind;
> >
> >         // If current time is less than the reference time ahead AND we have the poison auction an exception will throw
> >         if (this.currentTimeMillis < this.referenceTimeMillisAhead &&
> >                 POISON__TRANSACTION_ID.equals(value.toString())) {
> >
> >             LOG.error("This exception will trigger until the reference time [{}] reaches the trigger time [{}]",
> >                     sdfMillis.format(new Date(this.currentTimeMillis)),
> >                     sdfMillis.format(new Date(this.referenceTimeMillisAhead)));
> >
> >             throw new SimulatedException("Transaction ID: " + value.toString() +
> >                     " not allowed. This is a simple exception for testing purposes.");
> >         }
> >         out.collect(value);
> >
> >
> >         // counts the restarts
> >         if (restartsState != null) {
> >             List<Long> restoreList = Lists.newArrayList(restartsState.get());
> >             Long attemptsRestart = 0L;
> >             if (restoreList != null && !restoreList.isEmpty()) {
> >                 attemptsRestart = Collections.max(restoreList);
> >                 if (restartsLocal < attemptsRestart) {
> >                     restartsLocal = attemptsRestart;
> >                     ctx.output(outputTag, Long.valueOf(attemptsRestart));
> >                 }
> >             }
> >             LOG.info("Attempts restart: " + attemptsRestart);
> >         }
> >     }
> >
> >     @Override
> >     public void snapshotState(FunctionSnapshotContext context) throws Exception {}
> >
> >     @Override
> >     public void initializeState(FunctionInitializationContext context) throws Exception {
> >         restartsState = context.getOperatorStateStore().getListState(
> >                 new ListStateDescriptor<Long>("restarts", Long.class));
> >
> >         if (context.isRestored()) {
> >             List<Long> restoreList = Lists.newArrayList(restartsState.get());
> >             if (restoreList == null || restoreList.isEmpty()) {
> >                 restartsState.add(1L);
> >                 LOG.info("restarts: 1");
> >             } else {
> >                 Long max = Collections.max(restoreList);
> >                 LOG.info("restarts: " + max);
> >                 restartsState.add(max + 1);
> >             }
> >         } else {
> >             LOG.info("restarts: never restored");
> >         }
> >     }
> > }
> >
> >
> >
> >
> >
> >
> >
> >
> > On Thu, Jun 17, 2021 at 5:20 PM Roman Khachatryan <ro...@apache.org>
> wrote:
> >>
> >> Hi,
> >>
> >> Could you please share the test code?
> >>
> >> I think the returned value might depend on the level on which the
> >> tests are executed. If it's a regular job then it should return the
> >> correct value (as with cluster). If the environment in which the code
> >> is executed is mocked then it can be false.
> >>
> >> Regards,
> >> Roman
> >>
> >> On Thu, Jun 17, 2021 at 4:15 PM Felipe Gutierrez
> >> <felipe.o.gutier...@gmail.com> wrote:
> >> >
> >> > Yes, I have state on the ProcessFunction. I tested it on a
> >> > stand-alone cluster and it returns true when the application recovers.
> >> > However, in integration tests it does not return true. I am using Flink
> >> > 1.4. Do you know where the Flink 1.13 release notes (
> >> > https://flink.apache.org/news/2021/05/03/release-1.13.0.html) say that
> >> > `isRestored()` cannot be true in integration tests?
> >> >
> >> > --
> >> > -- Felipe Gutierrez
> >> > -- skype: felipe.o.gutierrez
> >> >
> >> >
> >> > On Thu, Jun 17, 2021 at 4:09 PM Arvid Heise <ar...@apache.org> wrote:
> >> >>
> >> >> Does your ProcessFunction have state? If not, it would be in line with
> >> >> the documentation.
> >> >>
> >> >> Also, which Flink version are you using? Before Flink 1.13 empty
> >> >> state was omitted, so I could imagine that `isRestored()` would return
> >> >> false, but it should now also return true for empty state.
> >> >>
> >> >> On Tue, Jun 15, 2021 at 9:02 AM Felipe Gutierrez <
> felipe.o.gutier...@gmail.com> wrote:
> >> >>>
> >> >>> So, I was trying to improve by using the CheckpointedFunction as it
> shows here [1]. But the method isRestored() says in its documentation [2]:
> >> >>>
> >> >>> "Returns true, if state was restored from the snapshot of a
> previous execution. This returns always false for stateless tasks."
> >> >>>
> >> >>> It is weird because I am extending a ProcessFunction which is a
> RichFunction.
> >> >>>
> >> >>> public class AuctionExceptionSimulatorProcessFunction extends
> ProcessFunction<KeyedReportingData, KeyedReportingData>
> >> >>>         implements CheckpointedFunction {
> >> >>> ...
> >> >>>
> >> >>> In the end, I cannot rely on the "isRestored()". Do you know what
> could be wrong? I used the same implementation method of [1].
> >> >>>
> >> >>> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#checkpointedfunction
> >> >>> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--
> >> >>>
> >> >>>
> >> >>> --
> >> >>> -- Felipe Gutierrez
> >> >>> -- skype: felipe.o.gutierrez
> >> >>>
> >> >>>
> >> >>> On Mon, Jun 14, 2021 at 3:39 PM Roman Khachatryan <ro...@apache.org>
> wrote:
> >> >>>>
> >> >>>> You can also use accumulators [1] to collect the number of restarts
> >> >>>> (and then access it via client); but side outputs should work as
> well.
> >> >>>>
> >> >>>> [1]
> >> >>>>
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html#accumulators--counters
> >> >>>>
> >> >>>> Regards,
> >> >>>> Roman
> >> >>>>
> >> >>>> On Sun, Jun 13, 2021 at 10:01 PM Felipe Gutierrez
> >> >>>> <felipe.o.gutier...@gmail.com> wrote:
> >> >>>> >
> >> >>>> > I just realised that only the ProcessFunction is enough. I don't
> >> >>>> > need the CheckpointedFunction.
> >> >>>> >
> >> >>>> >
> >> >>>> > On Fri, 11 Jun 2021, 18:11 Felipe Gutierrez, <
> felipe.o.gutier...@gmail.com> wrote:
> >> >>>> >>
> >> >>>> >> Cool!
> >> >>>> >>
> >> >>>> >> I did using this example
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/state.html#using-managed-operator-state
> because I don't have a keyed stream on the specific operator that I want to
> count the number of restarts. (yes I am using version 1.4 unfortunately).
> >> >>>> >>
> >> >>>> >> Because I need to test it in an integration test I am using a
> side output (
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html)
> to attach a sink. I am not sure if you have a better idea to test the
> restarts on an integration test. If you have a simple idea please tell me
> :). This was the way that I solved....
> >> >>>> >>
> >> >>>> >> Thanks
> >> >>>> >> Felipe
> >> >>>> >>
> >> >>>> >> --
> >> >>>> >> -- Felipe Gutierrez
> >> >>>> >> -- skype: felipe.o.gutierrez
> >> >>>> >>
> >> >>>> >>
> >> >>>> >> On Thu, Jun 10, 2021 at 5:41 PM Roman Khachatryan <
> ro...@apache.org> wrote:
> >> >>>> >>>
> >> >>>> >>> Hi Felipe,
> >> >>>> >>>
> >> >>>> >>> You can use getRuntimeContext().getAttemptNumber() [1] (but
> beware
> >> >>>> >>> that depending on the configuration only a pipeline region can
> be
> >> >>>> >>> restarted, not the whole job).
> >> >>>> >>>
> >> >>>> >>> But if all you want is to check whether it's a first attempt
> or not,
> >> >>>> >>> you can also call context.isRestored() from initializeState()
> [2]
> >> >>>> >>>
> >> >>>> >>> [1]
> >> >>>> >>>
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/api/common/functions/RuntimeContext.html#getAttemptNumber--
> >> >>>> >>>
> >> >>>> >>> [2]
> >> >>>> >>>
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--
> >> >>>> >>>
> >> >>>> >>> Regards,
> >> >>>> >>> Roman
> >> >>>> >>>
> >> >>>> >>>
> >> >>>> >>> On Thu, Jun 10, 2021 at 5:30 PM Felipe Gutierrez
> >> >>>> >>> <felipe.o.gutier...@gmail.com> wrote:
> >> >>>> >>> >
> >> >>>> >>> > Hello community,
> >> >>>> >>> >
> >> >>>> >>> > Is it possible to know programmatically how many times my
> >> >>>> >>> > Flink stream job has restarted since it started running?
> >> >>>> >>> >
> >> >>>> >>> > My use case is like this. I have a unit test that uses
> >> >>>> >>> > checkpointing, and I throw an exception in a MapFunction for a
> >> >>>> >>> > given time, e.g. for the next 2 seconds. Because Flink restarts
> >> >>>> >>> > the job and I have checkpoints, I can recover the state, and after
> >> >>>> >>> > 2 seconds I no longer throw any exception. Then I would like to
> >> >>>> >>> > know how many times the job was restarted.
> >> >>>> >>> >
> >> >>>> >>> > Thanks,
> >> >>>> >>> > Felipe
> >> >>>> >>> >
>
