I investigated a bit further and recreated the same POC on Flink 1.13. I have a ProcessFunction in which I want to count how many times it recovers. I tested with both ListState and ValueState, and in the integration test (and only there) the job seems to hang in the open() or initializeState() method.
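For reference, the restart-count bookkeeping that initializeState() is meant to perform can be written as plain Java, independent of Flink. This is only a sketch of the intended logic, not a fix for the hang. The class name RestartCounter and the method nextState are hypothetical. The sketch assumes the restored counts arrive as an Iterable<Long> (what ListState#get() returns) and that a single value is stored back, as suggested further down the thread with state.update.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical helper, not part of the original job: the restart-count
 * bookkeeping expressed without any Flink types.
 */
final class RestartCounter {

    private RestartCounter() {}

    /**
     * Returns the single-element list to store back into the state:
     * [0] when nothing was restored, [max + 1] when earlier counts exist.
     */
    static List<Long> nextState(Iterable<Long> restored) {
        List<Long> previous = new ArrayList<>();
        if (restored != null) {
            // Copy the restored values; a null Iterable is treated as empty state.
            restored.forEach(previous::add);
        }
        long next = previous.isEmpty() ? 0L : Collections.max(previous) + 1;
        return Collections.singletonList(next);
    }
}
```

Inside initializeState() this could be used as restartsState.update(RestartCounter.nextState(restartsState.get())), assuming restartsState is the ListState<Long> from the code quoted below.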
The ProcessFunction:
https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/main/java/org/sense/flink/examples/stream/edgent/ExceptionSimulatorProcess.java

Here is my integration test:
https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/test/java/org/sense/flink/examples/stream/edgent/WordCountFilterQEPTest.java#L154

If I comment out the ListState or the ValueState, the integration test works. It first throws an exception, which I see in the output. After 2 seconds it stops throwing exceptions. Then I compare the outputs and they match. But I have no way to count how many restarts happened during the integration test. I can only count the restarts when I run the application on a stand-alone Flink cluster.

I don't know. Maybe I have to configure some parameters to work with state in integration tests?

*--*
*-- Felipe Gutierrez*
*-- skype: felipe.o.gutierrez*


On Fri, Jun 18, 2021 at 9:31 AM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:

> No, it didn't work.
>
> context.isRestored() returns true when I run the application on the
> Flink standalone cluster and it is recovering after a failure. When I do
> the same in an integration test, it does not return true after a failure. I
> mean, I can log the exception that causes the failure, and
> initializeState() is called after the failure, but context.isRestored()
> is false again. I also tried to set the state to 0 the first time with "if
> (!context.isRestored()) { restartsState.add(0L); }" and it does not work.
> I think the problem is neither the ListState that I am using nor
> context.isRestored(). It is context.getOperatorStateStore(),
> which is always null only in integration tests. Using the code below I can
> see "restarts: 0" in the logs twice, before and after the failure.
>
> @Override
> public void initializeState(FunctionInitializationContext context)
>         throws Exception {
>     // unit tests does not open OperatorStateStore
>     if (context.getOperatorStateStore() != null) {
>         restartsState = context.getOperatorStateStore().getListState(
>                 new ListStateDescriptor<Long>("restarts", Long.class));
>
>         List<Long> restoreList = Lists.newArrayList(restartsState.get());
>         if (restoreList == null || restoreList.isEmpty()) {
>             restartsState.add(0L);
>             LOG.info("restarts: 0");
>         } else {
>             Long max = Collections.max(restoreList);
>             LOG.info("restarts: " + max);
>             restartsState.add(max + 1);
>         }
>     }
> }
>
> *--*
> *-- Felipe Gutierrez*
> *-- skype: felipe.o.gutierrez*
>
>
> On Thu, Jun 17, 2021 at 11:17 PM Roman Khachatryan <ro...@apache.org>
> wrote:
>
>> Thanks for sharing,
>>
>> I think the problem is that restartsState is never updated:
>> - on the first attempt, context.isRestored() returns false (and "never
>> restored" is logged)
>> - on subsequent attempts, it again returns false, because the state
>> was never updated before
>>
>> Adding
>> if (!context.isRestored()) { restartsState.add(0L); }
>> should solve the problem
>> (it's also better to use state.update instead of state.add if only max
>> is needed).
>>
>> Regards,
>> Roman
>>
>> On Thu, Jun 17, 2021 at 5:35 PM Felipe Gutierrez
>> <felipe.o.gutier...@gmail.com> wrote:
>> >
>> > Sure, here it is. Nothing is mocked. I double-checked.
>> >
>> > UnitTestClass {.....
>> >     protected static LocalFlinkMiniCluster flink;
>> >
>> >     @BeforeClass
>> >     public static void prepare() {
>> >         flink = new LocalFlinkMiniCluster(getFlinkConfiguration(), false);
>> >         flink.start();
>> >
>> >         TestStreamEnvironment.setAsContext(flink, PARALLELISM);
>> >     }
>> >
>> >     private static Configuration getFlinkConfiguration() {
>> >         Configuration flinkConfig = new Configuration();
>> >         flinkConfig.setInteger("local.number-taskmanager", 1);
>> >         flinkConfig.setInteger("taskmanager.numberOfTaskSlots", 8);
>> >         flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
>> >         flinkConfig.setString("restart-strategy.fixed-delay.delay", "0 s");
>> >         try {
>> >             flinkConfig.setString("state.checkpoints.dir",
>> >                     "file://" + tempFolder.newFolder().getAbsolutePath());
>> >         } catch (IOException e) {
>> >             throw new RuntimeException("error in flink cluster config", e);
>> >         }
>> >         return flinkConfig;
>> >     }
>> >
>> >
>> > The class that I check if the job was restarted:
>> >
>> > public class ExceptionSimulatorProcessFunction
>> >         extends ProcessFunction<Object..., Object...>
>> >         implements CheckpointedFunction {
>> >
>> >     final OutputTag<Long> outputTag = new OutputTag<Long>("side-output") {
>> >     };
>> >     private transient ListState<Long> restartsState;
>> >     private Long restartsLocal;
>> >     ...
>> >     @Override
>> >     public void processElement(Object value, Context ctx,
>> >             Collector<Object> out) throws Exception {
>> >         this.currentTimeMillis = System.currentTimeMillis() - currentTimeMillisBehind;
>> >
>> >         // If current time is less than the reference time ahead AND we
>> >         // have the poison auction an exception will throw
>> >         if (this.currentTimeMillis < this.referenceTimeMillisAhead
>> >                 && POISON__TRANSACTION_ID.equals(value.toString())) {
>> >
>> >             LOG.error("This exception will trigger until the reference time [{}] reaches the trigger time [{}]",
>> >                     sdfMillis.format(new Date(this.currentTimeMillis)),
>> >                     sdfMillis.format(new Date(this.referenceTimeMillisAhead)));
>> >
>> >             throw new SimulatedException("Transaction ID: " + value.toString() +
>> >                     " not allowed. This is a simple exception for testing purposes.");
>> >         }
>> >         out.collect(value);
>> >
>> >
>> >         // counts the restarts
>> >         if (restartsState != null) {
>> >             List<Long> restoreList = Lists.newArrayList(restartsState.get());
>> >             Long attemptsRestart = 0L;
>> >             if (restoreList != null && !restoreList.isEmpty()) {
>> >                 attemptsRestart = Collections.max(restoreList);
>> >                 if (restartsLocal < attemptsRestart) {
>> >                     restartsLocal = attemptsRestart;
>> >                     ctx.output(outputTag, Long.valueOf(attemptsRestart));
>> >                 }
>> >             }
>> >             LOG.info("Attempts restart: " + attemptsRestart);
>> >         }
>> >     }
>> >
>> >     @Override
>> >     public void snapshotState(FunctionSnapshotContext context) throws Exception {}
>> >
>> >     @Override
>> >     public void initializeState(FunctionInitializationContext context) throws Exception {
>> >         restartsState = context.getOperatorStateStore().getListState(
>> >                 new ListStateDescriptor<Long>("restarts", Long.class));
>> >
>> >         if (context.isRestored()) {
>> >             List<Long> restoreList = Lists.newArrayList(restartsState.get());
>> >             if (restoreList == null || restoreList.isEmpty()) {
>> >                 restartsState.add(1L);
>> >                 LOG.info("restarts: 1");
>> >             } else {
>> >                 Long max = Collections.max(restoreList);
>> >                 LOG.info("restarts: " + max);
>> >                 restartsState.add(max + 1);
>> >             }
>> >         } else {
>> >             LOG.info("restarts: never restored");
>> >         }
>> >     }
>> > }
>> >
>> >
>> > On Thu, Jun 17, 2021 at 5:20 PM Roman Khachatryan <ro...@apache.org>
>> > wrote:
>> >>
>> >> Hi,
>> >>
>> >> Could you please share the test code?
>> >>
>> >> I think the returned value might depend on the level on which the
>> >> tests are executed. If it's a regular job then it should return the
>> >> correct value (as with cluster). If the environment in which the code
>> >> is executed is mocked then it can be false.
>> >>
>> >> Regards,
>> >> Roman
>> >>
>> >> On Thu, Jun 17, 2021 at 4:15 PM Felipe Gutierrez
>> >> <felipe.o.gutier...@gmail.com> wrote:
>> >> >
>> >> > Yes, I have state on the ProcessFunction. I tested it on a
>> >> > stand-alone cluster and it returns true when the application recovers.
>> >> > However, in integration tests it does not return true. I am using Flink
>> >> > 1.4. Do you know where the Flink 1.13 release notes
>> >> > (https://flink.apache.org/news/2021/05/03/release-1.13.0.html) say that I
>> >> > cannot see `isRestored()` return true in integration tests?
>> >> >
>> >> > --
>> >> > -- Felipe Gutierrez
>> >> > -- skype: felipe.o.gutierrez
>> >> >
>> >> >
>> >> > On Thu, Jun 17, 2021 at 4:09 PM Arvid Heise <ar...@apache.org>
>> >> > wrote:
>> >> >>
>> >> >> Does your ProcessFunction have state? If not it would be in line
>> >> >> with the documentation.
>> >> >>
>> >> >> Also which Flink version are you using? Before Flink 1.13 empty
>> >> >> state was omitted so I could imagine that `isRestored()` would return false
>> >> >> but it should actually now also return true for empty state.
>> >> >>
>> >> >> On Tue, Jun 15, 2021 at 9:02 AM Felipe Gutierrez <
>> >> >> felipe.o.gutier...@gmail.com> wrote:
>> >> >>>
>> >> >>> So, I was trying to improve by using the CheckpointedFunction as
>> >> >>> it shows here [1].
>> >> >>> But the method isRestored() says in its documentation [2]:
>> >> >>>
>> >> >>> "Returns true, if state was restored from the snapshot of a
>> >> >>> previous execution. This returns always false for stateless tasks."
>> >> >>>
>> >> >>> It is weird because I am extending a ProcessFunction which is a
>> >> >>> RichFunction.
>> >> >>>
>> >> >>> public class AuctionExceptionSimulatorProcessFunction
>> >> >>>         extends ProcessFunction<KeyedReportingData, KeyedReportingData>
>> >> >>>         implements CheckpointedFunction {
>> >> >>> ...
>> >> >>>
>> >> >>> In the end, I cannot rely on "isRestored()". Do you know what
>> >> >>> could be wrong? I used the same implementation approach as in [1].
>> >> >>>
>> >> >>> [1]
>> >> >>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#checkpointedfunction
>> >> >>> [2]
>> >> >>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--
>> >> >>>
>> >> >>>
>> >> >>> --
>> >> >>> -- Felipe Gutierrez
>> >> >>> -- skype: felipe.o.gutierrez
>> >> >>>
>> >> >>>
>> >> >>> On Mon, Jun 14, 2021 at 3:39 PM Roman Khachatryan <
>> >> >>> ro...@apache.org> wrote:
>> >> >>>>
>> >> >>>> You can also use accumulators [1] to collect the number of
>> >> >>>> restarts (and then access it via client); but side outputs should
>> >> >>>> work as well.
>> >> >>>>
>> >> >>>> [1]
>> >> >>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html#accumulators--counters
>> >> >>>>
>> >> >>>> Regards,
>> >> >>>> Roman
>> >> >>>>
>> >> >>>> On Sun, Jun 13, 2021 at 10:01 PM Felipe Gutierrez
>> >> >>>> <felipe.o.gutier...@gmail.com> wrote:
>> >> >>>> >
>> >> >>>> > I just realised that the ProcessFunction alone is enough. I
>> >> >>>> > don't need the CheckpointedFunction.
>> >> >>>> >
>> >> >>>> >
>> >> >>>> > On Fri, 11 Jun 2021, 18:11 Felipe Gutierrez, <
>> >> >>>> > felipe.o.gutier...@gmail.com> wrote:
>> >> >>>> >>
>> >> >>>> >> Cool!
>> >> >>>> >>
>> >> >>>> >> I did it using this example
>> >> >>>> >> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/state.html#using-managed-operator-state
>> >> >>>> >> because I don't have a keyed stream on the specific operator for which I
>> >> >>>> >> want to count the number of restarts (yes, I am using version 1.4,
>> >> >>>> >> unfortunately).
>> >> >>>> >>
>> >> >>>> >> Because I need to test it in an integration test I am using a
>> >> >>>> >> side output
>> >> >>>> >> (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html)
>> >> >>>> >> to attach a sink. I am not sure if you have a better idea to test the
>> >> >>>> >> restarts in an integration test. If you have a simple idea please tell me
>> >> >>>> >> :). This was the way that I solved it....
>> >> >>>> >>
>> >> >>>> >> Thanks
>> >> >>>> >> Felipe
>> >> >>>> >>
>> >> >>>> >> --
>> >> >>>> >> -- Felipe Gutierrez
>> >> >>>> >> -- skype: felipe.o.gutierrez
>> >> >>>> >>
>> >> >>>> >>
>> >> >>>> >> On Thu, Jun 10, 2021 at 5:41 PM Roman Khachatryan <
>> >> >>>> >> ro...@apache.org> wrote:
>> >> >>>> >>>
>> >> >>>> >>> Hi Felipe,
>> >> >>>> >>>
>> >> >>>> >>> You can use getRuntimeContext().getAttemptNumber() [1] (but
>> >> >>>> >>> beware that depending on the configuration only a pipeline region
>> >> >>>> >>> can be restarted, not the whole job).
>> >> >>>> >>>
>> >> >>>> >>> But if all you want is to check whether it's a first attempt
>> >> >>>> >>> or not, you can also call context.isRestored() from
>> >> >>>> >>> initializeState() [2]
>> >> >>>> >>>
>> >> >>>> >>> [1]
>> >> >>>> >>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/api/common/functions/RuntimeContext.html#getAttemptNumber--
>> >> >>>> >>>
>> >> >>>> >>> [2]
>> >> >>>> >>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--
>> >> >>>> >>>
>> >> >>>> >>> Regards,
>> >> >>>> >>> Roman
>> >> >>>> >>>
>> >> >>>> >>>
>> >> >>>> >>> On Thu, Jun 10, 2021 at 5:30 PM Felipe Gutierrez
>> >> >>>> >>> <felipe.o.gutier...@gmail.com> wrote:
>> >> >>>> >>> >
>> >> >>>> >>> > Hello community,
>> >> >>>> >>> >
>> >> >>>> >>> > Is it possible to know programmatically how many times my
>> >> >>>> >>> > Flink stream job has restarted since it started running?
>> >> >>>> >>> >
>> >> >>>> >>> > My use case is like this. I have a unit test that uses
>> >> >>>> >>> > checkpointing, and I throw an exception in a MapFunction for a given time,
>> >> >>>> >>> > i.e., for the next 2 seconds. Because Flink restarts the job and I have
>> >> >>>> >>> > checkpoints, I can recover the state, and after 2 seconds I don't throw any
>> >> >>>> >>> > exception anymore. Then I would like to know how many times the job was
>> >> >>>> >>> > restarted.
>> >> >>>> >>> >
>> >> >>>> >>> > Thanks,
>> >> >>>> >>> > Felipe
>> >> >>>> >>> >