[ https://issues.apache.org/jira/browse/FLINK-9552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16513890#comment-16513890 ]
Till Rohrmann commented on FLINK-9552:
--------------------------------------

Hi [~kien_truong],

Flink currently does not fully support checkpointing of cyclic topologies. See FLINK-3257 for more information. Part of the problem is that the streaming iteration tasks don't honour the checkpoint lock. That is why we see the NPE: a concurrent checkpointing operation can reset the {{targetBuffer}} in {{SpanningRecordSerializer}}.

Since this is a known limitation, I will unblock {{1.6.0}} from this issue.

> NPE in SpanningRecordSerializer during checkpoint with iterations
> -----------------------------------------------------------------
>
>                 Key: FLINK-9552
>                 URL: https://issues.apache.org/jira/browse/FLINK-9552
>             Project: Flink
>          Issue Type: Bug
>          Components: Type Serialization System
>    Affects Versions: 1.5.0
>            Reporter: Truong Duc Kien
>            Assignee: vinoyang
>            Priority: Blocker
>
> We're intermittently encountering an NPE inside SpanningRecordSerializer during checkpointing.
>
> {code:java}
> 2018-06-08 08:31:35,741 [ka.actor.default-dispatcher-83] INFO o.a.f.r.e.ExecutionGraph IterationSource-22 (44/120) (c1b94ef849db0e5fb9fb7b85c17073ce) switched from RUNNING to FAILED.
> java.lang.RuntimeException
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
>     at org.apache.flink.streaming.runtime.tasks.StreamIterationHead.run(StreamIterationHead.java:91)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
>     at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:98)
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129)
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
>     at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
>     ... 5 more
> 2018-06-08 08:31:35,746 [ka.actor.default-dispatcher-83] INFO o.a.f.r.e.ExecutionGraph Job xxx (8a4eaf02c46dc21c7d6f3f70657dbb17) switched from state RUNNING to FAILING.
> java.lang.RuntimeException
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
>     at org.apache.flink.streaming.runtime.tasks.StreamIterationHead.run(StreamIterationHead.java:91)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
>     at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:98)
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129)
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
>     at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
>     ... 5 more
> {code}
> This issue is probably concurrency related, because the relevant Flink code seems to have proper null checking:
> https://github.com/apache/flink/blob/fa024726bb801fc71cec5cc303cac1d4a03f555e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java#L98
> {code:java}
> // Copy from intermediate buffers to current target memory segment
> if (targetBuffer != null) {
>     targetBuffer.append(lengthBuffer);
>     targetBuffer.append(dataBuffer);
>     targetBuffer.commit();
> }
> {code}

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
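
The comment above attributes the NPE to a check-then-use race: a null check on {{targetBuffer}} does not help if another thread can reset the field between the check and the following calls. The sketch below illustrates that pattern and one way to close the gap, by having both threads take the same lock. It is not Flink's code and not the project's fix. The class {{GuardedSerializerSketch}}, its methods, and the {{checkpointLock}} field are hypothetical names chosen for illustration, and a {{StringBuilder}} stands in for the real buffer type.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a serializer whose target buffer another thread can reset. */
class GuardedSerializerSketch {
    // Hypothetical lock shared by the emitting thread and the resetting thread.
    private final Object checkpointLock = new Object();
    private StringBuilder targetBuffer = new StringBuilder();

    /** Unsafe variant: the field can become null between the check and the call. */
    void addRecordUnsafe(String record) {
        if (targetBuffer != null) {
            targetBuffer.append(record); // may throw NullPointerException under contention
        }
    }

    /** Guarded variant: check and use happen while holding the shared lock. */
    void addRecord(String record) {
        synchronized (checkpointLock) {
            if (targetBuffer != null) {
                targetBuffer.append(record);
            }
        }
    }

    /** Simulates a concurrent operation that resets the buffer. */
    void clearBuffer() {
        synchronized (checkpointLock) {
            targetBuffer = null;
        }
    }

    /** Installs a fresh buffer, again under the shared lock. */
    void setBuffer() {
        synchronized (checkpointLock) {
            targetBuffer = new StringBuilder();
        }
    }

    /** Runs an emitter against a resetter and returns how many NPEs the emitter saw. */
    static int runGuarded(int iterations) {
        GuardedSerializerSketch s = new GuardedSerializerSketch();
        AtomicInteger failures = new AtomicInteger();
        Thread emitter = new Thread(() -> {
            for (int i = 0; i < iterations; i++) {
                try {
                    s.addRecord("r");
                } catch (NullPointerException e) {
                    failures.incrementAndGet();
                }
            }
        });
        Thread resetter = new Thread(() -> {
            for (int i = 0; i < iterations; i++) {
                s.clearBuffer();
                s.setBuffer();
            }
        });
        emitter.start();
        resetter.start();
        try {
            emitter.join();
            resetter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return failures.get();
    }

    public static void main(String[] args) {
        System.out.println("NPEs with lock: " + runGuarded(100_000)); // prints "NPEs with lock: 0"
    }
}
```

Replacing {{addRecord}} with {{addRecordUnsafe}} in the emitter can produce intermittent NPEs, although whether a given run hits the race depends on thread scheduling.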