[ https://issues.apache.org/jira/browse/FLINK-8638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
vinoyang reassigned FLINK-8638:
-------------------------------

    Assignee:     (was: vinoyang)

> Job restart when Checkpoint On Barrier failed
> ---------------------------------------------
>
>                 Key: FLINK-8638
>                 URL: https://issues.apache.org/jira/browse/FLINK-8638
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.4.0, 1.3.2
>            Reporter: Ran Tao
>            Priority: Major
>
> The following example comes from a snapshotState call that writes to HDFS.
> snapshotState failed because of an HDFS disk problem, so
> triggerCheckpointOnBarrier failed and threw an exception that restarted the
> application. On restart, however, Flink has to recover from the most recent
> completed checkpoint and catch up on the data, which can lead to
> significant delays. We think that when StreamTask's
> triggerCheckpointOnBarrier (including triggerCheckpoint at the source) fails,
> the application should not restart. Instead, it should continue running, mark
> the checkpoint as failed, and finally notify the JobManager that this
> checkpoint failed. Adding a checkpoint failure alarm would let developers or
> users know about the situation and take the appropriate action. During this
> time, the Flink job keeps running.
>
> {code:java}
> java.lang.Exception: Could not perform checkpoint 45843 for operator TriggerWindow(TumblingEventTimeWindows(60000), ReducingStateDescriptor{serializer=com.op.router.streaming.Launcher$$anon$49$$anon$13@af3ff050, reduceFunction=org.apache.flink.streaming.api.scala.function.util.ScalaReduceFunction@26f7cdf8}, EventTimeTrigger(), WindowedStream.reduce(WindowedStream.java:300)) -> Map (153/459).
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:552)
>     at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
>     at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:281)
>     at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not complete snapshot 45843 for operator TriggerWindow(TumblingEventTimeWindows(60000), ReducingStateDescriptor{serializer=com.op.router.streaming.Launcher$$anon$49$$anon$13@af3ff050, reduceFunction=org.apache.flink.streaming.api.scala.function.util.ScalaReduceFunction@26f7cdf8}, EventTimeTrigger(), WindowedStream.reduce(WindowedStream.java:300)) -> Map (153/459).
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:407)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1162)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1094)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:654)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:590)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:543)
>     ... 8 more
> Caused by: java.io.IOException: Could not flush and close the file system output stream to hdfs://hadoop/checkpoint/flink/2406585590309979134/c8ac238c751d7b7b3e3b498bc396550f in order to obtain the stream state handle
>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336)
>     at org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream.closeAndGetHandle(KeyedStateCheckpointOutputStream.java:105)
>     at org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream.closeAndGetHandle(KeyedStateCheckpointOutputStream.java:30)
>     at org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.closeAndUnregisterStreamToObtainStateHandle(StateSnapshotContextSynchronousImpl.java:131)
>     at org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.getKeyedStateStreamFuture(StateSnapshotContextSynchronousImpl.java:113)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:387)
>     ... 13 more
> Caused by: java.io.IOException: All datanodes DatanodeInfoWithStorage are bad. Aborting...
>     at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1109)
>     at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:871)
>     at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:401)
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
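
The behavior requested in the issue description (catch the snapshot failure, mark the checkpoint as failed, notify the JobManager, keep the job running) could be sketched roughly as follows. This is only an illustration under stated assumptions: `Snapshotter`, `FailureNotifier`, `TolerantCheckpointHandler`, and `run` are hypothetical names invented for this sketch and are not Flink classes or APIs. Only the method name `triggerCheckpointOnBarrier` is borrowed from the stack trace, and the simulated `IOException` stands in for the HDFS error.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the proposed behavior; none of these types are Flink classes. */
final class CheckpointFailureSketch {

    /** Stand-in for an operator snapshot that may fail, e.g. on an HDFS write error. */
    interface Snapshotter {
        void snapshot(long checkpointId) throws IOException;
    }

    /** Stand-in for the channel a task would use to tell the JobManager a checkpoint failed. */
    interface FailureNotifier {
        void declineCheckpoint(long checkpointId, Throwable cause);
    }

    /** Task-side handler: a failed snapshot is reported and swallowed instead of rethrown. */
    static final class TolerantCheckpointHandler {
        private final Snapshotter snapshotter;
        private final FailureNotifier notifier;

        TolerantCheckpointHandler(Snapshotter snapshotter, FailureNotifier notifier) {
            this.snapshotter = snapshotter;
            this.notifier = notifier;
        }

        /** Returns true if the checkpoint succeeded, false if it was declined. */
        boolean triggerCheckpointOnBarrier(long checkpointId) {
            try {
                snapshotter.snapshot(checkpointId);
                return true;
            } catch (IOException e) {
                // Mark this checkpoint as failed and report it; the task keeps running.
                notifier.declineCheckpoint(checkpointId, e);
                return false;
            }
        }
    }

    /** Triggers `count` consecutive checkpoints and returns the ids that were declined. */
    static List<Long> run(long firstId, int count, long failingId) {
        List<Long> declined = new ArrayList<>();
        TolerantCheckpointHandler handler = new TolerantCheckpointHandler(
                id -> {
                    if (id == failingId) {
                        throw new IOException("All datanodes are bad (simulated)");
                    }
                },
                (id, cause) -> declined.add(id));
        for (long id = firstId; id < firstId + count; id++) {
            handler.triggerCheckpointOnBarrier(id); // later checkpoints still run after a failure
        }
        return declined;
    }

    public static void main(String[] args) {
        // Checkpoint 45843 fails; 45842 and 45844 complete and the loop never aborts.
        System.out.println("Declined checkpoints: " + run(45842L, 3, 45843L));
    }
}
```

In this sketch the failure surfaces only through the notifier callback, which is where a checkpoint failure alarm of the kind suggested in the description could be attached.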