I tried to run the test that you mentioned (WordCountFilterQEPTest#integrationTestWithPoisonPillRecovery) in rev. 6f08d0a.
In the IDE, I see that:
- checkpoint is never triggered (sentence is too short, checkpoint pause and interval are too large)
- exception is never thrown, so the job is never restarted (currentTimeMillis is incremented but referenceTimeMillisAhead is not)

When I add a sleep between each element, set a 10ms interval and a 0ms pause, and introduce some random exception, I do see
2021-06-18 17:37:53,241 INFO org.sense.flink.examples.stream.edgent.ExceptionSimulatorProcess - Attempts restart: 1
in the logs.

These settings probably differ on the cluster and there is some unrelated exception which causes a restart.

Regards,
Roman

On Fri, Jun 18, 2021 at 12:20 PM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:
>
> I investigated a little bit more. I created the same POC on a Flink version
> 1.13. I have this ProcessFunction where I want to count the times it
> recovers. I tested with ListState and ValueState and it seems that during the
> integration test (only for integration test) the process is hanging on the
> open() or on the initializeState() methods.
>
> https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/main/java/org/sense/flink/examples/stream/edgent/ExceptionSimulatorProcess.java
>
> Here is my integration test:
> https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/test/java/org/sense/flink/examples/stream/edgent/WordCountFilterQEPTest.java#L154
>
> If I comment out the state ListState or the ValueState, the integration test
> works. It first throws an exception and I see it on the output. Then after 2
> seconds it does not throw exceptions. Then I compare the output and they
> match. But I don't have a way to count how many restarts happened during the
> integration test. I can only count the restarts when I run the application on
> a stand-alone flink cluster.
>
> I don't know.
> Maybe, do I have to configure some parameters to work with the
> state on integration tests?
>
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
>
>
> On Fri, Jun 18, 2021 at 9:31 AM Felipe Gutierrez
> <felipe.o.gutier...@gmail.com> wrote:
>>
>> No, it didn't work.
>>
>> The "context.isRestored()" returns true when I run the application on the
>> Flink standalone-cluster and it is recovering after a failure. When I do the
>> same in an integration test it does not return true after a failure. I mean,
>> I can log the exception that is causing the failure, the initializeState()
>> is called after a failure, but the context.isRestored() is false again. I
>> also tried to update the state on the first time to 0 "if
>> (!context.isRestored()) { restartsState.add(0L); }" and it does not work.
>> I think the problem is not on the ListState that I am using and not on the
>> context.isRestored() as well. It is on the "context.getOperatorStateStore()"
>> that is always null only on integration tests. Using the below code I can
>> see on the logs "restarts: 0" twice, before and after failure.
>>
>> @Override
>> public void initializeState(FunctionInitializationContext context)
>>         throws Exception {
>>     // unit tests does not open OperatorStateStore
>>     if (context.getOperatorStateStore() != null) {
>>         restartsState = context.getOperatorStateStore().getListState(
>>                 new ListStateDescriptor<Long>("restarts", Long.class));
>>
>>         List<Long> restoreList = Lists.newArrayList(restartsState.get());
>>         if (restoreList == null || restoreList.isEmpty()) {
>>             restartsState.add(0L);
>>             LOG.info("restarts: 0");
>>         } else {
>>             Long max = Collections.max(restoreList);
>>             LOG.info("restarts: " + max);
>>             restartsState.add(max + 1);
>>         }
>>     }
>> }
>>
>> --
>> -- Felipe Gutierrez
>> -- skype: felipe.o.gutierrez
>>
>>
>> On Thu, Jun 17, 2021 at 11:17 PM Roman Khachatryan <ro...@apache.org> wrote:
>>>
>>> Thanks for sharing,
>>>
>>> I think the problem is that restartsState is never updated:
>>> - on the first attempt, context.isRestored() returns false (and "never
>>> restored" is logged)
>>> - on subsequent attempts, it again returns false, because the state
>>> was never updated before
>>>
>>> Adding
>>> if (!context.isRestored()) { restartsState.add(0L); }
>>> should solve the problem
>>> (it's also better to use state.update instead of state.add if only max
>>> is needed).
>>>
>>> Regards,
>>> Roman
>>>
>>> On Thu, Jun 17, 2021 at 5:35 PM Felipe Gutierrez
>>> <felipe.o.gutier...@gmail.com> wrote:
>>> >
>>> > Sure, here it is. Nothing is mocked. I double-checked.
>>> >
>>> > UnitTestClass {.....
>>> >     protected static LocalFlinkMiniCluster flink;
>>> >
>>> >     @BeforeClass
>>> >     public static void prepare() {
>>> >         flink = new LocalFlinkMiniCluster(getFlinkConfiguration(), false);
>>> >         flink.start();
>>> >
>>> >         TestStreamEnvironment.setAsContext(flink, PARALLELISM);
>>> >     }
>>> >
>>> >     private static Configuration getFlinkConfiguration() {
>>> >         Configuration flinkConfig = new Configuration();
>>> >         flinkConfig.setInteger("local.number-taskmanager", 1);
>>> >         flinkConfig.setInteger("taskmanager.numberOfTaskSlots", 8);
>>> >         flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
>>> >         flinkConfig.setString("restart-strategy.fixed-delay.delay", "0 s");
>>> >         try {
>>> >             flinkConfig.setString("state.checkpoints.dir", "file://" +
>>> >                     tempFolder.newFolder().getAbsolutePath());
>>> >         } catch (IOException e) {
>>> >             throw new RuntimeException("error in flink cluster config", e);
>>> >         }
>>> >         return flinkConfig;
>>> >     }
>>> >
>>> >
>>> > The class that I check if the job was restarted:
>>> >
>>> > public class ExceptionSimulatorProcessFunction extends
>>> >         ProcessFunction<Object..., Object...>
>>> >         implements CheckpointedFunction {
>>> >
>>> >     final OutputTag<Long> outputTag = new OutputTag<Long>("side-output") {
>>> >     };
>>> >     private transient ListState<Long> restartsState;
>>> >     private Long restartsLocal;
>>> >     ...
>>> >     @Override
>>> >     public void processElement(Object value, Context ctx,
>>> >             Collector<Object> out) throws Exception {
>>> >         this.currentTimeMillis = System.currentTimeMillis() -
>>> >                 currentTimeMillisBehind;
>>> >
>>> >         // If current time is less than the reference time ahead AND we
>>> >         // have the poison auction an exception will throw
>>> >         if (this.currentTimeMillis < this.referenceTimeMillisAhead &&
>>> >                 POISON__TRANSACTION_ID.equals(value.toString())) {
>>> >
>>> >             LOG.error("This exception will trigger until the reference time [{}] reaches the trigger time [{}]",
>>> >                     sdfMillis.format(new Date(this.currentTimeMillis)),
>>> >                     sdfMillis.format(new Date(this.referenceTimeMillisAhead)));
>>> >
>>> >             throw new SimulatedException("Transaction ID: " +
>>> >                     value.toString() +
>>> >                     " not allowed. This is a simple exception for testing purposes.");
>>> >         }
>>> >         out.collect(value);
>>> >
>>> >
>>> >         // counts the restarts
>>> >         if (restartsState != null) {
>>> >             List<Long> restoreList =
>>> >                     Lists.newArrayList(restartsState.get());
>>> >             Long attemptsRestart = 0L;
>>> >             if (restoreList != null && !restoreList.isEmpty()) {
>>> >                 attemptsRestart = Collections.max(restoreList);
>>> >                 if (restartsLocal < attemptsRestart) {
>>> >                     restartsLocal = attemptsRestart;
>>> >                     ctx.output(outputTag, Long.valueOf(attemptsRestart));
>>> >                 }
>>> >             }
>>> >             LOG.info("Attempts restart: " + attemptsRestart);
>>> >         }
>>> >     }
>>> >
>>> >     @Override
>>> >     public void snapshotState(FunctionSnapshotContext context) throws
>>> >             Exception {}
>>> >
>>> >     @Override
>>> >     public void initializeState(FunctionInitializationContext context)
>>> >             throws Exception {
>>> >         restartsState = context.getOperatorStateStore().getListState(
>>> >                 new ListStateDescriptor<Long>("restarts", Long.class));
>>> >
>>> >         if (context.isRestored()) {
>>> >             List<Long> restoreList =
>>> >                     Lists.newArrayList(restartsState.get());
>>> >             if (restoreList == null || restoreList.isEmpty()) {
>>> >                 restartsState.add(1L);
>>> >                 LOG.info("restarts: 1");
>>> >             } else {
>>> >                 Long max = Collections.max(restoreList);
>>> >                 LOG.info("restarts: " + max);
>>> >                 restartsState.add(max + 1);
>>> >             }
>>> >         } else {
>>> >             LOG.info("restarts: never restored");
>>> >         }
>>> >     }
>>> > }
>>> >
>>> >
>>> > On Thu, Jun 17, 2021 at 5:20 PM Roman Khachatryan <ro...@apache.org>
>>> > wrote:
>>> >>
>>> >> Hi,
>>> >>
>>> >> Could you please share the test code?
>>> >>
>>> >> I think the returned value might depend on the level on which the
>>> >> tests are executed. If it's a regular job then it should return the
>>> >> correct value (as with cluster). If the environment in which the code
>>> >> is executed is mocked then it can be false.
>>> >>
>>> >> Regards,
>>> >> Roman
>>> >>
>>> >> On Thu, Jun 17, 2021 at 4:15 PM Felipe Gutierrez
>>> >> <felipe.o.gutier...@gmail.com> wrote:
>>> >> >
>>> >> > Yes, I have state on the ProcessFunction. I tested it on a stand-alone
>>> >> > cluster and it returns true when the application recovers. However, in
>>> >> > integration tests it does not return true. I am using Flink 1.4. Do
>>> >> > you know where it is saying at Flink release 1.13
>>> >> > (https://flink.apache.org/news/2021/05/03/release-1.13.0.html) that I
>>> >> > cannot see `isRestored()` equals true on integration tests?
>>> >> >
>>> >> > --
>>> >> > -- Felipe Gutierrez
>>> >> > -- skype: felipe.o.gutierrez
>>> >> >
>>> >> >
>>> >> > On Thu, Jun 17, 2021 at 4:09 PM Arvid Heise <ar...@apache.org> wrote:
>>> >> >>
>>> >> >> Does your ProcessFunction have state? If not it would be in line with
>>> >> >> the documentation.
>>> >> >>
>>> >> >> Also which Flink version are you using? Before Flink 1.13 empty state
>>> >> >> was omitted so I could imagine that `isRestored()` would return false
>>> >> >> but it should actually now also return true for empty state.
>>> >> >>
>>> >> >> On Tue, Jun 15, 2021 at 9:02 AM Felipe Gutierrez
>>> >> >> <felipe.o.gutier...@gmail.com> wrote:
>>> >> >>>
>>> >> >>> So, I was trying to improve by using the CheckpointedFunction as it
>>> >> >>> shows here [1]. But the method isRestored() says in its
>>> >> >>> documentation [2]:
>>> >> >>>
>>> >> >>> "Returns true, if state was restored from the snapshot of a previous
>>> >> >>> execution. This returns always false for stateless tasks."
>>> >> >>>
>>> >> >>> It is weird because I am extending a ProcessFunction which is a
>>> >> >>> RichFunction.
>>> >> >>>
>>> >> >>> public class AuctionExceptionSimulatorProcessFunction extends
>>> >> >>>         ProcessFunction<KeyedReportingData, KeyedReportingData>
>>> >> >>>         implements CheckpointedFunction {
>>> >> >>> ...
>>> >> >>>
>>> >> >>> In the end, I cannot rely on the "isRestored()". Do you know what
>>> >> >>> could be wrong? I used the same implementation method of [1].
>>> >> >>>
>>> >> >>> [1]
>>> >> >>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#checkpointedfunction
>>> >> >>> [2]
>>> >> >>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--
>>> >> >>>
>>> >> >>>
>>> >> >>> --
>>> >> >>> -- Felipe Gutierrez
>>> >> >>> -- skype: felipe.o.gutierrez
>>> >> >>>
>>> >> >>>
>>> >> >>> On Mon, Jun 14, 2021 at 3:39 PM Roman Khachatryan <ro...@apache.org>
>>> >> >>> wrote:
>>> >> >>>>
>>> >> >>>> You can also use accumulators [1] to collect the number of restarts
>>> >> >>>> (and then access it via client); but side outputs should work as
>>> >> >>>> well.
>>> >> >>>>
>>> >> >>>> [1]
>>> >> >>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html#accumulators--counters
>>> >> >>>>
>>> >> >>>> Regards,
>>> >> >>>> Roman
>>> >> >>>>
>>> >> >>>> On Sun, Jun 13, 2021 at 10:01 PM Felipe Gutierrez
>>> >> >>>> <felipe.o.gutier...@gmail.com> wrote:
>>> >> >>>> >
>>> >> >>>> > I just realised that only the ProcessFunction is enough. I don't
>>> >> >>>> > need the CheckpointedFunction.
>>> >> >>>> >
>>> >> >>>> >
>>> >> >>>> > On Fri, 11 Jun 2021, 18:11 Felipe Gutierrez,
>>> >> >>>> > <felipe.o.gutier...@gmail.com> wrote:
>>> >> >>>> >>
>>> >> >>>> >> Cool!
>>> >> >>>> >>
>>> >> >>>> >> I did it using this example
>>> >> >>>> >> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/state.html#using-managed-operator-state
>>> >> >>>> >> because I don't have a keyed stream on the specific operator
>>> >> >>>> >> where I want to count the number of restarts. (yes I am using
>>> >> >>>> >> version 1.4 unfortunately).
>>> >> >>>> >>
>>> >> >>>> >> Because I need to test it in an integration test I am using a
>>> >> >>>> >> side output
>>> >> >>>> >> (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html)
>>> >> >>>> >> to attach a sink. I am not sure if you have a better idea to
>>> >> >>>> >> test the restarts on an integration test. If you have a simple
>>> >> >>>> >> idea please tell me :). This was the way that I solved it....
>>> >> >>>> >>
>>> >> >>>> >> Thanks
>>> >> >>>> >> Felipe
>>> >> >>>> >>
>>> >> >>>> >> --
>>> >> >>>> >> -- Felipe Gutierrez
>>> >> >>>> >> -- skype: felipe.o.gutierrez
>>> >> >>>> >>
>>> >> >>>> >>
>>> >> >>>> >> On Thu, Jun 10, 2021 at 5:41 PM Roman Khachatryan
>>> >> >>>> >> <ro...@apache.org> wrote:
>>> >> >>>> >>>
>>> >> >>>> >>> Hi Felipe,
>>> >> >>>> >>>
>>> >> >>>> >>> You can use getRuntimeContext().getAttemptNumber() [1] (but beware
>>> >> >>>> >>> that depending on the configuration only a pipeline region can be
>>> >> >>>> >>> restarted, not the whole job).
>>> >> >>>> >>>
>>> >> >>>> >>> But if all you want is to check whether it's a first attempt or not,
>>> >> >>>> >>> you can also call context.isRestored() from initializeState() [2]
>>> >> >>>> >>>
>>> >> >>>> >>> [1]
>>> >> >>>> >>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/api/common/functions/RuntimeContext.html#getAttemptNumber--
>>> >> >>>> >>>
>>> >> >>>> >>> [2]
>>> >> >>>> >>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--
>>> >> >>>> >>>
>>> >> >>>> >>> Regards,
>>> >> >>>> >>> Roman
>>> >> >>>> >>>
>>> >> >>>> >>>
>>> >> >>>> >>> On Thu, Jun 10, 2021 at 5:30 PM Felipe Gutierrez
>>> >> >>>> >>> <felipe.o.gutier...@gmail.com> wrote:
>>> >> >>>> >>> >
>>> >> >>>> >>> > Hello community,
>>> >> >>>> >>> >
>>> >> >>>> >>> > Is it possible to know programmatically how many times my
>>> >> >>>> >>> > Flink stream job restarted since it was running?
>>> >> >>>> >>> >
>>> >> >>>> >>> > My use case is like this. I have a unit test that uses
>>> >> >>>> >>> > checkpoint and I throw one exception in a MapFunction for a
>>> >> >>>> >>> > given time, i.e.: for the 2 seconds ahead.
>>> >> >>>> >>> > Because Flink restarts the job and I have checkpoint I can
>>> >> >>>> >>> > recover the state and after 2 seconds I don't throw any
>>> >> >>>> >>> > exception anymore. Then I would like to know how many times
>>> >> >>>> >>> > the job was restarted.
>>> >> >>>> >>> >
>>> >> >>>> >>> > Thanks,
>>> >> >>>> >>> > Felipe
>>> >> >>>> >>> >
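
A note on the diagnosis at the top of the thread: the point that nothing can be restored when no checkpoint completes before the failure can be illustrated without Flink at all. What follows is a minimal plain-Java toy model, not Flink code. The class RestartCountSketch, its run method, and the "snapshot after every N elements" rule are hypothetical names and simplifications introduced only for this sketch. It assumes that state is restored only from the last completed snapshot, and that the counter is incremented once per restore, roughly as in the initializeState() code quoted in the thread.

```java
import java.util.Arrays;
import java.util.List;

/** Toy model (plain Java, no Flink dependency) of a restart counter kept in checkpointed state. */
final class RestartCountSketch {

    /** Outcome of one simulated job run. */
    static final class Result {
        final int restartsSeenInState; // counter value read from the simulated state at the end
        final int actualRestarts;      // restarts the simulated runtime really performed

        Result(int restartsSeenInState, int actualRestarts) {
            this.restartsSeenInState = restartsSeenInState;
            this.actualRestarts = actualRestarts;
        }

        @Override
        public String toString() {
            return "state=" + restartsSeenInState + ", actual=" + actualRestarts;
        }
    }

    /**
     * Simulates a job that fails on the poison element.
     *
     * @param elements        the input, replayed from the last snapshot position after a failure
     * @param poison          element that makes the operator throw
     * @param failures        how many times the poison element throws before it is let through
     * @param checkpointEvery a snapshot completes after every N processed input positions (N >= 1)
     */
    static Result run(List<String> elements, String poison, int failures, int checkpointEvery) {
        if (checkpointEvery < 1) {
            throw new IllegalArgumentException("checkpointEvery must be >= 1");
        }
        Integer snapshotCounter = null; // counter in the last completed snapshot, null = no snapshot yet
        int snapshotPos = 0;            // input position stored together with that snapshot
        int remainingFailures = failures;
        int actualRestarts = 0;

        while (true) {
            // Stand-in for initializeState(): state is "restored" only if a snapshot exists.
            boolean restored = snapshotCounter != null;
            int counter = restored ? snapshotCounter + 1 : 0;
            int pos = restored ? snapshotPos : 0;
            boolean failed = false;

            while (pos < elements.size()) {
                if (remainingFailures > 0 && poison.equals(elements.get(pos))) {
                    remainingFailures--;
                    failed = true; // simulated exception, the attempt ends here
                    break;
                }
                pos++;
                if (pos % checkpointEvery == 0) {
                    // A checkpoint completes: persist the current counter and position.
                    snapshotCounter = counter;
                    snapshotPos = pos;
                }
            }
            if (!failed) {
                return new Result(counter, actualRestarts);
            }
            actualRestarts++; // the simulated runtime restarts the job
        }
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "b", "poison", "c");
        System.out.println(run(input, "poison", 2, 100)); // no snapshot ever completes
        System.out.println(run(input, "poison", 2, 1));   // snapshot after every element
        System.out.println(run(input, "poison", 1, 1));   // one failure, snapshot after every element
    }
}
```

With the four-element input in main, the first call reports a state counter of 0 even though two restarts happened, which mirrors the "restarts: 0" logged before and after the failure. In the second call the state counter is 1 for two actual restarts, because in this model no snapshot completes between the two failures. If real jobs behave similarly, that may be one reason to prefer getRuntimeContext().getAttemptNumber(), suggested earlier in the thread, when only the attempt count is needed.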