Thanks for reporting the issue, Ken. This does indeed look very strange, and we need to investigate how it can happen.
Cheers,
Till

On Thu, May 31, 2018 at 8:07 PM, Ken Krugler <kkrugler_li...@transpac.com> wrote:

> Hi Aljoscha,
>
> Yes, looks that way, thanks for the issue reference - I’d checked Jira a few days ago, looks like FLINK-9458 was added very recently :)
>
> I’ll follow up in Jira to see if a small code snippet would be useful.
>
> — Ken
>
>
> > On May 31, 2018, at 1:17 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
> >
> > Hi Ken,
> >
> > I think you might have independently discovered this issue: https://issues.apache.org/jira/browse/FLINK-9458
> >
> > Best,
> > Aljoscha
> >
> >> On 31. May 2018, at 01:46, Ken Krugler <kkrugler_li...@transpac.com> wrote:
> >>
> >> Hi devs,
> >>
> >> I coded up a simple iteration that uses a KeyedProcessFunction, as a way of showing how to use timers to do state iteration.
> >>
> >> This worked fine, but then I wanted to try out checkpoints. I modified the KeyedProcessFunction to throw an exception after a fixed number of calls.
> >>
> >> When this happens, it puts my job into a loop, where restarting the job fails with a NullPointerException:
> >>
> >> 18/05/30 16:38:40 DEBUG executiongraph.ExecutionGraph:1496 - Try to restart or fail the job Flink Streaming Job (f144fd0fb301db0ae14c7b991a25b353) if no longer possible.
> >> java.lang.RuntimeException: Example of a failure triggering a job restart
> >>     at com.scaleunlimited.flinksnippets.examples.IterationWithProcessFunctionTimers$MyKeyedProcessFunction.processElement(IterationWithProcessFunctionTimers.java:74)
> >>     at com.scaleunlimited.flinksnippets.examples.IterationWithProcessFunctionTimers$MyKeyedProcessFunction.processElement(IterationWithProcessFunctionTimers.java:1)
> >>     at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
> >>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> >>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> >>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> >>     at java.lang.Thread.run(Thread.java:748)
> >> 18/05/30 16:38:40 INFO executiongraph.ExecutionGraph:1375 - Job Flink Streaming Job (f144fd0fb301db0ae14c7b991a25b353) switched from state FAILING to RESTARTING.
> >> 18/05/30 16:38:40 INFO executiongraph.ExecutionGraph:1506 - Restarting the job Flink Streaming Job (f144fd0fb301db0ae14c7b991a25b353).
> >> 18/05/30 16:38:40 WARN executiongraph.ExecutionGraph:1273 - Failed to restart the job.
> >> java.lang.NullPointerException
> >>     at org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint.isAssignedAndAlive(CoLocationConstraint.java:104)
> >>     at org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup.resetConstraints(CoLocationGroup.java:119)
> >>     at org.apache.flink.runtime.executiongraph.ExecutionGraph.restart(ExecutionGraph.java:1247)
> >>     at org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback.triggerFullRecovery(ExecutionGraphRestartCallback.java:59)
> >>     at org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy$1.run(FixedDelayRestartStrategy.java:68)
> >>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> >>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> >>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> >>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> >>     at java.lang.Thread.run(Thread.java:748)
> >>
> >> CoLocationConstraint.java:104 is this one-line function:
> >>
> >>     public boolean isAssignedAndAlive() {
> >>         return lockedLocation != null && sharedSlot.isAlive();
> >>     }
> >>
> >> So I have to assume sharedSlot is null - I don’t know if that’s valid, or if this means that the constraint is being used before setSharedSlot() is called.
> >>
> >> In any case, this same chunk of logging output repeats immediately, ad infinitum.
> >>
> >> Is there something else I should try to track down what’s going on?
> >>
> >> Thanks,
> >>
> >> — Ken
> >>
> >> PS - checkpointing is set up via:
> >>
> >>     final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(2);
> >>     env.setParallelism(2);
> >>     env.enableCheckpointing(100L, CheckpointingMode.AT_LEAST_ONCE, true);
> >>     env.getCheckpointConfig().setMinPauseBetweenCheckpoints(100L);
> >>
> >> And the skeleton of the simple workflow is:
> >>
> >>     IterativeStream<String> iter = env.addSource(source).iterate(1000L);
> >>     DataStream<String> updated = iter.keyBy(new MyKeySelector()).process(new MyKeyedProcessFunction());
> >>     iter.closeWith(updated).print();
>
> --------------------------
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> Custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra
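
For readers following the thread, here is a minimal sketch of the inference in Ken's original message: because `&&` short-circuits, the quoted one-line check can only throw a NullPointerException when `lockedLocation` is non-null while `sharedSlot` is still null. The classes `Slot` and `Constraint` below are hypothetical stand-ins written for illustration, not Flink's actual `CoLocationConstraint`, and the null-tolerant variant is only an assumption about what a defensive check could look like, not the fix applied for FLINK-9458.

```java
// Hypothetical stand-ins for illustration only; these are NOT Flink's classes.
public class CoLocationNpeSketch {

    /** Stand-in for a shared slot; only the liveness check matters here. */
    static final class Slot {
        private final boolean alive;

        Slot(boolean alive) {
            this.alive = alive;
        }

        boolean isAlive() {
            return alive;
        }
    }

    /** Stand-in for the constraint, using the two field names from the thread. */
    static final class Constraint {
        Object lockedLocation; // non-null once a location has been locked
        Slot sharedSlot;       // stays null until a slot is assigned

        // Same shape as the method quoted in the thread.
        boolean isAssignedAndAlive() {
            return lockedLocation != null && sharedSlot.isAlive();
        }

        // Illustrative null-tolerant variant (an assumption, not Flink's fix).
        boolean isAssignedAndAliveNullSafe() {
            return lockedLocation != null && sharedSlot != null && sharedSlot.isAlive();
        }
    }

    /** Returns true if the quoted-shape check throws a NullPointerException. */
    static boolean throwsNpe(Constraint c) {
        try {
            c.isAssignedAndAlive();
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Constraint c = new Constraint();

        // lockedLocation is null: && short-circuits, sharedSlot is never touched.
        System.out.println(throwsNpe(c));

        // lockedLocation set but sharedSlot still null: the dereference fails.
        c.lockedLocation = new Object();
        System.out.println(throwsNpe(c));

        // The null-tolerant variant reports "not assigned and alive" instead.
        System.out.println(c.isAssignedAndAliveNullSafe());
    }
}
```

The sketch only narrows down which state triggers the exception; whether that state is legitimate or a sign that the constraint is used before `setSharedSlot()` is called remains the open question raised in the thread.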