Thanks for reporting the issue, Ken. This indeed looks very strange, and we
need to investigate how it can happen.

Cheers,
Till

On Thu, May 31, 2018 at 8:07 PM, Ken Krugler <kkrugler_li...@transpac.com>
wrote:

> Hi Aljoscha,
>
> Yes, looks that way, thanks for the issue reference - I’d checked Jira a few
> days ago; looks like FLINK-9458 was added very recently :)
>
> I’ll follow up in Jira to see if a small code snippet would be useful.
>
> — Ken
>
> > On May 31, 2018, at 1:17 AM, Aljoscha Krettek <aljos...@apache.org>
> > wrote:
> >
> > Hi Ken,
> >
> > I think you might have independently discovered this issue:
> > https://issues.apache.org/jira/browse/FLINK-9458
> >
> > Best,
> > Aljoscha
> >
> >> On 31. May 2018, at 01:46, Ken Krugler <kkrugler_li...@transpac.com>
> >> wrote:
> >>
> >> Hi devs,
> >>
> >> I coded up a simple iteration that uses a KeyedProcessFunction, as a
> >> way of showing how to use timers to do state iteration.
> >>
> >> This worked fine, but then I wanted to try out checkpoints. I modified
> >> the KeyedProcessFunction to throw an exception after a fixed number of
> >> calls.
> >>
> >> When this happens, it puts my job into a loop, where restarting the job
> >> fails with a NullPointerException:
> >>
> >> 18/05/30 16:38:40 DEBUG executiongraph.ExecutionGraph:1496 - Try to restart or fail the job Flink Streaming Job (f144fd0fb301db0ae14c7b991a25b353) if no longer possible.
> >> java.lang.RuntimeException: Example of a failure triggering a job restart
> >>      at com.scaleunlimited.flinksnippets.examples.IterationWithProcessFunctionTimers$MyKeyedProcessFunction.processElement(IterationWithProcessFunctionTimers.java:74)
> >>      at com.scaleunlimited.flinksnippets.examples.IterationWithProcessFunctionTimers$MyKeyedProcessFunction.processElement(IterationWithProcessFunctionTimers.java:1)
> >>      at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
> >>      at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> >>      at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
> >>      at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> >>      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> >>      at java.lang.Thread.run(Thread.java:748)
> >> 18/05/30 16:38:40 INFO executiongraph.ExecutionGraph:1375 - Job Flink Streaming Job (f144fd0fb301db0ae14c7b991a25b353) switched from state FAILING to RESTARTING.
> >> 18/05/30 16:38:40 INFO executiongraph.ExecutionGraph:1506 - Restarting the job Flink Streaming Job (f144fd0fb301db0ae14c7b991a25b353).
> >> 18/05/30 16:38:40 WARN executiongraph.ExecutionGraph:1273 - Failed to restart the job.
> >> java.lang.NullPointerException
> >>      at org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint.isAssignedAndAlive(CoLocationConstraint.java:104)
> >>      at org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup.resetConstraints(CoLocationGroup.java:119)
> >>      at org.apache.flink.runtime.executiongraph.ExecutionGraph.restart(ExecutionGraph.java:1247)
> >>      at org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback.triggerFullRecovery(ExecutionGraphRestartCallback.java:59)
> >>      at org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy$1.run(FixedDelayRestartStrategy.java:68)
> >>      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >>      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >>      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> >>      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> >>      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> >>      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> >>      at java.lang.Thread.run(Thread.java:748)
> >>
> >> CoLocationConstraint.java:104 is this one-line function:
> >>
> >>      public boolean isAssignedAndAlive() {
> >>              return lockedLocation != null && sharedSlot.isAlive();
> >>      }
> >>
> >> So I have to assume sharedSlot is null - I don’t know if that’s valid,
> >> or if this means that the constraint is being used before setSharedSlot()
> >> is called.
> >>
> >> In any case, this same chunk of logging output repeats immediately, ad
> >> infinitum.
> >>
> >> Is there something else I should try to track down what’s going on?
> >>
> >> Thanks,
> >>
> >> — Ken
> >>
> >> PS - checkpointing is set up via:
> >>
> >>       final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(2);
> >>       env.setParallelism(2);
> >>       env.enableCheckpointing(100L, CheckpointingMode.AT_LEAST_ONCE, true);
> >>       env.getCheckpointConfig().setMinPauseBetweenCheckpoints(100L);
> >>
> >> And the skeleton of the simple workflow is:
> >>
> >>       IterativeStream<String> iter = env.addSource(source).iterate(1000L);
> >>       DataStream<String> updated = iter.keyBy(new MyKeySelector()).process(new MyKeyedProcessFunction());
> >>       iter.closeWith(updated).print();
>
> --------------------------
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> Custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra
>
>
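
For reference, the reasoning in the quoted message about isAssignedAndAlive()
can be reproduced outside Flink. The sketch below is only an illustration
under stated assumptions: Slot, Constraint, unguardedCheckThrows and
isAssignedAndAliveNullSafe are hypothetical stand-ins, not the Flink classes,
and the null-safe variant is not claimed to be the fix for FLINK-9458. It
shows that an expression of the quoted shape throws a NullPointerException
when lockedLocation is set but sharedSlot is still null.

```java
// Hypothetical stand-ins for illustration only; these are NOT the Flink classes.
final class CoLocationNpeSketch {

    /** Hypothetical slot type that only carries a liveness flag. */
    static final class Slot {
        private final boolean alive;

        Slot(boolean alive) {
            this.alive = alive;
        }

        boolean isAlive() {
            return alive;
        }
    }

    /** Hypothetical constraint with the same two fields the quoted method reads. */
    static final class Constraint {
        Object lockedLocation; // non-null once a location has been locked
        Slot sharedSlot;       // stays null until a slot is assigned

        // Same expression shape as the quoted method: sharedSlot is dereferenced unguarded.
        boolean isAssignedAndAlive() {
            return lockedLocation != null && sharedSlot.isAlive();
        }

        // Defensive variant (an assumption, not the upstream fix): also guard sharedSlot.
        boolean isAssignedAndAliveNullSafe() {
            return lockedLocation != null && sharedSlot != null && sharedSlot.isAlive();
        }
    }

    /** Returns true if the unguarded check throws a NullPointerException. */
    static boolean unguardedCheckThrows(Constraint c) {
        try {
            c.isAssignedAndAlive();
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Constraint c = new Constraint();
        c.lockedLocation = new Object(); // location locked, but no shared slot set
        System.out.println("unguarded check throws NPE: " + unguardedCheckThrows(c));
        System.out.println("null-safe check result: " + c.isAssignedAndAliveNullSafe());
    }
}
```

Because && short-circuits, the unguarded version only fails when
lockedLocation is non-null while sharedSlot is null, which is consistent with
the guess that the constraint is used before its slot has been set.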
