[ https://issues.apache.org/jira/browse/FLINK-21990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ming li updated FLINK-21990:
----------------------------
    Summary: SourceStreamTask will always hang if the CheckpointedFunction#snapshotState throws an exception.  (was: Double check Task status when perform checkpoint.)

> SourceStreamTask will always hang if the CheckpointedFunction#snapshotState throws an exception.
> ------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-21990
>                 URL: https://issues.apache.org/jira/browse/FLINK-21990
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.11.0
>            Reporter: ming li
>            Priority: Major
>
> We need to double-check the Task status when performing a checkpoint. Otherwise, a checkpoint may still complete successfully after the Task has failed.
> For example, in the log below an exception is thrown at 17:10:24.069, the lock is acquired at 17:10:24.070 and the checkpoint starts, and the checkpoint completes at 17:10:24.373.
> {code:java}
> 17:10:24.069 [Legacy Source Thread - Source_Custom_Source -> Sink_Unnamed (2/4)- execution # 0] INFO org.apache.flink.test.checkpointing.RegionCheckpointITCase - throw expected exception
> 17:10:24.070 [Source_Custom_Source -> Sink_Unnamed (2/4)- execution # 0] INFO org.apache.flink.runtime.state.AbstractSnapshotStrategy - DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, synchronous part) in thread Thread[Source_Custom_Source -> Sink_Unnamed (2/4)- execution # 0,5,Flink Task Threads] took 0 ms.
> 17:10:24.070 [Source_Custom_Source -> Sink_Unnamed (2/4)- execution # 0] INFO org.apache.flink.test.checkpointing.RegionCheckpointITCase - sleep 300 ms
> 17:10:24.372 [Source_Custom_Source -> Sink_Unnamed (2/4)- execution # 0] INFO org.apache.flink.runtime.state.AbstractSnapshotStrategy - DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, synchronous part) in thread Thread[Source_Custom_Source -> Sink_Unnamed (2/4)- execution # 0,5,Flink Task Threads] took 0 ms.
> 17:10:24.373 [jobmanager-future-thread-1] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 2 for job 4a08c4a50d00dfd56f86eb6ccb83b89c (0 bytes in 1137 ms).
> {code}
> In the current code, the task state is checked only once, before the lock is acquired. Once the lock is obtained, the checkpoint starts immediately.
> {code:java}
> private boolean performCheckpoint(
>         CheckpointMetaData checkpointMetaData,
>         CheckpointOptions checkpointOptions,
>         CheckpointMetricsBuilder checkpointMetrics)
>         throws Exception {
>     // isRunning is read before the lock is taken
>     if (isRunning) {
>         actionExecutor.runThrowing(
>                 () -> {
>                     // do checkpoint
>                 });
>         return true;
>     } else {
>         ...
>     }
> }{code}
> However, the task state may well change while the thread is waiting for the lock. By contrast, the Flink 1.9 code checks the task status after acquiring the lock.
> {code:java}
> private boolean performCheckpoint(
>         CheckpointMetaData checkpointMetaData,
>         CheckpointOptions checkpointOptions,
>         CheckpointMetrics checkpointMetrics,
>         boolean advanceToEndOfTime) throws Exception {
>     LOG.debug("Starting checkpoint ({}) {} on task {}",
>             checkpointMetaData.getCheckpointId(),
>             checkpointOptions.getCheckpointType(), getName());
>     final long checkpointId = checkpointMetaData.getCheckpointId();
>     synchronized (lock) {
>         // isRunning is read while holding the lock
>         if (isRunning) {
>             // do checkpoint
>         } else {
>             ...
>         }
>     }
> }{code}
> Therefore, I think we need to check the task status again after acquiring the lock, so that a task that fails while the lock is being acquired cannot still complete a checkpoint.
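
The double check described in this issue can be illustrated with a small, self-contained sketch. This is not Flink code: the class `CheckpointGuardSketch`, its methods, and the `whileWaitingForLock` hook are hypothetical names chosen for illustration, and the hook only simulates a state change that happens between the first status check and the lock acquisition.

```java
/** Hypothetical stand-in for a task; this is not Flink's StreamTask. */
final class CheckpointGuardSketch {
    private final Object lock = new Object();
    private volatile boolean isRunning = true;
    private int checkpointsTaken = 0;

    /** Marks the task as failed, as a failing source thread might. */
    void markFailed() {
        synchronized (lock) {
            isRunning = false;
        }
    }

    /**
     * Takes a checkpoint only if the task is still running once the lock is held.
     * The hook runs between the first check and the lock acquisition so that
     * the window in which the state can change is reproducible in one thread.
     */
    boolean performCheckpoint(Runnable whileWaitingForLock) {
        if (!isRunning) {
            return false; // first check, without the lock
        }
        whileWaitingForLock.run(); // simulated wait for the lock
        synchronized (lock) {
            if (!isRunning) {
                return false; // second check, with the lock held
            }
            checkpointsTaken++; // stands in for "do checkpoint"
            return true;
        }
    }

    int checkpointsTaken() {
        synchronized (lock) {
            return checkpointsTaken;
        }
    }

    public static void main(String[] args) {
        CheckpointGuardSketch task = new CheckpointGuardSketch();
        // Nothing happens while waiting: the checkpoint is taken.
        System.out.println(task.performCheckpoint(() -> { }));
        // The task fails while waiting for the lock: the checkpoint is skipped.
        System.out.println(task.performCheckpoint(task::markFailed));
        System.out.println(task.checkpointsTaken());
    }
}
```

Without the second check inside the `synchronized` block, the second call would still take a checkpoint even though the task had already been marked as failed, which is the behaviour reported above.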