Hi Aljoscha,

Yes, looks that way, thanks for the issue reference - I’d checked Jira a few days 
ago, looks like FLINK-9458 was added very recently :)

I’ll follow up in Jira to see if a small code snippet would be useful.

— Ken

> On May 31, 2018, at 1:17 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
> 
> Hi Ken,
> 
> I think you might have independently discovered this issue: 
> https://issues.apache.org/jira/browse/FLINK-9458 
> 
> Best,
> Aljoscha
> 
>> On 31. May 2018, at 01:46, Ken Krugler <kkrugler_li...@transpac.com> wrote:
>> 
>> Hi devs,
>> 
>> I coded up a simple iteration that uses a KeyedProcessFunction, as a way of 
>> showing how to use timers to do state iteration.
>> 
>> This worked fine, but then I wanted to try out checkpoints. I modified the 
>> KeyedProcessFunction to throw an exception after a fixed number of calls.
>> 
>> When this happens, it puts my job into a loop, where restarting the job 
>> fails with a NullPointerException:
>> 
>> 18/05/30 16:38:40 DEBUG executiongraph.ExecutionGraph:1496 - Try to restart 
>> or fail the job Flink Streaming Job (f144fd0fb301db0ae14c7b991a25b353) if no 
>> longer possible.
>> java.lang.RuntimeException: Example of a failure triggering a job restart
>>      at 
>> com.scaleunlimited.flinksnippets.examples.IterationWithProcessFunctionTimers$MyKeyedProcessFunction.processElement(IterationWithProcessFunctionTimers.java:74)
>>      at 
>> com.scaleunlimited.flinksnippets.examples.IterationWithProcessFunctionTimers$MyKeyedProcessFunction.processElement(IterationWithProcessFunctionTimers.java:1)
>>      at 
>> org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
>>      at 
>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>>      at 
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
>>      at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
>>      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>>      at java.lang.Thread.run(Thread.java:748)
>> 18/05/30 16:38:40 INFO executiongraph.ExecutionGraph:1375 - Job Flink 
>> Streaming Job (f144fd0fb301db0ae14c7b991a25b353) switched from state FAILING 
>> to RESTARTING.
>> 18/05/30 16:38:40 INFO executiongraph.ExecutionGraph:1506 - Restarting the 
>> job Flink Streaming Job (f144fd0fb301db0ae14c7b991a25b353).
>> 18/05/30 16:38:40 WARN executiongraph.ExecutionGraph:1273 - Failed to 
>> restart the job.
>> java.lang.NullPointerException
>>      at 
>> org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint.isAssignedAndAlive(CoLocationConstraint.java:104)
>>      at 
>> org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup.resetConstraints(CoLocationGroup.java:119)
>>      at 
>> org.apache.flink.runtime.executiongraph.ExecutionGraph.restart(ExecutionGraph.java:1247)
>>      at 
>> org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback.triggerFullRecovery(ExecutionGraphRestartCallback.java:59)
>>      at 
>> org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy$1.run(FixedDelayRestartStrategy.java:68)
>>      at 
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>      at 
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>      at 
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>      at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>      at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>      at java.lang.Thread.run(Thread.java:748)
>> 
>> CoLocationConstraint.java:104 is this one-line function:
>> 
>>      public boolean isAssignedAndAlive() {
>>              return lockedLocation != null && sharedSlot.isAlive();
>>      }
>> 
>> So I have to assume sharedSlot is null - I don’t know if that’s valid, or if 
>> this means that the constraint is being used before setSharedSlot() is 
>> called.
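
To illustrate the reasoning above, here is a minimal sketch of why that one-line check can throw. It uses hypothetical stand-in classes (FakeConstraint, FakeSharedSlot), not the real Flink ones, and it assumes only the shape of the expression quoted above. The null-tolerant variant is there for comparison only; it is not meant to suggest how the issue should be fixed in Flink.

```java
// Hypothetical stand-ins for illustration; these are NOT the real Flink classes.
public class CoLocationNpeSketch {

    /** Stand-in for the shared slot type; only the isAlive() call matters here. */
    static class FakeSharedSlot {
        boolean isAlive() {
            return true;
        }
    }

    /** Mirrors the shape of the one-line check quoted above. */
    static class FakeConstraint {
        Object lockedLocation;      // assumption: any non-null value counts as "assigned"
        FakeSharedSlot sharedSlot;  // stays null until something sets it

        // Same expression as quoted: && short-circuits when lockedLocation is null,
        // but dereferences sharedSlot as soon as lockedLocation is non-null.
        boolean isAssignedAndAlive() {
            return lockedLocation != null && sharedSlot.isAlive();
        }

        // Null-tolerant variant, shown only for comparison.
        boolean isAssignedAndAliveNullSafe() {
            return lockedLocation != null && sharedSlot != null && sharedSlot.isAlive();
        }
    }

    /** Returns true if the quoted expression throws a NullPointerException. */
    static boolean throwsNpe(FakeConstraint c) {
        try {
            c.isAssignedAndAlive();
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        FakeConstraint c = new FakeConstraint();
        System.out.println(throwsNpe(c));                    // both fields null
        c.lockedLocation = new Object();
        System.out.println(throwsNpe(c));                    // location set, slot still null
        System.out.println(c.isAssignedAndAliveNullSafe());  // guarded variant
    }
}
```

So in this sketch the exception only shows up when the location is set while the slot is still null, which would be consistent with the constraint being used before setSharedSlot() is called.
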
>> 
>> In any case, this same chunk of logging output repeats immediately, ad 
>> infinitum.
>> 
>> Is there something else I should try to track down what’s going on?
>> 
>> Thanks,
>> 
>> — Ken
>> 
>> PS - checkpointing is set up via:
>> 
>>       final StreamExecutionEnvironment env =
>>           StreamExecutionEnvironment.createLocalEnvironment(2);
>>       env.setParallelism(2);
>>       env.enableCheckpointing(100L, CheckpointingMode.AT_LEAST_ONCE, true);
>>       env.getCheckpointConfig().setMinPauseBetweenCheckpoints(100L);
>> 
>> And the skeleton of the simple workflow is:
>> 
>>       IterativeStream<String> iter = env.addSource(source).iterate(1000L);
>>       DataStream<String> updated = iter.keyBy(new MyKeySelector())
>>           .process(new MyKeyedProcessFunction());
>>       iter.closeWith(updated).print();

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra
