Hi Aljoscha,

Yes, looks that way, thanks for the issue reference. I’d checked Jira a few days ago; it looks like FLINK-9458 was added very recently :)
I’ll follow up in Jira to see if a small code snippet would be useful.

— Ken

> On May 31, 2018, at 1:17 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>
> Hi Ken,
>
> I think you might have independently discovered this issue:
> https://issues.apache.org/jira/browse/FLINK-9458
>
> Best,
> Aljoscha
>
>> On 31. May 2018, at 01:46, Ken Krugler <kkrugler_li...@transpac.com> wrote:
>>
>> Hi devs,
>>
>> I coded up a simple iteration that uses a KeyedProcessFunction, as a way of
>> showing how to use timers to do state iteration.
>>
>> This worked fine, but then I wanted to try out checkpoints. I modified the
>> KeyedProcessFunction to throw an exception after a fixed number of calls.
>>
>> When this happens, it puts my job into a loop, where restarting the job
>> fails with a NullPointerException:
>>
>> 18/05/30 16:38:40 DEBUG executiongraph.ExecutionGraph:1496 - Try to restart or fail the job Flink Streaming Job (f144fd0fb301db0ae14c7b991a25b353) if no longer possible.
>> java.lang.RuntimeException: Example of a failure triggering a job restart
>>     at com.scaleunlimited.flinksnippets.examples.IterationWithProcessFunctionTimers$MyKeyedProcessFunction.processElement(IterationWithProcessFunctionTimers.java:74)
>>     at com.scaleunlimited.flinksnippets.examples.IterationWithProcessFunctionTimers$MyKeyedProcessFunction.processElement(IterationWithProcessFunctionTimers.java:1)
>>     at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>>     at java.lang.Thread.run(Thread.java:748)
>> 18/05/30 16:38:40 INFO executiongraph.ExecutionGraph:1375 - Job Flink Streaming Job (f144fd0fb301db0ae14c7b991a25b353) switched from state FAILING to RESTARTING.
>> 18/05/30 16:38:40 INFO executiongraph.ExecutionGraph:1506 - Restarting the job Flink Streaming Job (f144fd0fb301db0ae14c7b991a25b353).
>> 18/05/30 16:38:40 WARN executiongraph.ExecutionGraph:1273 - Failed to restart the job.
>> java.lang.NullPointerException
>>     at org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint.isAssignedAndAlive(CoLocationConstraint.java:104)
>>     at org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup.resetConstraints(CoLocationGroup.java:119)
>>     at org.apache.flink.runtime.executiongraph.ExecutionGraph.restart(ExecutionGraph.java:1247)
>>     at org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback.triggerFullRecovery(ExecutionGraphRestartCallback.java:59)
>>     at org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy$1.run(FixedDelayRestartStrategy.java:68)
>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>     at java.lang.Thread.run(Thread.java:748)
>>
>> CoLocationConstraint.java:104 is this one-line function:
>>
>>     public boolean isAssignedAndAlive() {
>>         return lockedLocation != null && sharedSlot.isAlive();
>>     }
>>
>> So I have to assume sharedSlot is null. I don’t know if that’s valid, or if
>> this means that the constraint is being used before setSharedSlot() is
>> called.
>>
>> In any case, this same chunk of logging output repeats immediately, ad
>> infinitum.
>>
>> Is there something else I should try to track down what’s going on?
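
A minimal, self-contained sketch of the reasoning above, assuming nothing about Flink beyond the quoted one-liner. DemoSlot, DemoConstraint and ConstraintCheckDemo are hypothetical stand-ins, not Flink's real CoLocationConstraint or SharedSlot. Because `&&` short-circuits, `sharedSlot.isAlive()` is only evaluated when `lockedLocation` is non-null, so an NPE reported at that line implies the location had already been locked while `sharedSlot` was still null.

```java
// Hypothetical stand-ins that mirror the quoted check; these are NOT
// Flink's real CoLocationConstraint / SharedSlot classes.
class DemoSlot {
    private final boolean alive;

    DemoSlot(boolean alive) {
        this.alive = alive;
    }

    boolean isAlive() {
        return alive;
    }
}

class DemoConstraint {
    Object lockedLocation; // non-null once a location has been locked
    DemoSlot sharedSlot;   // left null here to reproduce the failing state

    // Same expression as the quoted one-liner. && short-circuits, so
    // sharedSlot.isAlive() is only evaluated when lockedLocation != null.
    boolean isAssignedAndAlive() {
        return lockedLocation != null && sharedSlot.isAlive();
    }

    // Null-guarded variant for illustration only (not the FLINK-9458 fix):
    // it hides the exception but says nothing about whether a null slot is valid.
    boolean isAssignedAndAliveNullSafe() {
        return lockedLocation != null && sharedSlot != null && sharedSlot.isAlive();
    }
}

class ConstraintCheckDemo {
    public static void main(String[] args) {
        DemoConstraint c = new DemoConstraint();
        // Nothing locked yet: the right-hand side is never evaluated, no NPE.
        System.out.println("unlocked: " + c.isAssignedAndAlive());

        // Location locked but slot still null: the dereference now happens.
        c.lockedLocation = new Object();
        try {
            c.isAssignedAndAlive();
        } catch (NullPointerException e) {
            System.out.println("locked, null slot: NullPointerException");
        }
        System.out.println("null-safe: " + c.isAssignedAndAliveNullSafe());
    }
}
```

The null-guarded variant only avoids the exception; whether a null shared slot is legitimate at that point in a restart is exactly the open question, so it isn't offered as a fix.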
>>
>> Thanks,
>>
>> — Ken
>>
>> PS - checkpointing is set up via:
>>
>>     final StreamExecutionEnvironment env =
>>         StreamExecutionEnvironment.createLocalEnvironment(2);
>>     env.setParallelism(2);
>>     env.enableCheckpointing(100L, CheckpointingMode.AT_LEAST_ONCE, true);
>>     env.getCheckpointConfig().setMinPauseBetweenCheckpoints(100L);
>>
>> And the skeleton of the simple workflow is:
>>
>>     IterativeStream<String> iter = env.addSource(source).iterate(1000L);
>>     DataStream<String> updated = iter.keyBy(new MyKeySelector())
>>         .process(new MyKeyedProcessFunction());
>>     iter.closeWith(updated).print();

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra
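
For reference, the failure injection described in the original message (throwing from the KeyedProcessFunction after a fixed number of calls) might look something like the sketch below. The actual MyKeyedProcessFunction body isn't shown in the thread, so FailAfterNCalls and FailAfterDemo are hypothetical helpers, not Flink classes. The helper implements Serializable because Flink serializes user functions, including their fields, when the job is submitted.

```java
import java.io.Serializable;

// Hypothetical helper, not a Flink class: counts calls and throws once
// the configured call number is reached.
class FailAfterNCalls implements Serializable {
    private static final long serialVersionUID = 1L;

    private final int failAt; // 1-based call number that throws
    private int calls = 0;    // plain field, not Flink managed state

    FailAfterNCalls(int failAt) {
        if (failAt < 1) {
            throw new IllegalArgumentException("failAt must be >= 1");
        }
        this.failAt = failAt;
    }

    // Call at the top of processElement(); only the failAt-th call throws.
    void tick() {
        calls++;
        if (calls == failAt) {
            throw new RuntimeException("Example of a failure triggering a job restart");
        }
    }

    int calls() {
        return calls;
    }
}

class FailAfterDemo {
    public static void main(String[] args) {
        FailAfterNCalls injector = new FailAfterNCalls(3);
        for (int i = 1; i <= 4; i++) {
            try {
                injector.tick();
                System.out.println("call " + i + ": ok");
            } catch (RuntimeException e) {
                System.out.println("call " + i + ": " + e.getMessage());
            }
        }
    }
}
```

Inside the process function you'd hold one as a field (for example `new FailAfterNCalls(100)`, where 100 is an arbitrary illustrative threshold) and call `tick()` at the start of `processElement()`. Since the counter is an ordinary field rather than managed state, it is not part of any checkpoint.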