Hi devs,

I coded up a simple iteration that uses a KeyedProcessFunction, as a way of 
showing how to use timers to do state iteration.

This worked fine, but then I wanted to try out checkpoints. I modified the 
KeyedProcessFunction to throw an exception after a fixed number of calls.

When this happens, it puts my job into a loop, where restarting the job fails 
with a NullPointerException:

18/05/30 16:38:40 DEBUG executiongraph.ExecutionGraph:1496 - Try to restart or 
fail the job Flink Streaming Job (f144fd0fb301db0ae14c7b991a25b353) if no 
longer possible.
java.lang.RuntimeException: Example of a failure triggering a job restart
        at 
com.scaleunlimited.flinksnippets.examples.IterationWithProcessFunctionTimers$MyKeyedProcessFunction.processElement(IterationWithProcessFunctionTimers.java:74)
        at 
com.scaleunlimited.flinksnippets.examples.IterationWithProcessFunctionTimers$MyKeyedProcessFunction.processElement(IterationWithProcessFunctionTimers.java:1)
        at 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
        at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
        at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
        at java.lang.Thread.run(Thread.java:748)
18/05/30 16:38:40 INFO executiongraph.ExecutionGraph:1375 - Job Flink Streaming 
Job (f144fd0fb301db0ae14c7b991a25b353) switched from state FAILING to 
RESTARTING.
18/05/30 16:38:40 INFO executiongraph.ExecutionGraph:1506 - Restarting the job 
Flink Streaming Job (f144fd0fb301db0ae14c7b991a25b353).
18/05/30 16:38:40 WARN executiongraph.ExecutionGraph:1273 - Failed to restart 
the job.
java.lang.NullPointerException
        at 
org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint.isAssignedAndAlive(CoLocationConstraint.java:104)
        at 
org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup.resetConstraints(CoLocationGroup.java:119)
        at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.restart(ExecutionGraph.java:1247)
        at 
org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback.triggerFullRecovery(ExecutionGraphRestartCallback.java:59)
        at 
org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy$1.run(FixedDelayRestartStrategy.java:68)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

CoLocationContraint.java:104 is this one line function:

        public boolean isAssignedAndAlive() {
                return lockedLocation != null && sharedSlot.isAlive();
        }

So I have to assume sharedSlot is null - I don’t know if that’s valid, or if 
this means that the constraint is being used before setSharedSlot() is called.

In any case, this same chunk of logging output repeats immediately, ad 
infinitum.

Is there something else I should try to track down what’s going on?

Thanks,

— Ken

PS - checkpointing is set up via:

        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment(2);
        env.setParallelism(2);
        env.enableCheckpointing(100L, CheckpointingMode.AT_LEAST_ONCE, true);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(100L);

And the skeleton of the simple workflow is:

        IterativeStream<String> iter = env.addSource(source).iterate(1000L);
        DataStream<String> updated = iter.keyBy(new 
MyKeySelector()).process(new MyKeyedProcessFunction());
        iter.closeWith(updated).print();


--------------------------------------------
http://about.me/kkrugler
+1 530-210-6378

Reply via email to