I tried to run the test that you mentioned
(WordCountFilterQEPTest#integrationTestWithPoisonPillRecovery) in rev.
6f08d0a.

In IDE, I see that:
- checkpoint is never triggered (sentence is too short, checkpoint
pause and interval are too large)
- exception is never thrown, so the job never restarted
(currentTimeMillis is incremented but referenceTimeMillisAhead is not)

When I add sleep between each element, set 10ms interval, 0ms pause
and introduce some random exception, I do see
2021-06-18 17:37:53,241 INFO
org.sense.flink.examples.stream.edgent.ExceptionSimulatorProcess  -
Attempts restart: 1
in the logs.

These settings probably differ on the cluster and there is some
unrelated exception which causes a restart.

Regards,
Roman

On Fri, Jun 18, 2021 at 12:20 PM Felipe Gutierrez
<felipe.o.gutier...@gmail.com> wrote:
>
> I investigated a little bit more. I created the same POC on a Flink version 
> 1.13. I have this ProcessFunction where I want to count the times it 
> recovers. I tested with ListState and ValueState and it seems that during the 
> integration test (only for integration test) the process is hanging on the 
> open() or on the initializeState() methods.
>
> https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/main/java/org/sense/flink/examples/stream/edgent/ExceptionSimulatorProcess.java
>
> Here is my integration test: 
> https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/test/java/org/sense/flink/examples/stream/edgent/WordCountFilterQEPTest.java#L154
>
> If I comment out the state ListState or the ValueState, the integration test 
> works. It first throws an exception and I see it on the output. Then after 2 
> seconds it does not throw exceptions. Then I compare the output and they 
> match. But I don't have a way to count how many restarts happened during the 
> integration test. I con only count the restarts when I run the application on 
> a stand-alone flink cluster.
>
> I don't know. Maybe, do I have to configure some parameters to work with the 
> state on integration tests?
>
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
>
>
> On Fri, Jun 18, 2021 at 9:31 AM Felipe Gutierrez 
> <felipe.o.gutier...@gmail.com> wrote:
>>
>> No, it didn't work.
>>
>> The "context.isRestored()" returns true when I run the application on the 
>> Flink standalone-cluster and it is recovering after a failure. When I do the 
>> same on a integration test it does not returns true after a failure. I mean, 
>> I can log the exception that is causing the failure, the initializeState() 
>> is called after a failure, but the context.isRestored() is false again. I 
>> also tried to update the state on the first time to 0 "if 
>> (!context.isRestored()) { restartsState.add(0L); }" and it does not work.
>> I think the problem is not on the ListState that I am using and not on the 
>> context.isRestore() as well. It is on the "context.getOperatorStateStore()" 
>> that is always null only on integration tests. Using the below code I can 
>> see on the logs "restarts: 0" twice, before and after failure.
>>
>>     @Override
>>     public void initializeState(FunctionInitializationContext context) 
>> throws Exception {
>>         // unit tests does not open OperatorStateStore
>>         if (context.getOperatorStateStore() != null) {
>>             restartsState = context.getOperatorStateStore().getListState(new 
>> ListStateDescriptor<Long>("restarts", Long.class));
>>
>>             List<Long> restoreList = Lists.newArrayList(restartsState.get());
>>             if (restoreList == null || restoreList.isEmpty()) {
>>                 restartsState.add(0L);
>>                 LOG.info("restarts: 0");
>>             } else {
>>                 Long max = Collections.max(restoreList);
>>                 LOG.info("restarts: " + max);
>>                 restartsState.add(max + 1);
>>             }
>>         }
>>     }
>>
>> --
>> -- Felipe Gutierrez
>> -- skype: felipe.o.gutierrez
>>
>>
>> On Thu, Jun 17, 2021 at 11:17 PM Roman Khachatryan <ro...@apache.org> wrote:
>>>
>>> Thanks for sharing,
>>>
>>> I think the problem is that restartsState is never updated:
>>> - on the first attempt, context.isRestored() returns false (and "never
>>> restored" is logged)
>>> - on subsequent attempts, it again returns false, because the state
>>> was never updated before
>>>
>>> Adding
>>> if (!context.isRestored()) { restartsState.add(0L); }
>>> should solve the problem
>>> (it's also better to use state.update instead of state.add if only max
>>> is needed).
>>>
>>> Regards,
>>> Roman
>>>
>>> On Thu, Jun 17, 2021 at 5:35 PM Felipe Gutierrez
>>> <felipe.o.gutier...@gmail.com> wrote:
>>> >
>>> > Sure, here it is. Nothing is mocked. I double-checked.
>>> >
>>> > UnitTestClass {.....
>>> > protected static LocalFlinkMiniCluster flink;
>>> >
>>> > @BeforeClass
>>> > public static void prepare() {
>>> >     flink = new LocalFlinkMiniCluster(getFlinkConfiguration(), false);
>>> >     flink.start();
>>> >
>>> >     TestStreamEnvironment.setAsContext(flink, PARALLELISM);
>>> > }
>>> >
>>> > private static Configuration getFlinkConfiguration() {
>>> >     Configuration flinkConfig = new Configuration();
>>> >     flinkConfig.setInteger("local.number-taskmanager", 1);
>>> >     flinkConfig.setInteger("taskmanager.numberOfTaskSlots", 8);
>>> >     flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
>>> >     flinkConfig.setString("restart-strategy.fixed-delay.delay", "0 s");
>>> >     try {
>>> >         flinkConfig.setString("state.checkpoints.dir", "file://" + 
>>> > tempFolder.newFolder().getAbsolutePath());
>>> >     } catch (IOException e) {
>>> >         throw new RuntimeException("error in flink cluster config", e);
>>> >     }
>>> >     return flinkConfig;
>>> > }
>>> >
>>> >
>>> > The class that I check if the job was restarted:
>>> >
>>> > public class ExceptionSimulatorProcessFunction extends 
>>> > ProcessFunction<Object..., Object...>
>>> >         implements CheckpointedFunction {
>>> >
>>> >     final OutputTag<Long> outputTag = new OutputTag<Long>("side-output") {
>>> >     };
>>> >     private transient ListState<Long> restartsState;
>>> >     private Long restartsLocal;
>>> >     ...
>>> >     @Override
>>> >     public void processElement(Object value, Context ctx, 
>>> > Collector<Object> out) throws Exception {
>>> >         this.currentTimeMillis = System.currentTimeMillis() - 
>>> > currentTimeMillisBehind;
>>> >
>>> >         // If current time is less than the reference time ahead AND we 
>>> > have the poison auction an exception will throw
>>> >         if (this.currentTimeMillis < this.referenceTimeMillisAhead && 
>>> > POISON__TRANSACTION_ID.equals(value.toString())) {
>>> >
>>> >             LOG.error("This exception will trigger until the reference 
>>> > time [{}] reaches the trigger time [{}]",
>>> >                     sdfMillis.format(new Date(this.currentTimeMillis)),
>>> >                     sdfMillis.format(new 
>>> > Date(this.referenceTimeMillisAhead)));
>>> >
>>> >             throw new SimulatedException("Transaction ID: " + 
>>> > value.toString() +
>>> >                     " not allowed. This is a simple exception for testing 
>>> > purposes.");
>>> >         }
>>> >         out.collect(value);
>>> >
>>> >
>>> >         // counts the restarts
>>> >         if (restartsState != null) {
>>> >             List<Long> restoreList = 
>>> > Lists.newArrayList(restartsState.get());
>>> >             Long attemptsRestart = 0L;
>>> >             if (restoreList != null && !restoreList.isEmpty()) {
>>> >                 attemptsRestart = Collections.max(restoreList);
>>> >                 if (restartsLocal < attemptsRestart) {
>>> >                     restartsLocal = attemptsRestart;
>>> >                     ctx.output(outputTag, Long.valueOf(attemptsRestart));
>>> >                 }
>>> >             }
>>> >             LOG.info("Attempts restart: " + attemptsRestart);
>>> >         }
>>> >     }
>>> >
>>> >     @Override
>>> >     public void snapshotState(FunctionSnapshotContext context) throws 
>>> > Exception {}
>>> >
>>> >     @Override
>>> >     public void initializeState(FunctionInitializationContext context) 
>>> > throws Exception {
>>> >         restartsState = context.getOperatorStateStore().getListState(new 
>>> > ListStateDescriptor<Long>("restarts", Long.class));
>>> >
>>> >         if (context.isRestored()) {
>>> >             List<Long> restoreList = 
>>> > Lists.newArrayList(restartsState.get());
>>> >             if (restoreList == null || restoreList.isEmpty()) {
>>> >                 restartsState.add(1L);
>>> >                 LOG.info("restarts: 1");
>>> >             } else {
>>> >                 Long max = Collections.max(restoreList);
>>> >                 LOG.info("restarts: " + max);
>>> >                 restartsState.add(max + 1);
>>> >             }
>>> >         } else {
>>> >             LOG.info("restarts: never restored");
>>> >         }
>>> >     }
>>> > }
>>> >
>>> >
>>> >
>>> >
>>> >
>>> >
>>> >
>>> >
>>> > On Thu, Jun 17, 2021 at 5:20 PM Roman Khachatryan <ro...@apache.org> 
>>> > wrote:
>>> >>
>>> >> Hi,
>>> >>
>>> >> Could you please share the test code?
>>> >>
>>> >> I think the returned value might depend on the level on which the
>>> >> tests are executed. If it's a regular job then it should return the
>>> >> correct value (as with cluster). If the environment in which the code
>>> >> is executed is mocked then it can be false.
>>> >>
>>> >> Regards,
>>> >> Roman
>>> >>
>>> >> On Thu, Jun 17, 2021 at 4:15 PM Felipe Gutierrez
>>> >> <felipe.o.gutier...@gmail.com> wrote:
>>> >> >
>>> >> > Yes, I have state on the ProcessFunction. I tested it on a stand-alone 
>>> >> > cluster and it returns true when the application recovers. However, in 
>>> >> > integration tests it does not returns true. I am using Flink 1.4. Do 
>>> >> > you know where it is saying at Flink release 1.13 
>>> >> > (https://flink.apache.org/news/2021/05/03/release-1.13.0.html) that I 
>>> >> > cannot see `isRestored()` equals true on integration tests?
>>> >> >
>>> >> > --
>>> >> > -- Felipe Gutierrez
>>> >> > -- skype: felipe.o.gutierrez
>>> >> >
>>> >> >
>>> >> > On Thu, Jun 17, 2021 at 4:09 PM Arvid Heise <ar...@apache.org> wrote:
>>> >> >>
>>> >> >> Does your ProcessFunction has state? If not it would be in line with 
>>> >> >> the documentation.
>>> >> >>
>>> >> >> Also which Flink version are you using? Before Flink 1.13 empty state 
>>> >> >> was omitted so I could imagine that `isRestored()` would return false 
>>> >> >> but it should actually now also return true for empty state.
>>> >> >>
>>> >> >> On Tue, Jun 15, 2021 at 9:02 AM Felipe Gutierrez 
>>> >> >> <felipe.o.gutier...@gmail.com> wrote:
>>> >> >>>
>>> >> >>> So, I was trying to improve by using the CheckpointedFunction as it 
>>> >> >>> shows here [1]. But the method isRestored() says in its 
>>> >> >>> documentation [2]:
>>> >> >>>
>>> >> >>> "Returns true, if state was restored from the snapshot of a previous 
>>> >> >>> execution. This returns always false for stateless tasks."
>>> >> >>>
>>> >> >>> It is weird because I am extending a ProcessFunction which is a 
>>> >> >>> RichFunction.
>>> >> >>>
>>> >> >>> public class AuctionExceptionSimulatorProcessFunction extends 
>>> >> >>> ProcessFunction<KeyedReportingData, KeyedReportingData>
>>> >> >>>         implements CheckpointedFunction {
>>> >> >>> ...
>>> >> >>>
>>> >> >>> In the end, I cannot rely on the "isRestored()". Do you know what 
>>> >> >>> could be wrong? I used the same implementation method of [1].
>>> >> >>>
>>> >> >>> [1] 
>>> >> >>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#checkpointedfunction
>>> >> >>> [2] 
>>> >> >>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--
>>> >> >>>
>>> >> >>>
>>> >> >>> --
>>> >> >>> -- Felipe Gutierrez
>>> >> >>> -- skype: felipe.o.gutierrez
>>> >> >>>
>>> >> >>>
>>> >> >>> On Mon, Jun 14, 2021 at 3:39 PM Roman Khachatryan <ro...@apache.org> 
>>> >> >>> wrote:
>>> >> >>>>
>>> >> >>>> You can also use accumulators [1] to collect the number of restarts
>>> >> >>>> (and then access it via client); but side outputs should work as 
>>> >> >>>> well.
>>> >> >>>>
>>> >> >>>> [1]
>>> >> >>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html#accumulators--counters
>>> >> >>>>
>>> >> >>>> Regards,
>>> >> >>>> Roman
>>> >> >>>>
>>> >> >>>> On Sun, Jun 13, 2021 at 10:01 PM Felipe Gutierrez
>>> >> >>>> <felipe.o.gutier...@gmail.com> wrote:
>>> >> >>>> >
>>> >> >>>> > I just realised that only the ProcessFunction is enough. I don't 
>>> >> >>>> > need the CheckpointFunction.
>>> >> >>>> >
>>> >> >>>> >
>>> >> >>>> > On Fri, 11 Jun 2021, 18:11 Felipe Gutierrez, 
>>> >> >>>> > <felipe.o.gutier...@gmail.com> wrote:
>>> >> >>>> >>
>>> >> >>>> >> Cool!
>>> >> >>>> >>
>>> >> >>>> >> I did using this example 
>>> >> >>>> >> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/state.html#using-managed-operator-state
>>> >> >>>> >>  because I don't have a keyed stream on the specific operator 
>>> >> >>>> >> that I want to count the number of restarts. (yes I am using 
>>> >> >>>> >> version 1.4 unfortunately).
>>> >> >>>> >>
>>> >> >>>> >> Because I need to test it in an integration test I am using a 
>>> >> >>>> >> side output 
>>> >> >>>> >> (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html)
>>> >> >>>> >>  to attach a sink. I am not sure if you have a better idea to 
>>> >> >>>> >> test the restarts on an integration test. If you have a simple 
>>> >> >>>> >> idea please tell me :). This was the way that I solved....
>>> >> >>>> >>
>>> >> >>>> >> Thanks
>>> >> >>>> >> Felipe
>>> >> >>>> >>
>>> >> >>>> >> --
>>> >> >>>> >> -- Felipe Gutierrez
>>> >> >>>> >> -- skype: felipe.o.gutierrez
>>> >> >>>> >>
>>> >> >>>> >>
>>> >> >>>> >> On Thu, Jun 10, 2021 at 5:41 PM Roman Khachatryan 
>>> >> >>>> >> <ro...@apache.org> wrote:
>>> >> >>>> >>>
>>> >> >>>> >>> Hi Felipe,
>>> >> >>>> >>>
>>> >> >>>> >>> You can use getRuntimeContext().getAttemptNumber() [1] (but 
>>> >> >>>> >>> beware
>>> >> >>>> >>> that depending on the configuration only a pipeline region can 
>>> >> >>>> >>> be
>>> >> >>>> >>> restarted, not the whole job).
>>> >> >>>> >>>
>>> >> >>>> >>> But if all you want is to check whether it's a first attempt or 
>>> >> >>>> >>> not,
>>> >> >>>> >>> you can also call context.isRestored() from initializeState() 
>>> >> >>>> >>> [2]
>>> >> >>>> >>>
>>> >> >>>> >>> [1]
>>> >> >>>> >>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/api/common/functions/RuntimeContext.html#getAttemptNumber--
>>> >> >>>> >>>
>>> >> >>>> >>> [2]
>>> >> >>>> >>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--
>>> >> >>>> >>>
>>> >> >>>> >>> Regards,
>>> >> >>>> >>> Roman
>>> >> >>>> >>>
>>> >> >>>> >>>
>>> >> >>>> >>> On Thu, Jun 10, 2021 at 5:30 PM Felipe Gutierrez
>>> >> >>>> >>> <felipe.o.gutier...@gmail.com> wrote:
>>> >> >>>> >>> >
>>> >> >>>> >>> > Hello community,
>>> >> >>>> >>> >
>>> >> >>>> >>> > Is it possible to know programmatically how many times my 
>>> >> >>>> >>> > Flink stream job restarted since it was running?
>>> >> >>>> >>> >
>>> >> >>>> >>> > My use case is like this. I have an Unit test that uses 
>>> >> >>>> >>> > checkpoint and I throw one exception in a MapFunction for a 
>>> >> >>>> >>> > given time, i.e.: for the 2 seconds ahead. Because Flink 
>>> >> >>>> >>> > restarts the job and I have checkpoint I can recover the 
>>> >> >>>> >>> > state and after 2 seconds I don't throw any exception 
>>> >> >>>> >>> > anymore. Then I would like to know how many times the job was 
>>> >> >>>> >>> > restarted.
>>> >> >>>> >>> >
>>> >> >>>> >>> > Thanks,
>>> >> >>>> >>> > Felipe
>>> >> >>>> >>> >

Reply via email to