No, it didn't work. "context.isRestored()" returns true when I run the application on a Flink standalone cluster and it recovers after a failure. When I do the same in an integration test, it does not return true after a failure. That is, I can log the exception that causes the failure, and initializeState() is called again after the failure, but context.isRestored() is still false. I also tried to set the state to 0 on the first attempt with "if (!context.isRestored()) { restartsState.add(0L); }", and that does not work either. I think the problem is neither the ListState that I am using nor context.isRestored(). It is "context.getOperatorStateStore()", which is always null, but only in integration tests. With the code below I see "restarts: 0" in the logs twice, before and after the failure.

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
    // unit tests does not open OperatorStateStore
    if (context.getOperatorStateStore() != null) {
        restartsState = context.getOperatorStateStore().getListState(new ListStateDescriptor<Long>("restarts", Long.class));
        List<Long> restoreList = Lists.newArrayList(restartsState.get());
        if (restoreList == null || restoreList.isEmpty()) {
            restartsState.add(0L);
            LOG.info("restarts: 0");
        } else {
            Long max = Collections.max(restoreList);
            LOG.info("restarts: " + max);
            restartsState.add(max + 1);
        }
    }
}

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez

On Thu, Jun 17, 2021 at 11:17 PM Roman Khachatryan <ro...@apache.org> wrote:
> Thanks for sharing,
>
> I think the problem is that restartsState is never updated:
> - on the first attempt, context.isRestored() returns false (and "never
> restored" is logged)
> - on subsequent attempts, it again returns false, because the state
> was never updated before
>
> Adding
> if (!context.isRestored()) { restartsState.add(0L); }
> should solve the problem
> (it's also better to use state.update instead of state.add if only max
> is needed).
>
> Regards,
> Roman
>
> On Thu, Jun 17, 2021 at 5:35 PM Felipe Gutierrez
> <felipe.o.gutier...@gmail.com> wrote:
> >
> > Sure, here it is. Nothing is mocked. I double-checked.
> >
> > UnitTestClass {.....
> >     protected static LocalFlinkMiniCluster flink;
> >
> >     @BeforeClass
> >     public static void prepare() {
> >         flink = new LocalFlinkMiniCluster(getFlinkConfiguration(), false);
> >         flink.start();
> >
> >         TestStreamEnvironment.setAsContext(flink, PARALLELISM);
> >     }
> >
> >     private static Configuration getFlinkConfiguration() {
> >         Configuration flinkConfig = new Configuration();
> >         flinkConfig.setInteger("local.number-taskmanager", 1);
> >         flinkConfig.setInteger("taskmanager.numberOfTaskSlots", 8);
> >         flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
> >         flinkConfig.setString("restart-strategy.fixed-delay.delay", "0 s");
> >         try {
> >             flinkConfig.setString("state.checkpoints.dir", "file://" + tempFolder.newFolder().getAbsolutePath());
> >         } catch (IOException e) {
> >             throw new RuntimeException("error in flink cluster config", e);
> >         }
> >         return flinkConfig;
> >     }
> >
> >
> > The class that I check if the job was restarted:
> >
> > public class ExceptionSimulatorProcessFunction extends ProcessFunction<Object..., Object...>
> >         implements CheckpointedFunction {
> >
> >     final OutputTag<Long> outputTag = new OutputTag<Long>("side-output") {
> >     };
> >     private transient ListState<Long> restartsState;
> >     private Long restartsLocal;
> >     ...
> >     @Override
> >     public void processElement(Object value, Context ctx, Collector<Object> out) throws Exception {
> >         this.currentTimeMillis = System.currentTimeMillis() - currentTimeMillisBehind;
> >
> >         // If current time is less than the reference time ahead AND we have the poison auction an exception will throw
> >         if (this.currentTimeMillis < this.referenceTimeMillisAhead && POISON__TRANSACTION_ID.equals(value.toString())) {
> >
> >             LOG.error("This exception will trigger until the reference time [{}] reaches the trigger time [{}]",
> >                     sdfMillis.format(new Date(this.currentTimeMillis)),
> >                     sdfMillis.format(new Date(this.referenceTimeMillisAhead)));
> >
> >             throw new SimulatedException("Transaction ID: " + value.toString() +
> >                     " not allowed. This is a simple exception for testing purposes.");
> >         }
> >         out.collect(value);
> >
> >
> >         // counts the restarts
> >         if (restartsState != null) {
> >             List<Long> restoreList = Lists.newArrayList(restartsState.get());
> >             Long attemptsRestart = 0L;
> >             if (restoreList != null && !restoreList.isEmpty()) {
> >                 attemptsRestart = Collections.max(restoreList);
> >                 if (restartsLocal < attemptsRestart) {
> >                     restartsLocal = attemptsRestart;
> >                     ctx.output(outputTag, Long.valueOf(attemptsRestart));
> >                 }
> >             }
> >             LOG.info("Attempts restart: " + attemptsRestart);
> >         }
> >     }
> >
> >     @Override
> >     public void snapshotState(FunctionSnapshotContext context) throws Exception {}
> >
> >     @Override
> >     public void initializeState(FunctionInitializationContext context) throws Exception {
> >         restartsState = context.getOperatorStateStore().getListState(new ListStateDescriptor<Long>("restarts", Long.class));
> >
> >         if (context.isRestored()) {
> >             List<Long> restoreList = Lists.newArrayList(restartsState.get());
> >             if (restoreList == null || restoreList.isEmpty()) {
> >                 restartsState.add(1L);
> >                 LOG.info("restarts: 1");
> >             } else {
> >                 Long max = Collections.max(restoreList);
> >                 LOG.info("restarts: " + max);
> >                 restartsState.add(max + 1);
> >             }
> >         } else {
> >             LOG.info("restarts: never restored");
> >         }
> >     }
> > }
> >
> >
> > On Thu, Jun 17, 2021 at 5:20 PM Roman Khachatryan <ro...@apache.org> wrote:
> >>
> >> Hi,
> >>
> >> Could you please share the test code?
> >>
> >> I think the returned value might depend on the level on which the
> >> tests are executed. If it's a regular job then it should return the
> >> correct value (as with cluster). If the environment in which the code
> >> is executed is mocked then it can be false.
> >>
> >> Regards,
> >> Roman
> >>
> >> On Thu, Jun 17, 2021 at 4:15 PM Felipe Gutierrez
> >> <felipe.o.gutier...@gmail.com> wrote:
> >> >
> >> > Yes, I have state on the ProcessFunction. I tested it on a stand-alone cluster and it returns true when the application recovers. However, in integration tests it does not returns true. I am using Flink 1.4. Do you know where it is saying at Flink release 1.13 (https://flink.apache.org/news/2021/05/03/release-1.13.0.html) that I cannot see `isRestored()` equals true on integration tests?
> >> >
> >> > --
> >> > -- Felipe Gutierrez
> >> > -- skype: felipe.o.gutierrez
> >> >
> >> >
> >> > On Thu, Jun 17, 2021 at 4:09 PM Arvid Heise <ar...@apache.org> wrote:
> >> >>
> >> >> Does your ProcessFunction has state? If not it would be in line with the documentation.
> >> >>
> >> >> Also which Flink version are you using? Before Flink 1.13 empty state was omitted so I could imagine that `isRestored()` would return false but it should actually now also return true for empty state.
> >> >>
> >> >> On Tue, Jun 15, 2021 at 9:02 AM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:
> >> >>>
> >> >>> So, I was trying to improve by using the CheckpointedFunction as it shows here [1]. But the method isRestored() says in its documentation [2]:
> >> >>>
> >> >>> "Returns true, if state was restored from the snapshot of a previous execution.
> >> >>> This returns always false for stateless tasks."
> >> >>>
> >> >>> It is weird because I am extending a ProcessFunction which is a RichFunction.
> >> >>>
> >> >>> public class AuctionExceptionSimulatorProcessFunction extends ProcessFunction<KeyedReportingData, KeyedReportingData>
> >> >>>         implements CheckpointedFunction {
> >> >>> ...
> >> >>>
> >> >>> In the end, I cannot rely on the "isRestored()". Do you know what could be wrong? I used the same implementation method of [1].
> >> >>>
> >> >>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#checkpointedfunction
> >> >>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--
> >> >>>
> >> >>>
> >> >>> --
> >> >>> -- Felipe Gutierrez
> >> >>> -- skype: felipe.o.gutierrez
> >> >>>
> >> >>>
> >> >>> On Mon, Jun 14, 2021 at 3:39 PM Roman Khachatryan <ro...@apache.org> wrote:
> >> >>>>
> >> >>>> You can also use accumulators [1] to collect the number of restarts
> >> >>>> (and then access it via client); but side outputs should work as well.
> >> >>>>
> >> >>>> [1]
> >> >>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html#accumulators--counters
> >> >>>>
> >> >>>> Regards,
> >> >>>> Roman
> >> >>>>
> >> >>>> On Sun, Jun 13, 2021 at 10:01 PM Felipe Gutierrez
> >> >>>> <felipe.o.gutier...@gmail.com> wrote:
> >> >>>> >
> >> >>>> > I just realised that only the ProcessFunction is enough. I don't need the CheckpointFunction.
> >> >>>> >
> >> >>>> >
> >> >>>> > On Fri, 11 Jun 2021, 18:11 Felipe Gutierrez, <felipe.o.gutier...@gmail.com> wrote:
> >> >>>> >>
> >> >>>> >> Cool!
> >> >>>> >>
> >> >>>> >> I did using this example https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/state.html#using-managed-operator-state because I don't have a keyed stream on the specific operator that I want to count the number of restarts. (yes I am using version 1.4 unfortunately).
> >> >>>> >>
> >> >>>> >> Because I need to test it in an integration test I am using a side output (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html) to attach a sink. I am not sure if you have a better idea to test the restarts on an integration test. If you have a simple idea please tell me :). This was the way that I solved....
> >> >>>> >>
> >> >>>> >> Thanks
> >> >>>> >> Felipe
> >> >>>> >>
> >> >>>> >> --
> >> >>>> >> -- Felipe Gutierrez
> >> >>>> >> -- skype: felipe.o.gutierrez
> >> >>>> >>
> >> >>>> >>
> >> >>>> >> On Thu, Jun 10, 2021 at 5:41 PM Roman Khachatryan <ro...@apache.org> wrote:
> >> >>>> >>>
> >> >>>> >>> Hi Felipe,
> >> >>>> >>>
> >> >>>> >>> You can use getRuntimeContext().getAttemptNumber() [1] (but beware
> >> >>>> >>> that depending on the configuration only a pipeline region can be
> >> >>>> >>> restarted, not the whole job).
> >> >>>> >>>
> >> >>>> >>> But if all you want is to check whether it's a first attempt or not,
> >> >>>> >>> you can also call context.isRestored() from initializeState() [2]
> >> >>>> >>>
> >> >>>> >>> [1]
> >> >>>> >>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/api/common/functions/RuntimeContext.html#getAttemptNumber--
> >> >>>> >>>
> >> >>>> >>> [2]
> >> >>>> >>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--
> >> >>>> >>>
> >> >>>> >>> Regards,
> >> >>>> >>> Roman
> >> >>>> >>>
> >> >>>> >>>
> >> >>>> >>> On Thu, Jun 10, 2021 at 5:30 PM Felipe Gutierrez
> >> >>>> >>> <felipe.o.gutier...@gmail.com> wrote:
> >> >>>> >>> >
> >> >>>> >>> > Hello community,
> >> >>>> >>> >
> >> >>>> >>> > Is it possible to know programmatically how many times my Flink stream job restarted since it was running?
> >> >>>> >>> >
> >> >>>> >>> > My use case is like this. I have an Unit test that uses checkpoint and I throw one exception in a MapFunction for a given time, i.e.: for the 2 seconds ahead. Because Flink restarts the job and I have checkpoint I can recover the state and after 2 seconds I don't throw any exception anymore. Then I would like to know how many times the job was restarted.
> >> >>>> >>> >
> >> >>>> >>> > Thanks,
> >> >>>> >>> > Felipe
> >> >>>> >>> >
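
For reference, here is a minimal plain-Java sketch of the point made earlier in the thread about using state.update instead of state.add when only the max is needed. It does not use Flink at all: InMemoryListState is a hypothetical stand-in written for this illustration (it is not Flink's ListState), and the loop in main() only simulates three attempts. It also assumes the stored values survive from one attempt to the next, which in a real job would require the state to be restored from a completed checkpoint, and it does not check whether update() is available in the Flink version used in this thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class RestartCounterSketch {

    /** Hypothetical in-memory stand-in for a list state; NOT a Flink class. */
    static class InMemoryListState {
        private final List<Long> values = new ArrayList<>();

        List<Long> get() {
            return new ArrayList<>(values);
        }

        void add(Long value) {
            values.add(value);
        }

        void update(List<Long> newValues) {
            values.clear();
            values.addAll(newValues);
        }
    }

    /** Same idea as the initializeState() in this thread: append 0 first, then max + 1. */
    static long initializeWithAdd(InMemoryListState state) {
        List<Long> restored = state.get();
        long next = restored.isEmpty() ? 0L : Collections.max(restored) + 1;
        state.add(next); // the list grows by one entry per attempt
        return next;
    }

    /** Variant that replaces the whole list, so only the latest count is kept. */
    static long initializeWithUpdate(InMemoryListState state) {
        List<Long> restored = state.get();
        long next = restored.isEmpty() ? 0L : Collections.max(restored) + 1;
        state.update(Collections.singletonList(next)); // the list stays at one entry
        return next;
    }

    public static void main(String[] args) {
        InMemoryListState added = new InMemoryListState();
        InMemoryListState updated = new InMemoryListState();

        // Simulate the first attempt plus two restarts (assumes the state survives each one).
        for (int attempt = 0; attempt < 3; attempt++) {
            initializeWithAdd(added);
            initializeWithUpdate(updated);
        }

        System.out.println("add():    " + added.get());   // prints "add():    [0, 1, 2]"
        System.out.println("update(): " + updated.get()); // prints "update(): [2]"
    }
}
```

Both variants report the same count; the difference is only that add() keeps every earlier value, so each read has to take the max over a growing list.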